8#include <framework/pcore/zmq/connections/ZMQRawConnection.h> 
    9#include <framework/pcore/zmq/messages/ZMQMessageHelper.h> 
   10#include <framework/logging/Logger.h> 
   22  log(
"data_size", 0.0);
 
   23  log(
"received_events", 0l);
 
   24  log(
"event_rate", 0.0);
 
   25  log(
"average_received_byte_packages", 0.0);
 
   27  log(
"socket_state", 
"disconnected");
 
   28  log(
"socket_connects", 0l);
 
   29  log(
"socket_disconnects", 0l);
 
   30  log(
"current_size", 0l);
 
   31  log(
"write_address", 0l);
 
   32  log(
"average_number_of_events_per_package", 0l);
 
 
   48  std::vector<zmq::message_t> receivedMessages;
 
   51  zmq::message_t identity;
 
   52  auto received = 
m_socket->recv(identity, zmq::recv_flags::none);
 
   53  B2ASSERT(
"No message received", received);
 
   54  B2ASSERT(
"Message should not be empty", *received > 0);
 
   55  std::string identityString(identity.data<
char>(), identity.size());
 
   56  B2ASSERT(
"The message is incomplete!", 
m_socket->get(zmq::sockopt::rcvmore) == 1);
 
   57  B2ASSERT(
"The app can only handle a single connection!",
 
   63  B2ASSERT(
"No message received", receivedBytes);
 
   64  B2ASSERT(
"The message is longer than expected! Increase the buffer size.", !receivedBytes->truncated());
 
   65  if (receivedBytes->size == 0) {
 
   69      log(
"socket_state", 
"connected");
 
   73      log(
"socket_state", 
"disconnected");
 
   76    return receivedMessages;
 
   80  if (receivedBytes->untruncated_size > remainingSpace) {
 
   81    B2FATAL(
"The size of the buffer is too small! " << receivedBytes->untruncated_size << 
" > " << remainingSpace);
 
   83  average(
"average_received_byte_packages", receivedBytes->size);
 
  119      unsigned int startAddress = 0;
 
  122        startAddress = 
sizeof(int);
 
  124      zmq::message_t dataMessage(&
m_buffer[startAddress], 
static_cast<size_t>(
m_currentSize - startAddress));
 
  133      receivedMessages.push_back(std::move(dataMessage));
 
  140  average(
"average_number_of_events_per_package", receivedMessages.size());
 
  141  return receivedMessages;
 
 
  149  log(
"data_size", 0.0);
 
  150  log(
"sent_events", 0l);
 
  151  log(
"event_rate", 0.0);
 
  153  log(
"socket_state", 
"disconnected");
 
  154  log(
"socket_connects", 0l);
 
  155  log(
"socket_disconnects", 0l);
 
 
  164  B2ASSERT(
"Data Socket needs to be connected", not 
m_dataIdentity.empty());
 
  166  const auto dataSize = message.size();
 
  168  average(
"data_size", dataSize);
 
  174    m_socket->send(std::move(message), zmq::send_flags::none);
 
  176    zmq::message_t tmpMessage(message.size() + 
sizeof(
int));
 
  177    const int messageSize = htonl(message.size());
 
  178    memcpy(tmpMessage.data<
char>(), &messageSize, 
sizeof(
int));
 
  179    memcpy(tmpMessage.data<
char>() + 
sizeof(
int), message.data(), message.size());
 
  180    m_socket->send(std::move(tmpMessage), zmq::send_flags::none);
 
 
  187  zmq::message_t identity;
 
  188  auto received = 
m_socket->recv(identity, zmq::recv_flags::none);
 
  189  B2ASSERT(
"No message received", received);
 
  190  zmq::message_t nullMessage;
 
  191  received = 
m_socket->recv(nullMessage, zmq::recv_flags::none);
 
  192  B2ASSERT(
"No message received", received);
 
  193  std::string identityString(identity.data<
char>(), identity.size());
 
  197    log(
"socket_state", 
"connected");
 
  200    B2ASSERT(
"The app can only handle a single connection!", 
m_dataIdentity == identityString);
 
  202    log(
"socket_state", 
"disconnected");
 
 
ZMQConnectionOverSocket(const std::shared_ptr< ZMQParent > &parent)
Create a new instance passing the shared ZMQParent.
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
bool m_addEventSize
Parameter to add the event size to a message.
void handleIncomingData()
Handle incoming data: a socket (dis)connect.
virtual void handleEvent(zmq::message_t message)
Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
ZMQRawOutput(const std::string &outputAddress, bool addEventSize, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output connection. The bind or connect behavior is chosen according to the given add...
bool isReady() const final
If no socket is connected, this connection is not ready.
std::string m_dataIdentity
Internal storage of the connected identity, used to ensure there are not multiple connections.
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_S...
Abstract base class for different kinds of events.