Belle II Software  release-08-01-10
DataStoreStreamer.h
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #pragma once
10 
11 #include <framework/pcore/EvtMessage.h>
12 
13 #include <Rtypes.h> //for BIT()
14 
15 #include <pthread.h>
16 
17 #include <vector>
18 #include <string>
19 
20 class TList;
21 
22 namespace Belle2 {
27  class MsgHandler;
28 
35  public:
37  static const unsigned int c_maxThreads = 16;
39  static const unsigned int c_maxQueueDepth = 64;
40 
46  explicit DataStoreStreamer(int complevel = 0, bool handleMergeable = true, int maxthread = 0);
51 
54 
55  // DataStore->EvtMessage
62  EvtMessage* streamDataStore(bool addPersistentDurability, bool streamTransientObjects = false);
63 
64 
65  // EvtMessage->DataStore
69  int restoreDataStore(EvtMessage* msg);
70 
72  void setStreamingObjects(const std::vector<std::string>& list);
73 
74  // Pipelined destreaming of EvtMessage using thread
75 
79  int queueEvtMessage(char* msg);
80 
84  void* decodeEvtMessage(int id);
85 
89 
91  void setMaxThreads(int);
93  int getMaxThreads();
95  void setDecoderStatus(int);
97  int getDecoderStatus();
98 
100  static bool isMergeable(const TObject* object);
101 
106  static void clearMergeable(TObject* object);
107 
109  static void mergeIntoExisting(TObject* existing, const TObject* received);
110 
112  static void removeSideEffects();
113 
114  private:
115 
117  int restoreStreamerInfos(const TList* list);
118 
125  c_IsTransient = BIT(19),
126  c_IsNull = BIT(20),
127  c_PersistentDurability = BIT(21)
128  };
129 
135 
138 
146 
151  std::vector<std::string> m_streamobjnames;
152 
158 
161  pthread_t m_pt[c_maxThreads];
166  //int m_threadout;
169  //MsgHandler* m_pmsghandler[c_maxThreads];
170  // char* m_evtbuf[c_maxThreads];
171  //std::queue<char*> m_evtbuf[c_maxThreads];
172 
173  };
174 
175  // Function to hook DataStoreStreamer::decodeEvtMessage to pthread
176 
177  // void* RunDecodeEvtMessage ( void* );
178 
179 
181 } // namespace Belle2
Stream/restore DataStore objects to/from EvtMessage.
static const unsigned int c_maxQueueDepth
Ask Itoh-san.
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
int getMaxThreads()
maximum number of threads.
int restoreStreamerInfos(const TList *list)
restore StreamerInfo from data in a file
pthread_t m_pt[c_maxThreads]
thread pointer
int queueEvtMessage(char *msg)
Queue EvtMessage for destreaming.
void * decodeEvtMessage(int id)
Decode EvtMessage and store objects in temporary buffer.
int restoreDataStoreAsync()
Restore objects in DataStore from temporary buffer.
std::vector< std::string > m_streamobjnames
names of object to be streamed
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
static bool isMergeable(const TObject *object)
Is the given object of a type that can be merged?
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
int m_threadin
current thread?
ETObjectBits
bits to store in TObject.
@ c_PersistentDurability
Object is of persistent durability.
@ c_IsNull
object is not valid for current event, set StoreEntry::ptr to NULL.
@ c_IsTransient
The corresponding StoreEntry has flag c_DontWriteOut.
static const unsigned int c_maxThreads
global maximum number of threads (cannot set higher number).
static void mergeIntoExisting(TObject *existing, const TObject *received)
Assuming both objects are mergeable, merge 'received' into 'existing'.
MsgHandler * m_msghandler
MsgHandler.
int m_decoderStatus[c_maxThreads]
thread decoder status.
int m_initStatus
first event flag.
DataStoreStreamer(const DataStoreStreamer &)=delete
No copying.
int m_id[c_maxThreads]
thread index.
bool m_handleMergeable
Whether to handle Mergeable objects.
void setMaxThreads(int)
maximum number of threads.
void setDecoderStatus(int)
Ask Itoh-san about this.
int m_compressionLevel
Compression level in streaming.
int getDecoderStatus()
Ask Itoh-san about this.
static void clearMergeable(TObject *object)
assuming object is mergeable, clear its contents.
DataStoreStreamer & operator=(const DataStoreStreamer &)=delete
No assignment.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
DataStoreStreamer(int complevel=0, bool handleMergeable=true, int maxthread=0)
Constructor.
Class to manage streamed object.
Definition: EvtMessage.h:59
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
Abstract base class for different kinds of events.