Belle II Software  release-08-01-10
DataStoreStreamer Class Reference

Stream/restore DataStore objects to/from EvtMessage.

#include <DataStoreStreamer.h>


Public Member Functions

 DataStoreStreamer (int complevel=0, bool handleMergeable=true, int maxthread=0)
 Constructor.
 
 DataStoreStreamer (const DataStoreStreamer &)=delete
 No copying.
 
DataStoreStreamer & operator= (const DataStoreStreamer &)=delete
 No assignment.
 
 ~DataStoreStreamer ()
 destructor
 
EvtMessage * streamDataStore (bool addPersistentDurability, bool streamTransientObjects=false)
 Store DataStore objects in EvtMessage.
 
int restoreDataStore (EvtMessage *msg)
 Restore DataStore objects from EvtMessage.
 
void setStreamingObjects (const std::vector< std::string > &list)
 Set names of objects to be streamed/destreamed.
 
int queueEvtMessage (char *msg)
 Queue EvtMessage for destreaming.
 
void * decodeEvtMessage (int id)
 Decode EvtMessage and store objects in temporary buffer.
 
int restoreDataStoreAsync ()
 Restore objects in DataStore from temporary buffer.
 
void setMaxThreads (int)
 Set the maximum number of threads.
 
int getMaxThreads ()
 Get the maximum number of threads.
 
void setDecoderStatus (int)
 Ask Itoh-san about this.
 
int getDecoderStatus ()
 Ask Itoh-san about this.
 

Static Public Member Functions

static bool isMergeable (const TObject *object)
 Is the given object of a type that can be merged?
 
static void clearMergeable (TObject *object)
 Assuming object is mergeable, clear its contents.
 
static void mergeIntoExisting (TObject *existing, const TObject *received)
 Assuming both objects are mergeable, merge 'received' into 'existing'.
 
static void removeSideEffects ()
 call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durability).
 

Static Public Attributes

static const unsigned int c_maxThreads = 16
 global maximum number of threads (cannot set higher number).
 
static const unsigned int c_maxQueueDepth = 64
 Ask Itoh-san.
 

Private Types

enum  ETObjectBits {
  c_IsTransient = BIT(19) ,
  c_IsNull = BIT(20) ,
  c_PersistentDurability = BIT(21)
}
 bits to store in TObject.
 

Private Member Functions

int restoreStreamerInfos (const TList *list)
 restore StreamerInfo from data in a file
 

Private Attributes

MsgHandler * m_msghandler
 MsgHandler object used to form/decode EvtMessage.
 
int m_compressionLevel
 Compression level in streaming.
 
bool m_handleMergeable
 Whether to handle Mergeable objects.
 
int m_initStatus
 first event flag.
 
std::vector< std::string > m_streamobjnames
 names of objects to be streamed
 
int m_maxthread
 Max. number of threads for asynchronous processing.
 
pthread_t m_pt [c_maxThreads]
 thread pointer
 
int m_id [c_maxThreads]
 thread index.
 
int m_threadin
 current thread?
 
int m_decoderStatus [c_maxThreads] {0}
 thread decoder status.
 

Detailed Description

Stream/restore DataStore objects to/from EvtMessage.

Main interface is provided by streamDataStore() and restoreDataStore(). Other functions provide more obscure features.

Definition at line 34 of file DataStoreStreamer.h.

Member Enumeration Documentation

◆ ETObjectBits

enum ETObjectBits
private

bits to store in TObject.

Bits 14-23 are available for use in derived classes, and are reused here to transmit additional information. This is really quite ugly and should be replaced with some more sane way of transmitting object-level data. All bits are checked before using them, so if they are used by other code we know what happens.

Enumerator
c_IsTransient 

The corresponding StoreEntry has flag c_DontWriteOut.

c_IsNull 

object is not valid for current event, set StoreEntry::ptr to NULL.

c_PersistentDurability 

Object is of persistent durability.

Definition at line 124 of file DataStoreStreamer.h.

enum ETObjectBits {
  c_IsTransient = BIT(19),
  c_IsNull = BIT(20),
  c_PersistentDurability = BIT(21)
};
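
The check-then-set pattern described for these bits can be sketched without ROOT. This is only an illustration: `bit()`, `FakeObject`, `bitsAreFree()` and `tagForStreaming()` are hypothetical stand-ins (for ROOT's `BIT` macro and the `TObject` status word) and are not part of DataStoreStreamer. Only the three bit positions are taken from the enum above.

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for ROOT's BIT(n) macro (assumption: a mask with only bit n set).
constexpr std::uint32_t bit(unsigned n) { return std::uint32_t{1} << n; }

// Same bit positions as the ETObjectBits enum documented above.
enum ETObjectBits : std::uint32_t {
  c_IsTransient = bit(19),
  c_IsNull = bit(20),
  c_PersistentDurability = bit(21)
};

// Hypothetical stand-in for the status word of a TObject; not the ROOT class.
struct FakeObject {
  std::uint32_t bits = 0;
  void SetBit(std::uint32_t f) { bits |= f; }
  bool TestBit(std::uint32_t f) const { return (bits & f) != 0; }
};

// True if none of the three bits is already in use by other code.
bool bitsAreFree(const FakeObject& obj)
{
  return !obj.TestBit(c_IsTransient | c_IsNull | c_PersistentDurability);
}

// Sender side: attach per-object flags, but only after checking the bits are unused.
bool tagForStreaming(FakeObject& obj, bool transient, bool persistent)
{
  if (!bitsAreFree(obj)) return false;
  if (transient) obj.SetBit(c_IsTransient);
  if (persistent) obj.SetBit(c_PersistentDurability);
  return true;
}
```

Tagging the same object a second time is refused in this sketch, because the bits are then already occupied. That is the kind of conflict the "all bits are checked" remark refers to.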

Constructor & Destructor Documentation

◆ DataStoreStreamer()

DataStoreStreamer ( int  complevel = 0,
bool  handleMergeable = true,
int  maxthread = 0 
)
explicit

Constructor.

Parameters
complevel        Compression level of streaming, 0 to disable
handleMergeable  Perform special handling for Mergeable objects?
maxthread        Maximal number of threads, 0 to disable

Definition at line 58 of file DataStoreStreamer.cc.

DataStoreStreamer::DataStoreStreamer(int complevel, bool handleMergeable, int maxthread) :
  m_compressionLevel(complevel),
  m_handleMergeable(handleMergeable),
  m_initStatus(0),
  m_maxthread(maxthread),
  m_threadin(0)
  //, m_threadout(0)
{
  // Refuse thread counts above the global cap c_maxThreads.
  if ((unsigned int)m_maxthread > c_maxThreads) {
    B2FATAL("DataStoreStreamer : Too many threads " << m_maxthread);
  }

  if (m_maxthread > 0) {
    // Run decoder threads as sustainable detached threads
    // Note: thread_attr is initialised and destroyed but not passed to
    // pthread_create() below, so the threads use default attributes.
    pthread_attr_t thread_attr;
    pthread_attr_init(&thread_attr);
    // pthread_attr_setschedpolicy(&thread_attr , SCHED_FIFO);
    // pthread_attr_setdetachstate(&thread_attr , PTHREAD_CREATE_DETACHED);
    // my_decstat, mutex_thread and msg_compLevel are declared elsewhere
    // (not shown in this listing).
    for (int i = 0; i < m_maxthread; i++) {
      my_decstat[i] = 0;
      m_pt[i] = (pthread_t)0;
      m_id[i] = i;
      // m_pmsghandler[i] = new MsgHandler ( m_compressionLevel );
      mutex_thread[i] = PTHREAD_MUTEX_INITIALIZER;
      msg_compLevel[i] = m_compressionLevel;
    }
    // Each thread receives a pointer to its own slot in m_id as its argument.
    for (int i = 0; i < m_maxthread; i++) {
      // args.evtbuf = m_evtbuf[i];
      pthread_create(&m_pt[i], nullptr, RunDecodeEvtMessage, (void*)&m_id[i]);
    }
    pthread_attr_destroy(&thread_attr);
  }
  s_streamer = this;
}

Member Function Documentation

◆ clearMergeable()

void clearMergeable ( TObject *  object)
static

assuming object is mergeable, clear its contents.

Use this after sending it to prevent sending the same data again in the next event.

Definition at line 111 of file DataStoreStreamer.cc.
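
Why a mergeable object is cleared after sending can be seen in a small stand-alone sketch. `CountMap`, `mergeSketch()` and `sendAndClear()` are hypothetical names invented for this illustration. The real functions operate on `TObject` pointers and their implementation is not shown on this page.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical mergeable payload: per-key counts accumulated over events.
struct CountMap {
  std::map<std::string, long> counts;

  // Add the counts of 'other' to this object.
  void merge(const CountMap& other)
  {
    for (const auto& kv : other.counts) counts[kv.first] += kv.second;
  }

  void clear() { counts.clear(); }
};

// Receiver side: fold 'received' into 'existing' (same role as mergeIntoExisting()).
void mergeSketch(CountMap& existing, const CountMap& received)
{
  existing.merge(received);
}

// Sender side: hand out a snapshot, then clear the local object so the next
// event only transmits what was accumulated since this call.
CountMap sendAndClear(CountMap& local)
{
  CountMap sent = local;
  local.clear();
  return sent;
}
```

If the sender skipped the `clear()` step, the receiver would add the first event's counts again with every later event and the merged total would be inflated.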

◆ decodeEvtMessage()

void * decodeEvtMessage ( int  id)

Decode EvtMessage and store objects in temporary buffer.

Parameters
id  Thread id

Definition at line 326 of file DataStoreStreamer.cc.

◆ queueEvtMessage()

int queueEvtMessage ( char *  msg)

Queue EvtMessage for destreaming.

Parameters
msg  Event buffer to be restored.

Definition at line 296 of file DataStoreStreamer.cc.

◆ restoreDataStore()

int restoreDataStore ( EvtMessage *  msg)

Restore DataStore objects from EvtMessage.

Parameters
msg  EvtMessage to be restored.

Definition at line 207 of file DataStoreStreamer.cc.

◆ streamDataStore()

EvtMessage * streamDataStore ( bool  addPersistentDurability,
bool  streamTransientObjects = false 
)

Store DataStore objects in EvtMessage.

Parameters
addPersistentDurability  By default, only c_Event data is streamed. Setting this to true will add c_Persistent data to the EvtMessage.
streamTransientObjects   Should objects/arrays registered as transient be streamed?
Returns
pointer to EvtMessage, caller is responsible for deletion

Definition at line 134 of file DataStoreStreamer.cc.
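
Because the returned pointer is owned by the caller, one possible way to honour that contract is to wrap it in a `std::unique_ptr` straight away. In the sketch below, `EvtMessage` and `StreamerSketch` are hypothetical stand-ins so that it compiles without the Belle II headers. Only the signature of `streamDataStore()` is copied from the documentation above, and the `alive` counter exists purely to make the deletion observable.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for EvtMessage; counts live instances for illustration.
struct EvtMessage {
  static int alive;
  EvtMessage() { ++alive; }
  ~EvtMessage() { --alive; }
};
int EvtMessage::alive = 0;

// Hypothetical stand-in for DataStoreStreamer with the documented signature.
struct StreamerSketch {
  // Returns a raw pointer that the caller must delete.
  EvtMessage* streamDataStore(bool /*addPersistentDurability*/,
                              bool /*streamTransientObjects*/ = false)
  {
    return new EvtMessage;
  }
};

// Take ownership immediately so the message is deleted on every exit path.
int streamOnce(StreamerSketch& streamer)
{
  std::unique_ptr<EvtMessage> msg(streamer.streamDataStore(true));
  // ... hand msg.get() to whatever sends the message ...
  return EvtMessage::alive;  // number of live messages while 'msg' is still held
}
```

With this pattern an early return or an exception between streaming and sending does not leak the message.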

Member Data Documentation

◆ m_initStatus

int m_initStatus
private

first event flag.

0 during first event, 1 otherwise.

Definition at line 145 of file DataStoreStreamer.h.

◆ m_maxthread

int m_maxthread
private

Max. number of threads for asynchronous processing; 4 for default.

Definition at line 157 of file DataStoreStreamer.h.

◆ m_msghandler

MsgHandler* m_msghandler
private

MsgHandler.

MsgHandler object used to form/decode EvtMessage

Definition at line 134 of file DataStoreStreamer.h.

◆ m_streamobjnames

std::vector<std::string> m_streamobjnames
private

Names of objects to be streamed.

If the list is empty (size=0), all objects are streamed.

Definition at line 151 of file DataStoreStreamer.h.
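
The "empty list means everything" rule can be written as a small predicate. This is a minimal sketch of that rule only. `shouldStream()` is a hypothetical helper and not a member of DataStoreStreamer, and exact string comparison is an assumption about how the names are matched.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Returns true if 'name' should be streamed given the configured name list.
// An empty list selects every object; otherwise only listed names pass.
bool shouldStream(const std::vector<std::string>& streamObjNames,
                  const std::string& name)
{
  if (streamObjNames.empty()) return true;
  return std::find(streamObjNames.begin(), streamObjNames.end(), name)
         != streamObjNames.end();
}
```

A list such as the one passed to `setStreamingObjects()` would play the role of `streamObjNames` here.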


The documentation for this class was generated from the following files: