9#include <framework/pcore/zmq/sockets/ZMQClient.h>
10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
22 publish(std::move(multicastMessage));
52template <
int AZMQType>
54 const std::string& socketAddress,
bool bind)
56 initialize(pubSocketAddress, subSocketAddress);
59 if (AZMQType == ZMQ_DEALER) {
60 const std::string uniqueID = std::to_string(getpid());
61 m_socket->set(zmq::sockopt::routing_id, uniqueID);
64 m_socket->set(zmq::sockopt::linger, 0);
66 m_socket->bind(socketAddress.c_str());
68 m_socket->connect(socketAddress.c_str());
72 std::this_thread::sleep_for(std::chrono::milliseconds(10));
74 B2DEBUG(100,
"Created socket: " << socketAddress);
81 m_context = std::make_unique<zmq::context_t>(1);
91 B2DEBUG(200,
"Having initialized multicast with sub on " << subSocketAddress <<
" and pub on " << pubSocketAddress);
93 std::this_thread::sleep_for(std::chrono::milliseconds(10));
101 B2ASSERT(
"Can only run this on started clients",
m_subSocket);
103 char_filter[0] =
static_cast<char>(
filter);
105 m_subSocket->set(zmq::sockopt::subscribe, char_filter);
110 B2ASSERT(
"Can only run this on started clients",
m_socket);
111 m_socket->send(message, zmq::send_flags::none);
114#if defined(__GNUC__) && !defined(__clang__)
115#pragma GCC diagnostic push
116#pragma GCC diagnostic ignored "-Wstack-usage="
120 auto start = std::chrono::system_clock::now();
121 int return_bitmask = 0;
122 assert(socketList.size() <= 2);
123 std::vector<zmq::pollitem_t> items(socketList.size());
125 for (
unsigned int i = 0; i < socketList.size(); i++) {
126 items[i].socket =
static_cast<void*
>(*socketList[i]);
127 items[i].events = ZMQ_POLLIN;
128 items[i].revents = 0;
131 while (timeout >= 0) {
133 zmq::poll(items.data(), socketList.size(), timeout);
135 for (
unsigned int i = 0; i < socketList.size(); i++) {
136 if (
static_cast<bool>(items[i].revents & ZMQ_POLLIN)) {
137 return_bitmask = return_bitmask | 1 << i;
140 return return_bitmask;
141 }
catch (zmq::error_t& error) {
142 if (error.num() == EINTR) {
143 auto now = std::chrono::system_clock::now();
144 timeout -= std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count();
153#if defined(__GNUC__) && !defined(__clang__)
154#pragma GCC diagnostic pop
157template void Belle2::ZMQClient::initialize<ZMQ_PUSH>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
158 const std::string& socketAddress,
bool bind);
159template void Belle2::ZMQClient::initialize<ZMQ_PULL>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
160 const std::string& socketAddress,
bool bind);
161template void Belle2::ZMQClient::initialize<ZMQ_DEALER>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
162 const std::string& socketAddress,
bool bind);
163template void Belle2::ZMQClient::initialize<ZMQ_ROUTER>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
164 const std::string& socketAddress,
bool bind);
165template void Belle2::ZMQClient::initialize<ZMQ_PUB>(
const std::string& pubSocketAddress,
const std::string& subSocketAddress,
166 const std::string& socketAddress,
bool bind);
std::unique_ptr< zmq::socket_t > m_subSocket
ZMQ sub socket.
std::unique_ptr< zmq::socket_t > m_pubSocket
ZMQ Pub socket.
static int pollSocketVector(const std::vector< zmq::socket_t * > &socketList, int timeout)
Internal poll function.
void publish(AZMQMessage message) const
Publish the message to the multicast.
std::unique_ptr< zmq::context_t > m_context
ZMQ context.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
std::vector< zmq::socket_t * > m_pollSocketPtrList
Will use this vector for polling.
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
std::unique_ptr< zmq::socket_t > m_socket
ZMQ socket.
void reset()
Reset the sockets. ATTENTION: this does not close the sockets! Use only after forks to not clean up t...
void send(AZMQMessage message) const
Send a message over the data socket.
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
EMessageTypes
Type the messages can have.
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double > > &runs, double cut, std::map< ExpRun, std::pair< double, double > > &runsRemoved)
filter events to remove runs shorter than cut, it stores removed runs in runsRemoved
Abstract base class for different kinds of events.