Belle II Software  release-08-01-10
DataStoreStreamer.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/DataStoreStreamer.h>
10 #include <framework/pcore/MsgHandler.h>
11 #include <framework/pcore/Mergeable.h>
12 
13 #include <framework/datastore/DataStore.h>
14 #include <framework/logging/Logger.h>
15 #include <framework/datastore/StoreObjPtr.h>
16 #include <framework/dataobjects/EventMetaData.h>
17 
18 #include <TClonesArray.h>
19 #include <TClass.h>
20 #include <TStreamerInfo.h>
21 #include <TList.h>
22 
23 #include <unistd.h>
24 #include <cstdio> // for NULL, printf
25 
26 #include <algorithm>
27 #include <queue>
28 
29 using namespace Belle2;
30 
31 namespace {
32 // Thread related
33  static DataStoreStreamer* s_streamer = nullptr;
34 
35  static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
36  static pthread_mutex_t mutex_thread[DataStoreStreamer::c_maxThreads];
37 //static char* evtbuf_thread[DataStoreStreamer::c_maxThreads];
38  static int msg_compLevel[DataStoreStreamer::c_maxThreads];
39 
40  static std::queue<char*> my_evtbuf[DataStoreStreamer::c_maxThreads];
41 
42  static std::queue<int> my_nobjs;
43  static std::queue<int> my_narrays;
44  static std::queue<std::vector<TObject*>> my_objlist;
45  static std::queue<std::vector<std::string>> my_namelist;
46 
47  static int my_decstat[DataStoreStreamer::c_maxThreads];
48 };
49 
50 void* RunDecodeEvtMessage(void* targ)
51 {
52  auto* id = (int*)targ;
53  // DataStoreStreamer::Instance().decodeEvtMessage(*id);
54  s_streamer->decodeEvtMessage(*id);
55  return nullptr;
56 }
57 
58 DataStoreStreamer::DataStoreStreamer(int complevel, bool handleMergeable, int maxthread):
59  m_compressionLevel(complevel),
60  m_handleMergeable(handleMergeable),
61  m_initStatus(0),
62  m_maxthread(maxthread),
63  m_threadin(0)
64  //, m_threadout(0)
65 {
66  if ((unsigned int)m_maxthread > c_maxThreads) {
67  B2FATAL("DataStoreStreamer : Too many threads " << m_maxthread);
69  }
71 
72  if (m_maxthread > 0) {
73  // Run decoder threads as sustainable detached threads
74  pthread_attr_t thread_attr;
75  pthread_attr_init(&thread_attr);
76  // pthread_attr_setschedpolicy(&thread_attr , SCHED_FIFO);
77  // pthread_attr_setdetachstate(&thread_attr , PTHREAD_CREATE_DETACHED);
78  for (int i = 0; i < m_maxthread; i++) {
79  my_decstat[i] = 0;
80  m_pt[i] = (pthread_t)0;
81  m_id[i] = i;
82  // m_pmsghandler[i] = new MsgHandler ( m_compressionLevel );
83  mutex_thread[i] = PTHREAD_MUTEX_INITIALIZER;
84  msg_compLevel[i] = m_compressionLevel;
85  }
86  for (int i = 0; i < m_maxthread; i++) {
87  // args.evtbuf = m_evtbuf[i];
88  pthread_create(&m_pt[i], nullptr, RunDecodeEvtMessage, (void*)&m_id[i]);
89  }
90  pthread_attr_destroy(&thread_attr);
91  }
92  s_streamer = this;
93 }
94 
95 // Destructor
97 {
98  delete m_msghandler;
99 }
100 
101 void DataStoreStreamer::setStreamingObjects(const std::vector<std::string>& objlist)
102 {
103  m_streamobjnames = objlist;
104 }
105 
106 bool DataStoreStreamer::isMergeable(const TObject* object)
107 {
108  return object->InheritsFrom(Mergeable::Class());
109 }
110 
112 {
113  static_cast<Mergeable*>(object)->clear();
114 }
115 void DataStoreStreamer::mergeIntoExisting(TObject* existing, const TObject* received)
116 {
117  auto* existingObject = static_cast<Mergeable*>(existing);
118  existingObject->merge(static_cast<const Mergeable*>(received));
119 }
121 {
123  for (auto& entryPair : map) {
124  DataStore::StoreEntry& entry = entryPair.second;
125  if (isMergeable(entry.object)) {
126  static_cast<Mergeable*>(entry.object)->removeSideEffects();
127  static_cast<Mergeable*>(entry.object)->clear();
128  }
129  }
130 }
131 
132 
133 // Stream DataStore
134 EvtMessage* DataStoreStreamer::streamDataStore(bool addPersistentDurability, bool streamTransientObjects)
135 {
136  // Clear Message Handler
137  m_msghandler->clear();
138 
139  // Stream objects (for all included durabilities)
140  int narrays = 0;
141  int nobjs = 0;
143  while (true) {
144  auto& map = DataStore::Instance().getStoreEntryMap(durability);
145  for (auto &[name, entry] : map) {
146  //skip transient objects/arrays?
147  if (!streamTransientObjects and entry.dontWriteOut)
148  continue;
149 
150  //skip objects not in the list
151  if (!m_streamobjnames.empty()) {
152  auto pos = std::find(m_streamobjnames.begin(), m_streamobjnames.end(), name);
153  if (pos == m_streamobjnames.end()) continue;
154  }
155 
156  //verify that bits are unused
157  if (entry.object->TestBit(c_IsTransient)) {
158  B2FATAL("DataStoreStreamer::c_IsTransient bit is set for " << name << "!");
159  }
160  if (entry.object->TestBit(c_IsNull)) {
161  B2FATAL("DataStoreStreamer::c_IsNull bit is set for " << name << "!");
162  }
163  if (entry.object->TestBit(c_PersistentDurability)) {
164  B2FATAL("DataStoreStreamer::c_PersistentDurability bit is set for " << name << "!");
165  }
166  //verify TObject bits are serialised
167  if (entry.object->IsA()->CanIgnoreTObjectStreamer()) {
168  B2FATAL("TObject streamers disabled for " << name << "!");
169  }
170  //store some information in TObject bits to ensure consistent state even if entry.ptr is NULL
171  entry.object->SetBit(c_IsTransient, entry.dontWriteOut);
172  entry.object->SetBit(c_IsNull, (entry.ptr == nullptr));
173  entry.object->SetBit(c_PersistentDurability, (durability == DataStore::c_Persistent));
174  m_msghandler->add(entry.object, name);
175  B2DEBUG(100, "adding item " << name);
176 
177  if (entry.isArray)
178  narrays++;
179  else
180  nobjs++;
181 
182  //reset bits (are checked to be false when streaming the object)
183  entry.object->SetBit(c_IsTransient, false);
184  entry.object->SetBit(c_IsNull, false);
185  entry.object->SetBit(c_PersistentDurability, false);
186 
187  bool merge = m_handleMergeable and entry.ptr != nullptr and isMergeable(entry.object);
188  if (merge)
189  clearMergeable(entry.object);
190  }
191 
192  if (addPersistentDurability and durability == DataStore::c_Event)
193  durability = DataStore::c_Persistent;
194  else
195  break;
196  }
197 
198  // Encode EvtMessage
199  EvtMessage* msg = m_msghandler->encode_msg(MSG_EVENT);
200  (msg->header())->nObjects = nobjs;
201  (msg->header())->nArrays = narrays;
202 
203  return msg;
204 }
205 
206 // Restore DataStore
208 {
209  if (msg->type() == MSG_TERMINATE) {
210  B2INFO("Got termination message. Exitting...");
211  //msg doesn't really contain data, set EventMetaData to something equivalent
212  StoreObjPtr<EventMetaData> eventMetaData;
213  if (m_initStatus == 0 && DataStore::Instance().getInitializeActive())
214  eventMetaData.registerInDataStore();
215  eventMetaData.create();
216  eventMetaData->setEndOfData();
217  } else {
218  // Clear Message Handler
219  m_msghandler->clear();
220 
221  // List of objects to be restored
222  std::vector<TObject*> objlist;
223  std::vector<std::string> namelist;
224 
225  // Decode EvtMessage
226  m_msghandler->decode_msg(msg, objlist, namelist);
227  int nobjs = (msg->header())->nObjects;
228  int narrays = (msg->header())->nArrays;
229  if (unsigned(nobjs + narrays) != objlist.size())
230  B2WARNING("restoreDataStore(): inconsistent #objects/#arrays in header");
231 
232  // Restore objects in DataStore
233  for (int i = 0; i < nobjs + narrays; i++) {
234  TObject* obj = objlist.at(i);
235  bool array = (dynamic_cast<TClonesArray*>(obj) != nullptr);
236  if (obj != nullptr) {
237 
238  // Read and Build StreamerInfo
239  if (msg->type() == MSG_STREAMERINFO) {
240  restoreStreamerInfos(static_cast<TList*>(obj));
241  return 0;
242  }
243 
244  bool isPersistent = obj->TestBit(c_PersistentDurability);
245  DataStore::EDurability durability = isPersistent ? (DataStore::c_Persistent) : (DataStore::c_Event);
246  TClass* cl = obj->IsA();
247  if (array)
248  cl = static_cast<TClonesArray*>(obj)->GetClass();
249  if (m_initStatus == 0 && DataStore::Instance().getInitializeActive()) { //are we called by the module's initialize() function?
250  auto flags = obj->TestBit(c_IsTransient) ? DataStore::c_DontWriteOut : DataStore::c_WriteOut;
251  DataStore::Instance().registerEntry(namelist.at(i), durability, cl, array, flags);
252  }
253  DataStore::StoreEntry* entry = DataStore::Instance().getEntry(StoreAccessorBase(namelist.at(i), durability, cl, array));
254  B2ASSERT("Can not find a data store entry with the name " << namelist.at(i) << ". Did you maybe forget to register it?", entry);
255  //only restore object if it is valid for current event
256  bool ptrIsNULL = obj->TestBit(c_IsNull);
257  if (!ptrIsNULL) {
258  bool merge = m_handleMergeable and !array and entry->ptr != nullptr and isMergeable(obj);
259  if (merge) {
260  B2DEBUG(100, "Will now merge " << namelist.at(i));
261 
262  mergeIntoExisting(entry->ptr, obj);
263  delete obj;
264  } else {
265  //note: replace=true
266  DataStore::Instance().createObject(obj, true,
267  StoreAccessorBase(namelist.at(i), durability, cl, array));
268 
269  //reset bits of object in DataStore (are checked to be false when streaming the object)
270  obj->SetBit(c_IsTransient, false);
271  obj->SetBit(c_IsNull, false);
272  obj->SetBit(c_PersistentDurability, false);
273  }
274  // B2DEBUG(100, "restoreDS: " << (array ? "Array" : "Object") << ": " << namelist.at(i) << " stored");
275  } else {
276  //usually entry should already be invalidated, but e.g. for CrashHandler, it might not be.
277  if (entry->ptr)
278  entry->invalidate();
279  //not stored, clean up
280  delete obj;
281  }
282  } else {
283  //DataStore always has non-NULL content (wether they're available is a different matter)
284  B2ERROR("restoreDS: " << (array ? "Array" : "Object") << ": " << namelist.at(i) << " is NULL!");
285  }
286  }
287 
288  }
289  // Return with normal exit status
290  if (m_initStatus == 0) m_initStatus = 1;
291  return 0;
292 }
293 
294 // Parallel EvtMessage Destreamer implemented using thread
295 
297 {
298  // EOF case
299  if (evtbuf == nullptr) {
300  printf("queueEvtMessage : NULL evtbuf detected. \n");
301  for (int i = 0; i < m_maxthread; i++) {
302  while (my_evtbuf[i].size() >= c_maxQueueDepth) usleep(10);
303  my_evtbuf[m_threadin].push(evtbuf);
304  }
305  return 0;
306  }
307 
308  // Put the event buffer in the queue of current thread
309  for (;;) {
310  if (my_evtbuf[m_threadin].size() < c_maxQueueDepth) {
311  pthread_mutex_lock(&mutex_thread[m_threadin]);
312  my_evtbuf[m_threadin].push(evtbuf);
313  pthread_mutex_unlock(&mutex_thread[m_threadin]);
314  break;
315  }
316  usleep(20);
317  }
318 
319  // Switch to next thread
320  my_decstat[m_threadin] = 1; // Event queued for decoding
321  m_threadin++;
322  if (m_threadin >= m_maxthread) m_threadin = 0;
323  return 1;
324 }
325 
327 {
328  printf("decodeEvtMessge : started. Thread ID = %d\n", id);
329  // Clear Message Handler
330  // m_msghandler->clear();
331  // MsgHandler* msghandler = new MsgHandler(m_compressionLevel);
332  // MsgHandler* msghandler = m_pmsghandler[id];
333 
334  pthread_mutex_lock(&mutex); // Lock thread
335  MsgHandler msghandler(msg_compLevel[id]);
336  pthread_mutex_unlock(&mutex); // Unlock thread
337 
338  for (;;) {
339  // Clear message handler event by event
340  // MsgHandler msghandler(m_compressionLevel);
341  // m_pmsghandler[id]->clear();
342  // Wait for event in queue becomes ready
343  msghandler.clear();
344 
345  while (my_evtbuf[id].size() <= 0) usleep(10);
346 
347  // Pick up event buffer
348  pthread_mutex_lock(&mutex_thread[id]);
349  int nqueue = my_evtbuf[id].size();
350  if (nqueue <= 0) printf("!!!!! Nqueue = %d\n", nqueue);
351  char* evtbuf = my_evtbuf[id].front(); my_evtbuf[id].pop();
352  pthread_mutex_unlock(&mutex_thread[id]);
353 
354  // In case of EOF
355  if (evtbuf == nullptr) {
356  printf("decodeEvtMessage: NULL evtbuf detected, nq = %d\n", nqueue);
357  my_nobjs.push(-1);
358  return nullptr;
359  }
360 
361  // Construct EvtMessage
362  auto* msg = new EvtMessage(evtbuf);
363 
364  // Decode EvtMessage into Objects
365  std::vector<TObject*> objlist;
366  std::vector<std::string> namelist;
367 
368  // pthread_mutex_lock(&mutex); // Lock test
369  msghandler.decode_msg(msg, objlist, namelist);
370  // pthread_mutex_unlock(&mutex); // Unlock test
371 
372 
373  // Queue them for the registration in DataStore
374  while (my_nobjs.size() >= c_maxQueueDepth) usleep(10);
375  pthread_mutex_lock(&mutex); // Lock queueing
376  my_objlist.push(objlist);
377  my_namelist.push(namelist);
378  my_nobjs.push((msg->header())->nObjects);
379  my_narrays.push((msg->header())->nArrays);
380  pthread_mutex_unlock(&mutex); // Unlock queueing
381 
382  // Release EvtMessage
383  delete msg;
384  delete[] evtbuf;
385 
386  // Preparation for next event
387  my_decstat[id] = 0; // Ready to read next event
388 
389  }
390 
391  return nullptr;
392 }
393 
395 {
396 // Wait for the queue to become ready
397  while (my_nobjs.empty()) usleep(10);
398 
399  // Register decoded objects in DataStore
400 
401  // Pick up event on the top and remove it from the queue
402  pthread_mutex_lock(&mutex);
403  int nobjs = my_nobjs.front(); my_nobjs.pop();
404  if (nobjs == -1) {
405  printf("restoreDataStore: EOF detected. exitting with status 0\n");
406  pthread_mutex_unlock(&mutex);
407  return 0;
408  }
409  int narrays = my_narrays.front(); my_narrays.pop();
410  std::vector<TObject*> objlist = my_objlist.front(); my_objlist.pop();
411  std::vector<std::string> namelist = my_namelist.front(); my_namelist.pop();
412  pthread_mutex_unlock(&mutex);
413 
414  // Restore objects in DataStore
415  //TODO refactor, just copied & pasted right now
416  for (int i = 0; i < nobjs + narrays; i++) {
417  // printf ( "restoring object %d = %s\n", i, namelist.at(i).c_str() );
418  bool array = (dynamic_cast<TClonesArray*>(objlist.at(i)) != nullptr);
419  if (objlist.at(i) != nullptr) {
420  TObject* obj = objlist.at(i);
421  bool isPersistent = obj->TestBit(c_PersistentDurability);
422  DataStore::EDurability durability = isPersistent ? (DataStore::c_Persistent) : (DataStore::c_Event);
423  TClass* cl = obj->IsA();
424  if (array)
425  cl = static_cast<TClonesArray*>(obj)->GetClass();
426  if (m_initStatus == 0 && DataStore::Instance().getInitializeActive()) { //are we called by the module's initialize() function?
427  auto flags = obj->TestBit(c_IsTransient) ? DataStore::c_DontWriteOut : DataStore::c_WriteOut;
428  DataStore::Instance().registerEntry(namelist.at(i), durability, cl, array, flags);
429  }
430  //only restore object if it is valid for current event
431  bool ptrIsNULL = obj->TestBit(c_IsNull);
432  if (!ptrIsNULL) {
433  DataStore::Instance().createObject(obj, true,
434  StoreAccessorBase(namelist.at(i), durability, cl, array));
435  B2DEBUG(100, "restoreDS: " << (array ? "Array" : "Object") << ": " << namelist.at(i) << " stored");
436 
437  //reset bits of object in DataStore (are checked to be false when streaming the object)
438  obj->SetBit(c_IsTransient, false);
439  obj->SetBit(c_IsNull, false);
440  obj->SetBit(c_PersistentDurability, false);
441  } else {
442  //not stored, clean up
443  delete obj;
444  }
445  } else {
446  //DataStore always has non-NULL content (wether they're available is a different matter)
447  B2ERROR("restoreDS: " << (array ? "Array" : "Object") << ": " << namelist.at(i) << " is NULL!");
448  }
449  }
450 
451  // printf ( "Objects restored in DataStore\n" );
452 
453  return 1;
454 }
455 
457 {
458  m_maxthread = maxthread;
459 }
460 
462 {
463  return m_maxthread;
464 }
465 
467 {
468  // printf ( "Decode thread %d = %d\n", m_threadin, m_done_decode[m_threadin] );
469  return (my_decstat[m_threadin]);
470 }
471 
473 {
474  // printf ( "Decode thread %d = %d\n", m_threadin, m_done_decode[m_threadin] );
476 }
477 
478 
480 {
481  //
482  // Copy from TSocket::RecvStreamerInfos()
483  //
484 
485  // TList *list = (TList*)mess->ReadObject(TList::Class());
486  TStreamerInfo* info;
487  TObjLink* lnk = list->FirstLink();
488 
489  std::vector<std::string> class_name;
490 
491  // First call BuildCheck for regular class
492  while (lnk) {
493  info = (TStreamerInfo*)lnk->GetObject();
494 
495  int ovlap = 0;
496  for (auto& itr : class_name) {
497  if (strcmp(itr.c_str(), info->GetName()) == 0) {
498  ovlap = 1;
499  B2DEBUG(100, "Regular Class Loop : The class " << info->GetName() << " has already appeared. Skipping...");
500  break;
501  }
502  }
503 
504  // If the same class is in the object, ignore it. ( Otherwise it causes error. )
505  if (ovlap == 0) {
506  std::string temp_classname = info->GetName();
507  class_name.push_back(temp_classname);
508 
509  TObject* element = info->GetElements()->UncheckedAt(0);
510  Bool_t isstl = element && strcmp("This", element->GetName()) == 0;
511  if (!isstl) {
512  info->BuildCheck();
513  // if (gDebug > 0)
514  B2INFO("importing TStreamerInfo: " << info->GetName() <<
515  " version = " << info->GetClassVersion());
516  }
517  }
518  lnk = lnk->Next();
519  }
520 
521 
522  class_name.clear();
523  // Then call BuildCheck for stl class
524  lnk = list->FirstLink();
525  while (lnk) {
526  info = (TStreamerInfo*)lnk->GetObject();
527 
528  int ovlap = 0;
529  for (auto& itr : class_name) {
530  if (strcmp(itr.c_str(), info->GetName()) == 0) {
531  ovlap = 1;
532  B2DEBUG(100, "STL Class Loop : The class " << info->GetName() << " has already appeared. Skipping...");
533  break;
534  }
535  }
536 
537  // If the same class is in the object, ignore it. ( Otherwise it causes error. )
538  if (ovlap == 0) {
539  std::string temp_classname = info->GetName();
540  class_name.push_back(temp_classname);
541 
542  TObject* element = info->GetElements()->UncheckedAt(0);
543  Bool_t isstl = element && strcmp("This", element->GetName()) == 0;
544  if (isstl) {
545  info->BuildCheck();
546  // if (gDebug > 0)
547  B2INFO("STL importing TStreamerInfo: " << info->GetName() <<
548  " version = " << info->GetClassVersion());
549  }
550  }
551  lnk = lnk->Next();
552  }
553 
554  return 0;
555 }
Stream/restore DataStore objects to/from EvtMessage.
static const unsigned int c_maxQueueDepth
Ask Itoh-san.
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
int getMaxThreads()
maximum number of threads.
int restoreStreamerInfos(const TList *list)
restore StreamerInfo from data in a file
pthread_t m_pt[c_maxThreads]
thread pointer
int queueEvtMessage(char *msg)
Queue EvtMessage for destreaming.
void * decodeEvtMessage(int id)
Decode EvtMessage and store objects in temporary buffer.
int restoreDataStoreAsync()
Restore objects in DataStore from temporary buffer.
std::vector< std::string > m_streamobjnames
names of object to be streamed
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
static bool isMergeable(const TObject *object)
Is the given object of a type that can be merged?
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
int m_threadin
current thread?
@ c_PersistentDurability
Object is of persistent durability.
@ c_IsNull
object is not valid for current event, set StoreEntry::ptr to NULL.
@ c_IsTransient
The corresponding StoreEntry has flag c_DontWriteOut.
static const unsigned int c_maxThreads
global maximum number of threads (cannot set higher number).
static void mergeIntoExisting(TObject *existing, const TObject *received)
Assuming both objects are mergeable, merge 'received' into 'existing'.
MsgHandler * m_msghandler
MsgHandler.
int m_decoderStatus[c_maxThreads]
thread decoder status.
int m_initStatus
first event flag.
int m_id[c_maxThreads]
thread index.
bool m_handleMergeable
Whether to handle Mergeable objects.
void setMaxThreads(int)
maximum number of threads.
void setDecoderStatus(int)
Ask Itoh-san about this.
int m_compressionLevel
Compression level in streaming.
int getDecoderStatus()
Ask Itoh-san about this.
static void clearMergeable(TObject *object)
assuming object is mergeable, clear its contents.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
DataStoreStreamer(int complevel=0, bool handleMergeable=true, int maxthread=0)
Constructor.
bool getInitializeActive() const
Are we currently initializing modules?
Definition: DataStore.h:502
@ c_WriteOut
Object/array should be saved by output modules.
Definition: DataStore.h:70
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:71
StoreEntryMap & getStoreEntryMap(EDurability durability)
Get a reference to the object/array map.
Definition: DataStore.h:325
EDurability
Durability types.
Definition: DataStore.h:58
@ c_Persistent
Object is available during entire execution time.
Definition: DataStore.h:60
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:54
StoreEntry * getEntry(const StoreAccessorBase &accessor)
Check whether an entry with the correct type is registered in the DataStore map and return it.
Definition: DataStore.cc:294
bool createObject(TObject *object, bool replace, const StoreAccessorBase &accessor)
Create a new object/array in the DataStore or add an existing one.
Definition: DataStore.cc:316
bool registerEntry(const std::string &name, EDurability durability, TClass *objClass, bool array, EStoreFlags storeFlags)
Register an entry in the DataStore map.
Definition: DataStore.cc:190
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Definition: DataStore.h:87
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
EvtHeader * header()
Get pointer to EvtHeader.
Definition: EvtMessage.cc:161
Abstract base class for objects that can be merged.
Definition: Mergeable.h:31
virtual void merge(const Mergeable *other)=0
Merge object 'other' into this one.
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
virtual void decode_msg(EvtMessage *msg, std::vector< TObject * > &objlist, std::vector< std::string > &namelist)
Decode an EvtMessage into a vector list of objects with names.
Definition: MsgHandler.cc:106
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
Definition: MsgHandler.cc:46
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
Definition: MsgHandler.cc:67
virtual void clear()
Clear object list.
Definition: MsgHandler.cc:40
Base class for StoreObjPtr and StoreArray for easier common treatment.
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
bool create(bool replace=false)
Create a default object in the data store.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
std::vector< std::vector< double > > merge(std::vector< std::vector< std::vector< double >>> toMerge)
merge { vector<double> a, vector<double> b} into {a, b}
Definition: tools.h:41
Abstract base class for different kinds of events.
Wraps a stored array/object, stored under unique (name, durability) key.
Definition: StoreEntry.h:22
TObject * ptr
The pointer to the returned object, either equal to 'object' or null, depending on wether the object ...
Definition: StoreEntry.h:51
TObject * object
The pointer to the actual object.
Definition: StoreEntry.h:48
void invalidate()
invalidate entry for next event.
Definition: StoreEntry.cc:77