Belle II Software development
Serializer.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/rawdata/modules/DAQConsts.h>
9#include <daq/rawdata/modules/Serializer.h>
10#include <daq/rawdata/modules/DeSerializer.h>
11
12#include <netinet/tcp.h>
13
14#include <sys/uio.h>
15
16#include <csignal>
17#include <fcntl.h>
18
19using namespace std;
20using namespace Belle2;
21
22//#define DEBUG
23
24//-----------------------------------------------------------------
25// Register the Module
26//-----------------------------------------------------------------
27REG_MODULE(Serializer);
28
29//-----------------------------------------------------------------
30// Implementation
31//-----------------------------------------------------------------
32
33
34
36{
37 //Set module properties
38 setDescription("Encode DataStore into RingBuffer");
39 addParam("DestPort", m_port_to, "Destination port", BASE_PORT_ROPC_COPPER);
40 addParam("LocalHostName", m_hostname_local, "local host", string(""));
41#ifdef DUMMY
42 addParam("EventDataBufferWords", BUF_SIZE_WORD, "DataBuffer words per event", 4800);
43#endif
44 addParam("use Shared Memory", m_shmflag, "m_shmflag", 0);
45
46 m_start_flag = 0;
47 n_basf2evt = -1;
49
50 //Parameter definition
51 B2INFO("Tx: Constructor done.");
52
53}
54
55
56
57SerializerModule::~SerializerModule()
58{
59}
60
62{
63 signal(SIGPIPE, SIG_IGN);
64
65#ifdef DUMMY
66 m_buffer = new int[ BUF_SIZE_WORD ];
67#endif
68
69
70 if (m_shmflag != 0) {
71 char temp_char1[100] = "/cpr_config";
72 char temp_char2[100] = "/cpr_status";
73 shmOpen(temp_char1, temp_char2);
74 // Status format : status_flag
77 m_cfg_sta[ 0 ] = 1; // Status bit is 1 : ready before accept()
78 }
79
80 // Create Message Handler
81 memset(time_array0, 0, sizeof(time_array0));
82 memset(time_array1, 0, sizeof(time_array1));
83 memset(time_array2, 0, sizeof(time_array2));
84
85 RunInfoBuffer& status(DeSerializerModule::getStatus());
86 if (status.isAvailable()) {
87 status.setOutputNBytes(0);
88 status.setOutputCount(0);
89 }
90
91 Accept();
92
93#ifdef NONSTOP
94 openRunPauseNshm();
95#endif
96
97 B2INFO("Tx initialized.");
98}
99
100
101
103{
104 B2INFO("beginRun called.");
105}
106
107
108
109
111{
112 //fill Run data
113 B2INFO("endRun done.");
114}
115
116
118{
119 B2INFO("terminate called");
120}
121
122
123
124//
125// User defined functions
126//
127
128
129
130int* SerializerModule::shmGet(int fd, int size_words)
131{
132 int offset = 0;
133 return (int*)mmap(NULL, size_words * sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED, fd, offset);
134}
135
136void SerializerModule::shmOpen(char* path_cfg, char* path_sta)
137{
138 errno = 0;
139 m_shmfd_cfg = shm_open(path_cfg, O_RDWR, 0666);
140 if (m_shmfd_cfg < 0) {
141 char err_buf[500];
142 sprintf(err_buf, "[FATAL] Failed to shm_open (%s). Exiting... : path %s\n",
143 strerror(errno), path_cfg);
144 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
145 sleep(1234567);
146 exit(1);
147 }
148
149 m_shmfd_sta = shm_open(path_sta, O_RDWR, 0666);
150 if (m_shmfd_sta < 0) {
151 char err_buf[500];
152 sprintf(err_buf, "[FATAL] Failed to shm_open (%s). Exiting... : path %s\n",
153 strerror(errno), path_sta);
154 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
155 sleep(1234567);
156 exit(1);
157 }
158
159 int size = 4 * sizeof(int);
160 ftruncate(m_shmfd_cfg, size);
161 ftruncate(m_shmfd_sta, size);
162}
163
164
165
166void SerializerModule::fillSendHeaderTrailer(SendHeader* hdr, SendTrailer* trl,
167 RawDataBlock* rawdblk)
168{
169
170 int total_send_nwords =
171 hdr->GetHdrNwords() +
172 rawdblk->TotalBufNwords() +
173 // rawhdr.GetNwords() +
174 trl->GetTrlNwords();
175
176 hdr->SetNwords(total_send_nwords);
177 hdr->SetNumEventsinPacket(rawdblk->GetNumEvents());
178 hdr->SetNumNodesinPacket(rawdblk->GetNumNodes());
179
180 //
181 // For bug check
182 //
183 if (rawdblk->GetNumEntries() == 1) {
184 if (total_send_nwords != (rawdblk->GetBuffer(0))[ 0 ] + 8) {
185 char err_buf[500];
186 sprintf(err_buf, "[FATAL] Length error. total length %d rawdblk length %d. Exting...\n",
187 total_send_nwords, (rawdblk->GetBuffer(0))[ 0 ]);
188 printData(rawdblk->GetBuffer(0), rawdblk->TotalBufNwords());
189 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
190 sleep(1234567);
191 exit(-1);
192 }
193 }
194
195
196 for (int i = 0; i < rawdblk->GetNumEntries(); i++) {
197
198 //copy event # from a tonp COPPER block
199 if (!(rawdblk->CheckFTSWID(i)) && !(rawdblk->CheckTLUID(i))) {
200 tmp_header.SetBuffer(rawdblk->GetBuffer(i));
201 hdr->SetEventNumber(tmp_header.GetEveNo());
202 hdr->SetNodeID(tmp_header.GetNodeID());
203 hdr->SetExpRunWord(tmp_header.GetExpRunSubrun());
204 break;
205 }
206
207 //Error if you cannot find any COPPER block
208 if (i == (rawdblk->GetNumEntries() - 1)) {
209 printf("[DEBUG] i= %d : num entries %d : Tot words %d\n", i, rawdblk->GetNumEntries(), rawdblk->TotalBufNwords());
210 printData(rawdblk->GetBuffer(0), rawdblk->TotalBufNwords());
211
212 char err_buf[500] = "[FATAL] CORRUPTED DATA: No COPPER blocks in RawDataBlock. Exiting...";
213 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
214 sleep(1234567);
215 exit(-1);
216 }
217 }
218 return;
219}
220
221
222int SerializerModule::sendByWriteV(RawDataBlock* rawdblk)
223{
224 SendHeader send_header;
225 SendTrailer send_trailer;
226 fillSendHeaderTrailer(&send_header, &send_trailer, rawdblk);
227
228 enum {
229 NUM_BUFFER = 3
230 };
231 struct iovec iov[ NUM_BUFFER ];
232
233 // check Body data size
234 int rawcopper_nwords = rawdblk->TotalBufNwords();
235
236 //Fill iov info.
237 iov[0].iov_base = (char*)send_header.GetBuffer();
238 iov[0].iov_len = sizeof(int) * send_header.GetHdrNwords();
239
240 iov[1].iov_base = (char*)rawdblk->GetWholeBuffer();
241 iov[1].iov_len = sizeof(int) * rawcopper_nwords;
242
243 iov[2].iov_base = (char*)send_trailer.GetBuffer();
244 iov[2].iov_len = sizeof(int) * send_trailer.GetTrlNwords();
245
246
247 // Send Multiple buffers
248 int n = 0;
249
250 while (true) {
251 if ((n = writev(m_socket, iov, NUM_BUFFER)) < 0) {
252 if (errno == EINTR) {
253 continue;
254 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
255
256#ifdef NONSTOP
257 // check run-pause request
258 string err_str;
259 callCheckRunPause(err_str);
260#endif
261 continue;
262 } else {
263 char err_buf[500];
264 sprintf(err_buf, "[WARNING] WRITEVa error.(%s) : sent %d bytes, header %lu bytes body %lu trailer %lu : %s %s %d\n",
265 strerror(errno), n, iov[0].iov_len, iov[1].iov_len, iov[2].iov_len,
266 __FILE__, __PRETTY_FUNCTION__, __LINE__);
267#ifdef NONSTOP
268 g_run_error = 1;
269 B2ERROR(err_buf);
270 string err_str = "RUN_ERROR";
271 throw (err_str); // Go to DeSerializer** and wait for run-resume.
272#else
273 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
274 exit(1);
275#endif
276 }
277 }
278 break;
279 }
280
281#ifdef DEBUG
282 printf("[DEBUG] *******BODY**********\n");
283 printf("[DEBUG] \n%.8d : ", 0);
284 printData((int*)(iov[1].iov_base), iov[1].iov_len);
285#endif
286
287
288 int total_send_bytes = sizeof(int) * send_header.GetTotalNwords();
289
290
291 //
292 // Retry sending
293 //
294 if (n != total_send_bytes) {
295 B2WARNING("Serializer: Sent byte(" << n << "bytes) is not same as the event size (" << total_send_bytes << "bytes). Retryring...");
296 double retry_start = getTimeSec();
297
298 // Send Header
299 if (n < (int)(iov[ 0 ].iov_len)) {
300 n += Send(m_socket, (char*)iov[ 0 ].iov_base + n, iov[ 0 ].iov_len - n);
301 }
302
303 if (n < (int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len)) {
304 n += Send(m_socket, (char*)iov[ 1 ].iov_base + (n - iov[ 0 ].iov_len), iov[ 1 ].iov_len - (n - iov[ 0 ].iov_len));
305 }
306
307 if (n < (int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len + iov[ 2 ].iov_len)) {
308 n += Send(m_socket, (char*)iov[ 2 ].iov_base + (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len),
309 iov[ 2 ].iov_len - (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len));
310 }
311
312 double retry_end = getTimeSec();
313 B2WARNING("Resending ends. It takes " << retry_end - retry_start << "(s)");
314 }
315
316 return total_send_bytes;
317
318}
319
320
321
322int SerializerModule::Send(int socket, char* buf, int size_bytes)
323{
324 int sent_bytes = 0;
325 while (true) {
326 int ret = 0;
327 if ((ret = send(socket, buf + sent_bytes, size_bytes - sent_bytes, MSG_NOSIGNAL)) < 0) {
328 if (errno == EINTR) {
329 continue;
330 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
331#ifdef NONSTOP
332 string err_str;
333 callCheckRunPause(err_str);
334#endif
335 continue;
336 } else {
337 char err_buf[500];
338 sprintf(err_buf, "[ERROR] Send Error. (%s) : %s %s %d", strerror(errno), __FILE__, __PRETTY_FUNCTION__, __LINE__);
339#ifdef NONSTOP
340 g_run_error = 1;
341 B2ERROR(err_buf);
342 string err_str = "RUN_ERROR";
343 throw (err_str);
344#else
345 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
346 exit(1);
347#endif
348 }
349 }
350 sent_bytes += ret;
351 if (sent_bytes == size_bytes) break;
352 }
353 return sent_bytes;
354}
355
356void SerializerModule::Accept()
357{
358
359 //
360 // Connect to cprtb01
361 //
362
363 struct hostent* host;
364 host = gethostbyname(m_hostname_local.c_str());
365 if (host == NULL) {
366 char temp_buf[500];
367 sprintf(temp_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...\n",
368 m_hostname_local.c_str(), strerror(errno));
369 print_err.PrintError(temp_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
370 sleep(1234567);
371 exit(1);
372 }
373
374
375
376
377 //
378 // Bind and listen
379 //
380 int fd_listen;
381 struct sockaddr_in sock_listen;
382 sock_listen.sin_family = AF_INET;
383 sock_listen.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
384
385 socklen_t addrlen = sizeof(sock_listen);
386 sock_listen.sin_port = htons(m_port_to);
387 fd_listen = socket(PF_INET, SOCK_STREAM, 0);
388
389 int flags = 1;
390 int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)sizeof(flags));
391 if (ret < 0) {
392 perror("Failed to set REUSEADDR");
393 }
394
395 if (bind(fd_listen, (struct sockaddr*)&sock_listen, sizeof(struct sockaddr)) < 0) {
396 char temp_char[500];
397 sprintf(temp_char, "[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
398 strerror(errno), m_port_to);
399 print_err.PrintError(temp_char, __FILE__, __PRETTY_FUNCTION__, __LINE__);
400 exit(1);
401 }
402
403 int val1 = 0;
404 setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)sizeof(val1));
405 int backlog = 1;
406 if (listen(fd_listen, backlog) < 0) {
407 char err_buf[500];
408 sprintf(err_buf, "[FATAL] Failed in listen(%s). Exting...", strerror(errno));
409 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
410 exit(-1);
411 }
412
413 //
414 // Accept
415 //
416 int fd_accept;
417 struct sockaddr_in sock_accept;
418 // printf( "[DEBUG] Accepting... : port %d server %s\n", m_port_to, m_hostname_local.c_str());
419 // fflush(stderr);
420 // B2INFO("Accepting... : port " << m_port_to << " server " << m_hostname_local.c_str() );
421 B2INFO("Accepting...");
422 if ((fd_accept = accept(fd_listen, (struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
423 char err_buf[500];
424 sprintf(err_buf, "[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
425 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
426 exit(-1);
427 } else {
428 // B2INFO("Connection is established: port " << htons(sock_accept.sin_port) << " address " << sock_accept.sin_addr.s_addr );
429 B2INFO("Done.");
430
431 // set timepout option
432 struct timeval timeout;
433 timeout.tv_sec = 1;
434 timeout.tv_usec = 0;
435 ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)sizeof(timeout));
436 if (ret < 0) {
437 char temp_char[100] = "[FATAL] Failed to set TIMEOUT. Exiting...";
438 print_err.PrintError(temp_char, __FILE__, __PRETTY_FUNCTION__, __LINE__);
439 exit(-1);
440 }
441 }
442 close(fd_listen);
443
444 // int flag = 1;
445 // ret = setsockopt(fd_accept, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag) );
446 m_socket = fd_accept;
447 RunInfoBuffer& status(DeSerializerModule::getStatus());
448 if (status.isAvailable()) {
449 //status.setOutputPort(ntohs(sock_accept.sin_port));
450 //status.setOutputAddress(sock_accept.sin_addr.s_addr);
451 status.setOutputPort(ntohs(sock_listen.sin_port));
452 status.setOutputAddress(sock_listen.sin_addr.s_addr);
453 printf("%d %x\n", (int)ntohs(sock_listen.sin_port), (int)sock_listen.sin_addr.s_addr);
454 }
455
456 return;
457
458}
459
461{
462 struct timeval t;
463 gettimeofday(&t, NULL);
464 return (t.tv_sec + t.tv_usec * 1.e-6);
465}
466
467
468void SerializerModule::recordTime(int event, double* array)
469{
470 if (event >= 50000 && event < 50500) {
471 array[ event - 50000 ] = getTimeSec() - m_start_time;
472 }
473 return;
474}
475
476
477unsigned int SerializerModule::calcXORChecksum(int* buf, int nwords)
478{
479 unsigned int checksum = 0;
480 for (int i = 0; i < nwords; i++) {
481 checksum = checksum ^ buf[ i ];
482 }
483 return checksum;
484}
485
486
487void SerializerModule::printData(int* buf, int nwords)
488{
489 printf("[DEBUG]");
490 for (int i = 0; i < nwords; i++) {
491 printf("%.8x ", buf[ i ]);
492 if (i % 10 == 9) printf("\n[DEBUG]");
493 }
494 printf("\n[DEBUG]");
495 printf("\n");
496 return;
497}
498
499
500#ifdef NONSTOP
501void SerializerModule::openRunPauseNshm()
502{
503 char path_shm[100] = "/cpr_pause_resume";
504 int fd = shm_open(path_shm, O_RDONLY, 0666);
505 if (fd < 0) {
506 printf("[DEBUG] %s\n", path_shm);
507 perror("[FATAL] Failed to open shm_open");
508 exit(1);
509 }
510 m_ptr = (int*)mmap(NULL, sizeof(int), PROT_READ, MAP_SHARED, fd, 0);
511 return;
512}
513
514int SerializerModule::checkRunPause()
515{
516
517#ifdef NONSTOP_SLC
518 const RunInfoBuffer& status(DeSerializerModule::getStatus());
519 if (status.getState() == status.PAUSING) {
520#else
521 if (*m_ptr) {
522#endif
523 return 1;
524 } else {
525 return 0;
526 }
527}
528
529void SerializerModule::resumeRun()
530{
531 if (CheckConnection(m_socket) < 0) Accept();
532 g_run_resuming = 0; // run_resuming phase is over.
533 return;
534}
535
536
538{
539 // Modify Yamagata-san's eb/iseof.cc
540 int ret;
541 char buffer[1000];
542 while (true) {
543 //
544 // Extract data in the socket buffer of a peer
545 //
546 // ret = recv( socket, buffer, sizeof(buffer), MSG_PEEK|MSG_DONTWAIT );
547 ret = recv(socket, buffer, sizeof(buffer), MSG_DONTWAIT);
548 switch (ret) {
549 case 0: /* EOF */
550 printf("EOF %d\n", socket); fflush(stdout);
551 close(socket);
552 return -1;
553 case -1:
554 if (errno == EAGAIN) {
555 printf("EAGAIN %d\n", socket); fflush(stdout);
556 /* not EOF, no data in queue */
557 return 0;
558 } else {
559 printf("ERROR %d errno %d err %s\n", socket, errno, strerror(errno)); fflush(stdout);
560 close(socket);
561 return -1;
562 }
563 break;
564 default:
565 printf("Flushing data in socket buffer (%d bytes) : sockid = %d\n", ret, socket); fflush(stdout);
566 }
567 }
568}
569
570void SerializerModule::callCheckRunPause(string& err_str)
571{
572#ifdef NONSTOP_DEBUG
573 printf("\033[34m");
574 printf("###########(Ser) TIMEOUT during send() ###############\n");
575 fflush(stdout);
576 printf("\033[0m");
577#endif
578 if (checkRunPause()) {
579#ifdef NONSTOP_DEBUG
580 printf("\033[31m");
581 printf("###########(Ser) Stop is detected after return from send ###############\n");
582 fflush(stdout);
583 printf("\033[0m");
584#endif
585 err_str = "RUN_PAUSE";
586 g_run_pause = 1;
587 throw (err_str);
588 }
589 return;
590}
591
592#endif
593
594
596{
597
598#ifdef NONSTOP
599 if (g_run_pause == 1) {
600#ifdef NONSTOP_DEBUG
601 printf("\033[31m");
602 printf("###########(Ser) Go back to Deseializer() ###############\n");
603 fflush(stdout);
604 printf("\033[0m");
605#endif
606 return; // Nothing to do here
607 } else if (g_run_resuming == 1) {
608#ifdef NONSTOP_DEBUG
609 printf("\033[31m");
610 printf("###########(Ser) Run resuming...() ###############\n");
611 fflush(stdout);
612 printf("\033[0m");
613#endif
614 resumeRun();
615 return;
616 }
617#endif
618
619 if (m_start_flag == 0) {
620 m_start_time = getTimeSec();
621 n_basf2evt = 0;
622 }
623
624#ifdef TIME_MONITOR
625 recordTime(n_basf2evt, time_array0);
626#endif
627
628 // StoreArray<RawCOPPER> rawcprarray;
629 StoreArray<RawDataBlock> raw_dblkarray;
630
631
632 for (int j = 0; j < raw_dblkarray.getEntries(); j++) {
633 //
634 // Send data
635 //
636 if (m_start_flag == 0) {
637 B2INFO("SerializerPC: Sending the 1st packet...");
638 }
639
640 try {
641 m_totbytes += sendByWriteV(raw_dblkarray[ j ]);
642 // } catch (string err_str) {
643 } catch (const string& err_str) {
644
645#ifdef NONSTOP
646 if (err_str == "RUN_PAUSE" || err_str == "RUN_ERROR") {
647 return; // Go to DeSerializer***() to wait for run-resume.
648 }
649#endif
650 print_err.PrintError((char*)(err_str.c_str()), __FILE__, __PRETTY_FUNCTION__, __LINE__);
651 exit(1);
652 }
653 if (m_start_flag == 0) {
654 B2INFO("Done. ");
655 m_start_flag = 1;
656 }
657 }
658
659
660 //
661 // Print current status
662 //
663 if (n_basf2evt % 1000 == 0) {
664 // double cur_time = getTimeSec();
665 // double total_time = cur_time - m_start_time;
666 // double interval = cur_time - m_prev_time;
667 // if (n_basf2evt != 0) {
668 // double multieve = (1. / interval);
669 // if (multieve > 2.) multieve = 2.;
670 // }
671 // time_t timer;
672 // struct tm* t_st;
673 // time(&timer);
674 // t_st = localtime(&timer);
675 // printf( "[DEBUG] Event %d TotSent %.1lf [MB] ElapsedTime %.1lf [s] RcvdRate %.2lf [MB/s] %s",
676 // n_basf2evt, m_totbytes / 1.e6, total_time, (m_totbytes - m_prev_totbytes) / interval / 1.e6, asctime(t_st));
677 // fflush(stderr);
678 // m_prev_time = cur_time;
679 // m_prev_totbytes = m_totbytes;
680 // m_prev_nevt = n_basf2evt;
681 }
682 n_basf2evt++;
683 RunInfoBuffer& status(DeSerializerModule::getStatus());
684 if (status.isAvailable()) {
685 status.setOutputNBytes(m_totbytes);
686 status.addOutputCount(raw_dblkarray.getEntries());
687 }
688
689}
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
The RawDataBlock class Base class for rawdata handling.
Definition: RawDataBlock.h:27
virtual int * GetBuffer(int n)
get nth buffer pointer
Definition: RawDataBlock.h:53
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
Definition: RawDataBlock.h:101
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
Definition: RawDataBlock.h:67
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
Definition: RawDataBlock.h:74
virtual int CheckTLUID(int n)
get FTSW ID to check whether this data block is FTSW data or not
Definition: RawDataBlock.h:108
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
Definition: RawDataBlock.h:60
virtual int GetNumEvents()
get # of events in m_buffer
Definition: RawDataBlock.h:81
virtual int TotalBufNwords()
Get total length of m_buffer.
Definition: RawDataBlock.h:39
void SetBuffer(int *bufin)
set buffer
void SetNumEventsinPacket(int num_events)
set contents of Header
Definition: SendHeader.cc:61
int GetHdrNwords()
get contents of Header
Definition: SendHeader.cc:124
void SetNwords(int total_data_nwords)
initialize Header
Definition: SendHeader.cc:51
int * GetBuffer(void)
Get Header contents.
Definition: SendHeader.cc:32
void shmOpen(char *path_cfg, char *path_sta)
open shared memory
Definition: Serializer.cc:136
int CheckConnection(int socket)
Check socket status.
unsigned long long m_totbytes
sent data size
Definition: Serializer.h:152
int m_port_to
Destination port.
Definition: Serializer.h:118
std::string m_hostname_local
Destination Host.
Definition: Serializer.h:115
SerializerModule()
Constructor / Destructor.
Definition: Serializer.cc:35
void initialize() override
Module functions to be called from main process.
Definition: Serializer.cc:61
unsigned int calcXORChecksum(int *buf, int nwords)
calculate checksum
Definition: Serializer.cc:477
int m_socket
Socket ID.
Definition: Serializer.h:121
int * shmGet(int fd, int size_words)
Get shared memory.
Definition: Serializer.cc:130
void event() override
This method is the core of the module.
Definition: Serializer.cc:595
int m_start_flag
start flag
Definition: Serializer.h:155
int BUF_SIZE_WORD
size of buffer for one event (word)
Definition: Serializer.h:102
int m_shmfd_cfg
file descriptor for shm
Definition: Serializer.h:82
int * m_cfg_buf
buffer for shared memory
Definition: Serializer.h:88
void endRun() override
This method is called if the current run ends.
Definition: Serializer.cc:110
void terminate() override
This method is called at the end of the event processing.
Definition: Serializer.cc:117
int n_basf2evt
No. of sent events.
Definition: Serializer.h:108
CprErrorMessage print_err
error message program
Definition: Serializer.h:149
int m_shmfd_sta
file descriptor for shm
Definition: Serializer.h:85
void printData(int *buf, int nwords)
print data contents
Definition: Serializer.cc:487
void beginRun() override
Module functions to be called from event process.
Definition: Serializer.cc:102
int m_shmflag
Use shared memory.
Definition: Serializer.h:64
void recordTime(int event, double *array)
store time info.
Definition: Serializer.cc:468
int * m_cfg_sta
buffer for shared memory
Definition: Serializer.h:91
int m_compressionLevel
Compression parameter.
Definition: Serializer.h:105
int Send(int socket, char *buf, int size_bytes)
send buffer
Definition: Serializer.cc:322
double getTimeSec()
store time info.
Definition: Serializer.cc:460
RawHeader_latest tmp_header
which format is used
Definition: Serializer.h:176
Accessor to arrays stored in the data store.
Definition: StoreArray.h:113
int getEntries() const
Get the number of objects in the array.
Definition: StoreArray.h:216
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
unsigned int GetEveNo()
get restart #(8bit)
unsigned int GetNodeID()
get contents of header
unsigned int GetExpRunSubrun()
get contents of header
Abstract base class for different kinds of events.
STL namespace.