Belle II Software development
TxModule.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/ProcHelper.h>
10#include <framework/pcore/TxModule.h>
11
12#include <framework/pcore/EvtMessage.h>
13#include <framework/pcore/DataStoreStreamer.h>
14#include <framework/pcore/ProcHandler.h>
15#include <framework/core/RandomNumbers.h>
16#include <framework/core/Environment.h>
17
18using namespace std;
19using namespace Belle2;
20
21TxModule::TxModule(RingBuffer* rbuf) : Module(), m_streamer(nullptr), m_blockingInsert(true)
22{
23 //Set module properties
24 setDescription("Encode DataStore into RingBuffer");
26 setType("Tx");
27
28 m_rbuf = rbuf;
29 m_nsent = 0;
31
32 if (rbuf) {
33 setName("Tx" + std::to_string(rbuf->shmid()));
34 B2DEBUG(32, "Tx: Constructor with RingBuffer done.");
35 }
36}
37
38TxModule::~TxModule() = default;
39
41{
44
47
48 if ((Environment::Instance().getStreamingObjects()).size() > 0) {
50 B2DEBUG(32, "Tx: Streaming objects limited : " << (Environment::Instance().getStreamingObjects()).size() << " objects");
51 }
52
53 B2DEBUG(32, getName() << " initialized.");
54}
55
56
58{
60 //NOTE: only needs to be done in input process, that way the parallel processes
61 // will never see runs out of order
62 B2DEBUG(35, "beginRun called (will wait for reading processes to finish processing previous run...).");
63 //wait until RB is both empty and all attached reading processes have finished..
64 while (!m_rbuf->isDead() and !m_rbuf->allRxWaiting()) {
65 usleep(500);
66 }
67 }
68 B2DEBUG(35, "beginRun done.");
69}
70
71
73{
75 //Save event level random generator into datastore to send it to other processes
76 if (!m_randomgenerator.isValid()) {
78 } else {
80 }
81 }
82
83 // Stream DataStore in EvtMessage, also stream transient objects and objects of durability c_Persistent
84 EvtMessage* msg = m_streamer->streamDataStore(true, true);
85
86 // Put the message in ring buffer
87 for (;;) {
88 int stat = m_rbuf->insq((int*)msg->buffer(), msg->paddedSize(), true);
89 if (stat >= 0) break;
90 if (!m_blockingInsert) {
91 B2WARNING("Ring buffer seems full, removing some previous data.");
92 m_rbuf->remq(nullptr);
93 }
94 // usleep(200);
95 usleep(20);
96 }
97 m_nsent++;
98
99 B2DEBUG(35, "Tx: objs sent in buffer. Size = " << msg->size());
100
101 // Release EvtMessage buffer
102 delete msg;
103}
104
106{
107 B2DEBUG(35, "endRun done.");
108}
109
110
112{
113 B2DEBUG(32, "Tx: terminate called");
114
116 delete m_streamer;
117 m_rbuf = nullptr;
118}
119
Stream/restore DataStore objects to/from EvtMessage.
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:71
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Class to manage streamed object.
Definition: EvtMessage.h:59
int paddedSize() const
Same as size(), but as size of an integer array.
Definition: EvtMessage.cc:99
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:76
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:94
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
const std::string & getName() const
Returns the name of the module.
Definition: Module.h:187
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
@ c_InternalSerializer
This module is an internal serializer/deserializer for parallel processing.
Definition: Module.h:82
void setName(const std::string &name)
Set the name of the module.
Definition: Module.h:214
void setType(const std::string &type)
Set the module type.
Definition: Module.cc:48
static bool isInputProcess()
Return true if the process is an input process.
Definition: ProcHandler.cc:228
static RandomGenerator & getEventRandomGenerator()
return reference to the event dependent random generator
Definition: RandomNumbers.h:73
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
int insq(const int *buf, int size, bool checkTx=false)
Append a buffer to the RingBuffer.
Definition: RingBuffer.cc:189
void txAttached()
Increase the number of attached Tx counter.
Definition: RingBuffer.cc:373
void txDetached()
Decrease the number of attached Tx counter.
Definition: RingBuffer.cc:381
int shmid() const
Return ID of the shared memory.
Definition: RingBuffer.cc:458
bool allRxWaiting() const
True if and only if buffer is empty and nbusy == 0.
Definition: RingBuffer.cc:400
int remq(int *buf)
Pick up a buffer from the RingBuffer.
Definition: RingBuffer.cc:308
bool isDead() const
If True, the ring buffer is empty and has no attached Tx modules (i.e.
Definition: RingBuffer.cc:394
DataStoreStreamer * m_streamer
DataStoreStreamer.
Definition: TxModule.h:66
virtual void initialize() override
Module functions to be called from main process.
Definition: TxModule.cc:40
virtual void event() override
This method is the core of the module.
Definition: TxModule.cc:72
bool m_blockingInsert
Whether to block until we can insert data into the ring buffer in event().
Definition: TxModule.h:72
TxModule(RingBuffer *rbuf)
Constructor.
Definition: TxModule.cc:21
virtual void endRun() override
This method is called if the current run ends.
Definition: TxModule.cc:105
virtual void terminate() override
This method is called at the end of the event processing.
Definition: TxModule.cc:111
RingBuffer * m_rbuf
RingBuffer (not owned by us)
Definition: TxModule.h:63
virtual void beginRun() override
Called when entering a new run.
Definition: TxModule.cc:57
int m_nsent
No. of sent events.
Definition: TxModule.h:69
StoreObjPtr< RandomGenerator > m_randomgenerator
Random Generator object to pass to RxModule.
Definition: TxModule.h:78
bool m_handleMergeable
Whether to handle Mergeable objects.
Definition: TxModule.h:74
int m_compressionLevel
Compression parameter.
Definition: TxModule.h:60
bool m_sendRandomState
Whether to transfer the RandomGenerator state.
Definition: TxModule.h:75
Abstract base class for different kinds of events.
STL namespace.