Belle II Software development
ZMQDataAndROIOutput Class Reference

Helper connection hosting both a normal raw and a ROI output and sending to both at the same time. More...

#include <ZMQROIConnection.h>

Inheritance diagram for ZMQDataAndROIOutput:
ZMQConnection ZMQLogger

Public Types

using ReactorFunction = std::function< void(void)>
 Typedef of a function which will be called if a connection has a message.
 

Public Member Functions

 ZMQDataAndROIOutput (const std::string &dataAddress, const std::string &roiAddress, bool addEventSize, const std::shared_ptr< ZMQParent > &parent)
 Initialize the raw and roi connection.
 
std::string getMonitoringJSON () const final
 Get the monitoring JSON from the raw connection.
 
std::string getROIMonitoringJSON () const
 Get the monitoring JSON from the ROI connection.
 
void handleEvent (std::unique_ptr< ZMQNoIdMessage > message)
 Send the normal data message to raw and the additional message to ROI.
 
void handleIncomingData ()
 Handle a socket (dis)connect on either the raw or the ROI connection.
 
bool isReady () const final
 Ready only when both sockets are ready.
 
std::vector< zmq::socket_t * > getSockets () const final
 Return both sockets for polling.
 
std::string getEndPoint () const
 Return the connection string.
 
template<class AClass >
void log (const std::string &key, const AClass &value)
 Store a value under a certain key. Different types of values can be stored, namely long, double or string. Mixtures are not allowed for a given key.
 
void increment (const std::string &key)
 Increment the value with the given key (only numerical values). If not present, set to 1.
 
void decrement (const std::string &key)
 Decrement the value with the given key (only numerical values). If not present, set to -1.
 
template<size_t MAX_SIZE = 100>
void average (const std::string &key, double value)
 Instead of storing the double value directly under the given key, store the average of the last MAX_SIZE values.
 
template<size_t AVERAGE_SIZE = 2000>
void timeit (const std::string &key)
 Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time AVERAGE_SIZE was reached under <key>_last_measurement)
 
void logTime (const std::string &key)
 Store the current time as a string under the given key.
 

Static Public Member Functions

static bool poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
 Poll on the given connections and call the attached function if a message comes in.
 
static bool hasMessage (const ZMQConnection *connection)
 Check if the given connection has an incoming message (right now, no waiting).
 

Private Attributes

ZMQRawOutput m_dataOutput
 The used raw connection.
 
ZMQRawOutput m_roiOutput
 The used ROI connection.
 
std::map< std::string, std::variant< long, double, std::string > > m_monitoring
 Internal storage of all stored values.
 
std::unordered_map< std::string, std::tuple< std::vector< double >, size_t > > m_averages
 Internal storage of the previous values when calculating averages.
 
std::unordered_map< std::string, std::tuple< unsigned long, std::chrono::system_clock::time_point > > m_timeCounters
 Internal storage of how often the timeit function was called for a given key and when it last reached AVERAGE_SIZE.
 

Detailed Description

Helper connection hosting both a normal raw and a ROI output and sending to both at the same time.

The normal message will be sent to the raw connection whereas the additional message will be sent to the roi connection.

As two connections are hosted, two monitoring JSON strings can be requested: one per connection.

Definition at line 70 of file ZMQROIConnection.h.

Member Typedef Documentation

◆ ReactorFunction

using ReactorFunction = std::function<void(void)>
inherited

Typedef of a function which will be called if a connection has a message.

Definition at line 34 of file ZMQConnection.h.

Constructor & Destructor Documentation

◆ ZMQDataAndROIOutput()

ZMQDataAndROIOutput ( const std::string &  dataAddress,
const std::string &  roiAddress,
bool  addEventSize,
const std::shared_ptr< ZMQParent > &  parent 
)

Initialize the raw and roi connection.

Definition at line 13 of file ZMQROIConnection.cc.

  :
  m_dataOutput(dataAddress, addEventSize, parent), m_roiOutput(roiAddress, false, parent) {}

Member Function Documentation

◆ decrement()

void decrement ( const std::string &  key)
inherited

Decrement the value with the given key (only numerical values). If not present, set to -1.

Definition at line 37 of file ZMQLogger.cc.

{
  std::visit(Decrementor{}, m_monitoring[key]);
}

◆ getEndPoint()

std::string getEndPoint ( ) const
inline

Return the connection string.

Definition at line 90 of file ZMQROIConnection.h.

{ return m_dataOutput.getEndPoint() + ";" + m_roiOutput.getEndPoint(); }

◆ getMonitoringJSON()

std::string getMonitoringJSON ( ) const
final virtual

Get the monitoring JSON from the raw connection.

Reimplemented from ZMQLogger.

Definition at line 48 of file ZMQROIConnection.cc.

49{
51}

◆ getROIMonitoringJSON()

std::string getROIMonitoringJSON ( ) const

Get the monitoring JSON from the ROI connection.

Definition at line 53 of file ZMQROIConnection.cc.

54{
56}

◆ getSockets()

std::vector< zmq::socket_t * > getSockets ( ) const
final virtual

Return both sockets for polling.

Implements ZMQConnection.

Definition at line 43 of file ZMQROIConnection.cc.

44{
46}

◆ handleEvent()

void handleEvent ( std::unique_ptr< ZMQNoIdMessage >  message)

Send the normal data message to raw and the additional message to ROI.

Definition at line 17 of file ZMQROIConnection.cc.

{
  if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
    B2ASSERT("The data message needs to be present!", message->getDataMessage().size() > 0);
    B2ASSERT("The roi message needs to be present!", message->getAdditionalDataMessage().size() > 0);

    m_dataOutput.handleEvent(std::move(message->getDataMessage()));
    m_roiOutput.handleEvent(std::move(message->getAdditionalDataMessage()));
  }
}
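
The fan-out performed by handleEvent() can be illustrated without ZeroMQ. The following is only a sketch under stated assumptions: TwoPartMessageSketch, RecordingOutputSketch and DataAndROIOutputSketch are hypothetical stand-ins that do not exist in the Belle II software, the message type check is left out, and B2ASSERT is replaced by an exception.

```cpp
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical two-part message: the normal payload plus the additional (ROI) payload.
struct TwoPartMessageSketch {
  std::string data;
  std::string additionalData;
};

// Hypothetical output that only records what it was asked to send.
struct RecordingOutputSketch {
  std::vector<std::string> sent;
  void handleEvent(std::string payload) { sent.push_back(std::move(payload)); }
};

// Hypothetical stand-in for the class documented here.
class DataAndROIOutputSketch {
public:
  void handleEvent(std::unique_ptr<TwoPartMessageSketch> message)
  {
    // Both parts are required (the documented code uses B2ASSERT for this).
    if (message->data.empty()) {
      throw std::invalid_argument("The data message needs to be present!");
    }
    if (message->additionalData.empty()) {
      throw std::invalid_argument("The roi message needs to be present!");
    }

    // The normal part goes to the raw output, the additional part to the ROI output.
    m_dataOutput.handleEvent(std::move(message->data));
    m_roiOutput.handleEvent(std::move(message->additionalData));
  }

  const RecordingOutputSketch& dataOutput() const { return m_dataOutput; }
  const RecordingOutputSketch& roiOutput() const { return m_roiOutput; }

private:
  RecordingOutputSketch m_dataOutput;
  RecordingOutputSketch m_roiOutput;
};
```

Passing the message by std::unique_ptr lets both parts be moved out instead of copied, which mirrors the std::move calls in the listing.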

◆ handleIncomingData()

void handleIncomingData ( )

Handle a socket (dis)connect on either the raw or the ROI connection.

Definition at line 28 of file ZMQROIConnection.cc.

29{
32 }
35 }
36}

◆ hasMessage()

bool hasMessage ( const ZMQConnection *  connection)
static inherited

Check if the given connection has an incoming message (right now, no waiting).

Definition at line 20 of file ZMQConnection.cc.

{
  // Just poll with 0 timeout and no reaction function. Hacky trick to reduce code duplication
  const auto emptyFunction = []() {};
  return ZMQConnection::poll({{connection, emptyFunction}}, 0);
}

◆ increment()

void increment ( const std::string &  key)
inherited

Increment the value with the given key (only numerical values). If not present, set to 1.

Definition at line 32 of file ZMQLogger.cc.

{
  std::visit(Incrementor{}, m_monitoring[key]);
}
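
The Incrementor and Decrementor visitors used by increment() and decrement() are not shown on this page. A minimal sketch of how such visitors could work on the std::variant storage follows. IncrementorSketch, DecrementorSketch and MonitoringSketch are hypothetical names, and leaving string entries untouched is an assumption rather than documented behavior.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <variant>

using MonitoringValue = std::variant<long, double, std::string>;

// Hypothetical visitor: add one to numerical entries.
struct IncrementorSketch {
  void operator()(long& value) const { value += 1; }
  void operator()(double& value) const { value += 1.0; }
  // Assumption: string entries are left untouched.
  void operator()(std::string&) const {}
};

// Hypothetical visitor: subtract one from numerical entries.
struct DecrementorSketch {
  void operator()(long& value) const { value -= 1; }
  void operator()(double& value) const { value -= 1.0; }
  // Assumption: string entries are left untouched.
  void operator()(std::string&) const {}
};

class MonitoringSketch {
public:
  // operator[] default-constructs a missing entry as the first
  // alternative (long, value 0), so a new key ends up at 1 or -1.
  void increment(const std::string& key) { std::visit(IncrementorSketch{}, m_monitoring[key]); }
  void decrement(const std::string& key) { std::visit(DecrementorSketch{}, m_monitoring[key]); }

  void set(const std::string& key, MonitoringValue value) { m_monitoring[key] = std::move(value); }
  const MonitoringValue& get(const std::string& key) const { return m_monitoring.at(key); }

private:
  std::map<std::string, MonitoringValue> m_monitoring;
};
```

The default construction through operator[] is what makes the documented "if not present, set to 1" (or -1) behavior fall out without an explicit existence check.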

◆ isReady()

bool isReady ( ) const
final virtual

Ready only when both sockets are ready.

Reimplemented from ZMQConnection.

Definition at line 38 of file ZMQROIConnection.cc.

39{
41}

◆ logTime()

void logTime ( const std::string &  key)
inherited

Store the current time as a string under the given key.

Definition at line 42 of file ZMQLogger.cc.

{
  auto current = std::chrono::system_clock::now();
  auto displayTime = std::chrono::system_clock::to_time_t(current);
  log(key, std::ctime(&displayTime));
}
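
One detail of the conversion used above: std::ctime returns text of the form "Www Mmm dd hh:mm:ss yyyy" followed by a newline character, so the logged value presumably carries that newline. The sketch below shows the same conversion with optional stripping. formatTimeSketch is a hypothetical helper, not part of ZMQLogger.

```cpp
#include <chrono>
#include <ctime>
#include <string>

// Hypothetical helper: convert a time point to the std::ctime text form.
std::string formatTimeSketch(std::chrono::system_clock::time_point timePoint, bool stripNewline)
{
  const std::time_t displayTime = std::chrono::system_clock::to_time_t(timePoint);
  std::string text = std::ctime(&displayTime);

  // std::ctime terminates its result with '\n'; drop it on request.
  if (stripNewline && !text.empty() && text.back() == '\n') {
    text.pop_back();
  }
  return text;
}
```

Note that std::ctime uses a static buffer and is therefore not safe to call concurrently from several threads.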

◆ poll()

bool poll ( const std::map< const ZMQConnection *, ReactorFunction > &  connectionList,
int  timeout 
)
static inherited

Poll on the given connections and call the attached function if a message comes in.

If no message has been received after timeout milliseconds, return anyway. If timeout is 0, do not wait. If timeout is -1, wait indefinitely.

Returns true if a message was received on any socket, false otherwise. Attention: in case of an interrupted system call (e.g. because a signal was received), the function might return false even before the timeout has elapsed!

Definition at line 27 of file ZMQConnection.cc.

{
  std::vector<const ReactorFunction*> socketMapping;
  std::vector<zmq::pollitem_t> pollItems;

  // zmq needs a special format for its polling, so create it here.
  for (const auto& [connection, function] : connectionList) {
    auto sockets = connection->getSockets();
    for (zmq::socket_t* socket : sockets) {
      zmq::pollitem_t pollItem;
      pollItem.socket = static_cast<void*>(*socket);
      pollItem.events = ZMQ_POLLIN;
      pollItem.revents = 0;
      pollItems.push_back(std::move(pollItem));

      // but keep reference to the original function, so we can call the correct one later
      socketMapping.push_back(&function);
    }
  }

  if (pollItems.empty()) {
    return false;
  }

  try {
    zmq::poll(&pollItems[0], pollItems.size(), timeout);

    bool anySocket = false;
    unsigned int counter = 0;
    for (const auto& pollItem : pollItems) {
      if (pollItem.revents & ZMQ_POLLIN) {
        anySocket = true;
        const auto* functionPtr = socketMapping.at(counter);
        const auto function = *functionPtr;
        function();
      }
      counter++;
    }

    return anySocket;
  } catch (zmq::error_t& error) {
    if (error.num() == EINTR) {
      // Could happen if there was an interrupt, return false so the caller knows the time did not pass already
      return false;
    } else {
      // cannot handle, rethrow exception
      throw;
    }
  }
}
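
The same reactor pattern (build a flat list of poll items, remember which callback belongs to which item, then dispatch) can be tried out without ZeroMQ. The sketch below swaps in plain POSIX poll() on file descriptors so that it has no external dependency. pollSketch and ReactorFunctionSketch are hypothetical names, and the sketch assumes a POSIX system.

```cpp
#include <cerrno>
#include <cstddef>
#include <functional>
#include <map>
#include <system_error>
#include <vector>

#include <poll.h>
#include <unistd.h>

using ReactorFunctionSketch = std::function<void(void)>;

// Poll the given file descriptors and call the attached function for each readable one.
// Returns true if any descriptor was readable, false on timeout or interruption.
bool pollSketch(const std::map<int, ReactorFunctionSketch>& descriptorList, int timeout)
{
  std::vector<const ReactorFunctionSketch*> functionMapping;
  std::vector<pollfd> pollItems;

  // Build the flat array poll() expects and keep the callbacks in the same order.
  for (const auto& [descriptor, function] : descriptorList) {
    pollfd pollItem{};
    pollItem.fd = descriptor;
    pollItem.events = POLLIN;
    pollItems.push_back(pollItem);
    functionMapping.push_back(&function);
  }

  if (pollItems.empty()) {
    return false;
  }

  const int result = ::poll(pollItems.data(), pollItems.size(), timeout);
  if (result < 0) {
    if (errno == EINTR) {
      // Interrupted by a signal: report "no message" as in the listing above.
      return false;
    }
    throw std::system_error(errno, std::generic_category(), "poll failed");
  }

  bool anyDescriptor = false;
  for (std::size_t index = 0; index < pollItems.size(); ++index) {
    if (pollItems[index].revents & POLLIN) {
      anyDescriptor = true;
      (*functionMapping[index])();
    }
  }
  return anyDescriptor;
}
```

Calling pollSketch with a timeout of 0 and an empty callback gives the non-blocking check that hasMessage() builds on top of poll().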

Member Data Documentation

◆ m_averages

std::unordered_map<std::string, std::tuple<std::vector<double>, size_t> > m_averages
private inherited

Internal storage of the previous values when calculating averages.

Definition at line 60 of file ZMQLogger.h.
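
How a vector plus an index can serve as storage for the average of the last MAX_SIZE values is sketched below. This is an illustration under assumptions, not the ZMQLogger implementation: AverageSketch is a hypothetical class, the size_t is interpreted as the next write position of a ring buffer, and the method returns the average instead of storing it under the key.

```cpp
#include <cstddef>
#include <numeric>
#include <string>
#include <tuple>
#include <unordered_map>
#include <vector>

class AverageSketch {
public:
  // Add a value for the key and return the average of the last MAX_SIZE values.
  template <std::size_t MAX_SIZE = 100>
  double average(const std::string& key, double value)
  {
    auto& [values, nextIndex] = m_averages[key];

    if (values.size() < MAX_SIZE) {
      // Still filling up: append.
      values.push_back(value);
    } else {
      // Full: overwrite the oldest entry.
      values[nextIndex] = value;
    }
    nextIndex = (nextIndex + 1) % MAX_SIZE;

    return std::accumulate(values.begin(), values.end(), 0.0) / values.size();
  }

private:
  // Same layout as m_averages: previous values plus a position.
  std::unordered_map<std::string, std::tuple<std::vector<double>, std::size_t>> m_averages;
};
```

With MAX_SIZE set to 3, feeding 1, 2, 3 and then 4 yields averages over {1}, {1, 2}, {1, 2, 3} and finally {2, 3, 4}.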

◆ m_dataOutput

ZMQRawOutput m_dataOutput
private

The used raw connection.

Definition at line 93 of file ZMQROIConnection.h.

◆ m_monitoring

std::map<std::string, std::variant<long, double, std::string> > m_monitoring
private inherited

Internal storage of all stored values.

Definition at line 58 of file ZMQLogger.h.

◆ m_roiOutput

ZMQRawOutput m_roiOutput
private

The used ROI connection.

Definition at line 95 of file ZMQROIConnection.h.

◆ m_timeCounters

std::unordered_map<std::string, std::tuple<unsigned long, std::chrono::system_clock::time_point> > m_timeCounters
private inherited

Internal storage of how often the timeit function was called for a given key and when it last reached AVERAGE_SIZE.

Definition at line 62 of file ZMQLogger.h.
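
A counter plus a time point per key is enough to measure a call rate every AVERAGE_SIZE calls. The sketch below shows one way to do this. It is not the ZMQLogger implementation: RateSketch is a hypothetical class, the time is passed in explicitly so the behavior is reproducible, the result is returned instead of being stored under the key, and calls per second as the unit is an assumption.

```cpp
#include <chrono>
#include <optional>
#include <string>
#include <tuple>
#include <unordered_map>

class RateSketch {
public:
  using Clock = std::chrono::system_clock;

  // Count a call for the key. Every AVERAGE_SIZE counted calls, return the
  // rate in calls per second since the last measurement, otherwise nothing.
  template <unsigned long AVERAGE_SIZE = 2000>
  std::optional<double> timeit(const std::string& key, Clock::time_point now)
  {
    auto [iterator, inserted] = m_timeCounters.try_emplace(key, 0UL, now);
    if (inserted) {
      // The first call only starts the measurement.
      return std::nullopt;
    }

    auto& [counter, start] = iterator->second;
    ++counter;
    if (counter < AVERAGE_SIZE) {
      return std::nullopt;
    }

    const double seconds = std::chrono::duration<double>(now - start).count();
    counter = 0;
    start = now;
    if (seconds <= 0.0) {
      // No measurable time passed, so no rate can be given.
      return std::nullopt;
    }
    return static_cast<double>(AVERAGE_SIZE) / seconds;
  }

private:
  // Same layout as m_timeCounters: call counter plus start of the measurement.
  std::unordered_map<std::string, std::tuple<unsigned long, Clock::time_point>> m_timeCounters;
};
```

Averaging over a block of calls instead of timing each call keeps the overhead per call to one map lookup and one increment.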


The documentation for this class was generated from the following files: