Belle II Software  release-05-01-25
GlobalProcHandler.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2018 Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Soohyung Lee, Anseln Baur, Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 
#include <framework/pcore/GlobalProcHandler.h>
#include <framework/core/InputController.h>
#include <framework/logging/Logger.h>
#include <framework/core/EventProcessor.h>

#include <sys/wait.h>
#include <sys/prctl.h>
#include <signal.h>
#include <cstdio>
#include <cstdlib>
#include <cerrno>
#include <cstring>
#include <unistd.h>
#include <Python.h>

#include <algorithm>
#include <thread>
#include <chrono>

#include <iostream>
29 
30 using namespace std;
31 using namespace Belle2;
32 
33 ProcType GlobalProcHandler::s_procType = ProcType::c_Init;
34 int GlobalProcHandler::s_processID = -1;
35 int GlobalProcHandler::s_numEventProcesses = 0;
36 
37 std::vector<int> GlobalProcHandler::s_pidVector;
38 std::map<int, ProcType> GlobalProcHandler::s_startedPIDs;
39 
40 void GlobalProcHandler::childSignalHandler(int)
41 {
42  while (!GlobalProcHandler::pidListEmpty()) {
43  int status;
44  int pid = waitpid(-1, &status, WNOHANG);
45  if (pid == -1) {
46  if (errno == EINTR) {
47  continue; // interrupted, try again
48  } else if (errno == ECHILD) {
49  // We don't have any child processes?
50  EventProcessor::writeToStdErr("\n Called waitpid() without any children left. This shouldn't happen and and indicates a problem.\n");
51  // actually, this is ok in case we already called waitpid() somewhere else. (but we don't do that...)
52  // In case I want to avoid this, waitid() and WNOWAIT might help, but require libc >= 2.12 (not present in SL5)
53  GlobalProcHandler::clearPIDs();
54  return;
55  } else {
56  // also shouldn't happen
57  EventProcessor::writeToStdErr("\nwaitpid() failed.\n");
58  }
59  } else if (pid == 0) {
60  // should not happen because of waitpid(-1,...)
61  // further children exist, but no state change yet
62  break;
63  } else {
64  // state change
65  // get signed PID
66  if (not GlobalProcHandler::findPID(pid)) {
67  // unknown child process died, ignore
68  continue;
69  }
70 
71  // errors?
72  if (WIFSIGNALED(status) or (WIFEXITED(status) and WEXITSTATUS(status) != 0)) {
73  EventProcessor::writeToStdErr("\nSub-process exited with non-zero exit status. Please check other log messages for details.\n");
74  }
75 
76  // remove pid from global list
77  GlobalProcHandler::removePID(pid);
78  }
79  }
80 }
81 
82 
83 void GlobalProcHandler::addPID(int newPID)
84 {
85  // if possible, insert pid into gap in list
86  for (int& pid : s_pidVector) {
87  if (pid == 0) {
88  pid = newPID;
89  return;
90  }
91  }
92 
93  B2FATAL("PID vector at capacity. This produces a race condition, make sure GlobalProcHandler is created early.");
94 }
95 
96 bool GlobalProcHandler::findPID(int pid)
97 {
98  return std::find(s_pidVector.begin(), s_pidVector.end(), pid) != s_pidVector.end();
99 }
100 
101 void GlobalProcHandler::removePID(int oldPID)
102 {
103  for (int& pid : s_pidVector) {
104  if (pid == oldPID) {
105  pid = 0;
106  return;
107  }
108  }
109 }
110 
111 void GlobalProcHandler::clearPIDs()
112 {
113  std::fill(s_pidVector.begin(), s_pidVector.end(), 0);
114 }
115 
116 bool GlobalProcHandler::pidListEmpty()
117 {
118  for (const int& pid : s_pidVector) {
119  if (pid != 0) {
120  return false;
121  }
122  }
123  return true;
124 }
125 
126 void GlobalProcHandler::initialize(unsigned int nWorkerProc)
127 {
128  B2ASSERT("Constructing GlobalProcHandler after forking is not allowed!", pidListEmpty());
129 
130  s_numEventProcesses = nWorkerProc;
131 
132  // s_pidVector size shouldn't be changed once processes are forked (race condition)
133  s_pidVector.resize(s_pidVector.size() + nWorkerProc + 3, 0); // num worker + input + output + proxy
134 }
135 
136 bool GlobalProcHandler::startProxyProcess()
137 {
138  return startProc(ProcType::c_Proxy, 30000);
139 }
140 
141 bool GlobalProcHandler::startInputProcess()
142 {
143  return startProc(ProcType::c_Input, 10000);
144 }
145 
146 bool GlobalProcHandler::startWorkerProcesses(unsigned int numProcesses)
147 {
148  for (unsigned int i = 0; i < numProcesses; i++) {
149  if (startProc(ProcType::c_Worker, 0)) {
150  return true;
151  }
152  }
153  return false;
154 }
155 
156 bool GlobalProcHandler::startOutputProcess(bool local)
157 {
158  if (local) {
159  s_procType = ProcType::c_Output;
160  return true;
161  } else {
162  return (startProc(ProcType::c_Output, 20000));
163  }
164 }
165 
166 bool GlobalProcHandler::startMonitoringProcess()
167 {
168  s_procType = ProcType::c_Monitor;
169  return true;
170 }
171 
172 bool GlobalProcHandler::parallelProcessingUsed()
173 {
174  return s_processID != -1;
175 }
176 
177 bool GlobalProcHandler::isProcess(ProcType procType)
178 {
179  return (procType == s_procType);
180 }
181 
182 bool GlobalProcHandler::startProc(ProcType procType, int id)
183 {
184  EventProcessor::installSignalHandler(SIGCHLD, childSignalHandler);
185 
186  fflush(stdout);
187  fflush(stderr);
188  pid_t pid = fork();
189  if (pid > 0) {
190  // Mother process
191  addPID(pid);
192  s_startedPIDs[pid] = procType;
193  fflush(stdout);
194  } else if (pid < 0) {
195  B2FATAL("fork() failed: " << strerror(errno));
196  } else {
197  // Child process
198  // do NOT handle SIGCHLD in forked processes!
199  EventProcessor::installSignalHandler(SIGCHLD, SIG_IGN);
200 
201  s_procType = procType;
202 
203  if (id == 0)
204  s_processID = getpid();
205  else
206  s_processID = id;
207 
208  // Reset some python state: signals, threads, gil in the child
209  PyOS_AfterFork();
210  // InputController becomes useless in child process
211  InputController::resetForChildProcess();
212  // die when parent dies
213  prctl(PR_SET_PDEATHSIG, SIGHUP);
214  return true;
215  }
216  return false;
217 }
218 
219 std::string GlobalProcHandler::getProcessName()
220 {
221  if (isProcess(ProcType::c_Worker))
222  return "worker";
223  if (isProcess(ProcType::c_Input))
224  return "input";
225  if (isProcess(ProcType::c_Output))
226  return "output";
227  if (isProcess(ProcType::c_Init))
228  return "init";
229  if (isProcess(ProcType::c_Monitor))
230  return "monitor";
231 
232  //shouldn't happen
233  return "???";
234 }
235 
236 int GlobalProcHandler::EvtProcID()
237 {
238  return s_processID;
239 }
240 
241 void GlobalProcHandler::killAllProcesses()
242 {
243  for (int& pid : s_pidVector) {
244  if (pid != 0) {
245  if (kill(pid, SIGKILL) >= 0) {
246  B2ERROR("hard killed process " << pid);
247  } else {
248  B2DEBUG(100, "no process " << pid << " found, already gone?");
249  }
250  pid = 0;
251  }
252  }
253 }
254 
255 const std::vector<int>& GlobalProcHandler::getPIDList()
256 {
257  return s_pidVector;
258 }
259 
260 ProcType GlobalProcHandler::getProcType(int pid)
261 {
262  const auto procTypeIt = s_startedPIDs.find(pid);
263  if (procTypeIt == s_startedPIDs.end()) {
264  B2FATAL("Asking for a non-existing PID");
265  }
266  return procTypeIt->second;
267 }
268 
269 int GlobalProcHandler::numEventProcesses()
270 {
271  return s_numEventProcesses;
272 }
273 
274 bool GlobalProcHandler::isOutputProcess()
275 {
276  return isProcess(ProcType::c_Output);
277 }
278 
279 bool GlobalProcHandler::isWorkerProcess()
280 {
281  return isProcess(ProcType::c_Worker);
282 }
283 
284 bool GlobalProcHandler::isInputProcess()
285 {
286  return isProcess(ProcType::c_Input);
287 }
288 
289 void GlobalProcHandler::waitForAllProcesses()
290 {
291  while (true) {
292  if (pidListEmpty()) {
293  return;
294  }
295 
296  std::this_thread::sleep_for(std::chrono::milliseconds(1));
297  }
298 }
Belle2::ProcType
ProcType
Type of the process used for storing and mapping the child processes in the process handler.
Definition: ProcHelper.h:9
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19