Belle II Software development
ZMQCollector.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/apps/ZMQCollector.h>
9#include <framework/logging/Logger.h>
10
11using namespace Belle2;
12
13void ZMQCollector::addOptions(po::options_description& desc)
14{
16 desc.add_options()
17 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
18 "where to read the events from")
19 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
20 "where to send the events to")
21 ("lax", boost::program_options::bool_switch(&m_lax)->default_value(m_lax),
22 "dismiss events if no worker is ready (lax) or not")
23 ("stopWaitingTime",
24 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
25 "how long to wait after no events come anymore");
26}
27
29{
33}
34
36{
37 if (type == EMessageTypes::c_newRunMessage) {
38 m_input->clear();
39 m_output->clear();
40 } else if (type == EMessageTypes::c_lastEventMessage) {
42 resetTimer();
43 }
44}
45
47{
48 auto message = m_input->handleIncomingData();
49
50 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
51 resetTimer();
52
53 if (message) {
54 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
55 m_terminate = true;
56 }
57
58 m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
59 }
60}
61
63{
64 auto message = m_input->overwriteStopMessage();
65 if (message) {
66 m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
67 }
68
69 // We do not want to send out another stop message, so reset the counter
70 m_timeout = 0;
71 resetTimer();
72}
73
74void ZMQOutputAdapter::addOptions(po::options_description& desc)
75{
77 desc.add_options()
78 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
79 "where to read the events from")
80 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
81 "where to send the events to");
82}
83
85{
89}
90
92{
93}
94
96{
97 auto message = m_input->handleIncomingData();
98 if (not message) {
99 return;
100 }
101
102 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
103 m_terminate = true;
104 }
105
106 if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
107 B2DEBUG(30, message->getDataMessage().size());
108 m_output->handleEvent(std::move(message->getDataMessage()));
109 }
110}
111
112void ZMQProxyCollector::addOptions(po::options_description& desc)
113{
115 desc.add_options()
116 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
117 "where to read the events from")
118 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
119 "where to send the events to")
120 ("stopWaitingTime",
121 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
122 "how long to wait after no events come anymore");
123}
124
126{
130}
131
133{
134 if (type == EMessageTypes::c_newRunMessage) {
135 m_input->clear();
136 } else if (type == EMessageTypes::c_lastEventMessage) {
138 resetTimer();
139 }
140}
141
143{
144 auto message = m_input->handleIncomingData();
145
146 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
147 resetTimer();
148
149 if (message) {
150 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
151 m_terminate = true;
152 }
153
154 m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
155 }
156}
157
159{
160 auto message = m_input->overwriteStopMessage();
161 if (message) {
162 m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
163 }
164
165 // We do not want to send out another stop message, so reset the counter
166 m_timeout = 0;
167 resetTimer();
168}
169
170void ZMQFinalCollector::addOptions(po::options_description& desc)
171{
173 desc.add_options()
174 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
175 "where to read the events from")
176 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
177 "where to send the events to")
178 ("addEventSize", boost::program_options::bool_switch(&m_addEventSize)->default_value(false),
179 "add the hlon of the event size at the beginning")
180 ("stopWaitingTime",
181 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
182 "how long to wait after no events come anymore");
183}
184
186{
190
192}
193
195{
196 if (type == EMessageTypes::c_newRunMessage) {
197 m_input->clear();
198 return;
199 } else if (type == EMessageTypes::c_lastEventMessage) {
201 resetTimer();
202 }
203}
204
206{
207 auto message = m_input->handleIncomingData();
208
209 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
210 resetTimer();
211
212 if (not message) {
213 return;
214 }
215
216 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
217 m_terminate = true;
218 }
219
220 if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
221 m_output->handleEvent(std::move(message->getDataMessage()));
222 }
223}
224
226{
227 m_input->overwriteStopMessage();
228
229 // We do not want to send out another stop message, so reset the counter
230 m_timeout = 0;
231 resetTimer();
232}
233
234void ZMQFinalCollectorWithROI::addOptions(po::options_description& desc)
235{
237 desc.add_options()
238 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
239 "where to read the events from")
240 ("output", boost::program_options::value<std::string>(&m_dataOutputAddress)->required(),
241 "where to send the events to")
242 ("roi", boost::program_options::value<std::string>(&m_roiOutputAddress)->required(),
243 "where to send the rois to")
244 ("addEventSize", boost::program_options::bool_switch(&m_addEventSize)->default_value(false),
245 "add the hlon of the event size at the beginning")
246 ("stopWaitingTime",
247 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
248 "how long to wait after no events come anymore");
249}
250
252{
256
258}
259
261{
262 auto message = m_input->handleIncomingData();
263
264 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
265 resetTimer();
266
267 if (not message) {
268 return;
269 }
270
271 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
272 m_terminate = true;
273 }
274
275 m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
276}
277
279{
280 if (type == EMessageTypes::c_newRunMessage) {
281 m_input->clear();
282 return;
283 } else if (type == EMessageTypes::c_lastEventMessage) {
285 resetTimer();
286 }
287}
288
289void ZMQFinalCollectorWithROI::fillMonitoringJSON(std::stringstream& buffer) const
290{
291 buffer << "{" << std::endl;
292 buffer << "\"monitor\": " << m_monitor->getMonitoringJSON() << "," << std::endl;
293 buffer << "\"input\": " << m_input->getMonitoringJSON() << "," << std::endl;
294 buffer << "\"output\": " << m_output->getMonitoringJSON() << "," << std::endl;
295 buffer << "\"roi\": " << m_output->getROIMonitoringJSON() << std::endl;
296 buffer << "}" << std::endl;
297}
298
300{
301 m_input->overwriteStopMessage();
302
303 // We do not want to send out another stop message, so reset the counter
304 m_timeout = 0;
305 resetTimer();
306}
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:47
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQCollector.cc:28
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQCollector.cc:13
std::string m_outputAddress
Parameter: output address.
Definition: ZMQCollector.h:49
bool m_lax
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
Definition: ZMQCollector.h:51
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
Definition: ZMQCollector.cc:35
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
Definition: ZMQCollector.cc:62
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQCollector.h:53
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Definition: ZMQCollector.cc:46
Input part of a confirmed connection.
Output part of a confirmed connection.
Helper connection hosting both a normal raw and a ROI output and sending to both at the same time.
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:159
void fillMonitoringJSON(std::stringstream &buffer) const final
Special handling of the JSON function with additonal ROI.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_addEventSize
Parameter: add the event size at the beginning of the message.
Definition: ZMQCollector.h:165
std::string m_dataOutputAddress
Parameter: output address for data (first part of message)
Definition: ZMQCollector.h:161
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_roiOutputAddress
Parameter: output address for ROIs (second part of message)
Definition: ZMQCollector.h:163
void handleExternalSignal(EMessageTypes type) final
Set the stop message counter on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQCollector.h:167
void handleInput() final
Pass the message from the input connection to the output connection (data message to first,...
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:132
void initialize() final
Initialize the two connections using the command line arguments.
bool m_addEventSize
Parameter: add the event size at the beginning of the message.
Definition: ZMQCollector.h:136
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
Definition: ZMQCollector.h:134
void handleExternalSignal(EMessageTypes type) final
Set the stop message counter on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQCollector.h:138
void handleInput() final
Pass the message from the input connection to the output connection (only data messages)
Input part of a load-balanced connection.
Output part of a load-balanced connection.
static auto stripIdentity(std::unique_ptr< ZMQIdMessage > message)
Create a No-ID Message out of an ID message.
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:75
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQCollector.cc:84
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQCollector.cc:74
std::string m_outputAddress
Parameter: output address.
Definition: ZMQCollector.h:77
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
Definition: ZMQCollector.cc:91
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Definition: ZMQCollector.cc:95
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:102
void initialize() final
Initialize the two connections using the command line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
Definition: ZMQCollector.h:104
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQCollector.h:106
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Output connection to speak to non-zmq sockets via a ZMQ_STREAM socket.
bool m_monitorHasPriority
Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector...
Definition: ZMQApp.h:77
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
Definition: ZMQApp.h:65
std::unique_ptr< ZMQLoadBalancedOutput > m_output
Pointer to the output connection. Should be set in initialize.
Definition: ZMQApp.h:69
std::unique_ptr< ZMQSimpleConnection > m_monitor
Pointer to the monitoring connection. Should be set in initialize.
Definition: ZMQApp.h:71
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
Definition: ZMQApp.h:73
std::unique_ptr< ZMQConfirmedInput > m_input
Pointer to the input connection. Should be set in initialize.
Definition: ZMQApp.h:67
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
Definition: ZMQApp.h:75
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
void resetTimer()
Helper function to reset the start time and the remaining time.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.