9 #include <daq/storage/modules/StorageSerializer.h>
11 #include <framework/datastore/DataStore.h>
12 #include <framework/datastore/StoreObjPtr.h>
13 #include <framework/dataobjects/EventMetaData.h>
// Constructor body (the signature line is not part of this listing; the final
// debug message identifies it as the constructor). It sets the module
// description and registers four parameters, each bound to a member variable.
// NOTE(review): the last argument of each addParam call is presumably the
// default value -- confirm against the framework's Module::addParam.
34 setDescription(
"Storage serializer module");
// Bound to m_compressionLevel, which initialize() passes to the MsgHandler.
37 addParam(
"compressionLevel", m_compressionLevel,
"Compression Level", 0);
// Bound to m_obuf_name; initialize() opens the output buffer only when this
// name is non-empty.
38 addParam(
"OutputBufferName", m_obuf_name,
"Output buffer name",
string(
""));
// Bound to m_obuf_size; initialize() multiplies this value by 1000000 before
// passing it to m_obuf.open().
39 addParam(
"OutputBufferSize", m_obuf_size,
"Output buffer size", 10);
// Bound to m_nodeid; not used by any statement visible in this listing.
40 addParam(
"NodeID", m_nodeid,
"Node(subsystem) ID", 0);
41 B2DEBUG(100,
"StorageSerializer: Constructor done.");
// Destructor: the body is empty, so nothing is released here.
// NOTE(review): initialize() allocates m_msghandler with raw `new`; confirm it
// is deleted elsewhere or switch the member to an owning smart pointer.
45 StorageSerializerModule::~StorageSerializerModule() { }
// Prepares the module: creates the message handler, resets the cached
// experiment/run numbers and the event counters, and opens the output buffer
// when both a name and a positive size were configured.
// NOTE(review): some lines of this function are missing from the listing
// (braces and whatever sits between original lines 54 and 56).
47 void StorageSerializerModule::initialize()
// Raw `new`; no matching delete is visible anywhere in this listing.
49 m_msghandler =
new MsgHandler(m_compressionLevel);
// Cached experiment/run numbers are reset to -1, both counters to 0.
51 m_expno = m_runno = -1;
52 m_count = m_count_0 = 0;
// The buffer is opened only for a non-empty name and a size greater than zero;
// the configured size is scaled by 1000000 before being handed to open().
53 if (m_obuf_name.size() > 0 && m_obuf_size > 0) {
54 m_obuf.open(m_obuf_name.c_str(), m_obuf_size * 1000000);
// NOTE(review): presumably the else branch of the check above (empty name or
// non-positive size) -- the line introducing the branch is not shown; confirm.
56 B2FATAL(
"Failed to load arguments for shared buffer (" <<
57 m_obuf_name.c_str() <<
":" << m_obuf_size <<
")");
// NOTE(review): writes to std::cout with a hand-written "[DEBUG]" prefix while
// the constructor uses B2DEBUG; consider using the logging macros consistently.
59 std::cout <<
"[DEBUG] StorageSerializer: initialized." << std::endl;
// Begin-of-run hook: the only visible statement logs an informational message.
63 void StorageSerializerModule::beginRun()
65 B2INFO(
"StorageSerializer: beginRun called.");
// Gathers the streamer info of the classes of visited DataStore entries into a
// TList, encodes that list as a MSG_STREAMERINFO message and writes the
// message to the output buffer.
// NOTE(review): the inner loop over entries, the declaration of `minilist`,
// the branch structure and the return statements are missing from this
// listing; the comments below cover only the visible statements. event()
// stores the return value in m_nbyte, so it is presumably a byte count.
68 int StorageSerializerModule::writeStreamerInfos()
// Visits every durability index below DataStore::c_NDurabilityTypes.
71 for (
int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
// Class of the entry currently pointed to by `iter`.
74 const TClass* entryClass = iter->second.objClass;
75 std::cout <<
"Recording StreamerInfo : durability " << durability
76 <<
" : Class Name " << entryClass->GetName() << std::endl;
77 TVirtualStreamerInfo* vinfo = entryClass->GetStreamerInfo();
// The list is created lazily, on the first entry that reaches this point.
78 if (!minilist) minilist =
new TList();
// NOTE(review): reinterpret_cast to TObject* bypasses type checking; if
// TVirtualStreamerInfo derives from TObject an implicit conversion or
// static_cast would be safer -- confirm against the ROOT class hierarchy.
79 minilist->Add(
reinterpret_cast<TObject*
>(vinfo));
// Hand the collected list to the message handler under the name
// "StreamerInfo" and encode it as a streamer-info message.
83 m_msghandler->add(minilist,
"StreamerInfo");
84 EvtMessage* msg = m_msghandler->encode_msg(MSG_STREAMERINFO);
// The header is marked as carrying exactly one object and no arrays.
85 (msg->
header())->nObjects = 1;
86 (msg->
header())->nArrays = 0;
// Message size converted to a word count, rounded up: (size - 1) / 4 + 1.
// Assumes 4-byte int, since the buffer is written through an int* below.
87 int nword = (msg->
size() - 1) / 4 + 1;
88 m_obuf.write((
int*)msg->
buffer(), nword,
false, 0,
true);
89 int size = msg->
size();
// NOTE(review): "bytes" has no leading space, so the log reads e.g.
// "... : 123bytes"; the text also says "to a file" although the data is written
// to m_obuf. No delete of `msg` is visible -- confirm ownership.
90 B2INFO(
"Wrote StreamerInfo to a file : " << size <<
"bytes");
// NOTE(review): presumably reached when no streamer info was collected; the
// line introducing this branch is not shown.
95 B2WARNING(
"No StreamerInfo in memory");
// Per-event processing: when a new run/experiment is detected, updates the
// stored header and writes a streamer-info record; then serializes the event
// DataStore into a message and writes it to the output buffer.
// NOTE(review): many lines are missing from this listing (declarations of
// `evtmetadata`, `header` and `hd`, the setup of `hd` before each write, the
// counter increment, closing braces); comments cover visible statements only.
100 void StorageSerializerModule::event()
// Experiment and run number of the current event; subno is always 0 here.
103 unsigned int expno = evtmetadata->getExperiment();
104 unsigned int runno = evtmetadata->getRun();
105 unsigned int subno = 0;
// A change is detected only when the incoming run or experiment number is
// strictly greater than the stored one.
// NOTE(review): a run number that decreases within the same experiment would
// not be detected -- confirm this is intended.
114 if (header->runno < runno || header->expno < expno) {
116 B2INFO(
"New run detected: expno = " << expno <<
" runno = "
117 << runno <<
" subno = " << subno);
118 header->expno = expno;
119 header->runno = runno;
120 header->subno = subno;
// Write a small header record typed MSG_STREAMERINFO, followed by the streamer
// info itself; m_nbyte restarts from the value writeStreamerInfos() returns.
122 hd.type = MSG_STREAMERINFO;
125 m_obuf.write((
int*)&hd, hd.nword,
false, 0,
true);
126 m_nbyte = writeStreamerInfos();
// Second header write; the lines that fill `hd` for it are not shown.
132 m_obuf.write((
int*)&hd, hd.nword,
false, 0,
true);
// Serialize the event-durability DataStore content into one message.
133 EvtMessage* msg = m_streamer->streamDataStore(DataStore::c_Event);
// Message size converted to a word count, rounded up: (size - 1) / 4 + 1.
134 int nword = (msg->
size() - 1) / 4 + 1;
135 m_obuf.write((
int*)msg->
buffer(), nword,
false, 0,
true);
// Accumulate the written message size. No delete of `msg` is visible in this
// listing -- NOTE(review): confirm who owns the message.
137 m_nbyte += msg->
size();
// Throttled progress print: every event below 10, then multiples of 10 below
// 100, multiples of 100 below 1000, multiples of 1000 below 10000.
// NOTE(review): the comparisons are strict (> 10, > 100, > 1000), so counts of
// exactly 10, 100 and 1000 are never printed; `>=` may have been intended.
139 if (m_count < 10000 && (m_count < 10 || (m_count > 10 && m_count < 100 && m_count % 10 == 0) ||
140 (m_count > 100 && m_count < 1000 && m_count % 100 == 0) ||
141 (m_count > 1000 && m_count < 10000 && m_count % 1000 == 0))) {
142 std::cout <<
"[DEBUG] Storage count = " << m_count <<
" nword = " << nword << std::endl;
// End-of-run hook: the only visible statement prints a debug line to std::cout.
147 void StorageSerializerModule::endRun()
149 std::cout <<
"[DEBUG] StorageSerializer : endRun called" << std::endl;
// Termination hook: the only visible statement prints a debug line to std::cout.
// NOTE(review): no release of m_msghandler (allocated with `new` in
// initialize()) is visible here -- confirm it is freed somewhere.
153 void StorageSerializerModule::terminate()
155 std::cout <<
"[DEBUG] terminate called" << std::endl;
Stream/restore DataStore objects to/from EvtMessage.
StoreEntryMap::iterator StoreEntryIter
Iterator for a StoreEntry map.
EDurability
Durability types.
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Class to manage streamed object.
EvtHeader * header()
Get pointer to EvtHeader.
char * buffer()
Get buffer address.
int size() const
Get size of message including headers.
A class to encode/decode an EvtMessage.
Class definition for the output module of Sequential ROOT I/O.
Type-safe access to single objects in the data store.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.