Belle II Software development
ERecoMasterCallback.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include "daq/expreco/ERecoMasterCallback.h"
9
10#include "daq/expreco/ERecoRunControlCallback.h"
11
12#include "daq/rfarm/manager/RFCommand.h"
13#include "daq/rfarm/manager/RfNodeInfo.h"
14#include "daq/rfarm/manager/RfUnitInfo.h"
15
16#include <daq/slc/nsm/NSMCommunicator.h>
17#include <daq/slc/runcontrol/RCCommand.h>
18#include <daq/slc/runcontrol/RCHandlerException.h>
19#include <daq/slc/system/Time.h>
20
21#include <daq/slc/base/StringUtil.h>
22#include <daq/slc/base/TimeoutException.h>
23
24using namespace Belle2;
25
26ERecoMasterCallback::ERecoMasterCallback(ConfigFile& config) : m_callback(NULL)
27{
28 reg(RFCommand::CONFIGURE);
29 reg(RFCommand::UNCONFIGURE);
30 reg(RFCommand::START);
31 reg(RFCommand::STOP);
32 reg(RFCommand::RESUME);
33 reg(RFCommand::PAUSE);
34 reg(RFCommand::RESTART);
35 reg(RFCommand::STATUS);
36
37 m_rcnode = NSMNode(config.get("ereco.rcnode"));
38 LogFile::debug("ERecoMaster started: Bridge Node = %s", m_rcnode.getName().c_str());
39}
40
41void ERecoMasterCallback::initialize(const DBObject& obj)
42{
43 configure(obj);
44}
45
46void ERecoMasterCallback::configure(const DBObject& obj)
47{
48 m_nodes = NSMNodeList();
49 m_dataname = StringList();
50 const std::string format = obj("system").getText("nsmdata");
51 // 0. Register DqmServer
52 if (obj.hasObject("dqmserver")) {
53 m_nodes.push_back(NSMNode(obj("dqmserver").getText("nodename")));
54 }
55 // 1. Register distributor
56 std::string nodename = obj("distributor").getText("nodename");
57 m_nodes.push_back(NSMNode(nodename));
58 addData(nodename, format);
59 // 2. Register event processors
60 const DBObject& processor(obj("processor"));
61 int maxnodes = processor.getInt("nnodes");
62 int idbase = processor.getInt("idbase");
63 std::string hostbase = processor.getText("nodebase");
64 std::string badlist = processor.getText("badlist");
65 char idname[3];
66 for (int i = 0; i < maxnodes; i++) {
67 sprintf(idname, "%2.2d", idbase + i);
68 if (badlist.size() == 0 || StringUtil::find(badlist, idname)) {
69 nodename = StringUtil::form("evp_%s%2.2d", hostbase.c_str(), idbase + i);
70 m_nodes.push_back(NSMNode(nodename));
71 addData(nodename, format);
72 }
73 }
74 // 3. Register Event Sampler
75 nodename = obj("eventsampler").getText("nodename");
76 m_nodes.push_back(NSMNode(nodename));
77
78 add(new NSMVHandlerInt("nnodes", true, false, m_nodes.size()));
79 int i = 0;
80 std::string rcconf = obj.getName();
81 if (StringUtil::find(rcconf, "@")) {
82 rcconf = StringUtil::split(rcconf, '@')[1];
83 }
84 for (NSMNodeList::iterator it = m_nodes.begin();
85 it != m_nodes.end(); ++it) {
86 NSMNode& node(*it);
87 RCState s(node.getState());
88 std::string vname = StringUtil::form("node[%d]", i++);
89 add(new NSMVHandlerText(vname + ".rcstate", true, false, s.getLabel()));
90 add(new NSMVHandlerText(vname + ".rcconfig", true, false, rcconf));
91 add(new NSMVHandlerText(vname + ".name", true, false,
92 StringUtil::tolower(node.getName())));
93 vname = StringUtil::tolower(node.getName());
94 add(new NSMVHandlerText(vname + ".rcstate", true, false, s.getLabel()));
95 add(new NSMVHandlerText(vname + ".rcconfig", true, false, rcconf));
96 }
97}
98
99void ERecoMasterCallback::setState(NSMNode& node, const RCState& state)
100{
101 node.setState(state);
102 int i = 0;
103 for (NSMNodeList::iterator it = m_nodes.begin();
104 it != m_nodes.end(); ++it) {
105 NSMNode& rnode(*it);
106 if (StringUtil::toupper(rnode.getName()) == StringUtil::toupper(node.getName())) {
107 std::string vname = StringUtil::form("node[%d].rcstate", i);
108 set(vname, state.getLabel());
109 break;
110 }
111 i++;
112 }
113 std::string vname = StringUtil::tolower(node.getName()) + ".rcstate";
114 set(vname, state.getLabel());
115}
116
117void ERecoMasterCallback::monitor()
118{
119 if (!m_callback->getData().isAvailable()) return;
120 RfUnitInfo* unitinfo = (RfUnitInfo*)m_callback->getData().get();
121 unitinfo->nnodes = m_dataname.size();
122 unitinfo->updatetime = Time().get();
123 unitinfo->rcstate = getNode().getState().getId();
124 int i = 0;
125 for (StringList::iterator it = m_dataname.begin();
126 it != m_dataname.end(); ++it) {
127 NSMData& data(getData(*it));
128 try {
129 if (data.isAvailable()) {
130 RfUnitInfo::RfNodeInfo* nodeinfo_o = &unitinfo->nodeinfo[i];
131 RfNodeInfo* nodeinfo_i = (RfNodeInfo*)data.get();
132 memcpy(nodeinfo_o, nodeinfo_i, sizeof(RfNodeInfo));
133 }
134 } catch (const NSMHandlerException& e) {}
135 i++;
136 }
137 static unsigned long long count = 0;
138 for (NSMNodeList::iterator it = m_nodes.begin(); it != m_nodes.end(); ++it) {
139 NSMNode& node(*it);
140 if (count % 60 == 59 || node.getState() == RCState::UNKNOWN) {
141 if (NSMCommunicator::send(NSMMessage(node, RFCommand::STATUS))) {
142 } else {
143 setState(node, RCState::UNKNOWN);
144 }
145 }
146 }
147 count++;
148}
149
150void ERecoMasterCallback::ok(const char* nodename, const char* data)
151{
152 for (NSMNodeList::iterator it = m_nodes.begin();
153 it != m_nodes.end(); ++it) {
154 NSMNode& node(*it);
155 if (node.getName() == nodename) {
156 if (strcmp(data, "Configured") == 0) {
157 setState(node, RCState::READY_S);
158 } else if (strcmp(data, "Unconfigured") == 0) {
159 setState(node, RCState::NOTREADY_S);
160 } else if (strcmp(data, "Stopped") == 0) {
161 setState(node, RCState::READY_S);
162 } else if (strcmp(data, "Running") == 0) {
163 setState(node, RCState::RUNNING_S);
164 } else if (strcmp(data, "Idle") == 0) {
165 setState(node, RCState::PAUSED_S);
166 }
167 LogFile::debug("OK << %s (%s)", nodename, data);
168 return;
169 }
170 }
171 LogFile::warning("OK from unknown node %s (%s)", nodename, data);
172}
173
174void ERecoMasterCallback::error(const char* nodename, const char* data)
175{
176 for (NSMNodeList::iterator it = m_nodes.begin();
177 it != m_nodes.end(); ++it) {
178 NSMNode& node(*it);
179 if (node.getName() == nodename) {
180 LogFile::error("ERROR << %s (%s)", nodename, data);
181 setState(node, RCState::NOTREADY_S);
182 return;
183 }
184 }
185 LogFile::warning("Error from unknwon node %s : %s)", nodename, data);
186}
187
188void ERecoMasterCallback::load(const DBObject& /*db*/, const std::string& /*runtype*/)
189{
190 for (NSMNodeList::iterator it = m_nodes.begin(); it != m_nodes.end(); ++it) {
191 NSMNode& node(*it);
192 printf("Loading : %s\n", node.getName().c_str());
193 if (node.getName().find("EVP") == std::string::npos) {
194 while (true) {
195 bool configured = true;
196 for (NSMNodeList::iterator it2 = m_nodes.begin();
197 it2 != it; ++it2) {
198 if (it2->getState() != RCState::READY_S)
199 configured = false;
200 }
201 if (configured) break;
202 try {
203 perform(NSMCommunicator::select(30));
204 } catch (const TimeoutException& e) {}
205 }
206 }
207 if (node.getState() != RCState::READY_S) {
208 printf("ERecoMasterCallback::load : loading %s\n", (node.getName()).c_str());
209 if (NSMCommunicator::send(NSMMessage(node, RCCommand::LOAD))) {
210 setState(node, RCState::LOADING_TS);
211 LogFile::debug("%s >> LOADING", node.getName().c_str());
212 } else {
213 throw (RCHandlerException("Failed to configure %s", node.getName().c_str()));
214 }
215 } else {
216 LogFile::debug("%s is READY", node.getName().c_str());
217 }
218 }
219 while (true) {
220 bool configured = true;
221 for (NSMNodeList::iterator it = m_nodes.begin(); it != m_nodes.end(); ++it) {
222 if (it->getState() != RCState::READY_S) configured = false;
223 }
224 if (configured) break;
225 try {
226 perform(NSMCommunicator::select(30));
227 } catch (const TimeoutException& e) {}
228 }
229 if (m_callback != NULL)
230 m_callback->setState(RCState::READY_S);
231 RCCallback::setState(RCState::READY_S);
232}
233
234void ERecoMasterCallback::abort()
235{
236 for (NSMNodeList::reverse_iterator it = m_nodes.rbegin();
237 it != m_nodes.rend(); ++it) {
238 NSMNode& node(*it);
239 if (node.getName().find("EVP") == std::string::npos) {
240 while (true) {
241 bool unconfigured = true;
242 for (NSMNodeList::reverse_iterator it2 = m_nodes.rbegin();
243 it2 != it; ++it2) {
244 if (it2->getState() != RCState::NOTREADY_S)
245 unconfigured = false;
246 }
247 if (unconfigured) break;
248 try {
249 perform(NSMCommunicator::select(30));
250 } catch (const TimeoutException& e) {}
251 }
252 }
253 if (node.getState() != RCState::NOTREADY_S) {
254 if (NSMCommunicator::send(NSMMessage(node, RCCommand::ABORT))) {
255 setState(node, RCState::ABORTING_RS);
256 LogFile::debug("%s >> ABORTING", node.getName().c_str());
257 } else {
258 throw (RCHandlerException("Failed to unconfigure %s", node.getName().c_str()));
259 }
260 } else {
261 LogFile::debug("%s is NOTREADY", node.getName().c_str());
262 }
263 }
264 while (true) {
265 bool unconfigured = true;
266 for (NSMNodeList::reverse_iterator it = m_nodes.rbegin(); it != m_nodes.rend(); ++it) {
267 if (it->getState() != RCState::NOTREADY_S) unconfigured = false;
268 }
269 if (unconfigured) break;
270 try {
271 perform(NSMCommunicator::select(30));
272 } catch (const TimeoutException& e) {}
273 }
274 if (m_callback != NULL)
275 m_callback->setState(RCState::NOTREADY_S);
276 RCCallback::setState(RCState::NOTREADY_S);
277}
278
279void ERecoMasterCallback::start(int expno, int runno)
280{
281 int pars[] = {expno, runno};
282 std::string script;
283 if (getNode().getName().find("_") != std::string::npos) {
284 get("hlt.script", script);
285 // script = "run_processor.py";
286 } else {
287 LogFile::debug("asking database : node = %s", m_rcnode.getName().c_str());
288 get(m_rcnode, "RUNCONTROL@hlt.script", script);
289 // get(NSMNode("BERECO1"), "RUNCONTROL@hlt.script", script);
290 // get(m_rcnode, "hlt.script", script, 30);
291 script = "run_" + script + ".py";
292 printf("ERECO script = %s\n", script.c_str());
293 }
294 for (NSMNodeList::iterator it = m_nodes.begin(); it != m_nodes.end(); ++it) {
295 NSMNode& node(*it);
296 if (node.getName().find("EVP") == std::string::npos) {
297 while (true) {
298 bool configured = true;
299 for (NSMNodeList::iterator it2 = m_nodes.begin();
300 it2 != it; ++it2) {
301 if (it2->getState() != RCState::RUNNING_S)
302 configured = false;
303 }
304 if (configured) break;
305 try {
306 perform(NSMCommunicator::select(30));
307 } catch (const TimeoutException& e) {}
308 }
309 }
310 if (node.getState() != RCState::RUNNING_S) {
311 if (NSMCommunicator::send(NSMMessage(node, RCCommand::START, 2, pars, script))) {
312 setState(node, RCState::STARTING_TS);
313 LogFile::debug("%s >> STARTING (script=%s)", node.getName().c_str(), script.c_str());
314 } else {
315 throw (RCHandlerException("Failed to configure %s", node.getName().c_str()));
316 }
317 } else {
318 LogFile::debug("%s is RUNNING", node.getName().c_str());
319 }
320 }
321 while (true) {
322 bool configured = true;
323 for (NSMNodeList::iterator it = m_nodes.begin(); it != m_nodes.end(); ++it) {
324 if (it->getState() != RCState::RUNNING_S) configured = false;
325 }
326 if (configured) break;
327 try {
328 perform(NSMCommunicator::select(30));
329 } catch (const TimeoutException& e) {}
330 }
331 char cmdline[] = "hsendcommand DQMRC:CLEAR erctl 9991";
332 system(cmdline);
333 printf("ERecoMaster : DQM TMemFile cleared\n");
334
335 if (m_callback != NULL)
336 m_callback->setState(RCState::RUNNING_S);
337 RCCallback::setState(RCState::RUNNING_S);
338}
339
340void ERecoMasterCallback::stop()
341{
342 for (NSMNodeList::reverse_iterator it = m_nodes.rbegin();
343 it != m_nodes.rend(); ++it) {
344 NSMNode& node(*it);
345 if (node.getName().find("EVP") == std::string::npos) {
346 while (true) {
347 bool unconfigured = true;
348 for (NSMNodeList::reverse_iterator it2 = m_nodes.rbegin();
349 it2 != it; ++it2) {
350 if (it2->getState() != RCState::READY_S)
351 unconfigured = false;
352 }
353 if (unconfigured) break;
354 try {
355 perform(NSMCommunicator::select(30));
356 } catch (const TimeoutException& e) {}
357 }
358 }
359 if (node.getState() != RCState::READY_S) {
360 if (NSMCommunicator::send(NSMMessage(node, RCCommand::STOP))) {
361 setState(node, RCState::STOPPING_TS);
362 LogFile::debug("%s >> STOPPING", node.getName().c_str());
363 } else {
364 throw (RCHandlerException("Failed to stop %s", node.getName().c_str()));
365 }
366 } else {
367 LogFile::debug("%s is READY", node.getName().c_str());
368 }
369 }
370 while (true) {
371 bool unconfigured = true;
372 for (NSMNodeList::reverse_iterator it = m_nodes.rbegin(); it != m_nodes.rend(); ++it) {
373 if (it->getState() != RCState::READY_S) unconfigured = false;
374 }
375 if (unconfigured) break;
376 try {
377 perform(NSMCommunicator::select(30));
378 } catch (const TimeoutException& e) {}
379 }
380 if (m_callback != NULL)
381 m_callback->setState(RCState::READY_S);
382 RCCallback::setState(RCState::READY_S);
383}
384
385bool ERecoMasterCallback::pause()
386{
387 return true;
388}
389
390bool ERecoMasterCallback::resume(int /*subno*/)
391{
392 return true;
393}
394
395void ERecoMasterCallback::recover(const DBObject& obj, const std::string& runtype)
396{
397 abort();
398 load(obj, runtype);
399}
400
401void ERecoMasterCallback::addData(const std::string& dataname, const std::string& format)
402{
403 openData(dataname, format);
404 int i = (int)m_dataname.size();
405 std::string vname = StringUtil::form("data[%d].name", i);
406 add(new NSMVHandlerText(vname, true, false, StringUtil::tolower(dataname)));
407 vname = StringUtil::form("data[%d].format", i);
408 add(new NSMVHandlerText(vname, true, false, format));
409 m_dataname.push_back(dataname);
410}
411
412bool ERecoMasterCallback::perform(NSMCommunicator& com)
413{
414 const NSMMessage msg = com.getMessage();
415 if (RCCallback::perform(com)) {
416 return true;
417 }
418 const RFCommand cmd = msg.getRequestName();
419 try {
420 if (cmd == RFCommand::CONFIGURE) {
421 load(getDBObject(), "");
422 return true;
423 } else if (cmd == RFCommand::UNCONFIGURE) {
424 abort();
425 return true;
426 } else if (cmd == RFCommand::START) {
427 return true;
428 } else if (cmd == RFCommand::STOP) {
429 return true;
430 } else if (cmd == RFCommand::RESTART) {
431 return true;
432 } else if (cmd == RFCommand::PAUSE) {
433 return true;
434 } else if (cmd == RFCommand::RESUME) {
435 return true;
436 } else if (cmd == RFCommand::STATUS) {
437 return true;
438 }
439 } catch (const RCHandlerException& e) {
440 std::string emsg = StringUtil::form("%s. aborting", e.what());
441 LogFile::error(emsg);
442 reply(NSMMessage(NSMCommand::ERROR, emsg));
443 try {
444 RCCallback::setState(RCState::ABORTING_RS);
445 abort();
446 RCCallback::setState(RCState::NOTREADY_S);
447 } catch (const RCHandlerException& ex) {
448 std::string exmsg = StringUtil::form("Failed to abort : %s", ex.what());
449 LogFile::fatal(exmsg);
450 }
451 } catch (const std::exception& e) {
452 LogFile::fatal("Unknown exception: %s. terminating process", e.what());
453 }
454 return false;
455}
TString getName(const TObject *obj)
human-readable name (e.g.
Definition: ObjectInfo.cc:45
Abstract base class for different kinds of events.