Belle II Software  release-05-02-19
ZMQClient.h
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2018 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #pragma once
11 
12 #include <string>
13 #include <zmq.hpp>
14 #include <memory>
15 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
16 #include <framework/logging/Logger.h>
17 
18 namespace Belle2 {
23  class ZMQClient {
25  public:
27  template <int AZMQType>
28  void initialize(const std::string& pubSocketAddress, const std::string& subSocketAddress, const std::string& socketName, bool bind);
29 
31  void initialize(const std::string& pubSocketAddress, const std::string& subSocketAddress);
32 
34  void terminate(bool sendGoodbye = true);
35 
37  void reset();
38 
40  void subscribe(EMessageTypes messageType);
41 
43  template <class AZMQMessage>
44  void send(AZMQMessage message) const
45  {
46  AZMQMessage::element_type::toSocket(std::move(message), m_socket);
47  }
48 
50  void send(zmq::message_t& message) const;
51 
53  template <class AZMQMessage>
54  void publish(AZMQMessage message) const
55  {
56  AZMQMessage::element_type::toSocket(std::move(message), m_pubSocket);
57  }
58 
60  bool isOnline() const
61  {
62  return m_context.get();
63  }
64 
65 
76  template <class AMulticastAnswer, class ASocketAnswer>
77  int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const;
78 
84  template <class ASocketAnswer>
85  int pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const;
86 
92  template <class AMulticastAnswer>
93  int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const;
94 
95  private:
97  static int pollSocketVector(const std::vector<zmq::socket_t*>& socketList, int timeout);
98 
100  std::unique_ptr<zmq::context_t> m_context;
101 
103  std::vector<zmq::socket_t*> m_pollSocketPtrList;
104 
106  std::unique_ptr<zmq::socket_t> m_pubSocket;
108  std::unique_ptr<zmq::socket_t> m_subSocket;
110  std::unique_ptr<zmq::socket_t> m_socket;
111  };
112 
113  template <class AMulticastAnswer, class ASocketAnswer>
114  int ZMQClient::poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
115  {
116  B2ASSERT("Can only run this on started clients", m_subSocket and m_socket);
117  bool repeat = true;
118  int pollResult;
119  do {
120  pollResult = pollSocketVector(m_pollSocketPtrList, timeout);
121  if (pollResult & 1) {
122  // The multicast is the first entry.
123  // Get all entries if possible, but do not block anymore.
124  std::vector<zmq::socket_t*> vector = {m_subSocket.get()};
125  while (pollSocketVector(vector, 0) and repeat) {
126  repeat = multicastAnswer(m_subSocket);
127  }
128  }
129 
130  if (pollResult & 2) {
131  // Get all entries if possible, but do not block anymore.
132  std::vector<zmq::socket_t*> vector = {m_socket.get()};
133  while (pollSocketVector(vector, 0) and repeat) {
134  repeat = socketAnswer(m_socket);
135  }
136  }
137  } while (repeat and pollResult);
138 
139  return pollResult;
140  }
141 
142  template <class ASocketAnswer>
143  int ZMQClient::pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const
144  {
145  B2ASSERT("Can only run this on started clients", m_socket);
146  std::vector<zmq::socket_t*> vector = {m_socket.get()};
147 
148  bool repeat = true;
149  int pollResult;
150  do {
151  pollResult = pollSocketVector(vector, timeout);
152  if (pollResult) {
153  while (pollSocketVector(vector, 0)) {
154  repeat = socketAnswer(m_socket);
155  }
156  }
157  } while (repeat and pollResult);
158 
159  return pollResult;
160  }
161 
162  template <class AMulticastAnswer>
163  int ZMQClient::pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
164  {
165  B2ASSERT("Can only run this on started clients", m_subSocket);
166  std::vector<zmq::socket_t*> vector = {m_subSocket.get()};
167 
168  bool repeat = true;
169  int pollResult;
170  do {
171  pollResult = pollSocketVector(vector, timeout);
172  if (pollResult) {
173  while (pollSocketVector(vector, 0)) {
174  repeat = multicastAnswer(m_subSocket);
175  }
176  }
177  } while (repeat and pollResult);
178 
179  return pollResult;
180  }
182 } // namespace Belle2
Belle2::ZMQClient::terminate
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:20
Belle2::ZMQClient::m_pubSocket
std::unique_ptr< zmq::socket_t > m_pubSocket
ZMQ Pub socket.
Definition: ZMQClient.h:114
Belle2::ZMQClient::pollSocketVector
static int pollSocketVector(const std::vector< zmq::socket_t * > &socketList, int timeout)
Internal poll function.
Definition: ZMQClient.cc:118
Belle2::EMessageTypes
EMessageTypes
Type the messages can have.
Definition: ZMQDefinitions.h:26
Belle2::ZMQClient::pollSocket
int pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const
Poll method for the data socket only.
Definition: ZMQClient.h:151
Belle2::ZMQClient::m_socket
std::unique_ptr< zmq::socket_t > m_socket
ZMQ socket.
Definition: ZMQClient.h:118
Belle2::ZMQClient::m_pollSocketPtrList
std::vector< zmq::socket_t * > m_pollSocketPtrList
Will use this vector for polling.
Definition: ZMQClient.h:111
Belle2::ZMQClient::subscribe
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:101
Belle2::ZMQClient::isOnline
bool isOnline() const
Check if the client was initialized and not terminated.
Definition: ZMQClient.h:68
Belle2::ZMQClient::pollMulticast
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll method for the multicast socket only.
Definition: ZMQClient.h:171
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQClient::send
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:52
Belle2::ZMQClient::publish
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:62
Belle2::ZMQClient::initialize
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketName, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:55
Belle2::ZMQClient::poll
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until either an answer function returns false or a poll call reports no readable socket.
Definition: ZMQClient.h:122
Belle2::ZMQClient::m_subSocket
std::unique_ptr< zmq::socket_t > m_subSocket
ZMQ sub socket.
Definition: ZMQClient.h:116
Belle2::ZMQClient::reset
void reset()
Reset the sockets. ATTENTION: this does not close the sockets! Use only after forks to not clean up t...
Definition: ZMQClient.cc:45
Belle2::ZMQClient::m_context
std::unique_ptr< zmq::context_t > m_context
ZMQ context.
Definition: ZMQClient.h:108