Belle II Software  release-08-01-10
dummy_data_threads.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #ifdef USE_DUMMY_DATA_THREADS
#include <arpa/inet.h>
#include <limits.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <poll.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#include <cerrno>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
27 
28 
29 
31 // Parameter for data-contents
33 //#define REDUCED_DATA
34 //#define CRC_ON
35 #define LISTENQ 1
36 #define NUM_CLIENTS_PER_THREAD 1
37 #define NUM_CLIENTS 5
38 //#define MAX_EVENT 1000
39 
40 // Format (PCIe40)
41 #define NW_SEND_HEADER 6
42 #define NW_SEND_TRAILER 2
43 
44 #define NW_RAW_HEADER 8
45 #define NW_RAW_TRAILER 4
46 
47 #ifdef REDUCED_DATA
48 #define NW_B2L_HEADER 3
49 #define NW_B2L_TRAILER 2
50 #else
51 #define NW_B2L_HEADER 7
52 #define NW_B2L_TRAILER 3
53 #endif
54 
55 #define CTIME_VAL 0x12345601
56 
57 using namespace std;
58 
59 unsigned int* data_1[NUM_CLIENTS];
60 unsigned int* data_2[NUM_CLIENTS];
61 
62 std::mutex mtx1_ch[NUM_CLIENTS];
63 std::mutex mtx2_ch[NUM_CLIENTS];
64 
65 unsigned short CalcCRC16LittleEndian(unsigned short crc16, const int buf[], int nwords)
66 {
67 
68  if (nwords < 0) {
69  char err_buf[500];
70  sprintf(err_buf, "nwords value(%d) is invalid. Cannot calculate CRC16. Exiting...\n %s %s %d\n",
71  nwords, __FILE__, __PRETTY_FUNCTION__, __LINE__);
72  printf("%s", err_buf); fflush(stdout);
73  string err_str = err_buf;
74  throw (err_str);
75  }
76 
77  const unsigned short CRC16Table0x1021[ 256 ] = {
78  0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
79  0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
80  0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
81  0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
82  0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
83  0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
84  0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
85  0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
86 
87  0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
88  0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
89  0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
90  0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
91  0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
92  0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
93  0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
94  0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
95 
96  0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
97  0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
98  0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
99  0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
100  0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
101  0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
102  0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
103  0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
104 
105  0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
106  0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
107  0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
108  0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
109  0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
110  0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
111  0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
112  0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0
113  };
114 
115  int cnt = 0, nints = 0;
116  // printf("### %.8x %.4x\n", buf[ 0 ], crc16);
117  while (nwords != 0) {
118 
119  unsigned char temp_buf = *((unsigned char*)(buf + nints) + (-(cnt % 4) + 3));
120  crc16 = CRC16Table0x1021[(crc16 >> (16 - CHAR_BIT)) ^ temp_buf ] ^ (crc16 << CHAR_BIT);
121  // printf("%.2x %.4x\n", temp_buf, crc16);
122  if ((cnt % 4) == 3) {
123  nwords--;
124  nints++;
125  // printf("### %.8x\n", buf[ nints ] );
126  }
127 
128  cnt++;
129  }
130 
131 
132  return crc16;
133 
134 }
135 
136 
137 double getTimeSec()
138 {
139  struct timeval t;
140  gettimeofday(&t, NULL);
141  return (t.tv_sec + t.tv_usec * 1.e-6 - 1417570000.);
142 }
143 
144 int fillDataContents(int* buf, int nwords_per_fee, unsigned int node_id, int ncpr, int nhslb, int run)
145 {
146  int nwords = NW_SEND_HEADER + NW_SEND_TRAILER +
147  ncpr * (NW_RAW_HEADER +
148  (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
149  + NW_RAW_TRAILER);
150 
151  // Send Header
152  int offset = 0;
153  buf[ offset + 0 ] = nwords;
154  buf[ offset + 1 ] = 6;
155  buf[ offset + 2 ] = (1 << 16) | ncpr;
156  unsigned int exp_run = run << 8;
157  buf[ offset + 3 ] = exp_run;
158  buf[ offset + 5 ] = node_id;
159  offset += NW_SEND_HEADER;
160 
161  for (int k = 0; k < ncpr; k++) {
162  int top_pos = offset;
163  //
164  // RawHeader
165  //
166  int cpr_nwords = NW_RAW_HEADER +
167  (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
168  + NW_RAW_TRAILER;
169  int finesse_nwords = nwords_per_fee + NW_B2L_HEADER + NW_B2L_TRAILER;
170  unsigned int ctime = CTIME_VAL;
171  unsigned int utime = 0x98765432;
172 
173  buf[ offset + 0 ] = cpr_nwords;
174 #ifdef REDUCED_DATA
175  buf[ offset + 1 ] = 0x7f7f020c;
176 #else
177  buf[ offset + 1 ] = 0x7f7f820c;
178 #endif
179  buf[ offset + 2 ] = exp_run;
180  printf("run_no %d\n", exp_run); fflush(stdout);
181  buf[ offset + 4 ] = ctime;
182  buf[ offset + 5 ] = utime;
183  buf[ offset + 6 ] = node_id + k;
184  buf[ offset + 7 ] = 0x34567890;
185  offset += NW_RAW_HEADER;
186 
187  for (int i = 0; i < nhslb ; i++) {
188 #ifdef REDUCED_DATA
189  buf[ offset + 0 ] = nwords_per_fee + 3;
190  buf[ offset + 1 ] = 0xffaa0000;
191  buf[ offset + 2 ] = ctime;
192 #else
193  buf[ offset + 0 ] = nwords_per_fee + 7;
194  buf[ offset + 1 ] = 0xffaa0000;
195  buf[ offset + 3 ] = ctime;
196  buf[ offset + 4 ] = utime;
197  buf[ offset + 5 ] = exp_run;
198  buf[ offset + 6 ] = ctime;
199 #endif
200  offset += NW_B2L_HEADER;
201 
202  for (int j = offset; j < offset + nwords_per_fee; j++) {
203  buf[ j ] = rand();
204  }
205  offset += nwords_per_fee;
206 
207 #ifdef REDUCED_DATA
208  buf[ offset ] = 0;
209  buf[ offset + 1 ] = 0xff550000;
210 #else
211  buf[ offset ] = ctime;
212  buf[ offset + 1 ] = 0;
213  buf[ offset + 2 ] = 0xff550000;
214 #endif
215 
216  offset += NW_B2L_TRAILER;
217  }
218  buf[ offset ] = 0x0; // error bits
219  buf[ offset + 1 ] = 0x0; // error slots
220  buf[ offset + 2 ] = 0x0; // XOR checksum
221  buf[ offset + 3 ] = 0x7fff0006;
222  offset += NW_RAW_TRAILER;
223  }
224 
225  // Send trailer
226  buf[ offset ] = 0;
227  buf[ offset + 1 ] = 0x7fff0000;
228  offset += NW_SEND_TRAILER;
229  return offset;
230 }
231 
232 
233 
234 inline void addEvent(int* buf, int nwords_per_fee, unsigned int event, int ncpr, int nhslb)
235 //inline void addEvent(int* buf, int nwords, unsigned int event)
236 {
237  int offset = 0;
238  buf[ offset + 4 ] = event;
239  offset += NW_SEND_HEADER;
240 
241  for (int k = 0; k < ncpr; k++) {
242  int nwords = buf[ offset ];
243  int posback_xorchksum = 2;
244  int pos_xorchksum = offset + nwords - posback_xorchksum;
245  if (buf[ offset + 4 ] != CTIME_VAL) {
246  printf("[FATAL] data-production error 2 0x%.x", buf[ offset + 4 ]);
247  fflush(stdout);
248  exit(1);
249  }
250  // RawHeader
251  buf[ pos_xorchksum ] ^= buf[ offset + 3];
252  buf[ offset + 3] = event;
253  buf[ pos_xorchksum ] ^= buf[ offset + 3];
254 
255  // COPPER header
256  offset += NW_RAW_HEADER;
257  for (int i = 0; i < nhslb ; i++) {
258  if ((buf[ offset + 1 ] & 0xffff0000) != 0xffaa0000) {
259  printf("[FATAL] data-production error 3 : 0x%.x hslb %d cpr %d\n", buf[ offset ], i, k);
260  fflush(stdout);
261  exit(1);
262  }
263  buf[ offset + 1 ] = 0xffaa0000 + (event & 0xffff);
264  buf[ offset + 3 ] = event;
265 
266 #ifdef CRC_ON
267  int* crc_buf = buf + offset + 2; // 1 => size of HSLB B2L header
268  int crc_nwords = nwords_per_fee + 5; // 5 => size of FEE B2L header
269  unsigned short temp_crc16 = CalcCRC16LittleEndian(0xffff, crc_buf, crc_nwords);
270  buf[ offset + NW_B2L_HEADER + nwords_per_fee + 1 ] = ((event & 0x0000ffff) << 16) | temp_crc16;
271 #endif
272 
273 #ifdef REDUCED_DATA
274  offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
275 #else
276  offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
277 #endif
278  }
279  offset += NW_RAW_TRAILER;
280  unsigned int xor_chksum = 0;
281  unsigned int xor_chksum2 = 0;
282  }
283 
284 }
285 
286 
287 int sender(int sender_id, int run_no, int nwords_per_fee, int ncpr, int nhslb)
288 {
289  //
290  // data
291  //
292  int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
293  ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
294  NW_RAW_TRAILER);
295  int buff[total_words];
296 
297  //
298  // network connection
299  //
300  int port_to = 30000 + sender_id;
301  struct sockaddr_in servaddr;
302  struct pollfd client[NUM_CLIENTS_PER_THREAD + 1];
303 
304  //
305  // Connect to cprtb01
306  //
307  // string hostname_local;
308  // struct hostent* host;
309  // host = gethostbyname(hostname_local.c_str());
310  // if (host == NULL) {
311  // char err_buf[500];
312  // sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...\n",
313  // hostname_local.c_str(), strerror(errno));
314  // printf("%s\n", err_buf); fflush(stdout);
315  // // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
316  // exit(1);
317  // }
318 
319  //
320  // Bind and listen
321  //
322  int fd_listen;
323  struct sockaddr_in sock_listen;
324  sock_listen.sin_family = AF_INET;
325  // sock_listen.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
326  sock_listen.sin_addr.s_addr = htonl(INADDR_ANY);
327 
328  socklen_t addrlen = sizeof(sock_listen);
329  sock_listen.sin_port = htons(port_to);
330  fd_listen = socket(PF_INET, SOCK_STREAM, 0);
331 
332  int flags = 1;
333  int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)sizeof(flags));
334  if (ret < 0) {
335  perror("Failed to set REUSEADDR");
336  }
337 
338  if (bind(fd_listen, (struct sockaddr*)&sock_listen, sizeof(struct sockaddr)) < 0) {
339  printf("[FATAL] Failed to bind. Maybe other programs have already occupied this port(%d). Exiting...\n",
340  port_to); fflush(stdout);
341  // Check the process occupying the port 30000.
342  FILE* fp;
343  char buf[256];
344  char cmdline[500];
345  sprintf(cmdline, "/usr/sbin/ss -ap | grep %d", port_to);
346  if ((fp = popen(cmdline, "r")) == NULL) {
347  printf("[WARNING] Failed to run %s\n", cmdline);
348  }
349  while (fgets(buf, 256, fp) != NULL) {
350  printf("[ERROR] Failed to bind. output of ss(port %d) : %s\n", port_to, buf); fflush(stdout);
351  }
352  // Error message
353  fclose(fp);
354  char err_buf[500];
355  sprintf(err_buf, "[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
356  strerror(errno), port_to);
357  printf("%s\n", err_buf); fflush(stdout);
358  // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
359  exit(1);
360  }
361 
362  int val1 = 0;
363  setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)sizeof(val1));
364  int backlog = 1;
365  if (listen(fd_listen, backlog) < 0) {
366  char err_buf[500];
367  sprintf(err_buf, "Failed in listen(%s). Exting...", strerror(errno));
368  printf("%s\n", err_buf); fflush(stdout);
369  // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
370  exit(-1);
371  }
372 
373  //
374  // Accept
375  //
376  int fd_accept;
377  struct sockaddr_in sock_accept;
378  // printf("[DEBUG] Accepting... : port %d server %s\n", port_to, hostname_local.c_str());
379  printf("[DEBUG] Accepting... : port %d\n", port_to);
380  fflush(stdout);
381 
382  if ((fd_accept = accept(fd_listen, (struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
383  char err_buf[500];
384  sprintf(err_buf, "[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
385  printf("%s\n", err_buf); fflush(stdout);
386  // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
387  exit(-1);
388  } else {
389  // B2INFO("Done.");
390  printf("[DEBUG] Accepted.\n"); fflush(stdout);
391 
392  // set timepout option
393  struct timeval timeout;
394  timeout.tv_sec = 1;
395  timeout.tv_usec = 0;
396  ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)sizeof(timeout));
397  if (ret < 0) {
398  char err_buf[500] = "[FATAL] Failed to set TIMEOUT. Exiting...";
399  printf("%s\n", err_buf); fflush(stdout);
400  // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
401  exit(-1);
402  }
403  }
404 
405  if (fd_listen) {
406  close(fd_listen);
407  }
408 
409 
410 
411  printf("Connection(port %d) accepted\n", port_to); fflush(stdout);
412 
413  double init_time = getTimeSec();
414  double prev_time = init_time;
415 
416  unsigned long long int cnt = 0;
417  unsigned long long int prev_cnt = 0;
418  unsigned long long int start_cnt = 3000;
419 
420  for (
421 #ifdef MAX_EVENT
422  int j = 0; j < MAX_EVENT; j++
423 #else
424  ;;
425 #endif
426  ) {
427  // addEvent(buff, total_words, cnt);
428  //addEvent(buff, nwords_per_fee, cnt, ncpr, nhslb);
429  // printf("cnt %d bytes\n", cnt*total_words); fflush(stdout);
430  // sprintf( buff, "event %d dessa", cnt );
431 
432  // for(int i = 0 ; i < total_words ; i++){
433  // printf("%.8x ", buff[ i ]);
434  // if( i % 10 == 9 ) printf("\n");
435  // }
436 
437 
438  int ret = 0;
439  if ((ret = write(fd_accept, buff, total_words * sizeof(int))) <= 0) {
440  printf("[FATAL] Return value %d\n", ret);
441  fflush(stdout);
442  exit(1);
443  }
444 
445  cnt++;
446 
447  if (cnt == start_cnt) init_time = getTimeSec();
448  if (cnt % 1000000 == 1) {
449  if (cnt > start_cnt) {
450  double cur_time = getTimeSec();
451  printf("run %d evt %llu time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
452  run_no,
453  cnt,
454  cur_time - init_time,
455  NUM_CLIENTS_PER_THREAD * (cnt - prev_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - prev_time),
456  (cnt - prev_cnt) / (cur_time - prev_time) / 1000.,
457  NUM_CLIENTS_PER_THREAD * (cnt - start_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - init_time),
458  (cnt - start_cnt) / (cur_time - init_time) / 1000., total_words);
459 
460  fflush(stdout);
461  prev_time = cur_time;
462  prev_cnt = cnt;
463  } else {
464  // printf("Eve %lld\n", cnt);fflush(stdout);
465  }
466  }
467  }
468 
469  close(fd_accept);
470  return 0;
471 }
472 
473 int main(int argc, char** argv)
474 {
475 
476  printf("###################################################\n");
477 #ifdef REDUCED_DATA
478  printf("# PCIe40 data after reduction (#define REDUCED_DATA) #\n");
479 #else
480  printf("# PCIe40 data before reduction (//#define REDUCED_DATA) #\n");
481 #endif
482  printf("###################################################\n");
483 
484  if (argc != 6) {
485  printf("Usage : %s <node ID> <run#> <nwords of det. buf per FEE> <# of CPR per COPPER> <# of HSLBs>\n", argv[ 0 ]);
486  exit(1);
487  }
488 
489  //
490  // dummy data
491  //
492  unsigned int node_id = 0;
493  sscanf(argv[1], "0x%x", &node_id);
494 
495  int run_no = atoi(argv[2]);
496  int nwords_per_fee = atoi(argv[3]);
497  int ncpr = atoi(argv[4]);
498  int nhslb = atoi(argv[5]);
499 
500  //
501  // buffer for inter-threads communication
502  //
503  for (int i = 0; i < NUM_CLIENTS; i++) {
504  data_1[i] = new unsigned int[100000];
505  data_2[i] = new unsigned int[100000];
506  }
507 
508  //
509  // Make sender threads
510  //
511  std::thread sender0(sender, 0, run_no, nwords_per_fee, ncpr, nhslb);
512  std::thread sender1(sender, 1, run_no, nwords_per_fee, ncpr, nhslb);
513  std::thread sender2(sender, 2, run_no, nwords_per_fee, ncpr, nhslb);
514  std::thread sender3(sender, 3, run_no, nwords_per_fee, ncpr, nhslb);
515  std::thread sender4(sender, 4, run_no, nwords_per_fee, ncpr, nhslb);
516 
517 
518 #ifdef AIUEO
519  //
520  // dummy data
521  //
522  unsigned int node_id = 0;
523  sscanf(argv[1], "0x%x", &node_id);
524 
525  int run_no = atoi(argv[2]);
526  int nwords_per_fee = atoi(argv[3]);
527  int ncpr = atoi(argv[4]);
528  int nhslb = atoi(argv[5]);
529 
530  int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
531  ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
532  NW_RAW_TRAILER);
533  printf("TET %d %d %d %d %d\n ", NW_SEND_HEADER + NW_SEND_TRAILER, ncpr,
534  NW_RAW_HEADER, (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb, NW_RAW_TRAILER);
535  int buff[total_words];
536 
537  //
538  // Prepare header
539  //
540  int temp_ret = fillDataContents(buff, nwords_per_fee, node_id, ncpr, nhslb, run_no);
541  if (temp_ret != total_words) {
542  printf("[FATAL] data-production error 1 %d %d\n", total_words, temp_ret);
543  fflush(stdout);
544  exit(1);
545  }
546 
547  double init_time = getTimeSec();
548  double prev_time = init_time;
549 
550  unsigned long long int cnt = 0;
551  unsigned long long int prev_cnt = 0;
552  unsigned long long int start_cnt = 300000;
553 
554  int buffer_id = 0;
555 
556 #ifdef MAX_EVENT
557  for (int j = 0; j < MAX_EVENT; j++)
558 #else
559  for (;;)
560 #endif
561  {
562  // addEvent(buff, total_words, cnt);
563  addEvent(buff, nwords_per_fee, cnt, ncpr, nhslb);
564  // printf("cnt %d bytes\n", cnt*total_words); fflush(stdout);
565  // sprintf( buff, "event %d dessa", cnt );
566 
567  // for(int i = 0 ; i < total_words ; i++){
568  // printf("%.8x ", buff[ i ]);
569  // if( i % 10 == 9 ) printf("\n");
570  // }
571 
572  for (int i = 1 ; i <= NUM_CLIENTS ; i++) {
573  int ret = 0;
574  if (buffer_id == 0) {
575  {
576  std::lock_guard<std::mutex> lock(mtx1_ch[i]);
577  memcpy(data_1[i], buff, total_words * sizeof(unsigned int))
578  }
579  } else {
580  {
581  std::lock_guard<std::mutex> lock(mtx2_ch[i]);
582  memcpy(data_2[i], buff, total_words * sizeof(unsigned int))
583  }
584  }
585  // if ((ret = write(client[i].fd, buff, total_words * sizeof(int))) <= 0) {
586  // printf("[FATAL] Return value %d\n", ret);
587  // fflush(stdout);
588  // exit(1);
589  // }
590  }
591 
592  if (buffer_id == 0) {
593  buffer_id = 1;
594  } else {
595  buffer_id = 0;
596  }
597 
598  cnt++;
599 
600  if (cnt == start_cnt) init_time = getTimeSec();
601  if (cnt % 10000 == 1) {
602  if (cnt > start_cnt) {
603  double cur_time = getTimeSec();
604  printf("run %d evt %lld time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
605  run_no,
606  cnt,
607  cur_time - init_time,
608  NUM_CLIENTS * (cnt - prev_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - prev_time),
609  (cnt - prev_cnt) / (cur_time - prev_time) / 1000.,
610  NUM_CLIENTS * (cnt - start_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - init_time),
611  (cnt - start_cnt) / (cur_time - init_time) / 1000., total_words);
612 
613  fflush(stdout);
614  prev_time = cur_time;
615  prev_cnt = cnt;
616  } else {
617  // printf("Eve %lld\n", cnt);fflush(stdout);
618  }
619  }
620  }
621 
622 #endif
623 
624  sender0.join();
625  sender1.join();
626  sender2.join();
627  sender3.join();
628  sender4.join();
629 
630  for (int i = 0; i < NUM_CLIENTS; i++) {
631  delete data_1[i];
632  delete data_2[i];
633  }
634 
635 
636 }
637 #else
638 
639 int main()
640 {
641  return 0;
642 }
643 #endif
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:91