Belle II Software development
ZMQClient.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/zmq/sockets/ZMQClient.h>
10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11
12#include <thread>
13#include <chrono>
14
15using namespace std;
16using namespace Belle2;
17
18void ZMQClient::terminate(bool sendGoodbye)
19{
20 if (m_pubSocket and sendGoodbye) {
21 auto multicastMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage, getpid());
22 publish(std::move(multicastMessage));
23 }
24
25 if (m_socket) {
26 m_socket->close();
27 m_socket.release();
28 }
29 if (m_pubSocket) {
30 m_pubSocket->close();
31 m_pubSocket.release();
32 }
33 if (m_subSocket) {
34 m_subSocket->close();
35 m_subSocket.release();
36 }
37 if (m_context) {
38 m_context->close();
39 m_context.release();
40 }
41}
42
44{
45 m_context.release();
46 m_subSocket.release();
47 m_pubSocket.release();
48 m_socket.release();
49}
50
51
52template <int AZMQType>
53void ZMQClient::initialize(const std::string& pubSocketAddress, const std::string& subSocketAddress,
54 const std::string& socketAddress, bool bind)
55{
56 initialize(pubSocketAddress, subSocketAddress);
57 m_socket = std::make_unique<zmq::socket_t>(*m_context, AZMQType);
58
59 if (AZMQType == ZMQ_DEALER) {
60 const std::string uniqueID = std::to_string(getpid());
61 m_socket->set(zmq::sockopt::routing_id, uniqueID);
62 }
63
64 m_socket->set(zmq::sockopt::linger, 0);
65 if (bind) {
66 m_socket->bind(socketAddress.c_str());
67 } else {
68 m_socket->connect(socketAddress.c_str());
69 }
70
71 // Give the sockets some time to start
72 std::this_thread::sleep_for(std::chrono::milliseconds(10));
73
74 B2DEBUG(100, "Created socket: " << socketAddress);
75
76 m_pollSocketPtrList.push_back(m_socket.get());
77}
78
79void ZMQClient::initialize(const std::string& pubSocketAddress, const std::string& subSocketAddress)
80{
81 m_context = std::make_unique<zmq::context_t>(1);
82 m_pubSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_PUB);
83 m_subSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_SUB);
84
85 m_pubSocket->connect(pubSocketAddress);
86 m_pubSocket->set(zmq::sockopt::linger, 0);
87
88 m_subSocket->connect(subSocketAddress);
89 m_subSocket->set(zmq::sockopt::linger, 0);
90
91 B2DEBUG(200, "Having initialized multicast with sub on " << subSocketAddress << " and pub on " << pubSocketAddress);
92
93 std::this_thread::sleep_for(std::chrono::milliseconds(10));
94
95 m_pollSocketPtrList.clear();
96 m_pollSocketPtrList.push_back(m_subSocket.get());
97}
98
100{
101 B2ASSERT("Can only run this on started clients", m_subSocket);
102 char char_filter[2];
103 char_filter[0] = static_cast<char>(filter);
104 char_filter[1] = 0;
105 m_subSocket->set(zmq::sockopt::subscribe, char_filter);
106}
107
108void ZMQClient::send(zmq::message_t& message) const
109{
110 B2ASSERT("Can only run this on started clients", m_socket);
111 m_socket->send(message, zmq::send_flags::none);
112}
113
114#if defined(__GNUC__) && !defined(__clang__)
115#pragma GCC diagnostic push
116#pragma GCC diagnostic ignored "-Wstack-usage="
117#endif
118int ZMQClient::pollSocketVector(const std::vector<zmq::socket_t*>& socketList, int timeout)
119{
120 auto start = std::chrono::system_clock::now();
121 int return_bitmask = 0;
122 assert(socketList.size() <= 2);
123 std::vector<zmq::pollitem_t> items(socketList.size());
124
125 for (unsigned int i = 0; i < socketList.size(); i++) {
126 items[i].socket = static_cast<void*>(*socketList[i]);
127 items[i].events = ZMQ_POLLIN;
128 items[i].revents = 0;
129 }
130
131 while (timeout >= 0) {
132 try {
133 zmq::poll(items.data(), socketList.size(), timeout);
134
135 for (unsigned int i = 0; i < socketList.size(); i++) {
136 if (static_cast<bool>(items[i].revents & ZMQ_POLLIN)) {
137 return_bitmask = return_bitmask | 1 << i;
138 }
139 }
140 return return_bitmask;
141 } catch (zmq::error_t& error) {
142 if (error.num() == EINTR) {
143 auto now = std::chrono::system_clock::now();
144 timeout -= std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count();
145 } else {
146 // cannot handle, rethrow exception
147 throw;
148 }
149 }
150 }
151 return 0;
152}
153#if defined(__GNUC__) && !defined(__clang__)
154#pragma GCC diagnostic pop
155#endif
156
157template void Belle2::ZMQClient::initialize<ZMQ_PUSH>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
158 const std::string& socketAddress, bool bind);
159template void Belle2::ZMQClient::initialize<ZMQ_PULL>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
160 const std::string& socketAddress, bool bind);
161template void Belle2::ZMQClient::initialize<ZMQ_DEALER>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
162 const std::string& socketAddress, bool bind);
163template void Belle2::ZMQClient::initialize<ZMQ_ROUTER>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
164 const std::string& socketAddress, bool bind);
165template void Belle2::ZMQClient::initialize<ZMQ_PUB>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
166 const std::string& socketAddress, bool bind);
std::unique_ptr< zmq::socket_t > m_subSocket
ZMQ sub socket.
Definition: ZMQClient.h:107
std::unique_ptr< zmq::socket_t > m_pubSocket
ZMQ Pub socket.
Definition: ZMQClient.h:105
static int pollSocketVector(const std::vector< zmq::socket_t * > &socketList, int timeout)
Internal poll function.
Definition: ZMQClient.cc:118
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
std::unique_ptr< zmq::context_t > m_context
ZMQ context.
Definition: ZMQClient.h:99
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
std::vector< zmq::socket_t * > m_pollSocketPtrList
Will use this vector for polling.
Definition: ZMQClient.h:102
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18
std::unique_ptr< zmq::socket_t > m_socket
ZMQ socket.
Definition: ZMQClient.h:109
void reset()
Reset the sockets. ATTENTION: this does not close the sockets! Use only after forks to not clean up t...
Definition: ZMQClient.cc:43
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:43
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:99
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
EMessageTypes
Type the messages can have.
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double > > &runs, double cut, std::map< ExpRun, std::pair< double, double > > &runsRemoved)
filter events to remove runs shorter than cut, it stores removed runs in runsRemoved
Definition: Splitter.cc:38
Abstract base class for different kinds of events.
STL namespace.