Belle II Software  release-06-01-15
DeSerializerPC.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/rawdata/modules/DeSerializerPC.h>
10 #include <daq/dataobjects/SendHeader.h>
11 #include <daq/dataobjects/SendTrailer.h>
12 #include <rawdata/dataobjects/RawTLU.h>
13 
14 #include <netdb.h>
15 #include <netinet/tcp.h>
16 
17 //#define MAXEVTSIZE 400000000
18 #define CHECKEVT 5000
19 
20 
21 
22 #define USE_DESERIALIZER_PREPC
23 
24 //#define DEBUG
25 //#define NO_DATA_CHECK
26 //#define DUMHSLB
27 
28 using namespace std;
29 using namespace Belle2;
30 
31 //-----------------------------------------------------------------
32 // Register the Module
33 //-----------------------------------------------------------------
34 REG_MODULE(DeSerializerPC)
35 
36 //-----------------------------------------------------------------
37 // Implementation
38 //-----------------------------------------------------------------
39 
40 #ifndef REDUCED_RAWCOPPER
41 #ifdef USE_DESERIALIZER_PREPC
42 //compile error
43 #endif
44 #endif
45 
46 DeSerializerPCModule::DeSerializerPCModule() : DeSerializerModule()
47 {
48  //Set module properties
49  setDescription("Encode DataStore into RingBuffer");
50  // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
51  addParam("NumConn", m_num_connections, "Number of Connections", 0);
52  addParam("HostNameFrom", m_hostname_from, "Hostnames of data sources");
53  addParam("PortFrom", m_port_from, "port numbers of data sources");
54 
55 
56  B2INFO("DeSerializerPC: Constructor done.");
57 
58 
59 }
60 
61 
62 
63 DeSerializerPCModule::~DeSerializerPCModule()
64 {
65 
66 }
67 
68 
// Body of the module's initialize step.
//
// NOTE(review): the line carrying the function signature (source line 69) is
// absent from this listing; the embedded numbering jumps from 68 to 70.
//
// What the visible statements do, in order:
//  - abort if fewer host names or ports than m_num_connections were given,
//  - add one "-1" (not connected) entry to m_socket per connection,
//  - allocate NUM_PREALLOC_BUF buffers plus m_buffer, BUF_SIZE_WORD ints each,
//  - zero the pre-allocated buffers (m_buffer itself is not zeroed here),
//  - register the DataStore objects/arrays,
//  - open the dump file when m_dump_fname is not empty,
//  - clear the time-monitor arrays and the used-buffer counter,
//  - set up the shared-memory status reporting when m_shmflag > 0,
//  - reset event_diff and the "previous event / COPPER counter" markers.
70 {
71  B2INFO("DeSerializerPC: initialize() started.");
72 
73  // Set m_socket
// Every connection needs both a host name and a port number.
74  if (m_num_connections > (int)m_hostname_from.size() || m_num_connections > (int)m_port_from.size()) {
75  B2FATAL("[FATAL] Hostname or port# is not specified for all connections. Please check a python script. Exiting... \n");
76  exit(1);
77  }
// -1 marks "no socket yet"; the actual connection is made later.
78  for (int i = 0; i < m_num_connections; i++) {
79  m_socket.push_back(-1);
80  }
81 
82  // allocate buffer
// All buffers are arrays created with new int[...].
83  for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
84  m_bufary[i] = new int[ BUF_SIZE_WORD ];
85  }
86  m_buffer = new int[ BUF_SIZE_WORD ];
87 
88 
89  // initialize buffer
90  for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
91  memset(m_bufary[i], 0, BUF_SIZE_WORD * sizeof(int));
92  }
93 
94  // Initialize EvtMetaData
95  m_eventMetaDataPtr.registerInDataStore();
96 
97  raw_datablkarray.registerInDataStore();
98  rawcprarray.registerInDataStore();
99  raw_ftswarray.registerInDataStore();
100 
101 
102  // Initialize Array of RawCOPPER
103 
// A non-empty dump file name enables the output file.
104  if (m_dump_fname.size() > 0) {
105  openOutputFile();
106  }
107 
108  // Initialize arrays for time monitor
109  memset(time_array0, 0, sizeof(time_array0));
110  memset(time_array1, 0, sizeof(time_array1));
111  memset(time_array2, 0, sizeof(time_array2));
112 
113  clearNumUsedBuf();
114 
115  // Shared memory
116  if (m_shmflag > 0) {
// Without a node name and a non-negative node id the shared-memory mode is switched off.
117  if (m_nodename.size() == 0 || m_nodeid < 0) {
118  m_shmflag = 0;
119  } else {
// NOTE(review): source line 120 is absent from this listing (numbering jumps
// from 119 to 121); whatever precedes reportReady() cannot be seen here.
121  g_status.reportReady();
122  }
123  }
124 
125  event_diff = 0;
126 
// 0xFFFFFFFF is the initial "previous" value for the event number and COPPER counter.
127  m_prev_copper_ctr = 0xFFFFFFFF;
128  m_prev_evenum = 0xFFFFFFFF;
129 
130 #ifdef NONSTOP
131  openRunPauseNshm();
132 #endif
133 
134  B2INFO("DeSerializerPC: initialize() done.");
135 }
136 
137 
138 int DeSerializerPCModule::recvFD(int sock, char* buf, int data_size_byte, int flag)
139 {
140  int n = 0;
141  int read_size = 0;
142  while (1) {
143  if ((read_size = recv(sock, (char*)buf + n, data_size_byte - n , flag)) < 0) {
144  if (errno == EINTR) {
145  continue;
146  } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
147  // No data received within SO_RCVTIMEO
148 #ifdef NONSTOP
149  string err_str;
150  callCheckRunPause(err_str);
151 #endif
152  continue;
153  } else {
154  char err_buf[500];
155  sprintf(err_buf, "[WARNING] recv() returned error; ret = %d. : %s %s %d",
156  read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
157 #ifdef NONSTOP
158  g_run_error = 1;
159  B2ERROR(err_buf);
160  string err_str = "RUN_ERROR";
161  throw (err_str);
162 #endif
163  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
164  exit(-1);
165  }
166  } else if (read_size == 0) {
167  char err_buf[500];
168  sprintf(err_buf, "[WARNING] Connection is closed by peer(%s).: %s %s %d",
169  strerror(errno), __FILE__, __PRETTY_FUNCTION__, __LINE__);
170 #ifdef NONSTOP
171  g_run_error = 1;
172  B2ERROR(err_buf);
173  string err_str = "RUN_ERROR";
174  throw (err_str);
175 #endif
176  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
177  exit(-1);
178  } else {
179  n += read_size;
180  if (n == data_size_byte)break;
181  }
182  }
183  return n;
184 }
185 
187 {
188  for (int i = 0; i < m_num_connections; i++) {
189  if (m_socket[ i ] >= 0) continue; // Already have an established socket
190  //
191  // Connect to a downstream node
192  //
193  struct sockaddr_in socPC;
194  socPC.sin_family = AF_INET;
195 
196  struct hostent* host;
197  host = gethostbyname(m_hostname_from[ i ].c_str());
198  if (host == NULL) {
199  char err_buf[100];
200  sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
201  strerror(errno));
202  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
203  sleep(1234567);
204  exit(1);
205  }
206  socPC.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
207  socPC.sin_port = htons(m_port_from[ i ]);
208  int sd = socket(PF_INET, SOCK_STREAM, 0);
209 
210  int val1 = 0;
211  setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1, sizeof(val1));
212 
213  struct timeval timeout;
214  timeout.tv_sec = 1;
215  timeout.tv_usec = 0;
216  setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)sizeof(timeout));
217 
218  printf("[DEBUG] Connecting to %s port %d ...\n", m_hostname_from[ i ].c_str(), m_port_from[ i ]);
219  while (1) {
220  if (connect(sd, (struct sockaddr*)(&socPC), sizeof(socPC)) < 0) {
221  perror("Failed to connect. Retrying...");
222  usleep(500000);
223  } else {
224  printf("[DEBUG] Done\n");
225  break;
226  }
227  }
228 
229  // m_socket.push_back(sd);
230  m_socket[ i ] = sd;
231 
232  // check socket paramters
233  int val, len;
234  len = sizeof(val);
235  getsockopt(m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
236 #ifdef DEBUG
237  printf("[DEBUG] SO_RCVBUF %d\n", val);
238 #endif
239  getsockopt(m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
240 #ifdef DEBUG
241  printf("[DEBUG] SO_SNDBUF %d\n", val);
242 #endif
243  getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
244 #ifdef DEBUG
245  printf("[DEBUG] TCP_MAXSEG %d\n", val);
246 #endif
247  getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
248 #ifdef DEBUG
249  printf("[DEBUG] TCP_NODELAY %d\n", val);
250 #endif
251  if (g_status.isAvailable()) {
252  sockaddr_in sa;
253  memset(&sa, 0, sizeof(sockaddr_in));
254  socklen_t sa_len = sizeof(sa);
255  if (getsockname(m_socket[i], (struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
256  g_status.setInputPort(ntohs(sa.sin_port));
257  g_status.setInputAddress(sa.sin_addr.s_addr);
258  }
259  }
260 
261  }
262  printf("[DEBUG] Initialization finished\n");
263  return 0;
264 }
265 
266 
267 
268 int* DeSerializerPCModule::recvData(int* delete_flag, int* total_buf_nwords, int* num_events_in_sendblock,
269  int* num_nodes_in_sendblock)
270 {
271 
272  int* temp_buf = NULL; // buffer for data-body
273  int flag = 0;
274 
275  vector <int> each_buf_nwords;
276  each_buf_nwords.clear();
277  vector <int> each_buf_nodes;
278  each_buf_nodes.clear();
279  vector <int> each_buf_events;
280  each_buf_events.clear();
281 
282  *total_buf_nwords = 0;
283  *num_nodes_in_sendblock = 0;
284  *num_events_in_sendblock = 0;
285 
286  //
287  // Read Header and obtain data size
288  //
289  int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
290  int temp_num_events = 0;
291  int temp_num_nodes = 0;
292 
293  // Read header
294  for (int i = 0; i < (int)(m_socket.size()); i++) {
295 
296  recvFD(m_socket[ i ], (char*)send_hdr_buf, sizeof(int)*SendHeader::SENDHDR_NWORDS, flag);
297 
298  SendHeader send_hdr;
299  send_hdr.SetBuffer(send_hdr_buf);
300 
301  temp_num_events = send_hdr.GetNumEventsinPacket();
302  temp_num_nodes = send_hdr.GetNumNodesinPacket();
303 
304 
305 
306  if (i == 0) {
307  *num_events_in_sendblock = temp_num_events;
308  } else if (*num_events_in_sendblock != temp_num_events) {
309 
310 #ifndef NO_DATA_CHECK
311  printf("[DEBUG] *******HDR**********\n");
312  printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
313  char err_buf[500];
314  sprintf(err_buf,
315  "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
316  *num_events_in_sendblock , temp_num_events, i, *num_nodes_in_sendblock , temp_num_nodes, i);
317  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
318  sleep(1234567);
319  exit(1);
320 #endif
321  }
322 
323  *num_nodes_in_sendblock += temp_num_nodes;
324 
325  int rawblk_nwords = send_hdr.GetTotalNwords()
326  - SendHeader::SENDHDR_NWORDS
327  - SendTrailer::SENDTRL_NWORDS;
328  *total_buf_nwords += rawblk_nwords;
329 
330  //
331  // Data size check1
332  //
333  if (rawblk_nwords > (int)(2.5e6) || rawblk_nwords <= 0) {
334  printf("[DEBUG] *******HDR**********\n");
335  // printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
336  printData(send_hdr_buf, 100);
337  printASCIIData(send_hdr_buf, 100);
338  char err_buf[500];
339  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
340  send_hdr.GetTotalNwords());
341  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
342  sleep(123456);
343  exit(1);
344 
345  }
346 
347  each_buf_nwords.push_back(rawblk_nwords);
348  each_buf_events.push_back(temp_num_events);
349  each_buf_nodes.push_back(temp_num_nodes);
350 
351  }
352 
353 
354  temp_buf = getNewBuffer(*total_buf_nwords, delete_flag); // this include only data body
355  //
356  // Read body
357  //
358  int total_recvd_byte = 0;
359  for (int i = 0; i < (int)(m_socket.size()); i++) {
360 
361  try {
362  total_recvd_byte += recvFD(m_socket[ i ], (char*)temp_buf + total_recvd_byte,
363  each_buf_nwords[ i ] * sizeof(int), flag);
364  } catch (string err_str) {
365  if (*delete_flag) {
366  B2WARNING("Delete buffer before going to Run-pause state");
367  delete temp_buf;
368  }
369  throw (err_str);
370  }
371 
372  //
373  // Data length check
374  //
375  int temp_length = 0;
376  for (int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
377  int this_length = *((int*)((char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] * sizeof(int) + temp_length));
378  temp_length += this_length * sizeof(int);
379  }
380  if (temp_length != (int)(each_buf_nwords[ i ] * sizeof(int))) {
381  printf("[DEBUG]*******SENDHDR*********** \n");
382  printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
383  printf("[DEBUG]*******BODY***********\n ");
384  printData(temp_buf, (int)(total_recvd_byte / sizeof(int)));
385  char err_buf[500];
386  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
387  (int)(*total_buf_nwords * sizeof(int)), temp_length);
388  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
389  sleep(1234567);
390  exit(-1);
391  }
392 
393  }
394 
395  if ((int)(*total_buf_nwords * sizeof(int)) != total_recvd_byte) {
396  char err_buf[500];
397  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
398  total_recvd_byte, (int)(*total_buf_nwords * sizeof(int)));
399  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
400  sleep(1234567);
401  exit(-1);
402  }
403 
404  // Read Traeiler
405  int send_trl_buf[(unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
406  for (int i = 0; i < (int)(m_socket.size()); i++) {
407  try {
408  recvFD(m_socket[ i ], (char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS * sizeof(int), flag);
409  } catch (string err_str) {
410  if (*delete_flag) {
411  B2WARNING("Delete buffer before going to Run-pause state");
412  delete temp_buf;
413  }
414  throw (err_str);
415  }
416  }
417 
418  return temp_buf;
419 }
420 
421 
422 void DeSerializerPCModule::setRecvdBuffer(RawDataBlock* temp_raw_datablk, int* delete_flag)
423 {
424  //
425  // Get data from socket
426  //
427  int total_buf_nwords = 0 ;
428  int num_events_in_sendblock = 0;
429  int num_nodes_in_sendblock = 0;
430 
431  if (m_start_flag == 0) B2INFO("DeSerializerPC: Reading the 1st packet from eb0...");
432 
433  int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
434  &num_nodes_in_sendblock);
435  if (m_start_flag == 0) {
436  B2INFO("DeSerializerPC: Done. the size of the 1st packet " << total_buf_nwords << " words");
437  m_start_flag = 1;
438  }
439  m_totbytes += total_buf_nwords * sizeof(int);
440 
441  // Fixed for glibc error at Jan. 2017, reported in "Re: data taking with the new firmware".
442  // for temp_raw_datablk, delete_flag should be 0. raw_datablk will take care of deleting buffer
443  // temp_raw_datablk->SetBuffer((int*)temp_buf, total_buf_nwords, *delete_flag,
444  temp_raw_datablk->SetBuffer((int*)temp_buf, total_buf_nwords, 0,
445 
446  num_events_in_sendblock, num_nodes_in_sendblock);
447 
448  //
449  // check even # and node # in one Sendblock
450  //
451  int num_entries = temp_raw_datablk->GetNumEntries();
452  if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
453  char err_buf[500];
454  sprintf(err_buf,
455  "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
456  num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
457  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
458  sleep(1234567);
459  exit(-1);
460  }
461  return;
462 
463 }
464 
465 
466 
// Check the contents of one received RawDataBlock.
//
// Every entry of the block is wrapped in a temporary RawFTSW, RawTLU or
// RawCOPPER object (selected with CheckFTSWID()/CheckTLUID()) that borrows the
// block's buffer, and that object's CheckData() is run unless NO_DATA_CHECK is
// defined. A std::string thrown by CheckData() is reported and the process
// exits. Afterwards the event numbers of all nodes of one event are compared.
//
// Outputs:
//   exp/run/subrun/eve_copper_0 : header values of the first RawCOPPER entry
//                                 met in the block (cpr_num == 0)
//   error_bit_flag              : OR-ed with GetErrorBitFlag(0) of every
//                                 RawCOPPER entry
//
// NOTE(review): source lines 576, 628 and 638 are absent from this listing
// (see the notes at the gaps below), so the function is not complete here.
467 void DeSerializerPCModule::checkData(RawDataBlock* raw_datablk, unsigned int* exp_copper_0, unsigned int* run_copper_0,
468  unsigned int* subrun_copper_0, unsigned int* eve_copper_0, unsigned int* error_bit_flag)
469 {
470 
471  // int data_size_copper_0 = -1;
472  // int data_size_copper_1 = -1;
473 
474  //
475  // Data check
476  //
477  int* temp_buf = raw_datablk->GetBuffer(0);
478  int cpr_num = 0;
479  unsigned int cur_evenum = 0, cur_copper_ctr = 0;
// NOTE(review): these arrays are written at index entry_id = l + k * (# of nodes),
// not at the node index l. For k > 0 the comparison loop further down
// (indices 0 .. # of nodes - 1) then only sees the zeros left by memset, and
// entry_id >= 32 would write past the arrays. Confirm whether more than one
// event per block can arrive here.
480  unsigned int eve_array[32]; // # of nodes is less than 17
481  unsigned int utime_array[32];// # of nodes is less than 17
482  unsigned int ctime_type_array[32];// # of nodes is less than 17
483 
484 #ifdef DUMHSLB
485  unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
486 #endif
487 
// Outer loop: events in the block; inner loop: nodes of one event.
488  for (int k = 0; k < raw_datablk->GetNumEvents(); k++) {
489  memset(eve_array, 0, sizeof(eve_array));
490  memset(utime_array, 0, sizeof(utime_array));
491  memset(ctime_type_array, 0, sizeof(ctime_type_array));
492 
493  int num_nodes_in_sendblock = raw_datablk->GetNumNodes();
494  for (int l = 0; l < num_nodes_in_sendblock; l++) {
495  int entry_id = l + k * num_nodes_in_sendblock;
496  //
497  // RawFTSW
498  //
499  if (raw_datablk->CheckFTSWID(entry_id)) {
500  RawFTSW* temp_rawftsw = new RawFTSW;
501  int block_id = 0;
// The temporary object only points into temp_buf; it is deleted at the end of this branch.
502  temp_rawftsw->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
503  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
// Dump the data of entries whose event number is below 10.
504  if (temp_rawftsw->GetEveNo(block_id) < 10) {
505  printf("[DEBUG] ######FTSW#########\n");
506  printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
507  }
508 
509 #ifdef DUMHSLB
510  exp_run_ftsw = temp_rawftsw->GetExpRunSubrun(block_id);
511  ctime_trgtype_ftsw = temp_rawftsw->GetTTCtimeTRGType(block_id);
512  utime_ftsw = temp_rawftsw->GetTTUtime(block_id);
513 #endif
514 
515 
516 #ifndef NO_DATA_CHECK
517  try {
518  temp_rawftsw->CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
519  eve_array[ entry_id ] = cur_evenum;
520  } catch (string err_str) {
521  print_err.PrintError(m_shmflag, &g_status, err_str);
522  exit(1);
523  }
524 #endif
525  utime_array[ entry_id ] = temp_rawftsw->GetTTUtime(block_id);
526  ctime_type_array[ entry_id ] = temp_rawftsw->GetTTCtimeTRGType(block_id);
527  delete temp_rawftsw;
528 
529  //
530  // RawTLU
531  //
532  } else if (raw_datablk->CheckTLUID(entry_id)) {
533 
534  RawTLU* temp_rawtlu = new RawTLU;
535  temp_rawtlu->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
536  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
537  if (temp_rawtlu->GetEveNo(0) < 10
538  ) {
539  printf("[DEBUG] ######TLU#########\n");
540  printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
541  }
542 
543 #ifndef NO_DATA_CHECK
544  try {
545  temp_rawtlu->CheckData(0, m_prev_evenum, &cur_evenum);
546  eve_array[ entry_id ] = cur_evenum;
547  } catch (string err_str) {
548  print_err.PrintError(m_shmflag, &g_status, err_str);
549  exit(1);
550  }
551 #endif
552  delete temp_rawtlu;
553  } else {
554 
555  //
556  // RawCOPPER
557  //
558 
559 
560  RawCOPPER* temp_rawcopper = new RawCOPPER;
561  temp_rawcopper->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
562  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
563 
564 #ifdef DUMHSLB
// With DUMHSLB the header words are overwritten with the values saved from the FTSW entry.
565  int block_id = 0;
566  (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
567  (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
568  (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
569 #endif
570 
571 #ifndef NO_DATA_CHECK
572  try {
573 
574  temp_rawcopper->CheckData(0, m_prev_evenum, &cur_evenum,
575  m_prev_copper_ctr, &cur_copper_ctr,
// NOTE(review): source line 576 is absent from this listing (numbering jumps
// from 575 to 577); the CheckData() call above is left without its last
// arguments and closing parenthesis.
577  eve_array[ entry_id ] = cur_evenum;
578  } catch (string err_str) {
579  temp_rawcopper->PrintData(temp_rawcopper->GetWholeBuffer(), temp_rawcopper->TotalBufNwords());
580  print_err.PrintError(m_shmflag, &g_status, err_str);
581  exit(1);
582  }
583 #endif
584 
585  utime_array[ entry_id ] = temp_rawcopper->GetTTUtime(0);
586  ctime_type_array[ entry_id ] = temp_rawcopper->GetTTCtimeTRGType(0);
587 
// Only the first RawCOPPER entry of the block provides the output header values.
588  if (cpr_num == 0) {
589  // data_size_copper_0 = raw_datablk->GetBlockNwords(entry_id);
590  // *eve_copper_0 = (raw_datablk->GetBuffer(entry_id))[ 3 ];
591  *eve_copper_0 = temp_rawcopper->GetEveNo(0);
592  *exp_copper_0 = temp_rawcopper->GetExpNo(0);
593  *run_copper_0 = temp_rawcopper->GetRunNo(0);
594  *subrun_copper_0 = temp_rawcopper->GetSubRunNo(0);
595  } else if (cpr_num == 1) {
596  // data_size_copper_1 = raw_datablk->GetBlockNwords(entry_id);
597  }
598  cpr_num++;
599 
600  // Check Error bit flag
601  *error_bit_flag |= temp_rawcopper->GetErrorBitFlag(0);
602 
603  delete temp_rawcopper;
604  }
605  }
606 
607 #ifndef NO_DATA_CHECK
608  // event #, ctime, utime over nodes
// Only the event number is compared; the utime/ctime comparison is commented out below.
609  for (int l = 1; l < num_nodes_in_sendblock; l++) {
610  if (eve_array[ 0 ] != eve_array[ l ]) {
611 // if (eve_array[ 0 ] != eve_array[ l ] ||
612 // utime_array[ 0 ] != utime_array[ l ] ||
613 // ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
614  char err_buf[500];
615  for (int m = 0; m < num_nodes_in_sendblock; m++) {
616  printf("[DEBUG] node %d eve # %d utime %x ctime %x\n",
617  m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
618  }
619  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
620  print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
621  sleep(1234567);
622  exit(-1);
623  }
624  }
625 #endif
626 
627  // Event # monitor in runchange
// NOTE(review): source line 628 is absent from this listing (numbering jumps
// from 627 to 629); the statement that opens the block closed at line 635 -
// presumably the condition guarding this dump - cannot be seen here.
629  printf("[DEBUG] ##############################################\n");
630  for (int m = 0; m < raw_datablk->GetNumEntries(); m++) {
631  printf("[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
632  }
633  printf("[DEBUG] ##############################################\n");
634  fflush(stderr);
635  }
// Remember this event's values for the check of the next event.
636  m_prev_evenum = cur_evenum;
637  m_prev_copper_ctr = cur_copper_ctr;
// NOTE(review): source line 638 is absent from this listing (numbering jumps
// from 637 to 639).
639  }
640  return;
641 }
642 
643 #ifdef NONSTOP
644 void DeSerializerPCModule::waitResume()
645 {
646  while (true) {
647 #ifdef NONSTOP_DEBUG
648  printf("\033[31m");
649  printf("###########(Ser) Waiting for Resume ###############\n");
650  fflush(stdout);
651  printf("\033[0m");
652 #endif
653  if (checkRunRecovery()) {
654  g_run_pause = 0;
655  for (int i = 0; i < m_num_connections; i++) {
656  if (CheckConnection(m_socket[ i ]) < 0) m_socket[ i ] = -1;
657  }
658  resumeRun();
659  break;
660  }
661  sleep(1);
662  }
663  return;
664 
665 }
666 #endif
667 
668 void DeSerializerPCModule::setErrorFlag(unsigned int error_flag, StoreObjPtr<EventMetaData> evtmetadata)
669 {
670  // RawHeader_latest raw_hdr;
671  int error_set = 0;
672  if (error_flag & RawHeader_latest::B2LINK_PACKET_CRC_ERROR) {
673  evtmetadata->addErrorFlag(EventMetaData::c_B2LinkPacketCRCError);
674  error_set = 1;
675  }
676  if (error_flag & RawHeader_latest::B2LINK_EVENT_CRC_ERROR) {
677  evtmetadata->addErrorFlag(EventMetaData::c_B2LinkEventCRCError);
678  error_set = 1;
679  }
680  if (error_set) B2INFO("Raw2Ds: Error flag was set in EventMetaData.");
681 }
682 
683 
685 {
686 
687  // For data check
688  unsigned int exp_copper_0 = 0;
689  unsigned int run_copper_0 = 0;
690  unsigned int subrun_copper_0 = 0;
691  unsigned int eve_copper_0 = 0;
692  unsigned int error_bit_flag = 0;
693 
694  clearNumUsedBuf();
695 
696  if (m_start_flag == 0) {
697  //
698  // Connect to eb0: This should be here because we want Serializer first to accept connection from eb1tx
699  //
700  Connect();
701  if (g_status.isAvailable()) {
702  B2INFO("DeSerializerPC: Waiting for Start...\n");
703  g_status.reportRunning();
704  }
705  m_start_time = getTimeSec();
706  n_basf2evt = 0;
707  }
708 
709 
710 #ifdef NONSTOP
711  if (g_run_pause > 0 || g_run_error > 0) {
712  if (g_run_pause == 0) {
713  while (true) {
714  if (checkRunPause()) break;
715 #ifdef NONSTOP_DEBUG
716  printf("\033[31m");
717  printf("###########(DeserializerPC) Waiting for Runpause() ###############\n");
718  fflush(stdout);
719  printf("\033[0m");
720 #endif
721  sleep(1);
722  }
723  }
724  waitResume();
725  m_eventMetaDataPtr.create(); // Otherwise endRun() is called.
726  return;
727  }
728 #endif
729 
730  // Make rawdatablk array
731 
732 
733  //
734  // Main loop
735  //
736  int* buf_rc = NULL;
737  for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
738  eve_copper_0 = 0;
739  //
740  // Set buffer to the RawData class stored in DataStore
741  //
742  int delete_flag = 0;
743  RawDataBlock temp_rawdatablk;
744  try {
745  setRecvdBuffer(&temp_rawdatablk, &delete_flag);
746  checkData(&temp_rawdatablk, &exp_copper_0, &run_copper_0, &subrun_copper_0, &eve_copper_0, &error_bit_flag);
747  } catch (string err_str) {
748 #ifdef NONSTOP
749  // Update EventMetaData otherwise basf2 stops.
750  if (err_str == "RUN_PAUSE" || err_str == "RUN_ERROR") {
751  m_eventMetaDataPtr.create();
752  return;
753  }
754 #endif
755  print_err.PrintError((char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
756  exit(1);
757  }
758 
759  RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
760  raw_datablk->SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(), delete_flag,
761  temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
762  buf_rc = temp_rawdatablk.GetWholeBuffer();
763  }
764  if (buf_rc != NULL) {
765  g_status.copyEventHeader(buf_rc);
766  }
767 
768  //
769  // Update EventMetaData
770  //
771  m_eventMetaDataPtr.create();
772  m_eventMetaDataPtr->setExperiment(exp_copper_0);
773  m_eventMetaDataPtr->setRun(run_copper_0);
774  m_eventMetaDataPtr->setSubrun(subrun_copper_0);
775  m_eventMetaDataPtr->setEvent(eve_copper_0);
776 
777  setErrorFlag(error_bit_flag, m_eventMetaDataPtr);
778  if (error_bit_flag != 0) {
779  m_eventMetaDataPtr->addErrorFlag(EventMetaData::c_B2LinkEventCRCError);
780  printf("[ERROR] error bit was detected. exp %d run %d eve %d count = %d\n",
781  exp_copper_0, run_copper_0, eve_copper_0, error_bit_flag);
782  }
783 
784 
785  //
786  // Run stop via NSM (Already obsolete. Need to ask Konno-san about implementation)
787  //
788  // if (m_shmflag != 0) {
789  // if (n_basf2evt % 10 == 0) {
790  // if (g_status.isStopped()) {
791  // printf("[DEBUG] [INFO] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n", max_nevt , max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
792  // m_eventMetaDataPtr->setEndOfData();
793  // }
794  // }
795  // }
796 
797  //
798  // Monitor
799  //
800  if (max_nevt >= 0 || max_seconds >= 0.) {
801  if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC >= max_nevt && max_nevt > 0)
802  || (getTimeSec() - m_start_time > max_seconds && max_seconds > 0.)) {
803  printf("[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
804  max_nevt , max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
805  m_eventMetaDataPtr->setEndOfData();
806  }
807  }
808 
809  if (n_basf2evt % 20000 == 0 || n_basf2evt < 10) {
810  RateMonitor(eve_copper_0, subrun_copper_0, run_copper_0);
811  }
812  n_basf2evt++;
813  if (g_status.isAvailable()) {
814  g_status.setInputNBytes(m_totbytes);
815  g_status.setInputCount(n_basf2evt);
816  }
817 
818  return;
819 }
820 
821 
A class definition of an input module for Sequential ROOT I/O.
Definition: DeSerializer.h:36
void RateMonitor(unsigned int nevt, int subrun=-1, int run=-1)
monitor rate
unsigned int m_prev_exprunsubrun_no
run no.
Definition: DeSerializer.h:173
std::string m_nodename
Node name.
Definition: DeSerializer.h:161
int BUF_SIZE_WORD
size of buffer for one event (word)
Definition: DeSerializer.h:83
static RunInfoBuffer g_status
buffer class to communicate with NSM client
Definition: DeSerializer.h:200
int n_basf2evt
No. of sent events.
Definition: DeSerializer.h:86
CprErrorMessage print_err
wrapper for B2LOG system
Definition: DeSerializer.h:213
virtual void printData(int *buf, int nwords)
dump error data
virtual int * getNewBuffer(int nwords, int *delete_flag)
Getbuffer.
std::string m_dump_fname
dump filename
Definition: DeSerializer.h:92
int m_shmflag
Use shared memory.
Definition: DeSerializer.h:185
int * m_bufary[NUM_PREALLOC_BUF]
buffer
Definition: DeSerializer.h:101
unsigned int m_exprunsubrun_no
run no.
Definition: DeSerializer.h:170
virtual void printASCIIData(int *buf, int nwords)
dump error data
virtual void openOutputFile()
Module functions to be called from event process.
double max_seconds
time to stop a run
Definition: DeSerializer.h:77
int m_nodeid
Node(PC or COPPER) ID.
Definition: DeSerializer.h:158
double getTimeSec()
store time info.
std::vector< int > m_socket
Receiver socket.
virtual void event()
Module functions to be called from event process.
virtual int * recvData(int *delete_flag, int *total_m_size_word, int *num_events_in_sendblock, int *num_nodes_in_sendblock)
receive data
virtual void initialize()
Module functions to be called from main process.
std::vector< int > m_port_from
port # to connect data sources
virtual int Connect()
Accept connection.
virtual int recvFD(int fd, char *buf, int data_size_byte, int flag)
receive data
virtual void setRecvdBuffer(RawDataBlock *raw_datablk, int *delete_flag)
attach buffer to RawDataBlock
virtual void checkData(RawDataBlock *raw_datablk, unsigned int *exp_copper_0, unsigned int *run_copper_0, unsigned int *subrun_copper_0, unsigned int *eve_copper_0, unsigned int *error_bit_flag)
check data contents
std::vector< std::string > m_hostname_from
hostname of upstream Data Sources
@ c_B2LinkPacketCRCError
Belle2link CRC error is detected in the event.
Definition: EventMetaData.h:44
@ c_B2LinkEventCRCError
HSLB_COPPER CRC error is detected in the event.
Definition: EventMetaData.h:45
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
The Raw COPPER class. This class stores data received by COPPER via belle2link. Data from all detector...
Definition: RawCOPPER.h:52
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor)
Definition: RawCOPPER.cc:141
The RawDataBlock class Base class for rawdata handling.
Definition: RawDataBlock.h:27
virtual void PrintData(int *buf, int nwords)
print data
Definition: RawDataBlock.h:122
virtual int GetBufferPos(int n)
get position of data block in word
Definition: RawDataBlock.h:46
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
Definition: RawDataBlock.h:101
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
Definition: RawDataBlock.h:67
virtual int * GetBuffer(int n)
get nth buffer pointer
Definition: RawDataBlock.h:53
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor)
Definition: RawDataBlock.cc:35
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
Definition: RawDataBlock.h:74
virtual int CheckTLUID(int n)
get TLU ID to check whether this data block is TLU data or not
Definition: RawDataBlock.h:108
virtual int GetBlockNwords(int n)
get size of a data block
Definition: RawDataBlock.h:94
virtual int GetNumEvents()
get # of events in m_buffer
Definition: RawDataBlock.h:81
virtual int TotalBufNwords()
Get total length of m_buffer.
Definition: RawDataBlock.h:39
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
Definition: RawDataBlock.h:60
The Raw FTSW class.
Definition: RawFTSW.h:30
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
Definition: RawFTSW.h:72
unsigned int GetExpRunSubrun(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
Definition: RawFTSW.h:130
unsigned int GetEveNo(int n)
Get event #.
Definition: RawFTSW.h:65
unsigned int GetTTUtime(int n)
get unixtime of the trigger
Definition: RawFTSW.h:79
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor)
Definition: RawFTSW.cc:107
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check the data contents
Definition: RawFTSW.h:121
The Raw TLU class Class for data from DAQ PC for TLU(Trigger Logic Unit) It is supposed to be used on...
Definition: RawTLU.h:27
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
Definition: RawTLU.h:81
unsigned int GetEveNo(int n)
Get Event #.
Definition: RawTLU.h:53
void SetBuffer(int *hdr)
set buffer
Definition: SendHeader.cc:37
int GetNumEventsinPacket()
get contents of Header
Definition: SendHeader.cc:125
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:95
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
Definition: RawCOPPER.h:605
int GetExpNo(int n)
get Experimental # from header
Definition: RawCOPPER.h:362
unsigned int GetEveNo(int n)
get event #
Definition: RawCOPPER.h:387
unsigned int GetTTUtime(int n)
get unixtime of the trigger
Definition: RawCOPPER.h:611
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check data contents
Definition: RawCOPPER.h:696
int GetSubRunNo(int n)
get subrun # (8bit)
Definition: RawCOPPER.h:381
unsigned int GetErrorBitFlag(int n)
get contents of header
Definition: RawCOPPER.h:413
int GetRunNo(int n)
get run # (14bit)
Definition: RawCOPPER.h:374
Abstract base class for different kinds of events.