Belle II Software  release-05-02-19
ProcHandler.cc
Go to the documentation of this file.
1 
#include <framework/pcore/ProcHandler.h>
#include <framework/core/InputController.h>
#include <framework/logging/Logger.h>
#include <framework/core/EventProcessor.h>
#include <framework/pcore/GlobalProcHandler.h>

#include <vector>

#include <sys/wait.h>
#include <sys/prctl.h>
#include <cerrno>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <Python.h>
22 
23 using namespace std;
24 using namespace Belle2;
25 
26 namespace {
27  static int s_processID = -1;
28  static int s_numEventProcesses = 0;
29  static int s_localChildrenWithErrors = 0;
30 
31  // global list of PIDs managed by ProcHandler.
32  // (directly modifying STL structures in the signal handler is unsafe, so let's be overly
33  // cautious and use only C-like functions there.)
34  // PIDs are added using addPID() while forking, items are set to 0 when process stops
35  static std::vector<int> s_pidVector;
36  static int* s_pids = nullptr;
37  static int s_numpids = 0;
38  void addPID(int pid)
39  {
40  //if possible, insert pid into gap in list
41  bool found_gap = false;
42  for (int i = 0; i < s_numpids; i++) {
43  if (s_pids[i] == 0) {
44  found_gap = true;
45  s_pids[i] = pid;
46  break;
47  }
48  }
49 
50  if (!found_gap) {
51  if (s_pidVector.size() == s_pidVector.capacity()) {
52  B2FATAL("PID vector at capacity. This produces a race condition, make sure ProcHandler is created early.");
53  }
54  s_pidVector.push_back(pid);
55  }
56  s_pids = s_pidVector.data();
57  s_numpids = s_pidVector.size();
58  }
59 
60  //in signal handler, use only the following functions!
62  int findPID(int pid)
63  {
64  for (int i = 0; i < s_numpids; i++)
65  if (std::abs(s_pids[i]) == std::abs(pid))
66  return s_pids[i];
67  return 0;
68  }
69  void removePID(int pid)
70  {
71  for (int i = 0; i < s_numpids; i++)
72  if (std::abs(s_pids[i]) == std::abs(pid))
73  s_pids[i] = 0;
74  }
75  void clearPIDs()
76  {
77  for (int i = 0; i < s_numpids; i++)
78  s_pids[i] = 0;
79  }
80  bool pidListEmpty()
81  {
82  for (int i = 0; i < s_numpids; i++)
83  if (s_pids[i] != 0)
84  return false;
85  return true;
86  }
87 
88  void sigChldHandler(int)
89  {
90  //EventProcessor::writeToStdErr("\n sigchild handler called .\n");
91  int raiseSig = 0;
92  while (!pidListEmpty()) {
93  int status;
94  int pid = waitpid(-1, &status, WNOHANG);
95  if (pid == -1) {
96  if (errno == EINTR) {
97  continue; //interrupted, try again
98  } else if (errno == ECHILD) {
99  //We don't have any child processes?
100  EventProcessor::writeToStdErr("\n Called waitpid() without any children left. This shouldn't happen and and indicates a problem.\n");
101  //
102  //actually, this is ok in case we already called waitpid() somewhere else. (but we don't do that...)
103  //In case I want to avoid this, waitid() and WNOWAIT might help, but require libc >= 2.12 (not present in SL5)
104  clearPIDs();
105  return;
106  } else {
107  //also shouldn't happen
108  EventProcessor::writeToStdErr("\nwaitpid() failed.\n");
109  }
110  } else if (pid == 0) {
111  //further children exist, but no state change yet
112  break;
113  } else { //state change
114  //get signed PID
115  pid = findPID(pid);
116  if (pid == 0)
117  continue; //unknown child process died, ignore
118 
119  int termSig = 0;
120  //errors?
121  if (WIFSIGNALED(status)) {
122  //ok, it died because of some signal
123  //EventProcessor::writeToStdErr("\nOne of our child processes died, stopping execution...\n");
124  termSig = WTERMSIG(status);
125 
126  //backtrace in parent is not helpful
127  if (termSig == SIGSEGV)
128  termSig = SIGTERM;
129  } else if (WIFEXITED(status) and WEXITSTATUS(status) != 0) {
130  EventProcessor::writeToStdErr("\nExecution stopped, sub-process exited with non-zero exit status. Please check other log messages for details.\n");
131  termSig = SIGTERM;
132  }
133 
134  if (termSig != 0) {
135  if (pid < 0)
136  s_localChildrenWithErrors++;
137  else
138  raiseSig = termSig;
139  }
140 
141  //remove pid from global list
142  removePID(pid);
143  }
144  }
145 
146  if (raiseSig)
147  raise(raiseSig);
148  }
149 }
150 
151 bool ProcHandler::startProc(std::set<int>* processList, const std::string& procType, int id)
152 {
153  EventProcessor::installSignalHandler(SIGCHLD, sigChldHandler);
154 
155  fflush(stdout);
156  fflush(stderr);
157  pid_t pid = fork();
158  if (pid > 0) { // Mother process
159  if (m_markChildrenAsLocal)
160  pid = -pid;
161  processList->insert(pid);
162  addPID(pid);
163  B2INFO("ProcHandler: " << procType << " process forked. pid = " << pid);
164  fflush(stdout);
165  } else if (pid < 0) {
166  B2FATAL("fork() failed: " << strerror(errno));
167  } else {
168  //do NOT handle SIGCHLD in forked processes!
169  EventProcessor::installSignalHandler(SIGCHLD, SIG_IGN);
170 
171  s_processID = id;
172  //Reset some python state: signals, threads, gil in the child
173  PyOS_AfterFork();
174  //InputController becomes useless in child process
175  InputController::resetForChildProcess();
176  //die when parent dies
177  prctl(PR_SET_PDEATHSIG, SIGHUP);
178  return true;
179  }
180  return false;
181 }
182 
183 ProcHandler::ProcHandler(unsigned int nWorkerProc, bool markChildrenAsLocal):
184  m_markChildrenAsLocal(markChildrenAsLocal),
185  m_numWorkerProcesses(nWorkerProc)
186 {
187  if ((int)nWorkerProc > s_numEventProcesses)
188  s_numEventProcesses = nWorkerProc;
189 
190  if (!pidListEmpty())
191  B2FATAL("Constructing ProcHandler after forking is not allowed!");
192 
193  //s_pidVector size shouldn't be changed once processes are forked (race condition)
194  s_pidVector.reserve(s_pidVector.size() + nWorkerProc + 2);
195  s_pids = s_pidVector.data();
196  setsid();
197 
198 }
199 ProcHandler::~ProcHandler() = default;
200 
201 
203 {
204  startProc(&m_processList, "input", 10000);
205 }
206 
208 {
209  for (unsigned int i = 0; i < m_numWorkerProcesses; i++) {
210  if (startProc(&m_processList, "worker", i))
211  break; // in child process
212  }
213 }
214 
216 {
217  if (s_processID == -1)
218  s_processID = 20000;
219 }
220 
222 
223 bool ProcHandler::isInputProcess() { return (s_processID >= 10000 and s_processID < 20000) or GlobalProcHandler::isInputProcess(); }
224 
226 
227 bool ProcHandler::isOutputProcess() { return s_processID >= 20000 or GlobalProcHandler::isOutputProcess(); }
228 
230 {
231  return s_numEventProcesses;
232 }
233 
235 {
236  return std::set<int>(s_pidVector.begin(), s_pidVector.end());
237 }
238 std::set<int> ProcHandler::processList() const
239 {
240  return m_processList;
241 }
242 
243 int ProcHandler::EvtProcID() { return s_processID; }
244 
245 void ProcHandler::setProcessID(int processID) { s_processID = processID; }
246 
248 {
249  if (isWorkerProcess())
250  return "worker";
251  if (isInputProcess())
252  return "input";
253  if (isOutputProcess())
254  return "output";
255 
256  //shouldn't happen
257  return "???";
258 }
259 
260 
262 {
263  bool ok = true;
264  while (!m_processList.empty()) {
265  for (int pid : m_processList) {
266  //once a process is gone from the global list, remove them from our own, too.
267  if (findPID(pid) == 0) {
268  m_processList.erase(pid);
269  if (m_markChildrenAsLocal and pid < 0 and s_localChildrenWithErrors != 0) {
270  ok = false;
271  s_localChildrenWithErrors--;
272  }
273  break;
274  }
275  }
276 
277  usleep(100);
278  }
279  return ok;
280 }
Belle2::ProcHandler::isWorkerProcess
static bool isWorkerProcess()
Return true if the process is a worker process.
Definition: ProcHandler.cc:225
Belle2::GlobalProcHandler::parallelProcessingUsed
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
Definition: GlobalProcHandler.cc:172
Belle2::ProcHandler::m_numWorkerProcesses
unsigned int m_numWorkerProcesses
Number of worker processes controlled by this ProcHandler.
Definition: ProcHandler.h:87
Belle2::ProcHandler::waitForAllProcesses
bool waitForAllProcesses()
Wait until all forked processes handled by this ProcHandler terminate.
Definition: ProcHandler.cc:261
Belle2::ProcHandler::startInputProcess
void startInputProcess()
Fork and initialize an input process.
Definition: ProcHandler.cc:202
Belle2::ProcHandler::m_markChildrenAsLocal
bool m_markChildrenAsLocal
Abnormal termination of child will not stop parent, waitForAllProcesses() returns status.
Definition: ProcHandler.h:85
Belle2::ProcHandler::isOutputProcess
static bool isOutputProcess()
Return true if the process is an output process.
Definition: ProcHandler.cc:227
Belle2::ProcHandler::m_processList
std::set< int > m_processList
PIDs of processes controlled by this ProcHandler.
Definition: ProcHandler.h:86
Belle2::ProcHandler::getProcessName
static std::string getProcessName()
Get a name for this process.
Definition: ProcHandler.cc:247
Belle2::ProcHandler::globalProcessList
static std::set< int > globalProcessList()
Return list of all PIDs (from all ProcHandler instances).
Definition: ProcHandler.cc:234
Belle2::GlobalProcHandler::isInputProcess
static bool isInputProcess()
Return true if the process is of type c_Input.
Definition: GlobalProcHandler.cc:284
Belle2::ProcHandler::startOutputProcess
void startOutputProcess()
There is no real output process, but marks current process as output.
Definition: ProcHandler.cc:215
Belle2::ProcHandler::~ProcHandler
~ProcHandler()
Destructor.
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ProcHandler::startWorkerProcesses
void startWorkerProcesses()
Fork and initialize worker processes.
Definition: ProcHandler.cc:207
Belle2::GlobalProcHandler::isWorkerProcess
static bool isWorkerProcess()
Return true if the process is of type c_Worker.
Definition: GlobalProcHandler.cc:279
Belle2::ProcHandler::EvtProcID
static int EvtProcID()
Return ID of the current process.
Definition: ProcHandler.cc:243
Belle2::ProcHandler::processList
std::set< int > processList() const
Return list of PIDs managed by this ProcHandler instance.
Definition: ProcHandler.cc:238
Belle2::ProcHandler::setProcessID
static void setProcessID(int processID)
Set the process ID of this process.
Definition: ProcHandler.cc:245
Belle2::ProcHandler::isInputProcess
static bool isInputProcess()
Return true if the process is an input process.
Definition: ProcHandler.cc:223
Belle2::ProcHandler::startProc
bool startProc(std::set< int > *processList, const std::string &procType, int id)
Start a new process, adding its PID to processList, and setting s_processID = id.
Definition: ProcHandler.cc:151
Belle2::ProcHandler::parallelProcessingUsed
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
Definition: ProcHandler.cc:221
Belle2::GlobalProcHandler::isOutputProcess
static bool isOutputProcess()
Return true if the process is of type c_Output.
Definition: GlobalProcHandler.cc:274
Belle2::ProcHandler::numEventProcesses
static int numEventProcesses()
Return number of worker processes (configured value, not current)
Definition: ProcHandler.cc:229