// Belle II Software, release-06-01-15
// File: dummy_data_threads.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #ifdef USE_DUMMY_DATA_THREADS
#include <arpa/inet.h>
#include <errno.h>
#include <limits.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <poll.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
27 
28 
29 
//
// Build-time switches for the generated data (all disabled by default):
//   REDUCED_DATA : selects the shorter B2L header/trailer word counts below
//   CRC_ON       : makes addEvent() compute a CRC16 for every FEE block
//   MAX_EVENT    : bounds the otherwise endless send loops
//
//#define REDUCED_DATA
//#define CRC_ON
#define LISTENQ 1
#define NUM_CLIENTS_PER_THREAD 1
#define NUM_CLIENTS 5
//#define MAX_EVENT 1000

//
// Event layout (PCIe40), all sizes in 32-bit words
//
#define NW_SEND_HEADER 6
#define NW_SEND_TRAILER 2

#define NW_RAW_HEADER 8
#define NW_RAW_TRAILER 4

#ifdef REDUCED_DATA
#define NW_B2L_HEADER 3
#define NW_B2L_TRAILER 2
#else
#define NW_B2L_HEADER 7
#define NW_B2L_TRAILER 3
#endif

// Marker written into the headers by fillDataContents() and checked by addEvent().
#define CTIME_VAL 0x12345601

using namespace std;

// Two buffers per client, allocated in main(), each with its own mutex.
unsigned int* data_1[NUM_CLIENTS];
unsigned int* data_2[NUM_CLIENTS];

std::mutex mtx1_ch[NUM_CLIENTS];
std::mutex mtx2_ch[NUM_CLIENTS];
64 
/**
 * Continue a table-driven CRC16 (table for polynomial 0x1021) over a run of 32-bit words.
 *
 * Every word contributes four bytes, most significant byte first. That is the
 * order the previous pointer-based implementation produced on a little-endian
 * host; extracting the bytes with shifts gives the same result without
 * depending on the host byte order.
 *
 * @param crc16  CRC value to continue from (seed for the first call)
 * @param buf    words to fold into the CRC
 * @param nwords number of words to read from @p buf; must not be negative
 * @return the updated CRC
 * @throws std::string carrying the error text when @p nwords is negative
 */
unsigned short CalcCRC16LittleEndian(unsigned short crc16, const int buf[], int nwords)
{
  if (nwords < 0) {
    char err_buf[500];
    // snprintf: __FILE__ can be an arbitrarily long path, never overrun the buffer.
    snprintf(err_buf, sizeof(err_buf),
             "nwords value(%d) is invalid. Cannot calculate CRC16. Exiting...\n %s %s %d\n",
             nwords, __FILE__, __PRETTY_FUNCTION__, __LINE__);
    printf("%s", err_buf); fflush(stdout);
    throw std::string(err_buf);
  }

  // static: the table is constant, so build it once instead of on every call.
  static const unsigned short CRC16Table0x1021[ 256 ] = {
    0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
    0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
    0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
    0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
    0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
    0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
    0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
    0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,

    0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
    0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
    0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
    0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
    0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
    0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
    0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
    0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,

    0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
    0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
    0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
    0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
    0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
    0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
    0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
    0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,

    0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
    0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
    0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
    0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
    0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
    0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
    0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
    0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0
  };

  for (int iword = 0; iword < nwords; iword++) {
    const unsigned int word = static_cast<unsigned int>(buf[ iword ]);
    // Feed the four bytes of the word from the most significant one down.
    for (int shift = 3 * CHAR_BIT; shift >= 0; shift -= CHAR_BIT) {
      const unsigned char byte = static_cast<unsigned char>((word >> shift) & 0xffu);
      crc16 = static_cast<unsigned short>(
                CRC16Table0x1021[(crc16 >> (16 - CHAR_BIT)) ^ byte ] ^ (crc16 << CHAR_BIT));
    }
  }

  return crc16;
}
135 
136 
/**
 * Current wall-clock time from gettimeofday(), in seconds with microsecond
 * resolution, shifted down by a fixed offset of 1417570000 s.
 */
double getTimeSec()
{
  struct timeval now;
  gettimeofday(&now, NULL);
  const double seconds_since_epoch = now.tv_sec + now.tv_usec * 1.e-6;
  return seconds_since_epoch - 1417570000.;
}
143 
144 int fillDataContents(int* buf, int nwords_per_fee, unsigned int node_id, int ncpr, int nhslb, int run)
145 {
146  int nwords = NW_SEND_HEADER + NW_SEND_TRAILER +
147  ncpr * (NW_RAW_HEADER +
148  (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
149  + NW_RAW_TRAILER);
150 
151  // Send Header
152  int offset = 0;
153  buf[ offset + 0 ] = nwords;
154  buf[ offset + 1 ] = 6;
155  buf[ offset + 2 ] = (1 << 16) | ncpr;
156  unsigned int exp_run = run << 8;
157  buf[ offset + 3 ] = exp_run;
158  buf[ offset + 5 ] = node_id;
159  offset += NW_SEND_HEADER;
160 
161  for (int k = 0; k < ncpr; k++) {
162  int top_pos = offset;
163  //
164  // RawHeader
165  //
166  int cpr_nwords = NW_RAW_HEADER +
167  (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb
168  + NW_RAW_TRAILER;
169  int finesse_nwords = nwords_per_fee + NW_B2L_HEADER + NW_B2L_TRAILER;
170  unsigned int ctime = CTIME_VAL;
171  unsigned int utime = 0x98765432;
172 
173  buf[ offset + 0 ] = cpr_nwords;
174 #ifdef REDUCED_DATA
175  buf[ offset + 1 ] = 0x7f7f020c;
176 #else
177  buf[ offset + 1 ] = 0x7f7f820c;
178 #endif
179  buf[ offset + 2 ] = exp_run;
180  printf("run_no %d\n", exp_run); fflush(stdout);
181  buf[ offset + 4 ] = ctime;
182  buf[ offset + 5 ] = utime;
183  buf[ offset + 6 ] = node_id + k;
184  buf[ offset + 7 ] = 0x34567890;
185  offset += NW_RAW_HEADER;
186 
187  for (int i = 0; i < nhslb ; i++) {
188 #ifdef REDUCED_DATA
189  buf[ offset + 0 ] = nwords_per_fee + 3;
190  buf[ offset + 1 ] = 0xffaa0000;
191  buf[ offset + 2 ] = ctime;
192 #else
193  buf[ offset + 0 ] = nwords_per_fee + 7;
194  buf[ offset + 1 ] = 0xffaa0000;
195  buf[ offset + 3 ] = ctime;
196  buf[ offset + 4 ] = utime;
197  buf[ offset + 5 ] = exp_run;
198  buf[ offset + 6 ] = ctime;
199 #endif
200  offset += NW_B2L_HEADER;
201 
202  for (int j = offset; j < offset + nwords_per_fee; j++) {
203  buf[ j ] = rand();
204  }
205  offset += nwords_per_fee;
206 
207 #ifdef REDUCED_DATA
208  buf[ offset ] = 0;
209  buf[ offset + 1 ] = 0xff550000;
210 #else
211  buf[ offset ] = ctime;
212  buf[ offset + 1 ] = 0;
213  buf[ offset + 2 ] = 0xff550000;
214 #endif
215 
216  offset += NW_B2L_TRAILER;
217  }
218  buf[ offset ] = 0x0; // error bits
219  buf[ offset + 1 ] = 0x0; // error slots
220  buf[ offset + 2 ] = 0x0; // XOR checksum
221  buf[ offset + 3 ] = 0x7fff0006;
222  offset += NW_RAW_TRAILER;
223  }
224 
225  // Send trailer
226  buf[ offset ] = 0;
227  buf[ offset + 1 ] = 0x7fff0000;
228  offset += NW_SEND_TRAILER;
229  return offset;
230 }
231 
232 
233 
234 inline void addEvent(int* buf, int nwords_per_fee, unsigned int event, int ncpr, int nhslb)
235 //inline void addEvent(int* buf, int nwords, unsigned int event)
236 {
237  int offset = 0;
238  int prev_offset;
239  buf[ offset + 4 ] = event;
240  offset += NW_SEND_HEADER;
241 
242  for (int k = 0; k < ncpr; k++) {
243  int nwords = buf[ offset ];
244  int posback_xorchksum = 2;
245  int pos_xorchksum = offset + nwords - posback_xorchksum;
246  prev_offset = offset;
247  if (buf[ offset + 4 ] != CTIME_VAL) {
248  printf("[FATAL] data-production error 2 0x%.x", buf[ offset + 4 ]);
249  fflush(stdout);
250  exit(1);
251  }
252  // RawHeader
253  buf[ pos_xorchksum ] ^= buf[ offset + 3];
254  buf[ offset + 3] = event;
255  buf[ pos_xorchksum ] ^= buf[ offset + 3];
256 
257  // COPPER header
258  offset += NW_RAW_HEADER;
259  for (int i = 0; i < nhslb ; i++) {
260  if ((buf[ offset + 1 ] & 0xffff0000) != 0xffaa0000) {
261  printf("[FATAL] data-production error 3 : 0x%.x hslb %d cpr %d\n", buf[ offset ], i, k);
262  fflush(stdout);
263  exit(1);
264  }
265  buf[ offset + 1 ] = 0xffaa0000 + (event & 0xffff);
266  buf[ offset + 3 ] = event;
267 
268 #ifdef CRC_ON
269  int* crc_buf = buf + offset + 2; // 1 => size of HSLB B2L header
270  int crc_nwords = nwords_per_fee + 5; // 5 => size of FEE B2L header
271  unsigned short temp_crc16 = CalcCRC16LittleEndian(0xffff, crc_buf, crc_nwords);
272  buf[ offset + NW_B2L_HEADER + nwords_per_fee + 1 ] = ((event & 0x0000ffff) << 16) | temp_crc16;
273 #endif
274 
275 #ifdef REDUCED_DATA
276  offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
277 #else
278  offset += NW_B2L_HEADER + nwords_per_fee + NW_B2L_TRAILER;
279 #endif
280  }
281  offset += NW_RAW_TRAILER;
282  unsigned int xor_chksum = 0;
283  unsigned int xor_chksum2 = 0;
284  }
285 
286 }
287 
288 
289 int sender(int sender_id, int run_no, int nwords_per_fee, int ncpr, int nhslb)
290 {
291  //
292  // data
293  //
294  int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
295  ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
296  NW_RAW_TRAILER);
297  int buff[total_words];
298 
299  //
300  // network connection
301  //
302  int port_to = 30000 + sender_id;
303  int listenfd;
304  struct sockaddr_in servaddr;
305  struct pollfd client[NUM_CLIENTS_PER_THREAD + 1];
306 
307  //
308  // Connect to cprtb01
309  //
310  // string hostname_local;
311  // struct hostent* host;
312  // host = gethostbyname(hostname_local.c_str());
313  // if (host == NULL) {
314  // char err_buf[500];
315  // sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...\n",
316  // hostname_local.c_str(), strerror(errno));
317  // printf("%s\n", err_buf); fflush(stdout);
318  // // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
319  // exit(1);
320  // }
321 
322  //
323  // Bind and listen
324  //
325  int fd_listen;
326  struct sockaddr_in sock_listen;
327  sock_listen.sin_family = AF_INET;
328  // sock_listen.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
329  sock_listen.sin_addr.s_addr = htonl(INADDR_ANY);
330 
331  socklen_t addrlen = sizeof(sock_listen);
332  sock_listen.sin_port = htons(port_to);
333  fd_listen = socket(PF_INET, SOCK_STREAM, 0);
334 
335  int flags = 1;
336  int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)sizeof(flags));
337  if (ret < 0) {
338  perror("Failed to set REUSEADDR");
339  }
340 
341  if (bind(fd_listen, (struct sockaddr*)&sock_listen, sizeof(struct sockaddr)) < 0) {
342  printf("[FATAL] Failed to bind. Maybe other programs have already occupied this port(%d). Exiting...\n",
343  port_to); fflush(stdout);
344  // Check the process occupying the port 30000.
345  FILE* fp;
346  char buf[256];
347  char cmdline[500];
348  sprintf(cmdline, "/usr/sbin/ss -ap | grep %d", port_to);
349  if ((fp = popen(cmdline, "r")) == NULL) {
350  printf("[WARNING] Failed to run %s\n", cmdline);
351  }
352  while (fgets(buf, 256, fp) != NULL) {
353  printf("[ERROR] Failed to bind. output of ss(port %d) : %s\n", port_to, buf); fflush(stdout);
354  }
355  // Error message
356  fclose(fp);
357  char err_buf[500];
358  sprintf(err_buf, "[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
359  strerror(errno), port_to);
360  printf("%s\n", err_buf); fflush(stdout);
361  // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
362  exit(1);
363  }
364 
365  int val1 = 0;
366  setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)sizeof(val1));
367  int backlog = 1;
368  if (listen(fd_listen, backlog) < 0) {
369  char err_buf[500];
370  sprintf(err_buf, "Failed in listen(%s). Exting...", strerror(errno));
371  printf("%s\n", err_buf); fflush(stdout);
372  // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
373  exit(-1);
374  }
375 
376  //
377  // Accept
378  //
379  int fd_accept;
380  struct sockaddr_in sock_accept;
381  // printf("[DEBUG] Accepting... : port %d server %s\n", port_to, hostname_local.c_str());
382  printf("[DEBUG] Accepting... : port %d\n", port_to);
383  fflush(stdout);
384 
385  if ((fd_accept = accept(fd_listen, (struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
386  char err_buf[500];
387  sprintf(err_buf, "[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
388  printf("%s\n", err_buf); fflush(stdout);
389  // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
390  exit(-1);
391  } else {
392  // B2INFO("Done.");
393  printf("[DEBUG] Accepted.\n"); fflush(stdout);
394 
395  // set timepout option
396  struct timeval timeout;
397  timeout.tv_sec = 1;
398  timeout.tv_usec = 0;
399  ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)sizeof(timeout));
400  if (ret < 0) {
401  char err_buf[500] = "[FATAL] Failed to set TIMEOUT. Exiting...";
402  printf("%s\n", err_buf); fflush(stdout);
403  // print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
404  exit(-1);
405  }
406  }
407 
408  if (fd_listen) {
409  close(fd_listen);
410  }
411 
412 
413 
414  printf("Connection(port %d) accepted\n", port_to); fflush(stdout);
415 
416  double init_time = getTimeSec();
417  double prev_time = init_time;
418 
419  unsigned long long int cnt = 0;
420  unsigned long long int prev_cnt = 0;
421  unsigned long long int start_cnt = 3000;
422 
423  for (
424 #ifdef MAX_EVENT
425  int j = 0; j < MAX_EVENT; j++
426 #else
427  ;;
428 #endif
429  ) {
430  // addEvent(buff, total_words, cnt);
431  //addEvent(buff, nwords_per_fee, cnt, ncpr, nhslb);
432  // printf("cnt %d bytes\n", cnt*total_words); fflush(stdout);
433  // sprintf( buff, "event %d dessa", cnt );
434 
435  // for(int i = 0 ; i < total_words ; i++){
436  // printf("%.8x ", buff[ i ]);
437  // if( i % 10 == 9 ) printf("\n");
438  // }
439 
440 
441  int ret = 0;
442  if ((ret = write(fd_accept, buff, total_words * sizeof(int))) <= 0) {
443  printf("[FATAL] Return value %d\n", ret);
444  fflush(stdout);
445  exit(1);
446  }
447 
448  cnt++;
449 
450  if (cnt == start_cnt) init_time = getTimeSec();
451  if (cnt % 1000000 == 1) {
452  if (cnt > start_cnt) {
453  double cur_time = getTimeSec();
454  printf("run %d evt %lld time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
455  run_no,
456  cnt,
457  cur_time - init_time,
458  NUM_CLIENTS_PER_THREAD * (cnt - prev_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - prev_time),
459  (cnt - prev_cnt) / (cur_time - prev_time) / 1000. ,
460  NUM_CLIENTS_PER_THREAD * (cnt - start_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - init_time),
461  (cnt - start_cnt) / (cur_time - init_time) / 1000. , total_words);
462 
463  fflush(stdout);
464  prev_time = cur_time;
465  prev_cnt = cnt;
466  } else {
467  // printf("Eve %lld\n", cnt);fflush(stdout);
468  }
469  }
470  }
471 
472  close(fd_accept);
473  return 0;
474 }
475 
476 int main(int argc, char** argv)
477 {
478 
479  printf("###################################################\n");
480 #ifdef REDUCED_DATA
481  printf("# PCIe40 data after reduction (#define REDUCED_DATA) #\n");
482 #else
483  printf("# PCIe40 data before reduction (//#define REDUCED_DATA) #\n");
484 #endif
485  printf("###################################################\n");
486 
487  if (argc != 6) {
488  printf("Usage : %s <node ID> <run#> <nwords of det. buf per FEE> <# of CPR per COPPER> <# of HSLBs>\n", argv[ 0 ]);
489  exit(1);
490  }
491 
492  //
493  // dummy data
494  //
495  unsigned int node_id = 0;
496  sscanf(argv[1], "0x%x", &node_id);
497 
498  int run_no = atoi(argv[2]);
499  int nwords_per_fee = atoi(argv[3]);
500  int ncpr = atoi(argv[4]);
501  int nhslb = atoi(argv[5]);
502 
503  //
504  // buffer for inter-threads communication
505  //
506  for (int i = 0; i < NUM_CLIENTS; i++) {
507  data_1[i] = new unsigned int[100000];
508  data_2[i] = new unsigned int[100000];
509  }
510 
511  //
512  // Make sender threads
513  //
514  std::thread sender0(sender, 0, run_no, nwords_per_fee, ncpr, nhslb);
515  std::thread sender1(sender, 1, run_no, nwords_per_fee, ncpr, nhslb);
516  std::thread sender2(sender, 2, run_no, nwords_per_fee, ncpr, nhslb);
517  std::thread sender3(sender, 3, run_no, nwords_per_fee, ncpr, nhslb);
518  std::thread sender4(sender, 4, run_no, nwords_per_fee, ncpr, nhslb);
519 
520 
521 #ifdef AIUEO
522  //
523  // dummy data
524  //
525  unsigned int node_id = 0;
526  sscanf(argv[1], "0x%x", &node_id);
527 
528  int run_no = atoi(argv[2]);
529  int nwords_per_fee = atoi(argv[3]);
530  int ncpr = atoi(argv[4]);
531  int nhslb = atoi(argv[5]);
532 
533  int total_words = NW_SEND_HEADER + NW_SEND_TRAILER +
534  ncpr * (NW_RAW_HEADER + (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb +
535  NW_RAW_TRAILER);
536  printf("TET %d %d %d %d %d\n ", NW_SEND_HEADER + NW_SEND_TRAILER, ncpr,
537  NW_RAW_HEADER, (NW_B2L_HEADER + NW_B2L_TRAILER + nwords_per_fee) * nhslb, NW_RAW_TRAILER);
538  int buff[total_words];
539 
540  //
541  // Prepare header
542  //
543  int temp_ret = fillDataContents(buff, nwords_per_fee, node_id, ncpr, nhslb, run_no);
544  if (temp_ret != total_words) {
545  printf("[FATAL] data-production error 1 %d %d\n", total_words, temp_ret);
546  fflush(stdout);
547  exit(1);
548  }
549 
550  double init_time = getTimeSec();
551  double prev_time = init_time;
552 
553  unsigned long long int cnt = 0;
554  unsigned long long int prev_cnt = 0;
555  unsigned long long int start_cnt = 300000;
556 
557  int buffer_id = 0;
558 
559 #ifdef MAX_EVENT
560  for (int j = 0; j < MAX_EVENT; j++)
561 #else
562  for (;;)
563 #endif
564  {
565  // addEvent(buff, total_words, cnt);
566  addEvent(buff, nwords_per_fee, cnt, ncpr, nhslb);
567  // printf("cnt %d bytes\n", cnt*total_words); fflush(stdout);
568  // sprintf( buff, "event %d dessa", cnt );
569 
570  // for(int i = 0 ; i < total_words ; i++){
571  // printf("%.8x ", buff[ i ]);
572  // if( i % 10 == 9 ) printf("\n");
573  // }
574 
575  for (int i = 1 ; i <= NUM_CLIENTS ; i++) {
576  int ret = 0;
577  if (buffer_id == 0) {
578  {
579  std::lock_guard<std::mutex> lock(mtx1_ch[i]);
580  memcpy(data_1[i], buff, total_words * sizeof(unsigned int))
581  }
582  } else {
583  {
584  std::lock_guard<std::mutex> lock(mtx2_ch[i]);
585  memcpy(data_2[i], buff, total_words * sizeof(unsigned int))
586  }
587  }
588  // if ((ret = write(client[i].fd, buff, total_words * sizeof(int))) <= 0) {
589  // printf("[FATAL] Return value %d\n", ret);
590  // fflush(stdout);
591  // exit(1);
592  // }
593  }
594 
595  if (buffer_id == 0) {
596  buffer_id = 1;
597  } else {
598  buffer_id = 0;
599  }
600 
601  cnt++;
602 
603  if (cnt == start_cnt) init_time = getTimeSec();
604  if (cnt % 10000 == 1) {
605  if (cnt > start_cnt) {
606  double cur_time = getTimeSec();
607  printf("run %d evt %lld time %.1lf dataflow %.1lf MB/s rate %.2lf kHz : so far dataflow %.1lf MB/s rate %.2lf kHz size %d\n",
608  run_no,
609  cnt,
610  cur_time - init_time,
611  NUM_CLIENTS * (cnt - prev_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - prev_time),
612  (cnt - prev_cnt) / (cur_time - prev_time) / 1000. ,
613  NUM_CLIENTS * (cnt - start_cnt)*total_words * sizeof(int) / 1000000. / (cur_time - init_time),
614  (cnt - start_cnt) / (cur_time - init_time) / 1000. , total_words);
615 
616  fflush(stdout);
617  prev_time = cur_time;
618  prev_cnt = cnt;
619  } else {
620  // printf("Eve %lld\n", cnt);fflush(stdout);
621  }
622  }
623  }
624 
625 #endif
626 
627  sender0.join();
628  sender1.join();
629  sender2.join();
630  sender3.join();
631  sender4.join();
632 
633  for (int i = 0; i < NUM_CLIENTS; i++) {
634  delete data_1[i];
635  delete data_2[i];
636  }
637 
638 
639 }
640 #else
641 
// Placeholder entry point used when USE_DUMMY_DATA_THREADS is not defined:
// the program does nothing and reports success.
int main()
{
  return 0;
}
646 #endif
// Documentation cross-reference (not part of this file):
//   int main(int argc, char **argv) - "Run all tests."
//   Definition: test_main.cc:75