Belle II Software  release-08-01-10
b2hlt_file2socket.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
#include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
#include <daq/rfarm/event/hltsocket/HLTFile.h>
#include <daq/rfarm/event/hltsocket/HLTSocket.h>
#include <framework/logging/Logger.h>
#include <framework/pcore/EvtMessage.h>

#include <boost/program_options.hpp>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
17 
18 #define MAXEVTSIZE 80000000
19 
20 using namespace Belle2;
21 namespace po = boost::program_options;
22 
23 int main(int argc, char* argv[])
24 {
25  unsigned int port;
26  std::string sourceHost;
27  bool raw;
28  bool repeat;
29  std::string file_name;
30 
31  po::options_description desc("b2hlt_file2socket FILE-NAME PORT SHM-NAME SHM-ID");
32  desc.add_options()
33  ("help,h", "Print this help message")
34  ("port,p", po::value<unsigned int>(&port)->required(), "port number to connect or listen to")
35  ("file-name,f", po::value<std::string>(&file_name)->required(), "file name to write to")
36  ("raw", po::bool_switch(&raw)->default_value(false), "send and receive raw data instead of event buffers")
37  ("repeat", po::bool_switch(&repeat)->default_value(false), "repeat after the file is finished");
38 
39 
40  po::positional_options_description p;
41  p.add("file-name", 1).add("port", 1);
42 
43  po::variables_map vm;
44  try {
45  po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
46  } catch (std::exception& e) {
47  B2FATAL(e.what());
48  }
49 
50  if (vm.count("help")) {
51  std::cout << desc << std::endl;
52  return 1;
53  }
54 
55  try {
56  po::notify(vm);
57  } catch (std::exception& e) {
58  B2FATAL(e.what());
59  }
60 
61  int* buffer = new int[MAXEVTSIZE];
62 
63  HLTMainLoop mainLoop;
64 
65  HLTSocket socket;
66  HLTFile file;
67  int size;
68  int returnValue;
69  int nevt = 0;
70  bool terminate = false;
71 
72  if (not file.open(file_name, raw, "r")) {
73  B2ERROR("Can not open file");
74  terminate = true;
75  }
76 
77 
78  auto start = std::chrono::steady_clock::now();
79  while (mainLoop.isRunning() and not terminate) {
80  // Connect socket if needed
81  if (not socket.initialized()) {
82  if (vm.count("connect-to")) {
83  if (not socket.connect(sourceHost, port, mainLoop)) {
84  B2ERROR("Could not reconnnect!");
85  break;
86  }
87  } else {
88  if (not socket.accept(port)) {
89  B2ERROR("Could not reconnect!");
90  break;
91  }
92  }
93  B2RESULT("Connected.");
94  start = std::chrono::steady_clock::now();
95  }
96 
97  // Read from file
98  if (raw) {
99  size = file.get_wordbuf(buffer, MAXEVTSIZE);
100  } else {
101  size = file.get(reinterpret_cast<char*>(buffer), MAXEVTSIZE);
102  }
103  // Error checking socket
104  if (size == 0) {
105  if (repeat) {
106  file.open(file_name, raw, "r");
107  continue;
108  } else {
109  B2RESULT("Reached end of file");
110  break;
111  }
112  } else if (size < 0) {
113  if (mainLoop.isRunning()) {
114  B2ERROR("Error in receiving the event! Aborting.");
115  }
116  // This is fine if we are terminating anyways
117  break;
118  }
119  B2ASSERT("Size is negative! This should be handled above. Not good!", size > 0);
120 
121  if (raw) {
122  returnValue = socket.put_wordbuf(buffer, size);
123  } else {
124  EvtMessage message(reinterpret_cast<char*>(buffer));
125  returnValue = socket.put(message.buffer(), message.size());
126  // Terminate messages make us terminate
127  if (message.type() == MSG_TERMINATE) {
128  B2RESULT("Having received terminate message");
129  terminate = true;
130  }
131  }
132  if (returnValue == 0) {
133  B2ERROR("Error in sending the event! Reconnecting.");
134  socket.deinitialize();
135  continue;
136  } else if (returnValue < 0) {
137  if (mainLoop.isRunning()) {
138  B2ERROR("Error in sending the event! Aborting.");
139  }
140  // This is fine if we are terminating anyways
141  break;
142  }
143  B2ASSERT("Written size is negative! This should be handled above. Not good!", returnValue > 0);
144 
145  // Logging
146  nevt++;
147  if (nevt % 5000 == 0) {
148  auto current = std::chrono::steady_clock::now();
149  double elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(current - start).count();
150  B2RESULT("b2hlt_file2socket event number: " << nevt << " with a rate of " << 5000 / elapsed << " Hz");
151  start = std::chrono::steady_clock::now();
152  }
153  }
154 
155  B2RESULT("Program terminated.");
156 }
Class to manage streamed object.
Definition: EvtMessage.h:59
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:91