8#include <daq/rfarm/manager/RFFlowStat.h>
9#include <framework/pcore/RingBuffer.h>
10#include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
11#include <daq/rfarm/event/hltsocket/HLTSocket.h>
12#include <framework/logging/Logger.h>
13#include <framework/pcore/EvtMessage.h>
15#include <boost/program_options.hpp>
18#define MAXEVTSIZE 80000000
21namespace po = boost::program_options;
// Program entry point. From what is visible here: parses the command-line
// options, then loops receiving data from a socket into a buffer and writing
// it to a ring buffer, until the main loop stops running or a terminate
// message is received.
// NOTE(review): this listing has gaps — the declarations of port, shmName,
// shmID, raw, vm, socket, mainLoop, ringBuffer, size and nevt, as well as
// several try/else/closing-brace lines, are not visible. The comments below
// describe only what is shown.
23int main(
int argc,
char* argv[])
25 std::string ringBufferName;
30 std::string sourceHost;
// Option definitions; the caption lists the four positional arguments.
32 po::options_description desc(
"b2hlt_socket2rb PORT RING-BUFFER-NAME SHM-NAME SHM-ID");
34 (
"help,h",
"Print this help message")
35 (
"port,p", po::value<unsigned int>(&port)->required(),
"port number to connect or listen to")
36 (
"ring-buffer-name,r", po::value<std::string>(&ringBufferName)->required(),
"name of the ring buffer")
37 (
"shm-name,n", po::value<std::string>(&shmName)->required(),
"name of the shm for flow output")
38 (
"shm-id,i", po::value<unsigned int>(&shmID)->required(),
"id in the shm for flow output")
39 (
"connect-to,c", po::value<std::string>(&sourceHost),
"connect to a given host instead of listening")
40 (
"raw", po::bool_switch(&raw)->default_value(
false),
"send and receive raw data instead of event buffers");
// Let the four required options also be given positionally, in this order.
42 po::positional_options_description p;
43 p.add(
"port", 1).add(
"ring-buffer-name", 1).add(
"shm-name", 1).add(
"shm-id", 1);
// Parse the command line into vm (handler body of the catch is not visible).
47 po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
48 }
catch (std::exception& e) {
// Print the option summary when --help was given.
52 if (vm.count(
"help")) {
53 std::cout << desc << std::endl;
59 }
catch (std::exception& e) {
// Flow statistics object constructed from the shm name, the shm id and the
// ring buffer. The C-style cast drops the const from c_str().
65 RFFlowStat flow((
char*)shmName.c_str(), shmID, ringBuffer);
// Receive buffer holding MAXEVTSIZE ints.
// NOTE(review): allocated with raw new[]; no matching delete[] is visible in
// this listing — confirm whether it is released before exit.
66 int* buffer =
new int[MAXEVTSIZE];
73 bool terminate =
false;
// Main loop: continues while mainLoop reports running and no terminate
// message has been seen.
75 while (mainLoop.isRunning() and not terminate) {
// (Re)establish the connection whenever the socket is not initialized.
77 if (not socket.initialized()) {
// With --connect-to, connect out to sourceHost on the given port.
78 if (vm.count(
"connect-to")) {
79 if (not socket.connect(sourceHost, port, mainLoop)) {
// NOTE(review): "reconnnect" (three n's) looks like a typo; the accept
// branch below spells it "reconnect".
80 B2ERROR(
"Could not reconnnect!");
// Presumably the else-branch (listening mode): accept a connection on port.
// The else itself is not visible here.
84 if (not socket.accept(port)) {
85 B2ERROR(
"Could not reconnect!");
89 B2RESULT(
"Connected.");
// Receive one message into the buffer, either via get_wordbuf (int buffer)
// or via get (buffer viewed as chars).
// NOTE(review): presumably selected by the --raw switch — the condition is
// not visible.
94 size = socket.get_wordbuf(buffer, MAXEVTSIZE);
96 size = socket.get(
reinterpret_cast<char*
>(buffer), MAXEVTSIZE);
// First failure case (its condition is not visible): log the error and
// deinitialize the socket, so the initialized() check at the top of the loop
// triggers a reconnect.
100 B2ERROR(
"Error in receiving the event! Reconnecting.");
101 socket.deinitialize();
103 }
// Negative size: reported as an error only while the main loop is still
// running.
else if (size < 0) {
104 if (mainLoop.isRunning()) {
105 B2ERROR(
"Error in receiving the event! Aborting.");
// Invariant: size must be strictly positive past this point.
110 B2ASSERT(
"Size is negative! This should be handled above. Not good!", size > 0);
// Ceiling division: number of ints needed to hold size bytes.
114 int sizeInWords = ((size - 1) /
sizeof(
int) + 1);
// Byte length after rounding up to a whole number of ints.
117 unsigned int sizeRoundedUp = sizeInWords *
sizeof(int);
118 auto charBuffer =
reinterpret_cast<char*
>(buffer);
// Iterates over the padding bytes between size and the rounded-up length.
// The loop body is not visible — presumably it clears them via charBuffer.
119 for (
unsigned int pos = size; pos < sizeRoundedUp; ++pos) {
// View the buffer as an EvtMessage to check for a terminate message.
125 EvtMessage message(
reinterpret_cast<char*
>(buffer));
126 if (message.type() == MSG_TERMINATE) {
127 B2RESULT(
"Having received terminate message");
// Record the transferred amount in the flow statistics.
// NOTE(review): multiplying by sizeof(int) assumes size is a count of ints
// at this point — TODO confirm this also holds on the char-based receive
// path.
133 flow.log(size *
sizeof(
int));
// Write the received data to the ring buffer; a non-positive return value is
// treated as a failure below.
136 const int returnValue = mainLoop.writeToRingBufferWaiting(ringBuffer, buffer, size);
138 if (returnValue <= 0) {
139 if (mainLoop.isRunning()) {
140 B2ERROR(
"Writing to the ring buffer failed!");
// Invariant: a successful write returned a strictly positive value.
145 B2ASSERT(
"Written size is negative! This should be handled above. Not good!", returnValue > 0);
// Progress report every 5000 events.
// NOTE(review): the message names "b2hlt_rb2socket" whereas the usage caption
// above names "b2hlt_socket2rb" — possibly a copy-paste leftover; confirm.
149 if (nevt % 5000 == 0) {
150 B2RESULT(
"b2hlt_rb2socket event number: " << nevt);
154 B2RESULT(
"Program terminated.");
Class to manage a streamed object.
Class to manage a Ring Buffer placed in an IPC shared memory.
Abstract base class for different kinds of events.