Belle II Software development
ZMQRawOutput Class Reference

Output connection to speak to non-zmq sockets via a ZMQ_STREAM socket. More...

#include <ZMQRawConnection.h>

Inheritance diagram for ZMQRawOutput:
ZMQConnectionOverSocket ZMQConnection ZMQLogger

Public Types

using ReactorFunction = std::function< void(void)>
 Typedef of a function which will be called if a connection has a message.
 

Public Member Functions

 ZMQRawOutput (const std::string &outputAddress, bool addEventSize, const std::shared_ptr< ZMQParent > &parent)
 Create a new raw output connection. The bind or connect behavior is chosen according to the given address.
 
virtual void handleEvent (zmq::message_t message)
 Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
 
void handleIncomingData ()
 Handle incoming data: a socket (dis)connect.
 
bool isReady () const final
 If no socket is connected, this connection is not ready.
 
std::vector< zmq::socket_t * > getSockets () const final
 The socket used for polling is just the stored socket.
 
std::string getEndPoint () const
 Return the connection string for this socket.
 
virtual std::string getMonitoringJSON () const
 Convert the stored monitoring values to a JSON string ready for sending out via a message.
 
template<class AClass >
void log (const std::string &key, const AClass &value)
 Store a value under a certain key. Different types of values can be stored, namely long, double or string. Mixtures are not allowed for a given key.
 
void increment (const std::string &key)
 Increment the value with the given key (only numerical values). If not present, set to 1.
 
void decrement (const std::string &key)
 Decrement the value with the given key (only numerical values). If not present, set to -1.
 
template<size_t MAX_SIZE = 100>
void average (const std::string &key, double value)
 Instead of storeing the double value directly under the given key, store the average of the last MAX_SIZE values.
 
template<size_t AVERAGE_SIZE = 2000>
void timeit (const std::string &key)
 Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time AVERAGE_SIZE was reached under <key>_last_measurement)
 
void logTime (const std::string &key)
 Store the current time as a string under the given key.
 

Static Public Member Functions

static bool poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
 Poll on the given connections and call the attached function if a messages comes in.
 
static bool hasMessage (const ZMQConnection *connection)
 Check if the given connection as an incoming message (right now, no waiting).
 

Protected Attributes

std::shared_ptr< ZMQParentm_parent
 The shared ZMQParent instance.
 
std::unique_ptr< zmq::socket_t > m_socket
 The memory of the socket. Needs to be initialized in a derived class.
 

Private Attributes

std::string m_dataIdentity = ""
 Internal storage of the connected identity to no have multiple connections.
 
bool m_addEventSize = false
 Parameter to add the event size to a message.
 
std::map< std::string, std::variant< long, double, std::string > > m_monitoring
 Internal storage of all stored values.
 
std::unordered_map< std::string, std::tuple< std::vector< double >, size_t > > m_averages
 Internal storage of the previous values when calculating averages.
 
std::unordered_map< std::string, std::tuple< unsigned long, std::chrono::system_clock::time_point > > m_timeCounters
 Internal storage how often the timeit function for a given key was called and when it has last reached MAX_SIZE.
 

Detailed Description

Output connection to speak to non-zmq sockets via a ZMQ_STREAM socket.

If a socket is connected, sends out the given message over this socket as it is. If addEventSize is true, will prefix the message with the htonl-converted size of the message in bytes. Compare this to the receiveEventMessages setting of the ZMQRawInput connection. If no socket is connected, this connection is not ready.

Please note that although it is an output connection, there will still be "incoming messages", namely whenever the socket connects or disconnects. So you need to poll on this connection. Can only send messages to a single connected socket.

Definition at line 87 of file ZMQRawConnection.h.

Member Typedef Documentation

◆ ReactorFunction

using ReactorFunction = std::function<void(void)>
inherited

Typedef of a function which will be called if a connection has a message.

Definition at line 34 of file ZMQConnection.h.

Constructor & Destructor Documentation

◆ ZMQRawOutput()

ZMQRawOutput ( const std::string &  outputAddress,
bool  addEventSize,
const std::shared_ptr< ZMQParent > &  parent 
)

Create a new raw output connection. The bind or connect behavior is chosen according to the given address.

Definition at line 144 of file ZMQRawConnection.cc.

146 parent), m_addEventSize(addEventSize)
147{
148 // We clear all our internal state and counters
149 log("data_size", 0.0);
150 log("sent_events", 0l);
151 log("event_rate", 0.0);
152
153 log("socket_state", "disconnected");
154 log("socket_connects", 0l);
155 log("socket_disconnects", 0l);
156
157 // STREAM is the ZMQ type for raw, non ZMQ connections
158 m_socket = m_parent->createSocket<ZMQ_STREAM>(outputAddress);
159}
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:63
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
Definition: ZMQConnection.h:75
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
Definition: ZMQConnection.h:77
bool m_addEventSize
Parameter to add the event size to a message.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:96

Member Function Documentation

◆ decrement()

void decrement ( const std::string &  key)
inherited

Decrement the value with the given key (only numerical values). If not present, set to -1.

Definition at line 37 of file ZMQLogger.cc.

38{
39 std::visit(Decrementor{}, m_monitoring[key]);
40}
std::map< std::string, std::variant< long, double, std::string > > m_monitoring
Internal storage of all stored values.
Definition: ZMQLogger.h:58

◆ getEndPoint()

std::string getEndPoint ( ) const
inherited

Return the connection string for this socket.

Definition at line 87 of file ZMQConnection.cc.

88{
89 std::string endpoint{""};
90 if (m_socket) {
91 endpoint = m_socket->get(zmq::sockopt::last_endpoint);
92 }
93 return endpoint;
94}

◆ getMonitoringJSON()

std::string getMonitoringJSON ( ) const
virtualinherited

Convert the stored monitoring values to a JSON string ready for sending out via a message.

Reimplemented in ZMQHistoServerToZMQOutput, ZMQHistoServerToRawOutput, ZMQROIOutput, and ZMQDataAndROIOutput.

Definition at line 15 of file ZMQLogger.cc.

16{
17 std::stringstream buffer;
18 buffer << "{";
19 bool first = true;
20 for (const auto& keyValue : m_monitoring) {
21 if (not first) {
22 buffer << ", ";
23 }
24 first = false;
25 buffer << "\"" << keyValue.first << "\": ";
26 buffer << std::visit(toJSON{}, keyValue.second);
27 }
28 buffer << "}" << std::endl;
29 return buffer.str();
30}

◆ getSockets()

std::vector< zmq::socket_t * > getSockets ( ) const
finalvirtualinherited

The socket used for polling is just the stored socket.

Implements ZMQConnection.

Definition at line 82 of file ZMQConnection.cc.

83{
84 return {m_socket.get()};
85}

◆ handleEvent()

void handleEvent ( zmq::message_t  message)
virtual

Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.

Definition at line 161 of file ZMQRawConnection.cc.

162{
163 // Send the message. If requested, add the message size in front of the message
164 B2ASSERT("Data Socket needs to be connected", not m_dataIdentity.empty());
165
166 const auto dataSize = message.size();
167
168 average("data_size", dataSize);
169 increment("sent_events");
170 timeit("event_rate");
171
172 m_socket->send(ZMQMessageHelper::createZMQMessage(m_dataIdentity), zmq::send_flags::sndmore);
173 if (not m_addEventSize) {
174 m_socket->send(std::move(message), zmq::send_flags::none);
175 } else {
176 zmq::message_t tmpMessage(message.size() + sizeof(int));
177 const int messageSize = htonl(message.size());
178 memcpy(tmpMessage.data<char>(), &messageSize, sizeof(int));
179 memcpy(tmpMessage.data<char>() + sizeof(int), message.data(), message.size());
180 m_socket->send(std::move(tmpMessage), zmq::send_flags::none);
181 }
182}
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:32
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
std::string m_dataIdentity
Internal storage of the connected identity to no have multiple connections.
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
Definition: ZMQLogger.h:117
void average(const std::string &key, double value)
Instead of storeing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:102

◆ handleIncomingData()

void handleIncomingData ( )

Handle incoming data: a socket (dis)connect.

Definition at line 184 of file ZMQRawConnection.cc.

185{
186 // The only possibility that we can receive a message is when the client connects or disconnects
187 zmq::message_t identity;
188 auto received = m_socket->recv(identity, zmq::recv_flags::none);
189 B2ASSERT("No message received", received);
190 zmq::message_t nullMessage;
191 received = m_socket->recv(nullMessage, zmq::recv_flags::none);
192 B2ASSERT("No message received", received);
193 std::string identityString(identity.data<char>(), identity.size());
194
195 if (m_dataIdentity.empty()) {
196 m_dataIdentity = identityString;
197 log("socket_state", "connected");
198 increment("socket_connects");
199 } else {
200 B2ASSERT("The app can only handle a single connection!", m_dataIdentity == identityString);
201 m_dataIdentity = "";
202 log("socket_state", "disconnected");
203 increment("socket_disconnects");
204 }
205}

◆ hasMessage()

bool hasMessage ( const ZMQConnection connection)
staticinherited

Check if the given connection as an incoming message (right now, no waiting).

Definition at line 20 of file ZMQConnection.cc.

21{
22 // Just poll with 0 timeout and no reaction function. Hacky trick to reduce code duplication
23 const auto emptyFunction = []() {};
24 return ZMQConnection::poll({{connection, emptyFunction}}, 0);
25}
static bool poll(const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
Poll on the given connections and call the attached function if a messages comes in.

◆ increment()

void increment ( const std::string &  key)
inherited

Increment the value with the given key (only numerical values). If not present, set to 1.

Definition at line 32 of file ZMQLogger.cc.

33{
34 std::visit(Incrementor{}, m_monitoring[key]);
35}

◆ isReady()

bool isReady ( ) const
finalvirtual

If no socket is connected, this connection is not ready.

Reimplemented from ZMQConnection.

Definition at line 207 of file ZMQRawConnection.cc.

208{
209 // Only ready of the client connected
210 return not m_dataIdentity.empty();
211}

◆ logTime()

void logTime ( const std::string &  key)
inherited

Store the current time as a string under the given key.

Definition at line 42 of file ZMQLogger.cc.

43{
44 auto current = std::chrono::system_clock::now();
45 auto displayTime = std::chrono::system_clock::to_time_t(current);
46 log(key, std::ctime(&displayTime));
47}

◆ poll()

bool poll ( const std::map< const ZMQConnection *, ReactorFunction > &  connectionList,
int  timeout 
)
staticinherited

Poll on the given connections and call the attached function if a messages comes in.

If after timeout milliseconds still no message is received, return anyways. If timeout is 0, do not wait. If timeout is -1, wait infinitely.

Returns true if a message was received on any socket, false otherwise. Attention: in case of an interrupted system call (e.g. because a signal was received) the function might return anyways with a negative result even before the timeout!

Definition at line 27 of file ZMQConnection.cc.

28{
29 std::vector<const ReactorFunction*> socketMapping;
30 std::vector<zmq::pollitem_t> pollItems;
31
32 // zmq needs a special format for its polling, so create it here.
33 for (const auto& [connection, function] : connectionList) {
34 auto sockets = connection->getSockets();
35 for (zmq::socket_t* socket : sockets) {
36 zmq::pollitem_t pollItem;
37 pollItem.socket = static_cast<void*>(*socket);
38 pollItem.events = ZMQ_POLLIN;
39 pollItem.revents = 0;
40 pollItems.push_back(std::move(pollItem));
41
42 // but keep reference to the original function, so we can call the correct one later
43 socketMapping.push_back(&function);
44 }
45 }
46
47 if (pollItems.empty()) {
48 return false;
49 }
50
51 try {
52 zmq::poll(&pollItems[0], pollItems.size(), timeout);
53
54 bool anySocket = false;
55 unsigned int counter = 0;
56 for (const auto& pollItem : pollItems) {
57 if (pollItem.revents & ZMQ_POLLIN) {
58 anySocket = true;
59 const auto* functionPtr = socketMapping.at(counter);
60 const auto function = *functionPtr;
61 function();
62 }
63 counter++;
64 }
65
66 return anySocket;
67 } catch (zmq::error_t& error) {
68 if (error.num() == EINTR) {
69 // Could happen if there was an interrupt, return false so the caller knows the time did not pass already
70 return false;
71 } else {
72 // cannot handle, rethrow exception
73 throw;
74 }
75 }
76}

Member Data Documentation

◆ m_addEventSize

bool m_addEventSize = false
private

Parameter to add the event size to a message.

Definition at line 103 of file ZMQRawConnection.h.

◆ m_averages

std::unordered_map<std::string, std::tuple<std::vector<double>, size_t> > m_averages
privateinherited

Internal storage of the previous values when calculating averages.

Definition at line 60 of file ZMQLogger.h.

◆ m_dataIdentity

std::string m_dataIdentity = ""
private

Internal storage of the connected identity to no have multiple connections.

Definition at line 101 of file ZMQRawConnection.h.

◆ m_monitoring

std::map<std::string, std::variant<long, double, std::string> > m_monitoring
privateinherited

Internal storage of all stored values.

Definition at line 58 of file ZMQLogger.h.

◆ m_parent

std::shared_ptr<ZMQParent> m_parent
protectedinherited

The shared ZMQParent instance.

Definition at line 75 of file ZMQConnection.h.

◆ m_socket

std::unique_ptr<zmq::socket_t> m_socket
protectedinherited

The memory of the socket. Needs to be initialized in a derived class.

Definition at line 77 of file ZMQConnection.h.

◆ m_timeCounters

std::unordered_map<std::string, std::tuple<unsigned long, std::chrono::system_clock::time_point> > m_timeCounters
privateinherited

Internal storage how often the timeit function for a given key was called and when it has last reached MAX_SIZE.

Definition at line 62 of file ZMQLogger.h.


The documentation for this class was generated from the following files: