Belle II Software development
Input part of a confirmed connection.
#include <ZMQConfirmedConnection.h>
Public Types | |
using | ReactorFunction = std::function< void(void)> |
Typedef of a function which will be called if a connection has a message. | |
Public Member Functions | |
ZMQConfirmedInput (const std::string &inputAddress, const std::shared_ptr< ZMQParent > &parent) | |
Create a new confirmed input by binding to the address. | |
std::unique_ptr< ZMQIdMessage > | handleIncomingData () |
Block until a message can be received from one of the inputs. | |
void | clear () |
Reset the counters for all received stop and terminate messages. Should be called on run start. | |
std::unique_ptr< ZMQIdMessage > | overwriteStopMessage () |
Manually overwrite the stop message counter and set it to have all stop messages received. | |
std::vector< zmq::socket_t * > | getSockets () const final |
The socket used for polling is just the stored socket. | |
std::string | getEndPoint () const |
Return the connection string for this socket. | |
virtual bool | isReady () const |
Return true if this connection is able to send messages right now. Can be overloaded in derived classes. | |
virtual std::string | getMonitoringJSON () const |
Convert the stored monitoring values to a JSON string ready for sending out via a message. | |
template<class AClass > | |
void | log (const std::string &key, const AClass &value) |
Store a value under a certain key. Different types of values can be stored, namely long, double or string. Mixtures are not allowed for a given key. | |
void | increment (const std::string &key) |
Increment the value with the given key (only numerical values). If not present, set to 1. | |
void | decrement (const std::string &key) |
Decrement the value with the given key (only numerical values). If not present, set to -1. | |
template<size_t MAX_SIZE = 100> | |
void | average (const std::string &key, double value) |
Instead of storing the double value directly under the given key, store the average of the last MAX_SIZE values. | |
template<size_t AVERAGE_SIZE = 2000> | |
void | timeit (const std::string &key) |
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time AVERAGE_SIZE was reached under <key>_last_measurement) | |
void | logTime (const std::string &key) |
Store the current time as a string under the given key. | |
Static Public Member Functions | |
static bool | poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout) |
Poll on the given connections and call the attached function if a message comes in. | |
static bool | hasMessage (const ZMQConnection *connection) |
Check if the given connection has an incoming message (right now, no waiting). | |
Protected Attributes | |
std::shared_ptr< ZMQParent > | m_parent |
The shared ZMQParent instance. | |
std::unique_ptr< zmq::socket_t > | m_socket |
The memory of the socket. Needs to be initialized in a derived class. | |
Private Attributes | |
std::set< std::string > | m_receivedStopMessages |
The set of input identities which have already sent a stop message. | |
bool | m_allStopMessages = false |
Have we received all stop messages? | |
std::set< std::string > | m_receivedTerminateMessages |
The set of input identities which have already sent a terminate message. | |
bool | m_allTerminateMessages = false |
Have we received all terminate messages? | |
std::set< std::string > | m_registeredWorkersInput |
The set of all registered inputs. | |
bool | m_eventAfterAllStopMessages = false |
A flag recording whether events appeared after all stop messages were received. | |
std::chrono::time_point< std::chrono::system_clock > | m_whenEventAfterAllStopMessages |
The time at which eventAfterAllStopMessages was issued. | |
std::map< std::string, std::variant< long, double, std::string > > | m_monitoring |
Internal storage of all stored values. | |
std::unordered_map< std::string, std::tuple< std::vector< double >, size_t > > | m_averages |
Internal storage of the previous values when calculating averages. | |
std::unordered_map< std::string, std::tuple< unsigned long, std::chrono::system_clock::time_point > > | m_timeCounters |
Internal storage of how often the timeit function was called for a given key and when it last reached AVERAGE_SIZE. | |
Input part of a confirmed connection.
In a confirmed connection, every message sent by the input to the output is confirmed by a confirmation message from the output. The output can handle multiple inputs, but each input can only be connected to a single output.
The input part keeps track of the registered workers and if it has received all stop and/or terminate messages. Stop/Terminate messages are only passed on if received from all registered workers.
If the input receives a message from one of the outputs, it sends back a confirmation message. Depending on the message type, it does several things:
Internally, a ZMQ_ROUTER is used in bind-mode. The order of starting output and input does not play any role.
Definition at line 55 of file ZMQConfirmedConnection.h.
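The rule that stop messages are only passed on once every registered worker has sent one can be sketched without any ZeroMQ dependency. This is a minimal, hypothetical model and not the Belle II implementation: `StopTracker`, `registerWorker`, `onStop` and `allStopMessages` are illustrative names, and only the member names `m_registeredWorkersInput`, `m_receivedStopMessages` and `m_allStopMessages` are taken from this page. That `clear()` keeps the registrations is an assumption of the sketch.

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical stand-in for the stop-message bookkeeping of a confirmed input.
class StopTracker {
public:
  // Remember a worker identity (assumed to happen when a worker registers).
  void registerWorker(const std::string& id) { m_registeredWorkersInput.insert(id); }

  // Record a stop message from `id`. Returns true exactly once per run:
  // when the last outstanding registered worker has sent its stop message.
  bool onStop(const std::string& id)
  {
    if (m_allStopMessages) return false;                         // already passed on
    if (m_registeredWorkersInput.count(id) == 0) return false;   // unknown sender
    m_receivedStopMessages.insert(id);
    if (m_receivedStopMessages == m_registeredWorkersInput) {
      m_allStopMessages = true;
      return true;
    }
    return false;
  }

  // Counterpart of clear(): forget received stop messages.
  // Assumption of this sketch: registrations are kept.
  void clear()
  {
    m_receivedStopMessages.clear();
    m_allStopMessages = false;
  }

  bool allStopMessages() const { return m_allStopMessages; }

private:
  std::set<std::string> m_registeredWorkersInput;
  std::set<std::string> m_receivedStopMessages;
  bool m_allStopMessages = false;
};
```

With two registered workers, the first stop message yields `false` and the second yields `true`; duplicate stop messages from the same worker do not count twice because the identities are kept in a set.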
using ReactorFunction = std::function< void(void)> [inherited]
Typedef of a function which will be called if a connection has a message.
Definition at line 34 of file ZMQConnection.h.
ZMQConfirmedInput (const std::string &inputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new confirmed input by binding to the address.
Definition at line 17 of file ZMQConfirmedConnection.cc.
void clear ()
Reset the counters for all received stop and terminate messages. Should be called on run start.
Definition at line 180 of file ZMQConfirmedConnection.cc.
void decrement (const std::string &key) [inherited]
Decrement the value with the given key (only numerical values). If not present, set to -1.
Definition at line 37 of file ZMQLogger.cc.
std::string getEndPoint () const [inherited]
Return the connection string for this socket.
Definition at line 87 of file ZMQConnection.cc.
std::string getMonitoringJSON () const [virtual, inherited]
Convert the stored monitoring values to a JSON string ready for sending out via a message.
Reimplemented in ZMQHistoServerToZMQOutput, ZMQHistoServerToRawOutput, ZMQROIOutput, and ZMQDataAndROIOutput.
Definition at line 15 of file ZMQLogger.cc.
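To illustrate the conversion, the following sketch serialises a map of the same type as `m_monitoring` into a flat JSON object with `std::visit`. It is not the code in ZMQLogger.cc: the free function `toJSON` and the alias `MonitoringMap` are hypothetical, and the sketch assumes that keys and string values contain no characters that would need JSON escaping and that doubles are finite.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>
#include <type_traits>
#include <variant>

using MonitoringMap = std::map<std::string, std::variant<long, double, std::string>>;

// Hypothetical helper: write all key/value pairs as one flat JSON object.
// Numbers are written bare, strings are wrapped in double quotes.
std::string toJSON(const MonitoringMap& monitoring)
{
  std::ostringstream out;
  out << "{";
  bool first = true;
  for (const auto& entry : monitoring) {
    if (!first) out << ",";
    first = false;
    out << "\"" << entry.first << "\":";
    std::visit([&out](const auto& v) {
      using T = std::decay_t<decltype(v)>;
      if constexpr (std::is_same_v<T, std::string>) {
        out << "\"" << v << "\"";
      } else {
        out << v;
      }
    }, entry.second);
  }
  out << "}";
  return out.str();
}
```

Because `std::map` iterates in key order, the keys appear sorted in the resulting string.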
std::vector< zmq::socket_t * > getSockets () const [final, virtual, inherited]
The socket used for polling is just the stored socket.
Implements ZMQConnection.
Definition at line 82 of file ZMQConnection.cc.
std::unique_ptr< ZMQIdMessage > handleIncomingData ()
Block until a message can be received from one of the inputs.
React as described in the general description of this class. A message is returned only if it is to be passed on; in all other cases, a nullptr is returned.
Definition at line 48 of file ZMQConfirmedConnection.cc.
bool hasMessage (const ZMQConnection *connection) [static, inherited]
Check if the given connection has an incoming message (right now, no waiting).
Definition at line 20 of file ZMQConnection.cc.
void increment (const std::string &key) [inherited]
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition at line 32 of file ZMQLogger.cc.
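The "if not present, set to 1" (or -1) behaviour falls out naturally when the store is a map of `std::variant<long, double, std::string>`, as `m_monitoring` is: `operator[]` creates a missing entry holding the first alternative, a `long` with value 0. The sketch below shows this under stated assumptions. The free functions and the helper `addTo` are hypothetical, and leaving string values untouched is a choice of this sketch, not documented behaviour.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <type_traits>
#include <variant>

using MonitoringMap = std::map<std::string, std::variant<long, double, std::string>>;

// Add `delta` to the numeric value stored under `key`.
// A missing key is first created as long 0 by operator[], so the first
// increment yields 1 and the first decrement yields -1.
void addTo(MonitoringMap& monitoring, const std::string& key, long delta)
{
  std::visit([delta](auto& v) {
    using T = std::decay_t<decltype(v)>;
    if constexpr (std::is_arithmetic_v<T>) {
      v += delta;
    }
    // String values are left unchanged in this sketch.
  }, monitoring[key]);
}

void increment(MonitoringMap& monitoring, const std::string& key) { addTo(monitoring, key, 1); }
void decrement(MonitoringMap& monitoring, const std::string& key) { addTo(monitoring, key, -1); }
```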
bool isReady () const [virtual, inherited]
Return true if this connection is able to send messages right now. Can be overloaded in derived classes.
Reimplemented in ZMQROIOutput, ZMQDataAndROIOutput, ZMQLoadBalancedOutput, ZMQRawOutput, ZMQHistoServerToZMQOutput, and ZMQHistoServerToRawOutput.
Definition at line 15 of file ZMQConnection.cc.
void logTime (const std::string &key) [inherited]
Store the current time as a string under the given key.
Definition at line 42 of file ZMQLogger.cc.
std::unique_ptr< ZMQIdMessage > overwriteStopMessage ()
Manually overwrite the stop message counter and set it to have all stop messages received.
Definition at line 197 of file ZMQConfirmedConnection.cc.
bool poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout) [static, inherited]
Poll on the given connections and call the attached function if a message comes in.
If no message has been received after timeout milliseconds, return anyway. If timeout is 0, do not wait. If timeout is -1, wait indefinitely.
Returns true if a message was received on any socket, false otherwise. Attention: if a system call is interrupted (e.g. because a signal was received), the function might return with a negative result even before the timeout has elapsed.
Definition at line 27 of file ZMQConnection.cc.
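The timeout and dispatch semantics described here match those of the POSIX `poll` call, so the pattern can be sketched with plain file descriptors instead of ZeroMQ sockets. This is an analogy under stated assumptions, not the ZMQConnection code: `pollDescriptors` is a hypothetical name, the map is keyed by file descriptor rather than by `ZMQConnection` pointer, and the sketch requires a POSIX system.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <vector>
#include <poll.h>
#include <unistd.h>

using ReactorFunction = std::function<void(void)>;

// Wait up to `timeout` milliseconds (0: do not wait, -1: wait indefinitely)
// for any descriptor to become readable and call the attached function for
// each readable one. Returns true if at least one function was called.
bool pollDescriptors(const std::map<int, ReactorFunction>& handlers, int timeout)
{
  std::vector<pollfd> items;
  items.reserve(handlers.size());
  for (const auto& entry : handlers) {
    pollfd item{};
    item.fd = entry.first;
    item.events = POLLIN;
    items.push_back(item);
  }

  // ::poll returns 0 on timeout and -1 on error, e.g. with errno EINTR when
  // a signal interrupts the wait. Both cases are reported as "no message".
  const int ready = ::poll(items.data(), items.size(), timeout);
  if (ready <= 0) {
    return false;
  }

  bool called = false;
  for (const auto& item : items) {
    if (item.revents & POLLIN) {
      handlers.at(item.fd)();
      called = true;
    }
  }
  return called;
}
```

A caller that must distinguish a real timeout from an interrupted wait cannot do so from the boolean alone, which is the caveat the description above points out.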
bool m_allStopMessages = false [private]
Have we received all stop messages?
Definition at line 78 of file ZMQConfirmedConnection.h.
bool m_allTerminateMessages = false [private]
Have we received all terminate messages?
Definition at line 82 of file ZMQConfirmedConnection.h.
std::unordered_map< std::string, std::tuple< std::vector< double >, size_t > > m_averages [private, inherited]
Internal storage of the previous values when calculating averages.
Definition at line 60 of file ZMQLogger.h.
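One plausible way to use a `std::tuple< std::vector< double >, size_t >` per key for averaging the last MAX_SIZE values is a ring buffer: the vector holds the window and the index marks the slot to overwrite next. The sketch below follows that idea. It is an assumption about the layout, not the ZMQLogger implementation; the alias `AverageStore` is hypothetical, and the function returns the average instead of storing it under the key.

```cpp
#include <cassert>
#include <cstddef>
#include <numeric>
#include <string>
#include <tuple>
#include <unordered_map>
#include <vector>

using AverageStore =
  std::unordered_map<std::string, std::tuple<std::vector<double>, std::size_t>>;

// Record `value` under `key` and return the mean of the last MAX_SIZE values
// (or of all values seen so far while the window is still filling up).
template <std::size_t MAX_SIZE = 100>
double average(AverageStore& store, const std::string& key, double value)
{
  static_assert(MAX_SIZE > 0, "the window must hold at least one value");
  auto& [values, next] = store[key];
  if (values.size() < MAX_SIZE) {
    values.push_back(value);   // window not full yet
  } else {
    values[next] = value;      // overwrite the oldest entry
  }
  next = (next + 1) % MAX_SIZE;
  return std::accumulate(values.begin(), values.end(), 0.0) / values.size();
}
```

Summing the whole window on every call keeps the sketch short; a running sum would avoid the repeated pass at the cost of accumulating floating-point drift.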
bool m_eventAfterAllStopMessages = false [private]
A flag recording whether events appeared after all stop messages were received.
Definition at line 86 of file ZMQConfirmedConnection.h.
std::map< std::string, std::variant< long, double, std::string > > m_monitoring [private, inherited]
Internal storage of all stored values.
Definition at line 58 of file ZMQLogger.h.
std::shared_ptr< ZMQParent > m_parent [protected, inherited]
The shared ZMQParent instance.
Definition at line 75 of file ZMQConnection.h.
std::set< std::string > m_receivedStopMessages [private]
The set of input identities which have already sent a stop message.
Definition at line 76 of file ZMQConfirmedConnection.h.
std::set< std::string > m_receivedTerminateMessages [private]
The set of input identities which have already sent a terminate message.
Definition at line 80 of file ZMQConfirmedConnection.h.
std::set< std::string > m_registeredWorkersInput [private]
The set of all registered inputs.
Definition at line 84 of file ZMQConfirmedConnection.h.
std::unique_ptr< zmq::socket_t > m_socket [protected, inherited]
The memory of the socket. Needs to be initialized in a derived class.
Definition at line 77 of file ZMQConnection.h.
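std::unordered_map< std::string, std::tuple< unsigned long, std::chrono::system_clock::time_point > > m_timeCounters [private, inherited]
Internal storage of how often the timeit function was called for a given key and when it last reached AVERAGE_SIZE.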
Definition at line 62 of file ZMQLogger.h.
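A counter plus a time point per key is enough to measure a call rate every AVERAGE_SIZE calls. The following sketch shows one way to do that under stated assumptions: it is not the ZMQLogger code, the alias `TimeCounters` is hypothetical, the current time is passed in as a parameter to keep the function deterministic, and the rate is returned rather than stored under the key.

```cpp
#include <cassert>
#include <chrono>
#include <optional>
#include <string>
#include <tuple>
#include <unordered_map>

using Clock = std::chrono::system_clock;
using TimeCounters =
  std::unordered_map<std::string, std::tuple<unsigned long, Clock::time_point>>;

// Count a call for `key` at time `now`. The very first call for a key only
// starts the clock. After that, every AVERAGE_SIZE-th call returns the call
// rate in Hz since the previous measurement and restarts the counter; all
// other calls return std::nullopt.
template <unsigned long AVERAGE_SIZE = 2000>
std::optional<double> timeit(TimeCounters& counters, const std::string& key,
                             Clock::time_point now)
{
  const auto result = counters.try_emplace(key, 0UL, now);
  if (result.second) {
    return std::nullopt;                 // first call: start of the interval
  }
  auto& [count, start] = result.first->second;
  ++count;
  if (count < AVERAGE_SIZE) {
    return std::nullopt;
  }
  const std::chrono::duration<double> elapsed = now - start;
  count = 0;
  start = now;
  if (elapsed.count() <= 0.0) {
    return std::nullopt;                 // avoid dividing by zero
  }
  return AVERAGE_SIZE / elapsed.count();
}
```

In production code the caller would typically pass `Clock::now()`; a monotonic clock such as `std::chrono::steady_clock` would be more robust against wall-clock adjustments, but the sketch keeps `system_clock` to match the member type shown above.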
std::chrono::time_point< std::chrono::system_clock > m_whenEventAfterAllStopMessages [private]
The time at which eventAfterAllStopMessages was issued.
Definition at line 88 of file ZMQConfirmedConnection.h.