Belle II Software  release-08-01-10
ZMQClient.h
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #pragma once
9 
10 #include <string>
11 #include <zmq.hpp>
12 #include <memory>
13 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
14 #include <framework/logging/Logger.h>
15 
16 namespace Belle2 {
22  class ZMQClient {
23  public:
25  template <int AZMQType>
26  void initialize(const std::string& pubSocketAddress, const std::string& subSocketAddress, const std::string& socketAddress,
27  bool bind);
28 
30  void initialize(const std::string& pubSocketAddress, const std::string& subSocketAddress);
31 
33  void terminate(bool sendGoodbye = true);
34 
36  void reset();
37 
39  void subscribe(EMessageTypes messageType);
40 
42  template <class AZMQMessage>
43  void send(AZMQMessage message) const
44  {
45  AZMQMessage::element_type::toSocket(std::move(message), m_socket);
46  }
47 
49  void send(zmq::message_t& message) const;
50 
52  template <class AZMQMessage>
53  void publish(AZMQMessage message) const
54  {
55  AZMQMessage::element_type::toSocket(std::move(message), m_pubSocket);
56  }
57 
59  bool isOnline() const
60  {
61  return m_context.get();
62  }
63 
64 
75  template <class AMulticastAnswer, class ASocketAnswer>
76  int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const;
77 
83  template <class ASocketAnswer>
84  int pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const;
85 
91  template <class AMulticastAnswer>
92  int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const;
93 
94  private:
96  static int pollSocketVector(const std::vector<zmq::socket_t*>& socketList, int timeout);
97 
99  std::unique_ptr<zmq::context_t> m_context;
100 
102  std::vector<zmq::socket_t*> m_pollSocketPtrList;
103 
105  std::unique_ptr<zmq::socket_t> m_pubSocket;
107  std::unique_ptr<zmq::socket_t> m_subSocket;
109  std::unique_ptr<zmq::socket_t> m_socket;
110  };
111 
112  template <class AMulticastAnswer, class ASocketAnswer>
113  int ZMQClient::poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
114  {
115  B2ASSERT("Can only run this on started clients", m_subSocket and m_socket);
116  bool repeat = true;
117  int pollResult;
118  do {
119  pollResult = pollSocketVector(m_pollSocketPtrList, timeout);
120  if (pollResult & 1) {
121  // The multicast is the first entry.
122  // Get all entries if possible, but do not block anymore.
123  std::vector<zmq::socket_t*> vector = {m_subSocket.get()};
124  while (pollSocketVector(vector, 0) and repeat) {
125  repeat = multicastAnswer(m_subSocket);
126  }
127  }
128 
129  if (pollResult & 2) {
130  // Get all entries if possible, but do not block anymore.
131  std::vector<zmq::socket_t*> vector = {m_socket.get()};
132  while (pollSocketVector(vector, 0) and repeat) {
133  repeat = socketAnswer(m_socket);
134  }
135  }
136  } while (repeat and pollResult);
137 
138  return pollResult;
139  }
140 
141  template <class ASocketAnswer>
142  int ZMQClient::pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const
143  {
144  B2ASSERT("Can only run this on started clients", m_socket);
145  std::vector<zmq::socket_t*> vector = {m_socket.get()};
146 
147  bool repeat = true;
148  int pollResult;
149  do {
150  pollResult = pollSocketVector(vector, timeout);
151  if (pollResult) {
152  while (pollSocketVector(vector, 0)) {
153  repeat = socketAnswer(m_socket);
154  }
155  }
156  } while (repeat and pollResult);
157 
158  return pollResult;
159  }
160 
161  template <class AMulticastAnswer>
162  int ZMQClient::pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
163  {
164  B2ASSERT("Can only run this on started clients", m_subSocket);
165  std::vector<zmq::socket_t*> vector = {m_subSocket.get()};
166 
167  bool repeat = true;
168  int pollResult;
169  do {
170  pollResult = pollSocketVector(vector, timeout);
171  if (pollResult) {
172  while (pollSocketVector(vector, 0)) {
173  repeat = multicastAnswer(m_subSocket);
174  }
175  }
176  } while (repeat and pollResult);
177 
178  return pollResult;
179  }
181 } // namespace Belle2
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
Definition: ZMQClient.h:22
std::unique_ptr< zmq::socket_t > m_subSocket
ZMQ sub socket.
Definition: ZMQClient.h:107
std::unique_ptr< zmq::socket_t > m_pubSocket
ZMQ Pub socket.
Definition: ZMQClient.h:105
static int pollSocketVector(const std::vector< zmq::socket_t * > &socketList, int timeout)
Internal poll function.
Definition: ZMQClient.cc:118
bool isOnline() const
Check if the client was initialized and not terminated.
Definition: ZMQClient.h:59
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
std::unique_ptr< zmq::context_t > m_context
ZMQ context.
Definition: ZMQClient.h:99
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
std::vector< zmq::socket_t * > m_pollSocketPtrList
Will use this vector for polling.
Definition: ZMQClient.h:102
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18
std::unique_ptr< zmq::socket_t > m_socket
ZMQ socket.
Definition: ZMQClient.h:109
void reset()
Reset the sockets. ATTENTION: this does not close the sockets! Use only after forks to not clean up t...
Definition: ZMQClient.cc:43
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:43
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:99
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until, either:
Definition: ZMQClient.h:113
EMessageTypes
Type the messages can have.
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll method to only the multicast socket.
Definition: ZMQClient.h:162
int pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const
Poll method to only the data socket.
Definition: ZMQClient.h:142
Abstract base class for different kinds of events.