13 #include <framework/logging/Logger.h>
14 #include <framework/pcore/EvtMessage.h>
16 #include <daq/storage/SharedEventBuffer.h>
18 #include <daq/slc/psql/PostgreSQLInterface.h>
20 #include <daq/slc/database/DBHandlerException.h>
22 #include <daq/slc/readout/RunInfoBuffer.h>
24 #include <daq/slc/base/ConfigFile.h>
25 #include <daq/slc/base/Date.h>
26 #include <daq/slc/base/StringUtil.h>
33 #include <sys/statvfs.h>
38 const unsigned long long GB = 1000 * 1024 * 1024;
39 const unsigned long long MAX_FILE_SIZE = 8 * GB;
40 const char* g_table =
"datafiles";
41 unsigned int g_streamersize = 0;
42 char* g_streamerinfo =
new char[1000000];
44 std::string g_file_diskid;
47 bool g_is_arich =
false;
53 const char* host,
const char* dbtmp)
54 : m_db(db), m_runtype(runtype), m_host(host), m_dbtmp(dbtmp)
66 int getDiskId() {
return m_diskid; }
67 int getFileId() {
return m_fileid; }
70 int open(
const std::string& dir,
int ndisks,
int expno,
int runno,
int fileid)
78 m_expno = (g_expno > 0) ? g_expno : expno;
79 m_runno = (g_runno > 0) ? g_runno : runno;
81 bool available =
false;
83 struct statvfs statfs;
90 std::ifstream fin(g_file_diskid.c_str());
94 for (
int i = 0; i < ndisks; i++) {
95 sprintf(filename,
"%s%02d", dir.c_str(), m_diskid);
96 statvfs(filename, &statfs);
97 float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
98 sprintf(filename,
"%s%02d/storage/full_flag", dir.c_str(), m_diskid);
99 std::ifstream fin(filename);
118 std::cout <<
"[DEBUG] disk : " << m_diskid <<
" is available" << std::endl;
121 std::cout <<
"[DEBUG] disk : " << m_diskid <<
" is with full_flag" << std::endl;
123 std::ofstream fout(filename);
126 B2WARNING(
"disk-" << m_diskid <<
" is full " << usage);
129 if (m_diskid > ndisks) m_diskid = 1;
133 B2FATAL(
"No disk available for writing " << __FILE__ <<
":" << __LINE__);
136 std::string filedir = dir + StringUtil::form(
"%02d/storage/%4.4d/%5.5d/", m_diskid, expno, runno);
137 system((
"mkdir -p " + filedir).c_str());
138 m_filename = StringUtil::form(
"%s.%4.4d.%5.5d.%s.f%5.5d.sroot",
139 m_runtype.c_str(), expno, runno, m_host.c_str(), m_fileid);
140 m_path = filedir + m_filename;
141 m_file =
::open(
m_path.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0664);
143 std::ofstream fout(g_file_diskid.c_str());
146 B2FATAL(
"Failed to open file : " <<
m_path);
151 m_db->execute(
"insert into %s (name, path, host, label, expno, runno, fileno, nevents, chksum, size) "
152 "values ('%s', '%s', '%s', '%s', %d, %d, %d, 0, 0, 0);",
153 g_table, m_filename.c_str(),
m_path.c_str(), m_host.c_str(),
154 m_runtype.c_str(), m_expno, m_runno, m_fileid);
158 write(g_streamerinfo, g_streamersize,
true);
159 B2INFO(
"New file " <<
m_path <<
" is opened");
167 std::cout <<
"[DEBUG] File closed" << std::endl;
171 stat(
m_path.c_str(), &st);
172 std::string d =
Date(st.st_mtime).toString();
174 m_db->execute(
"update %s set time_close = '%s', chksum = %lu, nevents = %lu, "
175 "size = %lu where name = '%s' and host = '%s';",
176 g_table, d.c_str(), m_chksum, m_nevents, m_filesize,
177 m_filename.c_str(), m_host.c_str());
185 int write(
char* evtbuf,
int nbyte,
bool isstreamer =
false)
187 m_chksum = adler32(m_chksum, (
unsigned char*)evtbuf, nbyte);
188 int ret = ::write(m_file, evtbuf, nbyte);
203 std::string m_runtype;
207 std::string m_filename;
212 std::string m_configname;
215 unsigned long long m_filesize;
216 unsigned long long m_chksum;
217 unsigned long long m_nevents;
222 void signalHandler(
int)
224 if (g_file) g_file->close();
228 int main(
int argc,
char** argv)
231 printf(
"%s : <ibufname> <ibufsize> <hostname> <runtype> <path> <ndisk> "
232 "<filepath_dbtmp> [<obufname> <obufsize> nodename, nodeid]\n", argv[0]);
235 const unsigned interval = 1;
236 const char* ibufname = argv[1];
237 const int ibufsize = atoi(argv[2]);
238 const char* hostname = argv[3];
239 const char* runtype = argv[4];
240 const bool not_record = std::string(runtype).find(std::string(
"null")) != std::string::npos;
241 const char* path = argv[5];
242 const int ndisks = atoi(argv[6]);
243 const char* file_dbtmp = argv[7];
244 const char* obufname = (argc > 7) ? argv[8] :
"";
245 const int obufsize = (argc > 8) ? atoi(argv[9]) : -1;
246 const char* nodename = (argc > 9) ? argv[10] :
"";
247 g_is_arich = StringUtil::find(runtype,
"arich");
248 const int nodeid = (argc > 10) ? atoi(argv[11]) : -1;
249 const unsigned int ninput = (argc > 11) ? atoi(argv[12]) : 1;
250 g_file_diskid = StringUtil::form(
"/tmp/%s_diskid", nodename);
253 const bool use_info = nodeid >= 0;
255 info.open(nodename, nodeid);
258 for (
unsigned int ib = 0; ib < ninput; ib++) {
259 ibuf[ib].open(StringUtil::form(
"%s_%d", ibufname, ib), ibufsize * 1000000);
261 signal(SIGINT, signalHandler);
262 signal(SIGKILL, signalHandler);
265 config.get(
"database.dbname"),
266 config.get(
"database.user"),
267 config.get(
"database.password"),
268 config.getInt(
"database.port"));
270 if (obufsize > 0) obuf.open(obufname, obufsize * 1000000);
271 if (use_info) info.reportReady();
272 B2DEBUG(1,
"started recording.");
273 unsigned long long nbyte_out = 0;
274 unsigned int count_out = 0;
275 unsigned int expno = 0;
276 unsigned int runno = 0;
277 unsigned int subno = 0;
278 int* evtbuf =
new int[10000000];
279 g_file =
new FileHandler(db, runtype, hostname, file_dbtmp);
283 unsigned int fileid = 0;
292 if (use_info) info.reportRunning();
295 while ((g_runno = info.getRunNumber()) <= 0) {
298 g_expno = info.getExpNumber();
299 std::cout <<
"[DEBUG] expno = " << g_expno <<
", runno =" << g_runno << std::endl;
302 for (
unsigned int ib = 0; ib < ninput; ib++) {
304 ibuf[ib].read((
int*)&hd,
true,
true);
305 ibuf[ib].read(evtbuf,
true,
true);
307 int nbyte = evtbuf[0];
308 int nword = (nbyte - 1) / 4 + 1;
310 if (hd.type == MSG_STREAMERINFO) {
311 memcpy(g_streamerinfo, evtbuf, nbyte);
312 g_streamersize = nbyte;
314 if (expno > hd.expno || runno > hd.runno) {
315 B2WARNING(
"The old run was detected => discard event exp = " << hd.expno <<
316 " (" << expno <<
"), runno" << hd.runno <<
"(" << runno <<
")");
319 if (!newrun || expno < hd.expno || runno < hd.runno) {
326 info.setExpNumber(expno);
327 info.setRunNumber(runno);
328 info.setSubNumber(subno);
329 info.setInputCount(0);
330 info.setInputNBytes(0);
331 info.setOutputCount(0);
332 info.setOutputNBytes(0);
338 oheader->expno = expno;
339 oheader->runno = runno;
345 std::string filename = StringUtil::form(
"%s%02d", path, g_diskid);
346 struct statvfs statfs;
347 statvfs(filename.c_str(), &statfs);
348 float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
349 if (usage > 0.9) g_diskid = 0;
350 file.open(path, ndisks, expno, runno, fileid);
358 info.addInputCount(1);
359 info.addInputNBytes(nbyte);
361 if (hd.type == MSG_STREAMERINFO) {
365 if (nbyte_out > MAX_FILE_SIZE) {
369 std::string filename = StringUtil::form(
"%s%02d", path, g_diskid);
370 struct statvfs statfs;
371 statvfs(filename.c_str(), &statfs);
372 float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
374 B2WARNING(
"disk-" << g_diskid <<
" is already full. Stopping Run#" << runno);
375 std::cout <<
"[STOP=RUNCONTROL]" << std::endl;
379 file.open(path, ndisks, expno, runno, fileid);
383 file.write((
char*)evtbuf, nbyte);
386 info.addOutputCount(1);
387 info.addOutputNBytes(nbyte);
388 info.get()->reserved[0] = file.getFileId();
389 info.get()->reserved[1] = file.getDiskId();
390 info.get()->reserved_f[0] = (float)info.getOutputNBytes() / 1024. / 1024.;
394 B2WARNING(
"no run was initialzed for recording : " << hd.expno <<
"." << hd.runno);
399 if (!isnew && obufsize > 0 && count_out % interval == 0 && obuf.isWritable(nword)) {
400 obuf.write(evtbuf, nword,
true);
InputHandler which will read XML from plain files, optionally gzip compressed.
virtual ~FileHandler()
empty, virtual destructor
std::string m_path
Search path to look for files.
virtual InputContext * open(const std::string &path) override
create a new FileContext by searching the file system for a file named like path.
FileHandler(const std::string &uri)
Instantiate a new file handler, using the uri as base search path.
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.