Belle II Software  release-05-02-19
StorageSerializer.cc
1 //+
2 // File : StorageSerializer.cc
3 // Description : Sequential ROOT output module for pbasf2
4 //
5 // Author : Tomoyuki Konno, Tokyo Metropolitan University
6 // Date : 13 - Aug - 2010
7 // 6 - Sep - 2012, Use of DataStoreStreamer, clean up
8 // 9 - Dec - 2013, Modification for DAQ use
9 //-
10 
11 #include <daq/storage/modules/StorageSerializer.h>
12 
13 #include <framework/datastore/DataStore.h>
14 #include <framework/datastore/StoreObjPtr.h>
15 #include <framework/dataobjects/EventMetaData.h>
16 
17 #include <iostream>
18 #include <TList.h>
19 #include <TClass.h>
20 
21 using namespace std;
22 using namespace Belle2;
23 
24 //-----------------------------------------------------------------
25 // Register the Module
26 //-----------------------------------------------------------------
27 REG_MODULE(StorageSerializer)
28 
29 //-----------------------------------------------------------------
30 // Implementation
31 //-----------------------------------------------------------------
32 
34 {
35  //Set module properties
36  setDescription("Storage serializer module");
37 
38  //Parameter definition
39  addParam("compressionLevel", m_compressionLevel, "Compression Level", 0);
40  addParam("OutputBufferName", m_obuf_name, "Output buffer name", string(""));
41  addParam("OutputBufferSize", m_obuf_size, "Output buffer size", 10);
42  addParam("NodeID", m_nodeid, "Node(subsystem) ID", 0);
43  B2DEBUG(100, "StorageSerializer: Constructor done.");
44 }
45 
46 
47 StorageSerializerModule::~StorageSerializerModule() { }
48 
49 void StorageSerializerModule::initialize()
50 {
51  m_msghandler = new MsgHandler(m_compressionLevel);
52  m_streamer = new DataStoreStreamer(m_compressionLevel);
53  m_expno = m_runno = -1;
54  m_count = m_count_0 = 0;
55  if (m_obuf_name.size() > 0 && m_obuf_size > 0) {
56  m_obuf.open(m_obuf_name.c_str(), m_obuf_size * 1000000);
57  } else {
58  B2FATAL("Failed to load arguments for shared buffer (" <<
59  m_obuf_name.c_str() << ":" << m_obuf_size << ")");
60  }
61  std::cout << "[DEBUG] StorageSerializer: initialized." << std::endl;
62 }
63 
64 
65 void StorageSerializerModule::beginRun()
66 {
67  B2INFO("StorageSerializer: beginRun called.");
68 }
69 
70 int StorageSerializerModule::writeStreamerInfos()
71 {
72  TList* minilist = 0 ;
73  for (int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
74  DataStore::StoreEntryMap& map = DataStore::Instance().getStoreEntryMap(DataStore::EDurability(durability));
75  for (DataStore::StoreEntryIter iter = map.begin(); iter != map.end(); ++iter) {
76  const TClass* entryClass = iter->second.objClass;
77  std::cout << "Recording StreamerInfo : durability " << durability
78  << " : Class Name " << entryClass->GetName() << std::endl;
79  TVirtualStreamerInfo* vinfo = entryClass->GetStreamerInfo();
80  if (!minilist) minilist = new TList();
81  minilist->Add((TObject*)vinfo);
82  }
83  }
84  if (minilist) {
85  m_msghandler->add(minilist, "StreamerInfo");
86  EvtMessage* msg = m_msghandler->encode_msg(MSG_STREAMERINFO);
87  (msg->header())->nObjects = 1; // No. of objects
88  (msg->header())->nArrays = 0; // No. of arrays
89  int nword = (msg->size() - 1) / 4 + 1;
90  m_obuf.write((int*)msg->buffer(), nword, false, 0, true);
91  int size = msg->size();
92  B2INFO("Wrote StreamerInfo to a file : " << size << "bytes");
93  delete msg;
94  delete minilist;
95  return size;
96  } else {
97  B2WARNING("No StreamerInfo in memory");
98  }
99  return 0;
100 }
101 
102 void StorageSerializerModule::event()
103 {
104  StoreObjPtr<EventMetaData> evtmetadata;
105  unsigned int expno = evtmetadata->getExperiment();
106  unsigned int runno = evtmetadata->getRun();
107  unsigned int subno = 0;//evtmetadata->getRun() & 0xFF;
108  m_obuf.lock();
109  SharedEventBuffer::Header* header = m_obuf.getHeader();
110  struct dataheader {
111  int nword;
112  int type;
113  unsigned int expno;
114  unsigned int runno;
115  } hd;
116  if (header->runno < runno || header->expno < expno) {
117  m_count = 0;
118  B2INFO("New run detected: expno = " << expno << " runno = "
119  << runno << " subno = " << subno);
120  header->expno = expno;
121  header->runno = runno;
122  header->subno = subno;
123  hd.nword = 4;
124  hd.type = MSG_STREAMERINFO;
125  hd.expno = expno;
126  hd.runno = runno;
127  m_obuf.write((int*)&hd, hd.nword, false, 0, true);
128  m_nbyte = writeStreamerInfos();
129  }
130  hd.nword = 4;
131  hd.type = MSG_EVENT;
132  hd.expno = expno;
133  hd.runno = runno;
134  m_obuf.write((int*)&hd, hd.nword, false, 0, true);
135  EvtMessage* msg = m_streamer->streamDataStore(DataStore::c_Event);
136  int nword = (msg->size() - 1) / 4 + 1;
137  m_obuf.write((int*)msg->buffer(), nword, false, 0, true);
138  m_obuf.unlock();
139  m_nbyte += msg->size();
140  delete msg;
141  if (m_count < 10000 && (m_count < 10 || (m_count > 10 && m_count < 100 && m_count % 10 == 0) ||
142  (m_count > 100 && m_count < 1000 && m_count % 100 == 0) ||
143  (m_count > 1000 && m_count < 10000 && m_count % 1000 == 0))) {
144  std::cout << "[DEBUG] Storage count = " << m_count << " nword = " << nword << std::endl;
145  }
146  m_count++;
147 }
148 
149 void StorageSerializerModule::endRun()
150 {
151  std::cout << "[DEBUG] StorageSerializer : endRun called" << std::endl;
152 }
153 
154 
155 void StorageSerializerModule::terminate()
156 {
157  std::cout << "[DEBUG] terminate called" << std::endl;
158 }
159 
Belle2::DataStore::StoreEntryIter
StoreEntryMap::iterator StoreEntryIter
Iterator for a StoreEntry map.
Definition: DataStore.h:90
Belle2::DataStore::StoreEntryMap
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Definition: DataStore.h:89
REG_MODULE
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:652
Belle2::EvtMessage
Class to manage streamed object.
Definition: EvtMessage.h:60
Belle2::SharedEventBuffer::Header
Definition: SharedEventBuffer.h:17
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2::EvtMessage::size
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:95
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::StoreObjPtr
Type-safe access to single objects in the data store.
Definition: ParticleList.h:33
Belle2::EvtMessage::header
EvtHeader * header()
Get pointer to EvtHeader.
Definition: EvtMessage.cc:162
Belle2::StorageSerializerModule
Class definition for the output module of Sequential ROOT I/O.
Definition: StorageSerializer.h:28
Belle2::EvtMessage::buffer
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:77
Belle2::DataStoreStreamer
Stream/restore DataStore objects to/from EvtMessage.
Definition: DataStoreStreamer.h:33
Belle2::MsgHandler
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:104
Belle2::DataStore::EDurability
EDurability
Durability types.
Definition: DataStore.h:60