Belle II Software development
ZMQLoadBalancedConnection.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <framework/pcore/zmq/connections/ZMQLoadBalancedConnection.h>
9
10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11#include <framework/logging/Logger.h>
12
13using namespace Belle2;
14
15ZMQLoadBalancedInput::ZMQLoadBalancedInput(const std::string& inputAddress, unsigned int bufferSize,
16 const std::shared_ptr<ZMQParent>& parent) : ZMQConnectionOverSocket(parent)
17{
18 // We clear all our internal state and counters
19 log("sent_ready", 0l);
20 log("data_size", 0.0);
21 log("received_events", 0l);
22 log("event_rate", 0.0);
23
24 // Create a non-binding DEALER socket
25 m_socket = m_parent->createSocket<ZMQ_DEALER>(inputAddress, false);
26
27 // Send as many ready message as our buffer size is. This means we will get that many events, which will then be "in flight"
28 for (unsigned int i = 0; i < bufferSize; i++) {
29 auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
30 ZMQParent::send(m_socket, std::move(readyMessage));
31 increment("sent_ready");
32 }
33}
34
35std::unique_ptr<ZMQNoIdMessage> ZMQLoadBalancedInput::handleIncomingData()
36{
37 auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(m_socket);
38
39 if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
40 // if it is an event message, return a ready message back. If not, no need for that.
41 auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
42 ZMQParent::send(m_socket, std::move(readyMessage));
43 increment("sent_ready");
44
45 // and also do some logging
46 const auto dataSize = message->getDataMessage().size();
47
48 average("data_size", dataSize);
49 increment("received_events");
50 timeit("event_rate");
51 }
52
53 return message;
54}
55
56ZMQLoadBalancedOutput::ZMQLoadBalancedOutput(const std::string& outputAddress, bool lax,
57 const std::shared_ptr<ZMQParent>& parent) : ZMQConnectionOverSocket(
58 parent), m_lax(lax)
59{
60 // We clear all our internal state and counters
61 log("ready_queue_size", static_cast<long>(m_readyWorkers.size()));
62 log("registered_workers", static_cast<long>(m_allWorkers.size()));
63
64 log("data_size", 0.0);
65 log("dismissed_events", 0l);
66 log("event_rate", 0.0);
67 log("sent_events", 0l);
68
69 log("all_stop_messages", 0l);
70 log("sent_stop_messages", 0l);
71 log("last_stop_sent", "");
72
73 log("all_terminate_messages", 0l);
74 log("sent_terminate_messages", 0l);
75 log("last_terminate_sent", "");
76
77 // Create a binding ROUTER socket
78 m_socket = m_parent->createSocket<ZMQ_ROUTER>(outputAddress, true);
79}
80
81void ZMQLoadBalancedOutput::handleEvent(std::unique_ptr<ZMQNoIdMessage> message)
82{
83 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
84 // Tell all workers to stop this run
85 for (auto worker : m_allWorkers) {
86 auto sendMessage = ZMQMessageFactory::createMessage(worker, EMessageTypes::c_lastEventMessage);
87 ZMQParent::send(m_socket, std::move(sendMessage));
88 }
89 m_sentStopMessages = true;
90 log("all_stop_messages", static_cast<long>(m_sentStopMessages));
91 increment("sent_stop_messages");
92 logTime("last_stop_sent");
93 return;
94 } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
95 // Tell all workers to terminate
96 for (auto worker : m_allWorkers) {
97 auto sendMessage = ZMQMessageFactory::createMessage(worker, EMessageTypes::c_terminateMessage);
98 ZMQParent::send(m_socket, std::move(sendMessage));
99 }
101 log("all_terminate_messages", static_cast<long>(m_sentTerminateMessages));
102 increment("sent_terminate_messages");
103 logTime("last_terminate_sent");
104 return;
105 }
106
107 if (m_lax and m_readyWorkers.empty()) {
108 // There is no one that can handle the event in the moment, dismiss it (if lax is true)
109 increment("dismissed_events");
110 return;
111 }
112 if (m_sentStopMessages) {
113 B2ERROR("Received events after stop! I will dismiss this event.");
114 increment("dismissed_events");
115 return;
116 }
117
118 const auto dataSize = message->getDataMessage().size();
119
120 B2ASSERT("Must be > 0", not m_readyWorkers.empty());
121 auto nextWorker = m_readyWorkers.front();
122 m_readyWorkers.pop_front();
123
124 average("data_size", dataSize);
125 average("data_size_to[" + nextWorker + "]", dataSize);
126
127 increment("sent_events");
128 increment("sent_events[" + nextWorker + "]");
129
130 timeit("event_rate");
131 timeit<200>("event_rate_to[" + nextWorker + "]");
132
133 m_socket->send(ZMQMessageHelper::createZMQMessage(nextWorker), zmq::send_flags::sndmore);
134 ZMQParent::send(m_socket, std::move(message));
135
136 log("ready_queue_size", static_cast<long>(m_readyWorkers.size()));
137 decrement("ready_messages[" + nextWorker + "]");
138}
139
141{
142 auto readyMessage = ZMQMessageFactory::fromSocket<ZMQIdMessage>(m_socket);
143 B2ASSERT("Should be a ready message", readyMessage->isMessage(EMessageTypes::c_readyMessage));
144
145 // Register it as another ready worker
146 const auto toIdentity = readyMessage->getIdentity();
147 m_readyWorkers.push_back(toIdentity);
148
149 if (m_allWorkers.emplace(toIdentity).second) {
150 // Aha, we did never see this worker so far, so add it to our list.
151 if (m_sentStopMessages) {
152 // If it turned up late (everyone else has already stopped), send a stop message directly
153 auto sendMessage = ZMQMessageFactory::createMessage(toIdentity, EMessageTypes::c_lastEventMessage);
154 ZMQParent::send(m_socket, std::move(sendMessage));
155 }
156
158 // If it turned up late (everyone else has already terminates), send a terminate message directly
159 auto sendMessage = ZMQMessageFactory::createMessage(toIdentity, EMessageTypes::c_terminateMessage);
160 ZMQParent::send(m_socket, std::move(sendMessage));
161 }
162 }
163
164 log("ready_queue_size", static_cast<long>(m_readyWorkers.size()));
165 log("registered_workers", static_cast<long>(m_allWorkers.size()));
166 increment("ready_messages[" + toIdentity + "]");
167}
168
170{
171 m_sentStopMessages = false;
173
174 log("all_stop_messages", static_cast<long>(m_sentStopMessages));
175 log("all_terminate_messages", static_cast<long>(m_sentTerminateMessages));
176}
177
179{
180 // if we are lax, we are always ready. If not, we need to have at least a single ready worker. This prevents the B2ASSERT to fail.
181 return m_lax or not m_readyWorkers.empty();
182}
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:63
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
Definition: ZMQConnection.h:75
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
Definition: ZMQConnection.h:77
ZMQLoadBalancedInput(const std::string &inputAddress, unsigned int bufferSize, const std::shared_ptr< ZMQParent > &parent)
Create a new load-balanced input connecting to the given address. Send bufferSize ready messages.
std::unique_ptr< ZMQNoIdMessage > handleIncomingData()
Answer event messages with a ready message and pass on every received message.
std::set< std::string > m_allWorkers
All ever registered inputs.
bool m_sentTerminateMessages
Did we already send a terminate message?
ZMQLoadBalancedOutput(const std::string &outputAddress, bool lax, const std::shared_ptr< ZMQParent > &parent)
Create a new load-balanced output and bind to the given address.
void handleIncomingData()
Block until a ready message from an input is received and add it to the ready queue.
bool m_lax
Parameter to enable lax mode.
std::deque< std::string > m_readyWorkers
List of identities of ready inputs in FIFO order (new identities are appended at the back, the next one is taken from the front).
bool isReady() const final
If lax mode is disabled, the output is ready if at least a single input is ready. Else always.
void clear()
Clear the counter for sent stop and terminate messages. Should be called on run start.
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message)
Send the given message (without identity) to the next input in the ready list.
bool m_sentStopMessages
Did we already send a stop message?
void decrement(const std::string &key)
Decrement the value with the given key (only numerical values). If not present, set to -1.
Definition: ZMQLogger.cc:37
void logTime(const std::string &key)
Store the current time as a string under the given key.
Definition: ZMQLogger.cc:42
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:32
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
Definition: ZMQLogger.h:117
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:153
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:96
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:102
Abstract base class for different kinds of events.