Belle II Software development
ZMQCollector.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/apps/ZMQCollector.h>
9#include <framework/logging/Logger.h>
10
11using namespace Belle2;
12
13void ZMQCollector::addOptions(po::options_description& desc)
14{
16 desc.add_options()
17 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
18 "where to read the events from")
19 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
20 "where to send the events to")
21 ("lax", boost::program_options::bool_switch(&m_lax)->default_value(m_lax),
22 "dismiss events if no worker is ready (lax) or not")
23 ("stopWaitingTime",
24 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
25 "how long to wait after no events come anymore");
26}
27
34
36{
37 if (type == EMessageTypes::c_newRunMessage) {
38 m_input->clear();
39 m_output->clear();
40 } else if (type == EMessageTypes::c_lastEventMessage) {
42 resetTimer();
43 }
44}
45
47{
48 auto message = m_input->handleIncomingData();
49
50 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
51 resetTimer();
52
53 if (message) {
54 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
55 m_terminate = true;
56 }
57
58 m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
59 }
60}
61
63{
64 auto message = m_input->overwriteStopMessage();
65 if (message) {
66 m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
67 }
68
69 // We do not want to send out another stop message, so reset the counter
70 m_timeout = 0;
71 resetTimer();
72}
73
74void ZMQOutputAdapter::addOptions(po::options_description& desc)
75{
77 desc.add_options()
78 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
79 "where to read the events from")
80 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
81 "where to send the events to");
82}
83
90
94
96{
97 auto message = m_input->handleIncomingData();
98 if (not message) {
99 return;
100 }
101
102 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
103 m_terminate = true;
104 }
105
106 if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
107 B2DEBUG(30, message->getDataMessage().size());
108 m_output->handleEvent(std::move(message->getDataMessage()));
109 }
110}
111
112void ZMQProxyCollector::addOptions(po::options_description& desc)
113{
115 desc.add_options()
116 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
117 "where to read the events from")
118 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
119 "where to send the events to")
120 ("stopWaitingTime",
121 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
122 "how long to wait after no events come anymore");
123}
124
131
133{
134 if (type == EMessageTypes::c_newRunMessage) {
135 m_input->clear();
136 } else if (type == EMessageTypes::c_lastEventMessage) {
138 resetTimer();
139 }
140}
141
143{
144 auto message = m_input->handleIncomingData();
145
146 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
147 resetTimer();
148
149 if (message) {
150 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
151 m_terminate = true;
152 }
153
154 m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
155 }
156}
157
159{
160 auto message = m_input->overwriteStopMessage();
161 if (message) {
162 m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
163 }
164
165 // We do not want to send out another stop message, so reset the counter
166 m_timeout = 0;
167 resetTimer();
168}
169
170void ZMQFinalCollector::addOptions(po::options_description& desc)
171{
173 desc.add_options()
174 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
175 "where to read the events from")
176 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
177 "where to send the events to")
178 ("addEventSize", boost::program_options::bool_switch(&m_addEventSize)->default_value(false),
179 "add the hlon of the event size at the beginning")
180 ("stopWaitingTime",
181 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
182 "how long to wait after no events come anymore");
183}
184
193
195{
196 if (type == EMessageTypes::c_newRunMessage) {
197 m_input->clear();
198 return;
199 } else if (type == EMessageTypes::c_lastEventMessage) {
201 resetTimer();
202 }
203}
204
206{
207 auto message = m_input->handleIncomingData();
208
209 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
210 resetTimer();
211
212 if (not message) {
213 return;
214 }
215
216 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
217 m_terminate = true;
218 }
219
220 if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
221 m_output->handleEvent(std::move(message->getDataMessage()));
222 }
223}
224
226{
227 m_input->overwriteStopMessage();
228
229 // We do not want to send out another stop message, so reset the counter
230 m_timeout = 0;
231 resetTimer();
232}
233
234void ZMQFinalCollectorWithROI::addOptions(po::options_description& desc)
235{
237 desc.add_options()
238 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
239 "where to read the events from")
240 ("output", boost::program_options::value<std::string>(&m_dataOutputAddress)->required(),
241 "where to send the events to")
242 ("roi", boost::program_options::value<std::string>(&m_roiOutputAddress)->required(),
243 "where to send the rois to")
244 ("addEventSize", boost::program_options::bool_switch(&m_addEventSize)->default_value(false),
245 "add the hlon of the event size at the beginning")
246 ("stopWaitingTime",
247 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
248 "how long to wait after no events come anymore");
249}
250
259
261{
262 auto message = m_input->handleIncomingData();
263
264 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
265 resetTimer();
266
267 if (not message) {
268 return;
269 }
270
271 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
272 m_terminate = true;
273 }
274
275 m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
276}
277
279{
280 if (type == EMessageTypes::c_newRunMessage) {
281 m_input->clear();
282 return;
283 } else if (type == EMessageTypes::c_lastEventMessage) {
285 resetTimer();
286 }
287}
288
289void ZMQFinalCollectorWithROI::fillMonitoringJSON(std::stringstream& buffer) const
290{
291 buffer << "{" << std::endl;
292 buffer << "\"monitor\": " << m_monitor->getMonitoringJSON() << "," << std::endl;
293 buffer << "\"input\": " << m_input->getMonitoringJSON() << "," << std::endl;
294 buffer << "\"output\": " << m_output->getMonitoringJSON() << "," << std::endl;
295 buffer << "\"roi\": " << m_output->getROIMonitoringJSON() << std::endl;
296 buffer << "}" << std::endl;
297}
298
300{
301 m_input->overwriteStopMessage();
302
303 // We do not want to send out another stop message, so reset the counter
304 m_timeout = 0;
305 resetTimer();
306}
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
bool m_lax
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Input part of a confirmed connection.
Output part of a confirmed connection.
Helper connection hosting both a normal raw and a ROI output and sending to both at the same time.
std::string m_inputAddress
Parameter: input address.
void fillMonitoringJSON(std::stringstream &buffer) const final
Special handling of the JSON function with additonal ROI.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_addEventSize
Parameter: add the event size at the beginning of the message.
std::string m_dataOutputAddress
Parameter: output address for data (first part of message)
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_roiOutputAddress
Parameter: output address for ROIs (second part of message)
void handleExternalSignal(EMessageTypes type) final
Set the stop message counter on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (data message to first,...
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_addEventSize
Parameter: add the event size at the beginning of the message.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Set the stop message counter on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (only data messages)
Input part of a load-balanced connection.
Output part of a load-balanced connection.
static auto stripIdentity(std::unique_ptr< ZMQIdMessage > message)
Create a No-ID Message out of an ID message.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Output connection to speak to non-zmq sockets via a ZMQ_STREAM socket.
std::unique_ptr< ZMQLoadBalancedOutput > m_output
Definition ZMQApp.h:69
std::unique_ptr< ZMQSimpleConnection > m_monitor
Definition ZMQApp.h:71
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.