1 #ifdef USE_DUMMY_DATA_THREADS
5 #include <netinet/in.h>
6 #include <netinet/tcp.h>
8 #include <sys/socket.h>
13 #include <arpa/inet.h>
// Number of client connections handled by one sender thread; sizes the
// pollfd array in sender() and scales its throughput printout.
29 #define NUM_CLIENTS_PER_THREAD 1
// Word counts (32-bit words) of the outermost "send" header and trailer that
// wrap one event buffer.
34 #define NW_SEND_HEADER 6
35 #define NW_SEND_TRAILER 2
// Word counts of the per-CPR "raw" header and trailer.
37 #define NW_RAW_HEADER 8
38 #define NW_RAW_TRAILER 4
// Word counts of the per-link (B2L) header and trailer. Two alternative
// pairs are defined (3/2 and 7/3).
// NOTE(review): the preprocessor conditional selecting between them is not
// visible here - presumably REDUCED_DATA, which main()'s banner mentions; confirm.
41 #define NW_B2L_HEADER 3
42 #define NW_B2L_TRAILER 2
44 #define NW_B2L_HEADER 7
45 #define NW_B2L_TRAILER 3
// Fixed marker written into the raw header by fillDataContents() and checked
// by addEvent() as a sanity test on the buffer layout.
48 #define CTIME_VAL 0x12345601
// Two per-client buffer sets (data_1 / data_2), each guarded by the mutex
// array of the same index. main() allocates the buffers and copies events
// into them under the corresponding lock.
// NOTE(review): NUM_CLIENTS is not defined in the visible lines.
52 unsigned int* data_1[NUM_CLIENTS];
53 unsigned int* data_2[NUM_CLIENTS];
55 std::mutex mtx1_ch[NUM_CLIENTS];
56 std::mutex mtx2_ch[NUM_CLIENTS];
// Table-driven CRC16 over an array of 32-bit words.
//   crc16  : starting CRC value (addEvent() passes 0xffff)
//   buf    : words to process
//   nwords : number of words in buf
// Returns the updated CRC (the return statement is not among the visible lines).
// NOTE(review): braces, the nwords validity test and the end of the loop are
// missing from the visible lines; comments below describe only what is shown.
58 unsigned short CalcCRC16LittleEndian(
unsigned short crc16,
const int buf[],
int nwords)
// Error path: formats and prints a message for an invalid nwords, then builds
// a std::string from it (presumably to throw - the guarding condition, the
// err_buf declaration and the throw itself are not visible).
63 sprintf(err_buf,
"nwords value(%d) is invalid. Cannot calculate CRC16. Exiting...\n %s %s %d\n",
64 nwords, __FILE__, __PRETTY_FUNCTION__, __LINE__);
65 printf(
"%s", err_buf); fflush(stdout);
66 string err_str = err_buf;
// 256-entry lookup table indexed by one byte (the name suggests polynomial
// 0x1021 - not verified here).
70 const unsigned short CRC16Table0x1021[ 256 ] = {
71 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
72 0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
73 0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
74 0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
75 0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
76 0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
77 0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
78 0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
80 0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
81 0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
82 0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
83 0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
84 0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
85 0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
86 0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
87 0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
89 0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
90 0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
91 0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
92 0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
93 0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
94 0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
95 0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
96 0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
98 0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
99 0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
100 0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
101 0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
102 0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
103 0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
104 0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
105 0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0
// cnt counts bytes consumed; nints is the index of the current word in buf.
108 int cnt = 0, nints = 0;
110 while (nwords != 0) {
// Take one byte of the current word at byte offset 3 - (cnt % 4), i.e. the
// four bytes of each word are visited from the highest address down
// (most-significant byte first on a little-endian host - assumption).
112 unsigned char temp_buf = *((
unsigned char*)(buf + nints) + (-(cnt % 4) + 3));
// Standard table step: index by (high byte of crc XOR data byte), then XOR
// with the crc shifted left by one byte.
113 crc16 = CRC16Table0x1021[(crc16 >> (16 - CHAR_BIT)) ^ temp_buf ] ^ (crc16 << CHAR_BIT);
// After the fourth byte of a word (presumably: advance nints and decrement
// nwords - the body of this branch is not visible).
115 if ((cnt % 4) == 3) {
// Body fragment of a wall-clock helper (presumably getTimeSec(), which is
// called elsewhere in this file - its signature line and the declaration of
// `t` are not visible). Returns the current time in seconds as a double,
// with the constant 1417570000 subtracted.
133 gettimeofday(&t, NULL);
134 return (t.tv_sec + t.tv_usec * 1.e-6 - 1417570000.);
// Fills `buf` with one template event: a send header, then `ncpr` blocks each
// made of a raw header, `nhslb` link blocks (B2L header + nwords_per_fee
// payload words + B2L trailer) and a raw trailer, then a send trailer.
//   buf            : destination; the caller must provide room for all words
//   nwords_per_fee : payload words per link block
//   node_id        : written to the send header; node_id + k goes in the k-th raw header
//   ncpr, nhslb    : number of raw blocks / link blocks per raw block
//   run            : run number, stored shifted left by 8 bits
// Presumably returns the total number of words written (main() compares the
// result with its own total_words) - the return statement is not visible.
// NOTE(review): braces, #if/#else lines and some statements are missing from
// the visible lines; alternative layouts appear back to back below.
137 int fillDataContents(
int* buf,
int nwords_per_fee,
unsigned int node_id,
int ncpr,
int nhslb,
int run)
// Total event size in words (the tail of this expression is not visible).
139 int nwords = NW_SEND_HEADER + NW_SEND_TRAILER +
140 ncpr * (NW_RAW_HEADER +
141 (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
// --- send header: [0] total words, [1] literal 6, [2] (1 << 16) | ncpr,
//     [3] run << 8, [5] node id. Word [4] is not written here; addEvent()
//     stores the event number there.
146 buf[ offset + 0 ] = nwords;
147 buf[ offset + 1 ] = 6;
148 buf[ offset + 2 ] = (1 << 16) | ncpr;
149 unsigned int exp_run = run << 8;
150 buf[ offset + 3 ] = exp_run;
151 buf[ offset + 5 ] = node_id;
152 offset += NW_SEND_HEADER;
// --- one raw block per CPR
154 for (
int k = 0; k < ncpr; k++) {
155 int top_pos = offset;
// Size of this raw block in words (tail of the expression not visible).
159 int cpr_nwords = NW_RAW_HEADER +
160 (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
162 int finesse_nwords = nwords_per_fee + NW_B2L_HEADER + NW_B2L_TRAILER;
163 unsigned int ctime = CTIME_VAL;
164 unsigned int utime = 0x98765432;
// Raw header. Word [1] has two alternative constants (0x7f7f020c and
// 0x7f7f820c); the selecting conditional is not visible. Word [3] is left
// for addEvent() to fill with the event number.
166 buf[ offset + 0 ] = cpr_nwords;
168 buf[ offset + 1 ] = 0x7f7f020c;
170 buf[ offset + 1 ] = 0x7f7f820c;
172 buf[ offset + 2 ] = exp_run;
// NOTE(review): exp_run is unsigned but printed with %d.
173 printf(
"run_no %d\n", exp_run); fflush(stdout);
174 buf[ offset + 4 ] = ctime;
175 buf[ offset + 5 ] = utime;
176 buf[ offset + 6 ] = node_id + k;
177 buf[ offset + 7 ] = 0x34567890;
178 offset += NW_RAW_HEADER;
// --- one link block per HSLB
180 for (
int i = 0; i < nhslb ; i++) {
// Alternative A: short B2L header (length word = nwords_per_fee + 3).
182 buf[ offset + 0 ] = nwords_per_fee + 3;
183 buf[ offset + 1 ] = 0xffaa0000;
184 buf[ offset + 2 ] = ctime;
// Alternative B: long B2L header (length word = nwords_per_fee + 7).
// Presumably these match the two NW_B2L_HEADER definitions - confirm.
186 buf[ offset + 0 ] = nwords_per_fee + 7;
187 buf[ offset + 1 ] = 0xffaa0000;
188 buf[ offset + 3 ] = ctime;
189 buf[ offset + 4 ] = utime;
190 buf[ offset + 5 ] = exp_run;
191 buf[ offset + 6 ] = ctime;
193 offset += NW_B2L_HEADER;
// Payload words (the loop body that fills them is not visible).
195 for (
int j = offset; j < offset + nwords_per_fee; j++) {
198 offset += nwords_per_fee;
// B2L trailer, again in two alternative layouts; both end with the
// 0xff550000 marker word.
202 buf[ offset + 1 ] = 0xff550000;
204 buf[ offset ] = ctime;
205 buf[ offset + 1 ] = 0;
206 buf[ offset + 2 ] = 0xff550000;
209 offset += NW_B2L_TRAILER;
// Raw trailer: words [1] and [2] zeroed, [3] = 0x7fff0006. addEvent()
// treats the word two before the end of the raw block as an XOR checksum.
212 buf[ offset + 1 ] = 0x0;
213 buf[ offset + 2 ] = 0x0;
214 buf[ offset + 3 ] = 0x7fff0006;
215 offset += NW_RAW_TRAILER;
// Send trailer marker.
220 buf[ offset + 1 ] = 0x7fff0000;
221 offset += NW_SEND_TRAILER;
// Stamps an event number into a buffer previously laid out by
// fillDataContents() and patches the dependent checksums in place.
//   buf            : event buffer to modify
//   nwords_per_fee : payload words per link block (must match the layout)
//   event          : event number to write
//   ncpr, nhslb    : number of raw blocks / link blocks per raw block
// NOTE(review): braces, #if/#else lines and the declarations of `offset` and
// `prev_offset` are missing from the visible lines.
227 inline void addEvent(
int* buf,
int nwords_per_fee,
unsigned int event,
int ncpr,
int nhslb)
// Send header word [4] carries the event number.
232 buf[ offset + 4 ] = event;
233 offset += NW_SEND_HEADER;
235 for (
int k = 0; k < ncpr; k++) {
// First word of a raw block is its length; the XOR checksum sits two words
// before the end of the block.
236 int nwords = buf[ offset ];
237 int posback_xorchksum = 2;
238 int pos_xorchksum = offset + nwords - posback_xorchksum;
239 prev_offset = offset;
// Layout sanity check: raw header word [4] must still hold CTIME_VAL.
240 if (buf[ offset + 4 ] != CTIME_VAL) {
241 printf(
"[FATAL] data-production error 2 0x%.x", buf[ offset + 4 ]);
// Incremental checksum update: XOR out the old value of word [3], store the
// new event number, XOR the new value back in. The order of these three
// statements matters.
246 buf[ pos_xorchksum ] ^= buf[ offset + 3];
247 buf[ offset + 3] = event;
248 buf[ pos_xorchksum ] ^= buf[ offset + 3];
251 offset += NW_RAW_HEADER;
252 for (
int i = 0; i < nhslb ; i++) {
// Each link block must start with the 0xffaa marker in the upper 16 bits of
// its word [1].
253 if ((buf[ offset + 1 ] & 0xffff0000) != 0xffaa0000) {
254 printf(
"[FATAL] data-production error 3 : 0x%.x hslb %d cpr %d\n", buf[ offset ], i, k);
// Marker word gets the low 16 bits of the event number; word [3] gets the
// full event number.
258 buf[ offset + 1 ] = 0xffaa0000 + (
event & 0xffff);
259 buf[ offset + 3 ] = event;
// Recompute the CRC16 (seed 0xffff) over nwords_per_fee + 5 words starting
// at word [2] and store it in the low half of the trailer word, with the low
// 16 bits of the event number in the high half.
262 int* crc_buf = buf + offset + 2;
263 int crc_nwords = nwords_per_fee + 5;
264 unsigned short temp_crc16 = CalcCRC16LittleEndian(0xffff, crc_buf, crc_nwords);
265 buf[ offset + NW_B2L_HEADER + nwords_per_fee + 1 ] = ((
event & 0x0000ffff) << 16) | temp_crc16;
// Advance to the next link block. The same statement appears twice,
// presumably once per branch of a conditional that is not visible.
269 offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
271 offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
274 offset += NW_RAW_TRAILER;
// Declared here; their use is not among the visible lines.
275 unsigned int xor_chksum = 0;
276 unsigned int xor_chksum2 = 0;
// Thread entry point: listens on TCP port 30000 + sender_id, accepts a
// connection and writes event buffers of total_words 32-bit words to it in a
// loop, printing throughput statistics periodically.
//   sender_id      : selects the listening port
//   run_no         : run number (its use is not among the visible lines)
//   nwords_per_fee, ncpr, nhslb : event layout parameters, see fillDataContents()
// NOTE(review): braces, several declarations (fd_listen, fd_accept, flags,
// val1, backlog, cmdline, buf, fp, err_buf) and the code that fills `buff`
// are missing from the visible lines.
282 int sender(
int sender_id,
int run_no,
int nwords_per_fee,
int ncpr,
int nhslb)
// Event size in words (tail of the expression not visible).
287 int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
288 ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
// NOTE(review): variable-length array - a compiler extension in C++, and the
// size comes straight from command-line values.
290 int buff[total_words];
295 int port_to = 30000 + sender_id;
297 struct sockaddr_in servaddr;
298 struct pollfd client[NUM_CLIENTS_PER_THREAD + 1];
// --- listening socket: any local address, port_to, SO_REUSEADDR
319 struct sockaddr_in sock_listen;
320 sock_listen.sin_family = AF_INET;
322 sock_listen.sin_addr.s_addr = htonl(INADDR_ANY);
324 socklen_t addrlen =
sizeof(sock_listen);
325 sock_listen.sin_port = htons(port_to);
326 fd_listen = socket(PF_INET, SOCK_STREAM, 0);
329 int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)
sizeof(flags));
331 perror(
"Failed to set REUSEADDR");
// --- bind; on failure print diagnostics, including the output of
//     `ss -ap | grep <port>` to show which process holds the port.
334 if (bind(fd_listen, (
struct sockaddr*)&sock_listen,
sizeof(
struct sockaddr)) < 0) {
335 printf(
"[FATAL] Failed to bind. Maybe other programs have already occupied this port(%d). Exiting...\n",
336 port_to); fflush(stdout);
341 sprintf(cmdline,
"/usr/sbin/ss -ap | grep %d", port_to);
342 if ((fp = popen(cmdline,
"r")) == NULL) {
343 printf(
"[WARNING] Failed to run %s\n", cmdline);
345 while (fgets(buf, 256, fp) != NULL) {
346 printf(
"[ERROR] Failed to bind. output of ss(port %d) : %s\n", port_to, buf); fflush(stdout);
351 sprintf(err_buf,
"[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
352 strerror(errno), port_to);
353 printf(
"%s\n", err_buf); fflush(stdout);
// Disable Nagle's algorithm on the listening socket (return value not checked).
359 setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)
sizeof(val1));
361 if (listen(fd_listen, backlog) < 0) {
363 sprintf(err_buf,
"Failed in listen(%s). Exting...", strerror(errno));
364 printf(
"%s\n", err_buf); fflush(stdout);
// --- accept one connection
373 struct sockaddr_in sock_accept;
375 printf(
"[DEBUG] Accepting... : port %d\n", port_to);
// NOTE(review): accept() signals failure with -1, not 0, so the `== 0` test
// below looks like it never catches an error (and would reject a valid fd 0).
// Confirm and consider `< 0`.
378 if ((fd_accept = accept(fd_listen, (
struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
380 sprintf(err_buf,
"[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
381 printf(
"%s\n", err_buf); fflush(stdout);
386 printf(
"[DEBUG] Accepted.\n"); fflush(stdout);
// Send timeout on the accepted socket (the timeout values are set on lines
// not visible here).
389 struct timeval timeout;
392 ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)
sizeof(timeout));
394 char err_buf[500] =
"[FATAL] Failed to set TIMEOUT. Exiting...";
395 printf(
"%s\n", err_buf); fflush(stdout);
407 printf(
"Connection(port %d) accepted\n", port_to); fflush(stdout);
// --- send loop bookkeeping: cnt counts events; statistics are measured from
//     event start_cnt onward.
409 double init_time = getTimeSec();
410 double prev_time = init_time;
412 unsigned long long int cnt = 0;
413 unsigned long long int prev_cnt = 0;
414 unsigned long long int start_cnt = 3000;
418 int j = 0; j < MAX_EVENT; j++
// Write one whole event. A return value <= 0 is reported as fatal.
// NOTE(review): a short write (0 < ret < requested size) is not detected by
// this check - confirm whether it is handled on lines not visible here.
435 if ((ret = write(fd_accept, buff, total_words *
sizeof(
int))) <= 0) {
436 printf(
"[FATAL] Return value %d\n", ret);
// Restart the "so far" clock once start_cnt events have been sent, then
// report interval and cumulative data rate whenever cnt % 1000000 == 1.
443 if (cnt == start_cnt) init_time = getTimeSec();
444 if (cnt % 1000000 == 1) {
445 if (cnt > start_cnt) {
446 double cur_time = getTimeSec();
447 printf(
"run %d evt %lld time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
450 cur_time - init_time,
451 NUM_CLIENTS_PER_THREAD * (cnt - prev_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - prev_time),
452 (cnt - prev_cnt) / (cur_time - prev_time) / 1000. ,
453 NUM_CLIENTS_PER_THREAD * (cnt - start_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - init_time),
454 (cnt - start_cnt) / (cur_time - init_time) / 1000. , total_words);
457 prev_time = cur_time;
// Program entry: prints a banner and usage, parses
//   <node ID (0x-prefixed hex)> <run#> <nwords per FEE> <# of CPR> <# of HSLBs>
// allocates the per-client buffers and starts five sender threads.
// NOTE(review): braces and #if/#else lines are missing from the visible
// lines. The argument parsing appears twice (from original lines 488 and
// 518); the second occurrence and the event-production loop after it may
// belong to a different scope or function whose header is not visible.
469 int main(
int argc,
char** argv)
// Banner: one of the two "PCIe40 data ..." lines is printed, presumably
// depending on REDUCED_DATA (conditional not visible).
472 printf(
"###################################################\n");
474 printf(
"# PCIe40 data after reduction (#define REDUCED_DATA) #\n");
476 printf(
"# PCIe40 data before reduction (//#define REDUCED_DATA) #\n");
478 printf(
"###################################################\n");
481 printf(
"Usage : %s <node ID> <run#> <nwords of det. buf per FEE> <# of CPR per COPPER> <# of HSLBs>\n", argv[ 0 ]);
// Argument parsing. node_id must be written with a literal "0x" prefix for
// the sscanf format to match; the sscanf/atoi results are not validated in
// the visible lines.
488 unsigned int node_id = 0;
489 sscanf(argv[1],
"0x%x", &node_id);
491 int run_no = atoi(argv[2]);
492 int nwords_per_fee = atoi(argv[3]);
493 int ncpr = atoi(argv[4]);
494 int nhslb = atoi(argv[5]);
// Allocate 100000 words for each client in both buffer sets.
499 for (
int i = 0; i < NUM_CLIENTS; i++) {
500 data_1[i] =
new unsigned int[100000];
501 data_2[i] =
new unsigned int[100000];
// Start five sender threads, listening on ports 30000..30004.
507 std::thread sender0(sender, 0, run_no, nwords_per_fee, ncpr, nhslb);
508 std::thread sender1(sender, 1, run_no, nwords_per_fee, ncpr, nhslb);
509 std::thread sender2(sender, 2, run_no, nwords_per_fee, ncpr, nhslb);
510 std::thread sender3(sender, 3, run_no, nwords_per_fee, ncpr, nhslb);
511 std::thread sender4(sender, 4, run_no, nwords_per_fee, ncpr, nhslb);
// --- second parsing section and event producer (scope boundary not visible)
518 unsigned int node_id = 0;
519 sscanf(argv[1],
"0x%x", &node_id);
521 int run_no = atoi(argv[2]);
522 int nwords_per_fee = atoi(argv[3]);
523 int ncpr = atoi(argv[4]);
524 int nhslb = atoi(argv[5]);
// Event size in words (tail of the expression not visible).
526 int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
527 ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
529 printf(
"TET %d %d %d %d %d\n ", NW_SEND_HEADER + NW_SEND_TRAILER, ncpr,
530 NW_RAW_HEADER, (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb, NW_RAW_TRAILER);
// NOTE(review): variable-length array - a compiler extension in C++.
531 int buff[total_words];
// Build the template event once and cross-check its size.
536 int temp_ret = fillDataContents(buff, nwords_per_fee, node_id, ncpr, nhslb, run_no);
537 if (temp_ret != total_words) {
538 printf(
"[FATAL] data-production error 1 %d %d\n", total_words, temp_ret);
// Statistics are measured from event start_cnt onward.
543 double init_time = getTimeSec();
544 double prev_time = init_time;
546 unsigned long long int cnt = 0;
547 unsigned long long int prev_cnt = 0;
548 unsigned long long int start_cnt = 300000;
553 for (
int j = 0; j < MAX_EVENT; j++)
// Stamp the current event number into the template.
559 addEvent(buff, nwords_per_fee, cnt, ncpr, nhslb);
// Copy the event into each client's buffer under that client's lock;
// buffer_id (declared on a line not visible) selects data_1 or data_2.
// NOTE(review): i runs from 1 to NUM_CLIENTS inclusive, but data_1/data_2
// and the mutex arrays are declared with NUM_CLIENTS elements, so
// i == NUM_CLIENTS indexes past the end and index 0 is never filled -
// confirm whether `i = 0; i < NUM_CLIENTS` was intended.
// NOTE(review): the copy length is not checked against the 100000-word
// buffers in the visible lines.
568 for (
int i = 1 ; i <= NUM_CLIENTS ; i++) {
570 if (buffer_id == 0) {
572 std::lock_guard<std::mutex> lock(mtx1_ch[i]);
573 memcpy(data_1[i], buff, total_words *
sizeof(
unsigned int))
577 std::lock_guard<std::mutex> lock(mtx2_ch[i]);
578 memcpy(data_2[i], buff, total_words *
sizeof(
unsigned int))
// Presumably toggles buffer_id between the two buffer sets (branch bodies
// not visible).
588 if (buffer_id == 0) {
// Restart the "so far" clock at start_cnt, then report interval and
// cumulative data rate whenever cnt % 10000 == 1.
596 if (cnt == start_cnt) init_time = getTimeSec();
597 if (cnt % 10000 == 1) {
598 if (cnt > start_cnt) {
599 double cur_time = getTimeSec();
600 printf(
"run %d evt %lld time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
603 cur_time - init_time,
604 NUM_CLIENTS * (cnt - prev_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - prev_time),
605 (cnt - prev_cnt) / (cur_time - prev_time) / 1000. ,
606 NUM_CLIENTS * (cnt - start_cnt)*total_words *
sizeof(
int) / 1000000. / (cur_time - init_time),
607 (cnt - start_cnt) / (cur_time - init_time) / 1000. , total_words);
610 prev_time = cur_time;
// Final per-client loop (body not visible; presumably cleanup of the
// buffers allocated above).
626 for (
int i = 0; i < NUM_CLIENTS; i++) {