Belle II Software  release-05-01-25
HLTEventProcessor.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #include <daq/hbasf2/utils/HLTEventProcessor.h>
11 
12 #include <boost/python.hpp>
13 #include <framework/utilities/RegisterPythonModule.h>
14 #include <framework/core/InputController.h>
15 #include <framework/pcore/ProcHandler.h>
16 
17 #include <framework/database/DBStore.h>
18 #include <framework/core/RandomNumbers.h>
19 #include <framework/core/Environment.h>
20 #include <framework/core/ModuleManager.h>
21 
22 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
23 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
24 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
25 
26 #include <TROOT.h>
27 
28 #include <sys/prctl.h>
29 #include <sys/wait.h>
30 
31 #include <chrono>
32 #include <thread>
33 #include <signal.h>
34 #include <zmq.h>
35 
36 using namespace Belle2;
37 using namespace boost::python;
38 
39 namespace {
41  static int g_signalReceived = 0;
42 
43  static void storeSignal(int signalNumber)
44  {
45  if (signalNumber == SIGINT) {
46  EventProcessor::writeToStdErr("\nStopping basf2 gracefully...\n");
47  }
48 
49  // We do not want to remove the first signal
50  if (g_signalReceived == 0) {
51  g_signalReceived = signalNumber;
52  }
53  }
54 }
55 
56 void HLTEventProcessor::sendTerminatedMessage(unsigned int pid, bool waitForConformation)
57 {
58  for (auto& socket : m_sockets) {
59  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_deleteWorkerMessage,
61  ZMQParent::send(socket, std::move(message));
62 
63  if (not waitForConformation) {
64  continue;
65  }
66  if (ZMQParent::poll({socket.get()}, 10 * 1000)) {
67  auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
68  B2ASSERT("Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
69  } else {
70  B2FATAL("Did not receive a confirmation message!");
71  }
72  }
73 }
74 
75 HLTEventProcessor::HLTEventProcessor(const std::vector<std::string>& outputAddresses)
76 {
77  m_sockets.reserve(outputAddresses.size());
78  for (const auto& address : outputAddresses) {
79  m_sockets.push_back(m_parent.createSocket<ZMQ_DEALER>(address, false));
80  }
81 }
82 
83 void HLTEventProcessor::process(PathPtr path, bool restartFailedWorkers)
84 {
85  using namespace std::chrono_literals;
86 
87  m_moduleList = path->buildModulePathList();
88 
89  // Assert path is what we want: fully parallel certified, not empty. Set first module to master module
90  B2ASSERT("You try to process an empty path!", not m_moduleList.empty());
91  for (const auto& module : m_moduleList) {
92  bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
93  // entire conditional path must also be compatible
94  if (hasParallelFlag and module->hasCondition()) {
95  for (const auto& conditionPath : module->getAllConditionPaths()) {
97  hasParallelFlag = false;
98  }
99  }
100  }
101  B2ASSERT("Module with name " << module->getName() << " does not have parallel flag!", hasParallelFlag);
102  }
103 
104  // Initialize of all modules (including event() of master module)
105  installMainSignalHandlers();
106  processInitialize(m_moduleList);
107 
108  // Don't start processing in case of no master module
109  if (not m_master) {
110  B2ERROR("There is no module that provides event and run numbers (EventMetaData). "
111  "You must add the specific HLT module as first module to the path.");
112  }
113 
114  // Check if errors appeared. If yes, don't start the event processing.
115  const int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
116  if (numLogError != 0) {
117  B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
118  }
119 
120  // Start the workers, which call the main loop
121  const int numProcesses = Environment::Instance().getNumberProcesses();
122  runWorkers(path, numProcesses);
123 
124  installMainSignalHandlers(storeSignal);
125  // Back in the main process: wait for the processes and monitor them
126  int numberOfRestartedWorkers = 0;
127  while (true) {
128  // check if we have received any signal from the user or OS.
129  // Killing of the remaining processes happens after the loop.
130  if (g_signalReceived > 0) {
131  B2WARNING("Received a signal to go down.");
132  break;
133  }
134 
135  // Test if we need more workers and if one has died
136  unsigned int presentWorkers;
137  unsigned int neededWorkers;
138 
139  std::tie(presentWorkers, neededWorkers) = checkChildProcesses();
140  if (neededWorkers > 0) {
141  if (restartFailedWorkers) {
142  runWorkers(path, neededWorkers);
143  numberOfRestartedWorkers += neededWorkers;
144  } else {
145  B2ERROR("A worker failed. Will try to end the process smoothly now.");
146  break;
147  }
148  } else if (presentWorkers == 0) {
149  B2DEBUG(10, "All workers have cleanly exited. Will now also exit");
150  break;
151  }
152 
153  if (numberOfRestartedWorkers > numProcesses) {
154  B2ERROR("I needed to restart on total " << numberOfRestartedWorkers << ", which I think is abnormal. "
155  "Will terminate the process now!");
156  break;
157  }
158 
159  std::this_thread::sleep_for(10ms);
160  }
161 
162  checkChildProcesses();
163 
164  // if we still have/had processes, we should unregister them
165  std::this_thread::sleep_for(500ms);
166 
167  for (const int& pid : m_processList) {
168  if (kill(pid, SIGKILL) >= 0) {
169  B2WARNING("Needed to hard kill process " << pid);
170  } else {
171  B2DEBUG(100, "no process " << pid << " found, already gone?");
172  }
173  sendTerminatedMessage(pid, false);
174  }
175  m_processList.clear();
176 
177  B2DEBUG(10, "Done here");
178 
179  // Normally, we would call terminate here, but not on HLT!
180  // Normally, we would print the error summary here, but not on HLT!
181  if (g_signalReceived == SIGINT) {
182  installSignalHandler(SIGINT, SIG_DFL);
183  raise(SIGINT);
184  }
185 }
186 
187 void HLTEventProcessor::runWorkers(PathPtr path, unsigned int numProcesses)
188 {
189  for (unsigned int i = 0; i < numProcesses; i++) {
190  if (forkOut()) {
191  // Do only run in forked out worker process:
192  B2DEBUG(10, "Starting a new worker process");
193  // Reset the parent and sockets
194  release();
195 
196  // Start the main loop with our signal handling and error catching
197  installMainSignalHandlers(storeSignal);
198  try {
199  processCore(path);
200  } catch (StoppedBySignalException& e) {
201  // close all open ROOT files, ROOT's exit handler will crash otherwise
202  gROOT->GetListOfFiles()->Delete();
203 
204  B2ERROR(e.what());
205  exit(1);
206  } catch (...) {
207  if (m_eventMetaDataPtr)
208  B2ERROR("Exception occured in exp/run/evt: "
209  << m_eventMetaDataPtr->getExperiment() << " / "
210  << m_eventMetaDataPtr->getRun() << " / "
211  << m_eventMetaDataPtr->getEvent());
212  throw;
213  }
214 
215  B2DEBUG(10, "Ending a worker process here.");
216  // Ok, we are done here!
217  exit(0);
218  }
219  }
220 }
221 
223 {
224  bool terminationRequested = false;
225  bool firstRound = true;
226 
227  // Initialisation is done
229 
230  // Set the previous event meta data to something invalid
231  m_previousEventMetaData.setEndOfData();
232 
233  while (not terminationRequested) {
234  B2DEBUG(100, "Processing new event");
235 
236  // Start the measurement
237  m_processStatisticsPtr->startGlobal();
238 
239  // Main call to event() of the modules, and maybe beginRun() and endRun()
240  PathIterator moduleIter(path);
241  terminationRequested = processEvent(moduleIter, firstRound);
242 
243  // Delete event related data in DataStore
245 
246  // Stop the measurement
247  m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_Event);
248 
249  // We are surely not in the first round the next time
250  firstRound = false;
251  }
252 
253  // End last run with a terminate. Yes, we are not calling a endRun() here and yes, we are calling this in the worker
254  B2DEBUG(10, "Calling terminate");
255  m_eventMetaDataPtr.create();
256  processTerminate(m_moduleList);
257 }
258 
259 bool HLTEventProcessor::processEvent(PathIterator moduleIter, bool firstRound)
260 {
261  while (not moduleIter.isDone()) {
262  Module* module = moduleIter.get();
263  B2DEBUG(10, "Starting event of " << module->getName());
264 
265  // The actual call of the event function
266  if (module != m_master) {
267  // If this is not the master module it is quite simple: just call the event function
268  callEvent(module);
269 
270  // Check for a second master module. Cannot do this if we are in the first round after initialize
271  // (as previous event meta data is not set properly here)
272  if (not m_previousEventMetaData.isEndOfData() and m_eventMetaDataPtr and
273  (*m_eventMetaDataPtr != m_previousEventMetaData)) {
274  B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and "
275  << module->getName());
276  }
277  } else {
278  if (not firstRound) {
279  // Also call the event function for the master, but not the first time
280  callEvent(module);
281  }
282  // initialize random number state for the event handling after we have
283  // recieved the event information from the master module.
285  }
286 
287  if (g_signalReceived != 0) {
288  if (g_signalReceived != SIGINT) {
289  throw StoppedBySignalException(g_signalReceived);
290  } else {
291  B2DEBUG(10, "Received a SIGINT in the worker process...");
292  return true;
293  }
294  }
295 
296  B2ASSERT("The event meta data must always be valid at this stage!", m_eventMetaDataPtr and m_eventMetaDataPtr.isValid());
297 
298  if (m_eventMetaDataPtr->isEndOfData()) {
299  // Immediately leave the loop and terminate (true)
300  return true;
301  }
302 
303  if (module == m_master and not firstRound) {
304  if (m_eventMetaDataPtr->isEndOfRun()) {
305  B2DEBUG(10, "Calling endRun()");
306  // call endRun() of all modules (internally uses the previous event meta data) and skip to the next event
307  m_processStatisticsPtr->suspendGlobal();
308  m_inRun = true;
309  processEndRun();
310  m_processStatisticsPtr->resumeGlobal();
311 
312  // Store the current event meta data for the next round
313  m_previousEventMetaData = *m_eventMetaDataPtr;
314 
315  // Leave this event, but not the full processing (false)
316  return false;
317  } else if (m_previousEventMetaData.isEndOfData() or m_previousEventMetaData.isEndOfRun()) {
318  // The run has changes (or we never had one), so call beginRun() before going on
319  m_processStatisticsPtr->suspendGlobal();
320  processBeginRun();
321  m_processStatisticsPtr->resumeGlobal();
322  }
323 
324  const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
325  (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
326  const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
327  and not m_previousEventMetaData.isEndOfRun();
328  if (runChangedWithoutNotice) {
329  m_processStatisticsPtr->suspendGlobal();
330 
331  m_inRun = true;
332  processEndRun();
333  processBeginRun();
334 
335  m_processStatisticsPtr->resumeGlobal();
336  }
337 
338  // make sure we use the event dependent generator again
340 
341  // and the correct database
343 
344  // Store the current event meta data for the next round
345  m_previousEventMetaData = *m_eventMetaDataPtr;
346  }
347 
348  // Check for the module conditions, evaluate them and if one is true, switch to the new path
349  if (module->evalCondition()) {
350  PathPtr condPath = module->getConditionPath();
351  // continue with parent Path after condition path is executed?
352  if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
353  moduleIter = PathIterator(condPath, moduleIter);
354  } else {
355  moduleIter = PathIterator(condPath);
356  }
357  } else {
358  moduleIter.next();
359  }
360  }
361  return false;
362 }
363 
364 std::pair<unsigned int, unsigned int> HLTEventProcessor::checkChildProcesses()
365 {
366  unsigned int needToRestart = 0;
367 
368  // Check for processes, which where there last time but are gone now (so they died)
369  for (auto iter = m_processList.begin(); iter != m_processList.end();) {
370  const auto& pid = *iter;
371 
372  // check the status of this process pid
373  int status;
374  const int result = waitpid(pid, &status, WNOHANG);
375  if (result == -1) {
376  if (errno == EINTR) {
377  // interrupted, try again next time
378  ++iter;
379  continue;
380  } else {
381  B2FATAL("waitpid() failed.");
382  }
383  } else if (result == 0) {
384  // No change, so lets continue with the next worker
385  ++iter;
386  continue;
387  }
388 
389  B2ASSERT("Do not understand the result of waitpid()", result == pid);
390 
391  // state has changed, which means it is dead!
392  const auto exitCode = WEXITSTATUS(status);
393 
394  // we only need to restart unexpected deads
395  if (exitCode != 0) {
396  B2WARNING("A worker process has died unexpected!");
397  needToRestart += 1;
398 
399  sendTerminatedMessage(pid, true);
400  }
401 
402  // once a process is gone from the global list, remove them from our own, too.
403  iter = m_processList.erase(iter);
404  }
405 
406  return {m_processList.size(), needToRestart};
407 }
408 
410 {
411  for (auto& socket : m_sockets) {
412  socket.release();
413  }
414  m_parent.reset();
415 }
416 
418 {
419  fflush(stdout);
420  fflush(stderr);
421 
422  pid_t pid = fork();
423 
424  if (pid > 0) {
425  m_processList.push_back(pid);
426  return false;
427  } else if (pid < 0) {
428  B2FATAL("fork() failed: " << strerror(errno));
429  } else {
430  // Child process
431  // Reset some python state: signals, threads, gil in the child
432  PyOS_AfterFork();
433  // InputController becomes useless in child process
435  // die when parent dies
436  prctl(PR_SET_PDEATHSIG, SIGHUP);
437 
438  ProcHandler::setProcessID(getpid());
439  return true;
440  }
441 }
442 
443 void process(PathPtr startPath, const boost::python::list& outputAddresses, bool restartFailedWorkers = false)
444 {
445  static bool already_executed = false;
446  B2ASSERT("Can not run process() on HLT twice per file!", not already_executed);
447 
448  auto& environment = Environment::Instance();
449  B2ASSERT("HLT processing must happen in multiprocessing mode!", environment.getNumberProcesses() > 0);
450 
451  namespace py = boost::python;
452  std::vector<std::string> outputAddressesAsString;
453  size_t nList = py::len(outputAddresses);
454  for (size_t iList = 0; iList < nList; ++iList) {
455  outputAddressesAsString.emplace_back(py::extract<std::string>(outputAddresses[iList].attr("__str__")()));
456  }
457 
458  try {
462 
463  already_executed = true;
464 
465  HLTEventProcessor processor(outputAddressesAsString);
466  processor.process(startPath, restartFailedWorkers);
467 
469  } catch (std::exception& e) {
470  B2ERROR("Uncaught exception encountered: " << e.what()); //should show module name
471  DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
472  throw; //and let python's global handler do the rest
473  } catch (...) {
474  B2ERROR("Uncaught exception encountered!"); //should show module name
475  DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
476  throw; //and let python's global handler do the rest
477  }
478 }
479 
480 BOOST_PYTHON_MODULE(hbasf2)
481 {
482  def("process", &process);
483 }
484 
Belle2::ZMQParent::poll
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
Definition: ZMQParent.cc:58
Belle2::PathIterator::next
void next()
increment.
Definition: PathIterator.h:59
Belle2::ModuleStatistics::c_Event
@ c_Event
Counting time/calls in event()
Definition: ModuleStatistics.h:45
prepareAsicCrosstalkSimDB.e
e
aux.
Definition: prepareAsicCrosstalkSimDB.py:53
Belle2::HLTEventProcessor::forkOut
bool forkOut()
Helper function to fork out. Sets the Python state correctly and adds the process to the internal state (the list of worker PIDs).
Definition: HLTEventProcessor.cc:417
Belle2::DataStore::Instance
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:54
Belle2::Module::c_ParallelProcessingCertified
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:82
Belle2::InputController::resetForChildProcess
static void resetForChildProcess()
Reset InputController (e.g.
Definition: InputController.cc:16
Belle2::DataStore::setInitializeActive
void setInitializeActive(bool active)
Setter for m_initializeActive.
Definition: DataStore.cc:94
Belle2::HLTEventProcessor::processEvent
bool processEvent(PathIterator moduleIter, bool firstRound)
Process a single event by iterating through the module path once.
Definition: HLTEventProcessor.cc:259
Belle2::ZMQParent::send
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:163
Belle2::RandomNumbers::useEventDependent
static void useEventDependent()
Set Event dependent Random Generator as current one.
Definition: RandomNumbers.cc:130
Belle2::HLTEventProcessor
EventProcessor to be used on the HLT with all specialities of the HLT processing:
Definition: HLTEventProcessor.h:45
Belle2::HLTEventProcessor::HLTEventProcessor
HLTEventProcessor(const std::vector< std::string > &outputAddresses)
Create a new event processor and store the ZMQ addresses where to unregister workers.
Definition: HLTEventProcessor.cc:75
Belle2::DBStore::reset
void reset(bool keepEntries=false)
Invalidate all payloads.
Definition: DBStore.cc:185
Belle2::HLTEventProcessor::release
void release()
Release the parent resource, which is needed after forking to not close it twice.
Definition: HLTEventProcessor.cc:409
Belle2::HLTEventProcessor::process
void process(PathPtr spath, bool restartFailedWorkers)
Process the given path.
Definition: HLTEventProcessor.cc:83
Belle2::EventProcessor::StoppedBySignalException
Exception thrown when execution is stopped by a signal.
Definition: EventProcessor.h:85
Belle2::HLTEventProcessor::runWorkers
void runWorkers(PathPtr path, unsigned int numProcesses)
Fork out as much workers as requested and in each run the given path using processCore.
Definition: HLTEventProcessor.cc:187
Belle2::EventProcessor::writeToStdErr
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
Definition: EventProcessor.cc:74
Belle2::LogSystem::resetMessageCounter
void resetMessageCounter()
Resets the message counter and error log by setting all message counts to 0.
Definition: LogSystem.cc:154
Belle2::ModuleManager::allModulesHaveFlag
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
Definition: ModuleManager.cc:144
Belle2::DataStore::reset
void reset(EDurability durability)
Frees memory occupied by data store items and removes all objects from the map.
Definition: DataStore.cc:86
Belle2::HLTEventProcessor::sendTerminatedMessage
void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
Send an unregister message to all sockets if the given PID died. If requested, wait at most 10 s per socket for the confirmation.
Definition: HLTEventProcessor.cc:56
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2::ModuleCondition::EAfterConditionPath::c_Continue
@ c_Continue
After the conditional path, resume execution after this module.
Belle2::Environment::getNumberProcesses
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:147
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::PathPtr
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition: Path.h:30
Belle2::LogConfig::c_Error
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:40
Belle2::DBStore::Instance
static DBStore & Instance()
Instance of a singleton DBStore.
Definition: DBStore.cc:36
Belle2::ZMQMessageFactory::createMessage
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Definition: ZMQMessageFactory.h:37
Belle2::RandomNumbers::initializeEvent
static void initializeEvent(bool force=false)
Initialize event information.
Definition: RandomNumbers.cc:110
Belle2::HLTEventProcessor::processCore
void processCore(PathPtr path)
Process the path by basically calling processEvent until a termination is requested.
Definition: HLTEventProcessor.cc:222
Belle2::PathIterator::get
Module * get() const
dereference.
Definition: PathIterator.h:85
Belle2::DBStore::updateEvent
void updateEvent()
Updates all intra-run dependent objects.
Definition: DBStore.cc:150
Belle2::ProcHandler::setProcessID
static void setProcessID(int processID)
Set the process ID of this process.
Definition: ProcHandler.cc:245
Belle2::LogSystem::Instance
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:33
Belle2::HLTEventProcessor::checkChildProcesses
std::pair< unsigned int, unsigned int > checkChildProcesses()
Check if one of the started processes has died.
Definition: HLTEventProcessor.cc:364
Belle2::Environment::Instance
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:31
Belle2::PathIterator
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:36
Belle2::DataStore::c_Event
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been called.
Definition: DataStore.h:61
Belle2::PathIterator::isDone
bool isDone() const
Are we finished iterating?
Definition: PathIterator.h:82
Belle2::DataStore::invalidateData
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
Definition: DataStore.cc:689
REGISTER_PYTHON_MODULE
#define REGISTER_PYTHON_MODULE(moduleName)
Register a python module to make available when loading the library.
Definition: RegisterPythonModule.h:46
Belle2::LogSystem::getMessageCounter
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:165
Belle2::ZMQParent::createIdentity
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the PID of the current process).
Definition: ZMQParent.cc:34