Belle II Software development
GlobalProcHandler Class Reference

A class to manage processes for parallel processing. More...

#include <GlobalProcHandler.h>

Static Public Member Functions

static void initialize (unsigned int nWorkerProc)
 Create a new process handler, which will handle nWorkerProc processes.
 
static bool startInputProcess ()
 Fork and initialize an input process.
 
static bool startWorkerProcesses (unsigned int numProcesses)
 Fork and initialize as many worker processes as requested.
 
static bool startOutputProcess (bool local=false)
 Fork and initialize an output process.
 
static bool startProxyProcess ()
 Fork and initialize a proxy process.
 
static bool startMonitoringProcess ()
 Fork and initialize a monitoring process.
 
static void waitForAllProcesses ()
 Wait until all forked processes handled by this GlobalProcHandler have finished.
 
static bool parallelProcessingUsed ()
 Returns true if multiple processes have been spawned, false in single-core mode.
 
static bool isInputProcess ()
 Return true if the process is of type c_Input.
 
static bool isWorkerProcess ()
 Return true if the process is of type c_Worker.
 
static bool isOutputProcess ()
 Return true if the process is of type c_Output.
 
static int numEventProcesses ()
 Return number of worker processes (configured value, not current)
 
static int EvtProcID ()
 Get the ID of this process. Attention: this ID may not be a stable API feature.
 
static std::string getProcessName ()
 Get a human-readable name for this process (e.g. input, worker, output).
 
static void killAllProcesses ()
 Hard kill all processes.
 
static const std::vector< int > & getPIDList ()
 Return the PID list handled by the running GlobalProcHandler. This PID list is updated using the signal handlers and the startup of processes.
 
static bool isProcess (ProcType procType)
 Return true if the process is of type procType.
 
static ProcType getProcType (int pid)
 Return the proc type of the process that was started with the given PID.
 

Static Private Member Functions

static bool startProc (ProcType procType, int id)
 Start a new process, sets the type and id and returns true if in this new process.
 
static void addPID (int pid)
 Add a new PID. Is called when forking.
 
static bool findPID (int pid)
 Find a PID in the list and return true, if found.
 
static void removePID (int pid)
 Remove a PID from the list by setting it to 0.
 
static void clearPIDs ()
 Remove all PIDs.
 
static bool pidListEmpty ()
 Check if the PID list is empty (only 0).
 
static void childSignalHandler (int)
 This function is called on SIGCHLD.
 

Static Private Attributes

static ProcType s_procType = ProcType::c_Init
 Our current proc type.
 
static int s_processID = -1
 Our current process id.
 
static int s_numEventProcesses = 0
 How many processes are handled in this GlobalProcHandler.
 
static std::vector< int > s_pidVector
 global list of PIDs managed by GlobalProcHandler.
 
static std::map< int, ProcType > s_startedPIDs
 Which PIDs were started with which types.
 

Detailed Description

A class to manage processes for parallel processing.

Definition at line 22 of file GlobalProcHandler.h.

Member Function Documentation

◆ addPID()

void addPID ( int  pid)
staticprivate

Add a new PID. Is called when forking.

Definition at line 83 of file GlobalProcHandler.cc.

84{
85 // if possible, insert pid into gap in list
86 for (int& pid : s_pidVector) {
87 if (pid == 0) {
88 pid = newPID;
89 return;
90 }
91 }
92
93 B2FATAL("PID vector at capacity. This produces a race condition, make sure GlobalProcHandler is created early.");
94}
static std::vector< int > s_pidVector
global list of PIDs managed by GlobalProcHandler.

◆ childSignalHandler()

void childSignalHandler ( int  )
staticprivate

This function is called on SIGCHLD.

Definition at line 40 of file GlobalProcHandler.cc.

41{
43 int status;
44 int pid = waitpid(-1, &status, WNOHANG);
45 if (pid == -1) {
46 if (errno == EINTR) {
47 continue; // interrupted, try again
48 } else if (errno == ECHILD) {
49 // We don't have any child processes?
50 EventProcessor::writeToStdErr("\n Called waitpid() without any children left. This shouldn't happen and and indicates a problem.\n");
51 // actually, this is ok in case we already called waitpid() somewhere else. (but we don't do that...)
52 // In case I want to avoid this, waitid() and WNOWAIT might help, but require libc >= 2.12 (not present in SL5)
54 return;
55 } else {
56 // also shouldn't happen
57 EventProcessor::writeToStdErr("\nwaitpid() failed.\n");
58 }
59 } else if (pid == 0) {
60 // should not happen because of waitpid(-1,...)
61 // further children exist, but no state change yet
62 break;
63 } else {
64 // state change
65 // get signed PID
66 if (not GlobalProcHandler::findPID(pid)) {
67 // unknown child process died, ignore
68 continue;
69 }
70
71 // errors?
72 if (WIFSIGNALED(status) or (WIFEXITED(status) and WEXITSTATUS(status) != 0)) {
73 EventProcessor::writeToStdErr("\nSub-process exited with non-zero exit status. Please check other log messages for details.\n");
74 }
75
76 // remove pid from global list
78 }
79 }
80}
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
static bool findPID(int pid)
Find a PID in the list and return true, if found.
static void clearPIDs()
Remove all PIDs.
static bool pidListEmpty()
Check if the PID list is empty (only 0).
static void removePID(int pid)
Remove a PID from the list by setting it to 0.

◆ clearPIDs()

void clearPIDs ( )
staticprivate

Remove all PIDs.

Definition at line 111 of file GlobalProcHandler.cc.

112{
113 std::fill(s_pidVector.begin(), s_pidVector.end(), 0);
114}

◆ EvtProcID()

int EvtProcID ( )
static

Get the ID of this process. Attention: this ID may not be a stable API feature.

Definition at line 238 of file GlobalProcHandler.cc.

239{
240 return s_processID;
241}
static int s_processID
Our current process id.

◆ findPID()

bool findPID ( int  pid)
staticprivate

Find a PID in the list and return true, if found.

Definition at line 96 of file GlobalProcHandler.cc.

97{
98 return std::find(s_pidVector.begin(), s_pidVector.end(), pid) != s_pidVector.end();
99}

◆ getPIDList()

const std::vector< int > & getPIDList ( )
static

Return the PID list handled by the running GlobalProcHandler. This PID list is updated using the signal handlers and the startup of processes.

Definition at line 257 of file GlobalProcHandler.cc.

258{
259 return s_pidVector;
260}

◆ getProcessName()

std::string getProcessName ( )
static

Get a human-readable name for this process (e.g. input, worker, output).

Definition at line 221 of file GlobalProcHandler.cc.

222{
224 return "worker";
226 return "input";
228 return "output";
230 return "init";
232 return "monitor";
233
234 //shouldn't happen
235 return "???";
236}
static bool isProcess(ProcType procType)
Return true if the process is of type procType.
@ c_Output
Output Process.
@ c_Worker
Worker/Reconstruction Process.
@ c_Monitor
Monitoring Process.
@ c_Input
Input Process.
@ c_Init
Before the forks, the process is in init state.

◆ getProcType()

ProcType getProcType ( int  pid)
static

Return the proc type of the process that was started with the given PID.

Definition at line 262 of file GlobalProcHandler.cc.

263{
264 const auto procTypeIt = s_startedPIDs.find(pid);
265 if (procTypeIt == s_startedPIDs.end()) {
266 B2FATAL("Asking for a non-existing PID");
267 }
268 return procTypeIt->second;
269}
static std::map< int, ProcType > s_startedPIDs
Which PIDs were started with which types.

◆ initialize()

void initialize ( unsigned int  nWorkerProc)
static

Create a new process handler, which will handle nWorkerProc processes.

Definition at line 126 of file GlobalProcHandler.cc.

127{
128 B2ASSERT("Constructing GlobalProcHandler after forking is not allowed!", pidListEmpty());
129
130 s_numEventProcesses = nWorkerProc;
131
132 // s_pidVector size shouldn't be changed once processes are forked (race condition)
133 s_pidVector.resize(s_pidVector.size() + nWorkerProc + 3, 0); // num worker + input + output + proxy
134}
static int s_numEventProcesses
How many processes are handled in this GlobalProcHandler.

◆ isInputProcess()

bool isInputProcess ( )
static

Return true if the process is of type c_Input.

Definition at line 286 of file GlobalProcHandler.cc.

287{
289}

◆ isOutputProcess()

bool isOutputProcess ( )
static

Return true if the process is of type c_Output.

Definition at line 276 of file GlobalProcHandler.cc.

277{
279}

◆ isProcess()

bool isProcess ( ProcType  procType)
static

Return true if the process is of type procType.

Definition at line 177 of file GlobalProcHandler.cc.

178{
179 return (procType == s_procType);
180}
static ProcType s_procType
Our current proc type.

◆ isWorkerProcess()

bool isWorkerProcess ( )
static

Return true if the process is of type c_Worker.

Definition at line 281 of file GlobalProcHandler.cc.

282{
284}

◆ killAllProcesses()

void killAllProcesses ( )
static

Hard kill all processes.

Definition at line 243 of file GlobalProcHandler.cc.

244{
245 for (int& pid : s_pidVector) {
246 if (pid != 0) {
247 if (kill(pid, SIGKILL) >= 0) {
248 B2DEBUG(100, "hard killed process " << pid);
249 } else {
250 B2DEBUG(100, "no process " << pid << " found, already gone?");
251 }
252 pid = 0;
253 }
254 }
255}

◆ numEventProcesses()

int numEventProcesses ( )
static

Return number of worker processes (configured value, not current)

Definition at line 271 of file GlobalProcHandler.cc.

272{
273 return s_numEventProcesses;
274}

◆ parallelProcessingUsed()

bool parallelProcessingUsed ( )
static

Returns true if multiple processes have been spawned, false in single-core mode.

Definition at line 172 of file GlobalProcHandler.cc.

173{
174 return s_processID != -1;
175}

◆ pidListEmpty()

bool pidListEmpty ( )
staticprivate

Check if the PID list is empty (only 0).

Definition at line 116 of file GlobalProcHandler.cc.

117{
118 for (const int& pid : s_pidVector) {
119 if (pid != 0) {
120 return false;
121 }
122 }
123 return true;
124}

◆ removePID()

void removePID ( int  pid)
staticprivate

Remove a PID from the list by setting it to 0.

Definition at line 101 of file GlobalProcHandler.cc.

102{
103 for (int& pid : s_pidVector) {
104 if (pid == oldPID) {
105 pid = 0;
106 return;
107 }
108 }
109}

◆ startInputProcess()

bool startInputProcess ( )
static

Fork and initialize an input process.

Definition at line 141 of file GlobalProcHandler.cc.

142{
143 return startProc(ProcType::c_Input, 10000);
144}
static bool startProc(ProcType procType, int id)
Start a new process, sets the type and id and returns true if in this new process.

◆ startMonitoringProcess()

bool startMonitoringProcess ( )
static

Fork and initialize a monitoring process.

Definition at line 166 of file GlobalProcHandler.cc.

167{
169 return true;
170}

◆ startOutputProcess()

bool startOutputProcess ( bool  local = false)
static

Fork and initialize an output process.

If local is true, do not fork.

Definition at line 156 of file GlobalProcHandler.cc.

157{
158 if (local) {
160 return true;
161 } else {
162 return (startProc(ProcType::c_Output, 20000));
163 }
164}

◆ startProc()

bool startProc ( ProcType  procType,
int  id 
)
staticprivate

Start a new process, sets the type and id and returns true if in this new process.

Also adds the signal handling.

Definition at line 182 of file GlobalProcHandler.cc.

183{
185
186 fflush(stdout);
187 fflush(stderr);
188 pid_t pid = fork();
189 if (pid > 0) {
190 // Mother process
191 addPID(pid);
192 s_startedPIDs[pid] = procType;
193 fflush(stdout);
194 } else if (pid < 0) {
195 B2FATAL("fork() failed: " << strerror(errno));
196 } else {
197 // Child process
198 // do NOT handle SIGCHLD in forked processes!
199 EventProcessor::installSignalHandler(SIGCHLD, SIG_IGN);
200
201 s_procType = procType;
202
203 if (id == 0)
204 s_processID = getpid();
205 else
206 s_processID = id;
207
208 ProcHandler::setProcessID(s_processID); // Interface to existing ProcHandler
209
210 // Reset some python state: signals, threads, gil in the child
211 PyOS_AfterFork_Child();
212 // InputController becomes useless in child process
214 // die when parent dies
215 prctl(PR_SET_PDEATHSIG, SIGHUP);
216 return true;
217 }
218 return false;
219}
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
static void childSignalHandler(int)
This function is called on SIGCHLD.
static void addPID(int pid)
Add a new PID. Is called when forking.
static void resetForChildProcess()
Reset InputController (e.g.
static void setProcessID(int processID)
Set the process ID of this process.
Definition: ProcHandler.cc:250

◆ startProxyProcess()

bool startProxyProcess ( )
static

Fork and initialize a proxy process.

Definition at line 136 of file GlobalProcHandler.cc.

137{
138 return startProc(ProcType::c_Proxy, 30000);
139}
@ c_Proxy
Multicast Proxy Process.

◆ startWorkerProcesses()

bool startWorkerProcesses ( unsigned int  numProcesses)
static

Fork and initialize as many worker processes as requested.

Definition at line 146 of file GlobalProcHandler.cc.

147{
148 for (unsigned int i = 0; i < numProcesses; i++) {
150 return true;
151 }
152 }
153 return false;
154}

◆ waitForAllProcesses()

void waitForAllProcesses ( )
static

Wait until all forked processes handled by this GlobalProcHandler have finished.

Definition at line 291 of file GlobalProcHandler.cc.

292{
293 while (true) {
294 if (pidListEmpty()) {
295 return;
296 }
297
298 std::this_thread::sleep_for(std::chrono::milliseconds(1));
299 }
300}

Member Data Documentation

◆ s_numEventProcesses

int s_numEventProcesses = 0
staticprivate

How many processes are handled in this GlobalProcHandler.

Definition at line 77 of file GlobalProcHandler.h.

◆ s_pidVector

std::vector< int > s_pidVector
staticprivate

global list of PIDs managed by GlobalProcHandler.

Definition at line 80 of file GlobalProcHandler.h.

◆ s_processID

int s_processID = -1
staticprivate

Our current process id.

Definition at line 75 of file GlobalProcHandler.h.

◆ s_procType

ProcType s_procType = ProcType::c_Init
staticprivate

Our current proc type.

Definition at line 73 of file GlobalProcHandler.h.

◆ s_startedPIDs

std::map< int, ProcType > s_startedPIDs
staticprivate

Which PIDs were started with which types.

Definition at line 82 of file GlobalProcHandler.h.


The documentation for this class was generated from the following files: