8#include <framework/pcore/zmq/connections/ZMQLoadBalancedConnection.h> 
   10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h> 
   11#include <framework/logging/Logger.h> 
   19  log(
"sent_ready", 0l);
 
   20  log(
"data_size", 0.0);
 
   21  log(
"received_events", 0l);
 
   22  log(
"event_rate", 0.0);
 
   28  for (
unsigned int i = 0; i < bufferSize; i++) {
 
 
   39  if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
 
   46    const auto dataSize = message->getDataMessage().size();
 
 
   64  log(
"data_size", 0.0);
 
   65  log(
"dismissed_events", 0l);
 
   66  log(
"event_rate", 0.0);
 
   67  log(
"sent_events", 0l);
 
   69  log(
"all_stop_messages", 0l);
 
   70  log(
"sent_stop_messages",  0l);
 
   71  log(
"last_stop_sent",  
"");
 
   73  log(
"all_terminate_messages", 0l);
 
   74  log(
"sent_terminate_messages",  0l);
 
   75  log(
"last_terminate_sent",  
"");
 
 
   83  if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
 
   94  } 
else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
 
  103    logTime(
"last_terminate_sent");
 
  113    B2ERROR(
"Received events after stop! I will dismiss this event.");
 
  118  const auto dataSize = message->getDataMessage().size();
 
  124  average(
"data_size", dataSize);
 
  125  average(
"data_size_to[" + nextWorker + 
"]", dataSize);
 
  128  increment(
"sent_events[" + nextWorker + 
"]");
 
  137  decrement(
"ready_messages[" + nextWorker + 
"]");
 
 
  143  B2ASSERT(
"Should be a ready message", readyMessage->isMessage(EMessageTypes::c_readyMessage));
 
  146  const auto toIdentity = readyMessage->getIdentity();
 
  166  increment(
"ready_messages[" + toIdentity + 
"]");
 
 
ZMQConnectionOverSocket(const std::shared_ptr< ZMQParent > &parent)
Create a new instance passing the shared ZMQParent.
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
std::set< std::string > m_allWorkers
All ever registered inputs.
bool m_sentTerminateMessages
Did we already sent a terminate message?
ZMQLoadBalancedOutput(const std::string &outputAddress, bool lax, const std::shared_ptr< ZMQParent > &parent)
Create a new load-balanced output and bind to the given address.
void handleIncomingData()
Block until a ready message from an input is received and add it to the ready queue.
bool m_lax
Parameter to enable lax mode.
std::deque< std::string > m_readyWorkers
List of identities of ready inputs in LIFO order.
bool isReady() const final
If lax mode is disabled, the output is ready if at least a single input is ready. Else always.
void clear()
Clear the counter for sent stop and terminate messages. Should be called on run start.
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message)
Send the given message (without identity) to the next input in the ready list.
bool m_sentStopMessages
Did we already sent a stop message?
void decrement(const std::string &key)
Decrement the value with the given key (only numerical values). If not present, set to -1.
void logTime(const std::string &key)
Store the current time as a string under the given key.
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
static std::unique_ptr< AMessage > fromSocket(const std::unique_ptr< zmq::socket_t > &socket)
Create a message of the given type by receiving a message from the socket.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_S...
Abstract base class for different kinds of events.