Belle II Software development
dummy_data_threads.cc
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/
8#ifdef USE_DUMMY_DATA_THREADS
// C / POSIX headers
#include <arpa/inet.h>
#include <errno.h>
#include <limits.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <poll.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

// C++ standard library headers
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
27
28
29
//
// Parameters for data-contents
//
// Optional compile-time switches (uncomment to enable):
//   REDUCED_DATA : selects the short (3 + 2 word) B2L header/trailer layout below
//   CRC_ON       : makes addEvent() compute a CRC16 for every FEE block
//   MAX_EVENT    : bounds the number of iterations of the send loop
//
//#define REDUCED_DATA
//#define CRC_ON
//#define MAX_EVENT 1000

// Connection bookkeeping.
// LISTENQ is not referenced by the code visible in this file.
#define LISTENQ 1
// Scales the throughput figures printed by sender().
#define NUM_CLIENTS_PER_THREAD 1
// Number of shared buffers / mutexes declared below.
#define NUM_CLIENTS 5

// Format (PCIe40): sizes are given in 32-bit words.
// Outermost "send" header and trailer.
#define NW_SEND_HEADER 6
#define NW_SEND_TRAILER 2

// Raw header and trailer wrapped around each COPPER block.
#define NW_RAW_HEADER 8
#define NW_RAW_TRAILER 4

// B2L header and trailer wrapped around each FEE payload.
#ifndef REDUCED_DATA
#define NW_B2L_HEADER 7
#define NW_B2L_TRAILER 3
#else
#define NW_B2L_HEADER 3
#define NW_B2L_TRAILER 2
#endif

// Marker value written by fillDataContents() and verified by addEvent().
#define CTIME_VAL 0x12345601

using namespace std;

// Two sets of per-client buffers; allocated and released in main().
unsigned int* data_1[NUM_CLIENTS];
unsigned int* data_2[NUM_CLIENTS];

// One mutex per client for each of the two buffer sets above.
std::mutex mtx1_ch[NUM_CLIENTS];
std::mutex mtx2_ch[NUM_CLIENTS];
64
/**
 * Table-driven CRC16 (lookup table for polynomial 0x1021) over an array of words.
 *
 * Every element of buf contributes four bytes, consumed from byte offset 3 down
 * to byte offset 0 of that element (i.e. most-significant byte first on a
 * little-endian host with 4-byte int).
 *
 * @param crc16   Starting CRC value; allows a calculation to be chained over
 *                several buffers.
 * @param buf     Words to process; may be null only when nwords is 0.
 * @param nwords  Number of words of buf to process; must not be negative.
 * @return        Updated CRC value (crc16 itself when nwords is 0).
 * @throws std::string  Error description when nwords is negative (the same
 *                      text is also printed to stdout).
 */
unsigned short CalcCRC16LittleEndian(unsigned short crc16, const int buf[], int nwords)
{
  if (nwords < 0) {
    char err_buf[500];
    // snprintf: __FILE__ and __PRETTY_FUNCTION__ have no length bound, so the
    // formatted text must not be allowed to run past the end of err_buf.
    snprintf(err_buf, sizeof(err_buf), "nwords value(%d) is invalid. Cannot calculate CRC16. Exiting...\n %s %s %d\n",
             nwords, __FILE__, __PRETTY_FUNCTION__, __LINE__);
    printf("%s", err_buf); fflush(stdout);
    throw std::string(err_buf);
  }

  // static: the table is constant, so build it once instead of on every call.
  static const unsigned short CRC16Table0x1021[ 256 ] = {
    0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
    0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
    0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
    0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
    0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
    0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
    0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
    0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,

    0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
    0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
    0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
    0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
    0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
    0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
    0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
    0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,

    0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
    0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
    0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
    0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
    0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
    0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
    0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
    0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,

    0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
    0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
    0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
    0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
    0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
    0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
    0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
    0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0
  };

  for (int iword = 0; iword < nwords; ++iword) {
    const unsigned char* word_bytes = reinterpret_cast<const unsigned char*>(buf + iword);

    // Bytes of each word are consumed from offset 3 down to offset 0.
    for (int ibyte = 3; ibyte >= 0; --ibyte) {
      const unsigned char cur_byte = word_bytes[ ibyte ];
      const unsigned int table_index = (crc16 >> (16 - CHAR_BIT)) ^ cur_byte;
      // The shift happens in int; the cast drops the bits shifted beyond 16.
      crc16 = static_cast<unsigned short>(CRC16Table0x1021[ table_index ] ^ (crc16 << CHAR_BIT));
    }
  }

  return crc16;
}
135
136
/**
 * Current wall-clock time in seconds with microsecond resolution.
 *
 * The value is the gettimeofday() timestamp with a fixed offset of
 * 1417570000 s subtracted.
 *
 * @return Seconds (including the fractional part) relative to that offset.
 */
double getTimeSec()
{
  // Fixed offset subtracted from the raw timestamp.
  const double reference_sec = 1417570000.;

  struct timeval now;
  gettimeofday(&now, nullptr);

  const double fraction_sec = now.tv_usec * 1.e-6;
  return now.tv_sec + fraction_sec - reference_sec;
}
143
144int fillDataContents(int* buf, int nwords_per_fee, unsigned int node_id, int ncpr, int nhslb, int run)
145{
146 int nwords = NW_SEND_HEADER + NW_SEND_TRAILER +
147 ncpr * (NW_RAW_HEADER +
148 (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
149 + NW_RAW_TRAILER);
150
151 // Send Header
152 int offset = 0;
153 buf[ offset + 0 ] = nwords;
154 buf[ offset + 1 ] = 6;
155 buf[ offset + 2 ] = (1 << 16) | ncpr;
156 unsigned int exp_run = run << 8;
157 buf[ offset + 3 ] = exp_run;
158 buf[ offset + 5 ] = node_id;
159 offset += NW_SEND_HEADER;
160
161 for (int k = 0; k < ncpr; k++) {
162 int top_pos = offset;
163 //
164 // RawHeader
165 //
166 int cpr_nwords = NW_RAW_HEADER +
167 (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
168 + NW_RAW_TRAILER;
169 int finesse_nwords = nwords_per_fee + NW_B2L_HEADER + NW_B2L_TRAILER;
170 unsigned int ctime = CTIME_VAL;
171 unsigned int utime = 0x98765432;
172
173 buf[ offset + 0 ] = cpr_nwords;
174#ifdef REDUCED_DATA
175 buf[ offset + 1 ] = 0x7f7f020c;
176#else
177 buf[ offset + 1 ] = 0x7f7f820c;
178#endif
179 buf[ offset + 2 ] = exp_run;
180 printf("run_no %u\n", exp_run); fflush(stdout);
181 buf[ offset + 4 ] = ctime;
182 buf[ offset + 5 ] = utime;
183 buf[ offset + 6 ] = node_id + k;
184 buf[ offset + 7 ] = 0x34567890;
185 offset += NW_RAW_HEADER;
186
187 for (int i = 0; i < nhslb ; i++) {
188#ifdef REDUCED_DATA
189 buf[ offset + 0 ] = nwords_per_fee + 3;
190 buf[ offset + 1 ] = 0xffaa0000;
191 buf[ offset + 2 ] = ctime;
192#else
193 buf[ offset + 0 ] = nwords_per_fee + 7;
194 buf[ offset + 1 ] = 0xffaa0000;
195 buf[ offset + 3 ] = ctime;
196 buf[ offset + 4 ] = utime;
197 buf[ offset + 5 ] = exp_run;
198 buf[ offset + 6 ] = ctime;
199#endif
200 offset += NW_B2L_HEADER;
201
202 for (int j = offset; j < offset + nwords_per_fee; j++) {
203 buf[ j ] = rand();
204 }
205 offset += nwords_per_fee;
206
207#ifdef REDUCED_DATA
208 buf[ offset ] = 0;
209 buf[ offset + 1 ] = 0xff550000;
210#else
211 buf[ offset ] = ctime;
212 buf[ offset + 1 ] = 0;
213 buf[ offset + 2 ] = 0xff550000;
214#endif
215
216 offset += NW_B2L_TRAILER;
217 }
218 buf[ offset ] = 0x0; // error bits
219 buf[ offset + 1 ] = 0x0; // error slots
220 buf[ offset + 2 ] = 0x0; // XOR checksum
221 buf[ offset + 3 ] = 0x7fff0006;
222 offset += NW_RAW_TRAILER;
223 }
224
225 // Send trailer
226 buf[ offset ] = 0;
227 buf[ offset + 1 ] = 0x7fff0000;
228 offset += NW_SEND_TRAILER;
229 return offset;
230}
231
232
233
234inline void addEvent(int* buf, int nwords_per_fee, unsigned int event, int ncpr, int nhslb)
235//inline void addEvent(int* buf, int nwords, unsigned int event)
236{
237 int offset = 0;
238 buf[ offset + 4 ] = event;
239 offset += NW_SEND_HEADER;
240
241 for (int k = 0; k < ncpr; k++) {
242 int nwords = buf[ offset ];
243 int posback_xorchksum = 2;
244 int pos_xorchksum = offset + nwords - posback_xorchksum;
245 if (buf[ offset + 4 ] != CTIME_VAL) {
246 printf("[FATAL] data-production error 2 0x%.x", buf[ offset + 4 ]);
247 fflush(stdout);
248 exit(1);
249 }
250 // RawHeader
251 buf[ pos_xorchksum ] ^= buf[ offset + 3];
252 buf[ offset + 3] = event;
253 buf[ pos_xorchksum ] ^= buf[ offset + 3];
254
255 // COPPER header
256 offset += NW_RAW_HEADER;
257 for (int i = 0; i < nhslb ; i++) {
258 if ((buf[ offset + 1 ] & 0xffff0000) != 0xffaa0000) {
259 printf("[FATAL] data-production error 3 : 0x%.x hslb %d cpr %d\n", buf[ offset ], i, k);
260 fflush(stdout);
261 exit(1);
262 }
263 buf[ offset + 1 ] = 0xffaa0000 + (event & 0xffff);
264 buf[ offset + 3 ] = event;
265
266#ifdef CRC_ON
267 int* crc_buf = buf + offset + 2; // 1 => size of HSLB B2L header
268 int crc_nwords = nwords_per_fee + 5; // 5 => size of FEE B2L header
269 unsigned short temp_crc16 = CalcCRC16LittleEndian(0xffff, crc_buf, crc_nwords);
270 buf[ offset + NW_B2L_HEADER + nwords_per_fee + 1 ] = ((event & 0x0000ffff) << 16) | temp_crc16;
271#endif
272
273#ifdef REDUCED_DATA
274 offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
275#else
276 offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
277#endif
278 }
279 offset += NW_RAW_TRAILER;
280 unsigned int xor_chksum = 0;
281 unsigned int xor_chksum2 = 0;
282 }
283
284}
285
286
287int sender(int sender_id, int run_no, int nwords_per_fee, int ncpr, int nhslb)
288{
289 //
290 // data
291 //
292 int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
293 ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
294 NW_RAW_TRAILER);
295 int buff[total_words];
296
297 //
298 // network connection
299 //
300 int port_to = 30000 + sender_id;
301 struct sockaddr_in servaddr;
302 struct pollfd client[NUM_CLIENTS_PER_THREAD + 1];
303
304 //
305 // Connect to cprtb01
306 //
307 // string hostname_local;
308 // struct hostent* host;
309 // host = gethostbyname(hostname_local.c_str());
310 // if (host == NULL) {
311 // char err_buf[500];
312 // sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...\n",
313 // hostname_local.c_str(), strerror(errno));
314 // printf("%s\n", err_buf); fflush(stdout);
315 // // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
316 // exit(1);
317 // }
318
319 //
320 // Bind and listen
321 //
322 int fd_listen;
323 struct sockaddr_in sock_listen;
324 sock_listen.sin_family = AF_INET;
325 // sock_listen.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
326 sock_listen.sin_addr.s_addr = htonl(INADDR_ANY);
327
328 socklen_t addrlen = sizeof(sock_listen);
329 sock_listen.sin_port = htons(port_to);
330 fd_listen = socket(PF_INET, SOCK_STREAM, 0);
331
332 int flags = 1;
333 int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)sizeof(flags));
334 if (ret < 0) {
335 perror("Failed to set REUSEADDR");
336 }
337
338 if (bind(fd_listen, (struct sockaddr*)&sock_listen, sizeof(struct sockaddr)) < 0) {
339 printf("[FATAL] Failed to bind. Maybe other programs have already occupied this port(%d). Exiting...\n",
340 port_to); fflush(stdout);
341 // Check the process occupying the port 30000.
342 FILE* fp;
343 char buf[256];
344 char cmdline[500];
345 sprintf(cmdline, "/usr/sbin/ss -ap | grep %d", port_to);
346 if ((fp = popen(cmdline, "r")) == NULL) {
347 printf("[WARNING] Failed to run %s\n", cmdline);
348 }
349 while (fgets(buf, 256, fp) != NULL) {
350 printf("[ERROR] Failed to bind. output of ss(port %d) : %s\n", port_to, buf); fflush(stdout);
351 }
352 // Error message
353 fclose(fp);
354 char err_buf[500];
355 sprintf(err_buf, "[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
356 strerror(errno), port_to);
357 printf("%s\n", err_buf); fflush(stdout);
358 // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
359 exit(1);
360 }
361
362 int val1 = 0;
363 setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)sizeof(val1));
364 int backlog = 1;
365 if (listen(fd_listen, backlog) < 0) {
366 char err_buf[500];
367 sprintf(err_buf, "Failed in listen(%s). Exting...", strerror(errno));
368 printf("%s\n", err_buf); fflush(stdout);
369 // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
370 exit(-1);
371 }
372
373 //
374 // Accept
375 //
376 int fd_accept;
377 struct sockaddr_in sock_accept;
378 // printf("[DEBUG] Accepting... : port %d server %s\n", port_to, hostname_local.c_str());
379 printf("[DEBUG] Accepting... : port %d\n", port_to);
380 fflush(stdout);
381
382 if ((fd_accept = accept(fd_listen, (struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
383 char err_buf[500];
384 sprintf(err_buf, "[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
385 printf("%s\n", err_buf); fflush(stdout);
386 // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
387 exit(-1);
388 } else {
389 // B2INFO("Done.");
390 printf("[DEBUG] Accepted.\n"); fflush(stdout);
391
392 // set timepout option
393 struct timeval timeout;
394 timeout.tv_sec = 1;
395 timeout.tv_usec = 0;
396 ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)sizeof(timeout));
397 if (ret < 0) {
398 char err_buf[500] = "[FATAL] Failed to set TIMEOUT. Exiting...";
399 printf("%s\n", err_buf); fflush(stdout);
400 // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
401 exit(-1);
402 }
403 }
404
405 if (fd_listen) {
406 close(fd_listen);
407 }
408
409
410
411 printf("Connection(port %d) accepted\n", port_to); fflush(stdout);
412
413 double init_time = getTimeSec();
414 double prev_time = init_time;
415
416 unsigned long long int cnt = 0;
417 unsigned long long int prev_cnt = 0;
418 unsigned long long int start_cnt = 3000;
419
420 for (
421#ifdef MAX_EVENT
422 int j = 0; j < MAX_EVENT; j++
423#else
424 ;;
425#endif
426 ) {
427 // addEvent(buff, total_words, cnt);
428 //addEvent(buff, nwords_per_fee, cnt, ncpr, nhslb);
429 // printf("cnt %d bytes\n", cnt*total_words); fflush(stdout);
430 // sprintf( buff, "event %d dessa", cnt );
431
432 // for(int i = 0 ; i < total_words ; i++){
433 // printf("%.8x ", buff[ i ]);
434 // if( i % 10 == 9 ) printf("\n");
435 // }
436
437
438 if ((int r = write(fd_accept, buff, total_words * sizeof(int))) <= 0) {
439 printf("[FATAL] Return value %d\n", r);
440 fflush(stdout);
441 exit(1);
442 }
443
444 cnt++;
445
446 if (cnt == start_cnt) init_time = getTimeSec();
447 if (cnt % 1000000 == 1) {
448 if (cnt > start_cnt) {
449 double cur_time = getTimeSec();
450 printf("run %d evt %llu time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
451 run_no,
452 cnt,
453 cur_time - init_time,
454 NUM_CLIENTS_PER_THREAD * (cnt - prev_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - prev_time),
455 (cnt - prev_cnt) / (cur_time - prev_time) / 1000.,
456 NUM_CLIENTS_PER_THREAD * (cnt - start_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - init_time),
457 (cnt - start_cnt) / (cur_time - init_time) / 1000., total_words);
458
459 fflush(stdout);
460 prev_time = cur_time;
461 prev_cnt = cnt;
462 } else {
463 // printf("Eve %lld\n", cnt);fflush(stdout);
464 }
465 }
466 }
467
468 close(fd_accept);
469 return 0;
470}
471
472int main(int argc, char** argv)
473{
474
475 printf("###################################################\n");
476#ifdef REDUCED_DATA
477 printf("# PCIe40 data after reduction (#define REDUCED_DATA) #\n");
478#else
479 printf("# PCIe40 data before reduction (//#define REDUCED_DATA) #\n");
480#endif
481 printf("###################################################\n");
482
483 if (argc != 6) {
484 printf("Usage : %s <node ID> <run#> <nwords of det. buf per FEE> <# of CPR per COPPER> <# of HSLBs>\n", argv[ 0 ]);
485 exit(1);
486 }
487
488 //
489 // dummy data
490 //
491 unsigned int node_id = 0;
492 sscanf(argv[1], "0x%x", &node_id);
493
494 int run_no = atoi(argv[2]);
495 int nwords_per_fee = atoi(argv[3]);
496 int ncpr = atoi(argv[4]);
497 int nhslb = atoi(argv[5]);
498
499 //
500 // buffer for inter-threads communication
501 //
502 for (int i = 0; i < NUM_CLIENTS; i++) {
503 data_1[i] = new unsigned int[100000];
504 data_2[i] = new unsigned int[100000];
505 }
506
507 //
508 // Make sender threads
509 //
510 std::thread sender0(sender, 0, run_no, nwords_per_fee, ncpr, nhslb);
511 std::thread sender1(sender, 1, run_no, nwords_per_fee, ncpr, nhslb);
512 std::thread sender2(sender, 2, run_no, nwords_per_fee, ncpr, nhslb);
513 std::thread sender3(sender, 3, run_no, nwords_per_fee, ncpr, nhslb);
514 std::thread sender4(sender, 4, run_no, nwords_per_fee, ncpr, nhslb);
515
516
517#ifdef AIUEO
518 //
519 // dummy data
520 //
521 unsigned int node_id = 0;
522 sscanf(argv[1], "0x%x", &node_id);
523
524 int run_no = atoi(argv[2]);
525 int nwords_per_fee = atoi(argv[3]);
526 int ncpr = atoi(argv[4]);
527 int nhslb = atoi(argv[5]);
528
529 int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
530 ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
531 NW_RAW_TRAILER);
532 printf("TET %d %d %d %d %d\n ", NW_SEND_HEADER + NW_SEND_TRAILER, ncpr,
533 NW_RAW_HEADER, (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb, NW_RAW_TRAILER);
534 int buff[total_words];
535
536 //
537 // Prepare header
538 //
539 int temp_ret = fillDataContents(buff, nwords_per_fee, node_id, ncpr, nhslb, run_no);
540 if (temp_ret != total_words) {
541 printf("[FATAL] data-production error 1 %d %d\n", total_words, temp_ret);
542 fflush(stdout);
543 exit(1);
544 }
545
546 double init_time = getTimeSec();
547 double prev_time = init_time;
548
549 unsigned long long int cnt = 0;
550 unsigned long long int prev_cnt = 0;
551 unsigned long long int start_cnt = 300000;
552
553 int buffer_id = 0;
554
555#ifdef MAX_EVENT
556 for (int j = 0; j < MAX_EVENT; j++)
557#else
558 for (;;)
559#endif
560 {
561 // addEvent(buff, total_words, cnt);
562 addEvent(buff, nwords_per_fee, cnt, ncpr, nhslb);
563 // printf("cnt %d bytes\n", cnt*total_words); fflush(stdout);
564 // sprintf( buff, "event %d dessa", cnt );
565
566 // for(int i = 0 ; i < total_words ; i++){
567 // printf("%.8x ", buff[ i ]);
568 // if( i % 10 == 9 ) printf("\n");
569 // }
570
571 for (int i = 1 ; i <= NUM_CLIENTS ; i++) {
572 int ret = 0;
573 if (buffer_id == 0) {
574 {
575 std::lock_guard<std::mutex> lock(mtx1_ch[i]);
576 memcpy(data_1[i], buff, total_words * sizeof(unsigned int))
577 }
578 } else {
579 {
580 std::lock_guard<std::mutex> lock(mtx2_ch[i]);
581 memcpy(data_2[i], buff, total_words * sizeof(unsigned int))
582 }
583 }
584 // if ((ret = write(client[i].fd, buff, total_words * sizeof(int))) <= 0) {
585 // printf("[FATAL] Return value %d\n", ret);
586 // fflush(stdout);
587 // exit(1);
588 // }
589 }
590
591 if (buffer_id == 0) {
592 buffer_id = 1;
593 } else {
594 buffer_id = 0;
595 }
596
597 cnt++;
598
599 if (cnt == start_cnt) init_time = getTimeSec();
600 if (cnt % 10000 == 1) {
601 if (cnt > start_cnt) {
602 double cur_time = getTimeSec();
603 printf("run %d evt %llu time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
604 run_no,
605 cnt,
606 cur_time - init_time,
607 NUM_CLIENTS * (cnt - prev_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - prev_time),
608 (cnt - prev_cnt) / (cur_time - prev_time) / 1000.,
609 NUM_CLIENTS * (cnt - start_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - init_time),
610 (cnt - start_cnt) / (cur_time - init_time) / 1000., total_words);
611
612 fflush(stdout);
613 prev_time = cur_time;
614 prev_cnt = cnt;
615 } else {
616 // printf("Eve %lld\n", cnt);fflush(stdout);
617 }
618 }
619 }
620
621#endif
622
623 sender0.join();
624 sender1.join();
625 sender2.join();
626 sender3.join();
627 sender4.join();
628
629 for (int i = 0; i < NUM_CLIENTS; i++) {
630 delete data_1[i];
631 delete data_2[i];
632 }
633
634
635}
636#else
637
// Placeholder entry point for builds without USE_DUMMY_DATA_THREADS:
// the program does nothing and reports success.
auto main() -> int
{
  return 0;
}
642#endif
STL namespace.