Belle II Software  release-08-01-10
HLTEventProcessor Class Reference

EventProcessor to be used on the HLT with all specialities of the HLT processing: More...

#include <HLTEventProcessor.h>

Inheritance diagram for HLTEventProcessor:
Collaboration diagram for HLTEventProcessor:

Public Member Functions

 HLTEventProcessor (const std::vector< std::string > &outputAddresses)
 Create a new event processor and store the ZMQ addresses where to unregister workers.
 
void process (PathPtr spath, bool restartFailedWorkers, bool appendProcessNumberToModuleName=false)
 Process the given path. More...
 
void process (const PathPtr &startPath, long maxEvent=0)
 Processes the full module chain, starting with the first module in the given path. More...
 
void setProfileModuleName (const std::string &name)
 Set the name of the module we want to profile. More...
 

Static Public Member Functions

static void writeToStdErr (const char msg[])
 async-safe method to write something to STDERR.
 
static void installSignalHandler (int sig, void(*fn)(int))
 Install a signal handler 'fn' for given signal.
 
static void installMainSignalHandlers (void(*fn)(int)=nullptr)
 Install signal handler for INT, TERM and QUIT signals. More...
 

Protected Member Functions

void processInitialize (const ModulePtrList &modulePathList, bool setEventInfo=true)
 Initializes the modules. More...
 
void processCore (const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
 Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path. More...
 
void callEvent (Module *module)
 Calls event() on one single module, setting up logging and statistics as needed. More...
 
void processTerminate (const ModulePtrList &modulePathList)
 Terminates the modules. More...
 
void processBeginRun (bool skipDB=false)
 Calls the begin run methods of all modules. More...
 
void processEndRun ()
 Calls the end run methods of all modules. More...
 
long getMaximumEventNumber (long maxEvent) const
 Calculate the maximum event number out of the argument from command line and the environment.
 

Protected Attributes

const Modulem_master
 The master module that determines the experiment/run/event number.
 
ModulePtrList m_moduleList
 List of all modules in order initialized.
 
std::string m_profileModuleName
 Name of the module which should be profiled, empty if no profiling is requested.
 
Modulem_profileModule = nullptr
 Adress of the module which we want to profile, nullptr if no profiling is requested.
 
StoreObjPtr< EventMetaDatam_eventMetaDataPtr
 EventMetaData is used by processEvent()/processCore().
 
EventMetaData m_previousEventMetaData
 Stores state of EventMetaData before it was last changed. More...
 
StoreObjPtr< ProcessStatisticsm_processStatisticsPtr
 Also used in a number of places.
 
bool m_inRun
 Are we currently in a run? If yes, processEndRun() needs to do something.
 
double m_lastMetadataUpdate
 Time in seconds of last call for metadata update in event loop.
 
double m_metadataUpdateInterval
 Minimal time difference in seconds for metadata updates in event loop.
 
bool m_steerRootInputModuleOn = false
 True if the SteerRootInputModule is in charge for event processing.
 

Private Member Functions

void sendTerminatedMessage (unsigned int pid, bool waitForConfirmation)
 Send an unregister message to all sockets if the given PID died. Wait at max 10s for the confirmation (if requested)
 
void runWorkers (PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName=false)
 Fork out as much workers as requested and in each run the given path using processCore.
 
void processCore (PathPtr path)
 Process the path by basically calling processEvent until a termination is requested. More...
 
bool processEvent (PathIterator moduleIter, bool firstRound)
 Process a single event by iterating through the module path once. More...
 
std::pair< unsigned int, unsigned int > checkChildProcesses ()
 Check if one of the started processes has died. More...
 
void release ()
 Release the parent resource, which is needed after forking to not close it twice.
 
bool forkOut ()
 Helper function to fork out. Sets the Python state correctly and adds the process to the internal state.
 

Private Attributes

ZMQParent m_parent
 An instance of a ZMQParent to create sockets for unregistering workers.
 
std::vector< int > m_processList
 The current list of running processes (with their PIDs)
 
std::vector< std::unique_ptr< zmq::socket_t > > m_sockets
 The created sockets for unregistering workers. TODO: use connections.
 

Detailed Description

EventProcessor to be used on the HLT with all specialities of the HLT processing:

  • no input or output path - just workers
  • multiprocessing forked out after initialization, which is before the first real event is processed
  • restart of terminated workers (configurable) and unregistration and DQM server and collector
  • special run start/end handling: please check the HLTZMQ2DsModule for details
  • check of the child processes in an additional monitoring process (happens to be the parent process) by keeping track of the current state

This event processor is specialies to the HLT and should not be used apart from that. The event processor is exported as a python module. It can be called with

import hbasf2 hbasf2.process(path, [address, ...], True)

or if you want to put number on each folked process,

hbasf2.processNumbered(path, [address, ...], True(, True))

Definition at line 37 of file HLTEventProcessor.h.

Member Function Documentation

◆ callEvent()

void callEvent ( Module module)
protectedinherited

Calls event() on one single module, setting up logging and statistics as needed.

Parameters
moduleModule to call the event() function

Definition at line 226 of file EventProcessor.cc.

227 {
228  LogSystem& logSystem = LogSystem::Instance();
229  const bool collectStats = !Environment::Instance().getNoStats();
230  // set up logging
231  logSystem.updateModule(&(module->getLogConfig()), module->getName());
232  // set up statistics is requested
233  if (collectStats) m_processStatisticsPtr->startModule();
234  // call module
235  CALL_MODULE(module, event);
236  // stop timing
237  if (collectStats) m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_Event);
238  // reset logging
239  logSystem.updateModule(nullptr);
240 };
bool getNoStats() const
Disable collection of statistics during event processing.
Definition: Environment.h:187
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
Class for logging debug, info and error messages.
Definition: LogSystem.h:46
void updateModule(const LogConfig *moduleLogConfig=nullptr, const std::string &moduleName="")
Sets the log configuration to the given module log configuration and sets the module name This method...
Definition: LogSystem.h:191
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:31
@ c_Event
Counting time/calls in event()

◆ checkChildProcesses()

std::pair< unsigned int, unsigned int > checkChildProcesses ( )
private

Check if one of the started processes has died.

If it has died with a non-zero exit code, increase the counter of workers to restart.

Returns the number of still alive workers and the number of workers needed to restart (aka the number of workers that died with a non-zero exit code) as a pair.

Definition at line 391 of file HLTEventProcessor.cc.

392 {
393  unsigned int needToRestart = 0;
394 
395  // Check for processes, which where there last time but are gone now (so they died)
396  for (auto iter = m_processList.begin(); iter != m_processList.end();) {
397  const auto& pid = *iter;
398 
399  // check the status of this process pid
400  int status;
401  const int result = waitpid(pid, &status, WNOHANG);
402  if (result == -1) {
403  if (errno == EINTR) {
404  // interrupted, try again next time
405  ++iter;
406  continue;
407  } else {
408  B2FATAL("waitpid() failed.");
409  }
410  } else if (result == 0) {
411  // No change, so lets continue with the next worker
412  ++iter;
413  continue;
414  }
415 
416  B2ASSERT("Do not understand the result of waitpid()", result == pid);
417 
418  // state has changed, which means it is dead!
419  const auto exitCode = WEXITSTATUS(status);
420 
421  // we only need to restart unexpected deads
422  if (exitCode != 0) {
423  B2WARNING("A worker process has died unexpected!");
424  needToRestart += 1;
425 
426  sendTerminatedMessage(pid, true);
427  }
428 
429  // once a process is gone from the global list, remove them from our own, too.
430  iter = m_processList.erase(iter);
431  }
432 
433  return {m_processList.size(), needToRestart};
434 }
void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
Send an unregister message to all sockets if the given PID died. Wait at max 10s for the confirmation...
std::vector< int > m_processList
The current list of running processes (with their PIDs)

◆ installMainSignalHandlers()

void installMainSignalHandlers ( void(*)(int)  fn = nullptr)
staticinherited

Install signal handler for INT, TERM and QUIT signals.

If argument is NULL, EventProcessor's own signal handler will be installed.

Definition at line 312 of file EventProcessor.cc.

◆ process() [1/2]

void process ( const PathPtr startPath,
long  maxEvent = 0 
)
inherited

Processes the full module chain, starting with the first module in the given path.

Processes all events for the given run number and for events from 0 to maxEvent. If maxEvent is smaller or equal 0 the maximum number check is disabled and all events are processed. If runNumber is smaller than 0, the run number has to be set externally by a module and not the given number is used.

Parameters
startPathThe processing starts with the first module of this path.
maxEventOptional: The maximum number of events that will be processed. If the number is smaller or equal 0, all events will be processed.

Definition at line 123 of file EventProcessor.cc.

◆ process() [2/2]

void process ( PathPtr  spath,
bool  restartFailedWorkers,
bool  appendProcessNumberToModuleName = false 
)

Process the given path.

If requested, restart failed workers (or not) For this,

  • first check if all modules have a parallel flag
  • then call initialize in the main process
  • fork out all workers and process the path
  • while monitoring their status
  • in the end kill remaining processes (if needed) and re-raise any collected signals

Definition at line 84 of file HLTEventProcessor.cc.

◆ processBeginRun()

void processBeginRun ( bool  skipDB = false)
protectedinherited

Calls the begin run methods of all modules.

Loops over all module instances specified in a list and calls their beginRun() method. Please note: the beginRun() method of the module which triggered the beginRun() loop will also be called.

Definition at line 467 of file EventProcessor.cc.

◆ processCore() [1/2]

void processCore ( const PathPtr startPath,
const ModulePtrList modulePathList,
long  maxEvent = 0,
bool  isInputProcess = true 
)
protectedinherited

Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.

Parameters
startPathThe processing starts with the first module of this path.
modulePathListA list of all modules which could be executed during the data processing (used for calling the beginRun() and endRun() method).
maxEventThe maximum number of events that will be processed. If the number is smaller or equal 0, all events are processed.
isInputProcesstrue when this is either the only or the input process

Definition at line 406 of file EventProcessor.cc.

◆ processCore() [2/2]

void processCore ( PathPtr  path)
private

Process the path by basically calling processEvent until a termination is requested.

# Will not any initialization - it is assumed this has already happened before. In the end, terminate is called.

Definition at line 249 of file HLTEventProcessor.cc.

◆ processEndRun()

void processEndRun ( )
protectedinherited

Calls the end run methods of all modules.

Loops over all module instances specified in a list and calls their endRun() method. Please note: the endRun() method of the module which triggered the endRun() loop will also be called.

Definition at line 501 of file EventProcessor.cc.

◆ processEvent()

bool processEvent ( PathIterator  moduleIter,
bool  firstRound 
)
private

Process a single event by iterating through the module path once.

In principle very similar to the EventProcessor::processEvent function, but has different assumptions for the run changes happening induced by the master module (which is always the ZMQ2Ds module in the HLT case).

The logic happening after the master module is the following:

  • if an EndOfData is set in the event meta data, just break out
  • if a HLT-specific EndOfRun is set, call the end of run methods of all modules (without begin run)
  • if the previous event meta data has the EndOfRun/Data set, call the begin run functions
  • if the run change was induced due to a changing run number, call begin and end run functions

The rest (e.g. module conditions, db store) is the same as the EventProcessor case.

Definition at line 286 of file HLTEventProcessor.cc.

◆ processInitialize()

void processInitialize ( const ModulePtrList modulePathList,
bool  setEventInfo = true 
)
protectedinherited

Initializes the modules.

Loops over all module instances specified in a list and calls their initialize() method.

Parameters
modulePathListA list of all modules which could be executed during the data processing.
setEventInfoif true the first event call of the master module will be called immidiately to load the event info right away so that it's available for subsequent modules

Definition at line 242 of file EventProcessor.cc.

◆ processTerminate()

void processTerminate ( const ModulePtrList modulePathList)
protectedinherited

Terminates the modules.

Loops over all module instances in reverse order specified in a list and calls their terminate() method.

Parameters
modulePathListA list of all modules which could be executed during the data processing.

Definition at line 440 of file EventProcessor.cc.

◆ setProfileModuleName()

void setProfileModuleName ( const std::string &  name)
inlineinherited

Set the name of the module we want to profile.

Parameters
nameName of the module as returned by getName()

Definition at line 57 of file EventProcessor.h.

57 { m_profileModuleName = name; }
std::string m_profileModuleName
Name of the module which should be profiled, empty if no profiling is requested.

Member Data Documentation

◆ m_previousEventMetaData

EventMetaData m_previousEventMetaData
protectedinherited

Stores state of EventMetaData before it was last changed.

Useful since processEndRun() needs info about which run it needs to end.

Definition at line 166 of file EventProcessor.h.


The documentation for this class was generated from the following files: