Belle II Software development
ReceiveEvent.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <daq/expreco/modules/ReceiveEvent.h>
10
11#include <TSystem.h>
12
13#include <stdlib.h>
14
15//#define MAXEVTSIZE 400000000
16
17using namespace std;
18using namespace Belle2;
19
20//-----------------------------------------------------------------
21// Register the Module
22//-----------------------------------------------------------------
23REG_MODULE(ReceiveEvent);
24
25//-----------------------------------------------------------------
26// Implementation
27//-----------------------------------------------------------------
28
30{
31 //Set module properties
32 setDescription("Receive Event from ExpReco");
33 // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
34
35 addParam("Host", m_host, "Receiver host", string("expreco"));
36 addParam("Port", m_port, "Receiver Port", 7111);
37 m_nrecv = 0;
39
40 //Parameter definition
41 B2INFO("Rx: Constructor done.");
42}
43
44
45ReceiveEventModule::~ReceiveEventModule()
46{
47}
48
50{
51 // Load data objects definitions
52 gSystem->Load("libdataobjects");
53
54 // Open receiving socekt
55 m_recv = new EvtSocketSend(m_host.c_str(), m_port);
56
57 // Initialize DataStoreStreamer
59
60 // Prefetch first record in ReceiveEvent
61 EvtMessage* msg = m_recv->recv();
62 if (msg == NULL) {
63 B2FATAL("Did not receive any data, stopping initialization.");
64 return;
65 }
67
68 // Delete buffers
69 delete msg;
70
71 m_nrecv = -1;
72
73 B2INFO("Rx initialized.");
74}
75
76
78{
79 B2INFO("beginRun called.");
80}
81
82
84{
85 m_nrecv++;
86 // First event is already loaded
87 if (m_nrecv == 0) return;
88
89 // Get a record from socket
90 EvtMessage* msg = m_recv->recv();
91 if (msg == NULL) {
92 printf("Connection is closed. Reconnecting.\n");
93 int nrepeat = 5000;
94 for (;;) {
95 int rstat = (m_recv->sock())->reconnect(nrepeat);
96 if (rstat == - 1)
97 continue;
98 else
99 break;
100 }
101 // return;
102 }
103 B2INFO("Rx: got an event from Socket, size=" << msg->size());
104 // Check for termination record
105 if (msg->type() == MSG_TERMINATE) {
106 B2INFO("Rx: got termination message. Exitting....");
107 return;
108 // Flag End Of File !!!!!
109 // return msg->type(); // EOF
110 }
111
112 // Restore DataStore
114 B2INFO("Rx: DataStore Restored!!");
115
116 // Delete EvtMessage
117 delete msg;
118
119 return;
120}
121
123{
124 //fill Run data
125
126 B2INFO("endRun done.");
127}
128
129
131{
132 B2INFO("terminate called");
133}
134
Stream/restore DataStore objects to/from EvtMessage.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:94
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
DataStoreStreamer * m_streamer
DataStoreStreamer.
Definition: ReceiveEvent.h:56
void initialize() override
Module functions to be called from main process.
Definition: ReceiveEvent.cc:49
void event() override
This method is the core of the module.
Definition: ReceiveEvent.cc:83
void endRun() override
This method is called if the current run ends.
void terminate() override
This method is called at the end of the event processing.
EvtSocketSend * m_recv
Reciever Socket.
Definition: ReceiveEvent.h:53
std::string m_host
Receiver Port.
Definition: ReceiveEvent.h:49
void beginRun() override
Module functions to be called from event process.
Definition: ReceiveEvent.cc:77
ReceiveEventModule()
Constructor / Destructor.
Definition: ReceiveEvent.cc:29
int m_nrecv
No. of sent events.
Definition: ReceiveEvent.h:62
int m_compressionLevel
Compression Level.
Definition: ReceiveEvent.h:59
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.
STL namespace.