Belle II Software  release-08-01-10
SocketLib.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <unistd.h>
10 #include <stdlib.h>
11 #include <stdio.h>
12 #include <string.h>
13 #include <netdb.h>
14 #include <sys/socket.h>
15 #include <netinet/in.h>
16 #include <errno.h>
17 #include <sys/time.h>
18 #include <signal.h>
19 
20 #include "daq/dataflow/SocketLib.h"
21 
22 #define MAXHOSTNAME 100
23 
24 using namespace Belle2;
25 
26 // SocketIO class
27 SocketIO::SocketIO()
28 {
29 }
30 
31 SocketIO::~SocketIO()
32 {
33 }
34 
35 
36 int SocketIO::put(int sock, char* data, int len)
37 {
38  int to_size = htonl(len);
39  // printf("SocketIO::put(%d): sending size...\n",sock);
40  int br = write_data(sock, (char*)&to_size, 4);
41  if (br < 0) {
42  perror("put: sending size");
43  return br;
44  }
45  // printf("SocketIO::put(%d): sending data...\n",sock);
46  int bcount = write_data(sock, data, len);
47  if (bcount < 0) perror("put: sending data");
48  return (bcount);
49 }
50 
51 int SocketIO::put_wordbuf(int sock, int* data, int len)
52 {
53  // printf("SocketIO::put_wordbuf(%d): sending data...\n",sock);
54  int bcount = write_data(sock, (char*)data, len * sizeof(int));
55  if (bcount < 0) perror("put: sending data");
56  if (bcount <= 0)
57  return bcount;
58  else
59  return ((bcount - 1) / 4 + 1);
60 }
61 
62 int SocketIO::write_data(int sock, char* data, int len)
63 {
64  errno = 0;
65  char* ptr = data;
66  int bcount = 0;
67 
68  // printf("write_data( sock=%d. data=%p. len=%d )\n", sock, data, len);
69 
70  while (bcount < len) {
71  int br = 0;
72  if ((br =::write(sock, ptr, len - bcount)) > 0) {
73  bcount += br;
74  ptr += br;
75  }
76  if (br < 0) {
77  switch (errno) {
78  case EINTR:
79  return -1;
80  // continue;
81  case EPIPE:
82  return -1; // connection closed, sigpipe
83  case ENETUNREACH:
84  case EHOSTUNREACH:
85  case ETIMEDOUT:
86  usleep(500);
87  continue;
88  default: return -1;
89  }
90  }
91  }
92  return (bcount);
93 }
94 
95 int SocketIO::get(int sock, char* data, int len)
96 {
97  int gcount;
98  int br = read_data(sock, (char*)&gcount, 4);
99  if (br <= 0) return br;
100  gcount = ntohl(gcount);
101  if (gcount > len) {
102  printf("buffer too small : %d(%d)", gcount, len);
103  exit(0);
104  }
105  int bcount = read_data(sock, data, gcount);
106  return (bcount);
107 }
108 
109 int SocketIO::get_wordbuf(int sock, int* wrdbuf, int len)
110 {
111  int gcount;
112  int br = read_data(sock, (char*)wrdbuf, sizeof(int));
113  if (br <= 0) return br;
114  // gcount = ntohl(wrdbuf[0]);
115  gcount = wrdbuf[0];
116  // printf ( "gcount = %8.8x (%d)\n", gcount, gcount );
117  if (gcount > len) {
118  printf("buffer too small : %d(%d)", gcount, len);
119  exit(0);
120  }
121  read_data(sock, (char*)&wrdbuf[1], (gcount - 1) * sizeof(int));
122  // printf ( "term = %8.8x\n", wrdbuf[gcount-1] );
123  return (wrdbuf[0]);
124 }
125 
126 int SocketIO::get_pxd(int sock, char* data, int len)
127 {
128 #define MAX_PXD_FRAMES 256
129  const int headerlen = 8;
130  int* pxdheader = (int*) data; // TODO should it be unsigned int?
131  int* pxdheadertable = (int*) &data[headerlen]; // TODO should it be unsigned int?
132  int framenr = 0, tablelen = 0, datalen = 0;
133  int br = read_data(sock, data, headerlen);
134  if (br <= 0) return br;
135  if (pxdheader[0] != htonl(0xCAFEBABE)) {
136  printf("pxdheader wrong : Magic %X , Frames %X \n", pxdheader[0], ntohl(pxdheader[1]));
137  exit(0);
138  }
139  pxdheader[0] = ntohl(pxdheader[0]);
140  framenr = pxdheader[1] = ntohl(pxdheader[1]);
141  if (framenr > MAX_PXD_FRAMES) {
142  printf("MAX_PXD_FRAMES too small : %d(%d) \n", framenr, MAX_PXD_FRAMES);
143  exit(0);
144  }
145  tablelen = 4 * framenr;
146  br = read_data(sock, (char*)&data[headerlen], tablelen);
147  if (br <= 0) return br;
148  for (int i = 0; i < framenr; i++) {
149  pxdheadertable[i] = ntohl(pxdheadertable[i]);
150  datalen += (pxdheadertable[i] + 3) & 0xFFFFFFFC;
151  }
152 
153  if (datalen + headerlen + tablelen > len) {
154  printf("buffer too small : %d %d %d(%d) \n", headerlen, tablelen, datalen, len);
155  exit(0);
156  }
157  int bcount = read_data(sock, data + headerlen + tablelen, datalen);
158  if (br <= 0) return br;
159  return (headerlen + tablelen + bcount);
160 }
161 
162 int SocketIO::read_data(int sock, char* data, int len)
163 {
164  char* buf = data;
165  int bcount = 0;
166 
167  while (bcount < len) {
168  int br = 0;
169  if ((br =::read(sock, buf, len - bcount)) > 0) {
170  bcount += br;
171  buf += br;
172  }
173  fflush(stdout);
174  if (br == 0) return 0;
175  if (br < 0) {
176  switch (errno) {
177  case EINTR:
178  if (m_int == 1) {
179  printf("read: interrupted!\n");
180  m_int = 0;
181  return -2;
182  } else
183  // continue;
184  return -1;
185  case EAGAIN:
186  continue;
187  default:
188  perror("SocketIO:read");
189  fprintf(stderr, "sock = %d, buf=%p, len = %d\n", sock, buf, len - bcount);
190  return -1;
191  }
192  }
193  }
194  // printf ( "SocketIO::read_data ended : bcount = %d!!!\n", bcount );
195  return (bcount);
196 }
197 
198 void SocketIO::interrupt(void)
199 {
200  m_int = 1;
201 }
202 
203 // SocketRecv class
204 
205 SocketRecv::SocketRecv(u_short p)
206 {
207 
208  struct sockaddr_in sa;
209  bzero(&sa, sizeof(struct sockaddr_in));
210 
211  m_errno = 0;
212 
213  sa.sin_family = AF_INET;
214  sa.sin_port = htons(p);
215 
216  int s;
217  if ((s = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
218  m_errno = errno;
219  perror("SocketRecv::socket");
220  return;
221  }
222 
223  int optval = 1;
224  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &optval, 4);
225 
226  int sizeval = D2_SOCKBUF_SIZE;
227  setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
228  setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
229 
230  signal(SIGPIPE, SIG_IGN); // ignore SIGPIPE
231  // printf ( "SocketRecv: SIGPIPE will be ignored\n" );
232  // fflush ( stdout );
233 
234  if ((bind(s, (struct sockaddr*)&sa, sizeof(sa))) < 0) {
235  m_errno = errno;
236  perror("SocketRecv::bind");
237  return;
238  }
239 
240  m_sock = s;
241  m_sender = 0;
242  listen(s, 3);
243  printf("SocketRecv:: initialized, sock=%d\n", m_sock);
244 
245  return;
246 
247 }
248 
249 SocketRecv::~SocketRecv()
250 {
251  if (m_sender > 0) {
252  shutdown(m_sender, 2);
253  ::close(m_sender);
254  printf("SocketRecv:: receiving socket %d closed\n", m_sender);
255  }
256 
257  shutdown(m_sock, 2);
258  ::close(m_sock);
259  printf("SocketRecv:: connection socket %d closed\n", m_sock);
260 }
261 
262 int SocketRecv::reconnect(int ntry)
263 {
264  // Close existing socket once.
265  shutdown(m_sock, 2);
266  ::close(m_sock);
267 
268  // Setup socket parameters again;
269  bzero(&m_sa, sizeof(m_sa));
270  bcopy(m_hp->h_addr, (char*)&m_sa.sin_addr, m_hp->h_length);
271  m_sa.sin_family = m_hp->h_addrtype;
272  m_sa.sin_port = htons((u_short)m_port);
273 
274  // Reopen the socket
275  int s;
276  m_sock = -1;
277  if ((s = socket(m_hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
278  m_errno = errno;
279  perror("RSocketRecv:socket");
280  return -3;
281  }
282  int sizeval = D2_SOCKBUF_SIZE;
283  setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
284  setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
285  int yes = 1;
286  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
287 
288  m_sock = s;
289 
290  // Connect again
291  int maxretry = ntry;
292  // printf ("RSocketRecv: reconnecting socket %d, try %d times with 5sec. interval.\n", m_sock, ntry );
293 
294  for (;;) {
295  printf("RSocketRecv: reconnecting (trial %d) \n", ntry - maxretry + 1);
296  int istat = connect(m_sock, (struct sockaddr*)&m_sa, sizeof(m_sa));
297  if (istat >= 0) {
298  printf("RSocketRecv: reconnected\n");
299  return 0;
300  }
301  maxretry--;
302  if (maxretry == 0) return -1;
303  sleep(5);
304  }
305  printf("RSocketRecv: m_sock = %d reconnected.\n", m_sock);
306 }
307 
308 int SocketRecv::accept()
309 {
310  m_errno = 0;
311  struct sockaddr_in isa;
312  socklen_t i = sizeof(isa);
313  getsockname(m_sock, (struct sockaddr*)&isa, &i);
314 
315  int t;
316  if ((t =::accept(m_sock, (struct sockaddr*)&isa, &i)) < 0) {
317  m_errno = errno;
318  return (-1);
319  }
320 
321  printf("SocketRecv:: connection request accepted, sender=%d\n", t);
322 
323  m_sender = t;
324  return (t);
325 }
326 
327 int SocketRecv::close()
328 {
329  // ::close ( m_sender );
330  // m_sender = 0;
331  printf("SocketRecv: destructed, m_sender = %d\n", m_sender);
332  return 0;
333 }
334 
335 int SocketRecv::examine()
336 {
337  // printf("SocketRecv::examine(): waiting for client connection ...\n");
338 
339  m_errno = 0;
340  fd_set ready;
341  FD_ZERO(&ready);
342  FD_SET(m_sock, &ready);
343  struct timeval to;
344  to.tv_sec = 0;
345  to.tv_usec = 0;
346  if (select(FD_SETSIZE, &ready, 0, 0, &to) < 0) {
347  m_errno = errno;
348  perror("select");
349  return (-1);
350  }
351  if (FD_ISSET(m_sock, &ready)) {
352  printf("SocketRecv::connected!!!!\n");
353  return (1);
354  } else
355  return (0);
356 }
357 
358 int SocketRecv::get(char* data, int len)
359 {
360  // printf("SocketSend::get()\n");
361 
362  m_errno = 0;
363  if (m_sender > 0)
364  return m_io.get(m_sender, data, len);
365  else
366  return -1;
367 }
368 
369 int SocketRecv::get_wordbuf(int* data, int len)
370 {
371  // printf("SocketSend::get()\n");
372 
373  m_errno = 0;
374  if (m_sender > 0)
375  return m_io.get_wordbuf(m_sender, data, len);
376  else
377  return -1;
378 }
379 
380 int SocketRecv::read(char* data, int len)
381 {
382  m_errno = 0;
383  if (m_sender > 0)
384  return m_io.read_data(m_sender, data, len);
385  else
386  return -1;
387 }
388 
389 int SocketRecv::put(char* data, int len)
390 {
391  // printf("SocketRecv::put (sd = %d)\n", m_sender);
392  m_errno = 0;
393  if (m_sender > 0)
394  return m_io.put(m_sender, data, len);
395  else
396  return -1;
397 }
398 
399 int SocketRecv::write(char* data, int len)
400 {
401  m_errno = 0;
402  if (m_sender > 0)
403  return m_io.write_data(m_sender, data, len);
404  else
405  return -1;
406 }
407 
408 
409 int SocketRecv::sock() const
410 {
411  return m_sock;
412 }
413 
414 void SocketRecv::sock(int sockid)
415 {
416  m_sock = sockid;
417 }
418 
419 int SocketRecv::sender() const
420 {
421  return m_sender;
422 }
423 
424 int SocketRecv::err() const
425 {
426  return m_errno;
427 }
428 
429 void SocketRecv::interrupt()
430 {
431  m_io.interrupt();
432 }
433 
434 
435 // SocketSend class
436 
437 SocketSend::SocketSend(const char* node, u_short port)
438 {
439  m_errno = 0;
440  m_sock = -1;
441  if ((m_hp = gethostbyname(node)) == NULL) {
442  m_errno = errno;
443  fprintf(stderr,
444  "SocketSend::gethostbyname(%s): not found\n", node);
445  return;
446  }
447 
448  struct sockaddr_in sa;
449  bzero(&sa, sizeof(sa));
450  bcopy(m_hp->h_addr, (char*)&sa.sin_addr, m_hp->h_length);
451  sa.sin_family = m_hp->h_addrtype;
452  sa.sin_port = htons((u_short)port);
453 
454  int s;
455  m_sock = -1;
456  if ((s = socket(m_hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
457  m_errno = errno;
458  perror("SocketSend:socket");
459  return;
460  }
461  int sizeval = D2_SOCKBUF_SIZE;
462  setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
463  setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
464 
465  signal(SIGPIPE, SIG_IGN); // ignore SIGPIPE
466  // printf ( "SocketSend: SIGPIPE will be ignored\n" );
467  // fflush ( stdout );
468 
469  int maxretry = 10;
470 tryagain:
471  if (connect(s, (struct sockaddr*)&sa, sizeof(sa)) < 0) {
472  m_errno = errno;
473  perror("SocketSend:connect");
474  printf("connection error..... m_sock set to %d\n", m_sock);
475  if (m_errno == ETIMEDOUT) {
476  printf(".... try again after 5 sec. \n");
477  maxretry--;
478  sleep(5);
479  if (maxretry == 0) exit(1);
480  goto tryagain;
481  }
482  return;
483  }
484 
485  m_sock = s;
486  m_port = port;
487  strcpy(m_node, node);
488  signal(SIGPIPE, SIG_IGN);
489  printf("SocketSend: initialized, m_sock = %d\n", s);
490 }
491 
492 SocketSend::~SocketSend()
493 {
494  shutdown(m_sock, 2);
495  ::close(m_sock);
496  printf("SocketSend: destructed, m_sock = %d\n", m_sock);
497 }
498 
499 int SocketSend::reconnect(int ntry)
500 {
501  // Close existing socket once.
502  shutdown(m_sock, 2);
503  ::close(m_sock);
504 
505  // Setup socket parameters again;
506  bzero(&m_sa, sizeof(m_sa));
507  bcopy(m_hp->h_addr, (char*)&m_sa.sin_addr, m_hp->h_length);
508  m_sa.sin_family = m_hp->h_addrtype;
509  m_sa.sin_port = htons((u_short)m_port);
510 
511  // Reopen the socket
512  int s;
513  m_sock = -1;
514  if ((s = socket(m_hp->h_addrtype, SOCK_STREAM, 0)) < 0) {
515  m_errno = errno;
516  perror("RSocketRecv:socket");
517  return -3;
518  }
519  int sizeval = D2_SOCKBUF_SIZE;
520  setsockopt(s, SOL_SOCKET, SO_SNDBUF, &sizeval, 4);
521  setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sizeval, 4);
522  int yes = 1;
523  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, 4);
524 
525  m_sock = s;
526 
527  // Connect again
528  int maxretry = ntry;
529  // printf ("RSocketRecv: reconnecting socket %d, try %d times with 5sec. interval.\n", m_sock, ntry );
530 
531  for (;;) {
532  printf("SocketSend: reconnecting (trial %d) \n", ntry - maxretry + 1);
533  int istat = connect(m_sock, (struct sockaddr*)&m_sa, sizeof(m_sa));
534  if (istat >= 0) {
535  printf("SocketSend: reconnected\n");
536  return 0;
537  }
538  maxretry--;
539  if (maxretry == 0) return -1;
540  sleep(5);
541  }
542  printf("SocketSend: m_sock = %d reconnected.\n", m_sock);
543 }
544 
545 int SocketSend::get(char* data, int len)
546 {
547  m_errno = 0;
548  return m_io.get(m_sock, data, len);
549 }
550 
551 int SocketSend::get_pxd(char* data, int len)
552 {
553  m_errno = 0;
554  return m_io.get_pxd(m_sock, data, len);
555 }
556 
557 int SocketSend::read(char* data, int len)
558 {
559  m_errno = 0;
560  return m_io.read_data(m_sock, data, len);
561 }
562 
563 int SocketSend::put(char* data, int len)
564 {
565  m_errno = 0;
566  // printf("SocketSend::put (sd = %d)\n", m_sock);
567  return m_io.put(m_sock, data, len);
568 }
569 
570 int SocketSend::put_wordbuf(int* data, int len)
571 {
572  m_errno = 0;
573  // printf("SocketSend::put (sd = %d)\n", m_sock);
574  return m_io.put_wordbuf(m_sock, data, len);
575 }
576 
577 int SocketSend::write(char* data, int len)
578 {
579  m_errno = 0;
580  return m_io.write_data(m_sock, data, len);
581 }
582 
583 char* SocketSend::node()
584 {
585  return m_node;
586 }
587 
588 int SocketSend::port()
589 {
590  return m_port;
591 }
592 
593 int SocketSend::sock()
594 {
595  return m_sock;
596 }
597 
598 void SocketSend::sock(int sockid)
599 {
600  m_sock = sockid;
601 }
602 
603 int SocketSend::err()
604 {
605  return m_errno;
606 }
607 
608 
609 
610 
611 
612 
613 
614 
int get_pxd(int sock, char *data, int len)
Definition: SocketLib.cc:126
Abstract base class for different kinds of events.