Belle II Software development
ProcessMonitor.h
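/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/
#pragma once

#include <framework/pcore/ProcHelper.h>
#include <framework/pcore/zmq/sockets/ZMQClient.h>
#include <framework/pcore/zmq/utils/StreamHelper.h>

#include <string>
#include <map>

namespace Belle2 {
  /// Monitors all started framework processes (input, workers, output) and kills them if requested.
  class ProcessMonitor {
  public:
    /// Start listening for process management information on the given address.
    void subscribe(const std::string& pubSocketAddress, const std::string& subSocketAddress,
                   const std::string& controlSocketAddress);
    /// Block until either the input process is running or the timeout (in seconds) expires.
    void waitForRunningInput(int timeout);
    /// Block until either the output process is running or the timeout (in seconds) expires.
    void waitForRunningOutput(int timeout);
    /// Block until either the worker process is running or the timeout (in seconds) expires.
    void waitForRunningWorker(int timeout);
    /// Ask all processes to terminate. If they have not terminated after timeout seconds, kill them.
    void killProcesses(unsigned int timeout);

    /// Initialize the processing with the requested number of workers.
    void initialize(unsigned int requestedNumberOfWorkers);

    /// Terminate the processing.
    void terminate();

    /// Reset the internal state.
    void reset();

    /// Check the multicast for messages and kill workers if requested.
    void checkMulticast(int timeout = 0);
    /// Check the child processes to see whether one has died.
    void checkChildProcesses();
    /// Check whether we have received any signal from the user or OS. Kill the processes unless it is SIGINT.
    void checkSignals(int g_signalReceived);

    /// If we have received a SIGINT signal or the last process is gone, we can end smoothly.
    bool hasEnded() const;

    /// Compare our current list of workers to how many we want to have.
    unsigned int needMoreWorkers() const;

    /// Check if there is at least one running worker.
    bool hasWorkers() const;

  private:
    /// The client used for message processing.
    ZMQClient m_client;
    /// The data store streamer.
    StreamHelper m_streamer;
    /// The current list of pid -> process types (to be compared to the proc handler).
    std::map<int, ProcType> m_processList;
    /// Someone requested us to end the processing.
    bool m_hasEnded = false;
    /// How many workers we should request to start.
    unsigned int m_requestedNumberOfWorkers;
    /// Did we already receive the statistics?
    bool m_receivedStatistics;

    /// Count the number of processes with a certain type.
    unsigned int processesWithType(const ProcType& procType) const;

    /// Process a message from the multicast.
    template <class ASocket>
    void processMulticast(const ASocket& socket);
  };
} // namespace Belle2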
Class to monitor all started framework processes (input, workers, output), kill them if requested and...
void processMulticast(const ASocket &socket)
Process a message from the multicast.
bool m_hasEnded
Someone requested us to end the processing.
StreamHelper m_streamer
The data store streamer.
std::map< int, ProcType > m_processList
The current list of pid -> process types (to be compared to the proc handler)
void checkMulticast(int timeout=0)
Check the multicast for messages and kill workers if requested.
void waitForRunningWorker(int timeout)
Block until either the worker process is running or the timeout (in seconds) expires.
bool hasEnded() const
If we have received a SIGINT signal or the last process is gone, we can end smoothly.
void checkSignals(int g_signalReceived)
Check whether we have received any signal from the user or OS. Kill the processes unless the signal is SIGINT.
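The rule described for checkSignals can be sketched in isolation. The following is a minimal illustration, not the basf2 implementation: the handler `storeSignal`, the helper `shouldKillProcesses`, and the convention that 0 means "no signal received" are assumptions made for this example. Only the name `g_signalReceived` is taken from the signature above.

```cpp
#include <csignal>

// Assumed global written by the signal handler. volatile std::sig_atomic_t is
// the type a handler may safely assign to. 0 means "no signal" (assumption).
volatile std::sig_atomic_t g_signalReceived = 0;

// Hypothetical handler: it only records the signal number, so that the
// monitoring loop can react to it later outside of the handler context.
void storeSignal(int signalNumber)
{
  g_signalReceived = signalNumber;
}

// Hypothetical helper mirroring the documented rule: a received signal
// other than SIGINT means the child processes should be killed.
bool shouldKillProcesses(int signalReceived)
{
  return signalReceived != 0 && signalReceived != SIGINT;
}
```

A monitoring loop would install the handler once with `std::signal` and then pass the recorded value to the check on every iteration.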
void waitForRunningInput(int timeout)
Block until either the input process is running or the timeout (in seconds) expires.
void checkChildProcesses()
Check the child processes to see whether one has died.
bool m_receivedStatistics
Did we already receive the statistics?
void terminate()
Terminate the processing.
unsigned int m_requestedNumberOfWorkers
How many workers we should request to start.
void waitForRunningOutput(int timeout)
Block until either the output process is running or the timeout (in seconds) expires.
void subscribe(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &controlSocketAddress)
Start listening for process management information on the given address.
bool hasWorkers() const
Check if there is at least one running worker.
void killProcesses(unsigned int timeout)
Ask all processes to terminate. If they have not terminated after timeout seconds, kill them.
unsigned int processesWithType(const ProcType &procType) const
Count the number of processes with a certain type.
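Counting entries of one type in a pid -> process type map could look roughly like the sketch below. It is an illustration under assumptions: the `ProcType` enum here is a stand-in with made-up enumerators (the real one is defined in ProcHelper.h), and `countProcessesWithType` is a hypothetical free function rather than the member itself.

```cpp
#include <algorithm>
#include <map>
#include <utility>

// Stand-in for Belle2::ProcType; these enumerators are assumptions.
enum class ProcType { Input, Worker, Output };

// Hypothetical helper: count the entries in a pid -> process type map
// whose type equals procType.
unsigned int countProcessesWithType(const std::map<int, ProcType>& processList,
                                    ProcType procType)
{
  const auto hasType = [procType](const std::pair<const int, ProcType>& entry) {
    return entry.second == procType;
  };
  return static_cast<unsigned int>(
           std::count_if(processList.begin(), processList.end(), hasType));
}
```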
ZMQClient m_client
The client used for message processing.
void initialize(unsigned int requestedNumberOfWorkers)
Initialize the processing with the requested number of workers.
unsigned int needMoreWorkers() const
Compare our current list of workers to how many we want to have.
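Because the return type is unsigned, a comparison like this has to avoid wrapping around when more workers are running than were requested. A minimal sketch of one way to do that follows. `missingWorkers` and its parameters are hypothetical names for this example, and the real member may behave differently.

```cpp
// Hypothetical helper: how many workers still need to be started.
unsigned int missingWorkers(unsigned int requestedWorkers, unsigned int runningWorkers)
{
  // Guard against unsigned wrap-around if more workers run than requested.
  if (runningWorkers >= requestedWorkers) {
    return 0;
  }
  return requestedWorkers - runningWorkers;
}
```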
void reset()
Reset the internal state.
StreamHelper
Helper class for data store serialization.
Definition: StreamHelper.h:23
ZMQClient
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
Definition: ZMQClient.h:22
ProcType
Type of the process used for storing and mapping the child processes in the process handler.
Definition: ProcHelper.h:16