 |
Belle II Software
release-05-01-25
|
10 #include <framework/pcore/zmq/connections/ZMQLoadBalancedConnection.h>
12 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
13 #include <framework/logging/Logger.h>
21 log(
"sent_ready", 0l);
22 log(
"data_size", 0.0);
23 log(
"received_events", 0l);
24 log(
"event_rate", 0.0);
30 for (
unsigned int i = 0; i < bufferSize; i++) {
39 auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(
m_socket);
41 if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
48 const auto dataSize = message->getDataMessage().size();
66 log(
"data_size", 0.0);
67 log(
"dismissed_events", 0l);
68 log(
"event_rate", 0.0);
69 log(
"sent_events", 0l);
71 log(
"all_stop_messages", 0l);
72 log(
"sent_stop_messages", 0l);
73 log(
"last_stop_sent",
"");
75 log(
"all_terminate_messages", 0l);
76 log(
"sent_terminate_messages", 0l);
77 log(
"last_terminate_sent",
"");
85 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
96 }
else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
105 logTime(
"last_terminate_sent");
115 B2ERROR(
"Received events after stop! I will dismiss this event.");
120 const auto dataSize = message->getDataMessage().size();
126 average(
"data_size", dataSize);
127 average(
"data_size_to[" + nextWorker +
"]", dataSize);
130 increment(
"sent_events[" + nextWorker +
"]");
133 timeit<200>(
"event_rate_to[" + nextWorker +
"]");
139 decrement(
"ready_messages[" + nextWorker +
"]");
144 auto readyMessage = ZMQMessageFactory::fromSocket<ZMQIdMessage>(
m_socket);
145 B2ASSERT(
"Should be a ready message", readyMessage->isMessage(EMessageTypes::c_readyMessage));
148 const auto toIdentity = readyMessage->getIdentity();
168 increment(
"ready_messages[" + toIdentity +
"]");
bool m_sentStopMessages
Did we already send a stop message?
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message)
Send the given message (without identity) to the next input in the ready list.
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
void logTime(const std::string &key)
Store the current time as a string under the given key.
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
bool isReady() const final
If lax mode is disabled, the output is ready if at least a single input is ready. Otherwise, it is always ready.
void decrement(const std::string &key)
Decrement the value with the given key (only numerical values). If not present, set to -1.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
bool m_lax
Parameter to enable lax mode.
Abstract base class for different kinds of events.
void handleIncomingData()
Block until a ready message from an input is received and add it to the ready queue.
bool m_sentTerminateMessages
Did we already send a terminate message?
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_...
std::deque< std::string > m_readyWorkers
List of identities of ready inputs in LIFO order.
void clear()
Clear the counter for sent stop and terminate messages. Should be called on run start.
Specialized connection over a ZMQ socket.
ZMQLoadBalancedOutput(const std::string &outputAddress, bool lax, const std::shared_ptr< ZMQParent > &parent)
Create a new load-balanced output and bind to the given address.
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
std::set< std::string > m_allWorkers
All ever registered inputs.