Belle II Software  release-05-02-19
HLTSocket.cc
#include <daq/rfarm/event/hltsocket/HLTSocket.h>
#include <framework/logging/Logger.h>

#include <unistd.h>
#include <stdio.h>
#include <sys/types.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <signal.h>

#include <chrono>
#include <cstring>
#include <thread>
15 
16 using namespace Belle2;
17 
18 HLTSocket::~HLTSocket()
19 {
20  close(m_listener);
21  close(m_socket);
22  m_initialized = false;
23 }
24 
25 int HLTSocket::put(char* data, int len)
26 {
27  const int to_size = htonl(len);
28  const int br = write_data((char*) &to_size, sizeof(int));
29  if (br == 0) {
30  B2ERROR("Can not write to broken pipe.");
31  return 0;
32  } else if (br < 0) {
33  B2ERROR("Error in sending the size: " << strerror(errno));
34  return br;
35  }
36  const int bcount = write_data(data, len);
37  if (bcount == 0) {
38  B2ERROR("Can not write to broken pipe.");
39  return 0;
40  } else if (bcount < 0) {
41  B2ERROR("Error in sending the data: " << strerror(errno));
42  return bcount;
43  }
44  B2ASSERT("Written buffer size != buffer size in data!", bcount == len);
45  return bcount;
46 }
47 
48 int HLTSocket::put_wordbuf(int* data, int len)
49 {
50  // TODO: it is implicitly assumed that the first entry in the buffer is the buffer size!
51  const int gcount = data[0];
52  B2ASSERT("The first entry in the data must be the buffer size!", gcount == len);
53 
54  int bcount = write_data((char*) data, len * sizeof(int));
55  if (bcount == 0) {
56  B2ERROR("Can not write to broken pipe.");
57  return 0;
58  } else if (bcount < 0) {
59  B2ERROR("Error in sending the data: " << strerror(errno));
60  return bcount;
61  }
62  bcount = ((bcount - 1) / sizeof(int) + 1);
63 
64  B2ASSERT("Written buffer size != buffer size in data!", bcount == len);
65 
66  // ATTENTION: the returned size is size / 4
67  return bcount;
68 }
69 
70 int HLTSocket::write_data(char* data, int len)
71 {
72  errno = 0;
73  char* ptr = data;
74  int bcount = 0;
75  int br = 0;
76 
77  while (bcount < len) {
78  if ((br = ::write(m_socket, ptr, len - bcount)) > 0) {
79  bcount += br;
80  ptr += br;
81  }
82  if (br < 0) {
83  switch (errno) {
84  case EINTR:
85  return -1;
86  case EPIPE:
87  return 0; // connection closed, sigpipe
88  default:
89  return -1;
90  }
91  }
92  }
93  return bcount;
94 }
95 
96 int HLTSocket::get(char* data, int len)
97 {
98  int gcount;
99  const int br = read_data((char*) &gcount, 4);
100  if (br < 0) {
101  B2ERROR("Error in getting the size: " << strerror(errno));
102  return br;
103  } else if (br == 0) {
104  B2WARNING("No data from socket!");
105  return 0;
106  }
107 
108  gcount = ntohl(gcount);
109  if (gcount > len) {
110  B2ERROR("buffer too small! " << gcount << " < " << len);
111  return -1;
112  }
113  const int bcount = read_data(data, gcount);
114  if (bcount < 0) {
115  B2ERROR("Error in getting the data: " << strerror(errno));
116  return bcount;
117  } else if (bcount == 0) {
118  B2WARNING("No data from socket!");
119  return 0;
120  }
121  B2ASSERT("Read buffer size != buffer size in data!", bcount == gcount);
122  return bcount;
123 }
124 
125 int HLTSocket::get_wordbuf(int* wrdbuf, int len)
126 {
127  int br = read_data((char*) wrdbuf, sizeof(int));
128  if (br < 0) {
129  B2ERROR("Error in getting the size: " << strerror(errno));
130  return br;
131  } else if (br == 0) {
132  B2WARNING("No data from socket!");
133  return 0;
134  }
135 
136  const int gcount = (wrdbuf[0] - 1) * sizeof(int);
137  if (gcount > len) {
138  B2ERROR("buffer too small! " << gcount << " < " << len);
139  return -1;
140  }
141  // ATTENTION: the send size is size / 4
142  const int bcount = read_data((char*) &wrdbuf[1], gcount);
143  if (bcount < 0) {
144  B2ERROR("Error in getting the data: " << strerror(errno));
145  return bcount;
146  } else if (bcount == 0) {
147  B2WARNING("No data from socket!");
148  return 0;
149  }
150 
151  B2ASSERT("Read buffer size != buffer size in data: " << bcount << " != " << gcount, bcount == gcount);
152  return (wrdbuf[0]);
153 }
154 
155 int HLTSocket::read_data(char* data, int len)
156 {
157  char* buf = data;
158  int bcount = 0;
159  int br = 0;
160 
161  while (bcount < len) {
162  if ((br = ::read(m_socket, buf, len - bcount)) > 0) {
163  bcount += br;
164  buf += br;
165  }
166  if (br == 0) return 0;
167  if (br < 0) {
168  switch (errno) {
169  case EINTR:
170  return -1;
171  case EAGAIN:
172  continue;
173  default:
174  return -1;
175  }
176  }
177  }
178  return bcount;
179 }
180 
181 bool HLTSocket::accept(unsigned int port)
182 {
183  // Before going on make sure to close the old socket
184  close(m_socket);
185 
186  // Initialize the first listener
187  if (m_listener <= 0) {
188  B2RESULT("Started listening for new clients");
189  struct sockaddr_in sa;
190  bzero(&sa, sizeof(struct sockaddr_in));
191 
192  sa.sin_family = AF_INET;
193  sa.sin_port = htons(port);
194 
195  if ((m_listener = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
196  B2ERROR("Socket initialization failed: " << strerror(errno));
197  return false;
198  }
199 
200  int optval = 1;
201  setsockopt(m_listener, SOL_SOCKET, SO_REUSEADDR, &optval, 4);
202 
203  int sizeval = D2_SOCKBUF_SIZE;
204  setsockopt(m_listener, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
205  setsockopt(m_listener, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
206 
207  signal(SIGPIPE, SIG_IGN);
208 
209  if ((bind(m_listener, (struct sockaddr*) &sa, sizeof(sa))) < 0) {
210  B2ERROR("Socket binding failed: " << strerror(errno));
211  return false;
212  }
213 
214  listen(m_listener, 3);
215  }
216 
217  struct sockaddr_in isa;
218  socklen_t i = sizeof(isa);
219  getsockname(m_listener, (struct sockaddr*)&isa, &i);
220 
221  B2RESULT("Started accepting new clients");
222  if ((m_socket =::accept(m_listener, (struct sockaddr*)&isa, &i)) < 0) {
223  B2ERROR("Socket accepting failed: " << strerror(errno));
224  return false;
225  }
226 
227  B2RESULT("Accepted connection with socket: " << m_socket);
228  m_initialized = true;
229  return true;
230 }
231 
232 void HLTSocket::close(int socket)
233 {
234  if (socket > 0) {
235  shutdown(socket, 2);
236  ::close(socket);
237  B2RESULT("Socket closed: " << socket);
238  }
239 }
240 
241 bool HLTSocket::connect(const std::string& hostName, unsigned int port, const HLTMainLoop& mainLoop)
242 {
243  // Before going on make sure to close the old socket
244  close(m_socket);
245 
246  struct hostent* hp;
247  if ((hp = gethostbyname(hostName.c_str())) == NULL) {
248  B2ERROR("Host not found: " << strerror(errno));
249  return false;
250  }
251 
252  struct sockaddr_in sa;
253  bzero(&sa, sizeof(sa));
254  bcopy(hp->h_addr, (char*) &sa.sin_addr, hp->h_length);
255  sa.sin_family = hp->h_addrtype;
256  sa.sin_port = htons((u_short) port);
257 
258  if ((m_socket = socket(hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
259  B2ERROR("Socket initialization failed: " << strerror(errno));
260  return false;
261  }
262 
263  int sizeval = D2_SOCKBUF_SIZE;
264  setsockopt(m_socket, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
265  setsockopt(m_socket, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
266  int yes = 1;
267  setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
268 
269  signal(SIGPIPE, SIG_IGN);
270 
271  using namespace std::chrono_literals;
272 
273  int maxretry = 0;
274  while (::connect(m_socket, (struct sockaddr*) &sa, sizeof(sa)) < 0) {
275  if (errno == ETIMEDOUT or errno == ECONNREFUSED) {
276  B2WARNING("Connection failed, will retry in 1 second... " << maxretry);
277  std::this_thread::sleep_for(1s);
278  } else {
279  B2ERROR("Socket initialization failed: " << strerror(errno));
280  return false;
281  }
282 
283  if (not mainLoop.isRunning()) {
284  return false;
285  }
286  maxretry++;
287  }
288 
289  B2RESULT("Connected with socket: " << m_socket);
290  m_initialized = true;
291  return true;
292 }
293 
294 bool HLTSocket::initialized() const
295 {
296  return m_initialized;
297 }
298 void HLTSocket::deinitialize()
299 {
300  m_initialized = false;
301 }
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::HLTMainLoop
Definition: HLTMainLoop.h:28