Belle II Software  release-08-01-10
ZMQLoadBalancedConnection.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <framework/pcore/zmq/connections/ZMQLoadBalancedConnection.h>
9 
10 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11 #include <framework/logging/Logger.h>
12 
13 using namespace Belle2;
14 
15 ZMQLoadBalancedInput::ZMQLoadBalancedInput(const std::string& inputAddress, unsigned int bufferSize,
16  const std::shared_ptr<ZMQParent>& parent) : ZMQConnectionOverSocket(parent)
17 {
18  // We clear all our internal state and counters
19  log("sent_ready", 0l);
20  log("data_size", 0.0);
21  log("received_events", 0l);
22  log("event_rate", 0.0);
23 
24  // Create a non-binding DEALER socket
25  m_socket = m_parent->createSocket<ZMQ_DEALER>(inputAddress, false);
26 
27  // Send as many ready message as our buffer size is. This means we will get that many events, which will then be "in flight"
28  for (unsigned int i = 0; i < bufferSize; i++) {
29  auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
30  ZMQParent::send(m_socket, std::move(readyMessage));
31  increment("sent_ready");
32  }
33 }
34 
35 std::unique_ptr<ZMQNoIdMessage> ZMQLoadBalancedInput::handleIncomingData()
36 {
37  auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(m_socket);
38 
39  if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
40  // if it is an event message, return a ready message back. If not, no need for that.
41  auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
42  ZMQParent::send(m_socket, std::move(readyMessage));
43  increment("sent_ready");
44 
45  // and also do some logging
46  const auto dataSize = message->getDataMessage().size();
47 
48  average("data_size", dataSize);
49  increment("received_events");
50  timeit("event_rate");
51  }
52 
53  return message;
54 }
55 
56 ZMQLoadBalancedOutput::ZMQLoadBalancedOutput(const std::string& outputAddress, bool lax,
57  const std::shared_ptr<ZMQParent>& parent) : ZMQConnectionOverSocket(
58  parent), m_lax(lax)
59 {
60  // We clear all our internal state and counters
61  log("ready_queue_size", static_cast<long>(m_readyWorkers.size()));
62  log("registered_workers", static_cast<long>(m_allWorkers.size()));
63 
64  log("data_size", 0.0);
65  log("dismissed_events", 0l);
66  log("event_rate", 0.0);
67  log("sent_events", 0l);
68 
69  log("all_stop_messages", 0l);
70  log("sent_stop_messages", 0l);
71  log("last_stop_sent", "");
72 
73  log("all_terminate_messages", 0l);
74  log("sent_terminate_messages", 0l);
75  log("last_terminate_sent", "");
76 
77  // Create a binding ROUTER socket
78  m_socket = m_parent->createSocket<ZMQ_ROUTER>(outputAddress, true);
79 }
80 
81 void ZMQLoadBalancedOutput::handleEvent(std::unique_ptr<ZMQNoIdMessage> message)
82 {
83  if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
84  // Tell all workers to stop this run
85  for (auto worker : m_allWorkers) {
86  auto sendMessage = ZMQMessageFactory::createMessage(worker, EMessageTypes::c_lastEventMessage);
87  ZMQParent::send(m_socket, std::move(sendMessage));
88  }
89  m_sentStopMessages = true;
90  log("all_stop_messages", static_cast<long>(m_sentStopMessages));
91  increment("sent_stop_messages");
92  logTime("last_stop_sent");
93  return;
94  } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
95  // Tell all workers to terminate
96  for (auto worker : m_allWorkers) {
97  auto sendMessage = ZMQMessageFactory::createMessage(worker, EMessageTypes::c_terminateMessage);
98  ZMQParent::send(m_socket, std::move(sendMessage));
99  }
101  log("all_terminate_messages", static_cast<long>(m_sentTerminateMessages));
102  increment("sent_terminate_messages");
103  logTime("last_terminate_sent");
104  return;
105  }
106 
107  if (m_lax and m_readyWorkers.empty()) {
108  // There is no one that can handle the event in the moment, dismiss it (if lax is true)
109  increment("dismissed_events");
110  return;
111  }
112  if (m_sentStopMessages) {
113  B2ERROR("Received events after stop! I will dismiss this event.");
114  increment("dismissed_events");
115  return;
116  }
117 
118  const auto dataSize = message->getDataMessage().size();
119 
120  B2ASSERT("Must be > 0", not m_readyWorkers.empty());
121  auto nextWorker = m_readyWorkers.front();
122  m_readyWorkers.pop_front();
123 
124  average("data_size", dataSize);
125  average("data_size_to[" + nextWorker + "]", dataSize);
126 
127  increment("sent_events");
128  increment("sent_events[" + nextWorker + "]");
129 
130  timeit("event_rate");
131  timeit<200>("event_rate_to[" + nextWorker + "]");
132 
133  m_socket->send(ZMQMessageHelper::createZMQMessage(nextWorker), zmq::send_flags::sndmore);
134  ZMQParent::send(m_socket, std::move(message));
135 
136  log("ready_queue_size", static_cast<long>(m_readyWorkers.size()));
137  decrement("ready_messages[" + nextWorker + "]");
138 }
139 
141 {
142  auto readyMessage = ZMQMessageFactory::fromSocket<ZMQIdMessage>(m_socket);
143  B2ASSERT("Should be a ready message", readyMessage->isMessage(EMessageTypes::c_readyMessage));
144 
145  // Register it as another ready worker
146  const auto toIdentity = readyMessage->getIdentity();
147  m_readyWorkers.push_back(toIdentity);
148 
149  if (m_allWorkers.emplace(toIdentity).second) {
150  // Aha, we did never see this worker so far, so add it to our list.
151  if (m_sentStopMessages) {
152  // If it turned up late (everyone else has already stopped), send a stop message directly
153  auto sendMessage = ZMQMessageFactory::createMessage(toIdentity, EMessageTypes::c_lastEventMessage);
154  ZMQParent::send(m_socket, std::move(sendMessage));
155  }
156 
158  // If it turned up late (everyone else has already terminates), send a terminate message directly
159  auto sendMessage = ZMQMessageFactory::createMessage(toIdentity, EMessageTypes::c_terminateMessage);
160  ZMQParent::send(m_socket, std::move(sendMessage));
161  }
162  }
163 
164  log("ready_queue_size", static_cast<long>(m_readyWorkers.size()));
165  log("registered_workers", static_cast<long>(m_allWorkers.size()));
166  increment("ready_messages[" + toIdentity + "]");
167 }
168 
170 {
171  m_sentStopMessages = false;
172  m_sentTerminateMessages = false;
173 
174  log("all_stop_messages", static_cast<long>(m_sentStopMessages));
175  log("all_terminate_messages", static_cast<long>(m_sentTerminateMessages));
176 }
177 
179 {
180  // if we are lax, we are always ready. If not, we need to have at least a single ready worker. This prevents the B2ASSERT to fail.
181  return m_lax or not m_readyWorkers.empty();
182 }
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:62
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
Definition: ZMQConnection.h:74
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
Definition: ZMQConnection.h:76
ZMQLoadBalancedInput(const std::string &inputAddress, unsigned int bufferSize, const std::shared_ptr< ZMQParent > &parent)
Create a new load-balanced input connecting to the given address. Send bufferSize ready messages.
std::unique_ptr< ZMQNoIdMessage > handleIncomingData()
Answer event messages with a ready message and pass on every received message.
std::set< std::string > m_allWorkers
All ever registered inputs.
bool m_sentTerminateMessages
Did we already send a terminate message?
ZMQLoadBalancedOutput(const std::string &outputAddress, bool lax, const std::shared_ptr< ZMQParent > &parent)
Create a new load-balanced output and bind to the given address.
void handleIncomingData()
Block until a ready message from an input is received and add it to the ready queue.
bool m_lax
Parameter to enable lax mode.
std::deque< std::string > m_readyWorkers
List of identities of ready inputs in FIFO order (new entries are appended at the back, the next receiver is taken from the front).
bool isReady() const final
If lax mode is disabled, the output is ready if at least a single input is ready. Else always.
void clear()
Clear the counter for sent stop and terminate messages. Should be called on run start.
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message)
Send the given message (without identity) to the next input in the ready list.
bool m_sentStopMessages
Did we already send a stop message?
void decrement(const std::string &key)
Decrement the value with the given key (only numerical values). If not present, set to -1.
Definition: ZMQLogger.cc:37
void logTime(const std::string &key)
Store the current time as a string under the given key.
Definition: ZMQLogger.cc:42
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:32
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
Definition: ZMQLogger.h:117
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:153
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:96
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:102
Abstract base class for different kinds of events.