Belle II Software  release-05-02-19
RxModule.cc
//+
// File : RxModule.cc
// Description : Module to decode data from RingBuffer and restore it into DataStore
//
// Author : Ryosuke Itoh, IPNS, KEK
// Date : 13 - Aug - 2010
//-
8 
#include <framework/pcore/RxModule.h>
#include <framework/pcore/EvtMessage.h>
#include <framework/pcore/DataStoreStreamer.h>
#include <framework/core/RandomNumbers.h>

#include <TSystem.h>

#include <memory>

using namespace std;
using namespace Belle2;
18 
19 RxModule::RxModule(RingBuffer* rbuf) : Module(), m_streamer(nullptr), m_nrecv(-1)
20 {
21  //Set module properties
22  setDescription("Decode data from RingBuffer into DataStore");
24  setType("Rx");
25 
26  m_rbuf = rbuf;
28  if (rbuf) {
29  setName("Rx" + std::to_string(rbuf->shmid()));
30  B2DEBUG(32, "Rx: Constructor with RingBuffer done.");
31  }
32 }
33 
34 
35 
// Defaulted destructor: it does not delete m_streamer; that pointer is
// released in terminate().
RxModule::~RxModule() = default;
37 
39 {
40  delete m_streamer;
42 }
43 
45 {
46  auto* evtbuf = new char[EvtMessage::c_MaxEventSize];
47  while (!m_rbuf->isDead()) {
48  int size = m_rbuf->remq((int*)evtbuf);
49  if (size != 0) {
50  B2DEBUG(35, "Rx: got an event from RingBuffer, size=" << size);
51 
52  // Restore objects in DataStore
53  EvtMessage evtmsg(evtbuf);
54  m_streamer->restoreDataStore(&evtmsg);
55  // Restore the event dependent random number object from Datastore
56  if (m_randomgenerator.isValid()) {
58  }
59  break;
60  }
61  usleep(20);
62  }
63 
64  delete[] evtbuf;
65 }
66 
68 {
69  gSystem->Load("libdataobjects");
70 
71  m_randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
72 
73  initStreamer();
74 
75  // Read the first event in RingBuffer and restore in DataStore.
76  // This is necessary to create object tables before TTree initialization
77  // if used together with TTree based output (RootOutput module).
78  readEvent();
79 }
80 
82 
84 {
85  m_nrecv++;
86  // First event is already loaded in initialize()
87  if (m_nrecv == 0) return;
88 
89  // Get a record from ringbuf
90  readEvent();
91 }
92 
93 void RxModule::endRun() { }
94 
96 {
97  B2DEBUG(32, "Rx: terminate called");
98  delete m_streamer;
99 }
Belle2::RingBuffer::isDead
bool isDead() const
If true, the ring buffer is empty and has no attached Tx modules.
Definition: RingBuffer.cc:397
Belle2::RxModule::beginRun
virtual void beginRun() override
Called when entering a new run.
Definition: RxModule.cc:81
Belle2::RxModule::m_rbuf
RingBuffer * m_rbuf
attached RingBuffer.
Definition: RxModule.h:54
Belle2::EvtMessage::c_MaxEventSize
const static unsigned int c_MaxEventSize
maximal EvtMessage size, in bytes (200MB).
Definition: EvtMessage.h:64
Belle2::Module::setDescription
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:216
Belle2::RxModule::event
virtual void event() override
This method is the core of the module.
Definition: RxModule.cc:83
Belle2::RxModule::initialize
virtual void initialize() override
Module functions to be called from main process.
Definition: RxModule.cc:67
Belle2::RxModule::m_handleMergeable
bool m_handleMergeable
Whether to handle Mergeable objects.
Definition: RxModule.h:65
Belle2::EvtMessage
Class to manage streamed object.
Definition: EvtMessage.h:60
Belle2::RxModule::m_nrecv
int m_nrecv
Current event number.
Definition: RxModule.h:63
Belle2::Module::c_InternalSerializer
@ c_InternalSerializer
This module is an internal serializer/deserializer for parallel processing.
Definition: Module.h:84
Belle2::DataStore::c_DontWriteOut
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:73
Belle2::DataStoreStreamer::restoreDataStore
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
Definition: DataStoreStreamer.cc:207
Belle2::RxModule::terminate
virtual void terminate() override
This method is called at the end of the event processing.
Definition: RxModule.cc:95
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2::RingBuffer
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:36
Belle2::Module::setPropertyFlags
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:210
Belle2::RxModule::m_compressionLevel
int m_compressionLevel
Compression Level.
Definition: RxModule.h:60
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::RxModule::m_streamer
DataStoreStreamer * m_streamer
Used for serialization.
Definition: RxModule.h:57
Belle2::Module::setType
void setType(const std::string &type)
Set the module type.
Definition: Module.cc:50
Belle2::RxModule::initStreamer
void initStreamer()
initialize m_streamer.
Definition: RxModule.cc:38
Belle2::RandomNumbers::getEventRandomGenerator
static RandomGenerator & getEventRandomGenerator()
return reference to the event dependent random generator
Definition: RandomNumbers.h:83
Belle2::RxModule::m_randomgenerator
StoreObjPtr< RandomGenerator > m_randomgenerator
Random Generator object to receive from TxModule.
Definition: RxModule.h:68
Belle2::RxModule::endRun
virtual void endRun() override
This method is called if the current run ends.
Definition: RxModule.cc:93
Belle2::Module::c_Input
@ c_Input
This module is an input module (reads data).
Definition: Module.h:80
Belle2::RingBuffer::remq
int remq(int *buf)
Pick up a buffer from the RingBuffer.
Definition: RingBuffer.cc:311
Belle2::RingBuffer::shmid
int shmid() const
Return ID of the shared memory.
Definition: RingBuffer.cc:461
Belle2::Module::setName
void setName(const std::string &name)
Set the name of the module.
Definition: Module.h:216
Belle2::DataStoreStreamer
Stream/restore DataStore objects to/from EvtMessage.
Definition: DataStoreStreamer.h:33
Belle2::RxModule::readEvent
void readEvent()
Gets data from m_rbuf and puts it into the data store.
Definition: RxModule.cc:44