Belle II Software  release-08-01-10
ProcHandler.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/ProcHandler.h>
10 #include <framework/core/InputController.h>
11 #include <framework/logging/Logger.h>
12 #include <framework/core/EventProcessor.h>
13 #include <framework/pcore/GlobalProcHandler.h>
14 
15 #include <vector>
16 
17 #include <sys/wait.h>
18 #include <sys/prctl.h>
19 #include <cstdio>
20 #include <cstdlib>
21 #include <cerrno>
22 #include <cstring>
23 #include <unistd.h>
24 #include <Python.h>
25 
26 using namespace std;
27 using namespace Belle2;
28 
29 namespace {
/** ID of the current process; stays at -1 until one has been assigned. */
int s_processID = -1;
/** Largest number of worker processes requested by any ProcHandler constructed so far. */
int s_numEventProcesses = 0;
/** Local children (stored with negated PID) that ended with an error and were not yet reported by waitForAllProcesses(). */
int s_localChildrenWithErrors = 0;

// Global list of PIDs managed by ProcHandler.
// (Directly modifying STL structures in the signal handler is unsafe, so let's be overly
// cautious and use only C-like functions there.)
// PIDs are added using addPID() while forking, items are set to 0 when a process stops.
/** Owning storage for the PID list; its capacity is reserved in the ProcHandler constructor. */
std::vector<int> s_pidVector;
/** Raw view of s_pidVector's data, used by the C-like helper functions. */
int* s_pids = nullptr;
/** Number of valid entries behind s_pids. */
int s_numpids = 0;
41  void addPID(int pid)
42  {
43  //if possible, insert pid into gap in list
44  bool found_gap = false;
45  for (int i = 0; i < s_numpids; i++) {
46  if (s_pids[i] == 0) {
47  found_gap = true;
48  s_pids[i] = pid;
49  break;
50  }
51  }
52 
53  if (!found_gap) {
54  if (s_pidVector.size() == s_pidVector.capacity()) {
55  B2FATAL("PID vector at capacity. This produces a race condition, make sure ProcHandler is created early.");
56  }
57  s_pidVector.push_back(pid);
58  }
59  s_pids = s_pidVector.data();
60  s_numpids = s_pidVector.size();
61  }
62 
63  //in signal handler, use only the following functions!
65  int findPID(int pid)
66  {
67  for (int i = 0; i < s_numpids; i++)
68  if (std::abs(s_pids[i]) == std::abs(pid))
69  return s_pids[i];
70  return 0;
71  }
72  void removePID(int pid)
73  {
74  for (int i = 0; i < s_numpids; i++)
75  if (std::abs(s_pids[i]) == std::abs(pid))
76  s_pids[i] = 0;
77  }
78  void clearPIDs()
79  {
80  for (int i = 0; i < s_numpids; i++)
81  s_pids[i] = 0;
82  }
83  bool pidListEmpty()
84  {
85  for (int i = 0; i < s_numpids; i++)
86  if (s_pids[i] != 0)
87  return false;
88  return true;
89  }
90 
91  void sigChldHandler(int)
92  {
93  //EventProcessor::writeToStdErr("\n sigchild handler called .\n");
94  int raiseSig = 0;
95  while (!pidListEmpty()) {
96  int status;
97  int pid = waitpid(-1, &status, WNOHANG);
98  if (pid == -1) {
99  if (errno == EINTR) {
100  continue; //interrupted, try again
101  } else if (errno == ECHILD) {
102  //We don't have any child processes?
103  EventProcessor::writeToStdErr("\n Called waitpid() without any children left. This shouldn't happen and and indicates a problem.\n");
104  //
105  //actually, this is ok in case we already called waitpid() somewhere else. (but we don't do that...)
106  //In case I want to avoid this, waitid() and WNOWAIT might help, but require libc >= 2.12 (not present in SL5)
107  clearPIDs();
108  return;
109  } else {
110  //also shouldn't happen
111  EventProcessor::writeToStdErr("\nwaitpid() failed.\n");
112  }
113  } else if (pid == 0) {
114  //further children exist, but no state change yet
115  break;
116  } else { //state change
117  //get signed PID
118  pid = findPID(pid);
119  if (pid == 0)
120  continue; //unknown child process died, ignore
121 
122  int termSig = 0;
123  //errors?
124  if (WIFSIGNALED(status)) {
125  //ok, it died because of some signal
126  //EventProcessor::writeToStdErr("\nOne of our child processes died, stopping execution...\n");
127  termSig = WTERMSIG(status);
128 
129  //backtrace in parent is not helpful
130  if (termSig == SIGSEGV)
131  termSig = SIGTERM;
132  } else if (WIFEXITED(status) and WEXITSTATUS(status) != 0) {
133  EventProcessor::writeToStdErr("\nExecution stopped, sub-process exited with non-zero exit status. Please check other log messages for details.\n");
134  termSig = SIGTERM;
135  }
136 
137  if (termSig != 0) {
138  if (pid < 0)
139  s_localChildrenWithErrors++;
140  else
141  raiseSig = termSig;
142  }
143 
144  //remove pid from global list
145  removePID(pid);
146  }
147  }
148 
149  if (raiseSig)
150  raise(raiseSig);
151  }
152 }
153 
154 bool ProcHandler::startProc(std::set<int>* processList, const std::string& procType, int id)
155 {
156  EventProcessor::installSignalHandler(SIGCHLD, sigChldHandler);
157 
158  fflush(stdout);
159  fflush(stderr);
160  PyOS_BeforeFork();
161  pid_t pid = fork();
162  if (pid > 0) { // Mother process
163  PyOS_AfterFork_Parent();
164  if (m_markChildrenAsLocal)
165  pid = -pid;
166  processList->insert(pid);
167  addPID(pid);
168  B2INFO("ProcHandler: " << procType << " process forked. pid = " << pid);
169  fflush(stdout);
170  } else if (pid < 0) {
171  B2FATAL("fork() failed: " << strerror(errno));
172  } else {
173  //do NOT handle SIGCHLD in forked processes!
174  EventProcessor::installSignalHandler(SIGCHLD, SIG_IGN);
175 
176  s_processID = id;
177  //Reset some python state: signals, threads, gil in the child
178  PyOS_AfterFork_Child();
179  //InputController becomes useless in child process
180  InputController::resetForChildProcess();
181  //die when parent dies
182  prctl(PR_SET_PDEATHSIG, SIGHUP);
183  return true;
184  }
185  return false;
186 }
187 
188 ProcHandler::ProcHandler(unsigned int nWorkerProc, bool markChildrenAsLocal):
189  m_markChildrenAsLocal(markChildrenAsLocal),
190  m_numWorkerProcesses(nWorkerProc)
191 {
192  if ((int)nWorkerProc > s_numEventProcesses)
193  s_numEventProcesses = nWorkerProc;
194 
195  if (!pidListEmpty())
196  B2FATAL("Constructing ProcHandler after forking is not allowed!");
197 
198  //s_pidVector size shouldn't be changed once processes are forked (race condition)
199  s_pidVector.reserve(s_pidVector.size() + nWorkerProc + 2);
200  s_pids = s_pidVector.data();
201  setsid();
202 
203 }
204 ProcHandler::~ProcHandler() = default;
205 
206 
208 {
209  startProc(&m_processList, "input", 10000);
210 }
211 
213 {
214  for (unsigned int i = 0; i < m_numWorkerProcesses; i++) {
215  if (startProc(&m_processList, "worker", i))
216  break; // in child process
217  }
218 }
219 
221 {
222  if (s_processID == -1)
223  s_processID = 20000;
224 }
225 
227 
228 bool ProcHandler::isInputProcess() { return (s_processID >= 10000 and s_processID < 20000) or GlobalProcHandler::isInputProcess(); }
229 
231 
232 bool ProcHandler::isOutputProcess() { return s_processID >= 20000 or GlobalProcHandler::isOutputProcess(); }
233 
235 {
236  return s_numEventProcesses;
237 }
238 
240 {
241  return std::set<int>(s_pidVector.begin(), s_pidVector.end());
242 }
243 std::set<int> ProcHandler::processList() const
244 {
245  return m_processList;
246 }
247 
248 int ProcHandler::EvtProcID() { return s_processID; }
249 
250 void ProcHandler::setProcessID(int processID) { s_processID = processID; }
251 
253 {
254  if (isWorkerProcess())
255  return "worker";
256  if (isInputProcess())
257  return "input";
258  if (isOutputProcess())
259  return "output";
260 
261  //shouldn't happen
262  return "???";
263 }
264 
265 
267 {
268  bool ok = true;
269  while (!m_processList.empty()) {
270  for (int pid : m_processList) {
271  //once a process is gone from the global list, remove them from our own, too.
272  if (findPID(pid) == 0) {
273  m_processList.erase(pid);
274  if (m_markChildrenAsLocal and pid < 0 and s_localChildrenWithErrors != 0) {
275  ok = false;
276  s_localChildrenWithErrors--;
277  }
278  break;
279  }
280  }
281 
282  usleep(100);
283  }
284  return ok;
285 }
static bool isWorkerProcess()
Return true if the process is of type c_Worker.
static bool isOutputProcess()
Return true if the process is of type c_Output.
static bool isInputProcess()
Return true if the process is of type c_Input.
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
void startOutputProcess()
There is no real output process, but marks current process as output.
Definition: ProcHandler.cc:220
static bool isWorkerProcess()
Return true if the process is a worker process.
Definition: ProcHandler.cc:230
static int EvtProcID()
Return ID of the current process.
Definition: ProcHandler.cc:248
std::set< int > m_processList
PIDs of processes controlled by this ProcHandler.
Definition: ProcHandler.h:89
static int numEventProcesses()
Return number of worker processes (configured value, not current)
Definition: ProcHandler.cc:234
static bool isOutputProcess()
Return true if the process is an output process.
Definition: ProcHandler.cc:232
static bool isInputProcess()
Return true if the process is an input process.
Definition: ProcHandler.cc:228
static void setProcessID(int processID)
Set the process ID of this process.
Definition: ProcHandler.cc:250
unsigned int m_numWorkerProcesses
Number of worker processes controlled by this ProcHandler.
Definition: ProcHandler.h:90
static std::set< int > globalProcessList()
Return list of all PIDs (from all ProcHandler instances).
Definition: ProcHandler.cc:239
bool waitForAllProcesses()
Wait until all forked processes handled by this ProcHandler terminate.
Definition: ProcHandler.cc:266
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
Definition: ProcHandler.cc:226
bool m_markChildrenAsLocal
Abnormal termination of child will not stop parent, waitForAllProcesses() returns status.
Definition: ProcHandler.h:88
bool startProc(std::set< int > *processList, const std::string &procType, int id)
Start a new process, adding its PID to processList, and setting s_processID = id.
Definition: ProcHandler.cc:154
~ProcHandler()
Destructor.
void startWorkerProcesses()
Fork and initialize worker processes.
Definition: ProcHandler.cc:212
std::set< int > processList() const
Return list of PIDs managed by this ProcHandler instance.
Definition: ProcHandler.cc:243
void startInputProcess()
Fork and initialize an input process.
Definition: ProcHandler.cc:207
static std::string getProcessName()
Get a name for this process.
Definition: ProcHandler.cc:252
Abstract base class for different kinds of events.