17 #include <netinet/in.h>
21 #include <arpa/inet.h>
// Size in bytes of the packet buffer that main() allocates with valloc().
26 #define ROI_MAX_PACKET_SIZE (16384)
// Wait limit for the ONSEN connect to complete. It is multiplied by 1000 and handed to poll(),
// so the unit is seconds; a negative value makes poll() wait without a time limit.
28 #define NETWORK_ESTABLISH_TIMEOUT (-1)
// Timeout passed to b2_timed_blocking_io() for the established ONSEN socket; a negative value
// selects that function's "timeout < 0" branch.
29 #define NETWORK_IO_TIMEOUT (-1)
31 #include <boost/endian/arithmetic.hpp>
// Logging shims: both expand to plain fprintf(), so the caller passes the stream explicitly
// (stderr at every call site visible in this file).
36 #define LOG_FPRINTF (fprintf)
37 #define ERR_FPRINTF (fprintf)
// Print "[ERROR] <file>:<line>: <func>(): <strerror(errno)>" to stderr.
// `func` is only stringified, never called. The text comes from errno, so the macro is only
// meaningful directly after a call that reports its failure through errno.
38 #define ERROR(func) { fprintf(stderr, "[ERROR] %s:%d: "#func"(): %s\n", __FILE__, __LINE__, strerror(errno));}
// Label per connection key; print_stat() prints myconn[key] next to the matching mycount entry.
40 std::map<int, std::string> myconn;
// Counter per connection key; print_stat() sums every key except 0 and reports key 0 separately as "out".
41 std::map<int, unsigned int> mycount;
// HLT identifiers in use; plot_triggers() copies them into hlts and uses the set size as a modulus.
42 std::set<int> hltused;
// Indexable copy of hltused, filled by plot_triggers().
43 std::vector<int> hlts;
// Event numbers being tracked: check_event_nr() erases a number once it has been seen,
// plot_triggers() reports whatever is left as missing.
45 std::set<int> triggers;
// Largest event number handed to check_event_nr() so far.
46 unsigned int event_number_max = 0;
// Position inside `triggers` that check_event_nr() moves up or down.
47 unsigned int missing_walk_index = 0;
// Gate for calling check_event_nr() from main(); check_event_nr() clears it when `triggers`
// grows beyond 50000 entries.
48 bool enable_check =
true;
// One flag per handled signal. Presumably set by the catch_*_function handlers, whose bodies
// are not visible in this excerpt — TODO confirm.
50 bool got_sigusr1 =
false;
51 bool got_sigusr2 =
false;
52 bool got_sigint =
false;
53 bool got_sigpipe =
false;
54 bool got_sigterm =
false;
/**
 * Write a hexadecimal dump of a memory region to a stream.
 *
 * The region is printed as native-endian `unsigned int` words in "%08x " format,
 * eight words per line. Bytes left over when `size` is not a multiple of
 * sizeof(unsigned int) are appended as one group of two-digit hex bytes, so the
 * tail of an odd-sized packet is no longer silently omitted. For sizes that are a
 * whole number of words the output is identical to the previous implementation.
 *
 * @param fp   Destination stream.
 * @param ptr  Start of the region; needs no particular alignment. May be null when size is 0.
 * @param size Number of bytes to dump.
 */
void
dump_binary(FILE* fp, const void* ptr, const size_t size)
{
  const unsigned char* bytes = static_cast<const unsigned char*>(ptr);
  const size_t n_words = size / sizeof(unsigned int);
  const size_t n_tail = size % sizeof(unsigned int);

  // Tracks whether the current output line still needs its terminating newline.
  bool line_open = false;

  for (size_t i = 0; i < n_words; i++) {
    // memcpy instead of dereferencing a cast pointer: valid for unaligned buffers.
    unsigned int word;
    memcpy(&word, bytes + i * sizeof(word), sizeof(word));
    fprintf(fp, "%08x ", word);
    line_open = true;
    if (i % 8 == 7) {
      fprintf(fp, "\n");
      line_open = false;
    }
  }

  if (n_tail != 0) {
    const unsigned char* tail = bytes + n_words * sizeof(unsigned int);
    for (size_t i = 0; i < n_tail; i++) {
      fprintf(fp, "%02x", static_cast<unsigned int>(tail[i]));
    }
    fprintf(fp, " ");
    line_open = true;
  }

  if (line_open) fprintf(fp, "\n");
}
// Handler registered for SIGUSR1 in main() via signal(). The signal number argument is unnamed.
// Body not visible in this excerpt; presumably sets got_sigusr1 — TODO confirm.
71 static void catch_usr1_function(
int )
// Handler registered for SIGUSR2 in main() via signal(). The signal number argument is unnamed.
// Body not visible in this excerpt; presumably sets got_sigusr2 — TODO confirm.
77 static void catch_usr2_function(
int )
// Handler registered for SIGINT in main() via signal(). The signal number argument is unnamed.
// Body not visible in this excerpt; presumably sets got_sigint — TODO confirm.
83 static void catch_int_function(
int )
// Handler registered for SIGTERM in main() via signal(). The signal number argument is unnamed.
// Body not visible in this excerpt; presumably sets got_sigterm — TODO confirm.
89 static void catch_term_function(
int )
// Handler registered for SIGPIPE in main() via signal(). The signal number argument is unnamed.
// Body not visible in this excerpt; presumably sets got_sigpipe — TODO confirm.
95 static void catch_pipe_function(
int )
// Reset the event-number bookkeeping: event_number_max and missing_walk_index go back to 0.
// NOTE(review): whether the `triggers` set itself is emptied cannot be seen in this excerpt — confirm.
101 void clear_triggers(
void)
104 event_number_max = 0;
105 missing_walk_index = 0;
// Report the event numbers still held in `triggers` and tally them per HLT.
// Each remaining number `it` is attributed to hlts[it % hltused.size()].
// NOTE(review): the modulus is hltused.size() and no guard for an empty hltused is visible
// here; an empty set would make `it % hltcount` a division by zero — confirm against the full file.
// NOTE(review): hlts is appended to on every call; unless it is cleared on a line not visible
// here, repeated calls grow it with duplicates — confirm.
108 void plot_triggers(
void)
110 int hltcount = hltused.size();
// Number of missing triggers per HLT, keyed by the value stored in hlts.
111 std::map<int, int> modmissing;
113 for (
auto h : hltused) hlts.push_back(h);
114 if (!triggers.empty()) {
// Summary line: lowest remaining number, count, spread between highest and lowest, and event_number_max.
115 ERR_FPRINTF(stderr,
"[RESULT] merger_merge: trigger low=%u high=%u missing %lu delta %u max %u\n", *triggers.begin(),
117 triggers.size(), *(--triggers.end()) - *triggers.begin(), event_number_max);
119 for (
auto& it : triggers) {
120 int mod = it % hltcount;
121 modmissing[hlts[mod]]++;
123 ERR_FPRINTF(stderr,
"[INFO] Miss trig %u (%d) HLT%d\n", it, mod, hlts[mod]);
126 ERR_FPRINTF(stderr,
"[WARNING] ... too many missing to report\n");
132 ERR_FPRINTF(stderr,
"[RESULT] merger_merge: missing triggers 0\n");
// Per-HLT totals collected above.
134 for (
auto m : modmissing) {
135 ERR_FPRINTF(stderr,
"[INFO] merger_merge: missing triggers from HLT%d: %d\n", m.first, m.second);
// Record that `event_number` has been received.
// When it exceeds event_number_max, the numbers strictly between the old maximum and the new
// one are walked (loop body not visible in this excerpt; presumably they are added to
// `triggers` — TODO confirm) and event_number_max is raised. The number itself is then erased
// from `triggers`.
139 void check_event_nr(
unsigned int event_number)
143 if (event_number_max < event_number) {
144 for (uint32_t e = event_number_max + 1; e < event_number; e ++) {
147 event_number_max = event_number;
150 triggers.erase(event_number);
// Move missing_walk_index forward while the next tracked number lags event_number_max by more
// than 100000, and back when the number at the current index lags by less than 100000.
152 if (triggers.size() > missing_walk_index + 1 && event_number_max - *std::next(triggers.begin(), missing_walk_index + 1) > 100000) {
153 missing_walk_index++;
154 }
else if (missing_walk_index > 0 && event_number_max - *std::next(triggers.begin(), missing_walk_index) < 100000) {
155 missing_walk_index--;
// Safety valve: with more than 50000 tracked numbers the checking is switched off.
157 if (triggers.size() > 50000) {
159 enable_check =
false;
160 ERR_FPRINTF(stderr,
"[ERROR] Too many in-flight triggers -> disable checking until next run\n");
// Configure the blocking behaviour of socket `sd` according to `timeout`.
// Visible branches: send and receive timeouts are set from `tv` with SO_SNDTIMEO/SO_RCVTIMEO
// (the setup of `tv` and the enclosing condition are not visible; presumably timeout > 0 —
// TODO confirm); timeout == 0 sets O_NDELAY through fcntl(); timeout < 0 reads the file status
// flags and writes them back.
165 b2_timed_blocking_io(
const int sd,
const int timeout )
174 ret = setsockopt(sd, SOL_SOCKET, SO_SNDTIMEO, &tv,
sizeof(
struct timeval));
182 ret = setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv,
sizeof(
struct timeval));
187 }
else if (timeout == 0) {
// NOTE(review): F_SETFL with only O_NDELAY replaces the whole file status flag set instead of
// adding one flag to the current flags.
188 ret = fcntl(sd, F_SETFL, O_NDELAY);
193 }
else if (timeout < 0) {
// F_GETFL returns the current flags; its third argument is ignored.
194 ret = fcntl(sd, F_GETFL, O_NDELAY);
// Writes back the value held in `ret`; any modification between the two calls is not visible here.
200 ret = fcntl(sd, F_SETFL, ret);
// Fill `*in` with an IPv4 address for `hostname`:`port`.
// The structure is zeroed, the family set to AF_INET, the address taken from the first entry
// returned by gethostbyname(), and the port stored in network byte order.
212 b2_build_sockaddr_in(
const char* hostname,
const unsigned short port,
struct sockaddr_in* in)
214 memset(in, 0,
sizeof(
struct sockaddr_in));
216 in->sin_family = AF_INET;
218 struct hostent* hoste;
219 hoste = gethostbyname(hostname);
// NOTE(review): gethostbyname() reports failures through h_errno, while ERROR() prints
// strerror(errno), so the logged text may not describe the lookup failure.
221 ERROR(gethostbyname);
224 in->sin_addr = *(
struct in_addr*)(hoste->h_addr);
226 in->sin_port = htons(port);
// Create an IPv4 TCP socket, pass it through b2_timed_blocking_io(sd, 0) (the O_NDELAY branch)
// and enable SO_REUSEADDR on it. Error handling and the return statement are not visible here.
233 b2_create_tcp_socket(
void)
235 int sd, ret, one = 1;
237 sd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
244 ret = b2_timed_blocking_io(sd, 0);
246 ERROR(b2_timed_blocking_io);
251 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (
char*)&one,
sizeof(
int));
// Create a TCP socket and bind it to `port` on the wildcard address "0.0.0.0".
// Any listen() call and the return value are not visible in this excerpt.
262 b2_create_accept_socket(
const unsigned short port)
265 struct sockaddr_in in;
267 sd = b2_create_tcp_socket();
269 ERROR(b2_create_tcp_socket);
273 ret = b2_build_sockaddr_in(
"0.0.0.0", port, &in);
275 ERROR(b2_build_sockaddr_in);
279 ret = bind(sd, (
const struct sockaddr*)&in,
sizeof(
struct sockaddr_in));
// Create a TCP socket and start connecting it to `hostname`:`port`.
296 b2_create_connect_socket(
const char* hostname,
const unsigned short port)
299 struct sockaddr_in in;
301 sd = b2_create_tcp_socket();
303 ERROR(b2_create_tcp_socket);
307 ret = b2_build_sockaddr_in(hostname, port, &in);
309 ERROR(b2_build_sockaddr_in);
313 ret = connect(sd, (
const struct sockaddr*)&in,
sizeof(
struct sockaddr_in));
// EINPROGRESS is not treated as a failure: it is what connect() reports for a non-blocking
// socket whose connection is still being established.
314 if (ret == -1 && errno != EINPROGRESS) {
// Send `size` bytes from `buf` on socket `sd`, tracking the bytes still outstanding in
// n_bytes_remained until it reaches 0. The surrounding loop and the return values are not
// visible in this excerpt.
324 b2_send(
const int sd,
const void* buf,
const size_t size)
326 unsigned char* ptr = (
unsigned char*)buf;
327 size_t n_bytes_remained = size;
330 int ret, n_bytes_send;
332 ret = send(sd, ptr, n_bytes_remained, 0);
// Failures other than EINTR are handled first; EINTR is logged below as a timeout.
333 if (ret == -1 && errno != EINTR) {
337 if (ret == -1 && errno == EINTR) {
// NOTE(review): this message is emitted on the send path but is prefixed "recv()".
338 fprintf(stderr,
"%s:%d: recv(): Packet send timed out\n", __FILE__, __LINE__);
342 fprintf(stderr,
"%s:%d: send(): Connection closed\n", __FILE__, __LINE__);
// Sanity check: more bytes reported sent than were outstanding.
349 if (n_bytes_remained <
size_t(n_bytes_send))
352 fprintf(stderr,
"%s:%d: send(): Internal error\n", __FILE__, __LINE__);
355 n_bytes_remained -= n_bytes_send;
357 if (n_bytes_remained == 0)
// Receive `size` bytes into `buf` from socket `sd`, tracking the bytes still outstanding in
// n_bytes_remained until it reaches 0. The surrounding loop and the return values are not
// visible in this excerpt.
369 b2_recv(
const int sd,
void* buf,
const size_t size)
371 unsigned char* ptr = (
unsigned char*)buf;
372 size_t n_bytes_remained = size;
375 int ret, n_bytes_recv;
377 ret = recv(sd, ptr, n_bytes_remained, 0);
// Failures other than EINTR/EWOULDBLOCK are handled first; those two are logged below as a timeout.
378 if (ret == -1 && (errno != EINTR && errno != EWOULDBLOCK)) {
382 if (ret == -1 && (errno == EINTR || errno == EWOULDBLOCK)) {
383 fprintf(stderr,
"%s:%d: recv(): Packet receive timed out\n", __FILE__, __LINE__);
387 fprintf(stderr,
"%s:%d: recv(): Connection closed\n", __FILE__, __LINE__);
// Sanity check: more bytes reported received than were outstanding.
393 if (n_bytes_remained <
size_t(n_bytes_recv))
396 fprintf(stderr,
"%s:%d: recv(): Internal error\n", __FILE__, __LINE__);
399 n_bytes_remained -= n_bytes_recv;
401 if (n_bytes_remained == 0)
// Open the outgoing connection to ONSEN at `host`:`port`.
// The socket is created with b2_create_connect_socket(), completion of the connect is awaited
// with poll() for writability, the outcome is read back through SO_ERROR, and finally the
// socket is switched to the NETWORK_IO_TIMEOUT mode.
414 MM_init_connect_to_onsen(
const char* host,
const unsigned int port)
419 sd = b2_create_connect_socket(host, port);
421 ERROR(b2_create_connect_socket);
426 fds.events = POLLOUT;
// Timeout is NETWORK_ESTABLISH_TIMEOUT seconds converted to milliseconds; the macro is -1 in
// this file, which yields a negative value (poll() then waits without a time limit).
428 ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
435 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: connect() to ONSEN: Connection timed out\n");
439 int connection_error;
442 optlen =
sizeof(connection_error);
443 ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
448 if (connection_error) {
// NOTE(review): the pending socket error is in connection_error, but the message formats
// strerror(errno); the printed reason may therefore be unrelated to the connect failure.
449 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: connect() to ONSEN: %s\n", strerror(errno));
457 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: poll() connect to ONSEN: Unexpected error\n");
461 ret = b2_timed_blocking_io(sd, NETWORK_IO_TIMEOUT );
463 ERROR(b2_timed_blocking_io);
// Prepare the socket on which connections from hltout2merger are accepted on `port`.
// The socket comes from b2_create_accept_socket(), then has its blocking mode set (the
// timeout argument is on a line not visible here) and SO_REUSEADDR enabled.
472 MM_init_accept_from_hltout2merger(
const unsigned int port)
478 LOG_FPRINTF(stderr,
"[INFO] merger_merge: Waiting for connection from hltout2merger on port %d\n", port);
480 sd = b2_create_accept_socket(port);
482 ERROR(b2_create_accept_socket);
486 ret = b2_timed_blocking_io(sd,
489 ERROR(b2_timed_blocking_io);
493 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (
char*)&one,
sizeof(
int));
// Read one complete packet from socket `sd_acc` into `buf`.
// A two-word header is peeked first (MSG_PEEK leaves it in the socket buffer); the total
// packet length is the header size plus the big-endian length stored in the second header
// word, and that many bytes are then read with b2_recv().
// Return values are on lines not visible in this excerpt.
559 MM_get_packet(
const int sd_acc,
unsigned char* buf)
561 unsigned int header[2] = {};
563 int ret = recv(sd_acc, &header,
sizeof(
unsigned int) * 2, MSG_PEEK);
564 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
565 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: recv(): Packet receive timed out\n");
// Anything other than a full header (short read, error, closed connection) is rejected here.
568 if (ret != 2 *
sizeof(
unsigned int)) {
569 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: recv(): Unexpected return value (%d)\n", ret);
// NOTE(review): the length comes straight from the network. No check against the size of
// `buf` (ROI_MAX_PACKET_SIZE bytes in main()) is visible in this excerpt; if none exists, an
// oversized length field overruns the buffer — confirm against the full file.
576 size_t n_bytes_from_hltout = 2 *
sizeof(
unsigned int) + ntohl(header[1]);
578 ret = b2_recv(sd_acc, buf, n_bytes_from_hltout);
579 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
580 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_recv(): Packet receive timed out\n");
583 if (
size_t(ret) != n_bytes_from_hltout) {
584 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_recv(): Unexpected return value (%d)\n", ret);
// Tear-down counterpart for the ONSEN socket `sd`; main() calls it once its main loop has ended.
// Body not visible in this excerpt.
593 MM_term_connect_to_onsen(
const int sd)
// Log the per-connection counters held in mycount, followed by a summary line with the sum
// over all keys except 0, the value for key 0 ("out"), and their difference.
// Note that operator[] on myconn and mycount creates an entry if the key is absent.
598 void print_stat(
void)
600 ERR_FPRINTF(stderr,
"[INFO] --- STAT START ---\n");
601 unsigned int sum = 0;
602 for (
auto& it : mycount) {
// NOTE(review): it.second is unsigned int but is formatted with %d.
603 ERR_FPRINTF(stderr,
"[INFO] (%s): %d\n", myconn[it.first].c_str(), it.second);
// Key 0 is excluded from the sum; it is reported separately as "out" below.
604 if (it.first != 0) sum += it.second;
606 ERR_FPRINTF(stderr,
"[INFO] sum %u out %u diff %d\n", sum, mycount[0], (
int)(mycount[0] - sum));
608 ERR_FPRINTF(stderr,
"[INFO] --- STAT END ---\n");
// Entry point of merger_merge.
// Usage (from the message below): merger_merge onsen-host onsen-port client-port#1[:client-port#2[:...]]
// The program opens an accepting socket for HLT senders, connects to ONSEN, and then runs a
// select() loop: new HLT connections are accepted, packets are read from each readable
// connection with MM_get_packet() and forwarded to ONSEN with b2_send().
// Many short lines (braces, returns, short statements) are not visible in this excerpt.
614 main(
int argc,
char* argv[])
616 int current_runnr = -1;
620 int need_reconnection_to_onsen = 1;
622 int connected_hlts = 0;
623 bool stop_running =
false;
625 char onsen_host[1024];
626 unsigned short onsen_port;
634 unsigned short accept_port;
636 LOG_FPRINTF(stderr,
"[INFO] merger_merge: Process invoked [ver(%s %s)]\n", __DATE__, __TIME__);
639 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: Usage: merger_merge onsen-host onsen-port client-port#1[:client-port#2[:...]]\n");
// NOTE(review): strcpy() into the 1024-byte onsen_host; no length check on `p` is visible
// in this excerpt — confirm that over-long arguments are rejected.
653 strcpy(onsen_host, p);
656 onsen_port = atoi(p);
659 accept_port = atoi(p);
// Install the handlers for the signals this process reacts to.
661 signal(SIGPIPE, catch_pipe_function);
662 signal(SIGTERM, catch_term_function);
663 signal(SIGINT, catch_int_function);
664 signal(SIGUSR1, catch_usr1_function);
665 signal(SIGUSR2, catch_usr2_function);
668 sd_acc = MM_init_accept_from_hltout2merger(accept_port);
// The value logged here is the socket descriptor sd_acc, not the port number.
669 LOG_FPRINTF(stderr,
"[INFO] merger_merge: port to accept connections from HLTOUT [%d]\n", sd_acc);
674 size_t n_bytes_from_hltout;
675 size_t n_bytes_to_onsen;
// Packet buffer of ROI_MAX_PACKET_SIZE bytes, shared by receive and send.
677 unsigned char* buf = (
unsigned char*)valloc(ROI_MAX_PACKET_SIZE);
684 bool connected =
false;
// (Re)connect to ONSEN: any previous socket is closed before a new one is opened.
687 if (need_reconnection_to_onsen) {
689 if (sd_con != -1) close(sd_con);
694 sd_con = MM_init_connect_to_onsen(onsen_host, onsen_port);
698 need_reconnection_to_onsen = 0;
700 LOG_FPRINTF(stderr,
"[INFO] merger_merge: MM_init_connect_to_onsen(): Connected to ONSEN\n");
705 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: connection to onsen failed: reconnect in %d second(s)\n", sleep_sec);
724 FD_SET(sd_acc, &allset);
// Main loop: wait without a timeout for activity on the accept socket or any HLT connection.
732 while (!stop_running) {
733 memcpy(&rset, &allset,
sizeof(rset));
740 int rc = select(maxfd + 1, &rset, NULL, NULL, NULL);
// Signal reports; the conditions guarding each message are not visible in this excerpt.
744 ERR_FPRINTF(stderr,
"[INFO] Got SIGUSR1, Run START\n");
749 ERR_FPRINTF(stderr,
"[INFO] Got SIGUSR2, Run STOP\n");
754 ERR_FPRINTF(stderr,
"[INFO] Got SIGINT, ABORT\n");
759 ERR_FPRINTF(stderr,
"[INFO] Got SIGPIPE\n");
763 ERR_FPRINTF(stderr,
"[INFO] Got SIGTERM\n");
768 }
else if (rc == 0) {
// Activity on the accept socket: a new HLT connection is waiting.
772 if (FD_ISSET(sd_acc, &rset)) {
774 struct sockaddr_in isa;
775 socklen_t i =
sizeof(isa);
776 getsockname(sd_acc, (
struct sockaddr*)&isa, &i);
777 if ((t =::accept(sd_acc, (
struct sockaddr*)&isa, &i)) < 0) {
779 ERR_FPRINTF(stderr,
"[ERROR] Error on accepting new connection\n");
780 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
783 LOG_FPRINTF(stderr,
"[INFO] New socket connection t=%d\n", t);
784 char address[INET_ADDRSTRLEN];
785 inet_ntop(AF_INET, &isa.sin_addr, address,
sizeof(address));
788 LOG_FPRINTF(stderr,
"[INFO] %d is IP <%s>\n", t, address);
// `nr` is the last dotted component of the peer's IPv4 address.
// NOTE(review): no null check of the strrchr() result is visible in this excerpt.
791 char* ptr = strrchr(address,
'.');
793 int nr = atoi(ptr + 1);
// Keep the descriptor range scanned below up to date.
799 if (minfd == sd_acc) minfd = t;
800 if (t > maxfd) maxfd = t;
// Scan every descriptor in [minfd, maxfd] and service the readable ones.
803 for (
int fd = minfd; fd < maxfd + 1; fd++) {
804 n_bytes_from_hltout = 0;
805 if (FD_ISSET(fd, &rset)) {
809 ret = MM_get_packet(fd, buf);
812 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: MM_get_packet()[%d]: %s\n", fd, strerror(errno));
814 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: MM_get_packet()[%d]\n", fd);
816 ERR_FPRINTF(stderr,
"[ERROR] Connection from HLT was closed on HLT side (hltout2merge) from %s\n",
// Once the last HLT connection is gone the main loop ends.
826 if (connected_hlts == 0) stop_running =
true;
829 n_bytes_from_hltout = ret;
835 LOG_FPRINTF(stderr,
"[INFO] merger_merge: ---- [ %d] received event from ROI transmitter\n", event_count);
836 LOG_FPRINTF(stderr,
"[INFO] merger_merge: MM_get_packet() Returned %ld\n", n_bytes_from_hltout);
837 dump_binary(stderr, buf, n_bytes_from_hltout);
843 if (n_bytes_from_hltout > 0) {
845 unsigned char* ptr_head_to_onsen = buf;
// A packet needs at least six 32-bit words for the header fields read below.
856 if (n_bytes_from_hltout >= 6 * 4) {
857 auto* iptr = (boost::endian::big_uint32_t*)ptr_head_to_onsen;
// Run number: bits 8..21 of the fifth big-endian word.
859 runnr = (iptr[4] & 0x3FFF00) >> 8;
863 LOG_FPRINTF(stderr,
"[ERROR] merger_merge: packet to small to hold useful header (%ld)\n", n_bytes_from_hltout);
866 if (runnr > current_runnr) {
867 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: run number increases: got %d current %d trig %d\n", runnr, current_runnr,
871 current_runnr = runnr;
873 }
else if (runnr < current_runnr) {
875 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: got trigger from older run: got %d current %d trig %d\n", runnr, current_runnr,
// Event-number bookkeeping only applies to packets of the current run.
879 if (runnr == current_runnr) {
881 if (enable_check) check_event_nr(eventnr);
// Forward the packet unchanged to ONSEN.
884 n_bytes_to_onsen = n_bytes_from_hltout;
886 ret = b2_send(sd_con, ptr_head_to_onsen, n_bytes_to_onsen);
887 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
888 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: socket buffer full, retry\n");
895 need_reconnection_to_onsen = 1;
897 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: error to send to ONSEN : %s\n", strerror(errno));
899 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
904 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_send(): Connection closed\n");
905 need_reconnection_to_onsen = 1;
908 ERR_FPRINTF(stderr,
"[ERROR] Connection to ONSEN was closed on ONSEN side\n");
909 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
916 LOG_FPRINTF(stderr,
"[INFO] merger_merge: ---- [ %d] sent event to ONSEN\n", event_count);
917 dump_binary(stderr, ptr_head_to_onsen, n_bytes_to_onsen);
// Periodic progress report, once every 10000 events.
923 if (event_count % 10000 == 0) {
924 int hltcount = hltused.size();
925 if (triggers.empty()) {
927 ERR_FPRINTF(stderr,
"[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
928 event_number_max, event_number_max, missing_walk_index, triggers.size(),
929 0, event_number_max, -1, -1);
// NOTE(review): hltcount is hltused.size() and hlts is only seen being filled in
// plot_triggers(); no guard against hltcount == 0 or a too-short hlts is visible here — confirm.
931 int mod = *triggers.begin() % hltcount;
932 ERR_FPRINTF(stderr,
"[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
933 *triggers.begin(), *(--triggers.end()), missing_walk_index, triggers.size(),
934 *(--triggers.end()) - *triggers.begin(), event_number_max, mod, hlts[mod]);
// Shutdown: release the ONSEN connection and report why the loop ended.
942 MM_term_connect_to_onsen(sd_con);
944 if (connected_hlts == 0) {
945 ERR_FPRINTF(stderr,
"[RESULT] Stopped because all HLTs closed connection\n");
948 ERR_FPRINTF(stderr,
"[RESULT] %s terminated\n", argv[0]);
int main(int argc, char **argv)
Run all tests.