Belle II Software  release-08-02-04
storagein.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <unistd.h>
10 #include <cstdlib>
11 #include <iostream>
12 
13 #include <framework/logging/Logger.h>
14 
15 #include <daq/storage/BinData.h>
16 #include <daq/storage/SharedEventBuffer.h>
17 
18 #include <daq/slc/base/IOException.h>
19 #include <daq/slc/readout/RunInfoBuffer.h>
20 
21 #include <daq/slc/system/TCPSocket.h>
22 #include <daq/slc/system/TCPSocketReader.h>
23 #include <daq/slc/system/Time.h>
24 #include <daq/slc/system/LogFile.h>
25 
26 using namespace Belle2;
27 
28 int main(int argc, char** argv)
29 {
30  if (argc < 4) {
31  LogFile::debug("%s : bufname bufsize hostname port "
32  "[nodename, nodeid]", argv[0]);
33  return 1;
34  }
35 
36  RunInfoBuffer info;
37  bool use_info = (argc > 6);
38  if (use_info) {
39  info.open(argv[5], atoi(argv[6]));
40  }
41  SharedEventBuffer ibuf;
42  ibuf.open(argv[1], atoi(argv[2]) * 1000000);//, true);
43  info.reportReady();
44  TCPSocket socket(argv[3], atoi(argv[4]));
45  info.reportReady();
46  int* evtbuf = new int[10000000];
47  BinData data;
48  data.setBuffer(evtbuf);
49  Time t0;
50  int expno = 0;
51  int runno = 0;
52  int subno = 0;
53  int ntried = 0;
54  while (true) {
55  while (socket.get_fd() <= 0) {
56  try {
57  socket.connect();
58  B2INFO("Connected to data source");
59  socket.setBufferSize(32 * 1024 * 1024);
60  ntried = 0;
61  if (info.isAvailable()) {
62  info.setInputPort(socket.getLocalPort());
63  info.setInputAddress(socket.getLocalAddress());
64  }
65  break;
66  } catch (const IOException& e) {
67  socket.close();
68  if (info.isAvailable()) {
69  info.setInputPort(0);
70  info.setInputAddress(0);
71  }
72  if (ntried < 5)
73  B2WARNING("failed to connect to eb2 (try=" << ntried++ << ")");
74  sleep(5);
75  }
76  }
77  info.reportRunning();
78  try {
79  TCPSocketReader reader(socket);
80  B2INFO("storagein: Cconnected to eb2.");
81  int count = 0;
82  while (true) {
83  reader.read(data.getBuffer(), sizeof(int));
84  unsigned int nbyte = data.getByteSize() - sizeof(int);
85  int nword = data.getWordSize();
86  reader.read((data.getBuffer() + 1), nbyte);
87  nbyte += sizeof(int);
88  if (info.isAvailable()) {
89  info.addInputCount(1);
90  info.addInputNBytes(nbyte);
91  }
92  if (expno > data.getExpNumber() || runno > data.getRunNumber()) {
93  /*
94  B2WARNING("storagein: old run event detected : exp="
95  << data.getExpNumber() << " runno="
96  << data.getRunNumber() << " current = ("
97  << expno << "," << runno << ")");
98  */
99  continue;
100  } else if (expno < data.getExpNumber() || runno < data.getRunNumber()) {
101  expno = data.getExpNumber();
102  runno = data.getRunNumber();
103  B2INFO("new run detected : exp=" << expno << " runno=" << runno);
104  SharedEventBuffer::Header* iheader = ibuf.getHeader();
105  iheader->expno = expno;
106  iheader->runno = runno;
107  iheader->subno = subno;
108  if (info.isAvailable()) {
109  info.setExpNumber(expno);
110  info.setRunNumber(runno);
111  info.setSubNumber(subno);
112  info.setInputCount(0);
113  info.setInputNBytes(0);
114  info.setOutputCount(0);
115  info.setOutputNBytes(0);
116  }
117  count = 0;
118  }
119  if (count < 1000000 && (count < 10 || (count > 10 && count < 100 && count % 10 == 0) ||
120  (count > 100 && count < 1000 && count % 100 == 0) ||
121  (count > 1000 && count < 10000 && count % 1000 == 0) ||
122  (count > 10000 && count < 100000 && count % 10000 == 0) ||
123  (count > 100000 && count < 1000000 && count % 100000 == 0))) {
124  std::cout << "[DEBUG] Event count = " << count << " nword = " << nword << std::endl;
125  }
126  count++;
127  ibuf.write(data.getBuffer(), nword, true);
128  if (info.isAvailable()) {
129  info.addOutputCount(1);
130  info.addOutputNBytes(nword * sizeof(int));
131  }
132  }
133  } catch (const IOException& e) {
134  socket.close();
135  if (info.isAvailable()) info.setInputPort(0);
136  B2WARNING("Connection to eb2 broken.");
137  sleep(5);
138  }
139  }
140  return 0;
141 }
142 
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:91