Belle II Software  release-08-01-10
ERecoMasterCallback.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include "daq/expreco/ERecoMasterCallback.h"
9 
10 #include "daq/expreco/ERecoRunControlCallback.h"
11 
12 #include "daq/rfarm/manager/RFCommand.h"
13 #include "daq/rfarm/manager/RfNodeInfo.h"
14 #include "daq/rfarm/manager/RfUnitInfo.h"
15 
16 #include <daq/slc/nsm/NSMCommunicator.h>
17 #include <daq/slc/runcontrol/RCCommand.h>
18 #include <daq/slc/runcontrol/RCHandlerException.h>
19 #include <daq/slc/system/Time.h>
20 
21 #include <daq/slc/base/StringUtil.h>
22 #include <daq/slc/base/TimeoutException.h>
23 
24 using namespace Belle2;
25 
26 ERecoMasterCallback::ERecoMasterCallback(ConfigFile& config) : m_callback(NULL)
27 {
28  reg(RFCommand::CONFIGURE);
29  reg(RFCommand::UNCONFIGURE);
30  reg(RFCommand::START);
31  reg(RFCommand::STOP);
32  reg(RFCommand::RESUME);
33  reg(RFCommand::PAUSE);
34  reg(RFCommand::RESTART);
35  reg(RFCommand::STATUS);
36 
37  m_rcnode = NSMNode(config.get("ereco.rcnode"));
38  LogFile::debug("ERecoMaster started: Bridge Node = %s", m_rcnode.getName().c_str());
39 }
40 
41 void ERecoMasterCallback::initialize(const DBObject& obj)
42 {
43  configure(obj);
44 }
45 
46 void ERecoMasterCallback::configure(const DBObject& obj)
47 {
48  m_nodes = NSMNodeList();
49  m_dataname = StringList();
50  const std::string format = obj("system").getText("nsmdata");
51  // 0. Register DqmServer
52  if (obj.hasObject("dqmserver")) {
53  m_nodes.push_back(NSMNode(obj("dqmserver").getText("nodename")));
54  }
55  // 1. Register distributor
56  std::string nodename = obj("distributor").getText("nodename");
57  m_nodes.push_back(NSMNode(nodename));
58  addData(nodename, format);
59  // 2. Register event processors
60  const DBObject& processor(obj("processor"));
61  int maxnodes = processor.getInt("nnodes");
62  int idbase = processor.getInt("idbase");
63  std::string hostbase = processor.getText("nodebase");
64  std::string badlist = processor.getText("badlist");
65  char idname[3];
66  for (int i = 0; i < maxnodes; i++) {
67  sprintf(idname, "%2.2d", idbase + i);
68  if (badlist.size() == 0 || StringUtil::find(badlist, idname)) {
69  nodename = StringUtil::form("evp_%s%2.2d", hostbase.c_str(), idbase + i);
70  m_nodes.push_back(NSMNode(nodename));
71  addData(nodename, format);
72  }
73  }
74  // 3. Register Event Sampler
75  nodename = obj("eventsampler").getText("nodename");
76  m_nodes.push_back(NSMNode(nodename));
77 
78  add(new NSMVHandlerInt("nnodes", true, false, m_nodes.size()));
79  int i = 0;
80  std::string rcconf = obj.getName();
81  if (StringUtil::find(rcconf, "@")) {
82  rcconf = StringUtil::split(rcconf, '@')[1];
83  }
84  for (NSMNodeList::iterator it = m_nodes.begin();
85  it != m_nodes.end(); ++it) {
86  NSMNode& node(*it);
87  RCState s(node.getState());
88  std::string vname = StringUtil::form("node[%d]", i++);
89  add(new NSMVHandlerText(vname + ".rcstate", true, false, s.getLabel()));
90  add(new NSMVHandlerText(vname + ".rcconfig", true, false, rcconf));
91  add(new NSMVHandlerText(vname + ".name", true, false,
92  StringUtil::tolower(node.getName())));
93  vname = StringUtil::tolower(node.getName());
94  add(new NSMVHandlerText(vname + ".rcstate", true, false, s.getLabel()));
95  add(new NSMVHandlerText(vname + ".rcconfig", true, false, rcconf));
96  }
97 }
98 
99 void ERecoMasterCallback::setState(NSMNode& node, const RCState& state)
100 {
101  node.setState(state);
102  int i = 0;
103  for (NSMNodeList::iterator it = m_nodes.begin();
104  it != m_nodes.end(); ++it) {
105  NSMNode& rnode(*it);
106  if (StringUtil::toupper(rnode.getName()) == StringUtil::toupper(node.getName())) {
107  std::string vname = StringUtil::form("node[%d].rcstate", i);
108  set(vname, state.getLabel());
109  break;
110  }
111  i++;
112  }
113  std::string vname = StringUtil::tolower(node.getName()) + ".rcstate";
114  set(vname, state.getLabel());
115 }
116 
117 void ERecoMasterCallback::monitor()
118 {
119  if (!m_callback->getData().isAvailable()) return;
120  RfUnitInfo* unitinfo = (RfUnitInfo*)m_callback->getData().get();
121  unitinfo->nnodes = m_dataname.size();
122  unitinfo->updatetime = Time().get();
123  unitinfo->rcstate = getNode().getState().getId();
124  int i = 0;
125  for (StringList::iterator it = m_dataname.begin();
126  it != m_dataname.end(); ++it) {
127  NSMData& data(getData(*it));
128  try {
129  if (data.isAvailable()) {
130  RfUnitInfo::RfNodeInfo* nodeinfo_o = &unitinfo->nodeinfo[i];
131  RfNodeInfo* nodeinfo_i = (RfNodeInfo*)data.get();
132  memcpy(nodeinfo_o, nodeinfo_i, sizeof(RfNodeInfo));
133  }
134  } catch (const NSMHandlerException& e) {}
135  i++;
136  }
137  static unsigned long long count = 0;
138  for (NSMNodeList::iterator it = m_nodes.begin(); it != m_nodes.end(); ++it) {
139  NSMNode& node(*it);
140  if (count % 60 == 59 || node.getState() == RCState::UNKNOWN) {
141  if (NSMCommunicator::send(NSMMessage(node, RFCommand::STATUS))) {
142  } else {
143  setState(node, RCState::UNKNOWN);
144  }
145  }
146  }
147  count++;
148 }
149 
150 void ERecoMasterCallback::ok(const char* nodename, const char* data)
151 {
152  for (NSMNodeList::iterator it = m_nodes.begin();
153  it != m_nodes.end(); ++it) {
154  NSMNode& node(*it);
155  if (node.getName() == nodename) {
156  if (strcmp(data, "Configured") == 0) {
157  setState(node, RCState::READY_S);
158  } else if (strcmp(data, "Unconfigured") == 0) {
159  setState(node, RCState::NOTREADY_S);
160  } else if (strcmp(data, "Stopped") == 0) {
161  setState(node, RCState::READY_S);
162  } else if (strcmp(data, "Running") == 0) {
163  setState(node, RCState::RUNNING_S);
164  } else if (strcmp(data, "Idle") == 0) {
165  setState(node, RCState::PAUSED_S);
166  }
167  LogFile::debug("OK << %s (%s)", nodename, data);
168  return;
169  }
170  }
171  LogFile::warning("OK from unknown node %s (%s)", nodename, data);
172 }
173 
174 void ERecoMasterCallback::error(const char* nodename, const char* data)
175 {
176  for (NSMNodeList::iterator it = m_nodes.begin();
177  it != m_nodes.end(); ++it) {
178  NSMNode& node(*it);
179  if (node.getName() == nodename) {
180  LogFile::error("ERROR << %s (%s)", nodename, data);
181  setState(node, RCState::NOTREADY_S);
182  return;
183  }
184  }
185  LogFile::warning("Error from unknwon node %s : %s)", nodename, data);
186 }
187 
188 void ERecoMasterCallback::load(const DBObject& /*db*/, const std::string& /*runtype*/)
189 {
190  for (NSMNodeList::iterator it = m_nodes.begin(); it != m_nodes.end(); ++it) {
191  NSMNode& node(*it);
192  printf("Loading : %s\n", node.getName().c_str());
193  if (node.getName().find("EVP") == std::string::npos) {
194  while (true) {
195  bool configured = true;
196  for (NSMNodeList::iterator it2 = m_nodes.begin();
197  it2 != it; ++it2) {
198  if (it2->getState() != RCState::READY_S)
199  configured = false;
200  }
201  if (configured) break;
202  try {
203  perform(NSMCommunicator::select(30));
204  } catch (const TimeoutException& e) {}
205  }
206  }
207  if (node.getState() != RCState::READY_S) {
208  printf("ERecoMasterCallback::load : loading %s\n", (node.getName()).c_str());
209  if (NSMCommunicator::send(NSMMessage(node, RCCommand::LOAD))) {
210  setState(node, RCState::LOADING_TS);
211  LogFile::debug("%s >> LOADING", node.getName().c_str());
212  } else {
213  throw (RCHandlerException("Failed to configure %s", node.getName().c_str()));
214  }
215  } else {
216  LogFile::debug("%s is READY", node.getName().c_str());
217  }
218  }
219  while (true) {
220  bool configured = true;
221  for (NSMNodeList::iterator it = m_nodes.begin(); it != m_nodes.end(); ++it) {
222  if (it->getState() != RCState::READY_S) configured = false;
223  }
224  if (configured) break;
225  try {
226  perform(NSMCommunicator::select(30));
227  } catch (const TimeoutException& e) {}
228  }
229  if (m_callback != NULL)
230  m_callback->setState(RCState::READY_S);
231  RCCallback::setState(RCState::READY_S);
232 }
233 
234 void ERecoMasterCallback::abort()
235 {
236  for (NSMNodeList::reverse_iterator it = m_nodes.rbegin();
237  it != m_nodes.rend(); ++it) {
238  NSMNode& node(*it);
239  if (node.getName().find("EVP") == std::string::npos) {
240  while (true) {
241  bool unconfigured = true;
242  for (NSMNodeList::reverse_iterator it2 = m_nodes.rbegin();
243  it2 != it; ++it2) {
244  if (it2->getState() != RCState::NOTREADY_S)
245  unconfigured = false;
246  }
247  if (unconfigured) break;
248  try {
249  perform(NSMCommunicator::select(30));
250  } catch (const TimeoutException& e) {}
251  }
252  }
253  if (node.getState() != RCState::NOTREADY_S) {
254  if (NSMCommunicator::send(NSMMessage(node, RCCommand::ABORT))) {
255  setState(node, RCState::ABORTING_RS);
256  LogFile::debug("%s >> ABORTING", node.getName().c_str());
257  } else {
258  throw (RCHandlerException("Failed to unconfigure %s", node.getName().c_str()));
259  }
260  } else {
261  LogFile::debug("%s is NOTREADY", node.getName().c_str());
262  }
263  }
264  while (true) {
265  bool unconfigured = true;
266  for (NSMNodeList::reverse_iterator it = m_nodes.rbegin(); it != m_nodes.rend(); ++it) {
267  if (it->getState() != RCState::NOTREADY_S) unconfigured = false;
268  }
269  if (unconfigured) break;
270  try {
271  perform(NSMCommunicator::select(30));
272  } catch (const TimeoutException& e) {}
273  }
274  if (m_callback != NULL)
275  m_callback->setState(RCState::NOTREADY_S);
276  RCCallback::setState(RCState::NOTREADY_S);
277 }
278 
279 void ERecoMasterCallback::start(int expno, int runno)
280 {
281  int pars[] = {expno, runno};
282  std::string script;
283  if (getNode().getName().find("_") != std::string::npos) {
284  get("hlt.script", script);
285  // script = "run_processor.py";
286  } else {
287  LogFile::debug("asking database : node = %s", m_rcnode.getName().c_str());
288  get(m_rcnode, "RUNCONTROL@hlt.script", script);
289  // get(NSMNode("BERECO1"), "RUNCONTROL@hlt.script", script);
290  // get(m_rcnode, "hlt.script", script, 30);
291  script = "run_" + script + ".py";
292  printf("ERECO script = %s\n", script.c_str());
293  }
294  for (NSMNodeList::iterator it = m_nodes.begin(); it != m_nodes.end(); ++it) {
295  NSMNode& node(*it);
296  if (node.getName().find("EVP") == std::string::npos) {
297  while (true) {
298  bool configured = true;
299  for (NSMNodeList::iterator it2 = m_nodes.begin();
300  it2 != it; ++it2) {
301  if (it2->getState() != RCState::RUNNING_S)
302  configured = false;
303  }
304  if (configured) break;
305  try {
306  perform(NSMCommunicator::select(30));
307  } catch (const TimeoutException& e) {}
308  }
309  }
310  if (node.getState() != RCState::RUNNING_S) {
311  if (NSMCommunicator::send(NSMMessage(node, RCCommand::START, 2, pars, script))) {
312  setState(node, RCState::STARTING_TS);
313  LogFile::debug("%s >> STARTING (script=%s)", node.getName().c_str(), script.c_str());
314  } else {
315  throw (RCHandlerException("Failed to configure %s", node.getName().c_str()));
316  }
317  } else {
318  LogFile::debug("%s is RUNNING", node.getName().c_str());
319  }
320  }
321  while (true) {
322  bool configured = true;
323  for (NSMNodeList::iterator it = m_nodes.begin(); it != m_nodes.end(); ++it) {
324  if (it->getState() != RCState::RUNNING_S) configured = false;
325  }
326  if (configured) break;
327  try {
328  perform(NSMCommunicator::select(30));
329  } catch (const TimeoutException& e) {}
330  }
331  char cmdline[] = "hsendcommand DQMRC:CLEAR erctl 9991";
332  system(cmdline);
333  printf("ERecoMaster : DQM TMemFile cleared\n");
334 
335  if (m_callback != NULL)
336  m_callback->setState(RCState::RUNNING_S);
337  RCCallback::setState(RCState::RUNNING_S);
338 }
339 
340 void ERecoMasterCallback::stop()
341 {
342  for (NSMNodeList::reverse_iterator it = m_nodes.rbegin();
343  it != m_nodes.rend(); ++it) {
344  NSMNode& node(*it);
345  if (node.getName().find("EVP") == std::string::npos) {
346  while (true) {
347  bool unconfigured = true;
348  for (NSMNodeList::reverse_iterator it2 = m_nodes.rbegin();
349  it2 != it; ++it2) {
350  if (it2->getState() != RCState::READY_S)
351  unconfigured = false;
352  }
353  if (unconfigured) break;
354  try {
355  perform(NSMCommunicator::select(30));
356  } catch (const TimeoutException& e) {}
357  }
358  }
359  if (node.getState() != RCState::READY_S) {
360  if (NSMCommunicator::send(NSMMessage(node, RCCommand::STOP))) {
361  setState(node, RCState::STOPPING_TS);
362  LogFile::debug("%s >> STOPPING", node.getName().c_str());
363  } else {
364  throw (RCHandlerException("Failed to stop %s", node.getName().c_str()));
365  }
366  } else {
367  LogFile::debug("%s is READY", node.getName().c_str());
368  }
369  }
370  while (true) {
371  bool unconfigured = true;
372  for (NSMNodeList::reverse_iterator it = m_nodes.rbegin(); it != m_nodes.rend(); ++it) {
373  if (it->getState() != RCState::READY_S) unconfigured = false;
374  }
375  if (unconfigured) break;
376  try {
377  perform(NSMCommunicator::select(30));
378  } catch (const TimeoutException& e) {}
379  }
380  if (m_callback != NULL)
381  m_callback->setState(RCState::READY_S);
382  RCCallback::setState(RCState::READY_S);
383 }
384 
385 bool ERecoMasterCallback::pause()
386 {
387  return true;
388 }
389 
390 bool ERecoMasterCallback::resume(int /*subno*/)
391 {
392  return true;
393 }
394 
395 void ERecoMasterCallback::recover(const DBObject& obj, const std::string& runtype)
396 {
397  abort();
398  load(obj, runtype);
399 }
400 
401 void ERecoMasterCallback::addData(const std::string& dataname, const std::string& format)
402 {
403  openData(dataname, format);
404  int i = (int)m_dataname.size();
405  std::string vname = StringUtil::form("data[%d].name", i);
406  add(new NSMVHandlerText(vname, true, false, StringUtil::tolower(dataname)));
407  vname = StringUtil::form("data[%d].format", i);
408  add(new NSMVHandlerText(vname, true, false, format));
409  m_dataname.push_back(dataname);
410 }
411 
412 bool ERecoMasterCallback::perform(NSMCommunicator& com)
413 {
414  const NSMMessage msg = com.getMessage();
415  if (RCCallback::perform(com)) {
416  return true;
417  }
418  const RFCommand cmd = msg.getRequestName();
419  try {
420  if (cmd == RFCommand::CONFIGURE) {
421  load(getDBObject(), "");
422  return true;
423  } else if (cmd == RFCommand::UNCONFIGURE) {
424  abort();
425  return true;
426  } else if (cmd == RFCommand::START) {
427  return true;
428  } else if (cmd == RFCommand::STOP) {
429  return true;
430  } else if (cmd == RFCommand::RESTART) {
431  return true;
432  } else if (cmd == RFCommand::PAUSE) {
433  return true;
434  } else if (cmd == RFCommand::RESUME) {
435  return true;
436  } else if (cmd == RFCommand::STATUS) {
437  return true;
438  }
439  } catch (const RCHandlerException& e) {
440  std::string emsg = StringUtil::form("%s. aborting", e.what());
441  LogFile::error(emsg);
442  reply(NSMMessage(NSMCommand::ERROR, emsg));
443  try {
444  RCCallback::setState(RCState::ABORTING_RS);
445  abort();
446  RCCallback::setState(RCState::NOTREADY_S);
447  } catch (const RCHandlerException& ex) {
448  std::string exmsg = StringUtil::form("Failed to abort : %s", ex.what());
449  LogFile::fatal(exmsg);
450  }
451  } catch (const std::exception& e) {
452  LogFile::fatal("Unknown exception: %s. terminating process", e.what());
453  }
454  return false;
455 }
TString getName(const TObject *obj)
human-readable name (e.g.
Definition: ObjectInfo.cc:45
Abstract base class for different kinds of events.