Belle II Software development
SeqRootInputModule.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/modules/rootio/SeqRootInputModule.h>
10
11#include <framework/core/Environment.h>
12#include <framework/datastore/DataStore.h>
13#include <framework/datastore/StoreObjPtr.h>
14#include <framework/dataobjects/FileMetaData.h>
15#include <framework/io/RootIOUtilities.h>
16#include <framework/database/Configuration.h>
17
18#include <cmath>
19#include <cstdio>
20
21using namespace std;
22using namespace Belle2;
23
24//-----------------------------------------------------------------
25// Register the Module
26//-----------------------------------------------------------------
// Register this class with the basf2 module framework under the name
// "SeqRootInput" (REG_MODULE takes the module name without the 'Module' suffix).
27REG_MODULE(SeqRootInput);
28
29//-----------------------------------------------------------------
30// Implementation
31//-----------------------------------------------------------------
32
// SeqRootInputModule constructor (its signature, listing line 33, is missing
// from this extract): sets the module description and declares the steering
// parameters inputFileName, inputFileNames, fileNameIsPattern and
// declareRealData.
34{
35 //Set module properties
36 setDescription("Read .sroot files produced by SeqRootOutput.");
// NOTE(review): listing line 37 is missing from this extract; the symbol index
// of this page lists setPropertyFlags / c_Input, which presumably appear
// there - confirm against the real source file.
38
39 //Parameter definition
// inputFileName: single file to read (default ""). It can be replaced by the
// basf2 -i override, which initialize() checks.
40 addParam("inputFileName", m_inputFileName,
41 "Input file name. Can also be a gzip-compressed file (with suffix .gz). "
42 "Parameter can be overridden using the -i argument to basf2.",
43 string(""));
// inputFileNames: list of files to read (default empty). initialize() rejects
// setting both inputFileName and inputFileNames.
44 vector<string> empty;
45 addParam("inputFileNames", m_filelist, "List of input files", empty);
// fileNameIsPattern: forwarded to SeqFile when initialize() opens the first
// input file.
// NOTE(review): this module only reads files, so "output" in the description
// string below most likely should say "input" (left unchanged here because it
// is a runtime string).
46 addParam("fileNameIsPattern", m_fileNameIsPattern, "If true interpret the output "
47 "filename as a boost::format pattern instead of the standard where "
48 "subsequent files are named .sroot-N. For example 'myfile-f%08d.sroot'",
49 false);
// declareRealData: if true, initialize() creates a FileMetaData object and
// calls declareRealData() on it.
50 addParam("declareRealData", m_realData, "Declare the input to be real, not generated data", false);
51}
52
53SeqRootInputModule::~SeqRootInputModule() = default;
54
// initialize(): decides which file(s) to read (steering parameters or the
// basf2 -i override), opens the first file, and reads records until the first
// non-StreamerInfo record. If declareRealData is set, it also flags the input
// as real data.
// NOTE(review): this extract is missing listing lines 55 (the signature), 70,
// 76, 97, 119 and 126; the comments below that refer to them are hedged.
56{
57 // Specify input file(list)
58 if (!m_inputFileName.empty() && !m_filelist.empty()) {
59 B2FATAL("Cannot specify both 'inputFileName' and 'inputFileNames'");
60 }
61 const std::vector<std::string>& inputFiles = Environment::Instance().getInputFilesOverride();
// NOTE(review): with exactly one -i file, m_filelist is NOT replaced, so
// m_nfile becomes the size of the inputFileNames parameter list: 0 if unset,
// or N if set. In the N case, event() will continue past the first EOF into
// the remaining parameter files instead of stopping. Assigning
// m_filelist = inputFiles unconditionally would avoid this.
62 if (!inputFiles.empty()) { // Override parameter specification
63 if (inputFiles.size() > 1) {
64 m_filelist = inputFiles;
65 }
66 m_inputFileName = inputFiles[0];
67 m_nfile = m_filelist.size();
68 } else if (m_filelist.size() > 0) {
69 m_nfile = m_filelist.size();
// NOTE(review): listing line 70 is missing here. It presumably sets
// m_inputFileName from m_filelist[0], since the file opened below is
// m_inputFileName - confirm.
71 } else {
72 m_nfile = 1;
73 }
74
75 // Initialize DataStoreStreamer
// NOTE(review): listing line 76 is missing here. It presumably creates
// m_streamer, which terminate() deletes - confirm.
77
78 // Read the first event in SeqRoot file and restore in DataStore.
79 // This is necessary to create object tables before TTree initialization
80 // if used together with TTree based output (RootOutput module).
81
82 EvtMessage* evtmsg = nullptr;
83 // Open input file
84 m_file = new SeqFile(m_inputFileName.c_str(), "r", nullptr, 0, m_fileNameIsPattern);
85 if (m_file->status() <= 0)
86 B2FATAL("SeqRootInput : Error in opening input file : " << m_inputFileName);
87
88 B2INFO("SeqRootInput : Open " << m_inputFileName);
89
90 //Read StreamerInfo and the first event
// Loop until the first non-StreamerInfo record. A second StreamerInfo record,
// or a failed or empty read before any event, is fatal.
91 int info_cnt = 0;
92 while (true) {
// NOTE(review): a new c_MaxEventSize buffer (200 MB according to the
// EvtMessage docs) is allocated on every iteration. A single buffer hoisted
// out of the loop (e.g. std::vector<char>) would avoid repeated large
// allocations.
93 auto* evtbuf = new char[EvtMessage::c_MaxEventSize];
94 int size = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
95 if (size > 0) {
96 evtmsg = new EvtMessage(evtbuf);
// NOTE(review): listing line 97 is missing here. Since event() says the first
// event is "already loaded", it presumably restores this record into the
// DataStore via m_streamer - confirm.
98 if (evtmsg->type() == MSG_STREAMERINFO) {
99 // StreamerInfo was read
100 B2INFO("Reading StreamerInfo");
101 if (info_cnt != 0) B2FATAL("SeqRootInput : Reading StreamerInfos twice");
102 info_cnt++;
103 } else {
104 // first event was read
105 delete[] evtbuf;
106 delete evtmsg;
107 break;
108 }
109 delete[] evtbuf;
110 delete evtmsg;
111
112 } else {
113 B2FATAL("SeqRootInput : Error in reading first event");
114 }
115 }
// The first file is at index 0 of the file list; event() advances this on EOF.
116 m_fileptr = 0;
117
// Listing line 119, the declaration of the FileMetaData object, is missing
// from this extract; it is registered, created and flagged as real data here.
118 if (m_realData) {
120 fileMetaData.registerInDataStore();
121 fileMetaData.create();
122 fileMetaData->declareRealData();
123 }
124 // make sure global tag replay is disabled and users have to specify a globaltag.
125 // We don't have input file metadata so this is all we can do.
// NOTE(review): listing line 126 is missing here. It is presumably the
// Configuration::getInstance().setInputGlobaltags(...) call that the comment
// above describes - confirm.
127}
128
129
131{
132 gettimeofday(&m_t0, nullptr);
133 m_size = 0.0;
134 m_size2 = 0.0;
135 m_nevt = 0;
136 B2INFO("SeqRootInput: beginRun called.");
137}
138
139
// event(): reads the next record from the current SeqFile and moves on to the
// next file at EOF. It then updates the size statistics and, per the comment
// at listing line 205, restores the record into the DataStore.
// NOTE(review): listing lines 140 (the signature), 170, 182 and 206 are missing
// from this extract; the comments below that refer to them are hedged.
141{
142 // on first call: first event is already loaded. This is actually called once
143 // before the first beginRun() since we are the module setting the EventInfo
144 // so don't get confused by the m_nevt=0 in beginRun()
// m_nevt is presumably initialised to -1 in the header (not visible here), so
// the pre-beginRun call returns at this point - confirm.
145 if (++m_nevt == 0) return;
146
147 // Get a SeqRoot record from the file
// NOTE(review): a c_MaxEventSize buffer is allocated on every call. A reusable
// member buffer would avoid one very large allocation per event.
148 auto* evtbuf = new char[EvtMessage::c_MaxEventSize];
149 EvtMessage* evtmsg = nullptr;
150 int size = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
// NOTE(review): the read error and final-EOF paths below leave m_file as
// nullptr. If event() were called again, the read() above would dereference
// it. Presumably processing stops because nothing was restored - confirm.
151 if (size < 0) {
152 B2ERROR("SeqRootInput : file read error");
153 delete m_file;
154 m_file = nullptr;
155 delete[] evtbuf;
156 evtbuf = nullptr;
157 return;
158 } else if (size == 0) {
159 B2INFO("SeqRootInput : EOF detected");
160 delete m_file;
161 m_file = nullptr;
162 m_fileptr++;
163 if (m_fileptr >= m_nfile) {
164 delete[] evtbuf;
165 evtbuf = nullptr;
166 return;
167 }
// NOTE(review): printf/fflush bypass the B2INFO logging used everywhere else.
168 printf("fileptr = %d ( of %d )\n", m_fileptr, m_nfile);
169 fflush(stdout);
// NOTE(review): listing line 170 is missing here. It presumably selects the
// next file (m_inputFileName = m_filelist[m_fileptr]); otherwise the same file
// would be reopened - confirm. Unlike initialize(), this SeqFile is opened
// without the m_fileNameIsPattern flag.
171 m_file = new SeqFile(m_inputFileName, "r");
172 if (m_file->status() <= 0)
173 B2FATAL("SeqRootInput : Error in opening input file : " << m_inputFileName);
174 B2INFO("SeqRootInput : Open " << m_inputFileName);
// evtmsg is created before evtbuf is refilled below. This presumably relies on
// EvtMessage referring to evtbuf rather than copying it - confirm.
175 evtmsg = new EvtMessage(evtbuf);
176 // Skip the first record (StreamerInfo)
177 int is = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
178 if (is <= 0) {
179 B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
180 }
181 // Read next record
// NOTE(review): listing line 182 is missing here. It is presumably the read
// of the first data record into evtbuf, assigning `is` - confirm.
183 if (is <= 0) {
184 B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
185 }
186 } else {
187 // printf("SeqRootInput : read = %d\n", size);
188 evtmsg = new EvtMessage(evtbuf);
189 }
190
191 // Statistics
// NOTE(review): after a file switch, `size` still holds the EOF read (0), so
// the first event of every subsequent file adds 0 kB. The sizes returned in
// `is` are never used.
192 double dsize = (double)size / 1000.0;
193 m_size += dsize;
194 m_size2 += dsize * dsize;
195
// NOTE(review): this branch reassigns evtmsg without deleting the previous
// EvtMessage, leaking one object per mid-file StreamerInfo record. The
// statistics above also counted the StreamerInfo record's size, not the
// event's.
196 if (evtmsg->type() == MSG_STREAMERINFO) {
197 B2WARNING("SeqRootInput : StreamerInfo is found in the middle of *.sroot-* files. Skip record");
198 int is = m_file->read(evtbuf, EvtMessage::c_MaxEventSize);
199 if (is <= 0) {
200 B2FATAL("SeqRootInput : Error in reading file. error code = " << is);
201 }
202 evtmsg = new EvtMessage(evtbuf);
203 }
204
205 // Restore objects in DataStore
// NOTE(review): listing line 206 is missing here. It is presumably
// m_streamer->restoreDataStore(evtmsg) - confirm.
207
208 // Delete buffers
209 delete[] evtbuf;
210 evtbuf = nullptr;
211 delete evtmsg;
212 evtmsg = nullptr;
213}
214
216{
217 // End time
218 gettimeofday(&m_tend, nullptr);
219 auto etime = (double)((m_tend.tv_sec - m_t0.tv_sec) * 1000000 +
220 (m_tend.tv_usec - m_t0.tv_usec));
221
222 // Statistics
223 // Sigma^2 = Sum(X^2)/n - (Sum(X)/n)^2
224
225 double flowmb = m_size / etime * 1000.0;
226 double evrate = (double)m_nevt / (etime / 1000.0);
227 double avesize = m_size / (double)m_nevt;
228 double avesize2 = m_size2 / (double)m_nevt;
229 double sigma2 = avesize2 - avesize * avesize;
230 double sigma = sqrt(sigma2);
231
232 // printf ( "m_size = %f, m_size2 = %f, m_nevt = %d\n", m_size, m_size2, m_nevt );
233 // printf ( "avesize2 = %f, avesize = %f, avesize*avesize = %f\n", avesize2, avesize, avesize*avesize );
234 B2INFO("SeqRootInput : " << m_nevt << " events read with total bytes of " << m_size << " kB");
235 B2INFO("SeqRootInput : event rate = " << evrate << " (KHz)");
236 B2INFO("SeqRootInput : flow rate = " << flowmb << " (MB/s)");
237 B2INFO("SeqRootInput : event size = " << avesize << " +- " << sigma << " (kB)");
238
239 B2INFO("SeqRootInput: endRun done.");
240}
241
242
244{
245 delete m_streamer;
246 delete m_file;
247 B2INFO("SeqRootInput: terminate called");
248}
static Configuration & getInstance()
Get a reference to the instance which will be used when the Database is initialized.
void setInputGlobaltags(const std::vector< std::string > &inputTags)
To be called by input modules with the tags to be added from input files.
Stream/restore DataStore objects to/from EvtMessage.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
@ c_Persistent
Object is available during entire execution time.
Definition: DataStore.h:60
const std::vector< std::string > & getInputFilesOverride() const
Return overridden input file names, or an empty vector if none were set.
Definition: Environment.h:115
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
static const unsigned int c_MaxEventSize
maximal EvtMessage size, in bytes (200MB).
Definition: EvtMessage.h:63
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
A class to manage I/O for a chain of blocked files.
Definition: SeqFile.h:22
int status() const
Returns status after constructor call.
Definition: SeqFile.cc:135
int read(char *buf, int max)
Read a record from a file.
Definition: SeqFile.cc:174
int m_nevt
Event counter for the current run: reset to 0 in beginRun() and incremented on every event() call.
DataStoreStreamer * m_streamer
DataStoreStreamer.
virtual void initialize() override
Module functions to be called from main process.
virtual void event() override
This method is the core of the module.
struct timeval m_tend
time at end of current run.
virtual void endRun() override
This method is called if the current run ends.
virtual void terminate() override
This method is called at the end of the event processing.
std::string m_inputFileName
File name.
bool m_fileNameIsPattern
If true, the input filename will be interpreted as a boost::format pattern.
SeqFile * m_file
Blocked file handler.
virtual void beginRun() override
Module functions to be called from event process.
bool m_realData
Is the input real data?
std::vector< std::string > m_filelist
List of all file names to read.
int m_fileptr
Index of current file in m_filelist.
double m_size
total transferred data, in kB.
SeqRootInputModule()
Constructor / Destructor.
int m_nfile
Number of files to read (aka m_filelist.size())
double m_size2
sum of squares of data transferred in each event, in kB^2.
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
bool create(bool replace=false)
Create a default object in the data store.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
double sqrt(double a)
sqrt for double
Definition: beamHelpers.h:28
Abstract base class for different kinds of events.
STL namespace.