Belle II Software  release-05-02-19
DataStoreStreamer.h
1 //+
2 // File : DataStoreStreamer.h
3 // Description : Stream/Destream DataStore objects in EvtMessage
4 //
5 // Author : Ryosuke Itoh, IPNS, KEK
6 // Date : 5 - Sep - 2012
7 
8 #pragma once
9 
10 #include <framework/pcore/EvtMessage.h>
11 
12 #include <Rtypes.h> //for BIT()
13 
14 #include <pthread.h>
15 
16 #include <vector>
17 #include <string>
18 
19 class TList;
20 
21 namespace Belle2 {
26  class MsgHandler;
27 
34  public:
36  static const unsigned int c_maxThreads = 16;
38  static const unsigned int c_maxQueueDepth = 64;
39 
45  explicit DataStoreStreamer(int complevel = 0, bool handleMergeable = true, int maxthread = 0);
47  DataStoreStreamer(const DataStoreStreamer&) = delete;
50 
53 
54  // DataStore->EvtMessage
61  EvtMessage* streamDataStore(bool addPersistentDurability, bool streamTransientObjects = false);
62 
63 
64  // EvtMessage->DataStore
68  int restoreDataStore(EvtMessage* msg);
69 
71  void setStreamingObjects(const std::vector<std::string>& list);
72 
73  // Pipelined destreaming of EvtMessage using thread
74 
78  int queueEvtMessage(char* msg);
79 
83  void* decodeEvtMessage(int id);
84 
88 
90  void setMaxThreads(int);
92  int getMaxThreads();
94  void setDecoderStatus(int);
96  int getDecoderStatus();
97 
99  static bool isMergeable(const TObject* object);
100 
105  static void clearMergeable(TObject* object);
106 
108  static void mergeIntoExisting(TObject* existing, const TObject* received);
109 
111  static void removeSideEffects();
112 
113  private:
114 
116  int restoreStreamerInfos(const TList* list);
117 
124  c_IsTransient = BIT(19),
125  c_IsNull = BIT(20),
127  };
128 
134 
137 
145 
150  std::vector<std::string> m_streamobjnames;
151 
157 
160  pthread_t m_pt[c_maxThreads];
165  //int m_threadout;
168  //MsgHandler* m_pmsghandler[c_maxThreads];
169  // char* m_evtbuf[c_maxThreads];
170  //std::queue<char*> m_evtbuf[c_maxThreads];
171 
172  };
173 
174  // Function to hook DataStoreStreamer::decodeEvtMessage to pthread
175 
176  // void* RunDecodeEvtMessage ( void* );
177 
178 
180 } // namespace Belle2
Belle2::DataStoreStreamer::decodeEvtMessage
void * decodeEvtMessage(int id)
Decode EvtMessage and store objects in temporary buffer.
Definition: DataStoreStreamer.cc:326
Belle2::DataStoreStreamer::queueEvtMessage
int queueEvtMessage(char *msg)
Queue EvtMessage for destreaming.
Definition: DataStoreStreamer.cc:296
Belle2::DataStoreStreamer::m_id
int m_id[c_maxThreads]
thread index.
Definition: DataStoreStreamer.h:162
Belle2::DataStoreStreamer::~DataStoreStreamer
~DataStoreStreamer()
destructor
Definition: DataStoreStreamer.cc:96
Belle2::DataStoreStreamer::getMaxThreads
int getMaxThreads()
maximum number of threads.
Definition: DataStoreStreamer.cc:461
Belle2::DataStoreStreamer::restoreDataStoreAsync
int restoreDataStoreAsync()
Restore objects in DataStore from temporary buffer.
Definition: DataStoreStreamer.cc:394
Belle2::EvtMessage
Class to manage streamed object.
Definition: EvtMessage.h:60
Belle2::DataStoreStreamer::isMergeable
static bool isMergeable(const TObject *object)
Is the given object of a type that can be merged?
Definition: DataStoreStreamer.cc:106
Belle2::DataStoreStreamer::m_compressionLevel
int m_compressionLevel
Compression level in streaming.
Definition: DataStoreStreamer.h:136
Belle2::DataStoreStreamer::c_PersistentDurability
@ c_PersistentDurability
Object is of persistent durability.
Definition: DataStoreStreamer.h:126
Belle2::DataStoreStreamer::setMaxThreads
void setMaxThreads(int)
maximum number of threads.
Definition: DataStoreStreamer.cc:456
Belle2::DataStoreStreamer::m_decoderStatus
int m_decoderStatus[c_maxThreads]
thread decoder status.
Definition: DataStoreStreamer.h:167
Belle2::DataStoreStreamer::setDecoderStatus
void setDecoderStatus(int)
Ask Itoh-san about this.
Definition: DataStoreStreamer.cc:472
Belle2::DataStoreStreamer::restoreDataStore
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
Definition: DataStoreStreamer.cc:207
Belle2::DataStoreStreamer::c_maxQueueDepth
static const unsigned int c_maxQueueDepth
Ask Itoh-san.
Definition: DataStoreStreamer.h:38
Belle2::DataStoreStreamer::m_pt
pthread_t m_pt[c_maxThreads]
thread pointer
Definition: DataStoreStreamer.h:160
Belle2::DataStoreStreamer::setStreamingObjects
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
Definition: DataStoreStreamer.cc:101
Belle2::DataStoreStreamer::removeSideEffects
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
Definition: DataStoreStreamer.cc:120
Belle2::DataStoreStreamer::clearMergeable
static void clearMergeable(TObject *object)
assuming object is mergeable, clear its contents.
Definition: DataStoreStreamer.cc:111
Belle2::DataStoreStreamer::c_IsNull
@ c_IsNull
object is not valid for current event, set StoreEntry::ptr to NULL.
Definition: DataStoreStreamer.h:125
Belle2::DataStoreStreamer::m_handleMergeable
bool m_handleMergeable
Whether to handle Mergeable objects.
Definition: DataStoreStreamer.h:138
Belle2::DataStoreStreamer::c_maxThreads
static const unsigned int c_maxThreads
global maximum number of threads (cannot set higher number).
Definition: DataStoreStreamer.h:36
Belle2::DataStoreStreamer::restoreStreamerInfos
int restoreStreamerInfos(const TList *list)
restore StreamerInfo from data in a file
Definition: DataStoreStreamer.cc:479
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::DataStoreStreamer::ETObjectBits
ETObjectBits
bits to store in TObject.
Definition: DataStoreStreamer.h:123
Belle2::DataStoreStreamer::c_IsTransient
@ c_IsTransient
The corresponding StoreEntry has flag c_DontWriteOut.
Definition: DataStoreStreamer.h:124
Belle2::DataStoreStreamer::m_threadin
int m_threadin
current thread?
Definition: DataStoreStreamer.h:164
Belle2::DataStoreStreamer::getDecoderStatus
int getDecoderStatus()
Ask Itoh-san about this.
Definition: DataStoreStreamer.cc:466
Belle2::DataStoreStreamer::DataStoreStreamer
DataStoreStreamer(int complevel=0, bool handleMergeable=true, int maxthread=0)
Constructor.
Definition: DataStoreStreamer.cc:58
Belle2::DataStoreStreamer::m_maxthread
int m_maxthread
Max.
Definition: DataStoreStreamer.h:156
Belle2::DataStoreStreamer::m_initStatus
int m_initStatus
first event flag.
Definition: DataStoreStreamer.h:144
Belle2::DataStoreStreamer::operator=
DataStoreStreamer & operator=(const DataStoreStreamer &)=delete
No assignment.
Belle2::DataStoreStreamer
Stream/restore DataStore objects to/from EvtMessage.
Definition: DataStoreStreamer.h:33
Belle2::DataStoreStreamer::m_streamobjnames
std::vector< std::string > m_streamobjnames
names of object to be streamed
Definition: DataStoreStreamer.h:150
Belle2::DataStoreStreamer::mergeIntoExisting
static void mergeIntoExisting(TObject *existing, const TObject *received)
Assuming both objects are mergeable, merge 'received' into 'existing'.
Definition: DataStoreStreamer.cc:115
Belle2::MsgHandler
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:104
Belle2::DataStoreStreamer::m_msghandler
MsgHandler * m_msghandler
MsgHandler.
Definition: DataStoreStreamer.h:133
Belle2::DataStoreStreamer::streamDataStore
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
Definition: DataStoreStreamer.cc:134