9 #include <framework/pcore/ProcHelper.h>
10 #include <framework/pcore/TxModule.h>
12 #include <framework/pcore/EvtMessage.h>
13 #include <framework/pcore/DataStoreStreamer.h>
14 #include <framework/pcore/ProcHandler.h>
15 #include <framework/core/RandomNumbers.h>
16 #include <framework/core/Environment.h>
21 TxModule::TxModule(
RingBuffer* rbuf) :
Module(), m_streamer(nullptr), m_blockingInsert(true)
34 B2DEBUG(32,
"Tx: Constructor with RingBuffer done.");
38 TxModule::~TxModule() =
default;
50 B2DEBUG(32,
"Tx: Streaming objects limited : " << (
Environment::Instance().getStreamingObjects()).size() <<
" objects");
53 B2DEBUG(32,
getName() <<
" initialized.");
62 B2DEBUG(35,
"beginRun called (will wait for reading processes to finish processing previous run...).");
68 B2DEBUG(35,
"beginRun done.");
91 B2WARNING(
"Ring buffer seems full, removing some previous data.");
99 B2DEBUG(35,
"Tx: objs sent in buffer. Size = " << msg->
size());
107 B2DEBUG(35,
"endRun done.");
113 B2DEBUG(32,
"Tx: terminate called");
Stream/restore DataStore objects to/from EvtMessage.
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
static Environment & Instance()
Static method to get a reference to the Environment instance.
Class to manage a streamed object.
int paddedSize() const
Same as size(), but as size of an integer array.
char * buffer()
Get buffer address.
int size() const
Get size of message including headers.
void setDescription(const std::string &description)
Sets the description of the module.
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
@ c_Input
This module is an input module (reads data).
@ c_InternalSerializer
This module is an internal serializer/deserializer for parallel processing.
const std::string & getName() const
Returns the name of the module.
void setName(const std::string &name)
Set the name of the module.
void setType(const std::string &type)
Set the module type.
static bool isInputProcess()
Return true if the process is an input process.
static RandomGenerator & getEventRandomGenerator()
Return a reference to the event-dependent random generator.
Class to manage a Ring Buffer placed in an IPC shared memory.
int insq(const int *buf, int size, bool checkTx=false)
Append a buffer to the RingBuffer.
void txAttached()
Increase the number of attached Tx counter.
void txDetached()
Decrease the number of attached Tx counter.
int shmid() const
Return ID of the shared memory.
bool allRxWaiting() const
True if and only if buffer is empty and nbusy == 0.
int remq(int *buf)
Pick up a buffer from the RingBuffer.
bool isDead() const
If True, the ring buffer is empty and has no attached Tx modules.
DataStoreStreamer * m_streamer
DataStoreStreamer.
virtual void initialize() override
Module functions to be called from main process.
virtual void event() override
This method is the core of the module.
bool m_blockingInsert
Whether to block until we can insert data into the ring buffer in event().
virtual void endRun() override
This method is called if the current run ends.
virtual void terminate() override
This method is called at the end of the event processing.
RingBuffer * m_rbuf
RingBuffer (not owned by us)
virtual void beginRun() override
Called when entering a new run.
int m_nsent
Number of sent events.
StoreObjPtr< RandomGenerator > m_randomgenerator
Random Generator object to pass to RxModule.
bool m_handleMergeable
Whether to handle Mergeable objects.
int m_compressionLevel
Compression parameter.
bool m_sendRandomState
Whether to transfer the RandomGenerator state.
Abstract base class for different kinds of events.