Belle II Software
release-08-01-10
|
Input part of a confirmed connection. More...
#include <ZMQConfirmedConnection.h>
Public Types | |
using | ReactorFunction = std::function< void(void)> |
Typedef of a function which will be called if a connection has a message. | |
Public Member Functions | |
ZMQConfirmedInput (const std::string &inputAddress, const std::shared_ptr< ZMQParent > &parent) | |
Create a new confirmed output by binding to the address. | |
std::unique_ptr< ZMQIdMessage > | handleIncomingData () |
Block until a message can be received from one of the inputs. More... | |
void | clear () |
Reset the counters for all received stop and terminate messages. Should be called on run start. | |
std::unique_ptr< ZMQIdMessage > | overwriteStopMessage () |
Manually overwrite the stop message counter and set it to have all stop messages received. | |
std::vector< zmq::socket_t * > | getSockets () const final |
The socket used for polling is just the stored socket. | |
std::string | getEndPoint () const |
Return the connection string for this socket. | |
virtual bool | isReady () const |
Return true of this connection is able to send messages right now. Can be overloaded in derived classes. | |
virtual std::string | getMonitoringJSON () const |
Convert the stored monitoring values to a JSON string ready for sending out via a message. | |
template<class AClass > | |
void | log (const std::string &key, const AClass &value) |
Store a value under a certain key. Different types of values can be stored, namely long, double or string. Mixtures are not allowed for a given key. | |
void | increment (const std::string &key) |
Increment the value with the given key (only numerical values). If not present, set to 1. | |
void | decrement (const std::string &key) |
Decrement the value with the given key (only numerical values). If not present, set to -1. | |
template<size_t MAX_SIZE = 100> | |
void | average (const std::string &key, double value) |
Instead of storeing the double value directly under the given key, store the average of the last MAX_SIZE values. | |
template<size_t AVERAGE_SIZE = 2000> | |
void | timeit (const std::string &key) |
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time AVERAGE_SIZE was reached under <key>_last_measurement) | |
void | logTime (const std::string &key) |
Store the current time as a string under the given key. | |
Static Public Member Functions | |
static bool | poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout) |
Poll on the given connections and call the attached function if a messages comes in. More... | |
static bool | hasMessage (const ZMQConnection *connection) |
Check if the given connection as an incoming message (right now, no waiting). | |
Protected Attributes | |
std::shared_ptr< ZMQParent > | m_parent |
The shared ZMQParent instance. | |
std::unique_ptr< zmq::socket_t > | m_socket |
The memory of the socket. Needs to be initialized in a derived class. | |
Private Attributes | |
std::set< std::string > | m_receivedStopMessages |
The set of input identities which have already sent a stop message. | |
bool | m_allStopMessages = false |
Have we received all stop messages? | |
std::set< std::string > | m_receivedTerminateMessages |
The set of input identities which have already sent a terminate message. | |
bool | m_allTerminateMessages = false |
Have we received all terminante messages? | |
std::set< std::string > | m_registeredWorkersInput |
The set of all registered inputs. | |
std::map< std::string, std::variant< long, double, std::string > > | m_monitoring |
Internal storage of all stored values. | |
std::unordered_map< std::string, std::tuple< std::vector< double >, size_t > > | m_averages |
Internal storage of the previous values when calculating averages. | |
std::unordered_map< std::string, std::tuple< unsigned long, std::chrono::system_clock::time_point > > | m_timeCounters |
Internal storage how often the timeit function for a given key was called and when it has last reached MAX_SIZE. | |
Input part of a confirmed connection.
In a confirmed connection every message sent by the input to the output is confirmed by a confirmation message by the output. The output can handle multiple inputs, each input can only be connected to a single output.
The input part keeps track of the registered workers and if it has received all stop and/or terminate messages. Stop/Terminate messages are only passed on if received from all registered workers.
If the input receives a message from one of the outputs, it sends back a confirmation message. Depending on the message type, it does several things:
Internally, a ZMQ_ROUTER is used in bind-mode. The order of starting output and input does not play any role.
Definition at line 55 of file ZMQConfirmedConnection.h.
std::unique_ptr< ZMQIdMessage > handleIncomingData | ( | ) |
Block until a message can be received from one of the inputs.
React as described in the general description of this class. Only if a message is to be passed on actually returns a message (in all other cases a nullptr).
Definition at line 48 of file ZMQConfirmedConnection.cc.
|
staticinherited |
Poll on the given connections and call the attached function if a messages comes in.
If after timeout milliseconds still no message is received, return anyways. If timeout is 0, do not wait. If timeout is -1, wait infinitely.
Returns true if a message was received on any socket, false otherwise. Attention: in case of an interrupted system call (e.g. because a signal was received) the function might return anyways with a negative result even before the timeout!
Definition at line 27 of file ZMQConnection.cc.