8 #include <framework/pcore/zmq/connections/ZMQRawConnection.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageHelper.h>
10 #include <framework/logging/Logger.h>
12 #include <arpa/inet.h>
18 m_maximalBufferSize(maximalBufferSize),
19 m_receiveEventMessages(receiveEventMessages)
22 log(
"data_size", 0.0);
23 log(
"received_events", 0l);
24 log(
"event_rate", 0.0);
25 log(
"average_received_byte_packages", 0.0);
27 log(
"socket_state",
"disconnected");
28 log(
"socket_connects", 0l);
29 log(
"socket_disconnects", 0l);
30 log(
"current_size", 0l);
31 log(
"write_address", 0l);
32 log(
"average_number_of_events_per_package", 0l);
48 std::vector<zmq::message_t> receivedMessages;
51 zmq::message_t identity;
52 auto received =
m_socket->recv(identity, zmq::recv_flags::none);
53 B2ASSERT(
"No message received", received);
54 B2ASSERT(
"Message should not be empty", *received > 0);
55 std::string identityString(identity.data<
char>(), identity.size());
56 B2ASSERT(
"The message is incomplete!",
m_socket->get(zmq::sockopt::rcvmore) == 1);
57 B2ASSERT(
"The app can only handle a single connection!",
63 B2ASSERT(
"No message received", receivedBytes);
64 B2ASSERT(
"The message is longer than expected! Increase the buffer size.", !receivedBytes->truncated());
65 if (receivedBytes->size == 0) {
69 log(
"socket_state",
"connected");
73 log(
"socket_state",
"disconnected");
76 return receivedMessages;
80 if (receivedBytes->untruncated_size > remainingSpace) {
81 B2FATAL(
"The size of the buffer is too small! " << receivedBytes->untruncated_size <<
" > " << remainingSpace);
83 average(
"average_received_byte_packages", receivedBytes->size);
119 unsigned int startAddress = 0;
122 startAddress =
sizeof(int);
124 zmq::message_t dataMessage(&
m_buffer[startAddress],
static_cast<size_t>(
m_currentSize - startAddress));
133 receivedMessages.push_back(std::move(dataMessage));
140 average(
"average_number_of_events_per_package", receivedMessages.size());
141 return receivedMessages;
146 parent), m_addEventSize(addEventSize)
149 log(
"data_size", 0.0);
150 log(
"sent_events", 0l);
151 log(
"event_rate", 0.0);
153 log(
"socket_state",
"disconnected");
154 log(
"socket_connects", 0l);
155 log(
"socket_disconnects", 0l);
164 B2ASSERT(
"Data Socket needs to be connected", not
m_dataIdentity.empty());
166 const auto dataSize = message.size();
168 average(
"data_size", dataSize);
174 m_socket->send(std::move(message), zmq::send_flags::none);
176 zmq::message_t tmpMessage(message.size() +
sizeof(
int));
177 const int messageSize = htonl(message.size());
178 memcpy(tmpMessage.data<
char>(), &messageSize,
sizeof(
int));
179 memcpy(tmpMessage.data<
char>() +
sizeof(
int), message.data(), message.size());
180 m_socket->send(std::move(tmpMessage), zmq::send_flags::none);
187 zmq::message_t identity;
188 auto received =
m_socket->recv(identity, zmq::recv_flags::none);
189 B2ASSERT(
"No message received", received);
190 zmq::message_t nullMessage;
191 received =
m_socket->recv(nullMessage, zmq::recv_flags::none);
192 B2ASSERT(
"No message received", received);
193 std::string identityString(identity.data<
char>(), identity.size());
197 log(
"socket_state",
"connected");
200 B2ASSERT(
"The app can only handle a single connection!",
m_dataIdentity == identityString);
202 log(
"socket_state",
"disconnected");
Specialized connection over a ZMQ socket.
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
bool m_addEventSize
Parameter to add the event size to a message.
void handleIncomingData()
Handle incoming data: a socket (dis)connect.
virtual void handleEvent(zmq::message_t message)
Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
ZMQRawOutput(const std::string &outputAddress, bool addEventSize, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output connection. The bind or connect behavior is chosen according to the given add...
bool isReady() const final
If no socket is connected, this connection is not ready.
std::string m_dataIdentity
Internal storage of the connected identity to not have multiple connections.
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_...
Abstract base class for different kinds of events.