Belle II Software development
TxModule.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/TxModule.h>
10
11#include <framework/pcore/EvtMessage.h>
12#include <framework/pcore/DataStoreStreamer.h>
13#include <framework/pcore/ProcHandler.h>
14#include <framework/core/RandomNumbers.h>
15#include <framework/core/Environment.h>
16
17using namespace std;
18using namespace Belle2;
19
// Construct a Tx module attached to the given ring buffer (rbuf may be null).
// The module starts without a streamer and with blocking inserts enabled.
20TxModule::TxModule(RingBuffer* rbuf) : Module(), m_streamer(nullptr), m_blockingInsert(true)
21{
22 //Set module properties
23 setDescription("Encode DataStore into RingBuffer");
25 setType("Tx");
26
 // Remember the ring buffer and reset the sent-event counter.
27 m_rbuf = rbuf;
28 m_nsent = 0;
30
31 if (rbuf) {
 // Name the module after the shared-memory ID of its ring buffer.
32 setName("Tx" + std::to_string(rbuf->shmid()));
33 B2DEBUG(32, "Tx: Constructor with RingBuffer done.");
34 }
35}
36
// Defaulted destructor: m_streamer is deleted in terminate(), and the ring buffer is not owned by this module.
37TxModule::~TxModule() = default;
38
40{
43
46
 // If the Environment holds a non-empty list of streaming objects, report its size at debug level.
47 if ((Environment::Instance().getStreamingObjects()).size() > 0) {
49 B2DEBUG(32, "Tx: Streaming objects limited : " << (Environment::Instance().getStreamingObjects()).size() << " objects");
50 }
51
52 B2DEBUG(32, getName() << " initialized.");
53}
54
55
57{
59 //NOTE: only needs to be done in input process, that way the parallel processes
60 // will never see runs out of order
61 B2DEBUG(35, "beginRun called (will wait for reading processes to finish processing previous run...).");
62 // Wait until the ring buffer is empty and all attached reading processes have finished.
 // The loop also stops as soon as the ring buffer reports itself dead; it polls every 500 microseconds.
63 while (!m_rbuf->isDead() and !m_rbuf->allRxWaiting()) {
64 usleep(500);
65 }
66 }
67 B2DEBUG(35, "beginRun done.");
68}
69
70
72{
74 //Save event level random generator into datastore to send it to other processes
75 if (!m_randomgenerator.isValid()) {
77 } else {
79 }
80 }
81
82 // Stream DataStore in EvtMessage, also stream transient objects and objects of durability c_Persistent
 // (the two arguments are addPersistentDurability and streamTransientObjects).
83 EvtMessage* msg = m_streamer->streamDataStore(true, true);
84
85 // Put the message in ring buffer
 // Keep retrying until insq() returns a non-negative status.
86 for (;;) {
 // paddedSize() gives the message size counted in ints, matching the int* view of the buffer; the last argument is checkTx.
87 int stat = m_rbuf->insq((int*)msg->buffer(), msg->paddedSize(), true);
88 if (stat >= 0) break;
 // In non-blocking mode, discard one queued entry to make room before retrying.
89 if (!m_blockingInsert) {
90 B2WARNING("Ring buffer seems full, removing some previous data.");
91 m_rbuf->remq(nullptr);
92 }
93 // usleep(200);
 // Pause for 20 microseconds before the next insertion attempt.
94 usleep(20);
95 }
 // Count the event as sent only after it has been inserted successfully.
96 m_nsent++;
97
98 B2DEBUG(35, "Tx: objs sent in buffer. Size = " << msg->size());
99
100 // Release EvtMessage buffer
101 delete msg;
102}
103
105{
 // Nothing to do at the end of a run apart from leaving a debug trace.
106 B2DEBUG(35, "endRun done.");
107}
108
109
111{
112 B2DEBUG(32, "Tx: terminate called");
113
 // Free the streamer owned by this module.
 // NOTE(review): m_streamer is not reset to nullptr after the delete, so the pointer dangles afterwards.
115 delete m_streamer;
 // Drop the reference to the ring buffer; it is not owned here, so it is not deleted.
116 m_rbuf = nullptr;
117}
118
Stream/restore DataStore objects to/from EvtMessage.
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:71
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Class to manage streamed object.
Definition: EvtMessage.h:59
int paddedSize() const
Same as size(), but as size of an integer array.
Definition: EvtMessage.cc:99
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:76
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:94
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
const std::string & getName() const
Returns the name of the module.
Definition: Module.h:186
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
@ c_InternalSerializer
This module is an internal serializer/deserializer for parallel processing.
Definition: Module.h:82
void setName(const std::string &name)
Set the name of the module.
Definition: Module.h:213
void setType(const std::string &type)
Set the module type.
Definition: Module.cc:48
static bool isInputProcess()
Return true if the process is an input process.
Definition: ProcHandler.cc:228
static RandomGenerator & getEventRandomGenerator()
Return a reference to the event-dependent random generator.
Definition: RandomNumbers.h:73
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
int insq(const int *buf, int size, bool checkTx=false)
Append a buffer to the RingBuffer.
Definition: RingBuffer.cc:189
void txAttached()
Increase the number of attached Tx counter.
Definition: RingBuffer.cc:373
void txDetached()
Decrease the number of attached Tx counter.
Definition: RingBuffer.cc:381
int shmid() const
Return ID of the shared memory.
Definition: RingBuffer.cc:458
bool allRxWaiting() const
True if and only if buffer is empty and nbusy == 0.
Definition: RingBuffer.cc:400
int remq(int *buf)
Pick up a buffer from the RingBuffer.
Definition: RingBuffer.cc:308
bool isDead() const
If true, the ring buffer is empty and has no attached Tx modules (i.e. no further data will be added).
Definition: RingBuffer.cc:394
DataStoreStreamer * m_streamer
DataStoreStreamer.
Definition: TxModule.h:66
virtual void initialize() override
Module functions to be called from main process.
Definition: TxModule.cc:39
virtual void event() override
This method is the core of the module.
Definition: TxModule.cc:71
bool m_blockingInsert
Whether to block until we can insert data into the ring buffer in event().
Definition: TxModule.h:72
TxModule(RingBuffer *rbuf)
Constructor.
Definition: TxModule.cc:20
virtual void endRun() override
This method is called if the current run ends.
Definition: TxModule.cc:104
virtual void terminate() override
This method is called at the end of the event processing.
Definition: TxModule.cc:110
RingBuffer * m_rbuf
RingBuffer (not owned by us)
Definition: TxModule.h:63
virtual void beginRun() override
Called when entering a new run.
Definition: TxModule.cc:56
int m_nsent
No. of sent events.
Definition: TxModule.h:69
StoreObjPtr< RandomGenerator > m_randomgenerator
Random Generator object to pass to RxModule.
Definition: TxModule.h:78
bool m_handleMergeable
Whether to handle Mergeable objects.
Definition: TxModule.h:74
int m_compressionLevel
Compression parameter.
Definition: TxModule.h:60
bool m_sendRandomState
Whether to transfer the RandomGenerator state.
Definition: TxModule.h:75
Abstract base class for different kinds of events.
STL namespace.