Belle II Software  release-08-01-10
StorageSerializer.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/storage/modules/StorageSerializer.h>
10 
11 #include <framework/datastore/DataStore.h>
12 #include <framework/datastore/StoreObjPtr.h>
13 #include <framework/dataobjects/EventMetaData.h>
14 
15 #include <iostream>
16 #include <TList.h>
17 #include <TClass.h>
18 
19 using namespace std;
20 using namespace Belle2;
21 
22 //-----------------------------------------------------------------
23 // Register the Module
24 //-----------------------------------------------------------------
25 REG_MODULE(StorageSerializer)
26 
27 //-----------------------------------------------------------------
28 // Implementation
29 //-----------------------------------------------------------------
30 
32 {
33  //Set module properties
34  setDescription("Storage serializer module");
35 
36  //Parameter definition
37  addParam("compressionLevel", m_compressionLevel, "Compression Level", 0);
38  addParam("OutputBufferName", m_obuf_name, "Output buffer name", string(""));
39  addParam("OutputBufferSize", m_obuf_size, "Output buffer size", 10);
40  addParam("NodeID", m_nodeid, "Node(subsystem) ID", 0);
41  B2DEBUG(100, "StorageSerializer: Constructor done.");
42 }
43 
44 
45 StorageSerializerModule::~StorageSerializerModule() { }
46 
47 void StorageSerializerModule::initialize()
48 {
49  m_msghandler = new MsgHandler(m_compressionLevel);
50  m_streamer = new DataStoreStreamer(m_compressionLevel);
51  m_expno = m_runno = -1;
52  m_count = m_count_0 = 0;
53  if (m_obuf_name.size() > 0 && m_obuf_size > 0) {
54  m_obuf.open(m_obuf_name.c_str(), m_obuf_size * 1000000);
55  } else {
56  B2FATAL("Failed to load arguments for shared buffer (" <<
57  m_obuf_name.c_str() << ":" << m_obuf_size << ")");
58  }
59  std::cout << "[DEBUG] StorageSerializer: initialized." << std::endl;
60 }
61 
62 
63 void StorageSerializerModule::beginRun()
64 {
65  B2INFO("StorageSerializer: beginRun called.");
66 }
67 
68 int StorageSerializerModule::writeStreamerInfos()
69 {
70  TList* minilist = 0 ;
71  for (int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
72  DataStore::StoreEntryMap& map = DataStore::Instance().getStoreEntryMap(DataStore::EDurability(durability));
73  for (DataStore::StoreEntryIter iter = map.begin(); iter != map.end(); ++iter) {
74  const TClass* entryClass = iter->second.objClass;
75  std::cout << "Recording StreamerInfo : durability " << durability
76  << " : Class Name " << entryClass->GetName() << std::endl;
77  TVirtualStreamerInfo* vinfo = entryClass->GetStreamerInfo();
78  if (!minilist) minilist = new TList();
79  minilist->Add(reinterpret_cast<TObject*>(vinfo));
80  }
81  }
82  if (minilist) {
83  m_msghandler->add(minilist, "StreamerInfo");
84  EvtMessage* msg = m_msghandler->encode_msg(MSG_STREAMERINFO);
85  (msg->header())->nObjects = 1; // No. of objects
86  (msg->header())->nArrays = 0; // No. of arrays
87  int nword = (msg->size() - 1) / 4 + 1;
88  m_obuf.write((int*)msg->buffer(), nword, false, 0, true);
89  int size = msg->size();
90  B2INFO("Wrote StreamerInfo to a file : " << size << "bytes");
91  delete msg;
92  delete minilist;
93  return size;
94  } else {
95  B2WARNING("No StreamerInfo in memory");
96  }
97  return 0;
98 }
99 
100 void StorageSerializerModule::event()
101 {
102  StoreObjPtr<EventMetaData> evtmetadata;
103  unsigned int expno = evtmetadata->getExperiment();
104  unsigned int runno = evtmetadata->getRun();
105  unsigned int subno = 0;//evtmetadata->getRun() & 0xFF;
106  m_obuf.lock();
107  SharedEventBuffer::Header* header = m_obuf.getHeader();
108  struct dataheader {
109  int nword;
110  int type;
111  unsigned int expno;
112  unsigned int runno;
113  } hd;
114  if (header->runno < runno || header->expno < expno) {
115  m_count = 0;
116  B2INFO("New run detected: expno = " << expno << " runno = "
117  << runno << " subno = " << subno);
118  header->expno = expno;
119  header->runno = runno;
120  header->subno = subno;
121  hd.nword = 4;
122  hd.type = MSG_STREAMERINFO;
123  hd.expno = expno;
124  hd.runno = runno;
125  m_obuf.write((int*)&hd, hd.nword, false, 0, true);
126  m_nbyte = writeStreamerInfos();
127  }
128  hd.nword = 4;
129  hd.type = MSG_EVENT;
130  hd.expno = expno;
131  hd.runno = runno;
132  m_obuf.write((int*)&hd, hd.nword, false, 0, true);
133  EvtMessage* msg = m_streamer->streamDataStore(DataStore::c_Event);
134  int nword = (msg->size() - 1) / 4 + 1;
135  m_obuf.write((int*)msg->buffer(), nword, false, 0, true);
136  m_obuf.unlock();
137  m_nbyte += msg->size();
138  delete msg;
139  if (m_count < 10000 && (m_count < 10 || (m_count > 10 && m_count < 100 && m_count % 10 == 0) ||
140  (m_count > 100 && m_count < 1000 && m_count % 100 == 0) ||
141  (m_count > 1000 && m_count < 10000 && m_count % 1000 == 0))) {
142  std::cout << "[DEBUG] Storage count = " << m_count << " nword = " << nword << std::endl;
143  }
144  m_count++;
145 }
146 
147 void StorageSerializerModule::endRun()
148 {
149  std::cout << "[DEBUG] StorageSerializer : endRun called" << std::endl;
150 }
151 
152 
153 void StorageSerializerModule::terminate()
154 {
155  std::cout << "[DEBUG] terminate called" << std::endl;
156 }
157 
Stream/restore DataStore objects to/from EvtMessage.
StoreEntryMap::iterator StoreEntryIter
Iterator for a StoreEntry map.
Definition: DataStore.h:88
EDurability
Durability types.
Definition: DataStore.h:58
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Definition: DataStore.h:87
Class to manage streamed object.
Definition: EvtMessage.h:59
EvtHeader * header()
Get pointer to EvtHeader.
Definition: EvtMessage.cc:161
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:76
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:94
Base class for Modules.
Definition: Module.h:72
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
Class definition for the output module of Sequential ROOT I/O.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.