Belle II Software  release-08-01-10
merger_merge.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 /* merger_merge.cc */
9 
10 #include <cstdio>
11 #include <cstdlib>
12 #include <unistd.h>
13 #include <cstring>
14 #include <poll.h>
15 #include <csignal>
16 #include <cerrno>
17 #include <netinet/in.h>
18 #include <map>
19 #include <set>
20 #include <vector>
21 #include <arpa/inet.h>
22 #include <fcntl.h>
23 #include <netdb.h>
24 #include <string>
25 
26 #define ROI_MAX_PACKET_SIZE (16384) /* bytes */
27 
28 #define NETWORK_ESTABLISH_TIMEOUT (-1) /* seconds (0 or negative specifies forever wait) */
29 #define NETWORK_IO_TIMEOUT (-1) /* seconds (0 or negative specifies forever wait) */
30 
31 #include <boost/endian/arithmetic.hpp>
32 
33 using namespace std;
34 
35 
#define LOG_FPRINTF (fprintf)
#define ERR_FPRINTF (fprintf)
// Report a failed call together with the current errno. Wrapped in
// do { } while (0) so that "ERROR(x);" is a single statement and stays valid
// inside an unbraced if/else.
#define ERROR(func) do { fprintf(stderr, "[ERROR] %s:%d: "#func"(): %s\n", __FILE__, __LINE__, strerror(errno)); } while (0)

// Per-connection bookkeeping, keyed by socket descriptor.
std::map<int, std::string> myconn;   // fd -> peer IPv4 address of an HLT connection
std::map<int, unsigned int> mycount; // fd -> packets received; key 0 -> packets sent to ONSEN
std::set<int> hltused; // HLT numbers seen so far; we want a sorted list
std::vector<int> hlts; // sorted copy of hltused, rebuilt by plot_triggers()

// Missing-trigger bookkeeping for the current run (see check_event_nr()).
// Trigger numbers are unsigned 32-bit values, so they are stored unsigned.
std::set<unsigned int> triggers;     // trigger numbers not (yet) received
unsigned int event_number_max = 0;   // highest trigger number seen in this run
unsigned int missing_walk_index = 0; // index into triggers, moved by check_event_nr()

// Flags written by the signal handlers and polled by the main loop.
// volatile sig_atomic_t is the object type that may portably be written from
// an asynchronous signal handler (a plain bool is not).
volatile sig_atomic_t got_sigusr1 = 0;
volatile sig_atomic_t got_sigusr2 = 0;
volatile sig_atomic_t got_sigint = 0;
volatile sig_atomic_t got_sigpipe = 0;
volatile sig_atomic_t got_sigterm = 0;
54 
/* Hex-dump a buffer to fp as native 32-bit words ("%08x "), eight words per
 * line. Only whole words are printed: trailing size % sizeof(unsigned int)
 * bytes are ignored. A final newline is added if the last line is partial. */
void
dump_binary(FILE* fp, const void* ptr, const size_t size)
{
  const unsigned int* words = static_cast<const unsigned int*>(ptr);
  const size_t n_words = size / sizeof(unsigned int);

  size_t printed = 0;
  while (printed < n_words) {
    fprintf(fp, "%08x ", words[printed]);
    ++printed;
    if (printed % 8 == 0) {
      fputc('\n', fp);
    }
  }
  if (n_words % 8 != 0) {
    fputc('\n', fp);
  }
}
68 
69 
70 static void catch_usr1_function(int /*signo*/)
71 {
72 // puts("SIGUSR1 caught\n");
73  got_sigusr1 = true;
74 }
75 
76 static void catch_usr2_function(int /*signo*/)
77 {
78 // puts("SIGUSR2 caught\n");
79  got_sigusr2 = true;
80 }
81 
82 static void catch_int_function(int /*signo*/)
83 {
84 // puts("SIGINT caught\n");
85  got_sigint = true;
86 }
87 
88 static void catch_term_function(int /*signo*/)
89 {
90 // puts("SIGTERM caught\n");
91  got_sigterm = true;
92 }
93 
94 static void catch_pipe_function(int /*signo*/)
95 {
96 // puts("SIGPIPE caught\n");
97  got_sigpipe = true;
98 }
99 
100 void clear_triggers(void)
101 {
102  triggers.clear();
103  event_number_max = 0;
104  missing_walk_index = 0;
105 }
106 
107 void plot_triggers(void)
108 {
109  int hltcount = hltused.size();
110  std::map<int, int> modmissing;
111  hlts.clear();
112  for (auto h : hltused) hlts.push_back(h);
113  if (!triggers.empty()) {
114  ERR_FPRINTF(stderr, "[RESULT] merger_merge: trigger low=%u high=%u missing %lu delta %u max %u\n", *triggers.begin(),
115  *(--triggers.end()),
116  triggers.size(), *(--triggers.end()) - *triggers.begin(), event_number_max);
117  int i = 0;
118  for (auto& it : triggers) {
119  int mod = it % hltcount;
120  modmissing[hlts[mod]]++;
121  if (i < 100) {
122  ERR_FPRINTF(stderr, "[INFO] Miss trig %u (%d) HLT%d\n", it, mod, hlts[mod]);
123  i++;
124  if (i == 100) {
125  ERR_FPRINTF(stderr, "[WARNING] ... too many missing to report\n");
126  i++;
127  }
128  }
129  }
130  } else {
131  ERR_FPRINTF(stderr, "[RESULT] merger_merge: missing triggers 0\n");
132  }
133  for (auto m : modmissing) {
134  ERR_FPRINTF(stderr, "[INFO] merger_merge: missing triggers from HLT%d: %d\n", m.first, m.second);
135  }
136 }
137 
138 void check_event_nr(unsigned int event_number)
139 {
140  // this code might not detect missing trigger nr 0
141  // it is assumed, that run number change has been checked and handled before
142  if (event_number_max < event_number) {
143  for (uint32_t e = event_number_max + 1; e < event_number; e ++) {
144  triggers.insert(e);
145  }
146  event_number_max = event_number;
147  } else {
148  // we dont fill event_number in the if above, thus we dont have to remove it
149  triggers.erase(event_number);
150  }
151  if (triggers.size() > missing_walk_index + 1 && event_number_max - *std::next(triggers.begin(), missing_walk_index + 1) > 100000) {
152  missing_walk_index++;
153  } else if (missing_walk_index > 0 && event_number_max - *std::next(triggers.begin(), missing_walk_index) < 100000) {
154  missing_walk_index--;
155  }
156 }
157 
158 int
159 b2_timed_blocking_io(const int sd, const int timeout /* secs */)
160 {
161  int ret;
162 
163 
164  if (timeout > 0) {
165  struct timeval tv;
166 
167  tv.tv_sec = timeout;
168  tv.tv_usec = 0;
169  ret = setsockopt(sd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(struct timeval));
170  if (ret == -1) {
171  ERROR(setsockopt);
172  return -1;
173  }
174 
175  tv.tv_sec = timeout;
176  tv.tv_usec = 0;
177  ret = setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval));
178  if (ret == -1) {
179  ERROR(setsockopt);
180  return -1;
181  }
182  } else if (timeout == 0) {
183  ret = fcntl(sd, F_SETFL, O_NDELAY);
184  if (ret == -1) {
185  ERROR(fcntl);
186  return -1;
187  }
188  } else if (timeout < 0) {
189  ret = fcntl(sd, F_GETFL, O_NDELAY);
190  if (ret == -1) {
191  ERROR(fcntl);
192  return -1;
193  }
194  ret &= ~O_NDELAY;
195  ret = fcntl(sd, F_SETFL, ret);
196  if (ret == -1) {
197  ERROR(fcntl);
198  return -1;
199  }
200  }
201 
202 
203  return 0;
204 }
205 
206 
/* Fill *in with family AF_INET, the IPv4 address of hostname (host name or
 * dotted quad) and port (converted to network byte order).
 * Returns 0 on success, -1 if the name cannot be resolved to an IPv4 address. */
int
b2_build_sockaddr_in(const char* hostname, const unsigned short port, struct sockaddr_in* in)
{
  memset(in, 0, sizeof(struct sockaddr_in));
  in->sin_family = AF_INET;

  struct hostent* hoste = gethostbyname(hostname);
  if (!hoste) {
    // gethostbyname() reports failures through h_errno, not errno, so the
    // generic ERROR() macro would print an unrelated message here.
    fprintf(stderr, "[ERROR] %s:%d: gethostbyname(): %s\n", __FILE__, __LINE__, hstrerror(h_errno));
    return -1;
  }
  if (hoste->h_addrtype != AF_INET ||
      hoste->h_length != static_cast<int>(sizeof(in->sin_addr)) ||
      hoste->h_addr_list[0] == nullptr) {
    fprintf(stderr, "[ERROR] %s:%d: gethostbyname(): no IPv4 address for %s\n", __FILE__, __LINE__, hostname);
    return -1;
  }
  // memcpy: the address bytes are a char buffer without struct in_addr alignment.
  memcpy(&in->sin_addr, hoste->h_addr_list[0], sizeof(in->sin_addr));
  in->sin_port = htons(port);

  return 0;
}
227 
228 
229 static int
230 b2_create_tcp_socket(void)
231 {
232  int sd, ret, one = 1;
233 
234 
235  sd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
236  if (sd == -1) {
237  ERROR(socket);
238  return -1;
239  }
240 
241 #if 0
242  ret = b2_timed_blocking_io(sd, 0);
243  if (ret == -1) {
244  ERROR(b2_timed_blocking_io);
245  return -1;
246  }
247 #endif
248 
249  ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char*)&one, sizeof(int));
250  if (ret == -1) {
251  ERROR(setsockopt);
252  return -1;
253  }
254 
255 
256  return sd;
257 }
258 
259 
260 int /* returns socket descriptor */
261 b2_create_accept_socket(const unsigned short port) /* in reality DOES NOT accept */
262 {
263  int sd, ret;
264  struct sockaddr_in in;
265 
266 
267  sd = b2_create_tcp_socket();
268  if (sd < 0) {
269  ERROR(b2_create_tcp_socket);
270  return -1;
271  }
272 
273  ret = b2_build_sockaddr_in("0.0.0.0", port, &in);
274  if (ret == -1) {
275  ERROR(b2_build_sockaddr_in);
276  return -1;
277  }
278 
279  ret = bind(sd, (const struct sockaddr*)&in, sizeof(struct sockaddr_in));
280  if (ret == -1) {
281  ERROR(bind);
282  return -1;
283  }
284 
285  ret = listen(sd, 1);
286  if (ret == -1) {
287  ERROR(listen);
288  return -1;
289  }
290 
291 
292  return sd;
293 }
294 
295 
296 int /* returns socket descriptor */
297 b2_create_connect_socket(const char* hostname, const unsigned short port)
298 {
299  int sd, ret;
300  struct sockaddr_in in;
301 
302 
303  sd = b2_create_tcp_socket();
304  if (sd < 0) {
305  ERROR(b2_create_tcp_socket);
306  return -1;
307  }
308 
309  ret = b2_build_sockaddr_in(hostname, port, &in);
310  if (ret == -1) {
311  ERROR(b2_build_sockaddr_in);
312  return -1;
313  }
314 
315  ret = connect(sd, (const struct sockaddr*)&in, sizeof(struct sockaddr_in));
316  if (ret == -1 && errno != EINPROGRESS) {
317  ERROR(connect);
318  return -1;
319  }
320 
321 
322  return sd;
323 }
324 
325 
326 int
327 b2_send(const int sd, const void* buf, const size_t size)
328 {
329  unsigned char* ptr = (unsigned char*)buf;
330  size_t n_bytes_remained = size;
331 
332 
333  for (;;) {
334  int ret, n_bytes_send;
335 
336  ret = send(sd, ptr, n_bytes_remained, 0);
337  if (ret == -1 && errno != EINTR) {
338  ERROR(send);
339  return -1;
340  }
341  if (ret == -1 && errno == EINTR) {
342  fprintf(stderr, "%s:%d: recv(): Packet send timed out\n", __FILE__, __LINE__);
343  return -1;
344  }
345  if (ret == 0) {
346  fprintf(stderr, "%s:%d: send(): Connection closed\n", __FILE__, __LINE__);
347  return -1;
348  }
349 
350  n_bytes_send = ret;
351  ptr += n_bytes_send;
352 
353  if (n_bytes_remained < size_t(n_bytes_send))
354  /* overrun: internal error */
355  {
356  fprintf(stderr, "%s:%d: send(): Internal error\n", __FILE__, __LINE__);
357  return -1;
358  }
359  n_bytes_remained -= n_bytes_send;
360 
361  if (n_bytes_remained == 0)
362  /* fully sendout */
363  {
364  break;
365  }
366  }
367 
368 
369  return size;
370 }
371 
372 
373 int
374 b2_recv(const int sd, void* buf, const size_t size)
375 {
376  unsigned char* ptr = (unsigned char*)buf;
377  size_t n_bytes_remained = size;
378 
379 
380  for (;;) {
381  int ret, n_bytes_recv;
382 
383  ret = recv(sd, ptr, n_bytes_remained, 0);
384  if (ret == -1 && (errno != EINTR && errno != EWOULDBLOCK)) {
385  ERROR(recv);
386  return -1;
387  }
388  if (ret == -1 && (errno == EINTR || errno == EWOULDBLOCK)) {
389  fprintf(stderr, "%s:%d: recv(): Packet receive timed out\n", __FILE__, __LINE__);
390  return -1;
391  }
392  if (ret == 0) {
393  fprintf(stderr, "%s:%d: recv(): Connection closed\n", __FILE__, __LINE__);
394  return -1;
395  }
396 
397  n_bytes_recv = ret;
398  ptr += n_bytes_recv;
399  if (n_bytes_remained < size_t(n_bytes_recv))
400  /* overrun: internal error */
401  {
402  fprintf(stderr, "%s:%d: recv(): Internal error\n", __FILE__, __LINE__);
403  return -1;
404  }
405  n_bytes_remained -= n_bytes_recv;
406 
407  if (n_bytes_remained == 0)
408  /* fully readout */
409  {
410  break;
411  }
412  }
413 
414 
415  return size;
416 }
417 
418 
419 
420 static int
421 MM_init_connect_to_onsen(const char* host, const unsigned int port)
422 {
423  int sd, ret;
424  struct pollfd fds;
425 
426 
427  sd = b2_create_connect_socket(host, port);
428  if (sd == -1) {
429  ERROR(b2_create_connect_socket);
430  return -1;
431  }
432 
433  fds.fd = sd;
434  fds.events = POLLOUT;
435  fds.revents = 0;
436  ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
437  switch (ret) {
438  case -1:
439  ERROR(poll);
440  return -1;
441 
442  case 0:
443  ERR_FPRINTF(stderr, "[ERROR] merger_merge: connect() to ONSEN: Connection timed out\n");
444  return -1;
445 
446  case 1: {
447  int connection_error;
448  socklen_t optlen;
449 
450  optlen = sizeof(connection_error);
451  ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
452  if (ret == -1) {
453  ERROR(getsockopt);
454  return -1;
455  }
456  if (connection_error) {
457  ERR_FPRINTF(stderr, "[ERROR] merger_merge: connect() to ONSEN: %s\n", strerror(errno));
458  return -1;
459  }
460 
461  break;
462  }
463 
464  default:
465  ERR_FPRINTF(stderr, "[ERROR] merger_merge: poll() connect to ONSEN: Unexpected error\n");
466  return -1;
467  }
468 
469  ret = b2_timed_blocking_io(sd, NETWORK_IO_TIMEOUT /* secs */);
470  if (ret == -1) {
471  ERROR(b2_timed_blocking_io);
472  return -1;
473  }
474 
475 
476  return sd;
477 }
478 
479 
480 static int
481 MM_init_accept_from_hltout2merger(const unsigned int port)
482 {
483  int sd;
484  int one = 1, ret;
485  // struct pollfd fds;
486 
487 
488  LOG_FPRINTF(stderr, "[INFO] merger_merge: Waiting for connection from hltout2merger on port %d\n", port);
489 
490  sd = b2_create_accept_socket(port);
491  if (sd == -1) {
492  ERROR(b2_create_accept_socket);
493  return -1;
494  }
495 
496  ret = b2_timed_blocking_io(sd,
497  1); // This means, if the socket blocks longer than Xs, it will return a EAGAIN or EWOULDBLOCK (immediately)
498  if (ret == -1) {
499  ERROR(b2_timed_blocking_io);
500  return -1;
501  }
502 
503  ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char*)&one, sizeof(int));
504  if (ret == -1) {
505  ERROR(setsockopt);
506  return -1;
507  }
508 
509  /*
510  fds.fd = sd;
511  fds.events = POLLIN;
512  fds.revents = 0;
513  ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
514  switch (ret) {
515  case -1:
516  ERROR(poll);
517  return -1;
518 
519  case 0:
520  ERR_FPRINTF(stderr, "merger_merge: accept(): Connection timed out\n");
521  return -1;
522 
523  case 1: {
524  int ret, connection_error;
525  socklen_t optlen;
526 
527  optlen = sizeof(connection_error);
528  ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
529  if (ret == -1) {
530  ERROR(getsockopt);
531  return -1;
532  }
533  if (connection_error) {
534  ERR_FPRINTF(stderr, "merger_merge: accept(): %s\n", strerror(errno));
535  return -1;
536  }
537 
538  break;
539  }
540 
541  default:
542  ERR_FPRINTF(stderr, "merger_merge: poll(): Unexpected error\n");
543  return -1;
544  }
545  */
546 
547  /* Skip accept
548  nd = accept(sd, NULL, NULL);
549  if (nd == -1) {
550  ERROR(accept);
551  return -1;
552  }
553 
554  close(sd);
555 
556  ret = b2_timed_blocking_io(nd, NETWORK_IO_TIMEOUT / * secs * /);
557  if (ret == -1) {
558  ERROR(b2_timed_blocking_io);
559  return -1;
560  }
561  */
562 
563 
564  return sd;
565  // return nd;
566 }
567 
568 
569 static int
570 MM_get_packet(const int sd_acc, unsigned char* buf)
571 {
572  unsigned int header[2] = {}; // length is second word, thus read two
573 
574  int ret = recv(sd_acc, &header, sizeof(unsigned int) * 2, MSG_PEEK);
575  if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
576  ERR_FPRINTF(stderr, "[ERROR] merger_merge: recv(): Packet receive timed out\n");
577  return -1;
578  }
579  if (ret != 2 * sizeof(unsigned int)) {
580  ERR_FPRINTF(stderr, "[ERROR] merger_merge: recv(): Unexpected return value (%d)\n", ret);
581  return -2;
582  }
583 
586 
587  size_t n_bytes_from_hltout = 2 * sizeof(unsigned int) + ntohl(header[1]);// OFFSET_LENGTH = 1
588 
589  ret = b2_recv(sd_acc, buf, n_bytes_from_hltout);
590  if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
591  ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_recv(): Packet receive timed out\n");
592  return -1;
593  }
594  if (size_t(ret) != n_bytes_from_hltout) {
595  ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_recv(): Unexpected return value (%d)\n", ret);
596  return -2;
597  }
598 
599  return ret;
600 }
601 
602 
/* Close the connection to ONSEN; the result of close() (0 or -1 with errno)
 * is handed back unchanged. */
static int
MM_term_connect_to_onsen(const int sd)
{
  const int rc = close(sd);
  return rc;
}
608 
609 void print_stat(void)
610 {
611  ERR_FPRINTF(stderr, "[INFO] --- STAT START ---\n");
612  unsigned int sum = 0;
613  for (auto& it : mycount) {
614  ERR_FPRINTF(stderr, "[INFO] (%s): %d\n", myconn[it.first].c_str(), it.second);
615  if (it.first != 0) sum += it.second;
616  }
617  ERR_FPRINTF(stderr, "[INFO] sum %u out %u diff %d\n", sum, mycount[0], (int)(mycount[0] - sum));
618  plot_triggers();
619  ERR_FPRINTF(stderr, "[INFO] --- STAT END ---\n");
620 }
621 
622 // Main
623 
624 int
625 main(int argc, char* argv[])
626 {
627  int current_runnr = -1; // problem: handover without abort
628  // int n_hltout = 0;
629  int sd_acc = -1;
630  int sd_con = -1;
631  int need_reconnection_to_onsen = 1;
632  int event_count = 0;
633  int connected_hlts = 0;
634  bool stop_running = false;
635 
636  char onsen_host[1024];
637  unsigned short onsen_port;
638  /* PC test */
639  /* strcpy(onsen_host, "10.10.10.1"); */
640  /* onsen_port = 1024; */
641  /* real ONSEN */
642  /* strcpy(onsen_host, "10.10.10.80"); */
643  /* onsen_port = 24; */
644  // unsigned short accept_port[MM_MAX_HLTOUT];
645  unsigned short accept_port;
646 
647  LOG_FPRINTF(stderr, "[INFO] merger_merge: Process invoked [ver(%s %s)]\n", __DATE__, __TIME__);
648 
649  if (argc < 4) {
650  ERR_FPRINTF(stderr, "[ERROR] merger_merge: Usage: merger_merge onsen-host onsen-port client-port#1[:client-port#2[:...]]\n");
651  exit(1);
652  }
653 
654  /* argv copy */
655  char* p;
656 
657 // p = argv[1];
658 // strcpy(shmname, p);
659 //
660 // p = argv[2];
661 // shmid = atoi(p);
662 
663  p = argv[3];
664  strcpy(onsen_host, p);
665 
666  p = argv[4];
667  onsen_port = atoi(p);
668 
669  p = argv[5];
670  accept_port = atoi(p);
671 
672  signal(SIGPIPE, catch_pipe_function);
673  signal(SIGTERM, catch_term_function);
674  signal(SIGINT, catch_int_function);
675  signal(SIGUSR1, catch_usr1_function);
676  signal(SIGUSR2, catch_usr2_function);
677 
678  /* Create a port to accept connections*/
679  sd_acc = MM_init_accept_from_hltout2merger(accept_port);
680  LOG_FPRINTF(stderr, "[INFO] merger_merge: port to accept connections from HLTOUT [%d]\n", sd_acc);
681 
682 
683  /* RoI transmission loop */
684  // RoI packets
685  size_t n_bytes_from_hltout;
686  size_t n_bytes_to_onsen;
687 
688  unsigned char* buf = (unsigned char*)valloc(ROI_MAX_PACKET_SIZE);
689  if (!buf) {
690  ERROR(valloc);
691  exit(1);
692  }
693 
694  // Loop forever for ONSEN connection
695  bool connected = false;
696  while (!connected) {
697  // Connect to ONSEN if not
698  if (need_reconnection_to_onsen) {
699  /* in case of sd_con is connected, disconnect it */
700  if (sd_con != -1) close(sd_con);
701 
702  /* connect to onsen untill connected */
703  for (;;) {
704  int sleep_sec = 2;
705  sd_con = MM_init_connect_to_onsen(onsen_host, onsen_port);
706  // sd_con = 6;
707  if (sd_con != -1) {
708  /* connected: move to the recv->send loop */
709  need_reconnection_to_onsen = 0;
710  event_count = 0;
711  LOG_FPRINTF(stderr, "[INFO] merger_merge: MM_init_connect_to_onsen(): Connected to ONSEN\n");
712  connected = true;
713  break;
714  }
715 
716  ERR_FPRINTF(stderr, "[WARNING] merger_merge: connection to onsen failed: reconnect in %d second(s)\n", sleep_sec);
717  sleep(sleep_sec);
718 
719  /* retry connection */
720  continue;
721  }
722  }
723  }
724 
725  // Preparation for select()
726 
727  // printf("Starting select() loop\n") ;
728  fflush(stderr);
729  fflush(stdout);
730 
731  clear_triggers();
732 
733  fd_set allset;
734  FD_ZERO(&allset);
735  FD_SET(sd_acc, &allset);
736  int maxfd = sd_acc;
737  int minfd = sd_acc;
738  fd_set rset;//, wset;
739 
740  // Handle Obtain ROI and send it to ONSEN
741  while (!stop_running) {
742  memcpy(&rset, &allset, sizeof(rset));
743  // memcpy(&wset, &allset, sizeof(wset));
744 
745  // struct timeval timeout;
746  // timeout.tv_sec = 0; // 1sec
747  // timeout.tv_usec = 1000; // 1msec (in microsec)
748  // printf ( "Select(): maxfd = %d, start select...; rset=%x, wset=%x\n", maxfd, rset, wset);
749  int rc = select(maxfd + 1, &rset, NULL, NULL, NULL);
750  // printf ( "Select(): returned with %d, rset = %8.8x\n", rc, rset );
751  if (got_sigusr1) {
752  got_sigusr1 = false;
753  ERR_FPRINTF(stderr, "[INFO] Got SIGUSR1, Run START\n");
754  print_stat();
755  }
756  if (got_sigusr2) {
757  got_sigusr2 = false;
758  ERR_FPRINTF(stderr, "[INFO] Got SIGUSR2, Run STOP\n");
759  print_stat();
760  }
761  if (got_sigint) {
762  got_sigint = false;
763  ERR_FPRINTF(stderr, "[INFO] Got SIGINT, ABORT\n");
764  print_stat();
765  }
766  if (got_sigpipe) {
767  got_sigpipe = false;
768  ERR_FPRINTF(stderr, "[INFO] Got SIGPIPE\n");
769  }
770  if (got_sigterm) {
771  got_sigterm = false;
772  ERR_FPRINTF(stderr, "[INFO] Got SIGTERM\n");
773  }
774  if (rc < 0) {
775  perror("select");
776  continue;
777  } else if (rc == 0) { // timeout
778  continue;
779  }
780 
781  if (FD_ISSET(sd_acc, &rset)) { // new connection
782  int t;
783  struct sockaddr_in isa;
784  socklen_t i = sizeof(isa);
785  getsockname(sd_acc, (struct sockaddr*)&isa, &i);
786  if ((t =::accept(sd_acc, (struct sockaddr*)&isa, &i)) < 0) {
787  // m_errno = errno;
788  ERR_FPRINTF(stderr, "[ERROR] Error on accepting new connection\n");
789  ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
790  return (-1);
791  }
792  LOG_FPRINTF(stderr, "[INFO] New socket connection t=%d\n", t);
793  char address[INET_ADDRSTRLEN];
794  inet_ntop(AF_INET, &isa.sin_addr, address, sizeof(address));
795  connected_hlts++;
796 
797  LOG_FPRINTF(stderr, "[INFO] %d is IP <%s>\n", t, address);
798  myconn[t] = address;
799  // Assume IP4 and take last decimal as HLT number
800  char* ptr = strrchr(address, '.');
801  if (ptr) {
802  int nr = atoi(ptr + 1);
803  hltused.insert(nr);
804  }
805 
806  fflush(stdout);
807  FD_SET(t, &allset);
808  if (minfd == sd_acc) minfd = t;
809  if (t > maxfd) maxfd = t;
810  continue;
811  } else {
812  for (int fd = minfd; fd < maxfd + 1; fd++) {
813  n_bytes_from_hltout = 0;
814  if (FD_ISSET(fd, &rset)) {
815  // printf ( "fd is available for reading = %d\n", fd );
816  /* recv packet */
817  int ret;
818  ret = MM_get_packet(fd, buf);
819  if (ret < 0) { // -2 will not have a errno set
820  if (ret == -1) {
821  ERR_FPRINTF(stderr, "[ERROR] merger_merge: MM_get_packet()[%d]: %s\n", fd, strerror(errno));
822  } else {
823  ERR_FPRINTF(stderr, "[ERROR] merger_merge: MM_get_packet()[%d]\n", fd);
824  }
825  ERR_FPRINTF(stderr, "[ERROR] Connection from HLT was closed on HLT side (hltout2merge) from %s\n",
826  myconn[fd].c_str());
827  /* connection from HLT is lost */
828  // using exit here will dump ALL still open connections from other HLTs
829  // rethink if we dont exit but process these to limit loss of events.
830  // ERR_FPRINTF(stderr, "%s terminated\n", argv[0]);
831  // exit(1);
832  close(fd);
833  FD_CLR(fd, &allset); //opposite of FD_SET
834  connected_hlts--;
835  if (connected_hlts == 0) stop_running = true;
836  ret = 0;
837  }
838  n_bytes_from_hltout = ret;
839 
840  // printf ( "RoI received : Event count = % d\n", event_count );
841  if (ret > 0) {
842  mycount[fd]++;
843  if (0 /*event_count < 5 || event_count % 100000 == 0*/) {
844  LOG_FPRINTF(stderr, "[INFO] merger_merge: ---- [ %d] received event from ROI transmitter\n", event_count);
845  LOG_FPRINTF(stderr, "[INFO] merger_merge: MM_get_packet() Returned %ld\n", n_bytes_from_hltout);
846  dump_binary(stderr, buf, n_bytes_from_hltout);
847  }
848  }
849  }
850 
851  /* send packet */
852  if (n_bytes_from_hltout > 0) {
853  int ret;
854  unsigned char* ptr_head_to_onsen = buf;
855 
856  // extract trigger number, run+exp nr and fill it in a table.
857 
858  int runnr = 0;
859  int eventnr = 0;
860 
861  // From ROIpayload.h
862  // enum { OFFSET_MAGIC = 0, OFFSET_LENGTH = 1, OFFSET_HEADER = 2, OFFSET_TRIGNR = 3, OFFSET_RUNNR = 4, OFFSET_ROIS = 5};
863  // enum { HEADER_SIZE_WO_LENGTH = 3, HEADER_SIZE_WITH_LENGTH = 5, HEADER_SIZE_WITH_LENGTH_AND_CRC = 6};
864 
865  if (n_bytes_from_hltout >= 6 * 4) {
866  auto* iptr = (boost::endian::big_uint32_t*)ptr_head_to_onsen;
867  eventnr = iptr[3];
868  runnr = (iptr[4] & 0x3FFF00) >> 8;
869 // ERR_FPRINTF(stderr, "%08X %08X %08X %08X %08X -> %08X %08X \n",
870 // (unsigned int)iptr[0],(unsigned int) iptr[1],(unsigned int) iptr[2],(unsigned int) iptr[3],(unsigned int) iptr[4], eventnr, runnr);
871  } else {
872  LOG_FPRINTF(stderr, "[ERROR] merger_merge: packet to small to hold useful header (%ld)\n", n_bytes_from_hltout);
873  }
874 
875  if (runnr > current_runnr) {
876  ERR_FPRINTF(stderr, "[WARNING] merger_merge: run number increases: got %d current %d trig %d\n", runnr, current_runnr,
877  eventnr);
878  print_stat();
879  clear_triggers();
880  current_runnr = runnr;
881  } else if (runnr < current_runnr) {
882  // got some event from old run
883  ERR_FPRINTF(stderr, "[WARNING] merger_merge: got trigger from older run: got %d current %d trig %d\n", runnr, current_runnr,
884  eventnr);
885  }
886 
887  if (runnr == current_runnr) {
888  // seperate if, as we might set it in the if above
889  check_event_nr(eventnr);
890  } // if we end the if here, we will send out old events to ONSEN!
891 
892  n_bytes_to_onsen = n_bytes_from_hltout;
893  while (1) {
894  ret = b2_send(sd_con, ptr_head_to_onsen, n_bytes_to_onsen);
895  if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
896  ERR_FPRINTF(stderr, "[WARNING] merger_merge: socket buffer full, retry\n");
897  sleep(1);// Bad hack, wait a second
898  } else break;
899  }
900 
901  if (ret == -1) {
902  ERROR(b2_send);
903  need_reconnection_to_onsen = 1;
904  event_count = 0;
905  ERR_FPRINTF(stderr, "[ERROR] merger_merge: error to send to ONSEN : %s\n", strerror(errno));
906  free(buf);
907  ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
908  print_stat();
909  exit(1);
910  }
911  if (ret == 0) {
912  ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_send(): Connection closed\n");
913  need_reconnection_to_onsen = 1;
914  event_count = 0;
915  free(buf);
916  ERR_FPRINTF(stderr, "[ERROR] Connection to ONSEN was closed on ONSEN side\n");
917  ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
918  print_stat();
919  exit(1);
920  }
921 
922  mycount[0]++;
923  if (0 /*event_count < 5 || event_count % 10000 == 0*/) {
924  LOG_FPRINTF(stderr, "[INFO] merger_merge: ---- [ %d] sent event to ONSEN\n", event_count);
925  dump_binary(stderr, ptr_head_to_onsen, n_bytes_to_onsen);
926  }
927  // } // if we end if here, we will NOT send old events to onsen, but only after we received the first new event
928  }
929  }
930  event_count++;
931  if (event_count % 10000 == 0) {
932  int hltcount = hltused.size();
933  int mod = *triggers.begin() % hltcount;
934  if (triggers.empty()) {
935  // workaround for empty vector, but keep same structure for monitor parsing (kibana)
936  ERR_FPRINTF(stderr, "[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
937  event_number_max, event_number_max, missing_walk_index, triggers.size(),
938  0, event_number_max, mod, hlts[mod]);
939  } else {
940  ERR_FPRINTF(stderr, "[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
941  *triggers.begin(), *(--triggers.end()), missing_walk_index, triggers.size(),
942  *(--triggers.end()) - *triggers.begin(), event_number_max, mod, hlts[mod]);
943  }
944  }
945  }
946  }
947 
948 
949  /* termination: never reached */
950  MM_term_connect_to_onsen(sd_con);
951 
952  if (connected_hlts == 0) {
953  ERR_FPRINTF(stderr, "[RESULT] Stopped because all HLTs closed connection\n");
954  }
955  print_stat();
956  ERR_FPRINTF(stderr, "[RESULT] %s terminated\n", argv[0]);
957  return 0;
958 }
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:91