Belle II Software  release-08-01-10
Rbuf2DsModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
#include <daq/rfarm/event/modules/Rbuf2DsModule.h>

#include <TSystem.h>

#include <signal.h>
#include <stdlib.h>
#include <unistd.h>

#include <memory>

#include "framework/core/Environment.h"
15 
16 //extern int basf2SignalReceived;
17 
18 
namespace {
  /// Number of the last signal delivered to signalHandler(); stays 0 until one arrives.
  /// volatile sig_atomic_t is the object type a signal handler may portably write.
  volatile sig_atomic_t signalled = 0;

  /// Signal handler: record the signal number so the polling loops can stop waiting.
  void signalHandler(int sig)
  {
    signalled = sig;

    // printf() is not async-signal-safe, write() is; the message text is unchanged.
    static const char message[] = "Rbuf2Ds : Signal received\n";
    const ssize_t written = write(STDOUT_FILENO, message, sizeof(message) - 1);
    (void)written; // nothing sensible can be done about a failed write in here
  }
}
28 
29 using namespace std;
30 using namespace Belle2;
31 
32 
33 //-----------------------------------------------------------------
34 // Register the Module
35 //-----------------------------------------------------------------
36 REG_MODULE(Rbuf2Ds)
37 
38 //-----------------------------------------------------------------
39 // Implementation
40 //-----------------------------------------------------------------
41 
43 {
44  //Set module properties
45  setDescription("Encode DataStore into RingBuffer");
46  // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
47 
48  addParam("RingBufferName", m_rbufname, "Name of RingBuffer",
49  string("InputRbuf"));
50  addParam("CompressionLevel", m_compressionLevel, "Compression level",
51  0);
52 
53  m_rbuf = NULL;
54  m_nrecv = 0;
55  m_compressionLevel = 0;
56 
57 
58  //Parameter definition
59  B2INFO("Rx: Constructor done.");
60 }
61 
62 
63 Rbuf2DsModule::~Rbuf2DsModule()
64 {
65 }
66 
67 void Rbuf2DsModule::initialize()
68 {
69  gSystem->Load("libdataobjects");
70 
71  // m_rbuf = new RingBuffer(m_rbufname.c_str(), RBUFSIZE);
72  m_rbuf = new RingBuffer(m_rbufname.c_str());
73 
74  // Initialize DataStoreStreamer
75  m_streamer = new DataStoreStreamer(m_compressionLevel);
76 
77 
78  // Read the first event in RingBuffer and restore in DataStore.
79  // This is necessary to create object tables before TTree initialization
80  // if used together with SimpleOutput.
81 
82  // Prefetch the first record in Ring Buffer
83  int size;
84  char* evtbuf = new char[MAXEVTSIZE];
85  while ((size = m_rbuf->remq((int*)evtbuf)) == 0) {
86  // printf ( "Rx : evtbuf is not available yet....\n" );
87  // usleep(100);
88  if (signalled != 0) break;
89  usleep(20);
90  }
91 
92  // Restore objects in DataStore
93  EvtMessage* evtmsg = new EvtMessage(evtbuf);
94  m_streamer->restoreDataStore(evtmsg);
95 
96  // Delete buffers
97  delete evtmsg;
98  delete[] evtbuf;
99 
100  m_nrecv = -1;
101 
102  B2INFO("Rx initialized.");
103 }
104 
105 
106 void Rbuf2DsModule::beginRun()
107 {
108  if (Environment::Instance().getNumberProcesses() != 0) {
109  struct sigaction s;
110  memset(&s, '\0', sizeof(s));
111  s.sa_handler = signalHandler;
112  sigemptyset(&s.sa_mask);
113  if (sigaction(SIGINT, &s, NULL) != 0) {
114  B2FATAL("Rbuf2Ds: Error to connect signal handler");
115  }
116  printf("Ds2Rbuf : Signal Handler installed.\n");
117  }
118  B2INFO("beginRun called.");
119 }
120 
121 
122 void Rbuf2DsModule::event()
123 {
124  m_nrecv++;
125  // First event is already loaded
126  if (m_nrecv == 0) return;
127 
128  // Get a record from ringbuf
129  int size;
130 
131  char* evtbuf = new char[MAXEVTSIZE];
132  while ((size = m_rbuf->remq((int*)evtbuf)) == 0) {
133  // printf ( "Signal Status = %d\n", globalSignalReceived );
134  if (signalled != 0) break;
135  usleep(100);
136  // usleep(20);
137  }
138 
139  B2INFO("Rbuf2Ds: got an event from RingBuffer, size=" << size <<
140  " (proc= " << (int)getpid() << ")");
141 
142  // Build EvtMessage and decompose it
143  EvtMessage* msg = new EvtMessage(evtbuf); // Have EvtMessage by ptr cpy
144  if (msg->type() == MSG_TERMINATE) {
145  B2INFO("Rx: got termination message. Exitting....");
146  return;
147  // Flag End Of File !!!!!
148  // return msg->type(); // EOF
149  }
150 
151  // Restore DataStore
152  // Restore DataStore
153  m_streamer->restoreDataStore(msg);
154 
155  /*
156  // Dummy event header for debugging
157  // Event Meta Data
158  StoreObjPtr<EventMetaData> evtm;
159  evtm.create();
160  evtm->setExperiment(1);
161  evtm->setRun(1);
162  evtm->setEvent(m_nrecv);
163  */
164 
165 
166  delete[] evtbuf;
167  delete msg;
168 
169  B2INFO("Rbuf2Ds: DataStore Restored!!");
170  return;
171  // return type;
172 }
173 
174 void Rbuf2DsModule::endRun()
175 {
176  //fill Run data
177 
178  B2INFO("Rbuf2Ds: endRun done.");
179 }
180 
181 
182 void Rbuf2DsModule::terminate()
183 {
184  B2INFO("Rbuf2Ds: terminate called");
185 }
186 
Stream/restore DataStore objects to/from EvtMessage.
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
Base class for Modules.
Definition: Module.h:72
A class definition of an input module for Sequential ROOT I/O.
Definition: Rbuf2DsModule.h:31
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.