Belle II Software  release-08-01-10
pEventProcessor Class Reference

This class provides the core event processing loop for parallel processing. More...

#include <pEventProcessor.h>

Inheritance diagram for pEventProcessor:
Collaboration diagram for pEventProcessor:

Public Member Functions

 pEventProcessor ()
 Constructor.
 
virtual ~pEventProcessor ()
 Destructor.
 
void process (const PathPtr &spath, long maxEvent)
 Processes the full module chain, starting with the first module in the given path. More...
 
void gotSigINT ()
 signal handler for Ctrl+C (async-safe) More...
 
void killRingBuffers ()
 signal handler (async-safe) More...
 
void cleanup ()
 clean up IPC resources (should only be called in one process).
 
void setProfileModuleName (const std::string &name)
 Set the name of the module we want to profile. More...
 

Static Public Member Functions

static void writeToStdErr (const char msg[])
 async-safe method to write something to STDERR.
 
static void installSignalHandler (int sig, void(*fn)(int))
 Install a signal handler 'fn' for given signal.
 
static void installMainSignalHandlers (void(*fn)(int)=nullptr)
 Install signal handler for INT, TERM and QUIT signals. More...
 

Protected Member Functions

void processInitialize (const ModulePtrList &modulePathList, bool setEventInfo=true)
 Initializes the modules. More...
 
void processCore (const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
 Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path. More...
 
bool processEvent (PathIterator moduleIter, bool skipMasterModule)
 Calls event() functions on all modules for the current event. More...
 
void callEvent (Module *module)
 Calls event() on one single module, setting up logging and statistics as needed. More...
 
void processTerminate (const ModulePtrList &modulePathList)
 Terminates the modules. More...
 
void processBeginRun (bool skipDB=false)
 Calls the begin run methods of all modules. More...
 
void processEndRun ()
 Calls the end run methods of all modules. More...
 
long getMaximumEventNumber (long maxEvent) const
 Calculate the maximum event number out of the argument from command line and the environment.
 

Protected Attributes

const Modulem_master
 The master module that determines the experiment/run/event number.
 
ModulePtrList m_moduleList
 List of all modules in order initialized.
 
std::string m_profileModuleName
 Name of the module which should be profiled, empty if no profiling is requested.
 
Modulem_profileModule = nullptr
 Adress of the module which we want to profile, nullptr if no profiling is requested.
 
StoreObjPtr< EventMetaDatam_eventMetaDataPtr
 EventMetaData is used by processEvent()/processCore().
 
EventMetaData m_previousEventMetaData
 Stores state of EventMetaData before it was last changed. More...
 
StoreObjPtr< ProcessStatisticsm_processStatisticsPtr
 Also used in a number of places.
 
bool m_inRun
 Are we currently in a run? If yes, processEndRun() needs to do something.
 
double m_lastMetadataUpdate
 Time in seconds of last call for metadata update in event loop.
 
double m_metadataUpdateInterval
 Minimal time difference in seconds for metadata updates in event loop.
 
bool m_steerRootInputModuleOn = false
 True if the SteerRootInputModule is in charge for event processing.
 

Private Member Functions

void analyzePath (const PathPtr &path)
 Analyze given path. More...
 
void preparePaths ()
 Adds internal modules to paths, prepare RingBuffers.
 
RingBufferconnectViaRingBuffer (const char *name, const PathPtr &a, PathPtr &b)
 Create RingBuffer with name from given environment variable, add Tx and Rx modules to a and b.
 
void dump_modules (const std::string &, const ModulePtrList &)
 Dump module names in the ModulePtrList.
 
void clearFileList ()
 TFiles are stored in a global list and cleaned up by root since this will happen in all forked processes, these will be corrupted if we don't clean the list! More...
 

Static Private Member Functions

static ModulePtrList getModulesWithFlag (const ModulePtrList &modules, Module::EModulePropFlags flag)
 Return only modules which have the given Module flag set.
 
static ModulePtrList getModulesWithoutFlag (const ModulePtrList &modules, Module::EModulePropFlags flag)
 Return only modules which do not have the given Module flag set.
 
static void prependModulesIfNotPresent (ModulePtrList *modules, const ModulePtrList &prependModules)
 Prepend given 'prependModules' to 'modules', if they're not already present.
 

Private Attributes

std::unique_ptr< ProcHandlerm_procHandler
 handler to fork and manage processes.
 
PathPtr m_inputPath
 Input path.
 
PathPtr m_mainPath
 Main (parallel section) path.
 
PathPtr m_outputPath
 Output path.
 
RingBufferm_rbin = nullptr
 input RingBuffer
 
RingBufferm_rbout = nullptr
 output RingBuffer
 
ModulePtr m_histoman
 Pointer to HistoManagerModule, or nullptr if not found.
 

Detailed Description

This class provides the core event processing loop for parallel processing.

Definition at line 28 of file pEventProcessor.h.

Member Function Documentation

◆ analyzePath()

void analyzePath ( const PathPtr path)
private

Analyze given path.

Fills m_*path objects.

Definition at line 296 of file pEventProcessor.cc.

297 {
298  //modules that can be parallelised, but should not go into a parallel section by themselves
299  std::set<std::string> uselessParallelModules({"HistoManager", "Gearbox", "Geometry"});
300 
301  PathPtr inpath(new Path);
302  PathPtr mainpath(new Path);
303  PathPtr outpath(new Path);
304 
305  int stage = 0; //0: in, 1: event/main, 2: out
306  for (const ModulePtr& module : path->getModules()) {
307  bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
308  //entire conditional path must also be compatible
309  if (hasParallelFlag and module->hasCondition()) {
310  for (const auto& conditionPath : module->getAllConditionPaths()) {
312  hasParallelFlag = false;
313  }
314  }
315  }
316 
317  //update stage?
318  if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
319  stage++;
320 
321  if (stage == 2) {
322  bool path_is_useful = false;
323  for (const auto& parallelModule : mainpath->getModules()) {
324  if (uselessParallelModules.count(parallelModule->getType()) == 0) {
325  path_is_useful = true;
326  break;
327  }
328  }
329  if (not path_is_useful) {
330  //merge mainpath back into input path
331  inpath->addPath(mainpath);
332  mainpath.reset(new Path);
333  //and search for further parallel sections
334  stage = 0;
335  }
336  }
337  }
338 
339  if (stage == 0) { //fill input path
340  inpath->addModule(module);
341 
342  if (module->hasProperties(Module::c_HistogramManager)) {
343  // Initialize histogram manager if found in the path
344  m_histoman = module;
345 
346  //add histoman to other paths
347  mainpath->addModule(m_histoman);
348  outpath->addModule(m_histoman);
349  }
350  }
351 
352  if (stage == 1)
353  mainpath->addModule(module);
354  if (stage == 2)
355  outpath->addModule(module);
356  }
357 
358  bool createAllPaths = false; //usually we might not need e.g. an output path
359  for (const ModulePtr& module : path->getModules()) {
360  if (module->hasProperties(Module::c_TerminateInAllProcesses))
361  createAllPaths = true; //ensure there are all kinds of processes
362  }
363 
364  //if main path is empty, createAllPaths doesn't really matter, since we'll fall back to single-core processing
365  if (!mainpath->isEmpty())
366  m_mainPath = mainpath;
367  if (createAllPaths or !inpath->isEmpty())
368  m_inputPath = inpath;
369  if (createAllPaths or !outpath->isEmpty())
370  m_outputPath = outpath;
371 }
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
Definition: Module.h:81
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes().
Definition: Module.h:83
Implements a path consisting of Module and/or Path objects.
Definition: Path.h:38
PathPtr m_outputPath
Output path.
ModulePtr m_histoman
Pointer to HistoManagerModule, or nullptr if not found.
PathPtr m_mainPath
Main (parallel section) path.
PathPtr m_inputPath
Input path.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition: Path.h:28
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a boost shared pointer.
Definition: Module.h:40

◆ callEvent()

void callEvent ( Module module)
protectedinherited

Calls event() on one single module, setting up logging and statistics as needed.

Parameters
moduleModule to call the event() function

Definition at line 226 of file EventProcessor.cc.

◆ clearFileList()

void clearFileList ( )
private

TFiles are stored in a global list and cleaned up by root since this will happen in all forked processes, these will be corrupted if we don't clean the list!

needs to be called at the end of every process.

Definition at line 99 of file pEventProcessor.cc.

◆ gotSigINT()

void gotSigINT ( )

signal handler for Ctrl+C (async-safe)

When called the first time, does nothing (input process handles SIGINT by itself). On subsequent calls, RingBuffers are cleared, discarding any events that have been partly produced (mostly equivalent to previous behaviour on Ctrl+C)

Definition at line 86 of file pEventProcessor.cc.

◆ installMainSignalHandlers()

void installMainSignalHandlers ( void(*)(int)  fn = nullptr)
staticinherited

Install signal handler for INT, TERM and QUIT signals.

If argument is NULL, EventProcessor's own signal handler will be installed.

Definition at line 312 of file EventProcessor.cc.

◆ killRingBuffers()

void killRingBuffers ( )

signal handler (async-safe)

Fairly abrupt termination after the current event.

Definition at line 92 of file pEventProcessor.cc.

◆ process()

void process ( const PathPtr spath,
long  maxEvent 
)

Processes the full module chain, starting with the first module in the given path.

Processes all events for the given run number and for events from 0 to maxEvent.

Parameters
spathThe processing starts with the first module of this path.
maxEventThe maximum number of events that will be processed. If the number is smaller or equal 0, all events will be processed.

Definition at line 107 of file pEventProcessor.cc.

◆ processBeginRun()

void processBeginRun ( bool  skipDB = false)
protectedinherited

Calls the begin run methods of all modules.

Loops over all module instances specified in a list and calls their beginRun() method. Please note: the beginRun() method of the module which triggered the beginRun() loop will also be called.

Definition at line 467 of file EventProcessor.cc.

◆ processCore()

void processCore ( const PathPtr startPath,
const ModulePtrList modulePathList,
long  maxEvent = 0,
bool  isInputProcess = true 
)
protectedinherited

Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.

Parameters
startPathThe processing starts with the first module of this path.
modulePathListA list of all modules which could be executed during the data processing (used for calling the beginRun() and endRun() method).
maxEventThe maximum number of events that will be processed. If the number is smaller or equal 0, all events are processed.
isInputProcesstrue when this is either the only or the input process

Definition at line 406 of file EventProcessor.cc.

◆ processEndRun()

void processEndRun ( )
protectedinherited

Calls the end run methods of all modules.

Loops over all module instances specified in a list and calls their endRun() method. Please note: the endRun() method of the module which triggered the endRun() loop will also be called.

Definition at line 501 of file EventProcessor.cc.

◆ processEvent()

bool processEvent ( PathIterator  moduleIter,
bool  skipMasterModule 
)
protectedinherited

Calls event() functions on all modules for the current event.

Used by processCore.

Parameters
moduleIteriterator of the path containing all the modules
skipMasterModuleskip the execution of the master module, presumably because this is the first event and it's already been done in initialize()
Returns
true if execution should stop.

Definition at line 321 of file EventProcessor.cc.

◆ processInitialize()

void processInitialize ( const ModulePtrList modulePathList,
bool  setEventInfo = true 
)
protectedinherited

Initializes the modules.

Loops over all module instances specified in a list and calls their initialize() method.

Parameters
modulePathListA list of all modules which could be executed during the data processing.
setEventInfoif true the first event call of the master module will be called immidiately to load the event info right away so that it's available for subsequent modules

Definition at line 242 of file EventProcessor.cc.

◆ processTerminate()

void processTerminate ( const ModulePtrList modulePathList)
protectedinherited

Terminates the modules.

Loops over all module instances in reverse order specified in a list and calls their terminate() method.

Parameters
modulePathListA list of all modules which could be executed during the data processing.

Definition at line 440 of file EventProcessor.cc.

◆ setProfileModuleName()

void setProfileModuleName ( const std::string &  name)
inlineinherited

Set the name of the module we want to profile.

Parameters
nameName of the module as returned by getName()

Definition at line 57 of file EventProcessor.h.

Member Data Documentation

◆ m_previousEventMetaData

EventMetaData m_previousEventMetaData
protectedinherited

Stores state of EventMetaData before it was last changed.

Useful since processEndRun() needs info about which run it needs to end.

Definition at line 166 of file EventProcessor.h.


The documentation for this class was generated from the following files: