Belle II Software  release-08-01-10
FastRbuf2DsModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/rfarm/event/modules/FastRbuf2DsModule.h>
10 #include <TSystem.h>
11 #include <stdlib.h>
12 
13 using namespace std;
14 using namespace Belle2;
15 
16 static FastRbuf2DsModule* s_input = NULL;
17 
18 //-----------------------------------------------------------------
19 // Rbuf-Read Thread Interface
20 //-----------------------------------------------------------------
21 void* RunRbufReader(void*)
22 {
23  s_input->ReadRbufInThread();
24  return NULL;
25 }
26 
27 //-----------------------------------------------------------------
28 // Register the Module
29 //-----------------------------------------------------------------
30 REG_MODULE(FastRbuf2Ds)
31 
32 //-----------------------------------------------------------------
33 // Implementation
34 //-----------------------------------------------------------------
35 
37 {
38  //Set module properties
39  setDescription("Encode DataStore into RingBuffer");
40  // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
41 
42  addParam("RingBufferName", m_rbufname, "Name of RingBuffer",
43  string("InputRbuf"));
44  addParam("CompressionLevel", m_compressionLevel, "Compression level",
45  0);
46  addParam("NumThreads", m_numThread, "Number of threads for object decoding",
47  1);
48 
49  m_rbuf = NULL;
50  m_nrecv = 0;
51  // m_compressionLevel = 0;
52 
53  s_input = this;
54 
55  //Parameter definition
56  B2INFO("Rx: Constructor done.");
57 }
58 
59 
60 FastRbuf2DsModule::~FastRbuf2DsModule()
61 {
62 }
63 
64 void FastRbuf2DsModule::initialize()
65 {
66  gSystem->Load("libdataobjects");
67 
68  // m_rbuf = new RingBuffer(m_rbufname.c_str(), RBUFSIZE);
69  m_rbuf = new RingBuffer(m_rbufname.c_str());
70 
71  // Initialize DataStoreStreamer, use Instance to use threads
72  // m_streamer = &(DataStoreStreamer::Instance());
73  m_streamer = new DataStoreStreamer(m_compressionLevel, true, m_numThread);
74 
75  // Read the first event in FastSeqRoot file and restore in DataStore.
76  // This is necessary to create object tables before TTree initialization
77  // if used together with TTree based output (RootOutput module).
78  EvtMessage* evtmsg = NULL;
79 
80  // Prefetch the first record in Ring Buffer
81  int size;
82  char* evtbuf = new char[MAXEVTSIZE];
83  while ((size = m_rbuf->remq((int*)evtbuf)) == 0) usleep(20);
84 
85  // Read 1st event in DataStore
86  if (size > 0) {
87  evtmsg = new EvtMessage(evtbuf);
88  m_streamer->restoreDataStore(evtmsg);
89  } else {
90  B2FATAL("SeqRootInput : Error in reading first event");
91  }
92  delete evtmsg;
93  delete[] evtbuf;
94 
95  // Create decoder threads
96  pthread_attr_t thread_attr;
97  pthread_attr_init(&thread_attr);
98  // pthread_attr_setschedpolicy(&thread_attr , SCHED_FIFO);
99  // pthread_attr_setdetachstate(&thread_attr , PTHREAD_CREATE_DETACHED);
100  // pthread_t thr_input;
101  pthread_create(&m_thr_input, NULL, RunRbufReader, NULL);
102 
103 
104  /*
105  for (;;) {
106  if ( m_streamer->getDecoderStatus() == 0 ) break;
107  while ((size = m_rbuf->remq((int*)evtbuf)) == 0) usleep ( 20 );
108  if (size > 0) {
109  m_streamer->queueEvtMessage(evtbuf);
110  } else {
111  B2FATAL("FastSeqRootInput : Error in reading first event");
112  }
113  }
114  */
115 
116  // delete[] evtbuf;
117 
118  m_nrecv = -1;
119 
120  B2INFO("Rx initialized.");
121 }
122 
123 void FastRbuf2DsModule::ReadRbufInThread()
124 {
125  printf("ReadFileInThread started!!\n");
126  int rf_nevt = 0;
127  for (;;) {
128  char* evtbuf = new char[EvtMessage::c_MaxEventSize];
129  int size;
130  while ((size = m_rbuf->remq((int*)evtbuf)) == 0) usleep(20);
131  if (size == 0) {
132  printf("ReadRbufInThread : ERROR! record with size=0 detected!!!!!\n");
133  m_streamer->queueEvtMessage(NULL);
134  delete[] evtbuf;
135  return;
136  } else if (size > 0) {
137  m_streamer->queueEvtMessage(evtbuf);
138  } else {
139  B2FATAL("FastRbuf2Ds : Error in reading first event");
140  }
141  rf_nevt++;
142  // if ( rf_nevt%1000 == 0 ) printf ( "ReadRbufInThread : %d events\n", rf_nevt );
143  }
144 }
145 
146 
147 void FastRbuf2DsModule::beginRun()
148 {
149  B2INFO("beginRun called.");
150 }
151 
152 
153 void FastRbuf2DsModule::event()
154 {
155  m_nrecv++;
156  // First event is already loaded
157  if (m_nrecv == 0) return;
158 
159  // Restore DataStore with objects in top of queue
160  m_streamer->restoreDataStoreAsync();
161 
162  B2INFO("FastRbuf2Ds: DataStore Restored!!");
163  return;
164  // return type;
165 }
166 
167 void FastRbuf2DsModule::endRun()
168 {
169  //fill Run data
170 
171  B2INFO("FastRbuf2Ds: endRun done.");
172 }
173 
174 
175 void FastRbuf2DsModule::terminate()
176 {
177  pthread_join(m_thr_input, NULL);
178  B2INFO("FastRbuf2Ds: terminate called");
179 }
180 
Stream/restore DataStore objects to/from EvtMessage.
Class to manage streamed object.
Definition: EvtMessage.h:59
A class definition of an input module for Sequential ROOT I/O.
void ReadRbufInThread()
Function to read event from RB.
Base class for Modules.
Definition: Module.h:72
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.