Belle II Software development
DeSerializerPC.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <daq/rawdata/modules/DeSerializerPC.h>
10#include <daq/dataobjects/SendHeader.h>
11#include <daq/dataobjects/SendTrailer.h>
12#include <rawdata/dataobjects/RawTLU.h>
13
14#include <netdb.h>
15#include <netinet/tcp.h>
16
17//#define MAXEVTSIZE 400000000
18#define CHECKEVT 5000
19
20
21
22#define USE_DESERIALIZER_PREPC
23
24//#define DEBUG
25//#define NO_DATA_CHECK
26//#define DUMHSLB
27
28using namespace std;
29using namespace Belle2;
30
31//-----------------------------------------------------------------
32// Register the Module
33//-----------------------------------------------------------------
34REG_MODULE(DeSerializerPC);
35
36//-----------------------------------------------------------------
37// Implementation
38//-----------------------------------------------------------------
39
40#ifndef REDUCED_RAWCOPPER
41#ifdef USE_DESERIALIZER_PREPC
42//compile error
43#endif
44#endif
45
47{
48 //Set module properties
49 setDescription("Encode DataStore into RingBuffer");
50 // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
51 addParam("NumConn", m_num_connections, "Number of Connections", 0);
52 addParam("HostNameFrom", m_hostname_from, "Hostnames of data sources");
53 addParam("PortFrom", m_port_from, "port numbers of data sources");
54
55
56 B2INFO("DeSerializerPC: Constructor done.");
57
58
59}
60
61
62
63DeSerializerPCModule::~DeSerializerPCModule()
64{
65
66}
67
68
70{
71 B2INFO("DeSerializerPC: initialize() started.");
72
73 // Set m_socket
74 if (m_num_connections > (int)m_hostname_from.size() || m_num_connections > (int)m_port_from.size()) {
75 B2FATAL("[FATAL] Hostname or port# is not specified for all connections. Please check a python script. Exiting... \n");
76 exit(1);
77 }
78 for (int i = 0; i < m_num_connections; i++) {
79 m_socket.push_back(-1);
80 }
81
82 // allocate buffer
83 for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
84 m_bufary[i] = new int[ BUF_SIZE_WORD ];
85 }
86 m_buffer = new int[ BUF_SIZE_WORD ];
87
88
89 // initialize buffer
90 for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
91 memset(m_bufary[i], 0, BUF_SIZE_WORD * sizeof(int));
92 }
93
94 // Initialize EvtMetaData
95 m_eventMetaDataPtr.registerInDataStore();
96
97 raw_datablkarray.registerInDataStore();
98 rawcprarray.registerInDataStore();
99 raw_ftswarray.registerInDataStore();
100
101
102 // Initialize Array of RawCOPPER
103
104 if (m_dump_fname.size() > 0) {
106 }
107
108 // Initialize arrays for time monitor
109 memset(time_array0, 0, sizeof(time_array0));
110 memset(time_array1, 0, sizeof(time_array1));
111 memset(time_array2, 0, sizeof(time_array2));
112
113 clearNumUsedBuf();
114
115 // Shared memory
116 if (m_shmflag > 0) {
117 if (m_nodename.size() == 0 || m_nodeid < 0) {
118 m_shmflag = 0;
119 } else {
121 g_status.reportReady();
122 }
123 }
124
125 event_diff = 0;
126
127 m_prev_copper_ctr = 0xFFFFFFFF;
128 m_prev_evenum = 0xFFFFFFFF;
129
130#ifdef NONSTOP
131 openRunPauseNshm();
132#endif
133
134 B2INFO("DeSerializerPC: initialize() done.");
135}
136
137
138int DeSerializerPCModule::recvFD(int sock, char* buf, int data_size_byte, int flag)
139{
140 int n = 0;
141 while (1) {
142 int read_size = 0;
143 if ((read_size = recv(sock, (char*)buf + n, data_size_byte - n, flag)) < 0) {
144 if (errno == EINTR) {
145 continue;
146 } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
147 // No data received within SO_RCVTIMEO
148#ifdef NONSTOP
149 string err_str;
150 callCheckRunPause(err_str);
151#endif
152 continue;
153 } else {
154 char err_buf[500];
155 sprintf(err_buf, "[WARNING] recv() returned error; ret = %d. : %s %s %d",
156 read_size, __FILE__, __PRETTY_FUNCTION__, __LINE__);
157#ifdef NONSTOP
158 g_run_error = 1;
159 B2ERROR(err_buf);
160 string err_str = "RUN_ERROR";
161 throw (err_str);
162#endif
163 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
164 exit(-1);
165 }
166 } else if (read_size == 0) {
167 char err_buf[500];
168 sprintf(err_buf, "[WARNING] Connection is closed by peer(%s).: %s %s %d",
169 strerror(errno), __FILE__, __PRETTY_FUNCTION__, __LINE__);
170#ifdef NONSTOP
171 g_run_error = 1;
172 B2ERROR(err_buf);
173 string err_str = "RUN_ERROR";
174 throw (err_str);
175#endif
176 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
177 exit(-1);
178 } else {
179 n += read_size;
180 if (n == data_size_byte)break;
181 }
182 }
183 return n;
184}
185
187{
188 for (int i = 0; i < m_num_connections; i++) {
189 if (m_socket[ i ] >= 0) continue; // Already have an established socket
190 //
191 // Connect to a downstream node
192 //
193 struct sockaddr_in socPC;
194 socPC.sin_family = AF_INET;
195
196 struct hostent* host;
197 host = gethostbyname(m_hostname_from[ i ].c_str());
198 if (host == NULL) {
199 char err_buf[100];
200 sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
201 strerror(errno));
202 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
203 sleep(1234567);
204 exit(1);
205 }
206 socPC.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
207 socPC.sin_port = htons(m_port_from[ i ]);
208 int sd = socket(PF_INET, SOCK_STREAM, 0);
209
210 int val1 = 0;
211 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1, sizeof(val1));
212
213 struct timeval timeout;
214 timeout.tv_sec = 1;
215 timeout.tv_usec = 0;
216 setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, (socklen_t)sizeof(timeout));
217
218 printf("[DEBUG] Connecting to %s port %d ...\n", m_hostname_from[ i ].c_str(), m_port_from[ i ]);
219 while (1) {
220 if (connect(sd, (struct sockaddr*)(&socPC), sizeof(socPC)) < 0) {
221 perror("Failed to connect. Retrying...");
222 usleep(500000);
223 } else {
224 printf("[DEBUG] Done\n");
225 break;
226 }
227 }
228
229 // m_socket.push_back(sd);
230 m_socket[ i ] = sd;
231
232 // check socket paramters
233 int val, len;
234 len = sizeof(val);
235 getsockopt(m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
236#ifdef DEBUG
237 printf("[DEBUG] SO_RCVBUF %d\n", val);
238#endif
239 getsockopt(m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
240#ifdef DEBUG
241 printf("[DEBUG] SO_SNDBUF %d\n", val);
242#endif
243 getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
244#ifdef DEBUG
245 printf("[DEBUG] TCP_MAXSEG %d\n", val);
246#endif
247 getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
248#ifdef DEBUG
249 printf("[DEBUG] TCP_NODELAY %d\n", val);
250#endif
251 if (g_status.isAvailable()) {
252 sockaddr_in sa;
253 memset(&sa, 0, sizeof(sockaddr_in));
254 socklen_t sa_len = sizeof(sa);
255 if (getsockname(m_socket[i], (struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
256 g_status.setInputPort(ntohs(sa.sin_port));
257 g_status.setInputAddress(sa.sin_addr.s_addr);
258 }
259 }
260
261 }
262 printf("[DEBUG] Initialization finished\n");
263 return 0;
264}
265
266
267
268int* DeSerializerPCModule::recvData(int* delete_flag, int* total_buf_nwords, int* num_events_in_sendblock,
269 int* num_nodes_in_sendblock)
270{
271
272 int* temp_buf = NULL; // buffer for data-body
273 int flag = 0;
274
275 vector <int> each_buf_nwords;
276 each_buf_nwords.clear();
277 vector <int> each_buf_nodes;
278 each_buf_nodes.clear();
279 vector <int> each_buf_events;
280 each_buf_events.clear();
281
282 *total_buf_nwords = 0;
283 *num_nodes_in_sendblock = 0;
284 *num_events_in_sendblock = 0;
285
286 //
287 // Read Header and obtain data size
288 //
289 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
290
291 // Read header
292 for (int i = 0; i < (int)(m_socket.size()); i++) {
293
294 recvFD(m_socket[ i ], (char*)send_hdr_buf, sizeof(int)*SendHeader::SENDHDR_NWORDS, flag);
295
296 SendHeader send_hdr;
297 send_hdr.SetBuffer(send_hdr_buf);
298
299 int temp_num_events = send_hdr.GetNumEventsinPacket();
300 int temp_num_nodes = send_hdr.GetNumNodesinPacket();
301
302
303
304 if (i == 0) {
305 *num_events_in_sendblock = temp_num_events;
306 } else if (*num_events_in_sendblock != temp_num_events) {
307
308#ifndef NO_DATA_CHECK
309 printf("[DEBUG] *******HDR**********\n");
310 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
311 char err_buf[500];
312 sprintf(err_buf,
313 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
314 *num_events_in_sendblock, temp_num_events, i, *num_nodes_in_sendblock, temp_num_nodes, i);
315 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
316 sleep(1234567);
317 exit(1);
318#endif
319 }
320
321 *num_nodes_in_sendblock += temp_num_nodes;
322
323 int rawblk_nwords = send_hdr.GetTotalNwords()
324 - SendHeader::SENDHDR_NWORDS
325 - SendTrailer::SENDTRL_NWORDS;
326 *total_buf_nwords += rawblk_nwords;
327
328 //
329 // Data size check1
330 //
331 if (rawblk_nwords > (int)(2.5e6) || rawblk_nwords <= 0) {
332 printf("[DEBUG] *******HDR**********\n");
333 // printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
334 printData(send_hdr_buf, 100);
335 printASCIIData(send_hdr_buf, 100);
336 char err_buf[500];
337 sprintf(err_buf, "[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
338 send_hdr.GetTotalNwords());
339 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
340 sleep(123456);
341 exit(1);
342
343 }
344
345 each_buf_nwords.push_back(rawblk_nwords);
346 each_buf_events.push_back(temp_num_events);
347 each_buf_nodes.push_back(temp_num_nodes);
348
349 }
350
351
352 temp_buf = getNewBuffer(*total_buf_nwords, delete_flag); // this include only data body
353 //
354 // Read body
355 //
356 int total_recvd_byte = 0;
357 for (int i = 0; i < (int)(m_socket.size()); i++) {
358
359 try {
360 total_recvd_byte += recvFD(m_socket[ i ], (char*)temp_buf + total_recvd_byte,
361 each_buf_nwords[ i ] * sizeof(int), flag);
362 } catch (const string& err_str) {
363 if (*delete_flag) {
364 B2WARNING("Delete buffer before going to Run-pause state");
365 delete temp_buf;
366 }
367 throw (err_str);
368 }
369
370 //
371 // Data length check
372 //
373 int temp_length = 0;
374 for (int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
375 int this_length = *((int*)((char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] * sizeof(int) + temp_length));
376 temp_length += this_length * sizeof(int);
377 }
378 if (temp_length != (int)(each_buf_nwords[ i ] * sizeof(int))) {
379 printf("[DEBUG]*******SENDHDR*********** \n");
380 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
381 printf("[DEBUG]*******BODY***********\n ");
382 printData(temp_buf, (int)(total_recvd_byte / sizeof(int)));
383 char err_buf[500];
384 sprintf(err_buf, "[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
385 (int)(*total_buf_nwords * sizeof(int)), temp_length);
386 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
387 sleep(1234567);
388 exit(-1);
389 }
390
391 }
392
393 if ((int)(*total_buf_nwords * sizeof(int)) != total_recvd_byte) {
394 char err_buf[500];
395 sprintf(err_buf, "[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
396 total_recvd_byte, (int)(*total_buf_nwords * sizeof(int)));
397 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
398 sleep(1234567);
399 exit(-1);
400 }
401
402 // Read Traeiler
403 int send_trl_buf[(unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
404 for (int i = 0; i < (int)(m_socket.size()); i++) {
405 try {
406 recvFD(m_socket[ i ], (char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS * sizeof(int), flag);
407 } catch (const string& err_str) {
408 if (*delete_flag) {
409 B2WARNING("Delete buffer before going to Run-pause state");
410 delete temp_buf;
411 }
412 throw (err_str);
413 }
414 }
415
416 return temp_buf;
417}
418
419
420void DeSerializerPCModule::setRecvdBuffer(RawDataBlock* temp_raw_datablk, int* delete_flag)
421{
422 //
423 // Get data from socket
424 //
425 int total_buf_nwords = 0 ;
426 int num_events_in_sendblock = 0;
427 int num_nodes_in_sendblock = 0;
428
429 if (m_start_flag == 0) B2INFO("DeSerializerPC: Reading the 1st packet from eb0...");
430
431 int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
432 &num_nodes_in_sendblock);
433 if (m_start_flag == 0) {
434 B2INFO("DeSerializerPC: Done. the size of the 1st packet " << total_buf_nwords << " words");
435 m_start_flag = 1;
436 }
437 m_totbytes += total_buf_nwords * sizeof(int);
438
439 // Fixed for glibc error at Jan. 2017, reported in "Re: data taking with the new firmware".
440 // for temp_raw_datablk, delete_flag should be 0. raw_datablk will take care of deleting buffer
441 // temp_raw_datablk->SetBuffer((int*)temp_buf, total_buf_nwords, *delete_flag,
442 temp_raw_datablk->SetBuffer((int*)temp_buf, total_buf_nwords, 0,
443
444 num_events_in_sendblock, num_nodes_in_sendblock);
445
446 //
447 // check even # and node # in one Sendblock
448 //
449 int num_entries = temp_raw_datablk->GetNumEntries();
450 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
451 char err_buf[500];
452 sprintf(err_buf,
453 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
454 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
455 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
456 sleep(1234567);
457 exit(-1);
458 }
459 return;
460
461}
462
463
464
465void DeSerializerPCModule::checkData(RawDataBlock* raw_datablk, unsigned int* exp_copper_0, unsigned int* run_copper_0,
466 unsigned int* subrun_copper_0, unsigned int* eve_copper_0, unsigned int* error_bit_flag)
467{
468
469 // int data_size_copper_0 = -1;
470 // int data_size_copper_1 = -1;
471
472 //
473 // Data check
474 //
475 int* temp_buf = raw_datablk->GetBuffer(0);
476 int cpr_num = 0;
477 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
478 unsigned int eve_array[32]; // # of noeds is less than 17
479 unsigned int utime_array[32];// # of noeds is less than 17
480 unsigned int ctime_type_array[32];// # of noeds is less than 17
481
482#ifdef DUMHSLB
483 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
484#endif
485
486 for (int k = 0; k < raw_datablk->GetNumEvents(); k++) {
487 memset(eve_array, 0, sizeof(eve_array));
488 memset(utime_array, 0, sizeof(utime_array));
489 memset(ctime_type_array, 0, sizeof(ctime_type_array));
490
491 int num_nodes_in_sendblock = raw_datablk->GetNumNodes();
492 for (int l = 0; l < num_nodes_in_sendblock; l++) {
493 int entry_id = l + k * num_nodes_in_sendblock;
494 //
495 // RawFTSW
496 //
497 if (raw_datablk->CheckFTSWID(entry_id)) {
498 RawFTSW* temp_rawftsw = new RawFTSW;
499 int block_id = 0;
500 temp_rawftsw->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
501 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
502 if (temp_rawftsw->GetEveNo(block_id) < 10) {
503 printf("[DEBUG] ######FTSW#########\n");
504 printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
505 }
506
507#ifdef DUMHSLB
508 exp_run_ftsw = temp_rawftsw->GetExpRunSubrun(block_id);
509 ctime_trgtype_ftsw = temp_rawftsw->GetTTCtimeTRGType(block_id);
510 utime_ftsw = temp_rawftsw->GetTTUtime(block_id);
511#endif
512
513
514#ifndef NO_DATA_CHECK
515 try {
516 temp_rawftsw->CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
517 eve_array[ entry_id ] = cur_evenum;
518 } catch (const string& err_str) {
519 print_err.PrintError(m_shmflag, &g_status, err_str);
520 exit(1);
521 }
522#endif
523 utime_array[ entry_id ] = temp_rawftsw->GetTTUtime(block_id);
524 ctime_type_array[ entry_id ] = temp_rawftsw->GetTTCtimeTRGType(block_id);
525 delete temp_rawftsw;
526
527 //
528 // RawTLU
529 //
530 } else if (raw_datablk->CheckTLUID(entry_id)) {
531
532 RawTLU* temp_rawtlu = new RawTLU;
533 temp_rawtlu->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
534 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
535 if (temp_rawtlu->GetEveNo(0) < 10
536 ) {
537 printf("[DEBUG] ######TLU#########\n");
538 printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
539 }
540
541#ifndef NO_DATA_CHECK
542 try {
543 temp_rawtlu->CheckData(0, m_prev_evenum, &cur_evenum);
544 eve_array[ entry_id ] = cur_evenum;
545 } catch (const string& err_str) {
546 print_err.PrintError(m_shmflag, &g_status, err_str);
547 exit(1);
548 }
549#endif
550 delete temp_rawtlu;
551 } else {
552
553 //
554 // RawCOPPER
555 //
556
557
558 RawCOPPER* temp_rawcopper = new RawCOPPER;
559 temp_rawcopper->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
560 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
561
562#ifdef DUMHSLB
563 int block_id = 0;
564 (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
565 (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
566 (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
567#endif
568
569#ifndef NO_DATA_CHECK
570 try {
571
572 temp_rawcopper->CheckData(0, m_prev_evenum, &cur_evenum,
573 m_prev_copper_ctr, &cur_copper_ctr,
575 eve_array[ entry_id ] = cur_evenum;
576 } catch (const string& err_str) {
577 temp_rawcopper->PrintData(temp_rawcopper->GetWholeBuffer(), temp_rawcopper->TotalBufNwords());
578 print_err.PrintError(m_shmflag, &g_status, err_str);
579 exit(1);
580 }
581#endif
582
583 utime_array[ entry_id ] = temp_rawcopper->GetTTUtime(0);
584 ctime_type_array[ entry_id ] = temp_rawcopper->GetTTCtimeTRGType(0);
585
586 if (cpr_num == 0) {
587 // data_size_copper_0 = raw_datablk->GetBlockNwords(entry_id);
588 // *eve_copper_0 = (raw_datablk->GetBuffer(entry_id))[ 3 ];
589 *eve_copper_0 = temp_rawcopper->GetEveNo(0);
590 *exp_copper_0 = temp_rawcopper->GetExpNo(0);
591 *run_copper_0 = temp_rawcopper->GetRunNo(0);
592 *subrun_copper_0 = temp_rawcopper->GetSubRunNo(0);
593 } else if (cpr_num == 1) {
594 // data_size_copper_1 = raw_datablk->GetBlockNwords(entry_id);
595 }
596 cpr_num++;
597
598 // Check Error bit flag
599 *error_bit_flag |= temp_rawcopper->GetErrorBitFlag(0);
600
601 delete temp_rawcopper;
602 }
603 }
604
605#ifndef NO_DATA_CHECK
606 // event #, ctime, utime over nodes
607 for (int l = 1; l < num_nodes_in_sendblock; l++) {
608 if (eve_array[ 0 ] != eve_array[ l ]) {
609// if (eve_array[ 0 ] != eve_array[ l ] ||
610// utime_array[ 0 ] != utime_array[ l ] ||
611// ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
612 char err_buf[500];
613 for (int m = 0; m < num_nodes_in_sendblock; m++) {
614 printf("[DEBUG] node %d eve # %u utime %x ctime %x\n",
615 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
616 }
617 sprintf(err_buf, "[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
618 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
619 sleep(1234567);
620 exit(-1);
621 }
622 }
623#endif
624
625 // Event # monitor in runchange
627 printf("[DEBUG] ##############################################\n");
628 for (int m = 0; m < raw_datablk->GetNumEntries(); m++) {
629 printf("[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
630 }
631 printf("[DEBUG] ##############################################\n");
632 fflush(stderr);
633 }
634 m_prev_evenum = cur_evenum;
635 m_prev_copper_ctr = cur_copper_ctr;
637 }
638 return;
639}
640
641#ifdef NONSTOP
642void DeSerializerPCModule::waitResume()
643{
644 while (true) {
645#ifdef NONSTOP_DEBUG
646 printf("\033[31m");
647 printf("###########(Ser) Waiting for Resume ###############\n");
648 fflush(stdout);
649 printf("\033[0m");
650#endif
651 if (checkRunRecovery()) {
652 g_run_pause = 0;
653 for (int i = 0; i < m_num_connections; i++) {
654 if (CheckConnection(m_socket[ i ]) < 0) m_socket[ i ] = -1;
655 }
656 resumeRun();
657 break;
658 }
659 sleep(1);
660 }
661 return;
662
663}
664#endif
665
666void DeSerializerPCModule::setErrorFlag(unsigned int error_flag, StoreObjPtr<EventMetaData> evtmetadata)
667{
668 // RawHeader_latest raw_hdr;
669 int error_set = 0;
670 if (error_flag & RawHeader_latest::B2LINK_PACKET_CRC_ERROR) {
671 evtmetadata->addErrorFlag(EventMetaData::c_B2LinkPacketCRCError);
672 error_set = 1;
673 }
674 if (error_flag & RawHeader_latest::B2LINK_EVENT_CRC_ERROR) {
675 evtmetadata->addErrorFlag(EventMetaData::c_B2LinkEventCRCError);
676 error_set = 1;
677 }
678 if (error_set) B2INFO("Raw2Ds: Error flag was set in EventMetaData.");
679}
680
681
683{
684
685 // For data check
686 unsigned int exp_copper_0 = 0;
687 unsigned int run_copper_0 = 0;
688 unsigned int subrun_copper_0 = 0;
689 unsigned int eve_copper_0 = 0;
690 unsigned int error_bit_flag = 0;
691
692 clearNumUsedBuf();
693
694 if (m_start_flag == 0) {
695 //
696 // Connect to eb0: This should be here because we want Serializer first to accept connection from eb1tx
697 //
698 Connect();
699 if (g_status.isAvailable()) {
700 B2INFO("DeSerializerPC: Waiting for Start...\n");
701 g_status.reportRunning();
702 }
703 m_start_time = getTimeSec();
704 n_basf2evt = 0;
705 }
706
707
708#ifdef NONSTOP
709 if (g_run_pause > 0 || g_run_error > 0) {
710 if (g_run_pause == 0) {
711 while (true) {
712 if (checkRunPause()) break;
713#ifdef NONSTOP_DEBUG
714 printf("\033[31m");
715 printf("###########(DeserializerPC) Waiting for Runpause() ###############\n");
716 fflush(stdout);
717 printf("\033[0m");
718#endif
719 sleep(1);
720 }
721 }
722 waitResume();
723 m_eventMetaDataPtr.create(); // Otherwise endRun() is called.
724 return;
725 }
726#endif
727
728 // Make rawdatablk array
729
730
731 //
732 // Main loop
733 //
734 int* buf_rc = NULL;
735 for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
736 eve_copper_0 = 0;
737 //
738 // Set buffer to the RawData class stored in DataStore
739 //
740 int delete_flag = 0;
741 RawDataBlock temp_rawdatablk;
742 try {
743 setRecvdBuffer(&temp_rawdatablk, &delete_flag);
744 checkData(&temp_rawdatablk, &exp_copper_0, &run_copper_0, &subrun_copper_0, &eve_copper_0, &error_bit_flag);
745 } catch (const string& err_str) {
746#ifdef NONSTOP
747 // Update EventMetaData otherwise basf2 stops.
748 if (err_str == "RUN_PAUSE" || err_str == "RUN_ERROR") {
749 m_eventMetaDataPtr.create();
750 return;
751 }
752#endif
753 print_err.PrintError((char*)err_str.c_str(), __FILE__, __PRETTY_FUNCTION__, __LINE__);
754 exit(1);
755 }
756
757 RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
758 raw_datablk->SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(), delete_flag,
759 temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
760 buf_rc = temp_rawdatablk.GetWholeBuffer();
761 }
762 if (buf_rc != NULL) {
763 g_status.copyEventHeader(buf_rc);
764 }
765
766 //
767 // Update EventMetaData
768 //
769 m_eventMetaDataPtr.create();
770 m_eventMetaDataPtr->setExperiment(exp_copper_0);
771 m_eventMetaDataPtr->setRun(run_copper_0);
772 m_eventMetaDataPtr->setSubrun(subrun_copper_0);
773 m_eventMetaDataPtr->setEvent(eve_copper_0);
774
775 setErrorFlag(error_bit_flag, m_eventMetaDataPtr);
776 if (error_bit_flag != 0) {
777 m_eventMetaDataPtr->addErrorFlag(EventMetaData::c_B2LinkEventCRCError);
778 printf("[ERROR] error bit was detected. exp %u run %u eve %u count = %u\n",
779 exp_copper_0, run_copper_0, eve_copper_0, error_bit_flag);
780 }
781
782
783 //
784 // Run stop via NSM (Already obsolete. Need to ask Konno-san about implementation)
785 //
786 // if (m_shmflag != 0) {
787 // if (n_basf2evt % 10 == 0) {
788 // if (g_status.isStopped()) {
789 // printf("[DEBUG] [INFO] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n", max_nevt , max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
790 // m_eventMetaDataPtr->setEndOfData();
791 // }
792 // }
793 // }
794
795 //
796 // Monitor
797 //
798 if (max_nevt >= 0 || max_seconds >= 0.) {
799 if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC >= max_nevt && max_nevt > 0)
800 || (getTimeSec() - m_start_time > max_seconds && max_seconds > 0.)) {
801 printf("[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
802 max_nevt, max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
803 m_eventMetaDataPtr->setEndOfData();
804 }
805 }
806
807 if (n_basf2evt % 20000 == 0 || n_basf2evt < 10) {
808 RateMonitor(eve_copper_0, subrun_copper_0, run_copper_0);
809 }
810 n_basf2evt++;
811 if (g_status.isAvailable()) {
812 g_status.setInputNBytes(m_totbytes);
813 g_status.setInputCount(n_basf2evt);
814 }
815
816 return;
817}
818
819
A class definition of an input module for Sequential ROOT I/O.
Definition: DeSerializer.h:36
void RateMonitor(unsigned int nevt, int subrun=-1, int run=-1)
monitor rate
unsigned int m_prev_exprunsubrun_no
run no.
Definition: DeSerializer.h:173
std::string m_nodename
Node name.
Definition: DeSerializer.h:161
int BUF_SIZE_WORD
size of buffer for one event (word)
Definition: DeSerializer.h:83
static RunInfoBuffer g_status
buffer class to communicate with NSM client
Definition: DeSerializer.h:200
int n_basf2evt
No. of sent events.
Definition: DeSerializer.h:86
CprErrorMessage print_err
wrapper for B2LOG system
Definition: DeSerializer.h:213
virtual void printData(int *buf, int nwords)
dump error data
virtual int * getNewBuffer(int nwords, int *delete_flag)
Getbuffer.
std::string m_dump_fname
dump filename
Definition: DeSerializer.h:92
int m_shmflag
Use shared memory.
Definition: DeSerializer.h:185
int * m_bufary[NUM_PREALLOC_BUF]
buffer
Definition: DeSerializer.h:101
unsigned int m_exprunsubrun_no
run no.
Definition: DeSerializer.h:170
virtual void printASCIIData(int *buf, int nwords)
dump error data
virtual void openOutputFile()
Module functions to be called from event process.
double max_seconds
time to stop a run
Definition: DeSerializer.h:77
int m_nodeid
Node(PC or COPPER) ID.
Definition: DeSerializer.h:158
double getTimeSec()
store time info.
std::vector< int > m_socket
Receiver socket.
virtual int * recvData(int *delete_flag, int *total_m_size_word, int *num_events_in_sendblock, int *num_nodes_in_sendblock)
receive data
void initialize() override
Module functions to be called from main process.
void event() override
Module functions to be called from event process.
std::vector< int > m_port_from
port # to connect data sources
virtual int Connect()
Accept connection.
virtual int recvFD(int fd, char *buf, int data_size_byte, int flag)
receive data
virtual void setRecvdBuffer(RawDataBlock *raw_datablk, int *delete_flag)
attach buffer to RawDataBlock
DeSerializerPCModule()
Constructor / Destructor.
virtual void checkData(RawDataBlock *raw_datablk, unsigned int *exp_copper_0, unsigned int *run_copper_0, unsigned int *subrun_copper_0, unsigned int *eve_copper_0, unsigned int *error_bit_flag)
check data contents
std::vector< std::string > m_hostname_from
hostname of upstream Data Sources
@ c_B2LinkPacketCRCError
Belle2link CRC error is detected in the event.
Definition: EventMetaData.h:44
@ c_B2LinkEventCRCError
HSLB_COPPER CRC error is detected in the event.
Definition: EventMetaData.h:45
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
The Raw COPPER class. This class stores data received by COPPER via Belle2Link. Data from all detector...
Definition: RawCOPPER.h:52
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor)
Definition: RawCOPPER.cc:141
The RawDataBlock class Base class for rawdata handling.
Definition: RawDataBlock.h:27
virtual void PrintData(int *buf, int nwords)
print data
Definition: RawDataBlock.h:122
virtual int * GetBuffer(int n)
get nth buffer pointer
Definition: RawDataBlock.h:53
virtual int GetBufferPos(int n)
get position of data block in word
Definition: RawDataBlock.h:46
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
Definition: RawDataBlock.h:101
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
Definition: RawDataBlock.h:67
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor)
Definition: RawDataBlock.cc:35
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
Definition: RawDataBlock.h:74
virtual int CheckTLUID(int n)
get TLU ID to check whether this data block is TLU data or not
Definition: RawDataBlock.h:108
virtual int GetBlockNwords(int n)
get size of a data block
Definition: RawDataBlock.h:94
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
Definition: RawDataBlock.h:60
virtual int GetNumEvents()
get # of events in m_buffer
Definition: RawDataBlock.h:81
virtual int TotalBufNwords()
Get total length of m_buffer.
Definition: RawDataBlock.h:39
The Raw FTSW class.
Definition: RawFTSW.h:30
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
Definition: RawFTSW.h:72
unsigned int GetExpRunSubrun(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
Definition: RawFTSW.h:130
unsigned int GetEveNo(int n)
Get event #.
Definition: RawFTSW.h:65
unsigned int GetTTUtime(int n)
get unixtime of the trigger
Definition: RawFTSW.h:79
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor)
Definition: RawFTSW.cc:107
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check the data contents
Definition: RawFTSW.h:121
The Raw TLU class Class for data from DAQ PC for TLU(Trigger Logic Unit) It is supposed to be used on...
Definition: RawTLU.h:27
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
Definition: RawTLU.h:81
unsigned int GetEveNo(int n)
Get Event #.
Definition: RawTLU.h:53
void SetBuffer(int *hdr)
set buffer
Definition: SendHeader.cc:37
int GetNumEventsinPacket()
get contents of Header
Definition: SendHeader.cc:125
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
Definition: RawCOPPER.h:608
int GetExpNo(int n)
get Experimental # from header
Definition: RawCOPPER.h:365
unsigned int GetEveNo(int n)
get event #
Definition: RawCOPPER.h:390
unsigned int GetTTUtime(int n)
get unixtime of the trigger
Definition: RawCOPPER.h:614
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check data contents
Definition: RawCOPPER.h:699
int GetSubRunNo(int n)
get subrun # (8bit)
Definition: RawCOPPER.h:384
unsigned int GetErrorBitFlag(int n)
get contents of header
Definition: RawCOPPER.h:416
int GetRunNo(int n)
get run # (14bit)
Definition: RawCOPPER.h:377
Abstract base class for different kinds of events.
STL namespace.