9 #include <daq/storage/modules/StorageSerializer.h> 
   11 #include <framework/datastore/DataStore.h> 
   12 #include <framework/datastore/StoreObjPtr.h> 
   13 #include <framework/dataobjects/EventMetaData.h> 
   // Module set-up statements: a description string followed by parameter
   // registrations.
   // NOTE(review): the enclosing function signature is not present in this
   // listing; the trailing log message ("Constructor done.") suggests this is the
   // StorageSerializerModule constructor body -- confirm against the full file.
   34   setDescription(
"Storage serializer module");
 
   // Each addParam call ties a parameter name to a member variable and a
   // description string. The final argument is presumably the default value --
   // TODO confirm against addParam's declaration.
   37   addParam(
"compressionLevel", m_compressionLevel, 
"Compression Level", 0);
 
   // Name of the output buffer; an empty string is passed as the last argument.
   38   addParam(
"OutputBufferName", m_obuf_name, 
"Output buffer name", 
string(
""));
 
   // Output buffer size. initialize() multiplies this value by 1000000 before
   // opening the buffer, so the unit here is presumably megabytes -- TODO confirm.
   39   addParam(
"OutputBufferSize", m_obuf_size, 
"Output buffer size", 10);
 
   40   addParam(
"NodeID", m_nodeid, 
"Node(subsystem) ID", 0);
 
   41   B2DEBUG(100, 
"StorageSerializer: Constructor done.");
 
   // Destructor with an empty body: nothing is released here.
   // NOTE(review): initialize() allocates m_msghandler with `new`; no matching
   // release is visible in this listing -- confirm where it is freed.
   45 StorageSerializerModule::~StorageSerializerModule() { }
 
   // Prepares the module for processing: creates the message handler, resets
   // the run/experiment bookkeeping and event counters, and opens the output
   // buffer when both a name and a positive size were configured.
   // NOTE(review): several original lines (braces, line 55) are absent from this
   // listing, so the exact branch structure cannot be read from here.
   47 void StorageSerializerModule::initialize()
 
   // NOTE(review): raw owning pointer; no delete is visible in this listing.
   49   m_msghandler = 
new MsgHandler(m_compressionLevel);
 
   // -1 marks "no experiment/run recorded yet" -- presumably; TODO confirm how
   // m_expno / m_runno are consumed.
   51   m_expno = m_runno = -1;
 
   52   m_count = m_count_0 = 0;
 
   // The buffer is opened only when a non-empty name and a positive size are set.
   53   if (m_obuf_name.size() > 0 && m_obuf_size > 0) {
 
   // The configured size is scaled by 1000000 before being passed to open().
   54     m_obuf.open(m_obuf_name.c_str(), m_obuf_size * 1000000);
 
   // NOTE(review): presumably the `else` branch of the check above (original
   // line 55 is missing here); reports the rejected name and size.
   56     B2FATAL(
"Failed to load arguments for shared buffer (" <<
 
   57             m_obuf_name.c_str() << 
":" << m_obuf_size << 
")");
 
   // NOTE(review): prints through std::cout with a hand-written "[DEBUG]" tag,
   // whereas other code in this file uses the B2DEBUG/B2INFO macros.
   59   std::cout << 
"[DEBUG] StorageSerializer: initialized." << std::endl;
 
   // Begin-of-run hook. In the lines visible here it only emits an info-level
   // log message.
   63 void StorageSerializerModule::beginRun()
 
   65   B2INFO(
"StorageSerializer: beginRun called.");
 
   // Collects the ROOT streamer info of the classes found in the data store,
   // packs the collection into a MSG_STREAMERINFO message and writes that
   // message to the output buffer.
   // Returns an int; the return statements are on lines not present in this
   // listing (event() stores the result in m_nbyte).
   // NOTE(review): the inner loop header, the declaration of `minilist`, the
   // branch conditions and all closing braces are missing from this listing.
   68 int StorageSerializerModule::writeStreamerInfos()
 
   // Visit every durability type known to the DataStore.
   71   for (
int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
 
   // `iter` is declared on a line not shown; it yields entries whose `second`
   // member carries the entry's TClass.
   74       const TClass* entryClass = iter->second.objClass;
 
   75       std::cout << 
"Recording StreamerInfo : durability " << durability
 
   76                 << 
" : Class Name " << entryClass->GetName() << std::endl;
 
   77       TVirtualStreamerInfo* vinfo = entryClass->GetStreamerInfo();
 
   // The list is created lazily, on the first entry encountered.
   78       if (!minilist) minilist  =  
new TList();
 
   // NOTE(review): reinterpret_cast performs no pointer adjustment. If
   // TVirtualStreamerInfo derives from TObject, an implicit conversion or
   // static_cast would be the safer spelling -- confirm against ROOT's class
   // hierarchy.
   79       minilist->Add(
reinterpret_cast<TObject*
>(vinfo));
 
   // Hand the collected list to the message handler under the name
   // "StreamerInfo" and encode it as a MSG_STREAMERINFO message.
   83     m_msghandler->add(minilist, 
"StreamerInfo");
 
   84     EvtMessage* msg = m_msghandler->encode_msg(MSG_STREAMERINFO);
 
   // The message carries exactly one object (the list) and no arrays.
   85     (msg->
header())->nObjects = 1;       
 
   86     (msg->
header())->nArrays = 0;    
 
   // Round the size reported by msg->size() up to a whole number of 4-unit
   // words (presumably bytes -> 4-byte ints, given the int* cast below).
   87     int nword = (msg->
size() - 1) / 4 + 1;
 
   // NOTE(review): the meaning of the trailing (false, 0, true) arguments is
   // defined by the buffer class, which is not visible here.
   88     m_obuf.write((
int*)msg->
buffer(), nword, 
false, 0, 
true);
 
   89     int size = msg->
size();
 
   // NOTE(review): the message lacks a space before "bytes", so the number and
   // the unit run together in the log output.
   90     B2INFO(
"Wrote StreamerInfo to a file : " << size << 
"bytes");
 
   // NOTE(review): presumably the branch taken when no list was built; the
   // condition is on a line not shown.
   95     B2WARNING(
"No StreamerInfo in memory");
 
  // Per-event processing: detects a change of run/experiment, writes streamer
  // info on such a change, then streams the event-durability DataStore content
  // into the output buffer and prints a throttled progress line.
  // NOTE(review): `evtmetadata`, `header`, `hd` and `m_streamer` are declared or
  // filled on lines not present in this listing, as are all closing braces.
  100 void StorageSerializerModule::event()
 
  103   unsigned int expno = evtmetadata->getExperiment();
 
  104   unsigned int runno = evtmetadata->getRun();
 
  // The sub-run number is fixed to 0 here.
  105   unsigned int subno = 0;
 
  // A "new run" is flagged when either stored number is smaller than the
  // current one. A run number that decreases within the same experiment would
  // not trigger this branch.
  114   if (header->runno < runno || header->expno < expno) {
 
  116     B2INFO(
"New run detected: expno = " << expno << 
" runno = " 
  117            << runno << 
" subno = " << subno);
 
  // Remember the new identifiers in the header record.
  118     header->expno = expno;
 
  119     header->runno = runno;
 
  120     header->subno = subno;
 
  // Write a header tagged MSG_STREAMERINFO, followed by the streamer info
  // message itself.
  122     hd.type = MSG_STREAMERINFO;
 
  125     m_obuf.write((
int*)&hd, hd.nword, 
false, 0, 
true);
 
  // The byte counter is restarted from writeStreamerInfos()'s return value.
  126     m_nbyte = writeStreamerInfos();
 
  // Write the header record for the event (the lines that fill `hd` for this
  // write are not shown), then the streamed event data.
  132   m_obuf.write((
int*)&hd, hd.nword, 
false, 0, 
true);
 
  133   EvtMessage* msg = m_streamer->streamDataStore(DataStore::c_Event);
 
  // Round the message size up to whole 4-unit words, as in
  // writeStreamerInfos().
  134   int nword = (msg->
size() - 1) / 4 + 1;
 
  135   m_obuf.write((
int*)msg->
buffer(), nword, 
false, 0, 
true);
 
  137   m_nbyte += msg->
size();
 
  // Throttled progress print: every count below 10, then multiples of 10 up to
  // 100, multiples of 100 up to 1000 and multiples of 1000 up to 10000.
  // NOTE(review): the strict `>` / `<` comparisons exclude counts of exactly
  // 10, 100 and 1000 -- confirm whether that is intended.
  139   if (m_count < 10000 && (m_count < 10 || (m_count > 10 && m_count < 100 && m_count % 10 == 0) ||
 
  140                           (m_count > 100 && m_count < 1000 && m_count % 100 == 0) ||
 
  141                           (m_count > 1000 && m_count < 10000 && m_count % 1000 == 0))) {
 
  142     std::cout << 
"[DEBUG] Storage count = " << m_count << 
" nword = " << nword << std::endl;
 
  // End-of-run hook. In the lines visible here it only prints a debug line to
  // standard output.
  147 void StorageSerializerModule::endRun()
 
  149   std::cout << 
"[DEBUG] StorageSerializer : endRun called" << std::endl;
 
  // Termination hook. In the lines visible here it only prints a debug line to
  // standard output.
  // NOTE(review): no release of m_msghandler (allocated in initialize()) is
  // visible in this listing -- confirm whether it is freed elsewhere.
  153 void StorageSerializerModule::terminate()
 
  155   std::cout << 
"[DEBUG] terminate called" << std::endl;
 
Stream/restore DataStore objects to/from EvtMessage.
StoreEntryMap::iterator StoreEntryIter
Iterator for a StoreEntry map.
EDurability
Durability types.
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Class to manage streamed objects.
EvtHeader * header()
Get pointer to EvtHeader.
char * buffer()
Get buffer address.
int size() const
Get size of message including headers.
A class to encode/decode an EvtMessage.
Class definition for the output module of Sequential ROOT I/O.
Type-safe access to single objects in the data store.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.