9 #include <framework/pcore/zmq/sockets/ZMQClient.h>
10 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
18 void ZMQClient::terminate(
bool sendGoodbye)
20 if (m_pubSocket and sendGoodbye) {
21 auto multicastMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage, getpid());
22 publish(std::move(multicastMessage));
31 m_pubSocket.release();
35 m_subSocket.release();
43 void ZMQClient::reset()
46 m_subSocket.release();
47 m_pubSocket.release();
52 template <
int AZMQType>
53 void ZMQClient::initialize(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
54 const std::string& socketAddress,
bool bind)
56 initialize(pubSocketAddress, subSocketAddress);
57 m_socket = std::make_unique<zmq::socket_t>(*m_context, AZMQType);
59 if (AZMQType == ZMQ_DEALER) {
60 const std::string uniqueID = std::to_string(getpid());
61 m_socket->set(zmq::sockopt::routing_id, uniqueID);
64 m_socket->set(zmq::sockopt::linger, 0);
66 m_socket->bind(socketAddress.c_str());
68 m_socket->connect(socketAddress.c_str());
72 std::this_thread::sleep_for(std::chrono::milliseconds(10));
74 B2DEBUG(100,
"Created socket: " << socketAddress);
76 m_pollSocketPtrList.push_back(m_socket.get());
79 void ZMQClient::initialize(
const std::string& pubSocketAddress,
const std::string& subSocketAddress)
81 m_context = std::make_unique<zmq::context_t>(1);
82 m_pubSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_PUB);
83 m_subSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_SUB);
85 m_pubSocket->connect(pubSocketAddress);
86 m_pubSocket->set(zmq::sockopt::linger, 0);
88 m_subSocket->connect(subSocketAddress);
89 m_subSocket->set(zmq::sockopt::linger, 0);
91 B2DEBUG(200,
"Having initialized multicast with sub on " << subSocketAddress <<
" and pub on " << pubSocketAddress);
93 std::this_thread::sleep_for(std::chrono::milliseconds(10));
95 m_pollSocketPtrList.clear();
96 m_pollSocketPtrList.push_back(m_subSocket.get());
101 B2ASSERT(
"Can only run this on started clients", m_subSocket);
103 char_filter[0] =
static_cast<char>(
filter);
105 m_subSocket->set(zmq::sockopt::subscribe, char_filter);
108 void ZMQClient::send(zmq::message_t& message)
const
110 B2ASSERT(
"Can only run this on started clients", m_socket);
111 m_socket->send(message, zmq::send_flags::none);
114 #if defined(__GNUC__) && !defined(__clang__)
115 #pragma GCC diagnostic push
116 #pragma GCC diagnostic ignored "-Wstack-usage="
118 int ZMQClient::pollSocketVector(
const std::vector<zmq::socket_t*>& socketList,
int timeout)
120 auto start = std::chrono::system_clock::now();
121 int return_bitmask = 0;
122 assert(socketList.size() <= 2);
123 zmq::pollitem_t items[socketList.size()];
125 for (
unsigned int i = 0; i < socketList.size(); i++) {
126 items[i].socket =
static_cast<void*
>(*socketList[i]);
127 items[i].events = ZMQ_POLLIN;
128 items[i].revents = 0;
131 while (timeout >= 0) {
133 zmq::poll(items, socketList.size(), timeout);
135 for (
unsigned int i = 0; i < socketList.size(); i++) {
136 if (
static_cast<bool>(items[i].revents & ZMQ_POLLIN)) {
137 return_bitmask = return_bitmask | 1 << i;
140 return return_bitmask;
141 }
catch (zmq::error_t& error) {
142 if (error.num() == EINTR) {
143 auto now = std::chrono::system_clock::now();
144 timeout -= std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count();
153 #if defined(__GNUC__) && !defined(__clang__)
154 #pragma GCC diagnostic pop
157 template void Belle2::ZMQClient::initialize<ZMQ_PUSH>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
158 const std::string& socketAddress,
bool bind);
159 template void Belle2::ZMQClient::initialize<ZMQ_PULL>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
160 const std::string& socketAddress,
bool bind);
161 template void Belle2::ZMQClient::initialize<ZMQ_DEALER>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
162 const std::string& socketAddress,
bool bind);
163 template void Belle2::ZMQClient::initialize<ZMQ_ROUTER>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
164 const std::string& socketAddress,
bool bind);
165 template void Belle2::ZMQClient::initialize<ZMQ_PUB>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
166 const std::string& socketAddress,
bool bind);
EMessageTypes
Type the messages can have.
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double >> &runs, double cut, std::map< ExpRun, std::pair< double, double >> &runsRemoved)
filter events to remove runs shorter than cut, it stores removed runs in runsRemoved
Abstract base class for different kinds of events.