9 #include <framework/pcore/zmq/sockets/ZMQClient.h> 
   10 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h> 
   // Shut this client down.
   //
   // sendGoodbye: when true and a publish socket exists, a c_terminateMessage carrying
   //              this process' PID (getpid()) is published before the sockets are dropped.
   //
   // NOTE(review): this listing omits several original lines of the function (the embedded
   // line numbers jump 22 -> 31 -> 35), so the complete teardown sequence is not visible here.
   18 void ZMQClient::terminate(
bool sendGoodbye)
 
        // Only announce the termination if there is a socket to publish on.
   20   if (m_pubSocket and sendGoodbye) {
 
   21     auto multicastMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage, getpid());
 
          // The message is moved into publish(); multicastMessage must not be used afterwards.
   22     publish(std::move(multicastMessage));
 
          // NOTE(review): assuming m_pubSocket / m_subSocket are std::unique_ptr, release() only
          // gives up ownership and does not destroy the socket object - confirm that the
          // omitted lines close the sockets beforehand.
   31     m_pubSocket.release();
 
   35     m_subSocket.release();
 
   // Drop the handles to the subscribe and publish sockets via release().
   // No close() call is visible in this function.
   //
   // NOTE(review): assuming the members are std::unique_ptr, release() abandons the socket
   // objects without destroying them; presumably this is deliberate - confirm with callers.
   // The listing omits original lines around these statements, so further members may be
   // handled here as well.
   43 void ZMQClient::reset()
 
   46   m_subSocket.release();
 
   47   m_pubSocket.release();
 
   // Initialize the multicast pub/sub sockets and, in addition, one socket of the ZMQ type
   // given by the template argument AZMQType.
   //
   // pubSocketAddress / subSocketAddress: forwarded unchanged to the two-argument initialize().
   // socketAddress: address the additional socket is bound or connected to.
   // bind: presumably selects between bind() and connect() below - the branching lines are
   //       missing from this listing (embedded lines 65 and 67), TODO confirm.
   52 template <
int AZMQType>
 
   53 void ZMQClient::initialize(
const std::string& pubSocketAddress, 
const std::string& subSocketAddress,
 
   54                            const std::string& socketAddress, 
bool bind)
 
        // Sets up m_context, m_pubSocket and m_subSocket; m_context is needed for the socket below.
   56   initialize(pubSocketAddress, subSocketAddress);
 
   57   m_socket = std::make_unique<zmq::socket_t>(*m_context, AZMQType);
 
        // DEALER sockets get the PID (as decimal string) as their routing id.
   59   if (AZMQType == ZMQ_DEALER) {
 
   60     const std::string uniqueID = std::to_string(getpid());
 
   61     m_socket->set(zmq::sockopt::routing_id, uniqueID);
 
        // Linger period 0 on the new socket.
   64   m_socket->set(zmq::sockopt::linger, 0);
 
   66     m_socket->bind(socketAddress.c_str());
 
   68     m_socket->connect(socketAddress.c_str());
 
        // Fixed 10 ms pause; the reason is not stated in the visible code
        // (presumably to let the connection settle - TODO confirm).
   72   std::this_thread::sleep_for(std::chrono::milliseconds(10));
 
   74   B2DEBUG(100, 
"Created socket: " << socketAddress);
 
        // Appended after the sub socket, which the two-argument initialize() put first in the list.
   76   m_pollSocketPtrList.push_back(m_socket.get());
 
   // Create a fresh ZMQ context plus one PUB and one SUB socket and connect them.
   //
   // pubSocketAddress: address the PUB socket connects to.
   // subSocketAddress: address the SUB socket connects to.
   //
   // Afterwards m_pollSocketPtrList contains exactly one entry: the SUB socket.
   79 void ZMQClient::initialize(
const std::string& pubSocketAddress, 
const std::string& subSocketAddress)
 
        // The context is constructed with the argument 1 (the io thread count in cppzmq).
   81   m_context = std::make_unique<zmq::context_t>(1);
 
   82   m_pubSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_PUB);
 
   83   m_subSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_SUB);
 
        // Both sockets connect (neither binds) and get a linger period of 0.
   85   m_pubSocket->connect(pubSocketAddress);
 
   86   m_pubSocket->set(zmq::sockopt::linger, 0);
 
   88   m_subSocket->connect(subSocketAddress);
 
   89   m_subSocket->set(zmq::sockopt::linger, 0);
 
   91   B2DEBUG(200, 
"Having initialized multicast with sub on " << subSocketAddress << 
" and pub on " << pubSocketAddress);
 
        // Fixed 10 ms pause; the reason is not stated in the visible code
        // (presumably to let the connections settle - TODO confirm).
   93   std::this_thread::sleep_for(std::chrono::milliseconds(10));
 
        // Reset the poll list so that it only holds the SUB socket.
   95   m_pollSocketPtrList.clear();
 
   96   m_pollSocketPtrList.push_back(m_subSocket.get());
 
  // NOTE(review): the signature of this function and the declaration of char_filter are
  // missing from this listing (embedded lines 99-100 and 102). Only the visible statements
  // are described: `filter` is converted to a char and installed as subscription filter
  // on the SUB socket.

  // Requires that the SUB socket has been created.
  101   B2ASSERT(
"Can only run this on started clients", m_subSocket);
 
  // The first element of char_filter is set to the filter value cast to char.
  103   char_filter[0] = 
static_cast<char>(filter);
 
  105   m_subSocket->set(zmq::sockopt::subscribe, char_filter);
 
  // Send `message` over m_socket without any send flags (zmq::send_flags::none).
  //
  // message: taken by non-const reference and handed to zmq::socket_t::send().
  //
  // The result of the send() call is discarded.
  108 void ZMQClient::send(zmq::message_t& message)
 const 
  // Requires that m_socket has been created.
  110   B2ASSERT(
"Can only run this on started clients", m_socket);
 
  111   m_socket->send(message, zmq::send_flags::none);
 
  // On GCC (not clang) the -Wstack-usage= diagnostic is disabled around this function and
  // restored afterwards; presumably this is needed because of the runtime-sized array `items`.
  114 #if defined(__GNUC__) && !defined(__clang__) 
  115 #pragma GCC diagnostic push 
  116 #pragma GCC diagnostic ignored "-Wstack-usage=" 
  // Poll the given sockets for incoming data (ZMQ_POLLIN).
  //
  // socketList: sockets to poll; at most two entries (enforced by the assert below).
  // timeout:    handed to zmq::poll and reduced by a millisecond count on EINTR,
  //             i.e. it is a duration in milliseconds.
  //
  // Returns a bitmask in which bit i is set when socketList[i] has data to read.
  //
  // NOTE(review): the listing omits the `try` line, the rest of the catch block and the
  // final return (embedded lines 132, 145-152), so the behaviour for other errors and for
  // an exhausted or negative timeout is not visible here.
  118 int ZMQClient::pollSocketVector(
const std::vector<zmq::socket_t*>& socketList, 
int timeout)
 
  120   auto start = std::chrono::system_clock::now();
 
  121   int return_bitmask = 0;
 
  122   assert(socketList.size() <= 2);
 
        // Array with a runtime size: a compiler extension, not standard C++.
  123   zmq::pollitem_t items[socketList.size()];
 
        // One poll item per socket, asking only for readability.
  125   for (
unsigned int i = 0; i < socketList.size(); i++) {
 
  126     items[i].socket = 
static_cast<void*
>(*socketList[i]);
 
  127     items[i].events = ZMQ_POLLIN;
 
  128     items[i].revents = 0;
 
        // The loop is only re-entered after an interrupted poll (see the EINTR branch below).
  131   while (timeout >= 0) {
 
  133       zmq::poll(items, socketList.size(), timeout);
 
  135       for (
unsigned int i = 0; i < socketList.size(); i++) {
 
  136         if (
static_cast<bool>(items[i].revents & ZMQ_POLLIN)) {
 
                // `<<` binds tighter than `|`: this ORs in the single bit (1 << i).
  137           return_bitmask = return_bitmask | 1 << i;
 
  140       return return_bitmask;
 
  141     } 
catch (zmq::error_t& error) {
 
            // Interrupted system call: shrink the remaining timeout by the elapsed time.
  142       if (error.num() == EINTR) {
 
  143         auto now = std::chrono::system_clock::now();
 
              // NOTE(review): `start` is not refreshed in the visible lines, so after a second
              // EINTR the total time since entry is subtracted again and the timeout shrinks
              // faster than wall-clock time - confirm against the omitted lines.
  144         timeout -= std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count();
 
  153 #if defined(__GNUC__) && !defined(__clang__) 
  154 #pragma GCC diagnostic pop 
  // Explicit instantiations of the four-argument initialize<AZMQType>() template for the
  // socket types ZMQ_PUSH, ZMQ_PULL, ZMQ_DEALER, ZMQ_ROUTER and ZMQ_PUB. The template body is
  // defined in this source file, so these instantiations make the listed variants available
  // to other translation units.
  157 template void Belle2::ZMQClient::initialize<ZMQ_PUSH>(
const std::string& pubSocketAddress, 
const std::string& subSocketAddress,
 
  158                                                       const std::string& socketAddress, 
bool bind);
 
  159 template void Belle2::ZMQClient::initialize<ZMQ_PULL>(
const std::string& pubSocketAddress, 
const std::string& subSocketAddress,
 
  160                                                       const std::string& socketAddress, 
bool bind);
 
  161 template void Belle2::ZMQClient::initialize<ZMQ_DEALER>(
const std::string& pubSocketAddress, 
const std::string& subSocketAddress,
 
  162                                                         const std::string& socketAddress, 
bool bind);
 
  163 template void Belle2::ZMQClient::initialize<ZMQ_ROUTER>(
const std::string& pubSocketAddress, 
const std::string& subSocketAddress,
 
  164                                                         const std::string& socketAddress, 
bool bind);
 
  165 template void Belle2::ZMQClient::initialize<ZMQ_PUB>(
const std::string& pubSocketAddress, 
const std::string& subSocketAddress,
 
  166                                                      const std::string& socketAddress, 
bool bind);
 
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.