8#include <daq/rfarm/event/hltsocket/HLTSocket.h>
9#include <framework/logging/Logger.h>
15#include <sys/socket.h>
16#include <netinet/in.h>
// Destructor of HLTSocket.
// NOTE(review): this listing is an excerpt with lines missing; the only
// statement visible here clears the m_initialized flag. Any socket cleanup
// the destructor may perform is not shown - confirm against the full file.
25HLTSocket::~HLTSocket()
29 m_initialized =
false;
// Send a length-prefixed byte buffer through write_data().
//
// The length `len` is converted with htonl() and written first as
// sizeof(int) bytes; afterwards `len` bytes starting at `data` are written.
//
// @param data  buffer to send
// @param len   number of bytes in `data`
//
// NOTE(review): the listing omits the branch conditions around the first
// error messages and all return statements, so the returned value cannot be
// documented from this excerpt.
32int HLTSocket::put(
char* data,
int len)
// Size header in network byte order.
34 const int to_size = htonl(len);
35 const int br = write_data((
char*) &to_size,
sizeof(
int));
// Error reporting for the size header (conditions on `br` are not visible
// in this excerpt - presumably br == 0 and br < 0, mirroring the payload
// handling below; TODO confirm).
37 B2ERROR(
"Can not write to broken pipe.");
40 B2ERROR(
"Error in sending the size: " << strerror(errno));
// Payload.
43 const int bcount = write_data(data, len);
45 B2ERROR(
"Can not write to broken pipe.");
47 }
else if (bcount < 0) {
// Negative result: report the errno text.
48 B2ERROR(
"Error in sending the data: " << strerror(errno));
// The byte count reported by write_data() must equal the requested length.
51 B2ASSERT(
"Written buffer size != buffer size in data!", bcount == len);
// Send a buffer of `len` int words through write_data().
//
// The first word of the buffer (data[0]) is asserted to be equal to `len`,
// i.e. the buffer carries its own size in words. The whole buffer,
// len * sizeof(int) bytes, is passed to write_data() in a single call.
//
// @param data  word buffer; data[0] must equal `len`
// @param len   number of int words in `data`
//
// NOTE(review): return statements and the condition of the first error
// branch are not part of this excerpt.
55int HLTSocket::put_wordbuf(
int* data,
int len)
58 const int gcount = data[0];
59 B2ASSERT(
"The first entry in the data must be the buffer size!", gcount == len);
// Write the buffer as raw bytes (no byte-order conversion is visible here).
61 int bcount = write_data((
char*) data, len *
sizeof(
int));
63 B2ERROR(
"Can not write to broken pipe.");
65 }
else if (bcount < 0) {
66 B2ERROR(
"Error in sending the data: " << strerror(errno));
// Convert the byte count back to a word count, rounding up to whole words.
69 bcount = ((bcount - 1) /
sizeof(
int) + 1);
71 B2ASSERT(
"Written buffer size != buffer size in data!", bcount == len);
// Low-level write helper used by put() and put_wordbuf().
//
// Loops while fewer than `len` bytes have been counted in bcount; each
// iteration asks ::write() on m_socket for the remaining (len - bcount)
// bytes starting at `ptr`.
//
// NOTE(review): the declarations of bcount/br/ptr, the handling of
// non-positive ::write() results and the return statement are missing from
// this excerpt.
77int HLTSocket::write_data(
char* data,
int len)
83 while (bcount < len) {
// A positive result means that many bytes were accepted by the socket.
85 if ((br = ::write(m_socket, ptr, len - bcount)) > 0) {
// Receive a length-prefixed byte buffer through read_data().
//
// First a 4-byte size header is read into gcount and converted with
// ntohl(); then gcount bytes are read into `data`.
//
// @param data  destination buffer
// @param len   value printed next to gcount in the "buffer too small"
//              message (presumably the capacity of `data` - TODO confirm,
//              the comparison itself is not visible in this excerpt)
//
// NOTE(review): the declaration of gcount, several branch conditions and
// all return statements are missing from this listing.
103int HLTSocket::get(
char* data,
int len)
// Read the 4-byte size header.
106 const int br = read_data((
char*) &gcount, 4);
108 B2ERROR(
"Error in getting the size: " << strerror(errno));
110 }
else if (br == 0) {
// read_data() returned 0 for the header.
111 B2WARNING(
"No data from socket!");
// Header arrives in network byte order.
115 gcount = ntohl(gcount);
117 B2ERROR(
"buffer too small! " << gcount <<
" < " << len);
// Read the payload of gcount bytes.
120 const int bcount = read_data(data, gcount);
122 B2ERROR(
"Error in getting the data: " << strerror(errno));
124 }
else if (bcount == 0) {
125 B2WARNING(
"No data from socket!");
// The number of bytes read must match the announced size.
128 B2ASSERT(
"Read buffer size != buffer size in data!", bcount == gcount);
// Receive a word buffer through read_data().
//
// The first int is read directly into wrdbuf[0]; it is used as the total
// size of the buffer in words. The remaining (wrdbuf[0] - 1) words are then
// read into the buffer starting at wrdbuf[1].
//
// @param wrdbuf  destination word buffer
// @param len     value printed next to gcount in the "buffer too small"
//                message (the comparison itself is not visible in this
//                excerpt - TODO confirm its units and direction)
//
// NOTE(review): branch conditions for the error messages and all return
// statements are missing from this listing. No byte-order conversion of the
// size word is visible here.
132int HLTSocket::get_wordbuf(
int* wrdbuf,
int len)
// Read the leading size word.
134 int br = read_data((
char*) wrdbuf,
sizeof(
int));
136 B2ERROR(
"Error in getting the size: " << strerror(errno));
138 }
else if (br == 0) {
139 B2WARNING(
"No data from socket!");
// Remaining payload in bytes: all words except the size word already read.
143 const int gcount = (wrdbuf[0] - 1) *
sizeof(
int);
145 B2ERROR(
"buffer too small! " << gcount <<
" < " << len);
// Read the rest of the buffer behind the size word.
149 const int bcount = read_data((
char*) &wrdbuf[1], gcount);
151 B2ERROR(
"Error in getting the data: " << strerror(errno));
153 }
else if (bcount == 0) {
154 B2WARNING(
"No data from socket!");
// The number of bytes read must match the announced remaining size.
158 B2ASSERT(
"Read buffer size != buffer size in data: " << bcount <<
" != " << gcount, bcount == gcount);
// Low-level read helper used by get() and get_wordbuf().
//
// Loops while fewer than `len` bytes have been counted in bcount; each
// iteration asks ::read() on m_socket for the remaining (len - bcount)
// bytes into `buf`. A ::read() result of 0 makes the function return 0
// immediately.
//
// NOTE(review): the declarations of bcount/br/buf, the handling of negative
// ::read() results and the final return statement are missing from this
// excerpt.
162int HLTSocket::read_data(
char* data,
int len)
167 while (bcount < len) {
169 if ((br = ::read(m_socket, buf, len - bcount)) > 0) {
// ::read() returned 0: give up and report 0 to the caller.
173 if (br == 0)
return 0;
// Open a listening socket on `port` (if none exists yet) and accept one
// client connection.
//
// When m_listener is not a positive descriptor, a new AF_INET/SOCK_STREAM
// socket is created, configured (SO_REUSEADDR, send/receive buffer sizes of
// D2_SOCKBUF_SIZE), bound to `port` and put into listening mode with a
// backlog of 3. SIGPIPE is set to be ignored. The accepted connection is
// stored in m_socket and m_initialized is set to true.
//
// @param port  TCP port to listen on
//
// NOTE(review): this listing omits several lines (e.g. the declaration of
// optval, any assignment to sa.sin_addr, closing braces and all return
// statements), so the exact extent of the `if (m_listener <= 0)` block and
// the returned values cannot be confirmed from here.
188bool HLTSocket::accept(
unsigned int port)
// Only set up the listener if there is none yet.
194 if (m_listener <= 0) {
195 B2RESULT(
"Started listening for new clients");
196 struct sockaddr_in sa;
197 bzero(&sa,
sizeof(
struct sockaddr_in));
199 sa.sin_family = AF_INET;
200 sa.sin_port = htons(port);
202 if ((m_listener = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
203 B2ERROR(
"Socket initialization failed: " << strerror(errno));
// Socket options; the option length is hard-coded as 4 bytes.
208 setsockopt(m_listener, SOL_SOCKET, SO_REUSEADDR, &optval, 4);
210 int sizeval = D2_SOCKBUF_SIZE;
211 setsockopt(m_listener, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
212 setsockopt(m_listener, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
// Ignore SIGPIPE for the process.
214 signal(SIGPIPE, SIG_IGN);
216 if ((bind(m_listener, (
struct sockaddr*) &sa,
sizeof(sa))) < 0) {
217 B2ERROR(
"Socket binding failed: " << strerror(errno));
// Backlog of 3 pending connections; the return value is not checked.
221 listen(m_listener, 3);
224 struct sockaddr_in isa;
225 socklen_t i =
sizeof(isa);
226 getsockname(m_listener, (
struct sockaddr*)&isa, &i);
228 B2RESULT(
"Started accepting new clients");
// Accept one client; its descriptor becomes m_socket.
229 if ((m_socket =::accept(m_listener, (
struct sockaddr*)&isa, &i)) < 0) {
230 B2ERROR(
"Socket accepting failed: " << strerror(errno));
234 B2RESULT(
"Accepted connection with socket: " << m_socket);
235 m_initialized =
true;
// Close helper taking a socket descriptor.
// NOTE(review): only the log statement is visible in this excerpt; the
// actual closing call and any guard condition are on lines missing from the
// listing - confirm against the full file.
239void HLTSocket::close(
int socket)
244 B2RESULT(
"Socket closed: " << socket);
// Connect to `hostName`:`port` as a client.
//
// The host is resolved with gethostbyname(), a SOCK_STREAM socket of the
// resolved address type is created and stored in m_socket, buffer sizes
// (D2_SOCKBUF_SIZE) and SO_REUSEADDR are set, and SIGPIPE is set to be
// ignored. ::connect() is then called in a loop: on ETIMEDOUT or
// ECONNREFUSED a warning is logged and the thread sleeps for one second
// before the next attempt. The loop body also checks
// mainLoop.isRunning(). On success m_initialized is set to true.
//
// @param hostName  host to connect to
// @param port      TCP port on that host
// @param mainLoop  queried via isRunning() inside the retry loop
//
// NOTE(review): the declarations of hp, yes and maxretry, the handling of
// other errno values, the action taken when the main loop is no longer
// running and all return statements are missing from this listing.
248bool HLTSocket::connect(
const std::string& hostName,
unsigned int port,
const HLTMainLoop& mainLoop)
254 if ((hp = gethostbyname(hostName.c_str())) == NULL) {
255 B2ERROR(
"Host not found: " << strerror(errno));
// Build the peer address from the resolved host entry.
259 struct sockaddr_in sa;
260 bzero(&sa,
sizeof(sa));
261 bcopy(hp->h_addr, (
char*) &sa.sin_addr, hp->h_length);
262 sa.sin_family = hp->h_addrtype;
263 sa.sin_port = htons((u_short) port);
265 if ((m_socket = socket(hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
266 B2ERROR(
"Socket initialization failed: " << strerror(errno));
// Socket options; the option length is hard-coded as 4 bytes.
270 int sizeval = D2_SOCKBUF_SIZE;
271 setsockopt(m_socket, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
272 setsockopt(m_socket, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
274 setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
// Ignore SIGPIPE for the process.
276 signal(SIGPIPE, SIG_IGN);
// Needed for the `1s` duration literal below.
278 using namespace std::chrono_literals;
// Keep trying until ::connect() succeeds.
281 while (::connect(m_socket, (
struct sockaddr*) &sa,
sizeof(sa)) < 0) {
282 if (errno == ETIMEDOUT or errno == ECONNREFUSED) {
283 B2WARNING(
"Connection failed, will retry in 1 second... " << maxretry);
284 std::this_thread::sleep_for(1s);
286 B2ERROR(
"Socket initialization failed: " << strerror(errno));
290 if (not mainLoop.isRunning()) {
296 B2RESULT(
"Connected with socket: " << m_socket);
297 m_initialized =
true;
// Return the current value of m_initialized (set to true at the end of
// accept() and connect(), reset to false by deinitialize()).
301bool HLTSocket::initialized()
const
303 return m_initialized;
// Reset m_initialized to false. No socket is closed by the statement visible
// in this excerpt.
305void HLTSocket::deinitialize()
307 m_initialized =
false;
Abstract base class for different kinds of events.