9 #include <framework/pcore/DataStoreStreamer.h> 
   10 #include <framework/pcore/MsgHandler.h> 
   11 #include <framework/pcore/Mergeable.h> 
   13 #include <framework/datastore/DataStore.h> 
   14 #include <framework/logging/Logger.h> 
   15 #include <framework/datastore/StoreObjPtr.h> 
   16 #include <framework/dataobjects/EventMetaData.h> 
   18 #include <TClonesArray.h> 
   20 #include <TStreamerInfo.h> 
   35   static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 
   42   static std::queue<int> my_nobjs;
 
   43   static std::queue<int> my_narrays;
 
   44   static std::queue<std::vector<TObject*>> my_objlist;
 
   45   static std::queue<std::vector<std::string>> my_namelist;
 
   50 void* RunDecodeEvtMessage(
void* targ)
 
   52   auto* 
id = (
int*)targ;
 
   54   s_streamer->decodeEvtMessage(*
id);
 
   59   m_compressionLevel(complevel),
 
   60   m_handleMergeable(handleMergeable),
 
   62   m_maxthread(maxthread),
 
   67     B2FATAL(
"DataStoreStreamer : Too many threads " << 
m_maxthread);
 
   74     pthread_attr_t thread_attr;
 
   75     pthread_attr_init(&thread_attr);
 
   80       m_pt[i] = (pthread_t)0;
 
   83       mutex_thread[i] = PTHREAD_MUTEX_INITIALIZER;
 
   88       pthread_create(&
m_pt[i], 
nullptr, RunDecodeEvtMessage, (
void*)&
m_id[i]);
 
   90     pthread_attr_destroy(&thread_attr);
 
  108   return object->InheritsFrom(Mergeable::Class());
 
  113   static_cast<Mergeable*
>(object)->clear();
 
  117   auto* existingObject = 
static_cast<Mergeable*
>(existing);
 
  123   for (
auto& entryPair : map) {
 
  145     for (
auto &[name, entry] : map) {
 
  147       if (!streamTransientObjects and entry.dontWriteOut)
 
  158         B2FATAL(
"DataStoreStreamer::c_IsTransient bit is set for " << name << 
"!");
 
  160       if (entry.object->TestBit(
c_IsNull)) {
 
  161         B2FATAL(
"DataStoreStreamer::c_IsNull bit is set for " << name << 
"!");
 
  164         B2FATAL(
"DataStoreStreamer::c_PersistentDurability bit is set for " << name << 
"!");
 
  167       if (entry.object->IsA()->CanIgnoreTObjectStreamer()) {
 
  168         B2FATAL(
"TObject streamers disabled for " << name << 
"!");
 
  172       entry.object->SetBit(
c_IsNull, (entry.ptr == 
nullptr));
 
  175       B2DEBUG(100, 
"adding item " << name);
 
  184       entry.object->SetBit(
c_IsNull, 
false);
 
  200   (msg->
header())->nObjects = nobjs;
 
  201   (msg->
header())->nArrays = narrays;
 
  209   if (msg->
type() == MSG_TERMINATE) {
 
  210     B2INFO(
"Got termination message. Exitting...");
 
  216     eventMetaData->setEndOfData();
 
  222     std::vector<TObject*> objlist;
 
  223     std::vector<std::string> namelist;
 
  227     int nobjs = (msg->
header())->nObjects;
 
  228     int narrays = (msg->
header())->nArrays;
 
  229     if (
unsigned(nobjs + narrays) != objlist.size())
 
  230       B2WARNING(
"restoreDataStore(): inconsistent #objects/#arrays in header");
 
  233     for (
int i = 0; i < nobjs + narrays; i++) {
 
  234       TObject* obj = objlist.at(i);
 
  235       bool array = (
dynamic_cast<TClonesArray*
>(obj) != 
nullptr);
 
  236       if (obj != 
nullptr) {
 
  239         if (msg->
type() == MSG_STREAMERINFO) {
 
  246         TClass* cl = obj->IsA();
 
  248           cl = 
static_cast<TClonesArray*
>(obj)->GetClass();
 
  254         B2ASSERT(
"Can not find a data store entry with the name " << namelist.at(i) << 
". Did you maybe forget to register it?", entry);
 
  256         bool ptrIsNULL = obj->TestBit(
c_IsNull);
 
  260             B2DEBUG(100, 
"Will now merge " << namelist.at(i));
 
  284         B2ERROR(
"restoreDS: " << (array ? 
"Array" : 
"Object") << 
": " << namelist.at(i) << 
" is NULL!");
 
  299   if (evtbuf == 
nullptr) {
 
  300     printf(
"queueEvtMessage : NULL evtbuf detected. \n");
 
  311       pthread_mutex_lock(&mutex_thread[
m_threadin]);
 
  313       pthread_mutex_unlock(&mutex_thread[
m_threadin]);
 
  328   printf(
"decodeEvtMessge : started. Thread ID = %d\n", 
id);
 
  334   pthread_mutex_lock(&mutex);     
 
  336   pthread_mutex_unlock(&mutex);    
 
  345     while (my_evtbuf[
id].size() <= 0) usleep(10);
 
  348     pthread_mutex_lock(&mutex_thread[
id]);
 
  349     int nqueue = my_evtbuf[id].size();
 
  350     if (nqueue <= 0) printf(
"!!!!! Nqueue = %d\n", nqueue);
 
  351     char* evtbuf = my_evtbuf[id].front(); my_evtbuf[id].pop();
 
  352     pthread_mutex_unlock(&mutex_thread[
id]);
 
  355     if (evtbuf == 
nullptr) {
 
  356       printf(
"decodeEvtMessage: NULL evtbuf detected, nq = %d\n", nqueue);
 
  365     std::vector<TObject*> objlist;
 
  366     std::vector<std::string> namelist;
 
  369     msghandler.
decode_msg(msg, objlist, namelist);
 
  375     pthread_mutex_lock(&mutex);     
 
  376     my_objlist.push(objlist);
 
  377     my_namelist.push(namelist);
 
  378     my_nobjs.push((msg->header())->nObjects);
 
  379     my_narrays.push((msg->header())->nArrays);
 
  380     pthread_mutex_unlock(&mutex);    
 
  397   while (my_nobjs.empty()) usleep(10);
 
  402   pthread_mutex_lock(&mutex);
 
  403   int nobjs = my_nobjs.front(); my_nobjs.pop();
 
  405     printf(
"restoreDataStore: EOF detected. exitting with status 0\n");
 
  406     pthread_mutex_unlock(&mutex);
 
  409   int narrays = my_narrays.front(); my_narrays.pop();
 
  410   std::vector<TObject*> objlist = my_objlist.front(); my_objlist.pop();
 
  411   std::vector<std::string> namelist = my_namelist.front(); my_namelist.pop();
 
  412   pthread_mutex_unlock(&mutex);
 
  416   for (
int i = 0; i < nobjs + narrays; i++) {
 
  418     bool array = (
dynamic_cast<TClonesArray*
>(objlist.at(i)) != 
nullptr);
 
  419     if (objlist.at(i) != 
nullptr) {
 
  420       TObject* obj = objlist.at(i);
 
  423       TClass* cl = obj->IsA();
 
  425         cl = 
static_cast<TClonesArray*
>(obj)->GetClass();
 
  431       bool ptrIsNULL = obj->TestBit(
c_IsNull);
 
  435         B2DEBUG(100, 
"restoreDS: " << (array ? 
"Array" : 
"Object") << 
": " << namelist.at(i) << 
" stored");
 
  447       B2ERROR(
"restoreDS: " << (array ? 
"Array" : 
"Object") << 
": " << namelist.at(i) << 
" is NULL!");
 
  487   TObjLink* lnk = list->FirstLink();
 
  489   std::vector<std::string> class_name;
 
  493     info = (TStreamerInfo*)lnk->GetObject();
 
  496     for (
auto& itr : class_name) {
 
  497       if (strcmp(itr.c_str(), info->GetName()) == 0) {
 
  499         B2DEBUG(100, 
"Regular Class Loop : The class " << info->GetName() << 
" has already appeared. Skipping...");
 
  506       std::string temp_classname = info->GetName();
 
  507       class_name.push_back(temp_classname);
 
  509       TObject* element = info->GetElements()->UncheckedAt(0);
 
  510       Bool_t isstl = element && strcmp(
"This", element->GetName()) == 0;
 
  514         B2INFO(
"importing TStreamerInfo: " << info->GetName() <<
 
  515                " version = " << info->GetClassVersion());
 
  524   lnk = list->FirstLink();
 
  526     info = (TStreamerInfo*)lnk->GetObject();
 
  529     for (
auto& itr : class_name) {
 
  530       if (strcmp(itr.c_str(), info->GetName()) == 0) {
 
  532         B2DEBUG(100, 
"STL Class Loop : The class " << info->GetName() << 
" has already appeared. Skipping...");
 
  539       std::string temp_classname = info->GetName();
 
  540       class_name.push_back(temp_classname);
 
  542       TObject* element = info->GetElements()->UncheckedAt(0);
 
  543       Bool_t isstl = element && strcmp(
"This", element->GetName()) == 0;
 
  547         B2INFO(
"STL importing TStreamerInfo: " << info->GetName() <<
 
  548                " version = " << info->GetClassVersion());
 
Stream/restore DataStore objects to/from EvtMessage.
static const unsigned int c_maxQueueDepth
Ask Itoh-san.
static void removeSideEffects()
Call clear() and removeSideEffects() for all Mergeable objects in the DataStore (for c_Persistent durability).
int getMaxThreads()
maximum number of threads.
int restoreStreamerInfos(const TList *list)
restore StreamerInfo from data in a file
pthread_t m_pt[c_maxThreads]
thread pointer
int queueEvtMessage(char *msg)
Queue EvtMessage for destreaming.
void * decodeEvtMessage(int id)
Decode EvtMessage and store objects in temporary buffer.
int restoreDataStoreAsync()
Restore objects in DataStore from temporary buffer.
std::vector< std::string > m_streamobjnames
names of object to be streamed
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
static bool isMergeable(const TObject *object)
Is the given object of a type that can be merged?
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
~DataStoreStreamer()
destructor
int m_threadin
current thread?
@ c_PersistentDurability
Object is of persistent durability.
@ c_IsNull
object is not valid for current event, set StoreEntry::ptr to NULL.
@ c_IsTransient
The corresponding StoreEntry has flag c_DontWriteOut.
static const unsigned int c_maxThreads
global maximum number of threads (cannot set higher number).
static void mergeIntoExisting(TObject *existing, const TObject *received)
Assuming both objects are mergeable, merge 'received' into 'existing'.
MsgHandler * m_msghandler
MsgHandler.
int m_decoderStatus[c_maxThreads]
thread decoder status.
int m_initStatus
first event flag.
int m_id[c_maxThreads]
thread index.
bool m_handleMergeable
Whether to handle Mergeable objects.
void setMaxThreads(int)
maximum number of threads.
void setDecoderStatus(int)
Ask Itoh-san about this.
int m_compressionLevel
Compression level in streaming.
int getDecoderStatus()
Ask Itoh-san about this.
static void clearMergeable(TObject *object)
assuming object is mergeable, clear its contents.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
DataStoreStreamer(int complevel=0, bool handleMergeable=true, int maxthread=0)
Constructor.
bool getInitializeActive() const
Are we currently initializing modules?
@ c_WriteOut
Object/array should be saved by output modules.
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
StoreEntryMap & getStoreEntryMap(EDurability durability)
Get a reference to the object/array map.
EDurability
Durability types.
@ c_Persistent
Object is available during entire execution time.
@ c_Event
Different object in each event; all objects/arrays are invalidated after the event() function has been called ...
static DataStore & Instance()
Instance of singleton Store.
StoreEntry * getEntry(const StoreAccessorBase &accessor)
Check whether an entry with the correct type is registered in the DataStore map and return it.
bool createObject(TObject *object, bool replace, const StoreAccessorBase &accessor)
Create a new object/array in the DataStore or add an existing one.
bool registerEntry(const std::string &name, EDurability durability, TClass *objClass, bool array, EStoreFlags storeFlags)
Register an entry in the DataStore map.
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Class to manage streamed object.
ERecordType type() const
Get record type.
EvtHeader * header()
Get pointer to EvtHeader.
Abstract base class for objects that can be merged.
virtual void merge(const Mergeable *other)=0
Merge object 'other' into this one.
A class to encode/decode an EvtMessage.
virtual void decode_msg(EvtMessage *msg, std::vector< TObject * > &objlist, std::vector< std::string > &namelist)
Decode an EvtMessage into a vector list of objects with names.
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
virtual void clear()
Clear object list.
Base class for StoreObjPtr and StoreArray for easier common treatment.
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
bool create(bool replace=false)
Create a default object in the data store.
Type-safe access to single objects in the data store.
Abstract base class for different kinds of events.
Wraps a stored array/object, stored under unique (name, durability) key.
TObject * ptr
The pointer to the returned object, either equal to 'object' or null, depending on whether the object ...
TObject * object
The pointer to the actual object.
void invalidate()
invalidate entry for next event.