Belle II Software  release-05-02-19
ProcessMonitor.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2018 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Anselm Baur, Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #include <framework/pcore/ProcessMonitor.h>
11 #include <framework/core/EventProcessor.h>
12 #include <framework/pcore/GlobalProcHandler.h>
13 
14 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
15 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
16 
17 #include <thread>
18 #include <csignal>
19 
20 using namespace Belle2;
21 
/**
 * Start listening for process management information on the given addresses.
 *
 * The code visible here binds an XPUB/XSUB socket pair plus a SUB control socket, runs
 * zmq::proxy_steerable on them until it returns or fails with an error other than EINTR,
 * closes the sockets and the context and calls exit(0). After that part, the streamer and
 * the client are initialized and the client subscribes to the hello, statistic,
 * kill-worker and goodbye message types.
 *
 * NOTE(review): original lines 25 and 29 are absent from this listing, so the statement
 * opening the scope that is closed right after exit(0) is not visible. Presumably the
 * proxy part only runs inside a forked proxy process (cf. GlobalProcHandler::startProxyProcess)
 * - confirm against the complete file.
 *
 * @param pubSocketAddress address the proxy's XSUB socket binds to; also passed to the client
 * @param subSocketAddress address the proxy's XPUB socket binds to; also passed to the client
 * @param controlSocketAddress address the proxy's control socket binds to; also passed to the client
 */
22 void ProcessMonitor::subscribe(const std::string& pubSocketAddress, const std::string& subSocketAddress,
23  const std::string& controlSocketAddress)
24 {
26  m_client.reset();
27 
28  // The default will be to not do anything on signals...
30 
31  // We open a new context here in the new process
32  zmq::context_t context(1);
33 
34  zmq::socket_t pubSocket(context, ZMQ_XPUB);
35  // ATTENTION: the XPUB socket is bound to the *sub* address on purpose!
36  pubSocket.bind(subSocketAddress);
37  pubSocket.setsockopt(ZMQ_LINGER, 0);
38 
39  zmq::socket_t subSocket(context, ZMQ_XSUB);
40  // ATTENTION: the XSUB socket is bound to the *pub* address on purpose!
41  subSocket.bind(pubSocketAddress);
42  subSocket.setsockopt(ZMQ_LINGER, 0);
43 
44  zmq::socket_t controlSocket(context, ZMQ_SUB);
45  controlSocket.bind(controlSocketAddress);
46  controlSocket.setsockopt(ZMQ_LINGER, 0);
  // An empty filter of length 0 subscribes the control socket to every message.
47  controlSocket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
48 
49  B2DEBUG(10, "Will now start the proxy..");
  // Restart the proxy call when it was merely interrupted (EINTR); leave the loop when it
  // returns normally or fails with any other error.
50  bool running = true;
51  while (running) {
52  try {
53  zmq::proxy_steerable(pubSocket, subSocket, nullptr, controlSocket);
54  running = false;
55  } catch (zmq::error_t& ex) {
56  if (ex.num() != EINTR) {
57  B2ERROR("There was an error during the proxy event: " << ex.what());
58  running = false;
59  }
60  }
61  }
  // Explicit cleanup is needed because exit(0) below does not unwind the stack.
62  controlSocket.close();
63  pubSocket.close();
64  subSocket.close();
65  context.close();
66  B2DEBUG(10, "Proxy has finished");
67  exit(0);
68  }
69 
70  // Time to setup the proxy
71  std::this_thread::sleep_for(std::chrono::milliseconds(500));
72 
73  m_streamer.initialize(0, true);
74  m_client.initialize<ZMQ_PUB>(pubSocketAddress, subSocketAddress, controlSocketAddress, false);
  // Only these four message types are handled in processMulticast.
75  m_client.subscribe(EMessageTypes::c_helloMessage);
76  m_client.subscribe(EMessageTypes::c_statisticMessage);
77  m_client.subscribe(EMessageTypes::c_killWorkerMessage);
78  m_client.subscribe(EMessageTypes::c_goodbyeMessage);
79 
80  B2DEBUG(10, "Started multicast publishing on " << pubSocketAddress << " and subscribing on " << subSocketAddress);
81 }
82 
// Body of ProcessMonitor::terminate(): terminate the processing by terminating the client.
// The `false` argument is ZMQClient::terminate's sendGoodbye flag, i.e. no goodbye is sent.
// NOTE(review): the signature line (original line 83) is absent from this listing.
84 {
85  m_client.terminate(false);
86 }
87 
// Body of ProcessMonitor::reset(): reset the internal state by resetting the client.
// NOTE(review): the signature line (original line 88) is absent from this listing.
89 {
90  m_client.reset();
91 }
92 
/**
 * Ask all processes to terminate and wait up to `timeout` for them to go down.
 *
 * Steps visible here: publish a terminate message, poll the multicast until every known
 * process is marked c_Stopped and the statistics have been received (or the poll ends),
 * then send "TERMINATE" through the client to stop the proxy.
 *
 * NOTE(review): original lines 96, 104 and 141 are absent from this listing. The B2ASSERT
 * call is therefore shown without its closing part, and the statement announced by the
 * final comment (presumably the hard kill via GlobalProcHandler::killAllProcesses) is not
 * visible - confirm against the complete file.
 *
 * @param timeout waiting time, multiplied by 1000 before being handed to pollMulticast
 */
93 void ProcessMonitor::killProcesses(unsigned int timeout)
94 {
95  B2ASSERT("Only the monitoring process is allowed to kill processes", GlobalProcHandler::isProcess(ProcType::c_Monitor)
97 
98  if (not m_processList.empty() and m_client.isOnline()) {
99  B2DEBUG(10, "Try to kill the processes gently...");
100  // Try to kill them gently...
101  auto pcbMulticastMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
102  m_client.publish(std::move(pcbMulticastMessage));
103 
105 
  // Poll callback: handles one multicast message, then returns true while any process is
  // not yet c_Stopped; once all are stopped it returns true only until the statistics
  // have arrived. Presumably `true` means "keep polling" - confirm in ZMQClient::pollMulticast.
106  const auto multicastAnswer = [this](const auto & socket) {
107  processMulticast(socket);
108  for (const auto& pair : m_processList) {
109  if (pair.second != ProcType::c_Stopped) {
110  B2DEBUG(10, "Process pid " << pair.first << " of type " << static_cast<char>(pair.second) << " is still alive");
111  return true;
112  }
113  }
114  return not m_receivedStatistics;
115  };
116 
  // Skip the waiting entirely when every known process is already stopped.
117  bool allProcessesStopped = true;
118  for (const auto& pair : m_processList) {
119  if (pair.second != ProcType::c_Stopped) {
120  allProcessesStopped = false;
121  break;
122  }
123  }
124 
125  if (not allProcessesStopped) {
126  B2DEBUG(10, "Start waiting for processes to go down.");
127  m_client.pollMulticast(timeout * 1000, multicastAnswer);
128  B2DEBUG(10, "Finished waiting for processes to go down.");
129  }
130  }
131 
132  if (m_client.isOnline()) {
133  B2DEBUG(10, "Will kill the proxy now.");
134  // Kill the proxy and give it some time to terminate
135  auto message = ZMQMessageHelper::createZMQMessage("TERMINATE");
136  m_client.send(message);
137  std::this_thread::sleep_for(std::chrono::milliseconds(10));
138  }
139 
140  // If everything did not help, we will kill all of them
142 }
143 
/**
 * Block until either the input process is running or the timeout is raised.
 *
 * Polls the multicast with a callback that processes each incoming message. If the poll
 * result evaluates to false, an error is logged and m_hasEnded is set.
 *
 * NOTE(review): original lines 146 and 149 are absent from this listing: the statement
 * opening the scope closed by the second-to-last brace and the return statement of the
 * lambda are not visible - confirm against the complete file.
 *
 * @param timeout waiting time, multiplied by 1000 before being handed to pollMulticast
 */
144 void ProcessMonitor::waitForRunningInput(const int timeout)
145 {
147  const auto multicastAnswer = [this](const auto & socket) {
148  processMulticast(socket);
150  };
151 
152  const auto pullResult = m_client.pollMulticast(timeout * 1000, multicastAnswer);
153  if (not pullResult) {
154  B2ERROR("Input process did not start properly!");
155  m_hasEnded = true;
156  }
157  }
158 }
159 
// Body of ProcessMonitor::waitForRunningWorker(int timeout): block until either the worker
// process is running or the timeout is raised. Polls the multicast with a callback that
// processes each incoming message; if the poll result evaluates to false, an error is
// logged and m_hasEnded is set.
// NOTE(review): the signature (original line 160) and original lines 162 and 165 (the
// statement opening the inner scope and the lambda's return statement) are absent from
// this listing - confirm against the complete file.
161 {
163  const auto multicastAnswer = [this](const auto & socket) {
164  processMulticast(socket);
166  };
167 
168  const auto pullResult = m_client.pollMulticast(timeout * 1000, multicastAnswer);
169  if (not pullResult) {
170  B2ERROR("Worker process did not start properly!");
171  m_hasEnded = true;
172  }
173  }
174 }
175 
// Body of ProcessMonitor::waitForRunningOutput(int timeout): block until either the output
// process is running or the timeout is raised. Polls the multicast with a callback that
// processes each incoming message; if the poll result evaluates to false, an error is
// logged and m_hasEnded is set.
// NOTE(review): the signature (original line 176) and original lines 178 and 181 (the
// statement opening the inner scope and the lambda's return statement) are absent from
// this listing - confirm against the complete file.
177 {
179  const auto multicastAnswer = [this](const auto & socket) {
180  processMulticast(socket);
182  };
183 
184  const auto pullResult = m_client.pollMulticast(timeout * 1000, multicastAnswer);
185  if (not pullResult) {
186  B2ERROR("Output process did not start properly!");
187  m_hasEnded = true;
188  }
189  }
190 }
191 
192 void ProcessMonitor::initialize(unsigned int requestedNumberOfWorkers)
193 {
194  m_requestedNumberOfWorkers = requestedNumberOfWorkers;
195 }
196 
// Body of ProcessMonitor::checkMulticast(int timeout = 0): check the multicast for
// messages and handle them via processMulticast. Does nothing once hasEnded() is true.
// NOTE(review): the signature line (original line 197) is absent from this listing.
198 {
199  if (hasEnded()) {
200  return;
201  }
202 
  // The callback handles one message and always returns false.
  // Presumably `false` tells pollMulticast to stop polling - confirm in ZMQClient::pollMulticast.
203  const auto multicastAnswer = [this](const auto & socket) {
204  processMulticast(socket);
205  return false;
206  };
207  m_client.pollMulticast(timeout * 1000, multicastAnswer);
208 }
209 
/**
 * Process a single message from the multicast.
 *
 * Reads one ZMQNoIdMessage from the socket and dispatches on its type:
 *  - hello:       register the sending PID with the process type reported by GlobalProcHandler
 *  - goodbye:     mark the sending PID as c_Stopped (an unknown PID only produces a warning)
 *  - kill worker: send SIGKILL to the PID given in the message
 *  - statistic:   hand the message to the streamer and remember that statistics were received
 * Any other message type is ignored.
 *
 * NOTE(review): original line 230 is absent from this listing, so the worker branch of the
 * goodbye case shows only its debug output; presumably the missing statement adjusts
 * m_requestedNumberOfWorkers - confirm against the complete file.
 *
 * @param socket socket to read the message from
 */
210 template <class ASocket>
211 void ProcessMonitor::processMulticast(const ASocket& socket)
212 {
213  auto pcbMulticastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
214  if (pcbMulticastMessage->isMessage(EMessageTypes::c_helloMessage)) {
  // The message data carries the PID of the sender as text (std::stoi throws on malformed input).
215  const int pid = std::stoi(pcbMulticastMessage->getData());
216  const ProcType procType = GlobalProcHandler::getProcType(pid);
217  m_processList[pid] = procType;
218  B2DEBUG(10, "Now having " << processesWithType(ProcType::c_Input) << " input processes.");
219  B2DEBUG(10, "Now having " << processesWithType(ProcType::c_Output) << " output processes.");
220  B2DEBUG(10, "Now having " << processesWithType(ProcType::c_Worker) << " worker processes.");
221  } else if (pcbMulticastMessage->isMessage(EMessageTypes::c_goodbyeMessage)) {
222  const int pid = std::stoi(pcbMulticastMessage->getData());
223  const auto& processIt = m_processList.find(pid);
224  if (processIt == m_processList.end()) {
225  B2WARNING("An unknown PID died!");
226  return;
227  }
228  const ProcType procType = processIt->second;
229  if (procType == ProcType::c_Worker) {
231  B2DEBUG(10, "Now we will only need " << m_requestedNumberOfWorkers << " of workers anymore");
232  }
  // Keep the entry but flag it as stopped; checkChildProcesses erases it later.
233  processIt->second = ProcType::c_Stopped;
234  B2DEBUG(10, "Now having " << processesWithType(ProcType::c_Input) << " input processes.");
235  B2DEBUG(10, "Now having " << processesWithType(ProcType::c_Output) << " output processes.");
236  B2DEBUG(10, "Now having " << processesWithType(ProcType::c_Worker) << " worker processes.");
237  } else if (pcbMulticastMessage->isMessage(EMessageTypes::c_killWorkerMessage)) {
  // NOTE(review): unlike the std::stoi calls above, atoi returns 0 for malformed data, and
  // POSIX kill(0, sig) signals every process in the caller's process group. Consider
  // validating workerPID > 0 before calling kill.
238  const int workerPID = atoi(pcbMulticastMessage->getData().c_str());
239  B2DEBUG(10, "Got message to kill worker " << workerPID);
  // NOTE(review): a non-zero return of kill covers every failure reason (e.g. missing
  // permission), not only a process that is already gone.
240  if (kill(workerPID, SIGKILL) == 0) {
241  B2WARNING("Needed to kill worker " << workerPID << " as it was requested.");
242  } else {
243  B2ERROR("Try to kill process " << workerPID << ", but process is already gone.");
244  }
245  // TODO: Do we want to decrease the number of workers here or later in the check of the processes?
246  } else if (pcbMulticastMessage->isMessage(EMessageTypes::c_statisticMessage)) {
  // Ownership of the message moves to the streamer; it must not be used afterwards.
247  m_streamer.read(std::move(pcbMulticastMessage));
248  B2DEBUG(10, "Having received the process statistics");
249  m_receivedStatistics = true;
250  }
251 }
252 
// Body of ProcessMonitor::checkChildProcesses(): check the child processes, if one has died.
//
// Compares m_processList with the PID list of GlobalProcHandler. Entries whose PID is no
// longer in that list are handled according to their type and then erased:
//  - input, output or proxy: log an error, set m_hasEnded and return immediately
//  - worker: log a warning and publish a delete-worker message for that PID
//  - stopped: only a debug message
// Afterwards m_hasEnded is recomputed: it is true when no input process is left and at most
// one output process remains as the only entry; any input process, or an output process
// with other entries still present, keeps it false.
// Does nothing once hasEnded() is true.
// NOTE(review): the signature line (original line 253) is absent from this listing.
254 {
255  if (hasEnded()) {
256  return;
257  }
258 
259  // Copy is intended, as we do not want the signal handler to change our list
260  std::vector<int> currentProcessList = GlobalProcHandler::getPIDList();
261  // Check for processes which were there last time but are gone now (so they died)
262  for (auto iter = m_processList.begin(); iter != m_processList.end();) {
263  const auto& pair = *iter;
264 
265  B2ASSERT("This pid should not be in our list!", pair.first != 0);
266  // Once a process is gone from the global list, remove it from our own, too.
  // PIDs still present in the global list are alive: skip them.
267  if (std::find(currentProcessList.begin(), currentProcessList.end(), pair.first) != currentProcessList.end()) {
268  ++iter;
269  continue;
270  }
271 
272  // if the process has gone down properly, it should now be set to "Stopped"
273  if (pair.second == ProcType::c_Input) {
274  B2ERROR("An input process has died unexpected! Need to go down.");
275  m_hasEnded = true;
276  return;
277  } else if (pair.second == ProcType::c_Output) {
278  B2ERROR("An output process has died unexpected! Need to go down.");
279  m_hasEnded = true;
280  return;
281  } else if (pair.second == ProcType::c_Proxy) {
282  B2ERROR("A proxy process has died unexpected! Need to go down.");
283  m_hasEnded = true;
284  return;
285  } else if (pair.second == ProcType::c_Worker) {
286  B2WARNING("A worker process has died unexpected. If you have requested, I will now restart the workers.");
287  B2ASSERT("A worker died but none was present?", processesWithType(ProcType::c_Worker) != 0);
288  auto pcbMulticastMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_deleteWorkerMessage, pair.first);
289  m_client.publish(std::move(pcbMulticastMessage));
290  } else if (pair.second == ProcType::c_Stopped) {
291  B2DEBUG(10, "An children process has died expectedly.");
292  }
293 
  // erase() returns the iterator following the removed entry, which keeps the loop valid.
294  iter = m_processList.erase(iter);
295  }
296 
297  m_hasEnded = false;
298  // The processing should be finished, if...
299  if (m_processList.empty()) {
300  // .. there is no process around anymore
301  m_hasEnded = true;
302  } else if (m_processList.size() == 1 and m_processList.begin()->second == ProcType::c_Output) {
303  // ... the single remaining process is an output process (can happen, if we do not use the event backup)
304  m_hasEnded = true;
305  } else {
306  // ... there are only workers or stopped processes around
307  m_hasEnded = true;
308  for (const auto& pair : m_processList) {
309  if (pair.second == ProcType::c_Input or pair.second == ProcType::c_Output) {
310  m_hasEnded = false;
311  break;
312  }
313  }
314  }
315 
316  if (m_hasEnded) {
317  B2DEBUG(10, "No input and no output process around. Will go home now!");
318  }
319 }
320 
321 void ProcessMonitor::checkSignals(int g_signalReceived)
322 {
323  if (g_signalReceived > 0) {
324  B2DEBUG(10, "Received signal " << g_signalReceived);
325  m_hasEnded = true;
326  }
327 }
328 
// Body of ProcessMonitor::hasEnded() const: report whether the processing was requested to
// end, i.e. return the current value of m_hasEnded.
// NOTE(review): the signature line (original line 329) is absent from this listing.
330 {
331  return m_hasEnded;
332 }
333 
// Body of ProcessMonitor::hasWorkers() const: check if there is at least one running worker.
// NOTE(review): both the signature (original line 334) and the only statement of the body
// (original line 336) are absent from this listing - see the complete file.
335 {
337 }
338 
// Body of ProcessMonitor::needMoreWorkers() const: compare our current list of workers with
// how many we want to have and return the number of workers still missing.
// A negative difference is treated as a fatal error; a positive one is logged.
// NOTE(review): the signature (original line 339) and the statement defining neededWorkers
// (original line 341) are absent from this listing - confirm against the complete file.
340 {
342  if (neededWorkers < 0) {
343  B2FATAL("Something went completely wrong here! I have more workers as requested...");
344  }
345  if (neededWorkers > 0) {
346  B2DEBUG(10, "I need to restart " << neededWorkers << " workers");
347  }
  // Safe to convert to unsigned: negative values were rejected with B2FATAL above.
348  return static_cast<unsigned int>(neededWorkers);
349 }
350 
351 unsigned int ProcessMonitor::processesWithType(const ProcType& procType) const
352 {
353  auto correctProcType = [&procType](const auto & pair) {
354  return pair.second == procType;
355  };
356  return std::count_if(m_processList.begin(), m_processList.end(), correctProcType);
357 }
Belle2::ProcType
ProcType
Type of the process used for storing and mapping the child processes in the process handler.
Definition: ProcHelper.h:9
Belle2::ProcType::c_Monitor
@ c_Monitor
Monitoring Process.
Belle2::ProcessMonitor::m_hasEnded
bool m_hasEnded
Someone requested us to end the processing.
Definition: ProcessMonitor.h:86
Belle2::ProcType::c_Output
@ c_Output
Output Process.
Belle2::ZMQClient::terminate
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:20
Belle2::StreamHelper::initialize
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:20
Belle2::GlobalProcHandler::getPIDList
static const std::vector< int > & getPIDList()
Return the PID list handled by the running GlobalProcHandler. This PID list is updated using the sign...
Definition: GlobalProcHandler.cc:255
Belle2::ProcessMonitor::m_processList
std::map< int, ProcType > m_processList
The current list of pid -> process types (to be compared to the proc handler)
Definition: ProcessMonitor.h:84
Belle2::ProcessMonitor::subscribe
void subscribe(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &controlSocketAddress)
Start listening for process management information on the given address.
Definition: ProcessMonitor.cc:22
Belle2::ProcessMonitor::initialize
void initialize(unsigned int requestedNumberOfWorkers)
Init the processing with that many workers.
Definition: ProcessMonitor.cc:192
Belle2::ProcessMonitor::m_receivedStatistics
bool m_receivedStatistics
Did we already receive the statistics?
Definition: ProcessMonitor.h:92
Belle2::ZMQMessageHelper::createZMQMessage
static zmq::message_t createZMQMessage(zmq::message_t message)
Just pass a zmq message.
Definition: ZMQMessageHelper.h:39
Belle2::ProcessMonitor::hasWorkers
bool hasWorkers() const
Check if there is at least one running worker.
Definition: ProcessMonitor.cc:334
Belle2::GlobalProcHandler::getProcType
static ProcType getProcType(int pid)
Return the proc type of this process.
Definition: GlobalProcHandler.cc:260
Belle2::GlobalProcHandler::isProcess
static bool isProcess(ProcType procType)
Return true if the process is of type procType.
Definition: GlobalProcHandler.cc:177
Belle2::ProcessMonitor::m_requestedNumberOfWorkers
unsigned int m_requestedNumberOfWorkers
How many workers we should request to start.
Definition: ProcessMonitor.h:82
Belle2::ProcessMonitor::terminate
void terminate()
Terminate the processing.
Definition: ProcessMonitor.cc:83
Belle2::GlobalProcHandler::startProxyProcess
static bool startProxyProcess()
Fork and initialize a proxy process.
Definition: GlobalProcHandler.cc:136
Belle2::ZMQClient::subscribe
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:101
Belle2::ProcType::c_Init
@ c_Init
Before the forks, the process is in init state.
Belle2::ProcType::c_Stopped
@ c_Stopped
The process is stopped/killed.
Belle2::ZMQClient::isOnline
bool isOnline() const
Check if the client was initialized and not terminated.
Definition: ZMQClient.h:68
Belle2::ProcessMonitor::waitForRunningInput
void waitForRunningInput(int timeout)
Block until either the input process is running or the timeout (in seconds) is raised.
Definition: ProcessMonitor.cc:144
Belle2::ZMQClient::pollMulticast
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll method to only the multicast socket.
Definition: ZMQClient.h:171
Belle2::ProcType::c_Input
@ c_Input
Input Process.
Belle2::ProcType::c_Worker
@ c_Worker
Worker/Reconstruction Process.
Belle2::ProcessMonitor::reset
void reset()
Reset the internal state.
Definition: ProcessMonitor.cc:88
Belle2::ProcessMonitor::processesWithType
unsigned int processesWithType(const ProcType &procType) const
Count the number of processes with a certain type.
Definition: ProcessMonitor.cc:351
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::GlobalProcHandler::killAllProcesses
static void killAllProcesses()
Hard kill all processes.
Definition: GlobalProcHandler.cc:241
Belle2::ZMQClient::send
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:52
Belle2::ProcessMonitor::processMulticast
void processMulticast(const ASocket &socket)
Process a message from the multicast.
Definition: ProcessMonitor.cc:211
Belle2::ZMQClient::publish
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:62
Belle2::ZMQMessageFactory::createMessage
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Definition: ZMQMessageFactory.h:37
Belle2::ProcessMonitor::checkMulticast
void checkMulticast(int timeout=0)
check multicast for messages and kill workers if requested
Definition: ProcessMonitor.cc:197
Belle2::ProcessMonitor::needMoreWorkers
unsigned int needMoreWorkers() const
Compare our current list of workers of how many we want to have.
Definition: ProcessMonitor.cc:339
Belle2::StreamHelper::read
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuilt the data store from it.
Definition: StreamHelper.cc:43
Belle2::ProcessMonitor::waitForRunningWorker
void waitForRunningWorker(int timeout)
Block until either the worker process is running or the timeout (in seconds) is raised.
Definition: ProcessMonitor.cc:160
Belle2::ZMQClient::initialize
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketName, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:55
Belle2::ProcessMonitor::checkSignals
void checkSignals(int g_signalReceived)
check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
Definition: ProcessMonitor.cc:321
Belle2::ProcessMonitor::hasEnded
bool hasEnded() const
If we have received a SIGINT signal or the last process is gone, we can end smoothly.
Definition: ProcessMonitor.cc:329
Belle2::ProcessMonitor::waitForRunningOutput
void waitForRunningOutput(int timeout)
Block until either the output process is running or the timeout (in seconds) is raised.
Definition: ProcessMonitor.cc:176
Belle2::ProcessMonitor::killProcesses
void killProcesses(unsigned int timeout)
Ask all processes to terminate. If not, kill them after timeout seconds.
Definition: ProcessMonitor.cc:93
Belle2::ProcType::c_Proxy
@ c_Proxy
Multicast Proxy Process.
Belle2::ZMQClient::reset
void reset()
Reset the sockets. ATTENTION: this does not close the sockets! Use only after forks to not clean up t...
Definition: ZMQClient.cc:45
Belle2::ProcessMonitor::m_client
ZMQClient m_client
The client used for message processing.
Definition: ProcessMonitor.h:79
Belle2::EventProcessor::installMainSignalHandlers
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
Definition: EventProcessor.cc:302
Belle2::ProcessMonitor::m_streamer
StreamHelper m_streamer
The data store streamer.
Definition: ProcessMonitor.h:89
Belle2::ProcessMonitor::checkChildProcesses
void checkChildProcesses()
check the child processes, if one has died
Definition: ProcessMonitor.cc:253