Belle II Software development
GlobalProcHandler.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
#include <framework/pcore/GlobalProcHandler.h>
#include <framework/core/InputController.h>
#include <framework/logging/Logger.h>
#include <framework/core/EventProcessor.h>

#include <framework/pcore/ProcHandler.h>

#include <sys/wait.h>
#include <sys/prctl.h>
#include <signal.h>
#include <cstdio>
#include <cstdlib>
#include <cerrno>
#include <cstring>
#include <unistd.h>
#include <Python.h>

#include <algorithm>
#include <thread>
#include <chrono>

using namespace std;
using namespace Belle2;
30
34
35std::vector<int> GlobalProcHandler::s_pidVector;
36std::map<int, ProcType> GlobalProcHandler::s_startedPIDs;
37
// GlobalProcHandler::childSignalHandler(int): handler called on SIGCHLD. It reaps children whose
// state has changed without blocking (WNOHANG) and reports abnormal exits. Because it runs in
// signal context, output goes through the async-safe EventProcessor::writeToStdErr().
// NOTE(review): this listing omits source lines 38 (the signature), 40, 51 and 75. Line 40 is
// presumably the opening `while (true) {` -- the `continue`/`break` below need an enclosing loop.
39{
41 int status;
// Reap any child (-1) without blocking (WNOHANG).
42 int pid = waitpid(-1, &status, WNOHANG);
43 if (pid == -1) {
44 if (errno == EINTR) {
45 continue; // interrupted, try again
46 } else if (errno == ECHILD) {
47 // We don't have any child processes?
48 EventProcessor::writeToStdErr("\n Called waitpid() without any children left. This shouldn't happen and and indicates a problem.\n");
49 // actually, this is ok in case we already called waitpid() somewhere else. (but we don't do that...)
50 // In case I want to avoid this, waitid() and WNOWAIT might help, but require libc >= 2.12 (not present in SL5)
// NOTE(review): source line 51 (just before this return) is missing from this listing.
52 return;
53 } else {
54 // also shouldn't happen
55 EventProcessor::writeToStdErr("\nwaitpid() failed.\n");
// NOTE(review): no break/return here, so the enclosing loop calls waitpid() again; if this
// error persisted, the handler would spin.
56 }
57 } else if (pid == 0) {
58 // not an error: with WNOHANG, waitpid() returns 0 while children exist but none has changed state
59 // further children exist, but no state change yet
60 break;
61 } else {
62 // state change
63 // only PIDs tracked in s_pidVector are handled; the child has already been reaped either way
64 if (not GlobalProcHandler::findPID(pid)) {
65 // unknown child process died, ignore
66 continue;
67 }
68
69 // errors?
70 if (WIFSIGNALED(status) or (WIFEXITED(status) and WEXITSTATUS(status) != 0)) {
71 EventProcessor::writeToStdErr("\nSub-process exited with non-zero exit status. Please check other log messages for details.\n");
72 }
73
74 // remove pid from global list
// NOTE(review): the removal call (source line 75) is missing from this listing; presumably
// GlobalProcHandler::removePID(pid) -- confirm.
76 }
77 }
78}
79
80
82{
83 // if possible, insert pid into gap in list
84 for (int& pid : s_pidVector) {
85 if (pid == 0) {
86 pid = newPID;
87 return;
88 }
89 }
90
91 B2FATAL("PID vector at capacity. This produces a race condition, make sure GlobalProcHandler is created early.");
92}
93
95{
96 return std::find(s_pidVector.begin(), s_pidVector.end(), pid) != s_pidVector.end();
97}
98
100{
101 for (int& pid : s_pidVector) {
102 if (pid == oldPID) {
103 pid = 0;
104 return;
105 }
106 }
107}
108
110{
111 std::fill(s_pidVector.begin(), s_pidVector.end(), 0);
112}
113
115{
116 for (const int& pid : s_pidVector) {
117 if (pid != 0) {
118 return false;
119 }
120 }
121 return true;
122}
123
124void GlobalProcHandler::initialize(unsigned int nWorkerProc)
125{
126 B2ASSERT("Constructing GlobalProcHandler after forking is not allowed!", pidListEmpty());
127
128 s_numEventProcesses = nWorkerProc;
129
130 // s_pidVector size shouldn't be changed once processes are forked (race condition)
131 s_pidVector.resize(s_pidVector.size() + nWorkerProc + 3, 0); // num worker + input + output + proxy
132}
133
135{
136 return startProc(ProcType::c_Proxy, 30000);
137}
138
140{
141 return startProc(ProcType::c_Input, 10000);
142}
143
// GlobalProcHandler::startWorkerProcesses: fork as many worker processes as requested.
// Returns true as soon as the guarded branch inside the loop is taken (presumably in a freshly
// forked worker child) and false once all iterations are done (presumably in the parent).
// NOTE(review): source line 147, the condition guarding `return true`, is missing from this
// listing; presumably `if (startProc(ProcType::c_Worker, ...)) {` -- confirm.
144bool GlobalProcHandler::startWorkerProcesses(unsigned int numProcesses)
145{
146 for (unsigned int i = 0; i < numProcesses; i++) {
148 return true;
149 }
150 }
151 return false;
152}
153
// GlobalProcHandler::startOutputProcess(bool local): start the output process.
// With local == true nothing is forked and true is returned (presumably so the caller itself
// acts as the output process). Otherwise a child is forked via startProc() with ID 20000,
// and the result is true only in that child.
// NOTE(review): the signature (source line 154) and line 157 are missing from this listing;
// line 157 presumably sets this process's type/ID for the local case -- confirm.
155{
156 if (local) {
158 return true;
159 } else {
160 return (startProc(ProcType::c_Output, 20000));
161 }
162}
163
// GlobalProcHandler::startMonitoringProcess(): declared as forking a monitoring process, but the
// visible body does not fork -- it always returns true (presumably the caller itself continues
// as the monitor).
// NOTE(review): the signature (source line 164) and line 166 are missing from this listing;
// line 166 presumably sets the process type to ProcType::c_Monitor -- confirm.
165{
167 return true;
168}
169
171{
172 return s_processID != -1;
173}
174
176{
177 return (procType == s_procType);
178}
179
// GlobalProcHandler::startProc(ProcType procType, int id): fork a new process of type procType
// with process ID id. Returns true in the forked child and false in the parent; a failed fork()
// is fatal (B2FATAL).
// NOTE(review): this listing omits source lines 180 (the signature), 182 and 211.
181{
183
// Flush buffered output first so the child does not inherit (and later re-emit) it.
184 fflush(stdout);
185 fflush(stderr);
186 pid_t pid = fork();
187 if (pid > 0) {
188 // Mother process
// Record the child in a PID slot and remember its type for getProcType().
// NOTE(review): possible race -- if childSignalHandler is already installed for SIGCHLD here
// and the child exits before addPID() runs, the handler reaps it as an unknown PID and the PID
// added below is never removed, so waitForAllProcesses() could wait forever. Blocking SIGCHLD
// (sigprocmask) around fork() and addPID() would close this window.
189 addPID(pid);
190 s_startedPIDs[pid] = procType;
191 fflush(stdout);
192 } else if (pid < 0) {
193 B2FATAL("fork() failed: " << strerror(errno));
194 } else {
195 // Child process
196 // do NOT handle SIGCHLD in forked processes!
197 EventProcessor::installSignalHandler(SIGCHLD, SIG_IGN);
198
199 s_procType = procType;
200
// An id of 0 means the OS process ID is used as this process's ID.
201 if (id == 0)
202 s_processID = getpid();
203 else
204 s_processID = id;
205
206 ProcHandler::setProcessID(s_processID); // Interface to existing ProcHandler
207
208 // Reset some python state: signals, threads, gil in the child
209 PyOS_AfterFork_Child();
210 // InputController becomes useless in child process
// NOTE(review): the call belonging to the comment above (source line 211) is missing from this
// listing; presumably InputController::resetForChildProcess() -- confirm.
212 // die when parent dies
// NOTE(review): if the parent has already exited before prctl() runs, the death signal is never
// delivered; prctl(2) suggests comparing getppid() with the parent's PID afterwards.
213 prctl(PR_SET_PDEATHSIG, SIGHUP);
214 return true;
215 }
216 return false;
217}
218
// GlobalProcHandler::getProcessName(): return a human-readable name for this process.
// NOTE(review): this listing omits the signature (source line 219) and the condition lines 221,
// 223, 225, 227 and 229; presumably each checks the current process type (worker, input,
// output, init, monitor) -- confirm. None of the visible names corresponds to ProcType::c_Proxy,
// so a proxy process may fall through to "???".
220{
222 return "worker";
224 return "input";
226 return "output";
228 return "init";
230 return "monitor";
231
232 //shouldn't happen
233 return "???";
234}
235
237{
238 return s_processID;
239}
240
242{
243 for (int& pid : s_pidVector) {
244 if (pid != 0) {
245 if (kill(pid, SIGKILL) >= 0) {
246 B2DEBUG(100, "hard killed process " << pid);
247 } else {
248 B2DEBUG(100, "no process " << pid << " found, already gone?");
249 }
250 pid = 0;
251 }
252 }
253}
254
255const std::vector<int>& GlobalProcHandler::getPIDList()
256{
257 return s_pidVector;
258}
259
261{
262 const auto procTypeIt = s_startedPIDs.find(pid);
263 if (procTypeIt == s_startedPIDs.end()) {
264 B2FATAL("Asking for a non-existing PID");
265 }
266 return procTypeIt->second;
267}
268
270{
271 return s_numEventProcesses;
272}
273
// NOTE(review): the three definitions below are missing their signatures (source lines 274, 279,
// 284) and bodies (276, 281, 286) in this listing. By elimination against the class's declared
// members, they are presumably isInputProcess(), isOutputProcess() and isWorkerProcess() (in
// some order), each likely returning isProcess() for the matching ProcType -- confirm against
// the real source.
275{
277}
278
280{
282}
283
285{
287}
288
290{
291 while (true) {
292 if (pidListEmpty()) {
293 return;
294 }
295
296 std::this_thread::sleep_for(std::chrono::milliseconds(1));
297 }
298}
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
static bool startOutputProcess(bool local=false)
Fork and initialize an output process.
static bool isWorkerProcess()
Return true if the process is of type c_Worker.
static bool findPID(int pid)
Find a PID in the list and return true, if found.
static int EvtProcID()
Get the ID of this process. Attention: this ID may not be a stable API feature.
static ProcType getProcType(int pid)
Return the proc type with which the process of the given PID was started.
static int numEventProcesses()
Return number of worker processes (configured value, not current)
static bool isOutputProcess()
Return true if the process is of type c_Output.
static int s_numEventProcesses
How many worker processes are handled by this GlobalProcHandler.
static bool startProc(ProcType procType, int id)
Start a new process, set its type and ID, and return true if running in the new (child) process.
static bool isInputProcess()
Return true if the process is of type c_Input.
static std::map< int, ProcType > s_startedPIDs
Which PIDs were started with which types.
static bool startInputProcess()
Fork and initialize an input process.
static int s_processID
Our current process id.
static void initialize(unsigned int nWorkerProc)
Initialize the process handler for nWorkerProc worker processes.
static void clearPIDs()
Remove all PIDs.
static bool startMonitoringProcess()
Fork and initialize a monitoring process.
static bool pidListEmpty()
Check if the PID list is empty (i.e. contains only 0 entries).
static void childSignalHandler(int)
This function is called on SIGCHLD.
static void addPID(int pid)
Add a new PID. Is called when forking.
static ProcType s_procType
Our current proc type.
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
static bool startWorkerProcesses(unsigned int numProcesses)
Fork and initialize as many worker processes as requested.
static void removePID(int pid)
Remove a PID from the list by setting it to 0.
static void waitForAllProcesses()
Wait until all forked processes handled by this GlobalProcHandler have terminated (i.e. until the PID list is empty).
static const std::vector< int > & getPIDList()
Return the PID list handled by the running GlobalProcHandler. This PID list is updated by the SIGCHLD signal handler.
static bool isProcess(ProcType procType)
Return true if the process is of type procType.
static bool startProxyProcess()
Fork and initialize a proxy process.
static void killAllProcesses()
Hard kill all processes.
static std::vector< int > s_pidVector
global list of PIDs managed by GlobalProcHandler.
static std::string getProcessName()
Get a human-readable name for this process (e.g. worker, input, output, init, monitor).
static void resetForChildProcess()
Reset InputController (e.g. after forking a child process).
static void setProcessID(int processID)
Set the process ID of this process.
Definition: ProcHandler.cc:250
ProcType
Type of the process used for storing and mapping the child processes in the process handler.
Definition: ProcHelper.h:16
@ c_Proxy
Multicast Proxy Process.
@ c_Output
Output Process.
@ c_Worker
Worker/Reconstruction Process.
@ c_Monitor
Monitoring Process.
@ c_Input
Input Process.
@ c_Init
Before the forks, the process is in init state.
Abstract base class for different kinds of events.
STL namespace.