Belle II Software development
b2hlt_file2socket.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/rfarm/event/hltsocket/HLTMainLoop.h>
9#include <daq/rfarm/event/hltsocket/HLTFile.h>
10#include <daq/rfarm/event/hltsocket/HLTSocket.h>
11#include <framework/logging/Logger.h>
12#include <framework/pcore/EvtMessage.h>
13
14#include <boost/program_options.hpp>
15#include <chrono>
16#include <iostream>
17
18#define MAXEVTSIZE 80000000
19
20using namespace Belle2;
21namespace po = boost::program_options;
22
23int main(int argc, char* argv[])
24{
25 unsigned int port;
26 std::string sourceHost;
27 bool raw = false;
28 bool repeat = false;
29 std::string file_name;
30
31 po::options_description desc("b2hlt_file2socket FILE-NAME PORT SHM-NAME SHM-ID");
32 desc.add_options()
33 ("help,h", "Print this help message")
34 ("port,p", po::value<unsigned int>(&port)->required(), "port number to connect or listen to")
35 ("file-name,f", po::value<std::string>(&file_name)->required(), "file name to write to")
36 ("raw", po::bool_switch(&raw)->default_value(false), "send and receive raw data instead of event buffers")
37 ("repeat", po::bool_switch(&repeat)->default_value(false), "repeat after the file is finished");
38
39
40 po::positional_options_description p;
41 p.add("file-name", 1).add("port", 1);
42
43 po::variables_map vm;
44 try {
45 po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
46 } catch (std::exception& e) {
47 B2FATAL(e.what());
48 }
49
50 if (vm.count("help")) {
51 std::cout << desc << std::endl;
52 return 1;
53 }
54
55 try {
56 po::notify(vm);
57 } catch (std::exception& e) {
58 B2FATAL(e.what());
59 }
60
61 int* buffer = new int[MAXEVTSIZE];
62
63 HLTMainLoop mainLoop;
64
65 HLTSocket socket;
66 HLTFile file;
67 int size;
68 int returnValue;
69 int nevt = 0;
70 bool terminate = false;
71
72 if (not file.open(file_name, raw, "r")) {
73 B2ERROR("Can not open file");
74 terminate = true;
75 }
76
77
78 auto start = std::chrono::steady_clock::now();
79 while (mainLoop.isRunning() and not terminate) {
80 // Connect socket if needed
81 if (not socket.initialized()) {
82 if (vm.count("connect-to")) {
83 if (not socket.connect(sourceHost, port, mainLoop)) {
84 B2ERROR("Could not reconnnect!");
85 break;
86 }
87 } else {
88 if (not socket.accept(port)) {
89 B2ERROR("Could not reconnect!");
90 break;
91 }
92 }
93 B2RESULT("Connected.");
94 start = std::chrono::steady_clock::now();
95 }
96
97 // Read from file
98 if (raw) {
99 size = file.get_wordbuf(buffer, MAXEVTSIZE);
100 } else {
101 size = file.get(reinterpret_cast<char*>(buffer), MAXEVTSIZE);
102 }
103 // Error checking socket
104 if (size == 0) {
105 if (repeat) {
106 file.open(file_name, raw, "r");
107 continue;
108 } else {
109 B2RESULT("Reached end of file");
110 break;
111 }
112 } else if (size < 0) {
113 if (mainLoop.isRunning()) {
114 B2ERROR("Error in receiving the event! Aborting.");
115 }
116 // This is fine if we are terminating anyways
117 break;
118 }
119 B2ASSERT("Size is negative! This should be handled above. Not good!", size > 0);
120
121 if (raw) {
122 returnValue = socket.put_wordbuf(buffer, size);
123 } else {
124 EvtMessage message(reinterpret_cast<char*>(buffer));
125 returnValue = socket.put(message.buffer(), message.size());
126 // Terminate messages make us terminate
127 if (message.type() == MSG_TERMINATE) {
128 B2RESULT("Having received terminate message");
129 terminate = true;
130 }
131 }
132 if (returnValue == 0) {
133 B2ERROR("Error in sending the event! Reconnecting.");
134 socket.deinitialize();
135 continue;
136 } else if (returnValue < 0) {
137 if (mainLoop.isRunning()) {
138 B2ERROR("Error in sending the event! Aborting.");
139 }
140 // This is fine if we are terminating anyways
141 break;
142 }
143 B2ASSERT("Written size is negative! This should be handled above. Not good!", returnValue > 0);
144
145 // Logging
146 nevt++;
147 if (nevt % 5000 == 0) {
148 auto current = std::chrono::steady_clock::now();
149 double elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(current - start).count();
150 B2RESULT("b2hlt_file2socket event number: " << nevt << " with a rate of " << 5000 / elapsed << " Hz");
151 start = std::chrono::steady_clock::now();
152 }
153 }
154
155 B2RESULT("Program terminated.");
156}
Class to manage streamed object.
Definition: EvtMessage.h:59
Abstract base class for different kinds of events.