17 #include <netinet/in.h>
21 #include <arpa/inet.h>
// Size in bytes of the single packet buffer that main() allocates with valloc().
26 #define ROI_MAX_PACKET_SIZE (16384)
// Multiplied by 1000 and passed to poll() as the timeout while the connect to
// ONSEN is pending. A negative poll() timeout means "wait indefinitely".
28 #define NETWORK_ESTABLISH_TIMEOUT (-1)
// Passed to b2_timed_blocking_io() after the ONSEN connection is established;
// a negative value selects that function's `timeout < 0` branch.
29 #define NETWORK_IO_TIMEOUT (-1)
31 #include <boost/endian/arithmetic.hpp>
// Both logging aliases expand to plain fprintf; every call site in this file
// passes stderr as the stream argument.
36 #define LOG_FPRINTF (fprintf)
37 #define ERR_FPRINTF (fprintf)
// ERROR(func): print "[ERROR] <file>:<line>: <func>(): <strerror(errno)>" to stderr.
// `func` is stringified, not called. errno is read at the point of expansion, so
// the macro has to be used before any other call can overwrite errno.
// NOTE(review): expands to a bare `{ ... }` block rather than `do { } while (0)`,
// so `if (x) ERROR(f); else ...` would not parse as intended.
38 #define ERROR(func) { fprintf(stderr, "[ERROR] %s:%d: "#func"(): %s\n", __FILE__, __LINE__, strerror(errno));}
// Display string per key; print_stat() prints myconn[key] next to mycount[key].
// (What the key is - socket descriptor, HLT number, ... - is not visible in this excerpt.)
40 std::map<int, std::string> myconn;
// Counter per key. print_stat() sums every entry except key 0 and reports
// key 0 separately as "out".
41 std::map<int, unsigned int> mycount;
// Set whose size is used as the modulus when mapping a trigger number to an HLT.
42 std::set<int> hltused;
// Filled from hltused by plot_triggers(); indexed with `trigger % hltused.size()`.
43 std::vector<int> hlts;
// Ordered set of event numbers that are reported as "missing"; check_event_nr()
// erases a number when that event arrives.
45 std::set<int> triggers;
// Highest event number seen so far by check_event_nr().
46 unsigned int event_number_max = 0;
// Index into `triggers` that check_event_nr() moves up/down around a distance
// of 100000 events from event_number_max; printed as "missing" in main().
47 unsigned int missing_walk_index = 0;
// One flag per handled signal (SIGUSR1, SIGUSR2, SIGINT, SIGPIPE, SIGTERM).
// NOTE(review): presumably set by the catch_*_function handlers (their bodies are
// not visible here). If so, plain `bool` is not async-signal-safe storage;
// `volatile sig_atomic_t` is the portable type - confirm.
49 bool got_sigusr1 =
false;
50 bool got_sigusr2 =
false;
51 bool got_sigint =
false;
52 bool got_sigpipe =
false;
53 bool got_sigterm =
false;
// Hex-dump the memory at `ptr` to `fp` as 32-bit words, eight words per line.
//   fp   - output stream
//   ptr  - start of the data; read as an array of unsigned int (host byte order)
//   size - length in bytes; only size / sizeof(unsigned int) whole words are
//          printed, so up to 3 trailing bytes are silently omitted
// (The return type and braces are elided in this excerpt.)
56 dump_binary(FILE* fp,
const void* ptr,
const size_t size)
58 const unsigned int* p = (
const unsigned int*)ptr;
59 const size_t _size = size /
sizeof(
unsigned int);
// Each word is printed as 8 hex digits plus a space; a newline follows every 8th word.
62 for (
size_t i = 0; i < _size; i++) {
63 fprintf(fp,
"%08x ", p[i]);
64 if (i % 8 == 7) fprintf(fp,
"\n");
// Terminate the last row when it holds fewer than eight words.
66 if (_size % 8 != 0) fprintf(fp,
"\n");
// Signal handlers registered in main() via signal() for SIGUSR1, SIGUSR2, SIGINT,
// SIGTERM and SIGPIPE respectively. The int parameter (signal number) is unnamed
// and therefore unused. The bodies are elided in this excerpt; presumably each one
// sets the matching got_sig* flag - TODO confirm.
70 static void catch_usr1_function(
int )
76 static void catch_usr2_function(
int )
82 static void catch_int_function(
int )
88 static void catch_term_function(
int )
94 static void catch_pipe_function(
int )
// Reset the trigger bookkeeping: event_number_max and missing_walk_index go back
// to 0. Lines 101-102 are elided here, so whether the `triggers` set (or `hlts`)
// is cleared as well cannot be told from this excerpt.
100 void clear_triggers(
void)
103 event_number_max = 0;
104 missing_walk_index = 0;
// Write a summary of the still-missing triggers to stderr: the overall range and
// count, then one line per missing trigger with the HLT it maps to, then a
// per-HLT tally. When `triggers` is empty a "missing triggers 0" line is written
// instead. Several lines (else branches, a report limit, closing braces) are
// elided in this excerpt.
107 void plot_triggers(
void)
109 int hltcount = hltused.size();
// Tally of missing triggers keyed by HLT id.
110 std::map<int, int> modmissing;
// NOTE(review): hlts is appended to on every call and no visible line clears it,
// so a second call would leave duplicates behind the first hltcount entries
// (indexing below still hits the first copy) - confirm whether that is intended.
112 for (
auto h : hltused) hlts.push_back(h);
113 if (!triggers.empty()) {
114 ERR_FPRINTF(stderr,
"[RESULT] merger_merge: trigger low=%u high=%u missing %lu delta %u max %u\n", *triggers.begin(),
116 triggers.size(), *(--triggers.end()) - *triggers.begin(), event_number_max);
118 for (
auto& it : triggers) {
// A trigger is attributed to the HLT at position (trigger % number of HLTs).
// NOTE(review): no visible guard against hltcount == 0 (modulo by zero when
// triggers is non-empty but hltused is empty).
119 int mod = it % hltcount;
120 modmissing[hlts[mod]]++;
122 ERR_FPRINTF(stderr,
"[INFO] Miss trig %u (%d) HLT%d\n", it, mod, hlts[mod]);
125 ERR_FPRINTF(stderr,
"[WARNING] ... too many missing to report\n");
131 ERR_FPRINTF(stderr,
"[RESULT] merger_merge: missing triggers 0\n");
133 for (
auto m : modmissing) {
134 ERR_FPRINTF(stderr,
"[INFO] merger_merge: missing triggers from HLT%d: %d\n", m.first, m.second);
// Record the arrival of `event_number` in the missing-trigger bookkeeping.
//   event_number - event number taken from a received packet (see main()).
// Effects visible here: event_number_max is raised when a new maximum arrives,
// event_number is removed from `triggers`, and missing_walk_index is moved by at
// most one step.
138 void check_event_nr(
unsigned int event_number)
142 if (event_number_max < event_number) {
// Walk the gap between the old maximum and the new event. The loop body is
// elided; presumably each skipped number is inserted into `triggers` - TODO confirm.
143 for (uint32_t e = event_number_max + 1; e < event_number; e ++) {
146 event_number_max = event_number;
// The event has arrived, so it is no longer missing.
149 triggers.erase(event_number);
// Advance the walk index while the next outstanding trigger is more than 100000
// events behind the maximum; step back when the current one is within 100000.
151 if (triggers.size() > missing_walk_index + 1 && event_number_max - *std::next(triggers.begin(), missing_walk_index + 1) > 100000) {
152 missing_walk_index++;
153 }
else if (missing_walk_index > 0 && event_number_max - *std::next(triggers.begin(), missing_walk_index) < 100000) {
// NOTE(review): the else-if above dereferences std::next(begin, missing_walk_index)
// after checking only missing_walk_index > 0. If the erase() shrank the set to
// size() == missing_walk_index this reads end(); no size guard is visible - confirm.
154 missing_walk_index--;
// Select the blocking behaviour of socket `sd` from `timeout`:
//   first branch (condition elided; presumably timeout > 0): apply `tv` as both
//     the send and the receive timeout via SO_SNDTIMEO / SO_RCVTIMEO
//   timeout == 0: set O_NDELAY (non-blocking)
//   timeout <  0: read the status flags and write them back (lines 190-194 are
//     elided; presumably O_NDELAY is cleared in between - TODO confirm)
// How `tv` is filled and what is returned are not visible in this excerpt.
159 b2_timed_blocking_io(
const int sd,
const int timeout )
169 ret = setsockopt(sd, SOL_SOCKET, SO_SNDTIMEO, &tv,
sizeof(
struct timeval));
177 ret = setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv,
sizeof(
struct timeval));
182 }
else if (timeout == 0) {
// NOTE(review): F_SETFL with a bare O_NDELAY replaces all other settable status
// flags instead of OR-ing into the current ones.
183 ret = fcntl(sd, F_SETFL, O_NDELAY);
188 }
else if (timeout < 0) {
// F_GETFL returns the current flags; the third argument is ignored for this command.
189 ret = fcntl(sd, F_GETFL, O_NDELAY);
195 ret = fcntl(sd, F_SETFL, ret);
// Fill `*in` with an AF_INET socket address for `hostname`:`port`.
//   hostname - name or dotted address, resolved with gethostbyname()
//   port     - port in host byte order (converted with htons())
//   in       - output; zeroed first
// The return value and the failure test around ERROR() are elided in this excerpt.
208 b2_build_sockaddr_in(
const char* hostname,
const unsigned short port,
struct sockaddr_in* in)
210 memset(in, 0,
sizeof(
struct sockaddr_in));
212 in->sin_family = AF_INET;
214 struct hostent* hoste;
215 hoste = gethostbyname(hostname);
// NOTE(review): gethostbyname() reports failures through h_errno, not errno, so
// the strerror(errno) text printed by ERROR() may be unrelated to the real cause.
217 ERROR(gethostbyname);
// Use the first address of the resolved host entry.
220 in->sin_addr = *(
struct in_addr*)(hoste->h_addr);
222 in->sin_port = htons(port);
// Create an IPv4 TCP socket, pass it through b2_timed_blocking_io(sd, 0) (the
// timeout == 0 branch, which sets O_NDELAY) and enable SO_REUSEADDR on it.
// Error tests and the return statement are elided in this excerpt.
230 b2_create_tcp_socket(
void)
232 int sd, ret, one = 1;
235 sd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
242 ret = b2_timed_blocking_io(sd, 0);
244 ERROR(b2_timed_blocking_io);
249 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (
char*)&one,
sizeof(
int));
// Create a TCP socket with b2_create_tcp_socket() and bind it to 0.0.0.0:`port`
// (all local IPv4 interfaces).
//   port - local port in host byte order
// Lines after bind() (e.g. a listen() call) and the return value are elided in
// this excerpt.
261 b2_create_accept_socket(
const unsigned short port)
264 struct sockaddr_in in;
267 sd = b2_create_tcp_socket();
269 ERROR(b2_create_tcp_socket);
273 ret = b2_build_sockaddr_in(
"0.0.0.0", port, &in);
275 ERROR(b2_build_sockaddr_in);
279 ret = bind(sd, (
const struct sockaddr*)&in,
sizeof(
struct sockaddr_in));
// Create a TCP socket with b2_create_tcp_socket() and start a connect() to
// `hostname`:`port`.
//   hostname - remote host, resolved by b2_build_sockaddr_in()
//   port     - remote port in host byte order
// The return value is elided here; MM_init_connect_to_onsen() uses it as the
// socket descriptor.
297 b2_create_connect_socket(
const char* hostname,
const unsigned short port)
300 struct sockaddr_in in;
303 sd = b2_create_tcp_socket();
305 ERROR(b2_create_tcp_socket);
309 ret = b2_build_sockaddr_in(hostname, port, &in);
311 ERROR(b2_build_sockaddr_in);
315 ret = connect(sd, (
const struct sockaddr*)&in,
sizeof(
struct sockaddr_in));
// b2_create_tcp_socket() put the socket through the O_NDELAY branch, so
// EINPROGRESS just means the connect is still pending and is not treated as a failure.
316 if (ret == -1 && errno != EINPROGRESS) {
// Send `size` bytes from `buf` on socket `sd`, calling send() with the number of
// bytes still outstanding and reducing that count by what was written.
//   sd   - connected socket
//   buf  - data to send
//   size - number of bytes to send
// The loop construct, pointer advance and return statements are elided in this
// excerpt; main() treats a return of -1 as an error.
// NOTE(review): `buf` is const but is cast to a non-const unsigned char*.
327 b2_send(
const int sd,
const void* buf,
const size_t size)
329 unsigned char* ptr = (
unsigned char*)buf;
330 size_t n_bytes_remained = size;
334 int ret, n_bytes_send;
336 ret = send(sd, ptr, n_bytes_remained, 0);
// Any send() failure other than EINTR is handled first (handling elided).
337 if (ret == -1 && errno != EINTR) {
// NOTE(review): EINTR (interrupted by a signal) is reported as a timeout, and the
// text says "recv()" although this is the send path. An expired SO_SNDTIMEO is
// reported as EAGAIN/EWOULDBLOCK, i.e. it takes the branch above - confirm the
// intended mapping.
341 if (ret == -1 && errno == EINTR) {
342 fprintf(stderr,
"%s:%d: recv(): Packet send timed out\n", __FILE__, __LINE__);
// The condition guarding this message is elided (presumably ret == 0).
346 fprintf(stderr,
"%s:%d: send(): Connection closed\n", __FILE__, __LINE__);
// Sanity check: send() must not report more bytes than were outstanding.
353 if (n_bytes_remained <
size_t(n_bytes_send))
356 fprintf(stderr,
"%s:%d: send(): Internal error\n", __FILE__, __LINE__);
359 n_bytes_remained -= n_bytes_send;
361 if (n_bytes_remained == 0)
// Receive `size` bytes from socket `sd` into `buf`, calling recv() with the number
// of bytes still outstanding and reducing that count by what was read.
//   sd   - connected socket
//   buf  - destination; the caller must provide at least `size` bytes
//   size - number of bytes to read
// The loop construct, pointer advance and return statements are elided in this
// excerpt; MM_get_packet() expects the return value to equal `size` on success.
374 b2_recv(
const int sd,
void* buf,
const size_t size)
376 unsigned char* ptr = (
unsigned char*)buf;
377 size_t n_bytes_remained = size;
381 int ret, n_bytes_recv;
383 ret = recv(sd, ptr, n_bytes_remained, 0);
// Hard errors: everything except EINTR and EWOULDBLOCK (handling elided).
384 if (ret == -1 && (errno != EINTR && errno != EWOULDBLOCK)) {
// Interrupted call or expired receive timeout: both are reported as a timeout.
388 if (ret == -1 && (errno == EINTR || errno == EWOULDBLOCK)) {
389 fprintf(stderr,
"%s:%d: recv(): Packet receive timed out\n", __FILE__, __LINE__);
// The condition guarding this message is elided (presumably ret == 0, i.e. the
// peer shut the connection down).
393 fprintf(stderr,
"%s:%d: recv(): Connection closed\n", __FILE__, __LINE__);
// Sanity check: recv() must not report more bytes than were requested.
399 if (n_bytes_remained <
size_t(n_bytes_recv))
402 fprintf(stderr,
"%s:%d: recv(): Internal error\n", __FILE__, __LINE__);
405 n_bytes_remained -= n_bytes_recv;
407 if (n_bytes_remained == 0)
// Open the outgoing connection to ONSEN at `host`:`port`: start the connect with
// b2_create_connect_socket(), wait with poll() for the socket to become writable,
// read the connect result via SO_ERROR, then apply NETWORK_IO_TIMEOUT through
// b2_timed_blocking_io(). The branch structure and return values are elided in
// this excerpt; main() stores the result as the ONSEN socket descriptor.
// NOTE(review): `port` is unsigned int here but b2_create_connect_socket() takes
// unsigned short, so values above 65535 are truncated at the call.
421 MM_init_connect_to_onsen(
const char* host,
const unsigned int port)
427 sd = b2_create_connect_socket(host, port);
429 ERROR(b2_create_connect_socket);
// Writability signals that the pending connect has finished.
434 fds.events = POLLOUT;
// NETWORK_ESTABLISH_TIMEOUT is -1, so the timeout is negative and poll() blocks
// until an event arrives.
436 ret = poll(&fds, 1, NETWORK_ESTABLISH_TIMEOUT * 1000);
443 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: connect() to ONSEN: Connection timed out\n");
447 int connection_error;
450 optlen =
sizeof(connection_error);
// SO_ERROR yields the outcome of the asynchronous connect (0 on success).
451 ret = getsockopt(sd, SOL_SOCKET, SO_ERROR, &connection_error, &optlen);
// NOTE(review): the error code is in connection_error, but the message below
// prints strerror(errno); strerror(connection_error) looks like what was meant.
456 if (connection_error) {
457 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: connect() to ONSEN: %s\n", strerror(errno));
465 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: poll() connect to ONSEN: Unexpected error\n");
469 ret = b2_timed_blocking_io(sd, NETWORK_IO_TIMEOUT );
471 ERROR(b2_timed_blocking_io);
// Create the socket on which connections from hltout2merger are accepted:
// logs the port, creates and binds the socket with b2_create_accept_socket(),
// adjusts its blocking mode and sets SO_REUSEADDR. Error tests and the return
// value are elided in this excerpt; main() stores the result as `sd_acc`.
//   port - local port to accept on (printed with %d although it is unsigned)
481 MM_init_accept_from_hltout2merger(
const unsigned int port)
488 LOG_FPRINTF(stderr,
"[INFO] merger_merge: Waiting for connection from hltout2merger on port %d\n", port);
490 sd = b2_create_accept_socket(port);
492 ERROR(b2_create_accept_socket);
// The timeout argument of this call is on an elided line.
496 ret = b2_timed_blocking_io(sd,
499 ERROR(b2_timed_blocking_io);
// NOTE(review): b2_create_tcp_socket() already set SO_REUSEADDR before bind();
// setting it again here, after the bind, looks redundant - confirm.
503 ret = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (
char*)&one,
sizeof(
int));
// Read one complete packet from socket `sd_acc` into `buf`.
// The first two 32-bit words are peeked (MSG_PEEK leaves them in the socket
// queue); the second word, in network byte order, gives the payload length. The
// packet of 8 + payload bytes is then read with b2_recv(), header included.
//   sd_acc - connected socket to read from
//   buf    - destination buffer (main() passes a ROI_MAX_PACKET_SIZE-byte buffer)
// Return statements are elided; main() stores the result as the packet length.
570 MM_get_packet(
const int sd_acc,
unsigned char* buf)
572 unsigned int header[2] = {};
574 int ret = recv(sd_acc, &header,
sizeof(
unsigned int) * 2, MSG_PEEK);
575 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
576 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: recv(): Packet receive timed out\n");
// Anything other than a full 8-byte header (short read, closed peer, other error)
// is rejected here.
579 if (ret != 2 *
sizeof(
unsigned int)) {
580 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: recv(): Unexpected return value (%d)\n", ret);
// NOTE(review): the length comes straight from the peer. No check against the
// capacity of `buf` is visible (lines 581-586 are elided) - confirm one exists,
// otherwise an oversized length overruns the buffer in b2_recv().
587 size_t n_bytes_from_hltout = 2 *
sizeof(
unsigned int) + ntohl(header[1]);
589 ret = b2_recv(sd_acc, buf, n_bytes_from_hltout);
590 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
591 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_recv(): Packet receive timed out\n");
594 if (
size_t(ret) != n_bytes_from_hltout) {
595 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_recv(): Unexpected return value (%d)\n", ret);
// Tear-down counterpart of MM_init_connect_to_onsen(); main() calls it with the
// ONSEN socket descriptor after the event loop ends. The body is elided in this
// excerpt (presumably it closes `sd` - TODO confirm).
604 MM_term_connect_to_onsen(
const int sd)
// Print the per-connection counters to stderr between STAT START / STAT END
// markers: one line per mycount entry (labelled with myconn[key]), then the sum
// of all entries except key 0, the key-0 count ("out") and their difference.
609 void print_stat(
void)
611 ERR_FPRINTF(stderr,
"[INFO] --- STAT START ---\n");
612 unsigned int sum = 0;
613 for (
auto& it : mycount) {
// NOTE(review): myconn[...] uses operator[], which inserts an empty string for a
// key that has a counter but no name. The counter is unsigned but printed with %d.
614 ERR_FPRINTF(stderr,
"[INFO] (%s): %d\n", myconn[it.first].c_str(), it.second);
// Key 0 is the "out" counter and is left out of the sum.
615 if (it.first != 0) sum += it.second;
// mycount[0] likewise inserts a zero entry when key 0 does not exist yet.
617 ERR_FPRINTF(stderr,
"[INFO] sum %u out %u diff %d\n", sum, mycount[0], (
int)(mycount[0] - sum));
619 ERR_FPRINTF(stderr,
"[INFO] --- STAT END ---\n");
// Entry point of merger_merge.
// Usage (from the message below): merger_merge onsen-host onsen-port client-port#1[:client-port#2[:...]]
// Visible flow: parse the arguments, install the signal handlers, open the accept
// socket for HLT connections, connect to ONSEN, then loop on select(): accept new
// HLT connections, read one packet from each readable descriptor with
// MM_get_packet(), track run/event numbers and forward the packet to ONSEN with
// b2_send(). Large parts of the body (declarations, branch conditions, closing
// braces, returns) are elided in this excerpt.
625 main(
int argc,
char* argv[])
627 int current_runnr = -1;
631 int need_reconnection_to_onsen = 1;
633 int connected_hlts = 0;
634 bool stop_running =
false;
636 char onsen_host[1024];
637 unsigned short onsen_port;
645 unsigned short accept_port;
647 LOG_FPRINTF(stderr,
"[INFO] merger_merge: Process invoked [ver(%s %s)]\n", __DATE__, __TIME__);
650 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: Usage: merger_merge onsen-host onsen-port client-port#1[:client-port#2[:...]]\n");
// NOTE(review): where `p` comes from is elided (presumably argv). strcpy() into the
// 1024-byte onsen_host is unbounded, and atoi() gives no error indication for the
// two port values - confirm the inputs are trusted.
664 strcpy(onsen_host, p);
667 onsen_port = atoi(p);
670 accept_port = atoi(p);
672 signal(SIGPIPE, catch_pipe_function);
673 signal(SIGTERM, catch_term_function);
674 signal(SIGINT, catch_int_function);
675 signal(SIGUSR1, catch_usr1_function);
676 signal(SIGUSR2, catch_usr2_function);
679 sd_acc = MM_init_accept_from_hltout2merger(accept_port);
680 LOG_FPRINTF(stderr,
"[INFO] merger_merge: port to accept connections from HLTOUT [%d]\n", sd_acc);
685 size_t n_bytes_from_hltout;
686 size_t n_bytes_to_onsen;
// Single receive/forward buffer, reused for every packet.
688 unsigned char* buf = (
unsigned char*)valloc(ROI_MAX_PACKET_SIZE);
695 bool connected =
false;
// (Re)establish the ONSEN connection; an existing descriptor is closed first.
698 if (need_reconnection_to_onsen) {
700 if (sd_con != -1) close(sd_con);
705 sd_con = MM_init_connect_to_onsen(onsen_host, onsen_port);
709 need_reconnection_to_onsen = 0;
711 LOG_FPRINTF(stderr,
"[INFO] merger_merge: MM_init_connect_to_onsen(): Connected to ONSEN\n");
716 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: connection to onsen failed: reconnect in %d second(s)\n", sleep_sec);
735 FD_SET(sd_acc, &allset);
741 while (!stop_running) {
// select() modifies its set, so work on a copy of the master set each round.
742 memcpy(&rset, &allset,
sizeof(rset));
// No timeout is passed: the call returns only when a descriptor is readable or
// the call fails (the guarding conditions for the messages below are elided;
// presumably they test the got_sig* flags).
749 int rc = select(maxfd + 1, &rset, NULL, NULL, NULL);
753 ERR_FPRINTF(stderr,
"[INFO] Got SIGUSR1, Run START\n");
758 ERR_FPRINTF(stderr,
"[INFO] Got SIGUSR2, Run STOP\n");
763 ERR_FPRINTF(stderr,
"[INFO] Got SIGINT, ABORT\n");
768 ERR_FPRINTF(stderr,
"[INFO] Got SIGPIPE\n");
772 ERR_FPRINTF(stderr,
"[INFO] Got SIGTERM\n");
777 }
else if (rc == 0) {
// Readable accept socket: a new HLT connection is pending.
781 if (FD_ISSET(sd_acc, &rset)) {
783 struct sockaddr_in isa;
784 socklen_t i =
sizeof(isa);
// The address written by getsockname() is overwritten by accept() right after.
785 getsockname(sd_acc, (
struct sockaddr*)&isa, &i);
786 if ((t =::accept(sd_acc, (
struct sockaddr*)&isa, &i)) < 0) {
788 ERR_FPRINTF(stderr,
"[ERROR] Error on accepting new connection\n");
789 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
792 LOG_FPRINTF(stderr,
"[INFO] New socket connection t=%d\n", t);
793 char address[INET_ADDRSTRLEN];
794 inet_ntop(AF_INET, &isa.sin_addr, address,
sizeof(address));
797 LOG_FPRINTF(stderr,
"[INFO] %d is IP <%s>\n", t, address);
// Take the text after the last '.' of the peer address, i.e. its last octet.
// NOTE(review): neither the inet_ntop() result nor a NULL return from strrchr()
// is visibly checked before `ptr + 1` is used.
800 char* ptr = strrchr(address,
'.');
802 int nr = atoi(ptr + 1);
// Maintain the descriptor range scanned by the loop below.
808 if (minfd == sd_acc) minfd = t;
809 if (t > maxfd) maxfd = t;
// Serve every readable descriptor in [minfd, maxfd].
812 for (
int fd = minfd; fd < maxfd + 1; fd++) {
813 n_bytes_from_hltout = 0;
814 if (FD_ISSET(fd, &rset)) {
818 ret = MM_get_packet(fd, buf);
821 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: MM_get_packet()[%d]: %s\n", fd, strerror(errno));
823 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: MM_get_packet()[%d]\n", fd);
825 ERR_FPRINTF(stderr,
"[ERROR] Connection from HLT was closed on HLT side (hltout2merge) from %s\n",
// The event loop ends once the last HLT connection is gone.
835 if (connected_hlts == 0) stop_running =
true;
838 n_bytes_from_hltout = ret;
// NOTE(review): n_bytes_from_hltout is size_t but is printed with %ld below.
844 LOG_FPRINTF(stderr,
"[INFO] merger_merge: ---- [ %d] received event from ROI transmitter\n", event_count);
845 LOG_FPRINTF(stderr,
"[INFO] merger_merge: MM_get_packet() Returned %ld\n", n_bytes_from_hltout);
846 dump_binary(stderr, buf, n_bytes_from_hltout);
852 if (n_bytes_from_hltout > 0) {
854 unsigned char* ptr_head_to_onsen = buf;
// At least six 32-bit words are needed before the header fields are read.
865 if (n_bytes_from_hltout >= 6 * 4) {
// View the packet as big-endian 32-bit words.
866 auto* iptr = (boost::endian::big_uint32_t*)ptr_head_to_onsen;
// Run number: bits 8..21 of the fifth word.
868 runnr = (iptr[4] & 0x3FFF00) >> 8;
872 LOG_FPRINTF(stderr,
"[ERROR] merger_merge: packet to small to hold useful header (%ld)\n", n_bytes_from_hltout);
875 if (runnr > current_runnr) {
876 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: run number increases: got %d current %d trig %d\n", runnr, current_runnr,
880 current_runnr = runnr;
881 }
else if (runnr < current_runnr) {
883 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: got trigger from older run: got %d current %d trig %d\n", runnr, current_runnr,
// Only events of the current run take part in the missing-trigger bookkeeping.
887 if (runnr == current_runnr) {
889 check_event_nr(eventnr);
// The packet is forwarded to ONSEN unchanged, header included.
892 n_bytes_to_onsen = n_bytes_from_hltout;
894 ret = b2_send(sd_con, ptr_head_to_onsen, n_bytes_to_onsen);
895 if (ret == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
896 ERR_FPRINTF(stderr,
"[WARNING] merger_merge: socket buffer full, retry\n");
903 need_reconnection_to_onsen = 1;
905 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: error to send to ONSEN : %s\n", strerror(errno));
907 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
912 ERR_FPRINTF(stderr,
"[ERROR] merger_merge: b2_send(): Connection closed\n");
913 need_reconnection_to_onsen = 1;
916 ERR_FPRINTF(stderr,
"[ERROR] Connection to ONSEN was closed on ONSEN side\n");
917 ERR_FPRINTF(stderr,
"[ERROR] %s terminated\n", argv[0]);
924 LOG_FPRINTF(stderr,
"[INFO] merger_merge: ---- [ %d] sent event to ONSEN\n", event_count);
925 dump_binary(stderr, ptr_head_to_onsen, n_bytes_to_onsen);
// Periodic status line, every 10000 events.
931 if (event_count % 10000 == 0) {
932 int hltcount = hltused.size();
// NOTE(review): *triggers.begin() is dereferenced BEFORE the triggers.empty() test
// on the next line, which is undefined behaviour for an empty set. hltcount is
// also not visibly guarded against 0 (modulo by zero). Computing `mod` inside
// the non-empty branch would avoid the first problem.
933 int mod = *triggers.begin() % hltcount;
934 if (triggers.empty()) {
936 ERR_FPRINTF(stderr,
"[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
937 event_number_max, event_number_max, missing_walk_index, triggers.size(),
938 0, event_number_max, mod, hlts[mod]);
940 ERR_FPRINTF(stderr,
"[INFO] merger_merge: trigger low %u high %u missing %u inflight %lu delta %u max %u low mod %d low HLT %d\n",
941 *triggers.begin(), *(--triggers.end()), missing_walk_index, triggers.size(),
942 *(--triggers.end()) - *triggers.begin(), event_number_max, mod, hlts[mod]);
// After the event loop: shut down the ONSEN side and report why we stopped.
950 MM_term_connect_to_onsen(sd_con);
952 if (connected_hlts == 0) {
953 ERR_FPRINTF(stderr,
"[RESULT] Stopped because all HLTs closed connection\n");
956 ERR_FPRINTF(stderr,
"[RESULT] %s terminated\n", argv[0]);
int main(int argc, char **argv)
Run all tests.