9 #include <daq/rawdata/modules/DeSerializerPC.h>
10 #include <daq/dataobjects/SendHeader.h>
11 #include <daq/dataobjects/SendTrailer.h>
12 #include <rawdata/dataobjects/RawTLU.h>
15 #include <netinet/tcp.h>
22 #define USE_DESERIALIZER_PREPC
40 #ifndef REDUCED_RAWCOPPER
41 #ifdef USE_DESERIALIZER_PREPC
56 B2INFO(
"DeSerializerPC: Constructor done.");
63 DeSerializerPCModule::~DeSerializerPCModule()
71 B2INFO(
"DeSerializerPC: initialize() started.");
75 B2FATAL(
"[FATAL] Hostname or port# is not specified for all connections. Please check a python script. Exiting... \n");
83 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
90 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
95 m_eventMetaDataPtr.registerInDataStore();
97 raw_datablkarray.registerInDataStore();
98 rawcprarray.registerInDataStore();
99 raw_ftswarray.registerInDataStore();
109 memset(time_array0, 0,
sizeof(time_array0));
110 memset(time_array1, 0,
sizeof(time_array1));
111 memset(time_array2, 0,
sizeof(time_array2));
127 m_prev_copper_ctr = 0xFFFFFFFF;
128 m_prev_evenum = 0xFFFFFFFF;
134 B2INFO(
"DeSerializerPC: initialize() done.");
143 if ((read_size = recv(sock, (
char*)buf + n, data_size_byte - n , flag)) < 0) {
144 if (errno == EINTR) {
146 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
150 callCheckRunPause(err_str);
155 sprintf(err_buf,
"[WARNING] recv() returned error; ret = %d. : %s %s %d",
156 read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
160 string err_str =
"RUN_ERROR";
166 }
else if (read_size == 0) {
168 sprintf(err_buf,
"[WARNING] Connection is closed by peer(%s).: %s %s %d",
169 strerror(errno), __FILE__, __PRETTY_FUNCTION__, __LINE__);
173 string err_str =
"RUN_ERROR";
180 if (n == data_size_byte)
break;
193 struct sockaddr_in socPC;
194 socPC.sin_family = AF_INET;
196 struct hostent* host;
200 sprintf(err_buf,
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...",
m_hostname_from[ i ].c_str(),
206 socPC.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
208 int sd = socket(PF_INET, SOCK_STREAM, 0);
211 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1,
sizeof(val1));
213 struct timeval timeout;
216 setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)
sizeof(timeout));
220 if (connect(sd, (
struct sockaddr*)(&socPC),
sizeof(socPC)) < 0) {
221 perror(
"Failed to connect. Retrying...");
224 printf(
"[DEBUG] Done\n");
235 getsockopt(
m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
237 printf(
"[DEBUG] SO_RCVBUF %d\n", val);
239 getsockopt(
m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
241 printf(
"[DEBUG] SO_SNDBUF %d\n", val);
243 getsockopt(
m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
245 printf(
"[DEBUG] TCP_MAXSEG %d\n", val);
247 getsockopt(
m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
249 printf(
"[DEBUG] TCP_NODELAY %d\n", val);
253 memset(&sa, 0,
sizeof(sockaddr_in));
254 socklen_t sa_len =
sizeof(sa);
255 if (getsockname(
m_socket[i], (
struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
256 g_status.setInputPort(ntohs(sa.sin_port));
257 g_status.setInputAddress(sa.sin_addr.s_addr);
262 printf(
"[DEBUG] Initialization finished\n");
269 int* num_nodes_in_sendblock)
272 int* temp_buf = NULL;
275 vector <int> each_buf_nwords;
276 each_buf_nwords.clear();
277 vector <int> each_buf_nodes;
278 each_buf_nodes.clear();
279 vector <int> each_buf_events;
280 each_buf_events.clear();
282 *total_buf_nwords = 0;
283 *num_nodes_in_sendblock = 0;
284 *num_events_in_sendblock = 0;
289 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
290 int temp_num_events = 0;
291 int temp_num_nodes = 0;
294 for (
int i = 0; i < (int)(
m_socket.size()); i++) {
296 recvFD(
m_socket[ i ], (
char*)send_hdr_buf,
sizeof(
int)*SendHeader::SENDHDR_NWORDS, flag);
302 temp_num_nodes = send_hdr.GetNumNodesinPacket();
307 *num_events_in_sendblock = temp_num_events;
308 }
else if (*num_events_in_sendblock != temp_num_events) {
310 #ifndef NO_DATA_CHECK
311 printf(
"[DEBUG] *******HDR**********\n");
312 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
315 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
316 *num_events_in_sendblock , temp_num_events, i, *num_nodes_in_sendblock , temp_num_nodes, i);
323 *num_nodes_in_sendblock += temp_num_nodes;
325 int rawblk_nwords = send_hdr.GetTotalNwords()
326 - SendHeader::SENDHDR_NWORDS
327 - SendTrailer::SENDTRL_NWORDS;
328 *total_buf_nwords += rawblk_nwords;
333 if (rawblk_nwords > (
int)(2.5e6) || rawblk_nwords <= 0) {
334 printf(
"[DEBUG] *******HDR**********\n");
339 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
340 send_hdr.GetTotalNwords());
347 each_buf_nwords.push_back(rawblk_nwords);
348 each_buf_events.push_back(temp_num_events);
349 each_buf_nodes.push_back(temp_num_nodes);
354 temp_buf =
getNewBuffer(*total_buf_nwords, delete_flag);
358 int total_recvd_byte = 0;
359 for (
int i = 0; i < (int)(
m_socket.size()); i++) {
362 total_recvd_byte +=
recvFD(
m_socket[ i ], (
char*)temp_buf + total_recvd_byte,
363 each_buf_nwords[ i ] *
sizeof(
int), flag);
364 }
catch (
string err_str) {
366 B2WARNING(
"Delete buffer before going to Run-pause state");
376 for (
int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
377 int this_length = *((
int*)((
char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] *
sizeof(int) + temp_length));
378 temp_length += this_length *
sizeof(int);
380 if (temp_length != (
int)(each_buf_nwords[ i ] *
sizeof(int))) {
381 printf(
"[DEBUG]*******SENDHDR*********** \n");
382 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
383 printf(
"[DEBUG]*******BODY***********\n ");
384 printData(temp_buf, (
int)(total_recvd_byte /
sizeof(
int)));
386 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
387 (
int)(*total_buf_nwords *
sizeof(
int)), temp_length);
395 if ((
int)(*total_buf_nwords *
sizeof(
int)) != total_recvd_byte) {
397 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
398 total_recvd_byte, (
int)(*total_buf_nwords *
sizeof(
int)));
405 int send_trl_buf[(
unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
406 for (
int i = 0; i < (int)(
m_socket.size()); i++) {
408 recvFD(
m_socket[ i ], (
char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS *
sizeof(
int), flag);
409 }
catch (
string err_str) {
411 B2WARNING(
"Delete buffer before going to Run-pause state");
427 int total_buf_nwords = 0 ;
428 int num_events_in_sendblock = 0;
429 int num_nodes_in_sendblock = 0;
431 if (
m_start_flag == 0) B2INFO(
"DeSerializerPC: Reading the 1st packet from eb0...");
433 int* temp_buf =
recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
434 &num_nodes_in_sendblock);
436 B2INFO(
"DeSerializerPC: Done. the size of the 1st packet " << total_buf_nwords <<
" words");
439 m_totbytes += total_buf_nwords *
sizeof(int);
444 temp_raw_datablk->
SetBuffer((
int*)temp_buf, total_buf_nwords, 0,
446 num_events_in_sendblock, num_nodes_in_sendblock);
452 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
455 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
456 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
468 unsigned int* subrun_copper_0,
unsigned int* eve_copper_0,
unsigned int* error_bit_flag)
477 int* temp_buf = raw_datablk->
GetBuffer(0);
479 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
480 unsigned int eve_array[32];
481 unsigned int utime_array[32];
482 unsigned int ctime_type_array[32];
485 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
489 memset(eve_array, 0,
sizeof(eve_array));
490 memset(utime_array, 0,
sizeof(utime_array));
491 memset(ctime_type_array, 0,
sizeof(ctime_type_array));
493 int num_nodes_in_sendblock = raw_datablk->
GetNumNodes();
494 for (
int l = 0; l < num_nodes_in_sendblock; l++) {
495 int entry_id = l + k * num_nodes_in_sendblock;
504 if (temp_rawftsw->
GetEveNo(block_id) < 10) {
505 printf(
"[DEBUG] ######FTSW#########\n");
512 utime_ftsw = temp_rawftsw->
GetTTUtime(block_id);
516 #ifndef NO_DATA_CHECK
519 eve_array[ entry_id ] = cur_evenum;
520 }
catch (
string err_str) {
525 utime_array[ entry_id ] = temp_rawftsw->
GetTTUtime(block_id);
532 }
else if (raw_datablk->
CheckTLUID(entry_id)) {
539 printf(
"[DEBUG] ######TLU#########\n");
543 #ifndef NO_DATA_CHECK
545 temp_rawtlu->
CheckData(0, m_prev_evenum, &cur_evenum);
546 eve_array[ entry_id ] = cur_evenum;
547 }
catch (
string err_str) {
566 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
567 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
568 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
571 #ifndef NO_DATA_CHECK
574 temp_rawcopper->
CheckData(0, m_prev_evenum, &cur_evenum,
575 m_prev_copper_ctr, &cur_copper_ctr,
577 eve_array[ entry_id ] = cur_evenum;
578 }
catch (
string err_str) {
585 utime_array[ entry_id ] = temp_rawcopper->
GetTTUtime(0);
591 *eve_copper_0 = temp_rawcopper->
GetEveNo(0);
592 *exp_copper_0 = temp_rawcopper->
GetExpNo(0);
593 *run_copper_0 = temp_rawcopper->
GetRunNo(0);
595 }
else if (cpr_num == 1) {
603 delete temp_rawcopper;
607 #ifndef NO_DATA_CHECK
609 for (
int l = 1; l < num_nodes_in_sendblock; l++) {
610 if (eve_array[ 0 ] != eve_array[ l ]) {
615 for (
int m = 0; m < num_nodes_in_sendblock; m++) {
616 printf(
"[DEBUG] node %d eve # %d utime %x ctime %x\n",
617 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
619 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
629 printf(
"[DEBUG] ##############################################\n");
631 printf(
"[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
633 printf(
"[DEBUG] ##############################################\n");
636 m_prev_evenum = cur_evenum;
637 m_prev_copper_ctr = cur_copper_ctr;
644 void DeSerializerPCModule::waitResume()
649 printf(
"###########(Ser) Waiting for Resume ###############\n");
653 if (checkRunRecovery()) {
672 if (error_flag & RawHeader_latest::B2LINK_PACKET_CRC_ERROR) {
676 if (error_flag & RawHeader_latest::B2LINK_EVENT_CRC_ERROR) {
680 if (error_set) B2INFO(
"Raw2Ds: Error flag was set in EventMetaData.");
688 unsigned int exp_copper_0 = 0;
689 unsigned int run_copper_0 = 0;
690 unsigned int subrun_copper_0 = 0;
691 unsigned int eve_copper_0 = 0;
692 unsigned int error_bit_flag = 0;
702 B2INFO(
"DeSerializerPC: Waiting for Start...\n");
711 if (g_run_pause > 0 || g_run_error > 0) {
712 if (g_run_pause == 0) {
714 if (checkRunPause())
break;
717 printf(
"###########(DeserializerPC) Waiting for Runpause() ###############\n");
725 m_eventMetaDataPtr.create();
737 for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
746 checkData(&temp_rawdatablk, &exp_copper_0, &run_copper_0, &subrun_copper_0, &eve_copper_0, &error_bit_flag);
747 }
catch (
string err_str) {
750 if (err_str ==
"RUN_PAUSE" || err_str ==
"RUN_ERROR") {
751 m_eventMetaDataPtr.create();
755 print_err.PrintError((
char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
759 RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
764 if (buf_rc != NULL) {
771 m_eventMetaDataPtr.create();
772 m_eventMetaDataPtr->setExperiment(exp_copper_0);
773 m_eventMetaDataPtr->setRun(run_copper_0);
774 m_eventMetaDataPtr->setSubrun(subrun_copper_0);
775 m_eventMetaDataPtr->setEvent(eve_copper_0);
777 setErrorFlag(error_bit_flag, m_eventMetaDataPtr);
778 if (error_bit_flag != 0) {
780 printf(
"[ERROR] error bit was detected. exp %d run %d eve %d count = %d\n",
781 exp_copper_0, run_copper_0, eve_copper_0, error_bit_flag);
803 printf(
"[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
805 m_eventMetaDataPtr->setEndOfData();
810 RateMonitor(eve_copper_0, subrun_copper_0, run_copper_0);
814 g_status.setInputNBytes(m_totbytes);
A class definition of an input module for Sequential ROOT I/O.
void RateMonitor(unsigned int nevt, int subrun=-1, int run=-1)
monitor rate
unsigned int m_prev_exprunsubrun_no
run no.
std::string m_nodename
Node name.
int m_start_flag
start flag
int BUF_SIZE_WORD
size of buffer for one event (word)
static RunInfoBuffer g_status
buffer class to communicate with NSM client
int n_basf2evt
No. of sent events.
CprErrorMessage print_err
wrapper for B2LOG system
virtual void printData(int *buf, int nwords)
dump error data
virtual int * getNewBuffer(int nwords, int *delete_flag)
Get buffer.
std::string m_dump_fname
dump filename
int m_shmflag
Use shared memory.
int * m_bufary[NUM_PREALLOC_BUF]
buffer
unsigned int m_exprunsubrun_no
run no.
virtual void printASCIIData(int *buf, int nwords)
dump error data
virtual void openOutputFile()
Module functions to be called from event process.
double max_seconds
time to stop a run
int m_nodeid
Node(PC or COPPER) ID.
double getTimeSec()
store time info.
std::vector< int > m_socket
Receiver socket.
virtual void event()
Module functions to be called from event process.
virtual int * recvData(int *delete_flag, int *total_m_size_word, int *num_events_in_sendblock, int *num_nodes_in_sendblock)
receive data
virtual void initialize()
Module functions to be called from main process.
std::vector< int > m_port_from
port # to connect data sources
virtual int Connect()
Establish connections to the upstream data sources.
virtual int recvFD(int fd, char *buf, int data_size_byte, int flag)
receive data
virtual void setRecvdBuffer(RawDataBlock *raw_datablk, int *delete_flag)
attach buffer to RawDataBlock
virtual void checkData(RawDataBlock *raw_datablk, unsigned int *exp_copper_0, unsigned int *run_copper_0, unsigned int *subrun_copper_0, unsigned int *eve_copper_0, unsigned int *error_bit_flag)
check data contents
std::vector< std::string > m_hostname_from
hostname of upstream Data Sources
void setDescription(const std::string &description)
Sets the description of the module.
The Raw COPPER class. This class stores data received by COPPER via belle2link. Data from all detector...
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor)
The RawDataBlock class. Base class for rawdata handling.
virtual void PrintData(int *buf, int nwords)
print data
virtual int GetBufferPos(int n)
get position of data block in word
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor)
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int CheckTLUID(int n)
get TLU ID to check whether this data block is TLU data or not
virtual int GetBlockNwords(int n)
get size of a data block
virtual int GetNumEvents()
get # of events in m_buffer
virtual int TotalBufNwords()
Get total length of m_buffer.
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
unsigned int GetExpRunSubrun(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
unsigned int GetEveNo(int n)
Get event #.
unsigned int GetTTUtime(int n)
get unixtime of the trigger
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor)
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check the data contents
The Raw TLU class. Class for data from the DAQ PC for the TLU (Trigger Logic Unit). It is supposed to be used on...
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
unsigned int GetEveNo(int n)
Get Event #.
Type-safe access to single objects in the data store.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
int GetExpNo(int n)
get Experimental # from header
unsigned int GetEveNo(int n)
Get event #.
unsigned int GetTTUtime(int n)
get unixtime of the trigger
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check data contents
int GetSubRunNo(int n)
get subrun # (8bit)
unsigned int GetErrorBitFlag(int n)
get contents of header
int GetRunNo(int n)
get run # (14bit)
Abstract base class for different kinds of events.