Belle II Software light-2406-ragdoll
DataStoreStreamer Class Reference

Stream/restore DataStore objects to/from EvtMessage.

#include <DataStoreStreamer.h>


Public Member Functions

 DataStoreStreamer (int complevel=0, bool handleMergeable=true, int maxthread=0)
 Constructor.
 
 DataStoreStreamer (const DataStoreStreamer &)=delete
 No copying.
 
DataStoreStreamer & operator= (const DataStoreStreamer &)=delete
 No assignment.
 
 ~DataStoreStreamer ()
 Destructor.
 
EvtMessage * streamDataStore (bool addPersistentDurability, bool streamTransientObjects=false)
 Store DataStore objects in EvtMessage.
 
int restoreDataStore (EvtMessage *msg)
 Restore DataStore objects from EvtMessage.
 
void setStreamingObjects (const std::vector< std::string > &list)
 Set names of objects to be streamed/destreamed.
 
int queueEvtMessage (char *msg)
 Queue EvtMessage for destreaming.
 
void * decodeEvtMessage (int id)
 Decode EvtMessage and store objects in temporary buffer.
 
int restoreDataStoreAsync ()
 Restore objects in DataStore from temporary buffer.
 
void setMaxThreads (int)
 Set the maximum number of threads.
 
int getMaxThreads ()
 Get the maximum number of threads.
 
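void setDecoderStatus (int)
 Set the thread decoder status.
 
int getDecoderStatus ()
 Return the decoder status of the thread selected for the next event (index m_threadin): 1 after an event has been queued for decoding, 0 once the thread is ready to read the next event.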
 

Static Public Member Functions

static bool isMergeable (const TObject *object)
 Is the given object of a type that can be merged?
 
static void clearMergeable (TObject *object)
 Assuming the object is mergeable, clear its contents.
 
static void mergeIntoExisting (TObject *existing, const TObject *received)
 Assuming both objects are mergeable, merge 'received' into 'existing'.
 
static void removeSideEffects ()
 Call clear() and removeSideEffects() for all Mergeable objects in the DataStore (for c_Persistent durability).
 

Static Public Attributes

static const unsigned int c_maxThreads = 16
 global maximum number of threads (cannot set higher number).
 
static const unsigned int c_maxQueueDepth = 64
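 Maximum number of entries allowed in each decoder thread's event queue and in the queue of decoded events; producers wait while a queue is at this depth.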
 

Private Types

enum  ETObjectBits {
  c_IsTransient = BIT(19) ,
  c_IsNull = BIT(20) ,
  c_PersistentDurability = BIT(21)
}
 Bits to store in TObject.
 

Private Member Functions

int restoreStreamerInfos (const TList *list)
 restore StreamerInfo from data in a file
 

Private Attributes

MsgHandler * m_msghandler
 MsgHandler.
 
int m_compressionLevel
 Compression level in streaming.
 
bool m_handleMergeable
 Whether to handle Mergeable objects.
 
int m_initStatus
 first event flag.
 
std::vector< std::string > m_streamobjnames
 Names of objects to be streamed.
 
int m_maxthread
 Max. number of threads for asynchronous processing.
 
pthread_t m_pt [c_maxThreads]
 thread pointer
 
int m_id [c_maxThreads]
 thread index.
 
int m_threadin
 current thread?
 
int m_decoderStatus [c_maxThreads] {0}
 thread decoder status.
 

Detailed Description

Stream/restore DataStore objects to/from EvtMessage.

Main interface is provided by streamDataStore() and restoreDataStore(). Other functions provide more obscure features.

Definition at line 34 of file DataStoreStreamer.h.

Member Enumeration Documentation

◆ ETObjectBits

enum ETObjectBits
private

bits to store in TObject.

Bits 14-23 are available for use in derived classes, and are reused here to transmit additional information. This is really quite ugly and should be replaced with some more sane way of transmitting object-level data. All bits are checked before they are used, so a conflict with other code that sets them is reported instead of going unnoticed.

Enumerator
c_IsTransient 

The corresponding StoreEntry has flag c_DontWriteOut.

c_IsNull 

object is not valid for current event, set StoreEntry::ptr to NULL.

c_PersistentDurability 

Object is of persistent durability.

Definition at line 124 of file DataStoreStreamer.h.

124 {
125 c_IsTransient = BIT(19),
126 c_IsNull = BIT(20),
127 c_PersistentDurability = BIT(21)
128 };
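
The bit handling described above can be sketched without ROOT. This is only an illustration: `FakeObject` is a hypothetical stand-in for `TObject` that models nothing but the status-bit word, and `bitsAreFree`, `tagForStreaming` and `clearTags` are invented helper names. The bit positions are the ones listed for ETObjectBits.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for TObject: only the status-bit word is modelled.
struct FakeObject {
  std::uint32_t bits = 0;
  void SetBit(std::uint32_t f, bool on) { bits = on ? (bits | f) : (bits & ~f); }
  bool TestBit(std::uint32_t f) const { return (bits & f) != 0; }
};

// Same bit positions as ETObjectBits; BIT(n) corresponds to 1 << n.
enum ETObjectBits : std::uint32_t {
  c_IsTransient          = 1u << 19,
  c_IsNull               = 1u << 20,
  c_PersistentDurability = 1u << 21
};

// True if none of the three bits is already in use by other code.
bool bitsAreFree(const FakeObject& obj)
{
  return !obj.TestBit(c_IsTransient) && !obj.TestBit(c_IsNull)
         && !obj.TestBit(c_PersistentDurability);
}

// Sender side: record per-entry state in the object before it is serialised.
// The sketch uses assert() where the listing reports a fatal error.
void tagForStreaming(FakeObject& obj, bool transient, bool isNull, bool persistent)
{
  assert(bitsAreFree(obj));
  obj.SetBit(c_IsTransient, transient);
  obj.SetBit(c_IsNull, isNull);
  obj.SetBit(c_PersistentDurability, persistent);
}

// Both sides: reset the bits once the information has been consumed.
void clearTags(FakeObject& obj)
{
  obj.SetBit(c_IsTransient, false);
  obj.SetBit(c_IsNull, false);
  obj.SetBit(c_PersistentDurability, false);
}
```

The receiver would read the flags with `TestBit()` before calling `clearTags()`, so that the object stored afterwards again passes the "bits are free" check the next time it is streamed.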

Constructor & Destructor Documentation

◆ DataStoreStreamer()

DataStoreStreamer ( int  complevel = 0,
bool  handleMergeable = true,
int  maxthread = 0 
)
explicit

Constructor.

Parameters
complevel  Compression level of streaming, 0 to disable
handleMergeable  Perform special handling for Mergeable objects?
maxthread  Maximum number of threads, 0 to disable

Definition at line 58 of file DataStoreStreamer.cc.

58 :
59 m_compressionLevel(complevel),
60 m_handleMergeable(handleMergeable),
61 m_initStatus(0),
62 m_maxthread(maxthread),
63 m_threadin(0)
64 //, m_threadout(0)
65{
66 if ((unsigned int)m_maxthread > c_maxThreads) {
67 B2FATAL("DataStoreStreamer : Too many threads " << m_maxthread);
69 }
71
72 if (m_maxthread > 0) {
73 // Run decoder threads as sustainable detached threads
74 pthread_attr_t thread_attr;
75 pthread_attr_init(&thread_attr);
76 // pthread_attr_setschedpolicy(&thread_attr , SCHED_FIFO);
77 // pthread_attr_setdetachstate(&thread_attr , PTHREAD_CREATE_DETACHED);
78 for (int i = 0; i < m_maxthread; i++) {
79 my_decstat[i] = 0;
80 m_pt[i] = (pthread_t)0;
81 m_id[i] = i;
82 // m_pmsghandler[i] = new MsgHandler ( m_compressionLevel );
83 mutex_thread[i] = PTHREAD_MUTEX_INITIALIZER;
84 msg_compLevel[i] = m_compressionLevel;
85 }
86 for (int i = 0; i < m_maxthread; i++) {
87 // args.evtbuf = m_evtbuf[i];
88 pthread_create(&m_pt[i], nullptr, RunDecodeEvtMessage, (void*)&m_id[i]);
89 }
90 pthread_attr_destroy(&thread_attr);
91 }
92 s_streamer = this;
93}

◆ ~DataStoreStreamer()

Destructor.

Definition at line 96 of file DataStoreStreamer.cc.

97{
98 delete m_msghandler;
99}

Member Function Documentation

◆ clearMergeable()

void clearMergeable ( TObject *  object)
static

Assuming the object is mergeable, clear its contents.

Use this after sending it to prevent sending the same data again in the next event.

Definition at line 111 of file DataStoreStreamer.cc.

112{
113 static_cast<Mergeable*>(object)->clear();
114}
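
Why clearing after sending matters can be shown with a small stand-alone sketch. It does not use the real Mergeable class: `MergeableLike`, `Counter` and `sendAndClear` are hypothetical names, and "sending" is reduced to returning a copy.

```cpp
#include <cassert>

// Hypothetical stand-in for the Mergeable interface: merge() and clear() only.
struct MergeableLike {
  virtual ~MergeableLike() = default;
  virtual void merge(const MergeableLike* other) = 0;
  virtual void clear() = 0;
};

// Hypothetical mergeable payload: a plain counter.
struct Counter : MergeableLike {
  long count = 0;
  void merge(const MergeableLike* other) override
  {
    // As in mergeIntoExisting(), the caller guarantees the dynamic type.
    count += static_cast<const Counter*>(other)->count;
  }
  void clear() override { count = 0; }
};

// Worker side: hand out the current content, then clear the local object so
// that the next call carries only what was accumulated since this one.
Counter sendAndClear(Counter& local)
{
  Counter snapshot = local;
  local.clear();
  return snapshot;
}
```

If a worker accumulates 3 and later 2 more, the collector that merges both snapshots ends at 5. Without the `clear()` call the second snapshot would contain 5 and the collector would reach 8.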

◆ decodeEvtMessage()

void * decodeEvtMessage ( int  id)

Decode EvtMessage and store objects in temporary buffer.

Parameters
id  Thread id

Definition at line 326 of file DataStoreStreamer.cc.

327{
328 printf("decodeEvtMessage : started. Thread ID = %d\n", id);
329 // Clear Message Handler
330 // m_msghandler->clear();
331 // MsgHandler* msghandler = new MsgHandler(m_compressionLevel);
332 // MsgHandler* msghandler = m_pmsghandler[id];
333
334 pthread_mutex_lock(&mutex); // Lock thread
335 MsgHandler msghandler(msg_compLevel[id]);
336 pthread_mutex_unlock(&mutex); // Unlock thread
337
338 for (;;) {
339 // Clear message handler event by event
340 // MsgHandler msghandler(m_compressionLevel);
341 // m_pmsghandler[id]->clear();
342 // Wait for event in queue becomes ready
343 msghandler.clear();
344
345 while (my_evtbuf[id].size() <= 0) usleep(10);
346
347 // Pick up event buffer
348 pthread_mutex_lock(&mutex_thread[id]);
349 int nqueue = my_evtbuf[id].size();
350 if (nqueue <= 0) printf("!!!!! Nqueue = %d\n", nqueue);
351 char* evtbuf = my_evtbuf[id].front(); my_evtbuf[id].pop();
352 pthread_mutex_unlock(&mutex_thread[id]);
353
354 // In case of EOF
355 if (evtbuf == nullptr) {
356 printf("decodeEvtMessage: NULL evtbuf detected, nq = %d\n", nqueue);
357 my_nobjs.push(-1);
358 return nullptr;
359 }
360
361 // Construct EvtMessage
362 auto* msg = new EvtMessage(evtbuf);
363
364 // Decode EvtMessage into Objects
365 std::vector<TObject*> objlist;
366 std::vector<std::string> namelist;
367
368 // pthread_mutex_lock(&mutex); // Lock test
369 msghandler.decode_msg(msg, objlist, namelist);
370 // pthread_mutex_unlock(&mutex); // Unlock test
371
372
373 // Queue them for the registration in DataStore
374 while (my_nobjs.size() >= c_maxQueueDepth) usleep(10);
375 pthread_mutex_lock(&mutex); // Lock queueing
376 my_objlist.push(objlist);
377 my_namelist.push(namelist);
378 my_nobjs.push((msg->header())->nObjects);
379 my_narrays.push((msg->header())->nArrays);
380 pthread_mutex_unlock(&mutex); // Unlock queueing
381
382 // Release EvtMessage
383 delete msg;
384 delete[] evtbuf;
385
386 // Preparation for next event
387 my_decstat[id] = 0; // Ready to read next event
388
389 }
390
391 return nullptr;
392}
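
The listing waits for queue space and for new entries by polling with `usleep()`. As a point of comparison only, the same bounded hand-over between a producer and a decoder thread can be written with condition variables. This is a different technique from the one in the source, and `BoundedQueue` is a hypothetical class, not part of the Belle II software.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical bounded FIFO: push() blocks while the queue is full,
// pop() blocks while it is empty. All accesses are made under the mutex.
template <typename T>
class BoundedQueue {
public:
  explicit BoundedQueue(std::size_t maxDepth) : m_maxDepth(maxDepth)
  {
    assert(maxDepth > 0);
  }

  void push(T value)
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_notFull.wait(lock, [this] { return m_queue.size() < m_maxDepth; });
    m_queue.push(std::move(value));
    m_notEmpty.notify_one();
  }

  T pop()
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_notEmpty.wait(lock, [this] { return !m_queue.empty(); });
    T value = std::move(m_queue.front());
    m_queue.pop();
    m_notFull.notify_one();
    return value;
  }

  std::size_t size() const
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_queue.size();
  }

private:
  const std::size_t m_maxDepth;
  mutable std::mutex m_mutex;
  std::condition_variable m_notEmpty;
  std::condition_variable m_notFull;
  std::queue<T> m_queue;
};
```

In such a design the main thread would call `push()` with each event buffer and a decoder thread would loop on `pop()`; a depth of 64 would correspond to c_maxQueueDepth. An end-of-data marker still has to be agreed on separately, for example a null pointer as in the listing.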

◆ getDecoderStatus()

int getDecoderStatus ( )

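Return the decoder status of the thread selected for the next event (index m_threadin): 1 after an event has been queued for decoding, 0 once the thread is ready to read the next event.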

Definition at line 466 of file DataStoreStreamer.cc.

467{
468 // printf ( "Decode thread %d = %d\n", m_threadin, m_done_decode[m_threadin] );
469 return (my_decstat[m_threadin]);
470}

◆ getMaxThreads()

int getMaxThreads ( )

Get the maximum number of threads.

Definition at line 461 of file DataStoreStreamer.cc.

462{
463 return m_maxthread;
464}

◆ isMergeable()

bool isMergeable ( const TObject *  object)
static

Is the given object of a type that can be merged?

Definition at line 106 of file DataStoreStreamer.cc.

107{
108 return object->InheritsFrom(Mergeable::Class());
109}

◆ mergeIntoExisting()

void mergeIntoExisting ( TObject *  existing,
const TObject *  received 
)
static

Assuming both objects are mergeable, merge 'received' into 'existing'.

Definition at line 115 of file DataStoreStreamer.cc.

116{
117 auto* existingObject = static_cast<Mergeable*>(existing);
118 existingObject->merge(static_cast<const Mergeable*>(received));
119}

◆ queueEvtMessage()

int queueEvtMessage ( char *  msg)

Queue EvtMessage for destreaming.

Parameters
msg  Event buffer to be restored.

Definition at line 296 of file DataStoreStreamer.cc.

297{
298 // EOF case
299 if (evtbuf == nullptr) {
300 printf("queueEvtMessage : NULL evtbuf detected. \n");
301 for (int i = 0; i < m_maxthread; i++) {
302 while (my_evtbuf[i].size() >= c_maxQueueDepth) usleep(10);
303 my_evtbuf[m_threadin].push(evtbuf);
304 }
305 return 0;
306 }
307
308 // Put the event buffer in the queue of current thread
309 for (;;) {
310 if (my_evtbuf[m_threadin].size() < c_maxQueueDepth) {
311 pthread_mutex_lock(&mutex_thread[m_threadin]);
312 my_evtbuf[m_threadin].push(evtbuf);
313 pthread_mutex_unlock(&mutex_thread[m_threadin]);
314 break;
315 }
316 usleep(20);
317 }
318
319 // Switch to next thread
320 my_decstat[m_threadin] = 1; // Event queued for decoding
321 m_threadin++;
323 return 1;
324}
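
The dispatch pattern of queueEvtMessage() (one queue per decoder thread, filled in turn, with a null buffer as end-of-data marker) can be sketched in a single-threaded form. `RoundRobinDispatcher` is a hypothetical class. Two points are assumptions of the sketch rather than facts taken from the listing: the worker index wraps around after the last worker, and the end-of-data marker is placed in each worker's own queue. Locking and the queue-depth limit are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <vector>

// Hypothetical dispatcher: one FIFO per decoder thread, filled round-robin.
class RoundRobinDispatcher {
public:
  explicit RoundRobinDispatcher(std::size_t nWorkers) : m_queues(nWorkers)
  {
    assert(nWorkers > 0);
  }

  // Queue one buffer. A null pointer means end of data and is delivered to
  // every worker so that each of them can terminate.
  // Returns 0 for end of data and 1 otherwise, like queueEvtMessage().
  int queue(const char* buf)
  {
    if (buf == nullptr) {
      for (auto& q : m_queues) q.push(nullptr);
      return 0;
    }
    m_queues[m_next].push(buf);
    m_next = (m_next + 1) % m_queues.size();  // switch to next worker (assumed wrap-around)
    return 1;
  }

  // Access to the queue of worker i, for inspection or consumption.
  std::queue<const char*>& worker(std::size_t i) { return m_queues.at(i); }

private:
  std::vector<std::queue<const char*>> m_queues;
  std::size_t m_next = 0;
};
```

With two workers and three buffers, worker 0 receives the first and third buffer and worker 1 the second; a final `queue(nullptr)` appends the marker to both queues.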

◆ removeSideEffects()

void removeSideEffects ( )
static

Call clear() and removeSideEffects() for all Mergeable objects in the DataStore (for c_Persistent durability).

Definition at line 120 of file DataStoreStreamer.cc.

121{
123 for (auto& entryPair : map) {
124 DataStore::StoreEntry& entry = entryPair.second;
125 if (isMergeable(entry.object)) {
126 static_cast<Mergeable*>(entry.object)->removeSideEffects();
127 static_cast<Mergeable*>(entry.object)->clear();
128 }
129 }
130}

◆ restoreDataStore()

int restoreDataStore ( EvtMessage *  msg)

Restore DataStore objects from EvtMessage.

Parameters
msg  EvtMessage to be restored.

Definition at line 207 of file DataStoreStreamer.cc.

208{
209 if (msg->type() == MSG_TERMINATE) {
210 B2INFO("Got termination message. Exiting...");
211 //msg doesn't really contain data, set EventMetaData to something equivalent
212 StoreObjPtr<EventMetaData> eventMetaData;
213 if (m_initStatus == 0 && DataStore::Instance().getInitializeActive())
214 eventMetaData.registerInDataStore();
215 eventMetaData.create();
216 eventMetaData->setEndOfData();
217 } else {
218 // Clear Message Handler
220
221 // List of objects to be restored
222 std::vector<TObject*> objlist;
223 std::vector<std::string> namelist;
224
225 // Decode EvtMessage
226 m_msghandler->decode_msg(msg, objlist, namelist);
227 int nobjs = (msg->header())->nObjects;
228 int narrays = (msg->header())->nArrays;
229 if (unsigned(nobjs + narrays) != objlist.size())
230 B2WARNING("restoreDataStore(): inconsistent #objects/#arrays in header");
231
232 // Restore objects in DataStore
233 for (int i = 0; i < nobjs + narrays; i++) {
234 TObject* obj = objlist.at(i);
235 bool array = (dynamic_cast<TClonesArray*>(obj) != nullptr);
236 if (obj != nullptr) {
237
238 // Read and Build StreamerInfo
239 if (msg->type() == MSG_STREAMERINFO) {
240 restoreStreamerInfos(static_cast<TList*>(obj));
241 return 0;
242 }
243
244 bool isPersistent = obj->TestBit(c_PersistentDurability);
245 DataStore::EDurability durability = isPersistent ? (DataStore::c_Persistent) : (DataStore::c_Event);
246 TClass* cl = obj->IsA();
247 if (array)
248 cl = static_cast<TClonesArray*>(obj)->GetClass();
249 if (m_initStatus == 0 && DataStore::Instance().getInitializeActive()) { //are we called by the module's initialize() function?
251 DataStore::Instance().registerEntry(namelist.at(i), durability, cl, array, flags);
252 }
253 DataStore::StoreEntry* entry = DataStore::Instance().getEntry(StoreAccessorBase(namelist.at(i), durability, cl, array));
254 B2ASSERT("Can not find a data store entry with the name " << namelist.at(i) << ". Did you maybe forget to register it?", entry);
255 //only restore object if it is valid for current event
256 bool ptrIsNULL = obj->TestBit(c_IsNull);
257 if (!ptrIsNULL) {
258 bool merge = m_handleMergeable and !array and entry->ptr != nullptr and isMergeable(obj);
259 if (merge) {
260 B2DEBUG(100, "Will now merge " << namelist.at(i));
261
262 mergeIntoExisting(entry->ptr, obj);
263 delete obj;
264 } else {
265 //note: replace=true
267 StoreAccessorBase(namelist.at(i), durability, cl, array));
268
269 //reset bits of object in DataStore (are checked to be false when streaming the object)
270 obj->SetBit(c_IsTransient, false);
271 obj->SetBit(c_IsNull, false);
272 obj->SetBit(c_PersistentDurability, false);
273 }
274 // B2DEBUG(100, "restoreDS: " << (array ? "Array" : "Object") << ": " << namelist.at(i) << " stored");
275 } else {
276 //usually entry should already be invalidated, but e.g. for CrashHandler, it might not be.
277 if (entry->ptr)
278 entry->invalidate();
279 //not stored, clean up
280 delete obj;
281 }
282 } else {
283 //DataStore always has non-NULL content (whether they're available is a different matter)
284 B2ERROR("restoreDS: " << (array ? "Array" : "Object") << ": " << namelist.at(i) << " is NULL!");
285 }
286 }
287
288 }
289 // Return with normal exit status
290 if (m_initStatus == 0) m_initStatus = 1;
291 return 0;
292}

◆ restoreDataStoreAsync()

int restoreDataStoreAsync ( )

Restore objects in DataStore from temporary buffer.

Definition at line 394 of file DataStoreStreamer.cc.

395{
396// Wait for the queue to become ready
397 while (my_nobjs.empty()) usleep(10);
398
399 // Register decoded objects in DataStore
400
401 // Pick up event on the top and remove it from the queue
402 pthread_mutex_lock(&mutex);
403 int nobjs = my_nobjs.front(); my_nobjs.pop();
404 if (nobjs == -1) {
405 printf("restoreDataStore: EOF detected. exiting with status 0\n");
406 pthread_mutex_unlock(&mutex);
407 return 0;
408 }
409 int narrays = my_narrays.front(); my_narrays.pop();
410 std::vector<TObject*> objlist = my_objlist.front(); my_objlist.pop();
411 std::vector<std::string> namelist = my_namelist.front(); my_namelist.pop();
412 pthread_mutex_unlock(&mutex);
413
414 // Restore objects in DataStore
415 //TODO refactor, just copied & pasted right now
416 for (int i = 0; i < nobjs + narrays; i++) {
417 // printf ( "restoring object %d = %s\n", i, namelist.at(i).c_str() );
418 bool array = (dynamic_cast<TClonesArray*>(objlist.at(i)) != nullptr);
419 if (objlist.at(i) != nullptr) {
420 TObject* obj = objlist.at(i);
421 bool isPersistent = obj->TestBit(c_PersistentDurability);
422 DataStore::EDurability durability = isPersistent ? (DataStore::c_Persistent) : (DataStore::c_Event);
423 TClass* cl = obj->IsA();
424 if (array)
425 cl = static_cast<TClonesArray*>(obj)->GetClass();
426 if (m_initStatus == 0 && DataStore::Instance().getInitializeActive()) { //are we called by the module's initialize() function?
428 DataStore::Instance().registerEntry(namelist.at(i), durability, cl, array, flags);
429 }
430 //only restore object if it is valid for current event
431 bool ptrIsNULL = obj->TestBit(c_IsNull);
432 if (!ptrIsNULL) {
434 StoreAccessorBase(namelist.at(i), durability, cl, array));
435 B2DEBUG(100, "restoreDS: " << (array ? "Array" : "Object") << ": " << namelist.at(i) << " stored");
436
437 //reset bits of object in DataStore (are checked to be false when streaming the object)
438 obj->SetBit(c_IsTransient, false);
439 obj->SetBit(c_IsNull, false);
440 obj->SetBit(c_PersistentDurability, false);
441 } else {
442 //not stored, clean up
443 delete obj;
444 }
445 } else {
446 //DataStore always has non-NULL content (whether they're available is a different matter)
447 B2ERROR("restoreDS: " << (array ? "Array" : "Object") << ": " << namelist.at(i) << " is NULL!");
448 }
449 }
450
451 // printf ( "Objects restored in DataStore\n" );
452
453 return 1;
454}

◆ restoreStreamerInfos()

int restoreStreamerInfos ( const TList *  list)
private

restore StreamerInfo from data in a file

Definition at line 479 of file DataStoreStreamer.cc.

480{
481 //
482 // Copy from TSocket::RecvStreamerInfos()
483 //
484
485 // TList *list = (TList*)mess->ReadObject(TList::Class());
486 TStreamerInfo* info;
487 TObjLink* lnk = list->FirstLink();
488
489 std::vector<std::string> class_name;
490
491 // First call BuildCheck for regular class
492 while (lnk) {
493 info = (TStreamerInfo*)lnk->GetObject();
494
495 int ovlap = 0;
496 for (auto& itr : class_name) {
497 if (strcmp(itr.c_str(), info->GetName()) == 0) {
498 ovlap = 1;
499 B2DEBUG(100, "Regular Class Loop : The class " << info->GetName() << " has already appeared. Skipping...");
500 break;
501 }
502 }
503
504 // If the same class is in the object, ignore it. ( Otherwise it causes error. )
505 if (ovlap == 0) {
506 std::string temp_classname = info->GetName();
507 class_name.push_back(temp_classname);
508
509 TObject* element = info->GetElements()->UncheckedAt(0);
510 Bool_t isstl = element && strcmp("This", element->GetName()) == 0;
511 if (!isstl) {
512 info->BuildCheck();
513 // if (gDebug > 0)
514 B2INFO("importing TStreamerInfo: " << info->GetName() <<
515 " version = " << info->GetClassVersion());
516 }
517 }
518 lnk = lnk->Next();
519 }
520
521
522 class_name.clear();
523 // Then call BuildCheck for stl class
524 lnk = list->FirstLink();
525 while (lnk) {
526 info = (TStreamerInfo*)lnk->GetObject();
527
528 int ovlap = 0;
529 for (auto& itr : class_name) {
530 if (strcmp(itr.c_str(), info->GetName()) == 0) {
531 ovlap = 1;
532 B2DEBUG(100, "STL Class Loop : The class " << info->GetName() << " has already appeared. Skipping...");
533 break;
534 }
535 }
536
537 // If the same class is in the object, ignore it. ( Otherwise it causes error. )
538 if (ovlap == 0) {
539 std::string temp_classname = info->GetName();
540 class_name.push_back(temp_classname);
541
542 TObject* element = info->GetElements()->UncheckedAt(0);
543 Bool_t isstl = element && strcmp("This", element->GetName()) == 0;
544 if (isstl) {
545 info->BuildCheck();
546 // if (gDebug > 0)
547 B2INFO("STL importing TStreamerInfo: " << info->GetName() <<
548 " version = " << info->GetClassVersion());
549 }
550 }
551 lnk = lnk->Next();
552 }
553
554 return 0;
555}
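
The control flow of restoreStreamerInfos() is two passes over the same list, regular classes first and STL classes second, where a class name that appears more than once is handled only on its first occurrence within each pass. A reduced sketch of that ordering follows. `InfoLike` and `processingOrder` are hypothetical names, and a `std::unordered_set` replaces the linear `strcmp` scan of the listing.

```cpp
#include <cassert>
#include <initializer_list>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical record standing in for one TStreamerInfo entry.
struct InfoLike {
  std::string name;
  bool isStl;  // in the listing: first element exists and is named "This"
};

// Returns the class names in the order in which BuildCheck() would be called.
std::vector<std::string> processingOrder(const std::vector<InfoLike>& infos)
{
  std::vector<std::string> order;
  for (bool stlPass : {false, true}) {
    std::unordered_set<std::string> seen;  // starts empty in each pass
    for (const auto& info : infos) {
      // A name that was already seen in this pass is skipped.
      if (!seen.insert(info.name).second) continue;
      if (info.isStl == stlPass) order.push_back(info.name);
    }
  }
  return order;
}
```

For the input `vector<int>` (STL), `Track`, `Track`, `Hit`, `vector<int>` (STL) the resulting order is `Track`, `Hit`, `vector<int>`.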

◆ setDecoderStatus()

void setDecoderStatus ( int  val)

Set the thread decoder status.

Definition at line 472 of file DataStoreStreamer.cc.

473{
474 // printf ( "Decode thread %d = %d\n", m_threadin, m_done_decode[m_threadin] );
476}

◆ setMaxThreads()

void setMaxThreads ( int  maxthread)

Set the maximum number of threads.

Definition at line 456 of file DataStoreStreamer.cc.

457{
458 m_maxthread = maxthread;
459}

◆ setStreamingObjects()

void setStreamingObjects ( const std::vector< std::string > &  list)

Set names of objects to be streamed/destreamed.

Definition at line 101 of file DataStoreStreamer.cc.

102{
103 m_streamobjnames = objlist;
104}

◆ streamDataStore()

EvtMessage * streamDataStore ( bool  addPersistentDurability,
bool  streamTransientObjects = false 
)

Store DataStore objects in EvtMessage.

Parameters
addPersistentDurability  By default, only c_Event data is streamed. Setting this to true will add c_Persistent data to the EvtMessage.
streamTransientObjects  Should objects/arrays registered as transient be streamed?
Returns
Pointer to EvtMessage; the caller is responsible for deleting it.

Definition at line 134 of file DataStoreStreamer.cc.

135{
136 // Clear Message Handler
138
139 // Stream objects (for all included durabilities)
140 int narrays = 0;
141 int nobjs = 0;
143 while (true) {
144 auto& map = DataStore::Instance().getStoreEntryMap(durability);
145 for (auto &[name, entry] : map) {
146 //skip transient objects/arrays?
147 if (!streamTransientObjects and entry.dontWriteOut)
148 continue;
149
150 //skip objects not in the list
151 if (!m_streamobjnames.empty()) {
152 auto pos = std::find(m_streamobjnames.begin(), m_streamobjnames.end(), name);
153 if (pos == m_streamobjnames.end()) continue;
154 }
155
156 //verify that bits are unused
157 if (entry.object->TestBit(c_IsTransient)) {
158 B2FATAL("DataStoreStreamer::c_IsTransient bit is set for " << name << "!");
159 }
160 if (entry.object->TestBit(c_IsNull)) {
161 B2FATAL("DataStoreStreamer::c_IsNull bit is set for " << name << "!");
162 }
163 if (entry.object->TestBit(c_PersistentDurability)) {
164 B2FATAL("DataStoreStreamer::c_PersistentDurability bit is set for " << name << "!");
165 }
166 //verify TObject bits are serialised
167 if (entry.object->IsA()->CanIgnoreTObjectStreamer()) {
168 B2FATAL("TObject streamers disabled for " << name << "!");
169 }
170 //store some information in TObject bits to ensure consistent state even if entry.ptr is NULL
171 entry.object->SetBit(c_IsTransient, entry.dontWriteOut);
172 entry.object->SetBit(c_IsNull, (entry.ptr == nullptr));
173 entry.object->SetBit(c_PersistentDurability, (durability == DataStore::c_Persistent));
174 m_msghandler->add(entry.object, name);
175 B2DEBUG(100, "adding item " << name);
176
177 if (entry.isArray)
178 narrays++;
179 else
180 nobjs++;
181
182 //reset bits (are checked to be false when streaming the object)
183 entry.object->SetBit(c_IsTransient, false);
184 entry.object->SetBit(c_IsNull, false);
185 entry.object->SetBit(c_PersistentDurability, false);
186
187 bool merge = m_handleMergeable and entry.ptr != nullptr and isMergeable(entry.object);
188 if (merge)
189 clearMergeable(entry.object);
190 }
191
192 if (addPersistentDurability and durability == DataStore::c_Event)
193 durability = DataStore::c_Persistent;
194 else
195 break;
196 }
197
198 // Encode EvtMessage
199 EvtMessage* msg = m_msghandler->encode_msg(MSG_EVENT);
200 (msg->header())->nObjects = nobjs;
201 (msg->header())->nArrays = narrays;
202
203 return msg;
204}
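
The selection logic of streamDataStore() (walk the c_Event entries, optionally continue with c_Persistent, skip transient entries unless requested, and apply the name list if it is not empty) can be isolated in a small sketch. `Durability`, `EntryMap` and `selectNames` are hypothetical stand-ins, and the sketch assumes that the loop starts with c_Event, in line with the parameter description above.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for DataStore::EDurability.
enum Durability { c_Event, c_Persistent };

// Hypothetical stand-in for one DataStore map: entry name -> dontWriteOut flag.
using EntryMap = std::map<std::string, bool>;

// Returns the names that would be added to the message, in streaming order.
std::vector<std::string> selectNames(const EntryMap& eventMap,
                                     const EntryMap& persistentMap,
                                     const std::vector<std::string>& wanted,
                                     bool addPersistentDurability,
                                     bool streamTransientObjects)
{
  std::vector<std::string> selected;
  Durability durability = c_Event;  // assumed starting point
  while (true) {
    const EntryMap& map = (durability == c_Event) ? eventMap : persistentMap;
    for (const auto& entry : map) {
      // Skip transient objects/arrays unless they were requested.
      if (!streamTransientObjects && entry.second) continue;
      // Skip entries that are not in a non-empty name list.
      if (!wanted.empty()
          && std::find(wanted.begin(), wanted.end(), entry.first) == wanted.end())
        continue;
      selected.push_back(entry.first);
    }
    if (addPersistentDurability && durability == c_Event)
      durability = c_Persistent;
    else
      break;
  }
  return selected;
}
```

An empty `wanted` list selects everything that passes the transient filter, which matches the behaviour of an empty m_streamobjnames in the listing.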

Member Data Documentation

◆ c_maxQueueDepth

const unsigned int c_maxQueueDepth = 64
static

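Maximum number of entries allowed in each decoder thread's event queue and in the queue of decoded events; producers wait while a queue is at this depth.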

Definition at line 39 of file DataStoreStreamer.h.

◆ c_maxThreads

const unsigned int c_maxThreads = 16
static

global maximum number of threads (cannot set higher number).

Definition at line 37 of file DataStoreStreamer.h.

◆ m_compressionLevel

int m_compressionLevel
private

Compression level in streaming.

Definition at line 137 of file DataStoreStreamer.h.

◆ m_decoderStatus

int m_decoderStatus[c_maxThreads] {0}
private

thread decoder status.

Definition at line 168 of file DataStoreStreamer.h.

◆ m_handleMergeable

bool m_handleMergeable
private

Whether to handle Mergeable objects.

Definition at line 139 of file DataStoreStreamer.h.

◆ m_id

int m_id[c_maxThreads]
private

thread index.

Definition at line 163 of file DataStoreStreamer.h.

◆ m_initStatus

int m_initStatus
private

first event flag.

0 during first event, 1 otherwise.

Definition at line 145 of file DataStoreStreamer.h.

◆ m_maxthread

int m_maxthread
private

Max. number of threads for asynchronous processing.

The header comment gives 4 as the default, whereas the constructor's default argument is 0 (threads disabled).

Definition at line 157 of file DataStoreStreamer.h.

◆ m_msghandler

MsgHandler* m_msghandler
private

MsgHandler.

MsgHandler object used to form/decode EvtMessage

Definition at line 134 of file DataStoreStreamer.h.

◆ m_pt

pthread_t m_pt[c_maxThreads]
private

thread pointer

Definition at line 161 of file DataStoreStreamer.h.

◆ m_streamobjnames

std::vector<std::string> m_streamobjnames
private

Names of objects to be streamed.

If the list is empty, all objects are streamed.

Definition at line 151 of file DataStoreStreamer.h.

◆ m_threadin

int m_threadin
private

current thread?

Definition at line 165 of file DataStoreStreamer.h.


The documentation for this class was generated from the following files: