Belle II Software  release-08-01-10
ZMQConnection.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <framework/pcore/zmq/connections/ZMQConnection.h>
9 
10 #include <string>
11 #include <memory>
12 
13 using namespace Belle2;
14 
16 {
17  return true;
18 }
19 
21 {
22  // Just poll with 0 timeout and no reaction function. Hacky trick to reduce code duplication
23  const auto emptyFunction = []() {};
24  return ZMQConnection::poll({{connection, emptyFunction}}, 0);
25 }
26 
27 bool ZMQConnection::poll(const std::map<const ZMQConnection*, ZMQConnection::ReactorFunction>& connectionList, int timeout)
28 {
29  std::vector<const ReactorFunction*> socketMapping;
30  std::vector<zmq::pollitem_t> pollItems;
31 
32  // zmq needs a special format for its polling, so create it here.
33  for (const auto& [connection, function] : connectionList) {
34  auto sockets = connection->getSockets();
35  for (zmq::socket_t* socket : sockets) {
36  zmq::pollitem_t pollItem;
37  pollItem.socket = static_cast<void*>(*socket);
38  pollItem.events = ZMQ_POLLIN;
39  pollItem.revents = 0;
40  pollItems.push_back(std::move(pollItem));
41 
42  // but keep reference to the original function, so we can call the correct one later
43  socketMapping.push_back(&function);
44  }
45  }
46 
47  if (pollItems.empty()) {
48  return false;
49  }
50 
51  try {
52  zmq::poll(&pollItems[0], pollItems.size(), timeout);
53 
54  bool anySocket = false;
55  unsigned int counter = 0;
56  for (const auto& pollItem : pollItems) {
57  if (pollItem.revents & ZMQ_POLLIN) {
58  anySocket = true;
59  const auto* functionPtr = socketMapping.at(counter);
60  const auto function = *functionPtr;
61  function();
62  }
63  counter++;
64  }
65 
66  return anySocket;
67  } catch (zmq::error_t& error) {
68  if (error.num() == EINTR) {
69  // Could happen if there was an interrupt, return false so the caller knows the time did not pass already
70  return false;
71  } else {
72  // cannot handle, rethrow exception
73  throw;
74  }
75  }
76 }
77 
78 ZMQConnectionOverSocket::ZMQConnectionOverSocket(const std::shared_ptr<ZMQParent>& parent) : m_parent(parent)
79 {
80 }
81 
82 std::vector<zmq::socket_t*> ZMQConnectionOverSocket::getSockets() const
83 {
84  return {m_socket.get()};
85 }
86 
88 {
89  std::string endpoint{""};
90  if (m_socket) {
91  endpoint = m_socket->get(zmq::sockopt::last_endpoint);
92  }
93  return endpoint;
94 }
ZMQConnectionOverSocket(const std::shared_ptr< ZMQParent > &parent)
Create a new instance passing the shared ZMQParent.
std::string getEndPoint() const
Return the connection string for this socket.
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
Definition: ZMQConnection.h:76
std::vector< zmq::socket_t * > getSockets() const final
The socket used for polling is just the stored socket.
Base class for every connection with virtual functions to be implemented:
Definition: ZMQConnection.h:30
virtual bool isReady() const
Return true of this connection is able to send messages right now. Can be overloaded in derived class...
static bool hasMessage(const ZMQConnection *connection)
Check if the given connection as an incoming message (right now, no waiting).
static bool poll(const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
Poll on the given connections and call the attached function if a messages comes in.
Abstract base class for different kinds of events.