8 #ifdef USE_DUMMY_DATA_THREADS
12 #include <netinet/in.h>
13 #include <netinet/tcp.h>
14 #include <sys/types.h>
15 #include <sys/socket.h>
20 #include <arpa/inet.h>
// Number of clients served by one sender thread. It sizes the pollfd array in
// sender() and scales the throughput figures printed there.
36 #define NUM_CLIENTS_PER_THREAD 1
// Word counts of the outer "send" header and trailer that wrap one event buffer
// (used as offsets in fillDataContents() and addEvent()).
41 #define NW_SEND_HEADER 6
42 #define NW_SEND_TRAILER 2
// Word counts of the raw header and trailer written once per CPR block.
44 #define NW_RAW_HEADER 8
45 #define NW_RAW_TRAILER 4
// Word counts of the B2L header and trailer written once per HSLB.
// NOTE(review): NW_B2L_HEADER / NW_B2L_TRAILER are defined twice (3/2 and 7/3).
// The lines between them are missing from this listing; presumably a preprocessor
// conditional (main() prints banners mentioning REDUCED_DATA) selects one pair.
// Confirm against the complete file.
48 #define NW_B2L_HEADER 3
49 #define NW_B2L_TRAILER 2
51 #define NW_B2L_HEADER 7
52 #define NW_B2L_TRAILER 3
// Marker value stored in the ctime header words by fillDataContents() and
// checked by addEvent() before it patches a CPR block.
55 #define CTIME_VAL 0x12345601
// Two sets of per-client data buffers. main() allocates 100000 unsigned ints for
// each entry and copies the current event into one set or the other.
// NOTE(review): NUM_CLIENTS is not defined in the visible part of the file.
59 unsigned int* data_1[NUM_CLIENTS];
60 unsigned int* data_2[NUM_CLIENTS];
// One mutex per client and per buffer set; main() holds mtx1_ch[i] while writing
// data_1[i] and mtx2_ch[i] while writing data_2[i].
62 std::mutex mtx1_ch[NUM_CLIENTS];
63 std::mutex mtx2_ch[NUM_CLIENTS];
// Updates a running 16-bit CRC over an array of int words using the 256-entry
// lookup table CRC16Table0x1021, consuming one byte per iteration.
//
// crc16  : starting CRC value (addEvent() passes 0xffff).
// buf    : words to process; four bytes are consumed from each int element.
// nwords : number of int elements of buf to process.
//
// NOTE(review): the return statement is not part of this listing; the caller
// stores the result as the CRC, so presumably the updated crc16 is returned.
65 unsigned short CalcCRC16LittleEndian(
unsigned short crc16,
const int buf[],
int nwords)
// Error path for an invalid nwords: the message is formatted into err_buf and
// printed. The condition that guards it and the declaration of err_buf are not
// visible in this listing.
// NOTE(review): sprintf is unbounded; the size of err_buf cannot be checked here.
70 sprintf(err_buf,
"nwords value(%d) is invalid. Cannot calculate CRC16. Exiting...\n %s %s %d\n",
71 nwords, __FILE__, __PRETTY_FUNCTION__, __LINE__);
72 printf(
"%s", err_buf); fflush(stdout);
73 string err_str = err_buf;
// Byte-indexed lookup table; the identifier indicates polynomial 0x1021.
77 const unsigned short CRC16Table0x1021[ 256 ] = {
78 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
79 0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
80 0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
81 0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
82 0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
83 0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
84 0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
85 0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
87 0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
88 0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
89 0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
90 0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
91 0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
92 0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
93 0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
94 0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
96 0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
97 0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
98 0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
99 0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
100 0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
101 0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
102 0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
103 0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
105 0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
106 0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
107 0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
108 0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
109 0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
110 0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
111 0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
112 0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0
// cnt counts bytes consumed so far; nints indexes the int currently being read.
115 int cnt = 0, nints = 0;
117 while (nwords != 0) {
// Within each int, bytes are taken from byte offset 3 down to byte offset 0
// (offset = 3 - cnt % 4).
119 unsigned char temp_buf = *((
unsigned char*)(buf + nints) + (-(cnt % 4) + 3));
// Table-driven update: the top byte of the CRC XOR the data byte selects the
// table entry, which is XORed with the CRC shifted left by one byte.
120 crc16 = CRC16Table0x1021[(crc16 >> (16 - CHAR_BIT)) ^ temp_buf ] ^ (crc16 << CHAR_BIT);
// Fourth byte of the current int reached. The body of this branch is not visible
// in the listing; presumably it moves to the next int and decrements nwords.
122 if ((cnt % 4) == 3) {
// Fragment of a timing helper whose header and the declaration of t are not part
// of this listing (presumably getTimeSec(), which sender() and main() call).
// Returns the gettimeofday() time in seconds, with the microsecond part added as
// a fraction, minus the fixed offset 1417570000.
140 gettimeofday(&t, NULL);
141 return (t.tv_sec + t.tv_usec * 1.e-6 - 1417570000.);
// Writes the fixed layout of one dummy event into buf: a send header, then for
// each of ncpr CPR blocks a raw header, nhslb B2L sections (header, payload of
// nwords_per_fee words, trailer) and a raw trailer, and finally a send trailer.
//
// buf            : destination buffer; the caller must provide room for the
//                  total word count computed below.
// nwords_per_fee : payload words per HSLB section.
// node_id        : stored in the send header; node_id + k goes in CPR block k.
// ncpr, nhslb    : number of CPR blocks and of HSLB sections per CPR block.
// run            : run number, stored shifted left by 8 bits.
//
// NOTE(review): the return statement is not visible in this listing. main()
// compares the return value with its own total word count, so presumably the
// number of words written is returned.
144 int fillDataContents(
int* buf,
int nwords_per_fee,
unsigned int node_id,
int ncpr,
int nhslb,
int run)
// Total size of the event in words (the rest of the expression is missing from
// the listing).
146 int nwords = NW_SEND_HEADER + NW_SEND_TRAILER +
147 ncpr * (NW_RAW_HEADER +
148 (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
// Send header: word 0 = total word count, word 1 = 6, word 2 = (1 << 16) | ncpr,
// word 3 = run << 8, word 5 = node_id. Word 4 is not written here; addEvent()
// stores the event number there.
153 buf[ offset + 0 ] = nwords;
154 buf[ offset + 1 ] = 6;
155 buf[ offset + 2 ] = (1 << 16) | ncpr;
156 unsigned int exp_run = run << 8;
157 buf[ offset + 3 ] = exp_run;
158 buf[ offset + 5 ] = node_id;
159 offset += NW_SEND_HEADER;
// One pass per CPR block.
161 for (
int k = 0; k < ncpr; k++) {
162 int top_pos = offset;
166 int cpr_nwords = NW_RAW_HEADER +
167 (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
169 int finesse_nwords = nwords_per_fee + NW_B2L_HEADER + NW_B2L_TRAILER;
170 unsigned int ctime = CTIME_VAL;
171 unsigned int utime = 0x98765432;
// Raw header: word 0 is the CPR block length.
173 buf[ offset + 0 ] = cpr_nwords;
// NOTE(review): word 1 is assigned two different constants back to back; the
// lines between them are missing, so presumably these are alternative branches
// of a preprocessor conditional.
175 buf[ offset + 1 ] = 0x7f7f020c;
177 buf[ offset + 1 ] = 0x7f7f820c;
179 buf[ offset + 2 ] = exp_run;
180 printf(
"run_no %d\n", exp_run); fflush(stdout);
// Word 3 is left for addEvent() (event number). Word 4 carries CTIME_VAL, which
// addEvent() checks before patching the block.
181 buf[ offset + 4 ] = ctime;
182 buf[ offset + 5 ] = utime;
183 buf[ offset + 6 ] = node_id + k;
184 buf[ offset + 7 ] = 0x34567890;
185 offset += NW_RAW_HEADER;
// One B2L section per HSLB.
187 for (
int i = 0; i < nhslb ; i++) {
// Two header layouts follow, a 3-word one and a 7-word one, matching the two
// NW_B2L_HEADER values; presumably only one is compiled (conditional not shown).
189 buf[ offset + 0 ] = nwords_per_fee + 3;
190 buf[ offset + 1 ] = 0xffaa0000;
191 buf[ offset + 2 ] = ctime;
193 buf[ offset + 0 ] = nwords_per_fee + 7;
194 buf[ offset + 1 ] = 0xffaa0000;
195 buf[ offset + 3 ] = ctime;
196 buf[ offset + 4 ] = utime;
197 buf[ offset + 5 ] = exp_run;
198 buf[ offset + 6 ] = ctime;
200 offset += NW_B2L_HEADER;
// Payload words; the loop body is not visible in this listing.
202 for (
int j = offset; j < offset + nwords_per_fee; j++) {
205 offset += nwords_per_fee;
// B2L trailer, again in two layouts (presumably alternative branches). The
// trailer ends with the marker 0xff550000.
209 buf[ offset + 1 ] = 0xff550000;
211 buf[ offset ] = ctime;
212 buf[ offset + 1 ] = 0;
213 buf[ offset + 2 ] = 0xff550000;
216 offset += NW_B2L_TRAILER;
// Raw trailer of the CPR block (the write to word 0 is not visible here).
219 buf[ offset + 1 ] = 0x0;
220 buf[ offset + 2 ] = 0x0;
221 buf[ offset + 3 ] = 0x7fff0006;
222 offset += NW_RAW_TRAILER;
// Send trailer closing the whole event.
227 buf[ offset + 1 ] = 0x7fff0000;
228 offset += NW_SEND_TRAILER;
// Stamps an event number into a buffer previously laid out by fillDataContents()
// and refreshes the values that depend on it (XOR checksum word, per-HSLB CRC).
//
// buf            : event buffer to patch in place.
// nwords_per_fee : payload words per HSLB section (must match the layout).
// event          : event number to write.
// ncpr, nhslb    : number of CPR blocks and of HSLB sections per CPR block.
234 inline void addEvent(
int* buf,
int nwords_per_fee,
unsigned int event,
int ncpr,
int nhslb)
// Send header word 4 carries the event number.
238 buf[ offset + 4 ] = event;
239 offset += NW_SEND_HEADER;
241 for (
int k = 0; k < ncpr; k++) {
// Word 0 of the raw header is the CPR block length; the XOR checksum word sits
// two words before the end of the block.
242 int nwords = buf[ offset ];
243 int posback_xorchksum = 2;
244 int pos_xorchksum = offset + nwords - posback_xorchksum;
// Sanity check: the ctime word must still hold the marker written at fill time.
// NOTE(review): the format "0x%.x" has an empty precision; "0x%x" may have been
// intended. The rest of this branch is not visible in the listing.
245 if (buf[ offset + 4 ] != CTIME_VAL) {
246 printf(
"[FATAL] data-production error 2 0x%.x", buf[ offset + 4 ]);
// Incremental checksum update: XOR the old value of word 3 out of the checksum,
// store the new event number, then XOR the new value in.
251 buf[ pos_xorchksum ] ^= buf[ offset + 3];
252 buf[ offset + 3] = event;
253 buf[ pos_xorchksum ] ^= buf[ offset + 3];
256 offset += NW_RAW_HEADER;
257 for (
int i = 0; i < nhslb ; i++) {
// Each B2L section must start with the 0xffaa marker in the upper half of word 1.
258 if ((buf[ offset + 1 ] & 0xffff0000) != 0xffaa0000) {
259 printf(
"[FATAL] data-production error 3 : 0x%.x hslb %d cpr %d\n", buf[ offset ], i, k);
// Put the low 16 bits of the event number next to the marker, and the full
// event number in word 3.
263 buf[ offset + 1 ] = 0xffaa0000 + (
event & 0xffff);
264 buf[ offset + 3 ] = event;
// CRC16 over nwords_per_fee + 5 words starting at header word 2, seeded with
// 0xffff; the result is stored together with the low 16 bits of the event number.
// NOTE(review): the start offset (2), the length (+5) and the trailer slot (+1)
// appear to assume the 7-word header / 3-word trailer layout. Confirm against
// the conditional that is missing from this listing.
267 int* crc_buf = buf + offset + 2;
268 int crc_nwords = nwords_per_fee + 5;
269 unsigned short temp_crc16 = CalcCRC16LittleEndian(0xffff, crc_buf, crc_nwords);
270 buf[ offset + NW_B2L_HEADER + nwords_per_fee + 1 ] = ((
event & 0x0000ffff) << 16) | temp_crc16;
// The same advance appears twice; the lines between are missing, so presumably
// these belong to alternative branches rather than running back to back.
274 offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
276 offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
279 offset += NW_RAW_TRAILER;
// Accumulators whose use is not visible in this listing.
280 unsigned int xor_chksum = 0;
281 unsigned int xor_chksum2 = 0;
// Sender routine (main() starts it on std::thread objects): listens on TCP port
// 30000 + sender_id, accepts a connection, then writes an event buffer of
// total_words ints to it in a loop while printing throughput statistics.
//
// sender_id      : selects the listening port (30000 + sender_id).
// run_no         : run number, used in the statistics line.
// nwords_per_fee : payload words per HSLB section.
// ncpr, nhslb    : number of CPR blocks and of HSLB sections per CPR block.
//
// NOTE(review): the code that fills buff before each write and the return
// statement are not part of this listing.
287 int sender(
int sender_id,
int run_no,
int nwords_per_fee,
int ncpr,
int nhslb)
// Size of one event in words (the rest of the expression is missing from the
// listing).
292 int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
293 ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
// NOTE(review): an array sized by a runtime value is a variable-length array,
// which is a compiler extension in C++ and lives on the thread's stack.
295 int buff[total_words];
300 int port_to = 30000 + sender_id;
301 struct sockaddr_in servaddr;
302 struct pollfd client[NUM_CLIENTS_PER_THREAD + 1];
// Listening address: any local interface, port port_to.
323 struct sockaddr_in sock_listen;
324 sock_listen.sin_family = AF_INET;
326 sock_listen.sin_addr.s_addr = htonl(INADDR_ANY);
328 socklen_t addrlen =
sizeof(sock_listen);
329 sock_listen.sin_port = htons(port_to);
330 fd_listen = socket(PF_INET, SOCK_STREAM, 0);
// Sets SO_REUSEADDR on the listening socket before bind(); the value of flags is
// set on a line not shown here.
333 int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)
sizeof(flags));
335 perror(
"Failed to set REUSEADDR");
// On bind failure: report it, then print the output of "ss -ap | grep <port>"
// to help identify what is using the port.
338 if (bind(fd_listen, (
struct sockaddr*)&sock_listen,
sizeof(
struct sockaddr)) < 0) {
339 printf(
"[FATAL] Failed to bind. Maybe other programs have already occupied this port(%d). Exiting...\n",
340 port_to); fflush(stdout);
// NOTE(review): sprintf is unbounded; the sizes of cmdline and err_buf are
// declared on lines not shown here.
345 sprintf(cmdline,
"/usr/sbin/ss -ap | grep %d", port_to);
346 if ((fp = popen(cmdline,
"r")) == NULL) {
347 printf(
"[WARNING] Failed to run %s\n", cmdline);
// NOTE(review): no pclose(fp) is visible in this listing; confirm that the
// stream is closed.
349 while (fgets(buf, 256, fp) != NULL) {
350 printf(
"[ERROR] Failed to bind. output of ss(port %d) : %s\n", port_to, buf); fflush(stdout);
355 sprintf(err_buf,
"[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
356 strerror(errno), port_to);
357 printf(
"%s\n", err_buf); fflush(stdout);
// Sets TCP_NODELAY on the listening socket (val1 is set on a line not shown).
363 setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)
sizeof(val1));
365 if (listen(fd_listen, backlog) < 0) {
// NOTE(review): "Exting" in the message below looks like a typo for "Exiting".
367 sprintf(err_buf,
"Failed in listen(%s). Exting...", strerror(errno));
368 printf(
"%s\n", err_buf); fflush(stdout);
377 struct sockaddr_in sock_accept;
379 printf(
"[DEBUG] Accepting... : port %d\n", port_to);
// NOTE(review): accept() reports failure by returning -1, while 0 is a valid
// descriptor. Testing "== 0" therefore looks wrong; "< 0" was probably intended.
382 if ((fd_accept = accept(fd_listen, (
struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
384 sprintf(err_buf,
"[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
385 printf(
"%s\n", err_buf); fflush(stdout);
390 printf(
"[DEBUG] Accepted.\n"); fflush(stdout);
// Send timeout on the accepted socket; the timeout values are assigned on lines
// not shown here.
393 struct timeval timeout;
396 ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)
sizeof(timeout));
398 char err_buf[500] =
"[FATAL] Failed to set TIMEOUT. Exiting...";
399 printf(
"%s\n", err_buf); fflush(stdout);
411 printf(
"Connection(port %d) accepted\n", port_to); fflush(stdout);
// Throughput bookkeeping: init_time is reset when cnt reaches start_cnt, so the
// "so far" averages count from that point.
413 double init_time = getTimeSec();
414 double prev_time = init_time;
416 unsigned long long int cnt = 0;
417 unsigned long long int prev_cnt = 0;
418 unsigned long long int start_cnt = 3000;
// Fragment of the event loop header (the surrounding "for (...)" is missing).
422 int j = 0; j < MAX_EVENT; j++
// Writes one whole event to the client; a return value <= 0 is reported as fatal.
// NOTE(review): a short write (0 < ret < requested size) is not handled in the
// visible code.
439 if ((ret = write(fd_accept, buff, total_words *
sizeof(
int))) <= 0) {
440 printf(
"[FATAL] Return value %d\n", ret);
447 if (cnt == start_cnt) init_time = getTimeSec();
// A statistics line is printed when cnt % 1000000 == 1, once cnt exceeds
// start_cnt: rates since the previous report and since start_cnt.
448 if (cnt % 1000000 == 1) {
449 if (cnt > start_cnt) {
450 double cur_time = getTimeSec();
451 printf(
"run %d evt %llu time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
454 cur_time - init_time,
455 NUM_CLIENTS_PER_THREAD * (cnt - prev_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - prev_time),
456 (cnt - prev_cnt) / (cur_time - prev_time) / 1000.,
457 NUM_CLIENTS_PER_THREAD * (cnt - start_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - init_time),
458 (cnt - start_cnt) / (cur_time - init_time) / 1000., total_words);
461 prev_time = cur_time;
// Program entry point. Command-line arguments (see the usage string below):
//   <node ID> <run#> <nwords of det. buf per FEE> <# of CPR per COPPER> <# of HSLBs>
// The visible code prints a banner, parses the arguments, allocates the
// per-client buffers, starts five sender() threads, builds one event with
// fillDataContents(), and then loops: stamp the event number with addEvent(),
// copy the event into the per-client buffers, and print throughput.
//
// NOTE(review): the arguments are parsed twice in this listing and two banner
// variants are printed; the lines between are missing, so presumably these are
// alternative branches of preprocessor conditionals. Confirm against the full file.
473 int main(
int argc,
char** argv)
476 printf(
"###################################################\n");
478 printf(
"# PCIe40 data after reduction (#define REDUCED_DATA) #\n");
480 printf(
"# PCIe40 data before reduction (//#define REDUCED_DATA) #\n");
482 printf(
"###################################################\n");
// Usage message; the argc check that guards it is not visible in this listing.
485 printf(
"Usage : %s <node ID> <run#> <nwords of det. buf per FEE> <# of CPR per COPPER> <# of HSLBs>\n", argv[ 0 ]);
// The node ID must be given with a literal "0x" prefix; if the input does not
// match the format, node_id keeps its initial value 0.
492 unsigned int node_id = 0;
493 sscanf(argv[1],
"0x%x", &node_id);
495 int run_no = atoi(argv[2]);
496 int nwords_per_fee = atoi(argv[3]);
497 int ncpr = atoi(argv[4]);
498 int nhslb = atoi(argv[5]);
// Allocate both buffer sets: 100000 unsigned ints per client and per set.
503 for (
int i = 0; i < NUM_CLIENTS; i++) {
504 data_1[i] =
new unsigned int[100000];
505 data_2[i] =
new unsigned int[100000];
// Start five sender threads with ids 0..4 (listening ports 30000..30004).
511 std::thread sender0(sender, 0, run_no, nwords_per_fee, ncpr, nhslb);
512 std::thread sender1(sender, 1, run_no, nwords_per_fee, ncpr, nhslb);
513 std::thread sender2(sender, 2, run_no, nwords_per_fee, ncpr, nhslb);
514 std::thread sender3(sender, 3, run_no, nwords_per_fee, ncpr, nhslb);
515 std::thread sender4(sender, 4, run_no, nwords_per_fee, ncpr, nhslb);
// Second copy of the argument parsing (see the note at the top of main).
522 unsigned int node_id = 0;
523 sscanf(argv[1],
"0x%x", &node_id);
525 int run_no = atoi(argv[2]);
526 int nwords_per_fee = atoi(argv[3]);
527 int ncpr = atoi(argv[4]);
528 int nhslb = atoi(argv[5]);
// Size of one event in words (the rest of the expression is missing from the
// listing).
530 int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
531 ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
533 printf(
"TET %d %d %d %d %d\n ", NW_SEND_HEADER + NW_SEND_TRAILER, ncpr,
534 NW_RAW_HEADER, (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb, NW_RAW_TRAILER);
// NOTE(review): variable-length array (a compiler extension in C++), sized from
// command-line input.
535 int buff[total_words];
// Lay out the event once and cross-check the word count reported by
// fillDataContents() against total_words.
540 int temp_ret = fillDataContents(buff, nwords_per_fee, node_id, ncpr, nhslb, run_no);
541 if (temp_ret != total_words) {
542 printf(
"[FATAL] data-production error 1 %d %d\n", total_words, temp_ret);
// Throughput bookkeeping: init_time is reset when cnt reaches start_cnt.
547 double init_time = getTimeSec();
548 double prev_time = init_time;
550 unsigned long long int cnt = 0;
551 unsigned long long int prev_cnt = 0;
552 unsigned long long int start_cnt = 300000;
557 for (
int j = 0; j < MAX_EVENT; j++)
// Stamp the current event counter into the prepared buffer.
563 addEvent(buff, nwords_per_fee, cnt, ncpr, nhslb);
// Copy the event into each client's buffer, into set 1 or set 2 depending on
// buffer_id, holding the matching mutex during the copy.
// NOTE(review): this loop runs i = 1 .. NUM_CLIENTS, but data_1, data_2, mtx1_ch
// and mtx2_ch are declared with NUM_CLIENTS elements, so i == NUM_CLIENTS would
// index one past the end and index 0 is never written. Confirm the intended range.
// NOTE(review): total_words is not checked against the 100000-word allocation
// in the visible code.
572 for (
int i = 1 ; i <= NUM_CLIENTS ; i++) {
574 if (buffer_id == 0) {
576 std::lock_guard<std::mutex> lock(mtx1_ch[i]);
577 memcpy(data_1[i], buff, total_words *
sizeof(
unsigned int))
581 std::lock_guard<std::mutex> lock(mtx2_ch[i]);
582 memcpy(data_2[i], buff, total_words *
sizeof(
unsigned int))
592 if (buffer_id == 0) {
600 if (cnt == start_cnt) init_time = getTimeSec();
// A statistics line is printed when cnt % 10000 == 1, once cnt exceeds start_cnt.
601 if (cnt % 10000 == 1) {
602 if (cnt > start_cnt) {
603 double cur_time = getTimeSec();
// NOTE(review): cnt is unsigned long long but the format uses %lld; sender()
// uses %llu for the same field.
604 printf(
"run %d evt %lld time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
607 cur_time - init_time,
608 NUM_CLIENTS * (cnt - prev_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - prev_time),
609 (cnt - prev_cnt) / (cur_time - prev_time) / 1000.,
610 NUM_CLIENTS * (cnt - start_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - init_time),
611 (cnt - start_cnt) / (cur_time - init_time) / 1000., total_words);
614 prev_time = cur_time;
// Final per-client loop; its body is not visible in this listing.
630 for (
int i = 0; i < NUM_CLIENTS; i++) {
int main(int argc, char **argv)
Run all tests.