17#include <netinet/in.h>
// Size in bytes of the packet buffer that main() allocates with valloc().
27#define ROI_MAX_PACKET_SIZE (16384)
// Multiplied by 1000 and handed to poll() while waiting for the ONSEN connect to finish.
// NOTE(review): with -1 the poll() timeout is negative — presumably "wait forever"; confirm.
29#define NETWORK_ESTABLISH_TIMEOUT (-1)
// Passed to b2_timed_blocking_io() once the ONSEN connection is up (negative selects its "timeout < 0" branch).
30#define NETWORK_IO_TIMEOUT (-1)
32#include <boost/endian/arithmetic.hpp>
// Both logging macros are plain aliases of fprintf; callers pass the stream explicitly.
37#define LOG_FPRINTF (fprintf)
38#define ERR_FPRINTF (fprintf)
// Prints "[ERROR] file:line: func(): <strerror(errno)>" to stderr; `func` is only stringified, never called.
39#define ERROR(func) { fprintf(stderr, "[ERROR] %s:%d: "#func"(): %s\n", __FILE__, __LINE__, strerror(errno));}
// Label per counter key, printed next to the counters in MM_term_connect_to_onsen().
41std::map<int, std::string> myconn;
// Looked up by socket fd and printed as the "HLT%u" number in the periodic report in main().
42std::map<int, int> fd_to_hlt;
// Counters per key; the shutdown statistics compare key 0 ("out") against the sum of all other keys.
43std::map<int, unsigned int> mycount;
// Smallest / largest event number seen per socket fd for the current run (updated in main()).
// hlt_min treats the value 0 as "not set yet".
46std::map<int, unsigned int> hlt_min;
47std::map<int, unsigned int> hlt_max;
// Event numbers tracked by check_event_nr(); a number is erased when it arrives and
// whatever remains is reported as "missing" by plot_triggers().
49std::set<int> triggers;
// Highest event number passed to check_event_nr() so far.
50unsigned int event_number_max = 0;
// Index into `triggers` that check_event_nr() moves back and forth around a 100000-event distance from event_number_max.
51unsigned int missing_walk_index = 0;
// Gates the check_event_nr() call in main(); switched off when `triggers` grows beyond 50000 entries.
52bool enable_check =
true;
// Signal flags. NOTE(review): the code that sets and reads them is not visible in this excerpt —
// presumably set by the catch_*_function handlers and polled in main()'s select loop; confirm.
54bool got_sigusr1 =
false;
55bool got_sigusr2 =
false;
56bool got_sigint =
false;
57bool got_sigpipe =
false;
58bool got_sigterm =
false;
/**
 * Hex-dump a buffer as 32-bit words.
 *
 * Each word is printed as eight lower-case hex digits followed by a space,
 * eight words per output line. A final newline is added when the last line
 * is only partially filled. Trailing bytes that do not form a complete
 * `unsigned int` (size % sizeof(unsigned int)) are not printed.
 *
 * @param fp   Stream to write to; nothing is written when it is null.
 * @param ptr  Start of the buffer; nothing is written when it is null.
 * @param size Buffer length in bytes.
 */
void
dump_binary(FILE* fp, const void* ptr, const size_t size)
{
  if (fp == nullptr || ptr == nullptr) return;

  const unsigned char* bytes = static_cast<const unsigned char*>(ptr);
  const size_t n_words = size / sizeof(unsigned int);

  for (size_t i = 0; i < n_words; i++) {
    // Copy the word out instead of dereferencing a casted pointer: the buffer
    // is only guaranteed to be byte-aligned.
    unsigned int word = 0;
    memcpy(&word, bytes + i * sizeof(unsigned int), sizeof(word));

    fprintf(fp, "%08x ", word);
    if (i % 8 == 7) fprintf(fp, "\n");
  }

  // Terminate a partially filled last line.
  if (n_words % 8 != 0) fprintf(fp, "\n");
}
// Handler registered for SIGUSR1 via signal() further down in this file; the signal number is unused.
// NOTE(review): the body is not visible in this excerpt — presumably it sets got_sigusr1; confirm.
76static void catch_usr1_function(
int )
// Handler registered for SIGUSR2 via signal() further down in this file; the signal number is unused.
// NOTE(review): the body is not visible in this excerpt — presumably it sets got_sigusr2; confirm.
82static void catch_usr2_function(
int )
// Handler registered for SIGINT via signal() further down in this file; the signal number is unused.
// NOTE(review): the body is not visible in this excerpt — presumably it sets got_sigint; confirm.
88static void catch_int_function(
int )
// Handler registered for SIGTERM via signal() further down in this file; the signal number is unused.
// NOTE(review): the body is not visible in this excerpt — presumably it sets got_sigterm; confirm.
94static void catch_term_function(
int )
// Handler registered for SIGPIPE via signal() further down in this file; the signal number is unused.
// NOTE(review): the body is not visible in this excerpt — presumably it sets got_sigpipe; confirm.
100static void catch_pipe_function(
int )
// Resets the event-number bookkeeping: event_number_max and missing_walk_index go back to 0.
// NOTE(review): at least one line of this function is missing from the excerpt — presumably
// the `triggers` set is emptied here as well; confirm.
106void clear_triggers(
void)
109 event_number_max = 0;
110 missing_walk_index = 0;
// Writes a summary of the event numbers still held in `triggers` to stderr:
// one [RESULT] line (range, count, spread, max), per-trigger [INFO] lines, and a
// per-HLT count of missing triggers. When `triggers` is empty a "missing triggers 0"
// result line is printed instead.
113void plot_triggers(
void)
// Number of HLT units in use; used below as the modulus that attributes an event number to an HLT.
115 int hltcount = hltused.size();
// Missing-trigger count keyed by HLT id (hlts[mod]).
116 std::map<int, int> modmissing;
// Appends every entry of hltused to hlts so that hlts[mod] can be indexed below.
// NOTE(review): hlts is not cleared in the visible lines, so repeated calls would keep appending — confirm.
118 for (
auto h : hltused) hlts.push_back(h);
119 if (!triggers.empty()) {
120 ERR_FPRINTF(stderr,
"[RESULT] merger_merge: trigger low=%u high=%u missing %lu delta %u max %u\n", *triggers.begin(),
122 triggers.size(), *(--triggers.end()) - *triggers.begin(), event_number_max);
124 for (
auto& it : triggers) {
// NOTE(review): modulo by hltcount — undefined if hltused is empty, unless guarded by a line not shown here.
125 int mod = it % hltcount;
126 modmissing[hlts[mod]]++;
128 ERR_FPRINTF(stderr,
"[INFO] Miss trig %u (%d) HLT%d\n", it, mod, hlts[mod]);
// The per-trigger listing is cut off with this warning; the threshold is on a line not visible here.
131 ERR_FPRINTF(stderr,
"[WARNING] ... too many missing to report\n");
137 ERR_FPRINTF(stderr,
"[RESULT] merger_merge: missing triggers 0\n");
139 for (
auto m : modmissing) {
140 ERR_FPRINTF(stderr,
"[INFO] merger_merge: missing triggers from HLT%d: %d\n", m.first, m.second);
// Bookkeeping for one received event number: advances event_number_max, removes the
// number from `triggers`, nudges missing_walk_index, and disables checking altogether
// when too many entries pile up.
144void check_event_nr(
unsigned int event_number)
// A new highest number: walk over every number between the old maximum and this one.
// NOTE(review): the loop body is missing from this excerpt — presumably each skipped number
// is inserted into `triggers` as "expected but not yet seen"; confirm.
148 if (event_number_max < event_number) {
149 for (uint32_t e = event_number_max + 1; e < event_number; e ++) {
152 event_number_max = event_number;
// The event has arrived, so it is no longer outstanding.
155 triggers.erase(event_number);
// Step the walk index forward while the entry after it lags event_number_max by more than
// 100000, and back while the entry at the index lags by less than 100000.
// std::next on a std::set iterator is linear in the step count.
157 if (triggers.size() > missing_walk_index + 1 && event_number_max - *std::next(triggers.begin(), missing_walk_index + 1) > 100000) {
158 missing_walk_index++;
159 }
else if (missing_walk_index > 0 && event_number_max - *std::next(triggers.begin(), missing_walk_index) < 100000) {
160 missing_walk_index--;
// Safety valve: beyond 50000 outstanding numbers the check is switched off (enable_check gates the call in main()).
162 if (triggers.size() > 50000) {
164 enable_check =
false;
165 ERR_FPRINTF(stderr,
"[ERROR] Too many in-flight triggers -> disable checking until next run\n");
// Configures the blocking behaviour of socket `sd` according to `timeout`:
//   - first branch (condition not visible; presumably timeout > 0): sets SO_SNDTIMEO and SO_RCVTIMEO from `tv`,
//   - timeout == 0: puts the descriptor into O_NDELAY (non-blocking) mode,
//   - timeout < 0 : reads the current file status flags and writes flags back.
// NOTE(review): return type, the setup of `tv` and all error handling are on lines missing from this excerpt.
170b2_timed_blocking_io(
const int sd,
const int timeout )
179 ret = setsockopt(sd, SOL_SOCKET, SO_SNDTIMEO, &tv,
sizeof(
struct timeval));
187 ret = setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv,
sizeof(
struct timeval));
192 }
else if (timeout == 0) {
// F_SETFL replaces the whole status-flag set, so any other flag previously set on sd is dropped here.
193 ret = fcntl(sd, F_SETFL, O_NDELAY);
198 }
else if (timeout < 0) {
// F_GETFL ignores its third argument; `ret` receives the current flags.
199 ret = fcntl(sd, F_GETFL, O_NDELAY);
// NOTE(review): the line(s) that modify `ret` before it is written back are not visible —
// presumably O_NDELAY is cleared to restore blocking I/O; confirm.
205 ret = fcntl(sd, F_SETFL, ret);
// Fills `in` with an IPv4 socket address for hostname:port.
// The structure is zeroed first, the host is resolved with gethostbyname() and its
// first address (h_addr) is copied; the port is stored in network byte order.
// NOTE(review): return type and the failure check around gethostbyname() are on lines missing from this excerpt.
217b2_build_sockaddr_in(
const char* hostname,
const unsigned short port,
struct sockaddr_in* in)
219 memset(in, 0,
sizeof(
struct sockaddr_in));
221 in->sin_family = AF_INET;
223 struct hostent* hoste;
224 hoste = gethostbyname(hostname);
// NOTE(review): ERROR() prints strerror(errno), but gethostbyname() reports failures through
// h_errno, so the printed text may not describe the resolver error — confirm / consider hstrerror().
226 ERROR(gethostbyname);
229 in->sin_addr = *(
struct in_addr*)(hoste->h_addr);
231 in->sin_port = htons(port);
// Creates an IPv4 TCP socket, switches it to non-blocking mode through
// b2_timed_blocking_io(sd, 0) and enables SO_REUSEADDR on it.
// NOTE(review): return type, the error checks and the return statements are on lines missing
// from this excerpt; callers compare the result against failure before calling ERROR().
238b2_create_tcp_socket(
void)
240 int sd, ret, one = 1;
242 sd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
// timeout 0 selects the O_NDELAY branch of b2_timed_blocking_io().
249 ret = b2_timed_blocking_io(sd, 0);
251 ERROR(b2_timed_blocking_io);
256 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (
char*)&one,
sizeof(
int));
// Creates a TCP socket via b2_create_tcp_socket() and binds it to 0.0.0.0:port
// (all local IPv4 interfaces).
// NOTE(review): the lines after bind() are missing from this excerpt — presumably listen()
// and the return of `sd` follow; confirm. Error-path cleanup (close) is likewise not visible.
267b2_create_accept_socket(
const unsigned short port)
270 struct sockaddr_in in;
272 sd = b2_create_tcp_socket();
274 ERROR(b2_create_tcp_socket);
278 ret = b2_build_sockaddr_in(
"0.0.0.0", port, &in);
280 ERROR(b2_build_sockaddr_in);
284 ret = bind(sd, (
const struct sockaddr*)&in,
sizeof(
struct sockaddr_in));
// Creates a TCP socket via b2_create_tcp_socket() and starts connecting it to hostname:port.
// NOTE(review): return type, error-path bodies and the final return are on lines missing from this excerpt.
301b2_create_connect_socket(
const char* hostname,
const unsigned short port)
304 struct sockaddr_in in;
306 sd = b2_create_tcp_socket();
308 ERROR(b2_create_tcp_socket);
312 ret = b2_build_sockaddr_in(hostname, port, &in);
314 ERROR(b2_build_sockaddr_in);
318 ret = connect(sd, (
const struct sockaddr*)&in,
sizeof(
struct sockaddr_in));
// The socket was put into non-blocking mode by b2_create_tcp_socket(), so EINPROGRESS is the
// normal "connect still pending" result and is not treated as a failure here.
319 if (ret == -1 && errno != EINPROGRESS) {
// Sends `size` bytes from `buf` on socket `sd`, tracking the bytes still outstanding in
// n_bytes_remained until it reaches 0.
// NOTE(review): the enclosing loop, the pointer advance, the assignment of n_bytes_send and
// every return statement are on lines missing from this excerpt — presumably this is a
// "send until everything is written" loop that returns the byte count, -1 on error; confirm.
329b2_send(
const int sd,
const void* buf,
const size_t size)
// Drops the const qualifier of buf; the visible code only passes the pointer to send().
331 unsigned char* ptr = (
unsigned char*)buf;
332 size_t n_bytes_remained = size;
335 int ret, n_bytes_send;
337 ret = send(sd, ptr, n_bytes_remained, 0);
// Any failure other than EINTR takes this branch (body not visible).
338 if (ret == -1 && errno != EINTR) {
// EINTR is reported as a timeout.
// NOTE(review): the message text says "recv()" although this is the send path.
342 if (ret == -1 && errno == EINTR) {
343 fprintf(stderr,
"%s:%d: recv(): Packet send timed out\n", __FILE__, __LINE__);
347 fprintf(stderr,
"%s:%d: send(): Connection closed\n", __FILE__, __LINE__);
// Sanity check: more bytes reported as sent than were outstanding.
354 if (n_bytes_remained <
size_t(n_bytes_send))
357 fprintf(stderr,
"%s:%d: send(): Internal error\n", __FILE__, __LINE__);
360 n_bytes_remained -= n_bytes_send;
362 if (n_bytes_remained == 0)
// Receives `size` bytes from socket `sd` into `buf`, tracking the bytes still outstanding in
// n_bytes_remained until it reaches 0.
// NOTE(review): the enclosing loop, the pointer advance, the assignment of n_bytes_recv and
// every return statement are on lines missing from this excerpt — presumably this is a
// "read until the buffer is full" loop that returns the byte count, -1 on error; confirm.
374b2_recv(
const int sd,
void* buf,
const size_t size)
376 unsigned char* ptr = (
unsigned char*)buf;
377 size_t n_bytes_remained = size;
380 int ret, n_bytes_recv;
382 ret = recv(sd, ptr, n_bytes_remained, 0);
// Any failure other than EINTR / EWOULDBLOCK takes this branch (body not visible).
383 if (ret == -1 && (errno != EINTR && errno != EWOULDBLOCK)) {
// EINTR and EWOULDBLOCK are both reported as a receive timeout.
387 if (ret == -1 && (errno == EINTR || errno == EWOULDBLOCK)) {
388 fprintf(stderr,
"%s:%d: recv(): Packet receive timed out\n", __FILE__, __LINE__);
392 fprintf(stderr,
"%s:%d: recv(): Connection closed\n", __FILE__, __LINE__);
// Sanity check: more bytes reported as received than were outstanding.
398 if (n_bytes_remained <
size_t(n_bytes_recv))
401 fprintf(stderr,
"%s:%d: recv(): Internal error\n", __FILE__, __LINE__);
404 n_bytes_remained -= n_bytes_recv;
406 if (n_bytes_remained == 0)
// Opens the outgoing TCP connection to ONSEN at host:port.
// Starts a non-blocking connect, waits with poll() for the socket to become writable,
// reads SO_ERROR to learn whether the connect succeeded, then applies NETWORK_IO_TIMEOUT
// through b2_timed_blocking_io().
// NOTE(review): return type, declarations of sd/ret/fds/optlen, the branch conditions and all
// return statements are on lines missing from this excerpt.
419MM_init_connect_to_onsen(
const char* host,
const unsigned int port)
424 sd = b2_create_connect_socket(host, port);
426 ERROR(b2_create_connect_socket);
// POLLOUT: a pending non-blocking connect signals completion by making the socket writable.
431 fds.events = POLLOUT;
// NOTE(review): NETWORK_ESTABLISH_TIMEOUT is (-1), so the timeout passed here is -1000;
// presumably poll() treats any negative value as "no timeout" — confirm this is intended.
433 ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
440 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: connect() to ONSEN: Connection timed out\n");
444 int connection_error;
447 optlen =
sizeof(connection_error);
// SO_ERROR yields the pending error of the asynchronous connect (0 on success).
448 ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
453 if (connection_error) {
// NOTE(review): the failure code is in connection_error, but the message formats strerror(errno),
// which getsockopt() does not set on success — the printed reason may be unrelated; confirm.
454 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: connect() to ONSEN: %s\n", strerror(errno));
462 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: poll() connect to ONSEN: Unexpected error\n");
// Negative NETWORK_IO_TIMEOUT selects the "timeout < 0" branch of b2_timed_blocking_io().
466 ret = b2_timed_blocking_io(sd, NETWORK_IO_TIMEOUT );
468 ERROR(b2_timed_blocking_io);
// Prepares the listening socket on which hltout2merger clients connect: logs the port,
// creates the bound socket with b2_create_accept_socket(), adjusts its blocking mode and
// sets SO_REUSEADDR.
// NOTE(review): return type, local declarations, the second argument of the
// b2_timed_blocking_io() call and all return statements are on lines missing from this excerpt.
477MM_init_accept_from_hltout2merger(
const unsigned int port)
483 LOG_FPRINTF(stderr,
"[INFO] merger_merge: Waiting for connection from hltout2merger on port %d\n", port);
// `port` is unsigned int here but b2_create_accept_socket() takes unsigned short, so the value is narrowed.
485 sd = b2_create_accept_socket(port);
487 ERROR(b2_create_accept_socket);
491 ret = b2_timed_blocking_io(sd,
494 ERROR(b2_timed_blocking_io);
// SO_REUSEADDR is set here after b2_create_accept_socket() has already bound the socket.
498 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (
char*)&one,
sizeof(
int));
// Reads one packet from socket `sd_acc` into `buf`.
// The first two 32-bit words are peeked (MSG_PEEK leaves them in the socket buffer); the
// second word, converted from network byte order, is the payload length. The whole packet
// (8 header bytes + payload) is then read with b2_recv().
// NOTE(review): return type and all return statements are on lines missing from this excerpt.
564MM_get_packet(
const int sd_acc,
unsigned char* buf)
566 unsigned int header[2] = {};
568 int ret = recv(sd_acc, &header,
sizeof(
unsigned int) * 2, MSG_PEEK);
569 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
570 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: recv(): Packet receive timed out\n");
// Anything other than a complete 8-byte header (short read, closed peer, error) lands here.
573 if (ret != 2 *
sizeof(
unsigned int)) {
574 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: recv(): Unexpected return value (%d)\n", ret);
// NOTE(review): the length comes straight from the wire; no comparison against the capacity of
// `buf` (ROI_MAX_PACKET_SIZE in main()) is visible in this excerpt — confirm a bound check exists.
581 size_t n_bytes_from_hltout = 2 *
sizeof(
unsigned int) + ntohl(header[1]);
583 ret = b2_recv(sd_acc, buf, n_bytes_from_hltout);
584 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
585 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_recv(): Packet receive timed out\n");
588 if (
size_t(ret) != n_bytes_from_hltout) {
589 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_recv(): Unexpected return value (%d)\n", ret);
// Shutdown hook for the ONSEN connection. The visible part prints the per-connection
// statistics to stderr between "STAT START" and "STAT END".
// NOTE(review): return type and whatever is done with `sd` (presumably shutdown/close) are on
// lines missing from this excerpt; confirm.
598MM_term_connect_to_onsen(
const int sd)
605 ERR_FPRINTF(stderr,
"[INFO] --- STAT START ---\n");
// Sum of all counters except key 0.
606 unsigned int sum = 0;
607 for (
auto& it : mycount) {
// it.second is unsigned int but is printed with %d.
608 ERR_FPRINTF(stderr,
"[INFO] (%s): %d\n", myconn[it.first].c_str(), it.second);
609 if (it.first != 0) sum += it.second;
// Key 0 is reported as "out" and compared with the sum of the other counters.
611 ERR_FPRINTF(stderr,
"[INFO] sum %u out %u diff %d\n", sum, mycount[0], (
int)(mycount[0] - sum));
613 ERR_FPRINTF(stderr,
"[INFO] --- STAT END ---\n");
// Entry point of merger_merge: accepts connections from HLT output nodes, receives their
// packets and forwards each one to ONSEN, while tracking run and event numbers.
// Usage (from the message below): merger_merge onsen-host onsen-port client-port#1[:client-port#2[:...]]
// NOTE(review): this excerpt is missing many lines of the function (declarations, branch
// conditions, loop headers, returns); comments below describe only what the visible lines show.
619main(
int argc,
char* argv[])
// Run number of the packets currently accepted; -1 until the first packet arrives.
621 int current_runnr = -1;
// Non-zero requests a (re)connect to ONSEN before the next send.
625 int need_reconnection_to_onsen = 1;
// The loop stops when this is 0 after a disconnect (see stop_running below).
627 int connected_hlts = 0;
628 bool stop_running =
false;
630 char onsen_host[1024];
631 unsigned short onsen_port;
639 unsigned short accept_port;
641 LOG_FPRINTF(stderr,
"[INFO] merger_merge: Process invoked [ver(%s %s)]\n", __DATE__, __TIME__);
644 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: Usage: merger_merge onsen-host onsen-port client-port#1[:client-port#2[:...]]\n");
// NOTE(review): strcpy() is unbounded; the origin of `p` is not visible (presumably argv) —
// a value longer than 1023 characters would overflow onsen_host; confirm.
658 strcpy(onsen_host, p);
// atoi() performs no validation; the result is narrowed to unsigned short.
661 onsen_port = atoi(p);
664 accept_port = atoi(p);
// Install the handlers for the signals this process reacts to.
666 signal(SIGPIPE, catch_pipe_function);
667 signal(SIGTERM, catch_term_function);
668 signal(SIGINT, catch_int_function);
669 signal(SIGUSR1, catch_usr1_function);
670 signal(SIGUSR2, catch_usr2_function);
// Listening socket for the HLT clients. The value logged below is the socket descriptor.
673 sd_acc = MM_init_accept_from_hltout2merger(accept_port);
674 LOG_FPRINTF(stderr,
"[INFO] merger_merge: port to accept connections from HLTOUT [%d]\n", sd_acc);
679 size_t n_bytes_from_hltout;
680 size_t n_bytes_to_onsen;
// Single packet buffer, reused for every packet; filled by MM_get_packet() and sent as-is.
682 unsigned char* buf = (
unsigned char*)valloc(ROI_MAX_PACKET_SIZE);
689 bool connected =
false;
// (Re)connect to ONSEN: close a previous socket if any, then open a new connection.
692 if (need_reconnection_to_onsen) {
694 if (sd_con != -1) close(sd_con);
699 sd_con = MM_init_connect_to_onsen(onsen_host, onsen_port);
703 need_reconnection_to_onsen = 0;
705 LOG_FPRINTF(stderr,
"[INFO] merger_merge: MM_init_connect_to_onsen(): Connected to ONSEN\n");
710 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: connection to onsen failed: reconnect in %d second(s)\n", sleep_sec);
// The listening socket is the first member of the select() set.
729 FD_SET(sd_acc, &allset);
// Main event loop: one select() round per iteration.
737 while (!stop_running) {
// select() modifies its set in place, so work on a copy of the master set.
738 memcpy(&rset, &allset,
sizeof(rset));
// No timeout: blocks until a descriptor is readable or a signal interrupts the call.
745 int rc = select(maxfd + 1, &rset, NULL, NULL, NULL);
// Signal reporting; the conditions and follow-up actions are on lines missing from this excerpt.
749 ERR_FPRINTF(stderr,
"[INFO] Got SIGUSR1, Run START\n");
756 ERR_FPRINTF(stderr,
"[INFO] Got SIGUSR2, Run STOP\n");
761 ERR_FPRINTF(stderr,
"[INFO] Got SIGINT, ABORT\n");
766 ERR_FPRINTF(stderr,
"[INFO] Got SIGPIPE\n");
770 ERR_FPRINTF(stderr,
"[INFO] Got SIGTERM\n");
775 }
else if (rc == 0) {
// Readable listening socket: a new HLT client is connecting.
779 if (FD_ISSET(sd_acc, &rset)) {
781 struct sockaddr_in isa;
782 socklen_t i =
sizeof(isa);
// isa is first filled with the local address and then overwritten by accept() with the peer address.
783 getsockname(sd_acc, (
struct sockaddr*)&isa, &i);
784 if ((t =::accept(sd_acc, (
struct sockaddr*)&isa, &i)) < 0) {
786 ERR_FPRINTF(stderr,
"[ERROR] Error on accepting new connection\n");
787 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
790 LOG_FPRINTF(stderr,
"[INFO] New socket connection t=%d\n", t);
791 char address[INET_ADDRSTRLEN];
792 inet_ntop(AF_INET, &isa.sin_addr, address,
sizeof(address));
795 LOG_FPRINTF(stderr,
"[INFO] %d is IP <%s>\n", t, address);
// Takes the last octet of the peer's dotted IPv4 address as a number.
// NOTE(review): how `nr` is used is not visible — presumably it identifies the HLT unit; confirm.
798 char* ptr = strrchr(address,
'.');
800 int nr = atoi(ptr + 1);
// Keep the scanned descriptor range [minfd, maxfd] up to date.
807 if (minfd == sd_acc) minfd = t;
808 if (t > maxfd) maxfd = t;
// Service every readable client descriptor in the tracked range.
811 for (
int fd = minfd; fd < maxfd + 1; fd++) {
812 n_bytes_from_hltout = 0;
813 if (FD_ISSET(fd, &rset)) {
817 ret = MM_get_packet(fd, buf);
820 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: MM_get_packet()[%d]: %s\n", fd, strerror(errno));
822 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: MM_get_packet()[%d]\n", fd);
824 ERR_FPRINTF(stderr,
"[ERROR] Connection from HLT was closed on HLT side (hltout2merge) from %s\n",
// Once the last HLT has gone, leave the main loop.
834 if (connected_hlts == 0) stop_running =
true;
837 n_bytes_from_hltout = ret;
// Debug output (the enclosing condition is not visible).
// n_bytes_from_hltout is size_t but is formatted with %ld.
843 LOG_FPRINTF(stderr,
"[INFO] merger_merge: ---- [ %d] received event from ROI transmitter\n", event_count);
844 LOG_FPRINTF(stderr,
"[INFO] merger_merge: MM_get_packet() Returned %ld\n", n_bytes_from_hltout);
845 dump_binary(stderr, buf, n_bytes_from_hltout);
// A packet was received: inspect its header and forward it.
851 if (n_bytes_from_hltout > 0) {
853 unsigned char* ptr_head_to_onsen = buf;
858 unsigned int eventnr = 0;
// The header is only read when at least six 32-bit words are present.
864 if (n_bytes_from_hltout >= 6 * 4) {
// The packet is viewed as big-endian 32-bit words.
865 auto* iptr = (boost::endian::big_uint32_t*)ptr_head_to_onsen;
// Run number = bits 8..21 of the fifth word. The extraction of eventnr is on a line not visible here.
867 runnr = (iptr[4] & 0x3FFF00) >> 8;
871 LOG_FPRINTF(stderr,
"[ERROR] merger_merge: packet to small to hold useful header (%ld)\n", n_bytes_from_hltout);
// A higher run number starts a new run; packets from an older run only trigger a warning here.
874 if (runnr > current_runnr) {
875 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: run number increases: got %d current %d trig %u\n", runnr, current_runnr,
879 current_runnr = runnr;
881 }
else if (runnr < current_runnr) {
883 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: got trigger from older run: got %d current %d trig %u\n", runnr, current_runnr,
// Packets of the current run update the missing-trigger check and the per-fd min/max event numbers.
887 if (runnr == current_runnr) {
889 if (enable_check) check_event_nr(eventnr);
// hlt_min uses 0 as "unset", so event number 0 never sticks as a minimum.
890 if (hlt_min[fd] == 0 or hlt_min[fd] > eventnr) hlt_min[fd] = eventnr;
891 if (hlt_max[fd] < eventnr) hlt_max[fd] = eventnr;
// The packet is forwarded unmodified: same start pointer, same length.
894 n_bytes_to_onsen = n_bytes_from_hltout;
896 ret = b2_send(sd_con, ptr_head_to_onsen, n_bytes_to_onsen);
897 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
898 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: socket buffer full, retry\n");
// Send failures flag the ONSEN connection for reconnect.
905 need_reconnection_to_onsen = 1;
907 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: error to send to ONSEN : %s\n", strerror(errno));
909 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
914 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_send(): Connection closed\n");
915 need_reconnection_to_onsen = 1;
918 ERR_FPRINTF(stderr,
"[ERROR] Connection to ONSEN was closed on ONSEN side\n");
919 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
926 LOG_FPRINTF(stderr,
"[INFO] merger_merge: ---- [ %d] sent event to ONSEN\n", event_count);
927 dump_binary(stderr, ptr_head_to_onsen, n_bytes_to_onsen);
// Every 10000 events: print a status line on outstanding triggers and the per-HLT event ranges.
933 if (event_count % 10000 == 0) {
934 int hltcount = hltused.size();
935 if (triggers.empty()) {
937 ERR_FPRINTF(stderr,
"[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
938 event_number_max, event_number_max, missing_walk_index, triggers.size(),
939 0, event_number_max, -1, -1);
// NOTE(review): modulo by hltcount — undefined if hltused is empty, unless guarded by a line not shown here.
941 int mod = *triggers.begin() % hltcount;
942 ERR_FPRINTF(stderr,
"[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
943 *triggers.begin(), *(--triggers.end()), missing_walk_index, triggers.size(),
944 *(--triggers.end()) - *triggers.begin(), event_number_max, mod, hlts[mod]);
// Overall smallest hlt_min and largest hlt_max across all descriptors, compared by mapped value.
// NOTE(review): if either map is empty the iterators equal end() and are dereferenced below — confirm a guard exists.
947 auto minIt = std::ranges::min_element(hlt_min.begin(), hlt_min.end(),
948 [](
const auto & a,
const auto & b) {
949 return a.second < b.second;
953 auto maxIt = std::ranges::max_element(hlt_max.begin(), hlt_max.end(),
954 [](
const auto & a,
const auto & b) {
955 return a.second < b.second;
958 ERR_FPRINTF(stderr,
"[INFO] ALL, %u, %u, %u\n", minIt->second, maxIt->second, maxIt->second - minIt->second);
// One line per descriptor: HLT number, min, max and spread of the event numbers seen.
959 for (
auto& h : hlt_max) ERR_FPRINTF(stderr,
"[INFO] HLT%u, %u, %u, %u\n", fd_to_hlt[h.first], hlt_min[h.first], h.second,
960 h.second - hlt_min[h.first]);
// After the loop: shut down the ONSEN connection (prints the statistics) and report why we stopped.
969 MM_term_connect_to_onsen(sd_con);
971 if (connected_hlts == 0) {
972 ERR_FPRINTF(stderr,
"[RESULT] Stopped because all HLTs closed connection\n");
975 ERR_FPRINTF(stderr,
"[RESULT] %s terminated\n", argv[0]);