Belle II Software development
DesSerPrePC Class Reference

A class definition of an input module for Sequential ROOT I/O. More...

#include <DesSerPrePC.h>

Inheritance diagram for DesSerPrePC:
DesSer

Public Member Functions

 DesSerPrePC (std::string host_recv, int port_recv, const std::string &host_send, int port_send, int shmflag, const std::string &nodename, int nodeid)
 Constructor / Destructor.
 
void DataAcquisition () override
 Module functions to be called from event process.
 
void initialize (bool close_listen=true)
 
void printData (int *buf, int nwords)
 dump error data
 
int * getPreAllocBuf ()
 Getbuffer.
 
int * getNewBuffer (int nwords, int *delete_flag)
 Getbuffer.
 
double getTimeSec ()
 store time info.
 
void recordTime (int event, double *array)
 store time info.
 
unsigned int calcSimpleChecksum (int *buf, int nwords)
 calculate checksum
 
unsigned int calcXORChecksum (int *buf, int nwords)
 calculate checksum
 
void clearNumUsedBuf ()
 
void RateMonitor (unsigned int nevt)
 monitor rate
 
void shmOpen (char *path_cfg, char *path_sta)
 open shared memory
 
int * shmGet (int fd, int size_words)
 Get shared memory.
 
void initializeNode ()
 Module functions to be called from main process.
 
void beginRun ()
 Module functions to be called from event process.
 
void endRun ()
 
void terminate ()
 
int sendByWriteV (RawDataBlockFormat *rawdblk)
 
void Accept (bool close_listen=true)
 
void fillSendHeaderTrailer (SendHeader *hdr, SendTrailer *trl, RawDataBlockFormat *rawdblk)
 
int Send (int socket, char *buf, int size_bytes)
 send buffer
 

Public Attributes

int max_nevt
 
double max_seconds
 time to stop a run
 
int m_compressionLevel
 Compression Level.
 
int n_basf2evt
 No. of sent events.
 
int m_prev_nevt
 No. of prev sent events.
 
std::string m_dump_fname
 dump filename
 
FILE * m_fp_dump
 dump file descriptor
 
int * m_buffer
 buffer
 
int * m_bufary [NUM_PREALLOC_BUF]
 buffer
 
timeval m_t0
 
double m_recvd_totbytes
 
double m_recvd_prev_totbytes
 
double m_sent_totbytes
 
double m_sent_prev_totbytes
 
int m_ncycle
 
double cur_time
 for time monitoring
 
double m_start_time
 
double m_prev_time
 
int prev_event
 
int m_num_usedbuf
 
std::string m_nodename
 Node Name for SLC.
 
int m_nodeid
 Node ID for SLC.
 
RunInfoBuffer m_status
 Run info buffer.
 
unsigned int m_exprunsubrun_no
 run no.
 
unsigned int m_prev_exprunsubrun_no
 run no.
 
int m_exp_no
 exp no.
 
int m_data_type
 data type
 
int m_trunc_mask
 trunc mask
 
int m_shmflag
 Use shared memory.
 
int m_shmfd_cfg
 file descriptor for shm
 
int m_shmfd_sta
 file descriptor for shm
 
int monitor_numeve
 buffer for shared memory
 
int m_start_flag
 start flag
 
CprErrorMessage print_err
 wrapper for B2LOG system
 
PreRawCOPPERFormat_v2 m_pre_rawcpr
 report status to SLC
 
int * m_cfg_buf
 buffer for shared memory
 
int * m_cfg_sta
 buffer for shared memory
 
int m_run_pause
 flag to show that run-controller pauses a run
 
int m_run_error
 flag to show that there is some errors with which DAQ cannot continue.
 

Protected Types

enum  {
  COPPER = 1 ,
  ROPC = 2
}
 

Protected Member Functions

int Connect ()
 Accept connection.
 
int recvFD (int fd, char *buf, int data_size_byte, int flag)
 receive data
 
int * recvData (int *delete_flag, int *total_m_size_word, int *num_events_in_sendblock, int *num_nodes_in_sendblock)
 receive data
 
void setRecvdBuffer (RawDataBlockFormat *raw_datablk, int *delete_flag)
 attach buffer to RawDataBlock
 
void checkData (RawDataBlockFormat *raw_datablk, unsigned int *eve_copper_0)
 check data contents
 

Protected Attributes

int m_num_connections
 check data contents
 
std::vector< std::string > m_hostname_from
 Receiver basf2 Socket.
 
std::vector< int > m_port_from
 port # to connect data sources
 
std::vector< int > m_socket_recv
 
int event_diff
 
unsigned int m_prev_copper_ctr
 
unsigned int m_prev_evenum
 
int m_socket_send
 Receiver Socket.
 
std::string m_hostname_local
 Destination Host.
 
int m_port_to
 Destination port.
 
std::string p_method
 EvtSocket.
 
int p_method_val
 
RawHeader_v2 tmp_header
 which format is used
 

Detailed Description

A class definition of an input module for Sequential ROOT I/O.

Definition at line 26 of file DesSerPrePC.h.

Member Enumeration Documentation

◆ anonymous enum

anonymous enum
protectedinherited

Definition at line 269 of file DesSer.h.

269 {
// Anonymous enumeration (protected, inherited; defined in DesSer.h) with exactly two enumerators.
// NOTE(review): the names suggest a node kind (COPPER board vs. readout PC) — confirm against the code that uses them.
270 COPPER = 1,
271 ROPC = 2
272 };

Constructor & Destructor Documentation

◆ DesSerPrePC()

DesSerPrePC ( std::string  host_recv,
int  port_recv,
const std::string &  host_send,
int  port_send,
int  shmflag,
const std::string &  nodename,
int  nodeid 
)

Constructor / Destructor.

Definition at line 28 of file DesSerPrePC.cc.

30{
// Constructor body. The signature and any initializer list (source lines 28-29) are not part of this listing.
31
// Register one (host, port) pair per expected input connection; -1 marks "socket not opened yet"
// and is what Connect() tests before opening a socket.
// NOTE(review): m_num_connections is read before anything in the visible body assigns it —
// presumably it is set by the DesSer base class or the initializer list; confirm.
32 for (int i = 0 ; i < m_num_connections; i++) {
33 // m_hostname_from.push_back( "localhost");
34 m_hostname_from.push_back(host_recv);
35 // m_port_from.push_back(30000);
36 m_port_from.push_back(port_recv) ;
37 m_socket_recv.push_back(-1);
38 }
39
// Address and port this node listens on for its downstream peer (used by Accept()).
40 // m_port_to = 31001;
41 m_port_to = port_send;
42 // m_hostname_local = "localhost";
43 m_hostname_local = host_send;
44 m_nodename = nodename;
45
46 m_shmflag = shmflag;
47
// NOTE(review): the nodeid argument is not used anywhere in the visible body — confirm it is
// consumed in the part of the definition that is not shown.
48 // B2INFO("DeSerializerPrePC: Constructor done.");
49 printf("[INFO] DeSerializerPrePC: Constructor done.\n"); fflush(stdout);
50}
int m_port_to
Destination port.
Definition: DesSer.h:278
std::string m_hostname_local
Destination Host.
Definition: DesSer.h:275
std::string m_nodename
Node Name for SLC.
Definition: DesSer.h:135
std::vector< int > m_port_from
port # to connect data sources
Definition: DesSer.h:246
int m_shmflag
Use shared memory.
Definition: DesSer.h:159
int m_num_connections
check data contents
Definition: DesSer.h:237
std::vector< std::string > m_hostname_from
Receiver basf2 Socket.
Definition: DesSer.h:243

◆ ~DesSerPrePC()

~DesSerPrePC ( )
virtual

Definition at line 54 of file DesSerPrePC.cc.

55{
// Empty body: nothing is released here.
// NOTE(review): the sockets opened by Connect()/Accept() are not closed in this destructor — confirm they are closed elsewhere.
56}

Member Function Documentation

◆ Accept()

void Accept ( bool  close_listen = true)
inherited

Definition at line 364 of file DesSer.cc.

365{
366 //
367 // Connect to cprtb01
368 //
369 struct hostent* host;
370 host = gethostbyname(m_hostname_local.c_str());
371 if (host == NULL) {
372 char temp_buf[500];
373 sprintf(temp_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...\n",
374 m_hostname_local.c_str(), strerror(errno));
375 print_err.PrintError(temp_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
376 exit(1);
377 }
378
379 //
380 // Bind and listen
381 //
382 int fd_listen;
383 struct sockaddr_in sock_listen;
384 sock_listen.sin_family = AF_INET;
385 sock_listen.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
386
387 socklen_t addrlen = sizeof(sock_listen);
388 sock_listen.sin_port = htons(m_port_to);
389 fd_listen = socket(PF_INET, SOCK_STREAM, 0);
390
391 int flags = 1;
392 int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)sizeof(flags));
393 if (ret < 0) {
394 perror("Failed to set REUSEADDR");
395 }
396
397 if (bind(fd_listen, (struct sockaddr*)&sock_listen, sizeof(struct sockaddr)) < 0) {
398 printf("[FATAL] Failed to bind. Maybe other programs have already occupied this port(%d). Exiting...\n",
399 m_port_to); fflush(stdout);
400 // Check the process occupying the port 30000.
401 FILE* fp;
402 char buf[256];
403 char cmdline[500];
404 sprintf(cmdline, "/usr/sbin/ss -ap | grep %d", m_port_to);
405 if ((fp = popen(cmdline, "r")) == NULL) {
406 printf("[WARNING] Failed to run %s\n", cmdline);
407 }
408 while (fgets(buf, 256, fp) != NULL) {
409 printf("[ERROR] Failed to bind. output of ss(port %d) : %s\n", m_port_to, buf); fflush(stdout);
410 }
411 // Error message
412 fclose(fp);
413 char temp_char[500];
414 sprintf(temp_char, "[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
415 strerror(errno), m_port_to);
416 print_err.PrintError(temp_char, __FILE__, __PRETTY_FUNCTION__, __LINE__);
417 exit(1);
418 }
419
420 int val1 = 0;
421 setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)sizeof(val1));
422 int backlog = 1;
423 if (listen(fd_listen, backlog) < 0) {
424 char err_buf[500];
425 sprintf(err_buf, "Failed in listen(%s). Exting...", strerror(errno));
426 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
427 exit(-1);
428 }
429
430 //
431 // Accept
432 //
433 int fd_accept;
434 struct sockaddr_in sock_accept;
435 printf("[DEBUG] Accepting... : port %d server %s\n", m_port_to, m_hostname_local.c_str());
436 fflush(stdout);
437
438 if ((fd_accept = accept(fd_listen, (struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
439 char err_buf[500];
440 sprintf(err_buf, "[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
441 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
442 exit(-1);
443 } else {
444 // B2INFO("Done.");
445 printf("[DEBUG] Done.\n"); fflush(stdout);
446
447 // set timepout option
448 struct timeval timeout;
449 timeout.tv_sec = 1;
450 timeout.tv_usec = 0;
451 ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)sizeof(timeout));
452 if (ret < 0) {
453 char temp_char[100] = "[FATAL] Failed to set TIMEOUT. Exiting...";
454 print_err.PrintError(temp_char, __FILE__, __PRETTY_FUNCTION__, __LINE__);
455 exit(-1);
456 }
457 }
458
459 if (close_listen) {
460 close(fd_listen);
461 }
462
463 // int flag = 1;
464 // ret = setsockopt(fd_accept, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag) );
465 m_socket_send = fd_accept;
466
467 if (m_status.isAvailable()) {
468 m_status.setOutputPort(ntohs(sock_listen.sin_port));
469 m_status.setOutputAddress(sock_listen.sin_addr.s_addr);
470 // B2INFO("Accepted " << (int)ntohs(sock_listen.sin_port) << " " << (int)sock_listen.sin_addr.s_addr);
471 printf("Accepted. port %d address %d\n", (int)ntohs(sock_listen.sin_port), (int)sock_listen.sin_addr.s_addr); fflush(stdout);
472 }
473
474 return;
475
476}
RunInfoBuffer m_status
Run info buffer.
Definition: DesSer.h:141
int m_socket_send
Receiver Socket.
Definition: DesSer.h:267
CprErrorMessage print_err
wrapper for B2LOG system
Definition: DesSer.h:184

◆ calcXORChecksum()

unsigned int calcXORChecksum ( int *  buf,
int  nwords 
)
inherited

calculate checksum

Definition at line 495 of file DesSer.cc.

496{
497 unsigned int checksum = 0;
498 for (int i = 0; i < nwords; i++) {
499 checksum = checksum ^ buf[ i ];
500 }
501 return checksum;
502}

◆ checkData()

void checkData ( RawDataBlockFormat raw_datablk,
unsigned int *  eve_copper_0 
)
protected

check data contents

Definition at line 381 of file DesSerPrePC.cc.

382{
383 // int data_size_copper_0 = -1;
384 // int data_size_copper_1 = -1;
385
386 //
387 // Data check
388 //
389 int* temp_buf = raw_datablk->GetBuffer(0);
390 int cpr_num = 0;
391 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
392 unsigned int eve_array[32]; // # of noeds is less than 17
393 unsigned int utime_array[32];// # of noeds is less than 17
394 unsigned int ctime_type_array[32];// # of noeds is less than 17
395
396#ifdef DUMHSLB
397 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
398#endif
399
400 for (int k = 0; k < raw_datablk->GetNumEvents(); k++) {
401 memset(eve_array, 0, sizeof(eve_array));
402 memset(utime_array, 0, sizeof(utime_array));
403 memset(ctime_type_array, 0, sizeof(ctime_type_array));
404
405 int num_nodes_in_sendblock = raw_datablk->GetNumNodes();
406 for (int l = 0; l < num_nodes_in_sendblock; l++) {
407 int entry_id = l + k * num_nodes_in_sendblock;
408
409 //
410 // RawFTSW
411 //
412 if (raw_datablk->CheckFTSWID(entry_id)) {
413 RawFTSWFormat_latest* temp_rawftsw = new RawFTSWFormat_latest;
414 int block_id = 0;
415 temp_rawftsw->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
416 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
417 if (temp_rawftsw->GetEveNo(block_id) < 10) {
418 printf("[DEBUG] ######FTSW#########\n");
419 printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
420 }
421
422#ifdef DUMHSLB
423 exp_run_ftsw = temp_rawftsw->GetExpRunSubrun(block_id);
424 ctime_trgtype_ftsw = temp_rawftsw->GetTTCtimeTRGType(block_id);
425 utime_ftsw = temp_rawftsw->GetTTUtime(block_id);
426#endif
427
428
429#ifndef NO_DATA_CHECK
430 try {
431 temp_rawftsw->CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
432 eve_array[ entry_id ] = cur_evenum;
433 } catch (const string& err_str) {
434 char err_buf[500];
435 strcpy(err_buf, err_str.c_str());
436 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
437 exit(1);
438 }
439#endif
440 utime_array[ entry_id ] = temp_rawftsw->GetTTUtime(block_id);
441 ctime_type_array[ entry_id ] = temp_rawftsw->GetTTCtimeTRGType(block_id);
442 delete temp_rawftsw;
443
444 //
445 // RawTLU
446 //
447 } else if (raw_datablk->CheckTLUID(entry_id)) {
448
449 RawTLUFormat* temp_rawtlu = new RawTLUFormat;
450 temp_rawtlu->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
451 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
452 if (temp_rawtlu->GetEveNo(0) < 10
453 ) {
454 printf("[DEBUG] ######TLU#########\n");
455 printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
456 }
457
458#ifndef NO_DATA_CHECK
459 try {
460 temp_rawtlu->CheckData(0, m_prev_evenum, &cur_evenum);
461 eve_array[ entry_id ] = cur_evenum;
462 } catch (const string& err_str) {
463 char err_buf[500];
464 strcpy(err_buf, err_str.c_str());
465 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
466 exit(1);
467 }
468#endif
469 delete temp_rawtlu;
470 } else {
471 //
472 // RawCOPPER
473 //
474 PreRawCOPPERFormat_v2* pre_rawcpr_fmt = new PreRawCOPPERFormat_v2;
475 pre_rawcpr_fmt->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
476 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
477
478#ifdef DUMHSLB
479 int block_id = 0;
480 "do not use the following for actual DAQ"
481 (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_v2::POS_EXP_RUN_NO ] = exp_run_ftsw;
482 (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_v2::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
483 (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_v2::POS_TTUTIME ] = utime_ftsw;
484#endif
485
486#ifndef NO_DATA_CHECK
487 try {
488 pre_rawcpr_fmt->CheckData(0, m_prev_evenum, &cur_evenum,
489 m_prev_copper_ctr, &cur_copper_ctr,
491 eve_array[ entry_id ] = cur_evenum;
492 } catch (const string& err_str) {
493 exit(1); // Error in the contents of an event was detected
494 }
495#endif
496
497 utime_array[ entry_id ] = pre_rawcpr_fmt->GetTTUtime(0);
498 ctime_type_array[ entry_id ] = pre_rawcpr_fmt->GetTTCtimeTRGType(0);
499
500 if (cpr_num == 0) {
501 // data_size_copper_0 = raw_datablk->GetBlockNwords(entry_id);
502 *eve_copper_0 = (raw_datablk->GetBuffer(entry_id))[ 3 ];
503 } else if (cpr_num == 1) {
504 // data_size_copper_1 = raw_datablk->GetBlockNwords(entry_id);
505 }
506 cpr_num++;
507 delete pre_rawcpr_fmt;
508 }
509 }
510
511#ifndef NO_DATA_CHECK
512 // event #, ctime, utime over nodes
513 for (int l = 1; l < num_nodes_in_sendblock; l++) {
514 if (eve_array[ 0 ] != eve_array[ l ] ||
515 utime_array[ 0 ] != utime_array[ l ] ||
516 ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
517 char err_buf[500];
518 for (int m = 0; m < num_nodes_in_sendblock; m++) {
519 printf("[DEBUG] node %d eve # %x utime %x ctime %x\n",
520 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
521 }
522 sprintf(err_buf, "[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
523 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
524 sleep(1234567);
525 exit(-1);
526 }
527 }
528#endif
529
530 // Event # monitor in runchange
531// if (m_prev_runsubrun_no != m_runsubrun_no) {
532// printf("[DEBUG] ##############################################\n");
533// for (int m = 0; m < raw_datablk->GetNumEntries(); m++) {
534// printf("[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
535// }
536// printf("[DEBUG] ##############################################\n");
537// fflush(stderr);
538// }
539 m_prev_evenum = cur_evenum;
540 m_prev_copper_ctr = cur_copper_ctr;
542 }
543 return;
544}
unsigned int m_prev_exprunsubrun_no
run no.
Definition: DesSer.h:147
void printData(int *buf, int nwords)
dump error data
Definition: DesSer.cc:505
unsigned int m_exprunsubrun_no
run no.
Definition: DesSer.h:144
The Raw COPPER class ver.1 ( the latest version since May, 2014 ) This class stores data received by ...
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no) OVERRIDE_CPP17
check data contents
virtual int GetBlockNwords(int n)
get size of a data block
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetBufferPos(int n)
get position of data block in word
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freed ( = 0 ) / not freed ( = 1 ) in the destructor )
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual int GetNumEvents()
get # of events in m_buffer
virtual int CheckTLUID(int n)
get FTSW ID to check whether this data block is FTSW data or not
The Raw FTSW class 3 ( 2019.8.20 )
unsigned int GetEveNo(int n) OVERRIDE_CPP17
Get event #.
unsigned int GetTTUtime(int n) OVERRIDE_CPP17
get unixtime of the trigger
unsigned int GetTTCtimeTRGType(int n) OVERRIDE_CPP17
Get a word containing ctime and trigger type info.
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no) OVERRIDE_CPP17
check the data contents
The Raw TLU class Class for data from DAQ PC for TLU(Trigger Logic Unit) It is supposed to be used on...
Definition: RawTLUFormat.h:25
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
Definition: RawTLUFormat.cc:60
unsigned int GetEveNo(int n)
Get Event #.
Definition: RawTLUFormat.cc:37
unsigned int GetExpRunSubrun(int n) OVERRIDE_CPP17
Exp# (10bit) run# (14bit) restart # (8bit)
unsigned int GetTTUtime(int n) OVERRIDE_CPP17
Check if COPPER Magic words are correct.
unsigned int GetTTCtimeTRGType(int n) OVERRIDE_CPP17
get b2l block from "FEE b2link header"

◆ clearNumUsedBuf()

void clearNumUsedBuf ( )
inlineinherited

Definition at line 121 of file DesSer.h.

122 {
123 m_num_usedbuf = 0;
124 return ;
125 }
int m_num_usedbuf
Definition: DesSer.h:132

◆ Connect()

int Connect ( )
protected

Accept connection.

Definition at line 114 of file DesSerPrePC.cc.

115{
116
117 for (int i = 0; i < m_num_connections; i++) {
118
119 if (m_socket_recv[ i ] >= 0) continue; // Already have an established socket
120
121 //
122 // Connect to a downstream node
123 //
124 struct hostent* host;
125 host = gethostbyname(m_hostname_from[ i ].c_str());
126 if (host == NULL) {
127 char err_buf[100];
128 sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
129 strerror(errno));
130 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
131 sleep(1234567);
132 exit(1);
133 }
134
135 struct sockaddr_in socPC;
136 socPC.sin_family = AF_INET;
137 socPC.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
138 socPC.sin_port = htons(m_port_from[ i ]);
139 int sd = socket(PF_INET, SOCK_STREAM, 0);
140 int val1 = 0;
141 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1, sizeof(val1));
142
143 struct timeval timeout;
144 timeout.tv_sec = 1;
145 timeout.tv_usec = 0;
146 setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)sizeof(timeout));
147
148 printf("[DEBUG] Connecting to %s port %d\n", m_hostname_from[ i ].c_str(), m_port_from[ i ]); fflush(stdout);
149
150 while (1) {
151 if (connect(sd, (struct sockaddr*)(&socPC), sizeof(socPC)) < 0) {
152 perror("Failed to connect. Retrying...");
153 usleep(500000);
154 } else {
155 // B2INFO("Done");
156 printf("[DEBUG] Done\n"); fflush(stdout);
157 break;
158 }
159 }
160 // m_socket_recv.push_back(sd);
161 m_socket_recv[ i ] = sd;
162
163 // check socket paramters
164 int val, len;
165 len = sizeof(val);
166 getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
167 // B2INFO("SO_RCVBUF" << val);
168 getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
169 // B2DEBUG("SO_SNDBUF" << val);
170 getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
171 // B2DEBUG("TCP_MAXSEG" << val);
172 getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
173 // B2DEBUG("TCP_NODELAY" << val);
174
175 if (m_status.isAvailable()) {
176 sockaddr_in sa;
177 memset(&sa, 0, sizeof(sockaddr_in));
178 socklen_t sa_len = sizeof(sa);
179 if (getsockname(m_socket_recv[i], (struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
180 m_status.setInputPort(ntohs(sa.sin_port));
181 m_status.setInputAddress(sa.sin_addr.s_addr);
182 }
183 }
184
185 }
186 // B2INFO("[DEBUG] Initialization finished");
187 printf("[DEBUG] Initialization finished\n"); fflush(stdout);
188 return 0;
189}

◆ DataAcquisition()

void DataAcquisition ( )
overridevirtual

Module functions to be called from event process.

Implements DesSer.

Definition at line 548 of file DesSerPrePC.cc.

549{
// Main loop of this node: initialize, connect to the data sources, then repeatedly
// receive a batch of data blocks, check them (optionally reduce them) and send them on.
// The outer while (1) has no break at its own level, so the trailing return is only
// reached if that changes; fatal errors leave through exit().
// NOTE(review): this listing skips source lines 613, 686, 688, 756 and 757 — see the
// note at the rate printout below.
550 // For data check
551 unsigned int eve_copper_0 = 0;
552 // B2INFO("initializing...");
553 printf("[DEBUG] initializing...\n"); fflush(stdout);
554 initialize();
555
556 // B2INFO("Done.");
557 printf("[DEBUG] Done.\n"); fflush(stdout);
558
559 if (m_start_flag == 0) {
560 //
561 // Connect to eb0: This should be here because we want Serializer first to accept connection from eb1tx
562 //
563 Connect();
564 if (m_status.isAvailable()) {
565 // B2INFO("DeSerializerPrePC: Waiting for Start...\n");
566 printf("[DEBUG] DeSerializerPrePC: Waiting for Start...\n"); fflush(stdout);
567 m_status.reportRunning();
568 }
569 m_start_time = getTimeSec();
570 n_basf2evt = 0;
571 }
572
573 //
574 // Main loop
575 //
576 while (1) {
577 //
578 // Stand-by loop
579 //
580#ifdef NONSTOP
581 if (m_run_pause > 0 || m_run_error > 0) {
582 waitResume();
583 }
584#endif
585
// Make all pre-allocated buffers available again for this pass.
586 clearNumUsedBuf();
587 // RawDataBlock raw_datablk[ NUM_EVT_PER_BASF2LOOP_PC ];
588 RawDataBlockFormat raw_datablk[ NUM_EVT_PER_BASF2LOOP_PC ];
589
590 //
591 // Recv loop
592 //
593 for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
594 //
595 // Receive data from COPPER
596 //
597 eve_copper_0 = 0;
598 int delete_flag_from =
599 0; // Delete flag for temp_rawdatablk.It can be set to 1 by setRecvdBuffer if the buffer size is larger than that of pre-allocated buffer.
600 int delete_flag_to =
601 0; // Delete flag for raw_datablk[i]. It can be set to 1 by getNewBuffer if the buffer size is larger than that of pre-allocated buffer.
602 RawDataBlockFormat temp_rawdatablk;
603 try {
604 setRecvdBuffer(&temp_rawdatablk, &delete_flag_from);
605 checkData(&temp_rawdatablk, &eve_copper_0);
606 } catch (const string& err_str) {
607 printf("Error was detected\n"); fflush(stdout);
// This break leaves only the receive loop; raw_datablk[j] and later entries are not
// filled in this pass. NOTE(review): without NONSTOP the send loop below still runs
// over all entries — confirm that is intended.
608 break;
609 }
610 // PreRawCOPPERFormat_v2 pre_rawcopper_v2;
611 // pre_rawcopper_v2.SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
612 // 0, temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
614
615 int temp_num_events, temp_num_nodes;
616 int temp_nwords_to;
617 int* buf_to = NULL;
618#ifdef REDUCED_RAWCOPPER
619 //
620 // Copy reduced buffer
621 //
622 // int* buf_to = getNewBuffer(m_pre_rawcpr.CalcReducedDataSize(&temp_rawdatablk), &delete_flag_to); // basf2-dependent style
623 int* temp_bufin = temp_rawdatablk.GetWholeBuffer();
624 int temp_nwords_from = temp_rawdatablk.TotalBufNwords();
625 temp_num_events = temp_rawdatablk.GetNumEvents();
626 temp_num_nodes = temp_rawdatablk.GetNumNodes();
// The reduced size is computed first to size the destination buffer, then cross-checked
// against the size CopyReducedData() actually wrote.
627 int calced_temp_nwords_to = m_pre_rawcpr.CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
628 buf_to = getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
629
630 // m_pre_rawcpr.CopyReducedData(&temp_rawdatablk, buf_to, delete_flag_from); // basf2-dependent style
631 m_pre_rawcpr.CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
632 if (calced_temp_nwords_to != temp_nwords_to) {
633 char err_buf[500];
634 sprintf(err_buf,
635 "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
636 calced_temp_nwords_to, temp_nwords_to);
637 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
638 exit(1);
639 }
640 m_status.copyEventHeader(buf_to);
641#else
// Without data reduction the received buffer is forwarded as is, and responsibility for
// deleting it moves from temp_rawdatablk to raw_datablk[j].
642 buf_to = temp_rawdatablk.GetWholeBuffer();
643 temp_nwords_to = temp_rawdatablk.TotalBufNwords();
644 temp_num_events = temp_rawdatablk.GetNumEvents();
645 temp_num_nodes = temp_rawdatablk.GetNumNodes();
646 delete_flag_to = delete_flag_from;
647 delete_flag_from = 0; // to avoid double delete
648#endif
649
650 //
651 // Set buffer to the RawData class stored in DataStore
652 //
653// raw_datablk[ j ].SetBuffer( (int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
654// delete_flag_to, temp_rawdatablk.GetNumEvents(),
655// temp_rawdatablk.GetNumNodes());
656 raw_datablk[ j ].SetBuffer(buf_to, temp_nwords_to, delete_flag_to, temp_num_events, temp_num_nodes);
657
658
659 //
660 // CRC16 check after data reduction
661 //
662#ifdef REDUCED_RAWCOPPER
663 PostRawCOPPERFormat_v2 post_rawcopper_v2; // Should be the latest version before ver.4(PCIe40)
664
665// post_rawcopper_v2.SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
666// 0, temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
667 post_rawcopper_v2.SetBuffer(raw_datablk[ j ].GetWholeBuffer(), raw_datablk[ j ].TotalBufNwords(),
668 0, raw_datablk[ j ].GetNumEvents(), raw_datablk[ j ].GetNumNodes());
669
// Only block 0 is checked, for each of the four FINESSE slots that carries data.
670 for (int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
671 int block_num = 0;
672 if (post_rawcopper_v2.GetFINESSENwords(block_num, i_finesse_num) > 0) {
673 post_rawcopper_v2.CheckCRC16(block_num, i_finesse_num);
674 }
675 }
676
677#endif
678 }
679
680#ifdef NONSTOP
681 // Goto Stand-by loop when run is paused or stopped by error
682 if (m_run_pause != 0 || m_run_error != 0) continue;
683#endif
684
685
687 // From Serializer.cc
689 if (m_start_flag == 0) {
690 m_start_time = getTimeSec();
691 n_basf2evt = 0;
692 }
693 // StoreArray<RawCOPPER> rawcprarray;
694 // StoreArray<RawDataBlock> raw_dblkarray;
695
696 for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
697
698 //
699 // Send data
700 //
701 if (m_start_flag == 0) {
702 // B2INFO("SerializerPC: Sending the 1st packet...");
703 printf("[DEBUG] SerializerPC: Sending the 1st packet...\n"); fflush(stdout);
704 }
705
706 try {
707 m_sent_totbytes += sendByWriteV(&(raw_datablk[ j ]));
708 } catch (const string& err_str) {
// With NONSTOP defined the break below makes the error report and exit(1) unreachable;
// without it a send failure is fatal.
709#ifdef NONSTOP
710 break;
711#endif
712 print_err.PrintError((char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
713 exit(1);
714 }
715 if (m_start_flag == 0) {
716 // B2INFO("Done. ");
717 printf("[DEBUG] Done.\n"); fflush(stdout);
718 m_start_flag = 1;
719 }
720 }
721
722#ifdef NONSTOP
723 // Goto Stand-by loop when run is paused or stopped by error
724 if (m_run_pause != 0 || m_run_error != 0) continue;
725#endif
726
727 //
728 // Monitor
729 //
// This block does nothing unless AIUEO is defined; the run-stop condition itself is commented out.
730 if (max_nevt >= 0 || max_seconds >= 0.) {
731#ifdef AIUEO
732 if (n_basf2evt % 10000 == 0) {
733// if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC >= max_nevt && max_nevt > 0)
734// || (getTimeSec() - m_start_time > max_seconds && max_seconds > 0.)) {
735 printf("[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
736 max_nevt, max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
737 }
738#endif
739 }
740
// Periodic rate printout, once per 100000 processed events.
741 if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC) % 100000 == 0) {
742 double interval = cur_time - m_prev_time;
743 double total_time = cur_time - m_start_time;
744 printf("[DEBUG] Event %12d Rate %6.2lf[kHz] Recvd %6.2lf[MB/s] sent %6.2lf[MB/s] RunTime %8.2lf[s] interval %8.4lf[s]\n",
745 n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC,
746 (n_basf2evt - m_prev_nevt)*NUM_EVT_PER_BASF2LOOP_PC / interval / 1.e3,
747 (m_recvd_totbytes - m_recvd_prev_totbytes) / interval / 1.e6,
748 (m_sent_totbytes - m_sent_prev_totbytes) / interval / 1.e6,
749 total_time,
750 interval);
751 fflush(stdout);
752
753 m_prev_time = cur_time;
754 m_recvd_prev_totbytes = m_recvd_totbytes;
755 m_sent_prev_totbytes = m_sent_totbytes;
// NOTE(review): nothing visible in this listing assigns cur_time or m_prev_nevt, which the
// printout above reads — presumably they are updated on the omitted source lines 756-757;
// confirm against DesSerPrePC.cc.
758 }
759
760 n_basf2evt++;
761
762 if (m_status.isAvailable()) {
763 m_status.setOutputNBytes(m_sent_totbytes);
764 m_status.setOutputCount(n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC);
765 }
766
767 }
768
769 return;
770}
int Connect()
Accept connection.
Definition: DesSerPrePC.cc:114
void setRecvdBuffer(RawDataBlockFormat *raw_datablk, int *delete_flag)
attach buffer to RawDataBlock
Definition: DesSerPrePC.cc:336
void checkData(RawDataBlockFormat *raw_datablk, unsigned int *eve_copper_0)
check data contents
Definition: DesSerPrePC.cc:381
int m_start_flag
start flag
Definition: DesSer.h:181
int n_basf2evt
No. of sent events.
Definition: DesSer.h:70
PreRawCOPPERFormat_v2 m_pre_rawcpr
report status to SLC
Definition: DesSer.h:191
int m_run_pause
flag to show that run-controller pauses a run
Definition: DesSer.h:227
int m_run_error
flag to show that there is some errors with which DAQ cannot continue.
Definition: DesSer.h:230
int * getNewBuffer(int nwords, int *delete_flag)
Getbuffer.
Definition: DesSer.cc:77
double cur_time
for time monitoring
Definition: DesSer.h:102
int m_prev_nevt
No. of prev sent events.
Definition: DesSer.h:73
double max_seconds
time to stop a run
Definition: DesSer.h:64
int max_nevt
Definition: DesSer.h:61
double getTimeSec()
store time info.
Definition: DesSer.cc:478
The Raw COPPER class ver.2 This class stores data received by COPPER via belle2link Data from all det...
int CheckCRC16(int n, int finesse_num)
check magic words
int GetFINESSENwords(int n, int finesse) OVERRIDE_CPP17
get data size of FINESSE buffer
void CopyReducedData(int *bufin, int nwords, int num_events, int num_nodes, int *buf_to, int *nwords_to)
reduce and merge header/trailer
int CalcReducedDataSize(int *bufin, int nwords, int num_events, int num_nodes)
reduce and merge header/trailer
The RawDataBlockFormat class Format information for rawdata handling.
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
virtual int TotalBufNwords()
Get total length of m_buffer.

◆ fillSendHeaderTrailer()

void fillSendHeaderTrailer ( SendHeader hdr,
SendTrailer trl,
RawDataBlockFormat rawdblk 
)
inherited

Definition at line 171 of file DesSer.cc.

172{
173
174 int total_send_nwords =
175 hdr->GetHdrNwords() +
176 rawdblk->TotalBufNwords() +
177 // rawhdr.GetNwords() +
178 trl->GetTrlNwords();
179
180 hdr->SetNwords(total_send_nwords);
181 hdr->SetNumEventsinPacket(rawdblk->GetNumEvents());
182 hdr->SetNumNodesinPacket(rawdblk->GetNumNodes());
183
184 //
185 // For bug check
186 //
187 if (rawdblk->GetNumEntries() == 1) {
188 if (total_send_nwords != (rawdblk->GetBuffer(0))[ 0 ] + 8) {
189 char err_buf[500];
190 sprintf(err_buf, "Length error. total length %d rawdblk length %d. Exting...\n",
191 total_send_nwords, (rawdblk->GetBuffer(0))[ 0 ]);
192 printData(rawdblk->GetBuffer(0), rawdblk->TotalBufNwords());
193 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
194 sleep(1234567);
195 exit(-1);
196 }
197 }
198
199
200 for (int i = 0; i < rawdblk->GetNumEntries(); i++) {
201
202 //copy event # from a tonp COPPER block
203 if (!(rawdblk->CheckFTSWID(i)) && !(rawdblk->CheckTLUID(i))) {
204 tmp_header.SetBuffer(rawdblk->GetBuffer(i));
205 hdr->SetEventNumber(tmp_header.GetEveNo());
206 hdr->SetNodeID(tmp_header.GetNodeID());
207 hdr->SetExpRunWord(tmp_header.GetExpRunSubrun());
208 break;
209 }
210
211 //Error if you cannot find any COPPER block
212 if (i == (rawdblk->GetNumEntries() - 1)) {
213 printf("[DEBUG] i= %d : num entries %d : Tot words %d\n", i, rawdblk->GetNumEntries(), rawdblk->TotalBufNwords());
214 printData(rawdblk->GetBuffer(0), rawdblk->TotalBufNwords());
215
216 char err_buf[500] = "[FATAL] CORRUPTED DATA: No COPPER blocks in RawDataBlock. Exiting...";
217 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
218 sleep(1234567);
219 exit(-1);
220 }
221 }
222 return;
223}
RawHeader_v2 tmp_header
which format is used
Definition: DesSer.h:323
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
void SetBuffer(int *bufin)
set buffer
Definition: RawHeader_v2.h:47
void SetNumEventsinPacket(int num_events)
set contents of Header
Definition: SendHeader.cc:61
int GetHdrNwords()
get contents of Header
Definition: SendHeader.cc:124
void SetNwords(int total_data_nwords)
initialize Header
Definition: SendHeader.cc:51
unsigned int GetEveNo()
get restart #(8bit)
Definition: RawHeader_v2.h:396
unsigned int GetNodeID()
get contents of header
Definition: RawHeader_v2.h:402
unsigned int GetExpRunSubrun()
get contents of header
Definition: RawHeader_v2.h:389

◆ getNewBuffer()

int * getNewBuffer ( int  nwords,
int *  delete_flag 
)
inherited

Getbuffer.

Definition at line 77 of file DesSer.cc.

78{
79
80 int* temp_buf = NULL;
81 // Prepare buffer
82 if (nwords > BUF_SIZE_WORD) {
83 *delete_flag = 1;
84 temp_buf = new int[ nwords ];
85 } else {
86 if ((temp_buf = getPreAllocBuf()) == 0x0) {
87 char err_buf[500];
88 sprintf(err_buf, "Null pointer from GetPreALlocBuf(). Exting...\n");
89 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
90 sleep(1234567);
91 exit(1);
92 } else {
93 *delete_flag = 0;
94 }
95 }
96
97 return temp_buf;
98
99}
int * getPreAllocBuf()
Getbuffer.
Definition: DesSer.cc:57

◆ getPreAllocBuf()

int * getPreAllocBuf ( )
inherited

Getbuffer.

Definition at line 57 of file DesSer.cc.

58{
59 int* tempbuf = 0;
60 if (m_num_usedbuf < NUM_PREALLOC_BUF) {
61 tempbuf = m_bufary[ m_num_usedbuf ];
63 } else {
64 char err_buf[500];
65 sprintf(err_buf,
66 "No pre-allocated buffers are left. %d > %d. Not enough buffers are allocated or "
67 "memory leak or forget to call ClearNumUsedBuf every event loop. Exting...",
68 m_num_usedbuf, NUM_PREALLOC_BUF);
69 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
70 sleep(1234567);
71 exit(1);
72 }
73 return tempbuf;
74}
int * m_bufary[NUM_PREALLOC_BUF]
buffer
Definition: DesSer.h:85

◆ getTimeSec()

double getTimeSec ( )
inherited

store time info.

Definition at line 478 of file DesSer.cc.

479{
480 struct timeval t;
481 gettimeofday(&t, NULL);
482 return (t.tv_sec + t.tv_usec * 1.e-6);
483}

◆ initialize()

void initialize ( bool  close_listen = true)
inherited

Definition at line 101 of file DesSer.cc.

102{
103 printf("[DEBUG] DesSer: initialize() started.\n"); fflush(stdout);
104 signal(SIGPIPE, SIG_IGN);
105
106 //
107 // initialize Rx part from DeSerializer**.cc
108 //
109
110 // allocate buffer
111 for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
112 m_bufary[i] = new int[ BUF_SIZE_WORD ];
113 }
114 m_buffer = new int[ BUF_SIZE_WORD ];
115
116 // initialize buffer
117 for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
118 memset(m_bufary[i], 0, BUF_SIZE_WORD * sizeof(int));
119 }
120
121 // Open message handler
122 clearNumUsedBuf();
123 // Shared memory
124 if (m_shmflag > 0) {
125 if (m_nodename.size() == 0 || m_nodeid < 0) {
126 m_shmflag = 0;
127 } else {
129 m_status.reportReady();
130 }
131 }
132
133 event_diff = 0;
134
135 m_prev_copper_ctr = 0xFFFFFFFF;
136 m_prev_evenum = 0xFFFFFFFF;
137
138
139 //
140 // initialize Rx part from DeSerializer**.cc
141 //
142 m_start_flag = 0;
143 n_basf2evt = -1;
145
146#ifdef DUMMY
147 m_buffer = new int[ BUF_SIZE_WORD ];
148#endif
149 Accept(close_listen);
150#ifdef NONSTOP
151 openRunPauseNshm();
152#endif
153
154
155 // For monitor
156 if (m_status.isAvailable()) {
157 m_status.setOutputNBytes(0);
158 m_status.setOutputCount(0);
159 }
160 // B2INFO("DesSer: initialize() was done.");
161 printf("[DEBUG] DesSer: initialize() was done.\n"); fflush(stdout);
162
163}
int * m_buffer
buffer
Definition: DesSer.h:82
int m_compressionLevel
Compression Level.
Definition: DesSer.h:67
int m_nodeid
Node ID for SLC.
Definition: DesSer.h:138

◆ printData()

void printData ( int *  buf,
int  nwords 
)
inherited

dump error data

Definition at line 505 of file DesSer.cc.

506{
507 printf("[DEBUG]");
508 for (int i = 0; i < nwords; i++) {
509 printf("%.8x ", buf[ i ]);
510 if (i % 10 == 9) printf("\n[DEBUG]");
511 }
512 printf("\n[DEBUG]");
513 printf("\n");
514 return;
515}

◆ recordTime()

void recordTime ( int  event,
double *  array 
)
inherited

store time info.

Definition at line 486 of file DesSer.cc.

487{
488 if (event >= 50000 && event < 50500) {
489 array[ event - 50000 ] = getTimeSec() - m_start_time;
490 }
491 return;
492}

◆ recvData()

int * recvData ( int *  delete_flag,
int *  total_m_size_word,
int *  num_events_in_sendblock,
int *  num_nodes_in_sendblock 
)
protected

receive data

Definition at line 193 of file DesSerPrePC.cc.

194{
195 int* temp_buf = NULL; // buffer for data-body
196 int flag = 0;
197
198 vector <int> each_buf_nwords;
199 each_buf_nwords.clear();
200 vector <int> each_buf_nodes;
201 each_buf_nodes.clear();
202 vector <int> each_buf_events;
203 each_buf_events.clear();
204
205 *total_buf_nwords = 0;
206 *num_nodes_in_sendblock = 0;
207 *num_events_in_sendblock = 0;
208
209 //
210 // Read Header and obtain data size
211 //
212 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
213
214 // Read header
215 for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
216
217 recvFD(m_socket_recv[ i ], (char*)send_hdr_buf, sizeof(int)*SendHeader::SENDHDR_NWORDS, flag);
218
219 SendHeader send_hdr;
220 send_hdr.SetBuffer(send_hdr_buf);
221
222 int temp_num_events = send_hdr.GetNumEventsinPacket();
223 int temp_num_nodes = send_hdr.GetNumNodesinPacket();
224
225 if (i == 0) {
226 *num_events_in_sendblock = temp_num_events;
227 } else if (*num_events_in_sendblock != temp_num_events) {
228#ifndef NO_DATA_CHECK
229 char err_buf[500];
230 sprintf(err_buf,
231 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
232 *num_events_in_sendblock, temp_num_events, i, *num_nodes_in_sendblock, temp_num_nodes, i);
233 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
234 sleep(1234567);
235 exit(1);
236#endif
237 }
238
239 *num_nodes_in_sendblock += temp_num_nodes;
240
241 int rawblk_nwords = send_hdr.GetTotalNwords()
242 - SendHeader::SENDHDR_NWORDS
243 - SendTrailer::SENDTRL_NWORDS;
244 *total_buf_nwords += rawblk_nwords;
245
246 //
247 // Data size check1
248 //
249 if (rawblk_nwords > (int)(2.5e6) || rawblk_nwords <= 0) {
250 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
251 char err_buf[500];
252 sprintf(err_buf, "CORRUPTED DATA: Too large event : Header %d %d %d %d :block size %d words\n", i, temp_num_events, temp_num_nodes,
253 send_hdr.GetTotalNwords(), rawblk_nwords);
254 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
255 sleep(123456);
256 exit(1);
257
258 }
259
260 each_buf_nwords.push_back(rawblk_nwords);
261 each_buf_events.push_back(temp_num_events);
262 each_buf_nodes.push_back(temp_num_nodes);
263
264 }
265
266
267 temp_buf = getNewBuffer(*total_buf_nwords, delete_flag); // this include only data body
268 //
269 // Read body
270 //
271 int total_recvd_byte = 0;
272 for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
273
274 try {
275 total_recvd_byte += recvFD(m_socket_recv[ i ], (char*)temp_buf + total_recvd_byte,
276 each_buf_nwords[ i ] * sizeof(int), flag);
277 } catch (const string& err_str) {
278 if (*delete_flag) {
279 // B2WARNING("Delete buffer before going to Run-pause state");
280 printf("[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
281 delete temp_buf;
282 }
283 throw (std::move(err_str));
284 }
285 //
286 // Data length check
287 //
288 int temp_length = 0;
289 for (int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
290 int this_length = *((int*)((char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] * sizeof(int) + temp_length));
291 temp_length += this_length * sizeof(int);
292 }
293 if (temp_length != (int)(each_buf_nwords[ i ] * sizeof(int))) {
294 printf("[DEBUG]*******SENDHDR*********** \n");
295 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
296 printf("[DEBUG]*******BODY***********\n ");
297 printData(temp_buf, (int)(total_recvd_byte / sizeof(int)));
298 char err_buf[500];
299 sprintf(err_buf, "CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
300 (int)(*total_buf_nwords * sizeof(int)), temp_length);
301 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
302 sleep(1234567);
303 exit(-1);
304 }
305
306 }
307
308 if ((int)(*total_buf_nwords * sizeof(int)) != total_recvd_byte) {
309 char err_buf[500];
310 sprintf(err_buf, "CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
311 total_recvd_byte, (int)(*total_buf_nwords * sizeof(int)));
312 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
313 sleep(1234567);
314 exit(-1);
315 }
316
317 // Read Traeiler
318 int send_trl_buf[(unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
319 for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
320 try {
321 recvFD(m_socket_recv[ i ], (char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS * sizeof(int), flag);
322 } catch (const string& err_str) {
323 if (*delete_flag) {
324 // B2WARNING("Delete buffer before going to Run-pause state");
325 printf("[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
326 delete temp_buf;
327 }
328 throw (std::move(err_str));
329 }
330 }
331
332 return temp_buf;
333}
int recvFD(int fd, char *buf, int data_size_byte, int flag)
receive data
Definition: DesSerPrePC.cc:60
void SetBuffer(int *hdr)
set buffer
Definition: SendHeader.cc:37
int GetNumEventsinPacket()
get contents of Header
Definition: SendHeader.cc:125

◆ recvFD()

int recvFD ( int  fd,
char *  buf,
int  data_size_byte,
int  flag 
)
protected

receive data

Definition at line 60 of file DesSerPrePC.cc.

61{
62 int n = 0;
63 while (1) {
64 int read_size = 0;
65 if ((read_size = recv(sock, (char*)buf + n, data_size_byte - n, flag)) < 0) {
66 if (errno == EINTR) {
67 continue;
68 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
69#ifdef NONSTOP
70 string err_str;
71 callCheckRunPause(err_str);
72#endif
73 continue;
74 } else {
75 perror("[WARNING]");
76 char err_buf[500];
77 sprintf(err_buf, "recv() returned error; ret = %d. : %s %s %d",
78 read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
79#ifdef NONSTOP
80 m_run_error = 1;
81 // B2ERROR(err_buf);
82 printf("[WARNING] %s\n", err_buf); fflush(stdout);
83 string err_str = "RUN_ERROR";
84 printf("AIUEO********************\n"); fflush(stdout);
85 throw (err_str);
86#endif
87 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
88 exit(-1);
89 }
90 } else if (read_size == 0) {
91 // Connection is closed ( error )
92 char err_buf[500];
93 sprintf(err_buf, "[WARNING] Connection is closed by peer(%s). readsize = %d %d. : %s %s %d",
94 strerror(errno), read_size, errno, __FILE__, __PRETTY_FUNCTION__, __LINE__);
95#ifdef NONSTOP
96 m_run_error = 1;
97 // B2ERROR(err_buf);
98 printf("%s\n", err_buf); fflush(stdout);
99 string err_str = "RUN_ERROR";
100 throw (err_str);
101#else
102 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
103 exit(-1);
104#endif
105 } else {
106 n += read_size;
107 if (n == data_size_byte)break;
108 }
109 }
110 return n;
111}

◆ Send()

int Send ( int  socket,
char *  buf,
int  size_bytes 
)
inherited

send buffer

Definition at line 328 of file DesSer.cc.

329{
330 int sent_bytes = 0;
331 while (true) {
332 int ret = 0;
333 if ((ret = send(socket,
334 buf + sent_bytes, size_bytes - sent_bytes, MSG_NOSIGNAL)) < 0) {
335 if (errno == EINTR) {
336 continue;
337 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
338#ifdef NONSTOP
339 string err_str;
340 callCheckRunPause(err_str);
341#endif
342 continue;
343 } else {
344 char err_buf[500];
345 sprintf(err_buf, "[WARNING] SEND ERROR.(%s)", strerror(errno));
346#ifdef NONSTOP
347 m_run_error = 1;
348 // B2ERROR(err_buf);
349 printf("%s\n", err_buf); fflush(stdout);
350 string err_str = "RUN_ERROR";
351 throw (err_str); // To exit this module, go to DeSerializer** and wait for run-resume.
352#else
353 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
354 exit(1);
355#endif
356 }
357 }
358 sent_bytes += ret;
359 if (sent_bytes == size_bytes) break;
360 }
361 return sent_bytes;
362}

◆ sendByWriteV()

int sendByWriteV ( RawDataBlockFormat * rawdblk)
inherited

Definition at line 227 of file DesSer.cc.

228{
229 SendHeader send_header;
230 SendTrailer send_trailer;
231 fillSendHeaderTrailer(&send_header, &send_trailer, rawdblk);
232
233 enum {
234 NUM_BUFFER = 3
235 };
236 struct iovec iov[ NUM_BUFFER ];
237
238 // check Body data size
239 int rawcopper_nwords = rawdblk->TotalBufNwords();
240
241 //Fill iov info.
242 iov[0].iov_base = (char*)send_header.GetBuffer();
243 iov[0].iov_len = sizeof(int) * send_header.GetHdrNwords();
244
245 iov[1].iov_base = (char*)rawdblk->GetWholeBuffer();
246 iov[1].iov_len = sizeof(int) * rawcopper_nwords;
247
248 iov[2].iov_base = (char*)send_trailer.GetBuffer();
249 iov[2].iov_len = sizeof(int) * send_trailer.GetTrlNwords();
250
251 // Send Multiple buffers
252 int n = 0;
253 while (true) {
254 if ((n = writev(m_socket_send, iov, NUM_BUFFER)) < 0) {
255 if (errno == EINTR) {
256 continue;
257 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
258#ifdef NONSTOP
259 string err_str;
260 callCheckRunPause(err_str);
261#endif
262 continue;
263 } else {
264 char err_buf[500];
265 sprintf(err_buf, "[WARNING] WRITEV error.(%s) : sent %d bytes, header %lu bytes body %lu trailer %lu\n",
266 strerror(errno), n, iov[0].iov_len, iov[1].iov_len, iov[2].iov_len);
267#ifdef NONSTOP
268 m_run_error = 1;
269 // B2ERROR(err_buf);
270 printf("%s\n", err_buf); fflush(stdout);
271 string err_str = "RUN_ERROR";
272 throw (err_str); // To exit this module, go to DeSerializer** and wait for run-resume.
273#else
274 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
275 exit(1);
276#endif
277 }
278 }
279 break;
280 }
281
282#ifdef DEBUG
283 printf("[DEBUG] *******BODY**********\n");
284 printf("[DEBUG] \n%.8d : ", 0);
285 printData((int*)(iov[1].iov_base), iov[1].iov_len);
286#endif
287
288
289 int total_send_bytes = sizeof(int) * send_header.GetTotalNwords();
290
291
292 //
293 // Retry sending
294 //
295 if (n != total_send_bytes) {
296 // B2WARNING("Serializer: Sent byte(" << n << "bytes) is not same as the event size (" << total_send_bytes << "bytes). Retrying...");
297 printf("[WARNING] Serializer: Sent byte( %d bytes) is not same as the event size ( %d bytes). Retrying...\n", n,
298 total_send_bytes); // back to WARNING due to request from Konno-san on 2018/7/3
299 fflush(stdout);
300
301 double retry_start = getTimeSec();
302 // Send Header
303 if (n < (int)(iov[ 0 ].iov_len)) {
304 n += Send(m_socket_send, (char*)iov[ 0 ].iov_base + n, iov[ 0 ].iov_len - n);
305 }
306
307 if (n < (int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len)) {
308 n += Send(m_socket_send, (char*)iov[ 1 ].iov_base + (n - iov[ 0 ].iov_len), iov[ 1 ].iov_len - (n - iov[ 0 ].iov_len));
309 }
310
311 if (n < (int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len + iov[ 2 ].iov_len)) {
312 n += Send(m_socket_send, (char*)iov[ 2 ].iov_base + (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len),
313 iov[ 2 ].iov_len - (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len));
314 }
315 double retry_end = getTimeSec();
316 // B2WARNING("Resending ends. It takes " << retry_end - retry_start << "(s)");
317 printf("[WARNING] Resending ends. It takes %lf (s)\n", retry_end - retry_start);
318 fflush(stdout); // back to WARNING due to request from Konno-san on 2018/7/3
319 }
320 // printf( "[DEBUG] n %d total %d\n", n, total_send_bytes);
321 // delete temp_buf;
322
323 return total_send_bytes;
324
325}
int Send(int socket, char *buf, int size_bytes)
send buffer
Definition: DesSer.cc:328
int * GetBuffer(void)
Get Header contents.
Definition: SendHeader.cc:32

◆ setRecvdBuffer()

void setRecvdBuffer ( RawDataBlockFormat * raw_datablk,
int *  delete_flag 
)
protected

attach buffer to RawDataBlock

Definition at line 336 of file DesSerPrePC.cc.

337{
338 //
339 // Get data from socket
340 //
341 int total_buf_nwords = 0 ;
342 int num_events_in_sendblock = 0;
343 int num_nodes_in_sendblock = 0;
344
345 if (m_start_flag == 0) {
346 // B2INFO("DeSerializerPrePC: Reading the 1st packet from eb0...");
347 printf("DeSerializerPrePC: Reading the 1st packet from eb0...\n"); fflush(stdout);
348 }
349 int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
350 &num_nodes_in_sendblock);
351 if (m_start_flag == 0) {
352 // B2INFO("DeSerializerPrePC: Done. the size of the 1st packet " << total_buf_nwords << " words");
353 printf("DeSerializerPrePC: Done. the size of the 1st packet %d words\n", total_buf_nwords); fflush(stdout);
354 m_start_flag = 1;
355 }
356 m_recvd_totbytes += total_buf_nwords * sizeof(int);
357
358 temp_raw_datablk->SetBuffer((int*)temp_buf, total_buf_nwords, *delete_flag,
359 num_events_in_sendblock, num_nodes_in_sendblock);
360
361 //
362 // check even # and node # in one Sendblock
363 //
364 int num_entries = temp_raw_datablk->GetNumEntries();
365 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
366 char err_buf[500];
367 sprintf(err_buf,
368 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
369 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
370 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
371 sleep(1234567);
372 exit(-1);
373 }
374 return;
375
376}
int * recvData(int *delete_flag, int *total_m_size_word, int *num_events_in_sendblock, int *num_nodes_in_sendblock)
receive data
Definition: DesSerPrePC.cc:193

◆ shmOpen()

void shmOpen ( char *  path_cfg,
char *  path_sta 
)
inherited

open shared memory

Definition at line 679 of file DesSer.cc.

681{
682 errno = 0;
683 /*m_shmfd_cfg = shm_open( "/cpr_config2", O_CREAT | O_EXCL | O_RDWR, 0666);
684 if (m_shmfd_cfg < 0) {
685 if (errno != EEXIST) {
686 perror("shm_open1");
687 exit(1);
688 }
689 m_shmfd_cfg = shm_open(path_cfg, O_RDWR, 0666);
690 if (m_shmfd_cfg < 0) {
691 printf( "[DEBUG] %s\n", path_cfg);
692 perror("[ERROR] shm_open2");
693 exit(1);
694 }
695 */
696 //}
697 /*
698 m_shmfd_sta = shm_open( "/cpr_status2", O_CREAT | O_EXCL | O_RDWR, 0666);
699 if (m_shmfd_sta < 0) {
700 if (errno != EEXIST) {
701 perror("shm_open1");
702 exit(1);
703 }
704 m_shmfd_sta = shm_open(path_sta , O_RDWR, 0666);
705 if (m_shmfd_sta < 0) {
706 printf( "[DEBUG] %s\n", path_sta);
707 perror("[ERROR] shm_open2");
708 exit(1);
709 }
710 //}
711 int size = 4 * sizeof(int);
712 ftruncate(m_shmfd_cfg, size);
713 ftruncate(m_shmfd_sta, size);
714 */
715}

Member Data Documentation

◆ cur_time

double cur_time
inherited

for time monitoring

Definition at line 102 of file DesSer.h.

◆ event_diff

int event_diff
protected, inherited

Definition at line 256 of file DesSer.h.

◆ m_bufary

int* m_bufary[NUM_PREALLOC_BUF]
inherited

buffer

Definition at line 85 of file DesSer.h.

◆ m_buffer

int* m_buffer
inherited

buffer

Definition at line 82 of file DesSer.h.

◆ m_cfg_buf

int* m_cfg_buf
inherited

buffer for shared memory

Definition at line 221 of file DesSer.h.

◆ m_cfg_sta

int* m_cfg_sta
inherited

buffer for shared memory

Definition at line 224 of file DesSer.h.

◆ m_compressionLevel

int m_compressionLevel
inherited

Compression Level.

Definition at line 67 of file DesSer.h.

◆ m_data_type

int m_data_type
inherited

data type

Definition at line 153 of file DesSer.h.

◆ m_dump_fname

std::string m_dump_fname
inherited

dump filename

Definition at line 76 of file DesSer.h.

◆ m_exp_no

int m_exp_no
inherited

exp no.

Definition at line 150 of file DesSer.h.

◆ m_exprunsubrun_no

unsigned int m_exprunsubrun_no
inherited

run no.

Definition at line 144 of file DesSer.h.

◆ m_fp_dump

FILE* m_fp_dump
inherited

dump file descriptor

Definition at line 79 of file DesSer.h.

◆ m_hostname_from

std::vector<std::string> m_hostname_from
protected, inherited

Receiver basf2 Socket.

hostname of upstream Data Sources

Definition at line 243 of file DesSer.h.

◆ m_hostname_local

std::string m_hostname_local
protected, inherited

Destination Host.

Definition at line 275 of file DesSer.h.

◆ m_ncycle

int m_ncycle
inherited

Definition at line 100 of file DesSer.h.

◆ m_nodeid

int m_nodeid
inherited

Node ID for SLC.

Definition at line 138 of file DesSer.h.

◆ m_nodename

std::string m_nodename
inherited

Node Name for SLC.

Definition at line 135 of file DesSer.h.

◆ m_num_connections

int m_num_connections
protected, inherited

check data contents

# of connections

Definition at line 237 of file DesSer.h.

◆ m_num_usedbuf

int m_num_usedbuf
inherited

# of already used buffers

Definition at line 132 of file DesSer.h.

◆ m_port_from

std::vector<int> m_port_from
protected, inherited

port # to connect data sources

Definition at line 246 of file DesSer.h.

◆ m_port_to

int m_port_to
protected, inherited

Destination port.

Definition at line 278 of file DesSer.h.

◆ m_pre_rawcpr

PreRawCOPPERFormat_v2 m_pre_rawcpr
inherited

report status to SLC

Use ver.2 for the moment(ver.4 -> PCI40)

Definition at line 191 of file DesSer.h.

◆ m_prev_copper_ctr

unsigned int m_prev_copper_ctr
protected, inherited

Definition at line 258 of file DesSer.h.

◆ m_prev_evenum

unsigned int m_prev_evenum
protected, inherited

Definition at line 260 of file DesSer.h.

◆ m_prev_exprunsubrun_no

unsigned int m_prev_exprunsubrun_no
inherited

run no.

Definition at line 147 of file DesSer.h.

◆ m_prev_nevt

int m_prev_nevt
inherited

No. of prev sent events.

Definition at line 73 of file DesSer.h.

◆ m_prev_time

double m_prev_time
inherited

Definition at line 105 of file DesSer.h.

◆ m_recvd_prev_totbytes

double m_recvd_prev_totbytes
inherited

Definition at line 97 of file DesSer.h.

◆ m_recvd_totbytes

double m_recvd_totbytes
inherited

Definition at line 96 of file DesSer.h.

◆ m_run_error

int m_run_error
inherited

flag to show that there are some errors with which DAQ cannot continue.

Definition at line 230 of file DesSer.h.

◆ m_run_pause

int m_run_pause
inherited

flag to show that run-controller pauses a run

Definition at line 227 of file DesSer.h.

◆ m_sent_prev_totbytes

double m_sent_prev_totbytes
inherited

Definition at line 99 of file DesSer.h.

◆ m_sent_totbytes

double m_sent_totbytes
inherited

Definition at line 98 of file DesSer.h.

◆ m_shmfd_cfg

int m_shmfd_cfg
inherited

file descriptor for shm

Definition at line 168 of file DesSer.h.

◆ m_shmfd_sta

int m_shmfd_sta
inherited

file descripter for shm

Definition at line 171 of file DesSer.h.

◆ m_shmflag

int m_shmflag
inherited

Use shared memory.

Definition at line 159 of file DesSer.h.

◆ m_socket_recv

std::vector<int> m_socket_recv
protected, inherited

Definition at line 248 of file DesSer.h.

◆ m_socket_send

int m_socket_send
protected, inherited

Receiver Socket.

Definition at line 267 of file DesSer.h.

◆ m_start_flag

int m_start_flag
inherited

start flag

Definition at line 181 of file DesSer.h.

◆ m_start_time

double m_start_time
inherited

Definition at line 104 of file DesSer.h.

◆ m_status

RunInfoBuffer m_status
inherited

Run info buffer.

Definition at line 141 of file DesSer.h.

◆ m_t0

timeval m_t0
inherited

Definition at line 95 of file DesSer.h.

◆ m_trunc_mask

int m_trunc_mask
inherited

trunc mask

Definition at line 156 of file DesSer.h.

◆ max_nevt

int max_nevt
inherited

# of events in a run

Definition at line 61 of file DesSer.h.

◆ max_seconds

double max_seconds
inherited

time to stop a run

Definition at line 64 of file DesSer.h.

◆ monitor_numeve

int monitor_numeve
inherited

buffer for shared memory

buffer for shared memory

Definition at line 178 of file DesSer.h.

◆ n_basf2evt

int n_basf2evt
inherited

No. of sent events.

Definition at line 70 of file DesSer.h.

◆ p_method

std::string p_method
protected, inherited

EvtSocket.

How to handle data

Definition at line 284 of file DesSer.h.

◆ p_method_val

int p_method_val
protected, inherited

Definition at line 285 of file DesSer.h.

◆ prev_event

int prev_event
inherited

Definition at line 106 of file DesSer.h.

◆ print_err

CprErrorMessage print_err
inherited

wrapper for B2LOG system

Definition at line 184 of file DesSer.h.

◆ tmp_header

RawHeader_v2 tmp_header
protected, inherited

which format is used

Definition at line 323 of file DesSer.h.


The documentation for this class was generated from the following files: