8#include <daq/rawdata/DesSerPrePC.h>
9#include <rawdata/dataobjects/RawFTSWFormat_latest.h>
10#include <rawdata/dataobjects/RawTLUFormat.h>
15#include <netinet/tcp.h>
17#include <sys/socket.h>
29 const std::string& nodename,
int )
37 m_socket_recv.push_back(-1);
49 printf(
"[INFO] DeSerializerPrePC: Constructor done.\n"); fflush(stdout);
54DesSerPrePC::~DesSerPrePC()
65 if ((read_size = recv(sock, (
char*)buf + n, data_size_byte - n, flag)) < 0) {
68 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
71 callCheckRunPause(err_str);
77 sprintf(err_buf,
"recv() returned error; ret = %d. : %s %s %d",
78 read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
82 printf(
"[WARNING] %s\n", err_buf); fflush(stdout);
83 string err_str =
"RUN_ERROR";
84 printf(
"AIUEO********************\n"); fflush(stdout);
87 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
90 }
else if (read_size == 0) {
93 sprintf(err_buf,
"[WARNING] Connection is closed by peer(%s). readsize = %d %d. : %s %s %d",
94 strerror(errno), read_size, errno, __FILE__, __PRETTY_FUNCTION__, __LINE__);
98 printf(
"%s\n", err_buf); fflush(stdout);
99 string err_str =
"RUN_ERROR";
102 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
107 if (n == data_size_byte)
break;
119 if (m_socket_recv[ i ] >= 0)
continue;
124 struct hostent* host;
128 sprintf(err_buf,
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...",
m_hostname_from[ i ].c_str(),
130 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
135 struct sockaddr_in socPC;
136 socPC.sin_family = AF_INET;
137 socPC.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
139 int sd = socket(PF_INET, SOCK_STREAM, 0);
141 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1,
sizeof(val1));
143 struct timeval timeout;
146 setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)
sizeof(timeout));
151 if (connect(sd, (
struct sockaddr*)(&socPC),
sizeof(socPC)) < 0) {
152 perror(
"Failed to connect. Retrying...");
156 printf(
"[DEBUG] Done\n"); fflush(stdout);
161 m_socket_recv[ i ] = sd;
166 getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
168 getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
170 getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
172 getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
177 memset(&sa, 0,
sizeof(sockaddr_in));
178 socklen_t sa_len =
sizeof(sa);
179 if (getsockname(m_socket_recv[i], (
struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
180 m_status.setInputPort(ntohs(sa.sin_port));
181 m_status.setInputAddress(sa.sin_addr.s_addr);
187 printf(
"[DEBUG] Initialization finished\n"); fflush(stdout);
193int*
DesSerPrePC::recvData(
int* delete_flag,
int* total_buf_nwords,
int* num_events_in_sendblock,
int* num_nodes_in_sendblock)
195 int* temp_buf = NULL;
198 vector <int> each_buf_nwords;
199 each_buf_nwords.clear();
200 vector <int> each_buf_nodes;
201 each_buf_nodes.clear();
202 vector <int> each_buf_events;
203 each_buf_events.clear();
205 *total_buf_nwords = 0;
206 *num_nodes_in_sendblock = 0;
207 *num_events_in_sendblock = 0;
212 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
215 for (
int i = 0; i < (int)(m_socket_recv.size()); i++) {
217 recvFD(m_socket_recv[ i ], (
char*)send_hdr_buf,
sizeof(
int)*SendHeader::SENDHDR_NWORDS, flag);
223 int temp_num_nodes = send_hdr.GetNumNodesinPacket();
226 *num_events_in_sendblock = temp_num_events;
227 }
else if (*num_events_in_sendblock != temp_num_events) {
231 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
232 *num_events_in_sendblock, temp_num_events, i, *num_nodes_in_sendblock, temp_num_nodes, i);
233 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
239 *num_nodes_in_sendblock += temp_num_nodes;
241 int rawblk_nwords = send_hdr.GetTotalNwords()
242 - SendHeader::SENDHDR_NWORDS
243 - SendTrailer::SENDTRL_NWORDS;
244 *total_buf_nwords += rawblk_nwords;
249 if (rawblk_nwords > (
int)(2.5e6) || rawblk_nwords <= 0) {
250 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
252 sprintf(err_buf,
"CORRUPTED DATA: Too large event : Header %d %d %d %d :block size %d words\n", i, temp_num_events, temp_num_nodes,
253 send_hdr.GetTotalNwords(), rawblk_nwords);
254 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
260 each_buf_nwords.push_back(rawblk_nwords);
261 each_buf_events.push_back(temp_num_events);
262 each_buf_nodes.push_back(temp_num_nodes);
267 temp_buf =
getNewBuffer(*total_buf_nwords, delete_flag);
271 int total_recvd_byte = 0;
272 for (
int i = 0; i < (int)(m_socket_recv.size()); i++) {
275 total_recvd_byte +=
recvFD(m_socket_recv[ i ], (
char*)temp_buf + total_recvd_byte,
276 each_buf_nwords[ i ] *
sizeof(
int), flag);
277 }
catch (
const string& err_str) {
280 printf(
"[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
283 throw (std::move(err_str));
289 for (
int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
290 int this_length = *((
int*)((
char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] *
sizeof(int) + temp_length));
291 temp_length += this_length *
sizeof(int);
293 if (temp_length != (
int)(each_buf_nwords[ i ] *
sizeof(int))) {
294 printf(
"[DEBUG]*******SENDHDR*********** \n");
295 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
296 printf(
"[DEBUG]*******BODY***********\n ");
297 printData(temp_buf, (
int)(total_recvd_byte /
sizeof(
int)));
299 sprintf(err_buf,
"CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
300 (
int)(*total_buf_nwords *
sizeof(
int)), temp_length);
301 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
308 if ((
int)(*total_buf_nwords *
sizeof(
int)) != total_recvd_byte) {
310 sprintf(err_buf,
"CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
311 total_recvd_byte, (
int)(*total_buf_nwords *
sizeof(
int)));
312 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
318 int send_trl_buf[(
unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
319 for (
int i = 0; i < (int)(m_socket_recv.size()); i++) {
321 recvFD(m_socket_recv[ i ], (
char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS *
sizeof(
int), flag);
322 }
catch (
const string& err_str) {
325 printf(
"[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
328 throw (std::move(err_str));
341 int total_buf_nwords = 0 ;
342 int num_events_in_sendblock = 0;
343 int num_nodes_in_sendblock = 0;
347 printf(
"DeSerializerPrePC: Reading the 1st packet from eb0...\n"); fflush(stdout);
349 int* temp_buf =
recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
350 &num_nodes_in_sendblock);
353 printf(
"DeSerializerPrePC: Done. the size of the 1st packet %d words\n", total_buf_nwords); fflush(stdout);
356 m_recvd_totbytes += total_buf_nwords *
sizeof(int);
358 temp_raw_datablk->
SetBuffer((
int*)temp_buf, total_buf_nwords, *delete_flag,
359 num_events_in_sendblock, num_nodes_in_sendblock);
365 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
368 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
369 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
370 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
389 int* temp_buf = raw_datablk->
GetBuffer(0);
391 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
392 unsigned int eve_array[32];
393 unsigned int utime_array[32];
394 unsigned int ctime_type_array[32];
397 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
401 memset(eve_array, 0,
sizeof(eve_array));
402 memset(utime_array, 0,
sizeof(utime_array));
403 memset(ctime_type_array, 0,
sizeof(ctime_type_array));
405 int num_nodes_in_sendblock = raw_datablk->
GetNumNodes();
406 for (
int l = 0; l < num_nodes_in_sendblock; l++) {
407 int entry_id = l + k * num_nodes_in_sendblock;
417 if (temp_rawftsw->
GetEveNo(block_id) < 10) {
418 printf(
"[DEBUG] ######FTSW#########\n");
425 utime_ftsw = temp_rawftsw->
GetTTUtime(block_id);
432 eve_array[ entry_id ] = cur_evenum;
433 }
catch (
const string& err_str) {
435 strcpy(err_buf, err_str.c_str());
436 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
440 utime_array[ entry_id ] = temp_rawftsw->
GetTTUtime(block_id);
447 }
else if (raw_datablk->
CheckTLUID(entry_id)) {
454 printf(
"[DEBUG] ######TLU#########\n");
460 temp_rawtlu->
CheckData(0, m_prev_evenum, &cur_evenum);
461 eve_array[ entry_id ] = cur_evenum;
462 }
catch (
const string& err_str) {
464 strcpy(err_buf, err_str.c_str());
465 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
480 "do not use the following for actual DAQ"
481 (pre_rawcpr_fmt->
GetBuffer(block_id))[ RawHeader_v2::POS_EXP_RUN_NO ] = exp_run_ftsw;
482 (pre_rawcpr_fmt->
GetBuffer(block_id))[ RawHeader_v2::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
483 (pre_rawcpr_fmt->
GetBuffer(block_id))[ RawHeader_v2::POS_TTUTIME ] = utime_ftsw;
488 pre_rawcpr_fmt->
CheckData(0, m_prev_evenum, &cur_evenum,
489 m_prev_copper_ctr, &cur_copper_ctr,
491 eve_array[ entry_id ] = cur_evenum;
492 }
catch (
const string& err_str) {
497 utime_array[ entry_id ] = pre_rawcpr_fmt->
GetTTUtime(0);
502 *eve_copper_0 = (raw_datablk->
GetBuffer(entry_id))[ 3 ];
503 }
else if (cpr_num == 1) {
507 delete pre_rawcpr_fmt;
513 for (
int l = 1; l < num_nodes_in_sendblock; l++) {
514 if (eve_array[ 0 ] != eve_array[ l ] ||
515 utime_array[ 0 ] != utime_array[ l ] ||
516 ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
518 for (
int m = 0; m < num_nodes_in_sendblock; m++) {
519 printf(
"[DEBUG] node %d eve # %x utime %x ctime %x\n",
520 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
522 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
523 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
539 m_prev_evenum = cur_evenum;
540 m_prev_copper_ctr = cur_copper_ctr;
551 unsigned int eve_copper_0 = 0;
553 printf(
"[DEBUG] initializing...\n"); fflush(stdout);
557 printf(
"[DEBUG] Done.\n"); fflush(stdout);
566 printf(
"[DEBUG] DeSerializerPrePC: Waiting for Start...\n"); fflush(stdout);
593 for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
598 int delete_flag_from =
605 checkData(&temp_rawdatablk, &eve_copper_0);
606 }
catch (
const string& err_str) {
607 printf(
"Error was detected\n"); fflush(stdout);
615 int temp_num_events, temp_num_nodes;
618#ifdef REDUCED_RAWCOPPER
628 buf_to =
getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
632 if (calced_temp_nwords_to != temp_nwords_to) {
635 "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
636 calced_temp_nwords_to, temp_nwords_to);
637 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
646 delete_flag_to = delete_flag_from;
647 delete_flag_from = 0;
656 raw_datablk[ j ].
SetBuffer(buf_to, temp_nwords_to, delete_flag_to, temp_num_events, temp_num_nodes);
662#ifdef REDUCED_RAWCOPPER
667 post_rawcopper_v2.
SetBuffer(raw_datablk[ j ].GetWholeBuffer(), raw_datablk[ j ].TotalBufNwords(),
668 0, raw_datablk[ j ].GetNumEvents(), raw_datablk[ j ].GetNumNodes());
670 for (
int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
673 post_rawcopper_v2.
CheckCRC16(block_num, i_finesse_num);
696 for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
703 printf(
"[DEBUG] SerializerPC: Sending the 1st packet...\n"); fflush(stdout);
707 m_sent_totbytes += sendByWriteV(&(raw_datablk[ j ]));
708 }
catch (
const string& err_str) {
712 print_err.PrintError((
char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
717 printf(
"[DEBUG] Done.\n"); fflush(stdout);
735 printf(
"[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
741 if ((
n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC) % 100000 == 0) {
742 double interval =
cur_time - m_prev_time;
743 double total_time =
cur_time - m_start_time;
744 printf(
"[DEBUG] Event %12d Rate %6.2lf[kHz] Recvd %6.2lf[MB/s] sent %6.2lf[MB/s] RunTime %8.2lf[s] interval %8.4lf[s]\n",
747 (m_recvd_totbytes - m_recvd_prev_totbytes) / interval / 1.e6,
748 (m_sent_totbytes - m_sent_prev_totbytes) / interval / 1.e6,
754 m_recvd_prev_totbytes = m_recvd_totbytes;
755 m_sent_prev_totbytes = m_sent_totbytes;
763 m_status.setOutputNBytes(m_sent_totbytes);
778void DesSerPrePC::waitResume()
782 if (checkRunPause())
break;
785 printf(
"###########(DesSerPrePC) Waiting for Runstop() ###############\n");
796 printf(
"###########(Ser) Waiting for Resume ###############\n");
800 if (checkRunRecovery()) {
808 printf(
"Done!\n"); fflush(stdout);
810 printf(
"Checking connection to eb0\n"); fflush(stdout);
812 printf(
"Trying Accept1\n"); fflush(stdout);
814 printf(
"Trying Accept2\n"); fflush(stdout);
817 printf(
"Checking connection to COPPER\n"); fflush(stdout);
819 if (CheckConnection(m_socket_recv[ i ]) < 0) m_socket_recv[ i ] = -1;
int * recvData(int *delete_flag, int *total_m_size_word, int *num_events_in_sendblock, int *num_nodes_in_sendblock)
receive data
DesSerPrePC(std::string host_recv, int port_recv, const std::string &host_send, int port_send, int shmflag, const std::string &nodename, int nodeid)
Constructor / Destructor.
void DataAcquisition() override
Module functions to be called from event process.
int Connect()
Accept connection.
void setRecvdBuffer(RawDataBlockFormat *raw_datablk, int *delete_flag)
attach buffer to RawDataBlock
int recvFD(int fd, char *buf, int data_size_byte, int flag)
receive data
void checkData(RawDataBlockFormat *raw_datablk, unsigned int *eve_copper_0)
check data contents
int m_port_to
Destination port.
RunInfoBuffer m_status
Run info buffer.
std::string m_hostname_local
Destination Host.
unsigned int m_prev_exprunsubrun_no
run no.
std::string m_nodename
Node Name for SLC.
int m_start_flag
start flag
std::vector< int > m_port_from
port # to connect data sources
int n_basf2evt
No. of sent events.
int m_socket_send
Receiver Socket.
PreRawCOPPERFormat_v2 m_pre_rawcpr
report status to SLC
CprErrorMessage print_err
wrapper for B2LOG system
int m_run_pause
flag to show that run-controller pauses a run
void printData(int *buf, int nwords)
dump error data
int m_run_error
flag to show that there is some errors with which DAQ cannot continue.
int * getNewBuffer(int nwords, int *delete_flag)
Get buffer.
double cur_time
for time monitoring
int m_shmflag
Use shared memory.
int m_prev_nevt
No. of prev sent events.
int m_num_connections
check data contents
unsigned int m_exprunsubrun_no
run no.
double max_seconds
time to stop a run
double getTimeSec()
store time info.
std::vector< std::string > m_hostname_from
Receiver basf2 Socket.
The Raw COPPER class ver.2. This class stores data received by COPPER via belle2link. Data from all det...
int CheckCRC16(int n, int finesse_num)
check magic words
int GetFINESSENwords(int n, int finesse) OVERRIDE_CPP17
get data size of FINESSE buffer
unsigned int GetExpRunSubrun(int n) OVERRIDE_CPP17
Exp# (10bit) run# (14bit) restart # (8bit)
unsigned int GetTTUtime(int n) OVERRIDE_CPP17
Check if COPPER Magic words are correct.
unsigned int GetTTCtimeTRGType(int n) OVERRIDE_CPP17
get b2l block from "FEE b2link header"
Abstract base class for different kinds of events.