Belle II Software  release-08-01-10
storageout.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/logging/Logger.h>
10 
11 #include <daq/storage/SharedEventBuffer.h>
12 
13 #include <daq/slc/base/IOException.h>
14 #include <daq/slc/readout/RunInfoBuffer.h>
15 
16 #include <daq/slc/system/TCPServerSocket.h>
17 #include <daq/slc/system/TCPSocketWriter.h>
18 #include <daq/slc/system/LogFile.h>
19 
20 #include <arpa/inet.h>
21 
22 using namespace Belle2;
23 
24 int main(int argc, char** argv)
25 {
26  if (argc < 3) {
27  LogFile::debug("usage: %s bufname bufsize "
28  "port [nodename, nodeid]", argv[0]);
29  return 1;
30  }
31  RunInfoBuffer info;
32  bool use_info = (argc > 5);
33  if (use_info) {
34  info.open(argv[4], atoi(argv[5]));
35  }
36  SharedEventBuffer ibuf;
37  ibuf.open(argv[1], atoi(argv[2]) * 1000000);
38  int* evtbuf = new int[10000000];
39  info.reportRunning();
40  const int port = atoi(argv[3]);
41  TCPServerSocket serversocket("0.0.0.0", port);
42  try {
43  serversocket.open();
44  } catch (const IOException& e) {
45  B2ERROR("Failed to open server socket 0.0.0.0:" << port);
46  }
47  try {
48  while (true) {
49  unsigned int count_in = 0;
50  unsigned int count_out = 0;
51  unsigned int expno = 0;
52  unsigned int runno = 0;
53  unsigned int subno = 0;
54 
55  if (use_info) info.setOutputPort(0);
56  TCPSocket socket = serversocket.accept();
57  TCPSocketWriter writer(socket);
58  LogFile::info("Connected from expreco.");
59  if (use_info) info.setOutputPort(port);
61  while (true) {
62  long long nbyte = (ibuf.read(evtbuf, false, false, &hdr)) * sizeof(int);
63  if (expno < hdr.expno || runno < hdr.runno) {
64  expno = hdr.expno;
65  runno = hdr.runno;
66  if (use_info) {
67  info.setExpNumber(expno);
68  info.setRunNumber(runno);
69  info.setSubNumber(subno);
70  info.setInputCount(0);
71  info.setInputNBytes(0);
72  info.setOutputCount(0);
73  info.setOutputNBytes(0);
74  count_in = count_out = 0;
75  }
76  }
77  count_in++;
78  if (use_info) {
79  info.addInputCount(1);
80  info.addInputNBytes(nbyte);
81  }
82  unsigned long long nbyte_out = 0;
83  try {
84  unsigned int nByte = htonl(evtbuf[0]);
85  nbyte_out = writer.write(&nByte, sizeof(int));
86  nbyte_out += writer.write(evtbuf, evtbuf[0]);
87  //std::cout << "[DEBUG] In : "<< ntohl(nbyte) << " evtbuf[0] : " << evtbuf[0] << std::endl;
88  } catch (const IOException& e) {
89  LogFile::warning("Lost connection to expreco %s", e.what());
90  break;
91  }
92  if (nbyte_out == 0) {
93  LogFile::warning("Connection to expreco broken.");
94  if (use_info) info.setInputPort(0);
95  info.reportError(RunInfoBuffer::SOCKET_OUT);
96  break;
97  }
98  count_out++;
99  if (use_info) {
100  info.addOutputCount(1);
101  info.addOutputNBytes(nbyte_out);
102  }
103  }
104  socket.close();
105  }
106  } catch (const std::exception& e) {
107  B2ERROR("Unknown error: " << e.what());
108  }
109  return 0;
110 }
111 
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:91