10#include <daq/hbasf2/apps/ZMQApp.h>
12#include <nlohmann/json.hpp>
20 template <
class AInputConnection,
class AOutputConnection>
24 po::options_description desc(description);
25 std::string connection_file;
28 (
"connection-file", boost::program_options::value<std::string>(&connection_file),
29 "if given print the connection information for input/output and monitoring socket to the given filename "
31 (
"debug", boost::program_options::value<int>(&debugLevel),
"Enable debug logging");
34 po::positional_options_description p;
38 po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
39 }
catch (std::exception& e) {
43 if (vm.count(
"help")) {
44 std::cout << desc << std::endl;
50 }
catch (std::exception& e) {
57 logging.getLogConfig()->setDebugLevel(debugLevel);
58 B2DEBUG(1,
"Enabled debug logging");
63 if (not connection_file.empty()) {
64 B2DEBUG(1,
"Write connection file" <<
LogVar(
"connection_file", connection_file));
67 json[
"input"] =
m_input->getEndPoint();
68 }
catch (zmq::error_t& e) {
72 json[
"output"] = m_output->getEndPoint();
73 }
catch (zmq::error_t& e) {
76 std::ofstream connections(connection_file, std::ofstream::trunc);
78 B2FATAL(
"Cannot write connection file" << LogVar(
"connection_file", connection_file));
80 connections << std::setw(4) << json << std::endl;
84 template <
class AInputConnection,
class AOutputConnection>
93 m_monitor->log(
"output_state",
"not_ready");
103 template <
class AInputConnection,
class AOutputConnection>
106 auto reactToOutput = [
this]() {
116 auto reactToMonitor = [
this]() {
123 auto reactToInput = [
this]() {
127 if (m_monitorHasPriority) {
133 m_monitor->logTime(
"waiting_since");
136 ZMQConnection::poll({{m_output.get(), reactToOutput}, {m_monitor.get(), reactToMonitor}, {m_input.get(), reactToInput}},
139 ZMQConnection::poll({{m_output.get(), reactToOutput}, {m_monitor.get(), reactToMonitor}}, m_remainingTime);
142 if (checkTimer() and not terminated()) {
143 B2ASSERT(
"There is no timeout set, but we still call the timeout() function? A bug!", m_timeout != 0);
144 m_monitor->increment(
"timeouts");
152 template <
class AInputConnection,
class AOutputConnection>
159 template <
class AInputConnection,
class AOutputConnection>
162 desc.add_options()(
"help,h",
"Print this help message")(
"monitor", po::value<std::string>(&
m_monitoringAddress)->required(),
163 "where to listen for monitoring");
166 template <
class AInputConnection,
class AOutputConnection>
171 template <
class AInputConnection,
class AOutputConnection>
177 template <
class AInputConnection,
class AOutputConnection>
183 template <
class AInputConnection,
class AOutputConnection>
188 template <
class AInputConnection,
class AOutputConnection>
194 template <
class AInputConnection,
class AOutputConnection>
201 template <
class AInputConnection,
class AOutputConnection>
210 auto currentTime = std::chrono::system_clock::now();
211 auto timeDifference = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime -
m_start);
218 template <
class AInputConnection,
class AOutputConnection>
227 m_start = std::chrono::system_clock::now();
231 template <
class AInputConnection,
class AOutputConnection>
234 auto monitoringMessage =
m_monitor->handleIncomingData();
236 if (monitoringMessage->isMessage(EMessageTypes::c_newRunMessage)) {
239 }
else if (monitoringMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
242 }
else if (monitoringMessage->isMessage(EMessageTypes::c_terminateMessage)) {
247 std::stringstream buffer;
251 EMessageTypes::c_confirmMessage, buffer.str());
252 m_monitor->handleEvent(std::move(message));
255 template <
class AInputConnection,
class AOutputConnection>
258 buffer <<
"{" << std::endl;
259 buffer <<
"\"monitor\": " <<
m_monitor->getMonitoringJSON() <<
"," << std::endl;
260 buffer <<
"\"input\": " <<
m_input->getMonitoringJSON() <<
"," << std::endl;
261 buffer <<
"\"output\": " <<
m_output->getMonitoringJSON() << std::endl;
262 buffer <<
"}" << std::endl;
@ c_Debug
Debug: for code development.
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
static bool hasMessage(const ZMQConnection *connection)
Check if the given connection as an incoming message (right now, no waiting).
static bool poll(const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
Poll on the given connections and call the attached function if a messages comes in.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
A helper class for creating ZMQ sockets keeping track of the ZMQ context and terminating it if needed...
Connection type to be used for answering simple requests, e.g.
bool m_monitorHasPriority
Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector...
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
std::unique_ptr< ZMQNullConnection > m_output
HLTMainLoop m_mainLoop
Internal signal handler.
std::unique_ptr< ZMQSimpleConnection > m_monitor
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
std::unique_ptr< ZMQConfirmedInput > m_input
std::string m_monitoringAddress
Storage for the monitoring address for the cmd arguments.
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
int m_remainingTime
Counter for the remaining time until a timeout happens.
std::chrono::system_clock::time_point m_start
Start time for the timeout.
Class to store variables with their name which were sent to the logging service.
void pollEvent(bool pollOnInput)
void initFromConsole(const std::string &description, int argc, char *argv[])
Should be called before the main() function to initialize the connections using the paremeters given ...
virtual void initialize()
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
void updateTimer()
Helper function to update the remaining time.
virtual void fillMonitoringJSON(std::stringstream &buffer) const
Using the connections, fill up a buffer with the content to be monitored.
virtual void handleExternalSignal(EMessageTypes)
Will get called for every signal message on the monitoring connection. Can be overridden in a derived...
virtual void handleInput()
Will get called for every message on the input connection. Can be overridden in a derived class....
void main()
Start the main loop polling on the output and monitoring connections and eventually also on the input...
virtual void handleOutput()
Will get called for every message on the output connection. Can be overridden in a derived class....
bool checkTimer()
Helper function to check, if the timeout should happen.
virtual void handleTimeout()
Will get called on a timeout. Can be overridden in a derived class. Empty by default.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.