Belle II Software  release-05-01-25
storagerecord.cc
1 #include <unistd.h>
2 #include <cstdlib>
3 #include <sys/stat.h>
4 #include <fcntl.h>
5 
6 #include <framework/logging/Logger.h>
7 #include <framework/pcore/EvtMessage.h>
8 
9 #include <daq/storage/SharedEventBuffer.h>
10 
11 #include <daq/slc/psql/PostgreSQLInterface.h>
12 
13 #include <daq/slc/database/DBHandlerException.h>
14 
15 #include <daq/slc/readout/RunInfoBuffer.h>
16 
17 #include <daq/slc/base/ConfigFile.h>
18 #include <daq/slc/base/Date.h>
19 #include <daq/slc/base/StringUtil.h>
20 
21 #include <cstdio>
22 #include <cstring>
23 #include <fstream>
24 #include <iostream>
25 #include <signal.h>
26 #include <sys/statvfs.h>
27 #include <zlib.h>
28 
29 using namespace Belle2;
30 
31 const unsigned long long GB = 1000 * 1024 * 1024;
32 const unsigned long long MAX_FILE_SIZE = 8 * GB;
33 const char* g_table = "datafiles";
34 unsigned int g_streamersize = 0;
35 char* g_streamerinfo = new char[1000000];
36 int g_diskid = 0;
37 std::string g_file_diskid;
38 int g_runno = 0;
39 int g_expno = 0;
40 bool g_is_arich = false;
41 
42 class FileHandler {
43 
44 public:
45  FileHandler(DBInterface* db, const std::string& runtype,
46  const char* host, const char* dbtmp)
47  : m_db(db), m_runtype(runtype), m_host(host), m_dbtmp(dbtmp)
48  {
49  m_file = -1;
50  m_filesize = 0;
51  m_fileid = 0;
52  m_diskid = 1;
53  }
54  ~FileHandler() throw()
55  {
56  }
57 
58 public:
59  int getDiskId() { return m_diskid; }
60  int getFileId() { return m_fileid; }
61 
62 public:
63  int open(const std::string& dir, int ndisks, int expno, int runno, int fileid)
64  {
65  m_filesize = 0;
66  m_fileid = 0;
67  m_diskid = 1;
68  m_chksum = 1;
69  m_nevents = 0;
70  m_filesize = 0;
71  m_expno = (g_expno > 0) ? g_expno : expno;
72  m_runno = (g_runno > 0) ? g_runno : runno;
73  m_fileid = fileid;
74  bool available = false;
75  char filename[1024];
76  struct statvfs statfs;
77  if (g_diskid > 0) {
78  m_diskid = g_diskid;
79  available = true;
80  } else {
81  {
82  int diskid = 1;
83  std::ifstream fin(g_file_diskid.c_str());
84  fin >> diskid;
85  m_diskid = diskid;
86  }
87  for (int i = 0; i < ndisks; i++) {
88  sprintf(filename, "%s%02d", dir.c_str(), m_diskid);
89  statvfs(filename, &statfs);
90  float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
91  sprintf(filename, "%s%02d/storage/full_flag", dir.c_str(), m_diskid);
92  std::ifstream fin(filename);
93  int flag = 0;
94  fin >> flag;
95  fin.close();
96  /*
97  if (usage < 0.1) {
98  fin.close();
99  //::unlink(filename);
100  if (flag == 1) {
101  std::cout << "[DEBUG] disk : " << m_diskid << " is available again (full_flag removed)" << std::endl;
102  }
103  available = true;
104  std::cout << "[DEBUG] disk : " << m_diskid << " is available" << std::endl;
105  break;
106  } else
107  */
108  if (usage < 0.9) {
109  if (flag != 1) {
110  available = true;
111  std::cout << "[DEBUG] disk : " << m_diskid << " is available" << std::endl;
112  break;
113  }
114  std::cout << "[DEBUG] disk : " << m_diskid << " is with full_flag" << std::endl;
115  } else {
116  std::ofstream fout(filename);
117  fout << 1;
118  fout.close();
119  B2WARNING("disk-" << m_diskid << " is full " << usage);
120  }
121  m_diskid++;
122  if (m_diskid > ndisks) m_diskid = 1;
123  }
124  }
125  if (!available) {
126  B2FATAL("No disk available for writing " << __FILE__ << ":" << __LINE__);
127  exit(1);
128  }
129  std::string filedir = dir + StringUtil::form("%02d/storage/%4.4d/%5.5d/", m_diskid, expno, runno);
130  system(("mkdir -p " + filedir).c_str());
131  m_filename = StringUtil::form("%s.%4.4d.%5.5d.%s.f%5.5d.sroot",
132  m_runtype.c_str(), expno, runno, m_host.c_str(), m_fileid);
133  m_path = filedir + m_filename;
134  m_file = ::open(m_path.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0664);
135  g_diskid = m_diskid;
136  std::ofstream fout(g_file_diskid.c_str());
137  fout << g_diskid;
138  if (m_file < 0) {
139  B2FATAL("Failed to open file : " << m_path);
140  exit(1);
141  }
142  try {
143  m_db->connect();
144  m_db->execute("insert into %s (name, path, host, label, expno, runno, fileno, nevents, chksum, size) "
145  "values ('%s', '%s', '%s', '%s', %d, %d, %d, 0, 0, 0);",
146  g_table, m_filename.c_str(), m_path.c_str(), m_host.c_str(),
147  m_runtype.c_str(), m_expno, m_runno, m_fileid);
148  } catch (const DBHandlerException& e) {
149  B2WARNING(e.what());
150  }
151  write(g_streamerinfo, g_streamersize, true);
152  B2INFO("New file " << m_path << " is opened");
153 
154  return m_id;
155  }
156 
157  void close()
158  {
159  if (m_file > 0) {
160  std::cout << "[DEBUG] File closed" << std::endl;
161  ::close(m_file);
162  try {
163  struct stat st;
164  stat(m_path.c_str(), &st);
165  std::string d = Date(st.st_mtime).toString();
166  m_db->connect();
167  m_db->execute("update %s set time_close = '%s', chksum = %lu, nevents = %lu, "
168  "size = %lu where name = '%s' and host = '%s';",
169  g_table, d.c_str(), m_chksum, m_nevents, m_filesize,
170  m_filename.c_str(), m_host.c_str());
171  } catch (const DBHandlerException& e) {
172  B2WARNING(e.what());
173  }
174  m_db->close();
175  }
176  }
177 
178  int write(char* evtbuf, int nbyte, bool isstreamer = false)
179  {
180  m_chksum = adler32(m_chksum, (unsigned char*)evtbuf, nbyte);
181  int ret = ::write(m_file, evtbuf, nbyte);
182  m_filesize += nbyte;
183  if (!isstreamer) {
184  m_nevents++;
185  }
186  return ret;
187  }
188 
189  operator bool()
190  {
191  return m_file > 0;
192  }
193 
194 private:
195  DBInterface* m_db;
196  std::string m_runtype;
197  std::string m_host;
198  std::string m_dbtmp;
199  int m_id;
200  std::string m_filename;
201  std::string m_path;
202  int m_diskid;
203  int m_fileid;
204  int m_file;
205  std::string m_configname;
206  int m_expno;
207  int m_runno;
208  unsigned long long m_filesize;
209  unsigned long long m_chksum;
210  unsigned long long m_nevents;
211 };
212 
213 FileHandler* g_file = NULL;
214 
215 void signalHandler(int)
216 {
217  if (g_file) g_file->close();
218  exit(1);
219 }
220 
221 int main(int argc, char** argv)
222 {
223  if (argc < 8) {
224  printf("%s : <ibufname> <ibufsize> <hostname> <runtype> <path> <ndisk> "
225  "<filepath_dbtmp> [<obufname> <obufsize> nodename, nodeid]\n", argv[0]);
226  return 1;
227  }
228  const unsigned interval = 1;
229  const char* ibufname = argv[1];
230  const int ibufsize = atoi(argv[2]);
231  const char* hostname = argv[3];
232  const char* runtype = argv[4];
233  const bool not_record = std::string(runtype) == "null";
234  const char* path = argv[5];
235  const int ndisks = atoi(argv[6]);
236  const char* file_dbtmp = argv[7];
237  const char* obufname = (argc > 7) ? argv[8] : "";
238  const int obufsize = (argc > 8) ? atoi(argv[9]) : -1;
239  const char* nodename = (argc > 9) ? argv[10] : "";
240  g_is_arich = StringUtil::find(runtype, "arich");
241  const int nodeid = (argc > 10) ? atoi(argv[11]) : -1;
242  const unsigned int ninput = (argc > 11) ? atoi(argv[12]) : 1;
243  g_file_diskid = StringUtil::form("/tmp/%s_diskid", nodename);
244 
245  RunInfoBuffer info;
246  const bool use_info = nodeid >= 0;
247  if (use_info) {
248  info.open(nodename, nodeid);
249  }
250  SharedEventBuffer ibuf[10];
251  for (unsigned int ib = 0; ib < ninput; ib++) {
252  ibuf[ib].open(StringUtil::form("%s_%d", ibufname, ib), ibufsize * 1000000);//, true);
253  }
254  signal(SIGINT, signalHandler);
255  signal(SIGKILL, signalHandler);
256  ConfigFile config("slowcontrol");
257  PostgreSQLInterface* db = new PostgreSQLInterface(config.get("database.host"),
258  config.get("database.dbname"),
259  config.get("database.user"),
260  config.get("database.password"),
261  config.getInt("database.port"));
262  SharedEventBuffer obuf;
263  if (obufsize > 0) obuf.open(obufname, obufsize * 1000000);//, true);
264  if (use_info) info.reportReady();
265  B2DEBUG(1, "started recording.");
266  unsigned long long nbyte_out = 0;
267  unsigned int count_out = 0;
268  unsigned int expno = 0;
269  unsigned int runno = 0;
270  unsigned int subno = 0;
271  int* evtbuf = new int[10000000];
272  g_file = new FileHandler(db, runtype, hostname, file_dbtmp);
273  FileHandler& file(*g_file);
274  int ecount = 0;
275  bool newrun = false;
276  unsigned int fileid = 0;
277  struct dataheader {
278  int nword;
279  int type;
280  unsigned int expno;
281  unsigned int runno;
282  } hd;
283  g_streamersize = 0;
284  while (true) {
285  if (use_info) info.reportRunning();
286  if (g_is_arich) {
287  if (g_runno == 0) {
288  while ((g_runno = info.getRunNumber()) <= 0) {
289  usleep(500);
290  }
291  g_expno = info.getExpNumber();
292  std::cout << "[DEBUG] expno = " << g_expno << ", runno =" << g_runno << std::endl;
293  }
294  }
295  for (unsigned int ib = 0; ib < ninput; ib++) {
296  ibuf[ib].lock();
297  ibuf[ib].read((int*)&hd, true, true);
298  ibuf[ib].read(evtbuf, true, true);
299  ibuf[ib].unlock();
300  int nbyte = evtbuf[0];
301  int nword = (nbyte - 1) / 4 + 1;
302  bool isnew = false;
303  if (hd.type == MSG_STREAMERINFO) {
304  memcpy(g_streamerinfo, evtbuf, nbyte);
305  g_streamersize = nbyte;
306  }
307  if (expno > hd.expno || runno > hd.runno) {
308  B2WARNING("The old run was detected => discard event exp = " << hd.expno <<
309  " (" << expno << "), runno" << hd.runno << "(" << runno << ")");
310  continue;
311  }
312  if (!newrun || expno < hd.expno || runno < hd.runno) {
313  newrun = true;
314  isnew = true;
315  expno = hd.expno;
316  runno = hd.runno;
317  fileid = 0;
318  if (use_info) {
319  info.setExpNumber(expno);
320  info.setRunNumber(runno);
321  info.setSubNumber(subno);
322  info.setInputCount(0);
323  info.setInputNBytes(0);
324  info.setOutputCount(0);
325  info.setOutputNBytes(0);
326  nbyte_out = 0;
327  count_out = 0;
328  }
329  obuf.lock();
330  SharedEventBuffer::Header* oheader = obuf.getHeader();
331  oheader->expno = expno;
332  oheader->runno = runno;
333  obuf.unlock();
334  if (!not_record) {
335  if (file) {
336  file.close();
337  }
338  std::string filename = StringUtil::form("%s%02d", path, g_diskid);
339  struct statvfs statfs;
340  statvfs(filename.c_str(), &statfs);
341  float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
342  if (usage > 0.9) g_diskid = 0;
343  file.open(path, ndisks, expno, runno, fileid);
344  ecount = 0;
345  nbyte_out += nbyte;
346  fileid++;
347  }
348  continue;
349  }
350  if (use_info) {
351  info.addInputCount(1);
352  info.addInputNBytes(nbyte);
353  }
354  if (hd.type == MSG_STREAMERINFO) {
355  continue;
356  }
357  if (file) {
358  if (nbyte_out > MAX_FILE_SIZE) {
359  file.close();
360  nbyte_out = 0;
361  if (g_diskid > 0) {
362  std::string filename = StringUtil::form("%s%02d", path, g_diskid);
363  struct statvfs statfs;
364  statvfs(filename.c_str(), &statfs);
365  float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
366  if (usage > 0.9) {
367  B2WARNING("disk-" << g_diskid << " is already full. Stopping Run#" << runno);
368  std::cout << "[STOP=RUNCONTROL]" << std::endl;
369  continue;
370  }
371  }
372  file.open(path, ndisks, expno, runno, fileid);
373  ecount = 0;
374  fileid++;
375  }
376  file.write((char*)evtbuf, nbyte);
377  nbyte_out += nbyte;
378  if (use_info) {
379  info.addOutputCount(1);
380  info.addOutputNBytes(nbyte);
381  info.get()->reserved[0] = file.getFileId();
382  info.get()->reserved[1] = file.getDiskId();
383  info.get()->reserved_f[0] = (float)info.getOutputNBytes() / 1024. / 1024.;
384  }
385  } else {
386  if (!ecount) {
387  B2WARNING("no run was initialzed for recording : " << hd.expno << "." << hd.runno);
388  }
389  ecount = 1;
390  }
391  // Dump data into output buffer
392  if (!isnew && obufsize > 0 && count_out % interval == 0 && obuf.isWritable(nword)) {
393  obuf.write(evtbuf, nword, true);
394  }
395  count_out++;
396  }
397  }
398  return 0;
399 }
400 
Belle2::DBHandlerException
Definition: DBHandlerException.h:12
Belle2::RunInfoBuffer
Definition: RunInfoBuffer.h:15
Belle2::gearbox::FileHandler::FileHandler
FileHandler(const std::string &uri)
Instantiate a new file handler, using the uri as base search path.
Definition: FileHandler.cc:36
Belle2::SharedEventBuffer
Definition: SharedEventBuffer.h:14
Belle2::gearbox::FileHandler::~FileHandler
virtual ~FileHandler()
empty, virtual destructor
Definition: FileHandler.h:85
Belle2::SharedEventBuffer::Header
Definition: SharedEventBuffer.h:17
main
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:77
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::gearbox::FileHandler
InputHandler which will read XML from plain files, optionally gzip compressed.
Definition: FileHandler.h:77
alignment.constraints_generator.filename
filename
File name.
Definition: constraints_generator.py:224
Belle2::Date
Definition: Date.h:12
Belle2::gearbox::FileHandler::m_path
std::string m_path
Search path to look for files.
Definition: FileHandler.h:93
Belle2::PostgreSQLInterface
Definition: PostgreSQLInterface.h:16
Belle2::DBInterface
Definition: DBInterface.h:19
Belle2::ConfigFile
Definition: ConfigFile.h:15
Belle2::gearbox::FileHandler::open
virtual InputContext * open(const std::string &path) override
create a new FileContext by searching the file system for a file named like path.
Definition: FileHandler.cc:65