9#include <daq/rawdata/modules/DeSerializerPC.h>
10#include <daq/dataobjects/SendHeader.h>
11#include <daq/dataobjects/SendTrailer.h>
12#include <rawdata/dataobjects/RawTLU.h>
15#include <netinet/tcp.h>
22#define USE_DESERIALIZER_PREPC
40#ifndef REDUCED_RAWCOPPER
41#ifdef USE_DESERIALIZER_PREPC
56 B2INFO(
"DeSerializerPC: Constructor done.");
63DeSerializerPCModule::~DeSerializerPCModule()
71 B2INFO(
"DeSerializerPC: initialize() started.");
75 B2FATAL(
"[FATAL] Hostname or port# is not specified for all connections. Please check a python script. Exiting... \n");
83 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
90 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
95 m_eventMetaDataPtr.registerInDataStore();
97 raw_datablkarray.registerInDataStore();
98 rawcprarray.registerInDataStore();
99 raw_ftswarray.registerInDataStore();
109 memset(time_array0, 0,
sizeof(time_array0));
110 memset(time_array1, 0,
sizeof(time_array1));
111 memset(time_array2, 0,
sizeof(time_array2));
127 m_prev_copper_ctr = 0xFFFFFFFF;
128 m_prev_evenum = 0xFFFFFFFF;
134 B2INFO(
"DeSerializerPC: initialize() done.");
143 if ((read_size = recv(sock, (
char*)buf + n, data_size_byte - n, flag)) < 0) {
144 if (errno == EINTR) {
146 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
150 callCheckRunPause(err_str);
155 sprintf(err_buf,
"[WARNING] recv() returned error; ret = %d. : %s %s %d",
156 read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
160 string err_str =
"RUN_ERROR";
166 }
else if (read_size == 0) {
168 sprintf(err_buf,
"[WARNING] Connection is closed by peer(%s).: %s %s %d",
169 strerror(errno), __FILE__, __PRETTY_FUNCTION__, __LINE__);
173 string err_str =
"RUN_ERROR";
180 if (n == data_size_byte)
break;
193 struct sockaddr_in socPC;
194 socPC.sin_family = AF_INET;
196 struct hostent* host;
200 sprintf(err_buf,
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...",
m_hostname_from[ i ].c_str(),
206 socPC.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
208 int sd = socket(PF_INET, SOCK_STREAM, 0);
211 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1,
sizeof(val1));
213 struct timeval timeout;
216 setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)
sizeof(timeout));
220 if (connect(sd, (
struct sockaddr*)(&socPC),
sizeof(socPC)) < 0) {
221 perror(
"Failed to connect. Retrying...");
224 printf(
"[DEBUG] Done\n");
235 getsockopt(
m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
237 printf(
"[DEBUG] SO_RCVBUF %d\n", val);
239 getsockopt(
m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
241 printf(
"[DEBUG] SO_SNDBUF %d\n", val);
243 getsockopt(
m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
245 printf(
"[DEBUG] TCP_MAXSEG %d\n", val);
247 getsockopt(
m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
249 printf(
"[DEBUG] TCP_NODELAY %d\n", val);
253 memset(&sa, 0,
sizeof(sockaddr_in));
254 socklen_t sa_len =
sizeof(sa);
255 if (getsockname(
m_socket[i], (
struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
256 g_status.setInputPort(ntohs(sa.sin_port));
257 g_status.setInputAddress(sa.sin_addr.s_addr);
262 printf(
"[DEBUG] Initialization finished\n");
269 int* num_nodes_in_sendblock)
272 int* temp_buf = NULL;
275 vector <int> each_buf_nwords;
276 each_buf_nwords.clear();
277 vector <int> each_buf_nodes;
278 each_buf_nodes.clear();
279 vector <int> each_buf_events;
280 each_buf_events.clear();
282 *total_buf_nwords = 0;
283 *num_nodes_in_sendblock = 0;
284 *num_events_in_sendblock = 0;
289 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
292 for (
int i = 0; i < (int)(
m_socket.size()); i++) {
294 recvFD(
m_socket[ i ], (
char*)send_hdr_buf,
sizeof(
int)*SendHeader::SENDHDR_NWORDS, flag);
300 int temp_num_nodes = send_hdr.GetNumNodesinPacket();
305 *num_events_in_sendblock = temp_num_events;
306 }
else if (*num_events_in_sendblock != temp_num_events) {
309 printf(
"[DEBUG] *******HDR**********\n");
310 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
313 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
314 *num_events_in_sendblock, temp_num_events, i, *num_nodes_in_sendblock, temp_num_nodes, i);
321 *num_nodes_in_sendblock += temp_num_nodes;
323 int rawblk_nwords = send_hdr.GetTotalNwords()
324 - SendHeader::SENDHDR_NWORDS
325 - SendTrailer::SENDTRL_NWORDS;
326 *total_buf_nwords += rawblk_nwords;
331 if (rawblk_nwords > (
int)(2.5e6) || rawblk_nwords <= 0) {
332 printf(
"[DEBUG] *******HDR**********\n");
337 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
338 send_hdr.GetTotalNwords());
345 each_buf_nwords.push_back(rawblk_nwords);
346 each_buf_events.push_back(temp_num_events);
347 each_buf_nodes.push_back(temp_num_nodes);
352 temp_buf =
getNewBuffer(*total_buf_nwords, delete_flag);
356 int total_recvd_byte = 0;
357 for (
int i = 0; i < (int)(
m_socket.size()); i++) {
360 total_recvd_byte +=
recvFD(
m_socket[ i ], (
char*)temp_buf + total_recvd_byte,
361 each_buf_nwords[ i ] *
sizeof(
int), flag);
362 }
catch (
const string& err_str) {
364 B2WARNING(
"Delete buffer before going to Run-pause state");
374 for (
int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
375 int this_length = *((
int*)((
char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] *
sizeof(int) + temp_length));
376 temp_length += this_length *
sizeof(int);
378 if (temp_length != (
int)(each_buf_nwords[ i ] *
sizeof(int))) {
379 printf(
"[DEBUG]*******SENDHDR*********** \n");
380 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
381 printf(
"[DEBUG]*******BODY***********\n ");
382 printData(temp_buf, (
int)(total_recvd_byte /
sizeof(
int)));
384 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
385 (
int)(*total_buf_nwords *
sizeof(
int)), temp_length);
393 if ((
int)(*total_buf_nwords *
sizeof(
int)) != total_recvd_byte) {
395 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
396 total_recvd_byte, (
int)(*total_buf_nwords *
sizeof(
int)));
403 int send_trl_buf[(
unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
404 for (
int i = 0; i < (int)(
m_socket.size()); i++) {
406 recvFD(
m_socket[ i ], (
char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS *
sizeof(
int), flag);
407 }
catch (
const string& err_str) {
409 B2WARNING(
"Delete buffer before going to Run-pause state");
425 int total_buf_nwords = 0 ;
426 int num_events_in_sendblock = 0;
427 int num_nodes_in_sendblock = 0;
429 if (
m_start_flag == 0) B2INFO(
"DeSerializerPC: Reading the 1st packet from eb0...");
431 int* temp_buf =
recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
432 &num_nodes_in_sendblock);
434 B2INFO(
"DeSerializerPC: Done. the size of the 1st packet " << total_buf_nwords <<
" words");
437 m_totbytes += total_buf_nwords *
sizeof(int);
442 temp_raw_datablk->
SetBuffer((
int*)temp_buf, total_buf_nwords, 0,
444 num_events_in_sendblock, num_nodes_in_sendblock);
450 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
453 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
454 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
466 unsigned int* subrun_copper_0,
unsigned int* eve_copper_0,
unsigned int* error_bit_flag)
475 int* temp_buf = raw_datablk->
GetBuffer(0);
477 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
478 unsigned int eve_array[32];
479 unsigned int utime_array[32];
480 unsigned int ctime_type_array[32];
483 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
487 memset(eve_array, 0,
sizeof(eve_array));
488 memset(utime_array, 0,
sizeof(utime_array));
489 memset(ctime_type_array, 0,
sizeof(ctime_type_array));
491 int num_nodes_in_sendblock = raw_datablk->
GetNumNodes();
492 for (
int l = 0; l < num_nodes_in_sendblock; l++) {
493 int entry_id = l + k * num_nodes_in_sendblock;
502 if (temp_rawftsw->
GetEveNo(block_id) < 10) {
503 printf(
"[DEBUG] ######FTSW#########\n");
510 utime_ftsw = temp_rawftsw->
GetTTUtime(block_id);
517 eve_array[ entry_id ] = cur_evenum;
518 }
catch (
const string& err_str) {
523 utime_array[ entry_id ] = temp_rawftsw->
GetTTUtime(block_id);
530 }
else if (raw_datablk->
CheckTLUID(entry_id)) {
537 printf(
"[DEBUG] ######TLU#########\n");
543 temp_rawtlu->
CheckData(0, m_prev_evenum, &cur_evenum);
544 eve_array[ entry_id ] = cur_evenum;
545 }
catch (
const string& err_str) {
564 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
565 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
566 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
572 temp_rawcopper->
CheckData(0, m_prev_evenum, &cur_evenum,
573 m_prev_copper_ctr, &cur_copper_ctr,
575 eve_array[ entry_id ] = cur_evenum;
576 }
catch (
const string& err_str) {
583 utime_array[ entry_id ] = temp_rawcopper->
GetTTUtime(0);
589 *eve_copper_0 = temp_rawcopper->
GetEveNo(0);
590 *exp_copper_0 = temp_rawcopper->
GetExpNo(0);
591 *run_copper_0 = temp_rawcopper->
GetRunNo(0);
593 }
else if (cpr_num == 1) {
601 delete temp_rawcopper;
607 for (
int l = 1; l < num_nodes_in_sendblock; l++) {
608 if (eve_array[ 0 ] != eve_array[ l ]) {
613 for (
int m = 0; m < num_nodes_in_sendblock; m++) {
614 printf(
"[DEBUG] node %d eve # %u utime %x ctime %x\n",
615 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
617 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
627 printf(
"[DEBUG] ##############################################\n");
629 printf(
"[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
631 printf(
"[DEBUG] ##############################################\n");
634 m_prev_evenum = cur_evenum;
635 m_prev_copper_ctr = cur_copper_ctr;
642void DeSerializerPCModule::waitResume()
647 printf(
"###########(Ser) Waiting for Resume ###############\n");
651 if (checkRunRecovery()) {
670 if (error_flag & RawHeader_latest::B2LINK_PACKET_CRC_ERROR) {
674 if (error_flag & RawHeader_latest::B2LINK_EVENT_CRC_ERROR) {
678 if (error_set) B2INFO(
"Raw2Ds: Error flag was set in EventMetaData.");
686 unsigned int exp_copper_0 = 0;
687 unsigned int run_copper_0 = 0;
688 unsigned int subrun_copper_0 = 0;
689 unsigned int eve_copper_0 = 0;
690 unsigned int error_bit_flag = 0;
700 B2INFO(
"DeSerializerPC: Waiting for Start...\n");
709 if (g_run_pause > 0 || g_run_error > 0) {
710 if (g_run_pause == 0) {
712 if (checkRunPause())
break;
715 printf(
"###########(DeserializerPC) Waiting for Runpause() ###############\n");
723 m_eventMetaDataPtr.create();
735 for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
744 checkData(&temp_rawdatablk, &exp_copper_0, &run_copper_0, &subrun_copper_0, &eve_copper_0, &error_bit_flag);
745 }
catch (
const string& err_str) {
748 if (err_str ==
"RUN_PAUSE" || err_str ==
"RUN_ERROR") {
749 m_eventMetaDataPtr.create();
753 print_err.PrintError((
char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
757 RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
762 if (buf_rc != NULL) {
769 m_eventMetaDataPtr.create();
770 m_eventMetaDataPtr->setExperiment(exp_copper_0);
771 m_eventMetaDataPtr->setRun(run_copper_0);
772 m_eventMetaDataPtr->setSubrun(subrun_copper_0);
773 m_eventMetaDataPtr->setEvent(eve_copper_0);
775 setErrorFlag(error_bit_flag, m_eventMetaDataPtr);
776 if (error_bit_flag != 0) {
778 printf(
"[ERROR] error bit was detected. exp %u run %u eve %u count = %u\n",
779 exp_copper_0, run_copper_0, eve_copper_0, error_bit_flag);
801 printf(
"[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
803 m_eventMetaDataPtr->setEndOfData();
808 RateMonitor(eve_copper_0, subrun_copper_0, run_copper_0);
812 g_status.setInputNBytes(m_totbytes);
A class definition of an input module for Sequential ROOT I/O.
void RateMonitor(unsigned int nevt, int subrun=-1, int run=-1)
monitor rate
unsigned int m_prev_exprunsubrun_no
run no.
std::string m_nodename
Node name.
int m_start_flag
start flag
int BUF_SIZE_WORD
size of buffer for one event (word)
static RunInfoBuffer g_status
buffer class to communicate with NSM client
int n_basf2evt
No. of sent events.
CprErrorMessage print_err
wrapper for B2LOG system
virtual void printData(int *buf, int nwords)
dump error data
virtual int * getNewBuffer(int nwords, int *delete_flag)
Getbuffer.
std::string m_dump_fname
dump filename
int m_shmflag
Use shared memory.
int * m_bufary[NUM_PREALLOC_BUF]
buffer
unsigned int m_exprunsubrun_no
run no.
virtual void printASCIIData(int *buf, int nwords)
dump error data
virtual void openOutputFile()
Module functions to be called from event process.
double max_seconds
time to stop a run
int m_nodeid
Node(PC or COPPER) ID.
double getTimeSec()
store time info.
std::vector< int > m_socket
Reciever Socket.
virtual int * recvData(int *delete_flag, int *total_m_size_word, int *num_events_in_sendblock, int *num_nodes_in_sendblock)
receive data
void initialize() override
Module functions to be called from main process.
void event() override
Module functions to be called from event process.
std::vector< int > m_port_from
port # to connect data sources
virtual int Connect()
Accept connection.
virtual int recvFD(int fd, char *buf, int data_size_byte, int flag)
receive data
virtual void setRecvdBuffer(RawDataBlock *raw_datablk, int *delete_flag)
attach buffer to RawDataBlock
DeSerializerPCModule()
Constructor / Destructor.
virtual void checkData(RawDataBlock *raw_datablk, unsigned int *exp_copper_0, unsigned int *run_copper_0, unsigned int *subrun_copper_0, unsigned int *eve_copper_0, unsigned int *error_bit_flag)
check data contents
std::vector< std::string > m_hostname_from
hostname of upstream Data Sources
void setDescription(const std::string &description)
Sets the description of the module.
The Raw COPPER class This class stores data received by COPPER via belle2linkt Data from all detector...
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freeed( = 0 )/ not freeed( = 1 ) in Destructer )
The RawDataBlock class Base class for rawdata handling.
virtual void PrintData(int *buf, int nwords)
print data
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual int GetBufferPos(int n)
get position of data block in word
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freeed( = 0 )/ not freeed( = 1 ) in Destructer )
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int CheckTLUID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetBlockNwords(int n)
get size of a data block
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
virtual int GetNumEvents()
get # of events in m_buffer
virtual int TotalBufNwords()
Get total length of m_buffer.
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
unsigned int GetExpRunSubrun(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
unsigned int GetEveNo(int n)
Get event #.
unsigned int GetTTUtime(int n)
get unixtime of the trigger
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freeed( = 0 )/ not freeed( = 1 ) in Destructer )
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check the data contents
The Raw TLU class Class for data from DAQ PC for TLU(Trigger Logic Unit) It is supposed to be used on...
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
unsigned int GetEveNo(int n)
Get Event #.
Type-safe access to single objects in the data store.
void addParam(const std::string &name, T ¶mVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
unsigned int GetTTCtimeTRGType(int n)
Check if COPPER Magic words are correct.
int GetExpNo(int n)
get Experimental # from header
unsigned int GetEveNo(int n)
get subrun #(8bit)
unsigned int GetTTUtime(int n)
Check if COPPER Magic words are correct.
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check data contents
int GetSubRunNo(int n)
get run # (14bit)
unsigned int GetErrorBitFlag(int n)
get contents of header
int GetRunNo(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
Abstract base class for different kinds of events.