Belle II Software  release-05-01-25
ZMQApp.details.h
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #pragma once
11 
12 #include <daq/hbasf2/apps/ZMQApp.h>
13 
14 namespace Belle2 {
20  template <class AInputConnection, class AOutputConnection>
21  void ZMQStandardApp<AInputConnection, AOutputConnection>::initFromConsole(const std::string& description, int argc, char* argv[])
22  {
23  po::options_description desc(description);
24  addOptions(desc);
25 
26  po::positional_options_description p;
27 
28  po::variables_map vm;
29  try {
30  po::store(
31  po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
32  } catch (std::exception& e) {
33  B2FATAL(e.what());
34  }
35 
36  if (vm.count("help")) {
37  std::cout << desc << std::endl;
38  exit(1);
39  }
40 
41  try {
42  po::notify(vm);
43  } catch (std::exception& e) {
44  B2FATAL(e.what());
45  }
46 
47  initialize();
48  }
49 
50  template <class AInputConnection, class AOutputConnection>
52  {
53  resetTimer();
54 
55  while (not terminated()) {
56  if (not m_output->isReady()) {
57  // if the output is not ready, we can not sent anything. So lets just poll on output
58  // and monitoring until it becomes ready
59  m_monitor->log("output_state", "not_ready");
60  pollEvent(false);
61  } else {
62  // if it is ready, we can also include the input socket as long as output stays ready
63  m_monitor->log("output_state", "ready");
64  pollEvent(true);
65  }
66  }
67  }
68 
69  template <class AInputConnection, class AOutputConnection>
71  {
72  auto reactToOutput = [this]() {
73  // Get all messages from output socket
74  while (ZMQConnection::hasMessage(m_output.get()) and not terminated()) {
75  handleOutput();
76  if (m_monitorHasPriority) {
77  break;
78  }
79  }
80  };
81 
82  auto reactToMonitor = [this]() {
83  // Get all messages from monitoring socket
84  while (ZMQConnection::hasMessage(m_monitor.get()) and not terminated()) {
85  handleMonitoring();
86  }
87  };
88 
89  auto reactToInput = [this]() {
90  // Get all messages from input as long output is ready
91  while (ZMQConnection::hasMessage(m_input.get()) and m_output->isReady() and not terminated()) {
92  handleInput();
93  if (m_monitorHasPriority) {
94  break;
95  }
96  }
97  };
98 
99  m_monitor->logTime("waiting_since");
100 
101  if (pollOnInput) {
102  ZMQConnection::poll({{m_output.get(), reactToOutput}, {m_monitor.get(), reactToMonitor}, {m_input.get(), reactToInput}},
103  m_remainingTime);
104  } else {
105  ZMQConnection::poll({{m_output.get(), reactToOutput}, {m_monitor.get(), reactToMonitor}}, m_remainingTime);
106  }
107 
108  if (checkTimer() and not terminated()) {
109  B2ASSERT("There is no timeout set, but we still call the timeout() function? A bug!", m_timeout != 0);
110  m_monitor->increment("timeouts");
111  handleTimeout();
112  resetTimer();
113  } else {
114  updateTimer();
115  }
116  }
117 
118  template <class AInputConnection, class AOutputConnection>
120  {
121  m_parent.reset(new ZMQParent);
122  m_monitor.reset(new ZMQSimpleConnection(m_monitoringAddress, m_parent));
123  }
124 
125  template <class AInputConnection, class AOutputConnection>
126  void ZMQStandardApp<AInputConnection, AOutputConnection>::addOptions(po::options_description& desc)
127  {
128  desc.add_options()("help,h", "Print this help message")("monitor", po::value<std::string>(&m_monitoringAddress)->required(),
129  "where to listen for monitoring");
130  }
131 
132  template <class AInputConnection, class AOutputConnection>
134  {
135  }
136 
137  template <class AInputConnection, class AOutputConnection>
139  {
140  m_input->handleIncomingData();
141  }
142 
143  template <class AInputConnection, class AOutputConnection>
145  {
146  m_output->handleIncomingData();
147  }
148 
149  template <class AInputConnection, class AOutputConnection>
151  {
152  }
153 
154  template <class AInputConnection, class AOutputConnection>
156  {
157  return not m_mainLoop.isRunning() or m_terminate;
158  }
159 
160  template <class AInputConnection, class AOutputConnection>
162  {
163  updateTimer();
164  return m_remainingTime == 0;
165  }
166 
167  template <class AInputConnection, class AOutputConnection>
169  {
170  // if there is no timeout, we should never update the remaining time
171  if (m_timeout == 0) {
172  m_remainingTime = -1;
173  return;
174  }
175 
176  auto currentTime = std::chrono::system_clock::now();
177  auto timeDifference = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - m_start);
178  m_remainingTime = m_timeout * 1000 - timeDifference.count();
179  if (m_remainingTime < 0) {
180  m_remainingTime = 0;
181  }
182  }
183 
184  template <class AInputConnection, class AOutputConnection>
186  {
187  // if there is no timeout, we should never set the remaining time
188  if (m_timeout == 0) {
189  m_remainingTime = -1;
190  return;
191  }
192 
193  m_start = std::chrono::system_clock::now();
194  m_remainingTime = m_timeout * 1000;
195  }
196 
197  template <class AInputConnection, class AOutputConnection>
199  {
200  auto monitoringMessage = m_monitor->handleIncomingData();
201 
202  if (monitoringMessage->isMessage(EMessageTypes::c_newRunMessage)) {
203  handleExternalSignal(EMessageTypes::c_newRunMessage);
204  return;
205  } else if (monitoringMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
206  handleExternalSignal(EMessageTypes::c_lastEventMessage);
207  return;
208  } else if (monitoringMessage->isMessage(EMessageTypes::c_terminateMessage)) {
209  handleExternalSignal(EMessageTypes::c_terminateMessage);
210  return;
211  }
212 
213  std::stringstream buffer;
214  fillMonitoringJSON(buffer);
215 
216  auto message = ZMQMessageFactory::createMessage(monitoringMessage->getIdentity(),
217  EMessageTypes::c_confirmMessage, buffer.str());
218  m_monitor->handleEvent(std::move(message));
219  }
220 
221  template <class AInputConnection, class AOutputConnection>
223  {
224  buffer << "{" << std::endl;
225  buffer << "\"monitor\": " << m_monitor->getMonitoringJSON() << "," << std::endl;
226  buffer << "\"input\": " << m_input->getMonitoringJSON() << "," << std::endl;
227  buffer << "\"output\": " << m_output->getMonitoringJSON() << std::endl;
228  buffer << "}" << std::endl;
229  }
230 
232 } // namespace Belle2
Belle2::ZMQStandardApp::pollEvent
void pollEvent(bool pollOnInput)
Poll until a single event is retrieved.
Definition: ZMQApp.details.h:78
Belle2::EMessageTypes
EMessageTypes
Type the messages can have.
Definition: ZMQDefinitions.h:26
prepareAsicCrosstalkSimDB.e
e
aux.
Definition: prepareAsicCrosstalkSimDB.py:53
Belle2::ZMQStandardApp::handleTimeout
virtual void handleTimeout()
Will get called on a timeout. Can be overridden in a derived class. Empty by default.
Definition: ZMQApp.details.h:158
Belle2::ZMQStandardApp::checkTimer
bool checkTimer()
Helper function to check, if the timeout should happen.
Definition: ZMQApp.details.h:169
Belle2::ZMQStandardApp::main
void main()
Start the main loop polling on the output and monitoring connections and eventually also on the input...
Definition: ZMQApp.details.h:59
Belle2::ZMQStandardApp::handleInput
virtual void handleInput()
Will get called for every message on the input connection. Can be overridden in a derived class....
Definition: ZMQApp.details.h:146
Belle2::ZMQStandardApp::resetTimer
void resetTimer()
Helper function to reset the start time and the remaining time.
Definition: ZMQApp.details.h:193
Belle2::ZMQStandardApp::handleExternalSignal
virtual void handleExternalSignal(EMessageTypes)
Will get called for every signal message on the monitoring connection. Can be overridden in a derived...
Definition: ZMQApp.details.h:141
Belle2::ZMQStandardApp::addOptions
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
Definition: ZMQApp.details.h:134
Belle2::ZMQStandardApp::updateTimer
void updateTimer()
Helper function to update the remaining time.
Definition: ZMQApp.details.h:176
Belle2::ZMQStandardApp::terminated
bool terminated() const
Check if the main loop will be exited on next occasion. Can be set via the "m_terminate" flag.
Definition: ZMQApp.details.h:163
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQMessageFactory::createMessage
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Definition: ZMQMessageFactory.h:37
Belle2::ZMQConnection::poll
static bool poll(const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
Poll on the given connections and call the attached function if a message comes in.
Definition: ZMQConnection.cc:29
Belle2::ZMQStandardApp::handleMonitoring
void handleMonitoring()
Handle an incoming message on the monitoring socket by either calling handleExternalSignal() or by pa...
Definition: ZMQApp.details.h:206
Belle2::ZMQStandardApp::initFromConsole
void initFromConsole(const std::string &description, int argc, char *argv[])
Should be called before the main() function to initialize the connections using the parameters given ...
Definition: ZMQApp.details.h:29
Belle2::ZMQConnection::hasMessage
static bool hasMessage(const ZMQConnection *connection)
Check if the given connection has an incoming message (right now, no waiting).
Definition: ZMQConnection.cc:22
Belle2::ZMQStandardApp::handleOutput
virtual void handleOutput()
Will get called for every message on the output connection. Can be overridden in a derived class....
Definition: ZMQApp.details.h:152
Belle2::ZMQStandardApp::initialize
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
Definition: ZMQApp.details.h:127
Belle2::ZMQStandardApp::fillMonitoringJSON
virtual void fillMonitoringJSON(std::stringstream &buffer) const
Using the connections, fill up a buffer with the content to be monitored.
Definition: ZMQApp.details.h:230