 |
Belle II Software
release-05-01-25
|
10 #include <framework/pcore/zmq/connections/ZMQRawConnection.h>
11 #include <framework/pcore/zmq/messages/ZMQMessageHelper.h>
12 #include <framework/logging/Logger.h>
14 #include <arpa/inet.h>
20 m_maximalBufferSize(maximalBufferSize),
21 m_receiveEventMessages(receiveEventMessages)
24 log(
"data_size", 0.0);
25 log(
"received_events", 0l);
26 log(
"event_rate", 0.0);
27 log(
"average_received_byte_packages", 0.0);
29 log(
"socket_state",
"disconnected");
30 log(
"socket_connects", 0l);
31 log(
"socket_disconnects", 0l);
32 log(
"current_size", 0l);
33 log(
"write_address", 0l);
34 log(
"average_number_of_events_per_package", 0l);
50 std::vector<zmq::message_t> receivedMessages;
53 zmq::message_t identity;
55 std::string identityString(identity.data<
char>(), identity.size());
56 B2ASSERT(
"The message is incomplete!",
m_socket->getsockopt<
int>(ZMQ_RCVMORE) == 1);
57 B2ASSERT(
"The app can only handle a single connection!",
63 B2ASSERT(
"The message is longer than expected! Increase the buffer size.",
m_socket->getsockopt<
int>(ZMQ_RCVMORE) == 0);
64 if (receivedBytes == 0) {
68 log(
"socket_state",
"connected");
72 log(
"socket_state",
"disconnected");
75 return receivedMessages;
79 if (receivedBytes > remainingSpace) {
80 B2FATAL(
"The size of the buffer is too small! " << receivedBytes <<
" > " << remainingSpace);
82 average(
"average_received_byte_packages", receivedBytes);
118 unsigned int startAddress = 0;
121 startAddress =
sizeof(int);
123 zmq::message_t dataMessage(&
m_buffer[startAddress],
static_cast<size_t>(
m_currentSize - startAddress));
132 receivedMessages.push_back(std::move(dataMessage));
139 average(
"average_number_of_events_per_package", receivedMessages.size());
140 return receivedMessages;
145 parent), m_addEventSize(addEventSize)
148 log(
"data_size", 0.0);
149 log(
"sent_events", 0l);
150 log(
"event_rate", 0.0);
152 log(
"socket_state",
"disconnected");
153 log(
"socket_connects", 0l);
154 log(
"socket_disconnects", 0l);
163 B2ASSERT(
"Data Socket needs to be connected", not
m_dataIdentity.empty());
165 const auto dataSize = message.size();
167 average(
"data_size", dataSize);
175 zmq::message_t tmpMessage(message.size() +
sizeof(
int));
176 const int messageSize = htonl(message.size());
177 memcpy(tmpMessage.data<
char>(), &messageSize,
sizeof(
int));
178 memcpy(tmpMessage.data<
char>() +
sizeof(
int), message.data(), message.size());
179 m_socket->send(std::move(tmpMessage));
186 zmq::message_t identity;
188 zmq::message_t nullMessage;
190 std::string identityString(identity.data<
char>(), identity.size());
194 log(
"socket_state",
"connected");
197 B2ASSERT(
"The app can only handle a single connection!",
m_dataIdentity == identityString);
199 log(
"socket_state",
"disconnected");
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
std::string m_dataIdentity
Internal storage of the connected identity to not have multiple connections.
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
void handleIncomingData()
Handle incoming data: a socket (dis)connect.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
bool m_addEventSize
Parameter to add the event size to a message.
Abstract base class for different kinds of events.
virtual void handleEvent(zmq::message_t message)
Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_...
bool isReady() const final
If no socket is connected, this connection is not ready.
Specialized connection over a ZMQ socket.
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
ZMQRawOutput(const std::string &outputAddress, bool addEventSize, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output connection. The bind or connect behavior is chosen according to the given add...