Belle II Software  release-08-01-10
DeSerializerPrePC.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/rawdata/modules/DeSerializerPrePC.h>
10 #include <daq/dataobjects/SendHeader.h>
11 #include <daq/dataobjects/SendTrailer.h>
12 #include <rawdata/dataobjects/RawTLU.h>
13 
14 #include <netinet/tcp.h>
15 
16 //#define MAXEVTSIZE 400000000
17 #define CHECKEVT 5000
18 
19 //#define DEBUG
20 //#define NO_DATA_CHECK
21 //#define DUMHSLB
22 
23 using namespace std;
24 using namespace Belle2;
25 
26 //-----------------------------------------------------------------
27 // Register the Module
28 //-----------------------------------------------------------------
29 REG_MODULE(DeSerializerPrePC)
30 
31 //-----------------------------------------------------------------
32 // Implementation
33 //-----------------------------------------------------------------
34 
36 {
37  //Set module properties
38  setDescription("Encode DataStore into RingBuffer");
39  // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
40  addParam("NumConn", m_num_connections, "Number of Connections", 0);
41  addParam("HostNameFrom", m_hostname_from, "Hostnames of data sources");
42  addParam("PortFrom", m_port_from, "port numbers of data sources");
43 
44  B2INFO("DeSerializerPrePC: Constructor done.");
45 }
46 
47 
48 
49 DeSerializerPrePCModule::~DeSerializerPrePCModule()
50 {
51 
52 }
53 
54 
55 void DeSerializerPrePCModule::initialize()
56 {
57  B2INFO("DeSerializerPrePC: initialize() started.");
58 
59  // allocate buffer
60  for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
61  m_bufary[i] = new int[ BUF_SIZE_WORD ];
62  }
63  m_buffer = new int[ BUF_SIZE_WORD ];
64 
65 
66  // initialize buffer
67  for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
68  memset(m_bufary[i], 0, BUF_SIZE_WORD * sizeof(int));
69  }
70 
71  // Initialize EvtMetaData
72  m_eventMetaDataPtr.registerInDataStore();
73 
74  raw_datablkarray.registerInDataStore();
75  rawcprarray.registerInDataStore();
76  raw_ftswarray.registerInDataStore();
77 
78  // Initialize Array of RawCOPPER
79 
80  if (m_dump_fname.size() > 0) {
81  openOutputFile();
82  }
83 
84  // Initialize arrays for time monitor
85  memset(time_array0, 0, sizeof(time_array0));
86  memset(time_array1, 0, sizeof(time_array1));
87  memset(time_array2, 0, sizeof(time_array2));
88 
89  clearNumUsedBuf();
90 
91  // Shared memory
92  if (m_shmflag > 0) {
93  if (m_nodename.size() == 0 || m_nodeid < 0) {
94  m_shmflag = 0;
95  } else {
96  g_status.open(m_nodename, m_nodeid);
97  g_status.reportReady();
98  }
99  }
100 
101  event_diff = 0;
102 
103  m_prev_copper_ctr = 0xFFFFFFFF;
104  m_prev_evenum = 0xFFFFFFFF;
105 
106  B2INFO("DeSerializerPrePC: initialize() done.");
107 }
108 
109 
110 int DeSerializerPrePCModule::recvFD(int sock, char* buf, int data_size_byte, int flag)
111 {
112  int n = 0;
113  while (1) {
114  int read_size = 0;
115  if ((read_size = recv(sock, (char*)buf + n, data_size_byte - n, flag)) < 0) {
116  if (errno == EINTR) {
117  continue;
118  } else {
119  char err_buf[500];
120  sprintf(err_buf, "[FATAL] Failed to receive data(%s). Exiting...", strerror(errno));
121  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
122  sleep(1234567);
123  exit(-1);
124  }
125  } else {
126  n += read_size;
127  if (n == data_size_byte)break;
128  }
129  }
130  return n;
131 }
132 
133 int DeSerializerPrePCModule::Connect()
134 {
135  for (int i = 0; i < m_num_connections; i++) {
136  //
137  // Connect to a downstream node
138  //
139  struct sockaddr_in socPC;
140  socPC.sin_family = AF_INET;
141 
142  struct hostent* host;
143  host = gethostbyname(m_hostname_from[ i ].c_str());
144  if (host == NULL) {
145  char err_buf[100];
146  sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
147  strerror(errno));
148  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
149  sleep(1234567);
150  exit(1);
151  }
152  socPC.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
153  socPC.sin_port = htons(m_port_from[ i ]);
154  int sd = socket(PF_INET, SOCK_STREAM, 0);
155 
156  int val1 = 0;
157  setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1, sizeof(val1));
158 
159  printf("[DEBUG] Connecting to %s port %d ...\n", m_hostname_from[ i ].c_str(), m_port_from[ i ]);
160  while (1) {
161  if (connect(sd, (struct sockaddr*)(&socPC), sizeof(socPC)) < 0) {
162  perror("Failed to connect. Retrying...");
163  usleep(500000);
164  } else {
165  printf("[DEBUG] Done\n");
166  break;
167  }
168  }
169  m_socket.push_back(sd);
170 
171  // check socket paramters
172  int val, len;
173  len = sizeof(val);
174  getsockopt(m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
175 #ifdef DEBUG
176  printf("[DEBUG] SO_RCVBUF %d\n", val);
177 #endif
178  getsockopt(m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
179 #ifdef DEBUG
180  printf("[DEBUG] SO_SNDBUF %d\n", val);
181 #endif
182  getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
183 #ifdef DEBUG
184  printf("[DEBUG] TCP_MAXSEG %d\n", val);
185 #endif
186  getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
187 #ifdef DEBUG
188  printf("[DEBUG] TCP_NODELAY %d\n", val);
189 #endif
190  if (g_status.isAvailable()) {
191  sockaddr_in sa;
192  memset(&sa, 0, sizeof(sockaddr_in));
193  socklen_t sa_len = sizeof(sa);
194  if (getsockname(m_socket[i], (struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
195  g_status.setInputPort(ntohs(sa.sin_port));
196  g_status.setInputAddress(sa.sin_addr.s_addr);
197  }
198  }
199 
200  }
201  printf("[DEBUG] Initialization finished\n");
202  return 0;
203 }
204 
205 
206 
207 int* DeSerializerPrePCModule::recvData(int* delete_flag, int* total_buf_nwords, int* num_events_in_sendblock,
208  int* num_nodes_in_sendblock)
209 {
210 
211  int* temp_buf = NULL; // buffer for data-body
212  int flag = 0;
213 
214  vector <int> each_buf_nwords;
215  each_buf_nwords.clear();
216  vector <int> each_buf_nodes;
217  each_buf_nodes.clear();
218  vector <int> each_buf_events;
219  each_buf_events.clear();
220 
221  *total_buf_nwords = 0;
222  *num_nodes_in_sendblock = 0;
223  *num_events_in_sendblock = 0;
224 
225  //
226  // Read Header and obtain data size
227  //
228  int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
229 
230  // Read header
231  for (int i = 0; i < (int)(m_socket.size()); i++) {
232 
233  recvFD(m_socket[ i ], (char*)send_hdr_buf, sizeof(int)*SendHeader::SENDHDR_NWORDS, flag);
234 
235  SendHeader send_hdr;
236  send_hdr.SetBuffer(send_hdr_buf);
237 
238  int temp_num_events = send_hdr.GetNumEventsinPacket();
239  int temp_num_nodes = send_hdr.GetNumNodesinPacket();
240 
241 
242 
243  if (i == 0) {
244  *num_events_in_sendblock = temp_num_events;
245  } else if (*num_events_in_sendblock != temp_num_events) {
246 #ifndef NO_DATA_CHECK
247  char err_buf[500];
248  sprintf(err_buf,
249  "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
250  *num_events_in_sendblock, temp_num_events, i, *num_nodes_in_sendblock, temp_num_nodes, i);
251  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
252  sleep(1234567);
253  exit(1);
254 #endif
255  }
256 
257  *num_nodes_in_sendblock += temp_num_nodes;
258 
259  int rawblk_nwords = send_hdr.GetTotalNwords()
260  - SendHeader::SENDHDR_NWORDS
261  - SendTrailer::SENDTRL_NWORDS;
262  *total_buf_nwords += rawblk_nwords;
263 
264  //
265  // Data size check1
266  //
267  if (rawblk_nwords > (int)(2.5e6) || rawblk_nwords <= 0) {
268  printf("[DEBUG] *******HDR**********\n");
269  printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
270  char err_buf[500];
271  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
272  send_hdr.GetTotalNwords());
273  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
274  sleep(123456);
275  exit(1);
276 
277  }
278 
279  each_buf_nwords.push_back(rawblk_nwords);
280  each_buf_events.push_back(temp_num_events);
281  each_buf_nodes.push_back(temp_num_nodes);
282 
283  }
284 
285 
286  temp_buf = getNewBuffer(*total_buf_nwords, delete_flag); // this include only data body
287  //
288  // Read body
289  //
290  int total_recvd_byte = 0;
291  for (int i = 0; i < (int)(m_socket.size()); i++) {
292  total_recvd_byte += recvFD(m_socket[ i ], (char*)temp_buf + total_recvd_byte,
293  each_buf_nwords[ i ] * sizeof(int), flag);
294 
295  //
296  // Data length check
297  //
298  unsigned temp_length = 0;
299  for (int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
300  int this_length = *((int*)((char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] * sizeof(int) + temp_length));
301  temp_length += this_length * sizeof(int);
302  }
303  if (temp_length != each_buf_nwords[ i ] * sizeof(int)) {
304  printf("[DEBUG]*******SENDHDR*********** \n");
305  printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
306  printf("[DEBUG]*******BODY***********\n ");
307  printData(temp_buf, (int)(total_recvd_byte / sizeof(int)));
308  char err_buf[500];
309  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
310  (int)(*total_buf_nwords * sizeof(int)), temp_length);
311  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
312  sleep(1234567);
313  exit(-1);
314  }
315 
316  }
317 
318  if ((int)(*total_buf_nwords * sizeof(int)) != total_recvd_byte) {
319  char err_buf[500];
320  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
321  total_recvd_byte, (int)(*total_buf_nwords * sizeof(int)));
322  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
323  sleep(1234567);
324  exit(-1);
325  }
326 
327  // Read Traeiler
328  int send_trl_buf[(unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
329  for (int i = 0; i < (int)(m_socket.size()); i++) {
330  recvFD(m_socket[ i ], (char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS * sizeof(int), flag);
331  }
332 
333  return temp_buf;
334 }
335 
336 
337 void DeSerializerPrePCModule::setRecvdBuffer(RawDataBlock* temp_raw_datablk, int* delete_flag)
338 {
339  //
340  // Get data from socket
341  //
342  int total_buf_nwords = 0 ;
343  int num_events_in_sendblock = 0;
344  int num_nodes_in_sendblock = 0;
345 
346  if (m_start_flag == 0) B2INFO("DeSerializerPrePC: Reading the 1st packet from eb0...");
347  int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
348  &num_nodes_in_sendblock);
349  if (m_start_flag == 0) {
350  B2INFO("DeSerializerPrePC: Done. the size of the 1st packet " << total_buf_nwords << " words");
351  m_start_flag = 1;
352  }
353  m_totbytes += total_buf_nwords * sizeof(int);
354 
355  temp_raw_datablk->SetBuffer((int*)temp_buf, total_buf_nwords, *delete_flag,
356  num_events_in_sendblock, num_nodes_in_sendblock);
357 
358  //
359  // check even # and node # in one Sendblock
360  //
361  int num_entries = temp_raw_datablk->GetNumEntries();
362  if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
363  char err_buf[500];
364  sprintf(err_buf,
365  "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
366  num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
367  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
368  sleep(1234567);
369  exit(-1);
370  }
371  return;
372 
373 }
374 
375 
376 
377 void DeSerializerPrePCModule::checkData(RawDataBlock* raw_datablk, unsigned int* eve_copper_0)
378 {
379  //
380  // Data check
381  //
382  int* temp_buf = raw_datablk->GetBuffer(0);
383  int cpr_num = 0;
384  unsigned int cur_evenum = 0, cur_copper_ctr = 0;
385  unsigned int eve_array[32]; // # of noeds is less than 17
386  unsigned int utime_array[32];// # of noeds is less than 17
387  unsigned int ctime_type_array[32];// # of noeds is less than 17
388 
389 #ifdef DUMHSLB
390  unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
391 #endif
392 
393  for (int k = 0; k < raw_datablk->GetNumEvents(); k++) {
394  memset(eve_array, 0, sizeof(eve_array));
395  memset(utime_array, 0, sizeof(utime_array));
396  memset(ctime_type_array, 0, sizeof(ctime_type_array));
397 
398  int num_nodes_in_sendblock = raw_datablk->GetNumNodes();
399  for (int l = 0; l < num_nodes_in_sendblock; l++) {
400  int entry_id = l + k * num_nodes_in_sendblock;
401 
402  //
403  // RawFTSW
404  //
405  if (raw_datablk->CheckFTSWID(entry_id)) {
406  RawFTSW* temp_rawftsw = new RawFTSW;
407  int block_id = 0;
408  temp_rawftsw->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
409  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
410  if (temp_rawftsw->GetEveNo(block_id) < 10) {
411  printf("[DEBUG] ######FTSW#########\n");
412  printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
413  }
414 
415 #ifdef DUMHSLB
416  exp_run_ftsw = temp_rawftsw->GetExpRunSubrun(block_id);
417  ctime_trgtype_ftsw = temp_rawftsw->GetTTCtimeTRGType(block_id);
418  utime_ftsw = temp_rawftsw->GetTTUtime(block_id);
419 #endif
420 
421 
422 #ifndef NO_DATA_CHECK
423  try {
424  temp_rawftsw->CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
425  eve_array[ entry_id ] = cur_evenum;
426  } catch (string err_str) {
427  print_err.PrintError(m_shmflag, &g_status, err_str);
428  exit(1);
429  }
430 #endif
431  utime_array[ entry_id ] = temp_rawftsw->GetTTUtime(block_id);
432  ctime_type_array[ entry_id ] = temp_rawftsw->GetTTCtimeTRGType(block_id);
433  delete temp_rawftsw;
434 
435  //
436  // RawTLU
437  //
438  } else if (raw_datablk->CheckTLUID(entry_id)) {
439 
440  RawTLU* temp_rawtlu = new RawTLU;
441  temp_rawtlu->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
442  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
443  if (temp_rawtlu->GetEveNo(0) < 10
444  ) {
445  printf("[DEBUG] ######TLU#########\n");
446  printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
447  }
448 
449 #ifndef NO_DATA_CHECK
450  try {
451  temp_rawtlu->CheckData(0, m_prev_evenum, &cur_evenum);
452  eve_array[ entry_id ] = cur_evenum;
453  } catch (string err_str) {
454  print_err.PrintError(m_shmflag, &g_status, err_str);
455  exit(1);
456  }
457 #endif
458  delete temp_rawtlu;
459  } else {
460 
461 
462 
463  //
464  // RawCOPPER
465  //
466 
467  RawCOPPER* temp_rawcopper = new RawCOPPER;
468  temp_rawcopper->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
469  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
470 
471 #ifdef DUMHSLB
472  int block_id = 0;
473  (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
474  (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
475  (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
476 #endif
477 
478 
479 
480 #ifndef NO_DATA_CHECK
481  try {
482 
483  temp_rawcopper->CheckData(0, m_prev_evenum, &cur_evenum,
484  m_prev_copper_ctr, &cur_copper_ctr,
485  m_prev_exprunsubrun_no, &m_exprunsubrun_no);
486  eve_array[ entry_id ] = cur_evenum;
487  } catch (string err_str) {
488  print_err.PrintError(m_shmflag, &g_status, err_str);
489  exit(1);
490  }
491 #endif
492 
493  utime_array[ entry_id ] = temp_rawcopper->GetTTUtime(0);
494  ctime_type_array[ entry_id ] = temp_rawcopper->GetTTCtimeTRGType(0);
495 
496  if (cpr_num == 0) {
497  raw_datablk->GetBlockNwords(entry_id); // returns data_size_copper_0
498  *eve_copper_0 = (raw_datablk->GetBuffer(entry_id))[ 3 ];
499  } else if (cpr_num == 1) {
500  raw_datablk->GetBlockNwords(entry_id); // returns data_size_copper_1
501  }
502  cpr_num++;
503  delete temp_rawcopper;
504 
505 
506  }
507 
508  }
509 
510 #ifndef NO_DATA_CHECK
511  // event #, ctime, utime over nodes
512  for (int l = 1; l < num_nodes_in_sendblock; l++) {
513  if (eve_array[ 0 ] != eve_array[ l ] ||
514  utime_array[ 0 ] != utime_array[ l ] ||
515  ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
516  char err_buf[500];
517  for (int m = 0; m < num_nodes_in_sendblock; m++) {
518  printf("[DEBUG] node %d eve # %d utime %x ctime %x\n",
519  m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
520  }
521  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
522  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
523  sleep(1234567);
524  exit(-1);
525  }
526  }
527 #endif
528 
529  // Event # monitor in runchange
530 // if (m_prev_runsubrun_no != m_runsubrun_no) {
531 // printf("[DEBUG] ##############################################\n");
532 // for (int m = 0; m < raw_datablk->GetNumEntries(); m++) {
533 // printf("[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
534 // }
535 // printf("[DEBUG] ##############################################\n");
536 // fflush(stderr);
537 // }
538  m_prev_evenum = cur_evenum;
539  m_prev_copper_ctr = cur_copper_ctr;
540  m_prev_exprunsubrun_no = m_exprunsubrun_no;
541  }
542  return;
543 }
544 
545 
546 
547 void DeSerializerPrePCModule::event()
548 {
549  // For data check
550 
551  unsigned int eve_copper_0 = 0;
552 
553  clearNumUsedBuf();
554 
555  if (m_start_flag == 0) {
556  //
557  // Connect to eb0: This should be here because we want Serializer first to accept connection from eb1tx
558  //
559  Connect();
560  if (g_status.isAvailable()) {
561  B2INFO("DeSerializerPrePC: Waiting for Start...\n");
562  g_status.reportRunning();
563  }
564  m_start_time = getTimeSec();
565  n_basf2evt = 0;
566  }
567 
568  // Make rawdatablk array
569 
570 
571  //
572  // Main loop
573  //
574  for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
575 
576  //
577  // Receive data from COPPER
578  //
579  eve_copper_0 = 0;
580  int delete_flag_from = 0, delete_flag_to = 0;
581  RawDataBlock temp_rawdatablk;
582  setRecvdBuffer(&temp_rawdatablk, &delete_flag_from);
583  // temp_rawdatablk.PrintData( temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords() );
584  checkData(&temp_rawdatablk, &eve_copper_0);
585  // PreRawCOPPERFormat_latest pre_rawcopper_latest;
586  // pre_rawcopper_latest.SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
587  // 0, temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
589 
590 
591  int temp_num_events, temp_num_nodes;
592  int temp_nwords_to;
593  int* buf_to = NULL;
594 #ifdef REDUCED_RAWCOPPER
595  //
596  // Copy reduced buffer
597  //
598  // int* buf_to = getNewBuffer(m_pre_rawcpr.CalcReducedDataSize(&temp_rawdatablk), &delete_flag_to); // basf2-dependent style
599  int* temp_bufin = temp_rawdatablk.GetWholeBuffer();
600  int temp_nwords_from = temp_rawdatablk.TotalBufNwords();
601  temp_num_events = temp_rawdatablk.GetNumEvents();
602  temp_num_nodes = temp_rawdatablk.GetNumNodes();
603  int calced_temp_nwords_to = m_pre_rawcpr.CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
604  buf_to = getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
605 
606  // m_pre_rawcpr.CopyReducedData(&temp_rawdatablk, buf_to, delete_flag_from); // basf2-dependent style
607  m_pre_rawcpr.CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
608  if (calced_temp_nwords_to != temp_nwords_to) {
609  char err_buf[500];
610  sprintf(err_buf,
611  "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
612  calced_temp_nwords_to, temp_nwords_to);
613  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
614  exit(1);
615  }
616 
617 #else
618  buf_to = temp_rawdatablk.GetWholeBuffer();
619  temp_nwords_to = temp_rawdatablk.TotalBufNwords();
620  temp_num_events = temp_rawdatablk.GetNumEvents();
621  temp_num_nodes = temp_rawdatablk.GetNumNodes();
622  delete_flag_to = delete_flag_from;
623  delete_flag_from = 0; // to avoid double delete
624 #endif
625 
626 
627  //
628  // Set buffer to the RawData class stored in DataStore
629  //
630  RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
631 // raw_datablk->SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
632 // delete_flag_to, temp_rawdatablk.GetNumEvents(),
633 // temp_rawdatablk.GetNumNodes());
634  raw_datablk->SetBuffer(buf_to, temp_nwords_to, delete_flag_to,
635  temp_num_events, temp_num_nodes);
636 
637 
638  //
639  // CRC16 check after data reduction
640  //
641 #ifdef REDUCED_RAWCOPPER
642  PostRawCOPPERFormat_latest post_rawcopper_latest;
643  post_rawcopper_latest.SetBuffer(buf_to, temp_nwords_to,
644  0, temp_num_events, temp_num_nodes);
645 
646  for (int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
647  int block_num = 0;
648  if (post_rawcopper_latest.GetFINESSENwords(block_num, i_finesse_num) > 0) {
649  post_rawcopper_latest.CheckCRC16(block_num, i_finesse_num);
650  }
651  }
652 
653 #endif
654 
655  }
656 
657 
658  //
659  // Update EventMetaData
660  //
661  m_eventMetaDataPtr.create();
662  m_eventMetaDataPtr->setExperiment(1);
663  m_eventMetaDataPtr->setRun(1);
664  m_eventMetaDataPtr->setEvent(n_basf2evt);
665 
666 
667 
668 
669  //
670  // Run stop via NSM (Already obsolete. Need to ask Konno-san about implementation)
671  //
672  // if (m_shmflag != 0) {
673  // if (n_basf2evt % 10 == 0) {
674  // if (g_status.isStopped()) {
675  // printf("[DEBUG] [INFO] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n", max_nevt , max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
676  // m_eventMetaDataPtr->setEndOfData();
677  // }
678  // }
679  // }
680 
681  //
682  // Monitor
683  //
684  if (max_nevt >= 0 || max_seconds >= 0.) {
685  if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC >= max_nevt && max_nevt > 0)
686  || (getTimeSec() - m_start_time > max_seconds && max_seconds > 0.)) {
687  printf("[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
688  max_nevt, max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
689  m_eventMetaDataPtr->setEndOfData();
690  }
691  }
692 
693  if (n_basf2evt % 20000 == 0 || n_basf2evt < 10) {
694  RateMonitor(eve_copper_0);
695  }
696 
697 
698 
699  if (g_status.isAvailable()) {
700  g_status.setInputNBytes(m_totbytes);
701  g_status.setInputCount(n_basf2evt);
702  }
703 
704  n_basf2evt++;
705  return;
706 }
A class definition of an input module for Sequential ROOT I/O.
Definition: DeSerializer.h:36
A class definition of an input module for Sequential ROOT I/O.
int CheckCRC16(int n, int finesse_num)
check magic words
The Raw COPPER class This class stores data received by COPPER via belle2linkt Data from all detector...
Definition: RawCOPPER.h:52
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freed( = 0 )/ not freed( = 1 ) in Destructor )
Definition: RawCOPPER.cc:141
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freed( = 0 )/ not freed( = 1 ) in Destructor )
The RawDataBlock class Base class for rawdata handling.
Definition: RawDataBlock.h:27
virtual int GetBufferPos(int n)
get position of data block in word
Definition: RawDataBlock.h:46
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
Definition: RawDataBlock.h:101
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
Definition: RawDataBlock.h:67
virtual int * GetBuffer(int n)
get nth buffer pointer
Definition: RawDataBlock.h:53
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freed( = 0 )/ not freed( = 1 ) in Destructor )
Definition: RawDataBlock.cc:35
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
Definition: RawDataBlock.h:74
virtual int CheckTLUID(int n)
get TLU ID to check whether this data block is TLU data or not
Definition: RawDataBlock.h:108
virtual int GetBlockNwords(int n)
get size of a data block
Definition: RawDataBlock.h:94
virtual int GetNumEvents()
get # of events in m_buffer
Definition: RawDataBlock.h:81
virtual int TotalBufNwords()
Get total length of m_buffer.
Definition: RawDataBlock.h:39
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
Definition: RawDataBlock.h:60
The Raw FTSW class.
Definition: RawFTSW.h:30
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
Definition: RawFTSW.h:72
unsigned int GetExpRunSubrun(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
Definition: RawFTSW.h:130
unsigned int GetEveNo(int n)
Get event #.
Definition: RawFTSW.h:65
unsigned int GetTTUtime(int n)
get unixtime of the trigger
Definition: RawFTSW.h:79
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freed( = 0 )/ not freed( = 1 ) in Destructor )
Definition: RawFTSW.cc:107
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check the data contents
Definition: RawFTSW.h:121
The Raw TLU class Class for data from DAQ PC for TLU(Trigger Logic Unit) It is supposed to be used on...
Definition: RawTLU.h:27
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
Definition: RawTLU.h:81
unsigned int GetEveNo(int n)
Get Event #.
Definition: RawTLU.h:53
void SetBuffer(int *hdr)
set buffer
Definition: SendHeader.cc:37
int GetNumEventsinPacket()
get contents of Header
Definition: SendHeader.cc:125
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
Definition: RawCOPPER.h:608
unsigned int GetTTUtime(int n)
Get unixtime of the trigger.
Definition: RawCOPPER.h:614
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check data contents
Definition: RawCOPPER.h:699
int GetFINESSENwords(int n, int finesse_num) OVERRIDE_CPP17
Get the size of a finesse buffer.
Abstract base class for different kinds of events.