Belle II Software  release-06-01-15
storageout.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/logging/Logger.h>
10 
11 #include <daq/storage/SharedEventBuffer.h>
12 
13 #include <daq/slc/base/IOException.h>
14 #include <daq/slc/readout/RunInfoBuffer.h>
15 
16 #include <daq/slc/system/TCPServerSocket.h>
17 #include <daq/slc/system/TCPSocketWriter.h>
18 #include <daq/slc/system/LogFile.h>
19 
20 #include <arpa/inet.h>
21 
22 using namespace Belle2;
23 
24 int main(int argc, char** argv)
25 {
26  if (argc < 3) {
27  LogFile::debug("usage: %s bufname bufsize "
28  "port [nodename, nodeid]", argv[0]);
29  return 1;
30  }
31  RunInfoBuffer info;
32  bool use_info = (argc > 5);
33  if (use_info) {
34  info.open(argv[4], atoi(argv[5]));
35  }
36  SharedEventBuffer ibuf;
37  ibuf.open(argv[1], atoi(argv[2]) * 1000000);
38  int* evtbuf = new int[10000000];
39  unsigned int count_in = 0;
40  unsigned int count_out = 0;
41  unsigned int expno = 0;
42  unsigned int runno = 0;
43  unsigned int subno = 0;
44  info.reportRunning();
45  const int port = atoi(argv[3]);
46  TCPServerSocket serversocket("0.0.0.0", port);
47  try {
48  serversocket.open();
49  } catch (const IOException& e) {
50  B2ERROR("Failed to open server socket 0.0.0.0:" << port);
51  }
52  try {
53  while (true) {
54  if (use_info) info.setOutputPort(0);
55  TCPSocket socket = serversocket.accept();
56  TCPSocketWriter writer(socket);
57  LogFile::info("Connected from expreco.");
58  if (use_info) info.setOutputPort(port);
60  while (true) {
61  long long nbyte = (ibuf.read(evtbuf, false, false, &hdr)) * sizeof(int);
62  if (expno < hdr.expno || runno < hdr.runno) {
63  expno = hdr.expno;
64  runno = hdr.runno;
65  if (use_info) {
66  info.setExpNumber(expno);
67  info.setRunNumber(runno);
68  info.setSubNumber(subno);
69  info.setInputCount(0);
70  info.setInputNBytes(0);
71  info.setOutputCount(0);
72  info.setOutputNBytes(0);
73  count_in = count_out = 0;
74  }
75  }
76  count_in++;
77  if (use_info) {
78  info.addInputCount(1);
79  info.addInputNBytes(nbyte);
80  }
81  unsigned long long nbyte_out = 0;
82  try {
83  unsigned int nbyte = htonl(evtbuf[0]);
84  nbyte_out = writer.write(&nbyte, sizeof(int));
85  nbyte_out += writer.write(evtbuf, evtbuf[0]);
86  //std::cout << "[DEBUG] In : "<< ntohl(nbyte) << " evtbuf[0] : " << evtbuf[0] << std::endl;
87  } catch (const IOException& e) {
88  LogFile::warning("Lost connection to expreco %s", e.what());
89  break;
90  }
91  if (nbyte_out <= 0) {
92  LogFile::warning("Connection to expreco broken.");
93  if (use_info) info.setInputPort(0);
94  info.reportError(RunInfoBuffer::SOCKET_OUT);
95  break;
96  }
97  count_out++;
98  if (use_info) {
99  info.addOutputCount(1);
100  info.addOutputNBytes(nbyte_out);
101  }
102  }
103  socket.close();
104  }
105  } catch (const std::exception& e) {
106  B2ERROR("Unknown error: " << e.what());
107  }
108  return 0;
109 }
110 
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:75