Belle II Software development
DesSer Class Referenceabstract

A class definition of an input module for Sequential ROOT I/O. More...

#include <DesSer.h>

Inheritance diagram for DesSer:
DesSerCOPPER DesSerPrePC

Public Member Functions

 DesSer ()
 Constructor / Destructor.
 
void initialize (bool close_listen=true)
 
virtual void DataAcquisition ()=0
 Module functions to be called from event process.
 
void printData (int *buf, int nwords)
 dump error data
 
int * getPreAllocBuf ()
 Getbuffer.
 
int * getNewBuffer (int nwords, int *delete_flag)
 Getbuffer.
 
double getTimeSec ()
 store time info.
 
void recordTime (int event, double *array)
 store time info.
 
unsigned int calcSimpleChecksum (int *buf, int nwords)
 calculate checksum
 
unsigned int calcXORChecksum (int *buf, int nwords)
 calculate checksum
 
void clearNumUsedBuf ()
 
void RateMonitor (unsigned int nevt)
 monitor rate
 
void shmOpen (char *path_cfg, char *path_sta)
 open shared memory
 
int * shmGet (int fd, int size_words)
 Get shared memory.
 
void initializeNode ()
 Module functions to be called from main process.
 
void beginRun ()
 Module functions to be called from event process.
 
void endRun ()
 
void terminate ()
 
int sendByWriteV (RawDataBlockFormat *rawdblk)
 
void Accept (bool close_listen=true)
 
void fillSendHeaderTrailer (SendHeader *hdr, SendTrailer *trl, RawDataBlockFormat *rawdblk)
 
int Send (int socket, char *buf, int size_bytes)
 send buffer
 

Public Attributes

int max_nevt
 
double max_seconds
 time to stop a run
 
int m_compressionLevel
 Compression Level.
 
int n_basf2evt
 No. of sent events.
 
int m_prev_nevt
 No. of prev sent events.
 
std::string m_dump_fname
 dump filename
 
FILE * m_fp_dump
 dump file descripter
 
int * m_buffer
 buffer
 
int * m_bufary [NUM_PREALLOC_BUF]
 buffer
 
timeval m_t0
 
double m_recvd_totbytes
 
double m_recvd_prev_totbytes
 
double m_sent_totbytes
 
double m_sent_prev_totbytes
 
int m_ncycle
 
double cur_time
 for time monitoring
 
double m_start_time
 
double m_prev_time
 
int prev_event
 
int m_num_usedbuf
 
std::string m_nodename
 Node Name for SLC.
 
int m_nodeid
 Node ID for SLC.
 
RunInfoBuffer m_status
 Run info buffer.
 
unsigned int m_exprunsubrun_no
 run no.
 
unsigned int m_prev_exprunsubrun_no
 run no.
 
int m_exp_no
 exp no.
 
int m_data_type
 data type
 
int m_trunc_mask
 trunc mask
 
int m_shmflag
 Use shared memory.
 
int m_shmfd_cfg
 file descripter for shm
 
int m_shmfd_sta
 file descripter for shm
 
int monitor_numeve
 buffer for shared memory
 
int m_start_flag
 start flag
 
CprErrorMessage print_err
 wrapper for B2LOG system
 
PreRawCOPPERFormat_v2 m_pre_rawcpr
 report status to SLC
 
int * m_cfg_buf
 buffer for shared memory
 
int * m_cfg_sta
 buffer for shared memory
 
int m_run_pause
 flag to show that run-controller pauses a run
 
int m_run_error
 flag to show that there is some errors with which DAQ cannot continue.
 

Protected Types

enum  {
  COPPER = 1 ,
  ROPC = 2
}
 

Protected Attributes

int m_num_connections
 check data contents
 
std::vector< std::string > m_hostname_from
 Reciever basf2 Socket.
 
std::vector< int > m_port_from
 port # to connect data sources
 
std::vector< int > m_socket_recv
 
int event_diff
 
unsigned int m_prev_copper_ctr
 
unsigned int m_prev_evenum
 
int m_socket_send
 Reciever Socket.
 
std::string m_hostname_local
 Destination Host.
 
int m_port_to
 Destination port.
 
std::string p_method
 EvtSocket.
 
int p_method_val
 
RawHeader_v2 tmp_header
 which format is used
 

Detailed Description

A class definition of an input module for Sequential ROOT I/O.

Definition at line 41 of file DesSer.h.

Member Enumeration Documentation

◆ anonymous enum

anonymous enum
protected

Definition at line 269 of file DesSer.h.

269 {
270 COPPER = 1,
271 ROPC = 2
272 };

Constructor & Destructor Documentation

◆ DesSer()

DesSer ( )

Constructor / Destructor.

Definition at line 30 of file DesSer.cc.

31{
32
34
35 m_exprunsubrun_no = 0; // will obtain info from data
36
37 m_prev_exprunsubrun_no = 0xFFFFFFFF;
38
39#ifdef NONSTOP
40 m_run_pause = 0;
41
42 m_run_error = 0;
43#endif
44
45 // B2INFO("DeSerializerPrePC: Constructor done.");
46 printf("[DEBUG] DesSer: Constructor done.\n"); fflush(stdout);
47}
unsigned int m_prev_exprunsubrun_no
run no.
Definition: DesSer.h:147
int m_run_pause
flag to show that run-controller pauses a run
Definition: DesSer.h:227
int m_run_error
flag to show that there is some errors with which DAQ cannot continue.
Definition: DesSer.h:230
int m_num_connections
check data contents
Definition: DesSer.h:237
unsigned int m_exprunsubrun_no
run no.
Definition: DesSer.h:144

◆ ~DesSer()

~DesSer ( )
virtual

Definition at line 51 of file DesSer.cc.

52{
53
54}

Member Function Documentation

◆ Accept()

void Accept ( bool  close_listen = true)

Definition at line 364 of file DesSer.cc.

365{
366 //
367 // Connect to cprtb01
368 //
369 struct hostent* host;
370 host = gethostbyname(m_hostname_local.c_str());
371 if (host == NULL) {
372 char temp_buf[500];
373 sprintf(temp_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...\n",
374 m_hostname_local.c_str(), strerror(errno));
375 print_err.PrintError(temp_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
376 exit(1);
377 }
378
379 //
380 // Bind and listen
381 //
382 int fd_listen;
383 struct sockaddr_in sock_listen;
384 sock_listen.sin_family = AF_INET;
385 sock_listen.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
386
387 socklen_t addrlen = sizeof(sock_listen);
388 sock_listen.sin_port = htons(m_port_to);
389 fd_listen = socket(PF_INET, SOCK_STREAM, 0);
390
391 int flags = 1;
392 int ret = setsockopt(fd_listen, SOL_SOCKET, SO_REUSEADDR, &flags, (socklen_t)sizeof(flags));
393 if (ret < 0) {
394 perror("Failed to set REUSEADDR");
395 }
396
397 if (bind(fd_listen, (struct sockaddr*)&sock_listen, sizeof(struct sockaddr)) < 0) {
398 printf("[FATAL] Failed to bind. Maybe other programs have already occupied this port(%d). Exiting...\n",
399 m_port_to); fflush(stdout);
400 // Check the process occupying the port 30000.
401 FILE* fp;
402 char buf[256];
403 char cmdline[500];
404 sprintf(cmdline, "/usr/sbin/ss -ap | grep %d", m_port_to);
405 if ((fp = popen(cmdline, "r")) == NULL) {
406 printf("[WARNING] Failed to run %s\n", cmdline);
407 }
408 while (fgets(buf, 256, fp) != NULL) {
409 printf("[ERROR] Failed to bind. output of ss(port %d) : %s\n", m_port_to, buf); fflush(stdout);
410 }
411 // Error message
412 fclose(fp);
413 char temp_char[500];
414 sprintf(temp_char, "[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
415 strerror(errno), m_port_to);
416 print_err.PrintError(temp_char, __FILE__, __PRETTY_FUNCTION__, __LINE__);
417 exit(1);
418 }
419
420 int val1 = 0;
421 setsockopt(fd_listen, IPPROTO_TCP, TCP_NODELAY, &val1, (socklen_t)sizeof(val1));
422 int backlog = 1;
423 if (listen(fd_listen, backlog) < 0) {
424 char err_buf[500];
425 sprintf(err_buf, "Failed in listen(%s). Exting...", strerror(errno));
426 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
427 exit(-1);
428 }
429
430 //
431 // Accept
432 //
433 int fd_accept;
434 struct sockaddr_in sock_accept;
435 printf("[DEBUG] Accepting... : port %d server %s\n", m_port_to, m_hostname_local.c_str());
436 fflush(stdout);
437
438 if ((fd_accept = accept(fd_listen, (struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
439 char err_buf[500];
440 sprintf(err_buf, "[FATAL] Failed to accept(%s). Exiting...", strerror(errno));
441 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
442 exit(-1);
443 } else {
444 // B2INFO("Done.");
445 printf("[DEBUG] Done.\n"); fflush(stdout);
446
447 // set timepout option
448 struct timeval timeout;
449 timeout.tv_sec = 1;
450 timeout.tv_usec = 0;
451 ret = setsockopt(fd_accept, SOL_SOCKET, SO_SNDTIMEO, &timeout, (socklen_t)sizeof(timeout));
452 if (ret < 0) {
453 char temp_char[100] = "[FATAL] Failed to set TIMEOUT. Exiting...";
454 print_err.PrintError(temp_char, __FILE__, __PRETTY_FUNCTION__, __LINE__);
455 exit(-1);
456 }
457 }
458
459 if (close_listen) {
460 close(fd_listen);
461 }
462
463 // int flag = 1;
464 // ret = setsockopt(fd_accept, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag) );
465 m_socket_send = fd_accept;
466
467 if (m_status.isAvailable()) {
468 m_status.setOutputPort(ntohs(sock_listen.sin_port));
469 m_status.setOutputAddress(sock_listen.sin_addr.s_addr);
470 // B2INFO("Accepted " << (int)ntohs(sock_listen.sin_port) << " " << (int)sock_listen.sin_addr.s_addr);
471 printf("Accepted. port %d address %d\n", (int)ntohs(sock_listen.sin_port), (int)sock_listen.sin_addr.s_addr); fflush(stdout);
472 }
473
474 return;
475
476}
int m_port_to
Destination port.
Definition: DesSer.h:278
RunInfoBuffer m_status
Run info buffer.
Definition: DesSer.h:141
std::string m_hostname_local
Destination Host.
Definition: DesSer.h:275
int m_socket_send
Reciever Socket.
Definition: DesSer.h:267
CprErrorMessage print_err
wrapper for B2LOG system
Definition: DesSer.h:184

◆ calcXORChecksum()

unsigned int calcXORChecksum ( int *  buf,
int  nwords 
)

calculate checksum

Definition at line 495 of file DesSer.cc.

496{
497 unsigned int checksum = 0;
498 for (int i = 0; i < nwords; i++) {
499 checksum = checksum ^ buf[ i ];
500 }
501 return checksum;
502}

◆ clearNumUsedBuf()

void clearNumUsedBuf ( )
inline

Definition at line 121 of file DesSer.h.

122 {
123 m_num_usedbuf = 0;
124 return ;
125 }
int m_num_usedbuf
Definition: DesSer.h:132

◆ DataAcquisition()

virtual void DataAcquisition ( )
pure virtual

Module functions to be called from event process.

Implemented in DesSerCOPPER, and DesSerPrePC.

◆ fillSendHeaderTrailer()

void fillSendHeaderTrailer ( SendHeader hdr,
SendTrailer trl,
RawDataBlockFormat rawdblk 
)

Definition at line 171 of file DesSer.cc.

172{
173
174 int total_send_nwords =
175 hdr->GetHdrNwords() +
176 rawdblk->TotalBufNwords() +
177 // rawhdr.GetNwords() +
178 trl->GetTrlNwords();
179
180 hdr->SetNwords(total_send_nwords);
181 hdr->SetNumEventsinPacket(rawdblk->GetNumEvents());
182 hdr->SetNumNodesinPacket(rawdblk->GetNumNodes());
183
184 //
185 // For bug check
186 //
187 if (rawdblk->GetNumEntries() == 1) {
188 if (total_send_nwords != (rawdblk->GetBuffer(0))[ 0 ] + 8) {
189 char err_buf[500];
190 sprintf(err_buf, "Length error. total length %d rawdblk length %d. Exting...\n",
191 total_send_nwords, (rawdblk->GetBuffer(0))[ 0 ]);
192 printData(rawdblk->GetBuffer(0), rawdblk->TotalBufNwords());
193 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
194 sleep(1234567);
195 exit(-1);
196 }
197 }
198
199
200 for (int i = 0; i < rawdblk->GetNumEntries(); i++) {
201
202 //copy event # from a tonp COPPER block
203 if (!(rawdblk->CheckFTSWID(i)) && !(rawdblk->CheckTLUID(i))) {
204 tmp_header.SetBuffer(rawdblk->GetBuffer(i));
205 hdr->SetEventNumber(tmp_header.GetEveNo());
206 hdr->SetNodeID(tmp_header.GetNodeID());
207 hdr->SetExpRunWord(tmp_header.GetExpRunSubrun());
208 break;
209 }
210
211 //Error if you cannot find any COPPER block
212 if (i == (rawdblk->GetNumEntries() - 1)) {
213 printf("[DEBUG] i= %d : num entries %d : Tot words %d\n", i, rawdblk->GetNumEntries(), rawdblk->TotalBufNwords());
214 printData(rawdblk->GetBuffer(0), rawdblk->TotalBufNwords());
215
216 char err_buf[500] = "[FATAL] CORRUPTED DATA: No COPPER blocks in RawDataBlock. Exiting...";
217 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
218 sleep(1234567);
219 exit(-1);
220 }
221 }
222 return;
223}
RawHeader_v2 tmp_header
which format is used
Definition: DesSer.h:323
void printData(int *buf, int nwords)
dump error data
Definition: DesSer.cc:505
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual int TotalBufNwords()
Get total length of m_buffer.
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual int GetNumEvents()
get # of events in m_buffer
virtual int CheckTLUID(int n)
get FTSW ID to check whether this data block is FTSW data or not
void SetBuffer(int *bufin)
set buffer
Definition: RawHeader_v2.h:47
void SetNumEventsinPacket(int num_events)
set contents of Header
Definition: SendHeader.cc:61
int GetHdrNwords()
get contents of Header
Definition: SendHeader.cc:124
void SetNwords(int total_data_nwords)
initialize Header
Definition: SendHeader.cc:51
unsigned int GetEveNo()
get restart #(8bit)
Definition: RawHeader_v2.h:396
unsigned int GetNodeID()
get contents of header
Definition: RawHeader_v2.h:402
unsigned int GetExpRunSubrun()
get contents of header
Definition: RawHeader_v2.h:389

◆ getNewBuffer()

int * getNewBuffer ( int  nwords,
int *  delete_flag 
)

Getbuffer.

Definition at line 77 of file DesSer.cc.

78{
79
80 int* temp_buf = NULL;
81 // Prepare buffer
82 if (nwords > BUF_SIZE_WORD) {
83 *delete_flag = 1;
84 temp_buf = new int[ nwords ];
85 } else {
86 if ((temp_buf = getPreAllocBuf()) == 0x0) {
87 char err_buf[500];
88 sprintf(err_buf, "Null pointer from GetPreALlocBuf(). Exting...\n");
89 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
90 sleep(1234567);
91 exit(1);
92 } else {
93 *delete_flag = 0;
94 }
95 }
96
97 return temp_buf;
98
99}
int * getPreAllocBuf()
Getbuffer.
Definition: DesSer.cc:57

◆ getPreAllocBuf()

int * getPreAllocBuf ( )

Getbuffer.

Definition at line 57 of file DesSer.cc.

58{
59 int* tempbuf = 0;
60 if (m_num_usedbuf < NUM_PREALLOC_BUF) {
61 tempbuf = m_bufary[ m_num_usedbuf ];
63 } else {
64 char err_buf[500];
65 sprintf(err_buf,
66 "No pre-allocated buffers are left. %d > %d. Not enough buffers are allocated or "
67 "memory leak or forget to call ClearNumUsedBuf every event loop. Exting...",
68 m_num_usedbuf, NUM_PREALLOC_BUF);
69 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
70 sleep(1234567);
71 exit(1);
72 }
73 return tempbuf;
74}
int * m_bufary[NUM_PREALLOC_BUF]
buffer
Definition: DesSer.h:85

◆ getTimeSec()

double getTimeSec ( )

store time info.

Definition at line 478 of file DesSer.cc.

479{
480 struct timeval t;
481 gettimeofday(&t, NULL);
482 return (t.tv_sec + t.tv_usec * 1.e-6);
483}

◆ initialize()

void initialize ( bool  close_listen = true)

Definition at line 101 of file DesSer.cc.

102{
103 printf("[DEBUG] DesSer: initialize() started.\n"); fflush(stdout);
104 signal(SIGPIPE, SIG_IGN);
105
106 //
107 // initialize Rx part from DeSerializer**.cc
108 //
109
110 // allocate buffer
111 for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
112 m_bufary[i] = new int[ BUF_SIZE_WORD ];
113 }
114 m_buffer = new int[ BUF_SIZE_WORD ];
115
116 // initialize buffer
117 for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
118 memset(m_bufary[i], 0, BUF_SIZE_WORD * sizeof(int));
119 }
120
121 // Open message handler
122 clearNumUsedBuf();
123 // Shared memory
124 if (m_shmflag > 0) {
125 if (m_nodename.size() == 0 || m_nodeid < 0) {
126 m_shmflag = 0;
127 } else {
129 m_status.reportReady();
130 }
131 }
132
133 event_diff = 0;
134
135 m_prev_copper_ctr = 0xFFFFFFFF;
136 m_prev_evenum = 0xFFFFFFFF;
137
138
139 //
140 // initialize Rx part from DeSerializer**.cc
141 //
142 m_start_flag = 0;
143 n_basf2evt = -1;
145
146#ifdef DUMMY
147 m_buffer = new int[ BUF_SIZE_WORD ];
148#endif
149 Accept(close_listen);
150#ifdef NONSTOP
151 openRunPauseNshm();
152#endif
153
154
155 // For monitor
156 if (m_status.isAvailable()) {
157 m_status.setOutputNBytes(0);
158 m_status.setOutputCount(0);
159 }
160 // B2INFO("DesSer: initialize() was done.");
161 printf("[DEBUG] DesSer: initialize() was done.\n"); fflush(stdout);
162
163}
std::string m_nodename
Node Name for SLC.
Definition: DesSer.h:135
int m_start_flag
start flag
Definition: DesSer.h:181
int * m_buffer
buffer
Definition: DesSer.h:82
int n_basf2evt
No. of sent events.
Definition: DesSer.h:70
int m_shmflag
Use shared memory.
Definition: DesSer.h:159
int m_compressionLevel
Compression Level.
Definition: DesSer.h:67
int m_nodeid
Node ID for SLC.
Definition: DesSer.h:138

◆ printData()

void printData ( int *  buf,
int  nwords 
)

dump error data

Definition at line 505 of file DesSer.cc.

506{
507 printf("[DEBUG]");
508 for (int i = 0; i < nwords; i++) {
509 printf("%.8x ", buf[ i ]);
510 if (i % 10 == 9) printf("\n[DEBUG]");
511 }
512 printf("\n[DEBUG]");
513 printf("\n");
514 return;
515}

◆ recordTime()

void recordTime ( int  event,
double *  array 
)

store time info.

Definition at line 486 of file DesSer.cc.

487{
488 if (event >= 50000 && event < 50500) {
489 array[ event - 50000 ] = getTimeSec() - m_start_time;
490 }
491 return;
492}
double getTimeSec()
store time info.
Definition: DesSer.cc:478

◆ Send()

int Send ( int  socket,
char *  buf,
int  size_bytes 
)

send buffer

Definition at line 328 of file DesSer.cc.

329{
330 int sent_bytes = 0;
331 while (true) {
332 int ret = 0;
333 if ((ret = send(socket,
334 buf + sent_bytes, size_bytes - sent_bytes, MSG_NOSIGNAL)) < 0) {
335 if (errno == EINTR) {
336 continue;
337 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
338#ifdef NONSTOP
339 string err_str;
340 callCheckRunPause(err_str);
341#endif
342 continue;
343 } else {
344 char err_buf[500];
345 sprintf(err_buf, "[WARNING] SEND ERROR.(%s)", strerror(errno));
346#ifdef NONSTOP
347 m_run_error = 1;
348 // B2ERROR(err_buf);
349 printf("%s\n", err_buf); fflush(stdout);
350 string err_str = "RUN_ERROR";
351 throw (err_str); // To exit this module, go to DeSerializer** and wait for run-resume.
352#else
353 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
354 exit(1);
355#endif
356 }
357 }
358 sent_bytes += ret;
359 if (sent_bytes == size_bytes) break;
360 }
361 return sent_bytes;
362}

◆ sendByWriteV()

int sendByWriteV ( RawDataBlockFormat rawdblk)

Definition at line 227 of file DesSer.cc.

228{
229 SendHeader send_header;
230 SendTrailer send_trailer;
231 fillSendHeaderTrailer(&send_header, &send_trailer, rawdblk);
232
233 enum {
234 NUM_BUFFER = 3
235 };
236 struct iovec iov[ NUM_BUFFER ];
237
238 // check Body data size
239 int rawcopper_nwords = rawdblk->TotalBufNwords();
240
241 //Fill iov info.
242 iov[0].iov_base = (char*)send_header.GetBuffer();
243 iov[0].iov_len = sizeof(int) * send_header.GetHdrNwords();
244
245 iov[1].iov_base = (char*)rawdblk->GetWholeBuffer();
246 iov[1].iov_len = sizeof(int) * rawcopper_nwords;
247
248 iov[2].iov_base = (char*)send_trailer.GetBuffer();
249 iov[2].iov_len = sizeof(int) * send_trailer.GetTrlNwords();
250
251 // Send Multiple buffers
252 int n = 0;
253 while (true) {
254 if ((n = writev(m_socket_send, iov, NUM_BUFFER)) < 0) {
255 if (errno == EINTR) {
256 continue;
257 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
258#ifdef NONSTOP
259 string err_str;
260 callCheckRunPause(err_str);
261#endif
262 continue;
263 } else {
264 char err_buf[500];
265 sprintf(err_buf, "[WARNING] WRITEV error.(%s) : sent %d bytes, header %lu bytes body %lu trailer %lu\n",
266 strerror(errno), n, iov[0].iov_len, iov[1].iov_len, iov[2].iov_len);
267#ifdef NONSTOP
268 m_run_error = 1;
269 // B2ERROR(err_buf);
270 printf("%s\n", err_buf); fflush(stdout);
271 string err_str = "RUN_ERROR";
272 throw (err_str); // To exit this module, go to DeSerializer** and wait for run-resume.
273#else
274 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
275 exit(1);
276#endif
277 }
278 }
279 break;
280 }
281
282#ifdef DEBUG
283 printf("[DEBUG] *******BODY**********\n");
284 printf("[DEBUG] \n%.8d : ", 0);
285 printData((int*)(iov[1].iov_base), iov[1].iov_len);
286#endif
287
288
289 int total_send_bytes = sizeof(int) * send_header.GetTotalNwords();
290
291
292 //
293 // Retry sending
294 //
295 if (n != total_send_bytes) {
296 // B2WARNING("Serializer: Sent byte(" << n << "bytes) is not same as the event size (" << total_send_bytes << "bytes). Retrying...");
297 printf("[WARNING] Serializer: Sent byte( %d bytes) is not same as the event size ( %d bytes). Retrying...\n", n,
298 total_send_bytes); // back to WARNING due to request from Konno-san on 2018/7/3
299 fflush(stdout);
300
301 double retry_start = getTimeSec();
302 // Send Header
303 if (n < (int)(iov[ 0 ].iov_len)) {
304 n += Send(m_socket_send, (char*)iov[ 0 ].iov_base + n, iov[ 0 ].iov_len - n);
305 }
306
307 if (n < (int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len)) {
308 n += Send(m_socket_send, (char*)iov[ 1 ].iov_base + (n - iov[ 0 ].iov_len), iov[ 1 ].iov_len - (n - iov[ 0 ].iov_len));
309 }
310
311 if (n < (int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len + iov[ 2 ].iov_len)) {
312 n += Send(m_socket_send, (char*)iov[ 2 ].iov_base + (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len),
313 iov[ 2 ].iov_len - (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len));
314 }
315 double retry_end = getTimeSec();
316 // B2WARNING("Resending ends. It takes " << retry_end - retry_start << "(s)");
317 printf("[WARNING] Resending ends. It takes %lf (s)\n", retry_end - retry_start);
318 fflush(stdout); // back to WARNING due to request from Konno-san on 2018/7/3
319 }
320 // printf( "[DEBUG] n %d total %d\n", n, total_send_bytes);
321 // delete temp_buf;
322
323 return total_send_bytes;
324
325}
int Send(int socket, char *buf, int size_bytes)
send buffer
Definition: DesSer.cc:328
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
int * GetBuffer(void)
Get Header contents.
Definition: SendHeader.cc:32

◆ shmOpen()

void shmOpen ( char *  path_cfg,
char *  path_sta 
)

open shared memory

Definition at line 679 of file DesSer.cc.

681{
682 errno = 0;
683 /*m_shmfd_cfg = shm_open( "/cpr_config2", O_CREAT | O_EXCL | O_RDWR, 0666);
684 if (m_shmfd_cfg < 0) {
685 if (errno != EEXIST) {
686 perror("shm_open1");
687 exit(1);
688 }
689 m_shmfd_cfg = shm_open(path_cfg, O_RDWR, 0666);
690 if (m_shmfd_cfg < 0) {
691 printf( "[DEBUG] %s\n", path_cfg);
692 perror("[ERROR] shm_open2");
693 exit(1);
694 }
695 */
696 //}
697 /*
698 m_shmfd_sta = shm_open( "/cpr_status2", O_CREAT | O_EXCL | O_RDWR, 0666);
699 if (m_shmfd_sta < 0) {
700 if (errno != EEXIST) {
701 perror("shm_open1");
702 exit(1);
703 }
704 m_shmfd_sta = shm_open(path_sta , O_RDWR, 0666);
705 if (m_shmfd_sta < 0) {
706 printf( "[DEBUG] %s\n", path_sta);
707 perror("[ERROR] shm_open2");
708 exit(1);
709 }
710 //}
711 int size = 4 * sizeof(int);
712 ftruncate(m_shmfd_cfg, size);
713 ftruncate(m_shmfd_sta, size);
714 */
715}

Member Data Documentation

◆ cur_time

double cur_time

for time monitoring

Definition at line 102 of file DesSer.h.

◆ event_diff

int event_diff
protected

Definition at line 256 of file DesSer.h.

◆ m_bufary

int* m_bufary[NUM_PREALLOC_BUF]

buffer

Definition at line 85 of file DesSer.h.

◆ m_buffer

int* m_buffer

buffer

Definition at line 82 of file DesSer.h.

◆ m_cfg_buf

int* m_cfg_buf

buffer for shared memory

Definition at line 221 of file DesSer.h.

◆ m_cfg_sta

int* m_cfg_sta

buffer for shared memory

Definition at line 224 of file DesSer.h.

◆ m_compressionLevel

int m_compressionLevel

Compression Level.

Definition at line 67 of file DesSer.h.

◆ m_data_type

int m_data_type

data type

Definition at line 153 of file DesSer.h.

◆ m_dump_fname

std::string m_dump_fname

dump filename

Definition at line 76 of file DesSer.h.

◆ m_exp_no

int m_exp_no

exp no.

Definition at line 150 of file DesSer.h.

◆ m_exprunsubrun_no

unsigned int m_exprunsubrun_no

run no.

Definition at line 144 of file DesSer.h.

◆ m_fp_dump

FILE* m_fp_dump

dump file descripter

Definition at line 79 of file DesSer.h.

◆ m_hostname_from

std::vector<std::string> m_hostname_from
protected

Reciever basf2 Socket.

hostname of upstream Data Sources

Definition at line 243 of file DesSer.h.

◆ m_hostname_local

std::string m_hostname_local
protected

Destination Host.

Definition at line 275 of file DesSer.h.

◆ m_ncycle

int m_ncycle

Definition at line 100 of file DesSer.h.

◆ m_nodeid

int m_nodeid

Node ID for SLC.

Definition at line 138 of file DesSer.h.

◆ m_nodename

std::string m_nodename

Node Name for SLC.

Definition at line 135 of file DesSer.h.

◆ m_num_connections

int m_num_connections
protected

check data contents

of connections

Definition at line 237 of file DesSer.h.

◆ m_num_usedbuf

int m_num_usedbuf

of already used buffers

Definition at line 132 of file DesSer.h.

◆ m_port_from

std::vector<int> m_port_from
protected

port # to connect data sources

Definition at line 246 of file DesSer.h.

◆ m_port_to

int m_port_to
protected

Destination port.

Definition at line 278 of file DesSer.h.

◆ m_pre_rawcpr

PreRawCOPPERFormat_v2 m_pre_rawcpr

report status to SLC

Use ver.2 for the moment(ver.4 -> PCI40)

Definition at line 191 of file DesSer.h.

◆ m_prev_copper_ctr

unsigned int m_prev_copper_ctr
protected

Definition at line 258 of file DesSer.h.

◆ m_prev_evenum

unsigned int m_prev_evenum
protected

Definition at line 260 of file DesSer.h.

◆ m_prev_exprunsubrun_no

unsigned int m_prev_exprunsubrun_no

run no.

Definition at line 147 of file DesSer.h.

◆ m_prev_nevt

int m_prev_nevt

No. of prev sent events.

Definition at line 73 of file DesSer.h.

◆ m_prev_time

double m_prev_time

Definition at line 105 of file DesSer.h.

◆ m_recvd_prev_totbytes

double m_recvd_prev_totbytes

Definition at line 97 of file DesSer.h.

◆ m_recvd_totbytes

double m_recvd_totbytes

Definition at line 96 of file DesSer.h.

◆ m_run_error

int m_run_error

flag to show that there is some errors with which DAQ cannot continue.

Definition at line 230 of file DesSer.h.

◆ m_run_pause

int m_run_pause

flag to show that run-controller pauses a run

Definition at line 227 of file DesSer.h.

◆ m_sent_prev_totbytes

double m_sent_prev_totbytes

Definition at line 99 of file DesSer.h.

◆ m_sent_totbytes

double m_sent_totbytes

Definition at line 98 of file DesSer.h.

◆ m_shmfd_cfg

int m_shmfd_cfg

file descripter for shm

Definition at line 168 of file DesSer.h.

◆ m_shmfd_sta

int m_shmfd_sta

file descripter for shm

Definition at line 171 of file DesSer.h.

◆ m_shmflag

int m_shmflag

Use shared memory.

Definition at line 159 of file DesSer.h.

◆ m_socket_recv

std::vector<int> m_socket_recv
protected

Definition at line 248 of file DesSer.h.

◆ m_socket_send

int m_socket_send
protected

Reciever Socket.

Definition at line 267 of file DesSer.h.

◆ m_start_flag

int m_start_flag

start flag

Definition at line 181 of file DesSer.h.

◆ m_start_time

double m_start_time

Definition at line 104 of file DesSer.h.

◆ m_status

RunInfoBuffer m_status

Run info buffer.

Definition at line 141 of file DesSer.h.

◆ m_t0

timeval m_t0

Definition at line 95 of file DesSer.h.

◆ m_trunc_mask

int m_trunc_mask

trunc mask

Definition at line 156 of file DesSer.h.

◆ max_nevt

int max_nevt

of events in a run

Definition at line 61 of file DesSer.h.

◆ max_seconds

double max_seconds

time to stop a run

Definition at line 64 of file DesSer.h.

◆ monitor_numeve

int monitor_numeve

buffer for shared memory

buffer for shared memory

Definition at line 178 of file DesSer.h.

◆ n_basf2evt

int n_basf2evt

No. of sent events.

Definition at line 70 of file DesSer.h.

◆ p_method

std::string p_method
protected

EvtSocket.

How to handle data

Definition at line 284 of file DesSer.h.

◆ p_method_val

int p_method_val
protected

Definition at line 285 of file DesSer.h.

◆ prev_event

int prev_event

Definition at line 106 of file DesSer.h.

◆ print_err

CprErrorMessage print_err

wrapper for B2LOG system

Definition at line 184 of file DesSer.h.

◆ tmp_header

RawHeader_v2 tmp_header
protected

which format is used

Definition at line 323 of file DesSer.h.


The documentation for this class was generated from the following files: