Belle II Software development
storagerecord_arich.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <sys/types.h>
9#include <sys/stat.h>
10#include <fcntl.h>
11
12#include <framework/logging/Logger.h>
13#include <framework/pcore/EvtMessage.h>
14
15#include <daq/storage/SharedEventBuffer.h>
16
17#include <daq/slc/database/DBHandlerException.h>
18
19#include <daq/slc/psql/PostgreSQLInterface.h>
20
21#include <daq/slc/readout/RunInfoBuffer.h>
22
23#include <daq/slc/base/ConfigFile.h>
24#include <daq/slc/base/Date.h>
25#include <daq/slc/base/StringUtil.h>
26
27#include <cstdio>
28#include <cstring>
29#include <fstream>
30#include <iostream>
31#include <signal.h>
32#include <sys/statvfs.h>
33#include <zlib.h>
34
35using namespace Belle2;
36
37const char* g_table = "datafiles";
38unsigned int g_streamersize = 0;
39char* g_streamerinfo = new char[1000000];
40int g_diskid = 0;
41int g_expno = 0;
42int g_runno = 0;
43
44class FileHandler {
45
46public:
47 FileHandler(DBInterface* db, const std::string& runtype,
48 const char* host, const char* dbtmp)
49 : m_db(db), m_runtype(runtype), m_host(host), m_dbtmp(dbtmp)
50 {
51 m_file = -1;
52 m_filesize = 0;
53 m_fileid = 0;
54 m_diskid = 1;
55 m_id = -1;
56 m_expno = -1;
57 m_runno = -1;
58 m_nevents = 0;
59 m_chksum = 0;
60 }
61 ~FileHandler() throw()
62 {
63 }
64
65public:
66 int getDiskId() { return m_diskid; }
67 int getFileId() { return m_fileid; }
68
69public:
70 int open(const std::string& dir, int ndisks, int /*expno*/, int /*runno*/, int fileid)
71 {
72 m_filesize = 0;
73 m_fileid = 0;
74 m_diskid = 1;
75 m_chksum = 1;
76 m_nevents = 0;
77 m_filesize = 0;
78 m_expno = g_expno;
79 m_runno = g_runno;
80 m_fileid = fileid;
81 bool available = false;
82 for (int i = 0; i < ndisks; i++) {
83 struct statvfs statfs;
84 char filename[1024];
85 sprintf(filename, "%s%02d", dir.c_str(), m_diskid);
86 statvfs(filename, &statfs);
87 float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
88 if (usage < 0.7) {
89 sprintf(filename, "%s%02d/storage/full_flag", dir.c_str(), m_diskid);
90 std::ifstream fin(filename);
91 int flag = 0;
92 fin >> flag;
93 if (flag != 1) {
94 available = true;
95 std::cout << "[DEBUG] disk : " << m_diskid << " is available" << std::endl;
96 break;
97 }
98 fin.close();
99 std::cout << "[DEBUG] disk : " << m_diskid << " is still full" << std::endl;
100 } else {
101 sprintf(filename, "%s%02d/storage/full_flag", dir.c_str(), m_diskid);
102 std::ofstream fout(filename);
103 fout << 1;
104 fout.close();
105 B2WARNING("disk-" << m_diskid << " is full " << usage);
106 }
107 m_diskid++;
108 if (m_diskid > ndisks) m_diskid = 1;
109 }
110 if (!available) {
111 B2FATAL("No disk available for writing");
112 exit(1);
113 }
114 std::string filedir = dir + StringUtil::form("%02d/storage/%4.4d/%5.5d/",
115 m_diskid, m_expno, m_runno);
116 system(("mkdir -p " + filedir).c_str());
117 m_filename = StringUtil::form("%s.%4.4d.%5.5d.%s.f%5.5d.sroot",
118 m_runtype.c_str(), m_expno, m_runno, m_host.c_str(), m_fileid);
119 m_path = filedir + m_filename;
120 m_file = ::open(m_path.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0664);
121 if (g_diskid > 0 && g_diskid != m_diskid) {
122 B2FATAL("disk-" << m_diskid << " is already full! Terminating process..");
123 exit(1);
124 }
125 g_diskid = m_diskid;
126 if (m_file < 0) {
127 B2FATAL("Failed to open file : " << m_path);
128 exit(1);
129 }
130 try {
131 m_db->connect();
132 m_db->execute("insert into %s (name, path, host, label, expno, runno, fileno, nevents, chksum, size) "
133 "values ('%s', '%s', '%s', '%s', %d, %d, %d, 0, 0, 0);",
134 g_table, m_filename.c_str(), m_path.c_str(), m_host.c_str(),
135 m_runtype.c_str(), m_expno, m_runno, m_fileid);
136 } catch (const DBHandlerException& e) {
137 B2WARNING(e.what());
138 }
139 write(g_streamerinfo, g_streamersize, true);
140 B2INFO("New file " << m_path << " is opened");
141
142 return m_id;
143 }
144
145 void close()
146 {
147 if (m_file > 0) {
148 std::cout << "[DEBUG] File closed" << std::endl;
149 ::close(m_file);
150 try {
151 struct stat st;
152 stat(m_path.c_str(), &st);
153 std::string d = Date(st.st_mtime).toString();
154 //if (m_fileid == 0) m_nevents--;//1 entry for StreamerInfo
155 m_db->connect();
156 m_db->execute("update %s set time_close = '%s', chksum = %lu, nevents = %lu, "
157 "size = %lu where name = '%s' and host = '%s';",
158 g_table, d.c_str(), m_chksum, m_nevents, m_filesize,
159 m_filename.c_str(), m_host.c_str());
160 } catch (const DBHandlerException& e) {
161 B2WARNING(e.what());
162 }
163 m_db->close();
164 }
165 }
166
167 int write(char* evtbuf, int nbyte, bool isstreamer = false)
168 {
169 int ret = ::write(m_file, evtbuf, nbyte);
170 m_filesize += nbyte;
171 if (!isstreamer) {
172 m_nevents++;
173 }
174 m_chksum = adler32(m_chksum, (unsigned char*)evtbuf, nbyte);
175 return ret;
176 }
177
178 operator bool()
179 {
180 return m_file > 0;
181 }
182
183private:
184 DBInterface* m_db;
185 std::string m_runtype;
186 std::string m_host;
187 std::string m_dbtmp;
188 int m_id;
189 std::string m_filename;
190 std::string m_path;
191 int m_diskid;
192 int m_fileid;
193 int m_file;
194 std::string m_configname;
195 int m_expno;
196 int m_runno;
197 unsigned long long m_filesize;
198 unsigned long long m_chksum;
199 unsigned long long m_nevents;
200};
201
202FileHandler* g_file = NULL;
203
204void signalHandler(int)
205{
206 if (g_file) g_file->close();
207 exit(1);
208}
209
210int main(int argc, char** argv)
211{
212 if (argc < 8) {
213 printf("%s : <ibufname> <ibufsize> <hostname> <runtype> <path> <ndisk> "
214 "<filepath_dbtmp> [<obufname> <obufsize> nodename, nodeid]\n", argv[0]);
215 return 1;
216 }
217 //storagerecord STORE02:REC 10 HLT2 debug /rawdata/disk 11 /home/usr/stordaq/data/dbtmp.txt STORE02:OUT 10 store02_storagerecord 3
218 const unsigned interval = 1;
219 const char* ibufname = argv[1];
220 const int ibufsize = atoi(argv[2]);
221 const char* hostname = argv[3];
222 const char* runtype = argv[4];
223 const bool not_record = std::string(runtype).find(std::string("null")) != std::string::npos;
224 const char* path = argv[5];
225 const int ndisks = atoi(argv[6]);
226 const char* file_dbtmp = argv[7];
227 const char* obufname = argv[8];
228 const int obufsize = (argc > 8) ? atoi(argv[9]) : -1;
229 const char* nodename = (argc > 9) ? argv[10] : "";
230 const int nodeid = (argc > 10) ? atoi(argv[11]) : -1;
231 const unsigned int ninput = (argc > 11) ? atoi(argv[12]) : 1;
232
233 RunInfoBuffer info;
234 const bool use_info = nodeid >= 0;
235 if (use_info) {
236 info.open(nodename, nodeid);
237 }
238 SharedEventBuffer ibuf[10];
239 for (unsigned int ib = 0; ib < ninput; ib++) {
240 ibuf[ib].open(StringUtil::form("%s_%d", ibufname, ib), ibufsize * 1000000);//, true);
241 }
242 signal(SIGINT, signalHandler);
243 signal(SIGKILL, signalHandler);
244 ConfigFile config("slowcontrol");
245 PostgreSQLInterface* db = new PostgreSQLInterface(config.get("database.host"),
246 config.get("database.dbname"),
247 config.get("database.user"),
248 config.get("database.password"),
249 config.getInt("database.port"));
251 if (obufsize > 0) obuf.open(obufname, obufsize * 1000000);//, true);
252 if (use_info) info.reportReady();
253 B2DEBUG(1, "started recording.");
254 unsigned long long nbyte_out [[maybe_unused]] = 0;
255 unsigned int count_out = 0;
256 unsigned int expno = 0;
257 unsigned int runno = 0;
258 unsigned int subno = 0;
259 int* evtbuf = new int[10000000];
260 g_file = new FileHandler(db, runtype, hostname, file_dbtmp);
261 FileHandler& file(*g_file);
262 int ecount = 0;
263 bool newrun = false;
264 unsigned int fileid = 0;
265 struct dataheader {
266 // int nword; // unused
267 int type;
268 unsigned int expno;
269 unsigned int runno;
270 } hd;
271 g_streamersize = 0;
272 while (true) {
273 if (use_info) info.reportRunning();
274 if (g_runno == 0) {
275 while ((g_runno = info.getRunNumber()) <= 0) {
276 usleep(500);
277 }
278 g_expno = info.getExpNumber();
279 std::cout << "[DEBUG] expno = " << g_expno << ", runno =" << g_runno << std::endl;
280 }
281 for (unsigned int ib = 0; ib < ninput; ib++) {
282 ibuf[ib].lock();
283 ibuf[ib].read((int*)&hd, true, true);
284 ibuf[ib].read(evtbuf, true, true);
285 ibuf[ib].unlock();
286 int nbyte = evtbuf[0];
287 int nword = (nbyte - 1) / 4 + 1;
288 bool isnew = false;
289 if (not_record) continue;
290 if (hd.type == MSG_STREAMERINFO) {
291 memcpy(g_streamerinfo, evtbuf, nbyte);
292 g_streamersize = nbyte;
293 }
294 if (expno > hd.expno || runno > hd.runno) {
295 B2WARNING("Old run was detected => discard event exp = " << hd.expno << " (" << expno << "), runno" << hd.runno << "(" << runno <<
296 ")");
297 continue;
298 }
299 if (!newrun || expno < hd.expno || runno < hd.runno) {
300 newrun = true;
301 isnew = true;
302 expno = hd.expno;
303 runno = hd.runno;
304 //fileid = 0;
305 if (use_info) {
306 info.setExpNumber(expno);
307 info.setRunNumber(runno);
308 info.setSubNumber(subno);
309 info.setInputCount(0);
310 info.setInputNBytes(0);
311 info.setOutputCount(0);
312 info.setOutputNBytes(0);
313 nbyte_out = 0;
314 count_out = 0;
315 }
316 obuf.lock();
317 SharedEventBuffer::Header* oheader = obuf.getHeader();
318 oheader->expno = expno;
319 oheader->runno = runno;
320 obuf.unlock();
321 if (file) {
322 file.close();
323 }
324 file.open(path, ndisks, expno, runno, fileid);
325 nbyte_out += nbyte;
326 fileid++;
327 continue;
328 }
329 if (use_info) {
330 info.addInputCount(1);
331 info.addInputNBytes(nbyte);
332 }
333 if (hd.type == MSG_STREAMERINFO) {
334 continue;
335 }
336 if (file) {
337 file.write((char*)evtbuf, nbyte);
338 nbyte_out += nbyte;
339 if (!isnew && obufsize > 0 && count_out % interval == 0 && obuf.isWritable(nword)) {
340 obuf.write(evtbuf, nword, true);
341 }
342 count_out++;
343 if (use_info) {
344 info.addOutputCount(1);
345 info.addOutputNBytes(nbyte);
346 info.get()->reserved[0] = file.getFileId();
347 info.get()->reserved[1] = file.getDiskId();
348 info.get()->reserved_f[0] = (float)info.getOutputNBytes() / 1024. / 1024.;
349 }
350 } else {
351 if (!ecount) {
352 B2WARNING("no run was initialzed for recording : " << hd.expno << "." << hd.runno);
353 }
354 ecount = 1;
355 }
356 }
357 }
358 return 0;
359}
360
Abstract base class for different kinds of events.