Belle II Software development
SeqRootOutputModule.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/modules/rootio/SeqRootOutputModule.h>
10#include <framework/datastore/DataStore.h>
11#include <framework/core/Environment.h>
12
13#include <cmath>
14
15#include <TClass.h>
16#include <TList.h>
17#include <TVirtualStreamerInfo.h>
18
19using namespace std;
20using namespace Belle2;
21
22//-----------------------------------------------------------------
23// Register the Module
24//-----------------------------------------------------------------
25REG_MODULE(SeqRootOutput);
26
27//-----------------------------------------------------------------
28// Implementation
29//-----------------------------------------------------------------
30
31SeqRootOutputModule::SeqRootOutputModule() : Module(), m_nevt(0), m_streamer(nullptr), m_size(0), m_size2(0)
32{
33 //Set module properties
34 setDescription("Save a sequential ROOT file (non-standard I/O format used in DAQ). See https://confluence.desy.de/display/BI/Software+PersistencyModules for further information and a comparison with the .root format.");
35 m_file = nullptr;
36 m_msghandler = nullptr;
37 m_streamerinfo = nullptr;
39
40 vector<string> emptyvector;
41 //Parameter definition
42 addParam("outputFileName", m_outputFileName,
43 "Output file name. Add a .gz suffix to save a gzip-compressed file. Parameter can be overridden using the -o argument to basf2.",
44 string("SeqRootOutput.sroot"));
45 addParam("compressionLevel", m_compressionLevel,
46 "Compression Level: 0 for no, 1 for low, 9 for high compression. Level 1 usually reduces size by 50%, higher levels have no noticable effect. NOTE: Because of a ROOT bug ( https://sft.its.cern.ch/jira/browse/ROOT-4550 ), this option currently causes memory leaks and is disabled.",
47 0);
48 addParam("saveObjs", m_saveObjs, "List of objects/arrays to be saved", emptyvector);
49 addParam("fileNameIsPattern", m_fileNameIsPattern, "If true interpret the output filename as a boost::format pattern "
50 "instead of the standard where subsequent files are named .sroot-N. For example 'myfile-f%08d.sroot'", false);
51}
52
53
54SeqRootOutputModule::~SeqRootOutputModule()
55{
56 if (m_streamerinfo != nullptr) delete m_streamerinfo;
57}
58
60{
 // Body of the module's initialisation step (presumably initialize(); the
 // signature line is not part of this listing).
 //
 // A non-empty output-file override obtained from the Environment takes
 // precedence over the "outputFileName" module parameter.
61 const std::string& outputFileArgument = Environment::Instance().getOutputFileOverride();
62 if (!outputFileArgument.empty())
63 m_outputFileName = outputFileArgument;
64
65 // Open output file
66
67
68 // Message handler to encode serialized object
70
71 // DataStoreStreamer
74
75 // Write StreamerInfo at the beginning of a file
77
79
 // NOTE(review): the statements announced by the section comments above are
 // not visible in this listing — verify them against the repository version.
80 B2INFO("SeqRootOutput: initialized.");
81}
82
83
85{
86
87 // Statistics
88 gettimeofday(&m_t0, nullptr);
89 m_size = 0.0;
90 m_size2 = 0.0;
91 m_nevt = 0;
92
93 B2INFO("SeqRootOutput: beginRun called.");
94}
95
97{
 // Per-event step: writes the buffer of one EvtMessage through m_file and
 // updates the size statistics.
98 // Stream DataStore in EvtMessage
 // NOTE(review): the statement that creates `msg` is not visible in this
 // listing — presumably it comes from m_streamer->streamDataStore(...); confirm.
100
101 // Store EvtMessage
 // The int returned by write() is used below as the record size in bytes.
 // NOTE(review): there is no check for an error return — confirm that
 // SeqFile::write() cannot return zero or a negative value here.
102 int stat = m_file->write(msg->buffer());
103
104 // Clean up EvtMessage
105 delete msg;
106
107 // Statistics
 // Convert to kB, then accumulate the sum and the sum of squares that the
 // end-of-run report turns into mean and spread of the event size.
108 double dsize = (double)stat / 1000.0;
109 m_size += dsize;
110 m_size2 += dsize * dsize;
111 m_nevt++;
112}
113
115{
116 //fill Run data
117
118 // End time
119 gettimeofday(&m_tend, nullptr);
120 auto etime = (double)((m_tend.tv_sec - m_t0.tv_sec) * 1000000 +
121 (m_tend.tv_usec - m_t0.tv_usec));
122
123 // Statistics
124 // Sigma^2 = Sum(X^2)/n - (Sum(X)/n)^2
125
126 double flowmb = m_size / etime * 1000.0;
127 double avesize = m_size / (double)m_nevt;
128 double avesize2 = m_size2 / (double)m_nevt;
129 double sigma2 = avesize2 - avesize * avesize;
130 double sigma = sqrt(sigma2);
131
132 B2INFO("SeqRootOutput : " << m_nevt << " events written with total bytes of " << m_size << " kB");
133 B2INFO("SeqRootOutput : flow rate = " << flowmb << " (MB/s)");
134 B2INFO("SeqRootOutput : event size = " << avesize << " +- " << sigma << " (kB)");
135
136 B2INFO("SeqRootOutput: endRun done.");
137}
138
139
141{
 // End-of-processing cleanup: destroys the message handler, the streamer and
 // the file handler, in that order.
 // NOTE(review): the pointers are not reset to nullptr afterwards, and
 // m_streamerinfo is not touched here (the destructor releases it).
142 delete m_msghandler;
143 delete m_streamer;
144 delete m_file;
145
146 B2INFO("terminate called");
147}
148
149
151{
152 //
153 // Collect the StreamerInfo of the classes found in the DataStore entry maps
 // and encode it as a single MSG_STREAMERINFO record; its size is stored in
 // m_streamerinfo_size. The direct file write further down is commented out.
154 // Copy from RootOutputModule::initialize() and TSocket::SendStreamerInfos()
155 //
156
 // The message handler is required for the encoding step below.
 // NOTE(review): the `return` after B2FATAL is presumably unreachable — confirm.
157 if (!m_msghandler) {
158 B2FATAL("DataStoreStreamer : m_msghandler is NULL.");
159 return;
160 }
161
 // The list is created lazily, so it stays null when no entry is found.
162 TList* minilist = nullptr ;
163 for (int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
 // NOTE(review): the statement defining `map` is not visible in this listing —
 // presumably the DataStore entry map for this durability; confirm.
165
166 for (auto& iter : map) {
167 const TClass* entryClass = iter.second.objClass;
168 TVirtualStreamerInfo* vinfo = entryClass->GetStreamerInfo();
169 B2INFO("Recording StreamerInfo : durability " << durability << " : Class Name " << entryClass->GetName());
170 if (!minilist) minilist = new TList();
171 minilist->Add(vinfo);
172 }
173 }
174
175 if (minilist) {
176 // TMessage messinfo(kMESS_STREAMERINFO);
177 // messinfo.WriteObject(minilist);
178 m_msghandler->add(minilist, "StreamerInfo");
179 // EvtMessage* msg = m_msghandler->encode_msg(MSG_EVENT);
180 EvtMessage* msg = m_msghandler->encode_msg(MSG_STREAMERINFO);
181 (msg->header())->nObjects = 1; // No. of objects
182 (msg->header())->nArrays = 0; // No. of arrays
183
184 // int size = m_file->write(msg->buffer());
185 m_streamerinfo_size = *((int*)(msg->buffer())); // the first int of the buffer is read as its size in bytes
186
187 // Copy the StreamerInfo for later use
188 if (m_streamerinfo_size > 0) {
189 B2INFO("Get StreamerInfo from DataStore : " << m_streamerinfo_size << "bytes");
 // A non-null m_streamerinfo means a copy already exists; that is fatal.
190 if (m_streamerinfo != nullptr) {
191 B2FATAL("getStreamerInfo() is called twice in the same run ");
192 } else {
 // NOTE(review): the statements that allocate and fill m_streamerinfo are not
 // visible in this listing — confirm they match the release in the destructor.
194 }
196 } else {
197 B2FATAL("Invalid size of StreamerInfo : " << m_streamerinfo_size << "bytes");
198 }
 // The temporary list and the encoded message are both destroyed here.
199 delete minilist;
200 delete msg;
201 } else {
 // No entry was found in any durability map.
202 B2FATAL("Failed to get StreamerInfo : ");
203 }
204
205 return;
206}
Stream/restore DataStore objects to/from EvtMessage.
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
StoreEntryMap & getStoreEntryMap(EDurability durability)
Get a reference to the object/array map.
Definition: DataStore.h:325
static const int c_NDurabilityTypes
Number of Durability Types.
Definition: DataStore.h:63
EDurability
Durability types.
Definition: DataStore.h:58
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:54
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Definition: DataStore.h:87
const std::string & getOutputFileOverride() const
Return overridden output file name, or "" if none was set.
Definition: Environment.h:127
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Class to manage streamed object.
Definition: EvtMessage.h:59
EvtHeader * header()
Get pointer to EvtHeader.
Definition: EvtMessage.cc:161
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:76
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
Definition: MsgHandler.cc:46
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
Definition: MsgHandler.cc:67
A class to manage I/O for a chain of blocked files.
Definition: SeqFile.h:22
int write(const char *buf)
Write a record to a file.
Definition: SeqFile.cc:140
int m_nevt
Total nr. of events in the file.
DataStoreStreamer * m_streamer
DataStoreStreamer.
virtual void initialize() override
Module functions to be called from main process.
virtual void event() override
This method is the core of the module.
struct timeval m_tend
time at end of current run.
virtual void endRun() override
This method is called if the current run ends.
virtual void terminate() override
This method is called at the end of the event processing.
bool m_fileNameIsPattern
If true the output filename will be interpreted as a boost::format pattern.
int m_streamerinfo_size
The size of the StreamerInfo.
SeqFile * m_file
Blocked file handler.
MsgHandler * m_msghandler
Message handler.
virtual void beginRun() override
Module functions to be called from event process.
std::vector< std::string > m_saveObjs
List of objects to be saved.
int m_compressionLevel
Compression level.
SeqRootOutputModule()
Constructor / Destructor.
double m_size
total transferred data, in kB.
void getStreamerInfos()
Write StreamerInfos to a file.
char * m_streamerinfo
StreamerInfo to be written.
std::string m_outputFileName
File name.
double m_size2
sum of squares of data transferred in each event, in kB^2.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
double sqrt(double a)
sqrt for double
Definition: beamHelpers.h:28
Abstract base class for different kinds of events.
STL namespace.