Belle II Software development
ZMQDistributor.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/apps/ZMQDistributor.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10
11using namespace Belle2;
12
// Register the command line options of the distributor application on `desc`.
//
// Every option is bound to a member variable, so the parsed value is stored
// directly in that member:
//   input, output      - required strings (m_inputAddress, m_outputAddress)
//   expressRecoMode    - boolean switch (m_expressRecoMode)
//   maximalBufferSize  - unsigned int (m_maximalBufferSize)
//   stopWaitingTime    - unsigned int (m_stopWaitingTime)
// The three non-required options use the value the member holds when this
// function runs as their default.
//
// NOTE(review): the embedded numbering of this listing jumps from 14 to 16, so
// one line of the body is not shown here. The base-class documentation of
// addOptions asks overrides to call the base function - confirm against the
// original file that this call is present.
13void ZMQDistributor::addOptions(po::options_description& desc)
14{
16 desc.add_options()
17 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
18 "where to read the events from")
19 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
20 "where to send the events to")
21 ("expressRecoMode", boost::program_options::bool_switch(&m_expressRecoMode)->default_value(m_expressRecoMode),
22 "express reco mode: dismiss events if no worker is ready and send out event messages (instead of raw messages)")
23 ("maximalBufferSize",
24 boost::program_options::value<unsigned int>(&m_maximalBufferSize)->default_value(m_maximalBufferSize),
25 "size of the input buffer")
26 ("stopWaitingTime",
27 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
28 "how long to wait after no events come anymore");
29}
30
32{
36}
37
39{
40 if (type == EMessageTypes::c_newRunMessage) {
41 m_input->clear();
42 m_output->clear();
43 } else if (type == EMessageTypes::c_lastEventMessage) {
45 resetTimer();
46 } else if (type == EMessageTypes::c_terminateMessage) {
47 m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage));
48 m_input->clear();
49 m_terminate = true;
50 }
51}
52
54{
55 m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage));
56
57 // We do not want to send out another stop message, so reset the counter
58 m_timeout = 0;
59 resetTimer();
60}
61
63{
64 auto messages = m_input->handleIncomingData();
65
66 for (auto&& message : messages) {
67 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
68 resetTimer();
69
70 EMessageTypes messageType = EMessageTypes::c_rawDataMessage;
72 messageType = EMessageTypes::c_eventMessage;
73 }
74
75 auto outputMessage = ZMQMessageFactory::createMessage(messageType, std::move(message));
76
77 // We know that the output is ready for the first message, but we do not know anything about any other messages, so lets
78 // be safe and poll the output if it is not ready so far
79 while (not m_output->isReady() and not terminated()) {
80 pollEvent(false);
81 }
82 if (terminated()) {
83 return;
84 }
85 m_output->handleEvent(std::move(outputMessage));
86 }
87}
88
// Register the command line options of the input adapter application on `desc`.
//
// The option names, bound members and help texts are the same as in
// ZMQDistributor::addOptions: "input" and "output" are required strings,
// "expressRecoMode" is a boolean switch, and "maximalBufferSize" and
// "stopWaitingTime" are unsigned ints. The non-required options default to the
// current value of the member they are bound to.
//
// NOTE(review): the help text of "expressRecoMode" is identical to the
// distributor's, while the member documentation for this class describes the
// flag as choosing how the input events are formatted - confirm the wording.
// NOTE(review): the embedded numbering of this listing jumps from 90 to 92, so
// one line of the body is not shown here. The base-class documentation of
// addOptions asks overrides to call the base function - confirm against the
// original file that this call is present.
89void ZMQInputAdapter::addOptions(po::options_description& desc)
90{
92 desc.add_options()
93 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
94 "where to read the events from")
95 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
96 "where to send the events to")
97 ("expressRecoMode", boost::program_options::bool_switch(&m_expressRecoMode)->default_value(m_expressRecoMode),
98 "express reco mode: dismiss events if no worker is ready and send out event messages (instead of raw messages)")
99 ("maximalBufferSize",
100 boost::program_options::value<unsigned int>(&m_maximalBufferSize)->default_value(m_maximalBufferSize),
101 "size of the input buffer")
102 ("stopWaitingTime",
103 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
104 "how long to wait after no events come anymore");
105}
106
108{
112
114}
115
117{
118 if (type == EMessageTypes::c_newRunMessage) {
119 m_input->clear();
120 } else if (type == EMessageTypes::c_lastEventMessage) {
122 resetTimer();
123 } else if (type == EMessageTypes::c_terminateMessage) {
124 m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage));
125 m_input->clear();
126 m_terminate = true;
127 }
128}
129
131{
132 m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage));
133
134 // We do not want to send out another stop message, so reset the counter
135 m_timeout = 0;
136 resetTimer();
137}
138
140{
141 auto messages = m_input->handleIncomingData();
142
143 for (auto&& message : messages) {
144 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
145 resetTimer();
146
147 EMessageTypes messageType = EMessageTypes::c_rawDataMessage;
148 if (m_expressRecoMode) {
149 messageType = EMessageTypes::c_eventMessage;
150 }
151
152 auto outputMessage = ZMQMessageFactory::createMessage(messageType, std::move(message));
153
154 // We know that the output is ready for the first message, but we do not know anything about any other messages, so lets
155 // be safe and poll the output if it is not ready so far
156 while (not m_output->isReady() and not terminated()) {
157 pollEvent(false);
158 }
159 if (terminated()) {
160 return;
161 }
162 m_output->handleEvent(std::move(outputMessage));
163 }
164}
Output part of a confirmed connection.
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_expressRecoMode
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Handle stop, start and terminate messages as described above.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (only data messages)
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_expressRecoMode
Parameter: Choose how the input events are formatted.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Output part of a load-balanced connection.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Input connection allowing to speak with non-zmq sockets via a ZMQ_STREAM socket.
bool m_monitorHasPriority
Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector...
Definition: ZMQApp.h:77
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
Definition: ZMQApp.h:65
std::unique_ptr< ZMQLoadBalancedOutput > m_output
Pointer to the output connection. Should be set in initialize.
Definition: ZMQApp.h:69
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
Definition: ZMQApp.h:73
std::unique_ptr< ZMQRawInput > m_input
Pointer to the input connection. Should be set in initialize.
Definition: ZMQApp.h:67
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
Definition: ZMQApp.h:75
void pollEvent(bool pollOnInput)
Poll until a single event is retrieved.
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
bool terminated() const
Check if the main loop will be exited on next occasion. Can be set via the "m_terminate" flag.
void resetTimer()
Helper function to reset the start time and the remaining time.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.