Belle II Software development
GetEventFromSocket.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <daq/expreco/modules/GetEventFromSocket.h>
10
11#include <TSystem.h>
12
13#include <stdlib.h>
14#include <sys/time.h>
15#include <sys/types.h>
16#include <sys/socket.h>
17#include <netinet/in.h>
18#include <unistd.h>
19
20//#define MAXEVTSIZE 400000000
21
22#define TIME_WAIT 10000
23
24using namespace std;
25using namespace Belle2;
26
27//-----------------------------------------------------------------
28// Register the Module
29//-----------------------------------------------------------------
30REG_MODULE(GetEventFromSocket);
31
32//-----------------------------------------------------------------
33// Implementation
34//-----------------------------------------------------------------
35
37{
38 //Set module properties
39 setDescription("Get Event from EvtSocket");
40 // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
41
42 vector<string> emptyhosts;
43 addParam("Hosts", m_hosts, "GetEventFromSocket hosts", emptyhosts);
44 vector<int> emptysocks;
45 addParam("Ports", m_ports, "GetEventFromSocket Ports", emptysocks);
46 m_nrecv = 0;
48
49 //Parameter definition
50 B2INFO("Rx: Constructor done.");
51}
52
53
54GetEventFromSocketModule::~GetEventFromSocketModule()
55{
56}
57
59{
60 // Check event sources are not empty
61 if (m_hosts.size() == 0) {
62 B2FATAL("No hosts specified");
63 }
64
65 // Load data objects definitions
66 gSystem->Load("libdataobjects");
67
68 // Open receiving socekt
69 for (int i = 0; i < (int)m_hosts.size(); i++) {
70 // printf("Connecting to %s (port %d)\n", m_hosts[i].c_str(), m_ports[i]);
71 B2INFO("Connecting to " << m_hosts[i] << "(port " << m_ports[i] << ")");
72 EvtSocketSend* evtsock = new EvtSocketSend(m_hosts[i].c_str(), m_ports[i]);
73 m_socks.push_back(evtsock);
74 }
75 printf("All hosts connected\n");
76
77 // Initialize DataStoreStreamer
79
80 // Prefetch first record in GetEventFromSocket
81 EvtMessage* msg = receive();
82 if (msg == NULL) {
83 B2FATAL("Did not receive any data, stopping initialization.");
84 return;
85 }
87
88 // Delete buffers
89 delete msg;
90
91 m_nrecv = -1;
92
93 B2INFO("Rx initialized.");
94}
95
96EvtMessage* GetEventFromSocketModule::receive()
97{
98 // printf("GetEventFromSocket:receive here!\n");
99 B2DEBUG(1001, "GetEventFromSocket:receive here!");
100
101 fd_set fds;
102 fd_set readfds;
103 FD_ZERO(&readfds);
104 int maxfd = 0;
105 for (int i = 0; i < (int)m_socks.size(); i++) {
106 int sockfd = (m_socks[i]->sock())->sock();
107 FD_SET(sockfd, &readfds);
108 if (sockfd > maxfd) maxfd = sockfd;
109 }
110
111 while (1) {
112 // Look at socket
113 memcpy(&fds, &readfds, sizeof(fd_set));
114 // select() with blocking until data is ready on socket
115 int selstat = select(maxfd + 1, &fds, NULL, NULL, NULL);
116 // Check data on the socket or not
117 for (int i = 0; i < (int)m_socks.size(); i++) {
118 int sockfd = (m_socks[i]->sock())->sock();
119 if (FD_ISSET(sockfd, &fds)) {
120 EvtMessage* msg = m_socks[i]->recv();
121 return msg;
122 }
123 }
124 // printf("Select loop : should not come here. selstat = %d\n", selstat);
125 B2ERROR("Select loop : should not come here. selstat = " << selstat);
126 usleep(TIME_WAIT);
127 }
128}
129
131{
132 B2INFO("beginRun called.");
133}
134
135
137{
138 m_nrecv++;
139 // First event is already loaded
140 if (m_nrecv == 0) return;
141
142 // Get a record from socket
143 EvtMessage* msg = receive();
144 B2INFO("Rx: got an event from Socket, size=" << msg->size());
145 // Check for termination record
146 if (msg->type() == MSG_TERMINATE) {
147 B2INFO("Rx: got termination message. Exitting....");
148 return;
149 // Flag End Of File !!!!!
150 // return msg->type(); // EOF
151 }
152
153 // Restore DataStore
155 B2INFO("Rx: DataStore Restored!!");
156
157 // Delete EvtMessage
158 delete msg;
159
160 return;
161}
162
164{
165 //fill Run data
166
167 B2INFO("endRun done.");
168}
169
170
172{
173 delete m_streamer;
174 B2INFO("terminate called");
175}
176
Stream/restore DataStore objects to/from EvtMessage.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:94
virtual void event()
This method is the core of the module.
DataStoreStreamer * m_streamer
DataStoreStreamer.
virtual void initialize()
Module functions to be called from main process.
virtual void beginRun()
Module functions to be called from event process.
GetEventFromSocketModule()
Constructor / Destructor.
virtual void terminate()
This method is called at the end of the event processing.
virtual void endRun()
This method is called if the current run ends.
std::vector< EvtSocketSend * > m_socks
Reciever Socket.
int m_compressionLevel
Compression Level.
std::vector< std::string > m_hosts
Receiver Port.
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.
STL namespace.