Belle II Software development
StorageDeserializer.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <daq/storage/modules/StorageDeserializer.h>
10
11#include <framework/datastore/StoreObjPtr.h>
12#include <framework/dataobjects/EventMetaData.h>
13
14#include <framework/datastore/StoreArray.h>
15#include <framework/pcore/MsgHandler.h>
16
17#include <rawdata/dataobjects/RawPXD.h>
18
19#include <iostream>
20
21using namespace Belle2;
22
23//-----------------------------------------------------------------
24// Register the Module
25//-----------------------------------------------------------------
26REG_MODULE(StorageDeserializer);
27
28//-----------------------------------------------------------------
29// Implementation
30//-----------------------------------------------------------------
31
32//StorageDeserializerModule* StorageDeserializerModule::g_module = NULL;
33//
34//EvtMessage* StorageDeserializerModule::streamDataStore()
35//{
36// return g_module->m_streamer->streamDataStore(DataStore::c_Event);
37//}
38
40{
41 setDescription("Storage deserializer module");
42
43 addParam("CompressionLevel", m_compressionLevel, "Compression level", 0);
44 addParam("EB2", m_eb2, "Over capsuled by eb2", 1);
45 addParam("InputBufferName", m_ibuf_name, "Input buffer name", std::string(""));
46 addParam("InputBufferSize", m_ibuf_size, "Input buffer size", 100);
47 addParam("NodeName", m_nodename, "Node(subsystem) name", std::string(""));
48 addParam("NodeID", m_nodeid, "Node(subsystem) ID", 0);
49 addParam("UseShmFlag", m_shmflag, "Use shared memory to communicate with Runcontroller", 0);
50
51 m_count = 0;
52 //g_module = this;
53 B2DEBUG(100, "StorageDeserializer: Constructor done.");
54}
55
56
57StorageDeserializerModule::~StorageDeserializerModule()
58{
59}
60
62{
63 std::cout << "StorageDeserializer: initialize() started." << std::endl;
64 if (m_ibuf_name.size() > 0 && m_ibuf_size > 0) {
65 m_ibuf.open(m_ibuf_name, m_ibuf_size * 1000000);
66 } else {
67 B2FATAL("Failed to load arguments for shared buffer (" <<
68 m_ibuf_name.c_str() << ":" << m_ibuf_size << ")");
69 }
70 if (m_shmflag > 0) {
71 if (m_nodename.size() == 0 || m_nodeid < 0) {
72 m_shmflag = 0;
73 } else {
74 m_info.open(m_nodename, m_nodeid);
75 }
76 }
77 m_handler = new MsgHandler(m_compressionLevel);
79 m_package = new DataStorePackage(m_streamer, m_eb2);
80
81 rawpxdarray.registerInDataStore();
82 if (m_info.isAvailable()) {
83 m_info.reportReady();
84 }
85 m_count = 0;
86 while (true) {
87 m_package->setSerial(m_ibuf.read((int*)m_package->getData().getBuffer(), true, false));
88 if (m_package->restore()) {
89 if (m_info.isAvailable()) {
90 m_info.setInputNBytes(m_package->getData().getByteSize());
91 m_info.setInputCount(1);
92 }
93 break;
94 }
95 }
96 if (m_info.isAvailable()) {
97 m_info.reportReady();
98 }
99 std::cout << "StorageDeserializer: initialize() done." << std::endl;
100}
101
103{
104 m_count++;
105 if (m_count == 1) return;
106 while (true) {
107 m_package->setSerial(m_ibuf.read((int*)m_package->getData().getBuffer(), true, false));
108 if (m_package->restore()) {
109 if (m_info.isAvailable()) {
110 m_info.addInputNBytes(m_package->getData().getByteSize());
111 m_info.setInputCount(m_count);
112 }
113 break;
114 }
115 }
116 StoreObjPtr<EventMetaData> evtmetadata;
117 if (evtmetadata.isValid()) {
118 if (m_expno != evtmetadata->getExperiment() ||
119 m_runno != evtmetadata->getRun()) {
120 if (m_info.isAvailable()) {
121 m_info.setInputNBytes(m_package->getData().getByteSize());
122 m_info.setInputCount(1);
123 }
124 }
125 m_expno = evtmetadata->getExperiment();
126 m_runno = evtmetadata->getRun();
127 m_evtno = evtmetadata->getEvent();
128 if (m_info.isAvailable()) {
129 m_info.setExpNumber(m_expno);
130 m_info.setRunNumber(m_runno);
131 }
132 } else {
133 B2WARNING("NO event meta data " << m_package->getData().getExpNumber() << "." <<
134 m_package->getData().getRunNumber() << "." <<
135 m_package->getData().getEventNumber() << " nword = " <<
136 m_package->getData().getWordSize());
137 B2WARNING("Last event meta data " << m_expno << "." << m_runno << "." << m_evtno);
138 }
139}
140
142{
143 std::cout << "StorageDeserializer: beginRun called." << std::endl;
144}
145
147{
148 std::cout << "StorageDeserializer: endRun done." << std::endl;
149}
150
151
153{
154 std::cout << "StorageDeserializer: terminate called" << std::endl;
155}
156
157
Stream/restore DataStore objects to/from EvtMessage.
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
DataStoreStreamer * m_streamer
DataStoreStreamer.
void initialize() override
Module functions to be called from main process.
void event() override
This method is the core of the module.
StorageDeserializerModule()
Constructor / Destructor.
void endRun() override
This method is called if the current run ends.
void terminate() override
This method is called at the end of the event processing.
void beginRun() override
Module functions to be called from event process.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
bool isValid() const
Check whether the object was created.
Definition: StoreObjPtr.h:111
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.