8#include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
9#include <daq/rfarm/event/hltsocket/HLTFile.h>
10#include <daq/rfarm/event/hltsocket/HLTSocket.h>
11#include <framework/logging/Logger.h>
12#include <framework/pcore/EvtMessage.h>
14#include <boost/program_options.hpp>
18#define MAXEVTSIZE 80000000
21namespace po = boost::program_options;
23int main(
int argc,
char* argv[])
26 std::string sourceHost;
29 std::string file_name;
31 po::options_description desc(
"b2hlt_file2socket FILE-NAME PORT SHM-NAME SHM-ID");
33 (
"help,h",
"Print this help message")
34 (
"port,p", po::value<unsigned int>(&port)->required(),
"port number to connect or listen to")
35 (
"file-name,f", po::value<std::string>(&file_name)->required(),
"file name to write to")
36 (
"raw", po::bool_switch(&raw)->default_value(
false),
"send and receive raw data instead of event buffers")
37 (
"repeat", po::bool_switch(&repeat)->default_value(
false),
"repeat after the file is finished");
40 po::positional_options_description p;
41 p.add(
"file-name", 1).add(
"port", 1);
45 po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
46 }
catch (std::exception& e) {
50 if (vm.count(
"help")) {
51 std::cout << desc << std::endl;
57 }
catch (std::exception& e) {
61 int* buffer =
new int[MAXEVTSIZE];
70 bool terminate =
false;
72 if (not file.open(file_name, raw,
"r")) {
73 B2ERROR(
"Can not open file");
78 auto start = std::chrono::steady_clock::now();
79 while (mainLoop.isRunning() and not terminate) {
81 if (not socket.initialized()) {
82 if (vm.count(
"connect-to")) {
83 if (not socket.connect(sourceHost, port, mainLoop)) {
84 B2ERROR(
"Could not reconnnect!");
88 if (not socket.accept(port)) {
89 B2ERROR(
"Could not reconnect!");
93 B2RESULT(
"Connected.");
94 start = std::chrono::steady_clock::now();
99 size = file.get_wordbuf(buffer, MAXEVTSIZE);
101 size = file.get(
reinterpret_cast<char*
>(buffer), MAXEVTSIZE);
106 file.open(file_name, raw,
"r");
109 B2RESULT(
"Reached end of file");
112 }
else if (size < 0) {
113 if (mainLoop.isRunning()) {
114 B2ERROR(
"Error in receiving the event! Aborting.");
119 B2ASSERT(
"Size is negative! This should be handled above. Not good!", size > 0);
122 returnValue = socket.put_wordbuf(buffer, size);
124 EvtMessage message(
reinterpret_cast<char*
>(buffer));
125 returnValue = socket.put(message.buffer(), message.size());
127 if (message.type() == MSG_TERMINATE) {
128 B2RESULT(
"Having received terminate message");
132 if (returnValue == 0) {
133 B2ERROR(
"Error in sending the event! Reconnecting.");
134 socket.deinitialize();
136 }
else if (returnValue < 0) {
137 if (mainLoop.isRunning()) {
138 B2ERROR(
"Error in sending the event! Aborting.");
143 B2ASSERT(
"Written size is negative! This should be handled above. Not good!", returnValue > 0);
147 if (nevt % 5000 == 0) {
148 auto current = std::chrono::steady_clock::now();
149 double elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(current - start).count();
150 B2RESULT(
"b2hlt_file2socket event number: " << nevt <<
" with a rate of " << 5000 / elapsed <<
" Hz");
151 start = std::chrono::steady_clock::now();
155 B2RESULT(
"Program terminated.");
Class to manage streamed object.
Abstract base class for different kinds of events.