Belle II Software development
merger_merge.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8/* merger_merge.cc */
9
10#include <cstdio>
11#include <cstdlib>
12#include <unistd.h>
13#include <cstring>
14#include <poll.h>
15#include <csignal>
16#include <cerrno>
17#include <netinet/in.h>
18#include <map>
19#include <set>
20#include <vector>
21#include <algorithm>
22#include <arpa/inet.h>
23#include <fcntl.h>
24#include <netdb.h>
25#include <string>
26
27#define ROI_MAX_PACKET_SIZE (16384) /* bytes */
28
29#define NETWORK_ESTABLISH_TIMEOUT (-1) /* seconds (0 or negative specifies forever wait) */
30#define NETWORK_IO_TIMEOUT (-1) /* seconds (0 or negative specifies forever wait) */
31
32#include <boost/endian/arithmetic.hpp>
33
34using namespace std;
35
36
/* Logging helpers: both forward directly to fprintf(). */
#define LOG_FPRINTF (fprintf)
#define ERR_FPRINTF (fprintf)
/*
 * Report that the call named `func` failed, together with strerror(errno)
 * and the source location. Wrapped in do { } while (0) so that the macro
 * expands to exactly one statement and stays safe in an un-braced if/else.
 */
#define ERROR(func) do { fprintf(stderr, "[ERROR] %s:%d: " #func "(): %s\n", __FILE__, __LINE__, strerror(errno)); } while (0)
40
// ---- per-connection bookkeeping, keyed by socket file descriptor ----
std::map<int, std::string> myconn{};      // file descriptor -> peer IP string
std::map<int, int> fd_to_hlt{};           // file descriptor -> HLT unit number
std::map<int, unsigned int> mycount{};    // packets per file descriptor; key 0 counts packets sent out
std::set<int> hltused{};                  // sorted set of HLT unit numbers seen so far
std::vector<int> hlts{};                  // copy of hltused as a vector, rebuilt in plot_triggers()
std::map<int, unsigned int> hlt_min{};    // minimum trigger number per file descriptor in the last interval
std::map<int, unsigned int> hlt_max{};    // maximum trigger number per file descriptor in the last interval

// ---- missing-trigger tracking (see check_event_nr()) ----
std::set<int> triggers{};                 // trigger numbers skipped so far and not yet seen
unsigned int event_number_max{0};         // highest trigger number seen since the last clear_triggers()
unsigned int missing_walk_index{0};       // walking index into `triggers`, maintained by check_event_nr()
bool enable_check{true};                  // false disables check_event_nr() calls until re-enabled
53
// Flags raised by the signal handlers below and polled (then reset) by the
// select() loop in main(). They are written from asynchronous signal
// handlers, so they use volatile sig_atomic_t: a plain bool gives no
// guarantee that the main loop ever observes the store.
volatile std::sig_atomic_t got_sigusr1 = 0;
volatile std::sig_atomic_t got_sigusr2 = 0;
volatile std::sig_atomic_t got_sigint = 0;
volatile std::sig_atomic_t got_sigpipe = 0;
volatile std::sig_atomic_t got_sigterm = 0;
59
60
/*
 * Hex-dump a buffer to fp as 32-bit words, eight words per output line.
 *
 * fp   : destination stream
 * ptr  : start of the buffer; read as an array of unsigned int
 * size : buffer length in bytes; a trailing partial word is not printed
 */
void
dump_binary(FILE* fp, const void* ptr, const size_t size)
{
  const unsigned int* words = (const unsigned int*)ptr;
  const size_t n_words = size / sizeof(unsigned int);

  size_t idx = 0;
  while (idx < n_words) {
    fprintf(fp, "%08x ", words[idx]);
    ++idx;
    // line break after every eighth word
    if (idx % 8 == 0) fprintf(fp, "\n");
  }

  // terminate a last, incomplete line
  if (n_words % 8 != 0) fprintf(fp, "\n");
}
74
75
76static void catch_usr1_function(int /*signo*/)
77{
78// puts("SIGUSR1 caught\n");
79 got_sigusr1 = true;
80}
81
82static void catch_usr2_function(int /*signo*/)
83{
84// puts("SIGUSR2 caught\n");
85 got_sigusr2 = true;
86}
87
88static void catch_int_function(int /*signo*/)
89{
90// puts("SIGINT caught\n");
91 got_sigint = true;
92}
93
94static void catch_term_function(int /*signo*/)
95{
96// puts("SIGTERM caught\n");
97 got_sigterm = true;
98}
99
100static void catch_pipe_function(int /*signo*/)
101{
102// puts("SIGPIPE caught\n");
103 got_sigpipe = true;
104}
105
106void clear_triggers(void)
107{
108 triggers.clear();
109 event_number_max = 0;
110 missing_walk_index = 0;
111}
112
113void plot_triggers(void)
114{
115 int hltcount = hltused.size();
116 std::map<int, int> modmissing;
117 hlts.clear();
118 for (auto h : hltused) hlts.push_back(h);
119 if (!triggers.empty()) {
120 ERR_FPRINTF(stderr, "[RESULT] merger_merge: trigger low=%u high=%u missing %lu delta %u max %u\n", *triggers.begin(),
121 *(--triggers.end()),
122 triggers.size(), *(--triggers.end()) - *triggers.begin(), event_number_max);
123 int i = 0;
124 for (auto& it : triggers) {
125 int mod = it % hltcount;
126 modmissing[hlts[mod]]++;
127 if (i < 100) {
128 ERR_FPRINTF(stderr, "[INFO] Miss trig %u (%d) HLT%d\n", it, mod, hlts[mod]);
129 i++;
130 if (i == 100) {
131 ERR_FPRINTF(stderr, "[WARNING] ... too many missing to report\n");
132 i++;
133 }
134 }
135 }
136 } else {
137 ERR_FPRINTF(stderr, "[RESULT] merger_merge: missing triggers 0\n");
138 }
139 for (auto m : modmissing) {
140 ERR_FPRINTF(stderr, "[INFO] merger_merge: missing triggers from HLT%d: %d\n", m.first, m.second);
141 }
142}
143
144void check_event_nr(unsigned int event_number)
145{
146 // this code might not detect missing trigger nr 0
147 // it is assumed, that run number change has been checked and handled before
148 if (event_number_max < event_number) {
149 for (uint32_t e = event_number_max + 1; e < event_number; e ++) {
150 triggers.insert(e);
151 }
152 event_number_max = event_number;
153 } else {
154 // we dont fill event_number in the if above, thus we dont have to remove it
155 triggers.erase(event_number);
156 }
157 if (triggers.size() > missing_walk_index + 1 && event_number_max - *std::next(triggers.begin(), missing_walk_index + 1) > 100000) {
158 missing_walk_index++;
159 } else if (missing_walk_index > 0 && event_number_max - *std::next(triggers.begin(), missing_walk_index) < 100000) {
160 missing_walk_index--;
161 }
162 if (triggers.size() > 50000) {
163 // diable to avoid slow-down by too many in-flight triggers
164 enable_check = false;
165 ERR_FPRINTF(stderr, "[ERROR] Too many in-flight triggers -> disable checking until next run\n");
166 }
167}
168
169int
170b2_timed_blocking_io(const int sd, const int timeout /* secs */)
171{
172 int ret;
173
174 if (timeout > 0) {
175 struct timeval tv;
176
177 tv.tv_sec = timeout;
178 tv.tv_usec = 0;
179 ret = setsockopt(sd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(struct timeval));
180 if (ret == -1) {
181 ERROR(setsockopt);
182 return -1;
183 }
184
185 tv.tv_sec = timeout;
186 tv.tv_usec = 0;
187 ret = setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval));
188 if (ret == -1) {
189 ERROR(setsockopt);
190 return -1;
191 }
192 } else if (timeout == 0) {
193 ret = fcntl(sd, F_SETFL, O_NDELAY);
194 if (ret == -1) {
195 ERROR(fcntl);
196 return -1;
197 }
198 } else if (timeout < 0) {
199 ret = fcntl(sd, F_GETFL, O_NDELAY);
200 if (ret == -1) {
201 ERROR(fcntl);
202 return -1;
203 }
204 ret &= ~O_NDELAY;
205 ret = fcntl(sd, F_SETFL, ret);
206 if (ret == -1) {
207 ERROR(fcntl);
208 return -1;
209 }
210 }
211
212 return 0;
213}
214
215
/*
 * Fill *in with an IPv4 socket address for hostname:port.
 *
 * hostname : host name or dotted IPv4 string, resolved with gethostbyname()
 * port     : port in host byte order
 * in       : output; zeroed first, then filled
 *
 * Returns 0 on success, -1 if the name cannot be resolved to an IPv4 address.
 */
int
b2_build_sockaddr_in(const char* hostname, const unsigned short port, struct sockaddr_in* in)
{
  memset(in, 0, sizeof(struct sockaddr_in));
  in->sin_family = AF_INET;

  const struct hostent* hoste = gethostbyname(hostname);
  if (!hoste) {
    // gethostbyname() reports failures through h_errno, not errno
    fprintf(stderr, "[ERROR] %s:%d: gethostbyname(): %s\n", __FILE__, __LINE__, hstrerror(h_errno));
    return -1;
  }
  // Only copy what really is an IPv4 address of the expected size.
  if (hoste->h_addrtype != AF_INET || hoste->h_length != (int)sizeof(in->sin_addr) || !hoste->h_addr_list[0]) {
    fprintf(stderr, "[ERROR] %s:%d: gethostbyname(): no IPv4 address for host\n", __FILE__, __LINE__);
    return -1;
  }
  memcpy(&in->sin_addr, hoste->h_addr_list[0], sizeof(in->sin_addr));

  in->sin_port = htons(port);

  return 0;
}
235
236
237static int
238b2_create_tcp_socket(void)
239{
240 int sd, ret, one = 1;
241
242 sd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
243 if (sd == -1) {
244 ERROR(socket);
245 return -1;
246 }
247
248#if 0
249 ret = b2_timed_blocking_io(sd, 0);
250 if (ret == -1) {
251 ERROR(b2_timed_blocking_io);
252 return -1;
253 }
254#endif
255
256 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char*)&one, sizeof(int));
257 if (ret == -1) {
258 ERROR(setsockopt);
259 return -1;
260 }
261
262 return sd;
263}
264
265
266int /* returns socket descriptor */
267b2_create_accept_socket(const unsigned short port) /* in reality DOES NOT accept */
268{
269 int sd, ret;
270 struct sockaddr_in in;
271
272 sd = b2_create_tcp_socket();
273 if (sd < 0) {
274 ERROR(b2_create_tcp_socket);
275 return -1;
276 }
277
278 ret = b2_build_sockaddr_in("0.0.0.0", port, &in);
279 if (ret == -1) {
280 ERROR(b2_build_sockaddr_in);
281 return -1;
282 }
283
284 ret = bind(sd, (const struct sockaddr*)&in, sizeof(struct sockaddr_in));
285 if (ret == -1) {
286 ERROR(bind);
287 return -1;
288 }
289
290 ret = listen(sd, 1);
291 if (ret == -1) {
292 ERROR(listen);
293 return -1;
294 }
295
296 return sd;
297}
298
299
300int /* returns socket descriptor */
301b2_create_connect_socket(const char* hostname, const unsigned short port)
302{
303 int sd, ret;
304 struct sockaddr_in in;
305
306 sd = b2_create_tcp_socket();
307 if (sd < 0) {
308 ERROR(b2_create_tcp_socket);
309 return -1;
310 }
311
312 ret = b2_build_sockaddr_in(hostname, port, &in);
313 if (ret == -1) {
314 ERROR(b2_build_sockaddr_in);
315 return -1;
316 }
317
318 ret = connect(sd, (const struct sockaddr*)&in, sizeof(struct sockaddr_in));
319 if (ret == -1 && errno != EINPROGRESS) {
320 ERROR(connect);
321 return -1;
322 }
323
324 return sd;
325}
326
327
328int
329b2_send(const int sd, const void* buf, const size_t size)
330{
331 unsigned char* ptr = (unsigned char*)buf;
332 size_t n_bytes_remained = size;
333
334 for (;;) {
335 int ret, n_bytes_send;
336
337 ret = send(sd, ptr, n_bytes_remained, 0);
338 if (ret == -1 && errno != EINTR) {
339 ERROR(send);
340 return -1;
341 }
342 if (ret == -1 && errno == EINTR) {
343 fprintf(stderr, "%s:%d: recv(): Packet send timed out\n", __FILE__, __LINE__);
344 return -1;
345 }
346 if (ret == 0) {
347 fprintf(stderr, "%s:%d: send(): Connection closed\n", __FILE__, __LINE__);
348 return -1;
349 }
350
351 n_bytes_send = ret;
352 ptr += n_bytes_send;
353
354 if (n_bytes_remained < size_t(n_bytes_send))
355 /* overrun: internal error */
356 {
357 fprintf(stderr, "%s:%d: send(): Internal error\n", __FILE__, __LINE__);
358 return -1;
359 }
360 n_bytes_remained -= n_bytes_send;
361
362 if (n_bytes_remained == 0)
363 /* fully sendout */
364 {
365 break;
366 }
367 }
368
369 return size;
370}
371
372
373int
374b2_recv(const int sd, void* buf, const size_t size)
375{
376 unsigned char* ptr = (unsigned char*)buf;
377 size_t n_bytes_remained = size;
378
379 for (;;) {
380 int ret, n_bytes_recv;
381
382 ret = recv(sd, ptr, n_bytes_remained, 0);
383 if (ret == -1 && (errno != EINTR && errno != EWOULDBLOCK)) {
384 ERROR(recv);
385 return -1;
386 }
387 if (ret == -1 && (errno == EINTR || errno == EWOULDBLOCK)) {
388 fprintf(stderr, "%s:%d: recv(): Packet receive timed out\n", __FILE__, __LINE__);
389 return -1;
390 }
391 if (ret == 0) {
392 fprintf(stderr, "%s:%d: recv(): Connection closed\n", __FILE__, __LINE__);
393 return -1;
394 }
395
396 n_bytes_recv = ret;
397 ptr += n_bytes_recv;
398 if (n_bytes_remained < size_t(n_bytes_recv))
399 /* overrun: internal error */
400 {
401 fprintf(stderr, "%s:%d: recv(): Internal error\n", __FILE__, __LINE__);
402 return -1;
403 }
404 n_bytes_remained -= n_bytes_recv;
405
406 if (n_bytes_remained == 0)
407 /* fully readout */
408 {
409 break;
410 }
411 }
412
413 return size;
414}
415
416
417
418static int
419MM_init_connect_to_onsen(const char* host, const unsigned int port)
420{
421 int sd, ret;
422 struct pollfd fds;
423
424 sd = b2_create_connect_socket(host, port);
425 if (sd == -1) {
426 ERROR(b2_create_connect_socket);
427 return -1;
428 }
429
430 fds.fd = sd;
431 fds.events = POLLOUT;
432 fds.revents = 0;
433 ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
434 switch (ret) {
435 case -1:
436 ERROR(poll);
437 return -1;
438
439 case 0:
440 ERR_FPRINTF(stderr, "[ERROR] merger_merge: connect() to ONSEN: Connection timed out\n");
441 return -1;
442
443 case 1: {
444 int connection_error;
445 socklen_t optlen;
446
447 optlen = sizeof(connection_error);
448 ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
449 if (ret == -1) {
450 ERROR(getsockopt);
451 return -1;
452 }
453 if (connection_error) {
454 ERR_FPRINTF(stderr, "[ERROR] merger_merge: connect() to ONSEN: %s\n", strerror(errno));
455 return -1;
456 }
457
458 break;
459 }
460
461 default:
462 ERR_FPRINTF(stderr, "[ERROR] merger_merge: poll() connect to ONSEN: Unexpected error\n");
463 return -1;
464 }
465
466 ret = b2_timed_blocking_io(sd, NETWORK_IO_TIMEOUT /* secs */);
467 if (ret == -1) {
468 ERROR(b2_timed_blocking_io);
469 return -1;
470 }
471
472 return sd;
473}
474
475
476static int
477MM_init_accept_from_hltout2merger(const unsigned int port)
478{
479 int sd;
480 int one = 1, ret;
481 // struct pollfd fds;
482
483 LOG_FPRINTF(stderr, "[INFO] merger_merge: Waiting for connection from hltout2merger on port %d\n", port);
484
485 sd = b2_create_accept_socket(port);
486 if (sd == -1) {
487 ERROR(b2_create_accept_socket);
488 return -1;
489 }
490
491 ret = b2_timed_blocking_io(sd,
492 1); // This means, if the socket blocks longer than Xs, it will return a EAGAIN or EWOULDBLOCK (immediately)
493 if (ret == -1) {
494 ERROR(b2_timed_blocking_io);
495 return -1;
496 }
497
498 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char*)&one, sizeof(int));
499 if (ret == -1) {
500 ERROR(setsockopt);
501 return -1;
502 }
503
504 /*
505 fds.fd = sd;
506 fds.events = POLLIN;
507 fds.revents = 0;
508 ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
509 switch (ret) {
510 case -1:
511 ERROR(poll);
512 return -1;
513
514 case 0:
515 ERR_FPRINTF(stderr, "merger_merge: accept(): Connection timed out\n");
516 return -1;
517
518 case 1: {
519 int ret, connection_error;
520 socklen_t optlen;
521
522 optlen = sizeof(connection_error);
523 ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
524 if (ret == -1) {
525 ERROR(getsockopt);
526 return -1;
527 }
528 if (connection_error) {
529 ERR_FPRINTF(stderr, "merger_merge: accept(): %s\n", strerror(errno));
530 return -1;
531 }
532
533 break;
534 }
535
536 default:
537 ERR_FPRINTF(stderr, "merger_merge: poll(): Unexpected error\n");
538 return -1;
539 }
540 */
541
542 /* Skip accept
543 nd = accept(sd, NULL, NULL);
544 if (nd == -1) {
545 ERROR(accept);
546 return -1;
547 }
548
549 close(sd);
550
551 ret = b2_timed_blocking_io(nd, NETWORK_IO_TIMEOUT / * secs * /);
552 if (ret == -1) {
553 ERROR(b2_timed_blocking_io);
554 return -1;
555 }
556 */
557
558 return sd;
559 // return nd;
560}
561
562
563static int
564MM_get_packet(const int sd_acc, unsigned char* buf)
565{
566 unsigned int header[2] = {}; // length is second word, thus read two
567
568 int ret = recv(sd_acc, &header, sizeof(unsigned int) * 2, MSG_PEEK);
569 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
570 ERR_FPRINTF(stderr, "[ERROR] merger_merge: recv(): Packet receive timed out\n");
571 return -1;
572 }
573 if (ret != 2 * sizeof(unsigned int)) {
574 ERR_FPRINTF(stderr, "[ERROR] merger_merge: recv(): Unexpected return value (%d)\n", ret);
575 return -2;
576 }
577
580
581 size_t n_bytes_from_hltout = 2 * sizeof(unsigned int) + ntohl(header[1]);// OFFSET_LENGTH = 1
582
583 ret = b2_recv(sd_acc, buf, n_bytes_from_hltout);
584 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
585 ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_recv(): Packet receive timed out\n");
586 return -1;
587 }
588 if (size_t(ret) != n_bytes_from_hltout) {
589 ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_recv(): Unexpected return value (%d)\n", ret);
590 return -2;
591 }
592
593 return ret;
594}
595
596
/*
 * Close the connection to ONSEN.
 * Returns the result of close(): 0 on success, -1 on error.
 */
static int
MM_term_connect_to_onsen(const int sd)
{
  const int rc = close(sd);
  return rc;
}
602
603void print_stat(void)
604{
605 ERR_FPRINTF(stderr, "[INFO] --- STAT START ---\n");
606 unsigned int sum = 0;
607 for (auto& it : mycount) {
608 ERR_FPRINTF(stderr, "[INFO] (%s): %d\n", myconn[it.first].c_str(), it.second);
609 if (it.first != 0) sum += it.second;
610 }
611 ERR_FPRINTF(stderr, "[INFO] sum %u out %u diff %d\n", sum, mycount[0], (int)(mycount[0] - sum));
612 plot_triggers();
613 ERR_FPRINTF(stderr, "[INFO] --- STAT END ---\n");
614}
615
616// Main
617
618int
619main(int argc, char* argv[])
620{
621 int current_runnr = -1; // problem: handover without abort
622 // int n_hltout = 0;
623 int sd_acc = -1;
624 int sd_con = -1;
625 int need_reconnection_to_onsen = 1;
626 int event_count = 0;
627 int connected_hlts = 0;
628 bool stop_running = false;
629
630 char onsen_host[1024];
631 unsigned short onsen_port;
632 /* PC test */
633 /* strcpy(onsen_host, "10.10.10.1"); */
634 /* onsen_port = 1024; */
635 /* real ONSEN */
636 /* strcpy(onsen_host, "10.10.10.80"); */
637 /* onsen_port = 24; */
638 // unsigned short accept_port[MM_MAX_HLTOUT];
639 unsigned short accept_port;
640
641 LOG_FPRINTF(stderr, "[INFO] merger_merge: Process invoked [ver(%s %s)]\n", __DATE__, __TIME__);
642
643 if (argc < 4) {
644 ERR_FPRINTF(stderr, "[ERROR] merger_merge: Usage: merger_merge onsen-host onsen-port client-port#1[:client-port#2[:...]]\n");
645 exit(1);
646 }
647
648 /* argv copy */
649 char* p;
650
651// p = argv[1]; // we do not want to change parameters, yet, even removing unused
652// strcpy(shmname, p);
653//
654// p = argv[2];
655// shmid = atoi(p);
656
657 p = argv[3];
658 strcpy(onsen_host, p);
659
660 p = argv[4];
661 onsen_port = atoi(p);
662
663 p = argv[5];
664 accept_port = atoi(p);
665
666 signal(SIGPIPE, catch_pipe_function);
667 signal(SIGTERM, catch_term_function);
668 signal(SIGINT, catch_int_function);
669 signal(SIGUSR1, catch_usr1_function);
670 signal(SIGUSR2, catch_usr2_function);
671
672 /* Create a port to accept connections*/
673 sd_acc = MM_init_accept_from_hltout2merger(accept_port);
674 LOG_FPRINTF(stderr, "[INFO] merger_merge: port to accept connections from HLTOUT [%d]\n", sd_acc);
675
676
677 /* RoI transmission loop */
678 // RoI packets
679 size_t n_bytes_from_hltout;
680 size_t n_bytes_to_onsen;
681
682 unsigned char* buf = (unsigned char*)valloc(ROI_MAX_PACKET_SIZE);
683 if (!buf) {
684 ERROR(valloc);
685 exit(1);
686 }
687
688 // Loop forever for ONSEN connection
689 bool connected = false;
690 while (!connected) {
691 // Connect to ONSEN if not
692 if (need_reconnection_to_onsen) {
693 /* in case of sd_con is connected, disconnect it */
694 if (sd_con != -1) close(sd_con);
695
696 /* connect to onsen untill connected */
697 for (;;) {
698 int sleep_sec = 2;
699 sd_con = MM_init_connect_to_onsen(onsen_host, onsen_port);
700 // sd_con = 6;
701 if (sd_con != -1) {
702 /* connected: move to the recv->send loop */
703 need_reconnection_to_onsen = 0;
704 event_count = 0;
705 LOG_FPRINTF(stderr, "[INFO] merger_merge: MM_init_connect_to_onsen(): Connected to ONSEN\n");
706 connected = true;
707 break;
708 }
709
710 ERR_FPRINTF(stderr, "[WARNING] merger_merge: connection to onsen failed: reconnect in %d second(s)\n", sleep_sec);
711 sleep(sleep_sec);
712
713 /* retry connection */
714 continue;
715 }
716 }
717 }
718
719 // Preparation for select()
720
721 // printf("Starting select() loop\n") ;
722 fflush(stderr);
723 fflush(stdout);
724
725 clear_triggers();
726
727 fd_set allset;
728 FD_ZERO(&allset);
729 FD_SET(sd_acc, &allset);
730 int maxfd = sd_acc;
731 int minfd = sd_acc;
732 fd_set rset;//, wset;
733
734 // enable the checking of missing triggers
735 enable_check = true;
736 // Handle Obtain ROI and send it to ONSEN
737 while (!stop_running) {
738 memcpy(&rset, &allset, sizeof(rset));
739 // memcpy(&wset, &allset, sizeof(wset));
740
741 // struct timeval timeout;
742 // timeout.tv_sec = 0; // 1sec
743 // timeout.tv_usec = 1000; // 1msec (in microsec)
744 // printf ( "Select(): maxfd = %d, start select...; rset=%x, wset=%x\n", maxfd, rset, wset);
745 int rc = select(maxfd + 1, &rset, NULL, NULL, NULL);
746 // printf ( "Select(): returned with %d, rset = %8.8x\n", rc, rset );
747 if (got_sigusr1) {
748 got_sigusr1 = false;
749 ERR_FPRINTF(stderr, "[INFO] Got SIGUSR1, Run START\n");
750 print_stat();
751 hlt_min.clear(); // reset on run start
752 hlt_max.clear();
753 }
754 if (got_sigusr2) {
755 got_sigusr2 = false;
756 ERR_FPRINTF(stderr, "[INFO] Got SIGUSR2, Run STOP\n");
757 print_stat();
758 }
759 if (got_sigint) {
760 got_sigint = false;
761 ERR_FPRINTF(stderr, "[INFO] Got SIGINT, ABORT\n");
762 print_stat();
763 }
764 if (got_sigpipe) {
765 got_sigpipe = false;
766 ERR_FPRINTF(stderr, "[INFO] Got SIGPIPE\n");
767 }
768 if (got_sigterm) {
769 got_sigterm = false;
770 ERR_FPRINTF(stderr, "[INFO] Got SIGTERM\n");
771 }
772 if (rc < 0) {
773 perror("select");
774 continue;
775 } else if (rc == 0) { // timeout
776 continue;
777 }
778
779 if (FD_ISSET(sd_acc, &rset)) { // new connection
780 int t;
781 struct sockaddr_in isa;
782 socklen_t i = sizeof(isa);
783 getsockname(sd_acc, (struct sockaddr*)&isa, &i);
784 if ((t =::accept(sd_acc, (struct sockaddr*)&isa, &i)) < 0) {
785 // m_errno = errno;
786 ERR_FPRINTF(stderr, "[ERROR] Error on accepting new connection\n");
787 ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
788 return (-1);
789 }
790 LOG_FPRINTF(stderr, "[INFO] New socket connection t=%d\n", t);
791 char address[INET_ADDRSTRLEN];
792 inet_ntop(AF_INET, &isa.sin_addr, address, sizeof(address));
793 connected_hlts++;
794
795 LOG_FPRINTF(stderr, "[INFO] %d is IP <%s>\n", t, address);
796 myconn[t] = address;
797 // Assume IP4 and take last decimal as HLT number
798 char* ptr = strrchr(address, '.');
799 if (ptr) {
800 int nr = atoi(ptr + 1);
801 hltused.insert(nr);
802 fd_to_hlt[t] = nr;
803 }
804
805 fflush(stdout);
806 FD_SET(t, &allset);
807 if (minfd == sd_acc) minfd = t;
808 if (t > maxfd) maxfd = t;
809 continue;
810 } else {
811 for (int fd = minfd; fd < maxfd + 1; fd++) {
812 n_bytes_from_hltout = 0;
813 if (FD_ISSET(fd, &rset)) {
814 // printf ( "fd is available for reading = %d\n", fd );
815 /* recv packet */
816 int ret;
817 ret = MM_get_packet(fd, buf);
818 if (ret < 0) { // -2 will not have a errno set
819 if (ret == -1) {
820 ERR_FPRINTF(stderr, "[ERROR] merger_merge: MM_get_packet()[%d]: %s\n", fd, strerror(errno));
821 } else {
822 ERR_FPRINTF(stderr, "[ERROR] merger_merge: MM_get_packet()[%d]\n", fd);
823 }
824 ERR_FPRINTF(stderr, "[ERROR] Connection from HLT was closed on HLT side (hltout2merge) from %s\n",
825 myconn[fd].c_str());
826 /* connection from HLT is lost */
827 // using exit here will dump ALL still open connections from other HLTs
828 // rethink if we dont exit but process these to limit loss of events.
829 // ERR_FPRINTF(stderr, "%s terminated\n", argv[0]);
830 // exit(1);
831 close(fd);
832 FD_CLR(fd, &allset); //opposite of FD_SET
833 connected_hlts--;
834 if (connected_hlts == 0) stop_running = true;
835 ret = 0;
836 }
837 n_bytes_from_hltout = ret;
838
839 // printf ( "RoI received : Event count = % d\n", event_count );
840 if (ret > 0) {
841 mycount[fd]++;
842 if (0 /*event_count < 5 || event_count % 100000 == 0*/) {
843 LOG_FPRINTF(stderr, "[INFO] merger_merge: ---- [ %d] received event from ROI transmitter\n", event_count);
844 LOG_FPRINTF(stderr, "[INFO] merger_merge: MM_get_packet() Returned %ld\n", n_bytes_from_hltout);
845 dump_binary(stderr, buf, n_bytes_from_hltout);
846 }
847 }
848 }
849
850 /* send packet */
851 if (n_bytes_from_hltout > 0) {
852 int ret;
853 unsigned char* ptr_head_to_onsen = buf;
854
855 // extract trigger number, run+exp nr and fill it in a table.
856
857 int runnr = 0;
858 unsigned int eventnr = 0;
859
860 // From ROIpayload.h
861 // enum { OFFSET_MAGIC = 0, OFFSET_LENGTH = 1, OFFSET_HEADER = 2, OFFSET_TRIGNR = 3, OFFSET_RUNNR = 4, OFFSET_ROIS = 5};
862 // enum { HEADER_SIZE_WO_LENGTH = 3, HEADER_SIZE_WITH_LENGTH = 5, HEADER_SIZE_WITH_LENGTH_AND_CRC = 6};
863
864 if (n_bytes_from_hltout >= 6 * 4) {
865 auto* iptr = (boost::endian::big_uint32_t*)ptr_head_to_onsen;
866 eventnr = iptr[3];
867 runnr = (iptr[4] & 0x3FFF00) >> 8;
868// ERR_FPRINTF(stderr, "%08X %08X %08X %08X %08X -> %08X %08X \n",
869// (unsigned int)iptr[0],(unsigned int) iptr[1],(unsigned int) iptr[2],(unsigned int) iptr[3],(unsigned int) iptr[4], eventnr, runnr);
870 } else {
871 LOG_FPRINTF(stderr, "[ERROR] merger_merge: packet to small to hold useful header (%ld)\n", n_bytes_from_hltout);
872 }
873
874 if (runnr > current_runnr) {
875 ERR_FPRINTF(stderr, "[WARNING] merger_merge: run number increases: got %d current %d trig %u\n", runnr, current_runnr,
876 eventnr);
877 print_stat();
878 clear_triggers();
879 current_runnr = runnr;
880 enable_check = true;
881 } else if (runnr < current_runnr) {
882 // got some event from old run
883 ERR_FPRINTF(stderr, "[WARNING] merger_merge: got trigger from older run: got %d current %d trig %u\n", runnr, current_runnr,
884 eventnr);
885 }
886
887 if (runnr == current_runnr) {
888 // seperate if, as we might set it in the if above
889 if (enable_check) check_event_nr(eventnr);
890 if (hlt_min[fd] == 0 or hlt_min[fd] > eventnr) hlt_min[fd] = eventnr;
891 if (hlt_max[fd] < eventnr) hlt_max[fd] = eventnr;
892 } // if we end the if here, we will send out old events to ONSEN!
893
894 n_bytes_to_onsen = n_bytes_from_hltout;
895 while (1) {
896 ret = b2_send(sd_con, ptr_head_to_onsen, n_bytes_to_onsen);
897 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
898 ERR_FPRINTF(stderr, "[WARNING] merger_merge: socket buffer full, retry\n");
899 sleep(1);// Bad hack, wait a second
900 } else break;
901 }
902
903 if (ret == -1) {
904 ERROR(b2_send);
905 need_reconnection_to_onsen = 1;
906 event_count = 0;
907 ERR_FPRINTF(stderr, "[ERROR] merger_merge: error to send to ONSEN : %s\n", strerror(errno));
908 free(buf);
909 ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
910 print_stat();
911 exit(1);
912 }
913 if (ret == 0) {
914 ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_send(): Connection closed\n");
915 need_reconnection_to_onsen = 1;
916 event_count = 0;
917 free(buf);
918 ERR_FPRINTF(stderr, "[ERROR] Connection to ONSEN was closed on ONSEN side\n");
919 ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
920 print_stat();
921 exit(1);
922 }
923
924 mycount[0]++;
925 if (0 /*event_count < 5 || event_count % 10000 == 0*/) {
926 LOG_FPRINTF(stderr, "[INFO] merger_merge: ---- [ %d] sent event to ONSEN\n", event_count);
927 dump_binary(stderr, ptr_head_to_onsen, n_bytes_to_onsen);
928 }
929 // } // if we end if here, we will NOT send old events to onsen, but only after we received the first new event
930 }
931 }
932 event_count++;
933 if (event_count % 10000 == 0) {
934 int hltcount = hltused.size();
935 if (triggers.empty()) {
936 // workaround for empty vector, but keep same structure for monitor parsing (kibana)
937 ERR_FPRINTF(stderr, "[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
938 event_number_max, event_number_max, missing_walk_index, triggers.size(),
939 0, event_number_max, -1, -1);
940 } else {
941 int mod = *triggers.begin() % hltcount;
942 ERR_FPRINTF(stderr, "[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
943 *triggers.begin(), *(--triggers.end()), missing_walk_index, triggers.size(),
944 *(--triggers.end()) - *triggers.begin(), event_number_max, mod, hlts[mod]);
945 }
946
947 auto minIt = std::ranges::min_element(hlt_min.begin(), hlt_min.end(),
948 [](const auto & a, const auto & b) {
949 return a.second < b.second;
950 });
951
952 // Find the element with the maximum value
953 auto maxIt = std::ranges::max_element(hlt_max.begin(), hlt_max.end(),
954 [](const auto & a, const auto & b) {
955 return a.second < b.second;
956 });
957
958 ERR_FPRINTF(stderr, "[INFO] ALL, %u, %u, %u\n", minIt->second, maxIt->second, maxIt->second - minIt->second);
959 for (auto& h : hlt_max) ERR_FPRINTF(stderr, "[INFO] HLT%u, %u, %u, %u\n", fd_to_hlt[h.first], hlt_min[h.first], h.second,
960 h.second - hlt_min[h.first]);
961 hlt_min.clear();
962 hlt_max.clear();
963 }
964 }
965 }
966
967
968 /* termination: never reached */
969 MM_term_connect_to_onsen(sd_con);
970
971 if (connected_hlts == 0) {
972 ERR_FPRINTF(stderr, "[RESULT] Stopped because all HLTs closed connection\n");
973 }
974 print_stat();
975 ERR_FPRINTF(stderr, "[RESULT] %s terminated\n", argv[0]);
976 return 0;
977}
STL namespace.