Belle II Software  release-08-01-10
ZMQApp.details.h
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #pragma once
9 
10 #include <daq/hbasf2/apps/ZMQApp.h>
11 #include <fstream>
12 #include <nlohmann/json.hpp>
13 
14 namespace Belle2 {
20  template <class AInputConnection, class AOutputConnection>
21  void ZMQStandardApp<AInputConnection, AOutputConnection>::initFromConsole(const std::string& description, int argc, char* argv[])
22  {
23 
24  po::options_description desc(description);
25  std::string connection_file;
26  int debugLevel(0);
27  desc.add_options()
28  ("connection-file", boost::program_options::value<std::string>(&connection_file),
29  "if given print the connection information for input/output and monitoring socket to the given filename "
30  "in json format")
31  ("debug", boost::program_options::value<int>(&debugLevel), "Enable debug logging");
32  addOptions(desc);
33 
34  po::positional_options_description p;
35 
36  po::variables_map vm;
37  try {
38  po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
39  } catch (std::exception& e) {
40  B2FATAL(e.what());
41  }
42 
43  if (vm.count("help")) {
44  std::cout << desc << std::endl;
45  exit(1);
46  }
47 
48  try {
49  po::notify(vm);
50  } catch (std::exception& e) {
51  B2FATAL(e.what());
52  }
53 
54  if (debugLevel > 0) {
55  auto& logging = LogSystem::Instance();
56  logging.getLogConfig()->setLogLevel(LogConfig::c_Debug);
57  logging.getLogConfig()->setDebugLevel(debugLevel);
58  B2DEBUG(1, "Enabled debug logging");
59  }
60 
61  initialize();
62 
63  if (not connection_file.empty()) {
64  B2DEBUG(1, "Write connection file" << LogVar("connection_file", connection_file));
65  nlohmann::json json;
66  try {
67  json["input"] = m_input->getEndPoint();
68  } catch (zmq::error_t& e) {
69  B2WARNING(e.what());
70  }
71  try {
72  json["output"] = m_output->getEndPoint();
73  } catch (zmq::error_t& e) {
74  B2WARNING(e.what());
75  }
76  std::ofstream connections(connection_file, std::ofstream::trunc);
77  if (!connections) {
78  B2FATAL("Cannot write connection file" << LogVar("connection_file", connection_file));
79  }
80  connections << std::setw(4) << json << std::endl;
81  }
82  }
83 
// NOTE(review): the declarator line (listing line 85) is missing from this extract. The body
// matches the "void main()" entry of the member list at the end of the listing - confirm
// against ZMQApp.h.
//
// Main loop: resets the timer once and then calls pollEvent() repeatedly until terminated()
// returns true. The input connection is only included in the poll while the output reports
// that it is ready; the current state is logged to the monitor as "output_state".
84  template <class AInputConnection, class AOutputConnection>
86  {
87  resetTimer();
88 
89  while (not terminated()) {
90  if (not m_output->isReady()) {
91  // if the output is not ready, we cannot send anything, so just poll on the output
92  // and the monitoring connection until the output becomes ready
93  m_monitor->log("output_state", "not_ready");
94  pollEvent(false);
95  } else {
96  // if it is ready, we can also include the input socket as long as the output stays ready
97  m_monitor->log("output_state", "ready");
98  pollEvent(true);
99  }
100  }
101  }
102 
// NOTE(review): the declarator line (listing line 104) is missing from this extract. The body
// uses a "pollOnInput" flag and matches the "void pollEvent(bool pollOnInput)" entry of the
// member list at the end of the listing - confirm against ZMQApp.h.
//
// Performs one poll on the output and monitoring connections (plus the input connection when
// pollOnInput is true) with m_remainingTime as the timeout argument, dispatching to
// handleOutput() / handleMonitoring() / handleInput(). Afterwards the timer is evaluated:
// if it has expired, handleTimeout() is called and the timer is restarted.
103  template <class AInputConnection, class AOutputConnection>
105  {
106  auto reactToOutput = [this]() {
107  // Drain the output socket; stop after a single message if m_monitorHasPriority is set
108  while (ZMQConnection::hasMessage(m_output.get()) and not terminated()) {
109  handleOutput();
110  if (m_monitorHasPriority) {
111  break;
112  }
113  }
114  };
115 
116  auto reactToMonitor = [this]() {
117  // Drain the monitoring socket
118  while (ZMQConnection::hasMessage(m_monitor.get()) and not terminated()) {
119  handleMonitoring();
120  }
121  };
122 
123  auto reactToInput = [this]() {
124  // Drain the input socket as long as the output is ready; stop after a single message if m_monitorHasPriority is set
125  while (ZMQConnection::hasMessage(m_input.get()) and m_output->isReady() and not terminated()) {
126  handleInput();
127  if (m_monitorHasPriority) {
128  break;
129  }
130  }
131  };
132 
    // Record on the monitor when this poll started waiting
133  m_monitor->logTime("waiting_since");
134 
135  if (pollOnInput) {
136  ZMQConnection::poll({{m_output.get(), reactToOutput}, {m_monitor.get(), reactToMonitor}, {m_input.get(), reactToInput}},
137  m_remainingTime);
138  } else {
139  ZMQConnection::poll({{m_output.get(), reactToOutput}, {m_monitor.get(), reactToMonitor}}, m_remainingTime);
140  }
141 
    // checkTimer() refreshes m_remainingTime and is true only when it reached 0. With
    // m_timeout == 0 the remaining time is kept at -1, so this branch cannot be taken
    // (which is what the assertion below guards).
142  if (checkTimer() and not terminated()) {
143  B2ASSERT("There is no timeout set, but we still call the timeout() function? A bug!", m_timeout != 0);
144  m_monitor->increment("timeouts");
145  handleTimeout();
146  resetTimer();
147  } else {
148  updateTimer();
149  }
150  }
151 
// NOTE(review): the declarator line (listing line 153) is missing from this extract. The body
// is presumably "virtual void initialize()" from the member list - confirm against ZMQApp.h.
//
// Creates the ZMQParent and the monitoring connection (a ZMQSimpleConnection on
// m_monitoringAddress). m_input and m_output are not created here.
152  template <class AInputConnection, class AOutputConnection>
154  {
155  m_parent.reset(new ZMQParent);
156  m_monitor.reset(new ZMQSimpleConnection(m_monitoringAddress, m_parent));
157  }
158 
// NOTE(review): the declarator line (listing line 160) is missing from this extract. The body
// uses "desc" and matches "virtual void addOptions(po::options_description &desc)" from the
// member list - confirm against ZMQApp.h.
//
// Registers the "help"/"-h" flag and the required "monitor" option, whose value is stored
// in m_monitoringAddress.
159  template <class AInputConnection, class AOutputConnection>
161  {
162  desc.add_options()("help,h", "Print this help message")("monitor", po::value<std::string>(&m_monitoringAddress)->required(),
163  "where to listen for monitoring");
164  }
165 
// NOTE(review): the declarator line (listing line 167) is missing from this extract, and the
// empty body does not identify the function. Presumably one of the no-op default hooks from
// the member list (handleExternalSignal(EMessageTypes) or handleTimeout()) - confirm against
// ZMQApp.h.
//
// Intentionally empty: does nothing.
166  template <class AInputConnection, class AOutputConnection>
168  {
169  }
170 
// NOTE(review): the declarator line (listing line 172) is missing from this extract. Because
// the body works on m_input this is presumably "virtual void handleInput()" from the member
// list - confirm against ZMQApp.h.
//
// Default reaction to a message on the input connection: delegate to the connection itself.
171  template <class AInputConnection, class AOutputConnection>
173  {
174  m_input->handleIncomingData();
175  }
176 
// NOTE(review): the declarator line (listing line 178) is missing from this extract. Because
// the body works on m_output this is presumably "virtual void handleOutput()" from the member
// list - confirm against ZMQApp.h.
//
// Default reaction to a message on the output connection: delegate to the connection itself.
177  template <class AInputConnection, class AOutputConnection>
179  {
180  m_output->handleIncomingData();
181  }
182 
// NOTE(review): the declarator line (listing line 184) is missing from this extract, and the
// empty body does not identify the function. Presumably one of the no-op default hooks from
// the member list (handleTimeout() or handleExternalSignal(EMessageTypes)) - confirm against
// ZMQApp.h.
//
// Intentionally empty: does nothing.
183  template <class AInputConnection, class AOutputConnection>
185  {
186  }
187 
// NOTE(review): the declarator line (listing line 189) is missing from this extract. The body
// returns a bool and matches "bool terminated() const" from the member list - confirm
// against ZMQApp.h.
//
// Returns true when the loops in this file should stop: either m_mainLoop reports that it is
// no longer running or the m_terminate flag has been set.
188  template <class AInputConnection, class AOutputConnection>
190  {
191  return not m_mainLoop.isRunning() or m_terminate;
192  }
193 
// NOTE(review): the declarator line (listing line 195) is missing from this extract. The body
// matches "bool checkTimer()" from the member list - confirm against ZMQApp.h.
//
// Refreshes m_remainingTime via updateTimer() and returns true exactly when it is 0. When no
// timeout is configured updateTimer() keeps the value at -1, so the result is false.
194  template <class AInputConnection, class AOutputConnection>
196  {
197  updateTimer();
198  return m_remainingTime == 0;
199  }
200 
// NOTE(review): the declarator line (listing line 202) is missing from this extract. The body
// recomputes m_remainingTime from m_start and matches "void updateTimer()" from the member
// list - confirm against ZMQApp.h.
//
// Recomputes m_remainingTime as the configured timeout minus the time elapsed since m_start,
// clamped at 0. m_timeout is multiplied by 1000 and compared with elapsed milliseconds, i.e.
// it is treated as a number of seconds while m_remainingTime is in milliseconds.
201  template <class AInputConnection, class AOutputConnection>
203  {
204  // if there is no timeout, we should never update the remaining time; -1 marks "no timeout"
205  if (m_timeout == 0) {
206  m_remainingTime = -1;
207  return;
208  }
209 
    // NOTE(review): system_clock is wall-clock time and may be adjusted while the application
    // runs, which would shift the timeout. steady_clock would avoid this, but the type of
    // m_start is declared outside this file - confirm before changing.
210  auto currentTime = std::chrono::system_clock::now();
211  auto timeDifference = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - m_start);
212  m_remainingTime = m_timeout * 1000 - timeDifference.count();
    // never report a negative remaining time; 0 is the value checked for "timer expired"
213  if (m_remainingTime < 0) {
214  m_remainingTime = 0;
215  }
216  }
217 
// NOTE(review): the declarator line (listing line 219) is missing from this extract. The body
// assigns m_start and matches "void resetTimer()" from the member list - confirm against
// ZMQApp.h.
//
// Restarts the timeout: stores the current time in m_start and sets m_remainingTime to the
// full timeout (m_timeout * 1000).
218  template <class AInputConnection, class AOutputConnection>
220  {
221  // if there is no timeout, we should never set the remaining time; -1 marks "no timeout"
222  if (m_timeout == 0) {
223  m_remainingTime = -1;
224  return;
225  }
226 
227  m_start = std::chrono::system_clock::now();
228  m_remainingTime = m_timeout * 1000;
229  }
230 
// NOTE(review): the declarator line (listing line 232) is missing from this extract. The body
// reads from m_monitor and matches "void handleMonitoring()" from the member list - confirm
// against ZMQApp.h.
//
// Handles one incoming message on the monitoring connection. New-run, last-event and
// terminate messages are forwarded to handleExternalSignal() and no reply is sent from here.
// Every other message is answered with a confirm message, addressed to the sender's
// identity, whose payload is the text produced by fillMonitoringJSON().
231  template <class AInputConnection, class AOutputConnection>
233  {
234  auto monitoringMessage = m_monitor->handleIncomingData();
235 
    // Signal messages: dispatch and return early without sending a monitoring reply
236  if (monitoringMessage->isMessage(EMessageTypes::c_newRunMessage)) {
237  handleExternalSignal(EMessageTypes::c_newRunMessage);
238  return;
239  } else if (monitoringMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
240  handleExternalSignal(EMessageTypes::c_lastEventMessage);
241  return;
242  } else if (monitoringMessage->isMessage(EMessageTypes::c_terminateMessage)) {
243  handleExternalSignal(EMessageTypes::c_terminateMessage);
244  return;
245  }
246 
    // Anything else is treated as a monitoring request: collect the JSON text and send it back
247  std::stringstream buffer;
248  fillMonitoringJSON(buffer);
249 
250  auto message = ZMQMessageFactory::createMessage(monitoringMessage->getIdentity(),
251  EMessageTypes::c_confirmMessage, buffer.str());
252  m_monitor->handleEvent(std::move(message));
253  }
254 
// NOTE(review): the declarator line (listing line 256) is missing from this extract. The body
// writes to "buffer" and matches "virtual void fillMonitoringJSON(std::stringstream &buffer)
// const" from the member list - confirm against ZMQApp.h.
//
// Writes one JSON object to buffer with the keys "monitor", "input" and "output"; each value
// is whatever getMonitoringJSON() of the corresponding connection returns, inserted verbatim
// (it is not re-encoded or validated here).
255  template <class AInputConnection, class AOutputConnection>
257  {
258  buffer << "{" << std::endl;
259  buffer << "\"monitor\": " << m_monitor->getMonitoringJSON() << "," << std::endl;
260  buffer << "\"input\": " << m_input->getMonitoringJSON() << "," << std::endl;
261  buffer << "\"output\": " << m_output->getMonitoringJSON() << std::endl;
262  buffer << "}" << std::endl;
263  }
264 
266 } // namespace Belle2
@ c_Debug
Debug: for code development.
Definition: LogConfig.h:26
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:31
static bool hasMessage(const ZMQConnection *connection)
Check if the given connection has an incoming message (right now, no waiting).
static bool poll(const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
Poll on the given connections and call the attached function if a message comes in.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
A helper class for creating ZMQ sockets keeping track of the ZMQ context and terminating it if needed...
Definition: ZMQParent.h:39
Connection type to be used for answering simple requests, e.g.
Class to store variables with their name which were sent to the logging service.
void pollEvent(bool pollOnInput)
Poll until a single event is retrieved.
void initFromConsole(const std::string &description, int argc, char *argv[])
Should be called before the main() function to initialize the connections using the parameters given ...
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
void handleMonitoring()
Handle an incoming message on the monitoring socket by either calling handleExternalSignal() or by pa...
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
void updateTimer()
Helper function to update the remaining time.
virtual void fillMonitoringJSON(std::stringstream &buffer) const
Using the connections, fill up a buffer with the content to be monitored.
virtual void handleExternalSignal(EMessageTypes)
Will get called for every signal message on the monitoring connection. Can be overridden in a derived...
virtual void handleInput()
Will get called for every message on the input connection. Can be overridden in a derived class....
bool terminated() const
Check if the main loop will be exited on next occasion. Can be set via the "m_terminate" flag.
void main()
Start the main loop polling on the output and monitoring connections and eventually also on the input...
void resetTimer()
Helper function to reset the start time and the remaining time.
virtual void handleOutput()
Will get called for every message on the output connection. Can be overridden in a derived class....
bool checkTimer()
Helper function to check, if the timeout should happen.
virtual void handleTimeout()
Will get called on a timeout. Can be overridden in a derived class. Empty by default.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.