Belle II Software light-2406-ragdoll
ProcHandler Class Reference

A class to manage processes for parallel processing.

#include <ProcHandler.h>

Public Member Functions

 ProcHandler (unsigned int nWorkerProc, bool markChildrenAsLocal=false)
 Constructor.
 
 ~ProcHandler ()
 Destructor.
 
void startInputProcess ()
 Fork and initialize an input process.
 
void startWorkerProcesses ()
 Fork and initialize worker processes.
 
void startOutputProcess ()
 Mark the current process as the output process (no separate output process is forked).
 
bool waitForAllProcesses ()
 Wait until all forked processes handled by this ProcHandler terminate.
 
std::set< int > processList () const
 Return list of PIDs managed by this ProcHandler instance.
 

Static Public Member Functions

static bool parallelProcessingUsed ()
 Returns true if multiple processes have been spawned, false in single-core mode.
 
static bool isInputProcess ()
 Return true if the process is an input process.
 
static bool isWorkerProcess ()
 Return true if the process is a worker process.
 
static bool isOutputProcess ()
 Return true if the process is an output process.
 
static int numEventProcesses ()
 Return number of worker processes (configured value, not current)
 
static std::set< int > globalProcessList ()
 Return list of all PIDs (from all ProcHandler instances).
 
static int EvtProcID ()
 Return ID of the current process.
 
static void setProcessID (int processID)
 Set the process ID of this process.
 
static std::string getProcessName ()
 Get a name for this process.
 

Private Member Functions

bool startProc (std::set< int > *processList, const std::string &procType, int id)
 Start a new process, adding its PID to processList, and setting s_processID = id.
 

Private Attributes

bool m_markChildrenAsLocal
 If set, abnormal termination of a child will not stop the parent; waitForAllProcesses() returns the status instead.
 
std::set< int > m_processList
 PIDs of processes controlled by this ProcHandler.
 
unsigned int m_numWorkerProcesses
 Number of worker processes controlled by this ProcHandler.
 

Detailed Description

A class to manage processes for parallel processing.

Definition at line 21 of file ProcHandler.h.

Constructor & Destructor Documentation

◆ ProcHandler()

ProcHandler ( unsigned int  nWorkerProc,
bool  markChildrenAsLocal = false 
)
explicit

Constructor.

Children marked as local will not cause the parent process to die if they terminate abnormally. Such errors can be detected through the return value of waitForAllProcesses(). Note that you probably shouldn't have multiple ProcHandlers with local children running at the same time.

Definition at line 188 of file ProcHandler.cc.

:
  m_markChildrenAsLocal(markChildrenAsLocal),
  m_numWorkerProcesses(nWorkerProc)
{
  if ((int)nWorkerProc > s_numEventProcesses)
    s_numEventProcesses = nWorkerProc;

  if (!pidListEmpty())
    B2FATAL("Constructing ProcHandler after forking is not allowed!");

  //s_pidVector size shouldn't be changed once processes are forked (race condition)
  s_pidVector.reserve(s_pidVector.size() + nWorkerProc + 2);
  s_pids = s_pidVector.data();
  setsid();

}
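
The reserve()/data() pair in the constructor appears to rely on a standard std::vector guarantee: as long as the size does not exceed the reserved capacity, appending elements does not reallocate, so a raw pointer taken from data() stays valid. The following standalone sketch illustrates only that guarantee. The helper name pointerStaysStable is hypothetical and is not part of ProcHandler, and the sketch does not reproduce the class's static state.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of ProcHandler): reserve room for `extra`
// more entries, remember the raw buffer pointer, append `extra` entries,
// and report whether the buffer is still at the same address.
bool pointerStaysStable(std::vector<int>& pids, std::size_t extra)
{
  pids.reserve(pids.size() + extra);
  const int* before = pids.data();
  for (std::size_t i = 0; i < extra; ++i)
    pids.push_back(0);  // stays within capacity, so no reallocation
  return pids.data() == before;
}
```

Growing the vector beyond the reserved capacity would invalidate the pointer, which is presumably why the source comment warns against changing the size after forking.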

Member Function Documentation

◆ EvtProcID()

int EvtProcID ( )
static

Return ID of the current process.

Return values mean:
  -1: no parallel processing used
  <10000: one of the worker processes (between input and output paths), in the range 0..Environment::getInstance().getNumberProcesses()-1
  >=10000: input path
  >=20000: output path
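
The ranges above can be summarized in a small standalone sketch. The enum Role and the function roleFromID are hypothetical names used only for illustration. The thresholds are taken from the return-value description and from the is*Process() definitions on this page, and the additional GlobalProcHandler checks are left out.

```cpp
// Hypothetical helper (not part of ProcHandler): maps an EvtProcID()-style
// value to a role using the documented thresholds.
enum class Role { None, Worker, Input, Output };

Role roleFromID(int id)
{
  if (id == -1) return Role::None;       // no parallel processing used
  if (id >= 20000) return Role::Output;  // output path
  if (id >= 10000) return Role::Input;   // input path
  return Role::Worker;                   // worker process (below 10000)
}
```

The output check comes first because every ID of 20000 or more also satisfies the looser ">=10000" condition.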

Definition at line 248 of file ProcHandler.cc.

{ return s_processID; }

◆ getProcessName()

std::string getProcessName ( )
static

Get a name for this process.

Returns "worker", "input", or "output"; "???" is returned if none of these applies, which should not happen.

Definition at line 252 of file ProcHandler.cc.

{
  if (isWorkerProcess())
    return "worker";
  if (isInputProcess())
    return "input";
  if (isOutputProcess())
    return "output";

  //shouldn't happen
  return "???";
}

◆ globalProcessList()

std::set< int > globalProcessList ( )
static

Return list of all PIDs (from all ProcHandler instances).

Definition at line 239 of file ProcHandler.cc.

{
  return std::set<int>(s_pidVector.begin(), s_pidVector.end());
}

◆ isInputProcess()

bool isInputProcess ( )
static

Return true if the process is an input process.

Definition at line 228 of file ProcHandler.cc.

{ return (s_processID >= 10000 and s_processID < 20000) or GlobalProcHandler::isInputProcess(); }
static bool GlobalProcHandler::isInputProcess()
Return true if the process is of type c_Input.

◆ isOutputProcess()

bool isOutputProcess ( )
static

Return true if the process is an output process.

Definition at line 232 of file ProcHandler.cc.

{ return s_processID >= 20000 or GlobalProcHandler::isOutputProcess(); }
static bool GlobalProcHandler::isOutputProcess()
Return true if the process is of type c_Output.

◆ isWorkerProcess()

bool isWorkerProcess ( )
static

Return true if the process is a worker process.

Definition at line 230 of file ProcHandler.cc.

{ return (parallelProcessingUsed() and s_processID < 10000) or GlobalProcHandler::isWorkerProcess(); }
static bool GlobalProcHandler::isWorkerProcess()
Return true if the process is of type c_Worker.

◆ numEventProcesses()

int numEventProcesses ( )
static

Return number of worker processes (configured value, not current)

Definition at line 234 of file ProcHandler.cc.

{
  return s_numEventProcesses;
}

◆ parallelProcessingUsed()

bool parallelProcessingUsed ( )
static

Returns true if multiple processes have been spawned, false in single-core mode.

Definition at line 226 of file ProcHandler.cc.

{ return s_processID != -1 or GlobalProcHandler::parallelProcessingUsed(); }

◆ processList()

std::set< int > processList ( ) const

Return list of PIDs managed by this ProcHandler instance.

Definition at line 243 of file ProcHandler.cc.

{
  return m_processList;
}

◆ setProcessID()

void setProcessID ( int  processID)
static

Set the process ID of this process.

Definition at line 250 of file ProcHandler.cc.

{ s_processID = processID; }

◆ startInputProcess()

void startInputProcess ( )

Fork and initialize an input process.

Definition at line 207 of file ProcHandler.cc.

{
  startProc(&m_processList, "input", 10000);
}

◆ startOutputProcess()

void startOutputProcess ( )

Mark the current process as the output process. No separate output process is forked: if no process ID has been set yet (s_processID == -1), it is set to 20000.

Definition at line 220 of file ProcHandler.cc.

{
  if (s_processID == -1)
    s_processID = 20000;
}

◆ startProc()

bool startProc ( std::set< int > *  processList,
const std::string &  procType,
int  id 
)
private

Start a new process, adding its PID to processList, and setting s_processID = id.

Returns true in child process.

Definition at line 154 of file ProcHandler.cc.

{
  EventProcessor::installSignalHandler(SIGCHLD, sigChldHandler);

  fflush(stdout);
  fflush(stderr);
  PyOS_BeforeFork();
  pid_t pid = fork();
  if (pid > 0) {   // Mother process
    PyOS_AfterFork_Parent();
    // [line 164 of ProcHandler.cc is missing from this listing]
    pid = -pid;
    processList->insert(pid);
    addPID(pid);
    B2INFO("ProcHandler: " << procType << " process forked. pid = " << pid);
    fflush(stdout);
  } else if (pid < 0) {
    B2FATAL("fork() failed: " << strerror(errno));
  } else {
    //do NOT handle SIGCHLD in forked processes!
    EventProcessor::installSignalHandler(SIGCHLD, SIG_IGN);

    s_processID = id;
    //Reset some python state: signals, threads, gil in the child
    PyOS_AfterFork_Child();
    //InputController becomes useless in child process
    // [line 180 of ProcHandler.cc is missing from this listing]
    //die when parent dies
    prctl(PR_SET_PDEATHSIG, SIGHUP);
    return true;
  }
  return false;
}
static void EventProcessor::installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
static void resetForChildProcess()
Reset InputController (e.g.

◆ startWorkerProcesses()

void startWorkerProcesses ( )

Fork and initialize worker processes.

Definition at line 212 of file ProcHandler.cc.

{
  for (unsigned int i = 0; i < m_numWorkerProcesses; i++) {
    if (startProc(&m_processList, "worker", i))
      break; // in child process
  }
}

◆ waitForAllProcesses()

bool waitForAllProcesses ( )

Wait until all forked processes handled by this ProcHandler terminate.

If the markChildrenAsLocal option was set when a process was started and that process terminated abnormally (via a signal or a non-zero exit code), false is returned. Otherwise, true is returned.

Definition at line 266 of file ProcHandler.cc.

{
  bool ok = true;
  while (!m_processList.empty()) {
    for (int pid : m_processList) {
      //once a process is gone from the global list, remove them from our own, too.
      if (findPID(pid) == 0) {
        m_processList.erase(pid);
        if (m_markChildrenAsLocal and pid < 0 and s_localChildrenWithErrors != 0) {
          ok = false;
          s_localChildrenWithErrors--;
        }
        break;
      }
    }

    usleep(100);
  }
  return ok;
}
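
The description of this function treats a child as failed if it was killed by a signal or exited with a non-zero code. How that condition is commonly derived from a waitpid() status can be sketched as follows. The helpers terminatedNormally and runChild are hypothetical, and this is not ProcHandler's SIGCHLD handler, whose code is not shown on this page.

```cpp
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// True only for a regular exit with status code 0; a death by signal or a
// non-zero exit code both count as a failed termination.
bool terminatedNormally(int status)
{
  return WIFEXITED(status) && WEXITSTATUS(status) == 0;
}

// Fork a child that exits immediately with `code`, wait for it, and
// classify the result.
bool runChild(int code)
{
  const pid_t pid = fork();
  if (pid < 0)
    return false;  // fork failed; treated as an error in this sketch
  if (pid == 0)
    _exit(code);   // child: terminate immediately
  int status = 0;
  if (waitpid(pid, &status, 0) != pid)
    return false;
  return terminatedNormally(status);
}
```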

Member Data Documentation

◆ m_markChildrenAsLocal

bool m_markChildrenAsLocal
private

If set, abnormal termination of a child will not stop the parent; waitForAllProcesses() returns the status instead.

Definition at line 88 of file ProcHandler.h.

◆ m_numWorkerProcesses

unsigned int m_numWorkerProcesses
private

Number of worker processes controlled by this ProcHandler.

Definition at line 90 of file ProcHandler.h.

◆ m_processList

std::set<int> m_processList
private

PIDs of processes controlled by this ProcHandler.

Definition at line 89 of file ProcHandler.h.


The documentation for this class was generated from the following files: