Belle II Software  release-08-01-10
RxSocketModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/dataflow/modules/RxSocketModule.h>
10 
11 #include <TSystem.h>
12 
13 #include <stdlib.h>
14 
15 //#define MAXEVTSIZE 400000000
16 
17 using namespace std;
18 using namespace Belle2;
19 
20 //-----------------------------------------------------------------
21 // Register the Module
22 //-----------------------------------------------------------------
23 REG_MODULE(RxSocket)
24 
25 //-----------------------------------------------------------------
26 // Implementation
27 //-----------------------------------------------------------------
28 
30 {
31  //Set module properties
32  setDescription("Encode DataStore into RingBuffer");
33  // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
34 
35  addParam("Port", m_port, "Receiver Port", 1111);
36  m_nrecv = 0;
37  m_compressionLevel = 0;
38 
39  //Parameter definition
40  B2INFO("Rx: Constructor done.");
41 }
42 
43 
44 RxSocketModule::~RxSocketModule()
45 {
46 }
47 
48 void RxSocketModule::initialize()
49 {
50  // Load data objects definitions
51  gSystem->Load("libdataobjects");
52 
53  // Open receiving socekt
54  m_recv = new EvtSocketRecv(m_port);
55 
56  // Initialize DataStoreStreamer
57  m_streamer = new DataStoreStreamer(m_compressionLevel);
58 
59  // Prefetch first record in RxSocket
60  EvtMessage* msg = m_recv->recv();
61  if (msg == NULL) {
62  return;
63  }
64  m_streamer->restoreDataStore(msg);
65 
66  // Delete buffers
67  delete msg;
68 
69  m_nrecv = -1;
70 
71  B2INFO("Rx initialized.");
72 }
73 
74 
75 void RxSocketModule::beginRun()
76 {
77  B2INFO("beginRun called.");
78 }
79 
80 
81 void RxSocketModule::event()
82 {
83  m_nrecv++;
84  // First event is already loaded
85  if (m_nrecv == 0) return;
86 
87  // Get a record from socket
88  EvtMessage* msg = m_recv->recv();
89  if (msg == NULL) {
90  return;
91  }
92  B2INFO("Rx: got an event from Socket, size=" << msg->size());
93  // Check for termination record
94  if (msg->type() == MSG_TERMINATE) {
95  B2INFO("Rx: got termination message. Exitting....");
96  return;
97  // Flag End Of File !!!!!
98  // return msg->type(); // EOF
99  }
100 
101  // Restore DataStore
102  m_streamer->restoreDataStore(msg);
103  B2INFO("Rx: DataStore Restored!!");
104 
105  // Delete EvtMessage
106  delete msg;
107 
108  return;
109 }
110 
111 void RxSocketModule::endRun()
112 {
113  //fill Run data
114 
115  B2INFO("endRun done.");
116 }
117 
118 
119 void RxSocketModule::terminate()
120 {
121  B2INFO("terminate called");
122 }
123 
Stream/restore DataStore objects to/from EvtMessage.
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:94
Base class for Modules.
Definition: Module.h:72
A class definition of an input module for Sequential ROOT I/O.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.