Belle II Software  release-06-00-14
ProcessMonitor.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <framework/pcore/ProcessMonitor.h>
9 #include <framework/core/EventProcessor.h>
10 #include <framework/pcore/GlobalProcHandler.h>
11 
12 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
13 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
14 
15 #include <thread>
16 #include <csignal>
17 
18 using namespace Belle2;
19 
// Sets up the multicast proxy and connects this monitor's own ZMQ client.
// The visible proxy part binds an XPUB, an XSUB and a SUB control socket, runs
// zmq::proxy_steerable (retrying when interrupted with EINTR), closes everything and exits.
// The remaining part waits 500 ms, initializes the streamer and the client and subscribes
// the client to the hello, statistic, killWorker and goodbye messages.
// NOTE(review): listing lines 23 and 27 are missing from this extract, so the braces below do
// not balance. Line 23 presumably opens the branch that is only taken in the forked proxy
// process (cf. GlobalProcHandler::startProxyProcess) and line 27 presumably installs the
// signal handling announced by the comment on line 26 -- confirm against the repository.
20 void ProcessMonitor::subscribe(const std::string& pubSocketAddress, const std::string& subSocketAddress,
21  const std::string& controlSocketAddress)
22 {
24  m_client.reset();
25 
26  // The default will be to not do anything on signals...
28 
29  // We open a new context here in the new process
30  zmq::context_t context(1);
31 
32  zmq::socket_t pubSocket(context, ZMQ_XPUB);
33  // ATTENTION: binding the XPUB socket to the *sub* address is done on purpose!
34  pubSocket.bind(subSocketAddress);
35  pubSocket.set(zmq::sockopt::linger, 0);
36 
37  zmq::socket_t subSocket(context, ZMQ_XSUB);
38  // ATTENTION: binding the XSUB socket to the *pub* address is done on purpose!
39  subSocket.bind(pubSocketAddress);
40  subSocket.set(zmq::sockopt::linger, 0);
41 
42  zmq::socket_t controlSocket(context, ZMQ_SUB);
43  controlSocket.bind(controlSocketAddress);
44  controlSocket.set(zmq::sockopt::linger, 0);
 // An empty subscription prefix: receive every message on the control socket.
45  controlSocket.set(zmq::sockopt::subscribe, "");
46 
47  B2DEBUG(10, "Will now start the proxy..");
48  bool running = true;
 // Re-enter the proxy when the call was merely interrupted (EINTR); stop on a normal
 // return or on any other error.
49  while (running) {
50  try {
51  zmq::socket_ref nullsocket;
52  zmq::proxy_steerable(pubSocket, subSocket, nullsocket, controlSocket);
53  running = false;
54  } catch (zmq::error_t& ex) {
55  if (ex.num() != EINTR) {
56  B2ERROR("There was an error during the proxy event: " << ex.what());
57  running = false;
58  }
59  }
60  }
61  controlSocket.close();
62  pubSocket.close();
63  subSocket.close();
64  context.close();
65  B2DEBUG(10, "Proxy has finished");
 // The process that ran the proxy ends here and never reaches the client setup below.
66  exit(0);
67  }
68 
69  // Time to setup the proxy
70  std::this_thread::sleep_for(std::chrono::milliseconds(500));
71 
72  m_streamer.initialize(0, true);
73  m_client.initialize<ZMQ_PUB>(pubSocketAddress, subSocketAddress, controlSocketAddress, false);
74  m_client.subscribe(EMessageTypes::c_helloMessage);
75  m_client.subscribe(EMessageTypes::c_statisticMessage);
76  m_client.subscribe(EMessageTypes::c_killWorkerMessage);
77  m_client.subscribe(EMessageTypes::c_goodbyeMessage);
78 
79  B2DEBUG(10, "Started multicast publishing on " << pubSocketAddress << " and subscribing on " << subSocketAddress);
80 }
81 
// NOTE(review): the signature (listing line 82) is missing from this extract; judging by the
// member index this is presumably the body of ProcessMonitor::terminate() -- confirm.
// Terminates the ZMQ client, passing false for the client's sendGoodbye argument.
83 {
84  m_client.terminate(false);
85 }
86 
// NOTE(review): the signature (listing line 87) is missing from this extract; judging by the
// member index this is presumably the body of ProcessMonitor::reset() -- confirm.
// Resets the ZMQ client. According to the member index, ZMQClient::reset() does not close
// the sockets and is meant to be used only after forks.
88 {
89  m_client.reset();
90 }
91 
// Asks all known processes to terminate and then shuts down the proxy.
// Steps visible here:
//  1. If processes are registered and the client is online, publish a terminate message and,
//     unless every process is already marked c_Stopped, poll the multicast for
//     timeout * 1000 (timeout is given in seconds according to the member index).
//  2. If the client is online, send "TERMINATE" and wait 10 ms.
// NOTE(review): listing lines 95, 103 and 140 are missing from this extract. Line 95 must
// hold the remainder of the B2ASSERT below (its parenthesis is left open), and line 140
// presumably performs the hard kill announced by the last comment
// (cf. GlobalProcHandler::killAllProcesses) -- confirm against the repository.
92 void ProcessMonitor::killProcesses(unsigned int timeout)
93 {
94  B2ASSERT("Only the monitoring process is allowed to kill processes", GlobalProcHandler::isProcess(ProcType::c_Monitor)
96 
97  if (not m_processList.empty() and m_client.isOnline()) {
98  B2DEBUG(10, "Try to kill the processes gently...");
99  // Try to kill them gently...
100  auto pcbMulticastMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
101  m_client.publish(std::move(pcbMulticastMessage));
102 
104 
 // Poll callback: handle one multicast message, then return true while any process is not
 // yet marked c_Stopped; once all are stopped, return true only until the statistics arrived.
 // NOTE(review): presumably a true return value keeps pollMulticast waiting -- confirm in ZMQClient.h.
105  const auto multicastAnswer = [this](const auto & socket) {
106  processMulticast(socket);
107  for (const auto& pair : m_processList) {
108  if (pair.second != ProcType::c_Stopped) {
109  B2DEBUG(10, "Process pid " << pair.first << " of type " << static_cast<char>(pair.second) << " is still alive");
110  return true;
111  }
112  }
113  return not m_receivedStatistics;
114  };
115 
 // Skip the wait entirely if every registered process is already stopped.
116  bool allProcessesStopped = true;
117  for (const auto& pair : m_processList) {
118  if (pair.second != ProcType::c_Stopped) {
119  allProcessesStopped = false;
120  break;
121  }
122  }
123 
124  if (not allProcessesStopped) {
125  B2DEBUG(10, "Start waiting for processes to go down.");
126  m_client.pollMulticast(timeout * 1000, multicastAnswer);
127  B2DEBUG(10, "Finished waiting for processes to go down.");
128  }
129  }
130 
131  if (m_client.isOnline()) {
132  B2DEBUG(10, "Will kill the proxy now.");
133  // Kill the proxy and give it some time to terminate
134  auto message = ZMQMessageHelper::createZMQMessage("TERMINATE");
135  m_client.send(message);
136  std::this_thread::sleep_for(std::chrono::milliseconds(10));
137  }
138 
139  // If everything did not help, we will kill all of them
141 }
142 
// Polls the multicast for up to timeout * 1000 (timeout is given in seconds according to the
// member index) while handling incoming messages; if the poll result is falsy, logs an error
// and marks the processing as ended.
// NOTE(review): listing lines 145 and 148 are missing from this extract. Line 145 presumably
// opens the block closed on line 156, and line 148 presumably holds the return statement of
// the lambda -- confirm against the repository.
143 void ProcessMonitor::waitForRunningInput(const int timeout)
144 {
146  const auto multicastAnswer = [this](const auto & socket) {
147  processMulticast(socket);
149  };
150 
151  const auto pullResult = m_client.pollMulticast(timeout * 1000, multicastAnswer);
152  if (not pullResult) {
153  B2ERROR("Input process did not start properly!");
154  m_hasEnded = true;
155  }
156  }
157 }
158 
// NOTE(review): the signature (listing line 159) and listing lines 161 and 164 are missing
// from this extract; judging by the error message and the member index this is presumably
// the body of ProcessMonitor::waitForRunningWorker(int timeout) -- confirm.
// Polls the multicast for up to timeout * 1000 while handling incoming messages; if the poll
// result is falsy, logs an error and marks the processing as ended.
160 {
162  const auto multicastAnswer = [this](const auto & socket) {
163  processMulticast(socket);
165  };
166 
167  const auto pullResult = m_client.pollMulticast(timeout * 1000, multicastAnswer);
168  if (not pullResult) {
169  B2ERROR("Worker process did not start properly!");
170  m_hasEnded = true;
171  }
172  }
173 }
174 
// NOTE(review): the signature (listing line 175) and listing lines 177 and 180 are missing
// from this extract; judging by the error message and the member index this is presumably
// the body of ProcessMonitor::waitForRunningOutput(int timeout) -- confirm.
// Polls the multicast for up to timeout * 1000 while handling incoming messages; if the poll
// result is falsy, logs an error and marks the processing as ended.
176 {
178  const auto multicastAnswer = [this](const auto & socket) {
179  processMulticast(socket);
181  };
182 
183  const auto pullResult = m_client.pollMulticast(timeout * 1000, multicastAnswer);
184  if (not pullResult) {
185  B2ERROR("Output process did not start properly!");
186  m_hasEnded = true;
187  }
188  }
189 }
190 
191 void ProcessMonitor::initialize(unsigned int requestedNumberOfWorkers)
192 {
193  m_requestedNumberOfWorkers = requestedNumberOfWorkers;
194 }
195 
// NOTE(review): the signature (listing line 196) is missing from this extract; judging by
// the member index this is presumably the body of ProcessMonitor::checkMulticast(int timeout)
// -- confirm.
// Does nothing once hasEnded() is true. Otherwise polls the multicast for up to
// timeout * 1000 with a callback that handles a message and then returns false.
197 {
198  if (hasEnded()) {
199  return;
200  }
201 
202  const auto multicastAnswer = [this](const auto & socket) {
203  processMulticast(socket);
204  return false;
205  };
206  m_client.pollMulticast(timeout * 1000, multicastAnswer);
207 }
208 
209 template <class ASocket>
210 void ProcessMonitor::processMulticast(const ASocket& socket)
211 {
212  auto pcbMulticastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
213  if (pcbMulticastMessage->isMessage(EMessageTypes::c_helloMessage)) {
214  const int pid = std::stoi(pcbMulticastMessage->getData());
215  const ProcType procType = GlobalProcHandler::getProcType(pid);
216  m_processList[pid] = procType;
217  B2DEBUG(10, "Now having " << processesWithType(ProcType::c_Input) << " input processes.");
218  B2DEBUG(10, "Now having " << processesWithType(ProcType::c_Output) << " output processes.");
219  B2DEBUG(10, "Now having " << processesWithType(ProcType::c_Worker) << " worker processes.");
220  } else if (pcbMulticastMessage->isMessage(EMessageTypes::c_goodbyeMessage)) {
221  const int pid = std::stoi(pcbMulticastMessage->getData());
222  const auto& processIt = m_processList.find(pid);
223  if (processIt == m_processList.end()) {
224  B2WARNING("An unknown PID died!");
225  return;
226  }
227  const ProcType procType = processIt->second;
228  if (procType == ProcType::c_Worker) {
230  B2DEBUG(10, "Now we will only need " << m_requestedNumberOfWorkers << " of workers anymore");
231  }
232  processIt->second = ProcType::c_Stopped;
233  B2DEBUG(10, "Now having " << processesWithType(ProcType::c_Input) << " input processes.");
234  B2DEBUG(10, "Now having " << processesWithType(ProcType::c_Output) << " output processes.");
235  B2DEBUG(10, "Now having " << processesWithType(ProcType::c_Worker) << " worker processes.");
236  } else if (pcbMulticastMessage->isMessage(EMessageTypes::c_killWorkerMessage)) {
237  const int workerPID = atoi(pcbMulticastMessage->getData().c_str());
238  B2DEBUG(10, "Got message to kill worker " << workerPID);
239  if (kill(workerPID, SIGKILL) == 0) {
240  B2WARNING("Needed to kill worker " << workerPID << " as it was requested.");
241  } else {
242  B2ERROR("Try to kill process " << workerPID << ", but process is already gone.");
243  }
244  // TODO: Do we want to decrease the number of workers here or later in the check of the processes?
245  } else if (pcbMulticastMessage->isMessage(EMessageTypes::c_statisticMessage)) {
246  m_streamer.read(std::move(pcbMulticastMessage));
247  B2DEBUG(10, "Having received the process statistics");
248  m_receivedStatistics = true;
249  }
250 }
251 
// NOTE(review): the signature (listing line 252) is missing from this extract; judging by
// the member index this is presumably the body of ProcessMonitor::checkChildProcesses() -- confirm.
// Compares the monitor's own process list with the PID list of the GlobalProcHandler:
//  - entries whose PID is still in the global list are kept,
//  - a vanished input, output or proxy process ends the processing immediately,
//  - a vanished worker is announced with a deleteWorker message and removed,
//  - a vanished process already marked c_Stopped is simply removed.
// Afterwards m_hasEnded is recomputed from the remaining entries.
253 {
254  if (hasEnded()) {
255  return;
256  }
257 
258  // Copy is intended, as we do not want the signal handler to change our list
259  std::vector<int> currentProcessList = GlobalProcHandler::getPIDList();
260  // Check for processes which were there last time but are gone now (so they died)
261  for (auto iter = m_processList.begin(); iter != m_processList.end();) {
262  const auto& pair = *iter;
263 
264  B2ASSERT("This pid should not be in our list!", pair.first != 0);
265  // Once a process is gone from the global list, remove it from our own, too.
 // A process that is still in the global list needs no further handling.
266  if (std::find(currentProcessList.begin(), currentProcessList.end(), pair.first) != currentProcessList.end()) {
267  ++iter;
268  continue;
269  }
270 
271  // if the process has gone down properly, it should now be set to "Stopped"
272  if (pair.second == ProcType::c_Input) {
273  B2ERROR("An input process has died unexpected! Need to go down.");
274  m_hasEnded = true;
275  return;
276  } else if (pair.second == ProcType::c_Output) {
277  B2ERROR("An output process has died unexpected! Need to go down.");
278  m_hasEnded = true;
279  return;
280  } else if (pair.second == ProcType::c_Proxy) {
281  B2ERROR("A proxy process has died unexpected! Need to go down.");
282  m_hasEnded = true;
283  return;
284  } else if (pair.second == ProcType::c_Worker) {
285  B2WARNING("A worker process has died unexpected. If you have requested, I will now restart the workers.");
286  B2ASSERT("A worker died but none was present?", processesWithType(ProcType::c_Worker) != 0);
287  auto pcbMulticastMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_deleteWorkerMessage, pair.first);
288  m_client.publish(std::move(pcbMulticastMessage));
289  } else if (pair.second == ProcType::c_Stopped) {
290  B2DEBUG(10, "An children process has died expectedly.");
291  }
292 
 // erase() returns the iterator following the removed entry, so the loop continues there.
293  iter = m_processList.erase(iter);
294  }
295 
296  m_hasEnded = false;
297  // The processing should be finished, if...
298  if (m_processList.empty()) {
299  // .. there is no process around anymore
300  m_hasEnded = true;
301  } else if (m_processList.size() == 1 and m_processList.begin()->second == ProcType::c_Output) {
302  // ... the single remaining process is an output process (can happen, if we do not use the event backup)
303  m_hasEnded = true;
304  } else {
305  // ... there are only workers or stopped processes around
 // Assume "ended" and revert as soon as one input or output process is found.
306  m_hasEnded = true;
307  for (const auto& pair : m_processList) {
308  if (pair.second == ProcType::c_Input or pair.second == ProcType::c_Output) {
309  m_hasEnded = false;
310  break;
311  }
312  }
313  }
314 
315  if (m_hasEnded) {
316  B2DEBUG(10, "No input and no output process around. Will go home now!");
317  }
318 }
319 
320 void ProcessMonitor::checkSignals(int g_signalReceived)
321 {
322  if (g_signalReceived > 0) {
323  B2DEBUG(10, "Received signal " << g_signalReceived);
324  m_hasEnded = true;
325  }
326 }
327 
// NOTE(review): the signature (listing line 328) is missing from this extract; judging by
// the member index this is presumably the body of ProcessMonitor::hasEnded() const -- confirm.
// Returns the current value of the m_hasEnded flag.
329 {
330  return m_hasEnded;
331 }
332 
// NOTE(review): both the signature (listing line 333) and the only body line (listing line
// 335) are missing from this extract; judging by its position and the member index this is
// presumably ProcessMonitor::hasWorkers() const -- restore it from the repository.
334 {
336 }
337 
// NOTE(review): the signature (listing line 338) and the definition of neededWorkers
// (listing line 340) are missing from this extract; judging by the member index this is
// presumably the body of ProcessMonitor::needMoreWorkers() const -- confirm.
// Aborts with B2FATAL if neededWorkers is negative; otherwise returns it as unsigned int.
339 {
341  if (neededWorkers < 0) {
342  B2FATAL("Something went completely wrong here! I have more workers as requested...");
343  }
344  if (neededWorkers > 0) {
345  B2DEBUG(10, "I need to restart " << neededWorkers << " workers");
346  }
 // Non-negative at this point (the negative case is handled by the B2FATAL above).
347  return static_cast<unsigned int>(neededWorkers);
348 }
349 
350 unsigned int ProcessMonitor::processesWithType(const ProcType& procType) const
351 {
352  auto correctProcType = [&procType](const auto & pair) {
353  return pair.second == procType;
354  };
355  return std::count_if(m_processList.begin(), m_processList.end(), correctProcType);
356 }
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
static ProcType getProcType(int pid)
Return the proc type of this process.
static const std::vector< int > & getPIDList()
Return the PID list handled by the running GlobalProcHandler. This PID list is updated using the sign...
static bool isProcess(ProcType procType)
Return true if the process is of type procType.
static bool startProxyProcess()
Fork and initialize a proxy process.
static void killAllProcesses()
Hard kill all processes.
void processMulticast(const ASocket &socket)
Process a message from the multicast.
bool m_hasEnded
Someone requested us to end the processing.
StreamHelper m_streamer
The data store streamer.
std::map< int, ProcType > m_processList
The current list of pid -> process types (to be compared to the proc handler)
void checkMulticast(int timeout=0)
check multicast for messages and kill workers if requested
void waitForRunningWorker(int timeout)
Block until either the worker process is running or the timeout (in seconds) is raised.
bool hasEnded() const
If we have received a SIGINT signal or the last process is gone, we can end smoothly.
void checkSignals(int g_signalReceived)
check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
void waitForRunningInput(int timeout)
Block until either the input process is running or the timeout (in seconds) is raised.
void checkChildProcesses()
check the child processes, if one has died
bool m_receivedStatistics
Did we already receive the statistics?
void terminate()
Terminate the processing.
unsigned int m_requestedNumberOfWorkers
How many workers we should request to start.
void waitForRunningOutput(int timeout)
Block until either the output process is running or the timeout (in seconds) is raised.
void subscribe(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &controlSocketAddress)
Start listening for process management information on the given address.
bool hasWorkers() const
Check if there is at least one running worker.
void killProcesses(unsigned int timeout)
Ask all processes to terminate. If not, kill them after timeout seconds.
unsigned int processesWithType(const ProcType &procType) const
Count the number of processes with a certain type.
ZMQClient m_client
The client used for message processing.
void initialize(unsigned int requestedNumberOfWorkers)
Init the processing with that many workers.
unsigned int needMoreWorkers() const
Compare our current list of workers to how many we want to have.
void reset()
Reset the internal state.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
Definition: StreamHelper.cc:41
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketName, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
bool isOnline() const
Check if the client was initialized and not terminated.
Definition: ZMQClient.h:58
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:52
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18
void reset()
Reset the sockets. ATTENTION: this does not close the sockets! Use only after forks to not clean up t...
Definition: ZMQClient.cc:43
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:42
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:99
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
ProcType
Type of the process used for storing and mapping the child processes in the process handler.
Definition: ProcHelper.h:16
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll method to only the multicast socket.
Definition: ZMQClient.h:161
@ c_Proxy
Multicast Proxy Process.
@ c_Output
Output Process.
@ c_Worker
Worker/Reconstruction Process.
@ c_Monitor
Monitoring Process.
@ c_Input
Input Process.
@ c_Stopped
The process is stopped/killed.
@ c_Init
Before the forks, the process is in init state.
Abstract base class for different kinds of events.