9#include <daq/rawdata/modules/DeSerializerPrePC.h> 
   10#include <daq/dataobjects/SendHeader.h> 
   11#include <daq/dataobjects/SendTrailer.h> 
   12#include <rawdata/dataobjects/RawTLU.h> 
   14#include <netinet/tcp.h> 
   44  B2INFO(
"DeSerializerPrePC: Constructor done.");
 
   49DeSerializerPrePCModule::~DeSerializerPrePCModule()
 
   57  B2INFO(
"DeSerializerPrePC: initialize() started.");
 
   60  for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
 
   67  for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
 
   72  m_eventMetaDataPtr.registerInDataStore();
 
   74  raw_datablkarray.registerInDataStore();
 
   75  rawcprarray.registerInDataStore();
 
   76  raw_ftswarray.registerInDataStore();
 
   85  memset(time_array0, 0, 
sizeof(time_array0));
 
   86  memset(time_array1, 0, 
sizeof(time_array1));
 
   87  memset(time_array2, 0, 
sizeof(time_array2));
 
  103  m_prev_copper_ctr = 0xFFFFFFFF;
 
  104  m_prev_evenum = 0xFFFFFFFF;
 
  106  B2INFO(
"DeSerializerPrePC: initialize() done.");
 
  115    if ((read_size = recv(sock, (
char*)buf + n, data_size_byte - n, flag)) < 0) {
 
  116      if (errno == EINTR) {
 
  120        sprintf(err_buf, 
"[FATAL] Failed to receive data(%s). Exiting...", strerror(errno));
 
  127      if (n == data_size_byte)
break;
 
  139    struct sockaddr_in socPC;
 
  140    socPC.sin_family = AF_INET;
 
  142    struct hostent* host;
 
  146      sprintf(err_buf, 
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", 
m_hostname_from[ i ].c_str(),
 
  152    socPC.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
 
  154    int sd = socket(PF_INET, SOCK_STREAM, 0);
 
  157    setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1, 
sizeof(val1));
 
  161      if (connect(sd, (
struct sockaddr*)(&socPC), 
sizeof(socPC)) < 0) {
 
  162        perror(
"Failed to connect. Retrying...");
 
  165        printf(
"[DEBUG] Done\n");
 
  174    getsockopt(
m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
 
  176    printf(
"[DEBUG] SO_RCVBUF %d\n", val);
 
  178    getsockopt(
m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
 
  180    printf(
"[DEBUG] SO_SNDBUF %d\n", val);
 
  182    getsockopt(
m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
 
  184    printf(
"[DEBUG] TCP_MAXSEG %d\n", val);
 
  186    getsockopt(
m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
 
  188    printf(
"[DEBUG] TCP_NODELAY %d\n", val);
 
  192      memset(&sa, 0, 
sizeof(sockaddr_in));
 
  193      socklen_t sa_len = 
sizeof(sa);
 
  194      if (getsockname(
m_socket[i], (
struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
 
  195        g_status.setInputPort(ntohs(sa.sin_port));
 
  196        g_status.setInputAddress(sa.sin_addr.s_addr);
 
  201  printf(
"[DEBUG] Initialization finished\n");
 
  208                                       int* num_nodes_in_sendblock)
 
  211  int* temp_buf = NULL; 
 
  214  vector <int> each_buf_nwords;
 
  215  each_buf_nwords.clear();
 
  216  vector <int> each_buf_nodes;
 
  217  each_buf_nodes.clear();
 
  218  vector <int> each_buf_events;
 
  219  each_buf_events.clear();
 
  221  *total_buf_nwords = 0;
 
  222  *num_nodes_in_sendblock = 0;
 
  223  *num_events_in_sendblock = 0;
 
  228  int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
 
  231  for (
int i = 0; i < (int)(
m_socket.size()); i++) {
 
  233    recvFD(
m_socket[ i ], (
char*)send_hdr_buf, 
sizeof(
int)*SendHeader::SENDHDR_NWORDS, flag);
 
  239    int temp_num_nodes = send_hdr.GetNumNodesinPacket();
 
  244      *num_events_in_sendblock = temp_num_events;
 
  245    } 
else if (*num_events_in_sendblock != temp_num_events) {
 
  249              "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
 
  250              *num_events_in_sendblock, temp_num_events, i,  *num_nodes_in_sendblock, temp_num_nodes, i);
 
  257    *num_nodes_in_sendblock += temp_num_nodes;
 
  259    int rawblk_nwords = send_hdr.GetTotalNwords()
 
  260                        - SendHeader::SENDHDR_NWORDS
 
  261                        - SendTrailer::SENDTRL_NWORDS;
 
  262    *total_buf_nwords += rawblk_nwords;
 
  267    if (rawblk_nwords > (
int)(2.5e6) || rawblk_nwords <= 0) {
 
  268      printf(
"[DEBUG] *******HDR**********\n");
 
  269      printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
 
  271      sprintf(err_buf, 
"[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
 
  272              send_hdr.GetTotalNwords());
 
  279    each_buf_nwords.push_back(rawblk_nwords);
 
  280    each_buf_events.push_back(temp_num_events);
 
  281    each_buf_nodes.push_back(temp_num_nodes);
 
  286  temp_buf = 
getNewBuffer(*total_buf_nwords, delete_flag); 
 
  290  int total_recvd_byte = 0;
 
  291  for (
int i = 0; i < (int)(
m_socket.size()); i++) {
 
  292    total_recvd_byte += 
recvFD(
m_socket[ i ], (
char*)temp_buf + total_recvd_byte,
 
  293                               each_buf_nwords[ i ] * 
sizeof(
int), flag);
 
  298    unsigned temp_length = 0;
 
  299    for (
int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
 
  300      int this_length = *((
int*)((
char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] * 
sizeof(int) + temp_length));
 
  301      temp_length += this_length * 
sizeof(int);
 
  303    if (temp_length != each_buf_nwords[ i ] * 
sizeof(
int)) {
 
  304      printf(
"[DEBUG]*******SENDHDR*********** \n");
 
  305      printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
 
  306      printf(
"[DEBUG]*******BODY***********\n ");
 
  307      printData(temp_buf, (
int)(total_recvd_byte / 
sizeof(
int)));
 
  309      sprintf(err_buf, 
"[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %u. Exting...",
 
  310              (
int)(*total_buf_nwords * 
sizeof(
int)), temp_length);
 
  318  if ((
int)(*total_buf_nwords * 
sizeof(
int)) != total_recvd_byte) {
 
  320    sprintf(err_buf, 
"[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
 
  321            total_recvd_byte, (
int)(*total_buf_nwords * 
sizeof(
int)));
 
  328  int send_trl_buf[(
unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
 
  329  for (
int i = 0; i < (int)(
m_socket.size()); i++) {
 
  330    recvFD(
m_socket[ i ], (
char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS * 
sizeof(
int), flag);
 
  342  int total_buf_nwords = 0 ;
 
  343  int num_events_in_sendblock = 0;
 
  344  int num_nodes_in_sendblock = 0;
 
  346  if (
m_start_flag == 0) B2INFO(
"DeSerializerPrePC: Reading the 1st packet from eb0...");
 
  347  int* temp_buf = 
recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
 
  348                           &num_nodes_in_sendblock);
 
  350    B2INFO(
"DeSerializerPrePC: Done. the size of the 1st packet " << total_buf_nwords << 
" words");
 
  353  m_totbytes += total_buf_nwords * 
sizeof(int);
 
  355  temp_raw_datablk->
SetBuffer((
int*)temp_buf, total_buf_nwords, *delete_flag,
 
  356                              num_events_in_sendblock, num_nodes_in_sendblock);
 
  362  if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
 
  365            "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
 
  366            num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
 
  382  int* temp_buf = raw_datablk->
GetBuffer(0);
 
  384  unsigned int cur_evenum = 0, cur_copper_ctr = 0;
 
  385  unsigned int eve_array[32]; 
 
  386  unsigned int utime_array[32];
 
  387  unsigned int ctime_type_array[32];
 
  390  unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
 
  394    memset(eve_array, 0, 
sizeof(eve_array));
 
  395    memset(utime_array, 0, 
sizeof(utime_array));
 
  396    memset(ctime_type_array, 0, 
sizeof(ctime_type_array));
 
  398    int num_nodes_in_sendblock = raw_datablk->
GetNumNodes();
 
  399    for (
int l = 0; l < num_nodes_in_sendblock; l++) {
 
  400      int entry_id = l + k * num_nodes_in_sendblock;
 
  410        if (temp_rawftsw->
GetEveNo(block_id) < 10) {
 
  411          printf(
"[DEBUG] ######FTSW#########\n");
 
  418        utime_ftsw = temp_rawftsw->
GetTTUtime(block_id);
 
  425          eve_array[ entry_id ] = cur_evenum;
 
  426        } 
catch (
const string& err_str) {
 
  431        utime_array[ entry_id ] = temp_rawftsw->
GetTTUtime(block_id);
 
  438      } 
else if (raw_datablk->
CheckTLUID(entry_id)) {
 
  445          printf(
"[DEBUG] ######TLU#########\n");
 
  451          temp_rawtlu->
CheckData(0, m_prev_evenum, &cur_evenum);
 
  452          eve_array[ entry_id ] = cur_evenum;
 
  453        } 
catch (
const string& err_str) {
 
  473        (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
 
  474        (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
 
  475        (temp_rawcopper->
GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
 
  483          temp_rawcopper->
CheckData(0, m_prev_evenum, &cur_evenum,
 
  484                                    m_prev_copper_ctr, &cur_copper_ctr,
 
  486          eve_array[ entry_id ] = cur_evenum;
 
  487        } 
catch (
const string& err_str) {
 
  493        utime_array[ entry_id ] = temp_rawcopper->
GetTTUtime(0);
 
  498          *eve_copper_0 = (raw_datablk->
GetBuffer(entry_id))[ 3 ];
 
  499        } 
else if (cpr_num == 1) {
 
  503        delete temp_rawcopper;
 
  512    for (
int l = 1; l < num_nodes_in_sendblock; l++) {
 
  513      if (eve_array[ 0 ] != eve_array[ l ] ||
 
  514          utime_array[ 0 ] != utime_array[ l ] ||
 
  515          ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
 
  517        for (
int m = 0; m < num_nodes_in_sendblock; m++) {
 
  518          printf(
"[DEBUG] node %d eve # %u utime %x ctime %x\n",
 
  519                 m,  eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
 
  521        sprintf(err_buf, 
"[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
 
  538    m_prev_evenum = cur_evenum;
 
  539    m_prev_copper_ctr = cur_copper_ctr;
 
  551  unsigned int eve_copper_0 = 0;
 
  561      B2INFO(
"DeSerializerPrePC: Waiting for Start...\n");
 
  574  for (
int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
 
  580    int delete_flag_from = 0, delete_flag_to = 0;
 
  584    checkData(&temp_rawdatablk, &eve_copper_0);
 
  591    int temp_num_events, temp_num_nodes;
 
  594#ifdef REDUCED_RAWCOPPER 
  603    int calced_temp_nwords_to = m_pre_rawcpr.
CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
 
  604    buf_to = 
getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
 
  607    m_pre_rawcpr.
CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
 
  608    if (calced_temp_nwords_to != temp_nwords_to) {
 
  611              "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
 
  612              calced_temp_nwords_to, temp_nwords_to);
 
  613      print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
 
  622    delete_flag_to = delete_flag_from;
 
  623    delete_flag_from = 0; 
 
  630    RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
 
  634    raw_datablk->
SetBuffer(buf_to, temp_nwords_to, delete_flag_to,
 
  635                           temp_num_events, temp_num_nodes);
 
  641#ifdef REDUCED_RAWCOPPER 
  643    post_rawcopper_latest.
SetBuffer(buf_to, temp_nwords_to,
 
  644                                    0, temp_num_events, temp_num_nodes);
 
  646    for (
int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
 
  649        post_rawcopper_latest.
CheckCRC16(block_num, i_finesse_num);
 
  661  m_eventMetaDataPtr.create();
 
  662  m_eventMetaDataPtr->setExperiment(1);
 
  663  m_eventMetaDataPtr->setRun(1);
 
  687      printf(
"[DEBUG] RunStop was detected. ( Setting:  Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
 
  689      m_eventMetaDataPtr->setEndOfData();
 
  700    g_status.setInputNBytes(m_totbytes);
 
A class definition of an input module for Sequential ROOT I/O.
void RateMonitor(unsigned int nevt, int subrun=-1, int run=-1)
monitor rate
unsigned int m_prev_exprunsubrun_no
run no.
std::string m_nodename
Node name.
int m_start_flag
start flag
int BUF_SIZE_WORD
size of buffer for one event (word)
static RunInfoBuffer g_status
buffer class to communicate with NSM client
int n_basf2evt
No. of sent events.
CprErrorMessage print_err
wrapper for B2LOG system
virtual void printData(int *buf, int nwords)
dump error data
virtual int * getNewBuffer(int nwords, int *delete_flag)
Getbuffer.
std::string m_dump_fname
dump filename
int m_shmflag
Use shared memory.
int * m_bufary[NUM_PREALLOC_BUF]
buffer
unsigned int m_exprunsubrun_no
run no.
virtual void openOutputFile()
Module functions to be called from event process.
double max_seconds
time to stop a run
int m_nodeid
Node(PC or COPPER) ID.
double getTimeSec()
store time info.
std::vector< int > m_socket
Receiver socket.
virtual int * recvData(int *delete_flag, int *total_m_size_word, int *num_events_in_sendblock, int *num_nodes_in_sendblock)
receive data
void initialize() override
Module functions to be called from main process.
void event() override
Module functions to be called from event process.
virtual void checkData(RawDataBlock *raw_datablk, unsigned int *eve_copper_0)
check data contents
std::vector< int > m_port_from
port # to connect data sources
virtual int Connect()
Accept connection.
virtual int recvFD(int fd, char *buf, int data_size_byte, int flag)
receive data
DeSerializerPrePCModule()
Constructor / Destructor.
virtual void setRecvdBuffer(RawDataBlock *raw_datablk, int *delete_flag)
attach buffer to RawDataBlock
std::vector< std::string > m_hostname_from
hostname of upstream Data Sources
void setDescription(const std::string &description)
Sets the description of the module.
The Raw COPPER class ver.
int CheckCRC16(int n, int finesse_num)
check magic words
The Raw COPPER class. This class stores data received by COPPER via belle2link. Data from all detector...
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freed ( = 0 ) / not freed ( = 1 ) in the destructor )
The RawDataBlock class. Base class for rawdata handling.
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual int GetBufferPos(int n)
get position of data block in word
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freed ( = 0 ) / not freed ( = 1 ) in the destructor )
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int CheckTLUID(int n)
get TLU ID to check whether this data block is TLU data or not
virtual int GetBlockNwords(int n)
get size of a data block
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
virtual int GetNumEvents()
get # of events in m_buffer
virtual int TotalBufNwords()
Get total length of m_buffer.
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
unsigned int GetExpRunSubrun(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
unsigned int GetEveNo(int n)
Get event #.
unsigned int GetTTUtime(int n)
get unixtime of the trigger
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freed ( = 0 ) / not freed ( = 1 ) in the destructor )
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check the data contents
The Raw TLU class. Class for data from DAQ PC for TLU (Trigger Logic Unit). It is supposed to be used on...
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
unsigned int GetEveNo(int n)
Get Event #.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
unsigned int GetTTUtime(int n)
get unixtime of the trigger
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check data contents
int GetFINESSENwords(int n, int finesse_num) OVERRIDE_CPP17
Get the size of a finesse buffer.
Abstract base class for different kinds of events.