Belle II Software development
DesSerPrePC.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/rawdata/DesSerPrePC.h>
9#include <rawdata/dataobjects/RawFTSWFormat_latest.h>
10#include <rawdata/dataobjects/RawTLUFormat.h>
11#include <utility>
12
13#include <arpa/inet.h>
14#include <netdb.h>
15#include <netinet/tcp.h>
16
17#include <sys/socket.h>
18
19//#define NO_DATA_CHECK
20//#define DUMHSLB
21
22using namespace std;
23using namespace Belle2;
24
25//----------------------------------------------------------------
26// Implementation
27//----------------------------------------------------------------
28DesSerPrePC::DesSerPrePC(string host_recv, int port_recv, const string& host_send, int port_send, int shmflag,
29 const std::string& nodename, int /*nodeid*/)
30{
31
32 for (int i = 0 ; i < m_num_connections; i++) {
33 // m_hostname_from.push_back( "localhost");
34 m_hostname_from.push_back(host_recv);
35 // m_port_from.push_back(30000);
36 m_port_from.push_back(port_recv) ;
37 m_socket_recv.push_back(-1);
38 }
39
40 // m_port_to = 31001;
41 m_port_to = port_send;
42 // m_hostname_local = "localhost";
43 m_hostname_local = host_send;
44 m_nodename = nodename;
45
46 m_shmflag = shmflag;
47
48 // B2INFO("DeSerializerPrePC: Constructor done.");
49 printf("[INFO] DeSerializerPrePC: Constructor done.\n"); fflush(stdout);
50}
51
52
53
54DesSerPrePC::~DesSerPrePC()
55{
56}
57
58
59
60int DesSerPrePC::recvFD(int sock, char* buf, int data_size_byte, int flag)
61{
62 int n = 0;
63 while (1) {
64 int read_size = 0;
65 if ((read_size = recv(sock, (char*)buf + n, data_size_byte - n, flag)) < 0) {
66 if (errno == EINTR) {
67 continue;
68 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
69#ifdef NONSTOP
70 string err_str;
71 callCheckRunPause(err_str);
72#endif
73 continue;
74 } else {
75 perror("[WARNING]");
76 char err_buf[500];
77 sprintf(err_buf, "recv() returned error; ret = %d. : %s %s %d",
78 read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
79#ifdef NONSTOP
80 m_run_error = 1;
81 // B2ERROR(err_buf);
82 printf("[WARNING] %s\n", err_buf); fflush(stdout);
83 string err_str = "RUN_ERROR";
84 printf("AIUEO********************\n"); fflush(stdout);
85 throw (err_str);
86#endif
87 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
88 exit(-1);
89 }
90 } else if (read_size == 0) {
91 // Connection is closed ( error )
92 char err_buf[500];
93 sprintf(err_buf, "[WARNING] Connection is closed by peer(%s). readsize = %d %d. : %s %s %d",
94 strerror(errno), read_size, errno, __FILE__, __PRETTY_FUNCTION__, __LINE__);
95#ifdef NONSTOP
96 m_run_error = 1;
97 // B2ERROR(err_buf);
98 printf("%s\n", err_buf); fflush(stdout);
99 string err_str = "RUN_ERROR";
100 throw (err_str);
101#else
102 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
103 exit(-1);
104#endif
105 } else {
106 n += read_size;
107 if (n == data_size_byte)break;
108 }
109 }
110 return n;
111}
112
113
115{
116
117 for (int i = 0; i < m_num_connections; i++) {
118
119 if (m_socket_recv[ i ] >= 0) continue; // Already have an established socket
120
121 //
122 // Connect to a downstream node
123 //
124 struct hostent* host;
125 host = gethostbyname(m_hostname_from[ i ].c_str());
126 if (host == NULL) {
127 char err_buf[100];
128 sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
129 strerror(errno));
130 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
131 sleep(1234567);
132 exit(1);
133 }
134
135 struct sockaddr_in socPC;
136 socPC.sin_family = AF_INET;
137 socPC.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
138 socPC.sin_port = htons(m_port_from[ i ]);
139 int sd = socket(PF_INET, SOCK_STREAM, 0);
140 int val1 = 0;
141 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1, sizeof(val1));
142
143 struct timeval timeout;
144 timeout.tv_sec = 1;
145 timeout.tv_usec = 0;
146 setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)sizeof(timeout));
147
148 printf("[DEBUG] Connecting to %s port %d\n", m_hostname_from[ i ].c_str(), m_port_from[ i ]); fflush(stdout);
149
150 while (1) {
151 if (connect(sd, (struct sockaddr*)(&socPC), sizeof(socPC)) < 0) {
152 perror("Failed to connect. Retrying...");
153 usleep(500000);
154 } else {
155 // B2INFO("Done");
156 printf("[DEBUG] Done\n"); fflush(stdout);
157 break;
158 }
159 }
160 // m_socket_recv.push_back(sd);
161 m_socket_recv[ i ] = sd;
162
163 // check socket paramters
164 int val, len;
165 len = sizeof(val);
166 getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
167 // B2INFO("SO_RCVBUF" << val);
168 getsockopt(m_socket_recv[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
169 // B2DEBUG("SO_SNDBUF" << val);
170 getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
171 // B2DEBUG("TCP_MAXSEG" << val);
172 getsockopt(m_socket_recv[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
173 // B2DEBUG("TCP_NODELAY" << val);
174
175 if (m_status.isAvailable()) {
176 sockaddr_in sa;
177 memset(&sa, 0, sizeof(sockaddr_in));
178 socklen_t sa_len = sizeof(sa);
179 if (getsockname(m_socket_recv[i], (struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
180 m_status.setInputPort(ntohs(sa.sin_port));
181 m_status.setInputAddress(sa.sin_addr.s_addr);
182 }
183 }
184
185 }
186 // B2INFO("[DEBUG] Initialization finished");
187 printf("[DEBUG] Initialization finished\n"); fflush(stdout);
188 return 0;
189}
190
191
192
193int* DesSerPrePC::recvData(int* delete_flag, int* total_buf_nwords, int* num_events_in_sendblock, int* num_nodes_in_sendblock)
194{
195 int* temp_buf = NULL; // buffer for data-body
196 int flag = 0;
197
198 vector <int> each_buf_nwords;
199 each_buf_nwords.clear();
200 vector <int> each_buf_nodes;
201 each_buf_nodes.clear();
202 vector <int> each_buf_events;
203 each_buf_events.clear();
204
205 *total_buf_nwords = 0;
206 *num_nodes_in_sendblock = 0;
207 *num_events_in_sendblock = 0;
208
209 //
210 // Read Header and obtain data size
211 //
212 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
213
214 // Read header
215 for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
216
217 recvFD(m_socket_recv[ i ], (char*)send_hdr_buf, sizeof(int)*SendHeader::SENDHDR_NWORDS, flag);
218
219 SendHeader send_hdr;
220 send_hdr.SetBuffer(send_hdr_buf);
221
222 int temp_num_events = send_hdr.GetNumEventsinPacket();
223 int temp_num_nodes = send_hdr.GetNumNodesinPacket();
224
225 if (i == 0) {
226 *num_events_in_sendblock = temp_num_events;
227 } else if (*num_events_in_sendblock != temp_num_events) {
228#ifndef NO_DATA_CHECK
229 char err_buf[500];
230 sprintf(err_buf,
231 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
232 *num_events_in_sendblock, temp_num_events, i, *num_nodes_in_sendblock, temp_num_nodes, i);
233 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
234 sleep(1234567);
235 exit(1);
236#endif
237 }
238
239 *num_nodes_in_sendblock += temp_num_nodes;
240
241 int rawblk_nwords = send_hdr.GetTotalNwords()
242 - SendHeader::SENDHDR_NWORDS
243 - SendTrailer::SENDTRL_NWORDS;
244 *total_buf_nwords += rawblk_nwords;
245
246 //
247 // Data size check1
248 //
249 if (rawblk_nwords > (int)(2.5e6) || rawblk_nwords <= 0) {
250 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
251 char err_buf[500];
252 sprintf(err_buf, "CORRUPTED DATA: Too large event : Header %d %d %d %d :block size %d words\n", i, temp_num_events, temp_num_nodes,
253 send_hdr.GetTotalNwords(), rawblk_nwords);
254 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
255 sleep(123456);
256 exit(1);
257
258 }
259
260 each_buf_nwords.push_back(rawblk_nwords);
261 each_buf_events.push_back(temp_num_events);
262 each_buf_nodes.push_back(temp_num_nodes);
263
264 }
265
266
267 temp_buf = getNewBuffer(*total_buf_nwords, delete_flag); // this include only data body
268 //
269 // Read body
270 //
271 int total_recvd_byte = 0;
272 for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
273
274 try {
275 total_recvd_byte += recvFD(m_socket_recv[ i ], (char*)temp_buf + total_recvd_byte,
276 each_buf_nwords[ i ] * sizeof(int), flag);
277 } catch (const string& err_str) {
278 if (*delete_flag) {
279 // B2WARNING("Delete buffer before going to Run-pause state");
280 printf("[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
281 delete temp_buf;
282 }
283 throw (std::move(err_str));
284 }
285 //
286 // Data length check
287 //
288 int temp_length = 0;
289 for (int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
290 int this_length = *((int*)((char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] * sizeof(int) + temp_length));
291 temp_length += this_length * sizeof(int);
292 }
293 if (temp_length != (int)(each_buf_nwords[ i ] * sizeof(int))) {
294 printf("[DEBUG]*******SENDHDR*********** \n");
295 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
296 printf("[DEBUG]*******BODY***********\n ");
297 printData(temp_buf, (int)(total_recvd_byte / sizeof(int)));
298 char err_buf[500];
299 sprintf(err_buf, "CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
300 (int)(*total_buf_nwords * sizeof(int)), temp_length);
301 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
302 sleep(1234567);
303 exit(-1);
304 }
305
306 }
307
308 if ((int)(*total_buf_nwords * sizeof(int)) != total_recvd_byte) {
309 char err_buf[500];
310 sprintf(err_buf, "CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
311 total_recvd_byte, (int)(*total_buf_nwords * sizeof(int)));
312 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
313 sleep(1234567);
314 exit(-1);
315 }
316
317 // Read Traeiler
318 int send_trl_buf[(unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
319 for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
320 try {
321 recvFD(m_socket_recv[ i ], (char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS * sizeof(int), flag);
322 } catch (const string& err_str) {
323 if (*delete_flag) {
324 // B2WARNING("Delete buffer before going to Run-pause state");
325 printf("[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
326 delete temp_buf;
327 }
328 throw (std::move(err_str));
329 }
330 }
331
332 return temp_buf;
333}
334
335
336void DesSerPrePC::setRecvdBuffer(RawDataBlockFormat* temp_raw_datablk, int* delete_flag)
337{
338 //
339 // Get data from socket
340 //
341 int total_buf_nwords = 0 ;
342 int num_events_in_sendblock = 0;
343 int num_nodes_in_sendblock = 0;
344
345 if (m_start_flag == 0) {
346 // B2INFO("DeSerializerPrePC: Reading the 1st packet from eb0...");
347 printf("DeSerializerPrePC: Reading the 1st packet from eb0...\n"); fflush(stdout);
348 }
349 int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
350 &num_nodes_in_sendblock);
351 if (m_start_flag == 0) {
352 // B2INFO("DeSerializerPrePC: Done. the size of the 1st packet " << total_buf_nwords << " words");
353 printf("DeSerializerPrePC: Done. the size of the 1st packet %d words\n", total_buf_nwords); fflush(stdout);
354 m_start_flag = 1;
355 }
356 m_recvd_totbytes += total_buf_nwords * sizeof(int);
357
358 temp_raw_datablk->SetBuffer((int*)temp_buf, total_buf_nwords, *delete_flag,
359 num_events_in_sendblock, num_nodes_in_sendblock);
360
361 //
362 // check even # and node # in one Sendblock
363 //
364 int num_entries = temp_raw_datablk->GetNumEntries();
365 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
366 char err_buf[500];
367 sprintf(err_buf,
368 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
369 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
370 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
371 sleep(1234567);
372 exit(-1);
373 }
374 return;
375
376}
377
378
379
380
381void DesSerPrePC::checkData(RawDataBlockFormat* raw_datablk, unsigned int* eve_copper_0)
382{
383 // int data_size_copper_0 = -1;
384 // int data_size_copper_1 = -1;
385
386 //
387 // Data check
388 //
389 int* temp_buf = raw_datablk->GetBuffer(0);
390 int cpr_num = 0;
391 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
392 unsigned int eve_array[32]; // # of noeds is less than 17
393 unsigned int utime_array[32];// # of noeds is less than 17
394 unsigned int ctime_type_array[32];// # of noeds is less than 17
395
396#ifdef DUMHSLB
397 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
398#endif
399
400 for (int k = 0; k < raw_datablk->GetNumEvents(); k++) {
401 memset(eve_array, 0, sizeof(eve_array));
402 memset(utime_array, 0, sizeof(utime_array));
403 memset(ctime_type_array, 0, sizeof(ctime_type_array));
404
405 int num_nodes_in_sendblock = raw_datablk->GetNumNodes();
406 for (int l = 0; l < num_nodes_in_sendblock; l++) {
407 int entry_id = l + k * num_nodes_in_sendblock;
408
409 //
410 // RawFTSW
411 //
412 if (raw_datablk->CheckFTSWID(entry_id)) {
413 RawFTSWFormat_latest* temp_rawftsw = new RawFTSWFormat_latest;
414 int block_id = 0;
415 temp_rawftsw->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
416 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
417 if (temp_rawftsw->GetEveNo(block_id) < 10) {
418 printf("[DEBUG] ######FTSW#########\n");
419 printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
420 }
421
422#ifdef DUMHSLB
423 exp_run_ftsw = temp_rawftsw->GetExpRunSubrun(block_id);
424 ctime_trgtype_ftsw = temp_rawftsw->GetTTCtimeTRGType(block_id);
425 utime_ftsw = temp_rawftsw->GetTTUtime(block_id);
426#endif
427
428
429#ifndef NO_DATA_CHECK
430 try {
431 temp_rawftsw->CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
432 eve_array[ entry_id ] = cur_evenum;
433 } catch (const string& err_str) {
434 char err_buf[500];
435 strcpy(err_buf, err_str.c_str());
436 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
437 exit(1);
438 }
439#endif
440 utime_array[ entry_id ] = temp_rawftsw->GetTTUtime(block_id);
441 ctime_type_array[ entry_id ] = temp_rawftsw->GetTTCtimeTRGType(block_id);
442 delete temp_rawftsw;
443
444 //
445 // RawTLU
446 //
447 } else if (raw_datablk->CheckTLUID(entry_id)) {
448
449 RawTLUFormat* temp_rawtlu = new RawTLUFormat;
450 temp_rawtlu->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
451 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
452 if (temp_rawtlu->GetEveNo(0) < 10
453 ) {
454 printf("[DEBUG] ######TLU#########\n");
455 printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
456 }
457
458#ifndef NO_DATA_CHECK
459 try {
460 temp_rawtlu->CheckData(0, m_prev_evenum, &cur_evenum);
461 eve_array[ entry_id ] = cur_evenum;
462 } catch (const string& err_str) {
463 char err_buf[500];
464 strcpy(err_buf, err_str.c_str());
465 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
466 exit(1);
467 }
468#endif
469 delete temp_rawtlu;
470 } else {
471 //
472 // RawCOPPER
473 //
474 PreRawCOPPERFormat_v2* pre_rawcpr_fmt = new PreRawCOPPERFormat_v2;
475 pre_rawcpr_fmt->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
476 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
477
478#ifdef DUMHSLB
479 int block_id = 0;
480 "do not use the following for actual DAQ"
481 (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_v2::POS_EXP_RUN_NO ] = exp_run_ftsw;
482 (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_v2::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
483 (pre_rawcpr_fmt->GetBuffer(block_id))[ RawHeader_v2::POS_TTUTIME ] = utime_ftsw;
484#endif
485
486#ifndef NO_DATA_CHECK
487 try {
488 pre_rawcpr_fmt->CheckData(0, m_prev_evenum, &cur_evenum,
489 m_prev_copper_ctr, &cur_copper_ctr,
491 eve_array[ entry_id ] = cur_evenum;
492 } catch (const string& err_str) {
493 exit(1); // Error in the contents of an event was detected
494 }
495#endif
496
497 utime_array[ entry_id ] = pre_rawcpr_fmt->GetTTUtime(0);
498 ctime_type_array[ entry_id ] = pre_rawcpr_fmt->GetTTCtimeTRGType(0);
499
500 if (cpr_num == 0) {
501 // data_size_copper_0 = raw_datablk->GetBlockNwords(entry_id);
502 *eve_copper_0 = (raw_datablk->GetBuffer(entry_id))[ 3 ];
503 } else if (cpr_num == 1) {
504 // data_size_copper_1 = raw_datablk->GetBlockNwords(entry_id);
505 }
506 cpr_num++;
507 delete pre_rawcpr_fmt;
508 }
509 }
510
511#ifndef NO_DATA_CHECK
512 // event #, ctime, utime over nodes
513 for (int l = 1; l < num_nodes_in_sendblock; l++) {
514 if (eve_array[ 0 ] != eve_array[ l ] ||
515 utime_array[ 0 ] != utime_array[ l ] ||
516 ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
517 char err_buf[500];
518 for (int m = 0; m < num_nodes_in_sendblock; m++) {
519 printf("[DEBUG] node %d eve # %x utime %x ctime %x\n",
520 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
521 }
522 sprintf(err_buf, "[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
523 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
524 sleep(1234567);
525 exit(-1);
526 }
527 }
528#endif
529
530 // Event # monitor in runchange
531// if (m_prev_runsubrun_no != m_runsubrun_no) {
532// printf("[DEBUG] ##############################################\n");
533// for (int m = 0; m < raw_datablk->GetNumEntries(); m++) {
534// printf("[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
535// }
536// printf("[DEBUG] ##############################################\n");
537// fflush(stderr);
538// }
539 m_prev_evenum = cur_evenum;
540 m_prev_copper_ctr = cur_copper_ctr;
542 }
543 return;
544}
545
546
547
// Body of DataAcquisition().
// NOTE(review): the signature line (original line 548) is absent from this
// listing; the index at the end of the file gives it as
// `void DataAcquisition() override`. Original lines 613, 686, 688, 756 and
// 757 are absent as well.
//
// Flow: initialize(), connect to the data sources on the first call, then
// loop forever: receive and check NUM_EVT_PER_BASF2LOOP_PC data blocks,
// optionally reduce them (REDUCED_RAWCOPPER), send each one with
// sendByWriteV(), and update the rate monitor and m_status counters.
549{
550 // For data check
551 unsigned int eve_copper_0 = 0;
552 // B2INFO("initializing...");
553 printf("[DEBUG] initializing...\n"); fflush(stdout);
554 initialize();
555
556 // B2INFO("Done.");
557 printf("[DEBUG] Done.\n"); fflush(stdout);
558
559 if (m_start_flag == 0) {
560 //
561 // Connect to eb0: This should be here because we want Serializer first to accept connection from eb1tx
562 //
563 Connect();
564 if (m_status.isAvailable()) {
565 // B2INFO("DeSerializerPrePC: Waiting for Start...\n");
566 printf("[DEBUG] DeSerializerPrePC: Waiting for Start...\n"); fflush(stdout);
567 m_status.reportRunning();
568 }
569 m_start_time = getTimeSec();
570 n_basf2evt = 0;
571 }
572
573 //
574 // Main loop
575 //
// No statement in the visible body leaves this while(1); the `return` after
// it is reached only if that changes.
576 while (1) {
577 //
578 // Stand-by loop
579 //
580#ifdef NONSTOP
581 if (m_run_pause > 0 || m_run_error > 0) {
582 waitResume();
583 }
584#endif
585
586 clearNumUsedBuf();
587 // RawDataBlock raw_datablk[ NUM_EVT_PER_BASF2LOOP_PC ];
588 RawDataBlockFormat raw_datablk[ NUM_EVT_PER_BASF2LOOP_PC ];
589
590 //
591 // Recv loop
592 //
593 for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
594 //
595 // Receive data from COPPER
596 //
597 eve_copper_0 = 0;
598 int delete_flag_from =
599 0; // Delete flag for temp_rawdatablk.It can be set to 1 by setRecvdBuffer if the buffer size is larger than that of pre-allocated buffer.
600 int delete_flag_to =
601 0; // Delete flag for raw_datablk[i]. It can be set to 1 by getNewBuffer if the buffer size is larger than that of pre-allocated buffer.
602 RawDataBlockFormat temp_rawdatablk;
603 try {
604 setRecvdBuffer(&temp_rawdatablk, &delete_flag_from);
605 checkData(&temp_rawdatablk, &eve_copper_0);
606 } catch (const string& err_str) {
// A thrown string ends the recv loop early; raw_datablk[j] and the entries
// after it are left without a buffer for this pass.
607 printf("Error was detected\n"); fflush(stdout);
608 break;
609 }
610 // PreRawCOPPERFormat_v2 pre_rawcopper_v2;
611 // pre_rawcopper_v2.SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
612 // 0, temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
614
615 int temp_num_events, temp_num_nodes;
616 int temp_nwords_to;
617 int* buf_to = NULL;
618#ifdef REDUCED_RAWCOPPER
619 //
620 // Copy reduced buffer
621 //
// The reduced size is computed first (CalcReducedDataSize) to size the
// output buffer, then cross-checked against what CopyReducedData wrote.
622 // int* buf_to = getNewBuffer(m_pre_rawcpr.CalcReducedDataSize(&temp_rawdatablk), &delete_flag_to); // basf2-dependent style
623 int* temp_bufin = temp_rawdatablk.GetWholeBuffer();
624 int temp_nwords_from = temp_rawdatablk.TotalBufNwords();
625 temp_num_events = temp_rawdatablk.GetNumEvents();
626 temp_num_nodes = temp_rawdatablk.GetNumNodes();
627 int calced_temp_nwords_to = m_pre_rawcpr.CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
628 buf_to = getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
629
630 // m_pre_rawcpr.CopyReducedData(&temp_rawdatablk, buf_to, delete_flag_from); // basf2-dependent style
631 m_pre_rawcpr.CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
632 if (calced_temp_nwords_to != temp_nwords_to) {
633 char err_buf[500];
634 sprintf(err_buf,
635 "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
636 calced_temp_nwords_to, temp_nwords_to);
637 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
638 exit(1);
639 }
640 m_status.copyEventHeader(buf_to);
641#else
// Without reduction the received buffer itself is handed on; the delete
// flag moves with it to raw_datablk[j].
642 buf_to = temp_rawdatablk.GetWholeBuffer();
643 temp_nwords_to = temp_rawdatablk.TotalBufNwords();
644 temp_num_events = temp_rawdatablk.GetNumEvents();
645 temp_num_nodes = temp_rawdatablk.GetNumNodes();
646 delete_flag_to = delete_flag_from;
647 delete_flag_from = 0; // to avoid double delete
648#endif
649
650 //
651 // Set buffer to the RawData class stored in DataStore
652 //
653// raw_datablk[ j ].SetBuffer( (int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
654// delete_flag_to, temp_rawdatablk.GetNumEvents(),
655// temp_rawdatablk.GetNumNodes());
656 raw_datablk[ j ].SetBuffer(buf_to, temp_nwords_to, delete_flag_to, temp_num_events, temp_num_nodes);
657
658
659 //
660 // CRC16 check after data reduction
661 //
662#ifdef REDUCED_RAWCOPPER
663 PostRawCOPPERFormat_v2 post_rawcopper_v2; // Should be the latest version before ver.4(PCIe40)
664
665// post_rawcopper_v2.SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
666// 0, temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
667 post_rawcopper_v2.SetBuffer(raw_datablk[ j ].GetWholeBuffer(), raw_datablk[ j ].TotalBufNwords(),
668 0, raw_datablk[ j ].GetNumEvents(), raw_datablk[ j ].GetNumNodes());
669
// Only block 0 is checked, for each of the 4 FINESSE slots that holds data.
670 for (int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
671 int block_num = 0;
672 if (post_rawcopper_v2.GetFINESSENwords(block_num, i_finesse_num) > 0) {
673 post_rawcopper_v2.CheckCRC16(block_num, i_finesse_num);
674 }
675 }
676
677#endif
678 }
679
680#ifdef NONSTOP
681 // Goto Stand-by loop when run is paused or stopped by error
682 if (m_run_pause != 0 || m_run_error != 0) continue;
683#endif
684
685
687 // From Serializer.cc
689 if (m_start_flag == 0) {
690 m_start_time = getTimeSec();
691 n_basf2evt = 0;
692 }
693 // StoreArray<RawCOPPER> rawcprarray;
694 // StoreArray<RawDataBlock> raw_dblkarray;
695
696 for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
697
698 //
699 // Send data
700 //
701 if (m_start_flag == 0) {
702 // B2INFO("SerializerPC: Sending the 1st packet...");
703 printf("[DEBUG] SerializerPC: Sending the 1st packet...\n"); fflush(stdout);
704 }
705
706 try {
707 m_sent_totbytes += sendByWriteV(&(raw_datablk[ j ]));
708 } catch (const string& err_str) {
// With NONSTOP the send loop is abandoned; otherwise the error is fatal.
709#ifdef NONSTOP
710 break;
711#endif
712 print_err.PrintError((char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
713 exit(1);
714 }
715 if (m_start_flag == 0) {
716 // B2INFO("Done. ");
717 printf("[DEBUG] Done.\n"); fflush(stdout);
718 m_start_flag = 1;
719 }
720 }
721
722#ifdef NONSTOP
723 // Goto Stand-by loop when run is paused or stopped by error
724 if (m_run_pause != 0 || m_run_error != 0) continue;
725#endif
726
727 //
728 // Monitor
729 //
// This block has no effect unless AIUEO is defined.
730 if (max_nevt >= 0 || max_seconds >= 0.) {
731#ifdef AIUEO
732 if (n_basf2evt % 10000 == 0) {
733// if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC >= max_nevt && max_nevt > 0)
734// || (getTimeSec() - m_start_time > max_seconds && max_seconds > 0.)) {
735 printf("[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
736 max_nevt, max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
737 }
738#endif
739 }
740
// Rate report, printed whenever the processed-event count is a multiple of 100000.
// NOTE(review): cur_time and m_prev_nevt are read here but no assignment to
// either is visible in this listing (original lines 756-757 are absent) - confirm.
741 if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC) % 100000 == 0) {
742 double interval = cur_time - m_prev_time;
743 double total_time = cur_time - m_start_time;
744 printf("[DEBUG] Event %12d Rate %6.2lf[kHz] Recvd %6.2lf[MB/s] sent %6.2lf[MB/s] RunTime %8.2lf[s] interval %8.4lf[s]\n",
745 n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC,
746 (n_basf2evt - m_prev_nevt)*NUM_EVT_PER_BASF2LOOP_PC / interval / 1.e3,
747 (m_recvd_totbytes - m_recvd_prev_totbytes) / interval / 1.e6,
748 (m_sent_totbytes - m_sent_prev_totbytes) / interval / 1.e6,
749 total_time,
750 interval);
751 fflush(stdout);
752
753 m_prev_time = cur_time;
754 m_recvd_prev_totbytes = m_recvd_totbytes;
755 m_sent_prev_totbytes = m_sent_totbytes;
758 }
759
760 n_basf2evt++;
761
762 if (m_status.isAvailable()) {
763 m_status.setOutputNBytes(m_sent_totbytes);
764 m_status.setOutputCount(n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC);
765 }
766
767 }
768
769 return;
770}
771
773// From Serializer.cc
775
776
777#ifdef NONSTOP
778void DesSerPrePC::waitResume()
779{
780 if (m_run_pause == 0) {
781 while (true) {
782 if (checkRunPause()) break;
783#ifdef NONSTOP_DEBUG
784 printf("\033[31m");
785 printf("###########(DesSerPrePC) Waiting for Runstop() ###############\n");
786 fflush(stdout);
787 printf("\033[0m");
788#endif
789 sleep(1);
790 }
791 }
792
793 while (true) {
794#ifdef NONSTOP_DEBUG
795 printf("\033[31m");
796 printf("###########(Ser) Waiting for Resume ###############\n");
797 fflush(stdout);
798 printf("\033[0m");
799#endif
800 if (checkRunRecovery()) {
801 m_run_pause = 0;
802 break;
803 }
804 sleep(1);
805 }
806
807
808 printf("Done!\n"); fflush(stdout);
809
810 printf("Checking connection to eb0\n"); fflush(stdout);
811 if (CheckConnection(m_socket_send) < 0) {
812 printf("Trying Accept1\n"); fflush(stdout);
813 Accept();
814 printf("Trying Accept2\n"); fflush(stdout);
815 }
816
817 printf("Checking connection to COPPER\n"); fflush(stdout);
818 for (int i = 0; i < m_num_connections; i++) {
819 if (CheckConnection(m_socket_recv[ i ]) < 0) m_socket_recv[ i ] = -1;
820 }
821 Connect();
822
823 resumeRun();
824 return;
825}
826
827
828
829#endif
830
int * recvData(int *delete_flag, int *total_m_size_word, int *num_events_in_sendblock, int *num_nodes_in_sendblock)
receive data
Definition: DesSerPrePC.cc:193
DesSerPrePC(std::string host_recv, int port_recv, const std::string &host_send, int port_send, int shmflag, const std::string &nodename, int nodeid)
Constructor / Destructor.
Definition: DesSerPrePC.cc:28
void DataAcquisition() override
Module functions to be called from event process.
Definition: DesSerPrePC.cc:548
int Connect()
Connect to each data source (establishes the receiving sockets).
Definition: DesSerPrePC.cc:114
void setRecvdBuffer(RawDataBlockFormat *raw_datablk, int *delete_flag)
attach buffer to RawDataBlock
Definition: DesSerPrePC.cc:336
int recvFD(int fd, char *buf, int data_size_byte, int flag)
receive data
Definition: DesSerPrePC.cc:60
void checkData(RawDataBlockFormat *raw_datablk, unsigned int *eve_copper_0)
check data contents
Definition: DesSerPrePC.cc:381
int m_port_to
Destination port.
Definition: DesSer.h:278
RunInfoBuffer m_status
Run info buffer.
Definition: DesSer.h:141
std::string m_hostname_local
Destination Host.
Definition: DesSer.h:275
unsigned int m_prev_exprunsubrun_no
run no.
Definition: DesSer.h:147
std::string m_nodename
Node Name for SLC.
Definition: DesSer.h:135
int m_start_flag
start flag
Definition: DesSer.h:181
std::vector< int > m_port_from
port # to connect data sources
Definition: DesSer.h:246
int n_basf2evt
No. of sent events.
Definition: DesSer.h:70
int m_socket_send
Receiver socket.
Definition: DesSer.h:267
PreRawCOPPERFormat_v2 m_pre_rawcpr
report status to SLC
Definition: DesSer.h:191
CprErrorMessage print_err
wrapper for B2LOG system
Definition: DesSer.h:184
int m_run_pause
flag to show that run-controller pauses a run
Definition: DesSer.h:227
void printData(int *buf, int nwords)
dump error data
Definition: DesSer.cc:505
int m_run_error
flag to show that there is some errors with which DAQ cannot continue.
Definition: DesSer.h:230
int * getNewBuffer(int nwords, int *delete_flag)
Getbuffer.
Definition: DesSer.cc:77
double cur_time
for time monitoring
Definition: DesSer.h:102
int m_shmflag
Use shared memory.
Definition: DesSer.h:159
int m_prev_nevt
No. of prev sent events.
Definition: DesSer.h:73
int m_num_connections
check data contents
Definition: DesSer.h:237
unsigned int m_exprunsubrun_no
run no.
Definition: DesSer.h:144
double max_seconds
time to stop a run
Definition: DesSer.h:64
int max_nevt
Definition: DesSer.h:61
double getTimeSec()
store time info.
Definition: DesSer.cc:478
std::vector< std::string > m_hostname_from
Receiver basf2 socket.
Definition: DesSer.h:243
The Raw COPPER class ver.2 This class stores data received by COPPER via belle2link Data from all det...
int CheckCRC16(int n, int finesse_num)
check magic words
int GetFINESSENwords(int n, int finesse) OVERRIDE_CPP17
get data size of FINESSE buffer
The Raw COPPER class ver.1 ( the latest version since May, 2014 ) This class stores data received by ...
void CopyReducedData(int *bufin, int nwords, int num_events, int num_nodes, int *buf_to, int *nwords_to)
reduce and merge header/trailer
int CalcReducedDataSize(int *bufin, int nwords, int num_events, int num_nodes)
reduce and merge header/trailer
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no) OVERRIDE_CPP17
check data contents
The RawDataBlockFormat class Format information for rawdata handling.
virtual int GetBlockNwords(int n)
get size of a data block
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetBufferPos(int n)
get position of data block in word
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freed ( = 0 ) / not freed ( = 1 ) in the destructor )
virtual int TotalBufNwords()
Get total length of m_buffer.
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual int GetNumEvents()
get # of events in m_buffer
virtual int CheckTLUID(int n)
get TLU ID to check whether this data block is TLU data or not
The Raw FTSW class 3 ( 2019.8.20 )
unsigned int GetEveNo(int n) OVERRIDE_CPP17
Get event #.
unsigned int GetTTUtime(int n) OVERRIDE_CPP17
get unixtime of the trigger
unsigned int GetTTCtimeTRGType(int n) OVERRIDE_CPP17
Get a word containing ctime and trigger type info.
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no) OVERRIDE_CPP17
check the data contents
The Raw TLU class Class for data from DAQ PC for TLU(Trigger Logic Unit) It is supposed to be used on...
Definition: RawTLUFormat.h:25
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
Definition: RawTLUFormat.cc:60
unsigned int GetEveNo(int n)
Get Event #.
Definition: RawTLUFormat.cc:37
void SetBuffer(int *hdr)
set buffer
Definition: SendHeader.cc:37
int GetNumEventsinPacket()
get contents of Header
Definition: SendHeader.cc:125
unsigned int GetExpRunSubrun(int n) OVERRIDE_CPP17
Exp# (10bit) run# (14bit) restart # (8bit)
unsigned int GetTTUtime(int n) OVERRIDE_CPP17
Check if COPPER Magic words are correct.
unsigned int GetTTCtimeTRGType(int n) OVERRIDE_CPP17
get b2l block from "FEE b2link header"
Abstract base class for different kinds of events.
STL namespace.