9 #include <daq/rawdata/modules/DeSerializerPrePC.h>
10 #include <daq/dataobjects/SendHeader.h>
11 #include <daq/dataobjects/SendTrailer.h>
12 #include <rawdata/dataobjects/RawTLU.h>
14 #include <netinet/tcp.h>
38 setDescription(
"Encode DataStore into RingBuffer");
40 addParam(
"NumConn", m_num_connections,
"Number of Connections", 0);
41 addParam(
"HostNameFrom", m_hostname_from,
"Hostnames of data sources");
42 addParam(
"PortFrom", m_port_from,
"port numbers of data sources");
44 B2INFO(
"DeSerializerPrePC: Constructor done.");
49 DeSerializerPrePCModule::~DeSerializerPrePCModule()
55 void DeSerializerPrePCModule::initialize()
57 B2INFO(
"DeSerializerPrePC: initialize() started.");
60 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
61 m_bufary[i] =
new int[ BUF_SIZE_WORD ];
63 m_buffer =
new int[ BUF_SIZE_WORD ];
67 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
68 memset(m_bufary[i], 0, BUF_SIZE_WORD *
sizeof(
int));
72 m_eventMetaDataPtr.registerInDataStore();
74 raw_datablkarray.registerInDataStore();
75 rawcprarray.registerInDataStore();
76 raw_ftswarray.registerInDataStore();
80 if (m_dump_fname.size() > 0) {
85 memset(time_array0, 0,
sizeof(time_array0));
86 memset(time_array1, 0,
sizeof(time_array1));
87 memset(time_array2, 0,
sizeof(time_array2));
93 if (m_nodename.size() == 0 || m_nodeid < 0) {
96 g_status.open(m_nodename, m_nodeid);
97 g_status.reportReady();
103 m_prev_copper_ctr = 0xFFFFFFFF;
104 m_prev_evenum = 0xFFFFFFFF;
106 B2INFO(
"DeSerializerPrePC: initialize() done.");
110 int DeSerializerPrePCModule::recvFD(
int sock,
char* buf,
int data_size_byte,
int flag)
115 if ((read_size = recv(sock, (
char*)buf + n, data_size_byte - n , flag)) < 0) {
116 if (errno == EINTR) {
120 sprintf(err_buf,
"[FATAL] Failed to receive data(%s). Exiting...", strerror(errno));
121 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
127 if (n == data_size_byte)
break;
133 int DeSerializerPrePCModule::Connect()
135 for (
int i = 0; i < m_num_connections; i++) {
139 struct sockaddr_in socPC;
140 socPC.sin_family = AF_INET;
142 struct hostent* host;
143 host = gethostbyname(m_hostname_from[ i ].c_str());
146 sprintf(err_buf,
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
148 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
152 socPC.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
153 socPC.sin_port = htons(m_port_from[ i ]);
154 int sd = socket(PF_INET, SOCK_STREAM, 0);
157 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1,
sizeof(val1));
159 printf(
"[DEBUG] Connecting to %s port %d ...\n", m_hostname_from[ i ].c_str(), m_port_from[ i ]);
161 if (connect(sd, (
struct sockaddr*)(&socPC),
sizeof(socPC)) < 0) {
162 perror(
"Failed to connect. Retrying...");
165 printf(
"[DEBUG] Done\n");
169 m_socket.push_back(sd);
174 getsockopt(m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
176 printf(
"[DEBUG] SO_RCVBUF %d\n", val);
178 getsockopt(m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
180 printf(
"[DEBUG] SO_SNDBUF %d\n", val);
182 getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
184 printf(
"[DEBUG] TCP_MAXSEG %d\n", val);
186 getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
188 printf(
"[DEBUG] TCP_NODELAY %d\n", val);
190 if (g_status.isAvailable()) {
192 memset(&sa, 0,
sizeof(sockaddr_in));
193 socklen_t sa_len =
sizeof(sa);
194 if (getsockname(m_socket[i], (
struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
195 g_status.setInputPort(ntohs(sa.sin_port));
196 g_status.setInputAddress(sa.sin_addr.s_addr);
201 printf(
"[DEBUG] Initialization finished\n");
207 int* DeSerializerPrePCModule::recvData(
int* delete_flag,
int* total_buf_nwords,
int* num_events_in_sendblock,
208 int* num_nodes_in_sendblock)
211 int* temp_buf = NULL;
214 vector <int> each_buf_nwords;
215 each_buf_nwords.clear();
216 vector <int> each_buf_nodes;
217 each_buf_nodes.clear();
218 vector <int> each_buf_events;
219 each_buf_events.clear();
221 *total_buf_nwords = 0;
222 *num_nodes_in_sendblock = 0;
223 *num_events_in_sendblock = 0;
228 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
229 int temp_num_events = 0;
230 int temp_num_nodes = 0;
233 for (
int i = 0; i < (int)(m_socket.size()); i++) {
235 recvFD(m_socket[ i ], (
char*)send_hdr_buf,
sizeof(
int)*SendHeader::SENDHDR_NWORDS, flag);
241 temp_num_nodes = send_hdr.GetNumNodesinPacket();
246 *num_events_in_sendblock = temp_num_events;
247 }
else if (*num_events_in_sendblock != temp_num_events) {
248 #ifndef NO_DATA_CHECK
251 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
252 *num_events_in_sendblock , temp_num_events, i, *num_nodes_in_sendblock , temp_num_nodes, i);
253 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
259 *num_nodes_in_sendblock += temp_num_nodes;
261 int rawblk_nwords = send_hdr.GetTotalNwords()
262 - SendHeader::SENDHDR_NWORDS
263 - SendTrailer::SENDTRL_NWORDS;
264 *total_buf_nwords += rawblk_nwords;
269 if (rawblk_nwords > (
int)(2.5e6) || rawblk_nwords <= 0) {
270 printf(
"[DEBUG] *******HDR**********\n");
271 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
273 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
274 send_hdr.GetTotalNwords());
275 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
281 each_buf_nwords.push_back(rawblk_nwords);
282 each_buf_events.push_back(temp_num_events);
283 each_buf_nodes.push_back(temp_num_nodes);
288 temp_buf = getNewBuffer(*total_buf_nwords, delete_flag);
292 int total_recvd_byte = 0;
293 for (
int i = 0; i < (int)(m_socket.size()); i++) {
294 total_recvd_byte += recvFD(m_socket[ i ], (
char*)temp_buf + total_recvd_byte,
295 each_buf_nwords[ i ] *
sizeof(
int), flag);
301 for (
int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
302 int this_length = *((
int*)((
char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] *
sizeof(int) + temp_length));
303 temp_length += this_length *
sizeof(int);
305 if (temp_length != each_buf_nwords[ i ] *
sizeof(
int)) {
306 printf(
"[DEBUG]*******SENDHDR*********** \n");
307 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
308 printf(
"[DEBUG]*******BODY***********\n ");
309 printData(temp_buf, (
int)(total_recvd_byte /
sizeof(
int)));
311 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
312 (
int)(*total_buf_nwords *
sizeof(
int)), temp_length);
313 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
320 if ((
int)(*total_buf_nwords *
sizeof(
int)) != total_recvd_byte) {
322 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
323 total_recvd_byte, (
int)(*total_buf_nwords *
sizeof(
int)));
324 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
330 int send_trl_buf[(
unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
331 for (
int i = 0; i < (int)(m_socket.size()); i++) {
332 recvFD(m_socket[ i ], (
char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS *
sizeof(
int), flag);
339 void DeSerializerPrePCModule::setRecvdBuffer(
RawDataBlock* temp_raw_datablk,
int* delete_flag)
344 int total_buf_nwords = 0 ;
345 int num_events_in_sendblock = 0;
346 int num_nodes_in_sendblock = 0;
348 if (m_start_flag == 0) B2INFO(
"DeSerializerPrePC: Reading the 1st packet from eb0...");
349 int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
350 &num_nodes_in_sendblock);
351 if (m_start_flag == 0) {
352 B2INFO(
"DeSerializerPrePC: Done. the size of the 1st packet " << total_buf_nwords <<
" words");
355 m_totbytes += total_buf_nwords *
sizeof(int);
357 temp_raw_datablk->
SetBuffer((
int*)temp_buf, total_buf_nwords, *delete_flag,
358 num_events_in_sendblock, num_nodes_in_sendblock);
364 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
367 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
368 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
369 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
379 void DeSerializerPrePCModule::checkData(
RawDataBlock* raw_datablk,
unsigned int* eve_copper_0)
381 int data_size_copper_0 = -1;
382 int data_size_copper_1 = -1;
387 int* temp_buf = raw_datablk->
GetBuffer(0);
389 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
390 unsigned int eve_array[32];
391 unsigned int utime_array[32];
392 unsigned int ctime_type_array[32];
395 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
399 memset(eve_array, 0,
sizeof(eve_array));
400 memset(utime_array, 0,
sizeof(utime_array));
401 memset(ctime_type_array, 0,
sizeof(ctime_type_array));
403 int num_nodes_in_sendblock = raw_datablk->
GetNumNodes();
404 for (
int l = 0; l < num_nodes_in_sendblock; l++) {
405 int entry_id = l + k * num_nodes_in_sendblock;
415 if (temp_rawftsw->
GetEveNo(block_id) < 10) {
416 printf(
"[DEBUG] ######FTSW#########\n");
423 utime_ftsw = temp_rawftsw->
GetTTUtime(block_id);
427 #ifndef NO_DATA_CHECK
429 temp_rawftsw->
CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
430 eve_array[ entry_id ] = cur_evenum;
431 }
catch (
string err_str) {
432 print_err.PrintError(m_shmflag, &g_status, err_str);
436 utime_array[ entry_id ] = temp_rawftsw->
GetTTUtime(block_id);
443 }
else if (raw_datablk->
CheckTLUID(entry_id)) {
450 printf(
"[DEBUG] ######TLU#########\n");
454 #ifndef NO_DATA_CHECK
456 temp_rawtlu->
CheckData(0, m_prev_evenum, &cur_evenum);
457 eve_array[ entry_id ] = cur_evenum;
458 }
catch (
string err_str) {
459 print_err.PrintError(m_shmflag, &g_status, err_str);
478 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
479 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
480 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
485 #ifndef NO_DATA_CHECK
488 temp_rawcopper->
CheckData(0, m_prev_evenum, &cur_evenum,
489 m_prev_copper_ctr, &cur_copper_ctr,
490 m_prev_exprunsubrun_no, &m_exprunsubrun_no);
491 eve_array[ entry_id ] = cur_evenum;
492 }
catch (
string err_str) {
493 print_err.PrintError(m_shmflag, &g_status, err_str);
498 utime_array[ entry_id ] = temp_rawcopper->
GetTTUtime(0);
503 *eve_copper_0 = (raw_datablk->
GetBuffer(entry_id))[ 3 ];
504 }
else if (cpr_num == 1) {
508 delete temp_rawcopper;
515 #ifndef NO_DATA_CHECK
517 for (
int l = 1; l < num_nodes_in_sendblock; l++) {
518 if (eve_array[ 0 ] != eve_array[ l ] ||
519 utime_array[ 0 ] != utime_array[ l ] ||
520 ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
522 for (
int m = 0; m < num_nodes_in_sendblock; m++) {
523 printf(
"[DEBUG] node %d eve # %d utime %x ctime %x\n",
524 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
526 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
527 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
543 m_prev_evenum = cur_evenum;
544 m_prev_copper_ctr = cur_copper_ctr;
545 m_prev_exprunsubrun_no = m_exprunsubrun_no;
552 void DeSerializerPrePCModule::event()
556 unsigned int eve_copper_0 = 0;
560 if (m_start_flag == 0) {
565 if (g_status.isAvailable()) {
566 B2INFO(
"DeSerializerPrePC: Waiting for Start...\n");
567 g_status.reportRunning();
569 m_start_time = getTimeSec();
579 for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
585 int delete_flag_from = 0, delete_flag_to = 0;
587 setRecvdBuffer(&temp_rawdatablk, &delete_flag_from);
589 checkData(&temp_rawdatablk, &eve_copper_0);
596 int temp_num_events, temp_num_nodes;
599 #ifdef REDUCED_RAWCOPPER
608 int calced_temp_nwords_to = m_pre_rawcpr.CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
609 buf_to = getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
612 m_pre_rawcpr.CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
613 if (calced_temp_nwords_to != temp_nwords_to) {
616 "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
617 calced_temp_nwords_to, temp_nwords_to);
618 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
627 delete_flag_to = delete_flag_from;
628 delete_flag_from = 0;
635 RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
639 raw_datablk->
SetBuffer(buf_to, temp_nwords_to, delete_flag_to,
640 temp_num_events, temp_num_nodes);
646 #ifdef REDUCED_RAWCOPPER
648 post_rawcopper_latest.
SetBuffer(buf_to, temp_nwords_to,
649 0, temp_num_events, temp_num_nodes);
651 for (
int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
654 post_rawcopper_latest.
CheckCRC16(block_num, i_finesse_num);
666 m_eventMetaDataPtr.create();
667 m_eventMetaDataPtr->setExperiment(1);
668 m_eventMetaDataPtr->setRun(1);
669 m_eventMetaDataPtr->setEvent(n_basf2evt);
689 if (max_nevt >= 0 || max_seconds >= 0.) {
690 if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC >= max_nevt && max_nevt > 0)
691 || (getTimeSec() - m_start_time > max_seconds && max_seconds > 0.)) {
692 printf(
"[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
693 max_nevt , max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
694 m_eventMetaDataPtr->setEndOfData();
698 if (n_basf2evt % 20000 == 0 || n_basf2evt < 10) {
699 RateMonitor(eve_copper_0);
704 if (g_status.isAvailable()) {
705 g_status.setInputNBytes(m_totbytes);
706 g_status.setInputCount(n_basf2evt);