Belle II Software  release-05-01-25
merger_merge.cc
1 /* merger_merge.cc */
2 
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <fcntl.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <iterator>
#include <map>
#include <set>
#include <string>
#include <vector>
16 
// Size in bytes of the buffer that holds one ROI packet received from an HLT.
static constexpr int ROI_MAX_PACKET_SIZE = 16384;

// Network timeouts in seconds. A negative value waits forever; note that 0 does
// NOT wait forever: poll() with 0 returns immediately and b2_timed_blocking_io()
// with 0 switches the socket to non-blocking mode.
static constexpr int NETWORK_ESTABLISH_TIMEOUT = -1;
static constexpr int NETWORK_IO_TIMEOUT = -1;
21 
22 #include <boost/spirit/home/support/detail/endian.hpp>
23 
24 using namespace std;
25 
26 
// Informational and error output both go through fprintf (callers pass the stream).
#define LOG_FPRINTF (fprintf)
#define ERR_FPRINTF (fprintf)
// Report a failed call `func` with file, line and strerror(errno) on stderr.
// Wrapped in do/while(0) so that `ERROR(x);` is a single statement and is safe
// inside an unbraced if/else.
#define ERROR(func) \
  do { \
    fprintf(stderr, "[ERROR] %s:%d: "#func"(): %s\n", __FILE__, __LINE__, strerror(errno)); \
  } while (0)
30 
// Per-connection bookkeeping, keyed by socket descriptor.
std::map<int, std::string> myconn;   // peer IPv4 address of each HLT connection
std::map<int, unsigned int> mycount; // packets received per HLT connection; key 0 counts packets sent to ONSEN
std::set<int> hltused; // HLT numbers (last IPv4 octet of each peer); a set, so kept sorted
std::vector<int> hlts; // sorted copy of hltused, rebuilt from it before use

// Trigger numbers missing (not yet received) in the current run. Unsigned like the
// event numbers that fill it, so that ordering stays correct above INT_MAX.
std::set<unsigned int> triggers;
unsigned int event_number_max = 0;
unsigned int missing_walk_index = 0;

// Set by the signal handlers and polled/reset by the main loop.
// volatile sig_atomic_t is the portable type for flags written from a signal handler.
volatile sig_atomic_t got_sigusr1 = 0;
volatile sig_atomic_t got_sigusr2 = 0;
volatile sig_atomic_t got_sigint = 0;
volatile sig_atomic_t got_sigpipe = 0;
volatile sig_atomic_t got_sigterm = 0;
45 
/* Hex-dump the buffer at `ptr` to `fp` as native-endian 32-bit words
 * ("%08x " each), eight words per line. Only size / sizeof(unsigned int)
 * whole words are printed; trailing bytes that do not fill a word are ignored.
 * A final newline is added when the last line is incomplete. */
void
dump_binary(FILE* fp, const void* ptr, const size_t size)
{
  const unsigned int* words = static_cast<const unsigned int*>(ptr);
  const size_t n_words = size / sizeof(unsigned int);
  size_t column = 0;

  for (size_t w = 0; w < n_words; ++w) {
    fprintf(fp, "%08x ", words[w]);
    if (++column == 8) {
      fprintf(fp, "\n");
      column = 0;
    }
  }
  if (column != 0) {
    fprintf(fp, "\n");
  }
}
59 
60 
61 static void catch_usr1_function(int /*signo*/)
62 {
63 // puts("SIGUSR1 caught\n");
64  got_sigusr1 = true;
65 }
66 
67 static void catch_usr2_function(int /*signo*/)
68 {
69 // puts("SIGUSR2 caught\n");
70  got_sigusr2 = true;
71 }
72 
73 static void catch_int_function(int /*signo*/)
74 {
75 // puts("SIGINT caught\n");
76  got_sigint = true;
77 }
78 
79 static void catch_term_function(int /*signo*/)
80 {
81 // puts("SIGTERM caught\n");
82  got_sigterm = true;
83 }
84 
85 static void catch_pipe_function(int /*signo*/)
86 {
87 // puts("SIGPIPE caught\n");
88  got_sigpipe = true;
89 }
90 
91 void clear_triggers(void)
92 {
93  triggers.clear();
94  event_number_max = 0;
95  missing_walk_index = 0;
96 }
97 
98 void plot_triggers(void)
99 {
100  int hltcount = hltused.size();
101  std::map<int, int> modmissing;
102  hlts.clear();
103  for (auto h : hltused) hlts.push_back(h);
104  if (!triggers.empty()) {
105  ERR_FPRINTF(stderr, "[RESULT] merger_merge: trigger low=%u high=%u missing %lu delta %u max %u\n", *triggers.begin(),
106  *(--triggers.end()),
107  triggers.size(), *(--triggers.end()) - *triggers.begin(), event_number_max);
108  int i = 0;
109  for (auto& it : triggers) {
110  int mod = it % hltcount;
111  modmissing[hlts[mod]]++;
112  if (i < 100) {
113  ERR_FPRINTF(stderr, "[INFO] Miss trig %u (%d) HLT%d\n", it, mod, hlts[mod]);
114  i++;
115  if (i == 100) {
116  ERR_FPRINTF(stderr, "[WARNING] ... too many missing to report\n");
117  i++;
118  }
119  }
120  }
121  } else {
122  ERR_FPRINTF(stderr, "[RESULT] merger_merge: missing triggers 0\n");
123  }
124  for (auto m : modmissing) {
125  ERR_FPRINTF(stderr, "[INFO] merger_merge: missing triggers from HLT%d: %d\n", m.first, m.second);
126  }
127 }
128 
129 void check_event_nr(unsigned int event_number)
130 {
131  // this code might not detect missing trigger nr 0
132  // it is assumed, that run number change has been checked and handled before
133  if (event_number_max < event_number) {
134  for (uint32_t e = event_number_max + 1; e < event_number; e ++) {
135  triggers.insert(e);
136  }
137  event_number_max = event_number;
138  } else {
139  // we dont fill event_number in the if above, thus we dont have to remove it
140  triggers.erase(event_number);
141  }
142  if (triggers.size() > missing_walk_index + 1 && event_number_max - *std::next(triggers.begin(), missing_walk_index + 1) > 100000) {
143  missing_walk_index++;
144  } else if (missing_walk_index > 0 && event_number_max - *std::next(triggers.begin(), missing_walk_index) < 100000) {
145  missing_walk_index--;
146  }
147 }
148 
149 int
150 b2_timed_blocking_io(const int sd, const int timeout /* secs */)
151 {
152  int ret;
153 
154 
155  if (timeout > 0) {
156  struct timeval tv;
157 
158  tv.tv_sec = timeout;
159  tv.tv_usec = 0;
160  ret = setsockopt(sd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(struct timeval));
161  if (ret == -1) {
162  ERROR(setsockopt);
163  return -1;
164  }
165 
166  tv.tv_sec = timeout;
167  tv.tv_usec = 0;
168  ret = setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval));
169  if (ret == -1) {
170  ERROR(setsockopt);
171  return -1;
172  }
173  } else if (timeout == 0) {
174  ret = fcntl(sd, F_SETFL, O_NDELAY);
175  if (ret == -1) {
176  ERROR(fcntl);
177  return -1;
178  }
179  } else if (timeout < 0) {
180  ret = fcntl(sd, F_GETFL, O_NDELAY);
181  if (ret == -1) {
182  ERROR(fcntl);
183  return -1;
184  }
185  ret &= ~O_NDELAY;
186  ret = fcntl(sd, F_SETFL, ret);
187  if (ret == -1) {
188  ERROR(fcntl);
189  return -1;
190  }
191  }
192 
193 
194  return 0;
195 }
196 
197 
/* Fill `in` with the IPv4 address of `hostname` (name or dotted quad) and `port`.
 *
 * Uses getaddrinfo() restricted to AF_INET; the first result is taken.
 * Returns 0 on success, -1 if the name cannot be resolved (reported on stderr).
 */
int
b2_build_sockaddr_in(const char* hostname, const unsigned short port, struct sockaddr_in* in)
{
  memset(in, 0, sizeof(struct sockaddr_in));
  in->sin_family = AF_INET;

  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_INET; // the result is stored in a sockaddr_in
  hints.ai_socktype = SOCK_STREAM;

  struct addrinfo* result = NULL;
  const int rc = getaddrinfo(hostname, NULL, &hints, &result);
  if (rc != 0) {
    // getaddrinfo() reports failures through its return value, not errno,
    // so ERROR() would print an unrelated strerror(errno) here.
    fprintf(stderr, "[ERROR] %s:%d: getaddrinfo(): %s\n", __FILE__, __LINE__, gai_strerror(rc));
    return -1;
  }
  in->sin_addr = ((const struct sockaddr_in*)result->ai_addr)->sin_addr;
  freeaddrinfo(result);

  in->sin_port = htons(port);

  return 0;
}
218 
219 
220 static int
221 b2_create_tcp_socket(void)
222 {
223  int sd, ret, one = 1;
224 
225 
226  sd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
227  if (sd == -1) {
228  ERROR(socket);
229  return -1;
230  }
231 
232 #if 0
233  ret = b2_timed_blocking_io(sd, 0);
234  if (ret == -1) {
235  ERROR(b2_timed_blocking_io);
236  return -1;
237  }
238 #endif
239 
240  ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char*)&one, sizeof(int));
241  if (ret == -1) {
242  ERROR(setsockopt);
243  return -1;
244  }
245 
246 
247  return sd;
248 }
249 
250 
251 int /* returns socket descriptor */
252 b2_create_accept_socket(const unsigned short port) /* in reality DOES NOT accept */
253 {
254  int sd, ret;
255  struct sockaddr_in in;
256 
257 
258  sd = b2_create_tcp_socket();
259  if (sd < 0) {
260  ERROR(b2_create_tcp_socket);
261  return -1;
262  }
263 
264  ret = b2_build_sockaddr_in("0.0.0.0", port, &in);
265  if (ret == -1) {
266  ERROR(b2_build_sockaddr_in);
267  return -1;
268  }
269 
270  ret = bind(sd, (const struct sockaddr*)&in, sizeof(struct sockaddr_in));
271  if (ret == -1) {
272  ERROR(bind);
273  return -1;
274  }
275 
276  ret = listen(sd, 1);
277  if (ret == -1) {
278  ERROR(listen);
279  return -1;
280  }
281 
282 
283  return sd;
284 }
285 
286 
287 int /* returns socket descriptor */
288 b2_create_connect_socket(const char* hostname, const unsigned short port)
289 {
290  int sd, ret;
291  struct sockaddr_in in;
292 
293 
294  sd = b2_create_tcp_socket();
295  if (sd < 0) {
296  ERROR(b2_create_tcp_socket);
297  return -1;
298  }
299 
300  ret = b2_build_sockaddr_in(hostname, port, &in);
301  if (ret == -1) {
302  ERROR(b2_build_sockaddr_in);
303  return -1;
304  }
305 
306  ret = connect(sd, (const struct sockaddr*)&in, sizeof(struct sockaddr_in));
307  if (ret == -1 && errno != EINPROGRESS) {
308  ERROR(connect);
309  return -1;
310  }
311 
312 
313  return sd;
314 }
315 
316 
317 int
318 b2_send(const int sd, const void* buf, const size_t size)
319 {
320  unsigned char* ptr = (unsigned char*)buf;
321  size_t n_bytes_remained = size;
322 
323 
324  for (;;) {
325  int ret, n_bytes_send;
326 
327  ret = send(sd, ptr, n_bytes_remained, 0);
328  if (ret == -1 && errno != EINTR) {
329  ERROR(send);
330  return -1;
331  }
332  if (ret == -1 && errno == EINTR) {
333  fprintf(stderr, "%s:%d: recv(): Packet send timed out\n", __FILE__, __LINE__);
334  return -1;
335  }
336  if (ret == 0) {
337  fprintf(stderr, "%s:%d: send(): Connection closed\n", __FILE__, __LINE__);
338  return -1;
339  }
340 
341  n_bytes_send = ret;
342  ptr += n_bytes_send;
343 
344  if (n_bytes_remained < size_t(n_bytes_send))
345  /* overrun: internal error */
346  {
347  fprintf(stderr, "%s:%d: send(): Internal error\n", __FILE__, __LINE__);
348  return -1;
349  }
350  n_bytes_remained -= n_bytes_send;
351 
352  if (n_bytes_remained == 0)
353  /* fully sendout */
354  {
355  break;
356  }
357  }
358 
359 
360  return size;
361 }
362 
363 
364 int
365 b2_recv(const int sd, void* buf, const size_t size)
366 {
367  unsigned char* ptr = (unsigned char*)buf;
368  size_t n_bytes_remained = size;
369 
370 
371  for (;;) {
372  int ret, n_bytes_recv;
373 
374  ret = recv(sd, ptr, n_bytes_remained, 0);
375  if (ret == -1 && (errno != EINTR && errno != EWOULDBLOCK)) {
376  ERROR(recv);
377  return -1;
378  }
379  if (ret == -1 && (errno == EINTR || errno == EWOULDBLOCK)) {
380  fprintf(stderr, "%s:%d: recv(): Packet receive timed out\n", __FILE__, __LINE__);
381  return -1;
382  }
383  if (ret == 0) {
384  fprintf(stderr, "%s:%d: recv(): Connection closed\n", __FILE__, __LINE__);
385  return -1;
386  }
387 
388  n_bytes_recv = ret;
389  ptr += n_bytes_recv;
390  if (n_bytes_remained < size_t(n_bytes_recv))
391  /* overrun: internal error */
392  {
393  fprintf(stderr, "%s:%d: recv(): Internal error\n", __FILE__, __LINE__);
394  return -1;
395  }
396  n_bytes_remained -= n_bytes_recv;
397 
398  if (n_bytes_remained == 0)
399  /* fully readout */
400  {
401  break;
402  }
403  }
404 
405 
406  return size;
407 }
408 
409 
410 
411 static int
412 MM_init_connect_to_onsen(const char* host, const unsigned int port)
413 {
414  int sd, ret;
415  struct pollfd fds;
416 
417 
418  sd = b2_create_connect_socket(host, port);
419  if (sd == -1) {
420  ERROR(b2_create_connect_socket);
421  return -1;
422  }
423 
424  fds.fd = sd;
425  fds.events = POLLOUT;
426  fds.revents = 0;
427  ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
428  switch (ret) {
429  case -1:
430  ERROR(poll);
431  return -1;
432 
433  case 0:
434  ERR_FPRINTF(stderr, "[ERROR] merger_merge: connect() to ONSEN: Connection timed out\n");
435  return -1;
436 
437  case 1: {
438  int connection_error;
439  socklen_t optlen;
440 
441  optlen = sizeof(connection_error);
442  ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
443  if (ret == -1) {
444  ERROR(getsockopt);
445  return -1;
446  }
447  if (connection_error) {
448  ERR_FPRINTF(stderr, "[ERROR] merger_merge: connect() to ONSEN: %s\n", strerror(errno));
449  return -1;
450  }
451 
452  break;
453  }
454 
455  default:
456  ERR_FPRINTF(stderr, "[ERROR] merger_merge: poll() connect to ONSEN: Unexpected error\n");
457  return -1;
458  }
459 
460  ret = b2_timed_blocking_io(sd, NETWORK_IO_TIMEOUT /* secs */);
461  if (ret == -1) {
462  ERROR(b2_timed_blocking_io);
463  return -1;
464  }
465 
466 
467  return sd;
468 }
469 
470 
471 static int
472 MM_init_accept_from_hltout2merger(const unsigned int port)
473 {
474  int sd;
475  int one = 1, ret;
476  // struct pollfd fds;
477 
478 
479  LOG_FPRINTF(stderr, "[INFO] merger_merge: Waiting for connection from hltout2merger on port %d\n", port);
480 
481  sd = b2_create_accept_socket(port);
482  if (sd == -1) {
483  ERROR(b2_create_accept_socket);
484  return -1;
485  }
486 
487  ret = b2_timed_blocking_io(sd,
488  1); // This means, if the socket blocks longer than Xs, it will return a EAGAIN or EWOULDBLOCK (immediately)
489  if (ret == -1) {
490  ERROR(b2_timed_blocking_io);
491  return -1;
492  }
493 
494  ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char*)&one, sizeof(int));
495  if (ret == -1) {
496  ERROR(setsockopt);
497  return -1;
498  }
499 
500  /*
501  fds.fd = sd;
502  fds.events = POLLIN;
503  fds.revents = 0;
504  ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
505  switch (ret) {
506  case -1:
507  ERROR(poll);
508  return -1;
509 
510  case 0:
511  ERR_FPRINTF(stderr, "merger_merge: accept(): Connection timed out\n");
512  return -1;
513 
514  case 1: {
515  int ret, connection_error;
516  socklen_t optlen;
517 
518  optlen = sizeof(connection_error);
519  ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
520  if (ret == -1) {
521  ERROR(getsockopt);
522  return -1;
523  }
524  if (connection_error) {
525  ERR_FPRINTF(stderr, "merger_merge: accept(): %s\n", strerror(errno));
526  return -1;
527  }
528 
529  break;
530  }
531 
532  default:
533  ERR_FPRINTF(stderr, "merger_merge: poll(): Unexpected error\n");
534  return -1;
535  }
536  */
537 
538  /* Skip accept
539  nd = accept(sd, NULL, NULL);
540  if (nd == -1) {
541  ERROR(accept);
542  return -1;
543  }
544 
545  close(sd);
546 
547  ret = b2_timed_blocking_io(nd, NETWORK_IO_TIMEOUT / * secs * /);
548  if (ret == -1) {
549  ERROR(b2_timed_blocking_io);
550  return -1;
551  }
552  */
553 
554 
555  return sd;
556  // return nd;
557 }
558 
559 
560 static int
561 MM_get_packet(const int sd_acc, unsigned char* buf)
562 {
563  unsigned int header[2] = {}; // length is second word, thus read two
564 
565  int ret = recv(sd_acc, &header, sizeof(unsigned int) * 2, MSG_PEEK);
566  if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
567  ERR_FPRINTF(stderr, "[ERROR] merger_merge: recv(): Packet receive timed out\n");
568  return -1;
569  }
570  if (ret != 2 * sizeof(unsigned int)) {
571  ERR_FPRINTF(stderr, "[ERROR] merger_merge: recv(): Unexpected return value (%d)\n", ret);
572  return -2;
573  }
574 
577 
578  size_t n_bytes_from_hltout = 2 * sizeof(unsigned int) + ntohl(header[1]);// OFFSET_LENGTH = 1
579 
580  ret = b2_recv(sd_acc, buf, n_bytes_from_hltout);
581  if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
582  ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_recv(): Packet receive timed out\n");
583  return -1;
584  }
585  if (size_t(ret) != n_bytes_from_hltout) {
586  ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_recv(): Unexpected return value (%d)\n", ret);
587  return -2;
588  }
589 
590  return ret;
591 }
592 
593 
/* Close the ONSEN connection `sd`.
 * Returns close()'s result: 0 on success, -1 with errno set on failure. */
static int
MM_term_connect_to_onsen(const int sd)
{
  const int rc = close(sd);
  return rc;
}
599 
600 void print_stat(void)
601 {
602  ERR_FPRINTF(stderr, "[INFO] --- STAT START ---\n");
603  unsigned int sum = 0;
604  for (auto& it : mycount) {
605  ERR_FPRINTF(stderr, "[INFO] (%s): %d\n", myconn[it.first].c_str(), it.second);
606  if (it.first != 0) sum += it.second;
607  }
608  ERR_FPRINTF(stderr, "[INFO] sum %u out %u diff %d\n", sum, mycount[0], (int)(mycount[0] - sum));
609  plot_triggers();
610  ERR_FPRINTF(stderr, "[INFO] --- STAT END ---\n");
611 }
612 
613 // Main
614 
615 int
616 main(int argc, char* argv[])
617 {
618  int current_runnr = -1; // problem: handover without abort
619  // int n_hltout = 0;
620  int sd_acc = -1;
621  int sd_con = -1;
622  int need_reconnection_to_onsen = 1;
623  int event_count = 0;
624  int connected_hlts = 0;
625  bool stop_running = false;
626 
627  char onsen_host[1024];
628  unsigned short onsen_port;
629  /* PC test */
630  /* strcpy(onsen_host, "10.10.10.1"); */
631  /* onsen_port = 1024; */
632  /* real ONSEN */
633  /* strcpy(onsen_host, "10.10.10.80"); */
634  /* onsen_port = 24; */
635  // unsigned short accept_port[MM_MAX_HLTOUT];
636  unsigned short accept_port;
637 
638  LOG_FPRINTF(stderr, "[INFO] merger_merge: Process invoked [ver(%s %s)]\n", __DATE__, __TIME__);
639 
640  if (argc < 4) {
641  ERR_FPRINTF(stderr, "[ERROR] merger_merge: Usage: merger_merge onsen-host onsen-port client-port#1[:client-port#2[:...]]\n");
642  exit(1);
643  }
644 
645  /* argv copy */
646  char* p;
647 
648 // p = argv[1];
649 // strcpy(shmname, p);
650 //
651 // p = argv[2];
652 // shmid = atoi(p);
653 
654  p = argv[3];
655  strcpy(onsen_host, p);
656 
657  p = argv[4];
658  onsen_port = atoi(p);
659 
660  p = argv[5];
661  accept_port = atoi(p);
662 
663  signal(SIGPIPE, catch_pipe_function);
664  signal(SIGTERM, catch_term_function);
665  signal(SIGINT, catch_int_function);
666  signal(SIGUSR1, catch_usr1_function);
667  signal(SIGUSR2, catch_usr2_function);
668 
669  /* Create a port to accept connections*/
670  sd_acc = MM_init_accept_from_hltout2merger(accept_port);
671  LOG_FPRINTF(stderr, "[INFO] merger_merge: port to accept connections from HLTOUT [%d]\n", sd_acc);
672 
673 
674  /* RoI transmission loop */
675  // RoI packets
676  size_t n_bytes_from_hltout;
677  size_t n_bytes_to_onsen;
678 
679  unsigned char* buf = (unsigned char*)valloc(ROI_MAX_PACKET_SIZE);
680  if (!buf) {
681  ERROR(valloc);
682  exit(1);
683  }
684 
685  // Loop forever for ONSEN connection
686  bool connected = false;
687  while (!connected) {
688  // Connect to ONSEN if not
689  if (need_reconnection_to_onsen) {
690  /* in case of sd_con is connected, disconnect it */
691  if (sd_con != -1) close(sd_con);
692 
693  /* connect to onsen untill connected */
694  for (;;) {
695  int sleep_sec = 2;
696  sd_con = MM_init_connect_to_onsen(onsen_host, onsen_port);
697  // sd_con = 6;
698  if (sd_con != -1) {
699  /* connected: move to the recv->send loop */
700  need_reconnection_to_onsen = 0;
701  event_count = 0;
702  LOG_FPRINTF(stderr, "[INFO] merger_merge: MM_init_connect_to_onsen(): Connected to ONSEN\n");
703  connected = true;
704  break;
705  }
706 
707  ERR_FPRINTF(stderr, "[WARNING] merger_merge: connection to onsen failed: reconnect in %d second(s)\n", sleep_sec);
708  sleep(sleep_sec);
709 
710  /* retry connection */
711  continue;
712  }
713  }
714  }
715 
716  // Preparation for select()
717 
718  // printf("Starting select() loop\n") ;
719  fflush(stderr);
720  fflush(stdout);
721 
722  clear_triggers();
723 
724  fd_set allset;
725  FD_ZERO(&allset);
726  FD_SET(sd_acc, &allset);
727  int maxfd = sd_acc;
728  int minfd = sd_acc;
729  fd_set rset;//, wset;
730 
731  // Handle Obtain ROI and send it to ONSEN
732  while (!stop_running) {
733  memcpy(&rset, &allset, sizeof(rset));
734  // memcpy(&wset, &allset, sizeof(wset));
735 
736  // struct timeval timeout;
737  // timeout.tv_sec = 0; // 1sec
738  // timeout.tv_usec = 1000; // 1msec (in microsec)
739  // printf ( "Select(): maxfd = %d, start select...; rset=%x, wset=%x\n", maxfd, rset, wset);
740  int rc = select(maxfd + 1, &rset, NULL, NULL, NULL);
741  // printf ( "Select(): returned with %d, rset = %8.8x\n", rc, rset );
742  if (got_sigusr1) {
743  got_sigusr1 = false;
744  ERR_FPRINTF(stderr, "[INFO] Got SIGUSR1, Run START\n");
745  print_stat();
746  }
747  if (got_sigusr2) {
748  got_sigusr2 = false;
749  ERR_FPRINTF(stderr, "[INFO] Got SIGUSR2, Run STOP\n");
750  print_stat();
751  }
752  if (got_sigint) {
753  got_sigint = false;
754  ERR_FPRINTF(stderr, "[INFO] Got SIGINT, ABORT\n");
755  print_stat();
756  }
757  if (got_sigpipe) {
758  got_sigpipe = false;
759  ERR_FPRINTF(stderr, "[INFO] Got SIGPIPE\n");
760  }
761  if (got_sigterm) {
762  got_sigterm = false;
763  ERR_FPRINTF(stderr, "[INFO] Got SIGTERM\n");
764  }
765  if (rc < 0) {
766  perror("select");
767  continue;
768  } else if (rc == 0) { // timeout
769  continue;
770  }
771 
772  int t;
773  if (FD_ISSET(sd_acc, &rset)) { // new connection
774  struct sockaddr_in isa;
775  socklen_t i = sizeof(isa);
776  getsockname(sd_acc, (struct sockaddr*)&isa, &i);
777  if ((t =::accept(sd_acc, (struct sockaddr*)&isa, &i)) < 0) {
778  // m_errno = errno;
779  ERR_FPRINTF(stderr, "[ERROR] Error on accepting new connection\n");
780  ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
781  return (-1);
782  }
783  LOG_FPRINTF(stderr, "[INFO] New socket connection t=%d\n", t);
784  char address[INET_ADDRSTRLEN];
785  inet_ntop(AF_INET, &isa.sin_addr, address, sizeof(address));
786  connected_hlts++;
787 
788  LOG_FPRINTF(stderr, "[INFO] %d is IP <%s>\n", t, address);
789  myconn[t] = address;
790  // Assume IP4 and take last decimal as HLT number
791  char* ptr = strrchr(address, '.');
792  if (ptr) {
793  int nr = atoi(ptr + 1);
794  hltused.insert(nr);
795  }
796 
797  fflush(stdout);
798  FD_SET(t, &allset);
799  if (minfd == sd_acc) minfd = t;
800  if (t > maxfd) maxfd = t;
801  continue;
802  } else {
803  for (int fd = minfd; fd < maxfd + 1; fd++) {
804  n_bytes_from_hltout = 0;
805  if (FD_ISSET(fd, &rset)) {
806  // printf ( "fd is available for reading = %d\n", fd );
807  /* recv packet */
808  int ret;
809  ret = MM_get_packet(fd, buf);
810  if (ret < 0) { // -2 will not have a errno set
811  if (ret == -1) {
812  ERR_FPRINTF(stderr, "[ERROR] merger_merge: MM_get_packet()[%d]: %s\n", fd, strerror(errno));
813  } else {
814  ERR_FPRINTF(stderr, "[ERROR] merger_merge: MM_get_packet()[%d]\n", fd);
815  }
816  ERR_FPRINTF(stderr, "[ERROR] Connection from HLT was closed on HLT side (hltout2merge) from %s\n",
817  myconn[fd].c_str());
818  /* connection from HLT is lost */
819  // using exit here will dump ALL still open connections from other HLTs
820  // rethink if we dont exit but process these to limit loss of events.
821  // ERR_FPRINTF(stderr, "%s terminated\n", argv[0]);
822  // exit(1);
823  close(fd);
824  FD_CLR(fd, &allset); //opposite of FD_SET
825  connected_hlts--;
826  if (connected_hlts == 0) stop_running = true;
827  ret = 0;
828  }
829  n_bytes_from_hltout = ret;
830 
831  // printf ( "RoI received : Event count = % d\n", event_count );
832  if (ret > 0) {
833  mycount[fd]++;
834  if (0 /*event_count < 5 || event_count % 100000 == 0*/) {
835  LOG_FPRINTF(stderr, "[INFO] merger_merge: ---- [ %d] received event from ROI transmitter\n", event_count);
836  LOG_FPRINTF(stderr, "[INFO] merger_merge: MM_get_packet() Returned %ld\n", n_bytes_from_hltout);
837  dump_binary(stderr, buf, n_bytes_from_hltout);
838  }
839  }
840  }
841 
842  /* send packet */
843  if (n_bytes_from_hltout > 0) {
844  int ret;
845  unsigned char* ptr_head_to_onsen = buf;
846 
847  // extract trigger number, run+exp nr and fill it in a table.
848 
849  int runnr = 0;
850  int eventnr = 0;
851 
852  // From ROIpayload.h
853  // enum { OFFSET_MAGIC = 0, OFFSET_LENGTH = 1, OFFSET_HEADER = 2, OFFSET_TRIGNR = 3, OFFSET_RUNNR = 4, OFFSET_ROIS = 5};
854  // enum { HEADER_SIZE_WO_LENGTH = 3, HEADER_SIZE_WITH_LENGTH = 5, HEADER_SIZE_WITH_LENGTH_AND_CRC = 6};
855 
856  if (n_bytes_from_hltout >= 6 * 4) {
857  boost::spirit::endian::ubig32_t* iptr = (boost::spirit::endian::ubig32_t*)ptr_head_to_onsen;
858  eventnr = iptr[3];
859  runnr = (iptr[4] & 0x3FFF00) >> 8;
860 // ERR_FPRINTF(stderr, "%08X %08X %08X %08X %08X -> %08X %08X \n",
861 // (unsigned int)iptr[0],(unsigned int) iptr[1],(unsigned int) iptr[2],(unsigned int) iptr[3],(unsigned int) iptr[4], eventnr, runnr);
862  } else {
863  LOG_FPRINTF(stderr, "[ERROR] merger_merge: packet to small to hold useful header (%ld)\n", n_bytes_from_hltout);
864  }
865 
866  if (runnr > current_runnr) {
867  ERR_FPRINTF(stderr, "[WARNING] merger_merge: run number increases: got %d current %d trig %d\n", runnr, current_runnr,
868  eventnr);
869  print_stat();
870  clear_triggers();
871  current_runnr = runnr;
872  } else if (runnr < current_runnr) {
873  // got some event from old run
874  ERR_FPRINTF(stderr, "[WARNING] merger_merge: got trigger from older run: got %d current %d trig %d\n", runnr, current_runnr,
875  eventnr);
876  }
877 
878  if (runnr == current_runnr) {
879  // seperate if, as we might set it in the if above
880  check_event_nr(eventnr);
881  } // if we end the if here, we will send out old events to ONSEN!
882 
883  n_bytes_to_onsen = n_bytes_from_hltout;
884  while (1) {
885  ret = b2_send(sd_con, ptr_head_to_onsen, n_bytes_to_onsen);
886  if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
887  ERR_FPRINTF(stderr, "[WARNING] merger_merge: socket buffer full, retry\n");
888  sleep(1);// Bad hack, wait a second
889  } else break;
890  }
891 
892  if (ret == -1) {
893  ERROR(b2_send);
894  need_reconnection_to_onsen = 1;
895  event_count = 0;
896  ERR_FPRINTF(stderr, "[ERROR] merger_merge: error to send to ONSEN : %s\n", strerror(errno));
897  free(buf);
898  ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
899  print_stat();
900  exit(1);
901  }
902  if (ret == 0) {
903  ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_send(): Connection closed\n");
904  need_reconnection_to_onsen = 1;
905  event_count = 0;
906  free(buf);
907  ERR_FPRINTF(stderr, "[ERROR] Connection to ONSEN was closed on ONSEN side\n");
908  ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
909  print_stat();
910  exit(1);
911  }
912 
913  mycount[0]++;
914  if (0 /*event_count < 5 || event_count % 10000 == 0*/) {
915  LOG_FPRINTF(stderr, "[INFO] merger_merge: ---- [ %d] sent event to ONSEN\n", event_count);
916  dump_binary(stderr, ptr_head_to_onsen, n_bytes_to_onsen);
917  }
918  // } // if we end if here, we will NOT send old events to onsen, but only after we received the first new event
919  }
920  }
921  event_count++;
922  if (event_count % 10000 == 0) {
923  int hltcount = hltused.size();
924  int mod = *triggers.begin() % hltcount;
925  if (triggers.empty()) {
926  // workaround for empty vector, but keep same structure for monitor parsing (kibana)
927  ERR_FPRINTF(stderr, "[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
928  event_number_max, event_number_max, missing_walk_index, triggers.size(),
929  0, event_number_max, mod, hlts[mod]);
930  } else {
931  ERR_FPRINTF(stderr, "[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
932  *triggers.begin(), *(--triggers.end()), missing_walk_index, triggers.size(),
933  *(--triggers.end()) - *triggers.begin(), event_number_max, mod, hlts[mod]);
934  }
935  }
936  }
937  }
938 
939 
940  /* termination: never reached */
941  MM_term_connect_to_onsen(sd_con);
942 
943  if (connected_hlts == 0) {
944  ERR_FPRINTF(stderr, "[RESULT] Stopped because all HLTs closed connection\n");
945  }
946  print_stat();
947  ERR_FPRINTF(stderr, "[RESULT] %s terminated\n", argv[0]);
948  return 0;
949 }
950 
prepareAsicCrosstalkSimDB.e
e
aux.
Definition: prepareAsicCrosstalkSimDB.py:53
main
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:77