Belle II Software light-2406-ragdoll
ProcessMonitor Class Reference

Class to monitor all started framework processes (input, workers, output), kill them if requested and handle the signals from the OS. More...

#include <ProcessMonitor.h>

Collaboration diagram for ProcessMonitor:

Public Member Functions

void subscribe (const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &controlSocketAddress)
 Start listening for process management information on the given address.
 
void waitForRunningInput (int timeout)
 Block until either the input process is running or the timeout (in seconds) is raised.
 
void waitForRunningOutput (int timeout)
 Block until either the output process is running or the timeout (in seconds) is raised.
 
void waitForRunningWorker (int timeout)
 Block until either the worker process is running or the timeout (in seconds) is raised.
 
void killProcesses (unsigned int timeout)
 Ask all processes to terminate. If not, kill them after timeout seconds.
 
void initialize (unsigned int requestedNumberOfWorkers)
 Init the processing with that many workers.
 
void terminate ()
 Terminate the processing.
 
void reset ()
 Reset the internal state.
 
void checkMulticast (int timeout=0)
 check multicast for messages and kill workers if requested
 
void checkChildProcesses ()
 check the child processes, if one has died
 
void checkSignals (int g_signalReceived)
 check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
 
bool hasEnded () const
 If we have received a SIGINT signal or the last process is gone, we can end smoothly.
 
unsigned int needMoreWorkers () const
 Compare our current list of workers of how many we want to have.
 
bool hasWorkers () const
 Check if there is at least one running worker.
 

Private Member Functions

unsigned int processesWithType (const ProcType &procType) const
 Cound the number of processes with a certain type.
 
template<class ASocket >
void processMulticast (const ASocket &socket)
 Process a message from the multicast.
 

Private Attributes

ZMQClient m_client
 The client used for message processing.
 
unsigned int m_requestedNumberOfWorkers = 0
 How many workers we should request to start.
 
std::map< int, ProcTypem_processList
 The current list of pid -> process types (to be compared to the proc handler)
 
bool m_hasEnded = false
 Someone requested us to end the processing.
 
StreamHelper m_streamer
 The data store streamer.
 
bool m_receivedStatistics = false
 Did we already receive the statistics?
 

Detailed Description

Class to monitor all started framework processes (input, workers, output), kill them if requested and handle the signals from the OS.

Is used in the ZMQ event processor and can only be used together with ZMQ.

Definition at line 28 of file ProcessMonitor.h.

Member Function Documentation

◆ checkChildProcesses()

void checkChildProcesses ( )

check the child processes, if one has died

Definition at line 266 of file ProcessMonitor.cc.

267{
268 if (hasEnded()) {
269 return;
270 }
271
272 // Copy is intended, as we do not want the signal handler to change our list
273 std::vector<int> currentProcessList = GlobalProcHandler::getPIDList();
274 // Check for processes, which where there last time but are gone now (so they died)
275 for (auto iter = m_processList.begin(); iter != m_processList.end();) {
276 const auto& pair = *iter;
277
278 B2ASSERT("This pid should not be in our list!", pair.first != 0);
279 //once a process is gone from the global list, remove them from our own, too.
280 if (std::find(currentProcessList.begin(), currentProcessList.end(), pair.first) != currentProcessList.end()) {
281 ++iter;
282 continue;
283 }
284
285 // if the process has gone down properly, it should now be set to "Stopped"
286 if (pair.second == ProcType::c_Input) {
287 B2ERROR("An input process has died unexpected! Need to go down.");
288 m_hasEnded = true;
289 return;
290 } else if (pair.second == ProcType::c_Output) {
291 B2ERROR("An output process has died unexpected! Need to go down.");
292 m_hasEnded = true;
293 return;
294 } else if (pair.second == ProcType::c_Proxy) {
295 B2ERROR("A proxy process has died unexpected! Need to go down.");
296 m_hasEnded = true;
297 return;
298 } else if (pair.second == ProcType::c_Worker) {
299 // B2WARNING("A worker process has died unexpected. If you have requested, I will now restart the workers.");
300 B2ERROR("A worker process has died unexpectedly.");
301 B2ASSERT("A worker died but none was present?", processesWithType(ProcType::c_Worker) != 0);
302 auto pcbMulticastMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_deleteWorkerMessage, pair.first);
303 m_client.publish(std::move(pcbMulticastMessage));
304 } else if (pair.second == ProcType::c_Stopped) {
305 B2DEBUG(30, "An children process has died expectedly.");
306 }
307
308 iter = m_processList.erase(iter);
309 }
310
311 m_hasEnded = false;
312 // The processing should be finished, if...
313 if (m_processList.empty()) {
314 // .. there is no process around anymore
315 m_hasEnded = true;
316 } else if (m_processList.size() == 1 and m_processList.begin()->second == ProcType::c_Output) {
317 // ... the single remaining process is an output process (can happen, if we do not use the event backup)
318 m_hasEnded = true;
319 } else {
320 // ... there are only workers or stopped processes around
321 m_hasEnded = true;
322 for (const auto& pair : m_processList) {
323 if (pair.second == ProcType::c_Input or pair.second == ProcType::c_Output) {
324 m_hasEnded = false;
325 break;
326 }
327 }
328 }
329
330 if (m_hasEnded) {
331 B2DEBUG(30, "No input and no output process around. Will go home now!");
332 }
333}
static const std::vector< int > & getPIDList()
Return the PID list handled by the running GlobalProcHandler. This PID list is updated using the sign...
bool m_hasEnded
Someone requested us to end the processing.
std::map< int, ProcType > m_processList
The current list of pid -> process types (to be compared to the proc handler)
bool hasEnded() const
If we have received a SIGINT signal or the last process is gone, we can end smoothly.
unsigned int processesWithType(const ProcType &procType) const
Cound the number of processes with a certain type.
ZMQClient m_client
The client used for message processing.
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
@ c_Proxy
Multicast Proxy Process.
@ c_Output
Output Process.
@ c_Worker
Worker/Reconstruction Process.
@ c_Input
Input Process.
@ c_Stopped
The process is stopped/killed.

◆ checkMulticast()

void checkMulticast ( int  timeout = 0)

check multicast for messages and kill workers if requested

Definition at line 210 of file ProcessMonitor.cc.

211{
212 if (hasEnded()) {
213 return;
214 }
215
216 const auto multicastAnswer = [this](const auto & socket) {
217 processMulticast(socket);
218 return false;
219 };
220 m_client.pollMulticast(timeout * 1000, multicastAnswer);
221}
void processMulticast(const ASocket &socket)
Process a message from the multicast.
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll method to only the multicast socket.
Definition: ZMQClient.h:162

◆ checkSignals()

void checkSignals ( int  g_signalReceived)

check if we have received any signal from the user or OS. Kill the processes if not SIGINT.

Definition at line 335 of file ProcessMonitor.cc.

336{
337 if (g_signalReceived > 0) {
338 B2DEBUG(30, "Received signal " << g_signalReceived);
339 m_hasEnded = true;
340 }
341}

◆ hasEnded()

bool hasEnded ( ) const

If we have received a SIGINT signal or the last process is gone, we can end smoothly.

Definition at line 343 of file ProcessMonitor.cc.

344{
345 return m_hasEnded;
346}

◆ hasWorkers()

bool hasWorkers ( ) const

Check if there is at least one running worker.

Definition at line 348 of file ProcessMonitor.cc.

349{
351}

◆ initialize()

void initialize ( unsigned int  requestedNumberOfWorkers)

Init the processing with that many workers.

Definition at line 205 of file ProcessMonitor.cc.

206{
207 m_requestedNumberOfWorkers = requestedNumberOfWorkers;
208}
unsigned int m_requestedNumberOfWorkers
How many workers we should request to start.

◆ killProcesses()

void killProcesses ( unsigned int  timeout)

Ask all processes to terminate. If not, kill them after timeout seconds.

Definition at line 106 of file ProcessMonitor.cc.

107{
108 B2ASSERT("Only the monitoring process is allowed to kill processes", GlobalProcHandler::isProcess(ProcType::c_Monitor)
110
111 if (not m_processList.empty() and m_client.isOnline()) {
112 B2DEBUG(30, "Try to kill the processes gently...");
113 // Try to kill them gently...
114 auto pcbMulticastMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
115 m_client.publish(std::move(pcbMulticastMessage));
116
118
119 const auto multicastAnswer = [this](const auto & socket) {
120 processMulticast(socket);
121 for (const auto& pair : m_processList) {
122 if (pair.second != ProcType::c_Stopped) {
123 B2DEBUG(30, "Process pid " << pair.first << " of type " << static_cast<char>(pair.second) << " is still alive");
124 return true;
125 }
126 }
127 return not m_receivedStatistics;
128 };
129
130 bool allProcessesStopped = true;
131 for (const auto& pair : m_processList) {
132 if (pair.second != ProcType::c_Stopped) {
133 allProcessesStopped = false;
134 break;
135 }
136 }
137
138 if (not allProcessesStopped) {
139 B2DEBUG(30, "Start waiting for processes to go down.");
140 m_client.pollMulticast(timeout, multicastAnswer);
141 B2DEBUG(30, "Finished waiting for processes to go down.");
142 }
143 }
144
145 if (m_client.isOnline()) {
146 B2DEBUG(30, "Will kill the proxy now.");
147 // Kill the proxy and give it some time to terminate
148 auto message = ZMQMessageHelper::createZMQMessage("TERMINATE");
149 m_client.send(message);
150 std::this_thread::sleep_for(std::chrono::milliseconds(10));
151 }
152
153 // If everything did not help, we will kill all of them
155}
static bool isProcess(ProcType procType)
Return true if the process is of type procType.
static void killAllProcesses()
Hard kill all processes.
void checkChildProcesses()
check the child processes, if one has died
bool m_receivedStatistics
Did we already receive the statistics?
bool isOnline() const
Check if the client was initialized and not terminated.
Definition: ZMQClient.h:59
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:43
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
@ c_Monitor
Monitoring Process.
@ c_Init
Before the forks, the process is in init state.

◆ needMoreWorkers()

unsigned int needMoreWorkers ( ) const

Compare our current list of workers of how many we want to have.

Definition at line 353 of file ProcessMonitor.cc.

354{
356 if (neededWorkers < 0) {
357 B2FATAL("Something went completely wrong here! I have more workers as requested...");
358 }
359 if (neededWorkers > 0) {
360 B2DEBUG(30, "I need to restart " << neededWorkers << " workers");
361 }
362 return static_cast<unsigned int>(neededWorkers);
363}

◆ processesWithType()

unsigned int processesWithType ( const ProcType procType) const
private

Cound the number of processes with a certain type.

Definition at line 365 of file ProcessMonitor.cc.

366{
367 auto correctProcType = [&procType](const auto & pair) {
368 return pair.second == procType;
369 };
370 return std::count_if(m_processList.begin(), m_processList.end(), correctProcType);
371}

◆ processMulticast()

void processMulticast ( const ASocket &  socket)
private

Process a message from the multicast.

Definition at line 224 of file ProcessMonitor.cc.

225{
226 auto pcbMulticastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
227 if (pcbMulticastMessage->isMessage(EMessageTypes::c_helloMessage)) {
228 const int pid = std::stoi(pcbMulticastMessage->getData());
229 const ProcType procType = GlobalProcHandler::getProcType(pid);
230 m_processList[pid] = procType;
231 B2DEBUG(30, "Now having " << processesWithType(ProcType::c_Input) << " input processes.");
232 B2DEBUG(30, "Now having " << processesWithType(ProcType::c_Output) << " output processes.");
233 B2DEBUG(30, "Now having " << processesWithType(ProcType::c_Worker) << " worker processes.");
234 } else if (pcbMulticastMessage->isMessage(EMessageTypes::c_goodbyeMessage)) {
235 const int pid = std::stoi(pcbMulticastMessage->getData());
236 const auto& processIt = m_processList.find(pid);
237 if (processIt == m_processList.end()) {
238 B2WARNING("An unknown PID died!");
239 return;
240 }
241 const ProcType procType = processIt->second;
242 if (procType == ProcType::c_Worker) {
244 B2DEBUG(30, "Now we will only need " << m_requestedNumberOfWorkers << " of workers anymore");
245 }
246 processIt->second = ProcType::c_Stopped;
247 B2DEBUG(30, "Now having " << processesWithType(ProcType::c_Input) << " input processes.");
248 B2DEBUG(30, "Now having " << processesWithType(ProcType::c_Output) << " output processes.");
249 B2DEBUG(30, "Now having " << processesWithType(ProcType::c_Worker) << " worker processes.");
250 } else if (pcbMulticastMessage->isMessage(EMessageTypes::c_killWorkerMessage)) {
251 const int workerPID = atoi(pcbMulticastMessage->getData().c_str());
252 B2DEBUG(30, "Got message to kill worker " << workerPID);
253 if (kill(workerPID, SIGKILL) == 0) {
254 B2WARNING("Needed to kill worker " << workerPID << " as it was requested.");
255 } else {
256 B2ERROR("Try to kill process " << workerPID << ", but process is already gone.");
257 }
258 // TODO: Do we want to decrease the number of workers here or later in the check of the processes?
259 } else if (pcbMulticastMessage->isMessage(EMessageTypes::c_statisticMessage)) {
260 m_streamer.read(std::move(pcbMulticastMessage));
261 B2DEBUG(30, "Having received the process statistics");
263 }
264}
static ProcType getProcType(int pid)
Return the proc type of this process.
StreamHelper m_streamer
The data store streamer.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuilt the data store from it.
Definition: StreamHelper.cc:41
ProcType
Type of the process used for storing and mapping the child processes in the process handler.
Definition: ProcHelper.h:16

◆ reset()

void reset ( )

Reset the internal state.

Definition at line 101 of file ProcessMonitor.cc.

102{
103 m_client.reset();
104}
void reset()
Reset the sockets. ATTENTION: this does not close the sockets! Use only after forks to not clean up t...
Definition: ZMQClient.cc:43

◆ subscribe()

void subscribe ( const std::string &  pubSocketAddress,
const std::string &  subSocketAddress,
const std::string &  controlSocketAddress 
)

Start listening for process management information on the given address.

Definition at line 24 of file ProcessMonitor.cc.

26{
28 B2DEBUG(30, "ProcessMonitor starting.....");
30
31 // The default will be to not do anything on signals...
33
34 // We open a new context here in the new process
35 zmq::context_t context(1);
36
37 // current directory
38 char currentdir[256];
39 getcwd(currentdir, 256);
40
41 zmq::socket_t pubSocket(context, ZMQ_XPUB);
42 // ATTENTION: this is switched on intention!
43 try {
44 pubSocket.bind(subSocketAddress);
45 } catch (zmq::error_t& err) {
46 B2DEBUG(30, "ZMQ bind error!! " + std::string(err.what()));
47 }
48
49 pubSocket.set(zmq::sockopt::linger, 0);
50
51 zmq::socket_t subSocket(context, ZMQ_XSUB);
52 // ATTENTION: this is switched on intention!
53 subSocket.bind(pubSocketAddress);
54 subSocket.set(zmq::sockopt::linger, 0);
55
56 zmq::socket_t controlSocket(context, ZMQ_SUB);
57 controlSocket.bind(controlSocketAddress);
58 controlSocket.set(zmq::sockopt::linger, 0);
59 controlSocket.set(zmq::sockopt::subscribe, "");
60
61 B2DEBUG(30, "Will now start the proxy..");
62 bool running = true;
63 while (running) {
64 try {
65 zmq::socket_ref nullsocket;
66 zmq::proxy_steerable(pubSocket, subSocket, nullsocket, controlSocket);
67 running = false;
68 } catch (zmq::error_t& ex) {
69 if (ex.num() != EINTR) {
70 B2ERROR("There was an error during the proxy event: " << ex.what());
71 running = false;
72 }
73 }
74 }
75 controlSocket.close();
76 pubSocket.close();
77 subSocket.close();
78 context.close();
79 B2DEBUG(30, "Proxy has finished");
80 exit(0);
81 }
82
83 // Time to setup the proxy
84 std::this_thread::sleep_for(std::chrono::milliseconds(500));
85
86 m_streamer.initialize(0, true);
87 m_client.initialize<ZMQ_PUB>(pubSocketAddress, subSocketAddress, controlSocketAddress, false);
88 m_client.subscribe(EMessageTypes::c_helloMessage);
89 m_client.subscribe(EMessageTypes::c_statisticMessage);
90 m_client.subscribe(EMessageTypes::c_killWorkerMessage);
91 m_client.subscribe(EMessageTypes::c_goodbyeMessage);
92
93 B2DEBUG(30, "Started multicast publishing on " << pubSocketAddress << " and subscribing on " << subSocketAddress);
94}
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
static bool startProxyProcess()
Fork and initialize a proxy process.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:99

◆ terminate()

void terminate ( )

Terminate the processing.

Definition at line 96 of file ProcessMonitor.cc.

97{
98 m_client.terminate(false);
99}
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18

◆ waitForRunningInput()

void waitForRunningInput ( int  timeout)

Block until either the input process is running or the timeout (in seconds) is raised.

Definition at line 157 of file ProcessMonitor.cc.

158{
160 const auto multicastAnswer = [this](const auto & socket) {
161 processMulticast(socket);
163 };
164
165 const auto pullResult = m_client.pollMulticast(timeout, multicastAnswer);
166 if (not pullResult) {
167 B2ERROR("Input process did not start properly!");
168 m_hasEnded = true;
169 }
170 }
171}

◆ waitForRunningOutput()

void waitForRunningOutput ( int  timeout)

Block until either the output process is running or the timeout (in seconds) is raised.

Definition at line 189 of file ProcessMonitor.cc.

190{
192 const auto multicastAnswer = [this](const auto & socket) {
193 processMulticast(socket);
195 };
196
197 const auto pullResult = m_client.pollMulticast(timeout, multicastAnswer);
198 if (not pullResult) {
199 B2ERROR("Output process did not start properly!");
200 m_hasEnded = true;
201 }
202 }
203}

◆ waitForRunningWorker()

void waitForRunningWorker ( int  timeout)

Block until either the worker process is running or the timeout (in seconds) is raised.

Definition at line 173 of file ProcessMonitor.cc.

174{
176 const auto multicastAnswer = [this](const auto & socket) {
177 processMulticast(socket);
179 };
180
181 const auto pullResult = m_client.pollMulticast(timeout, multicastAnswer);
182 if (not pullResult) {
183 B2ERROR("Worker process did not start properly!");
184 m_hasEnded = true;
185 }
186 }
187}

Member Data Documentation

◆ m_client

ZMQClient m_client
private

The client used for message processing.

Definition at line 69 of file ProcessMonitor.h.

◆ m_hasEnded

bool m_hasEnded = false
private

Someone requested us to end the processing.

Definition at line 76 of file ProcessMonitor.h.

◆ m_processList

std::map<int, ProcType> m_processList
private

The current list of pid -> process types (to be compared to the proc handler)

Definition at line 74 of file ProcessMonitor.h.

◆ m_receivedStatistics

bool m_receivedStatistics = false
private

Did we already receive the statistics?

Definition at line 82 of file ProcessMonitor.h.

◆ m_requestedNumberOfWorkers

unsigned int m_requestedNumberOfWorkers = 0
private

How many workers we should request to start.

Definition at line 72 of file ProcessMonitor.h.

◆ m_streamer

StreamHelper m_streamer
private

The data store streamer.

Definition at line 79 of file ProcessMonitor.h.


The documentation for this class was generated from the following files: