Belle II Software development
ZMQLoadBalancedOutput Class Reference

Output part of a load-balanced connection. More...

#include <ZMQLoadBalancedConnection.h>

Inheritance diagram for ZMQLoadBalancedOutput:
ZMQConnectionOverSocket ZMQConnection ZMQLogger

Public Types

using ReactorFunction = std::function< void(void)>
 Typedef of a function which will be called if a connection has a message.
 

Public Member Functions

 ZMQLoadBalancedOutput (const std::string &outputAddress, bool lax, const std::shared_ptr< ZMQParent > &parent)
 Create a new load-balanced output and bind to the given address.
 
void handleEvent (std::unique_ptr< ZMQNoIdMessage > message)
 Send the given message (without identity) to the next input in the ready list.
 
void handleIncomingData ()
 Block until a ready message from an input is received and add it to the ready queue.
 
void clear ()
 Clear the counter for sent stop and terminate messages. Should be called on run start.
 
bool isReady () const final
 If lax mode is disabled, the output is ready if at least a single input is ready. Else always.
 
std::vector< zmq::socket_t * > getSockets () const final
 The socket used for polling is just the stored socket.
 
std::string getEndPoint () const
 Return the connection string for this socket.
 
virtual std::string getMonitoringJSON () const
 Convert the stored monitoring values to a JSON string ready for sending out via a message.
 
template<class AClass >
void log (const std::string &key, const AClass &value)
 Store a value under a certain key. Different types of values can be stored, namely long, double or string. Mixtures are not allowed for a given key.
 
void increment (const std::string &key)
 Increment the value with the given key (only numerical values). If not present, set to 1.
 
void decrement (const std::string &key)
 Decrement the value with the given key (only numerical values). If not present, set to -1.
 
template<size_t MAX_SIZE = 100>
void average (const std::string &key, double value)
 Instead of storeing the double value directly under the given key, store the average of the last MAX_SIZE values.
 
template<size_t AVERAGE_SIZE = 2000>
void timeit (const std::string &key)
 Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time AVERAGE_SIZE was reached under <key>_last_measurement)
 
void logTime (const std::string &key)
 Store the current time as a string under the given key.
 

Static Public Member Functions

static bool poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
 Poll on the given connections and call the attached function if a messages comes in.
 
static bool hasMessage (const ZMQConnection *connection)
 Check if the given connection as an incoming message (right now, no waiting).
 

Protected Attributes

std::deque< std::string > m_readyWorkers
 List of identities of ready inputs in LIFO order.
 
std::set< std::string > m_allWorkers
 All ever registered inputs.
 
bool m_sentStopMessages = false
 Did we already sent a stop message?
 
bool m_sentTerminateMessages = false
 Did we already sent a terminate message?
 
bool m_lax = false
 Parameter to enable lax mode.
 
std::shared_ptr< ZMQParentm_parent
 The shared ZMQParent instance.
 
std::unique_ptr< zmq::socket_t > m_socket
 The memory of the socket. Needs to be initialized in a derived class.
 

Private Attributes

std::map< std::string, std::variant< long, double, std::string > > m_monitoring
 Internal storage of all stored values.
 
std::unordered_map< std::string, std::tuple< std::vector< double >, size_t > > m_averages
 Internal storage of the previous values when calculating averages.
 
std::unordered_map< std::string, std::tuple< unsigned long, std::chrono::system_clock::time_point > > m_timeCounters
 Internal storage how often the timeit function for a given key was called and when it has last reached MAX_SIZE.
 

Detailed Description

Output part of a load-balanced connection.

Multiple inputs can connect to this output. The output keeps a list of all received ready messages and sends events on requests always to the next input in the list, which creates some sort of load-balancing.

Stop and terminate messages are sent all all inputs that have sent at least a single ready so far. There is no unregistration happening, so dead inputs will still be served with both events and stop/terminate messages. For stop/terminate messages this is no problem and event messages will only be sent as long as there are ready messages.

After a stop message is sent all additional incoming events will be dismissed.

If no input has send any ready message, the output is not ready and needs to be polled for new ready messages. However, there is also a "lax" mode which makes the events being silently discarded if no input is ready.

Internally a ZMQ_ROUTER in bind mode is used.

Definition at line 71 of file ZMQLoadBalancedConnection.h.

Member Typedef Documentation

◆ ReactorFunction

using ReactorFunction = std::function<void(void)>
inherited

Typedef of a function which will be called if a connection has a message.

Definition at line 34 of file ZMQConnection.h.

Constructor & Destructor Documentation

◆ ZMQLoadBalancedOutput()

ZMQLoadBalancedOutput ( const std::string &  outputAddress,
bool  lax,
const std::shared_ptr< ZMQParent > &  parent 
)

Create a new load-balanced output and bind to the given address.

Definition at line 56 of file ZMQLoadBalancedConnection.cc.

58 parent), m_lax(lax)
59{
60 // We clear all our internal state and counters
61 log("ready_queue_size", static_cast<long>(m_readyWorkers.size()));
62 log("registered_workers", static_cast<long>(m_allWorkers.size()));
63
64 log("data_size", 0.0);
65 log("dismissed_events", 0l);
66 log("event_rate", 0.0);
67 log("sent_events", 0l);
68
69 log("all_stop_messages", 0l);
70 log("sent_stop_messages", 0l);
71 log("last_stop_sent", "");
72
73 log("all_terminate_messages", 0l);
74 log("sent_terminate_messages", 0l);
75 log("last_terminate_sent", "");
76
77 // Create a binding ROUTER socket
78 m_socket = m_parent->createSocket<ZMQ_ROUTER>(outputAddress, true);
79}
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:63
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
Definition: ZMQConnection.h:75
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
Definition: ZMQConnection.h:77
std::set< std::string > m_allWorkers
All ever registered inputs.
bool m_lax
Parameter to enable lax mode.
std::deque< std::string > m_readyWorkers
List of identities of ready inputs in LIFO order.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:96

Member Function Documentation

◆ clear()

void clear ( )

Clear the counter for sent stop and terminate messages. Should be called on run start.

Definition at line 169 of file ZMQLoadBalancedConnection.cc.

170{
171 m_sentStopMessages = false;
173
174 log("all_stop_messages", static_cast<long>(m_sentStopMessages));
175 log("all_terminate_messages", static_cast<long>(m_sentTerminateMessages));
176}
bool m_sentTerminateMessages
Did we already sent a terminate message?
bool m_sentStopMessages
Did we already sent a stop message?

◆ decrement()

void decrement ( const std::string &  key)
inherited

Decrement the value with the given key (only numerical values). If not present, set to -1.

Definition at line 37 of file ZMQLogger.cc.

38{
39 std::visit(Decrementor{}, m_monitoring[key]);
40}
std::map< std::string, std::variant< long, double, std::string > > m_monitoring
Internal storage of all stored values.
Definition: ZMQLogger.h:58

◆ getEndPoint()

std::string getEndPoint ( ) const
inherited

Return the connection string for this socket.

Definition at line 87 of file ZMQConnection.cc.

88{
89 std::string endpoint{""};
90 if (m_socket) {
91 endpoint = m_socket->get(zmq::sockopt::last_endpoint);
92 }
93 return endpoint;
94}

◆ getMonitoringJSON()

std::string getMonitoringJSON ( ) const
virtualinherited

Convert the stored monitoring values to a JSON string ready for sending out via a message.

Reimplemented in ZMQHistoServerToZMQOutput, ZMQHistoServerToRawOutput, ZMQROIOutput, and ZMQDataAndROIOutput.

Definition at line 15 of file ZMQLogger.cc.

16{
17 std::stringstream buffer;
18 buffer << "{";
19 bool first = true;
20 for (const auto& keyValue : m_monitoring) {
21 if (not first) {
22 buffer << ", ";
23 }
24 first = false;
25 buffer << "\"" << keyValue.first << "\": ";
26 buffer << std::visit(toJSON{}, keyValue.second);
27 }
28 buffer << "}" << std::endl;
29 return buffer.str();
30}

◆ getSockets()

std::vector< zmq::socket_t * > getSockets ( ) const
finalvirtualinherited

The socket used for polling is just the stored socket.

Implements ZMQConnection.

Definition at line 82 of file ZMQConnection.cc.

83{
84 return {m_socket.get()};
85}

◆ handleEvent()

void handleEvent ( std::unique_ptr< ZMQNoIdMessage message)

Send the given message (without identity) to the next input in the ready list.

If it is a stop or terminate message send the message to all inputs which have ever sent a ready message. If it is an event message, sent it only to the next ready input. If there is no ready input it either (a) throws and exception if lax mode is not enabled. Make sure to only sent events if the output is ready (which indicates exactly this: an input is ready). Or (b) discard the event without warning if lax mode is enabled.

Definition at line 81 of file ZMQLoadBalancedConnection.cc.

82{
83 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
84 // Tell all workers to stop this run
85 for (auto worker : m_allWorkers) {
86 auto sendMessage = ZMQMessageFactory::createMessage(worker, EMessageTypes::c_lastEventMessage);
87 ZMQParent::send(m_socket, std::move(sendMessage));
88 }
89 m_sentStopMessages = true;
90 log("all_stop_messages", static_cast<long>(m_sentStopMessages));
91 increment("sent_stop_messages");
92 logTime("last_stop_sent");
93 return;
94 } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
95 // Tell all workers to terminate
96 for (auto worker : m_allWorkers) {
97 auto sendMessage = ZMQMessageFactory::createMessage(worker, EMessageTypes::c_terminateMessage);
98 ZMQParent::send(m_socket, std::move(sendMessage));
99 }
101 log("all_terminate_messages", static_cast<long>(m_sentTerminateMessages));
102 increment("sent_terminate_messages");
103 logTime("last_terminate_sent");
104 return;
105 }
106
107 if (m_lax and m_readyWorkers.empty()) {
108 // There is no one that can handle the event in the moment, dismiss it (if lax is true)
109 increment("dismissed_events");
110 return;
111 }
112 if (m_sentStopMessages) {
113 B2ERROR("Received events after stop! I will dismiss this event.");
114 increment("dismissed_events");
115 return;
116 }
117
118 const auto dataSize = message->getDataMessage().size();
119
120 B2ASSERT("Must be > 0", not m_readyWorkers.empty());
121 auto nextWorker = m_readyWorkers.front();
122 m_readyWorkers.pop_front();
123
124 average("data_size", dataSize);
125 average("data_size_to[" + nextWorker + "]", dataSize);
126
127 increment("sent_events");
128 increment("sent_events[" + nextWorker + "]");
129
130 timeit("event_rate");
131 timeit<200>("event_rate_to[" + nextWorker + "]");
132
133 m_socket->send(ZMQMessageHelper::createZMQMessage(nextWorker), zmq::send_flags::sndmore);
134 ZMQParent::send(m_socket, std::move(message));
135
136 log("ready_queue_size", static_cast<long>(m_readyWorkers.size()));
137 decrement("ready_messages[" + nextWorker + "]");
138}
void decrement(const std::string &key)
Decrement the value with the given key (only numerical values). If not present, set to -1.
Definition: ZMQLogger.cc:37
void logTime(const std::string &key)
Store the current time as a string under the given key.
Definition: ZMQLogger.cc:42
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:32
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
Definition: ZMQLogger.h:117
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:153
void average(const std::string &key, double value)
Instead of storeing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:102

◆ handleIncomingData()

void handleIncomingData ( )

Block until a ready message from an input is received and add it to the ready queue.

Definition at line 140 of file ZMQLoadBalancedConnection.cc.

141{
142 auto readyMessage = ZMQMessageFactory::fromSocket<ZMQIdMessage>(m_socket);
143 B2ASSERT("Should be a ready message", readyMessage->isMessage(EMessageTypes::c_readyMessage));
144
145 // Register it as another ready worker
146 const auto toIdentity = readyMessage->getIdentity();
147 m_readyWorkers.push_back(toIdentity);
148
149 if (m_allWorkers.emplace(toIdentity).second) {
150 // Aha, we did never see this worker so far, so add it to our list.
151 if (m_sentStopMessages) {
152 // If it turned up late (everyone else has already stopped), send a stop message directly
153 auto sendMessage = ZMQMessageFactory::createMessage(toIdentity, EMessageTypes::c_lastEventMessage);
154 ZMQParent::send(m_socket, std::move(sendMessage));
155 }
156
158 // If it turned up late (everyone else has already terminates), send a terminate message directly
159 auto sendMessage = ZMQMessageFactory::createMessage(toIdentity, EMessageTypes::c_terminateMessage);
160 ZMQParent::send(m_socket, std::move(sendMessage));
161 }
162 }
163
164 log("ready_queue_size", static_cast<long>(m_readyWorkers.size()));
165 log("registered_workers", static_cast<long>(m_allWorkers.size()));
166 increment("ready_messages[" + toIdentity + "]");
167}

◆ hasMessage()

bool hasMessage ( const ZMQConnection connection)
staticinherited

Check if the given connection as an incoming message (right now, no waiting).

Definition at line 20 of file ZMQConnection.cc.

21{
22 // Just poll with 0 timeout and no reaction function. Hacky trick to reduce code duplication
23 const auto emptyFunction = []() {};
24 return ZMQConnection::poll({{connection, emptyFunction}}, 0);
25}
static bool poll(const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
Poll on the given connections and call the attached function if a messages comes in.

◆ increment()

void increment ( const std::string &  key)
inherited

Increment the value with the given key (only numerical values). If not present, set to 1.

Definition at line 32 of file ZMQLogger.cc.

33{
34 std::visit(Incrementor{}, m_monitoring[key]);
35}

◆ isReady()

bool isReady ( ) const
finalvirtual

If lax mode is disabled, the output is ready if at least a single input is ready. Else always.

Reimplemented from ZMQConnection.

Definition at line 178 of file ZMQLoadBalancedConnection.cc.

179{
180 // if we are lax, we are always ready. If not, we need to have at least a single ready worker. This prevents the B2ASSERT to fail.
181 return m_lax or not m_readyWorkers.empty();
182}

◆ logTime()

void logTime ( const std::string &  key)
inherited

Store the current time as a string under the given key.

Definition at line 42 of file ZMQLogger.cc.

43{
44 auto current = std::chrono::system_clock::now();
45 auto displayTime = std::chrono::system_clock::to_time_t(current);
46 log(key, std::ctime(&displayTime));
47}

◆ poll()

bool poll ( const std::map< const ZMQConnection *, ReactorFunction > &  connectionList,
int  timeout 
)
staticinherited

Poll on the given connections and call the attached function if a messages comes in.

If after timeout milliseconds still no message is received, return anyways. If timeout is 0, do not wait. If timeout is -1, wait infinitely.

Returns true if a message was received on any socket, false otherwise. Attention: in case of an interrupted system call (e.g. because a signal was received) the function might return anyways with a negative result even before the timeout!

Definition at line 27 of file ZMQConnection.cc.

28{
29 std::vector<const ReactorFunction*> socketMapping;
30 std::vector<zmq::pollitem_t> pollItems;
31
32 // zmq needs a special format for its polling, so create it here.
33 for (const auto& [connection, function] : connectionList) {
34 auto sockets = connection->getSockets();
35 for (zmq::socket_t* socket : sockets) {
36 zmq::pollitem_t pollItem;
37 pollItem.socket = static_cast<void*>(*socket);
38 pollItem.events = ZMQ_POLLIN;
39 pollItem.revents = 0;
40 pollItems.push_back(std::move(pollItem));
41
42 // but keep reference to the original function, so we can call the correct one later
43 socketMapping.push_back(&function);
44 }
45 }
46
47 if (pollItems.empty()) {
48 return false;
49 }
50
51 try {
52 zmq::poll(&pollItems[0], pollItems.size(), timeout);
53
54 bool anySocket = false;
55 unsigned int counter = 0;
56 for (const auto& pollItem : pollItems) {
57 if (pollItem.revents & ZMQ_POLLIN) {
58 anySocket = true;
59 const auto* functionPtr = socketMapping.at(counter);
60 const auto function = *functionPtr;
61 function();
62 }
63 counter++;
64 }
65
66 return anySocket;
67 } catch (zmq::error_t& error) {
68 if (error.num() == EINTR) {
69 // Could happen if there was an interrupt, return false so the caller knows the time did not pass already
70 return false;
71 } else {
72 // cannot handle, rethrow exception
73 throw;
74 }
75 }
76}

Member Data Documentation

◆ m_allWorkers

std::set<std::string> m_allWorkers
protected

All ever registered inputs.

Definition at line 101 of file ZMQLoadBalancedConnection.h.

◆ m_averages

std::unordered_map<std::string, std::tuple<std::vector<double>, size_t> > m_averages
privateinherited

Internal storage of the previous values when calculating averages.

Definition at line 60 of file ZMQLogger.h.

◆ m_lax

bool m_lax = false
protected

Parameter to enable lax mode.

Definition at line 107 of file ZMQLoadBalancedConnection.h.

◆ m_monitoring

std::map<std::string, std::variant<long, double, std::string> > m_monitoring
privateinherited

Internal storage of all stored values.

Definition at line 58 of file ZMQLogger.h.

◆ m_parent

std::shared_ptr<ZMQParent> m_parent
protectedinherited

The shared ZMQParent instance.

Definition at line 75 of file ZMQConnection.h.

◆ m_readyWorkers

std::deque<std::string> m_readyWorkers
protected

List of identities of ready inputs in LIFO order.

Definition at line 99 of file ZMQLoadBalancedConnection.h.

◆ m_sentStopMessages

bool m_sentStopMessages = false
protected

Did we already sent a stop message?

Definition at line 103 of file ZMQLoadBalancedConnection.h.

◆ m_sentTerminateMessages

bool m_sentTerminateMessages = false
protected

Did we already sent a terminate message?

Definition at line 105 of file ZMQLoadBalancedConnection.h.

◆ m_socket

std::unique_ptr<zmq::socket_t> m_socket
protectedinherited

The memory of the socket. Needs to be initialized in a derived class.

Definition at line 77 of file ZMQConnection.h.

◆ m_timeCounters

std::unordered_map<std::string, std::tuple<unsigned long, std::chrono::system_clock::time_point> > m_timeCounters
privateinherited

Internal storage how often the timeit function for a given key was called and when it has last reached MAX_SIZE.

Definition at line 62 of file ZMQLogger.h.


The documentation for this class was generated from the following files: