Belle II Software  release-08-01-10
stevtserver.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <unistd.h>
10 #include <cstdlib>
11 #include <cstring>
12 
13 #include <framework/logging/Logger.h>
14 
15 #include <daq/storage/BinData.h>
16 
17 #include <daq/slc/system/PThread.h>
18 #include <daq/slc/system/Cond.h>
19 #include <daq/slc/system/Mutex.h>
20 #include <daq/slc/system/TCPServerSocket.h>
21 #include <daq/slc/system/TCPSocket.h>
22 #include <daq/slc/system/TCPSocketReader.h>
23 #include <daq/slc/system/TCPSocketWriter.h>
24 #include <daq/slc/system/Time.h>
25 #include <daq/slc/system/LogFile.h>
26 #include <daq/slc/base/IOException.h>
27 
// Number of int elements allocated for each event buffer in this file
// (used below as the array length in `new int[NWORD_BUFFER]`).
28 const unsigned int NWORD_BUFFER = 10000000;//10Mword
29 
30 namespace Belle2 {
// Process-wide mutex/condition pair. EvtBuffer::read() signals g_cond (while
// holding g_mutex) after it has handed an event to its writer; main() waits on
// g_cond when it finds a buffer that is not writable.
36  Mutex g_mutex;
37  Cond g_cond;
38 
39  class EvtBuffer {
40 
41  public:
42  EvtBuffer(int id)
43  {
44  m_id = id;
45  m_evtbuf = new int[NWORD_BUFFER];
46  m_writable = true;
47  m_readable = false;
48  }
49 
50  public:
51  bool write(int* buf, int nbyte)
52  {
53  m_mutex.lock();
54  bool succeded = false;
55  if (m_writable) {
56  memcpy(m_evtbuf, buf, nbyte);
57  m_cond.signal();
58  m_writable = false;
59  m_readable = true;
60  succeded = true;
61  }
62  m_mutex.unlock();
63  return succeded;
64  }
65  bool writable()
66  {
67  m_mutex.lock();
68  bool succeded = false;
69  if (m_writable) {
70  succeded = true;
71  }
72  m_mutex.unlock();
73  return succeded;
74  }
75  int* buffer()
76  {
77  m_writable = false;
78  m_readable = true;
79  return m_evtbuf;
80  }
81  void read(Writer& writer)
82  {
83  m_mutex.lock();
84  while (true) {
85  if (!m_readable) {
86  m_cond.wait(m_mutex);
87  } else {
88  //int evtno = m_evtbuf[4];
89  //LogFile::info("write %d %d", m_id, evtno);
90  writer.write(m_evtbuf, m_evtbuf[0] * 4);
91  m_writable = true;
92  m_readable = false;
93  g_mutex.lock();
94  g_cond.signal();
95  g_mutex.unlock();
96  break;
97  }
98  }
99  m_mutex.unlock();
100  }
101  void lock() { m_mutex.lock(); }
102  void unlock() { m_mutex.unlock(); }
103  void signal() { m_cond.signal(); }
104 
105  private:
106  int m_id;
107  int* m_evtbuf;
108  Mutex m_mutex;
109  Cond m_cond;
110  bool m_writable;
111  bool m_readable;
112 
113  };
114 
115  class Sender {
116  public:
117  Sender(int port, EvtBuffer* buf)
118  : m_port(port), m_buf(buf) {}
119 
120  public:
121  void run()
122  {
123  TCPServerSocket server("0.0.0.0", m_port);
124  server.open();
125  while (true) {
126  TCPSocket socket;
127  try {
128  socket = server.accept();
129  socket.setBufferSize(32 * 1024 * 1024);
130  TCPSocketWriter writer(socket);
131  while (true) {
132  m_buf->read(writer);
133  }
134  } catch (const IOException& e) {
135  socket.close();
136  }
137  }
138  }
139 
140  private:
141  int m_port;
142  EvtBuffer* m_buf;
143 
144  };
145 
147 }
148 
149 using namespace Belle2;
150 
151 int main(int argc, char** argv)
152 {
153  if (argc < 4) {
154  LogFile::debug("%s : hostname port nsenders portbase", argv[0]);
155  return 1;
156  }
157 
158  const int nsenders = atoi(argv[3]);
159  const int portbase = atoi(argv[4]);
160  std::vector<EvtBuffer*> buf_v;
161 
162  for (int i = 0; i < nsenders; i++) {
163  EvtBuffer* buf = new EvtBuffer(i);
164  buf_v.push_back(buf);
165  PThread(new Sender(portbase + i + 1, buf));
166  }
167 
168  TCPSocket socket(argv[1], atoi(argv[2]));
169  int ntried = 0;
170  while (true) {
171  while (socket.get_fd() <= 0) {
172  try {
173  socket.connect();
174  B2INFO("Connected to upstream");
175  socket.setBufferSize(32 * 1024 * 1024);
176  ntried = 0;
177  break;
178  } catch (const IOException& e) {
179  socket.close();
180  if (ntried < 5)
181  B2WARNING("failed to connect to upstream (try=" << ntried++ << ")");
182  sleep(5);
183  }
184  }
185  try {
186  TCPSocketReader reader(socket);
187  B2INFO("storagein: Cconnected to eb2.");
188  int count = 0;
189  unsigned long ilast = 0;
190  BinData data;
191  int* evtbuf = new int[NWORD_BUFFER];
192  unsigned int nremains = 0;
193  double nbyteall = 0;
194  Time t0;
195  while (true) {
196  unsigned int nword = socket.read_once(evtbuf + nremains,
197  (NWORD_BUFFER - nremains) * sizeof(int)) / sizeof(int)
198  + nremains;
199  if (nword == 0) continue;
200  nbyteall += nword * 4;
201  //B2INFO("nword = " << nword << " nremains = " << nremains);
202  unsigned int offset = 0;
203  while (offset < nword && nword >= evtbuf[offset] + offset) {
204  if (offset > 0 && evtbuf[offset - 1] != (int)BinData::TRAILER_MAGIC) {
205  B2FATAL("Invalid trailer magic" << evtbuf[offset - 1] << "!=" << (int)BinData::TRAILER_MAGIC);
206  return 1;
207  }
208  int nword_evt = evtbuf[offset];
209  int evtno = evtbuf[offset + 4];
210  while (true) {
211  int i = ilast % nsenders;
212  ilast++;
213  if (buf_v[i]->writable()) {
214  buf_v[i]->write(&evtbuf[offset], nword_evt * 4);
215  offset += evtbuf[offset];
216  count++;
217  break;
218  }
219  if (i == 0 && ilast > 0) {
220  g_mutex.lock();
221  g_cond.wait(g_mutex);
222  g_mutex.unlock();
223  }
224  }
225  if (count < 1000000 && (count < 10 || (count > 10 && count < 100 && count % 10 == 0) ||
226  (count > 100 && count < 1000 && count % 100 == 0) ||
227  (count > 1000 && count < 10000 && count % 1000 == 0) ||
228  (count > 10000 && count < 100000 && count % 10000 == 0) ||
229  (count > 100000 && count < 1000000 && count % 100000 == 0))) {
230  B2INFO("Event count = " << count << " nword = " << nword_evt << " evtno = " << evtno);
231  }
232  const int nth = 100000;
233  if (count % nth == 0) {
234  Time t;
235  double dt = (t.get() - t0.get());
236  double freq = nth / dt / 1000.;
237  double rate = nbyteall / dt / 1000. / 1000.;
238  printf("[DEBUG] Serial = %d Freq = %f [kHz], Rate = %f [MB/s], DataSize = %f [kB/event]\n",
239  count, freq, rate, nbyteall / 1000. / nth);
240  t0 = t;
241  nbyteall = 0;
242  }
243  }
244  nremains = nword - offset;
245  if (nremains > 0) {
246  memmove(evtbuf, evtbuf + offset, nremains * sizeof(int));
247  }
248  }
249  } catch (const IOException& e) {
250  socket.close();
251  B2WARNING("Connection to upstream broken.");
252  sleep(5);
253  }
254  }
255  return 0;
256 }
257 
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:91