Belle II Software  release-05-02-19
ZMQClient.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2018 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 
11 #include <framework/pcore/zmq/sockets/ZMQClient.h>
12 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
13 
14 #include <thread>
15 #include <chrono>
16 
17 using namespace std;
18 using namespace Belle2;
19 
20 void ZMQClient::terminate(bool sendGoodbye)
21 {
22  if (m_pubSocket and sendGoodbye) {
23  auto multicastMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage, getpid());
24  publish(std::move(multicastMessage));
25  }
26 
27  if (m_socket) {
28  m_socket->close();
29  m_socket.release();
30  }
31  if (m_pubSocket) {
32  m_pubSocket->close();
33  m_pubSocket.release();
34  }
35  if (m_subSocket) {
36  m_subSocket->close();
37  m_subSocket.release();
38  }
39  if (m_context) {
40  m_context->close();
41  m_context.release();
42  }
43 }
44 
45 void ZMQClient::reset()
46 {
47  m_context.release();
48  m_subSocket.release();
49  m_pubSocket.release();
50  m_socket.release();
51 }
52 
53 
54 template <int AZMQType>
55 void ZMQClient::initialize(const std::string& pubSocketAddress, const std::string& subSocketAddress,
56  const std::string& socketAddress, bool bind)
57 {
58  initialize(pubSocketAddress, subSocketAddress);
59  m_socket = std::make_unique<zmq::socket_t>(*m_context, AZMQType);
60 
61  if (AZMQType == ZMQ_DEALER) {
62  const std::string uniqueID = std::to_string(getpid());
63  m_socket->setsockopt(ZMQ_IDENTITY, uniqueID.c_str(), uniqueID.length());
64  }
65 
66  m_socket->setsockopt(ZMQ_LINGER, 0);
67  if (bind) {
68  m_socket->bind(socketAddress.c_str());
69  } else {
70  m_socket->connect(socketAddress.c_str());
71  }
72 
73  // Give the sockets some time to start
74  std::this_thread::sleep_for(std::chrono::milliseconds(10));
75 
76  B2DEBUG(100, "Created socket: " << socketAddress);
77 
78  m_pollSocketPtrList.push_back(m_socket.get());
79 }
80 
81 void ZMQClient::initialize(const std::string& pubSocketAddress, const std::string& subSocketAddress)
82 {
83  m_context = std::make_unique<zmq::context_t>(1);
84  m_pubSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_PUB);
85  m_subSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_SUB);
86 
87  m_pubSocket->connect(pubSocketAddress);
88  m_pubSocket->setsockopt(ZMQ_LINGER, 0);
89 
90  m_subSocket->connect(subSocketAddress);
91  m_subSocket->setsockopt(ZMQ_LINGER, 0);
92 
93  B2DEBUG(200, "Having initialized multicast with sub on " << subSocketAddress << " and pub on " << pubSocketAddress);
94 
95  std::this_thread::sleep_for(std::chrono::milliseconds(10));
96 
97  m_pollSocketPtrList.clear();
98  m_pollSocketPtrList.push_back(m_subSocket.get());
99 }
100 
101 void ZMQClient::subscribe(EMessageTypes filter)
102 {
103  B2ASSERT("Can only run this on started clients", m_subSocket);
104  const auto char_filter = static_cast<char>(filter);
105  m_subSocket->setsockopt(ZMQ_SUBSCRIBE, &char_filter, 1);
106 }
107 
108 void ZMQClient::send(zmq::message_t& message) const
109 {
110  B2ASSERT("Can only run this on started clients", m_socket);
111  m_socket->send(message);
112 }
113 
114 #if defined(__GNUC__) && !defined(__clang__)
115 #pragma GCC diagnostic push
116 #pragma GCC diagnostic ignored "-Wstack-usage="
117 #endif
118 int ZMQClient::pollSocketVector(const std::vector<zmq::socket_t*>& socketList, int timeout)
119 {
120  auto start = std::chrono::system_clock::now();
121  int return_bitmask = 0;
122  assert(socketList.size() <= 2);
123  zmq::pollitem_t items[socketList.size()];
124 
125  for (unsigned int i = 0; i < socketList.size(); i++) {
126  items[i].socket = static_cast<void*>(*socketList[i]);
127  items[i].events = ZMQ_POLLIN;
128  items[i].revents = 0;
129  }
130 
131  while (timeout >= 0) {
132  try {
133  zmq::poll(items, socketList.size(), timeout);
134 
135  for (unsigned int i = 0; i < socketList.size(); i++) {
136  if (static_cast<bool>(items[i].revents & ZMQ_POLLIN)) {
137  return_bitmask = return_bitmask | 1 << i;
138  }
139  }
140  return return_bitmask;
141  } catch (zmq::error_t& error) {
142  if (error.num() == EINTR) {
143  auto now = std::chrono::system_clock::now();
144  timeout -= std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count();
145  } else {
146  // cannot handle, rethrow exception
147  throw;
148  }
149  }
150  }
151  return 0;
152 }
153 #if defined(__GNUC__) && !defined(__clang__)
154 #pragma GCC diagnostic pop
155 #endif
156 
157 template void Belle2::ZMQClient::initialize<ZMQ_PUSH>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
158  const std::string& socketAddress, bool bind);
159 template void Belle2::ZMQClient::initialize<ZMQ_PULL>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
160  const std::string& socketAddress, bool bind);
161 template void Belle2::ZMQClient::initialize<ZMQ_DEALER>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
162  const std::string& socketAddress, bool bind);
163 template void Belle2::ZMQClient::initialize<ZMQ_ROUTER>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
164  const std::string& socketAddress, bool bind);
165 template void Belle2::ZMQClient::initialize<ZMQ_PUB>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
166  const std::string& socketAddress, bool bind);
Belle2::EMessageTypes
EMessageTypes
Type the messages can have.
Definition: ZMQDefinitions.h:26
Belle2::filter
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double >> &runs, double cut, std::map< ExpRun, std::pair< double, double >> &runsRemoved)
filter events to remove runs shorter than cut, it stores removed runs in runsRemoved
Definition: Splitter.cc:43
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19