Belle II Software development
b2hlt_rb2rb.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/RingBuffer.h>
10#include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
11#include <framework/logging/Logger.h>
12#include <framework/pcore/EvtMessage.h>
13
14#include <boost/program_options.hpp>
15#include <iostream>
16
// Number of int elements allocated for the single event buffer that main() reads into and writes from.
17#define MAXEVTSIZE 80000000
// Size argument handed to every RingBuffer constructor in main(); the unit is defined by RingBuffer (presumably words, not bytes -- TODO confirm).
18#define RBUFSIZE 10000000
19
20
21using namespace Belle2;
22namespace po = boost::program_options;
23
24int main(int argc, char* argv[])
25{
26 std::vector<std::string> inputRingBufferNames;
27 std::vector<std::string> outputRingBufferNames;
28
29 po::options_description desc("b2hlt_rb2rb");
30 desc.add_options()
31 ("help,h", "Print this help message")
32 ("input-ring-buffer-name,r", po::value<std::vector<std::string>>(&inputRingBufferNames)->required(),
33 "name of the input ring buffers")
34 ("output-ring-buffer-name,r", po::value<std::vector<std::string>>(&outputRingBufferNames)->required(),
35 "name of the output ring buffers");
36
37 po::variables_map vm;
38 try {
39 po::store(po::command_line_parser(argc, argv).options(desc).run(), vm);
40 } catch (std::exception& e) {
41 B2FATAL(e.what());
42 }
43
44 if (vm.count("help")) {
45 std::cout << desc << std::endl;
46 return 1;
47 }
48
49 try {
50 po::notify(vm);
51 } catch (std::exception& e) {
52 B2FATAL(e.what());
53 }
54
55 if (outputRingBufferNames.empty() or inputRingBufferNames.empty()) {
56 B2FATAL("Need at least one output and input ring buffer!");
57 }
58
59 // TODO: delete or not?
60 std::vector<RingBuffer*> inputRingBuffers;
61 for (const std::string& bufferName : inputRingBufferNames) {
62 inputRingBuffers.push_back(new RingBuffer(bufferName.c_str(), RBUFSIZE));
63 inputRingBuffers.back()->dump_db();
64 }
65
66 std::vector<RingBuffer*> outputRingBuffers;
67 for (const std::string& bufferName : outputRingBufferNames) {
68 outputRingBuffers.push_back(new RingBuffer(bufferName.c_str(), RBUFSIZE));
69 outputRingBuffers.back()->dump_db();
70 }
71
72 HLTMainLoop mainLoop;
73
74 int* buffer = new int[MAXEVTSIZE];
75 int nevt = 0;
76
77 auto inputRingBuffer = inputRingBuffers.begin();
78 auto outputRingBuffer = outputRingBuffers.begin();
79
80 while (mainLoop.isRunning()) {
81 // Read from ring buffer
82 const int size = mainLoop.readFromRingBufferWaiting(*inputRingBuffer, buffer);
83 // Error checking ring buffer
84 if (size <= 0) {
85 if (mainLoop.isRunning()) {
86 B2ERROR("Writing to the ring buffer failed!");
87 }
88 // This is fine if we are terminating anyways
89 break;
90 }
91 B2ASSERT("Size is negative! This should be handled above. Not good!", size > 0);
92
93 // Write to ring buffer
94 const int returnValue = mainLoop.writeToRingBufferWaiting(*outputRingBuffer, buffer, size);
95 // Error check ring buffer
96 if (returnValue <= 0) {
97 if (mainLoop.isRunning()) {
98 B2ERROR("Writing to the ring buffer failed!");
99 }
100 // This is fine if we are terminating anyways
101 break;
102 }
103 B2ASSERT("Written size is negative! This should be handled above. Not good!", returnValue > 0);
104
105 // Logging
106 nevt++;
107 if (nevt % 5000 == 0) {
108 B2RESULT("b2hlt_rb2socket event number: " << nevt);
109 }
110
111 inputRingBuffer++;
112 if (inputRingBuffer == inputRingBuffers.end()) {
113 inputRingBuffer = inputRingBuffers.begin();
114 }
115 outputRingBuffer++;
116 if (outputRingBuffer == outputRingBuffers.end()) {
117 outputRingBuffer = outputRingBuffers.begin();
118 }
119 }
120
121 B2RESULT("Program terminated.");
122}
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
Abstract base class for different kinds of events.