Belle II Software development
DeSerializer.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/rawdata/modules/DAQConsts.h>
9#include <daq/rawdata/modules/DeSerializer.h>
10
11#include <sys/mman.h>
12
13//#define MAXEVTSIZE 400000000
14
15using namespace std;
16using namespace Belle2;
17
18//-----------------------------------------------------------------
19// Register the Module
20//-----------------------------------------------------------------
21REG_MODULE(DeSerializer);
22
23//-----------------------------------------------------------------
24// Implementation
25//-----------------------------------------------------------------
26
28
30{
31 //Set module properties
32 setDescription("Encode DataStore into RingBuffer");
33
34 addParam("DumpFileName", m_dump_fname, "filename to record data", string(""));
35
36 addParam("EventDataBufferWords", BUF_SIZE_WORD, "DataBuffer words per event", 4800);
37
38 addParam("MaxEventNum", max_nevt, "Maximum event number in one run", -1);
39
40 addParam("MaxTime", max_seconds, "Time(s) to stop, DAQ", -1.);
41
42 addParam("NodeID", m_nodeid, "Node(subsystem) ID", 0);
43
44 addParam("NodeName", m_nodename, "Node(subsystem) name", std::string(""));
45
46 addParam("UseShmFlag", m_shmflag, "Use shared memory to communicate with Runcontroller", 0);
47
48 m_nodeid = m_nodeid << 12; // input value is used as slog ID in subsystemID record
49
50 n_basf2evt = -1;
51
52 m_totbytes = 0;
53
55
56 m_prev_time = 0.;
57
58 monitor_numeve = 10;
59
60 m_exp_no = 0; // will obtain info from parameter
61
62 m_data_type = 0; // will obtain info from parameter
63
64 m_trunc_mask = 0; // will obtain info from parameter
65
66 m_prev_nevt = -1;
67
68 prev_event = -1;
69
70 m_run_no = 0; // will obtain info from data
71
72 m_prev_run_no = -1;
73
74 m_exprunsubrun_no = 0; // will obtain info from data
75
76 m_prev_exprunsubrun_no = 0xFFFFFFFF;
77
78 m_start_flag = 0;
79
80
81}
82
83
84DeSerializerModule::~DeSerializerModule()
85{
86}
87
88
90{
91
92 // allocate buffer
93 for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
94 m_bufary[i] = new int[ BUF_SIZE_WORD ];
95 }
96 m_buffer = new int[ BUF_SIZE_WORD ];
97
98}
99
101{
102 delete[] m_buffer;
103 for (int i = 0 ; i < NUM_EVT_PER_BASF2LOOP_COPPER; i++) {
104 delete[] m_bufary[i];
105 }
106
107 if (m_dump_fname.size() > 0) {
108 fclose(m_fp_dump);
109 }
110
111}
112
113
115//(char* path_cfg, char* path_sta)
116{
117 errno = 0;
118 /*m_shmfd_cfg = shm_open( "/cpr_config2", O_CREAT | O_EXCL | O_RDWR, 0666);
119 if (m_shmfd_cfg < 0) {
120 if (errno != EEXIST) {
121 perror("shm_open1");
122 exit(1);
123 }
124 m_shmfd_cfg = shm_open(path_cfg, O_RDWR, 0666);
125 if (m_shmfd_cfg < 0) {
126 printf( "[DEBUG] %s\n", path_cfg);
127 perror("[ERROR] shm_open2");
128 exit(1);
129 }
130 */
131 //}
132 /*
133 m_shmfd_sta = shm_open( "/cpr_status2", O_CREAT | O_EXCL | O_RDWR, 0666);
134 if (m_shmfd_sta < 0) {
135 if (errno != EEXIST) {
136 perror("shm_open1");
137 exit(1);
138 }
139 m_shmfd_sta = shm_open(path_sta , O_RDWR, 0666);
140 if (m_shmfd_sta < 0) {
141 printf( "[DEBUG] %s\n", path_sta);
142 perror("[ERROR] shm_open2");
143 exit(1);
144 }
145 //}
146 int size = 4 * sizeof(int);
147 ftruncate(m_shmfd_cfg, size);
148 ftruncate(m_shmfd_sta, size);
149 */
150}
151
152int* DeSerializerModule::shmGet(int fd, int size_words)
153{
154 int offset = 0;
155 return (int*)mmap(NULL, size_words * sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED, fd, offset);
156}
157
158
159
160
161unsigned int DeSerializerModule::calcXORChecksum(int* buf, int nwords)
162{
163 unsigned int checksum = 0;
164 for (int i = 0; i < nwords; i++) {
165
166 checksum = checksum ^ buf[ i ];
167 }
168 return checksum;
169}
170
171
172unsigned int DeSerializerModule::calcSimpleChecksum(int* buf, int nwords)
173{
174 unsigned int checksum = 0;
175 for (int i = 0; i < nwords; i++) {
176 checksum = checksum + (unsigned int)buf[ i ];
177 // printf( "[DEBUG] i %.4d 0x%.8x 0x%.8x\n", i, checksum, buf[i]);
178 }
179 return checksum;
180}
181
182
183
185{
186 struct timeval t;
187 gettimeofday(&t, NULL);
188 return (t.tv_sec + t.tv_usec * 1.e-6);
189}
190
191
192void DeSerializerModule::recordTime(int event, double* array)
193{
194 if (event >= 10000 && event < 10500) {
195 array[ event - 10000 ] = getTimeSec() - m_start_time;
196 }
197 return;
198}
199
200
202{
203 if ((m_fp_dump = fopen(m_dump_fname.c_str(), "wb")) == NULL) {
204 char err_buf[500];
205 sprintf(err_buf, "[FATAL] Failed to open file %s. Exiting...\n", m_dump_fname.c_str());
206 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
207 exit(-1);
208 }
209}
210
211void DeSerializerModule::dumpData(char* buf, int size)
212{
213 if (fwrite(buf, size, 1, m_fp_dump) <= 0) {
214 char err_buf[500];
215 sprintf(err_buf, "[FATAL] Failed to write buffer to a file. Exiting...\n");
216 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
217 exit(-1);
218 }
219}
220
221void DeSerializerModule::printData(int* buf, int nwords)
222{
223 printf("[DEBUG]");
224 for (int i = 0; i < nwords; i++) {
225 printf("%.8x ", buf[ i ]);
226 if (i % 10 == 9) printf("\n[DEBUG]");
227 }
228 printf("\n[DEBUG]");
229 printf("\n");
230 return;
231}
232
233void DeSerializerModule::printASCIIData(int* buf, int nwords)
234{
235 char ascii_code[500];
236 sprintf(ascii_code,
237 " ! #$ &'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[ ]^_'abcdefghijklmnopqrstuvwxyz{|}~ ");
238 printf("[DEBUG]");
239 for (int i = 0; i < nwords; i++) {
240 for (int j = 0 ; j < 4; j++) {
241 printf("%c", ascii_code[(buf[ i ] >> j * 8) & 0x7f ]);
242 }
243 if (i % 10 == 9) printf("\n[DEBUG]");
244 }
245 printf("\n[DEBUG]");
246 printf("\n");
247 return;
248}
249
250int* DeSerializerModule::getNewBuffer(int nwords, int* delete_flag)
251{
252
253 int* temp_buf = NULL;
254 // Prepare buffer
255 // printf( "[DEBUG] ############ %d %d %d %d\n", nwords, BUF_SIZE_WORD, m_num_usedbuf, NUM_PREALLOC_BUF );
256 if (nwords > BUF_SIZE_WORD) {
257 *delete_flag = 1;
258 temp_buf = new int[ nwords ];
259 } else {
260 if ((temp_buf = getPreAllocBuf()) == 0x0) {
261 char err_buf[500];
262 sprintf(err_buf, "[FATAL] Null pointer from GetPreALlocBuf(). Exting...\n");
263 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
264 sleep(1234567);
265 exit(1);
266 } else {
267 *delete_flag = 0;
268 }
269 }
270
271 return temp_buf;
272
273}
274
276{
277 int* tempbuf = 0;
278 if (m_num_usedbuf < NUM_PREALLOC_BUF) {
279 tempbuf = m_bufary[ m_num_usedbuf ];
281 } else {
282 char err_buf[500];
283 sprintf(err_buf,
284 "[FATAL] No pre-allocated buffers are left. %d > %d. Not enough buffers are allocated or memory leak or forget to call ClearNumUsedBuf every event loop. Exting...",
285 m_num_usedbuf, NUM_PREALLOC_BUF);
286 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
287 sleep(1234567);
288 exit(1);
289 }
290 return tempbuf;
291}
292
293
294void DeSerializerModule::RateMonitor(unsigned int nevt, int subrun, int run)
295{
296 double current_time = getTimeSec();
297 double total_time = current_time - m_start_time;
298 double interval = current_time - m_prev_time;
299 time_t timer;
300 struct tm* t_st;
301 time(&timer);
302 t_st = localtime(&timer);
303 char timeStr[100];
304 std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%d %H:%M:%S\n", t_st);
305
306 printf("[INFO] run %d sub %d Event %12d Rate %6.2lf[kHz] Recvd Flow %6.2lf[MB/s] RunTime %8.2lf[s] interval %8.4lf[s] %s",
307 run, subrun,
308 nevt, (nevt - m_prev_nevt) / interval / 1.e3,
309 (m_totbytes - m_prev_totbytes) / interval / 1.e6,
310 total_time,
311 interval,
312 timeStr);
313
314 fflush(stdout);
315 m_prev_time = current_time;
316 m_prev_totbytes = m_totbytes;
317 m_prev_nevt = nevt;
318
319}
320
321#ifdef NONSTOP
322void DeSerializerModule::openRunPauseNshm()
323{
324 char path_shm[100] = "/cpr_pause_resume";
325 int fd = shm_open(path_shm, O_RDONLY, 0666);
326 if (fd < 0) {
327 printf("[DEBUG] %s\n", path_shm);
328 perror("[FATAL] failed to open shm");
329 exit(1);
330 }
331 m_ptr = (int*)mmap(NULL, sizeof(int), PROT_READ, MAP_SHARED, fd, 0);
332 return;
333}
334
335int DeSerializerModule::checkRunPause()
336{
337
338 if (m_ptr == NULL) {
339 B2INFO("Shared memory is not assigned.");
340 return 0;
341 }
342#ifdef NONSTOP_SLC
343 if (g_status.getState() == g_status.PAUSING) {
344#else
345 if (*m_ptr) {
346#endif
347 return 1;
348 } else {
349 return 0;
350 }
351}
352
353int DeSerializerModule::checkRunRecovery()
354{
355 if (m_ptr == NULL) {
356 B2INFO("Shared memory is not assigned.");
357 return 0;
358 }
359#ifdef NONSTOP_SLC
360 if (g_status.getState() == g_status.RESUMING) {
361#else
362 if (*m_ptr) {
363#endif
364 return 0;
365 } else {
366 return 1;
367 }
368}
369
370
371void DeSerializerModule::resumeRun()
372{
373
374#ifdef NONSTOP_DEBUG
375 printf("###########(Des) Resume from PAUSE ############### %s %s %d\n", __FILE__, __PRETTY_FUNCTION__, __LINE__);
376 fflush(stdout);
377#endif
378 g_run_error = 0;
379 g_run_resuming = 1;
380 m_start_flag = 0;
381
382 return;
383}
384
385
386void DeSerializerModule::pauseRun()
387{
388 g_run_pause = 1;
389#ifdef NONSTOP_DEBUG
390 printf("###########(Des) Pause the run ###############\n");
391 fflush(stdout);
392#endif
393 return;
394}
395
396
397void DeSerializerModule::waitResume()
398{
399 while (true) {
400 if (checkRunRecovery()) {
401 g_run_pause = 0;
402 break;
403 }
404#ifdef NONSTOP_DEBUG
405 printf("###########(Des) Waiting for RESUME ###############\n");
406 fflush(stdout);
407#endif
408 sleep(1);
409 }
410 return;
411}
412
413
414
415void DeSerializerModule::callCheckRunPause(string& err_str)
416{
417#ifdef NONSTOP_DEBUG
418 printf("\033[34m");
419 printf("###########(Des) TIMEOUT during recv() ###############\n");
420 fflush(stdout);
421 printf("\033[0m");
422#endif
423 if (checkRunPause()) {
424#ifdef NONSTOP_DEBUG
425 printf("\033[31m");
426 printf("###########(Des) Pause is detected during recv(). ###############\n");
427 fflush(stdout);
428 printf("\033[0m");
429#endif
430 g_run_pause = 1;
431 err_str = "RUN_PAUSE";
432
433 throw (err_str);
434 }
435 return;
436}
437
438int DeSerializerModule::CheckConnection(int socket)
439{
440 // Modify Yamagata-san's eb/iseof.cc
441
442
443 int ret;
444 char buffer[100000];
445 int eagain_cnt = 0;
446 int tot_ret = 0;
447 while (true) {
448
449 //
450 // Extract data in the socket buffer of a peer
451 //
452 // ret = recv( socket, buffer, sizeof(buffer), MSG_PEEK|MSG_DONTWAIT );
453 ret = recv(socket, buffer, sizeof(buffer), MSG_DONTWAIT);
454 switch (ret) {
455 case 0: /* EOF */
456 printf("EOF %d\n", socket); fflush(stdout);
457 close(socket);
458 return -1;
459 case -1:
460 if (errno == EAGAIN) {
461 printf("EAGAIN %d cnt %d recvd %d\n", socket, eagain_cnt, tot_ret); fflush(stdout);
462 /* not EOF, no data in queue */
463 if (eagain_cnt > 100) {
464 return 0;
465 }
466 usleep(10000);
467 eagain_cnt++;
468 } else {
469 printf("ERROR %d errno %d err %s\n", socket, errno, strerror(errno)); fflush(stdout);
470 close(socket);
471 return -1;
472 }
473 break;
474 default:
475 tot_ret += ret;
476 printf("Flushing data in socket buffer : sockid = %d %d bytes tot %d bytes\n", socket, ret, tot_ret); fflush(stdout);
477 }
478 }
479}
480
481#endif
482
484{
485}
486
unsigned int calcSimpleChecksum(int *buf, int nwords)
calculate checksum
void shmOpen(char *path_cfg, char *path_sta)
open shared memory
void RateMonitor(unsigned int nevt, int subrun=-1, int run=-1)
monitor rate
unsigned int m_prev_exprunsubrun_no
run no.
Definition: DeSerializer.h:173
void initialize() override
Module functions to be called from main process.
Definition: DeSerializer.cc:89
unsigned int calcXORChecksum(int *buf, int nwords)
calculate checksum
std::string m_nodename
Node name.
Definition: DeSerializer.h:161
virtual int * getPreAllocBuf()
Getbuffer.
int * shmGet(int fd, int size_words)
Get shared memory.
void event() override
Module functions to be called from main process.
int BUF_SIZE_WORD
size of buffer for one event (word)
Definition: DeSerializer.h:83
static RunInfoBuffer g_status
buffer class to communicate with NSM client
Definition: DeSerializer.h:200
void terminate() override
This method is called at the end of the event processing.
int n_basf2evt
No. of sent events.
Definition: DeSerializer.h:86
DeSerializerModule()
Constructor / Destructor.
Definition: DeSerializer.cc:29
CprErrorMessage print_err
wrapper for B2LOG system
Definition: DeSerializer.h:213
virtual void dumpData(char *buf, int size)
dump binary data
virtual void printData(int *buf, int nwords)
dump error data
virtual int * getNewBuffer(int nwords, int *delete_flag)
Getbuffer.
std::string m_dump_fname
dump filename
Definition: DeSerializer.h:92
int m_shmflag
Use shared memory.
Definition: DeSerializer.h:185
int monitor_numeve
buffer for shared memory
Definition: DeSerializer.h:207
int m_prev_nevt
No. of prev sent events.
Definition: DeSerializer.h:89
void recordTime(int event, double *array)
store time info.
int m_compressionLevel
Compression Level.
Definition: DeSerializer.h:80
int * m_bufary[NUM_PREALLOC_BUF]
buffer
Definition: DeSerializer.h:101
unsigned int m_exprunsubrun_no
run no.
Definition: DeSerializer.h:170
virtual void printASCIIData(int *buf, int nwords)
dump error data
virtual void openOutputFile()
Module functions to be called from event process.
double max_seconds
time to stop a run
Definition: DeSerializer.h:77
FILE * m_fp_dump
dump file descripter
Definition: DeSerializer.h:95
int m_nodeid
Node(PC or COPPER) ID.
Definition: DeSerializer.h:158
double getTimeSec()
store time info.
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.
STL namespace.