Belle II Software  release-05-01-25
NSMCommunicator.cc
1 // revisions
2 //
3 // 20191119 nakao - following currently unused functions are removed
4 // void NSMCommunicator::callContext()
5 // handling nsmlib_recv inside a user program is against the usage of NSM2
6 // const std::string NSMCommunicator::getNodeHost(const std::string& nodename)
7 // making hostname visible to application is against the philosophy of NSM2
8 // const std::string NSMCommunicator::getNodeHost()
9 // accessing NSM2 internal structure is against the philosophy of NSM2
10 
11 #include "daq/slc/nsm/NSMCommunicator.h"
12 
13 #include <daq/slc/base/TimeoutException.h>
14 #include <daq/slc/nsm/NSMCallback.h>
15 #include <daq/slc/nsm/NSMHandlerException.h>
16 #include <daq/slc/nsm/NSMNotConnectedException.h>
17 
18 extern "C" {
19 #include <nsm2/nsm2.h>
20 #include <nsm2/nsmlib2.h>
21 #include <nsm2/belle2nsm.h>
22 }
23 
24 #include <cmath>
25 #include <cstdio>
26 #include <cstring>
27 
28 #include <sys/select.h>
29 #include <errno.h>
30 
31 #include <netinet/in.h>
32 #include <arpa/inet.h>
33 #include <daq/slc/system/LockGuard.h>
34 
35 #define NSM_DEBUGMODE 1
36 
37 using namespace Belle2;
38 
39 NSMCommunicatorList NSMCommunicator::g_comm; // communicators registered by init(); iterated by select(), connected() and send()
40 Mutex NSMCommunicator::g_mutex; // locked by send() and init()
41 Mutex NSMCommunicator::g_mutex_select; // locked by select() for the whole call
42 
43 NSMCommunicator& NSMCommunicator::select(double usec)
44 {
45  fd_set fds;
46  int ret;
47  FD_ZERO(&fds);
48  int highest = 0;
49  LockGuard lockGuard(g_mutex_select);
50  for (NSMCommunicatorList::iterator it = g_comm.begin();
51  it != g_comm.end(); it++) {
52  NSMCommunicator& com(*(*it));
53  if (com.m_nsmc != NULL) {
54  FD_SET(com.m_nsmc->sock, &fds);
55  if (highest < com.m_nsmc->sock)
56  highest = com.m_nsmc->sock;
57  }
58  }
59  while (true) {
60  errno = 0;
61  if (usec >= 0) {
62  double s = 0;
63  double us = modf(usec, &s);
64  timeval t = {(long)s, (long)(us * 1000000)};
65  ret = ::select(highest + 1, &fds, NULL, NULL, &t);
66  } else {
67  ret = ::select(highest + 1, &fds, NULL, NULL, NULL);
68  }
69  if (ret != -1 || (errno != EINTR && errno != EAGAIN)) break;
70  }
71  if (ret < 0) {
72  throw (NSMHandlerException("Failed to select"));
73  }
74  for (NSMCommunicatorList::iterator it = g_comm.begin();
75  it != g_comm.end(); it++) {
76  NSMCommunicator& com(*(*it));
77  if (com.m_nsmc != NULL) {
78  if (FD_ISSET(com.m_nsmc->sock, &fds)) {
79  com.m_message.read(com.m_nsmc);
80  com.m_message.setRequestName();
81  b2nsm_context(com.m_nsmc);
82  return com;
83  }
84  }
85  }
86  throw (TimeoutException("NSMCommunicator::select was timed out"));
87 }
88 
89 NSMCommunicator& NSMCommunicator::connected(const std::string& node)
90 {
91  for (NSMCommunicatorList::iterator it = g_comm.begin();
92  it != g_comm.end(); it++) {
93  NSMCommunicator& com(*(*it));
94  if (com.isConnected(node)) return com;
95  }
96  throw (NSMNotConnectedException("No connection for " + node));
97 }
98 
99 bool NSMCommunicator::send(const NSMMessage& msg)
100 {
101  bool sent = false;
102  bool alive = false;
103 #if NSM_PACKAGE_VERSION >= 1914
104  LockGuard lockGuard(g_mutex);
105  std::string emsg;
106  for (NSMCommunicatorList::iterator it = g_comm.begin();
107  it != g_comm.end(); it++) {
108  NSMCommunicator& com(*(*it));
109  const char* node = msg.getNodeName();
110  if (com.isConnected(node)) {
111  alive = true;
112  const char* req = msg.getRequestName();
113  if (node != NULL && req != NULL &&
114  strlen(node) > 0 && strlen(req) > 0) {
115  b2nsm_context(com.m_nsmc);
116  if (b2nsm_sendany(node, req, msg.getNParams(), (int*)msg.getParams(),
117  msg.getLength(), msg.getData(), NULL) < 0) {
118  emsg = b2nsm_strerror();
119  }
120  sent = true;
121  }
122  }
123  }
124  if (alive && !sent) {
125  throw (NSMHandlerException("Failed to send request: " + emsg));
126  }
127 #else
128 #warning "Wrong version of nsm2. try source daq/slc/extra/nsm2/export.sh"
129 #endif
130  return sent;
131 }
132 
133 NSMCommunicator::NSMCommunicator(const std::string& host, int port)
134 {
135  m_id = -1;
136  m_nsmc = NULL;
137  m_callback = NULL;
138  m_host = host;
139  m_port = port;
140 }
141 
142 NSMCommunicator::NSMCommunicator(NSMcontext* nsmc)
143 {
144  m_nsmc = nsmc;
145  m_id = m_nsmc->nodeid;
146  m_callback = NULL;
147 }
148 
149 void NSMCommunicator::init(const NSMNode& node,
150  const std::string& host, int port)
151 {
152 #if NSM_PACKAGE_VERSION >= 1914
153  LockGuard lockGuard(g_mutex);
154  if (host.size() > 0) m_host = host;
155  if (port > 0) m_port = port;
156  if (node.getName().size() == 0) {
157  throw (NSMHandlerException("Error during init2 (nodename is empty)"));
158  }
159  m_nsmc = b2nsm_init2(node.getName().c_str(), 0, host.c_str(), port, port);
160  if (m_nsmc == NULL) {
161  m_id = -1;
162  throw (NSMHandlerException("Error during init2 (%s=>%s:%d): %s",
163  node.getName().c_str(), host.c_str(),
164  m_port, b2nsm_strerror()));
165  }
166  g_comm.push_back(this);
167  nsmlib_usesig(m_nsmc, 0);
168  m_id = m_nsmc->nodeid;
169  m_node = node;
170 #else
171 #warning "Wrong version of nsm2. try source daq/slc/extra/nsm2/export.sh"
172 #endif
173 }
174 
175 NSMCallback& NSMCommunicator::getCallback()
176 {
177  if (m_callback) {
178  return *m_callback;
179  }
180  throw (std::out_of_range("No callback was registered"));
181 }
182 
183 void NSMCommunicator::setCallback(NSMCallback* callback)
184 {
185  b2nsm_context(m_nsmc);
186  if (callback != NULL) {
187  m_callback = callback;
188  NSMCallback::NSMCommandList& req_v(callback->getCommandList());
189  for (NSMCallback::NSMCommandList::iterator it = req_v.begin();
190  it != req_v.end(); it++) {
191  NSMCommand& command(*it);
192  if (b2nsm_callback(command.getLabel(), NULL) < 0) {
193  m_id = -1;
194  throw (NSMHandlerException("Failed to register callback (%s)",
195  command.getLabel()));
196  }
197  }
198  }
199 }
200 
201 int NSMCommunicator::getNodeIdByName(const std::string& name)
202 {
203 #if NSM_PACKAGE_VERSION >= 1914
204  b2nsm_context(m_nsmc);
205  return b2nsm_nodeid(name.c_str());
206 #else
207  return -1;
208 #endif
209 }
210 
211 const std::string NSMCommunicator::getNodeNameById(int id)
212 {
213  const char* name = nsmlib_nodename(m_nsmc, id);
214  if (name == NULL) return "";
215  return name;
216 }
217 
218 int NSMCommunicator::getNodePidByName(const std::string& name)
219 {
220 #if NSM_PACKAGE_VERSION >= 1914
221  b2nsm_context(m_nsmc);
222  return b2nsm_nodepid(name.c_str());
223 #else
224  return -1;
225 #endif
226 }
227 
228 bool NSMCommunicator::isConnected(const std::string& node)
229 {
230  bool is_online = getNodeIdByName(node) >= 0 &&
231  getNodePidByName(node) > 0;
232  return is_online;
233 }
234 
235 NSMMessage NSMCommunicator::popQueue()
236 {
237  NSMMessage msg = m_msg_q.front();
238  m_msg_q.pop();
239  return msg;
240 }
241 
242 void NSMCommunicator::setMessage(const NSMMessage& msg)
243 {
244  m_message = msg;
245 }
Belle2::NSMNode
Definition: NSMNode.h:14
Belle2::Mutex
Definition: Mutex.h:12
Belle2::GenericLockGuard
Lock Guard for a Mutex instance.
Definition: LockGuard.h:40
Belle2::NSMCommunicator
Definition: NSMCommunicator.h:25
Belle2::NSMMessage
Definition: NSMMessage.h:29
Belle2::NSMCallback
Definition: NSMCallback.h:24
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::NSMHandlerException
Definition: NSMHandlerException.h:12
Belle2::NSMNotConnectedException
Definition: NSMNotConnectedException.h:12
Belle2::NSMCommand
Definition: NSMCommand.h:12
Belle2::TimeoutException
Definition: TimeoutException.h:12
NSMcontext_struct
Definition: nsmlib2.h:66