Belle II Software  release-05-02-19
dummy_data_threads.cc
1 #ifdef USE_DUMMY_DATA_THREADS
2 #include <iostream>
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <netinet/in.h>
6 #include <netinet/tcp.h>
7 #include <sys/types.h>
8 #include <sys/socket.h>
9 #include <sys/time.h>
10 #include <time.h>
11 #include <string.h>
12 #include <unistd.h>
13 #include <arpa/inet.h>
14 #include <limits.h>
15 #include <signal.h>
16 #include <poll.h>
17 #include <netdb.h>
18 #include <thread>
19 #include <mutex>
20 
21 
22 
24 // Parameters controlling the generated data contents.
// REDUCED_DATA (when defined) selects the shorter B2L header/trailer sizes below.
// CRC_ON (when defined) makes addEvent() compute a CRC16 for each FEE block.
26 //#define REDUCED_DATA
27 //#define CRC_ON
// NOTE(review): LISTENQ is not referenced by the code visible in this file.
28 #define LISTENQ 1
// Sizes the pollfd array declared in sender() and scales its data-flow printout.
29 #define NUM_CLIENTS_PER_THREAD 1
// Number of entries in the per-client buffer and mutex arrays declared below.
30 #define NUM_CLIENTS 5
// MAX_EVENT (when defined) bounds the send loop; otherwise the loop is endless.
31 //#define MAX_EVENT 1000
32 
33 // Format (PCIe40): sizes, in words of the int buffer, of each header/trailer.
// Send header/trailer wrap the whole event.
34 #define NW_SEND_HEADER 6
35 #define NW_SEND_TRAILER 2
36 
// Raw header/trailer wrap each "cpr" block inside the event.
37 #define NW_RAW_HEADER 8
38 #define NW_RAW_TRAILER 4
39 
// B2L header/trailer wrap each per-HSLB payload inside a cpr block.
40 #ifdef REDUCED_DATA
41 #define NW_B2L_HEADER 3
42 #define NW_B2L_TRAILER 2
43 #else
44 #define NW_B2L_HEADER 7
45 #define NW_B2L_TRAILER 3
46 #endif
47 
// Marker value written by fillDataContents() into raw-header word 4 and
// checked there by addEvent() as a consistency test.
48 #define CTIME_VAL 0x12345601
49 
50 using namespace std;
51 
// Per-client buffers, allocated and released in main(). Apart from that they
// are only written by the code inside the "#ifdef AIUEO" section of main().
52 unsigned int* data_1[NUM_CLIENTS];
53 unsigned int* data_2[NUM_CLIENTS];
54 
// Per-client mutexes taken around the copies into data_1[] / data_2[]
// (again only inside the "#ifdef AIUEO" section of main()).
55 std::mutex mtx1_ch[NUM_CLIENTS];
56 std::mutex mtx2_ch[NUM_CLIENTS];
57 
/**
 * Table-driven CRC16 (polynomial 0x1021) over an array of 32-bit words.
 *
 * Every word contributes four bytes, most-significant byte first. This is the
 * byte order the previous pointer-based implementation produced on a
 * little-endian host; extracting the bytes with shifts makes the result
 * independent of the host byte order.
 *
 * @param crc16   Initial CRC value (pass the previous result to chain calls).
 * @param buf     Words to process; must hold at least @p nwords entries.
 * @param nwords  Number of words to process; 0 returns @p crc16 unchanged.
 * @return The updated CRC16.
 * @throws std::string holding a diagnostic message when @p nwords is negative
 *         (the message is also printed to stdout).
 */
unsigned short CalcCRC16LittleEndian(unsigned short crc16, const int buf[], int nwords)
{
  if (nwords < 0) {
    char err_buf[500];
    // snprintf: __FILE__ / __PRETTY_FUNCTION__ have no bounded length.
    snprintf(err_buf, sizeof(err_buf),
             "nwords value(%d) is invalid. Cannot calculate CRC16. Exiting...\n %s %s %d\n",
             nwords, __FILE__, __PRETTY_FUNCTION__, __LINE__);
    printf("%s", err_buf); fflush(stdout);
    throw std::string(err_buf);
  }

  // static: the table is constant, so build it once instead of on every call.
  static const unsigned short CRC16Table0x1021[ 256 ] = {
    0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
    0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
    0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
    0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
    0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
    0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
    0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
    0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,

    0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
    0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
    0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
    0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
    0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
    0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
    0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
    0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,

    0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
    0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
    0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
    0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
    0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
    0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
    0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
    0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,

    0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
    0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
    0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
    0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
    0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
    0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
    0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
    0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0
  };

  for (int iword = 0; iword < nwords; iword++) {
    const unsigned int word = static_cast<unsigned int>(buf[ iword ]);
    // Feed the four bytes of the word, most-significant byte first.
    for (int shift = 24; shift >= 0; shift -= 8) {
      const unsigned char cur_byte = static_cast<unsigned char>((word >> shift) & 0xff);
      crc16 = static_cast<unsigned short>(
                CRC16Table0x1021[(crc16 >> (16 - CHAR_BIT)) ^ cur_byte ] ^ (crc16 << CHAR_BIT));
    }
  }

  return crc16;
}
128 
129 
/**
 * Current wall-clock time from gettimeofday(), in seconds (with the
 * microsecond part as fraction), shifted by a fixed offset of 1417570000 s.
 */
double getTimeSec()
{
  timeval now;
  gettimeofday(&now, nullptr);
  const double micro_part = now.tv_usec * 1.e-6;
  return (now.tv_sec + micro_part - 1417570000.);
}
136 
137 int fillDataContents(int* buf, int nwords_per_fee, unsigned int node_id, int ncpr, int nhslb, int run)
138 {
139  int nwords = NW_SEND_HEADER + NW_SEND_TRAILER +
140  ncpr * (NW_RAW_HEADER +
141  (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
142  + NW_RAW_TRAILER);
143 
144  // Send Header
145  int offset = 0;
146  buf[ offset + 0 ] = nwords;
147  buf[ offset + 1 ] = 6;
148  buf[ offset + 2 ] = (1 << 16) | ncpr;
149  unsigned int exp_run = run << 8;
150  buf[ offset + 3 ] = exp_run;
151  buf[ offset + 5 ] = node_id;
152  offset += NW_SEND_HEADER;
153 
154  for (int k = 0; k < ncpr; k++) {
155  int top_pos = offset;
156  //
157  // RawHeader
158  //
159  int cpr_nwords = NW_RAW_HEADER +
160  (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
161  + NW_RAW_TRAILER;
162  int finesse_nwords = nwords_per_fee + NW_B2L_HEADER + NW_B2L_TRAILER;
163  unsigned int ctime = CTIME_VAL;
164  unsigned int utime = 0x98765432;
165 
166  buf[ offset + 0 ] = cpr_nwords;
167 #ifdef REDUCED_DATA
168  buf[ offset + 1 ] = 0x7f7f020c;
169 #else
170  buf[ offset + 1 ] = 0x7f7f820c;
171 #endif
172  buf[ offset + 2 ] = exp_run;
173  printf("run_no %d\n", exp_run); fflush(stdout);
174  buf[ offset + 4 ] = ctime;
175  buf[ offset + 5 ] = utime;
176  buf[ offset + 6 ] = node_id + k;
177  buf[ offset + 7 ] = 0x34567890;
178  offset += NW_RAW_HEADER;
179 
180  for (int i = 0; i < nhslb ; i++) {
181 #ifdef REDUCED_DATA
182  buf[ offset + 0 ] = nwords_per_fee + 3;
183  buf[ offset + 1 ] = 0xffaa0000;
184  buf[ offset + 2 ] = ctime;
185 #else
186  buf[ offset + 0 ] = nwords_per_fee + 7;
187  buf[ offset + 1 ] = 0xffaa0000;
188  buf[ offset + 3 ] = ctime;
189  buf[ offset + 4 ] = utime;
190  buf[ offset + 5 ] = exp_run;
191  buf[ offset + 6 ] = ctime;
192 #endif
193  offset += NW_B2L_HEADER;
194 
195  for (int j = offset; j < offset + nwords_per_fee; j++) {
196  buf[ j ] = rand();
197  }
198  offset += nwords_per_fee;
199 
200 #ifdef REDUCED_DATA
201  buf[ offset ] = 0;
202  buf[ offset + 1 ] = 0xff550000;
203 #else
204  buf[ offset ] = ctime;
205  buf[ offset + 1 ] = 0;
206  buf[ offset + 2 ] = 0xff550000;
207 #endif
208 
209  offset += NW_B2L_TRAILER;
210  }
211  buf[ offset ] = 0x0; // error bits
212  buf[ offset + 1 ] = 0x0; // error slots
213  buf[ offset + 2 ] = 0x0; // XOR checksum
214  buf[ offset + 3 ] = 0x7fff0006;
215  offset += NW_RAW_TRAILER;
216  }
217 
218  // Send trailer
219  buf[ offset ] = 0;
220  buf[ offset + 1 ] = 0x7fff0000;
221  offset += NW_SEND_TRAILER;
222  return offset;
223 }
224 
225 
226 
227 inline void addEvent(int* buf, int nwords_per_fee, unsigned int event, int ncpr, int nhslb)
228 //inline void addEvent(int* buf, int nwords, unsigned int event)
229 {
230  int offset = 0;
231  int prev_offset;
232  buf[ offset + 4 ] = event;
233  offset += NW_SEND_HEADER;
234 
235  for (int k = 0; k < ncpr; k++) {
236  int nwords = buf[ offset ];
237  int posback_xorchksum = 2;
238  int pos_xorchksum = offset + nwords - posback_xorchksum;
239  prev_offset = offset;
240  if (buf[ offset + 4 ] != CTIME_VAL) {
241  printf("[FATAL] data-production error 2 0x%.x", buf[ offset + 4 ]);
242  fflush(stdout);
243  exit(1);
244  }
245  // RawHeader
246  buf[ pos_xorchksum ] ^= buf[ offset + 3];
247  buf[ offset + 3] = event;
248  buf[ pos_xorchksum ] ^= buf[ offset + 3];
249 
250  // COPPER header
251  offset += NW_RAW_HEADER;
252  for (int i = 0; i < nhslb ; i++) {
253  if ((buf[ offset + 1 ] & 0xffff0000) != 0xffaa0000) {
254  printf("[FATAL] data-production error 3 : 0x%.x hslb %d cpr %d\n", buf[ offset ], i, k);
255  fflush(stdout);
256  exit(1);
257  }
258  buf[ offset + 1 ] = 0xffaa0000 + (event & 0xffff);
259  buf[ offset + 3 ] = event;
260 
261 #ifdef CRC_ON
262  int* crc_buf = buf + offset + 2; // 1 => size of HSLB B2L header
263  int crc_nwords = nwords_per_fee + 5; // 5 => size of FEE B2L header
264  unsigned short temp_crc16 = CalcCRC16LittleEndian(0xffff, crc_buf, crc_nwords);
265  buf[ offset + NW_B2L_HEADER + nwords_per_fee + 1 ] = ((event & 0x0000ffff) << 16) | temp_crc16;
266 #endif
267 
268 #ifdef REDUCED_DATA
269  offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
270 #else
271  offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
272 #endif
273  }
274  offset += NW_RAW_TRAILER;
275  unsigned int xor_chksum = 0;
276  unsigned int xor_chksum2 = 0;
277  }
278 
279 }
280 
281 
282 int sender(int sender_id, int run_no, int nwords_per_fee, int ncpr, int nhslb)
283 {
284  //
285  // data
286  //
287  int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
288  ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
289  NW_RAW_TRAILER);
290  int buff[total_words];
291 
292  //
293  // network connection
294  //
295  int port_to = 30000 + sender_id;
296  int listenfd;
297  struct sockaddr_in servaddr;
298  struct pollfd client[NUM_CLIENTS_PER_THREAD + 1];
299 
300  //
301  // Connect to cprtb01
302  //
303  // string hostname_local;
304  // struct hostent* host;
305  // host = gethostbyname(hostname_local.c_str());
306  // if (host == NULL) {
307  // char err_buf[500];
308  // sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...\n",
309  // hostname_local.c_str(), strerror(errno));
310  // printf("%s\n", err_buf); fflush(stdout);
311  // // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
312  // exit(1);
313  // }
314 
315  //
316  // Bind and listen
317  //
318  int fd_listen;
319  struct sockaddr_in sock_listen;
320  sock_listen.sin_family = AF_INET;
321  // sock_listen.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
322  sock_listen.sin_addr.s_addr = htonl(INADDR_ANY);
323 
324  socklen_t addrlen = sizeof(sock_listen);
325  sock_listen.sin_port = htons(port_to);
326  fd_listen = socket(PF_INET, SOCK_STREAM, 0);
327 
328  int flags = 1;
329  int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)sizeof(flags));
330  if (ret < 0) {
331  perror("Failed to set REUSEADDR");
332  }
333 
334  if (bind(fd_listen, (struct sockaddr*)&sock_listen, sizeof(struct sockaddr)) < 0) {
335  printf("[FATAL] Failed to bind. Maybe other programs have already occupied this port(%d). Exiting...\n",
336  port_to); fflush(stdout);
337  // Check the process occupying the port 30000.
338  FILE* fp;
339  char buf[256];
340  char cmdline[500];
341  sprintf(cmdline, "/usr/sbin/ss -ap | grep %d", port_to);
342  if ((fp = popen(cmdline, "r")) == NULL) {
343  printf("[WARNING] Failed to run %s\n", cmdline);
344  }
345  while (fgets(buf, 256, fp) != NULL) {
346  printf("[ERROR] Failed to bind. output of ss(port %d) : %s\n", port_to, buf); fflush(stdout);
347  }
348  // Error message
349  fclose(fp);
350  char err_buf[500];
351  sprintf(err_buf, "[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
352  strerror(errno), port_to);
353  printf("%s\n", err_buf); fflush(stdout);
354  // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
355  exit(1);
356  }
357 
358  int val1 = 0;
359  setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)sizeof(val1));
360  int backlog = 1;
361  if (listen(fd_listen, backlog) < 0) {
362  char err_buf[500];
363  sprintf(err_buf, "Failed in listen(%s). Exting...", strerror(errno));
364  printf("%s\n", err_buf); fflush(stdout);
365  // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
366  exit(-1);
367  }
368 
369  //
370  // Accept
371  //
372  int fd_accept;
373  struct sockaddr_in sock_accept;
374  // printf("[DEBUG] Accepting... : port %d server %s\n", port_to, hostname_local.c_str());
375  printf("[DEBUG] Accepting... : port %d\n", port_to);
376  fflush(stdout);
377 
378  if ((fd_accept = accept(fd_listen, (struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
379  char err_buf[500];
380  sprintf(err_buf, "[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
381  printf("%s\n", err_buf); fflush(stdout);
382  // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
383  exit(-1);
384  } else {
385  // B2INFO("Done.");
386  printf("[DEBUG] Accepted.\n"); fflush(stdout);
387 
388  // set timepout option
389  struct timeval timeout;
390  timeout.tv_sec = 1;
391  timeout.tv_usec = 0;
392  ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)sizeof(timeout));
393  if (ret < 0) {
394  char err_buf[500] = "[FATAL] Failed to set TIMEOUT. Exiting...";
395  printf("%s\n", err_buf); fflush(stdout);
396  // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
397  exit(-1);
398  }
399  }
400 
401  if (fd_listen) {
402  close(fd_listen);
403  }
404 
405 
406 
407  printf("Connection(port %d) accepted\n", port_to); fflush(stdout);
408 
409  double init_time = getTimeSec();
410  double prev_time = init_time;
411 
412  unsigned long long int cnt = 0;
413  unsigned long long int prev_cnt = 0;
414  unsigned long long int start_cnt = 3000;
415 
416  for (
417 #ifdef MAX_EVENT
418  int j = 0; j < MAX_EVENT; j++
419 #else
420  ;;
421 #endif
422  ) {
423  // addEvent(buff, total_words, cnt);
424  //addEvent(buff, nwords_per_fee, cnt, ncpr, nhslb);
425  // printf("cnt %d bytes\n", cnt*total_words); fflush(stdout);
426  // sprintf( buff, "event %d dessa", cnt );
427 
428  // for(int i = 0 ; i < total_words ; i++){
429  // printf("%.8x ", buff[ i ]);
430  // if( i % 10 == 9 ) printf("\n");
431  // }
432 
433 
434  int ret = 0;
435  if ((ret = write(fd_accept, buff, total_words * sizeof(int))) <= 0) {
436  printf("[FATAL] Return value %d\n", ret);
437  fflush(stdout);
438  exit(1);
439  }
440 
441  cnt++;
442 
443  if (cnt == start_cnt) init_time = getTimeSec();
444  if (cnt % 1000000 == 1) {
445  if (cnt > start_cnt) {
446  double cur_time = getTimeSec();
447  printf("run %d evt %lld time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
448  run_no,
449  cnt,
450  cur_time - init_time,
451  NUM_CLIENTS_PER_THREAD * (cnt - prev_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - prev_time),
452  (cnt - prev_cnt) / (cur_time - prev_time) / 1000. ,
453  NUM_CLIENTS_PER_THREAD * (cnt - start_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - init_time),
454  (cnt - start_cnt) / (cur_time - init_time) / 1000. , total_words);
455 
456  fflush(stdout);
457  prev_time = cur_time;
458  prev_cnt = cnt;
459  } else {
460  // printf("Eve %lld\n", cnt);fflush(stdout);
461  }
462  }
463  }
464 
465  close(fd_accept);
466  return 0;
467 }
468 
469 int main(int argc, char** argv)
470 {
471 
472  printf("###################################################\n");
473 #ifdef REDUCED_DATA
474  printf("# PCIe40 data after reduction (#define REDUCED_DATA) #\n");
475 #else
476  printf("# PCIe40 data before reduction (//#define REDUCED_DATA) #\n");
477 #endif
478  printf("###################################################\n");
479 
480  if (argc != 6) {
481  printf("Usage : %s <node ID> <run#> <nwords of det. buf per FEE> <# of CPR per COPPER> <# of HSLBs>\n", argv[ 0 ]);
482  exit(1);
483  }
484 
485  //
486  // dummy data
487  //
488  unsigned int node_id = 0;
489  sscanf(argv[1], "0x%x", &node_id);
490 
491  int run_no = atoi(argv[2]);
492  int nwords_per_fee = atoi(argv[3]);
493  int ncpr = atoi(argv[4]);
494  int nhslb = atoi(argv[5]);
495 
496  //
497  // buffer for inter-threads communication
498  //
499  for (int i = 0; i < NUM_CLIENTS; i++) {
500  data_1[i] = new unsigned int[100000];
501  data_2[i] = new unsigned int[100000];
502  }
503 
504  //
505  // Make sender threads
506  //
507  std::thread sender0(sender, 0, run_no, nwords_per_fee, ncpr, nhslb);
508  std::thread sender1(sender, 1, run_no, nwords_per_fee, ncpr, nhslb);
509  std::thread sender2(sender, 2, run_no, nwords_per_fee, ncpr, nhslb);
510  std::thread sender3(sender, 3, run_no, nwords_per_fee, ncpr, nhslb);
511  std::thread sender4(sender, 4, run_no, nwords_per_fee, ncpr, nhslb);
512 
513 
514 #ifdef AIUEO
515  //
516  // dummy data
517  //
518  unsigned int node_id = 0;
519  sscanf(argv[1], "0x%x", &node_id);
520 
521  int run_no = atoi(argv[2]);
522  int nwords_per_fee = atoi(argv[3]);
523  int ncpr = atoi(argv[4]);
524  int nhslb = atoi(argv[5]);
525 
526  int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
527  ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
528  NW_RAW_TRAILER);
529  printf("TET %d %d %d %d %d\n ", NW_SEND_HEADER + NW_SEND_TRAILER, ncpr,
530  NW_RAW_HEADER, (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb, NW_RAW_TRAILER);
531  int buff[total_words];
532 
533  //
534  // Prepare header
535  //
536  int temp_ret = fillDataContents(buff, nwords_per_fee, node_id, ncpr, nhslb, run_no);
537  if (temp_ret != total_words) {
538  printf("[FATAL] data-production error 1 %d %d\n", total_words, temp_ret);
539  fflush(stdout);
540  exit(1);
541  }
542 
543  double init_time = getTimeSec();
544  double prev_time = init_time;
545 
546  unsigned long long int cnt = 0;
547  unsigned long long int prev_cnt = 0;
548  unsigned long long int start_cnt = 300000;
549 
550  int buffer_id = 0;
551 
552 #ifdef MAX_EVENT
553  for (int j = 0; j < MAX_EVENT; j++)
554 #else
555  for (;;)
556 #endif
557  {
558  // addEvent(buff, total_words, cnt);
559  addEvent(buff, nwords_per_fee, cnt, ncpr, nhslb);
560  // printf("cnt %d bytes\n", cnt*total_words); fflush(stdout);
561  // sprintf( buff, "event %d dessa", cnt );
562 
563  // for(int i = 0 ; i < total_words ; i++){
564  // printf("%.8x ", buff[ i ]);
565  // if( i % 10 == 9 ) printf("\n");
566  // }
567 
568  for (int i = 1 ; i <= NUM_CLIENTS ; i++) {
569  int ret = 0;
570  if (buffer_id == 0) {
571  {
572  std::lock_guard<std::mutex> lock(mtx1_ch[i]);
573  memcpy(data_1[i], buff, total_words * sizeof(unsigned int))
574  }
575  } else {
576  {
577  std::lock_guard<std::mutex> lock(mtx2_ch[i]);
578  memcpy(data_2[i], buff, total_words * sizeof(unsigned int))
579  }
580  }
581  // if ((ret = write(client[i].fd, buff, total_words * sizeof(int))) <= 0) {
582  // printf("[FATAL] Return value %d\n", ret);
583  // fflush(stdout);
584  // exit(1);
585  // }
586  }
587 
588  if (buffer_id == 0) {
589  buffer_id = 1;
590  } else {
591  buffer_id = 0;
592  }
593 
594  cnt++;
595 
596  if (cnt == start_cnt) init_time = getTimeSec();
597  if (cnt % 10000 == 1) {
598  if (cnt > start_cnt) {
599  double cur_time = getTimeSec();
600  printf("run %d evt %lld time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
601  run_no,
602  cnt,
603  cur_time - init_time,
604  NUM_CLIENTS * (cnt - prev_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - prev_time),
605  (cnt - prev_cnt) / (cur_time - prev_time) / 1000. ,
606  NUM_CLIENTS * (cnt - start_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - init_time),
607  (cnt - start_cnt) / (cur_time - init_time) / 1000. , total_words);
608 
609  fflush(stdout);
610  prev_time = cur_time;
611  prev_cnt = cnt;
612  } else {
613  // printf("Eve %lld\n", cnt);fflush(stdout);
614  }
615  }
616  }
617 
618 #endif
619 
620  sender0.join();
621  sender1.join();
622  sender2.join();
623  sender3.join();
624  sender4.join();
625 
626  for (int i = 0; i < NUM_CLIENTS; i++) {
627  delete data_1[i];
628  delete data_2[i];
629  }
630 
631 
632 }
633 #else
634 
// Stub entry point compiled when USE_DUMMY_DATA_THREADS is not defined:
// the program does nothing and returns 0.
635 int main()
636 {
637  return 0;
638 }
639 #endif
main
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:77