Belle II Software  release-08-01-10
HLTEventProcessor.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/utils/HLTEventProcessor.h>
9 
10 #include <boost/python.hpp>
11 #include <framework/utilities/RegisterPythonModule.h>
12 #include <framework/core/InputController.h>
13 #include <framework/pcore/ProcHandler.h>
14 
15 #include <framework/database/DBStore.h>
16 #include <framework/core/RandomNumbers.h>
17 #include <framework/core/Environment.h>
18 #include <framework/core/ModuleManager.h>
19 
20 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
21 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
22 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
23 
24 #include <TROOT.h>
25 
26 #include <sys/prctl.h>
27 #include <sys/wait.h>
28 
29 #include <chrono>
30 #include <thread>
31 #include <signal.h>
32 #include <zmq.h>
33 
34 using namespace Belle2;
35 using namespace boost::python;
36 
37 namespace {
39  static int g_signalReceived = 0;
40 
41  // For processor unique ID
42  static int g_processNumber = 1;
43 
44  static void storeSignal(int signalNumber)
45  {
46  if (signalNumber == SIGINT) {
47  EventProcessor::writeToStdErr("\nStopping basf2 gracefully...\n");
48  }
49 
50  // We do not want to remove the first signal
51  if (g_signalReceived == 0) {
52  g_signalReceived = signalNumber;
53  }
54  }
55 }
56 
57 void HLTEventProcessor::sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
58 {
59  for (auto& socket : m_sockets) {
60  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_deleteWorkerMessage,
62  ZMQParent::send(socket, std::move(message));
63 
64  if (not waitForConfirmation) {
65  continue;
66  }
67  if (ZMQParent::poll({socket.get()}, 10 * 1000)) {
68  auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
69  B2ASSERT("Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
70  } else {
71  B2FATAL("Did not receive a confirmation message!");
72  }
73  }
74 }
75 
76 HLTEventProcessor::HLTEventProcessor(const std::vector<std::string>& outputAddresses)
77 {
78  m_sockets.reserve(outputAddresses.size());
79  for (const auto& address : outputAddresses) {
80  m_sockets.push_back(m_parent.createSocket<ZMQ_DEALER>(address, false));
81  }
82 }
83 
84 void HLTEventProcessor::process(PathPtr path, bool restartFailedWorkers, bool appendProcessNumberToModuleName)
85 {
86  using namespace std::chrono_literals;
87 
88  m_moduleList = path->buildModulePathList();
89 
90  // Assert path is what we want: fully parallel certified, not empty. Set first module to master module
91  B2ASSERT("You try to process an empty path!", not m_moduleList.empty());
92  for (const auto& module : m_moduleList) {
93  bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
94  // entire conditional path must also be compatible
95  if (hasParallelFlag and module->hasCondition()) {
96  for (const auto& conditionPath : module->getAllConditionPaths()) {
98  hasParallelFlag = false;
99  }
100  }
101  }
102  B2ASSERT("Module with name " << module->getName() << " does not have parallel flag!", hasParallelFlag);
103  }
104 
105  // Initialize of all modules (including event() of master module)
106  installMainSignalHandlers();
107  processInitialize(m_moduleList);
108 
109  // Don't start processing in case of no master module
110  if (not m_master) {
111  B2ERROR("There is no module that provides event and run numbers (EventMetaData). "
112  "You must add the specific HLT module as first module to the path.");
113  }
114 
115  // Check if errors appeared. If yes, don't start the event processing.
116  const int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
117  if (numLogError != 0) {
118  B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
119  }
120 
121  // Start the workers, which call the main loop
122  const int numProcesses = Environment::Instance().getNumberProcesses();
123  runWorkers(path, numProcesses, appendProcessNumberToModuleName);
124 
125  installMainSignalHandlers(storeSignal);
126  // Back in the main process: wait for the processes and monitor them
127  int numberOfRestartedWorkers = 0;
128  while (true) {
129  // check if we have received any signal from the user or OS.
130  // Killing of the remaining processes happens after the loop.
131  if (g_signalReceived > 0) {
132  B2WARNING("Received a signal to go down.");
133  break;
134  }
135 
136  // Test if we need more workers and if one has died
137  unsigned int presentWorkers;
138  unsigned int neededWorkers;
139 
140  std::tie(presentWorkers, neededWorkers) = checkChildProcesses();
141  if (neededWorkers > 0) {
142  if (restartFailedWorkers) {
143  runWorkers(path, neededWorkers);
144  numberOfRestartedWorkers += neededWorkers;
145  } else {
146  B2ERROR("A worker failed. Will try to end the process smoothly now.");
147  break;
148  }
149  } else if (presentWorkers == 0) {
150  B2DEBUG(10, "All workers have cleanly exited. Will now also exit");
151  break;
152  }
153 
154  if (numberOfRestartedWorkers > numProcesses) {
155  B2ERROR("I needed to restart on total " << numberOfRestartedWorkers << ", which I think is abnormal. "
156  "Will terminate the process now!");
157  break;
158  }
159 
160  std::this_thread::sleep_for(10ms);
161  }
162 
163  if (appendProcessNumberToModuleName) {
164  for (const int& pid : m_processList) {
165  B2INFO(g_processNumber << ": Send SIGINT to " << pid);
166  kill(pid, SIGINT);
167  }
168  for (const int& pid : m_processList) {
169  int count = 0;
170  while (true) {
171  // Do not allow internal SIGKILL to prevent data loss in case of a numbered process
172  if (kill(pid, 0) != 0) {
173  break;
174  }
175  B2DEBUG(10, g_processNumber << ": Checking process termination, count = " << count);
176  std::this_thread::sleep_for(1000ms);
177  if (count % 5 == 1) kill(pid, SIGINT);
178  ++count;
179  }
180  }
181  }
182 
183  checkChildProcesses();
184 
185  // if we still have/had processes, we should unregister them
186  std::this_thread::sleep_for(500ms);
187 
188  for (const int& pid : m_processList) {
189  if (kill(pid, SIGKILL) >= 0) {
190  B2WARNING("Needed to hard kill process " << pid);
191  } else {
192  B2DEBUG(100, "no process " << pid << " found, already gone?");
193  }
194  sendTerminatedMessage(pid, false);
195  }
196  m_processList.clear();
197 
198  B2DEBUG(10, "Done here");
199 
200  // Normally, we would call terminate here, but not on HLT!
201  // Normally, we would print the error summary here, but not on HLT!
202  if (g_signalReceived == SIGINT) {
203  installSignalHandler(SIGINT, SIG_DFL);
204  raise(SIGINT);
205  }
206 }
207 
208 void HLTEventProcessor::runWorkers(PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName)
209 {
210  for (unsigned int i = 0; i < numProcesses; i++) {
211  if (forkOut()) {
212  // Do only run in forked out worker process:
213  B2DEBUG(10, "Starting a new worker process");
214  // Reset the parent and sockets
215  release();
216 
217  // Start the main loop with our signal handling and error catching
218  installMainSignalHandlers(storeSignal);
219  try {
220  if (appendProcessNumberToModuleName) {
221  for (const auto& module : m_moduleList) {
222  module->setName(std::to_string(g_processNumber) + std::string("_") + module->getName());
223  B2INFO("New worker name is " << module->getName());
224  }
225  }
226  processCore(path);
227  } catch (StoppedBySignalException& e) {
228  // close all open ROOT files, ROOT's exit handler will crash otherwise
229  gROOT->GetListOfFiles()->Delete();
230 
231  B2ERROR(e.what());
232  exit(1);
233  } catch (...) {
234  if (m_eventMetaDataPtr)
235  B2ERROR("Exception occured in exp/run/evt: "
236  << m_eventMetaDataPtr->getExperiment() << " / "
237  << m_eventMetaDataPtr->getRun() << " / "
238  << m_eventMetaDataPtr->getEvent());
239  throw;
240  }
241 
242  B2DEBUG(10, "Ending a worker process here.");
243  // Ok, we are done here!
244  exit(0);
245  }
246  }
247 }
248 
250 {
251  bool terminationRequested = false;
252  bool firstRound = true;
253 
254  // Initialisation is done
256 
257  // Set the previous event meta data to something invalid
258  m_previousEventMetaData.setEndOfData();
259 
260  while (not terminationRequested) {
261  B2DEBUG(100, "Processing new event");
262 
263  // Start the measurement
264  m_processStatisticsPtr->startGlobal();
265 
266  // Main call to event() of the modules, and maybe beginRun() and endRun()
267  PathIterator moduleIter(path);
268  terminationRequested = processEvent(moduleIter, firstRound);
269 
270  // Delete event related data in DataStore
272 
273  // Stop the measurement
274  m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_Event);
275 
276  // We are surely not in the first round the next time
277  firstRound = false;
278  }
279 
280  // End last run with a terminate. Yes, we are not calling a endRun() here and yes, we are calling this in the worker
281  B2DEBUG(10, "Calling terminate");
282  m_eventMetaDataPtr.create();
283  processTerminate(m_moduleList);
284 }
285 
286 bool HLTEventProcessor::processEvent(PathIterator moduleIter, bool firstRound)
287 {
288  while (not moduleIter.isDone()) {
289  Module* module = moduleIter.get();
290  B2DEBUG(10, "Starting event of " << module->getName());
291 
292  // The actual call of the event function
293  if (module != m_master) {
294  // If this is not the master module it is quite simple: just call the event function
295  callEvent(module);
296 
297  // Check for a second master module. Cannot do this if we are in the first round after initialize
298  // (as previous event meta data is not set properly here)
299  if (not m_previousEventMetaData.isEndOfData() and m_eventMetaDataPtr and
300  (*m_eventMetaDataPtr != m_previousEventMetaData)) {
301  B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and "
302  << module->getName());
303  }
304  } else {
305  if (not firstRound) {
306  // Also call the event function for the master, but not the first time
307  callEvent(module);
308  }
309  // initialize random number state for the event handling after we have
310  // recieved the event information from the master module.
312  }
313 
314  if (g_signalReceived != 0) {
315  if (g_signalReceived != SIGINT) {
316  throw StoppedBySignalException(g_signalReceived);
317  } else {
318  B2DEBUG(10, "Received a SIGINT in the worker process...");
319  return true;
320  }
321  }
322 
323  B2ASSERT("The event meta data must always be valid at this stage!", m_eventMetaDataPtr and m_eventMetaDataPtr.isValid());
324 
325  if (m_eventMetaDataPtr->isEndOfData()) {
326  // Immediately leave the loop and terminate (true)
327  return true;
328  }
329 
330  if (module == m_master and not firstRound) {
331  if (m_eventMetaDataPtr->isEndOfRun()) {
332  B2DEBUG(10, "Calling endRun()");
333  // call endRun() of all modules (internally uses the previous event meta data) and skip to the next event
334  m_processStatisticsPtr->suspendGlobal();
335  m_inRun = true;
336  processEndRun();
337  m_processStatisticsPtr->resumeGlobal();
338 
339  // Store the current event meta data for the next round
340  m_previousEventMetaData = *m_eventMetaDataPtr;
341 
342  // Leave this event, but not the full processing (false)
343  return false;
344  } else if (m_previousEventMetaData.isEndOfData() or m_previousEventMetaData.isEndOfRun()) {
345  // The run has changes (or we never had one), so call beginRun() before going on
346  m_processStatisticsPtr->suspendGlobal();
347  processBeginRun();
348  m_processStatisticsPtr->resumeGlobal();
349  }
350 
351  const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
352  (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
353  const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
354  and not m_previousEventMetaData.isEndOfRun();
355  if (runChangedWithoutNotice) {
356  m_processStatisticsPtr->suspendGlobal();
357 
358  m_inRun = true;
359  processEndRun();
360  processBeginRun();
361 
362  m_processStatisticsPtr->resumeGlobal();
363  }
364 
365  // make sure we use the event dependent generator again
367 
368  // and the correct database
370 
371  // Store the current event meta data for the next round
372  m_previousEventMetaData = *m_eventMetaDataPtr;
373  }
374 
375  // Check for the module conditions, evaluate them and if one is true, switch to the new path
376  if (module->evalCondition()) {
377  PathPtr condPath = module->getConditionPath();
378  // continue with parent Path after condition path is executed?
379  if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
380  moduleIter = PathIterator(condPath, moduleIter);
381  } else {
382  moduleIter = PathIterator(condPath);
383  }
384  } else {
385  moduleIter.next();
386  }
387  }
388  return false;
389 }
390 
391 std::pair<unsigned int, unsigned int> HLTEventProcessor::checkChildProcesses()
392 {
393  unsigned int needToRestart = 0;
394 
395  // Check for processes, which where there last time but are gone now (so they died)
396  for (auto iter = m_processList.begin(); iter != m_processList.end();) {
397  const auto& pid = *iter;
398 
399  // check the status of this process pid
400  int status;
401  const int result = waitpid(pid, &status, WNOHANG);
402  if (result == -1) {
403  if (errno == EINTR) {
404  // interrupted, try again next time
405  ++iter;
406  continue;
407  } else {
408  B2FATAL("waitpid() failed.");
409  }
410  } else if (result == 0) {
411  // No change, so lets continue with the next worker
412  ++iter;
413  continue;
414  }
415 
416  B2ASSERT("Do not understand the result of waitpid()", result == pid);
417 
418  // state has changed, which means it is dead!
419  const auto exitCode = WEXITSTATUS(status);
420 
421  // we only need to restart unexpected deads
422  if (exitCode != 0) {
423  B2WARNING("A worker process has died unexpected!");
424  needToRestart += 1;
425 
426  sendTerminatedMessage(pid, true);
427  }
428 
429  // once a process is gone from the global list, remove them from our own, too.
430  iter = m_processList.erase(iter);
431  }
432 
433  return {m_processList.size(), needToRestart};
434 }
435 
437 {
438  for (auto& socket : m_sockets) {
439  socket.release();
440  }
441  m_parent.reset();
442 }
443 
445 {
446  fflush(stdout);
447  fflush(stderr);
448 
449  pid_t pid = fork();
450 
451  if (pid > 0) {
452  g_processNumber++;
453  m_processList.push_back(pid);
454  return false;
455  } else if (pid < 0) {
456  B2FATAL("fork() failed: " << strerror(errno));
457  } else {
458  // Child process
459  // Reset some python state: signals, threads, gil in the child
460  PyOS_AfterFork_Child();
461  // InputController becomes useless in child process
463  // die when parent dies
464  prctl(PR_SET_PDEATHSIG, SIGHUP);
465 
466  ProcHandler::setProcessID(getpid());
467  return true;
468  }
469 }
470 
471 void processNumbered(PathPtr startPath, const boost::python::list& outputAddresses, bool restartFailedWorkers = false,
472  bool appendProcessNumberToModuleName = true)
473 {
474  static bool already_executed = false;
475  B2ASSERT("Can not run process() on HLT twice per file!", not already_executed);
476 
477  auto& environment = Environment::Instance();
478  B2ASSERT("HLT processing must happen in multiprocessing mode!", environment.getNumberProcesses() > 0);
479 
480  namespace py = boost::python;
481  std::vector<std::string> outputAddressesAsString;
482  size_t nList = py::len(outputAddresses);
483  for (size_t iList = 0; iList < nList; ++iList) {
484  outputAddressesAsString.emplace_back(py::extract<std::string>(outputAddresses[iList].attr("__str__")()));
485  }
486 
487  try {
491 
492  already_executed = true;
493 
494  HLTEventProcessor processor(outputAddressesAsString);
495  processor.process(startPath, restartFailedWorkers, appendProcessNumberToModuleName);
496 
498  } catch (std::exception& e) {
499  B2ERROR("Uncaught exception encountered: " << e.what()); //should show module name
500  DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
501  throw; //and let python's global handler do the rest
502  } catch (...) {
503  B2ERROR("Uncaught exception encountered!"); //should show module name
504  DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
505  throw; //and let python's global handler do the rest
506  }
507 }
508 
509 void process(PathPtr startPath, const boost::python::list& outputAddresses, bool restartFailedWorkers = false)
510 {
511  processNumbered(startPath, outputAddresses, restartFailedWorkers, false);
512 }
513 
514 BOOST_PYTHON_MODULE(hbasf2)
515 {
516  def("processNumbered", &processNumbered);
517  def("process", &process);
518 }
519 
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:54
void setInitializeActive(bool active)
Setter for m_initializeActive.
Definition: DataStore.cc:94
void reset(EDurability durability)
Frees memory occupied by data store items and removes all objects from the map.
Definition: DataStore.cc:86
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
Definition: DataStore.cc:715
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:145
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Exception thrown when execution is stopped by a signal.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
EventProcessor to be used on the HLT with all specialities of the HLT processing:
HLTEventProcessor(const std::vector< std::string > &outputAddresses)
Create a new event processor and store the ZMQ addresses where to unregister workers.
bool forkOut()
Helper function to fork out. Sets the Python state correctly and adds the process to the internal sta...
void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
Send an unregister message to all sockets if the given PID died. Wait at max 10s for the confirmation...
void release()
Release the parent resource, which is needed after forking to not close it twice.
void runWorkers(PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName=false)
Fork out as much workers as requested and in each run the given path using processCore.
std::pair< unsigned int, unsigned int > checkChildProcesses()
Check if one of the started processes has died.
bool processEvent(PathIterator moduleIter, bool firstRound)
Process a single event by iterating through the module path once.
void process(PathPtr spath, bool restartFailedWorkers, bool appendProcessNumberToModuleName=false)
Process the given path.
void processCore(PathPtr path)
Process the path by basically calling processEvent until a termination is requested.
static void resetForChildProcess()
Reset InputController (e.g.
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:30
void resetMessageCounter()
Resets the message counter and error log by setting all message counts to 0.
Definition: LogSystem.cc:150
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:161
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:31
@ c_Continue
After the conditional path, resume execution after this module.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_Event
Counting time/calls in event()
Base class for Modules.
Definition: Module.h:72
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:26
void next()
increment.
Definition: PathIterator.h:49
bool isDone() const
Are we finished iterating?
Definition: PathIterator.h:72
Module * get() const
dereference.
Definition: PathIterator.h:75
static void setProcessID(int processID)
Set the process ID of this process.
Definition: ProcHandler.cc:250
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
Definition: ZMQParent.cc:56
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the current processes PID...
Definition: ZMQParent.cc:32
void reset(bool keepEntries=false)
Invalidate all payloads.
Definition: DBStore.cc:177
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:153
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition: Path.h:28
static DBStore & Instance()
Instance of a singleton DBStore.
Definition: DBStore.cc:28
#define REGISTER_PYTHON_MODULE(moduleName)
Register a python module to make available when loading the library.
void updateEvent()
Updates all intra-run dependent objects.
Definition: DBStore.cc:142
Abstract base class for different kinds of events.