Belle II Software development
ZMQConfirmedConnection.cc
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/
#include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>

#include <algorithm>
#include <chrono>
#include <set>

using namespace Belle2;

16ZMQConfirmedInput::ZMQConfirmedInput(const std::string& inputAddress, const std::shared_ptr<ZMQParent>& parent)
18{
19 // These are all the log output we will have, set to 0 in the beginning.
20 log("last_received_message", "");
21 log("total_number_messages", 0l);
22 log("registered_workers", 0l);
23 log("hello_messages", 0l);
24 log("dead_workers", 0l);
25 log("all_stop_messages", 0l);
26 log("sent_stop_messages", 0l);
27 log("last_stop_sent", "");
28 log("received_stop_messages", 0l);
29 log("all_terminate_messages", 0l);
30 log("sent_terminate_messages", 0l);
31 log("last_terminate_sent", "");
32 log("received_terminate_messages", 0l);
33 log("received_messages_after_stop", 0l);
34 log("last_received_event_message", "");
35 log("last_clear", "");
36 log("stop_overwrites", 0l);
37 log("last_stop_overwrite", "");
38
39 log("data_size", 0.0);
40 log("received_events", 0l);
41 log("event_rate", 0.0);
42
43 // Create a binding socket of router type
44 m_socket = m_parent->createSocket<ZMQ_ROUTER>(inputAddress, true);
45}
46
47std::unique_ptr<ZMQIdMessage> ZMQConfirmedInput::handleIncomingData()
48{
49 auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(m_socket);
50 const auto fromIdentity = message->getIdentity();
51
52 logTime("last_received_message");
53 increment("total_number_messages");
54 increment("total_number_messages_from[" + fromIdentity + "]");
55
56 auto confirmMessage = ZMQMessageFactory::createMessage(fromIdentity, EMessageTypes::c_confirmMessage);
57 ZMQParent::send(m_socket, std::move(confirmMessage));
58
59 if (message->isMessage(EMessageTypes::c_helloMessage)) {
60 // a hello message makes us register the worker identity - which is the identity of the sender of the message
61 m_registeredWorkersInput.emplace(fromIdentity);
62 log("registered_workers", static_cast<long>(m_registeredWorkersInput.size()));
63 increment("hello_messages");
64 increment("hello_messages_from[" + fromIdentity + "]");
65 return {};
66 } else if (message->isMessage(EMessageTypes::c_deleteWorkerMessage)) {
67 // a delete message makes us forget about the worker identity. The identity is taken from the message data
68 // making it possible to delete other workers.
69 B2DEBUG(30, "Got message from " << message->getIdentity() << " to kill " << message->getMessagePartAsString<2>());
70 const std::string& killedIdentity = message->getMessagePartAsString<2>();
71 m_registeredWorkersInput.erase(killedIdentity);
72
73 log("registered_workers", static_cast<long>(m_registeredWorkersInput.size()));
74 increment("dead_workers");
75 increment("dead_worker_messaged_from[" + fromIdentity + "]");
76
77 if (m_registeredWorkersInput.empty()) {
78 B2ERROR("There is not a single worker registered anymore!");
79 return {};
80 }
81
82 // Corner case: could be that this was the one we were waiting for
84 m_allStopMessages = true;
85 log("all_stop_messages", static_cast<long>(m_allStopMessages));
86 increment("sent_stop_messages");
87 logTime("last_stop_sent");
88
89 return ZMQMessageFactory::createMessage(killedIdentity, EMessageTypes::c_lastEventMessage);
90 }
91 // Corner case: could be that this was the one we were waiting for
94 log("all_terminate_messages", static_cast<long>(m_allTerminateMessages));
95 increment("sent_terminate_messages");
96 logTime("last_terminate_sent");
97
98 return ZMQMessageFactory::createMessage(killedIdentity, EMessageTypes::c_terminateMessage);
99 }
100
101 return {};
102 }
103
104 B2ASSERT("Worker without proper registration!",
105 m_registeredWorkersInput.find(fromIdentity) != m_registeredWorkersInput.end());
106
107 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
108 // Increment the stop messages
109 m_receivedStopMessages.emplace(fromIdentity);
110 log("received_stop_messages", static_cast<long>(m_receivedStopMessages.size()));
111 increment("total_received_stop_messages");
112
114 // But only return this if everyone has sent a stop message already
115 m_allStopMessages = true;
116 log("all_stop_messages", static_cast<long>(m_allStopMessages));
117 increment("sent_stop_messages");
118 logTime("last_stop_sent");
119
120 return ZMQMessageFactory::createMessage(fromIdentity, EMessageTypes::c_lastEventMessage);
121 }
122
123 // Whatever we return here will be carried on to the application and eventually also to the output.
124 // This means as we are not passing the stop message now, we return nothing.
125 return {};
126 } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
127 // Increment the terminate messages
128 m_receivedTerminateMessages.emplace(fromIdentity);
129 log("received_terminate_messages", static_cast<long>(m_receivedTerminateMessages.size()));
130 increment("total_received_terminate_messages");
131
133 // But only return this if everyone has sent a terminate message already
135 log("all_terminate_messages", static_cast<long>(m_allTerminateMessages));
136 increment("sent_terminate_messages");
137 logTime("last_terminate_sent");
138
139 return ZMQMessageFactory::createMessage(fromIdentity, EMessageTypes::c_terminateMessage);
140 }
141
142 // Whatever we return here will be carried on to the application and eventually also to the output.
143 // This means as we are not passing the stop message now, we return nothing.
144 return {};
145 }
146
147 if (m_allStopMessages) {
150 m_whenEventAfterAllStopMessages = std::chrono::system_clock::now();
151 B2ERROR("Received an event after having received stop messages from every worker. This is not a good sign! I will dismiss this event and next events!");
152 }
153 increment("received_messages_after_stop");
154 auto t1 = std::chrono::system_clock::now();
155 const auto intervalAfterAllStopMessages = std::chrono::duration_cast<std::chrono::seconds> (t1 - m_whenEventAfterAllStopMessages);
156 if (intervalAfterAllStopMessages > std::chrono::seconds{150}) {
157 B2FATAL("Too many events after having received stop messages! This is abnormal. I will kill the process!");
158 }
159 return {};
160 }
161
162 // Now it can only be a plain normal data message, so just pass it on
163 const auto dataSize = message->getDataMessage().size();
164
165 average("data_size", dataSize);
166 average("data_size_from[" + fromIdentity + "]", dataSize);
167
168 increment("received_events");
169 increment("received_events[" + fromIdentity + "]");
170
171 timeit("event_rate");
172 timeit<200>("event_rate_from[" + fromIdentity + "]");
173
174 logTime("last_received_event_message");
175
176 return message;
177}
178
180{
181 // We clear all our internal state and counters
183 m_allStopMessages = false;
187
188 log("received_stop_messages", static_cast<long>(m_receivedStopMessages.size()));
189 log("all_stop_messages", static_cast<long>(m_allStopMessages));
190 log("received_terminate_messages", static_cast<long>(m_receivedTerminateMessages.size()));
191 log("all_terminate_messages", static_cast<long>(m_allTerminateMessages));
192
193 logTime("last_clear");
194}
195
196std::unique_ptr<ZMQIdMessage> ZMQConfirmedInput::overwriteStopMessage()
197{
198 if (not m_allStopMessages) {
199 // We did not already receive all stop messages, but someone externally asked us to stop anyways. So lets do it.
200 B2ERROR("Sending out a stop message although not all of the workers are finished already!");
201 increment("stop_overwrites");
202 logTime("last_stop_overwrite");
203
204 m_allStopMessages = true;
205 log("all_stop_messages", static_cast<long>(m_allStopMessages));
206 increment("sent_stop_messages");
207 logTime("last_stop_sent");
208
209 return ZMQMessageFactory::createMessage("", EMessageTypes::c_lastEventMessage);
210 }
211
212 // We have already stopped, no need to sent it twice.
213 return {};
214}
215
216ZMQConfirmedOutput::ZMQConfirmedOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent)
218{
219 // These are all the log output we will have, set to 0 in the beginning.
220 log("no_confirmation_message", 0l);
221 log("last_sent_event_message", 0l);
222 log("data_size", 0.0);
223 log("sent_events", 0l);
224 log("event_rate", 0.0);
225 log("timespan_waiting_for_confirmation", 0l);
226
227 // Register a non-binding DEALER socket
228 m_socket = m_parent->createSocket<ZMQ_DEALER>(outputAddress, false);
229
230 logTime("last_hello_sent");
231
232 // Say hello to the receiver end of the connection (which is a confirmed input)
233 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage);
234 handleEvent(std::move(message));
235}
236
237void ZMQConfirmedOutput::handleEvent(std::unique_ptr<ZMQNoIdMessage> message, bool requireConfirmation, int maximalWaitTime)
238{
239 auto current = std::chrono::system_clock::now();
240 while (m_waitingForConfirmation > 0) {
241 // Wait for all conformation messages that are still pending.
242 if (ZMQParent::poll({m_socket.get()}, maximalWaitTime)) {
244 } else if (requireConfirmation) {
245 B2FATAL("Did not receive a confirmation message in time!");
246 } else {
247 B2WARNING("Did not receive a confirmation message in time!");
248 increment("no_confirmation_message");
249 // If we did not receive one, we will also not receive the next one so lets break out.
250 break;
251 }
252 }
253 auto afterWaiting = std::chrono::system_clock::now();
254 m_timespanWaitingForConfirmation += std::chrono::duration_cast<std::chrono::milliseconds>(afterWaiting - current).count();
255 log("timespan_waiting_for_confirmation", m_timespanWaitingForConfirmation);
256
257 // We have received a confirmation for the old, so we can also sent a new message.
258
259 // TODO: makes no sense for signal messages!
260 const auto dataSize = message->getDataMessage().size();
261
262 average("data_size", dataSize);
263 increment("sent_events");
264 timeit("event_rate");
265
266 logTime("last_sent_event_message");
267
268 ZMQParent::send(m_socket, std::move(message));
270}
271
273{
274 // This should only ever be a confirmation message
275 B2ASSERT("There should be no data coming here, if we have already a confirmation!", m_waitingForConfirmation > 0);
276 auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(m_socket);
277 B2ASSERT("Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
279}
std::set< std::string > m_receivedStopMessages
The set of input identities which have already sent a stop message.
bool m_eventAfterAllStopMessages
Flag recording whether a data message has arrived after all stop messages were received.
std::unique_ptr< ZMQIdMessage > handleIncomingData()
Block until a message can be received from one of the inputs.
std::set< std::string > m_receivedTerminateMessages
The set of input identities which have already sent a terminate message.
std::set< std::string > m_registeredWorkersInput
The set of all registered inputs.
std::chrono::time_point< std::chrono::system_clock > m_whenEventAfterAllStopMessages
Time at which the first data message after all stop messages was received.
bool m_allTerminateMessages
Have we received all terminate messages?
ZMQConfirmedInput(const std::string &inputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new confirmed input by binding to the address.
void clear()
Reset the counters for all received stop and terminate messages. Should be called on run start.
bool m_allStopMessages
Have we received all stop messages?
std::unique_ptr< ZMQIdMessage > overwriteStopMessage()
Manually overwrite the stop message counter and set it to have all stop messages received.
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without a ID as there is only a single output).
long m_timespanWaitingForConfirmation
Internal monitoring how long we were waiting for confirmation messages.
void handleIncomingData()
Blocks until it can receive the (hopefully confirmation) message from the output.
unsigned int m_waitingForConfirmation
On how many confirmation messages are we still waiting?
ZMQConfirmedOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new confirmed output by connecting to the address.
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:63
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
Definition: ZMQConnection.h:75
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
Definition: ZMQConnection.h:77
void logTime(const std::string &key)
Store the current time as a string under the given key.
Definition: ZMQLogger.cc:42
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:32
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
Definition: ZMQParent.cc:56
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
Definition: ZMQLogger.h:117
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:153
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, e.g. long, double or string.
Definition: ZMQLogger.h:96
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_S...
Definition: ZMQLogger.h:102
Abstract base class for different kinds of events.