Belle II Software  release-05-02-19
ProcessController.cc
1 #include "daq/slc/readout/ProcessController.h"
2 
3 #include "daq/slc/readout/ProcessListener.h"
4 #include "daq/slc/readout/LogListener.h"
5 #include "daq/slc/runcontrol/RCHandlerException.h"
6 
7 #include "daq/slc/system/Executor.h"
8 #include "daq/slc/system/PThread.h"
9 #include "daq/slc/system/LogFile.h"
10 
11 #include "daq/slc/base/StringUtil.h"
12 #include <daq/slc/system/LockGuard.h>
13 
14 #include <cstdio>
15 #include <unistd.h>
16 
17 
18 using namespace Belle2;
19 
20 bool ProcessController::init(const std::string& parname, int nodeid)
21 {
22  m_name = StringUtil::tolower(m_callback->getNode().getName());
23  m_parname = parname;
24  LogFile::open(m_name + "_" + m_parname);
25  if (!m_info.open(m_name + "_" + m_parname, nodeid, true)) {
26  return false;
27  }
28  m_callback->add(new NSMVHandlerInt(m_parname + ".pid", true, false, 0), false, false);
29  return true;
30 }
31 
32 void ProcessController::clear()
33 {
34  m_info.clear();
35 }
36 
37 bool ProcessController::waitReady(int timeout)
38 {
39  if (!m_info.waitReady(timeout)) {
40  return false;
41  }
42  return true;
43 }
44 
45 bool ProcessController::load(int timeout)
46 {
47  m_info.clear();
48  m_process.cancel();
49  m_process.kill(SIGABRT);
50  if (pipe(m_iopipe) < 0) {
51  perror("pipe");
52  return false;
53  }
54  m_process = Process(new ProcessSubmitter(this, m_iopipe));
55  m_th_log = PThread(new LogListener(this, m_iopipe));
56  m_th_process = PThread(new ProcessListener(this));
57  //close(iopipe[1]);
58  if (timeout > 0) {
59  if (!m_info.waitReady(timeout)) {
60  throw (RCHandlerException("Failed to boot " + m_parname));
61  return false;
62  }
63  }
64  m_callback->set(m_parname + ".pid", m_process.get_id());
65  return true;
66 }
67 
68 bool ProcessController::start(int expno, int runno)
69 {
70  GenericLockGuard<RunInfoBuffer> lockGuard(m_info);
71  m_info.setExpNumber(expno);
72  m_info.setRunNumber(runno);
73  m_info.setSubNumber(0);
74  /*
75  if (m_info.getState() != RunInfoBuffer::RUNNING) {
76  throw (RCHandlerException(m_parname + " is not running"));
77  }
78  */
79  return true;
80 }
81 
82 bool ProcessController::stop()
83 {
84  GenericLockGuard<RunInfoBuffer> lockGuard(m_info);
85  m_info.setExpNumber(0);
86  m_info.setRunNumber(0);
87  m_info.setSubNumber(0);
88  m_info.setInputCount(0);
89  m_info.setInputNBytes(0);
90  m_info.setOutputCount(0);
91  m_info.setOutputNBytes(0);
92  return true;
93 }
94 
95 bool ProcessController::pause()
96 {
97  GenericLockGuard<RunInfoBuffer> lockGuard(m_info);
98  if (m_info.isRunning()) {
99  m_info.setState(RunInfoBuffer::PAUSED);
100  } else {
101  LogFile::warning("Process is not running. Pause request was ignored.");
102  }
103  return true;
104 }
105 
106 bool ProcessController::resume()
107 {
108  GenericLockGuard<RunInfoBuffer> lockGuard(m_info);
109  if (m_info.isPaused()) {
110  m_info.setState(RunInfoBuffer::RUNNING);
111  } else {
112  LogFile::warning("Process is not paused. Resume request was ignored.");
113  }
114  return true;
115 }
116 
117 bool ProcessController::abort(unsigned int timeout)
118 {
119  m_info.clear();
120  m_process.kill(SIGINT);
121 
122  bool sigintWasSuccessful = false;
123  for (unsigned int timer = 0; timer < timeout; ++timer) {
124  if (not m_process.isAlive()) {
125  sigintWasSuccessful = true;
126  break;
127  } else {
128  usleep(1000000);
129  }
130  }
131 
132  if (sigintWasSuccessful) {
133  LogFile::debug("Process successfully killed with SIGINT.");
134  } else {
135  LogFile::warning("Process could not be killed with SIGINT, sending SIGABRT.");
136  m_process.kill(SIGABRT);
137  }
138 
139  if (m_callback != NULL)
140  m_callback->set(m_parname + ".pid", -1);
141  m_process.wait();
142  m_th_log.cancel();
143  m_th_process.cancel();
144  close(m_iopipe[1]);
145  close(m_iopipe[0]);
146  return true;
147 }
148 
149 void ProcessSubmitter::run()
150 {
151  //close(1);
152  dup2(m_iopipe[1], 1);
153  //close(2);
154  dup2(m_iopipe[1], 2);
155  close(m_iopipe[0]);
156  Executor executor;
157  if (m_con->getExecutable().size() == 0) {
158  m_con->setExecutable("basf2");
159  }
160  executor.setExecutable(m_con->getExecutable());
161  for (size_t i = 0; i < m_con->m_arg_v.size(); i++) {
162  executor.addArg(m_con->m_arg_v[i]);
163  }
164  executor.execute();
165 }
166 
167 void ProcessController::addArgument(const char* format, ...)
168 {
169  va_list ap;
170  static char ss[1024];
171  va_start(ap, format);
172  vsprintf(ss, format, ap);
173  va_end(ap);
174  m_arg_v.push_back(ss);
175 }
Belle2::ProcessSubmitter
Definition: ProcessController.h:101
Belle2::RCHandlerException
Definition: RCHandlerException.h:12
Belle2::NSMVHandlerInt
Definition: NSMVHandler.h:81
Belle2::LogListener
Definition: LogListener.h:12
Belle2::GenericLockGuard
Lock Guard for a Mutex instance.
Definition: LockGuard.h:40
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::PThread
Definition: PThread.h:16
Belle2::Process
Definition: Process.h:15
Belle2::ProcessListener
Definition: ProcessListener.h:12
Belle2::Executor
Definition: Executor.h:13