Belle II Software  release-08-01-10
ZMQClient.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/zmq/sockets/ZMQClient.h>
10 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11 
12 #include <thread>
13 #include <chrono>
14 
15 using namespace std;
16 using namespace Belle2;
17 
18 void ZMQClient::terminate(bool sendGoodbye)
19 {
20  if (m_pubSocket and sendGoodbye) {
21  auto multicastMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage, getpid());
22  publish(std::move(multicastMessage));
23  }
24 
25  if (m_socket) {
26  m_socket->close();
27  m_socket.release();
28  }
29  if (m_pubSocket) {
30  m_pubSocket->close();
31  m_pubSocket.release();
32  }
33  if (m_subSocket) {
34  m_subSocket->close();
35  m_subSocket.release();
36  }
37  if (m_context) {
38  m_context->close();
39  m_context.release();
40  }
41 }
42 
43 void ZMQClient::reset()
44 {
45  m_context.release();
46  m_subSocket.release();
47  m_pubSocket.release();
48  m_socket.release();
49 }
50 
51 
52 template <int AZMQType>
53 void ZMQClient::initialize(const std::string& pubSocketAddress, const std::string& subSocketAddress,
54  const std::string& socketAddress, bool bind)
55 {
56  initialize(pubSocketAddress, subSocketAddress);
57  m_socket = std::make_unique<zmq::socket_t>(*m_context, AZMQType);
58 
59  if (AZMQType == ZMQ_DEALER) {
60  const std::string uniqueID = std::to_string(getpid());
61  m_socket->set(zmq::sockopt::routing_id, uniqueID);
62  }
63 
64  m_socket->set(zmq::sockopt::linger, 0);
65  if (bind) {
66  m_socket->bind(socketAddress.c_str());
67  } else {
68  m_socket->connect(socketAddress.c_str());
69  }
70 
71  // Give the sockets some time to start
72  std::this_thread::sleep_for(std::chrono::milliseconds(10));
73 
74  B2DEBUG(100, "Created socket: " << socketAddress);
75 
76  m_pollSocketPtrList.push_back(m_socket.get());
77 }
78 
79 void ZMQClient::initialize(const std::string& pubSocketAddress, const std::string& subSocketAddress)
80 {
81  m_context = std::make_unique<zmq::context_t>(1);
82  m_pubSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_PUB);
83  m_subSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_SUB);
84 
85  m_pubSocket->connect(pubSocketAddress);
86  m_pubSocket->set(zmq::sockopt::linger, 0);
87 
88  m_subSocket->connect(subSocketAddress);
89  m_subSocket->set(zmq::sockopt::linger, 0);
90 
91  B2DEBUG(200, "Having initialized multicast with sub on " << subSocketAddress << " and pub on " << pubSocketAddress);
92 
93  std::this_thread::sleep_for(std::chrono::milliseconds(10));
94 
95  m_pollSocketPtrList.clear();
96  m_pollSocketPtrList.push_back(m_subSocket.get());
97 }
98 
99 void ZMQClient::subscribe(EMessageTypes filter)
100 {
101  B2ASSERT("Can only run this on started clients", m_subSocket);
102  char char_filter[2];
103  char_filter[0] = static_cast<char>(filter);
104  char_filter[1] = 0;
105  m_subSocket->set(zmq::sockopt::subscribe, char_filter);
106 }
107 
108 void ZMQClient::send(zmq::message_t& message) const
109 {
110  B2ASSERT("Can only run this on started clients", m_socket);
111  m_socket->send(message, zmq::send_flags::none);
112 }
113 
114 #if defined(__GNUC__) && !defined(__clang__)
115 #pragma GCC diagnostic push
116 #pragma GCC diagnostic ignored "-Wstack-usage="
117 #endif
118 int ZMQClient::pollSocketVector(const std::vector<zmq::socket_t*>& socketList, int timeout)
119 {
120  auto start = std::chrono::system_clock::now();
121  int return_bitmask = 0;
122  assert(socketList.size() <= 2);
123  zmq::pollitem_t items[socketList.size()];
124 
125  for (unsigned int i = 0; i < socketList.size(); i++) {
126  items[i].socket = static_cast<void*>(*socketList[i]);
127  items[i].events = ZMQ_POLLIN;
128  items[i].revents = 0;
129  }
130 
131  while (timeout >= 0) {
132  try {
133  zmq::poll(items, socketList.size(), timeout);
134 
135  for (unsigned int i = 0; i < socketList.size(); i++) {
136  if (static_cast<bool>(items[i].revents & ZMQ_POLLIN)) {
137  return_bitmask = return_bitmask | 1 << i;
138  }
139  }
140  return return_bitmask;
141  } catch (zmq::error_t& error) {
142  if (error.num() == EINTR) {
143  auto now = std::chrono::system_clock::now();
144  timeout -= std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count();
145  } else {
146  // cannot handle, rethrow exception
147  throw;
148  }
149  }
150  }
151  return 0;
152 }
153 #if defined(__GNUC__) && !defined(__clang__)
154 #pragma GCC diagnostic pop
155 #endif
156 
157 template void Belle2::ZMQClient::initialize<ZMQ_PUSH>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
158  const std::string& socketAddress, bool bind);
159 template void Belle2::ZMQClient::initialize<ZMQ_PULL>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
160  const std::string& socketAddress, bool bind);
161 template void Belle2::ZMQClient::initialize<ZMQ_DEALER>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
162  const std::string& socketAddress, bool bind);
163 template void Belle2::ZMQClient::initialize<ZMQ_ROUTER>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
164  const std::string& socketAddress, bool bind);
165 template void Belle2::ZMQClient::initialize<ZMQ_PUB>(const std::string& pubSocketAddress, const std::string& subSocketAddress,
166  const std::string& socketAddress, bool bind);
EMessageTypes
Type the messages can have.
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double >> &runs, double cut, std::map< ExpRun, std::pair< double, double >> &runsRemoved)
filter events to remove runs shorter than cut, it stores removed runs in runsRemoved
Definition: Splitter.cc:38
Abstract base class for different kinds of events.