Belle II Software development
ZMQClient Class Reference

A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket. More...

#include <ZMQClient.h>

Public Member Functions

template<int AZMQType>
void initialize (const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
 Initialize the multicast and a data socket of the given type.
 
void initialize (const std::string &pubSocketAddress, const std::string &subSocketAddress)
 Initialize only the multicast.
 
void terminate (bool sendGoodbye=true)
 Terminate the sockets properly.
 
void reset ()
 Reset the sockets. ATTENTION: this does not close the sockets! Use only after forks, to avoid cleaning up twice.
 
void subscribe (EMessageTypes messageType)
 Subscribe to the given multicast message type.
 
template<class AZMQMessage >
void send (AZMQMessage message) const
 Send a message over the data socket.
 
void send (zmq::message_t &message) const
 Send a zmq message over the data socket. ATTENTION: we are taking ownership here!
 
template<class AZMQMessage >
void publish (AZMQMessage message) const
 Publish the message to the multicast.
 
bool isOnline () const
 Check if the client was initialized and not terminated.
 
template<class AMulticastAnswer , class ASocketAnswer >
int poll (unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
 Poll both the multicast and the data socket until, either:
 
template<class ASocketAnswer >
int pollSocket (unsigned int timeout, ASocketAnswer socketAnswer) const
 Poll method for the data socket only.
 
template<class AMulticastAnswer >
int pollMulticast (unsigned int timeout, AMulticastAnswer multicastAnswer) const
 Poll method for the multicast socket only.
 

Static Private Member Functions

static int pollSocketVector (const std::vector< zmq::socket_t * > &socketList, int timeout)
 Internal poll function.
 

Private Attributes

std::unique_ptr< zmq::context_t > m_context
 ZMQ context.
 
std::vector< zmq::socket_t * > m_pollSocketPtrList
 Will use this vector for polling.
 
std::unique_ptr< zmq::socket_t > m_pubSocket
 ZMQ Pub socket.
 
std::unique_ptr< zmq::socket_t > m_subSocket
 ZMQ sub socket.
 
std::unique_ptr< zmq::socket_t > m_socket
 ZMQ socket.
 

Detailed Description

A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.

Definition at line 22 of file ZMQClient.h.

Member Function Documentation

◆ initialize() [1/2]

void initialize ( const std::string &  pubSocketAddress,
const std::string &  subSocketAddress 
)

Initialize only the multicast.

Definition at line 79 of file ZMQClient.cc.

80{
81 m_context = std::make_unique<zmq::context_t>(1);
82 m_pubSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_PUB);
83 m_subSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_SUB);
84
85 m_pubSocket->connect(pubSocketAddress);
86 m_pubSocket->set(zmq::sockopt::linger, 0);
87
88 m_subSocket->connect(subSocketAddress);
89 m_subSocket->set(zmq::sockopt::linger, 0);
90
91 B2DEBUG(200, "Having initialized multicast with sub on " << subSocketAddress << " and pub on " << pubSocketAddress);
92
93 std::this_thread::sleep_for(std::chrono::milliseconds(10));
94
95 m_pollSocketPtrList.clear();
96 m_pollSocketPtrList.push_back(m_subSocket.get());
97}
std::unique_ptr< zmq::socket_t > m_subSocket
ZMQ sub socket.
Definition: ZMQClient.h:107
std::unique_ptr< zmq::socket_t > m_pubSocket
ZMQ Pub socket.
Definition: ZMQClient.h:105
std::unique_ptr< zmq::context_t > m_context
ZMQ context.
Definition: ZMQClient.h:99
std::vector< zmq::socket_t * > m_pollSocketPtrList
Will use this vector for polling.
Definition: ZMQClient.h:102

◆ initialize() [2/2]

template void initialize< ZMQ_PUB > ( const std::string &  pubSocketAddress,
const std::string &  subSocketAddress,
const std::string &  socketAddress,
bool  bind 
)

Initialize the multicast and a data socket of the given type.

Definition at line 53 of file ZMQClient.cc.

55{
56 initialize(pubSocketAddress, subSocketAddress);
57 m_socket = std::make_unique<zmq::socket_t>(*m_context, AZMQType);
58
59 if (AZMQType == ZMQ_DEALER) {
60 const std::string uniqueID = std::to_string(getpid());
61 m_socket->set(zmq::sockopt::routing_id, uniqueID);
62 }
63
64 m_socket->set(zmq::sockopt::linger, 0);
65 if (bind) {
66 m_socket->bind(socketAddress.c_str());
67 } else {
68 m_socket->connect(socketAddress.c_str());
69 }
70
71 // Give the sockets some time to start
72 std::this_thread::sleep_for(std::chrono::milliseconds(10));
73
74 B2DEBUG(100, "Created socket: " << socketAddress);
75
76 m_pollSocketPtrList.push_back(m_socket.get());
77}
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
std::unique_ptr< zmq::socket_t > m_socket
ZMQ socket.
Definition: ZMQClient.h:109

◆ isOnline()

bool isOnline ( ) const
inline

Check if the client was initialized and not terminated.

Definition at line 59 of file ZMQClient.h.

60 {
61 return m_context.get();
62 }

◆ pollSocketVector()

int pollSocketVector ( const std::vector< zmq::socket_t * > &  socketList,
int  timeout 
)
static private

Internal poll function.

Definition at line 118 of file ZMQClient.cc.

119{
120 auto start = std::chrono::system_clock::now();
121 int return_bitmask = 0;
122 assert(socketList.size() <= 2);
123 std::vector<zmq::pollitem_t> items(socketList.size());
124
125 for (unsigned int i = 0; i < socketList.size(); i++) {
126 items[i].socket = static_cast<void*>(*socketList[i]);
127 items[i].events = ZMQ_POLLIN;
128 items[i].revents = 0;
129 }
130
131 while (timeout >= 0) {
132 try {
133 zmq::poll(items.data(), socketList.size(), timeout);
134
135 for (unsigned int i = 0; i < socketList.size(); i++) {
136 if (static_cast<bool>(items[i].revents & ZMQ_POLLIN)) {
137 return_bitmask = return_bitmask | 1 << i;
138 }
139 }
140 return return_bitmask;
141 } catch (zmq::error_t& error) {
142 if (error.num() == EINTR) {
143 auto now = std::chrono::system_clock::now();
144 timeout -= std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count();
145 } else {
146 // cannot handle, rethrow exception
147 throw;
148 }
149 }
150 }
151 return 0;
152}

◆ publish()

void publish ( AZMQMessage  message) const
inline

Publish the message to the multicast.

Definition at line 53 of file ZMQClient.h.

54 {
55 AZMQMessage::element_type::toSocket(std::move(message), m_pubSocket);
56 }

◆ reset()

void reset ( )

Reset the sockets. ATTENTION: this does not close the sockets! Use only after forks, to avoid cleaning up twice.

Definition at line 43 of file ZMQClient.cc.

44{
45 m_context.release();
46 m_subSocket.release();
47 m_pubSocket.release();
48 m_socket.release();
49}

◆ send() [1/2]

void send ( AZMQMessage  message) const
inline

Send a message over the data socket.

Definition at line 43 of file ZMQClient.h.

44 {
45 AZMQMessage::element_type::toSocket(std::move(message), m_socket);
46 }

◆ send() [2/2]

void send ( zmq::message_t &  message) const

Send a zmq message over the data socket. ATTENTION: we are taking ownership here!

Definition at line 108 of file ZMQClient.cc.

109{
110 B2ASSERT("Can only run this on started clients", m_socket);
111 m_socket->send(message, zmq::send_flags::none);
112}

◆ subscribe()

void subscribe ( EMessageTypes  messageType)

Subscribe to the given multicast message type.

Definition at line 99 of file ZMQClient.cc.

100{
101 B2ASSERT("Can only run this on started clients", m_subSocket);
102 char char_filter[2];
103 char_filter[0] = static_cast<char>(filter);
104 char_filter[1] = 0;
105 m_subSocket->set(zmq::sockopt::subscribe, char_filter);
106}
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double > > &runs, double cut, std::map< ExpRun, std::pair< double, double > > &runsRemoved)
filter events to remove runs shorter than cut, it stores removed runs in runsRemoved
Definition: Splitter.cc:38

◆ terminate()

void terminate ( bool  sendGoodbye = true)

Terminate the sockets properly.

Definition at line 18 of file ZMQClient.cc.

19{
20 if (m_pubSocket and sendGoodbye) {
21 auto multicastMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage, getpid());
22 publish(std::move(multicastMessage));
23 }
24
25 if (m_socket) {
26 m_socket->close();
27 m_socket.release();
28 }
29 if (m_pubSocket) {
30 m_pubSocket->close();
31 m_pubSocket.release();
32 }
33 if (m_subSocket) {
34 m_subSocket->close();
35 m_subSocket.release();
36 }
37 if (m_context) {
38 m_context->close();
39 m_context.release();
40 }
41}
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.

Member Data Documentation

◆ m_context

std::unique_ptr<zmq::context_t> m_context
private

ZMQ context.

Definition at line 99 of file ZMQClient.h.

◆ m_pollSocketPtrList

std::vector<zmq::socket_t*> m_pollSocketPtrList
private

Will use this vector for polling.

Definition at line 102 of file ZMQClient.h.

◆ m_pubSocket

std::unique_ptr<zmq::socket_t> m_pubSocket
private

ZMQ Pub socket.

Definition at line 105 of file ZMQClient.h.

◆ m_socket

std::unique_ptr<zmq::socket_t> m_socket
private

ZMQ socket.

Definition at line 109 of file ZMQClient.h.

◆ m_subSocket

std::unique_ptr<zmq::socket_t> m_subSocket
private

ZMQ sub socket.

Definition at line 107 of file ZMQClient.h.


The documentation for this class was generated from the following files: