Belle II Software  release-08-01-10
NSMCommunicator.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 // revisions
9 //
10 // 20191119 - following currently unused functions are removed
11 // void NSMCommunicator::callContext()
12 // handling nsmlib_recv inside a user program is against the usage of NSM2
13 // const std::string NSMCommunicator::getNodeHost(const std::string& nodename)
14 // making hostname visible to application is against the philosophy of NSM2
15 // const std::string NSMCommunicator::getNodeHost()
16 // accessing NSM2 internal structure is against the philosophy of NSM2
17 
18 #include "daq/slc/nsm/NSMCommunicator.h"
19 
20 #include <daq/slc/base/TimeoutException.h>
21 #include <daq/slc/nsm/NSMCallback.h>
22 #include <daq/slc/nsm/NSMHandlerException.h>
23 #include <daq/slc/nsm/NSMNotConnectedException.h>
24 
25 extern "C" {
26 #include <nsm2/nsm2.h>
27 #include <nsm2/nsmlib2.h>
28 #include <nsm2/belle2nsm.h>
29 }
30 
31 #include <cmath>
32 #include <cstdio>
33 #include <cstring>
34 
35 #include <sys/select.h>
36 #include <errno.h>
37 
38 #include <netinet/in.h>
39 #include <arpa/inet.h>
40 #include <daq/slc/system/LockGuard.h>
41 
42 #define NSM_DEBUGMODE 1
43 
44 using namespace Belle2;
45 
// Static list of communicators: init() appends `this` to it, and
// select(), connected() and send() iterate over it.
46 NSMCommunicatorList NSMCommunicator::g_comm;
// Mutex held (through a LockGuard) by send() and init().
47 Mutex NSMCommunicator::g_mutex;
// Mutex held (through a LockGuard) by select() from before it scans
// g_comm until it returns or throws.
48 Mutex NSMCommunicator::g_mutex_select;
49 
50 NSMCommunicator& NSMCommunicator::select(double usec)
51 {
52  fd_set fds;
53  int ret;
54  FD_ZERO(&fds);
55  int highest = 0;
56  LockGuard lockGuard(g_mutex_select);
57  for (NSMCommunicatorList::iterator it = g_comm.begin();
58  it != g_comm.end(); ++it) {
59  NSMCommunicator& com(*(*it));
60  if (com.m_nsmc != NULL) {
61  FD_SET(com.m_nsmc->sock, &fds);
62  if (highest < com.m_nsmc->sock)
63  highest = com.m_nsmc->sock;
64  }
65  }
66  while (true) {
67  errno = 0;
68  if (usec >= 0) {
69  double s = 0;
70  double us = modf(usec, &s);
71  timeval t = {(long)s, (long)(us * 1000000)};
72  ret = ::select(highest + 1, &fds, NULL, NULL, &t);
73  } else {
74  ret = ::select(highest + 1, &fds, NULL, NULL, NULL);
75  }
76  if (ret != -1 || (errno != EINTR && errno != EAGAIN)) break;
77  }
78  if (ret < 0) {
79  throw (NSMHandlerException("Failed to select"));
80  }
81  for (NSMCommunicatorList::iterator it = g_comm.begin();
82  it != g_comm.end(); ++it) {
83  NSMCommunicator& com(*(*it));
84  if (com.m_nsmc != NULL) {
85  if (FD_ISSET(com.m_nsmc->sock, &fds)) {
86  com.m_message.read(com.m_nsmc);
87  com.m_message.setRequestName();
88  b2nsm_context(com.m_nsmc);
89  return com;
90  }
91  }
92  }
93  throw (TimeoutException("NSMCommunicator::select was timed out"));
94 }
95 
96 NSMCommunicator& NSMCommunicator::connected(const std::string& node)
97 {
98  for (NSMCommunicatorList::iterator it = g_comm.begin();
99  it != g_comm.end(); ++it) {
100  NSMCommunicator& com(*(*it));
101  if (com.isConnected(node)) return com;
102  }
103  throw (NSMNotConnectedException("No connection for " + node));
104 }
105 
106 bool NSMCommunicator::send(const NSMMessage& msg)
107 {
108  bool sent = false;
109  bool alive = false;
110 #if NSM_PACKAGE_VERSION >= 1914
111  LockGuard lockGuard(g_mutex);
112  std::string emsg;
113  for (NSMCommunicatorList::iterator it = g_comm.begin();
114  it != g_comm.end(); ++it) {
115  NSMCommunicator& com(*(*it));
116  const char* node = msg.getNodeName();
117  if (com.isConnected(node)) {
118  alive = true;
119  const char* req = msg.getRequestName();
120  if (node != NULL && req != NULL &&
121  strlen(node) > 0 && strlen(req) > 0) {
122  b2nsm_context(com.m_nsmc);
123  if (b2nsm_sendany(node, req, msg.getNParams(), (int*)msg.getParams(),
124  msg.getLength(), msg.getData(), NULL) < 0) {
125  emsg = b2nsm_strerror();
126  }
127  sent = true;
128  }
129  }
130  }
131  if (alive && !sent) {
132  throw (NSMHandlerException("Failed to send request: " + emsg));
133  }
134 #else
135 #warning "Wrong version of nsm2. try source daq/slc/extra/nsm2/export.sh"
136 #endif
137  return sent;
138 }
139 
140 NSMCommunicator::NSMCommunicator(const std::string& host, int port)
141 {
142  m_id = -1;
143  m_nsmc = NULL;
144  m_callback = NULL;
145  m_host = host;
146  m_port = port;
147 }
148 
149 NSMCommunicator::NSMCommunicator(NSMcontext* nsmc)
150 {
151  m_nsmc = nsmc;
152  m_id = m_nsmc->nodeid;
153  m_callback = NULL;
154 }
155 
156 void NSMCommunicator::init(const NSMNode& node,
157  const std::string& host, int port)
158 {
159 #if NSM_PACKAGE_VERSION >= 1914
160  LockGuard lockGuard(g_mutex);
161  if (host.size() > 0) m_host = host;
162  if (port > 0) m_port = port;
163  if (node.getName().size() == 0) {
164  throw (NSMHandlerException("Error during init2 (nodename is empty)"));
165  }
166  m_nsmc = b2nsm_init2(node.getName().c_str(), 0, host.c_str(), port, port);
167  if (m_nsmc == NULL) {
168  m_id = -1;
169  throw (NSMHandlerException("Error during init2 (%s=>%s:%d): %s",
170  node.getName().c_str(), host.c_str(),
171  m_port, b2nsm_strerror()));
172  }
173  g_comm.push_back(this);
174  nsmlib_usesig(m_nsmc, 0);
175  m_id = m_nsmc->nodeid;
176  m_node = node;
177 #else
178 #warning "Wrong version of nsm2. try source daq/slc/extra/nsm2/export.sh"
179 #endif
180 }
181 
182 NSMCallback& NSMCommunicator::getCallback()
183 {
184  if (m_callback) {
185  return *m_callback;
186  }
187  throw (std::out_of_range("No callback was registered"));
188 }
189 
190 void NSMCommunicator::setCallback(NSMCallback* callback)
191 {
192  b2nsm_context(m_nsmc);
193  if (callback != NULL) {
194  m_callback = callback;
195  NSMCallback::NSMCommandList& req_v(callback->getCommandList());
196  for (NSMCallback::NSMCommandList::iterator it = req_v.begin();
197  it != req_v.end(); ++it) {
198  NSMCommand& command(*it);
199  if (b2nsm_callback(command.getLabel(), NULL) < 0) {
200  m_id = -1;
201  throw (NSMHandlerException("Failed to register callback (%s)",
202  command.getLabel()));
203  }
204  }
205  }
206 }
207 
208 int NSMCommunicator::getNodeIdByName(const std::string& name)
209 {
210 #if NSM_PACKAGE_VERSION >= 1914
211  b2nsm_context(m_nsmc);
212  return b2nsm_nodeid(name.c_str());
213 #else
214  return -1;
215 #endif
216 }
217 
218 const std::string NSMCommunicator::getNodeNameById(int id)
219 {
220  const char* name = nsmlib_nodename(m_nsmc, id);
221  if (name == NULL) return "";
222  return name;
223 }
224 
225 int NSMCommunicator::getNodePidByName(const std::string& name)
226 {
227 #if NSM_PACKAGE_VERSION >= 1914
228  b2nsm_context(m_nsmc);
229  return b2nsm_nodepid(name.c_str());
230 #else
231  return -1;
232 #endif
233 }
234 
235 bool NSMCommunicator::isConnected(const std::string& node)
236 {
237  bool is_online = getNodeIdByName(node) >= 0 &&
238  getNodePidByName(node) > 0;
239  return is_online;
240 }
241 
242 NSMMessage NSMCommunicator::popQueue()
243 {
244  NSMMessage msg = m_msg_q.front();
245  m_msg_q.pop();
246  return msg;
247 }
248 
249 void NSMCommunicator::setMessage(const NSMMessage& msg)
250 {
251  m_message = msg;
252 }
Lock Guard for a Mutex instance.
Definition: LockGuard.h:47
Abstract base class for different kinds of events.