8 #include <daq/rawdata/DesSerPrePC.h>
9 #include <rawdata/dataobjects/RawFTSWFormat_latest.h>
10 #include <rawdata/dataobjects/RawTLUFormat.h>
12 #include <arpa/inet.h>
14 #include <netinet/tcp.h>
16 #include <sys/socket.h>
27 DesSerPrePC::DesSerPrePC(
string host_recv,
int port_recv,
string host_send,
int port_send,
int shmflag,
28 const std::string& nodename,
int nodeid)
31 for (
int i = 0 ; i < m_num_connections; i++) {
33 m_hostname_from.push_back(host_recv);
35 m_port_from.push_back(port_recv) ;
36 m_socket_recv.push_back(-1);
40 m_port_to = port_send;
42 m_hostname_local = host_send;
43 m_nodename = nodename;
48 printf(
"[INFO] DeSerializerPrePC: Constructor done.\n"); fflush(stdout);
53 DesSerPrePC::~DesSerPrePC()
59 int DesSerPrePC::recvFD(
int sock,
char* buf,
int data_size_byte,
int flag)
64 if ((read_size = recv(sock, (
char*)buf + n, data_size_byte - n , flag)) < 0) {
67 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
70 callCheckRunPause(err_str);
76 sprintf(err_buf,
"recv() returned error; ret = %d. : %s %s %d",
77 read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
81 printf(
"[WARNING] %s\n", err_buf); fflush(stdout);
82 string err_str =
"RUN_ERROR";
83 printf(
"AIUEO********************\n"); fflush(stdout);
86 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
89 }
else if (read_size == 0) {
92 sprintf(err_buf,
"[WARNING] Connection is closed by peer(%s). readsize = %d %d. : %s %s %d",
93 strerror(errno), read_size, errno, __FILE__, __PRETTY_FUNCTION__, __LINE__);
97 printf(
"%s\n", err_buf); fflush(stdout);
98 string err_str =
"RUN_ERROR";
101 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
106 if (n == data_size_byte)
break;
113 int DesSerPrePC::Connect()
116 for (
int i = 0; i < m_num_connections; i++) {
118 if (m_socket_recv[ i ] >= 0)
continue;
123 struct hostent* host;
124 host = gethostbyname(m_hostname_from[ i ].c_str());
127 sprintf(err_buf,
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
129 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
134 struct sockaddr_in socPC;
135 socPC.sin_family = AF_INET;
136 socPC.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
137 socPC.sin_port = htons(m_port_from[ i ]);
138 int sd = socket(PF_INET, SOCK_STREAM, 0);
140 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1,
sizeof(val1));
142 struct timeval timeout;
145 setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)
sizeof(timeout));
147 printf(
"[DEBUG] Connecting to %s port %d\n" , m_hostname_from[ i ].c_str(), m_port_from[ i ]); fflush(stdout);
150 if (connect(sd, (
struct sockaddr*)(&socPC),
sizeof(socPC)) < 0) {
151 perror(
"Failed to connect. Retrying...");
155 printf(
"[DEBUG] Done\n"); fflush(stdout);
160 m_socket_recv[ i ] = sd;
165 getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
167 getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
169 getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
171 getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
174 if (m_status.isAvailable()) {
176 memset(&sa, 0,
sizeof(sockaddr_in));
177 socklen_t sa_len =
sizeof(sa);
178 if (getsockname(m_socket_recv[i], (
struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
179 m_status.setInputPort(ntohs(sa.sin_port));
180 m_status.setInputAddress(sa.sin_addr.s_addr);
186 printf(
"[DEBUG] Initialization finished\n"); fflush(stdout);
192 int* DesSerPrePC::recvData(
int* delete_flag,
int* total_buf_nwords,
int* num_events_in_sendblock,
int* num_nodes_in_sendblock)
194 int* temp_buf = NULL;
197 vector <int> each_buf_nwords;
198 each_buf_nwords.clear();
199 vector <int> each_buf_nodes;
200 each_buf_nodes.clear();
201 vector <int> each_buf_events;
202 each_buf_events.clear();
204 *total_buf_nwords = 0;
205 *num_nodes_in_sendblock = 0;
206 *num_events_in_sendblock = 0;
211 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
212 int temp_num_events = 0;
213 int temp_num_nodes = 0;
216 for (
int i = 0; i < (int)(m_socket_recv.size()); i++) {
218 recvFD(m_socket_recv[ i ], (
char*)send_hdr_buf,
sizeof(
int)*SendHeader::SENDHDR_NWORDS, flag);
224 temp_num_nodes = send_hdr.GetNumNodesinPacket();
227 *num_events_in_sendblock = temp_num_events;
228 }
else if (*num_events_in_sendblock != temp_num_events) {
229 #ifndef NO_DATA_CHECK
232 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
233 *num_events_in_sendblock , temp_num_events, i, *num_nodes_in_sendblock , temp_num_nodes, i);
234 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
240 *num_nodes_in_sendblock += temp_num_nodes;
242 int rawblk_nwords = send_hdr.GetTotalNwords()
243 - SendHeader::SENDHDR_NWORDS
244 - SendTrailer::SENDTRL_NWORDS;
245 *total_buf_nwords += rawblk_nwords;
250 if (rawblk_nwords > (
int)(2.5e6) || rawblk_nwords <= 0) {
251 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
253 sprintf(err_buf,
"CORRUPTED DATA: Too large event : Header %d %d %d %d :block size %d words\n", i, temp_num_events, temp_num_nodes,
254 send_hdr.GetTotalNwords(), rawblk_nwords);
255 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
261 each_buf_nwords.push_back(rawblk_nwords);
262 each_buf_events.push_back(temp_num_events);
263 each_buf_nodes.push_back(temp_num_nodes);
268 temp_buf = getNewBuffer(*total_buf_nwords, delete_flag);
272 int total_recvd_byte = 0;
273 for (
int i = 0; i < (int)(m_socket_recv.size()); i++) {
276 total_recvd_byte += recvFD(m_socket_recv[ i ], (
char*)temp_buf + total_recvd_byte,
277 each_buf_nwords[ i ] *
sizeof(
int), flag);
278 }
catch (
string err_str) {
281 printf(
"[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
290 for (
int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
291 int this_length = *((
int*)((
char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] *
sizeof(int) + temp_length));
292 temp_length += this_length *
sizeof(int);
294 if (temp_length != (
int)(each_buf_nwords[ i ] *
sizeof(int))) {
295 printf(
"[DEBUG]*******SENDHDR*********** \n");
296 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
297 printf(
"[DEBUG]*******BODY***********\n ");
298 printData(temp_buf, (
int)(total_recvd_byte /
sizeof(
int)));
300 sprintf(err_buf,
"CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
301 (
int)(*total_buf_nwords *
sizeof(
int)), temp_length);
302 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
309 if ((
int)(*total_buf_nwords *
sizeof(
int)) != total_recvd_byte) {
311 sprintf(err_buf,
"CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
312 total_recvd_byte, (
int)(*total_buf_nwords *
sizeof(
int)));
313 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
319 int send_trl_buf[(
unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
320 for (
int i = 0; i < (int)(m_socket_recv.size()); i++) {
322 recvFD(m_socket_recv[ i ], (
char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS *
sizeof(
int), flag);
323 }
catch (
string err_str) {
326 printf(
"[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
342 int total_buf_nwords = 0 ;
343 int num_events_in_sendblock = 0;
344 int num_nodes_in_sendblock = 0;
346 if (m_start_flag == 0) {
348 printf(
"DeSerializerPrePC: Reading the 1st packet from eb0...\n"); fflush(stdout);
350 int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
351 &num_nodes_in_sendblock);
352 if (m_start_flag == 0) {
354 printf(
"DeSerializerPrePC: Done. the size of the 1st packet %d words\n", total_buf_nwords); fflush(stdout);
357 m_recvd_totbytes += total_buf_nwords *
sizeof(int);
359 temp_raw_datablk->
SetBuffer((
int*)temp_buf, total_buf_nwords, *delete_flag,
360 num_events_in_sendblock, num_nodes_in_sendblock);
366 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
369 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
370 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
371 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
390 int* temp_buf = raw_datablk->
GetBuffer(0);
392 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
393 unsigned int eve_array[32];
394 unsigned int utime_array[32];
395 unsigned int ctime_type_array[32];
398 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
402 memset(eve_array, 0,
sizeof(eve_array));
403 memset(utime_array, 0,
sizeof(utime_array));
404 memset(ctime_type_array, 0,
sizeof(ctime_type_array));
406 int num_nodes_in_sendblock = raw_datablk->
GetNumNodes();
407 for (
int l = 0; l < num_nodes_in_sendblock; l++) {
408 int entry_id = l + k * num_nodes_in_sendblock;
418 if (temp_rawftsw->
GetEveNo(block_id) < 10) {
419 printf(
"[DEBUG] ######FTSW#########\n");
426 utime_ftsw = temp_rawftsw->
GetTTUtime(block_id);
430 #ifndef NO_DATA_CHECK
432 temp_rawftsw->
CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
433 eve_array[ entry_id ] = cur_evenum;
434 }
catch (
string err_str) {
436 strcpy(err_buf, err_str.c_str());
437 print_err.PrintError(err_buf , __FILE__, __PRETTY_FUNCTION__, __LINE__);
441 utime_array[ entry_id ] = temp_rawftsw->
GetTTUtime(block_id);
448 }
else if (raw_datablk->
CheckTLUID(entry_id)) {
455 printf(
"[DEBUG] ######TLU#########\n");
459 #ifndef NO_DATA_CHECK
461 temp_rawtlu->
CheckData(0, m_prev_evenum, &cur_evenum);
462 eve_array[ entry_id ] = cur_evenum;
463 }
catch (
string err_str) {
465 strcpy(err_buf, err_str.c_str());
466 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
481 "do not use the following for actual DAQ"
482 (pre_rawcpr_fmt->
GetBuffer(block_id))[ RawHeader_v2::POS_EXP_RUN_NO ] = exp_run_ftsw;
483 (pre_rawcpr_fmt->
GetBuffer(block_id))[ RawHeader_v2::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
484 (pre_rawcpr_fmt->
GetBuffer(block_id))[ RawHeader_v2::POS_TTUTIME ] = utime_ftsw;
487 #ifndef NO_DATA_CHECK
489 pre_rawcpr_fmt->
CheckData(0, m_prev_evenum, &cur_evenum,
490 m_prev_copper_ctr, &cur_copper_ctr,
491 m_prev_exprunsubrun_no, &m_exprunsubrun_no);
492 eve_array[ entry_id ] = cur_evenum;
493 }
catch (
string err_str) {
498 utime_array[ entry_id ] = pre_rawcpr_fmt->
GetTTUtime(0);
503 *eve_copper_0 = (raw_datablk->
GetBuffer(entry_id))[ 3 ];
504 }
else if (cpr_num == 1) {
508 delete pre_rawcpr_fmt;
512 #ifndef NO_DATA_CHECK
514 for (
int l = 1; l < num_nodes_in_sendblock; l++) {
515 if (eve_array[ 0 ] != eve_array[ l ] ||
516 utime_array[ 0 ] != utime_array[ l ] ||
517 ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
519 for (
int m = 0; m < num_nodes_in_sendblock; m++) {
520 printf(
"[DEBUG] node %d eve # %d utime %x ctime %x\n",
521 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
523 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
524 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
540 m_prev_evenum = cur_evenum;
541 m_prev_copper_ctr = cur_copper_ctr;
542 m_prev_exprunsubrun_no = m_exprunsubrun_no;
549 void DesSerPrePC::DataAcquisition()
552 unsigned int eve_copper_0 = 0;
554 printf(
"[DEBUG] initializing...\n"); fflush(stdout);
558 printf(
"[DEBUG] Done.\n"); fflush(stdout);
560 if (m_start_flag == 0) {
565 if (m_status.isAvailable()) {
567 printf(
"[DEBUG] DeSerializerPrePC: Waiting for Start...\n"); fflush(stdout);
568 m_status.reportRunning();
570 m_start_time = getTimeSec();
582 if (m_run_pause > 0 || m_run_error > 0) {
594 for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
599 int delete_flag_from =
605 setRecvdBuffer(&temp_rawdatablk, &delete_flag_from);
606 checkData(&temp_rawdatablk, &eve_copper_0);
607 }
catch (
string err_str) {
608 printf(
"Error was detected\n"); fflush(stdout);
616 int temp_num_events, temp_num_nodes;
619 #ifdef REDUCED_RAWCOPPER
628 int calced_temp_nwords_to = m_pre_rawcpr.CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
629 buf_to = getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
632 m_pre_rawcpr.CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
633 if (calced_temp_nwords_to != temp_nwords_to) {
636 "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
637 calced_temp_nwords_to, temp_nwords_to);
638 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
641 m_status.copyEventHeader(buf_to);
647 delete_flag_to = delete_flag_from;
648 delete_flag_from = 0;
657 raw_datablk[ j ].
SetBuffer(buf_to, temp_nwords_to, delete_flag_to, temp_num_events, temp_num_nodes);
663 #ifdef REDUCED_RAWCOPPER
668 post_rawcopper_v2.
SetBuffer(raw_datablk[ j ].GetWholeBuffer(), raw_datablk[ j ].TotalBufNwords(),
669 0, raw_datablk[ j ].GetNumEvents(), raw_datablk[ j ].GetNumNodes());
671 for (
int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
674 post_rawcopper_v2.
CheckCRC16(block_num, i_finesse_num);
683 if (m_run_pause != 0 || m_run_error != 0)
continue;
690 if (m_start_flag == 0) {
691 m_start_time = getTimeSec();
697 for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
702 if (m_start_flag == 0) {
704 printf(
"[DEBUG] SerializerPC: Sending the 1st packet...\n"); fflush(stdout);
708 m_sent_totbytes += sendByWriteV(&(raw_datablk[ j ]));
709 }
catch (
string err_str) {
713 print_err.PrintError((
char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
716 if (m_start_flag == 0) {
718 printf(
"[DEBUG] Done.\n"); fflush(stdout);
725 if (m_run_pause != 0 || m_run_error != 0)
continue;
731 if (max_nevt >= 0 || max_seconds >= 0.) {
733 if (n_basf2evt % 10000 == 0) {
736 printf(
"[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
737 max_nevt , max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
742 if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC) % 100000 == 0) {
743 double interval = cur_time - m_prev_time;
744 double total_time = cur_time - m_start_time;
745 printf(
"[DEBUG] Event %12d Rate %6.2lf[kHz] Recvd %6.2lf[MB/s] sent %6.2lf[MB/s] RunTime %8.2lf[s] interval %8.4lf[s]\n",
746 n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC,
747 (n_basf2evt - m_prev_nevt)*NUM_EVT_PER_BASF2LOOP_PC / interval / 1.e3,
748 (m_recvd_totbytes - m_recvd_prev_totbytes) / interval / 1.e6,
749 (m_sent_totbytes - m_sent_prev_totbytes) / interval / 1.e6,
754 m_prev_time = cur_time;
755 m_recvd_prev_totbytes = m_recvd_totbytes;
756 m_sent_prev_totbytes = m_sent_totbytes;
757 m_prev_nevt = n_basf2evt;
758 cur_time = getTimeSec();
763 if (m_status.isAvailable()) {
764 m_status.setOutputNBytes(m_sent_totbytes);
765 m_status.setOutputCount(n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC);
779 void DesSerPrePC::waitResume()
781 if (m_run_pause == 0) {
783 if (checkRunPause())
break;
786 printf(
"###########(DesSerPrePC) Waiting for Runstop() ###############\n");
797 printf(
"###########(Ser) Waiting for Resume ###############\n");
801 if (checkRunRecovery()) {
809 printf(
"Done!\n"); fflush(stdout);
811 printf(
"Checking connection to eb0\n"); fflush(stdout);
812 if (CheckConnection(m_socket_send) < 0) {
813 printf(
"Trying Accept1\n"); fflush(stdout);
815 printf(
"Trying Accept2\n"); fflush(stdout);
818 printf(
"Checking connection to COPPER\n"); fflush(stdout);
819 for (
int i = 0; i < m_num_connections; i++) {
820 if (CheckConnection(m_socket_recv[ i ]) < 0) m_socket_recv[ i ] = -1;
The Raw COPPER class ver.2 This class stores data received by COPPER via belle2link Data from all det...
int CheckCRC16(int n, int finesse_num)
check magic words
int GetFINESSENwords(int n, int finesse) OVERRIDE_CPP17
get data size of FINESSE buffer
unsigned int GetExpRunSubrun(int n) OVERRIDE_CPP17
Exp# (10bit) run# (14bit) restart # (8bit)
unsigned int GetTTUtime(int n) OVERRIDE_CPP17
Check if COPPER Magic words are correct.
unsigned int GetTTCtimeTRGType(int n) OVERRIDE_CPP17
get b2l block from "FEE b2link header"
Abstract base class for different kinds of events.