Belle II Software  release-06-02-00
GlobalProcHandler.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
#include <framework/pcore/GlobalProcHandler.h>
#include <framework/core/InputController.h>
#include <framework/logging/Logger.h>
#include <framework/core/EventProcessor.h>

#include <sys/wait.h>
#include <sys/prctl.h>
#include <signal.h>
#include <cstdio>
#include <cstdlib>
#include <cerrno>
#include <cstring>
#include <unistd.h>
#include <Python.h>

#include <algorithm>
#include <thread>
#include <chrono>

#include <iostream>

using namespace std;
using namespace Belle2;
31 ProcType GlobalProcHandler::s_procType = ProcType::c_Init;
32 int GlobalProcHandler::s_processID = -1;
33 int GlobalProcHandler::s_numEventProcesses = 0;
34 
35 std::vector<int> GlobalProcHandler::s_pidVector;
36 std::map<int, ProcType> GlobalProcHandler::s_startedPIDs;
37 
38 void GlobalProcHandler::childSignalHandler(int)
39 {
40  while (!GlobalProcHandler::pidListEmpty()) {
41  int status;
42  int pid = waitpid(-1, &status, WNOHANG);
43  if (pid == -1) {
44  if (errno == EINTR) {
45  continue; // interrupted, try again
46  } else if (errno == ECHILD) {
47  // We don't have any child processes?
48  EventProcessor::writeToStdErr("\n Called waitpid() without any children left. This shouldn't happen and and indicates a problem.\n");
49  // actually, this is ok in case we already called waitpid() somewhere else. (but we don't do that...)
50  // In case I want to avoid this, waitid() and WNOWAIT might help, but require libc >= 2.12 (not present in SL5)
51  GlobalProcHandler::clearPIDs();
52  return;
53  } else {
54  // also shouldn't happen
55  EventProcessor::writeToStdErr("\nwaitpid() failed.\n");
56  }
57  } else if (pid == 0) {
58  // should not happen because of waitpid(-1,...)
59  // further children exist, but no state change yet
60  break;
61  } else {
62  // state change
63  // get signed PID
64  if (not GlobalProcHandler::findPID(pid)) {
65  // unknown child process died, ignore
66  continue;
67  }
68 
69  // errors?
70  if (WIFSIGNALED(status) or (WIFEXITED(status) and WEXITSTATUS(status) != 0)) {
71  EventProcessor::writeToStdErr("\nSub-process exited with non-zero exit status. Please check other log messages for details.\n");
72  }
73 
74  // remove pid from global list
75  GlobalProcHandler::removePID(pid);
76  }
77  }
78 }
79 
80 
81 void GlobalProcHandler::addPID(int newPID)
82 {
83  // if possible, insert pid into gap in list
84  for (int& pid : s_pidVector) {
85  if (pid == 0) {
86  pid = newPID;
87  return;
88  }
89  }
90 
91  B2FATAL("PID vector at capacity. This produces a race condition, make sure GlobalProcHandler is created early.");
92 }
93 
94 bool GlobalProcHandler::findPID(int pid)
95 {
96  return std::find(s_pidVector.begin(), s_pidVector.end(), pid) != s_pidVector.end();
97 }
98 
99 void GlobalProcHandler::removePID(int oldPID)
100 {
101  for (int& pid : s_pidVector) {
102  if (pid == oldPID) {
103  pid = 0;
104  return;
105  }
106  }
107 }
108 
109 void GlobalProcHandler::clearPIDs()
110 {
111  std::fill(s_pidVector.begin(), s_pidVector.end(), 0);
112 }
113 
114 bool GlobalProcHandler::pidListEmpty()
115 {
116  for (const int& pid : s_pidVector) {
117  if (pid != 0) {
118  return false;
119  }
120  }
121  return true;
122 }
123 
124 void GlobalProcHandler::initialize(unsigned int nWorkerProc)
125 {
126  B2ASSERT("Constructing GlobalProcHandler after forking is not allowed!", pidListEmpty());
127 
128  s_numEventProcesses = nWorkerProc;
129 
130  // s_pidVector size shouldn't be changed once processes are forked (race condition)
131  s_pidVector.resize(s_pidVector.size() + nWorkerProc + 3, 0); // num worker + input + output + proxy
132 }
133 
134 bool GlobalProcHandler::startProxyProcess()
135 {
136  return startProc(ProcType::c_Proxy, 30000);
137 }
138 
139 bool GlobalProcHandler::startInputProcess()
140 {
141  return startProc(ProcType::c_Input, 10000);
142 }
143 
144 bool GlobalProcHandler::startWorkerProcesses(unsigned int numProcesses)
145 {
146  for (unsigned int i = 0; i < numProcesses; i++) {
147  if (startProc(ProcType::c_Worker, 0)) {
148  return true;
149  }
150  }
151  return false;
152 }
153 
154 bool GlobalProcHandler::startOutputProcess(bool local)
155 {
156  if (local) {
157  s_procType = ProcType::c_Output;
158  return true;
159  } else {
160  return (startProc(ProcType::c_Output, 20000));
161  }
162 }
163 
164 bool GlobalProcHandler::startMonitoringProcess()
165 {
166  s_procType = ProcType::c_Monitor;
167  return true;
168 }
169 
170 bool GlobalProcHandler::parallelProcessingUsed()
171 {
172  return s_processID != -1;
173 }
174 
175 bool GlobalProcHandler::isProcess(ProcType procType)
176 {
177  return (procType == s_procType);
178 }
179 
180 bool GlobalProcHandler::startProc(ProcType procType, int id)
181 {
182  EventProcessor::installSignalHandler(SIGCHLD, childSignalHandler);
183 
184  fflush(stdout);
185  fflush(stderr);
186  pid_t pid = fork();
187  if (pid > 0) {
188  // Mother process
189  addPID(pid);
190  s_startedPIDs[pid] = procType;
191  fflush(stdout);
192  } else if (pid < 0) {
193  B2FATAL("fork() failed: " << strerror(errno));
194  } else {
195  // Child process
196  // do NOT handle SIGCHLD in forked processes!
197  EventProcessor::installSignalHandler(SIGCHLD, SIG_IGN);
198 
199  s_procType = procType;
200 
201  if (id == 0)
202  s_processID = getpid();
203  else
204  s_processID = id;
205 
206  // Reset some python state: signals, threads, gil in the child
207  PyOS_AfterFork_Child();
208  // InputController becomes useless in child process
209  InputController::resetForChildProcess();
210  // die when parent dies
211  prctl(PR_SET_PDEATHSIG, SIGHUP);
212  return true;
213  }
214  return false;
215 }
216 
217 std::string GlobalProcHandler::getProcessName()
218 {
219  if (isProcess(ProcType::c_Worker))
220  return "worker";
221  if (isProcess(ProcType::c_Input))
222  return "input";
223  if (isProcess(ProcType::c_Output))
224  return "output";
225  if (isProcess(ProcType::c_Init))
226  return "init";
227  if (isProcess(ProcType::c_Monitor))
228  return "monitor";
229 
230  //shouldn't happen
231  return "???";
232 }
233 
234 int GlobalProcHandler::EvtProcID()
235 {
236  return s_processID;
237 }
238 
239 void GlobalProcHandler::killAllProcesses()
240 {
241  for (int& pid : s_pidVector) {
242  if (pid != 0) {
243  if (kill(pid, SIGKILL) >= 0) {
244  B2ERROR("hard killed process " << pid);
245  } else {
246  B2DEBUG(100, "no process " << pid << " found, already gone?");
247  }
248  pid = 0;
249  }
250  }
251 }
252 
253 const std::vector<int>& GlobalProcHandler::getPIDList()
254 {
255  return s_pidVector;
256 }
257 
258 ProcType GlobalProcHandler::getProcType(int pid)
259 {
260  const auto procTypeIt = s_startedPIDs.find(pid);
261  if (procTypeIt == s_startedPIDs.end()) {
262  B2FATAL("Asking for a non-existing PID");
263  }
264  return procTypeIt->second;
265 }
266 
267 int GlobalProcHandler::numEventProcesses()
268 {
269  return s_numEventProcesses;
270 }
271 
272 bool GlobalProcHandler::isOutputProcess()
273 {
274  return isProcess(ProcType::c_Output);
275 }
276 
277 bool GlobalProcHandler::isWorkerProcess()
278 {
279  return isProcess(ProcType::c_Worker);
280 }
281 
282 bool GlobalProcHandler::isInputProcess()
283 {
284  return isProcess(ProcType::c_Input);
285 }
286 
287 void GlobalProcHandler::waitForAllProcesses()
288 {
289  while (true) {
290  if (pidListEmpty()) {
291  return;
292  }
293 
294  std::this_thread::sleep_for(std::chrono::milliseconds(1));
295  }
296 }
ProcType
Type of the process used for storing and mapping the child processes in the process handler.
Definition: ProcHelper.h:16
Abstract base class for different kinds of events.