9 #include <daq/rawdata/modules/DeSerializerPC.h>
10 #include <daq/dataobjects/SendHeader.h>
11 #include <daq/dataobjects/SendTrailer.h>
12 #include <rawdata/dataobjects/RawTLU.h>
15 #include <netinet/tcp.h>
22 #define USE_DESERIALIZER_PREPC
40 #ifndef REDUCED_RAWCOPPER
41 #ifdef USE_DESERIALIZER_PREPC
56 B2INFO(
"DeSerializerPC: Constructor done.");
63 DeSerializerPCModule::~DeSerializerPCModule()
// Fragment of initialize() (identified by its own log messages). This excerpt omits
// intermediate lines, so only the statements that are visible are annotated.
71 B2INFO(
"DeSerializerPC: initialize() started.");
// Fatal exit when host/port settings are incomplete (the triggering condition is not visible here).
75 B2FATAL(
"[FATAL] Hostname or port# is not specified for all connections. Please check a python script. Exiting... \n");
// Two passes over NUM_PREALLOC_BUF slots; the loop bodies are not visible in this excerpt.
83 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
90 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
// Register the event metadata and the raw-data arrays in the data store.
95 m_eventMetaDataPtr.registerInDataStore();
97 raw_datablkarray.registerInDataStore();
98 rawcprarray.registerInDataStore();
99 raw_ftswarray.registerInDataStore();
// Zero the three timing arrays in full.
109 memset(time_array0, 0,
sizeof(time_array0));
110 memset(time_array1, 0,
sizeof(time_array1));
111 memset(time_array2, 0,
sizeof(time_array2));
// Start both previous-value trackers at 0xFFFFFFFF.
// NOTE(review): presumably a "nothing seen yet" marker for the CheckData() calls — confirm.
127 m_prev_copper_ctr = 0xFFFFFFFF;
128 m_prev_evenum = 0xFFFFFFFF;
134 B2INFO(
"DeSerializerPC: initialize() done.");
// Fragment of a receive loop: recv() writes into `buf` at byte offset `n` and asks for the
// remaining `data_size_byte - n` bytes; the loop is left once `n == data_size_byte`.
// NOTE(review): presumably the body of recvFD(), which other fragments call with
// (socket, buffer, size, flag) — the function header is not visible here; confirm.
143 if ((read_size = recv(sock, (
char*)buf + n, data_size_byte - n , flag)) < 0) {
// recv() reported an error: branch on errno.
144 if (errno == EINTR) {
146 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
// Nothing could be read right now; hand over to callCheckRunPause().
150 callCheckRunPause(err_str);
// Any other errno: format a warning that carries the recv() return value.
155 sprintf(err_buf,
"[WARNING] recv() returned error; ret = %d. : %s %s %d",
156 read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Error string for this path; how it is propagated is not visible in this excerpt.
160 string err_str =
"RUN_ERROR";
166 }
else if (read_size == 0) {
// A zero-byte read is reported as the peer having closed the connection.
// NOTE(review): recv() does not set errno when it returns 0, so strerror(errno)
// may print a stale, unrelated error text here — confirm and consider dropping it.
168 sprintf(err_buf,
"[WARNING] Connection is closed by peer(%s).: %s %s %d",
169 strerror(errno), __FILE__, __PRETTY_FUNCTION__, __LINE__);
173 string err_str =
"RUN_ERROR";
// All requested bytes have arrived.
180 if (n == data_size_byte)
break;
// Fragment of the connection set-up for input socket `i` (the function header and the
// enclosing loop are not visible in this excerpt).
// Destination address, IPv4.
193 struct sockaddr_in socPC;
194 socPC.sin_family = AF_INET;
196 struct hostent* host;
// Error text for a host name that could not be resolved (the lookup call itself is not visible here).
200 sprintf(err_buf,
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...",
m_hostname_from[ i ].c_str(),
// Use the first address entry returned for the host, read as one unsigned int.
206 socPC.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
// Stream (TCP) socket.
208 int sd = socket(PF_INET, SOCK_STREAM, 0);
// Set TCP_NODELAY from val1 (its value is not visible here).
211 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1,
sizeof(val1));
// Apply a receive timeout (the timeval fields are filled in lines not visible here).
213 struct timeval timeout;
216 setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)
sizeof(timeout));
// Attempt the connection; a failure is reported through perror() with a retry message.
220 if (connect(sd, (
struct sockaddr*)(&socPC),
sizeof(socPC)) < 0) {
221 perror(
"Failed to connect. Retrying...");
224 printf(
"[DEBUG] Done\n");
// Print the effective socket options of m_socket[ i ] for debugging.
235 getsockopt(
m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
237 printf(
"[DEBUG] SO_RCVBUF %d\n", val);
239 getsockopt(
m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
241 printf(
"[DEBUG] SO_SNDBUF %d\n", val);
243 getsockopt(
m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
245 printf(
"[DEBUG] TCP_MAXSEG %d\n", val);
247 getsockopt(
m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
249 printf(
"[DEBUG] TCP_NODELAY %d\n", val);
// Look up the local end of the socket and publish its port and address to g_status.
253 memset(&sa, 0,
sizeof(sockaddr_in));
254 socklen_t sa_len =
sizeof(sa);
255 if (getsockname(
m_socket[i], (
struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
256 g_status.setInputPort(ntohs(sa.sin_port));
257 g_status.setInputAddress(sa.sin_addr.s_addr);
262 printf(
"[DEBUG] Initialization finished\n");
// Fragment of recvData() (the visible parameter tail and the out-parameters match the
// recvData(...) call elsewhere in this file; most of the header is not visible).
// Visible flow: (1) read one SendHeader per socket, (2) obtain a buffer for the combined
// payload, (3) read every socket's payload into it and validate the lengths,
// (4) read one SendTrailer per socket.
269 int* num_nodes_in_sendblock)
272 int* temp_buf = NULL;
// Per-socket bookkeeping: payload size in words, node count, event count.
275 vector <int> each_buf_nwords;
276 each_buf_nwords.clear();
277 vector <int> each_buf_nodes;
278 each_buf_nodes.clear();
279 vector <int> each_buf_events;
280 each_buf_events.clear();
// Reset the out-parameters before accumulating.
282 *total_buf_nwords = 0;
283 *num_nodes_in_sendblock = 0;
284 *num_events_in_sendblock = 0;
289 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
290 int temp_num_events = 0;
291 int temp_num_nodes = 0;
// Pass 1: receive the SendHeader from every socket.
294 for (
int i = 0; i < (int)(
m_socket.size()); i++) {
296 recvFD(
m_socket[ i ], (
char*)send_hdr_buf,
sizeof(
int)*SendHeader::SENDHDR_NWORDS, flag);
302 temp_num_nodes = send_hdr.GetNumNodesinPacket();
// Adopt this socket's event count (the guarding condition is not visible here);
// otherwise the count must equal the one already adopted.
307 *num_events_in_sendblock = temp_num_events;
308 }
else if (*num_events_in_sendblock != temp_num_events) {
310 #ifndef NO_DATA_CHECK
311 printf(
"[DEBUG] *******HDR**********\n");
312 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
315 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
316 *num_events_in_sendblock , temp_num_events, i, *num_nodes_in_sendblock , temp_num_nodes, i);
// Node counts add up across sockets.
323 *num_nodes_in_sendblock += temp_num_nodes;
// Payload words = total words announced by the header minus header and trailer words.
325 int rawblk_nwords = send_hdr.GetTotalNwords()
326 - SendHeader::SENDHDR_NWORDS
327 - SendTrailer::SENDTRL_NWORDS;
328 *total_buf_nwords += rawblk_nwords;
// Reject payload sizes that are non-positive or above 2.5e6 words.
333 if (rawblk_nwords > (
int)(2.5e6) || rawblk_nwords <= 0) {
334 printf(
"[DEBUG] *******HDR**********\n");
339 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
340 send_hdr.GetTotalNwords());
347 each_buf_nwords.push_back(rawblk_nwords);
348 each_buf_events.push_back(temp_num_events);
349 each_buf_nodes.push_back(temp_num_nodes);
// Obtain a buffer for the combined payload of all sockets; delete_flag is passed on to
// getNewBuffer() (what it signals is not visible in this excerpt).
354 temp_buf =
getNewBuffer(*total_buf_nwords, delete_flag);
// Pass 2: receive each socket's payload back-to-back into temp_buf.
358 int total_recvd_byte = 0;
359 for (
int i = 0; i < (int)(
m_socket.size()); i++) {
362 total_recvd_byte +=
recvFD(
m_socket[ i ], (
char*)temp_buf + total_recvd_byte,
363 each_buf_nwords[ i ] *
sizeof(
int), flag);
364 }
catch (
string err_str) {
// A string was thrown while receiving; the handler logs that the buffer is deleted
// first (the delete itself is not visible in this excerpt).
366 B2WARNING(
"Delete buffer before going to Run-pause state");
// Walk the chunk just received from socket i: each of its nodes*events entries is read
// as starting with its own length in words; temp_length accumulates those lengths in bytes.
376 for (
int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
377 int this_length = *((
int*)((
char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] *
sizeof(int) + temp_length));
378 temp_length += this_length *
sizeof(int);
// The summed entry lengths must equal the payload size announced for this socket.
380 if (temp_length != (
int)(each_buf_nwords[ i ] *
sizeof(int))) {
381 printf(
"[DEBUG]*******SENDHDR*********** \n");
382 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
383 printf(
"[DEBUG]*******BODY***********\n ");
384 printData(temp_buf, (
int)(total_recvd_byte /
sizeof(
int)));
// NOTE(review): the check above compares against each_buf_nwords[ i ], but the message
// prints *total_buf_nwords (the sum over all sockets) as the header length — with more
// than one socket the reported number may not be the one that failed; confirm.
386 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
387 (
int)(*total_buf_nwords *
sizeof(
int)), temp_length);
// Overall check: bytes received must equal the total announced by the headers.
395 if ((
int)(*total_buf_nwords *
sizeof(
int)) != total_recvd_byte) {
397 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
398 total_recvd_byte, (
int)(*total_buf_nwords *
sizeof(
int)));
// Pass 3: receive the SendTrailer from every socket; each read reuses send_trl_buf.
405 int send_trl_buf[(
unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
406 for (
int i = 0; i < (int)(
m_socket.size()); i++) {
408 recvFD(
m_socket[ i ], (
char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS *
sizeof(
int), flag);
409 }
catch (
string err_str) {
411 B2WARNING(
"Delete buffer before going to Run-pause state");
// Fragment (function header not visible in this excerpt): receive one send block through
// recvData() and attach the returned buffer to temp_raw_datablk.
427 int total_buf_nwords = 0 ;
428 int num_events_in_sendblock = 0;
429 int num_nodes_in_sendblock = 0;
// Announce the first read only while m_start_flag is still 0.
431 if (
m_start_flag == 0) B2INFO(
"DeSerializerPC: Reading the 1st packet from eb0...");
// recvData() fills the three counters above through the pointers passed in.
433 int* temp_buf =
recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
434 &num_nodes_in_sendblock);
436 B2INFO(
"DeSerializerPC: Done. the size of the 1st packet " << total_buf_nwords <<
" words");
// Add the received payload size in bytes to the running total.
439 m_totbytes += total_buf_nwords *
sizeof(int);
// Hand the buffer to the data block together with its event and node counts.
444 temp_raw_datablk->
SetBuffer((
int*)temp_buf, total_buf_nwords, 0,
446 num_events_in_sendblock, num_nodes_in_sendblock);
// The entry count (obtained in a line not visible here) must equal events * nodes.
452 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
455 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
456 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
// Fragment of checkData() (the visible parameter tail matches the checkData(...) call
// elsewhere in this file; the start of the header is not visible).
// Visible flow: for every node entry of the block, dispatch on the entry type
// (FTSW / TLU / otherwise COPPER), run its CheckData(), record event number and time,
// then compare the event numbers of all nodes and update the "previous" trackers.
468 unsigned int* subrun_copper_0,
unsigned int* eve_copper_0,
unsigned int* error_bit_flag)
477 int* temp_buf = raw_datablk->
GetBuffer(0);
479 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
// Per-entry records with a fixed capacity of 32.
// NOTE(review): they are indexed by entry_id = l + k * num_nodes_in_sendblock and no
// bound check is visible in this excerpt — confirm that entry_id always stays below 32.
480 unsigned int eve_array[32];
481 unsigned int utime_array[32];
482 unsigned int ctime_type_array[32];
// Values taken from the FTSW entry; the COPPER branch below writes them into headers.
485 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
489 memset(eve_array, 0,
sizeof(eve_array));
490 memset(utime_array, 0,
sizeof(utime_array));
491 memset(ctime_type_array, 0,
sizeof(ctime_type_array));
493 int num_nodes_in_sendblock = raw_datablk->
GetNumNodes();
// Loop over the nodes; `k` comes from an enclosing scope that is not visible here.
494 for (
int l = 0; l < num_nodes_in_sendblock; l++) {
495 int entry_id = l + k * num_nodes_in_sendblock;
// FTSW entry: debug dump while the event number is below 10.
504 if (temp_rawftsw->
GetEveNo(block_id) < 10) {
505 printf(
"[DEBUG] ######FTSW#########\n");
512 utime_ftsw = temp_rawftsw->
GetTTUtime(block_id);
516 #ifndef NO_DATA_CHECK
519 eve_array[ entry_id ] = cur_evenum;
520 }
catch (
string err_str) {
525 utime_array[ entry_id ] = temp_rawftsw->
GetTTUtime(block_id);
532 }
else if (raw_datablk->
CheckTLUID(entry_id)) {
// TLU entry.
539 printf(
"[DEBUG] ######TLU#########\n");
543 #ifndef NO_DATA_CHECK
545 temp_rawtlu->
CheckData(0, m_prev_evenum, &cur_evenum);
546 eve_array[ entry_id ] = cur_evenum;
547 }
catch (
string err_str) {
// COPPER entry: overwrite the exp/run, ctime/trigger-type and utime header words with
// the values held in the *_ftsw locals.
566 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
567 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
568 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
571 #ifndef NO_DATA_CHECK
// CheckData() receives the previous event number / COPPER counter and writes the
// current ones through the pointers.
574 temp_rawcopper->
CheckData(0, m_prev_evenum, &cur_evenum,
575 m_prev_copper_ctr, &cur_copper_ctr,
577 eve_array[ entry_id ] = cur_evenum;
578 }
catch (
string err_str) {
585 utime_array[ entry_id ] = temp_rawcopper->
GetTTUtime(0);
// Report event / experiment / run number of this COPPER entry to the caller
// (the condition selecting which entry is used is not visible here).
591 *eve_copper_0 = temp_rawcopper->
GetEveNo(0);
592 *exp_copper_0 = temp_rawcopper->
GetExpNo(0);
593 *run_copper_0 = temp_rawcopper->
GetRunNo(0);
595 }
else if (cpr_num == 1) {
603 delete temp_rawcopper;
607 #ifndef NO_DATA_CHECK
// Cross-node consistency: every node must carry the same event number as node 0.
609 for (
int l = 1; l < num_nodes_in_sendblock; l++) {
610 if (eve_array[ 0 ] != eve_array[ l ]) {
// Dump the per-node records before reporting the mismatch.
615 for (
int m = 0; m < num_nodes_in_sendblock; m++) {
616 printf(
"[DEBUG] node %d eve # %d utime %x ctime %x\n",
617 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
619 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
629 printf(
"[DEBUG] ##############################################\n");
631 printf(
"[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
633 printf(
"[DEBUG] ##############################################\n");
// Remember the current values for the next call.
636 m_prev_evenum = cur_evenum;
637 m_prev_copper_ctr = cur_copper_ctr;
// waitResume(): prints a waiting banner and tests checkRunRecovery(). The remainder of
// the body (including any loop around this test) is not visible in this excerpt.
644 void DeSerializerPCModule::waitResume()
649 printf(
"###########(Ser) Waiting for Resume ###############\n");
653 if (checkRunRecovery()) {
// Fragment that tests individual bits of error_flag (presumably the body of
// setErrorFlag(), which is called elsewhere in this file — header not visible; confirm).
// Packet-level CRC error bit.
672 if (error_flag & RawHeader_latest::B2LINK_PACKET_CRC_ERROR) {
// Event-level CRC error bit.
676 if (error_flag & RawHeader_latest::B2LINK_EVENT_CRC_ERROR) {
// Log once when error_set is true (where it is set is not visible here).
680 if (error_set) B2INFO(
"Raw2Ds: Error flag was set in EventMetaData.");
// Fragment of the per-call event processing routine (function header not visible in this
// excerpt; presumably event() — confirm).
// Values filled by checkData() and later copied into the event metadata.
688 unsigned int exp_copper_0 = 0;
689 unsigned int run_copper_0 = 0;
690 unsigned int subrun_copper_0 = 0;
691 unsigned int eve_copper_0 = 0;
692 unsigned int error_bit_flag = 0;
702 B2INFO(
"DeSerializerPC: Waiting for Start...\n");
// Run-pause / run-error handling driven by the global flags.
711 if (g_run_pause > 0 || g_run_error > 0) {
// An error is flagged but no pause yet: poll checkRunPause() until it succeeds.
712 if (g_run_pause == 0) {
714 if (checkRunPause())
break;
717 printf(
"###########(DeserializerPC) Waiting for Runpause() ###############\n");
725 m_eventMetaDataPtr.create();
// Process up to NUM_EVT_PER_BASF2LOOP_PC blocks in this call.
737 for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
746 checkData(&temp_rawdatablk, &exp_copper_0, &run_copper_0, &subrun_copper_0, &eve_copper_0, &error_bit_flag);
747 }
catch (
string err_str) {
// "RUN_PAUSE" and "RUN_ERROR" lead to creating the event metadata; the
// PrintError() call below appears after this test (exact branch structure not visible).
750 if (err_str ==
"RUN_PAUSE" || err_str ==
"RUN_ERROR") {
751 m_eventMetaDataPtr.create();
755 print_err.PrintError((
char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Append a new RawDataBlock entry to the array.
759 RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
764 if (buf_rc != NULL) {
// Fill the event metadata from the values reported by checkData().
771 m_eventMetaDataPtr.create();
772 m_eventMetaDataPtr->setExperiment(exp_copper_0);
773 m_eventMetaDataPtr->setRun(run_copper_0);
774 m_eventMetaDataPtr->setSubrun(subrun_copper_0);
775 m_eventMetaDataPtr->setEvent(eve_copper_0);
777 setErrorFlag(error_bit_flag, m_eventMetaDataPtr);
778 if (error_bit_flag != 0) {
// NOTE(review): the arguments are unsigned int but are printed with %d, and
// error_bit_flag is labelled "count" although it is used as a bit mask above — confirm.
780 printf(
"[ERROR] error bit was detected. exp %d run %d eve %d count = %d\n",
781 exp_copper_0, run_copper_0, eve_copper_0, error_bit_flag);
// Run-stop path: print a summary and mark end of data.
803 printf(
"[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
805 m_eventMetaDataPtr->setEndOfData();
810 RateMonitor(eve_copper_0, subrun_copper_0, run_copper_0);
// Publish the running byte total to g_status.
814 g_status.setInputNBytes(m_totbytes);