Belle II Software  release-05-01-25
storagerecord_arich.cc
1 #include <sys/types.h>
2 #include <sys/stat.h>
3 #include <fcntl.h>
4 
5 #include <framework/logging/Logger.h>
6 #include <framework/pcore/EvtMessage.h>
7 
8 #include <daq/storage/SharedEventBuffer.h>
9 
10 #include <daq/slc/database/DBHandlerException.h>
11 
12 #include <daq/slc/psql/PostgreSQLInterface.h>
13 
14 #include <daq/slc/readout/RunInfoBuffer.h>
15 
16 #include <daq/slc/base/ConfigFile.h>
17 #include <daq/slc/base/Date.h>
18 #include <daq/slc/base/StringUtil.h>
19 
20 #include <cstdio>
21 #include <cstring>
22 #include <fstream>
23 #include <iostream>
24 #include <signal.h>
25 #include <sys/statvfs.h>
26 #include <zlib.h>
27 
28 using namespace Belle2;
29 
/** Size unit used by this tool: 1000 * 1024 * 1024 bytes. */
const unsigned long long GB = 1000ULL * 1024ULL * 1024ULL;
/** File size limit of two GB units; only referenced from commented-out code in main(). */
const unsigned long long MAX_FILE_SIZE = GB + GB;
/** Name of the database table that holds one row per recorded file. */
const char* g_table = "datafiles";
/** Number of valid bytes currently cached in g_streamerinfo. */
unsigned int g_streamersize = 0U;
/** Cache for the most recent StreamerInfo message (1000000 bytes, never freed). */
char* g_streamerinfo = new char[1000000];
/** Disk id of the first file opened by this process; 0 until a file was opened. */
int g_diskid = 0;
/** Experiment number used for file names and database rows. */
int g_expno = 0;
/** Run number used for file names and database rows; 0 means "not yet known". */
int g_runno = 0;
38 
39 class FileHandler {
40 
41 public:
42  FileHandler(DBInterface* db, const std::string& runtype,
43  const char* host, const char* dbtmp)
44  : m_db(db), m_runtype(runtype), m_host(host), m_dbtmp(dbtmp)
45  {
46  m_file = -1;
47  m_filesize = 0;
48  m_fileid = 0;
49  m_diskid = 1;
50  }
51  ~FileHandler() throw()
52  {
53  }
54 
55 public:
56  int getDiskId() { return m_diskid; }
57  int getFileId() { return m_fileid; }
58 
59 public:
60  int open(const std::string& dir, int ndisks, int expno, int runno, int fileid)
61  {
62  m_filesize = 0;
63  m_fileid = 0;
64  m_diskid = 1;
65  m_chksum = 1;
66  m_nevents = 0;
67  m_filesize = 0;
68  m_expno = g_expno;
69  m_runno = g_runno;
70  m_fileid = fileid;
71  bool available = false;
72  for (int i = 0; i < ndisks; i++) {
73  struct statvfs statfs;
74  char filename[1024];
75  sprintf(filename, "%s%02d", dir.c_str(), m_diskid);
76  statvfs(filename, &statfs);
77  float usage = 1 - ((float)statfs.f_bfree / statfs.f_blocks);
78  if (usage < 0.9) {
79  sprintf(filename, "%s%02d/storage/full_flag", dir.c_str(), m_diskid);
80  std::ifstream fin(filename);
81  int flag = 0;
82  fin >> flag;
83  if (flag != 1) {
84  available = true;
85  std::cout << "[DEBUG] disk : " << m_diskid << " is available" << std::endl;
86  break;
87  }
88  fin.close();
89  std::cout << "[DEBUG] disk : " << m_diskid << " is still full" << std::endl;
90  } else {
91  sprintf(filename, "%s%02d/storage/full_flag", dir.c_str(), m_diskid);
92  std::ofstream fout(filename);
93  fout << 1;
94  fout.close();
95  B2WARNING("disk-" << m_diskid << " is full " << usage);
96  }
97  m_diskid++;
98  if (m_diskid > ndisks) m_diskid = 1;
99  }
100  if (!available) {
101  B2FATAL("No disk available for writing");
102  exit(1);
103  }
104  std::string filedir = dir + StringUtil::form("%02d/storage/%4.4d/%5.5d/",
105  m_diskid, m_expno, m_runno);
106  system(("mkdir -p " + filedir).c_str());
107  m_filename = StringUtil::form("%s.%4.4d.%5.5d.%s.f%5.5d.sroot",
108  m_runtype.c_str(), m_expno, m_runno, m_host.c_str(), m_fileid);
109  m_path = filedir + m_filename;
110  m_file = ::open(m_path.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0664);
111  if (g_diskid > 0 && g_diskid != m_diskid) {
112  B2FATAL("disk-" << m_diskid << " is already full! Terminating process..");
113  exit(1);
114  }
115  g_diskid = m_diskid;
116  if (m_file < 0) {
117  B2FATAL("Failed to open file : " << m_path);
118  exit(1);
119  }
120  try {
121  m_db->connect();
122  m_db->execute("insert into %s (name, path, host, label, expno, runno, fileno, nevents, chksum, size) "
123  "values ('%s', '%s', '%s', '%s', %d, %d, %d, 0, 0, 0);",
124  g_table, m_filename.c_str(), m_path.c_str(), m_host.c_str(),
125  m_runtype.c_str(), m_expno, m_runno, m_fileid);
126  } catch (const DBHandlerException& e) {
127  B2WARNING(e.what());
128  }
129  write(g_streamerinfo, g_streamersize, true);
130  B2INFO("New file " << m_path << " is opened");
131 
132  return m_id;
133  }
134 
135  void close()
136  {
137  if (m_file > 0) {
138  std::cout << "[DEBUG] File closed" << std::endl;
139  ::close(m_file);
140  try {
141  struct stat st;
142  stat(m_path.c_str(), &st);
143  std::string d = Date(st.st_mtime).toString();
144  //if (m_fileid == 0) m_nevents--;//1 entry for StreamerInfo
145  m_db->connect();
146  m_db->execute("update %s set time_close = '%s', chksum = %lu, nevents = %lu, "
147  "size = %lu where name = '%s' and host = '%s';",
148  g_table, d.c_str(), m_chksum, m_nevents, m_filesize,
149  m_filename.c_str(), m_host.c_str());
150  } catch (const DBHandlerException& e) {
151  B2WARNING(e.what());
152  }
153  m_db->close();
154  }
155  }
156 
157  int write(char* evtbuf, int nbyte, bool isstreamer = false)
158  {
159  int ret = ::write(m_file, evtbuf, nbyte);
160  m_filesize += nbyte;
161  if (!isstreamer) {
162  m_nevents++;
163  }
164  m_chksum = adler32(m_chksum, (unsigned char*)evtbuf, nbyte);
165  return ret;
166  }
167 
168  operator bool()
169  {
170  return m_file > 0;
171  }
172 
173 private:
174  DBInterface* m_db;
175  std::string m_runtype;
176  std::string m_host;
177  std::string m_dbtmp;
178  int m_id;
179  std::string m_filename;
180  std::string m_path;
181  int m_diskid;
182  int m_fileid;
183  int m_file;
184  std::string m_configname;
185  int m_expno;
186  int m_runno;
187  unsigned long long m_filesize;
188  unsigned long long m_chksum;
189  unsigned long long m_nevents;
190 };
191 
192 FileHandler* g_file = NULL;
193 
194 void signalHandler(int)
195 {
196  if (g_file) g_file->close();
197  exit(1);
198 }
199 
200 int main(int argc, char** argv)
201 {
202  if (argc < 8) {
203  printf("%s : <ibufname> <ibufsize> <hostname> <runtype> <path> <ndisk> "
204  "<filepath_dbtmp> [<obufname> <obufsize> nodename, nodeid]\n", argv[0]);
205  return 1;
206  }
207  //storagerecord STORE02:REC 10 HLT2 debug /rawdata/disk 11 /home/usr/stordaq/data/dbtmp.txt STORE02:OUT 10 store02_storagerecord 3
208  const unsigned interval = 1;
209  const char* ibufname = argv[1];
210  const int ibufsize = atoi(argv[2]);
211  const char* hostname = argv[3];
212  const char* runtype = argv[4];
213  const bool not_record = std::string(runtype) == "null";
214  const char* path = argv[5];
215  const int ndisks = atoi(argv[6]);
216  const char* file_dbtmp = argv[7];
217  const char* obufname = (argc > 7) ? argv[8] : "";
218  const int obufsize = (argc > 8) ? atoi(argv[9]) : -1;
219  const char* nodename = (argc > 9) ? argv[10] : "";
220  const int nodeid = (argc > 10) ? atoi(argv[11]) : -1;
221  const unsigned int ninput = (argc > 11) ? atoi(argv[12]) : 1;
222 
223  RunInfoBuffer info;
224  const bool use_info = nodeid >= 0;
225  if (use_info) {
226  info.open(nodename, nodeid);
227  }
228  SharedEventBuffer ibuf[10];
229  for (unsigned int ib = 0; ib < ninput; ib++) {
230  ibuf[ib].open(StringUtil::form("%s_%d", ibufname, ib), ibufsize * 1000000);//, true);
231  }
232  signal(SIGINT, signalHandler);
233  signal(SIGKILL, signalHandler);
234  ConfigFile config("slowcontrol");
235  PostgreSQLInterface* db = new PostgreSQLInterface(config.get("database.host"),
236  config.get("database.dbname"),
237  config.get("database.user"),
238  config.get("database.password"),
239  config.getInt("database.port"));
240  SharedEventBuffer obuf;
241  if (obufsize > 0) obuf.open(obufname, obufsize * 1000000);//, true);
242  if (use_info) info.reportReady();
243  B2DEBUG(1, "started recording.");
244  unsigned long long nbyte_out = 0;
245  unsigned int count_out = 0;
246  unsigned int expno = 0;
247  unsigned int runno = 0;
248  unsigned int subno = 0;
249  int* evtbuf = new int[10000000];
250  g_file = new FileHandler(db, runtype, hostname, file_dbtmp);
251  FileHandler& file(*g_file);
252  int ecount = 0;
253  bool newrun = false;
254  unsigned int fileid = 0;
255  struct dataheader {
256  int nword;
257  int type;
258  unsigned int expno;
259  unsigned int runno;
260  } hd;
261  g_streamersize = 0;
262  while (true) {
263  if (use_info) info.reportRunning();
264  if (g_runno == 0) {
265  while ((g_runno = info.getRunNumber()) <= 0) {
266  usleep(500);
267  }
268  g_expno = info.getExpNumber();
269  std::cout << "[DEBUG] expno = " << g_expno << ", runno =" << g_runno << std::endl;
270  }
271  for (unsigned int ib = 0; ib < ninput; ib++) {
272  ibuf[ib].lock();
273  ibuf[ib].read((int*)&hd, true, true);
274  ibuf[ib].read(evtbuf, true, true);
275  ibuf[ib].unlock();
276  int nbyte = evtbuf[0];
277  int nword = (nbyte - 1) / 4 + 1;
278  bool isnew = false;
279  if (not_record) continue;
280  if (hd.type == MSG_STREAMERINFO) {
281  memcpy(g_streamerinfo, evtbuf, nbyte);
282  g_streamersize = nbyte;
283  }
284  if (expno > hd.expno || runno > hd.runno) {
285  B2WARNING("Old run was detected => discard event exp = " << hd.expno << " (" << expno << "), runno" << hd.runno << "(" << runno <<
286  ")");
287  continue;
288  }
289  if (!newrun || expno < hd.expno || runno < hd.runno) {
290  newrun = true;
291  isnew = true;
292  expno = hd.expno;
293  runno = hd.runno;
294  //fileid = 0;
295  if (use_info) {
296  info.setExpNumber(expno);
297  info.setRunNumber(runno);
298  info.setSubNumber(subno);
299  info.setInputCount(0);
300  info.setInputNBytes(0);
301  info.setOutputCount(0);
302  info.setOutputNBytes(0);
303  nbyte_out = 0;
304  count_out = 0;
305  }
306  obuf.lock();
307  SharedEventBuffer::Header* oheader = obuf.getHeader();
308  oheader->expno = expno;
309  oheader->runno = runno;
310  obuf.unlock();
311  if (file) {
312  file.close();
313  }
314  file.open(path, ndisks, expno, runno, fileid);
315  nbyte_out += nbyte;
316  fileid++;
317  continue;
318  }
319  if (use_info) {
320  info.addInputCount(1);
321  info.addInputNBytes(nbyte);
322  }
323  if (hd.type == MSG_STREAMERINFO) {
324  continue;
325  }
326  if (file) {
327  /*
328  if (nbyte_out > MAX_FILE_SIZE) {
329  file.close();
330  nbyte_out = 0;
331  file.open(path, ndisks, expno, runno, fileid);
332  fileid++;
333  }
334  */
335  file.write((char*)evtbuf, nbyte);
336  nbyte_out += nbyte;
337  if (!isnew && obufsize > 0 && count_out % interval == 0 && obuf.isWritable(nword)) {
338  obuf.write(evtbuf, nword, true);
339  }
340  count_out++;
341  if (use_info) {
342  info.addOutputCount(1);
343  info.addOutputNBytes(nbyte);
344  info.get()->reserved[0] = file.getFileId();
345  info.get()->reserved[1] = file.getDiskId();
346  info.get()->reserved_f[0] = (float)info.getOutputNBytes() / 1024. / 1024.;
347  }
348  } else {
349  if (!ecount) {
350  B2WARNING("no run was initialzed for recording : " << hd.expno << "." << hd.runno);
351  }
352  ecount = 1;
353  }
354  }
355  }
356  return 0;
357 }
358 
Belle2::DBHandlerException
Definition: DBHandlerException.h:12
Belle2::RunInfoBuffer
Definition: RunInfoBuffer.h:15
Belle2::gearbox::FileHandler::FileHandler
FileHandler(const std::string &uri)
Instantiate a new file handler, using the uri as base search path.
Definition: FileHandler.cc:36
prepareAsicCrosstalkSimDB.e
e
aux.
Definition: prepareAsicCrosstalkSimDB.py:53
Belle2::SharedEventBuffer
Definition: SharedEventBuffer.h:14
Belle2::gearbox::FileHandler::~FileHandler
virtual ~FileHandler()
empty, virtual destructor
Definition: FileHandler.h:85
Belle2::SharedEventBuffer::Header
Definition: SharedEventBuffer.h:17
main
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:77
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::gearbox::FileHandler
InputHandler which will read XML from plain files, optionally gzip compressed.
Definition: FileHandler.h:77
alignment.constraints_generator.filename
filename
File name.
Definition: constraints_generator.py:224
Belle2::Date
Definition: Date.h:12
Belle2::gearbox::FileHandler::m_path
std::string m_path
Search path to look for files.
Definition: FileHandler.h:93
Belle2::PostgreSQLInterface
Definition: PostgreSQLInterface.h:16
Belle2::DBInterface
Definition: DBInterface.h:19
Belle2::ConfigFile
Definition: ConfigFile.h:15
Belle2::gearbox::FileHandler::open
virtual InputContext * open(const std::string &path) override
create a new FileContext by searching the file system for a file named like path.
Definition: FileHandler.cc:65