Belle II Software development
GlobalProcHandler Class Reference

A class to manage processes for parallel processing. More...

#include <GlobalProcHandler.h>

Static Public Member Functions

static void initialize (unsigned int nWorkerProc)
 Create a new process handler, which will handle nWorkerProc processes.
 
static bool startInputProcess ()
 Fork and initialize an input process.
 
static bool startWorkerProcesses (unsigned int numProcesses)
 Fork and initialize as many worker processes as requested.
 
static bool startOutputProcess (bool local=false)
 Fork and initialize an output process.
 
static bool startProxyProcess ()
 Fork and initialize a proxy process.
 
static bool startMonitoringProcess ()
 Fork and initialize a monitoring process.
 
static void waitForAllProcesses ()
 Wait until all forked processes handled by this GlobalProcHandler have terminated.
 
static bool parallelProcessingUsed ()
 Returns true if multiple processes have been spawned, false in single-core mode.
 
static bool isInputProcess ()
 Return true if the process is of type c_Input.
 
static bool isWorkerProcess ()
 Return true if the process is of type c_Worker.
 
static bool isOutputProcess ()
 Return true if the process is of type c_Output.
 
static int numEventProcesses ()
 Return number of worker processes (configured value, not current)
 
static int EvtProcID ()
 Get the ID of this process. Attention: this ID may not be a stable API feature.
 
static std::string getProcessName ()
 Get a human-readable name for this process (worker, input, output, init, monitor).
 
static void killAllProcesses ()
 Hard kill all processes.
 
static const std::vector< int > & getPIDList ()
 Return the PID list handled by the running GlobalProcHandler. This PID list is updated using the signal handlers and the startup of processes.
 
static bool isProcess (ProcType procType)
 Return true if the process is of type procType.
 
static ProcType getProcType (int pid)
 Return the proc type of the process that was started with the given PID.
 

Static Private Member Functions

static bool startProc (ProcType procType, int id)
 Start a new process, set its type and id, and return true if running in the new process.
 
static void addPID (int pid)
 Add a new PID. Is called when forking.
 
static bool findPID (int pid)
 Find a PID in the list and return true, if found.
 
static void removePID (int pid)
 Remove a PID from the list by setting it to 0.
 
static void clearPIDs ()
 Remove all PIDs.
 
static bool pidListEmpty ()
 Check if the PID list is empty (only 0).
 
static void childSignalHandler (int)
 This function is called on SIGCHLD.
 

Static Private Attributes

static ProcType s_procType = ProcType::c_Init
 Our current proc type.
 
static int s_processID = -1
 Our current process id.
 
static int s_numEventProcesses = 0
 How many processes are handled in this GlobalProcHandler.
 
static std::vector< int > s_pidVector
 global list of PIDs managed by GlobalProcHandler.
 
static std::map< int, ProcType > s_startedPIDs
 Which PIDs were started with which types.
 

Detailed Description

A class to manage processes for parallel processing.

Definition at line 22 of file GlobalProcHandler.h.

Member Function Documentation

◆ addPID()

void addPID ( int  pid)
staticprivate

Add a new PID. Is called when forking.

Definition at line 81 of file GlobalProcHandler.cc.

82{
83 // if possible, insert pid into gap in list
84 for (int& pid : s_pidVector) {
85 if (pid == 0) {
86 pid = newPID;
87 return;
88 }
89 }
90
91 B2FATAL("PID vector at capacity. This produces a race condition, make sure GlobalProcHandler is created early.");
92}
static std::vector< int > s_pidVector
global list of PIDs managed by GlobalProcHandler.

◆ childSignalHandler()

void childSignalHandler ( int  )
staticprivate

This function is called on SIGCHLD.

Definition at line 38 of file GlobalProcHandler.cc.

39{
41 int status;
42 int pid = waitpid(-1, &status, WNOHANG);
43 if (pid == -1) {
44 if (errno == EINTR) {
45 continue; // interrupted, try again
46 } else if (errno == ECHILD) {
47 // We don't have any child processes?
48 EventProcessor::writeToStdErr("\n Called waitpid() without any children left. This shouldn't happen and and indicates a problem.\n");
49 // actually, this is ok in case we already called waitpid() somewhere else. (but we don't do that...)
50 // In case I want to avoid this, waitid() and WNOWAIT might help, but require libc >= 2.12 (not present in SL5)
52 return;
53 } else {
54 // also shouldn't happen
55 EventProcessor::writeToStdErr("\nwaitpid() failed.\n");
56 }
57 } else if (pid == 0) {
58 // should not happen because of waitpid(-1,...)
59 // further children exist, but no state change yet
60 break;
61 } else {
62 // state change
63 // get signed PID
64 if (not GlobalProcHandler::findPID(pid)) {
65 // unknown child process died, ignore
66 continue;
67 }
68
69 // errors?
70 if (WIFSIGNALED(status) or (WIFEXITED(status) and WEXITSTATUS(status) != 0)) {
71 EventProcessor::writeToStdErr("\nSub-process exited with non-zero exit status. Please check other log messages for details.\n");
72 }
73
74 // remove pid from global list
76 }
77 }
78}
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
static bool findPID(int pid)
Find a PID in the list and return true, if found.
static void clearPIDs()
Remove all PIDs.
static bool pidListEmpty()
Check if the PID list is empty (only 0).
static void removePID(int pid)
Remove a PID from the list by setting it to 0.

◆ clearPIDs()

void clearPIDs ( )
staticprivate

Remove all PIDs.

Definition at line 109 of file GlobalProcHandler.cc.

110{
111 std::fill(s_pidVector.begin(), s_pidVector.end(), 0);
112}

◆ EvtProcID()

int EvtProcID ( )
static

Get the ID of this process. Attention: this ID may not be a stable API feature.

Definition at line 236 of file GlobalProcHandler.cc.

237{
238 return s_processID;
239}
static int s_processID
Our current process id.

◆ findPID()

bool findPID ( int  pid)
staticprivate

Find a PID in the list and return true, if found.

Definition at line 94 of file GlobalProcHandler.cc.

95{
96 return std::find(s_pidVector.begin(), s_pidVector.end(), pid) != s_pidVector.end();
97}

◆ getPIDList()

const std::vector< int > & getPIDList ( )
static

Return the PID list handled by the running GlobalProcHandler. This PID list is updated using the signal handlers and the startup of processes.

Definition at line 255 of file GlobalProcHandler.cc.

256{
257 return s_pidVector;
258}

◆ getProcessName()

std::string getProcessName ( )
static

Get a human-readable name for this process (worker, input, output, init, monitor).

Definition at line 219 of file GlobalProcHandler.cc.

220{
222 return "worker";
224 return "input";
226 return "output";
228 return "init";
230 return "monitor";
231
232 //shouldn't happen
233 return "???";
234}
static bool isProcess(ProcType procType)
Return true if the process is of type procType.
@ c_Output
Output Process.
@ c_Worker
Worker/Reconstruction Process.
@ c_Monitor
Monitoring Process.
@ c_Input
Input Process.
@ c_Init
Before the forks, the process is in init state.

◆ getProcType()

ProcType getProcType ( int  pid)
static

Return the proc type of the process that was started with the given PID.

Definition at line 260 of file GlobalProcHandler.cc.

261{
262 const auto procTypeIt = s_startedPIDs.find(pid);
263 if (procTypeIt == s_startedPIDs.end()) {
264 B2FATAL("Asking for a non-existing PID");
265 }
266 return procTypeIt->second;
267}
static std::map< int, ProcType > s_startedPIDs
Which PIDs were started with which types.

◆ initialize()

void initialize ( unsigned int  nWorkerProc)
static

Create a new process handler, which will handle nWorkerProc processes.

Definition at line 124 of file GlobalProcHandler.cc.

125{
126 B2ASSERT("Constructing GlobalProcHandler after forking is not allowed!", pidListEmpty());
127
128 s_numEventProcesses = nWorkerProc;
129
130 // s_pidVector size shouldn't be changed once processes are forked (race condition)
131 s_pidVector.resize(s_pidVector.size() + nWorkerProc + 3, 0); // num worker + input + output + proxy
132}
static int s_numEventProcesses
How many processes are handled in this GlobalProcHandler.

◆ isInputProcess()

bool isInputProcess ( )
static

Return true if the process is of type c_Input.

Definition at line 284 of file GlobalProcHandler.cc.

285{
287}

◆ isOutputProcess()

bool isOutputProcess ( )
static

Return true if the process is of type c_Output.

Definition at line 274 of file GlobalProcHandler.cc.

275{
277}

◆ isProcess()

bool isProcess ( ProcType  procType)
static

Return true if the process is of type procType.

Definition at line 175 of file GlobalProcHandler.cc.

176{
177 return (procType == s_procType);
178}
static ProcType s_procType
Our current proc type.

◆ isWorkerProcess()

bool isWorkerProcess ( )
static

Return true if the process is of type c_Worker.

Definition at line 279 of file GlobalProcHandler.cc.

280{
282}

◆ killAllProcesses()

void killAllProcesses ( )
static

Hard kill all processes.

Definition at line 241 of file GlobalProcHandler.cc.

242{
243 for (int& pid : s_pidVector) {
244 if (pid != 0) {
245 if (kill(pid, SIGKILL) >= 0) {
246 B2DEBUG(100, "hard killed process " << pid);
247 } else {
248 B2DEBUG(100, "no process " << pid << " found, already gone?");
249 }
250 pid = 0;
251 }
252 }
253}

◆ numEventProcesses()

int numEventProcesses ( )
static

Return number of worker processes (configured value, not current)

Definition at line 269 of file GlobalProcHandler.cc.

270{
271 return s_numEventProcesses;
272}

◆ parallelProcessingUsed()

bool parallelProcessingUsed ( )
static

Returns true if multiple processes have been spawned, false in single-core mode.

Definition at line 170 of file GlobalProcHandler.cc.

171{
172 return s_processID != -1;
173}

◆ pidListEmpty()

bool pidListEmpty ( )
staticprivate

Check if the PID list is empty (only 0).

Definition at line 114 of file GlobalProcHandler.cc.

115{
116 for (const int& pid : s_pidVector) {
117 if (pid != 0) {
118 return false;
119 }
120 }
121 return true;
122}

◆ removePID()

void removePID ( int  pid)
staticprivate

Remove a PID from the list by setting it to 0.

Definition at line 99 of file GlobalProcHandler.cc.

100{
101 for (int& pid : s_pidVector) {
102 if (pid == oldPID) {
103 pid = 0;
104 return;
105 }
106 }
107}

◆ startInputProcess()

bool startInputProcess ( )
static

Fork and initialize an input process.

Definition at line 139 of file GlobalProcHandler.cc.

140{
141 return startProc(ProcType::c_Input, 10000);
142}
static bool startProc(ProcType procType, int id)
Start a new process, sets the type and id and returns true if in this new process.

◆ startMonitoringProcess()

bool startMonitoringProcess ( )
static

Fork and initialize a monitoring process.

Definition at line 164 of file GlobalProcHandler.cc.

165{
167 return true;
168}

◆ startOutputProcess()

bool startOutputProcess ( bool  local = false)
static

Fork and initialize an output process.

If local is true, do not fork.

Definition at line 154 of file GlobalProcHandler.cc.

155{
156 if (local) {
158 return true;
159 } else {
160 return (startProc(ProcType::c_Output, 20000));
161 }
162}

◆ startProc()

bool startProc ( ProcType  procType,
int  id 
)
staticprivate

Start a new process, set its type and id, and return true if running in the new process.

Also adds the signal handling.

Definition at line 180 of file GlobalProcHandler.cc.

181{
183
184 fflush(stdout);
185 fflush(stderr);
186 pid_t pid = fork();
187 if (pid > 0) {
188 // Mother process
189 addPID(pid);
190 s_startedPIDs[pid] = procType;
191 fflush(stdout);
192 } else if (pid < 0) {
193 B2FATAL("fork() failed: " << strerror(errno));
194 } else {
195 // Child process
196 // do NOT handle SIGCHLD in forked processes!
197 EventProcessor::installSignalHandler(SIGCHLD, SIG_IGN);
198
199 s_procType = procType;
200
201 if (id == 0)
202 s_processID = getpid();
203 else
204 s_processID = id;
205
206 ProcHandler::setProcessID(s_processID); // Interface to existing ProcHandler
207
208 // Reset some python state: signals, threads, gil in the child
209 PyOS_AfterFork_Child();
210 // InputController becomes useless in child process
212 // die when parent dies
213 prctl(PR_SET_PDEATHSIG, SIGHUP);
214 return true;
215 }
216 return false;
217}
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
static void childSignalHandler(int)
This function is called on SIGCHLD.
static void addPID(int pid)
Add a new PID. Is called when forking.
static void resetForChildProcess()
Reset InputController (e.g.
static void setProcessID(int processID)
Set the process ID of this process.
Definition: ProcHandler.cc:250

◆ startProxyProcess()

bool startProxyProcess ( )
static

Fork and initialize a proxy process.

Definition at line 134 of file GlobalProcHandler.cc.

135{
136 return startProc(ProcType::c_Proxy, 30000);
137}
@ c_Proxy
Multicast Proxy Process.

◆ startWorkerProcesses()

bool startWorkerProcesses ( unsigned int  numProcesses)
static

Fork and initialize as many worker processes as requested.

Definition at line 144 of file GlobalProcHandler.cc.

145{
146 for (unsigned int i = 0; i < numProcesses; i++) {
148 return true;
149 }
150 }
151 return false;
152}

◆ waitForAllProcesses()

void waitForAllProcesses ( )
static

Wait until all forked processes handled by this GlobalProcHandler have terminated.

Definition at line 289 of file GlobalProcHandler.cc.

290{
291 while (true) {
292 if (pidListEmpty()) {
293 return;
294 }
295
296 std::this_thread::sleep_for(std::chrono::milliseconds(1));
297 }
298}

Member Data Documentation

◆ s_numEventProcesses

int s_numEventProcesses = 0
staticprivate

How many processes are handled in this GlobalProcHandler.

Definition at line 77 of file GlobalProcHandler.h.

◆ s_pidVector

std::vector< int > s_pidVector
staticprivate

global list of PIDs managed by GlobalProcHandler.

Definition at line 80 of file GlobalProcHandler.h.

◆ s_processID

int s_processID = -1
staticprivate

Our current process id.

Definition at line 75 of file GlobalProcHandler.h.

◆ s_procType

ProcType s_procType = ProcType::c_Init
staticprivate

Our current proc type.

Definition at line 73 of file GlobalProcHandler.h.

◆ s_startedPIDs

std::map< int, ProcType > s_startedPIDs
staticprivate

Which PIDs were started with which types.

Definition at line 82 of file GlobalProcHandler.h.


The documentation for this class was generated from the following files: