Belle II Software  release-05-01-25
ZMQRawConnection.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #include <framework/pcore/zmq/connections/ZMQRawConnection.h>
11 #include <framework/pcore/zmq/messages/ZMQMessageHelper.h>
12 #include <framework/logging/Logger.h>
13 
14 #include <arpa/inet.h>
15 
16 using namespace Belle2;
17 
/**
 * Create a raw input connection on a ZMQ_STREAM socket.
 *
 * All monitoring keys are published with zero/"disconnected" values, the socket is created via
 * the parent and the receive buffer is allocated.
 *
 * @param inputAddress          address passed to ZMQParent::createSocket<ZMQ_STREAM>
 * @param maximalBufferSize     number of bytes the internal receive buffer can hold
 * @param receiveEventMessages  stored in m_receiveEventMessages (selects the message format
 *                              while parsing - TODO confirm exact semantics against the header)
 * @param parent                shared ZMQParent instance used to create the socket
 */
18 ZMQRawInput::ZMQRawInput(const std::string& inputAddress, unsigned int maximalBufferSize, bool receiveEventMessages,
19  const std::shared_ptr<ZMQParent>& parent) : ZMQConnectionOverSocket(parent),
20  m_maximalBufferSize(maximalBufferSize),
21  m_receiveEventMessages(receiveEventMessages)
22 {
23  // Publish every monitoring key with a zeroed value so it exists before the first message arrives
24  log("data_size", 0.0);
25  log("received_events", 0l);
26  log("event_rate", 0.0);
27  log("average_received_byte_packages", 0.0);
28 
29  log("socket_state", "disconnected");
30  log("socket_connects", 0l);
31  log("socket_disconnects", 0l);
32  log("current_size", 0l);
33  log("write_address", 0l);
34  log("average_number_of_events_per_package", 0l);
35 
36  // STREAM is the ZMQ type for raw, non ZMQ connections
37  m_socket = m_parent->createSocket<ZMQ_STREAM>(inputAddress);
38 
// The buffer is read and written through m_buffer[index] in handleIncomingData(), so the
// elements must really exist: resize() sets size(), whereas reserve() would only set the
// capacity and leave every such access out of bounds (undefined behaviour).
39  m_buffer.resize(maximalBufferSize);
40 }
41 
43 {
// NOTE(review): the signature (listing line 42) is missing from this extract; the cross-reference
// index of this listing attributes the body to `void clear()` of Belle2::ZMQRawInput.
// Forget any partially received message: the next write starts at the beginning of the buffer
// and the expected message size is unknown again. The bytes stored in m_buffer are not touched.
44  m_writeAddress = 0;
45  m_currentSize = 0;
46 }
47 
// Receive one two-part message (identity + payload) from the STREAM socket, append the payload to
// the internal buffer and extract every complete data message the buffer holds afterwards.
//
// Returns the extracted messages. The vector is empty when the payload was empty (a client
// connected or disconnected) or when no complete message has been received yet.
//
// NOTE(review): listing lines 98-99, 102, 119 and 125-126 are missing from this extract, so some
// statements of the parsing loop are not visible here - verify against the original file.
48 std::vector<zmq::message_t> ZMQRawInput::handleIncomingData()
49 {
50  std::vector<zmq::message_t> receivedMessages;
51 
52  // We will always get one or two parts. The first one is the identity ...
53  zmq::message_t identity;
54  m_socket->recv(&identity);
55  std::string identityString(identity.data<char>(), identity.size());
56  B2ASSERT("The message is incomplete!", m_socket->getsockopt<int>(ZMQ_RCVMORE) == 1);
// Only the peer whose identity is currently stored (or any peer, if none is stored) is accepted
57  B2ASSERT("The app can only handle a single connection!",
58  m_inputIdentity == identityString or m_inputIdentity.empty());
59 
60  // ... and the second one is the message itself.
// It is received directly into the free tail of the buffer, starting at m_writeAddress.
// NOTE(review): this writes through operator[] up to index m_maximalBufferSize - 1, so
// m_buffer.size() has to cover that range.
61  const size_t remainingSpace = m_maximalBufferSize - m_writeAddress;
62  const size_t receivedBytes = m_socket->recv(&m_buffer[m_writeAddress], remainingSpace);
63  B2ASSERT("The message is longer than expected! Increase the buffer size.", m_socket->getsockopt<int>(ZMQ_RCVMORE) == 0);
64  if (receivedBytes == 0) {
65  // Empty message means the client connected or disconnected
// No stored identity -> this is a connect; otherwise it is treated as the disconnect of the stored peer
66  if (m_inputIdentity.empty()) {
67  m_inputIdentity = identityString;
68  log("socket_state", "connected");
69  increment("socket_connects");
70  } else {
71  m_inputIdentity = "";
72  log("socket_state", "disconnected");
73  increment("socket_disconnects");
74  }
75  return receivedMessages;
76  }
77  // We can write at most `remainingSpace` bytes into the buffer. If the message was longer, ZMQ will just cut it.
78  // This means we are losing data and the buffer size should be increased.
79  if (receivedBytes > remainingSpace) {
80  B2FATAL("The size of the buffer is too small! " << receivedBytes << " > " << remainingSpace);
81  }
82  average("average_received_byte_packages", receivedBytes);
83 
84  // `m_writeAddress` always points to the index where we will write next.
85  // As we have written `receivedBytes` we need to advance it.
86  m_writeAddress += receivedBytes;
87 
88  log("write_address", static_cast<long>(m_writeAddress));
89 
90  // If the current buffer content is smaller than an int, we cannot get the size
91  while (m_writeAddress >= sizeof(int)) {
92  if (m_currentSize == 0) {
93  // We do not know the size of the data package yet, so let's get it from the buffer.
94  // It is always in the first sizeof(int) bytes of the data
95  memcpy(&m_currentSize, &m_buffer[0], sizeof(int));
96 
97  // Here the two different message formats differ
// NOTE(review): listing lines 98-99 (and 102 below) are missing here; the visible branches either
// add sizeof(int) to the size or multiply it by sizeof(int) - the branch condition is not visible.
100  }
101  B2ASSERT("Strange size in the data!", m_currentSize > 0);
103  m_currentSize += sizeof(int);
104  } else {
105  m_currentSize *= sizeof(int);
106  }
107 
108  log("current_size", static_cast<long>(m_currentSize));
109  }
110  if (m_writeAddress >= m_currentSize) {
111  // Now we know the size already, and we have received enough data so that we actually have the full
112  // data. We can build the total message.
113  average("data_size", m_currentSize);
114  increment("received_events");
115  timeit("event_rate");
116 
117  // Again, here the two different message formats differ. One includes the first int with the length...
118  unsigned int startAddress = 0;
// NOTE(review): listing line 119 (presumably the condition guarding the next assignment) is missing
120  // .. and the other does not
121  startAddress = sizeof(int);
122  }
// The payload is handed to the zmq::message_t (pointer, size) constructor
123  zmq::message_t dataMessage(&m_buffer[startAddress], static_cast<size_t>(m_currentSize - startAddress));
124 
// NOTE(review): listing lines 125-126 are missing; presumably the consumed bytes are removed from
// the buffer and m_writeAddress is adjusted there - confirm against the original file.
127  m_currentSize = 0;
128 
129  log("write_address", static_cast<long>(m_writeAddress));
130  log("current_size", static_cast<long>(m_currentSize));
131 
132  receivedMessages.push_back(std::move(dataMessage));
133  } else {
134  // We did not receive a full message up to now
135  break;
136  }
137  }
138 
139  average("average_number_of_events_per_package", receivedMessages.size());
140  return receivedMessages;
141 }
142 
// Create a raw output connection on a ZMQ_STREAM socket.
//
// outputAddress: address passed to ZMQParent::createSocket<ZMQ_STREAM>
// addEventSize:  stored in m_addEventSize; when set, handleEvent() prefixes each message with its size
// parent:        shared ZMQParent instance used to create the socket
143 ZMQRawOutput::ZMQRawOutput(const std::string& outputAddress, bool addEventSize,
144  const std::shared_ptr<ZMQParent>& parent) : ZMQConnectionOverSocket(
145  parent), m_addEventSize(addEventSize)
146 {
147  // Publish every monitoring key with a zeroed value so it exists before the first event is sent
148  log("data_size", 0.0);
149  log("sent_events", 0l);
150  log("event_rate", 0.0);
151 
152  log("socket_state", "disconnected");
153  log("socket_connects", 0l);
154  log("socket_disconnects", 0l);
155 
156  // STREAM is the ZMQ type for raw, non ZMQ connections
157  m_socket = m_parent->createSocket<ZMQ_STREAM>(outputAddress);
158 }
159 
// Send one event message to the connected client.
//
// message: the payload; it is taken by value and moved into the send call when no size prefix
//          is requested, otherwise it is copied behind a sizeof(int) prefix holding
//          htonl(message.size()).
// Requires a stored client identity (m_dataIdentity must not be empty).
160 void ZMQRawOutput::handleEvent(zmq::message_t message)
161 {
162  // Send the message. If requested, add the message size in front of the message
163  B2ASSERT("Data Socket needs to be connected", not m_dataIdentity.empty());
164 
165  const auto dataSize = message.size();
166 
// Monitoring: average payload size, number of sent events and the event rate
167  average("data_size", dataSize);
168  increment("sent_events");
169  timeit("event_rate");
170 
// NOTE(review): listing line 171 is missing from this extract. Presumably the identity frame of the
// connected client is sent there first (a ZMQ_STREAM socket expects identity + data) - confirm
// against the original file.
172  if (not m_addEventSize) {
173  m_socket->send(std::move(message));
174  } else {
// Build a new message: [size in network byte order][payload]
175  zmq::message_t tmpMessage(message.size() + sizeof(int));
176  const int messageSize = htonl(message.size());
177  memcpy(tmpMessage.data<char>(), &messageSize, sizeof(int));
178  memcpy(tmpMessage.data<char>() + sizeof(int), message.data(), message.size());
179  m_socket->send(std::move(tmpMessage));
180  }
181 }
182 
184 {
// NOTE(review): the signature (listing line 183) is missing from this extract; the cross-reference
// index of this listing attributes the body to `void handleIncomingData()` of Belle2::ZMQRawOutput.
// Reads the two parts of an incoming message (identity + second frame) and toggles the stored
// client identity: empty -> store it (connect), otherwise -> clear it (disconnect).
185  // The only possibility that we can receive a message is when the client connects or disconnects
186  zmq::message_t identity;
187  m_socket->recv(&identity);
// The second frame is received but its content is never inspected
188  zmq::message_t nullMessage;
189  m_socket->recv(&nullMessage);
190  std::string identityString(identity.data<char>(), identity.size());
191 
192  if (m_dataIdentity.empty()) {
// No client stored so far: remember this one and count the connect
193  m_dataIdentity = identityString;
194  log("socket_state", "connected");
195  increment("socket_connects");
196  } else {
// A client is already stored: the notification must come from that same client and is
// treated as its disconnect
197  B2ASSERT("The app can only handle a single connection!", m_dataIdentity == identityString);
198  m_dataIdentity = "";
199  log("socket_state", "disconnected");
200  increment("socket_disconnects");
201  }
202 }
203 
205 {
// NOTE(review): the signature (listing line 204) is missing from this extract; the cross-reference
// index of this listing attributes the body to `bool isReady() const final` of Belle2::ZMQRawOutput.
206  // Only ready if the client is connected, i.e. an identity is stored
207  return not m_dataIdentity.empty();
208 }
Belle2::ZMQLogger::timeit
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
Definition: ZMQLogger.h:127
Belle2::ZMQRawOutput::m_dataIdentity
std::string m_dataIdentity
Internal storage of the connected identity to not have multiple connections.
Definition: ZMQRawConnection.h:111
Belle2::ZMQRawInput::m_receiveEventMessages
bool m_receiveEventMessages
Parameter to receive event messages (see above)
Definition: ZMQRawConnection.h:79
Belle2::ZMQMessageHelper::createZMQMessage
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
Definition: ZMQMessageHelper.h:39
Belle2::ZMQConnectionOverSocket::m_parent
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
Definition: ZMQConnection.h:82
Belle2::ZMQRawInput::clear
void clear()
Reset the internal buffer and counter.
Definition: ZMQRawConnection.cc:42
Belle2::ZMQRawOutput::handleIncomingData
void handleIncomingData()
Handle incoming data: a socket (dis)connect.
Definition: ZMQRawConnection.cc:183
Belle2::ZMQLogger::log
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:106
Belle2::ZMQRawInput::m_maximalBufferSize
unsigned int m_maximalBufferSize
Parameter for the maximal buffer size. If this size is reached, a FATAL will be issued.
Definition: ZMQRawConnection.h:71
Belle2::ZMQLogger::increment
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:34
Belle2::ZMQRawOutput::m_addEventSize
bool m_addEventSize
Parameter to add the event size to a message.
Definition: ZMQRawConnection.h:113
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQRawInput::m_inputIdentity
std::string m_inputIdentity
Internal storage of the connected socket to check if we get messages from multiple ones.
Definition: ZMQRawConnection.h:81
Belle2::ZMQRawOutput::handleEvent
virtual void handleEvent(zmq::message_t message)
Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
Definition: ZMQRawConnection.cc:160
Belle2::ZMQRawInput::m_currentSize
unsigned int m_currentSize
How large should the full message be? The information is from the first int of the message.
Definition: ZMQRawConnection.h:77
Belle2::ZMQLogger::average
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:112
Belle2::ZMQRawInput::handleIncomingData
std::vector< zmq::message_t > handleIncomingData()
Block until a TCP packet can be received from the socket.
Definition: ZMQRawConnection.cc:48
Belle2::ZMQRawInput::m_buffer
std::vector< char > m_buffer
Internal storage for the buffer.
Definition: ZMQRawConnection.h:73
Belle2::ZMQRawOutput::isReady
bool isReady() const final
If no socket is connected, this connection is not ready.
Definition: ZMQRawConnection.cc:204
Belle2::ZMQConnectionOverSocket
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:72
Belle2::ZMQRawInput::ZMQRawInput
ZMQRawInput(const std::string &inputAddress, unsigned int maximalBufferSize, bool receiveEventMessages, const std::shared_ptr< ZMQParent > &parent)
Create a new raw input connection. The bind or connect behavior is chosen according to the given addr...
Definition: ZMQRawConnection.cc:18
Belle2::ZMQConnectionOverSocket::m_socket
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
Definition: ZMQConnection.h:84
Belle2::ZMQRawOutput::ZMQRawOutput
ZMQRawOutput(const std::string &outputAddress, bool addEventSize, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output connection. The bind or connect behavior is chosen according to the given add...
Definition: ZMQRawConnection.cc:143
Belle2::ZMQRawInput::m_writeAddress
size_t m_writeAddress
Where in the buffer are we currently writing to.
Definition: ZMQRawConnection.h:75