Belle II Software  release-05-01-25
ZMQLoadBalancedConnection.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #include <framework/pcore/zmq/connections/ZMQLoadBalancedConnection.h>
11 
12 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
13 #include <framework/logging/Logger.h>
14 
15 using namespace Belle2;
16 
17 ZMQLoadBalancedInput::ZMQLoadBalancedInput(const std::string& inputAddress, unsigned int bufferSize,
18  const std::shared_ptr<ZMQParent>& parent) : ZMQConnectionOverSocket(parent)
19 {
20  // We clear all our internal state and counters
21  log("sent_ready", 0l);
22  log("data_size", 0.0);
23  log("received_events", 0l);
24  log("event_rate", 0.0);
25 
26  // Create a non-binding DEALER socket
27  m_socket = m_parent->createSocket<ZMQ_DEALER>(inputAddress, false);
28 
29  // Send as many ready message as our buffer size is. This means we will get that many events, which will then be "in flight"
30  for (unsigned int i = 0; i < bufferSize; i++) {
31  auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
32  ZMQParent::send(m_socket, std::move(readyMessage));
33  increment("sent_ready");
34  }
35 }
36 
37 std::unique_ptr<ZMQNoIdMessage> ZMQLoadBalancedInput::handleIncomingData()
38 {
39  auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(m_socket);
40 
41  if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
42  // if it is an event message, return a ready message back. If not, no need for that.
43  auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
44  ZMQParent::send(m_socket, std::move(readyMessage));
45  increment("sent_ready");
46 
47  // and also do some logging
48  const auto dataSize = message->getDataMessage().size();
49 
50  average("data_size", dataSize);
51  increment("received_events");
52  timeit("event_rate");
53  }
54 
55  return message;
56 }
57 
58 ZMQLoadBalancedOutput::ZMQLoadBalancedOutput(const std::string& outputAddress, bool lax,
59  const std::shared_ptr<ZMQParent>& parent) : ZMQConnectionOverSocket(
60  parent), m_lax(lax)
61 {
62  // We clear all our internal state and counters
63  log("ready_queue_size", static_cast<long>(m_readyWorkers.size()));
64  log("registered_workers", static_cast<long>(m_allWorkers.size()));
65 
66  log("data_size", 0.0);
67  log("dismissed_events", 0l);
68  log("event_rate", 0.0);
69  log("sent_events", 0l);
70 
71  log("all_stop_messages", 0l);
72  log("sent_stop_messages", 0l);
73  log("last_stop_sent", "");
74 
75  log("all_terminate_messages", 0l);
76  log("sent_terminate_messages", 0l);
77  log("last_terminate_sent", "");
78 
79  // Create a binding ROUTER socket
80  m_socket = m_parent->createSocket<ZMQ_ROUTER>(outputAddress, true);
81 }
82 
83 void ZMQLoadBalancedOutput::handleEvent(std::unique_ptr<ZMQNoIdMessage> message)
84 {
85  if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
86  // Tell all workers to stop this run
87  for (auto worker : m_allWorkers) {
88  auto sendMessage = ZMQMessageFactory::createMessage(worker, EMessageTypes::c_lastEventMessage);
89  ZMQParent::send(m_socket, std::move(sendMessage));
90  }
91  m_sentStopMessages = true;
92  log("all_stop_messages", static_cast<long>(m_sentStopMessages));
93  increment("sent_stop_messages");
94  logTime("last_stop_sent");
95  return;
96  } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
97  // Tell all workers to terminate
98  for (auto worker : m_allWorkers) {
99  auto sendMessage = ZMQMessageFactory::createMessage(worker, EMessageTypes::c_terminateMessage);
100  ZMQParent::send(m_socket, std::move(sendMessage));
101  }
103  log("all_terminate_messages", static_cast<long>(m_sentTerminateMessages));
104  increment("sent_terminate_messages");
105  logTime("last_terminate_sent");
106  return;
107  }
108 
109  if (m_lax and m_readyWorkers.empty()) {
110  // There is no one that can handle the event in the moment, dismiss it (if lax is true)
111  increment("dismissed_events");
112  return;
113  }
114  if (m_sentStopMessages) {
115  B2ERROR("Received events after stop! I will dismiss this event.");
116  increment("dismissed_events");
117  return;
118  }
119 
120  const auto dataSize = message->getDataMessage().size();
121 
122  B2ASSERT("Must be > 0", not m_readyWorkers.empty());
123  auto nextWorker = m_readyWorkers.front();
124  m_readyWorkers.pop_front();
125 
126  average("data_size", dataSize);
127  average("data_size_to[" + nextWorker + "]", dataSize);
128 
129  increment("sent_events");
130  increment("sent_events[" + nextWorker + "]");
131 
132  timeit("event_rate");
133  timeit<200>("event_rate_to[" + nextWorker + "]");
134 
135  m_socket->send(ZMQMessageHelper::createZMQMessage(nextWorker), ZMQ_SNDMORE);
136  ZMQParent::send(m_socket, std::move(message));
137 
138  log("ready_queue_size", static_cast<long>(m_readyWorkers.size()));
139  decrement("ready_messages[" + nextWorker + "]");
140 }
141 
143 {
144  auto readyMessage = ZMQMessageFactory::fromSocket<ZMQIdMessage>(m_socket);
145  B2ASSERT("Should be a ready message", readyMessage->isMessage(EMessageTypes::c_readyMessage));
146 
147  // Register it as another ready worker
148  const auto toIdentity = readyMessage->getIdentity();
149  m_readyWorkers.push_back(toIdentity);
150 
151  if (m_allWorkers.emplace(toIdentity).second) {
152  // Aha, we did never see this worker so far, so add it to our list.
153  if (m_sentStopMessages) {
154  // If it turned up late (everyone else has already stopped), send a stop message directly
155  auto sendMessage = ZMQMessageFactory::createMessage(toIdentity, EMessageTypes::c_lastEventMessage);
156  ZMQParent::send(m_socket, std::move(sendMessage));
157  }
158 
160  // If it turned up late (everyone else has already terminates), send a terminate message directly
161  auto sendMessage = ZMQMessageFactory::createMessage(toIdentity, EMessageTypes::c_terminateMessage);
162  ZMQParent::send(m_socket, std::move(sendMessage));
163  }
164  }
165 
166  log("ready_queue_size", static_cast<long>(m_readyWorkers.size()));
167  log("registered_workers", static_cast<long>(m_allWorkers.size()));
168  increment("ready_messages[" + toIdentity + "]");
169 }
170 
172 {
173  m_sentStopMessages = false;
174  m_sentTerminateMessages = false;
175 
176  log("all_stop_messages", static_cast<long>(m_sentStopMessages));
177  log("all_terminate_messages", static_cast<long>(m_sentTerminateMessages));
178 }
179 
181 {
182  // if we are lax, we are always ready. If not, we need to have at least a single ready worker. This prevents the B2ASSERT to fail.
183  return m_lax or not m_readyWorkers.empty();
184 }
Belle2::ZMQLoadBalancedOutput::m_sentStopMessages
bool m_sentStopMessages
Did we already send a stop message?
Definition: ZMQLoadBalancedConnection.h:113
Belle2::ZMQLoadBalancedOutput::handleEvent
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message)
Send the given message (without identity) to the next input in the ready list.
Definition: ZMQLoadBalancedConnection.cc:83
Belle2::ZMQLogger::timeit
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
Definition: ZMQLogger.h:127
Belle2::ZMQParent::send
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:163
Belle2::ZMQLogger::logTime
void logTime(const std::string &key)
Store the current time as a string under the given key.
Definition: ZMQLogger.cc:44
Belle2::ZMQMessageHelper::createZMQMessage
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
Definition: ZMQMessageHelper.h:39
Belle2::ZMQConnectionOverSocket::m_parent
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
Definition: ZMQConnection.h:82
Belle2::ZMQLoadBalancedOutput::isReady
bool isReady() const final
If lax mode is disabled, the output is ready if at least a single input is ready. Else always.
Definition: ZMQLoadBalancedConnection.cc:180
Belle2::ZMQLogger::decrement
void decrement(const std::string &key)
Decrement the value with the given key (only numerical values). If not present, set to -1.
Definition: ZMQLogger.cc:39
Belle2::ZMQLoadBalancedInput::handleIncomingData
std::unique_ptr< ZMQNoIdMessage > handleIncomingData()
Answer event messages with a ready message and pass on every received message.
Definition: ZMQLoadBalancedConnection.cc:37
Belle2::ZMQLogger::log
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:106
Belle2::ZMQLogger::increment
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:34
Belle2::ZMQLoadBalancedOutput::m_lax
bool m_lax
Parameter to enable lax mode.
Definition: ZMQLoadBalancedConnection.h:117
Belle2::ZMQLoadBalancedInput::ZMQLoadBalancedInput
ZMQLoadBalancedInput(const std::string &inputAddress, unsigned int bufferSize, const std::shared_ptr< ZMQParent > &parent)
Create a new load-balanced input connecting to the given address. Send bufferSize ready messages.
Definition: ZMQLoadBalancedConnection.cc:17
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQLoadBalancedOutput::handleIncomingData
void handleIncomingData()
Block until a ready message from an input is received and add it to the ready queue.
Definition: ZMQLoadBalancedConnection.cc:142
Belle2::ZMQLoadBalancedOutput::m_sentTerminateMessages
bool m_sentTerminateMessages
Did we already send a terminate message?
Definition: ZMQLoadBalancedConnection.h:115
Belle2::ZMQMessageFactory::createMessage
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Definition: ZMQMessageFactory.h:37
Belle2::ZMQLogger::average
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:112
Belle2::ZMQLoadBalancedOutput::m_readyWorkers
std::deque< std::string > m_readyWorkers
List of identities of ready inputs in FIFO order.
Definition: ZMQLoadBalancedConnection.h:109
Belle2::ZMQLoadBalancedOutput::clear
void clear()
Reset the flags recording whether stop and terminate messages were sent. Should be called on run start.
Definition: ZMQLoadBalancedConnection.cc:171
Belle2::ZMQConnectionOverSocket
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:72
Belle2::ZMQLoadBalancedOutput::ZMQLoadBalancedOutput
ZMQLoadBalancedOutput(const std::string &outputAddress, bool lax, const std::shared_ptr< ZMQParent > &parent)
Create a new load-balanced output and bind to the given address.
Definition: ZMQLoadBalancedConnection.cc:58
Belle2::ZMQConnectionOverSocket::m_socket
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
Definition: ZMQConnection.h:84
Belle2::ZMQLoadBalancedOutput::m_allWorkers
std::set< std::string > m_allWorkers
All ever registered inputs.
Definition: ZMQLoadBalancedConnection.h:111