Belle II Software  release-08-01-10
storagerecord_arich.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <fcntl.h>
11 
12 #include <framework/logging/Logger.h>
13 #include <framework/pcore/EvtMessage.h>
14 
15 #include <daq/storage/SharedEventBuffer.h>
16 
17 #include <daq/slc/database/DBHandlerException.h>
18 
19 #include <daq/slc/psql/PostgreSQLInterface.h>
20 
21 #include <daq/slc/readout/RunInfoBuffer.h>
22 
23 #include <daq/slc/base/ConfigFile.h>
24 #include <daq/slc/base/Date.h>
25 #include <daq/slc/base/StringUtil.h>
26 
27 #include <cstdio>
28 #include <cstring>
29 #include <fstream>
30 #include <iostream>
31 #include <signal.h>
32 #include <sys/statvfs.h>
33 #include <zlib.h>
34 
35 using namespace Belle2;
36 
/** Size unit used for the file size limit: 1000 * 1024 * 1024 bytes. */
const unsigned long long GB = 1000ULL * 1024ULL * 1024ULL;
/** File size limit (two GB units); only referenced by the disabled file-rotation code in main(). */
const unsigned long long MAX_FILE_SIZE = GB * 2;

/** Name of the database table in which the written files are registered. */
const char* g_table = "datafiles";

/** Number of valid bytes currently stored in g_streamerinfo. */
unsigned int g_streamersize = 0;
/** Buffer (1000000 bytes) keeping the latest StreamerInfo message, written at the head of each file. */
char* g_streamerinfo = new char[1000000];

/** Id of the disk used by the previously opened file (0 until a file was opened). */
int g_diskid = 0;
/** Experiment number used for file naming. */
int g_expno = 0;
/** Run number used for file naming (0 until it was obtained). */
int g_runno = 0;
45 
46 class FileHandler {
47 
48 public:
49  FileHandler(DBInterface* db, const std::string& runtype,
50  const char* host, const char* dbtmp)
51  : m_db(db), m_runtype(runtype), m_host(host), m_dbtmp(dbtmp)
52  {
53  m_file = -1;
54  m_filesize = 0;
55  m_fileid = 0;
56  m_diskid = 1;
57  }
58  ~FileHandler() throw()
59  {
60  }
61 
62 public:
63  int getDiskId() { return m_diskid; }
64  int getFileId() { return m_fileid; }
65 
66 public:
67  int open(const std::string& dir, int ndisks, int /*expno*/, int /*runno*/, int fileid)
68  {
69  m_filesize = 0;
70  m_fileid = 0;
71  m_diskid = 1;
72  m_chksum = 1;
73  m_nevents = 0;
74  m_filesize = 0;
75  m_expno = g_expno;
76  m_runno = g_runno;
77  m_fileid = fileid;
78  bool available = false;
79  for (int i = 0; i < ndisks; i++) {
80  struct statvfs statfs;
81  char filename[1024];
82  sprintf(filename, "%s%02d", dir.c_str(), m_diskid);
83  statvfs(filename, &statfs);
84  float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
85  if (usage < 0.7) {
86  sprintf(filename, "%s%02d/storage/full_flag", dir.c_str(), m_diskid);
87  std::ifstream fin(filename);
88  int flag = 0;
89  fin >> flag;
90  if (flag != 1) {
91  available = true;
92  std::cout << "[DEBUG] disk : " << m_diskid << " is available" << std::endl;
93  break;
94  }
95  fin.close();
96  std::cout << "[DEBUG] disk : " << m_diskid << " is still full" << std::endl;
97  } else {
98  sprintf(filename, "%s%02d/storage/full_flag", dir.c_str(), m_diskid);
99  std::ofstream fout(filename);
100  fout << 1;
101  fout.close();
102  B2WARNING("disk-" << m_diskid << " is full " << usage);
103  }
104  m_diskid++;
105  if (m_diskid > ndisks) m_diskid = 1;
106  }
107  if (!available) {
108  B2FATAL("No disk available for writing");
109  exit(1);
110  }
111  std::string filedir = dir + StringUtil::form("%02d/storage/%4.4d/%5.5d/",
112  m_diskid, m_expno, m_runno);
113  system(("mkdir -p " + filedir).c_str());
114  m_filename = StringUtil::form("%s.%4.4d.%5.5d.%s.f%5.5d.sroot",
115  m_runtype.c_str(), m_expno, m_runno, m_host.c_str(), m_fileid);
116  m_path = filedir + m_filename;
117  m_file = ::open(m_path.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0664);
118  if (g_diskid > 0 && g_diskid != m_diskid) {
119  B2FATAL("disk-" << m_diskid << " is already full! Terminating process..");
120  exit(1);
121  }
122  g_diskid = m_diskid;
123  if (m_file < 0) {
124  B2FATAL("Failed to open file : " << m_path);
125  exit(1);
126  }
127  try {
128  m_db->connect();
129  m_db->execute("insert into %s (name, path, host, label, expno, runno, fileno, nevents, chksum, size) "
130  "values ('%s', '%s', '%s', '%s', %d, %d, %d, 0, 0, 0);",
131  g_table, m_filename.c_str(), m_path.c_str(), m_host.c_str(),
132  m_runtype.c_str(), m_expno, m_runno, m_fileid);
133  } catch (const DBHandlerException& e) {
134  B2WARNING(e.what());
135  }
136  write(g_streamerinfo, g_streamersize, true);
137  B2INFO("New file " << m_path << " is opened");
138 
139  return m_id;
140  }
141 
142  void close()
143  {
144  if (m_file > 0) {
145  std::cout << "[DEBUG] File closed" << std::endl;
146  ::close(m_file);
147  try {
148  struct stat st;
149  stat(m_path.c_str(), &st);
150  std::string d = Date(st.st_mtime).toString();
151  //if (m_fileid == 0) m_nevents--;//1 entry for StreamerInfo
152  m_db->connect();
153  m_db->execute("update %s set time_close = '%s', chksum = %lu, nevents = %lu, "
154  "size = %lu where name = '%s' and host = '%s';",
155  g_table, d.c_str(), m_chksum, m_nevents, m_filesize,
156  m_filename.c_str(), m_host.c_str());
157  } catch (const DBHandlerException& e) {
158  B2WARNING(e.what());
159  }
160  m_db->close();
161  }
162  }
163 
164  int write(char* evtbuf, int nbyte, bool isstreamer = false)
165  {
166  int ret = ::write(m_file, evtbuf, nbyte);
167  m_filesize += nbyte;
168  if (!isstreamer) {
169  m_nevents++;
170  }
171  m_chksum = adler32(m_chksum, (unsigned char*)evtbuf, nbyte);
172  return ret;
173  }
174 
175  operator bool()
176  {
177  return m_file > 0;
178  }
179 
180 private:
181  DBInterface* m_db;
182  std::string m_runtype;
183  std::string m_host;
184  std::string m_dbtmp;
185  int m_id;
186  std::string m_filename;
187  std::string m_path;
188  int m_diskid;
189  int m_fileid;
190  int m_file;
191  std::string m_configname;
192  int m_expno;
193  int m_runno;
194  unsigned long long m_filesize;
195  unsigned long long m_chksum;
196  unsigned long long m_nevents;
197 };
198 
199 FileHandler* g_file = NULL;
200 
201 void signalHandler(int)
202 {
203  if (g_file) g_file->close();
204  exit(1);
205 }
206 
207 int main(int argc, char** argv)
208 {
209  if (argc < 8) {
210  printf("%s : <ibufname> <ibufsize> <hostname> <runtype> <path> <ndisk> "
211  "<filepath_dbtmp> [<obufname> <obufsize> nodename, nodeid]\n", argv[0]);
212  return 1;
213  }
214  //storagerecord STORE02:REC 10 HLT2 debug /rawdata/disk 11 /home/usr/stordaq/data/dbtmp.txt STORE02:OUT 10 store02_storagerecord 3
215  const unsigned interval = 1;
216  const char* ibufname = argv[1];
217  const int ibufsize = atoi(argv[2]);
218  const char* hostname = argv[3];
219  const char* runtype = argv[4];
220  const bool not_record = std::string(runtype).find(std::string("null")) != std::string::npos;
221  const char* path = argv[5];
222  const int ndisks = atoi(argv[6]);
223  const char* file_dbtmp = argv[7];
224  const char* obufname = (argc > 7) ? argv[8] : "";
225  const int obufsize = (argc > 8) ? atoi(argv[9]) : -1;
226  const char* nodename = (argc > 9) ? argv[10] : "";
227  const int nodeid = (argc > 10) ? atoi(argv[11]) : -1;
228  const unsigned int ninput = (argc > 11) ? atoi(argv[12]) : 1;
229 
230  RunInfoBuffer info;
231  const bool use_info = nodeid >= 0;
232  if (use_info) {
233  info.open(nodename, nodeid);
234  }
235  SharedEventBuffer ibuf[10];
236  for (unsigned int ib = 0; ib < ninput; ib++) {
237  ibuf[ib].open(StringUtil::form("%s_%d", ibufname, ib), ibufsize * 1000000);//, true);
238  }
239  signal(SIGINT, signalHandler);
240  signal(SIGKILL, signalHandler);
241  ConfigFile config("slowcontrol");
242  PostgreSQLInterface* db = new PostgreSQLInterface(config.get("database.host"),
243  config.get("database.dbname"),
244  config.get("database.user"),
245  config.get("database.password"),
246  config.getInt("database.port"));
247  SharedEventBuffer obuf;
248  if (obufsize > 0) obuf.open(obufname, obufsize * 1000000);//, true);
249  if (use_info) info.reportReady();
250  B2DEBUG(1, "started recording.");
251  unsigned long long nbyte_out = 0;
252  unsigned int count_out = 0;
253  unsigned int expno = 0;
254  unsigned int runno = 0;
255  unsigned int subno = 0;
256  int* evtbuf = new int[10000000];
257  g_file = new FileHandler(db, runtype, hostname, file_dbtmp);
258  FileHandler& file(*g_file);
259  int ecount = 0;
260  bool newrun = false;
261  unsigned int fileid = 0;
262  struct dataheader {
263  int nword;
264  int type;
265  unsigned int expno;
266  unsigned int runno;
267  } hd;
268  g_streamersize = 0;
269  while (true) {
270  if (use_info) info.reportRunning();
271  if (g_runno == 0) {
272  while ((g_runno = info.getRunNumber()) <= 0) {
273  usleep(500);
274  }
275  g_expno = info.getExpNumber();
276  std::cout << "[DEBUG] expno = " << g_expno << ", runno =" << g_runno << std::endl;
277  }
278  for (unsigned int ib = 0; ib < ninput; ib++) {
279  ibuf[ib].lock();
280  ibuf[ib].read((int*)&hd, true, true);
281  ibuf[ib].read(evtbuf, true, true);
282  ibuf[ib].unlock();
283  int nbyte = evtbuf[0];
284  int nword = (nbyte - 1) / 4 + 1;
285  bool isnew = false;
286  if (not_record) continue;
287  if (hd.type == MSG_STREAMERINFO) {
288  memcpy(g_streamerinfo, evtbuf, nbyte);
289  g_streamersize = nbyte;
290  }
291  if (expno > hd.expno || runno > hd.runno) {
292  B2WARNING("Old run was detected => discard event exp = " << hd.expno << " (" << expno << "), runno" << hd.runno << "(" << runno <<
293  ")");
294  continue;
295  }
296  if (!newrun || expno < hd.expno || runno < hd.runno) {
297  newrun = true;
298  isnew = true;
299  expno = hd.expno;
300  runno = hd.runno;
301  //fileid = 0;
302  if (use_info) {
303  info.setExpNumber(expno);
304  info.setRunNumber(runno);
305  info.setSubNumber(subno);
306  info.setInputCount(0);
307  info.setInputNBytes(0);
308  info.setOutputCount(0);
309  info.setOutputNBytes(0);
310  nbyte_out = 0;
311  count_out = 0;
312  }
313  obuf.lock();
314  SharedEventBuffer::Header* oheader = obuf.getHeader();
315  oheader->expno = expno;
316  oheader->runno = runno;
317  obuf.unlock();
318  if (file) {
319  file.close();
320  }
321  file.open(path, ndisks, expno, runno, fileid);
322  nbyte_out += nbyte;
323  fileid++;
324  continue;
325  }
326  if (use_info) {
327  info.addInputCount(1);
328  info.addInputNBytes(nbyte);
329  }
330  if (hd.type == MSG_STREAMERINFO) {
331  continue;
332  }
333  if (file) {
334  /*
335  if (nbyte_out > MAX_FILE_SIZE) {
336  file.close();
337  nbyte_out = 0;
338  file.open(path, ndisks, expno, runno, fileid);
339  fileid++;
340  }
341  */
342  file.write((char*)evtbuf, nbyte);
343  nbyte_out += nbyte;
344  if (!isnew && obufsize > 0 && count_out % interval == 0 && obuf.isWritable(nword)) {
345  obuf.write(evtbuf, nword, true);
346  }
347  count_out++;
348  if (use_info) {
349  info.addOutputCount(1);
350  info.addOutputNBytes(nbyte);
351  info.get()->reserved[0] = file.getFileId();
352  info.get()->reserved[1] = file.getDiskId();
353  info.get()->reserved_f[0] = (float)info.getOutputNBytes() / 1024. / 1024.;
354  }
355  } else {
356  if (!ecount) {
357  B2WARNING("no run was initialzed for recording : " << hd.expno << "." << hd.runno);
358  }
359  ecount = 1;
360  }
361  }
362  }
363  return 0;
364 }
365 
InputHandler which will read XML from plain files, optionally gzip compressed.
Definition: FileHandler.h:67
virtual ~FileHandler()
empty, virtual destructor
Definition: FileHandler.h:75
std::string m_path
Search path to look for files.
Definition: FileHandler.h:83
virtual InputContext * open(const std::string &path) override
create a new FileContext by searching the file system for a file named like path.
Definition: FileHandler.cc:64
FileHandler(const std::string &uri)
Instantiate a new file handler, using the uri as base search path.
Definition: FileHandler.cc:35
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:91