17#include <netinet/in.h>
26#define ROI_MAX_PACKET_SIZE (16384)
28#define NETWORK_ESTABLISH_TIMEOUT (-1)
29#define NETWORK_IO_TIMEOUT (-1)
31#include <boost/endian/arithmetic.hpp>
36#define LOG_FPRINTF (fprintf)
37#define ERR_FPRINTF (fprintf)
38#define ERROR(func) { fprintf(stderr, "[ERROR] %s:%d: "#func"(): %s\n", __FILE__, __LINE__, strerror(errno));}
40std::map<int, std::string> myconn;
41std::map<int, unsigned int> mycount;
45std::set<int> triggers;
46unsigned int event_number_max = 0;
47unsigned int missing_walk_index = 0;
48bool enable_check =
true;
50bool got_sigusr1 =
false;
51bool got_sigusr2 =
false;
52bool got_sigint =
false;
53bool got_sigpipe =
false;
54bool got_sigterm =
false;
58dump_binary(FILE* fp,
const void* ptr,
const size_t size)
60 const unsigned int* p = (
const unsigned int*)ptr;
61 const size_t _size = size /
sizeof(
unsigned int);
64 for (
size_t i = 0; i < _size; i++) {
65 fprintf(fp,
"%08x ", p[i]);
66 if (i % 8 == 7) fprintf(fp,
"\n");
68 if (_size % 8 != 0) fprintf(fp,
"\n");
72static void catch_usr1_function(
int )
78static void catch_usr2_function(
int )
84static void catch_int_function(
int )
90static void catch_term_function(
int )
96static void catch_pipe_function(
int )
102void clear_triggers(
void)
105 event_number_max = 0;
106 missing_walk_index = 0;
109void plot_triggers(
void)
111 int hltcount = hltused.size();
112 std::map<int, int> modmissing;
114 for (
auto h : hltused) hlts.push_back(h);
115 if (!triggers.empty()) {
116 ERR_FPRINTF(stderr,
"[RESULT] merger_merge: trigger low=%u high=%u missing %lu delta %u max %u\n", *triggers.begin(),
118 triggers.size(), *(--triggers.end()) - *triggers.begin(), event_number_max);
120 for (
auto& it : triggers) {
121 int mod = it % hltcount;
122 modmissing[hlts[mod]]++;
124 ERR_FPRINTF(stderr,
"[INFO] Miss trig %u (%d) HLT%d\n", it, mod, hlts[mod]);
127 ERR_FPRINTF(stderr,
"[WARNING] ... too many missing to report\n");
133 ERR_FPRINTF(stderr,
"[RESULT] merger_merge: missing triggers 0\n");
135 for (
auto m : modmissing) {
136 ERR_FPRINTF(stderr,
"[INFO] merger_merge: missing triggers from HLT%d: %d\n", m.first, m.second);
140void check_event_nr(
unsigned int event_number)
144 if (event_number_max < event_number) {
145 for (uint32_t e = event_number_max + 1; e < event_number; e ++) {
148 event_number_max = event_number;
151 triggers.erase(event_number);
153 if (triggers.size() > missing_walk_index + 1 && event_number_max - *std::next(triggers.begin(), missing_walk_index + 1) > 100000) {
154 missing_walk_index++;
155 }
else if (missing_walk_index > 0 && event_number_max - *std::next(triggers.begin(), missing_walk_index) < 100000) {
156 missing_walk_index--;
158 if (triggers.size() > 50000) {
160 enable_check =
false;
161 ERR_FPRINTF(stderr,
"[ERROR] Too many in-flight triggers -> disable checking until next run\n");
166b2_timed_blocking_io(
const int sd,
const int timeout )
175 ret = setsockopt(sd, SOL_SOCKET, SO_SNDTIMEO, &tv,
sizeof(
struct timeval));
183 ret = setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv,
sizeof(
struct timeval));
188 }
else if (timeout == 0) {
189 ret = fcntl(sd, F_SETFL, O_NDELAY);
194 }
else if (timeout < 0) {
195 ret = fcntl(sd, F_GETFL, O_NDELAY);
201 ret = fcntl(sd, F_SETFL, ret);
213b2_build_sockaddr_in(
const char* hostname,
const unsigned short port,
struct sockaddr_in* in)
215 memset(in, 0,
sizeof(
struct sockaddr_in));
217 in->sin_family = AF_INET;
219 struct hostent* hoste;
220 hoste = gethostbyname(hostname);
222 ERROR(gethostbyname);
225 in->sin_addr = *(
struct in_addr*)(hoste->h_addr);
227 in->sin_port = htons(port);
234b2_create_tcp_socket(
void)
236 int sd, ret, one = 1;
238 sd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
245 ret = b2_timed_blocking_io(sd, 0);
247 ERROR(b2_timed_blocking_io);
252 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (
char*)&one,
sizeof(
int));
263b2_create_accept_socket(
const unsigned short port)
266 struct sockaddr_in in;
268 sd = b2_create_tcp_socket();
270 ERROR(b2_create_tcp_socket);
274 ret = b2_build_sockaddr_in(
"0.0.0.0", port, &in);
276 ERROR(b2_build_sockaddr_in);
280 ret = bind(sd, (
const struct sockaddr*)&in,
sizeof(
struct sockaddr_in));
297b2_create_connect_socket(
const char* hostname,
const unsigned short port)
300 struct sockaddr_in in;
302 sd = b2_create_tcp_socket();
304 ERROR(b2_create_tcp_socket);
308 ret = b2_build_sockaddr_in(hostname, port, &in);
310 ERROR(b2_build_sockaddr_in);
314 ret = connect(sd, (
const struct sockaddr*)&in,
sizeof(
struct sockaddr_in));
315 if (ret == -1 && errno != EINPROGRESS) {
325b2_send(
const int sd,
const void* buf,
const size_t size)
327 unsigned char* ptr = (
unsigned char*)buf;
328 size_t n_bytes_remained = size;
331 int ret, n_bytes_send;
333 ret = send(sd, ptr, n_bytes_remained, 0);
334 if (ret == -1 && errno != EINTR) {
338 if (ret == -1 && errno == EINTR) {
339 fprintf(stderr,
"%s:%d: recv(): Packet send timed out\n", __FILE__, __LINE__);
343 fprintf(stderr,
"%s:%d: send(): Connection closed\n", __FILE__, __LINE__);
350 if (n_bytes_remained <
size_t(n_bytes_send))
353 fprintf(stderr,
"%s:%d: send(): Internal error\n", __FILE__, __LINE__);
356 n_bytes_remained -= n_bytes_send;
358 if (n_bytes_remained == 0)
370b2_recv(
const int sd,
void* buf,
const size_t size)
372 unsigned char* ptr = (
unsigned char*)buf;
373 size_t n_bytes_remained = size;
376 int ret, n_bytes_recv;
378 ret = recv(sd, ptr, n_bytes_remained, 0);
379 if (ret == -1 && (errno != EINTR && errno != EWOULDBLOCK)) {
383 if (ret == -1 && (errno == EINTR || errno == EWOULDBLOCK)) {
384 fprintf(stderr,
"%s:%d: recv(): Packet receive timed out\n", __FILE__, __LINE__);
388 fprintf(stderr,
"%s:%d: recv(): Connection closed\n", __FILE__, __LINE__);
394 if (n_bytes_remained <
size_t(n_bytes_recv))
397 fprintf(stderr,
"%s:%d: recv(): Internal error\n", __FILE__, __LINE__);
400 n_bytes_remained -= n_bytes_recv;
402 if (n_bytes_remained == 0)
415MM_init_connect_to_onsen(
const char* host,
const unsigned int port)
420 sd = b2_create_connect_socket(host, port);
422 ERROR(b2_create_connect_socket);
427 fds.events = POLLOUT;
429 ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
436 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: connect() to ONSEN: Connection timed out\n");
440 int connection_error;
443 optlen =
sizeof(connection_error);
444 ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
449 if (connection_error) {
450 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: connect() to ONSEN: %s\n", strerror(errno));
458 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: poll() connect to ONSEN: Unexpected error\n");
462 ret = b2_timed_blocking_io(sd, NETWORK_IO_TIMEOUT );
464 ERROR(b2_timed_blocking_io);
473MM_init_accept_from_hltout2merger(
const unsigned int port)
479 LOG_FPRINTF(stderr,
"[INFO] merger_merge: Waiting for connection from hltout2merger on port %d\n", port);
481 sd = b2_create_accept_socket(port);
483 ERROR(b2_create_accept_socket);
487 ret = b2_timed_blocking_io(sd,
490 ERROR(b2_timed_blocking_io);
494 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (
char*)&one,
sizeof(
int));
560MM_get_packet(
const int sd_acc,
unsigned char* buf)
562 unsigned int header[2] = {};
564 int ret = recv(sd_acc, &header,
sizeof(
unsigned int) * 2, MSG_PEEK);
565 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
566 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: recv(): Packet receive timed out\n");
569 if (ret != 2 *
sizeof(
unsigned int)) {
570 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: recv(): Unexpected return value (%d)\n", ret);
577 size_t n_bytes_from_hltout = 2 *
sizeof(
unsigned int) + ntohl(header[1]);
579 ret = b2_recv(sd_acc, buf, n_bytes_from_hltout);
580 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
581 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_recv(): Packet receive timed out\n");
584 if (
size_t(ret) != n_bytes_from_hltout) {
585 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_recv(): Unexpected return value (%d)\n", ret);
594MM_term_connect_to_onsen(
const int sd)
601 ERR_FPRINTF(stderr,
"[INFO] --- STAT START ---\n");
602 unsigned int sum = 0;
603 for (
auto& it : mycount) {
604 ERR_FPRINTF(stderr,
"[INFO] (%s): %d\n", myconn[it.first].c_str(), it.second);
605 if (it.first != 0) sum += it.second;
607 ERR_FPRINTF(stderr,
"[INFO] sum %u out %u diff %d\n", sum, mycount[0], (
int)(mycount[0] - sum));
609 ERR_FPRINTF(stderr,
"[INFO] --- STAT END ---\n");
615main(
int argc,
char* argv[])
617 int current_runnr = -1;
621 int need_reconnection_to_onsen = 1;
623 int connected_hlts = 0;
624 bool stop_running =
false;
626 char onsen_host[1024];
627 unsigned short onsen_port;
635 unsigned short accept_port;
637 LOG_FPRINTF(stderr,
"[INFO] merger_merge: Process invoked [ver(%s %s)]\n", __DATE__, __TIME__);
640 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: Usage: merger_merge onsen-host onsen-port client-port#1[:client-port#2[:...]]\n");
654 strcpy(onsen_host, p);
657 onsen_port = atoi(p);
660 accept_port = atoi(p);
662 signal(SIGPIPE, catch_pipe_function);
663 signal(SIGTERM, catch_term_function);
664 signal(SIGINT, catch_int_function);
665 signal(SIGUSR1, catch_usr1_function);
666 signal(SIGUSR2, catch_usr2_function);
669 sd_acc = MM_init_accept_from_hltout2merger(accept_port);
670 LOG_FPRINTF(stderr,
"[INFO] merger_merge: port to accept connections from HLTOUT [%d]\n", sd_acc);
675 size_t n_bytes_from_hltout;
676 size_t n_bytes_to_onsen;
678 unsigned char* buf = (
unsigned char*)valloc(ROI_MAX_PACKET_SIZE);
685 bool connected =
false;
688 if (need_reconnection_to_onsen) {
690 if (sd_con != -1) close(sd_con);
695 sd_con = MM_init_connect_to_onsen(onsen_host, onsen_port);
699 need_reconnection_to_onsen = 0;
701 LOG_FPRINTF(stderr,
"[INFO] merger_merge: MM_init_connect_to_onsen(): Connected to ONSEN\n");
706 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: connection to onsen failed: reconnect in %d second(s)\n", sleep_sec);
725 FD_SET(sd_acc, &allset);
733 while (!stop_running) {
734 memcpy(&rset, &allset,
sizeof(rset));
741 int rc = select(maxfd + 1, &rset, NULL, NULL, NULL);
745 ERR_FPRINTF(stderr,
"[INFO] Got SIGUSR1, Run START\n");
750 ERR_FPRINTF(stderr,
"[INFO] Got SIGUSR2, Run STOP\n");
755 ERR_FPRINTF(stderr,
"[INFO] Got SIGINT, ABORT\n");
760 ERR_FPRINTF(stderr,
"[INFO] Got SIGPIPE\n");
764 ERR_FPRINTF(stderr,
"[INFO] Got SIGTERM\n");
769 }
else if (rc == 0) {
773 if (FD_ISSET(sd_acc, &rset)) {
775 struct sockaddr_in isa;
776 socklen_t i =
sizeof(isa);
777 getsockname(sd_acc, (
struct sockaddr*)&isa, &i);
778 if ((t =::accept(sd_acc, (
struct sockaddr*)&isa, &i)) < 0) {
780 ERR_FPRINTF(stderr,
"[ERROR] Error on accepting new connection\n");
781 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
784 LOG_FPRINTF(stderr,
"[INFO] New socket connection t=%d\n", t);
785 char address[INET_ADDRSTRLEN];
786 inet_ntop(AF_INET, &isa.sin_addr, address,
sizeof(address));
789 LOG_FPRINTF(stderr,
"[INFO] %d is IP <%s>\n", t, address);
792 char* ptr = strrchr(address,
'.');
794 int nr = atoi(ptr + 1);
800 if (minfd == sd_acc) minfd = t;
801 if (t > maxfd) maxfd = t;
804 for (
int fd = minfd; fd < maxfd + 1; fd++) {
805 n_bytes_from_hltout = 0;
806 if (FD_ISSET(fd, &rset)) {
810 ret = MM_get_packet(fd, buf);
813 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: MM_get_packet()[%d]: %s\n", fd, strerror(errno));
815 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: MM_get_packet()[%d]\n", fd);
817 ERR_FPRINTF(stderr,
"[ERROR] Connection from HLT was closed on HLT side (hltout2merge) from %s\n",
827 if (connected_hlts == 0) stop_running =
true;
830 n_bytes_from_hltout = ret;
836 LOG_FPRINTF(stderr,
"[INFO] merger_merge: ---- [ %d] received event from ROI transmitter\n", event_count);
837 LOG_FPRINTF(stderr,
"[INFO] merger_merge: MM_get_packet() Returned %ld\n", n_bytes_from_hltout);
838 dump_binary(stderr, buf, n_bytes_from_hltout);
844 if (n_bytes_from_hltout > 0) {
846 unsigned char* ptr_head_to_onsen = buf;
857 if (n_bytes_from_hltout >= 6 * 4) {
858 auto* iptr = (boost::endian::big_uint32_t*)ptr_head_to_onsen;
860 runnr = (iptr[4] & 0x3FFF00) >> 8;
864 LOG_FPRINTF(stderr,
"[ERROR] merger_merge: packet to small to hold useful header (%ld)\n", n_bytes_from_hltout);
867 if (runnr > current_runnr) {
868 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: run number increases: got %d current %d trig %d\n", runnr, current_runnr,
872 current_runnr = runnr;
874 }
else if (runnr < current_runnr) {
876 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: got trigger from older run: got %d current %d trig %d\n", runnr, current_runnr,
880 if (runnr == current_runnr) {
882 if (enable_check) check_event_nr(eventnr);
885 n_bytes_to_onsen = n_bytes_from_hltout;
887 ret = b2_send(sd_con, ptr_head_to_onsen, n_bytes_to_onsen);
888 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
889 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: socket buffer full, retry\n");
896 need_reconnection_to_onsen = 1;
898 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: error to send to ONSEN : %s\n", strerror(errno));
900 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
905 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_send(): Connection closed\n");
906 need_reconnection_to_onsen = 1;
909 ERR_FPRINTF(stderr,
"[ERROR] Connection to ONSEN was closed on ONSEN side\n");
910 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
917 LOG_FPRINTF(stderr,
"[INFO] merger_merge: ---- [ %d] sent event to ONSEN\n", event_count);
918 dump_binary(stderr, ptr_head_to_onsen, n_bytes_to_onsen);
924 if (event_count % 10000 == 0) {
925 int hltcount = hltused.size();
926 if (triggers.empty()) {
928 ERR_FPRINTF(stderr,
"[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
929 event_number_max, event_number_max, missing_walk_index, triggers.size(),
930 0, event_number_max, -1, -1);
932 int mod = *triggers.begin() % hltcount;
933 ERR_FPRINTF(stderr,
"[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
934 *triggers.begin(), *(--triggers.end()), missing_walk_index, triggers.size(),
935 *(--triggers.end()) - *triggers.begin(), event_number_max, mod, hlts[mod]);
943 MM_term_connect_to_onsen(sd_con);
945 if (connected_hlts == 0) {
946 ERR_FPRINTF(stderr,
"[RESULT] Stopped because all HLTs closed connection\n");
949 ERR_FPRINTF(stderr,
"[RESULT] %s terminated\n", argv[0]);