Belle II Software  release-08-01-10
ReceiveEvent.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/expreco/modules/ReceiveEvent.h>
10 
11 #include <TSystem.h>
12 
13 #include <stdlib.h>
14 
15 //#define MAXEVTSIZE 400000000
16 
17 using namespace std;
18 using namespace Belle2;
19 
20 //-----------------------------------------------------------------
21 // Register the Module
22 //-----------------------------------------------------------------
23 REG_MODULE(ReceiveEvent)
24 
25 //-----------------------------------------------------------------
26 // Implementation
27 //-----------------------------------------------------------------
28 
30 {
31  //Set module properties
32  setDescription("Receive Event from ExpReco");
33  // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
34 
35  addParam("Host", m_host, "Receiver host", string("expreco"));
36  addParam("Port", m_port, "Receiver Port", 7111);
37  m_nrecv = 0;
38  m_compressionLevel = 0;
39 
40  //Parameter definition
41  B2INFO("Rx: Constructor done.");
42 }
43 
44 
45 ReceiveEventModule::~ReceiveEventModule()
46 {
47 }
48 
49 void ReceiveEventModule::initialize()
50 {
51  // Load data objects definitions
52  gSystem->Load("libdataobjects");
53 
54  // Open receiving socekt
55  m_recv = new EvtSocketSend(m_host.c_str(), m_port);
56 
57  // Initialize DataStoreStreamer
58  m_streamer = new DataStoreStreamer(m_compressionLevel);
59 
60  // Prefetch first record in ReceiveEvent
61  EvtMessage* msg = m_recv->recv();
62  if (msg == NULL) {
63  B2FATAL("Did not receive any data, stopping initialization.");
64  return;
65  }
66  m_streamer->restoreDataStore(msg);
67 
68  // Delete buffers
69  delete msg;
70 
71  m_nrecv = -1;
72 
73  B2INFO("Rx initialized.");
74 }
75 
76 
77 void ReceiveEventModule::beginRun()
78 {
79  B2INFO("beginRun called.");
80 }
81 
82 
83 void ReceiveEventModule::event()
84 {
85  m_nrecv++;
86  // First event is already loaded
87  if (m_nrecv == 0) return;
88 
89  // Get a record from socket
90  EvtMessage* msg = m_recv->recv();
91  if (msg == NULL) {
92  printf("Connection is closed. Reconnecting.\n");
93  int nrepeat = 5000;
94  for (;;) {
95  int rstat = (m_recv->sock())->reconnect(nrepeat);
96  if (rstat == - 1)
97  continue;
98  else
99  break;
100  }
101  // return;
102  }
103  B2INFO("Rx: got an event from Socket, size=" << msg->size());
104  // Check for termination record
105  if (msg->type() == MSG_TERMINATE) {
106  B2INFO("Rx: got termination message. Exitting....");
107  return;
108  // Flag End Of File !!!!!
109  // return msg->type(); // EOF
110  }
111 
112  // Restore DataStore
113  m_streamer->restoreDataStore(msg);
114  B2INFO("Rx: DataStore Restored!!");
115 
116  // Delete EvtMessage
117  delete msg;
118 
119  return;
120 }
121 
122 void ReceiveEventModule::endRun()
123 {
124  //fill Run data
125 
126  B2INFO("endRun done.");
127 }
128 
129 
130 void ReceiveEventModule::terminate()
131 {
132  B2INFO("terminate called");
133 }
134 
Stream/restore DataStore objects to/from EvtMessage.
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:94
Base class for Modules.
Definition: Module.h:72
A class definition of an input module for Sequential ROOT I/O.
Definition: ReceiveEvent.h:27
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.