Belle II Software  release-08-01-10
Ds2RbufModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/rfarm/event/modules/Ds2RbufModule.h>
10 
11 #include <framework/datastore/DataStore.h>
12 
13 #include <stdlib.h>
14 
15 using namespace std;
16 using namespace Belle2;
17 
18 //-----------------------------------------------------------------
19 // Register the Module
20 //-----------------------------------------------------------------
21 REG_MODULE(Ds2Rbuf)
22 
23 //-----------------------------------------------------------------
24 // Implementation
25 //-----------------------------------------------------------------
26 
28 {
29  //Set module properties
30  setDescription("Encode DataStore into RingBuffer");
31 
32  vector<string> emptyvector;
33  addParam("RingBufferName", m_rbufname, "Name of RingBuffer",
34  string("OutputRbuf"));
35  addParam("CompressionLevel", m_compressionLevel, "Compression level",
36  0);
37  addParam("saveObjs", m_saveobjs, "List of objects to be sent", emptyvector);
38 
39  m_rbuf = NULL;
40  m_nsent = 0;
41  m_compressionLevel = 0;
42 
43  //Parameter definition
44  B2INFO("Ds2Rbuf: Constructor done.");
45 }
46 
47 
48 Ds2RbufModule::~Ds2RbufModule()
49 {
50 }
51 
52 void Ds2RbufModule::initialize()
53 {
54 
55  // m_rbuf = new RingBuffer(m_rbufname.c_str(), RBUFSIZE);
56  m_rbuf = new RingBuffer(m_rbufname);
57  m_streamer = new DataStoreStreamer(m_compressionLevel);
58  m_streamer->setStreamingObjects(m_saveobjs);
59 
60  B2INFO("Ds2Rbuf initialized.");
61 }
62 
63 
64 void Ds2RbufModule::beginRun()
65 {
66  B2INFO("Ds2Rbuf: beginRun called.");
67 }
68 
69 
70 void Ds2RbufModule::event()
71 {
72  // Stream DataStore in EvtMessage
73  EvtMessage* msg = m_streamer->streamDataStore(DataStore::c_Event);
74  // EvtMessage* msg = m_streamer->streamDataStore(DataStore::c_Event, false, true);
75 
76  B2INFO("Ds2Rbuf: msgsize = " << msg->size());
77 
78  // printf("message size = %d\n", msg->size());
79  // Put the message in ring buffer
80  for (;;) {
81  int stat = m_rbuf->insq((int*)msg->buffer(), msg->paddedSize());
82  if (stat >= 0) break;
83  usleep(100);
84  // usleep(20);
85  }
86 
87  B2INFO("Ds2Rbuf: objs sent in buffer. Size = " << msg->size());
88 
89  // Release EvtMessage buffer
90  delete msg;
91 
92  // return
93  m_nsent++;
94 
95 }
96 
97 void Ds2RbufModule::endRun()
98 {
99  //fill Run data
100 
101  printf("Ds2Rbuf: endRun called.....\n");
102  B2INFO("Ds2Rbuf: endRun done.");
103 }
104 
105 
106 void Ds2RbufModule::terminate()
107 {
108  delete m_streamer;
109 
110  // RingBuffer should not be deleted
111 
112  B2INFO("Ds2Rbuf: terminate called");
113 }
114 
Stream/restore DataStore objects to/from EvtMessage.
A class definition of an output module that encodes the DataStore into a RingBuffer.
Definition: Ds2RbufModule.h:29
Class to manage streamed object.
Definition: EvtMessage.h:59
int paddedSize() const
Same as size(), but as size of an integer array.
Definition: EvtMessage.cc:99
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:76
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:94
Base class for Modules.
Definition: Module.h:72
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.