 |
Belle II Software
release-05-02-19
|
9 #include <framework/pcore/ProcHelper.h>
10 #include <framework/pcore/TxModule.h>
12 #include <framework/pcore/EvtMessage.h>
13 #include <framework/pcore/DataStoreStreamer.h>
14 #include <framework/pcore/ProcHandler.h>
15 #include <framework/core/RandomNumbers.h>
16 #include <framework/core/Environment.h>
21 TxModule::TxModule(
RingBuffer* rbuf) :
Module(), m_streamer(nullptr), m_blockingInsert(true)
34 B2DEBUG(32,
"Tx: Constructor with RingBuffer done.");
38 TxModule::~TxModule() =
default;
50 B2DEBUG(32,
"Tx: Streaming objects limited : " << (
Environment::Instance().getStreamingObjects()).size() <<
" objects");
53 B2DEBUG(32,
getName() <<
" initialized.");
62 B2DEBUG(35,
"beginRun called (will wait for reading processes to finish processing previous run...).");
68 B2DEBUG(35,
"beginRun done.");
91 B2WARNING(
"Ring buffer seems full, removing some previous data.");
99 B2DEBUG(35,
"Tx: objs sent in buffer. Size = " << msg->
size());
107 B2DEBUG(35,
"endRun done.");
113 B2DEBUG(32,
"Tx: terminate called");
int paddedSize() const
Same as size(), but as size of an integer array.
bool isDead() const
If True, the ring buffer is empty and has no attached Tx modules (i.e. no new data is going to be added).
int m_nsent
No. of sent events.
virtual void event() override
This method is the core of the module.
virtual void initialize() override
Module functions to be called from the main process.
bool m_handleMergeable
Whether to handle Mergeable objects.
void txAttached()
Increase the number of attached Tx counter.
void setDescription(const std::string &description)
Sets the description of the module.
virtual void beginRun() override
Called when entering a new run.
Class to manage a streamed object.
RingBuffer * m_rbuf
RingBuffer (not owned by us)
@ c_InternalSerializer
This module is an internal serializer/deserializer for parallel processing.
@ c_DontWriteOut
Object/array should NOT be saved by output modules.
virtual void terminate() override
This method is called at the end of the event processing.
bool allRxWaiting() const
True if and only if buffer is empty and nbusy == 0.
Class to manage a Ring Buffer placed in an IPC shared memory.
bool m_sendRandomState
Whether to transfer the RandomGenerator state.
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
int size() const
Get size of message including headers.
Abstract base class for different kinds of events.
void txDetached()
Decrease the number of attached Tx counter.
void setType(const std::string &type)
Set the module type.
int m_compressionLevel
Compression parameter.
StoreObjPtr< RandomGenerator > m_randomgenerator
Random Generator object to pass to RxModule.
DataStoreStreamer * m_streamer
DataStoreStreamer.
static RandomGenerator & getEventRandomGenerator()
Return a reference to the event-dependent random generator.
static bool isInputProcess()
Return true if the process is an input process.
virtual void endRun() override
This method is called if the current run ends.
char * buffer()
Get buffer address.
@ c_Input
This module is an input module (reads data).
int remq(int *buf)
Pick up a buffer from the RingBuffer.
static Environment & Instance()
Static method to get a reference to the Environment instance.
int insq(const int *buf, int size, bool checkTx=false)
Append a buffer to the RingBuffer.
int shmid() const
Return ID of the shared memory.
const std::string & getName() const
Returns the name of the module.
void setName(const std::string &name)
Set the name of the module.
Stream/restore DataStore objects to/from EvtMessage.
bool m_blockingInsert
Whether to block until we can insert data into the ring buffer in event().
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.