Belle II Software development
ZMQRawInput Class Reference

Input connection allowing to speak with non-zmq sockets via a ZMQ_STREAM socket.

#include <ZMQRawConnection.h>

Inheritance diagram for ZMQRawInput:
ZMQRawInput → ZMQConnectionOverSocket → ZMQConnection → ZMQLogger

Public Types

using ReactorFunction = std::function< void(void)>
 Typedef of a function which will be called if a connection has a message.
 

Public Member Functions

 ZMQRawInput (const std::string &inputAddress, unsigned int maximalBufferSize, bool receiveEventMessages, const std::shared_ptr< ZMQParent > &parent)
 Create a new raw input connection. The bind or connect behavior is chosen according to the given address.
 
std::vector< zmq::message_t > handleIncomingData ()
 Block until a TCP packet can be received from the socket.
 
void clear ()
 Reset the internal buffer and counter.
 
std::vector< zmq::socket_t * > getSockets () const final
 The socket used for polling is just the stored socket.
 
std::string getEndPoint () const
 Return the connection string for this socket.
 
virtual bool isReady () const
 Return true if this connection is able to send messages right now. Can be overridden in derived classes.
 
virtual std::string getMonitoringJSON () const
 Convert the stored monitoring values to a JSON string ready for sending out via a message.
 
template<class AClass >
void log (const std::string &key, const AClass &value)
 Store a value under a certain key. Different types of values can be stored, namely long, double or string. Mixtures are not allowed for a given key.
 
void increment (const std::string &key)
 Increment the value with the given key (only numerical values). If not present, set to 1.
 
void decrement (const std::string &key)
 Decrement the value with the given key (only numerical values). If not present, set to -1.
 
template<size_t MAX_SIZE = 100>
void average (const std::string &key, double value)
 Instead of storing the double value directly under the given key, store the average of the last MAX_SIZE values.
 
template<size_t AVERAGE_SIZE = 2000>
void timeit (const std::string &key)
 Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time AVERAGE_SIZE was reached under <key>_last_measurement)
 
void logTime (const std::string &key)
 Store the current time as a string under the given key.
 

Static Public Member Functions

static bool poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
 Poll on the given connections and call the attached function if a message comes in.
 
static bool hasMessage (const ZMQConnection *connection)
 Check if the given connection has an incoming message (right now, no waiting).
 

Protected Attributes

std::shared_ptr< ZMQParent > m_parent
 The shared ZMQParent instance.
 
std::unique_ptr< zmq::socket_t > m_socket
 The memory of the socket. Needs to be initialized in a derived class.
 

Private Attributes

unsigned int m_maximalBufferSize
 Parameter for the maximal buffer size. If this size is reached, a FATAL will be issued.
 
std::vector< char > m_buffer
 Internal storage for the buffer.
 
size_t m_writeAddress = 0
 Where in the buffer are we currently writing to.
 
unsigned int m_currentSize = 0
 How large should the full message be? The information is from the first int of the message.
 
bool m_receiveEventMessages
 Parameter to receive event messages (see above)
 
std::string m_inputIdentity = ""
 Internal storage of the connected socket to check if we get messages from multiple ones.
 
std::map< std::string, std::variant< long, double, std::string > > m_monitoring
 Internal storage of all stored values.
 
std::unordered_map< std::string, std::tuple< std::vector< double >, size_t > > m_averages
 Internal storage of the previous values when calculating averages.
 
std::unordered_map< std::string, std::tuple< unsigned long, std::chrono::system_clock::time_point > > m_timeCounters
 Internal storage of how often the timeit function was called for a given key and when it last reached AVERAGE_SIZE.
 

Detailed Description

Input connection allowing to speak with non-zmq sockets via a ZMQ_STREAM socket.

Can only speak with a single socket.

On receiving a TCP packet, it is stored in an internal buffer. Only when a message is complete is it returned, wrapped as a zmq::message_t.

Two message formats are understood:

  • if receiveEventMessages is set to false, the first int of the message must be the size L of the message in words, where one word is sizeof(int) bytes, so the full message size in bytes is L * sizeof(int). This size includes the first int itself, so the actual data has a size of L * sizeof(int) - sizeof(int) bytes.
  • if it is true, the first int of the message must be the size of the message in bytes, in network byte order (it is converted with ntohl on reception) and not counting this first size int.

The two message formats are chosen to be compatible with the HLT implementation.
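
The two header conventions can be compared side by side in a small sketch. The helper name `fullMessageSizeInBytes` is hypothetical and not part of ZMQRawInput, and the sketch assumes a 4-byte int for the size header. It returns the number of bytes a complete message occupies in the buffer, header included, for either setting of receiveEventMessages.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical helper for illustration only; ZMQRawInput does this inline.
// Assumes the size header is a 4-byte int.
std::size_t fullMessageSizeInBytes(const unsigned char* header, bool receiveEventMessages)
{
  if (receiveEventMessages) {
    // Event messages: the header is a byte count in network (big-endian)
    // byte order that does not include the header itself.
    const std::uint32_t payloadBytes =
      (static_cast<std::uint32_t>(header[0]) << 24) |
      (static_cast<std::uint32_t>(header[1]) << 16) |
      (static_cast<std::uint32_t>(header[2]) << 8) |
      static_cast<std::uint32_t>(header[3]);
    return static_cast<std::size_t>(payloadBytes) + sizeof(std::uint32_t);
  }
  // Raw messages: the header is a word count in host byte order
  // that already includes the header word.
  std::uint32_t words = 0;
  std::memcpy(&words, header, sizeof(words));
  return static_cast<std::size_t>(words) * sizeof(std::uint32_t);
}
```

With an event header announcing 8 payload bytes, or a raw header announcing 3 words, both calls yield a full size of 12 bytes.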

Definition at line 41 of file ZMQRawConnection.h.

Member Typedef Documentation

◆ ReactorFunction

using ReactorFunction = std::function<void(void)>
inherited

Typedef of a function which will be called if a connection has a message.

Definition at line 34 of file ZMQConnection.h.

Constructor & Destructor Documentation

◆ ZMQRawInput()

ZMQRawInput ( const std::string &  inputAddress,
unsigned int  maximalBufferSize,
bool  receiveEventMessages,
const std::shared_ptr< ZMQParent > &  parent 
)

Create a new raw input connection. The bind or connect behavior is chosen according to the given address.

Definition at line 16 of file ZMQRawConnection.cc.

18 m_maximalBufferSize(maximalBufferSize),
19 m_receiveEventMessages(receiveEventMessages)
20{
21 // We clear all our internal state and counters
22 log("data_size", 0.0);
23 log("received_events", 0l);
24 log("event_rate", 0.0);
25 log("average_received_byte_packages", 0.0);
26
27 log("socket_state", "disconnected");
28 log("socket_connects", 0l);
29 log("socket_disconnects", 0l);
30 log("current_size", 0l);
31 log("write_address", 0l);
32 log("average_number_of_events_per_package", 0l);
33
34 // STREAM is the ZMQ type for raw, non ZMQ connections
35 m_socket = m_parent->createSocket<ZMQ_STREAM>(inputAddress);
36
37 m_buffer.reserve(maximalBufferSize);
38}

Member Function Documentation

◆ clear()

void clear ( )

Reset the internal buffer and counter.

Definition at line 40 of file ZMQRawConnection.cc.

41{
42 m_writeAddress = 0;
43 m_currentSize = 0;
44}

◆ decrement()

void decrement ( const std::string &  key)
inherited

Decrement the value with the given key (only numerical values). If not present, set to -1.

Definition at line 37 of file ZMQLogger.cc.

38{
39 std::visit(Decrementor{}, m_monitoring[key]);
40}

◆ getEndPoint()

std::string getEndPoint ( ) const
inherited

Return the connection string for this socket.

Definition at line 87 of file ZMQConnection.cc.

88{
89 std::string endpoint{""};
90 if (m_socket) {
91 endpoint = m_socket->get(zmq::sockopt::last_endpoint);
92 }
93 return endpoint;
94}

◆ getMonitoringJSON()

std::string getMonitoringJSON ( ) const
virtual inherited

Convert the stored monitoring values to a JSON string ready for sending out via a message.

Reimplemented in ZMQHistoServerToZMQOutput, ZMQHistoServerToRawOutput, ZMQROIOutput, and ZMQDataAndROIOutput.

Definition at line 15 of file ZMQLogger.cc.

16{
17 std::stringstream buffer;
18 buffer << "{";
19 bool first = true;
20 for (const auto& keyValue : m_monitoring) {
21 if (not first) {
22 buffer << ", ";
23 }
24 first = false;
25 buffer << "\"" << keyValue.first << "\": ";
26 buffer << std::visit(toJSON{}, keyValue.second);
27 }
28 buffer << "}" << std::endl;
29 return buffer.str();
30}
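
The `toJSON` visitor used in the listing is not shown on this page. A minimal sketch of what such a visitor could look like is given here; the name `ToJSONSketch`, the number formatting and the lack of string escaping are all assumptions and not the library's implementation.

```cpp
#include <sstream>
#include <string>
#include <variant>

using MonitoringValue = std::variant<long, double, std::string>;

// Assumed visitor shape; the real toJSON may format values differently.
struct ToJSONSketch {
  std::string operator()(long value) const { return std::to_string(value); }
  std::string operator()(double value) const
  {
    std::ostringstream stream;
    stream << value;  // default stream formatting
    return stream.str();
  }
  std::string operator()(const std::string& value) const
  {
    // Strings are wrapped in quotes; this sketch does no escaping.
    return "\"" + value + "\"";
  }
};
```

Calling `std::visit(ToJSONSketch{}, value)` then picks the overload that matches the type currently held by the variant.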

◆ getSockets()

std::vector< zmq::socket_t * > getSockets ( ) const
final virtual inherited

The socket used for polling is just the stored socket.

Implements ZMQConnection.

Definition at line 82 of file ZMQConnection.cc.

83{
84 return {m_socket.get()};
85}

◆ handleIncomingData()

std::vector< zmq::message_t > handleIncomingData ( )

Block until a TCP packet can be received from the socket.

Returns only full messages as zmq::message_t. Please note that this can be

  • 0 messages if no full message has been received so far
  • 1 message if exactly one has been completed
  • more than 1 if the message size is smaller than the TCP packet size (~8kB)
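
Why one call can yield zero, one or several messages is easiest to see in a ZMQ-free sketch of the reassembly step. `FrameAssembler` is a hypothetical stand-in, not the class documented here. It assumes the raw format (first 4-byte word is the total length in words, header included) and returns each message with its header, whereas the real implementation also supports the event format and asserts on a zero size.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-in for the buffering done in handleIncomingData().
class FrameAssembler {
public:
  // Append one received chunk and return every message it completes.
  std::vector<std::vector<char>> feed(const char* data, std::size_t size)
  {
    m_buffer.insert(m_buffer.end(), data, data + size);
    std::vector<std::vector<char>> complete;
    // Without at least one full header word the size is still unknown.
    while (m_buffer.size() >= sizeof(std::uint32_t)) {
      std::uint32_t words = 0;
      std::memcpy(&words, m_buffer.data(), sizeof(words));
      const std::size_t fullSize = static_cast<std::size_t>(words) * sizeof(std::uint32_t);
      if (fullSize == 0 || m_buffer.size() < fullSize) {
        break;  // invalid header, or the message is not complete yet
      }
      const auto end = m_buffer.begin() + static_cast<std::ptrdiff_t>(fullSize);
      complete.emplace_back(m_buffer.begin(), end);
      // Move any bytes of the next message to the front of the buffer.
      m_buffer.erase(m_buffer.begin(), end);
    }
    return complete;
  }

private:
  std::vector<char> m_buffer;  // bytes received but not yet returned
};
```

Feeding the first 3 bytes of an 8-byte message returns nothing, feeding the remaining 5 returns one message, and feeding two back-to-back messages in one chunk returns both.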

Definition at line 46 of file ZMQRawConnection.cc.

47{
48 std::vector<zmq::message_t> receivedMessages;
49
50 // We will always get one or two parts. The first one is the identity..
51 zmq::message_t identity;
52 auto received = m_socket->recv(identity, zmq::recv_flags::none);
53 B2ASSERT("No message received", received);
54 B2ASSERT("Message should not be empty", *received > 0);
55 std::string identityString(identity.data<char>(), identity.size());
56 B2ASSERT("The message is incomplete!", m_socket->get(zmq::sockopt::rcvmore) == 1);
57 B2ASSERT("The app can only handle a single connection!",
58 m_inputIdentity == identityString or m_inputIdentity.empty());
59
60 // ... and the second one is the message itself.
61 const size_t remainingSpace = m_maximalBufferSize - m_writeAddress;
62 auto receivedBytes = m_socket->recv(zmq::mutable_buffer{&m_buffer[m_writeAddress], remainingSpace}, zmq::recv_flags::none);
63 B2ASSERT("No message received", receivedBytes);
64 B2ASSERT("The message is longer than expected! Increase the buffer size.", !receivedBytes->truncated());
65 if (receivedBytes->size == 0) {
66 // Empty message means the client connected or disconnected
67 if (m_inputIdentity.empty()) {
68 m_inputIdentity = identityString;
69 log("socket_state", "connected");
70 increment("socket_connects");
71 } else {
72 m_inputIdentity = "";
73 log("socket_state", "disconnected");
74 increment("socket_disconnects");
75 }
76 return receivedMessages;
77 }
78 // We can write at most `remainingSpace` bytes into the buffer. If the message was longer, ZMQ will just cut it.
79 // This means we are losing data and the buffer size should be increased.
80 if (receivedBytes->untruncated_size > remainingSpace) {
81 B2FATAL("The size of the buffer is too small! " << receivedBytes->untruncated_size << " > " << remainingSpace);
82 }
83 average("average_received_byte_packages", receivedBytes->size);
84
85 // `m_writeAddress` always points to the index on where we will write next.
86 // As we have written `receivedBytes` we need to advance
87 m_writeAddress += receivedBytes->size;
88
89 log("write_address", static_cast<long>(m_writeAddress));
90
91 // If the current buffer is smaller than an int, we can not get the size
92 while (m_writeAddress >= sizeof(int)) {
93 if (m_currentSize == 0) {
94 // we do not know the size of the data package already, so lets get it from the buffer.
95 // It is always in the first sizeof(int) of the data
96 memcpy(&m_currentSize, &m_buffer[0], sizeof(int));
97
98 // Here the two different message formats differ
101 }
102 B2ASSERT("Strange size in the data!", m_currentSize > 0);
104 m_currentSize += sizeof(int);
105 } else {
106 m_currentSize *= sizeof(int);
107 }
108
109 log("current_size", static_cast<long>(m_currentSize));
110 }
112 // Now we know the size already, and we have enough data received so we have actually the full
113 // data. We can build the total message.
114 average("data_size", m_currentSize);
115 increment("received_events");
116 timeit("event_rate");
117
118 // Again, here the two different message formats differ. One includes the first int with the length...
119 unsigned int startAddress = 0;
121 // .. and the other does not
122 startAddress = sizeof(int);
123 }
124 zmq::message_t dataMessage(&m_buffer[startAddress], static_cast<size_t>(m_currentSize - startAddress));
125
128 m_currentSize = 0;
129
130 log("write_address", static_cast<long>(m_writeAddress));
131 log("current_size", static_cast<long>(m_currentSize));
132
133 receivedMessages.push_back(std::move(dataMessage));
134 } else {
135 // We did not receive a full message up to now
136 break;
137 }
138 }
139
140 average("average_number_of_events_per_package", receivedMessages.size());
141 return receivedMessages;
142}

◆ hasMessage()

bool hasMessage ( const ZMQConnection *  connection)
static inherited

Check if the given connection has an incoming message (right now, no waiting).

Definition at line 20 of file ZMQConnection.cc.

21{
22 // Just poll with 0 timeout and no reaction function. Hacky trick to reduce code duplication
23 const auto emptyFunction = []() {};
24 return ZMQConnection::poll({{connection, emptyFunction}}, 0);
25}

◆ increment()

void increment ( const std::string &  key)
inherited

Increment the value with the given key (only numerical values). If not present, set to 1.

Definition at line 32 of file ZMQLogger.cc.

33{
34 std::visit(Incrementor{}, m_monitoring[key]);
35}
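
The "if not present, set to 1" behavior follows from how `std::map::operator[]` and `std::variant` interact: a missing key is inserted with a default-constructed variant, which holds the first alternative, `long`, with value 0. The sketch below illustrates this. `IncrementorSketch` is an assumed shape of the visitor, since the real `Incrementor` is not shown on this page, and in particular its handling of string values is a guess.

```cpp
#include <map>
#include <string>
#include <variant>

using MonitoringValue = std::variant<long, double, std::string>;

// Assumed visitor shape; the real Incrementor is not shown on this page.
struct IncrementorSketch {
  void operator()(long& value) const { value += 1; }
  void operator()(double& value) const { value += 1.0; }
  // Strings cannot be incremented; this sketch silently ignores them.
  void operator()(std::string&) const {}
};

void incrementSketch(std::map<std::string, MonitoringValue>& monitoring, const std::string& key)
{
  // operator[] inserts a default-constructed variant (long with value 0)
  // for an unknown key, so the first increment leaves it at 1.
  std::visit(IncrementorSketch{}, monitoring[key]);
}
```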

◆ isReady()

bool isReady ( ) const
virtual inherited

Return true if this connection is able to send messages right now. Can be overridden in derived classes.

Reimplemented in ZMQROIOutput, ZMQDataAndROIOutput, ZMQLoadBalancedOutput, ZMQRawOutput, ZMQHistoServerToZMQOutput, and ZMQHistoServerToRawOutput.

Definition at line 15 of file ZMQConnection.cc.

16{
17 return true;
18}

◆ logTime()

void logTime ( const std::string &  key)
inherited

Store the current time as a string under the given key.

Definition at line 42 of file ZMQLogger.cc.

43{
44 auto current = std::chrono::system_clock::now();
45 auto displayTime = std::chrono::system_clock::to_time_t(current);
46 log(key, std::ctime(&displayTime));
47}

◆ poll()

bool poll ( const std::map< const ZMQConnection *, ReactorFunction > &  connectionList,
int  timeout 
)
static inherited

Poll on the given connections and call the attached function if a message comes in.

If no message has been received after timeout milliseconds, return anyway. If timeout is 0, do not wait. If timeout is -1, wait indefinitely.

Returns true if a message was received on any socket, false otherwise. Attention: if a system call is interrupted (e.g. because a signal was received), the function may return false even before the timeout has elapsed.
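
Because a false result can mean either "timed out" or "interrupted early", a caller that needs to wait the full period has to track the remaining time itself. One possible way to do this is sketched below. `pollUntilDeadline` is a hypothetical wrapper, and `pollOnce` stands in for a call such as `ZMQConnection::poll(connections, timeoutMs)`. The sketch assumes a finite, non-negative timeout.

```cpp
#include <chrono>
#include <functional>

// Hypothetical wrapper, not part of the documented API.
// pollOnce(timeoutMs) is expected to behave like ZMQConnection::poll:
// true if a message was handled, false on timeout or interruption.
bool pollUntilDeadline(const std::function<bool(int)>& pollOnce, int timeoutMs)
{
  using Clock = std::chrono::steady_clock;
  const auto deadline = Clock::now() + std::chrono::milliseconds(timeoutMs);
  std::chrono::milliseconds remaining(timeoutMs);
  do {
    if (pollOnce(static_cast<int>(remaining.count()))) {
      return true;
    }
    // false may be an early return after a signal, so recompute
    // how much of the original timeout is still left.
    remaining = std::chrono::duration_cast<std::chrono::milliseconds>(deadline - Clock::now());
  } while (remaining.count() > 0);
  return false;
}
```

The wrapper polls at least once, so a timeout of 0 still performs a single non-blocking check.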

Definition at line 27 of file ZMQConnection.cc.

28{
29 std::vector<const ReactorFunction*> socketMapping;
30 std::vector<zmq::pollitem_t> pollItems;
31
32 // zmq needs a special format for its polling, so create it here.
33 for (const auto& [connection, function] : connectionList) {
34 auto sockets = connection->getSockets();
35 for (zmq::socket_t* socket : sockets) {
36 zmq::pollitem_t pollItem;
37 pollItem.socket = static_cast<void*>(*socket);
38 pollItem.events = ZMQ_POLLIN;
39 pollItem.revents = 0;
40 pollItems.push_back(std::move(pollItem));
41
42 // but keep reference to the original function, so we can call the correct one later
43 socketMapping.push_back(&function);
44 }
45 }
46
47 if (pollItems.empty()) {
48 return false;
49 }
50
51 try {
52 zmq::poll(&pollItems[0], pollItems.size(), timeout);
53
54 bool anySocket = false;
55 unsigned int counter = 0;
56 for (const auto& pollItem : pollItems) {
57 if (pollItem.revents & ZMQ_POLLIN) {
58 anySocket = true;
59 const auto* functionPtr = socketMapping.at(counter);
60 const auto function = *functionPtr;
61 function();
62 }
63 counter++;
64 }
65
66 return anySocket;
67 } catch (zmq::error_t& error) {
68 if (error.num() == EINTR) {
69 // Could happen if there was an interrupt, return false so the caller knows the time did not pass already
70 return false;
71 } else {
72 // cannot handle, rethrow exception
73 throw;
74 }
75 }
76}

Member Data Documentation

◆ m_averages

std::unordered_map<std::string, std::tuple<std::vector<double>, size_t> > m_averages
private inherited

Internal storage of the previous values when calculating averages.

Definition at line 60 of file ZMQLogger.h.

◆ m_buffer

std::vector<char> m_buffer
private

Internal storage for the buffer.

Definition at line 63 of file ZMQRawConnection.h.

◆ m_currentSize

unsigned int m_currentSize = 0
private

How large should the full message be? The information is from the first int of the message.

Definition at line 67 of file ZMQRawConnection.h.

◆ m_inputIdentity

std::string m_inputIdentity = ""
private

Internal storage of the connected socket to check if we get messages from multiple ones.

Definition at line 71 of file ZMQRawConnection.h.

◆ m_maximalBufferSize

unsigned int m_maximalBufferSize
private

Parameter for the maximal buffer size. If this size is reached, a FATAL will be issued.

Definition at line 61 of file ZMQRawConnection.h.

◆ m_monitoring

std::map<std::string, std::variant<long, double, std::string> > m_monitoring
private inherited

Internal storage of all stored values.

Definition at line 58 of file ZMQLogger.h.

◆ m_parent

std::shared_ptr<ZMQParent> m_parent
protected inherited

The shared ZMQParent instance.

Definition at line 75 of file ZMQConnection.h.

◆ m_receiveEventMessages

bool m_receiveEventMessages
private

Parameter to receive event messages (see above)

Definition at line 69 of file ZMQRawConnection.h.

◆ m_socket

std::unique_ptr<zmq::socket_t> m_socket
protected inherited

The memory of the socket. Needs to be initialized in a derived class.

Definition at line 77 of file ZMQConnection.h.

◆ m_timeCounters

std::unordered_map<std::string, std::tuple<unsigned long, std::chrono::system_clock::time_point> > m_timeCounters
private inherited

Internal storage of how often the timeit function was called for a given key and when it last reached AVERAGE_SIZE.

Definition at line 62 of file ZMQLogger.h.

◆ m_writeAddress

size_t m_writeAddress = 0
private

Where in the buffer are we currently writing to.

Definition at line 65 of file ZMQRawConnection.h.


The documentation for this class was generated from the following files: