Belle II Software  release-08-01-10
SeqRootInputModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/modules/rootio/SeqRootInputModule.h>
10 
11 #include <framework/core/Environment.h>
12 #include <framework/datastore/DataStore.h>
13 #include <framework/datastore/StoreObjPtr.h>
14 #include <framework/dataobjects/FileMetaData.h>
15 #include <framework/io/RootIOUtilities.h>
16 #include <framework/database/Configuration.h>
17 
18 #include <cmath>
19 #include <cstdio>
20 
21 using namespace std;
22 using namespace Belle2;
23 
24 //-----------------------------------------------------------------
25 // Register the Module
26 //-----------------------------------------------------------------
27 REG_MODULE(SeqRootInput);
28 
29 //-----------------------------------------------------------------
30 // Implementation
31 //-----------------------------------------------------------------
32 
33 SeqRootInputModule::SeqRootInputModule() : Module()
34 {
35  //Set module properties
36  setDescription("Read .sroot files produced by SeqRootOutput.");
38 
39  //Parameter definition
40  addParam("inputFileName", m_inputFileName,
41  "Input file name. Can also be a gzip-compressed file (with suffix .gz). "
42  "Parameter can be overridden using the -i argument to basf2.",
43  string(""));
44  vector<string> empty;
45  addParam("inputFileNames", m_filelist, "List of input files", empty);
46  addParam("fileNameIsPattern", m_fileNameIsPattern, "If true interpret the output "
47  "filename as a boost::format pattern instead of the standard where "
48  "subsequent files are named .sroot-N. For example 'myfile-f%08d.sroot'",
49  false);
50  addParam("declareRealData", m_realData, "Declare the input to be real, not generated data", false);
51 }
52 
// Defaulted destructor: it releases nothing itself; m_streamer and m_file are
// deleted in terminate().
53 SeqRootInputModule::~SeqRootInputModule() = default;
54 
// Body of the module's initialization step: resolves which file(s) to read,
// opens the first file, reads records until the first non-StreamerInfo record
// has been seen, and optionally declares the input as real data.
// NOTE(review): this is a numbered listing with gaps. Listing line 55
// (presumably the `void SeqRootInputModule::initialize()` header) and listing
// lines 70, 76, 97, 119 and 126 are missing from this extract -- recover them
// from the repository before relying on this block.
56 {
57  // Specify input file(list)
// The two parameters are mutually exclusive.
58  if (!m_inputFileName.empty() && !m_filelist.empty()) {
59  B2FATAL("Cannot specify both 'inputFileName' and 'inputFileNames'");
60  }
// Input files configured in the Environment take precedence over the module parameters.
61  const std::vector<std::string>& inputFiles = Environment::Instance().getInputFilesOverride();
62  if (!inputFiles.empty()) { // Override parameter specification
// NOTE(review): with exactly one override file m_filelist keeps its
// parameter value, so m_nfile below is the size of the parameter list (0 if
// it is empty) -- confirm this is intended.
63  if (inputFiles.size() > 1) {
64  m_filelist = inputFiles;
65  }
66  m_inputFileName = inputFiles[0];
67  m_nfile = m_filelist.size();
68  } else if (m_filelist.size() > 0) {
69  m_nfile = m_filelist.size();
// NOTE(review): listing line 70 is missing here; presumably it selects the
// first list entry as m_inputFileName -- confirm.
71  } else {
72  m_nfile = 1;
73  }
74 
75  // Initialize DataStoreStreamer
// NOTE(review): listing line 76 is missing here; presumably it creates
// m_streamer (used by event(), deleted in terminate()) -- confirm.
77 
78  // Read the first event in SeqRoot file and restore in DataStore.
79  // This is necessary to create object tables before TTree initialization
80  // if used together with TTree based output (RootOutput module).
81 
82  EvtMessage* evtmsg = nullptr;
83  // Open input file
84  m_file = new SeqFile(m_inputFileName.c_str(), "r", nullptr, 0, m_fileNameIsPattern);
85  if (m_file->status() <= 0)
86  B2FATAL("SeqRootInput : Error in opening input file : " << m_inputFileName);
87 
88  B2INFO("SeqRootInput : Open " << m_inputFileName);
89 
90  //Read StreamerInfo and the first event
// Counts StreamerInfo records; a second one is treated as fatal.
91  int info_cnt = 0;
// Each iteration allocates a buffer of c_MaxEventSize bytes and frees it
// again before the next iteration or before leaving the loop.
92  while (true) {
93  auto* evtbuf = new char[EvtMessage::c_MaxEventSize];
94  int size = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
95  if (size > 0) {
96  evtmsg = new EvtMessage(evtbuf);
// NOTE(review): listing line 97 is missing here; presumably the record is
// handed to the streamer at this point (event() assumes the first event is
// already loaded) -- confirm.
98  if (evtmsg->type() == MSG_STREAMERINFO) {
99  // StreamerInfo was read
100  B2INFO("Reading StreamerInfo");
101  if (info_cnt != 0) B2FATAL("SeqRootInput : Reading StreamerInfos twice");
102  info_cnt++;
103  } else {
104  // first event was read
105  delete[] evtbuf;
106  delete evtmsg;
107  break;
108  }
109  delete[] evtbuf;
110  delete evtmsg;
111 
112  } else {
// Both EOF (0) and read errors (< 0) before the first event are fatal.
113  B2FATAL("SeqRootInput : Error in reading first event");
114  }
115  }
// Start with the first file; event() advances this index on EOF.
116  m_fileptr = 0;
117 
118  if (m_realData) {
// NOTE(review): listing line 119 is missing here; `fileMetaData` is
// presumably declared on it -- confirm.
120  fileMetaData.registerInDataStore();
121  fileMetaData.create();
122  fileMetaData->declareRealData();
123  }
124  // make sure global tag replay is disabled and users have to specify a globaltag.
125  // We don't have input file metadata so this is all we can do.
// NOTE(review): listing line 126 (the statement the comment above refers to)
// is missing from this extract.
127 }
128 
129 
// Body of the begin-of-run hook: records the run start time in m_t0 and
// resets the per-run counters that endRun() turns into throughput statistics.
// NOTE(review): listing line 130 (presumably the
// `void SeqRootInputModule::beginRun()` header) is missing from this extract.
131 {
132  gettimeofday(&m_t0, nullptr);
// m_size / m_size2 accumulate the record sizes and their squares (see event()).
133  m_size = 0.0;
134  m_size2 = 0.0;
// event() pre-increments m_nevt before reading a record.
135  m_nevt = 0;
136  B2INFO("SeqRootInput: beginRun called.");
137 }
138 
139 
141 {
142  // on first call: first event is already loaded. This is actually called once
143  // before the first beginRun() since we are the module setting the EventInfo
144  // so don't get confused by the m_nevt=0 in beginRun()
145  if (++m_nevt == 0) return;
146 
147  // Get a SeqRoot record from the file
148  auto* evtbuf = new char[EvtMessage::c_MaxEventSize];
149  EvtMessage* evtmsg = nullptr;
150  int size = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
151  if (size < 0) {
152  B2ERROR("SeqRootInput : file read error");
153  delete m_file;
154  m_file = nullptr;
155  delete[] evtbuf;
156  evtbuf = nullptr;
157  return;
158  } else if (size == 0) {
159  B2INFO("SeqRootInput : EOF detected");
160  delete m_file;
161  m_file = nullptr;
162  m_fileptr++;
163  if (m_fileptr >= m_nfile) {
164  delete[] evtbuf;
165  evtbuf = nullptr;
166  return;
167  }
168  printf("fileptr = %d ( of %d )\n", m_fileptr, m_nfile);
169  fflush(stdout);
171  m_file = new SeqFile(m_inputFileName, "r");
172  if (m_file->status() <= 0)
173  B2FATAL("SeqRootInput : Error in opening input file : " << m_inputFileName);
174  B2INFO("SeqRootInput : Open " << m_inputFileName);
175  evtmsg = new EvtMessage(evtbuf);
176  // Skip the first record (StreamerInfo)
177  int is = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
178  if (is <= 0) {
179  B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
180  }
181  // Read next record
182  is = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
183  if (is <= 0) {
184  B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
185  }
186  } else {
187  // printf("SeqRootInput : read = %d\n", size);
188  evtmsg = new EvtMessage(evtbuf);
189  }
190 
191  // Statistics
192  double dsize = (double)size / 1000.0;
193  m_size += dsize;
194  m_size2 += dsize * dsize;
195 
196  if (evtmsg->type() == MSG_STREAMERINFO) {
197  B2WARNING("SeqRootInput : StreamerInfo is found in the middle of *.sroot-* files. Skip record");
198  int is = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
199  if (is <= 0) {
200  B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
201  }
202  evtmsg = new EvtMessage(evtbuf);
203  }
204 
205  // Restore objects in DataStore
206  m_streamer->restoreDataStore(evtmsg);
207 
208  // Delete buffers
209  delete[] evtbuf;
210  evtbuf = nullptr;
211  delete evtmsg;
212  evtmsg = nullptr;
213 }
214 
216 {
217  // End time
218  gettimeofday(&m_tend, nullptr);
219  auto etime = (double)((m_tend.tv_sec - m_t0.tv_sec) * 1000000 +
220  (m_tend.tv_usec - m_t0.tv_usec));
221 
222  // Statistics
223  // Sigma^2 = Sum(X^2)/n - (Sum(X)/n)^2
224 
225  double flowmb = m_size / etime * 1000.0;
226  double evrate = (double)m_nevt / (etime / 1000.0);
227  double avesize = m_size / (double)m_nevt;
228  double avesize2 = m_size2 / (double)m_nevt;
229  double sigma2 = avesize2 - avesize * avesize;
230  double sigma = sqrt(sigma2);
231 
232  // printf ( "m_size = %f, m_size2 = %f, m_nevt = %d\n", m_size, m_size2, m_nevt );
233  // printf ( "avesize2 = %f, avesize = %f, avesize*avesize = %f\n", avesize2, avesize, avesize*avesize );
234  B2INFO("SeqRootInput : " << m_nevt << " events read with total bytes of " << m_size << " kB");
235  B2INFO("SeqRootInput : event rate = " << evrate << " (KHz)");
236  B2INFO("SeqRootInput : flow rate = " << flowmb << " (MB/s)");
237  B2INFO("SeqRootInput : event size = " << avesize << " +- " << sigma << " (kB)");
238 
239  B2INFO("SeqRootInput: endRun done.");
240 }
241 
242 
244 {
245  delete m_streamer;
246  delete m_file;
247  B2INFO("SeqRootInput: terminate called");
248 }
static Configuration & getInstance()
Get a reference to the instance which will be used when the Database is initialized.
void setInputGlobaltags(const std::vector< std::string > &inputTags)
To be called by input modules with the tags to be added from input files.
Stream/restore DataStore objects to/from EvtMessage.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
@ c_Persistent
Object is available during entire execution time.
Definition: DataStore.h:60
const std::vector< std::string > & getInputFilesOverride() const
Return overridden input file names, or an empty vector if none were set.
Definition: Environment.h:103
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
static const unsigned int c_MaxEventSize
maximal EvtMessage size, in bytes (200MB).
Definition: EvtMessage.h:63
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
A class to manage I/O for a chain of blocked files.
Definition: SeqFile.h:22
int status() const
Returns status after constructor call.
Definition: SeqFile.cc:135
int read(char *buf, int max)
Read a record from a file.
Definition: SeqFile.cc:174
int m_nevt
Total nr. of events in the file.
DataStoreStreamer * m_streamer
DataStoreStreamer.
virtual void initialize() override
Module functions to be called from main process.
virtual void event() override
This method is the core of the module.
struct timeval m_tend
time at end of current run.
virtual void endRun() override
This method is called if the current run ends.
virtual void terminate() override
This method is called at the end of the event processing.
std::string m_inputFileName
File name.
bool m_fileNameIsPattern
If true, the input filename will be interpreted as a boost::format pattern.
SeqFile * m_file
Blocked file handler.
virtual void beginRun() override
Module functions to be called from event process.
bool m_realData
Is the input real data?
std::vector< std::string > m_filelist
List of all file names to read.
int m_fileptr
Index of current file in m_filelist.
double m_size
total transferred data, in kB.
int m_nfile
Number of files to read (aka m_filelist.size())
double m_size2
sum of squares of data transferred in each event, in kB^2.
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
bool create(bool replace=false)
Create a default object in the data store.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
double sqrt(double a)
sqrt for double
Definition: beamHelpers.h:28
Abstract base class for different kinds of events.