Belle II Software  release-08-01-10
StorageDeserializer.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/storage/modules/StorageDeserializer.h>
10 
11 #include <framework/datastore/StoreObjPtr.h>
12 #include <framework/dataobjects/EventMetaData.h>
13 
14 #include <framework/datastore/StoreArray.h>
15 #include <framework/pcore/MsgHandler.h>
16 
17 #include <rawdata/dataobjects/RawPXD.h>
18 
19 #include <iostream>
20 
21 using namespace Belle2;
22 
23 //-----------------------------------------------------------------
24 // Register the Module
25 //-----------------------------------------------------------------
26 REG_MODULE(StorageDeserializer)
27 
28 //-----------------------------------------------------------------
29 // Implementation
30 //-----------------------------------------------------------------
31 
32 //StorageDeserializerModule* StorageDeserializerModule::g_module = NULL;
33 //
34 //EvtMessage* StorageDeserializerModule::streamDataStore()
35 //{
36 // return g_module->m_streamer->streamDataStore(DataStore::c_Event);
37 //}
38 
40 {
41  setDescription("Storage deserializer module");
42 
43  addParam("CompressionLevel", m_compressionLevel, "Compression level", 0);
44  addParam("EB2", m_eb2, "Over capsuled by eb2", 1);
45  addParam("InputBufferName", m_ibuf_name, "Input buffer name", std::string(""));
46  addParam("InputBufferSize", m_ibuf_size, "Input buffer size", 100);
47  addParam("NodeName", m_nodename, "Node(subsystem) name", std::string(""));
48  addParam("NodeID", m_nodeid, "Node(subsystem) ID", 0);
49  addParam("UseShmFlag", m_shmflag, "Use shared memory to communicate with Runcontroller", 0);
50 
51  m_count = 0;
52  //g_module = this;
53  B2DEBUG(100, "StorageDeserializer: Constructor done.");
54 }
55 
56 
57 StorageDeserializerModule::~StorageDeserializerModule()
58 {
59 }
60 
62 {
63  std::cout << "StorageDeserializer: initialize() started." << std::endl;
64  if (m_ibuf_name.size() > 0 && m_ibuf_size > 0) {
65  m_ibuf.open(m_ibuf_name, m_ibuf_size * 1000000);
66  } else {
67  B2FATAL("Failed to load arguments for shared buffer (" <<
68  m_ibuf_name.c_str() << ":" << m_ibuf_size << ")");
69  }
70  if (m_shmflag > 0) {
71  if (m_nodename.size() == 0 || m_nodeid < 0) {
72  m_shmflag = 0;
73  } else {
74  m_info.open(m_nodename, m_nodeid);
75  }
76  }
77  m_handler = new MsgHandler(m_compressionLevel);
79  m_package = new DataStorePackage(m_streamer, m_eb2);
80 
81  rawpxdarray.registerInDataStore();
82  if (m_info.isAvailable()) {
83  m_info.reportReady();
84  }
85  m_count = 0;
86  while (true) {
87  m_package->setSerial(m_ibuf.read((int*)m_package->getData().getBuffer(), true, false));
88  if (m_package->restore()) {
89  if (m_info.isAvailable()) {
90  m_info.setInputNBytes(m_package->getData().getByteSize());
91  m_info.setInputCount(1);
92  }
93  break;
94  }
95  }
96  if (m_info.isAvailable()) {
97  m_info.reportReady();
98  }
99  std::cout << "StorageDeserializer: initialize() done." << std::endl;
100 }
101 
103 {
104  m_count++;
105  if (m_count == 1) return;
106  while (true) {
107  m_package->setSerial(m_ibuf.read((int*)m_package->getData().getBuffer(), true, false));
108  if (m_package->restore()) {
109  if (m_info.isAvailable()) {
110  m_info.addInputNBytes(m_package->getData().getByteSize());
111  m_info.setInputCount(m_count);
112  }
113  break;
114  }
115  }
116  StoreObjPtr<EventMetaData> evtmetadata;
117  if (evtmetadata.isValid()) {
118  if (m_expno != evtmetadata->getExperiment() ||
119  m_runno != evtmetadata->getRun()) {
120  if (m_info.isAvailable()) {
121  m_info.setInputNBytes(m_package->getData().getByteSize());
122  m_info.setInputCount(1);
123  }
124  }
125  m_expno = evtmetadata->getExperiment();
126  m_runno = evtmetadata->getRun();
127  m_evtno = evtmetadata->getEvent();
128  if (m_info.isAvailable()) {
129  m_info.setExpNumber(m_expno);
130  m_info.setRunNumber(m_runno);
131  }
132  } else {
133  B2WARNING("NO event meta data " << m_package->getData().getExpNumber() << "." <<
134  m_package->getData().getRunNumber() << "." <<
135  m_package->getData().getEventNumber() << " nword = " <<
136  m_package->getData().getWordSize());
137  B2WARNING("Last event meta data " << m_expno << "." << m_runno << "." << m_evtno);
138  }
139 }
140 
142 {
143  std::cout << "StorageDeserializer: beginRun called." << std::endl;
144 }
145 
147 {
148  std::cout << "StorageDeserializer: endRun done." << std::endl;
149 }
150 
151 
153 {
154  std::cout << "StorageDeserializer: terminate called" << std::endl;
155 }
156 
157 
Stream/restore DataStore objects to/from EvtMessage.
Base class for Modules.
Definition: Module.h:72
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
A class definition of an input module for Sequential ROOT I/O.
DataStoreStreamer * m_streamer
DataStoreStreamer.
void initialize() override
Module functions to be called from main process.
void event() override
This method is the core of the module.
void endRun() override
This method is called if the current run ends.
void terminate() override
This method is called at the end of the event processing.
void beginRun() override
Module functions to be called from event process.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
bool isValid() const
Check whether the object was created.
Definition: StoreObjPtr.h:111
REG_MODULE(arichBtest)
Register the Module.
Abstract base class for different kinds of events.