Belle II Software development
DeSerializerPC.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <daq/rawdata/modules/DeSerializerPC.h>
10#include <daq/dataobjects/SendHeader.h>
11#include <daq/dataobjects/SendTrailer.h>
12#include <rawdata/dataobjects/RawTLU.h>
13
14#include <netdb.h>
15#include <netinet/tcp.h>
16
17//#define MAXEVTSIZE 400000000
18#define CHECKEVT 5000
19
20
21
22#define USE_DESERIALIZER_PREPC
23
24//#define DEBUG
25//#define NO_DATA_CHECK
26//#define DUMHSLB
27
28using namespace std;
29using namespace Belle2;
30
31//-----------------------------------------------------------------
32// Register the Module
33//-----------------------------------------------------------------
34REG_MODULE(DeSerializerPC);
35
36//-----------------------------------------------------------------
37// Implementation
38//-----------------------------------------------------------------
39
40#ifndef REDUCED_RAWCOPPER
41#ifdef USE_DESERIALIZER_PREPC
42//compile error
43#endif
44#endif
45
47{
48 //Set module properties
49 setDescription("Encode DataStore into RingBuffer");
50 // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
51 addParam("NumConn", m_num_connections, "Number of Connections", 0);
52 addParam("HostNameFrom", m_hostname_from, "Hostnames of data sources");
53 addParam("PortFrom", m_port_from, "port numbers of data sources");
54
55
56 B2INFO("DeSerializerPC: Constructor done.");
57
58
59}
60
61
62
63DeSerializerPCModule::~DeSerializerPCModule()
64{
65
66}
67
68
70{
71 B2INFO("DeSerializerPC: initialize() started.");
72
73 // Set m_socket
74 if (m_num_connections > (int)m_hostname_from.size() || m_num_connections > (int)m_port_from.size()) {
75 B2FATAL("[FATAL] Hostname or port# is not specified for all connections. Please check a python script. Exiting... \n");
76 exit(1);
77 }
78 for (int i = 0; i < m_num_connections; i++) {
79 m_socket.push_back(-1);
80 }
81
82 // allocate buffer
83 for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
84 m_bufary[i] = new int[ BUF_SIZE_WORD ];
85 }
86 m_buffer = new int[ BUF_SIZE_WORD ];
87
88
89 // initialize buffer
90 for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
91 memset(m_bufary[i], 0, BUF_SIZE_WORD * sizeof(int));
92 }
93
94 // Initialize EvtMetaData
95 m_eventMetaDataPtr.registerInDataStore();
96
97 raw_datablkarray.registerInDataStore();
98 rawcprarray.registerInDataStore();
99 raw_ftswarray.registerInDataStore();
100
101
102 // Initialize Array of RawCOPPER
103
104 if (m_dump_fname.size() > 0) {
106 }
107
108 // Initialize arrays for time monitor
109 memset(time_array0, 0, sizeof(time_array0));
110 memset(time_array1, 0, sizeof(time_array1));
111 memset(time_array2, 0, sizeof(time_array2));
112
113 clearNumUsedBuf();
114
115 // Shared memory
116 if (m_shmflag > 0) {
117 if (m_nodename.size() == 0 || m_nodeid < 0) {
118 m_shmflag = 0;
119 } else {
121 g_status.reportReady();
122 }
123 }
124
125 event_diff = 0;
126
127 m_prev_copper_ctr = 0xFFFFFFFF;
128 m_prev_evenum = 0xFFFFFFFF;
129
130#ifdef NONSTOP
131 openRunPauseNshm();
132#endif
133
134 B2INFO("DeSerializerPC: initialize() done.");
135}
136
137
138int DeSerializerPCModule::recvFD(int sock, char* buf, int data_size_byte, int flag)
139{
140 int n = 0;
141 while (1) {
142 int read_size = 0;
143 if ((read_size = recv(sock, (char*)buf + n, data_size_byte - n, flag)) < 0) {
144 if (errno == EINTR) {
145 continue;
146 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
147 // No data received within SO_RCVTIMEO
148#ifdef NONSTOP
149 string err_str;
150 callCheckRunPause(err_str);
151#endif
152 continue;
153 } else {
154 char err_buf[500];
155 sprintf(err_buf, "[WARNING] recv() returned error; ret = %d. : %s %s %d",
156 read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
157#ifdef NONSTOP
158 g_run_error = 1;
159 B2ERROR(err_buf);
160 string err_str = "RUN_ERROR";
161 throw (err_str);
162#endif
163 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
164 exit(-1);
165 }
166 } else if (read_size == 0) {
167 char err_buf[500];
168 sprintf(err_buf, "[WARNING] Connection is closed by peer(%s).: %s %s %d",
169 strerror(errno), __FILE__, __PRETTY_FUNCTION__, __LINE__);
170#ifdef NONSTOP
171 g_run_error = 1;
172 B2ERROR(err_buf);
173 string err_str = "RUN_ERROR";
174 throw (err_str);
175#endif
176 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
177 exit(-1);
178 } else {
179 n += read_size;
180 if (n == data_size_byte)break;
181 }
182 }
183 return n;
184}
185
187{
188 for (int i = 0; i < m_num_connections; i++) {
189 if (m_socket[ i ] >= 0) continue; // Already have an established socket
190 //
191 // Connect to a downstream node
192 //
193 struct sockaddr_in socPC;
194 socPC.sin_family = AF_INET;
195
196 struct hostent* host;
197 host = gethostbyname(m_hostname_from[ i ].c_str());
198 if (host == NULL) {
199 char err_buf[100];
200 sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
201 strerror(errno));
202 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
203 sleep(1234567);
204 exit(1);
205 }
206 socPC.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
207 socPC.sin_port = htons(m_port_from[ i ]);
208 int sd = socket(PF_INET, SOCK_STREAM, 0);
209
210 int val1 = 0;
211 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1, sizeof(val1));
212
213 struct timeval timeout;
214 timeout.tv_sec = 1;
215 timeout.tv_usec = 0;
216 setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)sizeof(timeout));
217
218 printf("[DEBUG] Connecting to %s port %d ...\n", m_hostname_from[ i ].c_str(), m_port_from[ i ]);
219 while (1) {
220 if (connect(sd, (struct sockaddr*)(&socPC), sizeof(socPC)) < 0) {
221 perror("Failed to connect. Retrying...");
222 usleep(500000);
223 } else {
224 printf("[DEBUG] Done\n");
225 break;
226 }
227 }
228
229 // m_socket.push_back(sd);
230 m_socket[ i ] = sd;
231
232 // check socket paramters
233 int val, len;
234 len = sizeof(val);
235 getsockopt(m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
236#ifdef DEBUG
237 printf("[DEBUG] SO_RCVBUF %d\n", val);
238#endif
239 getsockopt(m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
240#ifdef DEBUG
241 printf("[DEBUG] SO_SNDBUF %d\n", val);
242#endif
243 getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
244#ifdef DEBUG
245 printf("[DEBUG] TCP_MAXSEG %d\n", val);
246#endif
247 getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
248#ifdef DEBUG
249 printf("[DEBUG] TCP_NODELAY %d\n", val);
250#endif
251 if (g_status.isAvailable()) {
252 sockaddr_in sa;
253 memset(&sa, 0, sizeof(sockaddr_in));
254 socklen_t sa_len = sizeof(sa);
255 if (getsockname(m_socket[i], (struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
256 g_status.setInputPort(ntohs(sa.sin_port));
257 g_status.setInputAddress(sa.sin_addr.s_addr);
258 }
259 }
260
261 }
262 printf("[DEBUG] Initialization finished\n");
263 return 0;
264}
265
266
267
268int* DeSerializerPCModule::recvData(int* delete_flag, int* total_buf_nwords, int* num_events_in_sendblock,
269 int* num_nodes_in_sendblock)
270{
271
272 int* temp_buf = NULL; // buffer for data-body
273 int flag = 0;
274
275 vector <int> each_buf_nwords;
276 each_buf_nwords.clear();
277 vector <int> each_buf_nodes;
278 each_buf_nodes.clear();
279 vector <int> each_buf_events;
280 each_buf_events.clear();
281
282 *total_buf_nwords = 0;
283 *num_nodes_in_sendblock = 0;
284 *num_events_in_sendblock = 0;
285
286 //
287 // Read Header and obtain data size
288 //
289 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
290
291 // Read header
292 for (int i = 0; i < (int)(m_socket.size()); i++) {
293
294 recvFD(m_socket[ i ], (char*)send_hdr_buf, sizeof(int)*SendHeader::SENDHDR_NWORDS, flag);
295
296 SendHeader send_hdr;
297 send_hdr.SetBuffer(send_hdr_buf);
298
299 int temp_num_events = send_hdr.GetNumEventsinPacket();
300 int temp_num_nodes = send_hdr.GetNumNodesinPacket();
301
302
303
304 if (i == 0) {
305 *num_events_in_sendblock = temp_num_events;
306 } else if (*num_events_in_sendblock != temp_num_events) {
307
308#ifndef NO_DATA_CHECK
309 printf("[DEBUG] *******HDR**********\n");
310 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
311 char err_buf[500];
312 sprintf(err_buf,
313 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
314 *num_events_in_sendblock, temp_num_events, i, *num_nodes_in_sendblock, temp_num_nodes, i);
315 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
316 sleep(1234567);
317 exit(1);
318#endif
319 }
320
321 *num_nodes_in_sendblock += temp_num_nodes;
322
323 int rawblk_nwords = send_hdr.GetTotalNwords()
324 - SendHeader::SENDHDR_NWORDS
325 - SendTrailer::SENDTRL_NWORDS;
326 *total_buf_nwords += rawblk_nwords;
327
328 //
329 // Data size check1
330 //
331 if (rawblk_nwords > (int)(2.5e6) || rawblk_nwords <= 0) {
332 printf("[DEBUG] *******HDR**********\n");
333 // printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
334 printData(send_hdr_buf, 100);
335 printASCIIData(send_hdr_buf, 100);
336 char err_buf[500];
337 sprintf(err_buf, "[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
338 send_hdr.GetTotalNwords());
339 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
340 sleep(123456);
341 exit(1);
342
343 }
344
345 each_buf_nwords.push_back(rawblk_nwords);
346 each_buf_events.push_back(temp_num_events);
347 each_buf_nodes.push_back(temp_num_nodes);
348
349 }
350
351
352 temp_buf = getNewBuffer(*total_buf_nwords, delete_flag); // this include only data body
353 //
354 // Read body
355 //
356 int total_recvd_byte = 0;
357 for (int i = 0; i < (int)(m_socket.size()); i++) {
358
359 try {
360 total_recvd_byte += recvFD(m_socket[ i ], (char*)temp_buf + total_recvd_byte,
361 each_buf_nwords[ i ] * sizeof(int), flag);
362 } catch (const string& err_str) {
363 if (*delete_flag) {
364 B2WARNING("Delete buffer before going to Run-pause state");
365 delete temp_buf;
366 }
367 throw (err_str);
368 }
369
370 //
371 // Data length check
372 //
373 int temp_length = 0;
374 for (int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
375 int this_length = *((int*)((char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] * sizeof(int) + temp_length));
376 temp_length += this_length * sizeof(int);
377 }
378 if (temp_length != (int)(each_buf_nwords[ i ] * sizeof(int))) {
379 printf("[DEBUG]*******SENDHDR*********** \n");
380 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
381 printf("[DEBUG]*******BODY***********\n ");
382 printData(temp_buf, (int)(total_recvd_byte / sizeof(int)));
383 char err_buf[500];
384 sprintf(err_buf, "[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
385 (int)(*total_buf_nwords * sizeof(int)), temp_length);
386 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
387 sleep(1234567);
388 exit(-1);
389 }
390
391 }
392
393 if ((int)(*total_buf_nwords * sizeof(int)) != total_recvd_byte) {
394 char err_buf[500];
395 sprintf(err_buf, "[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
396 total_recvd_byte, (int)(*total_buf_nwords * sizeof(int)));
397 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
398 sleep(1234567);
399 exit(-1);
400 }
401
402 // Read Traeiler
403 int send_trl_buf[(unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
404 for (int i = 0; i < (int)(m_socket.size()); i++) {
405 try {
406 recvFD(m_socket[ i ], (char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS * sizeof(int), flag);
407 } catch (const string& err_str) {
408 if (*delete_flag) {
409 B2WARNING("Delete buffer before going to Run-pause state");
410 delete temp_buf;
411 }
412 throw (err_str);
413 }
414 }
415
416 return temp_buf;
417}
418
419
420void DeSerializerPCModule::setRecvdBuffer(RawDataBlock* temp_raw_datablk, int* delete_flag)
421{
422 //
423 // Get data from socket
424 //
425 int total_buf_nwords = 0 ;
426 int num_events_in_sendblock = 0;
427 int num_nodes_in_sendblock = 0;
428
429 if (m_start_flag == 0) B2INFO("DeSerializerPC: Reading the 1st packet from eb0...");
430
431 int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
432 &num_nodes_in_sendblock);
433 if (m_start_flag == 0) {
434 B2INFO("DeSerializerPC: Done. the size of the 1st packet " << total_buf_nwords << " words");
435 m_start_flag = 1;
436 }
437 m_totbytes += total_buf_nwords * sizeof(int);
438
439 // Fixed for glibc error at Jan. 2017, reported in "Re: data taking with the new firmware".
440 // for temp_raw_datablk, delete_flag should be 0. raw_datablk will take care of deleting buffer
441 // temp_raw_datablk->SetBuffer((int*)temp_buf, total_buf_nwords, *delete_flag,
442 temp_raw_datablk->SetBuffer((int*)temp_buf, total_buf_nwords, 0,
443
444 num_events_in_sendblock, num_nodes_in_sendblock);
445
446 //
447 // check even # and node # in one Sendblock
448 //
449 int num_entries = temp_raw_datablk->GetNumEntries();
450 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
451 char err_buf[500];
452 sprintf(err_buf,
453 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
454 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
455 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
456 sleep(1234567);
457 exit(-1);
458 }
459 return;
460
461}
462
463
464
465void DeSerializerPCModule::checkData(RawDataBlock* raw_datablk, unsigned int* exp_copper_0, unsigned int* run_copper_0,
466 unsigned int* subrun_copper_0, unsigned int* eve_copper_0, unsigned int* error_bit_flag)
467{
468
469 // int data_size_copper_0 = -1;
470 // int data_size_copper_1 = -1;
471
472 //
473 // Data check
474 //
475 int* temp_buf = raw_datablk->GetBuffer(0);
476 int cpr_num = 0;
477 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
478 unsigned int eve_array[32]; // # of noeds is less than 17
479 unsigned int utime_array[32];// # of noeds is less than 17
480 unsigned int ctime_type_array[32];// # of noeds is less than 17
481
482#ifdef DUMHSLB
483 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
484#endif
485
486 for (int k = 0; k < raw_datablk->GetNumEvents(); k++) {
487 memset(eve_array, 0, sizeof(eve_array));
488 memset(utime_array, 0, sizeof(utime_array));
489 memset(ctime_type_array, 0, sizeof(ctime_type_array));
490
491 int num_nodes_in_sendblock = raw_datablk->GetNumNodes();
492 for (int l = 0; l < num_nodes_in_sendblock; l++) {
493 int entry_id = l + k * num_nodes_in_sendblock;
494 //
495 // RawFTSW
496 //
497 if (raw_datablk->CheckFTSWID(entry_id)) {
498 RawFTSW* temp_rawftsw = new RawFTSW;
499 int block_id = 0;
500 temp_rawftsw->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
501 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
502 if (temp_rawftsw->GetEveNo(block_id) < 10) {
503 printf("[DEBUG] ######FTSW#########\n");
504 printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
505 }
506
507#ifdef DUMHSLB
508 exp_run_ftsw = temp_rawftsw->GetExpRunSubrun(block_id);
509 ctime_trgtype_ftsw = temp_rawftsw->GetTTCtimeTRGType(block_id);
510 utime_ftsw = temp_rawftsw->GetTTUtime(block_id);
511#endif
512
513
514#ifndef NO_DATA_CHECK
515 try {
516 temp_rawftsw->CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
517 eve_array[ entry_id ] = cur_evenum;
518 } catch (const string& err_str) {
519 print_err.PrintError(m_shmflag, &g_status, err_str);
520 exit(1);
521 }
522#endif
523 utime_array[ entry_id ] = temp_rawftsw->GetTTUtime(block_id);
524 ctime_type_array[ entry_id ] = temp_rawftsw->GetTTCtimeTRGType(block_id);
525 delete temp_rawftsw;
526
527 //
528 // RawTLU
529 //
530 } else if (raw_datablk->CheckTLUID(entry_id)) {
531
532 RawTLU* temp_rawtlu = new RawTLU;
533 temp_rawtlu->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
534 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
535 if (temp_rawtlu->GetEveNo(0) < 10
536 ) {
537 printf("[DEBUG] ######TLU#########\n");
538 printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
539 }
540
541#ifndef NO_DATA_CHECK
542 try {
543 temp_rawtlu->CheckData(0, m_prev_evenum, &cur_evenum);
544 eve_array[ entry_id ] = cur_evenum;
545 } catch (const string& err_str) {
546 print_err.PrintError(m_shmflag, &g_status, err_str);
547 exit(1);
548 }
549#endif
550 delete temp_rawtlu;
551 } else {
552
553 //
554 // RawCOPPER
555 //
556
557
558 RawCOPPER* temp_rawcopper = new RawCOPPER;
559 temp_rawcopper->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
560 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
561
562#ifdef DUMHSLB
563 int block_id = 0;
564 (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
565 (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
566 (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
567#endif
568
569#ifndef NO_DATA_CHECK
570 try {
571
572 temp_rawcopper->CheckData(0, m_prev_evenum, &cur_evenum,
573 m_prev_copper_ctr, &cur_copper_ctr,
575 eve_array[ entry_id ] = cur_evenum;
576 } catch (const string& err_str) {
577 temp_rawcopper->PrintData(temp_rawcopper->GetWholeBuffer(), temp_rawcopper->TotalBufNwords());
578 print_err.PrintError(m_shmflag, &g_status, err_str);
579 exit(1);
580 }
581#endif
582
583 utime_array[ entry_id ] = temp_rawcopper->GetTTUtime(0);
584 ctime_type_array[ entry_id ] = temp_rawcopper->GetTTCtimeTRGType(0);
585
586 if (cpr_num == 0) {
587 // data_size_copper_0 = raw_datablk->GetBlockNwords(entry_id);
588 // *eve_copper_0 = (raw_datablk->GetBuffer(entry_id))[ 3 ];
589 *eve_copper_0 = temp_rawcopper->GetEveNo(0);
590 *exp_copper_0 = temp_rawcopper->GetExpNo(0);
591 *run_copper_0 = temp_rawcopper->GetRunNo(0);
592 *subrun_copper_0 = temp_rawcopper->GetSubRunNo(0);
593 } else if (cpr_num == 1) {
594 // data_size_copper_1 = raw_datablk->GetBlockNwords(entry_id);
595 }
596 cpr_num++;
597
598 // Check Error bit flag
599 *error_bit_flag |= temp_rawcopper->GetErrorBitFlag(0);
600
601 delete temp_rawcopper;
602 }
603 }
604
605#ifndef NO_DATA_CHECK
606 // event #, ctime, utime over nodes
607 for (int l = 1; l < num_nodes_in_sendblock; l++) {
608 if (eve_array[ 0 ] != eve_array[ l ]) {
609// if (eve_array[ 0 ] != eve_array[ l ] ||
610// utime_array[ 0 ] != utime_array[ l ] ||
611// ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
612 char err_buf[500];
613 for (int m = 0; m < num_nodes_in_sendblock; m++) {
614 printf("[DEBUG] node %d eve # %u utime %x ctime %x\n",
615 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
616 }
617 sprintf(err_buf, "[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
618 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
619 sleep(1234567);
620 exit(-1);
621 }
622 }
623#endif
624
625 // Event # monitor in runchange
627 printf("[DEBUG] ##############################################\n");
628 for (int m = 0; m < raw_datablk->GetNumEntries(); m++) {
629 printf("[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
630 }
631 printf("[DEBUG] ##############################################\n");
632 fflush(stderr);
633 }
634 m_prev_evenum = cur_evenum;
635 m_prev_copper_ctr = cur_copper_ctr;
637 }
638 return;
639}
640
641#ifdef NONSTOP
642void DeSerializerPCModule::waitResume()
643{
644 while (true) {
645#ifdef NONSTOP_DEBUG
646 printf("\033[31m");
647 printf("###########(Ser) Waiting for Resume ###############\n");
648 fflush(stdout);
649 printf("\033[0m");
650#endif
651 if (checkRunRecovery()) {
652 g_run_pause = 0;
653 for (int i = 0; i < m_num_connections; i++) {
654 if (CheckConnection(m_socket[ i ]) < 0) m_socket[ i ] = -1;
655 }
656 resumeRun();
657 break;
658 }
659 sleep(1);
660 }
661 return;
662
663}
664#endif
665
666void DeSerializerPCModule::setErrorFlag(unsigned int error_flag, StoreObjPtr<EventMetaData> evtmetadata)
667{
668 // RawHeader_latest raw_hdr;
669 int error_set = 0;
670 if (error_flag & RawHeader_latest::B2LINK_PACKET_CRC_ERROR) {
671 evtmetadata->addErrorFlag(EventMetaData::c_B2LinkPacketCRCError);
672 error_set = 1;
673 }
674 if (error_flag & RawHeader_latest::B2LINK_EVENT_CRC_ERROR) {
675 evtmetadata->addErrorFlag(EventMetaData::c_B2LinkEventCRCError);
676 error_set = 1;
677 }
678 if (error_set) B2INFO("Raw2Ds: Error flag was set in EventMetaData.");
679}
680
681
683{
684
685 // For data check
686 unsigned int exp_copper_0 = 0;
687 unsigned int run_copper_0 = 0;
688 unsigned int subrun_copper_0 = 0;
689 unsigned int eve_copper_0 = 0;
690 unsigned int error_bit_flag = 0;
691
692 clearNumUsedBuf();
693
694 if (m_start_flag == 0) {
695 //
696 // Connect to eb0: This should be here because we want Serializer first to accept connection from eb1tx
697 //
698 Connect();
699 if (g_status.isAvailable()) {
700 B2INFO("DeSerializerPC: Waiting for Start...\n");
701 g_status.reportRunning();
702 }
703 m_start_time = getTimeSec();
704 n_basf2evt = 0;
705 }
706
707
708#ifdef NONSTOP
709 if (g_run_pause > 0 || g_run_error > 0) {
710 if (g_run_pause == 0) {
711 while (true) {
712 if (checkRunPause()) break;
713#ifdef NONSTOP_DEBUG
714 printf("\033[31m");
715 printf("###########(DeserializerPC) Waiting for Runpause() ###############\n");
716 fflush(stdout);
717 printf("\033[0m");
718#endif
719 sleep(1);
720 }
721 }
722 waitResume();
723 m_eventMetaDataPtr.create(); // Otherwise endRun() is called.
724 return;
725 }
726#endif
727
728 // Make rawdatablk array
729
730
731 //
732 // Main loop
733 //
734 int* buf_rc = NULL;
735 for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
736 eve_copper_0 = 0;
737 //
738 // Set buffer to the RawData class stored in DataStore
739 //
740 int delete_flag = 0;
741 RawDataBlock temp_rawdatablk;
742 try {
743 setRecvdBuffer(&temp_rawdatablk, &delete_flag);
744 checkData(&temp_rawdatablk, &exp_copper_0, &run_copper_0, &subrun_copper_0, &eve_copper_0, &error_bit_flag);
745 } catch (const string& err_str) {
746#ifdef NONSTOP
747 // Update EventMetaData otherwise basf2 stops.
748 if (err_str == "RUN_PAUSE" || err_str == "RUN_ERROR") {
749 m_eventMetaDataPtr.create();
750 return;
751 }
752#endif
753 print_err.PrintError((char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
754 exit(1);
755 }
756
757 RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
758 raw_datablk->SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(), delete_flag,
759 temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
760 buf_rc = temp_rawdatablk.GetWholeBuffer();
761 }
762 if (buf_rc != NULL) {
763 g_status.copyEventHeader(buf_rc);
764 }
765
766 //
767 // Update EventMetaData
768 //
769 m_eventMetaDataPtr.create();
770 m_eventMetaDataPtr->setExperiment(exp_copper_0);
771 m_eventMetaDataPtr->setRun(run_copper_0);
772 m_eventMetaDataPtr->setSubrun(subrun_copper_0);
773 m_eventMetaDataPtr->setEvent(eve_copper_0);
774
775 setErrorFlag(error_bit_flag, m_eventMetaDataPtr);
776 if (error_bit_flag != 0) {
777 m_eventMetaDataPtr->addErrorFlag(EventMetaData::c_B2LinkEventCRCError);
778 printf("[ERROR] error bit was detected. exp %u run %u eve %u count = %u\n",
779 exp_copper_0, run_copper_0, eve_copper_0, error_bit_flag);
780 }
781
782
783 //
784 // Run stop via NSM (Already obsolete. Need to ask Konno-san about implementation)
785 //
786 // if (m_shmflag != 0) {
787 // if (n_basf2evt % 10 == 0) {
788 // if (g_status.isStopped()) {
789 // printf("[DEBUG] [INFO] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n", max_nevt , max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
790 // m_eventMetaDataPtr->setEndOfData();
791 // }
792 // }
793 // }
794
795 //
796 // Monitor
797 //
798 if (max_nevt >= 0 || max_seconds >= 0.) {
799 if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC >= max_nevt && max_nevt > 0)
800 || (getTimeSec() - m_start_time > max_seconds && max_seconds > 0.)) {
801 printf("[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
802 max_nevt, max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
803 m_eventMetaDataPtr->setEndOfData();
804 }
805 }
806
807 if (n_basf2evt % 20000 == 0 || n_basf2evt < 10) {
808 RateMonitor(eve_copper_0, subrun_copper_0, run_copper_0);
809 }
810 n_basf2evt++;
811 if (g_status.isAvailable()) {
812 g_status.setInputNBytes(m_totbytes);
813 g_status.setInputCount(n_basf2evt);
814 }
815
816 return;
817}
818
819
void RateMonitor(unsigned int nevt, int subrun=-1, int run=-1)
monitor rate
unsigned int m_prev_exprunsubrun_no
run no.
std::string m_nodename
Node name.
int BUF_SIZE_WORD
size of buffer for one event (word)
static RunInfoBuffer g_status
buffer class to communicate with NSM client
int n_basf2evt
No. of sent events.
DeSerializerModule()
Constructor / Destructor.
CprErrorMessage print_err
wrapper for B2LOG system
virtual void printData(int *buf, int nwords)
dump error data
virtual int * getNewBuffer(int nwords, int *delete_flag)
Getbuffer.
std::string m_dump_fname
dump filename
int m_shmflag
Use shared memory.
int * m_bufary[NUM_PREALLOC_BUF]
buffer
unsigned int m_exprunsubrun_no
run no.
virtual void printASCIIData(int *buf, int nwords)
dump error data
virtual void openOutputFile()
Module functions to be called from event process.
double max_seconds
time to stop a run
int m_nodeid
Node(PC or COPPER) ID.
double getTimeSec()
store time info.
std::vector< int > m_socket
Receiver socket.
virtual int * recvData(int *delete_flag, int *total_m_size_word, int *num_events_in_sendblock, int *num_nodes_in_sendblock)
receive data
void initialize() override
Module functions to be called from main process.
void event() override
Module functions to be called from event process.
std::vector< int > m_port_from
port # to connect data sources
virtual int Connect()
Connect to the data sources.
virtual int recvFD(int fd, char *buf, int data_size_byte, int flag)
receive data
virtual void setRecvdBuffer(RawDataBlock *raw_datablk, int *delete_flag)
attach buffer to RawDataBlock
DeSerializerPCModule()
Constructor / Destructor.
virtual void checkData(RawDataBlock *raw_datablk, unsigned int *exp_copper_0, unsigned int *run_copper_0, unsigned int *subrun_copper_0, unsigned int *eve_copper_0, unsigned int *error_bit_flag)
check data contents
std::vector< std::string > m_hostname_from
hostname of upstream Data Sources
@ c_B2LinkPacketCRCError
Belle2link CRC error is detected in the event.
@ c_B2LinkEventCRCError
HSLB_COPPER CRC error is detected in the event.
void setDescription(const std::string &description)
Sets the description of the module.
Definition Module.cc:214
The Raw COPPER class. This class stores data received by COPPER via Belle2Link. Data from all detector...
Definition RawCOPPER.h:52
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is not freed( = 0 )/ freed( = 1 ) in Destructor )
Definition RawCOPPER.cc:141
The RawDataBlock class Base class for rawdata handling.
virtual void PrintData(int *buf, int nwords)
print data
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual int GetBufferPos(int n)
get position of data block in word
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is not freed( = 0 )/ freed( = 1 ) in Destructor )
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int CheckTLUID(int n)
get TLU ID to check whether this data block is TLU data or not
virtual int GetBlockNwords(int n)
get size of a data block
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
virtual int GetNumEvents()
get # of events in m_buffer
virtual int TotalBufNwords()
Get total length of m_buffer.
The Raw FTSW class.
Definition RawFTSW.h:30
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
Definition RawFTSW.h:72
unsigned int GetExpRunSubrun(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
Definition RawFTSW.h:130
unsigned int GetEveNo(int n)
Get event #.
Definition RawFTSW.h:65
unsigned int GetTTUtime(int n)
get unixtime of the trigger
Definition RawFTSW.h:79
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is not freed( = 0 )/ freed( = 1 ) in Destructor )
Definition RawFTSW.cc:107
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check the data contents
Definition RawFTSW.h:121
The Raw TLU class Class for data from DAQ PC for TLU(Trigger Logic Unit) It is supposed to be used on...
Definition RawTLU.h:27
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
Definition RawTLU.h:81
unsigned int GetEveNo(int n)
Get Event #.
Definition RawTLU.h:53
void SetBuffer(int *hdr)
set buffer
Definition SendHeader.cc:37
int GetNumEventsinPacket()
get contents of Header
Type-safe access to single objects in the data store.
Definition StoreObjPtr.h:96
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition Module.h:559
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition Module.h:649
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
Definition RawCOPPER.h:608
int GetExpNo(int n)
get Experimental # from header
Definition RawCOPPER.h:365
unsigned int GetEveNo(int n)
get event #
Definition RawCOPPER.h:390
unsigned int GetTTUtime(int n)
get unixtime of the trigger
Definition RawCOPPER.h:614
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check data contents
Definition RawCOPPER.h:699
int GetSubRunNo(int n)
get subrun # (8bit)
Definition RawCOPPER.h:384
unsigned int GetErrorBitFlag(int n)
get contents of header
Definition RawCOPPER.h:416
int GetRunNo(int n)
get run # (14bit)
Definition RawCOPPER.h:377
Abstract base class for different kinds of events.
STL namespace.