Belle II Software development
merger_merge.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8/* merger_merge.cc */
9
10#include <cstdio>
11#include <cstdlib>
12#include <unistd.h>
13#include <cstring>
14#include <poll.h>
15#include <csignal>
16#include <cerrno>
17#include <netinet/in.h>
18#include <map>
19#include <set>
20#include <vector>
21#include <arpa/inet.h>
22#include <fcntl.h>
23#include <netdb.h>
24#include <string>
25
// Largest RoI packet (header included) this process handles, in bytes; it is
// also the size of the receive buffer allocated in main().
constexpr int ROI_MAX_PACKET_SIZE = 16384;

// Network timeouts in seconds. A negative value means "wait forever"
// (poll() with a negative timeout / a plain blocking socket).
// NOTE(review): 0 does NOT wait forever despite what older comments said:
// poll() returns immediately and b2_timed_blocking_io(sd, 0) makes the
// socket non-blocking.
constexpr int NETWORK_ESTABLISH_TIMEOUT = -1;
constexpr int NETWORK_IO_TIMEOUT = -1;
30
31#include <boost/endian/arithmetic.hpp>
32
33using namespace std;
34
35
// Logging helpers; both currently map straight to fprintf.
#define LOG_FPRINTF (fprintf)
#define ERR_FPRINTF (fprintf)
// Print "[ERROR] <file>:<line>: <func>(): <strerror(errno)>" to stderr.
// The do { } while (0) wrapper makes "ERROR(f);" a single statement, so it is
// safe inside an unbraced if/else (a bare { } block followed by ';' is not).
#define ERROR(func) do { fprintf(stderr, "[ERROR] %s:%d: "#func"(): %s\n", __FILE__, __LINE__, strerror(errno)); } while (0)
39
// Accepted HLT socket descriptor -> peer IPv4 address (for log messages).
std::map<int, std::string> myconn;
// Packet counters: key = HLT socket descriptor (received), key 0 = packets sent to ONSEN.
std::map<int, unsigned int> mycount;
std::set<int> hltused; // we want a sorted list
std::vector<int> hlts; // initialized on run START

// Outstanding (not yet received) trigger numbers of the current run. Trigger
// numbers are unsigned 32-bit values, so the set uses the same type: with
// std::set<int>, numbers >= 2^31 would sort before small ones and produce
// negative indices in the "trigger % number_of_hlts" mapping.
std::set<unsigned int> triggers;
unsigned int event_number_max = 0;
unsigned int missing_walk_index = 0;
bool enable_check = true;

// Set asynchronously by the signal handlers and polled by the main loop.
// volatile sig_atomic_t is the only object type a handler may portably write.
volatile sig_atomic_t got_sigusr1 = false;
volatile sig_atomic_t got_sigusr2 = false;
volatile sig_atomic_t got_sigint = false;
volatile sig_atomic_t got_sigpipe = false;
volatile sig_atomic_t got_sigterm = false;
55
56
/*
 * Hex-dump a buffer as 32-bit words ("%08x ", host byte order), eight words per
 * line. Trailing bytes that do not fill a whole word are not printed. A final
 * newline is emitted only if the last line is incomplete.
 */
void
dump_binary(FILE* fp, const void* ptr, const size_t size)
{
  const unsigned int* const words = static_cast<const unsigned int*>(ptr);
  const size_t n_words = size / sizeof(*words);

  size_t idx = 0;
  while (idx < n_words) {
    fprintf(fp, "%08x ", words[idx]);
    ++idx;
    if (idx % 8 == 0) {
      fprintf(fp, "\n");
    }
  }
  if (n_words % 8 != 0) {
    fprintf(fp, "\n");
  }
}
70
71
72static void catch_usr1_function(int /*signo*/)
73{
74// puts("SIGUSR1 caught\n");
75 got_sigusr1 = true;
76}
77
78static void catch_usr2_function(int /*signo*/)
79{
80// puts("SIGUSR2 caught\n");
81 got_sigusr2 = true;
82}
83
84static void catch_int_function(int /*signo*/)
85{
86// puts("SIGINT caught\n");
87 got_sigint = true;
88}
89
90static void catch_term_function(int /*signo*/)
91{
92// puts("SIGTERM caught\n");
93 got_sigterm = true;
94}
95
96static void catch_pipe_function(int /*signo*/)
97{
98// puts("SIGPIPE caught\n");
99 got_sigpipe = true;
100}
101
102void clear_triggers(void)
103{
104 triggers.clear();
105 event_number_max = 0;
106 missing_walk_index = 0;
107}
108
109void plot_triggers(void)
110{
111 int hltcount = hltused.size();
112 std::map<int, int> modmissing;
113 hlts.clear();
114 for (auto h : hltused) hlts.push_back(h);
115 if (!triggers.empty()) {
116 ERR_FPRINTF(stderr, "[RESULT] merger_merge: trigger low=%u high=%u missing %lu delta %u max %u\n", *triggers.begin(),
117 *(--triggers.end()),
118 triggers.size(), *(--triggers.end()) - *triggers.begin(), event_number_max);
119 int i = 0;
120 for (auto& it : triggers) {
121 int mod = it % hltcount;
122 modmissing[hlts[mod]]++;
123 if (i < 100) {
124 ERR_FPRINTF(stderr, "[INFO] Miss trig %u (%d) HLT%d\n", it, mod, hlts[mod]);
125 i++;
126 if (i == 100) {
127 ERR_FPRINTF(stderr, "[WARNING] ... too many missing to report\n");
128 i++;
129 }
130 }
131 }
132 } else {
133 ERR_FPRINTF(stderr, "[RESULT] merger_merge: missing triggers 0\n");
134 }
135 for (auto m : modmissing) {
136 ERR_FPRINTF(stderr, "[INFO] merger_merge: missing triggers from HLT%d: %d\n", m.first, m.second);
137 }
138}
139
140void check_event_nr(unsigned int event_number)
141{
142 // this code might not detect missing trigger nr 0
143 // it is assumed, that run number change has been checked and handled before
144 if (event_number_max < event_number) {
145 for (uint32_t e = event_number_max + 1; e < event_number; e ++) {
146 triggers.insert(e);
147 }
148 event_number_max = event_number;
149 } else {
150 // we dont fill event_number in the if above, thus we dont have to remove it
151 triggers.erase(event_number);
152 }
153 if (triggers.size() > missing_walk_index + 1 && event_number_max - *std::next(triggers.begin(), missing_walk_index + 1) > 100000) {
154 missing_walk_index++;
155 } else if (missing_walk_index > 0 && event_number_max - *std::next(triggers.begin(), missing_walk_index) < 100000) {
156 missing_walk_index--;
157 }
158 if (triggers.size() > 50000) {
159 // diable to avoid slow-down by too many in-flight triggers
160 enable_check = false;
161 ERR_FPRINTF(stderr, "[ERROR] Too many in-flight triggers -> disable checking until next run\n");
162 }
163}
164
165int
166b2_timed_blocking_io(const int sd, const int timeout /* secs */)
167{
168 int ret;
169
170 if (timeout > 0) {
171 struct timeval tv;
172
173 tv.tv_sec = timeout;
174 tv.tv_usec = 0;
175 ret = setsockopt(sd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(struct timeval));
176 if (ret == -1) {
177 ERROR(setsockopt);
178 return -1;
179 }
180
181 tv.tv_sec = timeout;
182 tv.tv_usec = 0;
183 ret = setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval));
184 if (ret == -1) {
185 ERROR(setsockopt);
186 return -1;
187 }
188 } else if (timeout == 0) {
189 ret = fcntl(sd, F_SETFL, O_NDELAY);
190 if (ret == -1) {
191 ERROR(fcntl);
192 return -1;
193 }
194 } else if (timeout < 0) {
195 ret = fcntl(sd, F_GETFL, O_NDELAY);
196 if (ret == -1) {
197 ERROR(fcntl);
198 return -1;
199 }
200 ret &= ~O_NDELAY;
201 ret = fcntl(sd, F_SETFL, ret);
202 if (ret == -1) {
203 ERROR(fcntl);
204 return -1;
205 }
206 }
207
208 return 0;
209}
210
211
/*
 * Fill an IPv4 socket address for `hostname`:`port`.
 *
 * Uses getaddrinfo(), which is reentrant and reports its own error codes.
 * gethostbyname() does not set errno, so the previous strerror(errno)
 * diagnostic was meaningless.
 *
 * @param hostname  host name or dotted IPv4 address (e.g. "0.0.0.0")
 * @param port      port in host byte order
 * @param in        output address (zeroed first)
 * @return 0 on success, -1 if the name cannot be resolved to an IPv4 address.
 */
int
b2_build_sockaddr_in(const char* hostname, const unsigned short port, struct sockaddr_in* in)
{
  memset(in, 0, sizeof(struct sockaddr_in));
  in->sin_family = AF_INET;

  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_INET;       // IPv4 only, as before
  hints.ai_socktype = SOCK_STREAM;

  struct addrinfo* result = nullptr;
  const int rc = getaddrinfo(hostname, nullptr, &hints, &result);
  if (rc != 0 || result == nullptr) {
    fprintf(stderr, "[ERROR] %s:%d: getaddrinfo(): %s\n", __FILE__, __LINE__,
            rc != 0 ? gai_strerror(rc) : "no address returned");
    if (result != nullptr) freeaddrinfo(result);
    return -1;
  }
  in->sin_addr = reinterpret_cast<const struct sockaddr_in*>(result->ai_addr)->sin_addr;
  freeaddrinfo(result);

  in->sin_port = htons(port);
  return 0;
}
231
232
233static int
234b2_create_tcp_socket(void)
235{
236 int sd, ret, one = 1;
237
238 sd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
239 if (sd == -1) {
240 ERROR(socket);
241 return -1;
242 }
243
244#if 0
245 ret = b2_timed_blocking_io(sd, 0);
246 if (ret == -1) {
247 ERROR(b2_timed_blocking_io);
248 return -1;
249 }
250#endif
251
252 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char*)&one, sizeof(int));
253 if (ret == -1) {
254 ERROR(setsockopt);
255 return -1;
256 }
257
258 return sd;
259}
260
261
262int /* returns socket descriptor */
263b2_create_accept_socket(const unsigned short port) /* in reality DOES NOT accept */
264{
265 int sd, ret;
266 struct sockaddr_in in;
267
268 sd = b2_create_tcp_socket();
269 if (sd < 0) {
270 ERROR(b2_create_tcp_socket);
271 return -1;
272 }
273
274 ret = b2_build_sockaddr_in("0.0.0.0", port, &in);
275 if (ret == -1) {
276 ERROR(b2_build_sockaddr_in);
277 return -1;
278 }
279
280 ret = bind(sd, (const struct sockaddr*)&in, sizeof(struct sockaddr_in));
281 if (ret == -1) {
282 ERROR(bind);
283 return -1;
284 }
285
286 ret = listen(sd, 1);
287 if (ret == -1) {
288 ERROR(listen);
289 return -1;
290 }
291
292 return sd;
293}
294
295
296int /* returns socket descriptor */
297b2_create_connect_socket(const char* hostname, const unsigned short port)
298{
299 int sd, ret;
300 struct sockaddr_in in;
301
302 sd = b2_create_tcp_socket();
303 if (sd < 0) {
304 ERROR(b2_create_tcp_socket);
305 return -1;
306 }
307
308 ret = b2_build_sockaddr_in(hostname, port, &in);
309 if (ret == -1) {
310 ERROR(b2_build_sockaddr_in);
311 return -1;
312 }
313
314 ret = connect(sd, (const struct sockaddr*)&in, sizeof(struct sockaddr_in));
315 if (ret == -1 && errno != EINPROGRESS) {
316 ERROR(connect);
317 return -1;
318 }
319
320 return sd;
321}
322
323
324int
325b2_send(const int sd, const void* buf, const size_t size)
326{
327 unsigned char* ptr = (unsigned char*)buf;
328 size_t n_bytes_remained = size;
329
330 for (;;) {
331 int ret, n_bytes_send;
332
333 ret = send(sd, ptr, n_bytes_remained, 0);
334 if (ret == -1 && errno != EINTR) {
335 ERROR(send);
336 return -1;
337 }
338 if (ret == -1 && errno == EINTR) {
339 fprintf(stderr, "%s:%d: recv(): Packet send timed out\n", __FILE__, __LINE__);
340 return -1;
341 }
342 if (ret == 0) {
343 fprintf(stderr, "%s:%d: send(): Connection closed\n", __FILE__, __LINE__);
344 return -1;
345 }
346
347 n_bytes_send = ret;
348 ptr += n_bytes_send;
349
350 if (n_bytes_remained < size_t(n_bytes_send))
351 /* overrun: internal error */
352 {
353 fprintf(stderr, "%s:%d: send(): Internal error\n", __FILE__, __LINE__);
354 return -1;
355 }
356 n_bytes_remained -= n_bytes_send;
357
358 if (n_bytes_remained == 0)
359 /* fully sendout */
360 {
361 break;
362 }
363 }
364
365 return size;
366}
367
368
369int
370b2_recv(const int sd, void* buf, const size_t size)
371{
372 unsigned char* ptr = (unsigned char*)buf;
373 size_t n_bytes_remained = size;
374
375 for (;;) {
376 int ret, n_bytes_recv;
377
378 ret = recv(sd, ptr, n_bytes_remained, 0);
379 if (ret == -1 && (errno != EINTR && errno != EWOULDBLOCK)) {
380 ERROR(recv);
381 return -1;
382 }
383 if (ret == -1 && (errno == EINTR || errno == EWOULDBLOCK)) {
384 fprintf(stderr, "%s:%d: recv(): Packet receive timed out\n", __FILE__, __LINE__);
385 return -1;
386 }
387 if (ret == 0) {
388 fprintf(stderr, "%s:%d: recv(): Connection closed\n", __FILE__, __LINE__);
389 return -1;
390 }
391
392 n_bytes_recv = ret;
393 ptr += n_bytes_recv;
394 if (n_bytes_remained < size_t(n_bytes_recv))
395 /* overrun: internal error */
396 {
397 fprintf(stderr, "%s:%d: recv(): Internal error\n", __FILE__, __LINE__);
398 return -1;
399 }
400 n_bytes_remained -= n_bytes_recv;
401
402 if (n_bytes_remained == 0)
403 /* fully readout */
404 {
405 break;
406 }
407 }
408
409 return size;
410}
411
412
413
414static int
415MM_init_connect_to_onsen(const char* host, const unsigned int port)
416{
417 int sd, ret;
418 struct pollfd fds;
419
420 sd = b2_create_connect_socket(host, port);
421 if (sd == -1) {
422 ERROR(b2_create_connect_socket);
423 return -1;
424 }
425
426 fds.fd = sd;
427 fds.events = POLLOUT;
428 fds.revents = 0;
429 ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
430 switch (ret) {
431 case -1:
432 ERROR(poll);
433 return -1;
434
435 case 0:
436 ERR_FPRINTF(stderr, "[ERROR] merger_merge: connect() to ONSEN: Connection timed out\n");
437 return -1;
438
439 case 1: {
440 int connection_error;
441 socklen_t optlen;
442
443 optlen = sizeof(connection_error);
444 ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
445 if (ret == -1) {
446 ERROR(getsockopt);
447 return -1;
448 }
449 if (connection_error) {
450 ERR_FPRINTF(stderr, "[ERROR] merger_merge: connect() to ONSEN: %s\n", strerror(errno));
451 return -1;
452 }
453
454 break;
455 }
456
457 default:
458 ERR_FPRINTF(stderr, "[ERROR] merger_merge: poll() connect to ONSEN: Unexpected error\n");
459 return -1;
460 }
461
462 ret = b2_timed_blocking_io(sd, NETWORK_IO_TIMEOUT /* secs */);
463 if (ret == -1) {
464 ERROR(b2_timed_blocking_io);
465 return -1;
466 }
467
468 return sd;
469}
470
471
472static int
473MM_init_accept_from_hltout2merger(const unsigned int port)
474{
475 int sd;
476 int one = 1, ret;
477 // struct pollfd fds;
478
479 LOG_FPRINTF(stderr, "[INFO] merger_merge: Waiting for connection from hltout2merger on port %d\n", port);
480
481 sd = b2_create_accept_socket(port);
482 if (sd == -1) {
483 ERROR(b2_create_accept_socket);
484 return -1;
485 }
486
487 ret = b2_timed_blocking_io(sd,
488 1); // This means, if the socket blocks longer than Xs, it will return a EAGAIN or EWOULDBLOCK (immediately)
489 if (ret == -1) {
490 ERROR(b2_timed_blocking_io);
491 return -1;
492 }
493
494 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char*)&one, sizeof(int));
495 if (ret == -1) {
496 ERROR(setsockopt);
497 return -1;
498 }
499
500 /*
501 fds.fd = sd;
502 fds.events = POLLIN;
503 fds.revents = 0;
504 ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
505 switch (ret) {
506 case -1:
507 ERROR(poll);
508 return -1;
509
510 case 0:
511 ERR_FPRINTF(stderr, "merger_merge: accept(): Connection timed out\n");
512 return -1;
513
514 case 1: {
515 int ret, connection_error;
516 socklen_t optlen;
517
518 optlen = sizeof(connection_error);
519 ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
520 if (ret == -1) {
521 ERROR(getsockopt);
522 return -1;
523 }
524 if (connection_error) {
525 ERR_FPRINTF(stderr, "merger_merge: accept(): %s\n", strerror(errno));
526 return -1;
527 }
528
529 break;
530 }
531
532 default:
533 ERR_FPRINTF(stderr, "merger_merge: poll(): Unexpected error\n");
534 return -1;
535 }
536 */
537
538 /* Skip accept
539 nd = accept(sd, NULL, NULL);
540 if (nd == -1) {
541 ERROR(accept);
542 return -1;
543 }
544
545 close(sd);
546
547 ret = b2_timed_blocking_io(nd, NETWORK_IO_TIMEOUT / * secs * /);
548 if (ret == -1) {
549 ERROR(b2_timed_blocking_io);
550 return -1;
551 }
552 */
553
554 return sd;
555 // return nd;
556}
557
558
559static int
560MM_get_packet(const int sd_acc, unsigned char* buf)
561{
562 unsigned int header[2] = {}; // length is second word, thus read two
563
564 int ret = recv(sd_acc, &header, sizeof(unsigned int) * 2, MSG_PEEK);
565 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
566 ERR_FPRINTF(stderr, "[ERROR] merger_merge: recv(): Packet receive timed out\n");
567 return -1;
568 }
569 if (ret != 2 * sizeof(unsigned int)) {
570 ERR_FPRINTF(stderr, "[ERROR] merger_merge: recv(): Unexpected return value (%d)\n", ret);
571 return -2;
572 }
573
576
577 size_t n_bytes_from_hltout = 2 * sizeof(unsigned int) + ntohl(header[1]);// OFFSET_LENGTH = 1
578
579 ret = b2_recv(sd_acc, buf, n_bytes_from_hltout);
580 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
581 ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_recv(): Packet receive timed out\n");
582 return -1;
583 }
584 if (size_t(ret) != n_bytes_from_hltout) {
585 ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_recv(): Unexpected return value (%d)\n", ret);
586 return -2;
587 }
588
589 return ret;
590}
591
592
/*
 * Tear down the ONSEN connection by closing its descriptor.
 * Returns close()'s result unchanged: 0 on success, -1 with errno on failure.
 */
static int
MM_term_connect_to_onsen(const int sd)
{
  const int rc = close(sd);
  return rc;
}
598
599void print_stat(void)
600{
601 ERR_FPRINTF(stderr, "[INFO] --- STAT START ---\n");
602 unsigned int sum = 0;
603 for (auto& it : mycount) {
604 ERR_FPRINTF(stderr, "[INFO] (%s): %d\n", myconn[it.first].c_str(), it.second);
605 if (it.first != 0) sum += it.second;
606 }
607 ERR_FPRINTF(stderr, "[INFO] sum %u out %u diff %d\n", sum, mycount[0], (int)(mycount[0] - sum));
608 plot_triggers();
609 ERR_FPRINTF(stderr, "[INFO] --- STAT END ---\n");
610}
611
612// Main
613
614int
615main(int argc, char* argv[])
616{
617 int current_runnr = -1; // problem: handover without abort
618 // int n_hltout = 0;
619 int sd_acc = -1;
620 int sd_con = -1;
621 int need_reconnection_to_onsen = 1;
622 int event_count = 0;
623 int connected_hlts = 0;
624 bool stop_running = false;
625
626 char onsen_host[1024];
627 unsigned short onsen_port;
628 /* PC test */
629 /* strcpy(onsen_host, "10.10.10.1"); */
630 /* onsen_port = 1024; */
631 /* real ONSEN */
632 /* strcpy(onsen_host, "10.10.10.80"); */
633 /* onsen_port = 24; */
634 // unsigned short accept_port[MM_MAX_HLTOUT];
635 unsigned short accept_port;
636
637 LOG_FPRINTF(stderr, "[INFO] merger_merge: Process invoked [ver(%s %s)]\n", __DATE__, __TIME__);
638
639 if (argc < 4) {
640 ERR_FPRINTF(stderr, "[ERROR] merger_merge: Usage: merger_merge onsen-host onsen-port client-port#1[:client-port#2[:...]]\n");
641 exit(1);
642 }
643
644 /* argv copy */
645 char* p;
646
647// p = argv[1];
648// strcpy(shmname, p);
649//
650// p = argv[2];
651// shmid = atoi(p);
652
653 p = argv[3];
654 strcpy(onsen_host, p);
655
656 p = argv[4];
657 onsen_port = atoi(p);
658
659 p = argv[5];
660 accept_port = atoi(p);
661
662 signal(SIGPIPE, catch_pipe_function);
663 signal(SIGTERM, catch_term_function);
664 signal(SIGINT, catch_int_function);
665 signal(SIGUSR1, catch_usr1_function);
666 signal(SIGUSR2, catch_usr2_function);
667
668 /* Create a port to accept connections*/
669 sd_acc = MM_init_accept_from_hltout2merger(accept_port);
670 LOG_FPRINTF(stderr, "[INFO] merger_merge: port to accept connections from HLTOUT [%d]\n", sd_acc);
671
672
673 /* RoI transmission loop */
674 // RoI packets
675 size_t n_bytes_from_hltout;
676 size_t n_bytes_to_onsen;
677
678 unsigned char* buf = (unsigned char*)valloc(ROI_MAX_PACKET_SIZE);
679 if (!buf) {
680 ERROR(valloc);
681 exit(1);
682 }
683
684 // Loop forever for ONSEN connection
685 bool connected = false;
686 while (!connected) {
687 // Connect to ONSEN if not
688 if (need_reconnection_to_onsen) {
689 /* in case of sd_con is connected, disconnect it */
690 if (sd_con != -1) close(sd_con);
691
692 /* connect to onsen untill connected */
693 for (;;) {
694 int sleep_sec = 2;
695 sd_con = MM_init_connect_to_onsen(onsen_host, onsen_port);
696 // sd_con = 6;
697 if (sd_con != -1) {
698 /* connected: move to the recv->send loop */
699 need_reconnection_to_onsen = 0;
700 event_count = 0;
701 LOG_FPRINTF(stderr, "[INFO] merger_merge: MM_init_connect_to_onsen(): Connected to ONSEN\n");
702 connected = true;
703 break;
704 }
705
706 ERR_FPRINTF(stderr, "[WARNING] merger_merge: connection to onsen failed: reconnect in %d second(s)\n", sleep_sec);
707 sleep(sleep_sec);
708
709 /* retry connection */
710 continue;
711 }
712 }
713 }
714
715 // Preparation for select()
716
717 // printf("Starting select() loop\n") ;
718 fflush(stderr);
719 fflush(stdout);
720
721 clear_triggers();
722
723 fd_set allset;
724 FD_ZERO(&allset);
725 FD_SET(sd_acc, &allset);
726 int maxfd = sd_acc;
727 int minfd = sd_acc;
728 fd_set rset;//, wset;
729
730 // enable the checking of missing triggers
731 enable_check = true;
732 // Handle Obtain ROI and send it to ONSEN
733 while (!stop_running) {
734 memcpy(&rset, &allset, sizeof(rset));
735 // memcpy(&wset, &allset, sizeof(wset));
736
737 // struct timeval timeout;
738 // timeout.tv_sec = 0; // 1sec
739 // timeout.tv_usec = 1000; // 1msec (in microsec)
740 // printf ( "Select(): maxfd = %d, start select...; rset=%x, wset=%x\n", maxfd, rset, wset);
741 int rc = select(maxfd + 1, &rset, NULL, NULL, NULL);
742 // printf ( "Select(): returned with %d, rset = %8.8x\n", rc, rset );
743 if (got_sigusr1) {
744 got_sigusr1 = false;
745 ERR_FPRINTF(stderr, "[INFO] Got SIGUSR1, Run START\n");
746 print_stat();
747 }
748 if (got_sigusr2) {
749 got_sigusr2 = false;
750 ERR_FPRINTF(stderr, "[INFO] Got SIGUSR2, Run STOP\n");
751 print_stat();
752 }
753 if (got_sigint) {
754 got_sigint = false;
755 ERR_FPRINTF(stderr, "[INFO] Got SIGINT, ABORT\n");
756 print_stat();
757 }
758 if (got_sigpipe) {
759 got_sigpipe = false;
760 ERR_FPRINTF(stderr, "[INFO] Got SIGPIPE\n");
761 }
762 if (got_sigterm) {
763 got_sigterm = false;
764 ERR_FPRINTF(stderr, "[INFO] Got SIGTERM\n");
765 }
766 if (rc < 0) {
767 perror("select");
768 continue;
769 } else if (rc == 0) { // timeout
770 continue;
771 }
772
773 if (FD_ISSET(sd_acc, &rset)) { // new connection
774 int t;
775 struct sockaddr_in isa;
776 socklen_t i = sizeof(isa);
777 getsockname(sd_acc, (struct sockaddr*)&isa, &i);
778 if ((t =::accept(sd_acc, (struct sockaddr*)&isa, &i)) < 0) {
779 // m_errno = errno;
780 ERR_FPRINTF(stderr, "[ERROR] Error on accepting new connection\n");
781 ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
782 return (-1);
783 }
784 LOG_FPRINTF(stderr, "[INFO] New socket connection t=%d\n", t);
785 char address[INET_ADDRSTRLEN];
786 inet_ntop(AF_INET, &isa.sin_addr, address, sizeof(address));
787 connected_hlts++;
788
789 LOG_FPRINTF(stderr, "[INFO] %d is IP <%s>\n", t, address);
790 myconn[t] = address;
791 // Assume IP4 and take last decimal as HLT number
792 char* ptr = strrchr(address, '.');
793 if (ptr) {
794 int nr = atoi(ptr + 1);
795 hltused.insert(nr);
796 }
797
798 fflush(stdout);
799 FD_SET(t, &allset);
800 if (minfd == sd_acc) minfd = t;
801 if (t > maxfd) maxfd = t;
802 continue;
803 } else {
804 for (int fd = minfd; fd < maxfd + 1; fd++) {
805 n_bytes_from_hltout = 0;
806 if (FD_ISSET(fd, &rset)) {
807 // printf ( "fd is available for reading = %d\n", fd );
808 /* recv packet */
809 int ret;
810 ret = MM_get_packet(fd, buf);
811 if (ret < 0) { // -2 will not have a errno set
812 if (ret == -1) {
813 ERR_FPRINTF(stderr, "[ERROR] merger_merge: MM_get_packet()[%d]: %s\n", fd, strerror(errno));
814 } else {
815 ERR_FPRINTF(stderr, "[ERROR] merger_merge: MM_get_packet()[%d]\n", fd);
816 }
817 ERR_FPRINTF(stderr, "[ERROR] Connection from HLT was closed on HLT side (hltout2merge) from %s\n",
818 myconn[fd].c_str());
819 /* connection from HLT is lost */
820 // using exit here will dump ALL still open connections from other HLTs
821 // rethink if we dont exit but process these to limit loss of events.
822 // ERR_FPRINTF(stderr, "%s terminated\n", argv[0]);
823 // exit(1);
824 close(fd);
825 FD_CLR(fd, &allset); //opposite of FD_SET
826 connected_hlts--;
827 if (connected_hlts == 0) stop_running = true;
828 ret = 0;
829 }
830 n_bytes_from_hltout = ret;
831
832 // printf ( "RoI received : Event count = % d\n", event_count );
833 if (ret > 0) {
834 mycount[fd]++;
835 if (0 /*event_count < 5 || event_count % 100000 == 0*/) {
836 LOG_FPRINTF(stderr, "[INFO] merger_merge: ---- [ %d] received event from ROI transmitter\n", event_count);
837 LOG_FPRINTF(stderr, "[INFO] merger_merge: MM_get_packet() Returned %ld\n", n_bytes_from_hltout);
838 dump_binary(stderr, buf, n_bytes_from_hltout);
839 }
840 }
841 }
842
843 /* send packet */
844 if (n_bytes_from_hltout > 0) {
845 int ret;
846 unsigned char* ptr_head_to_onsen = buf;
847
848 // extract trigger number, run+exp nr and fill it in a table.
849
850 int runnr = 0;
851 int eventnr = 0;
852
853 // From ROIpayload.h
854 // enum { OFFSET_MAGIC = 0, OFFSET_LENGTH = 1, OFFSET_HEADER = 2, OFFSET_TRIGNR = 3, OFFSET_RUNNR = 4, OFFSET_ROIS = 5};
855 // enum { HEADER_SIZE_WO_LENGTH = 3, HEADER_SIZE_WITH_LENGTH = 5, HEADER_SIZE_WITH_LENGTH_AND_CRC = 6};
856
857 if (n_bytes_from_hltout >= 6 * 4) {
858 auto* iptr = (boost::endian::big_uint32_t*)ptr_head_to_onsen;
859 eventnr = iptr[3];
860 runnr = (iptr[4] & 0x3FFF00) >> 8;
861// ERR_FPRINTF(stderr, "%08X %08X %08X %08X %08X -> %08X %08X \n",
862// (unsigned int)iptr[0],(unsigned int) iptr[1],(unsigned int) iptr[2],(unsigned int) iptr[3],(unsigned int) iptr[4], eventnr, runnr);
863 } else {
864 LOG_FPRINTF(stderr, "[ERROR] merger_merge: packet to small to hold useful header (%ld)\n", n_bytes_from_hltout);
865 }
866
867 if (runnr > current_runnr) {
868 ERR_FPRINTF(stderr, "[WARNING] merger_merge: run number increases: got %d current %d trig %d\n", runnr, current_runnr,
869 eventnr);
870 print_stat();
871 clear_triggers();
872 current_runnr = runnr;
873 enable_check = true;
874 } else if (runnr < current_runnr) {
875 // got some event from old run
876 ERR_FPRINTF(stderr, "[WARNING] merger_merge: got trigger from older run: got %d current %d trig %d\n", runnr, current_runnr,
877 eventnr);
878 }
879
880 if (runnr == current_runnr) {
881 // seperate if, as we might set it in the if above
882 if (enable_check) check_event_nr(eventnr);
883 } // if we end the if here, we will send out old events to ONSEN!
884
885 n_bytes_to_onsen = n_bytes_from_hltout;
886 while (1) {
887 ret = b2_send(sd_con, ptr_head_to_onsen, n_bytes_to_onsen);
888 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
889 ERR_FPRINTF(stderr, "[WARNING] merger_merge: socket buffer full, retry\n");
890 sleep(1);// Bad hack, wait a second
891 } else break;
892 }
893
894 if (ret == -1) {
895 ERROR(b2_send);
896 need_reconnection_to_onsen = 1;
897 event_count = 0;
898 ERR_FPRINTF(stderr, "[ERROR] merger_merge: error to send to ONSEN : %s\n", strerror(errno));
899 free(buf);
900 ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
901 print_stat();
902 exit(1);
903 }
904 if (ret == 0) {
905 ERR_FPRINTF(stderr, "[ERROR] merger_merge: b2_send(): Connection closed\n");
906 need_reconnection_to_onsen = 1;
907 event_count = 0;
908 free(buf);
909 ERR_FPRINTF(stderr, "[ERROR] Connection to ONSEN was closed on ONSEN side\n");
910 ERR_FPRINTF(stderr, "[ERROR] %s terminated\n", argv[0]);
911 print_stat();
912 exit(1);
913 }
914
915 mycount[0]++;
916 if (0 /*event_count < 5 || event_count % 10000 == 0*/) {
917 LOG_FPRINTF(stderr, "[INFO] merger_merge: ---- [ %d] sent event to ONSEN\n", event_count);
918 dump_binary(stderr, ptr_head_to_onsen, n_bytes_to_onsen);
919 }
920 // } // if we end if here, we will NOT send old events to onsen, but only after we received the first new event
921 }
922 }
923 event_count++;
924 if (event_count % 10000 == 0) {
925 int hltcount = hltused.size();
926 if (triggers.empty()) {
927 // workaround for empty vector, but keep same structure for monitor parsing (kibana)
928 ERR_FPRINTF(stderr, "[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
929 event_number_max, event_number_max, missing_walk_index, triggers.size(),
930 0, event_number_max, -1, -1);
931 } else {
932 int mod = *triggers.begin() % hltcount;
933 ERR_FPRINTF(stderr, "[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
934 *triggers.begin(), *(--triggers.end()), missing_walk_index, triggers.size(),
935 *(--triggers.end()) - *triggers.begin(), event_number_max, mod, hlts[mod]);
936 }
937 }
938 }
939 }
940
941
942 /* termination: never reached */
943 MM_term_connect_to_onsen(sd_con);
944
945 if (connected_hlts == 0) {
946 ERR_FPRINTF(stderr, "[RESULT] Stopped because all HLTs closed connection\n");
947 }
948 print_stat();
949 ERR_FPRINTF(stderr, "[RESULT] %s terminated\n", argv[0]);
950 return 0;
951}
STL namespace.