8 #include <daq/rawdata/modules/DAQConsts.h>
9 #include <daq/rawdata/modules/Serializer.h>
10 #include <daq/rawdata/modules/DeSerializer.h>
12 #include <netinet/tcp.h>
// Constructor body (the signature line is not part of this excerpt).
// Sets the module description and registers the steering parameters.
38 setDescription(
"Encode DataStore into RingBuffer");
// Each addParam call binds a parameter name to a member variable with a help text;
// the last argument is presumably the default value — TODO confirm against addParam's declaration.
39 addParam(
"DestPort", m_port_to,
"Destination port", BASE_PORT_ROPC_COPPER);
40 addParam(
"LocalHostName", m_hostname_local,
"local host",
string(
""));
// BUF_SIZE_WORD is later used as the element count of the int buffer allocated in initialize().
42 addParam(
"EventDataBufferWords", BUF_SIZE_WORD,
"DataBuffer words per event", 4800);
// NOTE(review): the help text of this parameter is just the variable name ("m_shmflag").
44 addParam(
"use Shared Memory", m_shmflag,
"m_shmflag", 0);
48 m_compressionLevel = 0;
51 B2INFO(
"Tx: Constructor done.");
// Destructor; its body is not part of this excerpt.
57 SerializerModule::~SerializerModule()
// initialize(): one-time setup. The statements visible here ignore SIGPIPE, allocate the
// int buffer, open and map the two shared-memory objects, zero the timing arrays and
// reset the output counters of the status object.
61 void SerializerModule::initialize()
// With SIGPIPE ignored, writing to a closed peer is reported through errno instead of a signal.
63 signal(SIGPIPE, SIG_IGN);
// NOTE(review): no matching delete[] for m_buffer is visible in this excerpt — confirm it is released.
66 m_buffer =
new int[ BUF_SIZE_WORD ];
// Names of the shared-memory objects handed to shmOpen().
71 char temp_char1[100] =
"/cpr_config";
72 char temp_char2[100] =
"/cpr_status";
73 shmOpen(temp_char1, temp_char2);
// Map 4 ints from each object; shmOpen() sizes both objects to 4 * sizeof(int).
75 m_cfg_buf = shmGet(m_shmfd_cfg, 4);
76 m_cfg_sta = shmGet(m_shmfd_sta, 4);
// Clear the three timing arrays.
81 memset(time_array0, 0,
sizeof(time_array0));
82 memset(time_array1, 0,
sizeof(time_array1));
83 memset(time_array2, 0,
sizeof(time_array2));
// Reset the output statistics only when the status object reports itself available.
86 if (status.isAvailable()) {
87 status.setOutputNBytes(0);
88 status.setOutputCount(0);
97 B2INFO(
"Tx initialized.");
// beginRun(): in this excerpt it only logs that it was called.
102 void SerializerModule::beginRun()
104 B2INFO(
"beginRun called.");
// endRun(): in this excerpt it only logs completion.
110 void SerializerModule::endRun()
113 B2INFO(
"endRun done.");
// terminate(): in this excerpt it only logs that it was called.
117 void SerializerModule::terminate()
119 B2INFO(
"terminate called");
// shmGet(): maps size_words ints of the object behind fd as a shared read/write mapping
// and returns the mapped address cast to int*.
//   fd          descriptor of the object to map
//   size_words  number of int-sized words to map
// `offset` is declared on a line that is not part of this excerpt.
// NOTE(review): mmap() signals failure by returning MAP_FAILED; no check is visible here,
// so that value would be handed to the caller as a pointer — confirm.
130 int* SerializerModule::shmGet(
int fd,
int size_words)
133 return (
int*)mmap(NULL, size_words *
sizeof(
int), PROT_READ | PROT_WRITE, MAP_SHARED, fd, offset);
// shmOpen(): opens the two shared-memory objects read/write, storing the descriptors in
// m_shmfd_cfg and m_shmfd_sta, then sets the size of both objects to 4 ints.
//   path_cfg  name of the object for m_shmfd_cfg
//   path_sta  name of the object for m_shmfd_sta
136 void SerializerModule::shmOpen(
char* path_cfg,
char* path_sta)
139 m_shmfd_cfg = shm_open(path_cfg, O_RDWR, 0666);
// A negative descriptor means shm_open failed: format the reason and report it.
140 if (m_shmfd_cfg < 0) {
142 sprintf(err_buf,
"[FATAL] Failed to shm_open (%s). Exiting... : path %s\n",
143 strerror(errno), path_cfg);
144 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
149 m_shmfd_sta = shm_open(path_sta, O_RDWR, 0666);
// Same handling for the second object.
150 if (m_shmfd_sta < 0) {
152 sprintf(err_buf,
"[FATAL] Failed to shm_open (%s). Exiting... : path %s\n",
153 strerror(errno), path_sta);
154 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// 4 ints per object, matching the 4-word mappings requested through shmGet().
159 int size = 4 *
sizeof(int);
// NOTE(review): the return values of ftruncate() are ignored here.
160 ftruncate(m_shmfd_cfg, size);
161 ftruncate(m_shmfd_sta, size);
// Body fragment of the header/trailer filling routine (presumably fillSendHeaderTrailer,
// which sendByWriteV() calls; its signature is not part of this excerpt).
170 int total_send_nwords =
// Consistency check: the total length must equal the first word of buffer 0 plus 8.
184 if (total_send_nwords != (rawdblk->
GetBuffer(0))[ 0 ] + 8) {
186 sprintf(err_buf,
"[FATAL] Length error. total length %d rawdblk length %d. Exting...\n",
187 total_send_nwords, (rawdblk->
GetBuffer(0))[ 0 ]);
189 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Copy event number, node ID and exp/run word from the i-th block's header into hdr.
200 tmp_header.SetBuffer(rawdblk->
GetBuffer(i));
201 hdr->SetEventNumber(tmp_header.GetEveNo());
202 hdr->SetNodeID(tmp_header.GetNodeID());
203 hdr->SetExpRunWord(tmp_header.GetExpRunSubrun());
// Reported when no COPPER block is found (the condition is not part of this excerpt).
212 char err_buf[500] =
"[FATAL] CORRUPTED DATA: No COPPER blocks in RawDataBlock. Exiting...";
213 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// sendByWriteV(): sends one RawDataBlock over m_socket as header + body + trailer using a
// single writev() call, and completes a short write with Send().
//   rawdblk  block to send
//   returns  sizeof(int) * send_header.GetTotalNwords(), i.e. the full event size in bytes
// On a writev() error other than EINTR/EAGAIN/EWOULDBLOCK a "RUN_ERROR" string is prepared;
// the throw itself is presumably on a line that is not part of this excerpt.
222 int SerializerModule::sendByWriteV(
RawDataBlock* rawdblk)
226 fillSendHeaderTrailer(&send_header, &send_trailer, rawdblk);
231 struct iovec iov[ NUM_BUFFER ];
// iov[0] = send header, iov[1] = payload, iov[2] = send trailer; lengths are in bytes.
237 iov[0].iov_base = (
char*)send_header.
GetBuffer();
238 iov[0].iov_len =
sizeof(int) * send_header.
GetHdrNwords();
241 iov[1].iov_len =
sizeof(int) * rawcopper_nwords;
243 iov[2].iov_base = (
char*)send_trailer.GetBuffer();
244 iov[2].iov_len =
sizeof(int) * send_trailer.GetTrlNwords();
251 if ((n = writev(m_socket, iov, NUM_BUFFER)) < 0) {
252 if (errno == EINTR) {
254 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
259 callCheckRunPause(err_str);
264 sprintf(err_buf,
"[WARNING] WRITEVa error.(%s) : sent %d bytes, header %lu bytes body %lu trailer %lu : %s %s %d\n",
265 strerror(errno), n, iov[0].iov_len, iov[1].iov_len, iov[2].iov_len,
266 __FILE__, __PRETTY_FUNCTION__, __LINE__);
270 string err_str =
"RUN_ERROR";
273 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
282 printf(
"[DEBUG] *******BODY**********\n");
283 printf(
"[DEBUG] \n%.8d : ", 0);
// printData() takes a word count, while iov_len is in bytes: convert so that the dump
// stays inside the payload buffer.
284 printData((
int*)(iov[1].iov_base), (int)(iov[1].iov_len / sizeof(int)));
288 int total_send_bytes =
sizeof(int) * send_header.GetTotalNwords();
// Short write: n is the number of bytes already sent. Walk through the three buffers in
// order and push whatever is still missing from each with Send().
294 if (n != total_send_bytes) {
295 B2WARNING(
"Serializer: Sent byte(" << n <<
"bytes) is not same as the event size (" << total_send_bytes <<
"bytes). Retryring...");
296 double retry_start = getTimeSec();
299 if (n < (
int)(iov[ 0 ].iov_len)) {
300 n += Send(m_socket, (
char*)iov[ 0 ].iov_base + n, iov[ 0 ].iov_len - n);
303 if (n < (
int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len)) {
304 n += Send(m_socket, (
char*)iov[ 1 ].iov_base + (n - iov[ 0 ].iov_len), iov[ 1 ].iov_len - (n - iov[ 0 ].iov_len));
307 if (n < (
int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len + iov[ 2 ].iov_len)) {
308 n += Send(m_socket, (
char*)iov[ 2 ].iov_base + (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len),
309 iov[ 2 ].iov_len - (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len));
312 double retry_end = getTimeSec();
313 B2WARNING(
"Resending ends. It takes " << retry_end - retry_start <<
"(s)");
316 return total_send_bytes;
// Send(): pushes size_bytes bytes from buf through socket with repeated send() calls.
//   socket      connected socket descriptor
//   buf         start of the data
//   size_bytes  number of bytes to send
// The return statement is not part of this excerpt.
322 int SerializerModule::Send(
int socket,
char* buf,
int size_bytes)
// Each attempt continues after the bytes already sent; MSG_NOSIGNAL suppresses SIGPIPE.
327 if ((ret = send(socket, buf + sent_bytes, size_bytes - sent_bytes, MSG_NOSIGNAL)) < 0) {
328 if (errno == EINTR) {
330 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
333 callCheckRunPause(err_str);
338 sprintf(err_buf,
"[ERROR] Send Error. (%s) : %s %s %d", strerror(errno), __FILE__, __PRETTY_FUNCTION__, __LINE__);
342 string err_str =
"RUN_ERROR";
345 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Leave the send loop once everything has been sent.
351 if (sent_bytes == size_bytes)
break;
// Accept(): resolves m_hostname_local, binds a listening TCP socket to that address and
// port m_port_to, accepts one connection, sets a send timeout on it and stores the
// connected descriptor in m_socket. Failures are reported through print_err.PrintError.
356 void SerializerModule::Accept()
363 struct hostent* host;
364 host = gethostbyname(m_hostname_local.c_str());
// NOTE(review): gethostbyname() reports failures through h_errno, not errno, so
// strerror(errno) may print an unrelated reason here — confirm.
367 sprintf(temp_buf,
"[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...\n",
368 m_hostname_local.c_str(), strerror(errno));
369 print_err.PrintError(temp_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Listening address: first resolved address of the local host name, port m_port_to.
381 struct sockaddr_in sock_listen;
382 sock_listen.sin_family = AF_INET;
383 sock_listen.sin_addr.s_addr = *(
unsigned int*)host->h_addr_list[0];
385 socklen_t addrlen =
sizeof(sock_listen);
386 sock_listen.sin_port = htons(m_port_to);
387 fd_listen = socket(PF_INET, SOCK_STREAM, 0);
390 int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)
sizeof(flags));
392 perror(
"Failed to set REUSEADDR");
395 if (bind(fd_listen, (
struct sockaddr*)&sock_listen,
sizeof(
struct sockaddr)) < 0) {
397 sprintf(temp_char,
"[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
398 strerror(errno), m_port_to);
399 print_err.PrintError(temp_char, __FILE__, __PRETTY_FUNCTION__, __LINE__);
404 setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)
sizeof(val1));
406 if (listen(fd_listen, backlog) < 0) {
408 sprintf(err_buf,
"[FATAL] Failed in listen(%s). Exting...", strerror(errno));
409 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
417 struct sockaddr_in sock_accept;
421 B2INFO(
"Accepting...");
// accept() returns -1 on failure and a non-negative descriptor on success, so the error
// test must be "< 0"; comparing with 0 let failures through and rejected descriptor 0.
422 if ((fd_accept = accept(fd_listen, (
struct sockaddr*) & (sock_accept), &addrlen)) < 0) {
424 sprintf(err_buf,
"[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
425 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
// Send timeout on the accepted socket (the timeout values are set on lines not shown here).
432 struct timeval timeout;
435 ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)
sizeof(timeout));
437 char temp_char[100] =
"[FATAL] Failed to set TIMEOUT. Exiting...";
438 print_err.PrintError(temp_char, __FILE__, __PRETTY_FUNCTION__, __LINE__);
446 m_socket = fd_accept;
// Publish the listening port and address to the status object when it is available.
448 if (status.isAvailable()) {
451 status.setOutputPort(ntohs(sock_listen.sin_port));
452 status.setOutputAddress(sock_listen.sin_addr.s_addr);
453 printf(
"%d %x\n", (
int)ntohs(sock_listen.sin_port), (
int)sock_listen.sin_addr.s_addr);
// getTimeSec(): returns the current gettimeofday() time in seconds as a double
// (whole seconds plus microseconds scaled by 1e-6).
// The declaration of `t` is not part of this excerpt.
460 double SerializerModule::getTimeSec()
463 gettimeofday(&t, NULL);
464 return (t.tv_sec + t.tv_usec * 1.e-6);
// recordTime(): for event numbers 50000..50499, stores the time elapsed since
// m_start_time in array[event - 50000]; other event numbers are ignored.
//   event  event counter value
//   array  destination array; indices 0..499 are written, so it needs at least 500 elements
468 void SerializerModule::recordTime(
int event,
double* array)
470 if (event >= 50000 && event < 50500) {
471 array[
event - 50000 ] = getTimeSec() - m_start_time;
// calcXORChecksum(): XORs the first nwords ints of buf together, starting from 0.
//   buf     words to fold
//   nwords  number of words to read from buf
// The return statement is not part of this excerpt; presumably `checksum` is returned.
477 unsigned int SerializerModule::calcXORChecksum(
int* buf,
int nwords)
479 unsigned int checksum = 0;
480 for (
int i = 0; i < nwords; i++) {
481 checksum = checksum ^ buf[ i ];
// printData(): prints nwords ints from buf to stdout in hexadecimal.
//   buf     words to print
//   nwords  number of words (not bytes) to read from buf
487 void SerializerModule::printData(
int* buf,
int nwords)
490 for (
int i = 0; i < nwords; i++) {
// Each word is printed as at least 8 hex digits followed by a space.
491 printf(
"%.8x ", buf[ i ]);
// After every 10th word, start a new line prefixed with "[DEBUG]".
492 if (i % 10 == 9) printf(
"\n[DEBUG]");
// openRunPauseNshm(): opens the shared-memory object "/cpr_pause_resume" read-only and
// maps one int of it, read-only, into m_ptr.
501 void SerializerModule::openRunPauseNshm()
503 char path_shm[100] =
"/cpr_pause_resume";
504 int fd = shm_open(path_shm, O_RDONLY, 0666);
// Diagnostics for a failed shm_open (the failure condition is not part of this excerpt).
506 printf(
"[DEBUG] %s\n", path_shm);
507 perror(
"[FATAL] Failed to open shm_open");
// NOTE(review): no MAP_FAILED check on the mmap() result is visible here — confirm.
510 m_ptr = (
int*)mmap(NULL,
sizeof(
int), PROT_READ, MAP_SHARED, fd, 0);
// checkRunPause(): reads the status object returned by DeSerializerModule::getStatus()
// and tests whether its state is PAUSING. The returned values are not part of this
// excerpt; callCheckRunPause() uses the result as a boolean.
514 int SerializerModule::checkRunPause()
518 const RunInfoBuffer& status(DeSerializerModule::getStatus());
519 if (status.getState() == status.PAUSING) {
// resumeRun(): calls Accept() again when CheckConnection() returns a negative value for
// the current socket.
529 void SerializerModule::resumeRun()
531 if (CheckConnection(m_socket) < 0) Accept();
// CheckConnection(): probes socket with a non-blocking recv() and prints what it found.
//   socket  descriptor to probe
// The branch conditions and return values are not part of this excerpt; resumeRun()
// treats a negative return value as the cue to call Accept() again.
537 int SerializerModule::CheckConnection(
int socket)
// MSG_DONTWAIT makes this single recv() call non-blocking.
547 ret = recv(socket, buffer,
sizeof(buffer), MSG_DONTWAIT);
// Presumably the ret == 0 case (peer closed the connection) — TODO confirm.
550 printf(
"EOF %d\n", socket); fflush(stdout);
// EAGAIN: nothing to read at the moment.
554 if (errno == EAGAIN) {
555 printf(
"EAGAIN %d\n", socket); fflush(stdout);
559 printf(
"ERROR %d errno %d err %s\n", socket, errno, strerror(errno)); fflush(stdout);
// Data was read: ret is the number of bytes taken out of the socket buffer.
565 printf(
"Flushing data in socket buffer (%d bytes) : sockid = %d\n", ret, socket); fflush(stdout);
// callCheckRunPause(): called from the EAGAIN/EWOULDBLOCK branches of sendByWriteV() and
// Send(). Prints a timeout banner and asks checkRunPause() whether a pause was requested.
//   err_str  output: assigned "RUN_PAUSE" below (presumably only inside the
//            checkRunPause() branch; the closing braces are not part of this excerpt)
570 void SerializerModule::callCheckRunPause(
string& err_str)
574 printf(
"###########(Ser) TIMEOUT during send() ###############\n");
578 if (checkRunPause()) {
581 printf(
"###########(Ser) Stop is detected after return from send ###############\n");
585 err_str =
"RUN_PAUSE";
// event(): per-event entry point. Handles the pause/resume flags, starts the timer on the
// first event, sends every entry of raw_dblkarray with sendByWriteV() and updates the
// output statistics of the status object.
595 void SerializerModule::event()
// Run-control flags: a pause or resume in progress is handled before any data is sent.
599 if (g_run_pause == 1) {
602 printf(
"###########(Ser) Go back to Deseializer() ###############\n");
607 }
else if (g_run_resuming == 1) {
610 printf(
"###########(Ser) Run resuming...() ###############\n");
// m_start_flag == 0 marks the first event: take the reference time used by recordTime().
619 if (m_start_flag == 0) {
620 m_start_time = getTimeSec();
625 recordTime(n_basf2evt, time_array0);
// Send each RawDataBlock of this event and accumulate the number of bytes sent.
632 for (
int j = 0; j < raw_dblkarray.
getEntries(); j++) {
636 if (m_start_flag == 0) {
637 B2INFO(
"SerializerPC: Sending the 1st packet...");
641 m_totbytes += sendByWriteV(raw_dblkarray[ j ]);
643 }
catch (
string err_str) {
// NOTE(review): the exception is caught by value, which copies the string; catching by
// const reference would avoid the copy — confirm err_str is not modified in the handler.
// "RUN_PAUSE" and "RUN_ERROR" are the two strings prepared on the send paths.
646 if (err_str ==
"RUN_PAUSE" || err_str ==
"RUN_ERROR") {
650 print_err.PrintError((
char*)(err_str.c_str()), __FILE__, __PRETTY_FUNCTION__, __LINE__);
653 if (m_start_flag == 0) {
// Periodic section executed every 1000 events (its body is not part of this excerpt).
663 if (n_basf2evt % 1000 == 0) {
// Publish the byte total and the number of blocks handled in this call.
684 if (status.isAvailable()) {
685 status.setOutputNBytes(m_totbytes);
686 status.addOutputCount(raw_dblkarray.
getEntries());
The RawDataBlock class: base class for raw-data handling.
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int CheckTLUID(int n)
get TLU ID to check whether this data block is TLU data or not
virtual int GetNumEvents()
get # of events in m_buffer
virtual int TotalBufNwords()
Get total length of m_buffer.
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
A class definition of an input module for Sequential ROOT I/O.
Accessor to arrays stored in the data store.
int getEntries() const
Get the number of objects in the array.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.