Belle II Software  release-05-02-19
SeqRootInputModule.cc
1 //+
2 // Author : Ryosuke Itoh, IPNS, KEK
3 // Date : 13 - Aug - 2010
4 // 6 - Sep - 2012 modified to use DataStoreStreamer, clean up
5 //-
6 
7 #include <framework/modules/rootio/SeqRootInputModule.h>
8 
9 #include <framework/core/Environment.h>
10 #include <framework/datastore/DataStore.h>
11 #include <framework/datastore/StoreObjPtr.h>
12 #include <framework/dataobjects/FileMetaData.h>
13 #include <framework/io/RootIOUtilities.h>
14 #include <framework/database/Configuration.h>
15 
16 #include <cmath>
17 #include <cstdio>
18 
19 using namespace std;
20 using namespace Belle2;
21 
22 //-----------------------------------------------------------------
23 // Register the Module
24 //-----------------------------------------------------------------
25 REG_MODULE(SeqRootInput)
26 
27 //-----------------------------------------------------------------
28 // Implementation
29 //-----------------------------------------------------------------
30 
32 {
33  //Set module properties
34  setDescription("Read .sroot files produced by SeqRootOutput.");
35  setPropertyFlags(c_Input);
36 
37  //Parameter definition
38  addParam("inputFileName" , m_inputFileName,
39  "Input file name. Can also be a gzip-compressed file (with suffix .gz). "
40  "Parameter can be overridden using the -i argument to basf2.",
41  string(""));
42  vector<string> empty;
43  addParam("inputFileNames", m_filelist, "List of input files", empty);
44  addParam("fileNameIsPattern", m_fileNameIsPattern, "If true interpret the output "
45  "filename as a boost::format pattern instead of the standard where "
46  "subsequent files are named .sroot-N. For example 'myfile-f%08d.sroot'",
47  false);
48  addParam("declareRealData", m_realData, "Declare the input to be real, not generated data", false);
49 }
50 
// Defaulted destructor: it performs no explicit cleanup itself. The objects held in
// m_streamer and m_file are deleted in terminate().
51 SeqRootInputModule::~SeqRootInputModule() = default;
52 
53 void SeqRootInputModule::initialize()
54 {
55  // Specify input file(list)
56  if (!m_inputFileName.empty() && !m_filelist.empty()) {
57  B2FATAL("Cannot specify both 'inputFileName' and 'inputFileNames'");
58  }
59  const std::vector<std::string>& inputFiles = Environment::Instance().getInputFilesOverride();
60  if (!inputFiles.empty()) { // Override parameter specification
61  if (inputFiles.size() > 1) {
62  m_filelist = inputFiles;
63  }
64  m_inputFileName = inputFiles[0];
65  m_nfile = m_filelist.size();
66  } else if (m_filelist.size() > 0) {
67  m_nfile = m_filelist.size();
68  m_inputFileName = m_filelist[0];
69  } else {
70  m_nfile = 1;
71  }
72 
73  // Initialize DataStoreStreamer
74  m_streamer = new DataStoreStreamer();
75 
76  // Read the first event in SeqRoot file and restore in DataStore.
77  // This is necessary to create object tables before TTree initialization
78  // if used together with TTree based output (RootOutput module).
79 
80  EvtMessage* evtmsg = nullptr;
81  // Open input file
82  m_file = new SeqFile(m_inputFileName.c_str(), "r", nullptr, 0, m_fileNameIsPattern);
83  if (m_file->status() <= 0)
84  B2FATAL("SeqRootInput : Error in opening input file : " << m_inputFileName);
85 
86  B2INFO("SeqRootInput : Open " << m_inputFileName);
87 
88  //Read StreamerInfo and the first event
89  int info_cnt = 0;
90  while (true) {
91  auto* evtbuf = new char[EvtMessage::c_MaxEventSize];
92  int size = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
93  if (size > 0) {
94  evtmsg = new EvtMessage(evtbuf);
95  m_streamer->restoreDataStore(evtmsg);
96  if (evtmsg->type() == MSG_STREAMERINFO) {
97  // StreamerInfo was read
98  B2INFO("Reading StreamerInfo");
99  if (info_cnt != 0) B2FATAL("SeqRootInput : Reading StreamerInfos twice");
100  info_cnt++;
101  } else {
102  // first event was read
103  delete[] evtbuf;
104  delete evtmsg;
105  break;
106  }
107  delete[] evtbuf;
108  delete evtmsg;
109 
110  } else {
111  B2FATAL("SeqRootInput : Error in reading first event");
112  }
113  }
114  m_fileptr = 0;
115 
116  if (m_realData) {
117  StoreObjPtr<FileMetaData> fileMetaData("", DataStore::c_Persistent);
118  fileMetaData.registerInDataStore();
119  fileMetaData.create();
120  fileMetaData->declareRealData();
121  }
122  // make sure global tag replay is disabled and users have to specify a globaltag.
123  // We don't have input file metadata so this is all we can do.
124  Conditions::Configuration::getInstance().setInputGlobaltags({});
125 }
126 
127 
128 void SeqRootInputModule::beginRun()
129 {
130  gettimeofday(&m_t0, nullptr);
131  m_size = 0.0;
132  m_size2 = 0.0;
133  m_nevt = 0;
134  B2INFO("SeqRootInput: beginRun called.");
135 }
136 
137 
138 void SeqRootInputModule::event()
139 {
140  // on first call: first event is already loaded. This is actually called once
141  // before the first beginRun() since we are the module setting the EventInfo
142  // so don't get confused by the m_nevt=0 in beginRun()
143  if (++m_nevt == 0) return;
144 
145  // Get a SeqRoot record from the file
146  auto* evtbuf = new char[EvtMessage::c_MaxEventSize];
147  EvtMessage* evtmsg = nullptr;
148  int size = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
149  if (size < 0) {
150  B2ERROR("SeqRootInput : file read error");
151  delete m_file;
152  m_file = nullptr;
153  delete[] evtbuf;
154  evtbuf = nullptr;
155  return;
156  } else if (size == 0) {
157  B2INFO("SeqRootInput : EOF detected");
158  delete m_file;
159  m_file = nullptr;
160  m_fileptr++;
161  if (m_fileptr >= m_nfile) {
162  delete[] evtbuf;
163  evtbuf = nullptr;
164  return;
165  }
166  printf("fileptr = %d ( of %d )\n", m_fileptr, m_nfile);
167  fflush(stdout);
168  m_inputFileName = m_filelist[m_fileptr];
169  m_file = new SeqFile(m_inputFileName, "r");
170  if (m_file->status() <= 0)
171  B2FATAL("SeqRootInput : Error in opening input file : " << m_inputFileName);
172  B2INFO("SeqRootInput : Open " << m_inputFileName);
173  evtmsg = new EvtMessage(evtbuf);
174  // Skip the first record (StreamerInfo)
175  int is = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
176  if (is <= 0) {
177  B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
178  }
179  // Read next record
180  is = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
181  if (is <= 0) {
182  B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
183  }
184  } else {
185  // printf("SeqRootInput : read = %d\n", size);
186  evtmsg = new EvtMessage(evtbuf);
187  }
188 
189  // Statistics
190  double dsize = (double)size / 1000.0;
191  m_size += dsize;
192  m_size2 += dsize * dsize;
193 
194  if (evtmsg->type() == MSG_STREAMERINFO) {
195  B2WARNING("SeqRootInput : StreamerInfo is found in the middle of *.sroot-* files. Skip record");
196  int is = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
197  if (is <= 0) {
198  B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
199  }
200  evtmsg = new EvtMessage(evtbuf);
201  }
202 
203  // Restore objects in DataStore
204  m_streamer->restoreDataStore(evtmsg);
205 
206  // Delete buffers
207  delete[] evtbuf;
208  evtbuf = nullptr;
209  delete evtmsg;
210  evtmsg = nullptr;
211 }
212 
213 void SeqRootInputModule::endRun()
214 {
215  // End time
216  gettimeofday(&m_tend, nullptr);
217  auto etime = (double)((m_tend.tv_sec - m_t0.tv_sec) * 1000000 +
218  (m_tend.tv_usec - m_t0.tv_usec));
219 
220  // Statistics
221  // Sigma^2 = Sum(X^2)/n - (Sum(X)/n)^2
222 
223  double flowmb = m_size / etime * 1000.0;
224  double evrate = (double)m_nevt / (etime / 1000.0);
225  double avesize = m_size / (double)m_nevt;
226  double avesize2 = m_size2 / (double)m_nevt;
227  double sigma2 = avesize2 - avesize * avesize;
228  double sigma = sqrt(sigma2);
229 
230  // printf ( "m_size = %f, m_size2 = %f, m_nevt = %d\n", m_size, m_size2, m_nevt );
231  // printf ( "avesize2 = %f, avesize = %f, avesize*avesize = %f\n", avesize2, avesize, avesize*avesize );
232  B2INFO("SeqRootInput : " << m_nevt << " events read with total bytes of " << m_size << " kB");
233  B2INFO("SeqRootInput : event rate = " << evrate << " (KHz)");
234  B2INFO("SeqRootInput : flow rate = " << flowmb << " (MB/s)");
235  B2INFO("SeqRootInput : event size = " << avesize << " +- " << sigma << " (kB)");
236 
237  B2INFO("SeqRootInput: endRun done.");
238 }
239 
240 
241 void SeqRootInputModule::terminate()
242 {
243  delete m_streamer;
244  delete m_file;
245  B2INFO("SeqRootInput: terminate called");
246 }
Belle2::EvtMessage::type
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:115
REG_MODULE
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:652
Belle2::EvtMessage
Class to manage streamed object.
Definition: EvtMessage.h:60
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2::ProcType::c_Input
@ c_Input
Input Process.
Belle2::SeqRootInputModule
Module to read files produced by SeqRootOutputModule.
Definition: SeqRootInputModule.h:22
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::StoreObjPtr
Type-safe access to single objects in the data store.
Definition: ParticleList.h:33
Belle2::DataStoreStreamer
Stream/restore DataStore objects to/from EvtMessage.
Definition: DataStoreStreamer.h:33
Belle2::SeqFile
A class to manage I/O for a chain of blocked files.
Definition: SeqFile.h:22