Belle II Software  release-08-01-10
b2hlt_rb2rb.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
#include <framework/pcore/RingBuffer.h>
#include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
#include <framework/logging/Logger.h>
#include <framework/pcore/EvtMessage.h>

#include <boost/program_options.hpp>

#include <exception>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
16 
/// Capacity of the local event transfer buffer, counted in `int` elements.
constexpr int MAXEVTSIZE = 80000000;
/// Size argument handed to every RingBuffer constructed by this tool.
/// NOTE(review): the unit is defined by the RingBuffer constructor - confirm there.
constexpr int RBUFSIZE = 10000000;
19 
20 
21 using namespace Belle2;
22 namespace po = boost::program_options;
23 
24 int main(int argc, char* argv[])
25 {
26  std::vector<std::string> inputRingBufferNames;
27  std::vector<std::string> outputRingBufferNames;
28 
29  po::options_description desc("b2hlt_rb2rb");
30  desc.add_options()
31  ("help,h", "Print this help message")
32  ("input-ring-buffer-name,r", po::value<std::vector<std::string>>(&inputRingBufferNames)->required(),
33  "name of the input ring buffers")
34  ("output-ring-buffer-name,r", po::value<std::vector<std::string>>(&outputRingBufferNames)->required(),
35  "name of the output ring buffers");
36 
37  po::variables_map vm;
38  try {
39  po::store(po::command_line_parser(argc, argv).options(desc).run(), vm);
40  } catch (std::exception& e) {
41  B2FATAL(e.what());
42  }
43 
44  if (vm.count("help")) {
45  std::cout << desc << std::endl;
46  return 1;
47  }
48 
49  try {
50  po::notify(vm);
51  } catch (std::exception& e) {
52  B2FATAL(e.what());
53  }
54 
55  if (outputRingBufferNames.empty() or inputRingBufferNames.empty()) {
56  B2FATAL("Need at least one output and input ring buffer!");
57  }
58 
59  // TODO: delete or not?
60  std::vector<RingBuffer*> inputRingBuffers;
61  for (const std::string& bufferName : inputRingBufferNames) {
62  inputRingBuffers.push_back(new RingBuffer(bufferName.c_str(), RBUFSIZE));
63  inputRingBuffers.back()->dump_db();
64  }
65 
66  std::vector<RingBuffer*> outputRingBuffers;
67  for (const std::string& bufferName : outputRingBufferNames) {
68  outputRingBuffers.push_back(new RingBuffer(bufferName.c_str(), RBUFSIZE));
69  outputRingBuffers.back()->dump_db();
70  }
71 
72  HLTMainLoop mainLoop;
73 
74  int* buffer = new int[MAXEVTSIZE];
75  int nevt = 0;
76 
77  auto inputRingBuffer = inputRingBuffers.begin();
78  auto outputRingBuffer = outputRingBuffers.begin();
79 
80  while (mainLoop.isRunning()) {
81  // Read from ring buffer
82  const int size = mainLoop.readFromRingBufferWaiting(*inputRingBuffer, buffer);
83  // Error checking ring buffer
84  if (size <= 0) {
85  if (mainLoop.isRunning()) {
86  B2ERROR("Writing to the ring buffer failed!");
87  }
88  // This is fine if we are terminating anyways
89  break;
90  }
91  B2ASSERT("Size is negative! This should be handled above. Not good!", size > 0);
92 
93  // Write to ring buffer
94  const int returnValue = mainLoop.writeToRingBufferWaiting(*outputRingBuffer, buffer, size);
95  // Error check ring buffer
96  if (returnValue <= 0) {
97  if (mainLoop.isRunning()) {
98  B2ERROR("Writing to the ring buffer failed!");
99  }
100  // This is fine if we are terminating anyways
101  break;
102  }
103  B2ASSERT("Written size is negative! This should be handled above. Not good!", returnValue > 0);
104 
105  // Logging
106  nevt++;
107  if (nevt % 5000 == 0) {
108  B2RESULT("b2hlt_rb2socket event number: " << nevt);
109  }
110 
111  inputRingBuffer++;
112  if (inputRingBuffer == inputRingBuffers.end()) {
113  inputRingBuffer = inputRingBuffers.begin();
114  }
115  outputRingBuffer++;
116  if (outputRingBuffer == outputRingBuffers.end()) {
117  outputRingBuffer = outputRingBuffers.begin();
118  }
119  }
120 
121  B2RESULT("Program terminated.");
122 }
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:91