Belle II Software  release-05-01-25
ZMQCollector.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #include <daq/hbasf2/apps/ZMQCollector.h>
11 #include <framework/logging/Logger.h>
12 
13 using namespace Belle2;
14 
15 void ZMQCollector::addOptions(po::options_description& desc)
16 {
18  desc.add_options()
19  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
20  "where to read the events from")
21  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
22  "where to send the events to")
23  ("lax", boost::program_options::bool_switch(&m_lax)->default_value(m_lax),
24  "dismiss events if no worker is ready (lax) or not")
25  ("stopWaitingTime",
26  boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
27  "how long to wait after no events come anymore");
28 }
29 
31 {
35 }
36 
38 {
39  if (type == EMessageTypes::c_newRunMessage) {
40  m_input->clear();
41  m_output->clear();
42  } else if (type == EMessageTypes::c_lastEventMessage) {
44  resetTimer();
45  }
46 }
47 
49 {
50  auto message = m_input->handleIncomingData();
51 
52  // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
53  resetTimer();
54 
55  if (message) {
56  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
57  m_terminate = true;
58  }
59 
60  m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
61  }
62 }
63 
65 {
66  auto message = m_input->overwriteStopMessage();
67  if (message) {
68  m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
69  }
70 
71  // We do not want to send out another stop message, so reset the counter
72  m_timeout = 0;
73  resetTimer();
74 }
75 
76 void ZMQOutputAdapter::addOptions(po::options_description& desc)
77 {
79  desc.add_options()
80  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
81  "where to read the events from")
82  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
83  "where to send the events to");
84 }
85 
87 {
90  m_output.reset(new ZMQRawOutput(m_outputAddress, true, m_parent));
91 }
92 
94 {
95 }
96 
98 {
99  auto message = m_input->handleIncomingData();
100  if (not message) {
101  return;
102  }
103 
104  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
105  m_terminate = true;
106  }
107 
108  if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
109  B2INFO(message->getDataMessage().size());
110  m_output->handleEvent(std::move(message->getDataMessage()));
111  }
112 }
113 
114 void ZMQProxyCollector::addOptions(po::options_description& desc)
115 {
117  desc.add_options()
118  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
119  "where to read the events from")
120  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
121  "where to send the events to")
122  ("stopWaitingTime",
123  boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
124  "how long to wait after no events come anymore");
125 }
126 
128 {
132 }
133 
135 {
136  if (type == EMessageTypes::c_newRunMessage) {
137  m_input->clear();
138  } else if (type == EMessageTypes::c_lastEventMessage) {
140  resetTimer();
141  }
142 }
143 
145 {
146  auto message = m_input->handleIncomingData();
147 
148  // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
149  resetTimer();
150 
151  if (message) {
152  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
153  m_terminate = true;
154  }
155 
156  m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
157  }
158 }
159 
161 {
162  auto message = m_input->overwriteStopMessage();
163  if (message) {
164  m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
165  }
166 
167  // We do not want to send out another stop message, so reset the counter
168  m_timeout = 0;
169  resetTimer();
170 }
171 
172 void ZMQFinalCollector::addOptions(po::options_description& desc)
173 {
175  desc.add_options()
176  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
177  "where to read the events from")
178  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
179  "where to send the events to")
180  ("addEventSize", boost::program_options::bool_switch(&m_addEventSize)->default_value(false),
181  "add the hlon of the event size at the beginning")
182  ("stopWaitingTime",
183  boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
184  "how long to wait after no events come anymore");
185 }
186 
188 {
192 
193  m_monitorHasPriority = true;
194 }
195 
197 {
198  if (type == EMessageTypes::c_newRunMessage) {
199  m_input->clear();
200  return;
201  } else if (type == EMessageTypes::c_lastEventMessage) {
203  resetTimer();
204  }
205 }
206 
208 {
209  auto message = m_input->handleIncomingData();
210 
211  // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
212  resetTimer();
213 
214  if (not message) {
215  return;
216  }
217 
218  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
219  m_terminate = true;
220  }
221 
222  if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
223  m_output->handleEvent(std::move(message->getDataMessage()));
224  }
225 }
226 
228 {
229  m_input->overwriteStopMessage();
230 
231  // We do not want to send out another stop message, so reset the counter
232  m_timeout = 0;
233  resetTimer();
234 }
235 
236 void ZMQFinalCollectorWithROI::addOptions(po::options_description& desc)
237 {
239  desc.add_options()
240  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
241  "where to read the events from")
242  ("output", boost::program_options::value<std::string>(&m_dataOutputAddress)->required(),
243  "where to send the events to")
244  ("roi", boost::program_options::value<std::string>(&m_roiOutputAddress)->required(),
245  "where to send the rois to")
246  ("addEventSize", boost::program_options::bool_switch(&m_addEventSize)->default_value(false),
247  "add the hlon of the event size at the beginning")
248  ("stopWaitingTime",
249  boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
250  "how long to wait after no events come anymore");
251 }
252 
254 {
258 
259  m_monitorHasPriority = true;
260 }
261 
263 {
264  auto message = m_input->handleIncomingData();
265 
266  // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
267  resetTimer();
268 
269  if (not message) {
270  return;
271  }
272 
273  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
274  m_terminate = true;
275  }
276 
277  m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
278 }
279 
281 {
282  if (type == EMessageTypes::c_newRunMessage) {
283  m_input->clear();
284  return;
285  } else if (type == EMessageTypes::c_lastEventMessage) {
287  resetTimer();
288  }
289 }
290 
291 void ZMQFinalCollectorWithROI::fillMonitoringJSON(std::stringstream& buffer) const
292 {
293  buffer << "{" << std::endl;
294  buffer << "\"monitor\": " << m_monitor->getMonitoringJSON() << "," << std::endl;
295  buffer << "\"input\": " << m_input->getMonitoringJSON() << "," << std::endl;
296  buffer << "\"output\": " << m_output->getMonitoringJSON() << "," << std::endl;
297  buffer << "\"roi\": " << m_output->getROIMonitoringJSON() << std::endl;
298  buffer << "}" << std::endl;
299 }
300 
302 {
303  m_input->overwriteStopMessage();
304 
305  // We do not want to send out another stop message, so reset the counter
306  m_timeout = 0;
307  resetTimer();
308 }
Belle2::ZMQOutputAdapter::handleExternalSignal
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
Definition: ZMQCollector.cc:93
Belle2::ZMQCollector::handleExternalSignal
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
Definition: ZMQCollector.cc:37
Belle2::ZMQCollector::m_inputAddress
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:57
Belle2::ZMQStandardApp< ZMQConfirmedInput, ZMQLoadBalancedOutput >::m_output
std::unique_ptr< ZMQLoadBalancedOutput > m_output
Pointer to the output connection. Should be set in initialize.
Definition: ZMQApp.h:79
Belle2::ZMQOutputAdapter::m_inputAddress
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:85
Belle2::ZMQCollector::m_lax
bool m_lax
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
Definition: ZMQCollector.h:61
Belle2::ZMQConfirmedOutput
Output part of a confirmed connection.
Definition: ZMQConfirmedConnection.h:116
Belle2::EMessageTypes
EMessageTypes
Type the messages can have.
Definition: ZMQDefinitions.h:26
Belle2::ZMQFinalCollectorWithROI::m_roiOutputAddress
std::string m_roiOutputAddress
Parameter: output address for ROIs (second part of message)
Definition: ZMQCollector.h:173
Belle2::ZMQFinalCollector::handleExternalSignal
void handleExternalSignal(EMessageTypes type) final
Set the stop message counter on stop or clear the counters on start from the monitoring connection.
Definition: ZMQCollector.cc:196
Belle2::ZMQFinalCollectorWithROI::m_stopWaitingTime
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQCollector.h:177
Belle2::ZMQStandardApp< ZMQConfirmedInput, ZMQRawOutput >::m_monitorHasPriority
bool m_monitorHasPriority
Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector...
Definition: ZMQApp.h:87
Belle2::ZMQProxyCollector::handleExternalSignal
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
Definition: ZMQCollector.cc:134
Belle2::ZMQProxyCollector::handleTimeout
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
Definition: ZMQCollector.cc:160
Belle2::ZMQProxyCollector::initialize
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQCollector.cc:127
Belle2::ZMQProxyCollector::m_stopWaitingTime
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQCollector.h:116
Belle2::ZMQOutputAdapter::handleInput
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Definition: ZMQCollector.cc:97
Belle2::ZMQConfirmedInput
Input part of a confirmed connection.
Definition: ZMQConfirmedConnection.h:65
Belle2::ZMQOutputAdapter::initialize
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQCollector.cc:86
Belle2::ZMQMessageFactory::stripIdentity
static auto stripIdentity(std::unique_ptr< ZMQIdMessage > message)
Create a No-ID Message out of an ID message.
Definition: ZMQMessageFactory.h:127
Belle2::ZMQFinalCollector::m_stopWaitingTime
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQCollector.h:148
Belle2::ZMQStandardApp< ZMQConfirmedInput, ZMQLoadBalancedOutput >::m_terminate
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
Definition: ZMQApp.h:83
Belle2::ZMQCollector::m_outputAddress
std::string m_outputAddress
Parameter: output address.
Definition: ZMQCollector.h:59
Belle2::ZMQProxyCollector::handleInput
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Definition: ZMQCollector.cc:144
Belle2::ZMQCollector::initialize
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQCollector.cc:30
Belle2::ZMQStandardApp< ZMQConfirmedInput, ZMQLoadBalancedOutput >::m_parent
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
Definition: ZMQApp.h:75
Belle2::ZMQStandardApp< ZMQConfirmedInput, ZMQLoadBalancedOutput >::resetTimer
void resetTimer()
Helper function to reset the start time and the remaining time.
Definition: ZMQApp.details.h:193
Belle2::ZMQProxyCollector::addOptions
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQCollector.cc:114
Belle2::ZMQFinalCollector::addOptions
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQCollector.cc:172
Belle2::ZMQFinalCollectorWithROI::handleTimeout
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
Definition: ZMQCollector.cc:301
Belle2::ZMQFinalCollectorWithROI::addOptions
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQCollector.cc:236
Belle2::ZMQOutputAdapter::m_outputAddress
std::string m_outputAddress
Parameter: output address.
Definition: ZMQCollector.h:87
Belle2::ZMQStandardApp::addOptions
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
Definition: ZMQApp.details.h:134
Belle2::ZMQFinalCollector::m_addEventSize
bool m_addEventSize
Parameter: add the event size at the beginning of the message.
Definition: ZMQCollector.h:146
Belle2::ZMQCollector::addOptions
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQCollector.cc:15
Belle2::ZMQFinalCollector::handleInput
void handleInput() final
Pass the message from the input connection to the output connection (only data messages)
Definition: ZMQCollector.cc:207
Belle2::ZMQFinalCollector::m_inputAddress
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:142
Belle2::ZMQStandardApp< ZMQConfirmedInput, ZMQDataAndROIOutput >::m_monitor
std::unique_ptr< ZMQSimpleConnection > m_monitor
Pointer to the monitoring connection. Should be set in initialize.
Definition: ZMQApp.h:81
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQProxyCollector::m_outputAddress
std::string m_outputAddress
Parameter: output address.
Definition: ZMQCollector.h:114
Belle2::ZMQFinalCollector::handleTimeout
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
Definition: ZMQCollector.cc:227
Belle2::ZMQFinalCollectorWithROI::fillMonitoringJSON
void fillMonitoringJSON(std::stringstream &buffer) const final
Special handling of the JSON function with additonal ROI.
Definition: ZMQCollector.cc:291
Belle2::ZMQCollector::handleTimeout
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
Definition: ZMQCollector.cc:64
Belle2::ZMQProxyCollector::m_inputAddress
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:112
Belle2::ZMQFinalCollectorWithROI::initialize
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQCollector.cc:253
Belle2::ZMQCollector::m_stopWaitingTime
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQCollector.h:63
Belle2::ZMQFinalCollector::initialize
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQCollector.cc:187
Belle2::ZMQFinalCollectorWithROI::m_dataOutputAddress
std::string m_dataOutputAddress
Parameter: output address for data (first part of message)
Definition: ZMQCollector.h:171
Belle2::ZMQFinalCollectorWithROI::handleExternalSignal
void handleExternalSignal(EMessageTypes type) final
Set the stop message counter on stop or clear the counters on start from the monitoring connection.
Definition: ZMQCollector.cc:280
Belle2::ZMQLoadBalancedInput
Input part of a load-balanced connection.
Definition: ZMQLoadBalancedConnection.h:50
Belle2::ZMQFinalCollectorWithROI::m_inputAddress
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:169
Belle2::ZMQFinalCollector::m_outputAddress
std::string m_outputAddress
Parameter: output address.
Definition: ZMQCollector.h:144
Belle2::ZMQFinalCollectorWithROI::handleInput
void handleInput() final
Pass the message from the input connection to the output connection (data message to first,...
Definition: ZMQCollector.cc:262
Belle2::ZMQLoadBalancedOutput
Output part of a load-balanced connection.
Definition: ZMQLoadBalancedConnection.h:81
Belle2::ZMQDataAndROIOutput
Helper connection hosting both a normal raw and a ROI output and sending to both at the same time.
Definition: ZMQROIConnection.h:41
Belle2::ZMQRawOutput
Output connection to speak to non-zmq sockets via a ZMQ_STREAM socket.
Definition: ZMQRawConnection.h:97
Belle2::ZMQOutputAdapter::addOptions
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQCollector.cc:76
Belle2::ZMQFinalCollectorWithROI::m_addEventSize
bool m_addEventSize
Parameter: add the event size at the beginning of the message.
Definition: ZMQCollector.h:175
Belle2::ZMQStandardApp::initialize
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
Definition: ZMQApp.details.h:127
Belle2::ZMQStandardApp< ZMQConfirmedInput, ZMQLoadBalancedOutput >::m_timeout
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
Definition: ZMQApp.h:85
Belle2::ZMQStandardApp< ZMQConfirmedInput, ZMQLoadBalancedOutput >::m_input
std::unique_ptr< ZMQConfirmedInput > m_input
Pointer to the input connection. Should be set in initialize.
Definition: ZMQApp.h:77
Belle2::ZMQCollector::handleInput
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Definition: ZMQCollector.cc:48