11 #include <daq/storage/modules/StorageSerializer.h>
13 #include <framework/datastore/DataStore.h>
14 #include <framework/datastore/StoreObjPtr.h>
15 #include <framework/dataobjects/EventMetaData.h>
36 setDescription(
"Storage serializer module");
39 addParam(
"compressionLevel", m_compressionLevel,
"Compression Level", 0);
40 addParam(
"OutputBufferName", m_obuf_name,
"Output buffer name",
string(
""));
41 addParam(
"OutputBufferSize", m_obuf_size,
"Output buffer size", 10);
42 addParam(
"NodeID", m_nodeid,
"Node(subsystem) ID", 0);
43 B2DEBUG(100,
"StorageSerializer: Constructor done.");
47 StorageSerializerModule::~StorageSerializerModule() { }
49 void StorageSerializerModule::initialize()
51 m_msghandler =
new MsgHandler(m_compressionLevel);
53 m_expno = m_runno = -1;
54 m_count = m_count_0 = 0;
55 if (m_obuf_name.size() > 0 && m_obuf_size > 0) {
56 m_obuf.open(m_obuf_name.c_str(), m_obuf_size * 1000000);
58 B2FATAL(
"Failed to load arguments for shared buffer (" <<
59 m_obuf_name.c_str() <<
":" << m_obuf_size <<
")");
61 std::cout <<
"[DEBUG] StorageSerializer: initialized." << std::endl;
65 void StorageSerializerModule::beginRun()
67 B2INFO(
"StorageSerializer: beginRun called.");
70 int StorageSerializerModule::writeStreamerInfos()
73 for (
int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
76 const TClass* entryClass = iter->second.objClass;
77 std::cout <<
"Recording StreamerInfo : durability " << durability
78 <<
" : Class Name " << entryClass->GetName() << std::endl;
79 TVirtualStreamerInfo* vinfo = entryClass->GetStreamerInfo();
80 if (!minilist) minilist =
new TList();
81 minilist->Add((TObject*)vinfo);
85 m_msghandler->add(minilist,
"StreamerInfo");
86 EvtMessage* msg = m_msghandler->encode_msg(MSG_STREAMERINFO);
87 (msg->
header())->nObjects = 1;
88 (msg->
header())->nArrays = 0;
89 int nword = (msg->
size() - 1) / 4 + 1;
90 m_obuf.write((
int*)msg->
buffer(), nword,
false, 0,
true);
91 int size = msg->
size();
92 B2INFO(
"Wrote StreamerInfo to a file : " << size <<
"bytes");
97 B2WARNING(
"No StreamerInfo in memory");
102 void StorageSerializerModule::event()
105 unsigned int expno = evtmetadata->getExperiment();
106 unsigned int runno = evtmetadata->getRun();
107 unsigned int subno = 0;
116 if (header->runno < runno || header->expno < expno) {
118 B2INFO(
"New run detected: expno = " << expno <<
" runno = "
119 << runno <<
" subno = " << subno);
120 header->expno = expno;
121 header->runno = runno;
122 header->subno = subno;
124 hd.type = MSG_STREAMERINFO;
127 m_obuf.write((
int*)&hd, hd.nword,
false, 0,
true);
128 m_nbyte = writeStreamerInfos();
134 m_obuf.write((
int*)&hd, hd.nword,
false, 0,
true);
135 EvtMessage* msg = m_streamer->streamDataStore(DataStore::c_Event);
136 int nword = (msg->
size() - 1) / 4 + 1;
137 m_obuf.write((
int*)msg->
buffer(), nword,
false, 0,
true);
139 m_nbyte += msg->
size();
141 if (m_count < 10000 && (m_count < 10 || (m_count > 10 && m_count < 100 && m_count % 10 == 0) ||
142 (m_count > 100 && m_count < 1000 && m_count % 100 == 0) ||
143 (m_count > 1000 && m_count < 10000 && m_count % 1000 == 0))) {
144 std::cout <<
"[DEBUG] Storage count = " << m_count <<
" nword = " << nword << std::endl;
149 void StorageSerializerModule::endRun()
151 std::cout <<
"[DEBUG] StorageSerializer : endRun called" << std::endl;
155 void StorageSerializerModule::terminate()
157 std::cout <<
"[DEBUG] terminate called" << std::endl;