Belle II Software
release-08-01-10
Input connection for talking to non-ZMQ sockets via a ZMQ_STREAM socket.
#include <ZMQRawConnection.h>
Public Types | |
using | ReactorFunction = std::function< void(void)> |
Typedef of a function which will be called if a connection has a message. | |
Public Member Functions | |
ZMQRawInput (const std::string &inputAddress, unsigned int maximalBufferSize, bool receiveEventMessages, const std::shared_ptr< ZMQParent > &parent) | |
Create a new raw input connection. The bind or connect behavior is chosen according to the given address. | |
std::vector< zmq::message_t > | handleIncomingData () |
Block until a TCP packet can be received from the socket. | |
void | clear () |
Reset the internal buffer and counter. | |
std::vector< zmq::socket_t * > | getSockets () const final |
The socket used for polling is just the stored socket. | |
std::string | getEndPoint () const |
Return the connection string for this socket. | |
virtual bool | isReady () const |
Return true if this connection is able to send messages right now. Can be overridden in derived classes. | |
virtual std::string | getMonitoringJSON () const |
Convert the stored monitoring values to a JSON string ready for sending out via a message. | |
template<class AClass > | |
void | log (const std::string &key, const AClass &value) |
Store a value under a certain key. Different types of values can be stored, namely long, double or string. Mixtures are not allowed for a given key. | |
void | increment (const std::string &key) |
Increment the value with the given key (only numerical values). If not present, set to 1. | |
void | decrement (const std::string &key) |
Decrement the value with the given key (only numerical values). If not present, set to -1. | |
template<size_t MAX_SIZE = 100> | |
void | average (const std::string &key, double value) |
Instead of storing the double value directly under the given key, store the average of the last MAX_SIZE values. | |
template<size_t AVERAGE_SIZE = 2000> | |
void | timeit (const std::string &key) |
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time AVERAGE_SIZE was reached under <key>_last_measurement) | |
void | logTime (const std::string &key) |
Store the current time as a string under the given key. | |
Static Public Member Functions | |
static bool | poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout) |
Poll on the given connections and call the attached function if a message comes in. | |
static bool | hasMessage (const ZMQConnection *connection) |
Check if the given connection has an incoming message (right now, no waiting). | |
Protected Attributes | |
std::shared_ptr< ZMQParent > | m_parent |
The shared ZMQParent instance. | |
std::unique_ptr< zmq::socket_t > | m_socket |
The memory of the socket. Needs to be initialized in a derived class. | |
Private Attributes | |
unsigned int | m_maximalBufferSize |
Parameter for the maximal buffer size. If this size is reached, a FATAL will be issued. | |
std::vector< char > | m_buffer |
Internal storage for the buffer. | |
size_t | m_writeAddress = 0 |
Where in the buffer are we currently writing to. | |
unsigned int | m_currentSize = 0 |
How large should the full message be? The information is from the first int of the message. | |
bool | m_receiveEventMessages |
Parameter to receive event messages (see above) | |
std::string | m_inputIdentity = "" |
Internal storage of the connected socket to check if we get messages from multiple ones. | |
std::map< std::string, std::variant< long, double, std::string > > | m_monitoring |
Internal storage of all stored values. | |
std::unordered_map< std::string, std::tuple< std::vector< double >, size_t > > | m_averages |
Internal storage of the previous values when calculating averages. | |
std::unordered_map< std::string, std::tuple< unsigned long, std::chrono::system_clock::time_point > > | m_timeCounters |
Internal storage of how often the timeit function was called for a given key and when it last reached AVERAGE_SIZE. | |
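The monitoring members listed above (log, increment, decrement and the m_monitoring map) can be illustrated with a small stand-alone sketch. It is not the Belle II implementation: the class name MonitoringSketch, the set and getLong helpers, and the decision to throw when a string value is incremented are assumptions made for illustration. Only the variant type and the "if not present, set to 1 or -1" behavior are taken from the member descriptions.

```cpp
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <variant>

// Hypothetical stand-in for the monitoring storage; not the Belle II class.
class MonitoringSketch {
public:
  using Value = std::variant<long, double, std::string>;

  // Store a value under a key, replacing any previous value.
  void set(const std::string& key, Value value) { m_monitoring[key] = std::move(value); }

  // Increment a numerical value; a missing key starts at 1.
  void increment(const std::string& key) { add(key, 1); }

  // Decrement a numerical value; a missing key starts at -1.
  void decrement(const std::string& key) { add(key, -1); }

  // Read back a value stored as long (throws if the key is missing or holds another type).
  long getLong(const std::string& key) const { return std::get<long>(m_monitoring.at(key)); }

private:
  void add(const std::string& key, long delta) {
    auto it = m_monitoring.find(key);
    if (it == m_monitoring.end()) {
      m_monitoring.emplace(key, Value{delta});
      return;
    }
    if (auto* asLong = std::get_if<long>(&it->second)) {
      *asLong += delta;
    } else if (auto* asDouble = std::get_if<double>(&it->second)) {
      *asDouble += static_cast<double>(delta);
    } else {
      // Assumption of this sketch: string values cannot be incremented.
      throw std::invalid_argument("value under key '" + key + "' is not numerical");
    }
  }

  std::map<std::string, Value> m_monitoring;
};
```

Calling increment("received_events") twice on a fresh MonitoringSketch leaves the value 2 under that key, while decrement("errors") on a missing key stores -1.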
Input connection for talking to non-ZMQ sockets via a ZMQ_STREAM socket.
Can only talk to a single socket.
On receiving a TCP packet, the data is stored in an internal buffer. A message is returned, wrapped as a zmq::message_t, only once it is complete.
Two message formats are understood:
The two message formats are chosen to be compatible with the HLT implementation.
Definition at line 41 of file ZMQRawConnection.h.
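The buffering described above (accumulate TCP chunks, return a message only once it is complete) can be sketched without any ZMQ dependency. This is a minimal illustration, not the code in ZMQRawConnection.cc. It assumes that the first 32-bit integer of a message holds the payload size in bytes in host byte order, which may differ from the two formats the class actually understands. FrameBufferSketch, feed and makeFrame are hypothetical names, and std::length_error stands in for the FATAL issued when the maximal buffer size is exceeded.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical reassembly buffer for length-prefixed messages.
class FrameBufferSketch {
public:
  explicit FrameBufferSketch(std::size_t maximalBufferSize)
    : m_maximalBufferSize(maximalBufferSize) {}

  // Append one received chunk and return every message that is now complete.
  std::vector<std::string> feed(const char* data, std::size_t size) {
    if (m_buffer.size() + size > m_maximalBufferSize) {
      throw std::length_error("maximal buffer size exceeded");
    }
    m_buffer.insert(m_buffer.end(), data, data + size);

    std::vector<std::string> messages;
    std::size_t readPosition = 0;
    while (m_buffer.size() - readPosition >= sizeof(std::uint32_t)) {
      // Assumed header: payload size in bytes, host byte order.
      std::uint32_t payloadSize = 0;
      std::memcpy(&payloadSize, m_buffer.data() + readPosition, sizeof(payloadSize));
      const std::size_t frameSize = sizeof(payloadSize) + payloadSize;
      if (m_buffer.size() - readPosition < frameSize) {
        break; // message still incomplete, wait for more data
      }
      messages.emplace_back(m_buffer.data() + readPosition + sizeof(payloadSize), payloadSize);
      readPosition += frameSize;
    }
    // Drop the consumed bytes and keep any partial message for the next call.
    m_buffer.erase(m_buffer.begin(), m_buffer.begin() + static_cast<std::ptrdiff_t>(readPosition));
    return messages;
  }

  // Reset the internal buffer.
  void clear() { m_buffer.clear(); }

private:
  std::size_t m_maximalBufferSize;
  std::vector<char> m_buffer;
};

// Helper for trying out the sketch: prepend the assumed size header to a payload.
inline std::string makeFrame(const std::string& payload) {
  const std::uint32_t size = static_cast<std::uint32_t>(payload.size());
  std::string frame(sizeof(size), '\0');
  std::memcpy(&frame[0], &size, sizeof(size));
  return frame + payload;
}
```

Feeding the first three bytes of makeFrame("hello") returns an empty vector. Feeding the remaining bytes returns a single message containing "hello".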
std::vector< zmq::message_t > handleIncomingData ()
Block until a TCP packet can be received from the socket.
Returns only full messages as zmq::message_t. Please note that this can be
Definition at line 46 of file ZMQRawConnection.cc.
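static bool poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout) [static, inherited]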
Poll on the given connections and call the attached function if a message comes in.
If no message has been received after timeout milliseconds, return anyway. If timeout is 0, do not wait. If timeout is -1, wait indefinitely.
Returns true if a message was received on any socket, false otherwise. Attention: if a system call is interrupted (e.g. because a signal was received), the function might return false even before the timeout has elapsed!
Definition at line 27 of file ZMQConnection.cc.
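The dispatch pattern behind poll (a map from connection to ReactorFunction, with the function called for each connection that has a message) can be sketched with a mock connection type. This is only an illustration of the pattern with a zero timeout. MockConnection, its hasPending flag and dispatchReady are hypothetical, and the real function polls ZMQ sockets rather than checking a flag.

```cpp
#include <functional>
#include <map>

// Hypothetical stand-in for a connection; the flag replaces a real socket poll.
struct MockConnection {
  bool hasPending = false;
};

// Same shape as the ReactorFunction typedef of the class.
using ReactorFunction = std::function<void(void)>;

// Call the attached function of every connection that has a pending message.
// Returns true if at least one function was called, false otherwise.
bool dispatchReady(const std::map<const MockConnection*, ReactorFunction>& connectionList) {
  bool anyMessage = false;
  for (const auto& [connection, function] : connectionList) {
    if (connection->hasPending) {
      function();
      anyMessage = true;
    }
  }
  return anyMessage;
}
```

A caller would typically run such a dispatch step in a loop and treat a false return value as "nothing to do right now", which is also why an early return caused by an interrupted system call is harmless as long as the loop simply polls again.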