10 #include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
11 #include <daq/rfarm/event/hltsocket/HLTFile.h>
12 #include <daq/rfarm/event/hltsocket/HLTSocket.h>
13 #include <framework/logging/Logger.h>
14 #include <framework/pcore/EvtMessage.h>
16 #include <boost/program_options.hpp>
20 #define MAXEVTSIZE 80000000
23 namespace po = boost::program_options;
25 int main(
int argc,
char* argv[])
27 std::string ringBufferName;
29 std::string sourceHost;
32 std::string file_name;
34 po::options_description desc(
"b2hlt_file2socket FILE-NAME PORT SHM-NAME SHM-ID");
36 (
"help,h",
"Print this help message")
37 (
"port,p", po::value<unsigned int>(&port)->required(),
"port number to connect or listen to")
38 (
"file-name,f", po::value<std::string>(&file_name)->required(),
"file name to write to")
39 (
"raw", po::bool_switch(&raw)->default_value(
false),
"send and receive raw data instead of event buffers")
40 (
"repeat", po::bool_switch(&repeat)->default_value(
false),
"repeat after the file is finished");
43 po::positional_options_description p;
44 p.add(
"file-name", 1).add(
"port", 1);
48 po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
49 }
catch (std::exception& e) {
53 if (vm.count(
"help")) {
54 std::cout << desc << std::endl;
60 }
catch (std::exception& e) {
64 int* buffer =
new int[MAXEVTSIZE];
73 bool terminate =
false;
75 if (not file.open(file_name, raw,
"r")) {
76 B2ERROR(
"Can not open file");
81 auto start = std::chrono::steady_clock::now();
82 while (mainLoop.isRunning() and not terminate) {
84 if (not socket.initialized()) {
85 if (vm.count(
"connect-to")) {
86 if (not socket.connect(sourceHost, port, mainLoop)) {
87 B2ERROR(
"Could not reconnnect!");
91 if (not socket.accept(port)) {
92 B2ERROR(
"Could not reconnect!");
96 B2RESULT(
"Connected.");
97 start = std::chrono::steady_clock::now();
102 size = file.get_wordbuf(buffer, MAXEVTSIZE);
104 size = file.get(
reinterpret_cast<char*
>(buffer), MAXEVTSIZE);
109 file.open(file_name, raw,
"r");
112 B2RESULT(
"Reached end of file");
115 }
else if (size < 0) {
116 if (mainLoop.isRunning()) {
117 B2ERROR(
"Error in receiving the event! Aborting.");
122 B2ASSERT(
"Size is negative! This should be handled above. Not good!", size > 0);
125 returnValue = socket.put_wordbuf(buffer, size);
127 EvtMessage message(
reinterpret_cast<char*
>(buffer));
128 returnValue = socket.put(message.buffer(), message.size());
130 if (message.type() == MSG_TERMINATE) {
131 B2RESULT(
"Having received terminate message");
135 if (returnValue == 0) {
136 B2ERROR(
"Error in sending the event! Reconnecting.");
137 socket.deinitialize();
139 }
else if (returnValue < 0) {
140 if (mainLoop.isRunning()) {
141 B2ERROR(
"Error in sending the event! Aborting.");
146 B2ASSERT(
"Written size is negative! This should be handled above. Not good!", returnValue > 0);
150 if (nevt % 5000 == 0) {
151 auto current = std::chrono::steady_clock::now();
152 double elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(current - start).count();
153 B2RESULT(
"b2hlt_file2socket event number: " << nevt <<
" with a rate of " << 5000 / elapsed <<
" Hz");
154 start = std::chrono::steady_clock::now();
158 B2RESULT(
"Program terminated.");