Belle II Software development
StorageSerializer.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <daq/storage/modules/StorageSerializer.h>
10
11#include <framework/datastore/DataStore.h>
12#include <framework/datastore/StoreObjPtr.h>
13#include <framework/dataobjects/EventMetaData.h>
14
15#include <iostream>
16#include <TList.h>
17#include <TClass.h>
18
19using namespace std;
20using namespace Belle2;
21
22//-----------------------------------------------------------------
23// Register the Module
24//-----------------------------------------------------------------
25REG_MODULE(StorageSerializer);
26
27//-----------------------------------------------------------------
28// Implementation
29//-----------------------------------------------------------------
30
32{
33 //Set module properties
34 setDescription("Storage serializer module");
35
36 //Parameter definition
37 addParam("compressionLevel", m_compressionLevel, "Compression Level", 0);
38 addParam("OutputBufferName", m_obuf_name, "Output buffer name", string(""));
39 addParam("OutputBufferSize", m_obuf_size, "Output buffer size", 10);
40 addParam("NodeID", m_nodeid, "Node(subsystem) ID", 0);
41 B2DEBUG(100, "StorageSerializer: Constructor done.");
42}
43
44
45StorageSerializerModule::~StorageSerializerModule() { }
46
48{
51 m_expno = m_runno = -1;
52 m_count = m_count_0 = 0;
53 if (m_obuf_name.size() > 0 && m_obuf_size > 0) {
54 m_obuf.open(m_obuf_name.c_str(), m_obuf_size * 1000000);
55 } else {
56 B2FATAL("Failed to load arguments for shared buffer (" <<
57 m_obuf_name.c_str() << ":" << m_obuf_size << ")");
58 }
59 std::cout << "[DEBUG] StorageSerializer: initialized." << std::endl;
60}
61
62
64{
65 B2INFO("StorageSerializer: beginRun called.");
66}
67
68int StorageSerializerModule::writeStreamerInfos()
69{
70 TList* minilist = 0 ;
71 for (int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
73 for (DataStore::StoreEntryIter iter = map.begin(); iter != map.end(); ++iter) {
74 const TClass* entryClass = iter->second.objClass;
75 std::cout << "Recording StreamerInfo : durability " << durability
76 << " : Class Name " << entryClass->GetName() << std::endl;
77 TVirtualStreamerInfo* vinfo = entryClass->GetStreamerInfo();
78 if (!minilist) minilist = new TList();
79 minilist->Add(reinterpret_cast<TObject*>(vinfo));
80 }
81 }
82 if (minilist) {
83 m_msghandler->add(minilist, "StreamerInfo");
84 EvtMessage* msg = m_msghandler->encode_msg(MSG_STREAMERINFO);
85 (msg->header())->nObjects = 1; // No. of objects
86 (msg->header())->nArrays = 0; // No. of arrays
87 int nword = (msg->size() - 1) / 4 + 1;
88 m_obuf.write((int*)msg->buffer(), nword, false, 0, true);
89 int size = msg->size();
90 B2INFO("Wrote StreamerInfo to a file : " << size << "bytes");
91 delete msg;
92 delete minilist;
93 return size;
94 } else {
95 B2WARNING("No StreamerInfo in memory");
96 }
97 return 0;
98}
99
101{
102 StoreObjPtr<EventMetaData> evtmetadata;
103 unsigned int expno = evtmetadata->getExperiment();
104 unsigned int runno = evtmetadata->getRun();
105 unsigned int subno = 0;//evtmetadata->getRun() & 0xFF;
106 m_obuf.lock();
107 SharedEventBuffer::Header* header = m_obuf.getHeader();
108 struct dataheader {
109 int nword;
110 int type;
111 unsigned int expno;
112 unsigned int runno;
113 } hd;
114 if (header->runno < runno || header->expno < expno) {
115 m_count = 0;
116 B2INFO("New run detected: expno = " << expno << " runno = "
117 << runno << " subno = " << subno);
118 header->expno = expno;
119 header->runno = runno;
120 header->subno = subno;
121 hd.nword = 4;
122 hd.type = MSG_STREAMERINFO;
123 hd.expno = expno;
124 hd.runno = runno;
125 m_obuf.write((int*)&hd, hd.nword, false, 0, true);
126 m_nbyte = writeStreamerInfos();
127 }
128 hd.nword = 4;
129 hd.type = MSG_EVENT;
130 hd.expno = expno;
131 hd.runno = runno;
132 m_obuf.write((int*)&hd, hd.nword, false, 0, true);
134 int nword = (msg->size() - 1) / 4 + 1;
135 m_obuf.write((int*)msg->buffer(), nword, false, 0, true);
136 m_obuf.unlock();
137 m_nbyte += msg->size();
138 delete msg;
139 if (m_count < 10000 && (m_count < 10 || (m_count > 10 && m_count < 100 && m_count % 10 == 0) ||
140 (m_count > 100 && m_count < 1000 && m_count % 100 == 0) ||
141 (m_count > 1000 && m_count < 10000 && m_count % 1000 == 0))) {
142 std::cout << "[DEBUG] Storage count = " << m_count << " nword = " << nword << std::endl;
143 }
144 m_count++;
145}
146
148{
149 std::cout << "[DEBUG] StorageSerializer : endRun called" << std::endl;
150}
151
152
154{
155 std::cout << "[DEBUG] terminate called" << std::endl;
156}
157
Stream/restore DataStore objects to/from EvtMessage.
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
StoreEntryMap::iterator StoreEntryIter
Iterator for a StoreEntry map.
Definition: DataStore.h:88
StoreEntryMap & getStoreEntryMap(EDurability durability)
Get a reference to the object/array map.
Definition: DataStore.h:325
static const int c_NDurabilityTypes
Number of Durability Types.
Definition: DataStore.h:63
EDurability
Durability types.
Definition: DataStore.h:58
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:54
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Definition: DataStore.h:87
Class to manage streamed object.
Definition: EvtMessage.h:59
EvtHeader * header()
Get pointer to EvtHeader.
Definition: EvtMessage.cc:161
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:76
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:94
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
Definition: MsgHandler.cc:46
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
Definition: MsgHandler.cc:67
DataStoreStreamer * m_streamer
DataStoreStreamer.
std::string m_obuf_name
Ring Buffer to dump sampled output stream.
void initialize() override
Module functions to be called from main process.
unsigned int m_count
Exp number, Run number.
StorageSerializerModule()
Constructor / Destructor.
void event() override
This method is the core of the module.
void endRun() override
This method is called if the current run ends.
void terminate() override
This method is called at the end of the event processing.
MsgHandler * m_msghandler
Messaage handler.
void beginRun() override
Module functions to be called from event process.
int m_compressionLevel
Compression level.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.
STL namespace.