Belle II Software  release-05-01-25
DesSerPrePC.cc
1 //+
2 // File : DesSerPrePC.cc
3 // Description : Module to receive data from eb0 and send it to eb1tx
4 //
5 // Author : Satoru Yamada Itoh, IPNS, KEK
6 // Date : 2 - Aug - 2013
7 //-
8 #include <daq/rawdata/DesSerPrePC.h>
9 #include <rawdata/dataobjects/RawFTSWFormat_latest.h>
10 #include <rawdata/dataobjects/RawTLUFormat.h>
11 
12 #include <arpa/inet.h>
13 #include <netdb.h>
14 #include <netinet/tcp.h>
15 
16 #include <sys/socket.h>
17 
18 //#define NO_DATA_CHECK
19 //#define DUMHSLB
20 
21 using namespace std;
22 using namespace Belle2;
23 
24 //----------------------------------------------------------------
25 // Implementation
26 //----------------------------------------------------------------
27 DesSerPrePC::DesSerPrePC(string host_recv, int port_recv, string host_send, int port_send, int shmflag,
28  const std::string& nodename, int nodeid)
29 {
30 
31  for (int i = 0 ; i < m_num_connections; i++) {
32  // m_hostname_from.push_back( "localhost");
33  m_hostname_from.push_back(host_recv);
34  // m_port_from.push_back(30000);
35  m_port_from.push_back(port_recv) ;
36  m_socket_recv.push_back(-1);
37  }
38 
39  // m_port_to = 31001;
40  m_port_to = port_send;
41  // m_hostname_local = "localhost";
42  m_hostname_local = host_send;
43  m_nodename = nodename;
44 
45  m_shmflag = shmflag;
46 
47  // B2INFO("DeSerializerPrePC: Constructor done.");
48  printf("[INFO] DeSerializerPrePC: Constructor done.\n"); fflush(stdout);
49 }
50 
51 
52 
53 DesSerPrePC::~DesSerPrePC()
54 {
55 }
56 
57 
58 
59 int DesSerPrePC::recvFD(int sock, char* buf, int data_size_byte, int flag)
60 {
61  int n = 0;
62  int read_size = 0;
63  while (1) {
64  if ((read_size = recv(sock, (char*)buf + n, data_size_byte - n , flag)) < 0) {
65  if (errno == EINTR) {
66  continue;
67  } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
68 #ifdef NONSTOP
69  string err_str;
70  callCheckRunPause(err_str);
71 #endif
72  continue;
73  } else {
74  perror("[WARNING]");
75  char err_buf[500];
76  sprintf(err_buf, "recv() returned error; ret = %d. : %s %s %d",
77  read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
78 #ifdef NONSTOP
79  m_run_error = 1;
80  // B2ERROR(err_buf);
81  printf("[WARNING] %s\n", err_buf); fflush(stdout);
82  string err_str = "RUN_ERROR";
83  printf("AIUEO********************\n"); fflush(stdout);
84  throw (err_str);
85 #endif
86  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
87  exit(-1);
88  }
89  } else if (read_size == 0) {
90  // Connection is closed ( error )
91  char err_buf[500];
92  sprintf(err_buf, "[WARNING] Connection is closed by peer(%s). readsize = %d %d. : %s %s %d",
93  strerror(errno), read_size, errno, __FILE__, __PRETTY_FUNCTION__, __LINE__);
94 #ifdef NONSTOP
95  m_run_error = 1;
96  // B2ERROR(err_buf);
97  printf("%s\n", err_buf); fflush(stdout);
98  string err_str = "RUN_ERROR";
99  throw (err_str);
100 #else
101  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
102  exit(-1);
103 #endif
104  } else {
105  n += read_size;
106  if (n == data_size_byte)break;
107  }
108  }
109  return n;
110 }
111 
112 
113 int DesSerPrePC::Connect()
114 {
115 
116  for (int i = 0; i < m_num_connections; i++) {
117 
118  if (m_socket_recv[ i ] >= 0) continue; // Already have an established socket
119 
120  //
121  // Connect to a downstream node
122  //
123  struct hostent* host;
124  host = gethostbyname(m_hostname_from[ i ].c_str());
125  if (host == NULL) {
126  char err_buf[100];
127  sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
128  strerror(errno));
129  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
130  sleep(1234567);
131  exit(1);
132  }
133 
134  struct sockaddr_in socPC;
135  socPC.sin_family = AF_INET;
136  socPC.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
137  socPC.sin_port = htons(m_port_from[ i ]);
138  int sd = socket(PF_INET, SOCK_STREAM, 0);
139  int val1 = 0;
140  setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1, sizeof(val1));
141 
142  struct timeval timeout;
143  timeout.tv_sec = 1;
144  timeout.tv_usec = 0;
145  setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)sizeof(timeout));
146 
147  printf("[DEBUG] Connecting to %s port %d\n" , m_hostname_from[ i ].c_str(), m_port_from[ i ]); fflush(stdout);
148 
149  while (1) {
150  if (connect(sd, (struct sockaddr*)(&socPC), sizeof(socPC)) < 0) {
151  perror("Failed to connect. Retrying...");
152  usleep(500000);
153  } else {
154  // B2INFO("Done");
155  printf("[DEBUG] Done\n"); fflush(stdout);
156  break;
157  }
158  }
159  // m_socket_recv.push_back(sd);
160  m_socket_recv[ i ] = sd;
161 
162  // check socket paramters
163  int val, len;
164  len = sizeof(val);
165  getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
166  // B2INFO("SO_RCVBUF" << val);
167  getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
168  // B2DEBUG("SO_SNDBUF" << val);
169  getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
170  // B2DEBUG("TCP_MAXSEG" << val);
171  getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
172  // B2DEBUG("TCP_NODELAY" << val);
173 
174  if (m_status.isAvailable()) {
175  sockaddr_in sa;
176  memset(&sa, 0, sizeof(sockaddr_in));
177  socklen_t sa_len = sizeof(sa);
178  if (getsockname(m_socket_recv[i], (struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
179  m_status.setInputPort(ntohs(sa.sin_port));
180  m_status.setInputAddress(sa.sin_addr.s_addr);
181  }
182  }
183 
184  }
185  // B2INFO("[DEBUG] Initialization finished");
186  printf("[DEBUG] Initialization finished\n"); fflush(stdout);
187  return 0;
188 }
189 
190 
191 
192 int* DesSerPrePC::recvData(int* delete_flag, int* total_buf_nwords, int* num_events_in_sendblock, int* num_nodes_in_sendblock)
193 {
194  int* temp_buf = NULL; // buffer for data-body
195  int flag = 0;
196 
197  vector <int> each_buf_nwords;
198  each_buf_nwords.clear();
199  vector <int> each_buf_nodes;
200  each_buf_nodes.clear();
201  vector <int> each_buf_events;
202  each_buf_events.clear();
203 
204  *total_buf_nwords = 0;
205  *num_nodes_in_sendblock = 0;
206  *num_events_in_sendblock = 0;
207 
208  //
209  // Read Header and obtain data size
210  //
211  int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
212  int temp_num_events = 0;
213  int temp_num_nodes = 0;
214 
215  // Read header
216  for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
217 
218  recvFD(m_socket_recv[ i ], (char*)send_hdr_buf, sizeof(int)*SendHeader::SENDHDR_NWORDS, flag);
219 
220  SendHeader send_hdr;
221  send_hdr.SetBuffer(send_hdr_buf);
222 
223  temp_num_events = send_hdr.GetNumEventsinPacket();
224  temp_num_nodes = send_hdr.GetNumNodesinPacket();
225 
226  if (i == 0) {
227  *num_events_in_sendblock = temp_num_events;
228  } else if (*num_events_in_sendblock != temp_num_events) {
229 #ifndef NO_DATA_CHECK
230  char err_buf[500];
231  sprintf(err_buf,
232  "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
233  *num_events_in_sendblock , temp_num_events, i, *num_nodes_in_sendblock , temp_num_nodes, i);
234  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
235  sleep(1234567);
236  exit(1);
237 #endif
238  }
239 
240  *num_nodes_in_sendblock += temp_num_nodes;
241 
242  int rawblk_nwords = send_hdr.GetTotalNwords()
243  - SendHeader::SENDHDR_NWORDS
244  - SendTrailer::SENDTRL_NWORDS;
245  *total_buf_nwords += rawblk_nwords;
246 
247  //
248  // Data size check1
249  //
250  if (rawblk_nwords > (int)(2.5e6) || rawblk_nwords <= 0) {
251  printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
252  char err_buf[500];
253  sprintf(err_buf, "CORRUPTED DATA: Too large event : Header %d %d %d %d :block size %d words\n", i, temp_num_events, temp_num_nodes,
254  send_hdr.GetTotalNwords(), rawblk_nwords);
255  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
256  sleep(123456);
257  exit(1);
258 
259  }
260 
261  each_buf_nwords.push_back(rawblk_nwords);
262  each_buf_events.push_back(temp_num_events);
263  each_buf_nodes.push_back(temp_num_nodes);
264 
265  }
266 
267 
268  temp_buf = getNewBuffer(*total_buf_nwords, delete_flag); // this include only data body
269  //
270  // Read body
271  //
272  int total_recvd_byte = 0;
273  for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
274 
275  try {
276  total_recvd_byte += recvFD(m_socket_recv[ i ], (char*)temp_buf + total_recvd_byte,
277  each_buf_nwords[ i ] * sizeof(int), flag);
278  } catch (string err_str) {
279  if (*delete_flag) {
280  // B2WARNING("Delete buffer before going to Run-pause state");
281  printf("[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
282  delete temp_buf;
283  }
284  throw (err_str);
285  }
286  //
287  // Data length check
288  //
289  int temp_length = 0;
290  for (int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
291  int this_length = *((int*)((char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] * sizeof(int) + temp_length));
292  temp_length += this_length * sizeof(int);
293  }
294  if (temp_length != (int)(each_buf_nwords[ i ] * sizeof(int))) {
295  printf("[DEBUG]*******SENDHDR*********** \n");
296  printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
297  printf("[DEBUG]*******BODY***********\n ");
298  printData(temp_buf, (int)(total_recvd_byte / sizeof(int)));
299  char err_buf[500];
300  sprintf(err_buf, "CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
301  (int)(*total_buf_nwords * sizeof(int)), temp_length);
302  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
303  sleep(1234567);
304  exit(-1);
305  }
306 
307  }
308 
309  if ((int)(*total_buf_nwords * sizeof(int)) != total_recvd_byte) {
310  char err_buf[500];
311  sprintf(err_buf, "CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
312  total_recvd_byte, (int)(*total_buf_nwords * sizeof(int)));
313  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
314  sleep(1234567);
315  exit(-1);
316  }
317 
318  // Read Traeiler
319  int send_trl_buf[(unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
320  for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
321  try {
322  recvFD(m_socket_recv[ i ], (char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS * sizeof(int), flag);
323  } catch (string err_str) {
324  if (*delete_flag) {
325  // B2WARNING("Delete buffer before going to Run-pause state");
326  printf("[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
327  delete temp_buf;
328  }
329  throw (err_str);
330  }
331  }
332 
333  return temp_buf;
334 }
335 
336 
337 void DesSerPrePC::setRecvdBuffer(RawDataBlockFormat* temp_raw_datablk, int* delete_flag)
338 {
339  //
340  // Get data from socket
341  //
342  int total_buf_nwords = 0 ;
343  int num_events_in_sendblock = 0;
344  int num_nodes_in_sendblock = 0;
345 
346  if (m_start_flag == 0) {
347  // B2INFO("DeSerializerPrePC: Reading the 1st packet from eb0...");
348  printf("DeSerializerPrePC: Reading the 1st packet from eb0...\n"); fflush(stdout);
349  }
350  int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
351  &num_nodes_in_sendblock);
352  if (m_start_flag == 0) {
353  // B2INFO("DeSerializerPrePC: Done. the size of the 1st packet " << total_buf_nwords << " words");
354  printf("DeSerializerPrePC: Done. the size of the 1st packet %d words\n", total_buf_nwords); fflush(stdout);
355  m_start_flag = 1;
356  }
357  m_recvd_totbytes += total_buf_nwords * sizeof(int);
358 
359  temp_raw_datablk->SetBuffer((int*)temp_buf, total_buf_nwords, *delete_flag,
360  num_events_in_sendblock, num_nodes_in_sendblock);
361 
362  //
363  // check even # and node # in one Sendblock
364  //
365  int num_entries = temp_raw_datablk->GetNumEntries();
366  if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
367  char err_buf[500];
368  sprintf(err_buf,
369  "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
370  num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
371  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
372  sleep(1234567);
373  exit(-1);
374  }
375  return;
376 
377 }
378 
379 
380 
381 
382 void DesSerPrePC::checkData(RawDataBlockFormat* raw_datablk, unsigned int* eve_copper_0)
383 {
384  // int data_size_copper_0 = -1;
385  // int data_size_copper_1 = -1;
386 
387  //
388  // Data check
389  //
390  int* temp_buf = raw_datablk->GetBuffer(0);
391  int cpr_num = 0;
392  unsigned int cur_evenum = 0, cur_copper_ctr = 0;
393  unsigned int eve_array[32]; // # of noeds is less than 17
394  unsigned int utime_array[32];// # of noeds is less than 17
395  unsigned int ctime_type_array[32];// # of noeds is less than 17
396 
397 #ifdef DUMHSLB
398  unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
399 #endif
400 
401  for (int k = 0; k < raw_datablk->GetNumEvents(); k++) {
402  memset(eve_array, 0, sizeof(eve_array));
403  memset(utime_array, 0, sizeof(utime_array));
404  memset(ctime_type_array, 0, sizeof(ctime_type_array));
405 
406  int num_nodes_in_sendblock = raw_datablk->GetNumNodes();
407  for (int l = 0; l < num_nodes_in_sendblock; l++) {
408  int entry_id = l + k * num_nodes_in_sendblock;
409 
410  //
411  // RawFTSW
412  //
413  if (raw_datablk->CheckFTSWID(entry_id)) {
414  RawFTSWFormat_latest* temp_rawftsw = new RawFTSWFormat_latest;
415  int block_id = 0;
416  temp_rawftsw->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
417  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
418  if (temp_rawftsw->GetEveNo(block_id) < 10) {
419  printf("[DEBUG] ######FTSW#########\n");
420  printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
421  }
422 
423 #ifdef DUMHSLB
424  exp_run_ftsw = temp_rawftsw->GetExpRunSubrun(block_id);
425  ctime_trgtype_ftsw = temp_rawftsw->GetTTCtimeTRGType(block_id);
426  utime_ftsw = temp_rawftsw->GetTTUtime(block_id);
427 #endif
428 
429 
430 #ifndef NO_DATA_CHECK
431  try {
432  temp_rawftsw->CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
433  eve_array[ entry_id ] = cur_evenum;
434  } catch (string err_str) {
435  char err_buf[500];
436  strcpy(err_buf, err_str.c_str());
437  print_err.PrintError(err_buf , __FILE__, __PRETTY_FUNCTION__, __LINE__);
438  exit(1);
439  }
440 #endif
441  utime_array[ entry_id ] = temp_rawftsw->GetTTUtime(block_id);
442  ctime_type_array[ entry_id ] = temp_rawftsw->GetTTCtimeTRGType(block_id);
443  delete temp_rawftsw;
444 
445  //
446  // RawTLU
447  //
448  } else if (raw_datablk->CheckTLUID(entry_id)) {
449 
450  RawTLUFormat* temp_rawtlu = new RawTLUFormat;
451  temp_rawtlu->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
452  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
453  if (temp_rawtlu->GetEveNo(0) < 10
454  ) {
455  printf("[DEBUG] ######TLU#########\n");
456  printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
457  }
458 
459 #ifndef NO_DATA_CHECK
460  try {
461  temp_rawtlu->CheckData(0, m_prev_evenum, &cur_evenum);
462  eve_array[ entry_id ] = cur_evenum;
463  } catch (string err_str) {
464  char err_buf[500];
465  strcpy(err_buf, err_str.c_str());
466  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
467  exit(1);
468  }
469 #endif
470  delete temp_rawtlu;
471  } else {
472  //
473  // RawCOPPER
474  //
476  pre_rawcpr_fmt->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
477  raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
478 
479 #ifdef DUMHSLB
480  int block_id = 0;
481  "do not use the following for actual DAQ"
482  (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
483  (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
484  (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
485 #endif
486 
487 #ifndef NO_DATA_CHECK
488  try {
489  pre_rawcpr_fmt->CheckData(0, m_prev_evenum, &cur_evenum,
490  m_prev_copper_ctr, &cur_copper_ctr,
491  m_prev_exprunsubrun_no, &m_exprunsubrun_no);
492  eve_array[ entry_id ] = cur_evenum;
493  } catch (string err_str) {
494  exit(1); // Error in the contents of an event was detected
495  }
496 #endif
497 
498  utime_array[ entry_id ] = pre_rawcpr_fmt->GetTTUtime(0);
499  ctime_type_array[ entry_id ] = pre_rawcpr_fmt->GetTTCtimeTRGType(0);
500 
501  if (cpr_num == 0) {
502  // data_size_copper_0 = raw_datablk->GetBlockNwords(entry_id);
503  *eve_copper_0 = (raw_datablk->GetBuffer(entry_id))[ 3 ];
504  } else if (cpr_num == 1) {
505  // data_size_copper_1 = raw_datablk->GetBlockNwords(entry_id);
506  }
507  cpr_num++;
508  delete pre_rawcpr_fmt;
509  }
510  }
511 
512 #ifndef NO_DATA_CHECK
513  // event #, ctime, utime over nodes
514  for (int l = 1; l < num_nodes_in_sendblock; l++) {
515  if (eve_array[ 0 ] != eve_array[ l ] ||
516  utime_array[ 0 ] != utime_array[ l ] ||
517  ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
518  char err_buf[500];
519  for (int m = 0; m < num_nodes_in_sendblock; m++) {
520  printf("[DEBUG] node %d eve # %d utime %x ctime %x\n",
521  m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
522  }
523  sprintf(err_buf, "[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
524  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
525  sleep(1234567);
526  exit(-1);
527  }
528  }
529 #endif
530 
531  // Event # monitor in runchange
532 // if (m_prev_runsubrun_no != m_runsubrun_no) {
533 // printf("[DEBUG] ##############################################\n");
534 // for (int m = 0; m < raw_datablk->GetNumEntries(); m++) {
535 // printf("[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
536 // }
537 // printf("[DEBUG] ##############################################\n");
538 // fflush(stderr);
539 // }
540  m_prev_evenum = cur_evenum;
541  m_prev_copper_ctr = cur_copper_ctr;
542  m_prev_exprunsubrun_no = m_exprunsubrun_no;
543  }
544  return;
545 }
546 
547 
548 
549 void DesSerPrePC::DataAcquisition()
550 {
551  // For data check
552  unsigned int eve_copper_0 = 0;
553  // B2INFO("initializing...");
554  printf("[DEBUG] initializing...\n"); fflush(stdout);
555  initialize();
556 
557  // B2INFO("Done.");
558  printf("[DEBUG] Done.\n"); fflush(stdout);
559 
560  if (m_start_flag == 0) {
561  //
562  // Connect to eb0: This should be here because we want Serializer first to accept connection from eb1tx
563  //
564  Connect();
565  if (m_status.isAvailable()) {
566  // B2INFO("DeSerializerPrePC: Waiting for Start...\n");
567  printf("[DEBUG] DeSerializerPrePC: Waiting for Start...\n"); fflush(stdout);
568  m_status.reportRunning();
569  }
570  m_start_time = getTimeSec();
571  n_basf2evt = 0;
572  }
573 
574  //
575  // Main loop
576  //
577  while (1) {
578  //
579  // Stand-by loop
580  //
581 #ifdef NONSTOP
582  if (m_run_pause > 0 || m_run_error > 0) {
583  waitResume();
584  }
585 #endif
586 
587  clearNumUsedBuf();
588  // RawDataBlock raw_datablk[ NUM_EVT_PER_BASF2LOOP_PC ];
589  RawDataBlockFormat raw_datablk[ NUM_EVT_PER_BASF2LOOP_PC ];
590 
591  //
592  // Recv loop
593  //
594  for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
595  //
596  // Receive data from COPPER
597  //
598  eve_copper_0 = 0;
599  int delete_flag_from =
600  0; // Delete flag for temp_rawdatablk.It can be set to 1 by setRecvdBuffer if the buffer size is larger than that of pre-allocated buffer.
601  int delete_flag_to =
602  0; // Delete flag for raw_datablk[i]. It can be set to 1 by getNewBuffer if the buffer size is larger than that of pre-allocated buffer.
603  RawDataBlockFormat temp_rawdatablk;
604  try {
605  setRecvdBuffer(&temp_rawdatablk, &delete_flag_from);
606  checkData(&temp_rawdatablk, &eve_copper_0);
607  } catch (string err_str) {
608  printf("Error was detected\n"); fflush(stdout);
609  break;
610  }
611  // PreRawCOPPERFormat_latest pre_rawcopper_latest;
612  // pre_rawcopper_latest.SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
613  // 0, temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
615 
616  int temp_num_events, temp_num_nodes;
617  int temp_nwords_to;
618  int* buf_to = NULL;
619 #ifdef REDUCED_RAWCOPPER
620  //
621  // Copy reduced buffer
622  //
623  // int* buf_to = getNewBuffer(m_pre_rawcpr.CalcReducedDataSize(&temp_rawdatablk), &delete_flag_to); // basf2-dependent style
624  int* temp_bufin = temp_rawdatablk.GetWholeBuffer();
625  int temp_nwords_from = temp_rawdatablk.TotalBufNwords();
626  temp_num_events = temp_rawdatablk.GetNumEvents();
627  temp_num_nodes = temp_rawdatablk.GetNumNodes();
628  int calced_temp_nwords_to = m_pre_rawcpr.CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
629  buf_to = getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
630 
631  // m_pre_rawcpr.CopyReducedData(&temp_rawdatablk, buf_to, delete_flag_from); // basf2-dependent style
632  m_pre_rawcpr.CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
633  if (calced_temp_nwords_to != temp_nwords_to) {
634  char err_buf[500];
635  sprintf(err_buf,
636  "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
637  calced_temp_nwords_to, temp_nwords_to);
638  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
639  exit(1);
640  }
641  m_status.copyEventHeader(buf_to);
642 #else
643  buf_to = temp_rawdatablk.GetWholeBuffer();
644  temp_nwords_to = temp_rawdatablk.TotalBufNwords();
645  temp_num_events = temp_rawdatablk.GetNumEvents();
646  temp_num_nodes = temp_rawdatablk.GetNumNodes();
647  delete_flag_to = delete_flag_from;
648  delete_flag_from = 0; // to avoid double delete
649 #endif
650 
651  //
652  // Set buffer to the RawData class stored in DataStore
653  //
654 // raw_datablk[ j ].SetBuffer( (int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
655 // delete_flag_to, temp_rawdatablk.GetNumEvents(),
656 // temp_rawdatablk.GetNumNodes());
657  raw_datablk[ j ].SetBuffer(buf_to, temp_nwords_to, delete_flag_to, temp_num_events, temp_num_nodes);
658 
659 
660  //
661  // CRC16 check after data reduction
662  //
663 #ifdef REDUCED_RAWCOPPER
664  PostRawCOPPERFormat_latest post_rawcopper_latest;
665 
666 // post_rawcopper_latest.SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
667 // 0, temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
668  post_rawcopper_latest.SetBuffer(raw_datablk[ j ].GetWholeBuffer(), raw_datablk[ j ].TotalBufNwords(),
669  0, raw_datablk[ j ].GetNumEvents(), raw_datablk[ j ].GetNumNodes());
670 
671  for (int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
672  int block_num = 0;
673  if (post_rawcopper_latest.GetFINESSENwords(block_num, i_finesse_num) > 0) {
674  post_rawcopper_latest.CheckCRC16(block_num, i_finesse_num);
675  }
676  }
677 
678 #endif
679  }
680 
681 #ifdef NONSTOP
682  // Goto Stand-by loop when run is paused or stopped by error
683  if (m_run_pause != 0 || m_run_error != 0) continue;
684 #endif
685 
686 
688  // From Serializer.cc
690  if (m_start_flag == 0) {
691  m_start_time = getTimeSec();
692  n_basf2evt = 0;
693  }
694  // StoreArray<RawCOPPER> rawcprarray;
695  // StoreArray<RawDataBlock> raw_dblkarray;
696 
697  for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
698 
699  //
700  // Send data
701  //
702  if (m_start_flag == 0) {
703  // B2INFO("SerializerPC: Sending the 1st packet...");
704  printf("[DEBUG] SerializerPC: Sending the 1st packet...\n"); fflush(stdout);
705  }
706 
707  try {
708  m_sent_totbytes += sendByWriteV(&(raw_datablk[ j ]));
709  } catch (string err_str) {
710 #ifdef NONSTOP
711  break;
712 #endif
713  print_err.PrintError((char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
714  exit(1);
715  }
716  if (m_start_flag == 0) {
717  // B2INFO("Done. ");
718  printf("[DEBUG] Done.\n"); fflush(stdout);
719  m_start_flag = 1;
720  }
721  }
722 
723 #ifdef NONSTOP
724  // Goto Stand-by loop when run is paused or stopped by error
725  if (m_run_pause != 0 || m_run_error != 0) continue;
726 #endif
727 
728  //
729  // Monitor
730  //
731  if (max_nevt >= 0 || max_seconds >= 0.) {
732 #ifdef AIUEO
733  if (n_basf2evt % 10000 == 0) {
734 // if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC >= max_nevt && max_nevt > 0)
735 // || (getTimeSec() - m_start_time > max_seconds && max_seconds > 0.)) {
736  printf("[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
737  max_nevt , max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
738  }
739 #endif
740  }
741 
742  if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC) % 100000 == 0) {
743  double interval = cur_time - m_prev_time;
744  double total_time = cur_time - m_start_time;
745  printf("[DEBUG] Event %12d Rate %6.2lf[kHz] Recvd %6.2lf[MB/s] sent %6.2lf[MB/s] RunTime %8.2lf[s] interval %8.4lf[s]\n",
746  n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC,
747  (n_basf2evt - m_prev_nevt)*NUM_EVT_PER_BASF2LOOP_PC / interval / 1.e3,
748  (m_recvd_totbytes - m_recvd_prev_totbytes) / interval / 1.e6,
749  (m_sent_totbytes - m_sent_prev_totbytes) / interval / 1.e6,
750  total_time,
751  interval);
752  fflush(stdout);
753 
754  m_prev_time = cur_time;
755  m_recvd_prev_totbytes = m_recvd_totbytes;
756  m_sent_prev_totbytes = m_sent_totbytes;
757  m_prev_nevt = n_basf2evt;
758  cur_time = getTimeSec();
759  }
760 
761  n_basf2evt++;
762 
763  if (m_status.isAvailable()) {
764  m_status.setOutputNBytes(m_sent_totbytes);
765  m_status.setOutputCount(n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC);
766  }
767 
768  }
769 
770  return;
771 }
772 
774 // From Serializer.cc
776 
777 
778 #ifdef NONSTOP
779 void DesSerPrePC::waitResume()
780 {
781  if (m_run_pause == 0) {
782  while (true) {
783  if (checkRunPause()) break;
784 #ifdef NONSTOP_DEBUG
785  printf("\033[31m");
786  printf("###########(DesSerPrePC) Waiting for Runstop() ###############\n");
787  fflush(stdout);
788  printf("\033[0m");
789 #endif
790  sleep(1);
791  }
792  }
793 
794  while (true) {
795 #ifdef NONSTOP_DEBUG
796  printf("\033[31m");
797  printf("###########(Ser) Waiting for Resume ###############\n");
798  fflush(stdout);
799  printf("\033[0m");
800 #endif
801  if (checkRunRecovery()) {
802  m_run_pause = 0;
803  break;
804  }
805  sleep(1);
806  }
807 
808 
809  printf("Done!\n"); fflush(stdout);
810 
811  printf("Checking connection to eb0\n"); fflush(stdout);
812  if (CheckConnection(m_socket_send) < 0) {
813  printf("Trying Accept1\n"); fflush(stdout);
814  Accept();
815  printf("Trying Accept2\n"); fflush(stdout);
816  }
817 
818  printf("Checking connection to COPPER\n"); fflush(stdout);
819  for (int i = 0; i < m_num_connections; i++) {
820  if (CheckConnection(m_socket_recv[ i ]) < 0) m_socket_recv[ i ] = -1;
821  }
822  Connect();
823 
824  resumeRun();
825  return;
826 }
827 
828 
829 
830 #endif
831 
Belle2::PostRawCOPPERFormat_latest::CheckCRC16
int CheckCRC16(int n, int finesse_num)
check magic words
Definition: PostRawCOPPERFormat_latest.cc:216
Belle2::RawDataBlockFormat::SetBuffer
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor)
Definition: RawDataBlockFormat.cc:131
Belle2::RawDataBlockFormat::GetBufferPos
virtual int GetBufferPos(int n)
get position of data block in word
Definition: RawDataBlockFormat.cc:30
Belle2::PostRawCOPPERFormat_latest::GetFINESSENwords
int GetFINESSENwords(int n, int finesse) OVERRIDE_CPP17
get data size of FINESSE buffer
Definition: PostRawCOPPERFormat_latest.cc:50
Belle2::SendHeader::SetBuffer
void SetBuffer(int *hdr)
set buffer
Definition: SendHeader.cc:37
Belle2::PostRawCOPPERFormat_latest
The Raw COPPER class ver.1 ( the latest version since May, 2014 ) This class stores data received by ...
Definition: PostRawCOPPERFormat_latest.h:40
Belle2::RawFTSWFormat_latest::GetTTCtimeTRGType
unsigned int GetTTCtimeTRGType(int n) OVERRIDE_CPP17
Get a word containing ctime and trigger type info.
Definition: RawFTSWFormat_latest.cc:23
Belle2::RawDataBlockFormat::GetBuffer
virtual int * GetBuffer(int n)
get nth buffer pointer
Definition: RawDataBlockFormat.cc:124
Belle2::PreRawCOPPERFormat_latest
The Raw COPPER class ver.1 ( the latest version since May, 2014 ) This class stores data received by ...
Definition: PreRawCOPPERFormat_latest.h:30
Belle2::RawFTSWFormat_latest::GetEveNo
unsigned int GetEveNo(int n) OVERRIDE_CPP17
Get event #.
Definition: RawFTSWFormat_latest.cc:85
Belle2::PreRawCOPPERFormat_latest::CheckData
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no) OVERRIDE_CPP17
check data contents
Definition: PreRawCOPPERFormat_latest.cc:173
Belle2::SendHeader
Definition: SendHeader.h:21
Belle2::RawDataBlockFormat::GetNumNodes
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
Definition: RawDataBlockFormat.h:52
Belle2::RawTLUFormat
The Raw TLU class Class for data from DAQ PC for TLU(Trigger Logic Unit) It is supposed to be used on...
Definition: RawTLUFormat.h:25
Belle2::SendHeader::GetNumEventsinPacket
int GetNumEventsinPacket()
get contents of Header
Definition: SendHeader.cc:125
Belle2::RawTLUFormat::CheckData
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
Definition: RawTLUFormat.cc:60
Belle2::RawDataBlockFormat
The RawDataBlockFormat class Format information for rawdata handling.
Definition: RawDataBlockFormat.h:25
Belle2::RawDataBlockFormat::GetNumEvents
virtual int GetNumEvents()
get # of events in m_buffer
Definition: RawDataBlockFormat.h:55
Belle2::RawFTSWFormat_latest::GetExpRunSubrun
unsigned int GetExpRunSubrun(int n) OVERRIDE_CPP17
Exp# (10bit) run# (14bit) restart # (8bit)
Definition: RawFTSWFormat_latest.h:170
Belle2::RawFTSWFormat_latest::CheckData
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no) OVERRIDE_CPP17
check the data contents
Definition: RawFTSWFormat_latest.cc:132
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::RawDataBlockFormat::GetBlockNwords
virtual int GetBlockNwords(int n)
get size of a data block
Definition: RawDataBlockFormat.cc:107
Belle2::RawDataBlockFormat::GetWholeBuffer
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
Definition: RawDataBlockFormat.cc:119
Belle2::RawCOPPERFormat_latest::GetTTCtimeTRGType
unsigned int GetTTCtimeTRGType(int n) OVERRIDE_CPP17
get b2l block from "FEE b2link header"
Definition: RawCOPPERFormat_latest.h:308
Belle2::RawFTSWFormat_latest
The Raw FTSW class 3 ( 2019.8.20 )
Definition: RawFTSWFormat_latest.h:24
Belle2::RawFTSWFormat_latest::GetTTUtime
unsigned int GetTTUtime(int n) OVERRIDE_CPP17
get unixtime of the trigger
Definition: RawFTSWFormat_latest.cc:40
Belle2::RawDataBlockFormat::TotalBufNwords
virtual int TotalBufNwords()
Get total length of m_buffer.
Definition: RawDataBlockFormat.cc:101
Belle2::RawDataBlockFormat::CheckFTSWID
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
Definition: RawDataBlockFormat.cc:73
Belle2::RawTLUFormat::GetEveNo
unsigned int GetEveNo(int n)
Get Event #.
Definition: RawTLUFormat.cc:37
Belle2::RawCOPPERFormat_latest::GetTTUtime
unsigned int GetTTUtime(int n) OVERRIDE_CPP17
Check if COPPER Magic words are correct.
Definition: RawCOPPERFormat_latest.h:314
Belle2::RawDataBlockFormat::CheckTLUID
virtual int CheckTLUID(int n)
check whether this data block is TLU data or not
Definition: RawDataBlockFormat.cc:88
Belle2::RawDataBlockFormat::GetNumEntries
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
Definition: RawDataBlockFormat.h:49