Belle II Software development
RxModule.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/RxModule.h>
10#include <framework/pcore/EvtMessage.h>
11#include <framework/pcore/DataStoreStreamer.h>
12#include <framework/core/RandomNumbers.h>
13
14#include <TSystem.h>
15
16using namespace std;
17using namespace Belle2;
18
/**
 * Constructor: sets the module metadata and attaches the given RingBuffer.
 *
 * m_streamer starts as nullptr and m_nrecv as -1; the counter is incremented
 * at the start of every event() call, so the first call brings it to 0.
 * The description and the module type "Rx" are set unconditionally, and
 * rbuf is stored in m_rbuf without any ownership transfer visible here.
 *
 * @param rbuf RingBuffer to read events from. May be null, in which case the
 *             module name is left unchanged and no debug message is emitted.
 *
 * NOTE(review): the embedded listing numbers jump 22 -> 24 and 26 -> 28, so
 * two statements of the original constructor are not shown in this extract.
 */
19RxModule::RxModule(RingBuffer* rbuf) : Module(), m_streamer(nullptr), m_nrecv(-1)
20{
21 //Set module properties
22 setDescription("Decode data from RingBuffer into DataStore");
24 setType("Rx");
25
26 m_rbuf = rbuf;
 // Only rename when a buffer is attached: the name embeds the buffer's
 // shared-memory ID as returned by shmid().
28 if (rbuf) {
29 setName("Rx" + std::to_string(rbuf->shmid()));
30 B2DEBUG(32, "Rx: Constructor with RingBuffer done.");
31 }
32}
33
34
35
/**
 * Defaulted destructor.
 *
 * It performs no explicit cleanup: the raw pointer m_streamer is deleted in
 * terminate() (and at the start of initStreamer()), not here.
 * NOTE(review): if terminate() is never invoked, nothing in this file frees
 * the streamer -- confirm that the framework always calls terminate().
 */
36RxModule::~RxModule() = default;
37
// Body of RxModule::initStreamer() according to the cross-reference index of
// this listing; the signature line (listing line 38) is not part of this extract.
39{
 // Drop any previously created streamer before a new one is set up.
 // NOTE(review): listing line 41 is absent here; presumably it assigns a
 // freshly constructed DataStoreStreamer to m_streamer -- TODO confirm
 // against the repository.
40 delete m_streamer;
42}
43
// Body of RxModule::readEvent() according to the cross-reference index of
// this listing; the signature line (listing line 44) is not part of this extract.
//
// Polls m_rbuf until either one non-empty record has been fetched or the
// buffer reports isDead(). In the latter case the function returns without
// having wrapped any record in an EvtMessage.
45{
 // Scratch buffer sized for the largest possible EvtMessage; it is allocated
 // and released on every call.
 // NOTE(review): the buffer is managed with new[]/delete[] by hand, so it
 // would leak if anything between the two threw -- confirm whether the
 // calls below can throw.
46 auto* evtbuf = new char[EvtMessage::c_MaxEventSize];
47 while (!m_rbuf->isDead()) {
 // remq() copies the next record into evtbuf; a return value of 0 is
 // treated as "nothing available yet".
48 int size = m_rbuf->remq((int*)evtbuf);
49 if (size != 0) {
50 B2DEBUG(35, "Rx: got an event from RingBuffer, size=" << size);
51
52 // Restore objects in DataStore
53 EvtMessage evtmsg(evtbuf);
 // NOTE(review): listing line 54 is absent from this extract; presumably
 // it passes evtmsg to m_streamer->restoreDataStore() -- TODO confirm.
55 // Restore the event dependent random number object from Datastore
56 if (m_randomgenerator.isValid()) {
 // NOTE(review): listing line 57 (the body of this branch) is absent from
 // this extract.
58 }
 // Exactly one record is consumed per call.
59 break;
60 }
 // Nothing fetched: wait 20 microseconds before polling again.
61 usleep(20);
62 }
63
64 delete[] evtbuf;
65}
66
// Body of RxModule::initialize() according to the cross-reference index of
// this listing; the signature line (listing line 67) is not part of this extract.
68{
 // Ask ROOT to load the "libdataobjects" shared library before any event
 // data is read.
69 gSystem->Load("libdataobjects");
70
 // NOTE(review): listing lines 71 and 73 are absent from this extract;
 // presumably they register m_randomgenerator and call initStreamer() --
 // TODO confirm against the repository.
72
74
75 // Read the first event in RingBuffer and restore in DataStore.
76 // This is necessary to create object tables before TTree initialization
77 // if used together with TTree based output (RootOutput module).
78 readEvent();
79}
80
82
84{
85 m_nrecv++;
86 // First event is already loaded in initialize()
87 if (m_nrecv == 0) return;
88
89 // Get a record from ringbuf
90 readEvent();
91}
92
94
96{
97 B2DEBUG(32, "Rx: terminate called");
98 delete m_streamer;
99}
Stream/restore DataStore objects to/from EvtMessage.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:71
Class to manage streamed object.
Definition: EvtMessage.h:59
static const unsigned int c_MaxEventSize
maximal EvtMessage size, in bytes (200MB).
Definition: EvtMessage.h:63
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
@ c_InternalSerializer
This module is an internal serializer/deserializer for parallel processing.
Definition: Module.h:82
void setName(const std::string &name)
Set the name of the module.
Definition: Module.h:214
void setType(const std::string &type)
Set the module type.
Definition: Module.cc:48
static RandomGenerator & getEventRandomGenerator()
return reference to the event dependent random generator
Definition: RandomNumbers.h:73
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
int shmid() const
Return ID of the shared memory.
Definition: RingBuffer.cc:458
int remq(int *buf)
Pick up a buffer from the RingBuffer.
Definition: RingBuffer.cc:308
bool isDead() const
If true, the ring buffer is empty and has no attached Tx modules (i.e. no further data can arrive).
Definition: RingBuffer.cc:394
DataStoreStreamer * m_streamer
Used for serialization.
Definition: RxModule.h:57
virtual void initialize() override
Module functions to be called from main process.
Definition: RxModule.cc:67
void initStreamer()
initialize m_streamer.
Definition: RxModule.cc:38
virtual void event() override
This method is the core of the module.
Definition: RxModule.cc:83
virtual void endRun() override
This method is called if the current run ends.
Definition: RxModule.cc:93
virtual void terminate() override
This method is called at the end of the event processing.
Definition: RxModule.cc:95
RingBuffer * m_rbuf
attached RingBuffer.
Definition: RxModule.h:54
virtual void beginRun() override
Called when entering a new run.
Definition: RxModule.cc:81
int m_nrecv
Current event number.
Definition: RxModule.h:63
void readEvent()
Gets data from m_rbuf and puts it into the data store.
Definition: RxModule.cc:44
StoreObjPtr< RandomGenerator > m_randomgenerator
Random Generator object to receive from TxModule.
Definition: RxModule.h:68
bool m_handleMergeable
Whether to handle Mergeable objects.
Definition: RxModule.h:65
int m_compressionLevel
Compression Level.
Definition: RxModule.h:60
RxModule(RingBuffer *rbuf)
Constructor.
Definition: RxModule.cc:19
Abstract base class for different kinds of events.
STL namespace.