Belle II Software development
SeqRootInputModule.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/modules/rootio/SeqRootInputModule.h>
10
11#include <framework/core/Environment.h>
12#include <framework/datastore/DataStore.h>
13#include <framework/datastore/StoreObjPtr.h>
14#include <framework/dataobjects/FileMetaData.h>
15#include <framework/database/Configuration.h>
16
17#include <cmath>
18#include <cstdio>
19
20using namespace std;
21using namespace Belle2;
22
23//-----------------------------------------------------------------
24// Register the Module
25//-----------------------------------------------------------------
26REG_MODULE(SeqRootInput);
27
28//-----------------------------------------------------------------
29// Implementation
30//-----------------------------------------------------------------
31
33{
34 //Set module properties
35 setDescription("Read .sroot files produced by SeqRootOutput.");
37
38 //Parameter definition
39 addParam("inputFileName", m_inputFileName,
40 "Input file name. Can also be a gzip-compressed file (with suffix .gz). "
41 "Parameter can be overridden using the -i argument to basf2.",
42 string(""));
43 vector<string> empty;
44 addParam("inputFileNames", m_filelist, "List of input files", empty);
45 addParam("fileNameIsPattern", m_fileNameIsPattern, "If true interpret the output "
46 "filename as a boost::format pattern instead of the standard where "
47 "subsequent files are named .sroot-N. For example 'myfile-f%08d.sroot'",
48 false);
49 addParam("declareRealData", m_realData, "Declare the input to be real, not generated data", false);
50}
51
52SeqRootInputModule::~SeqRootInputModule() = default;
53
55{
56 // Specify input file(list)
57 if (!m_inputFileName.empty() && !m_filelist.empty()) {
58 B2FATAL("Cannot specify both 'inputFileName' and 'inputFileNames'");
59 }
60 const std::vector<std::string>& inputFiles = Environment::Instance().getInputFilesOverride();
61 if (!inputFiles.empty()) { // Override parameter specification
62 if (inputFiles.size() > 1) {
63 m_filelist = inputFiles;
64 }
65 m_inputFileName = inputFiles[0];
66 m_nfile = m_filelist.size();
67 } else if (m_filelist.size() > 0) {
68 m_nfile = m_filelist.size();
70 } else {
71 m_nfile = 1;
72 }
73
74 // Initialize DataStoreStreamer
76
77 // Read the first event in SeqRoot file and restore in DataStore.
78 // This is necessary to create object tables before TTree initialization
79 // if used together with TTree based output (RootOutput module).
80
81 EvtMessage* evtmsg = nullptr;
82 // Open input file
83 m_file = new SeqFile(m_inputFileName.c_str(), "r", nullptr, 0, m_fileNameIsPattern);
84 if (m_file->status() <= 0)
85 B2FATAL("SeqRootInput : Error in opening input file : " << m_inputFileName);
86
87 B2INFO("SeqRootInput : Open " << m_inputFileName);
88
89 //Read StreamerInfo and the first event
90 int info_cnt = 0;
91 while (true) {
92 auto* evtbuf = new char[EvtMessage::c_MaxEventSize];
93 int size = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
94 if (size > 0) {
95 evtmsg = new EvtMessage(evtbuf);
97 if (evtmsg->type() == MSG_STREAMERINFO) {
98 // StreamerInfo was read
99 B2INFO("Reading StreamerInfo");
100 if (info_cnt != 0) B2FATAL("SeqRootInput : Reading StreamerInfos twice");
101 info_cnt++;
102 } else {
103 // first event was read
104 delete[] evtbuf;
105 delete evtmsg;
106 break;
107 }
108 delete[] evtbuf;
109 delete evtmsg;
110
111 } else {
112 B2FATAL("SeqRootInput : Error in reading first event");
113 }
114 }
115 m_fileptr = 0;
116
117 if (m_realData) {
119 fileMetaData.registerInDataStore();
120 fileMetaData.create();
121 fileMetaData->declareRealData();
122 }
123 // make sure global tag replay is disabled and users have to specify a globaltag.
124 // We don't have input file metadata so this is all we can do.
126}
127
128
130{
131 gettimeofday(&m_t0, nullptr);
132 m_size = 0.0;
133 m_size2 = 0.0;
134 m_nevt = 0;
135 B2INFO("SeqRootInput: beginRun called.");
136}
137
138
140{
141 // on first call: first event is already loaded. This is actually called once
142 // before the first beginRun() since we are the module setting the EventInfo
143 // so don't get confused by the m_nevt=0 in beginRun()
144 if (++m_nevt == 0) return;
145
146 // Get a SeqRoot record from the file
147 auto* evtbuf = new char[EvtMessage::c_MaxEventSize];
148 EvtMessage* evtmsg = nullptr;
149 int size = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
150 if (size < 0) {
151 B2ERROR("SeqRootInput : file read error");
152 delete m_file;
153 m_file = nullptr;
154 delete[] evtbuf;
155 evtbuf = nullptr;
156 return;
157 } else if (size == 0) {
158 B2INFO("SeqRootInput : EOF detected");
159 delete m_file;
160 m_file = nullptr;
161 m_fileptr++;
162 if (m_fileptr >= m_nfile) {
163 delete[] evtbuf;
164 evtbuf = nullptr;
165 return;
166 }
167 printf("fileptr = %d ( of %d )\n", m_fileptr, m_nfile);
168 fflush(stdout);
170 m_file = new SeqFile(m_inputFileName, "r");
171 if (m_file->status() <= 0)
172 B2FATAL("SeqRootInput : Error in opening input file : " << m_inputFileName);
173 B2INFO("SeqRootInput : Open " << m_inputFileName);
174 evtmsg = new EvtMessage(evtbuf);
175 // Skip the first record (StreamerInfo)
176 int is = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
177 if (is <= 0) {
178 B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
179 }
180 // Read next record
182 if (is <= 0) {
183 B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
184 }
185 } else {
186 // printf("SeqRootInput : read = %d\n", size);
187 evtmsg = new EvtMessage(evtbuf);
188 }
189
190 // Statistics
191 double dsize = (double)size / 1000.0;
192 m_size += dsize;
193 m_size2 += dsize * dsize;
194
195 if (evtmsg->type() == MSG_STREAMERINFO) {
196 B2WARNING("SeqRootInput : StreamerInfo is found in the middle of *.sroot-* files. Skip record");
197 int is = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
198 if (is <= 0) {
199 B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
200 }
201 evtmsg = new EvtMessage(evtbuf);
202 }
203
204 // Restore objects in DataStore
206
207 // Delete buffers
208 delete[] evtbuf;
209 evtbuf = nullptr;
210 delete evtmsg;
211 evtmsg = nullptr;
212}
213
215{
216 // End time
217 gettimeofday(&m_tend, nullptr);
218 auto etime = (double)((m_tend.tv_sec - m_t0.tv_sec) * 1000000 +
219 (m_tend.tv_usec - m_t0.tv_usec));
220
221 // Statistics
222 // Sigma^2 = Sum(X^2)/n - (Sum(X)/n)^2
223
224 double flowmb = m_size / etime * 1000.0;
225 double evrate = (double)m_nevt / (etime / 1000.0);
226 double avesize = m_size / (double)m_nevt;
227 double avesize2 = m_size2 / (double)m_nevt;
228 double sigma2 = avesize2 - avesize * avesize;
229 double sigma = sqrt(sigma2);
230
231 // printf ( "m_size = %f, m_size2 = %f, m_nevt = %d\n", m_size, m_size2, m_nevt );
232 // printf ( "avesize2 = %f, avesize = %f, avesize*avesize = %f\n", avesize2, avesize, avesize*avesize );
233 B2INFO("SeqRootInput : " << m_nevt << " events read with total bytes of " << m_size << " kB");
234 B2INFO("SeqRootInput : event rate = " << evrate << " (KHz)");
235 B2INFO("SeqRootInput : flow rate = " << flowmb << " (MB/s)");
236 B2INFO("SeqRootInput : event size = " << avesize << " +- " << sigma << " (kB)");
237
238 B2INFO("SeqRootInput: endRun done.");
239}
240
241
243{
244 delete m_streamer;
245 delete m_file;
246 B2INFO("SeqRootInput: terminate called");
247}
static Configuration & getInstance()
Get a reference to the instance which will be used when the Database is initialized.
void setInputGlobaltags(const std::vector< std::string > &inputTags)
To be called by input modules with the tags to be added from input files.
Stream/restore DataStore objects to/from EvtMessage.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
@ c_Persistent
Object is available during entire execution time.
Definition: DataStore.h:60
const std::vector< std::string > & getInputFilesOverride() const
Return overridden input file names, or empty vector if none were set.
Definition: Environment.h:116
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
static const unsigned int c_MaxEventSize
maximal EvtMessage size, in bytes (200MB).
Definition: EvtMessage.h:63
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
A class to manage I/O for a chain of blocked files.
Definition: SeqFile.h:22
int status() const
Returns status after constructor call.
Definition: SeqFile.cc:135
int read(char *buf, int max)
Read a record from a file.
Definition: SeqFile.cc:174
int m_nevt
Total number of events in the file.
DataStoreStreamer * m_streamer
DataStoreStreamer.
virtual void initialize() override
Module functions to be called from main process.
virtual void event() override
This method is the core of the module.
struct timeval m_tend
time at end of current run.
virtual void endRun() override
This method is called if the current run ends.
virtual void terminate() override
This method is called at the end of the event processing.
std::string m_inputFileName
File name.
bool m_fileNameIsPattern
If true, the input file name will be interpreted as a boost::format pattern.
SeqFile * m_file
Blocked file handler.
virtual void beginRun() override
Module functions to be called from event process.
bool m_realData
Is the input real data?
std::vector< std::string > m_filelist
List of all file names to read.
int m_fileptr
Index of current file in m_filelist.
double m_size
total transferred data, in kB.
SeqRootInputModule()
Constructor / Destructor.
int m_nfile
Number of files to read (aka m_filelist.size())
double m_size2
sum of squares of data transferred in each event, in kB^2.
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
bool create(bool replace=false)
Create a default object in the data store.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:95
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:559
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:649
double sqrt(double a)
sqrt for double
Definition: beamHelpers.h:28
Abstract base class for different kinds of events.
STL namespace.