Belle II Software  release-08-01-10
ZMQCollector.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/apps/ZMQCollector.h>
9 #include <framework/logging/Logger.h>
10 
11 using namespace Belle2;
12 
// Registers the command line options of the ZMQCollector on `desc`:
//  - "input" / "output": required strings, stored in m_inputAddress / m_outputAddress
//  - "lax": boolean switch stored in m_lax, defaulting to the member's current value
//  - "stopWaitingTime": unsigned int stored in m_stopWaitingTime, defaulting to the member's current value
// NOTE(review): original line 15 is absent from this listing - presumably the call to the
// base class addOptions; confirm against the repository.
13 void ZMQCollector::addOptions(po::options_description& desc)
14 {
16  desc.add_options()
17  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
18  "where to read the events from")
19  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
20  "where to send the events to")
21  ("lax", boost::program_options::bool_switch(&m_lax)->default_value(m_lax),
22  "dismiss events if no worker is ready (lax) or not")
23  ("stopWaitingTime",
24  boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
25  "how long to wait after no events come anymore");
26 }
27 
// ZMQCollector::initialize() - the signature (original line 28) and the body statements
// (original lines 30-32) are missing from this listing; only the braces survived.
// NOTE(review): presumably creates m_input and m_output from the parsed addresses - confirm against the repository.
29 {
33 }
34 
// ZMQCollector::handleExternalSignal(EMessageTypes type) - the signature (original line 35) is missing from this listing.
// Reacts to a signal type: a new-run message clears both connections, a last-event message restarts the timer.
36 {
37  if (type == EMessageTypes::c_newRunMessage) {
38  m_input->clear();
39  m_output->clear();
40  } else if (type == EMessageTypes::c_lastEventMessage) {
// NOTE(review): original line 41 is absent from this listing - presumably it arms m_timeout before the timer reset; confirm.
42  resetTimer();
43  }
44 }
45 
// ZMQCollector::handleInput() - the signature (original line 46) is missing from this listing.
// Fetches the next message from the input connection and, if there is one, forwards it
// (with the identity stripped) to the output connection.
47 {
48  auto message = m_input->handleIncomingData();
49 
50  // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
// Note: the reset below happens before the null check, i.e. also when no message was returned.
51  resetTimer();
52 
53  if (message) {
// A terminate message only sets the flag here; it is still forwarded to the output below.
54  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
55  m_terminate = true;
56  }
57 
58  m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
59  }
60 }
61 
// ZMQCollector::handleTimeout() - the signature (original line 62) is missing from this listing.
// Asks the input connection for an overwritten stop message, forwards it (identity stripped)
// to the output if one is returned, then disables further timeouts.
63 {
64  auto message = m_input->overwriteStopMessage();
65  if (message) {
66  m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
67  }
68 
69  // We do not want to send out another stop message, so reset the counter
70  m_timeout = 0;
71  resetTimer();
72 }
73 
// Registers the command line options of the ZMQOutputAdapter on `desc`:
// "input" and "output", both required strings stored in m_inputAddress / m_outputAddress.
// NOTE(review): original line 76 is absent from this listing - presumably the call to the
// base class addOptions; confirm against the repository.
74 void ZMQOutputAdapter::addOptions(po::options_description& desc)
75 {
77  desc.add_options()
78  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
79  "where to read the events from")
80  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
81  "where to send the events to");
82 }
83 
// ZMQOutputAdapter::initialize() - the signature (original line 84) and original lines 86-87
// are missing from this listing (presumably the base call and the creation of m_input - confirm).
85 {
// Creates the raw output connection on m_outputAddress using the shared m_parent.
// NOTE(review): the meaning of the literal `true` argument is not visible here - check the ZMQRawOutput constructor.
88  m_output.reset(new ZMQRawOutput(m_outputAddress, true, m_parent));
89 }
90 
// ZMQOutputAdapter::handleExternalSignal(EMessageTypes type) - the signature (original line 91) is missing from this listing.
// The body is empty (original lines 92-93 are contiguous), so this class does nothing on external signals.
92 {
93 }
94 
// ZMQOutputAdapter::handleInput() - the signature (original line 95) is missing from this listing.
// Fetches the next message from the input connection and forwards only the data part of
// raw-data and event messages to the output; other message types are not forwarded.
96 {
97  auto message = m_input->handleIncomingData();
98  if (not message) {
99  return;
100  }
101 
// A terminate message requests the end of the main loop; it is not forwarded (it fails the type check below).
102  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
103  m_terminate = true;
104  }
105 
106  if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
// Logs the size of the data part at debug level 30 before handing it on.
107  B2DEBUG(30, message->getDataMessage().size());
108  m_output->handleEvent(std::move(message->getDataMessage()));
109  }
110 }
111 
// Registers the command line options of the ZMQProxyCollector on `desc`:
//  - "input" / "output": required strings, stored in m_inputAddress / m_outputAddress
//  - "stopWaitingTime": unsigned int stored in m_stopWaitingTime, defaulting to the member's current value
// NOTE(review): original line 114 is absent from this listing - presumably the call to the
// base class addOptions; confirm against the repository.
112 void ZMQProxyCollector::addOptions(po::options_description& desc)
113 {
115  desc.add_options()
116  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
117  "where to read the events from")
118  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
119  "where to send the events to")
120  ("stopWaitingTime",
121  boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
122  "how long to wait after no events come anymore");
123 }
124 
// NOTE(review): presumably ZMQProxyCollector::initialize() (inferred from its position after
// ZMQProxyCollector::addOptions). The signature (original line 125) and the body statements
// (original lines 127-129) are missing from this listing; only the braces survived.
126 {
130 }
131 
// NOTE(review): presumably ZMQProxyCollector::handleExternalSignal(EMessageTypes type) (inferred from
// position and the use of `type`); the signature (original line 132) is missing from this listing.
// A new-run message clears the input connection; a last-event message restarts the timer.
133 {
134  if (type == EMessageTypes::c_newRunMessage) {
135  m_input->clear();
136  } else if (type == EMessageTypes::c_lastEventMessage) {
// NOTE(review): original line 137 is absent from this listing - presumably it arms m_timeout before the timer reset; confirm.
138  resetTimer();
139  }
140 }
141 
// NOTE(review): presumably ZMQProxyCollector::handleInput() (inferred from position and body);
// the signature (original line 142) is missing from this listing.
// Fetches the next message from the input connection and, if there is one, forwards it
// (with the identity stripped) to the output connection.
143 {
144  auto message = m_input->handleIncomingData();
145 
146  // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
// Note: the reset below happens before the null check, i.e. also when no message was returned.
147  resetTimer();
148 
149  if (message) {
// A terminate message only sets the flag here; it is still forwarded to the output below.
150  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
151  m_terminate = true;
152  }
153 
154  m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
155  }
156 }
157 
// NOTE(review): presumably ZMQProxyCollector::handleTimeout() (inferred from position and body);
// the signature (original line 158) is missing from this listing.
// Asks the input connection for an overwritten stop message, forwards it (identity stripped)
// to the output if one is returned, then disables further timeouts.
159 {
160  auto message = m_input->overwriteStopMessage();
161  if (message) {
162  m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
163  }
164 
165  // We do not want to send out another stop message, so reset the counter
166  m_timeout = 0;
167  resetTimer();
168 }
169 
// Registers the command line options of the ZMQFinalCollector on `desc`:
//  - "input" / "output": required strings, stored in m_inputAddress / m_outputAddress
//  - "addEventSize": boolean switch stored in m_addEventSize, default false
//  - "stopWaitingTime": unsigned int stored in m_stopWaitingTime, defaulting to the member's current value
// NOTE(review): original line 172 is absent from this listing - presumably the call to the
// base class addOptions; confirm against the repository.
// NOTE(review): "hlon" in the addEventSize help text looks like a typo (possibly for "htonl") - confirm before changing.
170 void ZMQFinalCollector::addOptions(po::options_description& desc)
171 {
173  desc.add_options()
174  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
175  "where to read the events from")
176  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
177  "where to send the events to")
178  ("addEventSize", boost::program_options::bool_switch(&m_addEventSize)->default_value(false),
179  "add the hlon of the event size at the beginning")
180  ("stopWaitingTime",
181  boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
182  "how long to wait after no events come anymore");
183 }
184 
// NOTE(review): presumably ZMQFinalCollector::initialize() (inferred from its position after
// ZMQFinalCollector::addOptions). The signature (original line 185) and original lines 187-189
// are missing from this listing.
186 {
190 
// Sets the m_monitorHasPriority flag for this application.
191  m_monitorHasPriority = true;
192 }
193 
// NOTE(review): presumably ZMQFinalCollector::handleExternalSignal(EMessageTypes type) (inferred from
// position and the use of `type`); the signature (original line 194) is missing from this listing.
// A new-run message clears the input connection; a last-event message restarts the timer.
195 {
196  if (type == EMessageTypes::c_newRunMessage) {
197  m_input->clear();
198  return;
199  } else if (type == EMessageTypes::c_lastEventMessage) {
// NOTE(review): original line 200 is absent from this listing - presumably it arms m_timeout before the timer reset; confirm.
201  resetTimer();
202  }
203 }
204 
// NOTE(review): presumably ZMQFinalCollector::handleInput() (inferred from position and body);
// the signature (original line 205) is missing from this listing.
// Fetches the next message from the input connection and forwards only the data part of
// raw-data and event messages to the output; other message types are not forwarded.
206 {
207  auto message = m_input->handleIncomingData();
208 
209  // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
// Note: the reset below happens before the null check, i.e. also when no message was returned.
210  resetTimer();
211 
212  if (not message) {
213  return;
214  }
215 
// A terminate message requests the end of the main loop; it is not forwarded (it fails the type check below).
216  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
217  m_terminate = true;
218  }
219 
220  if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
221  m_output->handleEvent(std::move(message->getDataMessage()));
222  }
223 }
224 
// NOTE(review): presumably ZMQFinalCollector::handleTimeout() (inferred from position and body);
// the signature (original line 225) is missing from this listing.
// Calls overwriteStopMessage() on the input and discards its result (nothing is forwarded
// to the output here), then disables further timeouts.
226 {
227  m_input->overwriteStopMessage();
228 
229  // We do not want to send out another stop message, so reset the counter
230  m_timeout = 0;
231  resetTimer();
232 }
233 
// Registers the command line options of the ZMQFinalCollectorWithROI on `desc`:
//  - "input": required string stored in m_inputAddress
//  - "output": required string stored in m_dataOutputAddress
//  - "roi": required string stored in m_roiOutputAddress
//  - "addEventSize": boolean switch stored in m_addEventSize, default false
//  - "stopWaitingTime": unsigned int stored in m_stopWaitingTime, defaulting to the member's current value
// NOTE(review): original line 236 is absent from this listing - presumably the call to the
// base class addOptions; confirm against the repository.
// NOTE(review): "hlon" in the addEventSize help text looks like a typo (possibly for "htonl") - confirm before changing.
234 void ZMQFinalCollectorWithROI::addOptions(po::options_description& desc)
235 {
237  desc.add_options()
238  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
239  "where to read the events from")
240  ("output", boost::program_options::value<std::string>(&m_dataOutputAddress)->required(),
241  "where to send the events to")
242  ("roi", boost::program_options::value<std::string>(&m_roiOutputAddress)->required(),
243  "where to send the rois to")
244  ("addEventSize", boost::program_options::bool_switch(&m_addEventSize)->default_value(false),
245  "add the hlon of the event size at the beginning")
246  ("stopWaitingTime",
247  boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
248  "how long to wait after no events come anymore");
249 }
250 
// NOTE(review): presumably ZMQFinalCollectorWithROI::initialize() (inferred from its position after
// ZMQFinalCollectorWithROI::addOptions). The signature (original line 251) and original lines 253-255
// are missing from this listing.
252 {
256 
// Sets the m_monitorHasPriority flag for this application.
257  m_monitorHasPriority = true;
258 }
259 
// NOTE(review): presumably ZMQFinalCollectorWithROI::handleInput() (inferred from position and body);
// the signature (original line 260) is missing from this listing.
// Fetches the next message from the input connection and forwards every non-null message
// (with the identity stripped) to the output, without filtering by message type.
261 {
262  auto message = m_input->handleIncomingData();
263 
264  // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
// Note: the reset below happens before the null check, i.e. also when no message was returned.
265  resetTimer();
266 
267  if (not message) {
268  return;
269  }
270 
// A terminate message only sets the flag here; it is still forwarded to the output below.
271  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
272  m_terminate = true;
273  }
274 
275  m_output->handleEvent(ZMQMessageFactory::stripIdentity(std::move(message)));
276 }
277 
// NOTE(review): presumably ZMQFinalCollectorWithROI::handleExternalSignal(EMessageTypes type) (inferred from
// position and the use of `type`); the signature (original line 278) is missing from this listing.
// A new-run message clears the input connection; a last-event message restarts the timer.
279 {
280  if (type == EMessageTypes::c_newRunMessage) {
281  m_input->clear();
282  return;
283  } else if (type == EMessageTypes::c_lastEventMessage) {
// NOTE(review): original line 284 is absent from this listing - presumably it arms m_timeout before the timer reset; confirm.
285  resetTimer();
286  }
287 }
288 
// Writes the monitoring information into `buffer` as one JSON object with four keys:
// "monitor", "input" and "output" (from the respective connections' getMonitoringJSON())
// and "roi" (from m_output->getROIMonitoringJSON()).
// Each line is terminated with std::endl; the last entry has no trailing comma.
289 void ZMQFinalCollectorWithROI::fillMonitoringJSON(std::stringstream& buffer) const
290 {
291  buffer << "{" << std::endl;
292  buffer << "\"monitor\": " << m_monitor->getMonitoringJSON() << "," << std::endl;
293  buffer << "\"input\": " << m_input->getMonitoringJSON() << "," << std::endl;
294  buffer << "\"output\": " << m_output->getMonitoringJSON() << "," << std::endl;
295  buffer << "\"roi\": " << m_output->getROIMonitoringJSON() << std::endl;
296  buffer << "}" << std::endl;
297 }
298 
// NOTE(review): presumably ZMQFinalCollectorWithROI::handleTimeout() (inferred from position and body);
// the signature (original line 299) is missing from this listing.
// Calls overwriteStopMessage() on the input and discards its result (nothing is forwarded
// to the output here), then disables further timeouts.
300 {
301  m_input->overwriteStopMessage();
302 
303  // We do not want to send out another stop message, so reset the counter
304  m_timeout = 0;
305  resetTimer();
306 }
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:47
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQCollector.cc:28
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQCollector.cc:13
std::string m_outputAddress
Parameter: output address.
Definition: ZMQCollector.h:49
bool m_lax
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
Definition: ZMQCollector.h:51
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
Definition: ZMQCollector.cc:35
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
Definition: ZMQCollector.cc:62
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQCollector.h:53
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Definition: ZMQCollector.cc:46
Input part of a confirmed connection.
Output part of a confirmed connection.
Helper connection hosting both a normal raw and a ROI output and sending to both at the same time.
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:159
void fillMonitoringJSON(std::stringstream &buffer) const final
Special handling of the JSON function with additional ROI.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_addEventSize
Parameter: add the event size at the beginning of the message.
Definition: ZMQCollector.h:165
std::string m_dataOutputAddress
Parameter: output address for data (first part of message)
Definition: ZMQCollector.h:161
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_roiOutputAddress
Parameter: output address for ROIs (second part of message)
Definition: ZMQCollector.h:163
void handleExternalSignal(EMessageTypes type) final
Set the stop message counter on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQCollector.h:167
void handleInput() final
Pass the message from the input connection to the output connection (data message to first,...
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:132
void initialize() final
Initialize the two connections using the command line arguments.
bool m_addEventSize
Parameter: add the event size at the beginning of the message.
Definition: ZMQCollector.h:136
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
Definition: ZMQCollector.h:134
void handleExternalSignal(EMessageTypes type) final
Set the stop message counter on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQCollector.h:138
void handleInput() final
Pass the message from the input connection to the output connection (only data messages)
Input part of a load-balanced connection.
Output part of a load-balanced connection.
static auto stripIdentity(std::unique_ptr< ZMQIdMessage > message)
Create a No-ID Message out of an ID message.
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:75
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQCollector.cc:84
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQCollector.cc:74
std::string m_outputAddress
Parameter: output address.
Definition: ZMQCollector.h:77
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
Definition: ZMQCollector.cc:91
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Definition: ZMQCollector.cc:95
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:102
void initialize() final
Initialize the two connections using the command line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
Definition: ZMQCollector.h:104
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQCollector.h:106
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Output connection to speak to non-zmq sockets via a ZMQ_STREAM socket.
bool m_monitorHasPriority
Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector...
Definition: ZMQApp.h:77
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
Definition: ZMQApp.h:65
std::unique_ptr< ZMQLoadBalancedOutput > m_output
Pointer to the output connection. Should be set in initialize.
Definition: ZMQApp.h:69
std::unique_ptr< ZMQSimpleConnection > m_monitor
Pointer to the monitoring connection. Should be set in initialize.
Definition: ZMQApp.h:71
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
Definition: ZMQApp.h:73
std::unique_ptr< ZMQConfirmedInput > m_input
Pointer to the input connection. Should be set in initialize.
Definition: ZMQApp.h:67
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
Definition: ZMQApp.h:75
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
void resetTimer()
Helper function to reset the start time and the remaining time.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.