8 #include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
9 #include <daq/rfarm/event/hltsocket/HLTFile.h>
10 #include <daq/rfarm/event/hltsocket/HLTSocket.h>
11 #include <framework/logging/Logger.h>
12 #include <framework/pcore/EvtMessage.h>
14 #include <boost/program_options.hpp>
18 #define MAXEVTSIZE 80000000
21 namespace po = boost::program_options;
// Entry point of the b2hlt_file2socket tool: parses the command line, opens the
// given file for reading and, inside a main loop, reads records from the file
// and sends them over a socket (see the file.get*/socket.put* calls below).
// NOTE(review): this listing is incomplete (declarations, braces and some
// branches are not visible); the comments only describe what is shown here.
23 int main(
int argc,
char* argv[])
// Settings used below. Only file_name is bound to a visible option;
// ringBufferName and sourceHost are not bound to any option visible here.
25 std::string ringBufferName;
27 std::string sourceHost;
30 std::string file_name;
// NOTE(review): the usage string mentions SHM-NAME and SHM-ID, but only
// "file-name" and "port" are registered as positional options below.
32 po::options_description desc(
"b2hlt_file2socket FILE-NAME PORT SHM-NAME SHM-ID");
34 (
"help,h",
"Print this help message")
// Required: TCP port, stored in `port`.
35 (
"port,p", po::value<unsigned int>(&port)->required(),
"port number to connect or listen to")
// Required: path of the file, stored in `file_name`.
// NOTE(review): the help text says "write to", but the file is opened with
// mode "r" further down - confirm the wording.
36 (
"file-name,f", po::value<std::string>(&file_name)->required(),
"file name to write to")
// Optional switch, defaults to false, stored in `raw`.
37 (
"raw", po::bool_switch(&raw)->default_value(
false),
"send and receive raw data instead of event buffers")
// Optional switch, defaults to false, stored in `repeat`.
38 (
"repeat", po::bool_switch(&repeat)->default_value(
false),
"repeat after the file is finished");
// Positional arguments: the first one is the file name, the second one the port.
41 po::positional_options_description p;
42 p.add(
"file-name", 1).add(
"port", 1);
// Parse the command line (named and positional options) into `vm`.
46 po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
47 }
catch (std::exception& e) {
// Print the option description when --help/-h was given.
51 if (vm.count(
"help")) {
52 std::cout << desc << std::endl;
58 }
catch (std::exception& e) {
// Transfer buffer of MAXEVTSIZE ints, allocated with a raw new[].
// NOTE(review): no matching delete[] is visible in this excerpt.
62 int* buffer =
new int[MAXEVTSIZE];
// Loop exit flag, checked in the while condition below.
71 bool terminate =
false;
// Open the input file in read mode; a failure is reported via B2ERROR.
73 if (not file.open(file_name, raw,
"r")) {
74 B2ERROR(
"Can not open file");
// Reference time point for the rate printout further down.
79 auto start = std::chrono::steady_clock::now();
// Main loop: runs while mainLoop reports running and `terminate` is false.
80 while (mainLoop.isRunning() and not terminate) {
// (Re-)establish the socket whenever it is not initialized.
82 if (not socket.initialized()) {
// NOTE(review): no "connect-to" option is registered in the option list
// visible above - confirm whether this branch can ever be taken.
83 if (vm.count(
"connect-to")) {
84 if (not socket.connect(sourceHost, port, mainLoop)) {
// NOTE(review): the message below contains a typo ("reconnnect"); it is
// runtime text and therefore left unchanged here.
85 B2ERROR(
"Could not reconnnect!");
// Alternative path: accept a connection on `port`.
89 if (not socket.accept(port)) {
90 B2ERROR(
"Could not reconnect!");
// Connection established: log it and restart the rate timer.
94 B2RESULT(
"Connected.");
95 start = std::chrono::steady_clock::now();
// Read the next record from the file into `buffer`, either as a word buffer
// or as a char buffer (presumably selected by `raw` - the condition is not
// visible here).
100 size = file.get_wordbuf(buffer, MAXEVTSIZE);
102 size = file.get(
reinterpret_cast<char*
>(buffer), MAXEVTSIZE);
// Re-open the file from the start (presumably the `repeat` case at end of
// file - the surrounding condition is not visible here).
107 file.open(file_name, raw,
"r");
110 B2RESULT(
"Reached end of file");
113 }
// Negative size: report an error, but only while the main loop is still running.
// NOTE(review): the message speaks of "receiving" although the data comes
// from file.get*/get_wordbuf - confirm the wording.
else if (size < 0) {
114 if (mainLoop.isRunning()) {
115 B2ERROR(
"Error in receiving the event! Aborting.");
// Sanity check: past this point the size must be strictly positive.
120 B2ASSERT(
"Size is negative! This should be handled above. Not good!", size > 0);
// Send the record: either the word buffer directly ...
123 returnValue = socket.put_wordbuf(buffer, size);
// ... or wrapped in an EvtMessage, sending the message's buffer and size.
125 EvtMessage message(
reinterpret_cast<char*
>(buffer));
126 returnValue = socket.put(message.buffer(), message.size());
// A message of type MSG_TERMINATE is logged (presumably it also sets
// `terminate` - that line is not visible here).
128 if (message.type() == MSG_TERMINATE) {
129 B2RESULT(
"Having received terminate message");
// returnValue == 0: log the failure and deinitialize the socket, so the
// `not socket.initialized()` check at the top of the loop can reconnect.
133 if (returnValue == 0) {
134 B2ERROR(
"Error in sending the event! Reconnecting.");
135 socket.deinitialize();
137 }
// Negative returnValue: report an error, but only while the main loop is still running.
else if (returnValue < 0) {
138 if (mainLoop.isRunning()) {
139 B2ERROR(
"Error in sending the event! Aborting.");
// Sanity check: past this point the written size must be strictly positive.
144 B2ASSERT(
"Written size is negative! This should be handled above. Not good!", returnValue > 0);
// Every 5000 events: print the event count and the rate, computed as 5000
// divided by the seconds elapsed since `start`, then restart the timer.
148 if (nevt % 5000 == 0) {
149 auto current = std::chrono::steady_clock::now();
150 double elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(current - start).count();
151 B2RESULT(
"b2hlt_file2socket event number: " << nevt <<
" with a rate of " << 5000 / elapsed <<
" Hz");
152 start = std::chrono::steady_clock::now();
// Final log message of the program.
156 B2RESULT(
"Program terminated.");
Class to manage streamed object.
Abstract base class for different kinds of events.
int main(int argc, char **argv)
Run all tests.