9#include <daq/rawdata/modules/DeSerializerPC.h> 
   10#include <daq/dataobjects/SendHeader.h> 
   11#include <daq/dataobjects/SendTrailer.h> 
   12#include <rawdata/dataobjects/RawTLU.h> 
   15#include <netinet/tcp.h> 
   22#define USE_DESERIALIZER_PREPC 
   40#ifndef REDUCED_RAWCOPPER 
   41#ifdef USE_DESERIALIZER_PREPC 
   56  B2INFO(
"DeSerializerPC: Constructor done.");
 
   63DeSerializerPCModule::~DeSerializerPCModule()
 
   71  B2INFO(
"DeSerializerPC: initialize() started.");
 
   75    B2FATAL(
"[FATAL] Hostname or port# is not specified for all connections. Please check a python script. Exiting... \n");
 
   83  for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
 
   90  for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
 
   95  m_eventMetaDataPtr.registerInDataStore();
 
   97  raw_datablkarray.registerInDataStore();
 
   98  rawcprarray.registerInDataStore();
 
   99  raw_ftswarray.registerInDataStore();
 
  109  memset(time_array0, 0, 
sizeof(time_array0));
 
  110  memset(time_array1, 0, 
sizeof(time_array1));
 
  111  memset(time_array2, 0, 
sizeof(time_array2));
 
  127  m_prev_copper_ctr = 0xFFFFFFFF;
 
  128  m_prev_evenum = 0xFFFFFFFF;
 
  134  B2INFO(
"DeSerializerPC: initialize() done.");
 
  143    if ((read_size = recv(sock, (
char*)buf + n, data_size_byte - n, flag)) < 0) {
 
  144      if (errno == EINTR) {
 
  146      } 
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
 
  150        callCheckRunPause(err_str);
 
  155        sprintf(err_buf, 
"[WARNING] recv() returned error; ret = %d. : %s %s %d",
 
  156                read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
 
  160        string err_str = 
"RUN_ERROR";
 
  166    } 
else if (read_size == 0) {
 
  168      sprintf(err_buf, 
"[WARNING] Connection is closed by peer(%s).:  %s %s %d",
 
  169              strerror(errno), __FILE__, __PRETTY_FUNCTION__, __LINE__);
 
  173      string err_str = 
"RUN_ERROR";
 
  180      if (n == data_size_byte)
break;
 
  193    struct sockaddr_in socPC;
 
  194    socPC.sin_family = AF_INET;
 
  196    struct hostent* host;
 
  200      sprintf(err_buf, 
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", 
m_hostname_from[ i ].c_str(),
 
  206    socPC.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
 
  208    int sd = socket(PF_INET, SOCK_STREAM, 0);
 
  211    setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1, 
sizeof(val1));
 
  213    struct timeval timeout;
 
  216    setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)
sizeof(timeout));
 
  220      if (connect(sd, (
struct sockaddr*)(&socPC), 
sizeof(socPC)) < 0) {
 
  221        perror(
"Failed to connect. Retrying...");
 
  224        printf(
"[DEBUG] Done\n");
 
  235    getsockopt(
m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
 
  237    printf(
"[DEBUG] SO_RCVBUF %d\n", val);
 
  239    getsockopt(
m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
 
  241    printf(
"[DEBUG] SO_SNDBUF %d\n", val);
 
  243    getsockopt(
m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
 
  245    printf(
"[DEBUG] TCP_MAXSEG %d\n", val);
 
  247    getsockopt(
m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
 
  249    printf(
"[DEBUG] TCP_NODELAY %d\n", val);
 
  253      memset(&sa, 0, 
sizeof(sockaddr_in));
 
  254      socklen_t sa_len = 
sizeof(sa);
 
  255      if (getsockname(
m_socket[i], (
struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
 
  256        g_status.setInputPort(ntohs(sa.sin_port));
 
  257        g_status.setInputAddress(sa.sin_addr.s_addr);
 
  262  printf(
"[DEBUG] Initialization finished\n");
 
  269                                    int* num_nodes_in_sendblock)
 
  272  int* temp_buf = NULL; 
 
  275  vector <int> each_buf_nwords;
 
  276  each_buf_nwords.clear();
 
  277  vector <int> each_buf_nodes;
 
  278  each_buf_nodes.clear();
 
  279  vector <int> each_buf_events;
 
  280  each_buf_events.clear();
 
  282  *total_buf_nwords = 0;
 
  283  *num_nodes_in_sendblock = 0;
 
  284  *num_events_in_sendblock = 0;
 
  289  int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
 
  292  for (
int i = 0; i < (int)(
m_socket.size()); i++) {
 
  294    recvFD(
m_socket[ i ], (
char*)send_hdr_buf, 
sizeof(
int)*SendHeader::SENDHDR_NWORDS, flag);
 
  300    int temp_num_nodes = send_hdr.GetNumNodesinPacket();
 
  305      *num_events_in_sendblock = temp_num_events;
 
  306    } 
else if (*num_events_in_sendblock != temp_num_events) {
 
  309      printf(
"[DEBUG] *******HDR**********\n");
 
  310      printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
 
  313              "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
 
  314              *num_events_in_sendblock, temp_num_events, i,  *num_nodes_in_sendblock, temp_num_nodes, i);
 
  321    *num_nodes_in_sendblock += temp_num_nodes;
 
  323    int rawblk_nwords = send_hdr.GetTotalNwords()
 
  324                        - SendHeader::SENDHDR_NWORDS
 
  325                        - SendTrailer::SENDTRL_NWORDS;
 
  326    *total_buf_nwords += rawblk_nwords;
 
  331    if (rawblk_nwords > (
int)(2.5e6) || rawblk_nwords <= 0) {
 
  332      printf(
"[DEBUG] *******HDR**********\n");
 
  337      sprintf(err_buf, 
"[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
 
  338              send_hdr.GetTotalNwords());
 
  345    each_buf_nwords.push_back(rawblk_nwords);
 
  346    each_buf_events.push_back(temp_num_events);
 
  347    each_buf_nodes.push_back(temp_num_nodes);
 
  352  temp_buf = 
getNewBuffer(*total_buf_nwords, delete_flag); 
 
  356  int total_recvd_byte = 0;
 
  357  for (
int i = 0; i < (int)(
m_socket.size()); i++) {
 
  360      total_recvd_byte += 
recvFD(
m_socket[ i ], (
char*)temp_buf + total_recvd_byte,
 
  361                                 each_buf_nwords[ i ] * 
sizeof(
int), flag);
 
  362    } 
catch (
const string& err_str) {
 
  364        B2WARNING(
"Delete buffer before going to Run-pause state");
 
  374    for (
int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
 
  375      int this_length = *((
int*)((
char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] * 
sizeof(int) + temp_length));
 
  376      temp_length += this_length * 
sizeof(int);
 
  378    if (temp_length != (
int)(each_buf_nwords[ i ] * 
sizeof(int))) {
 
  379      printf(
"[DEBUG]*******SENDHDR*********** \n");
 
  380      printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
 
  381      printf(
"[DEBUG]*******BODY***********\n ");
 
  382      printData(temp_buf, (
int)(total_recvd_byte / 
sizeof(
int)));
 
  384      sprintf(err_buf, 
"[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
 
  385              (
int)(*total_buf_nwords * 
sizeof(
int)), temp_length);
 
  393  if ((
int)(*total_buf_nwords * 
sizeof(
int)) != total_recvd_byte) {
 
  395    sprintf(err_buf, 
"[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
 
  396            total_recvd_byte, (
int)(*total_buf_nwords * 
sizeof(
int)));
 
  403  int send_trl_buf[(
unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
 
  404  for (
int i = 0; i < (int)(
m_socket.size()); i++) {
 
  406      recvFD(
m_socket[ i ], (
char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS * 
sizeof(
int), flag);
 
  407    } 
catch (
const string& err_str) {
 
  409        B2WARNING(
"Delete buffer before going to Run-pause state");
 
  425  int total_buf_nwords = 0 ;
 
  426  int num_events_in_sendblock = 0;
 
  427  int num_nodes_in_sendblock = 0;
 
  429  if (
m_start_flag == 0) B2INFO(
"DeSerializerPC: Reading the 1st packet from eb0...");
 
  431  int* temp_buf = 
recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
 
  432                           &num_nodes_in_sendblock);
 
  434    B2INFO(
"DeSerializerPC: Done. the size of the 1st packet " << total_buf_nwords << 
" words");
 
  437  m_totbytes += total_buf_nwords * 
sizeof(int);
 
  442  temp_raw_datablk->
SetBuffer((
int*)temp_buf, total_buf_nwords, 0,
 
  444                              num_events_in_sendblock, num_nodes_in_sendblock);
 
  450  if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
 
  453            "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
 
  454            num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
 
  466                                     unsigned int* subrun_copper_0, 
unsigned int* eve_copper_0, 
unsigned int* error_bit_flag)
 
  475  int* temp_buf = raw_datablk->
GetBuffer(0);
 
  477  unsigned int cur_evenum = 0, cur_copper_ctr = 0;
 
  478  unsigned int eve_array[32]; 
 
  479  unsigned int utime_array[32];
 
  480  unsigned int ctime_type_array[32];
 
  483  unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
 
  487    memset(eve_array, 0, 
sizeof(eve_array));
 
  488    memset(utime_array, 0, 
sizeof(utime_array));
 
  489    memset(ctime_type_array, 0, 
sizeof(ctime_type_array));
 
  491    int num_nodes_in_sendblock = raw_datablk->
GetNumNodes();
 
  492    for (
int l = 0; l < num_nodes_in_sendblock; l++) {
 
  493      int entry_id = l + k * num_nodes_in_sendblock;
 
  502        if (temp_rawftsw->
GetEveNo(block_id) < 10) {
 
  503          printf(
"[DEBUG] ######FTSW#########\n");
 
  510        utime_ftsw = temp_rawftsw->
GetTTUtime(block_id);
 
  517          eve_array[ entry_id ] = cur_evenum;
 
  518        } 
catch (
const string& err_str) {
 
  523        utime_array[ entry_id ] = temp_rawftsw->
GetTTUtime(block_id);
 
  530      } 
else if (raw_datablk->
CheckTLUID(entry_id)) {
 
  537          printf(
"[DEBUG] ######TLU#########\n");
 
  543          temp_rawtlu->
CheckData(0, m_prev_evenum, &cur_evenum);
 
  544          eve_array[ entry_id ] = cur_evenum;
 
  545        } 
catch (
const string& err_str) {
 
  564        (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
 
  565        (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
 
  566        (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
 
  572          temp_rawcopper->
CheckData(0, m_prev_evenum, &cur_evenum,
 
  573                                    m_prev_copper_ctr, &cur_copper_ctr,
 
  575          eve_array[ entry_id ] = cur_evenum;
 
  576        } 
catch (
const string& err_str) {
 
  583        utime_array[ entry_id ] = temp_rawcopper->
GetTTUtime(0);
 
  589          *eve_copper_0 = temp_rawcopper->
GetEveNo(0);
 
  590          *exp_copper_0 = temp_rawcopper->
GetExpNo(0);
 
  591          *run_copper_0 = temp_rawcopper->
GetRunNo(0);
 
  593        } 
else if (cpr_num == 1) {
 
  601        delete temp_rawcopper;
 
  607    for (
int l = 1; l < num_nodes_in_sendblock; l++) {
 
  608      if (eve_array[ 0 ] != eve_array[ l ]) {
 
  613        for (
int m = 0; m < num_nodes_in_sendblock; m++) {
 
  614          printf(
"[DEBUG] node %d eve # %u utime %x ctime %x\n",
 
  615                 m,  eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
 
  617        sprintf(err_buf, 
"[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
 
  627      printf(
"[DEBUG] ##############################################\n");
 
  629        printf(
"[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
 
  631      printf(
"[DEBUG] ##############################################\n");
 
  634    m_prev_evenum = cur_evenum;
 
  635    m_prev_copper_ctr = cur_copper_ctr;
 
  642void DeSerializerPCModule::waitResume()
 
  647    printf(
"###########(Ser) Waiting for Resume ###############\n");
 
  651    if (checkRunRecovery()) {
 
  670  if (error_flag & RawHeader_latest::B2LINK_PACKET_CRC_ERROR) {
 
  674  if (error_flag & RawHeader_latest::B2LINK_EVENT_CRC_ERROR) {
 
  678  if (error_set)  B2INFO(
"Raw2Ds: Error flag was set in EventMetaData.");
 
  686  unsigned int exp_copper_0 = 0;
 
  687  unsigned int run_copper_0 = 0;
 
  688  unsigned int subrun_copper_0 = 0;
 
  689  unsigned int eve_copper_0 = 0;
 
  690  unsigned int error_bit_flag = 0;
 
  700      B2INFO(
"DeSerializerPC: Waiting for Start...\n");
 
  709  if (g_run_pause > 0 ||  g_run_error > 0) {
 
  710    if (g_run_pause == 0) {
 
  712        if (checkRunPause()) 
break;
 
  715        printf(
"###########(DeserializerPC) Waiting for Runpause()  ###############\n");
 
  723    m_eventMetaDataPtr.create(); 
 
  735  for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
 
  744      checkData(&temp_rawdatablk, &exp_copper_0, &run_copper_0, &subrun_copper_0, &eve_copper_0, &error_bit_flag);
 
  745    } 
catch (
const string& err_str) {
 
  748      if (err_str == 
"RUN_PAUSE" || err_str == 
"RUN_ERROR") {
 
  749        m_eventMetaDataPtr.create();
 
  753      print_err.PrintError((
char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
 
  757    RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
 
  762  if (buf_rc != NULL) {
 
  769  m_eventMetaDataPtr.create();
 
  770  m_eventMetaDataPtr->setExperiment(exp_copper_0);
 
  771  m_eventMetaDataPtr->setRun(run_copper_0);
 
  772  m_eventMetaDataPtr->setSubrun(subrun_copper_0);
 
  773  m_eventMetaDataPtr->setEvent(eve_copper_0);
 
  775  setErrorFlag(error_bit_flag, m_eventMetaDataPtr);
 
  776  if (error_bit_flag != 0) {
 
  778    printf(
"[ERROR] error bit was detected. exp %u run %u eve %u count = %u\n",
 
  779           exp_copper_0, run_copper_0, eve_copper_0, error_bit_flag);
 
  801      printf(
"[DEBUG] RunStop was detected. ( Setting:  Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
 
  803      m_eventMetaDataPtr->setEndOfData();
 
  808    RateMonitor(eve_copper_0, subrun_copper_0, run_copper_0);
 
  812    g_status.setInputNBytes(m_totbytes);
 
A class definition of an input module for Sequential ROOT I/O.
void RateMonitor(unsigned int nevt, int subrun=-1, int run=-1)
monitor rate
unsigned int m_prev_exprunsubrun_no
run no.
std::string m_nodename
Node name.
int m_start_flag
start flag
int BUF_SIZE_WORD
size of buffer for one event (word)
static RunInfoBuffer g_status
buffer class to communicate with NSM client
int n_basf2evt
No. of sent events.
CprErrorMessage print_err
wrapper for B2LOG system
virtual void printData(int *buf, int nwords)
dump error data
virtual int * getNewBuffer(int nwords, int *delete_flag)
Getbuffer.
std::string m_dump_fname
dump filename
int m_shmflag
Use shared memory.
int * m_bufary[NUM_PREALLOC_BUF]
buffer
unsigned int m_exprunsubrun_no
run no.
virtual void printASCIIData(int *buf, int nwords)
dump error data
virtual void openOutputFile()
Module functions to be called from event process.
double max_seconds
time to stop a run
int m_nodeid
Node(PC or COPPER) ID.
double getTimeSec()
store time info.
std::vector< int > m_socket
Receiver Socket.
virtual int * recvData(int *delete_flag, int *total_m_size_word, int *num_events_in_sendblock, int *num_nodes_in_sendblock)
receive data
void initialize() override
Module functions to be called from main process.
void event() override
Module functions to be called from event process.
std::vector< int > m_port_from
port # to connect data sources
virtual int Connect()
Accept connection.
virtual int recvFD(int fd, char *buf, int data_size_byte, int flag)
receive data
virtual void setRecvdBuffer(RawDataBlock *raw_datablk, int *delete_flag)
attach buffer to RawDataBlock
DeSerializerPCModule()
Constructor / Destructor.
virtual void checkData(RawDataBlock *raw_datablk, unsigned int *exp_copper_0, unsigned int *run_copper_0, unsigned int *subrun_copper_0, unsigned int *eve_copper_0, unsigned int *error_bit_flag)
check data contents
std::vector< std::string > m_hostname_from
hostname of upstream Data Sources
void setDescription(const std::string &description)
Sets the description of the module.
The Raw COPPER class. This class stores data received by COPPER via belle2link. Data from all detector...
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freed( = 0 )/ not freed( = 1 ) in Destructor )
The RawDataBlock class Base class for rawdata handling.
virtual void PrintData(int *buf, int nwords)
print data
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual int GetBufferPos(int n)
get position of data block in word
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freed( = 0 )/ not freed( = 1 ) in Destructor )
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int CheckTLUID(int n)
get TLU ID to check whether this data block is TLU data or not
virtual int GetBlockNwords(int n)
get size of a data block
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
virtual int GetNumEvents()
get # of events in m_buffer
virtual int TotalBufNwords()
Get total length of m_buffer.
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
unsigned int GetExpRunSubrun(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
unsigned int GetEveNo(int n)
Get event #.
unsigned int GetTTUtime(int n)
get unixtime of the trigger
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freed( = 0 )/ not freed( = 1 ) in Destructor )
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check the data contents
The Raw TLU class. Class for data from DAQ PC for TLU (Trigger Logic Unit). It is supposed to be used on...
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
unsigned int GetEveNo(int n)
Get Event #.
Type-safe access to single objects in the data store.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
unsigned int GetTTCtimeTRGType(int n)
Check if COPPER Magic words are correct.
int GetExpNo(int n)
get Experimental # from header
unsigned int GetEveNo(int n)
get subrun #(8bit)
unsigned int GetTTUtime(int n)
Check if COPPER Magic words are correct.
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check data contents
int GetSubRunNo(int n)
get run # (14bit)
unsigned int GetErrorBitFlag(int n)
get contents of header
int GetRunNo(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
Abstract base class for different kinds of events.