Belle II Software  release-06-02-00
DeSerializerPrePC.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/rawdata/modules/DeSerializerPrePC.h>
10 #include <daq/dataobjects/SendHeader.h>
11 #include <daq/dataobjects/SendTrailer.h>
12 #include <rawdata/dataobjects/RawTLU.h>
13 
14 #include <netinet/tcp.h>
15 
16 //#define MAXEVTSIZE 400000000
17 #define CHECKEVT 5000
18 
19 //#define DEBUG
20 //#define NO_DATA_CHECK
21 //#define DUMHSLB
22 
23 using namespace std;
24 using namespace Belle2;
25 
26 //-----------------------------------------------------------------
27 // Register the Module
28 //-----------------------------------------------------------------
29 REG_MODULE(DeSerializerPrePC)
30 
31 //-----------------------------------------------------------------
32 // Implementation
33 //-----------------------------------------------------------------
34 
36 {
37  //Set module properties
38  setDescription("Encode DataStore into RingBuffer");
39  // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
40  addParam("NumConn", m_num_connections, "Number of Connections", 0);
41  addParam("HostNameFrom", m_hostname_from, "Hostnames of data sources");
42  addParam("PortFrom", m_port_from, "port numbers of data sources");
43 
44  B2INFO("DeSerializerPrePC: Constructor done.");
45 }
46 
47 
48 
49 DeSerializerPrePCModule::~DeSerializerPrePCModule()
50 {
51 
52 }
53 
54 
55 void DeSerializerPrePCModule::initialize()
56 {
57  B2INFO("DeSerializerPrePC: initialize() started.");
58 
59  // allocate buffer
60  for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
61  m_bufary[i] = new int[ BUF_SIZE_WORD ];
62  }
63  m_buffer = new int[ BUF_SIZE_WORD ];
64 
65 
66  // initialize buffer
67  for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
68  memset(m_bufary[i], 0, BUF_SIZE_WORD * sizeof(int));
69  }
70 
71  // Initialize EvtMetaData
72  m_eventMetaDataPtr.registerInDataStore();
73 
74  raw_datablkarray.registerInDataStore();
75  rawcprarray.registerInDataStore();
76  raw_ftswarray.registerInDataStore();
77 
78  // Initialize Array of RawCOPPER
79 
80  if (m_dump_fname.size() > 0) {
81  openOutputFile();
82  }
83 
84  // Initialize arrays for time monitor
85  memset(time_array0, 0, sizeof(time_array0));
86  memset(time_array1, 0, sizeof(time_array1));
87  memset(time_array2, 0, sizeof(time_array2));
88 
89  clearNumUsedBuf();
90 
91  // Shared memory
92  if (m_shmflag > 0) {
93  if (m_nodename.size() == 0 || m_nodeid < 0) {
94  m_shmflag = 0;
95  } else {
96  g_status.open(m_nodename, m_nodeid);
97  g_status.reportReady();
98  }
99  }
100 
101  event_diff = 0;
102 
103  m_prev_copper_ctr = 0xFFFFFFFF;
104  m_prev_evenum = 0xFFFFFFFF;
105 
106  B2INFO("DeSerializerPrePC: initialize() done.");
107 }
108 
109 
110 int DeSerializerPrePCModule::recvFD(int sock, char* buf, int data_size_byte, int flag)
111 {
112  int n = 0;
113  int read_size = 0;
114  while (1) {
115  if ((read_size = recv(sock, (char*)buf + n, data_size_byte - n , flag)) < 0) {
116  if (errno == EINTR) {
117  continue;
118  } else {
119  char err_buf[500];
120  sprintf(err_buf, "[FATAL] Failed to receive data(%s). Exiting...", strerror(errno));
121  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
122  sleep(1234567);
123  exit(-1);
124  }
125  } else {
126  n += read_size;
127  if (n == data_size_byte)break;
128  }
129  }
130  return n;
131 }
132 
133 int DeSerializerPrePCModule::Connect()
134 {
135  for (int i = 0; i < m_num_connections; i++) {
136  //
137  // Connect to a downstream node
138  //
139  struct sockaddr_in socPC;
140  socPC.sin_family = AF_INET;
141 
142  struct hostent* host;
143  host = gethostbyname(m_hostname_from[ i ].c_str());
144  if (host == NULL) {
145  char err_buf[100];
146  sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
147  strerror(errno));
148  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
149  sleep(1234567);
150  exit(1);
151  }
152  socPC.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
153  socPC.sin_port = htons(m_port_from[ i ]);
154  int sd = socket(PF_INET, SOCK_STREAM, 0);
155 
156  int val1 = 0;
157  setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1, sizeof(val1));
158 
159  printf("[DEBUG] Connecting to %s port %d ...\n", m_hostname_from[ i ].c_str(), m_port_from[ i ]);
160  while (1) {
161  if (connect(sd, (struct sockaddr*)(&socPC), sizeof(socPC)) < 0) {
162  perror("Failed to connect. Retrying...");
163  usleep(500000);
164  } else {
165  printf("[DEBUG] Done\n");
166  break;
167  }
168  }
169  m_socket.push_back(sd);
170 
171  // check socket paramters
172  int val, len;
173  len = sizeof(val);
174  getsockopt(m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
175 #ifdef DEBUG
176  printf("[DEBUG] SO_RCVBUF %d\n", val);
177 #endif
178  getsockopt(m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
179 #ifdef DEBUG
180  printf("[DEBUG] SO_SNDBUF %d\n", val);
181 #endif
182  getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
183 #ifdef DEBUG
184  printf("[DEBUG] TCP_MAXSEG %d\n", val);
185 #endif
186  getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
187 #ifdef DEBUG
188  printf("[DEBUG] TCP_NODELAY %d\n", val);
189 #endif
190  if (g_status.isAvailable()) {
191  sockaddr_in sa;
192  memset(&sa, 0, sizeof(sockaddr_in));
193  socklen_t sa_len = sizeof(sa);
194  if (getsockname(m_socket[i], (struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
195  g_status.setInputPort(ntohs(sa.sin_port));
196  g_status.setInputAddress(sa.sin_addr.s_addr);
197  }
198  }
199 
200  }
201  printf("[DEBUG] Initialization finished\n");
202  return 0;
203 }
204 
205 
206 
207 int* DeSerializerPrePCModule::recvData(int* delete_flag, int* total_buf_nwords, int* num_events_in_sendblock,
208  int* num_nodes_in_sendblock)
209 {
210 
211  int* temp_buf = NULL; // buffer for data-body
212  int flag = 0;
213 
214  vector <int> each_buf_nwords;
215  each_buf_nwords.clear();
216  vector <int> each_buf_nodes;
217  each_buf_nodes.clear();
218  vector <int> each_buf_events;
219  each_buf_events.clear();
220 
221  *total_buf_nwords = 0;
222  *num_nodes_in_sendblock = 0;
223  *num_events_in_sendblock = 0;
224 
225  //
226  // Read Header and obtain data size
227  //
228  int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
229  int temp_num_events = 0;
230  int temp_num_nodes = 0;
231 
232  // Read header
233  for (int i = 0; i < (int)(m_socket.size()); i++) {
234 
235  recvFD(m_socket[ i ], (char*)send_hdr_buf, sizeof(int)*SendHeader::SENDHDR_NWORDS, flag);
236 
237  SendHeader send_hdr;
238  send_hdr.SetBuffer(send_hdr_buf);
239 
240  temp_num_events = send_hdr.GetNumEventsinPacket();
241  temp_num_nodes = send_hdr.GetNumNodesinPacket();
242 
243 
244 
245  if (i == 0) {
246  *num_events_in_sendblock = temp_num_events;
247  } else if (*num_events_in_sendblock != temp_num_events) {
248 #ifndef NO_DATA_CHECK
249  char err_buf[500];
250  sprintf(err_buf,
251  "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
252  *num_events_in_sendblock , temp_num_events, i, *num_nodes_in_sendblock , temp_num_nodes, i);
253  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
254  sleep(1234567);
255  exit(1);
256 #endif
257  }
258 
259  *num_nodes_in_sendblock += temp_num_nodes;
260 
261  int rawblk_nwords = send_hdr.GetTotalNwords()
262  - SendHeader::SENDHDR_NWORDS
263  - SendTrailer::SENDTRL_NWORDS;
264  *total_buf_nwords += rawblk_nwords;
265 
266  //
267  // Data size check1
268  //
269  if (rawblk_nwords > (int)(2.5e6) || rawblk_nwords <= 0) {
270  printf("[DEBUG] *******HDR**********\n");
271  printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
272  char err_buf[500];
273  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
274  send_hdr.GetTotalNwords());
275  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
276  sleep(123456);
277  exit(1);
278 
279  }
280 
281  each_buf_nwords.push_back(rawblk_nwords);
282  each_buf_events.push_back(temp_num_events);
283  each_buf_nodes.push_back(temp_num_nodes);
284 
285  }
286 
287 
288  temp_buf = getNewBuffer(*total_buf_nwords, delete_flag); // this include only data body
289  //
290  // Read body
291  //
292  int total_recvd_byte = 0;
293  for (int i = 0; i < (int)(m_socket.size()); i++) {
294  total_recvd_byte += recvFD(m_socket[ i ], (char*)temp_buf + total_recvd_byte,
295  each_buf_nwords[ i ] * sizeof(int), flag);
296 
297  //
298  // Data length check
299  //
300  int temp_length = 0;
301  for (int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
302  int this_length = *((int*)((char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] * sizeof(int) + temp_length));
303  temp_length += this_length * sizeof(int);
304  }
305  if (temp_length != each_buf_nwords[ i ] * sizeof(int)) {
306  printf("[DEBUG]*******SENDHDR*********** \n");
307  printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
308  printf("[DEBUG]*******BODY***********\n ");
309  printData(temp_buf, (int)(total_recvd_byte / sizeof(int)));
310  char err_buf[500];
311  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
312  (int)(*total_buf_nwords * sizeof(int)), temp_length);
313  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
314  sleep(1234567);
315  exit(-1);
316  }
317 
318  }
319 
320  if ((int)(*total_buf_nwords * sizeof(int)) != total_recvd_byte) {
321  char err_buf[500];
322  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
323  total_recvd_byte, (int)(*total_buf_nwords * sizeof(int)));
324  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
325  sleep(1234567);
326  exit(-1);
327  }
328 
329  // Read Traeiler
330  int send_trl_buf[(unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
331  for (int i = 0; i < (int)(m_socket.size()); i++) {
332  recvFD(m_socket[ i ], (char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS * sizeof(int), flag);
333  }
334 
335  return temp_buf;
336 }
337 
338 
339 void DeSerializerPrePCModule::setRecvdBuffer(RawDataBlock* temp_raw_datablk, int* delete_flag)
340 {
341  //
342  // Get data from socket
343  //
344  int total_buf_nwords = 0 ;
345  int num_events_in_sendblock = 0;
346  int num_nodes_in_sendblock = 0;
347 
348  if (m_start_flag == 0) B2INFO("DeSerializerPrePC: Reading the 1st packet from eb0...");
349  int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
350  &num_nodes_in_sendblock);
351  if (m_start_flag == 0) {
352  B2INFO("DeSerializerPrePC: Done. the size of the 1st packet " << total_buf_nwords << " words");
353  m_start_flag = 1;
354  }
355  m_totbytes += total_buf_nwords * sizeof(int);
356 
357  temp_raw_datablk->SetBuffer((int*)temp_buf, total_buf_nwords, *delete_flag,
358  num_events_in_sendblock, num_nodes_in_sendblock);
359 
360  //
361  // check even # and node # in one Sendblock
362  //
363  int num_entries = temp_raw_datablk->GetNumEntries();
364  if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
365  char err_buf[500];
366  sprintf(err_buf,
367  "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
368  num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
369  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
370  sleep(1234567);
371  exit(-1);
372  }
373  return;
374 
375 }
376 
377 
378 
379 void DeSerializerPrePCModule::checkData(RawDataBlock* raw_datablk, unsigned int* eve_copper_0)
380 {
381  int data_size_copper_0 = -1;
382  int data_size_copper_1 = -1;
383 
384  //
385  // Data check
386  //
387  int* temp_buf = raw_datablk->GetBuffer(0);
388  int cpr_num = 0;
389  unsigned int cur_evenum = 0, cur_copper_ctr = 0;
390  unsigned int eve_array[32]; // # of noeds is less than 17
391  unsigned int utime_array[32];// # of noeds is less than 17
392  unsigned int ctime_type_array[32];// # of noeds is less than 17
393 
394 #ifdef DUMHSLB
395  unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
396 #endif
397 
398  for (int k = 0; k < raw_datablk->GetNumEvents(); k++) {
399  memset(eve_array, 0, sizeof(eve_array));
400  memset(utime_array, 0, sizeof(utime_array));
401  memset(ctime_type_array, 0, sizeof(ctime_type_array));
402 
403  int num_nodes_in_sendblock = raw_datablk->GetNumNodes();
404  for (int l = 0; l < num_nodes_in_sendblock; l++) {
405  int entry_id = l + k * num_nodes_in_sendblock;
406 
407  //
408  // RawFTSW
409  //
410  if (raw_datablk->CheckFTSWID(entry_id)) {
411  RawFTSW* temp_rawftsw = new RawFTSW;
412  int block_id = 0;
413  temp_rawftsw->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
414  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
415  if (temp_rawftsw->GetEveNo(block_id) < 10) {
416  printf("[DEBUG] ######FTSW#########\n");
417  printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
418  }
419 
420 #ifdef DUMHSLB
421  exp_run_ftsw = temp_rawftsw->GetExpRunSubrun(block_id);
422  ctime_trgtype_ftsw = temp_rawftsw->GetTTCtimeTRGType(block_id);
423  utime_ftsw = temp_rawftsw->GetTTUtime(block_id);
424 #endif
425 
426 
427 #ifndef NO_DATA_CHECK
428  try {
429  temp_rawftsw->CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
430  eve_array[ entry_id ] = cur_evenum;
431  } catch (string err_str) {
432  print_err.PrintError(m_shmflag, &g_status, err_str);
433  exit(1);
434  }
435 #endif
436  utime_array[ entry_id ] = temp_rawftsw->GetTTUtime(block_id);
437  ctime_type_array[ entry_id ] = temp_rawftsw->GetTTCtimeTRGType(block_id);
438  delete temp_rawftsw;
439 
440  //
441  // RawTLU
442  //
443  } else if (raw_datablk->CheckTLUID(entry_id)) {
444 
445  RawTLU* temp_rawtlu = new RawTLU;
446  temp_rawtlu->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
447  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
448  if (temp_rawtlu->GetEveNo(0) < 10
449  ) {
450  printf("[DEBUG] ######TLU#########\n");
451  printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
452  }
453 
454 #ifndef NO_DATA_CHECK
455  try {
456  temp_rawtlu->CheckData(0, m_prev_evenum, &cur_evenum);
457  eve_array[ entry_id ] = cur_evenum;
458  } catch (string err_str) {
459  print_err.PrintError(m_shmflag, &g_status, err_str);
460  exit(1);
461  }
462 #endif
463  delete temp_rawtlu;
464  } else {
465 
466 
467 
468  //
469  // RawCOPPER
470  //
471  int block_id = 0;
472 
473  RawCOPPER* temp_rawcopper = new RawCOPPER;
474  temp_rawcopper->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
475  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
476 
477 #ifdef DUMHSLB
478  (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
479  (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
480  (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
481 #endif
482 
483 
484 
485 #ifndef NO_DATA_CHECK
486  try {
487 
488  temp_rawcopper->CheckData(0, m_prev_evenum, &cur_evenum,
489  m_prev_copper_ctr, &cur_copper_ctr,
490  m_prev_exprunsubrun_no, &m_exprunsubrun_no);
491  eve_array[ entry_id ] = cur_evenum;
492  } catch (string err_str) {
493  print_err.PrintError(m_shmflag, &g_status, err_str);
494  exit(1);
495  }
496 #endif
497 
498  utime_array[ entry_id ] = temp_rawcopper->GetTTUtime(0);
499  ctime_type_array[ entry_id ] = temp_rawcopper->GetTTCtimeTRGType(0);
500 
501  if (cpr_num == 0) {
502  data_size_copper_0 = raw_datablk->GetBlockNwords(entry_id);
503  *eve_copper_0 = (raw_datablk->GetBuffer(entry_id))[ 3 ];
504  } else if (cpr_num == 1) {
505  data_size_copper_1 = raw_datablk->GetBlockNwords(entry_id);
506  }
507  cpr_num++;
508  delete temp_rawcopper;
509 
510 
511  }
512 
513  }
514 
515 #ifndef NO_DATA_CHECK
516  // event #, ctime, utime over nodes
517  for (int l = 1; l < num_nodes_in_sendblock; l++) {
518  if (eve_array[ 0 ] != eve_array[ l ] ||
519  utime_array[ 0 ] != utime_array[ l ] ||
520  ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
521  char err_buf[500];
522  for (int m = 0; m < num_nodes_in_sendblock; m++) {
523  printf("[DEBUG] node %d eve # %d utime %x ctime %x\n",
524  m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
525  }
526  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
527  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
528  sleep(1234567);
529  exit(-1);
530  }
531  }
532 #endif
533 
534  // Event # monitor in runchange
535 // if (m_prev_runsubrun_no != m_runsubrun_no) {
536 // printf("[DEBUG] ##############################################\n");
537 // for (int m = 0; m < raw_datablk->GetNumEntries(); m++) {
538 // printf("[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
539 // }
540 // printf("[DEBUG] ##############################################\n");
541 // fflush(stderr);
542 // }
543  m_prev_evenum = cur_evenum;
544  m_prev_copper_ctr = cur_copper_ctr;
545  m_prev_exprunsubrun_no = m_exprunsubrun_no;
546  }
547  return;
548 }
549 
550 
551 
552 void DeSerializerPrePCModule::event()
553 {
554  // For data check
555 
556  unsigned int eve_copper_0 = 0;
557 
558  clearNumUsedBuf();
559 
560  if (m_start_flag == 0) {
561  //
562  // Connect to eb0: This should be here because we want Serializer first to accept connection from eb1tx
563  //
564  Connect();
565  if (g_status.isAvailable()) {
566  B2INFO("DeSerializerPrePC: Waiting for Start...\n");
567  g_status.reportRunning();
568  }
569  m_start_time = getTimeSec();
570  n_basf2evt = 0;
571  }
572 
573  // Make rawdatablk array
574 
575 
576  //
577  // Main loop
578  //
579  for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
580 
581  //
582  // Receive data from COPPER
583  //
584  eve_copper_0 = 0;
585  int delete_flag_from = 0, delete_flag_to = 0;
586  RawDataBlock temp_rawdatablk;
587  setRecvdBuffer(&temp_rawdatablk, &delete_flag_from);
588  // temp_rawdatablk.PrintData( temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords() );
589  checkData(&temp_rawdatablk, &eve_copper_0);
590  // PreRawCOPPERFormat_latest pre_rawcopper_latest;
591  // pre_rawcopper_latest.SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
592  // 0, temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
594 
595 
596  int temp_num_events, temp_num_nodes;
597  int temp_nwords_to;
598  int* buf_to = NULL;
599 #ifdef REDUCED_RAWCOPPER
600  //
601  // Copy reduced buffer
602  //
603  // int* buf_to = getNewBuffer(m_pre_rawcpr.CalcReducedDataSize(&temp_rawdatablk), &delete_flag_to); // basf2-dependent style
604  int* temp_bufin = temp_rawdatablk.GetWholeBuffer();
605  int temp_nwords_from = temp_rawdatablk.TotalBufNwords();
606  temp_num_events = temp_rawdatablk.GetNumEvents();
607  temp_num_nodes = temp_rawdatablk.GetNumNodes();
608  int calced_temp_nwords_to = m_pre_rawcpr.CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
609  buf_to = getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
610 
611  // m_pre_rawcpr.CopyReducedData(&temp_rawdatablk, buf_to, delete_flag_from); // basf2-dependent style
612  m_pre_rawcpr.CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
613  if (calced_temp_nwords_to != temp_nwords_to) {
614  char err_buf[500];
615  sprintf(err_buf,
616  "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
617  calced_temp_nwords_to, temp_nwords_to);
618  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
619  exit(1);
620  }
621 
622 #else
623  buf_to = temp_rawdatablk.GetWholeBuffer();
624  temp_nwords_to = temp_rawdatablk.TotalBufNwords();
625  temp_num_events = temp_rawdatablk.GetNumEvents();
626  temp_num_nodes = temp_rawdatablk.GetNumNodes();
627  delete_flag_to = delete_flag_from;
628  delete_flag_from = 0; // to avoid double delete
629 #endif
630 
631 
632  //
633  // Set buffer to the RawData class stored in DataStore
634  //
635  RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
636 // raw_datablk->SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
637 // delete_flag_to, temp_rawdatablk.GetNumEvents(),
638 // temp_rawdatablk.GetNumNodes());
639  raw_datablk->SetBuffer(buf_to, temp_nwords_to, delete_flag_to,
640  temp_num_events, temp_num_nodes);
641 
642 
643  //
644  // CRC16 check after data reduction
645  //
646 #ifdef REDUCED_RAWCOPPER
647  PostRawCOPPERFormat_latest post_rawcopper_latest;
648  post_rawcopper_latest.SetBuffer(buf_to, temp_nwords_to,
649  0, temp_num_events, temp_num_nodes);
650 
651  for (int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
652  int block_num = 0;
653  if (post_rawcopper_latest.GetFINESSENwords(block_num, i_finesse_num) > 0) {
654  post_rawcopper_latest.CheckCRC16(block_num, i_finesse_num);
655  }
656  }
657 
658 #endif
659 
660  }
661 
662 
663  //
664  // Update EventMetaData
665  //
666  m_eventMetaDataPtr.create();
667  m_eventMetaDataPtr->setExperiment(1);
668  m_eventMetaDataPtr->setRun(1);
669  m_eventMetaDataPtr->setEvent(n_basf2evt);
670 
671 
672 
673 
674  //
675  // Run stop via NSM (Already obsolete. Need to ask Konno-san about implementation)
676  //
677  // if (m_shmflag != 0) {
678  // if (n_basf2evt % 10 == 0) {
679  // if (g_status.isStopped()) {
680  // printf("[DEBUG] [INFO] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n", max_nevt , max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
681  // m_eventMetaDataPtr->setEndOfData();
682  // }
683  // }
684  // }
685 
686  //
687  // Monitor
688  //
689  if (max_nevt >= 0 || max_seconds >= 0.) {
690  if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC >= max_nevt && max_nevt > 0)
691  || (getTimeSec() - m_start_time > max_seconds && max_seconds > 0.)) {
692  printf("[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
693  max_nevt , max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
694  m_eventMetaDataPtr->setEndOfData();
695  }
696  }
697 
698  if (n_basf2evt % 20000 == 0 || n_basf2evt < 10) {
699  RateMonitor(eve_copper_0);
700  }
701 
702 
703 
704  if (g_status.isAvailable()) {
705  g_status.setInputNBytes(m_totbytes);
706  g_status.setInputCount(n_basf2evt);
707  }
708 
709  n_basf2evt++;
710  return;
711 }
A class definition of an input module for Sequential ROOT I/O.
Definition: DeSerializer.h:36
A class definition of an input module for Sequential ROOT I/O.
int CheckCRC16(int n, int finesse_num)
check magic words
The Raw COPPER class This class stores data received by COPPER via belle2linkt Data from all detector...
Definition: RawCOPPER.h:52
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freed ( = 0 ) / not freed ( = 1 ) in the destructor )
Definition: RawCOPPER.cc:141
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freed ( = 0 ) / not freed ( = 1 ) in the destructor )
The RawDataBlock class Base class for rawdata handling.
Definition: RawDataBlock.h:27
virtual int GetBufferPos(int n)
get position of data block in word
Definition: RawDataBlock.h:46
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
Definition: RawDataBlock.h:101
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
Definition: RawDataBlock.h:67
virtual int * GetBuffer(int n)
get nth buffer pointer
Definition: RawDataBlock.h:53
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freed ( = 0 ) / not freed ( = 1 ) in the destructor )
Definition: RawDataBlock.cc:35
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
Definition: RawDataBlock.h:74
virtual int CheckTLUID(int n)
get TLU ID to check whether this data block is TLU data or not
Definition: RawDataBlock.h:108
virtual int GetBlockNwords(int n)
get size of a data block
Definition: RawDataBlock.h:94
virtual int GetNumEvents()
get # of events in m_buffer
Definition: RawDataBlock.h:81
virtual int TotalBufNwords()
Get total length of m_buffer.
Definition: RawDataBlock.h:39
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
Definition: RawDataBlock.h:60
The Raw FTSW class.
Definition: RawFTSW.h:30
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
Definition: RawFTSW.h:72
unsigned int GetExpRunSubrun(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
Definition: RawFTSW.h:130
unsigned int GetEveNo(int n)
Get event #.
Definition: RawFTSW.h:65
unsigned int GetTTUtime(int n)
get unixtime of the trigger
Definition: RawFTSW.h:79
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freed ( = 0 ) / not freed ( = 1 ) in the destructor )
Definition: RawFTSW.cc:107
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check the data contents
Definition: RawFTSW.h:121
The Raw TLU class Class for data from DAQ PC for TLU(Trigger Logic Unit) It is supposed to be used on...
Definition: RawTLU.h:27
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
Definition: RawTLU.h:81
unsigned int GetEveNo(int n)
Get Event #.
Definition: RawTLU.h:53
void SetBuffer(int *hdr)
set buffer
Definition: SendHeader.cc:37
int GetNumEventsinPacket()
get contents of Header
Definition: SendHeader.cc:125
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
Definition: RawCOPPER.h:605
unsigned int GetTTUtime(int n)
Get unixtime of the trigger.
Definition: RawCOPPER.h:611
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check data contents
Definition: RawCOPPER.h:696
int GetFINESSENwords(int n, int finesse_num) OVERRIDE_CPP17
Get the size of a finesse buffer.
Abstract base class for different kinds of events.