Belle II Software  release-05-02-19
ZMQParent.h
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2018 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #pragma once
11 
12 #include <framework/logging/Logger.h>
13 
14 #include <zmq.hpp>
15 #include <string>
16 #include <memory>
17 #include <thread>
18 #include <chrono>
19 
20 namespace Belle2 {
41  class ZMQParent {
42  public:
44  ~ZMQParent();
45 
47  template<class AZMQMessage>
48  static void send(std::unique_ptr<zmq::socket_t>& socket, AZMQMessage message);
49 
62  static unsigned int poll(const std::vector<zmq::socket_t*>& socketList, int timeout);
63 
65  static std::string createIdentity(unsigned int pid = 0);
66 
68  void terminate();
69 
80  template<int AZMQType>
81  std::unique_ptr<zmq::socket_t> createSocket(const std::string& socketAddress, bool bind);
82 
92  template<int AZMQType>
93  std::unique_ptr<zmq::socket_t> createSocket(const std::string& socketAddress);
94 
96  void reset();
97 
98  private:
100  std::unique_ptr<zmq::context_t> m_context;
101 
103  void initialize();
104  };
105 
106  template<int AZMQType>
107  std::unique_ptr<zmq::socket_t> ZMQParent::createSocket(const std::string& socketAddress, bool bind)
108  {
109  initialize();
110 
111  try {
112  // TODO: options to test additionally in the future:
113  // * ZMQ_IMMEDIATE
114  // * ZMQ_PROBE_ROUTER
115  // * ZMQ_ROUTER_MANDATORY
116  // * ZMQ_ROUTER_NOTIFY
117  auto socket = std::make_unique<zmq::socket_t>(*m_context, AZMQType);
118 
119  if (AZMQType == ZMQ_DEALER) {
120  auto identity = createIdentity();
121  socket->setsockopt(ZMQ_IDENTITY, identity.c_str(), identity.length());
122  }
123 
124  // Linger means: close the socket immediately if requested, do not wait until all messages have been sent.
125  // This is needed because if we want to ABORT, we usually know what we are doing (and want to do it now).
126  socket->setsockopt(ZMQ_LINGER, 0);
127  if (bind) {
128  socket->bind(socketAddress.c_str());
129  } else {
130  socket->connect(socketAddress.c_str());
131  }
132 
133  // Wait a bit to give the framework time to initialize the socket
134  std::this_thread::sleep_for(std::chrono::milliseconds(100));
135 
136  return socket;
137  } catch (zmq::error_t& error) {
138  B2FATAL("Creating the ZMQ socket for address " << socketAddress << " failed because of: " << error.what());
139  }
140  }
141 
142  template<int AZMQType>
143  std::unique_ptr<zmq::socket_t> ZMQParent::createSocket(const std::string& socketAddress)
144  {
145  // We only check for "*" in the full address for now. This is fine, as neither a valid hostname nor a valid
146  // port number can ever contain a "*".
147  if (socketAddress.find("*") != std::string::npos) {
148  return createSocket<AZMQType>(socketAddress, true);
149  } else {
150  return createSocket<AZMQType>(socketAddress, false);
151  }
152  }
153 
154  template<class AZMQMessage>
155  void ZMQParent::send(std::unique_ptr<zmq::socket_t>& socket, AZMQMessage message)
156  {
157  AZMQMessage::element_type::toSocket(std::move(message), socket);
158  }
159 
161 } // namespace Belle2
Belle2::ZMQParent::poll
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
Definition: ZMQParent.cc:58
Belle2::ZMQParent::m_context
std::unique_ptr< zmq::context_t > m_context
ZMQ context.
Definition: ZMQParent.h:108
Belle2::ZMQParent::send
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:163
Belle2::ZMQParent::reset
void reset()
Expert function: Reset the parent without context closing. ATTENTION: which will not clean up properl...
Definition: ZMQParent.cc:29
Belle2::ZMQParent::~ZMQParent
~ZMQParent()
Destroy the parent by terminating the ZMQ context.
Definition: ZMQParent.cc:16
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQParent::terminate
void terminate()
Terminate the parent manually (before calling its destructor). You probably do not need to do this.
Definition: ZMQParent.cc:21
Belle2::ZMQParent::initialize
void initialize()
Initialize the parent by creating the context.
Definition: ZMQParent.cc:46
Belle2::ZMQParent::createSocket
std::unique_ptr< zmq::socket_t > createSocket(const std::string &socketAddress, bool bind)
Create a socket of the given type with the given address and bind or not bind it.
Definition: ZMQParent.h:115
Belle2::ZMQParent::createIdentity
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the current processes PID...
Definition: ZMQParent.cc:34