Belle II Software  release-08-01-10
des_ser_PCIe40_main.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include "des_ser_PCIe40_main.h"
9 using namespace std;
10 
11 #ifdef SPLIT_ECL_ECLTRG
// Channel list that exists only in SPLIT_ECL_ECLTRG builds.
12 const std::vector<int> splitted_ch {16}; // recl3: cpr6001-6008,cpr13001 (0-15,16ch)
13 #endif
14 
15 #ifndef USE_ZMQ
// Non-ZMQ builds only: two pointer tables with one slot per sender thread.
// NOTE(review): presumably a pair of hand-over buffers per sender thread - confirm against the sender code.
16 unsigned int* data_1[NUM_SENDER_THREADS];
17 unsigned int* data_2[NUM_SENDER_THREADS];
18 
19 //pthread_t sender_thr[NUM_CLIENTS];
// One mutex per sender thread for each of the two tables above (presumably guarding data_1 / data_2 - TODO confirm).
20 pthread_mutex_t mtx1_ch[NUM_SENDER_THREADS];
21 pthread_mutex_t mtx2_ch[NUM_SENDER_THREADS];
22 #endif
// Held by the checking functions in this file while they print diagnostics and
// update the shared counters (n_messages, dmaerr_*, err_bad_evenum).
23 pthread_mutex_t mtx_sender_log;
24 
25 #ifndef USE_ZMQ
// Non-ZMQ builds only: two int slots per sender thread.
// NOTE(review): the names suggest a "filled" flag and a word count for each of the two buffers - confirm.
26 int buffer_filled[NUM_SENDER_THREADS][2];
27 int copy_nwords[NUM_SENDER_THREADS][2];
28 #endif
29 
30 #ifdef USE_ZMQ
32 // Handshake by ZMQ
// ZMQ builds only: one writer and one reader socket pointer per sender thread.
34 zmq::socket_t* zmq_writer[NUM_SENDER_THREADS];
35 zmq::socket_t* zmq_reader[NUM_SENDER_THREADS];
36 #endif
37 
39 // From main_pcie40_dmahirate.cpp
41 bool exit_on_error = false ;
42 int nTot = 100000 ;
43 //int max_number_of_messages = 0x10000000;
// checkDMAHeader() prints a message of a given category only while the matching
// n_messages[] counter is still below this value.
44 unsigned int max_number_of_messages = 10;
45 
46 
48 // Error counter in checkDMAHeader()
50 unsigned int dmaerr_no_data = 0;
51 unsigned int dmaerr_bad_size = 0;
52 unsigned int dmaerr_bad_size_dmatrl = 0;
53 unsigned int dmaerr_bad_word_size = 0;
54 unsigned int dmaerr_bad_header = 0;
55 unsigned int dmaerr_bad_dmatrl = 0;
57 // Error counter in checkEventData()
59 
// Counters below have one slot per sender thread (indexed by sender_id).
// Note that total_crc_errors is signed while the other counters are unsigned.
60 unsigned int total_crc_good[NUM_SENDER_THREADS] = {0};
61 int total_crc_errors[NUM_SENDER_THREADS] = {0};
62 unsigned int err_flag_cnt[NUM_SENDER_THREADS] = {0};
63 unsigned int cur_evtnum[NUM_SENDER_THREADS] = {0};
64 
65 unsigned int err_not_reduced[NUM_SENDER_THREADS] = {0};
66 unsigned int err_bad_7f7f[NUM_SENDER_THREADS] = {0};
67 unsigned int err_bad_runnum[NUM_SENDER_THREADS] = {0};
68 unsigned int err_bad_linknum[NUM_SENDER_THREADS] = {0};
69 unsigned int err_bad_evenum[NUM_SENDER_THREADS] = {0};
70 unsigned int err_bad_ffaa[NUM_SENDER_THREADS] = {0};
71 unsigned int err_bad_ff55[NUM_SENDER_THREADS] = {0};
72 unsigned int err_bad_linksize[NUM_SENDER_THREADS] = {0};
73 unsigned int err_link_eve_jump[NUM_SENDER_THREADS] = {0};
// No explicit initializer: as an object with static storage duration it is zero-initialized.
74 unsigned int crc_err_ch[NUM_SENDER_THREADS][ MAX_PCIE40_CH];
75 
77 // hostname
79 std::map< string, unsigned int > host_nodeid;
// Host name string inserted with %s into the log messages of this file.
80 char hostnamebuf[50];
81 
82 
// Number of messages issued per error category. In the code visible here,
// checkDMAHeader() uses slots 0, 2, 3, 4 and 5 and printEventNumberError() uses slot 10.
// The commented-out map below lists the meaning originally given to each index.
83 unsigned int n_messages[17] = {0};
84 // std::map< int , int > n_messages = {
85 // { 0 , 0 } , // no data
86 // { 1 , 0 } , // bad header
87 // { 2 , 0 } , // bad size
88 // { 3 , 0 } , // Bad word size
89 // { 4 , 0 } , // Bad belle2 header
90 // { 5 , 0 } , // bad trailer size
91 // { 6 , 0 } , // bad trailer
92 // { 7 , 0 } , // bad 7ff code
93 // { 8 , 0 } , // bad version
94 // { 9 , 0 } , // bad run number
95 // { 10 , 0 } , // bad event number
96 // { 11 , 0 } , // bad link number
97 // { 12 , 0 } , // bad FFAA
98 // { 13 , 0 } , // bad link size
99 // { 14 , 0 } , // bad data size
100 // { 15 , 0 } , // Bad CRC
101 // { 16 , 0 } // missing links
102 // };
103 
// 256-entry lookup table for the byte-wise CRC-16 update done in crc_calc():
// the index is (high byte of the running CRC) XOR (next data byte).
// Entry 1 is 0x1021, i.e. the table is built for generator polynomial 0x1021, processed MSB first.
104 const int CRC16_XMODEM_TABLE[] = {
105  0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
106  0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
107  0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
108  0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
109  0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
110  0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
111  0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
112  0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
113  0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
114  0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
115  0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
116  0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
117  0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
118  0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
119  0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
120  0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
121  0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
122  0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
123  0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
124  0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
125  0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
126  0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
127  0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
128  0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
129  0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
130  0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
131  0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
132  0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
133  0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
134  0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
135  0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
136  0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
137 } ;
138 
139 void crc_calc(unsigned int& crc, const unsigned int& data)
140 {
141  int byte1, byte2, byte3, byte4 ;
142  byte1 = data & 0xFF;
143  byte2 = (data & 0xFF00) >> 8;
144  byte3 = (data & 0xFF0000) >> 16;
145  byte4 = (data & 0xFF000000) >> 24;
146  crc = (((crc) << 8) & 0xff00) ^ CRC16_XMODEM_TABLE[(((crc) >> 8) & 0xff)^byte4] ;
147  crc = (((crc) << 8) & 0xff00) ^ CRC16_XMODEM_TABLE[(((crc) >> 8) & 0xff)^byte3] ;
148  crc = (((crc) << 8) & 0xff00) ^ CRC16_XMODEM_TABLE[(((crc) >> 8) & 0xff)^byte2] ;
149  crc = (((crc) << 8) & 0xff00) ^ CRC16_XMODEM_TABLE[(((crc) >> 8) & 0xff)^byte1] ;
150 }
151 
152 unsigned int get_crc(unsigned int* data, int length, unsigned int initial_value)
153 {
154  unsigned int result = initial_value ;
155  // printf("get_crc()\n");
156  for (int i = 0 ; i < length ; ++i) {
157  crc_calc(result, data[ i ]) ;
158  // printf("%.8d %.8x %.8x\n", i, result, data[i]);
159  }
160  return result ;
161 }
162 
163 
/**
 * Return the word stored at index 4 of the buffer, which this file treats as the event number.
 *
 * @param data buffer to read from; may be null
 * @return data[4] converted to int, or -1 when data is null
 */
int getEventNumber(unsigned int* data)
{
  if (data == nullptr) {
    return -1;
  }
  return data[4];
}
169 
/**
 * Print eight consecutive words as one "Header" line on stdout.
 *
 * The words are written from data[7] down to data[0], each as %8X.
 * Prints "No data" when the pointer is null.
 *
 * @param data pointer to at least eight words, or null
 */
void printHeader(unsigned int* data)
{
  if (data == nullptr) {
    printf("No data\n");
    return;
  }
  const unsigned int* w = data;
  printf("Header : %8X%8X%8X%8X%8X%8X%8X%8X\n",
         w[7], w[6], w[5], w[4], w[3], w[2], w[1], w[0]);
}
177 
/**
 * Print eight consecutive words as one "Trailer" line on stdout.
 *
 * The words are written from data[7] down to data[0], each as %8X.
 * Prints "No data" when the pointer is null.
 *
 * @param data pointer to at least eight words, or null
 */
void printTrailer(unsigned int* data)
{
  if (data == nullptr) {
    printf("No data\n");
    return;
  }
  const unsigned int* w = data;
  printf("Trailer : %8X%8X%8X%8X%8X%8X%8X%8X\n",
         w[7], w[6], w[5], w[4], w[3], w[2], w[1], w[0]);
}
185 
/**
 * Print eight consecutive words as one "Data" line on stdout.
 *
 * The words are written from data[7] down to data[0], each as %8X.
 * Prints "No data" when the pointer is null.
 *
 * @param data pointer to at least eight words, or null
 */
void printData(unsigned int* data)
{
  if (data == nullptr) {
    printf("No data\n");
    return;
  }
  const unsigned int* w = data;
  printf("Data : %8X%8X%8X%8X%8X%8X%8X%8X\n",
         w[7], w[6], w[5], w[4], w[3], w[2], w[1], w[0]);
}
193 
/**
 * Append one record to a text file: a separator line of '!' characters followed by
 * 8 * (size - 2) words, one hexadecimal value per line.
 *
 * Each line ends with std::endl, so the stream is flushed after every line, and the
 * stream is left in hexadecimal mode once at least one word has been written.
 *
 * @param the_file output stream to append to
 * @param data     first word to write
 * @param size     record size; 8 * (size - 2) words are written (none if that is not positive)
 */
void writeToFile(std::ofstream& the_file, unsigned int* data, int size)
{
  // Separator between events.
  the_file << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << std::endl ;

  const int n_words = 8 * (size - 2);
  const unsigned int* word = data;
  for (int remaining = n_words; remaining > 0; --remaining, ++word) {
    the_file << std::hex << *word << std::endl ;
  }
}
201 
/**
 * Print the eight-word line that contains word index pos.
 *
 * pos is rounded down to a multiple of eight; that index is printed as the line label,
 * followed by the eight words data[label] .. data[label + 7].
 *
 * @param data buffer to dump; must hold at least (label + 8) words
 * @param pos  index of any word inside the line to print
 */
void printLine(unsigned int* data, int pos)
{
  const int line_start = pos - (pos % 8);
  printf("pos %.8x : ", line_start);
  // The loop index is already an absolute word index, so it must be used directly;
  // adding the line start a second time would dump words at twice the requested offset.
  for (int i = line_start; i < line_start + 8; i++) {
    printf(" %.8x", data[ i ]);
  }
  printf("\n");
  fflush(stdout);
}
212 
/**
 * Dump an event to stdout, taking the number of words from the event itself.
 *
 * The word count is the low 16 bits of data[0] minus one. Words are printed from
 * data[0] onwards, eight per row; after every eighth word a new row label is started
 * that shows the index of the word just printed. No newline is written at the end.
 *
 * @param data event buffer; must not be null
 */
void printEventData(unsigned int* data)
{
  const int n_words = static_cast<int>(data[ 0 ] & 0xFFFF) - 1 ; // minus header
  printf(" eve 0 : ");
  int idx = 0;
  while (idx < n_words) {
    printf("%.8x ", data[ idx ]);
    const bool row_complete = (idx % 8 == 7);
    if (row_complete) {
      printf("\n eve %.3x : ", idx);
    }
    ++idx;
  }
  fflush(stdout);
}
223 
/**
 * Dump size words of a buffer to stdout, eight words per row.
 *
 * Every row starts with the index of its first word ("%.8x : "). The label of the
 * first row is printed before the null check, so a null buffer yields that label
 * followed by "No data". The output always ends with a newline and is flushed.
 *
 * @param data buffer to dump, or null
 * @param size number of words to print
 */
void printEventData(unsigned int* data, int size)
{
  printf("%.8x : ", 0);
  if (data == nullptr) {
    printf("No data\n") ;
  } else {
    int idx = 0;
    while (idx < size) {
      printf("%.8x ", data[ idx ]);
      ++idx;
      // idx now counts the words printed so far; start a new row after each full group of eight.
      if (idx % 8 == 0) {
        printf("\n%.8x : ", idx);
      }
    }
  }
  printf("\n");
  fflush(stdout);
}
236 
/**
 * Dump size words of a buffer to stdout, eight words per row, tagging every row
 * with the id of the calling sender thread.
 *
 * Every row starts with "thread <id> : <index of first word> : ". The label of the
 * first row is printed before the null check, so a null buffer yields that label
 * followed by "No data". The output always ends with a newline and is flushed.
 *
 * @param data      buffer to dump, or null
 * @param size      number of words to print
 * @param sender_id thread id shown at the start of each row
 */
void printEventData(unsigned int* data, int size, int sender_id)
{
  printf("thread %d : %.8x : ", sender_id, 0);
  if (data == nullptr) {
    printf("No data\n") ;
  } else {
    int idx = 0;
    while (idx < size) {
      printf("%.8x ", data[ idx ]);
      ++idx;
      // idx now counts the words printed so far; start a new row after each full group of eight.
      if (idx % 8 == 0) {
        printf("\nthread %d : %.8x : ", sender_id, idx);
      }
    }
  }
  printf("\n");
  fflush(stdout);
}
249 
/**
 * Dump a complete packet to stdout: its first eight words as a "Header" line,
 * then the packet body, then eight words as a "Trailer" line.
 *
 * The number of eight-word lines n is (data[0] & 0xFF) - 1. The body dump prints
 * words 0 .. 8*n-1 (so it starts again at the header words), eight per row; the
 * trailer line shows words 8*n+7 down to 8*n.
 *
 * @param data packet buffer; must not be null and must hold at least 8*(n+1) words
 */
void printFullData(unsigned int* data)
{
  printf("Header : %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x\n",
         data[0], data[1], data[2], data[3],
         data[4], data[5], data[6], data[7]) ;

  const int n_lines = static_cast<int>(data[ 0 ] & 0xFF) - 1 ; // minus header
  const int n_body_words = n_lines * 8;

  printf(" data 0 : ");
  int idx = 0;
  while (idx < n_body_words) {
    printf("%.8x ", data[ idx ]);
    if (idx % 8 == 7) {
      printf("\n data %.3x : ", idx);
    }
    ++idx;
  }

  // Index of the first trailer word.
  const int trl = 8 * n_lines;
  printf("\nTrailer : %8X %8X %8X %8X %8X %8X %8X %8X\n",
         data[trl + 7], data[trl + 6], data[trl + 5], data[trl + 4],
         data[trl + 3], data[trl + 2], data[trl + 1], data[trl]) ;
  fflush(stdout);
}
274 
275 int get1stChannel(unsigned int*& data)
276 {
277  int ret_1st_ch = -1;
278  unsigned int event_length = data[ Belle2::RawHeader_latest::POS_NWORDS ];
279 
280  for (int i = 0; i < MAX_PCIE40_CH; i++) {
281  int linksize = 0;
282  if (i < MAX_PCIE40_CH - 1) {
283  linksize = data[ POS_TABLE_POS + (i + 1) ] - data[ POS_TABLE_POS + i ];
284  } else {
285  linksize = event_length - (data[ POS_TABLE_POS + (MAX_PCIE40_CH - 1) ] + LEN_ROB_TRAILER);
286  }
287  if (linksize > 0) {
288  ret_1st_ch = i;
289  break;
290  }
291  }
292  return ret_1st_ch;
293 }
294 
295 
296 void printEventNumberError(unsigned int*& data, const unsigned int evtnum, const unsigned int exprun, const int eve_diff,
297  const int sender_id)
298 {
299  //
300  // event # check ( Since this check is done in a single thread, only differnce in the prev. event came to this thread can be checked.
301  // So, if event # from PCIe40 are in order like, 0, 3, 2, 10002, 9746, 5, 8, 7, 10007, 9753, No event jump can be issued.
302  // eb0 will check futher check.
303  //
304 
305  unsigned int event_length = data[ Belle2::RawHeader_latest::POS_NWORDS ];
306  char err_buf[2000] = {0};
307  int reduced_flag = 1; // 0 : not-reduced(error event) 1: reduced
308  if (data[ MAGIC_7F7F_POS ] & 0x00008000) {
309  reduced_flag = 0;
310  }
311 
312  pthread_mutex_lock(&(mtx_sender_log));
313  n_messages[ 10 ] = n_messages[ 10 ] + 1 ;
314  if (reduced_flag == 1) {
315  sprintf(err_buf,
316  "[FATAL] thread %d : %s ch=%d : ERROR_EVENT : Invalid event_number. Exiting...: cur 32bit eve %u preveve %u for all channels : prun %u crun %u\n %s %s %d\n",
317  sender_id, hostnamebuf, get1stChannel(data),
318  data[EVENUM_POS], evtnum + (eve_diff - 1),
319  exprun, data[RUNNO_POS],
320  __FILE__, __PRETTY_FUNCTION__, __LINE__);
321  } else {
322  sprintf(err_buf,
323  "[FATAL] thread %d : %s ch=%d : ERROR_EVENT : Invalid event_number. Exiting...: cur 32bit eve %u preveve %u ( ",
324  sender_id, hostnamebuf, get1stChannel(data),
325  data[EVENUM_POS], evtnum + (eve_diff - 1));
326  int temp_pos = 0;
327  unsigned int temp_eve = 0;
328  for (int i = 0; i < MAX_PCIE40_CH; i++) {
329  int linksize = 0;
330  if (i < MAX_PCIE40_CH - 1) {
331  linksize = data[ POS_TABLE_POS + (i + 1) ] - data[ POS_TABLE_POS + i ];
332  } else {
333  linksize = event_length - (data[ POS_TABLE_POS + (MAX_PCIE40_CH - 1) ] + LEN_ROB_TRAILER);
334  }
335  if (linksize <= 0) continue;
336  temp_pos = data[ POS_TABLE_POS + i ] + OFFSET_HDR;
337  temp_eve = data[ temp_pos +
338  Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
339  Belle2::PreRawCOPPERFormat_latest::POS_TT_TAG ];
340 
341  if (evtnum + eve_diff != temp_eve) {
342  sprintf(err_buf + strlen(err_buf),
343  "ch %d eve 0x%.8x : ",
344  i, temp_eve);
345  }
346  }
347  sprintf(err_buf + strlen(err_buf), "prun %u crun %u\n %s %s %d\n",
348  exprun, data[RUNNO_POS],
349  __FILE__, __PRETTY_FUNCTION__, __LINE__);
350  }
351  printf("%s\n", err_buf); fflush(stdout);
352  printEventData(data, event_length, sender_id);
353  err_bad_evenum[sender_id]++;
354  pthread_mutex_unlock(&(mtx_sender_log));
355  return;
356 }
357 
358 
359 void checkUtimeCtimeTRGType(unsigned int*& data, const int sender_id)
360 {
361  unsigned int event_length = data[ Belle2::RawHeader_latest::POS_NWORDS ];
362  unsigned int new_exprun = data[ Belle2::RawHeader_latest::POS_EXP_RUN_NO ] ;
363  unsigned int new_evtnum = data[ Belle2::RawHeader_latest::POS_EVE_NO ] ;
364  //
365  // Check the 7f7f magic word
366  //
367  if ((data[ MAGIC_7F7F_POS ] & 0xFFFF0000) != 0x7F7F0000) {
368  char err_buf[500] = {0};
369  pthread_mutex_lock(&(mtx_sender_log));
370  sprintf(err_buf,
371  "[FATAL] thread %d : %s : ERROR_EVENT : Invalid Magic word in ReadOut Board header( 0x%.8x ) : It must be 0x7f7f???? : eve %d exp %d run %d sub %d : %s %s %d",
372  sender_id, hostnamebuf, data[ MAGIC_7F7F_POS ],
373  new_evtnum,
374  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
375  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
376  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
377  __FILE__, __PRETTY_FUNCTION__, __LINE__);
378  printf("%s\n", err_buf); fflush(stdout);
379  printEventData(data, event_length, sender_id);
380  pthread_mutex_unlock(&(mtx_sender_log));
381 #ifndef NO_ERROR_STOP
382  exit(1);
383 #endif
384  }
385 
386  //
387  // Check if non data-reduction bit was set or not.
388  //
389  if (!(data[ MAGIC_7F7F_POS ] & 0x00008000)) {
390  // reduced
391  pthread_mutex_lock(&(mtx_sender_log));
392  printf("[FATAL] thread %d : %s : This function cannot be used for already reduced data. 7f7f header is 0x%.8x : eve %d exp %d run %d sub %d : %s %s %d\n",
393  sender_id, hostnamebuf, data[ MAGIC_7F7F_POS ],
394  new_evtnum,
395  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
396  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
397  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
398  __FILE__, __PRETTY_FUNCTION__, __LINE__);
399  printEventData(data, event_length, sender_id);
400  pthread_mutex_unlock(&(mtx_sender_log));
401 #ifndef NO_ERROR_STOP
402  exit(1);
403 #endif
404  }
405 
406 
407  //
408  // Check consistency of B2L header over all input channels
409  //
410  int flag = 0, err_flag = 0, err_ch = -1;
411  unsigned int temp_utime = 0, temp_ctime_trgtype = 0, temp_eve = 0, temp_exprun = 0;
412  unsigned int utime[MAX_PCIE40_CH], ctime_trgtype[MAX_PCIE40_CH], eve[MAX_PCIE40_CH], exprun[MAX_PCIE40_CH];
413  int used_ch[MAX_PCIE40_CH] = {0};
414  char err_buf[20000];
415  int first_ch = -1;
416 
417  memset(utime, 0, sizeof(utime));
418  memset(ctime_trgtype, 0, sizeof(ctime_trgtype));
419  memset(eve, 0, sizeof(eve));
420  memset(exprun, 0, sizeof(exprun));
421 
422  for (int i = 0; i < MAX_PCIE40_CH; i++) {
423  unsigned int temp_ctime_trgtype_footer = 0, temp_eve_footer = 0;
424  int linksize = 0;
425  if (i < MAX_PCIE40_CH - 1) {
426  linksize = data[ POS_TABLE_POS + (i + 1) ] - data[ POS_TABLE_POS + i ];
427  } else {
428  linksize = event_length - (data[ POS_TABLE_POS + (MAX_PCIE40_CH - 1) ] + LEN_ROB_TRAILER);
429  }
430  if (linksize <= 0) {
431  continue;
432  } else {
433  used_ch[ i ] = 1;
434  }
435 
436  int temp_pos = data[ POS_TABLE_POS + i ] + OFFSET_HDR;
437  ctime_trgtype[ i ] = data[ temp_pos +
438  Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
439  Belle2::PreRawCOPPERFormat_latest::POS_TT_CTIME_TYPE ];
440  eve[ i ] = data[ temp_pos +
441  Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
442  Belle2::PreRawCOPPERFormat_latest::POS_TT_TAG ];
443  utime[ i ] = data[ temp_pos +
444  Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
445  Belle2::PreRawCOPPERFormat_latest::POS_TT_UTIME ];
446  exprun[ i ] = data[ temp_pos +
447  Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
448  Belle2::PreRawCOPPERFormat_latest::POS_EXP_RUN ];
449  temp_ctime_trgtype_footer = data[ temp_pos + linksize +
450  - (Belle2::PreRawCOPPERFormat_latest::SIZE_B2LFEE_TRAILER +
451  Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER) +
452  Belle2::PreRawCOPPERFormat_latest::POS_TT_CTIME_B2LFEE ];
453  temp_eve_footer = data[ temp_pos + linksize +
454  - (Belle2::PreRawCOPPERFormat_latest::SIZE_B2LFEE_TRAILER +
455  Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER) +
456  Belle2::PreRawCOPPERFormat_latest::POS_CHKSUM_B2LFEE ];
457 
458  if (flag == 0) {
459  temp_ctime_trgtype = ctime_trgtype[ i ];
460  temp_eve = eve[ i ];
461  temp_utime = utime[ i ];
462  temp_exprun = exprun[ i ];
463  flag = 1;
464  first_ch = i;
465 
466  if (temp_eve != new_evtnum) {
467  pthread_mutex_lock(&(mtx_sender_log));
468  printf("[FATAL] thread %d : %s ch=%d : ERROR_EVENT : Invalid event_number. Exiting...: eve in ROBheader = 0x%.8x , ch %d 's eve = 0x%.8x : exp %d run %d sub %d : %s %s %d\n",
469  sender_id, hostnamebuf, i,
470  new_evtnum, i, temp_eve,
471  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
472  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
473  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
474  __FILE__, __PRETTY_FUNCTION__, __LINE__);
475  printEventData(data, event_length, sender_id);
476  pthread_mutex_unlock(&(mtx_sender_log));
477  exit(1);
478  }
479 
480  } else {
481  if (temp_ctime_trgtype != ctime_trgtype[ i ] || temp_utime != utime[ i ] ||
482  temp_eve != eve[ i ] || temp_exprun != exprun[ i ]) {
483  err_ch = i;
484  err_flag = 1;
485  }
486 
487  }
488 
489  //
490  // Mismatch between header and trailer
491  //
492  if (temp_ctime_trgtype != temp_ctime_trgtype_footer || (temp_eve & 0xffff) != ((temp_eve_footer >> 16) & 0xffff)) {
493  pthread_mutex_lock(&(mtx_sender_log));
494  printf("[FATAL] thread %d : %s ch=%d : ERROR_EVENT : mismatch(finesse %d) between header(ctime 0x%.8x eve 0x%.8x) and footer(ctime 0x%.8x eve_crc16 0x%.8x). Exiting... : exp %d run %d sub %d : %s %s %d\n",
495  sender_id, hostnamebuf, i, i,
496  temp_ctime_trgtype, temp_eve, temp_ctime_trgtype_footer, temp_eve_footer,
497  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
498  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
499  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
500  __FILE__, __PRETTY_FUNCTION__, __LINE__);
501  printEventData(data, event_length, sender_id);
502  pthread_mutex_unlock(&(mtx_sender_log));
503  exit(1);
504  }
505  }
506 
507 
508  //
509  // Mismatch over channels
510  //
511  if (err_flag == 1) {
512  pthread_mutex_lock(&(mtx_sender_log));
513  sprintf(err_buf,
514  "[FATAL] thread %d : %s ch= %d or %d : ERROR_EVENT : mismatch header value over FINESSEs ( between ch %d and ch %d ). Exiting...: ",
515  sender_id, hostnamebuf, err_ch, first_ch, err_ch, first_ch);
516  for (int i = 0; i < MAX_PCIE40_CH; i++) {
517  if (used_ch[ i ] == 1) {
518  sprintf(err_buf + strlen(err_buf),
519  "\nch = %d ctimeTRGtype 0x%.8x utime 0x%.8x eve 0x%.8x exprun 0x%.8x",
520  i, ctime_trgtype[ i ], utime[ i ], eve[ i ], exprun[ i ]);
521  }
522  }
523  printf("%s\n", err_buf); fflush(stdout);
524  pthread_mutex_unlock(&(mtx_sender_log));
525  exit(1);
526  }
527 
528  return;
529 }
530 
531 
532 int checkDMAHeader(unsigned int*& data, unsigned int& size, double& dsize, int& total_pages, int& index_pages)
533 {
534  if (data == 0) {
535  pthread_mutex_lock(&(mtx_sender_log));
536  n_messages[ 0 ] = n_messages[ 0 ] + 1 ;
537  if (n_messages[ 0 ] < max_number_of_messages) {
538  printf("[WARNING] Null pointer to data buffer\n") ;
539  }
540  dmaerr_no_data++;
541  pthread_mutex_unlock(&(mtx_sender_log));
542  return 1 ;
543  }
544 
545  unsigned int fragment_size = data[ DMA_WORDS_OF_256BITS ] & 0xFFFF ;
546  dsize += fragment_size * 32 ; // in bytes
547 
548  if (((data[ DMA_WORDS_OF_256BITS ] & 0xFFFF0000) != 0xEEEE0000) ||
549  (data[ DMA_HDR_MAGIC ] != 0xAAAAEEEE) ||
550  ((data[ DMA_SIZE_IN_BYTES ] & 0xFFFF) != 0xAAAA)) {
551  pthread_mutex_lock(&(mtx_sender_log));
552  n_messages[ 4 ] = n_messages[ 4 ] + 1 ;
553  if (n_messages[ 4 ] < max_number_of_messages) {
554  printf("[FATAL] Invalid DMA header format. ( %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x )\n",
555  data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]) ;
556  printFullData(data);
557  }
558  dmaerr_bad_header++;
559  pthread_mutex_unlock(&(mtx_sender_log));
560 #ifndef NO_ERROR_STOP
561  exit(1);
562 #endif
563  return 5 ;
564  } else if ((data[ DMA_WORDS_OF_256BITS ] & 0xFFFF) > MAX_DMA_WORDS_OF_256BITS) {
565  pthread_mutex_lock(&(mtx_sender_log));
566  n_messages[ 2 ] = n_messages[ 2 ] + 1 ;
567  if (n_messages[ 2 ] < max_number_of_messages) {
568  printf("[FATAL] Too large DMA packet(= %lf bytes). ( %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x )\n",
569  dsize,
570  data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]) ;
571  }
572  dmaerr_bad_size++;
573  pthread_mutex_unlock(&(mtx_sender_log));
574 #ifndef NO_ERROR_STOP
575  exit(1);
576 #endif
577  return 3 ;
578  } else if (((data[ DMA_SIZE_IN_BYTES ] & 0xFFFF0000) >> 16) != (fragment_size * 32)) {
579  pthread_mutex_lock(&(mtx_sender_log));
580  n_messages[ 3 ] = n_messages[ 3 ] + 1 ;
581  if (n_messages[ 3 ] < max_number_of_messages) {
582  printf("[FATAL] Inconsistent between byte-size( = %u ) and 8words-size( = %u ) in DMA header. ( %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x )\n"
583  ,
584  (data[ DMA_SIZE_IN_BYTES ] & 0xFFFF0000) >> 16, fragment_size * 32,
585  data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]) ;
586  }
587  dmaerr_bad_word_size++;
588  pthread_mutex_unlock(&(mtx_sender_log));
589 #ifndef NO_ERROR_STOP
590  exit(1);
591 #endif
592  return 4 ;
593  } else
594 
595  // Checktrailer
596  if (data[ 8 * (fragment_size - 1) ] != fragment_size) {
597  pthread_mutex_lock(&(mtx_sender_log));
598  n_messages[ 5 ] = n_messages[ 5 ] + 1 ;
599  if (n_messages[ 5 ] < max_number_of_messages) {
600  printf("Bad size in trailer : size %.8x size in hdr %.8x bef %.8x\n", data[8 * (fragment_size - 1)], fragment_size,
601  data[ 8 * (fragment_size - 2) ]) ;
602  printLine(data, 8 * (fragment_size - 1));
603  printFullData(data);
604  // printEventData(data, fragment_size);
605  }
606  if (data[ 8 * (fragment_size - 2) ] == fragment_size) {
607  dmaerr_bad_size_dmatrl++;
608  fragment_size--;
609  }
610  pthread_mutex_unlock(&(mtx_sender_log));
611  }
612  // if ( ( data[ 8*(fragment_size-1)+1 ] != 0 ) || ( data[ 8*(fragment_size-1)+2 ] != 0 ) ||
613  // ( data[ 8*(fragment_size-1)+3 ] != 0 ) || ( data[ 8*(fragment_size-1)+4 ] != 0 ) ||
614  // ( data[ 8*(fragment_size-1)+5 ] != 0 ) || ( data[ 8*(fragment_size-1)+6 ] != 0 ) ||
615  // ( data[ 8*(fragment_size-1)+7 ] != 0 ) ) {
616  // n_messages[ 6 ] = n_messages[ 6 ] + 1 ;
617  // if ( n_messages[ 6 ] < max_number_of_messages ) {
618  // printf( "Bad trailer\n" ) ;
619  // printTrailer( &data[ 8*(fragment_size-1) ] ) ;
620  // }
621  // err_bad_dmatrl[sender_id]++;
622  // // return 7 ;
623  // }
624 
625  total_pages = (data[ 4 ] & 0xFFFF0000) >> 16 ;
626  index_pages = (data[ 4 ] & 0xFFFF) ;
627 
628  size = fragment_size ;
629 
630  // Remve header and trailer from data
631  // unsigned int * tmp = new unsigned int[ S_PAGE_SLOT_SIZE/4 ] ;
632  // memcpy( tmp , &data[ 8 ], 8*(fragment_size-2)*4 ) ;
633  // delete [] data ;
634  // data = tmp ;
635  // if ( total_pages != 1 ) return -1 ;
636  return 0 ;
637 }
638 
639 
/**
 * Return the current time reported by gettimeofday() as a floating-point number of seconds.
 *
 * @return tv_sec + tv_usec * 1e-6
 */
double getTimeSec()
{
  struct timeval now;
  gettimeofday(&now, NULL);
  const double micro_part = now.tv_usec * 1.e-6;
  return now.tv_sec + micro_part;
}
646 
647 void reduceHdrTrl(unsigned int* data, unsigned int& event_nwords)
648 {
649 
650  // TO CHECK LATER unsigned int event_size = data[ 8 ] ;
651  unsigned int event_length = data[ Belle2::RawHeader_latest::POS_NWORDS ];
652  if (event_length > 0x100000) {
653  pthread_mutex_lock(&(mtx_sender_log));
654  printf("[FATAL] Too large event size. : 0x%.8x : %u words. Exiting...\n",
655  data[ Belle2::RawHeader_latest::POS_NWORDS ],
656  event_length);
657  printEventData(data, (event_length & 0xfffff));
658  pthread_mutex_unlock(&(mtx_sender_log));
659  exit(1);
660  }
661 
662  unsigned int* temp_data = new unsigned int[event_length];
663  memset(temp_data, 0, event_length * sizeof(unsigned int));
664 
665  if (data[ Belle2::RawHeader_latest::POS_VERSION_HDRNWORDS ] & 0x00008000) {
666  // Remove non-reduced flag
667  data[ Belle2::RawHeader_latest::POS_VERSION_HDRNWORDS ] =
668  data[ Belle2::RawHeader_latest::POS_VERSION_HDRNWORDS ] & 0xffff7fff;
669  // Remove error-flag
670  data[ Belle2::RawHeader_latest::POS_TRUNC_MASK_DATATYPE ] = 0;
671  } else {
672  pthread_mutex_lock(&(mtx_sender_log));
673  printf("[FATAL] reduceHdrTrl() must not be used for already reduced-event. 7f7f word = %.8x . Exiting...\n",
674  data[ Belle2::RawHeader_latest::POS_VERSION_HDRNWORDS ]);
675  fflush(stdout);
676  pthread_mutex_unlock(&(mtx_sender_log));
677  exit(1);
678  }
679 
680  // Copy header before the position table
681  unsigned int cur_pos = 0;
682  unsigned int dst_cur_pos = 0;
683  memcpy(temp_data + dst_cur_pos, data + cur_pos,
684  Belle2::RawHeader_latest::POS_CH_POS_TABLE * sizeof(unsigned int));
685 
686  cur_pos = Belle2::RawHeader_latest::POS_CH_POS_TABLE;
687  dst_cur_pos = Belle2::RawHeader_latest::POS_CH_POS_TABLE;
688 
689 
690  // Check eror flag in ROB header
691  int red_linksize = 0;
692 
693  temp_data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE ] = data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE ];
694  for (int i = 0; i < MAX_PCIE40_CH ; i++) {
695 
696  int linksize = 0;
697  if (i < MAX_PCIE40_CH - 1) {
698  linksize = data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + (i + 1) ]
699  - data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ];
700  } else {
701  linksize = event_length - (data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + (MAX_PCIE40_CH - 1) ] +
702  Belle2::RawTrailer_latest::RAWTRAILER_NWORDS);
703  }
704 
705  if (linksize < 0) {
706  pthread_mutex_lock(&(mtx_sender_log));
707  printf("[FATAL] event size(= %d ) for ch %d is negative. Exiting...\n",
708  linksize, i);
709  printEventData(data, (event_length & 0xfffff));
710  fflush(stdout);
711  pthread_mutex_unlock(&(mtx_sender_log));
712  exit(1);
713  } else if (linksize == 0) {
714  // this channel is not used.
715  if (i < MAX_PCIE40_CH - 1) {
716  temp_data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i + 1 ] =
717  temp_data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ];
718  }
719  continue;
720  } else {
721  red_linksize = linksize
722  - (Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER
723  + Belle2::PreRawCOPPERFormat_latest::SIZE_B2LFEE_HEADER
724  - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER
725  - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LFEE_HEADER)
726  - (Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER
727  + Belle2::PreRawCOPPERFormat_latest::SIZE_B2LFEE_TRAILER
728  - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER
729  - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LFEE_TRAILER);
730 
731  if (i < MAX_PCIE40_CH - 1) {
732  temp_data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i + 1 ] =
733  temp_data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ] + red_linksize;
734  }
735  }
736 
737  // Set position of data
738  dst_cur_pos = temp_data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ];
739  cur_pos = data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ];
740 
741  // Copy data(B2LHSLB header)
742  temp_data[ dst_cur_pos + Belle2::PostRawCOPPERFormat_latest::POS_B2LHSLB_MAGIC]
743  = data[ cur_pos + Belle2::PreRawCOPPERFormat_latest::POS_MAGIC_B2LHSLB ];
744  dst_cur_pos = dst_cur_pos + Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER;
745  cur_pos = cur_pos + Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER;
746 
747 
748  // Copy data(B2LFEE header)
749  temp_data[ dst_cur_pos + Belle2::PostRawCOPPERFormat_latest::POS_B2L_CTIME ]
750  = data[ cur_pos + Belle2::PreRawCOPPERFormat_latest::POS_B2L_CTIME ];
751  dst_cur_pos = dst_cur_pos + Belle2::PostRawCOPPERFormat_latest::SIZE_B2LFEE_HEADER;
752  cur_pos = cur_pos + Belle2::PreRawCOPPERFormat_latest::SIZE_B2LFEE_HEADER;
753 
754  // Copy data( Detector data )
755  int numwords_det_buffer = red_linksize
756  - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER
757  - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LFEE_HEADER
758  - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER
759  - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LFEE_TRAILER;
760 
761  memcpy(temp_data + dst_cur_pos, data + cur_pos,
762  numwords_det_buffer * sizeof(unsigned int));
763  dst_cur_pos = dst_cur_pos + numwords_det_buffer;
764  cur_pos = cur_pos + numwords_det_buffer;
765 
766  // Copy data( B2L FEE trailer )
767  temp_data[ dst_cur_pos + Belle2::PostRawCOPPERFormat_latest::POS_B2LFEE_ERRCNT_CRC16 ]
768  = data[ cur_pos + Belle2::PreRawCOPPERFormat_latest::POS_CHKSUM_B2LFEE ];
769  dst_cur_pos += Belle2::PostRawCOPPERFormat_latest::SIZE_B2LFEE_TRAILER;
770  cur_pos += Belle2::PreRawCOPPERFormat_latest::SIZE_B2LFEE_TRAILER;
771 
772  // Copy data( B2L HSLB trailer )
773  temp_data[ dst_cur_pos + Belle2::PostRawCOPPERFormat_latest::POS_B2LHSLB_TRL_MAGIC ]
774  = data[ cur_pos + Belle2::PreRawCOPPERFormat_latest::POS_CHKSUM_B2LHSLB ];
775  dst_cur_pos += Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER;
776  cur_pos += Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER;
777  }
778 
779 
780  // Copy RawCOPPER trailer
781  temp_data[ dst_cur_pos + Belle2::RawTrailer_latest::POS_ERROR_BIT_CH1 ] =
782  data[ cur_pos + Belle2::RawTrailer_latest::POS_ERROR_BIT_CH1 ];
783  temp_data[ dst_cur_pos + Belle2::RawTrailer_latest::POS_ERROR_CH2 ] =
784  data[ cur_pos + Belle2::RawTrailer_latest::POS_ERROR_CH2 ];
785  temp_data[ dst_cur_pos + Belle2::RawTrailer_latest::POS_CHKSUM ] =
786  data[ cur_pos + Belle2::RawTrailer_latest::POS_CHKSUM ];
787  temp_data[ dst_cur_pos + Belle2::RawTrailer_latest::POS_TERM_WORD ] =
788  data[ cur_pos + Belle2::RawTrailer_latest::POS_TERM_WORD ];
789 
790  dst_cur_pos += Belle2::RawTrailer_latest::RAWTRAILER_NWORDS;
791  cur_pos += Belle2::RawTrailer_latest::RAWTRAILER_NWORDS;
792 
793  if (dst_cur_pos > cur_pos) {
794  pthread_mutex_lock(&(mtx_sender_log));
795  printf("[FATAL] reduced data-size ( %d words ) in reduceHdrTrl() is larger than the original size ( %d words). Exiting...\n",
796  dst_cur_pos, cur_pos);
797  fflush(stdout);
798  pthread_mutex_unlock(&(mtx_sender_log));
799  exit(1);
800  }
801 
802  event_nwords = dst_cur_pos;
803  temp_data[ Belle2::RawHeader_latest::POS_NWORDS ] = event_nwords;
804 
805  memset(data, 0, event_nwords * sizeof(unsigned int));
806  memcpy(data, temp_data, dst_cur_pos * sizeof(unsigned int));
807 
808 
809  delete temp_data;
810 
811  return ;
812 }
813 
814 
815 int checkEventData(int sender_id, unsigned int* data, unsigned int event_nwords, unsigned int& exprun,
816  unsigned int& evtnum, unsigned int node_id, std::vector< int > valid_ch)
817 {
818  int expected_number_of_links = valid_ch.size() ;
819  int reduced_flag = 1; // 0 : not-reduced(error event) 1: reduced
820 
821  // For error message
822  unsigned int new_exprun = data[ Belle2::RawHeader_latest::POS_EXP_RUN_NO ] ;
823  unsigned int new_evtnum = data[ Belle2::RawHeader_latest::POS_EVE_NO ] ;
824 
825  // TO CHECK LATER unsigned int event_size = data[ 8 ] ;
826  //
827  // Check if event length is not too long or zero.
828  //
829  unsigned int event_length = data[ EVENT_LEN_POS ];
830  if (event_length > 0x100000) {
831  pthread_mutex_lock(&(mtx_sender_log));
832  printf("[FATAL] thread %d : %s : ERROR_EVENT : Too large event size. : 0x%.8x : %d words. : exp %d run %d sub %d : Exiting...\n",
833  sender_id, hostnamebuf,
834  data[ EVENT_LEN_POS ], data[ EVENT_LEN_POS ],
835  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
836  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
837  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
838  );
839 
840  printEventData(data, (event_length & 0xfffff), sender_id);
841  pthread_mutex_unlock(&(mtx_sender_log));
842  exit(1);
843  } else if (event_length == 0) {
844  pthread_mutex_lock(&(mtx_sender_log));
845  printf("[FATAL] thread %d : %s : ERROR_EVENT : Specified event size is zero. : 0x%.8x : %u words. : exp %d run %d sub %d : Exiting...\n",
846  sender_id, hostnamebuf,
847  data[ EVENT_LEN_POS ], event_length,
848  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
849  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
850  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
851  );
852  printEventData(data, 24, sender_id);
853  pthread_mutex_unlock(&(mtx_sender_log));
854  exit(1);
855  }
856 
857  //
858  // Check the 7f7f magic word
859  //
860  if ((data[ MAGIC_7F7F_POS ] & 0xFFFF0000) != 0x7F7F0000) {
861  char err_buf[500] = {0};
862  pthread_mutex_lock(&(mtx_sender_log));
863  n_messages[ 7 ] = n_messages[ 7 ] + 1 ;
864  if (n_messages[ 7 ] < max_number_of_messages) {
865  sprintf(err_buf,
866  "[FATAL] thread %d : %s : ERROR_EVENT : Invalid Magic word in ReadOut Board header( 0x%.8x ) : It must be 0x7f7f???? : exp %d run %d sub %d",
867  sender_id, hostnamebuf, data[ MAGIC_7F7F_POS ],
868  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
869  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
870  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
871  ) ;
872  printf("%s\n", err_buf); fflush(stdout);
873  printEventData(data, event_length, sender_id);
874  }
875  err_bad_7f7f[sender_id]++;
876  pthread_mutex_unlock(&(mtx_sender_log));
877 #ifndef NO_ERROR_STOP
878  exit(1);
879 #endif
880  }
881 
882  //
883  // Store nodeID
884  //
885  data[ NODEID_POS ] = node_id;
886 
887  //
888  // Check if non data-reduction bit was set or not.
889  //
890  int ffaa_pos = 0, ff55_pos_from_end = 0;
891  if (data[ MAGIC_7F7F_POS ] & 0x00008000) {
892  // not-reduced
893  reduced_flag = 0;
894  ffaa_pos = Belle2::PreRawCOPPERFormat_latest::POS_MAGIC_B2LHSLB;
895  ff55_pos_from_end = - Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER +
896  Belle2::PreRawCOPPERFormat_latest::POS_CHKSUM_B2LHSLB;
897 
898  // printf("[WARNING] thread %d : Error was detected by data-check core in PCIe40 FPGA. : exp %d run %d sub %d\n",
899  // sender_id, data[ MAGIC_7F7F_POS ],
900  // (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
901  // (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
902  // (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
903  // );
904 
905  // if (data[ ERR_POS ] == 0) {
906  // pthread_mutex_lock(&(mtx_sender_log));
907  // printf("[FATAL] thread %d : Data error was deteced by PCIe40 FPGA. Header %.8x, Errorbit %.8x\n", sender_id, data[ MAGIC_7F7F_POS ],
908  // data[ ERR_POS ]);
909  // printEventData(data, event_length, sender_id);
910  // pthread_mutex_unlock(&(mtx_sender_log));
911  // #ifndef NO_ERROR_STOP
912  // exit(1);
913  // #endif
914  // }
915  } else {
916  // reduced
917  reduced_flag = 1;
918  ffaa_pos = Belle2::PostRawCOPPERFormat_latest::POS_B2LHSLB_MAGIC;
919  ff55_pos_from_end = - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER +
920  Belle2::PostRawCOPPERFormat_latest::POS_B2LHSLB_TRL_MAGIC;
921  if (data[ ERR_POS ] != 0) {
922  pthread_mutex_lock(&(mtx_sender_log));
923  printf("[FATAL] thread %d : %s : Inconsistency between header(no error found by FPGA) %.8x and errorbit %.8x (error-bit is non-zero) : exp %d run %d sub %d\n",
924  sender_id, hostnamebuf, data[ MAGIC_7F7F_POS ], data[ ERR_POS ],
925  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
926  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
927  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
928  );
929  printEventData(data, event_length, sender_id);
930  pthread_mutex_unlock(&(mtx_sender_log));
931 #ifndef NO_ERROR_STOP
932  exit(1);
933 #endif
934  }
935  }
936 
937  //
938  // Check event # incrementation
939  //
940  if (evtnum + NUM_SENDER_THREADS != data[EVENUM_POS]) {
941  if (exprun == data[RUNNO_POS]
942  && exprun != 0) { // After a run-change or if this is the 1st event, event incrementation is not checked.
943  printEventNumberError(data, evtnum, exprun, NUM_SENDER_THREADS, sender_id);
944 #ifndef NO_ERROR_STOP
945  exit(1);
946 #endif
947  }
948  }
949 
950 
951  //
952  // Check exprun #
953  //
954  if (exprun == 0) { // default value of exprun
955  exprun = data[RUNNO_POS];
956  } else {
957  if (exprun != data[RUNNO_POS]) {
958  if (new_evtnum < 0 || new_evtnum >= NUM_SENDER_THREADS) {
959  pthread_mutex_lock(&(mtx_sender_log));
960  n_messages[ 9 ] = n_messages[ 9 ] + 1 ;
961  if (n_messages[ 9 ] < max_number_of_messages) {
962  printf("[FATAL] thread %d : %s ch=%d : ERROR_EVENT : Bad exprun(now %.8x prev. %.8x) : exp %d run %d sub %d : Exiting...\n",
963  sender_id, hostnamebuf, get1stChannel(data),
964  exprun, data[RUNNO_POS],
965  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
966  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
967  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
968  ) ;
969  printEventData(data, event_length, sender_id);
970  }
971  err_bad_runnum[sender_id]++;
972  pthread_mutex_unlock(&(mtx_sender_log));
973 #ifndef NO_ERROR_STOP
974  exit(1);
975 #endif
976  } else {
977  if (sender_id == 0) {
978  printf("[DEBUG] thread %d : Run number was changed. cur exprun %.8x prev. exprun %.8x cur eve %.8x : exp %d run %d sub %d\n",
979  sender_id, data[RUNNO_POS], exprun, new_evtnum,
980  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
981  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
982  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
983  );
984  }
985  //
986  // A new run was started.
987  //
988  exprun = data[RUNNO_POS];
989  //
990  // Initialize error counter when run # is changed.
991  //
992  total_crc_good[sender_id] = 0;
993  total_crc_errors[sender_id] = 0;
994  err_flag_cnt[sender_id] = 0;
995  cur_evtnum[sender_id] = 0;
996  err_not_reduced[sender_id] = 0;
997  err_bad_7f7f[sender_id] = 0;
998  err_bad_runnum[sender_id] = 0;
999  err_bad_linknum[sender_id] = 0;
1000  err_bad_evenum[sender_id] = 0;
1001  err_bad_ffaa[sender_id] = 0;
1002  err_bad_ff55[sender_id] = 0;
1003  err_bad_linksize[sender_id] = 0;
1004  err_link_eve_jump[sender_id] = 0;
1005  }
1006  }
1007  }
1008 
1009  //
1010  // Checking each channel's header
1011  //
1012  unsigned int ctime = data[ Belle2::RawHeader_latest::POS_TTCTIME_TRGTYPE ] ;
1013  unsigned int utime = data[ Belle2::RawHeader_latest::POS_TTUTIME ] ;
1014 
1015  unsigned int crc_init = 0xFFFF ;
1016  unsigned int f_crc[ 4 ] = { ctime, new_evtnum, utime, new_exprun } ;
1017  unsigned int first_crc = 0;
1018 
1019  // find number of links
1020  unsigned int cur_pos = 0 ;
1021  int non_crc_counts = 0;
1022  // Check eror flag in ROB header
1023 
1024  unsigned int first_b2lctime = 0;
1025  int first_b2lctime_flag = 0;
1026 
1027  if (reduced_flag == 1) {
1028  first_crc = get_crc(f_crc, 4, crc_init) ;
1029  non_crc_counts = NON_CRC_COUNTS_REDUCED;
1030  } else {
1031  err_flag_cnt[sender_id]++;
1032  // printf("ERROR flag : Printing a whole event... %u\n", new_evtnum);
1033  // printEventData(data, event_length);
1034  first_crc = crc_init;
1035  non_crc_counts = NON_CRC_COUNTS_NOTREDUCED;
1036  }
1037 
1038  int first_eve_flag = 0;
1039  int link_cnt = 0;
1040 
1041  //
1042  // Loop over input channels
1043  //
1044  for (int i = 0; i < MAX_PCIE40_CH; i++) {
1045  if (i == 0) first_b2lctime_flag = 0;
1046 
1047  int linksize = 0;
1048  if (i < MAX_PCIE40_CH - 1) {
1049  linksize = data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + (i + 1) ]
1050  - data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ];
1051  } else {
1052  linksize = event_length - (data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + (MAX_PCIE40_CH - 1) ] +
1053  Belle2::RawTrailer_latest::RAWTRAILER_NWORDS);
1054  }
1055  if (linksize <= 0) continue;
1056  cur_pos = data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ] + OFFSET_HDR;
1057 
1058  //
1059  // compare valid ch with register value
1060  //
1061  if (valid_ch[link_cnt] != i) {
1062  pthread_mutex_lock(&(mtx_sender_log));
1063  n_messages[ 11 ] = n_messages[ 11 ] + 1 ;
1064  if (n_messages[ 11 ] < max_number_of_messages) {
1065  printf("[FATAL] thread %d : %s ch=%d or %d : ERROR_EVENT : HSLB or PCIe40 channel found in data is ch %d but the next channel must be ch %d according to masking register info. of PCIe40. Please check the status of channel masking. : exp %d run %d sub %d\n",
1066  sender_id,
1067  hostnamebuf, i, valid_ch[link_cnt],
1068  i, valid_ch[link_cnt],
1069  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1070  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1071  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
1072  );
1073  printEventData(data, event_length, sender_id);
1074  }
1075  err_bad_linknum[sender_id]++;
1076  pthread_mutex_unlock(&(mtx_sender_log));
1077 #ifndef NO_ERROR_STOP
1078  exit(1);
1079 #endif
1080  }
1081 
1082  //
1083  // Check FFAA value
1084  //
1085  if ((data[ cur_pos + ffaa_pos ] & 0xFFFF0000) != 0xFFAA0000) {
1086  pthread_mutex_lock(&(mtx_sender_log));
1087  n_messages[ 12 ] = n_messages[ 12 ] + 1 ;
1088  if (n_messages[ 12 ] < max_number_of_messages) {
1089  char err_buf[500];
1090  sprintf(err_buf,
1091  "[FATAL] thread %d : %s ch=%d : ERROR_EVENT : HSLB or PCIe40 header magic word(0xffaa) is invalid. header %.8x : exp %d run %d sub %d : %s %s %d\n",
1092  sender_id,
1093  hostnamebuf, i,
1094  data[ cur_pos + ffaa_pos ],
1095  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1096  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1097  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1098  __FILE__, __PRETTY_FUNCTION__, __LINE__);
1099  printf("%s\n", err_buf); fflush(stdout);
1100  printLine(data, cur_pos + ffaa_pos);
1101  printEventData(data, event_length, sender_id);
1102  }
1103  err_bad_ffaa[sender_id]++;
1104  pthread_mutex_unlock(&(mtx_sender_log));
1105 #ifndef NO_ERROR_STOP
1106  exit(1);
1107 #endif
1108  }
1109 
1110  //
1111  // b2link time check ( Only thread == 0 )
1112  //
1113  if (new_evtnum % 1000000 == 1000) {
1114  if (reduced_flag == 1) {
1115  time_t timer;
1116  struct tm* t_st;
1117  time(&timer);
1118  t_st = localtime(&timer);
1119  if (first_b2lctime_flag == 0) {
1120  first_b2lctime = data[ cur_pos +
1121  Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
1122  Belle2::PostRawCOPPERFormat_latest::POS_B2L_CTIME ];
1123  first_b2lctime_flag = 1;
1124  }
1125  pthread_mutex_lock(&(mtx_sender_log));
1126  printf("[DEBUG] thread %d : eve %u ch %3d B2Lctime 0x%.8x diff %.2lf [us] : exp %d run %d sub %d : %s",
1127  sender_id, new_evtnum, i,
1128  data[ cur_pos +
1129  Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
1130  Belle2::PostRawCOPPERFormat_latest::POS_B2L_CTIME ],
1131  ((int)(data[ cur_pos +
1132  Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
1133  Belle2::PostRawCOPPERFormat_latest::POS_B2L_CTIME ]
1134  - first_b2lctime)) / 127.22,
1135  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1136  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1137  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1138  asctime(t_st));
1139  pthread_mutex_unlock(&(mtx_sender_log));
1140  }
1141  }
1142 
1143  // event # jump
1144  if (first_eve_flag == 0) {
1145  first_eve_flag = 1;
1146  }
1147 
1148  //
1149  // Check event number in ffaa header
1150  //
1151  unsigned int eve_link_8bits = data[ cur_pos + ffaa_pos ] & 0x000000ff;
1152  if ((new_evtnum & 0x000000FF) != eve_link_8bits) {
1153  pthread_mutex_lock(&(mtx_sender_log));
1154  err_link_eve_jump[sender_id]++;
1155  if (err_link_eve_jump[sender_id] < max_number_of_messages) {
1156  char err_buf[500] = {0};
1157  sprintf(err_buf,
1158  "[FATAL] thread %d : %s ch=%d : ERROR_EVENT : Invalid event_number (= lower 8bits in ffaa header -> 0x%.2x). Exiting...: eve 0x%.8x ffaa header 0x%.8x : exp %d run %d sub %d : %s %s %d",
1159  sender_id,
1160  hostnamebuf, i,
1161  data[ cur_pos + ffaa_pos ] & 0xff, new_evtnum, data[ cur_pos + ffaa_pos ],
1162  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1163  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1164  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1165  __FILE__, __PRETTY_FUNCTION__, __LINE__);
1166  printf("%s\n", err_buf); fflush(stdout);
1167  printEventData(data, event_length, sender_id);
1168  }
1169  pthread_mutex_unlock(&(mtx_sender_log));
1170 #ifndef NO_ERROR_STOP
1171  exit(1);
1172 #endif
1173  }
1174 
1175  //
1176  // Check channel number in ffaa header
1177  //
1178  unsigned int ch_ffaa = (data[ cur_pos + ffaa_pos ] >> 8) & 0x000000ff;
1179  if ((unsigned int)i != ch_ffaa) {
1180  pthread_mutex_lock(&(mtx_sender_log));
1181  printf("[FATAL] thread %d : %s ch=%d : ERROR_EVENT : HSLB or PCIe40 channel-number is differnt. It should be ch %d in the channel table in the ROB header buf ffaa header info says ch is %d (%.8x). : exp %d run %d sub %d : %s %s %d\n",
1182  sender_id, hostnamebuf, i,
1183  i, (data[ cur_pos + ffaa_pos ] >> 8) & 0xff,
1184  data[ cur_pos + ffaa_pos ],
1185  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1186  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1187  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1188  __FILE__, __PRETTY_FUNCTION__, __LINE__);
1189  printEventData(data, event_length, sender_id);
1190  pthread_mutex_unlock(&(mtx_sender_log));
1191 #ifndef NO_ERROR_STOP
1192  exit(1);
1193 #endif
1194  }
1195 
1196 
1197 #ifdef SPLIT_ECL_ECLTRG
1198  //
1199  // Check ECLTRG FEE is connected to a proper channel
1200  //
1201  unsigned int ecl_ecltrg_1stword = 0;
1202  if (reduced_flag == 0) {
1203  ecl_ecltrg_1stword = data[ cur_pos + ffaa_pos +
1204  Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
1205  Belle2::PreRawCOPPERFormat_latest::SIZE_B2LFEE_HEADER ];
1206  } else {
1207  ecl_ecltrg_1stword = data[ cur_pos + ffaa_pos +
1208  Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
1209  Belle2::PostRawCOPPERFormat_latest::SIZE_B2LFEE_HEADER ];
1210  }
1211 
1212  if (((ecl_ecltrg_1stword & 0xffff0000) >> 16) == 0) {
1213  // ECL data
1214  for (int j = 0; j < splitted_ch.size(); j++) {
1215  if (splitted_ch[j] == i) {
1216  pthread_mutex_lock(&(mtx_sender_log));
1217  printf("[FATAL] thread %d : %s ch=%d : ECL data(1st word = %.8x , eve = %.8x ) are detected in ECLTRG channel. Maybe, fiber connection mismatch. Exiting... : exp %d run %d sub %d : %s %s %d\n",
1218  sender_id,
1219  hostnamebuf, i,
1220  ecl_ecltrg_1stword,
1221  new_evtnum,
1222  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1223  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1224  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1225  __FILE__, __PRETTY_FUNCTION__, __LINE__);
1226  printEventData(data, event_length, sender_id);
1227  pthread_mutex_unlock(&(mtx_sender_log));
1228  exit(1);
1229  }
1230  }
1231  } else {
1232  // ECLTRG data
1233  int ecltrg_flag = 0;
1234  for (int j = 0; j < splitted_ch.size(); j++) {
1235  if (splitted_ch[j] == i) {
1236  ecltrg_flag = 1;
1237  break;
1238  }
1239  }
1240 
1241  if (ecltrg_flag == 0) {
1242  pthread_mutex_lock(&(mtx_sender_log));
1243  printf("[FATAL] thread %d : %s ch=%d : ECLTRG data(1st word = %.8x , eve = %.8x ) are detected in ECL channel. Maybe, fiber connection mismatch. Exiting... : exp %d run %d sub %d : %s %s %d\n",
1244  sender_id,
1245  hostnamebuf, i,
1246  ecl_ecltrg_1stword,
1247  new_evtnum,
1248  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1249  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1250  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1251  __FILE__, __PRETTY_FUNCTION__, __LINE__);
1252  printEventData(data, event_length, sender_id);
1253  pthread_mutex_unlock(&(mtx_sender_log));
1254  exit(1);
1255  }
1256  }
1257 #endif // SPLIT_ECL_ECLTRG
1258 
1259 
1260  //
1261  // Check if the current position exceeds the event end
1262  //
1263  if (cur_pos + linksize > event_nwords - Belle2::RawTrailer_latest::RAWTRAILER_NWORDS) {
1264  pthread_mutex_lock(&(mtx_sender_log));
1265  n_messages[ 13 ] = n_messages[ 13 ] + 1 ;
1266  if (n_messages[ 13 ] < max_number_of_messages) {
1267  printf("[FATAL] thread %d : %s ch=%d : ERROR_EVENT : The end position ( %d words ) of this channel data exceeds event size( %d words ). Exiting... : exp %d run %d sub %d : %s %s %d\n",
1268  sender_id,
1269  hostnamebuf, i,
1270  (cur_pos + linksize), event_nwords,
1271  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1272  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1273  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1274  __FILE__, __PRETTY_FUNCTION__, __LINE__);
1275 
1276  }
1277  printEventData(data, event_length, sender_id);
1278  err_bad_linksize[sender_id]++;
1279  pthread_mutex_unlock(&(mtx_sender_log));
1280 #ifndef NO_ERROR_STOP
1281  exit(1);
1282 #endif
1283  }
1284 
1285  //
1286  // Check FF55 value
1287  //
1288  if (((data[ cur_pos + linksize + ff55_pos_from_end ]) & 0xFFFF0000) != 0xFF550000) {
1289  pthread_mutex_lock(&(mtx_sender_log));
1290  n_messages[ 14 ] = n_messages[ 14 ] + 1 ;
1291  if (n_messages[ 14 ] < max_number_of_messages) {
1292  char err_buf[500];
1293  sprintf(err_buf,
1294  "[FATAL] thread %d : %s ch=%d : ERROR_EVENT : HSLB or PCIe40 trailer magic word(0xff55) is invalid. foooter %.8x (pos.=0x%.x) : exp %d run %d sub %d : %s %s %d",
1295  sender_id,
1296  hostnamebuf, i,
1297  data[ cur_pos + linksize + ff55_pos_from_end ], cur_pos + linksize + ff55_pos_from_end,
1298  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1299  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1300  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1301  __FILE__, __PRETTY_FUNCTION__, __LINE__);
1302  printf("%s\n", err_buf); fflush(stdout);
1303  printEventData(data, event_length + 16, sender_id);
1304  }
1305  err_bad_ff55[sender_id]++;
1306  pthread_mutex_unlock(&(mtx_sender_log));
1307 #ifndef NO_ERROR_STOP
1308  exit(1);
1309 #endif
1310  }
1311 
1312 
1313 
1314  //
1315  // CRC check
1316  //
1317  unsigned int crc_data = data[ cur_pos + linksize - 2 ] & 0xFFFF ;
1318  int size = linksize - non_crc_counts;
1319  unsigned int value = crc_data;
1320  unsigned int* data_for_crc = data + cur_pos + CRC_START_POS;
1321 #ifdef CRC_CHECK
1322  if (get_crc(data_for_crc, size, first_crc) != value) {
1323  pthread_mutex_lock(&(mtx_sender_log));
1324  // Currently, zero-torellance for a CRC error.
1325  // if (crc_err_ch[sender_id][i] == 0) {
1326  printf("[FATAL] thread %d : %s ch=%d : ERROR_EVENT : PRE CRC16 error or POST B2link event CRC16 error. data(%x) calc(%x) : eve %d exp %d run %d sub %d : %s %s %d\n",
1327  sender_id,
1328  hostnamebuf, i,
1329  value, get_crc(data_for_crc, size, first_crc),
1330  new_evtnum,
1331  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1332  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1333  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1334  __FILE__, __PRETTY_FUNCTION__, __LINE__);
1335  printEventData(data, event_length, sender_id);
1336  // }
1337 
1338  crc_err_ch[sender_id][i]++;
1339  total_crc_errors[sender_id]++;
1340  pthread_mutex_unlock(&(mtx_sender_log));
1341 #ifndef NO_ERROR_STOP
1342  exit(1);
1343 #endif
1344  } else {
1345  total_crc_good[sender_id]++ ;
1346  }
1347 #endif // CRC_CHECK
1348 
1349 
1350  //
1351  // Monitoring CRC check status
1352  //
1353  if (new_evtnum % 1000000 == 0) {
1354  // if (total_crc_good[sdr_id] % (1000000 + sdr_id) == 0) {
1355  pthread_mutex_lock(&(mtx_sender_log));
1356  printf("[DEBUG] thread %d : CRC Good calc %.4X data %.4X eve %u ch %d crcOK %u crcNG %d errflag %u : exp %d run %d sub %d\n",
1357  sender_id,
1358  get_crc(data_for_crc, size, first_crc),
1359  value, new_evtnum, i, total_crc_good[sender_id], total_crc_errors[sender_id], err_flag_cnt[sender_id],
1360  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1361  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1362  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
1363  ) ;
1364  int temp_err_cnt = 0;
1365 
1366  for (int j = 0; j < MAX_PCIE40_CH; j++) {
1367  if (crc_err_ch[sender_id][j] > 0) {
1368  if (temp_err_cnt == 0) {
1369  printf("[DEBUG] thread %d : crc_err_cnt : ", sender_id);
1370  temp_err_cnt = 1;
1371  }
1372  printf("ch %d %u : ", j, crc_err_ch[sender_id][j]);
1373  }
1374  }
1375  if (temp_err_cnt != 0) {
1376  printf("\n");
1377  }
1378  fflush(stdout);
1379  pthread_mutex_unlock(&(mtx_sender_log));
1380  }
1381 
1382  //
1383  // Check the end of the event
1384  //
1385  link_cnt++;
1386  cur_pos = cur_pos + linksize ;
1387  if (((data[ cur_pos ] & 0xFFFF0000) == 0x7FFF0000)) break ;
1388 
1389  }
1390 
1391  //
1392  // Check if the current position exceeds the event end
1393  //
1394  if (cur_pos != event_nwords - Belle2::RawTrailer_latest::RAWTRAILER_NWORDS) {
1395  pthread_mutex_lock(&(mtx_sender_log));
1396  printf("[FATAL] thread %d : %s : ERROR_EVENT : The end position of channel data( %d-th word ) does not coincide with the start of RawTrailer( %d-th word ). Exiting... : exp %d run %d sub %d : %s %s %d\n",
1397  sender_id,
1398  hostnamebuf,
1399  cur_pos, event_nwords - Belle2::RawTrailer_latest::RAWTRAILER_NWORDS,
1400  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1401  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1402  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1403  __FILE__, __PRETTY_FUNCTION__, __LINE__);
1404  printEventData(data, event_length, sender_id);
1405  pthread_mutex_unlock(&(mtx_sender_log));
1406 #ifndef NO_ERROR_STOP
1407  exit(1);
1408 #endif
1409  }
1410 
1411 
1412  //
1413  // Check the consistency of number of input links
1414  //
1415  if (link_cnt != expected_number_of_links) {
1416  pthread_mutex_lock(&(mtx_sender_log));
1417  printf("[FATAL] thread %d : %s : ERROR_EVENT : # of links(%d) in data is not the same as exptected(=%d). : Exiting... : exp %d run %d sub %d : %s %s %d\n",
1418  sender_id,
1419  hostnamebuf,
1420  link_cnt, expected_number_of_links,
1421  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1422  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1423  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1424  __FILE__, __PRETTY_FUNCTION__, __LINE__);
1425 
1426  printEventData(data, event_length, sender_id);
1427  pthread_mutex_unlock(&(mtx_sender_log));
1428 #ifndef NO_ERROR_STOP
1429  exit(1);
1430 #endif
1431  }
1432 
1433 
1434  //
1435  // Printing the 1st event
1436  //
1437  if (new_evtnum == 0) {
1438  pthread_mutex_lock(&(mtx_sender_log));
1439  printf("[DEBUG] thread %d : Printing the 1st event.\n", sender_id);
1440  printEventData(data, event_length, sender_id);
1441  pthread_mutex_unlock(&(mtx_sender_log));
1442  }
1443 
1444 
1445 
1446  //
1447  // Check unreduced header consistency
1448  //
1449  int ret = DATACHECK_OK;
1450  if (reduced_flag == 0) {
1451  checkUtimeCtimeTRGType(data, sender_id);
1452  pthread_mutex_lock(&(mtx_sender_log));
1453  if (err_not_reduced[sender_id] < max_number_of_messages) {
1454  printf("[WARNING] thread %d : %s ch=%d : ERROR_EVENT : Error-flag was set by the data-check module in PCIe40 FPGA. : eve %d prev thr eve %d : exp %d run %d sub %d : %s %s %d\n",
1455  sender_id,
1456  hostnamebuf, -1, new_evtnum, evtnum,
1457  (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1458  (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1459  (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1460  __FILE__, __PRETTY_FUNCTION__, __LINE__);
1461  printEventData(data, event_length, sender_id);
1462  }
1463  err_not_reduced[sender_id]++;
1464  pthread_mutex_unlock(&(mtx_sender_log));
1465  // exit(1); // zero-torellance policy
1466  ret = DATACHECK_OK_BUT_ERRFLAG_IN_HDR;
1467  }
1468 
1469  evtnum = data[EVENUM_POS];
1470  return ret;
1471 }
1472 
1473 void checkEventGenerator(unsigned int* data, int i, unsigned int size)
1474 {
1475  if (data == 0) {
1476  pthread_mutex_lock(&(mtx_sender_log));
1477  printf("No data\n") ;
1478  pthread_mutex_unlock(&(mtx_sender_log));
1479  return ;
1480  }
1481 
1482  if (i != getEventNumber(data)) {
1483  pthread_mutex_lock(&(mtx_sender_log));
1484  printf("Event number mismatch %d %d\n",
1485  getEventNumber(data), i) ;
1486  pthread_mutex_unlock(&(mtx_sender_log));
1487  }
1488  // Check header
1489  // if ( ( data[7] != 0 ) || ( data[6] != 0 ) || ( data[5] != 0 ) || ( data[3] != 0 ) ) {
1490  if ((data[7] != 0) || (data[6] != 0)) {
1491  pthread_mutex_lock(&(mtx_sender_log));
1492  printf("Bad header 3 %.8x %.8x\n", data[7], data[6]) ;
1493  printHeader(data) ;
1494  pthread_mutex_unlock(&(mtx_sender_log));
1495  } else if ((data[ 0 ] & 0xFFFF) != size) {
1496  pthread_mutex_lock(&(mtx_sender_log));
1497  printf("Bad size %d %d\n", data[0] & 0xFFFF, size) ;
1498  printLine(data, EVENT_LEN_POS);
1499  pthread_mutex_unlock(&(mtx_sender_log));
1500  } else if (((data[ 2 ] & 0xFFFF0000) >> 16) != (size * 32)) {
1501  pthread_mutex_lock(&(mtx_sender_log));
1502  printf("Bad word size %d %d\n", (data[ 2 ] & 0xFFFF0000) >> 16, size * 32) ;
1503  printHeader(data) ;
1504  pthread_mutex_unlock(&(mtx_sender_log));
1505  } else if (((data[ 0 ] & 0xFFFF0000) != 0xEEEE0000) ||
1506  (data[ 1 ] != 0xAAAAEEEE) ||
1507  ((data[ 2 ] & 0xFFFF) != 0xAAAA)) {
1508  pthread_mutex_lock(&(mtx_sender_log));
1509  printf("Bad header 4\n") ;
1510  printHeader(data) ;
1511  printEventData(data, size);
1512  pthread_mutex_unlock(&(mtx_sender_log));
1513  }
1514  // Check trailer
1515  if (data[ 8 * (size - 1) ] != size) {
1516  pthread_mutex_lock(&(mtx_sender_log));
1517  printf("Bad size in trailer %.8x %.8x\n", data[8 * (size - 1)], size) ;
1518  printLine(data, 8 * (size - 1));
1519  pthread_mutex_unlock(&(mtx_sender_log));
1520  } else if ((data[ 8 * (size - 1) + 1 ] != 0) || (data[ 8 * (size - 1) + 2 ] != 0) ||
1521  (data[ 8 * (size - 1) + 3 ] != 0) || (data[ 8 * (size - 1) + 4 ] != 0) ||
1522  (data[ 8 * (size - 1) + 5 ] != 0) || (data[ 8 * (size - 1) + 6 ] != 0) ||
1523  (data[ 8 * (size - 1) + 7 ] != 0)) {
1524  pthread_mutex_lock(&(mtx_sender_log));
1525  printf("Bad trailer\n") ;
1526  printTrailer(&data[ 8 * (size - 1) ]) ;
1527  pthread_mutex_unlock(&(mtx_sender_log));
1528  }
1529  // Check data
1530  for (unsigned int j = 1 ; j < (size - 1) ; ++j) {
1531  if (data[ 8 * j ] != j) {
1532  pthread_mutex_lock(&(mtx_sender_log));
1533  printf("Bad data number %d %d\n", data[8 * j], j) ;
1534  pthread_mutex_unlock(&(mtx_sender_log));
1535  } else if (data[8 * j + 1] != 0) {
1536  pthread_mutex_lock(&(mtx_sender_log));
1537  printf("Bad data\n") ;
1538  printData(&data[8 * j]) ;
1539  pthread_mutex_unlock(&(mtx_sender_log));
1540  } else if ((data[8 * j + 2] != 0xFFFFFFFF) || (data[8 * j + 3] != 0xEEEEEEEE) ||
1541  (data[8 * j + 4] != 0xDDDDDDDD) || (data[8 * j + 5] != 0xCCCCCCCC) ||
1542  (data[8 * j + 6] != 0xBBBBBBBB) || (data[8 * j + 7] != 0xAAAAAAAA)) {
1543  pthread_mutex_lock(&(mtx_sender_log));
1544  printf("Bad data\n") ;
1545  printData(&data[8 * j]) ;
1546  pthread_mutex_unlock(&(mtx_sender_log));
1547  }
1548  }
1549 }
1550 
1551 
1552 unsigned short CalcCRC16LittleEndian(unsigned short crc16, const int buf[], int nwords)
1553 {
1554 
1555  if (nwords < 0) {
1556 
1557  char err_buf[500];
1558  pthread_mutex_lock(&(mtx_sender_log));
1559  sprintf(err_buf, "nwords value(%d) is invalid. Cannot calculate CRC16. Exiting...\n %s %s %d\n",
1560  nwords, __FILE__, __PRETTY_FUNCTION__, __LINE__);
1561  printf("%s", err_buf); fflush(stdout);
1562  pthread_mutex_unlock(&(mtx_sender_log));
1563  string err_str = err_buf;
1564  throw (err_str);
1565  }
1566 
1567  const unsigned short CRC16Table0x1021[ 256 ] = {
1568  0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
1569  0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
1570  0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
1571  0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
1572  0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
1573  0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
1574  0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
1575  0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
1576 
1577  0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
1578  0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
1579  0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
1580  0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
1581  0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
1582  0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
1583  0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
1584  0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
1585 
1586  0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
1587  0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
1588  0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
1589  0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
1590  0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
1591  0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
1592  0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
1593  0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
1594 
1595  0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
1596  0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
1597  0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
1598  0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
1599  0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
1600  0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
1601  0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
1602  0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0
1603  };
1604 
1605  int cnt = 0, nints = 0;
1606  // printf("### %.8x %.4x\n", buf[ 0 ], crc16);
1607  while (nwords != 0) {
1608 
1609  unsigned char temp_buf = *((unsigned char*)(buf + nints) + (-(cnt % 4) + 3));
1610  crc16 = CRC16Table0x1021[(crc16 >> (16 - CHAR_BIT)) ^ temp_buf ] ^ (crc16 << CHAR_BIT);
1611  // printf("%.2x %.4x\n", temp_buf, crc16);
1612  if ((cnt % 4) == 3) {
1613  nwords--;
1614  nints++;
1615  // printf("### %.8x\n", buf[ nints ] );
1616  }
1617 
1618  cnt++;
1619  }
1620 
1621 
1622  return crc16;
1623 
1624 }
1625 
1626 int fillDataContents(int* buf, int nwords_per_fee, unsigned int node_id, int ncpr, int nhslb, int run)
1627 {
1628  int nwords = NW_SEND_HEADER + NW_SEND_TRAILER +
1629  ncpr * (NW_RAW_HEADER +
1630  (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
1631  + NW_RAW_TRAILER);
1632 
1633  // Send Header
1634  int offset = 0;
1635  buf[ offset + 0 ] = nwords;
1636  buf[ offset + 1 ] = 6;
1637  buf[ offset + 2 ] = (1 << 16) | ncpr;
1638  unsigned int exp_run = run << 8;
1639  buf[ offset + 3 ] = exp_run;
1640  buf[ offset + 5 ] = node_id;
1641  offset += NW_SEND_HEADER;
1642 
1643  for (int k = 0; k < ncpr; k++) {
1644  //
1645  // RawHeader
1646  //
1647  int cpr_nwords = NW_RAW_HEADER +
1648  (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
1649  + NW_RAW_TRAILER;
1650  unsigned int ctime = CTIME_VAL;
1651  unsigned int utime = 0x98765432;
1652 
1653  buf[ offset + 0 ] = cpr_nwords;
1654 #ifdef DUMMY_REDUCED_DATA
1655  buf[ offset + 1 ] = 0x7f7f020c;
1656 #else
1657  buf[ offset + 1 ] = 0x7f7f820c;
1658 #endif
1659  buf[ offset + 2 ] = exp_run;
1660  printf("run_no %d\n", exp_run); fflush(stdout);
1661  buf[ offset + 4 ] = ctime;
1662  buf[ offset + 5 ] = utime;
1663  buf[ offset + 6 ] = node_id + k;
1664  buf[ offset + 7 ] = 0x34567890;
1665  offset += NW_RAW_HEADER;
1666 
1667  for (int i = 0; i < nhslb ; i++) {
1668 #ifdef DUMMY_REDUCED_DATA
1669  buf[ offset + 0 ] = nwords_per_fee + 3;
1670  buf[ offset + 1 ] = 0xffaa0000;
1671  buf[ offset + 2 ] = ctime;
1672 #else
1673  buf[ offset + 0 ] = nwords_per_fee + 7;
1674  buf[ offset + 1 ] = 0xffaa0000;
1675  buf[ offset + 3 ] = ctime;
1676  buf[ offset + 4 ] = utime;
1677  buf[ offset + 5 ] = exp_run;
1678  buf[ offset + 6 ] = ctime;
1679 #endif
1680  offset += NW_B2L_HEADER;
1681 
1682  for (int j = offset; j < offset + nwords_per_fee; j++) {
1683  buf[ j ] = rand();
1684  }
1685  offset += nwords_per_fee;
1686 
1687 #ifdef DUMMY_REDUCED_DATA
1688  buf[ offset ] = 0;
1689  buf[ offset + 1 ] = 0xff550000;
1690 #else
1691  buf[ offset ] = ctime;
1692  buf[ offset + 1 ] = 0;
1693  buf[ offset + 2 ] = 0xff550000;
1694 #endif
1695 
1696  offset += NW_B2L_TRAILER;
1697  }
1698  buf[ offset ] = 0x0; // error bits
1699  buf[ offset + 1 ] = 0x0; // error slots
1700  buf[ offset + 2 ] = 0x0; // XOR checksum
1701  buf[ offset + 3 ] = 0x7fff0006;
1702  offset += NW_RAW_TRAILER;
1703  }
1704 
1705  // Send trailer
1706  buf[ offset ] = 0;
1707  buf[ offset + 1 ] = 0x7fff0000;
1708  offset += NW_SEND_TRAILER;
1709  return offset;
1710 }
1711 
1712 void split_Ecltrg(int sender_id, unsigned int* data, std::vector< int > valid_ch,
1713  unsigned int* data_main, unsigned int* data_splitted,
1714  int& event_nwords_main, int& event_nwords_splitted,
1715  unsigned int splitted_node_id, std::vector< int > splitted_ch)
1716 {
1717  unsigned int event_length = data[ Belle2::RawHeader_latest::POS_NWORDS ];
1718  // pthread_mutex_lock(&(mtx_sender_log));
1719  // printf("[DEBUG] Before splitting : sdrid %d. Exiting...\n",
1720  // sender_id);
1721  // printEventData(data, (event_length & 0xfffff));
1722  // pthread_mutex_unlock(&(mtx_sender_log));
1723  // Check event size
1724 
1725  if (event_length > 0x100000) {
1726  pthread_mutex_lock(&(mtx_sender_log));
1727  printf("[FATAL] Too large event size. : sdrid %d : 0x%.8x : %d words. Exiting...\n", sender_id, data[ EVENT_LEN_POS ],
1728  data[ EVENT_LEN_POS ]);
1729  printEventData(data, (event_length & 0xfffff));
1730  pthread_mutex_unlock(&(mtx_sender_log));
1731  exit(1);
1732  } else if (event_length == 0) {
1733  pthread_mutex_lock(&(mtx_sender_log));
1734  printf("[FATAL] Specified event size is zero. : 0x%.8x : %u words. Exiting...\n",
1735  data[ EVENT_LEN_POS ], event_length);
1736  printEventData(data, 24);
1737  pthread_mutex_unlock(&(mtx_sender_log));
1738  exit(1);
1739  }
1740 
1741  // Check magic word
1742  if ((data[ MAGIC_7F7F_POS ] & 0xFFFF0000) != 0x7F7F0000) {
1743  pthread_mutex_lock(&(mtx_sender_log));
1744  n_messages[ 7 ] = n_messages[ 7 ] + 1 ;
1745  if (n_messages[ 7 ] < max_number_of_messages) {
1746  printf("Bad code 7F7F ( 0x%.8x )\n", data[ MAGIC_7F7F_POS ]) ;
1747  // printLine(data, MAGIC_7F7F_POS);
1748  printEventData(data, event_length);
1749  }
1750  err_bad_7f7f[sender_id]++;
1751  pthread_mutex_unlock(&(mtx_sender_log));
1752 #ifndef NO_ERROR_STOP
1753  exit(1);
1754 #endif
1755  // return 1 ;
1756  }
1757 
1758 
1759  // Copy RawHeader
1760  memcpy(data_main, data, Belle2::RawHeader_latest::POS_CH_POS_TABLE * sizeof(unsigned int));
1761  memcpy(data_splitted, data, Belle2::RawHeader_latest::POS_CH_POS_TABLE * sizeof(unsigned int));
1762  data_splitted[ Belle2::RawHeader_latest::POS_NODE_ID ] = splitted_node_id;
1763 
1764  int cur_pos = 0;
1765  int cur_ch_main = 0;
1766  int prev_ch_main = -1;
1767  int cur_ch_splitted = 0;
1768  int prev_ch_splitted = -1;
1769  int cur_pos_main = Belle2::RawHeader_latest::RAWHEADER_NWORDS;
1770  int cur_pos_splitted = Belle2::RawHeader_latest::RAWHEADER_NWORDS;
1771  int link_cnt = 0;
1772 
1773  int cnt_main = 0;
1774  int cnt_splitted = 0;
1775 
1776  for (int i = 0; i < MAX_PCIE40_CH; i++) {
1777  // Calculate linksize
1778  int linksize = 0;
1779  if (i < 47) {
1780  linksize = data[ POS_TABLE_POS + (i + 1) ] - data[ POS_TABLE_POS + i ];
1781  } else {
1782  linksize = event_length - (data[ POS_TABLE_POS + 47 ] + LEN_ROB_TRAILER);
1783  }
1784  if (linksize <= 0) continue;
1785  cur_pos = data[ POS_TABLE_POS + i ] + OFFSET_HDR;
1786 
1787  // compare valid ch with register value
1788  if (valid_ch[link_cnt] != i) {
1789  pthread_mutex_lock(&(mtx_sender_log));
1790  n_messages[ 11 ] = n_messages[ 11 ] + 1 ;
1791  if (n_messages[ 11 ] < max_number_of_messages) {
1792  printf("[FATAL] A valid ch in data(=%d) is not equal to regeister value(%d) for masking\n", i, valid_ch[link_cnt]) ;
1793  printEventData(data, event_length);
1794  }
1795  err_bad_linknum[sender_id]++;
1796  pthread_mutex_unlock(&(mtx_sender_log));
1797 #ifndef NO_ERROR_STOP
1798  exit(1);
1799 #endif
1800  }
1801 
1802  // Check main ch or splitted ch
1803  int splitted_ch_flag = 0;
1804  for (int j = 0; j < splitted_ch.size(); j++) {
1805  if (splitted_ch[j] == i) {
1806  splitted_ch_flag = 1;
1807  break;
1808  }
1809  }
1810 
1811  // Filling pos-table
1812  if (splitted_ch_flag == 0) {
1813  data_main[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ] = cur_pos_main;
1814  for (int j = prev_ch_main + 1; j < i; j++) {
1815  data_main[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + j ] = cur_pos_main;
1816  }
1817  memcpy(data_main + cur_pos_main, data + cur_pos, linksize * sizeof(unsigned int));
1818  cur_pos_main += linksize;
1819  prev_ch_main = i;
1820  cnt_main++;
1821  } else {
1822  data_splitted[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ] = cur_pos_splitted;
1823  for (int j = prev_ch_splitted + 1; j < i; j++) {
1824  data_splitted[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + j ] = cur_pos_splitted;
1825  }
1826  memcpy(data_splitted + cur_pos_splitted, data + cur_pos, linksize * sizeof(unsigned int));
1827  cur_pos_splitted += linksize;
1828  prev_ch_splitted = i;
1829  cnt_splitted++;
1830  }
1831  link_cnt++;
1832  }
1833 
1834  if (cnt_main == 0 || cnt_splitted == 0) {
1835  pthread_mutex_lock(&(mtx_sender_log));
1836  printf("[FATAL] No channels for ECL(# of used ch = %d) or ECLTRG(# of used ch = %d) data. Exiting...\n",
1837  cnt_main, cnt_splitted);
1838  pthread_mutex_unlock(&(mtx_sender_log));
1839  // exit(1);
1840  }
1841 
1842  // Fill remaining position table
1843  for (int i = prev_ch_main + 1; i < MAX_PCIE40_CH; i++) {
1844  data_main[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ] = cur_pos_main;
1845  }
1846  for (int i = prev_ch_splitted + 1; i < MAX_PCIE40_CH; i++) {
1847  data_splitted[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ] = cur_pos_splitted;
1848  }
1849 
1850  // Calcurate each event-length
1851  unsigned int eve_size_main = cur_pos_main + Belle2::RawTrailer_latest::RAWTRAILER_NWORDS;
1852  unsigned int eve_size_splitted = cur_pos_splitted + Belle2::RawTrailer_latest::RAWTRAILER_NWORDS;
1853  data_main[ Belle2::RawHeader_latest::POS_NWORDS ] = eve_size_main;
1854  data_splitted[ Belle2::RawHeader_latest::POS_NWORDS ] = eve_size_splitted;
1855  event_nwords_main = eve_size_main;
1856  event_nwords_splitted = eve_size_splitted;
1857 
1858  // Copy RawTrailer (Currently 00000000 00000000 00000000 7fff0006. So, just copy the 4 words.)
1859  memcpy(data_main + cur_pos_main, data + event_length - Belle2::RawTrailer_latest::RAWTRAILER_NWORDS,
1860  Belle2::RawTrailer_latest::RAWTRAILER_NWORDS * sizeof(unsigned int));
1861  memcpy(data_splitted + cur_pos_splitted, data + event_length - Belle2::RawTrailer_latest::RAWTRAILER_NWORDS,
1862  Belle2::RawTrailer_latest::RAWTRAILER_NWORDS * sizeof(unsigned int));
1863 
1864  // Copy back to data buffer
1865  memcpy(data, data_main, eve_size_main * sizeof(unsigned int));
1866  memcpy(data + eve_size_main, data_splitted, eve_size_splitted * sizeof(unsigned int));
1867 
1868  // pthread_mutex_lock(&(mtx_sender_log));
1869  // printf("[DEBUG]Splitted data sender %d\n",
1870  // sender_id);
1871  // printEventData(data, eve_size_main + eve_size_splitted);
1872  // printf("[DEBUG]main data sender %d\n",
1873  // sender_id);
1874  // printEventData(data_main, eve_size_main);
1875  // printf("[DEBUG]split data sender %d\n",
1876  // sender_id);
1877  // printEventData(data_splitted, eve_size_splitted);
1878  // pthread_mutex_unlock(&(mtx_sender_log));
1879 
1880  return;
1881 }
1882 
1883 
1884 //int sender_id, int run_no, int nwords_per_fee, int ncpr, int nhslb, std::vector< int > valid_ch)
1885 void* sender(void* arg)
1886 {
1887  //
1888  // Get arguments
1889  //
1890  sender_argv* snd_arg = (sender_argv*)arg;
1891  int sender_id = snd_arg->sender_id;
1892  unsigned int node_id = snd_arg->node_id;
1893  vector<int> valid_ch = snd_arg->valid_ch;
1894 
1895  //
1896  // data
1897  //
1898  int total_words = 0;
1899  unsigned int* buff = new unsigned int[MAX_EVENT_WORDS];
1900 
1901 #ifdef SPLIT_ECL_ECLTRG
1902  vector<int> valid_main_ch;
1903  vector<int> valid_splitted_ch;
1904  unsigned int* buff_main = new unsigned int[MAX_EVENT_WORDS];
1905  unsigned int* buff_splitted = new unsigned int[MAX_EVENT_WORDS];
1906 
1907  int split_main_use = 0; // some unmasked channels for ECL
1908  int split_sub_use = 0; // some unmasked channels for ECLTRG
1909 
1910  // Prepare valid_main table
1911  for (int k = 0; k < valid_ch.size(); k++) {
1912  int splitted_ch_flag = 0;
1913  for (int l = 0; l < splitted_ch.size(); l++) {
1914  if (splitted_ch[l] == valid_ch[k]) {
1915  splitted_ch_flag = 1;
1916  break;
1917  }
1918  }
1919  if (splitted_ch_flag == 0) {
1920  valid_main_ch.push_back(valid_ch[k]);
1921  split_main_use = 1;
1922  } else {
1923  valid_splitted_ch.push_back(valid_ch[k]);
1924  split_sub_use = 1;
1925  }
1926  }
1927 
1928  if (split_main_use == 0 && split_sub_use == 0) {
1929  pthread_mutex_lock(&(mtx_sender_log));
1930  printf("[FATAL] thread %d : No channels are used for this PCIe40 board (ECL/ECLTRG) in %s. Please mask this readout PC with runcontrol GUI (or exclude sub-system if this is the only readout PC of the sub-system). Exiting..\n",
1931  sender_id, hostnamebuf);
1932  fflush(stdout);
1933  pthread_mutex_unlock(&(mtx_sender_log));
1934  exit(1);
1935  }
1936 #endif // SPLIT_ECL_ECLTRG
1937 
1938 
1939  //
1940  // network connection
1941  //
1942  int port_to = 31000 + sender_id + 1;
1943 
1944 #ifndef NOT_SEND
1945  //
1946  // Bind and listen
1947  //
1948  int fd_listen;
1949  struct sockaddr_in sock_listen;
1950  sock_listen.sin_family = AF_INET;
1951  // sock_listen.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
1952  sock_listen.sin_addr.s_addr = htonl(INADDR_ANY);
1953 
1954  socklen_t addrlen = sizeof(sock_listen);
1955  sock_listen.sin_port = htons(port_to);
1956  fd_listen = socket(PF_INET, SOCK_STREAM, 0);
1957 
1958  int flags = 1;
1959  int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)sizeof(flags));
1960  if (ret < 0) {
1961  perror("Failed to set REUSEADDR");
1962  }
1963 
1964  if (bind(fd_listen, (struct sockaddr*)&sock_listen, sizeof(struct sockaddr)) < 0) {
1965  pthread_mutex_lock(&(mtx_sender_log));
1966  printf("[FATAL] thread %d : %s : Failed to bind(%s). Maybe other programs have already occupied this port(%d). Exiting...\n",
1967  sender_id, hostnamebuf, strerror(errno),
1968  port_to); fflush(stdout);
1969  pthread_mutex_unlock(&(mtx_sender_log));
1970 
1971  // Check the process occupying the port 3100?.
1972  FILE* fp;
1973  char buf[256];
1974  char cmdline[500];
1975  sprintf(cmdline, "/usr/sbin/ss -ap | grep %d", port_to);
1976  if ((fp = popen(cmdline, "r")) == NULL) {
1977  pthread_mutex_lock(&(mtx_sender_log));
1978  printf("[WARNING] thread %d : Failed to run %s\n", sender_id,
1979  cmdline);
1980  pthread_mutex_unlock(&(mtx_sender_log));
1981  }
1982 
1983  while (fgets(buf, 256, fp) != NULL) {
1984  pthread_mutex_lock(&(mtx_sender_log));
1985  printf("[DEBUG] thread %d : Port %d is used by : %s\n", sender_id,
1986  port_to, buf); fflush(stdout);
1987  pthread_mutex_unlock(&(mtx_sender_log));
1988  }
1989  fclose(fp);
1990  exit(1);
1991  }
1992 
1993  int val1 = 0;
1994  setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)sizeof(val1));
1995  int backlog = 1;
1996  if (listen(fd_listen, backlog) < 0) {
1997  char err_buf[500];
1998  pthread_mutex_lock(&(mtx_sender_log));
1999  sprintf(err_buf, "[FATAL] thread %d : %s : Failed in listen(%s). Exting...",
2000  sender_id, hostnamebuf,
2001  strerror(errno));
2002  printf("%s\n", err_buf); fflush(stdout);
2003  pthread_mutex_unlock(&(mtx_sender_log));
2004  // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
2005  exit(-1);
2006  }
2007 
2008  //
2009  // Accept
2010  //
2011  int fd_accept;
2012  struct sockaddr_in sock_accept;
2013  pthread_mutex_lock(&(mtx_sender_log));
2014  printf("[DEBUG] thread %d : Accepting... : port %d\n", sender_id,
2015  port_to);
2016  fflush(stdout);
2017  pthread_mutex_unlock(&(mtx_sender_log));
2018 
2019  if ((fd_accept = accept(fd_listen, (struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
2020  char err_buf[500];
2021  pthread_mutex_lock(&(mtx_sender_log));
2022  sprintf(err_buf, "[FATAL] thread %d : %s : Failed to accept(%s). Exiting...",
2023  sender_id, hostnamebuf,
2024  strerror(errno));
2025  printf("%s\n", err_buf); fflush(stdout);
2026  pthread_mutex_unlock(&(mtx_sender_log));
2027  // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
2028  exit(-1);
2029  } else {
2030  // B2INFO("Done.");
2031  pthread_mutex_lock(&(mtx_sender_log));
2032  printf("[INFO] thread %d : Connection(port %d) from eb0 was accepted\n", sender_id, port_to); fflush(stdout);
2033  pthread_mutex_unlock(&(mtx_sender_log));
2034 
2035  // set timepout option
2036  struct timeval timeout;
2037  timeout.tv_sec = 1;
2038  timeout.tv_usec = 0;
2039  ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)sizeof(timeout));
2040  if (ret < 0) {
2041  pthread_mutex_lock(&(mtx_sender_log));
2042  char err_buf[500];
2043  sprintf(err_buf, "[FATAL] thread %d : %s : Failed to set TIMEOUT. Exiting...", sender_id, hostnamebuf);
2044  printf("%s\n", err_buf); fflush(stdout);
2045  pthread_mutex_unlock(&(mtx_sender_log));
2046  exit(-1);
2047  }
2048  }
2049 
2050  if (fd_listen) {
2051  close(fd_listen);
2052  }
2053 #endif
2054 
2055  double init_time = getTimeSec();
2056  double prev_time = init_time;
2057 
2058  unsigned long long int cnt = 0;
2059  unsigned long long int prev_cnt = 0;
2060  unsigned long long int start_cnt = 3000;
2061 
2062  unsigned int exprun = 0;
2063  unsigned int evtnum = 0;
2064 
2065 #ifndef USE_ZMQ
2066  int buffer_id = 0;
2067 #endif
2068  unsigned int tot_event_nwords = 0;
2069  for (
2070 #ifdef MAX_EVENT
2071  int j = 0; j < MAX_EVENT; j++
2072 #else
2073  ;;
2074 #endif
2075  ) {
2076 
2077 #ifdef USE_ZMQ
2078  // Copy data from ZMQ (experimental)
2079  //
2080  {
2081  zmq::message_t zevent;
2082  zmq_reader[sender_id]->recv(&zevent);
2083  memcpy(buff + NW_SEND_HEADER, zevent.data(), zevent.size());
2084  tot_event_nwords = zevent.size() / sizeof(unsigned int);
2085  }
2086 #else
2087  // Copy data from buffer (orignal)
2088  //
2089  if (buffer_id == 0) {
2090  while (1) {
2091  if (buffer_filled[sender_id][0] == 1)break;
2092  usleep(1);
2093  }
2094  {
2095  pthread_mutex_lock(&(mtx1_ch[sender_id]));
2096  memcpy((buff + NW_SEND_HEADER), data_1[sender_id], copy_nwords[sender_id][0] * sizeof(unsigned int));
2097  tot_event_nwords = copy_nwords[sender_id][0];
2098  buffer_filled[sender_id][0] = 0;
2099  pthread_mutex_unlock(&(mtx1_ch[sender_id]));
2100  }
2101  } else {
2102 
2103  while (1) {
2104  if (buffer_filled[sender_id][1] == 1)break;
2105  usleep(1);
2106  }
2107 
2108  {
2109  pthread_mutex_lock(&(mtx2_ch[sender_id]));
2110  memcpy((buff + NW_SEND_HEADER), data_2[sender_id], copy_nwords[sender_id][1] * sizeof(unsigned int));
2111  tot_event_nwords = copy_nwords[sender_id][1];
2112  buffer_filled[sender_id][1] = 0;
2113  pthread_mutex_unlock(&(mtx2_ch[sender_id]));
2114  }
2115  }
2116 #endif
2117 
2118  //
2119  // Check data
2120  //
2121  if (buff == NULL) {
2122  pthread_mutex_lock(&(mtx_sender_log));
2123  printf("[FATAL] thread %d : %s : buffer in sender is NULL(= %p )\n", sender_id, hostnamebuf, buff); fflush(stdout);
2124  pthread_mutex_unlock(&(mtx_sender_log));
2125  exit(1);
2126  }
2127 
2128 #ifdef SPLIT_ECL_ECLTRG
2129  int event_nwords_main = 0, event_nwords_splitted = 0;
2130  if (split_main_use == 1 && split_sub_use == 1) {
2131  split_Ecltrg(sender_id, buff + NW_SEND_HEADER, valid_ch,
2132  buff_main, buff_splitted, event_nwords_main, event_nwords_splitted, ECLTRG_NODE_ID, splitted_ch);
2133  tot_event_nwords = event_nwords_main + event_nwords_splitted;
2134  } else if (split_main_use == 1 && split_sub_use == 0) {
2135  event_nwords_main = tot_event_nwords;
2136  } else if (split_main_use == 0 && split_sub_use == 1) {
2137  event_nwords_splitted = tot_event_nwords;
2138  } else {
2139  pthread_mutex_lock(&(mtx_sender_log));
2140  printf("[FATAL] thread %d : %s : No channels are used for this PCIe40 board (ECL/ECLTRG). Please mask this readout PC with runcontrol GUI (or exclude sub-system if this is the only readout PC of the sub-system). Exiting..\n",
2141  sender_id, hostnamebuf);
2142  fflush(stdout);
2143  pthread_mutex_unlock(&(mtx_sender_log));
2144  exit(1);
2145  }
2146 #endif // SPLIT_ECL_ECLTRG
2147 
2148  unsigned int prev_exprun = exprun;
2149  unsigned int prev_evtnum = evtnum;
2150 
2151 #ifdef SPLIT_ECL_ECLTRG
2152  for (int k = 0; k < NUM_SUB_EVE ; k++) {
2153 #endif // SPLIT_ECL_ECLTRG
2154  unsigned int* eve_buff = NULL;
2155  unsigned int event_nwords = 0;
2156  int ret = 0;
2157 
2158 #ifdef SPLIT_ECL_ECLTRG
2159  if (k == 0) {
2160  if (split_main_use == 0) continue;
2161  event_nwords = event_nwords_main;
2162  eve_buff = buff + NW_SEND_HEADER;
2163  ret = checkEventData(sender_id, eve_buff, event_nwords_main,
2164  exprun, evtnum, node_id, valid_main_ch);
2165  } else if (k == 1) {
2166  if (split_sub_use == 0) continue;
2167  exprun = prev_exprun;
2168  evtnum = prev_evtnum;
2169  event_nwords = event_nwords_splitted;
2170  eve_buff = buff + NW_SEND_HEADER + event_nwords_main;
2171  ret = checkEventData(sender_id, eve_buff, event_nwords_splitted,
2172  exprun, evtnum, ECLTRG_NODE_ID, valid_splitted_ch);
2173  } else {
2174  pthread_mutex_lock(&(mtx_sender_log));
2175  printf("[FATAL] thread %d : # of sub-events must be 1 or 2(for ECL,ECLTRG). k = %d Exiting... : exp %d run %d sub %d : %s %s %d\n",
2176  sender_id, k,
2177  (exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
2178  (exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
2179  (exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
2180  __FILE__, __PRETTY_FUNCTION__, __LINE__);
2181  fflush(stdout);
2182  exit(1);
2183  pthread_mutex_unlock(&(mtx_sender_log));
2184  }
2185 #else
2186  event_nwords = tot_event_nwords;
2187  eve_buff = buff + NW_SEND_HEADER;
2188  ret = checkEventData(sender_id, eve_buff, event_nwords, exprun, evtnum, node_id, valid_ch);
2189 #endif
2190 
2191  if (ret != DATACHECK_OK) {
2192  if (ret == DATACHECK_OK_BUT_ERRFLAG_IN_HDR) {
2193  // err_bad_ffaa[sender_id]++;
2194  unsigned int reduced_event_nwords = 0;
2195  pthread_mutex_lock(&(mtx_sender_log));
2196  printf("[WARNING] thread %d : fake-error events are detected. Header and trailer reduction will be made and data are checked again.\n",
2197  sender_id);
2198  fflush(stdout);
2199  pthread_mutex_unlock(&(mtx_sender_log));
2200  reduceHdrTrl(eve_buff, reduced_event_nwords);
2201  tot_event_nwords -= (event_nwords - reduced_event_nwords);
2202  event_nwords = reduced_event_nwords;
2203 
2204  exprun = prev_exprun;
2205  evtnum = prev_evtnum;
2206 
2207  int ret = 0;
2208 #ifdef SPLIT_ECL_ECLTRG
2209  if (k == 0) {
2210  if (event_nwords_splitted != 0) {
2211  memcpy(buff_splitted, eve_buff + event_nwords_main, event_nwords_splitted * sizeof(unsigned int));
2212  memcpy(eve_buff + reduced_event_nwords, buff_splitted, event_nwords_splitted * sizeof(unsigned int));
2213  }
2214  event_nwords_main = reduced_event_nwords;
2215  ret = checkEventData(sender_id, eve_buff, reduced_event_nwords, exprun, evtnum, node_id, valid_main_ch);
2216  } else {
2217  event_nwords_splitted = reduced_event_nwords;
2218  ret = checkEventData(sender_id, eve_buff, reduced_event_nwords, exprun, evtnum, node_id, valid_splitted_ch);
2219  }
2220 #else
2221  ret = checkEventData(sender_id, eve_buff, reduced_event_nwords, exprun, evtnum, node_id, valid_ch);
2222 #endif //SPLIT_ECL_ECLTRG
2223 
2224  if (ret != DATACHECK_OK) {
2225  pthread_mutex_lock(&(mtx_sender_log));
2226  printf("[FATAL] thread %d : %s : checkEventData() detected an error after reduceHdrTrl(). Exiting...\n", sender_id, hostnamebuf);
2227  fflush(stdout);
2228  pthread_mutex_unlock(&(mtx_sender_log));
2229  exit(1);
2230  }
2231  pthread_mutex_lock(&(mtx_sender_log));
2232  printf("[WARNING] thread %d : %s : Data-check was passed. This event is treated as a normal event.\n", sender_id, hostnamebuf);
2233  // printf("[FATAL] thread %d : Currently, we will not tolerate a fake-error event. Exiting...\n", sender_id);
2234  printEventData(eve_buff, reduced_event_nwords);
2235  fflush(stdout);
2236  pthread_mutex_unlock(&(mtx_sender_log));
2237  // exit(1);
2238  } else {
2239  pthread_mutex_lock(&(mtx_sender_log));
2240  printf("[FATAL] thread %d : %s : checkEventData() detected an error. Exiting...\n", sender_id, hostnamebuf);
2241  fflush(stdout);
2242  pthread_mutex_unlock(&(mtx_sender_log));
2243  exit(1);
2244  }
2245  }
2246 
2247  if (eve_buff[ 1 ] & 0xfffff000 != 0x7f7f0000 ||
2248  eve_buff[ event_nwords - 1 ] != 0x7fff0006) {
2249  pthread_mutex_lock(&(mtx_sender_log));
2250  printf("[FATAL] thread %d : %s : ERROR_EVENT : Invalid Magic word in header( pos=0x%x, %.8x ) and/or trailer( pos=0x%x, 0x%.8x ) : eve %d exp %d run %d sub %d : %s %s %d\n",
2251  sender_id, hostnamebuf,
2252  1, eve_buff[ 1 ],
2253  event_nwords - 1, eve_buff[ event_nwords - 1 ],
2254  evtnum,
2255  (exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
2256  (exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
2257  (exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
2258  __FILE__, __PRETTY_FUNCTION__, __LINE__);
2259  printEventData(eve_buff, event_nwords, sender_id);
2260  fflush(stdout);
2261  pthread_mutex_unlock(&(mtx_sender_log));
2262  exit(1);
2263  }
2264 #ifdef SPLIT_ECL_ECLTRG
2265  }
2266 #endif
2267 
2268  //
2269  // For TOP feature extraction function
2270  //
2271 
2272 
2273 
2274  //
2275  // Filling SendHeader
2276  //
2277  buff[ 0 ] = tot_event_nwords + NW_SEND_HEADER + NW_SEND_TRAILER;
2278  buff[ 1 ] = 6;
2279 #ifdef SPLIT_ECL_ECLTRG
2280  if (split_main_use == 1 && split_sub_use == 1) {
2281  buff[ 2 ] = 0x00010002; // nevent = 1, nboards = 2
2282  } else {
2283  buff[ 2 ] = 0x00010001; // nevent = 1, nboards = 1
2284  }
2285 #else
2286  buff[ 2 ] = 0x00010001; // nevent = 1, nboards = 1
2287 #endif //SPLIT_ECL_ECLTRG
2288  buff[ 3 ] = buff[ NW_SEND_HEADER + 2 ];
2289  buff[ 4 ] = buff[ NW_SEND_HEADER + 3 ];
2290  buff[ 5 ] = buff[ NW_SEND_HEADER + 6 ];
2291  //
2292  // Filling SendTrailer
2293  //
2294  buff[ tot_event_nwords + NW_SEND_HEADER ] = 0x0;
2295  buff[ tot_event_nwords + NW_SEND_HEADER + 1 ] = 0x7fff0007;
2296 
2297 #ifndef NOT_SEND
2298  ret = 0;
2299  int sent_bytes = 0;
2300  // pthread_mutex_lock(&(mtx_sender_log));
2301  // printf("[DEBUG] thread %d : sent words %d + sndhdr %d + sndtrl %d\n", sender_id, tot_event_nwords, NW_SEND_HEADER, NW_SEND_TRAILER );
2302  // printEventData( buff, tot_event_nwords + NW_SEND_HEADER + NW_SEND_TRAILER, sender_id);
2303  // pthread_mutex_unlock(&(mtx_sender_log));
2304 
2305  if ((buff[ NW_SEND_HEADER + 1 ] & 0xfffff000) != 0x7f7f0000 ||
2306  buff[ NW_SEND_HEADER + tot_event_nwords - 1 ] != 0x7fff0006) {
2307  pthread_mutex_lock(&(mtx_sender_log));
2308 
2309  printf("[FATAL] thread %d : %s : ERROR_EVENT : Invalid Magic word in the 1st header( pos=0x%x, 0x%.8x ) and/or the last trailer( pos=0x%x, 0x%.8x ) : eve %d exp %d run %d sub %d : %s %s %d\n",
2310  sender_id, hostnamebuf, NW_SEND_HEADER + 1, buff[ NW_SEND_HEADER + 1 ],
2311  NW_SEND_HEADER + tot_event_nwords - 1, buff[ NW_SEND_HEADER + tot_event_nwords - 1 ],
2312  evtnum,
2313  (exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
2314  (exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
2315  (exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
2316  __FILE__, __PRETTY_FUNCTION__, __LINE__);
2317  printEventData(buff, tot_event_nwords + NW_SEND_HEADER + NW_SEND_TRAILER, sender_id);
2318  fflush(stdout);
2319  pthread_mutex_unlock(&(mtx_sender_log));
2320  exit(1);
2321  }
2322 
2323  while (true) {
2324  if ((ret = write(fd_accept, (char*)buff + sent_bytes, (tot_event_nwords + NW_SEND_HEADER + NW_SEND_TRAILER)
2325  * sizeof(unsigned int) - sent_bytes)) <= 0) {
2326  if (errno == EINTR) {
2327  continue;
2328  } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
2329  continue;
2330  } else {
2331  perror("[DEBuG] write() failed");
2332  pthread_mutex_lock(&(mtx_sender_log));
2333  printf("[FATAL] thread %d : %s : write() failed. Return value of write() = %d\n", sender_id, hostnamebuf, ret);
2334  fflush(stdout);
2335  pthread_mutex_unlock(&(mtx_sender_log));
2336  exit(1);
2337  }
2338  }
2339  sent_bytes += ret;
2340  if (sent_bytes == (int)((tot_event_nwords + NW_SEND_HEADER + NW_SEND_TRAILER)
2341  * sizeof(unsigned int))) {
2342  break;
2343  } else if (sent_bytes > (int)((tot_event_nwords + NW_SEND_HEADER + NW_SEND_TRAILER)
2344  * sizeof(unsigned int))) {
2345  pthread_mutex_lock(&(mtx_sender_log));
2346  printf("[FATAL] thread %d : %s : Too many bytes are sent\n", sender_id, hostnamebuf);
2347  fflush(stdout);
2348  pthread_mutex_unlock(&(mtx_sender_log));
2349  exit(1);
2350  }
2351  }
2352 #endif
2353 #ifndef USE_ZMQ
2354  if (buffer_id == 0) {
2355  buffer_id = 1;
2356  } else {
2357  buffer_id = 0;
2358  }
2359 #endif
2360  cnt++;
2361 
2362  if (cnt == start_cnt) init_time = getTimeSec();
2363  if (cnt % 1000000 == 1) {
2364  if (cnt > start_cnt) {
2365  double cur_time = getTimeSec();
2366  pthread_mutex_lock(&(mtx_sender_log));
2367  printf("[INFO] thread %d : evt %lld time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
2368  sender_id,
2369  cnt, cur_time - init_time,
2370  NUM_CLIENTS_PER_THREAD * (cnt - prev_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - prev_time),
2371  (cnt - prev_cnt) / (cur_time - prev_time) / 1000.,
2372  NUM_CLIENTS_PER_THREAD * (cnt - start_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - init_time),
2373  (cnt - start_cnt) / (cur_time - init_time) / 1000., total_words);
2374 
2375  fflush(stdout);
2376  pthread_mutex_unlock(&(mtx_sender_log));
2377  prev_time = cur_time;
2378  prev_cnt = cnt;
2379  } else {
2380  // printf("Eve %lld\n", cnt);fflush(stdout);
2381  }
2382  }
2383  }
2384 
2385  delete buff;
2386 #ifndef NOT_SEND
2387  close(fd_accept);
2388 #endif
2389  return (void*)0;
2390 }
2391 
2392 int main(int argc, char** argv)
2393 {
2394 
2396  // From main_pcie40_dmahirate.cpp
2398  bool isData = true ;
2399  bool writeInFile = false ;
2400  if (argc != 2) {
2401  pthread_mutex_lock(&(mtx_sender_log));
2402  printf("[FATAL] Invalid usage of %s : %s <node ID>, node ID = 0x0, if you are not using the Belle II DAQ system.\n",
2403  argv[0], argv[0]) ;
2404  pthread_mutex_unlock(&(mtx_sender_log));
2405  return 0 ;
2406  }
2407 
2408 
2409  char* endptr;
2410  unsigned int pcie40_node_id = (unsigned int)strtol(argv[1], &endptr, 0);
2411  // char tmp_arg[20];
2412  // if( argv[1][0] == 'x' || argv[1][0] == 'X' || argv[1][1] == 'x' || argv[1][1] == 'X' ){
2413  // strncpy(tmp_arg, argv[1] + 2, 8);
2414  // pcie40_node_id = (unsigned int)strtol(tmp_arg, &endptr, 0) ;
2415  // }else{
2416  // pcie40_node_id = (unsigned int)strtol(tmp_arg, &endptr, 16) ;
2417  // }
2418 
2419  host_nodeid[ "rsvd1" ] = 0x01000001;
2420  host_nodeid[ "rsvd2" ] = 0x01000002;
2421  host_nodeid[ "rsvd3" ] = 0x01000003;
2422  host_nodeid[ "rsvd4" ] = 0x01000004;
2423  host_nodeid[ "rsvd5" ] = 0x01000005;
2424  host_nodeid[ "rcdc1" ] = 0x02000001;
2425  host_nodeid[ "rcdc2" ] = 0x02000002;
2426  host_nodeid[ "rcdc3" ] = 0x02000003;
2427  host_nodeid[ "rcdc4" ] = 0x02000004;
2428  host_nodeid[ "rcdc5" ] = 0x02000005;
2429  host_nodeid[ "rcdc6" ] = 0x02000006;
2430  host_nodeid[ "rcdc7" ] = 0x02000007;
2431  host_nodeid[ "rcdc8" ] = 0x02000008;
2432  host_nodeid[ "rtop1" ] = 0x03000001;
2433  host_nodeid[ "rtop2" ] = 0x03000002;
2434  host_nodeid[ "rari1" ] = 0x04000001;
2435  host_nodeid[ "rari2" ] = 0x04000002;
2436  host_nodeid[ "recl1" ] = 0x05000001;
2437  host_nodeid[ "recl2" ] = 0x05000002;
2438  host_nodeid[ "recl3" ] = 0x05000003;
2439  host_nodeid[ "rklm1" ] = 0x07000001;
2440  host_nodeid[ "rtrg1" ] = 0x10000001;
2441 
2442 
2443  gethostname(hostnamebuf, sizeof(hostnamebuf));
2444  if (pcie40_node_id != NODE_ID_TEST_BENCH) {
2445  std::map<string, unsigned int>::iterator itr;
2446  itr = host_nodeid.find(hostnamebuf);
2447  if (itr != host_nodeid.end()) {
2448  if (itr->second != pcie40_node_id) {
2449  pthread_mutex_lock(&(mtx_sender_log));
2450  printf("[FATAL] Node_id argument ( 0x%.8x ) is invalid. Node_id for %s is 0x%.8x. Exiting...\n",
2451  pcie40_node_id, (itr->first).c_str(), itr->second);
2452  pthread_mutex_unlock(&(mtx_sender_log));
2453  exit(1);
2454  } else {
2455  pthread_mutex_lock(&(mtx_sender_log));
2456  printf("[DEBUG] (hostname %s, nodeid 0x%.8x ) concides with stored info.( %s 0x%.8x )\n", hostnamebuf, pcie40_node_id,
2457  (itr->first).c_str(), itr->second); fflush(stdout);
2458  pthread_mutex_unlock(&(mtx_sender_log));
2459  }
2460  } else {
2461  pthread_mutex_lock(&(mtx_sender_log));
2462  printf("[FATAL] This sever's hostname is not for a PCIe40 ROPC( %s ). Please use 0x%.8x for a test. Exiting...\n", hostnamebuf,
2463  NODE_ID_TEST_BENCH);
2464  pthread_mutex_unlock(&(mtx_sender_log));
2465  exit(1);
2466  }
2467 
2468  }
2469  fflush(stdout);
2470 
2471 #ifdef USE_ZMQ
2473  // ZMQ initialize
2475  zmq::context_t ctx(0);
2476  const pid_t pid = getpid();
2477  for (int i = 0; i < NUM_SENDER_THREADS; i++) {
2478  zmq_writer[i] = new zmq::socket_t(ctx, ZMQ_PAIR);
2479  zmq_reader[i] = new zmq::socket_t(ctx, ZMQ_PAIR);
2480  char zpath[256];
2481  snprintf(zpath, sizeof(zpath), "inproc:///dev/shm/des_ser_PCIe40_main.%d.%d.ipc", pid, i);
2482  zmq_writer[i]->bind(zpath);
2483  zmq_reader[i]->connect(zpath);
2484  }
2485 #else
2487  // buffer for inter-threads communication
2489  for (int i = 0; i < NUM_SENDER_THREADS; i++) {
2490  data_1[i] = new unsigned int[MAX_EVENT_WORDS];
2491  data_2[i] = new unsigned int[MAX_EVENT_WORDS];
2492  }
2493 #endif
2494 
2496  // Initialize variables
2498  double init_time = getTimeSec();
2499  double prev_time = init_time;
2500  unsigned long long int cnt = 0;
2501  unsigned long long int prev_cnt = 0;
2502  unsigned long long int start_cnt = 300000;
2503 #ifndef USE_ZMQ
2504  int buffer_id[NUM_SENDER_THREADS];
2505 #endif
2506  int total_words = 0;
2507 #ifndef USE_ZMQ
2508  for (int i = 0; i < NUM_SENDER_THREADS; i++) {
2509  buffer_id[i] = 0;
2510  buffer_filled[i][0] = 0;
2511  buffer_filled[i][1] = 0;
2512  copy_nwords[i][0] = 0;
2513  copy_nwords[i][1] = 0;
2514  }
2515 #endif
2516 
2517 
2519  // Initialize PCIe40
2521  printf("[DEBUG] Initializing PCIe40 readout...\n"); fflush(stdout);
2522 
2523  std::ofstream the_file ;
2524  if (writeInFile) the_file.open("data_file.txt") ;
2525  double triggerRate = 400 ; // kHz
2526  double data_size = 0. ;
2527  int size = 0x1F ;
2528  int res = ecs_open(0, 0) ;
2529 
2530  if (-1 == res) {
2531  pthread_mutex_lock(&(mtx_sender_log));
2532  printf("ERROR: Could not open device (BAR 0)\n") ;
2533  pthread_mutex_unlock(&(mtx_sender_log));
2534  } else {
2535  pthread_mutex_lock(&(mtx_sender_log));
2536  printf("SUCCESS: Device opened for ECS 0\n");
2537  pthread_mutex_unlock(&(mtx_sender_log));
2538  }
2539 
2540  res = ecs_open(0, 2) ;
2541  if (-1 == res) {
2542  pthread_mutex_lock(&(mtx_sender_log));
2543  printf("ERROR: Could not open device (BAR 2)\n") ;
2544  pthread_mutex_unlock(&(mtx_sender_log));
2545  } else {
2546  pthread_mutex_lock(&(mtx_sender_log));
2547  printf("SUCCESS: Device opened for ECS 2\n");
2548  pthread_mutex_unlock(&(mtx_sender_log));
2549  }
2550  // DMA part
2551  res = dma_open(0) ;
2552  if (-1 == res) {
2553  pthread_mutex_lock(&(mtx_sender_log));
2554  printf("ERROR: Could not open device (DMA)\n") ;
2555  pthread_mutex_unlock(&(mtx_sender_log));
2556  } else {
2557  pthread_mutex_lock(&(mtx_sender_log));
2558  printf("SUCCESS: Device opened for DMA\n");
2559  pthread_mutex_unlock(&(mtx_sender_log));
2560  }
2561 
2562 
2563  // Read the active links
2564  unsigned int masks0 = ecs_read(0, 2, 0x50520) ;
2565  unsigned int masks1 = ecs_read(0, 2, 0x50540) ;
2566  std::vector< int > valid_ch ;
2567  valid_ch.clear();
2568  for (int i = 0 ; i < 24 ; i++) {
2569  if ((masks0 & (1 << i)) != 0) valid_ch.push_back(i) ;
2570  }
2571  for (int i = 24 ; i < MAX_PCIE40_CH ; i++) {
2572  if ((masks1 & (1 << (i - 24))) != 0) valid_ch.push_back(i) ;
2573  }
2574 
2575 
2576  // printf("[DEBUG] mask stauts\n");
2577  // printf("[DEBUG] mask register : %.8x %.8x\n", masks0, masks1);
2578  // int temp_valid_pos = 0;
2579  // for (int i = 0 ; i < MAX_PCIE40_CH ; i++) {
2580  // if (valid_ch[temp_valid_pos] == i) {
2581  // printf("[DEBUG] ch %d 1\n", i);
2582  // temp_valid_pos++;
2583  // } else {
2584  // printf("[DEBUG] ch %d 0\n", i);
2585  // }
2586  // }
2587  int num_of_chs = valid_ch.size() ;
2588  pthread_mutex_lock(&(mtx_sender_log));
2589  printf("[DEBUG] # of used channels = %d\n", num_of_chs); fflush(stdout);
2590  pthread_mutex_unlock(&(mtx_sender_log));
2591  if (num_of_chs <= 0) {
2592  pthread_mutex_lock(&(mtx_sender_log));
2593  printf("[FATAL] %s : No channels are used for this PCIe40 board. Please mask this readout PC with runcontrol GUI (or exclude sub-system if this is the only readout PC of the sub-system). Exiting..\n",
2594  hostnamebuf);
2595  fflush(stdout);
2596  pthread_mutex_unlock(&(mtx_sender_log));
2597  exit(1);
2598  }
2599 
2600  // initialize sum of error counters;
2601  for (int i = 0; i < NUM_SENDER_THREADS; i++) {
2602  for (int j = 0; j < MAX_PCIE40_CH; j++) {
2603  crc_err_ch[i][j] = 0;
2604  }
2605  }
2606 
2607  //
2608  // ulreset to clear FIFOs and rest state machines in userlogic
2609  //
2610  ecs_write(0, 2, 0x00050400, 0x0);
2611  ecs_write(0, 2, 0x00050400, 0x4);
2612  ecs_write(0, 2, 0x00050400, 0x0);
2613 
2614  unsigned int initial_value = pcie40_getNbWordInEvent(0) ;
2615  if (initial_value == 0) pcie40_setNbWordInEvent(0, 0xFF) ;
2616  pcie40_dmaReset(0) ;
2617  pcie40_dmaStop(0) ;
2618  if (! isData) {
2619  pcie40_enableGenerator(0) ;
2620  pcie40_useDataFromGenerator(0) ;
2621  } else {
2622  pcie40_disableGenerator(0) ;
2623  pcie40_useDataFromFibers(0) ;
2624  }
2625  //pcie40_useFreeSignal( 0 ) ;
2626  int t_rate = 10416.666 / ((double) triggerRate) - 1 ;
2627  pcie40_setGeneratorTriggerFrequency(0, t_rate) ;
2628  pcie40_setNbEvents(0, 0) ;
2629  // start DAQ
2630  pcie40_resetSPages(0) ;
2631  pcie40_dmaSetReadout(0) ;
2632  pcie40_setSizeFromHeader(0) ;
2633  if (! isData)
2634  pcie40_setNbWordInEvent(0, size) ;
2635  pcie40_setBusyLevel(0, 0x502) ;
2636  dma_initialize(0) ;
2637 
2638  pthread_mutex_lock(&(mtx_sender_log));
2639  printf("[DEBUG] PCIe40 readout was initialized.\n"); fflush(stdout);
2640  pthread_mutex_unlock(&(mtx_sender_log));
2641 
2643  // Make sender threads
2645  int run_no;
2646  pthread_t sender_thr[NUM_SENDER_THREADS];
2647  // std::thread sender_thr[NUM_SENDER_THREADS];
2648  sender_argv snd_argv[NUM_SENDER_THREADS];
2649 
2650  for (int i = 0; i < NUM_SENDER_THREADS; i++) {
2651  snd_argv[i].sender_id = i;
2652  snd_argv[i].valid_ch = valid_ch;
2653  snd_argv[i].node_id = pcie40_node_id;
2654  int ret = pthread_create(&(sender_thr[i]), NULL, sender, &(snd_argv[i]));
2655  if (ret != 0) {
2656  pthread_mutex_lock(&(mtx_sender_log));
2657  printf("[FATAL] Failed to create a thread. ret = %d. Exting...\n", ret);
2658  fflush(stdout);
2659  pthread_mutex_unlock(&(mtx_sender_log));
2660  exit(1);
2661  }
2662  }
2663 
2664 #ifndef CRC_CHECK
2665  pthread_mutex_lock(&(mtx_sender_log));
2666  // printf("[WARNING] CRC check by software is disabled now !! Relying on check in PCIe40 firmware\n"); fflush(stdout);
2667  printf("[FATAL] CRC check by software is disabled now !! Relying on check in PCIe40 firmware\n"); fflush(stdout);
2668  pthread_mutex_unlock(&(mtx_sender_log));
2669  exit(1);
2670 #endif
2671 
2673  // Initialize readout variables
2675  int rv ;
2676  unsigned int* data ;
2677  unsigned int* combined_data = NULL;
2678  unsigned int* buf_combined = new unsigned int[ MAX_EVENT_WORDS ];
2679  int new_buf_combined = 0;
2680  long long int get_sp_cnt = 0 ;
2681  // int get_sp_cnt = 0x7fff0000 ;
2682  int k = 0 ;
2683 
2684  unsigned int evtnum = 0;
2685  unsigned int exprun = 0;
2686  unsigned int prev_exprun = 0;
2687  int errors = 0 ;
2688  unsigned int esize = 0 ;
2689  int total_pages = 0 ;
2690  int index_pages = 0 ;
2691  int previous_index = 0 ;
2692  unsigned int frag_size = 0 ;
2693  // auto t1 = std::chrono::high_resolution_clock::now();
2694  double m_start_time = getTimeSec();
2695  double m_prev_time = 0.;
2696  unsigned int event_words = 0;
2697  double total_size_bytes = 0.;
2698  double prev_total_size_bytes = 0.;
2699  double total_eve_cnt = 0.;
2700  double prev_total_eve_cnt = 0.;
2701  int first_flag = 0;
2702  int first_event_flag = 0;
2703  unsigned int evecnt = 0;
2704  unsigned int prev_evecnt = 0;
2705  int client_id = 0;
2706  int dma_hdr_offset = 0;
2707 
2709  // Main loop
2711  pthread_mutex_lock(&(mtx_sender_log));
2712  printf("[INFO] des_ser_PCIe40_main: Reading the 1st event from a PCIe40 board...\n"); fflush(stdout);
2713  pthread_mutex_unlock(&(mtx_sender_log));
2714  for (;;) {
2716  // Main loop
2718  while (true) {
2719  // usleep(100000);
2720  // start DMA and wait for one or more super pages of data
2721  rv = pcie40_dmaStart(0) ;
2722  //printf( "Number of super page received: %d\n" , rv ) ;
2723  // #pragma omp parallel for
2724  for (int j = 0 ; j < rv * S_PAGE_SLOT_NMB ; ++j) {
2725  event_words = 0;
2726  data = pcie40_getSuperPageCopy(0, (get_sp_cnt / S_PAGE_SLOT_NMB) % S_PAGES, get_sp_cnt % S_PAGE_SLOT_NMB) ;
2727  if (! isData) {
2728  checkEventGenerator(data, get_sp_cnt, size);
2729  } else {
2730  // Check DMA header and trailer
2731  int ret = checkDMAHeader(data, frag_size, data_size, total_pages, index_pages) ;
2732 
2733  if (first_event_flag == 0) {
2734  pthread_mutex_lock(&(mtx_sender_log));
2735  printf("[INFO] des_ser_PCIe40_main: Done. the size of the 1st packet is %d bytes.\n", (int)data_size); fflush(stdout);
2736  pthread_mutex_unlock(&(mtx_sender_log));
2737  first_event_flag = 1;
2738  }
2739 
2740  if (first_flag == 0 && index_pages != 0 && ret < 1) {
2741  pthread_mutex_lock(&(mtx_sender_log));
2742  printf("Invalid index error : tot %d index %d ret %d\n", total_pages, index_pages, ret);
2743  pthread_mutex_unlock(&(mtx_sender_log));
2744  ret = 1;
2745  }
2746  first_flag = 1;
2747 
2748  if (ret == 0) { // No error in checkDMAHeader()
2749  if (total_pages > 1 && total_pages <= 0xffff) { // Multiple DMA packets for an event
2750  //
2751  // Prepare buffer for combined data
2752  //
2753  if (index_pages == 0) {
2754  esize = frag_size ;
2755  if (total_pages * S_PAGE_SLOT_SIZE / 4 > MAX_EVENT_WORDS) {
2756  new_buf_combined = 1;
2757  combined_data = new unsigned int[ total_pages * S_PAGE_SLOT_SIZE / 4 ] ;
2758  } else {
2759  new_buf_combined = 0;
2760  combined_data = buf_combined;
2761  }
2762  } else {
2763  esize += frag_size ;
2764  }
2765 
2766  if (combined_data == NULL) {
2767  pthread_mutex_lock(&(mtx_sender_log));
2768  printf("[FATAL] Data buffer is not yet allocated. %p\n", combined_data);
2769  pthread_mutex_unlock(&(mtx_sender_log));
2770  fflush(stdout);
2771  exit(1);
2772  }
2773  // Store a DMA packet in buffer for combined data
2774  memcpy(&combined_data[ previous_index ], data + DMA_HDR_WORDS, 8 * (frag_size - 2) * 4) ;
2775  delete [] data ;
2776  data = NULL;
2777  previous_index = previous_index + 8 * (frag_size - 2) ;
2778 
2779  // Get more DMA packets to complete an event
2780  if (index_pages != (total_pages - 1)) {
2781  get_sp_cnt++;
2782  if ((get_sp_cnt > 0) && ((get_sp_cnt % S_PAGE_SLOT_NMB) == 0)) pcie40_freeSuperPage(0, 1) ;
2783  continue ; //
2784  }
2785  // End of an event
2786  dma_hdr_offset = 0;
2787 
2788  } else if (total_pages == 1) { // One DMA packet for an event
2789  // End of an event
2790  esize = frag_size ;
2791  combined_data = data;
2792  new_buf_combined = 2; // Delete data[] later
2793  dma_hdr_offset = DMA_HDR_WORDS;
2794  } else {
2795  pthread_mutex_lock(&(mtx_sender_log));
2796  printf("Invalid total pages %d\n", total_pages);
2797  pthread_mutex_unlock(&(mtx_sender_log));
2798  exit(1);
2799  }
2800  } else {
2801  exit(1);
2802  if (exit_on_error) exit(0) ;
2803  errors++ ;
2804  }
2805 
2806  //
2807  // End of an event
2808  //
2809  if (ret < 1) {
2810  if (combined_data != NULL) {
2811  // if (k < 10)printFullData(combined_data + dma_hdr_offset);
2812  event_words = combined_data[ dma_hdr_offset + EVENT_LEN_POS ];
2813  if (event_words >= 0 && event_words < 32000) {
2814  total_size_bytes += ((double)event_words) * 4.;
2815  total_eve_cnt++;
2816  } else {
2817  pthread_mutex_lock(&(mtx_sender_log));
2818  printf("Strange event size %.8x ret %d\n", event_words, ret);
2819  printFullData(combined_data + dma_hdr_offset);
2820  pthread_mutex_unlock(&(mtx_sender_log));
2821  }
2822  }
2823  evecnt++;
2824  }
2825  }
2826  previous_index = 0 ;
2827 
2828  //
2829  // Send data to senders
2830  //
2831  if (event_words > 0 && event_words < MAX_EVENT_WORDS) {
2832 
2833  //
2834  // Check event # incrementation
2835  //
2836  unsigned int* temp_data = combined_data + dma_hdr_offset;
2837  if (evtnum + 1 != temp_data[EVENUM_POS]) {
2838  if (exprun == temp_data[RUNNO_POS]
2839  && exprun != 0) { // After a run-change or if this is the 1st event, event incrementation is not checked.
2840  printEventNumberError(temp_data, evtnum, exprun, 1, -1);
2841 #ifndef NO_ERROR_STOP
2842  exit(1);
2843 #endif
2844  }
2845  }
2846 
2847  if (exprun != prev_exprun || exprun == 0) {
2848  m_start_time = getTimeSec();
2849  m_prev_time = m_start_time;
2850  evecnt = 1;
2851  prev_evecnt = 1;
2852  total_eve_cnt = 1;
2853  prev_total_eve_cnt = 0;
2854  total_size_bytes = 0.;
2855  prev_total_size_bytes = 0.;
2856  }
2857  evtnum = temp_data[EVENUM_POS];
2858  prev_exprun = exprun;
2859  exprun = temp_data[RUNNO_POS];
2860 
2861  //
2862  // Copy data to buffer
2863  //
2864  client_id = client_id % NUM_SENDER_THREADS;
2865 #ifdef USE_ZMQ
2866  // by ZMQ (experimental)
2867  zmq_writer[client_id]->send(combined_data + dma_hdr_offset, event_words * sizeof(unsigned int));
2868 #else
2869  // by double buffer (original)
2870  if (buffer_id[client_id] == 0) {
2871  while (1) {
2872  if (buffer_filled[client_id][0] == 0)break;
2873  usleep(1);
2874  }
2875 
2876  {
2877  pthread_mutex_lock(&(mtx1_ch[client_id]));
2878  memcpy(data_1[client_id], combined_data + dma_hdr_offset, event_words * sizeof(unsigned int));
2879  copy_nwords[client_id][0] = event_words;
2880  buffer_filled[client_id][0] = 1;
2881  pthread_mutex_unlock(&(mtx1_ch[client_id]));
2882  }
2883  } else {
2884  while (1) {
2885  if (buffer_filled[client_id][1] == 0)break;
2886  usleep(1);
2887  }
2888  {
2889  pthread_mutex_lock(&(mtx2_ch[client_id]));
2890  fflush(stdout);
2891  memcpy(data_2[client_id], combined_data + dma_hdr_offset, event_words * sizeof(unsigned int));
2892  copy_nwords[client_id][1] = event_words;
2893  buffer_filled[client_id][1] = 1;
2894  pthread_mutex_unlock(&(mtx2_ch[client_id]));
2895  }
2896  }
2897 
2898  if (buffer_id[client_id] == 0) {
2899  buffer_id[client_id] = 1;
2900  } else {
2901  buffer_id[client_id] = 0;
2902  }
2903 #endif
2904  client_id++;
2905  } else {
2906  pthread_mutex_lock(&(mtx_sender_log));
2907  printf("[FATAL] Invalid event-size %d\n", event_words);
2908  fflush(stdout);
2909  pthread_mutex_unlock(&(mtx_sender_log));
2910  exit(1);
2911  }
2912 
2913  //
2914  // Error-count monitor
2915  //
2916  previous_index = 0 ;
2917  if (new_buf_combined == 1) {
2918  delete [] combined_data ;
2919  combined_data = NULL;
2920  } else if (new_buf_combined == 2) {
2921  delete [] data ;
2922  }
2923  first_flag = 0;
2924  // if ( i != getEventNumber( data ) ) printf( "Mismatch event number %d %d\n" , i , getEventNumber( data ) ) ;
2925  get_sp_cnt++;
2926  ++k ;
2927  if ((evecnt % 100000) == 0 ||
2928  ((evecnt % 10000) == 0 && 0 < evecnt && evecnt < 100000) ||
2929  evecnt == 1
2930  ) {
2931  unsigned int sum_total_crc_good = 0;
2932  unsigned int sum_total_crc_errors = 0;
2933  unsigned int sum_err_flag_cnt = 0;
2934  unsigned int sum_cur_evtnum = 0;
2935  unsigned int sum_err_not_reduced = 0;
2936  unsigned int sum_err_bad_7f7f = 0;
2937  unsigned int sum_err_bad_runnum = 0;
2938  unsigned int sum_err_bad_linknum = 0;
2939  unsigned int sum_err_bad_evenum = 0;
2940  unsigned int sum_err_bad_ffaa = 0;
2941  unsigned int sum_err_bad_ff55 = 0;
2942  unsigned int sum_err_bad_linksize = 0;
2943  unsigned int sum_err_link_eve_jump = 0;
2944  unsigned int sum_crc_err_ch[ MAX_PCIE40_CH] = {0};
2945 
2946  if (evecnt != 1) {
2947  for (int l = 0; l < NUM_SENDER_THREADS; l++) {
2948  sum_total_crc_good += total_crc_good[l];
2949  sum_total_crc_errors += total_crc_errors[l];
2950  sum_err_flag_cnt += err_flag_cnt[l];
2951  sum_cur_evtnum += cur_evtnum[l];
2952  sum_err_not_reduced += err_not_reduced[l];
2953  sum_err_bad_7f7f += err_bad_7f7f[l];
2954  sum_err_bad_runnum += err_bad_runnum[l];
2955  sum_err_bad_linknum += err_bad_linknum[l];
2956  sum_err_bad_evenum += err_bad_evenum[l];
2957  sum_err_bad_ffaa += err_bad_ffaa[l];
2958  sum_err_bad_ff55 += err_bad_ff55[l];
2959  sum_err_bad_linksize += err_bad_linksize[l];
2960  sum_err_link_eve_jump += err_link_eve_jump[l];
2961 
2962  // if (cur_exprun[0] != cur_exprun[l]) {
2963  // pthread_mutex_lock(&(mtx_sender_log));
2964  // printf("[FATAL] exprun mismatch thr 0 = 0x%.8x , thr %d = 0x%.8x", cur_exprun[0], l, cur_exprun[l]);
2965  // pthread_mutex_unlock(&(mtx_sender_log));
2966  // exit(1);
2967  // }
2968 
2969  for (int m = 0; m < MAX_PCIE40_CH; m++) {
2970  sum_crc_err_ch[m] += crc_err_ch[l][m];
2971  }
2972  }
2973  }
2974 
2975  double cur_time = getTimeSec();
2976  double total_time = cur_time - m_start_time;
2977  double interval = cur_time - m_prev_time;
2978  m_prev_time = cur_time;
2979  time_t timer;
2980  struct tm* t_st;
2981  time(&timer);
2982  t_st = localtime(&timer);
2983  pthread_mutex_lock(&(mtx_sender_log));
2984 
2985  printf("[DEBUG] Event %12d Rate %6.2lf[kHz] Recvd %6.2lf[MB/s] RunTime %8.2lf[s] interval %8.4lf[s] evenum %12d exprun 0x%.8x eve_size %6.2lf[kB] numch %d nonred %u crcok %u crcng %u evejump %d bad_7f7f %d bad_runnum %d bad_linknum %d bad_evenum %d bad_ffaa %d bad_ff55 %d bad_linksize %d no_data %d bad_header %d bad_size %d bad_size_dmatrl %d bad_dmatrl %d bad_word_size %d %s",
2986  evecnt, (evecnt - prev_evecnt) / interval / 1.e3,
2987  (total_size_bytes - prev_total_size_bytes) / interval / 1.e6,
2988  total_time, interval,
2989  evtnum, exprun,
2990  (total_size_bytes - prev_total_size_bytes) / (total_eve_cnt - prev_total_eve_cnt) / 1.e3,
2991  num_of_chs,
2992  sum_err_not_reduced, sum_total_crc_good, sum_total_crc_errors, sum_err_link_eve_jump,
2993  sum_err_bad_7f7f,
2994  sum_err_bad_runnum, sum_err_bad_linknum, sum_err_bad_evenum, sum_err_bad_ffaa, sum_err_bad_ff55, sum_err_bad_linksize,
2995  dmaerr_no_data, dmaerr_bad_header, dmaerr_bad_size, dmaerr_bad_size_dmatrl, dmaerr_bad_dmatrl, dmaerr_bad_word_size,
2996  asctime(t_st));
2997  fflush(stdout);
2998  pthread_mutex_unlock(&(mtx_sender_log));
2999  prev_total_size_bytes = total_size_bytes;
3000  prev_evecnt = evecnt;
3001  prev_total_eve_cnt = total_eve_cnt;
3002  }
3003 
3004  if ((k % 100) == 0)
3005  if (writeInFile) writeToFile(the_file, data, esize) ;
3006  if ((get_sp_cnt > 0) && ((get_sp_cnt % S_PAGE_SLOT_NMB) == 0)) pcie40_freeSuperPage(0, 1) ;
3007  }
3008  }
3009 
3010  //
3011  // Rate Monitor
3012  //
3013  cnt++;
3014  if (cnt == start_cnt) init_time = getTimeSec();
3015  if (cnt % 10000 == 1) {
3016  if (cnt > start_cnt) {
3017  double cur_time = getTimeSec();
3018  pthread_mutex_lock(&(mtx_sender_log));
3019  printf("run %d evt %lld time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
3020  run_no,
3021  cnt,
3022  cur_time - init_time,
3023  NUM_SENDER_THREADS * (cnt - prev_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - prev_time),
3024  (cnt - prev_cnt) / (cur_time - prev_time) / 1000.,
3025  NUM_SENDER_THREADS * (cnt - start_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - init_time),
3026  (cnt - start_cnt) / (cur_time - init_time) / 1000., total_words);
3027  fflush(stdout);
3028  pthread_mutex_unlock(&(mtx_sender_log));
3029  prev_time = cur_time;
3030  prev_cnt = cnt;
3031  } else {
3032  // printf("Eve %lld\n", cnt);fflush(stdout);
3033  }
3034  }
3035  }
3036 
3037  //
3038  // Close PCIe40
3039  //
3040  ecs_close(0, 0) ;
3041  ecs_close(0, 2) ;
3042  dma_close(0) ;
3043  if (writeInFile) the_file.close() ;
3044 
3045  //
3046  // Close threads and delete buffers
3047  //
3048  for (int i = 0; i < NUM_SENDER_THREADS; i++) {
3049  pthread_join(sender_thr[i], NULL);
3050 #ifndef USE_ZMQ
3051  pthread_mutex_destroy(&(mtx1_ch[i]));
3052  pthread_mutex_destroy(&(mtx2_ch[i]));
3053  delete data_1[i];
3054  delete data_2[i];
3055 #endif
3056  }
3057  pthread_mutex_destroy(&mtx_sender_log);
3058  return 0;
3059 }
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:91