9#include <framework/pcore/DataStoreStreamer.h> 
   10#include <framework/pcore/MsgHandler.h> 
   11#include <framework/pcore/Mergeable.h> 
   13#include <framework/datastore/DataStore.h> 
   14#include <framework/logging/Logger.h> 
   15#include <framework/datastore/StoreObjPtr.h> 
   16#include <framework/dataobjects/EventMetaData.h> 
   18#include <TClonesArray.h> 
   20#include <TStreamerInfo.h> 
   35  static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 
   42  static std::queue<int> my_nobjs;
 
   43  static std::queue<int> my_narrays;
 
   44  static std::queue<std::vector<TObject*>> my_objlist;
 
   45  static std::queue<std::vector<std::string>> my_namelist;
 
   50void* RunDecodeEvtMessage(
void* targ)
 
   52  auto* 
id = (
int*)targ;
 
   54  s_streamer->decodeEvtMessage(*
id);
 
   59  m_compressionLevel(complevel),
 
   60  m_handleMergeable(handleMergeable),
 
   62  m_maxthread(maxthread),
 
   67    B2FATAL(
"DataStoreStreamer : Too many threads " << 
m_maxthread);
 
   74    pthread_attr_t thread_attr;
 
   75    pthread_attr_init(&thread_attr);
 
   80      m_pt[i] = (pthread_t)0;
 
   83      mutex_thread[i] = PTHREAD_MUTEX_INITIALIZER;
 
   88      pthread_create(&
m_pt[i], 
nullptr, RunDecodeEvtMessage, (
void*)&
m_id[i]);
 
   90    pthread_attr_destroy(&thread_attr);
 
  108  return object->InheritsFrom(Mergeable::Class());
 
  113  static_cast<Mergeable*
>(object)->clear();
 
  117  auto* existingObject = 
static_cast<Mergeable*
>(existing);
 
  123  for (
auto& entryPair : map) {
 
  145    for (
auto &[name, entry] : map) {
 
  147      if (!streamTransientObjects and entry.dontWriteOut)
 
  158        B2FATAL(
"DataStoreStreamer::c_IsTransient bit is set for " << name << 
"!");
 
  160      if (entry.object->TestBit(
c_IsNull)) {
 
  161        B2FATAL(
"DataStoreStreamer::c_IsNull bit is set for " << name << 
"!");
 
  164        B2FATAL(
"DataStoreStreamer::c_PersistentDurability bit is set for " << name << 
"!");
 
  167      if (entry.object->IsA()->CanIgnoreTObjectStreamer()) {
 
  168        B2FATAL(
"TObject streamers disabled for " << name << 
"!");
 
  172      entry.object->SetBit(
c_IsNull, (entry.ptr == 
nullptr));
 
  175      B2DEBUG(100, 
"adding item " << name);
 
  184      entry.object->SetBit(
c_IsNull, 
false);
 
  200  (msg->
header())->nObjects = nobjs;
 
  201  (msg->
header())->nArrays = narrays;
 
  209  if (msg->
type() == MSG_TERMINATE) {
 
  210    B2INFO(
"Got termination message. Exitting...");
 
  216    eventMetaData->setEndOfData();
 
  222    std::vector<TObject*> objlist;
 
  223    std::vector<std::string> namelist;
 
  227    int nobjs = (msg->
header())->nObjects;
 
  228    int narrays = (msg->
header())->nArrays;
 
  229    if (
unsigned(nobjs + narrays) != objlist.size())
 
  230      B2WARNING(
"restoreDataStore(): inconsistent #objects/#arrays in header");
 
  233    for (
int i = 0; i < nobjs + narrays; i++) {
 
  234      TObject* obj = objlist.at(i);
 
  235      bool array = (
dynamic_cast<TClonesArray*
>(obj) != 
nullptr);
 
  236      if (obj != 
nullptr) {
 
  239        if (msg->
type() == MSG_STREAMERINFO) {
 
  246        TClass* cl = obj->IsA();
 
  248          cl = 
static_cast<TClonesArray*
>(obj)->GetClass();
 
  254        B2ASSERT(
"Can not find a data store entry with the name " << namelist.at(i) << 
". Did you maybe forget to register it?", entry);
 
  256        bool ptrIsNULL = obj->TestBit(
c_IsNull);
 
  260            B2DEBUG(100, 
"Will now merge " << namelist.at(i));
 
  284        B2ERROR(
"restoreDS: " << (array ? 
"Array" : 
"Object") << 
": " << namelist.at(i) << 
" is NULL!");
 
  299  if (evtbuf == 
nullptr) {
 
  300    printf(
"queueEvtMessage : NULL evtbuf detected. \n");
 
  311      pthread_mutex_lock(&mutex_thread[
m_threadin]);
 
  313      pthread_mutex_unlock(&mutex_thread[
m_threadin]);
 
  328  printf(
"decodeEvtMessge : started. Thread ID = %d\n", 
id);
 
  334  pthread_mutex_lock(&mutex);     
 
  336  pthread_mutex_unlock(&mutex);    
 
  345    while (my_evtbuf[
id].size() <= 0) usleep(10);
 
  348    pthread_mutex_lock(&mutex_thread[
id]);
 
  349    int nqueue = my_evtbuf[id].size();
 
  350    if (nqueue <= 0) printf(
"!!!!! Nqueue = %d\n", nqueue);
 
  351    char* evtbuf = my_evtbuf[id].front(); my_evtbuf[id].pop();
 
  352    pthread_mutex_unlock(&mutex_thread[
id]);
 
  355    if (evtbuf == 
nullptr) {
 
  356      printf(
"decodeEvtMessage: NULL evtbuf detected, nq = %d\n", nqueue);
 
  365    std::vector<TObject*> objlist;
 
  366    std::vector<std::string> namelist;
 
  369    msghandler.
decode_msg(msg, objlist, namelist);
 
  375    pthread_mutex_lock(&mutex);     
 
  376    my_objlist.push(objlist);
 
  377    my_namelist.push(namelist);
 
  378    my_nobjs.push((msg->header())->nObjects);
 
  379    my_narrays.push((msg->header())->nArrays);
 
  380    pthread_mutex_unlock(&mutex);    
 
  397  while (my_nobjs.empty()) usleep(10);
 
  402  pthread_mutex_lock(&mutex);
 
  403  int nobjs = my_nobjs.front(); my_nobjs.pop();
 
  405    printf(
"restoreDataStore: EOF detected. exitting with status 0\n");
 
  406    pthread_mutex_unlock(&mutex);
 
  409  int narrays = my_narrays.front(); my_narrays.pop();
 
  410  std::vector<TObject*> objlist = my_objlist.front(); my_objlist.pop();
 
  411  std::vector<std::string> namelist = my_namelist.front(); my_namelist.pop();
 
  412  pthread_mutex_unlock(&mutex);
 
  416  for (
int i = 0; i < nobjs + narrays; i++) {
 
  418    bool array = (
dynamic_cast<TClonesArray*
>(objlist.at(i)) != 
nullptr);
 
  419    if (objlist.at(i) != 
nullptr) {
 
  420      TObject* obj = objlist.at(i);
 
  423      TClass* cl = obj->IsA();
 
  425        cl = 
static_cast<TClonesArray*
>(obj)->GetClass();
 
  431      bool ptrIsNULL = obj->TestBit(
c_IsNull);
 
  435        B2DEBUG(100, 
"restoreDS: " << (array ? 
"Array" : 
"Object") << 
": " << namelist.at(i) << 
" stored");
 
  447      B2ERROR(
"restoreDS: " << (array ? 
"Array" : 
"Object") << 
": " << namelist.at(i) << 
" is NULL!");
 
  487  TObjLink* lnk = list->FirstLink();
 
  489  std::vector<std::string> class_name;
 
  493    info = (TStreamerInfo*)lnk->GetObject();
 
  496    for (
auto& itr : class_name) {
 
  497      if (strcmp(itr.c_str(), info->GetName()) == 0) {
 
  499        B2DEBUG(100, 
"Regular Class Loop : The class " << info->GetName() << 
" has already appeared. Skipping...");
 
  506      std::string temp_classname = info->GetName();
 
  507      class_name.push_back(temp_classname);
 
  509      TObject* element = info->GetElements()->UncheckedAt(0);
 
  510      Bool_t isstl = element && strcmp(
"This", element->GetName()) == 0;
 
  514        B2INFO(
"importing TStreamerInfo: " << info->GetName() <<
 
  515               " version = " << info->GetClassVersion());
 
  524  lnk = list->FirstLink();
 
  526    info = (TStreamerInfo*)lnk->GetObject();
 
  529    for (
auto& itr : class_name) {
 
  530      if (strcmp(itr.c_str(), info->GetName()) == 0) {
 
  532        B2DEBUG(100, 
"STL Class Loop : The class " << info->GetName() << 
" has already appeared. Skipping...");
 
  539      std::string temp_classname = info->GetName();
 
  540      class_name.push_back(temp_classname);
 
  542      TObject* element = info->GetElements()->UncheckedAt(0);
 
  543      Bool_t isstl = element && strcmp(
"This", element->GetName()) == 0;
 
  547        B2INFO(
"STL importing TStreamerInfo: " << info->GetName() <<
 
  548               " version = " << info->GetClassVersion());
 
Stream/restore DataStore objects to/from EvtMessage.
static const unsigned int c_maxQueueDepth
Ask Itoh-san.
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
int getMaxThreads()
maximum number of threads.
int restoreStreamerInfos(const TList *list)
restore StreamerInfo from data in a file
pthread_t m_pt[c_maxThreads]
thread pointer
int queueEvtMessage(char *msg)
Queue EvtMessage for destreaming.
void * decodeEvtMessage(int id)
Decode EvtMessage and store objects in temporary buffer.
int restoreDataStoreAsync()
Restore objects in DataStore from temporary buffer.
std::vector< std::string > m_streamobjnames
names of objects to be streamed
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
static bool isMergeable(const TObject *object)
Is the given object of a type that can be merged?
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
~DataStoreStreamer()
destructor
int m_threadin
current thread?
@ c_PersistentDurability
Object is of persistent durability.
@ c_IsNull
object is not valid for current event, set StoreEntry::ptr to NULL.
@ c_IsTransient
The corresponding StoreEntry has flag c_DontWriteOut.
static const unsigned int c_maxThreads
global maximum number of threads (cannot set higher number).
static void mergeIntoExisting(TObject *existing, const TObject *received)
Assuming both objects are mergeable, merge 'received' into 'existing'.
MsgHandler * m_msghandler
MsgHandler.
int m_decoderStatus[c_maxThreads]
thread decoder status.
int m_initStatus
first event flag.
int m_id[c_maxThreads]
thread index.
bool m_handleMergeable
Whether to handle Mergeable objects.
void setMaxThreads(int)
maximum number of threads.
void setDecoderStatus(int)
Ask Itoh-san about this.
int m_compressionLevel
Compression level in streaming.
int getDecoderStatus()
Ask Itoh-san about this.
static void clearMergeable(TObject *object)
assuming object is mergeable, clear its contents.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
DataStoreStreamer(int complevel=0, bool handleMergeable=true, int maxthread=0)
Constructor.
bool getInitializeActive() const
Are we currently initializing modules?
@ c_WriteOut
Object/array should be saved by output modules.
@ c_DontWriteOut
Object/array should NOT be saved by output modules.
StoreEntryMap & getStoreEntryMap(EDurability durability)
Get a reference to the object/array map.
EDurability
Durability types.
@ c_Persistent
Object is available during entire execution time.
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
static DataStore & Instance()
Instance of singleton Store.
StoreEntry * getEntry(const StoreAccessorBase &accessor)
Check whether an entry with the correct type is registered in the DataStore map and return it.
bool createObject(TObject *object, bool replace, const StoreAccessorBase &accessor)
Create a new object/array in the DataStore or add an existing one.
bool registerEntry(const std::string &name, EDurability durability, TClass *objClass, bool array, EStoreFlags storeFlags)
Register an entry in the DataStore map.
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Class to manage streamed object.
ERecordType type() const
Get record type.
EvtHeader * header()
Get pointer to EvtHeader.
Abstract base class for objects that can be merged.
virtual void merge(const Mergeable *other)=0
Merge object 'other' into this one.
A class to encode/decode an EvtMessage.
virtual void decode_msg(EvtMessage *msg, std::vector< TObject * > &objlist, std::vector< std::string > &namelist)
Decode an EvtMessage into a vector list of objects with names.
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
virtual void clear()
Clear object list.
Base class for StoreObjPtr and StoreArray for easier common treatment.
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
bool create(bool replace=false)
Create a default object in the data store.
Type-safe access to single objects in the data store.
std::vector< std::vector< double > > merge(std::vector< std::vector< std::vector< double > > > toMerge)
merge { vector<double> a, vector<double> b} into {a, b}
Abstract base class for different kinds of events.
Wraps a stored array/object, stored under unique (name, durability) key.
TObject * ptr
The pointer to the returned object, either equal to 'object' or null, depending on whether the object ...
TObject * object
The pointer to the actual object.
void invalidate()
invalidate entry for next event.