8 #ifdef USE_DUMMY_DATA_THREADS
12 #include <netinet/in.h>
13 #include <netinet/tcp.h>
14 #include <sys/types.h>
15 #include <sys/socket.h>
20 #include <arpa/inet.h>
36 #define NUM_CLIENTS_PER_THREAD 1
41 #define NW_SEND_HEADER 6
42 #define NW_SEND_TRAILER 2
44 #define NW_RAW_HEADER 8
45 #define NW_RAW_TRAILER 4
48 #define NW_B2L_HEADER 3
49 #define NW_B2L_TRAILER 2
51 #define NW_B2L_HEADER 7
52 #define NW_B2L_TRAILER 3
55 #define CTIME_VAL 0x12345601
59 unsigned int* data_1[NUM_CLIENTS];
60 unsigned int* data_2[NUM_CLIENTS];
62 std::mutex mtx1_ch[NUM_CLIENTS];
63 std::mutex mtx2_ch[NUM_CLIENTS];
65 unsigned short CalcCRC16LittleEndian(
unsigned short crc16,
const int buf[],
int nwords)
70 sprintf(err_buf,
"nwords value(%d) is invalid. Cannot calculate CRC16. Exiting...\n %s %s %d\n",
71 nwords, __FILE__, __PRETTY_FUNCTION__, __LINE__);
72 printf(
"%s", err_buf); fflush(stdout);
73 string err_str = err_buf;
77 const unsigned short CRC16Table0x1021[ 256 ] = {
78 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
79 0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
80 0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
81 0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
82 0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
83 0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
84 0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
85 0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
87 0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
88 0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
89 0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
90 0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
91 0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
92 0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
93 0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
94 0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
96 0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
97 0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
98 0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
99 0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
100 0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
101 0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
102 0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
103 0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
105 0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
106 0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
107 0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
108 0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
109 0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
110 0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
111 0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
112 0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0
115 int cnt = 0, nints = 0;
117 while (nwords != 0) {
119 unsigned char temp_buf = *((
unsigned char*)(buf + nints) + (-(cnt % 4) + 3));
120 crc16 = CRC16Table0x1021[(crc16 >> (16 - CHAR_BIT)) ^ temp_buf ] ^ (crc16 << CHAR_BIT);
122 if ((cnt % 4) == 3) {
140 gettimeofday(&t, NULL);
141 return (t.tv_sec + t.tv_usec * 1.e-6 - 1417570000.);
144 int fillDataContents(
int* buf,
int nwords_per_fee,
unsigned int node_id,
int ncpr,
int nhslb,
int run)
146 int nwords = NW_SEND_HEADER + NW_SEND_TRAILER +
147 ncpr * (NW_RAW_HEADER +
148 (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
153 buf[ offset + 0 ] = nwords;
154 buf[ offset + 1 ] = 6;
155 buf[ offset + 2 ] = (1 << 16) | ncpr;
156 unsigned int exp_run = run << 8;
157 buf[ offset + 3 ] = exp_run;
158 buf[ offset + 5 ] = node_id;
159 offset += NW_SEND_HEADER;
161 for (
int k = 0; k < ncpr; k++) {
162 int top_pos = offset;
166 int cpr_nwords = NW_RAW_HEADER +
167 (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
169 int finesse_nwords = nwords_per_fee + NW_B2L_HEADER + NW_B2L_TRAILER;
170 unsigned int ctime = CTIME_VAL;
171 unsigned int utime = 0x98765432;
173 buf[ offset + 0 ] = cpr_nwords;
175 buf[ offset + 1 ] = 0x7f7f020c;
177 buf[ offset + 1 ] = 0x7f7f820c;
179 buf[ offset + 2 ] = exp_run;
180 printf(
"run_no %d\n", exp_run); fflush(stdout);
181 buf[ offset + 4 ] = ctime;
182 buf[ offset + 5 ] = utime;
183 buf[ offset + 6 ] = node_id + k;
184 buf[ offset + 7 ] = 0x34567890;
185 offset += NW_RAW_HEADER;
187 for (
int i = 0; i < nhslb ; i++) {
189 buf[ offset + 0 ] = nwords_per_fee + 3;
190 buf[ offset + 1 ] = 0xffaa0000;
191 buf[ offset + 2 ] = ctime;
193 buf[ offset + 0 ] = nwords_per_fee + 7;
194 buf[ offset + 1 ] = 0xffaa0000;
195 buf[ offset + 3 ] = ctime;
196 buf[ offset + 4 ] = utime;
197 buf[ offset + 5 ] = exp_run;
198 buf[ offset + 6 ] = ctime;
200 offset += NW_B2L_HEADER;
202 for (
int j = offset; j < offset + nwords_per_fee; j++) {
205 offset += nwords_per_fee;
209 buf[ offset + 1 ] = 0xff550000;
211 buf[ offset ] = ctime;
212 buf[ offset + 1 ] = 0;
213 buf[ offset + 2 ] = 0xff550000;
216 offset += NW_B2L_TRAILER;
219 buf[ offset + 1 ] = 0x0;
220 buf[ offset + 2 ] = 0x0;
221 buf[ offset + 3 ] = 0x7fff0006;
222 offset += NW_RAW_TRAILER;
227 buf[ offset + 1 ] = 0x7fff0000;
228 offset += NW_SEND_TRAILER;
234 inline void addEvent(
int* buf,
int nwords_per_fee,
unsigned int event,
int ncpr,
int nhslb)
239 buf[ offset + 4 ] = event;
240 offset += NW_SEND_HEADER;
242 for (
int k = 0; k < ncpr; k++) {
243 int nwords = buf[ offset ];
244 int posback_xorchksum = 2;
245 int pos_xorchksum = offset + nwords - posback_xorchksum;
246 prev_offset = offset;
247 if (buf[ offset + 4 ] != CTIME_VAL) {
248 printf(
"[FATAL] data-production error 2 0x%.x", buf[ offset + 4 ]);
253 buf[ pos_xorchksum ] ^= buf[ offset + 3];
254 buf[ offset + 3] = event;
255 buf[ pos_xorchksum ] ^= buf[ offset + 3];
258 offset += NW_RAW_HEADER;
259 for (
int i = 0; i < nhslb ; i++) {
260 if ((buf[ offset + 1 ] & 0xffff0000) != 0xffaa0000) {
261 printf(
"[FATAL] data-production error 3 : 0x%.x hslb %d cpr %d\n", buf[ offset ], i, k);
265 buf[ offset + 1 ] = 0xffaa0000 + (
event & 0xffff);
266 buf[ offset + 3 ] = event;
269 int* crc_buf = buf + offset + 2;
270 int crc_nwords = nwords_per_fee + 5;
271 unsigned short temp_crc16 = CalcCRC16LittleEndian(0xffff, crc_buf, crc_nwords);
272 buf[ offset + NW_B2L_HEADER + nwords_per_fee + 1 ] = ((
event & 0x0000ffff) << 16) | temp_crc16;
276 offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
278 offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
281 offset += NW_RAW_TRAILER;
282 unsigned int xor_chksum = 0;
283 unsigned int xor_chksum2 = 0;
289 int sender(
int sender_id,
int run_no,
int nwords_per_fee,
int ncpr,
int nhslb)
294 int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
295 ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
297 int buff[total_words];
302 int port_to = 30000 + sender_id;
304 struct sockaddr_in servaddr;
305 struct pollfd client[NUM_CLIENTS_PER_THREAD + 1];
326 struct sockaddr_in sock_listen;
327 sock_listen.sin_family = AF_INET;
329 sock_listen.sin_addr.s_addr = htonl(INADDR_ANY);
331 socklen_t addrlen =
sizeof(sock_listen);
332 sock_listen.sin_port = htons(port_to);
333 fd_listen = socket(PF_INET, SOCK_STREAM, 0);
336 int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)
sizeof(flags));
338 perror(
"Failed to set REUSEADDR");
341 if (bind(fd_listen, (
struct sockaddr*)&sock_listen,
sizeof(
struct sockaddr)) < 0) {
342 printf(
"[FATAL] Failed to bind. Maybe other programs have already occupied this port(%d). Exiting...\n",
343 port_to); fflush(stdout);
348 sprintf(cmdline,
"/usr/sbin/ss -ap | grep %d", port_to);
349 if ((fp = popen(cmdline,
"r")) == NULL) {
350 printf(
"[WARNING] Failed to run %s\n", cmdline);
352 while (fgets(buf, 256, fp) != NULL) {
353 printf(
"[ERROR] Failed to bind. output of ss(port %d) : %s\n", port_to, buf); fflush(stdout);
358 sprintf(err_buf,
"[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
359 strerror(errno), port_to);
360 printf(
"%s\n", err_buf); fflush(stdout);
366 setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)
sizeof(val1));
368 if (listen(fd_listen, backlog) < 0) {
370 sprintf(err_buf,
"Failed in listen(%s). Exting...", strerror(errno));
371 printf(
"%s\n", err_buf); fflush(stdout);
380 struct sockaddr_in sock_accept;
382 printf(
"[DEBUG] Accepting... : port %d\n", port_to);
385 if ((fd_accept = accept(fd_listen, (
struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
387 sprintf(err_buf,
"[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
388 printf(
"%s\n", err_buf); fflush(stdout);
393 printf(
"[DEBUG] Accepted.\n"); fflush(stdout);
396 struct timeval timeout;
399 ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)
sizeof(timeout));
401 char err_buf[500] =
"[FATAL] Failed to set TIMEOUT. Exiting...";
402 printf(
"%s\n", err_buf); fflush(stdout);
414 printf(
"Connection(port %d) accepted\n", port_to); fflush(stdout);
416 double init_time = getTimeSec();
417 double prev_time = init_time;
419 unsigned long long int cnt = 0;
420 unsigned long long int prev_cnt = 0;
421 unsigned long long int start_cnt = 3000;
425 int j = 0; j < MAX_EVENT; j++
442 if ((ret = write(fd_accept, buff, total_words *
sizeof(
int))) <= 0) {
443 printf(
"[FATAL] Return value %d\n", ret);
450 if (cnt == start_cnt) init_time = getTimeSec();
451 if (cnt % 1000000 == 1) {
452 if (cnt > start_cnt) {
453 double cur_time = getTimeSec();
454 printf(
"run %d evt %lld time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
457 cur_time - init_time,
458 NUM_CLIENTS_PER_THREAD * (cnt - prev_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - prev_time),
459 (cnt - prev_cnt) / (cur_time - prev_time) / 1000. ,
460 NUM_CLIENTS_PER_THREAD * (cnt - start_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - init_time),
461 (cnt - start_cnt) / (cur_time - init_time) / 1000. , total_words);
464 prev_time = cur_time;
476 int main(
int argc,
char** argv)
479 printf(
"###################################################\n");
481 printf(
"# PCIe40 data after reduction (#define REDUCED_DATA) #\n");
483 printf(
"# PCIe40 data before reduction (//#define REDUCED_DATA) #\n");
485 printf(
"###################################################\n");
488 printf(
"Usage : %s <node ID> <run#> <nwords of det. buf per FEE> <# of CPR per COPPER> <# of HSLBs>\n", argv[ 0 ]);
495 unsigned int node_id = 0;
496 sscanf(argv[1],
"0x%x", &node_id);
498 int run_no = atoi(argv[2]);
499 int nwords_per_fee = atoi(argv[3]);
500 int ncpr = atoi(argv[4]);
501 int nhslb = atoi(argv[5]);
506 for (
int i = 0; i < NUM_CLIENTS; i++) {
507 data_1[i] =
new unsigned int[100000];
508 data_2[i] =
new unsigned int[100000];
514 std::thread sender0(sender, 0, run_no, nwords_per_fee, ncpr, nhslb);
515 std::thread sender1(sender, 1, run_no, nwords_per_fee, ncpr, nhslb);
516 std::thread sender2(sender, 2, run_no, nwords_per_fee, ncpr, nhslb);
517 std::thread sender3(sender, 3, run_no, nwords_per_fee, ncpr, nhslb);
518 std::thread sender4(sender, 4, run_no, nwords_per_fee, ncpr, nhslb);
525 unsigned int node_id = 0;
526 sscanf(argv[1],
"0x%x", &node_id);
528 int run_no = atoi(argv[2]);
529 int nwords_per_fee = atoi(argv[3]);
530 int ncpr = atoi(argv[4]);
531 int nhslb = atoi(argv[5]);
533 int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
534 ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
536 printf(
"TET %d %d %d %d %d\n ", NW_SEND_HEADER + NW_SEND_TRAILER, ncpr,
537 NW_RAW_HEADER, (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb, NW_RAW_TRAILER);
538 int buff[total_words];
543 int temp_ret = fillDataContents(buff, nwords_per_fee, node_id, ncpr, nhslb, run_no);
544 if (temp_ret != total_words) {
545 printf(
"[FATAL] data-production error 1 %d %d\n", total_words, temp_ret);
550 double init_time = getTimeSec();
551 double prev_time = init_time;
553 unsigned long long int cnt = 0;
554 unsigned long long int prev_cnt = 0;
555 unsigned long long int start_cnt = 300000;
560 for (
int j = 0; j < MAX_EVENT; j++)
566 addEvent(buff, nwords_per_fee, cnt, ncpr, nhslb);
575 for (
int i = 1 ; i <= NUM_CLIENTS ; i++) {
577 if (buffer_id == 0) {
579 std::lock_guard<std::mutex> lock(mtx1_ch[i]);
580 memcpy(data_1[i], buff, total_words *
sizeof(
unsigned int))
584 std::lock_guard<std::mutex> lock(mtx2_ch[i]);
585 memcpy(data_2[i], buff, total_words *
sizeof(
unsigned int))
595 if (buffer_id == 0) {
603 if (cnt == start_cnt) init_time = getTimeSec();
604 if (cnt % 10000 == 1) {
605 if (cnt > start_cnt) {
606 double cur_time = getTimeSec();
607 printf(
"run %d evt %lld time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
610 cur_time - init_time,
611 NUM_CLIENTS * (cnt - prev_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - prev_time),
612 (cnt - prev_cnt) / (cur_time - prev_time) / 1000. ,
613 NUM_CLIENTS * (cnt - start_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - init_time),
614 (cnt - start_cnt) / (cur_time - init_time) / 1000. , total_words);
617 prev_time = cur_time;
633 for (
int i = 0; i < NUM_CLIENTS; i++) {
int main(int argc, char **argv)
Run all tests.