Bug Summary

File:daq/rawdata/modules/src/Serializer.cc
Warning:line 308, column 7
Value stored to 'n' is never read

Annotated Source Code

Press '?' to see keyboard shortcuts

clang -cc1 -cc1 -triple x86_64-unknown-linux-gnu -O3 -analyze -disable-free -clear-ast-before-backend -disable-llvm-verifier -discard-value-names -main-file-name Serializer.cc -analyzer-checker=core -analyzer-checker=apiModeling -analyzer-checker=unix -analyzer-checker=deadcode -analyzer-checker=cplusplus -analyzer-checker=security.insecureAPI.UncheckedReturn -analyzer-checker=security.insecureAPI.getpw -analyzer-checker=security.insecureAPI.gets -analyzer-checker=security.insecureAPI.mktemp -analyzer-checker=security.insecureAPI.mkstemp -analyzer-checker=security.insecureAPI.vfork -analyzer-checker=nullability.NullPassedToNonnull -analyzer-checker=nullability.NullReturnedFromNonnull -analyzer-output plist -w -setup-static-analyzer -mrelocation-model pic -pic-level 2 -fhalf-no-semantic-interposition -mframe-pointer=none -fmath-errno -ffp-contract=on -fno-rounding-math -mconstructor-aliases -funwind-tables=2 -target-cpu x86-64 -tune-cpu generic -debugger-tuning=gdb -fdebug-compilation-dir=/data/b2soft/buildbot/development/build -fcoverage-compilation-dir=/data/b2soft/buildbot/development/build -resource-dir /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/lib/clang/21 -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/include/c++ -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/include/c++/x86_64-redhat-linux -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/include/c++/backward -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/include -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/include/python3.12 -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/include/CLHEP -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/include/Geant4 -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/include -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/include/root -isystem 
/cvmfs/belle.cern.ch/el9/externals/v02-04-00/include/belle_legacy -I include/ -D _PACKAGE_="daq" -D G4UI_USE_TCSH -D RaveDllExport= -D HAS_SQLITE -D HAS_CALLGRIND -I include -I /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/include/libxml2 -internal-isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/bin/../lib64/gcc/x86_64-redhat-linux/15.2.0/../../../../include/c++ -internal-isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/bin/../lib64/gcc/x86_64-redhat-linux/15.2.0/../../../../include/c++/x86_64-redhat-linux -internal-isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/bin/../lib64/gcc/x86_64-redhat-linux/15.2.0/../../../../include/c++/backward -internal-isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/lib/clang/21/include -internal-isystem /usr/local/include -internal-isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/bin/../lib64/gcc/x86_64-redhat-linux/15.2.0/../../../../x86_64-redhat-linux/include -internal-externc-isystem /include -internal-externc-isystem /usr/include -Wno-missing-braces -Wno-unused-command-line-argument -std=c++20 -fdeprecated-macro -ferror-limit 19 -fgnuc-version=4.2.1 -fno-implicit-modules -fskip-odr-check-in-gmf -fcxx-exceptions -fexceptions -vectorize-loops -vectorize-slp -analyzer-output=html -faddrsig -D__GCC_HAVE_DWARF2_CFI_ASM=1 -o /scan_build/2026-05-31-004316-385593-1 -x c++ daq/rawdata/modules/src/Serializer.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/rawdata/modules/DAQConsts.h>
9#include <daq/rawdata/modules/Serializer.h>
10#include <daq/rawdata/modules/DeSerializer.h>
11
12#include <netinet/tcp.h>
13
14#include <sys/uio.h>
15
16#include <csignal>
17#include <fcntl.h>
18
19using namespace std;
20using namespace Belle2;
21
22//#define DEBUG
23
24//-----------------------------------------------------------------
25// Register the Module
26//-----------------------------------------------------------------
// Module registration. As the inlined expansion below shows, the macro defines
// a file-local ModuleProxyBase subclass, registered under the name "Serializer"
// in package "daq", whose createInstance() returns a new SerializerModule.
27REG_MODULE(Serializer)namespace { struct ModuleProxySerializer: public ModuleProxyBase
{ ModuleProxySerializer(): ModuleProxyBase("Serializer", "" "daq"
) {} virtual ::Belle2::Module* createInstance() const override
final { return new SerializerModule(); } } proxySerializerModule
; }
;
28
29//-----------------------------------------------------------------
30// Implementation
31//-----------------------------------------------------------------
32
33
34
35SerializerModule::SerializerModule() : Module()
36{
37 //Set module properties
38 setDescription("Encode DataStore into RingBuffer");
39 addParam("DestPort", m_port_to, "Destination port", BASE_PORT_ROPC_COPPER30000);
40 addParam("LocalHostName", m_hostname_local, "local host", string(""));
41#ifdef DUMMY
42 addParam("EventDataBufferWords", BUF_SIZE_WORD, "DataBuffer words per event", 4800);
43#endif
44 addParam("use Shared Memory", m_shmflag, "m_shmflag", 0);
45
46 m_start_flag = 0;
47 n_basf2evt = -1;
48 m_compressionLevel = 0;
49
50 //Parameter definition
51 B2INFO("Tx: Constructor done.")do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Info, 0, "daq")) { { LogVariableStream varStream
; varStream << "Tx: Constructor done."; Belle2::LogSystem
::Instance().sendMessage(Belle2::LogMessage(Belle2::LogConfig
::c_Info, std::move(varStream), "daq", __PRETTY_FUNCTION__, "daq/rawdata/modules/src/Serializer.cc"
, 51, 0)); }; } } while(false)
;
52
53}
54
55
56
57SerializerModule::~SerializerModule()
58{
59}
60
61void SerializerModule::initialize()
62{
63 signal(SIGPIPE13, SIG_IGN((__sighandler_t) 1));
64
65#ifdef DUMMY
66 m_buffer = new int[ BUF_SIZE_WORD ];
67#endif
68
69
70 if (m_shmflag != 0) {
71 char temp_char1[100] = "/cpr_config";
72 char temp_char2[100] = "/cpr_status";
73 shmOpen(temp_char1, temp_char2);
74 // Status format : status_flag
75 m_cfg_buf = shmGet(m_shmfd_cfg, 4);
76 m_cfg_sta = shmGet(m_shmfd_sta, 4);
77 m_cfg_sta[ 0 ] = 1; // Status bit is 1 : ready before accept()
78 }
79
80 // Create Message Handler
81 memset(time_array0, 0, sizeof(time_array0));
82 memset(time_array1, 0, sizeof(time_array1));
83 memset(time_array2, 0, sizeof(time_array2));
84
85 RunInfoBuffer& status(DeSerializerModule::getStatus());
86 if (status.isAvailable()) {
87 status.setOutputNBytes(0);
88 status.setOutputCount(0);
89 }
90
91 Accept();
92
93#ifdef NONSTOP
94 openRunPauseNshm();
95#endif
96
97 B2INFO("Tx initialized.")do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Info, 0, "daq")) { { LogVariableStream varStream
; varStream << "Tx initialized."; Belle2::LogSystem::Instance
().sendMessage(Belle2::LogMessage(Belle2::LogConfig::c_Info, std
::move(varStream), "daq", __PRETTY_FUNCTION__, "daq/rawdata/modules/src/Serializer.cc"
, 97, 0)); }; } } while(false)
;
98}
99
100
101
102void SerializerModule::beginRun()
103{
104 B2INFO("beginRun called.")do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Info, 0, "daq")) { { LogVariableStream varStream
; varStream << "beginRun called."; Belle2::LogSystem::Instance
().sendMessage(Belle2::LogMessage(Belle2::LogConfig::c_Info, std
::move(varStream), "daq", __PRETTY_FUNCTION__, "daq/rawdata/modules/src/Serializer.cc"
, 104, 0)); }; } } while(false)
;
105}
106
107
108
109
110void SerializerModule::endRun()
111{
112 //fill Run data
113 B2INFO("endRun done.")do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Info, 0, "daq")) { { LogVariableStream varStream
; varStream << "endRun done."; Belle2::LogSystem::Instance
().sendMessage(Belle2::LogMessage(Belle2::LogConfig::c_Info, std
::move(varStream), "daq", __PRETTY_FUNCTION__, "daq/rawdata/modules/src/Serializer.cc"
, 113, 0)); }; } } while(false)
;
114}
115
116
117void SerializerModule::terminate()
118{
119 B2INFO("terminate called")do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Info, 0, "daq")) { { LogVariableStream varStream
; varStream << "terminate called"; Belle2::LogSystem::Instance
().sendMessage(Belle2::LogMessage(Belle2::LogConfig::c_Info, std
::move(varStream), "daq", __PRETTY_FUNCTION__, "daq/rawdata/modules/src/Serializer.cc"
, 119, 0)); }; } } while(false)
;
120}
121
122
123
124//
125// User defined functions
126//
127
128
129
130int* SerializerModule::shmGet(int fd, int size_words)
131{
132 int offset = 0;
133 return (int*)mmap(NULL__null, size_words * sizeof(int), PROT_READ0x1 | PROT_WRITE0x2, MAP_SHARED0x01, fd, offset);
134}
135
136void SerializerModule::shmOpen(char* path_cfg, char* path_sta)
137{
138 errno(*__errno_location ()) = 0;
139 m_shmfd_cfg = shm_open(path_cfg, O_RDWR02, 0666);
140 if (m_shmfd_cfg < 0) {
141 char err_buf[500];
142 sprintf(err_buf, "[FATAL] Failed to shm_open (%s). Exiting... : path %s\n",
143 strerror(errno(*__errno_location ())), path_cfg);
144 print_err.PrintError(err_buf, __FILE__"daq/rawdata/modules/src/Serializer.cc", __PRETTY_FUNCTION__, __LINE__144);
145 sleep(1234567);
146 exit(1);
147 }
148
149 m_shmfd_sta = shm_open(path_sta, O_RDWR02, 0666);
150 if (m_shmfd_sta < 0) {
151 char err_buf[500];
152 sprintf(err_buf, "[FATAL] Failed to shm_open (%s). Exiting... : path %s\n",
153 strerror(errno(*__errno_location ())), path_sta);
154 print_err.PrintError(err_buf, __FILE__"daq/rawdata/modules/src/Serializer.cc", __PRETTY_FUNCTION__, __LINE__154);
155 sleep(1234567);
156 exit(1);
157 }
158
159 int size = 4 * sizeof(int);
160 ftruncate(m_shmfd_cfg, size);
161 ftruncate(m_shmfd_sta, size);
162}
163
164
165
166void SerializerModule::fillSendHeaderTrailer(SendHeader* hdr, SendTrailer* trl,
167 RawDataBlock* rawdblk)
168{
169
170 int total_send_nwords =
171 hdr->GetHdrNwords() +
172 rawdblk->TotalBufNwords() +
173 // rawhdr.GetNwords() +
174 trl->GetTrlNwords();
175
176 hdr->SetNwords(total_send_nwords);
177 hdr->SetNumEventsinPacket(rawdblk->GetNumEvents());
178 hdr->SetNumNodesinPacket(rawdblk->GetNumNodes());
179
180 //
181 // For bug check
182 //
183 if (rawdblk->GetNumEntries() == 1) {
184 if (total_send_nwords != (rawdblk->GetBuffer(0))[ 0 ] + 8) {
185 char err_buf[500];
186 sprintf(err_buf, "[FATAL] Length error. total length %d rawdblk length %d. Exting...\n",
187 total_send_nwords, (rawdblk->GetBuffer(0))[ 0 ]);
188 printData(rawdblk->GetBuffer(0), rawdblk->TotalBufNwords());
189 print_err.PrintError(err_buf, __FILE__"daq/rawdata/modules/src/Serializer.cc", __PRETTY_FUNCTION__, __LINE__189);
190 sleep(1234567);
191 exit(-1);
192 }
193 }
194
195
196 for (int i = 0; i < rawdblk->GetNumEntries(); i++) {
197
198 //copy event # from a tonp COPPER block
199 if (!(rawdblk->CheckFTSWID(i)) && !(rawdblk->CheckTLUID(i))) {
200 tmp_header.SetBuffer(rawdblk->GetBuffer(i));
201 hdr->SetEventNumber(tmp_header.GetEveNo());
202 hdr->SetNodeID(tmp_header.GetNodeID());
203 hdr->SetExpRunWord(tmp_header.GetExpRunSubrun());
204 break;
205 }
206
207 //Error if you cannot find any COPPER block
208 if (i == (rawdblk->GetNumEntries() - 1)) {
209 printf("[DEBUG] i= %d : num entries %d : Tot words %d\n", i, rawdblk->GetNumEntries(), rawdblk->TotalBufNwords());
210 printData(rawdblk->GetBuffer(0), rawdblk->TotalBufNwords());
211
212 char err_buf[500] = "[FATAL] CORRUPTED DATA: No COPPER blocks in RawDataBlock. Exiting...";
213 print_err.PrintError(err_buf, __FILE__"daq/rawdata/modules/src/Serializer.cc", __PRETTY_FUNCTION__, __LINE__213);
214 sleep(1234567);
215 exit(-1);
216 }
217 }
218 return;
219}
220
221
222int SerializerModule::sendByWriteV(RawDataBlock* rawdblk)
223{
224 SendHeader send_header;
225 SendTrailer send_trailer;
226 fillSendHeaderTrailer(&send_header, &send_trailer, rawdblk);
227
228 enum {
229 NUM_BUFFER = 3
230 };
231 struct iovec iov[ NUM_BUFFER ];
232
233 // check Body data size
234 int rawcopper_nwords = rawdblk->TotalBufNwords();
235
236 //Fill iov info.
237 iov[0].iov_base = (char*)send_header.GetBuffer();
238 iov[0].iov_len = sizeof(int) * send_header.GetHdrNwords();
239
240 iov[1].iov_base = (char*)rawdblk->GetWholeBuffer();
241 iov[1].iov_len = sizeof(int) * rawcopper_nwords;
242
243 iov[2].iov_base = (char*)send_trailer.GetBuffer();
244 iov[2].iov_len = sizeof(int) * send_trailer.GetTrlNwords();
245
246
247 // Send Multiple buffers
248 int n = 0;
249
250 while (true) {
251 if ((n = writev(m_socket, iov, NUM_BUFFER)) < 0) {
252 if (errno(*__errno_location ()) == EINTR4) {
253 continue;
254 } else if (errno(*__errno_location ()) == EAGAIN11 || errno(*__errno_location ()) == EWOULDBLOCK11) {
255
256#ifdef NONSTOP
257 // check run-pause request
258 string err_str;
259 callCheckRunPause(err_str);
260#endif
261 continue;
262 } else {
263 char err_buf[500];
264 sprintf(err_buf, "[WARNING] WRITEVa error.(%s) : sent %d bytes, header %lu bytes body %lu trailer %lu : %s %s %d\n",
265 strerror(errno(*__errno_location ())), n, iov[0].iov_len, iov[1].iov_len, iov[2].iov_len,
266 __FILE__"daq/rawdata/modules/src/Serializer.cc", __PRETTY_FUNCTION__, __LINE__266);
267#ifdef NONSTOP
268 g_run_error = 1;
269 B2ERROR(err_buf)do { { LogVariableStream varStream; varStream << err_buf
; Belle2::LogSystem::Instance().sendMessage(Belle2::LogMessage
(Belle2::LogConfig::c_Error, std::move(varStream), "daq", __PRETTY_FUNCTION__
, "daq/rawdata/modules/src/Serializer.cc", 269, 0)); } } while
(false)
;
270 string err_str = "RUN_ERROR";
271 throw (err_str); // Go to DeSerializer** and wait for run-resume.
272#else
273 print_err.PrintError(err_buf, __FILE__"daq/rawdata/modules/src/Serializer.cc", __PRETTY_FUNCTION__, __LINE__273);
274 exit(1);
275#endif
276 }
277 }
278 break;
279 }
280
281#ifdef DEBUG
282 printf("[DEBUG] *******BODY**********\n");
283 printf("[DEBUG] \n%.8d : ", 0);
284 printData((int*)(iov[1].iov_base), iov[1].iov_len);
285#endif
286
287
288 int total_send_bytes = sizeof(int) * send_header.GetTotalNwords();
289
290
291 //
292 // Retry sending
293 //
294 if (n != total_send_bytes) {
295 B2WARNING("Serializer: Sent byte(" << n << "bytes) is not same as the event size (" << total_send_bytes << "bytes). Retryring...")do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Warning, 0, "daq")) { { LogVariableStream varStream
; varStream << "Serializer: Sent byte(" << n <<
"bytes) is not same as the event size (" << total_send_bytes
<< "bytes). Retryring..."; Belle2::LogSystem::Instance
().sendMessage(Belle2::LogMessage(Belle2::LogConfig::c_Warning
, std::move(varStream), "daq", __PRETTY_FUNCTION__, "daq/rawdata/modules/src/Serializer.cc"
, 295, 0)); }; } } while(false)
;
296 double retry_start = getTimeSec();
297
298 // Send Header
299 if (n < (int)(iov[ 0 ].iov_len)) {
300 n += Send(m_socket, (char*)iov[ 0 ].iov_base + n, iov[ 0 ].iov_len - n);
301 }
302
303 if (n < (int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len)) {
304 n += Send(m_socket, (char*)iov[ 1 ].iov_base + (n - iov[ 0 ].iov_len), iov[ 1 ].iov_len - (n - iov[ 0 ].iov_len));
305 }
306
307 if (n < (int)(iov[ 0 ].iov_len + iov[ 1 ].iov_len + iov[ 2 ].iov_len)) {
308 n += Send(m_socket, (char*)iov[ 2 ].iov_base + (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len),
Value stored to 'n' is never read
309 iov[ 2 ].iov_len - (n - iov[ 0 ].iov_len - iov[ 1 ].iov_len));
310 }
311
312 double retry_end = getTimeSec();
313 B2WARNING("Resending ends. It takes " << retry_end - retry_start << "(s)")do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Warning, 0, "daq")) { { LogVariableStream varStream
; varStream << "Resending ends. It takes " << retry_end
- retry_start << "(s)"; Belle2::LogSystem::Instance().
sendMessage(Belle2::LogMessage(Belle2::LogConfig::c_Warning, std
::move(varStream), "daq", __PRETTY_FUNCTION__, "daq/rawdata/modules/src/Serializer.cc"
, 313, 0)); }; } } while(false)
;
314 }
315
316 return total_send_bytes;
317
318}
319
320
321
322int SerializerModule::Send(int socket, char* buf, int size_bytes)
323{
324 int sent_bytes = 0;
325 while (true) {
326 int ret = 0;
327 if ((ret = send(socket, buf + sent_bytes, size_bytes - sent_bytes, MSG_NOSIGNALMSG_NOSIGNAL)) < 0) {
328 if (errno(*__errno_location ()) == EINTR4) {
329 continue;
330 } else if (errno(*__errno_location ()) == EAGAIN11 || errno(*__errno_location ()) == EWOULDBLOCK11) {
331#ifdef NONSTOP
332 string err_str;
333 callCheckRunPause(err_str);
334#endif
335 continue;
336 } else {
337 char err_buf[500];
338 sprintf(err_buf, "[ERROR] Send Error. (%s) : %s %s %d", strerror(errno(*__errno_location ())), __FILE__"daq/rawdata/modules/src/Serializer.cc", __PRETTY_FUNCTION__, __LINE__338);
339#ifdef NONSTOP
340 g_run_error = 1;
341 B2ERROR(err_buf)do { { LogVariableStream varStream; varStream << err_buf
; Belle2::LogSystem::Instance().sendMessage(Belle2::LogMessage
(Belle2::LogConfig::c_Error, std::move(varStream), "daq", __PRETTY_FUNCTION__
, "daq/rawdata/modules/src/Serializer.cc", 341, 0)); } } while
(false)
;
342 string err_str = "RUN_ERROR";
343 throw (err_str);
344#else
345 print_err.PrintError(err_buf, __FILE__"daq/rawdata/modules/src/Serializer.cc", __PRETTY_FUNCTION__, __LINE__345);
346 exit(1);
347#endif
348 }
349 }
350 sent_bytes += ret;
351 if (sent_bytes == size_bytes) break;
352 }
353 return sent_bytes;
354}
355
356void SerializerModule::Accept()
357{
358
359 //
360 // Connect to cprtb01
361 //
362
363 struct hostent* host;
364 host = gethostbyname(m_hostname_local.c_str());
365 if (host == NULL__null) {
366 char temp_buf[500];
367 sprintf(temp_buf, "[FATAL] hostname(%s) cannot be resolved(%s). Check /etc/hosts. Exiting...\n",
368 m_hostname_local.c_str(), strerror(errno(*__errno_location ())));
369 print_err.PrintError(temp_buf, __FILE__"daq/rawdata/modules/src/Serializer.cc", __PRETTY_FUNCTION__, __LINE__369);
370 sleep(1234567);
371 exit(1);
372 }
373
374
375
376
377 //
378 // Bind and listen
379 //
380 int fd_listen;
381 struct sockaddr_in sock_listen;
382 sock_listen.sin_family = AF_INET2;
383 sock_listen.sin_addr.s_addr = *(unsigned int*)host->h_addr_list[0];
384
385 socklen_t addrlen = sizeof(sock_listen);
386 sock_listen.sin_port = htons(m_port_to)__bswap_16 (m_port_to);
387 fd_listen = socket(PF_INET2, SOCK_STREAMSOCK_STREAM, 0);
388
389 int flags = 1;
390 int ret = setsockopt(fd_listen, SOL_SOCKET1, SO_REUSEADDR2, &flags, (socklen_t)sizeof(flags));
391 if (ret < 0) {
392 perror("Failed to set REUSEADDR");
393 }
394
395 if (bind(fd_listen, (struct sockaddr*)&sock_listen, sizeof(struct sockaddr)) < 0) {
396 char temp_char[500];
397 sprintf(temp_char, "[FATAL] Failed to bind.(%s) Maybe other programs have already occupied this port(%d). Exiting...",
398 strerror(errno(*__errno_location ())), m_port_to);
399 print_err.PrintError(temp_char, __FILE__"daq/rawdata/modules/src/Serializer.cc", __PRETTY_FUNCTION__, __LINE__399);
400 exit(1);
401 }
402
403 int val1 = 0;
404 setsockopt(fd_listen, IPPROTO_TCPIPPROTO_TCP, TCP_NODELAY1, &val1, (socklen_t)sizeof(val1));
405 int backlog = 1;
406 if (listen(fd_listen, backlog) < 0) {
407 char err_buf[500];
408 sprintf(err_buf, "[FATAL] Failed in listen(%s). Exting...", strerror(errno(*__errno_location ())));
409 print_err.PrintError(err_buf, __FILE__"daq/rawdata/modules/src/Serializer.cc", __PRETTY_FUNCTION__, __LINE__409);
410 exit(-1);
411 }
412
413 //
414 // Accept
415 //
416 int fd_accept;
417 struct sockaddr_in sock_accept;
418 // printf( "[DEBUG] Accepting... : port %d server %s\n", m_port_to, m_hostname_local.c_str());
419 // fflush(stderr);
420 // B2INFO("Accepting... : port " << m_port_to << " server " << m_hostname_local.c_str() );
421 B2INFO("Accepting...")do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Info, 0, "daq")) { { LogVariableStream varStream
; varStream << "Accepting..."; Belle2::LogSystem::Instance
().sendMessage(Belle2::LogMessage(Belle2::LogConfig::c_Info, std
::move(varStream), "daq", __PRETTY_FUNCTION__, "daq/rawdata/modules/src/Serializer.cc"
, 421, 0)); }; } } while(false)
;
422 if ((fd_accept = accept(fd_listen, (struct sockaddr*) & (sock_accept), &addrlen)) == 0) {
423 char err_buf[500];
424 sprintf(err_buf, "[FATAL] Failed to accept(%s). Exiting...", strerror(errno(*__errno_location ())));
425 print_err.PrintError(err_buf, __FILE__"daq/rawdata/modules/src/Serializer.cc", __PRETTY_FUNCTION__, __LINE__425);
426 exit(-1);
427 } else {
428 // B2INFO("Connection is established: port " << htons(sock_accept.sin_port) << " address " << sock_accept.sin_addr.s_addr );
429 B2INFO("Done.")do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Info, 0, "daq")) { { LogVariableStream varStream
; varStream << "Done."; Belle2::LogSystem::Instance().sendMessage
(Belle2::LogMessage(Belle2::LogConfig::c_Info, std::move(varStream
), "daq", __PRETTY_FUNCTION__, "daq/rawdata/modules/src/Serializer.cc"
, 429, 0)); }; } } while(false)
;
430
431 // set timepout option
432 struct timeval timeout;
433 timeout.tv_sec = 1;
434 timeout.tv_usec = 0;
435 ret = setsockopt(fd_accept, SOL_SOCKET1, SO_SNDTIMEO21, &timeout, (socklen_t)sizeof(timeout));
436 if (ret < 0) {
437 char temp_char[100] = "[FATAL] Failed to set TIMEOUT. Exiting...";
438 print_err.PrintError(temp_char, __FILE__"daq/rawdata/modules/src/Serializer.cc", __PRETTY_FUNCTION__, __LINE__438);
439 exit(-1);
440 }
441 }
442 close(fd_listen);
443
444 // int flag = 1;
445 // ret = setsockopt(fd_accept, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag) );
446 m_socket = fd_accept;
447 RunInfoBuffer& status(DeSerializerModule::getStatus());
448 if (status.isAvailable()) {
449 //status.setOutputPort(ntohs(sock_accept.sin_port));
450 //status.setOutputAddress(sock_accept.sin_addr.s_addr);
451 status.setOutputPort(ntohs(sock_listen.sin_port)__bswap_16 (sock_listen.sin_port));
452 status.setOutputAddress(sock_listen.sin_addr.s_addr);
453 printf("%d %x\n", (int)ntohs(sock_listen.sin_port)__bswap_16 (sock_listen.sin_port), (int)sock_listen.sin_addr.s_addr);
454 }
455
456 return;
457
458}
459
460double SerializerModule::getTimeSec()
461{
462 struct timeval t;
463 gettimeofday(&t, NULL__null);
464 return (t.tv_sec + t.tv_usec * 1.e-6);
465}
466
467
468void SerializerModule::recordTime(int event, double* array)
469{
470 if (event >= 50000 && event < 50500) {
471 array[ event - 50000 ] = getTimeSec() - m_start_time;
472 }
473 return;
474}
475
476
477unsigned int SerializerModule::calcXORChecksum(int* buf, int nwords)
478{
479 unsigned int checksum = 0;
480 for (int i = 0; i < nwords; i++) {
481 checksum = checksum ^ buf[ i ];
482 }
483 return checksum;
484}
485
486
487void SerializerModule::printData(int* buf, int nwords)
488{
489 printf("[DEBUG]");
490 for (int i = 0; i < nwords; i++) {
491 printf("%.8x ", buf[ i ]);
492 if (i % 10 == 9) printf("\n[DEBUG]");
493 }
494 printf("\n[DEBUG]");
495 printf("\n");
496 return;
497}
498
499
500#ifdef NONSTOP
501void SerializerModule::openRunPauseNshm()
502{
503 char path_shm[100] = "/cpr_pause_resume";
504 int fd = shm_open(path_shm, O_RDONLY00, 0666);
505 if (fd < 0) {
506 printf("[DEBUG] %s\n", path_shm);
507 perror("[FATAL] Failed to open shm_open");
508 exit(1);
509 }
510 m_ptr = (int*)mmap(NULL__null, sizeof(int), PROT_READ0x1, MAP_SHARED0x01, fd, 0);
511 return;
512}
513
514int SerializerModule::checkRunPause()
515{
516
517#ifdef NONSTOP_SLC
518 const RunInfoBuffer& status(DeSerializerModule::getStatus());
519 if (status.getState() == status.PAUSING) {
520#else
521 if (*m_ptr) {
522#endif
523 return 1;
524 } else {
525 return 0;
526 }
527}
528
529void SerializerModule::resumeRun()
530{
531 if (CheckConnection(m_socket) < 0) Accept();
532 g_run_resuming = 0; // run_resuming phase is over.
533 return;
534}
535
536
537int SerializerModule::CheckConnection(int socket)
538{
539 // Modify Yamagata-san's eb/iseof.cc
540 int ret;
541 char buffer[1000];
542 while (true) {
543 //
544 // Extract data in the socket buffer of a peer
545 //
546 // ret = recv( socket, buffer, sizeof(buffer), MSG_PEEK|MSG_DONTWAIT );
547 ret = recv(socket, buffer, sizeof(buffer), MSG_DONTWAITMSG_DONTWAIT);
548 switch (ret) {
549 case 0: /* EOF */
550 printf("EOF %d\n", socket); fflush(stdoutstdout);
551 close(socket);
552 return -1;
553 case -1:
554 if (errno(*__errno_location ()) == EAGAIN11) {
555 printf("EAGAIN %d\n", socket); fflush(stdoutstdout);
556 /* not EOF, no data in queue */
557 return 0;
558 } else {
559 printf("ERROR %d errno %d err %s\n", socket, errno(*__errno_location ()), strerror(errno(*__errno_location ()))); fflush(stdoutstdout);
560 close(socket);
561 return -1;
562 }
563 break;
564 default:
565 printf("Flushing data in socket buffer (%d bytes) : sockid = %d\n", ret, socket); fflush(stdoutstdout);
566 }
567 }
568}
569
570void SerializerModule::callCheckRunPause(string& err_str)
571{
572#ifdef NONSTOP_DEBUG
573 printf("\033[34m");
574 printf("###########(Ser) TIMEOUT during send() ###############\n");
575 fflush(stdoutstdout);
576 printf("\033[0m");
577#endif
578 if (checkRunPause()) {
579#ifdef NONSTOP_DEBUG
580 printf("\033[31m");
581 printf("###########(Ser) Stop is detected after return from send ###############\n");
582 fflush(stdoutstdout);
583 printf("\033[0m");
584#endif
585 err_str = "RUN_PAUSE";
586 g_run_pause = 1;
587 throw (err_str);
588 }
589 return;
590}
591
592#endif
593
594
595void SerializerModule::event()
596{
597
598#ifdef NONSTOP
599 if (g_run_pause == 1) {
600#ifdef NONSTOP_DEBUG
601 printf("\033[31m");
602 printf("###########(Ser) Go back to Deseializer() ###############\n");
603 fflush(stdoutstdout);
604 printf("\033[0m");
605#endif
606 return; // Nothing to do here
607 } else if (g_run_resuming == 1) {
608#ifdef NONSTOP_DEBUG
609 printf("\033[31m");
610 printf("###########(Ser) Run resuming...() ###############\n");
611 fflush(stdoutstdout);
612 printf("\033[0m");
613#endif
614 resumeRun();
615 return;
616 }
617#endif
618
619 if (m_start_flag == 0) {
620 m_start_time = getTimeSec();
621 n_basf2evt = 0;
622 }
623
624#ifdef TIME_MONITOR
625 recordTime(n_basf2evt, time_array0);
626#endif
627
628 // StoreArray<RawCOPPER> rawcprarray;
629 StoreArray<RawDataBlock> raw_dblkarray;
630
631
632 for (int j = 0; j < raw_dblkarray.getEntries(); j++) {
633 //
634 // Send data
635 //
636 if (m_start_flag == 0) {
637 B2INFO("SerializerPC: Sending the 1st packet...")do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Info, 0, "daq")) { { LogVariableStream varStream
; varStream << "SerializerPC: Sending the 1st packet..."
; Belle2::LogSystem::Instance().sendMessage(Belle2::LogMessage
(Belle2::LogConfig::c_Info, std::move(varStream), "daq", __PRETTY_FUNCTION__
, "daq/rawdata/modules/src/Serializer.cc", 637, 0)); }; } } while
(false)
;
638 }
639
640 try {
641 m_totbytes += sendByWriteV(raw_dblkarray[ j ]);
642 // } catch (string err_str) {
643 } catch (const string& err_str) {
644
645#ifdef NONSTOP
646 if (err_str == "RUN_PAUSE" || err_str == "RUN_ERROR") {
647 return; // Go to DeSerializer***() to wait for run-resume.
648 }
649#endif
650 print_err.PrintError((char*)(err_str.c_str()), __FILE__"daq/rawdata/modules/src/Serializer.cc", __PRETTY_FUNCTION__, __LINE__650);
651 exit(1);
652 }
653 if (m_start_flag == 0) {
654 B2INFO("Done. ")do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Info, 0, "daq")) { { LogVariableStream varStream
; varStream << "Done. "; Belle2::LogSystem::Instance().
sendMessage(Belle2::LogMessage(Belle2::LogConfig::c_Info, std
::move(varStream), "daq", __PRETTY_FUNCTION__, "daq/rawdata/modules/src/Serializer.cc"
, 654, 0)); }; } } while(false)
;
655 m_start_flag = 1;
656 }
657 }
658
659
660 //
661 // Print current status
662 //
663 if (n_basf2evt % 1000 == 0) {
664 // double cur_time = getTimeSec();
665 // double total_time = cur_time - m_start_time;
666 // double interval = cur_time - m_prev_time;
667 // if (n_basf2evt != 0) {
668 // double multieve = (1. / interval);
669 // if (multieve > 2.) multieve = 2.;
670 // }
671 // time_t timer;
672 // struct tm* t_st;
673 // time(&timer);
674 // t_st = localtime(&timer);
675 // printf( "[DEBUG] Event %d TotSent %.1lf [MB] ElapsedTime %.1lf [s] RcvdRate %.2lf [MB/s] %s",
676 // n_basf2evt, m_totbytes / 1.e6, total_time, (m_totbytes - m_prev_totbytes) / interval / 1.e6, asctime(t_st));
677 // fflush(stderr);
678 // m_prev_time = cur_time;
679 // m_prev_totbytes = m_totbytes;
680 // m_prev_nevt = n_basf2evt;
681 }
682 n_basf2evt++;
683 RunInfoBuffer& status(DeSerializerModule::getStatus());
684 if (status.isAvailable()) {
685 status.setOutputNBytes(m_totbytes);
686 status.addOutputCount(raw_dblkarray.getEntries());
687 }
688
689}