9#include <daq/rawdata/modules/DeSerializerPrePC.h>
10#include <daq/dataobjects/SendHeader.h>
11#include <daq/dataobjects/SendTrailer.h>
12#include <rawdata/dataobjects/RawTLU.h>
14#include <netinet/tcp.h>
44 B2INFO(
"DeSerializerPrePC: Constructor done.");
49DeSerializerPrePCModule::~DeSerializerPrePCModule()
57 B2INFO(
"DeSerializerPrePC: initialize() started.");
60 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
67 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
72 m_eventMetaDataPtr.registerInDataStore();
74 raw_datablkarray.registerInDataStore();
75 rawcprarray.registerInDataStore();
76 raw_ftswarray.registerInDataStore();
85 memset(time_array0, 0,
sizeof(time_array0));
86 memset(time_array1, 0,
sizeof(time_array1));
87 memset(time_array2, 0,
sizeof(time_array2));
103 m_prev_copper_ctr = 0xFFFFFFFF;
104 m_prev_evenum = 0xFFFFFFFF;
106 B2INFO(
"DeSerializerPrePC: initialize() done.");
115 if ((read_size = recv(sock, (
char*)buf + n, data_size_byte - n, flag)) < 0) {
116 if (errno == EINTR) {
120 sprintf(err_buf,
"[FATAL] Failed to receive data(%s). Exiting...", strerror(errno));
127 if (n == data_size_byte)
break;
139 struct sockaddr_in socPC;
140 socPC.sin_family = AF_INET;
142 struct hostent* host;
146 sprintf(err_buf,
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...",
m_hostname_from[ i ].c_str(),
152 socPC.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
154 int sd = socket(PF_INET, SOCK_STREAM, 0);
157 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1,
sizeof(val1));
161 if (connect(sd, (
struct sockaddr*)(&socPC),
sizeof(socPC)) < 0) {
162 perror(
"Failed to connect. Retrying...");
165 printf(
"[DEBUG] Done\n");
174 getsockopt(
m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
176 printf(
"[DEBUG] SO_RCVBUF %d\n", val);
178 getsockopt(
m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
180 printf(
"[DEBUG] SO_SNDBUF %d\n", val);
182 getsockopt(
m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
184 printf(
"[DEBUG] TCP_MAXSEG %d\n", val);
186 getsockopt(
m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
188 printf(
"[DEBUG] TCP_NODELAY %d\n", val);
192 memset(&sa, 0,
sizeof(sockaddr_in));
193 socklen_t sa_len =
sizeof(sa);
194 if (getsockname(
m_socket[i], (
struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
195 g_status.setInputPort(ntohs(sa.sin_port));
196 g_status.setInputAddress(sa.sin_addr.s_addr);
201 printf(
"[DEBUG] Initialization finished\n");
208 int* num_nodes_in_sendblock)
211 int* temp_buf = NULL;
214 vector <int> each_buf_nwords;
215 each_buf_nwords.clear();
216 vector <int> each_buf_nodes;
217 each_buf_nodes.clear();
218 vector <int> each_buf_events;
219 each_buf_events.clear();
221 *total_buf_nwords = 0;
222 *num_nodes_in_sendblock = 0;
223 *num_events_in_sendblock = 0;
228 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
231 for (
int i = 0; i < (int)(
m_socket.size()); i++) {
233 recvFD(
m_socket[ i ], (
char*)send_hdr_buf,
sizeof(
int)*SendHeader::SENDHDR_NWORDS, flag);
239 int temp_num_nodes = send_hdr.GetNumNodesinPacket();
244 *num_events_in_sendblock = temp_num_events;
245 }
else if (*num_events_in_sendblock != temp_num_events) {
249 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
250 *num_events_in_sendblock, temp_num_events, i, *num_nodes_in_sendblock, temp_num_nodes, i);
257 *num_nodes_in_sendblock += temp_num_nodes;
259 int rawblk_nwords = send_hdr.GetTotalNwords()
260 - SendHeader::SENDHDR_NWORDS
261 - SendTrailer::SENDTRL_NWORDS;
262 *total_buf_nwords += rawblk_nwords;
267 if (rawblk_nwords > (
int)(2.5e6) || rawblk_nwords <= 0) {
268 printf(
"[DEBUG] *******HDR**********\n");
269 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
271 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
272 send_hdr.GetTotalNwords());
279 each_buf_nwords.push_back(rawblk_nwords);
280 each_buf_events.push_back(temp_num_events);
281 each_buf_nodes.push_back(temp_num_nodes);
286 temp_buf =
getNewBuffer(*total_buf_nwords, delete_flag);
290 int total_recvd_byte = 0;
291 for (
int i = 0; i < (int)(
m_socket.size()); i++) {
292 total_recvd_byte +=
recvFD(
m_socket[ i ], (
char*)temp_buf + total_recvd_byte,
293 each_buf_nwords[ i ] *
sizeof(
int), flag);
298 unsigned temp_length = 0;
299 for (
int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
300 int this_length = *((
int*)((
char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] *
sizeof(int) + temp_length));
301 temp_length += this_length *
sizeof(int);
303 if (temp_length != each_buf_nwords[ i ] *
sizeof(
int)) {
304 printf(
"[DEBUG]*******SENDHDR*********** \n");
305 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
306 printf(
"[DEBUG]*******BODY***********\n ");
307 printData(temp_buf, (
int)(total_recvd_byte /
sizeof(
int)));
309 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %u. Exting...",
310 (
int)(*total_buf_nwords *
sizeof(
int)), temp_length);
318 if ((
int)(*total_buf_nwords *
sizeof(
int)) != total_recvd_byte) {
320 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
321 total_recvd_byte, (
int)(*total_buf_nwords *
sizeof(
int)));
328 int send_trl_buf[(
unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
329 for (
int i = 0; i < (int)(
m_socket.size()); i++) {
330 recvFD(
m_socket[ i ], (
char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS *
sizeof(
int), flag);
342 int total_buf_nwords = 0 ;
343 int num_events_in_sendblock = 0;
344 int num_nodes_in_sendblock = 0;
346 if (
m_start_flag == 0) B2INFO(
"DeSerializerPrePC: Reading the 1st packet from eb0...");
347 int* temp_buf =
recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
348 &num_nodes_in_sendblock);
350 B2INFO(
"DeSerializerPrePC: Done. the size of the 1st packet " << total_buf_nwords <<
" words");
353 m_totbytes += total_buf_nwords *
sizeof(int);
355 temp_raw_datablk->
SetBuffer((
int*)temp_buf, total_buf_nwords, *delete_flag,
356 num_events_in_sendblock, num_nodes_in_sendblock);
362 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
365 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
366 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
382 int* temp_buf = raw_datablk->
GetBuffer(0);
384 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
385 unsigned int eve_array[32];
386 unsigned int utime_array[32];
387 unsigned int ctime_type_array[32];
390 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
394 memset(eve_array, 0,
sizeof(eve_array));
395 memset(utime_array, 0,
sizeof(utime_array));
396 memset(ctime_type_array, 0,
sizeof(ctime_type_array));
398 int num_nodes_in_sendblock = raw_datablk->
GetNumNodes();
399 for (
int l = 0; l < num_nodes_in_sendblock; l++) {
400 int entry_id = l + k * num_nodes_in_sendblock;
410 if (temp_rawftsw->
GetEveNo(block_id) < 10) {
411 printf(
"[DEBUG] ######FTSW#########\n");
418 utime_ftsw = temp_rawftsw->
GetTTUtime(block_id);
425 eve_array[ entry_id ] = cur_evenum;
426 }
catch (
const string& err_str) {
431 utime_array[ entry_id ] = temp_rawftsw->
GetTTUtime(block_id);
438 }
else if (raw_datablk->
CheckTLUID(entry_id)) {
445 printf(
"[DEBUG] ######TLU#########\n");
451 temp_rawtlu->
CheckData(0, m_prev_evenum, &cur_evenum);
452 eve_array[ entry_id ] = cur_evenum;
453 }
catch (
const string& err_str) {
473 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
474 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
475 (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
483 temp_rawcopper->
CheckData(0, m_prev_evenum, &cur_evenum,
484 m_prev_copper_ctr, &cur_copper_ctr,
486 eve_array[ entry_id ] = cur_evenum;
487 }
catch (
const string& err_str) {
493 utime_array[ entry_id ] = temp_rawcopper->
GetTTUtime(0);
498 *eve_copper_0 = (raw_datablk->
GetBuffer(entry_id))[ 3 ];
499 }
else if (cpr_num == 1) {
503 delete temp_rawcopper;
512 for (
int l = 1; l < num_nodes_in_sendblock; l++) {
513 if (eve_array[ 0 ] != eve_array[ l ] ||
514 utime_array[ 0 ] != utime_array[ l ] ||
515 ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
517 for (
int m = 0; m < num_nodes_in_sendblock; m++) {
518 printf(
"[DEBUG] node %d eve # %u utime %x ctime %x\n",
519 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
521 sprintf(err_buf,
"[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
538 m_prev_evenum = cur_evenum;
539 m_prev_copper_ctr = cur_copper_ctr;
551 unsigned int eve_copper_0 = 0;
561 B2INFO(
"DeSerializerPrePC: Waiting for Start...\n");
574 for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
580 int delete_flag_from = 0, delete_flag_to = 0;
584 checkData(&temp_rawdatablk, &eve_copper_0);
591 int temp_num_events, temp_num_nodes;
594#ifdef REDUCED_RAWCOPPER
603 int calced_temp_nwords_to = m_pre_rawcpr.
CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
604 buf_to =
getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
607 m_pre_rawcpr.
CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
608 if (calced_temp_nwords_to != temp_nwords_to) {
611 "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
612 calced_temp_nwords_to, temp_nwords_to);
613 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
622 delete_flag_to = delete_flag_from;
623 delete_flag_from = 0;
630 RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
634 raw_datablk->
SetBuffer(buf_to, temp_nwords_to, delete_flag_to,
635 temp_num_events, temp_num_nodes);
641#ifdef REDUCED_RAWCOPPER
643 post_rawcopper_latest.
SetBuffer(buf_to, temp_nwords_to,
644 0, temp_num_events, temp_num_nodes);
646 for (
int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
649 post_rawcopper_latest.
CheckCRC16(block_num, i_finesse_num);
661 m_eventMetaDataPtr.create();
662 m_eventMetaDataPtr->setExperiment(1);
663 m_eventMetaDataPtr->setRun(1);
687 printf(
"[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
689 m_eventMetaDataPtr->setEndOfData();
700 g_status.setInputNBytes(m_totbytes);
A class definition of an input module for Sequential ROOT I/O.
void RateMonitor(unsigned int nevt, int subrun=-1, int run=-1)
monitor rate
unsigned int m_prev_exprunsubrun_no
run no.
std::string m_nodename
Node name.
int m_start_flag
start flag
int BUF_SIZE_WORD
size of buffer for one event (word)
static RunInfoBuffer g_status
buffer class to communicate with NSM client
int n_basf2evt
No. of sent events.
CprErrorMessage print_err
wrapper for B2LOG system
virtual void printData(int *buf, int nwords)
dump error data
virtual int * getNewBuffer(int nwords, int *delete_flag)
Getbuffer.
std::string m_dump_fname
dump filename
int m_shmflag
Use shared memory.
int * m_bufary[NUM_PREALLOC_BUF]
buffer
unsigned int m_exprunsubrun_no
run no.
virtual void openOutputFile()
Module functions to be called from event process.
double max_seconds
time to stop a run
int m_nodeid
Node(PC or COPPER) ID.
double getTimeSec()
store time info.
std::vector< int > m_socket
Reciever Socket.
virtual int * recvData(int *delete_flag, int *total_m_size_word, int *num_events_in_sendblock, int *num_nodes_in_sendblock)
receive data
void initialize() override
Module functions to be called from main process.
void event() override
Module functions to be called from event process.
virtual void checkData(RawDataBlock *raw_datablk, unsigned int *eve_copper_0)
check data contents
std::vector< int > m_port_from
port # to connect data sources
virtual int Connect()
Accept connection.
virtual int recvFD(int fd, char *buf, int data_size_byte, int flag)
receive data
DeSerializerPrePCModule()
Constructor / Destructor.
virtual void setRecvdBuffer(RawDataBlock *raw_datablk, int *delete_flag)
attach buffer to RawDataBlock
std::vector< std::string > m_hostname_from
hostname of upstream Data Sources
void setDescription(const std::string &description)
Sets the description of the module.
The Raw COPPER class ver.
int CheckCRC16(int n, int finesse_num)
check magic words
The Raw COPPER class This class stores data received by COPPER via belle2linkt Data from all detector...
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freeed( = 0 )/ not freeed( = 1 ) in Destructer )
The RawDataBlock class Base class for rawdata handling.
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual int GetBufferPos(int n)
get position of data block in word
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freeed( = 0 )/ not freeed( = 1 ) in Destructer )
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int CheckTLUID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetBlockNwords(int n)
get size of a data block
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
virtual int GetNumEvents()
get # of events in m_buffer
virtual int TotalBufNwords()
Get total length of m_buffer.
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
unsigned int GetExpRunSubrun(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
unsigned int GetEveNo(int n)
Get event #.
unsigned int GetTTUtime(int n)
get unixtime of the trigger
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freeed( = 0 )/ not freeed( = 1 ) in Destructer )
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check the data contents
The Raw TLU class Class for data from DAQ PC for TLU(Trigger Logic Unit) It is supposed to be used on...
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
unsigned int GetEveNo(int n)
Get Event #.
void addParam(const std::string &name, T ¶mVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
unsigned int GetTTCtimeTRGType(int n)
Check if COPPER Magic words are correct.
unsigned int GetTTUtime(int n)
Check if COPPER Magic words are correct.
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check data contents
int GetFINESSENwords(int n, int finesse_num) OVERRIDE_CPP17
Get the size of a finesse buffer.
Abstract base class for different kinds of events.