Belle II Software  release-08-01-10
ZMQRawConnection.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <framework/pcore/zmq/connections/ZMQRawConnection.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageHelper.h>
10 #include <framework/logging/Logger.h>
11 
12 #include <arpa/inet.h>
13 
14 using namespace Belle2;
15 
16 ZMQRawInput::ZMQRawInput(const std::string& inputAddress, unsigned int maximalBufferSize, bool receiveEventMessages,
17  const std::shared_ptr<ZMQParent>& parent) : ZMQConnectionOverSocket(parent),
18  m_maximalBufferSize(maximalBufferSize),
19  m_receiveEventMessages(receiveEventMessages)
20 {
21  // We clear all our internal state and counters
22  log("data_size", 0.0);
23  log("received_events", 0l);
24  log("event_rate", 0.0);
25  log("average_received_byte_packages", 0.0);
26 
27  log("socket_state", "disconnected");
28  log("socket_connects", 0l);
29  log("socket_disconnects", 0l);
30  log("current_size", 0l);
31  log("write_address", 0l);
32  log("average_number_of_events_per_package", 0l);
33 
34  // STREAM is the ZMQ type for raw, non ZMQ connections
35  m_socket = m_parent->createSocket<ZMQ_STREAM>(inputAddress);
36 
37  m_buffer.reserve(maximalBufferSize);
38 }
39 
41 {
42  m_writeAddress = 0;
43  m_currentSize = 0;
44 }
45 
46 std::vector<zmq::message_t> ZMQRawInput::handleIncomingData()
47 {
48  std::vector<zmq::message_t> receivedMessages;
49 
50  // We will always get one or two parts. The first one is the identity..
51  zmq::message_t identity;
52  auto received = m_socket->recv(identity, zmq::recv_flags::none);
53  B2ASSERT("No message received", received);
54  B2ASSERT("Message should not be empty", *received > 0);
55  std::string identityString(identity.data<char>(), identity.size());
56  B2ASSERT("The message is incomplete!", m_socket->get(zmq::sockopt::rcvmore) == 1);
57  B2ASSERT("The app can only handle a single connection!",
58  m_inputIdentity == identityString or m_inputIdentity.empty());
59 
60  // ... and the second one is the message itself.
61  const size_t remainingSpace = m_maximalBufferSize - m_writeAddress;
62  auto receivedBytes = m_socket->recv(zmq::mutable_buffer{&m_buffer[m_writeAddress], remainingSpace}, zmq::recv_flags::none);
63  B2ASSERT("No message received", receivedBytes);
64  B2ASSERT("The message is longer than expected! Increase the buffer size.", !receivedBytes->truncated());
65  if (receivedBytes->size == 0) {
66  // Empty message means the client connected or disconnected
67  if (m_inputIdentity.empty()) {
68  m_inputIdentity = identityString;
69  log("socket_state", "connected");
70  increment("socket_connects");
71  } else {
72  m_inputIdentity = "";
73  log("socket_state", "disconnected");
74  increment("socket_disconnects");
75  }
76  return receivedMessages;
77  }
78  // We can maximal write `remainingSpace` into the buffer. If the message was longer, ZMQ will just cut it.
79  // This means we are loosing data and the buffer size should be increased.
80  if (receivedBytes->untruncated_size > remainingSpace) {
81  B2FATAL("The size of the buffer is too small! " << receivedBytes->untruncated_size << " > " << remainingSpace);
82  }
83  average("average_received_byte_packages", receivedBytes->size);
84 
85  // `m_writeAddress` always points to the index on where we will write next.
86  // As we have written `receivedBytes` we need to advance
87  m_writeAddress += receivedBytes->size;
88 
89  log("write_address", static_cast<long>(m_writeAddress));
90 
91  // If the current buffer is smaller than an int, we can not get the size
92  while (m_writeAddress >= sizeof(int)) {
93  if (m_currentSize == 0) {
94  // we do not know the size of the data package already, so lets get it from the buffer.
95  // It is always in the first sizeof(int) of the data
96  memcpy(&m_currentSize, &m_buffer[0], sizeof(int));
97 
98  // Here the two different message formats differ
100  m_currentSize = ntohl(m_currentSize);
101  }
102  B2ASSERT("Strange size in the data!", m_currentSize > 0);
104  m_currentSize += sizeof(int);
105  } else {
106  m_currentSize *= sizeof(int);
107  }
108 
109  log("current_size", static_cast<long>(m_currentSize));
110  }
111  if (m_writeAddress >= m_currentSize) {
112  // Now we know the size already, and we have enough data received so we have actually the full
113  // data. We can build the total message.
114  average("data_size", m_currentSize);
115  increment("received_events");
116  timeit("event_rate");
117 
118  // Again, here the two different message formats differ. One includes the first int with the length...
119  unsigned int startAddress = 0;
121  // .. and the other does not
122  startAddress = sizeof(int);
123  }
124  zmq::message_t dataMessage(&m_buffer[startAddress], static_cast<size_t>(m_currentSize - startAddress));
125 
128  m_currentSize = 0;
129 
130  log("write_address", static_cast<long>(m_writeAddress));
131  log("current_size", static_cast<long>(m_currentSize));
132 
133  receivedMessages.push_back(std::move(dataMessage));
134  } else {
135  // We did not receive a full message up to now
136  break;
137  }
138  }
139 
140  average("average_number_of_events_per_package", receivedMessages.size());
141  return receivedMessages;
142 }
143 
144 ZMQRawOutput::ZMQRawOutput(const std::string& outputAddress, bool addEventSize,
145  const std::shared_ptr<ZMQParent>& parent) : ZMQConnectionOverSocket(
146  parent), m_addEventSize(addEventSize)
147 {
148  // We clear all our internal state and counters
149  log("data_size", 0.0);
150  log("sent_events", 0l);
151  log("event_rate", 0.0);
152 
153  log("socket_state", "disconnected");
154  log("socket_connects", 0l);
155  log("socket_disconnects", 0l);
156 
157  // STREAM is the ZMQ type for raw, non ZMQ connections
158  m_socket = m_parent->createSocket<ZMQ_STREAM>(outputAddress);
159 }
160 
161 void ZMQRawOutput::handleEvent(zmq::message_t message)
162 {
163  // Send the message. If requested, add the message size in front of the message
164  B2ASSERT("Data Socket needs to be connected", not m_dataIdentity.empty());
165 
166  const auto dataSize = message.size();
167 
168  average("data_size", dataSize);
169  increment("sent_events");
170  timeit("event_rate");
171 
172  m_socket->send(ZMQMessageHelper::createZMQMessage(m_dataIdentity), zmq::send_flags::sndmore);
173  if (not m_addEventSize) {
174  m_socket->send(std::move(message), zmq::send_flags::none);
175  } else {
176  zmq::message_t tmpMessage(message.size() + sizeof(int));
177  const int messageSize = htonl(message.size());
178  memcpy(tmpMessage.data<char>(), &messageSize, sizeof(int));
179  memcpy(tmpMessage.data<char>() + sizeof(int), message.data(), message.size());
180  m_socket->send(std::move(tmpMessage), zmq::send_flags::none);
181  }
182 }
183 
185 {
186  // The only possibility that we can receive a message is when the client connects or disconnects
187  zmq::message_t identity;
188  auto received = m_socket->recv(identity, zmq::recv_flags::none);
189  B2ASSERT("No message received", received);
190  zmq::message_t nullMessage;
191  received = m_socket->recv(nullMessage, zmq::recv_flags::none);
192  B2ASSERT("No message received", received);
193  std::string identityString(identity.data<char>(), identity.size());
194 
195  if (m_dataIdentity.empty()) {
196  m_dataIdentity = identityString;
197  log("socket_state", "connected");
198  increment("socket_connects");
199  } else {
200  B2ASSERT("The app can only handle a single connection!", m_dataIdentity == identityString);
201  m_dataIdentity = "";
202  log("socket_state", "disconnected");
203  increment("socket_disconnects");
204  }
205 }
206 
208 {
209  // Only ready of the client connected
210  return not m_dataIdentity.empty();
211 }
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:62
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
Definition: ZMQConnection.h:74
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
Definition: ZMQConnection.h:76
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:32
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
unsigned int m_maximalBufferSize
Parameter for the maximal buffer size. If this size is reached, a FATAL will be issued.
size_t m_writeAddress
Where in the buffer are we currently writing to.
std::vector< char > m_buffer
Internal storage for the buffer.
unsigned int m_currentSize
How large should the full message be? The information is from the first int of the message.
bool m_receiveEventMessages
Parameter to receive event messages (see above)
ZMQRawInput(const std::string &inputAddress, unsigned int maximalBufferSize, bool receiveEventMessages, const std::shared_ptr< ZMQParent > &parent)
Create a new raw input connection. The bind or connect behavior is chosen according to the given addr...
void clear()
Reset the internal buffer and counter.
std::vector< zmq::message_t > handleIncomingData()
Block until a TCP packet can be received from the socket.
std::string m_inputIdentity
Internal storage of the connected socket to check if we get messages from multiple ones.
bool m_addEventSize
Parameter to add the event size to a message.
void handleIncomingData()
Handle incoming data: a socket (dis)connect.
virtual void handleEvent(zmq::message_t message)
Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
ZMQRawOutput(const std::string &outputAddress, bool addEventSize, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output connection. The bind or connect behavior is chosen according to the given add...
bool isReady() const final
If no socket is connected, this connection is not ready.
std::string m_dataIdentity
Internal storage of the connected identity to not have multiple connections.
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
Definition: ZMQLogger.h:117
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:96
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:102
Abstract base class for different kinds of events.