Belle II Software  release-06-00-14
SeqRootOutputModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/modules/rootio/SeqRootOutputModule.h>
10 #include <framework/datastore/DataStore.h>
11 #include <framework/core/Environment.h>
12 
13 #include <cmath>
14 
15 #include <TClass.h>
16 #include <TList.h>
17 #include <TVirtualStreamerInfo.h>
18 
19 using namespace std;
20 using namespace Belle2;
21 
22 //-----------------------------------------------------------------
23 // Register the Module
24 //-----------------------------------------------------------------
25 REG_MODULE(SeqRootOutput)
26 
27 //-----------------------------------------------------------------
28 // Implementation
29 //-----------------------------------------------------------------
30 
31 SeqRootOutputModule::SeqRootOutputModule() : Module(), m_nevt(0), m_streamer(nullptr), m_size(0), m_size2(0)
32 {
33  //Set module properties
34  setDescription("Save a sequential ROOT file (non-standard I/O format used in DAQ). See https://confluence.desy.de/display/BI/Software+PersistencyModules for further information and a comparison with the .root format.");
35  m_file = nullptr;
36  m_msghandler = nullptr;
37  m_streamerinfo = nullptr;
38  m_streamerinfo_size = 0;
39 
40  vector<string> emptyvector;
41  //Parameter definition
42  addParam("outputFileName" , m_outputFileName,
43  "Output file name. Add a .gz suffix to save a gzip-compressed file. Parameter can be overridden using the -o argument to basf2.",
44  string("SeqRootOutput.sroot"));
45  addParam("compressionLevel", m_compressionLevel,
46  "Compression Level: 0 for no, 1 for low, 9 for high compression. Level 1 usually reduces size by 50%, higher levels have no noticable effect. NOTE: Because of a ROOT bug ( https://sft.its.cern.ch/jira/browse/ROOT-4550 ), this option currently causes memory leaks and is disabled.",
47  0);
48  addParam("saveObjs", m_saveObjs, "List of objects/arrays to be saved", emptyvector);
49  addParam("fileNameIsPattern", m_fileNameIsPattern, "If true interpret the output filename as a boost::format pattern "
50  "instead of the standard where subsequent files are named .sroot-N. For example 'myfile-f%08d.sroot'", false);
51 }
52 
53 
54 SeqRootOutputModule::~SeqRootOutputModule()
55 {
56  if (m_streamerinfo != nullptr) delete m_streamerinfo;
57 }
58 
59 void SeqRootOutputModule::initialize()
60 {
61  const std::string& outputFileArgument = Environment::Instance().getOutputFileOverride();
62  if (!outputFileArgument.empty())
63  m_outputFileName = outputFileArgument;
64 
65  // Open output file
66 
67 
68  // Message handler to encode serialized object
69  m_msghandler = new MsgHandler(m_compressionLevel);
70 
71  // DataStoreStreamer
72  m_streamer = new DataStoreStreamer(m_compressionLevel);
73  m_streamer->setStreamingObjects(m_saveObjs);
74 
75  //Write StreamerInfo at the beginning of a file
76  getStreamerInfos();
77 
78  m_file = new SeqFile(m_outputFileName.c_str(), "w", m_streamerinfo, m_streamerinfo_size, m_fileNameIsPattern);
79 
80  B2INFO("SeqRootOutput: initialized.");
81 }
82 
83 
84 void SeqRootOutputModule::beginRun()
85 {
86 
87  // Statistics
88  gettimeofday(&m_t0, nullptr);
89  m_size = 0.0;
90  m_size2 = 0.0;
91  m_nevt = 0;
92 
93  B2INFO("SeqRootOutput: beginRun called.");
94 }
95 
96 void SeqRootOutputModule::event()
97 {
98  // Stream DataStore in EvtMessage
99  EvtMessage* msg = m_streamer->streamDataStore(false);
100 
101  // Store EvtMessage
102  int stat = m_file->write(msg->buffer());
103 
104  // Clean up EvtMessage
105  delete msg;
106 
107  // Statistics
108  double dsize = (double)stat / 1000.0;
109  m_size += dsize;
110  m_size2 += dsize * dsize;
111  m_nevt++;
112 }
113 
114 void SeqRootOutputModule::endRun()
115 {
116  //fill Run data
117 
118  // End time
119  gettimeofday(&m_tend, nullptr);
120  auto etime = (double)((m_tend.tv_sec - m_t0.tv_sec) * 1000000 +
121  (m_tend.tv_usec - m_t0.tv_usec));
122 
123  // Statistics
124  // Sigma^2 = Sum(X^2)/n - (Sum(X)/n)^2
125 
126  double flowmb = m_size / etime * 1000.0;
127  double avesize = m_size / (double)m_nevt;
128  double avesize2 = m_size2 / (double)m_nevt;
129  double sigma2 = avesize2 - avesize * avesize;
130  double sigma = sqrt(sigma2);
131 
132  B2INFO("SeqRootOutput : " << m_nevt << " events written with total bytes of " << m_size << " kB");
133  B2INFO("SeqRootOutput : flow rate = " << flowmb << " (MB/s)");
134  B2INFO("SeqRootOutput : event size = " << avesize << " +- " << sigma << " (kB)");
135 
136  B2INFO("SeqRootOutput: endRun done.");
137 }
138 
139 
140 void SeqRootOutputModule::terminate()
141 {
142  delete m_msghandler;
143  delete m_streamer;
144  delete m_file;
145 
146  B2INFO("terminate called");
147 }
148 
149 
150 void SeqRootOutputModule::getStreamerInfos()
151 {
152  //
153  // Write StreamerInfo to a file
154  // Copy from RootOutputModule::initialize() and TSocket::SendStreamerInfos()
155  //
156 
157  if (!m_msghandler) {
158  B2FATAL("DataStoreStreamer : m_msghandler is NULL.");
159  return;
160  }
161 
162  TList* minilist = nullptr ;
163  for (int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
164  DataStore::StoreEntryMap& map = DataStore::Instance().getStoreEntryMap(DataStore::EDurability(durability));
165 
166  for (auto& iter : map) {
167  const TClass* entryClass = iter.second.objClass;
168  TVirtualStreamerInfo* vinfo = entryClass->GetStreamerInfo();
169  B2INFO("Recording StreamerInfo : durability " << durability << " : Class Name " << entryClass->GetName());
170  if (!minilist) minilist = new TList();
171  minilist->Add(vinfo);
172  }
173  }
174 
175  if (minilist) {
176  // TMessage messinfo(kMESS_STREAMERINFO);
177  // messinfo.WriteObject(minilist);
178  m_msghandler->add(minilist, "StreamerInfo");
179  // EvtMessage* msg = m_msghandler->encode_msg(MSG_EVENT);
180  EvtMessage* msg = m_msghandler->encode_msg(MSG_STREAMERINFO);
181  (msg->header())->nObjects = 1; // No. of objects
182  (msg->header())->nArrays = 0; // No. of arrays
183 
184  // int size = m_file->write(msg->buffer());
185  m_streamerinfo_size = *((int*)(msg->buffer())); // nbytes in the buffer at the beginning
186 
187  //copy the steamerINfo for later use
188  if (m_streamerinfo_size > 0) {
189  B2INFO("Get StreamerInfo from DataStore : " << m_streamerinfo_size << "bytes");
190  if (m_streamerinfo != nullptr) {
191  B2FATAL("getStreamerInfo() is called twice in the same run ");
192  } else {
193  m_streamerinfo = new char[ m_streamerinfo_size ];
194  }
195  memcpy(m_streamerinfo, msg->buffer(), m_streamerinfo_size);
196  } else {
197  B2FATAL("Invalid size of StreamerInfo : " << m_streamerinfo_size << "bytes");
198  }
199  delete minilist;
200  delete msg;
201  } else {
202  B2FATAL("Failed to get StreamerInfo : ");
203  }
204 
205  return;
206 }
Stream/restore DataStore objects to/from EvtMessage.
EDurability
Durability types.
Definition: DataStore.h:58
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Definition: DataStore.h:87
Class to manage streamed object.
Definition: EvtMessage.h:59
EvtHeader * header()
Get pointer to EvtHeader.
Definition: EvtMessage.cc:161
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:76
Base class for Modules.
Definition: Module.h:72
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
A class to manage I/O for a chain of blocked files.
Definition: SeqFile.h:22
Output module for sequential ROOT I/O.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.