Belle II Software  release-05-01-25
ZMQConnection.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #include <framework/pcore/zmq/connections/ZMQConnection.h>
11 
12 #include <string>
13 #include <memory>
14 
15 using namespace Belle2;
16 
18 {
19  return true;
20 }
21 
23 {
24  // Just poll with 0 timeout and no reaction function. Hacky trick to reduce code duplication
25  const auto emptyFunction = []() {};
26  return ZMQConnection::poll({{connection, emptyFunction}}, 0);
27 }
28 
29 bool ZMQConnection::poll(const std::map<const ZMQConnection*, ZMQConnection::ReactorFunction>& connectionList, int timeout)
30 {
31  std::vector<const ReactorFunction*> socketMapping;
32  std::vector<zmq::pollitem_t> pollItems;
33 
34  // zmq needs a special format for its polling, so create it here.
35  for (const auto& [connection, function] : connectionList) {
36  auto sockets = connection->getSockets();
37  for (zmq::socket_t* socket : sockets) {
38  zmq::pollitem_t pollItem;
39  pollItem.socket = static_cast<void*>(*socket);
40  pollItem.events = ZMQ_POLLIN;
41  pollItem.revents = 0;
42  pollItems.push_back(std::move(pollItem));
43 
44  // but keep reference to the original function, so we can call the correct one later
45  socketMapping.push_back(&function);
46  }
47  }
48 
49  if (pollItems.empty()) {
50  return false;
51  }
52 
53  try {
54  zmq::poll(&pollItems[0], pollItems.size(), timeout);
55 
56  bool anySocket = false;
57  unsigned int counter = 0;
58  for (const auto& pollItem : pollItems) {
59  if (pollItem.revents & ZMQ_POLLIN) {
60  anySocket = true;
61  const auto* functionPtr = socketMapping.at(counter);
62  const auto function = *functionPtr;
63  function();
64  }
65  counter++;
66  }
67 
68  return anySocket;
69  } catch (zmq::error_t& error) {
70  if (error.num() == EINTR) {
71  // Could happen if there was an interrupt, return false so the caller knows the time did not pass already
72  return false;
73  } else {
74  // cannot handle, rethrow exception
75  throw;
76  }
77  }
78 }
79 
80 ZMQConnectionOverSocket::ZMQConnectionOverSocket(const std::shared_ptr<ZMQParent>& parent) : m_parent(parent)
81 {
82 
83 }
84 
85 std::vector<zmq::socket_t*> ZMQConnectionOverSocket::getSockets() const
86 {
87  return {m_socket.get()};
88 }
Belle2::ZMQConnection::isReady
virtual bool isReady() const
Return true if this connection is able to send messages right now. Can be overloaded in derived class...
Definition: ZMQConnection.cc:17
Belle2::ZMQConnection
Base class for every connection with virtual functions to be implemented:
Definition: ZMQConnection.h:40
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQConnection::poll
static bool poll(const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
Poll on the given connections and call the attached function if a message comes in.
Definition: ZMQConnection.cc:29
Belle2::ZMQConnectionOverSocket::ZMQConnectionOverSocket
ZMQConnectionOverSocket(const std::shared_ptr< ZMQParent > &parent)
Create a new instance passing the shared ZMQParent.
Definition: ZMQConnection.cc:80
Belle2::ZMQConnection::hasMessage
static bool hasMessage(const ZMQConnection *connection)
Check if the given connection has an incoming message (right now, no waiting).
Definition: ZMQConnection.cc:22
Belle2::ZMQConnectionOverSocket::m_socket
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
Definition: ZMQConnection.h:84
Belle2::ZMQConnectionOverSocket::getSockets
std::vector< zmq::socket_t * > getSockets() const final
The socket used for polling is just the stored socket.
Definition: ZMQConnection.cc:85