8#include <daq/rawdata/modules/DAQConsts.h>
9#include <daq/rawdata/modules/DeSerializer.h>
46 addParam(
"UseShmFlag",
m_shmflag,
"Use shared memory to communicate with Runcontroller", 0);
84DeSerializerModule::~DeSerializerModule()
93 for (
int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
103 for (
int i = 0 ; i < NUM_EVT_PER_BASF2LOOP_COPPER; i++) {
155 return (
int*)mmap(NULL, size_words *
sizeof(
int), PROT_READ | PROT_WRITE, MAP_SHARED, fd, offset);
163 unsigned int checksum = 0;
164 for (
int i = 0; i < nwords; i++) {
166 checksum = checksum ^ buf[ i ];
174 unsigned int checksum = 0;
175 for (
int i = 0; i < nwords; i++) {
176 checksum = checksum + (
unsigned int)buf[ i ];
187 gettimeofday(&t, NULL);
188 return (t.tv_sec + t.tv_usec * 1.e-6);
195 array[
event - 10000 ] =
getTimeSec() - m_start_time;
205 sprintf(err_buf,
"[FATAL] Failed to open file %s. Exiting...\n",
m_dump_fname.c_str());
206 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
213 if (fwrite(buf, size, 1,
m_fp_dump) <= 0) {
215 sprintf(err_buf,
"[FATAL] Failed to write buffer to a file. Exiting...\n");
216 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
224 for (
int i = 0; i < nwords; i++) {
225 printf(
"%.8x ", buf[ i ]);
226 if (i % 10 == 9) printf(
"\n[DEBUG]");
235 char ascii_code[500];
237 " ! #$ &'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[ ]^_'abcdefghijklmnopqrstuvwxyz{|}~ ");
239 for (
int i = 0; i < nwords; i++) {
240 for (
int j = 0 ; j < 4; j++) {
241 printf(
"%c", ascii_code[(buf[ i ] >> j * 8) & 0x7f ]);
243 if (i % 10 == 9) printf(
"\n[DEBUG]");
253 int* temp_buf = NULL;
258 temp_buf =
new int[ nwords ];
262 sprintf(err_buf,
"[FATAL] Null pointer from GetPreALlocBuf(). Exting...\n");
263 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
284 "[FATAL] No pre-allocated buffers are left. %d > %d. Not enough buffers are allocated or memory leak or forget to call ClearNumUsedBuf every event loop. Exting...",
286 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
297 double total_time = current_time - m_start_time;
298 double interval = current_time - m_prev_time;
302 t_st = localtime(&timer);
304 std::strftime(timeStr,
sizeof(timeStr),
"%Y-%m-%d %H:%M:%S\n", t_st);
306 printf(
"[INFO] run %d sub %d Event %12d Rate %6.2lf[kHz] Recvd Flow %6.2lf[MB/s] RunTime %8.2lf[s] interval %8.4lf[s] %s",
309 (m_totbytes - m_prev_totbytes) / interval / 1.e6,
315 m_prev_time = current_time;
316 m_prev_totbytes = m_totbytes;
322void DeSerializerModule::openRunPauseNshm()
324 char path_shm[100] =
"/cpr_pause_resume";
325 int fd = shm_open(path_shm, O_RDONLY, 0666);
327 printf(
"[DEBUG] %s\n", path_shm);
328 perror(
"[FATAL] failed to open shm");
331 m_ptr = (
int*)mmap(NULL,
sizeof(
int), PROT_READ, MAP_SHARED, fd, 0);
335int DeSerializerModule::checkRunPause()
339 B2INFO(
"Shared memory is not assigned.");
353int DeSerializerModule::checkRunRecovery()
356 B2INFO(
"Shared memory is not assigned.");
371void DeSerializerModule::resumeRun()
375 printf(
"###########(Des) Resume from PAUSE ############### %s %s %d\n", __FILE__, __PRETTY_FUNCTION__, __LINE__);
386void DeSerializerModule::pauseRun()
390 printf(
"###########(Des) Pause the run ###############\n");
397void DeSerializerModule::waitResume()
400 if (checkRunRecovery()) {
405 printf(
"###########(Des) Waiting for RESUME ###############\n");
415void DeSerializerModule::callCheckRunPause(
string& err_str)
419 printf(
"###########(Des) TIMEOUT during recv() ###############\n");
423 if (checkRunPause()) {
426 printf(
"###########(Des) Pause is detected during recv(). ###############\n");
431 err_str =
"RUN_PAUSE";
438int DeSerializerModule::CheckConnection(
int socket)
453 ret = recv(socket, buffer,
sizeof(buffer), MSG_DONTWAIT);
456 printf(
"EOF %d\n", socket); fflush(stdout);
460 if (errno == EAGAIN) {
461 printf(
"EAGAIN %d cnt %d recvd %d\n", socket, eagain_cnt, tot_ret); fflush(stdout);
463 if (eagain_cnt > 100) {
469 printf(
"ERROR %d errno %d err %s\n", socket, errno, strerror(errno)); fflush(stdout);
476 printf(
"Flushing data in socket buffer : sockid = %d %d bytes tot %d bytes\n", socket, ret, tot_ret); fflush(stdout);
unsigned int calcSimpleChecksum(int *buf, int nwords)
calculate checksum
void shmOpen(char *path_cfg, char *path_sta)
open shared memory
void RateMonitor(unsigned int nevt, int subrun=-1, int run=-1)
monitor rate
unsigned int m_prev_exprunsubrun_no
run no.
void initialize() override
Module functions to be called from main process.
unsigned int calcXORChecksum(int *buf, int nwords)
calculate checksum
std::string m_nodename
Node name.
virtual int * getPreAllocBuf()
Get buffer.
int * shmGet(int fd, int size_words)
Get shared memory.
void event() override
Module functions to be called from main process.
int m_start_flag
start flag
int BUF_SIZE_WORD
size of buffer for one event (word)
static RunInfoBuffer g_status
buffer class to communicate with NSM client
void terminate() override
This method is called at the end of the event processing.
int n_basf2evt
No. of sent events.
int m_trunc_mask
trunc mask
DeSerializerModule()
Constructor / Destructor.
CprErrorMessage print_err
wrapper for B2LOG system
virtual void dumpData(char *buf, int size)
dump binary data
virtual void printData(int *buf, int nwords)
dump error data
virtual int * getNewBuffer(int nwords, int *delete_flag)
Get buffer.
std::string m_dump_fname
dump filename
int m_shmflag
Use shared memory.
int monitor_numeve
buffer for shared memory
int m_prev_nevt
No. of prev sent events.
void recordTime(int event, double *array)
store time info.
int m_compressionLevel
Compression Level.
int * m_bufary[NUM_PREALLOC_BUF]
buffer
unsigned int m_exprunsubrun_no
run no.
virtual void printASCIIData(int *buf, int nwords)
dump error data
virtual void openOutputFile()
Module functions to be called from event process.
double max_seconds
time to stop a run
FILE * m_fp_dump
dump file descriptor
int m_nodeid
Node(PC or COPPER) ID.
double getTimeSec()
get current time in seconds.
void setDescription(const std::string &description)
Sets the description of the module.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.