1 #include <daq/rfarm/event/hltsocket/HLTSocket.h>
2 #include <framework/logging/Logger.h>
8 #include <sys/socket.h>
9 #include <netinet/in.h>
// Destructor. The visible part of the body resets the m_initialized flag to false.
// NOTE(review): this view of the file omits lines; any descriptor cleanup performed
// here is not visible and should be confirmed against the full source.
18 HLTSocket::~HLTSocket()
22 m_initialized =
false;
// Sends one length-prefixed message through write_data().
//
// The length `len` is converted with htonl() and written first as sizeof(int)
// bytes; afterwards `len` bytes starting at `data` are written.
//
// @param data  start of the payload to send
// @param len   payload size in bytes
//
// NOTE(review): the guards in front of the error branches and the return
// statements are not visible in this view; the return value contract is
// therefore not documented here.
25 int HLTSocket::put(
char* data,
int len)
// Size prefix in network byte order.
27 const int to_size = htonl(len);
28 const int br = write_data((
char*) &to_size,
sizeof(
int));
30 B2ERROR(
"Can not write to broken pipe.");
33 B2ERROR(
"Error in sending the size: " << strerror(errno));
// Payload itself.
36 const int bcount = write_data(data, len);
38 B2ERROR(
"Can not write to broken pipe.");
40 }
else if (bcount < 0) {
41 B2ERROR(
"Error in sending the data: " << strerror(errno));
// The number of bytes reported by write_data() must match the requested length.
44 B2ASSERT(
"Written buffer size != buffer size in data!", bcount == len);
// Sends a buffer of int words through write_data().
//
// The first word, data[0], is asserted to be equal to `len`. The buffer is
// written as len * sizeof(int) bytes without any byte-order conversion in the
// visible lines.
//
// @param data  word buffer; data[0] has to hold the buffer size in words
// @param len   number of int words to send
//
// NOTE(review): return statements are not visible in this view.
48 int HLTSocket::put_wordbuf(
int* data,
int len)
51 const int gcount = data[0];
52 B2ASSERT(
"The first entry in the data must be the buffer size!", gcount == len);
54 int bcount = write_data((
char*) data, len *
sizeof(
int));
56 B2ERROR(
"Can not write to broken pipe.");
58 }
else if (bcount < 0) {
59 B2ERROR(
"Error in sending the data: " << strerror(errno));
// Convert the written byte count into a word count, rounding up.
// NOTE(review): (bcount - 1) is an int divided by a size_t, so the arithmetic is
// done unsigned; this is only correct for bcount >= 1 - confirm that the
// branches above exclude other values before this line is reached.
62 bcount = ((bcount - 1) /
sizeof(
int) + 1);
64 B2ASSERT(
"Written buffer size != buffer size in data!", bcount == len);
// Low-level write helper used by put() and put_wordbuf().
//
// Loops while the running count `bcount` is smaller than `len`, calling
// ::write() on m_socket with the remaining number of bytes (len - bcount).
//
// @param data  buffer to send
// @param len   number of bytes to send
//
// NOTE(review): the declarations of bcount/br/ptr, the handling of the write
// result and the return statements are not visible in this view.
70 int HLTSocket::write_data(
char* data,
int len)
77 while (bcount < len) {
78 if ((br = ::write(m_socket, ptr, len - bcount)) > 0) {
// Receives one length-prefixed message through read_data().
//
// First reads a 4-byte size field into gcount and converts it with ntohl(),
// then reads gcount bytes into `data`.
//
// @param data  destination buffer
// @param len   value printed next to gcount in the "buffer too small" message;
//              presumably the capacity of `data` in bytes - TODO confirm
//
// NOTE(review): several guards and all return statements are not visible in
// this view.
96 int HLTSocket::get(
char* data,
int len)
// NOTE(review): the size field is read with a literal 4, while put() writes it
// with sizeof(int); the two agree only where int is 4 bytes wide.
99 const int br = read_data((
char*) &gcount, 4);
101 B2ERROR(
"Error in getting the size: " << strerror(errno));
103 }
else if (br == 0) {
104 B2WARNING(
"No data from socket!");
// Size field arrives in network byte order.
108 gcount = ntohl(gcount);
// NOTE(review): the guard for this message is not visible. If it fires when
// gcount exceeds len, the " < " in the text reads inverted - confirm.
110 B2ERROR(
"buffer too small! " << gcount <<
" < " << len);
113 const int bcount = read_data(data, gcount);
115 B2ERROR(
"Error in getting the data: " << strerror(errno));
117 }
else if (bcount == 0) {
118 B2WARNING(
"No data from socket!");
// The number of bytes reported by read_data() must match the announced size.
121 B2ASSERT(
"Read buffer size != buffer size in data!", bcount == gcount);
// Receives a buffer of int words through read_data().
//
// Reads the first word (sizeof(int) bytes) into wrdbuf[0], derives from it the
// byte count of the remaining words, and reads those into wrdbuf[1] onwards.
// No byte-order conversion is applied in the visible lines.
//
// @param wrdbuf  destination word buffer
// @param len     value printed next to gcount in the "buffer too small" message
//
// NOTE(review): several guards and all return statements are not visible in
// this view.
125 int HLTSocket::get_wordbuf(
int* wrdbuf,
int len)
127 int br = read_data((
char*) wrdbuf,
sizeof(
int));
129 B2ERROR(
"Error in getting the size: " << strerror(errno));
131 }
else if (br == 0) {
132 B2WARNING(
"No data from socket!");
// Remaining payload in BYTES: (total words - 1 already read) * sizeof(int).
136 const int gcount = (wrdbuf[0] - 1) *
sizeof(
int);
// NOTE(review): gcount is a byte count here. If len is a word count (as it is
// in put_wordbuf()), the hidden guard compares different units - confirm.
138 B2ERROR(
"buffer too small! " << gcount <<
" < " << len);
142 const int bcount = read_data((
char*) &wrdbuf[1], gcount);
144 B2ERROR(
"Error in getting the data: " << strerror(errno));
146 }
else if (bcount == 0) {
147 B2WARNING(
"No data from socket!");
151 B2ASSERT(
"Read buffer size != buffer size in data: " << bcount <<
" != " << gcount, bcount == gcount);
// Low-level read helper used by get() and get_wordbuf().
//
// Loops while the running count `bcount` is smaller than `len`, calling
// ::read() on m_socket with the remaining number of bytes (len - bcount).
// When ::read() yields 0 the function returns 0 immediately.
//
// @param data  destination buffer
// @param len   number of bytes to read
//
// NOTE(review): the declarations of bcount/br/buf, the handling of positive
// and negative read results and the final return are not visible in this view.
155 int HLTSocket::read_data(
char* data,
int len)
161 while (bcount < len) {
162 if ((br = ::read(m_socket, buf, len - bcount)) > 0) {
166 if (br == 0)
return 0;
// Server side: opens a listening socket on `port` if none exists yet, then
// accepts one client connection into m_socket and sets m_initialized to true.
//
// @param port  TCP port to listen on (converted with htons())
//
// NOTE(review): return statements and some declarations (e.g. optval) are not
// visible in this view.
181 bool HLTSocket::accept(
unsigned int port)
// Create the listener only once.
// NOTE(review): "<= 0" treats descriptor 0 as "not set"; confirm how
// m_listener is initialised.
187 if (m_listener <= 0) {
188 B2RESULT(
"Started listening for new clients");
189 struct sockaddr_in sa;
190 bzero(&sa,
sizeof(
struct sockaddr_in));
192 sa.sin_family = AF_INET;
193 sa.sin_port = htons(port);
195 if ((m_listener = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
196 B2ERROR(
"Socket initialization failed: " << strerror(errno));
// NOTE(review): the setsockopt() calls below ignore their return values and
// pass a literal option length of 4 instead of sizeof(...).
201 setsockopt(m_listener, SOL_SOCKET, SO_REUSEADDR, &optval, 4);
// Request send and receive buffers of D2_SOCKBUF_SIZE.
203 int sizeval = D2_SOCKBUF_SIZE;
204 setsockopt(m_listener, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
205 setsockopt(m_listener, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
// Ignore SIGPIPE; signal() changes the disposition for the whole process.
207 signal(SIGPIPE, SIG_IGN);
209 if ((bind(m_listener, (
struct sockaddr*) &sa,
sizeof(sa))) < 0) {
210 B2ERROR(
"Socket binding failed: " << strerror(errno));
// Backlog of 3 pending connections; the return value of listen() is ignored.
214 listen(m_listener, 3);
217 struct sockaddr_in isa;
218 socklen_t i =
sizeof(isa);
// The return value of getsockname() is ignored.
219 getsockname(m_listener, (
struct sockaddr*)&isa, &i);
221 B2RESULT(
"Started accepting new clients");
// isa and i are reused as output arguments for the peer address.
222 if ((m_socket =::accept(m_listener, (
struct sockaddr*)&isa, &i)) < 0) {
223 B2ERROR(
"Socket accepting failed: " << strerror(errno));
227 B2RESULT(
"Accepted connection with socket: " << m_socket);
228 m_initialized =
true;
// Closes the given socket; the visible part of the body logs "Socket closed"
// together with the descriptor value.
//
// @param socket  descriptor that is reported in the log message
//
// NOTE(review): the statements that actually release the descriptor are not
// visible in this view.
232 void HLTSocket::close(
int socket)
237 B2RESULT(
"Socket closed: " << socket);
// Client side: resolves `hostName`, creates a stream socket in m_socket and
// keeps calling ::connect() until it succeeds; on success m_initialized is set
// to true.
//
// @param hostName  host to connect to, resolved with gethostbyname()
// @param port      TCP port (converted with htons())
// @param mainLoop  queried via isRunning() inside the retry loop
//
// NOTE(review): return statements, the declarations of hp/yes/maxretry and
// the bodies of several branches are not visible in this view.
241 bool HLTSocket::connect(
const std::string& hostName,
unsigned int port,
const HLTMainLoop& mainLoop)
// NOTE(review): gethostbyname() reports failures through h_errno, not errno,
// so strerror(errno) may print an unrelated message here.
247 if ((hp = gethostbyname(hostName.c_str())) == NULL) {
248 B2ERROR(
"Host not found: " << strerror(errno));
// Fill the destination address from the resolver result.
252 struct sockaddr_in sa;
253 bzero(&sa,
sizeof(sa));
254 bcopy(hp->h_addr, (
char*) &sa.sin_addr, hp->h_length);
255 sa.sin_family = hp->h_addrtype;
256 sa.sin_port = htons((u_short) port);
258 if ((m_socket = socket(hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
259 B2ERROR(
"Socket initialization failed: " << strerror(errno));
// NOTE(review): the setsockopt() calls below ignore their return values and
// pass a literal option length of 4 instead of sizeof(...).
263 int sizeval = D2_SOCKBUF_SIZE;
264 setsockopt(m_socket, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
265 setsockopt(m_socket, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
267 setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
// Ignore SIGPIPE; signal() changes the disposition for the whole process.
269 signal(SIGPIPE, SIG_IGN);
// Enables the 1s duration literal used below.
271 using namespace std::chrono_literals;
// Retry loop: runs as long as ::connect() fails.
274 while (::connect(m_socket, (
struct sockaddr*) &sa,
sizeof(sa)) < 0) {
// Timeout and refused connection are treated as retryable: warn and wait 1 s.
275 if (errno == ETIMEDOUT or errno == ECONNREFUSED) {
276 B2WARNING(
"Connection failed, will retry in 1 second... " << maxretry);
277 std::this_thread::sleep_for(1s);
// NOTE(review): this message is emitted inside the connect loop although its
// text speaks of socket initialization; the enclosing branch is not visible.
279 B2ERROR(
"Socket initialization failed: " << strerror(errno));
// The loop consults the main loop state; what happens when it is no longer
// running is not visible in this view.
283 if (not mainLoop.isRunning()) {
289 B2RESULT(
"Connected with socket: " << m_socket);
290 m_initialized =
true;
// Returns the current value of m_initialized, which accept() and connect() set
// to true and the destructor and deinitialize() set to false.
294 bool HLTSocket::initialized()
const
296 return m_initialized;
// Marks the socket as not initialized by setting m_initialized to false.
// NOTE(review): no descriptor is closed in the visible lines of this function.
298 void HLTSocket::deinitialize()
300 m_initialized =
false;