Belle II Software  release-08-01-10
DesSerPrePC.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/rawdata/DesSerPrePC.h>
9 #include <rawdata/dataobjects/RawFTSWFormat_latest.h>
10 #include <rawdata/dataobjects/RawTLUFormat.h>
11 #include <utility>
12 
13 #include <arpa/inet.h>
14 #include <netdb.h>
15 #include <netinet/tcp.h>
16 
17 #include <sys/socket.h>
18 
19 //#define NO_DATA_CHECK
20 //#define DUMHSLB
21 
22 using namespace std;
23 using namespace Belle2;
24 
25 //----------------------------------------------------------------
26 // Implementation
27 //----------------------------------------------------------------
28 DesSerPrePC::DesSerPrePC(string host_recv, int port_recv, const string& host_send, int port_send, int shmflag,
29  const std::string& nodename, int /*nodeid*/)
30 {
31 
32  for (int i = 0 ; i < m_num_connections; i++) {
33  // m_hostname_from.push_back( "localhost");
34  m_hostname_from.push_back(host_recv);
35  // m_port_from.push_back(30000);
36  m_port_from.push_back(port_recv) ;
37  m_socket_recv.push_back(-1);
38  }
39 
40  // m_port_to = 31001;
41  m_port_to = port_send;
42  // m_hostname_local = "localhost";
43  m_hostname_local = host_send;
44  m_nodename = nodename;
45 
46  m_shmflag = shmflag;
47 
48  // B2INFO("DeSerializerPrePC: Constructor done.");
49  printf("[INFO] DeSerializerPrePC: Constructor done.\n"); fflush(stdout);
50 }
51 
52 
53 
54 DesSerPrePC::~DesSerPrePC()
55 {
56 }
57 
58 
59 
60 int DesSerPrePC::recvFD(int sock, char* buf, int data_size_byte, int flag)
61 {
62  int n = 0;
63  while (1) {
64  int read_size = 0;
65  if ((read_size = recv(sock, (char*)buf + n, data_size_byte - n, flag)) < 0) {
66  if (errno == EINTR) {
67  continue;
68  } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
69 #ifdef NONSTOP
70  string err_str;
71  callCheckRunPause(err_str);
72 #endif
73  continue;
74  } else {
75  perror("[WARNING]");
76  char err_buf[500];
77  sprintf(err_buf, "recv() returned error; ret = %d. : %s %s %d",
78  read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
79 #ifdef NONSTOP
80  m_run_error = 1;
81  // B2ERROR(err_buf);
82  printf("[WARNING] %s\n", err_buf); fflush(stdout);
83  string err_str = "RUN_ERROR";
84  printf("AIUEO********************\n"); fflush(stdout);
85  throw (err_str);
86 #endif
87  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
88  exit(-1);
89  }
90  } else if (read_size == 0) {
91  // Connection is closed ( error )
92  char err_buf[500];
93  sprintf(err_buf, "[WARNING] Connection is closed by peer(%s). readsize = %d %d. : %s %s %d",
94  strerror(errno), read_size, errno, __FILE__, __PRETTY_FUNCTION__, __LINE__);
95 #ifdef NONSTOP
96  m_run_error = 1;
97  // B2ERROR(err_buf);
98  printf("%s\n", err_buf); fflush(stdout);
99  string err_str = "RUN_ERROR";
100  throw (err_str);
101 #else
102  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
103  exit(-1);
104 #endif
105  } else {
106  n += read_size;
107  if (n == data_size_byte)break;
108  }
109  }
110  return n;
111 }
112 
113 
114 int DesSerPrePC::Connect()
115 {
116 
117  for (int i = 0; i < m_num_connections; i++) {
118 
119  if (m_socket_recv[ i ] >= 0) continue; // Already have an established socket
120 
121  //
122  // Connect to a downstream node
123  //
124  struct hostent* host;
125  host = gethostbyname(m_hostname_from[ i ].c_str());
126  if (host == NULL) {
127  char err_buf[100];
128  sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
129  strerror(errno));
130  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
131  sleep(1234567);
132  exit(1);
133  }
134 
135  struct sockaddr_in socPC;
136  socPC.sin_family = AF_INET;
137  socPC.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
138  socPC.sin_port = htons(m_port_from[ i ]);
139  int sd = socket(PF_INET, SOCK_STREAM, 0);
140  int val1 = 0;
141  setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1, sizeof(val1));
142 
143  struct timeval timeout;
144  timeout.tv_sec = 1;
145  timeout.tv_usec = 0;
146  setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)sizeof(timeout));
147 
148  printf("[DEBUG] Connecting to %s port %d\n", m_hostname_from[ i ].c_str(), m_port_from[ i ]); fflush(stdout);
149 
150  while (1) {
151  if (connect(sd, (struct sockaddr*)(&socPC), sizeof(socPC)) < 0) {
152  perror("Failed to connect. Retrying...");
153  usleep(500000);
154  } else {
155  // B2INFO("Done");
156  printf("[DEBUG] Done\n"); fflush(stdout);
157  break;
158  }
159  }
160  // m_socket_recv.push_back(sd);
161  m_socket_recv[ i ] = sd;
162 
163  // check socket paramters
164  int val, len;
165  len = sizeof(val);
166  getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
167  // B2INFO("SO_RCVBUF" << val);
168  getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
169  // B2DEBUG("SO_SNDBUF" << val);
170  getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
171  // B2DEBUG("TCP_MAXSEG" << val);
172  getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
173  // B2DEBUG("TCP_NODELAY" << val);
174 
175  if (m_status.isAvailable()) {
176  sockaddr_in sa;
177  memset(&sa, 0, sizeof(sockaddr_in));
178  socklen_t sa_len = sizeof(sa);
179  if (getsockname(m_socket_recv[i], (struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
180  m_status.setInputPort(ntohs(sa.sin_port));
181  m_status.setInputAddress(sa.sin_addr.s_addr);
182  }
183  }
184 
185  }
186  // B2INFO("[DEBUG] Initialization finished");
187  printf("[DEBUG] Initialization finished\n"); fflush(stdout);
188  return 0;
189 }
190 
191 
192 
193 int* DesSerPrePC::recvData(int* delete_flag, int* total_buf_nwords, int* num_events_in_sendblock, int* num_nodes_in_sendblock)
194 {
195  int* temp_buf = NULL; // buffer for data-body
196  int flag = 0;
197 
198  vector <int> each_buf_nwords;
199  each_buf_nwords.clear();
200  vector <int> each_buf_nodes;
201  each_buf_nodes.clear();
202  vector <int> each_buf_events;
203  each_buf_events.clear();
204 
205  *total_buf_nwords = 0;
206  *num_nodes_in_sendblock = 0;
207  *num_events_in_sendblock = 0;
208 
209  //
210  // Read Header and obtain data size
211  //
212  int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
213 
214  // Read header
215  for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
216 
217  recvFD(m_socket_recv[ i ], (char*)send_hdr_buf, sizeof(int)*SendHeader::SENDHDR_NWORDS, flag);
218 
219  SendHeader send_hdr;
220  send_hdr.SetBuffer(send_hdr_buf);
221 
222  int temp_num_events = send_hdr.GetNumEventsinPacket();
223  int temp_num_nodes = send_hdr.GetNumNodesinPacket();
224 
225  if (i == 0) {
226  *num_events_in_sendblock = temp_num_events;
227  } else if (*num_events_in_sendblock != temp_num_events) {
228 #ifndef NO_DATA_CHECK
229  char err_buf[500];
230  sprintf(err_buf,
231  "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
232  *num_events_in_sendblock, temp_num_events, i, *num_nodes_in_sendblock, temp_num_nodes, i);
233  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
234  sleep(1234567);
235  exit(1);
236 #endif
237  }
238 
239  *num_nodes_in_sendblock += temp_num_nodes;
240 
241  int rawblk_nwords = send_hdr.GetTotalNwords()
242  - SendHeader::SENDHDR_NWORDS
243  - SendTrailer::SENDTRL_NWORDS;
244  *total_buf_nwords += rawblk_nwords;
245 
246  //
247  // Data size check1
248  //
249  if (rawblk_nwords > (int)(2.5e6) || rawblk_nwords <= 0) {
250  printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
251  char err_buf[500];
252  sprintf(err_buf, "CORRUPTED DATA: Too large event : Header %d %d %d %d :block size %d words\n", i, temp_num_events, temp_num_nodes,
253  send_hdr.GetTotalNwords(), rawblk_nwords);
254  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
255  sleep(123456);
256  exit(1);
257 
258  }
259 
260  each_buf_nwords.push_back(rawblk_nwords);
261  each_buf_events.push_back(temp_num_events);
262  each_buf_nodes.push_back(temp_num_nodes);
263 
264  }
265 
266 
267  temp_buf = getNewBuffer(*total_buf_nwords, delete_flag); // this include only data body
268  //
269  // Read body
270  //
271  int total_recvd_byte = 0;
272  for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
273 
274  try {
275  total_recvd_byte += recvFD(m_socket_recv[ i ], (char*)temp_buf + total_recvd_byte,
276  each_buf_nwords[ i ] * sizeof(int), flag);
277  } catch (string err_str) {
278  if (*delete_flag) {
279  // B2WARNING("Delete buffer before going to Run-pause state");
280  printf("[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
281  delete temp_buf;
282  }
283  throw (std::move(err_str));
284  }
285  //
286  // Data length check
287  //
288  int temp_length = 0;
289  for (int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
290  int this_length = *((int*)((char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] * sizeof(int) + temp_length));
291  temp_length += this_length * sizeof(int);
292  }
293  if (temp_length != (int)(each_buf_nwords[ i ] * sizeof(int))) {
294  printf("[DEBUG]*******SENDHDR*********** \n");
295  printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
296  printf("[DEBUG]*******BODY***********\n ");
297  printData(temp_buf, (int)(total_recvd_byte / sizeof(int)));
298  char err_buf[500];
299  sprintf(err_buf, "CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
300  (int)(*total_buf_nwords * sizeof(int)), temp_length);
301  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
302  sleep(1234567);
303  exit(-1);
304  }
305 
306  }
307 
308  if ((int)(*total_buf_nwords * sizeof(int)) != total_recvd_byte) {
309  char err_buf[500];
310  sprintf(err_buf, "CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
311  total_recvd_byte, (int)(*total_buf_nwords * sizeof(int)));
312  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
313  sleep(1234567);
314  exit(-1);
315  }
316 
317  // Read Traeiler
318  int send_trl_buf[(unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
319  for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
320  try {
321  recvFD(m_socket_recv[ i ], (char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS * sizeof(int), flag);
322  } catch (string err_str) {
323  if (*delete_flag) {
324  // B2WARNING("Delete buffer before going to Run-pause state");
325  printf("[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
326  delete temp_buf;
327  }
328  throw (std::move(err_str));
329  }
330  }
331 
332  return temp_buf;
333 }
334 
335 
336 void DesSerPrePC::setRecvdBuffer(RawDataBlockFormat* temp_raw_datablk, int* delete_flag)
337 {
338  //
339  // Get data from socket
340  //
341  int total_buf_nwords = 0 ;
342  int num_events_in_sendblock = 0;
343  int num_nodes_in_sendblock = 0;
344 
345  if (m_start_flag == 0) {
346  // B2INFO("DeSerializerPrePC: Reading the 1st packet from eb0...");
347  printf("DeSerializerPrePC: Reading the 1st packet from eb0...\n"); fflush(stdout);
348  }
349  int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
350  &num_nodes_in_sendblock);
351  if (m_start_flag == 0) {
352  // B2INFO("DeSerializerPrePC: Done. the size of the 1st packet " << total_buf_nwords << " words");
353  printf("DeSerializerPrePC: Done. the size of the 1st packet %d words\n", total_buf_nwords); fflush(stdout);
354  m_start_flag = 1;
355  }
356  m_recvd_totbytes += total_buf_nwords * sizeof(int);
357 
358  temp_raw_datablk->SetBuffer((int*)temp_buf, total_buf_nwords, *delete_flag,
359  num_events_in_sendblock, num_nodes_in_sendblock);
360 
361  //
362  // check even # and node # in one Sendblock
363  //
364  int num_entries = temp_raw_datablk->GetNumEntries();
365  if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
366  char err_buf[500];
367  sprintf(err_buf,
368  "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
369  num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
370  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
371  sleep(1234567);
372  exit(-1);
373  }
374  return;
375 
376 }
377 
378 
379 
380 
381 void DesSerPrePC::checkData(RawDataBlockFormat* raw_datablk, unsigned int* eve_copper_0)
382 {
383  // int data_size_copper_0 = -1;
384  // int data_size_copper_1 = -1;
385 
386  //
387  // Data check
388  //
389  int* temp_buf = raw_datablk->GetBuffer(0);
390  int cpr_num = 0;
391  unsigned int cur_evenum = 0, cur_copper_ctr = 0;
392  unsigned int eve_array[32]; // # of noeds is less than 17
393  unsigned int utime_array[32];// # of noeds is less than 17
394  unsigned int ctime_type_array[32];// # of noeds is less than 17
395 
396 #ifdef DUMHSLB
397  unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
398 #endif
399 
400  for (int k = 0; k < raw_datablk->GetNumEvents(); k++) {
401  memset(eve_array, 0, sizeof(eve_array));
402  memset(utime_array, 0, sizeof(utime_array));
403  memset(ctime_type_array, 0, sizeof(ctime_type_array));
404 
405  int num_nodes_in_sendblock = raw_datablk->GetNumNodes();
406  for (int l = 0; l < num_nodes_in_sendblock; l++) {
407  int entry_id = l + k * num_nodes_in_sendblock;
408 
409  //
410  // RawFTSW
411  //
412  if (raw_datablk->CheckFTSWID(entry_id)) {
413  RawFTSWFormat_latest* temp_rawftsw = new RawFTSWFormat_latest;
414  int block_id = 0;
415  temp_rawftsw->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
416  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
417  if (temp_rawftsw->GetEveNo(block_id) < 10) {
418  printf("[DEBUG] ######FTSW#########\n");
419  printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
420  }
421 
422 #ifdef DUMHSLB
423  exp_run_ftsw = temp_rawftsw->GetExpRunSubrun(block_id);
424  ctime_trgtype_ftsw = temp_rawftsw->GetTTCtimeTRGType(block_id);
425  utime_ftsw = temp_rawftsw->GetTTUtime(block_id);
426 #endif
427 
428 
429 #ifndef NO_DATA_CHECK
430  try {
431  temp_rawftsw->CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
432  eve_array[ entry_id ] = cur_evenum;
433  } catch (string err_str) {
434  char err_buf[500];
435  strcpy(err_buf, err_str.c_str());
436  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
437  exit(1);
438  }
439 #endif
440  utime_array[ entry_id ] = temp_rawftsw->GetTTUtime(block_id);
441  ctime_type_array[ entry_id ] = temp_rawftsw->GetTTCtimeTRGType(block_id);
442  delete temp_rawftsw;
443 
444  //
445  // RawTLU
446  //
447  } else if (raw_datablk->CheckTLUID(entry_id)) {
448 
449  RawTLUFormat* temp_rawtlu = new RawTLUFormat;
450  temp_rawtlu->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
451  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
452  if (temp_rawtlu->GetEveNo(0) < 10
453  ) {
454  printf("[DEBUG] ######TLU#########\n");
455  printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
456  }
457 
458 #ifndef NO_DATA_CHECK
459  try {
460  temp_rawtlu->CheckData(0, m_prev_evenum, &cur_evenum);
461  eve_array[ entry_id ] = cur_evenum;
462  } catch (string err_str) {
463  char err_buf[500];
464  strcpy(err_buf, err_str.c_str());
465  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
466  exit(1);
467  }
468 #endif
469  delete temp_rawtlu;
470  } else {
471  //
472  // RawCOPPER
473  //
474  PreRawCOPPERFormat_v2* pre_rawcpr_fmt = new PreRawCOPPERFormat_v2;
475  pre_rawcpr_fmt->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
476  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
477 
478 #ifdef DUMHSLB
479  int block_id = 0;
480  "do not use the following for actual DAQ"
481  (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_v2::POS_EXP_RUN_NO ] = exp_run_ftsw;
482  (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_v2::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
483  (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_v2::POS_TTUTIME ] = utime_ftsw;
484 #endif
485 
486 #ifndef NO_DATA_CHECK
487  try {
488  pre_rawcpr_fmt->CheckData(0, m_prev_evenum, &cur_evenum,
489  m_prev_copper_ctr, &cur_copper_ctr,
490  m_prev_exprunsubrun_no, &m_exprunsubrun_no);
491  eve_array[ entry_id ] = cur_evenum;
492  } catch (string err_str) {
493  exit(1); // Error in the contents of an event was detected
494  }
495 #endif
496 
497  utime_array[ entry_id ] = pre_rawcpr_fmt->GetTTUtime(0);
498  ctime_type_array[ entry_id ] = pre_rawcpr_fmt->GetTTCtimeTRGType(0);
499 
500  if (cpr_num == 0) {
501  // data_size_copper_0 = raw_datablk->GetBlockNwords(entry_id);
502  *eve_copper_0 = (raw_datablk->GetBuffer(entry_id))[ 3 ];
503  } else if (cpr_num == 1) {
504  // data_size_copper_1 = raw_datablk->GetBlockNwords(entry_id);
505  }
506  cpr_num++;
507  delete pre_rawcpr_fmt;
508  }
509  }
510 
511 #ifndef NO_DATA_CHECK
512  // event #, ctime, utime over nodes
513  for (int l = 1; l < num_nodes_in_sendblock; l++) {
514  if (eve_array[ 0 ] != eve_array[ l ] ||
515  utime_array[ 0 ] != utime_array[ l ] ||
516  ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
517  char err_buf[500];
518  for (int m = 0; m < num_nodes_in_sendblock; m++) {
519  printf("[DEBUG] node %d eve # %x utime %x ctime %x\n",
520  m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
521  }
522  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
523  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
524  sleep(1234567);
525  exit(-1);
526  }
527  }
528 #endif
529 
530  // Event # monitor in runchange
531 // if (m_prev_runsubrun_no != m_runsubrun_no) {
532 // printf("[DEBUG] ##############################################\n");
533 // for (int m = 0; m < raw_datablk->GetNumEntries(); m++) {
534 // printf("[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
535 // }
536 // printf("[DEBUG] ##############################################\n");
537 // fflush(stderr);
538 // }
539  m_prev_evenum = cur_evenum;
540  m_prev_copper_ctr = cur_copper_ctr;
541  m_prev_exprunsubrun_no = m_exprunsubrun_no;
542  }
543  return;
544 }
545 
546 
547 
548 void DesSerPrePC::DataAcquisition()
549 {
550  // For data check
551  unsigned int eve_copper_0 = 0;
552  // B2INFO("initializing...");
553  printf("[DEBUG] initializing...\n"); fflush(stdout);
554  initialize();
555 
556  // B2INFO("Done.");
557  printf("[DEBUG] Done.\n"); fflush(stdout);
558 
559  if (m_start_flag == 0) {
560  //
561  // Connect to eb0: This should be here because we want Serializer first to accept connection from eb1tx
562  //
563  Connect();
564  if (m_status.isAvailable()) {
565  // B2INFO("DeSerializerPrePC: Waiting for Start...\n");
566  printf("[DEBUG] DeSerializerPrePC: Waiting for Start...\n"); fflush(stdout);
567  m_status.reportRunning();
568  }
569  m_start_time = getTimeSec();
570  n_basf2evt = 0;
571  }
572 
573  //
574  // Main loop
575  //
576  while (1) {
577  //
578  // Stand-by loop
579  //
580 #ifdef NONSTOP
581  if (m_run_pause > 0 || m_run_error > 0) {
582  waitResume();
583  }
584 #endif
585 
586  clearNumUsedBuf();
587  // RawDataBlock raw_datablk[ NUM_EVT_PER_BASF2LOOP_PC ];
588  RawDataBlockFormat raw_datablk[ NUM_EVT_PER_BASF2LOOP_PC ];
589 
590  //
591  // Recv loop
592  //
593  for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
594  //
595  // Receive data from COPPER
596  //
597  eve_copper_0 = 0;
598  int delete_flag_from =
599  0; // Delete flag for temp_rawdatablk.It can be set to 1 by setRecvdBuffer if the buffer size is larger than that of pre-allocated buffer.
600  int delete_flag_to =
601  0; // Delete flag for raw_datablk[i]. It can be set to 1 by getNewBuffer if the buffer size is larger than that of pre-allocated buffer.
602  RawDataBlockFormat temp_rawdatablk;
603  try {
604  setRecvdBuffer(&temp_rawdatablk, &delete_flag_from);
605  checkData(&temp_rawdatablk, &eve_copper_0);
606  } catch (string err_str) {
607  printf("Error was detected\n"); fflush(stdout);
608  break;
609  }
610  // PreRawCOPPERFormat_v2 pre_rawcopper_v2;
611  // pre_rawcopper_v2.SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
612  // 0, temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
614 
615  int temp_num_events, temp_num_nodes;
616  int temp_nwords_to;
617  int* buf_to = NULL;
618 #ifdef REDUCED_RAWCOPPER
619  //
620  // Copy reduced buffer
621  //
622  // int* buf_to = getNewBuffer(m_pre_rawcpr.CalcReducedDataSize(&temp_rawdatablk), &delete_flag_to); // basf2-dependent style
623  int* temp_bufin = temp_rawdatablk.GetWholeBuffer();
624  int temp_nwords_from = temp_rawdatablk.TotalBufNwords();
625  temp_num_events = temp_rawdatablk.GetNumEvents();
626  temp_num_nodes = temp_rawdatablk.GetNumNodes();
627  int calced_temp_nwords_to = m_pre_rawcpr.CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
628  buf_to = getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
629 
630  // m_pre_rawcpr.CopyReducedData(&temp_rawdatablk, buf_to, delete_flag_from); // basf2-dependent style
631  m_pre_rawcpr.CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
632  if (calced_temp_nwords_to != temp_nwords_to) {
633  char err_buf[500];
634  sprintf(err_buf,
635  "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
636  calced_temp_nwords_to, temp_nwords_to);
637  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
638  exit(1);
639  }
640  m_status.copyEventHeader(buf_to);
641 #else
642  buf_to = temp_rawdatablk.GetWholeBuffer();
643  temp_nwords_to = temp_rawdatablk.TotalBufNwords();
644  temp_num_events = temp_rawdatablk.GetNumEvents();
645  temp_num_nodes = temp_rawdatablk.GetNumNodes();
646  delete_flag_to = delete_flag_from;
647  delete_flag_from = 0; // to avoid double delete
648 #endif
649 
650  //
651  // Set buffer to the RawData class stored in DataStore
652  //
653 // raw_datablk[ j ].SetBuffer( (int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
654 // delete_flag_to, temp_rawdatablk.GetNumEvents(),
655 // temp_rawdatablk.GetNumNodes());
656  raw_datablk[ j ].SetBuffer(buf_to, temp_nwords_to, delete_flag_to, temp_num_events, temp_num_nodes);
657 
658 
659  //
660  // CRC16 check after data reduction
661  //
662 #ifdef REDUCED_RAWCOPPER
663  PostRawCOPPERFormat_v2 post_rawcopper_v2; // Should be the latest version before ver.4(PCIe40)
664 
665 // post_rawcopper_v2.SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
666 // 0, temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
667  post_rawcopper_v2.SetBuffer(raw_datablk[ j ].GetWholeBuffer(), raw_datablk[ j ].TotalBufNwords(),
668  0, raw_datablk[ j ].GetNumEvents(), raw_datablk[ j ].GetNumNodes());
669 
670  for (int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
671  int block_num = 0;
672  if (post_rawcopper_v2.GetFINESSENwords(block_num, i_finesse_num) > 0) {
673  post_rawcopper_v2.CheckCRC16(block_num, i_finesse_num);
674  }
675  }
676 
677 #endif
678  }
679 
680 #ifdef NONSTOP
681  // Goto Stand-by loop when run is paused or stopped by error
682  if (m_run_pause != 0 || m_run_error != 0) continue;
683 #endif
684 
685 
687  // From Serializer.cc
689  if (m_start_flag == 0) {
690  m_start_time = getTimeSec();
691  n_basf2evt = 0;
692  }
693  // StoreArray<RawCOPPER> rawcprarray;
694  // StoreArray<RawDataBlock> raw_dblkarray;
695 
696  for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
697 
698  //
699  // Send data
700  //
701  if (m_start_flag == 0) {
702  // B2INFO("SerializerPC: Sending the 1st packet...");
703  printf("[DEBUG] SerializerPC: Sending the 1st packet...\n"); fflush(stdout);
704  }
705 
706  try {
707  m_sent_totbytes += sendByWriteV(&(raw_datablk[ j ]));
708  } catch (string err_str) {
709 #ifdef NONSTOP
710  break;
711 #endif
712  print_err.PrintError((char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
713  exit(1);
714  }
715  if (m_start_flag == 0) {
716  // B2INFO("Done. ");
717  printf("[DEBUG] Done.\n"); fflush(stdout);
718  m_start_flag = 1;
719  }
720  }
721 
722 #ifdef NONSTOP
723  // Goto Stand-by loop when run is paused or stopped by error
724  if (m_run_pause != 0 || m_run_error != 0) continue;
725 #endif
726 
727  //
728  // Monitor
729  //
730  if (max_nevt >= 0 || max_seconds >= 0.) {
731 #ifdef AIUEO
732  if (n_basf2evt % 10000 == 0) {
733 // if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC >= max_nevt && max_nevt > 0)
734 // || (getTimeSec() - m_start_time > max_seconds && max_seconds > 0.)) {
735  printf("[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
736  max_nevt, max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
737  }
738 #endif
739  }
740 
741  if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC) % 100000 == 0) {
742  double interval = cur_time - m_prev_time;
743  double total_time = cur_time - m_start_time;
744  printf("[DEBUG] Event %12d Rate %6.2lf[kHz] Recvd %6.2lf[MB/s] sent %6.2lf[MB/s] RunTime %8.2lf[s] interval %8.4lf[s]\n",
745  n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC,
746  (n_basf2evt - m_prev_nevt)*NUM_EVT_PER_BASF2LOOP_PC / interval / 1.e3,
747  (m_recvd_totbytes - m_recvd_prev_totbytes) / interval / 1.e6,
748  (m_sent_totbytes - m_sent_prev_totbytes) / interval / 1.e6,
749  total_time,
750  interval);
751  fflush(stdout);
752 
753  m_prev_time = cur_time;
754  m_recvd_prev_totbytes = m_recvd_totbytes;
755  m_sent_prev_totbytes = m_sent_totbytes;
756  m_prev_nevt = n_basf2evt;
757  cur_time = getTimeSec();
758  }
759 
760  n_basf2evt++;
761 
762  if (m_status.isAvailable()) {
763  m_status.setOutputNBytes(m_sent_totbytes);
764  m_status.setOutputCount(n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC);
765  }
766 
767  }
768 
769  return;
770 }
771 
773 // From Serializer.cc
775 
776 
777 #ifdef NONSTOP
778 void DesSerPrePC::waitResume()
779 {
780  if (m_run_pause == 0) {
781  while (true) {
782  if (checkRunPause()) break;
783 #ifdef NONSTOP_DEBUG
784  printf("\033[31m");
785  printf("###########(DesSerPrePC) Waiting for Runstop() ###############\n");
786  fflush(stdout);
787  printf("\033[0m");
788 #endif
789  sleep(1);
790  }
791  }
792 
793  while (true) {
794 #ifdef NONSTOP_DEBUG
795  printf("\033[31m");
796  printf("###########(Ser) Waiting for Resume ###############\n");
797  fflush(stdout);
798  printf("\033[0m");
799 #endif
800  if (checkRunRecovery()) {
801  m_run_pause = 0;
802  break;
803  }
804  sleep(1);
805  }
806 
807 
808  printf("Done!\n"); fflush(stdout);
809 
810  printf("Checking connection to eb0\n"); fflush(stdout);
811  if (CheckConnection(m_socket_send) < 0) {
812  printf("Trying Accept1\n"); fflush(stdout);
813  Accept();
814  printf("Trying Accept2\n"); fflush(stdout);
815  }
816 
817  printf("Checking connection to COPPER\n"); fflush(stdout);
818  for (int i = 0; i < m_num_connections; i++) {
819  if (CheckConnection(m_socket_recv[ i ]) < 0) m_socket_recv[ i ] = -1;
820  }
821  Connect();
822 
823  resumeRun();
824  return;
825 }
826 
827 
828 
829 #endif
830 
The Raw COPPER class ver.2 This class stores data received by COPPER via belle2link Data from all det...
int CheckCRC16(int n, int finesse_num)
check magic words
int GetFINESSENwords(int n, int finesse) OVERRIDE_CPP17
get data size of FINESSE buffer
The Raw COPPER class ver.1 ( the latest version since May, 2014 ) This class stores data received by ...
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no) OVERRIDE_CPP17
check data contents
The RawDataBlockFormat class Format information for rawdata handling.
virtual int GetBlockNwords(int n)
get size of a data block
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetBufferPos(int n)
get position of data block in word
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freeed( = 0 )/ not freeed( = 1 ) in Destructer )
virtual int TotalBufNwords()
Get total length of m_buffer.
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual int GetNumEvents()
get # of events in m_buffer
virtual int CheckTLUID(int n)
get FTSW ID to check whether this data block is FTSW data or not
The Raw FTSW class 3 ( 2019.8.20 )
unsigned int GetEveNo(int n) OVERRIDE_CPP17
Get event #.
unsigned int GetTTUtime(int n) OVERRIDE_CPP17
get unixtime of the trigger
unsigned int GetTTCtimeTRGType(int n) OVERRIDE_CPP17
Get a word containing ctime and trigger type info.
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no) OVERRIDE_CPP17
check the data contents
The Raw TLU class Class for data from DAQ PC for TLU(Trigger Logic Unit) It is supposed to be used on...
Definition: RawTLUFormat.h:25
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
Definition: RawTLUFormat.cc:60
unsigned int GetEveNo(int n)
Get Event #.
Definition: RawTLUFormat.cc:37
void SetBuffer(int *hdr)
set buffer
Definition: SendHeader.cc:37
int GetNumEventsinPacket()
get contents of Header
Definition: SendHeader.cc:125
unsigned int GetExpRunSubrun(int n) OVERRIDE_CPP17
Exp# (10bit) run# (14bit) restart # (8bit)
unsigned int GetTTUtime(int n) OVERRIDE_CPP17
Check if COPPER Magic words are correct.
unsigned int GetTTCtimeTRGType(int n) OVERRIDE_CPP17
get b2l block from "FEE b2link header"
Abstract base class for different kinds of events.