Belle II Software development
ZMQHistoServerToZMQOutput Class Reference

Send the histograms as compressed byte stream via a ZMQConfirmedOutput connection after merging with all the properties of a normal confirmed output. More...

#include <ZMQHistogramConnection.h>

Inheritance diagram for ZMQHistoServerToZMQOutput:
ZMQConnection ZMQLogger

Public Types

using ReactorFunction = std::function< void(void)>
 Typedef of a function which will be called if a connection has a message.
 

Public Member Functions

 ZMQHistoServerToZMQOutput (const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
 Initialize the ZMQConfirmedOutput with the given address.
 
void mergeAndSend (const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
 Merge the histograms and send them via the connection. Stop/Terminate messages are sent after that.
 
void clear ()
 Nothing to do on clear.
 
std::string getMonitoringJSON () const final
 The monitoring JSON is just passed from the ZMQConfirmedOutput.
 
void handleIncomingData ()
 The input data handling is passed from the ZMQConfirmedOutput.
 
bool isReady () const override
 The is ready is passed from the ZMQConfirmedOutput.
 
std::vector< zmq::socket_t * > getSockets () const final
 The sockets are passed from ZMQConfirmedOutput.
 
std::string getEndPoint () const
 Return the connection string.
 
template<class AClass >
void log (const std::string &key, const AClass &value)
 Store a value under a certain key. Different types of values can be stored, namely long, double or string. Mixtures are not allowed for a given key.
 
void increment (const std::string &key)
 Increment the value with the given key (only numerical values). If not present, set to 1.
 
void decrement (const std::string &key)
 Decrement the value with the given key (only numerical values). If not present, set to -1.
 
template<size_t MAX_SIZE = 100>
void average (const std::string &key, double value)
 Instead of storeing the double value directly under the given key, store the average of the last MAX_SIZE values.
 
template<size_t AVERAGE_SIZE = 2000>
void timeit (const std::string &key)
 Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time AVERAGE_SIZE was reached under <key>_last_measurement)
 
void logTime (const std::string &key)
 Store the current time as a string under the given key.
 

Static Public Member Functions

static bool poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
 Poll on the given connections and call the attached function if a messages comes in.
 
static bool hasMessage (const ZMQConnection *connection)
 Check if the given connection as an incoming message (right now, no waiting).
 

Private Attributes

ZMQConfirmedOutput m_output
 The output connection used for sending the histograms.
 
std::vector< char > m_outputBuffer
 Buffer used for compression.
 
unsigned int m_maximalCompressedSize = 100'000'000
 Maximal size of the compression buffer.
 
std::map< std::string, std::variant< long, double, std::string > > m_monitoring
 Internal storage of all stored values.
 
std::unordered_map< std::string, std::tuple< std::vector< double >, size_t > > m_averages
 Internal storage of the previous values when calculating averages.
 
std::unordered_map< std::string, std::tuple< unsigned long, std::chrono::system_clock::time_point > > m_timeCounters
 Internal storage how often the timeit function for a given key was called and when it has last reached MAX_SIZE.
 

Detailed Description

Send the histograms as compressed byte stream via a ZMQConfirmedOutput connection after merging with all the properties of a normal confirmed output.

Stop and terminate messages are sent as normal (after performing a last merge and sending it). As additional message the event meta data is passed.

This connection allows to built hierarchies of histogram servers with ZMQ.

Definition at line 88 of file ZMQHistogramConnection.h.

Member Typedef Documentation

◆ ReactorFunction

using ReactorFunction = std::function<void(void)>
inherited

Typedef of a function which will be called if a connection has a message.

Definition at line 34 of file ZMQConnection.h.

Constructor & Destructor Documentation

◆ ZMQHistoServerToZMQOutput()

ZMQHistoServerToZMQOutput ( const std::string &  outputAddress,
const std::shared_ptr< ZMQParent > &  parent 
)

Initialize the ZMQConfirmedOutput with the given address.

Definition at line 116 of file ZMQHistogramConnection.cc.

116 :
117 m_output(outputAddress, parent)
118{
119 m_output.log("histogram_merges", 0l);
120 m_output.log("last_merged_histograms", 0l);
121 m_output.log("average_merged_histograms", 0l);
122 m_output.log("last_merge", "");
123 m_output.log("size_before_compression", 0.0);
124 m_output.log("size_after_compression", 0.0);
125}
ZMQConfirmedOutput m_output
The output connection used for sending the histograms.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:96

Member Function Documentation

◆ clear()

void clear ( )
inline

Nothing to do on clear.

Definition at line 98 of file ZMQHistogramConnection.h.

98{}

◆ decrement()

void decrement ( const std::string &  key)
inherited

Decrement the value with the given key (only numerical values). If not present, set to -1.

Definition at line 37 of file ZMQLogger.cc.

38{
39 std::visit(Decrementor{}, m_monitoring[key]);
40}
std::map< std::string, std::variant< long, double, std::string > > m_monitoring
Internal storage of all stored values.
Definition: ZMQLogger.h:58

◆ getEndPoint()

std::string getEndPoint ( ) const
inline

Return the connection string.

Definition at line 109 of file ZMQHistogramConnection.h.

109{ return m_output.getEndPoint(); }
std::string getEndPoint() const
Return the connection string for this socket.

◆ getMonitoringJSON()

std::string getMonitoringJSON ( ) const
inlinefinalvirtual

The monitoring JSON is just passed from the ZMQConfirmedOutput.

Reimplemented from ZMQLogger.

Definition at line 101 of file ZMQHistogramConnection.h.

101{ return m_output.getMonitoringJSON(); }
virtual std::string getMonitoringJSON() const
Convert the stored monitoring values to a JSON string ready for sending out via a message.
Definition: ZMQLogger.cc:15

◆ getSockets()

std::vector< zmq::socket_t * > getSockets ( ) const
inlinefinalvirtual

The sockets are passed from ZMQConfirmedOutput.

Implements ZMQConnection.

Definition at line 107 of file ZMQHistogramConnection.h.

107{ return m_output.getSockets(); }
std::vector< zmq::socket_t * > getSockets() const final
The socket used for polling is just the stored socket.

◆ handleIncomingData()

void handleIncomingData ( )
inline

The input data handling is passed from the ZMQConfirmedOutput.

Definition at line 103 of file ZMQHistogramConnection.h.

void handleIncomingData()
Blocks until it can receive the (hopefully confirmation) message from the output.

◆ hasMessage()

bool hasMessage ( const ZMQConnection connection)
staticinherited

Check if the given connection as an incoming message (right now, no waiting).

Definition at line 20 of file ZMQConnection.cc.

21{
22 // Just poll with 0 timeout and no reaction function. Hacky trick to reduce code duplication
23 const auto emptyFunction = []() {};
24 return ZMQConnection::poll({{connection, emptyFunction}}, 0);
25}
static bool poll(const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
Poll on the given connections and call the attached function if a messages comes in.

◆ increment()

void increment ( const std::string &  key)
inherited

Increment the value with the given key (only numerical values). If not present, set to 1.

Definition at line 32 of file ZMQLogger.cc.

33{
34 std::visit(Incrementor{}, m_monitoring[key]);
35}

◆ isReady()

bool isReady ( ) const
inlineoverridevirtual

The is ready is passed from the ZMQConfirmedOutput.

Reimplemented from ZMQConnection.

Definition at line 105 of file ZMQHistogramConnection.h.

105{ return m_output.isReady(); }
virtual bool isReady() const
Return true of this connection is able to send messages right now. Can be overloaded in derived class...

◆ logTime()

void logTime ( const std::string &  key)
inherited

Store the current time as a string under the given key.

Definition at line 42 of file ZMQLogger.cc.

43{
44 auto current = std::chrono::system_clock::now();
45 auto displayTime = std::chrono::system_clock::to_time_t(current);
46 log(key, std::ctime(&displayTime));
47}

◆ mergeAndSend()

void mergeAndSend ( const std::map< std::string, HistogramMapping > &  storedMessages,
const std::optional< unsigned int > &  experiment,
const std::optional< unsigned int > &  run,
EMessageTypes  messageType 
)

Merge the histograms and send them via the connection. Stop/Terminate messages are sent after that.

Definition at line 127 of file ZMQHistogramConnection.cc.

130{
131 if (messageType == EMessageTypes::c_lastEventMessage) {
132 // merge one last time
133 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
134 // and then send out a stop signal by ourself
135 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
136 m_output.handleEvent(std::move(message));
137 return;
138 } else if (messageType == EMessageTypes::c_terminateMessage) {
139 // merge one last time
140 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
141 // and send out a terminate message
142 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
143 m_output.handleEvent(std::move(message));
144 return;
145 }
146
147 B2ASSERT("This should be an event message!", messageType == EMessageTypes::c_eventMessage);
148
149 // Makes no sense to send out an empty event
150 if (storedMessages.empty()) {
151 return;
152 }
153
154 B2ASSERT("Experiment and run must be set at this stage", experiment and run);
155
156 m_output.increment("histogram_merges");
157
158 HistogramMapping mergeHistograms;
159
160 m_output.log("last_merged_histograms", static_cast<long>(storedMessages.size()));
161 m_output.average("average_merged_histograms", static_cast<double>(storedMessages.size()));
162 m_output.logTime("last_merge");
163
164 for (const auto& keyValue : storedMessages) {
165 const auto& histogram = keyValue.second;
166 mergeHistograms += histogram;
167 }
168
169 auto eventMessage = mergeHistograms.toMessage();
170
171 if (m_outputBuffer.empty()) {
173 }
174
175 m_output.average("size_before_compression", eventMessage->size());
176 int size = m_maximalCompressedSize;
177 size = LZ4_compress_default(eventMessage->buffer(), &m_outputBuffer[0], eventMessage->size(), size);
178 B2ASSERT("Compression failed", size > 0);
179 m_output.average("size_after_compression", size);
180
181 zmq::message_t message(&m_outputBuffer[0], size);
182
183 EventMetaData eventMetaData(0, *run, *experiment);
184 auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
185 zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
186
187 auto zmqMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_compressedDataMessage, std::move(message),
188 std::move(additionalEventMessage));
189 m_output.handleEvent(std::move(zmqMessage), true, 20000);
190}
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
std::unique_ptr< Belle2::EvtMessage > toMessage() const
Construct an EvtMessage by serializing the content of the internal histogram storage....
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without a ID as there is only a single output).
std::vector< char > m_outputBuffer
Buffer used for compression.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are sent after that.
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
void logTime(const std::string &key)
Store the current time as a string under the given key.
Definition: ZMQLogger.cc:42
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:32
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
void average(const std::string &key, double value)
Instead of storeing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:102

◆ poll()

bool poll ( const std::map< const ZMQConnection *, ReactorFunction > &  connectionList,
int  timeout 
)
staticinherited

Poll on the given connections and call the attached function if a messages comes in.

If after timeout milliseconds still no message is received, return anyways. If timeout is 0, do not wait. If timeout is -1, wait infinitely.

Returns true if a message was received on any socket, false otherwise. Attention: in case of an interrupted system call (e.g. because a signal was received) the function might return anyways with a negative result even before the timeout!

Definition at line 27 of file ZMQConnection.cc.

28{
29 std::vector<const ReactorFunction*> socketMapping;
30 std::vector<zmq::pollitem_t> pollItems;
31
32 // zmq needs a special format for its polling, so create it here.
33 for (const auto& [connection, function] : connectionList) {
34 auto sockets = connection->getSockets();
35 for (zmq::socket_t* socket : sockets) {
36 zmq::pollitem_t pollItem;
37 pollItem.socket = static_cast<void*>(*socket);
38 pollItem.events = ZMQ_POLLIN;
39 pollItem.revents = 0;
40 pollItems.push_back(std::move(pollItem));
41
42 // but keep reference to the original function, so we can call the correct one later
43 socketMapping.push_back(&function);
44 }
45 }
46
47 if (pollItems.empty()) {
48 return false;
49 }
50
51 try {
52 zmq::poll(&pollItems[0], pollItems.size(), timeout);
53
54 bool anySocket = false;
55 unsigned int counter = 0;
56 for (const auto& pollItem : pollItems) {
57 if (pollItem.revents & ZMQ_POLLIN) {
58 anySocket = true;
59 const auto* functionPtr = socketMapping.at(counter);
60 const auto function = *functionPtr;
61 function();
62 }
63 counter++;
64 }
65
66 return anySocket;
67 } catch (zmq::error_t& error) {
68 if (error.num() == EINTR) {
69 // Could happen if there was an interrupt, return false so the caller knows the time did not pass already
70 return false;
71 } else {
72 // cannot handle, rethrow exception
73 throw;
74 }
75 }
76}

Member Data Documentation

◆ m_averages

std::unordered_map<std::string, std::tuple<std::vector<double>, size_t> > m_averages
privateinherited

Internal storage of the previous values when calculating averages.

Definition at line 60 of file ZMQLogger.h.

◆ m_maximalCompressedSize

unsigned int m_maximalCompressedSize = 100'000'000
private

Maximal size of the compression buffer.

Definition at line 117 of file ZMQHistogramConnection.h.

◆ m_monitoring

std::map<std::string, std::variant<long, double, std::string> > m_monitoring
privateinherited

Internal storage of all stored values.

Definition at line 58 of file ZMQLogger.h.

◆ m_output

ZMQConfirmedOutput m_output
private

The output connection used for sending the histograms.

Definition at line 112 of file ZMQHistogramConnection.h.

◆ m_outputBuffer

std::vector<char> m_outputBuffer
private

Buffer used for compression.

Definition at line 115 of file ZMQHistogramConnection.h.

◆ m_timeCounters

std::unordered_map<std::string, std::tuple<unsigned long, std::chrono::system_clock::time_point> > m_timeCounters
privateinherited

Internal storage how often the timeit function for a given key was called and when it has last reached MAX_SIZE.

Definition at line 62 of file ZMQLogger.h.


The documentation for this class was generated from the following files: