Belle II Software  release-08-01-10
ZMQParent.h
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #pragma once
9 
#include <framework/logging/Logger.h>

#include <zmq.hpp>
#include <chrono>
#include <memory>
#include <string>
#include <thread>
#include <vector>
17 
18 namespace Belle2 {
39  class ZMQParent {
40  public:
42  ~ZMQParent();
43 
45  template<class AZMQMessage>
46  static void send(std::unique_ptr<zmq::socket_t>& socket, AZMQMessage message);
47 
60  static unsigned int poll(const std::vector<zmq::socket_t*>& socketList, int timeout);
61 
63  static std::string createIdentity(unsigned int pid = 0);
64 
66  void terminate();
67 
78  template<int AZMQType>
79  std::unique_ptr<zmq::socket_t> createSocket(const std::string& socketAddress, bool bind);
80 
90  template<int AZMQType>
91  std::unique_ptr<zmq::socket_t> createSocket(const std::string& socketAddress);
92 
94  void reset();
95 
96  private:
98  std::unique_ptr<zmq::context_t> m_context;
99 
101  void initialize();
102  };
103 
104  template<int AZMQType>
105  std::unique_ptr<zmq::socket_t> ZMQParent::createSocket(const std::string& socketAddress, bool bind)
106  {
107  initialize();
108 
109  try {
110  // TODO: options to test additionally in the future:
111  // * ZMQ_IMMEDIATE
112  // * ZMQ_PROBE_ROUTER
113  // * ZMQ_ROUTER_MANDATORY
114  // * ZMQ_ROUTER_NOTIFY
115  auto socket = std::make_unique<zmq::socket_t>(*m_context, AZMQType);
116 
117  if (AZMQType == ZMQ_DEALER) {
118  auto identity = createIdentity();
119  socket->set(zmq::sockopt::routing_id, identity);
120  }
121 
122  // Linger means: close the socket immediately if requested, do not wait until all messages have been sent.
123  // This is needed because if we want to ABORT, we usually know what we are doing (and want to do it now).
124  socket->set(zmq::sockopt::linger, 0);
125  if (bind) {
126  socket->bind(socketAddress.c_str());
127  } else {
128  socket->connect(socketAddress.c_str());
129  }
130 
131  // Wait a bit to give the framework time to initialize the socket
132  std::this_thread::sleep_for(std::chrono::milliseconds(100));
133 
134  return socket;
135  } catch (zmq::error_t& error) {
136  B2FATAL("Creating the ZMQ socket for address " << socketAddress << " failed because of: " << error.what());
137  }
138  }
139 
140  template<int AZMQType>
141  std::unique_ptr<zmq::socket_t> ZMQParent::createSocket(const std::string& socketAddress)
142  {
143  // We only check for "*" in the full address for now. This is fine, as neither a valid hostname nor a valid
144  // port number can ever contain a "*".
145  if (socketAddress.find("*") != std::string::npos) {
146  return createSocket<AZMQType>(socketAddress, true);
147  } else {
148  return createSocket<AZMQType>(socketAddress, false);
149  }
150  }
151 
152  template<class AZMQMessage>
153  void ZMQParent::send(std::unique_ptr<zmq::socket_t>& socket, AZMQMessage message)
154  {
155  AZMQMessage::element_type::toSocket(std::move(message), socket);
156  }
157 
159 } // namespace Belle2
A helper class for creating ZMQ sockets keeping track of the ZMQ context and terminating it if needed...
Definition: ZMQParent.h:39
void initialize()
Initialize the parent by creating the context.
Definition: ZMQParent.cc:44
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
Definition: ZMQParent.cc:56
void terminate()
Terminate the parent manually (before calling its destructor). You probably do not need to do this.
Definition: ZMQParent.cc:19
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the current process's PID...
Definition: ZMQParent.cc:32
std::unique_ptr< zmq::context_t > m_context
ZMQ context.
Definition: ZMQParent.h:98
~ZMQParent()
Destroy the parent by terminating the ZMQ context.
Definition: ZMQParent.cc:14
void reset()
Expert function: Reset the parent without closing the context. ATTENTION: this will not clean up properly...
Definition: ZMQParent.cc:27
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:153
std::unique_ptr< zmq::socket_t > createSocket(const std::string &socketAddress, bool bind)
Create a socket of the given type with the given address and bind or not bind it.
Definition: ZMQParent.h:105
Abstract base class for different kinds of events.