Bug Summary

File: daq/storage/tools/storagerecord.cc
Warning: line 326, column 9
Value stored to 'isnew' is never read

Annotated Source Code

Press '?' to see keyboard shortcuts

clang -cc1 -cc1 -triple x86_64-unknown-linux-gnu -O3 -analyze -disable-free -clear-ast-before-backend -disable-llvm-verifier -discard-value-names -main-file-name storagerecord.cc -analyzer-checker=core -analyzer-checker=apiModeling -analyzer-checker=unix -analyzer-checker=deadcode -analyzer-checker=cplusplus -analyzer-checker=security.insecureAPI.UncheckedReturn -analyzer-checker=security.insecureAPI.getpw -analyzer-checker=security.insecureAPI.gets -analyzer-checker=security.insecureAPI.mktemp -analyzer-checker=security.insecureAPI.mkstemp -analyzer-checker=security.insecureAPI.vfork -analyzer-checker=nullability.NullPassedToNonnull -analyzer-checker=nullability.NullReturnedFromNonnull -analyzer-output plist -w -setup-static-analyzer -mrelocation-model pic -pic-level 2 -pic-is-pie -mframe-pointer=none -fmath-errno -ffp-contract=on -fno-rounding-math -mconstructor-aliases -funwind-tables=2 -target-cpu x86-64 -tune-cpu generic -debugger-tuning=gdb -fdebug-compilation-dir=/data/b2soft/buildbot/development/build -fcoverage-compilation-dir=/data/b2soft/buildbot/development/build -resource-dir /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/lib/clang/21 -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/include/c++ -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/include/c++/x86_64-redhat-linux -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/include/c++/backward -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/include -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/include/python3.12 -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/include/CLHEP -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/include/Geant4 -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/include -isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/include/root -isystem 
/cvmfs/belle.cern.ch/el9/externals/v02-04-00/include/belle_legacy -I include/ -D _PACKAGE_="daq" -D G4UI_USE_TCSH -D RaveDllExport= -D HAS_SQLITE -D HAS_CALLGRIND -I include -I /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/include/libxml2 -internal-isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/bin/../lib64/gcc/x86_64-redhat-linux/15.2.0/../../../../include/c++ -internal-isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/bin/../lib64/gcc/x86_64-redhat-linux/15.2.0/../../../../include/c++/x86_64-redhat-linux -internal-isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/bin/../lib64/gcc/x86_64-redhat-linux/15.2.0/../../../../include/c++/backward -internal-isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/lib/clang/21/include -internal-isystem /usr/local/include -internal-isystem /cvmfs/belle.cern.ch/el9/externals/v02-04-00/Linux_x86_64/common/bin/../lib64/gcc/x86_64-redhat-linux/15.2.0/../../../../x86_64-redhat-linux/include -internal-externc-isystem /include -internal-externc-isystem /usr/include -Wno-missing-braces -Wno-unused-command-line-argument -std=c++20 -fdeprecated-macro -ferror-limit 19 -fgnuc-version=4.2.1 -fno-implicit-modules -fskip-odr-check-in-gmf -fcxx-exceptions -fexceptions -vectorize-loops -vectorize-slp -analyzer-output=html -faddrsig -D__GCC_HAVE_DWARF2_CFI_ASM=1 -o /scan_build/2026-05-31-004316-385593-1 -x c++ daq/storage/tools/storagerecord.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <unistd.h>
9#include <cstdlib>
10#include <sys/stat.h>
11#include <fcntl.h>
12
13#include <framework/logging/Logger.h>
14#include <framework/pcore/EvtMessage.h>
15
16#include <daq/storage/SharedEventBuffer.h>
17
18#include <daq/slc/psql/PostgreSQLInterface.h>
19
20#include <daq/slc/database/DBHandlerException.h>
21
22#include <daq/slc/readout/RunInfoBuffer.h>
23
24#include <daq/slc/base/ConfigFile.h>
25#include <daq/slc/base/Date.h>
26#include <daq/slc/base/StringUtil.h>
27
28#include <cstdio>
29#include <cstring>
30#include <fstream>
31#include <iostream>
32#include <signal.h>
33#include <sys/statvfs.h>
34#include <zlib.h>
35
36using namespace Belle2;
37
/** Size unit used for the file-size limit: 1000 * 1024 * 1024 bytes.
 *  Computed in unsigned long long so that a larger factor cannot overflow int. */
const unsigned long long GB = 1000ULL * 1024ULL * 1024ULL;
/** Number of bytes written to one file after which main() switches to a new file. */
const unsigned long long MAX_FILE_SIZE = 8ULL * GB;
/** Name of the database table that holds one row per recorded file. */
const char* g_table = "datafiles";
/** Number of valid bytes currently stored in g_streamerinfo. */
unsigned int g_streamersize = 0;
/** Copy of the most recent streamer-info record; it is written at the start of every new file. */
char* g_streamerinfo = new char[1000000];
/** Disk currently in use; 0 means "not chosen yet" and makes FileHandler::open() search for one. */
int g_diskid = 0;
/** Path of the small file in which the id of the disk in use is persisted. */
std::string g_file_diskid;
/** Run number override; used by FileHandler::open() instead of its argument when > 0. */
int g_runno = 0;
/** Experiment number override; used by FileHandler::open() instead of its argument when > 0. */
int g_expno = 0;
/** Set by main() from the run type ("arich"); enables taking run/exp numbers from the run info buffer. */
bool g_is_arich = false;
48
49class FileHandler {
50
51public:
52 FileHandler(DBInterface* db, const std::string& runtype,
53 const char* host, const char* dbtmp)
54 : m_db(db), m_runtype(runtype), m_host(host), m_dbtmp(dbtmp)
55 {
56 m_file = -1;
57 m_id = -1;
58 m_filesize = 0;
59 m_fileid = 0;
60 m_diskid = 1;
61 m_expno = -1;
62 m_runno = -1;
63 m_nevents = 0;
64 m_chksum = 0;
65 }
66 ~FileHandler() throw()
67 {
68 }
69
70public:
71 int getDiskId() { return m_diskid; }
72 int getFileId() { return m_fileid; }
73
74public:
75 int open(const std::string& dir, int ndisks, int expno, int runno, int fileid)
76 {
77 m_filesize = 0;
78 m_fileid = 0;
79 m_diskid = 1;
80 m_chksum = 1;
81 m_nevents = 0;
82 m_filesize = 0;
83 m_expno = (g_expno > 0) ? g_expno : expno;
84 m_runno = (g_runno > 0) ? g_runno : runno;
85 m_fileid = fileid;
86 bool available = false;
87 char filename[1024];
88 struct statvfs statfs;
89 if (g_diskid > 0) {
90 m_diskid = g_diskid;
91 available = true;
92 } else {
93 {
94 int diskid = 1;
95 std::ifstream fin(g_file_diskid.c_str());
96 fin >> diskid;
97 m_diskid = diskid;
98 }
99 for (int i = 0; i < ndisks; i++) {
100 sprintf(filename, "%s%02d", dir.c_str(), m_diskid);
101 statvfs(filename, &statfs);
102 float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
103 sprintf(filename, "%s%02d/storage/full_flag", dir.c_str(), m_diskid);
104 std::ifstream fin(filename);
105 int flag = 0;
106 fin >> flag;
107 fin.close();
108 /*
109 if (usage < 0.1) {
110 fin.close();
111 //::unlink(filename);
112 if (flag == 1) {
113 std::cout << "[DEBUG] disk : " << m_diskid << " is available again (full_flag removed)" << std::endl;
114 }
115 available = true;
116 std::cout << "[DEBUG] disk : " << m_diskid << " is available" << std::endl;
117 break;
118 } else
119 */
120 if (usage < 0.7) {
121 if (flag != 1) {
122 available = true;
123 std::cout << "[DEBUG] disk : " << m_diskid << " is available" << std::endl;
124 break;
125 }
126 std::cout << "[DEBUG] disk : " << m_diskid << " is with full_flag" << std::endl;
127 } else {
128 std::ofstream fout(filename);
129 fout << 1;
130 fout.close();
131 B2WARNING("disk-" << m_diskid << " is full " << usage)do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Warning, 0, "daq")) { { LogVariableStream varStream
; varStream << "disk-" << m_diskid << " is full "
<< usage; Belle2::LogSystem::Instance().sendMessage(Belle2
::LogMessage(Belle2::LogConfig::c_Warning, std::move(varStream
), "daq", __PRETTY_FUNCTION__, "daq/storage/tools/storagerecord.cc"
, 131, 0)); }; } } while(false)
;
132 }
133 m_diskid++;
134 if (m_diskid > ndisks) m_diskid = 1;
135 }
136 }
137 if (!available) {
138 B2FATAL("No disk available for writing " << __FILE__ << ":" << __LINE__)do { { LogVariableStream varStream; varStream << "No disk available for writing "
<< "daq/storage/tools/storagerecord.cc" << ":" <<
138; Belle2::LogSystem::Instance().sendMessage(Belle2::LogMessage
(Belle2::LogConfig::c_Fatal, std::move(varStream), "daq", __PRETTY_FUNCTION__
, "daq/storage/tools/storagerecord.cc", 138, 0)); }; exit(1);
} while(false)
;
139 exit(1);
140 }
141 std::string filedir = dir + StringUtil::form("%02d/storage/%4.4d/%5.5d/", m_diskid, expno, runno);
142 system(("mkdir -p " + filedir).c_str());
143 m_filename = StringUtil::form("%s.%4.4d.%5.5d.%s.f%5.5d.sroot",
144 m_runtype.c_str(), expno, runno, m_host.c_str(), m_fileid);
145 m_path = filedir + m_filename;
146 m_file = ::open(m_path.c_str(), O_WRONLY01 | O_CREAT0100 | O_EXCL0200, 0664);
147 g_diskid = m_diskid;
148 std::ofstream fout(g_file_diskid.c_str());
149 fout << g_diskid;
150 if (m_file < 0) {
151 B2FATAL("Failed to open file : " << m_path)do { { LogVariableStream varStream; varStream << "Failed to open file : "
<< m_path; Belle2::LogSystem::Instance().sendMessage(Belle2
::LogMessage(Belle2::LogConfig::c_Fatal, std::move(varStream)
, "daq", __PRETTY_FUNCTION__, "daq/storage/tools/storagerecord.cc"
, 151, 0)); }; exit(1); } while(false)
;
152 exit(1);
153 }
154 try {
155 m_db->connect();
156 m_db->execute("insert into %s (name, path, host, label, expno, runno, fileno, nevents, chksum, size) "
157 "values ('%s', '%s', '%s', '%s', %d, %d, %d, 0, 0, 0);",
158 g_table, m_filename.c_str(), m_path.c_str(), m_host.c_str(),
159 m_runtype.c_str(), m_expno, m_runno, m_fileid);
160 } catch (const DBHandlerException& e) {
161 B2WARNING(e.what())do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Warning, 0, "daq")) { { LogVariableStream varStream
; varStream << e.what(); Belle2::LogSystem::Instance().
sendMessage(Belle2::LogMessage(Belle2::LogConfig::c_Warning, std
::move(varStream), "daq", __PRETTY_FUNCTION__, "daq/storage/tools/storagerecord.cc"
, 161, 0)); }; } } while(false)
;
162 }
163 write(g_streamerinfo, g_streamersize, true);
164 B2INFO("New file " << m_path << " is opened")do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Info, 0, "daq")) { { LogVariableStream varStream
; varStream << "New file " << m_path << " is opened"
; Belle2::LogSystem::Instance().sendMessage(Belle2::LogMessage
(Belle2::LogConfig::c_Info, std::move(varStream), "daq", __PRETTY_FUNCTION__
, "daq/storage/tools/storagerecord.cc", 164, 0)); }; } } while
(false)
;
165
166 return m_id;
167 }
168
169 void close()
170 {
171 if (m_file > 0) {
172 std::cout << "[DEBUG] File closed" << std::endl;
173 ::close(m_file);
174 try {
175 struct stat st;
176 stat(m_path.c_str(), &st);
177 std::string d = Date(st.st_mtimest_mtim.tv_sec).toString();
178 m_db->connect();
179 m_db->execute("update %s set time_close = '%s', chksum = %lu, nevents = %lu, "
180 "size = %lu where name = '%s' and host = '%s';",
181 g_table, d.c_str(), m_chksum, m_nevents, m_filesize,
182 m_filename.c_str(), m_host.c_str());
183 } catch (const DBHandlerException& e) {
184 B2WARNING(e.what())do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Warning, 0, "daq")) { { LogVariableStream varStream
; varStream << e.what(); Belle2::LogSystem::Instance().
sendMessage(Belle2::LogMessage(Belle2::LogConfig::c_Warning, std
::move(varStream), "daq", __PRETTY_FUNCTION__, "daq/storage/tools/storagerecord.cc"
, 184, 0)); }; } } while(false)
;
185 }
186 m_db->close();
187 }
188 }
189
190 int write(char* evtbuf, int nbyte, bool isstreamer = false)
191 {
192 m_chksum = adler32(m_chksum, (unsigned char*)evtbuf, nbyte);
193 int ret = ::write(m_file, evtbuf, nbyte);
194 m_filesize += nbyte;
195 if (!isstreamer) {
196 m_nevents++;
197 }
198 return ret;
199 }
200
201 operator bool()
202 {
203 return m_file > 0;
204 }
205
206private:
207 DBInterface* m_db;
208 std::string m_runtype;
209 std::string m_host;
210 std::string m_dbtmp;
211 int m_id;
212 std::string m_filename;
213 std::string m_path;
214 int m_diskid;
215 int m_fileid;
216 int m_file;
217 std::string m_configname;
218 int m_expno;
219 int m_runno;
220 unsigned long long m_filesize;
221 unsigned long long m_chksum;
222 unsigned long long m_nevents;
223};
224
225FileHandler* g_file = NULL__null;
226
227void signalHandler(int)
228{
229 if (g_file) g_file->close();
230 exit(1);
231}
232
233int main(int argc, char** argv)
234{
235 if (argc < 8) {
236 printf("%s : <ibufname> <ibufsize> <hostname> <runtype> <path> <ndisk> "
237 "<filepath_dbtmp> [<obufname> <obufsize> nodename, nodeid]\n", argv[0]);
238 return 1;
239 }
240 const unsigned interval = 1;
241 const char* ibufname = argv[1];
242 const int ibufsize = atoi(argv[2]);
243 const char* hostname = argv[3];
244 const char* runtype = argv[4];
245 const bool not_record = std::string(runtype).find(std::string("null")) != std::string::npos;
246 const char* path = argv[5];
247 const int ndisks = atoi(argv[6]);
248 const char* file_dbtmp = argv[7];
249 const char* obufname = argv[8];
250 const int obufsize = (argc > 8) ? atoi(argv[9]) : -1;
251 const char* nodename = (argc > 9) ? argv[10] : "";
252 g_is_arich = StringUtil::find(runtype, "arich");
253 const int nodeid = (argc > 10) ? atoi(argv[11]) : -1;
254 const unsigned int ninput = (argc > 11) ? atoi(argv[12]) : 1;
255 g_file_diskid = StringUtil::form("/tmp/%s_diskid", nodename);
256
257 RunInfoBuffer info;
258 const bool use_info = nodeid >= 0;
259 if (use_info) {
260 info.open(nodename, nodeid);
261 }
262 SharedEventBuffer ibuf[10];
263 for (unsigned int ib = 0; ib < ninput; ib++) {
264 ibuf[ib].open(StringUtil::form("%s_%d", ibufname, ib), ibufsize * 1000000);//, true);
265 }
266 signal(SIGINT2, signalHandler);
267 signal(SIGKILL9, signalHandler);
268 ConfigFile config("slowcontrol");
269 PostgreSQLInterface* db = new PostgreSQLInterface(config.get("database.host"),
270 config.get("database.dbname"),
271 config.get("database.user"),
272 config.get("database.password"),
273 config.getInt("database.port"));
274 SharedEventBuffer obuf;
275 if (obufsize > 0) obuf.open(obufname, obufsize * 1000000);//, true);
276 if (use_info) info.reportReady();
277 B2DEBUG(1, "started recording.")do { if (Belle2::LogSystem::debugEnabled()) do { if (Belle2::
LogSystem::Instance().isLevelEnabled(Belle2::LogConfig::c_Debug
, 1, "daq")) { { LogVariableStream varStream; varStream <<
"started recording."; Belle2::LogSystem::Instance().sendMessage
(Belle2::LogMessage(Belle2::LogConfig::c_Debug, std::move(varStream
), "daq", __PRETTY_FUNCTION__, "daq/storage/tools/storagerecord.cc"
, 277, 1)); }; } } while(false); } while(false)
;
278 unsigned long long nbyte_out = 0;
279 unsigned int count_out = 0;
280 unsigned int expno = 0;
281 unsigned int runno = 0;
282 unsigned int subno = 0;
283 int* evtbuf = new int[10000000];
284 g_file = new FileHandler(db, runtype, hostname, file_dbtmp);
285 FileHandler& file(*g_file);
286 int ecount = 0;
287 bool newrun = false;
288 unsigned int fileid = 0;
289 struct dataheader {
290 // int nword; // unused
291 int type;
292 unsigned int expno;
293 unsigned int runno;
294 } hd;
295 g_streamersize = 0;
296 while (true) {
297 if (use_info) info.reportRunning();
298 if (g_is_arich) {
299 if (g_runno == 0) {
300 while ((g_runno = info.getRunNumber()) <= 0) {
301 usleep(500);
302 }
303 g_expno = info.getExpNumber();
304 std::cout << "[DEBUG] expno = " << g_expno << ", runno =" << g_runno << std::endl;
305 }
306 }
307 for (unsigned int ib = 0; ib < ninput; ib++) {
308 ibuf[ib].lock();
309 ibuf[ib].read((int*)&hd, true, true);
310 ibuf[ib].read(evtbuf, true, true);
311 ibuf[ib].unlock();
312 int nbyte = evtbuf[0];
313 int nword = (nbyte - 1) / 4 + 1;
314 bool isnew = false;
315 if (hd.type == MSG_STREAMERINFO) {
316 memcpy(g_streamerinfo, evtbuf, nbyte);
317 g_streamersize = nbyte;
318 }
319 if (expno > hd.expno || runno > hd.runno) {
320 B2WARNING("The old run was detected => discard event exp = " << hd.expno <<do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Warning, 0, "daq")) { { LogVariableStream varStream
; varStream << "The old run was detected => discard event exp = "
<< hd.expno << " (" << expno << "), runno"
<< hd.runno << "(" << runno << ")"; Belle2
::LogSystem::Instance().sendMessage(Belle2::LogMessage(Belle2
::LogConfig::c_Warning, std::move(varStream), "daq", __PRETTY_FUNCTION__
, "daq/storage/tools/storagerecord.cc", 321, 0)); }; } } while
(false)
321 " (" << expno << "), runno" << hd.runno << "(" << runno << ")")do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Warning, 0, "daq")) { { LogVariableStream varStream
; varStream << "The old run was detected => discard event exp = "
<< hd.expno << " (" << expno << "), runno"
<< hd.runno << "(" << runno << ")"; Belle2
::LogSystem::Instance().sendMessage(Belle2::LogMessage(Belle2
::LogConfig::c_Warning, std::move(varStream), "daq", __PRETTY_FUNCTION__
, "daq/storage/tools/storagerecord.cc", 321, 0)); }; } } while
(false)
;
322 continue;
323 }
324 if (!newrun || expno < hd.expno || runno < hd.runno) {
325 newrun = true;
326 isnew = true;
Value stored to 'isnew' is never read
327 expno = hd.expno;
328 runno = hd.runno;
329 fileid = 0;
330 if (use_info) {
331 info.setExpNumber(expno);
332 info.setRunNumber(runno);
333 info.setSubNumber(subno);
334 info.setInputCount(0);
335 info.setInputNBytes(0);
336 info.setOutputCount(0);
337 info.setOutputNBytes(0);
338 nbyte_out = 0;
339 count_out = 0;
340 }
341 obuf.lock();
342 SharedEventBuffer::Header* oheader = obuf.getHeader();
343 oheader->expno = expno;
344 oheader->runno = runno;
345 obuf.unlock();
346 if (!not_record) {
347 if (file) {
348 file.close();
349 }
350 std::string filename = StringUtil::form("%s%02d", path, g_diskid);
351 struct statvfs statfs;
352 statvfs(filename.c_str(), &statfs);
353 float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
354 if (usage > 0.9) g_diskid = 0;
355 file.open(path, ndisks, expno, runno, fileid);
356 ecount = 0;
357 nbyte_out += nbyte;
358 fileid++;
359 }
360 continue;
361 }
362 if (use_info) {
363 info.addInputCount(1);
364 info.addInputNBytes(nbyte);
365 }
366 if (hd.type == MSG_STREAMERINFO) {
367 continue;
368 }
369 if (file) {
370 if (nbyte_out > MAX_FILE_SIZE) {
371 file.close();
372 nbyte_out = 0;
373 if (g_diskid > 0) {
374 std::string filename = StringUtil::form("%s%02d", path, g_diskid);
375 struct statvfs statfs;
376 statvfs(filename.c_str(), &statfs);
377 float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
378 if (usage > 0.9) {
379 B2WARNING("disk-" << g_diskid << " is already full. Stopping Run#" << runno)do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Warning, 0, "daq")) { { LogVariableStream varStream
; varStream << "disk-" << g_diskid << " is already full. Stopping Run#"
<< runno; Belle2::LogSystem::Instance().sendMessage(Belle2
::LogMessage(Belle2::LogConfig::c_Warning, std::move(varStream
), "daq", __PRETTY_FUNCTION__, "daq/storage/tools/storagerecord.cc"
, 379, 0)); }; } } while(false)
;
380 std::cout << "[STOP=RUNCONTROL]" << std::endl;
381 continue;
382 }
383 }
384 file.open(path, ndisks, expno, runno, fileid);
385 ecount = 0;
386 fileid++;
387 }
388 file.write((char*)evtbuf, nbyte);
389 nbyte_out += nbyte;
390 if (use_info) {
391 info.addOutputCount(1);
392 info.addOutputNBytes(nbyte);
393 info.get()->reserved[0] = file.getFileId();
394 info.get()->reserved[1] = file.getDiskId();
395 info.get()->reserved_f[0] = (float)info.getOutputNBytes() / 1024. / 1024.;
396 }
397 } else {
398 if (!ecount) {
399 B2WARNING("no run was initialzed for recording : " << hd.expno << "." << hd.runno)do { if (Belle2::LogSystem::Instance().isLevelEnabled(Belle2::
LogConfig::c_Warning, 0, "daq")) { { LogVariableStream varStream
; varStream << "no run was initialzed for recording : "
<< hd.expno << "." << hd.runno; Belle2::LogSystem
::Instance().sendMessage(Belle2::LogMessage(Belle2::LogConfig
::c_Warning, std::move(varStream), "daq", __PRETTY_FUNCTION__
, "daq/storage/tools/storagerecord.cc", 399, 0)); }; } } while
(false)
;
400 }
401 ecount = 1;
402 }
403 // Dump data into output buffer
404 if (!isnew && obufsize > 0 && count_out % interval == 0 && obuf.isWritable(nword)) {
405 obuf.write(evtbuf, nword, true);
406 }
407 count_out++;
408 }
409 }
410 return 0;
411}
412