Belle II Software  release-06-02-00
b2hlt_file2socket.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
9 #include <daq/rfarm/event/hltsocket/HLTFile.h>
10 #include <daq/rfarm/event/hltsocket/HLTSocket.h>
11 #include <framework/logging/Logger.h>
12 #include <framework/pcore/EvtMessage.h>
13 
14 #include <boost/program_options.hpp>
15 #include <chrono>
16 #include <iostream>
17 
18 #define MAXEVTSIZE 80000000
19 
20 using namespace Belle2;
21 namespace po = boost::program_options;
22 
23 int main(int argc, char* argv[])
24 {
25  std::string ringBufferName;
26  unsigned int port;
27  std::string sourceHost;
28  bool raw;
29  bool repeat;
30  std::string file_name;
31 
32  po::options_description desc("b2hlt_file2socket FILE-NAME PORT SHM-NAME SHM-ID");
33  desc.add_options()
34  ("help,h", "Print this help message")
35  ("port,p", po::value<unsigned int>(&port)->required(), "port number to connect or listen to")
36  ("file-name,f", po::value<std::string>(&file_name)->required(), "file name to write to")
37  ("raw", po::bool_switch(&raw)->default_value(false), "send and receive raw data instead of event buffers")
38  ("repeat", po::bool_switch(&repeat)->default_value(false), "repeat after the file is finished");
39 
40 
41  po::positional_options_description p;
42  p.add("file-name", 1).add("port", 1);
43 
44  po::variables_map vm;
45  try {
46  po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
47  } catch (std::exception& e) {
48  B2FATAL(e.what());
49  }
50 
51  if (vm.count("help")) {
52  std::cout << desc << std::endl;
53  return 1;
54  }
55 
56  try {
57  po::notify(vm);
58  } catch (std::exception& e) {
59  B2FATAL(e.what());
60  }
61 
62  int* buffer = new int[MAXEVTSIZE];
63 
64  HLTMainLoop mainLoop;
65 
66  HLTSocket socket;
67  HLTFile file;
68  int size;
69  int returnValue;
70  int nevt = 0;
71  bool terminate = false;
72 
73  if (not file.open(file_name, raw, "r")) {
74  B2ERROR("Can not open file");
75  terminate = true;
76  }
77 
78 
79  auto start = std::chrono::steady_clock::now();
80  while (mainLoop.isRunning() and not terminate) {
81  // Connect socket if needed
82  if (not socket.initialized()) {
83  if (vm.count("connect-to")) {
84  if (not socket.connect(sourceHost, port, mainLoop)) {
85  B2ERROR("Could not reconnnect!");
86  break;
87  }
88  } else {
89  if (not socket.accept(port)) {
90  B2ERROR("Could not reconnect!");
91  break;
92  }
93  }
94  B2RESULT("Connected.");
95  start = std::chrono::steady_clock::now();
96  }
97 
98  // Read from file
99  if (raw) {
100  size = file.get_wordbuf(buffer, MAXEVTSIZE);
101  } else {
102  size = file.get(reinterpret_cast<char*>(buffer), MAXEVTSIZE);
103  }
104  // Error checking socket
105  if (size == 0) {
106  if (repeat) {
107  file.open(file_name, raw, "r");
108  continue;
109  } else {
110  B2RESULT("Reached end of file");
111  break;
112  }
113  } else if (size < 0) {
114  if (mainLoop.isRunning()) {
115  B2ERROR("Error in receiving the event! Aborting.");
116  }
117  // This is fine if we are terminating anyways
118  break;
119  }
120  B2ASSERT("Size is negative! This should be handled above. Not good!", size > 0);
121 
122  if (raw) {
123  returnValue = socket.put_wordbuf(buffer, size);
124  } else {
125  EvtMessage message(reinterpret_cast<char*>(buffer));
126  returnValue = socket.put(message.buffer(), message.size());
127  // Terminate messages make us terminate
128  if (message.type() == MSG_TERMINATE) {
129  B2RESULT("Having received terminate message");
130  terminate = true;
131  }
132  }
133  if (returnValue == 0) {
134  B2ERROR("Error in sending the event! Reconnecting.");
135  socket.deinitialize();
136  continue;
137  } else if (returnValue < 0) {
138  if (mainLoop.isRunning()) {
139  B2ERROR("Error in sending the event! Aborting.");
140  }
141  // This is fine if we are terminating anyways
142  break;
143  }
144  B2ASSERT("Written size is negative! This should be handled above. Not good!", returnValue > 0);
145 
146  // Logging
147  nevt++;
148  if (nevt % 5000 == 0) {
149  auto current = std::chrono::steady_clock::now();
150  double elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(current - start).count();
151  B2RESULT("b2hlt_file2socket event number: " << nevt << " with a rate of " << 5000 / elapsed << " Hz");
152  start = std::chrono::steady_clock::now();
153  }
154  }
155 
156  B2RESULT("Program terminated.");
157 }
Class to manage streamed object.
Definition: EvtMessage.h:59
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:75