Belle II Software  release-08-01-10
HLTSocket.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/rfarm/event/hltsocket/HLTSocket.h>
9 #include <framework/logging/Logger.h>
10 
11 #include <unistd.h>
12 #include <stdio.h>
13 #include <sys/types.h>
14 #include <netdb.h>
15 #include <sys/socket.h>
16 #include <netinet/in.h>
17 #include <errno.h>
18 #include <signal.h>
19 
20 #include <chrono>
21 #include <thread>
22 
23 using namespace Belle2;
24 
25 HLTSocket::~HLTSocket()
26 {
27  close(m_listener);
28  close(m_socket);
29  m_initialized = false;
30 }
31 
32 int HLTSocket::put(char* data, int len)
33 {
34  const int to_size = htonl(len);
35  const int br = write_data((char*) &to_size, sizeof(int));
36  if (br == 0) {
37  B2ERROR("Can not write to broken pipe.");
38  return 0;
39  } else if (br < 0) {
40  B2ERROR("Error in sending the size: " << strerror(errno));
41  return br;
42  }
43  const int bcount = write_data(data, len);
44  if (bcount == 0) {
45  B2ERROR("Can not write to broken pipe.");
46  return 0;
47  } else if (bcount < 0) {
48  B2ERROR("Error in sending the data: " << strerror(errno));
49  return bcount;
50  }
51  B2ASSERT("Written buffer size != buffer size in data!", bcount == len);
52  return bcount;
53 }
54 
55 int HLTSocket::put_wordbuf(int* data, int len)
56 {
57  // TODO: it is implicitly assumed that the first entry in the buffer is the buffer size!
58  const int gcount = data[0];
59  B2ASSERT("The first entry in the data must be the buffer size!", gcount == len);
60 
61  int bcount = write_data((char*) data, len * sizeof(int));
62  if (bcount == 0) {
63  B2ERROR("Can not write to broken pipe.");
64  return 0;
65  } else if (bcount < 0) {
66  B2ERROR("Error in sending the data: " << strerror(errno));
67  return bcount;
68  }
69  bcount = ((bcount - 1) / sizeof(int) + 1);
70 
71  B2ASSERT("Written buffer size != buffer size in data!", bcount == len);
72 
73  // ATTENTION: the returned size is size / 4
74  return bcount;
75 }
76 
77 int HLTSocket::write_data(char* data, int len)
78 {
79  errno = 0;
80  char* ptr = data;
81  int bcount = 0;
82  int br = 0;
83 
84  while (bcount < len) {
85  if ((br = ::write(m_socket, ptr, len - bcount)) > 0) {
86  bcount += br;
87  ptr += br;
88  }
89  if (br < 0) {
90  switch (errno) {
91  case EINTR:
92  return -1;
93  case EPIPE:
94  return 0; // connection closed, sigpipe
95  default:
96  return -1;
97  }
98  }
99  }
100  return bcount;
101 }
102 
103 int HLTSocket::get(char* data, int len)
104 {
105  int gcount;
106  const int br = read_data((char*) &gcount, 4);
107  if (br < 0) {
108  B2ERROR("Error in getting the size: " << strerror(errno));
109  return br;
110  } else if (br == 0) {
111  B2WARNING("No data from socket!");
112  return 0;
113  }
114 
115  gcount = ntohl(gcount);
116  if (gcount > len) {
117  B2ERROR("buffer too small! " << gcount << " < " << len);
118  return -1;
119  }
120  const int bcount = read_data(data, gcount);
121  if (bcount < 0) {
122  B2ERROR("Error in getting the data: " << strerror(errno));
123  return bcount;
124  } else if (bcount == 0) {
125  B2WARNING("No data from socket!");
126  return 0;
127  }
128  B2ASSERT("Read buffer size != buffer size in data!", bcount == gcount);
129  return bcount;
130 }
131 
132 int HLTSocket::get_wordbuf(int* wrdbuf, int len)
133 {
134  int br = read_data((char*) wrdbuf, sizeof(int));
135  if (br < 0) {
136  B2ERROR("Error in getting the size: " << strerror(errno));
137  return br;
138  } else if (br == 0) {
139  B2WARNING("No data from socket!");
140  return 0;
141  }
142 
143  const int gcount = (wrdbuf[0] - 1) * sizeof(int);
144  if (gcount > len) {
145  B2ERROR("buffer too small! " << gcount << " < " << len);
146  return -1;
147  }
148  // ATTENTION: the send size is size / 4
149  const int bcount = read_data((char*) &wrdbuf[1], gcount);
150  if (bcount < 0) {
151  B2ERROR("Error in getting the data: " << strerror(errno));
152  return bcount;
153  } else if (bcount == 0) {
154  B2WARNING("No data from socket!");
155  return 0;
156  }
157 
158  B2ASSERT("Read buffer size != buffer size in data: " << bcount << " != " << gcount, bcount == gcount);
159  return (wrdbuf[0]);
160 }
161 
162 int HLTSocket::read_data(char* data, int len)
163 {
164  char* buf = data;
165  int bcount = 0;
166  int br = 0;
167 
168  while (bcount < len) {
169  if ((br = ::read(m_socket, buf, len - bcount)) > 0) {
170  bcount += br;
171  buf += br;
172  }
173  if (br == 0) return 0;
174  if (br < 0) {
175  switch (errno) {
176  case EINTR:
177  return -1;
178  case EAGAIN:
179  continue;
180  default:
181  return -1;
182  }
183  }
184  }
185  return bcount;
186 }
187 
188 bool HLTSocket::accept(unsigned int port)
189 {
190  // Before going on make sure to close the old socket
191  close(m_socket);
192 
193  // Initialize the first listener
194  if (m_listener <= 0) {
195  B2RESULT("Started listening for new clients");
196  struct sockaddr_in sa;
197  bzero(&sa, sizeof(struct sockaddr_in));
198 
199  sa.sin_family = AF_INET;
200  sa.sin_port = htons(port);
201 
202  if ((m_listener = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
203  B2ERROR("Socket initialization failed: " << strerror(errno));
204  return false;
205  }
206 
207  int optval = 1;
208  setsockopt(m_listener, SOL_SOCKET, SO_REUSEADDR, &optval, 4);
209 
210  int sizeval = D2_SOCKBUF_SIZE;
211  setsockopt(m_listener, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
212  setsockopt(m_listener, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
213 
214  signal(SIGPIPE, SIG_IGN);
215 
216  if ((bind(m_listener, (struct sockaddr*) &sa, sizeof(sa))) < 0) {
217  B2ERROR("Socket binding failed: " << strerror(errno));
218  return false;
219  }
220 
221  listen(m_listener, 3);
222  }
223 
224  struct sockaddr_in isa;
225  socklen_t i = sizeof(isa);
226  getsockname(m_listener, (struct sockaddr*)&isa, &i);
227 
228  B2RESULT("Started accepting new clients");
229  if ((m_socket =::accept(m_listener, (struct sockaddr*)&isa, &i)) < 0) {
230  B2ERROR("Socket accepting failed: " << strerror(errno));
231  return false;
232  }
233 
234  B2RESULT("Accepted connection with socket: " << m_socket);
235  m_initialized = true;
236  return true;
237 }
238 
239 void HLTSocket::close(int socket)
240 {
241  if (socket > 0) {
242  shutdown(socket, 2);
243  ::close(socket);
244  B2RESULT("Socket closed: " << socket);
245  }
246 }
247 
248 bool HLTSocket::connect(const std::string& hostName, unsigned int port, const HLTMainLoop& mainLoop)
249 {
250  // Before going on make sure to close the old socket
251  close(m_socket);
252 
253  struct hostent* hp;
254  if ((hp = gethostbyname(hostName.c_str())) == NULL) {
255  B2ERROR("Host not found: " << strerror(errno));
256  return false;
257  }
258 
259  struct sockaddr_in sa;
260  bzero(&sa, sizeof(sa));
261  bcopy(hp->h_addr, (char*) &sa.sin_addr, hp->h_length);
262  sa.sin_family = hp->h_addrtype;
263  sa.sin_port = htons((u_short) port);
264 
265  if ((m_socket = socket(hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
266  B2ERROR("Socket initialization failed: " << strerror(errno));
267  return false;
268  }
269 
270  int sizeval = D2_SOCKBUF_SIZE;
271  setsockopt(m_socket, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
272  setsockopt(m_socket, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
273  int yes = 1;
274  setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
275 
276  signal(SIGPIPE, SIG_IGN);
277 
278  using namespace std::chrono_literals;
279 
280  int maxretry = 0;
281  while (::connect(m_socket, (struct sockaddr*) &sa, sizeof(sa)) < 0) {
282  if (errno == ETIMEDOUT or errno == ECONNREFUSED) {
283  B2WARNING("Connection failed, will retry in 1 second... " << maxretry);
284  std::this_thread::sleep_for(1s);
285  } else {
286  B2ERROR("Socket initialization failed: " << strerror(errno));
287  return false;
288  }
289 
290  if (not mainLoop.isRunning()) {
291  return false;
292  }
293  maxretry++;
294  }
295 
296  B2RESULT("Connected with socket: " << m_socket);
297  m_initialized = true;
298  return true;
299 }
300 
301 bool HLTSocket::initialized() const
302 {
303  return m_initialized;
304 }
305 void HLTSocket::deinitialize()
306 {
307  m_initialized = false;
308 }
Abstract base class for different kinds of events.