9#include <daq/expreco/modules/GetEventFromSocket.h>
16#include <sys/socket.h>
17#include <netinet/in.h>
22#define TIME_WAIT 10000
42 vector<string> emptyhosts;
44 vector<int> emptysocks;
45 addParam(
"Ports", m_ports,
"GetEventFromSocket Ports", emptysocks);
50 B2INFO(
"Rx: Constructor done.");
54GetEventFromSocketModule::~GetEventFromSocketModule()
62 B2FATAL(
"No hosts specified");
66 gSystem->Load(
"libdataobjects");
69 for (
int i = 0; i < (int)
m_hosts.size(); i++) {
71 B2INFO(
"Connecting to " <<
m_hosts[i] <<
"(port " << m_ports[i] <<
")");
75 printf(
"All hosts connected\n");
83 B2FATAL(
"Did not receive any data, stopping initialization.");
93 B2INFO(
"Rx initialized.");
99 B2DEBUG(1001,
"GetEventFromSocket:receive here!");
105 for (
int i = 0; i < (int)
m_socks.size(); i++) {
106 int sockfd = (
m_socks[i]->sock())->sock();
107 FD_SET(sockfd, &readfds);
108 if (sockfd > maxfd) maxfd = sockfd;
113 memcpy(&fds, &readfds,
sizeof(fd_set));
115 int selstat = select(maxfd + 1, &fds, NULL, NULL, NULL);
117 for (
int i = 0; i < (int)
m_socks.size(); i++) {
118 int sockfd = (
m_socks[i]->sock())->sock();
119 if (FD_ISSET(sockfd, &fds)) {
125 B2ERROR(
"Select loop : should not come here. selstat = " << selstat);
132 B2INFO(
"beginRun called.");
144 B2INFO(
"Rx: got an event from Socket, size=" << msg->
size());
146 if (msg->
type() == MSG_TERMINATE) {
147 B2INFO(
"Rx: got termination message. Exitting....");
155 B2INFO(
"Rx: DataStore Restored!!");
167 B2INFO(
"endRun done.");
174 B2INFO(
"terminate called");
Stream/restore DataStore objects to/from EvtMessage.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
Class to manage streamed object.
ERecordType type() const
Get record type.
int size() const
Get size of message including headers.
virtual void event()
This method is the core of the module.
DataStoreStreamer * m_streamer
DataStoreStreamer.
virtual void initialize()
Module functions to be called from main process.
virtual void beginRun()
Module functions to be called from event process.
GetEventFromSocketModule()
Constructor / Destructor.
virtual void terminate()
This method is called at the end of the event processing.
virtual void endRun()
This method is called if the current run ends.
std::vector< EvtSocketSend * > m_socks
Receiver Socket.
int m_nrecv
No. of sent events.
int m_compressionLevel
Compression Level.
std::vector< std::string > m_hosts
Receiver Port.
void setDescription(const std::string &description)
Sets the description of the module.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.