8 #include <daq/rawdata/DesSerPrePC.h>
9 #include <rawdata/dataobjects/RawFTSWFormat_latest.h>
10 #include <rawdata/dataobjects/RawTLUFormat.h>
13 #include <arpa/inet.h>
15 #include <netinet/tcp.h>
17 #include <sys/socket.h>
28 DesSerPrePC::DesSerPrePC(
string host_recv,
int port_recv,
const string& host_send,
int port_send,
int shmflag,
29 const std::string& nodename,
int )
32 for (
int i = 0 ; i < m_num_connections; i++) {
34 m_hostname_from.push_back(host_recv);
36 m_port_from.push_back(port_recv) ;
37 m_socket_recv.push_back(-1);
41 m_port_to = port_send;
43 m_hostname_local = host_send;
44 m_nodename = nodename;
49 printf(
"[INFO] DeSerializerPrePC: Constructor done.\n"); fflush(stdout);
54 DesSerPrePC::~DesSerPrePC()
60 int DesSerPrePC::recvFD(
int sock,
char* buf,
int data_size_byte,
int flag)
65 if ((read_size = recv(sock, (
char*)buf + n, data_size_byte - n, flag)) < 0) {
68 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
71 callCheckRunPause(err_str);
77 sprintf(err_buf,
"recv() returned error; ret = %d. : %s %s %d",
78 read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
82 printf(
"[WARNING] %s\n", err_buf); fflush(stdout);
83 string err_str =
"RUN_ERROR";
84 printf(
"AIUEO********************\n"); fflush(stdout);
87 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
90 }
else if (read_size == 0) {
93 sprintf(err_buf,
"[WARNING] Connection is closed by peer(%s). readsize = %d %d. : %s %s %d",
94 strerror(errno), read_size, errno, __FILE__, __PRETTY_FUNCTION__, __LINE__);
98 printf(
"%s\n", err_buf); fflush(stdout);
99 string err_str =
"RUN_ERROR";
102 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
107 if (n == data_size_byte)
break;
114 int DesSerPrePC::Connect()
117 for (
int i = 0; i < m_num_connections; i++) {
119 if (m_socket_recv[ i ] >= 0)
continue;
124 struct hostent* host;
125 host = gethostbyname(m_hostname_from[ i ].c_str());
128 sprintf(err_buf,
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
130 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
135 struct sockaddr_in socPC;
136 socPC.sin_family = AF_INET;
137 socPC.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
138 socPC.sin_port = htons(m_port_from[ i ]);
139 int sd = socket(PF_INET, SOCK_STREAM, 0);
141 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1,
sizeof(val1));
143 struct timeval timeout;
146 setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)
sizeof(timeout));
148 printf(
"[DEBUG] Connecting to %s port %d\n", m_hostname_from[ i ].c_str(), m_port_from[ i ]); fflush(stdout);
151 if (connect(sd, (
struct sockaddr*)(&socPC),
sizeof(socPC)) < 0) {
152 perror(
"Failed to connect. Retrying...");
156 printf(
"[DEBUG] Done\n"); fflush(stdout);
161 m_socket_recv[ i ] = sd;
166 getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
168 getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
170 getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
172 getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
175 if (m_status.isAvailable()) {
177 memset(&sa, 0,
sizeof(sockaddr_in));
178 socklen_t sa_len =
sizeof(sa);
179 if (getsockname(m_socket_recv[i], (
struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
180 m_status.setInputPort(ntohs(sa.sin_port));
181 m_status.setInputAddress(sa.sin_addr.s_addr);
187 printf(
"[DEBUG] Initialization finished\n"); fflush(stdout);
193 int* DesSerPrePC::recvData(
int* delete_flag,
int* total_buf_nwords,
int* num_events_in_sendblock,
int* num_nodes_in_sendblock)
195 int* temp_buf = NULL;
198 vector <int> each_buf_nwords;
199 each_buf_nwords.clear();
200 vector <int> each_buf_nodes;
201 each_buf_nodes.clear();
202 vector <int> each_buf_events;
203 each_buf_events.clear();
205 *total_buf_nwords = 0;
206 *num_nodes_in_sendblock = 0;
207 *num_events_in_sendblock = 0;
212 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
215 for (
int i = 0; i < (int)(m_socket_recv.size()); i++) {
217 recvFD(m_socket_recv[ i ], (
char*)send_hdr_buf,
sizeof(
int)*SendHeader::SENDHDR_NWORDS, flag);
223 int temp_num_nodes = send_hdr.GetNumNodesinPacket();
226 *num_events_in_sendblock = temp_num_events;
227 }
else if (*num_events_in_sendblock != temp_num_events) {
228 #ifndef NO_DATA_CHECK
231 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
232 *num_events_in_sendblock, temp_num_events, i, *num_nodes_in_sendblock, temp_num_nodes, i);
233 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
239 *num_nodes_in_sendblock += temp_num_nodes;
241 int rawblk_nwords = send_hdr.GetTotalNwords()
242 - SendHeader::SENDHDR_NWORDS
243 - SendTrailer::SENDTRL_NWORDS;
244 *total_buf_nwords += rawblk_nwords;
249 if (rawblk_nwords > (
int)(2.5e6) || rawblk_nwords <= 0) {
250 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
252 sprintf(err_buf,
"CORRUPTED DATA: Too large event : Header %d %d %d %d :block size %d words\n", i, temp_num_events, temp_num_nodes,
253 send_hdr.GetTotalNwords(), rawblk_nwords);
254 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
260 each_buf_nwords.push_back(rawblk_nwords);
261 each_buf_events.push_back(temp_num_events);
262 each_buf_nodes.push_back(temp_num_nodes);
267 temp_buf = getNewBuffer(*total_buf_nwords, delete_flag);
271 int total_recvd_byte = 0;
272 for (
int i = 0; i < (int)(m_socket_recv.size()); i++) {
275 total_recvd_byte += recvFD(m_socket_recv[ i ], (
char*)temp_buf + total_recvd_byte,
276 each_buf_nwords[ i ] *
sizeof(
int), flag);
277 }
catch (
string err_str) {
280 printf(
"[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
283 throw (std::move(err_str));
289 for (
int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
290 int this_length = *((
int*)((
char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] *
sizeof(int) + temp_length));
291 temp_length += this_length *
sizeof(int);
293 if (temp_length != (
int)(each_buf_nwords[ i ] *
sizeof(int))) {
294 printf(
"[DEBUG]*******SENDHDR*********** \n");
295 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
296 printf(
"[DEBUG]*******BODY***********\n ");
297 printData(temp_buf, (
int)(total_recvd_byte /
sizeof(
int)));
299 sprintf(err_buf,
"CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
300 (
int)(*total_buf_nwords *
sizeof(
int)), temp_length);
301 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
308 if ((
int)(*total_buf_nwords *
sizeof(
int)) != total_recvd_byte) {
310 sprintf(err_buf,
"CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
311 total_recvd_byte, (
int)(*total_buf_nwords *
sizeof(
int)));
312 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
318 int send_trl_buf[(
unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
319 for (
int i = 0; i < (int)(m_socket_recv.size()); i++) {
321 recvFD(m_socket_recv[ i ], (
char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS *
sizeof(
int), flag);
322 }
catch (
string err_str) {
325 printf(
"[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
328 throw (std::move(err_str));
341 int total_buf_nwords = 0 ;
342 int num_events_in_sendblock = 0;
343 int num_nodes_in_sendblock = 0;
345 if (m_start_flag == 0) {
347 printf(
"DeSerializerPrePC: Reading the 1st packet from eb0...\n"); fflush(stdout);
349 int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
350 &num_nodes_in_sendblock);
351 if (m_start_flag == 0) {
353 printf(
"DeSerializerPrePC: Done. the size of the 1st packet %d words\n", total_buf_nwords); fflush(stdout);
356 m_recvd_totbytes += total_buf_nwords *
sizeof(int);
358 temp_raw_datablk->
SetBuffer((
int*)temp_buf, total_buf_nwords, *delete_flag,
359 num_events_in_sendblock, num_nodes_in_sendblock);
365 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
368 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
369 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
370 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
389 int* temp_buf = raw_datablk->
GetBuffer(0);
391 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
392 unsigned int eve_array[32];
393 unsigned int utime_array[32];
394 unsigned int ctime_type_array[32];
397 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
401 memset(eve_array, 0,
sizeof(eve_array));
402 memset(utime_array, 0,
sizeof(utime_array));
403 memset(ctime_type_array, 0,
sizeof(ctime_type_array));
405 int num_nodes_in_sendblock = raw_datablk->
GetNumNodes();
406 for (
int l = 0; l < num_nodes_in_sendblock; l++) {
407 int entry_id = l + k * num_nodes_in_sendblock;
417 if (temp_rawftsw->
GetEveNo(block_id) < 10) {
418 printf(
"[DEBUG] ######FTSW#########\n");
425 utime_ftsw = temp_rawftsw->
GetTTUtime(block_id);
429 #ifndef NO_DATA_CHECK
431 temp_rawftsw->
CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
432 eve_array[ entry_id ] = cur_evenum;
433 }
catch (
string err_str) {
435 strcpy(err_buf, err_str.c_str());
436 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
440 utime_array[ entry_id ] = temp_rawftsw->
GetTTUtime(block_id);
447 }
else if (raw_datablk->
CheckTLUID(entry_id)) {
454 printf(
"[DEBUG] ######TLU#########\n");
458 #ifndef NO_DATA_CHECK
460 temp_rawtlu->
CheckData(0, m_prev_evenum, &cur_evenum);
461 eve_array[ entry_id ] = cur_evenum;
462 }
catch (
string err_str) {
464 strcpy(err_buf, err_str.c_str());
465 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
480 "do not use the following for actual DAQ"
481 (pre_rawcpr_fmt->
GetBuffer(block_id))[ RawHeader_v2::POS_EXP_RUN_NO ] = exp_run_ftsw;
482 (pre_rawcpr_fmt->
GetBuffer(block_id))[ RawHeader_v2::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
483 (pre_rawcpr_fmt->
GetBuffer(block_id))[ RawHeader_v2::POS_TTUTIME ] = utime_ftsw;
486 #ifndef NO_DATA_CHECK
488 pre_rawcpr_fmt->
CheckData(0, m_prev_evenum, &cur_evenum,
489 m_prev_copper_ctr, &cur_copper_ctr,
490 m_prev_exprunsubrun_no, &m_exprunsubrun_no);
491 eve_array[ entry_id ] = cur_evenum;
492 }
catch (
string err_str) {
497 utime_array[ entry_id ] = pre_rawcpr_fmt->
GetTTUtime(0);
502 *eve_copper_0 = (raw_datablk->
GetBuffer(entry_id))[ 3 ];
503 }
else if (cpr_num == 1) {
507 delete pre_rawcpr_fmt;
511 #ifndef NO_DATA_CHECK
513 for (
int l = 1; l < num_nodes_in_sendblock; l++) {
514 if (eve_array[ 0 ] != eve_array[ l ] ||
515 utime_array[ 0 ] != utime_array[ l ] ||
516 ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
518 for (
int m = 0; m < num_nodes_in_sendblock; m++) {
519 printf(
"[DEBUG] node %d eve # %x utime %x ctime %x\n",
520 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
522 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
523 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
539 m_prev_evenum = cur_evenum;
540 m_prev_copper_ctr = cur_copper_ctr;
541 m_prev_exprunsubrun_no = m_exprunsubrun_no;
548 void DesSerPrePC::DataAcquisition()
551 unsigned int eve_copper_0 = 0;
553 printf(
"[DEBUG] initializing...\n"); fflush(stdout);
557 printf(
"[DEBUG] Done.\n"); fflush(stdout);
559 if (m_start_flag == 0) {
564 if (m_status.isAvailable()) {
566 printf(
"[DEBUG] DeSerializerPrePC: Waiting for Start...\n"); fflush(stdout);
567 m_status.reportRunning();
569 m_start_time = getTimeSec();
581 if (m_run_pause > 0 || m_run_error > 0) {
593 for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
598 int delete_flag_from =
604 setRecvdBuffer(&temp_rawdatablk, &delete_flag_from);
605 checkData(&temp_rawdatablk, &eve_copper_0);
606 }
catch (
string err_str) {
607 printf(
"Error was detected\n"); fflush(stdout);
615 int temp_num_events, temp_num_nodes;
618 #ifdef REDUCED_RAWCOPPER
627 int calced_temp_nwords_to = m_pre_rawcpr.CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
628 buf_to = getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
631 m_pre_rawcpr.CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
632 if (calced_temp_nwords_to != temp_nwords_to) {
635 "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
636 calced_temp_nwords_to, temp_nwords_to);
637 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
640 m_status.copyEventHeader(buf_to);
646 delete_flag_to = delete_flag_from;
647 delete_flag_from = 0;
656 raw_datablk[ j ].
SetBuffer(buf_to, temp_nwords_to, delete_flag_to, temp_num_events, temp_num_nodes);
662 #ifdef REDUCED_RAWCOPPER
667 post_rawcopper_v2.
SetBuffer(raw_datablk[ j ].GetWholeBuffer(), raw_datablk[ j ].TotalBufNwords(),
668 0, raw_datablk[ j ].GetNumEvents(), raw_datablk[ j ].GetNumNodes());
670 for (
int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
673 post_rawcopper_v2.
CheckCRC16(block_num, i_finesse_num);
682 if (m_run_pause != 0 || m_run_error != 0)
continue;
689 if (m_start_flag == 0) {
690 m_start_time = getTimeSec();
696 for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
701 if (m_start_flag == 0) {
703 printf(
"[DEBUG] SerializerPC: Sending the 1st packet...\n"); fflush(stdout);
707 m_sent_totbytes += sendByWriteV(&(raw_datablk[ j ]));
708 }
catch (
string err_str) {
712 print_err.PrintError((
char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
715 if (m_start_flag == 0) {
717 printf(
"[DEBUG] Done.\n"); fflush(stdout);
724 if (m_run_pause != 0 || m_run_error != 0)
continue;
730 if (max_nevt >= 0 || max_seconds >= 0.) {
732 if (n_basf2evt % 10000 == 0) {
735 printf(
"[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
736 max_nevt, max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
741 if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC) % 100000 == 0) {
742 double interval = cur_time - m_prev_time;
743 double total_time = cur_time - m_start_time;
744 printf(
"[DEBUG] Event %12d Rate %6.2lf[kHz] Recvd %6.2lf[MB/s] sent %6.2lf[MB/s] RunTime %8.2lf[s] interval %8.4lf[s]\n",
745 n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC,
746 (n_basf2evt - m_prev_nevt)*NUM_EVT_PER_BASF2LOOP_PC / interval / 1.e3,
747 (m_recvd_totbytes - m_recvd_prev_totbytes) / interval / 1.e6,
748 (m_sent_totbytes - m_sent_prev_totbytes) / interval / 1.e6,
753 m_prev_time = cur_time;
754 m_recvd_prev_totbytes = m_recvd_totbytes;
755 m_sent_prev_totbytes = m_sent_totbytes;
756 m_prev_nevt = n_basf2evt;
757 cur_time = getTimeSec();
762 if (m_status.isAvailable()) {
763 m_status.setOutputNBytes(m_sent_totbytes);
764 m_status.setOutputCount(n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC);
778 void DesSerPrePC::waitResume()
780 if (m_run_pause == 0) {
782 if (checkRunPause())
break;
785 printf(
"###########(DesSerPrePC) Waiting for Runstop() ###############\n");
796 printf(
"###########(Ser) Waiting for Resume ###############\n");
800 if (checkRunRecovery()) {
808 printf(
"Done!\n"); fflush(stdout);
810 printf(
"Checking connection to eb0\n"); fflush(stdout);
811 if (CheckConnection(m_socket_send) < 0) {
812 printf(
"Trying Accept1\n"); fflush(stdout);
814 printf(
"Trying Accept2\n"); fflush(stdout);
817 printf(
"Checking connection to COPPER\n"); fflush(stdout);
818 for (
int i = 0; i < m_num_connections; i++) {
819 if (CheckConnection(m_socket_recv[ i ]) < 0) m_socket_recv[ i ] = -1;
The Raw COPPER class ver.2 This class stores data received by COPPER via belle2link Data from all det...
int CheckCRC16(int n, int finesse_num)
check magic words
int GetFINESSENwords(int n, int finesse) OVERRIDE_CPP17
get data size of FINESSE buffer
unsigned int GetExpRunSubrun(int n) OVERRIDE_CPP17
Exp# (10bit) run# (14bit) restart # (8bit)
unsigned int GetTTUtime(int n) OVERRIDE_CPP17
Check if COPPER Magic words are correct.
unsigned int GetTTCtimeTRGType(int n) OVERRIDE_CPP17
get b2l block from "FEE b2link header"
Abstract base class for different kinds of events.