Belle II Software  release-08-01-10
GlobalProcHandler.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/GlobalProcHandler.h>
10 #include <framework/core/InputController.h>
11 #include <framework/logging/Logger.h>
12 #include <framework/core/EventProcessor.h>
13 
14 #include <framework/pcore/ProcHandler.h>
15 
16 #include <sys/wait.h>
17 #include <sys/prctl.h>
18 #include <cstdio>
19 #include <cstdlib>
20 #include <cerrno>
21 #include <cstring>
22 #include <unistd.h>
23 #include <Python.h>
24 
25 #include <thread>
26 #include <chrono>
27 
28 #include <iostream>
29 
30 using namespace std;
31 using namespace Belle2;
32 
33 ProcType GlobalProcHandler::s_procType = ProcType::c_Init;
34 int GlobalProcHandler::s_processID = -1;
35 int GlobalProcHandler::s_numEventProcesses = 0;
36 
37 std::vector<int> GlobalProcHandler::s_pidVector;
38 std::map<int, ProcType> GlobalProcHandler::s_startedPIDs;
39 
40 void GlobalProcHandler::childSignalHandler(int)
41 {
42  while (!GlobalProcHandler::pidListEmpty()) {
43  int status;
44  int pid = waitpid(-1, &status, WNOHANG);
45  if (pid == -1) {
46  if (errno == EINTR) {
47  continue; // interrupted, try again
48  } else if (errno == ECHILD) {
49  // We don't have any child processes?
50  EventProcessor::writeToStdErr("\n Called waitpid() without any children left. This shouldn't happen and and indicates a problem.\n");
51  // actually, this is ok in case we already called waitpid() somewhere else. (but we don't do that...)
52  // In case I want to avoid this, waitid() and WNOWAIT might help, but require libc >= 2.12 (not present in SL5)
53  GlobalProcHandler::clearPIDs();
54  return;
55  } else {
56  // also shouldn't happen
57  EventProcessor::writeToStdErr("\nwaitpid() failed.\n");
58  }
59  } else if (pid == 0) {
60  // should not happen because of waitpid(-1,...)
61  // further children exist, but no state change yet
62  break;
63  } else {
64  // state change
65  // get signed PID
66  if (not GlobalProcHandler::findPID(pid)) {
67  // unknown child process died, ignore
68  continue;
69  }
70 
71  // errors?
72  if (WIFSIGNALED(status) or (WIFEXITED(status) and WEXITSTATUS(status) != 0)) {
73  EventProcessor::writeToStdErr("\nSub-process exited with non-zero exit status. Please check other log messages for details.\n");
74  }
75 
76  // remove pid from global list
77  GlobalProcHandler::removePID(pid);
78  }
79  }
80 }
81 
82 
83 void GlobalProcHandler::addPID(int newPID)
84 {
85  // if possible, insert pid into gap in list
86  for (int& pid : s_pidVector) {
87  if (pid == 0) {
88  pid = newPID;
89  return;
90  }
91  }
92 
93  B2FATAL("PID vector at capacity. This produces a race condition, make sure GlobalProcHandler is created early.");
94 }
95 
96 bool GlobalProcHandler::findPID(int pid)
97 {
98  return std::find(s_pidVector.begin(), s_pidVector.end(), pid) != s_pidVector.end();
99 }
100 
101 void GlobalProcHandler::removePID(int oldPID)
102 {
103  for (int& pid : s_pidVector) {
104  if (pid == oldPID) {
105  pid = 0;
106  return;
107  }
108  }
109 }
110 
111 void GlobalProcHandler::clearPIDs()
112 {
113  std::fill(s_pidVector.begin(), s_pidVector.end(), 0);
114 }
115 
116 bool GlobalProcHandler::pidListEmpty()
117 {
118  for (const int& pid : s_pidVector) {
119  if (pid != 0) {
120  return false;
121  }
122  }
123  return true;
124 }
125 
126 void GlobalProcHandler::initialize(unsigned int nWorkerProc)
127 {
128  B2ASSERT("Constructing GlobalProcHandler after forking is not allowed!", pidListEmpty());
129 
130  s_numEventProcesses = nWorkerProc;
131 
132  // s_pidVector size shouldn't be changed once processes are forked (race condition)
133  s_pidVector.resize(s_pidVector.size() + nWorkerProc + 3, 0); // num worker + input + output + proxy
134 }
135 
136 bool GlobalProcHandler::startProxyProcess()
137 {
138  return startProc(ProcType::c_Proxy, 30000);
139 }
140 
141 bool GlobalProcHandler::startInputProcess()
142 {
143  return startProc(ProcType::c_Input, 10000);
144 }
145 
146 bool GlobalProcHandler::startWorkerProcesses(unsigned int numProcesses)
147 {
148  for (unsigned int i = 0; i < numProcesses; i++) {
149  if (startProc(ProcType::c_Worker, 0)) {
150  return true;
151  }
152  }
153  return false;
154 }
155 
156 bool GlobalProcHandler::startOutputProcess(bool local)
157 {
158  if (local) {
159  s_procType = ProcType::c_Output;
160  return true;
161  } else {
162  return (startProc(ProcType::c_Output, 20000));
163  }
164 }
165 
166 bool GlobalProcHandler::startMonitoringProcess()
167 {
168  s_procType = ProcType::c_Monitor;
169  return true;
170 }
171 
172 bool GlobalProcHandler::parallelProcessingUsed()
173 {
174  return s_processID != -1;
175 }
176 
177 bool GlobalProcHandler::isProcess(ProcType procType)
178 {
179  return (procType == s_procType);
180 }
181 
182 bool GlobalProcHandler::startProc(ProcType procType, int id)
183 {
184  EventProcessor::installSignalHandler(SIGCHLD, childSignalHandler);
185 
186  fflush(stdout);
187  fflush(stderr);
188  pid_t pid = fork();
189  if (pid > 0) {
190  // Mother process
191  addPID(pid);
192  s_startedPIDs[pid] = procType;
193  fflush(stdout);
194  } else if (pid < 0) {
195  B2FATAL("fork() failed: " << strerror(errno));
196  } else {
197  // Child process
198  // do NOT handle SIGCHLD in forked processes!
199  EventProcessor::installSignalHandler(SIGCHLD, SIG_IGN);
200 
201  s_procType = procType;
202 
203  if (id == 0)
204  s_processID = getpid();
205  else
206  s_processID = id;
207 
208  ProcHandler::setProcessID(s_processID); // Interface to existing ProcHandler
209 
210  // Reset some python state: signals, threads, gil in the child
211  PyOS_AfterFork_Child();
212  // InputController becomes useless in child process
213  InputController::resetForChildProcess();
214  // die when parent dies
215  prctl(PR_SET_PDEATHSIG, SIGHUP);
216  return true;
217  }
218  return false;
219 }
220 
221 std::string GlobalProcHandler::getProcessName()
222 {
223  if (isProcess(ProcType::c_Worker))
224  return "worker";
225  if (isProcess(ProcType::c_Input))
226  return "input";
227  if (isProcess(ProcType::c_Output))
228  return "output";
229  if (isProcess(ProcType::c_Init))
230  return "init";
231  if (isProcess(ProcType::c_Monitor))
232  return "monitor";
233 
234  //shouldn't happen
235  return "???";
236 }
237 
238 int GlobalProcHandler::EvtProcID()
239 {
240  return s_processID;
241 }
242 
243 void GlobalProcHandler::killAllProcesses()
244 {
245  for (int& pid : s_pidVector) {
246  if (pid != 0) {
247  if (kill(pid, SIGKILL) >= 0) {
248  B2DEBUG(100, "hard killed process " << pid);
249  } else {
250  B2DEBUG(100, "no process " << pid << " found, already gone?");
251  }
252  pid = 0;
253  }
254  }
255 }
256 
257 const std::vector<int>& GlobalProcHandler::getPIDList()
258 {
259  return s_pidVector;
260 }
261 
262 ProcType GlobalProcHandler::getProcType(int pid)
263 {
264  const auto procTypeIt = s_startedPIDs.find(pid);
265  if (procTypeIt == s_startedPIDs.end()) {
266  B2FATAL("Asking for a non-existing PID");
267  }
268  return procTypeIt->second;
269 }
270 
271 int GlobalProcHandler::numEventProcesses()
272 {
273  return s_numEventProcesses;
274 }
275 
276 bool GlobalProcHandler::isOutputProcess()
277 {
278  return isProcess(ProcType::c_Output);
279 }
280 
281 bool GlobalProcHandler::isWorkerProcess()
282 {
283  return isProcess(ProcType::c_Worker);
284 }
285 
286 bool GlobalProcHandler::isInputProcess()
287 {
288  return isProcess(ProcType::c_Input);
289 }
290 
291 void GlobalProcHandler::waitForAllProcesses()
292 {
293  while (true) {
294  if (pidListEmpty()) {
295  return;
296  }
297 
298  std::this_thread::sleep_for(std::chrono::milliseconds(1));
299  }
300 }
ProcType
Type of the process used for storing and mapping the child processes in the process handler.
Definition: ProcHelper.h:16
Abstract base class for different kinds of events.