8#include <daq/rawdata/DesSer.h>
11#include <netinet/tcp.h>
14#include <sys/socket.h>
46 printf(
"[DEBUG] DesSer: Constructor done.\n"); fflush(stdout);
66 "No pre-allocated buffers are left. %d > %d. Not enough buffers are allocated or "
67 "memory leak or forget to call ClearNumUsedBuf every event loop. Exting...",
69 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
82 if (nwords > BUF_SIZE_WORD) {
84 temp_buf =
new int[ nwords ];
88 sprintf(err_buf,
"Null pointer from GetPreALlocBuf(). Exting...\n");
89 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
101void DesSer::initialize(
bool close_listen)
103 printf(
"[DEBUG] DesSer: initialize() started.\n"); fflush(stdout);
104 signal(SIGPIPE, SIG_IGN);
111 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
112 m_bufary[i] =
new int[ BUF_SIZE_WORD ];
114 m_buffer =
new int[ BUF_SIZE_WORD ];
117 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
118 memset(
m_bufary[i], 0, BUF_SIZE_WORD *
sizeof(
int));
135 m_prev_copper_ctr = 0xFFFFFFFF;
136 m_prev_evenum = 0xFFFFFFFF;
147 m_buffer =
new int[ BUF_SIZE_WORD ];
149 Accept(close_listen);
161 printf(
"[DEBUG] DesSer: initialize() was done.\n"); fflush(stdout);
174 int total_send_nwords =
188 if (total_send_nwords != (rawdblk->
GetBuffer(0))[ 0 ] + 8) {
190 sprintf(err_buf,
"Length error. total length %d rawdblk length %d. Exting...\n",
191 total_send_nwords, (rawdblk->
GetBuffer(0))[ 0 ]);
193 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
216 char err_buf[500] =
"[FATAL] CORRUPTED DATA: No COPPER blocks in RawDataBlock. Exiting...";
217 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
231 fillSendHeaderTrailer(&send_header, &send_trailer, rawdblk);
236 struct iovec iov[ NUM_BUFFER ];
242 iov[0].iov_base = (
char*)send_header.
GetBuffer();
243 iov[0].iov_len =
sizeof(int) * send_header.
GetHdrNwords();
246 iov[1].iov_len =
sizeof(int) * rawcopper_nwords;
248 iov[2].iov_base = (
char*)send_trailer.GetBuffer();
249 iov[2].iov_len =
sizeof(int) * send_trailer.GetTrlNwords();
255 if (errno == EINTR) {
257 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
260 callCheckRunPause(err_str);
265 sprintf(err_buf,
"[WARNING] WRITEV error.(%s) : sent %d bytes, header %lu bytes body %lu trailer %lu\n",
266 strerror(errno), n, iov[0].iov_len, iov[1].iov_len, iov[2].iov_len);
270 printf(
"%s\n", err_buf); fflush(stdout);
271 string err_str =
"RUN_ERROR";
274 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
283 printf(
"[DEBUG] *******BODY**********\n");
284 printf(
"[DEBUG] \n%.8d : ", 0);
285 printData((
int*)(iov[1].iov_base), iov[1].iov_len);
289 int total_send_bytes =
sizeof(int) * send_header.GetTotalNwords();
295 if (n != total_send_bytes) {
297 printf(
"[WARNING] Serializer: Sent byte( %d bytes) is not same as the event size ( %d bytes). Retrying...\n", n,
303 if (n < (
int)(iov[ 0 ].iov_len)) {
307 if (n < (
int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len)) {
308 n +=
Send(
m_socket_send, (
char*)iov[ 1 ].iov_base + (n - iov[ 0 ].iov_len), iov[ 1 ].iov_len - (n - iov[ 0 ].iov_len));
311 if (n < (
int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len + iov[ 2 ].iov_len)) {
312 n +=
Send(
m_socket_send, (
char*)iov[ 2 ].iov_base + (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len),
313 iov[ 2 ].iov_len - (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len));
317 printf(
"[WARNING] Resending ends. It takes %lf (s)\n", retry_end - retry_start);
323 return total_send_bytes;
333 if ((ret = send(socket,
334 buf + sent_bytes, size_bytes - sent_bytes, MSG_NOSIGNAL)) < 0) {
335 if (errno == EINTR) {
337 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
340 callCheckRunPause(err_str);
345 sprintf(err_buf,
"[WARNING] SEND ERROR.(%s)", strerror(errno));
349 printf(
"%s\n", err_buf); fflush(stdout);
350 string err_str =
"RUN_ERROR";
353 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
359 if (sent_bytes == size_bytes)
break;
364void DesSer::Accept(
bool close_listen)
369 struct hostent* host;
373 sprintf(temp_buf,
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...\n",
375 print_err.PrintError(temp_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
383 struct sockaddr_in sock_listen;
384 sock_listen.sin_family = AF_INET;
385 sock_listen.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
387 socklen_t addrlen =
sizeof(sock_listen);
389 fd_listen = socket(PF_INET, SOCK_STREAM, 0);
392 int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)
sizeof(flags));
394 perror(
"Failed to set REUSEADDR");
397 if (bind(fd_listen, (
struct sockaddr*)&sock_listen,
sizeof(
struct sockaddr)) < 0) {
398 printf(
"[FATAL] Failed to bind. Maybe other programs have already occupied this port(%d). Exiting...\n",
404 sprintf(cmdline,
"/usr/sbin/ss -ap | grep %d",
m_port_to);
405 if ((fp = popen(cmdline,
"r")) == NULL) {
406 printf(
"[WARNING] Failed to run %s\n", cmdline);
408 while (fgets(buf, 256, fp) != NULL) {
409 printf(
"[ERROR] Failed to bind. output of ss(port %d) : %s\n",
m_port_to, buf); fflush(stdout);
414 sprintf(temp_char,
"[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
416 print_err.PrintError(temp_char, __FILE__, __PRETTY_FUNCTION__, __LINE__);
421 setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)
sizeof(val1));
423 if (listen(fd_listen, backlog) < 0) {
425 sprintf(err_buf,
"Failed in listen(%s). Exting...", strerror(errno));
426 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
434 struct sockaddr_in sock_accept;
438 if ((fd_accept = accept(fd_listen, (
struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
440 sprintf(err_buf,
"[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
441 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
445 printf(
"[DEBUG] Done.\n"); fflush(stdout);
448 struct timeval timeout;
451 ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)
sizeof(timeout));
453 char temp_char[100] =
"[FATAL] Failed to set TIMEOUT. Exiting...";
454 print_err.PrintError(temp_char, __FILE__, __PRETTY_FUNCTION__, __LINE__);
468 m_status.setOutputPort(ntohs(sock_listen.sin_port));
469 m_status.setOutputAddress(sock_listen.sin_addr.s_addr);
471 printf(
"Accepted. port %d address %d\n", (
int)ntohs(sock_listen.sin_port), (
int)sock_listen.sin_addr.s_addr); fflush(stdout);
481 gettimeofday(&t, NULL);
482 return (t.tv_sec + t.tv_usec * 1.e-6);
488 if (event >= 50000 && event < 50500) {
489 array[
event - 50000 ] =
getTimeSec() - m_start_time;
497 unsigned int checksum = 0;
498 for (
int i = 0; i < nwords; i++) {
499 checksum = checksum ^ buf[ i ];
508 for (
int i = 0; i < nwords; i++) {
509 printf(
"%.8x ", buf[ i ]);
510 if (i % 10 == 9) printf(
"\n[DEBUG]");
519void DesSer::openRunPauseNshm()
521 char path_shm[100] =
"/cpr_pause_resume";
522 int fd = shm_open(path_shm, O_RDONLY, 0666);
524 printf(
"[DEBUG] %s\n", path_shm);
525 perror(
"[ERROR] shm_open2");
528 m_ptr = (
int*)mmap(NULL,
sizeof(
int), PROT_READ, MAP_SHARED, fd, 0);
532int DesSer::checkRunPause()
545int DesSer::checkRunRecovery()
558void DesSer::resumeRun()
562 printf(
"###########(Ser) the 1st event sicne the resume ###############\n");
572void DesSer::pauseRun()
576 printf(
"###########(Ser) Pause the run ###############\n");
585void DesSer::callCheckRunPause(
const string& err_str)
589 printf(
"###########(DesSer) TIMEOUT. ###############\n");
593 if (checkRunPause()) {
596 printf(
"###########(DesSer) Pause is detected. ###############\n");
607int DesSer::CheckConnection(
int socket)
616 printf(
"CC1\n"); fflush(stdout);
617 ret = send(socket, buffer, 0, MSG_DONTWAIT);
618 printf(
"CC2\n"); fflush(stdout);
623 if (errno == EAGAIN) {
624 printf(
"EAGAIN %d cnt %d recvd %d\n", socket, eagain_cnt, tot_ret); fflush(stdout);
626 if (eagain_cnt > 100) {
632 printf(
"ERROR %d errno %d err %s\n", socket, errno, strerror(errno)); fflush(stdout);
637 printf(
"Return value %d of send is strange. Exting...\n", ret);
648 ret = recv(socket, buffer,
sizeof(buffer), MSG_DONTWAIT);
651 printf(
"EOF %d\n", socket); fflush(stdout);
655 if (errno == EAGAIN) {
656 printf(
"EAGAIN %d cnt %d recvd %d\n", socket, eagain_cnt, tot_ret); fflush(stdout);
658 if (eagain_cnt > 100) {
664 printf(
"ERROR %d errno %d err %s\n", socket, errno, strerror(errno)); fflush(stdout);
671 printf(
"Flushing data in socket buffer : sockid = %d %d bytes tot %d bytes\n", socket, ret, tot_ret); fflush(stdout);
void shmOpen(char *path_cfg, char *path_sta)
open shared memory
int m_port_to
Destination port.
RunInfoBuffer m_status
Run info buffer.
std::string m_hostname_local
Destination Host.
unsigned int m_prev_exprunsubrun_no
run no.
unsigned int calcXORChecksum(int *buf, int nwords)
calculate checksum
std::string m_nodename
Node Name for SLC.
int * getPreAllocBuf()
Getbuffer.
int m_start_flag
start flag
RawHeader_v2 tmp_header
which format is used
DesSer()
Constructor / Destructor.
int n_basf2evt
No. of sent events.
int m_socket_send
Reciever Socket.
CprErrorMessage print_err
wrapper for B2LOG system
int m_run_pause
flag to show that run-controller pauses a run
void printData(int *buf, int nwords)
dump error data
int m_run_error
flag to show that there is some errors with which DAQ cannot continue.
int * getNewBuffer(int nwords, int *delete_flag)
Getbuffer.
int m_shmflag
Use shared memory.
void recordTime(int event, double *array)
store time info.
int m_num_connections
check data contents
int m_compressionLevel
Compression Level.
int * m_bufary[NUM_PREALLOC_BUF]
buffer
unsigned int m_exprunsubrun_no
run no.
int m_nodeid
Node ID for SLC.
int Send(int socket, char *buf, int size_bytes)
send buffer
double getTimeSec()
store time info.
unsigned int GetEveNo()
get restart #(8bit)
unsigned int GetNodeID()
get contents of header
unsigned int GetExpRunSubrun()
get contents of header
Abstract base class for different kinds of events.