Belle II Software development
stevtserver.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <unistd.h>
10#include <cstdlib>
11#include <cstring>
12
13#include <framework/logging/Logger.h>
14
15#include <daq/storage/BinData.h>
16
17#include <daq/slc/system/PThread.h>
18#include <daq/slc/system/Cond.h>
19#include <daq/slc/system/Mutex.h>
20#include <daq/slc/system/TCPServerSocket.h>
21#include <daq/slc/system/TCPSocket.h>
22#include <daq/slc/system/TCPSocketReader.h>
23#include <daq/slc/system/TCPSocketWriter.h>
24#include <daq/slc/system/Time.h>
25#include <daq/slc/system/LogFile.h>
26#include <daq/slc/base/IOException.h>
27
// Capacity, counted in int-sized words, of every event buffer allocated in
// this file (both the per-sender EvtBuffer storage and the receive buffer in main).
28const unsigned int NWORD_BUFFER = 10000000;//10Mword
29
30namespace Belle2 {
// Global mutex/condition pair shared by all threads in this file.
// EvtBuffer::read() signals g_cond (while holding g_mutex) after it has handed
// its buffer to a writer and marked it writable again; main() waits on g_cond
// when the round-robin dispatch finds buffer 0 not writable.
36 Mutex g_mutex;
37 Cond g_cond;
38
39 class EvtBuffer {
40
41 public:
42 EvtBuffer(int id)
43 {
44 m_id = id;
45 m_evtbuf = new int[NWORD_BUFFER];
46 m_writable = true;
47 m_readable = false;
48 }
49
50 public:
51 bool write(int* buf, int nbyte)
52 {
53 m_mutex.lock();
54 bool succeded = false;
55 if (m_writable) {
56 memcpy(m_evtbuf, buf, nbyte);
57 m_cond.signal();
58 m_writable = false;
59 m_readable = true;
60 succeded = true;
61 }
62 m_mutex.unlock();
63 return succeded;
64 }
65 bool writable()
66 {
67 m_mutex.lock();
68 bool succeded = false;
69 if (m_writable) {
70 succeded = true;
71 }
72 m_mutex.unlock();
73 return succeded;
74 }
75 int* buffer()
76 {
77 m_writable = false;
78 m_readable = true;
79 return m_evtbuf;
80 }
81 void read(Writer& writer)
82 {
83 m_mutex.lock();
84 while (true) {
85 if (!m_readable) {
86 m_cond.wait(m_mutex);
87 } else {
88 //int evtno = m_evtbuf[4];
89 //LogFile::info("write %d %d", m_id, evtno);
90 writer.write(m_evtbuf, m_evtbuf[0] * 4);
91 m_writable = true;
92 m_readable = false;
93 g_mutex.lock();
94 g_cond.signal();
95 g_mutex.unlock();
96 break;
97 }
98 }
99 m_mutex.unlock();
100 }
101 void lock() { m_mutex.lock(); }
102 void unlock() { m_mutex.unlock(); }
103 void signal() { m_cond.signal(); }
104
105 private:
106 int m_id;
107 int* m_evtbuf;
108 Mutex m_mutex;
109 Cond m_cond;
110 bool m_writable;
111 bool m_readable;
112
113 };
114
115 class Sender {
116 public:
117 Sender(int port, EvtBuffer* buf)
118 : m_port(port), m_buf(buf) {}
119
120 public:
121 void run()
122 {
123 TCPServerSocket server("0.0.0.0", m_port);
124 server.open();
125 while (true) {
126 TCPSocket socket;
127 try {
128 socket = server.accept();
129 socket.setBufferSize(32 * 1024 * 1024);
130 TCPSocketWriter writer(socket);
131 while (true) {
132 m_buf->read(writer);
133 }
134 } catch (const IOException& e) {
135 socket.close();
136 }
137 }
138 }
139
140 private:
141 int m_port;
142 EvtBuffer* m_buf;
143
144 };
145
147}
148
149using namespace Belle2;
150
151int main(int argc, char** argv)
152{
153 if (argc < 4) {
154 LogFile::debug("%s : hostname port nsenders portbase", argv[0]);
155 return 1;
156 }
157
158 const int nsenders = atoi(argv[3]);
159 const int portbase = atoi(argv[4]);
160 std::vector<EvtBuffer*> buf_v;
161
162 for (int i = 0; i < nsenders; i++) {
163 EvtBuffer* buf = new EvtBuffer(i);
164 buf_v.push_back(buf);
165 PThread(new Sender(portbase + i + 1, buf));
166 }
167
168 TCPSocket socket(argv[1], atoi(argv[2]));
169 int ntried = 0;
170 while (true) {
171 while (socket.get_fd() <= 0) {
172 try {
173 socket.connect();
174 B2INFO("Connected to upstream");
175 socket.setBufferSize(32 * 1024 * 1024);
176 ntried = 0;
177 break;
178 } catch (const IOException& e) {
179 socket.close();
180 if (ntried < 5)
181 B2WARNING("failed to connect to upstream (try=" << ntried++ << ")");
182 sleep(5);
183 }
184 }
185 try {
186 TCPSocketReader reader(socket);
187 B2INFO("storagein: Cconnected to eb2.");
188 int count = 0;
189 unsigned long ilast = 0;
190 BinData data;
191 int* evtbuf = new int[NWORD_BUFFER];
192 unsigned int nremains = 0;
193 double nbyteall = 0;
194 Time t0;
195 while (true) {
196 unsigned int nword = socket.read_once(evtbuf + nremains,
197 (NWORD_BUFFER - nremains) * sizeof(int)) / sizeof(int)
198 + nremains;
199 if (nword == 0) continue;
200 nbyteall += nword * 4;
201 //B2INFO("nword = " << nword << " nremains = " << nremains);
202 unsigned int offset = 0;
203 while (offset < nword && nword >= evtbuf[offset] + offset) {
204 if (offset > 0 && evtbuf[offset - 1] != (int)BinData::TRAILER_MAGIC) {
205 B2FATAL("Invalid trailer magic" << evtbuf[offset - 1] << "!=" << (int)BinData::TRAILER_MAGIC);
206 return 1;
207 }
208 int nword_evt = evtbuf[offset];
209 int evtno = evtbuf[offset + 4];
210 while (true) {
211 int i = ilast % nsenders;
212 ilast++;
213 if (buf_v[i]->writable()) {
214 buf_v[i]->write(&evtbuf[offset], nword_evt * 4);
215 offset += evtbuf[offset];
216 count++;
217 break;
218 }
219 if (i == 0 && ilast > 0) {
220 g_mutex.lock();
221 g_cond.wait(g_mutex);
222 g_mutex.unlock();
223 }
224 }
225 if (count < 1000000 && (count < 10 ||
226 (count > 10 && count < 100 && count % 10 == 0) ||
227 (count > 100 && count < 1000 && count % 100 == 0) ||
228 (count > 1000 && count < 10000 && count % 1000 == 0) ||
229 (count > 10000 && count < 100000 && count % 10000 == 0) ||
230 (count > 100000 && count % 100000 == 0))) {
231 B2INFO("Event count = " << count << " nword = " << nword_evt << " evtno = " << evtno);
232 }
233 const int nth = 100000;
234 if (count % nth == 0) {
235 Time t;
236 double dt = (t.get() - t0.get());
237 double freq = nth / dt / 1000.;
238 double rate = nbyteall / dt / 1000. / 1000.;
239 printf("[DEBUG] Serial = %d Freq = %f [kHz], Rate = %f [MB/s], DataSize = %f [kB/event]\n",
240 count, freq, rate, nbyteall / 1000. / nth);
241 t0 = t;
242 nbyteall = 0;
243 }
244 }
245 nremains = nword - offset;
246 if (nremains > 0) {
247 memmove(evtbuf, evtbuf + offset, nremains * sizeof(int));
248 }
249 }
250 } catch (const IOException& e) {
251 socket.close();
252 B2WARNING("Connection to upstream broken.");
253 sleep(5);
254 }
255 }
256 return 0;
257}
258
Abstract base class for different kinds of events.