Belle II Software  release-06-02-00
Rbuf2DsModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
#include <daq/rfarm/event/modules/Rbuf2DsModule.h>

#include <signal.h>
#include <stdlib.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#include <cstring>
#include <memory>

#include <TSystem.h>

#include "framework/core/Environment.h"
15 
16 //extern int basf2SignalReceived;
17 
18 
namespace {
  /**
   * Number of the most recently caught signal, 0 while none has arrived.
   * Written by signalHandler() and polled by the ring-buffer wait loops in
   * this file. volatile sig_atomic_t is the object type that may be written
   * from a signal handler and read from normal code.
   */
  volatile sig_atomic_t signalled = 0;

  /**
   * Signal handler: records the signal number and reports it on stdout.
   *
   * Only async-signal-safe operations are used: the message is emitted with
   * write(2) instead of printf(), and errno is restored so that interrupted
   * code does not observe a value clobbered by the handler.
   *
   * @param sig Number of the signal being delivered.
   */
  void signalHandler(int sig)
  {
    const int savedErrno = errno;
    signalled = sig;

    static const char message[] = "Rbuf2Ds : Signal received\n";
    // Best-effort diagnostic; nothing sensible can be done if it fails.
    const ssize_t written = write(STDOUT_FILENO, message, sizeof(message) - 1);
    (void)written;

    errno = savedErrno;
  }
}
28 
29 using namespace std;
30 using namespace Belle2;
31 
32 
33 //-----------------------------------------------------------------
34 // Register the Module
35 //-----------------------------------------------------------------
36 REG_MODULE(Rbuf2Ds)
37 
38 //-----------------------------------------------------------------
39 // Implementation
40 //-----------------------------------------------------------------
41 
 // Constructor body: sets the module description, declares the module
 // parameters and puts the data members into their initial state.
 // NOTE(review): the constructor's signature line is absent from this listing
 // (the embedded numbering skips from 41 to 43); only the body is visible.
43 {
44  //Set module properties
 // NOTE(review): the description says "Encode DataStore into RingBuffer", while
 // initialize() and event() in this file read records from the ring buffer and
 // restore them into the DataStore -- confirm the intended wording.
45  setDescription("Encode DataStore into RingBuffer");
46  // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
47 
 // Parameter "RingBufferName" is tied to m_rbufname, default "InputRbuf".
48  addParam("RingBufferName", m_rbufname, "Name of RingBuffer",
49  string("InputRbuf"));
 // Parameter "CompressionLevel" is tied to m_compressionLevel, default 0.
50  addParam("CompressionLevel", m_compressionLevel, "Compression level",
51  0);
52 
 // No ring buffer is attached yet; initialize() creates it.
53  m_rbuf = NULL;
 // Record counter; initialize() later resets it to -1 after prefetching.
54  m_nrecv = 0;
 // Same value as the default handed to addParam() above.
55  m_compressionLevel = 0;
56 
57 
58  // Report that construction has finished.
59  B2INFO("Rx: Constructor done.");
60 }
61 
62 
63 Rbuf2DsModule::~Rbuf2DsModule()
64 {
65 }
66 
67 void Rbuf2DsModule::initialize()
68 {
69  gSystem->Load("libdataobjects");
70 
71  // m_rbuf = new RingBuffer(m_rbufname.c_str(), RBUFSIZE);
72  m_rbuf = new RingBuffer(m_rbufname.c_str());
73 
74  // Initialize DataStoreStreamer
75  m_streamer = new DataStoreStreamer(m_compressionLevel);
76 
77 
78  // Read the first event in RingBuffer and restore in DataStore.
79  // This is necessary to create object tables before TTree initialization
80  // if used together with SimpleOutput.
81 
82  // Prefetch the first record in Ring Buffer
83  int size;
84  char* evtbuf = new char[MAXEVTSIZE];
85  while ((size = m_rbuf->remq((int*)evtbuf)) == 0) {
86  // printf ( "Rx : evtbuf is not available yet....\n" );
87  // usleep(100);
88  if (signalled != 0) break;
89  usleep(20);
90  }
91 
92  // Restore objects in DataStore
93  EvtMessage* evtmsg = new EvtMessage(evtbuf);
94  m_streamer->restoreDataStore(evtmsg);
95 
96  // Delete buffers
97  delete evtmsg;
98  delete[] evtbuf;
99 
100  m_nrecv = -1;
101 
102  B2INFO("Rx initialized.");
103 }
104 
105 
106 void Rbuf2DsModule::beginRun()
107 {
108  if (Environment::Instance().getNumberProcesses() != 0) {
109  struct sigaction s;
110  memset(&s, '\0', sizeof(s));
111  s.sa_handler = signalHandler;
112  sigemptyset(&s.sa_mask);
113  if (sigaction(SIGINT, &s, NULL) != 0) {
114  B2FATAL("Rbuf2Ds: Error to connect signal handler");
115  }
116  printf("Ds2Rbuf : Signal Handler installed.\n");
117  }
118  B2INFO("beginRun called.");
119 }
120 
121 
122 void Rbuf2DsModule::event()
123 {
124  m_nrecv++;
125  // First event is already loaded
126  if (m_nrecv == 0) return;
127 
128  // Get a record from ringbuf
129  int size;
130 
131  char* evtbuf = new char[MAXEVTSIZE];
132  while ((size = m_rbuf->remq((int*)evtbuf)) == 0) {
133  // printf ( "Signal Status = %d\n", globalSignalReceived );
134  if (signalled != 0) break;
135  usleep(100);
136  // usleep(20);
137  }
138 
139  B2INFO("Rbuf2Ds: got an event from RingBuffer, size=" << size <<
140  " (proc= " << (int)getpid() << ")");
141 
142  // Build EvtMessage and decompose it
143  vector<TObject*> objlist;
144  vector<string> namelist;
145  EvtMessage* msg = new EvtMessage(evtbuf); // Have EvtMessage by ptr cpy
146  if (msg->type() == MSG_TERMINATE) {
147  B2INFO("Rx: got termination message. Exitting....");
148  return;
149  // Flag End Of File !!!!!
150  // return msg->type(); // EOF
151  }
152 
153  // Restore DataStore
154  // Restore DataStore
155  m_streamer->restoreDataStore(msg);
156 
157  /*
158  // Dummy event header for debugging
159  // Event Meta Data
160  StoreObjPtr<EventMetaData> evtm;
161  evtm.create();
162  evtm->setExperiment(1);
163  evtm->setRun(1);
164  evtm->setEvent(m_nrecv);
165  */
166 
167 
168  delete[] evtbuf;
169  delete msg;
170 
171  B2INFO("Rbuf2Ds: DataStore Restored!!");
172  return;
173  // return type;
174 }
175 
176 void Rbuf2DsModule::endRun()
177 {
178  //fill Run data
179 
180  B2INFO("Rbuf2Ds: endRun done.");
181 }
182 
183 
184 void Rbuf2DsModule::terminate()
185 {
186  B2INFO("Rbuf2Ds: terminate called");
187 }
188 
Stream/restore DataStore objects to/from EvtMessage.
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
Base class for Modules.
Definition: Module.h:72
A class definition of an input module for Sequential ROOT I/O.
Definition: Rbuf2DsModule.h:31
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.