Belle II Software development
storageout.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/logging/Logger.h>
10
11#include <daq/storage/SharedEventBuffer.h>
12
13#include <daq/slc/base/IOException.h>
14#include <daq/slc/readout/RunInfoBuffer.h>
15
16#include <daq/slc/system/TCPServerSocket.h>
17#include <daq/slc/system/TCPSocketWriter.h>
18#include <daq/slc/system/LogFile.h>
19
20#include <arpa/inet.h>
21
22using namespace Belle2;
23
24int main(int argc, char** argv)
25{
26 if (argc < 3) {
27 LogFile::debug("usage: %s bufname bufsize "
28 "port [nodename, nodeid]", argv[0]);
29 return 1;
30 }
31 RunInfoBuffer info;
32 bool use_info = (argc > 5);
33 if (use_info) {
34 info.open(argv[4], atoi(argv[5]));
35 }
37 ibuf.open(argv[1], atoi(argv[2]) * 1000000);
38 int* evtbuf = new int[10000000];
39 info.reportRunning();
40 const int port = atoi(argv[3]);
41 TCPServerSocket serversocket("0.0.0.0", port);
42 try {
43 serversocket.open();
44 } catch (const IOException& e) {
45 B2ERROR("Failed to open server socket 0.0.0.0:" << port);
46 }
47 try {
48 while (true) {
49 unsigned int count_in = 0;
50 unsigned int count_out = 0;
51 unsigned int expno = 0;
52 unsigned int runno = 0;
53 unsigned int subno = 0;
54
55 if (use_info) info.setOutputPort(0);
56 TCPSocket socket = serversocket.accept();
57 TCPSocketWriter writer(socket);
58 LogFile::info("Connected from expreco.");
59 if (use_info) info.setOutputPort(port);
61 while (true) {
62 long long nbyte = (ibuf.read(evtbuf, false, false, &hdr)) * sizeof(int);
63 if (expno < hdr.expno || runno < hdr.runno) {
64 expno = hdr.expno;
65 runno = hdr.runno;
66 if (use_info) {
67 info.setExpNumber(expno);
68 info.setRunNumber(runno);
69 info.setSubNumber(subno);
70 info.setInputCount(0);
71 info.setInputNBytes(0);
72 info.setOutputCount(0);
73 info.setOutputNBytes(0);
74 count_in = count_out = 0;
75 }
76 }
77 count_in++;
78 if (use_info) {
79 info.addInputCount(1);
80 info.addInputNBytes(nbyte);
81 }
82 unsigned long long nbyte_out = 0;
83 try {
84 unsigned int nByte = htonl(evtbuf[0]);
85 nbyte_out = writer.write(&nByte, sizeof(int));
86 nbyte_out += writer.write(evtbuf, evtbuf[0]);
87 //std::cout << "[DEBUG] In : "<< ntohl(nbyte) << " evtbuf[0] : " << evtbuf[0] << std::endl;
88 } catch (const IOException& e) {
89 LogFile::warning("Lost connection to expreco %s", e.what());
90 break;
91 }
92 if (nbyte_out == 0) {
93 LogFile::warning("Connection to expreco broken.");
94 if (use_info) info.setInputPort(0);
95 info.reportError(RunInfoBuffer::SOCKET_OUT);
96 break;
97 }
98 count_out++;
99 if (use_info) {
100 info.addOutputCount(1);
101 info.addOutputNBytes(nbyte_out);
102 }
103 }
104 socket.close();
105 }
106 } catch (const std::exception& e) {
107 B2ERROR("Unknown error: " << e.what());
108 }
109 return 0;
110}
111
Abstract base class for different kinds of events.