Belle II Software development
ZMQConfirmedOutput Class Reference

Output part of a confirmed connection. More...

#include <ZMQConfirmedConnection.h>

Inheritance diagram for ZMQConfirmedOutput:
ZMQConnectionOverSocket ZMQConnection ZMQLogger

Public Types

using ReactorFunction = std::function< void(void)>
 Typedef of a function which will be called if a connection has a message.
 

Public Member Functions

 ZMQConfirmedOutput (const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
 Create a new confirmed output by connecting to the address.
 
void handleEvent (std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
 Send the message to the output (a message without a ID as there is only a single output).
 
void handleIncomingData ()
 Blocks until it can receive the (hopefully confirmation) message from the output.
 
std::vector< zmq::socket_t * > getSockets () const final
 The socket used for polling is just the stored socket.
 
std::string getEndPoint () const
 Return the connection string for this socket.
 
virtual bool isReady () const
 Return true of this connection is able to send messages right now. Can be overloaded in derived classes.
 
virtual std::string getMonitoringJSON () const
 Convert the stored monitoring values to a JSON string ready for sending out via a message.
 
template<class AClass >
void log (const std::string &key, const AClass &value)
 Store a value under a certain key. Different types of values can be stored, namely long, double or string. Mixtures are not allowed for a given key.
 
void increment (const std::string &key)
 Increment the value with the given key (only numerical values). If not present, set to 1.
 
void decrement (const std::string &key)
 Decrement the value with the given key (only numerical values). If not present, set to -1.
 
template<size_t MAX_SIZE = 100>
void average (const std::string &key, double value)
 Instead of storeing the double value directly under the given key, store the average of the last MAX_SIZE values.
 
template<size_t AVERAGE_SIZE = 2000>
void timeit (const std::string &key)
 Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time AVERAGE_SIZE was reached under <key>_last_measurement)
 
void logTime (const std::string &key)
 Store the current time as a string under the given key.
 

Static Public Member Functions

static bool poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
 Poll on the given connections and call the attached function if a messages comes in.
 
static bool hasMessage (const ZMQConnection *connection)
 Check if the given connection as an incoming message (right now, no waiting).
 

Protected Attributes

std::shared_ptr< ZMQParentm_parent
 The shared ZMQParent instance.
 
std::unique_ptr< zmq::socket_t > m_socket
 The memory of the socket. Needs to be initialized in a derived class.
 

Private Attributes

unsigned int m_waitingForConfirmation = 0
 On how many confirmation messages are we still waiting?
 
long m_timespanWaitingForConfirmation = 0
 Internal monitoring how long we were waiting for confirmation messages.
 
std::map< std::string, std::variant< long, double, std::string > > m_monitoring
 Internal storage of all stored values.
 
std::unordered_map< std::string, std::tuple< std::vector< double >, size_t > > m_averages
 Internal storage of the previous values when calculating averages.
 
std::unordered_map< std::string, std::tuple< unsigned long, std::chrono::system_clock::time_point > > m_timeCounters
 Internal storage how often the timeit function for a given key was called and when it has last reached MAX_SIZE.
 

Detailed Description

Output part of a confirmed connection.

For an introduction to confirmed connections please see the ZMQConfirmedInput.

The confirmed output basically just sends out messages and waits for confirmations. Two additions to that:

  • on startup, it sends a single hello message to the output to register itself. This message also needs to be confirmed.
  • Instead of waiting for a confirmation after sending the message, the blocking happens before sending the next message. This gives the output some time for reacting without slowing down the output part.

If no confirmation is received on time, either a FATAL is issued or (configurable) only a warning.

Internally, a ZMQ_DEALER socket is used in non-bind mode. The order of starting output and input does not play any role.

Definition at line 110 of file ZMQConfirmedConnection.h.

Member Typedef Documentation

◆ ReactorFunction

using ReactorFunction = std::function<void(void)>
inherited

Typedef of a function which will be called if a connection has a message.

Definition at line 34 of file ZMQConnection.h.

Constructor & Destructor Documentation

◆ ZMQConfirmedOutput()

ZMQConfirmedOutput ( const std::string &  outputAddress,
const std::shared_ptr< ZMQParent > &  parent 
)

Create a new confirmed output by connecting to the address.

Definition at line 217 of file ZMQConfirmedConnection.cc.

219{
220 // These are all the log output we will have, set to 0 in the beginning.
221 log("no_confirmation_message", 0l);
222 log("last_sent_event_message", 0l);
223 log("data_size", 0.0);
224 log("sent_events", 0l);
225 log("event_rate", 0.0);
226 log("timespan_waiting_for_confirmation", 0l);
227
228 // Register a non-binding DEALER socket
229 m_socket = m_parent->createSocket<ZMQ_DEALER>(outputAddress, false);
230
231 logTime("last_hello_sent");
232
233 // Say hello to the receiver end of the connection (which is a confirmed input)
234 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage);
235 handleEvent(std::move(message));
236}
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without a ID as there is only a single output).
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:63
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
Definition: ZMQConnection.h:75
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
Definition: ZMQConnection.h:77
void logTime(const std::string &key)
Store the current time as a string under the given key.
Definition: ZMQLogger.cc:42
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:96

Member Function Documentation

◆ decrement()

void decrement ( const std::string &  key)
inherited

Decrement the value with the given key (only numerical values). If not present, set to -1.

Definition at line 37 of file ZMQLogger.cc.

38{
39 std::visit(Decrementor{}, m_monitoring[key]);
40}
std::map< std::string, std::variant< long, double, std::string > > m_monitoring
Internal storage of all stored values.
Definition: ZMQLogger.h:58

◆ getEndPoint()

std::string getEndPoint ( ) const
inherited

Return the connection string for this socket.

Definition at line 87 of file ZMQConnection.cc.

88{
89 std::string endpoint{""};
90 if (m_socket) {
91 endpoint = m_socket->get(zmq::sockopt::last_endpoint);
92 }
93 return endpoint;
94}

◆ getMonitoringJSON()

std::string getMonitoringJSON ( ) const
virtualinherited

Convert the stored monitoring values to a JSON string ready for sending out via a message.

Reimplemented in ZMQHistoServerToZMQOutput, ZMQHistoServerToRawOutput, ZMQROIOutput, and ZMQDataAndROIOutput.

Definition at line 15 of file ZMQLogger.cc.

16{
17 std::stringstream buffer;
18 buffer << "{";
19 bool first = true;
20 for (const auto& keyValue : m_monitoring) {
21 if (not first) {
22 buffer << ", ";
23 }
24 first = false;
25 buffer << "\"" << keyValue.first << "\": ";
26 buffer << std::visit(toJSON{}, keyValue.second);
27 }
28 buffer << "}" << std::endl;
29 return buffer.str();
30}

◆ getSockets()

std::vector< zmq::socket_t * > getSockets ( ) const
finalvirtualinherited

The socket used for polling is just the stored socket.

Implements ZMQConnection.

Definition at line 82 of file ZMQConnection.cc.

83{
84 return {m_socket.get()};
85}

◆ handleEvent()

void handleEvent ( std::unique_ptr< ZMQNoIdMessage message,
bool  requireConfirmation = true,
int  maximalWaitTime = 10000 
)

Send the message to the output (a message without a ID as there is only a single output).

If requireConfirmation is set to true and no confirmation (from the last sent event!) is received before sending the new event, a B2FATAL is issued (in other cases only a warning). Set the maximalWaitTime to -1 to always wait infinitely (not preferred).

Definition at line 238 of file ZMQConfirmedConnection.cc.

239{
240 auto current = std::chrono::system_clock::now();
241 while (m_waitingForConfirmation > 0) {
242 // Wait for all conformation messages that are still pending.
243 if (ZMQParent::poll({m_socket.get()}, maximalWaitTime)) {
245 } else if (requireConfirmation) {
246 B2FATAL("Did not receive a confirmation message in time!");
247 } else {
248 B2WARNING("Did not receive a confirmation message in time!");
249 increment("no_confirmation_message");
250 // If we did not receive one, we will also not receive the next one so lets break out.
251 break;
252 }
253 }
254 auto afterWaiting = std::chrono::system_clock::now();
255 m_timespanWaitingForConfirmation += std::chrono::duration_cast<std::chrono::milliseconds>(afterWaiting - current).count();
256 log("timespan_waiting_for_confirmation", m_timespanWaitingForConfirmation);
257
258 // We have received a confirmation for the old, so we can also sent a new message.
259
260 // TODO: makes no sense for signal messages!
261 const auto dataSize = message->getDataMessage().size();
262
263 average("data_size", dataSize);
264 increment("sent_events");
265 timeit("event_rate");
266
267 logTime("last_sent_event_message");
268
269 ZMQParent::send(m_socket, std::move(message));
271}
long m_timespanWaitingForConfirmation
Internal monitoring how long we were waiting for confirmation messages.
void handleIncomingData()
Blocks until it can receive the (hopefully confirmation) message from the output.
unsigned int m_waitingForConfirmation
On how many confirmation messages are we still waiting?
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:32
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
Definition: ZMQParent.cc:56
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
Definition: ZMQLogger.h:117
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:153
void average(const std::string &key, double value)
Instead of storeing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:102

◆ handleIncomingData()

void handleIncomingData ( )

Blocks until it can receive the (hopefully confirmation) message from the output.

Can be called even before sending an event to receive the confirmation message ahead of time (e.g. when there is nothing else to do).

Definition at line 273 of file ZMQConfirmedConnection.cc.

274{
275 // This should only ever be a confirmation message
276 B2ASSERT("There should be no data coming here, if we have already a confirmation!", m_waitingForConfirmation > 0);
277 auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(m_socket);
278 B2ASSERT("Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
280}

◆ hasMessage()

bool hasMessage ( const ZMQConnection connection)
staticinherited

Check if the given connection as an incoming message (right now, no waiting).

Definition at line 20 of file ZMQConnection.cc.

21{
22 // Just poll with 0 timeout and no reaction function. Hacky trick to reduce code duplication
23 const auto emptyFunction = []() {};
24 return ZMQConnection::poll({{connection, emptyFunction}}, 0);
25}
static bool poll(const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
Poll on the given connections and call the attached function if a messages comes in.

◆ increment()

void increment ( const std::string &  key)
inherited

Increment the value with the given key (only numerical values). If not present, set to 1.

Definition at line 32 of file ZMQLogger.cc.

33{
34 std::visit(Incrementor{}, m_monitoring[key]);
35}

◆ isReady()

bool isReady ( ) const
virtualinherited

Return true of this connection is able to send messages right now. Can be overloaded in derived classes.

Reimplemented in ZMQROIOutput, ZMQDataAndROIOutput, ZMQLoadBalancedOutput, ZMQRawOutput, ZMQHistoServerToZMQOutput, and ZMQHistoServerToRawOutput.

Definition at line 15 of file ZMQConnection.cc.

16{
17 return true;
18}

◆ logTime()

void logTime ( const std::string &  key)
inherited

Store the current time as a string under the given key.

Definition at line 42 of file ZMQLogger.cc.

43{
44 auto current = std::chrono::system_clock::now();
45 auto displayTime = std::chrono::system_clock::to_time_t(current);
46 log(key, std::ctime(&displayTime));
47}

◆ poll()

bool poll ( const std::map< const ZMQConnection *, ReactorFunction > &  connectionList,
int  timeout 
)
staticinherited

Poll on the given connections and call the attached function if a messages comes in.

If after timeout milliseconds still no message is received, return anyways. If timeout is 0, do not wait. If timeout is -1, wait infinitely.

Returns true if a message was received on any socket, false otherwise. Attention: in case of an interrupted system call (e.g. because a signal was received) the function might return anyways with a negative result even before the timeout!

Definition at line 27 of file ZMQConnection.cc.

28{
29 std::vector<const ReactorFunction*> socketMapping;
30 std::vector<zmq::pollitem_t> pollItems;
31
32 // zmq needs a special format for its polling, so create it here.
33 for (const auto& [connection, function] : connectionList) {
34 auto sockets = connection->getSockets();
35 for (zmq::socket_t* socket : sockets) {
36 zmq::pollitem_t pollItem;
37 pollItem.socket = static_cast<void*>(*socket);
38 pollItem.events = ZMQ_POLLIN;
39 pollItem.revents = 0;
40 pollItems.push_back(std::move(pollItem));
41
42 // but keep reference to the original function, so we can call the correct one later
43 socketMapping.push_back(&function);
44 }
45 }
46
47 if (pollItems.empty()) {
48 return false;
49 }
50
51 try {
52 zmq::poll(&pollItems[0], pollItems.size(), timeout);
53
54 bool anySocket = false;
55 unsigned int counter = 0;
56 for (const auto& pollItem : pollItems) {
57 if (pollItem.revents & ZMQ_POLLIN) {
58 anySocket = true;
59 const auto* functionPtr = socketMapping.at(counter);
60 const auto function = *functionPtr;
61 function();
62 }
63 counter++;
64 }
65
66 return anySocket;
67 } catch (zmq::error_t& error) {
68 if (error.num() == EINTR) {
69 // Could happen if there was an interrupt, return false so the caller knows the time did not pass already
70 return false;
71 } else {
72 // cannot handle, rethrow exception
73 throw;
74 }
75 }
76}

Member Data Documentation

◆ m_averages

std::unordered_map<std::string, std::tuple<std::vector<double>, size_t> > m_averages
privateinherited

Internal storage of the previous values when calculating averages.

Definition at line 60 of file ZMQLogger.h.

◆ m_monitoring

std::map<std::string, std::variant<long, double, std::string> > m_monitoring
privateinherited

Internal storage of all stored values.

Definition at line 58 of file ZMQLogger.h.

◆ m_parent

std::shared_ptr<ZMQParent> m_parent
protectedinherited

The shared ZMQParent instance.

Definition at line 75 of file ZMQConnection.h.

◆ m_socket

std::unique_ptr<zmq::socket_t> m_socket
protectedinherited

The memory of the socket. Needs to be initialized in a derived class.

Definition at line 77 of file ZMQConnection.h.

◆ m_timeCounters

std::unordered_map<std::string, std::tuple<unsigned long, std::chrono::system_clock::time_point> > m_timeCounters
privateinherited

Internal storage how often the timeit function for a given key was called and when it has last reached MAX_SIZE.

Definition at line 62 of file ZMQLogger.h.

◆ m_timespanWaitingForConfirmation

long m_timespanWaitingForConfirmation = 0
private

Internal monitoring how long we were waiting for confirmation messages.

Definition at line 134 of file ZMQConfirmedConnection.h.

◆ m_waitingForConfirmation

unsigned int m_waitingForConfirmation = 0
private

On how many confirmation messages are we still waiting?

Definition at line 132 of file ZMQConfirmedConnection.h.


The documentation for this class was generated from the following files: