9#include <framework/pcore/zmq/sockets/ZMQClient.h> 
   10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h> 
   22    publish(std::move(multicastMessage));
 
   52template <
int AZMQType>
 
   54                           const std::string& socketAddress, 
bool bind)
 
   56  initialize(pubSocketAddress, subSocketAddress);
 
   59  if (AZMQType == ZMQ_DEALER) {
 
   60    const std::string uniqueID = std::to_string(getpid());
 
   61    m_socket->set(zmq::sockopt::routing_id, uniqueID);
 
   64  m_socket->set(zmq::sockopt::linger, 0);
 
   66    m_socket->bind(socketAddress.c_str());
 
   68    m_socket->connect(socketAddress.c_str());
 
   72  std::this_thread::sleep_for(std::chrono::milliseconds(10));
 
   74  B2DEBUG(100, 
"Created socket: " << socketAddress);
 
   81  m_context = std::make_unique<zmq::context_t>(1);
 
   91  B2DEBUG(200, 
"Having initialized multicast with sub on " << subSocketAddress << 
" and pub on " << pubSocketAddress);
 
   93  std::this_thread::sleep_for(std::chrono::milliseconds(10));
 
  101  B2ASSERT(
"Can only run this on started clients", 
m_subSocket);
 
  103  char_filter[0] = 
static_cast<char>(
filter);
 
  105  m_subSocket->set(zmq::sockopt::subscribe, char_filter);
 
  110  B2ASSERT(
"Can only run this on started clients", 
m_socket);
 
  111  m_socket->send(message, zmq::send_flags::none);
 
  114#if defined(__GNUC__) && !defined(__clang__) 
  115#pragma GCC diagnostic push 
  116#pragma GCC diagnostic ignored "-Wstack-usage=" 
  120  auto start = std::chrono::system_clock::now();
 
  121  int return_bitmask = 0;
 
  122  assert(socketList.size() <= 2);
 
  123  zmq::pollitem_t items[socketList.size()];
 
  125  for (
unsigned int i = 0; i < socketList.size(); i++) {
 
  126    items[i].socket = 
static_cast<void*
>(*socketList[i]);
 
  127    items[i].events = ZMQ_POLLIN;
 
  128    items[i].revents = 0;
 
  131  while (timeout >= 0) {
 
  133      zmq::poll(items, socketList.size(), timeout);
 
  135      for (
unsigned int i = 0; i < socketList.size(); i++) {
 
  136        if (
static_cast<bool>(items[i].revents & ZMQ_POLLIN)) {
 
  137          return_bitmask = return_bitmask | 1 << i;
 
  140      return return_bitmask;
 
  141    } 
catch (zmq::error_t& error) {
 
  142      if (error.num() == EINTR) {
 
  143        auto now = std::chrono::system_clock::now();
 
  144        timeout -= std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count();
 
  153#if defined(__GNUC__) && !defined(__clang__) 
  154#pragma GCC diagnostic pop 
  157template void Belle2::ZMQClient::initialize<ZMQ_PUSH>(
const std::string& pubSocketAddress, 
const std::string& subSocketAddress,
 
  158                                                      const std::string& socketAddress, 
bool bind);
 
  159template void Belle2::ZMQClient::initialize<ZMQ_PULL>(
const std::string& pubSocketAddress, 
const std::string& subSocketAddress,
 
  160                                                      const std::string& socketAddress, 
bool bind);
 
  161template void Belle2::ZMQClient::initialize<ZMQ_DEALER>(
const std::string& pubSocketAddress, 
const std::string& subSocketAddress,
 
  162                                                        const std::string& socketAddress, 
bool bind);
 
  163template void Belle2::ZMQClient::initialize<ZMQ_ROUTER>(
const std::string& pubSocketAddress, 
const std::string& subSocketAddress,
 
  164                                                        const std::string& socketAddress, 
bool bind);
 
  165template void Belle2::ZMQClient::initialize<ZMQ_PUB>(
const std::string& pubSocketAddress, 
const std::string& subSocketAddress,
 
  166                                                     const std::string& socketAddress, 
bool bind);
 
std::unique_ptr< zmq::socket_t > m_subSocket
ZMQ sub socket.
std::unique_ptr< zmq::socket_t > m_pubSocket
ZMQ pub socket.
static int pollSocketVector(const std::vector< zmq::socket_t * > &socketList, int timeout)
Internal poll function.
void publish(AZMQMessage message) const
Publish the message to the multicast.
std::unique_ptr< zmq::context_t > m_context
ZMQ context.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
std::vector< zmq::socket_t * > m_pollSocketPtrList
Will use this vector for polling.
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
std::unique_ptr< zmq::socket_t > m_socket
ZMQ socket.
void reset()
Reset the sockets. ATTENTION: this does not close the sockets! Use only after forks to not clean up t...
void send(AZMQMessage message) const
Send a message over the data socket.
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
EMessageTypes
Type the messages can have.
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double > > &runs, double cut, std::map< ExpRun, std::pair< double, double > > &runsRemoved)
Filter events to remove runs shorter than cut; the removed runs are stored in runsRemoved.
Abstract base class for different kinds of events.