Belle II Software  release-05-01-25
b2hlt_file2socket.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Ryosuke Itoh, Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
11 #include <daq/rfarm/event/hltsocket/HLTFile.h>
12 #include <daq/rfarm/event/hltsocket/HLTSocket.h>
13 #include <framework/logging/Logger.h>
14 #include <framework/pcore/EvtMessage.h>
15 
16 #include <boost/program_options.hpp>
17 #include <chrono>
18 #include <iostream>
19 
20 #define MAXEVTSIZE 80000000
21 
22 using namespace Belle2;
23 namespace po = boost::program_options;
24 
25 int main(int argc, char* argv[])
26 {
27  std::string ringBufferName;
28  unsigned int port;
29  std::string sourceHost;
30  bool raw;
31  bool repeat;
32  std::string file_name;
33 
34  po::options_description desc("b2hlt_file2socket FILE-NAME PORT SHM-NAME SHM-ID");
35  desc.add_options()
36  ("help,h", "Print this help message")
37  ("port,p", po::value<unsigned int>(&port)->required(), "port number to connect or listen to")
38  ("file-name,f", po::value<std::string>(&file_name)->required(), "file name to write to")
39  ("raw", po::bool_switch(&raw)->default_value(false), "send and receive raw data instead of event buffers")
40  ("repeat", po::bool_switch(&repeat)->default_value(false), "repeat after the file is finished");
41 
42 
43  po::positional_options_description p;
44  p.add("file-name", 1).add("port", 1);
45 
46  po::variables_map vm;
47  try {
48  po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
49  } catch (std::exception& e) {
50  B2FATAL(e.what());
51  }
52 
53  if (vm.count("help")) {
54  std::cout << desc << std::endl;
55  return 1;
56  }
57 
58  try {
59  po::notify(vm);
60  } catch (std::exception& e) {
61  B2FATAL(e.what());
62  }
63 
64  int* buffer = new int[MAXEVTSIZE];
65 
66  HLTMainLoop mainLoop;
67 
68  HLTSocket socket;
69  HLTFile file;
70  int size;
71  int returnValue;
72  int nevt = 0;
73  bool terminate = false;
74 
75  if (not file.open(file_name, raw, "r")) {
76  B2ERROR("Can not open file");
77  terminate = true;
78  }
79 
80 
81  auto start = std::chrono::steady_clock::now();
82  while (mainLoop.isRunning() and not terminate) {
83  // Connect socket if needed
84  if (not socket.initialized()) {
85  if (vm.count("connect-to")) {
86  if (not socket.connect(sourceHost, port, mainLoop)) {
87  B2ERROR("Could not reconnnect!");
88  break;
89  }
90  } else {
91  if (not socket.accept(port)) {
92  B2ERROR("Could not reconnect!");
93  break;
94  }
95  }
96  B2RESULT("Connected.");
97  start = std::chrono::steady_clock::now();
98  }
99 
100  // Read from file
101  if (raw) {
102  size = file.get_wordbuf(buffer, MAXEVTSIZE);
103  } else {
104  size = file.get(reinterpret_cast<char*>(buffer), MAXEVTSIZE);
105  }
106  // Error checking socket
107  if (size == 0) {
108  if (repeat) {
109  file.open(file_name, raw, "r");
110  continue;
111  } else {
112  B2RESULT("Reached end of file");
113  break;
114  }
115  } else if (size < 0) {
116  if (mainLoop.isRunning()) {
117  B2ERROR("Error in receiving the event! Aborting.");
118  }
119  // This is fine if we are terminating anyways
120  break;
121  }
122  B2ASSERT("Size is negative! This should be handled above. Not good!", size > 0);
123 
124  if (raw) {
125  returnValue = socket.put_wordbuf(buffer, size);
126  } else {
127  EvtMessage message(reinterpret_cast<char*>(buffer));
128  returnValue = socket.put(message.buffer(), message.size());
129  // Terminate messages make us terminate
130  if (message.type() == MSG_TERMINATE) {
131  B2RESULT("Having received terminate message");
132  terminate = true;
133  }
134  }
135  if (returnValue == 0) {
136  B2ERROR("Error in sending the event! Reconnecting.");
137  socket.deinitialize();
138  continue;
139  } else if (returnValue < 0) {
140  if (mainLoop.isRunning()) {
141  B2ERROR("Error in sending the event! Aborting.");
142  }
143  // This is fine if we are terminating anyways
144  break;
145  }
146  B2ASSERT("Written size is negative! This should be handled above. Not good!", returnValue > 0);
147 
148  // Logging
149  nevt++;
150  if (nevt % 5000 == 0) {
151  auto current = std::chrono::steady_clock::now();
152  double elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(current - start).count();
153  B2RESULT("b2hlt_file2socket event number: " << nevt << " with a rate of " << 5000 / elapsed << " Hz");
154  start = std::chrono::steady_clock::now();
155  }
156  }
157 
158  B2RESULT("Program terminated.");
159 }
prepareAsicCrosstalkSimDB.e
e
aux.
Definition: prepareAsicCrosstalkSimDB.py:53
Belle2::EvtMessage
Class to manage streamed object.
Definition: EvtMessage.h:60
main
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:77
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::HLTMainLoop
Definition: HLTMainLoop.h:28
Belle2::HLTFile
Definition: HLTFile.h:11
Belle2::HLTSocket
Definition: HLTSocket.h:13