Belle II Software development
ZMQApp.details.h
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#pragma once
9
10#include <daq/hbasf2/apps/ZMQApp.h>
11#include <fstream>
12#include <nlohmann/json.hpp>
13
14namespace Belle2 {
19
20 template <class AInputConnection, class AOutputConnection>
21 void ZMQStandardApp<AInputConnection, AOutputConnection>::initFromConsole(const std::string& description, int argc, char* argv[])
22 {
23
24 po::options_description desc(description);
25 std::string connection_file;
26 int debugLevel(0);
27 desc.add_options()
28 ("connection-file", boost::program_options::value<std::string>(&connection_file),
29 "if given print the connection information for input/output and monitoring socket to the given filename "
30 "in json format")
31 ("debug", boost::program_options::value<int>(&debugLevel), "Enable debug logging");
32 addOptions(desc);
33
34 po::positional_options_description p;
35
36 po::variables_map vm;
37 try {
38 po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
39 } catch (std::exception& e) {
40 B2FATAL(e.what());
41 }
42
43 if (vm.count("help")) {
44 std::cout << desc << std::endl;
45 exit(1);
46 }
47
48 try {
49 po::notify(vm);
50 } catch (std::exception& e) {
51 B2FATAL(e.what());
52 }
53
54 if (debugLevel > 0) {
55 auto& logging = LogSystem::Instance();
56 logging.getLogConfig()->setLogLevel(LogConfig::c_Debug);
57 logging.getLogConfig()->setDebugLevel(debugLevel);
58 B2DEBUG(1, "Enabled debug logging");
59 }
60
62
63 if (not connection_file.empty()) {
64 B2DEBUG(1, "Write connection file" << LogVar("connection_file", connection_file));
65 nlohmann::json json;
66 try {
67 json["input"] = m_input->getEndPoint();
68 } catch (zmq::error_t& e) {
69 B2WARNING(e.what());
70 }
71 try {
72 json["output"] = m_output->getEndPoint();
73 } catch (zmq::error_t& e) {
74 B2WARNING(e.what());
75 }
76 std::ofstream connections(connection_file, std::ofstream::trunc);
77 if (!connections) {
78 B2FATAL("Cannot write connection file" << LogVar("connection_file", connection_file));
79 }
80 connections << std::setw(4) << json << std::endl;
81 }
82 }
83
84 template <class AInputConnection, class AOutputConnection>
86 {
87 resetTimer();
89 while (not terminated()) {
90 if (not m_output->isReady()) {
91 // if the output is not ready, we can not sent anything. So lets just poll on output
92 // and monitoring until it becomes ready
93 m_monitor->log("output_state", "not_ready");
94 pollEvent(false);
95 } else {
96 // if it is ready, we can also include the input socket as long as output stays ready
97 m_monitor->log("output_state", "ready");
98 pollEvent(true);
99 }
100 }
101 }
102
103 template <class AInputConnection, class AOutputConnection>
105 {
106 auto reactToOutput = [this]() {
107 // Get all messages from output socket
108 while (ZMQConnection::hasMessage(m_output.get()) and not terminated()) {
109 handleOutput();
111 break;
112 }
114 };
116 auto reactToMonitor = [this]() {
117 // Get all messages from monitoring socket
118 while (ZMQConnection::hasMessage(m_monitor.get()) and not terminated()) {
120 }
121 };
122
123 auto reactToInput = [this]() {
124 // Get all messages from input as long output is ready
125 while (ZMQConnection::hasMessage(m_input.get()) and m_output->isReady() and not terminated()) {
126 handleInput();
127 if (m_monitorHasPriority) {
128 break;
129 }
130 }
131 };
132
133 m_monitor->logTime("waiting_since");
134
135 if (pollOnInput) {
136 ZMQConnection::poll({{m_output.get(), reactToOutput}, {m_monitor.get(), reactToMonitor}, {m_input.get(), reactToInput}},
137 m_remainingTime);
138 } else {
139 ZMQConnection::poll({{m_output.get(), reactToOutput}, {m_monitor.get(), reactToMonitor}}, m_remainingTime);
140 }
141
142 if (checkTimer() and not terminated()) {
143 B2ASSERT("There is no timeout set, but we still call the timeout() function? A bug!", m_timeout != 0);
144 m_monitor->increment("timeouts");
145 handleTimeout();
146 resetTimer();
147 } else {
148 updateTimer();
149 }
150 }
151
152 template <class AInputConnection, class AOutputConnection>
158
159 template <class AInputConnection, class AOutputConnection>
161 {
162 desc.add_options()("help,h", "Print this help message")("monitor", po::value<std::string>(&m_monitoringAddress)->required(),
163 "where to listen for monitoring");
164 }
165
166 template <class AInputConnection, class AOutputConnection>
170
171 template <class AInputConnection, class AOutputConnection>
176
177 template <class AInputConnection, class AOutputConnection>
182
183 template <class AInputConnection, class AOutputConnection>
187
188 template <class AInputConnection, class AOutputConnection>
190 {
191 return not m_mainLoop.isRunning() or m_terminate;
192 }
193
194 template <class AInputConnection, class AOutputConnection>
200
201 template <class AInputConnection, class AOutputConnection>
203 {
204 // if there is no timeout, we should never update the remaining time
205 if (m_timeout == 0) {
206 m_remainingTime = -1;
207 return;
208 }
209
210 auto currentTime = std::chrono::system_clock::now();
211 auto timeDifference = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - m_start);
212 m_remainingTime = m_timeout * 1000 - timeDifference.count();
213 if (m_remainingTime < 0) {
214 m_remainingTime = 0;
215 }
216 }
217
218 template <class AInputConnection, class AOutputConnection>
220 {
221 // if there is no timeout, we should never set the remaining time
222 if (m_timeout == 0) {
223 m_remainingTime = -1;
224 return;
225 }
226
227 m_start = std::chrono::system_clock::now();
228 m_remainingTime = m_timeout * 1000;
229 }
230
231 template <class AInputConnection, class AOutputConnection>
233 {
234 auto monitoringMessage = m_monitor->handleIncomingData();
235
236 if (monitoringMessage->isMessage(EMessageTypes::c_newRunMessage)) {
237 handleExternalSignal(EMessageTypes::c_newRunMessage);
238 return;
239 } else if (monitoringMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
240 handleExternalSignal(EMessageTypes::c_lastEventMessage);
241 return;
242 } else if (monitoringMessage->isMessage(EMessageTypes::c_terminateMessage)) {
243 handleExternalSignal(EMessageTypes::c_terminateMessage);
244 return;
245 }
246
247 std::stringstream buffer;
248 fillMonitoringJSON(buffer);
249
250 auto message = ZMQMessageFactory::createMessage(monitoringMessage->getIdentity(),
251 EMessageTypes::c_confirmMessage, buffer.str());
252 m_monitor->handleEvent(std::move(message));
253 }
254
255 template <class AInputConnection, class AOutputConnection>
257 {
258 buffer << "{" << std::endl;
259 buffer << "\"monitor\": " << m_monitor->getMonitoringJSON() << "," << std::endl;
260 buffer << "\"input\": " << m_input->getMonitoringJSON() << "," << std::endl;
261 buffer << "\"output\": " << m_output->getMonitoringJSON() << std::endl;
262 buffer << "}" << std::endl;
263 }
264
266} // namespace Belle2
@ c_Debug
Debug: for code development.
Definition LogConfig.h:26
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition LogSystem.cc:28
static bool hasMessage(const ZMQConnection *connection)
Check if the given connection as an incoming message (right now, no waiting).
static bool poll(const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
Poll on the given connections and call the attached function if a messages comes in.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
A helper class for creating ZMQ sockets keeping track of the ZMQ context and terminating it if needed...
Definition ZMQParent.h:39
Connection type to be used for answering simple requests, e.g.
bool m_monitorHasPriority
Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector...
Definition ZMQApp.h:77
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
Definition ZMQApp.h:65
std::unique_ptr< ZMQNullConnection > m_output
Definition ZMQApp.h:69
HLTMainLoop m_mainLoop
Internal signal handler.
Definition ZMQApp.h:102
std::unique_ptr< ZMQSimpleConnection > m_monitor
Definition ZMQApp.h:71
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
Definition ZMQApp.h:73
std::unique_ptr< ZMQConfirmedInput > m_input
Definition ZMQApp.h:67
std::string m_monitoringAddress
Storage for the monitoring address for the cmd arguments.
Definition ZMQApp.h:104
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
Definition ZMQApp.h:75
int m_remainingTime
Counter for the remaining time until a timeout happens.
Definition ZMQApp.h:106
std::chrono::system_clock::time_point m_start
Start time for the timeout.
Definition ZMQApp.h:108
Class to store variables with their name which were sent to the logging service.
void initFromConsole(const std::string &description, int argc, char *argv[])
Should be called before the main() function to initialize the connections using the paremeters given ...
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
void updateTimer()
Helper function to update the remaining time.
virtual void fillMonitoringJSON(std::stringstream &buffer) const
Using the connections, fill up a buffer with the content to be monitored.
virtual void handleExternalSignal(EMessageTypes)
Will get called for every signal message on the monitoring connection. Can be overridden in a derived...
virtual void handleInput()
Will get called for every message on the input connection. Can be overridden in a derived class....
void main()
Start the main loop polling on the output and monitoring connections and eventually also on the input...
virtual void handleOutput()
Will get called for every message on the output connection. Can be overridden in a derived class....
bool checkTimer()
Helper function to check, if the timeout should happen.
virtual void handleTimeout()
Will get called on a timeout. Can be overridden in a derived class. Empty by default.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.