Belle II Software  release-08-01-10
SeqRootOutputModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/modules/rootio/SeqRootOutputModule.h>
10 #include <framework/datastore/DataStore.h>
11 #include <framework/core/Environment.h>
12 
13 #include <cmath>
14 
15 #include <TClass.h>
16 #include <TList.h>
17 #include <TVirtualStreamerInfo.h>
18 
19 using namespace std;
20 using namespace Belle2;
21 
22 //-----------------------------------------------------------------
23 // Register the Module
24 //-----------------------------------------------------------------
25 REG_MODULE(SeqRootOutput);
26 
27 //-----------------------------------------------------------------
28 // Implementation
29 //-----------------------------------------------------------------
30 
31 SeqRootOutputModule::SeqRootOutputModule() : Module(), m_nevt(0), m_streamer(nullptr), m_size(0), m_size2(0)
32 {
33  //Set module properties
34  setDescription("Save a sequential ROOT file (non-standard I/O format used in DAQ). See https://confluence.desy.de/display/BI/Software+PersistencyModules for further information and a comparison with the .root format.");
35  m_file = nullptr;
36  m_msghandler = nullptr;
37  m_streamerinfo = nullptr;
39 
40  vector<string> emptyvector;
41  //Parameter definition
42  addParam("outputFileName", m_outputFileName,
43  "Output file name. Add a .gz suffix to save a gzip-compressed file. Parameter can be overridden using the -o argument to basf2.",
44  string("SeqRootOutput.sroot"));
45  addParam("compressionLevel", m_compressionLevel,
46  "Compression Level: 0 for no, 1 for low, 9 for high compression. Level 1 usually reduces size by 50%, higher levels have no noticable effect. NOTE: Because of a ROOT bug ( https://sft.its.cern.ch/jira/browse/ROOT-4550 ), this option currently causes memory leaks and is disabled.",
47  0);
48  addParam("saveObjs", m_saveObjs, "List of objects/arrays to be saved", emptyvector);
49  addParam("fileNameIsPattern", m_fileNameIsPattern, "If true interpret the output filename as a boost::format pattern "
50  "instead of the standard where subsequent files are named .sroot-N. For example 'myfile-f%08d.sroot'", false);
51 }
52 
53 
54 SeqRootOutputModule::~SeqRootOutputModule()
55 {
56  if (m_streamerinfo != nullptr) delete m_streamerinfo;
57 }
58 
60 {
61  const std::string& outputFileArgument = Environment::Instance().getOutputFileOverride();
62  if (!outputFileArgument.empty())
63  m_outputFileName = outputFileArgument;
64 
65  // Open output file
66 
67 
68  // Message handler to encode serialized object
70 
71  // DataStoreStreamer
74 
75  //Write StreamerInfo at the beginning of a file
77 
79 
80  B2INFO("SeqRootOutput: initialized.");
81 }
82 
83 
85 {
86 
87  // Statistics
88  gettimeofday(&m_t0, nullptr);
89  m_size = 0.0;
90  m_size2 = 0.0;
91  m_nevt = 0;
92 
93  B2INFO("SeqRootOutput: beginRun called.");
94 }
95 
97 {
98  // Stream DataStore in EvtMessage
99  EvtMessage* msg = m_streamer->streamDataStore(false);
100 
101  // Store EvtMessage
102  int stat = m_file->write(msg->buffer());
103 
104  // Clean up EvtMessage
105  delete msg;
106 
107  // Statistics
108  double dsize = (double)stat / 1000.0;
109  m_size += dsize;
110  m_size2 += dsize * dsize;
111  m_nevt++;
112 }
113 
115 {
116  //fill Run data
117 
118  // End time
119  gettimeofday(&m_tend, nullptr);
120  auto etime = (double)((m_tend.tv_sec - m_t0.tv_sec) * 1000000 +
121  (m_tend.tv_usec - m_t0.tv_usec));
122 
123  // Statistics
124  // Sigma^2 = Sum(X^2)/n - (Sum(X)/n)^2
125 
126  double flowmb = m_size / etime * 1000.0;
127  double avesize = m_size / (double)m_nevt;
128  double avesize2 = m_size2 / (double)m_nevt;
129  double sigma2 = avesize2 - avesize * avesize;
130  double sigma = sqrt(sigma2);
131 
132  B2INFO("SeqRootOutput : " << m_nevt << " events written with total bytes of " << m_size << " kB");
133  B2INFO("SeqRootOutput : flow rate = " << flowmb << " (MB/s)");
134  B2INFO("SeqRootOutput : event size = " << avesize << " +- " << sigma << " (kB)");
135 
136  B2INFO("SeqRootOutput: endRun done.");
137 }
138 
139 
141 {
142  delete m_msghandler;
143  delete m_streamer;
144  delete m_file;
145 
146  B2INFO("terminate called");
147 }
148 
149 
151 {
152  //
153  // Write StreamerInfo to a file
154  // Copy from RootOutputModule::initialize() and TSocket::SendStreamerInfos()
155  //
156 
157  if (!m_msghandler) {
158  B2FATAL("DataStoreStreamer : m_msghandler is NULL.");
159  return;
160  }
161 
162  TList* minilist = nullptr ;
163  for (int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
165 
166  for (auto& iter : map) {
167  const TClass* entryClass = iter.second.objClass;
168  TVirtualStreamerInfo* vinfo = entryClass->GetStreamerInfo();
169  B2INFO("Recording StreamerInfo : durability " << durability << " : Class Name " << entryClass->GetName());
170  if (!minilist) minilist = new TList();
171  minilist->Add(vinfo);
172  }
173  }
174 
175  if (minilist) {
176  // TMessage messinfo(kMESS_STREAMERINFO);
177  // messinfo.WriteObject(minilist);
178  m_msghandler->add(minilist, "StreamerInfo");
179  // EvtMessage* msg = m_msghandler->encode_msg(MSG_EVENT);
180  EvtMessage* msg = m_msghandler->encode_msg(MSG_STREAMERINFO);
181  (msg->header())->nObjects = 1; // No. of objects
182  (msg->header())->nArrays = 0; // No. of arrays
183 
184  // int size = m_file->write(msg->buffer());
185  m_streamerinfo_size = *((int*)(msg->buffer())); // nbytes in the buffer at the beginning
186 
187  //copy the steamerINfo for later use
188  if (m_streamerinfo_size > 0) {
189  B2INFO("Get StreamerInfo from DataStore : " << m_streamerinfo_size << "bytes");
190  if (m_streamerinfo != nullptr) {
191  B2FATAL("getStreamerInfo() is called twice in the same run ");
192  } else {
193  m_streamerinfo = new char[ m_streamerinfo_size ];
194  }
195  memcpy(m_streamerinfo, msg->buffer(), m_streamerinfo_size);
196  } else {
197  B2FATAL("Invalid size of StreamerInfo : " << m_streamerinfo_size << "bytes");
198  }
199  delete minilist;
200  delete msg;
201  } else {
202  B2FATAL("Failed to get StreamerInfo : ");
203  }
204 
205  return;
206 }
Stream/restore DataStore objects to/from EvtMessage.
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
StoreEntryMap & getStoreEntryMap(EDurability durability)
Get a reference to the object/array map.
Definition: DataStore.h:325
static const int c_NDurabilityTypes
Number of Durability Types.
Definition: DataStore.h:63
EDurability
Durability types.
Definition: DataStore.h:58
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:54
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Definition: DataStore.h:87
const std::string & getOutputFileOverride() const
Return overridden output file name, or "" if none was set.
Definition: Environment.h:115
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Class to manage streamed object.
Definition: EvtMessage.h:59
EvtHeader * header()
Get pointer to EvtHeader.
Definition: EvtMessage.cc:161
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:76
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
Definition: MsgHandler.cc:46
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
Definition: MsgHandler.cc:67
A class to manage I/O for a chain of blocked files.
Definition: SeqFile.h:22
int write(const char *buf)
Write a record to a file.
Definition: SeqFile.cc:140
int m_nevt
Total nr. of events in the file.
DataStoreStreamer * m_streamer
DataStoreStreamer.
virtual void initialize() override
Module functions to be called from main process.
virtual void event() override
This method is the core of the module.
struct timeval m_tend
time at end of current run.
virtual void endRun() override
This method is called if the current run ends.
virtual void terminate() override
This method is called at the end of the event processing.
bool m_fileNameIsPattern
If true the output filename will be interpreted as a boost::format pattern.
int m_streamerinfo_size
The size of the StreamerInfo.
SeqFile * m_file
Blocked file handler.
MsgHandler * m_msghandler
Message handler.
virtual void beginRun() override
Module functions to be called from event process.
std::vector< std::string > m_saveObjs
List of objects to be saved.
int m_compressionLevel
Compression level.
double m_size
total transferred data, in kB.
void getStreamerInfos()
Write StreamerInfos to a file.
char * m_streamerinfo
StreamerInfo to be written.
std::string m_outputFileName
File name.
double m_size2
sum of squares of data transferred in each event, in kB^2.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
double sqrt(double a)
sqrt for double
Definition: beamHelpers.h:28
Abstract base class for different kinds of events.