Belle II Software development
des_ser_PCIe40_main.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include "des_ser_PCIe40_main.h"
9using namespace std;
10
// File-scope state shared by the functions in this file.
// Several arrays below are sized by NUM_SENDER_THREADS and indexed by sender_id in this file.
11#ifdef SPLIT_ECL_ECLTRG
12const std::vector<int> splitted_ch {16}; // recl3: cpr6001-6008,cpr13001 (0-15,16ch)
13#endif
14
// The buffers and mutexes in this group exist only when USE_ZMQ is not defined.
// NOTE(review): data_1/data_2 and mtx1_ch/mtx2_ch look like two buffers per sender thread,
// each with its own mutex — confirm against the code that uses them.
15#ifndef USE_ZMQ
16unsigned int* data_1[NUM_SENDER_THREADS];
17unsigned int* data_2[NUM_SENDER_THREADS];
18
19//pthread_t sender_thr[NUM_CLIENTS];
20pthread_mutex_t mtx1_ch[NUM_SENDER_THREADS];
21pthread_mutex_t mtx2_ch[NUM_SENDER_THREADS];
22#endif
// Locked in this file around blocks that print diagnostics and update the shared
// message/error counters (n_messages, dmaerr_*, err_bad_evenum).
23pthread_mutex_t mtx_sender_log;
24
25#ifndef USE_ZMQ
26int buffer_filled[NUM_SENDER_THREADS][2];
27int copy_nwords[NUM_SENDER_THREADS][2];
28#endif
29
30#ifdef USE_ZMQ
32// Handshake by ZMQ
34zmq::socket_t* zmq_writer[NUM_SENDER_THREADS];
35zmq::socket_t* zmq_reader[NUM_SENDER_THREADS];
36#endif
37
39// From main_pcie40_dmahirate.cpp
41bool exit_on_error = false ;
42int nTot = 100000 ;
43//int max_number_of_messages = 0x10000000;
// A message of a given category is printed only while that category's n_messages
// counter is still below this limit.
44unsigned int max_number_of_messages = 10;
45
46
48// Error counter in checkDMAHeader()
50unsigned int dmaerr_no_data = 0;
51unsigned int dmaerr_bad_size = 0;
52unsigned int dmaerr_bad_size_dmatrl = 0;
53unsigned int dmaerr_bad_word_size = 0;
54unsigned int dmaerr_bad_header = 0;
55unsigned int dmaerr_bad_dmatrl = 0;
57// Error counter in checkEventData()
59
60unsigned int total_crc_good[NUM_SENDER_THREADS] = {0};
61int total_crc_errors[NUM_SENDER_THREADS] = {0};
62unsigned int err_flag_cnt[NUM_SENDER_THREADS] = {0};
63unsigned int cur_evtnum[NUM_SENDER_THREADS] = {0};
64
65unsigned int err_not_reduced[NUM_SENDER_THREADS] = {0};
66unsigned int err_bad_7f7f[NUM_SENDER_THREADS] = {0};
67unsigned int err_bad_runnum[NUM_SENDER_THREADS] = {0};
68unsigned int err_bad_linknum[NUM_SENDER_THREADS] = {0};
// Incremented by printEventNumberError() for the reporting thread.
69unsigned int err_bad_evenum[NUM_SENDER_THREADS] = {0};
70unsigned int err_bad_ffaa[NUM_SENDER_THREADS] = {0};
71unsigned int err_bad_ff55[NUM_SENDER_THREADS] = {0};
72unsigned int err_bad_linksize[NUM_SENDER_THREADS] = {0};
73unsigned int err_link_eve_jump[NUM_SENDER_THREADS] = {0};
74unsigned int crc_err_ch[NUM_SENDER_THREADS][ MAX_PCIE40_CH];
75
77// hostname
79std::map< string, unsigned int > host_nodeid;
// Host name string inserted into the error messages printed by this file.
80char hostnamebuf[50];
81
82
// Number of messages seen per error category; the index meanings are listed in the
// commented-out map that follows.
83unsigned int n_messages[17] = {0};
84// std::map< int , int > n_messages = {
85// { 0 , 0 } , // no data
86// { 1 , 0 } , // bad header
87// { 2 , 0 } , // bad size
88// { 3 , 0 } , // Bad word size
89// { 4 , 0 } , // Bad belle2 header
90// { 5 , 0 } , // bad trailer size
91// { 6 , 0 } , // bad trailer
92// { 7 , 0 } , // bad 7ff code
93// { 8 , 0 } , // bad version
94// { 9 , 0 } , // bad run number
95// { 10 , 0 } , // bad event number
96// { 11 , 0 } , // bad link number
97// { 12 , 0 } , // bad FFAA
98// { 13 , 0 } , // bad link size
99// { 14 , 0 } , // bad data size
100// { 15 , 0 } , // Bad CRC
101// { 16 , 0 } // missing links
102// };
103
// 256-entry lookup table used by crc_calc() to advance a 16-bit CRC by one byte.
// The table is indexed by (high byte of the running CRC) XOR (next data byte).
// NOTE(review): entry 1 is 0x1021, which matches the CCITT polynomial that the
// "XMODEM" name suggests — confirm against the firmware's CRC definition.
104const int CRC16_XMODEM_TABLE[] = {
105 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
106 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
107 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
108 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
109 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
110 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
111 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
112 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
113 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
114 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
115 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
116 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
117 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
118 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
119 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
120 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
121 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
122 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
123 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
124 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
125 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
126 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
127 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
128 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
129 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
130 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
131 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
132 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
133 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
134 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
135 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
136 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
137} ;
138
139void crc_calc(unsigned int& crc, const unsigned int& data)
140{
141 int byte1, byte2, byte3, byte4 ;
142 byte1 = data & 0xFF;
143 byte2 = (data & 0xFF00) >> 8;
144 byte3 = (data & 0xFF0000) >> 16;
145 byte4 = (data & 0xFF000000) >> 24;
146 crc = (((crc) << 8) & 0xff00) ^ CRC16_XMODEM_TABLE[(((crc) >> 8) & 0xff)^byte4] ;
147 crc = (((crc) << 8) & 0xff00) ^ CRC16_XMODEM_TABLE[(((crc) >> 8) & 0xff)^byte3] ;
148 crc = (((crc) << 8) & 0xff00) ^ CRC16_XMODEM_TABLE[(((crc) >> 8) & 0xff)^byte2] ;
149 crc = (((crc) << 8) & 0xff00) ^ CRC16_XMODEM_TABLE[(((crc) >> 8) & 0xff)^byte1] ;
150}
151
152unsigned int get_crc(unsigned int* data, int length, unsigned int initial_value)
153{
154 unsigned int result = initial_value ;
155 // printf("get_crc()\n");
156 for (int i = 0 ; i < length ; ++i) {
157 crc_calc(result, data[ i ]) ;
158 // printf("%.8d %.8x %.8x\n", i, result, data[i]);
159 }
160 return result ;
161}
162
163
// Return the word at index 4 of the buffer as the event number,
// or -1 when no buffer is given.
int getEventNumber(const unsigned int* data)
{
  if (data == nullptr) {
    return -1;
  }
  return data[4];
}
169
// Print the first eight words of the buffer on one line labelled "Header",
// highest index first, or "No data" when the pointer is null.
void printHeader(unsigned int* data)
{
  if (data == nullptr) {
    printf("No data\n") ;
    return;
  }
  printf("Header : %8X%8X%8X%8X%8X%8X%8X%8X\n",
         data[7], data[6], data[5], data[4], data[3], data[2], data[1], data[0]) ;
}
177
// Print the first eight words of the buffer on one line labelled "Trailer",
// highest index first, or "No data" when the pointer is null.
void printTrailer(unsigned int* data)
{
  if (data == nullptr) {
    printf("No data\n") ;
    return;
  }
  printf("Trailer : %8X%8X%8X%8X%8X%8X%8X%8X\n",
         data[7], data[6], data[5], data[4], data[3], data[2], data[1], data[0]) ;
}
185
// Print the first eight words of the buffer on one line labelled "Data",
// highest index first, or "No data" when the pointer is null.
void printData(unsigned int* data)
{
  if (data == nullptr) {
    printf("No data\n") ;
    return;
  }
  printf("Data : %8X%8X%8X%8X%8X%8X%8X%8X\n",
         data[7], data[6], data[5], data[4], data[3], data[2], data[1], data[0]) ;
}
193
// Append one event to a text file: a separator line of '!' characters followed by
// 8 * (size - 2) words, one hexadecimal value per line.
//   the_file : output stream; it is flushed after every line and is left in hex mode
//              once at least one word has been written.
//   data     : first word to write.
//   size     : size value from which the word count 8 * (size - 2) is derived.
void writeToFile(std::ofstream& the_file, const unsigned int* data, int size)
{
  // Separator between events.
  the_file << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << std::endl ;

  // Write the data as 32-bit values.
  const int word_count = 8 * (size - 2);
  int written = 0;
  while (written < word_count) {
    the_file << std::hex << data[ written ] << std::endl ;
    ++written;
  }
}
201
// Print the eight-word line of the buffer that contains word index `pos`.
//   data : buffer to read from; "No data" is printed when it is null.
//   pos  : any word index inside the wanted line; it is rounded down to a multiple of 8.
// The line is printed as "pos <start> :" followed by the eight words in hexadecimal.
void printLine(unsigned int* data, int pos)
{
  if (data == nullptr) {
    printf("No data\n");
    fflush(stdout);
    return;
  }

  const int line_start = pos - (pos % 8);
  printf("pos %.8x : ", line_start);
  // The index already starts at line_start, so it addresses the buffer directly
  // (adding line_start again would read from 2 * line_start).
  for (int i = line_start; i < line_start + 8; i++) {
    printf(" %.8x", data[ i ]);
  }
  printf("\n");
  fflush(stdout);
}
212
// Dump an event whose length is taken from its own first word.
// The number of words printed is (data[0] & 0xFFFF) - 1 ("minus header" in the original comment).
// Words are printed eight per line; each continuation line is labelled with the
// index of its first word, as in the other printEventData() overloads.
// "No data" is printed when the pointer is null.
void printEventData(unsigned int* data)
{
  if (data == nullptr) {
    printf("No data\n");
    fflush(stdout);
    return;
  }

  const int eventSize = ((data[ 0 ] & 0xFFFF)) - 1 ; // minus header
  printf(" eve 0 : ");
  for (int i = 0 ; i < eventSize; ++i) {
    printf("%.8x ", data[ i ]);
    // After every eighth word start a new line, labelled with the next word's index.
    if (i % 8 == 7)printf("\n eve %.3x : ", i + 1);
  }
  fflush(stdout);
}
223
// Dump `size` words of the buffer in hexadecimal, eight words per line.
// Every line starts with the index of its first word; "No data" is printed
// instead of the words when the pointer is null.
void printEventData(unsigned int* data, int size)
{
  printf("%.8x : ", 0);
  if (data == nullptr) {
    printf("No data\n") ;
  } else {
    int index = 0;
    while (index < size) {
      printf("%.8x ", data[ index ]);
      ++index;
      // A line is complete after every eighth word: start the next one with its offset.
      if (index % 8 == 0) printf("\n%.8x : ", index);
    }
  }
  printf("\n");
  fflush(stdout);
}
236
// Dump `size` words of the buffer in hexadecimal, eight words per line, with every
// line prefixed by "thread <sender_id>" and the index of its first word.
// "No data" is printed instead of the words when the pointer is null.
void printEventData(unsigned int* data, int size, int sender_id)
{
  printf("thread %d : %.8x : ", sender_id, 0);
  if (data == nullptr) {
    printf("No data\n") ;
  } else {
    int index = 0;
    while (index < size) {
      printf("%.8x ", data[ index ]);
      ++index;
      // A line is complete after every eighth word: start the next one with its offset.
      if (index % 8 == 0) printf("\nthread %d : %.8x : ", sender_id, index);
    }
  }
  printf("\n");
  fflush(stdout);
}
249
// Dump a buffer organised in eight-word lines: the first line as "Header", then
// ((data[0] & 0xFF) - 1) lines of data starting again from word 0, then the
// eight words that follow them as "Trailer" (highest index first).
// Continuation lines are labelled with the index of their first word.
// "No data" is printed when the pointer is null, and the trailer is skipped when
// the size field is zero (there is no line it could refer to).
// NOTE(review): checkDMAHeader() masks its size field with 0xFFFF while this function uses 0xFF —
// confirm which width is intended.
void printFullData(unsigned int* data)
{
  if (data == nullptr) {
    printf("No data\n");
    fflush(stdout);
    return;
  }

  printf("Header : %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x\n",
         data[0], data[1], data[2], data[3],
         data[4], data[5], data[6], data[7]) ;

  const int eventSize = ((data[ 0 ] & 0xFF)) - 1 ; // minus header

  printf(" data 0 : ");
  for (int i = 0 ; i < eventSize * 8 ; ++i) {
    printf("%.8x ", data[ i ]);
    // After every eighth word start a new line, labelled with the next word's index.
    if (i % 8 == 7)printf("\n data %.3x : ", i + 1);
  }

  if (eventSize < 0) {
    // A size field of zero would make the trailer offsets negative.
    printf("\nTrailer : none (size field in header is zero)\n");
  } else {
    const unsigned int* trailer = data + 8 * eventSize;
    printf("\nTrailer : %8X %8X %8X %8X %8X %8X %8X %8X\n",
           trailer[7], trailer[6], trailer[5], trailer[4],
           trailer[3], trailer[2], trailer[1], trailer[0]) ;
  }
  fflush(stdout);
}
274
275int get1stChannel(const unsigned int*& data)
276{
277 int ret_1st_ch = -1;
278 unsigned int event_length = data[ Belle2::RawHeader_latest::POS_NWORDS ];
279
280 for (int i = 0; i < MAX_PCIE40_CH; i++) {
281 int linksize = 0;
282 if (i < MAX_PCIE40_CH - 1) {
283 linksize = data[ POS_TABLE_POS + (i + 1) ] - data[ POS_TABLE_POS + i ];
284 } else {
285 linksize = event_length - (data[ POS_TABLE_POS + (MAX_PCIE40_CH - 1) ] + LEN_ROB_TRAILER);
286 }
287 if (linksize > 0) {
288 ret_1st_ch = i;
289 break;
290 }
291 }
292 return ret_1st_ch;
293}
294
295
296void printEventNumberError(unsigned int*& data, const unsigned int evtnum, const unsigned int exprun, const int eve_diff,
297 const int sender_id)
298{
299 //
300 // event # check ( Since this check is done in a single thread, only differnce in the prev. event came to this thread can be checked.
301 // So, if event # from PCIe40 are in order like, 0, 3, 2, 10002, 9746, 5, 8, 7, 10007, 9753, No event jump can be issued.
302 // eb0 will check futher check.
303 //
304
305 unsigned int event_length = data[ Belle2::RawHeader_latest::POS_NWORDS ];
306 char err_buf[2000] = {0};
307 int reduced_flag = 1; // 0 : not-reduced(error event) 1: reduced
308 if (data[ MAGIC_7F7F_POS ] & 0x00008000) {
309 reduced_flag = 0;
310 }
311
312 pthread_mutex_lock(&(mtx_sender_log));
313 n_messages[ 10 ] = n_messages[ 10 ] + 1 ;
314 if (reduced_flag == 1) {
315 sprintf(err_buf,
316 "[FATAL] thread %d : %s ch=%d : ERROR_EVENT : Invalid event_number. Exiting...: cur 32bit eve %u preveve %u for all channels : prun %u crun %u\n %s %s %d\n",
317 sender_id, hostnamebuf, get1stChannel(data),
318 data[EVENUM_POS], evtnum + (eve_diff - 1),
319 exprun, data[RUNNO_POS],
320 __FILE__, __PRETTY_FUNCTION__, __LINE__);
321 } else {
322 sprintf(err_buf,
323 "[FATAL] thread %d : %s ch=%d : ERROR_EVENT : Invalid event_number. Exiting...: cur 32bit eve %u preveve %u ( ",
324 sender_id, hostnamebuf, get1stChannel(data),
325 data[EVENUM_POS], evtnum + (eve_diff - 1));
326 int temp_pos = 0;
327 unsigned int temp_eve = 0;
328 for (int i = 0; i < MAX_PCIE40_CH; i++) {
329 int linksize = 0;
330 if (i < MAX_PCIE40_CH - 1) {
331 linksize = data[ POS_TABLE_POS + (i + 1) ] - data[ POS_TABLE_POS + i ];
332 } else {
333 linksize = event_length - (data[ POS_TABLE_POS + (MAX_PCIE40_CH - 1) ] + LEN_ROB_TRAILER);
334 }
335 if (linksize <= 0) continue;
336 temp_pos = data[ POS_TABLE_POS + i ] + OFFSET_HDR;
337 temp_eve = data[ temp_pos +
338 Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
339 Belle2::PreRawCOPPERFormat_latest::POS_TT_TAG ];
340
341 if (evtnum + eve_diff != temp_eve) {
342 sprintf(err_buf + strlen(err_buf),
343 "ch %d eve 0x%.8x : ",
344 i, temp_eve);
345 }
346 }
347 sprintf(err_buf + strlen(err_buf), "prun %u crun %u\n %s %s %d\n",
348 exprun, data[RUNNO_POS],
349 __FILE__, __PRETTY_FUNCTION__, __LINE__);
350 }
351 printf("%s\n", err_buf); fflush(stdout);
352 printEventData(data, event_length, sender_id);
353 err_bad_evenum[sender_id]++;
354 pthread_mutex_unlock(&(mtx_sender_log));
355 return;
356}
357
358
359void checkUtimeCtimeTRGType(unsigned int*& data, const int sender_id)
360{
361 unsigned int event_length = data[ Belle2::RawHeader_latest::POS_NWORDS ];
362 unsigned int new_exprun = data[ Belle2::RawHeader_latest::POS_EXP_RUN_NO ] ;
363 unsigned int new_evtnum = data[ Belle2::RawHeader_latest::POS_EVE_NO ] ;
364 //
365 // Check the 7f7f magic word
366 //
367 if ((data[ MAGIC_7F7F_POS ] & 0xFFFF0000) != 0x7F7F0000) {
368 char err_buf[500] = {0};
369 pthread_mutex_lock(&(mtx_sender_log));
370 sprintf(err_buf,
371 "[FATAL] thread %d : %s : ERROR_EVENT : Invalid Magic word in ReadOut Board header( 0x%.8x ) : It must be 0x7f7f???? : eve %u exp %d run %d sub %d : %s %s %d",
372 sender_id, hostnamebuf, data[ MAGIC_7F7F_POS ],
373 new_evtnum,
374 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
375 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
376 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
377 __FILE__, __PRETTY_FUNCTION__, __LINE__);
378 printf("%s\n", err_buf); fflush(stdout);
379 printEventData(data, event_length, sender_id);
380 pthread_mutex_unlock(&(mtx_sender_log));
381#ifndef NO_ERROR_STOP
382 exit(1);
383#endif
384 }
385
386 //
387 // Check if non data-reduction bit was set or not.
388 //
389 if (!(data[ MAGIC_7F7F_POS ] & 0x00008000)) {
390 // reduced
391 pthread_mutex_lock(&(mtx_sender_log));
392 printf("[FATAL] thread %d : %s : This function cannot be used for already reduced data. 7f7f header is 0x%.8x : eve %u exp %d run %d sub %d : %s %s %d\n",
393 sender_id, hostnamebuf, data[ MAGIC_7F7F_POS ],
394 new_evtnum,
395 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
396 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
397 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
398 __FILE__, __PRETTY_FUNCTION__, __LINE__);
399 printEventData(data, event_length, sender_id);
400 pthread_mutex_unlock(&(mtx_sender_log));
401#ifndef NO_ERROR_STOP
402 exit(1);
403#endif
404 }
405
406
407 //
408 // Check consistency of B2L header over all input channels
409 //
410 int flag = 0, err_flag = 0, err_ch = -1;
411 unsigned int temp_utime = 0, temp_ctime_trgtype = 0, temp_eve = 0, temp_exprun = 0;
412 unsigned int utime[MAX_PCIE40_CH], ctime_trgtype[MAX_PCIE40_CH], eve[MAX_PCIE40_CH], exprun[MAX_PCIE40_CH];
413 int used_ch[MAX_PCIE40_CH] = {0};
414 int first_ch = -1;
415
416 memset(utime, 0, sizeof(utime));
417 memset(ctime_trgtype, 0, sizeof(ctime_trgtype));
418 memset(eve, 0, sizeof(eve));
419 memset(exprun, 0, sizeof(exprun));
420
421 for (int i = 0; i < MAX_PCIE40_CH; i++) {
422 unsigned int temp_ctime_trgtype_footer = 0, temp_eve_footer = 0;
423 int linksize = 0;
424 if (i < MAX_PCIE40_CH - 1) {
425 linksize = data[ POS_TABLE_POS + (i + 1) ] - data[ POS_TABLE_POS + i ];
426 } else {
427 linksize = event_length - (data[ POS_TABLE_POS + (MAX_PCIE40_CH - 1) ] + LEN_ROB_TRAILER);
428 }
429 if (linksize <= 0) {
430 continue;
431 } else {
432 used_ch[ i ] = 1;
433 }
434
435 int temp_pos = data[ POS_TABLE_POS + i ] + OFFSET_HDR;
436 ctime_trgtype[ i ] = data[ temp_pos +
437 Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
438 Belle2::PreRawCOPPERFormat_latest::POS_TT_CTIME_TYPE ];
439 eve[ i ] = data[ temp_pos +
440 Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
441 Belle2::PreRawCOPPERFormat_latest::POS_TT_TAG ];
442 utime[ i ] = data[ temp_pos +
443 Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
444 Belle2::PreRawCOPPERFormat_latest::POS_TT_UTIME ];
445 exprun[ i ] = data[ temp_pos +
446 Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
447 Belle2::PreRawCOPPERFormat_latest::POS_EXP_RUN ];
448 temp_ctime_trgtype_footer = data[ temp_pos + linksize +
449 - (Belle2::PreRawCOPPERFormat_latest::SIZE_B2LFEE_TRAILER +
450 Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER) +
451 Belle2::PreRawCOPPERFormat_latest::POS_TT_CTIME_B2LFEE ];
452 temp_eve_footer = data[ temp_pos + linksize +
453 - (Belle2::PreRawCOPPERFormat_latest::SIZE_B2LFEE_TRAILER +
454 Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER) +
455 Belle2::PreRawCOPPERFormat_latest::POS_CHKSUM_B2LFEE ];
456
457 if (flag == 0) {
458 temp_ctime_trgtype = ctime_trgtype[ i ];
459 temp_eve = eve[ i ];
460 temp_utime = utime[ i ];
461 temp_exprun = exprun[ i ];
462 flag = 1;
463 first_ch = i;
464
465 if (temp_eve != new_evtnum) {
466 pthread_mutex_lock(&(mtx_sender_log));
467 printf("[FATAL] thread %d : %s ch=%d : ERROR_EVENT : Invalid event_number. Exiting...: eve in ROBheader = 0x%.8x , ch %d 's eve = 0x%.8x : exp %d run %d sub %d : %s %s %d\n",
468 sender_id, hostnamebuf, i,
469 new_evtnum, i, temp_eve,
470 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
471 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
472 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
473 __FILE__, __PRETTY_FUNCTION__, __LINE__);
474 printEventData(data, event_length, sender_id);
475 pthread_mutex_unlock(&(mtx_sender_log));
476 exit(1);
477 }
478
479 } else {
480 if (temp_ctime_trgtype != ctime_trgtype[ i ] || temp_utime != utime[ i ] ||
481 temp_eve != eve[ i ] || temp_exprun != exprun[ i ]) {
482 err_ch = i;
483 err_flag = 1;
484 }
485
486 }
487
488 //
489 // Mismatch between header and trailer
490 //
491 if (temp_ctime_trgtype != temp_ctime_trgtype_footer || (temp_eve & 0xffff) != ((temp_eve_footer >> 16) & 0xffff)) {
492 pthread_mutex_lock(&(mtx_sender_log));
493 printf("[FATAL] thread %d : %s ch=%d : ERROR_EVENT : mismatch(finesse %d) between header(ctime 0x%.8x eve 0x%.8x) and footer(ctime 0x%.8x eve_crc16 0x%.8x). Exiting... : exp %d run %d sub %d : %s %s %d\n",
494 sender_id, hostnamebuf, i, i,
495 temp_ctime_trgtype, temp_eve, temp_ctime_trgtype_footer, temp_eve_footer,
496 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
497 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
498 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
499 __FILE__, __PRETTY_FUNCTION__, __LINE__);
500 printEventData(data, event_length, sender_id);
501 pthread_mutex_unlock(&(mtx_sender_log));
502 exit(1);
503 }
504 }
505
506
507 //
508 // Mismatch over channels
509 //
510 if (err_flag == 1) {
511 pthread_mutex_lock(&(mtx_sender_log));
512 char err_buf[20000];
513 sprintf(err_buf,
514 "[FATAL] thread %d : %s ch= %d or %d : ERROR_EVENT : mismatch header value over FINESSEs ( between ch %d and ch %d ). Exiting...: ",
515 sender_id, hostnamebuf, err_ch, first_ch, err_ch, first_ch);
516 for (int i = 0; i < MAX_PCIE40_CH; i++) {
517 if (used_ch[ i ] == 1) {
518 sprintf(err_buf + strlen(err_buf),
519 "\nch = %d ctimeTRGtype 0x%.8x utime 0x%.8x eve 0x%.8x exprun 0x%.8x",
520 i, ctime_trgtype[ i ], utime[ i ], eve[ i ], exprun[ i ]);
521 }
522 }
523 printf("%s\n", err_buf); fflush(stdout);
524 pthread_mutex_unlock(&(mtx_sender_log));
525 exit(1);
526 }
527
528 return;
529}
530
531
// Validate the header and trailer of one DMA fragment and extract its size and page information.
//   data        : fragment buffer (eight 32-bit words per line).
//   size        : set to the fragment size in eight-word lines when 0 is returned.
//   dsize       : running byte total; fragment_size * 32 is added before the header is validated.
//   total_pages : set from the upper 16 bits of data[4] when 0 is returned.
//   index_pages : set from the lower 16 bits of data[4] when 0 is returned.
// Return value:
//   1 : data is a null pointer
//   5 : header magic words are wrong
//   3 : size field exceeds MAX_DMA_WORDS_OF_256BITS
//   4 : byte size and line count in the header disagree
//   0 : otherwise, including the case where only the trailer size check failed
// Unless NO_ERROR_STOP is defined, the cases returning 5, 3 and 4 call exit(1) instead of returning.
// Counters and messages are updated under mtx_sender_log; each message category is printed only
// while its n_messages counter is below max_number_of_messages.
532int checkDMAHeader(unsigned int*& data, unsigned int& size, double& dsize, int& total_pages, int& index_pages)
533{
534 if (data == 0) {
535 pthread_mutex_lock(&(mtx_sender_log));
536 n_messages[ 0 ] = n_messages[ 0 ] + 1 ;
537 if (n_messages[ 0 ] < max_number_of_messages) {
538 printf("[WARNING] Null pointer to data buffer\n") ;
539 }
540 dmaerr_no_data++;
541 pthread_mutex_unlock(&(mtx_sender_log));
542 return 1 ;
543 }
544
 // Fragment size in eight-word (32-byte) lines, taken from the low 16 bits of the size word.
545 unsigned int fragment_size = data[ DMA_WORDS_OF_256BITS ] & 0xFFFF ;
546 dsize += fragment_size * 32 ; // in bytes
547
 // Header magic: 0xEEEE in the upper half of the size word, 0xAAAAEEEE in the magic word,
 // and 0xAAAA in the lower half of the byte-size word.
548 if (((data[ DMA_WORDS_OF_256BITS ] & 0xFFFF0000) != 0xEEEE0000) ||
549 (data[ DMA_HDR_MAGIC ] != 0xAAAAEEEE) ||
550 ((data[ DMA_SIZE_IN_BYTES ] & 0xFFFF) != 0xAAAA)) {
551 pthread_mutex_lock(&(mtx_sender_log));
552 n_messages[ 4 ] = n_messages[ 4 ] + 1 ;
553 if (n_messages[ 4 ] < max_number_of_messages) {
554 printf("[FATAL] Invalid DMA header format. ( %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x )\n",
555 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]) ;
556 printFullData(data);
557 }
558 dmaerr_bad_header++;
559 pthread_mutex_unlock(&(mtx_sender_log));
560#ifndef NO_ERROR_STOP
561 exit(1);
562#endif
563 return 5 ;
564 } else if ((data[ DMA_WORDS_OF_256BITS ] & 0xFFFF) > MAX_DMA_WORDS_OF_256BITS) {
565 pthread_mutex_lock(&(mtx_sender_log));
566 n_messages[ 2 ] = n_messages[ 2 ] + 1 ;
567 if (n_messages[ 2 ] < max_number_of_messages) {
 // NOTE(review): the value printed as the packet size is dsize, the running total updated above,
 // not the size of this fragment alone — confirm that this is intended.
568 printf("[FATAL] Too large DMA packet(= %lf bytes). ( %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x )\n",
569 dsize,
570 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]) ;
571 }
572 dmaerr_bad_size++;
573 pthread_mutex_unlock(&(mtx_sender_log));
574#ifndef NO_ERROR_STOP
575 exit(1);
576#endif
577 return 3 ;
578 } else if (((data[ DMA_SIZE_IN_BYTES ] & 0xFFFF0000) >> 16) != (fragment_size * 32)) {
579 pthread_mutex_lock(&(mtx_sender_log));
580 n_messages[ 3 ] = n_messages[ 3 ] + 1 ;
581 if (n_messages[ 3 ] < max_number_of_messages) {
582 printf("[FATAL] Inconsistent between byte-size( = %u ) and 8words-size( = %u ) in DMA header. ( %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x )\n"
583 ,
584 (data[ DMA_SIZE_IN_BYTES ] & 0xFFFF0000) >> 16, fragment_size * 32,
585 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]) ;
586 }
587 dmaerr_bad_word_size++;
588 pthread_mutex_unlock(&(mtx_sender_log));
589#ifndef NO_ERROR_STOP
590 exit(1);
591#endif
592 return 4 ;
593 } else
594
595 // Check trailer: the first word of the last line must repeat the fragment size.
 // NOTE(review): fragment_size is unsigned, so (fragment_size - 1) wraps around when it is 0 and
 // (fragment_size - 2) wraps when it is below 2; no check above rejects such sizes — confirm that
 // they cannot occur.
596 if (data[ 8 * (fragment_size - 1) ] != fragment_size) {
597 pthread_mutex_lock(&(mtx_sender_log));
598 n_messages[ 5 ] = n_messages[ 5 ] + 1 ;
599 if (n_messages[ 5 ] < max_number_of_messages) {
600 printf("Bad size in trailer : size %.8x size in hdr %.8x bef %.8x\n", data[8 * (fragment_size - 1)], fragment_size,
601 data[ 8 * (fragment_size - 2) ]) ;
602 printLine(data, 8 * (fragment_size - 1));
603 printFullData(data);
604 // printEventData(data, fragment_size);
605 }
 // If the line before the last one carries the size instead, count the error and treat the
 // fragment as one line shorter. No error code is returned for this case.
606 if (data[ 8 * (fragment_size - 2) ] == fragment_size) {
607 dmaerr_bad_size_dmatrl++;
608 fragment_size--;
609 }
610 pthread_mutex_unlock(&(mtx_sender_log));
611 }
612 // if ( ( data[ 8*(fragment_size-1)+1 ] != 0 ) || ( data[ 8*(fragment_size-1)+2 ] != 0 ) ||
613 // ( data[ 8*(fragment_size-1)+3 ] != 0 ) || ( data[ 8*(fragment_size-1)+4 ] != 0 ) ||
614 // ( data[ 8*(fragment_size-1)+5 ] != 0 ) || ( data[ 8*(fragment_size-1)+6 ] != 0 ) ||
615 // ( data[ 8*(fragment_size-1)+7 ] != 0 ) ) {
616 // n_messages[ 6 ] = n_messages[ 6 ] + 1 ;
617 // if ( n_messages[ 6 ] < max_number_of_messages ) {
618 // printf( "Bad trailer\n" ) ;
619 // printTrailer( &data[ 8*(fragment_size-1) ] ) ;
620 // }
621 // err_bad_dmatrl[sender_id]++;
622 // // return 7 ;
623 // }
624
 // Page bookkeeping is packed into word 4: upper half = total pages, lower half = page index.
625 total_pages = (data[ 4 ] & 0xFFFF0000) >> 16 ;
626 index_pages = (data[ 4 ] & 0xFFFF) ;
627
628 size = fragment_size ;
629
630 // Remove header and trailer from data
631 // unsigned int * tmp = new unsigned int[ S_PAGE_SLOT_SIZE/4 ] ;
632 // memcpy( tmp , &data[ 8 ], 8*(fragment_size-2)*4 ) ;
633 // delete [] data ;
634 // data = tmp ;
635 // if ( total_pages != 1 ) return -1 ;
636 return 0 ;
637}
638
639
640double getTimeSec()
641{
642 struct timeval t;
643 gettimeofday(&t, NULL);
644 return (t.tv_sec + t.tv_usec * 1.e-6);
645}
646
647void reduceHdrTrl(unsigned int* data, unsigned int& event_nwords)
648{
649
650 // TO CHECK LATER unsigned int event_size = data[ 8 ] ;
651 unsigned int event_length = data[ Belle2::RawHeader_latest::POS_NWORDS ];
652 if (event_length > 0x100000) {
653 pthread_mutex_lock(&(mtx_sender_log));
654 printf("[FATAL] Too large event size. : 0x%.8x : %u words. Exiting...\n",
655 data[ Belle2::RawHeader_latest::POS_NWORDS ],
656 event_length);
657 printEventData(data, (event_length & 0xfffff));
658 pthread_mutex_unlock(&(mtx_sender_log));
659 exit(1);
660 }
661
662 unsigned int* temp_data = new unsigned int[event_length];
663 memset(temp_data, 0, event_length * sizeof(unsigned int));
664
665 if (data[ Belle2::RawHeader_latest::POS_VERSION_HDRNWORDS ] & 0x00008000) {
666 // Remove non-reduced flag
667 data[ Belle2::RawHeader_latest::POS_VERSION_HDRNWORDS ] =
668 data[ Belle2::RawHeader_latest::POS_VERSION_HDRNWORDS ] & 0xffff7fff;
669 // Remove error-flag
670 data[ Belle2::RawHeader_latest::POS_TRUNC_MASK_DATATYPE ] = 0;
671 } else {
672 pthread_mutex_lock(&(mtx_sender_log));
673 printf("[FATAL] reduceHdrTrl() must not be used for already reduced-event. 7f7f word = %.8x . Exiting...\n",
674 data[ Belle2::RawHeader_latest::POS_VERSION_HDRNWORDS ]);
675 fflush(stdout);
676 pthread_mutex_unlock(&(mtx_sender_log));
677 exit(1);
678 }
679
680 // Copy header before the position table
681 unsigned int cur_pos = 0;
682 unsigned int dst_cur_pos = 0;
683 memcpy(temp_data + dst_cur_pos, data + cur_pos,
684 Belle2::RawHeader_latest::POS_CH_POS_TABLE * sizeof(unsigned int));
685
686 cur_pos = Belle2::RawHeader_latest::POS_CH_POS_TABLE;
687 dst_cur_pos = Belle2::RawHeader_latest::POS_CH_POS_TABLE;
688
689
690 // Check eror flag in ROB header
691 int red_linksize = 0;
692
693 temp_data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE ] = data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE ];
694 for (int i = 0; i < MAX_PCIE40_CH ; i++) {
695
696 int linksize = 0;
697 if (i < MAX_PCIE40_CH - 1) {
698 linksize = data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + (i + 1) ]
699 - data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ];
700 } else {
701 linksize = event_length - (data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + (MAX_PCIE40_CH - 1) ] +
702 Belle2::RawTrailer_latest::RAWTRAILER_NWORDS);
703 }
704
705 if (linksize < 0) {
706 pthread_mutex_lock(&(mtx_sender_log));
707 printf("[FATAL] event size(= %d ) for ch %d is negative. Exiting...\n",
708 linksize, i);
709 printEventData(data, (event_length & 0xfffff));
710 fflush(stdout);
711 pthread_mutex_unlock(&(mtx_sender_log));
712 exit(1);
713 } else if (linksize == 0) {
714 // this channel is not used.
715 if (i < MAX_PCIE40_CH - 1) {
716 temp_data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i + 1 ] =
717 temp_data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ];
718 }
719 continue;
720 } else {
721 red_linksize = linksize
722 - (Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER
723 + Belle2::PreRawCOPPERFormat_latest::SIZE_B2LFEE_HEADER
724 - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER
725 - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LFEE_HEADER)
726 - (Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER
727 + Belle2::PreRawCOPPERFormat_latest::SIZE_B2LFEE_TRAILER
728 - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER
729 - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LFEE_TRAILER);
730
731 if (i < MAX_PCIE40_CH - 1) {
732 temp_data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i + 1 ] =
733 temp_data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ] + red_linksize;
734 }
735 }
736
737 // Set position of data
738 dst_cur_pos = temp_data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ];
739 cur_pos = data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ];
740
741 // Copy data(B2LHSLB header)
742 temp_data[ dst_cur_pos + Belle2::PostRawCOPPERFormat_latest::POS_B2LHSLB_MAGIC]
743 = data[ cur_pos + Belle2::PreRawCOPPERFormat_latest::POS_MAGIC_B2LHSLB ];
744 dst_cur_pos = dst_cur_pos + Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER;
745 cur_pos = cur_pos + Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER;
746
747
748 // Copy data(B2LFEE header)
749 temp_data[ dst_cur_pos + Belle2::PostRawCOPPERFormat_latest::POS_B2L_CTIME ]
750 = data[ cur_pos + Belle2::PreRawCOPPERFormat_latest::POS_B2L_CTIME ];
751 dst_cur_pos = dst_cur_pos + Belle2::PostRawCOPPERFormat_latest::SIZE_B2LFEE_HEADER;
752 cur_pos = cur_pos + Belle2::PreRawCOPPERFormat_latest::SIZE_B2LFEE_HEADER;
753
754 // Copy data( Detector data )
755 int numwords_det_buffer = red_linksize
756 - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER
757 - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LFEE_HEADER
758 - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER
759 - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LFEE_TRAILER;
760
761 memcpy(temp_data + dst_cur_pos, data + cur_pos,
762 numwords_det_buffer * sizeof(unsigned int));
763 dst_cur_pos = dst_cur_pos + numwords_det_buffer;
764 cur_pos = cur_pos + numwords_det_buffer;
765
766 // Copy data( B2L FEE trailer )
767 temp_data[ dst_cur_pos + Belle2::PostRawCOPPERFormat_latest::POS_B2LFEE_ERRCNT_CRC16 ]
768 = data[ cur_pos + Belle2::PreRawCOPPERFormat_latest::POS_CHKSUM_B2LFEE ];
769 dst_cur_pos += Belle2::PostRawCOPPERFormat_latest::SIZE_B2LFEE_TRAILER;
770 cur_pos += Belle2::PreRawCOPPERFormat_latest::SIZE_B2LFEE_TRAILER;
771
772 // Copy data( B2L HSLB trailer )
773 temp_data[ dst_cur_pos + Belle2::PostRawCOPPERFormat_latest::POS_B2LHSLB_TRL_MAGIC ]
774 = data[ cur_pos + Belle2::PreRawCOPPERFormat_latest::POS_CHKSUM_B2LHSLB ];
775 dst_cur_pos += Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER;
776 cur_pos += Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER;
777 }
778
779
780 // Copy RawCOPPER trailer
781 temp_data[ dst_cur_pos + Belle2::RawTrailer_latest::POS_ERROR_BIT_CH1 ] =
782 data[ cur_pos + Belle2::RawTrailer_latest::POS_ERROR_BIT_CH1 ];
783 temp_data[ dst_cur_pos + Belle2::RawTrailer_latest::POS_ERROR_CH2 ] =
784 data[ cur_pos + Belle2::RawTrailer_latest::POS_ERROR_CH2 ];
785 temp_data[ dst_cur_pos + Belle2::RawTrailer_latest::POS_CHKSUM ] =
786 data[ cur_pos + Belle2::RawTrailer_latest::POS_CHKSUM ];
787 temp_data[ dst_cur_pos + Belle2::RawTrailer_latest::POS_TERM_WORD ] =
788 data[ cur_pos + Belle2::RawTrailer_latest::POS_TERM_WORD ];
789
790 dst_cur_pos += Belle2::RawTrailer_latest::RAWTRAILER_NWORDS;
791 cur_pos += Belle2::RawTrailer_latest::RAWTRAILER_NWORDS;
792
793 if (dst_cur_pos > cur_pos) {
794 pthread_mutex_lock(&(mtx_sender_log));
795 printf("[FATAL] reduced data-size ( %u words ) in reduceHdrTrl() is larger than the original size ( %u words). Exiting...\n",
796 dst_cur_pos, cur_pos);
797 fflush(stdout);
798 pthread_mutex_unlock(&(mtx_sender_log));
799 exit(1);
800 }
801
802 event_nwords = dst_cur_pos;
803 temp_data[ Belle2::RawHeader_latest::POS_NWORDS ] = event_nwords;
804
805 memset(data, 0, event_nwords * sizeof(unsigned int));
806 memcpy(data, temp_data, dst_cur_pos * sizeof(unsigned int));
807
808
809 delete temp_data;
810
811 return ;
812}
813
814
815int checkEventData(int sender_id, unsigned int* data, unsigned int event_nwords, unsigned int& exprun,
816 unsigned int& evtnum, unsigned int node_id, std::vector< int > valid_ch)
817{
818 int expected_number_of_links = valid_ch.size() ;
819 int reduced_flag = 1; // 0 : not-reduced(error event) 1: reduced
820
821 // For error message
822 unsigned int new_exprun = data[ Belle2::RawHeader_latest::POS_EXP_RUN_NO ] ;
823 unsigned int new_evtnum = data[ Belle2::RawHeader_latest::POS_EVE_NO ] ;
824
825 // TO CHECK LATER unsigned int event_size = data[ 8 ] ;
826 //
827 // Check if event length is not too long or zero.
828 //
829 unsigned int event_length = data[ EVENT_LEN_POS ];
830 if (event_length > 0x100000) {
831 pthread_mutex_lock(&(mtx_sender_log));
832 printf("[FATAL] thread %d : %s : ERROR_EVENT : Too large event size. : 0x%.8x : %u words. : exp %d run %d sub %d : Exiting...\n",
833 sender_id, hostnamebuf,
834 data[ EVENT_LEN_POS ], data[ EVENT_LEN_POS ],
835 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
836 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
837 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
838 );
839
840 printEventData(data, (event_length & 0xfffff), sender_id);
841 pthread_mutex_unlock(&(mtx_sender_log));
842 exit(1);
843 } else if (event_length == 0) {
844 pthread_mutex_lock(&(mtx_sender_log));
845 printf("[FATAL] thread %d : %s : ERROR_EVENT : Specified event size is zero. : 0x%.8x : %u words. : exp %d run %d sub %d : Exiting...\n",
846 sender_id, hostnamebuf,
847 data[ EVENT_LEN_POS ], event_length,
848 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
849 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
850 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
851 );
852 printEventData(data, 24, sender_id);
853 pthread_mutex_unlock(&(mtx_sender_log));
854 exit(1);
855 }
856
857 //
858 // Check the 7f7f magic word
859 //
860 if ((data[ MAGIC_7F7F_POS ] & 0xFFFF0000) != 0x7F7F0000) {
861 char err_buf[500] = {0};
862 pthread_mutex_lock(&(mtx_sender_log));
863 n_messages[ 7 ] = n_messages[ 7 ] + 1 ;
864 if (n_messages[ 7 ] < max_number_of_messages) {
865 sprintf(err_buf,
866 "[FATAL] thread %d : %s : ERROR_EVENT : Invalid Magic word in ReadOut Board header( 0x%.8x ) : It must be 0x7f7f???? : exp %d run %d sub %d",
867 sender_id, hostnamebuf, data[ MAGIC_7F7F_POS ],
868 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
869 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
870 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
871 ) ;
872 printf("%s\n", err_buf); fflush(stdout);
873 printEventData(data, event_length, sender_id);
874 }
875 err_bad_7f7f[sender_id]++;
876 pthread_mutex_unlock(&(mtx_sender_log));
877#ifndef NO_ERROR_STOP
878 exit(1);
879#endif
880 }
881
882 //
883 // Store nodeID
884 //
885 data[ NODEID_POS ] = node_id;
886
887 //
888 // Check if non data-reduction bit was set or not.
889 //
890 int ffaa_pos = 0, ff55_pos_from_end = 0;
891 if (data[ MAGIC_7F7F_POS ] & 0x00008000) {
892 // not-reduced
893 reduced_flag = 0;
894 ffaa_pos = Belle2::PreRawCOPPERFormat_latest::POS_MAGIC_B2LHSLB;
895 ff55_pos_from_end = - Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER +
896 Belle2::PreRawCOPPERFormat_latest::POS_CHKSUM_B2LHSLB;
897
898 // printf("[WARNING] thread %d : Error was detected by data-check core in PCIe40 FPGA. : exp %d run %d sub %d\n",
899 // sender_id, data[ MAGIC_7F7F_POS ],
900 // (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
901 // (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
902 // (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
903 // );
904
905 // if (data[ ERR_POS ] == 0) {
906 // pthread_mutex_lock(&(mtx_sender_log));
907 // printf("[FATAL] thread %d : Data error was deteced by PCIe40 FPGA. Header %.8x, Errorbit %.8x\n", sender_id, data[ MAGIC_7F7F_POS ],
908 // data[ ERR_POS ]);
909 // printEventData(data, event_length, sender_id);
910 // pthread_mutex_unlock(&(mtx_sender_log));
911 // #ifndef NO_ERROR_STOP
912 // exit(1);
913 // #endif
914 // }
915 } else {
916 // reduced
917 reduced_flag = 1;
918 ffaa_pos = Belle2::PostRawCOPPERFormat_latest::POS_B2LHSLB_MAGIC;
919 ff55_pos_from_end = - Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_TRAILER +
920 Belle2::PostRawCOPPERFormat_latest::POS_B2LHSLB_TRL_MAGIC;
921 if (data[ ERR_POS ] != 0) {
922 pthread_mutex_lock(&(mtx_sender_log));
923 printf("[FATAL] thread %d : %s : Inconsistency between header(no error found by FPGA) %.8x and errorbit %.8x (error-bit is non-zero) : exp %d run %d sub %d\n",
924 sender_id, hostnamebuf, data[ MAGIC_7F7F_POS ], data[ ERR_POS ],
925 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
926 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
927 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
928 );
929 printEventData(data, event_length, sender_id);
930 pthread_mutex_unlock(&(mtx_sender_log));
931#ifndef NO_ERROR_STOP
932 exit(1);
933#endif
934 }
935 }
936
937 //
938 // Check event # incrementation
939 //
940 if (evtnum + NUM_SENDER_THREADS != data[EVENUM_POS]) {
941 if (exprun == data[RUNNO_POS]
942 && exprun != 0) { // After a run-change or if this is the 1st event, event incrementation is not checked.
943 printEventNumberError(data, evtnum, exprun, NUM_SENDER_THREADS, sender_id);
944#ifndef NO_ERROR_STOP
945 exit(1);
946#endif
947 }
948 }
949
950
951 //
952 // Check exprun #
953 //
954 if (exprun == 0) { // default value of exprun
955 exprun = data[RUNNO_POS];
956 } else {
957 if (exprun != data[RUNNO_POS]) {
958 if (new_evtnum >= NUM_SENDER_THREADS) {
959 pthread_mutex_lock(&(mtx_sender_log));
960 n_messages[ 9 ] = n_messages[ 9 ] + 1 ;
961 if (n_messages[ 9 ] < max_number_of_messages) {
962 printf("[FATAL] thread %d : %s ch=%d : ERROR_EVENT : Bad exprun(now %.8x prev. %.8x) : exp %d run %d sub %d : Exiting...\n",
963 sender_id, hostnamebuf, get1stChannel(data),
964 exprun, data[RUNNO_POS],
965 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
966 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
967 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
968 ) ;
969 printEventData(data, event_length, sender_id);
970 }
971 err_bad_runnum[sender_id]++;
972 pthread_mutex_unlock(&(mtx_sender_log));
973#ifndef NO_ERROR_STOP
974 exit(1);
975#endif
976 } else {
977 if (sender_id == 0) {
978 printf("[DEBUG] thread %d : Run number was changed. cur exprun %.8x prev. exprun %.8x cur eve %.8x : exp %d run %d sub %d\n",
979 sender_id, data[RUNNO_POS], exprun, new_evtnum,
980 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
981 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
982 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
983 );
984 }
985 //
986 // A new run was started.
987 //
988 exprun = data[RUNNO_POS];
989 //
990 // Initialize error counter when run # is changed.
991 //
992 total_crc_good[sender_id] = 0;
993 total_crc_errors[sender_id] = 0;
994 err_flag_cnt[sender_id] = 0;
995 cur_evtnum[sender_id] = 0;
996 err_not_reduced[sender_id] = 0;
997 err_bad_7f7f[sender_id] = 0;
998 err_bad_runnum[sender_id] = 0;
999 err_bad_linknum[sender_id] = 0;
1000 err_bad_evenum[sender_id] = 0;
1001 err_bad_ffaa[sender_id] = 0;
1002 err_bad_ff55[sender_id] = 0;
1003 err_bad_linksize[sender_id] = 0;
1004 err_link_eve_jump[sender_id] = 0;
1005 }
1006 }
1007 }
1008
1009 //
1010 // Checking each channel's header
1011 //
1012 unsigned int ctime = data[ Belle2::RawHeader_latest::POS_TTCTIME_TRGTYPE ] ;
1013 unsigned int utime = data[ Belle2::RawHeader_latest::POS_TTUTIME ] ;
1014
1015 unsigned int crc_init = 0xFFFF ;
1016 unsigned int f_crc[ 4 ] = { ctime, new_evtnum, utime, new_exprun } ;
1017 unsigned int first_crc = 0;
1018
1019 // find number of links
1020 unsigned int cur_pos = 0 ;
1021 int non_crc_counts = 0;
1022 // Check eror flag in ROB header
1023
1024 unsigned int first_b2lctime = 0;
1025 int first_b2lctime_flag = 0;
1026
1027 if (reduced_flag == 1) {
1028 first_crc = get_crc(f_crc, 4, crc_init) ;
1029 non_crc_counts = NON_CRC_COUNTS_REDUCED;
1030 } else {
1031 err_flag_cnt[sender_id]++;
1032 // printf("ERROR flag : Printing a whole event... %u\n", new_evtnum);
1033 // printEventData(data, event_length);
1034 first_crc = crc_init;
1035 non_crc_counts = NON_CRC_COUNTS_NOTREDUCED;
1036 }
1037
1038 int first_eve_flag = 0;
1039 int link_cnt = 0;
1040
1041 //
1042 // Loop over input channels
1043 //
1044 for (int i = 0; i < MAX_PCIE40_CH; i++) {
1045 if (i == 0) first_b2lctime_flag = 0;
1046
1047 int linksize = 0;
1048 if (i < MAX_PCIE40_CH - 1) {
1049 linksize = data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + (i + 1) ]
1050 - data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ];
1051 } else {
1052 linksize = event_length - (data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + (MAX_PCIE40_CH - 1) ] +
1053 Belle2::RawTrailer_latest::RAWTRAILER_NWORDS);
1054 }
1055 if (linksize <= 0) continue;
1056 cur_pos = data[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ] + OFFSET_HDR;
1057
1058 //
1059 // compare valid ch with register value
1060 //
1061 if (valid_ch[link_cnt] != i) {
1062 pthread_mutex_lock(&(mtx_sender_log));
1063 n_messages[ 11 ] = n_messages[ 11 ] + 1 ;
1064 if (n_messages[ 11 ] < max_number_of_messages) {
1065 printf("[FATAL] thread %d : %s ch=%d or %d : ERROR_EVENT : HSLB or PCIe40 channel found in data is ch %d but the next channel must be ch %d according to masking register info. of PCIe40. Please check the status of channel masking. : exp %d run %d sub %d\n",
1066 sender_id,
1067 hostnamebuf, i, valid_ch[link_cnt],
1068 i, valid_ch[link_cnt],
1069 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1070 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1071 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
1072 );
1073 printEventData(data, event_length, sender_id);
1074 }
1075 err_bad_linknum[sender_id]++;
1076 pthread_mutex_unlock(&(mtx_sender_log));
1077#ifndef NO_ERROR_STOP
1078 exit(1);
1079#endif
1080 }
1081
1082 //
1083 // Check FFAA value
1084 //
1085 if ((data[ cur_pos + ffaa_pos ] & 0xFFFF0000) != 0xFFAA0000) {
1086 pthread_mutex_lock(&(mtx_sender_log));
1087 n_messages[ 12 ] = n_messages[ 12 ] + 1 ;
1088 if (n_messages[ 12 ] < max_number_of_messages) {
1089 char err_buf[500];
1090 sprintf(err_buf,
1091 "[FATAL] thread %d : %s ch=%d : ERROR_EVENT : HSLB or PCIe40 header magic word(0xffaa) is invalid. header %.8x : exp %d run %d sub %d : %s %s %d\n",
1092 sender_id,
1093 hostnamebuf, i,
1094 data[ cur_pos + ffaa_pos ],
1095 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1096 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1097 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1098 __FILE__, __PRETTY_FUNCTION__, __LINE__);
1099 printf("%s\n", err_buf); fflush(stdout);
1100 printLine(data, cur_pos + ffaa_pos);
1101 printEventData(data, event_length, sender_id);
1102 }
1103 err_bad_ffaa[sender_id]++;
1104 pthread_mutex_unlock(&(mtx_sender_log));
1105#ifndef NO_ERROR_STOP
1106 exit(1);
1107#endif
1108 }
1109
1110 //
1111 // b2link time check ( Only thread == 0 )
1112 //
1113 if (new_evtnum % 1000000 == 1000) {
1114 if (reduced_flag == 1) {
1115 time_t timer;
1116 struct tm* t_st;
1117 time(&timer);
1118 t_st = localtime(&timer);
1119 char timeStr[100];
1120 std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%d %H:%M:%S\n", t_st);
1121
1122 if (first_b2lctime_flag == 0) {
1123 first_b2lctime = data[ cur_pos +
1124 Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
1125 Belle2::PostRawCOPPERFormat_latest::POS_B2L_CTIME ];
1126 first_b2lctime_flag = 1;
1127 }
1128 pthread_mutex_lock(&(mtx_sender_log));
1129 printf("[DEBUG] thread %d : eve %u ch %3d B2Lctime 0x%.8x diff %.2lf [us] : exp %d run %d sub %d : %s",
1130 sender_id, new_evtnum, i,
1131 data[ cur_pos +
1132 Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
1133 Belle2::PostRawCOPPERFormat_latest::POS_B2L_CTIME ],
1134 ((int)(data[ cur_pos +
1135 Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
1136 Belle2::PostRawCOPPERFormat_latest::POS_B2L_CTIME ]
1137 - first_b2lctime)) / 127.22,
1138 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1139 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1140 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1141 timeStr);
1142 pthread_mutex_unlock(&(mtx_sender_log));
1143 }
1144 }
1145
1146 // event # jump
1147 if (first_eve_flag == 0) {
1148 first_eve_flag = 1;
1149 }
1150
1151 //
1152 // Check event number in ffaa header
1153 //
1154 unsigned int eve_link_8bits = data[ cur_pos + ffaa_pos ] & 0x000000ff;
1155 if ((new_evtnum & 0x000000FF) != eve_link_8bits) {
1156 pthread_mutex_lock(&(mtx_sender_log));
1157 err_link_eve_jump[sender_id]++;
1158 if (err_link_eve_jump[sender_id] < max_number_of_messages) {
1159 char err_buf[500] = {0};
1160 sprintf(err_buf,
1161 "[FATAL] thread %d : %s ch=%d : ERROR_EVENT : Invalid event_number (= lower 8bits in ffaa header -> 0x%.2x). Exiting...: eve 0x%.8x ffaa header 0x%.8x : exp %d run %d sub %d : %s %s %d",
1162 sender_id,
1163 hostnamebuf, i,
1164 data[ cur_pos + ffaa_pos ] & 0xff, new_evtnum, data[ cur_pos + ffaa_pos ],
1165 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1166 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1167 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1168 __FILE__, __PRETTY_FUNCTION__, __LINE__);
1169 printf("%s\n", err_buf); fflush(stdout);
1170 printEventData(data, event_length, sender_id);
1171 }
1172 pthread_mutex_unlock(&(mtx_sender_log));
1173#ifndef NO_ERROR_STOP
1174 exit(1);
1175#endif
1176 }
1177
1178 //
1179 // Check channel number in ffaa header
1180 //
1181 unsigned int ch_ffaa = (data[ cur_pos + ffaa_pos ] >> 8) & 0x000000ff;
1182 if ((unsigned int)i != ch_ffaa) {
1183 pthread_mutex_lock(&(mtx_sender_log));
1184 printf("[FATAL] thread %d : %s ch=%d : ERROR_EVENT : HSLB or PCIe40 channel-number is differnt. It should be ch %d in the channel table in the ROB header buf ffaa header info says ch is %u (%.8x). : exp %d run %d sub %d : %s %s %d\n",
1185 sender_id, hostnamebuf, i,
1186 i, (data[ cur_pos + ffaa_pos ] >> 8) & 0xff,
1187 data[ cur_pos + ffaa_pos ],
1188 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1189 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1190 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1191 __FILE__, __PRETTY_FUNCTION__, __LINE__);
1192 printEventData(data, event_length, sender_id);
1193 pthread_mutex_unlock(&(mtx_sender_log));
1194#ifndef NO_ERROR_STOP
1195 exit(1);
1196#endif
1197 }
1198
1199
1200#ifdef SPLIT_ECL_ECLTRG
1201 //
1202 // Check ECLTRG FEE is connected to a proper channel
1203 //
1204 unsigned int ecl_ecltrg_1stword = 0;
1205 if (reduced_flag == 0) {
1206 ecl_ecltrg_1stword = data[ cur_pos + ffaa_pos +
1207 Belle2::PreRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
1208 Belle2::PreRawCOPPERFormat_latest::SIZE_B2LFEE_HEADER ];
1209 } else {
1210 ecl_ecltrg_1stword = data[ cur_pos + ffaa_pos +
1211 Belle2::PostRawCOPPERFormat_latest::SIZE_B2LHSLB_HEADER +
1212 Belle2::PostRawCOPPERFormat_latest::SIZE_B2LFEE_HEADER ];
1213 }
1214
1215 if (((ecl_ecltrg_1stword & 0xffff0000) >> 16) == 0) {
1216 // ECL data
1217 for (int j = 0; j < splitted_ch.size(); j++) {
1218 if (splitted_ch[j] == i) {
1219 pthread_mutex_lock(&(mtx_sender_log));
1220 printf("[FATAL] thread %d : %s ch=%d : ECL data(1st word = %.8x , eve = %.8x ) are detected in ECLTRG channel. Maybe, fiber connection mismatch. Exiting... : exp %d run %d sub %d : %s %s %d\n",
1221 sender_id,
1222 hostnamebuf, i,
1223 ecl_ecltrg_1stword,
1224 new_evtnum,
1225 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1226 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1227 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1228 __FILE__, __PRETTY_FUNCTION__, __LINE__);
1229 printEventData(data, event_length, sender_id);
1230 pthread_mutex_unlock(&(mtx_sender_log));
1231 exit(1);
1232 }
1233 }
1234 } else {
1235 // ECLTRG data
1236 int ecltrg_flag = 0;
1237 for (int j = 0; j < splitted_ch.size(); j++) {
1238 if (splitted_ch[j] == i) {
1239 ecltrg_flag = 1;
1240 break;
1241 }
1242 }
1243
1244 if (ecltrg_flag == 0) {
1245 pthread_mutex_lock(&(mtx_sender_log));
1246 printf("[FATAL] thread %d : %s ch=%d : ECLTRG data(1st word = %.8x , eve = %.8x ) are detected in ECL channel. Maybe, fiber connection mismatch. Exiting... : exp %d run %d sub %d : %s %s %d\n",
1247 sender_id,
1248 hostnamebuf, i,
1249 ecl_ecltrg_1stword,
1250 new_evtnum,
1251 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1252 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1253 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1254 __FILE__, __PRETTY_FUNCTION__, __LINE__);
1255 printEventData(data, event_length, sender_id);
1256 pthread_mutex_unlock(&(mtx_sender_log));
1257 exit(1);
1258 }
1259 }
1260#endif // SPLIT_ECL_ECLTRG
1261
1262
1263 //
1264 // Check if the current position exceeds the event end
1265 //
1266 if (cur_pos + linksize > event_nwords - Belle2::RawTrailer_latest::RAWTRAILER_NWORDS) {
1267 pthread_mutex_lock(&(mtx_sender_log));
1268 n_messages[ 13 ] = n_messages[ 13 ] + 1 ;
1269 if (n_messages[ 13 ] < max_number_of_messages) {
1270 printf("[FATAL] thread %d : %s ch=%d : ERROR_EVENT : The end position ( %u words ) of this channel data exceeds event size( %u words ). Exiting... : exp %d run %d sub %d : %s %s %d\n",
1271 sender_id,
1272 hostnamebuf, i,
1273 (cur_pos + linksize), event_nwords,
1274 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1275 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1276 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1277 __FILE__, __PRETTY_FUNCTION__, __LINE__);
1278
1279 }
1280 printEventData(data, event_length, sender_id);
1281 err_bad_linksize[sender_id]++;
1282 pthread_mutex_unlock(&(mtx_sender_log));
1283#ifndef NO_ERROR_STOP
1284 exit(1);
1285#endif
1286 }
1287
1288 //
1289 // Check FF55 value
1290 //
1291 if (((data[ cur_pos + linksize + ff55_pos_from_end ]) & 0xFFFF0000) != 0xFF550000) {
1292 pthread_mutex_lock(&(mtx_sender_log));
1293 n_messages[ 14 ] = n_messages[ 14 ] + 1 ;
1294 if (n_messages[ 14 ] < max_number_of_messages) {
1295 char err_buf[500];
1296 sprintf(err_buf,
1297 "[FATAL] thread %d : %s ch=%d : ERROR_EVENT : HSLB or PCIe40 trailer magic word(0xff55) is invalid. foooter %.8x (pos.=0x%.x) : exp %d run %d sub %d : %s %s %d",
1298 sender_id,
1299 hostnamebuf, i,
1300 data[ cur_pos + linksize + ff55_pos_from_end ], cur_pos + linksize + ff55_pos_from_end,
1301 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1302 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1303 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1304 __FILE__, __PRETTY_FUNCTION__, __LINE__);
1305 printf("%s\n", err_buf); fflush(stdout);
1306 printEventData(data, event_length + 16, sender_id);
1307 }
1308 err_bad_ff55[sender_id]++;
1309 pthread_mutex_unlock(&(mtx_sender_log));
1310#ifndef NO_ERROR_STOP
1311 exit(1);
1312#endif
1313 }
1314
1315
1316
1317 //
1318 // CRC check
1319 //
1320 unsigned int crc_data = data[ cur_pos + linksize - 2 ] & 0xFFFF ;
1321 int size = linksize - non_crc_counts;
1322 unsigned int value = crc_data;
1323 unsigned int* data_for_crc = data + cur_pos + CRC_START_POS;
1324#ifdef CRC_CHECK
1325 if (get_crc(data_for_crc, size, first_crc) != value) {
1326 pthread_mutex_lock(&(mtx_sender_log));
1327 // Currently, zero-torellance for a CRC error.
1328 // if (crc_err_ch[sender_id][i] == 0) {
1329 printf("[FATAL] thread %d : %s ch=%d : ERROR_EVENT : PRE CRC16 error or POST B2link event CRC16 error. data(%x) calc(%x) : eve %u exp %d run %d sub %d : %s %s %d\n",
1330 sender_id,
1331 hostnamebuf, i,
1332 value, get_crc(data_for_crc, size, first_crc),
1333 new_evtnum,
1334 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1335 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1336 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1337 __FILE__, __PRETTY_FUNCTION__, __LINE__);
1338 printEventData(data, event_length, sender_id);
1339 // }
1340
1341 crc_err_ch[sender_id][i]++;
1342 total_crc_errors[sender_id]++;
1343 pthread_mutex_unlock(&(mtx_sender_log));
1344#ifndef NO_ERROR_STOP
1345 exit(1);
1346#endif
1347 } else {
1348 total_crc_good[sender_id]++ ;
1349 }
1350#endif // CRC_CHECK
1351
1352
1353 //
1354 // Monitoring CRC check status
1355 //
1356 if (new_evtnum % 1000000 == 0) {
1357 // if (total_crc_good[sdr_id] % (1000000 + sdr_id) == 0) {
1358 pthread_mutex_lock(&(mtx_sender_log));
1359 printf("[DEBUG] thread %d : CRC Good calc %.4X data %.4X eve %u ch %d crcOK %u crcNG %d errflag %u : exp %d run %d sub %d\n",
1360 sender_id,
1361 get_crc(data_for_crc, size, first_crc),
1362 value, new_evtnum, i, total_crc_good[sender_id], total_crc_errors[sender_id], err_flag_cnt[sender_id],
1363 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1364 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1365 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK)
1366 ) ;
1367 int temp_err_cnt = 0;
1368
1369 for (int j = 0; j < MAX_PCIE40_CH; j++) {
1370 if (crc_err_ch[sender_id][j] > 0) {
1371 if (temp_err_cnt == 0) {
1372 printf("[DEBUG] thread %d : crc_err_cnt : ", sender_id);
1373 temp_err_cnt = 1;
1374 }
1375 printf("ch %d %u : ", j, crc_err_ch[sender_id][j]);
1376 }
1377 }
1378 if (temp_err_cnt != 0) {
1379 printf("\n");
1380 }
1381 fflush(stdout);
1382 pthread_mutex_unlock(&(mtx_sender_log));
1383 }
1384
1385 //
1386 // Check the end of the event
1387 //
1388 link_cnt++;
1389 cur_pos = cur_pos + linksize ;
1390 if (((data[ cur_pos ] & 0xFFFF0000) == 0x7FFF0000)) break ;
1391
1392 }
1393
1394 //
1395 // Check if the current position exceeds the event end
1396 //
1397 if (cur_pos != event_nwords - Belle2::RawTrailer_latest::RAWTRAILER_NWORDS) {
1398 pthread_mutex_lock(&(mtx_sender_log));
1399 printf("[FATAL] thread %d : %s : ERROR_EVENT : The end position of channel data( %u-th word ) does not coincide with the start of RawTrailer( %d-th word ). Exiting... : exp %d run %d sub %d : %s %s %d\n",
1400 sender_id,
1401 hostnamebuf,
1402 cur_pos, event_nwords - Belle2::RawTrailer_latest::RAWTRAILER_NWORDS,
1403 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1404 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1405 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1406 __FILE__, __PRETTY_FUNCTION__, __LINE__);
1407 printEventData(data, event_length, sender_id);
1408 pthread_mutex_unlock(&(mtx_sender_log));
1409#ifndef NO_ERROR_STOP
1410 exit(1);
1411#endif
1412 }
1413
1414
1415 //
1416 // Check the consistency of number of input links
1417 //
1418 if (link_cnt != expected_number_of_links) {
1419 pthread_mutex_lock(&(mtx_sender_log));
1420 printf("[FATAL] thread %d : %s : ERROR_EVENT : # of links(%d) in data is not the same as exptected(=%d). : Exiting... : exp %d run %d sub %d : %s %s %d\n",
1421 sender_id,
1422 hostnamebuf,
1423 link_cnt, expected_number_of_links,
1424 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1425 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1426 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1427 __FILE__, __PRETTY_FUNCTION__, __LINE__);
1428
1429 printEventData(data, event_length, sender_id);
1430 pthread_mutex_unlock(&(mtx_sender_log));
1431#ifndef NO_ERROR_STOP
1432 exit(1);
1433#endif
1434 }
1435
1436
1437 //
1438 // Printing the 1st event
1439 //
1440 if (new_evtnum == 0) {
1441 pthread_mutex_lock(&(mtx_sender_log));
1442 printf("[DEBUG] thread %d : Printing the 1st event.\n", sender_id);
1443 printEventData(data, event_length, sender_id);
1444 pthread_mutex_unlock(&(mtx_sender_log));
1445 }
1446
1447
1448
1449 //
1450 // Check unreduced header consistency
1451 //
1452 int ret = DATACHECK_OK;
1453 if (reduced_flag == 0) {
1454 checkUtimeCtimeTRGType(data, sender_id);
1455 pthread_mutex_lock(&(mtx_sender_log));
1456 if (err_not_reduced[sender_id] < max_number_of_messages) {
1457 printf("[WARNING] thread %d : %s ch=%d : ERROR_EVENT : Error-flag was set by the data-check module in PCIe40 FPGA. : eve %u prev thr eve %u : exp %d run %d sub %d : %s %s %d\n",
1458 sender_id,
1459 hostnamebuf, -1, new_evtnum, evtnum,
1460 (new_exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
1461 (new_exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
1462 (new_exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
1463 __FILE__, __PRETTY_FUNCTION__, __LINE__);
1464 printEventData(data, event_length, sender_id);
1465 }
1466 err_not_reduced[sender_id]++;
1467 pthread_mutex_unlock(&(mtx_sender_log));
1468 // exit(1); // zero-torellance policy
1469 ret = DATACHECK_OK_BUT_ERRFLAG_IN_HDR;
1470 }
1471
1472 evtnum = data[EVENUM_POS];
1473 return ret;
1474}
1475
1476void checkEventGenerator(unsigned int* data, int i, unsigned int size)
1477{
1478 if (data == 0) {
1479 pthread_mutex_lock(&(mtx_sender_log));
1480 printf("No data\n") ;
1481 pthread_mutex_unlock(&(mtx_sender_log));
1482 return ;
1483 }
1484
1485 if (i != getEventNumber(data)) {
1486 pthread_mutex_lock(&(mtx_sender_log));
1487 printf("Event number mismatch %d %d\n",
1488 getEventNumber(data), i) ;
1489 pthread_mutex_unlock(&(mtx_sender_log));
1490 }
1491 // Check header
1492 // if ( ( data[7] != 0 ) || ( data[6] != 0 ) || ( data[5] != 0 ) || ( data[3] != 0 ) ) {
1493 if ((data[7] != 0) || (data[6] != 0)) {
1494 pthread_mutex_lock(&(mtx_sender_log));
1495 printf("Bad header 3 %.8x %.8x\n", data[7], data[6]) ;
1496 printHeader(data) ;
1497 pthread_mutex_unlock(&(mtx_sender_log));
1498 } else if ((data[ 0 ] & 0xFFFF) != size) {
1499 pthread_mutex_lock(&(mtx_sender_log));
1500 printf("Bad size %u %u\n", data[0] & 0xFFFF, size) ;
1501 printLine(data, EVENT_LEN_POS);
1502 pthread_mutex_unlock(&(mtx_sender_log));
1503 } else if (((data[ 2 ] & 0xFFFF0000) >> 16) != (size * 32)) {
1504 pthread_mutex_lock(&(mtx_sender_log));
1505 printf("Bad word size %u %u\n", (data[ 2 ] & 0xFFFF0000) >> 16, size * 32) ;
1506 printHeader(data) ;
1507 pthread_mutex_unlock(&(mtx_sender_log));
1508 } else if (((data[ 0 ] & 0xFFFF0000) != 0xEEEE0000) ||
1509 (data[ 1 ] != 0xAAAAEEEE) ||
1510 ((data[ 2 ] & 0xFFFF) != 0xAAAA)) {
1511 pthread_mutex_lock(&(mtx_sender_log));
1512 printf("Bad header 4\n") ;
1513 printHeader(data) ;
1514 printEventData(data, size);
1515 pthread_mutex_unlock(&(mtx_sender_log));
1516 }
1517 // Check trailer
1518 if (data[ 8 * (size - 1) ] != size) {
1519 pthread_mutex_lock(&(mtx_sender_log));
1520 printf("Bad size in trailer %.8x %.8x\n", data[8 * (size - 1)], size) ;
1521 printLine(data, 8 * (size - 1));
1522 pthread_mutex_unlock(&(mtx_sender_log));
1523 } else if ((data[ 8 * (size - 1) + 1 ] != 0) || (data[ 8 * (size - 1) + 2 ] != 0) ||
1524 (data[ 8 * (size - 1) + 3 ] != 0) || (data[ 8 * (size - 1) + 4 ] != 0) ||
1525 (data[ 8 * (size - 1) + 5 ] != 0) || (data[ 8 * (size - 1) + 6 ] != 0) ||
1526 (data[ 8 * (size - 1) + 7 ] != 0)) {
1527 pthread_mutex_lock(&(mtx_sender_log));
1528 printf("Bad trailer\n") ;
1529 printTrailer(&data[ 8 * (size - 1) ]) ;
1530 pthread_mutex_unlock(&(mtx_sender_log));
1531 }
1532 // Check data
1533 for (unsigned int j = 1 ; j < (size - 1) ; ++j) {
1534 if (data[ 8 * j ] != j) {
1535 pthread_mutex_lock(&(mtx_sender_log));
1536 printf("Bad data number %u %u\n", data[8 * j], j) ;
1537 pthread_mutex_unlock(&(mtx_sender_log));
1538 } else if (data[8 * j + 1] != 0) {
1539 pthread_mutex_lock(&(mtx_sender_log));
1540 printf("Bad data\n") ;
1541 printData(&data[8 * j]) ;
1542 pthread_mutex_unlock(&(mtx_sender_log));
1543 } else if ((data[8 * j + 2] != 0xFFFFFFFF) || (data[8 * j + 3] != 0xEEEEEEEE) ||
1544 (data[8 * j + 4] != 0xDDDDDDDD) || (data[8 * j + 5] != 0xCCCCCCCC) ||
1545 (data[8 * j + 6] != 0xBBBBBBBB) || (data[8 * j + 7] != 0xAAAAAAAA)) {
1546 pthread_mutex_lock(&(mtx_sender_log));
1547 printf("Bad data\n") ;
1548 printData(&data[8 * j]) ;
1549 pthread_mutex_unlock(&(mtx_sender_log));
1550 }
1551 }
1552}
1553
1554
1555unsigned short CalcCRC16LittleEndian(unsigned short crc16, const int buf[], int nwords)
1556{
1557
1558 if (nwords < 0) {
1559
1560 char err_buf[500];
1561 pthread_mutex_lock(&(mtx_sender_log));
1562 sprintf(err_buf, "nwords value(%d) is invalid. Cannot calculate CRC16. Exiting...\n %s %s %d\n",
1563 nwords, __FILE__, __PRETTY_FUNCTION__, __LINE__);
1564 printf("%s", err_buf); fflush(stdout);
1565 pthread_mutex_unlock(&(mtx_sender_log));
1566 string err_str = err_buf;
1567 throw (err_str);
1568 }
1569
1570 const unsigned short CRC16Table0x1021[ 256 ] = {
1571 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
1572 0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
1573 0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
1574 0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
1575 0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
1576 0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
1577 0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
1578 0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
1579
1580 0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
1581 0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
1582 0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
1583 0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
1584 0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
1585 0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
1586 0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
1587 0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
1588
1589 0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
1590 0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
1591 0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
1592 0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
1593 0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
1594 0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
1595 0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
1596 0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
1597
1598 0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
1599 0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
1600 0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
1601 0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
1602 0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
1603 0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
1604 0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
1605 0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0
1606 };
1607
1608 int cnt = 0, nints = 0;
1609 // printf("### %.8x %.4x\n", buf[ 0 ], crc16);
1610 while (nwords != 0) {
1611
1612 unsigned char temp_buf = *((unsigned char*)(buf + nints) + (-(cnt % 4) + 3));
1613 crc16 = CRC16Table0x1021[(crc16 >> (16 - CHAR_BIT)) ^ temp_buf ] ^ (crc16 << CHAR_BIT);
1614 // printf("%.2x %.4x\n", temp_buf, crc16);
1615 if ((cnt % 4) == 3) {
1616 nwords--;
1617 nints++;
1618 // printf("### %.8x\n", buf[ nints ] );
1619 }
1620
1621 cnt++;
1622 }
1623
1624
1625 return crc16;
1626
1627}
1628
1629int fillDataContents(int* buf, int nwords_per_fee, unsigned int node_id, int ncpr, int nhslb, int run)
1630{
1631 int nwords = NW_SEND_HEADER + NW_SEND_TRAILER +
1632 ncpr * (NW_RAW_HEADER +
1633 (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
1634 + NW_RAW_TRAILER);
1635
1636 // Send Header
1637 int offset = 0;
1638 buf[ offset + 0 ] = nwords;
1639 buf[ offset + 1 ] = 6;
1640 buf[ offset + 2 ] = (1 << 16) | ncpr;
1641 unsigned int exp_run = run << 8;
1642 buf[ offset + 3 ] = exp_run;
1643 buf[ offset + 5 ] = node_id;
1644 offset += NW_SEND_HEADER;
1645
1646 for (int k = 0; k < ncpr; k++) {
1647 //
1648 // RawHeader
1649 //
1650 int cpr_nwords = NW_RAW_HEADER +
1651 (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
1652 + NW_RAW_TRAILER;
1653 unsigned int ctime = CTIME_VAL;
1654 unsigned int utime = 0x98765432;
1655
1656 buf[ offset + 0 ] = cpr_nwords;
1657#ifdef DUMMY_REDUCED_DATA
1658 buf[ offset + 1 ] = 0x7f7f020c;
1659#else
1660 buf[ offset + 1 ] = 0x7f7f820c;
1661#endif
1662 buf[ offset + 2 ] = exp_run;
1663 printf("run_no %u\n", exp_run); fflush(stdout);
1664 buf[ offset + 4 ] = ctime;
1665 buf[ offset + 5 ] = utime;
1666 buf[ offset + 6 ] = node_id + k;
1667 buf[ offset + 7 ] = 0x34567890;
1668 offset += NW_RAW_HEADER;
1669
1670 for (int i = 0; i < nhslb ; i++) {
1671#ifdef DUMMY_REDUCED_DATA
1672 buf[ offset + 0 ] = nwords_per_fee + 3;
1673 buf[ offset + 1 ] = 0xffaa0000;
1674 buf[ offset + 2 ] = ctime;
1675#else
1676 buf[ offset + 0 ] = nwords_per_fee + 7;
1677 buf[ offset + 1 ] = 0xffaa0000;
1678 buf[ offset + 3 ] = ctime;
1679 buf[ offset + 4 ] = utime;
1680 buf[ offset + 5 ] = exp_run;
1681 buf[ offset + 6 ] = ctime;
1682#endif
1683 offset += NW_B2L_HEADER;
1684
1685 for (int j = offset; j < offset + nwords_per_fee; j++) {
1686 buf[ j ] = rand();
1687 }
1688 offset += nwords_per_fee;
1689
1690#ifdef DUMMY_REDUCED_DATA
1691 buf[ offset ] = 0;
1692 buf[ offset + 1 ] = 0xff550000;
1693#else
1694 buf[ offset ] = ctime;
1695 buf[ offset + 1 ] = 0;
1696 buf[ offset + 2 ] = 0xff550000;
1697#endif
1698
1699 offset += NW_B2L_TRAILER;
1700 }
1701 buf[ offset ] = 0x0; // error bits
1702 buf[ offset + 1 ] = 0x0; // error slots
1703 buf[ offset + 2 ] = 0x0; // XOR checksum
1704 buf[ offset + 3 ] = 0x7fff0006;
1705 offset += NW_RAW_TRAILER;
1706 }
1707
1708 // Send trailer
1709 buf[ offset ] = 0;
1710 buf[ offset + 1 ] = 0x7fff0000;
1711 offset += NW_SEND_TRAILER;
1712 return offset;
1713}
1714
1715void split_Ecltrg(int sender_id, unsigned int* data, std::vector< int > valid_ch,
1716 unsigned int* data_main, unsigned int* data_splitted,
1717 int& event_nwords_main, int& event_nwords_splitted,
1718 unsigned int splitted_node_id, std::vector< int > splitted_ch)
1719{
1720 unsigned int event_length = data[ Belle2::RawHeader_latest::POS_NWORDS ];
1721 // pthread_mutex_lock(&(mtx_sender_log));
1722 // printf("[DEBUG] Before splitting : sdrid %d. Exiting...\n",
1723 // sender_id);
1724 // printEventData(data, (event_length & 0xfffff));
1725 // pthread_mutex_unlock(&(mtx_sender_log));
1726 // Check event size
1727
1728 if (event_length > 0x100000) {
1729 pthread_mutex_lock(&(mtx_sender_log));
1730 printf("[FATAL] Too large event size. : sdrid %d : 0x%.8x : %u words. Exiting...\n", sender_id, data[ EVENT_LEN_POS ],
1731 data[ EVENT_LEN_POS ]);
1732 printEventData(data, (event_length & 0xfffff));
1733 pthread_mutex_unlock(&(mtx_sender_log));
1734 exit(1);
1735 } else if (event_length == 0) {
1736 pthread_mutex_lock(&(mtx_sender_log));
1737 printf("[FATAL] Specified event size is zero. : 0x%.8x : %u words. Exiting...\n",
1738 data[ EVENT_LEN_POS ], event_length);
1739 printEventData(data, 24);
1740 pthread_mutex_unlock(&(mtx_sender_log));
1741 exit(1);
1742 }
1743
1744 // Check magic word
1745 if ((data[ MAGIC_7F7F_POS ] & 0xFFFF0000) != 0x7F7F0000) {
1746 pthread_mutex_lock(&(mtx_sender_log));
1747 n_messages[ 7 ] = n_messages[ 7 ] + 1 ;
1748 if (n_messages[ 7 ] < max_number_of_messages) {
1749 printf("Bad code 7F7F ( 0x%.8x )\n", data[ MAGIC_7F7F_POS ]) ;
1750 // printLine(data, MAGIC_7F7F_POS);
1751 printEventData(data, event_length);
1752 }
1753 err_bad_7f7f[sender_id]++;
1754 pthread_mutex_unlock(&(mtx_sender_log));
1755#ifndef NO_ERROR_STOP
1756 exit(1);
1757#endif
1758 // return 1 ;
1759 }
1760
1761
1762 // Copy RawHeader
1763 memcpy(data_main, data, Belle2::RawHeader_latest::POS_CH_POS_TABLE * sizeof(unsigned int));
1764 memcpy(data_splitted, data, Belle2::RawHeader_latest::POS_CH_POS_TABLE * sizeof(unsigned int));
1765 data_splitted[ Belle2::RawHeader_latest::POS_NODE_ID ] = splitted_node_id;
1766
1767 int cur_pos = 0;
1768 int cur_ch_main = 0;
1769 int prev_ch_main = -1;
1770 int cur_ch_splitted = 0;
1771 int prev_ch_splitted = -1;
1772 int cur_pos_main = Belle2::RawHeader_latest::RAWHEADER_NWORDS;
1773 int cur_pos_splitted = Belle2::RawHeader_latest::RAWHEADER_NWORDS;
1774 int link_cnt = 0;
1775
1776 int cnt_main = 0;
1777 int cnt_splitted = 0;
1778
1779 for (int i = 0; i < MAX_PCIE40_CH; i++) {
1780 // Calculate linksize
1781 int linksize = 0;
1782 if (i < 47) {
1783 linksize = data[ POS_TABLE_POS + (i + 1) ] - data[ POS_TABLE_POS + i ];
1784 } else {
1785 linksize = event_length - (data[ POS_TABLE_POS + 47 ] + LEN_ROB_TRAILER);
1786 }
1787 if (linksize <= 0) continue;
1788 cur_pos = data[ POS_TABLE_POS + i ] + OFFSET_HDR;
1789
1790 // compare valid ch with register value
1791 if (valid_ch[link_cnt] != i) {
1792 pthread_mutex_lock(&(mtx_sender_log));
1793 n_messages[ 11 ] = n_messages[ 11 ] + 1 ;
1794 if (n_messages[ 11 ] < max_number_of_messages) {
1795 printf("[FATAL] A valid ch in data(=%d) is not equal to regeister value(%d) for masking\n", i, valid_ch[link_cnt]) ;
1796 printEventData(data, event_length);
1797 }
1798 err_bad_linknum[sender_id]++;
1799 pthread_mutex_unlock(&(mtx_sender_log));
1800#ifndef NO_ERROR_STOP
1801 exit(1);
1802#endif
1803 }
1804
1805 // Check main ch or splitted ch
1806 int splitted_ch_flag = 0;
1807 for (int j = 0; j < splitted_ch.size(); j++) {
1808 if (splitted_ch[j] == i) {
1809 splitted_ch_flag = 1;
1810 break;
1811 }
1812 }
1813
1814 // Filling pos-table
1815 if (splitted_ch_flag == 0) {
1816 data_main[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ] = cur_pos_main;
1817 for (int j = prev_ch_main + 1; j < i; j++) {
1818 data_main[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + j ] = cur_pos_main;
1819 }
1820 memcpy(data_main + cur_pos_main, data + cur_pos, linksize * sizeof(unsigned int));
1821 cur_pos_main += linksize;
1822 prev_ch_main = i;
1823 cnt_main++;
1824 } else {
1825 data_splitted[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ] = cur_pos_splitted;
1826 for (int j = prev_ch_splitted + 1; j < i; j++) {
1827 data_splitted[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + j ] = cur_pos_splitted;
1828 }
1829 memcpy(data_splitted + cur_pos_splitted, data + cur_pos, linksize * sizeof(unsigned int));
1830 cur_pos_splitted += linksize;
1831 prev_ch_splitted = i;
1832 cnt_splitted++;
1833 }
1834 link_cnt++;
1835 }
1836
1837 if (cnt_main == 0 || cnt_splitted == 0) {
1838 pthread_mutex_lock(&(mtx_sender_log));
1839 printf("[FATAL] No channels for ECL(# of used ch = %d) or ECLTRG(# of used ch = %d) data. Exiting...\n",
1840 cnt_main, cnt_splitted);
1841 pthread_mutex_unlock(&(mtx_sender_log));
1842 // exit(1);
1843 }
1844
1845 // Fill remaining position table
1846 for (int i = prev_ch_main + 1; i < MAX_PCIE40_CH; i++) {
1847 data_main[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ] = cur_pos_main;
1848 }
1849 for (int i = prev_ch_splitted + 1; i < MAX_PCIE40_CH; i++) {
1850 data_splitted[ Belle2::RawHeader_latest::POS_CH_POS_TABLE + i ] = cur_pos_splitted;
1851 }
1852
1853 // Calcurate each event-length
1854 unsigned int eve_size_main = cur_pos_main + Belle2::RawTrailer_latest::RAWTRAILER_NWORDS;
1855 unsigned int eve_size_splitted = cur_pos_splitted + Belle2::RawTrailer_latest::RAWTRAILER_NWORDS;
1856 data_main[ Belle2::RawHeader_latest::POS_NWORDS ] = eve_size_main;
1857 data_splitted[ Belle2::RawHeader_latest::POS_NWORDS ] = eve_size_splitted;
1858 event_nwords_main = eve_size_main;
1859 event_nwords_splitted = eve_size_splitted;
1860
1861 // Copy RawTrailer (Currently 00000000 00000000 00000000 7fff0006. So, just copy the 4 words.)
1862 memcpy(data_main + cur_pos_main, data + event_length - Belle2::RawTrailer_latest::RAWTRAILER_NWORDS,
1863 Belle2::RawTrailer_latest::RAWTRAILER_NWORDS * sizeof(unsigned int));
1864 memcpy(data_splitted + cur_pos_splitted, data + event_length - Belle2::RawTrailer_latest::RAWTRAILER_NWORDS,
1865 Belle2::RawTrailer_latest::RAWTRAILER_NWORDS * sizeof(unsigned int));
1866
1867 // Copy back to data buffer
1868 memcpy(data, data_main, eve_size_main * sizeof(unsigned int));
1869 memcpy(data + eve_size_main, data_splitted, eve_size_splitted * sizeof(unsigned int));
1870
1871 // pthread_mutex_lock(&(mtx_sender_log));
1872 // printf("[DEBUG]Splitted data sender %d\n",
1873 // sender_id);
1874 // printEventData(data, eve_size_main + eve_size_splitted);
1875 // printf("[DEBUG]main data sender %d\n",
1876 // sender_id);
1877 // printEventData(data_main, eve_size_main);
1878 // printf("[DEBUG]split data sender %d\n",
1879 // sender_id);
1880 // printEventData(data_splitted, eve_size_splitted);
1881 // pthread_mutex_unlock(&(mtx_sender_log));
1882
1883 return;
1884}
1885
1886
1887//int sender_id, int run_no, int nwords_per_fee, int ncpr, int nhslb, std::vector< int > valid_ch)
1888void* sender(void* arg)
1889{
1890 //
1891 // Get arguments
1892 //
1893 sender_argv* snd_arg = (sender_argv*)arg;
1894 int sender_id = snd_arg->sender_id;
1895 unsigned int node_id = snd_arg->node_id;
1896 vector<int> valid_ch = snd_arg->valid_ch;
1897
1898 //
1899 // data
1900 //
1901 int total_words = 0;
1902 unsigned int* buff = new unsigned int[MAX_EVENT_WORDS];
1903
1904#ifdef SPLIT_ECL_ECLTRG
1905 vector<int> valid_main_ch;
1906 vector<int> valid_splitted_ch;
1907 unsigned int* buff_main = new unsigned int[MAX_EVENT_WORDS];
1908 unsigned int* buff_splitted = new unsigned int[MAX_EVENT_WORDS];
1909
1910 int split_main_use = 0; // some unmasked channels for ECL
1911 int split_sub_use = 0; // some unmasked channels for ECLTRG
1912
1913 // Prepare valid_main table
1914 for (int k = 0; k < valid_ch.size(); k++) {
1915 int splitted_ch_flag = 0;
1916 for (int l = 0; l < splitted_ch.size(); l++) {
1917 if (splitted_ch[l] == valid_ch[k]) {
1918 splitted_ch_flag = 1;
1919 break;
1920 }
1921 }
1922 if (splitted_ch_flag == 0) {
1923 valid_main_ch.push_back(valid_ch[k]);
1924 split_main_use = 1;
1925 } else {
1926 valid_splitted_ch.push_back(valid_ch[k]);
1927 split_sub_use = 1;
1928 }
1929 }
1930
1931 if (split_main_use == 0 && split_sub_use == 0) {
1932 pthread_mutex_lock(&(mtx_sender_log));
1933 printf("[FATAL] thread %d : No channels are used for this PCIe40 board (ECL/ECLTRG) in %s. Please mask this readout PC with runcontrol GUI (or exclude sub-system if this is the only readout PC of the sub-system). Exiting..\n",
1934 sender_id, hostnamebuf);
1935 fflush(stdout);
1936 pthread_mutex_unlock(&(mtx_sender_log));
1937 exit(1);
1938 }
1939#endif // SPLIT_ECL_ECLTRG
1940
1941
1942 //
1943 // network connection
1944 //
1945 int port_to = 31000 + sender_id + 1;
1946
1947#ifndef NOT_SEND
1948 //
1949 // Bind and listen
1950 //
1951 int fd_listen;
1952 struct sockaddr_in sock_listen;
1953 sock_listen.sin_family = AF_INET;
1954 // sock_listen.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
1955 sock_listen.sin_addr.s_addr = htonl(INADDR_ANY);
1956
1957 socklen_t addrlen = sizeof(sock_listen);
1958 sock_listen.sin_port = htons(port_to);
1959 fd_listen = socket(PF_INET, SOCK_STREAM, 0);
1960
1961 int flags = 1;
1962 int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)sizeof(flags));
1963 if (ret < 0) {
1964 perror("Failed to set REUSEADDR");
1965 }
1966
1967 if (bind(fd_listen, (struct sockaddr*)&sock_listen, sizeof(struct sockaddr)) < 0) {
1968 pthread_mutex_lock(&(mtx_sender_log));
1969 printf("[FATAL] thread %d : %s : Failed to bind(%s). Maybe other programs have already occupied this port(%d). Exiting...\n",
1970 sender_id, hostnamebuf, strerror(errno),
1971 port_to); fflush(stdout);
1972 pthread_mutex_unlock(&(mtx_sender_log));
1973
1974 // Check the process occupying the port 3100?.
1975 FILE* fp;
1976 char buf[256];
1977 char cmdline[500];
1978 sprintf(cmdline, "/usr/sbin/ss -ap | grep %d", port_to);
1979 if ((fp = popen(cmdline, "r")) == NULL) {
1980 pthread_mutex_lock(&(mtx_sender_log));
1981 printf("[WARNING] thread %d : Failed to run %s\n", sender_id,
1982 cmdline);
1983 pthread_mutex_unlock(&(mtx_sender_log));
1984 }
1985
1986 while (fgets(buf, 256, fp) != NULL) {
1987 pthread_mutex_lock(&(mtx_sender_log));
1988 printf("[DEBUG] thread %d : Port %d is used by : %s\n", sender_id,
1989 port_to, buf); fflush(stdout);
1990 pthread_mutex_unlock(&(mtx_sender_log));
1991 }
1992 fclose(fp);
1993 exit(1);
1994 }
1995
1996 int val1 = 0;
1997 setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)sizeof(val1));
1998 int backlog = 1;
1999 if (listen(fd_listen, backlog) < 0) {
2000 char err_buf[500];
2001 pthread_mutex_lock(&(mtx_sender_log));
2002 sprintf(err_buf, "[FATAL] thread %d : %s : Failed in listen(%s). Exting...",
2003 sender_id, hostnamebuf,
2004 strerror(errno));
2005 printf("%s\n", err_buf); fflush(stdout);
2006 pthread_mutex_unlock(&(mtx_sender_log));
2007 // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
2008 exit(-1);
2009 }
2010
2011 //
2012 // Accept
2013 //
2014 int fd_accept;
2015 struct sockaddr_in sock_accept;
2016 pthread_mutex_lock(&(mtx_sender_log));
2017 printf("[DEBUG] thread %d : Accepting... : port %d\n", sender_id,
2018 port_to);
2019 fflush(stdout);
2020 pthread_mutex_unlock(&(mtx_sender_log));
2021
2022 if ((fd_accept = accept(fd_listen, (struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
2023 char err_buf[500];
2024 pthread_mutex_lock(&(mtx_sender_log));
2025 sprintf(err_buf, "[FATAL] thread %d : %s : Failed to accept(%s). Exiting...",
2026 sender_id, hostnamebuf,
2027 strerror(errno));
2028 printf("%s\n", err_buf); fflush(stdout);
2029 pthread_mutex_unlock(&(mtx_sender_log));
2030 // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
2031 exit(-1);
2032 } else {
2033 // B2INFO("Done.");
2034 pthread_mutex_lock(&(mtx_sender_log));
2035 printf("[INFO] thread %d : Connection(port %d) from eb0 was accepted\n", sender_id, port_to); fflush(stdout);
2036 pthread_mutex_unlock(&(mtx_sender_log));
2037
2038 // set timepout option
2039 struct timeval timeout;
2040 timeout.tv_sec = 1;
2041 timeout.tv_usec = 0;
2042 ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)sizeof(timeout));
2043 if (ret < 0) {
2044 pthread_mutex_lock(&(mtx_sender_log));
2045 char err_buf[500];
2046 sprintf(err_buf, "[FATAL] thread %d : %s : Failed to set TIMEOUT. Exiting...", sender_id, hostnamebuf);
2047 printf("%s\n", err_buf); fflush(stdout);
2048 pthread_mutex_unlock(&(mtx_sender_log));
2049 exit(-1);
2050 }
2051 }
2052
2053 if (fd_listen) {
2054 close(fd_listen);
2055 }
2056#endif
2057
2058 double init_time = getTimeSec();
2059 double prev_time = init_time;
2060
2061 unsigned long long int cnt = 0;
2062 unsigned long long int prev_cnt = 0;
2063 unsigned long long int start_cnt = 3000;
2064
2065 unsigned int exprun = 0;
2066 unsigned int evtnum = 0;
2067
2068#ifndef USE_ZMQ
2069 int buffer_id = 0;
2070#endif
2071 unsigned int tot_event_nwords = 0;
2072 for (
2073#ifdef MAX_EVENT
2074 int j = 0; j < MAX_EVENT; j++
2075#else
2076 ;;
2077#endif
2078 ) {
2079
2080#ifdef USE_ZMQ
2081 // Copy data from ZMQ (experimental)
2082 //
2083 {
2084 zmq::message_t zevent;
2085 zmq_reader[sender_id]->recv(&zevent);
2086 memcpy(buff + NW_SEND_HEADER, zevent.data(), zevent.size());
2087 tot_event_nwords = zevent.size() / sizeof(unsigned int);
2088 }
2089#else
2090 // Copy data from buffer (orignal)
2091 //
2092 if (buffer_id == 0) {
2093 while (1) {
2094 if (buffer_filled[sender_id][0] == 1)break;
2095 usleep(1);
2096 }
2097 {
2098 pthread_mutex_lock(&(mtx1_ch[sender_id]));
2099 memcpy((buff + NW_SEND_HEADER), data_1[sender_id], copy_nwords[sender_id][0] * sizeof(unsigned int));
2100 tot_event_nwords = copy_nwords[sender_id][0];
2101 buffer_filled[sender_id][0] = 0;
2102 pthread_mutex_unlock(&(mtx1_ch[sender_id]));
2103 }
2104 } else {
2105
2106 while (1) {
2107 if (buffer_filled[sender_id][1] == 1)break;
2108 usleep(1);
2109 }
2110
2111 {
2112 pthread_mutex_lock(&(mtx2_ch[sender_id]));
2113 memcpy((buff + NW_SEND_HEADER), data_2[sender_id], copy_nwords[sender_id][1] * sizeof(unsigned int));
2114 tot_event_nwords = copy_nwords[sender_id][1];
2115 buffer_filled[sender_id][1] = 0;
2116 pthread_mutex_unlock(&(mtx2_ch[sender_id]));
2117 }
2118 }
2119#endif
2120
2121 //
2122 // Check data
2123 //
2124 if (buff == NULL) {
2125 pthread_mutex_lock(&(mtx_sender_log));
2126 printf("[FATAL] thread %d : %s : buffer in sender is NULL(= %p )\n", sender_id, hostnamebuf, buff); fflush(stdout);
2127 pthread_mutex_unlock(&(mtx_sender_log));
2128 exit(1);
2129 }
2130
2131#ifdef SPLIT_ECL_ECLTRG
2132 int event_nwords_main = 0, event_nwords_splitted = 0;
2133 if (split_main_use == 1 && split_sub_use == 1) {
2134 split_Ecltrg(sender_id, buff + NW_SEND_HEADER, valid_ch,
2135 buff_main, buff_splitted, event_nwords_main, event_nwords_splitted, ECLTRG_NODE_ID, splitted_ch);
2136 tot_event_nwords = event_nwords_main + event_nwords_splitted;
2137 } else if (split_main_use == 1 && split_sub_use == 0) {
2138 event_nwords_main = tot_event_nwords;
2139 } else if (split_main_use == 0 && split_sub_use == 1) {
2140 event_nwords_splitted = tot_event_nwords;
2141 } else {
2142 pthread_mutex_lock(&(mtx_sender_log));
2143 printf("[FATAL] thread %d : %s : No channels are used for this PCIe40 board (ECL/ECLTRG). Please mask this readout PC with runcontrol GUI (or exclude sub-system if this is the only readout PC of the sub-system). Exiting..\n",
2144 sender_id, hostnamebuf);
2145 fflush(stdout);
2146 pthread_mutex_unlock(&(mtx_sender_log));
2147 exit(1);
2148 }
2149#endif // SPLIT_ECL_ECLTRG
2150
2151 unsigned int prev_exprun = exprun;
2152 unsigned int prev_evtnum = evtnum;
2153
2154#ifdef SPLIT_ECL_ECLTRG
2155 for (int k = 0; k < NUM_SUB_EVE ; k++) {
2156#endif // SPLIT_ECL_ECLTRG
2157 unsigned int* eve_buff = NULL;
2158 unsigned int event_nwords = 0;
2159 int ret = 0;
2160
2161#ifdef SPLIT_ECL_ECLTRG
2162 if (k == 0) {
2163 if (split_main_use == 0) continue;
2164 event_nwords = event_nwords_main;
2165 eve_buff = buff + NW_SEND_HEADER;
2166 ret = checkEventData(sender_id, eve_buff, event_nwords_main,
2167 exprun, evtnum, node_id, valid_main_ch);
2168 } else if (k == 1) {
2169 if (split_sub_use == 0) continue;
2170 exprun = prev_exprun;
2171 evtnum = prev_evtnum;
2172 event_nwords = event_nwords_splitted;
2173 eve_buff = buff + NW_SEND_HEADER + event_nwords_main;
2174 ret = checkEventData(sender_id, eve_buff, event_nwords_splitted,
2175 exprun, evtnum, ECLTRG_NODE_ID, valid_splitted_ch);
2176 } else {
2177 pthread_mutex_lock(&(mtx_sender_log));
2178 printf("[FATAL] thread %d : # of sub-events must be 1 or 2(for ECL,ECLTRG). k = %d Exiting... : exp %d run %d sub %d : %s %s %d\n",
2179 sender_id, k,
2180 (exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
2181 (exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
2182 (exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
2183 __FILE__, __PRETTY_FUNCTION__, __LINE__);
2184 fflush(stdout);
2185 exit(1);
2186 // pthread_mutex_unlock(&(mtx_sender_log)); //TODO can be removed?
2187 }
2188#else
2189 event_nwords = tot_event_nwords;
2190 eve_buff = buff + NW_SEND_HEADER;
2191 ret = checkEventData(sender_id, eve_buff, event_nwords, exprun, evtnum, node_id, valid_ch);
2192#endif
2193
2194 if (ret != DATACHECK_OK) {
2195 if (ret == DATACHECK_OK_BUT_ERRFLAG_IN_HDR) {
2196 // err_bad_ffaa[sender_id]++;
2197 unsigned int reduced_event_nwords = 0;
2198 pthread_mutex_lock(&(mtx_sender_log));
2199 printf("[WARNING] thread %d : fake-error events are detected. Header and trailer reduction will be made and data are checked again.\n",
2200 sender_id);
2201 fflush(stdout);
2202 pthread_mutex_unlock(&(mtx_sender_log));
2203 reduceHdrTrl(eve_buff, reduced_event_nwords);
2204 tot_event_nwords -= (event_nwords - reduced_event_nwords);
2205 event_nwords = reduced_event_nwords;
2206
2207 exprun = prev_exprun;
2208 evtnum = prev_evtnum;
2209
2210 int ret = 0;
2211#ifdef SPLIT_ECL_ECLTRG
2212 if (k == 0) {
2213 if (event_nwords_splitted != 0) {
2214 memcpy(buff_splitted, eve_buff + event_nwords_main, event_nwords_splitted * sizeof(unsigned int));
2215 memcpy(eve_buff + reduced_event_nwords, buff_splitted, event_nwords_splitted * sizeof(unsigned int));
2216 }
2217 event_nwords_main = reduced_event_nwords;
2218 ret = checkEventData(sender_id, eve_buff, reduced_event_nwords, exprun, evtnum, node_id, valid_main_ch);
2219 } else {
2220 event_nwords_splitted = reduced_event_nwords;
2221 ret = checkEventData(sender_id, eve_buff, reduced_event_nwords, exprun, evtnum, node_id, valid_splitted_ch);
2222 }
2223#else
2224 ret = checkEventData(sender_id, eve_buff, reduced_event_nwords, exprun, evtnum, node_id, valid_ch);
2225#endif //SPLIT_ECL_ECLTRG
2226
2227 if (ret != DATACHECK_OK) {
2228 pthread_mutex_lock(&(mtx_sender_log));
2229 printf("[FATAL] thread %d : %s : checkEventData() detected an error after reduceHdrTrl(). Exiting...\n", sender_id, hostnamebuf);
2230 fflush(stdout);
2231 pthread_mutex_unlock(&(mtx_sender_log));
2232 exit(1);
2233 }
2234 pthread_mutex_lock(&(mtx_sender_log));
2235 printf("[WARNING] thread %d : %s : Data-check was passed. This event is treated as a normal event.\n", sender_id, hostnamebuf);
2236 // printf("[FATAL] thread %d : Currently, we will not tolerate a fake-error event. Exiting...\n", sender_id);
2237 printEventData(eve_buff, reduced_event_nwords);
2238 fflush(stdout);
2239 pthread_mutex_unlock(&(mtx_sender_log));
2240 // exit(1);
2241 } else {
2242 pthread_mutex_lock(&(mtx_sender_log));
2243 printf("[FATAL] thread %d : %s : checkEventData() detected an error. Exiting...\n", sender_id, hostnamebuf);
2244 fflush(stdout);
2245 pthread_mutex_unlock(&(mtx_sender_log));
2246 exit(1);
2247 }
2248 }
2249
2250 if (eve_buff[ 1 ] & 0xfffff000 != 0x7f7f0000 ||
2251 eve_buff[ event_nwords - 1 ] != 0x7fff0006) {
2252 pthread_mutex_lock(&(mtx_sender_log));
2253 printf("[FATAL] thread %d : %s : ERROR_EVENT : Invalid Magic word in header( pos=0x%x, %.8x ) and/or trailer( pos=0x%x, 0x%.8x ) : eve %u exp %d run %d sub %d : %s %s %d\n",
2254 sender_id, hostnamebuf,
2255 1, eve_buff[ 1 ],
2256 event_nwords - 1, eve_buff[ event_nwords - 1 ],
2257 evtnum,
2258 (exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
2259 (exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
2260 (exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
2261 __FILE__, __PRETTY_FUNCTION__, __LINE__);
2262 printEventData(eve_buff, event_nwords, sender_id);
2263 fflush(stdout);
2264 pthread_mutex_unlock(&(mtx_sender_log));
2265 exit(1);
2266 }
2267#ifdef SPLIT_ECL_ECLTRG
2268 }
2269#endif
2270
2271 //
2272 // For TOP feature extraction function
2273 //
2274
2275
2276
2277 //
2278 // Filling SendHeader
2279 //
2280 buff[ 0 ] = tot_event_nwords + NW_SEND_HEADER + NW_SEND_TRAILER;
2281 buff[ 1 ] = 6;
2282#ifdef SPLIT_ECL_ECLTRG
2283 if (split_main_use == 1 && split_sub_use == 1) {
2284 buff[ 2 ] = 0x00010002; // nevent = 1, nboards = 2
2285 } else {
2286 buff[ 2 ] = 0x00010001; // nevent = 1, nboards = 1
2287 }
2288#else
2289 buff[ 2 ] = 0x00010001; // nevent = 1, nboards = 1
2290#endif //SPLIT_ECL_ECLTRG
2291 buff[ 3 ] = buff[ NW_SEND_HEADER + 2 ];
2292 buff[ 4 ] = buff[ NW_SEND_HEADER + 3 ];
2293 buff[ 5 ] = buff[ NW_SEND_HEADER + 6 ];
2294 //
2295 // Filling SendTrailer
2296 //
2297 buff[ tot_event_nwords + NW_SEND_HEADER ] = 0x0;
2298 buff[ tot_event_nwords + NW_SEND_HEADER + 1 ] = 0x7fff0007;
2299
2300#ifndef NOT_SEND
2301 ret = 0;
2302 int sent_bytes = 0;
2303 // pthread_mutex_lock(&(mtx_sender_log));
2304 // printf("[DEBUG] thread %d : sent words %d + sndhdr %d + sndtrl %d\n", sender_id, tot_event_nwords, NW_SEND_HEADER, NW_SEND_TRAILER );
2305 // printEventData( buff, tot_event_nwords + NW_SEND_HEADER + NW_SEND_TRAILER, sender_id);
2306 // pthread_mutex_unlock(&(mtx_sender_log));
2307
2308 if ((buff[ NW_SEND_HEADER + 1 ] & 0xfffff000) != 0x7f7f0000 ||
2309 buff[ NW_SEND_HEADER + tot_event_nwords - 1 ] != 0x7fff0006) {
2310 pthread_mutex_lock(&(mtx_sender_log));
2311
2312 printf("[FATAL] thread %d : %s : ERROR_EVENT : Invalid Magic word in the 1st header( pos=0x%x, 0x%.8x ) and/or the last trailer( pos=0x%x, 0x%.8x ) : eve %u exp %d run %d sub %d : %s %s %d\n",
2313 sender_id, hostnamebuf, NW_SEND_HEADER + 1, buff[ NW_SEND_HEADER + 1 ],
2314 NW_SEND_HEADER + tot_event_nwords - 1, buff[ NW_SEND_HEADER + tot_event_nwords - 1 ],
2315 evtnum,
2316 (exprun & Belle2::RawHeader_latest::EXP_MASK) >> Belle2::RawHeader_latest::EXP_SHIFT,
2317 (exprun & Belle2::RawHeader_latest::RUNNO_MASK) >> Belle2::RawHeader_latest::RUNNO_SHIFT,
2318 (exprun & Belle2::RawHeader_latest::SUBRUNNO_MASK),
2319 __FILE__, __PRETTY_FUNCTION__, __LINE__);
2320 printEventData(buff, tot_event_nwords + NW_SEND_HEADER + NW_SEND_TRAILER, sender_id);
2321 fflush(stdout);
2322 pthread_mutex_unlock(&(mtx_sender_log));
2323 exit(1);
2324 }
2325
2326 while (true) {
2327 if ((ret = write(fd_accept, (char*)buff + sent_bytes, (tot_event_nwords + NW_SEND_HEADER + NW_SEND_TRAILER)
2328 * sizeof(unsigned int) - sent_bytes)) <= 0) {
2329 if (errno == EINTR) {
2330 continue;
2331 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
2332 continue;
2333 } else {
2334 perror("[DEBuG] write() failed");
2335 pthread_mutex_lock(&(mtx_sender_log));
2336 printf("[FATAL] thread %d : %s : write() failed. Return value of write() = %d\n", sender_id, hostnamebuf, ret);
2337 fflush(stdout);
2338 pthread_mutex_unlock(&(mtx_sender_log));
2339 exit(1);
2340 }
2341 }
2342 sent_bytes += ret;
2343 if (sent_bytes == (int)((tot_event_nwords + NW_SEND_HEADER + NW_SEND_TRAILER)
2344 * sizeof(unsigned int))) {
2345 break;
2346 } else if (sent_bytes > (int)((tot_event_nwords + NW_SEND_HEADER + NW_SEND_TRAILER)
2347 * sizeof(unsigned int))) {
2348 pthread_mutex_lock(&(mtx_sender_log));
2349 printf("[FATAL] thread %d : %s : Too many bytes are sent\n", sender_id, hostnamebuf);
2350 fflush(stdout);
2351 pthread_mutex_unlock(&(mtx_sender_log));
2352 exit(1);
2353 }
2354 }
2355#endif
2356#ifndef USE_ZMQ
2357 if (buffer_id == 0) {
2358 buffer_id = 1;
2359 } else {
2360 buffer_id = 0;
2361 }
2362#endif
2363 cnt++;
2364
2365 if (cnt == start_cnt) init_time = getTimeSec();
2366 if (cnt % 1000000 == 1) {
2367 if (cnt > start_cnt) {
2368 double cur_time = getTimeSec();
2369 pthread_mutex_lock(&(mtx_sender_log));
2370 printf("[INFO] thread %d : evt %llu time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
2371 sender_id,
2372 cnt, cur_time - init_time,
2373 NUM_CLIENTS_PER_THREAD * (cnt - prev_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - prev_time),
2374 (cnt - prev_cnt) / (cur_time - prev_time) / 1000.,
2375 NUM_CLIENTS_PER_THREAD * (cnt - start_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - init_time),
2376 (cnt - start_cnt) / (cur_time - init_time) / 1000., total_words);
2377
2378 fflush(stdout);
2379 pthread_mutex_unlock(&(mtx_sender_log));
2380 prev_time = cur_time;
2381 prev_cnt = cnt;
2382 } else {
2383 // printf("Eve %lld\n", cnt);fflush(stdout);
2384 }
2385 }
2386 }
2387
2388 delete buff;
2389#ifndef NOT_SEND
2390 close(fd_accept);
2391#endif
2392 return (void*)0;
2393}
2394
2395int main(int argc, char** argv)
2396{
2397
2399 // From main_pcie40_dmahirate.cpp
2401 bool isData = true ;
2402 bool writeInFile = false ;
2403 if (argc != 2) {
2404 pthread_mutex_lock(&(mtx_sender_log));
2405 printf("[FATAL] Invalid usage of %s : %s <node ID>, node ID = 0x0, if you are not using the Belle II DAQ system.\n",
2406 argv[0], argv[0]) ;
2407 pthread_mutex_unlock(&(mtx_sender_log));
2408 return 0 ;
2409 }
2410
2411
2412 char* endptr;
2413 unsigned int pcie40_node_id = (unsigned int)strtol(argv[1], &endptr, 0);
2414 // char tmp_arg[20];
2415 // if( argv[1][0] == 'x' || argv[1][0] == 'X' || argv[1][1] == 'x' || argv[1][1] == 'X' ){
2416 // strncpy(tmp_arg, argv[1] + 2, 8);
2417 // pcie40_node_id = (unsigned int)strtol(tmp_arg, &endptr, 0) ;
2418 // }else{
2419 // pcie40_node_id = (unsigned int)strtol(tmp_arg, &endptr, 16) ;
2420 // }
2421
2422 host_nodeid[ "rsvd1" ] = 0x01000001;
2423 host_nodeid[ "rsvd2" ] = 0x01000002;
2424 host_nodeid[ "rsvd3" ] = 0x01000003;
2425 host_nodeid[ "rsvd4" ] = 0x01000004;
2426 host_nodeid[ "rsvd5" ] = 0x01000005;
2427 host_nodeid[ "rcdc1" ] = 0x02000001;
2428 host_nodeid[ "rcdc2" ] = 0x02000002;
2429 host_nodeid[ "rcdc3" ] = 0x02000003;
2430 host_nodeid[ "rcdc4" ] = 0x02000004;
2431 host_nodeid[ "rcdc5" ] = 0x02000005;
2432 host_nodeid[ "rcdc6" ] = 0x02000006;
2433 host_nodeid[ "rcdc7" ] = 0x02000007;
2434 host_nodeid[ "rcdc8" ] = 0x02000008;
2435 host_nodeid[ "rtop1" ] = 0x03000001;
2436 host_nodeid[ "rtop2" ] = 0x03000002;
2437 host_nodeid[ "rari1" ] = 0x04000001;
2438 host_nodeid[ "rari2" ] = 0x04000002;
2439 host_nodeid[ "recl1" ] = 0x05000001;
2440 host_nodeid[ "recl2" ] = 0x05000002;
2441 host_nodeid[ "recl3" ] = 0x05000003;
2442 host_nodeid[ "rklm1" ] = 0x07000001;
2443 host_nodeid[ "rtrg1" ] = 0x10000001;
2444
2445
2446 gethostname(hostnamebuf, sizeof(hostnamebuf));
2447 if (pcie40_node_id != NODE_ID_TEST_BENCH) {
2448 std::map<string, unsigned int>::iterator itr;
2449 itr = host_nodeid.find(hostnamebuf);
2450 if (itr != host_nodeid.end()) {
2451 if (itr->second != pcie40_node_id) {
2452 pthread_mutex_lock(&(mtx_sender_log));
2453 printf("[FATAL] Node_id argument ( 0x%.8x ) is invalid. Node_id for %s is 0x%.8x. Exiting...\n",
2454 pcie40_node_id, (itr->first).c_str(), itr->second);
2455 pthread_mutex_unlock(&(mtx_sender_log));
2456 exit(1);
2457 } else {
2458 pthread_mutex_lock(&(mtx_sender_log));
2459 printf("[DEBUG] (hostname %s, nodeid 0x%.8x ) concides with stored info.( %s 0x%.8x )\n", hostnamebuf, pcie40_node_id,
2460 (itr->first).c_str(), itr->second); fflush(stdout);
2461 pthread_mutex_unlock(&(mtx_sender_log));
2462 }
2463 } else {
2464 pthread_mutex_lock(&(mtx_sender_log));
2465 printf("[FATAL] This sever's hostname is not for a PCIe40 ROPC( %s ). Please use 0x%.8x for a test. Exiting...\n", hostnamebuf,
2466 NODE_ID_TEST_BENCH);
2467 pthread_mutex_unlock(&(mtx_sender_log));
2468 exit(1);
2469 }
2470
2471 }
2472 fflush(stdout);
2473
2474#ifdef USE_ZMQ
2476 // ZMQ initialize
2478 zmq::context_t ctx(0);
2479 const pid_t pid = getpid();
2480 for (int i = 0; i < NUM_SENDER_THREADS; i++) {
2481 zmq_writer[i] = new zmq::socket_t(ctx, ZMQ_PAIR);
2482 zmq_reader[i] = new zmq::socket_t(ctx, ZMQ_PAIR);
2483 char zpath[256];
2484 snprintf(zpath, sizeof(zpath), "inproc:///dev/shm/des_ser_PCIe40_main.%d.%d.ipc", pid, i);
2485 zmq_writer[i]->bind(zpath);
2486 zmq_reader[i]->connect(zpath);
2487 }
2488#else
2490 // buffer for inter-threads communication
2492 for (int i = 0; i < NUM_SENDER_THREADS; i++) {
2493 data_1[i] = new unsigned int[MAX_EVENT_WORDS];
2494 data_2[i] = new unsigned int[MAX_EVENT_WORDS];
2495 }
2496#endif
2497
2499 // Initialize variables
2501 double init_time = getTimeSec();
2502 double prev_time = init_time;
2503 unsigned long long int cnt = 0;
2504 unsigned long long int prev_cnt = 0;
2505 unsigned long long int start_cnt = 300000;
2506#ifndef USE_ZMQ
2507 int buffer_id[NUM_SENDER_THREADS];
2508#endif
2509 int total_words = 0;
2510#ifndef USE_ZMQ
2511 for (int i = 0; i < NUM_SENDER_THREADS; i++) {
2512 buffer_id[i] = 0;
2513 buffer_filled[i][0] = 0;
2514 buffer_filled[i][1] = 0;
2515 copy_nwords[i][0] = 0;
2516 copy_nwords[i][1] = 0;
2517 }
2518#endif
2519
2520
2522 // Initialize PCIe40
2524 printf("[DEBUG] Initializing PCIe40 readout...\n"); fflush(stdout);
2525
2526 std::ofstream the_file ;
2527 if (writeInFile) the_file.open("data_file.txt") ;
2528 double triggerRate = 400 ; // kHz
2529 double data_size = 0. ;
2530 int size = 0x1F ;
2531 int res = ecs_open(0, 0) ;
2532
2533 if (-1 == res) {
2534 pthread_mutex_lock(&(mtx_sender_log));
2535 printf("ERROR: Could not open device (BAR 0)\n") ;
2536 pthread_mutex_unlock(&(mtx_sender_log));
2537 } else {
2538 pthread_mutex_lock(&(mtx_sender_log));
2539 printf("SUCCESS: Device opened for ECS 0\n");
2540 pthread_mutex_unlock(&(mtx_sender_log));
2541 }
2542
2543 res = ecs_open(0, 2) ;
2544 if (-1 == res) {
2545 pthread_mutex_lock(&(mtx_sender_log));
2546 printf("ERROR: Could not open device (BAR 2)\n") ;
2547 pthread_mutex_unlock(&(mtx_sender_log));
2548 } else {
2549 pthread_mutex_lock(&(mtx_sender_log));
2550 printf("SUCCESS: Device opened for ECS 2\n");
2551 pthread_mutex_unlock(&(mtx_sender_log));
2552 }
2553 // DMA part
2554 res = dma_open(0) ;
2555 if (-1 == res) {
2556 pthread_mutex_lock(&(mtx_sender_log));
2557 printf("ERROR: Could not open device (DMA)\n") ;
2558 pthread_mutex_unlock(&(mtx_sender_log));
2559 } else {
2560 pthread_mutex_lock(&(mtx_sender_log));
2561 printf("SUCCESS: Device opened for DMA\n");
2562 pthread_mutex_unlock(&(mtx_sender_log));
2563 }
2564
2565
2566 // Read the active links
2567 unsigned int masks0 = ecs_read(0, 2, 0x50520) ;
2568 unsigned int masks1 = ecs_read(0, 2, 0x50540) ;
2569 std::vector< int > valid_ch ;
2570 valid_ch.clear();
2571 for (int i = 0 ; i < 24 ; i++) {
2572 if ((masks0 & (1 << i)) != 0) valid_ch.push_back(i) ;
2573 }
2574 for (int i = 24 ; i < MAX_PCIE40_CH ; i++) {
2575 if ((masks1 & (1 << (i - 24))) != 0) valid_ch.push_back(i) ;
2576 }
2577
2578
2579 // printf("[DEBUG] mask stauts\n");
2580 // printf("[DEBUG] mask register : %.8x %.8x\n", masks0, masks1);
2581 // int temp_valid_pos = 0;
2582 // for (int i = 0 ; i < MAX_PCIE40_CH ; i++) {
2583 // if (valid_ch[temp_valid_pos] == i) {
2584 // printf("[DEBUG] ch %d 1\n", i);
2585 // temp_valid_pos++;
2586 // } else {
2587 // printf("[DEBUG] ch %d 0\n", i);
2588 // }
2589 // }
2590 int num_of_chs = valid_ch.size() ;
2591 pthread_mutex_lock(&(mtx_sender_log));
2592 printf("[DEBUG] # of used channels = %d\n", num_of_chs); fflush(stdout);
2593 pthread_mutex_unlock(&(mtx_sender_log));
2594 if (num_of_chs <= 0) {
2595 pthread_mutex_lock(&(mtx_sender_log));
2596 printf("[FATAL] %s : No channels are used for this PCIe40 board. Please mask this readout PC with runcontrol GUI (or exclude sub-system if this is the only readout PC of the sub-system). Exiting..\n",
2597 hostnamebuf);
2598 fflush(stdout);
2599 pthread_mutex_unlock(&(mtx_sender_log));
2600 exit(1);
2601 }
2602
2603 // initialize sum of error counters;
2604 for (int i = 0; i < NUM_SENDER_THREADS; i++) {
2605 for (int j = 0; j < MAX_PCIE40_CH; j++) {
2606 crc_err_ch[i][j] = 0;
2607 }
2608 }
2609
2610 //
2611 // ulreset to clear FIFOs and reset state machines in userlogic
2612 //
2613 ecs_write(0, 2, 0x00050400, 0x0);
2614 ecs_write(0, 2, 0x00050400, 0x4);
2615 ecs_write(0, 2, 0x00050400, 0x0);
2616
2617 unsigned int initial_value = pcie40_getNbWordInEvent(0) ;
2618 if (initial_value == 0) pcie40_setNbWordInEvent(0, 0xFF) ;
2619 pcie40_dmaReset(0) ;
2620 pcie40_dmaStop(0) ;
2621 if (! isData) {
2622 pcie40_enableGenerator(0) ;
2623 pcie40_useDataFromGenerator(0) ;
2624 } else {
2625 pcie40_disableGenerator(0) ;
2626 pcie40_useDataFromFibers(0) ;
2627 }
2628 //pcie40_useFreeSignal( 0 ) ;
2629 int t_rate = 10416.666 / ((double) triggerRate) - 1 ;
2630 pcie40_setGeneratorTriggerFrequency(0, t_rate) ;
2631 pcie40_setNbEvents(0, 0) ;
2632 // start DAQ
2633 pcie40_resetSPages(0) ;
2634 pcie40_dmaSetReadout(0) ;
2635 pcie40_setSizeFromHeader(0) ;
2636 if (! isData)
2637 pcie40_setNbWordInEvent(0, size) ;
2638 pcie40_setBusyLevel(0, 0x502) ;
2639 dma_initialize(0) ;
2640
2641 pthread_mutex_lock(&(mtx_sender_log));
2642 printf("[DEBUG] PCIe40 readout was initialized.\n"); fflush(stdout);
2643 pthread_mutex_unlock(&(mtx_sender_log));
2644
2646 // Make sender threads
2648 int run_no;
2649 pthread_t sender_thr[NUM_SENDER_THREADS];
2650 // std::thread sender_thr[NUM_SENDER_THREADS];
2651 sender_argv snd_argv[NUM_SENDER_THREADS];
2652
2653 for (int i = 0; i < NUM_SENDER_THREADS; i++) {
2654 snd_argv[i].sender_id = i;
2655 snd_argv[i].valid_ch = valid_ch;
2656 snd_argv[i].node_id = pcie40_node_id;
2657 int ret = pthread_create(&(sender_thr[i]), NULL, sender, &(snd_argv[i]));
2658 if (ret != 0) {
2659 pthread_mutex_lock(&(mtx_sender_log));
2660 printf("[FATAL] Failed to create a thread. ret = %d. Exting...\n", ret);
2661 fflush(stdout);
2662 pthread_mutex_unlock(&(mtx_sender_log));
2663 exit(1);
2664 }
2665 }
2666
2667#ifndef CRC_CHECK
2668 pthread_mutex_lock(&(mtx_sender_log));
2669 // printf("[WARNING] CRC check by software is disabled now !! Relying on check in PCIe40 firmware\n"); fflush(stdout);
2670 printf("[FATAL] CRC check by software is disabled now !! Relying on check in PCIe40 firmware\n"); fflush(stdout);
2671 pthread_mutex_unlock(&(mtx_sender_log));
2672 exit(1);
2673#endif
2674
2676 // Initialize readout variables
2678 int rv ;
2679 unsigned int* data ;
2680 unsigned int* combined_data = NULL;
2681 unsigned int* buf_combined = new unsigned int[ MAX_EVENT_WORDS ];
2682 int new_buf_combined = 0;
2683 long long int get_sp_cnt = 0 ;
2684 // int get_sp_cnt = 0x7fff0000 ;
2685 int k = 0 ;
2686
2687 unsigned int evtnum = 0;
2688 unsigned int exprun = 0;
2689 unsigned int prev_exprun = 0;
2690 int errors = 0 ;
2691 unsigned int esize = 0 ;
2692 int total_pages = 0 ;
2693 int index_pages = 0 ;
2694 int previous_index = 0 ;
2695 unsigned int frag_size = 0 ;
2696 // auto t1 = std::chrono::high_resolution_clock::now();
2697 double m_start_time = getTimeSec();
2698 double m_prev_time = 0.;
2699 double total_size_bytes = 0.;
2700 double prev_total_size_bytes = 0.;
2701 double total_eve_cnt = 0.;
2702 double prev_total_eve_cnt = 0.;
2703 int first_flag = 0;
2704 int first_event_flag = 0;
2705 unsigned int evecnt = 0;
2706 unsigned int prev_evecnt = 0;
2707 int client_id = 0;
2708 int dma_hdr_offset = 0;
2709
2711 // Main loop
2713 pthread_mutex_lock(&(mtx_sender_log));
2714 printf("[INFO] des_ser_PCIe40_main: Reading the 1st event from a PCIe40 board...\n"); fflush(stdout);
2715 pthread_mutex_unlock(&(mtx_sender_log));
2716 for (;;) {
2718 // Main loop
2720 while (true) {
2721 // usleep(100000);
2722 // start DMA and wait for one or more super pages of data
2723 rv = pcie40_dmaStart(0) ;
2724 //printf( "Number of super page received: %d\n" , rv ) ;
2725 // #pragma omp parallel for
2726 for (int j = 0 ; j < rv * S_PAGE_SLOT_NMB ; ++j) {
2727 unsigned int event_words = 0;
2728 data = pcie40_getSuperPageCopy(0, (get_sp_cnt / S_PAGE_SLOT_NMB) % S_PAGES, get_sp_cnt % S_PAGE_SLOT_NMB) ;
2729 if (! isData) {
2730 checkEventGenerator(data, get_sp_cnt, size);
2731 } else {
2732 // Check DMA header and trailer
2733 int ret = checkDMAHeader(data, frag_size, data_size, total_pages, index_pages) ;
2734
2735 if (first_event_flag == 0) {
2736 pthread_mutex_lock(&(mtx_sender_log));
2737 printf("[INFO] des_ser_PCIe40_main: Done. the size of the 1st packet is %d bytes.\n", (int)data_size); fflush(stdout);
2738 pthread_mutex_unlock(&(mtx_sender_log));
2739 first_event_flag = 1;
2740 }
2741
2742 if (first_flag == 0 && index_pages != 0 && ret < 1) {
2743 pthread_mutex_lock(&(mtx_sender_log));
2744 printf("Invalid index error : tot %d index %d ret %d\n", total_pages, index_pages, ret);
2745 pthread_mutex_unlock(&(mtx_sender_log));
2746 ret = 1;
2747 }
2748 first_flag = 1;
2749
2750 if (ret == 0) { // No error in checkDMAHeader()
2751 if (total_pages > 1 && total_pages <= 0xffff) { // Multiple DMA packets for an event
2752 //
2753 // Prepare buffer for combined data
2754 //
2755 if (index_pages == 0) {
2756 esize = frag_size ;
2757 if (total_pages * S_PAGE_SLOT_SIZE / 4 > MAX_EVENT_WORDS) {
2758 new_buf_combined = 1;
2759 combined_data = new unsigned int[ total_pages * S_PAGE_SLOT_SIZE / 4 ] ;
2760 } else {
2761 new_buf_combined = 0;
2762 combined_data = buf_combined;
2763 }
2764 } else {
2765 esize += frag_size ;
2766 }
2767
2768 if (combined_data == NULL) {
2769 pthread_mutex_lock(&(mtx_sender_log));
2770 printf("[FATAL] Data buffer is not yet allocated. %p\n", combined_data);
2771 pthread_mutex_unlock(&(mtx_sender_log));
2772 fflush(stdout);
2773 exit(1);
2774 }
2775 // Store a DMA packet in buffer for combined data
2776 memcpy(&combined_data[ previous_index ], data + DMA_HDR_WORDS, 8 * (frag_size - 2) * 4) ;
2777 delete [] data ;
2778 data = NULL;
2779 previous_index = previous_index + 8 * (frag_size - 2) ;
2780
2781 // Get more DMA packets to complete an event
2782 if (index_pages != (total_pages - 1)) {
2783 get_sp_cnt++;
2784 if ((get_sp_cnt > 0) && ((get_sp_cnt % S_PAGE_SLOT_NMB) == 0)) pcie40_freeSuperPage(0, 1) ;
2785 continue ; //
2786 }
2787 // End of an event
2788 dma_hdr_offset = 0;
2789
2790 } else if (total_pages == 1) { // One DMA packet for an event
2791 // End of an event
2792 esize = frag_size ;
2793 combined_data = data;
2794 new_buf_combined = 2; // Delete data[] later
2795 dma_hdr_offset = DMA_HDR_WORDS;
2796 } else {
2797 pthread_mutex_lock(&(mtx_sender_log));
2798 printf("Invalid total pages %d\n", total_pages);
2799 pthread_mutex_unlock(&(mtx_sender_log));
2800 exit(1);
2801 }
2802 } else {
2803 exit(1);
2804 // if (exit_on_error) exit(0) ; //TODO can be removed?
2805 // errors++ ; //TODO can be removed?
2806 }
2807
2808 //
2809 // End of an event
2810 //
2811 {
2812 if (combined_data != NULL) {
2813 // if (k < 10)printFullData(combined_data + dma_hdr_offset);
2814 event_words = combined_data[ dma_hdr_offset + EVENT_LEN_POS ];
2815 if (event_words < 32000) {
2816 total_size_bytes += ((double)event_words) * 4.;
2817 total_eve_cnt++;
2818 } else {
2819 pthread_mutex_lock(&(mtx_sender_log));
2820 printf("Strange event size %.8x ret %d\n", event_words, ret);
2821 printFullData(combined_data + dma_hdr_offset);
2822 pthread_mutex_unlock(&(mtx_sender_log));
2823 }
2824 }
2825 evecnt++;
2826 }
2827 }
2828 previous_index = 0 ;
2829
2830 //
2831 // Send data to senders
2832 //
2833 if (event_words > 0 && event_words < MAX_EVENT_WORDS) {
2834
2835 //
2836 // Check event # incrementation
2837 //
2838 unsigned int* temp_data = combined_data + dma_hdr_offset;
2839 if (evtnum + 1 != temp_data[EVENUM_POS]) {
2840 if (exprun == temp_data[RUNNO_POS]
2841 && exprun != 0) { // After a run-change or if this is the 1st event, event incrementation is not checked.
2842 printEventNumberError(temp_data, evtnum, exprun, 1, -1);
2843#ifndef NO_ERROR_STOP
2844 exit(1);
2845#endif
2846 }
2847 }
2848
2849 if (exprun != prev_exprun || exprun == 0) {
2850 m_start_time = getTimeSec();
2851 m_prev_time = m_start_time;
2852 evecnt = 1;
2853 prev_evecnt = 1;
2854 total_eve_cnt = 1;
2855 prev_total_eve_cnt = 0;
2856 total_size_bytes = 0.;
2857 prev_total_size_bytes = 0.;
2858 }
2859 evtnum = temp_data[EVENUM_POS];
2860 prev_exprun = exprun;
2861 exprun = temp_data[RUNNO_POS];
2862
2863 //
2864 // Copy data to buffer
2865 //
2866 client_id = client_id % NUM_SENDER_THREADS;
2867#ifdef USE_ZMQ
2868 // by ZMQ (experimental)
2869 zmq_writer[client_id]->send(combined_data + dma_hdr_offset, event_words * sizeof(unsigned int));
2870#else
2871 // by double buffer (original)
2872 if (buffer_id[client_id] == 0) {
2873 while (1) {
2874 if (buffer_filled[client_id][0] == 0)break;
2875 usleep(1);
2876 }
2877
2878 {
2879 pthread_mutex_lock(&(mtx1_ch[client_id]));
2880 memcpy(data_1[client_id], combined_data + dma_hdr_offset, event_words * sizeof(unsigned int));
2881 copy_nwords[client_id][0] = event_words;
2882 buffer_filled[client_id][0] = 1;
2883 pthread_mutex_unlock(&(mtx1_ch[client_id]));
2884 }
2885 } else {
2886 while (1) {
2887 if (buffer_filled[client_id][1] == 0)break;
2888 usleep(1);
2889 }
2890 {
2891 pthread_mutex_lock(&(mtx2_ch[client_id]));
2892 fflush(stdout);
2893 memcpy(data_2[client_id], combined_data + dma_hdr_offset, event_words * sizeof(unsigned int));
2894 copy_nwords[client_id][1] = event_words;
2895 buffer_filled[client_id][1] = 1;
2896 pthread_mutex_unlock(&(mtx2_ch[client_id]));
2897 }
2898 }
2899
2900 if (buffer_id[client_id] == 0) {
2901 buffer_id[client_id] = 1;
2902 } else {
2903 buffer_id[client_id] = 0;
2904 }
2905#endif
2906 client_id++;
2907 } else {
2908 pthread_mutex_lock(&(mtx_sender_log));
2909 printf("[FATAL] Invalid event-size %u\n", event_words);
2910 fflush(stdout);
2911 pthread_mutex_unlock(&(mtx_sender_log));
2912 exit(1);
2913 }
2914
2915 //
2916 // Error-count monitor
2917 //
2918 previous_index = 0 ;
2919 if (new_buf_combined == 1) {
2920 delete [] combined_data ;
2921 combined_data = NULL;
2922 } else if (new_buf_combined == 2) {
2923 delete [] data ;
2924 }
2925 first_flag = 0;
2926 // if ( i != getEventNumber( data ) ) printf( "Mismatch event number %d %d\n" , i , getEventNumber( data ) ) ;
2927 get_sp_cnt++;
2928 ++k ;
2929 if ((evecnt % 100000) == 0 ||
2930 ((evecnt % 10000) == 0 && 0 < evecnt && evecnt < 100000) ||
2931 evecnt == 1
2932 ) {
2933 unsigned int sum_total_crc_good = 0;
2934 unsigned int sum_total_crc_errors = 0;
2935 unsigned int sum_err_not_reduced = 0;
2936 unsigned int sum_err_bad_7f7f = 0;
2937 unsigned int sum_err_bad_runnum = 0;
2938 unsigned int sum_err_bad_linknum = 0;
2939 unsigned int sum_err_bad_evenum = 0;
2940 unsigned int sum_err_bad_ffaa = 0;
2941 unsigned int sum_err_bad_ff55 = 0;
2942 unsigned int sum_err_bad_linksize = 0;
2943 unsigned int sum_err_link_eve_jump = 0;
2944
2945 if (evecnt != 1) {
2946 unsigned int sum_err_flag_cnt = 0;
2947 unsigned int sum_cur_evtnum = 0;
2948 unsigned int sum_crc_err_ch[ MAX_PCIE40_CH] = {0};
2949
2950 for (int l = 0; l < NUM_SENDER_THREADS; l++) {
2951 sum_total_crc_good += total_crc_good[l];
2952 sum_total_crc_errors += total_crc_errors[l];
2953 sum_err_flag_cnt += err_flag_cnt[l];
2954 sum_cur_evtnum += cur_evtnum[l];
2955 sum_err_not_reduced += err_not_reduced[l];
2956 sum_err_bad_7f7f += err_bad_7f7f[l];
2957 sum_err_bad_runnum += err_bad_runnum[l];
2958 sum_err_bad_linknum += err_bad_linknum[l];
2959 sum_err_bad_evenum += err_bad_evenum[l];
2960 sum_err_bad_ffaa += err_bad_ffaa[l];
2961 sum_err_bad_ff55 += err_bad_ff55[l];
2962 sum_err_bad_linksize += err_bad_linksize[l];
2963 sum_err_link_eve_jump += err_link_eve_jump[l];
2964
2965 // if (cur_exprun[0] != cur_exprun[l]) {
2966 // pthread_mutex_lock(&(mtx_sender_log));
2967 // printf("[FATAL] exprun mismatch thr 0 = 0x%.8x , thr %d = 0x%.8x", cur_exprun[0], l, cur_exprun[l]);
2968 // pthread_mutex_unlock(&(mtx_sender_log));
2969 // exit(1);
2970 // }
2971
2972 for (int m = 0; m < MAX_PCIE40_CH; m++) {
2973 sum_crc_err_ch[m] += crc_err_ch[l][m];
2974 }
2975 }
2976 }
2977
2978 double cur_time = getTimeSec();
2979 double total_time = cur_time - m_start_time;
2980 double interval = cur_time - m_prev_time;
2981 m_prev_time = cur_time;
2982 time_t timer;
2983 struct tm* t_st;
2984 time(&timer);
2985 t_st = localtime(&timer);
2986 char timeStr[100];
2987 std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%d %H:%M:%S\n", t_st);
2988
2989 pthread_mutex_lock(&(mtx_sender_log));
2990
2991 printf("[DEBUG] Event %12u Rate %6.2lf[kHz] Recvd %6.2lf[MB/s] RunTime %8.2lf[s] interval %8.4lf[s] evenum %12u exprun 0x%.8x eve_size %6.2lf[kB] numch %d nonred %u crcok %u crcng %u evejump %u bad_7f7f %u bad_runnum %u bad_linknum %u bad_evenum %u bad_ffaa %u bad_ff55 %u bad_linksize %u no_data %u bad_header %u bad_size %u bad_size_dmatrl %u bad_dmatrl %u bad_word_size %u %s",
2992 evecnt, (evecnt - prev_evecnt) / interval / 1.e3,
2993 (total_size_bytes - prev_total_size_bytes) / interval / 1.e6,
2994 total_time, interval,
2995 evtnum, exprun,
2996 (total_size_bytes - prev_total_size_bytes) / (total_eve_cnt - prev_total_eve_cnt) / 1.e3,
2997 num_of_chs,
2998 sum_err_not_reduced, sum_total_crc_good, sum_total_crc_errors, sum_err_link_eve_jump,
2999 sum_err_bad_7f7f,
3000 sum_err_bad_runnum, sum_err_bad_linknum, sum_err_bad_evenum, sum_err_bad_ffaa, sum_err_bad_ff55, sum_err_bad_linksize,
3001 dmaerr_no_data, dmaerr_bad_header, dmaerr_bad_size, dmaerr_bad_size_dmatrl, dmaerr_bad_dmatrl, dmaerr_bad_word_size,
3002 timeStr);
3003 fflush(stdout);
3004 pthread_mutex_unlock(&(mtx_sender_log));
3005 prev_total_size_bytes = total_size_bytes;
3006 prev_evecnt = evecnt;
3007 prev_total_eve_cnt = total_eve_cnt;
3008 }
3009
3010 if ((k % 100) == 0)
3011 if (writeInFile) writeToFile(the_file, data, esize) ;
3012 if ((get_sp_cnt > 0) && ((get_sp_cnt % S_PAGE_SLOT_NMB) == 0)) pcie40_freeSuperPage(0, 1) ;
3013 }
3014 }
3015
3016 //
3017 // Rate Monitor
3018 //
3019 cnt++;
3020 if (cnt == start_cnt) init_time = getTimeSec();
3021 if (cnt % 10000 == 1) {
3022 if (cnt > start_cnt) {
3023 double cur_time = getTimeSec();
3024 pthread_mutex_lock(&(mtx_sender_log));
3025 printf("run %d evt %llu time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
3026 run_no,
3027 cnt,
3028 cur_time - init_time,
3029 NUM_SENDER_THREADS * (cnt - prev_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - prev_time),
3030 (cnt - prev_cnt) / (cur_time - prev_time) / 1000.,
3031 NUM_SENDER_THREADS * (cnt - start_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - init_time),
3032 (cnt - start_cnt) / (cur_time - init_time) / 1000., total_words);
3033 fflush(stdout);
3034 pthread_mutex_unlock(&(mtx_sender_log));
3035 prev_time = cur_time;
3036 prev_cnt = cnt;
3037 } else {
3038 // printf("Eve %lld\n", cnt);fflush(stdout);
3039 }
3040 }
3041 }
3042
3043 //
3044 // Close PCIe40
3045 //
3046 ecs_close(0, 0) ;
3047 ecs_close(0, 2) ;
3048 dma_close(0) ;
3049 if (writeInFile) the_file.close() ;
3050
3051 //
3052 // Close threads and delete buffers
3053 //
3054 for (int i = 0; i < NUM_SENDER_THREADS; i++) {
3055 pthread_join(sender_thr[i], NULL);
3056#ifndef USE_ZMQ
3057 pthread_mutex_destroy(&(mtx1_ch[i]));
3058 pthread_mutex_destroy(&(mtx2_ch[i]));
3059 delete data_1[i];
3060 delete data_2[i];
3061#endif
3062 }
3063 pthread_mutex_destroy(&mtx_sender_log);
3064 return 0;
3065}
STL namespace.