Belle II Software development
StorageSerializer.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <daq/storage/modules/StorageSerializer.h>
10
11#include <framework/datastore/DataStore.h>
12#include <framework/datastore/StoreObjPtr.h>
13#include <framework/dataobjects/EventMetaData.h>
14
15#include <iostream>
16#include <TList.h>
17#include <TClass.h>
18
19using namespace std;
20using namespace Belle2;
21
22//-----------------------------------------------------------------
23// Register the Module
24//-----------------------------------------------------------------
25REG_MODULE(StorageSerializer);
26
27//-----------------------------------------------------------------
28// Implementation
29//-----------------------------------------------------------------
30
32{
33 //Set module properties
34 setDescription("Storage serializer module");
35
36 //Parameter definition
37 addParam("compressionLevel", m_compressionLevel, "Compression Level", 0);
38 addParam("OutputBufferName", m_obuf_name, "Output buffer name", string(""));
39 addParam("OutputBufferSize", m_obuf_size, "Output buffer size", 10);
40 addParam("NodeID", m_nodeid, "Node(subsystem) ID", 0);
41 B2DEBUG(100, "StorageSerializer: Constructor done.");
42}
43
44
45StorageSerializerModule::~StorageSerializerModule() { }
46
48{
51 m_expno = m_runno = -1;
52 m_count = m_count_0 = 0;
53 if (m_obuf_name.size() > 0 && m_obuf_size > 0) {
54 m_obuf.open(m_obuf_name.c_str(), m_obuf_size * 1000000);
55 } else {
56 B2FATAL("Failed to load arguments for shared buffer (" <<
57 m_obuf_name.c_str() << ":" << m_obuf_size << ")");
58 }
59 std::cout << "[DEBUG] StorageSerializer: initialized." << std::endl;
60}
61
62
64{
65 B2INFO("StorageSerializer: beginRun called.");
66}
67
68int StorageSerializerModule::writeStreamerInfos()
69{
70 TList* minilist = 0 ;
71 for (int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
73 for (DataStore::StoreEntryIter iter = map.begin(); iter != map.end(); ++iter) {
74 const TClass* entryClass = iter->second.objClass;
75 std::cout << "Recording StreamerInfo : durability " << durability
76 << " : Class Name " << entryClass->GetName() << std::endl;
77 TVirtualStreamerInfo* vinfo = entryClass->GetStreamerInfo();
78 if (!minilist) minilist = new TList();
79 minilist->Add(reinterpret_cast<TObject*>(vinfo));
80 }
81 }
82 if (minilist) {
83 m_msghandler->add(minilist, "StreamerInfo");
84 EvtMessage* msg = m_msghandler->encode_msg(MSG_STREAMERINFO);
85 (msg->header())->nObjects = 1; // No. of objects
86 (msg->header())->nArrays = 0; // No. of arrays
87 int nword = (msg->size() - 1) / 4 + 1;
88 m_obuf.write((int*)msg->buffer(), nword, false, 0, true);
89 int size = msg->size();
90 B2INFO("Wrote StreamerInfo to a file : " << size << "bytes");
91 delete msg;
92 delete minilist;
93 return size;
94 } else {
95 B2WARNING("No StreamerInfo in memory");
96 }
97 return 0;
98}
99
101{
102 StoreObjPtr<EventMetaData> evtmetadata;
103 unsigned int expno = evtmetadata->getExperiment();
104 unsigned int runno = evtmetadata->getRun();
105 unsigned int subno = 0;//evtmetadata->getRun() & 0xFF;
106 m_obuf.lock();
107 SharedEventBuffer::Header* header = m_obuf.getHeader();
108 struct dataheader {
109 int nword;
110 int type;
111 unsigned int expno;
112 unsigned int runno;
113 } hd;
114 if (header->runno < runno || header->expno < expno) {
115 m_count = 0;
116 B2INFO("New run detected: expno = " << expno << " runno = "
117 << runno << " subno = " << subno);
118 header->expno = expno;
119 header->runno = runno;
120 header->subno = subno;
121 hd.nword = 4;
122 hd.type = MSG_STREAMERINFO;
123 hd.expno = expno;
124 hd.runno = runno;
125 m_obuf.write((int*)&hd, hd.nword, false, 0, true);
126 m_nbyte = writeStreamerInfos();
127 }
128 hd.nword = 4;
129 hd.type = MSG_EVENT;
130 hd.expno = expno;
131 hd.runno = runno;
132 m_obuf.write((int*)&hd, hd.nword, false, 0, true);
133 EvtMessage* msg = m_streamer->streamDataStore(DataStore::c_Event);
134 int nword = (msg->size() - 1) / 4 + 1;
135 m_obuf.write((int*)msg->buffer(), nword, false, 0, true);
136 m_obuf.unlock();
137 m_nbyte += msg->size();
138 delete msg;
139 if (m_count < 10000 && (m_count < 10 || (m_count > 10 && m_count < 100 && m_count % 10 == 0) ||
140 (m_count > 100 && m_count < 1000 && m_count % 100 == 0) ||
141 (m_count > 1000 && m_count < 10000 && m_count % 1000 == 0))) {
142 std::cout << "[DEBUG] Storage count = " << m_count << " nword = " << nword << std::endl;
143 }
144 m_count++;
145}
146
148{
149 std::cout << "[DEBUG] StorageSerializer : endRun called" << std::endl;
150}
151
152
154{
155 std::cout << "[DEBUG] terminate called" << std::endl;
156}
157
Stream/restore DataStore objects to/from EvtMessage.
StoreEntryMap::iterator StoreEntryIter
Iterator for a StoreEntry map.
Definition DataStore.h:88
StoreEntryMap & getStoreEntryMap(EDurability durability)
Get a reference to the object/array map.
Definition DataStore.h:325
static const int c_NDurabilityTypes
Number of Durability Types.
Definition DataStore.h:63
EDurability
Durability types.
Definition DataStore.h:58
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition DataStore.cc:53
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Definition DataStore.h:87
Class to manage streamed object.
Definition EvtMessage.h:59
EvtHeader * header()
Get pointer to EvtHeader.
char * buffer()
Get buffer address.
Definition EvtMessage.cc:76
int size() const
Get size of message including headers.
Definition EvtMessage.cc:94
void setDescription(const std::string &description)
Sets the description of the module.
Definition Module.cc:214
Module()
Constructor.
Definition Module.cc:30
A class to encode/decode an EvtMessage.
Definition MsgHandler.h:103
DataStoreStreamer * m_streamer
DataStoreStreamer.
std::string m_obuf_name
Ring Buffer to dump sampled output stream.
void initialize() override
Module functions to be called from main process.
unsigned int m_count
Exp number, Run number.
StorageSerializerModule()
Constructor / Destructor.
void event() override
This method is the core of the module.
void endRun() override
This method is called if the current run ends.
void terminate() override
This method is called at the end of the event processing.
MsgHandler * m_msghandler
Messaage handler.
void beginRun() override
Module functions to be called from event process.
int m_compressionLevel
Compression level.
Type-safe access to single objects in the data store.
Definition StoreObjPtr.h:96
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition Module.h:559
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition Module.h:649
Abstract base class for different kinds of events.
STL namespace.