Belle II Software development
ZMQRawConnection.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <framework/pcore/zmq/connections/ZMQRawConnection.h>
9#include <framework/pcore/zmq/messages/ZMQMessageHelper.h>
10#include <framework/logging/Logger.h>
11
12#include <arpa/inet.h>
13
14using namespace Belle2;
15
16ZMQRawInput::ZMQRawInput(const std::string& inputAddress, unsigned int maximalBufferSize, bool receiveEventMessages,
17 const std::shared_ptr<ZMQParent>& parent) : ZMQConnectionOverSocket(parent),
18 m_maximalBufferSize(maximalBufferSize),
19 m_receiveEventMessages(receiveEventMessages)
20{
21 // We clear all our internal state and counters
22 log("data_size", 0.0);
23 log("received_events", 0l);
24 log("event_rate", 0.0);
25 log("average_received_byte_packages", 0.0);
26
27 log("socket_state", "disconnected");
28 log("socket_connects", 0l);
29 log("socket_disconnects", 0l);
30 log("current_size", 0l);
31 log("write_address", 0l);
32 log("average_number_of_events_per_package", 0l);
33
34 // STREAM is the ZMQ type for raw, non ZMQ connections
35 m_socket = m_parent->createSocket<ZMQ_STREAM>(inputAddress);
36
37 m_buffer.reserve(maximalBufferSize);
38}
39
41{
43 m_currentSize = 0;
44}
45
46std::vector<zmq::message_t> ZMQRawInput::handleIncomingData()
47{
48 std::vector<zmq::message_t> receivedMessages;
49
50 // We will always get one or two parts. The first one is the identity..
51 zmq::message_t identity;
52 auto received = m_socket->recv(identity, zmq::recv_flags::none);
53 B2ASSERT("No message received", received);
54 B2ASSERT("Message should not be empty", *received > 0);
55 std::string identityString(identity.data<char>(), identity.size());
56 B2ASSERT("The message is incomplete!", m_socket->get(zmq::sockopt::rcvmore) == 1);
57 B2ASSERT("The app can only handle a single connection!",
58 m_inputIdentity == identityString or m_inputIdentity.empty());
59
60 // ... and the second one is the message itself.
61 const size_t remainingSpace = m_maximalBufferSize - m_writeAddress;
62 auto receivedBytes = m_socket->recv(zmq::mutable_buffer{&m_buffer[m_writeAddress], remainingSpace}, zmq::recv_flags::none);
63 B2ASSERT("No message received", receivedBytes);
64 B2ASSERT("The message is longer than expected! Increase the buffer size.", !receivedBytes->truncated());
65 if (receivedBytes->size == 0) {
66 // Empty message means the client connected or disconnected
67 if (m_inputIdentity.empty()) {
68 m_inputIdentity = identityString;
69 log("socket_state", "connected");
70 increment("socket_connects");
71 } else {
72 m_inputIdentity = "";
73 log("socket_state", "disconnected");
74 increment("socket_disconnects");
75 }
76 return receivedMessages;
77 }
78 // We can maximal write `remainingSpace` into the buffer. If the message was longer, ZMQ will just cut it.
79 // This means we are loosing data and the buffer size should be increased.
80 if (receivedBytes->untruncated_size > remainingSpace) {
81 B2FATAL("The size of the buffer is too small! " << receivedBytes->untruncated_size << " > " << remainingSpace);
82 }
83 average("average_received_byte_packages", receivedBytes->size);
84
85 // `m_writeAddress` always points to the index on where we will write next.
86 // As we have written `receivedBytes` we need to advance
87 m_writeAddress += receivedBytes->size;
88
89 log("write_address", static_cast<long>(m_writeAddress));
90
91 // If the current buffer is smaller than an int, we can not get the size
92 while (m_writeAddress >= sizeof(int)) {
93 if (m_currentSize == 0) {
94 // we do not know the size of the data package already, so lets get it from the buffer.
95 // It is always in the first sizeof(int) of the data
96 memcpy(&m_currentSize, &m_buffer[0], sizeof(int));
97
98 // Here the two different message formats differ
101 }
102 B2ASSERT("Strange size in the data!", m_currentSize > 0);
104 m_currentSize += sizeof(int);
105 } else {
106 m_currentSize *= sizeof(int);
107 }
108
109 log("current_size", static_cast<long>(m_currentSize));
110 }
112 // Now we know the size already, and we have enough data received so we have actually the full
113 // data. We can build the total message.
114 average("data_size", m_currentSize);
115 increment("received_events");
116 timeit("event_rate");
117
118 // Again, here the two different message formats differ. One includes the first int with the length...
119 unsigned int startAddress = 0;
121 // .. and the other does not
122 startAddress = sizeof(int);
123 }
124 zmq::message_t dataMessage(&m_buffer[startAddress], static_cast<size_t>(m_currentSize - startAddress));
125
128 m_currentSize = 0;
129
130 log("write_address", static_cast<long>(m_writeAddress));
131 log("current_size", static_cast<long>(m_currentSize));
132
133 receivedMessages.push_back(std::move(dataMessage));
134 } else {
135 // We did not receive a full message up to now
136 break;
137 }
138 }
139
140 average("average_number_of_events_per_package", receivedMessages.size());
141 return receivedMessages;
142}
143
144ZMQRawOutput::ZMQRawOutput(const std::string& outputAddress, bool addEventSize,
145 const std::shared_ptr<ZMQParent>& parent) : ZMQConnectionOverSocket(
146 parent), m_addEventSize(addEventSize)
147{
148 // We clear all our internal state and counters
149 log("data_size", 0.0);
150 log("sent_events", 0l);
151 log("event_rate", 0.0);
152
153 log("socket_state", "disconnected");
154 log("socket_connects", 0l);
155 log("socket_disconnects", 0l);
156
157 // STREAM is the ZMQ type for raw, non ZMQ connections
158 m_socket = m_parent->createSocket<ZMQ_STREAM>(outputAddress);
159}
160
161void ZMQRawOutput::handleEvent(zmq::message_t message)
162{
163 // Send the message. If requested, add the message size in front of the message
164 B2ASSERT("Data Socket needs to be connected", not m_dataIdentity.empty());
165
166 const auto dataSize = message.size();
167
168 average("data_size", dataSize);
169 increment("sent_events");
170 timeit("event_rate");
171
172 m_socket->send(ZMQMessageHelper::createZMQMessage(m_dataIdentity), zmq::send_flags::sndmore);
173 if (not m_addEventSize) {
174 m_socket->send(std::move(message), zmq::send_flags::none);
175 } else {
176 zmq::message_t tmpMessage(message.size() + sizeof(int));
177 const int messageSize = htonl(message.size());
178 memcpy(tmpMessage.data<char>(), &messageSize, sizeof(int));
179 memcpy(tmpMessage.data<char>() + sizeof(int), message.data(), message.size());
180 m_socket->send(std::move(tmpMessage), zmq::send_flags::none);
181 }
182}
183
185{
186 // The only possibility that we can receive a message is when the client connects or disconnects
187 zmq::message_t identity;
188 auto received = m_socket->recv(identity, zmq::recv_flags::none);
189 B2ASSERT("No message received", received);
190 zmq::message_t nullMessage;
191 received = m_socket->recv(nullMessage, zmq::recv_flags::none);
192 B2ASSERT("No message received", received);
193 std::string identityString(identity.data<char>(), identity.size());
194
195 if (m_dataIdentity.empty()) {
196 m_dataIdentity = identityString;
197 log("socket_state", "connected");
198 increment("socket_connects");
199 } else {
200 B2ASSERT("The app can only handle a single connection!", m_dataIdentity == identityString);
201 m_dataIdentity = "";
202 log("socket_state", "disconnected");
203 increment("socket_disconnects");
204 }
205}
206
208{
209 // Only ready of the client connected
210 return not m_dataIdentity.empty();
211}
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:63
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
Definition: ZMQConnection.h:75
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
Definition: ZMQConnection.h:77
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:32
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
unsigned int m_maximalBufferSize
Parameter for the maximal buffer size. If this size is reached, a FATAL will be issued.
size_t m_writeAddress
Where in the buffer are we currently writing to.
std::vector< char > m_buffer
Internal storage for the buffer.
unsigned int m_currentSize
How large should the full message be? The information is from the first int of the message.
bool m_receiveEventMessages
Parameter to receive event messages (see above)
ZMQRawInput(const std::string &inputAddress, unsigned int maximalBufferSize, bool receiveEventMessages, const std::shared_ptr< ZMQParent > &parent)
Create a new raw input connection. The bind or connect behavior is chosen according to the given addr...
void clear()
Reset the internal buffer and counter.
std::vector< zmq::message_t > handleIncomingData()
Block until a TCP packet can be received from the socket.
std::string m_inputIdentity
Internal storage of the connected socket to check if we get messages from multiple ones.
bool m_addEventSize
Parameter to add the event size to a message.
void handleIncomingData()
Handle incoming data: a socket (dis)connect.
virtual void handleEvent(zmq::message_t message)
Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
ZMQRawOutput(const std::string &outputAddress, bool addEventSize, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output connection. The bind or connect behavior is chosen according to the given add...
bool isReady() const final
If no socket is connected, this connection is not ready.
std::string m_dataIdentity
Internal storage of the connected identity to no have multiple connections.
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
Definition: ZMQLogger.h:117
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:96
void average(const std::string &key, double value)
Instead of storeing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:102
Abstract base class for different kinds of events.