Belle II Software development
FastRbuf2DsModule.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <daq/rfarm/event/modules/FastRbuf2DsModule.h>
10#include <TSystem.h>
11#include <stdlib.h>
12
13using namespace std;
14using namespace Belle2;
15
16static FastRbuf2DsModule* s_input = NULL;
17
18//-----------------------------------------------------------------
19// Rbuf-Read Thread Interface
20//-----------------------------------------------------------------
21void* RunRbufReader(void*)
22{
23 s_input->ReadRbufInThread();
24 return NULL;
25}
26
27//-----------------------------------------------------------------
28// Register the Module
29//-----------------------------------------------------------------
30REG_MODULE(FastRbuf2Ds);
31
32//-----------------------------------------------------------------
33// Implementation
34//-----------------------------------------------------------------
35
37{
38 //Set module properties
39 setDescription("Encode DataStore into RingBuffer");
40 // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
41
42 addParam("RingBufferName", m_rbufname, "Name of RingBuffer",
43 string("InputRbuf"));
44 addParam("CompressionLevel", m_compressionLevel, "Compression level",
45 0);
46 addParam("NumThreads", m_numThread, "Number of threads for object decoding",
47 1);
48
49 m_rbuf = NULL;
50 m_nrecv = 0;
51 // m_compressionLevel = 0;
52
53 s_input = this;
54
55 //Parameter definition
56 B2INFO("Rx: Constructor done.");
57}
58
59
60FastRbuf2DsModule::~FastRbuf2DsModule()
61{
62}
63
65{
66 gSystem->Load("libdataobjects");
67
68 // m_rbuf = new RingBuffer(m_rbufname.c_str(), RBUFSIZE);
69 m_rbuf = new RingBuffer(m_rbufname.c_str());
70
71 // Initialize DataStoreStreamer, use Instance to use threads
72 // m_streamer = &(DataStoreStreamer::Instance());
74
75 // Read the first event in FastSeqRoot file and restore in DataStore.
76 // This is necessary to create object tables before TTree initialization
77 // if used together with TTree based output (RootOutput module).
78 EvtMessage* evtmsg = NULL;
79
80 // Prefetch the first record in Ring Buffer
81 int size;
82 char* evtbuf = new char[MAXEVTSIZE];
83 while ((size = m_rbuf->remq((int*)evtbuf)) == 0) usleep(20);
84
85 // Read 1st event in DataStore
86 if (size > 0) {
87 evtmsg = new EvtMessage(evtbuf);
89 } else {
90 B2FATAL("SeqRootInput : Error in reading first event");
91 }
92 delete evtmsg;
93 delete[] evtbuf;
94
95 // Create decoder threads
96 pthread_attr_t thread_attr;
97 pthread_attr_init(&thread_attr);
98 // pthread_attr_setschedpolicy(&thread_attr , SCHED_FIFO);
99 // pthread_attr_setdetachstate(&thread_attr , PTHREAD_CREATE_DETACHED);
100 // pthread_t thr_input;
101 pthread_create(&m_thr_input, NULL, RunRbufReader, NULL);
102
103
104 /*
105 for (;;) {
106 if ( m_streamer->getDecoderStatus() == 0 ) break;
107 while ((size = m_rbuf->remq((int*)evtbuf)) == 0) usleep ( 20 );
108 if (size > 0) {
109 m_streamer->queueEvtMessage(evtbuf);
110 } else {
111 B2FATAL("FastSeqRootInput : Error in reading first event");
112 }
113 }
114 */
115
116 // delete[] evtbuf;
117
118 m_nrecv = -1;
119
120 B2INFO("Rx initialized.");
121}
122
124{
125 printf("ReadFileInThread started!!\n");
126 int rf_nevt [[maybe_unused]] = 0;
127 for (;;) {
128 char* evtbuf = new char[EvtMessage::c_MaxEventSize];
129 int size;
130 while ((size = m_rbuf->remq((int*)evtbuf)) == 0) usleep(20);
131 if (size == 0) { // TODO this condition is always false
132 printf("ReadRbufInThread : ERROR! record with size=0 detected!!!!!\n");
134 delete[] evtbuf;
135 return;
136 } else if (size > 0) {
138 } else {
139 B2FATAL("FastRbuf2Ds : Error in reading first event");
140 }
141 rf_nevt++;
142 // if ( rf_nevt%1000 == 0 ) printf ( "ReadRbufInThread : %d events\n", rf_nevt );
143 }
144}
145
146
148{
149 B2INFO("beginRun called.");
150}
151
152
154{
155 m_nrecv++;
156 // First event is already loaded
157 if (m_nrecv == 0) return;
158
159 // Restore DataStore with objects in top of queue
161
162 B2INFO("FastRbuf2Ds: DataStore Restored!!");
163 return;
164 // return type;
165}
166
168{
169 //fill Run data
170
171 B2INFO("FastRbuf2Ds: endRun done.");
172}
173
174
176{
177 pthread_join(m_thr_input, NULL);
178 B2INFO("FastRbuf2Ds: terminate called");
179}
180
Stream/restore DataStore objects to/from EvtMessage.
int queueEvtMessage(char *msg)
Queue EvtMessage for destreaming.
int restoreDataStoreAsync()
Restore objects in DataStore from temporary buffer.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
Class to manage streamed object.
Definition: EvtMessage.h:59
static const unsigned int c_MaxEventSize
maximal EvtMessage size, in bytes (200MB).
Definition: EvtMessage.h:63
A class definition of an input module for Sequential ROOT I/O.
DataStoreStreamer * m_streamer
DataStore streamer.
void initialize() override
Module functions to be called from main process.
void event() override
This method is the core of the module.
void endRun() override
This method is called if the current run ends.
pthread_t m_thr_input
Input thread ID.
void terminate() override
This method is called at the end of the event processing.
void beginRun() override
Module functions to be called from event process.
int m_nrecv
No. of sent events.
int m_compressionLevel
Compression Level.
std::string m_rbufname
RingBuffer ID.
FastRbuf2DsModule()
Constructor / Destructor.
void ReadRbufInThread()
Function to read event from RB.
int m_numThread
Number of decoder threads.
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
int remq(int *buf)
Pick up a buffer from the RingBuffer.
Definition: RingBuffer.cc:308
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.
STL namespace.