Belle II Software  release-06-02-00
ZMQClient.h
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #pragma once
9 
10 #include <string>
11 #include <zmq.hpp>
12 #include <memory>
13 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
14 #include <framework/logging/Logger.h>
15 
16 namespace Belle2 {
22  class ZMQClient {
23  public:
25  template <int AZMQType>
26  void initialize(const std::string& pubSocketAddress, const std::string& subSocketAddress, const std::string& socketName, bool bind);
27 
29  void initialize(const std::string& pubSocketAddress, const std::string& subSocketAddress);
30 
32  void terminate(bool sendGoodbye = true);
33 
35  void reset();
36 
38  void subscribe(EMessageTypes messageType);
39 
41  template <class AZMQMessage>
42  void send(AZMQMessage message) const
43  {
44  AZMQMessage::element_type::toSocket(std::move(message), m_socket);
45  }
46 
48  void send(zmq::message_t& message) const;
49 
51  template <class AZMQMessage>
52  void publish(AZMQMessage message) const
53  {
54  AZMQMessage::element_type::toSocket(std::move(message), m_pubSocket);
55  }
56 
58  bool isOnline() const
59  {
60  return m_context.get();
61  }
62 
63 
74  template <class AMulticastAnswer, class ASocketAnswer>
75  int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const;
76 
82  template <class ASocketAnswer>
83  int pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const;
84 
90  template <class AMulticastAnswer>
91  int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const;
92 
93  private:
95  static int pollSocketVector(const std::vector<zmq::socket_t*>& socketList, int timeout);
96 
98  std::unique_ptr<zmq::context_t> m_context;
99 
101  std::vector<zmq::socket_t*> m_pollSocketPtrList;
102 
104  std::unique_ptr<zmq::socket_t> m_pubSocket;
106  std::unique_ptr<zmq::socket_t> m_subSocket;
108  std::unique_ptr<zmq::socket_t> m_socket;
109  };
110 
111  template <class AMulticastAnswer, class ASocketAnswer>
112  int ZMQClient::poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
113  {
114  B2ASSERT("Can only run this on started clients", m_subSocket and m_socket);
115  bool repeat = true;
116  int pollResult;
117  do {
118  pollResult = pollSocketVector(m_pollSocketPtrList, timeout);
119  if (pollResult & 1) {
120  // The multicast is the first entry.
121  // Get all entries if possible, but do not block anymore.
122  std::vector<zmq::socket_t*> vector = {m_subSocket.get()};
123  while (pollSocketVector(vector, 0) and repeat) {
124  repeat = multicastAnswer(m_subSocket);
125  }
126  }
127 
128  if (pollResult & 2) {
129  // Get all entries if possible, but do not block anymore.
130  std::vector<zmq::socket_t*> vector = {m_socket.get()};
131  while (pollSocketVector(vector, 0) and repeat) {
132  repeat = socketAnswer(m_socket);
133  }
134  }
135  } while (repeat and pollResult);
136 
137  return pollResult;
138  }
139 
140  template <class ASocketAnswer>
141  int ZMQClient::pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const
142  {
143  B2ASSERT("Can only run this on started clients", m_socket);
144  std::vector<zmq::socket_t*> vector = {m_socket.get()};
145 
146  bool repeat = true;
147  int pollResult;
148  do {
149  pollResult = pollSocketVector(vector, timeout);
150  if (pollResult) {
151  while (pollSocketVector(vector, 0)) {
152  repeat = socketAnswer(m_socket);
153  }
154  }
155  } while (repeat and pollResult);
156 
157  return pollResult;
158  }
159 
160  template <class AMulticastAnswer>
161  int ZMQClient::pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
162  {
163  B2ASSERT("Can only run this on started clients", m_subSocket);
164  std::vector<zmq::socket_t*> vector = {m_subSocket.get()};
165 
166  bool repeat = true;
167  int pollResult;
168  do {
169  pollResult = pollSocketVector(vector, timeout);
170  if (pollResult) {
171  while (pollSocketVector(vector, 0)) {
172  repeat = multicastAnswer(m_subSocket);
173  }
174  }
175  } while (repeat and pollResult);
176 
177  return pollResult;
178  }
180 } // namespace Belle2
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
Definition: ZMQClient.h:22
std::unique_ptr< zmq::socket_t > m_subSocket
ZMQ sub socket.
Definition: ZMQClient.h:106
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketName, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
std::unique_ptr< zmq::socket_t > m_pubSocket
ZMQ Pub socket.
Definition: ZMQClient.h:104
static int pollSocketVector(const std::vector< zmq::socket_t * > &socketList, int timeout)
Internal poll function.
Definition: ZMQClient.cc:118
bool isOnline() const
Check if the client was initialized and not terminated.
Definition: ZMQClient.h:58
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:52
std::unique_ptr< zmq::context_t > m_context
ZMQ context.
Definition: ZMQClient.h:98
std::vector< zmq::socket_t * > m_pollSocketPtrList
Will use this vector for polling.
Definition: ZMQClient.h:101
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18
std::unique_ptr< zmq::socket_t > m_socket
ZMQ socket.
Definition: ZMQClient.h:108
void reset()
Reset the sockets. ATTENTION: this does not close the sockets! Use only after forks to not clean up t...
Definition: ZMQClient.cc:43
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:42
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:99
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until, either:
Definition: ZMQClient.h:112
EMessageTypes
Type the messages can have.
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll only the multicast socket.
Definition: ZMQClient.h:161
int pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const
Poll only the data socket.
Definition: ZMQClient.h:141
Abstract base class for different kinds of events.