9 #include <daq/rawdata/modules/DeSerializerPrePC.h>
10 #include <daq/dataobjects/SendHeader.h>
11 #include <daq/dataobjects/SendTrailer.h>
12 #include <rawdata/dataobjects/RawTLU.h>
14 #include <netinet/tcp.h>
38 setDescription(
"Encode DataStore into RingBuffer");
40 addParam(
"NumConn", m_num_connections,
"Number of Connections", 0);
41 addParam(
"HostNameFrom", m_hostname_from,
"Hostnames of data sources");
42 addParam(
"PortFrom", m_port_from,
"port numbers of data sources");
44 B2INFO(
"DeSerializerPrePC: Constructor done.");
49 DeSerializerPrePCModule::~DeSerializerPrePCModule()
// initialize(): one-time set-up of the module. The visible statements
// allocate the receive buffers, register the DataStore members, clear the
// timing arrays, call g_status.open()/reportReady() and reset the
// "previous event" bookkeeping.
// NOTE(review): this listing omits lines (braces and some statements), so the
// exact nesting of the statements below cannot be confirmed from here.
55 void DeSerializerPrePCModule::initialize()
57 B2INFO(
"DeSerializerPrePC: initialize() started.");
// Allocate NUM_PREALLOC_BUF buffers of BUF_SIZE_WORD ints each, plus m_buffer.
// NOTE(review): raw new[]; the matching delete[] is not visible in this excerpt.
60 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
61 m_bufary[i] =
new int[ BUF_SIZE_WORD ];
63 m_buffer =
new int[ BUF_SIZE_WORD ];
// Zero-fill every pre-allocated buffer.
67 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
68 memset(m_bufary[i], 0, BUF_SIZE_WORD *
sizeof(
int));
// Register the event meta data and the raw-data arrays in the DataStore.
72 m_eventMetaDataPtr.registerInDataStore();
74 raw_datablkarray.registerInDataStore();
75 rawcprarray.registerInDataStore();
76 raw_ftswarray.registerInDataStore();
// Branch taken when a dump file name is configured (body not visible here).
80 if (m_dump_fname.size() > 0) {
// Clear the three timing arrays.
85 memset(time_array0, 0,
sizeof(time_array0));
86 memset(time_array1, 0,
sizeof(time_array1));
87 memset(time_array2, 0,
sizeof(time_array2));
// Tests for an empty node name or a negative node id; which of the following
// statements belong to which branch is not visible in this excerpt.
93 if (m_nodename.size() == 0 || m_nodeid < 0) {
96 g_status.open(m_nodename, m_nodeid);
97 g_status.reportReady();
// 0xFFFFFFFF is the initial value of both "previous" counters.
103 m_prev_copper_ctr = 0xFFFFFFFF;
104 m_prev_evenum = 0xFFFFFFFF;
106 B2INFO(
"DeSerializerPrePC: initialize() done.");
// recvFD(): receives data from a socket into a caller-supplied buffer.
//   sock            socket descriptor passed to recv()
//   buf             destination buffer; data is written at offset n
//   data_size_byte  requested number of bytes; the loop is left once
//                   n == data_size_byte
//   flag            passed unchanged as the recv() flags argument
// NOTE(review): the enclosing loop, the declaration/update of n and the
// return statement are not visible in this excerpt.
110 int DeSerializerPrePCModule::recvFD(
int sock,
char* buf,
int data_size_byte,
int flag)
// Ask recv() for the bytes still missing (data_size_byte - n).
// NOTE(review): only a negative return value is handled in the visible lines;
// how recv() == 0 (peer closed the connection) is treated cannot be seen here.
115 if ((read_size = recv(sock, (
char*)buf + n, data_size_byte - n , flag)) < 0) {
// EINTR is tested separately from other errors (its branch body is not visible).
116 if (errno == EINTR) {
// Any other error is formatted as a fatal message and handed to print_err.
// NOTE(review): sprintf() is unbounded and the size of err_buf is not visible;
// snprintf() would be the safer call.
120 sprintf(err_buf,
"[FATAL] Failed to receive data(%s). Exiting...", strerror(errno));
121 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Stop once the full requested size has been received.
127 if (n == data_size_byte)
break;
// Connect(): opens one TCP (SOCK_STREAM) connection per configured data
// source, iterating m_num_connections times over m_hostname_from /
// m_port_from, and appends each socket descriptor to m_socket.
// NOTE(review): the return statement and several lines (braces, retry loop,
// declarations of val, val1, len, sa, err_buf) are not visible in this excerpt.
133 int DeSerializerPrePCModule::Connect()
135 for (
int i = 0; i < m_num_connections; i++) {
139 struct sockaddr_in socPC;
140 socPC.sin_family = AF_INET;
// Resolve the i-th host name.
// NOTE(review): gethostbyname() is obsolescent; getaddrinfo() is the usual
// replacement.
142 struct hostent* host;
143 host = gethostbyname(m_hostname_from[ i ].c_str());
// Fatal error path for an unresolvable host name (the condition guarding it
// is not visible here).
// NOTE(review): unbounded sprintf() into err_buf; consider snprintf().
146 sprintf(err_buf,
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
148 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Only the first resolved address is used; it is copied as an unsigned int.
152 socPC.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
153 socPC.sin_port = htons(m_port_from[ i ]);
// NOTE(review): no check of socket()'s return value is visible in this excerpt.
154 int sd = socket(PF_INET, SOCK_STREAM, 0);
// Set TCP_NODELAY from val1 (its value is not visible here).
157 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1,
sizeof(val1));
159 printf(
"[DEBUG] Connecting to %s port %d ...\n", m_hostname_from[ i ].c_str(), m_port_from[ i ]);
// A failed connect() is reported with perror(); the message announces a
// retry, but the retry logic itself is not visible in this excerpt.
161 if (connect(sd, (
struct sockaddr*)(&socPC),
sizeof(socPC)) < 0) {
162 perror(
"Failed to connect. Retrying...");
165 printf(
"[DEBUG] Done\n");
// Keep the descriptor; m_socket[ i ] is used for all later calls.
169 m_socket.push_back(sd);
// Diagnostics: print SO_RCVBUF, SO_SNDBUF, TCP_MAXSEG and TCP_NODELAY.
// NOTE(review): &len is cast to socklen_t*; the declaration of len is not
// visible, and if it is not a socklen_t this cast is not portable. The
// getsockopt() return values are not checked in the visible lines.
174 getsockopt(m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
176 printf(
"[DEBUG] SO_RCVBUF %d\n", val);
178 getsockopt(m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
180 printf(
"[DEBUG] SO_SNDBUF %d\n", val);
182 getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
184 printf(
"[DEBUG] TCP_MAXSEG %d\n", val);
186 getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
188 printf(
"[DEBUG] TCP_NODELAY %d\n", val);
// When the status reporter is available, publish the port and address that
// getsockname() returns for this socket.
190 if (g_status.isAvailable()) {
192 memset(&sa, 0,
sizeof(sockaddr_in));
// sa_len is already a socklen_t, so the (socklen_t*) cast below is redundant.
193 socklen_t sa_len =
sizeof(sa);
194 if (getsockname(m_socket[i], (
struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
195 g_status.setInputPort(ntohs(sa.sin_port));
196 g_status.setInputAddress(sa.sin_addr.s_addr);
201 printf(
"[DEBUG] Initialization finished\n");
// recvData(): reads one send-block from every socket in m_socket.
// Visible phases: (1) read a SendHeader per socket and accumulate sizes,
// (2) obtain a buffer via getNewBuffer(), (3) read each socket's payload
// back-to-back into that buffer and validate its length, (4) read a
// SendTrailer per socket.
//   delete_flag              forwarded to getNewBuffer()
//   total_buf_nwords         out: sum over sockets of the payload size in words
//   num_events_in_sendblock  out: number of events taken from the header
//   num_nodes_in_sendblock   out: sum over sockets of the node counts
// NOTE(review): the return statement is not visible; presumably temp_buf is
// returned - confirm. Many interior lines are missing from this excerpt.
207 int* DeSerializerPrePCModule::recvData(
int* delete_flag,
int* total_buf_nwords,
int* num_events_in_sendblock,
208 int* num_nodes_in_sendblock)
211 int* temp_buf = NULL;
// Per-socket bookkeeping: payload words, node count and event count.
// The clear() calls act on freshly constructed (empty) vectors.
214 vector <int> each_buf_nwords;
215 each_buf_nwords.clear();
216 vector <int> each_buf_nodes;
217 each_buf_nodes.clear();
218 vector <int> each_buf_events;
219 each_buf_events.clear();
// Reset all output counters.
221 *total_buf_nwords = 0;
222 *num_nodes_in_sendblock = 0;
223 *num_events_in_sendblock = 0;
228 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
229 int temp_num_events = 0;
230 int temp_num_nodes = 0;
// Phase 1: read SENDHDR_NWORDS header words from each socket.
233 for (
int i = 0; i < (int)(m_socket.size()); i++) {
235 recvFD(m_socket[ i ], (
char*)send_hdr_buf,
sizeof(
int)*SendHeader::SENDHDR_NWORDS, flag);
241 temp_num_nodes = send_hdr.GetNumNodesinPacket();
246 *num_events_in_sendblock = temp_num_events;
// NOTE(review): the message below speaks of "events or nodes", but the
// visible condition compares only the event counts.
247 }
else if (*num_events_in_sendblock != temp_num_events) {
248 #ifndef NO_DATA_CHECK
251 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
252 *num_events_in_sendblock , temp_num_events, i, *num_nodes_in_sendblock , temp_num_nodes, i);
253 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
259 *num_nodes_in_sendblock += temp_num_nodes;
// Payload size = total words in the header minus header and trailer words.
261 int rawblk_nwords = send_hdr.GetTotalNwords()
262 - SendHeader::SENDHDR_NWORDS
263 - SendTrailer::SENDTRL_NWORDS;
264 *total_buf_nwords += rawblk_nwords;
// Sanity check: payload must be positive and at most 2.5e6 words.
// The message says "Too large event" although rawblk_nwords <= 0 also
// reaches this branch.
269 if (rawblk_nwords > (
int)(2.5e6) || rawblk_nwords <= 0) {
270 printf(
"[DEBUG] *******HDR**********\n");
271 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
273 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
274 send_hdr.GetTotalNwords());
275 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
281 each_buf_nwords.push_back(rawblk_nwords);
282 each_buf_events.push_back(temp_num_events);
283 each_buf_nodes.push_back(temp_num_nodes);
// Phase 2: get a buffer sized for the payloads of all sockets.
288 temp_buf = getNewBuffer(*total_buf_nwords, delete_flag);
// Phase 3: append each socket's payload at the current byte offset.
292 int total_recvd_byte = 0;
293 for (
int i = 0; i < (int)(m_socket.size()); i++) {
294 total_recvd_byte += recvFD(m_socket[ i ], (
char*)temp_buf + total_recvd_byte,
295 each_buf_nwords[ i ] *
sizeof(
int), flag);
// Walk the blocks just received from socket i: the word at the current
// offset is read as that block's length in words, and temp_length advances
// by that many bytes.
301 for (
int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
302 int this_length = *((
int*)((
char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] *
sizeof(int) + temp_length));
303 temp_length += this_length *
sizeof(int);
// The summed block lengths must equal this socket's payload size.
// NOTE(review): the check uses each_buf_nwords[ i ], but the message prints
// *total_buf_nwords (the sum over all sockets) as the header length; with
// more than one socket the reported number may be misleading.
305 if (temp_length != each_buf_nwords[ i ] *
sizeof(
int)) {
306 printf(
"[DEBUG]*******SENDHDR*********** \n");
307 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
308 printf(
"[DEBUG]*******BODY***********\n ");
309 printData(temp_buf, (
int)(total_recvd_byte /
sizeof(
int)));
311 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
312 (
int)(*total_buf_nwords *
sizeof(
int)), temp_length);
313 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Overall check: bytes received must match the total announced by the headers.
320 if ((
int)(*total_buf_nwords *
sizeof(
int)) != total_recvd_byte) {
322 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
323 total_recvd_byte, (
int)(*total_buf_nwords *
sizeof(
int)));
324 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Phase 4: read SENDTRL_NWORDS trailer words from each socket; every socket
// writes into the same send_trl_buf.
330 int send_trl_buf[(
unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
331 for (
int i = 0; i < (int)(m_socket.size()); i++) {
332 recvFD(m_socket[ i ], (
char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS *
sizeof(
int), flag);
// setRecvdBuffer(): receives one send-block via recvData() and attaches the
// resulting buffer to the given RawDataBlock.
//   temp_raw_datablk  block that receives the buffer through SetBuffer()
//   delete_flag       passed to recvData() and then, dereferenced, to SetBuffer()
// NOTE(review): some lines (braces, the declaration of num_entries and
// err_buf) are not visible in this excerpt.
339 void DeSerializerPrePCModule::setRecvdBuffer(
RawDataBlock* temp_raw_datablk,
int* delete_flag)
344 int total_buf_nwords = 0 ;
345 int num_events_in_sendblock = 0;
346 int num_nodes_in_sendblock = 0;
// Log before and after the receive while m_start_flag is still 0.
348 if (m_start_flag == 0) B2INFO(
"DeSerializerPrePC: Reading the 1st packet from eb0...");
349 int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
350 &num_nodes_in_sendblock);
351 if (m_start_flag == 0) {
352 B2INFO(
"DeSerializerPrePC: Done. the size of the 1st packet " << total_buf_nwords <<
" words");
// Running byte counter of received payload.
355 m_totbytes += total_buf_nwords *
sizeof(int);
357 temp_raw_datablk->
SetBuffer((
int*)temp_buf, total_buf_nwords, *delete_flag,
358 num_events_in_sendblock, num_nodes_in_sendblock);
// Consistency check: num_entries (its origin is not visible here) must equal
// events * nodes as announced by the send headers.
364 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
367 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
368 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
369 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// checkData(): validates the data blocks of a RawDataBlock. Each entry is
// handled by an FTSW, a TLU or a COPPER path; afterwards event number and
// time words of all nodes are compared against node 0.
//   raw_datablk   block whose entries are checked
//   eve_copper_0  out: set from word 3 of an entry's buffer (see below)
// NOTE(review): many lines are missing from this excerpt (e.g. the loop that
// defines k, the declarations of block_id, cpr_num, temp_rawftsw,
// temp_rawtlu, temp_rawcopper, and the try blocks matching the catch
// clauses), so branch nesting cannot be confirmed from here.
379 void DeSerializerPrePCModule::checkData(
RawDataBlock* raw_datablk,
unsigned int* eve_copper_0)
381 int data_size_copper_0 = -1;
382 int data_size_copper_1 = -1;
387 int* temp_buf = raw_datablk->
GetBuffer(0);
389 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
// Per-entry records, fixed at 32 slots and indexed by entry_id below.
// NOTE(review): no bound check of entry_id against 32 is visible in this
// excerpt - confirm entry_id cannot reach 32.
390 unsigned int eve_array[32];
391 unsigned int utime_array[32];
392 unsigned int ctime_type_array[32];
395 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
399 memset(eve_array, 0,
sizeof(eve_array));
400 memset(utime_array, 0,
sizeof(utime_array));
401 memset(ctime_type_array, 0,
sizeof(ctime_type_array));
403 int num_nodes_in_sendblock = raw_datablk->
GetNumNodes();
// entry_id combines node index l with k (defined outside the visible lines).
404 for (
int l = 0; l < num_nodes_in_sendblock; l++) {
405 int entry_id = l + k * num_nodes_in_sendblock;
// --- FTSW path: debug print while the event number is below 10.
415 if (temp_rawftsw->
GetEveNo(block_id) < 10) {
416 printf(
"[DEBUG] ######FTSW#########\n");
423 utime_ftsw = temp_rawftsw->
GetTTUtime(block_id);
427 #ifndef NO_DATA_CHECK
429 temp_rawftsw->
CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
430 eve_array[ entry_id ] = cur_evenum;
// NOTE(review): the exception is caught by value (copies the string);
// catching by const reference is the usual form.
431 }
catch (
string err_str) {
432 print_err.PrintError(m_shmflag, &g_status, err_str);
436 utime_array[ entry_id ] = temp_rawftsw->
GetTTUtime(block_id);
// --- TLU path, selected by CheckTLUID(entry_id).
443 }
else if (raw_datablk->
CheckTLUID(entry_id)) {
450 printf(
"[DEBUG] ######TLU#########\n");
454 #ifndef NO_DATA_CHECK
456 temp_rawtlu->
CheckData(0, m_prev_evenum, &cur_evenum);
457 eve_array[ entry_id ] = cur_evenum;
458 }
catch (
string err_str) {
459 print_err.PrintError(m_shmflag, &g_status, err_str);
// --- COPPER path: three header words of the COPPER block are overwritten
// with the values held in exp_run_ftsw / ctime_trgtype_ftsw / utime_ftsw
// (the condition guarding these writes is not visible here).
478 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
479 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
480 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
485 #ifndef NO_DATA_CHECK
488 temp_rawcopper->
CheckData(0, m_prev_evenum, &cur_evenum,
489 m_prev_copper_ctr, &cur_copper_ctr,
490 m_prev_exprunsubrun_no, &m_exprunsubrun_no);
491 eve_array[ entry_id ] = cur_evenum;
492 }
catch (
string err_str) {
493 print_err.PrintError(m_shmflag, &g_status, err_str);
// Index 0 is used here, whereas the FTSW path uses block_id.
498 utime_array[ entry_id ] = temp_rawcopper->
GetTTUtime(0);
// Word 3 of this entry's buffer is exported to the caller.
503 *eve_copper_0 = (raw_datablk->
GetBuffer(entry_id))[ 3 ];
504 }
else if (cpr_num == 1) {
// NOTE(review): manual delete; the matching new is not visible here.
508 delete temp_rawcopper;
// Cross-node check: every node l >= 1 must agree with node 0 on event
// number, utime and ctime/type word; on mismatch all nodes are printed
// and a fatal error is raised.
515 #ifndef NO_DATA_CHECK
517 for (
int l = 1; l < num_nodes_in_sendblock; l++) {
518 if (eve_array[ 0 ] != eve_array[ l ] ||
519 utime_array[ 0 ] != utime_array[ l ] ||
520 ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
522 for (
int m = 0; m < num_nodes_in_sendblock; m++) {
523 printf(
"[DEBUG] node %d eve # %d utime %x ctime %x\n",
524 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
526 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
527 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Remember the current values as "previous" for the next call.
543 m_prev_evenum = cur_evenum;
544 m_prev_copper_ctr = cur_copper_ctr;
545 m_prev_exprunsubrun_no = m_exprunsubrun_no;
// event(): per-call processing. The visible statements receive and check up
// to NUM_EVT_PER_BASF2LOOP_PC send-blocks, store each in raw_datablkarray,
// fill the event meta data, evaluate the stop conditions and update the
// status reporter.
// NOTE(review): many lines are missing from this excerpt (declarations of
// temp_rawdatablk, buf_to, temp_nwords_to, block_num, #else/#endif lines,
// braces), so preprocessor and branch nesting cannot be confirmed from here.
552 void DeSerializerPrePCModule::event()
556 unsigned int eve_copper_0 = 0;
// First-call handling while m_start_flag is still 0.
560 if (m_start_flag == 0) {
565 if (g_status.isAvailable()) {
566 B2INFO(
"DeSerializerPrePC: Waiting for Start...\n");
567 g_status.reportRunning();
569 m_start_time = getTimeSec();
579 for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
// Two ownership flags: one for the received buffer, one for the output buffer.
585 int delete_flag_from = 0, delete_flag_to = 0;
587 setRecvdBuffer(&temp_rawdatablk, &delete_flag_from);
589 checkData(&temp_rawdatablk, &eve_copper_0);
596 int temp_num_events, temp_num_nodes;
// With REDUCED_RAWCOPPER: compute the reduced size, get a buffer of that
// size, copy the reduced data and verify that both size results agree.
599 #ifdef REDUCED_RAWCOPPER
608 int calced_temp_nwords_to = m_pre_rawcpr.CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
609 buf_to = getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
612 m_pre_rawcpr.CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
613 if (calced_temp_nwords_to != temp_nwords_to) {
616 "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
617 calced_temp_nwords_to, temp_nwords_to);
// NOTE(review): this PrintError call omits the (m_shmflag, &g_status)
// arguments used at every other visible call site - confirm that such an
// overload exists and that the difference is intended.
618 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// The delete flag of the received buffer is moved to the output side and
// cleared on the input side (presumably the non-reduced path - confirm).
627 delete_flag_to = delete_flag_from;
628 delete_flag_from = 0;
// Store the output buffer in a new RawDataBlock of the DataStore array.
635 RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
639 raw_datablk->
SetBuffer(buf_to, temp_nwords_to, delete_flag_to,
640 temp_num_events, temp_num_nodes);
// With REDUCED_RAWCOPPER: run CheckCRC16 for finesse numbers 0..3 on the
// output buffer (attached with delete_flag 0).
646 #ifdef REDUCED_RAWCOPPER
648 post_rawcopper_latest.
SetBuffer(buf_to, temp_nwords_to,
649 0, temp_num_events, temp_num_nodes);
651 for (
int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
654 post_rawcopper_latest.
CheckCRC16(block_num, i_finesse_num);
// Event meta data: experiment and run are hard-coded to 1; the event number
// is the loop counter n_basf2evt.
666 m_eventMetaDataPtr.create();
667 m_eventMetaDataPtr->setExperiment(1);
668 m_eventMetaDataPtr->setRun(1);
669 m_eventMetaDataPtr->setEvent(n_basf2evt);
// Stop conditions: end of data is flagged once the processed event count
// reaches max_nevt (when max_nevt > 0) or the elapsed time exceeds
// max_seconds (when max_seconds > 0).
689 if (max_nevt >= 0 || max_seconds >= 0.) {
690 if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC >= max_nevt && max_nevt > 0)
691 || (getTimeSec() - m_start_time > max_seconds && max_seconds > 0.)) {
692 printf(
"[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
693 max_nevt , max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
694 m_eventMetaDataPtr->setEndOfData();
// Rate monitoring for the first 10 calls and then every 20000th call.
698 if (n_basf2evt % 20000 == 0 || n_basf2evt < 10) {
699 RateMonitor(eve_copper_0);
// Publish byte and event counters when the status reporter is available.
704 if (g_status.isAvailable()) {
705 g_status.setInputNBytes(m_totbytes);
706 g_status.setInputCount(n_basf2evt);
A class definition of an input module for Sequential ROOT I/O.
A class definition of an input module for Sequential ROOT I/O.
The Raw COPPER class ver.
int CheckCRC16(int n, int finesse_num)
check magic words
The Raw COPPER class. This class stores data received by COPPER via belle2link. Data from all detector...
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freed( = 0 )/ not freed( = 1 ) in Destructor )
The RawDataBlock class. Base class for rawdata handling.
virtual int GetBufferPos(int n)
get position of data block in word
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freed( = 0 )/ not freed( = 1 ) in Destructor )
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int CheckTLUID(int n)
get TLU ID to check whether this data block is TLU data or not
virtual int GetBlockNwords(int n)
get size of a data block
virtual int GetNumEvents()
get # of events in m_buffer
virtual int TotalBufNwords()
Get total length of m_buffer.
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
unsigned int GetExpRunSubrun(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
unsigned int GetEveNo(int n)
Get event #.
unsigned int GetTTUtime(int n)
get unixtime of the trigger
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freed( = 0 )/ not freed( = 1 ) in Destructor )
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check the data contents
The Raw TLU class. Class for data from DAQ PC for TLU (Trigger Logic Unit). It is supposed to be used on...
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
unsigned int GetEveNo(int n)
Get Event #.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
unsigned int GetTTCtimeTRGType(int n)
Check if COPPER Magic words are correct.
unsigned int GetTTUtime(int n)
Check if COPPER Magic words are correct.
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check data contents
int GetFINESSENwords(int n, int finesse_num) OVERRIDE_CPP17
Get the size of a finesse buffer.
Abstract base class for different kinds of events.