8#include <framework/pcore/zmq/connections/ZMQLoadBalancedConnection.h>
10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11#include <framework/logging/Logger.h>
19 log(
"sent_ready", 0l);
20 log(
"data_size", 0.0);
21 log(
"received_events", 0l);
22 log(
"event_rate", 0.0);
28 for (
unsigned int i = 0; i < bufferSize; i++) {
37 auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(
m_socket);
39 if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
46 const auto dataSize = message->getDataMessage().size();
64 log(
"data_size", 0.0);
65 log(
"dismissed_events", 0l);
66 log(
"event_rate", 0.0);
67 log(
"sent_events", 0l);
69 log(
"all_stop_messages", 0l);
70 log(
"sent_stop_messages", 0l);
71 log(
"last_stop_sent",
"");
73 log(
"all_terminate_messages", 0l);
74 log(
"sent_terminate_messages", 0l);
75 log(
"last_terminate_sent",
"");
83 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
94 }
else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
103 logTime(
"last_terminate_sent");
113 B2ERROR(
"Received events after stop! I will dismiss this event.");
118 const auto dataSize = message->getDataMessage().size();
124 average(
"data_size", dataSize);
125 average(
"data_size_to[" + nextWorker +
"]", dataSize);
128 increment(
"sent_events[" + nextWorker +
"]");
131 timeit<200>(
"event_rate_to[" + nextWorker +
"]");
137 decrement(
"ready_messages[" + nextWorker +
"]");
142 auto readyMessage = ZMQMessageFactory::fromSocket<ZMQIdMessage>(
m_socket);
143 B2ASSERT(
"Should be a ready message", readyMessage->isMessage(EMessageTypes::c_readyMessage));
146 const auto toIdentity = readyMessage->getIdentity();
166 increment(
"ready_messages[" + toIdentity +
"]");
Specialized connection over a ZMQ socket.
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
std::set< std::string > m_allWorkers
All ever registered inputs.
bool m_sentTerminateMessages
Did we already send a terminate message?
ZMQLoadBalancedOutput(const std::string &outputAddress, bool lax, const std::shared_ptr< ZMQParent > &parent)
Create a new load-balanced output and bind to the given address.
void handleIncomingData()
Block until a ready message from an input is received and add it to the ready queue.
bool m_lax
Parameter to enable lax mode.
std::deque< std::string > m_readyWorkers
List of identities of ready inputs in LIFO order.
bool isReady() const final
If lax mode is disabled, the output is ready if at least a single input is ready. Else always.
void clear()
Clear the counter for sent stop and terminate messages. Should be called on run start.
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message)
Send the given message (without identity) to the next input in the ready list.
bool m_sentStopMessages
Did we already send a stop message?
void decrement(const std::string &key)
Decrement the value with the given key (only numerical values). If not present, set to -1.
void logTime(const std::string &key)
Store the current time as a string under the given key.
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_...
Abstract base class for different kinds of events.