Belle II Software development
HLTEventProcessor Class Reference

EventProcessor to be used on the HLT with all specialities of the HLT processing.

#include <HLTEventProcessor.h>

Inheritance diagram for HLTEventProcessor:
EventProcessor

Public Member Functions

 HLTEventProcessor (const std::vector< std::string > &outputAddresses)
 Create a new event processor and store the ZMQ addresses where to unregister workers.
 
void process (PathPtr spath, bool restartFailedWorkers, bool appendProcessNumberToModuleName=false)
 Process the given path.
 
void process (const PathPtr &startPath, long maxEvent=0)
 Processes the full module chain, starting with the first module in the given path.
 
void setProfileModuleName (const std::string &name)
 Set the name of the module we want to profile.
 

Static Public Member Functions

static void writeToStdErr (const char msg[])
 async-safe method to write something to STDERR.
 
static void installSignalHandler (int sig, void(*fn)(int))
 Install a signal handler 'fn' for given signal.
 
static void installMainSignalHandlers (void(*fn)(int)=nullptr)
 Install signal handler for INT, TERM and QUIT signals.
 

Protected Member Functions

void processInitialize (const ModulePtrList &modulePathList, bool setEventInfo=true)
 Initializes the modules.
 
void processCore (const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
 Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.
 
void callEvent (Module *module)
 Calls event() on one single module, setting up logging and statistics as needed.
 
void processTerminate (const ModulePtrList &modulePathList)
 Terminates the modules.
 
void processBeginRun (bool skipDB=false)
 Calls the begin run methods of all modules.
 
void processEndRun ()
 Calls the end run methods of all modules.
 
long getMaximumEventNumber (long maxEvent) const
 Calculate the maximum event number out of the argument from command line and the environment.
 

Protected Attributes

const Module * m_master
 The master module that determines the experiment/run/event number.
 
ModulePtrList m_moduleList
 List of all modules in the order in which they were initialized.
 
std::string m_profileModuleName
 Name of the module which should be profiled, empty if no profiling is requested.
 
Module * m_profileModule = nullptr
 Address of the module which we want to profile, nullptr if no profiling is requested.
 
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
 EventMetaData is used by processEvent()/processCore().
 
EventMetaData m_previousEventMetaData
 Stores state of EventMetaData before it was last changed.
 
StoreObjPtr< EventExtraInfo > m_eventExtraInfo
 Event extra info object pointer.
 
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
 Also used in a number of places.
 
bool m_inRun
 Are we currently in a run?
 
double m_lastMetadataUpdate
 Time in seconds of last call for metadata update in event loop.
 
double m_metadataUpdateInterval
 Minimal time difference in seconds for metadata updates in event loop.
 
bool m_steerRootInputModuleOn = false
 True if the SteerRootInputModule is in charge of event processing.
 

Private Member Functions

void sendTerminatedMessage (unsigned int pid, bool waitForConfirmation)
 Send an unregister message to all sockets if the given PID died. Wait at most 10 s for the confirmation (if requested).
 
void runWorkers (PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName=false)
 Fork out as many workers as requested and, in each of them, run the given path using processCore.
 
void processCore (PathPtr path)
 Process the path by basically calling processEvent until a termination is requested.
 
bool processEvent (PathIterator moduleIter, bool firstRound)
 Process a single event by iterating through the module path once.
 
std::pair< unsigned int, unsigned int > checkChildProcesses ()
 Check if one of the started processes has died.
 
void release ()
 Release the parent resource, which is needed after forking to not close it twice.
 
bool forkOut ()
 Helper function to fork out. Sets the Python state correctly and adds the process to the internal state.
 

Private Attributes

ZMQParent m_parent
 An instance of a ZMQParent to create sockets for unregistering workers.
 
std::vector< int > m_processList
 The current list of running processes (with their PIDs)
 
std::vector< std::unique_ptr< zmq::socket_t > > m_sockets
 The created sockets for unregistering workers. TODO: use connections.
 

Detailed Description

EventProcessor to be used on the HLT with all specialities of the HLT processing:

  • no input or output path - just workers
  • multiprocessing forked out after initialization, which is before the first real event is processed
  • restart of terminated workers (configurable) and unregistration at the DQM server and collector
  • special run start/end handling: please check the HLTZMQ2DsModule for details
  • check of the child processes in an additional monitoring process (happens to be the parent process) by keeping track of the current state

This event processor is specialized for the HLT and should not be used elsewhere. The event processor is exported as a Python module. It can be called with

import hbasf2
hbasf2.process(path, [address, ...], True)

or, if you want to put a number on each forked process,

hbasf2.processNumbered(path, [address, ...], True(, True))

Definition at line 37 of file HLTEventProcessor.h.

Constructor & Destructor Documentation

◆ HLTEventProcessor()

HLTEventProcessor ( const std::vector< std::string > & outputAddresses)

Create a new event processor and store the ZMQ addresses where to unregister workers.

Definition at line 75 of file HLTEventProcessor.cc.

76{
77 m_sockets.reserve(outputAddresses.size());
78 for (const auto& address : outputAddresses) {
79 m_sockets.push_back(m_parent.createSocket<ZMQ_DEALER>(address, false));
80 }
81}

Member Function Documentation

◆ callEvent()

void callEvent ( Module * module)
protected, inherited

Calls event() on one single module, setting up logging and statistics as needed.

Parameters
module: Module to call the event() function

Definition at line 226 of file EventProcessor.cc.

227{
228 LogSystem& logSystem = LogSystem::Instance();
229 const bool collectStats = Environment::Instance().getStats();
230 // set up logging
231 logSystem.updateModule(&(module->getLogConfig()), module->getName());
232 // set up statistics is requested
233 if (collectStats) m_processStatisticsPtr->startModule();
234 // call module
235 CALL_MODULE(module, event);
236 // stop timing
237 if (collectStats) m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_Event);
238 // reset logging
239 logSystem.updateModule(nullptr);
240};

◆ checkChildProcesses()

std::pair< unsigned int, unsigned int > checkChildProcesses ( )
private

Check if one of the started processes has died.

If it has died with a non-zero exit code, increase the counter of workers to restart.

Returns a pair: the number of workers still alive and the number of workers that need to be restarted (that is, the workers that died with a non-zero exit code).

Definition at line 395 of file HLTEventProcessor.cc.

396{
397 unsigned int needToRestart = 0;
398
399 // Check for processes, which where there last time but are gone now (so they died)
400 for (auto iter = m_processList.begin(); iter != m_processList.end();) {
401 const auto& pid = *iter;
402
403 // check the status of this process pid
404 int status;
405 const int result = waitpid(pid, &status, WNOHANG);
406 if (result == -1) {
407 if (errno == EINTR) {
408 // interrupted, try again next time
409 ++iter;
410 continue;
411 } else {
412 B2FATAL("waitpid() failed.");
413 }
414 } else if (result == 0) {
415 // No change, so lets continue with the next worker
416 ++iter;
417 continue;
418 }
419
420 B2ASSERT("Do not understand the result of waitpid()", result == pid);
421
422 // state has changed, which means it is dead!
423 const auto exitCode = WEXITSTATUS(status);
424
425 // we only need to restart unexpected deads
426 if (exitCode != 0) {
427 B2WARNING("A worker process has died unexpected!");
428 needToRestart += 1;
429
430 sendTerminatedMessage(pid, true);
431 }
432
433 // once a process is gone from the global list, remove them from our own, too.
434 iter = m_processList.erase(iter);
435 }
436
437 return {m_processList.size(), needToRestart};
438}
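
The polling pattern in the listing above can be tried in isolation. The following is a minimal sketch, not the basf2 implementation: `pollChildren` is a hypothetical free function, it replaces the basf2 logging macros with a plain `assert`, and it additionally checks `WIFEXITED` before reading the exit code, which the listing above does not show. A PID for which `waitpid()` reports any other error is simply dropped here instead of aborting.

```cpp
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <utility>
#include <vector>

// Hypothetical helper (not part of basf2): poll every PID once without blocking.
// Returns {number of children still alive, number that ended abnormally}.
// PIDs of finished children are removed from 'pids'.
std::pair<unsigned int, unsigned int> pollChildren(std::vector<pid_t>& pids)
{
  unsigned int failed = 0;
  for (auto it = pids.begin(); it != pids.end();) {
    int status = 0;
    const pid_t result = waitpid(*it, &status, WNOHANG);
    if (result == 0 || (result == -1 && errno == EINTR)) {
      // Still running, or the call was interrupted: look again next time.
      ++it;
      continue;
    }
    // waitpid() on a single PID can only return that PID, 0, or -1.
    assert(result == *it || result == -1);
    if (result == *it) {
      // Assumption of this sketch: only a normal exit with code 0 is "clean";
      // a non-zero exit code or death by signal counts as a failure.
      const bool cleanExit = WIFEXITED(status) && WEXITSTATUS(status) == 0;
      if (!cleanExit) ++failed;
    }
    // Finished (or no longer waitable): forget this PID.
    it = pids.erase(it);
  }
  return {static_cast<unsigned int>(pids.size()), failed};
}
```

A caller would invoke this helper repeatedly from its monitoring loop, sleeping briefly between calls, and add up the second element of the returned pair to learn how many workers need a restart.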

◆ forkOut()

bool forkOut ( )
private

Helper function to fork out. Sets the Python state correctly and adds the process to the internal state.

Definition at line 448 of file HLTEventProcessor.cc.

449{
450 fflush(stdout);
451 fflush(stderr);
452
453 pid_t pid = fork();
454
455 if (pid > 0) {
456 g_processNumber++;
457 m_processList.push_back(pid);
458 return false;
459 } else if (pid < 0) {
460 B2FATAL("fork() failed: " << strerror(errno));
461 } else {
462 // Child process
463 // Reset some python state: signals, threads, gil in the child
464 PyOS_AfterFork_Child();
465 // InputController becomes useless in child process
467 // die when parent dies
468 prctl(PR_SET_PDEATHSIG, SIGHUP);
469
471 return true;
472 }
473}
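
The fork pattern above (flush, fork, record the child in the parent, arm a parent-death signal in the child) can be sketched without any basf2 or Python dependencies. This is an illustration under stated assumptions: `forkWorker` and `waitForExitCode` are hypothetical names, the Python and InputController steps of the listing are left out, a failed `fork()` is reported instead of being fatal, and `prctl(PR_SET_PDEATHSIG, ...)` is Linux-specific.

```cpp
#include <sys/prctl.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <cassert>
#include <csignal>
#include <cstdio>
#include <vector>

// Hypothetical helper: returns true in the child and false in the parent.
// If fork() fails, it prints the reason, returns false and leaves 'children' untouched.
bool forkWorker(std::vector<pid_t>& children)
{
  // Flush buffered output so it is not written twice (once per process).
  std::fflush(stdout);
  std::fflush(stderr);

  const pid_t pid = fork();
  if (pid > 0) {
    children.push_back(pid);  // parent: remember the new worker
    return false;
  }
  if (pid < 0) {
    std::perror("fork");
    return false;
  }
  // Child: ask the kernel (Linux only) to send SIGHUP when the parent dies.
  prctl(PR_SET_PDEATHSIG, SIGHUP);
  return true;
}

// Hypothetical helper: blocks until 'pid' ends and returns its exit code,
// or -1 if the child did not exit normally.
int waitForExitCode(pid_t pid)
{
  assert(pid > 0);  // 0 and -1 would mean "any child" to waitpid()
  int status = 0;
  if (waitpid(pid, &status, 0) != pid) return -1;
  return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

In the parent, a `false` return value means "continue supervising"; in the child, a `true` return value is the cue to run the worker payload and then leave with `_exit()`.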

◆ getMaximumEventNumber()

long getMaximumEventNumber ( long maxEvent) const
protected, inherited

Calculate the maximum event number out of the argument from command line and the environment.

Definition at line 113 of file EventProcessor.cc.

114{
115 //Check whether the number of events was set via command line argument
116 unsigned int numEventsArgument = Environment::Instance().getNumberEventsOverride();
117 if ((numEventsArgument > 0) && ((maxEvent == 0) || (maxEvent > numEventsArgument))) {
118 return numEventsArgument;
119 }
120 return maxEvent;
121}
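
The rule in the listing above is easiest to check as a pure function. The sketch below is a hypothetical stand-alone version: `effectiveMaxEvents` is not a basf2 function, and its second parameter stands in for the value returned by `Environment::Instance().getNumberEventsOverride()`.

```cpp
// Hypothetical free function mirroring the rule in the listing above.
// 'overrideCount' stands in for Environment::getNumberEventsOverride().
long effectiveMaxEvents(long maxEvent, unsigned int overrideCount)
{
  // Convert explicitly so the comparison below is between two signed values.
  const long overrideAsLong = static_cast<long>(overrideCount);
  // The override wins when it is set and either no limit was given (0)
  // or the given limit is larger than the override.
  if (overrideAsLong > 0 && (maxEvent == 0 || maxEvent > overrideAsLong)) {
    return overrideAsLong;
  }
  return maxEvent;
}
```

With this rule, `effectiveMaxEvents(200, 100)` and `effectiveMaxEvents(0, 100)` both yield 100, while `effectiveMaxEvents(50, 100)` keeps the smaller limit of 50.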

◆ installMainSignalHandlers()

void installMainSignalHandlers ( void(* fn )(int) = nullptr)
static, inherited

Install signal handler for INT, TERM and QUIT signals.

If argument is NULL, EventProcessor's own signal handler will be installed.

Definition at line 315 of file EventProcessor.cc.

316{
317 if (!fn)
318 fn = signalHandler;
319 installSignalHandler(SIGINT, fn);
320 installSignalHandler(SIGTERM, fn);
321 installSignalHandler(SIGQUIT, fn);
322}

◆ installSignalHandler()

void installSignalHandler ( int sig,
void(* fn )(int) )
static, inherited

Install a signal handler 'fn' for given signal.

Definition at line 300 of file EventProcessor.cc.

301{
302 struct sigaction s;
303 memset(&s, '\0', sizeof(s));
304
305 s.sa_handler = fn;
306 sigemptyset(&s.sa_mask);
307 if (sig == SIGCHLD)
308 s.sa_flags |= SA_NOCLDSTOP; //don't produce signal when children are stopped
309
310 if (sigaction(sig, &s, nullptr) != 0) {
311 B2FATAL("Cannot setup signal handler for signal " << sig);
312 }
313}
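
The same `sigaction()` setup can be exercised outside basf2. The sketch below is illustrative only: `installHandler`, `recordSignal` and `g_lastSignal` are hypothetical names, and the function returns a success flag where the listing above aborts with a fatal log message.

```cpp
#include <signal.h>
#include <csignal>
#include <cstring>

// Flag written by the handler (hypothetical name). sig_atomic_t is the object
// type that may safely be written from inside a signal handler.
volatile std::sig_atomic_t g_lastSignal = 0;

// Minimal handler: only records which signal arrived.
extern "C" void recordSignal(int sig)
{
  g_lastSignal = sig;
}

// Hypothetical counterpart of the listing above: zeroed struct, empty mask,
// SA_NOCLDSTOP only for SIGCHLD. Returns true if the handler was installed.
bool installHandler(int sig, void (*fn)(int))
{
  struct sigaction action;
  std::memset(&action, 0, sizeof(action));
  action.sa_handler = fn;
  sigemptyset(&action.sa_mask);
  if (sig == SIGCHLD) {
    action.sa_flags |= SA_NOCLDSTOP;  // no signal when children are merely stopped
  }
  return sigaction(sig, &action, nullptr) == 0;
}
```

After `installHandler(SIGUSR1, recordSignal)`, a call to `raise(SIGUSR1)` in a single-threaded program runs the handler before `raise()` returns, so `g_lastSignal` can be inspected right away.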

◆ process() [1/2]

void process ( const PathPtr & startPath,
long maxEvent = 0 )
inherited

Processes the full module chain, starting with the first module in the given path.

Processes all events for the given run number and for events from 0 to maxEvent. If maxEvent is smaller than or equal to 0, the maximum number check is disabled and all events are processed. If runNumber is smaller than 0, the run number has to be set externally by a module and the given number is not used.

Parameters
startPath: The processing starts with the first module of this path.
maxEvent: Optional. The maximum number of events that will be processed. If the number is smaller than or equal to 0, all events will be processed.

Definition at line 123 of file EventProcessor.cc.

124{
125 maxEvent = getMaximumEventNumber(maxEvent);
126 // Make sure the NumberEventsOverride reflects the actual number if
127 // process(path, N) was used instead of -n and that it's reset to what it was
128 // after we're done with processing()
129 NumberEventsOverrideGuard numberOfEventsOverrideGuard(maxEvent);
130
131 //Get list of modules which could be executed during the data processing.
132 ModulePtrList moduleList = startPath->buildModulePathList();
133
134 //Find the address of the module we want to profile
135 if (!m_profileModuleName.empty()) {
136 for (const auto& module : moduleList) {
137 if (module->getName() == m_profileModuleName) {
138 m_profileModule = module.get();
139 break;
140 }
141 }
142 if (!m_profileModule)
143 B2FATAL("Module profiling was requested via --profile, but no module '" << m_profileModuleName << "' was found!");
144 }
145
146 //Initialize modules
147 processInitialize(moduleList);
148
149 // SteerRootInputModule might have changed the number of events to be processed
150 for (const auto& module : moduleList) {
151 if (module->getType() == "SteerRootInput") {
152 if (maxEvent != Environment::Instance().getNumberEventsOverride()) {
153 B2INFO("Module 'SteerRootInputModule' is controlling the number of processed events.");
156 }
157 break;
158 }
159 }
160
161 //do we want to visualize DataStore input/output?
162 if (Environment::Instance().getVisualizeDataFlow()) {
163 DataFlowVisualization v(&DataStore::Instance().getDependencyMap());
164 v.visualizePath("dataflow.dot", *startPath);
165 }
166
167 //Don't start processing in case of no master module
168 if (!m_master) {
169 B2ERROR("There is no module that provides event and run numbers (EventMetaData). You must add either the EventInfoSetter or an input module (e.g. RootInput) to the beginning of your path.");
170 }
171
172 //Check if errors appeared. If yes, don't start the event processing.
174 if ((numLogError == 0) && m_master) {
176 try {
177 processCore(startPath, moduleList, maxEvent); //Do the event processing
178 } catch (StoppedBySignalException& e) {
179 if (e.signal != SIGINT) {
180 // close all open ROOT files, ROOT's exit handler will crash otherwise
181 gROOT->GetListOfFiles()->Delete();
182
183 B2FATAL(e.what());
184 }
185 //in case of SIGINT, we move on to processTerminate() to shut down safely
186 } catch (...) {
188 B2ERROR("Exception occurred in exp/run/evt: "
189 << m_eventMetaDataPtr->getExperiment() << " / "
190 << m_eventMetaDataPtr->getRun() << " / "
191 << m_eventMetaDataPtr->getEvent());
192 throw;
193 }
194
195 } else {
196 B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
197 }
198
199 //Terminate modules
200 processTerminate(moduleList);
201
203
204 if (gSignalReceived == SIGINT) {
205 const auto msg = R"(Processing aborted via SIGINT, terminating.
206 Output files have been closed safely and should be readable. However
207 processing was NOT COMPLETE. The output files do contain only events
208 processed until this point.)";
210 B2ERROR(msg
211 << LogVar("last_experiment", m_eventMetaDataPtr->getExperiment())
212 << LogVar("last_run", m_eventMetaDataPtr->getRun())
213 << LogVar("last_event", m_eventMetaDataPtr->getEvent()));
214 else
215 B2ERROR(msg);
216 installSignalHandler(SIGINT, SIG_DFL);
217 raise(SIGINT);
218 }
219}

◆ process() [2/2]

void process ( PathPtr spath,
bool restartFailedWorkers,
bool appendProcessNumberToModuleName = false )

Process the given path.

Processes the path and, if requested, restarts failed workers. The steps are:

  • first check if all modules have a parallel flag
  • then call initialize in the main process
  • fork out all workers and process the path
  • while monitoring their status
  • in the end kill remaining processes (if needed) and re-raise any collected signals

Definition at line 83 of file HLTEventProcessor.cc.

84{
85 using namespace std::chrono_literals;
86
87 m_moduleList = path->buildModulePathList();
88
89 // Assert path is what we want: fully parallel certified, not empty. Set first module to master module
90 B2ASSERT("You try to process an empty path!", not m_moduleList.empty());
91 for (const auto& module : m_moduleList) {
92 bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
93 // entire conditional path must also be compatible
94 if (hasParallelFlag and module->hasCondition()) {
95 for (const auto& conditionPath : module->getAllConditionPaths()) {
97 hasParallelFlag = false;
98 }
99 }
100 }
101 B2ASSERT("Module with name " << module->getName() << " does not have parallel flag!", hasParallelFlag);
102 }
103
104 // Initialize of all modules (including event() of master module)
107
108 // Don't start processing in case of no master module
109 if (not m_master) {
110 B2ERROR("There is no module that provides event and run numbers (EventMetaData). "
111 "You must add the specific HLT module as first module to the path.");
112 }
113
114 // Check if errors appeared. If yes, don't start the event processing.
116 if (numLogError != 0) {
117 B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
118 }
119
120 // Start the workers, which call the main loop
121 const int numProcesses = Environment::Instance().getNumberProcesses();
122 runWorkers(path, numProcesses, appendProcessNumberToModuleName);
123
124 installMainSignalHandlers(storeSignal);
125 // Back in the main process: wait for the processes and monitor them
126 int numberOfRestartedWorkers = 0;
127 while (true) {
128 // check if we have received any signal from the user or OS.
129 // Killing of the remaining processes happens after the loop.
130 if (g_signalReceived > 0) {
131 B2WARNING("Received a signal to go down.");
132 break;
133 }
134
135 // Test if we need more workers and if one has died
136 unsigned int presentWorkers;
137 unsigned int neededWorkers;
138
139 std::tie(presentWorkers, neededWorkers) = checkChildProcesses();
140 if (neededWorkers > 0) {
141 if (restartFailedWorkers) {
142 runWorkers(path, neededWorkers);
143 numberOfRestartedWorkers += neededWorkers;
144 } else {
145 B2ERROR("A worker failed. Will try to end the process smoothly now.");
146 break;
147 }
148 } else if (presentWorkers == 0) {
149 B2DEBUG(10, "All workers have cleanly exited. Will now also exit");
150 break;
151 }
152
153 if (numberOfRestartedWorkers > numProcesses) {
154 B2ERROR("I needed to restart on total " << numberOfRestartedWorkers << ", which I think is abnormal. "
155 "Will terminate the process now!");
156 break;
157 }
158
159 std::this_thread::sleep_for(10ms);
160 }
161
162 if (appendProcessNumberToModuleName) {
163 for (const int& pid : m_processList) {
164 B2INFO(g_processNumber << ": Send SIGINT to " << pid);
165 kill(pid, SIGINT);
166 }
167 for (const int& pid : m_processList) {
168 int count = 0;
169 while (true) {
170 // Do not allow internal SIGKILL to prevent data loss in case of a numbered process
171 if (kill(pid, 0) != 0) {
172 break;
173 }
174 B2DEBUG(10, g_processNumber << ": Checking process termination, count = " << count);
175 std::this_thread::sleep_for(1000ms);
176 if (count % 5 == 1) kill(pid, SIGINT);
177 ++count;
178 }
179 }
180 }
181
183
184 // if we still have/had processes, we should unregister them
185 std::this_thread::sleep_for(500ms);
186
187 for (const int& pid : m_processList) {
188 if (kill(pid, SIGKILL) >= 0) {
189 B2WARNING("Needed to hard kill process " << pid);
190 } else {
191 B2DEBUG(100, "no process " << pid << " found, already gone?");
192 }
193 sendTerminatedMessage(pid, false);
194 }
195 m_processList.clear();
196
197 B2DEBUG(10, "Done here");
198
199 // Normally, we would call terminate here, but not on HLT!
200 // Normally, we would print the error summary here, but not on HLT!
201 if (g_signalReceived == SIGINT) {
202 installSignalHandler(SIGINT, SIG_DFL);
203 raise(SIGINT);
204 }
205}
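
The decision taken in each iteration of the monitoring loop above can be written as a small pure function, which makes the branch order easier to follow. This is a hedged sketch: `MonitorAction`, `MonitorState` and `decide` are hypothetical names that do not exist in basf2. One simplification to note: the listing restarts the workers first and only then breaks out when the restart budget is exceeded, whereas this sketch reports the stop directly.

```cpp
// Hypothetical result type: what the parent should do after one check.
enum class MonitorAction {
  KeepWaiting,          // nothing happened, sleep and check again
  RestartWorkers,       // start 'neededWorkers' replacements
  StopOnFailure,        // a worker failed and restarts are disabled
  StopAllDone,          // every worker exited cleanly
  StopTooManyRestarts   // more restarts than configured processes
};

// Hypothetical bookkeeping for the loop.
struct MonitorState {
  bool restartFailedWorkers;     // second argument of process()
  int numProcesses;              // configured number of workers
  int numberOfRestartedWorkers;  // running total of restarts
};

// Mirrors the branch order of the loop body in the listing above.
MonitorAction decide(MonitorState& state, unsigned int presentWorkers, unsigned int neededWorkers)
{
  MonitorAction action = MonitorAction::KeepWaiting;
  if (neededWorkers > 0) {
    if (!state.restartFailedWorkers) return MonitorAction::StopOnFailure;
    state.numberOfRestartedWorkers += static_cast<int>(neededWorkers);
    action = MonitorAction::RestartWorkers;
  } else if (presentWorkers == 0) {
    return MonitorAction::StopAllDone;
  }
  // The restart budget is checked after the restart has been counted.
  if (state.numberOfRestartedWorkers > state.numProcesses) {
    return MonitorAction::StopTooManyRestarts;
  }
  return action;
}
```

Keeping the decision separate from the side effects (forking, sleeping, sending signals) is a design choice of this sketch; it allows the restart budget to be checked without starting any process.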

◆ processBeginRun()

void processBeginRun ( bool skipDB = false)
protected, inherited

Calls the begin run methods of all modules.

Loops over all module instances specified in a list and calls their beginRun() method. Please note: the beginRun() method of the module which triggered the beginRun() loop will also be called.

Definition at line 470 of file EventProcessor.cc.

471{
472 MetadataService::Instance().addBasf2Status("beginning run");
473
474 m_inRun = true;
475 auto dbsession = Database::Instance().createScopedUpdateSession(); // cppcheck-suppress unreadVariable
476
477 LogSystem& logSystem = LogSystem::Instance();
478 m_processStatisticsPtr->startGlobal();
479
480 if (!skipDB) DBStore::Instance().update();
481
482 //initialize random generator for end run
484
485 for (const ModulePtr& modPtr : m_moduleList) {
486 Module* module = modPtr.get();
487
488 //Set the module dependent log level
489 logSystem.updateModule(&(module->getLogConfig()), module->getName());
490
491 //Do beginRun() call
492 m_processStatisticsPtr->startModule();
493 CALL_MODULE(module, beginRun);
495
496 //Set the global log level
497 logSystem.updateModule(nullptr);
498 }
499
501}

◆ processCore() [1/2]

void processCore ( const PathPtr & startPath,
const ModulePtrList & modulePathList,
long maxEvent = 0,
bool isInputProcess = true )
protected, inherited

Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.

Parameters
startPath: The processing starts with the first module of this path.
modulePathList: A list of all modules which could be executed during the data processing (used for calling the beginRun() and endRun() method).
maxEvent: The maximum number of events that will be processed. If the number is smaller than or equal to 0, all events are processed.
isInputProcess: true when this is either the only or the input process

Definition at line 409 of file EventProcessor.cc.

410{
412 m_moduleList = modulePathList;
413 //Remember the previous event meta data, and identify end of data meta data
414 m_previousEventMetaData.setEndOfData(); //invalid start state
415
416 const bool collectStats = Environment::Instance().getStats();
417
418 //Loop over the events
419 long currEvent = 0;
420 bool endProcess = false;
421 while (!endProcess) {
422 if (collectStats)
423 m_processStatisticsPtr->startGlobal();
424
425 PathIterator moduleIter(startPath);
426 endProcess = processEvent(moduleIter, isInputProcess && currEvent == 0);
427
428 //Delete event related data in DataStore
430
431 currEvent++;
432 if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess = true;
433 if (collectStats)
435 } //end event loop
436
437 //End last run
438 m_eventMetaDataPtr.create();
440}
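
The termination rule of the event loop above (stop when the per-event call asks for it, or when a positive maxEvent has been reached) can be isolated as follows. This is a simplified sketch: `runEventLoop` is a hypothetical function, the callback stands in for `processEvent()`, and statistics, DataStore handling and the end-of-run call are left out.

```cpp
#include <cassert>
#include <functional>

// Hypothetical stand-in for the loop in the listing above.
// 'processOneEvent' receives the zero-based event counter and returns true
// when processing should end. Returns the number of loop iterations.
long runEventLoop(const std::function<bool(long)>& processOneEvent, long maxEvent)
{
  assert(processOneEvent);  // an empty std::function must not be called
  long currEvent = 0;
  bool endProcess = false;
  while (!endProcess) {
    endProcess = processOneEvent(currEvent);
    ++currEvent;
    // A limit of 0 (or less) disables the maximum number check.
    if (maxEvent > 0 && currEvent >= maxEvent) endProcess = true;
  }
  return currEvent;
}
```

A callback that never requests termination runs exactly `maxEvent` times when the limit is positive, and the iteration in which the callback returns true is still counted.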

◆ processCore() [2/2]

void processCore ( PathPtr path)
private

Process the path by basically calling processEvent until a termination is requested.

Does not perform any initialization; it is assumed that this has already happened. At the end, terminate is called.

Definition at line 248 of file HLTEventProcessor.cc.

249{
250 bool terminationRequested = false;
251 bool firstRound = true;
252
253 // Initialisation is done
255
256 // Set the previous event meta data to something invalid
258
259 while (not terminationRequested) {
260 B2DEBUG(100, "Processing new event");
261
262 // Start the measurement
263 m_processStatisticsPtr->startGlobal();
264
265 // Main call to event() of the modules, and maybe beginRun() and endRun()
266 PathIterator moduleIter(path);
267 terminationRequested = processEvent(moduleIter, firstRound);
268
269 // Delete event related data in DataStore
271
272 // Stop the measurement
274
275 // We are surely not in the first round the next time
276 firstRound = false;
277 }
278
279 // End last run with a terminate. Yes, we are not calling a endRun() here and yes, we are calling this in the worker
280 B2DEBUG(10, "Calling terminate");
281 m_eventMetaDataPtr.create();
283}

◆ processEndRun()

void processEndRun ( )
protected, inherited

Calls the end run methods of all modules.

Loops over all module instances specified in a list and calls their endRun() method. Please note: the endRun() method of the module which triggered the endRun() loop will also be called.

Definition at line 504 of file EventProcessor.cc.

505{
507
508 if (!m_inRun)
509 return;
510 m_inRun = false;
511
512 LogSystem& logSystem = LogSystem::Instance();
513 m_processStatisticsPtr->startGlobal();
514
515 const EventMetaData newEventMetaData = *m_eventMetaDataPtr;
517
518 //initialize random generator for end run
520
521 for (const ModulePtr& modPtr : m_moduleList) {
522 Module* module = modPtr.get();
523
524 //Set the module dependent log level
525 logSystem.updateModule(&(module->getLogConfig()), module->getName());
526
527 //Do endRun() call
528 m_processStatisticsPtr->startModule();
529 CALL_MODULE(module, endRun);
531
532 //Set the global log level
533 logSystem.updateModule(nullptr);
534 }
535 *m_eventMetaDataPtr = newEventMetaData;
536
538}

◆ processEvent()

bool processEvent ( PathIterator moduleIter,
bool firstRound )
private

Process a single event by iterating through the module path once.

In principle very similar to the EventProcessor::processEvent function, but with different assumptions about the run changes induced by the master module (which is always the ZMQ2Ds module in the HLT case).

The logic after the master module has run is the following:

  • if an EndOfData is set in the event meta data, just break out
  • if a HLT-specific EndOfRun is set, call the end of run methods of all modules (without begin run)
  • if the previous event meta data has the EndOfRun/Data set, call the begin run functions
  • if the run change was induced due to a changing run number, call begin and end run functions

The rest (e.g. module conditions, db store) is the same as the EventProcessor case.

Definition at line 285 of file HLTEventProcessor.cc.

286{
287 while (not moduleIter.isDone()) {
288 Module* module = moduleIter.get();
289 B2DEBUG(10, "Starting event of " << module->getName());
290
291 // The actual call of the event function
292 if (module != m_master) {
293 // If this is not the master module it is quite simple: just call the event function
294 callEvent(module);
295
296 // Check for a second master module. Cannot do this if we are in the first round after initialize
297 // (as previous event meta data is not set properly here)
300 B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and "
301 << module->getName());
302 }
303 } else {
304 if (not firstRound) {
305 // Also call the event function for the master, but not the first time
306 callEvent(module);
307 }
308 // initialize random number state for the event handling after we have
309 // received the event information from the master module.
311 }
312
313 if (g_signalReceived != 0) {
314 if (g_signalReceived != SIGINT) {
315 throw StoppedBySignalException(g_signalReceived);
316 } else {
317 B2DEBUG(10, "Received a SIGINT in the worker process...");
318 return true;
319 }
320 }
321
322 B2ASSERT("The event meta data must always be valid at this stage!", m_eventMetaDataPtr and m_eventMetaDataPtr.isValid());
323
324 if (m_eventMetaDataPtr->isEndOfData()) {
325 // Immediately leave the loop and terminate (true)
326 return true;
327 }
328
329 if (module == m_master and not firstRound) {
330 if (m_eventMetaDataPtr->isEndOfRun()) {
331 B2DEBUG(10, "Calling endRun()");
332 // call endRun() of all modules (internally uses the previous event meta data) and skip to the next event
333 m_processStatisticsPtr->suspendGlobal();
334 m_inRun = true;
336 m_processStatisticsPtr->resumeGlobal();
337
338 // Store the current event meta data for the next round
340
341 // Leave this event, but not the full processing (false)
342 return false;
344 // The run has changed (or we never had one), so call beginRun() before going on
345 // The run number should not be 0
346 if (m_eventMetaDataPtr->getRun() != 0) {
347 m_processStatisticsPtr->suspendGlobal();
349 m_processStatisticsPtr->resumeGlobal();
350 } else {
351 return false;
352 }
353 }
354
355 const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
357 const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
359 if (runChangedWithoutNotice) {
360 m_processStatisticsPtr->suspendGlobal();
361
362 m_inRun = true;
365
366 m_processStatisticsPtr->resumeGlobal();
367 }
368
369 // make sure we use the event dependent generator again
371
372 // and the correct database
374
375 // Store the current event meta data for the next round
377 }
378
379 // Check for the module conditions, evaluate them and if one is true, switch to the new path
380 if (module->evalCondition()) {
381 PathPtr condPath = module->getConditionPath();
382 // continue with parent Path after condition path is executed?
383 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
384 moduleIter = PathIterator(condPath, moduleIter);
385 } else {
386 moduleIter = PathIterator(condPath);
387 }
388 } else {
389 moduleIter.next();
390 }
391 }
392 return false;
393}

◆ processInitialize()

void processInitialize ( const ModulePtrList & modulePathList,
bool setEventInfo = true )
protected inherited

Initializes the modules.

Loops over all module instances specified in a list and calls their initialize() method.

Parameters
modulePathList    A list of all modules which could be executed during the data processing.
setEventInfo    If true, event() of the master module is called immediately, so that the event info is loaded right away and is available to subsequent modules.

Definition at line 242 of file EventProcessor.cc.

243{
244 LogSystem& logSystem = LogSystem::Instance();
246
247 m_processStatisticsPtr.registerInDataStore();
248 //TODO I might want to overwrite it in initialize (e.g. if read from file)
249 // For parallel processing or subevents, I don't want that, though.
250 // Maybe make this a function argument?
252 m_processStatisticsPtr.create();
253 m_processStatisticsPtr->startGlobal();
254
256
257 // EventExtraInfo is needed in several modules so register it here
258 m_eventExtraInfo.registerInDataStore();
259
260 for (const ModulePtr& modPtr : modulePathList) {
261 Module* module = modPtr.get();
262
263 if (module->hasUnsetForcedParams()) {
264 //error message was printed by module
265 continue;
266 }
267
268 //Set the module dependent log level
269 logSystem.updateModule(&(module->getLogConfig()), module->getName());
271
272 //Do initialization
273 m_processStatisticsPtr->initModule(module);
274 m_processStatisticsPtr->startModule();
275 CALL_MODULE(module, initialize);
277
278 //Set the global log level
279 logSystem.updateModule(nullptr);
280
281 //Check whether this is the master module
282 if (!m_master && DataStore::Instance().getEntry(m_eventMetaDataPtr) != nullptr) {
283 B2DEBUG(100, "Found module providing EventMetaData: " << module->getName());
284 m_master = module;
285 if (setEventInfo) {
286 callEvent(module);
287 // update Database payloads: we now have valid event meta data unless
288 // we don't process any events
290 }
291 }
292
293 if (gSignalReceived != 0) {
294 throw StoppedBySignalException(gSignalReceived);
295 }
296 }
298}

◆ processTerminate()

void processTerminate ( const ModulePtrList & modulePathList)
protected inherited

Terminates the modules.

Loops in reverse order over all module instances specified in a list and calls their terminate() method.

Parameters
modulePathList    A list of all modules which could be executed during the data processing.

Definition at line 443 of file EventProcessor.cc.

444{
446
447 LogSystem& logSystem = LogSystem::Instance();
448 ModulePtrList::const_reverse_iterator listIter;
449 m_processStatisticsPtr->startGlobal();
450
451 for (listIter = modulePathList.rbegin(); listIter != modulePathList.rend(); ++listIter) {
452 Module* module = listIter->get();
453
454 //Set the module dependent log level
455 logSystem.updateModule(&(module->getLogConfig()), module->getName());
456
457 //Do termination
458 m_processStatisticsPtr->startModule();
459 CALL_MODULE(module, terminate);
461
462 //Set the global log level
463 logSystem.updateModule(nullptr);
464 }
465
467}
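
The reverse-order loop in the listing above can be reduced to the following minimal sketch. `SketchModule`, `SketchModuleList` and `terminateInReverse` are hypothetical names chosen for this illustration; logging and statistics handling are left out, and a `std::vector` is used as an assumption in place of the framework's list type.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical minimal module type; the real Module class is much richer.
struct SketchModule {
  std::string name;
  std::vector<std::string>* log;  // records the order of terminate() calls
  void terminate() { log->push_back(name); }
};

using SketchModuleList = std::vector<std::shared_ptr<SketchModule>>;

// Call terminate() on every module, starting with the last one in the list.
void terminateInReverse(const SketchModuleList& modules)
{
  for (auto it = modules.rbegin(); it != modules.rend(); ++it) {
    (*it)->terminate();
  }
}
```

Terminating in reverse order means that a module is shut down before the modules that were initialized ahead of it, which mirrors the usual construction and destruction order of dependent resources.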

◆ release()

void release ( )
private

Release the parent resource; this is needed after forking so that it is not closed twice.

Definition at line 440 of file HLTEventProcessor.cc.

441{
442 for (auto& socket : m_sockets) {
443 socket.release();
444 }
445 m_parent.reset();
446}
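
Since m_sockets holds std::unique_ptr objects, the `socket.release()` call above gives up ownership without running the destructor. The sketch below illustrates that behaviour with a hypothetical `SketchResource` type and a hypothetical helper `abandon`; it does not use ZMQ.

```cpp
#include <cassert>
#include <memory>

// Hypothetical resource that counts how often it is closed (destroyed).
struct SketchResource {
  int* closeCount;
  ~SketchResource() { ++(*closeCount); }
};

// Give up ownership without running the destructor.
// The returned raw pointer is no longer managed by `owner`.
SketchResource* abandon(std::unique_ptr<SketchResource>& owner)
{
  return owner.release();
}
```

After `abandon`, resetting or destroying the `std::unique_ptr` no longer closes the resource, which is the property a forked child relies on when the parent still owns the underlying connection.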

◆ runWorkers()

void runWorkers ( PathPtr path,
unsigned int numProcesses,
bool appendProcessNumberToModuleName = false )
private

Fork out as many workers as requested and, in each of them, run the given path using processCore.

Definition at line 207 of file HLTEventProcessor.cc.

208{
209 for (unsigned int i = 0; i < numProcesses; i++) {
210 if (forkOut()) {
211 // Do only run in forked out worker process:
212 B2DEBUG(10, "Starting a new worker process");
213 // Reset the parent and sockets
214 release();
215
216 // Start the main loop with our signal handling and error catching
217 installMainSignalHandlers(storeSignal);
218 try {
219 if (appendProcessNumberToModuleName) {
220 for (const auto& module : m_moduleList) {
221 module->setName(std::to_string(g_processNumber) + std::string("_") + module->getName());
222 B2INFO("New worker name is " << module->getName());
223 }
224 }
225 processCore(path);
226 } catch (StoppedBySignalException& e) {
227 // close all open ROOT files, ROOT's exit handler will crash otherwise
228 gROOT->GetListOfFiles()->Delete();
229
230 B2ERROR(e.what());
231 exit(1);
232 } catch (...) {
234 B2ERROR("Exception occurred in exp/run/evt: "
235 << m_eventMetaDataPtr->getExperiment() << " / "
236 << m_eventMetaDataPtr->getRun() << " / "
237 << m_eventMetaDataPtr->getEvent());
238 throw;
239 }
240
241 B2DEBUG(10, "Ending a worker process here.");
242 // Ok, we are done here!
243 exit(0);
244 }
245 }
246}
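
The fork-and-run pattern of the listing above can be sketched with plain POSIX calls. This is not the framework's forkOut(): `forkWorkers` and `waitForWorkers` are hypothetical helpers, the Python state and ZMQ handling are omitted, and `_exit` is used in the child as an assumption to keep the sketch free of exit handlers.

```cpp
#include <cassert>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <vector>

// Fork `numProcesses` children; each child runs `work` and exits with its return value.
// The parent receives the list of child PIDs.
std::vector<pid_t> forkWorkers(unsigned int numProcesses, int (*work)(unsigned int))
{
  std::vector<pid_t> pids;
  for (unsigned int i = 0; i < numProcesses; i++) {
    const pid_t pid = fork();
    if (pid == 0) {
      // Child: do the work and leave without returning into the parent's loop.
      _exit(work(i));
    }
    if (pid > 0) {
      pids.push_back(pid);
    }
  }
  return pids;
}

// Wait for all children and count how many exited with status 0.
unsigned int waitForWorkers(const std::vector<pid_t>& pids)
{
  unsigned int succeeded = 0;
  for (const pid_t pid : pids) {
    int status = 0;
    if (waitpid(pid, &status, 0) == pid && WIFEXITED(status) && WEXITSTATUS(status) == 0) {
      succeeded++;
    }
  }
  return succeeded;
}
```

As in the listing, the child never falls through to the code after the loop: it always terminates the process once its work is done.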

◆ sendTerminatedMessage()

void sendTerminatedMessage ( unsigned int pid,
bool waitForConfirmation )
private

Send an unregister message to all sockets if the given PID died. Wait at most 10 s for the confirmation (if requested).

Definition at line 56 of file HLTEventProcessor.cc.

57{
58 for (auto& socket : m_sockets) {
59 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_deleteWorkerMessage,
61 ZMQParent::send(socket, std::move(message));
62
63 if (not waitForConfirmation) {
64 continue;
65 }
66 if (ZMQParent::poll({socket.get()}, 10 * 1000)) {
67 auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
68 B2ASSERT("Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
69 } else {
70 B2FATAL("Did not receive a confirmation message! waitForConfirmation is " << waitForConfirmation);
71 }
72 }
73}
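
The "send, then wait at most 10 s for a confirmation" step can be illustrated without ZMQ. The sketch below swaps in the plain POSIX `poll()` call on a file descriptor in place of ZMQParent::poll; `waitForReply` is a hypothetical helper name.

```cpp
#include <cassert>
#include <poll.h>
#include <unistd.h>

// Wait until `fd` becomes readable or `timeoutMs` milliseconds have passed.
// Returns true if data is available, false on timeout or error.
bool waitForReply(int fd, int timeoutMs)
{
  pollfd entry{};
  entry.fd = fd;
  entry.events = POLLIN;
  const int ready = poll(&entry, 1, timeoutMs);
  return ready > 0 && (entry.revents & POLLIN) != 0;
}
```

A caller would treat a `false` result the way the listing treats a missing confirmation, for example by reporting a fatal error, and would pass `10 * 1000` to obtain the same 10 s limit.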

◆ setProfileModuleName()

void setProfileModuleName ( const std::string & name)
inline inherited

Set the name of the module we want to profile.

Parameters
name    Name of the module as returned by getName()

Definition at line 58 of file EventProcessor.h.

58{ m_profileModuleName = name; }

◆ writeToStdErr()

void writeToStdErr ( const char msg[])
static inherited

Async-signal-safe method to write something to STDERR.

Definition at line 72 of file EventProcessor.cc.

73{
74 //signal handlers are called asynchronously, making many standard functions (including output) dangerous
75 //write() is, however, safe, so we'll use that to write to stderr.
76
77 //strlen() not explicitly in safe list, but doesn't have any error handling routines that might alter global state
78 const int len = strlen(msg);
79
80 int rc = write(STDERR_FILENO, msg, len);
81 (void) rc; //ignore return value (there's nothing we can do about a failed write)
82
83}
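
A typical use of such a function is inside a signal handler, where only async-signal-safe calls are allowed. The following sketch shows that pattern under stated assumptions: `sketchWriteToStdErr`, `sketchHandler` and `g_sketchSignal` are hypothetical names, and the handler is installed with `std::signal` rather than with installSignalHandler().

```cpp
#include <cassert>
#include <csignal>
#include <cstddef>
#include <cstring>
#include <unistd.h>

// Flag set by the handler; sig_atomic_t may be written from a signal handler.
volatile std::sig_atomic_t g_sketchSignal = 0;

// Output restricted to strlen() and write(), mirroring the listing above.
void sketchWriteToStdErr(const char msg[])
{
  const std::size_t len = std::strlen(msg);
  const ssize_t rc = write(STDERR_FILENO, msg, len);
  (void) rc;  // nothing useful can be done about a failed write here
}

// Hypothetical handler: remember the signal and report it.
void sketchHandler(int sig)
{
  g_sketchSignal = sig;
  sketchWriteToStdErr("signal received\n");
}
```

The handler only stores the signal number and writes a fixed message; the actual reaction is left to the normal program flow, which checks the flag later.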

Member Data Documentation

◆ m_eventExtraInfo

StoreObjPtr<EventExtraInfo> m_eventExtraInfo
protected inherited

event extra info object pointer

Definition at line 170 of file EventProcessor.h.

◆ m_eventMetaDataPtr

StoreObjPtr<EventMetaData> m_eventMetaDataPtr
protected inherited

EventMetaData is used by processEvent()/processCore().

Definition at line 164 of file EventProcessor.h.

◆ m_inRun

bool m_inRun
protected inherited

Are we currently in a run?

If yes, processEndRun() needs to do something.

Definition at line 176 of file EventProcessor.h.

◆ m_lastMetadataUpdate

double m_lastMetadataUpdate
protected inherited

Time in seconds of last call for metadata update in event loop.

Definition at line 179 of file EventProcessor.h.

◆ m_master

const Module* m_master
protected inherited

The master module that determines the experiment/run/event number.

Definition at line 154 of file EventProcessor.h.

◆ m_metadataUpdateInterval

double m_metadataUpdateInterval
protected inherited

Minimal time difference in seconds for metadata updates in event loop.

Definition at line 182 of file EventProcessor.h.

◆ m_moduleList

ModulePtrList m_moduleList
protected inherited

List of all modules in the order in which they were initialized.

Definition at line 155 of file EventProcessor.h.

◆ m_parent

ZMQParent m_parent
private

An instance of a ZMQParent to create sockets for unregistering workers.

Definition at line 55 of file HLTEventProcessor.h.

◆ m_previousEventMetaData

EventMetaData m_previousEventMetaData
protected inherited

Stores state of EventMetaData before it was last changed.

Useful since processEndRun() needs info about which run it needs to end.

Definition at line 167 of file EventProcessor.h.

◆ m_processList

std::vector<int> m_processList
private

The current list of running processes (with their PIDs)

Definition at line 57 of file HLTEventProcessor.h.

◆ m_processStatisticsPtr

StoreObjPtr<ProcessStatistics> m_processStatisticsPtr
protected inherited

Pointer to the ProcessStatistics object; also used in a number of places.

Definition at line 173 of file EventProcessor.h.

◆ m_profileModule

Module* m_profileModule = nullptr
protected inherited

Address of the module which we want to profile, nullptr if no profiling is requested.

Definition at line 161 of file EventProcessor.h.

◆ m_profileModuleName

std::string m_profileModuleName
protected inherited

Name of the module which should be profiled, empty if no profiling is requested.

Definition at line 158 of file EventProcessor.h.

◆ m_sockets

std::vector<std::unique_ptr<zmq::socket_t> > m_sockets
private

The created sockets for unregistering workers. TODO: use connections.

Definition at line 59 of file HLTEventProcessor.h.

◆ m_steerRootInputModuleOn

bool m_steerRootInputModuleOn = false
protected inherited

True if the SteerRootInputModule is in charge of event processing.

Definition at line 185 of file EventProcessor.h.


The documentation for this class was generated from the following files: