9 #include <framework/modules/rootio/SeqRootOutputModule.h>
10 #include <framework/datastore/DataStore.h>
11 #include <framework/core/Environment.h>
17 #include <TVirtualStreamerInfo.h>
34 setDescription(
"Save a sequential ROOT file (non-standard I/O format used in DAQ). See https://confluence.desy.de/display/BI/Software+PersistencyModules for further information and a comparison with the .root format.");
36 m_msghandler =
nullptr;
37 m_streamerinfo =
nullptr;
38 m_streamerinfo_size = 0;
40 vector<string> emptyvector;
42 addParam(
"outputFileName" , m_outputFileName,
43 "Output file name. Add a .gz suffix to save a gzip-compressed file. Parameter can be overridden using the -o argument to basf2.",
44 string(
"SeqRootOutput.sroot"));
45 addParam(
"compressionLevel", m_compressionLevel,
46 "Compression Level: 0 for no, 1 for low, 9 for high compression. Level 1 usually reduces size by 50%, higher levels have no noticable effect. NOTE: Because of a ROOT bug ( https://sft.its.cern.ch/jira/browse/ROOT-4550 ), this option currently causes memory leaks and is disabled.",
48 addParam(
"saveObjs", m_saveObjs,
"List of objects/arrays to be saved", emptyvector);
49 addParam(
"fileNameIsPattern", m_fileNameIsPattern,
"If true interpret the output filename as a boost::format pattern "
50 "instead of the standard where subsequent files are named .sroot-N. For example 'myfile-f%08d.sroot'",
false);
54 SeqRootOutputModule::~SeqRootOutputModule()
56 if (m_streamerinfo !=
nullptr)
delete m_streamerinfo;
59 void SeqRootOutputModule::initialize()
61 const std::string& outputFileArgument = Environment::Instance().getOutputFileOverride();
62 if (!outputFileArgument.empty())
63 m_outputFileName = outputFileArgument;
69 m_msghandler =
new MsgHandler(m_compressionLevel);
73 m_streamer->setStreamingObjects(m_saveObjs);
78 m_file =
new SeqFile(m_outputFileName.c_str(),
"w", m_streamerinfo, m_streamerinfo_size, m_fileNameIsPattern);
80 B2INFO(
"SeqRootOutput: initialized.");
84 void SeqRootOutputModule::beginRun()
88 gettimeofday(&m_t0,
nullptr);
93 B2INFO(
"SeqRootOutput: beginRun called.");
96 void SeqRootOutputModule::event()
99 EvtMessage* msg = m_streamer->streamDataStore(
false);
102 int stat = m_file->write(msg->
buffer());
108 double dsize = (double)stat / 1000.0;
110 m_size2 += dsize * dsize;
114 void SeqRootOutputModule::endRun()
119 gettimeofday(&m_tend,
nullptr);
120 auto etime = (double)((m_tend.tv_sec - m_t0.tv_sec) * 1000000 +
121 (m_tend.tv_usec - m_t0.tv_usec));
126 double flowmb = m_size / etime * 1000.0;
127 double avesize = m_size / (double)m_nevt;
128 double avesize2 = m_size2 / (double)m_nevt;
129 double sigma2 = avesize2 - avesize * avesize;
130 double sigma = sqrt(sigma2);
132 B2INFO(
"SeqRootOutput : " << m_nevt <<
" events written with total bytes of " << m_size <<
" kB");
133 B2INFO(
"SeqRootOutput : flow rate = " << flowmb <<
" (MB/s)");
134 B2INFO(
"SeqRootOutput : event size = " << avesize <<
" +- " << sigma <<
" (kB)");
136 B2INFO(
"SeqRootOutput: endRun done.");
140 void SeqRootOutputModule::terminate()
146 B2INFO(
"terminate called");
150 void SeqRootOutputModule::getStreamerInfos()
158 B2FATAL(
"DataStoreStreamer : m_msghandler is NULL.");
162 TList* minilist = nullptr ;
163 for (
int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
166 for (
auto& iter : map) {
167 const TClass* entryClass = iter.second.objClass;
168 TVirtualStreamerInfo* vinfo = entryClass->GetStreamerInfo();
169 B2INFO(
"Recording StreamerInfo : durability " << durability <<
" : Class Name " << entryClass->GetName());
170 if (!minilist) minilist =
new TList();
171 minilist->Add(vinfo);
178 m_msghandler->add(minilist,
"StreamerInfo");
180 EvtMessage* msg = m_msghandler->encode_msg(MSG_STREAMERINFO);
181 (msg->
header())->nObjects = 1;
182 (msg->
header())->nArrays = 0;
185 m_streamerinfo_size = *((
int*)(msg->
buffer()));
188 if (m_streamerinfo_size > 0) {
189 B2INFO(
"Get StreamerInfo from DataStore : " << m_streamerinfo_size <<
"bytes");
190 if (m_streamerinfo !=
nullptr) {
191 B2FATAL(
"getStreamerInfo() is called twice in the same run ");
193 m_streamerinfo =
new char[ m_streamerinfo_size ];
195 memcpy(m_streamerinfo, msg->
buffer(), m_streamerinfo_size);
197 B2FATAL(
"Invalid size of StreamerInfo : " << m_streamerinfo_size <<
"bytes");
202 B2FATAL(
"Failed to get StreamerInfo : ");
Stream/restore DataStore objects to/from EvtMessage.
EDurability
Durability types.
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Class to manage streamed object.
EvtHeader * header()
Get pointer to EvtHeader.
char * buffer()
Get buffer address.
A class to encode/decode an EvtMessage.
A class to manage I/O for a chain of blocked files.
Output module for sequential ROOT I/O.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.