Belle II Software development
ZMQDistributor.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/apps/ZMQDistributor.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10
11using namespace Belle2;
12
13void ZMQDistributor::addOptions(po::options_description& desc)
14{
16 desc.add_options()
17 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
18 "where to read the events from")
19 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
20 "where to send the events to")
21 ("expressRecoMode", boost::program_options::bool_switch(&m_expressRecoMode)->default_value(m_expressRecoMode),
22 "express reco mode: send out event messages (instead of raw messages)")
23 ("lax", boost::program_options::bool_switch(&m_lax)->default_value(m_lax),
24 "lax mode: dismiss events if no worker is ready")
25 ("maximalBufferSize",
26 boost::program_options::value<unsigned int>(&m_maximalBufferSize)->default_value(m_maximalBufferSize),
27 "size of the input buffer")
28 ("stopWaitingTime",
29 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
30 "how long to wait after no events come anymore");
31}
32
39
41{
42 if (type == EMessageTypes::c_newRunMessage) {
43 m_input->clear();
44 m_output->clear();
45 } else if (type == EMessageTypes::c_lastEventMessage) {
47 resetTimer();
48 } else if (type == EMessageTypes::c_terminateMessage) {
49 m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage));
50 m_input->clear();
51 m_terminate = true;
52 }
53}
54
56{
57 m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage));
58
59 // We do not want to send out another stop message, so reset the counter
60 m_timeout = 0;
61 resetTimer();
62}
63
65{
66 auto messages = m_input->handleIncomingData();
67
68 for (auto&& message : messages) {
69 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
70 resetTimer();
71
72 EMessageTypes messageType = EMessageTypes::c_rawDataMessage;
74 messageType = EMessageTypes::c_eventMessage;
75 }
76
77 auto outputMessage = ZMQMessageFactory::createMessage(messageType, std::move(message));
78
79 // We know that the output is ready for the first message, but we do not know anything about any other messages, so lets
80 // be safe and poll the output if it is not ready so far
81 while (not m_output->isReady() and not terminated()) {
82 pollEvent(false);
83 }
84 if (terminated()) {
85 return;
86 }
87 m_output->handleEvent(std::move(outputMessage));
88 }
89}
90
91void ZMQInputAdapter::addOptions(po::options_description& desc)
92{
94 desc.add_options()
95 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
96 "where to read the events from")
97 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
98 "where to send the events to")
99 ("expressRecoMode", boost::program_options::bool_switch(&m_expressRecoMode)->default_value(m_expressRecoMode),
100 "express reco mode: dismiss events if no worker is ready and send out event messages (instead of raw messages)")
101 ("maximalBufferSize",
102 boost::program_options::value<unsigned int>(&m_maximalBufferSize)->default_value(m_maximalBufferSize),
103 "size of the input buffer")
104 ("stopWaitingTime",
105 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
106 "how long to wait after no events come anymore");
107}
108
117
119{
120 if (type == EMessageTypes::c_newRunMessage) {
121 m_input->clear();
122 } else if (type == EMessageTypes::c_lastEventMessage) {
124 resetTimer();
125 } else if (type == EMessageTypes::c_terminateMessage) {
126 m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage));
127 m_input->clear();
128 m_terminate = true;
129 }
130}
131
133{
134 m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage));
135
136 // We do not want to send out another stop message, so reset the counter
137 m_timeout = 0;
138 resetTimer();
139}
140
142{
143 auto messages = m_input->handleIncomingData();
144
145 for (auto&& message : messages) {
146 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
147 resetTimer();
148
149 EMessageTypes messageType = EMessageTypes::c_rawDataMessage;
150 if (m_expressRecoMode) {
151 messageType = EMessageTypes::c_eventMessage;
152 }
153
154 auto outputMessage = ZMQMessageFactory::createMessage(messageType, std::move(message));
155
156 // We know that the output is ready for the first message, but we do not know anything about any other messages, so lets
157 // be safe and poll the output if it is not ready so far
158 while (not m_output->isReady() and not terminated()) {
159 pollEvent(false);
160 }
161 if (terminated()) {
162 return;
163 }
164 m_output->handleEvent(std::move(outputMessage));
165 }
166}
Output part of a confirmed connection.
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_expressRecoMode
Parameter: send out event messages instead of raw messages.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
bool m_lax
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
void handleExternalSignal(EMessageTypes type) final
Handle stop, start and terminate messages as described above.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (only data messages)
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_expressRecoMode
Parameter: Choose how the input events are formatted.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Output part of a load-balanced connection.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Input connection allowing to speak with non-zmq sockets via a ZMQ_STREAM socket.
std::unique_ptr< ZMQLoadBalancedOutput > m_output
Definition ZMQApp.h:69
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.