Belle II Software development
ZMQConfirmedInput Class Reference

Input part of a confirmed connection.

#include <ZMQConfirmedConnection.h>

Inheritance diagram for ZMQConfirmedInput:
ZMQConfirmedInput → ZMQConnectionOverSocket → ZMQConnection → ZMQLogger (each class derives from the one to its right)

Public Types

using ReactorFunction = std::function< void(void)>
 Typedef of a function which will be called if a connection has a message.
 

Public Member Functions

 ZMQConfirmedInput (const std::string &inputAddress, const std::shared_ptr< ZMQParent > &parent)
 Create a new confirmed input by binding to the address.
 
std::unique_ptr< ZMQIdMessage > handleIncomingData ()
 Block until a message can be received from one of the inputs.
 
void clear ()
 Reset the counters for all received stop and terminate messages. Should be called on run start.
 
std::unique_ptr< ZMQIdMessage > overwriteStopMessage ()
 Manually overwrite the stop message counter and set it to have all stop messages received.
 
std::vector< zmq::socket_t * > getSockets () const final
 The socket used for polling is just the stored socket.
 
std::string getEndPoint () const
 Return the connection string for this socket.
 
virtual bool isReady () const
 Return true if this connection is able to send messages right now. Can be overloaded in derived classes.
 
virtual std::string getMonitoringJSON () const
 Convert the stored monitoring values to a JSON string ready for sending out via a message.
 
template<class AClass >
void log (const std::string &key, const AClass &value)
 Store a value under a certain key. Different types of values can be stored, namely long, double or string. Mixtures are not allowed for a given key.
 
void increment (const std::string &key)
 Increment the value with the given key (only numerical values). If not present, set to 1.
 
void decrement (const std::string &key)
 Decrement the value with the given key (only numerical values). If not present, set to -1.
 
template<size_t MAX_SIZE = 100>
void average (const std::string &key, double value)
 Instead of storing the double value directly under the given key, store the average of the last MAX_SIZE values.
 
template<size_t AVERAGE_SIZE = 2000>
void timeit (const std::string &key)
 Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time AVERAGE_SIZE was reached under <key>_last_measurement)
 
void logTime (const std::string &key)
 Store the current time as a string under the given key.
 

Static Public Member Functions

static bool poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
 Poll on the given connections and call the attached function if a message comes in.
 
static bool hasMessage (const ZMQConnection *connection)
 Check if the given connection has an incoming message (right now, no waiting).
 

Protected Attributes

std::shared_ptr< ZMQParent > m_parent
 The shared ZMQParent instance.
 
std::unique_ptr< zmq::socket_t > m_socket
 The memory of the socket. Needs to be initialized in a derived class.
 

Private Attributes

std::set< std::string > m_receivedStopMessages
 The set of input identities which have already sent a stop message.
 
bool m_allStopMessages = false
 Have we received all stop messages?
 
std::set< std::string > m_receivedTerminateMessages
 The set of input identities which have already sent a terminate message.
 
bool m_allTerminateMessages = false
 Have we received all terminate messages?
 
std::set< std::string > m_registeredWorkersInput
 The set of all registered inputs.
 
bool m_eventAfterAllStopMessages = false
 Flag indicating that events arrived after all stop messages had been received.
 
std::chrono::time_point< std::chrono::system_clock > m_whenEventAfterAllStopMessages
 Time at which m_eventAfterAllStopMessages was set.
 
std::map< std::string, std::variant< long, double, std::string > > m_monitoring
 Internal storage of all stored values.
 
std::unordered_map< std::string, std::tuple< std::vector< double >, size_t > > m_averages
 Internal storage of the previous values when calculating averages.
 
std::unordered_map< std::string, std::tuple< unsigned long, std::chrono::system_clock::time_point > > m_timeCounters
 Internal storage of how often the timeit function was called for a given key and when it last reached AVERAGE_SIZE.
 

Detailed Description

Input part of a confirmed connection.

In a confirmed connection, every message sent from an output to the input is acknowledged by the input with a confirmation message. The input can handle multiple outputs; each output can only be connected to a single input.

The input part keeps track of the registered workers and of whether it has received all stop and/or terminate messages. Stop and terminate messages are only passed on once they have been received from all registered workers.

When the input receives a message from one of the outputs, it sends back a confirmation message. What happens next depends on the message type:

  • Hello message (a worker registers itself): the sender's identity is added to the registered workers. Nothing is returned to the caller.
  • Delete message (someone unregisters a worker): the identity named in the message is removed from the registered workers. Nothing is returned to the caller, with one corner case: if the unregistered worker was the last one still missing for a complete set of stop or terminate messages, that stop or terminate message is passed on.
  • Stop message: the sender is added to the received stop messages. The stop message is passed on only once all registered workers have sent one.
  • Terminate message: handled in the same way as a stop message.
  • In all other cases the message is simply passed on.

Internally, a ZMQ_ROUTER is used in bind-mode. The order of starting output and input does not play any role.
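
The bookkeeping described above can be sketched without any ZMQ dependency. The following is a minimal illustration under stated assumptions, not the actual implementation: the StopTracker type and its method names are hypothetical, only stop messages are modelled (terminate messages would follow the same pattern), and "all registered workers have sent a stop" is assumed to mean that the registered set is a subset of the set of senders.

```cpp
#include <algorithm>
#include <set>
#include <string>

// Hypothetical stand-in for the stop-message bookkeeping; not part of basf2.
struct StopTracker {
  std::set<std::string> registered;    // workers that sent a hello message
  std::set<std::string> receivedStop;  // workers that sent a stop message
  bool allStop = false;                // true once the stop was passed on

  // A hello message registers the sender; nothing is passed on.
  void onHello(const std::string& id) { registered.insert(id); }

  // Returns true if the stop message should be passed on to the caller.
  bool onStop(const std::string& id) {
    receivedStop.insert(id);
    return checkAllStopped();
  }

  // A delete message unregisters a worker. Corner case: if that worker was
  // the last one still missing, the stop is passed on now.
  bool onDelete(const std::string& id) {
    registered.erase(id);
    if (registered.empty()) return false;
    return checkAllStopped();
  }

private:
  // Assumption: "all stopped" means every registered worker is in receivedStop.
  bool checkAllStopped() {
    const bool everyoneStopped = std::includes(receivedStop.begin(), receivedStop.end(),
                                               registered.begin(), registered.end());
    if (allStop || !everyoneStopped) return false;
    allStop = true;
    return true;
  }
};
```

With two registered workers, the first onStop call returns false and the second returns true; a later duplicate stop returns false because the stop has already been passed on.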

Definition at line 55 of file ZMQConfirmedConnection.h.

Member Typedef Documentation

◆ ReactorFunction

using ReactorFunction = std::function<void(void)>
inherited

Typedef of a function which will be called if a connection has a message.

Definition at line 34 of file ZMQConnection.h.

Constructor & Destructor Documentation

◆ ZMQConfirmedInput()

ZMQConfirmedInput ( const std::string &  inputAddress,
const std::shared_ptr< ZMQParent > &  parent 
)

Create a new confirmed input by binding to the address.

Definition at line 17 of file ZMQConfirmedConnection.cc.

19{
20 // These are all the log output we will have, set to 0 in the beginning.
21 log("last_received_message", "");
22 log("total_number_messages", 0l);
23 log("registered_workers", 0l);
24 log("hello_messages", 0l);
25 log("dead_workers", 0l);
26 log("all_stop_messages", 0l);
27 log("sent_stop_messages", 0l);
28 log("last_stop_sent", "");
29 log("received_stop_messages", 0l);
30 log("all_terminate_messages", 0l);
31 log("sent_terminate_messages", 0l);
32 log("last_terminate_sent", "");
33 log("received_terminate_messages", 0l);
34 log("received_messages_after_stop", 0l);
35 log("last_received_event_message", "");
36 log("last_clear", "");
37 log("stop_overwrites", 0l);
38 log("last_stop_overwrite", "");
39
40 log("data_size", 0.0);
41 log("received_events", 0l);
42 log("event_rate", 0.0);
43
44 // Create a binding socket of router type
45 m_socket = m_parent->createSocket<ZMQ_ROUTER>(inputAddress, true);
46}

Member Function Documentation

◆ clear()

void clear ( )

Reset the counters for all received stop and terminate messages. Should be called on run start.

Definition at line 180 of file ZMQConfirmedConnection.cc.

181{
182 // We clear all our internal state and counters
184 m_allStopMessages = false;
188
189 log("received_stop_messages", static_cast<long>(m_receivedStopMessages.size()));
190 log("all_stop_messages", static_cast<long>(m_allStopMessages));
191 log("received_terminate_messages", static_cast<long>(m_receivedTerminateMessages.size()));
192 log("all_terminate_messages", static_cast<long>(m_allTerminateMessages));
193
194 logTime("last_clear");
195}

◆ decrement()

void decrement ( const std::string &  key)
inherited

Decrement the value with the given key (only numerical values). If not present, set to -1.

Definition at line 37 of file ZMQLogger.cc.

38{
39 std::visit(Decrementor{}, m_monitoring[key]);
40}

◆ getEndPoint()

std::string getEndPoint ( ) const
inherited

Return the connection string for this socket.

Definition at line 87 of file ZMQConnection.cc.

88{
89 std::string endpoint{""};
90 if (m_socket) {
91 endpoint = m_socket->get(zmq::sockopt::last_endpoint);
92 }
93 return endpoint;
94}

◆ getMonitoringJSON()

std::string getMonitoringJSON ( ) const
virtual, inherited

Convert the stored monitoring values to a JSON string ready for sending out via a message.

Reimplemented in ZMQHistoServerToZMQOutput, ZMQHistoServerToRawOutput, ZMQROIOutput, and ZMQDataAndROIOutput.

Definition at line 15 of file ZMQLogger.cc.

16{
17 std::stringstream buffer;
18 buffer << "{";
19 bool first = true;
20 for (const auto& keyValue : m_monitoring) {
21 if (not first) {
22 buffer << ", ";
23 }
24 first = false;
25 buffer << "\"" << keyValue.first << "\": ";
26 buffer << std::visit(toJSON{}, keyValue.second);
27 }
28 buffer << "}" << std::endl;
29 return buffer.str();
30}
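
The toJSON visitor used in the listing is not shown on this page. As a rough illustration of how such a std::visit-based conversion could look, here is a self-contained sketch. The names ToJsonSketch and toJsonString are hypothetical, string escaping is deliberately omitted, and unlike the listing the sketch does not append a trailing newline.

```cpp
#include <map>
#include <sstream>
#include <string>
#include <variant>

using MonitoringValue = std::variant<long, double, std::string>;

// Hypothetical visitor; the real toJSON helper may behave differently.
struct ToJsonSketch {
  std::string operator()(long value) const { return std::to_string(value); }
  std::string operator()(double value) const {
    std::ostringstream out;
    out << value;
    return out.str();
  }
  // Strings are quoted; escaping of special characters is omitted here.
  std::string operator()(const std::string& value) const { return "\"" + value + "\""; }
};

// Serialise all key/value pairs into a flat JSON object.
std::string toJsonString(const std::map<std::string, MonitoringValue>& values) {
  std::ostringstream buffer;
  buffer << "{";
  bool first = true;
  for (const auto& [key, value] : values) {
    if (!first) buffer << ", ";
    first = false;
    buffer << "\"" << key << "\": " << std::visit(ToJsonSketch{}, value);
  }
  buffer << "}";
  return buffer.str();
}
```

Because std::map iterates in key order, the keys appear alphabetically sorted in the resulting string.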

◆ getSockets()

std::vector< zmq::socket_t * > getSockets ( ) const
final, virtual, inherited

The socket used for polling is just the stored socket.

Implements ZMQConnection.

Definition at line 82 of file ZMQConnection.cc.

83{
84 return {m_socket.get()};
85}

◆ handleIncomingData()

std::unique_ptr< ZMQIdMessage > handleIncomingData ( )

Block until a message can be received from one of the inputs.

React as described in the general description of this class. A message is returned only if it is to be passed on; in all other cases a nullptr is returned.

Definition at line 48 of file ZMQConfirmedConnection.cc.

49{
50 auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(m_socket);
51 const auto fromIdentity = message->getIdentity();
52
53 logTime("last_received_message");
54 increment("total_number_messages");
55 increment("total_number_messages_from[" + fromIdentity + "]");
56
57 auto confirmMessage = ZMQMessageFactory::createMessage(fromIdentity, EMessageTypes::c_confirmMessage);
58 ZMQParent::send(m_socket, std::move(confirmMessage));
59
60 if (message->isMessage(EMessageTypes::c_helloMessage)) {
61 // a hello message makes us register the worker identity - which is the identity of the sender of the message
62 m_registeredWorkersInput.emplace(fromIdentity);
63 log("registered_workers", static_cast<long>(m_registeredWorkersInput.size()));
64 increment("hello_messages");
65 increment("hello_messages_from[" + fromIdentity + "]");
66 return {};
67 } else if (message->isMessage(EMessageTypes::c_deleteWorkerMessage)) {
68 // a delete message makes us forget about the worker identity. The identity is taken from the message data
69 // making it possible to delete other workers.
70 B2DEBUG(30, "Got message from " << message->getIdentity() << " to kill " << message->getMessagePartAsString<2>());
71 const std::string& killedIdentity = message->getMessagePartAsString<2>();
72 m_registeredWorkersInput.erase(killedIdentity);
73
74 log("registered_workers", static_cast<long>(m_registeredWorkersInput.size()));
75 increment("dead_workers");
76 increment("dead_worker_messaged_from[" + fromIdentity + "]");
77
78 if (m_registeredWorkersInput.empty()) {
79 B2ERROR("There is not a single worker registered anymore!");
80 return {};
81 }
82
83 // Corner case: could be that this was the one we were waiting for
85 m_allStopMessages = true;
86 log("all_stop_messages", static_cast<long>(m_allStopMessages));
87 increment("sent_stop_messages");
88 logTime("last_stop_sent");
89
90 return ZMQMessageFactory::createMessage(killedIdentity, EMessageTypes::c_lastEventMessage);
91 }
92 // Corner case: could be that this was the one we were waiting for
95 log("all_terminate_messages", static_cast<long>(m_allTerminateMessages));
96 increment("sent_terminate_messages");
97 logTime("last_terminate_sent");
98
99 return ZMQMessageFactory::createMessage(killedIdentity, EMessageTypes::c_terminateMessage);
100 }
101
102 return {};
103 }
104
105 B2ASSERT("Worker without proper registration!",
106 m_registeredWorkersInput.find(fromIdentity) != m_registeredWorkersInput.end());
107
108 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
109 // Increment the stop messages
110 m_receivedStopMessages.emplace(fromIdentity);
111 log("received_stop_messages", static_cast<long>(m_receivedStopMessages.size()));
112 increment("total_received_stop_messages");
113
115 // But only return this if everyone has sent a stop message already
116 m_allStopMessages = true;
117 log("all_stop_messages", static_cast<long>(m_allStopMessages));
118 increment("sent_stop_messages");
119 logTime("last_stop_sent");
120
121 return ZMQMessageFactory::createMessage(fromIdentity, EMessageTypes::c_lastEventMessage);
122 }
123
124 // Whatever we return here will be carried on to the application and eventually also to the output.
125 // This means as we are not passing the stop message now, we return nothing.
126 return {};
127 } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
128 // Increment the terminate messages
129 m_receivedTerminateMessages.emplace(fromIdentity);
130 log("received_terminate_messages", static_cast<long>(m_receivedTerminateMessages.size()));
131 increment("total_received_terminate_messages");
132
134 // But only return this if everyone has sent a terminate message already
136 log("all_terminate_messages", static_cast<long>(m_allTerminateMessages));
137 increment("sent_terminate_messages");
138 logTime("last_terminate_sent");
139
140 return ZMQMessageFactory::createMessage(fromIdentity, EMessageTypes::c_terminateMessage);
141 }
142
143 // Whatever we return here will be carried on to the application and eventually also to the output.
144 // This means as we are not passing the stop message now, we return nothing.
145 return {};
146 }
147
148 if (m_allStopMessages) {
151 m_whenEventAfterAllStopMessages = std::chrono::system_clock::now();
152 B2ERROR("Received an event after having received stop messages from every worker. This is not a good sign! I will dismiss this event and next events!");
153 }
154 increment("received_messages_after_stop");
155 auto t1 = std::chrono::system_clock::now();
156 const auto intervalAfterAllStopMessages = std::chrono::duration_cast<std::chrono::seconds> (t1 - m_whenEventAfterAllStopMessages);
157 if (intervalAfterAllStopMessages > std::chrono::seconds{150}) {
158 B2FATAL("Too many events after having received stop messages! This is abnormal. I will kill the process!");
159 }
160 return {};
161 }
162
163 // Now it can only be a plain normal data message, so just pass it on
164 const auto dataSize = message->getDataMessage().size();
165
166 average("data_size", dataSize);
167 average("data_size_from[" + fromIdentity + "]", dataSize);
168
169 increment("received_events");
170 increment("received_events[" + fromIdentity + "]");
171
172 timeit("event_rate");
173 timeit<200>("event_rate_from[" + fromIdentity + "]");
174
175 logTime("last_received_event_message");
176
177 return message;
178}

◆ hasMessage()

bool hasMessage ( const ZMQConnection *  connection)
static, inherited

Check if the given connection has an incoming message (right now, no waiting).

Definition at line 20 of file ZMQConnection.cc.

21{
22 // Just poll with 0 timeout and no reaction function. Hacky trick to reduce code duplication
23 const auto emptyFunction = []() {};
24 return ZMQConnection::poll({{connection, emptyFunction}}, 0);
25}

◆ increment()

void increment ( const std::string &  key)
inherited

Increment the value with the given key (only numerical values). If not present, set to 1.

Definition at line 32 of file ZMQLogger.cc.

33{
34 std::visit(Incrementor{}, m_monitoring[key]);
35}
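
The "if not present, set to 1" behaviour follows from operator[] default-constructing a missing entry: a default-constructed std::variant holds its first alternative, here long with value 0, which the visitor then increments. The Incrementor itself is not shown on this page, so the sketch below uses a hypothetical IncrementSketch visitor and assumes that string values are simply left untouched.

```cpp
#include <map>
#include <string>
#include <variant>

using MonitoringValue = std::variant<long, double, std::string>;

// Hypothetical visitor: adds one to numerical values.
// Assumption: strings are ignored; the real Incrementor may differ.
struct IncrementSketch {
  void operator()(long& value) const { value += 1; }
  void operator()(double& value) const { value += 1.0; }
  void operator()(std::string&) const {}
};

void incrementSketch(std::map<std::string, MonitoringValue>& values, const std::string& key) {
  // operator[] default-constructs a missing entry, which then holds long{0}.
  std::visit(IncrementSketch{}, values[key]);
}
```

Calling incrementSketch twice on an empty map with the same key leaves a long with value 2 under that key.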

◆ isReady()

bool isReady ( ) const
virtual, inherited

Return true if this connection is able to send messages right now. Can be overloaded in derived classes.

Reimplemented in ZMQROIOutput, ZMQDataAndROIOutput, ZMQLoadBalancedOutput, ZMQRawOutput, ZMQHistoServerToZMQOutput, and ZMQHistoServerToRawOutput.

Definition at line 15 of file ZMQConnection.cc.

16{
17 return true;
18}

◆ logTime()

void logTime ( const std::string &  key)
inherited

Store the current time as a string under the given key.

Definition at line 42 of file ZMQLogger.cc.

43{
44 auto current = std::chrono::system_clock::now();
45 auto displayTime = std::chrono::system_clock::to_time_t(current);
46 log(key, std::ctime(&displayTime));
47}
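
One detail worth knowing when consuming these values: the string returned by std::ctime has a fixed layout and ends with a newline character, so the stored value carries that newline as well. The sketch below reproduces the conversion outside the class; both function names are hypothetical, and the trimmed variant is only an illustration of how a consumer might strip the newline.

```cpp
#include <chrono>
#include <ctime>
#include <string>

// Returns the current time in the format produced by std::ctime,
// e.g. "Wed Jun 30 21:49:08 1993\n" (note the trailing newline).
std::string currentTimeString() {
  const auto now = std::chrono::system_clock::now();
  const std::time_t displayTime = std::chrono::system_clock::to_time_t(now);
  return std::ctime(&displayTime);
}

// Hypothetical helper: the same string without the trailing newline.
std::string currentTimeStringTrimmed() {
  std::string text = currentTimeString();
  if (!text.empty() && text.back() == '\n') text.pop_back();
  return text;
}
```

Note that std::ctime uses a shared static buffer, so the result should be copied into a std::string immediately, as done here.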

◆ overwriteStopMessage()

std::unique_ptr< ZMQIdMessage > overwriteStopMessage ( )

Manually overwrite the stop message counter and set it to have all stop messages received.

Definition at line 197 of file ZMQConfirmedConnection.cc.

198{
199 if (not m_allStopMessages) {
200 // We did not already receive all stop messages, but someone externally asked us to stop anyways. So lets do it.
201 B2ERROR("Sending out a stop message although not all of the workers are finished already!");
202 increment("stop_overwrites");
203 logTime("last_stop_overwrite");
204
205 m_allStopMessages = true;
206 log("all_stop_messages", static_cast<long>(m_allStopMessages));
207 increment("sent_stop_messages");
208 logTime("last_stop_sent");
209
210 return ZMQMessageFactory::createMessage("", EMessageTypes::c_lastEventMessage);
211 }
212
213 // We have already stopped, no need to send it twice.
214 return {};
215}

◆ poll()

bool poll ( const std::map< const ZMQConnection *, ReactorFunction > &  connectionList,
int  timeout 
)
static, inherited

Poll on the given connections and call the attached function if a message comes in.

If no message has been received after timeout milliseconds, return anyway. If timeout is 0, do not wait. If timeout is -1, wait indefinitely.

Returns true if a message was received on any socket, false otherwise. Attention: if a system call is interrupted (e.g. because a signal was received), the function may return false even before the timeout has elapsed!

Definition at line 27 of file ZMQConnection.cc.

28{
29 std::vector<const ReactorFunction*> socketMapping;
30 std::vector<zmq::pollitem_t> pollItems;
31
32 // zmq needs a special format for its polling, so create it here.
33 for (const auto& [connection, function] : connectionList) {
34 auto sockets = connection->getSockets();
35 for (zmq::socket_t* socket : sockets) {
36 zmq::pollitem_t pollItem;
37 pollItem.socket = static_cast<void*>(*socket);
38 pollItem.events = ZMQ_POLLIN;
39 pollItem.revents = 0;
40 pollItems.push_back(std::move(pollItem));
41
42 // but keep reference to the original function, so we can call the correct one later
43 socketMapping.push_back(&function);
44 }
45 }
46
47 if (pollItems.empty()) {
48 return false;
49 }
50
51 try {
52 zmq::poll(&pollItems[0], pollItems.size(), timeout);
53
54 bool anySocket = false;
55 unsigned int counter = 0;
56 for (const auto& pollItem : pollItems) {
57 if (pollItem.revents & ZMQ_POLLIN) {
58 anySocket = true;
59 const auto* functionPtr = socketMapping.at(counter);
60 const auto function = *functionPtr;
61 function();
62 }
63 counter++;
64 }
65
66 return anySocket;
67 } catch (zmq::error_t& error) {
68 if (error.num() == EINTR) {
69 // Could happen if there was an interrupt, return false so the caller knows the time did not pass already
70 return false;
71 } else {
72 // cannot handle, rethrow exception
73 throw;
74 }
75 }
76}

Member Data Documentation

◆ m_allStopMessages

bool m_allStopMessages = false
private

Have we received all stop messages?

Definition at line 78 of file ZMQConfirmedConnection.h.

◆ m_allTerminateMessages

bool m_allTerminateMessages = false
private

Have we received all terminate messages?

Definition at line 82 of file ZMQConfirmedConnection.h.

◆ m_averages

std::unordered_map<std::string, std::tuple<std::vector<double>, size_t> > m_averages
private, inherited

Internal storage of the previous values when calculating averages.

Definition at line 60 of file ZMQLogger.h.

◆ m_eventAfterAllStopMessages

bool m_eventAfterAllStopMessages = false
private

Flag indicating that events arrived after all stop messages had been received.

Definition at line 86 of file ZMQConfirmedConnection.h.

◆ m_monitoring

std::map<std::string, std::variant<long, double, std::string> > m_monitoring
private, inherited

Internal storage of all stored values.

Definition at line 58 of file ZMQLogger.h.

◆ m_parent

std::shared_ptr<ZMQParent> m_parent
protected, inherited

The shared ZMQParent instance.

Definition at line 75 of file ZMQConnection.h.

◆ m_receivedStopMessages

std::set<std::string> m_receivedStopMessages
private

The set of input identities which have already sent a stop message.

Definition at line 76 of file ZMQConfirmedConnection.h.

◆ m_receivedTerminateMessages

std::set<std::string> m_receivedTerminateMessages
private

The set of input identities which have already sent a terminate message.

Definition at line 80 of file ZMQConfirmedConnection.h.

◆ m_registeredWorkersInput

std::set<std::string> m_registeredWorkersInput
private

The set of all registered inputs.

Definition at line 84 of file ZMQConfirmedConnection.h.

◆ m_socket

std::unique_ptr<zmq::socket_t> m_socket
protected, inherited

The memory of the socket. Needs to be initialized in a derived class.

Definition at line 77 of file ZMQConnection.h.

◆ m_timeCounters

std::unordered_map<std::string, std::tuple<unsigned long, std::chrono::system_clock::time_point> > m_timeCounters
private, inherited

Internal storage of how often the timeit function was called for a given key and when it last reached AVERAGE_SIZE.

Definition at line 62 of file ZMQLogger.h.

◆ m_whenEventAfterAllStopMessages

std::chrono::time_point<std::chrono::system_clock> m_whenEventAfterAllStopMessages
private

Time at which m_eventAfterAllStopMessages was set.

Definition at line 88 of file ZMQConfirmedConnection.h.


The documentation for this class was generated from the following files: