Belle II Software  release-08-01-10
PartialSeqRootReader.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
#include <daq/storage/modules/PartialSeqRootReader.h>

#include <cerrno>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <string>
#include <fcntl.h>
#include <unistd.h>
15 
16 using namespace Belle2;
17 
18 //-----------------------------------------------------------------
19 // Register the Module
20 //-----------------------------------------------------------------
21 REG_MODULE(PartialSeqRootReader)
22 
23 //-----------------------------------------------------------------
24 // Implementation
25 //-----------------------------------------------------------------
26 
28 {
29  m_buf = new char[1024 * 1024 * 100];//100MB
30  setDescription("Partial sroot file reader");
31 
32  addParam("FilePath", m_path, "File path ", std::string(""));
33  addParam("FileNoMin", m_filemin, "Minimum filenumber ", 0);
34  addParam("FileNoMax", m_filemax, "Maximum filenumber ", -1);
35  m_fd = -1;
36  m_fileno = -1;
37  B2INFO("PartialSeqRootReader: Constructor done.");
38 }
39 
40 
41 PartialSeqRootReaderModule::~PartialSeqRootReaderModule()
42 {
43  close(m_fd);
44  delete [] m_buf;
45 }
46 
47 int PartialSeqRootReaderModule::openFile(int fileno)
48 {
49  if (m_fd > 0) close(m_fd);
50  if (fileno > m_filemax) return -1;
51  char filename[200];
52  if (fileno > 0) {
53  sprintf(filename, "%s-%d", m_path.c_str(), fileno);
54  } else if (fileno == 0) {
55  sprintf(filename, "%s", m_path.c_str());
56  }
57  int fd = ::open(filename, O_RDONLY);
58  if (fd < 0) {
59  B2ERROR("file open error (" << strerror(errno) << "): " << filename);
60  return -1;
61  }
62  std::cout << "SeqFile: " << filename << " opened (fd=" << m_fd << ")" << std::endl;
63  B2INFO("SeqFile: " << filename << " opened (fd=" << m_fd << ")");
64  return fd;
65 }
66 
67 int PartialSeqRootReaderModule::readFile()
68 {
69  int ret = ::read(m_fd, m_buf, sizeof(int));
70  if (ret < 0) {
71  return ret;
72  } else if (ret == 0) {
73  m_fd = openFile(m_fileno++);
74  if (m_fd <= 0) return -1;
75  ret = ::read(m_fd, m_buf, sizeof(int));
76  }
77  int size = *(int*)m_buf;
78  ret = ::read(m_fd, (char*)(m_buf + sizeof(int)), size - sizeof(int));
79  if (ret <= 0) {
80  return -1;
81  }
82  return size;
83 }
84 
86 {
87  B2INFO("PartialSeqRootReader: initialize() started.");
88  m_streamer = new DataStoreStreamer();
89  m_fd = openFile(0);
90  m_fileno = m_filemin;
91  while (true) {
92  if (readFile() < 0) return;
93  EvtMessage* evtmsg = new EvtMessage(m_buf);
94  m_streamer->restoreDataStore(evtmsg);
95  if (evtmsg->type() == MSG_STREAMERINFO) {
96  B2INFO("Reading StreamerInfo");
97  delete evtmsg;
98  } else {
99  delete evtmsg;
100  break;
101  }
102  }
103  if (m_fileno > 0) {
104  m_fd = openFile(m_fileno++);
105  } else {
106  m_fileno++;
107  }
108  while (true) {
109  if (readFile() < 0) return;
110  EvtMessage* evtmsg = new EvtMessage(m_buf);
111  m_streamer->restoreDataStore(evtmsg);
112  if (evtmsg->type() == MSG_STREAMERINFO) {
113  B2INFO("Reading StreamerInfo");
114  delete evtmsg;
115  } else {
116  delete evtmsg;
117  break;
118  }
119  }
120 
121  B2INFO("PartialSeqRootReader: initialize() done.");
122 }
123 
125 {
126  while (true) {
127  if (readFile() < 0) return;
128  EvtMessage* evtmsg = new EvtMessage(m_buf);
129  m_streamer->restoreDataStore(evtmsg);
130  if (evtmsg->type() == MSG_STREAMERINFO) {
131  B2INFO("Reading StreamerInfo");
132  delete evtmsg;
133  } else {
134  delete evtmsg;
135  break;
136  }
137  }
138 }
139 
141 {
142  B2INFO("PartialSeqRootReader: beginRun called.");
143 }
144 
146 {
147  B2INFO("PartialSeqRootReader: endRun done.");
148 }
149 
150 
152 {
153  B2INFO("PartialSeqRootReader: terminate called");
154 }
155 
156 
Stream/restore DataStore objects to/from EvtMessage.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
Base class for Modules.
Definition: Module.h:72
A class definition of an input module for Sequential ROOT I/O.
void initialize() override
Module functions to be called from main process.
void event() override
This method is the core of the module.
void endRun() override
This method is called if the current run ends.
void terminate() override
This method is called at the end of the event processing.
void beginRun() override
Module functions to be called from event process.
REG_MODULE(arichBtest)
Register the Module.
Abstract base class for different kinds of events.