Belle II Software development
ProcHandler.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/ProcHandler.h>
10#include <framework/core/InputController.h>
11#include <framework/logging/Logger.h>
12#include <framework/core/EventProcessor.h>
13#include <framework/pcore/GlobalProcHandler.h>
14
15#include <vector>
16
17#include <sys/wait.h>
18#include <sys/prctl.h>
19#include <cstdio>
20#include <cstdlib>
21#include <cerrno>
22#include <cstring>
23#include <unistd.h>
24#include <Python.h>
25
26using namespace std;
27using namespace Belle2;
28
29namespace {
30 static int s_processID = -1;
31 static int s_numEventProcesses = 0;
32 static int s_localChildrenWithErrors = 0;
33
34 // global list of PIDs managed by ProcHandler.
35 // (directly modifying STL structures in the signal handler is unsafe, so let's be overly
36 // cautious and use only C-like functions there.)
37 // PIDs are addedusing addPID() while forking, items are set to 0 when process stops
38 static std::vector<int> s_pidVector;
39 static int* s_pids = nullptr;
40 static int s_numpids = 0;
41 void addPID(int pid)
42 {
43 //if possible, insert pid into gap in list
44 bool found_gap = false;
45 for (int i = 0; i < s_numpids; i++) {
46 if (s_pids[i] == 0) {
47 found_gap = true;
48 s_pids[i] = pid;
49 break;
50 }
51 }
52
53 if (!found_gap) {
54 if (s_pidVector.size() == s_pidVector.capacity()) {
55 B2FATAL("PID vector at capacity. This produces a race condition, make sure ProcHandler is created early.");
56 }
57 s_pidVector.push_back(pid);
58 }
59 s_pids = s_pidVector.data();
60 s_numpids = s_pidVector.size();
61 }
62
63 //in signal handler, use only the following functions!
65 int findPID(int pid)
66 {
67 for (int i = 0; i < s_numpids; i++)
68 if (std::abs(s_pids[i]) == std::abs(pid))
69 return s_pids[i];
70 return 0;
71 }
72 void removePID(int pid)
73 {
74 for (int i = 0; i < s_numpids; i++)
75 if (std::abs(s_pids[i]) == std::abs(pid))
76 s_pids[i] = 0;
77 }
78 void clearPIDs()
79 {
80 for (int i = 0; i < s_numpids; i++)
81 s_pids[i] = 0;
82 }
83 bool pidListEmpty()
84 {
85 for (int i = 0; i < s_numpids; i++)
86 if (s_pids[i] != 0)
87 return false;
88 return true;
89 }
90
91 void sigChldHandler(int)
92 {
93 //EventProcessor::writeToStdErr("\n sigchild handler called .\n");
94 int raiseSig = 0;
95 while (!pidListEmpty()) {
96 int status;
97 int pid = waitpid(-1, &status, WNOHANG);
98 if (pid == -1) {
99 if (errno == EINTR) {
100 continue; //interrupted, try again
101 } else if (errno == ECHILD) {
102 //We don't have any child processes?
103 EventProcessor::writeToStdErr("\n Called waitpid() without any children left. This shouldn't happen and and indicates a problem.\n");
104 //
105 //actually, this is ok in case we already called waitpid() somewhere else. (but we don't do that...)
106 //In case I want to avoid this, waitid() and WNOWAIT might help, but require libc >= 2.12 (not present in SL5)
107 clearPIDs();
108 return;
109 } else {
110 //also shouldn't happen
111 EventProcessor::writeToStdErr("\nwaitpid() failed.\n");
112 }
113 } else if (pid == 0) {
114 //further children exist, but no state change yet
115 break;
116 } else { //state change
117 //get signed PID
118 pid = findPID(pid);
119 if (pid == 0)
120 continue; //unknown child process died, ignore
121
122 int termSig = 0;
123 //errors?
124 if (WIFSIGNALED(status)) {
125 //ok, it died because of some signal
126 //EventProcessor::writeToStdErr("\nOne of our child processes died, stopping execution...\n");
127 termSig = WTERMSIG(status);
128
129 //backtrace in parent is not helpful
130 if (termSig == SIGSEGV)
131 termSig = SIGTERM;
132 } else if (WIFEXITED(status) and WEXITSTATUS(status) != 0) {
133 EventProcessor::writeToStdErr("\nExecution stopped, sub-process exited with non-zero exit status. Please check other log messages for details.\n");
134 termSig = SIGTERM;
135 }
136
137 if (termSig != 0) {
138 if (pid < 0)
139 s_localChildrenWithErrors++;
140 else
141 raiseSig = termSig;
142 }
143
144 //remove pid from global list
145 removePID(pid);
146 }
147 }
148
149 if (raiseSig)
150 raise(raiseSig);
151 }
152}
153
154bool ProcHandler::startProc(std::set<int>* processList, const std::string& procType, int id)
155{
156 EventProcessor::installSignalHandler(SIGCHLD, sigChldHandler);
157
158 fflush(stdout);
159 fflush(stderr);
160 PyOS_BeforeFork();
161 pid_t pid = fork();
162 if (pid > 0) { // Mother process
163 PyOS_AfterFork_Parent();
165 pid = -pid;
166 processList->insert(pid);
167 addPID(pid);
168 B2INFO("ProcHandler: " << procType << " process forked. pid = " << pid);
169 fflush(stdout);
170 } else if (pid < 0) {
171 B2FATAL("fork() failed: " << strerror(errno));
172 } else {
173 //do NOT handle SIGCHLD in forked processes!
174 EventProcessor::installSignalHandler(SIGCHLD, SIG_IGN);
175
176 s_processID = id;
177 //Reset some python state: signals, threads, gil in the child
178 PyOS_AfterFork_Child();
179 //InputController becomes useless in child process
181 //die when parent dies
182 prctl(PR_SET_PDEATHSIG, SIGHUP);
183 return true;
184 }
185 return false;
186}
187
188ProcHandler::ProcHandler(unsigned int nWorkerProc, bool markChildrenAsLocal):
189 m_markChildrenAsLocal(markChildrenAsLocal),
190 m_numWorkerProcesses(nWorkerProc)
191{
192 if ((int)nWorkerProc > s_numEventProcesses)
193 s_numEventProcesses = nWorkerProc;
194
195 if (!pidListEmpty())
196 B2FATAL("Constructing ProcHandler after forking is not allowed!");
197
198 //s_pidVector size shouldn't be changed once processes are forked (race condition)
199 s_pidVector.reserve(s_pidVector.size() + nWorkerProc + 2);
200 s_pids = s_pidVector.data();
201 setsid();
202
203}
204ProcHandler::~ProcHandler() = default;
205
206
208{
209 startProc(&m_processList, "input", 10000);
210}
211
213{
214 for (unsigned int i = 0; i < m_numWorkerProcesses; i++) {
215 if (startProc(&m_processList, "worker", i))
216 break; // in child process
217 }
218}
219
221{
222 if (s_processID == -1)
223 s_processID = 20000;
224}
225
227
228bool ProcHandler::isInputProcess() { return (s_processID >= 10000 and s_processID < 20000) or GlobalProcHandler::isInputProcess(); }
229
231
232bool ProcHandler::isOutputProcess() { return s_processID >= 20000 or GlobalProcHandler::isOutputProcess(); }
233
235{
236 return s_numEventProcesses;
237}
238
240{
241 return std::set<int>(s_pidVector.begin(), s_pidVector.end());
242}
243std::set<int> ProcHandler::processList() const
244{
245 return m_processList;
246}
247
248int ProcHandler::EvtProcID() { return s_processID; }
249
250void ProcHandler::setProcessID(int processID) { s_processID = processID; }
251
253{
254 if (isWorkerProcess())
255 return "worker";
256 if (isInputProcess())
257 return "input";
258 if (isOutputProcess())
259 return "output";
260
261 //shouldn't happen
262 return "???";
263}
264
265
267{
268 bool ok = true;
269 while (!m_processList.empty()) {
270 for (int pid : m_processList) {
271 //once a process is gone from the global list, remove them from our own, too.
272 if (findPID(pid) == 0) {
273 m_processList.erase(pid);
274 if (m_markChildrenAsLocal and pid < 0 and s_localChildrenWithErrors != 0) {
275 ok = false;
276 s_localChildrenWithErrors--;
277 }
278 break;
279 }
280 }
281
282 usleep(100);
283 }
284 return ok;
285}
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
static bool isWorkerProcess()
Return true if the process is of type c_Worker.
static bool isOutputProcess()
Return true if the process is of type c_Output.
static bool isInputProcess()
Return true if the process is of type c_Input.
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
static void resetForChildProcess()
Reset InputController (e.g.
void startOutputProcess()
There is no real output process, but marks current process as output.
Definition: ProcHandler.cc:220
static bool isWorkerProcess()
Return true if the process is a worker process.
Definition: ProcHandler.cc:230
static int EvtProcID()
Return ID of the current process.
Definition: ProcHandler.cc:248
std::set< int > m_processList
PIDs of processes controlled by this ProcHandler.
Definition: ProcHandler.h:89
static int numEventProcesses()
Return number of worker processes (configured value, not current)
Definition: ProcHandler.cc:234
static bool isOutputProcess()
Return true if the process is an output process.
Definition: ProcHandler.cc:232
static bool isInputProcess()
Return true if the process is an input process.
Definition: ProcHandler.cc:228
static void setProcessID(int processID)
Set the process ID of this process.
Definition: ProcHandler.cc:250
unsigned int m_numWorkerProcesses
Number of worker processes controlled by this ProcHandler.
Definition: ProcHandler.h:90
static std::set< int > globalProcessList()
Return list of all PIDs (from all ProcHandler instances).
Definition: ProcHandler.cc:239
bool waitForAllProcesses()
Wait until all forked processes handled by this ProcHandler terminate.
Definition: ProcHandler.cc:266
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
Definition: ProcHandler.cc:226
bool m_markChildrenAsLocal
Anormal termination of child will not stop parent, waitForAllProcesses() returns status.
Definition: ProcHandler.h:88
ProcHandler(unsigned int nWorkerProc, bool markChildrenAsLocal=false)
Constructor.
Definition: ProcHandler.cc:188
bool startProc(std::set< int > *processList, const std::string &procType, int id)
Start a new process, adding its PID to processList, and setting s_processID = id.
Definition: ProcHandler.cc:154
~ProcHandler()
Destructor.
void startWorkerProcesses()
Fork and initialize worker processes.
Definition: ProcHandler.cc:212
std::set< int > processList() const
Return list of PIDs managed by this ProcHandler instance.
Definition: ProcHandler.cc:243
void startInputProcess()
Fork and initialize an input process.
Definition: ProcHandler.cc:207
static std::string getProcessName()
Get a name for this process.
Definition: ProcHandler.cc:252
Abstract base class for different kinds of events.
STL namespace.