Belle II Software development
b2hlt_socket2rb.cc
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/
#include <daq/rfarm/manager/RFFlowStat.h>
#include <framework/pcore/RingBuffer.h>
#include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
#include <daq/rfarm/event/hltsocket/HLTSocket.h>
#include <framework/logging/Logger.h>
#include <framework/pcore/EvtMessage.h>

#include <boost/program_options.hpp>
#include <iostream>
#include <memory>
17
18#define MAXEVTSIZE 80000000
19
20using namespace Belle2;
21namespace po = boost::program_options;
22
23int main(int argc, char* argv[])
24{
25 std::string ringBufferName;
26 unsigned int port;
27 std::string shmName;
28 unsigned int shmID;
29 bool raw = false;
30 std::string sourceHost;
31
32 po::options_description desc("b2hlt_socket2rb PORT RING-BUFFER-NAME SHM-NAME SHM-ID");
33 desc.add_options()
34 ("help,h", "Print this help message")
35 ("port,p", po::value<unsigned int>(&port)->required(), "port number to connect or listen to")
36 ("ring-buffer-name,r", po::value<std::string>(&ringBufferName)->required(), "name of the ring buffer")
37 ("shm-name,n", po::value<std::string>(&shmName)->required(), "name of the shm for flow output")
38 ("shm-id,i", po::value<unsigned int>(&shmID)->required(), "id in the shm for flow output")
39 ("connect-to,c", po::value<std::string>(&sourceHost), "connect to a given host instead of listening")
40 ("raw", po::bool_switch(&raw)->default_value(false), "send and receive raw data instead of event buffers");
41
42 po::positional_options_description p;
43 p.add("port", 1).add("ring-buffer-name", 1).add("shm-name", 1).add("shm-id", 1);
44
45 po::variables_map vm;
46 try {
47 po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
48 } catch (std::exception& e) {
49 B2FATAL(e.what());
50 }
51
52 if (vm.count("help")) {
53 std::cout << desc << std::endl;
54 return 1;
55 }
56
57 try {
58 po::notify(vm);
59 } catch (std::exception& e) {
60 B2FATAL(e.what());
61 }
62
63 // TODO: delete or not?
64 RingBuffer* ringBuffer = new RingBuffer(ringBufferName.c_str());
65 RFFlowStat flow((char*)shmName.c_str(), shmID, ringBuffer);
66 int* buffer = new int[MAXEVTSIZE];
67
68 HLTMainLoop mainLoop;
69
70 HLTSocket socket;
71 int size;
72 int nevt = 0;
73 bool terminate = false;
74
75 while (mainLoop.isRunning() and not terminate) {
76 // Connect socket if needed
77 if (not socket.initialized()) {
78 if (vm.count("connect-to")) {
79 if (not socket.connect(sourceHost, port, mainLoop)) {
80 B2ERROR("Could not reconnnect!");
81 break;
82 }
83 } else {
84 if (not socket.accept(port)) {
85 B2ERROR("Could not reconnect!");
86 break;
87 }
88 }
89 B2RESULT("Connected.");
90 }
91
92 // Read from socket
93 if (raw) {
94 size = socket.get_wordbuf(buffer, MAXEVTSIZE);
95 } else {
96 size = socket.get(reinterpret_cast<char*>(buffer), MAXEVTSIZE);
97 }
98 // Error checking socket
99 if (size == 0) {
100 B2ERROR("Error in receiving the event! Reconnecting.");
101 socket.deinitialize();
102 continue;
103 } else if (size < 0) {
104 if (mainLoop.isRunning()) {
105 B2ERROR("Error in receiving the event! Aborting.");
106 }
107 // This is fine if we are terminating anyways
108 break;
109 }
110 B2ASSERT("Size is negative! This should be handled above. Not good!", size > 0);
111
112 if (not raw) {
113 // We want to have it in words, not bytes
114 int sizeInWords = ((size - 1) / sizeof(int) + 1);
115
116 // However we have to make sure to pad the buffer correctly, as sizeInWords could be a larger buffer
117 unsigned int sizeRoundedUp = sizeInWords * sizeof(int);
118 auto charBuffer = reinterpret_cast<char*>(buffer);
119 for (unsigned int pos = size; pos < sizeRoundedUp; ++pos) {
120 charBuffer[pos] = 0;
121 }
122 size = sizeInWords;
123
124 // Terminate messages make us terminate
125 EvtMessage message(reinterpret_cast<char*>(buffer));
126 if (message.type() == MSG_TERMINATE) {
127 B2RESULT("Having received terminate message");
128 terminate = true;
129 }
130 }
131
132 // Monitoring
133 flow.log(size * sizeof(int));
134
135 // Write to ring buffer
136 const int returnValue = mainLoop.writeToRingBufferWaiting(ringBuffer, buffer, size);
137 // Error check ring buffer
138 if (returnValue <= 0) {
139 if (mainLoop.isRunning()) {
140 B2ERROR("Writing to the ring buffer failed!");
141 }
142 // This is fine if we are terminating anyways
143 break;
144 }
145 B2ASSERT("Written size is negative! This should be handled above. Not good!", returnValue > 0);
146
147 // Logging
148 nevt++;
149 if (nevt % 5000 == 0) {
150 B2RESULT("b2hlt_rb2socket event number: " << nevt);
151 }
152 }
153
154 B2RESULT("Program terminated.");
155}
Class to manage streamed object.
Definition: EvtMessage.h:59
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
Abstract base class for different kinds of events.