Belle II Software development
RCCallback.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include "daq/slc/runcontrol/RCCallback.h"
9
10#include <daq/slc/database/DBHandlerException.h>
11#include <daq/slc/database/DBInterface.h>
12#include <daq/slc/database/DBObjectLoader.h>
13
14#include <daq/slc/runcontrol/RCCommand.h>
15#include <daq/slc/runcontrol/RCHandlerException.h>
16#include <daq/slc/runcontrol/RCHandlerFatalException.h>
17
18#include <daq/slc/system/LogFile.h>
19#include <daq/slc/system/TCPSocket.h>
20#include <daq/slc/system/TCPSocketReader.h>
21#include <daq/slc/system/TCPSocketWriter.h>
22
23#include <daq/slc/nsm/NSMCommunicator.h>
24
25#include <sstream>
26
27#include <cstdlib>
28
29double tabort = 0;
30
31namespace Belle2 {
37 public:
39 const std::string& name, const std::string& val)
40 : NSMVHandlerText(name, true, true, val), m_callback(callback) {}
41 bool handleGetText(std::string& val) override
42 {
43 const DBObject& obj(m_callback.getDBObject());
44 val = obj.getName();
45 return true;
46 }
47 bool handleSetText(const std::string& val) override
48 {
49 RCState state(m_callback.getNode().getState());
50 RCState tstate(RCCommand::CONFIGURE.nextTState());
51 m_callback.setState(tstate);
52 try {
53 m_callback.abort();
54 m_callback.dbload(val.size(), val.c_str());
55 } catch (const IOException& e) {
56 throw (RCHandlerException(e.what()));
57 }
58 const DBObject& obj(m_callback.getDBObject());
59 obj.getName();
60 m_callback.configure(obj);
61 m_callback.setState(state);
62 return true;
63 }
64 private:
65 RCCallback& m_callback;
66 };
67
69}
70
71using namespace Belle2;
72
73RCCallback::RCCallback(int timeout)
74 : NSMCallback(timeout)
75{
76 reg(RCCommand::CONFIGURE);
77 reg(RCCommand::BOOT);
78 reg(RCCommand::LOAD);
79 reg(RCCommand::START);
80 reg(RCCommand::STOP);
81 reg(RCCommand::RECOVER);
82 reg(RCCommand::RESUME);
83 reg(RCCommand::PAUSE);
84 reg(RCCommand::ABORT);
85 reg(RCCommand::STATUS);
86 reg(NSMCommand::FATAL);
87 m_auto = true;
88 m_db = NULL;
89 m_showall = true;
90 m_expno = m_runno = 0;
91}
92
93void RCCallback::init(NSMCommunicator&)
94{
95 LogFile::debug("init");
96 dbload(0, 0);
97 try {
98 initialize(m_obj);
99 } catch (const RCHandlerException& e) {
100 LogFile::fatal("Failed to initialize. %s. terminating process (84)", e.what());
101 term();
102 exit(1);
103 }
104 LogFile::debug("init done");
105 setState(RCState::NOTREADY_S);
106 tabort = Date().get();
107}
108
109bool RCCallback::perform(NSMCommunicator& com)
110{
111 NSMMessage msg(com.getMessage());
112 const RCCommand cmd = msg.getRequestName();
113 const RCState state_org(getNode().getState());
114 RCState state(getNode().getState());
115 if (NSMCallback::perform(com)) return true;
116 if (cmd.isAvailable(state) == NSMCommand::DISABLED) {
117 return false;
118 }
119 addNode(NSMNode(msg.getNodeName()));
120 try {
121 set("rcrequest", msg.getRequestName());
122 } catch (const std::exception& e) {
123 LogFile::error(e.what());
124 }
125 RCState tstate(cmd.nextTState());
126 try {
127 if (tstate != Enum::UNKNOWN) {
128 log(LogFile::DEBUG, "RC request %s from %s", msg.getRequestName(), msg.getNodeName());
129 setState(tstate);
130 std::string nodename = getNode().getName();
131 bool ismaster = nodename == "RUNCONTROL" ||
132 (StringUtil::find(nodename, "RC_") && !StringUtil::find(nodename, "HLT"));
133 if (cmd == RCCommand::CONFIGURE) {
134 m_runcontrol.setName(msg.getNodeName());
135 configure_raw(msg.getLength(), msg.getData());
136 } else if (cmd == RCCommand::BOOT) {
137 m_runcontrol.setName(msg.getNodeName());
138 get(m_obj);
139 std::string opt = msg.getLength() > 0 ? msg.getData() : "";
140 boot(opt, m_obj);
141 } else if (cmd == RCCommand::LOAD) {
142 m_expno = 0;
143 m_runno = 0;
144 m_runcontrol.setName(msg.getNodeName());
145 std::string runtype = (msg.getLength() > 0 ? msg.getData() : "");
146 if (runtype.size() == 0) {
147 get("runtype", runtype);
148 } else {
149 set("runtype", runtype);
150 }
151 get(m_obj);
152 load(m_obj, runtype);
153 } else if (cmd == RCCommand::START) {
154 m_runcontrol.setName(msg.getNodeName());
155 m_expno = (msg.getNParams() > 0) ? msg.getParam(0) : 0;
156 m_runno = (msg.getNParams() > 1) ? msg.getParam(1) : 0;
157 if (ismaster) {
158 log(LogFile::NOTICE, "Run start by %s (exp=%05d, run=%06d)",
159 msg.getNodeName(), m_runno, m_expno);
160 }
161 start(m_expno, m_runno);
162 } else if (cmd == RCCommand::STOP) {
163 if (ismaster) {
164 log(LogFile::NOTICE, "Run stop by %s (exp=%05d, run=%06d)",
165 msg.getNodeName(), m_runno, m_expno);
166 }
167 stop();
168 } else if (cmd == RCCommand::RESUME) {
169 if (ismaster) {
170 log(LogFile::NOTICE, "Run resume by %s (exp=%05d, run=%06d)",
171 msg.getNodeName(), m_runno, m_expno);
172 }
173 if (!resume(msg.getParam(0))) {
174 setState(RCState::NOTREADY_S);
175 return true;
176 }
177 } else if (cmd == RCCommand::PAUSE) {
178 if (ismaster) {
179 log(LogFile::NOTICE, "Run pause by %s (exp=%05d, run=%06d)",
180 msg.getNodeName(), m_runno, m_expno);
181 }
182 if (!pause()) {
183 setState(RCState::NOTREADY_S);
184 return true;
185 }
186 }
187 try {
188 if (cmd == RCCommand::ABORT) {
189 double t = Date().get();
190 if (t - tabort > 3) {
191 if (ismaster) {
192 log(LogFile::NOTICE, "Run abort by %s (exp=%05d, run=%06d)",
193 msg.getNodeName(), m_runno, m_expno);
194 }
195 m_runcontrol.setName(msg.getNodeName());
196 abort();
197 }
198 tabort = t;
199 } else if (cmd == RCCommand::RECOVER) {
200 std::string runtype;
201 get("runtype", runtype);
202 recover(m_obj, runtype);
203 }
204 } catch (const RCHandlerException& e) {
205 log(LogFile::FATAL, "Failed to recover/abort : %s", e.what());
206 }
207 }
208 RCState State = cmd.nextState();
209 if (getNode().getState() == tstate &&
210 State != Enum::UNKNOWN && m_auto) {
211 setState(State);
212 }
213 State = getNode().getState();
214 if (State != Enum::UNKNOWN) {
215 if ((cmd == RCCommand::START &&
216 (State == RCState::RUNNING_S || State == RCState::STARTING_TS)) ||
217 ((cmd == RCCommand::STOP || cmd == RCCommand::ABORT)
218 && state_org == RCState::RUNNING_S)) {
219 try {
220 dump(cmd == RCCommand::START);
221 } catch (const DBHandlerException& e) {
222 LogFile::error(e.what());
223 }
224 }
225 }
226 } catch (const RCHandlerFatalException& e) {
227 log(LogFile::FATAL, e.what());
228 } catch (const RCHandlerException& e) {
229 log(LogFile::ERROR, e.what());
230 setState(RCState::ERROR_ES);
231 } catch (const std::exception& e) {
232 log(LogFile::FATAL, "Unknown exception: %s. terminating process (193)", e.what());
233 }
234 return true;
235}
236
237void RCCallback::dump(bool isstart)
238{
239 std::string runtype;
240 get("runtype", runtype);
241 std::string rcconfig = getNode().getName() + "@RC:"
242 + (isstart ? "start:" : "end:") + runtype
243 + StringUtil::form(":%05d:%04d", m_expno, m_runno);
244 std::stringstream ss;
245 ss << dbdump() << std::endl;
246 ss << "config : " << rcconfig << std::endl;
247 LogFile::debug(ss.str());
248 ConfigFile file(ss);
249 DBObject obj = DBObjectLoader::load(file);
250 obj.print();
251 std::string table = m_table + "record";
252 if (getDB()) {
253 try {
254 DBInterface& db(*getDB());
255 DBObjectLoader::createDB(db, table, obj);
256 } catch (const IOException& e) {
257 throw (DBHandlerException("Failed to connect to database error : %s ", e.what()));
258 }
259 } else if (m_provider_host.size() > 0 && m_provider_port > 0) {
260 TCPSocket socket(m_provider_host, m_provider_port);
261 try {
262 socket.connect();
263 TCPSocketWriter writer(socket);
264 writer.writeInt(2);
265 writer.writeString(table);
266 writer.writeObject(obj);
267 } catch (const IOException& e) {
268 socket.close();
269 throw (DBHandlerException("Failed to connect to dbprovider error : %s ", e.what()));
270 }
271 socket.close();
272 }
273}
274
275void RCCallback::timeout(NSMCommunicator& /*com*/)
276{
277 try {
278 monitor();
279 } catch (const RCHandlerFatalException& e) {
280 LogFile::fatal(e.what());
281 setState(RCState::ERROR_ES);
282 reply(NSMMessage(NSMCommand::FATAL, e.what()));
283 } catch (const RCHandlerException& e) {
284 LogFile::error(e.what());
285 setState(RCState::ERROR_ES);
286 reply(NSMMessage(NSMCommand::ERROR, e.what()));
287 } catch (const std::exception& e) {
288 LogFile::fatal("Unknown exception: %s. terminating process (249)", e.what());
289 }
290}
291
292std::string RCCallback::dbdump()
293{
294 std::stringstream ss;
295 StringList& hnames(getHandlerNames());
296 const NSMVHandlerList& handlers(getHandlers());
297 for (StringList::iterator it = hnames.begin();
298 it != hnames.end(); ++it) {
299 std::string hname = *it;
300 NSMVHandler& handler(*handlers.at(hname));
301 std::string vname = StringUtil::replace(hname, "@", "");
302 if (!handler.useGet()) {
303 continue;
304 }
305 if (!handler.isDumped()) {
306 continue;
307 }
308 if (vname.c_str()[0] == '.') {
309 continue;
310 }
311 NSMVar var;
312 handler.handleGet(var);
313 switch (var.getType()) {
314 case NSMVar::INT:
315 ss << vname << " : int(" << var.getInt() << ")" << std::endl;
316 break;
317 case NSMVar::FLOAT:
318 ss << vname << " : float(" << var.getFloat() << ")" << std::endl;
319 break;
320 case NSMVar::TEXT:
321 ss << vname << " : \"" << var.getText() << "\"" << std::endl;
322 break;
323 default:
324 break;
325 }
326 }
327 ss << getDBObject().sprint(true) << std::endl;
328 ss << "nodename : " << std::endl;
329 return ss.str();
330}
331
332void RCCallback::setState(const RCState& state)
333{
334 RCState state_org = getNode().getState();
335 if (state_org != state) {
336 LogFile::debug("state transit : %s >> %s",
337 state_org.getLabel(), state.getLabel());
338 try {
339 getNode().setState(state);
340 set("rcstate", state.getLabel());
341 } catch (const std::exception& e) {
342 LogFile::error(e.what());
343 }
344 }
345}
346
347DBObject RCCallback::dbload(const std::string& path)
348{
349 DBObject obj;
350 std::string pathin, table, config;
351 LogFile::debug(path);
352 if (path.find("db://") != std::string::npos) {
353 pathin = StringUtil::replace(path, "db://", "");
354 StringList s = StringUtil::split(pathin, '/');
355 if (s.size() > 1) {
356 table = s[0];
357 config = s[1];
358 } else {
359 table = m_table;
360 config = s[0];
361 }
362 } else if (path.find("file://") != std::string::npos) {
363 pathin = StringUtil::replace(path, "file:/", "");
364 return obj;
365 }
366 if (table.size() > 0 && config.size() > 0) {
367 if (getDB()) {
368 DBInterface& db(*getDB());
369 try {
370 obj = DBObjectLoader::load(db, table, config, false);
371 if (obj.getName().size() > 0) {
372 config = obj.getName();
373 }
374 db.close();
375 } catch (const DBHandlerException& e) {
376 db.close();
377 throw (e);
378 }
379 } else if (m_provider_host.size() > 0 && m_provider_port > 0) {
380 TCPSocket socket(m_provider_host, m_provider_port);
381 try {
382 socket.connect();
383 TCPSocketWriter writer(socket);
384 writer.writeInt(1);
385 writer.writeString(table + "/" + config);
386 TCPSocketReader reader(socket);
387 obj.readObject(reader);
388 socket.close();
389 if (obj.getName().size() > 0) {
390 config = obj.getName();
391 }
392 } catch (const IOException& e) {
393 socket.close();
394 throw (IOException("Socket connection error : %s ", e.what()));
395 }
396 }
397 }
398 return obj;
399}
400
401void RCCallback::configure_raw(int length, const char* data)
402{
403 try {
404 dbload(length, data);
405 } catch (const IOException& e) {
406 throw (RCHandlerException(e.what()));
407 }
408 configure(m_obj);
409}
410
411void RCCallback::dbload(int /*length*/, const char* /*data*/)
412{
413 const NSMNode& node(getNode());
414 m_rcconfig = node.getName() + "@RC:" + m_rcconfig_org;
415 LogFile::debug("Loading '%s'", m_rcconfig.c_str());
416 if (m_file.size() > 0) {
417 StringList files = StringUtil::split(m_file, ',');
418 ConfigFile conf;
419 for (size_t i = 0; i < files.size(); i++) {
420 conf.read(files[i]);
421 }
422 m_obj = DBObjectLoader::load(conf);
423 m_obj.print(m_showall);
424 } else if (getDB()) {
425 DBInterface& db(*getDB());
426 std::string rcconfig = "";
427 DBObject obj1, obj2;
428 if (m_runtype_record.size()) {
429 rcconfig = node.getName() + "@RC:start:" + m_runtype_record + ":";
430 obj1 = DBObjectLoader::load(db, m_table + "record", rcconfig, m_showall);
431 }
432 obj2 = DBObjectLoader::load(db, m_table, m_rcconfig, m_showall);
433 if (obj1.getDate() < obj2.getDate()) {
434 m_obj = obj2;
435 } else {
436 m_obj = obj1;
437 m_rcconfig = rcconfig;
438 }
439 db.close();
440 m_obj.print(m_showall);
441 } else if (m_provider_host.size() > 0 && m_provider_port > 0) {
442 TCPSocket socket(m_provider_host, m_provider_port);
443 try {
444 DBObject obj1, obj2;
445 std::string rcconfig = "";
446 if (m_runtype_record.size() > 0) {
447 rcconfig = node.getName() + "@RC:start:" + m_runtype_record + ":";
448 socket.connect();
449 TCPSocketWriter writer(socket);
450 writer.writeInt(1);
451 writer.writeString(m_table + "record/" + rcconfig);
452 TCPSocketReader reader(socket);
453 obj1.readObject(reader);
454 socket.close();
455 }
456 socket.connect();
457 TCPSocketWriter writer(socket);
458 writer.writeInt(1);
459 writer.writeString(m_table + "/" + m_rcconfig);
460 TCPSocketReader reader(socket);
461 obj2.readObject(reader);
462 if (obj1.getDate() < obj2.getDate()) {
463 m_obj = obj2;
464 } else {
465 m_obj = obj1;
466 m_rcconfig = rcconfig;
467 }
468 m_obj.print(m_showall);
469 } catch (const IOException& e) {
470 socket.close();
471 throw (IOException("Socket connection error : %s ", e.what()));
472 }
473 socket.close();
474 }
475 reset();
476 add(new NSMVHandlerText("runtype", true, true, ""), false, true);
477 add(new NSMVHandlerText("dbtable", true, false, m_table), false, true);
478 add(new NSMVHandlerText("rcstate", true, false, RCState::NOTREADY_S.getLabel()), false, true);
479 add(new NSMVHandlerText("rcrequest", true, false, ""), false, true);
480 add(new RCConfigHandler(*this, "rcconfig", m_obj.getName()));
481 addDB(m_obj);
482}
483
Abstract base class for different kinds of events.