Belle II Software light-2505-deimos
ProcHandler.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/ProcHandler.h>
10#include <framework/core/InputController.h>
11#include <framework/logging/Logger.h>
12#include <framework/core/EventProcessor.h>
13#include <framework/pcore/GlobalProcHandler.h>
14
15#include <vector>
16
17#include <sys/wait.h>
18#include <sys/prctl.h>
19#include <cstdio>
20#include <cstdlib>
21#include <cerrno>
22#include <cstring>
23#include <unistd.h>
24#include <Python.h>
25
26using namespace std;
27using namespace Belle2;
28
29namespace {
// Identifier of the current process; starts out as -1.
int s_processID{ -1};
// Largest number of worker processes requested from any ProcHandler so far.
int s_numEventProcesses{0};
// Count of children stored with a negative PID that terminated abnormally.
int s_localChildrenWithErrors{0};

// Global list of PIDs managed by ProcHandler.
// Touching STL containers from within a signal handler is unsafe, so the handler only
// ever sees the plain pointer/length pair below and sticks to C-like operations.
// PIDs are added using addPID() while forking; an entry is set to 0 once its process stops.
std::vector<int> s_pidVector;
int* s_pids{nullptr};
int s_numpids{0};
41 void addPID(int pid)
42 {
43 //if possible, insert pid into gap in list
44 bool found_gap = false;
45 for (int i = 0; i < s_numpids; i++) {
46 if (s_pids[i] == 0) {
47 found_gap = true;
48 s_pids[i] = pid;
49 break;
50 }
51 }
52
53 if (!found_gap) {
54 if (s_pidVector.size() == s_pidVector.capacity()) {
55 B2FATAL("PID vector at capacity. This produces a race condition, make sure ProcHandler is created early.");
56 }
57 s_pidVector.push_back(pid);
58 }
59 s_pids = s_pidVector.data();
60 s_numpids = s_pidVector.size();
61 }
62
63 //in signal handler, use only the following functions!
65 int findPID(int pid)
66 {
67 for (int i = 0; i < s_numpids; i++)
68 if (std::abs(s_pids[i]) == std::abs(pid))
69 return s_pids[i];
70 return 0;
71 }
72 void removePID(int pid)
73 {
74 for (int i = 0; i < s_numpids; i++)
75 if (std::abs(s_pids[i]) == std::abs(pid))
76 s_pids[i] = 0;
77 }
78 void clearPIDs()
79 {
80 for (int i = 0; i < s_numpids; i++)
81 s_pids[i] = 0;
82 }
83 bool pidListEmpty()
84 {
85 for (int i = 0; i < s_numpids; i++)
86 if (s_pids[i] != 0)
87 return false;
88 return true;
89 }
90
91 void sigChldHandler(int)
92 {
93 //EventProcessor::writeToStdErr("\n sigchild handler called .\n");
94 int raiseSig = 0;
95 while (!pidListEmpty()) {
96 int status;
97 int pid = waitpid(-1, &status, WNOHANG);
98 if (pid == -1) {
99 if (errno == EINTR) {
100 continue; //interrupted, try again
101 } else if (errno == ECHILD) {
102 //We don't have any child processes?
103 EventProcessor::writeToStdErr("\n Called waitpid() without any children left. This shouldn't happen and and indicates a problem.\n");
104 //
105 //actually, this is ok in case we already called waitpid() somewhere else. (but we don't do that...)
106 //In case I want to avoid this, waitid() and WNOWAIT might help, but require libc >= 2.12 (not present in SL5)
107 clearPIDs();
108 return;
109 } else {
110 //also shouldn't happen
111 EventProcessor::writeToStdErr("\nwaitpid() failed.\n");
112 }
113 } else if (pid == 0) {
114 //further children exist, but no state change yet
115 break;
116 } else { //state change
117 //get signed PID
118 pid = findPID(pid);
119 if (pid == 0)
120 continue; //unknown child process died, ignore
121
122 int termSig = 0;
123 //errors?
124 if (WIFSIGNALED(status)) {
125 //ok, it died because of some signal
126 //EventProcessor::writeToStdErr("\nOne of our child processes died, stopping execution...\n");
127 termSig = WTERMSIG(status);
128
129 //backtrace in parent is not helpful
130 if (termSig == SIGSEGV)
131 termSig = SIGTERM;
132 } else if (WIFEXITED(status) and WEXITSTATUS(status) != 0) {
133 EventProcessor::writeToStdErr("\nExecution stopped, sub-process exited with non-zero exit status. Please check other log messages for details.\n");
134 termSig = SIGTERM;
135 }
136
137 if (termSig != 0) {
138 if (pid < 0)
139 s_localChildrenWithErrors++;
140 else
141 raiseSig = termSig;
142 }
143
144 //remove pid from global list
145 removePID(pid);
146 }
147 }
148
149 if (raiseSig)
150 raise(raiseSig);
151 }
152}
153
154bool ProcHandler::startProc(std::set<int>* processList, const std::string& procType, int id)
155{
156 EventProcessor::installSignalHandler(SIGCHLD, sigChldHandler);
157
158 fflush(stdout);
159 fflush(stderr);
160 PyOS_BeforeFork();
161 pid_t pid = fork();
162 if (pid > 0) { // Mother process
163 PyOS_AfterFork_Parent();
165 pid = -pid;
166 processList->insert(pid);
167 addPID(pid);
168 B2INFO("ProcHandler: " << procType << " process forked. pid = " << pid);
169 fflush(stdout);
170 } else if (pid < 0) {
171 B2FATAL("fork() failed: " << strerror(errno));
172 } else {
173 //do NOT handle SIGCHLD in forked processes!
174 EventProcessor::installSignalHandler(SIGCHLD, SIG_IGN);
175
176 s_processID = id;
177 //Reset some python state: signals, threads, gil in the child
178 PyOS_AfterFork_Child();
179 //InputController becomes useless in child process
181 //die when parent dies
182 prctl(PR_SET_PDEATHSIG, SIGHUP);
183 return true;
184 }
185 return false;
186}
187
188ProcHandler::ProcHandler(unsigned int nWorkerProc, bool markChildrenAsLocal):
189 m_markChildrenAsLocal(markChildrenAsLocal),
190 m_numWorkerProcesses(nWorkerProc)
191{
192 if ((int)nWorkerProc > s_numEventProcesses)
193 s_numEventProcesses = nWorkerProc;
194
195 if (!pidListEmpty())
196 B2FATAL("Constructing ProcHandler after forking is not allowed!");
197
198 //s_pidVector size shouldn't be changed once processes are forked (race condition)
199 s_pidVector.reserve(s_pidVector.size() + nWorkerProc + 2);
200 s_pids = s_pidVector.data();
201 setsid();
202
203}
204ProcHandler::~ProcHandler() = default;
205
206
208{
209 startProc(&m_processList, "input", 10000);
210}
211
213{
214 for (unsigned int i = 0; i < m_numWorkerProcesses; i++) {
215 if (startProc(&m_processList, "worker", i))
216 break; // in child process
217 }
218}
219
221{
222 if (s_processID == -1)
223 s_processID = 20000;
224}
225
227
228bool ProcHandler::isInputProcess() { return (s_processID >= 10000 and s_processID < 20000) or GlobalProcHandler::isInputProcess(); }
229
231
232bool ProcHandler::isOutputProcess() { return s_processID >= 20000 or GlobalProcHandler::isOutputProcess(); }
233
235{
236 return s_numEventProcesses;
237}
238
240{
241 return std::set<int>(s_pidVector.begin(), s_pidVector.end());
242}
243std::set<int> ProcHandler::processList() const
244{
245 return m_processList;
246}
247
248int ProcHandler::EvtProcID() { return s_processID; }
249
250void ProcHandler::setProcessID(int processID) { s_processID = processID; }
251
253{
254 if (isWorkerProcess())
255 return "worker";
256 if (isInputProcess())
257 return "input";
258 if (isOutputProcess())
259 return "output";
260
261 //shouldn't happen
262 return "???";
263}
264
265
267{
268 bool ok = true;
269 while (!m_processList.empty()) {
270 for (int pid : m_processList) {
271 //once a process is gone from the global list, remove them from our own, too.
272 if (findPID(pid) == 0) {
273 m_processList.erase(pid);
274 if (m_markChildrenAsLocal and pid < 0 and s_localChildrenWithErrors != 0) {
275 ok = false;
276 s_localChildrenWithErrors--;
277 }
278 break;
279 }
280 }
281
282 usleep(100);
283 }
284 return ok;
285}
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
static bool isWorkerProcess()
Return true if the process is of type c_Worker.
static bool isOutputProcess()
Return true if the process is of type c_Output.
static bool isInputProcess()
Return true if the process is of type c_Input.
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
static void resetForChildProcess()
Reset InputController (e.g. in a child process after forking).
void startOutputProcess()
There is no real output process, but marks current process as output.
static bool isWorkerProcess()
Return true if the process is a worker process.
static int EvtProcID()
Return ID of the current process.
std::set< int > m_processList
PIDs of processes controlled by this ProcHandler.
Definition ProcHandler.h:89
static int numEventProcesses()
Return number of worker processes (configured value, not current)
static bool isOutputProcess()
Return true if the process is an output process.
static bool isInputProcess()
Return true if the process is an input process.
static void setProcessID(int processID)
Set the process ID of this process.
unsigned int m_numWorkerProcesses
Number of worker processes controlled by this ProcHandler.
Definition ProcHandler.h:90
static std::set< int > globalProcessList()
Return list of all PIDs (from all ProcHandler instances).
bool waitForAllProcesses()
Wait until all forked processes handled by this ProcHandler terminate.
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
bool m_markChildrenAsLocal
Abnormal termination of child will not stop parent, waitForAllProcesses() returns status.
Definition ProcHandler.h:88
ProcHandler(unsigned int nWorkerProc, bool markChildrenAsLocal=false)
Constructor.
bool startProc(std::set< int > *processList, const std::string &procType, int id)
Start a new process, adding its PID to processList, and setting s_processID = id.
~ProcHandler()
Destructor.
void startWorkerProcesses()
Fork and initialize worker processes.
std::set< int > processList() const
Return list of PIDs managed by this ProcHandler instance.
void startInputProcess()
Fork and initialize an input process.
static std::string getProcessName()
Get a name for this process.
Abstract base class for different kinds of events.
STL namespace.