Belle II Software  release-06-02-00
EventProcessor.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 //first because of python include
10 #include <framework/core/Module.h>
11 
12 #include <framework/core/EventProcessor.h>
13 
14 #include <framework/core/PathIterator.h>
15 #include <framework/datastore/DataStore.h>
16 #include <framework/database/DBStore.h>
17 #include <framework/database/Database.h>
18 #include <framework/logging/Logger.h>
19 #include <framework/core/Environment.h>
20 #include <framework/core/DataFlowVisualization.h>
21 #include <framework/core/RandomNumbers.h>
22 #include <framework/core/MetadataService.h>
23 #include <framework/gearbox/Unit.h>
24 #include <framework/utilities/Utils.h>
25 
#ifdef HAS_CALLGRIND
#include <valgrind/callgrind.h>
/**
 * Call method `x` on `module`.
 *
 * When built with callgrind support, the call is bracketed by
 * CALLGRIND_START_INSTRUMENTATION / CALLGRIND_STOP_INSTRUMENTATION if `module`
 * is the module selected for profiling (m_profileModule) and we are running
 * under valgrind; otherwise the method is called directly.
 *
 * The body is wrapped in do { } while (0) so that `CALL_MODULE(m, f);` is a
 * single statement (safe inside an unbraced if/else), and the `module`
 * argument is parenthesized so that arbitrary pointer expressions can be
 * passed. Note that `module` is evaluated more than once in this variant.
 */
#define CALL_MODULE(module,x) \
  do {\
    if (m_profileModule && m_profileModule == (module) && RUNNING_ON_VALGRIND) {\
      CALLGRIND_START_INSTRUMENTATION;\
      (module)->x();\
      CALLGRIND_STOP_INSTRUMENTATION;\
    } else {\
      (module)->x();\
    }\
  } while (0)
#else
/** Call method `x` on `module` (no profiling support compiled in). */
#define CALL_MODULE(module, x) (module)->x()
#endif
45 
46 #include <TROOT.h>
47 
48 #include <csignal>
49 #include <unistd.h>
50 #include <cstring>
51 
52 using namespace std;
53 using namespace Belle2;
54 
55 namespace {
56  static int gSignalReceived = 0;
57  static void signalHandler(int signal)
58  {
59  gSignalReceived = signal;
60 
61  if (signal == SIGINT) {
62  EventProcessor::writeToStdErr("Received Ctrl+C, basf2 will exit safely. (Press Ctrl+\\ (SIGQUIT) to abort immediately - this will break output files.)\n");
63  }
64  }
65 }
66 EventProcessor::StoppedBySignalException::StoppedBySignalException(int signal_):
67  runtime_error("Execution stopped by signal " + to_string(signal_) + "!"),
68  signal(signal_)
69 {
70 }
71 
72 void EventProcessor::writeToStdErr(const char msg[])
73 {
74  //signal handlers are called asynchronously, making many standard functions (including output) dangerous
75  //write() is, however, safe, so we'll use that to write to stderr.
76 
77  //strlen() not explicitly in safe list, but doesn't have any error handling routines that might alter global state
78  const int len = strlen(msg);
79 
80  int rc = write(STDERR_FILENO, msg, len);
81  (void) rc; //ignore return value (there's nothing we can do about a failed write)
82 
83 }
84 
// Body of the EventProcessor constructor; it contains no statements.
// NOTE(review): the constructor signature and any member initializer list are
// not visible in this listing - confirm against the full source.
87 {
88 
89 }
90 
92 
93 namespace {
// Scope guard instantiated in EventProcessor::process() with the effective event limit.
// NOTE(review): the constructor and destructor bodies are not visible in this
// listing. Presumably the constructor stores the current NumberEventsOverride
// in m_maxEvent and applies newValue, and the destructor restores the stored
// value - confirm against the full source.
96  struct NumberEventsOverrideGuard {
// explicit: prevents implicit conversion from unsigned int to a guard object
98  explicit NumberEventsOverrideGuard(unsigned int newValue)
99  {
102  }
104  ~NumberEventsOverrideGuard()
105  {
107  }
// presumably the override value that was active before the guard was created - TODO confirm
109  unsigned int m_maxEvent;
110  };
111 }
112 
113 long EventProcessor::getMaximumEventNumber(long maxEvent) const
114 {
115  //Check whether the number of events was set via command line argument
116  unsigned int numEventsArgument = Environment::Instance().getNumberEventsOverride();
117  if ((numEventsArgument > 0) && ((maxEvent == 0) || (maxEvent > numEventsArgument))) {
118  return numEventsArgument;
119  }
120  return maxEvent;
121 }
122 
/**
 * Processes the full module chain, starting with the first module in the given path.
 *
 * Visible steps: resolve the effective event limit with getMaximumEventNumber(),
 * build the module list from startPath, look up the module to profile (if
 * m_profileModuleName is set), call processInitialize(), optionally write
 * "dataflow.dot", call processCore() and finally processTerminate().
 *
 * @param startPath path whose modules are processed.
 * @param maxEvent  requested event limit; replaced by the result of getMaximumEventNumber().
 *
 * NOTE(review): some lines of this function are not visible in this listing
 * (e.g. the declaration of numLogError) - confirm against the full source.
 */
123 void EventProcessor::process(const PathPtr& startPath, long maxEvent)
124 {
125  maxEvent = getMaximumEventNumber(maxEvent);
126  // Make sure the NumberEventsOverride reflects the actual number if
127  // process(path, N) was used instead of -n and that it's reset to what it was
128  // after we're done with processing()
129  NumberEventsOverrideGuard numberOfEventsOverrideGuard(maxEvent);
130 
131  //Get list of modules which could be executed during the data processing.
132  ModulePtrList moduleList = startPath->buildModulePathList();
133 
134  //Find the address of the module we want to profile
135  if (!m_profileModuleName.empty()) {
136  for (const auto& module : moduleList) {
137  if (module->getName() == m_profileModuleName) {
138  m_profileModule = module.get();
139  break;
140  }
141  }
// a profiling request that matches no module in the path is fatal
142  if (!m_profileModule)
143  B2FATAL("Module profiling was requested via --profile, but no module '" << m_profileModuleName << "' was found!");
144  }
145 
146  //Initialize modules
147  processInitialize(moduleList);
148 
149  //do we want to visualize DataStore input/output?
150  if (Environment::Instance().getVisualizeDataFlow()) {
151  DataFlowVisualization v(&DataStore::Instance().getDependencyMap());
152  v.visualizePath("dataflow.dot", *startPath);
153  }
154 
155  //Don't start processing in case of no master module
156  if (!m_master) {
157  B2ERROR("There is no module that provides event and run numbers (EventMetaData). You must add either the EventInfoSetter or an input module (e.g. RootInput) to the beginning of your path.");
158  }
159 
160  //Check if errors appeared. If yes, don't start the event processing.
// NOTE(review): numLogError is declared on a line not visible here; presumably the number of logged errors - confirm
162  if ((numLogError == 0) && m_master) {
164  try {
165  processCore(startPath, moduleList, maxEvent); //Do the event processing
166  } catch (StoppedBySignalException& e) {
// any signal other than SIGINT aborts via B2FATAL
167  if (e.signal != SIGINT) {
168  // close all open ROOT files, ROOT's exit handler will crash otherwise
169  gROOT->GetListOfFiles()->Delete();
170 
171  B2FATAL(e.what());
172  }
173  //in case of SIGINT, we move on to processTerminate() to shut down safely
174  } catch (...) {
// report the event being processed (if available), then re-throw the exception unchanged
175  if (m_eventMetaDataPtr)
176  B2ERROR("Exception occured in exp/run/evt: "
177  << m_eventMetaDataPtr->getExperiment() << " / "
178  << m_eventMetaDataPtr->getRun() << " / "
179  << m_eventMetaDataPtr->getEvent());
180  throw;
181  }
182 
183  } else {
184  B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
185  }
186 
187  //Terminate modules
188  processTerminate(moduleList);
189 
191 
// After a SIGINT: log an error (with the last exp/run/event if available),
// then restore the default SIGINT action and re-raise the signal.
192  if (gSignalReceived == SIGINT) {
193  const auto msg = R"(Processing aborted via SIGINT, terminating.
194  Output files have been closed safely and should be readable. However
195  processing was NOT COMPLETE. The output files do contain only events
196  processed until this point.)";
197  if (m_eventMetaDataPtr)
198  B2ERROR(msg
199  << LogVar("last_experiment", m_eventMetaDataPtr->getExperiment())
200  << LogVar("last_run", m_eventMetaDataPtr->getRun())
201  << LogVar("last_event", m_eventMetaDataPtr->getEvent()));
202  else
203  B2ERROR(msg);
204  installSignalHandler(SIGINT, SIG_DFL);
205  raise(SIGINT);
206  }
207 }
208 
209 
210 //============================================================================
211 // Protected methods
212 //============================================================================
213 
// Body of EventProcessor::callEvent(Module* module):
// calls event() on one single module, setting up logging and statistics as needed.
// NOTE(review): the signature line is not visible in this listing - confirm against the full source.
215 {
216  LogSystem& logSystem = LogSystem::Instance();
// statistics are collected unless disabled via Environment::getNoStats()
217  const bool collectStats = !Environment::Instance().getNoStats();
218  // set up logging
219  logSystem.updateModule(&(module->getLogConfig()), module->getName());
220  // set up statistics if requested
221  if (collectStats) m_processStatisticsPtr->startModule();
222  // call module
223  CALL_MODULE(module, event);
224  // stop timing
225  if (collectStats) m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_Event);
226  // reset logging
227  logSystem.updateModule(nullptr);
228 };
229 
/**
 * Initializes the modules.
 *
 * Calls initialize() on every module of the list (modules reporting unset
 * forced parameters are skipped) and records the first module after whose
 * initialization an EventMetaData entry exists in the DataStore as m_master.
 *
 * @param modulePathList modules to initialize, in order.
 * @param setEventInfo   if true, callEvent() is invoked on the master module as soon as it is found.
 * @throws StoppedBySignalException if a signal was recorded in gSignalReceived.
 *
 * NOTE(review): some lines of this function are not visible in this listing - confirm against the full source.
 */
230 void EventProcessor::processInitialize(const ModulePtrList& modulePathList, bool setEventInfo)
231 {
232  LogSystem& logSystem = LogSystem::Instance();
233  auto dbsession = Database::Instance().createScopedUpdateSession();
234 
235  m_processStatisticsPtr.registerInDataStore();
236  //TODO I might want to overwrite it in initialize (e.g. if read from file)
237  // For parallel processing or subevents, I don't want that, though.
238  // Maybe make this a function argument?
240  m_processStatisticsPtr.create();
241  m_processStatisticsPtr->startGlobal();
242 
243  MetadataService::Instance().addBasf2Status("initializing");
244 
245  for (const ModulePtr& modPtr : modulePathList) {
246  Module* module = modPtr.get();
247 
248  if (module->hasUnsetForcedParams()) {
249  //error message was printed by module
250  continue;
251  }
252 
253  //Set the module dependent log level
254  logSystem.updateModule(&(module->getLogConfig()), module->getName());
256 
257  //Do initialization
258  m_processStatisticsPtr->initModule(module);
259  m_processStatisticsPtr->startModule();
260  CALL_MODULE(module, initialize);
262 
263  //Set the global log level
264  logSystem.updateModule(nullptr);
265 
266  //Check whether this is the master module
// only the first module for which the EventMetaData entry exists is recorded
267  if (!m_master && DataStore::Instance().getEntry(m_eventMetaDataPtr) != nullptr) {
268  B2DEBUG(100, "Found module providing EventMetaData: " << module->getName());
269  m_master = module;
270  if (setEventInfo) {
271  callEvent(module);
272  // update Database payloads: we now have valid event meta data unless
273  // we don't process any events
275  }
276  }
277 
// stop initializing as soon as a signal has been recorded by the handler
278  if (gSignalReceived != 0) {
279  throw StoppedBySignalException(gSignalReceived);
280  }
281  }
283 }
284 
285 void EventProcessor::installSignalHandler(int sig, void (*fn)(int))
286 {
287  struct sigaction s;
288  memset(&s, '\0', sizeof(s));
289 
290  s.sa_handler = fn;
291  sigemptyset(&s.sa_mask);
292  if (sig == SIGCHLD)
293  s.sa_flags |= SA_NOCLDSTOP; //don't produce signal when children are stopped
294 
295  if (sigaction(sig, &s, nullptr) != 0) {
296  B2FATAL("Cannot setup signal handler for signal " << sig);
297  }
298 }
299 
// Body of EventProcessor::installMainSignalHandlers(void (*fn)(int)):
// install signal handler for INT, TERM and QUIT signals.
// NOTE(review): the signature line is not visible in this listing - confirm against the full source.
301 {
// fall back to this file's signalHandler when no handler was passed
302  if (!fn)
303  fn = signalHandler;
304  installSignalHandler(SIGINT, fn);
305  installSignalHandler(SIGTERM, fn);
306  installSignalHandler(SIGQUIT, fn);
307 }
308 
/**
 * Calls event() functions on all modules for the current event.
 *
 * Walks the path with moduleIter, calling callEvent() on each module and
 * following conditional paths when a module's condition evaluates to true.
 *
 * @param moduleIter       iterator positioned at the first module to execute.
 * @param skipMasterModule if true, event() is not called on m_master.
 * @return true if the end of data was detected, false once the iterator is done.
 * @throws StoppedBySignalException if a signal was recorded in gSignalReceived.
 *
 * NOTE(review): several lines of this function are not visible in this listing
 * (the braces shown do not balance) - confirm against the full source.
 */
309 bool EventProcessor::processEvent(PathIterator moduleIter, bool skipMasterModule)
310 {
311  double time = Utils::getClock() / Unit::s;
// NOTE(review): the condition guarding this status update is not visible;
// presumably it compares time with m_lastMetadataUpdate + m_metadataUpdateInterval - confirm
313  MetadataService::Instance().addBasf2Status("running event loop");
314  m_lastMetadataUpdate = time;
315  }
316 
317  const bool collectStats = !Environment::Instance().getNoStats();
318 
319  while (!moduleIter.isDone()) {
320  Module* module = moduleIter.get();
321 
322  // run the module ... unless we don't want to
323  if (!(skipMasterModule && module == m_master)) {
324  callEvent(module);
325  }
326 
327  //Check for end of data
// end of data: EventMetaData flagged as end-of-data, or the master module left no EventMetaData
328  if ((m_eventMetaDataPtr && (m_eventMetaDataPtr->isEndOfData())) ||
329  ((module == m_master) && !m_eventMetaDataPtr)) {
330  if (module != m_master) {
331  B2WARNING("Event processing stopped by module '" << module->getName() <<
332  "', which is not in control of event processing (does not provide EventMetaData)");
333  }
334  return true;
335  }
336 
337  //Handle EventMetaData changes by master module
338  if (module == m_master) {
339 
340  //initialize random number state for the event
342 
343  //Check for a change of the run
344  if ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) ||
346 
// global statistics are suspended while the endRun/beginRun transition runs
347  if (collectStats)
348  m_processStatisticsPtr->suspendGlobal();
349 
350  processEndRun();
351  processBeginRun(skipMasterModule);
352 
353  if (collectStats)
354  m_processStatisticsPtr->resumeGlobal();
355  }
356 
358 
359  //make sure we use the event dependent generator again
361 
363 
364  } else {
365  //Check for a second master module. Cannot do this if we skipped the
366  //master module as the EventMetaData is probably set before we call this
367  //function
368  if (!skipMasterModule && m_eventMetaDataPtr &&
370  B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and " << module->getName());
371  }
372  }
373 
// abort the event as soon as a signal has been recorded by the handler
374  if (gSignalReceived != 0) {
375  throw StoppedBySignalException(gSignalReceived);
376  }
377 
378  //Check for the module conditions, evaluate them and if one is true switch to the new path
379  if (module->evalCondition()) {
380  PathPtr condPath = module->getConditionPath();
381  //continue with parent Path after condition path is executed?
382  if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
383  moduleIter = PathIterator(condPath, moduleIter);
384  } else {
385  moduleIter = PathIterator(condPath);
386  }
387  } else {
388  moduleIter.next();
389  }
390  } //end module loop
391  return false;
392 }
393 
/**
 * Runs the event loop over the module chain starting at startPath.
 *
 * Calls processEvent() repeatedly until it reports the end of data or
 * maxEvent events have been processed, then ends the last run via processEndRun().
 *
 * @param startPath      path at which every event starts.
 * @param modulePathList modules stored in m_moduleList.
 * @param maxEvent       event limit; values <= 0 do not limit the loop.
 * @param isInputProcess if true, the master module is skipped for the first event
 *                       (passed to processEvent() as skipMasterModule).
 *
 * NOTE(review): some lines of this function are not visible in this listing - confirm against the full source.
 */
394 void EventProcessor::processCore(const PathPtr& startPath, const ModulePtrList& modulePathList, long maxEvent, bool isInputProcess)
395 {
397  m_moduleList = modulePathList;
398  //Remember the previous event meta data, and identify end of data meta data
399  m_previousEventMetaData.setEndOfData(); //invalid start state
400 
401  const bool collectStats = !Environment::Instance().getNoStats();
402 
403  //Loop over the events
404  long currEvent = 0;
405  bool endProcess = false;
406  while (!endProcess) {
407  if (collectStats)
408  m_processStatisticsPtr->startGlobal();
409 
// every event starts again at the beginning of startPath
410  PathIterator moduleIter(startPath);
411  endProcess = processEvent(moduleIter, isInputProcess && currEvent == 0);
412 
413  //Delete event related data in DataStore
415 
416  currEvent++;
417  if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess = true;
// NOTE(review): the statement controlled by this if is not visible in this listing
418  if (collectStats)
420  } //end event loop
421 
422  //End last run
423  m_eventMetaDataPtr.create();
424  processEndRun();
425 }
426 
427 
// Body of EventProcessor::processTerminate(const ModulePtrList& modulePathList):
// terminates the modules by calling terminate() on each of them, walking the list in reverse order.
// NOTE(review): the signature and some interior lines are not visible in this listing - confirm against the full source.
429 {
430  MetadataService::Instance().addBasf2Status("terminating");
431 
432  LogSystem& logSystem = LogSystem::Instance();
433  ModulePtrList::const_reverse_iterator listIter;
434  m_processStatisticsPtr->startGlobal();
435 
// reverse iteration: the last module of the list is terminated first
436  for (listIter = modulePathList.rbegin(); listIter != modulePathList.rend(); ++listIter) {
437  Module* module = listIter->get();
438 
439  //Set the module dependent log level
440  logSystem.updateModule(&(module->getLogConfig()), module->getName());
441 
442  //Do termination
443  m_processStatisticsPtr->startModule();
444  CALL_MODULE(module, terminate);
446 
447  //Set the global log level
448  logSystem.updateModule(nullptr);
449  }
450 
452 }
453 
454 
// Body of EventProcessor::processBeginRun(bool skipDB):
// calls the begin run methods of all modules in m_moduleList and marks the processor as being in a run.
// NOTE(review): the signature and some interior lines are not visible in this listing - confirm against the full source.
456 {
457  MetadataService::Instance().addBasf2Status("beginning run");
458 
459  m_inRun = true;
460  auto dbsession = Database::Instance().createScopedUpdateSession();
461 
462  LogSystem& logSystem = LogSystem::Instance();
463  m_processStatisticsPtr->startGlobal();
464 
// the DBStore update is skipped when skipDB is set
465  if (!skipDB) DBStore::Instance().update();
466 
// NOTE(review): the comment below says "end run" inside beginRun handling; the call it
// describes is not visible here, presumably "begin run" is meant - confirm
467  //initialize random generator for end run
469 
470  for (const ModulePtr& modPtr : m_moduleList) {
471  Module* module = modPtr.get();
472 
473  //Set the module dependent log level
474  logSystem.updateModule(&(module->getLogConfig()), module->getName());
475 
476  //Do beginRun() call
477  m_processStatisticsPtr->startModule();
478  CALL_MODULE(module, beginRun);
480 
481  //Set the global log level
482  logSystem.updateModule(nullptr);
483  }
484 
486 }
487 
488 
// Body of EventProcessor::processEndRun():
// calls the end run methods of all modules in m_moduleList; does nothing if no run is active.
// NOTE(review): the signature and some interior lines are not visible in this listing - confirm against the full source.
490 {
492 
// nothing to end if we are not currently in a run
493  if (!m_inRun)
494  return;
495  m_inRun = false;
496 
497  LogSystem& logSystem = LogSystem::Instance();
498  m_processStatisticsPtr->startGlobal();
499 
// keep a copy of the current EventMetaData; it is written back after the endRun() calls
500  const EventMetaData newEventMetaData = *m_eventMetaDataPtr;
502 
503  //initialize random generator for end run
505 
506  for (const ModulePtr& modPtr : m_moduleList) {
507  Module* module = modPtr.get();
508 
509  //Set the module dependent log level
510  logSystem.updateModule(&(module->getLogConfig()), module->getName());
511 
512  //Do endRun() call
513  m_processStatisticsPtr->startModule();
514  CALL_MODULE(module, endRun);
516 
517  //Set the global log level
518  logSystem.updateModule(nullptr);
519  }
// restore the EventMetaData saved before the loop
520  *m_eventMetaDataPtr = newEventMetaData;
521 
523 }
class to visualize data flow between modules.
In the store you can park objects that have to be accessed by various modules.
Definition: DataStore.h:51
DependencyMap & getDependencyMap()
Return map of dependencies between modules.
Definition: DataStore.h:517
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:52
void setInitializeActive(bool active)
Setter for m_initializeActive.
Definition: DataStore.cc:92
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
Definition: DataStore.cc:686
void setModule(const Module &mod)
Set the current module (for getCurrentModuleInfo())
Definition: DependencyMap.h:60
void setNumberEventsOverride(unsigned int nevents)
Override the number of events in run 1 for EventInfoSetter module.
Definition: Environment.h:63
bool getNoStats() const
Disable collection of statistics during event processing.
Definition: Environment.h:181
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:29
unsigned int getNumberEventsOverride() const
Returns number of events in run 1 for EventInfoSetter module, or 0 for no override.
Definition: Environment.h:66
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
void setEndOfData()
Marks the end of the data processing.
int getRun() const
Run Getter.
int getExperiment() const
Experiment Getter.
Exception thrown when execution is stopped by a signal.
void processEndRun()
Calls the end run methods of all modules.
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
bool m_inRun
Are we currently in a run? If yes, processEndRun() needs to do something.
std::string m_profileModuleName
Name of the module which should be profiled, empty if no profiling is requested.
void processBeginRun(bool skipDB=false)
Calls the begin run methods of all modules.
void processCore(const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
Processes the full module chain consisting of an arbitrary number of connected paths,...
double m_lastMetadataUpdate
Time in seconds of last call for metadata update in event loop.
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
void callEvent(Module *module)
Calls event() on one single module, setting up logging and statistics as needed.
void processTerminate(const ModulePtrList &modulePathList)
Terminates the modules.
Module * m_profileModule
Address of the module which we want to profile, nullptr if no profiling is requested.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
bool processEvent(PathIterator moduleIter, bool skipMasterModule)
Calls event() functions on all modules for the current event.
virtual ~EventProcessor()
Destructor.
const Module * m_master
The master module that determines the experiment/run/event number.
EventProcessor()
Constructor.
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
EventMetaData is used by processEvent()/processCore().
ModulePtrList m_moduleList
List of all modules in order initialized.
long getMaximumEventNumber(long maxEvent) const
Calculate the maximum event number out of the argument from command line and the environment.
EventMetaData m_previousEventMetaData
Stores state of EventMetaData before it was last changed.
double m_metadataUpdateInterval
Minimal time difference in seconds for metadata updates in event loop.
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:30
Class for logging debug, info and error messages.
Definition: LogSystem.h:46
void updateModule(const LogConfig *moduleLogConfig=nullptr, const std::string &moduleName="")
Sets the log configuration to the given module log configuration and sets the module name This method...
Definition: LogSystem.h:191
void printErrorSummary()
Print error/warning summary at end of execution.
Definition: LogSystem.cc:206
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:161
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:31
void addBasf2Status(const std::string &message="")
Add metadata of basf2 status.
static MetadataService & Instance()
Static method to get a reference to the MetadataService instance.
@ c_Continue
After the conditional path, resume execution after this module.
@ c_Init
Counting time/calls in initialize()
@ c_EndRun
Counting time/calls in endRun()
@ c_Term
Counting time/calls in terminate()
@ c_BeginRun
Counting time/calls in beginRun()
@ c_Event
Counting time/calls in event()
Base class for Modules.
Definition: Module.h:72
const std::string & getName() const
Returns the name of the module.
Definition: Module.h:187
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:26
void next()
increment.
Definition: PathIterator.h:49
bool isDone() const
Are we finished iterating?
Definition: PathIterator.h:72
Module * get() const
dereference.
Definition: PathIterator.h:75
static void initializeEndRun()
Initialize run independent random generator for end run.
static void initializeBeginRun()
Initialize run independent random generator for begin run.
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static const double s
[second]
Definition: Unit.h:95
Class to store variables with their name which were sent to the logging service.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition: Path.h:28
static Database & Instance()
Instance of a singleton Database.
Definition: Database.cc:41
static DBStore & Instance()
Instance of a singleton DBStore.
Definition: DBStore.cc:26
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a boost shared pointer.
Definition: Module.h:40
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:584
void updateEvent()
Updates all intra-run dependent objects.
Definition: DBStore.cc:140
void update()
Updates all objects that are outside their interval of validity.
Definition: DBStore.cc:77
ScopeGuard createScopedUpdateSession()
Make sure we have efficient http pipelining during initialize/beginRun but don't keep session alive ...
Definition: Database.cc:61
double getClock()
Return current value of the real-time clock.
Definition: Utils.cc:65
Abstract base class for different kinds of events.