9 #include <daq/expreco/modules/GetEventFromSocket.h>
15 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
22 #define TIME_WAIT 10000
39 setDescription(
"Get Event from EvtSocket");
42 vector<string> emptyhosts;
43 addParam(
"Hosts", m_hosts,
"GetEventFromSocket hosts", emptyhosts);
44 vector<int> emptysocks;
45 addParam(
"Ports", m_ports,
"GetEventFromSocket Ports", emptysocks);
47 m_compressionLevel = 0;
50 B2INFO(
"Rx: Constructor done.");
54 GetEventFromSocketModule::~GetEventFromSocketModule()
58 void GetEventFromSocketModule::initialize()
61 if (m_hosts.size() == 0) {
62 B2FATAL(
"No hosts specified");
66 gSystem->Load(
"libdataobjects");
69 for (
int i = 0; i < (int)m_hosts.size(); i++) {
71 B2INFO(
"Connecting to " << m_hosts[i] <<
"(port " << m_ports[i] <<
")");
73 m_socks.push_back(evtsock);
75 printf(
"All hosts connected\n");
83 B2FATAL(
"Did not receive any data, stopping initialization.");
86 m_streamer->restoreDataStore(msg);
93 B2INFO(
"Rx initialized.");
96 EvtMessage* GetEventFromSocketModule::receive()
99 B2DEBUG(1001,
"GetEventFromSocket:receive here!");
105 for (
int i = 0; i < (int)m_socks.size(); i++) {
106 int sockfd = (m_socks[i]->sock())->sock();
107 FD_SET(sockfd, &readfds);
108 if (sockfd > maxfd) maxfd = sockfd;
113 memcpy(&fds, &readfds,
sizeof(fd_set));
115 int selstat = select(maxfd + 1, &fds, NULL, NULL, NULL);
117 for (
int i = 0; i < (int)m_socks.size(); i++) {
118 int sockfd = (m_socks[i]->sock())->sock();
119 if (FD_ISSET(sockfd, &fds)) {
125 B2ERROR(
"Select loop : should not come here. selstat = " << selstat);
130 void GetEventFromSocketModule::beginRun()
132 B2INFO(
"beginRun called.");
136 void GetEventFromSocketModule::event()
140 if (m_nrecv == 0)
return;
144 B2INFO(
"Rx: got an event from Socket, size=" << msg->
size());
146 if (msg->
type() == MSG_TERMINATE) {
147 B2INFO(
"Rx: got termination message. Exitting....");
154 m_streamer->restoreDataStore(msg);
155 B2INFO(
"Rx: DataStore Restored!!");
163 void GetEventFromSocketModule::endRun()
167 B2INFO(
"endRun done.");
171 void GetEventFromSocketModule::terminate()
174 B2INFO(
"terminate called");
Stream/restore DataStore objects to/from EvtMessage.
Class to manage streamed object.
ERecordType type() const
Get record type.
int size() const
Get size of message including headers.
A class definition of an input module for Sequential ROOT I/O.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.