Belle II Software development
GlobalProcHandler.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/GlobalProcHandler.h>
10#include <framework/core/InputController.h>
11#include <framework/logging/Logger.h>
12#include <framework/core/EventProcessor.h>
13
14#include <framework/pcore/ProcHandler.h>
15
16#include <sys/wait.h>
17#include <sys/prctl.h>
18#include <cstdio>
19#include <cstdlib>
20#include <cerrno>
21#include <cstring>
22#include <unistd.h>
23#include <Python.h>
24
25#include <thread>
26#include <chrono>
27
28#include <iostream>
29
30using namespace std;
31using namespace Belle2;
32
36
37std::vector<int> GlobalProcHandler::s_pidVector;
38std::map<int, ProcType> GlobalProcHandler::s_startedPIDs;
39
41{
43 int status;
44 int pid = waitpid(-1, &status, WNOHANG);
45 if (pid == -1) {
46 if (errno == EINTR) {
47 continue; // interrupted, try again
48 } else if (errno == ECHILD) {
49 // We don't have any child processes?
50 EventProcessor::writeToStdErr("\n Called waitpid() without any children left. This shouldn't happen and and indicates a problem.\n");
51 // actually, this is ok in case we already called waitpid() somewhere else. (but we don't do that...)
52 // In case I want to avoid this, waitid() and WNOWAIT might help, but require libc >= 2.12 (not present in SL5)
54 return;
55 } else {
56 // also shouldn't happen
57 EventProcessor::writeToStdErr("\nwaitpid() failed.\n");
58 }
59 } else if (pid == 0) {
60 // should not happen because of waitpid(-1,...)
61 // further children exist, but no state change yet
62 break;
63 } else {
64 // state change
65 // get signed PID
66 if (not GlobalProcHandler::findPID(pid)) {
67 // unknown child process died, ignore
68 continue;
69 }
70
71 // errors?
72 if (WIFSIGNALED(status) or (WIFEXITED(status) and WEXITSTATUS(status) != 0)) {
73 EventProcessor::writeToStdErr("\nSub-process exited with non-zero exit status. Please check other log messages for details.\n");
74 }
75
76 // remove pid from global list
78 }
79 }
80}
81
82
84{
85 // if possible, insert pid into gap in list
86 for (int& pid : s_pidVector) {
87 if (pid == 0) {
88 pid = newPID;
89 return;
90 }
91 }
92
93 B2FATAL("PID vector at capacity. This produces a race condition, make sure GlobalProcHandler is created early.");
94}
95
97{
98 return std::find(s_pidVector.begin(), s_pidVector.end(), pid) != s_pidVector.end();
99}
100
102{
103 for (int& pid : s_pidVector) {
104 if (pid == oldPID) {
105 pid = 0;
106 return;
107 }
108 }
109}
110
112{
113 std::fill(s_pidVector.begin(), s_pidVector.end(), 0);
114}
115
117{
118 for (const int& pid : s_pidVector) {
119 if (pid != 0) {
120 return false;
121 }
122 }
123 return true;
124}
125
126void GlobalProcHandler::initialize(unsigned int nWorkerProc)
127{
128 B2ASSERT("Constructing GlobalProcHandler after forking is not allowed!", pidListEmpty());
129
130 s_numEventProcesses = nWorkerProc;
131
132 // s_pidVector size shouldn't be changed once processes are forked (race condition)
133 s_pidVector.resize(s_pidVector.size() + nWorkerProc + 3, 0); // num worker + input + output + proxy
134}
135
137{
138 return startProc(ProcType::c_Proxy, 30000);
139}
140
142{
143 return startProc(ProcType::c_Input, 10000);
144}
145
146bool GlobalProcHandler::startWorkerProcesses(unsigned int numProcesses)
147{
148 for (unsigned int i = 0; i < numProcesses; i++) {
150 return true;
151 }
152 }
153 return false;
154}
155
157{
158 if (local) {
160 return true;
161 } else {
162 return (startProc(ProcType::c_Output, 20000));
163 }
164}
165
167{
169 return true;
170}
171
173{
174 return s_processID != -1;
175}
176
178{
179 return (procType == s_procType);
180}
181
183{
185
186 fflush(stdout);
187 fflush(stderr);
188 pid_t pid = fork();
189 if (pid > 0) {
190 // Mother process
191 addPID(pid);
192 s_startedPIDs[pid] = procType;
193 fflush(stdout);
194 } else if (pid < 0) {
195 B2FATAL("fork() failed: " << strerror(errno));
196 } else {
197 // Child process
198 // do NOT handle SIGCHLD in forked processes!
199 EventProcessor::installSignalHandler(SIGCHLD, SIG_IGN);
200
201 s_procType = procType;
202
203 if (id == 0)
204 s_processID = getpid();
205 else
206 s_processID = id;
207
208 ProcHandler::setProcessID(s_processID); // Interface to existing ProcHandler
209
210 // Reset some python state: signals, threads, gil in the child
211 PyOS_AfterFork_Child();
212 // InputController becomes useless in child process
214 // die when parent dies
215 prctl(PR_SET_PDEATHSIG, SIGHUP);
216 return true;
217 }
218 return false;
219}
220
222{
224 return "worker";
226 return "input";
228 return "output";
230 return "init";
232 return "monitor";
233
234 //shouldn't happen
235 return "???";
236}
237
239{
240 return s_processID;
241}
242
244{
245 for (int& pid : s_pidVector) {
246 if (pid != 0) {
247 if (kill(pid, SIGKILL) >= 0) {
248 B2DEBUG(100, "hard killed process " << pid);
249 } else {
250 B2DEBUG(100, "no process " << pid << " found, already gone?");
251 }
252 pid = 0;
253 }
254 }
255}
256
257const std::vector<int>& GlobalProcHandler::getPIDList()
258{
259 return s_pidVector;
260}
261
263{
264 const auto procTypeIt = s_startedPIDs.find(pid);
265 if (procTypeIt == s_startedPIDs.end()) {
266 B2FATAL("Asking for a non-existing PID");
267 }
268 return procTypeIt->second;
269}
270
272{
273 return s_numEventProcesses;
274}
275
277{
279}
280
282{
284}
285
287{
289}
290
292{
293 while (true) {
294 if (pidListEmpty()) {
295 return;
296 }
297
298 std::this_thread::sleep_for(std::chrono::milliseconds(1));
299 }
300}
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
static bool startOutputProcess(bool local=false)
Fork and initialize an output process.
static bool isWorkerProcess()
Return true if the process is of type c_Worker.
static bool findPID(int pid)
Find a PID in the list and return true, if found.
static int EvtProcID()
Get the ID of this process. Attention: this ID may not be a stable API feature.
static ProcType getProcType(int pid)
Return the proc type of this process.
static int numEventProcesses()
Return number of worker processes (configured value, not current)
static bool isOutputProcess()
Return true if the process is of type c_Output.
static int s_numEventProcesses
How many processes are handled in this GlobalProcHandler.
static bool startProc(ProcType procType, int id)
Start a new process, sets the type and id and returns true if in this new process.
static bool isInputProcess()
Return true if the process is of type c_Input.
static std::map< int, ProcType > s_startedPIDs
Which PIDs were started with which types.
static bool startInputProcess()
Fork and initialize an input process.
static int s_processID
Our current process id.
static void initialize(unsigned int nWorkerProc)
Create a new process handler, which will handle nWorkerProc processes.
static void clearPIDs()
Remove all PIDs.
static bool startMonitoringProcess()
Fork and initialize a monitoring process.
static bool pidListEmpty()
Check if the PID list is empty (only 0).
static void childSignalHandler(int)
This function is called on SIG_CLD.
static void addPID(int pid)
Add a new PID. Is called when forking.
static ProcType s_procType
Our current proc type.
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
static bool startWorkerProcesses(unsigned int numProcesses)
Fork and initialize as many worker processes as requested.
static void removePID(int pid)
Remove a PID from the list by setting it to 0.
static void waitForAllProcesses()
Wait until all forked processes handled by this GlobalProcHandler.
static const std::vector< int > & getPIDList()
Return the PID list handled by the running GlobalProcHandler. This PID list is updated using the sign...
static bool isProcess(ProcType procType)
Return true if the process is of type procType.
static bool startProxyProcess()
Fork and initialize a proxy process.
static void killAllProcesses()
Hard kill all processes.
static std::vector< int > s_pidVector
global list of PIDs managed by GlobalProcHandler.
static std::string getProcessName()
Get a human readable name for this process. (input, event, output...).
static void resetForChildProcess()
Reset InputController (e.g.
static void setProcessID(int processID)
Set the process ID of this process.
Definition: ProcHandler.cc:250
ProcType
Type of the process used for storing and mapping the child processes in the process handler.
Definition: ProcHelper.h:16
@ c_Proxy
Multicast Proxy Process.
@ c_Output
Output Process.
@ c_Worker
Worker/Reconstruction Process.
@ c_Monitor
Monitoring Process.
@ c_Input
Input Process.
@ c_Init
Before the forks, the process is in init state.
Abstract base class for different kinds of events.
STL namespace.