13#include <framework/logging/Logger.h>
15#include <daq/storage/BinData.h>
17#include <daq/slc/system/PThread.h>
18#include <daq/slc/system/Cond.h>
19#include <daq/slc/system/Mutex.h>
20#include <daq/slc/system/TCPServerSocket.h>
21#include <daq/slc/system/TCPSocket.h>
22#include <daq/slc/system/TCPSocketReader.h>
23#include <daq/slc/system/TCPSocketWriter.h>
24#include <daq/slc/system/Time.h>
25#include <daq/slc/system/LogFile.h>
26#include <daq/slc/base/IOException.h>
28const unsigned int NWORD_BUFFER = 10000000;
45 m_evtbuf =
new int[NWORD_BUFFER];
51 bool write(
int* buf,
int nbyte)
54 bool succeded =
false;
56 memcpy(m_evtbuf, buf, nbyte);
68 bool succeded =
false;
90 writer.write(m_evtbuf, m_evtbuf[0] * 4);
101 void lock() { m_mutex.lock(); }
102 void unlock() { m_mutex.unlock(); }
103 void signal() { m_cond.signal(); }
118 : m_port(port), m_buf(buf) {}
128 socket = server.accept();
129 socket.setBufferSize(32 * 1024 * 1024);
151int main(
int argc,
char** argv)
154 LogFile::debug(
"%s : hostname port nsenders portbase", argv[0]);
158 const int nsenders = atoi(argv[3]);
159 const int portbase = atoi(argv[4]);
160 std::vector<EvtBuffer*> buf_v;
162 for (
int i = 0; i < nsenders; i++) {
164 buf_v.push_back(buf);
168 TCPSocket socket(argv[1], atoi(argv[2]));
171 while (socket.get_fd() <= 0) {
174 B2INFO(
"Connected to upstream");
175 socket.setBufferSize(32 * 1024 * 1024);
181 B2WARNING(
"failed to connect to upstream (try=" << ntried++ <<
")");
187 B2INFO(
"storagein: Cconnected to eb2.");
189 unsigned long ilast = 0;
191 int* evtbuf =
new int[NWORD_BUFFER];
192 unsigned int nremains = 0;
196 unsigned int nword = socket.read_once(evtbuf + nremains,
197 (NWORD_BUFFER - nremains) *
sizeof(
int)) /
sizeof(int)
199 if (nword == 0)
continue;
200 nbyteall += nword * 4;
202 unsigned int offset = 0;
203 while (offset < nword && nword >= evtbuf[offset] + offset) {
204 if (offset > 0 && evtbuf[offset - 1] != (
int)BinData::TRAILER_MAGIC) {
205 B2FATAL(
"Invalid trailer magic" << evtbuf[offset - 1] <<
"!=" << (
int)BinData::TRAILER_MAGIC);
208 int nword_evt = evtbuf[offset];
209 int evtno = evtbuf[offset + 4];
211 int i = ilast % nsenders;
213 if (buf_v[i]->writable()) {
214 buf_v[i]->write(&evtbuf[offset], nword_evt * 4);
215 offset += evtbuf[offset];
219 if (i == 0 && ilast > 0) {
221 g_cond.wait(g_mutex);
225 if (count < 1000000 && (count < 10 ||
226 (count > 10 && count < 100 && count % 10 == 0) ||
227 (count > 100 && count < 1000 && count % 100 == 0) ||
228 (count > 1000 && count < 10000 && count % 1000 == 0) ||
229 (count > 10000 && count < 100000 && count % 10000 == 0) ||
230 (count > 100000 && count % 100000 == 0))) {
231 B2INFO(
"Event count = " << count <<
" nword = " << nword_evt <<
" evtno = " << evtno);
233 const int nth = 100000;
234 if (count % nth == 0) {
236 double dt = (t.get() - t0.get());
237 double freq = nth / dt / 1000.;
238 double rate = nbyteall / dt / 1000. / 1000.;
239 printf(
"[DEBUG] Serial = %d Freq = %f [kHz], Rate = %f [MB/s], DataSize = %f [kB/event]\n",
240 count, freq, rate, nbyteall / 1000. / nth);
245 nremains = nword - offset;
247 memmove(evtbuf, evtbuf + offset, nremains *
sizeof(
int));
252 B2WARNING(
"Connection to upstream broken.");
Abstract base class for different kinds of events.