Belle II Software  release-08-01-10
Ds2RawModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
#include <daq/rfarm/event/modules/Ds2RawModule.h>
#include <framework/datastore/StoreObjPtr.h>
#include <framework/dataobjects/EventMetaData.h>
#include <daq/dataobjects/SendHeader.h>
#include <daq/dataobjects/SendTrailer.h>

#include <stdlib.h>
#include <unistd.h>

#include <cstring>
#include <vector>
16 
17 using namespace std;
18 using namespace Belle2;
19 
21 // static int fdout;
22 
23 //-----------------------------------------------------------------
24 // Register the Module
25 //-----------------------------------------------------------------
26 REG_MODULE(Ds2Raw)
27 
28 //-----------------------------------------------------------------
29 // Implementation
30 //-----------------------------------------------------------------
31 
33 {
34  //Set module properties
35  setDescription("Encode DataStore into RingBuffer");
36 
37  addParam("RingBufferName", m_rbufname, "Name of RingBuffer",
38  string("OutputRbuf"));
39  addParam("CompressionLevel", m_compressionLevel, "Compression level",
40  0);
41 
42  m_rbuf = NULL;
43  m_nsent = 0;
44  m_compressionLevel = 0;
45 
46  //Parameter definition
47  B2INFO("Ds2Raw: Constructor done.");
48 }
49 
50 
51 Ds2RawModule::~Ds2RawModule()
52 {
53 }
54 
55 void Ds2RawModule::initialize()
56 {
57 
58  // m_rbuf = new RingBuffer(m_rbufname.c_str(), RBUFSIZE);
59  m_rbuf = new RingBuffer(m_rbufname.c_str());
60  m_streamer = new DataStoreStreamer(m_compressionLevel);
61 
64 
65  B2INFO("Ds2Raw initialized.");
66 }
67 
68 
69 void Ds2RawModule::beginRun()
70 {
71  B2INFO("Ds2Raw: beginRun called.");
72 }
73 
74 
75 void Ds2RawModule::event()
76 {
77  // Stream DataStore in EvtMessage
78  EvtMessage* msg = m_streamer->streamDataStore(DataStore::c_Event);
79 
80  // Event Meta Data
82 
83  // Fill Header and Trailer
84  SendHeader hdr;
85  SendTrailer trl;
86 
87  // Number of total words
88  int msgsize = (msg->size() - 1) / 4 + 1;
89  int total_nwrds = msgsize + hdr.GetHdrNwords() + trl.GetTrlNwords();
90 
91  // Fill header and trailer
92  hdr.SetNwords(total_nwrds);
93  hdr.SetNumEventsinPacket(1);
94  hdr.SetNumNodesinPacket(1);
95  hdr.SetEventNumber(evtmeta->getEvent());
96  // hdr.SetExpRunWord(evtmeta->getRun());
97  hdr.SetSubRunNum(evtmeta->getSubrun()); // modified on Apr. 20, 2016 by SY
98  hdr.SetRunNum(evtmeta->getRun());
99  hdr.SetExpNum(evtmeta->getExperiment());
100  hdr.SetNodeID(HLT_SUBSYS_ID);
101  // trl.SetMagicWord();
102 
103  // Allocate raw buffer
104  int* buffer = new int[total_nwrds];
105 
106  // Fill header
107  memcpy(buffer, hdr.GetBuffer(), hdr.GetHdrNwords()*sizeof(int));
108 
109  // Fill EvtMessage
110  memcpy(buffer + hdr.GetHdrNwords(), msg->buffer(), msg->size());
111 
112  // Fill trailer
113  memcpy(buffer + hdr.GetHdrNwords() + msgsize, trl.GetBuffer(),
114  trl.GetTrlNwords()*sizeof(int));
115  // printf ( "trailer = %8.8x, %8.8x\n", *(buffer+hdr.GetHdrNwords()+msgsize),
116  // *(buffer+hdr.GetHdrNwords()+msgsize+1) );
117 
118  // Put the raw buffer in ring buffer
119  for (;;) {
120  int stat = m_rbuf->insq(buffer, total_nwrds);
121  if (stat >= 0) break;
122  // usleep(200);
123  usleep(20);
124  }
125 
128 
129  B2INFO("Ds2Raw: objs sent in buffer. Size = " << msg->size());
130 
131  // Release EvtMessage buffer
132  delete[] buffer;
133  delete msg;
134 
135  // return
136  m_nsent++;
137 
138 }
139 
140 void Ds2RawModule::endRun()
141 {
142  //fill Run data
143 
144  printf("Ds2Raw: endRun called.....\n");
145  B2INFO("Ds2Raw: endRun done.");
146 }
147 
148 
149 void Ds2RawModule::terminate()
150 {
151  delete m_streamer;
152 
153  // RingBuffer should not be deleted
154 
155  B2INFO("Ds2Raw: terminate called");
156 }
157 
Stream/restore DataStore objects to/from EvtMessage.
A class definition of an input module for Sequential ROOT I/O.
Definition: Ds2RawModule.h:30
Class to manage streamed object.
Definition: EvtMessage.h:59
char * buffer()
Get buffer address.
Definition: EvtMessage.cc:76
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:94
Base class for Modules.
Definition: Module.h:72
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
void SetNumEventsinPacket(int num_events)
set contents of Header
Definition: SendHeader.cc:61
int GetHdrNwords()
get contents of Header
Definition: SendHeader.cc:124
void SetNwords(int total_data_nwords)
initialize Header
Definition: SendHeader.cc:51
int * GetBuffer(void)
Get Header contents.
Definition: SendHeader.cc:32
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.