Belle II Software  release-06-02-00
SeqRootMergerModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/storage/modules/SeqRootMergerModule.h>
10 
11 #include <framework/core/Environment.h>
12 
13 #include <cstdlib>
14 #include <cstdio>
15 #include <fcntl.h>
16 
17 using namespace Belle2;
18 
19 //-----------------------------------------------------------------
20 // Register the Module
21 //-----------------------------------------------------------------
22 REG_MODULE(SeqRootMerger)
23 
24 //-----------------------------------------------------------------
25 // Implementation
26 //-----------------------------------------------------------------
27 
29 {
30  B2INFO("SeqRootMerger: Constructor done.");
31 }
32 
33 
34 SeqRootMergerModule::~SeqRootMergerModule()
35 {
36 
37 }
38 
40 {
41  const std::vector<std::string>& inputFiles = Environment::Instance().getInputFilesOverride();
42  for (size_t i = 0; i < inputFiles.size(); i++) {
43  m_file.push_back(new SeqFile(inputFiles[i].c_str(), "r"));
44  }
45  m_streamer = new DataStoreStreamer();
46  readFile();
47 }
48 
49 int SeqRootMergerModule::readFile()
50 {
51  B2INFO("SeqRootMerger: initialize() started.");
52 
53  char* evtbuf = new char[EvtMessage::c_MaxEventSize];
54  while (true) {
55  if (m_file.size() == 0) return 0;
56  int i = rand() % m_file.size();
57  int size = m_file[i]->read(evtbuf, EvtMessage::c_MaxEventSize);
58  if (size > 0) {
59  EvtMessage* evtmsg = new EvtMessage(evtbuf);
60  m_streamer->restoreDataStore(evtmsg);
61  if (evtmsg->type() == MSG_STREAMERINFO) {
62  B2INFO("Reading StreamerInfo");
63  delete evtmsg;
64  } else {
65  delete evtmsg;
66  break;
67  }
68  } else {
69  m_file.erase(m_file.begin() + i);
70  }
71  }
72  delete [] evtbuf;
73 
74  return m_file.size();
75 }
76 
78 {
79  readFile();
80 }
81 
83 {
84  B2INFO("SeqRootMerger: beginRun called.");
85 }
86 
88 {
89  B2INFO("SeqRootMerger: endRun done.");
90 }
91 
92 
94 {
95  B2INFO("SeqRootMerger: terminate called");
96 }
97 
98 
Stream/restore DataStore objects to/from EvtMessage.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
const std::vector< std::string > & getInputFilesOverride() const
Return overridden input file names, or empty vector if none were set.
Definition: Environment.h:103
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:29
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
static const unsigned int c_MaxEventSize
maximal EvtMessage size, in bytes (200MB).
Definition: EvtMessage.h:63
Base class for Modules.
Definition: Module.h:72
A class to manage I/O for a chain of blocked files.
Definition: SeqFile.h:22
A class definition of an input module for Sequential ROOT I/O.
virtual void event()
This method is the core of the module.
virtual void initialize()
Module functions to be called from main process.
virtual void beginRun()
Module functions to be called from event process.
virtual void terminate()
This method is called at the end of the event processing.
virtual void endRun()
This method is called if the current run ends.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.