Belle II Software  release-05-02-19
TxModule.cc
1 //+
2 // File : TxModule.cc
3 // Description : Module to encode DataStore and place it in Ringbuffer
4 //
5 // Author : Ryosuke Itoh, IPNS, KEK
6 // Date : 13 - Aug - 2010
7 //-
8 
9 #include <framework/pcore/ProcHelper.h>
10 #include <framework/pcore/TxModule.h>
11 
12 #include <framework/pcore/EvtMessage.h>
13 #include <framework/pcore/DataStoreStreamer.h>
14 #include <framework/pcore/ProcHandler.h>
15 #include <framework/core/RandomNumbers.h>
16 #include <framework/core/Environment.h>
17 
18 using namespace std;
19 using namespace Belle2;
20 
// Constructs a Tx module bound to the given ring buffer.
// The pointer is stored as-is (the ring buffer is not owned by this module),
// the sent-event counter starts at zero, the streamer starts out as nullptr
// and blocking insertion is enabled.
// rbuf may be null: in that case the module name is left untouched and no
// debug message is emitted.
// NOTE(review): this listing skips its own lines 25 and 30 (the embedded
// numbering jumps 24->26 and 29->31), so constructor statements may be
// missing here - check against the real file.
21 TxModule::TxModule(RingBuffer* rbuf) : Module(), m_streamer(nullptr), m_blockingInsert(true)
22 {
23  //Set module properties
24  setDescription("Encode DataStore into RingBuffer");
26  setType("Tx");
27 
28  m_rbuf = rbuf;
29  m_nsent = 0;
31 
    // Derive the module name from the ring buffer's shared-memory id.
32  if (rbuf) {
33  setName("Tx" + std::to_string(rbuf->shmid()));
34  B2DEBUG(32, "Tx: Constructor with RingBuffer done.");
35  }
36 }
37 
// Defaulted destructor: it releases nothing itself. The streamer is deleted
// during termination and the ring buffer is not owned by this module.
38 TxModule::~TxModule() = default;
39 
// Body of TxModule::initialize(). The cross-reference index places its
// definition at listing line 40; that signature line is absent from this
// listing.
// Registers the random generator object, announces this Tx to the ring
// buffer and, when the Environment provides a non-empty list, restricts the
// set of objects the streamer handles.
41 {
    // c_DontWriteOut: the object should not be saved by output modules.
43  m_randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
44 
    // Increases the ring buffer's attached-Tx counter.
    // NOTE(review): m_rbuf is dereferenced unchecked although the constructor
    // accepts a null rbuf.
45  m_rbuf->txAttached();
47 
    // NOTE(review): m_streamer is nullptr after the constructor shown in this
    // listing; it is presumably created on one of the missing listing lines
    // (42 or 46) - confirm against the real file.
48  if ((Environment::Instance().getStreamingObjects()).size() > 0) {
49  m_streamer->setStreamingObjects(Environment::Instance().getStreamingObjects());
50  B2DEBUG(32, "Tx: Streaming objects limited : " << (Environment::Instance().getStreamingObjects()).size() << " objects");
51  }
52 
53  B2DEBUG(32, getName() << " initialized.");
54 }
55 
56 
// Body of TxModule::beginRun(). The cross-reference index places its
// definition at listing line 57; that signature line is absent from this
// listing.
// Polls the ring buffer until it is either dead or all attached readers are
// waiting (buffer empty and nobody busy), then logs completion.
// NOTE(review): listing line 59 is missing and the closing brace at listing
// line 67 has no visible opener, so a guarding conditional was dropped by the
// extraction - presumably the "input process only" check the comment below
// refers to. Confirm against the real file.
58 {
60  //NOTE: only needs to be done in input process, that way the parallel processes
61  // will never see runs out of order
62  B2DEBUG(35, "beginRun called (will wait for reading processes to finish processing previous run...).");
63  //wait until RB is both empty and all attached reading processes have finished..
64  while (!m_rbuf->isDead() and !m_rbuf->allRxWaiting()) {
    // usleep takes microseconds, so the state is re-checked every 0.5 ms.
65  usleep(500);
66  }
67  }
68  B2DEBUG(35, "beginRun done.");
69 }
70 
71 
// Body of TxModule::event(). The cross-reference index places its definition
// at listing line 72; that signature line is absent from this listing.
// Streams the DataStore into an EvtMessage, inserts it into the ring buffer
// (retrying until the insert succeeds), counts the sent event and frees the
// message.
73 {
74  if (m_sendRandomState) {
75  //Save event level random generator into datastore to send it to other processes
    // NOTE(review): both branches are empty in this listing because listing
    // lines 77 and 79 are missing; the real statements cannot be determined
    // from here.
76  if (!m_randomgenerator.isValid()) {
78  } else {
80  }
81  }
82 
83  // Stream DataStore in EvtMessage, also stream transient objects and objects of durability c_Persistent
84  EvtMessage* msg = m_streamer->streamDataStore(true, true);
85 
86  // Put the message in ring buffer
87  for (;;) {
    // The message buffer is handed over as an int array; paddedSize() is the
    // message size expressed as the size of an integer array. The third
    // argument is insq's checkTx flag.
88  int stat = m_rbuf->insq((int*)msg->buffer(), msg->paddedSize(), true);
    // A non-negative status ends the retry loop.
89  if (stat >= 0) break;
    // Non-blocking mode: remove previously queued data to make room, passing
    // a null destination to remq. Presumably one entry per call - verify
    // against RingBuffer::remq.
90  if (!m_blockingInsert) {
91  B2WARNING("Ring buffer seems full, removing some previous data.");
92  m_rbuf->remq(nullptr);
93  }
94  // usleep(200);
    // Back off for 20 microseconds before the next attempt (both modes).
95  usleep(20);
96  }
97  m_nsent++;
98 
99  B2DEBUG(35, "Tx: objs sent in buffer. Size = " << msg->size());
100 
101  // Release EvtMessage buffer
102  delete msg;
103 }
104 
// Body of TxModule::endRun(). The cross-reference index places its definition
// at listing line 105; that signature line is absent from this listing.
// Only emits a debug message.
106 {
107  B2DEBUG(35, "endRun done.");
108 }
109 
110 
// Body of TxModule::terminate(). The cross-reference index places its
// definition at listing line 111; that signature line is absent from this
// listing.
// Detaches this Tx from the ring buffer, frees the streamer and drops the
// ring buffer pointer.
112 {
113  B2DEBUG(32, "Tx: terminate called");
114 
    // Decreases the ring buffer's attached-Tx counter.
115  m_rbuf->txDetached();
    // NOTE(review): m_streamer is not reset to nullptr after the delete.
116  delete m_streamer;
    // The ring buffer is not owned by this module, so the pointer is only cleared.
117  m_rbuf = nullptr;
118 }
119 
Belle2::EvtMessage::paddedSize
int paddedSize() const
Same as size(), but as size of an integer array.
Definition: EvtMessage.cc:100
Belle2::RingBuffer::isDead
bool isDead() const
If true, the ring buffer is empty and has no attached Tx modules (i.e. no new data is going to be added).
Definition: RingBuffer.cc:397
Belle2::TxModule::m_nsent
int m_nsent
No. of sent events.
Definition: TxModule.h:69
Belle2::TxModule::event
virtual void event() override
This method is the core of the module.
Definition: TxModule.cc:72
Belle2::TxModule::initialize
virtual void initialize() override
Module functions to be called from main process.
Definition: TxModule.cc:40
Belle2::TxModule::m_handleMergeable
bool m_handleMergeable
Whether to handle Mergeable objects.
Definition: TxModule.h:74
Belle2::RingBuffer::txAttached
void txAttached()
Increase the number of attached Tx counter.
Definition: RingBuffer.cc:376
Belle2::Module::setDescription
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:216
Belle2::TxModule::beginRun
virtual void beginRun() override
Called when entering a new run.
Definition: TxModule.cc:57
Belle2::EvtMessage
Class to manage streamed object.
Definition: EvtMessage.h:60
Belle2::TxModule::m_rbuf
RingBuffer * m_rbuf
RingBuffer (not owned by us)
Definition: TxModule.h:63
Belle2::Module::c_InternalSerializer
@ c_InternalSerializer
This module is an internal serializer/deserializer for parallel processing.
Definition: Module.h:84
Belle2::DataStore::c_DontWriteOut
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:73
Belle2::TxModule::terminate
virtual void terminate() override
This method is called at the end of the event processing.
Definition: TxModule.cc:111
Belle2::RingBuffer::allRxWaiting
bool allRxWaiting() const
True if and only if buffer is empty and nbusy == 0.
Definition: RingBuffer.cc:403
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2::RingBuffer
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:36
Belle2::TxModule::m_sendRandomState
bool m_sendRandomState
Whether to transfer the RandomGenerator state.
Definition: TxModule.h:75
Belle2::Module::setPropertyFlags
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:210
Belle2::DataStoreStreamer::setStreamingObjects
void setStreamingObjects(const std::vector< std::string > &list)
Set names of objects to be streamed/destreamed.
Definition: DataStoreStreamer.cc:101
Belle2::EvtMessage::size
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:95
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::RingBuffer::txDetached
void txDetached()
Decrease the number of attached Tx counter.
Definition: RingBuffer.cc:384
Belle2::Module::setType
void setType(const std::string &type)
Set the module type.
Definition: Module.cc:50
Belle2::TxModule::m_compressionLevel
int m_compressionLevel
Compression parameter.
Definition: TxModule.h:60
Belle2::TxModule::m_randomgenerator
StoreObjPtr< RandomGenerator > m_randomgenerator
Random Generator object to pass to RxModule.
Definition: TxModule.h:78
Belle2::TxModule::m_streamer
DataStoreStreamer * m_streamer
DataStoreStreamer.
Definition: TxModule.h:66
Belle2::RandomNumbers::getEventRandomGenerator
static RandomGenerator & getEventRandomGenerator()
return reference to the event dependent random generator
Definition: RandomNumbers.h:83
Belle2::ProcHandler::isInputProcess
static bool isInputProcess()
Return true if the process is an input process.
Definition: ProcHandler.cc:223
Belle2::TxModule::endRun
virtual void endRun() override
This method is called if the current run ends.
Definition: TxModule.cc:105
Belle2::EvtMessage::buffer
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:77
Belle2::Module::c_Input
@ c_Input
This module is an input module (reads data).
Definition: Module.h:80
Belle2::RingBuffer::remq
int remq(int *buf)
Pick up a buffer from the RingBuffer.
Definition: RingBuffer.cc:311
Belle2::Environment::Instance
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:31
Belle2::RingBuffer::insq
int insq(const int *buf, int size, bool checkTx=false)
Append a buffer to the RingBuffer.
Definition: RingBuffer.cc:192
Belle2::RingBuffer::shmid
int shmid() const
Return ID of the shared memory.
Definition: RingBuffer.cc:461
Belle2::Module::getName
const std::string & getName() const
Returns the name of the module.
Definition: Module.h:189
Belle2::Module::setName
void setName(const std::string &name)
Set the name of the module.
Definition: Module.h:216
Belle2::DataStoreStreamer
Stream/restore DataStore objects to/from EvtMessage.
Definition: DataStoreStreamer.h:33
Belle2::TxModule::m_blockingInsert
bool m_blockingInsert
Whether to block until we can insert data into the ring buffer in event().
Definition: TxModule.h:72
Belle2::DataStoreStreamer::streamDataStore
EvtMessage * streamDataStore(bool addPersistentDurability, bool streamTransientObjects=false)
Store DataStore objects in EvtMessage.
Definition: DataStoreStreamer.cc:134