Belle II Software prerelease-10-00-00a
DeSerializerPrePC.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <daq/rawdata/modules/DeSerializerPrePC.h>
10#include <daq/dataobjects/SendHeader.h>
11#include <daq/dataobjects/SendTrailer.h>
12#include <rawdata/dataobjects/RawTLU.h>
13
14#include <netinet/tcp.h>
15
16//#define MAXEVTSIZE 400000000
17#define CHECKEVT 5000
18
19//#define DEBUG
20//#define NO_DATA_CHECK
21//#define DUMHSLB
22
23using namespace std;
24using namespace Belle2;
25
26//-----------------------------------------------------------------
27// Register the Module
28//-----------------------------------------------------------------
29REG_MODULE(DeSerializerPrePC);
30
31//-----------------------------------------------------------------
32// Implementation
33//-----------------------------------------------------------------
34
// Constructor body of DeSerializerPrePCModule: sets the module description and
// registers the steering parameters NumConn, HostNameFrom and PortFrom.
// NOTE(review): the line carrying the constructor signature (listing line 35) is
// absent from this listing; only the body is visible here.
36{
37 //Set module properties
// NOTE(review): the description text talks about encoding into a RingBuffer, while the
// visible code of this module receives data from sockets - confirm the wording.
38 setDescription("Encode DataStore into RingBuffer");
39 // setPropertyFlags(c_Input | c_ParallelProcessingCertified);
// "NumConn" is registered with a default of 0; no default value is passed for the two
// parameters below. Connect() indexes m_hostname_from and m_port_from with 0..NumConn-1.
40 addParam("NumConn", m_num_connections, "Number of Connections", 0);
41 addParam("HostNameFrom", m_hostname_from, "Hostnames of data sources");
42 addParam("PortFrom", m_port_from, "port numbers of data sources");
43
44 B2INFO("DeSerializerPrePC: Constructor done.");
45}
46
47
48
49DeSerializerPrePCModule::~DeSerializerPrePCModule()
50{
51
52}
53
54
// Body of initialize(): allocates and zeroes the receive buffers, registers the
// DataStore objects, clears the time-monitor arrays and resets the event bookkeeping.
// NOTE(review): the signature line (listing line 55) and listing lines 81 and 96 are
// absent from this listing, so the bodies of the dump-file branch and of the
// shared-memory "else" branch are only partially visible.
56{
57 B2INFO("DeSerializerPrePC: initialize() started.");
58
59 // allocate buffer
// Each of the NUM_PREALLOC_BUF entries and m_buffer holds BUF_SIZE_WORD ints.
60 for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
61 m_bufary[i] = new int[ BUF_SIZE_WORD ];
62 }
63 m_buffer = new int[ BUF_SIZE_WORD ];
64
65
66 // initialize buffer
// Only the m_bufary entries are zeroed; m_buffer is left uninitialized here.
67 for (int i = 0 ; i < NUM_PREALLOC_BUF; i++) {
68 memset(m_bufary[i], 0, BUF_SIZE_WORD * sizeof(int));
69 }
70
71 // Initialize EvtMetaData
72 m_eventMetaDataPtr.registerInDataStore();
73
// Register the raw-data arrays; event() in this file appends to raw_datablkarray.
74 raw_datablkarray.registerInDataStore();
75 rawcprarray.registerInDataStore();
76 raw_ftswarray.registerInDataStore();
77
78 // Initialize Array of RawCOPPER
79
// Taken only when a dump file name was configured (statement on listing line 81 not visible).
80 if (m_dump_fname.size() > 0) {
82 }
83
84 // Initialize arrays for time monitor
85 memset(time_array0, 0, sizeof(time_array0));
86 memset(time_array1, 0, sizeof(time_array1));
87 memset(time_array2, 0, sizeof(time_array2));
88
89 clearNumUsedBuf();
90
91 // Shared memory
// Shared-memory reporting is switched off when no node name or a negative node id is set.
92 if (m_shmflag > 0) {
93 if (m_nodename.size() == 0 || m_nodeid < 0) {
94 m_shmflag = 0;
95 } else {
97 g_status.reportReady();
98 }
99 }
100
101 event_diff = 0;
102
// NOTE(review): 0xFFFFFFFF presumably marks "no previous event / counter yet" for the
// CheckData() calls in checkData() - confirm against RawCOPPER/RawFTSW.
103 m_prev_copper_ctr = 0xFFFFFFFF;
104 m_prev_evenum = 0xFFFFFFFF;
105
106 B2INFO("DeSerializerPrePC: initialize() done.");
107}
108
109
110int DeSerializerPrePCModule::recvFD(int sock, char* buf, int data_size_byte, int flag)
111{
112 int n = 0;
113 while (1) {
114 int read_size = 0;
115 if ((read_size = recv(sock, (char*)buf + n, data_size_byte - n, flag)) < 0) {
116 if (errno == EINTR) {
117 continue;
118 } else {
119 char err_buf[500];
120 sprintf(err_buf, "[FATAL] Failed to receive data(%s). Exiting...", strerror(errno));
121 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
122 sleep(1234567);
123 exit(-1);
124 }
125 } else {
126 n += read_size;
127 if (n == data_size_byte)break;
128 }
129 }
130 return n;
131}
132
// Body of Connect(): for each of the m_num_connections configured data sources,
// resolves the host name, opens a TCP socket, retries connect() every 0.5 s until it
// succeeds, stores the descriptor in m_socket and queries a few socket options.
// Returns 0 after all connections are made; a host name that cannot be resolved is fatal.
// NOTE(review): the signature line (listing line 133) is absent from this listing.
134{
135 for (int i = 0; i < m_num_connections; i++) {
136 //
137 // Connect to a downstream node
138 //
139 struct sockaddr_in socPC;
140 socPC.sin_family = AF_INET;
141
142 struct hostent* host;
143 host = gethostbyname(m_hostname_from[ i ].c_str());
144 if (host == NULL) {
// NOTE(review): the fixed text of the format below is roughly 70 characters, so a
// 100-byte buffer leaves little room for the host name plus the strerror() text and
// sprintf() does not bound the write - consider a larger buffer and snprintf().
// NOTE(review): gethostbyname() is documented to report failures through h_errno
// rather than errno, so strerror(errno) may not describe the failure - confirm.
145 char err_buf[100];
146 sprintf(err_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...", m_hostname_from[ i ].c_str(),
147 strerror(errno));
148 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
149 sleep(1234567);
150 exit(1);
151 }
// Only the first resolved address is used, read as a 4-byte IPv4 address.
152 socPC.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
153 socPC.sin_port = htons(m_port_from[ i ]);
// NOTE(review): the return value of socket() is not checked before the descriptor is used.
154 int sd = socket(PF_INET, SOCK_STREAM, 0);
155
// TCP_NODELAY is set with value 0 here; the return value of setsockopt() is ignored.
156 int val1 = 0;
157 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &val1, sizeof(val1));
158
159 printf("[DEBUG] Connecting to %s port %d ...\n", m_hostname_from[ i ].c_str(), m_port_from[ i ]);
// Retry forever, sleeping 0.5 s between attempts, until the connection is established.
160 while (1) {
161 if (connect(sd, (struct sockaddr*)(&socPC), sizeof(socPC)) < 0) {
162 perror("Failed to connect. Retrying...");
163 usleep(500000);
164 } else {
165 printf("[DEBUG] Done\n");
166 break;
167 }
168 }
169 m_socket.push_back(sd);
170
171 // check socket parameters
// The values read below are only printed when DEBUG is defined.
// NOTE(review): len is an int passed through a (socklen_t*) cast - confirm both types
// have the same size on the target platform.
172 int val, len;
173 len = sizeof(val);
174 getsockopt(m_socket[ i ], SOL_SOCKET, SO_RCVBUF, &val, (socklen_t*)&len);
175#ifdef DEBUG
176 printf("[DEBUG] SO_RCVBUF %d\n", val);
177#endif
178 getsockopt(m_socket[ i ], SOL_SOCKET, SO_SNDBUF, &val, (socklen_t*)&len);
179#ifdef DEBUG
180 printf("[DEBUG] SO_SNDBUF %d\n", val);
181#endif
182 getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_MAXSEG, &val, (socklen_t*)&len);
183#ifdef DEBUG
184 printf("[DEBUG] TCP_MAXSEG %d\n", val);
185#endif
186 getsockopt(m_socket[ i ], IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t*)&len);
187#ifdef DEBUG
188 printf("[DEBUG] TCP_NODELAY %d\n", val);
189#endif
// Publish the local port/address of this socket to g_status when it is available.
// With several connections, each iteration overwrites the values set by the previous one.
190 if (g_status.isAvailable()) {
191 sockaddr_in sa;
192 memset(&sa, 0, sizeof(sockaddr_in));
193 socklen_t sa_len = sizeof(sa);
194 if (getsockname(m_socket[i], (struct sockaddr*)&sa, (socklen_t*)&sa_len) == 0) {
195 g_status.setInputPort(ntohs(sa.sin_port));
196 g_status.setInputAddress(sa.sin_addr.s_addr);
197 }
198 }
199
200 }
201 printf("[DEBUG] Initialization finished\n");
202 return 0;
203}
204
205
206
207int* DeSerializerPrePCModule::recvData(int* delete_flag, int* total_buf_nwords, int* num_events_in_sendblock,
208 int* num_nodes_in_sendblock)
209{
210
211 int* temp_buf = NULL; // buffer for data-body
212 int flag = 0;
213
214 vector <int> each_buf_nwords;
215 each_buf_nwords.clear();
216 vector <int> each_buf_nodes;
217 each_buf_nodes.clear();
218 vector <int> each_buf_events;
219 each_buf_events.clear();
220
221 *total_buf_nwords = 0;
222 *num_nodes_in_sendblock = 0;
223 *num_events_in_sendblock = 0;
224
225 //
226 // Read Header and obtain data size
227 //
228 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
229
230 // Read header
231 for (int i = 0; i < (int)(m_socket.size()); i++) {
232
233 recvFD(m_socket[ i ], (char*)send_hdr_buf, sizeof(int)*SendHeader::SENDHDR_NWORDS, flag);
234
235 SendHeader send_hdr;
236 send_hdr.SetBuffer(send_hdr_buf);
237
238 int temp_num_events = send_hdr.GetNumEventsinPacket();
239 int temp_num_nodes = send_hdr.GetNumNodesinPacket();
240
241
242
243 if (i == 0) {
244 *num_events_in_sendblock = temp_num_events;
245 } else if (*num_events_in_sendblock != temp_num_events) {
246#ifndef NO_DATA_CHECK
247 char err_buf[500];
248 sprintf(err_buf,
249 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
250 *num_events_in_sendblock, temp_num_events, i, *num_nodes_in_sendblock, temp_num_nodes, i);
251 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
252 sleep(1234567);
253 exit(1);
254#endif
255 }
256
257 *num_nodes_in_sendblock += temp_num_nodes;
258
259 int rawblk_nwords = send_hdr.GetTotalNwords()
260 - SendHeader::SENDHDR_NWORDS
261 - SendTrailer::SENDTRL_NWORDS;
262 *total_buf_nwords += rawblk_nwords;
263
264 //
265 // Data size check1
266 //
267 if (rawblk_nwords > (int)(2.5e6) || rawblk_nwords <= 0) {
268 printf("[DEBUG] *******HDR**********\n");
269 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
270 char err_buf[500];
271 sprintf(err_buf, "[FATAL] CORRUPTED DATA: Too large event : Header %d %d %d %d\n", i, temp_num_events, temp_num_nodes,
272 send_hdr.GetTotalNwords());
273 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
274 sleep(123456);
275 exit(1);
276
277 }
278
279 each_buf_nwords.push_back(rawblk_nwords);
280 each_buf_events.push_back(temp_num_events);
281 each_buf_nodes.push_back(temp_num_nodes);
282
283 }
284
285
286 temp_buf = getNewBuffer(*total_buf_nwords, delete_flag); // this include only data body
287 //
288 // Read body
289 //
290 int total_recvd_byte = 0;
291 for (int i = 0; i < (int)(m_socket.size()); i++) {
292 total_recvd_byte += recvFD(m_socket[ i ], (char*)temp_buf + total_recvd_byte,
293 each_buf_nwords[ i ] * sizeof(int), flag);
294
295 //
296 // Data length check
297 //
298 unsigned temp_length = 0;
299 for (int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
300 int this_length = *((int*)((char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] * sizeof(int) + temp_length));
301 temp_length += this_length * sizeof(int);
302 }
303 if (temp_length != each_buf_nwords[ i ] * sizeof(int)) {
304 printf("[DEBUG]*******SENDHDR*********** \n");
305 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
306 printf("[DEBUG]*******BODY***********\n ");
307 printData(temp_buf, (int)(total_recvd_byte / sizeof(int)));
308 char err_buf[500];
309 sprintf(err_buf, "[FATAL] CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %u. Exting...",
310 (int)(*total_buf_nwords * sizeof(int)), temp_length);
311 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
312 sleep(1234567);
313 exit(-1);
314 }
315
316 }
317
318 if ((int)(*total_buf_nwords * sizeof(int)) != total_recvd_byte) {
319 char err_buf[500];
320 sprintf(err_buf, "[FATAL] CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
321 total_recvd_byte, (int)(*total_buf_nwords * sizeof(int)));
322 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
323 sleep(1234567);
324 exit(-1);
325 }
326
327 // Read Traeiler
328 int send_trl_buf[(unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
329 for (int i = 0; i < (int)(m_socket.size()); i++) {
330 recvFD(m_socket[ i ], (char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS * sizeof(int), flag);
331 }
332
333 return temp_buf;
334}
335
336
337void DeSerializerPrePCModule::setRecvdBuffer(RawDataBlock* temp_raw_datablk, int* delete_flag)
338{
339 //
340 // Get data from socket
341 //
342 int total_buf_nwords = 0 ;
343 int num_events_in_sendblock = 0;
344 int num_nodes_in_sendblock = 0;
345
346 if (m_start_flag == 0) B2INFO("DeSerializerPrePC: Reading the 1st packet from eb0...");
347 int* temp_buf = recvData(delete_flag, &total_buf_nwords, &num_events_in_sendblock,
348 &num_nodes_in_sendblock);
349 if (m_start_flag == 0) {
350 B2INFO("DeSerializerPrePC: Done. the size of the 1st packet " << total_buf_nwords << " words");
351 m_start_flag = 1;
352 }
353 m_totbytes += total_buf_nwords * sizeof(int);
354
355 temp_raw_datablk->SetBuffer((int*)temp_buf, total_buf_nwords, *delete_flag,
356 num_events_in_sendblock, num_nodes_in_sendblock);
357
358 //
359 // check even # and node # in one Sendblock
360 //
361 int num_entries = temp_raw_datablk->GetNumEntries();
362 if (num_entries != num_events_in_sendblock * num_nodes_in_sendblock) {
363 char err_buf[500];
364 sprintf(err_buf,
365 "[FATAL] CORRUPTED DATA: Inconsistent SendHeader value. # of nodes(%d) times # of events(%d) differs from # of entries(%d). Exiting...",
366 num_nodes_in_sendblock, num_events_in_sendblock, num_entries);
367 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
368 sleep(1234567);
369 exit(-1);
370 }
371 return;
372
373}
374
375
376
// Check the contents of one RawDataBlock.
// For every event k and node l, the entry is wrapped in a RawFTSW, RawTLU or RawCOPPER
// object (chosen by CheckFTSWID / CheckTLUID) and its CheckData() is called; afterwards
// event number, utime and ctime/trigger-type are compared across the nodes of the event.
// A failed check is reported through print_err and the process exits.
// raw_datablk  : block to check.
// eve_copper_0 : receives word 3 of the first COPPER entry seen in this call.
// NOTE(review): listing lines 485 and 540 are absent from this listing, so the
// RawCOPPER CheckData() call and the end of the event loop are only partially visible.
377void DeSerializerPrePCModule::checkData(RawDataBlock* raw_datablk, unsigned int* eve_copper_0)
378{
379 //
380 // Data check
381 //
382 int* temp_buf = raw_datablk->GetBuffer(0);
// cpr_num is initialized once per call and never reset inside the event loop.
383 int cpr_num = 0;
384 unsigned int cur_evenum = 0, cur_copper_ctr = 0;
385 unsigned int eve_array[32]; // # of nodes is less than 17
386 unsigned int utime_array[32];// # of nodes is less than 17
387 unsigned int ctime_type_array[32];// # of nodes is less than 17
388
389#ifdef DUMHSLB
390 unsigned int exp_run_ftsw = 0, ctime_trgtype_ftsw = 0, utime_ftsw = 0;
391#endif
392
393 for (int k = 0; k < raw_datablk->GetNumEvents(); k++) {
// The per-node arrays are cleared at the start of every event.
394 memset(eve_array, 0, sizeof(eve_array));
395 memset(utime_array, 0, sizeof(utime_array));
396 memset(ctime_type_array, 0, sizeof(ctime_type_array));
397
398 int num_nodes_in_sendblock = raw_datablk->GetNumNodes();
399 for (int l = 0; l < num_nodes_in_sendblock; l++) {
// NOTE(review): entry_id (l + k * num_nodes) is used below as the index into the
// 32-element arrays, while the comparison loop after this one reads indices
// 0..num_nodes-1. For k >= 1 the values therefore land at indices >= num_nodes (and
// past the end of the arrays once entry_id >= 32), and the comparison only sees the
// zeros written by memset. Indexing the arrays with l looks intended - confirm.
400 int entry_id = l + k * num_nodes_in_sendblock;
401
402 //
403 // RawFTSW
404 //
405 if (raw_datablk->CheckFTSWID(entry_id)) {
406 RawFTSW* temp_rawftsw = new RawFTSW;
407 int block_id = 0;
408 temp_rawftsw->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
409 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
// Dump the entry for the first few event numbers (event number below 10).
410 if (temp_rawftsw->GetEveNo(block_id) < 10) {
411 printf("[DEBUG] ######FTSW#########\n");
412 printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
413 }
414
415#ifdef DUMHSLB
416 exp_run_ftsw = temp_rawftsw->GetExpRunSubrun(block_id);
417 ctime_trgtype_ftsw = temp_rawftsw->GetTTCtimeTRGType(block_id);
418 utime_ftsw = temp_rawftsw->GetTTUtime(block_id);
419#endif
420
421
422#ifndef NO_DATA_CHECK
// CheckData() signals a failure by throwing a std::string, which is reported before exiting.
423 try {
424 temp_rawftsw->CheckData(0, m_prev_evenum, &cur_evenum, m_prev_exprunsubrun_no, &m_exprunsubrun_no);
425 eve_array[ entry_id ] = cur_evenum;
426 } catch (const string& err_str) {
427 print_err.PrintError(m_shmflag, &g_status, err_str);
428 exit(1);
429 }
430#endif
431 utime_array[ entry_id ] = temp_rawftsw->GetTTUtime(block_id);
432 ctime_type_array[ entry_id ] = temp_rawftsw->GetTTCtimeTRGType(block_id);
433 delete temp_rawftsw;
434
435 //
436 // RawTLU
437 //
438 } else if (raw_datablk->CheckTLUID(entry_id)) {
439
440 RawTLU* temp_rawtlu = new RawTLU;
441 temp_rawtlu->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
442 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
443 if (temp_rawtlu->GetEveNo(0) < 10
444 ) {
445 printf("[DEBUG] ######TLU#########\n");
446 printData((int*)temp_buf + raw_datablk->GetBufferPos(entry_id), raw_datablk->GetBlockNwords(entry_id));
447 }
448
449#ifndef NO_DATA_CHECK
450 try {
451 temp_rawtlu->CheckData(0, m_prev_evenum, &cur_evenum);
452 eve_array[ entry_id ] = cur_evenum;
453 } catch (const string& err_str) {
454 print_err.PrintError(m_shmflag, &g_status, err_str);
455 exit(1);
456 }
457#endif
// For TLU entries no utime / ctime values are stored, so the memset zeros remain.
458 delete temp_rawtlu;
459 } else {
460
461
462
463 //
464 // RawCOPPER
465 //
466
467 RawCOPPER* temp_rawcopper = new RawCOPPER;
468 temp_rawcopper->SetBuffer((int*)temp_buf + raw_datablk->GetBufferPos(entry_id),
469 raw_datablk->GetBlockNwords(entry_id), 0, 1, 1);
470
471#ifdef DUMHSLB
// With DUMHSLB the COPPER header words are overwritten with the values taken from the
// most recently processed FTSW entry.
472 int block_id = 0;
473 (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_EXP_RUN_NO ] = exp_run_ftsw;
474 (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_TTCTIME_TRGTYPE ] = ctime_trgtype_ftsw;
475 (temp_rawcopper->GetBuffer(block_id))[ RawHeader_latest::POS_TTUTIME ] = utime_ftsw;
476#endif
477
478
479
480#ifndef NO_DATA_CHECK
// NOTE(review): the last argument line of the CheckData() call below (listing line 485)
// is not visible in this listing.
481 try {
482
483 temp_rawcopper->CheckData(0, m_prev_evenum, &cur_evenum,
484 m_prev_copper_ctr, &cur_copper_ctr,
486 eve_array[ entry_id ] = cur_evenum;
487 } catch (const string& err_str) {
488 print_err.PrintError(m_shmflag, &g_status, err_str);
489 exit(1);
490 }
491#endif
492
493 utime_array[ entry_id ] = temp_rawcopper->GetTTUtime(0);
494 ctime_type_array[ entry_id ] = temp_rawcopper->GetTTCtimeTRGType(0);
495
// The GetBlockNwords() results below are discarded; only *eve_copper_0 is stored, and
// only for the first COPPER entry of the call because cpr_num is never reset.
496 if (cpr_num == 0) {
497 raw_datablk->GetBlockNwords(entry_id); // returns data_size_copper_0
498 *eve_copper_0 = (raw_datablk->GetBuffer(entry_id))[ 3 ];
499 } else if (cpr_num == 1) {
500 raw_datablk->GetBlockNwords(entry_id); // returns data_size_copper_1
501 }
502 cpr_num++;
503 delete temp_rawcopper;
504
505
506 }
507
508 }
509
510#ifndef NO_DATA_CHECK
511 // event #, ctime, utime over nodes
// Every node is compared against node 0; on a mismatch all nodes are printed first.
512 for (int l = 1; l < num_nodes_in_sendblock; l++) {
513 if (eve_array[ 0 ] != eve_array[ l ] ||
514 utime_array[ 0 ] != utime_array[ l ] ||
515 ctime_type_array[ 0 ] != ctime_type_array[ l ]) {
516 char err_buf[500];
517 for (int m = 0; m < num_nodes_in_sendblock; m++) {
518 printf("[DEBUG] node %d eve # %u utime %x ctime %x\n",
519 m, eve_array[ m ], utime_array[ m ], ctime_type_array[ m ]);
520 }
521 sprintf(err_buf, "[FATAL] CORRUPTED DATA: Event or Time record mismatch. Exiting...");
522 print_err.PrintError(m_shmflag, &g_status, err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
523 sleep(1234567);
524 exit(-1);
525 }
526 }
527#endif
528
529 // Event # monitor in runchange
530// if (m_prev_runsubrun_no != m_runsubrun_no) {
531// printf("[DEBUG] ##############################################\n");
532// for (int m = 0; m < raw_datablk->GetNumEntries(); m++) {
533// printf("[DEBUG] %d eve %u prev %u\n", m, eve_array[ m ], m_prev_evenum);
534// }
535// printf("[DEBUG] ##############################################\n");
536// fflush(stderr);
537// }
// Remember the values of this event as the "previous" ones for the next event.
538 m_prev_evenum = cur_evenum;
539 m_prev_copper_ctr = cur_copper_ctr;
541 }
542 return;
543}
544
545
546
// Body of event(): on the first call connects to the data sources; then receives,
// checks and stores NUM_EVT_PER_BASF2LOOP_PC RawDataBlocks, fills the EventMetaData,
// evaluates the run-stop conditions and updates the monitoring counters.
// NOTE(review): the signature line (listing line 547) and listing line 588 are absent
// from this listing.
548{
549 // For data check
550
551 unsigned int eve_copper_0 = 0;
552
553 clearNumUsedBuf();
554
// m_start_flag is still 0 here on the first call; setRecvdBuffer() sets it to 1 after
// the first packet has been received.
555 if (m_start_flag == 0) {
556 //
557 // Connect to eb0: This should be here because we want Serializer first to accept connection from eb1tx
558 //
559 Connect();
560 if (g_status.isAvailable()) {
561 B2INFO("DeSerializerPrePC: Waiting for Start...\n");
562 g_status.reportRunning();
563 }
564 m_start_time = getTimeSec();
565 n_basf2evt = 0;
566 }
567
568 // Make rawdatablk array
569
570
571 //
572 // Main loop
573 //
574 for (int j = 0; j < NUM_EVT_PER_BASF2LOOP_PC; j++) {
575
576 //
577 // Receive data from COPPER
578 //
579 eve_copper_0 = 0;
580 int delete_flag_from = 0, delete_flag_to = 0;
581 RawDataBlock temp_rawdatablk;
582 setRecvdBuffer(&temp_rawdatablk, &delete_flag_from);
583 // temp_rawdatablk.PrintData( temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords() );
584 checkData(&temp_rawdatablk, &eve_copper_0);
585 // PreRawCOPPERFormat_latest pre_rawcopper_latest;
586 // pre_rawcopper_latest.SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
587 // 0, temp_rawdatablk.GetNumEvents(), temp_rawdatablk.GetNumNodes());
589
590
// Values handed to the RawDataBlock that is stored in the DataStore below.
591 int temp_num_events, temp_num_nodes;
592 int temp_nwords_to;
593 int* buf_to = NULL;
594#ifdef REDUCED_RAWCOPPER
595 //
596 // Copy reduced buffer
597 //
598 // int* buf_to = getNewBuffer(m_pre_rawcpr.CalcReducedDataSize(&temp_rawdatablk), &delete_flag_to); // basf2-dependent style
599 int* temp_bufin = temp_rawdatablk.GetWholeBuffer();
600 int temp_nwords_from = temp_rawdatablk.TotalBufNwords();
601 temp_num_events = temp_rawdatablk.GetNumEvents();
602 temp_num_nodes = temp_rawdatablk.GetNumNodes();
603 int calced_temp_nwords_to = m_pre_rawcpr.CalcReducedDataSize(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes);
604 buf_to = getNewBuffer(calced_temp_nwords_to, &delete_flag_to);
605
606 // m_pre_rawcpr.CopyReducedData(&temp_rawdatablk, buf_to, delete_flag_from); // basf2-dependent style
607 m_pre_rawcpr.CopyReducedData(temp_bufin, temp_nwords_from, temp_num_events, temp_num_nodes, buf_to, &temp_nwords_to);
// The size predicted by CalcReducedDataSize() must equal the size actually written.
608 if (calced_temp_nwords_to != temp_nwords_to) {
609 char err_buf[500];
610 sprintf(err_buf,
611 "[FATAL] CORRUPTED DATA: Estimations of reduced event size are inconsistent. CalcReducedDataSize = %d. CopyReducedData = %d. Exiting...",
612 calced_temp_nwords_to, temp_nwords_to);
613 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
614 exit(1);
615 }
616
617#else
// Without data reduction the received buffer itself is passed on.
// NOTE(review): temp_rawdatablk was already given the delete flag by value inside
// setRecvdBuffer(), so clearing the local delete_flag_from here does not change what
// temp_rawdatablk was told - confirm how RawDataBlock avoids freeing the buffer twice.
618 buf_to = temp_rawdatablk.GetWholeBuffer();
619 temp_nwords_to = temp_rawdatablk.TotalBufNwords();
620 temp_num_events = temp_rawdatablk.GetNumEvents();
621 temp_num_nodes = temp_rawdatablk.GetNumNodes();
622 delete_flag_to = delete_flag_from;
623 delete_flag_from = 0; // to avoid double delete
624#endif
625
626
627 //
628 // Set buffer to the RawData class stored in DataStore
629 //
630 RawDataBlock* raw_datablk = raw_datablkarray.appendNew();
631// raw_datablk->SetBuffer((int*)temp_rawdatablk.GetWholeBuffer(), temp_rawdatablk.TotalBufNwords(),
632// delete_flag_to, temp_rawdatablk.GetNumEvents(),
633// temp_rawdatablk.GetNumNodes());
634 raw_datablk->SetBuffer(buf_to, temp_nwords_to, delete_flag_to,
635 temp_num_events, temp_num_nodes);
636
637
638 //
639 // CRC16 check after data reduction
640 //
641#ifdef REDUCED_RAWCOPPER
642 PostRawCOPPERFormat_latest post_rawcopper_latest;
643 post_rawcopper_latest.SetBuffer(buf_to, temp_nwords_to,
644 0, temp_num_events, temp_num_nodes);
645
// Only block 0 is checked, for each of the four FINESSE slots that contain data.
646 for (int i_finesse_num = 0; i_finesse_num < 4; i_finesse_num ++) {
647 int block_num = 0;
648 if (post_rawcopper_latest.GetFINESSENwords(block_num, i_finesse_num) > 0) {
649 post_rawcopper_latest.CheckCRC16(block_num, i_finesse_num);
650 }
651 }
652
653#endif
654
655 }
656
657
658 //
659 // Update EventMetaData
660 //
// Experiment and run are fixed to 1 here; the event number is the call counter n_basf2evt.
661 m_eventMetaDataPtr.create();
662 m_eventMetaDataPtr->setExperiment(1);
663 m_eventMetaDataPtr->setRun(1);
664 m_eventMetaDataPtr->setEvent(n_basf2evt);
665
666
667
668
669 //
670 // Run stop via NSM (Already obsolete. Need to ask Konno-san about implementation)
671 //
672 // if (m_shmflag != 0) {
673 // if (n_basf2evt % 10 == 0) {
674 // if (g_status.isStopped()) {
675 // printf("[DEBUG] [INFO] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n", max_nevt , max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
676 // m_eventMetaDataPtr->setEndOfData();
677 // }
678 // }
679 // }
680
681 //
682 // Monitor
683 //
// End of data is flagged once the processed-event count reaches max_nevt or the elapsed
// time exceeds max_seconds; each limit only applies when it is greater than zero.
684 if (max_nevt >= 0 || max_seconds >= 0.) {
685 if ((n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC >= max_nevt && max_nevt > 0)
686 || (getTimeSec() - m_start_time > max_seconds && max_seconds > 0.)) {
687 printf("[DEBUG] RunStop was detected. ( Setting: Max event # %d MaxTime %lf ) Processed Event %d Elapsed Time %lf[s]\n",
688 max_nevt, max_seconds, n_basf2evt * NUM_EVT_PER_BASF2LOOP_PC, getTimeSec() - m_start_time);
689 m_eventMetaDataPtr->setEndOfData();
690 }
691 }
692
// Rate monitoring runs for the first 10 calls and then on every 20000th call.
693 if (n_basf2evt % 20000 == 0 || n_basf2evt < 10) {
694 RateMonitor(eve_copper_0);
695 }
696
697
698
699 if (g_status.isAvailable()) {
700 g_status.setInputNBytes(m_totbytes);
701 g_status.setInputCount(n_basf2evt);
702 }
703
704 n_basf2evt++;
705 return;
706}
void RateMonitor(unsigned int nevt, int subrun=-1, int run=-1)
monitor rate
unsigned int m_prev_exprunsubrun_no
run no.
std::string m_nodename
Node name.
int BUF_SIZE_WORD
size of buffer for one event (word)
static RunInfoBuffer g_status
buffer class to communicate with NSM client
int n_basf2evt
No. of sent events.
DeSerializerModule()
Constructor / Destructor.
CprErrorMessage print_err
wrapper for B2LOG system
virtual void printData(int *buf, int nwords)
dump error data
virtual int * getNewBuffer(int nwords, int *delete_flag)
Getbuffer.
std::string m_dump_fname
dump filename
int m_shmflag
Use shared memory.
int * m_bufary[NUM_PREALLOC_BUF]
buffer
unsigned int m_exprunsubrun_no
run no.
virtual void openOutputFile()
Module functions to be called from event process.
double max_seconds
time to stop a run
int m_nodeid
Node(PC or COPPER) ID.
double getTimeSec()
store time info.
std::vector< int > m_socket
Receiver socket.
virtual int * recvData(int *delete_flag, int *total_m_size_word, int *num_events_in_sendblock, int *num_nodes_in_sendblock)
receive data
void initialize() override
Module functions to be called from main process.
void event() override
Module functions to be called from event process.
virtual void checkData(RawDataBlock *raw_datablk, unsigned int *eve_copper_0)
check data contents
std::vector< int > m_port_from
port # to connect data sources
virtual int Connect()
Connect to the data sources.
virtual int recvFD(int fd, char *buf, int data_size_byte, int flag)
receive data
DeSerializerPrePCModule()
Constructor / Destructor.
virtual void setRecvdBuffer(RawDataBlock *raw_datablk, int *delete_flag)
attach buffer to RawDataBlock
std::vector< std::string > m_hostname_from
hostname of upstream Data Sources
void setDescription(const std::string &description)
Sets the description of the module.
Definition Module.cc:214
int CheckCRC16(int n, int finesse_num)
check magic words
The Raw COPPER class This class stores data received by COPPER via belle2linkt Data from all detector...
Definition RawCOPPER.h:52
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freed( = 0 )/ not freed( = 1 ) in Destructor )
Definition RawCOPPER.cc:141
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freed( = 0 )/ not freed( = 1 ) in Destructor )
The RawDataBlock class Base class for rawdata handling.
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual int GetBufferPos(int n)
get position of data block in word
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual int GetNumEntries()
get # of data blocks = (# of nodes)*(# of events)
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freed( = 0 )/ not freed( = 1 ) in Destructor )
virtual int GetNumNodes()
get # of data sources(e.g. # of COPPER boards) in m_buffer
virtual int CheckTLUID(int n)
get TLU ID to check whether this data block is TLU data or not
virtual int GetBlockNwords(int n)
get size of a data block
virtual int * GetWholeBuffer()
get pointer to buffer(m_buffer)
virtual int GetNumEvents()
get # of events in m_buffer
virtual int TotalBufNwords()
Get total length of m_buffer.
The Raw FTSW class.
Definition RawFTSW.h:30
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
Definition RawFTSW.h:72
unsigned int GetExpRunSubrun(int n)
Exp# (10bit) run# (14bit) restart # (8bit)
Definition RawFTSW.h:130
unsigned int GetEveNo(int n)
Get event #.
Definition RawFTSW.h:65
unsigned int GetTTUtime(int n)
get unixtime of the trigger
Definition RawFTSW.h:79
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freed( = 0 )/ not freed( = 1 ) in Destructor )
Definition RawFTSW.cc:107
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check the data contents
Definition RawFTSW.h:121
The Raw TLU class Class for data from DAQ PC for TLU(Trigger Logic Unit) It is supposed to be used on...
Definition RawTLU.h:27
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum)
check data contents
Definition RawTLU.h:81
unsigned int GetEveNo(int n)
Get Event #.
Definition RawTLU.h:53
void SetBuffer(int *hdr)
set buffer
Definition SendHeader.cc:37
int GetNumEventsinPacket()
get contents of Header
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition Module.h:559
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition Module.h:649
unsigned int GetTTCtimeTRGType(int n)
Get a word containing ctime and trigger type info.
Definition RawCOPPER.h:608
unsigned int GetTTUtime(int n)
Get unixtime of the trigger.
Definition RawCOPPER.h:614
void CheckData(int n, unsigned int prev_evenum, unsigned int *cur_evenum, unsigned int prev_copper_ctr, unsigned int *cur_copper_ctr, unsigned int prev_exprunsubrun_no, unsigned int *cur_exprunsubrun_no)
check data contents
Definition RawCOPPER.h:699
int GetFINESSENwords(int n, int finesse_num) OVERRIDE_CPP17
Get the size of a finesse buffer.
Abstract base class for different kinds of events.
STL namespace.