Belle II Software development
ZMQClient.h
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#pragma once
9
10#include <string>
11#include <zmq.hpp>
12#include <memory>
13#include <framework/pcore/zmq/messages/ZMQDefinitions.h>
14#include <framework/logging/Logger.h>
15
16namespace Belle2 {
22 class ZMQClient {
23 public:
25 template <int AZMQType>
26 void initialize(const std::string& pubSocketAddress, const std::string& subSocketAddress, const std::string& socketAddress,
27 bool bind);
28
30 void initialize(const std::string& pubSocketAddress, const std::string& subSocketAddress);
31
33 void terminate(bool sendGoodbye = true);
34
36 void reset();
37
39 void subscribe(EMessageTypes messageType);
40
42 template <class AZMQMessage>
43 void send(AZMQMessage message) const
44 {
45 AZMQMessage::element_type::toSocket(std::move(message), m_socket);
46 }
47
49 void send(zmq::message_t& message) const;
50
52 template <class AZMQMessage>
53 void publish(AZMQMessage message) const
54 {
55 AZMQMessage::element_type::toSocket(std::move(message), m_pubSocket);
56 }
57
59 bool isOnline() const
60 {
61 return m_context.get();
62 }
63
64
75 template <class AMulticastAnswer, class ASocketAnswer>
76 int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const;
77
83 template <class ASocketAnswer>
84 int pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const;
85
91 template <class AMulticastAnswer>
92 int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const;
93
94 private:
96 static int pollSocketVector(const std::vector<zmq::socket_t*>& socketList, int timeout);
97
99 std::unique_ptr<zmq::context_t> m_context;
100
102 std::vector<zmq::socket_t*> m_pollSocketPtrList;
103
105 std::unique_ptr<zmq::socket_t> m_pubSocket;
107 std::unique_ptr<zmq::socket_t> m_subSocket;
109 std::unique_ptr<zmq::socket_t> m_socket;
110 };
111
112 template <class AMulticastAnswer, class ASocketAnswer>
113 int ZMQClient::poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
114 {
115 B2ASSERT("Can only run this on started clients", m_subSocket and m_socket);
116 bool repeat = true;
117 int pollResult;
118 do {
119 pollResult = pollSocketVector(m_pollSocketPtrList, timeout);
120 if (pollResult & 1) {
121 // The multicast is the first entry.
122 // Get all entries if possible, but do not block anymore.
123 std::vector<zmq::socket_t*> vector = {m_subSocket.get()};
124 while (pollSocketVector(vector, 0) and repeat) {
125 repeat = multicastAnswer(m_subSocket);
126 }
127 }
128
129 if (pollResult & 2) {
130 // Get all entries if possible, but do not block anymore.
131 std::vector<zmq::socket_t*> vector = {m_socket.get()};
132 while (pollSocketVector(vector, 0) and repeat) {
133 repeat = socketAnswer(m_socket);
134 }
135 }
136 } while (repeat and pollResult);
137
138 return pollResult;
139 }
140
141 template <class ASocketAnswer>
142 int ZMQClient::pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const
143 {
144 B2ASSERT("Can only run this on started clients", m_socket);
145 std::vector<zmq::socket_t*> vector = {m_socket.get()};
146
147 bool repeat = true;
148 int pollResult;
149 do {
150 pollResult = pollSocketVector(vector, timeout);
151 if (pollResult) {
152 while (pollSocketVector(vector, 0)) {
153 repeat = socketAnswer(m_socket);
154 }
155 }
156 } while (repeat and pollResult);
157
158 return pollResult;
159 }
160
161 template <class AMulticastAnswer>
162 int ZMQClient::pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
163 {
164 B2ASSERT("Can only run this on started clients", m_subSocket);
165 std::vector<zmq::socket_t*> vector = {m_subSocket.get()};
166
167 bool repeat = true;
168 int pollResult;
169 do {
170 pollResult = pollSocketVector(vector, timeout);
171 if (pollResult) {
172 while (pollSocketVector(vector, 0)) {
173 repeat = multicastAnswer(m_subSocket);
174 }
175 }
176 } while (repeat and pollResult);
177
178 return pollResult;
179 }
181} // namespace Belle2
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
Definition: ZMQClient.h:22
std::unique_ptr< zmq::socket_t > m_subSocket
ZMQ sub socket.
Definition: ZMQClient.h:107
std::unique_ptr< zmq::socket_t > m_pubSocket
ZMQ Pub socket.
Definition: ZMQClient.h:105
static int pollSocketVector(const std::vector< zmq::socket_t * > &socketList, int timeout)
Internal poll function.
Definition: ZMQClient.cc:118
bool isOnline() const
Check if the client was initialized and not terminated.
Definition: ZMQClient.h:59
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
std::unique_ptr< zmq::context_t > m_context
ZMQ context.
Definition: ZMQClient.h:99
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
std::vector< zmq::socket_t * > m_pollSocketPtrList
Will use this vector for polling.
Definition: ZMQClient.h:102
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18
std::unique_ptr< zmq::socket_t > m_socket
ZMQ socket.
Definition: ZMQClient.h:109
void reset()
Reset the sockets. ATTENTION: this does not close the sockets! Use only after forks to not clean up t...
Definition: ZMQClient.cc:43
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:43
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:99
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until either a blocking poll returns without any readable socket or one of the answer functions returns false.
Definition: ZMQClient.h:113
EMessageTypes
Type the messages can have.
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll method for only the multicast socket.
Definition: ZMQClient.h:162
int pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const
Poll method for only the data socket.
Definition: ZMQClient.h:142
Abstract base class for different kinds of events.