Belle II Software  release-08-01-10
DeSerializer.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/rawdata/modules/DAQConsts.h>
9 #include <daq/rawdata/modules/DeSerializer.h>
10 
11 #include <sys/mman.h>
12 
13 //#define MAXEVTSIZE 400000000
14 
15 using namespace std;
16 using namespace Belle2;
17 
18 //-----------------------------------------------------------------
19 // Register the Module
20 //-----------------------------------------------------------------
21 REG_MODULE(DeSerializer)
22 
23 //-----------------------------------------------------------------
24 // Implementation
25 //-----------------------------------------------------------------
26 
27 RunInfoBuffer DeSerializerModule::g_status;
28 
29 DeSerializerModule::DeSerializerModule() : Module()
30 {
31  //Set module properties
32  setDescription("Encode DataStore into RingBuffer");
33 
34  addParam("DumpFileName", m_dump_fname, "filename to record data", string(""));
35 
36  addParam("EventDataBufferWords", BUF_SIZE_WORD, "DataBuffer words per event", 4800);
37 
38  addParam("MaxEventNum", max_nevt, "Maximum event number in one run", -1);
39 
40  addParam("MaxTime", max_seconds, "Time(s) to stop, DAQ", -1.);
41 
42  addParam("NodeID", m_nodeid, "Node(subsystem) ID", 0);
43 
44  addParam("NodeName", m_nodename, "Node(subsystem) name", std::string(""));
45 
46  addParam("UseShmFlag", m_shmflag, "Use shared memory to communicate with Runcontroller", 0);
47 
48  m_nodeid = m_nodeid << 12; // input value is used as slog ID in subsystemID record
49 
50  n_basf2evt = -1;
51 
52  m_totbytes = 0;
53 
55 
56  m_prev_time = 0.;
57 
58  monitor_numeve = 10;
59 
60  m_exp_no = 0; // will obtain info from parameter
61 
62  m_data_type = 0; // will obtain info from parameter
63 
64  m_trunc_mask = 0; // will obtain info from parameter
65 
66  m_prev_nevt = -1;
67 
68  prev_event = -1;
69 
70  m_run_no = 0; // will obtain info from data
71 
72  m_prev_run_no = -1;
73 
74  m_exprunsubrun_no = 0; // will obtain info from data
75 
76  m_prev_exprunsubrun_no = 0xFFFFFFFF;
77 
78  m_start_flag = 0;
79 
80 
81 }
82 
83 
84 DeSerializerModule::~DeSerializerModule()
85 {
86 }
87 
88 
90 {
91 
92  // allocate buffer
93  for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
94  m_bufary[i] = new int[ BUF_SIZE_WORD ];
95  }
96  m_buffer = new int[ BUF_SIZE_WORD ];
97 
98 }
99 
101 {
102  delete[] m_buffer;
103  for (int i = 0 ; i < NUM_EVT_PER_BASF2LOOP_COPPER; i++) {
104  delete[] m_bufary[i];
105  }
106 
107  if (m_dump_fname.size() > 0) {
108  fclose(m_fp_dump);
109  }
110 
111 }
112 
113 
114 void DeSerializerModule::shmOpen(char*, char*)
115 //(char* path_cfg, char* path_sta)
116 {
117  errno = 0;
118  /*m_shmfd_cfg = shm_open( "/cpr_config2", O_CREAT | O_EXCL | O_RDWR, 0666);
119  if (m_shmfd_cfg < 0) {
120  if (errno != EEXIST) {
121  perror("shm_open1");
122  exit(1);
123  }
124  m_shmfd_cfg = shm_open(path_cfg, O_RDWR, 0666);
125  if (m_shmfd_cfg < 0) {
126  printf( "[DEBUG] %s\n", path_cfg);
127  perror("[ERROR] shm_open2");
128  exit(1);
129  }
130  */
131  //}
132  /*
133  m_shmfd_sta = shm_open( "/cpr_status2", O_CREAT | O_EXCL | O_RDWR, 0666);
134  if (m_shmfd_sta < 0) {
135  if (errno != EEXIST) {
136  perror("shm_open1");
137  exit(1);
138  }
139  m_shmfd_sta = shm_open(path_sta , O_RDWR, 0666);
140  if (m_shmfd_sta < 0) {
141  printf( "[DEBUG] %s\n", path_sta);
142  perror("[ERROR] shm_open2");
143  exit(1);
144  }
145  //}
146  int size = 4 * sizeof(int);
147  ftruncate(m_shmfd_cfg, size);
148  ftruncate(m_shmfd_sta, size);
149  */
150 }
151 
152 int* DeSerializerModule::shmGet(int fd, int size_words)
153 {
154  int offset = 0;
155  return (int*)mmap(NULL, size_words * sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED, fd, offset);
156 }
157 
158 
159 
160 
161 unsigned int DeSerializerModule::calcXORChecksum(int* buf, int nwords)
162 {
163  unsigned int checksum = 0;
164  for (int i = 0; i < nwords; i++) {
165 
166  checksum = checksum ^ buf[ i ];
167  }
168  return checksum;
169 }
170 
171 
172 unsigned int DeSerializerModule::calcSimpleChecksum(int* buf, int nwords)
173 {
174  unsigned int checksum = 0;
175  for (int i = 0; i < nwords; i++) {
176  checksum = checksum + (unsigned int)buf[ i ];
177  // printf( "[DEBUG] i %.4d 0x%.8x 0x%.8x\n", i, checksum, buf[i]);
178  }
179  return checksum;
180 }
181 
182 
183 
185 {
186  struct timeval t;
187  gettimeofday(&t, NULL);
188  return (t.tv_sec + t.tv_usec * 1.e-6);
189 }
190 
191 
192 void DeSerializerModule::recordTime(int event, double* array)
193 {
194  if (event >= 10000 && event < 10500) {
195  array[ event - 10000 ] = getTimeSec() - m_start_time;
196  }
197  return;
198 }
199 
200 
202 {
203  if ((m_fp_dump = fopen(m_dump_fname.c_str(), "wb")) == NULL) {
204  char err_buf[500];
205  sprintf(err_buf, "[FATAL] Failed to open file %s. Exiting...\n", m_dump_fname.c_str());
206  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
207  exit(-1);
208  }
209 }
210 
211 void DeSerializerModule::dumpData(char* buf, int size)
212 {
213  if (fwrite(buf, size, 1, m_fp_dump) <= 0) {
214  char err_buf[500];
215  sprintf(err_buf, "[FATAL] Failed to write buffer to a file. Exiting...\n");
216  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
217  exit(-1);
218  }
219 }
220 
221 void DeSerializerModule::printData(int* buf, int nwords)
222 {
223  printf("[DEBUG]");
224  for (int i = 0; i < nwords; i++) {
225  printf("%.8x ", buf[ i ]);
226  if (i % 10 == 9) printf("\n[DEBUG]");
227  }
228  printf("\n[DEBUG]");
229  printf("\n");
230  return;
231 }
232 
233 void DeSerializerModule::printASCIIData(int* buf, int nwords)
234 {
235  char ascii_code[500];
236  sprintf(ascii_code,
237  " ! #$ &'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[ ]^_'abcdefghijklmnopqrstuvwxyz{|}~ ");
238  printf("[DEBUG]");
239  for (int i = 0; i < nwords; i++) {
240  for (int j = 0 ; j < 4; j++) {
241  printf("%c", ascii_code[(buf[ i ] >> j * 8) & 0x7f ]);
242  }
243  if (i % 10 == 9) printf("\n[DEBUG]");
244  }
245  printf("\n[DEBUG]");
246  printf("\n");
247  return;
248 }
249 
250 int* DeSerializerModule::getNewBuffer(int nwords, int* delete_flag)
251 {
252 
253  int* temp_buf = NULL;
254  // Prepare buffer
255  // printf( "[DEBUG] ############ %d %d %d %d\n", nwords, BUF_SIZE_WORD, m_num_usedbuf, NUM_PREALLOC_BUF );
256  if (nwords > BUF_SIZE_WORD) {
257  *delete_flag = 1;
258  temp_buf = new int[ nwords ];
259  } else {
260  if ((temp_buf = getPreAllocBuf()) == 0x0) {
261  char err_buf[500];
262  sprintf(err_buf, "[FATAL] Null pointer from GetPreALlocBuf(). Exting...\n");
263  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
264  sleep(1234567);
265  exit(1);
266  } else {
267  *delete_flag = 0;
268  }
269  }
270 
271  return temp_buf;
272 
273 }
274 
276 {
277  int* tempbuf = 0;
278  if (m_num_usedbuf < NUM_PREALLOC_BUF) {
279  tempbuf = m_bufary[ m_num_usedbuf ];
280  m_num_usedbuf++;
281  } else {
282  char err_buf[500];
283  sprintf(err_buf,
284  "[FATAL] No pre-allocated buffers are left. %d > %d. Not enough buffers are allocated or memory leak or forget to call ClearNumUsedBuf every event loop. Exting...",
285  m_num_usedbuf, NUM_PREALLOC_BUF);
286  print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
287  sleep(1234567);
288  exit(1);
289  }
290  return tempbuf;
291 }
292 
293 
294 void DeSerializerModule::RateMonitor(unsigned int nevt, int subrun, int run)
295 {
296  double current_time = getTimeSec();
297  double total_time = current_time - m_start_time;
298  double interval = current_time - m_prev_time;
299  time_t timer;
300  struct tm* t_st;
301  time(&timer);
302  t_st = localtime(&timer);
303  printf("[INFO] run %d sub %d Event %12d Rate %6.2lf[kHz] Recvd Flow %6.2lf[MB/s] RunTime %8.2lf[s] interval %8.4lf[s] %s",
304  run, subrun,
305  nevt, (nevt - m_prev_nevt) / interval / 1.e3,
306  (m_totbytes - m_prev_totbytes) / interval / 1.e6,
307  total_time,
308  interval,
309  asctime(t_st));
310 
311  fflush(stdout);
312  m_prev_time = current_time;
313  m_prev_totbytes = m_totbytes;
314  m_prev_nevt = nevt;
315 
316 }
317 
318 #ifdef NONSTOP
319 void DeSerializerModule::openRunPauseNshm()
320 {
321  char path_shm[100] = "/cpr_pause_resume";
322  int fd = shm_open(path_shm, O_RDONLY, 0666);
323  if (fd < 0) {
324  printf("[DEBUG] %s\n", path_shm);
325  perror("[FATAL] failed to open shm");
326  exit(1);
327  }
328  m_ptr = (int*)mmap(NULL, sizeof(int), PROT_READ, MAP_SHARED, fd, 0);
329  return;
330 }
331 
332 int DeSerializerModule::checkRunPause()
333 {
334 
335  if (m_ptr == NULL) {
336  B2INFO("Shared memory is not assigned.");
337  return 0;
338  }
339 #ifdef NONSTOP_SLC
340  if (g_status.getState() == g_status.PAUSING) {
341 #else
342  if (*m_ptr) {
343 #endif
344  return 1;
345  } else {
346  return 0;
347  }
348 }
349 
350 int DeSerializerModule::checkRunRecovery()
351 {
352  if (m_ptr == NULL) {
353  B2INFO("Shared memory is not assigned.");
354  return 0;
355  }
356 #ifdef NONSTOP_SLC
357  if (g_status.getState() == g_status.RESUMING) {
358 #else
359  if (*m_ptr) {
360 #endif
361  return 0;
362  } else {
363  return 1;
364  }
365 }
366 
367 
368 void DeSerializerModule::resumeRun()
369 {
370 
371 #ifdef NONSTOP_DEBUG
372  printf("###########(Des) Resume from PAUSE ############### %s %s %d\n", __FILE__, __PRETTY_FUNCTION__, __LINE__);
373  fflush(stdout);
374 #endif
375  g_run_error = 0;
376  g_run_resuming = 1;
377  m_start_flag = 0;
378 
379  return;
380 }
381 
382 
383 void DeSerializerModule::pauseRun()
384 {
385  g_run_pause = 1;
386 #ifdef NONSTOP_DEBUG
387  printf("###########(Des) Pause the run ###############\n");
388  fflush(stdout);
389 #endif
390  return;
391 }
392 
393 
394 void DeSerializerModule::waitResume()
395 {
396  while (true) {
397  if (checkRunRecovery()) {
398  g_run_pause = 0;
399  break;
400  }
401 #ifdef NONSTOP_DEBUG
402  printf("###########(Des) Waiting for RESUME ###############\n");
403  fflush(stdout);
404 #endif
405  sleep(1);
406  }
407  return;
408 }
409 
410 
411 
412 void DeSerializerModule::callCheckRunPause(string& err_str)
413 {
414 #ifdef NONSTOP_DEBUG
415  printf("\033[34m");
416  printf("###########(Des) TIMEOUT during recv() ###############\n");
417  fflush(stdout);
418  printf("\033[0m");
419 #endif
420  if (checkRunPause()) {
421 #ifdef NONSTOP_DEBUG
422  printf("\033[31m");
423  printf("###########(Des) Pause is detected during recv(). ###############\n");
424  fflush(stdout);
425  printf("\033[0m");
426 #endif
427  g_run_pause = 1;
428  err_str = "RUN_PAUSE";
429 
430  throw (err_str);
431  }
432  return;
433 }
434 
435 int DeSerializerModule::CheckConnection(int socket)
436 {
437  // Modify Yamagata-san's eb/iseof.cc
438 
439 
440  int ret;
441  char buffer[100000];
442  int eagain_cnt = 0;
443  int tot_ret = 0;
444  while (true) {
445 
446  //
447  // Extract data in the socket buffer of a peer
448  //
449  // ret = recv( socket, buffer, sizeof(buffer), MSG_PEEK|MSG_DONTWAIT );
450  ret = recv(socket, buffer, sizeof(buffer), MSG_DONTWAIT);
451  switch (ret) {
452  case 0: /* EOF */
453  printf("EOF %d\n", socket); fflush(stdout);
454  close(socket);
455  return -1;
456  case -1:
457  if (errno == EAGAIN) {
458  printf("EAGAIN %d cnt %d recvd %d\n", socket, eagain_cnt, tot_ret); fflush(stdout);
459  /* not EOF, no data in queue */
460  if (eagain_cnt > 100) {
461  return 0;
462  }
463  usleep(10000);
464  eagain_cnt++;
465  } else {
466  printf("ERROR %d errno %d err %s\n", socket, errno, strerror(errno)); fflush(stdout);
467  close(socket);
468  return -1;
469  }
470  break;
471  default:
472  tot_ret += ret;
473  printf("Flushing data in socket buffer : sockid = %d %d bytes tot %d bytes\n", socket, ret, tot_ret); fflush(stdout);
474  }
475  }
476 }
477 
478 #endif
479 
481 {
482 }
483 
unsigned int calcSimpleChecksum(int *buf, int nwords)
calculate checksum
void shmOpen(char *path_cfg, char *path_sta)
open shared memory
void RateMonitor(unsigned int nevt, int subrun=-1, int run=-1)
monitor rate
unsigned int m_prev_exprunsubrun_no
run no.
Definition: DeSerializer.h:173
void initialize() override
Module functions to be called from main process.
Definition: DeSerializer.cc:89
unsigned int calcXORChecksum(int *buf, int nwords)
calculate checksum
std::string m_nodename
Node name.
Definition: DeSerializer.h:161
virtual int * getPreAllocBuf()
Getbuffer.
int * shmGet(int fd, int size_words)
Get shared memory.
void event() override
Module functions to be called from main process.
int BUF_SIZE_WORD
size of buffer for one event (word)
Definition: DeSerializer.h:83
static RunInfoBuffer g_status
buffer class to communicate with NSM client
Definition: DeSerializer.h:200
void terminate() override
This method is called at the end of the event processing.
int n_basf2evt
No. of sent events.
Definition: DeSerializer.h:86
CprErrorMessage print_err
wrapper for B2LOG system
Definition: DeSerializer.h:213
virtual void dumpData(char *buf, int size)
dump binary data
virtual void printData(int *buf, int nwords)
dump error data
virtual int * getNewBuffer(int nwords, int *delete_flag)
Getbuffer.
std::string m_dump_fname
dump filename
Definition: DeSerializer.h:92
int m_shmflag
Use shared memory.
Definition: DeSerializer.h:185
int monitor_numeve
buffer for shared memory
Definition: DeSerializer.h:207
int m_prev_nevt
No. of prev sent events.
Definition: DeSerializer.h:89
void recordTime(int event, double *array)
store time info.
int m_compressionLevel
Compression Level.
Definition: DeSerializer.h:80
int * m_bufary[NUM_PREALLOC_BUF]
buffer
Definition: DeSerializer.h:101
unsigned int m_exprunsubrun_no
run no.
Definition: DeSerializer.h:170
virtual void printASCIIData(int *buf, int nwords)
dump error data
virtual void openOutputFile()
Module functions to be called from event process.
double max_seconds
time to stop a run
Definition: DeSerializer.h:77
FILE * m_fp_dump
dump file descripter
Definition: DeSerializer.h:95
int m_nodeid
Node(PC or COPPER) ID.
Definition: DeSerializer.h:158
double getTimeSec()
store time info.
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.