Belle II Software development
Output part of a load-balanced connection.
#include <ZMQLoadBalancedConnection.h>
Public Types | |
using | ReactorFunction = std::function< void(void)> |
Typedef of a function which will be called if a connection has a message. | |
Public Member Functions | |
ZMQLoadBalancedOutput (const std::string &outputAddress, bool lax, const std::shared_ptr< ZMQParent > &parent) | |
Create a new load-balanced output and bind to the given address. | |
void | handleEvent (std::unique_ptr< ZMQNoIdMessage > message) |
Send the given message (without identity) to the next input in the ready list. | |
void | handleIncomingData () |
Block until a ready message from an input is received and add it to the ready queue. | |
void | clear () |
Reset the flags that record whether stop and terminate messages have been sent. Should be called on run start. | |
bool | isReady () const final |
In lax mode the output is always ready. Otherwise it is ready only if at least one input is ready. | |
std::vector< zmq::socket_t * > | getSockets () const final |
The socket used for polling is just the stored socket. | |
std::string | getEndPoint () const |
Return the connection string for this socket. | |
virtual std::string | getMonitoringJSON () const |
Convert the stored monitoring values to a JSON string ready for sending out via a message. | |
template<class AClass > | |
void | log (const std::string &key, const AClass &value) |
Store a value under a certain key. Different types of values can be stored, namely long, double or string. Mixtures are not allowed for a given key. | |
void | increment (const std::string &key) |
Increment the value with the given key (only numerical values). If not present, set to 1. | |
void | decrement (const std::string &key) |
Decrement the value with the given key (only numerical values). If not present, set to -1. | |
template<size_t MAX_SIZE = 100> | |
void | average (const std::string &key, double value) |
Instead of storing the double value directly under the given key, store the average of the last MAX_SIZE values. | |
template<size_t AVERAGE_SIZE = 2000> | |
void | timeit (const std::string &key) |
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time AVERAGE_SIZE was reached under <key>_last_measurement) | |
void | logTime (const std::string &key) |
Store the current time as a string under the given key. | |
Static Public Member Functions | |
static bool | poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout) |
Poll on the given connections and call the attached function if a message comes in. | |
static bool | hasMessage (const ZMQConnection *connection) |
Check if the given connection has an incoming message (right now, no waiting). | |
Protected Attributes | |
std::deque< std::string > | m_readyWorkers |
List of identities of ready inputs in LIFO order. | |
std::set< std::string > | m_allWorkers |
All ever registered inputs. | |
bool | m_sentStopMessages = false |
Did we already send a stop message? | |
bool | m_sentTerminateMessages = false |
Did we already send a terminate message? | |
bool | m_lax = false |
Parameter to enable lax mode. | |
std::shared_ptr< ZMQParent > | m_parent |
The shared ZMQParent instance. | |
std::unique_ptr< zmq::socket_t > | m_socket |
The memory of the socket. Needs to be initialized in a derived class. | |
Private Attributes | |
std::map< std::string, std::variant< long, double, std::string > > | m_monitoring |
Internal storage of all stored values. | |
std::unordered_map< std::string, std::tuple< std::vector< double >, size_t > > | m_averages |
Internal storage of the previous values when calculating averages. | |
std::unordered_map< std::string, std::tuple< unsigned long, std::chrono::system_clock::time_point > > | m_timeCounters |
Internal storage of how often the timeit function was called for a given key and when the counter last reached AVERAGE_SIZE. | |
Output part of a load-balanced connection.
Multiple inputs can connect to this output. The output keeps a list of all received ready messages and always sends each event to the next input in that list, which distributes the load across the inputs.
Stop and terminate messages are sent to all inputs that have sent at least one ready message so far. Inputs are never unregistered, so dead inputs are still served with stop/terminate messages. This is harmless for stop/terminate messages, and event messages are only sent as long as ready messages are available.
After a stop message has been sent, all further incoming events are dismissed.
If no input has sent a ready message, the output is not ready and needs to be polled for new ready messages. There is also a "lax" mode, in which events are silently discarded if no input is ready.
Internally a ZMQ_ROUTER in bind mode is used.
Definition at line 71 of file ZMQLoadBalancedConnection.h.
using ReactorFunction = std::function< void(void)> | inherited |
Typedef of a function which will be called if a connection has a message.
Definition at line 34 of file ZMQConnection.h.
ZMQLoadBalancedOutput | ( | const std::string & | outputAddress, |
bool | lax, | ||
const std::shared_ptr< ZMQParent > & | parent | ||
) |
Create a new load-balanced output and bind to the given address.
Definition at line 56 of file ZMQLoadBalancedConnection.cc.
void clear | ( | ) |
Reset the flags that record whether stop and terminate messages have been sent. Should be called on run start.
Definition at line 169 of file ZMQLoadBalancedConnection.cc.
void decrement (const std::string &key) | inherited |
Decrement the value with the given key (only numerical values). If not present, set to -1.
Definition at line 37 of file ZMQLogger.cc.
std::string getEndPoint () const | inherited |
Return the connection string for this socket.
Definition at line 87 of file ZMQConnection.cc.
std::string getMonitoringJSON () const | virtual inherited |
Convert the stored monitoring values to a JSON string ready for sending out via a message.
Reimplemented in ZMQHistoServerToZMQOutput, ZMQHistoServerToRawOutput, ZMQROIOutput, and ZMQDataAndROIOutput.
Definition at line 15 of file ZMQLogger.cc.
std::vector< zmq::socket_t * > getSockets () const | final virtual inherited |
The socket used for polling is just the stored socket.
Implements ZMQConnection.
Definition at line 82 of file ZMQConnection.cc.
void handleEvent | ( | std::unique_ptr< ZMQNoIdMessage > | message | ) |
Send the given message (without identity) to the next input in the ready list.
A stop or terminate message is sent to all inputs that have ever sent a ready message. An event message is sent only to the next ready input. If no input is ready, the behavior depends on the lax setting: (a) with lax mode disabled, an exception is thrown, so make sure to send events only while the output is ready (which indicates exactly that an input is ready); (b) with lax mode enabled, the event is discarded without warning.
Definition at line 81 of file ZMQLoadBalancedConnection.cc.
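The dispatch rules described above can be modeled without any ZeroMQ dependency. The following is a minimal stand-alone sketch, not the code in ZMQLoadBalancedConnection.cc: the class name LoadBalancedOutputSketch, the Kind enum, the addReady helper, and the public sent record are all hypothetical. It also assumes that the oldest ready input is served first and that a repeated stop or terminate message is not re-sent, neither of which is confirmed by this page.

```cpp
#include <deque>
#include <set>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical message kinds; the real message classes are not modeled here.
enum class Kind { Event, Stop, Terminate };

// Stand-alone model of the dispatch rules. Instead of writing to a socket,
// it records every (identity, kind) pair it would have sent.
class LoadBalancedOutputSketch {
public:
  explicit LoadBalancedOutputSketch(bool lax) : m_lax(lax) {}

  // An input announced that it can take one more event.
  void addReady(const std::string& identity)
  {
    m_readyWorkers.push_back(identity);
    m_allWorkers.insert(identity);
  }

  // In lax mode always ready, otherwise only if some input is ready.
  bool isReady() const { return m_lax || !m_readyWorkers.empty(); }

  void handleEvent(Kind kind)
  {
    if (kind == Kind::Stop || kind == Kind::Terminate) {
      // Assumption: each of these is broadcast at most once until clear().
      bool& alreadySent = (kind == Kind::Stop) ? m_sentStopMessages : m_sentTerminateMessages;
      if (alreadySent) return;
      alreadySent = true;
      for (const auto& identity : m_allWorkers) sent.emplace_back(identity, kind);
      return;
    }
    // Events arriving after a stop message are dismissed.
    if (m_sentStopMessages) return;
    if (m_readyWorkers.empty()) {
      if (m_lax) return;  // lax mode: drop the event silently
      throw std::runtime_error("No input is ready");
    }
    // Assumption: the input that has waited longest is served first.
    sent.emplace_back(m_readyWorkers.front(), kind);
    m_readyWorkers.pop_front();
  }

  // Reset the stop/terminate flags, e.g. on run start.
  void clear()
  {
    m_sentStopMessages = false;
    m_sentTerminateMessages = false;
  }

  std::vector<std::pair<std::string, Kind>> sent;  // record of simulated sends

private:
  std::deque<std::string> m_readyWorkers;
  std::set<std::string> m_allWorkers;
  bool m_sentStopMessages = false;
  bool m_sentTerminateMessages = false;
  bool m_lax = false;
};
```

In this model a caller in strict mode would check isReady() before each handleEvent(Kind::Event) call, which mirrors the advice above to send events only while the output is ready.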
void handleIncomingData | ( | ) |
Block until a ready message from an input is received and add it to the ready queue.
Definition at line 140 of file ZMQLoadBalancedConnection.cc.
static bool hasMessage (const ZMQConnection *connection) | static inherited |
Check if the given connection has an incoming message (right now, no waiting).
Definition at line 20 of file ZMQConnection.cc.
void increment (const std::string &key) | inherited |
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition at line 32 of file ZMQLogger.cc.
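The increment and decrement semantics ("if not present, set to 1" or "-1") can be illustrated on a variant-based store shaped like m_monitoring. This is a sketch under assumptions rather than the ZMQLogger code: the class name LoggerSketch and the get and add helpers are hypothetical, and the choice to leave string values untouched is made up for the example.

```cpp
#include <map>
#include <string>
#include <variant>

// Hypothetical stand-in for the monitoring store; not the real ZMQLogger.
class LoggerSketch {
public:
  using Value = std::variant<long, double, std::string>;

  // Store a long, double or std::string under the key.
  template <class AClass>
  void log(const std::string& key, const AClass& value) { m_monitoring[key] = value; }

  void increment(const std::string& key) { add(key, 1); }
  void decrement(const std::string& key) { add(key, -1); }

  // Hypothetical accessor, added only so the result can be inspected.
  const Value& get(const std::string& key) const { return m_monitoring.at(key); }

private:
  void add(const std::string& key, long delta)
  {
    auto it = m_monitoring.find(key);
    if (it == m_monitoring.end()) {
      // Key not present: start at +1 or -1.
      m_monitoring[key] = delta;
      return;
    }
    if (auto* asLong = std::get_if<long>(&it->second)) {
      *asLong += delta;
    } else if (auto* asDouble = std::get_if<double>(&it->second)) {
      *asDouble += delta;
    }
    // Assumption: string values are left unchanged in this sketch.
  }

  std::map<std::string, Value> m_monitoring;
};
```

Calling increment twice on a fresh key leaves the long value 2 under that key, and decrement on a fresh key leaves -1.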
bool isReady () const | final virtual |
In lax mode the output is always ready. Otherwise it is ready only if at least one input is ready.
Reimplemented from ZMQConnection.
Definition at line 178 of file ZMQLoadBalancedConnection.cc.
void logTime (const std::string &key) | inherited |
Store the current time as a string under the given key.
Definition at line 42 of file ZMQLogger.cc.
static bool poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout) | static inherited |
Poll on the given connections and call the attached function if a message comes in.
If no message has been received after timeout milliseconds, return anyway. If timeout is 0, do not wait. If timeout is -1, wait indefinitely.
Returns true if a message was received on any socket, false otherwise. Attention: if a system call is interrupted (e.g. because a signal was received), the function may return false even before the timeout has elapsed.
Definition at line 27 of file ZMQConnection.cc.
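The reactor pattern behind poll, a map from connection to callback, can be shown with a stand-alone mock. This sketch does not use ZeroMQ and is not the code in ZMQConnection.cc: FakeConnection and pollSketch are hypothetical names, the pending flag replaces real socket polling, and the timeout is left out, so the behavior corresponds to a timeout of 0.

```cpp
#include <functional>
#include <map>

// Hypothetical stand-in for a connection; a flag replaces the real socket.
struct FakeConnection {
  bool pending = false;  // true if a message is waiting
};

using ReactorFunction = std::function<void(void)>;

// Call the attached function of every connection with a pending message.
// Returns true if at least one function was called. No waiting is done.
bool pollSketch(const std::map<const FakeConnection*, ReactorFunction>& connectionList)
{
  bool anyMessage = false;
  for (const auto& entry : connectionList) {
    if (entry.first->pending) {
      entry.second();
      anyMessage = true;
    }
  }
  return anyMessage;
}
```

With the real classes, the map value for a load-balanced output would typically be a lambda that calls handleIncomingData() on that output, though this page does not show such a call site.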
std::set< std::string > m_allWorkers | protected |
All ever registered inputs.
Definition at line 101 of file ZMQLoadBalancedConnection.h.
std::unordered_map< std::string, std::tuple< std::vector< double >, size_t > > m_averages | private inherited |
Internal storage of the previous values when calculating averages.
Definition at line 60 of file ZMQLogger.h.
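The storage type, a vector of values plus an index per key, suggests a ring buffer over the last MAX_SIZE values. The sketch below shows one way the average function could use it. This is an assumption about the approach, not the ZMQLogger implementation, and the class name AverageSketch is hypothetical. Unlike the real function, which stores the result under the key, this version returns it.

```cpp
#include <cstddef>
#include <numeric>
#include <string>
#include <tuple>
#include <unordered_map>
#include <vector>

// Hypothetical ring-buffer average; not the real ZMQLogger code.
class AverageSketch {
public:
  template <std::size_t MAX_SIZE = 100>
  double average(const std::string& key, double value)
  {
    auto& entry = m_averages[key];      // created empty on first use
    auto& values = std::get<0>(entry);  // the last values, at most MAX_SIZE
    auto& next = std::get<1>(entry);    // slot to overwrite next

    if (values.size() < MAX_SIZE) {
      values.push_back(value);  // still filling the buffer
    } else {
      values[next] = value;     // overwrite the oldest value
    }
    next = (next + 1) % MAX_SIZE;

    return std::accumulate(values.begin(), values.end(), 0.0) / values.size();
  }

private:
  std::unordered_map<std::string, std::tuple<std::vector<double>, std::size_t>> m_averages;
};
```

With MAX_SIZE set to 3, feeding 1, 2, 3 and then 10 replaces the oldest value, so the buffer holds 10, 2, 3 and the returned average is 5.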
bool m_lax = false | protected |
Parameter to enable lax mode.
Definition at line 107 of file ZMQLoadBalancedConnection.h.
std::map< std::string, std::variant< long, double, std::string > > m_monitoring | private inherited |
Internal storage of all stored values.
Definition at line 58 of file ZMQLogger.h.
std::shared_ptr< ZMQParent > m_parent | protected inherited |
The shared ZMQParent instance.
Definition at line 75 of file ZMQConnection.h.
std::deque< std::string > m_readyWorkers | protected |
List of identities of ready inputs in LIFO order.
Definition at line 99 of file ZMQLoadBalancedConnection.h.
bool m_sentStopMessages = false | protected |
Did we already send a stop message?
Definition at line 103 of file ZMQLoadBalancedConnection.h.
bool m_sentTerminateMessages = false | protected |
Did we already send a terminate message?
Definition at line 105 of file ZMQLoadBalancedConnection.h.
std::unique_ptr< zmq::socket_t > m_socket | protected inherited |
The memory of the socket. Needs to be initialized in a derived class.
Definition at line 77 of file ZMQConnection.h.
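std::unordered_map< std::string, std::tuple< unsigned long, std::chrono::system_clock::time_point > > m_timeCounters | private inherited |
Internal storage of how often the timeit function was called for a given key and when the counter last reached AVERAGE_SIZE.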
Definition at line 62 of file ZMQLogger.h.
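A call counter paired with a time point is enough to measure a call rate every AVERAGE_SIZE calls. The following sketch shows that idea under several assumptions and is not the ZMQLogger implementation: the class name RateSketch is hypothetical, the now parameter is added only to make the example deterministic, the first call is assumed to merely open the measurement window, and the rate is returned rather than stored under the key (the `<key>_last_measurement` entry is omitted).

```cpp
#include <chrono>
#include <cstddef>
#include <optional>
#include <string>
#include <tuple>
#include <unordered_map>

// Hypothetical rate measurement; not the real ZMQLogger code.
class RateSketch {
public:
  using Clock = std::chrono::system_clock;

  // Returns the rate in calls per second once AVERAGE_SIZE calls have
  // accumulated since the window was opened, otherwise an empty optional.
  template <std::size_t AVERAGE_SIZE = 2000>
  std::optional<double> timeit(const std::string& key, Clock::time_point now = Clock::now())
  {
    auto it = m_timeCounters.find(key);
    if (it == m_timeCounters.end()) {
      // Assumption: the first call only opens the measurement window.
      m_timeCounters.emplace(key, std::make_tuple(0ul, now));
      return std::nullopt;
    }

    auto& counter = std::get<0>(it->second);
    auto& start = std::get<1>(it->second);
    if (++counter < AVERAGE_SIZE) return std::nullopt;

    // Window complete: compute the rate and open a new window.
    const std::chrono::duration<double> elapsed = now - start;
    counter = 0;
    start = now;
    if (elapsed.count() <= 0) return std::nullopt;  // avoid dividing by zero
    return AVERAGE_SIZE / elapsed.count();
  }

private:
  std::unordered_map<std::string, std::tuple<unsigned long, Clock::time_point>> m_timeCounters;
};
```

With AVERAGE_SIZE set to 4, four counted calls spread over two seconds after the window was opened yield a rate of 2 calls per second.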