8#ifdef USE_DUMMY_DATA_THREADS
12#include <netinet/in.h>
13#include <netinet/tcp.h>
15#include <sys/socket.h>
36#define NUM_CLIENTS_PER_THREAD 1
41#define NW_SEND_HEADER 6
42#define NW_SEND_TRAILER 2
44#define NW_RAW_HEADER 8
45#define NW_RAW_TRAILER 4
48#define NW_B2L_HEADER 3
49#define NW_B2L_TRAILER 2
51#define NW_B2L_HEADER 7
52#define NW_B2L_TRAILER 3
55#define CTIME_VAL 0x12345601
59unsigned int* data_1[NUM_CLIENTS];
60unsigned int* data_2[NUM_CLIENTS];
62std::mutex mtx1_ch[NUM_CLIENTS];
63std::mutex mtx2_ch[NUM_CLIENTS];
65unsigned short CalcCRC16LittleEndian(
unsigned short crc16,
const int buf[],
int nwords)
70 sprintf(err_buf,
"nwords value(%d) is invalid. Cannot calculate CRC16. Exiting...\n %s %s %d\n",
71 nwords, __FILE__, __PRETTY_FUNCTION__, __LINE__);
72 printf(
"%s", err_buf); fflush(stdout);
73 string err_str = err_buf;
77 const unsigned short CRC16Table0x1021[ 256 ] = {
78 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
79 0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
80 0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
81 0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
82 0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
83 0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
84 0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
85 0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
87 0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
88 0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
89 0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
90 0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
91 0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
92 0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
93 0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
94 0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
96 0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
97 0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
98 0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
99 0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
100 0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
101 0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
102 0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
103 0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
105 0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
106 0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
107 0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
108 0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
109 0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
110 0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
111 0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
112 0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0
115 int cnt = 0, nints = 0;
117 while (nwords != 0) {
119 unsigned char temp_buf = *((
unsigned char*)(buf + nints) + (-(cnt % 4) + 3));
120 crc16 = CRC16Table0x1021[(crc16 >> (16 - CHAR_BIT)) ^ temp_buf ] ^ (crc16 << CHAR_BIT);
122 if ((cnt % 4) == 3) {
140 gettimeofday(&t, NULL);
141 return (t.tv_sec + t.tv_usec * 1.e-6 - 1417570000.);
144int fillDataContents(
int* buf,
int nwords_per_fee,
unsigned int node_id,
int ncpr,
int nhslb,
int run)
146 int nwords = NW_SEND_HEADER + NW_SEND_TRAILER +
147 ncpr * (NW_RAW_HEADER +
148 (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
153 buf[ offset + 0 ] = nwords;
154 buf[ offset + 1 ] = 6;
155 buf[ offset + 2 ] = (1 << 16) | ncpr;
156 unsigned int exp_run = run << 8;
157 buf[ offset + 3 ] = exp_run;
158 buf[ offset + 5 ] = node_id;
159 offset += NW_SEND_HEADER;
161 for (
int k = 0; k < ncpr; k++) {
162 int top_pos = offset;
166 int cpr_nwords = NW_RAW_HEADER +
167 (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
169 int finesse_nwords = nwords_per_fee + NW_B2L_HEADER + NW_B2L_TRAILER;
170 unsigned int ctime = CTIME_VAL;
171 unsigned int utime = 0x98765432;
173 buf[ offset + 0 ] = cpr_nwords;
175 buf[ offset + 1 ] = 0x7f7f020c;
177 buf[ offset + 1 ] = 0x7f7f820c;
179 buf[ offset + 2 ] = exp_run;
180 printf(
"run_no %u\n", exp_run); fflush(stdout);
181 buf[ offset + 4 ] = ctime;
182 buf[ offset + 5 ] = utime;
183 buf[ offset + 6 ] = node_id + k;
184 buf[ offset + 7 ] = 0x34567890;
185 offset += NW_RAW_HEADER;
187 for (
int i = 0; i < nhslb ; i++) {
189 buf[ offset + 0 ] = nwords_per_fee + 3;
190 buf[ offset + 1 ] = 0xffaa0000;
191 buf[ offset + 2 ] = ctime;
193 buf[ offset + 0 ] = nwords_per_fee + 7;
194 buf[ offset + 1 ] = 0xffaa0000;
195 buf[ offset + 3 ] = ctime;
196 buf[ offset + 4 ] = utime;
197 buf[ offset + 5 ] = exp_run;
198 buf[ offset + 6 ] = ctime;
200 offset += NW_B2L_HEADER;
202 for (
int j = offset; j < offset + nwords_per_fee; j++) {
205 offset += nwords_per_fee;
209 buf[ offset + 1 ] = 0xff550000;
211 buf[ offset ] = ctime;
212 buf[ offset + 1 ] = 0;
213 buf[ offset + 2 ] = 0xff550000;
216 offset += NW_B2L_TRAILER;
219 buf[ offset + 1 ] = 0x0;
220 buf[ offset + 2 ] = 0x0;
221 buf[ offset + 3 ] = 0x7fff0006;
222 offset += NW_RAW_TRAILER;
227 buf[ offset + 1 ] = 0x7fff0000;
228 offset += NW_SEND_TRAILER;
234inline void addEvent(
int* buf,
int nwords_per_fee,
unsigned int event,
int ncpr,
int nhslb)
238 buf[ offset + 4 ] = event;
239 offset += NW_SEND_HEADER;
241 for (
int k = 0; k < ncpr; k++) {
242 int nwords = buf[ offset ];
243 int posback_xorchksum = 2;
244 int pos_xorchksum = offset + nwords - posback_xorchksum;
245 if (buf[ offset + 4 ] != CTIME_VAL) {
246 printf(
"[FATAL] data-production error 2 0x%.x", buf[ offset + 4 ]);
251 buf[ pos_xorchksum ] ^= buf[ offset + 3];
252 buf[ offset + 3] = event;
253 buf[ pos_xorchksum ] ^= buf[ offset + 3];
256 offset += NW_RAW_HEADER;
257 for (
int i = 0; i < nhslb ; i++) {
258 if ((buf[ offset + 1 ] & 0xffff0000) != 0xffaa0000) {
259 printf(
"[FATAL] data-production error 3 : 0x%.x hslb %d cpr %d\n", buf[ offset ], i, k);
263 buf[ offset + 1 ] = 0xffaa0000 + (
event & 0xffff);
264 buf[ offset + 3 ] = event;
267 int* crc_buf = buf + offset + 2;
268 int crc_nwords = nwords_per_fee + 5;
269 unsigned short temp_crc16 = CalcCRC16LittleEndian(0xffff, crc_buf, crc_nwords);
270 buf[ offset + NW_B2L_HEADER + nwords_per_fee + 1 ] = ((
event & 0x0000ffff) << 16) | temp_crc16;
274 offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
276 offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
279 offset += NW_RAW_TRAILER;
280 unsigned int xor_chksum = 0;
281 unsigned int xor_chksum2 = 0;
287int sender(
int sender_id,
int run_no,
int nwords_per_fee,
int ncpr,
int nhslb)
292 int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
293 ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
295 int buff[total_words];
300 int port_to = 30000 + sender_id;
301 struct sockaddr_in servaddr;
302 struct pollfd client[NUM_CLIENTS_PER_THREAD + 1];
323 struct sockaddr_in sock_listen;
324 sock_listen.sin_family = AF_INET;
326 sock_listen.sin_addr.s_addr = htonl(INADDR_ANY);
328 socklen_t addrlen =
sizeof(sock_listen);
329 sock_listen.sin_port = htons(port_to);
330 fd_listen = socket(PF_INET, SOCK_STREAM, 0);
333 int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)
sizeof(flags));
335 perror(
"Failed to set REUSEADDR");
338 if (bind(fd_listen, (
struct sockaddr*)&sock_listen,
sizeof(
struct sockaddr)) < 0) {
339 printf(
"[FATAL] Failed to bind. Maybe other programs have already occupied this port(%d). Exiting...\n",
340 port_to); fflush(stdout);
345 sprintf(cmdline,
"/usr/sbin/ss -ap | grep %d", port_to);
346 if ((fp = popen(cmdline,
"r")) == NULL) {
347 printf(
"[WARNING] Failed to run %s\n", cmdline);
349 while (fgets(buf, 256, fp) != NULL) {
350 printf(
"[ERROR] Failed to bind. output of ss(port %d) : %s\n", port_to, buf); fflush(stdout);
355 sprintf(err_buf,
"[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
356 strerror(errno), port_to);
357 printf(
"%s\n", err_buf); fflush(stdout);
363 setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)
sizeof(val1));
365 if (listen(fd_listen, backlog) < 0) {
367 sprintf(err_buf,
"Failed in listen(%s). Exting...", strerror(errno));
368 printf(
"%s\n", err_buf); fflush(stdout);
377 struct sockaddr_in sock_accept;
379 printf(
"[DEBUG] Accepting... : port %d\n", port_to);
382 if ((fd_accept = accept(fd_listen, (
struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
384 sprintf(err_buf,
"[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
385 printf(
"%s\n", err_buf); fflush(stdout);
390 printf(
"[DEBUG] Accepted.\n"); fflush(stdout);
393 struct timeval timeout;
396 ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)
sizeof(timeout));
398 char err_buf[500] =
"[FATAL] Failed to set TIMEOUT. Exiting...";
399 printf(
"%s\n", err_buf); fflush(stdout);
411 printf(
"Connection(port %d) accepted\n", port_to); fflush(stdout);
413 double init_time = getTimeSec();
414 double prev_time = init_time;
416 unsigned long long int cnt = 0;
417 unsigned long long int prev_cnt = 0;
418 unsigned long long int start_cnt = 3000;
422 int j = 0; j < MAX_EVENT; j++
438 if ((
int r = write(fd_accept, buff, total_words *
sizeof(
int))) <= 0) {
439 printf(
"[FATAL] Return value %d\n", r);
446 if (cnt == start_cnt) init_time = getTimeSec();
447 if (cnt % 1000000 == 1) {
448 if (cnt > start_cnt) {
449 double cur_time = getTimeSec();
450 printf(
"run %d evt %llu time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
453 cur_time - init_time,
454 NUM_CLIENTS_PER_THREAD * (cnt - prev_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - prev_time),
455 (cnt - prev_cnt) / (cur_time - prev_time) / 1000.,
456 NUM_CLIENTS_PER_THREAD * (cnt - start_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - init_time),
457 (cnt - start_cnt) / (cur_time - init_time) / 1000., total_words);
460 prev_time = cur_time;
472int main(
int argc,
char** argv)
475 printf(
"###################################################\n");
477 printf(
"# PCIe40 data after reduction (#define REDUCED_DATA) #\n");
479 printf(
"# PCIe40 data before reduction (//#define REDUCED_DATA) #\n");
481 printf(
"###################################################\n");
484 printf(
"Usage : %s <node ID> <run#> <nwords of det. buf per FEE> <# of CPR per COPPER> <# of HSLBs>\n", argv[ 0 ]);
491 unsigned int node_id = 0;
492 sscanf(argv[1],
"0x%x", &node_id);
494 int run_no = atoi(argv[2]);
495 int nwords_per_fee = atoi(argv[3]);
496 int ncpr = atoi(argv[4]);
497 int nhslb = atoi(argv[5]);
502 for (
int i = 0; i < NUM_CLIENTS; i++) {
503 data_1[i] =
new unsigned int[100000];
504 data_2[i] =
new unsigned int[100000];
510 std::thread sender0(sender, 0, run_no, nwords_per_fee, ncpr, nhslb);
511 std::thread sender1(sender, 1, run_no, nwords_per_fee, ncpr, nhslb);
512 std::thread sender2(sender, 2, run_no, nwords_per_fee, ncpr, nhslb);
513 std::thread sender3(sender, 3, run_no, nwords_per_fee, ncpr, nhslb);
514 std::thread sender4(sender, 4, run_no, nwords_per_fee, ncpr, nhslb);
521 unsigned int node_id = 0;
522 sscanf(argv[1],
"0x%x", &node_id);
524 int run_no = atoi(argv[2]);
525 int nwords_per_fee = atoi(argv[3]);
526 int ncpr = atoi(argv[4]);
527 int nhslb = atoi(argv[5]);
529 int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
530 ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
532 printf(
"TET %d %d %d %d %d\n ", NW_SEND_HEADER + NW_SEND_TRAILER, ncpr,
533 NW_RAW_HEADER, (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb, NW_RAW_TRAILER);
534 int buff[total_words];
539 int temp_ret = fillDataContents(buff, nwords_per_fee, node_id, ncpr, nhslb, run_no);
540 if (temp_ret != total_words) {
541 printf(
"[FATAL] data-production error 1 %d %d\n", total_words, temp_ret);
546 double init_time = getTimeSec();
547 double prev_time = init_time;
549 unsigned long long int cnt = 0;
550 unsigned long long int prev_cnt = 0;
551 unsigned long long int start_cnt = 300000;
556 for (
int j = 0; j < MAX_EVENT; j++)
562 addEvent(buff, nwords_per_fee, cnt, ncpr, nhslb);
571 for (
int i = 1 ; i <= NUM_CLIENTS ; i++) {
573 if (buffer_id == 0) {
575 std::lock_guard<std::mutex> lock(mtx1_ch[i]);
576 memcpy(data_1[i], buff, total_words *
sizeof(
unsigned int))
580 std::lock_guard<std::mutex> lock(mtx2_ch[i]);
581 memcpy(data_2[i], buff, total_words *
sizeof(
unsigned int))
591 if (buffer_id == 0) {
599 if (cnt == start_cnt) init_time = getTimeSec();
600 if (cnt % 10000 == 1) {
601 if (cnt > start_cnt) {
602 double cur_time = getTimeSec();
603 printf(
"run %d evt %llu time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
606 cur_time - init_time,
607 NUM_CLIENTS * (cnt - prev_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - prev_time),
608 (cnt - prev_cnt) / (cur_time - prev_time) / 1000.,
609 NUM_CLIENTS * (cnt - start_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - init_time),
610 (cnt - start_cnt) / (cur_time - init_time) / 1000., total_words);
613 prev_time = cur_time;
629 for (
int i = 0; i < NUM_CLIENTS; i++) {