Belle II Software  release-08-01-10
ZMQEventProcessor Class Reference

This class provides the core event processing loop for parallel processing with ZMQ. More...

#include <ZMQEventProcessor.h>

Inheritance diagram for ZMQEventProcessor:
Collaboration diagram for ZMQEventProcessor:

Public Member Functions

 ZMQEventProcessor ()
 Init the socket cleaning at exit.
 
virtual ~ZMQEventProcessor ()
 Make sure we remove all sockets cleanly.
 
void process (const PathPtr &spath, long maxEvent)
 Processes the full module chain using parallel processing, starting with the first module in the given path.
 
void cleanup ()
 clean up IPC resources (should only be called in one process).
 
void setProfileModuleName (const std::string &name)
 Set the name of the module we want to profile. More...
 

Static Public Member Functions

static void writeToStdErr (const char msg[])
 async-safe method to write something to STDERR.
 
static void installSignalHandler (int sig, void(*fn)(int))
 Install a signal handler 'fn' for given signal.
 
static void installMainSignalHandlers (void(*fn)(int)=nullptr)
 Install signal handler for INT, TERM and QUIT signals. More...
 

Protected Member Functions

void processInitialize (const ModulePtrList &modulePathList, bool setEventInfo=true)
 Initializes the modules. More...
 
void processCore (const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
 Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path. More...
 
bool processEvent (PathIterator moduleIter, bool skipMasterModule)
 Calls event() functions on all modules for the current event. More...
 
void callEvent (Module *module)
 Calls event() on one single module, setting up logging and statistics as needed. More...
 
void processTerminate (const ModulePtrList &modulePathList)
 Terminates the modules. More...
 
long getMaximumEventNumber (long maxEvent) const
 Calculate the maximum event number out of the argument from command line and the environment.
 

Protected Attributes

const Modulem_master
 The master module that determines the experiment/run/event number.
 
ModulePtrList m_moduleList
 List of all modules in order initialized.
 
std::string m_profileModuleName
 Name of the module which should be profiled, empty if no profiling is requested.
 
Modulem_profileModule = nullptr
 Adress of the module which we want to profile, nullptr if no profiling is requested.
 
StoreObjPtr< EventMetaDatam_eventMetaDataPtr
 EventMetaData is used by processEvent()/processCore().
 
StoreObjPtr< ProcessStatisticsm_processStatisticsPtr
 Also used in a number of places.
 
bool m_inRun
 Are we currently in a run? If yes, processEndRun() needs to do something.
 
double m_lastMetadataUpdate
 Time in seconds of last call for metadata update in event loop.
 
double m_metadataUpdateInterval
 Minimal time difference in seconds for metadata updates in event loop.
 
bool m_steerRootInputModuleOn = false
 True if the SteerRootInputModule is in charge for event processing.
 

Private Member Functions

void initialize (const ModulePtrList &moduleList, const ModulePtr &histogramManager)
 First step in the process: init the module in the list. More...
 
void forkAndRun (long maxEvent, const PathPtr &inputPath, const PathPtr &mainPath, const PathPtr &outputPath, const ModulePtrList &terminateGlobally)
 Second step in the process: fork out the processes we need to have and call the event loop.
 
void terminateAndCleanup (const ModulePtr &histogramManager)
 Last step in the process: run the termination and cleanup (kill all remaining processes)
 
void runMonitoring (const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
 Start the monitoring (without forking)
 
void runInput (const PathPtr &inputPath, const ModulePtrList &terminateGlobally, long maxEvent)
 Fork out the input process.
 
void runOutput (const PathPtr &outputPath, const ModulePtrList &terminateGlobally, long maxEvent)
 Fork out the output process.
 
void runWorker (unsigned int numProcesses, const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
 Fork out the N worker process.
 
void processPath (const PathPtr &localPath, const ModulePtrList &terminateGlobally, long maxEvent)
 Basic function run in every process: process the event loop of the given path.
 
void processCore (const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true, bool isWorkerProcess=false, bool isOutputProcess=false)
 Process modules in the path.
 
bool processEvent (PathIterator moduleIter, bool skipMasterModule, bool Worker=false, bool output=false)
 Calls Event function.
 
void processBeginRun (bool skipDB=false)
 Calls BeginRun function.
 
void processEndRun ()
 Calls EndRun function.
 

Private Attributes

ProcessMonitor m_processMonitor
 Instance of the process monitor.
 
EventMetaData m_previousEventMetaData
 Stores previous eventMetaData.
 

Detailed Description

This class provides the core event processing loop for parallel processing with ZMQ.

Definition at line 23 of file ZMQEventProcessor.h.

Member Function Documentation

◆ callEvent()

void callEvent ( Module module)
protectedinherited

Calls event() on one single module, setting up logging and statistics as needed.

Parameters
moduleModule to call the event() function

Definition at line 226 of file EventProcessor.cc.

◆ initialize()

void initialize ( const ModulePtrList moduleList,
const ModulePtr histogramManager 
)
private

First step in the process: init the module in the list.

TFiles are stored in a global list and cleaned up by root since this will happen in all forked processes, these will be corrupted if we don't clean the list!

needs to be called at the end of every process.

Definition at line 174 of file ZMQEventProcessor.cc.

175 {
176  if (histogramManager) {
177  histogramManager->initialize();
178  }
179  // from now on the datastore is available
180  processInitialize(moduleList, true);
181 
182  B2INFO("ZMQEventProcessor : processInitialize done");
183 
184  // Don't start processing in case of no master module
185  if (!m_master) {
186  B2ERROR("There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
187  }
188 
189  // Check if errors appeared. If yes, don't start the event processing.
191  if (numLogError != 0) {
192  B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
193  }
194 
195  // TODO: I do not really understand what is going on here...
201  // disable ROOT's management of TFiles
202  // clear list, but don't actually delete the objects
203  gROOT->GetListOfFiles()->Clear("nodelete");
204 }
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
const Module * m_master
The master module that determines the experiment/run/event number.
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:30
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:161
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:31

◆ installMainSignalHandlers()

void installMainSignalHandlers ( void(*)(int)  fn = nullptr)
staticinherited

Install signal handler for INT, TERM and QUIT signals.

If argument is NULL, EventProcessor's own signal handler will be installed.

Definition at line 312 of file EventProcessor.cc.

◆ processCore()

void processCore ( const PathPtr startPath,
const ModulePtrList modulePathList,
long  maxEvent = 0,
bool  isInputProcess = true 
)
protectedinherited

Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.

Parameters
startPathThe processing starts with the first module of this path.
modulePathListA list of all modules which could be executed during the data processing (used for calling the beginRun() and endRun() method).
maxEventThe maximum number of events that will be processed. If the number is smaller or equal 0, all events are processed.
isInputProcesstrue when this is either the only or the input process

Definition at line 406 of file EventProcessor.cc.

◆ processEvent()

bool processEvent ( PathIterator  moduleIter,
bool  skipMasterModule 
)
protectedinherited

Calls event() functions on all modules for the current event.

Used by processCore.

Parameters
moduleIteriterator of the path containing all the modules
skipMasterModuleskip the execution of the master module, presumably because this is the first event and it's already been done in initialize()
Returns
true if execution should stop.

Definition at line 321 of file EventProcessor.cc.

◆ processInitialize()

void processInitialize ( const ModulePtrList modulePathList,
bool  setEventInfo = true 
)
protectedinherited

Initializes the modules.

Loops over all module instances specified in a list and calls their initialize() method.

Parameters
modulePathListA list of all modules which could be executed during the data processing.
setEventInfoif true the first event call of the master module will be called immidiately to load the event info right away so that it's available for subsequent modules

Definition at line 242 of file EventProcessor.cc.

◆ processTerminate()

void processTerminate ( const ModulePtrList modulePathList)
protectedinherited

Terminates the modules.

Loops over all module instances in reverse order specified in a list and calls their terminate() method.

Parameters
modulePathListA list of all modules which could be executed during the data processing.

Definition at line 440 of file EventProcessor.cc.

◆ setProfileModuleName()

void setProfileModuleName ( const std::string &  name)
inlineinherited

Set the name of the module we want to profile.

Parameters
nameName of the module as returned by getName()

Definition at line 57 of file EventProcessor.h.


The documentation for this class was generated from the following files: