9#include <framework/pcore/DataStoreStreamer.h>
10#include <framework/pcore/MsgHandler.h>
11#include <framework/pcore/Mergeable.h>
13#include <framework/datastore/DataStore.h>
14#include <framework/logging/Logger.h>
15#include <framework/datastore/StoreObjPtr.h>
16#include <framework/dataobjects/EventMetaData.h>
18#include <TClonesArray.h>
20#include <TStreamerInfo.h>
35 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
42 static std::queue<int> my_nobjs;
43 static std::queue<int> my_narrays;
44 static std::queue<std::vector<TObject*>> my_objlist;
45 static std::queue<std::vector<std::string>> my_namelist;
50void* RunDecodeEvtMessage(
void* targ)
52 auto*
id = (
int*)targ;
54 s_streamer->decodeEvtMessage(*
id);
59 m_compressionLevel(complevel),
60 m_handleMergeable(handleMergeable),
62 m_maxthread(maxthread),
67 B2FATAL(
"DataStoreStreamer : Too many threads " <<
m_maxthread);
74 pthread_attr_t thread_attr;
75 pthread_attr_init(&thread_attr);
80 m_pt[i] = (pthread_t)0;
83 mutex_thread[i] = PTHREAD_MUTEX_INITIALIZER;
88 pthread_create(&
m_pt[i],
nullptr, RunDecodeEvtMessage, (
void*)&
m_id[i]);
90 pthread_attr_destroy(&thread_attr);
108 return object->InheritsFrom(Mergeable::Class());
113 static_cast<Mergeable*
>(object)->clear();
117 auto* existingObject =
static_cast<Mergeable*
>(existing);
123 for (
auto& entryPair : map) {
145 for (
auto &[name, entry] : map) {
147 if (!streamTransientObjects and entry.dontWriteOut)
158 B2FATAL(
"DataStoreStreamer::c_IsTransient bit is set for " << name <<
"!");
160 if (entry.object->TestBit(
c_IsNull)) {
161 B2FATAL(
"DataStoreStreamer::c_IsNull bit is set for " << name <<
"!");
164 B2FATAL(
"DataStoreStreamer::c_PersistentDurability bit is set for " << name <<
"!");
167 if (entry.object->IsA()->CanIgnoreTObjectStreamer()) {
168 B2FATAL(
"TObject streamers disabled for " << name <<
"!");
172 entry.object->SetBit(
c_IsNull, (entry.ptr ==
nullptr));
175 B2DEBUG(100,
"adding item " << name);
184 entry.object->SetBit(
c_IsNull,
false);
200 (msg->
header())->nObjects = nobjs;
201 (msg->
header())->nArrays = narrays;
209 if (msg->
type() == MSG_TERMINATE) {
210 B2INFO(
"Got termination message. Exitting...");
216 eventMetaData->setEndOfData();
222 std::vector<TObject*> objlist;
223 std::vector<std::string> namelist;
227 int nobjs = (msg->
header())->nObjects;
228 int narrays = (msg->
header())->nArrays;
229 if (
unsigned(nobjs + narrays) != objlist.size())
230 B2WARNING(
"restoreDataStore(): inconsistent #objects/#arrays in header");
233 for (
int i = 0; i < nobjs + narrays; i++) {
234 TObject* obj = objlist.at(i);
235 bool array = (
dynamic_cast<TClonesArray*
>(obj) !=
nullptr);
236 if (obj !=
nullptr) {
239 if (msg->
type() == MSG_STREAMERINFO) {
246 TClass* cl = obj->IsA();
248 cl =
static_cast<TClonesArray*
>(obj)->GetClass();
254 B2ASSERT(
"Can not find a data store entry with the name " << namelist.at(i) <<
". Did you maybe forget to register it?", entry);
256 bool ptrIsNULL = obj->TestBit(
c_IsNull);
260 B2DEBUG(100,
"Will now merge " << namelist.at(i));
284 B2ERROR(
"restoreDS: " << (array ?
"Array" :
"Object") <<
": " << namelist.at(i) <<
" is NULL!");
299 if (evtbuf ==
nullptr) {
300 printf(
"queueEvtMessage : NULL evtbuf detected. \n");
311 pthread_mutex_lock(&mutex_thread[
m_threadin]);
313 pthread_mutex_unlock(&mutex_thread[
m_threadin]);
328 printf(
"decodeEvtMessge : started. Thread ID = %d\n",
id);
334 pthread_mutex_lock(&mutex);
336 pthread_mutex_unlock(&mutex);
345 while (my_evtbuf[
id].size() <= 0) usleep(10);
348 pthread_mutex_lock(&mutex_thread[
id]);
349 int nqueue = my_evtbuf[id].size();
350 if (nqueue <= 0) printf(
"!!!!! Nqueue = %d\n", nqueue);
351 char* evtbuf = my_evtbuf[id].front(); my_evtbuf[id].pop();
352 pthread_mutex_unlock(&mutex_thread[
id]);
355 if (evtbuf ==
nullptr) {
356 printf(
"decodeEvtMessage: NULL evtbuf detected, nq = %d\n", nqueue);
365 std::vector<TObject*> objlist;
366 std::vector<std::string> namelist;
369 msghandler.
decode_msg(msg, objlist, namelist);
375 pthread_mutex_lock(&mutex);
376 my_objlist.push(objlist);
377 my_namelist.push(namelist);
378 my_nobjs.push((msg->header())->nObjects);
379 my_narrays.push((msg->header())->nArrays);
380 pthread_mutex_unlock(&mutex);
397 while (my_nobjs.empty()) usleep(10);
402 pthread_mutex_lock(&mutex);
403 int nobjs = my_nobjs.front(); my_nobjs.pop();
405 printf(
"restoreDataStore: EOF detected. exitting with status 0\n");
406 pthread_mutex_unlock(&mutex);
409 int narrays = my_narrays.front(); my_narrays.pop();
410 std::vector<TObject*> objlist = my_objlist.front(); my_objlist.pop();
411 std::vector<std::string> namelist = my_namelist.front(); my_namelist.pop();
412 pthread_mutex_unlock(&mutex);
416 for (
int i = 0; i < nobjs + narrays; i++) {
418 bool array = (
dynamic_cast<TClonesArray*
>(objlist.at(i)) !=
nullptr);
419 if (objlist.at(i) !=
nullptr) {
420 TObject* obj = objlist.at(i);
423 TClass* cl = obj->IsA();
425 cl =
static_cast<TClonesArray*
>(obj)->GetClass();
431 bool ptrIsNULL = obj->TestBit(
c_IsNull);
435 B2DEBUG(100,
"restoreDS: " << (array ?
"Array" :
"Object") <<
": " << namelist.at(i) <<
" stored");
447 B2ERROR(
"restoreDS: " << (array ?
"Array" :
"Object") <<
": " << namelist.at(i) <<
" is NULL!");
487 TObjLink* lnk = list->FirstLink();
489 std::vector<std::string> class_name;
493 info = (TStreamerInfo*)lnk->GetObject();
496 for (
auto& itr : class_name) {
497 if (strcmp(itr.c_str(), info->GetName()) == 0) {
499 B2DEBUG(100,
"Regular Class Loop : The class " << info->GetName() <<
" has already appeared. Skipping...");
506 std::string temp_classname = info->GetName();
507 class_name.push_back(temp_classname);
509 TObject* element = info->GetElements()->UncheckedAt(0);
510 Bool_t isstl = element && strcmp(
"This", element->GetName()) == 0;
514 B2INFO(
"importing TStreamerInfo: " << info->GetName() <<
515 " version = " << info->GetClassVersion());
524 lnk = list->FirstLink();
526 info = (TStreamerInfo*)lnk->GetObject();
529 for (
auto& itr : class_name) {
530 if (strcmp(itr.c_str(), info->GetName()) == 0) {
532 B2DEBUG(100,
"STL Class Loop : The class " << info->GetName() <<
" has already appeared. Skipping...");
539 std::string temp_classname = info->GetName();
540 class_name.push_back(temp_classname);
542 TObject* element = info->GetElements()->UncheckedAt(0);
543 Bool_t isstl = element && strcmp(
"This", element->GetName()) == 0;
547 B2INFO(
"STL importing TStreamerInfo: " << info->GetName() <<
548 " version = " << info->GetClassVersion());
Stream/restore DataStore objects to/from EvtMessage.
static const unsigned int c_maxQueueDepth
Ask Itoh-san.
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
int getMaxThreads()
maximum number of threads.
int restoreStreamerInfos(const TList *list)
restore StreamerInfo from data in a file
pthread_t m_pt[c_maxThreads]
thread pointer
int queueEvtMessage(char *msg)
Queue EvtMessage for destreaming.
void * decodeEvtMessage(int id)
Decode EvtMessage and store objects in temporary buffer.
int restoreDataStoreAsync()
Restore objects in DataStore from temporary buffer.
std::vector< std::string > m_streamobjnames
names of object to be streamed
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
static bool isMergeable(const TObject *object)
Is the given object of a type that can be merged?
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
~DataStoreStreamer()
destructor
int m_threadin
current thread?
@ c_PersistentDurability
Object is of persistent durability.
@ c_IsNull
object is not valid for current event, set StoreEntry::ptr to NULL.
@ c_IsTransient
The corresponding StoreEntry has flag c_DontWriteOut.
static const unsigned int c_maxThreads
global maximum number of threads (cannot set higher number).
static void mergeIntoExisting(TObject *existing, const TObject *received)
Assuming both objects are mergeable, merge 'received' into 'existing'.
MsgHandler * m_msghandler
MsgHandler.
int m_decoderStatus[c_maxThreads]
thread decoder status.
int m_initStatus
first event flag.
int m_id[c_maxThreads]
thread index.
bool m_handleMergeable
Whether to handle Mergeable objects.
void setMaxThreads(int)
maximum number of threads.
void setDecoderStatus(int)
Ask Itoh-san about this.
int m_compressionLevel
Compression level in streaming.
int getDecoderStatus()
Ask Itoh-san about this.
static void clearMergeable(TObject *object)
assuming object is mergeable, clear its contents.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
DataStoreStreamer(int complevel=0, bool handleMergeable=true, int maxthread=0)
Constructor.
bool getInitializeActive() const
Are we currently initializing modules?
@ c_WriteOut
Object/array should be saved by output modules.
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
StoreEntryMap & getStoreEntryMap(EDurability durability)
Get a reference to the object/array map.
EDurability
Durability types.
@ c_Persistent
Object is available during entire execution time.
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
static DataStore & Instance()
Instance of singleton Store.
StoreEntry * getEntry(const StoreAccessorBase &accessor)
Check whether an entry with the correct type is registered in the DataStore map and return it.
bool createObject(TObject *object, bool replace, const StoreAccessorBase &accessor)
Create a new object/array in the DataStore or add an existing one.
bool registerEntry(const std::string &name, EDurability durability, TClass *objClass, bool array, EStoreFlags storeFlags)
Register an entry in the DataStore map.
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Class to manage streamed object.
ERecordType type() const
Get record type.
EvtHeader * header()
Get pointer to EvtHeader.
Abstract base class for objects that can be merged.
virtual void merge(const Mergeable *other)=0
Merge object 'other' into this one.
A class to encode/decode an EvtMessage.
virtual void decode_msg(EvtMessage *msg, std::vector< TObject * > &objlist, std::vector< std::string > &namelist)
Decode an EvtMessage into a vector list of objects with names.
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
virtual void clear()
Clear object list.
Base class for StoreObjPtr and StoreArray for easier common treatment.
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
bool create(bool replace=false)
Create a default object in the data store.
Type-safe access to single objects in the data store.
std::vector< std::vector< double > > merge(std::vector< std::vector< std::vector< double > > > toMerge)
merge { vector<double> a, vector<double> b} into {a, b}
Abstract base class for different kinds of events.
Wraps a stored array/object, stored under unique (name, durability) key.
TObject * ptr
The pointer to the returned object, either equal to 'object' or null, depending on wether the object ...
TObject * object
The pointer to the actual object.
void invalidate()
invalidate entry for next event.