8 #include <daq/rawdata/DesSer.h>
11 #include <netinet/tcp.h>
14 #include <sys/socket.h>
// Fragment of the DesSer constructor body (its signature is not part of this
// excerpt; the debug message below identifies it as the constructor).
// Sets the connection count to 1 and resets the exp/run/subrun bookkeeping.
33 m_num_connections = 1;
35 m_exprunsubrun_no = 0;
// The "previous" word starts at all ones -- presumably a "no previous value
// yet" sentinel; confirm against the code that compares the two.
37 m_prev_exprunsubrun_no = 0xFFFFFFFF;
46 printf(
"[DEBUG] DesSer: Constructor done.\n"); fflush(stdout);
// Hands out the next unused pre-allocated buffer.
// While m_num_usedbuf is below NUM_PREALLOC_BUF, the buffer at index
// m_num_usedbuf of m_bufary is selected; the remaining visible lines build an
// error message and pass it to print_err.PrintError (presumably the branch
// taken when the pool is exhausted -- the branch structure and the return
// statement are not part of this excerpt).
57 int* DesSer::getPreAllocBuf()
60 if (m_num_usedbuf < NUM_PREALLOC_BUF) {
61 tempbuf = m_bufary[ m_num_usedbuf ];
66 "No pre-allocated buffers are left. %d > %d. Not enough buffers are allocated or "
67 "memory leak or forget to call ClearNumUsedBuf every event loop. Exting...",
68 m_num_usedbuf, NUM_PREALLOC_BUF);
69 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Provides a buffer able to hold nwords ints.
//   nwords      : requested size in words.
//   delete_flag : out-parameter; how it is set is not visible in this excerpt
//                 (presumably tells the caller whether to delete[] the result).
// Requests larger than BUF_SIZE_WORD get a freshly allocated array; the other
// visible path asks getPreAllocBuf() and reports a null result as an error.
77 int* DesSer::getNewBuffer(
int nwords,
int* delete_flag)
82 if (nwords > BUF_SIZE_WORD) {
84 temp_buf =
new int[ nwords ];
86 if ((temp_buf = getPreAllocBuf()) == 0x0) {
88 sprintf(err_buf,
"Null pointer from GetPreALlocBuf(). Exting...\n");
89 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Prepares the object for running: ignores SIGPIPE, allocates and zeroes the
// pre-allocated buffers, calls m_status.open()/reportReady(), resets
// bookkeeping fields, and calls Accept(). Only the numbered listing lines are
// visible here; braces and other lines in between are not part of this excerpt.
//
//   close_listen : forwarded unchanged to Accept().
101 void DesSer::initialize(
bool close_listen)
103 printf(
"[DEBUG] DesSer: initialize() started.\n"); fflush(stdout);
// With SIGPIPE ignored, writing to a closed peer produces an error return
// instead of terminating the process.
104 signal(SIGPIPE, SIG_IGN);
// Allocate NUM_PREALLOC_BUF buffers of BUF_SIZE_WORD ints each.
111 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
112 m_bufary[i] =
new int[ BUF_SIZE_WORD ];
114 m_buffer =
new int[ BUF_SIZE_WORD ];
// Zero every pre-allocated buffer.
117 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
118 memset(m_bufary[i], 0, BUF_SIZE_WORD *
sizeof(
int));
// Status reporting depends on a node name and a non-negative node id; the
// body of this check is not visible, so the open()/reportReady() calls below
// are presumably the "valid" branch.
125 if (m_nodename.size() == 0 || m_nodeid < 0) {
128 m_status.open(m_nodename, m_nodeid);
129 m_status.reportReady();
135 m_prev_copper_ctr = 0xFFFFFFFF;
136 m_prev_evenum = 0xFFFFFFFF;
144 m_compressionLevel = 0;
// NOTE(review): m_buffer is also assigned a new array at listing line 114.
// Unless an omitted line frees it in between, the first allocation leaks --
// confirm and drop one of the two.
147 m_buffer =
new int[ BUF_SIZE_WORD ];
149 Accept(close_listen);
// Reset the output counters when the status reporter is available.
156 if (m_status.isAvailable()) {
157 m_status.setOutputNBytes(0);
158 m_status.setOutputCount(0);
161 printf(
"[DEBUG] DesSer: initialize() was done.\n"); fflush(stdout);
// Fragment of a function body whose signature is not part of this excerpt
// (presumably fillSendHeaderTrailer, which is called with hdr/trailer/rawdblk
// arguments elsewhere in this file -- confirm).
// Visible behaviour: checks the total send length against the first word of
// buffer 0, copies event number / node id / exp-run word into hdr, and reports
// an error when no COPPER blocks are present.
174 int total_send_nwords =
// The total must equal the length stored in word 0 of buffer 0 plus 8 words
// (presumably the send header/trailer overhead -- confirm).
188 if (total_send_nwords != (rawdblk->
GetBuffer(0))[ 0 ] + 8) {
190 sprintf(err_buf,
"Length error. total length %d rawdblk length %d. Exting...\n",
191 total_send_nwords, (rawdblk->
GetBuffer(0))[ 0 ]);
193 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Read the header of block i through tmp_header and copy its identifying
// fields into the send header.
204 tmp_header.SetBuffer(rawdblk->
GetBuffer(i));
205 hdr->SetEventNumber(tmp_header.GetEveNo());
206 hdr->SetNodeID(tmp_header.GetNodeID());
207 hdr->SetExpRunWord(tmp_header.GetExpRunSubrun());
216 char err_buf[500] =
"[FATAL] CORRUPTED DATA: No COPPER blocks in RawDataBlock. Exiting...";
217 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Fragment of the function that sends one event with writev(); its signature
// is not part of this excerpt. Visible behaviour: fill header/trailer, build a
// three-entry iovec (header, body, trailer), call writev(), handle errors,
// finish a short write with Send(), and return the total event size in bytes.
231 fillSendHeaderTrailer(&send_header, &send_trailer, rawdblk);
236 struct iovec iov[ NUM_BUFFER ];
// iov[0] = send header, iov[1] = body (rawcopper_nwords words),
// iov[2] = send trailer. All lengths are in bytes.
242 iov[0].iov_base = (
char*)send_header.
GetBuffer();
243 iov[0].iov_len =
sizeof(int) * send_header.
GetHdrNwords();
246 iov[1].iov_len =
sizeof(int) * rawcopper_nwords;
248 iov[2].iov_base = (
char*)send_trailer.GetBuffer();
249 iov[2].iov_len =
sizeof(int) * send_trailer.GetTrlNwords();
// A negative return is an error: EINTR and EAGAIN/EWOULDBLOCK are handled
// separately (the latter consults callCheckRunPause); the remaining visible
// lines format and report the failure.
254 if ((n = writev(m_socket_send, iov, NUM_BUFFER)) < 0) {
255 if (errno == EINTR) {
257 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
260 callCheckRunPause(err_str);
// NOTE(review): iov_len is size_t; "%lu" only matches where size_t is
// unsigned long. "%zu" would be the portable conversion.
265 sprintf(err_buf,
"[WARNING] WRITEV error.(%s) : sent %d bytes, header %lu bytes body %lu trailer %lu\n",
266 strerror(errno), n, iov[0].iov_len, iov[1].iov_len, iov[2].iov_len);
270 printf(
"%s\n", err_buf); fflush(stdout);
271 string err_str =
"RUN_ERROR";
274 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
283 printf(
"[DEBUG] *******BODY**********\n");
284 printf(
"[DEBUG] \n%.8d : ", 0);
// NOTE(review): printData()'s second parameter is a word count, but
// iov[1].iov_len is a byte count (sizeof(int) * rawcopper_nwords). As written
// this would print sizeof(int) times too many words and read past the body.
// Confirm whether this sits in a debug-only section and divide by sizeof(int).
285 printData((
int*)(iov[1].iov_base), iov[1].iov_len);
289 int total_send_bytes =
sizeof(int) * send_header.GetTotalNwords();
// Short write: send whatever is left of each iovec entry in order, using n as
// the running count of bytes already sent.
295 if (n != total_send_bytes) {
297 printf(
"[WARNING] Serializer: Sent byte( %d bytes) is not same as the event size ( %d bytes). Retrying...\n", n,
301 double retry_start = getTimeSec();
// Remainder of the header.
303 if (n < (
int)(iov[ 0 ].iov_len)) {
304 n += Send(m_socket_send, (
char*)iov[ 0 ].iov_base + n, iov[ 0 ].iov_len - n);
// Remainder of the body; (n - header length) is the offset into the body.
307 if (n < (
int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len)) {
308 n += Send(m_socket_send, (
char*)iov[ 1 ].iov_base + (n - iov[ 0 ].iov_len), iov[ 1 ].iov_len - (n - iov[ 0 ].iov_len));
// Remainder of the trailer.
311 if (n < (
int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len + iov[ 2 ].iov_len)) {
312 n += Send(m_socket_send, (
char*)iov[ 2 ].iov_base + (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len),
313 iov[ 2 ].iov_len - (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len));
315 double retry_end = getTimeSec();
317 printf(
"[WARNING] Resending ends. It takes %lf (s)\n", retry_end - retry_start);
323 return total_send_bytes;
// Sends size_bytes bytes from buf on the given socket.
//   socket     : descriptor passed to send().
//   buf        : start of the data.
//   size_bytes : number of bytes to send.
// send() is called on the unsent tail (buf + sent_bytes) until sent_bytes
// equals size_bytes. The enclosing loop, the update of sent_bytes and the
// return statement are not part of this excerpt.
328 int DesSer::Send(
int socket,
char* buf,
int size_bytes)
// MSG_NOSIGNAL: a closed peer is reported as an error return, not as SIGPIPE.
333 if ((ret = send(socket,
334 buf + sent_bytes, size_bytes - sent_bytes, MSG_NOSIGNAL)) < 0) {
335 if (errno == EINTR) {
337 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
340 callCheckRunPause(err_str);
345 sprintf(err_buf,
"[WARNING] SEND ERROR.(%s)", strerror(errno));
349 printf(
"%s\n", err_buf); fflush(stdout);
350 string err_str =
"RUN_ERROR";
353 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Everything has been sent.
359 if (sent_bytes == size_bytes)
break;
// Opens a listening TCP socket on m_hostname_local:m_port_to, waits for one
// connection, sets a send timeout on it and stores it in m_socket_send.
//   close_listen : its use is not visible in this excerpt (presumably decides
//                  whether fd_listen is closed after accepting -- confirm).
// Only the numbered listing lines are visible; braces, declarations and some
// condition lines in between are not part of this excerpt.
364 void DesSer::Accept(
bool close_listen)
369 struct hostent* host;
// NOTE(review): gethostbyname() reports failure through h_errno, not errno,
// so strerror(errno) in the message below may be unrelated to the real cause
// (hstrerror(h_errno) would match). gethostbyname() is also marked obsolete in
// favour of getaddrinfo().
370 host = gethostbyname(m_hostname_local.c_str());
373 sprintf(temp_buf,
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...\n",
374 m_hostname_local.c_str(), strerror(errno));
375 print_err.PrintError(temp_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Listening address: first resolved IPv4 address of the local host name and
// port m_port_to.
383 struct sockaddr_in sock_listen;
384 sock_listen.sin_family = AF_INET;
385 sock_listen.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
387 socklen_t addrlen =
sizeof(sock_listen);
388 sock_listen.sin_port = htons(m_port_to);
389 fd_listen = socket(PF_INET, SOCK_STREAM, 0);
392 int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)
sizeof(flags));
394 perror(
"Failed to set REUSEADDR");
// On bind failure, print the sockets that match the port (via "ss") to help
// find the program occupying it, then report the error.
397 if (bind(fd_listen, (
struct sockaddr*)&sock_listen,
sizeof(
struct sockaddr)) < 0) {
398 printf(
"[FATAL] Failed to bind. Maybe other programs have already occupied this port(%d). Exiting...\n",
399 m_port_to); fflush(stdout);
404 sprintf(cmdline,
"/usr/sbin/ss -ap | grep %d", m_port_to);
405 if ((fp = popen(cmdline,
"r")) == NULL) {
406 printf(
"[WARNING] Failed to run %s\n", cmdline);
408 while (fgets(buf, 256, fp) != NULL) {
409 printf(
"[ERROR] Failed to bind. output of ss(port %d) : %s\n", m_port_to, buf); fflush(stdout);
// NOTE(review): errno is read here after popen()/fgets()/printf() have run,
// so it may no longer hold bind()'s error. Saving errno right after bind()
// fails would keep the message accurate.
414 sprintf(temp_char,
"[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
415 strerror(errno), m_port_to);
416 print_err.PrintError(temp_char, __FILE__, __PRETTY_FUNCTION__, __LINE__);
421 setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)
sizeof(val1));
423 if (listen(fd_listen, backlog) < 0) {
425 sprintf(err_buf,
"Failed in listen(%s). Exting...", strerror(errno));
426 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
434 struct sockaddr_in sock_accept;
435 printf(
"[DEBUG] Accepting... : port %d server %s\n", m_port_to, m_hostname_local.c_str());
// NOTE(review): accept() returns -1 on failure and a non-negative descriptor
// on success. Comparing with "== 0" never catches a failure and would treat a
// valid descriptor 0 as an error; the check should most likely be "< 0".
438 if ((fd_accept = accept(fd_listen, (
struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
440 sprintf(err_buf,
"[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
441 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
445 printf(
"[DEBUG] Done.\n"); fflush(stdout);
// Send timeout on the accepted socket (the timeout values are set on lines
// not shown here).
448 struct timeval timeout;
451 ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)
sizeof(timeout));
453 char temp_char[100] =
"[FATAL] Failed to set TIMEOUT. Exiting...";
454 print_err.PrintError(temp_char, __FILE__, __PRETTY_FUNCTION__, __LINE__);
465 m_socket_send = fd_accept;
// The reported port/address come from sock_listen (the local listening
// endpoint), not from sock_accept (the peer).
467 if (m_status.isAvailable()) {
468 m_status.setOutputPort(ntohs(sock_listen.sin_port));
469 m_status.setOutputAddress(sock_listen.sin_addr.s_addr);
471 printf(
"Accepted. port %d address %d\n", (
int)ntohs(sock_listen.sin_port), (
int)sock_listen.sin_addr.s_addr); fflush(stdout);
// Returns the current wall-clock time from gettimeofday() as seconds in a
// double, with the microsecond part as the fraction.
478 double DesSer::getTimeSec()
481 gettimeofday(&t, NULL);
482 return (t.tv_sec + t.tv_usec * 1.e-6);
// Stores the time elapsed since m_start_time for a fixed window of events.
//   event : event index; only 50000 <= event < 50500 is recorded.
//   array : destination; written at index (event - 50000), so it needs room
//           for at least 500 doubles.
486 void DesSer::recordTime(
int event,
double* array)
488 if (event >= 50000 && event < 50500) {
489 array[
event - 50000 ] = getTimeSec() - m_start_time;
// XORs the first nwords entries of buf together, starting from 0.
//   buf    : words to fold.
//   nwords : number of words read from buf.
// The return statement is not part of this excerpt (presumably returns
// checksum).
495 unsigned int DesSer::calcXORChecksum(
int* buf,
int nwords)
497 unsigned int checksum = 0;
498 for (
int i = 0; i < nwords; i++) {
499 checksum = checksum ^ buf[ i ];
// Dumps nwords words of buf to stdout as 8-digit hexadecimal values.
//   buf    : words to print.
//   nwords : number of words (not bytes) to print.
505 void DesSer::printData(
int* buf,
int nwords)
508 for (
int i = 0; i < nwords; i++) {
509 printf(
"%.8x ", buf[ i ]);
// Start a new "[DEBUG]"-prefixed line after every 10th word.
510 if (i % 10 == 9) printf(
"\n[DEBUG]");
// Opens the POSIX shared-memory object "/cpr_pause_resume" read-only and maps
// one int of it into m_ptr (read-only, shared mapping).
// The condition guarding the diagnostic output below is not part of this
// excerpt (presumably the shm_open() failure check); no check of the mmap()
// result is visible either -- confirm.
519 void DesSer::openRunPauseNshm()
521 char path_shm[100] =
"/cpr_pause_resume";
522 int fd = shm_open(path_shm, O_RDONLY, 0666);
524 printf(
"[DEBUG] %s\n", path_shm);
525 perror(
"[ERROR] shm_open2");
528 m_ptr = (
int*)mmap(NULL,
sizeof(
int), PROT_READ, MAP_SHARED, fd, 0);
// Tests whether the status reporter's state is PAUSING. The branch body and
// return values are not part of this excerpt; callCheckRunPause() uses the
// result as a boolean.
532 int DesSer::checkRunPause()
535 if (m_status.getState() == m_status.PAUSING) {
// Tests whether the status reporter's state is RESUMING. The branch body and
// return values are not part of this excerpt.
545 int DesSer::checkRunRecovery()
548 if (m_status.getState() == m_status.RESUMING) {
// Prints a banner marking the first event after a resume. Any state changes
// made by this function are on lines not part of this excerpt.
558 void DesSer::resumeRun()
562 printf(
"###########(Ser) the 1st event sicne the resume ###############\n");
// Prints a banner marking that the run is being paused. Any state changes
// made by this function are on lines not part of this excerpt.
572 void DesSer::pauseRun()
576 printf(
"###########(Ser) Pause the run ###############\n");
// Called from the send paths when a write would block (EAGAIN/EWOULDBLOCK).
// Prints a timeout banner, then asks checkRunPause() whether a pause is in
// progress and prints a second banner if so.
//   err_str : its use is not visible in this excerpt.
585 void DesSer::callCheckRunPause(
const string& err_str)
589 printf(
"###########(DesSer) TIMEOUT. ###############\n");
593 if (checkRunPause()) {
596 printf(
"###########(DesSer) Pause is detected. ###############\n");
// Probes a socket with a non-blocking zero-length send() and then reads from
// it with non-blocking recv(), printing diagnostics along the way.
//   socket : descriptor to probe.
// Loop structure, counter updates and return values are on lines not part of
// this excerpt.
607 int DesSer::CheckConnection(
int socket)
616 printf(
"CC1\n"); fflush(stdout);
// Zero-length, non-blocking send used as a probe.
617 ret = send(socket, buffer, 0, MSG_DONTWAIT);
618 printf(
"CC2\n"); fflush(stdout);
// EAGAIN is counted in eagain_cnt; more than 100 occurrences takes a separate
// branch (body not visible). Other errors are printed with errno and its text.
623 if (errno == EAGAIN) {
624 printf(
"EAGAIN %d cnt %d recvd %d\n", socket, eagain_cnt, tot_ret); fflush(stdout);
626 if (eagain_cnt > 100) {
632 printf(
"ERROR %d errno %d err %s\n", socket, errno, strerror(errno)); fflush(stdout);
637 printf(
"Return value %d of send is strange. Exting...\n", ret);
// Non-blocking read of whatever is pending on the socket.
648 ret = recv(socket, buffer,
sizeof(buffer), MSG_DONTWAIT);
651 printf(
"EOF %d\n", socket); fflush(stdout);
655 if (errno == EAGAIN) {
656 printf(
"EAGAIN %d cnt %d recvd %d\n", socket, eagain_cnt, tot_ret); fflush(stdout);
658 if (eagain_cnt > 100) {
664 printf(
"ERROR %d errno %d err %s\n", socket, errno, strerror(errno)); fflush(stdout);
// Report how many bytes this read returned and the running total.
671 printf(
"Flushing data in socket buffer : sockid = %d %d bytes tot %d bytes\n", socket, ret, tot_ret); fflush(stdout);
// Takes two unnamed char* parameters; the function body is not part of this
// excerpt, so nothing further can be stated about its behaviour here.
679 void DesSer::shmOpen(
char*,
char*)
Abstract base class for different kinds of events.