Belle II Software  release-08-02-06
merger_merge.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 /* merger_merge.cc */
9 
10 #include <cstdio>
11 #include <cstdlib>
12 #include <unistd.h>
13 #include <cstring>
14 #include <poll.h>
15 #include <csignal>
16 #include <cerrno>
17 #include <netinet/in.h>
18 #include <map>
19 #include <set>
20 #include <vector>
21 #include <arpa/inet.h>
22 #include <fcntl.h>
23 #include <netdb.h>
24 #include <string>
25 
/* Size in bytes of the receive buffer that main() allocates with valloc();
 * every RoI packet read from an HLT connection is stored in this buffer. */
#define ROI_MAX_PACKET_SIZE (16384) /* bytes */

/* Timeout for establishing the ONSEN connection, in seconds. It is multiplied
 * by 1000 and passed to poll() as milliseconds: a negative value waits
 * forever, while 0 makes poll() return immediately. */
#define NETWORK_ESTABLISH_TIMEOUT (-1) /* seconds */
/* I/O timeout for the ONSEN socket, passed to b2_timed_blocking_io():
 * > 0 sets SO_SNDTIMEO/SO_RCVTIMEO to that many seconds, 0 makes the socket
 * non-blocking, < 0 makes it blocking (wait forever). */
#define NETWORK_IO_TIMEOUT (-1) /* seconds */
30 
31 #include <boost/endian/arithmetic.hpp>
32 
33 using namespace std;
34 
35 
/* Logging helpers; both currently write through plain fprintf(). */
#define LOG_FPRINTF (fprintf)
#define ERR_FPRINTF (fprintf)
/* Print "[ERROR] file:line: func(): <strerror(errno)>" after a failed call. */
#define ERROR(func) { fprintf(stderr, "[ERROR] %s:%d: "#func"(): %s\n", __FILE__, __LINE__, strerror(errno));}

/* HLT connection file descriptor -> dotted IPv4 address of the peer. */
std::map<int, std::string> myconn;
/* Packet counters: key 0 counts packets sent to ONSEN, every other key is an
 * HLT connection file descriptor counting packets received from it. */
std::map<int, unsigned int> mycount;
std::set<int> hltused; // HLT numbers (last IPv4 octet) of peers; a set keeps them sorted
std::vector<int> hlts; // sorted copy of hltused, rebuilt by plot_triggers() (e.g. on run START)

/* Missing-trigger bookkeeping of the current run, maintained by
 * check_event_nr() and reset by clear_triggers(). */
std::set<int> triggers;               // trigger numbers not (yet) received
unsigned int event_number_max = 0;    // largest trigger number received so far
unsigned int missing_walk_index = 0;  // walk position in triggers, logged as "missing"
bool enable_check = true;             // false once too many triggers are in flight

/* Flags raised by the signal handlers and polled/reset by the main loop.
 * volatile sig_atomic_t is the only object type the C/C++ standards allow a
 * signal handler to write portably; a plain bool gives no such guarantee. */
volatile sig_atomic_t got_sigusr1 = 0;
volatile sig_atomic_t got_sigusr2 = 0;
volatile sig_atomic_t got_sigint = 0;
volatile sig_atomic_t got_sigpipe = 0;
volatile sig_atomic_t got_sigterm = 0;
55 
/**
 * Hex-dump a buffer to fp as 32-bit words ("%08x "), eight words per line.
 *
 * @param fp   output stream
 * @param ptr  start of the data (no alignment requirement)
 * @param size number of bytes; trailing bytes that do not fill a whole
 *             unsigned int are not printed
 */
void
dump_binary(FILE* fp, const void* ptr, const size_t size)
{
  const unsigned char* bytes = static_cast<const unsigned char*>(ptr);
  const size_t n_words = size / sizeof(unsigned int);

  for (size_t i = 0; i < n_words; i++) {
    // Copy the word out instead of dereferencing an unsigned int*: the buffer
    // is a byte buffer (strict aliasing) and may not be suitably aligned.
    unsigned int word;
    memcpy(&word, bytes + i * sizeof(unsigned int), sizeof(word));
    fprintf(fp, "%08x ", word);
    if (i % 8 == 7) fprintf(fp, "\n");
  }
  if (n_words % 8 != 0) fprintf(fp, "\n");
}
69 
70 
71 static void catch_usr1_function(int /*signo*/)
72 {
73 // puts("SIGUSR1 caught\n");
74  got_sigusr1 = true;
75 }
76 
77 static void catch_usr2_function(int /*signo*/)
78 {
79 // puts("SIGUSR2 caught\n");
80  got_sigusr2 = true;
81 }
82 
83 static void catch_int_function(int /*signo*/)
84 {
85 // puts("SIGINT caught\n");
86  got_sigint = true;
87 }
88 
89 static void catch_term_function(int /*signo*/)
90 {
91 // puts("SIGTERM caught\n");
92  got_sigterm = true;
93 }
94 
95 static void catch_pipe_function(int /*signo*/)
96 {
97 // puts("SIGPIPE caught\n");
98  got_sigpipe = true;
99 }
100 
101 void clear_triggers(void)
102 {
103  triggers.clear();
104  event_number_max = 0;
105  missing_walk_index = 0;
106 }
107 
108 void plot_triggers(void)
109 {
110  int hltcount = hltused.size();
111  std::map<int, int> modmissing;
112  hlts.clear();
113  for (auto h : hltused) hlts.push_back(h);
114  if (!triggers.empty()) {
115  ERR_FPRINTF(stderr, "[RESULT] merger_merge: trigger low=%u high=%u missing %lu delta %u max %u\n", *triggers.begin(),
116  *(--triggers.end()),
117  triggers.size(), *(--triggers.end()) - *triggers.begin(), event_number_max);
118  int i = 0;
119  for (auto& it : triggers) {
120  int mod = it % hltcount;
121  modmissing[hlts[mod]]++;
122  if (i < 100) {
123  ERR_FPRINTF(stderr, "[INFO] Miss trig %u (%d) HLT%d\n", it, mod, hlts[mod]);
124  i++;
125  if (i == 100) {
126  ERR_FPRINTF(stderr, "[WARNING] ... too many missing to report\n");
127  i++;
128  }
129  }
130  }
131  } else {
132  ERR_FPRINTF(stderr, "[RESULT] merger_merge: missing triggers 0\n");
133  }
134  for (auto m : modmissing) {
135  ERR_FPRINTF(stderr, "[INFO] merger_merge: missing triggers from HLT%d: %d\n", m.first, m.second);
136  }
137 }
138 
139 void check_event_nr(unsigned int event_number)
140 {
141  // this code might not detect missing trigger nr 0
142  // it is assumed, that run number change has been checked and handled before
143  if (event_number_max < event_number) {
144  for (uint32_t e = event_number_max + 1; e < event_number; e ++) {
145  triggers.insert(e);
146  }
147  event_number_max = event_number;
148  } else {
149  // we dont fill event_number in the if above, thus we dont have to remove it
150  triggers.erase(event_number);
151  }
152  if (triggers.size() > missing_walk_index + 1 && event_number_max - *std::next(triggers.begin(), missing_walk_index + 1) > 100000) {
153  missing_walk_index++;
154  } else if (missing_walk_index > 0 && event_number_max - *std::next(triggers.begin(), missing_walk_index) < 100000) {
155  missing_walk_index--;
156  }
157  if (triggers.size() > 50000) {
158  // diable to avoid slow-down by too many in-flight triggers
159  enable_check = false;
160  ERR_FPRINTF(stderr, "[ERROR] Too many in-flight triggers -> disable checking until next run\n");
161  }
162 }
163 
164 int
165 b2_timed_blocking_io(const int sd, const int timeout /* secs */)
166 {
167  int ret;
168 
169  if (timeout > 0) {
170  struct timeval tv;
171 
172  tv.tv_sec = timeout;
173  tv.tv_usec = 0;
174  ret = setsockopt(sd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(struct timeval));
175  if (ret == -1) {
176  ERROR(setsockopt);
177  return -1;
178  }
179 
180  tv.tv_sec = timeout;
181  tv.tv_usec = 0;
182  ret = setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval));
183  if (ret == -1) {
184  ERROR(setsockopt);
185  return -1;
186  }
187  } else if (timeout == 0) {
188  ret = fcntl(sd, F_SETFL, O_NDELAY);
189  if (ret == -1) {
190  ERROR(fcntl);
191  return -1;
192  }
193  } else if (timeout < 0) {
194  ret = fcntl(sd, F_GETFL, O_NDELAY);
195  if (ret == -1) {
196  ERROR(fcntl);
197  return -1;
198  }
199  ret &= ~O_NDELAY;
200  ret = fcntl(sd, F_SETFL, ret);
201  if (ret == -1) {
202  ERROR(fcntl);
203  return -1;
204  }
205  }
206 
207  return 0;
208 }
209 
210 
/**
 * Fill an IPv4 socket address for hostname:port.
 *
 * @param hostname host name or dotted-quad IPv4 address
 * @param port     TCP port in host byte order
 * @param in       output: zeroed, then set to AF_INET, the first IPv4 address
 *                 found for hostname and the port in network byte order
 * @return 0 on success, -1 if hostname cannot be resolved (message printed)
 */
int
b2_build_sockaddr_in(const char* hostname, const unsigned short port, struct sockaddr_in* in)
{
  memset(in, 0, sizeof(struct sockaddr_in));
  in->sin_family = AF_INET;

  // getaddrinfo() replaces the obsolete, non-reentrant gethostbyname(). It
  // reports failures through its return value; gethostbyname() used h_errno,
  // so printing strerror(errno) for it gave a meaningless message.
  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_INET;
  hints.ai_socktype = SOCK_STREAM;

  struct addrinfo* result = nullptr;
  const int rc = getaddrinfo(hostname, nullptr, &hints, &result);
  if (rc != 0) {
    fprintf(stderr, "[ERROR] %s:%d: getaddrinfo(): %s\n", __FILE__, __LINE__, gai_strerror(rc));
    return -1;
  }
  in->sin_addr = reinterpret_cast<const struct sockaddr_in*>(result->ai_addr)->sin_addr;
  freeaddrinfo(result);

  in->sin_port = htons(port);

  return 0;
}
230 
231 
232 static int
233 b2_create_tcp_socket(void)
234 {
235  int sd, ret, one = 1;
236 
237  sd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
238  if (sd == -1) {
239  ERROR(socket);
240  return -1;
241  }
242 
243 #if 0
244  ret = b2_timed_blocking_io(sd, 0);
245  if (ret == -1) {
246  ERROR(b2_timed_blocking_io);
247  return -1;
248  }
249 #endif
250 
251  ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char*)&one, sizeof(int));
252  if (ret == -1) {
253  ERROR(setsockopt);
254  return -1;
255  }
256 
257  return sd;
258 }
259 
260 
261 int /* returns socket descriptor */
262 b2_create_accept_socket(const unsigned short port) /* in reality DOES NOT accept */
263 {
264  int sd, ret;
265  struct sockaddr_in in;
266 
267  sd = b2_create_tcp_socket();
268  if (sd < 0) {
269  ERROR(b2_create_tcp_socket);
270  return -1;
271  }
272 
273  ret = b2_build_sockaddr_in("0.0.0.0", port, &in);
274  if (ret == -1) {
275  ERROR(b2_build_sockaddr_in);
276  return -1;
277  }
278 
279  ret = bind(sd, (const struct sockaddr*)&in, sizeof(struct sockaddr_in));
280  if (ret == -1) {
281  ERROR(bind);
282  return -1;
283  }
284 
285  ret = listen(sd, 1);
286  if (ret == -1) {
287  ERROR(listen);
288  return -1;
289  }
290 
291  return sd;
292 }
293 
294 
295 int /* returns socket descriptor */
296 b2_create_connect_socket(const char* hostname, const unsigned short port)
297 {
298  int sd, ret;
299  struct sockaddr_in in;
300 
301  sd = b2_create_tcp_socket();
302  if (sd < 0) {
303  ERROR(b2_create_tcp_socket);
304  return -1;
305  }
306 
307  ret = b2_build_sockaddr_in(hostname, port, &in);
308  if (ret == -1) {
309  ERROR(b2_build_sockaddr_in);
310  return -1;
311  }
312 
313  ret = connect(sd, (const struct sockaddr*)&in, sizeof(struct sockaddr_in));
314  if (ret == -1 && errno != EINPROGRESS) {
315  ERROR(connect);
316  return -1;
317  }
318 
319  return sd;
320 }
321 
322 
323 int
324 b2_send(const int sd, const void* buf, const size_t size)
325 {
326  unsigned char* ptr = (unsigned char*)buf;
327  size_t n_bytes_remained = size;
328 
329  for (;;) {
330  int ret, n_bytes_send;
331 
332  ret = send(sd, ptr, n_bytes_remained, 0);
333  if (ret == -1 && errno != EINTR) {
334  ERROR(send);
335  return -1;
336  }
337  if (ret == -1 && errno == EINTR) {
338  fprintf(stderr, "%s:%d: recv(): Packet send timed out\n", __FILE__, __LINE__);
339  return -1;
340  }
341  if (ret == 0) {
342  fprintf(stderr, "%s:%d: send(): Connection closed\n", __FILE__, __LINE__);
343  return -1;
344  }
345 
346  n_bytes_send = ret;
347  ptr += n_bytes_send;
348 
349  if (n_bytes_remained < size_t(n_bytes_send))
350  /* overrun: internal error */
351  {
352  fprintf(stderr, "%s:%d: send(): Internal error\n", __FILE__, __LINE__);
353  return -1;
354  }
355  n_bytes_remained -= n_bytes_send;
356 
357  if (n_bytes_remained == 0)
358  /* fully sendout */
359  {
360  break;
361  }
362  }
363 
364  return size;
365 }
366 
367 
368 int
369 b2_recv(const int sd, void* buf, const size_t size)
370 {
371  unsigned char* ptr = (unsigned char*)buf;
372  size_t n_bytes_remained = size;
373 
374  for (;;) {
375  int ret, n_bytes_recv;
376 
377  ret = recv(sd, ptr, n_bytes_remained, 0);
378  if (ret == -1 && (errno != EINTR && errno != EWOULDBLOCK)) {
379  ERROR(recv);
380  return -1;
381  }
382  if (ret == -1 && (errno == EINTR || errno == EWOULDBLOCK)) {
383  fprintf(stderr, "%s:%d: recv(): Packet receive timed out\n", __FILE__, __LINE__);
384  return -1;
385  }
386  if (ret == 0) {
387  fprintf(stderr, "%s:%d: recv(): Connection closed\n", __FILE__, __LINE__);
388  return -1;
389  }
390 
391  n_bytes_recv = ret;
392  ptr += n_bytes_recv;
393  if (n_bytes_remained < size_t(n_bytes_recv))
394  /* overrun: internal error */
395  {
396  fprintf(stderr, "%s:%d: recv(): Internal error\n", __FILE__, __LINE__);
397  return -1;
398  }
399  n_bytes_remained -= n_bytes_recv;
400 
401  if (n_bytes_remained == 0)
402  /* fully readout */
403  {
404  break;
405  }
406  }
407 
408  return size;
409 }
410 
411 
412 
413 static int
414 MM_init_connect_to_onsen(const char* host, const unsigned int port)
415 {
416  int sd, ret;
417  struct pollfd fds;
418 
419  sd = b2_create_connect_socket(host, port);
420  if (sd == -1) {
421  ERROR(b2_create_connect_socket);
422  return -1;
423  }
424 
425  fds.fd = sd;
426  fds.events = POLLOUT;
427  fds.revents = 0;
428  ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
429  switch (ret) {
430  case -1:
431  ERROR(poll);
432  return -1;
433 
434  case 0:
435  ERR_FPRINTF(stderr, "[ERROR] merger_merge: connect() to ONSEN: Connection timed out\n");
436  return -1;
437 
438  case 1: {
439  int connection_error;
440  socklen_t optlen;
441 
442  optlen = sizeof(connection_error);
443  ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
444  if (ret == -1) {
445  ERROR(getsockopt);
446  return -1;
447  }
448  if (connection_error) {
449  ERR_FPRINTF(stderr, "[ERROR] merger_merge: connect() to ONSEN: %s\n", strerror(errno));
450  return -1;
451  }
452 
453  break;
454  }
455 
456  default:
457  ERR_FPRINTF(stderr, "[ERROR] merger_merge: poll() connect to ONSEN: Unexpected error\n");
458  return -1;
459  }
460 
461  ret = b2_timed_blocking_io(sd, NETWORK_IO_TIMEOUT /* secs */);
462  if (ret == -1) {
463  ERROR(b2_timed_blocking_io);
464  return -1;
465  }
466 
467  return sd;
468 }
469 
470 
471 static int
472 MM_init_accept_from_hltout2merger(const unsigned int port)
473 {
474  int sd;
475  int one = 1, ret;
476  // struct pollfd fds;
477 
478  LOG_FPRINTF(stderr, "[INFO] merger_merge: Waiting for connection from hltout2merger on port %d\n", port);
479 
480  sd = b2_create_accept_socket(port);
481  if (sd == -1) {
482  ERROR(b2_create_accept_socket);
483  return -1;
484  }
485 
486  ret = b2_timed_blocking_io(sd,
487  1); // This means, if the socket blocks longer than Xs, it will return a EAGAIN or EWOULDBLOCK (immediately)
488  if (ret == -1) {
489  ERROR(b2_timed_blocking_io);
490  return -1;
491  }
492 
493  ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char*)&one, sizeof(int));
494  if (ret == -1) {
495  ERROR(setsockopt);
496  return -1;
497  }
498 
499  /*
500  fds.fd = sd;
501  fds.events = POLLIN;
502  fds.revents = 0;
503  ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
504  switch (ret) {
505  case -1:
506  ERROR(poll);
507  return -1;
508 
509  case 0:
510  ERR_FPRINTF(stderr, "merger_merge: accept(): Connection timed out\n");
511  return -1;
512 
513  case 1: {
514  int ret, connection_error;
515  socklen_t optlen;
516 
517  optlen = sizeof(connection_error);
518  ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
519  if (ret == -1) {
520  ERROR(getsockopt);
521  return -1;
522  }
523  if (connection_error) {
524  ERR_FPRINTF(stderr, "merger_merge: accept(): %s\n", strerror(errno));
525  return -1;
526  }
527 
528  break;
529  }
530 
531  default:
532  ERR_FPRINTF(stderr, "merger_merge: poll(): Unexpected error\n");
533  return -1;
534  }
535  */
536 
537  /* Skip accept
538  nd = accept(sd, NULL, NULL);
539  if (nd == -1) {
540  ERROR(accept);
541  return -1;
542  }
543 
544  close(sd);
545 
546  ret = b2_timed_blocking_io(nd, NETWORK_IO_TIMEOUT / * secs * /);
547  if (ret == -1) {
548  ERROR(b2_timed_blocking_io);
549  return -1;
550  }
551  */
552 
553  return sd;
554  // return nd;
555 }
556 
557 
558 static int
559 MM_get_packet(const int sd_acc, unsigned char* buf)
560 {
561  unsigned int header[2] = {}; // length is second word, thus read two
562 
563  int ret = recv(sd_acc, &header, sizeof(unsigned int) * 2, MSG_PEEK);
564  if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
565  ERR_FPRINTF(stderr, "[ERROR] merger_merge: recv(): Packet receive timed out\n");
566  return -1;
567  }
568  if (ret != 2 * sizeof(unsigned int)) {
569  ERR_FPRINTF(stderr, "[ERROR] merger_merge: recv(): Unexpected return value (%d)\n", ret);
570  return -2;
571  }
572 
575 
576  size_t n_bytes_from_hltout = 2 * sizeof(unsigned int) + ntohl(header[1]);// OFFSET_LENGTH = 1
577 
578  ret = b2_recv(sd_acc, buf, n_bytes_from_hltout);
579  if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
580  ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_recv(): Packet receive timed out\n");
581  return -1;
582  }
583  if (size_t(ret) != n_bytes_from_hltout) {
584  ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_recv(): Unexpected return value (%d)\n", ret);
585  return -2;
586  }
587 
588  return ret;
589 }
590 
591 
/**
 * Close the ONSEN connection.
 *
 * @param sd socket descriptor obtained from MM_init_connect_to_onsen()
 * @return the result of close(): 0 on success, -1 on error (errno set)
 */
static int
MM_term_connect_to_onsen(const int sd)
{
  const int close_status = close(sd);
  return close_status;
}
597 
598 void print_stat(void)
599 {
600  ERR_FPRINTF(stderr, "[INFO] --- STAT START ---\n");
601  unsigned int sum = 0;
602  for (auto& it : mycount) {
603  ERR_FPRINTF(stderr, "[INFO] (%s): %d\n", myconn[it.first].c_str(), it.second);
604  if (it.first != 0) sum += it.second;
605  }
606  ERR_FPRINTF(stderr, "[INFO] sum %u out %u diff %d\n", sum, mycount[0], (int)(mycount[0] - sum));
607  plot_triggers();
608  ERR_FPRINTF(stderr, "[INFO] --- STAT END ---\n");
609 }
610 
611 // Main
612 
613 int
614 main(int argc, char* argv[])
615 {
616  int current_runnr = -1; // problem: handover without abort
617  // int n_hltout = 0;
618  int sd_acc = -1;
619  int sd_con = -1;
620  int need_reconnection_to_onsen = 1;
621  int event_count = 0;
622  int connected_hlts = 0;
623  bool stop_running = false;
624 
625  char onsen_host[1024];
626  unsigned short onsen_port;
627  /* PC test */
628  /* strcpy(onsen_host, "10.10.10.1"); */
629  /* onsen_port = 1024; */
630  /* real ONSEN */
631  /* strcpy(onsen_host, "10.10.10.80"); */
632  /* onsen_port = 24; */
633  // unsigned short accept_port[MM_MAX_HLTOUT];
634  unsigned short accept_port;
635 
636  LOG_FPRINTF(stderr, "[INFO] merger_merge: Process invoked [ver(%s %s)]\n", __DATE__, __TIME__);
637 
638  if (argc < 4) {
639  ERR_FPRINTF(stderr, "[ERROR] merger_merge: Usage: merger_merge onsen-host onsen-port client-port#1[:client-port#2[:...]]\n");
640  exit(1);
641  }
642 
643  /* argv copy */
644  char* p;
645 
646 // p = argv[1];
647 // strcpy(shmname, p);
648 //
649 // p = argv[2];
650 // shmid = atoi(p);
651 
652  p = argv[3];
653  strcpy(onsen_host, p);
654 
655  p = argv[4];
656  onsen_port = atoi(p);
657 
658  p = argv[5];
659  accept_port = atoi(p);
660 
661  signal(SIGPIPE, catch_pipe_function);
662  signal(SIGTERM, catch_term_function);
663  signal(SIGINT, catch_int_function);
664  signal(SIGUSR1, catch_usr1_function);
665  signal(SIGUSR2, catch_usr2_function);
666 
667  /* Create a port to accept connections*/
668  sd_acc = MM_init_accept_from_hltout2merger(accept_port);
669  LOG_FPRINTF(stderr, "[INFO] merger_merge: port to accept connections from HLTOUT [%d]\n", sd_acc);
670 
671 
672  /* RoI transmission loop */
673  // RoI packets
674  size_t n_bytes_from_hltout;
675  size_t n_bytes_to_onsen;
676 
677  unsigned char* buf = (unsigned char*)valloc(ROI_MAX_PACKET_SIZE);
678  if (!buf) {
679  ERROR(valloc);
680  exit(1);
681  }
682 
683  // Loop forever for ONSEN connection
684  bool connected = false;
685  while (!connected) {
686  // Connect to ONSEN if not
687  if (need_reconnection_to_onsen) {
688  /* in case of sd_con is connected, disconnect it */
689  if (sd_con != -1) close(sd_con);
690 
691  /* connect to onsen untill connected */
692  for (;;) {
693  int sleep_sec = 2;
694  sd_con = MM_init_connect_to_onsen(onsen_host, onsen_port);
695  // sd_con = 6;
696  if (sd_con != -1) {
697  /* connected: move to the recv->send loop */
698  need_reconnection_to_onsen = 0;
699  event_count = 0;
700  LOG_FPRINTF(stderr, "[INFO] merger_merge: MM_init_connect_to_onsen(): Connected to ONSEN\n");
701  connected = true;
702  break;
703  }
704 
705  ERR_FPRINTF(stderr, "[WARNING] merger_merge: connection to onsen failed: reconnect in %d second(s)\n", sleep_sec);
706  sleep(sleep_sec);
707 
708  /* retry connection */
709  continue;
710  }
711  }
712  }
713 
714  // Preparation for select()
715 
716  // printf("Starting select() loop\n") ;
717  fflush(stderr);
718  fflush(stdout);
719 
720  clear_triggers();
721 
722  fd_set allset;
723  FD_ZERO(&allset);
724  FD_SET(sd_acc, &allset);
725  int maxfd = sd_acc;
726  int minfd = sd_acc;
727  fd_set rset;//, wset;
728 
729  // enable the checking of missing triggers
730  enable_check = true;
731  // Handle Obtain ROI and send it to ONSEN
732  while (!stop_running) {
733  memcpy(&rset, &allset, sizeof(rset));
734  // memcpy(&wset, &allset, sizeof(wset));
735 
736  // struct timeval timeout;
737  // timeout.tv_sec = 0; // 1sec
738  // timeout.tv_usec = 1000; // 1msec (in microsec)
739  // printf ( "Select(): maxfd = %d, start select...; rset=%x, wset=%x\n", maxfd, rset, wset);
740  int rc = select(maxfd + 1, &rset, NULL, NULL, NULL);
741  // printf ( "Select(): returned with %d, rset = %8.8x\n", rc, rset );
742  if (got_sigusr1) {
743  got_sigusr1 = false;
744  ERR_FPRINTF(stderr, "[INFO] Got SIGUSR1, Run START\n");
745  print_stat();
746  }
747  if (got_sigusr2) {
748  got_sigusr2 = false;
749  ERR_FPRINTF(stderr, "[INFO] Got SIGUSR2, Run STOP\n");
750  print_stat();
751  }
752  if (got_sigint) {
753  got_sigint = false;
754  ERR_FPRINTF(stderr, "[INFO] Got SIGINT, ABORT\n");
755  print_stat();
756  }
757  if (got_sigpipe) {
758  got_sigpipe = false;
759  ERR_FPRINTF(stderr, "[INFO] Got SIGPIPE\n");
760  }
761  if (got_sigterm) {
762  got_sigterm = false;
763  ERR_FPRINTF(stderr, "[INFO] Got SIGTERM\n");
764  }
765  if (rc < 0) {
766  perror("select");
767  continue;
768  } else if (rc == 0) { // timeout
769  continue;
770  }
771 
772  if (FD_ISSET(sd_acc, &rset)) { // new connection
773  int t;
774  struct sockaddr_in isa;
775  socklen_t i = sizeof(isa);
776  getsockname(sd_acc, (struct sockaddr*)&isa, &i);
777  if ((t =::accept(sd_acc, (struct sockaddr*)&isa, &i)) < 0) {
778  // m_errno = errno;
779  ERR_FPRINTF(stderr, "[ERROR] Error on accepting new connection\n");
780  ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
781  return (-1);
782  }
783  LOG_FPRINTF(stderr, "[INFO] New socket connection t=%d\n", t);
784  char address[INET_ADDRSTRLEN];
785  inet_ntop(AF_INET, &isa.sin_addr, address, sizeof(address));
786  connected_hlts++;
787 
788  LOG_FPRINTF(stderr, "[INFO] %d is IP <%s>\n", t, address);
789  myconn[t] = address;
790  // Assume IP4 and take last decimal as HLT number
791  char* ptr = strrchr(address, '.');
792  if (ptr) {
793  int nr = atoi(ptr + 1);
794  hltused.insert(nr);
795  }
796 
797  fflush(stdout);
798  FD_SET(t, &allset);
799  if (minfd == sd_acc) minfd = t;
800  if (t > maxfd) maxfd = t;
801  continue;
802  } else {
803  for (int fd = minfd; fd < maxfd + 1; fd++) {
804  n_bytes_from_hltout = 0;
805  if (FD_ISSET(fd, &rset)) {
806  // printf ( "fd is available for reading = %d\n", fd );
807  /* recv packet */
808  int ret;
809  ret = MM_get_packet(fd, buf);
810  if (ret < 0) { // -2 will not have a errno set
811  if (ret == -1) {
812  ERR_FPRINTF(stderr, "[ERROR] merger_merge: MM_get_packet()[%d]: %s\n", fd, strerror(errno));
813  } else {
814  ERR_FPRINTF(stderr, "[ERROR] merger_merge: MM_get_packet()[%d]\n", fd);
815  }
816  ERR_FPRINTF(stderr, "[ERROR] Connection from HLT was closed on HLT side (hltout2merge) from %s\n",
817  myconn[fd].c_str());
818  /* connection from HLT is lost */
819  // using exit here will dump ALL still open connections from other HLTs
820  // rethink if we dont exit but process these to limit loss of events.
821  // ERR_FPRINTF(stderr, "%s terminated\n", argv[0]);
822  // exit(1);
823  close(fd);
824  FD_CLR(fd, &allset); //opposite of FD_SET
825  connected_hlts--;
826  if (connected_hlts == 0) stop_running = true;
827  ret = 0;
828  }
829  n_bytes_from_hltout = ret;
830 
831  // printf ( "RoI received : Event count = % d\n", event_count );
832  if (ret > 0) {
833  mycount[fd]++;
834  if (0 /*event_count < 5 || event_count % 100000 == 0*/) {
835  LOG_FPRINTF(stderr, "[INFO] merger_merge: ---- [ %d] received event from ROI transmitter\n", event_count);
836  LOG_FPRINTF(stderr, "[INFO] merger_merge: MM_get_packet() Returned %ld\n", n_bytes_from_hltout);
837  dump_binary(stderr, buf, n_bytes_from_hltout);
838  }
839  }
840  }
841 
842  /* send packet */
843  if (n_bytes_from_hltout > 0) {
844  int ret;
845  unsigned char* ptr_head_to_onsen = buf;
846 
847  // extract trigger number, run+exp nr and fill it in a table.
848 
849  int runnr = 0;
850  int eventnr = 0;
851 
852  // From ROIpayload.h
853  // enum { OFFSET_MAGIC = 0, OFFSET_LENGTH = 1, OFFSET_HEADER = 2, OFFSET_TRIGNR = 3, OFFSET_RUNNR = 4, OFFSET_ROIS = 5};
854  // enum { HEADER_SIZE_WO_LENGTH = 3, HEADER_SIZE_WITH_LENGTH = 5, HEADER_SIZE_WITH_LENGTH_AND_CRC = 6};
855 
856  if (n_bytes_from_hltout >= 6 * 4) {
857  auto* iptr = (boost::endian::big_uint32_t*)ptr_head_to_onsen;
858  eventnr = iptr[3];
859  runnr = (iptr[4] & 0x3FFF00) >> 8;
860 // ERR_FPRINTF(stderr, "%08X %08X %08X %08X %08X -> %08X %08X \n",
861 // (unsigned int)iptr[0],(unsigned int) iptr[1],(unsigned int) iptr[2],(unsigned int) iptr[3],(unsigned int) iptr[4], eventnr, runnr);
862  } else {
863  LOG_FPRINTF(stderr, "[ERROR] merger_merge: packet to small to hold useful header (%ld)\n", n_bytes_from_hltout);
864  }
865 
866  if (runnr > current_runnr) {
867  ERR_FPRINTF(stderr, "[WARNING] merger_merge: run number increases: got %d current %d trig %d\n", runnr, current_runnr,
868  eventnr);
869  print_stat();
870  clear_triggers();
871  current_runnr = runnr;
872  enable_check = true;
873  } else if (runnr < current_runnr) {
874  // got some event from old run
875  ERR_FPRINTF(stderr, "[WARNING] merger_merge: got trigger from older run: got %d current %d trig %d\n", runnr, current_runnr,
876  eventnr);
877  }
878 
879  if (runnr == current_runnr) {
880  // seperate if, as we might set it in the if above
881  if (enable_check) check_event_nr(eventnr);
882  } // if we end the if here, we will send out old events to ONSEN!
883 
884  n_bytes_to_onsen = n_bytes_from_hltout;
885  while (1) {
886  ret = b2_send(sd_con, ptr_head_to_onsen, n_bytes_to_onsen);
887  if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
888  ERR_FPRINTF(stderr, "[WARNING] merger_merge: socket buffer full, retry\n");
889  sleep(1);// Bad hack, wait a second
890  } else break;
891  }
892 
893  if (ret == -1) {
894  ERROR(b2_send);
895  need_reconnection_to_onsen = 1;
896  event_count = 0;
897  ERR_FPRINTF(stderr, "[ERROR] merger_merge: error to send to ONSEN : %s\n", strerror(errno));
898  free(buf);
899  ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
900  print_stat();
901  exit(1);
902  }
903  if (ret == 0) {
904  ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_send(): Connection closed\n");
905  need_reconnection_to_onsen = 1;
906  event_count = 0;
907  free(buf);
908  ERR_FPRINTF(stderr, "[ERROR] Connection to ONSEN was closed on ONSEN side\n");
909  ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
910  print_stat();
911  exit(1);
912  }
913 
914  mycount[0]++;
915  if (0 /*event_count < 5 || event_count % 10000 == 0*/) {
916  LOG_FPRINTF(stderr, "[INFO] merger_merge: ---- [ %d] sent event to ONSEN\n", event_count);
917  dump_binary(stderr, ptr_head_to_onsen, n_bytes_to_onsen);
918  }
919  // } // if we end if here, we will NOT send old events to onsen, but only after we received the first new event
920  }
921  }
922  event_count++;
923  if (event_count % 10000 == 0) {
924  int hltcount = hltused.size();
925  if (triggers.empty()) {
926  // workaround for empty vector, but keep same structure for monitor parsing (kibana)
927  ERR_FPRINTF(stderr, "[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
928  event_number_max, event_number_max, missing_walk_index, triggers.size(),
929  0, event_number_max, -1, -1);
930  } else {
931  int mod = *triggers.begin() % hltcount;
932  ERR_FPRINTF(stderr, "[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
933  *triggers.begin(), *(--triggers.end()), missing_walk_index, triggers.size(),
934  *(--triggers.end()) - *triggers.begin(), event_number_max, mod, hlts[mod]);
935  }
936  }
937  }
938  }
939 
940 
941  /* termination: never reached */
942  MM_term_connect_to_onsen(sd_con);
943 
944  if (connected_hlts == 0) {
945  ERR_FPRINTF(stderr, "[RESULT] Stopped because all HLTs closed connection\n");
946  }
947  print_stat();
948  ERR_FPRINTF(stderr, "[RESULT] %s terminated\n", argv[0]);
949  return 0;
950 }
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:91