Belle II Software  release-08-01-10
ZMQConfirmedConnection.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 
11 #include <thread>
12 #include <chrono>
13 #include <set>
14 
15 using namespace Belle2;
16 
17 ZMQConfirmedInput::ZMQConfirmedInput(const std::string& inputAddress, const std::shared_ptr<ZMQParent>& parent)
18  : ZMQConnectionOverSocket(parent)
19 {
20  // These are all the log output we will have, set to 0 in the beginning.
21  log("last_received_message", "");
22  log("total_number_messages", 0l);
23  log("registered_workers", 0l);
24  log("hello_messages", 0l);
25  log("dead_workers", 0l);
26  log("all_stop_messages", 0l);
27  log("sent_stop_messages", 0l);
28  log("last_stop_sent", "");
29  log("received_stop_messages", 0l);
30  log("all_terminate_messages", 0l);
31  log("sent_terminate_messages", 0l);
32  log("last_terminate_sent", "");
33  log("received_terminate_messages", 0l);
34  log("received_messages_after_stop", 0l);
35  log("last_received_event_message", "");
36  log("last_clear", "");
37  log("stop_overwrites", 0l);
38  log("last_stop_overwrite", "");
39 
40  log("data_size", 0.0);
41  log("received_events", 0l);
42  log("event_rate", 0.0);
43 
44  // Create a binding socket of router type
45  m_socket = m_parent->createSocket<ZMQ_ROUTER>(inputAddress, true);
46 }
47 
// Receive one message from the ROUTER socket, confirm it to its sender and act on its type.
//
// Every received message is answered with a c_confirmMessage addressed to the sender
// before the message type is inspected. After that:
//  - c_helloMessage: the sender identity is added to m_registeredWorkersInput, nothing is returned.
//  - c_deleteWorkerMessage: the identity found in message part 2 is removed from m_registeredWorkersInput.
//  - c_lastEventMessage / c_terminateMessage: the sender identity is remembered in
//    m_receivedStopMessages / m_receivedTerminateMessages.
//  - everything else is handled as event data and returned to the caller, unless
//    m_allStopMessages is already set, in which case the event is dismissed.
//
// Returns the received data message, a newly created last-event/terminate message,
// or an empty pointer when there is nothing to pass on.
//
// NOTE(review): this listing skips its embedded line numbers 84, 93, 94, 114 and 133 and the
// braces below do not balance as shown. Presumably the missing lines hold the conditions
// guarding the "all stop/terminate messages" blocks - confirm against the repository
// before changing any logic here.
48 std::unique_ptr<ZMQIdMessage> ZMQConfirmedInput::handleIncomingData()
49 {
50  auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(m_socket);
51  const auto fromIdentity = message->getIdentity();
52 
    // Per-message monitoring: one overall counter and one counter per sender identity.
53  logTime("last_received_message");
54  increment("total_number_messages");
55  increment("total_number_messages_from[" + fromIdentity + "]");
56 
    // The confirmation goes back to the sender for every message type, not only for event data.
57  auto confirmMessage = ZMQMessageFactory::createMessage(fromIdentity, EMessageTypes::c_confirmMessage);
58  ZMQParent::send(m_socket, std::move(confirmMessage));
59 
60  if (message->isMessage(EMessageTypes::c_helloMessage)) {
61  // a hello message makes us register the worker identity - which is the identity of the sender of the message
62  m_registeredWorkersInput.emplace(fromIdentity);
63  log("registered_workers", static_cast<long>(m_registeredWorkersInput.size()));
64  increment("hello_messages");
65  increment("hello_messages_from[" + fromIdentity + "]");
66  return {};
67  } else if (message->isMessage(EMessageTypes::c_deleteWorkerMessage)) {
68  // a delete message makes us forget about the worker identity. The identity is taken from the message data
69  // making it possible to delete other workers.
70  B2DEBUG(30, "Got message from " << message->getIdentity() << " to kill " << message->getMessagePartAsString<2>());
71  const std::string& killedIdentity = message->getMessagePartAsString<2>();
72  m_registeredWorkersInput.erase(killedIdentity);
73 
74  log("registered_workers", static_cast<long>(m_registeredWorkersInput.size()));
75  increment("dead_workers");
76  increment("dead_worker_messaged_from[" + fromIdentity + "]");
77 
    // Without any registered worker left there is nothing more to decide, so only report the error.
78  if (m_registeredWorkersInput.empty()) {
79  B2ERROR("There is not a single worker registered anymore!");
80  return {};
81  }
82 
83  // Corner case: could be that this was the one we were waiting for
    // NOTE(review): embedded line 84 is missing here; presumably it opens the condition for this block.
85  m_allStopMessages = true;
86  log("all_stop_messages", static_cast<long>(m_allStopMessages));
87  increment("sent_stop_messages");
88  logTime("last_stop_sent");
89 
    // The stop message passed on carries the identity of the removed worker.
90  return ZMQMessageFactory::createMessage(killedIdentity, EMessageTypes::c_lastEventMessage);
91  }
92  // Corner case: could be that this was the one we were waiting for
    // NOTE(review): embedded lines 93-94 are missing here; presumably the same check for terminate messages.
95  log("all_terminate_messages", static_cast<long>(m_allTerminateMessages));
96  increment("sent_terminate_messages");
97  logTime("last_terminate_sent");
98 
99  return ZMQMessageFactory::createMessage(killedIdentity, EMessageTypes::c_terminateMessage);
100  }
101 
102  return {};
103  }
104 
    // From here on the sender must be known: hello and delete messages were handled above.
105  B2ASSERT("Worker without proper registration!",
106  m_registeredWorkersInput.find(fromIdentity) != m_registeredWorkersInput.end());
107 
108  if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
109  // Increment the stop messages
110  m_receivedStopMessages.emplace(fromIdentity);
111  log("received_stop_messages", static_cast<long>(m_receivedStopMessages.size()));
112  increment("total_received_stop_messages");
113 
    // NOTE(review): embedded line 114 is missing here; presumably the condition the next comment refers to.
115  // But only return this if everyone has sent a stop message already
116  m_allStopMessages = true;
117  log("all_stop_messages", static_cast<long>(m_allStopMessages));
118  increment("sent_stop_messages");
119  logTime("last_stop_sent");
120 
121  return ZMQMessageFactory::createMessage(fromIdentity, EMessageTypes::c_lastEventMessage);
122  }
123 
124  // Whatever we return here will be carried on to the application and eventually also to the output.
125  // This means as we are not passing the stop message now, we return nothing.
126  return {};
127  } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
128  // Increment the terminate messages
129  m_receivedTerminateMessages.emplace(fromIdentity);
130  log("received_terminate_messages", static_cast<long>(m_receivedTerminateMessages.size()));
131  increment("total_received_terminate_messages");
132 
    // NOTE(review): embedded line 133 is missing here; presumably the condition the next comment refers to.
134  // But only return this if everyone has sent a terminate message already
135  m_allTerminateMessages = true;
136  log("all_terminate_messages", static_cast<long>(m_allTerminateMessages));
137  increment("sent_terminate_messages");
138  logTime("last_terminate_sent");
139 
140  return ZMQMessageFactory::createMessage(fromIdentity, EMessageTypes::c_terminateMessage);
141  }
142 
143  // Whatever we return here will be carried on to the application and eventually also to the output.
144  // This means as we are not passing the terminate message now, we return nothing.
145  return {};
146  }
147 
    // Event data arriving after the stop state has been reached is dropped and only counted.
148  if (m_allStopMessages) {
149  B2ERROR("Received an event after having received stop messages from every worker. This is not a good sign! I will dismiss this event!");
150  increment("received_messages_after_stop");
151  return {};
152  }
153 
154  // Now it can only be a plain normal data message, so just pass it on
155  const auto dataSize = message->getDataMessage().size();
156 
    // Size, count and rate are tracked both overall and per sender identity.
157  average("data_size", dataSize);
158  average("data_size_from[" + fromIdentity + "]", dataSize);
159 
160  increment("received_events");
161  increment("received_events[" + fromIdentity + "]");
162 
163  timeit("event_rate");
164  timeit<200>("event_rate_from[" + fromIdentity + "]");
165 
166  logTime("last_received_event_message");
167 
168  return message;
169 }
170 
172 {
    // Resets the stop/terminate bookkeeping and publishes the reset values to the monitoring.
    // NOTE(review): the signature line (embedded number 171) is missing from this listing; the
    // member summary of this page lists `void clear()` for the confirmed input, so this is
    // presumably the body of ZMQConfirmedInput::clear() - confirm against the repository.
173  // We clear all our internal state and counters
174  m_receivedStopMessages.clear();
175  m_allStopMessages = false;
    // NOTE(review): embedded line 176 is missing. m_receivedTerminateMessages is logged below but
    // not visibly cleared in this listing - presumably that happens on the missing line; confirm.
177  m_allTerminateMessages = false;
178 
    // Write the reset state to the monitoring right away.
179  log("received_stop_messages", static_cast<long>(m_receivedStopMessages.size()));
180  log("all_stop_messages", static_cast<long>(m_allStopMessages));
181  log("received_terminate_messages", static_cast<long>(m_receivedTerminateMessages.size()));
182  log("all_terminate_messages", static_cast<long>(m_allTerminateMessages));
183 
184  logTime("last_clear");
185 }
186 
187 std::unique_ptr<ZMQIdMessage> ZMQConfirmedInput::overwriteStopMessage()
188 {
189  if (not m_allStopMessages) {
190  // We did not already receive all stop messages, but someone externally asked us to stop anyways. So lets do it.
191  B2ERROR("Sending out a stop message although not all of the workers are finished already!");
192  increment("stop_overwrites");
193  logTime("last_stop_overwrite");
194 
195  m_allStopMessages = true;
196  log("all_stop_messages", static_cast<long>(m_allStopMessages));
197  increment("sent_stop_messages");
198  logTime("last_stop_sent");
199 
200  return ZMQMessageFactory::createMessage("", EMessageTypes::c_lastEventMessage);
201  }
202 
203  // We have already stopped, no need to sent it twice.
204  return {};
205 }
206 
207 ZMQConfirmedOutput::ZMQConfirmedOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent)
208  : ZMQConnectionOverSocket(parent)
209 {
210  // These are all the log output we will have, set to 0 in the beginning.
211  log("no_confirmation_message", 0l);
212  log("last_sent_event_message", 0l);
213  log("data_size", 0.0);
214  log("sent_events", 0l);
215  log("event_rate", 0.0);
216  log("timespan_waiting_for_confirmation", 0l);
217 
218  // Register a non-binding DEALER socket
219  m_socket = m_parent->createSocket<ZMQ_DEALER>(outputAddress, false);
220 
221  logTime("last_hello_sent");
222 
223  // Say hello to the receiver end of the connection (which is a confirmed input)
224  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage);
225  handleEvent(std::move(message));
226 }
227 
// Send one message over the DEALER socket after waiting for outstanding confirmations.
//
// message:             the message to send; ownership is taken and it is moved into ZMQParent::send.
// requireConfirmation: if true, a confirmation that does not arrive within the poll timeout ends in
//                      B2FATAL; if false, it is only reported with a warning, counted in
//                      "no_confirmation_message", and the message is sent anyway.
// maximalWaitTime:     timeout handed to ZMQParent::poll for each wait (presumably milliseconds - confirm).
//
// The time spent waiting is added up in milliseconds in m_timespanWaitingForConfirmation.
228 void ZMQConfirmedOutput::handleEvent(std::unique_ptr<ZMQNoIdMessage> message, bool requireConfirmation, int maximalWaitTime)
229 {
    // NOTE(review): system_clock is a wall clock and may be adjusted while we wait, which would
    // distort the measured timespan; std::chrono::steady_clock is the usual choice for durations.
230  auto current = std::chrono::system_clock::now();
231  while (m_waitingForConfirmation > 0) {
232  // Wait for all confirmation messages that are still pending.
233  if (ZMQParent::poll({m_socket.get()}, maximalWaitTime)) {
    // NOTE(review): embedded line 234 is missing from this listing. Presumably the pending
    // confirmation is read from the socket here - as listed, this branch would never leave the loop.
235  } else if (requireConfirmation) {
236  B2FATAL("Did not receive a confirmation message in time!");
237  } else {
238  B2WARNING("Did not receive a confirmation message in time!");
239  increment("no_confirmation_message");
240  // If we did not receive one, we will also not receive the next one so lets break out.
241  break;
242  }
243  }
244  auto afterWaiting = std::chrono::system_clock::now();
245  m_timespanWaitingForConfirmation += std::chrono::duration_cast<std::chrono::milliseconds>(afterWaiting - current).count();
246  log("timespan_waiting_for_confirmation", m_timespanWaitingForConfirmation);
247 
248  // We have received a confirmation for the old one, so we can also send a new message.
249 
250  // TODO: makes no sense for signal messages!
251  const auto dataSize = message->getDataMessage().size();
252 
253  average("data_size", dataSize);
254  increment("sent_events");
255  timeit("event_rate");
256 
257  logTime("last_sent_event_message");
258 
259  ZMQParent::send(m_socket, std::move(message));
    // NOTE(review): embedded line 260 is missing from this listing. Presumably the counter of
    // outstanding confirmations is increased after sending - confirm against the repository.
261 }
262 
264 {
    // Reads one message from the socket and asserts that it is the expected confirmation.
    // NOTE(review): the signature line (embedded number 263) is missing from this listing; the
    // member summary of this page lists `void handleIncomingData()` for the confirmed output, so
    // this is presumably its body - confirm against the repository.
265  // This should only ever be a confirmation message
266  B2ASSERT("There should be no data coming here, if we have already a confirmation!", m_waitingForConfirmation > 0);
267  auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(m_socket);
268  B2ASSERT("Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
    // NOTE(review): embedded line 269 is missing from this listing. Presumably the counter of
    // outstanding confirmations is decreased here - confirm against the repository.
270 }
std::set< std::string > m_receivedStopMessages
The set of input identities which have already sent a stop message.
std::unique_ptr< ZMQIdMessage > handleIncomingData()
Block until a message can be received from one of the inputs.
std::set< std::string > m_receivedTerminateMessages
The set of input identities which have already sent a terminate message.
std::set< std::string > m_registeredWorkersInput
The set of all registered inputs.
bool m_allTerminateMessages
Have we received all terminate messages?
ZMQConfirmedInput(const std::string &inputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new confirmed input by binding to the address.
void clear()
Reset the counters for all received stop and terminate messages. Should be called on run start.
bool m_allStopMessages
Have we received all stop messages?
std::unique_ptr< ZMQIdMessage > overwriteStopMessage()
Manually overwrite the stop message counter and set it to have all stop messages received.
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without a ID as there is only a single output).
long m_timespanWaitingForConfirmation
Internal monitoring how long we were waiting for confirmation messages.
void handleIncomingData()
Blocks until it can receive the (hopefully confirmation) message from the output.
unsigned int m_waitingForConfirmation
On how many confirmation messages are we still waiting?
ZMQConfirmedOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new confirmed output by connecting to the address.
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:62
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
Definition: ZMQConnection.h:74
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
Definition: ZMQConnection.h:76
void logTime(const std::string &key)
Store the current time as a string under the given key.
Definition: ZMQLogger.cc:42
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:32
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
Definition: ZMQParent.cc:56
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
Definition: ZMQLogger.h:117
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:153
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:96
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:102
Abstract base class for different kinds of events.