Belle II Software light-2406-ragdoll
DataStoreStreamer.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/DataStoreStreamer.h>
10#include <framework/pcore/MsgHandler.h>
11#include <framework/pcore/Mergeable.h>
12
13#include <framework/datastore/DataStore.h>
14#include <framework/logging/Logger.h>
15#include <framework/datastore/StoreObjPtr.h>
16#include <framework/dataobjects/EventMetaData.h>
17
18#include <TClonesArray.h>
19#include <TClass.h>
20#include <TStreamerInfo.h>
21#include <TList.h>
22
23#include <unistd.h>
24#include <cstdio> // for NULL, printf
25
26#include <algorithm>
27#include <queue>
28
29using namespace Belle2;
30
31namespace {
32// Thread related
33 static DataStoreStreamer* s_streamer = nullptr;
34
35 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
36 static pthread_mutex_t mutex_thread[DataStoreStreamer::c_maxThreads];
37//static char* evtbuf_thread[DataStoreStreamer::c_maxThreads];
38 static int msg_compLevel[DataStoreStreamer::c_maxThreads];
39
40 static std::queue<char*> my_evtbuf[DataStoreStreamer::c_maxThreads];
41
42 static std::queue<int> my_nobjs;
43 static std::queue<int> my_narrays;
44 static std::queue<std::vector<TObject*>> my_objlist;
45 static std::queue<std::vector<std::string>> my_namelist;
46
47 static int my_decstat[DataStoreStreamer::c_maxThreads];
48};
49
50void* RunDecodeEvtMessage(void* targ)
51{
52 auto* id = (int*)targ;
53 // DataStoreStreamer::Instance().decodeEvtMessage(*id);
54 s_streamer->decodeEvtMessage(*id);
55 return nullptr;
56}
57
58DataStoreStreamer::DataStoreStreamer(int complevel, bool handleMergeable, int maxthread):
59 m_compressionLevel(complevel),
60 m_handleMergeable(handleMergeable),
61 m_initStatus(0),
62 m_maxthread(maxthread),
63 m_threadin(0)
64 //, m_threadout(0)
65{
66 if ((unsigned int)m_maxthread > c_maxThreads) {
67 B2FATAL("DataStoreStreamer : Too many threads " << m_maxthread);
69 }
71
72 if (m_maxthread > 0) {
73 // Run decoder threads as sustainable detached threads
74 pthread_attr_t thread_attr;
75 pthread_attr_init(&thread_attr);
76 // pthread_attr_setschedpolicy(&thread_attr , SCHED_FIFO);
77 // pthread_attr_setdetachstate(&thread_attr , PTHREAD_CREATE_DETACHED);
78 for (int i = 0; i < m_maxthread; i++) {
79 my_decstat[i] = 0;
80 m_pt[i] = (pthread_t)0;
81 m_id[i] = i;
82 // m_pmsghandler[i] = new MsgHandler ( m_compressionLevel );
83 mutex_thread[i] = PTHREAD_MUTEX_INITIALIZER;
84 msg_compLevel[i] = m_compressionLevel;
85 }
86 for (int i = 0; i < m_maxthread; i++) {
87 // args.evtbuf = m_evtbuf[i];
88 pthread_create(&m_pt[i], nullptr, RunDecodeEvtMessage, (void*)&m_id[i]);
89 }
90 pthread_attr_destroy(&thread_attr);
91 }
92 s_streamer = this;
93}
94
95// Destructor
97{
98 delete m_msghandler;
99}
100
101void DataStoreStreamer::setStreamingObjects(const std::vector<std::string>& objlist)
102{
103 m_streamobjnames = objlist;
104}
105
106bool DataStoreStreamer::isMergeable(const TObject* object)
107{
108 return object->InheritsFrom(Mergeable::Class());
109}
110
112{
113 static_cast<Mergeable*>(object)->clear();
114}
115void DataStoreStreamer::mergeIntoExisting(TObject* existing, const TObject* received)
116{
117 auto* existingObject = static_cast<Mergeable*>(existing);
118 existingObject->merge(static_cast<const Mergeable*>(received));
119}
121{
123 for (auto& entryPair : map) {
124 DataStore::StoreEntry& entry = entryPair.second;
125 if (isMergeable(entry.object)) {
126 static_cast<Mergeable*>(entry.object)->removeSideEffects();
127 static_cast<Mergeable*>(entry.object)->clear();
128 }
129 }
130}
131
132
133// Stream DataStore
134EvtMessage* DataStoreStreamer::streamDataStore(bool addPersistentDurability, bool streamTransientObjects)
135{
136 // Clear Message Handler
138
139 // Stream objects (for all included durabilities)
140 int narrays = 0;
141 int nobjs = 0;
143 while (true) {
144 auto& map = DataStore::Instance().getStoreEntryMap(durability);
145 for (auto &[name, entry] : map) {
146 //skip transient objects/arrays?
147 if (!streamTransientObjects and entry.dontWriteOut)
148 continue;
149
150 //skip objects not in the list
151 if (!m_streamobjnames.empty()) {
152 auto pos = std::find(m_streamobjnames.begin(), m_streamobjnames.end(), name);
153 if (pos == m_streamobjnames.end()) continue;
154 }
155
156 //verify that bits are unused
157 if (entry.object->TestBit(c_IsTransient)) {
158 B2FATAL("DataStoreStreamer::c_IsTransient bit is set for " << name << "!");
159 }
160 if (entry.object->TestBit(c_IsNull)) {
161 B2FATAL("DataStoreStreamer::c_IsNull bit is set for " << name << "!");
162 }
163 if (entry.object->TestBit(c_PersistentDurability)) {
164 B2FATAL("DataStoreStreamer::c_PersistentDurability bit is set for " << name << "!");
165 }
166 //verify TObject bits are serialised
167 if (entry.object->IsA()->CanIgnoreTObjectStreamer()) {
168 B2FATAL("TObject streamers disabled for " << name << "!");
169 }
170 //store some information in TObject bits to ensure consistent state even if entry.ptr is NULL
171 entry.object->SetBit(c_IsTransient, entry.dontWriteOut);
172 entry.object->SetBit(c_IsNull, (entry.ptr == nullptr));
173 entry.object->SetBit(c_PersistentDurability, (durability == DataStore::c_Persistent));
174 m_msghandler->add(entry.object, name);
175 B2DEBUG(100, "adding item " << name);
176
177 if (entry.isArray)
178 narrays++;
179 else
180 nobjs++;
181
182 //reset bits (are checked to be false when streaming the object)
183 entry.object->SetBit(c_IsTransient, false);
184 entry.object->SetBit(c_IsNull, false);
185 entry.object->SetBit(c_PersistentDurability, false);
186
187 bool merge = m_handleMergeable and entry.ptr != nullptr and isMergeable(entry.object);
188 if (merge)
189 clearMergeable(entry.object);
190 }
191
192 if (addPersistentDurability and durability == DataStore::c_Event)
193 durability = DataStore::c_Persistent;
194 else
195 break;
196 }
197
198 // Encode EvtMessage
199 EvtMessage* msg = m_msghandler->encode_msg(MSG_EVENT);
200 (msg->header())->nObjects = nobjs;
201 (msg->header())->nArrays = narrays;
202
203 return msg;
204}
205
206// Restore DataStore
208{
209 if (msg->type() == MSG_TERMINATE) {
210 B2INFO("Got termination message. Exitting...");
211 //msg doesn't really contain data, set EventMetaData to something equivalent
212 StoreObjPtr<EventMetaData> eventMetaData;
213 if (m_initStatus == 0 && DataStore::Instance().getInitializeActive())
214 eventMetaData.registerInDataStore();
215 eventMetaData.create();
216 eventMetaData->setEndOfData();
217 } else {
218 // Clear Message Handler
220
221 // List of objects to be restored
222 std::vector<TObject*> objlist;
223 std::vector<std::string> namelist;
224
225 // Decode EvtMessage
226 m_msghandler->decode_msg(msg, objlist, namelist);
227 int nobjs = (msg->header())->nObjects;
228 int narrays = (msg->header())->nArrays;
229 if (unsigned(nobjs + narrays) != objlist.size())
230 B2WARNING("restoreDataStore(): inconsistent #objects/#arrays in header");
231
232 // Restore objects in DataStore
233 for (int i = 0; i < nobjs + narrays; i++) {
234 TObject* obj = objlist.at(i);
235 bool array = (dynamic_cast<TClonesArray*>(obj) != nullptr);
236 if (obj != nullptr) {
237
238 // Read and Build StreamerInfo
239 if (msg->type() == MSG_STREAMERINFO) {
240 restoreStreamerInfos(static_cast<TList*>(obj));
241 return 0;
242 }
243
244 bool isPersistent = obj->TestBit(c_PersistentDurability);
245 DataStore::EDurability durability = isPersistent ? (DataStore::c_Persistent) : (DataStore::c_Event);
246 TClass* cl = obj->IsA();
247 if (array)
248 cl = static_cast<TClonesArray*>(obj)->GetClass();
249 if (m_initStatus == 0 && DataStore::Instance().getInitializeActive()) { //are we called by the module's initialize() function?
251 DataStore::Instance().registerEntry(namelist.at(i), durability, cl, array, flags);
252 }
253 DataStore::StoreEntry* entry = DataStore::Instance().getEntry(StoreAccessorBase(namelist.at(i), durability, cl, array));
254 B2ASSERT("Can not find a data store entry with the name " << namelist.at(i) << ". Did you maybe forget to register it?", entry);
255 //only restore object if it is valid for current event
256 bool ptrIsNULL = obj->TestBit(c_IsNull);
257 if (!ptrIsNULL) {
258 bool merge = m_handleMergeable and !array and entry->ptr != nullptr and isMergeable(obj);
259 if (merge) {
260 B2DEBUG(100, "Will now merge " << namelist.at(i));
261
262 mergeIntoExisting(entry->ptr, obj);
263 delete obj;
264 } else {
265 //note: replace=true
267 StoreAccessorBase(namelist.at(i), durability, cl, array));
268
269 //reset bits of object in DataStore (are checked to be false when streaming the object)
270 obj->SetBit(c_IsTransient, false);
271 obj->SetBit(c_IsNull, false);
272 obj->SetBit(c_PersistentDurability, false);
273 }
274 // B2DEBUG(100, "restoreDS: " << (array ? "Array" : "Object") << ": " << namelist.at(i) << " stored");
275 } else {
276 //usually entry should already be invalidated, but e.g. for CrashHandler, it might not be.
277 if (entry->ptr)
278 entry->invalidate();
279 //not stored, clean up
280 delete obj;
281 }
282 } else {
283 //DataStore always has non-NULL content (wether they're available is a different matter)
284 B2ERROR("restoreDS: " << (array ? "Array" : "Object") << ": " << namelist.at(i) << " is NULL!");
285 }
286 }
287
288 }
289 // Return with normal exit status
290 if (m_initStatus == 0) m_initStatus = 1;
291 return 0;
292}
293
294// Parallel EvtMessage Destreamer implemented using thread
295
297{
298 // EOF case
299 if (evtbuf == nullptr) {
300 printf("queueEvtMessage : NULL evtbuf detected. \n");
301 for (int i = 0; i < m_maxthread; i++) {
302 while (my_evtbuf[i].size() >= c_maxQueueDepth) usleep(10);
303 my_evtbuf[m_threadin].push(evtbuf);
304 }
305 return 0;
306 }
307
308 // Put the event buffer in the queue of current thread
309 for (;;) {
310 if (my_evtbuf[m_threadin].size() < c_maxQueueDepth) {
311 pthread_mutex_lock(&mutex_thread[m_threadin]);
312 my_evtbuf[m_threadin].push(evtbuf);
313 pthread_mutex_unlock(&mutex_thread[m_threadin]);
314 break;
315 }
316 usleep(20);
317 }
318
319 // Switch to next thread
320 my_decstat[m_threadin] = 1; // Event queued for decoding
321 m_threadin++;
323 return 1;
324}
325
327{
328 printf("decodeEvtMessge : started. Thread ID = %d\n", id);
329 // Clear Message Handler
330 // m_msghandler->clear();
331 // MsgHandler* msghandler = new MsgHandler(m_compressionLevel);
332 // MsgHandler* msghandler = m_pmsghandler[id];
333
334 pthread_mutex_lock(&mutex); // Lock thread
335 MsgHandler msghandler(msg_compLevel[id]);
336 pthread_mutex_unlock(&mutex); // Unlock thread
337
338 for (;;) {
339 // Clear message handler event by event
340 // MsgHandler msghandler(m_compressionLevel);
341 // m_pmsghandler[id]->clear();
342 // Wait for event in queue becomes ready
343 msghandler.clear();
344
345 while (my_evtbuf[id].size() <= 0) usleep(10);
346
347 // Pick up event buffer
348 pthread_mutex_lock(&mutex_thread[id]);
349 int nqueue = my_evtbuf[id].size();
350 if (nqueue <= 0) printf("!!!!! Nqueue = %d\n", nqueue);
351 char* evtbuf = my_evtbuf[id].front(); my_evtbuf[id].pop();
352 pthread_mutex_unlock(&mutex_thread[id]);
353
354 // In case of EOF
355 if (evtbuf == nullptr) {
356 printf("decodeEvtMessage: NULL evtbuf detected, nq = %d\n", nqueue);
357 my_nobjs.push(-1);
358 return nullptr;
359 }
360
361 // Construct EvtMessage
362 auto* msg = new EvtMessage(evtbuf);
363
364 // Decode EvtMessage into Objects
365 std::vector<TObject*> objlist;
366 std::vector<std::string> namelist;
367
368 // pthread_mutex_lock(&mutex); // Lock test
369 msghandler.decode_msg(msg, objlist, namelist);
370 // pthread_mutex_unlock(&mutex); // Unlock test
371
372
373 // Queue them for the registration in DataStore
374 while (my_nobjs.size() >= c_maxQueueDepth) usleep(10);
375 pthread_mutex_lock(&mutex); // Lock queueing
376 my_objlist.push(objlist);
377 my_namelist.push(namelist);
378 my_nobjs.push((msg->header())->nObjects);
379 my_narrays.push((msg->header())->nArrays);
380 pthread_mutex_unlock(&mutex); // Unlock queueing
381
382 // Release EvtMessage
383 delete msg;
384 delete[] evtbuf;
385
386 // Preparation for next event
387 my_decstat[id] = 0; // Ready to read next event
388
389 }
390
391 return nullptr;
392}
393
395{
396// Wait for the queue to become ready
397 while (my_nobjs.empty()) usleep(10);
398
399 // Register decoded objects in DataStore
400
401 // Pick up event on the top and remove it from the queue
402 pthread_mutex_lock(&mutex);
403 int nobjs = my_nobjs.front(); my_nobjs.pop();
404 if (nobjs == -1) {
405 printf("restoreDataStore: EOF detected. exitting with status 0\n");
406 pthread_mutex_unlock(&mutex);
407 return 0;
408 }
409 int narrays = my_narrays.front(); my_narrays.pop();
410 std::vector<TObject*> objlist = my_objlist.front(); my_objlist.pop();
411 std::vector<std::string> namelist = my_namelist.front(); my_namelist.pop();
412 pthread_mutex_unlock(&mutex);
413
414 // Restore objects in DataStore
415 //TODO refactor, just copied & pasted right now
416 for (int i = 0; i < nobjs + narrays; i++) {
417 // printf ( "restoring object %d = %s\n", i, namelist.at(i).c_str() );
418 bool array = (dynamic_cast<TClonesArray*>(objlist.at(i)) != nullptr);
419 if (objlist.at(i) != nullptr) {
420 TObject* obj = objlist.at(i);
421 bool isPersistent = obj->TestBit(c_PersistentDurability);
422 DataStore::EDurability durability = isPersistent ? (DataStore::c_Persistent) : (DataStore::c_Event);
423 TClass* cl = obj->IsA();
424 if (array)
425 cl = static_cast<TClonesArray*>(obj)->GetClass();
426 if (m_initStatus == 0 && DataStore::Instance().getInitializeActive()) { //are we called by the module's initialize() function?
428 DataStore::Instance().registerEntry(namelist.at(i), durability, cl, array, flags);
429 }
430 //only restore object if it is valid for current event
431 bool ptrIsNULL = obj->TestBit(c_IsNull);
432 if (!ptrIsNULL) {
434 StoreAccessorBase(namelist.at(i), durability, cl, array));
435 B2DEBUG(100, "restoreDS: " << (array ? "Array" : "Object") << ": " << namelist.at(i) << " stored");
436
437 //reset bits of object in DataStore (are checked to be false when streaming the object)
438 obj->SetBit(c_IsTransient, false);
439 obj->SetBit(c_IsNull, false);
440 obj->SetBit(c_PersistentDurability, false);
441 } else {
442 //not stored, clean up
443 delete obj;
444 }
445 } else {
446 //DataStore always has non-NULL content (wether they're available is a different matter)
447 B2ERROR("restoreDS: " << (array ? "Array" : "Object") << ": " << namelist.at(i) << " is NULL!");
448 }
449 }
450
451 // printf ( "Objects restored in DataStore\n" );
452
453 return 1;
454}
455
457{
458 m_maxthread = maxthread;
459}
460
462{
463 return m_maxthread;
464}
465
467{
468 // printf ( "Decode thread %d = %d\n", m_threadin, m_done_decode[m_threadin] );
469 return (my_decstat[m_threadin]);
470}
471
473{
474 // printf ( "Decode thread %d = %d\n", m_threadin, m_done_decode[m_threadin] );
476}
477
478
480{
481 //
482 // Copy from TSocket::RecvStreamerInfos()
483 //
484
485 // TList *list = (TList*)mess->ReadObject(TList::Class());
486 TStreamerInfo* info;
487 TObjLink* lnk = list->FirstLink();
488
489 std::vector<std::string> class_name;
490
491 // First call BuildCheck for regular class
492 while (lnk) {
493 info = (TStreamerInfo*)lnk->GetObject();
494
495 int ovlap = 0;
496 for (auto& itr : class_name) {
497 if (strcmp(itr.c_str(), info->GetName()) == 0) {
498 ovlap = 1;
499 B2DEBUG(100, "Regular Class Loop : The class " << info->GetName() << " has already appeared. Skipping...");
500 break;
501 }
502 }
503
504 // If the same class is in the object, ignore it. ( Otherwise it causes error. )
505 if (ovlap == 0) {
506 std::string temp_classname = info->GetName();
507 class_name.push_back(temp_classname);
508
509 TObject* element = info->GetElements()->UncheckedAt(0);
510 Bool_t isstl = element && strcmp("This", element->GetName()) == 0;
511 if (!isstl) {
512 info->BuildCheck();
513 // if (gDebug > 0)
514 B2INFO("importing TStreamerInfo: " << info->GetName() <<
515 " version = " << info->GetClassVersion());
516 }
517 }
518 lnk = lnk->Next();
519 }
520
521
522 class_name.clear();
523 // Then call BuildCheck for stl class
524 lnk = list->FirstLink();
525 while (lnk) {
526 info = (TStreamerInfo*)lnk->GetObject();
527
528 int ovlap = 0;
529 for (auto& itr : class_name) {
530 if (strcmp(itr.c_str(), info->GetName()) == 0) {
531 ovlap = 1;
532 B2DEBUG(100, "STL Class Loop : The class " << info->GetName() << " has already appeared. Skipping...");
533 break;
534 }
535 }
536
537 // If the same class is in the object, ignore it. ( Otherwise it causes error. )
538 if (ovlap == 0) {
539 std::string temp_classname = info->GetName();
540 class_name.push_back(temp_classname);
541
542 TObject* element = info->GetElements()->UncheckedAt(0);
543 Bool_t isstl = element && strcmp("This", element->GetName()) == 0;
544 if (isstl) {
545 info->BuildCheck();
546 // if (gDebug > 0)
547 B2INFO("STL importing TStreamerInfo: " << info->GetName() <<
548 " version = " << info->GetClassVersion());
549 }
550 }
551 lnk = lnk->Next();
552 }
553
554 return 0;
555}
Stream/restore DataStore objects to/from EvtMessage.
static const unsigned int c_maxQueueDepth
Ask Itoh-san.
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
int getMaxThreads()
maximum number of threads.
int restoreStreamerInfos(const TList *list)
restore StreamerInfo from data in a file
pthread_t m_pt[c_maxThreads]
thread pointer
int queueEvtMessage(char *msg)
Queue EvtMessage for destreaming.
void * decodeEvtMessage(int id)
Decode EvtMessage and store objects in temporary buffer.
int restoreDataStoreAsync()
Restore objects in DataStore from temporary buffer.
std::vector< std::string > m_streamobjnames
names of object to be streamed
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
static bool isMergeable(const TObject *object)
Is the given object of a type that can be merged?
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
int m_threadin
current thread?
@ c_PersistentDurability
Object is of persistent durability.
@ c_IsNull
object is not valid for current event, set StoreEntry::ptr to NULL.
@ c_IsTransient
The corresponding StoreEntry has flag c_DontWriteOut.
static const unsigned int c_maxThreads
global maximum number of threads (cannot set higher number).
static void mergeIntoExisting(TObject *existing, const TObject *received)
Assuming both objects are mergeable, merge 'received' into 'existing'.
MsgHandler * m_msghandler
MsgHandler.
int m_decoderStatus[c_maxThreads]
thread decoder status.
int m_initStatus
first event flag.
int m_id[c_maxThreads]
thread index.
bool m_handleMergeable
Whether to handle Mergeable objects.
void setMaxThreads(int)
maximum number of threads.
void setDecoderStatus(int)
Ask Itoh-san about this.
int m_compressionLevel
Compression level in streaming.
int getDecoderStatus()
Ask Itoh-san about this.
static void clearMergeable(TObject *object)
assuming object is mergeable, clear its contents.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
DataStoreStreamer(int complevel=0, bool handleMergeable=true, int maxthread=0)
Constructor.
bool getInitializeActive() const
Are we currently initializing modules?
Definition: DataStore.h:502
@ c_WriteOut
Object/array should be saved by output modules.
Definition: DataStore.h:70
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:71
StoreEntryMap & getStoreEntryMap(EDurability durability)
Get a reference to the object/array map.
Definition: DataStore.h:325
EDurability
Durability types.
Definition: DataStore.h:58
@ c_Persistent
Object is available during entire execution time.
Definition: DataStore.h:60
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:54
StoreEntry * getEntry(const StoreAccessorBase &accessor)
Check whether an entry with the correct type is registered in the DataStore map and return it.
Definition: DataStore.cc:294
bool createObject(TObject *object, bool replace, const StoreAccessorBase &accessor)
Create a new object/array in the DataStore or add an existing one.
Definition: DataStore.cc:316
bool registerEntry(const std::string &name, EDurability durability, TClass *objClass, bool array, EStoreFlags storeFlags)
Register an entry in the DataStore map.
Definition: DataStore.cc:190
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Definition: DataStore.h:87
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
EvtHeader * header()
Get pointer to EvtHeader.
Definition: EvtMessage.cc:161
Abstract base class for objects that can be merged.
Definition: Mergeable.h:31
virtual void merge(const Mergeable *other)=0
Merge object 'other' into this one.
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
virtual void decode_msg(EvtMessage *msg, std::vector< TObject * > &objlist, std::vector< std::string > &namelist)
Decode an EvtMessage into a vector list of objects with names.
Definition: MsgHandler.cc:106
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
Definition: MsgHandler.cc:46
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
Definition: MsgHandler.cc:67
virtual void clear()
Clear object list.
Definition: MsgHandler.cc:40
Base class for StoreObjPtr and StoreArray for easier common treatment.
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
bool create(bool replace=false)
Create a default object in the data store.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
Abstract base class for different kinds of events.
Definition: ClusterUtils.h:24
Wraps a stored array/object, stored under unique (name, durability) key.
Definition: StoreEntry.h:22
TObject * ptr
The pointer to the returned object, either equal to 'object' or null, depending on wether the object ...
Definition: StoreEntry.h:51
TObject * object
The pointer to the actual object.
Definition: StoreEntry.h:48
void invalidate()
invalidate entry for next event.
Definition: StoreEntry.cc:77