Belle II Software light-2406-ragdoll
ZMQParent.h
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#pragma once
9
10#include <framework/logging/Logger.h>
11
12#include <zmq.hpp>
13#include <string>
14#include <memory>
15#include <thread>
16#include <chrono>
17
18namespace Belle2 {
39 class ZMQParent {
40 public:
42 ~ZMQParent();
43
45 template<class AZMQMessage>
46 static void send(std::unique_ptr<zmq::socket_t>& socket, AZMQMessage message);
47
60 static unsigned int poll(const std::vector<zmq::socket_t*>& socketList, int timeout);
61
63 static std::string createIdentity(unsigned int pid = 0);
64
66 void terminate();
67
78 template<int AZMQType>
79 std::unique_ptr<zmq::socket_t> createSocket(const std::string& socketAddress, bool bind);
80
90 template<int AZMQType>
91 std::unique_ptr<zmq::socket_t> createSocket(const std::string& socketAddress);
92
94 void reset();
95
96 private:
98 std::unique_ptr<zmq::context_t> m_context;
99
101 void initialize();
102 };
103
104 template<int AZMQType>
105 std::unique_ptr<zmq::socket_t> ZMQParent::createSocket(const std::string& socketAddress, bool bind)
106 {
107 initialize();
108
109 try {
110 // TODO: options to test additionally in the future:
111 // * ZMQ_IMMEDIATE
112 // * ZMQ_PROBE_ROUTER
113 // * ZMQ_ROUTER_MANDATORY
114 // * ZMQ_ROUTER_NOTIFY
115 auto socket = std::make_unique<zmq::socket_t>(*m_context, AZMQType);
116
117 if (AZMQType == ZMQ_DEALER) {
118 auto identity = createIdentity();
119 socket->set(zmq::sockopt::routing_id, identity);
120 }
121
122 // Linger means: close the socket immediately if requested, do not wait until all messages have been sent.
123 // This is needed because if we want to ABORT, we usually know what we are doing (and want to do it now).
124 socket->set(zmq::sockopt::linger, 0);
125 if (bind) {
126 socket->bind(socketAddress.c_str());
127 } else {
128 socket->connect(socketAddress.c_str());
129 }
130
131 // Wait a bit to give the framework time to initialize the socket
132 std::this_thread::sleep_for(std::chrono::milliseconds(100));
133
134 return socket;
135 } catch (zmq::error_t& error) {
136 B2FATAL("Creating the ZMQ socket for address " << socketAddress << " failed because of: " << error.what());
137 }
138 }
139
140 template<int AZMQType>
141 std::unique_ptr<zmq::socket_t> ZMQParent::createSocket(const std::string& socketAddress)
142 {
143 // We only check for "*" in the full address for now. This is fine, as neither a valid hostname nor a valid
144 // port number can ever contain a "*".
145 if (socketAddress.find("*") != std::string::npos) {
146 return createSocket<AZMQType>(socketAddress, true);
147 } else {
148 return createSocket<AZMQType>(socketAddress, false);
149 }
150 }
151
152 template<class AZMQMessage>
153 void ZMQParent::send(std::unique_ptr<zmq::socket_t>& socket, AZMQMessage message)
154 {
155 AZMQMessage::element_type::toSocket(std::move(message), socket);
156 }
157
159} // namespace Belle2
A helper class for creating ZMQ sockets keeping track of the ZMQ context and terminating it if needed...
Definition: ZMQParent.h:39
void initialize()
Initialize the parent by creating the context.
Definition: ZMQParent.cc:44
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
Definition: ZMQParent.cc:56
void terminate()
Terminate the parent manually (before calling its destructor). You probably do not need to do this.
Definition: ZMQParent.cc:19
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the current process's PID...
Definition: ZMQParent.cc:32
std::unique_ptr< zmq::context_t > m_context
ZMQ context.
Definition: ZMQParent.h:98
~ZMQParent()
Destroy the parent by terminating the ZMQ context.
Definition: ZMQParent.cc:14
void reset()
Expert function: Reset the parent without context closing. ATTENTION: which will not clean up properl...
Definition: ZMQParent.cc:27
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:153
std::unique_ptr< zmq::socket_t > createSocket(const std::string &socketAddress, bool bind)
Create a socket of the given type with the given address and bind or not bind it.
Definition: ZMQParent.h:105
Abstract base class for different kinds of events.
Definition: ClusterUtils.h:24