Belle II Software  release-05-01-25
StorageDeserializer.cc
1 //+
2 // File : StorageDeserializer.cc
3 // Description : Module to receive data from eb2rx and store online disk
4 //
5 // Author : Tomoyuki Konno, Tokyo Metropolitan University
6 // Date : 16 - Oct - 2013
7 //-
8 
9 #include <daq/storage/modules/StorageDeserializer.h>
10 
11 #include <framework/datastore/StoreObjPtr.h>
12 #include <framework/dataobjects/EventMetaData.h>
13 
14 #include <framework/datastore/StoreArray.h>
15 #include <framework/pcore/MsgHandler.h>
16 
17 #include <rawdata/dataobjects/RawPXD.h>
18 
19 #include <iostream>
20 
21 using namespace Belle2;
22 
23 //-----------------------------------------------------------------
24 // Register the Module
25 //-----------------------------------------------------------------
26 REG_MODULE(StorageDeserializer)
27 
28 //-----------------------------------------------------------------
29 // Implementation
30 //-----------------------------------------------------------------
31 
32 //StorageDeserializerModule* StorageDeserializerModule::g_module = NULL;
33 //
34 //EvtMessage* StorageDeserializerModule::streamDataStore()
35 //{
36 // return g_module->m_streamer->streamDataStore(DataStore::c_Event);
37 //}
38 
40 {
41  setDescription("Storage deserializer module");
42 
43  addParam("CompressionLevel", m_compressionLevel, "Compression level", 0);
44  addParam("EB2", m_eb2, "Over capsuled by eb2", 1);
45  addParam("InputBufferName", m_ibuf_name, "Input buffer name", std::string(""));
46  addParam("InputBufferSize", m_ibuf_size, "Input buffer size", 100);
47  addParam("NodeName", m_nodename, "Node(subsystem) name", std::string(""));
48  addParam("NodeID", m_nodeid, "Node(subsystem) ID", 0);
49  addParam("UseShmFlag", m_shmflag, "Use shared memory to communicate with Runcontroller", 0);
50 
51  m_count = 0;
52  //g_module = this;
53  B2DEBUG(100, "StorageDeserializer: Constructor done.");
54 }
55 
56 
57 StorageDeserializerModule::~StorageDeserializerModule()
58 {
59 }
60 
62 {
63  std::cout << "StorageDeserializer: initialize() started." << std::endl;
64  if (m_ibuf_name.size() > 0 && m_ibuf_size > 0) {
65  m_ibuf.open(m_ibuf_name, m_ibuf_size * 1000000);
66  } else {
67  B2FATAL("Failed to load arguments for shared buffer (" <<
68  m_ibuf_name.c_str() << ":" << m_ibuf_size << ")");
69  }
70  if (m_shmflag > 0) {
71  if (m_nodename.size() == 0 || m_nodeid < 0) {
72  m_shmflag = 0;
73  } else {
74  m_info.open(m_nodename, m_nodeid);
75  }
76  }
77  m_handler = new MsgHandler(m_compressionLevel);
79  m_package = new DataStorePackage(m_streamer, m_eb2);
80 
81  rawpxdarray.registerInDataStore();
82  if (m_info.isAvailable()) {
83  m_info.reportReady();
84  }
85  m_count = 0;
86  while (true) {
87  m_package->setSerial(m_ibuf.read((int*)m_package->getData().getBuffer(), true, false));
88  if (m_package->restore()) {
89  if (m_info.isAvailable()) {
90  m_info.setInputNBytes(m_package->getData().getByteSize());
91  m_info.setInputCount(1);
92  }
93  break;
94  }
95  }
96  if (m_info.isAvailable()) {
97  m_info.reportReady();
98  }
99  std::cout << "StorageDeserializer: initialize() done." << std::endl;
100 }
101 
103 {
104  m_count++;
105  if (m_count == 1) return;
106  while (true) {
107  m_package->setSerial(m_ibuf.read((int*)m_package->getData().getBuffer(), true, false));
108  if (m_package->restore()) {
109  if (m_info.isAvailable()) {
110  m_info.addInputNBytes(m_package->getData().getByteSize());
111  m_info.setInputCount(m_count);
112  }
113  break;
114  }
115  }
116  StoreObjPtr<EventMetaData> evtmetadata;
117  if (evtmetadata.isValid()) {
118  if (m_expno != evtmetadata->getExperiment() ||
119  m_runno != evtmetadata->getRun()) {
120  if (m_info.isAvailable()) {
121  m_info.setInputNBytes(m_package->getData().getByteSize());
122  m_info.setInputCount(1);
123  }
124  }
125  m_expno = evtmetadata->getExperiment();
126  m_runno = evtmetadata->getRun();
127  m_evtno = evtmetadata->getEvent();
128  if (m_info.isAvailable()) {
129  m_info.setExpNumber(m_expno);
130  m_info.setRunNumber(m_runno);
131  }
132  } else {
133  B2WARNING("NO event meta data " << m_package->getData().getExpNumber() << "." <<
134  m_package->getData().getRunNumber() << "." <<
135  m_package->getData().getEventNumber() << " nword = " <<
136  m_package->getData().getWordSize());
137  B2WARNING("Last event meta data " << m_expno << "." << m_runno << "." << m_evtno);
138  }
139 }
140 
142 {
143  std::cout << "StorageDeserializer: beginRun called." << std::endl;
144 }
145 
147 {
148  std::cout << "StorageDeserializer: endRun done." << std::endl;
149 }
150 
151 
153 {
154  std::cout << "StorageDeserializer: terminate called" << std::endl;
155 }
156 
157 
Belle2::StorageDeserializerModule
A class definition of an input module for Sequential ROOT I/O.
Definition: StorageDeserializer.h:32
REG_MODULE
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:652
Belle2::StorageDeserializerModule::terminate
virtual void terminate()
This method is called at the end of the event processing.
Definition: StorageDeserializer.cc:152
Belle2::DataStorePackage
Definition: DataStorePackage.h:18
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::StoreObjPtr
Type-safe access to single objects in the data store.
Definition: ParticleList.h:33
Belle2::StorageDeserializerModule::initialize
virtual void initialize()
Module functions to be called from main process.
Definition: StorageDeserializer.cc:61
Belle2::StorageDeserializerModule::event
virtual void event()
This method is the core of the module.
Definition: StorageDeserializer.cc:102
Belle2::StorageDeserializerModule::m_streamer
DataStoreStreamer * m_streamer
DataStoreStreamer.
Definition: StorageDeserializer.h:77
Belle2::StorageDeserializerModule::beginRun
virtual void beginRun()
Module functions to be called from event process.
Definition: StorageDeserializer.cc:141
Belle2::StorageDeserializerModule::endRun
virtual void endRun()
This method is called if the current run ends.
Definition: StorageDeserializer.cc:146
Belle2::DataStoreStreamer
Stream/restore DataStore objects to/from EvtMessage.
Definition: DataStoreStreamer.h:33
Belle2::MsgHandler
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:104
Belle2::StoreObjPtr::isValid
bool isValid() const
Check whether the object was created.
Definition: StoreObjPtr.h:120