Belle II Software  release-06-01-15
DesSerPrePC.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/rawdata/DesSerPrePC.h>
9 #include <rawdata/dataobjects/RawFTSWFormat_latest.h>
10 #include <rawdata/dataobjects/RawTLUFormat.h>
11 
12 #include <arpa/inet.h>
13 #include <netdb.h>
14 #include <netinet/tcp.h>
15 
16 #include <sys/socket.h>
17 
18 //#define NO_DATA_CHECK
19 //#define DUMHSLB
20 
21 using namespace std;
22 using namespace Belle2;
23 
24 //----------------------------------------------------------------
25 // Implementation
26 //----------------------------------------------------------------
27 DesSerPrePC::DesSerPrePC(string host_recv, int port_recv, string host_send, int port_send, int shmflag,
28  const std::string& nodename, int nodeid)
29 {
30 
31  for (int i = 0 ; i < m_num_connections; i++) {
32  // m_hostname_from.push_back( "localhost");
33  m_hostname_from.push_back(host_recv);
34  // m_port_from.push_back(30000);
35  m_port_from.push_back(port_recv) ;
36  m_socket_recv.push_back(-1);
37  }
38 
39  // m_port_to = 31001;
40  m_port_to = port_send;
41  // m_hostname_local = "localhost";
42  m_hostname_local = host_send;
43  m_nodename = nodename;
44 
45  m_shmflag = shmflag;
46 
47  // B2INFO("DeSerializerPrePC: Constructor done.");
48  printf("[INFO] DeSerializerPrePC: Constructor done.\n"); fflush(stdout);
49 }
50 
51 
52 
53 DesSerPrePC::~DesSerPrePC()
54 {
55 }
56 
57 
58 
59 int DesSerPrePC::recvFD(int sock, char* buf, int data_size_byte, int flag)
60 {
61  int n = 0;
62  int read_size = 0;
63  while (1) {
64  if ((read_size = recv(sock, (char*)buf + n, data_size_byte - n , flag)) < 0) {
65  if (errno == EINTR) {
66  continue;
67  } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
68 #ifdef NONSTOP
69  string err_str;
70  callCheckRunPause(err_str);
71 #endif
72  continue;
73  } else {
74  perror("[WARNING]");
75  char err_buf[500];
76  sprintf(err_buf, "recv() returned error; ret = %d. : %s %s %d",
77  read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
78 #ifdef NONSTOP
79  m_run_error = 1;
80  // B2ERROR(err_buf);
81  printf("[WARNING] %s\n", err_buf); fflush(stdout);
82  string err_str = "RUN_ERROR";
83  printf("AIUEO********************\n"); fflush(stdout);
84  throw (err_str);
85 #endif
86  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
87  exit(-1);
88  }
89  } else if (read_size == 0) {
90  // Connection is closed ( error )
91  char err_buf[500];
92  sprintf(err_buf, "[WARNING] Connection is closed by peer(%s). readsize = %d %d. : %s %s %d",
93  strerror(errno), read_size, errno, __FILE__, __PRETTY_FUNCTION__, __LINE__);
94 #ifdef NONSTOP
95  m_run_error = 1;
96  // B2ERROR(err_buf);
97  printf("%s\n", err_buf); fflush(stdout);
98  string err_str = "RUN_ERROR";
99  throw (err_str);
100 #else
101  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
102  exit(-1);
103 #endif
104  } else {
105  n += read_size;
106  if (n == data_size_byte)break;
107  }
108  }
109  return n;
110 }
111 
112 
113 int DesSerPrePC::Connect()
114 {
115 
116  for (int i = 0; i < m_num_connections; i++) {
117 
118  if (m_socket_recv[ i ] >= 0) continue; // Already have an established socket
119 
120  //
121  // Connect to a downstream node
122  //
123  struct hostent* host;
124  host = gethostbyname(m_hostname_from[ i ].c_str());
125  if (host == NULL) {
126  char err_buf[100];
127  sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
128  strerror(errno));
129  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
130  sleep(1234567);
131  exit(1);
132  }
133 
134  struct sockaddr_in socPC;
135  socPC.sin_family = AF_INET;
136  socPC.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
137  socPC.sin_port = htons(m_port_from[ i ]);
138  int sd = socket(PF_INET, SOCK_STREAM, 0);
139  int val1 = 0;
140  setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1, sizeof(val1));
141 
142  struct timeval timeout;
143  timeout.tv_sec = 1;
144  timeout.tv_usec = 0;
145  setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)sizeof(timeout));
146 
147  printf("[DEBUG] Connecting to %s port %d\n" , m_hostname_from[ i ].c_str(), m_port_from[ i ]); fflush(stdout);
148 
149  while (1) {
150  if (connect(sd, (struct sockaddr*)(&socPC), sizeof(socPC)) < 0) {
151  perror("Failed to connect. Retrying...");
152  usleep(500000);
153  } else {
154  // B2INFO("Done");
155  printf("[DEBUG] Done\n"); fflush(stdout);
156  break;
157  }
158  }
159  // m_socket_recv.push_back(sd);
160  m_socket_recv[ i ] = sd;
161 
162  // check socket paramters
163  int val, len;
164  len = sizeof(val);
165  getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
166  // B2INFO("SO_RCVBUF" << val);
167  getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
168  // B2DEBUG("SO_SNDBUF" << val);
169  getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
170  // B2DEBUG("TCP_MAXSEG" << val);
171  getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
172  // B2DEBUG("TCP_NODELAY" << val);
173 
174  if (m_status.isAvailable()) {
175  sockaddr_in sa;
176  memset(&sa, 0, sizeof(sockaddr_in));
177  socklen_t sa_len = sizeof(sa);
178  if (getsockname(m_socket_recv[i], (struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
179  m_status.setInputPort(ntohs(sa.sin_port));
180  m_status.setInputAddress(sa.sin_addr.s_addr);
181  }
182  }
183 
184  }
185  // B2INFO("[DEBUG] Initialization finished");
186  printf("[DEBUG] Initialization finished\n"); fflush(stdout);
187  return 0;
188 }
189 
190 
191 
192 int* DesSerPrePC::recvData(int* delete_flag, int* total_buf_nwords, int* num_events_in_sendblock, int* num_nodes_in_sendblock)
193 {
194  int* temp_buf = NULL; // buffer for data-body
195  int flag = 0;
196 
197  vector <int> each_buf_nwords;
198  each_buf_nwords.clear();
199  vector <int> each_buf_nodes;
200  each_buf_nodes.clear();
201  vector <int> each_buf_events;
202  each_buf_events.clear();
203 
204  *total_buf_nwords = 0;
205  *num_nodes_in_sendblock = 0;
206  *num_events_in_sendblock = 0;
207 
208  //
209  // Read Header and obtain data size
210  //
211  int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
212  int temp_num_events = 0;
213  int temp_num_nodes = 0;
214 
215  // Read header
216  for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
217 
218  recvFD(m_socket_recv[ i ], (char*)send_hdr_buf, sizeof(int)*SendHeader::SENDHDR_NWORDS, flag);
219 
220  SendHeader send_hdr;
221  send_hdr.SetBuffer(send_hdr_buf);
222 
223  temp_num_events = send_hdr.GetNumEventsinPacket();
224  temp_num_nodes = send_hdr.GetNumNodesinPacket();
225 
226  if (i == 0) {
227  *num_events_in_sendblock = temp_num_events;
228  } else if (*num_events_in_sendblock != temp_num_events) {
229 #ifndef NO_DATA_CHECK
230  char err_buf[500];
231  sprintf(err_buf,
232  "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
233  *num_events_in_sendblock , temp_num_events, i, *num_nodes_in_sendblock , temp_num_nodes, i);
234  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
235  sleep(1234567);
236  exit(1);
237 #endif
238  }
239 
240  *num_nodes_in_sendblock += temp_num_nodes;
241 
242  int rawblk_nwords = send_hdr.GetTotalNwords()
243  - SendHeader::SENDHDR_NWORDS
244  - SendTrailer::SENDTRL_NWORDS;
245  *total_buf_nwords += rawblk_nwords;
246 
247  //
248  // Data size check1
249  //
250  if (rawblk_nwords > (int)(2.5e6) || rawblk_nwords <= 0) {
251  printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
252  char err_buf[500];
253  sprintf(err_buf, "CORRUPTED DATA: Too large event : Header %d %d %d %d :block size %d words\n", i, temp_num_events, temp_num_nodes,
254  send_hdr.GetTotalNwords(), rawblk_nwords);
255  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
256  sleep(123456);
257  exit(1);
258 
259  }
260 
261  each_buf_nwords.push_back(rawblk_nwords);
262  each_buf_events.push_back(temp_num_events);
263  each_buf_nodes.push_back(temp_num_nodes);
264 
265  }
266 
267 
268  temp_buf = getNewBuffer(*total_buf_nwords, delete_flag); // this include only data body
269  //
270  // Read body
271  //
272  int total_recvd_byte = 0;
273  for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
274 
275  try {
276  total_recvd_byte += recvFD(m_socket_recv[ i ], (char*)temp_buf + total_recvd_byte,
277  each_buf_nwords[ i ] * sizeof(int), flag);
278  } catch (string err_str) {
279  if (*delete_flag) {
280  // B2WARNING("Delete buffer before going to Run-pause state");
281  printf("[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
282  delete temp_buf;
283  }
284  throw (err_str);
285  }
286  //
287  // Data length check
288  //
289  int temp_length = 0;
290  for (int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
291  int this_length = *((int*)((char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] * sizeof(int) + temp_length));
292  temp_length += this_length * sizeof(int);
293  }
294  if (temp_length != (int)(each_buf_nwords[ i ] * sizeof(int))) {
295  printf("[DEBUG]*******SENDHDR*********** \n");
296  printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
297  printf("[DEBUG]*******BODY***********\n ");
298  printData(temp_buf, (int)(total_recvd_byte / sizeof(int)));
299  char err_buf[500];
300  sprintf(err_buf, "CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
301  (int)(*total_buf_nwords * sizeof(int)), temp_length);
302  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
303  sleep(1234567);
304  exit(-1);
305  }
306 
307  }
308 
309  if ((int)(*total_buf_nwords * sizeof(int)) != total_recvd_byte) {
310  char err_buf[500];
311  sprintf(err_buf, "CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
312  total_recvd_byte, (int)(*total_buf_nwords * sizeof(int)));
313  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
314  sleep(1234567);
315  exit(-1);
316  }
317 
318  // Read Traeiler
319  int send_trl_buf[(unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
320  for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
321  try {
322  recvFD(m_socket_recv[ i ], (char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS * sizeof(int), flag);
323  } catch (string err_str) {
324  if (*delete_flag) {
325  // B2WARNING("Delete buffer before going to Run-pause state");
326  printf("[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
327  delete temp_buf;
328  }
329  throw (err_str);
330  }
331  }
332 
333  return temp_buf;
334 }
335 
336 
337 void DesSerPrePC::setRecvdBuffer(RawDataBlockFormat* temp_raw_datablk, int* delete_flag)
338 {
339  //
340  // Get data from socket
341  //
342  int total_buf_nwords = 0 ;
343  int num_events_in_sendblock = 0;
344  int num_nodes_in_sendblock = 0;
345 
346  if (m_start_flag == 0) {
347  // B2INFO("DeSerializerPrePC: Reading the 1st packet from eb0...");
348  printf("DeSerializerPrePC: Reading the 1st packet from eb0...\n"); fflush(stdout);
349  }
350  int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
351  &num_nodes_in_sendblock);
352  if (m_start_flag == 0) {
353  // B2INFO("DeSerializerPrePC: Done. the size of the 1st packet " << total_buf_nwords << " words");
354  printf("DeSerializerPrePC: Done. the size of the 1st packet %d words\n", total_buf_nwords); fflush(stdout);
355  m_start_flag = 1;
356  }
357  m_recvd_totbytes += total_buf_nwords * sizeof(int);
358 
359  temp_raw_datablk->SetBuffer((int*)temp_buf, total_buf_nwords, *delete_flag,
360  num_events_in_sendblock, num_nodes_in_sendblock);
361 
362  //
363  // check even # and node # in one Sendblock
364  //
365  int num_entries = temp_raw_datablk->GetNumEntries();
366  if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
367  char err_buf[500];
368  sprintf(err_buf,
369  "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
370  num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
371  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
372  sleep(1234567);
373  exit(-1);
374  }
375  return;
376 
377 }
378 
379 
380 
381 
382 void DesSerPrePC::checkData(RawDataBlockFormat* raw_datablk, unsigned int* eve_copper_0)
383 {
384  // int data_size_copper_0 = -1;
385  // int data_size_copper_1 = -1;
386 
387  //
388  // Data check
389  //
390  int* temp_buf = raw_datablk->GetBuffer(0);
391  int cpr_num = 0;
392  unsigned int cur_evenum = 0, cur_copper_ctr = 0;
393  unsigned int eve_array[32]; // # of noeds is less than 17
394  unsigned int utime_array[32];// # of noeds is less than 17
395  unsigned int ctime_type_array[32];// # of noeds is less than 17
396 
397 #ifdef DUMHSLB
398  unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
399 #endif
400 
401  for (int k = 0; k < raw_datablk->GetNumEvents(); k++) {
402  memset(eve_array, 0, sizeof(eve_array));
403  memset(utime_array, 0, sizeof(utime_array));
404  memset(ctime_type_array, 0, sizeof(ctime_type_array));
405 
406  int num_nodes_in_sendblock = raw_datablk->GetNumNodes();
407  for (int l = 0; l < num_nodes_in_sendblock; l++) {
408  int entry_id = l + k * num_nodes_in_sendblock;
409 
410  //
411  // RawFTSW
412  //
413  if (raw_datablk->CheckFTSWID(entry_id)) {
414  RawFTSWFormat_latest* temp_rawftsw = new RawFTSWFormat_latest;
415  int block_id = 0;
416  temp_rawftsw->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
417  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
418  if (temp_rawftsw->GetEveNo(block_id) < 10) {
419  printf("[DEBUG] ######FTSW#########\n");
420  printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
421  }
422 
423 #ifdef DUMHSLB
424  exp_run_ftsw = temp_rawftsw->GetExpRunSubrun(block_id);
425  ctime_trgtype_ftsw = temp_rawftsw->GetTTCtimeTRGType(block_id);
426  utime_ftsw = temp_rawftsw->GetTTUtime(block_id);
427 #endif
428 
429 
430 #ifndef NO_DATA_CHECK
431  try {
432  temp_rawftsw->CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
433  eve_array[ entry_id ] = cur_evenum;
434  } catch (string err_str) {
435  char err_buf[500];
436  strcpy(err_buf, err_str.c_str());
437  print_err.PrintError(err_buf , __FILE__, __PRETTY_FUNCTION__, __LINE__);
438  exit(1);
439  }
440 #endif
441  utime_array[ entry_id ] = temp_rawftsw->GetTTUtime(block_id);
442  ctime_type_array[ entry_id ] = temp_rawftsw->GetTTCtimeTRGType(block_id);
443  delete temp_rawftsw;
444 
445  //
446  // RawTLU
447  //
448  } else if (raw_datablk->CheckTLUID(entry_id)) {
449 
450  RawTLUFormat* temp_rawtlu = new RawTLUFormat;
451  temp_rawtlu->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
452  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
453  if (temp_rawtlu->GetEveNo(0) < 10
454  ) {
455  printf("[DEBUG] ######TLU#########\n");
456  printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
457  }
458 
459 #ifndef NO_DATA_CHECK
460  try {
461  temp_rawtlu->CheckData(0, m_prev_evenum, &cur_evenum);
462  eve_array[ entry_id ] = cur_evenum;
463  } catch (string err_str) {
464  char err_buf[500];
465  strcpy(err_buf, err_str.c_str());
466  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
467  exit(1);
468  }
469 #endif
470  delete temp_rawtlu;
471  } else {
472  //
473  // RawCOPPER
474  //
475  PreRawCOPPERFormat_v2* pre_rawcpr_fmt = new PreRawCOPPERFormat_v2;
476  pre_rawcpr_fmt->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
477  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
478 
479 #ifdef DUMHSLB
480  int block_id = 0;
481  "do not use the following for actual DAQ"
482  (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_v2::POS_EXP_RUN_NO ] = exp_run_ftsw;
483  (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_v2::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
484  (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_v2::POS_TTUTIME ] = utime_ftsw;
485 #endif
486 
487 #ifndef NO_DATA_CHECK
488  try {
489  pre_rawcpr_fmt->CheckData(0, m_prev_evenum, &cur_evenum,
490  m_prev_copper_ctr, &cur_copper_ctr,
491  m_prev_exprunsubrun_no, &m_exprunsubrun_no);
492  eve_array[ entry_id ] = cur_evenum;
493  } catch (string err_str) {
494  exit(1); // Error in the contents of an event was detected
495  }
496 #endif
497 
498  utime_array[ entry_id ] = pre_rawcpr_fmt->GetTTUtime(0);
499  ctime_type_array[ entry_id ] = pre_rawcpr_fmt->GetTTCtimeTRGType(0);
500 
501  if (cpr_num == 0) {
502  // data_size_copper_0 = raw_datablk->GetBlockNwords(entry_id);
503  *eve_copper_0 = (raw_datablk->GetBuffer(entry_id))[ 3 ];
504  } else if (cpr_num == 1) {
505  // data_size_copper_1 = raw_datablk->GetBlockNwords(entry_id);
506  }
507  cpr_num++;
508  delete pre_rawcpr_fmt;
509  }
510  }
511 
512 #ifndef NO_DATA_CHECK
513  // event #, ctime, utime over nodes
514  for (int l = 1; l < num_nodes_in_sendblock; l++) {
515  if (eve_array[ 0 ] != eve_array[ l ] ||
516  utime_array[ 0 ] != utime_array[ l ] ||
517  ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
518  char err_buf[500];
519  for (int m = 0; m < num_nodes_in_sendblock; m++) {
520  printf("[DEBUG] node %d eve # %d utime %x ctime %x\n",
521  m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
522  }
523  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
524  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
525  sleep(1234567);
526  exit(-1);
527  }
528  }
529 #endif
530 
531  // Event # monitor in runchange
532 // if (m_prev_runsubrun_no != m_runsubrun_no) {
533 // printf("[DEBUG] ##############################################\n");
534 // for (int m = 0; m < raw_datablk->GetNumEntries(); m++) {
535 // printf("[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
536 // }
537 // printf("[DEBUG] ##############################################\n");
538 // fflush(stderr);
539 // }
540  m_prev_evenum = cur_evenum;
541  m_prev_copper_ctr = cur_copper_ctr;
542  m_prev_exprunsubrun_no = m_exprunsubrun_no;
543  }
544  return;
545 }
546 
547 
548 
549 void DesSerPrePC::DataAcquisition()
550 {
551  // For data check
552  unsigned int eve_copper_0 = 0;
553  // B2INFO("initializing...");
554  printf("[DEBUG] initializing...\n"); fflush(stdout);
555  initialize();
556 
557  // B2INFO("Done.");
558  printf("[DEBUG] Done.\n"); fflush(stdout);
559 
560  if (m_start_flag == 0) {
561  //
562  // Connect to eb0: This should be here because we want Serializer first to accept connection from eb1tx
563  //
564  Connect();
565  if (m_status.isAvailable()) {
566  // B2INFO("DeSerializerPrePC: Waiting for Start...\n");
567  printf("[DEBUG] DeSerializerPrePC: Waiting for Start...\n"); fflush(stdout);
568  m_status.reportRunning();
569  }
570  m_start_time = getTimeSec();
571  n_basf2evt = 0;
572  }
573 
574  //
575  // Main loop
576  //
577  while (1) {
578  //
579  // Stand-by loop
580  //
581 #ifdef NONSTOP
582  if (m_run_pause > 0 || m_run_error > 0) {
583  waitResume();
584  }
585 #endif
586 
587  clearNumUsedBuf();
588  // RawDataBlock raw_datablk[ NUM_EVT_PER_BASF2LOOP_PC ];
589  RawDataBlockFormat raw_datablk[ NUM_EVT_PER_BASF2LOOP_PC ];
590 
591  //
592  // Recv loop
593  //
594  for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
595  //
596  // Receive data from COPPER
597  //
598  eve_copper_0 = 0;
599  int delete_flag_from =
600  0; // Delete flag for temp_rawdatablk.It can be set to 1 by setRecvdBuffer if the buffer size is larger than that of pre-allocated buffer.
601  int delete_flag_to =
602  0; // Delete flag for raw_datablk[i]. It can be set to 1 by getNewBuffer if the buffer size is larger than that of pre-allocated buffer.
603  RawDataBlockFormat temp_rawdatablk;
604  try {
605  setRecvdBuffer(&temp_rawdatablk, &delete_flag_from);
606  checkData(&temp_rawdatablk, &eve_copper_0);
607  } catch (string err_str) {
608  printf("Error was detected\n"); fflush(stdout);
609  break;
610  }
611  // PreRawCOPPERFormat_v2 pre_rawcopper_v2;
612  // pre_rawcopper_v2.SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
613  // 0, temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
615 
616  int temp_num_events, temp_num_nodes;
617  int temp_nwords_to;
618  int* buf_to = NULL;
619 #ifdef REDUCED_RAWCOPPER
620  //
621  // Copy reduced buffer
622  //
623  // int* buf_to = getNewBuffer(m_pre_rawcpr.CalcReducedDataSize(&temp_rawdatablk), &delete_flag_to); // basf2-dependent style
624  int* temp_bufin = temp_rawdatablk.GetWholeBuffer();
625  int temp_nwords_from = temp_rawdatablk.TotalBufNwords();
626  temp_num_events = temp_rawdatablk.GetNumEvents();
627  temp_num_nodes = temp_rawdatablk.GetNumNodes();
628  int calced_temp_nwords_to = m_pre_rawcpr.CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
629  buf_to = getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
630 
631  // m_pre_rawcpr.CopyReducedData(&temp_rawdatablk, buf_to, delete_flag_from); // basf2-dependent style
632  m_pre_rawcpr.CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
633  if (calced_temp_nwords_to != temp_nwords_to) {
634  char err_buf[500];
635  sprintf(err_buf,
636  "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
637  calced_temp_nwords_to, temp_nwords_to);
638  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
639  exit(1);
640  }
641  m_status.copyEventHeader(buf_to);
642 #else
643  buf_to = temp_rawdatablk.GetWholeBuffer();
644  temp_nwords_to = temp_rawdatablk.TotalBufNwords();
645  temp_num_events = temp_rawdatablk.GetNumEvents();
646  temp_num_nodes = temp_rawdatablk.GetNumNodes();
647  delete_flag_to = delete_flag_from;
648  delete_flag_from = 0; // to avoid double delete
649 #endif
650 
651  //
652  // Set buffer to the RawData class stored in DataStore
653  //
654 // raw_datablk[ j ].SetBuffer( (int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
655 // delete_flag_to, temp_rawdatablk.GetNumEvents(),
656 // temp_rawdatablk.GetNumNodes());
657  raw_datablk[ j ].SetBuffer(buf_to, temp_nwords_to, delete_flag_to, temp_num_events, temp_num_nodes);
658 
659 
660  //
661  // CRC16 check after data reduction
662  //
663 #ifdef REDUCED_RAWCOPPER
664  PostRawCOPPERFormat_v2 post_rawcopper_v2; // Should be the latest version before ver.4(PCIe40)
665 
666 // post_rawcopper_v2.SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
667 // 0, temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
668  post_rawcopper_v2.SetBuffer(raw_datablk[ j ].GetWholeBuffer(), raw_datablk[ j ].TotalBufNwords(),
669  0, raw_datablk[ j ].GetNumEvents(), raw_datablk[ j ].GetNumNodes());
670 
671  for (int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
672  int block_num = 0;
673  if (post_rawcopper_v2.GetFINESSENwords(block_num, i_finesse_num) > 0) {
674  post_rawcopper_v2.CheckCRC16(block_num, i_finesse_num);
675  }
676  }
677 
678 #endif
679  }
680 
681 #ifdef NONSTOP
682  // Goto Stand-by loop when run is paused or stopped by error
683  if (m_run_pause != 0 || m_run_error != 0) continue;
684 #endif
685 
686 
688  // From Serializer.cc
690  if (m_start_flag == 0) {
691  m_start_time = getTimeSec();
692  n_basf2evt = 0;
693  }
694  // StoreArray<RawCOPPER> rawcprarray;
695  // StoreArray<RawDataBlock> raw_dblkarray;
696 
697  for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
698 
699  //
700  // Send data
701  //
702  if (m_start_flag == 0) {
703  // B2INFO("SerializerPC: Sending the 1st packet...");
704  printf("[DEBUG] SerializerPC: Sending the 1st packet...\n"); fflush(stdout);
705  }
706 
707  try {
708  m_sent_totbytes += sendByWriteV(&(raw_datablk[ j ]));
709  } catch (string err_str) {
710 #ifdef NONSTOP
711  break;
712 #endif
713  print_err.PrintError((char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
714  exit(1);
715  }
716  if (m_start_flag == 0) {
717  // B2INFO("Done. ");
718  printf("[DEBUG] Done.\n"); fflush(stdout);
719  m_start_flag = 1;
720  }
721  }
722 
723 #ifdef NONSTOP
724  // Goto Stand-by loop when run is paused or stopped by error
725  if (m_run_pause != 0 || m_run_error != 0) continue;
726 #endif
727 
728  //
729  // Monitor
730  //
731  if (max_nevt >= 0 || max_seconds >= 0.) {
732 #ifdef AIUEO
733  if (n_basf2evt % 10000 == 0) {
734 // if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC >= max_nevt && max_nevt > 0)
735 // || (getTimeSec() - m_start_time > max_seconds && max_seconds > 0.)) {
736  printf("[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
737  max_nevt , max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
738  }
739 #endif
740  }
741 
742  if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC) % 100000 == 0) {
743  double interval = cur_time - m_prev_time;
744  double total_time = cur_time - m_start_time;
745  printf("[DEBUG] Event %12d Rate %6.2lf[kHz] Recvd %6.2lf[MB/s] sent %6.2lf[MB/s] RunTime %8.2lf[s] interval %8.4lf[s]\n",
746  n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC,
747  (n_basf2evt - m_prev_nevt)*NUM_EVT_PER_BASF2LOOP_PC / interval / 1.e3,
748  (m_recvd_totbytes - m_recvd_prev_totbytes) / interval / 1.e6,
749  (m_sent_totbytes - m_sent_prev_totbytes) / interval / 1.e6,
750  total_time,
751  interval);
752  fflush(stdout);
753 
754  m_prev_time = cur_time;
755  m_recvd_prev_totbytes = m_recvd_totbytes;
756  m_sent_prev_totbytes = m_sent_totbytes;
757  m_prev_nevt = n_basf2evt;
758  cur_time = getTimeSec();
759  }
760 
761  n_basf2evt++;
762 
763  if (m_status.isAvailable()) {
764  m_status.setOutputNBytes(m_sent_totbytes);
765  m_status.setOutputCount(n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC);
766  }
767 
768  }
769 
770  return;
771 }
772 
774 // From Serializer.cc
776 
777 
778 #ifdef NONSTOP
779 void DesSerPrePC::waitResume()
780 {
781  if (m_run_pause == 0) {
782  while (true) {
783  if (checkRunPause()) break;
784 #ifdef NONSTOP_DEBUG
785  printf("\033[31m");
786  printf("###########(DesSerPrePC) Waiting for Runstop() ###############\n");
787  fflush(stdout);
788  printf("\033[0m");
789 #endif
790  sleep(1);
791  }
792  }
793 
794  while (true) {
795 #ifdef NONSTOP_DEBUG
796  printf("\033[31m");
797  printf("###########(Ser) Waiting for Resume ###############\n");
798  fflush(stdout);
799  printf("\033[0m");
800 #endif
801  if (checkRunRecovery()) {
802  m_run_pause = 0;
803  break;
804  }
805  sleep(1);
806  }
807 
808 
809  printf("Done!\n"); fflush(stdout);
810 
811  printf("Checking connection to eb0\n"); fflush(stdout);
812  if (CheckConnection(m_socket_send) < 0) {
813  printf("Trying Accept1\n"); fflush(stdout);
814  Accept();
815  printf("Trying Accept2\n"); fflush(stdout);
816  }
817 
818  printf("Checking connection to COPPER\n"); fflush(stdout);
819  for (int i = 0; i < m_num_connections; i++) {
820  if (CheckConnection(m_socket_recv[ i ]) < 0) m_socket_recv[ i ] = -1;
821  }
822  Connect();
823 
824  resumeRun();
825  return;
826 }
827 
828 
829 
830 #endif
831 
The Raw COPPER class ver.2 This class stores data received by COPPER via belle2link Data from all det...
int CheckCRC16(int n, int finesse_num)
check magic words
int GetFINESSENwords(int n, int finesse) OVERRIDE_CPP17
get data size of FINESSE buffer
The Raw COPPER class ver.1 ( the latest version since May, 2014 ) This class stores data received by ...
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no) OVERRIDE_CPP17
check data contents
The RawDataBlockFormat class Format information for rawdata handling.
virtual int GetBlockNwords(int n)
get size of a data block
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetBufferPos(int n)
get position of data block in word
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freeed( = 0 )/ not freeed( = 1 ) in Destructer )
virtual int TotalBufNwords()
Get total length of m_buffer.
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual int GetNumEvents()
get # of events in m_buffer
virtual int CheckTLUID(int n)
get FTSW ID to check whether this data block is FTSW data or not
The Raw FTSW class 3 ( 2019.8.20 )
unsigned int GetEveNo(int n) OVERRIDE_CPP17
Get event #.
unsigned int GetTTUtime(int n) OVERRIDE_CPP17
get unixtime of the trigger
unsigned int GetTTCtimeTRGType(int n) OVERRIDE_CPP17
Get a word containing ctime and trigger type info.
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no) OVERRIDE_CPP17
check the data contents
The Raw TLU class Class for data from DAQ PC for TLU(Trigger Logic Unit) It is supposed to be used on...
Definition: RawTLUFormat.h:25
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
Definition: RawTLUFormat.cc:60
unsigned int GetEveNo(int n)
Get Event #.
Definition: RawTLUFormat.cc:37
void SetBuffer(int *hdr)
set buffer
Definition: SendHeader.cc:37
int GetNumEventsinPacket()
get contents of Header
Definition: SendHeader.cc:125
unsigned int GetExpRunSubrun(int n) OVERRIDE_CPP17
Exp# (10bit) run# (14bit) restart # (8bit)
unsigned int GetTTUtime(int n) OVERRIDE_CPP17
Check if COPPER Magic words are correct.
unsigned int GetTTCtimeTRGType(int n) OVERRIDE_CPP17
get b2l block from "FEE b2link header"
Abstract base class for different kinds of events.