Belle II Software  release-08-01-10
TxModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/ProcHelper.h>
10 #include <framework/pcore/TxModule.h>
11 
12 #include <framework/pcore/EvtMessage.h>
13 #include <framework/pcore/DataStoreStreamer.h>
14 #include <framework/pcore/ProcHandler.h>
15 #include <framework/core/RandomNumbers.h>
16 #include <framework/core/Environment.h>
17 
18 using namespace std;
19 using namespace Belle2;
20 
/**
 * Construct a Tx module that writes into the given ring buffer.
 *
 * Sets the module description and the module type "Tx", remembers the ring
 * buffer pointer and resets the sent-event counter. The streamer pointer starts
 * out as nullptr and blocking insert is enabled by default.
 *
 * @param rbuf Ring buffer to send to; may be null, in which case the module
 *             keeps its default name.
 *
 * NOTE(review): the embedded listing numbers jump 24 -> 26 and 29 -> 31, so the
 * statements on source lines 25 and 30 are not visible here. The symbol index
 * further down mentions setPropertyFlags / c_Input / c_InternalSerializer and
 * m_compressionLevel, so presumably those lines set the property flags and the
 * compression level -- confirm against the real TxModule.cc.
 */
21 TxModule::TxModule(RingBuffer* rbuf) : Module(), m_streamer(nullptr), m_blockingInsert(true)
22 {
23  //Set module properties
24  setDescription("Encode DataStore into RingBuffer");
26  setType("Tx");
27 
28  m_rbuf = rbuf;
29  m_nsent = 0;
31 
    // Only a non-null buffer has a shared-memory id to derive a unique module name from.
32  if (rbuf) {
33  setName("Tx" + std::to_string(rbuf->shmid()));
34  B2DEBUG(32, "Tx: Constructor with RingBuffer done.");
35  }
36 }
37 
// Defaulted destructor: nothing is released here. The streamer is deleted and the
// ring buffer pointer is cleared in terminate(), not on destruction.
38 TxModule::~TxModule() = default;
39 
// Body of TxModule::initialize(). The signature line (source line 40) is missing
// from this listing; the symbol index below records `virtual void initialize()
// override` at TxModule.cc:40.
//
// Registers the random-generator object in the DataStore, announces this module
// as an attached Tx on the ring buffer, and, when the Environment carries a
// non-empty list of streaming objects, restricts the streamer to that list.
//
// NOTE(review): source lines 42 and 46 are not visible in this listing.
// m_streamer is dereferenced on line 49 but the only visible assignment is the
// nullptr in the constructor, so presumably line 46 creates the
// DataStoreStreamer (and line 42 may guard the registration) -- confirm against
// the real file before relying on this listing.
41 {
    // c_DontWriteOut: the object should not be saved by output modules.
43  m_randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
44 
    // Increases the ring buffer's count of attached Tx modules.
45  m_rbuf->txAttached();
47 
48  if ((Environment::Instance().getStreamingObjects()).size() > 0) {
49  m_streamer->setStreamingObjects(Environment::Instance().getStreamingObjects());
50  B2DEBUG(32, "Tx: Streaming objects limited : " << (Environment::Instance().getStreamingObjects()).size() << " objects");
51  }
52 
53  B2DEBUG(32, getName() << " initialized.");
54 }
55 
56 
// Body of TxModule::beginRun(). The signature line (source line 57) is missing
// from this listing; the symbol index below records `virtual void beginRun()
// override` at TxModule.cc:57.
//
// Polls the ring buffer every 500 microseconds and returns once the buffer is
// either dead (empty with no attached Tx modules) or all Rx are waiting (buffer
// empty and nbusy == 0).
//
// NOTE(review): source line 59 is not visible, and the closing brace on line 67
// has no visible opening partner. Given the comment on line 60 and the
// ProcHandler::isInputProcess() entry in the symbol index, line 59 is presumably
// `if (ProcHandler::isInputProcess()) {` -- confirm against the real file.
58 {
60  //NOTE: only needs to be done in input process, that way the parallel processes
61  // will never see runs out of order
62  B2DEBUG(35, "beginRun called (will wait for reading processes to finish processing previous run...).");
63  //wait until RB is both empty and all attached reading processes have finished..
64  while (!m_rbuf->isDead() and !m_rbuf->allRxWaiting()) {
65  usleep(500);
66  }
67  }
68  B2DEBUG(35, "beginRun done.");
69 }
70 
71 
// Body of TxModule::event(). The signature line (source line 72) is missing from
// this listing; the symbol index below records `virtual void event() override`
// at TxModule.cc:72.
//
// Per event: optionally stores the event random generator state, streams the
// DataStore into an EvtMessage, inserts that message into the ring buffer
// (retrying until the insert succeeds), counts the event as sent and frees the
// message.
//
// NOTE(review): source lines 77 and 79 are not visible, which is why both
// branches of the isValid() check look empty here. The symbol index lists
// RandomNumbers::getEventRandomGenerator(), so presumably one branch constructs
// the stored object from it and the other assigns to it -- confirm against the
// real file.
73 {
74  if (m_sendRandomState) {
75  //Save event level random generator into datastore to send it to other processes
76  if (!m_randomgenerator.isValid()) {
78  } else {
80  }
81  }
82 
83  // Stream DataStore in EvtMessage, also stream transient objects and objects of durability c_Persistent
    // Both flags are true: addPersistentDurability and streamTransientObjects.
84  EvtMessage* msg = m_streamer->streamDataStore(true, true);
85 
86  // Put the message in ring buffer
    // Retry loop: leaves only when insq returns a non-negative status.
87  for (;;) {
    // buffer() is a char*, insq takes an int buffer; paddedSize() is the message
    // size expressed as the size of an integer array. Third argument is checkTx.
88  int stat = m_rbuf->insq((int*)msg->buffer(), msg->paddedSize(), true);
89  if (stat >= 0) break;
90  if (!m_blockingInsert) {
91  B2WARNING("Ring buffer seems full, removing some previous data.");
    // remq is called with a null destination; presumably this drops a queued
    // entry to make room, as the warning says -- verify against RingBuffer::remq.
92  m_rbuf->remq(nullptr);
93  }
94  // usleep(200);
    // Short pause (20 microseconds) before the next insert attempt.
95  usleep(20);
96  }
97  m_nsent++;
98 
99  B2DEBUG(35, "Tx: objs sent in buffer. Size = " << msg->size());
100 
101  // Release EvtMessage buffer
    // msg is deleted here, after its last use in the debug message above.
102  delete msg;
103 }
104 
// Body of TxModule::endRun(). The signature line (source line 105) is missing
// from this listing; the symbol index below records `virtual void endRun()
// override` at TxModule.cc:105.
//
// Does no work at the end of a run beyond emitting a debug message.
106 {
107  B2DEBUG(35, "endRun done.");
108 }
109 
110 
// Body of TxModule::terminate(). The signature line (source line 111) is missing
// from this listing; the symbol index below records `virtual void terminate()
// override` at TxModule.cc:111.
//
// Detaches this module from the ring buffer, frees the streamer and drops the
// ring buffer pointer.
112 {
113  B2DEBUG(32, "Tx: terminate called");
114 
    // Decreases the ring buffer's count of attached Tx modules.
115  m_rbuf->txDetached();
    // NOTE(review): m_streamer is not reset after the delete, so it is left
    // pointing at freed memory; nothing visible here uses it afterwards.
116  delete m_streamer;
    // The ring buffer is not owned by this module, so it is only forgotten, not freed.
117  m_rbuf = nullptr;
118 }
119 
Stream/restore DataStore objects to/from EvtMessage.
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:71
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Class to manage a streamed object.
Definition: EvtMessage.h:59
int paddedSize() const
Same as size(), but as size of an integer array.
Definition: EvtMessage.cc:99
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:76
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:94
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
@ c_InternalSerializer
This module is an internal serializer/deserializer for parallel processing.
Definition: Module.h:82
const std::string & getName() const
Returns the name of the module.
Definition: Module.h:187
void setName(const std::string &name)
Set the name of the module.
Definition: Module.h:214
void setType(const std::string &type)
Set the module type.
Definition: Module.cc:48
static bool isInputProcess()
Return true if the process is an input process.
Definition: ProcHandler.cc:228
static RandomGenerator & getEventRandomGenerator()
Return a reference to the event-dependent random generator.
Definition: RandomNumbers.h:73
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
int insq(const int *buf, int size, bool checkTx=false)
Append a buffer to the RingBuffer.
Definition: RingBuffer.cc:189
void txAttached()
Increase the number of attached Tx counter.
Definition: RingBuffer.cc:373
void txDetached()
Decrease the number of attached Tx counter.
Definition: RingBuffer.cc:381
int shmid() const
Return ID of the shared memory.
Definition: RingBuffer.cc:458
bool allRxWaiting() const
True if and only if buffer is empty and nbusy == 0.
Definition: RingBuffer.cc:400
int remq(int *buf)
Pick up a buffer from the RingBuffer.
Definition: RingBuffer.cc:308
bool isDead() const
If true, the ring buffer is empty and has no attached Tx modules.
Definition: RingBuffer.cc:394
DataStoreStreamer * m_streamer
DataStoreStreamer.
Definition: TxModule.h:66
virtual void initialize() override
Module functions to be called from main process.
Definition: TxModule.cc:40
virtual void event() override
This method is the core of the module.
Definition: TxModule.cc:72
bool m_blockingInsert
Whether to block until we can insert data into the ring buffer in event().
Definition: TxModule.h:72
virtual void endRun() override
This method is called if the current run ends.
Definition: TxModule.cc:105
virtual void terminate() override
This method is called at the end of the event processing.
Definition: TxModule.cc:111
RingBuffer * m_rbuf
RingBuffer (not owned by us)
Definition: TxModule.h:63
virtual void beginRun() override
Called when entering a new run.
Definition: TxModule.cc:57
int m_nsent
No. of sent events.
Definition: TxModule.h:69
StoreObjPtr< RandomGenerator > m_randomgenerator
Random Generator object to pass to RxModule.
Definition: TxModule.h:78
bool m_handleMergeable
Whether to handle Mergeable objects.
Definition: TxModule.h:74
int m_compressionLevel
Compression parameter.
Definition: TxModule.h:60
bool m_sendRandomState
Whether to transfer the RandomGenerator state.
Definition: TxModule.h:75
Abstract base class for different kinds of events.