Belle II Software  release-05-02-19
HLTEventProcessor Class Reference

EventProcessor to be used on the HLT with all specialities of the HLT processing; see the Detailed Description.

#include <HLTEventProcessor.h>

[Figure: inheritance diagram for HLTEventProcessor]
[Figure: collaboration diagram for HLTEventProcessor]

Public Member Functions

 HLTEventProcessor (const std::vector< std::string > &outputAddresses)
 Create a new event processor and store the ZMQ addresses where to unregister workers.
 
void process (PathPtr spath, bool restartFailedWorkers)
 Process the given path.
 
void process (const PathPtr &startPath, long maxEvent=0)
 Processes the full module chain, starting with the first module in the given path.
 
void setProfileModuleName (const std::string &name)
 Set the name of the module we want to profile.
 

Static Public Member Functions

static void writeToStdErr (const char msg[])
 async-safe method to write something to STDERR.
 
static void installSignalHandler (int sig, void(*fn)(int))
 Install a signal handler 'fn' for given signal.
 
static void installMainSignalHandlers (void(*fn)(int)=nullptr)
 Install signal handler for INT, TERM and QUIT signals.
 

Protected Member Functions

void processInitialize (const ModulePtrList &modulePathList, bool setEventInfo=true)
 Initializes the modules.
 
void processCore (const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
 Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.
 
void callEvent (Module *module)
 Calls event() on one single module, setting up logging and statistics as needed.
 
void processTerminate (const ModulePtrList &modulePathList)
 Terminates the modules.
 
void processBeginRun (bool skipDB=false)
 Calls the begin run methods of all modules.
 
void processEndRun ()
 Calls the end run methods of all modules.
 
long getMaximumEventNumber (long maxEvent) const
 Calculate the maximum event number out of the argument from command line and the environment.
 

Protected Attributes

const Module * m_master
 The master module that determines the experiment/run/event number.
 
ModulePtrList m_moduleList
 List of all modules in the order in which they were initialized.
 
std::string m_profileModuleName
 Name of the module which should be profiled, empty if no profiling is requested.
 
Module * m_profileModule = nullptr
 Address of the module which we want to profile, nullptr if no profiling is requested.
 
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
 EventMetaData is used by processEvent()/processCore().
 
EventMetaData m_previousEventMetaData
 Stores state of EventMetaData before it was last changed.
 
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
 Also used in a number of places.
 
bool m_inRun
 Are we currently in a run? If yes, processEndRun() needs to do something.
 
double m_lastMetadataUpdate
 Time in seconds of last call for metadata update in event loop.
 
double m_metadataUpdateInterval
 Minimal time difference in seconds for metadata updates in event loop.
 

Private Member Functions

void sendTerminatedMessage (unsigned int pid, bool waitForConfirmation)
 Send an unregister message to all sockets if the given PID died. Wait at most 10 s for the confirmation (if requested).
 
void runWorkers (PathPtr path, unsigned int numProcesses)
 Fork out as many workers as requested and run the given path in each of them using processCore.
 
void processCore (PathPtr path)
 Process the path by basically calling processEvent until a termination is requested.
 
bool processEvent (PathIterator moduleIter, bool firstRound)
 Process a single event by iterating through the module path once.
 
std::pair< unsigned int, unsigned int > checkChildProcesses ()
 Check if one of the started processes has died.
 
void release ()
 Release the parent resource, which is needed after forking to not close it twice.
 
bool forkOut ()
 Helper function to fork out. Sets the Python state correctly and adds the process to the internal state.
 

Private Attributes

ZMQParent m_parent
 An instance of a ZMQParent to create sockets for unregistering workers.
 
std::vector< int > m_processList
 The current list of running processes (with their PIDs)
 
std::vector< std::unique_ptr< zmq::socket_t > > m_sockets
 The created sockets for unregistering workers. TODO: use connections.
 

Detailed Description

EventProcessor to be used on the HLT with all specialities of the HLT processing:

  • no input or output path - just workers
  • multiprocessing forked out after initialization, which is before the first real event is processed
  • restart of terminated workers (configurable) and unregistration at the DQM server and collector
  • special run start/end handling: please check the HLTZMQ2DsModule for details
  • check of the child processes in an additional monitoring process (happens to be the parent process) by keeping track of the current state

This event processor is specialized for the HLT and should not be used outside of it. The event processor is exported as a Python module. It can be called with

    import hbasf2
    hbasf2.process(path, [address, ...], True)

Definition at line 45 of file HLTEventProcessor.h.

Member Function Documentation

◆ callEvent()

void callEvent ( Module * module)
protected inherited

Calls event() on one single module, setting up logging and statistics as needed.

Parameters
module    Module to call the event() function

Definition at line 216 of file EventProcessor.cc.

◆ checkChildProcesses()

std::pair< unsigned int, unsigned int > checkChildProcesses ( )
private

Check if one of the started processes has died.

If it has died with a non-zero exit code, increase the counter of workers to restart.

Returns the number of still alive workers and the number of workers needed to restart (aka the number of workers that died with a non-zero exit code) as a pair.

Definition at line 364 of file HLTEventProcessor.cc.

{
  unsigned int needToRestart = 0;

  // Check for processes which were there last time but are gone now (so they died)
  for (auto iter = m_processList.begin(); iter != m_processList.end();) {
    const auto& pid = *iter;

    // check the status of this process pid
    int status;
    const int result = waitpid(pid, &status, WNOHANG);
    if (result == -1) {
      if (errno == EINTR) {
        // interrupted, try again next time
        ++iter;
        continue;
      } else {
        B2FATAL("waitpid() failed.");
      }
    } else if (result == 0) {
      // No change, so let's continue with the next worker
      ++iter;
      continue;
    }

    B2ASSERT("Do not understand the result of waitpid()", result == pid);

    // state has changed, which means it is dead!
    const auto exitCode = WEXITSTATUS(status);

    // we only need to restart workers that died unexpectedly
    if (exitCode != 0) {
      B2WARNING("A worker process has died unexpected!");
      needToRestart += 1;

      sendTerminatedMessage(pid, true);
    }

    // once a process is gone from the global list, remove it from our own, too.
    iter = m_processList.erase(iter);
  }

  return {m_processList.size(), needToRestart};
}
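
The listing above reads the exit code with WEXITSTATUS directly. POSIX defines the value of that macro only when WIFEXITED(status) is true, so a worker that was terminated by a signal may need to be distinguished separately. The following is a minimal sketch of such a classification, not part of HLTEventProcessor; the names WorkerExit, classifyStatus and runAndClassify are hypothetical and exist only for this illustration. Whether signal-terminated workers should count as restart candidates is a decision this sketch does not make.

```cpp
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <csignal>
#include <cstdlib>

// Hypothetical classification, not part of the Belle II software.
enum class WorkerExit { Clean, Failed, Signalled, Other };

// Classify a status value as filled in by waitpid().
WorkerExit classifyStatus(int status)
{
  if (WIFEXITED(status)) {
    // WEXITSTATUS is only meaningful in this branch.
    return WEXITSTATUS(status) == 0 ? WorkerExit::Clean : WorkerExit::Failed;
  }
  if (WIFSIGNALED(status)) {
    return WorkerExit::Signalled;
  }
  return WorkerExit::Other;
}

// Demonstration helper: fork a child that exits with exitCode, or kills
// itself with signal sig if sig is non-zero, then classify how it ended.
WorkerExit runAndClassify(int exitCode, int sig)
{
  assert(exitCode >= 0 && exitCode <= 255);
  const pid_t pid = fork();
  if (pid < 0) {
    std::abort();
  }
  if (pid == 0) {
    if (sig != 0) {
      raise(sig);
    }
    _exit(exitCode);
  }
  int status = 0;
  // Blocking wait; retry if the call was interrupted by a signal.
  while (waitpid(pid, &status, 0) == -1) {
    if (errno != EINTR) {
      std::abort();
    }
  }
  return classifyStatus(status);
}
```

In this sketch, runAndClassify(3, 0) reports WorkerExit::Failed, while runAndClassify(0, SIGKILL) reports WorkerExit::Signalled.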

◆ installMainSignalHandlers()

void installMainSignalHandlers ( void(*)(int)  fn = nullptr)
static inherited

Install signal handler for INT, TERM and QUIT signals.

If argument is NULL, EventProcessor's own signal handler will be installed.

Definition at line 302 of file EventProcessor.cc.
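
The implementation in EventProcessor.cc is not reproduced on this page. As an illustration only, the following sketch shows one common way to install a single handler for INT, TERM and QUIT using POSIX sigaction, falling back to a default handler when no function is given. The names installHandler, installMainHandlers, recordSignal and g_lastSignal are hypothetical; it is an assumption, not a documented fact, that the real code works in a similar way.

```cpp
#include <signal.h>
#include <cassert>
#include <csignal>

// Hypothetical default handler state: remembers the last signal received.
volatile std::sig_atomic_t g_lastSignal = 0;

// Only async-signal-safe work is done here: a single store to a sig_atomic_t.
void recordSignal(int sig)
{
  g_lastSignal = sig;
}

// Install fn as the handler for one signal; returns true on success.
bool installHandler(int sig, void (*fn)(int))
{
  assert(fn != nullptr);
  struct sigaction action {};
  action.sa_handler = fn;
  sigemptyset(&action.sa_mask);
  action.sa_flags = 0;
  return sigaction(sig, &action, nullptr) == 0;
}

// Install fn for INT, TERM and QUIT; use the default handler if fn is nullptr.
bool installMainHandlers(void (*fn)(int) = nullptr)
{
  if (fn == nullptr) {
    fn = recordSignal;
  }
  return installHandler(SIGINT, fn)
         && installHandler(SIGTERM, fn)
         && installHandler(SIGQUIT, fn);
}
```

After installMainHandlers() has been called without an argument, a raised SIGTERM no longer terminates the process in this sketch; it is only recorded in g_lastSignal.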

◆ process() [1/2]

void process ( const PathPtr & startPath,
long  maxEvent = 0 
)
inherited

Processes the full module chain, starting with the first module in the given path.

Processes all events for the given run number and for events from 0 to maxEvent. If maxEvent is smaller than or equal to 0, the maximum number check is disabled and all events are processed. If runNumber is smaller than 0, the run number has to be set externally by a module and the given number is not used.

Parameters
startPath    The processing starts with the first module of this path.
maxEvent     Optional: The maximum number of events that will be processed. If the number is smaller than or equal to 0, all events will be processed.

Definition at line 125 of file EventProcessor.cc.

◆ process() [2/2]

void process ( PathPtr  spath,
bool  restartFailedWorkers 
)

Process the given path.

Restarts failed workers if requested. The processing proceeds as follows:

  • first check if all modules have a parallel flag
  • then call initialize in the main process
  • fork out all workers and process the path
  • while monitoring their status
  • in the end kill remaining processes (if needed) and re-raise any collected signals

Definition at line 83 of file HLTEventProcessor.cc.
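
The fork, monitor and restart steps listed above can be sketched with plain POSIX calls. This is a simplified illustration under stated assumptions, not the HLTEventProcessor implementation: it omits module initialization, the parallel-flag check, ZMQ unregistration, and signal handling, and the names WorkerBody, forkWorker, superviseWorkers and failOnFirstGeneration are hypothetical.

```cpp
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <cstdlib>
#include <vector>

// Hypothetical worker signature: receives a generation counter
// (0 for the initial workers) and returns the process exit code.
using WorkerBody = int (*)(int generation);

// Fork one child that runs body and exits with its return value.
static pid_t forkWorker(WorkerBody body, int generation)
{
  const pid_t pid = fork();
  if (pid < 0) {
    std::abort();
  }
  if (pid == 0) {
    _exit(body(generation));
  }
  return pid;
}

// Start numWorkers children and poll them until none is left. Children that
// did not exit with code 0 are replaced if restartFailed is true.
// Returns the number of restarts performed.
int superviseWorkers(int numWorkers, WorkerBody body, bool restartFailed)
{
  assert(numWorkers >= 0 && body != nullptr);
  std::vector<pid_t> running;
  for (int i = 0; i < numWorkers; ++i) {
    running.push_back(forkWorker(body, 0));
  }

  int restarts = 0;
  while (!running.empty()) {
    int needToRestart = 0;
    for (auto iter = running.begin(); iter != running.end();) {
      int status = 0;
      const pid_t result = waitpid(*iter, &status, WNOHANG);
      if (result == 0 || (result == -1 && errno == EINTR)) {
        ++iter;  // still running, or interrupted: look again next round
        continue;
      }
      if (result == -1) {
        std::abort();
      }
      if (!(WIFEXITED(status) && WEXITSTATUS(status) == 0)) {
        ++needToRestart;
      }
      iter = running.erase(iter);
    }
    if (restartFailed) {
      for (int i = 0; i < needToRestart; ++i) {
        ++restarts;
        running.push_back(forkWorker(body, restarts));
      }
    }
    usleep(1000);  // short pause between monitoring rounds
  }
  return restarts;
}

// Example worker bodies for trying out the sketch.
int alwaysSucceed(int) { return 0; }
int failOnFirstGeneration(int generation) { return generation == 0 ? 1 : 0; }
```

With two workers running failOnFirstGeneration and restarts enabled, both initial workers fail once and are replaced, so superviseWorkers returns 2; with restarts disabled it returns 0.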

◆ processBeginRun()

void processBeginRun ( bool  skipDB = false)
protected inherited

Calls the begin run methods of all modules.

Loops over all module instances specified in a list and calls their beginRun() method. Please note: the beginRun() method of the module which triggered the beginRun() loop will also be called.

Definition at line 457 of file EventProcessor.cc.

◆ processCore() [1/2]

void processCore ( const PathPtr & startPath,
const ModulePtrList & modulePathList,
long  maxEvent = 0,
bool  isInputProcess = true 
)
protected inherited

Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.

Parameters
startPath         The processing starts with the first module of this path.
modulePathList    A list of all modules which could be executed during the data processing (used for calling the beginRun() and endRun() method).
maxEvent          The maximum number of events that will be processed. If the number is smaller than or equal to 0, all events are processed.
isInputProcess    true when this is either the only process or the input process

Definition at line 396 of file EventProcessor.cc.

◆ processCore() [2/2]

void processCore ( PathPtr  path)
private

Process the path by basically calling processEvent until a termination is requested.

Does not perform any initialization; it is assumed that this has already happened. In the end, terminate is called.

Definition at line 222 of file HLTEventProcessor.cc.

◆ processEndRun()

void processEndRun ( )
protected inherited

Calls the end run methods of all modules.

Loops over all module instances specified in a list and calls their endRun() method. Please note: the endRun() method of the module which triggered the endRun() loop will also be called.

Definition at line 491 of file EventProcessor.cc.

◆ processEvent()

bool processEvent ( PathIterator  moduleIter,
bool  firstRound 
)
private

Process a single event by iterating through the module path once.

In principle very similar to the EventProcessor::processEvent function, but with different assumptions about the run changes induced by the master module (which is always the ZMQ2Ds module in the HLT case).

The logic happening after the master module is the following:

  • if an EndOfData is set in the event meta data, just break out
  • if a HLT-specific EndOfRun is set, call the end of run methods of all modules (without begin run)
  • if the previous event meta data has the EndOfRun/Data set, call the begin run functions
  • if the run change was induced due to a changing run number, call begin and end run functions

The rest (e.g. module conditions, db store) is the same as the EventProcessor case.

Definition at line 259 of file HLTEventProcessor.cc.
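
The decision logic after the master module can be written down as a small pure function. This is a sketch only: MetaSketch is a hypothetical stand-in for EventMetaData, RunAction and decideRunAction are illustrative names, and the precedence of the checks is assumed to follow the order of the list above. The order in which begin and end run functions are called for a changed run number is not specified on this page.

```cpp
#include <cassert>

// Hypothetical stand-in for the relevant parts of EventMetaData.
struct MetaSketch {
  int run;
  bool endOfData;
  bool endOfRun;  // the HLT-specific end-of-run flag
};

// What to do after the master module has filled the current meta data.
enum class RunAction { Stop, EndRun, BeginRun, BeginAndEndRun, None };

// Checks are evaluated in the order of the documented list (an assumption).
RunAction decideRunAction(const MetaSketch& previous, const MetaSketch& current)
{
  if (current.endOfData) {
    return RunAction::Stop;            // just break out
  }
  if (current.endOfRun) {
    return RunAction::EndRun;          // end run without begin run
  }
  if (previous.endOfRun || previous.endOfData) {
    return RunAction::BeginRun;        // a new run starts after an end marker
  }
  if (previous.run != current.run) {
    return RunAction::BeginAndEndRun;  // run number changed without a marker
  }
  return RunAction::None;              // ordinary event within the same run
}

// Usage example: an ordinary event followed by one with a new run number.
inline void exampleRunChange()
{
  const MetaSketch previous{1, false, false};
  const MetaSketch current{2, false, false};
  assert(decideRunAction(previous, current) == RunAction::BeginAndEndRun);
}
```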

◆ processInitialize()

void processInitialize ( const ModulePtrList & modulePathList,
bool  setEventInfo = true 
)
protected inherited

Initializes the modules.

Loops over all module instances specified in a list and calls their initialize() method.

Parameters
modulePathList    A list of all modules which could be executed during the data processing.
setEventInfo      if true, the first event call of the master module is made immediately to load the event info right away, so that it is available for subsequent modules

Definition at line 232 of file EventProcessor.cc.

◆ processTerminate()

void processTerminate ( const ModulePtrList & modulePathList)
protected inherited

Terminates the modules.

Loops in reverse order over all module instances specified in a list and calls their terminate() method.

Parameters
modulePathList    A list of all modules which could be executed during the data processing.

Definition at line 430 of file EventProcessor.cc.
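
The reverse-order loop can be illustrated with a few lines of standalone code. ModuleSketch and terminateOrder are hypothetical names used only here; the sketch records the order in which terminate() would be called instead of calling real modules.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a module; only the name matters here.
struct ModuleSketch {
  std::string name;
};

// Return the module names in the order terminate() would be called:
// the reverse of the order given in the list.
std::vector<std::string> terminateOrder(const std::vector<ModuleSketch>& modules)
{
  std::vector<std::string> order;
  for (auto iter = modules.rbegin(); iter != modules.rend(); ++iter) {
    order.push_back(iter->name);
  }
  assert(order.size() == modules.size());
  return order;
}
```

For a list holding modules named A, B and C in that order, the sketch yields C, B, A, so modules listed later are shut down before the ones listed earlier.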

◆ setProfileModuleName()

void setProfileModuleName ( const std::string &  name)
inline inherited

Set the name of the module we want to profile.

Parameters
name    Name of the module as returned by getName()

Definition at line 67 of file EventProcessor.h.

Member Data Documentation

◆ m_previousEventMetaData

EventMetaData m_previousEventMetaData
protected inherited

Stores state of EventMetaData before it was last changed.

Useful since processEndRun() needs info about which run it needs to end.

Definition at line 176 of file EventProcessor.h.


The documentation for this class was generated from the following files: