8#include <daq/rawdata/modules/DAQConsts.h>
9#include <daq/rawdata/modules/Serializer.h>
10#include <daq/rawdata/modules/DeSerializer.h>
12#include <netinet/tcp.h>
51 B2INFO(
"Tx: Constructor done.");
57SerializerModule::~SerializerModule()
63 signal(SIGPIPE, SIG_IGN);
71 char temp_char1[100] =
"/cpr_config";
72 char temp_char2[100] =
"/cpr_status";
73 shmOpen(temp_char1, temp_char2);
81 memset(time_array0, 0,
sizeof(time_array0));
82 memset(time_array1, 0,
sizeof(time_array1));
83 memset(time_array2, 0,
sizeof(time_array2));
86 if (status.isAvailable()) {
87 status.setOutputNBytes(0);
88 status.setOutputCount(0);
97 B2INFO(
"Tx initialized.");
104 B2INFO(
"beginRun called.");
113 B2INFO(
"endRun done.");
119 B2INFO(
"terminate called");
133 return (
int*)mmap(NULL, size_words *
sizeof(
int), PROT_READ | PROT_WRITE, MAP_SHARED, fd, offset);
142 sprintf(err_buf,
"[FATAL] Failed to shm_open (%s). Exiting... : path %s\n",
143 strerror(errno), path_cfg);
144 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
152 sprintf(err_buf,
"[FATAL] Failed to shm_open (%s). Exiting... : path %s\n",
153 strerror(errno), path_sta);
154 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
159 int size = 4 *
sizeof(int);
170 int total_send_nwords =
184 if (total_send_nwords != (rawdblk->
GetBuffer(0))[ 0 ] + 8) {
186 sprintf(err_buf,
"[FATAL] Length error. total length %d rawdblk length %d. Exting...\n",
187 total_send_nwords, (rawdblk->
GetBuffer(0))[ 0 ]);
189 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
212 char err_buf[500] =
"[FATAL] CORRUPTED DATA: No COPPER blocks in RawDataBlock. Exiting...";
213 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
222int SerializerModule::sendByWriteV(
RawDataBlock* rawdblk)
226 fillSendHeaderTrailer(&send_header, &send_trailer, rawdblk);
231 struct iovec iov[ NUM_BUFFER ];
237 iov[0].iov_base = (
char*)send_header.
GetBuffer();
238 iov[0].iov_len =
sizeof(int) * send_header.
GetHdrNwords();
241 iov[1].iov_len =
sizeof(int) * rawcopper_nwords;
243 iov[2].iov_base = (
char*)send_trailer.GetBuffer();
244 iov[2].iov_len =
sizeof(int) * send_trailer.GetTrlNwords();
251 if ((n = writev(
m_socket, iov, NUM_BUFFER)) < 0) {
252 if (errno == EINTR) {
254 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
259 callCheckRunPause(err_str);
264 sprintf(err_buf,
"[WARNING] WRITEVa error.(%s) : sent %d bytes, header %lu bytes body %lu trailer %lu : %s %s %d\n",
265 strerror(errno), n, iov[0].iov_len, iov[1].iov_len, iov[2].iov_len,
266 __FILE__, __PRETTY_FUNCTION__, __LINE__);
270 string err_str =
"RUN_ERROR";
273 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
282 printf(
"[DEBUG] *******BODY**********\n");
283 printf(
"[DEBUG] \n%.8d : ", 0);
284 printData((
int*)(iov[1].iov_base), iov[1].iov_len);
288 int total_send_bytes =
sizeof(int) * send_header.GetTotalNwords();
294 if (n != total_send_bytes) {
295 B2WARNING(
"Serializer: Sent byte(" << n <<
"bytes) is not same as the event size (" << total_send_bytes <<
"bytes). Retryring...");
299 if (n < (
int)(iov[ 0 ].iov_len)) {
300 n +=
Send(
m_socket, (
char*)iov[ 0 ].iov_base + n, iov[ 0 ].iov_len - n);
303 if (n < (
int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len)) {
304 n +=
Send(
m_socket, (
char*)iov[ 1 ].iov_base + (n - iov[ 0 ].iov_len), iov[ 1 ].iov_len - (n - iov[ 0 ].iov_len));
307 if (n < (
int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len + iov[ 2 ].iov_len)) {
308 n +=
Send(
m_socket, (
char*)iov[ 2 ].iov_base + (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len),
309 iov[ 2 ].iov_len - (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len));
313 B2WARNING(
"Resending ends. It takes " << retry_end - retry_start <<
"(s)");
316 return total_send_bytes;
327 if ((ret = send(socket, buf + sent_bytes, size_bytes - sent_bytes, MSG_NOSIGNAL)) < 0) {
328 if (errno == EINTR) {
330 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
333 callCheckRunPause(err_str);
338 sprintf(err_buf,
"[ERROR] Send Error. (%s) : %s %s %d", strerror(errno), __FILE__, __PRETTY_FUNCTION__, __LINE__);
342 string err_str =
"RUN_ERROR";
345 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
351 if (sent_bytes == size_bytes)
break;
356void SerializerModule::Accept()
363 struct hostent* host;
367 sprintf(temp_buf,
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...\n",
369 print_err.PrintError(temp_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
381 struct sockaddr_in sock_listen;
382 sock_listen.sin_family = AF_INET;
383 sock_listen.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
385 socklen_t addrlen =
sizeof(sock_listen);
387 fd_listen = socket(PF_INET, SOCK_STREAM, 0);
390 int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)
sizeof(flags));
392 perror(
"Failed to set REUSEADDR");
395 if (bind(fd_listen, (
struct sockaddr*)&sock_listen,
sizeof(
struct sockaddr)) < 0) {
397 sprintf(temp_char,
"[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
399 print_err.PrintError(temp_char, __FILE__, __PRETTY_FUNCTION__, __LINE__);
404 setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)
sizeof(val1));
406 if (listen(fd_listen, backlog) < 0) {
408 sprintf(err_buf,
"[FATAL] Failed in listen(%s). Exting...", strerror(errno));
409 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
417 struct sockaddr_in sock_accept;
421 B2INFO(
"Accepting...");
422 if ((fd_accept = accept(fd_listen, (
struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
424 sprintf(err_buf,
"[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
425 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
432 struct timeval timeout;
435 ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)
sizeof(timeout));
437 char temp_char[100] =
"[FATAL] Failed to set TIMEOUT. Exiting...";
438 print_err.PrintError(temp_char, __FILE__, __PRETTY_FUNCTION__, __LINE__);
448 if (status.isAvailable()) {
451 status.setOutputPort(ntohs(sock_listen.sin_port));
452 status.setOutputAddress(sock_listen.sin_addr.s_addr);
453 printf(
"%d %x\n", (
int)ntohs(sock_listen.sin_port), (
int)sock_listen.sin_addr.s_addr);
463 gettimeofday(&t, NULL);
464 return (t.tv_sec + t.tv_usec * 1.e-6);
471 array[
event - 50000 ] =
getTimeSec() - m_start_time;
479 unsigned int checksum = 0;
480 for (
int i = 0; i < nwords; i++) {
481 checksum = checksum ^ buf[ i ];
490 for (
int i = 0; i < nwords; i++) {
491 printf(
"%.8x ", buf[ i ]);
492 if (i % 10 == 9) printf(
"\n[DEBUG]");
501void SerializerModule::openRunPauseNshm()
503 char path_shm[100] =
"/cpr_pause_resume";
504 int fd = shm_open(path_shm, O_RDONLY, 0666);
506 printf(
"[DEBUG] %s\n", path_shm);
507 perror(
"[FATAL] Failed to open shm_open");
510 m_ptr = (
int*)mmap(NULL,
sizeof(
int), PROT_READ, MAP_SHARED, fd, 0);
514int SerializerModule::checkRunPause()
518 const RunInfoBuffer& status(DeSerializerModule::getStatus());
519 if (status.getState() == status.PAUSING) {
529void SerializerModule::resumeRun()
547 ret = recv(socket, buffer,
sizeof(buffer), MSG_DONTWAIT);
550 printf(
"EOF %d\n", socket); fflush(stdout);
554 if (errno == EAGAIN) {
555 printf(
"EAGAIN %d\n", socket); fflush(stdout);
559 printf(
"ERROR %d errno %d err %s\n", socket, errno, strerror(errno)); fflush(stdout);
565 printf(
"Flushing data in socket buffer (%d bytes) : sockid = %d\n", ret, socket); fflush(stdout);
570void SerializerModule::callCheckRunPause(
string& err_str)
574 printf(
"###########(Ser) TIMEOUT during send() ###############\n");
578 if (checkRunPause()) {
581 printf(
"###########(Ser) Stop is detected after return from send ###############\n");
585 err_str =
"RUN_PAUSE";
599 if (g_run_pause == 1) {
602 printf(
"###########(Ser) Go back to Deseializer() ###############\n");
607 }
else if (g_run_resuming == 1) {
610 printf(
"###########(Ser) Run resuming...() ###############\n");
632 for (
int j = 0; j < raw_dblkarray.
getEntries(); j++) {
637 B2INFO(
"SerializerPC: Sending the 1st packet...");
641 m_totbytes += sendByWriteV(raw_dblkarray[ j ]);
643 }
catch (
const string& err_str) {
646 if (err_str ==
"RUN_PAUSE" || err_str ==
"RUN_ERROR") {
650 print_err.PrintError((
char*)(err_str.c_str()), __FILE__, __PRETTY_FUNCTION__, __LINE__);
684 if (status.isAvailable()) {
686 status.addOutputCount(raw_dblkarray.
getEntries());
void setDescription(const std::string &description)
Sets the description of the module.
The RawDataBlock class Base class for rawdata handling.
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int CheckTLUID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
virtual int GetNumEvents()
get # of events in m_buffer
virtual int TotalBufNwords()
Get total length of m_buffer.
void shmOpen(char *path_cfg, char *path_sta)
open shared memory
int CheckConnection(int socket)
Check socket status.
unsigned long long m_totbytes
sent data size
int m_port_to
Destination port.
std::string m_hostname_local
Destination Host.
SerializerModule()
Constructor / Destructor.
void initialize() override
Module functions to be called from main process.
unsigned int calcXORChecksum(int *buf, int nwords)
calculate checksum
int * shmGet(int fd, int size_words)
Get shared memory.
void event() override
This method is the core of the module.
int m_start_flag
start flag
int BUF_SIZE_WORD
size of buffer for one event (word)
int m_shmfd_cfg
file descriptor for shm
int * m_cfg_buf
buffer for shared memory
void endRun() override
This method is called if the current run ends.
void terminate() override
This method is called at the end of the event processing.
int n_basf2evt
No. of sent events.
CprErrorMessage print_err
error message program
int m_shmfd_sta
file descriptor for shm
void printData(int *buf, int nwords)
print data contents
void beginRun() override
Module functions to be called from event process.
int m_shmflag
Use shared memory.
void recordTime(int event, double *array)
store time info.
int * m_cfg_sta
buffer for shared memory
int m_compressionLevel
Compression parameter.
int Send(int socket, char *buf, int size_bytes)
send buffer
double getTimeSec()
store time info.
RawHeader_latest tmp_header
which format is used
Accessor to arrays stored in the data store.
int getEntries() const
Get the number of objects in the array.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
unsigned int GetEveNo()
get restart #(8bit)
unsigned int GetNodeID()
get contents of header
unsigned int GetExpRunSubrun()
get contents of header
Abstract base class for different kinds of events.