Belle II Software development
ZMQDistributor.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/apps/ZMQDistributor.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10
11using namespace Belle2;
12
13void ZMQDistributor::addOptions(po::options_description& desc)
14{
16 desc.add_options()
17 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
18 "where to read the events from")
19 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
20 "where to send the events to")
21 ("expressRecoMode", boost::program_options::bool_switch(&m_expressRecoMode)->default_value(m_expressRecoMode),
22 "express reco mode: send out event messages (instead of raw messages)")
23 ("lax", boost::program_options::bool_switch(&m_lax)->default_value(m_lax),
24 "lax mode: dismiss events if no worker is ready")
25 ("maximalBufferSize",
26 boost::program_options::value<unsigned int>(&m_maximalBufferSize)->default_value(m_maximalBufferSize),
27 "size of the input buffer")
28 ("stopWaitingTime",
29 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
30 "how long to wait after no events come anymore");
31}
32
34{
38}
39
41{
42 if (type == EMessageTypes::c_newRunMessage) {
43 m_input->clear();
44 m_output->clear();
45 } else if (type == EMessageTypes::c_lastEventMessage) {
47 resetTimer();
48 } else if (type == EMessageTypes::c_terminateMessage) {
49 m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage));
50 m_input->clear();
51 m_terminate = true;
52 }
53}
54
56{
57 m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage));
58
59 // We do not want to send out another stop message, so reset the counter
60 m_timeout = 0;
61 resetTimer();
62}
63
65{
66 auto messages = m_input->handleIncomingData();
67
68 for (auto&& message : messages) {
69 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
70 resetTimer();
71
72 EMessageTypes messageType = EMessageTypes::c_rawDataMessage;
74 messageType = EMessageTypes::c_eventMessage;
75 }
76
77 auto outputMessage = ZMQMessageFactory::createMessage(messageType, std::move(message));
78
79 // We know that the output is ready for the first message, but we do not know anything about any other messages, so lets
80 // be safe and poll the output if it is not ready so far
81 while (not m_output->isReady() and not terminated()) {
82 pollEvent(false);
83 }
84 if (terminated()) {
85 return;
86 }
87 m_output->handleEvent(std::move(outputMessage));
88 }
89}
90
91void ZMQInputAdapter::addOptions(po::options_description& desc)
92{
94 desc.add_options()
95 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
96 "where to read the events from")
97 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
98 "where to send the events to")
99 ("expressRecoMode", boost::program_options::bool_switch(&m_expressRecoMode)->default_value(m_expressRecoMode),
100 "express reco mode: dismiss events if no worker is ready and send out event messages (instead of raw messages)")
101 ("maximalBufferSize",
102 boost::program_options::value<unsigned int>(&m_maximalBufferSize)->default_value(m_maximalBufferSize),
103 "size of the input buffer")
104 ("stopWaitingTime",
105 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
106 "how long to wait after no events come anymore");
107}
108
110{
114
116}
117
119{
120 if (type == EMessageTypes::c_newRunMessage) {
121 m_input->clear();
122 } else if (type == EMessageTypes::c_lastEventMessage) {
124 resetTimer();
125 } else if (type == EMessageTypes::c_terminateMessage) {
126 m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage));
127 m_input->clear();
128 m_terminate = true;
129 }
130}
131
133{
134 m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage));
135
136 // We do not want to send out another stop message, so reset the counter
137 m_timeout = 0;
138 resetTimer();
139}
140
142{
143 auto messages = m_input->handleIncomingData();
144
145 for (auto&& message : messages) {
146 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
147 resetTimer();
148
149 EMessageTypes messageType = EMessageTypes::c_rawDataMessage;
150 if (m_expressRecoMode) {
151 messageType = EMessageTypes::c_eventMessage;
152 }
153
154 auto outputMessage = ZMQMessageFactory::createMessage(messageType, std::move(message));
155
156 // We know that the output is ready for the first message, but we do not know anything about any other messages, so lets
157 // be safe and poll the output if it is not ready so far
158 while (not m_output->isReady() and not terminated()) {
159 pollEvent(false);
160 }
161 if (terminated()) {
162 return;
163 }
164 m_output->handleEvent(std::move(outputMessage));
165 }
166}
Output part of a confirmed connection.
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_expressRecoMode
Parameter: send out event messages instead of raw messages.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
bool m_lax
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
void handleExternalSignal(EMessageTypes type) final
Handle stop, start and terminate messages as described above.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (only data messages)
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_expressRecoMode
Parameter: Choose how the input events are formatted.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Output part of a load-balanced connection.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Input connection allowing to speak with non-zmq sockets via a ZMQ_STREAM socket.
bool m_monitorHasPriority
Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector...
Definition: ZMQApp.h:77
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
Definition: ZMQApp.h:65
std::unique_ptr< ZMQLoadBalancedOutput > m_output
Pointer to the output connection. Should be set in initialize.
Definition: ZMQApp.h:69
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
Definition: ZMQApp.h:73
std::unique_ptr< ZMQRawInput > m_input
Pointer to the input connection. Should be set in initialize.
Definition: ZMQApp.h:67
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
Definition: ZMQApp.h:75
void pollEvent(bool pollOnInput)
Poll until a single event is retreived.
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
bool terminated() const
Check if the main loop will be exited on next occasion. Can be set via the "m_terminate" flag.
void resetTimer()
Helper function to reset the start time and the remaining time.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.