Belle II Software  release-05-02-19
ProcessMonitor.h
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2018 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Anselm Baur, Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #pragma once
11 
12 #include <framework/pcore/ProcHelper.h>
13 #include <framework/pcore/zmq/sockets/ZMQClient.h>
14 #include <framework/pcore/zmq/utils/StreamHelper.h>
15 
16 #include <string>
17 #include <map>
18 
19 namespace Belle2 {
30  class ProcessMonitor {
31  public:
33  void subscribe(const std::string& pubSocketAddress, const std::string& subSocketAddress,
34  const std::string& controlSocketAddress);
36  void waitForRunningInput(int timeout);
38  void waitForRunningOutput(int timeout);
40  void waitForRunningWorker(int timeout);
42  void killProcesses(unsigned int timeout);
43 
45  void initialize(unsigned int requestedNumberOfWorkers);
46 
48  void terminate();
49 
51  void reset();
52 
54  void checkMulticast(int timeout = 0);
56  void checkChildProcesses();
58  void checkSignals(int g_signalReceived);
59 
61  bool hasEnded() const;
62 
64  unsigned int needMoreWorkers() const;
65 
67  bool hasWorkers() const;
68 
69  private:
72 
74  unsigned int m_requestedNumberOfWorkers = 0;
76  std::map<int, ProcType> m_processList;
78  bool m_hasEnded = false;
79 
82 
84  bool m_receivedStatistics = false;
85 
87  unsigned int processesWithType(const ProcType& procType) const;
88 
90  template <class ASocket>
91  void processMulticast(const ASocket& socket);
92  };
94 } // namespace Belle2
Belle2::ProcType
ProcType
Type of the process used for storing and mapping the child processes in the process handler.
Definition: ProcHelper.h:9
Belle2::ProcessMonitor::m_hasEnded
bool m_hasEnded
Someone requested us to end the processing.
Definition: ProcessMonitor.h:86
Belle2::ProcessMonitor::m_processList
std::map< int, ProcType > m_processList
The current list of pid -> process types (to be compared to the proc handler)
Definition: ProcessMonitor.h:84
Belle2::ProcessMonitor::subscribe
void subscribe(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &controlSocketAddress)
Start listening for process management information on the given address.
Definition: ProcessMonitor.cc:22
Belle2::ProcessMonitor::initialize
void initialize(unsigned int requestedNumberOfWorkers)
Init the processing with that many workers.
Definition: ProcessMonitor.cc:192
Belle2::ProcessMonitor::m_receivedStatistics
bool m_receivedStatistics
Did we already receive the statistics?
Definition: ProcessMonitor.h:92
Belle2::ProcessMonitor::hasWorkers
bool hasWorkers() const
Check if there is at least one running worker.
Definition: ProcessMonitor.cc:334
Belle2::ProcessMonitor::m_requestedNumberOfWorkers
unsigned int m_requestedNumberOfWorkers
How many workers we should request to start.
Definition: ProcessMonitor.h:82
Belle2::ProcessMonitor::terminate
void terminate()
Terminate the processing.
Definition: ProcessMonitor.cc:83
Belle2::StreamHelper
Helper class for data store serialization.
Definition: StreamHelper.h:33
Belle2::ProcessMonitor::waitForRunningInput
void waitForRunningInput(int timeout)
Block until either the input process is running or the timeout (in seconds) is raised.
Definition: ProcessMonitor.cc:144
Belle2::ProcessMonitor::reset
void reset()
Reset the internal state.
Definition: ProcessMonitor.cc:88
Belle2::ProcessMonitor::processesWithType
unsigned int processesWithType(const ProcType &procType) const
Cound the number of processes with a certain type.
Definition: ProcessMonitor.cc:351
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ProcessMonitor::processMulticast
void processMulticast(const ASocket &socket)
Process a message from the multicast.
Definition: ProcessMonitor.cc:211
Belle2::ProcessMonitor::checkMulticast
void checkMulticast(int timeout=0)
check multicast for messages and kill workers if requested
Definition: ProcessMonitor.cc:197
Belle2::ProcessMonitor::needMoreWorkers
unsigned int needMoreWorkers() const
Compare our current list of workers of how many we want to have.
Definition: ProcessMonitor.cc:339
Belle2::ProcessMonitor::waitForRunningWorker
void waitForRunningWorker(int timeout)
Block until either the worker process is running or the timeout (in seconds) is raised.
Definition: ProcessMonitor.cc:160
Belle2::ProcessMonitor::checkSignals
void checkSignals(int g_signalReceived)
check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
Definition: ProcessMonitor.cc:321
Belle2::ProcessMonitor::hasEnded
bool hasEnded() const
If we have received a SIGINT signal or the last process is gone, we can end smoothly.
Definition: ProcessMonitor.cc:329
Belle2::ProcessMonitor::waitForRunningOutput
void waitForRunningOutput(int timeout)
Block until either the output process is running or the timeout (in seconds) is raised.
Definition: ProcessMonitor.cc:176
Belle2::ProcessMonitor::killProcesses
void killProcesses(unsigned int timeout)
Ask all processes to terminate. If not, kill them after timeout seconds.
Definition: ProcessMonitor.cc:93
Belle2::ZMQClient
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
Definition: ZMQClient.h:32
Belle2::ProcessMonitor::m_client
ZMQClient m_client
The client used for message processing.
Definition: ProcessMonitor.h:79
Belle2::ProcessMonitor::m_streamer
StreamHelper m_streamer
The data store streamer.
Definition: ProcessMonitor.h:89
Belle2::ProcessMonitor::checkChildProcesses
void checkChildProcesses()
check the child processes, if one has died
Definition: ProcessMonitor.cc:253