11 #include <framework/pcore/zmq/sockets/ZMQClient.h>
12 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
// Shut down the client.
// @param sendGoodbye if true (and a publish socket exists), a c_terminateMessage
//        carrying this process' pid is published before the sockets are dropped.
// NOTE(review): several original lines of this function are not visible in this
// excerpt, so the comments below describe only the statements that are shown.
20 void ZMQClient::terminate(
bool sendGoodbye)
// The goodbye can only be sent if there is a publish socket to send it on.
22 if (m_pubSocket and sendGoodbye) {
23 auto multicastMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage, getpid());
24 publish(std::move(multicastMessage));
// NOTE(review): initialize() assigns these members from std::make_unique, so they
// are presumably std::unique_ptr. unique_ptr::release() gives up ownership WITHOUT
// destroying the pointee; confirm this is intended rather than reset().
33 m_pubSocket.release();
37 m_subSocket.release();
// Drop the sub and pub socket pointers by calling release() on them.
// No close() call is visible in this excerpt.
// NOTE(review): if these are std::unique_ptr, release() leaves the socket objects
// alive and un-owned. Confirm that not destroying them is deliberate here.
45 void ZMQClient::reset()
48 m_subSocket.release();
49 m_pubSocket.release();
// Set up the multicast sockets and one additional socket of type AZMQType.
// @tparam AZMQType         ZeroMQ socket type used to construct m_socket.
// @param pubSocketAddress  forwarded to the two-argument initialize().
// @param subSocketAddress  forwarded to the two-argument initialize().
// @param socketAddress     address m_socket is bound or connected to.
// @param bind              presumably selects bind() vs. connect(); the selecting
//                          condition is not visible in this excerpt. TODO confirm.
54 template <
int AZMQType>
55 void ZMQClient::initialize(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
56 const std::string& socketAddress,
bool bind)
// Creates m_context and the pub/sub sockets first; m_context is needed below.
58 initialize(pubSocketAddress, subSocketAddress);
59 m_socket = std::make_unique<zmq::socket_t>(*m_context, AZMQType);
// DEALER sockets get an explicit identity: the decimal string of this process' pid.
61 if (AZMQType == ZMQ_DEALER) {
62 const std::string uniqueID = std::to_string(getpid());
63 m_socket->setsockopt(ZMQ_IDENTITY, uniqueID.c_str(), uniqueID.length());
// Linger period of 0 (per the ZeroMQ documentation, pending messages are
// discarded when the socket is closed instead of blocking).
66 m_socket->setsockopt(ZMQ_LINGER, 0);
68 m_socket->bind(socketAddress.c_str());
70 m_socket->connect(socketAddress.c_str());
// Fixed 10 ms pause after bind/connect; presumably gives the connection time to
// be established. TODO confirm.
74 std::this_thread::sleep_for(std::chrono::milliseconds(10));
76 B2DEBUG(100,
"Created socket: " << socketAddress);
// Appended after the sub socket that the two-argument initialize() put into the list.
78 m_pollSocketPtrList.push_back(m_socket.get());
// Create the ZeroMQ context and the multicast PUB/SUB socket pair.
// Both sockets connect() (they do not bind) to the given addresses.
// @param pubSocketAddress  address the PUB socket connects to.
// @param subSocketAddress  address the SUB socket connects to.
81 void ZMQClient::initialize(
const std::string& pubSocketAddress,
const std::string& subSocketAddress)
// Context constructed with argument 1 (the I/O thread count in cppzmq's
// context_t(int) constructor).
83 m_context = std::make_unique<zmq::context_t>(1);
84 m_pubSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_PUB);
85 m_subSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_SUB);
// Linger is set to 0 on both sockets after connecting.
87 m_pubSocket->connect(pubSocketAddress);
88 m_pubSocket->setsockopt(ZMQ_LINGER, 0);
90 m_subSocket->connect(subSocketAddress);
91 m_subSocket->setsockopt(ZMQ_LINGER, 0);
93 B2DEBUG(200,
"Having initialized multicast with sub on " << subSocketAddress <<
" and pub on " << pubSocketAddress);
// Fixed 10 ms pause; presumably lets the connections settle. TODO confirm.
95 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Start the poll list afresh so that the sub socket is always its first entry.
97 m_pollSocketPtrList.clear();
98 m_pollSocketPtrList.push_back(m_subSocket.get());
// NOTE(review): the signature line of this member function is not visible in this
// excerpt; `filter` is presumably its parameter. TODO confirm.
// Requires that the sub socket exists.
103 B2ASSERT(
"Can only run this on started clients", m_subSocket);
// The filter value is converted to a single char ...
104 const auto char_filter =
static_cast<char>(
filter);
// ... and registered as a one-byte ZMQ_SUBSCRIBE option on the sub socket.
105 m_subSocket->setsockopt(ZMQ_SUBSCRIBE, &char_filter, 1);
// Send `message` over m_socket.
// @param message  the ZeroMQ message handed to m_socket->send().
// Asserts that m_socket is set before sending.
108 void ZMQClient::send(zmq::message_t& message)
const
110 B2ASSERT(
"Can only run this on started clients", m_socket);
111 m_socket->send(message);
// On GCC (not clang) the -Wstack-usage= diagnostic is silenced around this
// function, presumably because of the variable-length array used below.
114 #if defined(__GNUC__) && !defined(__clang__)
115 #pragma GCC diagnostic push
116 #pragma GCC diagnostic ignored "-Wstack-usage="
// Poll the given sockets for incoming data (ZMQ_POLLIN).
// @param socketList  sockets to poll; at most 2 entries (checked by assert).
// @param timeout     passed to zmq::poll and reduced by elapsed milliseconds
//                    when a poll is interrupted (EINTR).
// @return bitmask in which bit i is set if socketList[i] reported ZMQ_POLLIN.
// NOTE(review): some original lines (e.g. the `try {` opening, any else branch and
// the final return) are not visible in this excerpt.
118 int ZMQClient::pollSocketVector(
const std::vector<zmq::socket_t*>& socketList,
int timeout)
// NOTE(review): system_clock is wall-clock time and may jump; a steady clock
// would be monotonic for measuring elapsed time. Consider confirming.
120 auto start = std::chrono::system_clock::now();
121 int return_bitmask = 0;
122 assert(socketList.size() <= 2);
// Array sized at run time: a compiler extension, not standard C++.
123 zmq::pollitem_t items[socketList.size()];
// Fill one poll item per socket, listening for incoming messages only.
125 for (
unsigned int i = 0; i < socketList.size(); i++) {
126 items[i].socket =
static_cast<void*
>(*socketList[i]);
127 items[i].events = ZMQ_POLLIN;
128 items[i].revents = 0;
// A negative timeout skips polling entirely.
131 while (timeout >= 0) {
133 zmq::poll(items, socketList.size(), timeout);
135 for (
unsigned int i = 0; i < socketList.size(); i++) {
136 if (
static_cast<bool>(items[i].revents & ZMQ_POLLIN)) {
// `<<` binds tighter than `|`, so this sets bit i of the mask.
137 return_bitmask = return_bitmask | 1 << i;
140 return return_bitmask;
141 }
catch (zmq::error_t& error) {
// An interrupted poll is retried with the timeout reduced by the elapsed time.
// NOTE(review): `start` is taken once at function entry and is not updated in the
// visible code, so after several interruptions the already-subtracted time would
// be subtracted again. Confirm against the full source.
142 if (error.num() == EINTR) {
143 auto now = std::chrono::system_clock::now();
144 timeout -= std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count();
153 #if defined(__GNUC__) && !defined(__clang__)
154 #pragma GCC diagnostic pop
// Explicit instantiations of the templated initialize() for the socket types
// PUSH, PULL, DEALER, ROUTER and PUB. The template is defined in this source
// file, so these presumably make the specializations available to other
// translation units.
157 template void Belle2::ZMQClient::initialize<ZMQ_PUSH>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
158 const std::string& socketAddress,
bool bind);
159 template void Belle2::ZMQClient::initialize<ZMQ_PULL>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
160 const std::string& socketAddress,
bool bind);
161 template void Belle2::ZMQClient::initialize<ZMQ_DEALER>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
162 const std::string& socketAddress,
bool bind);
163 template void Belle2::ZMQClient::initialize<ZMQ_ROUTER>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
164 const std::string& socketAddress,
bool bind);
165 template void Belle2::ZMQClient::initialize<ZMQ_PUB>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
166 const std::string& socketAddress,
bool bind);