9 #include <daq/rawdata/modules/DeSerializerPrePC.h>
10 #include <daq/dataobjects/SendHeader.h>
11 #include <daq/dataobjects/SendTrailer.h>
12 #include <rawdata/dataobjects/RawTLU.h>
14 #include <netinet/tcp.h>
38 setDescription(
"Encode DataStore into RingBuffer");
40 addParam(
"NumConn", m_num_connections,
"Number of Connections", 0);
41 addParam(
"HostNameFrom", m_hostname_from,
"Hostnames of data sources");
42 addParam(
"PortFrom", m_port_from,
"port numbers of data sources");
44 B2INFO(
"DeSerializerPrePC: Constructor done.");
49 DeSerializerPrePCModule::~DeSerializerPrePCModule()
55 void DeSerializerPrePCModule::initialize()
57 B2INFO(
"DeSerializerPrePC: initialize() started.");
60 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
61 m_bufary[i] =
new int[ BUF_SIZE_WORD ];
63 m_buffer =
new int[ BUF_SIZE_WORD ];
67 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
68 memset(m_bufary[i], 0, BUF_SIZE_WORD *
sizeof(
int));
72 m_eventMetaDataPtr.registerInDataStore();
74 raw_datablkarray.registerInDataStore();
75 rawcprarray.registerInDataStore();
76 raw_ftswarray.registerInDataStore();
80 if (m_dump_fname.size() > 0) {
85 memset(time_array0, 0,
sizeof(time_array0));
86 memset(time_array1, 0,
sizeof(time_array1));
87 memset(time_array2, 0,
sizeof(time_array2));
93 if (m_nodename.size() == 0 || m_nodeid < 0) {
96 g_status.open(m_nodename, m_nodeid);
97 g_status.reportReady();
103 m_prev_copper_ctr = 0xFFFFFFFF;
104 m_prev_evenum = 0xFFFFFFFF;
106 B2INFO(
"DeSerializerPrePC: initialize() done.");
110 int DeSerializerPrePCModule::recvFD(
int sock,
char* buf,
int data_size_byte,
int flag)
115 if ((read_size = recv(sock, (
char*)buf + n, data_size_byte - n, flag)) < 0) {
116 if (errno == EINTR) {
120 sprintf(err_buf,
"[FATAL] Failed to receive data(%s). Exiting...", strerror(errno));
121 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
127 if (n == data_size_byte)
break;
133 int DeSerializerPrePCModule::Connect()
135 for (
int i = 0; i < m_num_connections; i++) {
139 struct sockaddr_in socPC;
140 socPC.sin_family = AF_INET;
142 struct hostent* host;
143 host = gethostbyname(m_hostname_from[ i ].c_str());
146 sprintf(err_buf,
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
148 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
152 socPC.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
153 socPC.sin_port = htons(m_port_from[ i ]);
154 int sd = socket(PF_INET, SOCK_STREAM, 0);
157 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1,
sizeof(val1));
159 printf(
"[DEBUG] Connecting to %s port %d ...\n", m_hostname_from[ i ].c_str(), m_port_from[ i ]);
161 if (connect(sd, (
struct sockaddr*)(&socPC),
sizeof(socPC)) < 0) {
162 perror(
"Failed to connect. Retrying...");
165 printf(
"[DEBUG] Done\n");
169 m_socket.push_back(sd);
174 getsockopt(m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
176 printf(
"[DEBUG] SO_RCVBUF %d\n", val);
178 getsockopt(m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
180 printf(
"[DEBUG] SO_SNDBUF %d\n", val);
182 getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
184 printf(
"[DEBUG] TCP_MAXSEG %d\n", val);
186 getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
188 printf(
"[DEBUG] TCP_NODELAY %d\n", val);
190 if (g_status.isAvailable()) {
192 memset(&sa, 0,
sizeof(sockaddr_in));
193 socklen_t sa_len =
sizeof(sa);
194 if (getsockname(m_socket[i], (
struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
195 g_status.setInputPort(ntohs(sa.sin_port));
196 g_status.setInputAddress(sa.sin_addr.s_addr);
201 printf(
"[DEBUG] Initialization finished\n");
207 int* DeSerializerPrePCModule::recvData(
int* delete_flag,
int* total_buf_nwords,
int* num_events_in_sendblock,
208 int* num_nodes_in_sendblock)
211 int* temp_buf = NULL;
214 vector <int> each_buf_nwords;
215 each_buf_nwords.clear();
216 vector <int> each_buf_nodes;
217 each_buf_nodes.clear();
218 vector <int> each_buf_events;
219 each_buf_events.clear();
221 *total_buf_nwords = 0;
222 *num_nodes_in_sendblock = 0;
223 *num_events_in_sendblock = 0;
228 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
231 for (
int i = 0; i < (int)(m_socket.size()); i++) {
233 recvFD(m_socket[ i ], (
char*)send_hdr_buf,
sizeof(
int)*SendHeader::SENDHDR_NWORDS, flag);
239 int temp_num_nodes = send_hdr.GetNumNodesinPacket();
244 *num_events_in_sendblock = temp_num_events;
245 }
else if (*num_events_in_sendblock != temp_num_events) {
246 #ifndef NO_DATA_CHECK
249 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
250 *num_events_in_sendblock, temp_num_events, i, *num_nodes_in_sendblock, temp_num_nodes, i);
251 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
257 *num_nodes_in_sendblock += temp_num_nodes;
259 int rawblk_nwords = send_hdr.GetTotalNwords()
260 - SendHeader::SENDHDR_NWORDS
261 - SendTrailer::SENDTRL_NWORDS;
262 *total_buf_nwords += rawblk_nwords;
267 if (rawblk_nwords > (
int)(2.5e6) || rawblk_nwords <= 0) {
268 printf(
"[DEBUG] *******HDR**********\n");
269 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
271 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
272 send_hdr.GetTotalNwords());
273 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
279 each_buf_nwords.push_back(rawblk_nwords);
280 each_buf_events.push_back(temp_num_events);
281 each_buf_nodes.push_back(temp_num_nodes);
286 temp_buf = getNewBuffer(*total_buf_nwords, delete_flag);
290 int total_recvd_byte = 0;
291 for (
int i = 0; i < (int)(m_socket.size()); i++) {
292 total_recvd_byte += recvFD(m_socket[ i ], (
char*)temp_buf + total_recvd_byte,
293 each_buf_nwords[ i ] *
sizeof(
int), flag);
298 unsigned temp_length = 0;
299 for (
int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
300 int this_length = *((
int*)((
char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] *
sizeof(int) + temp_length));
301 temp_length += this_length *
sizeof(int);
303 if (temp_length != each_buf_nwords[ i ] *
sizeof(
int)) {
304 printf(
"[DEBUG]*******SENDHDR*********** \n");
305 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
306 printf(
"[DEBUG]*******BODY***********\n ");
307 printData(temp_buf, (
int)(total_recvd_byte /
sizeof(
int)));
309 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
310 (
int)(*total_buf_nwords *
sizeof(
int)), temp_length);
311 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
318 if ((
int)(*total_buf_nwords *
sizeof(
int)) != total_recvd_byte) {
320 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
321 total_recvd_byte, (
int)(*total_buf_nwords *
sizeof(
int)));
322 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
328 int send_trl_buf[(
unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
329 for (
int i = 0; i < (int)(m_socket.size()); i++) {
330 recvFD(m_socket[ i ], (
char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS *
sizeof(
int), flag);
337 void DeSerializerPrePCModule::setRecvdBuffer(
RawDataBlock* temp_raw_datablk,
int* delete_flag)
342 int total_buf_nwords = 0 ;
343 int num_events_in_sendblock = 0;
344 int num_nodes_in_sendblock = 0;
346 if (m_start_flag == 0) B2INFO(
"DeSerializerPrePC: Reading the 1st packet from eb0...");
347 int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
348 &num_nodes_in_sendblock);
349 if (m_start_flag == 0) {
350 B2INFO(
"DeSerializerPrePC: Done. the size of the 1st packet " << total_buf_nwords <<
" words");
353 m_totbytes += total_buf_nwords *
sizeof(int);
355 temp_raw_datablk->
SetBuffer((
int*)temp_buf, total_buf_nwords, *delete_flag,
356 num_events_in_sendblock, num_nodes_in_sendblock);
362 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
365 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
366 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
367 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
377 void DeSerializerPrePCModule::checkData(
RawDataBlock* raw_datablk,
unsigned int* eve_copper_0)
382 int* temp_buf = raw_datablk->
GetBuffer(0);
384 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
385 unsigned int eve_array[32];
386 unsigned int utime_array[32];
387 unsigned int ctime_type_array[32];
390 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
394 memset(eve_array, 0,
sizeof(eve_array));
395 memset(utime_array, 0,
sizeof(utime_array));
396 memset(ctime_type_array, 0,
sizeof(ctime_type_array));
398 int num_nodes_in_sendblock = raw_datablk->
GetNumNodes();
399 for (
int l = 0; l < num_nodes_in_sendblock; l++) {
400 int entry_id = l + k * num_nodes_in_sendblock;
410 if (temp_rawftsw->
GetEveNo(block_id) < 10) {
411 printf(
"[DEBUG] ######FTSW#########\n");
418 utime_ftsw = temp_rawftsw->
GetTTUtime(block_id);
422 #ifndef NO_DATA_CHECK
424 temp_rawftsw->
CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
425 eve_array[ entry_id ] = cur_evenum;
426 }
catch (
string err_str) {
427 print_err.PrintError(m_shmflag, &g_status, err_str);
431 utime_array[ entry_id ] = temp_rawftsw->
GetTTUtime(block_id);
438 }
else if (raw_datablk->
CheckTLUID(entry_id)) {
445 printf(
"[DEBUG] ######TLU#########\n");
449 #ifndef NO_DATA_CHECK
451 temp_rawtlu->
CheckData(0, m_prev_evenum, &cur_evenum);
452 eve_array[ entry_id ] = cur_evenum;
453 }
catch (
string err_str) {
454 print_err.PrintError(m_shmflag, &g_status, err_str);
473 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
474 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
475 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
480 #ifndef NO_DATA_CHECK
483 temp_rawcopper->
CheckData(0, m_prev_evenum, &cur_evenum,
484 m_prev_copper_ctr, &cur_copper_ctr,
485 m_prev_exprunsubrun_no, &m_exprunsubrun_no);
486 eve_array[ entry_id ] = cur_evenum;
487 }
catch (
string err_str) {
488 print_err.PrintError(m_shmflag, &g_status, err_str);
493 utime_array[ entry_id ] = temp_rawcopper->
GetTTUtime(0);
498 *eve_copper_0 = (raw_datablk->
GetBuffer(entry_id))[ 3 ];
499 }
else if (cpr_num == 1) {
503 delete temp_rawcopper;
510 #ifndef NO_DATA_CHECK
512 for (
int l = 1; l < num_nodes_in_sendblock; l++) {
513 if (eve_array[ 0 ] != eve_array[ l ] ||
514 utime_array[ 0 ] != utime_array[ l ] ||
515 ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
517 for (
int m = 0; m < num_nodes_in_sendblock; m++) {
518 printf(
"[DEBUG] node %d eve # %d utime %x ctime %x\n",
519 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
521 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
522 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
538 m_prev_evenum = cur_evenum;
539 m_prev_copper_ctr = cur_copper_ctr;
540 m_prev_exprunsubrun_no = m_exprunsubrun_no;
547 void DeSerializerPrePCModule::event()
551 unsigned int eve_copper_0 = 0;
555 if (m_start_flag == 0) {
560 if (g_status.isAvailable()) {
561 B2INFO(
"DeSerializerPrePC: Waiting for Start...\n");
562 g_status.reportRunning();
564 m_start_time = getTimeSec();
574 for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
580 int delete_flag_from = 0, delete_flag_to = 0;
582 setRecvdBuffer(&temp_rawdatablk, &delete_flag_from);
584 checkData(&temp_rawdatablk, &eve_copper_0);
591 int temp_num_events, temp_num_nodes;
594 #ifdef REDUCED_RAWCOPPER
603 int calced_temp_nwords_to = m_pre_rawcpr.CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
604 buf_to = getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
607 m_pre_rawcpr.CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
608 if (calced_temp_nwords_to != temp_nwords_to) {
611 "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
612 calced_temp_nwords_to, temp_nwords_to);
613 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
622 delete_flag_to = delete_flag_from;
623 delete_flag_from = 0;
630 RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
634 raw_datablk->
SetBuffer(buf_to, temp_nwords_to, delete_flag_to,
635 temp_num_events, temp_num_nodes);
641 #ifdef REDUCED_RAWCOPPER
643 post_rawcopper_latest.
SetBuffer(buf_to, temp_nwords_to,
644 0, temp_num_events, temp_num_nodes);
646 for (
int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
649 post_rawcopper_latest.
CheckCRC16(block_num, i_finesse_num);
661 m_eventMetaDataPtr.create();
662 m_eventMetaDataPtr->setExperiment(1);
663 m_eventMetaDataPtr->setRun(1);
664 m_eventMetaDataPtr->setEvent(n_basf2evt);
684 if (max_nevt >= 0 || max_seconds >= 0.) {
685 if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC >= max_nevt && max_nevt > 0)
686 || (getTimeSec() - m_start_time > max_seconds && max_seconds > 0.)) {
687 printf(
"[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
688 max_nevt, max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
689 m_eventMetaDataPtr->setEndOfData();
693 if (n_basf2evt % 20000 == 0 || n_basf2evt < 10) {
694 RateMonitor(eve_copper_0);
699 if (g_status.isAvailable()) {
700 g_status.setInputNBytes(m_totbytes);
701 g_status.setInputCount(n_basf2evt);
A class definition of an input module for Sequential ROOT I/O.
A class definition of an input module for Sequential ROOT I/O.
The Raw COPPER class ver.
int CheckCRC16(int n, int finesse_num)
check magic words
The Raw COPPER class This class stores data received by COPPER via belle2linkt Data from all detector...
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freeed( = 0 )/ not freeed( = 1 ) in Destructer )
The RawDataBlock class Base class for rawdata handling.
virtual int GetBufferPos(int n)
get position of data block in word
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freeed( = 0 )/ not freeed( = 1 ) in Destructer )
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int CheckTLUID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetBlockNwords(int n)
get size of a data block
virtual int GetNumEvents()
get # of events in m_buffer
virtual int TotalBufNwords()
Get total length of m_buffer.
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
unsigned int GetExpRunSubrun(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
unsigned int GetEveNo(int n)
Get event #.
unsigned int GetTTUtime(int n)
get unixtime of the trigger
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freeed( = 0 )/ not freeed( = 1 ) in Destructer )
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check the data contents
The Raw TLU class Class for data from DAQ PC for TLU(Trigger Logic Unit) It is supposed to be used on...
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
unsigned int GetEveNo(int n)
Get Event #.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
unsigned int GetTTCtimeTRGType(int n)
Check if COPPER Magic words are correct.
unsigned int GetTTUtime(int n)
Check if COPPER Magic words are correct.
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check data contents
int GetFINESSENwords(int n, int finesse_num) OVERRIDE_CPP17
Get the size of a finesse buffer.
Abstract base class for different kinds of events.