Belle II Software  release-05-02-19
SeqRootOutputModule.cc
1 //+
2 // Author : Ryosuke Itoh, IPNS, KEK
3 // Date : 13 - Aug - 2010
4 // 6 - Sep - 2012, Use of DataStoreStreamer, clean up
5 //-
6 
7 #include <framework/modules/rootio/SeqRootOutputModule.h>
8 #include <framework/datastore/DataStore.h>
9 #include <framework/core/Environment.h>
10 
11 #include <cmath>
12 
13 #include <TClass.h>
14 #include <TList.h>
15 #include <TVirtualStreamerInfo.h>
16 
17 using namespace std;
18 using namespace Belle2;
19 
20 //-----------------------------------------------------------------
21 // Register the Module
22 //-----------------------------------------------------------------
23 REG_MODULE(SeqRootOutput)
24 
25 //-----------------------------------------------------------------
26 // Implementation
27 //-----------------------------------------------------------------
28 
29 SeqRootOutputModule::SeqRootOutputModule() : Module(), m_nevt(0), m_streamer(nullptr), m_size(0), m_size2(0)
30 {
31  //Set module properties
32  setDescription("Save a sequential ROOT file (non-standard I/O format used in DAQ). See https://confluence.desy.de/display/BI/Software+PersistencyModules for further information and a comparison with the .root format.");
33  m_file = nullptr;
34  m_msghandler = nullptr;
35  m_streamerinfo = nullptr;
36  m_streamerinfo_size = 0;
37 
38  vector<string> emptyvector;
39  //Parameter definition
40  addParam("outputFileName" , m_outputFileName,
41  "Output file name. Add a .gz suffix to save a gzip-compressed file. Parameter can be overridden using the -o argument to basf2.",
42  string("SeqRootOutput.sroot"));
43  addParam("compressionLevel", m_compressionLevel,
44  "Compression Level: 0 for no, 1 for low, 9 for high compression. Level 1 usually reduces size by 50%, higher levels have no noticable effect. NOTE: Because of a ROOT bug ( https://sft.its.cern.ch/jira/browse/ROOT-4550 ), this option currently causes memory leaks and is disabled.",
45  0);
46  addParam("saveObjs", m_saveObjs, "List of objects/arrays to be saved", emptyvector);
47  addParam("fileNameIsPattern", m_fileNameIsPattern, "If true interpret the output filename as a boost::format pattern "
48  "instead of the standard where subsequent files are named .sroot-N. For example 'myfile-f%08d.sroot'", false);
49 }
50 
51 
52 SeqRootOutputModule::~SeqRootOutputModule()
53 {
54  if (m_streamerinfo != nullptr) delete m_streamerinfo;
55 }
56 
57 void SeqRootOutputModule::initialize()
58 {
59  const std::string& outputFileArgument = Environment::Instance().getOutputFileOverride();
60  if (!outputFileArgument.empty())
61  m_outputFileName = outputFileArgument;
62 
63  // Open output file
64 
65 
66  // Message handler to encode serialized object
67  m_msghandler = new MsgHandler(m_compressionLevel);
68 
69  // DataStoreStreamer
70  m_streamer = new DataStoreStreamer(m_compressionLevel);
71  m_streamer->setStreamingObjects(m_saveObjs);
72 
73  //Write StreamerInfo at the beginning of a file
74  getStreamerInfos();
75 
76  m_file = new SeqFile(m_outputFileName.c_str(), "w", m_streamerinfo, m_streamerinfo_size, m_fileNameIsPattern);
77 
78  B2INFO("SeqRootOutput: initialized.");
79 }
80 
81 
82 void SeqRootOutputModule::beginRun()
83 {
84 
85  // Statistics
86  gettimeofday(&m_t0, nullptr);
87  m_size = 0.0;
88  m_size2 = 0.0;
89  m_nevt = 0;
90 
91  B2INFO("SeqRootOutput: beginRun called.");
92 }
93 
94 void SeqRootOutputModule::event()
95 {
96  // Stream DataStore in EvtMessage
97  EvtMessage* msg = m_streamer->streamDataStore(false);
98 
99  // Store EvtMessage
100  int stat = m_file->write(msg->buffer());
101 
102  // Clean up EvtMessage
103  delete msg;
104 
105  // Statistics
106  double dsize = (double)stat / 1000.0;
107  m_size += dsize;
108  m_size2 += dsize * dsize;
109  m_nevt++;
110 }
111 
112 void SeqRootOutputModule::endRun()
113 {
114  //fill Run data
115 
116  // End time
117  gettimeofday(&m_tend, nullptr);
118  auto etime = (double)((m_tend.tv_sec - m_t0.tv_sec) * 1000000 +
119  (m_tend.tv_usec - m_t0.tv_usec));
120 
121  // Statistics
122  // Sigma^2 = Sum(X^2)/n - (Sum(X)/n)^2
123 
124  double flowmb = m_size / etime * 1000.0;
125  double avesize = m_size / (double)m_nevt;
126  double avesize2 = m_size2 / (double)m_nevt;
127  double sigma2 = avesize2 - avesize * avesize;
128  double sigma = sqrt(sigma2);
129 
130  B2INFO("SeqRootOutput : " << m_nevt << " events written with total bytes of " << m_size << " kB");
131  B2INFO("SeqRootOutput : flow rate = " << flowmb << " (MB/s)");
132  B2INFO("SeqRootOutput : event size = " << avesize << " +- " << sigma << " (kB)");
133 
134  B2INFO("SeqRootOutput: endRun done.");
135 }
136 
137 
138 void SeqRootOutputModule::terminate()
139 {
140  delete m_msghandler;
141  delete m_streamer;
142  delete m_file;
143 
144  B2INFO("terminate called");
145 }
146 
147 
148 void SeqRootOutputModule::getStreamerInfos()
149 {
150  //
151  // Write StreamerInfo to a file
152  // Copy from RootOutputModule::initialize() and TSocket::SendStreamerInfos()
153  //
154 
155  if (!m_msghandler) {
156  B2FATAL("DataStoreStreamer : m_msghandler is NULL.");
157  return;
158  }
159 
160  TList* minilist = nullptr ;
161  for (int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
162  DataStore::StoreEntryMap& map = DataStore::Instance().getStoreEntryMap(DataStore::EDurability(durability));
163 
164  for (auto& iter : map) {
165  const TClass* entryClass = iter.second.objClass;
166  TVirtualStreamerInfo* vinfo = entryClass->GetStreamerInfo();
167  B2INFO("Recording StreamerInfo : durability " << durability << " : Class Name " << entryClass->GetName());
168  if (!minilist) minilist = new TList();
169  minilist->Add(vinfo);
170  }
171  }
172 
173  if (minilist) {
174  // TMessage messinfo(kMESS_STREAMERINFO);
175  // messinfo.WriteObject(minilist);
176  m_msghandler->add(minilist, "StreamerInfo");
177  // EvtMessage* msg = m_msghandler->encode_msg(MSG_EVENT);
178  EvtMessage* msg = m_msghandler->encode_msg(MSG_STREAMERINFO);
179  (msg->header())->nObjects = 1; // No. of objects
180  (msg->header())->nArrays = 0; // No. of arrays
181 
182  // int size = m_file->write(msg->buffer());
183  m_streamerinfo_size = *((int*)(msg->buffer())); // nbytes in the buffer at the beginning
184 
185  //copy the steamerINfo for later use
186  if (m_streamerinfo_size > 0) {
187  B2INFO("Get StreamerInfo from DataStore : " << m_streamerinfo_size << "bytes");
188  if (m_streamerinfo != nullptr) {
189  B2FATAL("getStreamerInfo() is called twice in the same run ");
190  } else {
191  m_streamerinfo = new char[ m_streamerinfo_size ];
192  }
193  memcpy(m_streamerinfo, msg->buffer(), m_streamerinfo_size);
194  } else {
195  B2FATAL("Invalid size of StreamerInfo : " << m_streamerinfo_size << "bytes");
196  }
197  delete minilist;
198  delete msg;
199  } else {
200  B2FATAL("Failed to get StreamerInfo : ");
201  }
202 
203  return;
204 }
Belle2::DataStore::StoreEntryMap
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Definition: DataStore.h:89
REG_MODULE
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:652
Belle2::EvtMessage
Class to manage streamed object.
Definition: EvtMessage.h:60
Belle2::SeqRootOutputModule
Output module for sequential ROOT I/O.
Definition: SeqRootOutputModule.h:28
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::EvtMessage::header
EvtHeader * header()
Get pointer to EvtHeader.
Definition: EvtMessage.cc:162
Belle2::EvtMessage::buffer
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:77
Belle2::DataStoreStreamer
Stream/restore DataStore objects to/from EvtMessage.
Definition: DataStoreStreamer.h:33
Belle2::SeqFile
A class to manage I/O for a chain of blocked files.
Definition: SeqFile.h:22
Belle2::MsgHandler
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:104
Belle2::DataStore::EDurability
EDurability
Durability types.
Definition: DataStore.h:60