Belle II Software  release-08-00-10
HLTEventProcessor.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/utils/HLTEventProcessor.h>
9 
10 #include <boost/python.hpp>
11 #include <framework/utilities/RegisterPythonModule.h>
12 #include <framework/core/InputController.h>
13 #include <framework/pcore/ProcHandler.h>
14 
15 #include <framework/database/DBStore.h>
16 #include <framework/core/RandomNumbers.h>
17 #include <framework/core/Environment.h>
18 #include <framework/core/ModuleManager.h>
19 
20 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
21 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
22 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
23 
24 #include <TROOT.h>
25 
26 #include <sys/prctl.h>
27 #include <sys/wait.h>
28 
29 #include <chrono>
30 #include <thread>
31 #include <signal.h>
32 #include <zmq.h>
33 
34 using namespace Belle2;
35 using namespace boost::python;
36 
37 namespace {
39  static int g_signalReceived = 0;
40 
41  // For processor unique ID
42  static int g_processNumber = 1;
43 
44  static void storeSignal(int signalNumber)
45  {
46  if (signalNumber == SIGINT) {
47  EventProcessor::writeToStdErr("\nStopping basf2 gracefully...\n");
48  }
49 
50  // We do not want to remove the first signal
51  if (g_signalReceived == 0) {
52  g_signalReceived = signalNumber;
53  }
54  }
55 }
56 
57 void HLTEventProcessor::sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
58 {
59  for (auto& socket : m_sockets) {
60  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_deleteWorkerMessage,
62  ZMQParent::send(socket, std::move(message));
63 
64  if (not waitForConfirmation) {
65  continue;
66  }
67  if (ZMQParent::poll({socket.get()}, 10 * 1000)) {
68  auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
69  B2ASSERT("Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
70  } else {
71  B2FATAL("Did not receive a confirmation message!");
72  }
73  }
74 }
75 
76 HLTEventProcessor::HLTEventProcessor(const std::vector<std::string>& outputAddresses)
77 {
78  m_sockets.reserve(outputAddresses.size());
79  for (const auto& address : outputAddresses) {
80  m_sockets.push_back(m_parent.createSocket<ZMQ_DEALER>(address, false));
81  }
82 }
83 
84 void HLTEventProcessor::process(PathPtr path, bool restartFailedWorkers, bool appendProcessNumberToModuleName)
85 {
86  using namespace std::chrono_literals;
87 
88  m_moduleList = path->buildModulePathList();
89 
90  // Assert path is what we want: fully parallel certified, not empty. Set first module to master module
91  B2ASSERT("You try to process an empty path!", not m_moduleList.empty());
92  for (const auto& module : m_moduleList) {
93  bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
94  // entire conditional path must also be compatible
95  if (hasParallelFlag and module->hasCondition()) {
96  for (const auto& conditionPath : module->getAllConditionPaths()) {
98  hasParallelFlag = false;
99  }
100  }
101  }
102  B2ASSERT("Module with name " << module->getName() << " does not have parallel flag!", hasParallelFlag);
103  }
104 
105  // Initialize of all modules (including event() of master module)
106  installMainSignalHandlers();
107  processInitialize(m_moduleList);
108 
109  // Don't start processing in case of no master module
110  if (not m_master) {
111  B2ERROR("There is no module that provides event and run numbers (EventMetaData). "
112  "You must add the specific HLT module as first module to the path.");
113  }
114 
115  // Check if errors appeared. If yes, don't start the event processing.
116  const int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
117  if (numLogError != 0) {
118  B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
119  }
120 
121  // Start the workers, which call the main loop
122  const int numProcesses = Environment::Instance().getNumberProcesses();
123  runWorkers(path, numProcesses, appendProcessNumberToModuleName);
124 
125  installMainSignalHandlers(storeSignal);
126  // Back in the main process: wait for the processes and monitor them
127  int numberOfRestartedWorkers = 0;
128  while (true) {
129  // check if we have received any signal from the user or OS.
130  // Killing of the remaining processes happens after the loop.
131  if (g_signalReceived > 0) {
132  B2WARNING("Received a signal to go down.");
133  break;
134  }
135 
136  // Test if we need more workers and if one has died
137  unsigned int presentWorkers;
138  unsigned int neededWorkers;
139 
140  std::tie(presentWorkers, neededWorkers) = checkChildProcesses();
141  if (neededWorkers > 0) {
142  if (restartFailedWorkers) {
143  runWorkers(path, neededWorkers);
144  numberOfRestartedWorkers += neededWorkers;
145  } else {
146  B2ERROR("A worker failed. Will try to end the process smoothly now.");
147  break;
148  }
149  } else if (presentWorkers == 0) {
150  B2DEBUG(10, "All workers have cleanly exited. Will now also exit");
151  break;
152  }
153 
154  if (numberOfRestartedWorkers > numProcesses) {
155  B2ERROR("I needed to restart on total " << numberOfRestartedWorkers << ", which I think is abnormal. "
156  "Will terminate the process now!");
157  break;
158  }
159 
160  std::this_thread::sleep_for(10ms);
161  }
162 
163  if (appendProcessNumberToModuleName) {
164  for (const int& pid : m_processList) {
165  B2INFO(g_processNumber << ": Send SIGINT to " << pid);
166  kill(pid, SIGINT);
167  }
168  for (const int& pid : m_processList) {
169  int count = 0;
170  while (true) {
171  if (kill(pid, 0) != 0) {
172  break;
173  }
174  B2DEBUG(10, g_processNumber << ": Checking process termination, count = " << count);
175  std::this_thread::sleep_for(1000ms);
176  // Force to leave the loop after 20min
177  // Before this, slow control app will send SIGKILL in normal case
178  if (count % 5 == 1) kill(pid, SIGINT);
179  if (count == 1200) break;
180  ++count;
181  }
182  }
183  }
184 
185  checkChildProcesses();
186 
187  // if we still have/had processes, we should unregister them
188  std::this_thread::sleep_for(500ms);
189 
190  for (const int& pid : m_processList) {
191  if (kill(pid, SIGKILL) >= 0) {
192  B2WARNING("Needed to hard kill process " << pid);
193  } else {
194  B2DEBUG(100, "no process " << pid << " found, already gone?");
195  }
196  sendTerminatedMessage(pid, false);
197  }
198  m_processList.clear();
199 
200  B2DEBUG(10, "Done here");
201 
202  // Normally, we would call terminate here, but not on HLT!
203  // Normally, we would print the error summary here, but not on HLT!
204  if (g_signalReceived == SIGINT) {
205  installSignalHandler(SIGINT, SIG_DFL);
206  raise(SIGINT);
207  }
208 }
209 
210 void HLTEventProcessor::runWorkers(PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName)
211 {
212  for (unsigned int i = 0; i < numProcesses; i++) {
213  if (forkOut()) {
214  // Do only run in forked out worker process:
215  B2DEBUG(10, "Starting a new worker process");
216  // Reset the parent and sockets
217  release();
218 
219  // Start the main loop with our signal handling and error catching
220  installMainSignalHandlers(storeSignal);
221  try {
222  if (appendProcessNumberToModuleName) {
223  for (const auto& module : m_moduleList) {
224  module->setName(std::to_string(g_processNumber) + std::string("_") + module->getName());
225  B2INFO("New worker name is " << module->getName());
226  }
227  }
228  processCore(path);
229  } catch (StoppedBySignalException& e) {
230  // close all open ROOT files, ROOT's exit handler will crash otherwise
231  gROOT->GetListOfFiles()->Delete();
232 
233  B2ERROR(e.what());
234  exit(1);
235  } catch (...) {
236  if (m_eventMetaDataPtr)
237  B2ERROR("Exception occured in exp/run/evt: "
238  << m_eventMetaDataPtr->getExperiment() << " / "
239  << m_eventMetaDataPtr->getRun() << " / "
240  << m_eventMetaDataPtr->getEvent());
241  throw;
242  }
243 
244  B2DEBUG(10, "Ending a worker process here.");
245  // Ok, we are done here!
246  exit(0);
247  }
248  }
249 }
250 
252 {
253  bool terminationRequested = false;
254  bool firstRound = true;
255 
256  // Initialisation is done
258 
259  // Set the previous event meta data to something invalid
260  m_previousEventMetaData.setEndOfData();
261 
262  while (not terminationRequested) {
263  B2DEBUG(100, "Processing new event");
264 
265  // Start the measurement
266  m_processStatisticsPtr->startGlobal();
267 
268  // Main call to event() of the modules, and maybe beginRun() and endRun()
269  PathIterator moduleIter(path);
270  terminationRequested = processEvent(moduleIter, firstRound);
271 
272  // Delete event related data in DataStore
274 
275  // Stop the measurement
276  m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_Event);
277 
278  // We are surely not in the first round the next time
279  firstRound = false;
280  }
281 
282  // End last run with a terminate. Yes, we are not calling a endRun() here and yes, we are calling this in the worker
283  B2DEBUG(10, "Calling terminate");
284  m_eventMetaDataPtr.create();
285  processTerminate(m_moduleList);
286 }
287 
288 bool HLTEventProcessor::processEvent(PathIterator moduleIter, bool firstRound)
289 {
290  while (not moduleIter.isDone()) {
291  Module* module = moduleIter.get();
292  B2DEBUG(10, "Starting event of " << module->getName());
293 
294  // The actual call of the event function
295  if (module != m_master) {
296  // If this is not the master module it is quite simple: just call the event function
297  callEvent(module);
298 
299  // Check for a second master module. Cannot do this if we are in the first round after initialize
300  // (as previous event meta data is not set properly here)
301  if (not m_previousEventMetaData.isEndOfData() and m_eventMetaDataPtr and
302  (*m_eventMetaDataPtr != m_previousEventMetaData)) {
303  B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and "
304  << module->getName());
305  }
306  } else {
307  if (not firstRound) {
308  // Also call the event function for the master, but not the first time
309  callEvent(module);
310  }
311  // initialize random number state for the event handling after we have
312  // recieved the event information from the master module.
314  }
315 
316  if (g_signalReceived != 0) {
317  if (g_signalReceived != SIGINT) {
318  throw StoppedBySignalException(g_signalReceived);
319  } else {
320  B2DEBUG(10, "Received a SIGINT in the worker process...");
321  return true;
322  }
323  }
324 
325  B2ASSERT("The event meta data must always be valid at this stage!", m_eventMetaDataPtr and m_eventMetaDataPtr.isValid());
326 
327  if (m_eventMetaDataPtr->isEndOfData()) {
328  // Immediately leave the loop and terminate (true)
329  return true;
330  }
331 
332  if (module == m_master and not firstRound) {
333  if (m_eventMetaDataPtr->isEndOfRun()) {
334  B2DEBUG(10, "Calling endRun()");
335  // call endRun() of all modules (internally uses the previous event meta data) and skip to the next event
336  m_processStatisticsPtr->suspendGlobal();
337  m_inRun = true;
338  processEndRun();
339  m_processStatisticsPtr->resumeGlobal();
340 
341  // Store the current event meta data for the next round
342  m_previousEventMetaData = *m_eventMetaDataPtr;
343 
344  // Leave this event, but not the full processing (false)
345  return false;
346  } else if (m_previousEventMetaData.isEndOfData() or m_previousEventMetaData.isEndOfRun()) {
347  // The run has changes (or we never had one), so call beginRun() before going on
348  m_processStatisticsPtr->suspendGlobal();
349  processBeginRun();
350  m_processStatisticsPtr->resumeGlobal();
351  }
352 
353  const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
354  (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
355  const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
356  and not m_previousEventMetaData.isEndOfRun();
357  if (runChangedWithoutNotice) {
358  m_processStatisticsPtr->suspendGlobal();
359 
360  m_inRun = true;
361  processEndRun();
362  processBeginRun();
363 
364  m_processStatisticsPtr->resumeGlobal();
365  }
366 
367  // make sure we use the event dependent generator again
369 
370  // and the correct database
372 
373  // Store the current event meta data for the next round
374  m_previousEventMetaData = *m_eventMetaDataPtr;
375  }
376 
377  // Check for the module conditions, evaluate them and if one is true, switch to the new path
378  if (module->evalCondition()) {
379  PathPtr condPath = module->getConditionPath();
380  // continue with parent Path after condition path is executed?
381  if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
382  moduleIter = PathIterator(condPath, moduleIter);
383  } else {
384  moduleIter = PathIterator(condPath);
385  }
386  } else {
387  moduleIter.next();
388  }
389  }
390  return false;
391 }
392 
393 std::pair<unsigned int, unsigned int> HLTEventProcessor::checkChildProcesses()
394 {
395  unsigned int needToRestart = 0;
396 
397  // Check for processes, which where there last time but are gone now (so they died)
398  for (auto iter = m_processList.begin(); iter != m_processList.end();) {
399  const auto& pid = *iter;
400 
401  // check the status of this process pid
402  int status;
403  const int result = waitpid(pid, &status, WNOHANG);
404  if (result == -1) {
405  if (errno == EINTR) {
406  // interrupted, try again next time
407  ++iter;
408  continue;
409  } else {
410  B2FATAL("waitpid() failed.");
411  }
412  } else if (result == 0) {
413  // No change, so lets continue with the next worker
414  ++iter;
415  continue;
416  }
417 
418  B2ASSERT("Do not understand the result of waitpid()", result == pid);
419 
420  // state has changed, which means it is dead!
421  const auto exitCode = WEXITSTATUS(status);
422 
423  // we only need to restart unexpected deads
424  if (exitCode != 0) {
425  B2WARNING("A worker process has died unexpected!");
426  needToRestart += 1;
427 
428  sendTerminatedMessage(pid, true);
429  }
430 
431  // once a process is gone from the global list, remove them from our own, too.
432  iter = m_processList.erase(iter);
433  }
434 
435  return {m_processList.size(), needToRestart};
436 }
437 
439 {
440  for (auto& socket : m_sockets) {
441  socket.release();
442  }
443  m_parent.reset();
444 }
445 
447 {
448  fflush(stdout);
449  fflush(stderr);
450 
451  pid_t pid = fork();
452 
453  if (pid > 0) {
454  g_processNumber++;
455  m_processList.push_back(pid);
456  return false;
457  } else if (pid < 0) {
458  B2FATAL("fork() failed: " << strerror(errno));
459  } else {
460  // Child process
461  // Reset some python state: signals, threads, gil in the child
462  PyOS_AfterFork_Child();
463  // InputController becomes useless in child process
465  // die when parent dies
466  prctl(PR_SET_PDEATHSIG, SIGHUP);
467 
468  ProcHandler::setProcessID(getpid());
469  return true;
470  }
471 }
472 
473 void processNumbered(PathPtr startPath, const boost::python::list& outputAddresses, bool restartFailedWorkers = false,
474  bool appendProcessNumberToModuleName = true)
475 {
476  static bool already_executed = false;
477  B2ASSERT("Can not run process() on HLT twice per file!", not already_executed);
478 
479  auto& environment = Environment::Instance();
480  B2ASSERT("HLT processing must happen in multiprocessing mode!", environment.getNumberProcesses() > 0);
481 
482  namespace py = boost::python;
483  std::vector<std::string> outputAddressesAsString;
484  size_t nList = py::len(outputAddresses);
485  for (size_t iList = 0; iList < nList; ++iList) {
486  outputAddressesAsString.emplace_back(py::extract<std::string>(outputAddresses[iList].attr("__str__")()));
487  }
488 
489  try {
493 
494  already_executed = true;
495 
496  HLTEventProcessor processor(outputAddressesAsString);
497  processor.process(startPath, restartFailedWorkers, appendProcessNumberToModuleName);
498 
500  } catch (std::exception& e) {
501  B2ERROR("Uncaught exception encountered: " << e.what()); //should show module name
502  DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
503  throw; //and let python's global handler do the rest
504  } catch (...) {
505  B2ERROR("Uncaught exception encountered!"); //should show module name
506  DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
507  throw; //and let python's global handler do the rest
508  }
509 }
510 
511 void process(PathPtr startPath, const boost::python::list& outputAddresses, bool restartFailedWorkers = false)
512 {
513  processNumbered(startPath, outputAddresses, restartFailedWorkers, false);
514 }
515 
516 BOOST_PYTHON_MODULE(hbasf2)
517 {
518  def("processNumbered", &processNumbered);
519  def("process", &process);
520 }
521 
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:54
void setInitializeActive(bool active)
Setter for m_initializeActive.
Definition: DataStore.cc:94
void reset(EDurability durability)
Frees memory occupied by data store items and removes all objects from the map.
Definition: DataStore.cc:86
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
Definition: DataStore.cc:715
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:145
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Exception thrown when execution is stopped by a signal.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
EventProcessor to be used on the HLT with all specialities of the HLT processing:
HLTEventProcessor(const std::vector< std::string > &outputAddresses)
Create a new event processor and store the ZMQ addresses where to unregister workers.
bool forkOut()
Helper function to fork out. Sets the Python state correctly and adds the process to the internal sta...
void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
Send an unregister message to all sockets if the given PID died. Wait at max 10s for the confirmation...
void release()
Release the parent resource, which is needed after forking to not close it twice.
void runWorkers(PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName=false)
Fork out as much workers as requested and in each run the given path using processCore.
std::pair< unsigned int, unsigned int > checkChildProcesses()
Check if one of the started processes has died.
bool processEvent(PathIterator moduleIter, bool firstRound)
Process a single event by iterating through the module path once.
void process(PathPtr spath, bool restartFailedWorkers, bool appendProcessNumberToModuleName=false)
Process the given path.
void processCore(PathPtr path)
Process the path by basically calling processEvent until a termination is requested.
static void resetForChildProcess()
Reset InputController (e.g.
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:30
void resetMessageCounter()
Resets the message counter and error log by setting all message counts to 0.
Definition: LogSystem.cc:150
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:161
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:31
@ c_Continue
After the conditional path, resume execution after this module.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_Event
Counting time/calls in event()
Base class for Modules.
Definition: Module.h:72
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:26
void next()
increment.
Definition: PathIterator.h:49
bool isDone() const
Are we finished iterating?
Definition: PathIterator.h:72
Module * get() const
dereference.
Definition: PathIterator.h:75
static void setProcessID(int processID)
Set the process ID of this process.
Definition: ProcHandler.cc:250
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
Definition: ZMQParent.cc:56
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the current processes PID...
Definition: ZMQParent.cc:32
void reset(bool keepEntries=false)
Invalidate all payloads.
Definition: DBStore.cc:177
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:153
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition: Path.h:28
static DBStore & Instance()
Instance of a singleton DBStore.
Definition: DBStore.cc:28
#define REGISTER_PYTHON_MODULE(moduleName)
Register a python module to make available when loading the library.
void updateEvent()
Updates all intra-run dependent objects.
Definition: DBStore.cc:142
Abstract base class for different kinds of events.