Belle II Software  release-06-02-00
SeqRootInputModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/modules/rootio/SeqRootInputModule.h>
10 
11 #include <framework/core/Environment.h>
12 #include <framework/datastore/DataStore.h>
13 #include <framework/datastore/StoreObjPtr.h>
14 #include <framework/dataobjects/FileMetaData.h>
15 #include <framework/io/RootIOUtilities.h>
16 #include <framework/database/Configuration.h>
17 
18 #include <cmath>
19 #include <cstdio>
20 
21 using namespace std;
22 using namespace Belle2;
23 
24 //-----------------------------------------------------------------
25 // Register the Module
26 //-----------------------------------------------------------------
27 REG_MODULE(SeqRootInput)
28 
29 //-----------------------------------------------------------------
30 // Implementation
31 //-----------------------------------------------------------------
32 
34 {
35  //Set module properties
36  setDescription("Read .sroot files produced by SeqRootOutput.");
37  setPropertyFlags(c_Input);
38 
39  //Parameter definition
40  addParam("inputFileName" , m_inputFileName,
41  "Input file name. Can also be a gzip-compressed file (with suffix .gz). "
42  "Parameter can be overridden using the -i argument to basf2.",
43  string(""));
44  vector<string> empty;
45  addParam("inputFileNames", m_filelist, "List of input files", empty);
46  addParam("fileNameIsPattern", m_fileNameIsPattern, "If true interpret the output "
47  "filename as a boost::format pattern instead of the standard where "
48  "subsequent files are named .sroot-N. For example 'myfile-f%08d.sroot'",
49  false);
50  addParam("declareRealData", m_realData, "Declare the input to be real, not generated data", false);
51 }
52 
// Defaulted destructor: m_streamer and m_file are deleted in terminate(), so nothing is released here.
53 SeqRootInputModule::~SeqRootInputModule() = default;
54 
55 void SeqRootInputModule::initialize()
56 {
57  // Specify input file(list)
58  if (!m_inputFileName.empty() && !m_filelist.empty()) {
59  B2FATAL("Cannot specify both 'inputFileName' and 'inputFileNames'");
60  }
61  const std::vector<std::string>& inputFiles = Environment::Instance().getInputFilesOverride();
62  if (!inputFiles.empty()) { // Override parameter specification
63  if (inputFiles.size() > 1) {
64  m_filelist = inputFiles;
65  }
66  m_inputFileName = inputFiles[0];
67  m_nfile = m_filelist.size();
68  } else if (m_filelist.size() > 0) {
69  m_nfile = m_filelist.size();
70  m_inputFileName = m_filelist[0];
71  } else {
72  m_nfile = 1;
73  }
74 
75  // Initialize DataStoreStreamer
76  m_streamer = new DataStoreStreamer();
77 
78  // Read the first event in SeqRoot file and restore in DataStore.
79  // This is necessary to create object tables before TTree initialization
80  // if used together with TTree based output (RootOutput module).
81 
82  EvtMessage* evtmsg = nullptr;
83  // Open input file
84  m_file = new SeqFile(m_inputFileName.c_str(), "r", nullptr, 0, m_fileNameIsPattern);
85  if (m_file->status() <= 0)
86  B2FATAL("SeqRootInput : Error in opening input file : " << m_inputFileName);
87 
88  B2INFO("SeqRootInput : Open " << m_inputFileName);
89 
90  //Read StreamerInfo and the first event
91  int info_cnt = 0;
92  while (true) {
93  auto* evtbuf = new char[EvtMessage::c_MaxEventSize];
94  int size = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
95  if (size > 0) {
96  evtmsg = new EvtMessage(evtbuf);
97  m_streamer->restoreDataStore(evtmsg);
98  if (evtmsg->type() == MSG_STREAMERINFO) {
99  // StreamerInfo was read
100  B2INFO("Reading StreamerInfo");
101  if (info_cnt != 0) B2FATAL("SeqRootInput : Reading StreamerInfos twice");
102  info_cnt++;
103  } else {
104  // first event was read
105  delete[] evtbuf;
106  delete evtmsg;
107  break;
108  }
109  delete[] evtbuf;
110  delete evtmsg;
111 
112  } else {
113  B2FATAL("SeqRootInput : Error in reading first event");
114  }
115  }
116  m_fileptr = 0;
117 
118  if (m_realData) {
119  StoreObjPtr<FileMetaData> fileMetaData("", DataStore::c_Persistent);
120  fileMetaData.registerInDataStore();
121  fileMetaData.create();
122  fileMetaData->declareRealData();
123  }
124  // make sure global tag replay is disabled and users have to specify a globaltag.
125  // We don't have input file metadata so this is all we can do.
126  Conditions::Configuration::getInstance().setInputGlobaltags({});
127 }
128 
129 
130 void SeqRootInputModule::beginRun()
131 {
132  gettimeofday(&m_t0, nullptr);
133  m_size = 0.0;
134  m_size2 = 0.0;
135  m_nevt = 0;
136  B2INFO("SeqRootInput: beginRun called.");
137 }
138 
139 
140 void SeqRootInputModule::event()
141 {
142  // on first call: first event is already loaded. This is actually called once
143  // before the first beginRun() since we are the module setting the EventInfo
144  // so don't get confused by the m_nevt=0 in beginRun()
145  if (++m_nevt == 0) return;
146 
147  // Get a SeqRoot record from the file
148  auto* evtbuf = new char[EvtMessage::c_MaxEventSize];
149  EvtMessage* evtmsg = nullptr;
150  int size = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
151  if (size < 0) {
152  B2ERROR("SeqRootInput : file read error");
153  delete m_file;
154  m_file = nullptr;
155  delete[] evtbuf;
156  evtbuf = nullptr;
157  return;
158  } else if (size == 0) {
159  B2INFO("SeqRootInput : EOF detected");
160  delete m_file;
161  m_file = nullptr;
162  m_fileptr++;
163  if (m_fileptr >= m_nfile) {
164  delete[] evtbuf;
165  evtbuf = nullptr;
166  return;
167  }
168  printf("fileptr = %d ( of %d )\n", m_fileptr, m_nfile);
169  fflush(stdout);
170  m_inputFileName = m_filelist[m_fileptr];
171  m_file = new SeqFile(m_inputFileName, "r");
172  if (m_file->status() <= 0)
173  B2FATAL("SeqRootInput : Error in opening input file : " << m_inputFileName);
174  B2INFO("SeqRootInput : Open " << m_inputFileName);
175  evtmsg = new EvtMessage(evtbuf);
176  // Skip the first record (StreamerInfo)
177  int is = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
178  if (is <= 0) {
179  B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
180  }
181  // Read next record
182  is = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
183  if (is <= 0) {
184  B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
185  }
186  } else {
187  // printf("SeqRootInput : read = %d\n", size);
188  evtmsg = new EvtMessage(evtbuf);
189  }
190 
191  // Statistics
192  double dsize = (double)size / 1000.0;
193  m_size += dsize;
194  m_size2 += dsize * dsize;
195 
196  if (evtmsg->type() == MSG_STREAMERINFO) {
197  B2WARNING("SeqRootInput : StreamerInfo is found in the middle of *.sroot-* files. Skip record");
198  int is = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
199  if (is <= 0) {
200  B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
201  }
202  evtmsg = new EvtMessage(evtbuf);
203  }
204 
205  // Restore objects in DataStore
206  m_streamer->restoreDataStore(evtmsg);
207 
208  // Delete buffers
209  delete[] evtbuf;
210  evtbuf = nullptr;
211  delete evtmsg;
212  evtmsg = nullptr;
213 }
214 
215 void SeqRootInputModule::endRun()
216 {
217  // End time
218  gettimeofday(&m_tend, nullptr);
219  auto etime = (double)((m_tend.tv_sec - m_t0.tv_sec) * 1000000 +
220  (m_tend.tv_usec - m_t0.tv_usec));
221 
222  // Statistics
223  // Sigma^2 = Sum(X^2)/n - (Sum(X)/n)^2
224 
225  double flowmb = m_size / etime * 1000.0;
226  double evrate = (double)m_nevt / (etime / 1000.0);
227  double avesize = m_size / (double)m_nevt;
228  double avesize2 = m_size2 / (double)m_nevt;
229  double sigma2 = avesize2 - avesize * avesize;
230  double sigma = sqrt(sigma2);
231 
232  // printf ( "m_size = %f, m_size2 = %f, m_nevt = %d\n", m_size, m_size2, m_nevt );
233  // printf ( "avesize2 = %f, avesize = %f, avesize*avesize = %f\n", avesize2, avesize, avesize*avesize );
234  B2INFO("SeqRootInput : " << m_nevt << " events read with total bytes of " << m_size << " kB");
235  B2INFO("SeqRootInput : event rate = " << evrate << " (KHz)");
236  B2INFO("SeqRootInput : flow rate = " << flowmb << " (MB/s)");
237  B2INFO("SeqRootInput : event size = " << avesize << " +- " << sigma << " (kB)");
238 
239  B2INFO("SeqRootInput: endRun done.");
240 }
241 
242 
243 void SeqRootInputModule::terminate()
244 {
245  delete m_streamer;
246  delete m_file;
247  B2INFO("SeqRootInput: terminate called");
248 }
Stream/restore DataStore objects to/from EvtMessage.
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
Base class for Modules.
Definition: Module.h:72
A class to manage I/O for a chain of blocked files.
Definition: SeqFile.h:22
Module to read files produced by SeqRootOutputModule.
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
bool create(bool replace=false)
Create a default object in the data store.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:95
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
@ c_Input
Input Process.
Abstract base class for different kinds of events.