Belle II Software  release-08-01-10
GetEventFromSocket.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
#include <daq/expreco/modules/GetEventFromSocket.h>

#include <TSystem.h>

#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
19 
20 //#define MAXEVTSIZE 400000000
21 
22 #define TIME_WAIT 10000
23 
24 using namespace std;
25 using namespace Belle2;
26 
27 //-----------------------------------------------------------------
28 // Register the Module
29 //-----------------------------------------------------------------
30 REG_MODULE(GetEventFromSocket)
31 
32 //-----------------------------------------------------------------
33 // Implementation
34 //-----------------------------------------------------------------
35 
37 {
38  //Set module properties
39  setDescription("Get Event from EvtSocket");
40  // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
41 
42  vector<string> emptyhosts;
43  addParam("Hosts", m_hosts, "GetEventFromSocket hosts", emptyhosts);
44  vector<int> emptysocks;
45  addParam("Ports", m_ports, "GetEventFromSocket Ports", emptysocks);
46  m_nrecv = 0;
47  m_compressionLevel = 0;
48 
49  //Parameter definition
50  B2INFO("Rx: Constructor done.");
51 }
52 
53 
54 GetEventFromSocketModule::~GetEventFromSocketModule()
55 {
56 }
57 
58 void GetEventFromSocketModule::initialize()
59 {
60  // Check event sources are not empty
61  if (m_hosts.size() == 0) {
62  B2FATAL("No hosts specified");
63  }
64 
65  // Load data objects definitions
66  gSystem->Load("libdataobjects");
67 
68  // Open receiving socekt
69  for (int i = 0; i < (int)m_hosts.size(); i++) {
70  // printf("Connecting to %s (port %d)\n", m_hosts[i].c_str(), m_ports[i]);
71  B2INFO("Connecting to " << m_hosts[i] << "(port " << m_ports[i] << ")");
72  EvtSocketSend* evtsock = new EvtSocketSend(m_hosts[i].c_str(), m_ports[i]);
73  m_socks.push_back(evtsock);
74  }
75  printf("All hosts connected\n");
76 
77  // Initialize DataStoreStreamer
78  m_streamer = new DataStoreStreamer(m_compressionLevel);
79 
80  // Prefetch first record in GetEventFromSocket
81  EvtMessage* msg = receive();
82  if (msg == NULL) {
83  B2FATAL("Did not receive any data, stopping initialization.");
84  return;
85  }
86  m_streamer->restoreDataStore(msg);
87 
88  // Delete buffers
89  delete msg;
90 
91  m_nrecv = -1;
92 
93  B2INFO("Rx initialized.");
94 }
95 
96 EvtMessage* GetEventFromSocketModule::receive()
97 {
98  // printf("GetEventFromSocket:receive here!\n");
99  B2DEBUG(1001, "GetEventFromSocket:receive here!");
100 
101  fd_set fds;
102  fd_set readfds;
103  FD_ZERO(&readfds);
104  int maxfd = 0;
105  for (int i = 0; i < (int)m_socks.size(); i++) {
106  int sockfd = (m_socks[i]->sock())->sock();
107  FD_SET(sockfd, &readfds);
108  if (sockfd > maxfd) maxfd = sockfd;
109  }
110 
111  while (1) {
112  // Look at socket
113  memcpy(&fds, &readfds, sizeof(fd_set));
114  // select() with blocking until data is ready on socket
115  int selstat = select(maxfd + 1, &fds, NULL, NULL, NULL);
116  // Check data on the socket or not
117  for (int i = 0; i < (int)m_socks.size(); i++) {
118  int sockfd = (m_socks[i]->sock())->sock();
119  if (FD_ISSET(sockfd, &fds)) {
120  EvtMessage* msg = m_socks[i]->recv();
121  return msg;
122  }
123  }
124  // printf("Select loop : should not come here. selstat = %d\n", selstat);
125  B2ERROR("Select loop : should not come here. selstat = " << selstat);
126  usleep(TIME_WAIT);
127  }
128 }
129 
130 void GetEventFromSocketModule::beginRun()
131 {
132  B2INFO("beginRun called.");
133 }
134 
135 
136 void GetEventFromSocketModule::event()
137 {
138  m_nrecv++;
139  // First event is already loaded
140  if (m_nrecv == 0) return;
141 
142  // Get a record from socket
143  EvtMessage* msg = receive();
144  B2INFO("Rx: got an event from Socket, size=" << msg->size());
145  // Check for termination record
146  if (msg->type() == MSG_TERMINATE) {
147  B2INFO("Rx: got termination message. Exitting....");
148  return;
149  // Flag End Of File !!!!!
150  // return msg->type(); // EOF
151  }
152 
153  // Restore DataStore
154  m_streamer->restoreDataStore(msg);
155  B2INFO("Rx: DataStore Restored!!");
156 
157  // Delete EvtMessage
158  delete msg;
159 
160  return;
161 }
162 
163 void GetEventFromSocketModule::endRun()
164 {
165  //fill Run data
166 
167  B2INFO("endRun done.");
168 }
169 
170 
171 void GetEventFromSocketModule::terminate()
172 {
173  delete m_streamer;
174  B2INFO("terminate called");
175 }
176 
Stream/restore DataStore objects to/from EvtMessage.
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:94
A class definition of an input module for Sequential ROOT I/O.
Base class for Modules.
Definition: Module.h:72
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.