Belle II Software development
EventProcessor.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9//first because of python include
10#include <framework/core/Module.h>
11
12#include <framework/core/EventProcessor.h>
13
14#include <framework/core/PathIterator.h>
15#include <framework/datastore/DataStore.h>
16#include <framework/database/DBStore.h>
17#include <framework/database/Database.h>
18#include <framework/logging/Logger.h>
19#include <framework/core/Environment.h>
20#include <framework/core/DataFlowVisualization.h>
21#include <framework/core/RandomNumbers.h>
22#include <framework/core/MetadataService.h>
23#include <framework/gearbox/Unit.h>
24#include <framework/utilities/Utils.h>
25
26#ifdef HAS_CALLGRIND
27#include <valgrind/callgrind.h>
// Call the member function `x` of `module`. If `module` is the module selected
// for profiling (m_profileModule) and we are running under valgrind, callgrind
// instrumentation is switched on just for the duration of that call.
//
// The body is wrapped in do { } while (0) so that `CALL_MODULE(m, f);` is
// exactly one statement and can safely be used in unbraced if/else branches;
// `module` is parenthesised so that arbitrary pointer expressions can be passed.
#define CALL_MODULE(module, x) \
  do { \
    if (m_profileModule && m_profileModule == (module) && RUNNING_ON_VALGRIND) { \
      CALLGRIND_START_INSTRUMENTATION; \
      (module)->x(); \
      CALLGRIND_STOP_INSTRUMENTATION; \
    } else { \
      (module)->x(); \
    } \
  } while (0)
42#else
43#define CALL_MODULE(module, x) module->x()
44#endif
45
46#include <TROOT.h>
47
48#include <csignal>
49#include <unistd.h>
50#include <cstring>
51
52using namespace std;
53using namespace Belle2;
54
55namespace {
56 static int gSignalReceived = 0;
57 static void signalHandler(int signal)
58 {
59 gSignalReceived = signal;
60
61 if (signal == SIGINT) {
62 EventProcessor::writeToStdErr("Received Ctrl+C, basf2 will exit safely. (Press Ctrl+\\ (SIGQUIT) to abort immediately - this will break output files.)\n");
63 }
64 }
65}
67 runtime_error("Execution stopped by signal " + to_string(signal_) + "!"),
68 signal(signal_)
69{
70}
71
72void EventProcessor::writeToStdErr(const char msg[])
73{
74 //signal handlers are called asynchronously, making many standard functions (including output) dangerous
75 //write() is, however, safe, so we'll use that to write to stderr.
76
77 //strlen() not explicitly in safe list, but doesn't have any error handling routines that might alter global state
78 const int len = strlen(msg);
79
80 int rc = write(STDERR_FILENO, msg, len);
81 (void) rc; //ignore return value (there's nothing we can do about a failed write)
82
83}
84
87{
88
89}
90
92
93namespace {
// Scope guard used by EventProcessor::process(): according to the comment at
// its point of use it makes the NumberEventsOverride reflect the actual number
// of events while processing runs and resets it afterwards.
// NOTE(review): the constructor/destructor bodies are empty in this listing;
// confirm against the repository that they set and restore the override.
96 struct NumberEventsOverrideGuard {
// newValue: number of events to be in effect while the guard exists
// (process() passes its effective maxEvent).
98 explicit NumberEventsOverrideGuard(unsigned int newValue)
99 {
102 }
// Runs when the guard goes out of scope, i.e. when process() returns or throws.
104 ~NumberEventsOverrideGuard()
105 {
107 }
// NOTE(review): presumably the override value that was active before the
// guard was created, kept so the destructor can restore it - confirm.
109 unsigned int m_maxEvent;
110 };
111}
112
114{
115 //Check whether the number of events was set via command line argument
116 unsigned int numEventsArgument = Environment::Instance().getNumberEventsOverride();
117 if ((numEventsArgument > 0) && ((maxEvent == 0) || (maxEvent > numEventsArgument))) {
118 return numEventsArgument;
119 }
120 return maxEvent;
121}
122
/**
 * Run the complete processing chain for all modules reachable from startPath:
 * initialize the modules, run the event loop (processCore()) and terminate the
 * modules.
 *
 * The event loop is only entered if numLogError is zero and a master module
 * (m_master) was found during initialization; otherwise B2FATAL is issued.
 *
 * Signal handling: a StoppedBySignalException for any signal other than SIGINT
 * deletes ROOT's list of open files and ends in B2FATAL. For SIGINT the modules
 * are still terminated, an error message is logged and SIGINT is raised again
 * with the default handler installed.
 *
 * @param startPath path whose module list is obtained via buildModulePathList().
 * @param maxEvent  requested maximum number of events; it is passed through
 *                  getMaximumEventNumber() before use.
 */
123void EventProcessor::process(const PathPtr& startPath, long maxEvent)
124{
125 maxEvent = getMaximumEventNumber(maxEvent);
126 // Make sure the NumberEventsOverride reflects the actual number if
127 // process(path, N) was used instead of -n and that it's reset to what it was
128 // after we're done with processing()
129 NumberEventsOverrideGuard numberOfEventsOverrideGuard(maxEvent);
130
131 //Get list of modules which could be executed during the data processing.
132 ModulePtrList moduleList = startPath->buildModulePathList();
133
134 //Find the address of the module we want to profile
135 if (!m_profileModuleName.empty()) {
136 for (const auto& module : moduleList) {
137 if (module->getName() == m_profileModuleName) {
138 m_profileModule = module.get();
139 break;
140 }
141 }
142 if (!m_profileModule)
143 B2FATAL("Module profiling was requested via --profile, but no module '" << m_profileModuleName << "' was found!");
144 }
145
146 //Initialize modules
147 processInitialize(moduleList);
148
149 // SteerRootInputModule might have changed the number of events to be processed
150 for (const auto& module : moduleList) {
151 if (module->getType() == "SteerRootInput") {
152 if (maxEvent != Environment::Instance().getNumberEventsOverride()) {
153 B2INFO("Module 'SteerRootInputModule' is controlling the number of processed events.");
156 }
157 break;
158 }
159 }
160
161 //do we want to visualize DataStore input/output?
162 if (Environment::Instance().getVisualizeDataFlow()) {
163 DataFlowVisualization v(&DataStore::Instance().getDependencyMap());
164 v.visualizePath("dataflow.dot", *startPath);
165 }
166
167 //Don't start processing in case of no master module
168 if (!m_master) {
169 B2ERROR("There is no module that provides event and run numbers (EventMetaData). You must add either the EventInfoSetter or an input module (e.g. RootInput) to the beginning of your path.");
170 }
171
172 //Check if errors appeared. If yes, don't start the event processing.
// NOTE(review): the declaration of numLogError is not part of the visible lines.
174 if ((numLogError == 0) && m_master) {
176 try {
177 processCore(startPath, moduleList, maxEvent); //Do the event processing
178 } catch (StoppedBySignalException& e) {
179 if (e.signal != SIGINT) {
180 // close all open ROOT files, ROOT's exit handler will crash otherwise
181 gROOT->GetListOfFiles()->Delete();
182
183 B2FATAL(e.what());
184 }
185 //in case of SIGINT, we move on to processTerminate() to shut down safely
186 } catch (...) {
// any other exception: log experiment/run/event for context, then rethrow unchanged
188 B2ERROR("Exception occurred in exp/run/evt: "
189 << m_eventMetaDataPtr->getExperiment() << " / "
190 << m_eventMetaDataPtr->getRun() << " / "
191 << m_eventMetaDataPtr->getEvent());
192 throw;
193 }
194
195 } else {
196 B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
197 }
198
199 //Terminate modules
200 processTerminate(moduleList);
201
203
// After a SIGINT: report the abort, then restore the default SIGINT handler and
// raise the signal again.
204 if (gSignalReceived == SIGINT) {
205 const auto msg = R"(Processing aborted via SIGINT, terminating.
206 Output files have been closed safely and should be readable. However
207 processing was NOT COMPLETE. The output files do contain only events
208 processed until this point.)";
210 B2ERROR(msg
211 << LogVar("last_experiment", m_eventMetaDataPtr->getExperiment())
212 << LogVar("last_run", m_eventMetaDataPtr->getRun())
213 << LogVar("last_event", m_eventMetaDataPtr->getEvent()));
214 else
215 B2ERROR(msg);
216 installSignalHandler(SIGINT, SIG_DFL);
217 raise(SIGINT);
218 }
219}
220
221
222//============================================================================
223// Protected methods
224//============================================================================
225
227{
228 LogSystem& logSystem = LogSystem::Instance();
229 const bool collectStats = !Environment::Instance().getNoStats();
230 // set up logging
231 logSystem.updateModule(&(module->getLogConfig()), module->getName());
232 // set up statistics if requested
233 if (collectStats) m_processStatisticsPtr->startModule();
234 // call module
235 CALL_MODULE(module, event);
236 // stop timing
237 if (collectStats) m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_Event);
238 // reset logging
239 logSystem.updateModule(nullptr);
240};
241
/**
 * Call initialize() on every module of modulePathList, in list order.
 *
 * Registers and creates the process statistics object and registers
 * EventExtraInfo before the first module is initialized. Modules for which
 * hasUnsetForcedParams() is true are skipped. The first module after whose
 * initialize() an EventMetaData entry exists in the DataStore becomes the
 * master module (m_master).
 *
 * @param modulePathList modules to initialize.
 * @param setEventInfo   if true, event() of the master module is called
 *                       right after it has been identified.
 * @throws StoppedBySignalException if a signal was recorded in gSignalReceived
 *         by the time a module has been handled.
 */
242void EventProcessor::processInitialize(const ModulePtrList& modulePathList, bool setEventInfo)
243{
244 LogSystem& logSystem = LogSystem::Instance();
246
247 m_processStatisticsPtr.registerInDataStore();
248 //TODO I might want to overwrite it in initialize (e.g. if read from file)
249 // For parallel processing or subevents, I don't want that, though.
250 // Maybe make this a function argument?
252 m_processStatisticsPtr.create();
253 m_processStatisticsPtr->startGlobal();
254
256
257 // EventExtraInfo is needed in several modules so register it here
258 m_eventExtraInfo.registerInDataStore();
259
260 for (const ModulePtr& modPtr : modulePathList) {
261 Module* module = modPtr.get();
262
263 if (module->hasUnsetForcedParams()) {
264 //error message was printed by module
265 continue;
266 }
267
268 //Set the module dependent log level
269 logSystem.updateModule(&(module->getLogConfig()), module->getName());
271
272 //Do initialization
273 m_processStatisticsPtr->initModule(module);
274 m_processStatisticsPtr->startModule();
275 CALL_MODULE(module, initialize);
277
278 //Set the global log level
279 logSystem.updateModule(nullptr);
280
281 //Check whether this is the master module
// (only the first module that leaves an EventMetaData entry in the DataStore is taken)
282 if (!m_master && DataStore::Instance().getEntry(m_eventMetaDataPtr) != nullptr) {
283 B2DEBUG(100, "Found module providing EventMetaData: " << module->getName());
284 m_master = module;
285 if (setEventInfo) {
286 callEvent(module);
287 // update Database payloads: we now have valid event meta data unless
288 // we don't process any events
290 }
291 }
292
// abort initialization as soon as a signal has been recorded by the handler
293 if (gSignalReceived != 0) {
294 throw StoppedBySignalException(gSignalReceived);
295 }
296 }
298}
299
300void EventProcessor::installSignalHandler(int sig, void (*fn)(int))
301{
302 struct sigaction s;
303 memset(&s, '\0', sizeof(s));
304
305 s.sa_handler = fn;
306 sigemptyset(&s.sa_mask);
307 if (sig == SIGCHLD)
308 s.sa_flags |= SA_NOCLDSTOP; //don't produce signal when children are stopped
309
310 if (sigaction(sig, &s, nullptr) != 0) {
311 B2FATAL("Cannot setup signal handler for signal " << sig);
312 }
313}
314
316{
317 if (!fn)
318 fn = signalHandler;
319 installSignalHandler(SIGINT, fn);
320 installSignalHandler(SIGTERM, fn);
321 installSignalHandler(SIGQUIT, fn);
322}
323
/**
 * Call event() on the modules along the path for a single event.
 *
 * After each module the end-of-data state is checked, run changes announced
 * by the master module lead to processBeginRun(), and the module's condition
 * is evaluated to decide whether iteration switches to a condition path.
 *
 * @param moduleIter       iterator positioned at the first module to run.
 * @param skipMasterModule if true, event() of the master module is not called
 *                         and the check for a second master module is skipped.
 * @return true if processing should stop (EventMetaData reports end of data,
 *         or the master module left no EventMetaData); false if the end of
 *         the path was reached normally.
 * @throws StoppedBySignalException if a signal was recorded in gSignalReceived.
 */
324bool EventProcessor::processEvent(PathIterator moduleIter, bool skipMasterModule)
325{
326 double time = Utils::getClock() / Unit::s;
328 MetadataService::Instance().addBasf2Status("running event loop");
330 }
331
332 const bool collectStats = !Environment::Instance().getNoStats();
333
334 while (!moduleIter.isDone()) {
335 Module* module = moduleIter.get();
336
337 // run the module ... unless we don't want to
338 if (!(skipMasterModule && module == m_master)) {
339 callEvent(module);
340 }
341
342 //Check for end of data
343 if ((m_eventMetaDataPtr && (m_eventMetaDataPtr->isEndOfData())) ||
344 ((module == m_master) && !m_eventMetaDataPtr)) {
// warn if the stop was not caused by the master module (unless SteerRootInput is in charge)
345 if ((module != m_master) && !m_steerRootInputModuleOn) {
346 B2WARNING("Event processing stopped by module '" << module->getName() <<
347 "', which is not in control of event processing (does not provide EventMetaData)");
348 }
349 return true;
350 }
351
352 //Handle EventMetaData changes by master module
353 if (module == m_master) {
354
355 //initialize random number state for the event
357
358 //Check for a change of the run
359 if ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) ||
361
// global statistics are suspended while the begin-run processing is executed
362 if (collectStats)
363 m_processStatisticsPtr->suspendGlobal();
364
366 processBeginRun(skipMasterModule);
367
368 if (collectStats)
369 m_processStatisticsPtr->resumeGlobal();
370 }
371
373
374 //make sure we use the event dependent generator again
376
378
379 } else {
380 //Check for a second master module. Cannot do this if we skipped the
381 //master module as the EventMetaData is probably set before we call this
382 //function
383 if (!skipMasterModule && m_eventMetaDataPtr &&
385 B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and " << module->getName());
386 }
387 }
388
389 if (gSignalReceived != 0) {
390 throw StoppedBySignalException(gSignalReceived);
391 }
392
393 //Check for the module conditions, evaluate them and if one is true switch to the new path
394 if (module->evalCondition()) {
395 PathPtr condPath = module->getConditionPath();
396 //continue with parent Path after condition path is executed?
397 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
// the current iterator is handed over so iteration can return to the parent path
398 moduleIter = PathIterator(condPath, moduleIter);
399 } else {
400 moduleIter = PathIterator(condPath);
401 }
402 } else {
403 moduleIter.next();
404 }
405 } //end module loop
406 return false;
407}
408
/**
 * Event loop: calls processEvent() with a fresh PathIterator over startPath
 * once per event until processEvent() returns true or, if maxEvent > 0,
 * until maxEvent events have been processed.
 *
 * @param startPath      path at which every event starts.
 * @param modulePathList list of modules, stored in m_moduleList.
 * @param maxEvent       maximum number of events; values <= 0 impose no limit.
 * @param isInputProcess if true, the master module is skipped for the very
 *                       first event (currEvent == 0).
 */
409void EventProcessor::processCore(const PathPtr& startPath, const ModulePtrList& modulePathList, long maxEvent, bool isInputProcess)
410{
412 m_moduleList = modulePathList;
413 //Remember the previous event meta data, and identify end of data meta data
414 m_previousEventMetaData.setEndOfData(); //invalid start state
415
416 const bool collectStats = !Environment::Instance().getNoStats();
417
418 //Loop over the events
419 long currEvent = 0;
420 bool endProcess = false;
421 while (!endProcess) {
422 if (collectStats)
423 m_processStatisticsPtr->startGlobal();
424
// every event starts again at the beginning of startPath
425 PathIterator moduleIter(startPath);
426 endProcess = processEvent(moduleIter, isInputProcess && currEvent == 0);
427
428 //Delete event related data in DataStore
430
431 currEvent++;
432 if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess = true;
433 if (collectStats)
435 } //end event loop
436
437 //End last run
438 m_eventMetaDataPtr.create();
440}
441
442
444{
446
447 LogSystem& logSystem = LogSystem::Instance();
448 ModulePtrList::const_reverse_iterator listIter;
449 m_processStatisticsPtr->startGlobal();
450
451 for (listIter = modulePathList.rbegin(); listIter != modulePathList.rend(); ++listIter) {
452 Module* module = listIter->get();
453
454 //Set the module dependent log level
455 logSystem.updateModule(&(module->getLogConfig()), module->getName());
456
457 //Do termination
458 m_processStatisticsPtr->startModule();
459 CALL_MODULE(module, terminate);
461
462 //Set the global log level
463 logSystem.updateModule(nullptr);
464 }
465
467}
468
469
471{
472 MetadataService::Instance().addBasf2Status("beginning run");
473
474 m_inRun = true;
475 auto dbsession = Database::Instance().createScopedUpdateSession(); // cppcheck-suppress unreadVariable
476
477 LogSystem& logSystem = LogSystem::Instance();
478 m_processStatisticsPtr->startGlobal();
479
480 if (!skipDB) DBStore::Instance().update();
481
481 //initialize random generator for begin run
484
485 for (const ModulePtr& modPtr : m_moduleList) {
486 Module* module = modPtr.get();
487
488 //Set the module dependent log level
489 logSystem.updateModule(&(module->getLogConfig()), module->getName());
490
491 //Do beginRun() call
492 m_processStatisticsPtr->startModule();
493 CALL_MODULE(module, beginRun);
495
496 //Set the global log level
497 logSystem.updateModule(nullptr);
498 }
499
501}
502
503
505{
507
508 if (!m_inRun)
509 return;
510 m_inRun = false;
511
512 LogSystem& logSystem = LogSystem::Instance();
513 m_processStatisticsPtr->startGlobal();
514
515 const EventMetaData newEventMetaData = *m_eventMetaDataPtr;
517
518 //initialize random generator for end run
520
521 for (const ModulePtr& modPtr : m_moduleList) {
522 Module* module = modPtr.get();
523
524 //Set the module dependent log level
525 logSystem.updateModule(&(module->getLogConfig()), module->getName());
526
527 //Do endRun() call
528 m_processStatisticsPtr->startModule();
529 CALL_MODULE(module, endRun);
531
532 //Set the global log level
533 logSystem.updateModule(nullptr);
534 }
535 *m_eventMetaDataPtr = newEventMetaData;
536
538}
class to visualize data flow between modules.
void visualizePath(const std::string &filename, const Path &path)
Create graphs with datastore inputs/outputs of each module in path.
In the store you can park objects that have to be accessed by various modules.
Definition: DataStore.h:51
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:53
void setInitializeActive(bool active)
Setter for m_initializeActive.
Definition: DataStore.cc:93
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
Definition: DataStore.cc:714
DependencyMap & getDependencyMap()
Return map of dependencies between modules.
Definition: DataStore.h:524
void setModule(const Module &mod)
Set the current module (for getCurrentModuleInfo())
Definition: DependencyMap.h:60
void setNumberEventsOverride(unsigned int nevents)
Override the number of events in run 1 for EventInfoSetter module.
Definition: Environment.h:65
bool getNoStats() const
Disable collection of statistics during event processing.
Definition: Environment.h:200
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
unsigned int getNumberEventsOverride() const
Returns number of events in run 1 for EventInfoSetter module, or 0 for no override.
Definition: Environment.h:68
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
void setEndOfData()
Marks the end of the data processing.
int getRun() const
Run Getter.
int getExperiment() const
Experiment Getter.
Exception thrown when execution is stopped by a signal.
void processEndRun()
Calls the end run methods of all modules.
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
bool m_inRun
Are we currently in a run? If yes, processEndRun() needs to do something.
std::string m_profileModuleName
Name of the module which should be profiled, empty if no profiling is requested.
void processBeginRun(bool skipDB=false)
Calls the begin run methods of all modules.
void processCore(const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
Processes the full module chain consisting of an arbitrary number of connected paths,...
double m_lastMetadataUpdate
Time in seconds of last call for metadata update in event loop.
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
void callEvent(Module *module)
Calls event() on one single module, setting up logging and statistics as needed.
void processTerminate(const ModulePtrList &modulePathList)
Terminates the modules.
Module * m_profileModule
Address of the module which we want to profile, nullptr if no profiling is requested.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
bool processEvent(PathIterator moduleIter, bool skipMasterModule)
Calls event() functions on all modules for the current event.
virtual ~EventProcessor()
Destructor.
const Module * m_master
The master module that determines the experiment/run/event number.
EventProcessor()
Constructor.
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
EventMetaData is used by processEvent()/processCore().
ModulePtrList m_moduleList
List of all modules in order initialized.
long getMaximumEventNumber(long maxEvent) const
Calculate the maximum event number out of the argument from command line and the environment.
StoreObjPtr< EventExtraInfo > m_eventExtraInfo
event extra info object pointer
EventMetaData m_previousEventMetaData
Stores state of EventMetaData before it was last changed.
bool m_steerRootInputModuleOn
True if the SteerRootInputModule is in charge for event processing.
double m_metadataUpdateInterval
Minimal time difference in seconds for metadata updates in event loop.
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:30
Class for logging debug, info and error messages.
Definition: LogSystem.h:46
void updateModule(const LogConfig *moduleLogConfig=nullptr, const std::string &moduleName="")
Sets the log configuration to the given module log configuration and sets the module name This method...
Definition: LogSystem.h:191
void printErrorSummary()
Print error/warning summary at end of execution.
Definition: LogSystem.cc:203
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:158
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:28
void addBasf2Status(const std::string &message="")
Add metadata of basf2 status.
static MetadataService & Instance()
Static method to get a reference to the MetadataService instance.
@ c_Init
Counting time/calls in initialize()
@ c_EndRun
Counting time/calls in endRun()
@ c_Term
Counting time/calls in terminate()
@ c_BeginRun
Counting time/calls in beginRun()
@ c_Event
Counting time/calls in event()
Base class for Modules.
Definition: Module.h:72
const std::string & getName() const
Returns the name of the module.
Definition: Module.h:186
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:26
void next()
increment.
Definition: PathIterator.h:49
bool isDone() const
Are we finished iterating?
Definition: PathIterator.h:72
Module * get() const
dereference.
Definition: PathIterator.h:75
static void initializeEndRun()
Initialize run independent random generator for end run.
static void initializeBeginRun()
Initialize run independent random generator for begin run.
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static const double s
[second]
Definition: Unit.h:95
Class to store variables with their name which were sent to the logging service.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:35
static Database & Instance()
Instance of a singleton Database.
Definition: Database.cc:41
static DBStore & Instance()
Instance of a singleton DBStore.
Definition: DBStore.cc:26
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:43
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:583
void updateEvent()
Updates all intra-run dependent objects.
Definition: DBStore.cc:140
void update()
Updates all objects that are outside their interval of validity.
Definition: DBStore.cc:77
ScopeGuard createScopedUpdateSession()
Make sure we have efficient http pipelining during initialize/beginRun but don't keep session alive ...
Definition: Database.cc:61
double getClock()
Return current value of the real-time clock.
Definition: Utils.cc:66
Abstract base class for different kinds of events.
STL namespace.