Belle II Software  release-06-00-14
HLTEventProcessor.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/utils/HLTEventProcessor.h>
9 
10 #include <boost/python.hpp>
11 #include <framework/utilities/RegisterPythonModule.h>
12 #include <framework/core/InputController.h>
13 #include <framework/pcore/ProcHandler.h>
14 
15 #include <framework/database/DBStore.h>
16 #include <framework/core/RandomNumbers.h>
17 #include <framework/core/Environment.h>
18 #include <framework/core/ModuleManager.h>
19 
20 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
21 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
22 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
23 
24 #include <TROOT.h>
25 
26 #include <sys/prctl.h>
27 #include <sys/wait.h>
28 
29 #include <chrono>
30 #include <thread>
31 #include <signal.h>
32 #include <zmq.h>
33 
34 using namespace Belle2;
35 using namespace boost::python;
36 
37 namespace {
39  static int g_signalReceived = 0;
40 
41  static void storeSignal(int signalNumber)
42  {
43  if (signalNumber == SIGINT) {
44  EventProcessor::writeToStdErr("\nStopping basf2 gracefully...\n");
45  }
46 
47  // We do not want to remove the first signal
48  if (g_signalReceived == 0) {
49  g_signalReceived = signalNumber;
50  }
51  }
52 }
53 
// Unregister the worker with the given PID by sending a c_deleteWorkerMessage
// over every output socket in m_sockets, one socket after the other.
// If waitForConformation is true, each socket is polled (timeout 10 * 1000; the
// declaration documents this as at most 10 s) for a reply. The reply must be a
// c_confirmMessage (checked by B2ASSERT), and a missing reply ends in B2FATAL.
// NOTE(review): the parameter is spelled "waitForConformation" here but
// "waitForConfirmation" in the declaration.
54 void HLTEventProcessor::sendTerminatedMessage(unsigned int pid, bool waitForConformation)
55 {
56  for (auto& socket : m_sockets) {
// NOTE(review): original line 58, which completes the createMessage(...) call below,
// is missing from this listing. Presumably it passes an identity derived from pid
// (e.g. ZMQParent::createIdentity(pid)) -- confirm against the real source.
57  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_deleteWorkerMessage,
59  ZMQParent::send(socket, std::move(message));
60
// Fire-and-forget mode: do not wait for the receiver's confirmation.
61  if (not waitForConformation) {
62  continue;
63  }
64  if (ZMQParent::poll({socket.get()}, 10 * 1000)) {
65  auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
66  B2ASSERT("Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
67  } else {
68  B2FATAL("Did not receive a confirmation message!");
69  }
70  }
71 }
72 
73 HLTEventProcessor::HLTEventProcessor(const std::vector<std::string>& outputAddresses)
74 {
75  m_sockets.reserve(outputAddresses.size());
76  for (const auto& address : outputAddresses) {
77  m_sockets.push_back(m_parent.createSocket<ZMQ_DEALER>(address, false));
78  }
79 }
80 
// Run the given path on the HLT (mother-process side).
//  1. Check that the path is not empty and that every module is certified for
//     parallel processing. Modules with condition paths are checked as well.
//  2. Initialize all modules in this process.
//  3. Abort if no master module was found or any ERROR was logged.
//  4. Fork Environment::getNumberProcesses() workers.
//  5. Monitor the workers every 10 ms until one of these happens: a signal
//     arrives, all workers have exited, a worker failed while
//     restartFailedWorkers is false, or more workers were restarted in total
//     than numProcesses.
//  6. SIGKILL and unregister all remaining workers.
// terminate() and the error summary are deliberately not run here (see the
// comments at the end). A SIGINT is re-raised with the default handler.
81 void HLTEventProcessor::process(PathPtr path, bool restartFailedWorkers)
82 {
83  using namespace std::chrono_literals;
84
85  m_moduleList = path->buildModulePathList();
86
87  // Assert path is what we want: fully parallel certified, not empty. Set first module to master module
88  B2ASSERT("You try to process an empty path!", not m_moduleList.empty());
89  for (const auto& module : m_moduleList) {
90  bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
91  // entire conditional path must also be compatible
92  if (hasParallelFlag and module->hasCondition()) {
// NOTE(review): original line 94 (the check inside this loop that guards the
// `hasParallelFlag = false;` below, and opens the brace closed at line 96) is
// missing from this listing.
93  for (const auto& conditionPath : module->getAllConditionPaths()) {
95  hasParallelFlag = false;
96  }
97  }
98  }
99  B2ASSERT("Module with name " << module->getName() << " does not have parallel flag!", hasParallelFlag);
100  }
101
102  // Initialize of all modules (including event() of master module)
// Default signal handling during initialization; storeSignal is installed only after forking.
103  installMainSignalHandlers();
104  processInitialize(m_moduleList);
105
106  // Don't start processing in case of no master module
// Only logs an ERROR here; the error-counter check just below turns it into a B2FATAL.
107  if (not m_master) {
108  B2ERROR("There is no module that provides event and run numbers (EventMetaData). "
109  "You must add the specific HLT module as first module to the path.");
110  }
111
112  // Check if errors appeared. If yes, don't start the event processing.
113  const int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
114  if (numLogError != 0) {
115  B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
116  }
117
118  // Start the workers, which call the main loop
119  const int numProcesses = Environment::Instance().getNumberProcesses();
120  runWorkers(path, numProcesses);
121
// From now on, signals only set g_signalReceived, which is polled in the loop below.
122  installMainSignalHandlers(storeSignal);
123  // Back in the main process: wait for the processes and monitor them
124  int numberOfRestartedWorkers = 0;
125  while (true) {
126  // check if we have received any signal from the user or OS.
127  // Killing of the remaining processes happens after the loop.
128  if (g_signalReceived > 0) {
129  B2WARNING("Received a signal to go down.");
130  break;
131  }
132
133  // Test if we need more workers and if one has died
134  unsigned int presentWorkers;
135  unsigned int neededWorkers;
136
137  std::tie(presentWorkers, neededWorkers) = checkChildProcesses();
138  if (neededWorkers > 0) {
139  if (restartFailedWorkers) {
140  runWorkers(path, neededWorkers);
141  numberOfRestartedWorkers += neededWorkers;
142  } else {
143  B2ERROR("A worker failed. Will try to end the process smoothly now.");
144  break;
145  }
146  } else if (presentWorkers == 0) {
147  B2DEBUG(10, "All workers have cleanly exited. Will now also exit");
148  break;
149  }
150
// Safety valve: give up once more workers were restarted in total than were started initially.
151  if (numberOfRestartedWorkers > numProcesses) {
152  B2ERROR("I needed to restart on total " << numberOfRestartedWorkers << ", which I think is abnormal. "
153  "Will terminate the process now!");
154  break;
155  }
156
157  std::this_thread::sleep_for(10ms);
158  }
159
// Reap (and unregister) workers that finished meanwhile before force-killing the rest.
160  checkChildProcesses();
161
162  // if we still have/had processes, we should unregister them
163  std::this_thread::sleep_for(500ms);
164
165  for (const int& pid : m_processList) {
166  if (kill(pid, SIGKILL) >= 0) {
167  B2WARNING("Needed to hard kill process " << pid);
168  } else {
169  B2DEBUG(100, "no process " << pid << " found, already gone?");
170  }
171  sendTerminatedMessage(pid, false);
172  }
173  m_processList.clear();
174
175  B2DEBUG(10, "Done here");
176
177  // Normally, we would call terminate here, but not on HLT!
178  // Normally, we would print the error summary here, but not on HLT!
// Re-raise SIGINT with the default disposition so this process ends the way a SIGINT normally ends it.
179  if (g_signalReceived == SIGINT) {
180  installSignalHandler(SIGINT, SIG_DFL);
181  raise(SIGINT);
182  }
183 }
184 
185 void HLTEventProcessor::runWorkers(PathPtr path, unsigned int numProcesses)
186 {
187  for (unsigned int i = 0; i < numProcesses; i++) {
188  if (forkOut()) {
189  // Do only run in forked out worker process:
190  B2DEBUG(10, "Starting a new worker process");
191  // Reset the parent and sockets
192  release();
193 
194  // Start the main loop with our signal handling and error catching
195  installMainSignalHandlers(storeSignal);
196  try {
197  processCore(path);
198  } catch (StoppedBySignalException& e) {
199  // close all open ROOT files, ROOT's exit handler will crash otherwise
200  gROOT->GetListOfFiles()->Delete();
201 
202  B2ERROR(e.what());
203  exit(1);
204  } catch (...) {
205  if (m_eventMetaDataPtr)
206  B2ERROR("Exception occured in exp/run/evt: "
207  << m_eventMetaDataPtr->getExperiment() << " / "
208  << m_eventMetaDataPtr->getRun() << " / "
209  << m_eventMetaDataPtr->getEvent());
210  throw;
211  }
212 
213  B2DEBUG(10, "Ending a worker process here.");
214  // Ok, we are done here!
215  exit(0);
216  }
217  }
218 }
219 
// NOTE(review): the function signature (original line 220) is missing from this listing.
// Per the declaration it is presumably `void HLTEventProcessor::processCore(PathPtr path)`.
// Worker-side event loop:
//  - set m_previousEventMetaData to end-of-data;
//  - call processEvent() over the path repeatedly until it requests termination,
//    timing each iteration as ModuleStatistics::c_Event;
//  - finally create m_eventMetaDataPtr and call processTerminate() on all modules.
// This runs inside the worker and does not call a final endRun().
221 {
222  bool terminationRequested = false;
223  bool firstRound = true;
224
225  // Initialisation is done
// NOTE(review): original line 226 (the statement belonging to the comment above) is missing from this listing.
227
228  // Set the previous event meta data to something invalid
229  m_previousEventMetaData.setEndOfData();
230
231  while (not terminationRequested) {
232  B2DEBUG(100, "Processing new event");
233
234  // Start the measurement
235  m_processStatisticsPtr->startGlobal();
236
237  // Main call to event() of the modules, and maybe beginRun() and endRun()
238  PathIterator moduleIter(path);
239  terminationRequested = processEvent(moduleIter, firstRound);
240
241  // Delete event related data in DataStore
// NOTE(review): original line 242 (the clean-up announced above) is missing from this listing.
243
244  // Stop the measurement
245  m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_Event);
246
247  // We are surely not in the first round the next time
248  firstRound = false;
249  }
250
251  // End last run with a terminate. Yes, we are not calling a endRun() here and yes, we are calling this in the worker
252  B2DEBUG(10, "Calling terminate");
253  m_eventMetaDataPtr.create();
254  processTerminate(m_moduleList);
255 }
256 
// Process one event: walk the path from moduleIter, call event() of each module
// and follow condition paths. The master module's event() is skipped when
// firstRound is true.
// After the master module (except in the first round) beginRun()/endRun() are
// driven from the event meta data, and m_previousEventMetaData is updated.
// Returns true when the worker should stop: end of data reached, or SIGINT received.
// Returns false otherwise, including directly after an end-of-run record was handled.
// Any signal other than SIGINT is thrown as StoppedBySignalException.
257 bool HLTEventProcessor::processEvent(PathIterator moduleIter, bool firstRound)
258 {
259  while (not moduleIter.isDone()) {
260  Module* module = moduleIter.get();
261  B2DEBUG(10, "Starting event of " << module->getName());
262
263  // The actual call of the event function
264  if (module != m_master) {
265  // If this is not the master module it is quite simple: just call the event function
266  callEvent(module);
267
268  // Check for a second master module. Cannot do this if we are in the first round after initialize
269  // (as previous event meta data is not set properly here)
270  if (not m_previousEventMetaData.isEndOfData() and m_eventMetaDataPtr and
271  (*m_eventMetaDataPtr != m_previousEventMetaData)) {
272  B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and "
273  << module->getName());
274  }
275  } else {
276  if (not firstRound) {
277  // Also call the event function for the master, but not the first time
278  callEvent(module);
279  }
280  // initialize random number state for the event handling after we have
281  // received the event information from the master module.
// NOTE(review): original line 282 (the statement described above) is missing from this listing.
283  }
284
// Signals are only recorded by the handler; they are acted upon here, after each module.
285  if (g_signalReceived != 0) {
286  if (g_signalReceived != SIGINT) {
287  throw StoppedBySignalException(g_signalReceived);
288  } else {
289  B2DEBUG(10, "Received a SIGINT in the worker process...");
290  return true;
291  }
292  }
293
294  B2ASSERT("The event meta data must always be valid at this stage!", m_eventMetaDataPtr and m_eventMetaDataPtr.isValid());
295
296  if (m_eventMetaDataPtr->isEndOfData()) {
297  // Immediately leave the loop and terminate (true)
298  return true;
299  }
300
// NOTE(review): in the first round this block is skipped entirely. No beginRun() is
// triggered for that event, and m_previousEventMetaData stays end-of-data until the
// next round -- confirm this is intended.
301  if (module == m_master and not firstRound) {
302  if (m_eventMetaDataPtr->isEndOfRun()) {
303  B2DEBUG(10, "Calling endRun()");
304  // call endRun() of all modules (internally uses the previous event meta data) and skip to the next event
305  m_processStatisticsPtr->suspendGlobal();
306  m_inRun = true;
307  processEndRun();
308  m_processStatisticsPtr->resumeGlobal();
309
310  // Store the current event meta data for the next round
311  m_previousEventMetaData = *m_eventMetaDataPtr;
312
313  // Leave this event, but not the full processing (false)
314  return false;
315  } else if (m_previousEventMetaData.isEndOfData() or m_previousEventMetaData.isEndOfRun()) {
316  // The run has changed (or we never had one), so call beginRun() before going on
317  m_processStatisticsPtr->suspendGlobal();
318  processBeginRun();
319  m_processStatisticsPtr->resumeGlobal();
320  }
321
// A new exp/run without a preceding end-of-run record: close the old run and open the new one here.
322  const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
323  (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
324  const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
325  and not m_previousEventMetaData.isEndOfRun();
326  if (runChangedWithoutNotice) {
327  m_processStatisticsPtr->suspendGlobal();
328
329  m_inRun = true;
330  processEndRun();
331  processBeginRun();
332
333  m_processStatisticsPtr->resumeGlobal();
334  }
335
336  // make sure we use the event dependent generator again
// NOTE(review): original line 337 (the statement described above) is missing from this listing.
338
339  // and the correct database
// NOTE(review): original line 340 (the statement described above) is missing from this listing.
341
342  // Store the current event meta data for the next round
343  m_previousEventMetaData = *m_eventMetaDataPtr;
344  }
345
346  // Check for the module conditions, evaluate them and if one is true, switch to the new path
347  if (module->evalCondition()) {
348  PathPtr condPath = module->getConditionPath();
349  // continue with parent Path after condition path is executed?
350  if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
351  moduleIter = PathIterator(condPath, moduleIter);
352  } else {
353  moduleIter = PathIterator(condPath);
354  }
355  } else {
356  moduleIter.next();
357  }
358  }
359  return false;
360 }
361 
362 std::pair<unsigned int, unsigned int> HLTEventProcessor::checkChildProcesses()
363 {
364  unsigned int needToRestart = 0;
365 
366  // Check for processes, which where there last time but are gone now (so they died)
367  for (auto iter = m_processList.begin(); iter != m_processList.end();) {
368  const auto& pid = *iter;
369 
370  // check the status of this process pid
371  int status;
372  const int result = waitpid(pid, &status, WNOHANG);
373  if (result == -1) {
374  if (errno == EINTR) {
375  // interrupted, try again next time
376  ++iter;
377  continue;
378  } else {
379  B2FATAL("waitpid() failed.");
380  }
381  } else if (result == 0) {
382  // No change, so lets continue with the next worker
383  ++iter;
384  continue;
385  }
386 
387  B2ASSERT("Do not understand the result of waitpid()", result == pid);
388 
389  // state has changed, which means it is dead!
390  const auto exitCode = WEXITSTATUS(status);
391 
392  // we only need to restart unexpected deads
393  if (exitCode != 0) {
394  B2WARNING("A worker process has died unexpected!");
395  needToRestart += 1;
396 
397  sendTerminatedMessage(pid, true);
398  }
399 
400  // once a process is gone from the global list, remove them from our own, too.
401  iter = m_processList.erase(iter);
402  }
403 
404  return {m_processList.size(), needToRestart};
405 }
406 
// NOTE(review): the function signature (original line 407) is missing from this listing.
// Per the declaration it is presumably `void HLTEventProcessor::release()`.
// Called in a freshly forked worker (see runWorkers) to drop the mother process's
// ZMQ resources without closing them a second time.
408 {
409  for (auto& socket : m_sockets) {
// unique_ptr::release() gives up ownership without destroying the socket. The pointer
// is intentionally discarded, so the child never closes the mother's sockets.
410  socket.release();
411  }
412  m_parent.reset();
413 }
414 
// NOTE(review): the function signature (original line 415) is missing from this listing.
// Per the declaration it is presumably `bool HLTEventProcessor::forkOut()`.
// Fork the current process.
//  - Parent: the child's PID is appended to m_processList and false is returned.
//  - Child: Python/process state is reset, the child is set to get SIGHUP when the
//    parent dies, its process ID is registered with ProcHandler, and true is returned.
//  - A failing fork() ends in B2FATAL.
416 {
// Flush stdio first so buffered output is not duplicated into the child.
417  fflush(stdout);
418  fflush(stderr);
419
420  pid_t pid = fork();
421
422  if (pid > 0) {
423  m_processList.push_back(pid);
424  return false;
425  } else if (pid < 0) {
426  B2FATAL("fork() failed: " << strerror(errno));
427  } else {
428  // Child process
429  // Reset some python state: signals, threads, gil in the child
// NOTE(review): PyOS_AfterFork() is deprecated since Python 3.7 in favour of PyOS_AfterFork_Child().
430  PyOS_AfterFork();
431  // InputController becomes useless in child process
// NOTE(review): original line 432 (the statement described above) is missing from this listing.
433  // die when parent dies
434  prctl(PR_SET_PDEATHSIG, SIGHUP);
435
436  ProcHandler::setProcessID(getpid());
437  return true;
438  }
439 }
440 
// Python entry point, registered as hbasf2.process below.
// Runs startPath with an HLTEventProcessor. Each element of outputAddresses is
// converted with its __str__() and used as an address where dead workers are
// unregistered.
// Preconditions (checked with B2ASSERT):
//  - it may only be called once per process (static already_executed);
//  - multiprocessing must be enabled (Environment::getNumberProcesses() > 0).
// Any exception is logged, the DataStore is reset, and the exception is re-thrown to Python.
441 void process(PathPtr startPath, const boost::python::list& outputAddresses, bool restartFailedWorkers = false)
442 {
443  static bool already_executed = false;
444  B2ASSERT("Can not run process() on HLT twice per file!", not already_executed);
445
446  auto& environment = Environment::Instance();
447  B2ASSERT("HLT processing must happen in multiprocessing mode!", environment.getNumberProcesses() > 0);
448
449  namespace py = boost::python;
450  std::vector<std::string> outputAddressesAsString;
451  size_t nList = py::len(outputAddresses);
452  for (size_t iList = 0; iList < nList; ++iList) {
453  outputAddressesAsString.emplace_back(py::extract<std::string>(outputAddresses[iList].attr("__str__")()));
454  }
455
456  try {
// NOTE(review): original lines 457-459 are missing from this listing.
460
461  already_executed = true;
462
463  HLTEventProcessor processor(outputAddressesAsString);
464  processor.process(startPath, restartFailedWorkers);
465
// NOTE(review): original line 466 is missing from this listing.
467  } catch (std::exception& e) {
468  B2ERROR("Uncaught exception encountered: " << e.what()); //should show module name
469  DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
470  throw; //and let python's global handler do the rest
471  } catch (...) {
472  B2ERROR("Uncaught exception encountered!"); //should show module name
473  DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
474  throw; //and let python's global handler do the rest
475  }
476 }
477 
478 BOOST_PYTHON_MODULE(hbasf2)
479 {
480  def("process", &process);
481 }
482 
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:52
void setInitializeActive(bool active)
Setter for m_initializeActive.
Definition: DataStore.cc:92
void reset(EDurability durability)
Frees memory occupied by data store items and removes all objects from the map.
Definition: DataStore.cc:84
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
Definition: DataStore.cc:686
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:139
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:29
Exception thrown when execution is stopped by a signal.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
EventProcessor to be used on the HLT with all specialities of the HLT processing:
HLTEventProcessor(const std::vector< std::string > &outputAddresses)
Create a new event processor and store the ZMQ addresses where to unregister workers.
bool forkOut()
Helper function to fork out. Sets the Python state correctly and adds the process to the internal sta...
void process(PathPtr spath, bool restartFailedWorkers)
Process the given path.
void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
Send an unregister message to all sockets if the given PID died. Wait at max 10s for the confirmation...
void release()
Release the parent resource, which is needed after forking to not close it twice.
std::pair< unsigned int, unsigned int > checkChildProcesses()
Check if one of the started processes has died.
bool processEvent(PathIterator moduleIter, bool firstRound)
Process a single event by iterating through the module path once.
void runWorkers(PathPtr path, unsigned int numProcesses)
Fork out as many workers as requested and run the given path in each of them using processCore.
void processCore(PathPtr path)
Process the path by basically calling processEvent until a termination is requested.
static void resetForChildProcess()
Reset InputController (e.g.
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:30
void resetMessageCounter()
Resets the message counter and error log by setting all message counts to 0.
Definition: LogSystem.cc:150
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:161
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:31
@ c_Continue
After the conditional path, resume execution after this module.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_Event
Counting time/calls in event()
Base class for Modules.
Definition: Module.h:72
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:26
void next()
increment.
Definition: PathIterator.h:49
bool isDone() const
Are we finished iterating?
Definition: PathIterator.h:72
Module * get() const
dereference.
Definition: PathIterator.h:75
static void setProcessID(int processID)
Set the process ID of this process.
Definition: ProcHandler.cc:250
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
Definition: ZMQParent.cc:56
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the current processes PID...
Definition: ZMQParent.cc:32
void reset(bool keepEntries=false)
Invalidate all payloads.
Definition: DBStore.cc:175
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:153
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:28
static DBStore & Instance()
Instance of a singleton DBStore.
Definition: DBStore.cc:26
#define REGISTER_PYTHON_MODULE(moduleName)
Register a python module to make available when loading the library.
void updateEvent()
Updates all intra-run dependent objects.
Definition: DBStore.cc:140
Abstract base class for different kinds of events.