Belle II Software  release-08-01-10
RxModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
#include <framework/pcore/RxModule.h>
#include <framework/pcore/EvtMessage.h>
#include <framework/pcore/DataStoreStreamer.h>
#include <framework/core/RandomNumbers.h>

#include <TSystem.h>

#include <memory>
15 
16 using namespace std;
17 using namespace Belle2;
18 
19 RxModule::RxModule(RingBuffer* rbuf) : Module(), m_streamer(nullptr), m_nrecv(-1)
20 {
21  //Set module properties
22  setDescription("Decode data from RingBuffer into DataStore");
24  setType("Rx");
25 
26  m_rbuf = rbuf;
28  if (rbuf) {
29  setName("Rx" + std::to_string(rbuf->shmid()));
30  B2DEBUG(32, "Rx: Constructor with RingBuffer done.");
31  }
32 }
33 
34 
35 
// Defaulted destructor. It does not delete the raw m_streamer pointer;
// that deletion happens in terminate().
RxModule::~RxModule() = default;
37 
39 {
40  delete m_streamer;
42 }
43 
45 {
46  auto* evtbuf = new char[EvtMessage::c_MaxEventSize];
47  while (!m_rbuf->isDead()) {
48  int size = m_rbuf->remq((int*)evtbuf);
49  if (size != 0) {
50  B2DEBUG(35, "Rx: got an event from RingBuffer, size=" << size);
51 
52  // Restore objects in DataStore
53  EvtMessage evtmsg(evtbuf);
54  m_streamer->restoreDataStore(&evtmsg);
55  // Restore the event dependent random number object from Datastore
56  if (m_randomgenerator.isValid()) {
58  }
59  break;
60  }
61  usleep(20);
62  }
63 
64  delete[] evtbuf;
65 }
66 
68 {
69  gSystem->Load("libdataobjects");
70 
71  m_randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
72 
73  initStreamer();
74 
75  // Read the first event in RingBuffer and restore in DataStore.
76  // This is necessary to create object tables before TTree initialization
77  // if used together with TTree based output (RootOutput module).
78  readEvent();
79 }
80 
82 
84 {
85  m_nrecv++;
86  // First event is already loaded in initialize()
87  if (m_nrecv == 0) return;
88 
89  // Get a record from ringbuf
90  readEvent();
91 }
92 
// Called when the current run ends; no work is performed here.
void RxModule::endRun() { }
94 
96 {
97  B2DEBUG(32, "Rx: terminate called");
98  delete m_streamer;
99 }
Stream/restore DataStore objects to/from EvtMessage.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:71
Class to manage streamed object.
Definition: EvtMessage.h:59
static const unsigned int c_MaxEventSize
maximal EvtMessage size, in bytes (200MB).
Definition: EvtMessage.h:63
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
@ c_InternalSerializer
This module is an internal serializer/deserializer for parallel processing.
Definition: Module.h:82
void setName(const std::string &name)
Set the name of the module.
Definition: Module.h:214
void setType(const std::string &type)
Set the module type.
Definition: Module.cc:48
static RandomGenerator & getEventRandomGenerator()
return reference to the event dependent random generator
Definition: RandomNumbers.h:73
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
int shmid() const
Return ID of the shared memory.
Definition: RingBuffer.cc:458
int remq(int *buf)
Pick up a buffer from the RingBuffer.
Definition: RingBuffer.cc:308
bool isDead() const
If True, the ring buffer is empty and has no attached Tx modules (i.e. no new data is going to be added).
Definition: RingBuffer.cc:394
DataStoreStreamer * m_streamer
Used for serialization.
Definition: RxModule.h:57
virtual void initialize() override
Module functions to be called from main process.
Definition: RxModule.cc:67
void initStreamer()
initialize m_streamer.
Definition: RxModule.cc:38
virtual void event() override
This method is the core of the module.
Definition: RxModule.cc:83
virtual void endRun() override
This method is called if the current run ends.
Definition: RxModule.cc:93
virtual void terminate() override
This method is called at the end of the event processing.
Definition: RxModule.cc:95
RingBuffer * m_rbuf
attached RingBuffer.
Definition: RxModule.h:54
virtual void beginRun() override
Called when entering a new run.
Definition: RxModule.cc:81
int m_nrecv
Current event number.
Definition: RxModule.h:63
void readEvent()
Gets data from m_rbuf and puts it into the data store.
Definition: RxModule.cc:44
StoreObjPtr< RandomGenerator > m_randomgenerator
Random Generator object to receive from TxModule.
Definition: RxModule.h:68
bool m_handleMergeable
Whether to handle Mergeable objects.
Definition: RxModule.h:65
int m_compressionLevel
Compression Level.
Definition: RxModule.h:60
Abstract base class for different kinds of events.