Belle II Software development
DataStoreStreamer.h
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#pragma once
10
11#include <framework/pcore/EvtMessage.h>
12
13#include <Rtypes.h> //for BIT()
14
15#include <pthread.h>
16
17#include <vector>
18#include <string>
19
20class TList;
21
22namespace Belle2 {
27 class MsgHandler;
28
35 public:
37 static const unsigned int c_maxThreads = 16;
39 static const unsigned int c_maxQueueDepth = 64;
40
46 explicit DataStoreStreamer(int complevel = 0, bool handleMergeable = true, int maxthread = 0);
51
54
55 // DataStore->EvtMessage
62 EvtMessage* streamDataStore(bool addPersistentDurability, bool streamTransientObjects = false);
63
64
65 // EvtMessage->DataStore
70
72 void setStreamingObjects(const std::vector<std::string>& list);
73
74 // Pipelined destreaming of EvtMessage using thread
75
79 int queueEvtMessage(char* msg);
80
84 void* decodeEvtMessage(int id);
85
89
91 void setMaxThreads(int);
93 int getMaxThreads();
95 void setDecoderStatus(int);
97 int getDecoderStatus();
98
100 static bool isMergeable(const TObject* object);
101
106 static void clearMergeable(TObject* object);
107
109 static void mergeIntoExisting(TObject* existing, const TObject* received);
110
112 static void removeSideEffects();
113
114 private:
115
117 int restoreStreamerInfos(const TList* list);
118
125 c_IsTransient = BIT(19),
126 c_IsNull = BIT(20),
127 c_PersistentDurability = BIT(21)
128 };
129
135
138
146
151 std::vector<std::string> m_streamobjnames;
152
158
161 pthread_t m_pt[c_maxThreads];
166 //int m_threadout;
169 //MsgHandler* m_pmsghandler[c_maxThreads];
170 // char* m_evtbuf[c_maxThreads];
171 //std::queue<char*> m_evtbuf[c_maxThreads];
172
173 };
174
175 // Function to hook DataStoreStreamer::decodeEvtMessage to pthread
176
177 // void* RunDecodeEvtMessage ( void* );
178
179
181} // namespace Belle2
Stream/restore DataStore objects to/from EvtMessage.
static const unsigned int c_maxQueueDepth
Ask Itoh-san.
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
int getMaxThreads()
maximum number of threads.
int restoreStreamerInfos(const TList *list)
restore StreamerInfo from data in a file
pthread_t m_pt[c_maxThreads]
thread pointer
int queueEvtMessage(char *msg)
Queue EvtMessage for destreaming.
void * decodeEvtMessage(int id)
Decode EvtMessage and store objects in temporary buffer.
int restoreDataStoreAsync()
Restore objects in DataStore from temporary buffer.
std::vector< std::string > m_streamobjnames
names of object to be streamed
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
static bool isMergeable(const TObject *object)
Is the given object of a type that can be merged?
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
int m_threadin
current thread?
ETObjectBits
bits to store in TObject.
@ c_PersistentDurability
Object is of persistent durability.
@ c_IsNull
object is not valid for current event, set StoreEntry::ptr to NULL.
@ c_IsTransient
The corresponding StoreEntry has flag c_DontWriteOut.
static const unsigned int c_maxThreads
global maximum number of threads (cannot set higher number).
static void mergeIntoExisting(TObject *existing, const TObject *received)
Assuming both objects are mergeable, merge 'received' into 'existing'.
MsgHandler * m_msghandler
MsgHandler.
int m_decoderStatus[c_maxThreads]
thread decoder status.
int m_initStatus
first event flag.
DataStoreStreamer(const DataStoreStreamer &)=delete
No copying.
int m_id[c_maxThreads]
thread index.
bool m_handleMergeable
Whether to handle Mergeable objects.
void setMaxThreads(int)
maximum number of threads.
void setDecoderStatus(int)
Ask Itoh-san about this.
int m_compressionLevel
Compression level in streaming.
int getDecoderStatus()
Ask Itoh-san about this.
static void clearMergeable(TObject *object)
assuming object is mergeable, clear its contents.
DataStoreStreamer & operator=(const DataStoreStreamer &)=delete
No assignment.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
Class to manage streamed object.
Definition: EvtMessage.h:59
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
Abstract base class for different kinds of events.