9 #include <netinet/in.h>
13 #include <arpa/inet.h>
17 #define ROI_MAX_PACKET_SIZE (16384)
19 #define NETWORK_ESTABLISH_TIMEOUT (-1)
20 #define NETWORK_IO_TIMEOUT (-1)
22 #include <boost/spirit/home/support/detail/endian.hpp>
27 #define LOG_FPRINTF (fprintf)
28 #define ERR_FPRINTF (fprintf)
29 #define ERROR(func) { fprintf(stderr, "[ERROR] %s:%d: "#func"(): %s\n", __FILE__, __LINE__, strerror(errno));}
// Text label per connection key; print_stat() prints myconn[key] for every key found in mycount.
31 std::map<int, std::string> myconn;
// Counter per connection key. print_stat() reports key 0 as the "out" value and adds up all
// other keys into "sum".
32 std::map<int, unsigned int> mycount;
// Ordered set of HLT ids; its size is used as the modulus when a trigger number is mapped to an HLT.
33 std::set<int> hltused;
// Indexable copy of hltused, filled by plot_triggers().
34 std::vector<int> hlts;
// Trigger numbers currently considered outstanding: check_event_nr() erases a number when it
// arrives, plot_triggers() reports what is left as missing.
// NOTE(review): stored as int but printed with %u and fed from unsigned event numbers — confirm intended.
36 std::set<int> triggers;
// Largest event number handed to check_event_nr() so far.
37 unsigned int event_number_max = 0;
// Position inside `triggers` maintained by check_event_nr() (moved up/down around a lag of
// 100000 events behind event_number_max); printed as "missing" in the periodic status line.
38 unsigned int missing_walk_index = 0;
// One flag per signal for which main() installs a catch_*_function() handler
// (SIGUSR1, SIGUSR2, SIGINT, SIGPIPE, SIGTERM).
//
// An object shared between a signal handler and the interrupted code must be a
// `volatile sig_atomic_t`: a plain `bool` may be cached in a register by the
// optimiser, so a flag set from the handler could go unnoticed by the main
// loop. Assigning `true`/`false` and testing the flag in a condition keep
// working unchanged. sig_atomic_t comes from <signal.h>, which the file
// already needs for signal() and the SIG* constants.
volatile sig_atomic_t got_sigusr1 = 0;
volatile sig_atomic_t got_sigusr2 = 0;
volatile sig_atomic_t got_sigint = 0;
volatile sig_atomic_t got_sigpipe = 0;
volatile sig_atomic_t got_sigterm = 0;
/**
 * Hex-dump a buffer to @p fp as native-endian 32-bit words.
 *
 * Each word is printed as "%08x " and a newline is emitted after every eighth
 * word; a final newline is added when the last row is incomplete. Trailing
 * bytes that do not fill a whole word (size % sizeof(unsigned int)) are not
 * printed.
 *
 * @param fp    Stream to write to.
 * @param ptr   Start of the buffer; no alignment is required.
 * @param size  Buffer length in bytes.
 */
void
dump_binary(FILE* fp, const void* ptr, const size_t size)
{
  const unsigned char* bytes = static_cast<const unsigned char*>(ptr);
  const size_t n_words = size / sizeof(unsigned int);

  for (size_t i = 0; i < n_words; i++) {
    // Copy the word out instead of dereferencing a casted pointer: the caller
    // may hand in a byte buffer that is not aligned for unsigned int.
    unsigned int word;
    memcpy(&word, bytes + i * sizeof(unsigned int), sizeof(word));

    fprintf(fp, "%08x ", word);
    if (i % 8 == 7) fprintf(fp, "\n");
  }
  if (n_words % 8 != 0) fprintf(fp, "\n");
}
// Signal handlers. main() installs them with signal(): SIGUSR1 -> catch_usr1_function,
// SIGUSR2 -> catch_usr2_function, SIGINT -> catch_int_function, SIGTERM -> catch_term_function,
// SIGPIPE -> catch_pipe_function. The signal number argument is unnamed (unused).
// NOTE(review): the handler bodies are not visible in this excerpt; presumably each one sets
// the matching got_sig* flag — confirm against the full source.
61 static void catch_usr1_function(
int )
// Handler installed for SIGUSR2.
67 static void catch_usr2_function(
int )
// Handler installed for SIGINT.
73 static void catch_int_function(
int )
// Handler installed for SIGTERM.
79 static void catch_term_function(
int )
// Handler installed for SIGPIPE.
85 static void catch_pipe_function(
int )
// Resets the trigger bookkeeping. Only the reset of missing_walk_index is visible in this
// excerpt.
// NOTE(review): presumably `triggers` (and possibly event_number_max) are cleared in the
// lines not shown — confirm against the full source.
91 void clear_triggers(
void)
95 missing_walk_index = 0;
// Writes a summary of the trigger numbers still stored in `triggers` to stderr: the lowest and
// highest entry, how many there are, and how many fall on each HLT. The HLT of a trigger is
// chosen as hlts[trigger % number_of_HLTs].
98 void plot_triggers(
void)
100 int hltcount = hltused.size();
// Per-HLT count of missing triggers, keyed by HLT id.
101 std::map<int, int> modmissing;
// Copy the ordered ids of hltused into the indexable vector hlts.
// NOTE(review): no hlts.clear() is visible in this excerpt; without one, repeated calls would
// append duplicates — confirm.
103 for (
auto h : hltused) hlts.push_back(h);
104 if (!triggers.empty()) {
105 ERR_FPRINTF(stderr,
"[RESULT] merger_merge: trigger low=%u high=%u missing %lu delta %u max %u\n", *triggers.begin(),
107 triggers.size(), *(--triggers.end()) - *triggers.begin(), event_number_max);
// NOTE(review): `it % hltcount` is a division by zero when hltused is empty, and hlts[mod] is
// then out of range — confirm a guard exists in the lines not shown.
109 for (
auto& it : triggers) {
110 int mod = it % hltcount;
111 modmissing[hlts[mod]]++;
113 ERR_FPRINTF(stderr,
"[INFO] Miss trig %u (%d) HLT%d\n", it, mod, hlts[mod]);
// Per-trigger output is cut off with this warning; the limit itself is in lines not shown.
116 ERR_FPRINTF(stderr,
"[WARNING] ... too many missing to report\n");
122 ERR_FPRINTF(stderr,
"[RESULT] merger_merge: missing triggers 0\n");
// Per-HLT totals collected above.
124 for (
auto m : modmissing) {
125 ERR_FPRINTF(stderr,
"[INFO] merger_merge: missing triggers from HLT%d: %d\n", m.first, m.second);
// Bookkeeping for one received event number: advances event_number_max, removes the number
// from the outstanding set `triggers`, and adjusts missing_walk_index.
129 void check_event_nr(
unsigned int event_number)
// A new maximum: walk over the numbers skipped between the old maximum and this one.
// NOTE(review): the loop body is not visible; presumably each skipped number is inserted into
// `triggers` — confirm.
133 if (event_number_max < event_number) {
134 for (uint32_t e = event_number_max + 1;
e < event_number;
e ++) {
137 event_number_max = event_number;
// The event has arrived, so it is no longer outstanding.
140 triggers.erase(event_number);
// Move the walk index forward while the next outstanding trigger lags the maximum by more
// than 100000 events, and back again once the entry at the index is within 100000.
// std::next on a std::set iterator is linear in the index.
142 if (triggers.size() > missing_walk_index + 1 && event_number_max - *std::next(triggers.begin(), missing_walk_index + 1) > 100000) {
143 missing_walk_index++;
144 }
else if (missing_walk_index > 0 && event_number_max - *std::next(triggers.begin(), missing_walk_index) < 100000) {
145 missing_walk_index--;
// Configures the blocking behaviour of socket `sd` according to `timeout`:
//   - first branch (condition not visible, presumably timeout > 0): sets SO_SNDTIMEO and
//     SO_RCVTIMEO from a struct timeval `tv`;
//   - timeout == 0: switches the descriptor to non-blocking mode (O_NDELAY);
//   - timeout < 0: reads the current flags and writes them back (the modification between
//     the two fcntl calls is in lines not shown; presumably O_NDELAY is cleared).
150 b2_timed_blocking_io(
const int sd,
const int timeout )
160 ret = setsockopt(sd, SOL_SOCKET, SO_SNDTIMEO, &tv,
sizeof(
struct timeval));
168 ret = setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv,
sizeof(
struct timeval));
173 }
else if (timeout == 0) {
// NOTE(review): F_SETFL with only O_NDELAY replaces every other file status flag on sd.
174 ret = fcntl(sd, F_SETFL, O_NDELAY);
179 }
else if (timeout < 0) {
// F_GETFL ignores its third argument; the O_NDELAY passed here has no effect.
180 ret = fcntl(sd, F_GETFL, O_NDELAY);
186 ret = fcntl(sd, F_SETFL, ret);
/**
 * Fill @p in with the IPv4 socket address for @p hostname : @p port.
 *
 * The name is resolved with getaddrinfo() restricted to AF_INET. The former
 * gethostbyname() call is obsolescent, not thread-safe, and reports failures
 * through h_errno, so printing strerror(errno) for it produced an unrelated
 * message.
 *
 * @param hostname  Host name or dotted-quad IPv4 address to resolve.
 * @param port      Port number in host byte order.
 * @param in        Output structure; zeroed first, then filled with family,
 *                  address and port (network byte order).
 * @return 0 on success, -1 if @p hostname is NULL or cannot be resolved.
 */
int
b2_build_sockaddr_in(const char* hostname, const unsigned short port, struct sockaddr_in* in)
{
  memset(in, 0, sizeof(struct sockaddr_in));
  in->sin_family = AF_INET;

  if (hostname == NULL) {
    fprintf(stderr, "[ERROR] %s:%d: b2_build_sockaddr_in(): no host name given\n", __FILE__, __LINE__);
    return -1;
  }

  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_INET;        // sockaddr_in can only hold IPv4
  hints.ai_socktype = SOCK_STREAM;  // one result per address instead of one per socket type

  struct addrinfo* result = NULL;
  const int rc = getaddrinfo(hostname, NULL, &hints, &result);
  if (rc != 0 || result == NULL) {
    // getaddrinfo has its own error space; errno is only meaningful for EAI_SYSTEM.
    fprintf(stderr, "[ERROR] %s:%d: getaddrinfo(%s): %s\n", __FILE__, __LINE__, hostname, gai_strerror(rc));
    return -1;
  }

  // Use the first address returned, as the gethostbyname() version used h_addr.
  in->sin_addr = reinterpret_cast<const struct sockaddr_in*>(result->ai_addr)->sin_addr;
  freeaddrinfo(result);

  in->sin_port = htons(port);

  return 0;
}
// Creates an IPv4 TCP socket, puts it into non-blocking mode via b2_timed_blocking_io(sd, 0)
// and enables SO_REUSEADDR on it. Error handling and the return statements are in lines not
// shown; callers treat the result as a socket descriptor.
221 b2_create_tcp_socket(
void)
223 int sd, ret, one = 1;
226 sd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
// timeout 0 selects the non-blocking branch of b2_timed_blocking_io().
233 ret = b2_timed_blocking_io(sd, 0);
235 ERROR(b2_timed_blocking_io);
240 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (
char*)&one,
sizeof(
int));
// Creates a TCP socket with b2_create_tcp_socket() and binds it to 0.0.0.0:<port>, i.e. to
// all local IPv4 interfaces. Anything after bind() (e.g. listen) and the return value are in
// lines not shown.
252 b2_create_accept_socket(
const unsigned short port)
255 struct sockaddr_in in;
258 sd = b2_create_tcp_socket();
260 ERROR(b2_create_tcp_socket);
264 ret = b2_build_sockaddr_in(
"0.0.0.0", port, &in);
266 ERROR(b2_build_sockaddr_in);
270 ret = bind(sd, (
const struct sockaddr*)&in,
sizeof(
struct sockaddr_in));
// Creates a TCP socket with b2_create_tcp_socket() and starts a connect() to hostname:port.
// The socket is non-blocking at this point, so connect() returning -1 with EINPROGRESS is
// the normal "connection under way" case and is not treated as an error here; the caller
// (MM_init_connect_to_onsen) waits for completion with poll().
288 b2_create_connect_socket(
const char* hostname,
const unsigned short port)
291 struct sockaddr_in in;
294 sd = b2_create_tcp_socket();
296 ERROR(b2_create_tcp_socket);
300 ret = b2_build_sockaddr_in(hostname, port, &in);
302 ERROR(b2_build_sockaddr_in);
306 ret = connect(sd, (
const struct sockaddr*)&in,
sizeof(
struct sockaddr_in));
// Only failures other than EINPROGRESS are handled in this branch.
307 if (ret == -1 && errno != EINPROGRESS) {
// Sends `size` bytes from `buf` on socket `sd`, calling send() repeatedly until nothing
// remains (loop construct and return statements are in lines not shown).
318 b2_send(
const int sd,
const void* buf,
const size_t size)
320 unsigned char* ptr = (
unsigned char*)buf;
// Bytes still to be written.
321 size_t n_bytes_remained = size;
325 int ret, n_bytes_send;
327 ret = send(sd, ptr, n_bytes_remained, 0);
// Any send() failure other than EINTR.
// NOTE(review): EAGAIN/EWOULDBLOCK also land here, while main() checks for exactly those
// errno values after b2_send() fails — confirm the intended handling.
328 if (ret == -1 && errno != EINTR) {
// EINTR is reported as a timeout.
// NOTE(review): the message below is labelled "recv()" although this is the send path.
332 if (ret == -1 && errno == EINTR) {
333 fprintf(stderr,
"%s:%d: recv(): Packet send timed out\n", __FILE__, __LINE__);
337 fprintf(stderr,
"%s:%d: send(): Connection closed\n", __FILE__, __LINE__);
// Sanity check: more bytes reported as sent than were outstanding.
344 if (n_bytes_remained <
size_t(n_bytes_send))
347 fprintf(stderr,
"%s:%d: send(): Internal error\n", __FILE__, __LINE__);
350 n_bytes_remained -= n_bytes_send;
// Everything has been sent.
352 if (n_bytes_remained == 0)
// Receives exactly `size` bytes into `buf` from socket `sd`, calling recv() repeatedly until
// nothing remains (loop construct and return statements are in lines not shown).
365 b2_recv(
const int sd,
void* buf,
const size_t size)
367 unsigned char* ptr = (
unsigned char*)buf;
// Bytes still to be read.
368 size_t n_bytes_remained = size;
372 int ret, n_bytes_recv;
374 ret = recv(sd, ptr, n_bytes_remained, 0);
// Any recv() failure other than EINTR / EWOULDBLOCK.
375 if (ret == -1 && (errno != EINTR && errno != EWOULDBLOCK)) {
// EINTR and EWOULDBLOCK are both reported as a receive timeout.
379 if (ret == -1 && (errno == EINTR || errno == EWOULDBLOCK)) {
380 fprintf(stderr,
"%s:%d: recv(): Packet receive timed out\n", __FILE__, __LINE__);
384 fprintf(stderr,
"%s:%d: recv(): Connection closed\n", __FILE__, __LINE__);
// Sanity check: more bytes reported as received than were outstanding.
390 if (n_bytes_remained <
size_t(n_bytes_recv))
393 fprintf(stderr,
"%s:%d: recv(): Internal error\n", __FILE__, __LINE__);
396 n_bytes_remained -= n_bytes_recv;
// Everything has been received.
398 if (n_bytes_remained == 0)
// Opens the outgoing TCP connection to ONSEN at host:port: starts a non-blocking connect,
// waits with poll() for the socket to become writable, checks SO_ERROR for the outcome and
// finally applies NETWORK_IO_TIMEOUT to the socket via b2_timed_blocking_io().
412 MM_init_connect_to_onsen(
const char* host,
const unsigned int port)
418 sd = b2_create_connect_socket(host, port);
420 ERROR(b2_create_connect_socket);
// Writability signals that the pending connect() has finished (successfully or not).
425 fds.events = POLLOUT;
// NETWORK_ESTABLISH_TIMEOUT is defined as -1, so the timeout passed is negative and poll()
// waits without limit.
427 ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
434 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: connect() to ONSEN: Connection timed out\n");
// Result of the asynchronous connect, fetched with SO_ERROR.
438 int connection_error;
441 optlen =
sizeof(connection_error);
442 ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
// NOTE(review): the failure code is in connection_error, but the message prints
// strerror(errno), which getsockopt() does not set on success — likely should be
// strerror(connection_error).
447 if (connection_error) {
448 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: connect() to ONSEN: %s\n", strerror(errno));
456 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: poll() connect to ONSEN: Unexpected error\n");
// NETWORK_IO_TIMEOUT is -1, which selects the "timeout < 0" branch of b2_timed_blocking_io().
460 ret = b2_timed_blocking_io(sd, NETWORK_IO_TIMEOUT );
462 ERROR(b2_timed_blocking_io);
// Prepares the listening side for hltout2merger clients: creates a socket bound to `port`
// with b2_create_accept_socket(), adjusts its blocking mode and sets SO_REUSEADDR. main()
// uses the result as the descriptor it accept()s on.
472 MM_init_accept_from_hltout2merger(
const unsigned int port)
479 LOG_FPRINTF(stderr,
"[INFO] merger_merge: Waiting for connection from hltout2merger on port %d\n", port);
481 sd = b2_create_accept_socket(port);
483 ERROR(b2_create_accept_socket);
// The timeout argument of this call is on a line not shown in this excerpt.
487 ret = b2_timed_blocking_io(sd,
490 ERROR(b2_timed_blocking_io);
// NOTE(review): SO_REUSEADDR is set here after the socket was already bound in
// b2_create_accept_socket() (and already set in b2_create_tcp_socket()) — likely redundant.
494 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (
char*)&one,
sizeof(
int));
// Reads one packet from socket `sd_acc` into `buf`. The packet starts with two 32-bit words;
// the second one, in network byte order, gives the number of bytes that follow the header.
561 MM_get_packet(
const int sd_acc,
unsigned char* buf)
563 unsigned int header[2] = {};
// Peek at the 8-byte header without consuming it, so the full packet (header included) can
// be read in one go below.
565 int ret = recv(sd_acc, &header,
sizeof(
unsigned int) * 2, MSG_PEEK);
566 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
567 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: recv(): Packet receive timed out\n");
// Anything but a complete header is rejected; this includes 0 (peer closed) and a
// partially arrived header.
570 if (ret != 2 *
sizeof(
unsigned int)) {
571 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: recv(): Unexpected return value (%d)\n", ret);
// Total packet length = header + payload length taken from the wire.
// NOTE(review): no check of this length against the size of `buf` (main() allocates
// ROI_MAX_PACKET_SIZE bytes) is visible in this excerpt — confirm one exists, otherwise a
// large length field overflows the buffer.
578 size_t n_bytes_from_hltout = 2 *
sizeof(
unsigned int) + ntohl(header[1]);
580 ret = b2_recv(sd_acc, buf, n_bytes_from_hltout);
581 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
582 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_recv(): Packet receive timed out\n");
585 if (
size_t(ret) != n_bytes_from_hltout) {
586 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_recv(): Unexpected return value (%d)\n", ret);
// Tears down the connection to ONSEN identified by socket `sd`; called by main() on shutdown.
// NOTE(review): the body is not visible in this excerpt; presumably it closes `sd` — confirm.
595 MM_term_connect_to_onsen(
const int sd)
// Prints the per-connection counters in mycount to stderr, framed by STAT START/END lines,
// followed by a comparison of the sum over all non-zero keys ("sum") with the counter stored
// under key 0 ("out").
600 void print_stat(
void)
602 ERR_FPRINTF(stderr,
"[INFO] --- STAT START ---\n");
603 unsigned int sum = 0;
604 for (
auto& it : mycount) {
// myconn[...] default-inserts an empty label for keys it does not know yet.
// NOTE(review): it.second is unsigned int but printed with %d.
605 ERR_FPRINTF(stderr,
"[INFO] (%s): %d\n", myconn[it.first].c_str(), it.second);
// Key 0 is the "out" counter and is left out of the sum.
606 if (it.first != 0) sum += it.second;
// diff is computed in unsigned arithmetic and then reinterpreted as int so that a deficit
// shows up as a negative number.
608 ERR_FPRINTF(stderr,
"[INFO] sum %u out %u diff %d\n", sum, mycount[0], (
int)(mycount[0] - sum));
610 ERR_FPRINTF(stderr,
"[INFO] --- STAT END ---\n");
// Entry point of merger_merge. Usage (from the message below):
//   merger_merge onsen-host onsen-port client-port#1[:client-port#2[:...]]
// Accepts connections from hltout2merger clients, reads packets from them with
// MM_get_packet() and forwards each packet unchanged to ONSEN with b2_send(), while keeping
// run/event-number bookkeeping and reacting to SIGUSR1/SIGUSR2/SIGINT/SIGPIPE/SIGTERM.
616 main(
int argc,
char* argv[])
// Run number of the packets currently being tracked; -1 until the first packet is seen.
618 int current_runnr = -1;
622 int need_reconnection_to_onsen = 1;
624 int connected_hlts = 0;
625 bool stop_running =
false;
627 char onsen_host[1024];
628 unsigned short onsen_port;
636 unsigned short accept_port;
638 LOG_FPRINTF(stderr,
"[INFO] merger_merge: Process invoked [ver(%s %s)]\n", __DATE__, __TIME__);
641 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: Usage: merger_merge onsen-host onsen-port client-port#1[:client-port#2[:...]]\n");
// NOTE(review): unbounded copy into the 1024-byte onsen_host buffer; the origin of `p` is
// not visible here (presumably a command-line argument) — a longer string overflows it.
655 strcpy(onsen_host, p);
// atoi() gives no error indication; the results are narrowed to unsigned short.
658 onsen_port = atoi(p);
661 accept_port = atoi(p);
663 signal(SIGPIPE, catch_pipe_function);
664 signal(SIGTERM, catch_term_function);
665 signal(SIGINT, catch_int_function);
666 signal(SIGUSR1, catch_usr1_function);
667 signal(SIGUSR2, catch_usr2_function);
// Listening socket for incoming hltout2merger connections.
670 sd_acc = MM_init_accept_from_hltout2merger(accept_port);
// Note: the value printed in the brackets is the socket descriptor, not the port number.
671 LOG_FPRINTF(stderr,
"[INFO] merger_merge: port to accept connections from HLTOUT [%d]\n", sd_acc);
676 size_t n_bytes_from_hltout;
677 size_t n_bytes_to_onsen;
// Single packet buffer of ROI_MAX_PACKET_SIZE (16384) bytes, reused for every packet.
679 unsigned char* buf = (
unsigned char*)valloc(ROI_MAX_PACKET_SIZE);
686 bool connected =
false;
// (Re)establish the outgoing connection to ONSEN, closing a previous socket first.
689 if (need_reconnection_to_onsen) {
691 if (sd_con != -1) close(sd_con);
696 sd_con = MM_init_connect_to_onsen(onsen_host, onsen_port);
700 need_reconnection_to_onsen = 0;
702 LOG_FPRINTF(stderr,
"[INFO] merger_merge: MM_init_connect_to_onsen(): Connected to ONSEN\n");
707 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: connection to onsen failed: reconnect in %d second(s)\n", sleep_sec);
// The listening socket is the first member of the select() set.
726 FD_SET(sd_acc, &allset);
// Main loop: wait for activity on the listening socket or any accepted connection.
732 while (!stop_running) {
// select() modifies its set, so work on a copy of the master set each round.
733 memcpy(&rset, &allset,
sizeof(rset));
// No timeout: blocks until a descriptor is readable or a signal interrupts the call.
740 int rc = select(maxfd + 1, &rset, NULL, NULL, NULL);
// Signal reporting; the conditions guarding these messages are in lines not shown.
744 ERR_FPRINTF(stderr,
"[INFO] Got SIGUSR1, Run START\n");
749 ERR_FPRINTF(stderr,
"[INFO] Got SIGUSR2, Run STOP\n");
754 ERR_FPRINTF(stderr,
"[INFO] Got SIGINT, ABORT\n");
759 ERR_FPRINTF(stderr,
"[INFO] Got SIGPIPE\n");
763 ERR_FPRINTF(stderr,
"[INFO] Got SIGTERM\n");
768 }
else if (rc == 0) {
// Activity on the listening socket means a new client wants to connect.
773 if (FD_ISSET(sd_acc, &rset)) {
774 struct sockaddr_in isa;
775 socklen_t i =
sizeof(isa);
776 getsockname(sd_acc, (
struct sockaddr*)&isa, &i);
// accept() overwrites isa with the address of the connecting peer.
777 if ((t =::accept(sd_acc, (
struct sockaddr*)&isa, &i)) < 0) {
779 ERR_FPRINTF(stderr,
"[ERROR] Error on accepting new connection\n");
780 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
783 LOG_FPRINTF(stderr,
"[INFO] New socket connection t=%d\n", t);
784 char address[INET_ADDRSTRLEN];
785 inet_ntop(AF_INET, &isa.sin_addr, address,
sizeof(address));
788 LOG_FPRINTF(stderr,
"[INFO] %d is IP <%s>\n", t, address);
// Take the last octet of the peer's dotted-quad address as a number.
// NOTE(review): no NULL check of the strrchr() result is visible in this excerpt.
791 char* ptr = strrchr(address,
'.');
793 int nr = atoi(ptr + 1);
// Keep the descriptor range scanned below up to date.
799 if (minfd == sd_acc) minfd = t;
800 if (t > maxfd) maxfd = t;
// Serve every client descriptor that select() reported as readable.
803 for (
int fd = minfd; fd < maxfd + 1; fd++) {
804 n_bytes_from_hltout = 0;
805 if (FD_ISSET(fd, &rset)) {
809 ret = MM_get_packet(fd, buf);
812 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: MM_get_packet()[%d]: %s\n", fd, strerror(errno));
814 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: MM_get_packet()[%d]\n", fd);
816 ERR_FPRINTF(stderr,
"[ERROR] Connection from HLT was closed on HLT side (hltout2merge) from %s\n",
// Leave the main loop once the last HLT connection is gone.
826 if (connected_hlts == 0) stop_running =
true;
829 n_bytes_from_hltout = ret;
835 LOG_FPRINTF(stderr,
"[INFO] merger_merge: ---- [ %d] received event from ROI transmitter\n", event_count);
836 LOG_FPRINTF(stderr,
"[INFO] merger_merge: MM_get_packet() Returned %ld\n", n_bytes_from_hltout);
837 dump_binary(stderr, buf, n_bytes_from_hltout);
// A packet was received: inspect its header, then forward it.
843 if (n_bytes_from_hltout > 0) {
845 unsigned char* ptr_head_to_onsen = buf;
// At least six 32-bit words are required before the header fields are read.
856 if (n_bytes_from_hltout >= 6 * 4) {
// View the packet as big-endian 32-bit words.
857 boost::spirit::endian::ubig32_t* iptr = (boost::spirit::endian::ubig32_t*)ptr_head_to_onsen;
// Run number = bits 8..21 of word 4.
859 runnr = (iptr[4] & 0x3FFF00) >> 8;
863 LOG_FPRINTF(stderr,
"[ERROR] merger_merge: packet to small to hold useful header (%ld)\n", n_bytes_from_hltout);
// A higher run number starts a new run for the bookkeeping.
866 if (runnr > current_runnr) {
867 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: run number increases: got %d current %d trig %d\n", runnr, current_runnr,
871 current_runnr = runnr;
872 }
else if (runnr < current_runnr) {
874 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: got trigger from older run: got %d current %d trig %d\n", runnr, current_runnr,
// Only events of the current run take part in the missing-trigger bookkeeping.
878 if (runnr == current_runnr) {
880 check_event_nr(eventnr);
// The packet is forwarded to ONSEN unchanged and in full.
883 n_bytes_to_onsen = n_bytes_from_hltout;
885 ret = b2_send(sd_con, ptr_head_to_onsen, n_bytes_to_onsen);
886 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
887 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: socket buffer full, retry\n");
894 need_reconnection_to_onsen = 1;
896 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: error to send to ONSEN : %s\n", strerror(errno));
898 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
903 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_send(): Connection closed\n");
904 need_reconnection_to_onsen = 1;
907 ERR_FPRINTF(stderr,
"[ERROR] Connection to ONSEN was closed on ONSEN side\n");
908 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
915 LOG_FPRINTF(stderr,
"[INFO] merger_merge: ---- [ %d] sent event to ONSEN\n", event_count);
916 dump_binary(stderr, ptr_head_to_onsen, n_bytes_to_onsen);
// Periodic status line every 10000 events.
// NOTE(review): *triggers.begin() is dereferenced before the triggers.empty() check below
// (undefined behaviour on an empty set), and `% hltcount` divides by zero when hltused is
// empty; hlts[mod] additionally relies on hlts having been filled by plot_triggers().
922 if (event_count % 10000 == 0) {
923 int hltcount = hltused.size();
924 int mod = *triggers.begin() % hltcount;
925 if (triggers.empty()) {
927 ERR_FPRINTF(stderr,
"[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
928 event_number_max, event_number_max, missing_walk_index, triggers.size(),
929 0, event_number_max, mod, hlts[mod]);
931 ERR_FPRINTF(stderr,
"[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
932 *triggers.begin(), *(--triggers.end()), missing_walk_index, triggers.size(),
933 *(--triggers.end()) - *triggers.begin(), event_number_max, mod, hlts[mod]);
// Shutdown: drop the ONSEN connection and report why the process stopped.
941 MM_term_connect_to_onsen(sd_con);
943 if (connected_hlts == 0) {
944 ERR_FPRINTF(stderr,
"[RESULT] Stopped because all HLTs closed connection\n");
947 ERR_FPRINTF(stderr,
"[RESULT] %s terminated\n", argv[0]);