Belle II Software  release-08-01-10
RCCallback.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include "daq/slc/runcontrol/RCCallback.h"
9 
10 #include <daq/slc/database/DBHandlerException.h>
11 #include <daq/slc/database/DBInterface.h>
12 #include <daq/slc/database/DBObjectLoader.h>
13 
14 #include <daq/slc/runcontrol/RCCommand.h>
15 #include <daq/slc/runcontrol/RCHandlerException.h>
16 #include <daq/slc/runcontrol/RCHandlerFatalException.h>
17 
18 #include <daq/slc/system/LogFile.h>
19 #include <daq/slc/system/TCPSocket.h>
20 #include <daq/slc/system/TCPSocketReader.h>
21 #include <daq/slc/system/TCPSocketWriter.h>
22 
23 #include <daq/slc/nsm/NSMCommunicator.h>
24 
25 #include <sstream>
26 
27 #include <cstdlib>
28 
29 double tabort = 0;
30 
31 namespace Belle2 {
37  public:
38  RCConfigHandler(RCCallback& callback,
39  const std::string& name, const std::string& val)
40  : NSMVHandlerText(name, true, true, val), m_callback(callback) {}
41  bool handleGetText(std::string& val) override
42  {
43  const DBObject& obj(m_callback.getDBObject());
44  val = obj.getName();
45  return true;
46  }
47  bool handleSetText(const std::string& val) override
48  {
49  RCState state(m_callback.getNode().getState());
50  RCState tstate(RCCommand::CONFIGURE.nextTState());
51  m_callback.setState(tstate);
52  try {
53  m_callback.abort();
54  m_callback.dbload(val.size(), val.c_str());
55  } catch (const IOException& e) {
56  throw (RCHandlerException(e.what()));
57  }
58  const DBObject& obj(m_callback.getDBObject());
59  obj.getName();
60  m_callback.configure(obj);
61  m_callback.setState(state);
62  return true;
63  }
64  private:
65  RCCallback& m_callback;
66  };
67 
69 }
70 
71 using namespace Belle2;
72 
73 RCCallback::RCCallback(int timeout)
74  : NSMCallback(timeout)
75 {
76  reg(RCCommand::CONFIGURE);
77  reg(RCCommand::BOOT);
78  reg(RCCommand::LOAD);
79  reg(RCCommand::START);
80  reg(RCCommand::STOP);
81  reg(RCCommand::RECOVER);
82  reg(RCCommand::RESUME);
83  reg(RCCommand::PAUSE);
84  reg(RCCommand::ABORT);
85  reg(RCCommand::STATUS);
86  reg(NSMCommand::FATAL);
87  m_auto = true;
88  m_db = NULL;
89  m_showall = true;
90  m_expno = m_runno = 0;
91 }
92 
93 void RCCallback::init(NSMCommunicator&)
94 {
95  LogFile::debug("init");
96  dbload(0, 0);
97  try {
98  initialize(m_obj);
99  } catch (const RCHandlerException& e) {
100  LogFile::fatal("Failed to initialize. %s. terminating process (84)", e.what());
101  term();
102  exit(1);
103  }
104  LogFile::debug("init done");
105  setState(RCState::NOTREADY_S);
106  tabort = Date().get();
107 }
108 
109 bool RCCallback::perform(NSMCommunicator& com)
110 {
111  NSMMessage msg(com.getMessage());
112  const RCCommand cmd = msg.getRequestName();
113  const RCState state_org(getNode().getState());
114  RCState state(getNode().getState());
115  if (NSMCallback::perform(com)) return true;
116  if (cmd.isAvailable(state) == NSMCommand::DISABLED) {
117  return false;
118  }
119  addNode(NSMNode(msg.getNodeName()));
120  try {
121  set("rcrequest", msg.getRequestName());
122  } catch (const std::exception& e) {
123  LogFile::error(e.what());
124  }
125  RCState tstate(cmd.nextTState());
126  try {
127  if (tstate != Enum::UNKNOWN) {
128  log(LogFile::DEBUG, "RC request %s from %s", msg.getRequestName(), msg.getNodeName());
129  setState(tstate);
130  std::string nodename = getNode().getName();
131  bool ismaster = nodename == "RUNCONTROL" ||
132  (StringUtil::find(nodename, "RC_") && !StringUtil::find(nodename, "HLT"));
133  if (cmd == RCCommand::CONFIGURE) {
134  m_runcontrol.setName(msg.getNodeName());
135  configure_raw(msg.getLength(), msg.getData());
136  } else if (cmd == RCCommand::BOOT) {
137  m_runcontrol.setName(msg.getNodeName());
138  get(m_obj);
139  std::string opt = msg.getLength() > 0 ? msg.getData() : "";
140  boot(opt, m_obj);
141  } else if (cmd == RCCommand::LOAD) {
142  m_expno = 0;
143  m_runno = 0;
144  m_runcontrol.setName(msg.getNodeName());
145  std::string runtype = (msg.getLength() > 0 ? msg.getData() : "");
146  if (runtype.size() == 0) {
147  get("runtype", runtype);
148  } else if (runtype.size() > 0) {
149  set("runtype", runtype);
150  }
151  get(m_obj);
152  load(m_obj, runtype);
153  } else if (cmd == RCCommand::START) {
154  m_runcontrol.setName(msg.getNodeName());
155  m_expno = (msg.getNParams() > 0) ? msg.getParam(0) : 0;
156  m_runno = (msg.getNParams() > 1) ? msg.getParam(1) : 0;
157  if (ismaster) {
158  log(LogFile::NOTICE, "Run start by %s (exp=%05d, run=%06d)",
159  msg.getNodeName(), m_runno, m_expno);
160  }
161  start(m_expno, m_runno);
162  } else if (cmd == RCCommand::STOP) {
163  if (ismaster) {
164  log(LogFile::NOTICE, "Run stop by %s (exp=%05d, run=%06d)",
165  msg.getNodeName(), m_runno, m_expno);
166  }
167  stop();
168  } else if (cmd == RCCommand::RESUME) {
169  if (ismaster) {
170  log(LogFile::NOTICE, "Run resume by %s (exp=%05d, run=%06d)",
171  msg.getNodeName(), m_runno, m_expno);
172  }
173  if (!resume(msg.getParam(0))) {
174  setState(RCState::NOTREADY_S);
175  return true;
176  }
177  } else if (cmd == RCCommand::PAUSE) {
178  if (ismaster) {
179  log(LogFile::NOTICE, "Run pause by %s (exp=%05d, run=%06d)",
180  msg.getNodeName(), m_runno, m_expno);
181  }
182  if (!pause()) {
183  setState(RCState::NOTREADY_S);
184  return true;
185  }
186  }
187  try {
188  if (cmd == RCCommand::ABORT) {
189  double t = Date().get();
190  if (t - tabort > 3) {
191  if (ismaster) {
192  log(LogFile::NOTICE, "Run abort by %s (exp=%05d, run=%06d)",
193  msg.getNodeName(), m_runno, m_expno);
194  }
195  m_runcontrol.setName(msg.getNodeName());
196  abort();
197  }
198  tabort = t;
199  } else if (cmd == RCCommand::RECOVER) {
200  std::string runtype;
201  get("runtype", runtype);
202  recover(m_obj, runtype);
203  }
204  } catch (const RCHandlerException& e) {
205  log(LogFile::FATAL, "Failed to recover/abort : %s", e.what());
206  }
207  }
208  RCState State = cmd.nextState();
209  if (getNode().getState() == tstate &&
210  State != Enum::UNKNOWN && m_auto) {
211  setState(State);
212  }
213  State = getNode().getState();
214  if (State != Enum::UNKNOWN) {
215  if ((cmd == RCCommand::START &&
216  (State == RCState::RUNNING_S || State == RCState::STARTING_TS)) ||
217  ((cmd == RCCommand::STOP || cmd == RCCommand::ABORT)
218  && state_org == RCState::RUNNING_S)) {
219  try {
220  dump(cmd == RCCommand::START);
221  } catch (const DBHandlerException& e) {
222  LogFile::error(e.what());
223  }
224  }
225  }
226  } catch (const RCHandlerFatalException& e) {
227  log(LogFile::FATAL, e.what());
228  } catch (const RCHandlerException& e) {
229  log(LogFile::ERROR, e.what());
230  setState(RCState::ERROR_ES);
231  } catch (const std::exception& e) {
232  log(LogFile::FATAL, "Unknown exception: %s. terminating process (193)", e.what());
233  }
234  return true;
235 }
236 
237 void RCCallback::dump(bool isstart)
238 {
239  std::string runtype;
240  get("runtype", runtype);
241  std::string rcconfig = getNode().getName() + "@RC:"
242  + (isstart ? "start:" : "end:") + runtype
243  + StringUtil::form(":%05d:%04d", m_expno, m_runno);
244  std::stringstream ss;
245  ss << dbdump() << std::endl;
246  ss << "config : " << rcconfig << std::endl;
247  LogFile::debug(ss.str());
248  ConfigFile file(ss);
249  DBObject obj = DBObjectLoader::load(file);
250  obj.print();
251  std::string table = m_table + "record";
252  if (getDB()) {
253  DBInterface& db(*getDB());
254  try {
255  DBObjectLoader::createDB(db, table, obj);
256  } catch (const IOException& e) {
257  throw (DBHandlerException("Failed to connect to database error : %s ", e.what()));
258  }
259  } else if (m_provider_host.size() > 0 && m_provider_port > 0) {
260  TCPSocket socket(m_provider_host, m_provider_port);
261  try {
262  socket.connect();
263  TCPSocketWriter writer(socket);
264  writer.writeInt(2);
265  writer.writeString(table);
266  writer.writeObject(obj);
267  } catch (const IOException& e) {
268  socket.close();
269  throw (DBHandlerException("Failed to connect to dbprovider error : %s ", e.what()));
270  }
271  socket.close();
272  }
273 }
274 
275 void RCCallback::timeout(NSMCommunicator& /*com*/)
276 {
277  try {
278  monitor();
279  } catch (const RCHandlerFatalException& e) {
280  LogFile::fatal(e.what());
281  setState(RCState::ERROR_ES);
282  reply(NSMMessage(NSMCommand::FATAL, e.what()));
283  } catch (const RCHandlerException& e) {
284  LogFile::error(e.what());
285  setState(RCState::ERROR_ES);
286  reply(NSMMessage(NSMCommand::ERROR, e.what()));
287  } catch (const std::exception& e) {
288  LogFile::fatal("Unknown exception: %s. terminating process (249)", e.what());
289  }
290 }
291 
292 std::string RCCallback::dbdump()
293 {
294  std::stringstream ss;
295  StringList& hnames(getHandlerNames());
296  const NSMVHandlerList& handlers(getHandlers());
297  for (StringList::iterator it = hnames.begin();
298  it != hnames.end(); ++it) {
299  std::string hname = *it;
300  NSMVHandler& handler(*handlers.at(hname));
301  std::string vname = StringUtil::replace(hname, "@", "");
302  if (!handler.useGet()) {
303  continue;
304  }
305  if (!handler.isDumped()) {
306  continue;
307  }
308  if (vname.c_str()[0] == '.') {
309  continue;
310  }
311  NSMVar var;
312  handler.handleGet(var);
313  switch (var.getType()) {
314  case NSMVar::INT:
315  ss << vname << " : int(" << var.getInt() << ")" << std::endl;
316  break;
317  case NSMVar::FLOAT:
318  ss << vname << " : float(" << var.getFloat() << ")" << std::endl;
319  break;
320  case NSMVar::TEXT:
321  ss << vname << " : \"" << var.getText() << "\"" << std::endl;
322  break;
323  default:
324  break;
325  }
326  }
327  ss << getDBObject().sprint(true) << std::endl;
328  ss << "nodename : " << std::endl;
329  return ss.str();
330 }
331 
332 void RCCallback::setState(const RCState& state)
333 {
334  RCState state_org = getNode().getState();
335  if (state_org != state) {
336  LogFile::debug("state transit : %s >> %s",
337  state_org.getLabel(), state.getLabel());
338  try {
339  getNode().setState(state);
340  set("rcstate", state.getLabel());
341  } catch (const std::exception& e) {
342  LogFile::error(e.what());
343  }
344  }
345 }
346 
347 DBObject RCCallback::dbload(const std::string& path)
348 {
349  DBObject obj;
350  std::string pathin, table, config;
351  LogFile::debug(path);
352  if (path.find("db://") != std::string::npos) {
353  pathin = StringUtil::replace(path, "db://", "");
354  StringList s = StringUtil::split(pathin, '/');
355  if (s.size() > 1) {
356  table = s[0];
357  config = s[1];
358  } else {
359  table = m_table;
360  config = s[0];
361  }
362  } else if (path.find("file://") != std::string::npos) {
363  pathin = StringUtil::replace(path, "file:/", "");
364  return obj;
365  }
366  if (table.size() > 0 && config.size() > 0) {
367  if (getDB()) {
368  DBInterface& db(*getDB());
369  try {
370  obj = DBObjectLoader::load(db, table, config, false);
371  if (obj.getName().size() > 0) {
372  config = obj.getName();
373  }
374  db.close();
375  } catch (const DBHandlerException& e) {
376  db.close();
377  throw (e);
378  }
379  } else if (m_provider_host.size() > 0 && m_provider_port > 0) {
380  TCPSocket socket(m_provider_host, m_provider_port);
381  try {
382  socket.connect();
383  TCPSocketWriter writer(socket);
384  writer.writeInt(1);
385  writer.writeString(table + "/" + config);
386  TCPSocketReader reader(socket);
387  obj.readObject(reader);
388  socket.close();
389  if (obj.getName().size() > 0) {
390  config = obj.getName();
391  }
392  } catch (const IOException& e) {
393  socket.close();
394  throw (IOException("Socket connection error : %s ", e.what()));
395  }
396  }
397  }
398  return obj;
399 }
400 
401 void RCCallback::configure_raw(int length, const char* data)
402 {
403  try {
404  dbload(length, data);
405  } catch (const IOException& e) {
406  throw (RCHandlerException(e.what()));
407  }
408  configure(m_obj);
409 }
410 
411 void RCCallback::dbload(int /*length*/, const char* /*data*/)
412 {
413  const NSMNode& node(getNode());
414  m_rcconfig = node.getName() + "@RC:" + m_rcconfig_org;
415  LogFile::debug("Loading '%s'", m_rcconfig.c_str());
416  if (m_file.size() > 0) {
417  StringList files = StringUtil::split(m_file, ',');
418  ConfigFile conf;
419  for (size_t i = 0; i < files.size(); i++) {
420  conf.read(files[i]);
421  }
422  m_obj = DBObjectLoader::load(conf);
423  m_obj.print(m_showall);
424  } else if (getDB()) {
425  DBInterface& db(*getDB());
426  std::string rcconfig = "";
427  DBObject obj1, obj2;
428  if (m_runtype_record.size()) {
429  rcconfig = node.getName() + "@RC:start:" + m_runtype_record + ":";
430  obj1 = DBObjectLoader::load(db, m_table + "record", rcconfig, m_showall);
431  }
432  obj2 = DBObjectLoader::load(db, m_table, m_rcconfig, m_showall);
433  if (obj1.getDate() < obj2.getDate()) {
434  m_obj = obj2;
435  } else {
436  m_obj = obj1;
437  m_rcconfig = rcconfig;
438  }
439  db.close();
440  m_obj.print(m_showall);
441  } else if (m_provider_host.size() > 0 && m_provider_port > 0) {
442  TCPSocket socket(m_provider_host, m_provider_port);
443  try {
444  DBObject obj1, obj2;
445  std::string rcconfig = "";
446  if (m_runtype_record.size() > 0) {
447  rcconfig = node.getName() + "@RC:start:" + m_runtype_record + ":";
448  socket.connect();
449  TCPSocketWriter writer(socket);
450  writer.writeInt(1);
451  writer.writeString(m_table + "record/" + rcconfig);
452  TCPSocketReader reader(socket);
453  obj1.readObject(reader);
454  socket.close();
455  }
456  socket.connect();
457  TCPSocketWriter writer(socket);
458  writer.writeInt(1);
459  writer.writeString(m_table + "/" + m_rcconfig);
460  TCPSocketReader reader(socket);
461  obj2.readObject(reader);
462  if (obj1.getDate() < obj2.getDate()) {
463  m_obj = obj2;
464  } else {
465  m_obj = obj1;
466  m_rcconfig = rcconfig;
467  }
468  m_obj.print(m_showall);
469  } catch (const IOException& e) {
470  socket.close();
471  throw (IOException("Socket connection error : %s ", e.what()));
472  }
473  socket.close();
474  }
475  reset();
476  add(new NSMVHandlerText("runtype", true, true, ""), false, true);
477  add(new NSMVHandlerText("dbtable", true, false, m_table), false, true);
478  add(new NSMVHandlerText("rcstate", true, false, RCState::NOTREADY_S.getLabel()), false, true);
479  add(new NSMVHandlerText("rcrequest", true, false, ""), false, true);
480  add(new RCConfigHandler(*this, "rcconfig", m_obj.getName()));
481  addDB(m_obj);
482 }
483 
Abstract base class for different kinds of events.