Belle II Software  release-08-01-10
storagerecord.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <unistd.h>
9 #include <cstdlib>
10 #include <sys/stat.h>
11 #include <fcntl.h>
12 
13 #include <framework/logging/Logger.h>
14 #include <framework/pcore/EvtMessage.h>
15 
16 #include <daq/storage/SharedEventBuffer.h>
17 
18 #include <daq/slc/psql/PostgreSQLInterface.h>
19 
20 #include <daq/slc/database/DBHandlerException.h>
21 
22 #include <daq/slc/readout/RunInfoBuffer.h>
23 
24 #include <daq/slc/base/ConfigFile.h>
25 #include <daq/slc/base/Date.h>
26 #include <daq/slc/base/StringUtil.h>
27 
28 #include <cstdio>
29 #include <cstring>
30 #include <fstream>
31 #include <iostream>
32 #include <signal.h>
33 #include <sys/statvfs.h>
34 #include <zlib.h>
35 
36 using namespace Belle2;
37 
38 const unsigned long long GB = 1000 * 1024 * 1024;
39 const unsigned long long MAX_FILE_SIZE = 8 * GB;
40 const char* g_table = "datafiles";
41 unsigned int g_streamersize = 0;
42 char* g_streamerinfo = new char[1000000];
43 int g_diskid = 0;
44 std::string g_file_diskid;
45 int g_runno = 0;
46 int g_expno = 0;
47 bool g_is_arich = false;
48 
49 class FileHandler {
50 
51 public:
52  FileHandler(DBInterface* db, const std::string& runtype,
53  const char* host, const char* dbtmp)
54  : m_db(db), m_runtype(runtype), m_host(host), m_dbtmp(dbtmp)
55  {
56  m_file = -1;
57  m_filesize = 0;
58  m_fileid = 0;
59  m_diskid = 1;
60  }
61  ~FileHandler() throw()
62  {
63  }
64 
65 public:
66  int getDiskId() { return m_diskid; }
67  int getFileId() { return m_fileid; }
68 
69 public:
70  int open(const std::string& dir, int ndisks, int expno, int runno, int fileid)
71  {
72  m_filesize = 0;
73  m_fileid = 0;
74  m_diskid = 1;
75  m_chksum = 1;
76  m_nevents = 0;
77  m_filesize = 0;
78  m_expno = (g_expno > 0) ? g_expno : expno;
79  m_runno = (g_runno > 0) ? g_runno : runno;
80  m_fileid = fileid;
81  bool available = false;
82  char filename[1024];
83  struct statvfs statfs;
84  if (g_diskid > 0) {
85  m_diskid = g_diskid;
86  available = true;
87  } else {
88  {
89  int diskid = 1;
90  std::ifstream fin(g_file_diskid.c_str());
91  fin >> diskid;
92  m_diskid = diskid;
93  }
94  for (int i = 0; i < ndisks; i++) {
95  sprintf(filename, "%s%02d", dir.c_str(), m_diskid);
96  statvfs(filename, &statfs);
97  float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
98  sprintf(filename, "%s%02d/storage/full_flag", dir.c_str(), m_diskid);
99  std::ifstream fin(filename);
100  int flag = 0;
101  fin >> flag;
102  fin.close();
103  /*
104  if (usage < 0.1) {
105  fin.close();
106  //::unlink(filename);
107  if (flag == 1) {
108  std::cout << "[DEBUG] disk : " << m_diskid << " is available again (full_flag removed)" << std::endl;
109  }
110  available = true;
111  std::cout << "[DEBUG] disk : " << m_diskid << " is available" << std::endl;
112  break;
113  } else
114  */
115  if (usage < 0.7) {
116  if (flag != 1) {
117  available = true;
118  std::cout << "[DEBUG] disk : " << m_diskid << " is available" << std::endl;
119  break;
120  }
121  std::cout << "[DEBUG] disk : " << m_diskid << " is with full_flag" << std::endl;
122  } else {
123  std::ofstream fout(filename);
124  fout << 1;
125  fout.close();
126  B2WARNING("disk-" << m_diskid << " is full " << usage);
127  }
128  m_diskid++;
129  if (m_diskid > ndisks) m_diskid = 1;
130  }
131  }
132  if (!available) {
133  B2FATAL("No disk available for writing " << __FILE__ << ":" << __LINE__);
134  exit(1);
135  }
136  std::string filedir = dir + StringUtil::form("%02d/storage/%4.4d/%5.5d/", m_diskid, expno, runno);
137  system(("mkdir -p " + filedir).c_str());
138  m_filename = StringUtil::form("%s.%4.4d.%5.5d.%s.f%5.5d.sroot",
139  m_runtype.c_str(), expno, runno, m_host.c_str(), m_fileid);
140  m_path = filedir + m_filename;
141  m_file = ::open(m_path.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0664);
142  g_diskid = m_diskid;
143  std::ofstream fout(g_file_diskid.c_str());
144  fout << g_diskid;
145  if (m_file < 0) {
146  B2FATAL("Failed to open file : " << m_path);
147  exit(1);
148  }
149  try {
150  m_db->connect();
151  m_db->execute("insert into %s (name, path, host, label, expno, runno, fileno, nevents, chksum, size) "
152  "values ('%s', '%s', '%s', '%s', %d, %d, %d, 0, 0, 0);",
153  g_table, m_filename.c_str(), m_path.c_str(), m_host.c_str(),
154  m_runtype.c_str(), m_expno, m_runno, m_fileid);
155  } catch (const DBHandlerException& e) {
156  B2WARNING(e.what());
157  }
158  write(g_streamerinfo, g_streamersize, true);
159  B2INFO("New file " << m_path << " is opened");
160 
161  return m_id;
162  }
163 
164  void close()
165  {
166  if (m_file > 0) {
167  std::cout << "[DEBUG] File closed" << std::endl;
168  ::close(m_file);
169  try {
170  struct stat st;
171  stat(m_path.c_str(), &st);
172  std::string d = Date(st.st_mtime).toString();
173  m_db->connect();
174  m_db->execute("update %s set time_close = '%s', chksum = %lu, nevents = %lu, "
175  "size = %lu where name = '%s' and host = '%s';",
176  g_table, d.c_str(), m_chksum, m_nevents, m_filesize,
177  m_filename.c_str(), m_host.c_str());
178  } catch (const DBHandlerException& e) {
179  B2WARNING(e.what());
180  }
181  m_db->close();
182  }
183  }
184 
185  int write(char* evtbuf, int nbyte, bool isstreamer = false)
186  {
187  m_chksum = adler32(m_chksum, (unsigned char*)evtbuf, nbyte);
188  int ret = ::write(m_file, evtbuf, nbyte);
189  m_filesize += nbyte;
190  if (!isstreamer) {
191  m_nevents++;
192  }
193  return ret;
194  }
195 
196  operator bool()
197  {
198  return m_file > 0;
199  }
200 
201 private:
202  DBInterface* m_db;
203  std::string m_runtype;
204  std::string m_host;
205  std::string m_dbtmp;
206  int m_id;
207  std::string m_filename;
208  std::string m_path;
209  int m_diskid;
210  int m_fileid;
211  int m_file;
212  std::string m_configname;
213  int m_expno;
214  int m_runno;
215  unsigned long long m_filesize;
216  unsigned long long m_chksum;
217  unsigned long long m_nevents;
218 };
219 
220 FileHandler* g_file = NULL;
221 
222 void signalHandler(int)
223 {
224  if (g_file) g_file->close();
225  exit(1);
226 }
227 
228 int main(int argc, char** argv)
229 {
230  if (argc < 8) {
231  printf("%s : <ibufname> <ibufsize> <hostname> <runtype> <path> <ndisk> "
232  "<filepath_dbtmp> [<obufname> <obufsize> nodename, nodeid]\n", argv[0]);
233  return 1;
234  }
235  const unsigned interval = 1;
236  const char* ibufname = argv[1];
237  const int ibufsize = atoi(argv[2]);
238  const char* hostname = argv[3];
239  const char* runtype = argv[4];
240  const bool not_record = std::string(runtype).find(std::string("null")) != std::string::npos;
241  const char* path = argv[5];
242  const int ndisks = atoi(argv[6]);
243  const char* file_dbtmp = argv[7];
244  const char* obufname = (argc > 7) ? argv[8] : "";
245  const int obufsize = (argc > 8) ? atoi(argv[9]) : -1;
246  const char* nodename = (argc > 9) ? argv[10] : "";
247  g_is_arich = StringUtil::find(runtype, "arich");
248  const int nodeid = (argc > 10) ? atoi(argv[11]) : -1;
249  const unsigned int ninput = (argc > 11) ? atoi(argv[12]) : 1;
250  g_file_diskid = StringUtil::form("/tmp/%s_diskid", nodename);
251 
252  RunInfoBuffer info;
253  const bool use_info = nodeid >= 0;
254  if (use_info) {
255  info.open(nodename, nodeid);
256  }
257  SharedEventBuffer ibuf[10];
258  for (unsigned int ib = 0; ib < ninput; ib++) {
259  ibuf[ib].open(StringUtil::form("%s_%d", ibufname, ib), ibufsize * 1000000);//, true);
260  }
261  signal(SIGINT, signalHandler);
262  signal(SIGKILL, signalHandler);
263  ConfigFile config("slowcontrol");
264  PostgreSQLInterface* db = new PostgreSQLInterface(config.get("database.host"),
265  config.get("database.dbname"),
266  config.get("database.user"),
267  config.get("database.password"),
268  config.getInt("database.port"));
269  SharedEventBuffer obuf;
270  if (obufsize > 0) obuf.open(obufname, obufsize * 1000000);//, true);
271  if (use_info) info.reportReady();
272  B2DEBUG(1, "started recording.");
273  unsigned long long nbyte_out = 0;
274  unsigned int count_out = 0;
275  unsigned int expno = 0;
276  unsigned int runno = 0;
277  unsigned int subno = 0;
278  int* evtbuf = new int[10000000];
279  g_file = new FileHandler(db, runtype, hostname, file_dbtmp);
280  FileHandler& file(*g_file);
281  int ecount = 0;
282  bool newrun = false;
283  unsigned int fileid = 0;
284  struct dataheader {
285  int nword;
286  int type;
287  unsigned int expno;
288  unsigned int runno;
289  } hd;
290  g_streamersize = 0;
291  while (true) {
292  if (use_info) info.reportRunning();
293  if (g_is_arich) {
294  if (g_runno == 0) {
295  while ((g_runno = info.getRunNumber()) <= 0) {
296  usleep(500);
297  }
298  g_expno = info.getExpNumber();
299  std::cout << "[DEBUG] expno = " << g_expno << ", runno =" << g_runno << std::endl;
300  }
301  }
302  for (unsigned int ib = 0; ib < ninput; ib++) {
303  ibuf[ib].lock();
304  ibuf[ib].read((int*)&hd, true, true);
305  ibuf[ib].read(evtbuf, true, true);
306  ibuf[ib].unlock();
307  int nbyte = evtbuf[0];
308  int nword = (nbyte - 1) / 4 + 1;
309  bool isnew = false;
310  if (hd.type == MSG_STREAMERINFO) {
311  memcpy(g_streamerinfo, evtbuf, nbyte);
312  g_streamersize = nbyte;
313  }
314  if (expno > hd.expno || runno > hd.runno) {
315  B2WARNING("The old run was detected => discard event exp = " << hd.expno <<
316  " (" << expno << "), runno" << hd.runno << "(" << runno << ")");
317  continue;
318  }
319  if (!newrun || expno < hd.expno || runno < hd.runno) {
320  newrun = true;
321  isnew = true;
322  expno = hd.expno;
323  runno = hd.runno;
324  fileid = 0;
325  if (use_info) {
326  info.setExpNumber(expno);
327  info.setRunNumber(runno);
328  info.setSubNumber(subno);
329  info.setInputCount(0);
330  info.setInputNBytes(0);
331  info.setOutputCount(0);
332  info.setOutputNBytes(0);
333  nbyte_out = 0;
334  count_out = 0;
335  }
336  obuf.lock();
337  SharedEventBuffer::Header* oheader = obuf.getHeader();
338  oheader->expno = expno;
339  oheader->runno = runno;
340  obuf.unlock();
341  if (!not_record) {
342  if (file) {
343  file.close();
344  }
345  std::string filename = StringUtil::form("%s%02d", path, g_diskid);
346  struct statvfs statfs;
347  statvfs(filename.c_str(), &statfs);
348  float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
349  if (usage > 0.9) g_diskid = 0;
350  file.open(path, ndisks, expno, runno, fileid);
351  ecount = 0;
352  nbyte_out += nbyte;
353  fileid++;
354  }
355  continue;
356  }
357  if (use_info) {
358  info.addInputCount(1);
359  info.addInputNBytes(nbyte);
360  }
361  if (hd.type == MSG_STREAMERINFO) {
362  continue;
363  }
364  if (file) {
365  if (nbyte_out > MAX_FILE_SIZE) {
366  file.close();
367  nbyte_out = 0;
368  if (g_diskid > 0) {
369  std::string filename = StringUtil::form("%s%02d", path, g_diskid);
370  struct statvfs statfs;
371  statvfs(filename.c_str(), &statfs);
372  float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
373  if (usage > 0.9) {
374  B2WARNING("disk-" << g_diskid << " is already full. Stopping Run#" << runno);
375  std::cout << "[STOP=RUNCONTROL]" << std::endl;
376  continue;
377  }
378  }
379  file.open(path, ndisks, expno, runno, fileid);
380  ecount = 0;
381  fileid++;
382  }
383  file.write((char*)evtbuf, nbyte);
384  nbyte_out += nbyte;
385  if (use_info) {
386  info.addOutputCount(1);
387  info.addOutputNBytes(nbyte);
388  info.get()->reserved[0] = file.getFileId();
389  info.get()->reserved[1] = file.getDiskId();
390  info.get()->reserved_f[0] = (float)info.getOutputNBytes() / 1024. / 1024.;
391  }
392  } else {
393  if (!ecount) {
394  B2WARNING("no run was initialzed for recording : " << hd.expno << "." << hd.runno);
395  }
396  ecount = 1;
397  }
398  // Dump data into output buffer
399  if (!isnew && obufsize > 0 && count_out % interval == 0 && obuf.isWritable(nword)) {
400  obuf.write(evtbuf, nword, true);
401  }
402  count_out++;
403  }
404  }
405  return 0;
406 }
407 
InputHandler which will read XML from plain files, optionally gzip compressed.
Definition: FileHandler.h:67
virtual ~FileHandler()
empty, virtual destructor
Definition: FileHandler.h:75
std::string m_path
Search path to look for files.
Definition: FileHandler.h:83
virtual InputContext * open(const std::string &path) override
create a new FileContext by searching the file system for a file named like path.
Definition: FileHandler.cc:64
FileHandler(const std::string &uri)
Instantiate a new file handler, using the uri as base search path.
Definition: FileHandler.cc:35
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:91