Belle II Software development
HLTEventProcessor Class Reference

EventProcessor to be used on the HLT with all specialities of the HLT processing.

#include <HLTEventProcessor.h>

HLTEventProcessor inherits from EventProcessor.

Public Member Functions

 HLTEventProcessor (const std::vector< std::string > &outputAddresses)
 Create a new event processor and store the ZMQ addresses where to unregister workers.
 
void process (PathPtr spath, bool restartFailedWorkers, bool appendProcessNumberToModuleName=false)
 Process the given path.
 
void process (const PathPtr &startPath, long maxEvent=0)
 Processes the full module chain, starting with the first module in the given path.
 
void setProfileModuleName (const std::string &name)
 Set the name of the module we want to profile.
 

Static Public Member Functions

static void writeToStdErr (const char msg[])
 Async-safe method to write something to STDERR.
 
static void installSignalHandler (int sig, void(*fn)(int))
 Install a signal handler 'fn' for given signal.
 
static void installMainSignalHandlers (void(*fn)(int)=nullptr)
 Install signal handler for INT, TERM and QUIT signals.
 

Protected Member Functions

void processInitialize (const ModulePtrList &modulePathList, bool setEventInfo=true)
 Initializes the modules.
 
void processCore (const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
 Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.
 
void callEvent (Module *module)
 Calls event() on one single module, setting up logging and statistics as needed.
 
void processTerminate (const ModulePtrList &modulePathList)
 Terminates the modules.
 
void processBeginRun (bool skipDB=false)
 Calls the begin run methods of all modules.
 
void processEndRun ()
 Calls the end run methods of all modules.
 
long getMaximumEventNumber (long maxEvent) const
 Calculate the maximum event number from the command-line argument and the environment.
 

Protected Attributes

const Module * m_master
 The master module that determines the experiment/run/event number.
 
ModulePtrList m_moduleList
 List of all modules, in the order in which they were initialized.
 
std::string m_profileModuleName
 Name of the module which should be profiled, empty if no profiling is requested.
 
Module * m_profileModule = nullptr
 Address of the module which we want to profile, nullptr if no profiling is requested.
 
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
 EventMetaData is used by processEvent()/processCore().
 
EventMetaData m_previousEventMetaData
 Stores state of EventMetaData before it was last changed.
 
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
 Pointer to the ProcessStatistics object; also used in a number of places.
 
bool m_inRun
 Are we currently in a run? If yes, processEndRun() needs to do something.
 
double m_lastMetadataUpdate
 Time in seconds of the last metadata update call in the event loop.
 
double m_metadataUpdateInterval
 Minimum time difference in seconds between metadata updates in the event loop.
 
bool m_steerRootInputModuleOn = false
 True if the SteerRootInputModule is in charge of event processing.
 

Private Member Functions

void sendTerminatedMessage (unsigned int pid, bool waitForConfirmation)
 Send an unregister message to all sockets if the given PID died. Wait at most 10 s for the confirmation (if requested).
 
void runWorkers (PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName=false)
 Fork out as many workers as requested and, in each of them, run the given path using processCore.
 
void processCore (PathPtr path)
 Process the path by basically calling processEvent until a termination is requested.
 
bool processEvent (PathIterator moduleIter, bool firstRound)
 Process a single event by iterating through the module path once.
 
std::pair< unsigned int, unsigned int > checkChildProcesses ()
 Check if one of the started processes has died.
 
void release ()
 Release the parent resource; this is needed after forking so that it is not closed twice.
 
bool forkOut ()
 Helper function to fork out. Sets the Python state correctly and adds the process to the internal state.
 

Private Attributes

ZMQParent m_parent
 An instance of a ZMQParent to create sockets for unregistering workers.
 
std::vector< int > m_processList
 The current list of running processes (with their PIDs).
 
std::vector< std::unique_ptr< zmq::socket_t > > m_sockets
 The created sockets for unregistering workers. TODO: use connections.
 

Detailed Description

EventProcessor to be used on the HLT with all specialities of the HLT processing:

  • no input or output path - just workers
  • multiprocessing: workers are forked out after initialization, i.e. before the first real event is processed
  • restart of terminated workers (configurable) and their unregistration from the DQM server and collector
  • special run start/end handling: please check the HLTZMQ2DsModule for details
  • monitoring of the child processes in an additional process (which happens to be the parent process) that keeps track of their current state

This event processor is specialized for the HLT and should not be used outside of it. The event processor is exported as a Python module. It can be called with

import hbasf2
hbasf2.process(path, [address, ...], True)

or, if you want to put a number on each forked process,

hbasf2.processNumbered(path, [address, ...], True(, True))

Definition at line 37 of file HLTEventProcessor.h.

Constructor & Destructor Documentation

◆ HLTEventProcessor()

HLTEventProcessor ( const std::vector< std::string > &  outputAddresses)

Create a new event processor and store the ZMQ addresses where to unregister workers.

Definition at line 76 of file HLTEventProcessor.cc.

77{
78 m_sockets.reserve(outputAddresses.size());
79 for (const auto& address : outputAddresses) {
80 m_sockets.push_back(m_parent.createSocket<ZMQ_DEALER>(address, false));
81 }
82}

Member Function Documentation

◆ callEvent()

void callEvent ( Module module)
protected inherited

Calls event() on one single module, setting up logging and statistics as needed.

Parameters
module: Module on which to call the event() function.

Definition at line 226 of file EventProcessor.cc.

227{
228 LogSystem& logSystem = LogSystem::Instance();
229 const bool collectStats = !Environment::Instance().getNoStats();
230 // set up logging
231 logSystem.updateModule(&(module->getLogConfig()), module->getName());
232 // set up statistics if requested
233 if (collectStats) m_processStatisticsPtr->startModule();
234 // call module
235 CALL_MODULE(module, event);
236 // stop timing
237 if (collectStats) m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_Event);
238 // reset logging
239 logSystem.updateModule(nullptr);
240};
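A minimal sketch of the same "bracket the call" idea, written with a hypothetical RAII guard. ScopedModuleCall, ModuleTiming, g_currentModuleName and callEventSketch are illustrative stand-ins, not basf2 classes. Unlike the listing, which resets the log configuration explicitly after CALL_MODULE, the guard also restores it if event() throws; whether that matters depends on how CALL_MODULE handles exceptions, which this page does not show.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <string>

// Hypothetical stand-in for the active logging context: the name of the module
// whose log configuration is currently in effect ("" means the global config).
static std::string g_currentModuleName;

// Hypothetical per-module statistics: number of calls and total time in event().
struct ModuleTiming {
  long calls = 0;
  std::chrono::nanoseconds total{0};
};

// RAII guard: on construction, switch the logging context to the module and
// start a timer; on destruction, stop the timer (if statistics are collected)
// and restore the global context, even if the module's event() throws.
class ScopedModuleCall {
public:
  ScopedModuleCall(const std::string& moduleName, ModuleTiming* timing)
    : m_timing(timing), m_start(std::chrono::steady_clock::now())
  {
    g_currentModuleName = moduleName;
  }

  ~ScopedModuleCall()
  {
    if (m_timing != nullptr) {
      m_timing->calls += 1;
      m_timing->total += std::chrono::duration_cast<std::chrono::nanoseconds>(
                           std::chrono::steady_clock::now() - m_start);
    }
    g_currentModuleName.clear();
  }

  ScopedModuleCall(const ScopedModuleCall&) = delete;
  ScopedModuleCall& operator=(const ScopedModuleCall&) = delete;

private:
  ModuleTiming* m_timing;
  std::chrono::steady_clock::time_point m_start;
};

// Calls 'event' for one module; pass timing == nullptr to mimic disabled statistics.
void callEventSketch(const std::string& moduleName, const std::function<void()>& event,
                     ModuleTiming* timing)
{
  ScopedModuleCall guard(moduleName, timing);
  event();
}
```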

◆ checkChildProcesses()

std::pair< unsigned int, unsigned int > checkChildProcesses ( )
private

Check if one of the started processes has died.

If it has died with a non-zero exit code, increase the counter of workers to restart.

Returns, as a pair, the number of workers still alive and the number of workers that need to be restarted (i.e., the number of workers that died with a non-zero exit code).

Definition at line 391 of file HLTEventProcessor.cc.

392{
393 unsigned int needToRestart = 0;
394
395 // Check for processes which were there last time but are gone now (so they died)
396 for (auto iter = m_processList.begin(); iter != m_processList.end();) {
397 const auto& pid = *iter;
398
399 // check the status of this process pid
400 int status;
401 const int result = waitpid(pid, &status, WNOHANG);
402 if (result == -1) {
403 if (errno == EINTR) {
404 // interrupted, try again next time
405 ++iter;
406 continue;
407 } else {
408 B2FATAL("waitpid() failed.");
409 }
410 } else if (result == 0) {
411 // No change, so lets continue with the next worker
412 ++iter;
413 continue;
414 }
415
416 B2ASSERT("Do not understand the result of waitpid()", result == pid);
417
418 // state has changed, which means it is dead!
419 const auto exitCode = WEXITSTATUS(status);
420
421 // we only need to restart unexpected deaths
422 if (exitCode != 0) {
423 B2WARNING("A worker process has died unexpected!");
424 needToRestart += 1;
425
426 sendTerminatedMessage(pid, true);
427 }
428
429 // once a process is gone from the global list, remove them from our own, too.
430 iter = m_processList.erase(iter);
431 }
432
433 return {m_processList.size(), needToRestart};
434}
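The polling loop can be sketched in isolation as follows, assuming plain POSIX waitpid() with WNOHANG and a list of child PIDs (pollChildren and spawnChild are hypothetical names). As a deliberate variation, the sketch tests WIFEXITED() before reading WEXITSTATUS(): POSIX only defines the exit status for children that exited normally, so here a worker terminated by a signal is also counted as needing a restart.

```cpp
#include <cassert>
#include <cerrno>
#include <stdexcept>
#include <utility>
#include <vector>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Polls every PID in 'pids' without blocking. Reaped children are erased from
// the list; a child that did not exit cleanly is counted as needing a restart.
// Returns {workers still alive, workers to restart}.
std::pair<unsigned int, unsigned int> pollChildren(std::vector<pid_t>& pids)
{
  unsigned int needToRestart = 0;
  for (auto iter = pids.begin(); iter != pids.end();) {
    int status = 0;
    const pid_t result = waitpid(*iter, &status, WNOHANG);
    if (result == 0 || (result == -1 && errno == EINTR)) {
      ++iter;  // still running, or interrupted: check again next round
      continue;
    }
    if (result == -1) {
      throw std::runtime_error("waitpid() failed");
    }
    // WEXITSTATUS() is only meaningful when WIFEXITED() is true; a worker killed
    // by a signal (e.g. SIGSEGV) must count as an unexpected death, too.
    const bool cleanExit = WIFEXITED(status) && WEXITSTATUS(status) == 0;
    if (!cleanExit) {
      ++needToRestart;
    }
    iter = pids.erase(iter);
  }
  return {static_cast<unsigned int>(pids.size()), needToRestart};
}

// Test helper: forks a child that exits with 0 (mode 0), exits with 1 (mode 1),
// kills itself with SIGKILL (mode 2) or waits for a signal (any other mode).
pid_t spawnChild(int mode)
{
  const pid_t pid = fork();
  if (pid == 0) {
    if (mode == 0) _exit(0);
    if (mode == 1) _exit(1);
    if (mode == 2) kill(getpid(), SIGKILL);
    pause();
    _exit(0);
  }
  return pid;
}
```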

◆ forkOut()

bool forkOut ( )
private

Helper function to fork out. Sets the Python state correctly and adds the process to the internal state.

Definition at line 444 of file HLTEventProcessor.cc.

445{
446 fflush(stdout);
447 fflush(stderr);
448
449 pid_t pid = fork();
450
451 if (pid > 0) {
452 g_processNumber++;
453 m_processList.push_back(pid);
454 return false;
455 } else if (pid < 0) {
456 B2FATAL("fork() failed: " << strerror(errno));
457 } else {
458 // Child process
459 // Reset some python state: signals, threads, gil in the child
460 PyOS_AfterFork_Child();
461 // InputController becomes useless in child process
463 // die when parent dies
464 prctl(PR_SET_PDEATHSIG, SIGHUP);
465
467 return true;
468 }
469}
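A stripped-down sketch of the fork pattern, assuming Linux (prctl(PR_SET_PDEATHSIG) is Linux-specific). forkWorker is a hypothetical name; the Python-specific PyOS_AfterFork_Child() call and the basf2 process bookkeeping are left out. The extra getppid() comparison is an addition of this sketch that covers the case where the parent exits before prctl() takes effect.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <stdexcept>
#include <string>
#include <vector>
#include <signal.h>
#include <sys/prctl.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Forks one worker (Linux only, because of prctl()). Returns true in the child
// and false in the parent, where the child's PID is appended to 'workers'.
bool forkWorker(std::vector<pid_t>& workers)
{
  // Flush stdio first: data still sitting in the buffers would otherwise be
  // written twice, once by the parent and once by the child.
  std::fflush(stdout);
  std::fflush(stderr);

  const pid_t parentPid = getpid();
  const pid_t pid = fork();
  if (pid < 0) {
    throw std::runtime_error(std::string("fork() failed: ") + std::strerror(errno));
  }
  if (pid > 0) {
    workers.push_back(pid);
    return false;
  }
  // Child: ask the kernel to send SIGHUP when the parent dies ...
  prctl(PR_SET_PDEATHSIG, SIGHUP);
  // ... and handle the race where the parent died before prctl() took effect.
  if (getppid() != parentPid) {
    _exit(1);
  }
  return true;
}
```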

◆ getMaximumEventNumber()

long getMaximumEventNumber ( long  maxEvent) const
protected inherited

Calculate the maximum event number from the command-line argument and the environment.

Definition at line 113 of file EventProcessor.cc.

114{
115 //Check whether the number of events was set via command line argument
116 unsigned int numEventsArgument = Environment::Instance().getNumberEventsOverride();
117 if ((numEventsArgument > 0) && ((maxEvent == 0) || (maxEvent > numEventsArgument))) {
118 return numEventsArgument;
119 }
120 return maxEvent;
121}
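The same decision as a free function, with the Environment lookup replaced by a plain parameter so the rule can be checked in isolation (effectiveMaxEvent is a hypothetical name):

```cpp
#include <cassert>

// Mirrors the listing: the override (> 0, e.g. from "-n") wins if no limit was
// given (maxEvent == 0) or if it is stricter than the given limit.
long effectiveMaxEvent(unsigned int numEventsOverride, long maxEvent)
{
  const long overrideValue = static_cast<long>(numEventsOverride);
  if (overrideValue > 0 && (maxEvent == 0 || maxEvent > overrideValue)) {
    return overrideValue;
  }
  return maxEvent;
}
```

The explicit conversion to long keeps the comparison signed, so in this sketch a negative maxEvent (meaning "no limit") is returned unchanged rather than being replaced by the override.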

◆ installMainSignalHandlers()

void installMainSignalHandlers ( void(*)(int)  fn = nullptr)
static inherited

Install signal handler for INT, TERM and QUIT signals.

If the argument is NULL, EventProcessor's own signal handler will be installed.

Definition at line 312 of file EventProcessor.cc.

313{
314 if (!fn)
315 fn = signalHandler;
316 installSignalHandler(SIGINT, fn);
317 installSignalHandler(SIGTERM, fn);
318 installSignalHandler(SIGQUIT, fn);
319}

◆ installSignalHandler()

void installSignalHandler ( int  sig,
void(*)(int)  fn 
)
static inherited

Install a signal handler 'fn' for given signal.

Definition at line 297 of file EventProcessor.cc.

298{
299 struct sigaction s;
300 memset(&s, '\0', sizeof(s));
301
302 s.sa_handler = fn;
303 sigemptyset(&s.sa_mask);
304 if (sig == SIGCHLD)
305 s.sa_flags |= SA_NOCLDSTOP; //don't produce signal when children are stopped
306
307 if (sigaction(sig, &s, nullptr) != 0) {
308 B2FATAL("Cannot setup signal handler for signal " << sig);
309 }
310}
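A self-contained sketch of the pattern these functions support: install a handler with sigaction() that only records the signal, and keep any output from a handler async-signal-safe, which is the stated purpose of writeToStdErr. All names below are hypothetical; storeSignalSketch merely mimics the role suggested by the storeSignal handler passed to installMainSignalHandlers() in process().

```cpp
#include <cassert>
#include <csignal>
#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <string>
#include <signal.h>
#include <unistd.h>

// Last signal received. volatile std::sig_atomic_t is the type a signal
// handler may portably write to.
static volatile std::sig_atomic_t g_lastSignal = 0;

// Async-signal-safe output: write(2) is async-signal-safe, whereas printf()
// and std::cerr are not. The length is computed by hand for the same reason.
void writeToStdErrSketch(const char msg[])
{
  std::size_t len = 0;
  while (msg[len] != '\0') {
    ++len;
  }
  const ssize_t written = write(STDERR_FILENO, msg, len);
  (void)written;  // nothing sensible to do on failure inside a handler
}

// Handler that only records the signal; the main loop polls g_lastSignal.
void storeSignalSketch(int sig)
{
  g_lastSignal = sig;
}

// Installs 'fn' for 'sig' with sigaction(), following installSignalHandler().
void installHandler(int sig, void (*fn)(int))
{
  struct sigaction s;
  std::memset(&s, 0, sizeof(s));
  s.sa_handler = fn;
  sigemptyset(&s.sa_mask);
  if (sig == SIGCHLD) {
    s.sa_flags |= SA_NOCLDSTOP;  // no SIGCHLD for merely stopped children
  }
  if (sigaction(sig, &s, nullptr) != 0) {
    throw std::runtime_error("cannot install handler for signal " + std::to_string(sig));
  }
}
```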

◆ process() [1/2]

void process ( const PathPtr startPath,
long  maxEvent = 0 
)
inherited

Processes the full module chain, starting with the first module in the given path.

Processes all events for the given run number and for events from 0 to maxEvent. If maxEvent is smaller than or equal to 0, the maximum-number check is disabled and all events are processed. If runNumber is smaller than 0, the run number has to be set externally by a module instead of using the given number.

Parameters
startPath: The processing starts with the first module of this path.
maxEvent: Optional. The maximum number of events that will be processed. If the number is smaller than or equal to 0, all events will be processed.

Definition at line 123 of file EventProcessor.cc.

124{
125 maxEvent = getMaximumEventNumber(maxEvent);
126 // Make sure the NumberEventsOverride reflects the actual number if
127 // process(path, N) was used instead of -n and that it's reset to what it was
128 // after we're done with processing()
129 NumberEventsOverrideGuard numberOfEventsOverrideGuard(maxEvent);
130
131 //Get list of modules which could be executed during the data processing.
132 ModulePtrList moduleList = startPath->buildModulePathList();
133
134 //Find the address of the module we want to profile
135 if (!m_profileModuleName.empty()) {
136 for (const auto& module : moduleList) {
137 if (module->getName() == m_profileModuleName) {
138 m_profileModule = module.get();
139 break;
140 }
141 }
142 if (!m_profileModule)
143 B2FATAL("Module profiling was requested via --profile, but no module '" << m_profileModuleName << "' was found!");
144 }
145
146 //Initialize modules
147 processInitialize(moduleList);
148
149 // SteerRootInputModule might have changed the number of events to be processed
150 for (const auto& module : moduleList) {
151 if (module->getType() == "SteerRootInput") {
152 if (maxEvent != Environment::Instance().getNumberEventsOverride()) {
153 B2INFO("Module 'SteerRootInputModule' is controlling the number of processed events.");
156 }
157 break;
158 }
159 }
160
161 //do we want to visualize DataStore input/output?
162 if (Environment::Instance().getVisualizeDataFlow()) {
163 DataFlowVisualization v(&DataStore::Instance().getDependencyMap());
164 v.visualizePath("dataflow.dot", *startPath);
165 }
166
167 //Don't start processing in case of no master module
168 if (!m_master) {
169 B2ERROR("There is no module that provides event and run numbers (EventMetaData). You must add either the EventInfoSetter or an input module (e.g. RootInput) to the beginning of your path.");
170 }
171
172 //Check if errors appeared. If yes, don't start the event processing.
174 if ((numLogError == 0) && m_master) {
176 try {
177 processCore(startPath, moduleList, maxEvent); //Do the event processing
178 } catch (StoppedBySignalException& e) {
179 if (e.signal != SIGINT) {
180 // close all open ROOT files, ROOT's exit handler will crash otherwise
181 gROOT->GetListOfFiles()->Delete();
182
183 B2FATAL(e.what());
184 }
185 //in case of SIGINT, we move on to processTerminate() to shut down safely
186 } catch (...) {
188 B2ERROR("Exception occured in exp/run/evt: "
189 << m_eventMetaDataPtr->getExperiment() << " / "
190 << m_eventMetaDataPtr->getRun() << " / "
191 << m_eventMetaDataPtr->getEvent());
192 throw;
193 }
194
195 } else {
196 B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
197 }
198
199 //Terminate modules
200 processTerminate(moduleList);
201
203
204 if (gSignalReceived == SIGINT) {
205 const auto msg = R"(Processing aborted via SIGINT, terminating.
206 Output files have been closed safely and should be readable. However
207 processing was NOT COMPLETE. The output files do contain only events
208 processed until this point.)";
210 B2ERROR(msg
211 << LogVar("last_experiment", m_eventMetaDataPtr->getExperiment())
212 << LogVar("last_run", m_eventMetaDataPtr->getRun())
213 << LogVar("last_event", m_eventMetaDataPtr->getEvent()));
214 else
215 B2ERROR(msg);
216 installSignalHandler(SIGINT, SIG_DFL);
217 raise(SIGINT);
218 }
219}
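The final block of the listing resets SIGINT to its default action and raises it again after clean-up, so that the caller (for example a shell or a batch system) sees the process as terminated by SIGINT rather than as a normal exit. A minimal sketch of that idiom, run in a forked child so the effect can be observed with waitpid(); g_received, finishAfterSignal and runChildInterruptedBySigint are hypothetical names.

```cpp
#include <cassert>
#include <csignal>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical flag, set by a handler that only records the signal.
static volatile std::sig_atomic_t g_received = 0;

static void rememberSignal(int sig)
{
  g_received = sig;
}

// After clean-up: restore the default action and re-raise SIGINT so that the
// caller sees "terminated by SIGINT" instead of a normal exit.
void finishAfterSignal()
{
  if (g_received == SIGINT) {
    std::signal(SIGINT, SIG_DFL);
    std::raise(SIGINT);  // default action: the process terminates here
  }
}

// Test helper: a child "receives Ctrl-C" during processing, cleans up and
// re-raises; the parent returns the child's wait status.
int runChildInterruptedBySigint()
{
  const pid_t pid = fork();
  if (pid == 0) {
    std::signal(SIGINT, rememberSignal);
    std::raise(SIGINT);   // simulated interrupt while processing
    // ... output files would be closed safely here ...
    finishAfterSignal();  // does not return
    _exit(0);             // only reached if SIGINT is blocked
  }
  int status = 0;
  waitpid(pid, &status, 0);
  return status;
}
```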

◆ process() [2/2]

void process ( PathPtr  spath,
bool  restartFailedWorkers,
bool  appendProcessNumberToModuleName = false 
)

Process the given path.

Optionally restarts failed workers. To do this, it will:

  • first check if all modules have a parallel flag
  • then call initialize in the main process
  • fork out all workers and process the path
  • while monitoring their status
  • in the end kill remaining processes (if needed) and re-raise any collected signals
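The first step, checking the parallel flag of every module including those in conditional paths, might look like this on a hypothetical, simplified module model (ModuleSketch, makeModule and firstUncertifiedModule are illustrative, not the basf2 Module API). Unlike the listing, which reports the module that owns the condition, this sketch reports the first uncertified module it finds and searches nested conditional paths recursively.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical, simplified module model: a name, a "parallel processing
// certified" flag and any number of conditional paths.
struct ModuleSketch;
using ModuleList = std::vector<std::shared_ptr<ModuleSketch>>;

struct ModuleSketch {
  std::string name;
  bool parallelCertified;
  std::vector<ModuleList> conditionPaths;
};

std::shared_ptr<ModuleSketch> makeModule(const std::string& name, bool certified)
{
  auto module = std::make_shared<ModuleSketch>();
  module->name = name;
  module->parallelCertified = certified;
  return module;
}

// Returns the name of the first module, including modules in conditional paths,
// that is not certified for parallel processing, or "" if all of them are.
std::string firstUncertifiedModule(const ModuleList& modules)
{
  for (const auto& module : modules) {
    if (!module->parallelCertified) {
      return module->name;
    }
    for (const auto& conditionPath : module->conditionPaths) {
      const std::string offender = firstUncertifiedModule(conditionPath);
      if (!offender.empty()) {
        return offender;
      }
    }
  }
  return "";
}
```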

Definition at line 84 of file HLTEventProcessor.cc.

85{
86 using namespace std::chrono_literals;
87
88 m_moduleList = path->buildModulePathList();
89
90 // Assert path is what we want: fully parallel certified, not empty. Set first module to master module
91 B2ASSERT("You try to process an empty path!", not m_moduleList.empty());
92 for (const auto& module : m_moduleList) {
93 bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
94 // entire conditional path must also be compatible
95 if (hasParallelFlag and module->hasCondition()) {
96 for (const auto& conditionPath : module->getAllConditionPaths()) {
98 hasParallelFlag = false;
99 }
100 }
101 }
102 B2ASSERT("Module with name " << module->getName() << " does not have parallel flag!", hasParallelFlag);
103 }
104
105 // Initialize all modules (including event() of the master module)
108
109 // Don't start processing in case of no master module
110 if (not m_master) {
111 B2ERROR("There is no module that provides event and run numbers (EventMetaData). "
112 "You must add the specific HLT module as first module to the path.");
113 }
114
115 // Check if errors appeared. If yes, don't start the event processing.
117 if (numLogError != 0) {
118 B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
119 }
120
121 // Start the workers, which call the main loop
122 const int numProcesses = Environment::Instance().getNumberProcesses();
123 runWorkers(path, numProcesses, appendProcessNumberToModuleName);
124
125 installMainSignalHandlers(storeSignal);
126 // Back in the main process: wait for the processes and monitor them
127 int numberOfRestartedWorkers = 0;
128 while (true) {
129 // check if we have received any signal from the user or OS.
130 // Killing of the remaining processes happens after the loop.
131 if (g_signalReceived > 0) {
132 B2WARNING("Received a signal to go down.");
133 break;
134 }
135
136 // Test if we need more workers and if one has died
137 unsigned int presentWorkers;
138 unsigned int neededWorkers;
139
140 std::tie(presentWorkers, neededWorkers) = checkChildProcesses();
141 if (neededWorkers > 0) {
142 if (restartFailedWorkers) {
143 runWorkers(path, neededWorkers);
144 numberOfRestartedWorkers += neededWorkers;
145 } else {
146 B2ERROR("A worker failed. Will try to end the process smoothly now.");
147 break;
148 }
149 } else if (presentWorkers == 0) {
150 B2DEBUG(10, "All workers have cleanly exited. Will now also exit");
151 break;
152 }
153
154 if (numberOfRestartedWorkers > numProcesses) {
155 B2ERROR("I needed to restart on total " << numberOfRestartedWorkers << ", which I think is abnormal. "
156 "Will terminate the process now!");
157 break;
158 }
159
160 std::this_thread::sleep_for(10ms);
161 }
162
163 if (appendProcessNumberToModuleName) {
164 for (const int& pid : m_processList) {
165 B2INFO(g_processNumber << ": Send SIGINT to " << pid);
166 kill(pid, SIGINT);
167 }
168 for (const int& pid : m_processList) {
169 int count = 0;
170 while (true) {
171 // Do not allow internal SIGKILL to prevent data loss in case of a numbered process
172 if (kill(pid, 0) != 0) {
173 break;
174 }
175 B2DEBUG(10, g_processNumber << ": Checking process termination, count = " << count);
176 std::this_thread::sleep_for(1000ms);
177 if (count % 5 == 1) kill(pid, SIGINT);
178 ++count;
179 }
180 }
181 }
182
184
185 // if we still have/had processes, we should unregister them
186 std::this_thread::sleep_for(500ms);
187
188 for (const int& pid : m_processList) {
189 if (kill(pid, SIGKILL) >= 0) {
190 B2WARNING("Needed to hard kill process " << pid);
191 } else {
192 B2DEBUG(100, "no process " << pid << " found, already gone?");
193 }
194 sendTerminatedMessage(pid, false);
195 }
196 m_processList.clear();
197
198 B2DEBUG(10, "Done here");
199
200 // Normally, we would call terminate here, but not on HLT!
201 // Normally, we would print the error summary here, but not on HLT!
202 if (g_signalReceived == SIGINT) {
203 installSignalHandler(SIGINT, SIG_DFL);
204 raise(SIGINT);
205 }
206}
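The monitoring policy of the loop above, reduced to a pure decision function so it can be tested without forking (WorkerCheck, Decision and superviseOnce are hypothetical names). One simplification: in the listing the replacement workers are forked before the restart limit is checked, whereas the sketch only reports the decision.

```cpp
#include <cassert>

// Result of one polling round, as returned by a checkChildProcesses()-like call.
struct WorkerCheck {
  unsigned int presentWorkers;
  unsigned int neededWorkers;  // workers that died with a non-zero exit code
};

enum class Decision { Continue, Restart, StopAfterFailure, StopAllDone, StopTooManyRestarts };

// One iteration of the supervision policy; 'restartedSoFar' is increased
// whenever a restart is requested.
Decision superviseOnce(const WorkerCheck& check, bool restartFailedWorkers, int numProcesses,
                       int& restartedSoFar)
{
  Decision decision = Decision::Continue;
  if (check.neededWorkers > 0) {
    if (!restartFailedWorkers) {
      return Decision::StopAfterFailure;  // a worker failed and restarts are off
    }
    restartedSoFar += static_cast<int>(check.neededWorkers);
    decision = Decision::Restart;
  } else if (check.presentWorkers == 0) {
    return Decision::StopAllDone;  // all workers exited cleanly
  }
  // More restarts in total than configured workers is treated as abnormal.
  if (restartedSoFar > numProcesses) {
    return Decision::StopTooManyRestarts;
  }
  return decision;
}
```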

◆ processBeginRun()

void processBeginRun ( bool  skipDB = false)
protected inherited

Calls the begin run methods of all modules.

Loops over all module instances specified in a list and calls their beginRun() method. Please note: the beginRun() method of the module which triggered the beginRun() loop will also be called.

Definition at line 467 of file EventProcessor.cc.

468{
469 MetadataService::Instance().addBasf2Status("beginning run");
470
471 m_inRun = true;
472 auto dbsession = Database::Instance().createScopedUpdateSession(); // cppcheck-suppress unreadVariable
473
474 LogSystem& logSystem = LogSystem::Instance();
475 m_processStatisticsPtr->startGlobal();
476
477 if (!skipDB) DBStore::Instance().update();
478
479 //initialize random generator for begin run
481
482 for (const ModulePtr& modPtr : m_moduleList) {
483 Module* module = modPtr.get();
484
485 //Set the module dependent log level
486 logSystem.updateModule(&(module->getLogConfig()), module->getName());
487
488 //Do beginRun() call
489 m_processStatisticsPtr->startModule();
490 CALL_MODULE(module, beginRun);
492
493 //Set the global log level
494 logSystem.updateModule(nullptr);
495 }
496
498}

◆ processCore() [1/2]

void processCore ( const PathPtr startPath,
const ModulePtrList modulePathList,
long  maxEvent = 0,
bool  isInputProcess = true 
)
protected inherited

Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.

Parameters
startPath: The processing starts with the first module of this path.
modulePathList: A list of all modules which could be executed during the data processing (used for calling the beginRun() and endRun() methods).
maxEvent: The maximum number of events that will be processed. If the number is smaller than or equal to 0, all events are processed.
isInputProcess: true when this is either the only process or the input process.

Definition at line 406 of file EventProcessor.cc.

407{
409 m_moduleList = modulePathList;
410 //Remember the previous event meta data, and identify end of data meta data
411 m_previousEventMetaData.setEndOfData(); //invalid start state
412
413 const bool collectStats = !Environment::Instance().getNoStats();
414
415 //Loop over the events
416 long currEvent = 0;
417 bool endProcess = false;
418 while (!endProcess) {
419 if (collectStats)
420 m_processStatisticsPtr->startGlobal();
421
422 PathIterator moduleIter(startPath);
423 endProcess = processEvent(moduleIter, isInputProcess && currEvent == 0);
424
425 //Delete event related data in DataStore
427
428 currEvent++;
429 if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess = true;
430 if (collectStats)
432 } //end event loop
433
434 //End last run
435 m_eventMetaDataPtr.create();
437}
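The loop structure of processCore(), with processEvent() replaced by a callback and the DataStore and statistics calls omitted, could be sketched as follows (runEventLoop is a hypothetical name):

```cpp
#include <cassert>
#include <functional>

// 'processEvent(firstRound)' returns true when processing should end (e.g. at
// end of data). Returns the number of loop iterations that were run.
long runEventLoop(const std::function<bool(bool)>& processEvent, long maxEvent,
                  bool isInputProcess)
{
  long currEvent = 0;
  bool endProcess = false;
  while (!endProcess) {
    // Only the input (or only) process treats its very first iteration specially.
    endProcess = processEvent(isInputProcess && currEvent == 0);
    ++currEvent;
    if (maxEvent > 0 && currEvent >= maxEvent) {
      endProcess = true;  // maxEvent <= 0 disables the limit
    }
  }
  return currEvent;
}
```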

◆ processCore() [2/2]

void processCore ( PathPtr  path)
private

Process the path by basically calling processEvent until a termination is requested.

Does not do any initialization; it is assumed that this has already happened. In the end, terminate is called.

Definition at line 249 of file HLTEventProcessor.cc.

250{
251 bool terminationRequested = false;
252 bool firstRound = true;
253
254 // Initialisation is done
256
257 // Set the previous event meta data to something invalid
259
260 while (not terminationRequested) {
261 B2DEBUG(100, "Processing new event");
262
263 // Start the measurement
264 m_processStatisticsPtr->startGlobal();
265
266 // Main call to event() of the modules, and maybe beginRun() and endRun()
267 PathIterator moduleIter(path);
268 terminationRequested = processEvent(moduleIter, firstRound);
269
270 // Delete event related data in DataStore
272
273 // Stop the measurement
275
276 // We are surely not in the first round the next time
277 firstRound = false;
278 }
279
280 // End last run with a terminate. Yes, we are not calling an endRun() here and yes, we are calling this in the worker
281 B2DEBUG(10, "Calling terminate");
282 m_eventMetaDataPtr.create();
284}

◆ processEndRun()

void processEndRun ( )
protected inherited

Calls the end run methods of all modules.

Loops over all module instances specified in a list and calls their endRun() method. Please note: the endRun() method of the module which triggered the endRun() loop will also be called.

Definition at line 501 of file EventProcessor.cc.

502{
504
505 if (!m_inRun)
506 return;
507 m_inRun = false;
508
509 LogSystem& logSystem = LogSystem::Instance();
510 m_processStatisticsPtr->startGlobal();
511
512 const EventMetaData newEventMetaData = *m_eventMetaDataPtr;
514
515 //initialize random generator for end run
517
518 for (const ModulePtr& modPtr : m_moduleList) {
519 Module* module = modPtr.get();
520
521 //Set the module dependent log level
522 logSystem.updateModule(&(module->getLogConfig()), module->getName());
523
524 //Do endRun() call
525 m_processStatisticsPtr->startModule();
526 CALL_MODULE(module, endRun);
528
529 //Set the global log level
530 logSystem.updateModule(nullptr);
531 }
532 *m_eventMetaDataPtr = newEventMetaData;
533
535}
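A sketch of the run-state guard and the meta data save/restore, using hypothetical types (MetaSketch, RunStateSketch). The listing elides the line between saving and restoring the meta data; the assumption that modules see the previous event meta data during endRun() is taken from the comment in HLTEventProcessor::processEvent() ("internally uses the previous event meta data").

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical, minimal event meta data.
struct MetaSketch {
  int experiment;
  int run;
  int event;
};

class RunStateSketch {
public:
  void beginRun() { m_inRun = true; }

  // Calls every endRun hook at most once per run. The current meta data is
  // saved first and restored afterwards; during the hooks, the previous
  // (ending) run's meta data is visible.
  void endRun(MetaSketch& current, const MetaSketch& previous,
              const std::vector<std::function<void(const MetaSketch&)>>& hooks)
  {
    if (!m_inRun) {
      return;  // nothing to end: no run started, or already ended
    }
    m_inRun = false;
    const MetaSketch newMeta = current;
    current = previous;
    for (const auto& hook : hooks) {
      hook(current);
    }
    current = newMeta;
  }

  bool inRun() const { return m_inRun; }

private:
  bool m_inRun = false;
};
```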

◆ processEvent()

bool processEvent ( PathIterator  moduleIter,
bool  firstRound 
)
private

Process a single event by iterating through the module path once.

In principle very similar to the EventProcessor::processEvent function, but makes different assumptions about the run changes induced by the master module (which is always the ZMQ2Ds module in the HLT case).

The logic happening after the master module is the following:

  • if an EndOfData is set in the event meta data, just break out
  • if an HLT-specific EndOfRun is set, call the end-run methods of all modules (without begin run)
  • if the previous event meta data has EndOfRun/EndOfData set, call the begin-run methods
  • if the run change was induced by a change in the run number, call the end-run and begin-run methods

The rest (e.g. module conditions, db store) is the same as the EventProcessor case.

Definition at line 286 of file HLTEventProcessor.cc.

287 {
288   while (not moduleIter.isDone()) {
289     Module* module = moduleIter.get();
290     B2DEBUG(10, "Starting event of " << module->getName());
291
292     // The actual call of the event function
293     if (module != m_master) {
294       // If this is not the master module it is quite simple: just call the event function
295       callEvent(module);
296
297       // Check for a second master module. Cannot do this if we are in the first round after initialize
298       // (as previous event meta data is not set properly here)
301         B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and "
302                 << module->getName());
303       }
304     } else {
305       if (not firstRound) {
306         // Also call the event function for the master, but not the first time
307         callEvent(module);
308       }
309       // initialize random number state for the event handling after we have
310       // received the event information from the master module.
312     }
313
314     if (g_signalReceived != 0) {
315       if (g_signalReceived != SIGINT) {
316         throw StoppedBySignalException(g_signalReceived);
317       } else {
318         B2DEBUG(10, "Received a SIGINT in the worker process...");
319         return true;
320       }
321     }
322
323     B2ASSERT("The event meta data must always be valid at this stage!", m_eventMetaDataPtr and m_eventMetaDataPtr.isValid());
324
325     if (m_eventMetaDataPtr->isEndOfData()) {
326       // Immediately leave the loop and terminate (true)
327       return true;
328     }
329
330     if (module == m_master and not firstRound) {
331       if (m_eventMetaDataPtr->isEndOfRun()) {
332         B2DEBUG(10, "Calling endRun()");
333         // call endRun() of all modules (internally uses the previous event meta data) and skip to the next event
334         m_processStatisticsPtr->suspendGlobal();
335         m_inRun = true;
337         m_processStatisticsPtr->resumeGlobal();
338
339         // Store the current event meta data for the next round
341
342         // Leave this event, but not the full processing (false)
343         return false;
345         // The run has changed (or we never had one), so call beginRun() before going on
346         m_processStatisticsPtr->suspendGlobal();
348         m_processStatisticsPtr->resumeGlobal();
349       }
350
351       const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
353       const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
355       if (runChangedWithoutNotice) {
356         m_processStatisticsPtr->suspendGlobal();
357
358         m_inRun = true;
361
362         m_processStatisticsPtr->resumeGlobal();
363       }
364
365       // make sure we use the event dependent generator again
367
368       // and the correct database
370
371       // Store the current event meta data for the next round
373     }
374
375     // Check for the module conditions, evaluate them and if one is true, switch to the new path
376     if (module->evalCondition()) {
377       PathPtr condPath = module->getConditionPath();
378       // continue with parent Path after condition path is executed?
379       if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
380         moduleIter = PathIterator(condPath, moduleIter);
381       } else {
382         moduleIter = PathIterator(condPath);
383       }
384     } else {
385       moduleIter.next();
386     }
387   }
388   return false;
389 }
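The run-change rules listed for processEvent() can be written as a pure function of the current and the previous event meta data. This sketch follows the documented bullet list rather than the exact code, since several conditions are missing from the listing. `EventMeta`, `RunAction`, and `decideRunAction` are hypothetical names. In the real method these rules are only evaluated for the master module and not in the first round.

```cpp
#include <cassert>

// Hypothetical stand-in for the EventMetaData fields used by the rules.
struct EventMeta {
  int experiment = 0;
  int run = 0;
  bool endOfRun = false;
  bool endOfData = false;
};

enum class RunAction {
  Terminate,       // EndOfData: leave the event loop
  EndRunOnly,      // HLT-specific EndOfRun: endRun() of all modules, skip event
  BeginRun,        // previous event closed the run: beginRun() only
  EndAndBeginRun,  // experiment/run number changed without notice
  None             // same run: just continue with the event
};

RunAction decideRunAction(const EventMeta& current, const EventMeta& previous) {
  if (current.endOfData) return RunAction::Terminate;
  if (current.endOfRun) return RunAction::EndRunOnly;
  if (previous.endOfRun || previous.endOfData) return RunAction::BeginRun;
  const bool runChanged = current.experiment != previous.experiment ||
                          current.run != previous.run;
  return runChanged ? RunAction::EndAndBeginRun : RunAction::None;
}
```

Returning an action instead of calling processEndRun()/processBeginRun() directly keeps the rules testable without any modules.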

◆ processInitialize()

void processInitialize (const ModulePtrList &modulePathList, bool setEventInfo=true)
protected, inherited

Initializes the modules.

Loops over all module instances specified in a list and calls their initialize() method.

Parameters
modulePathList: A list of all modules which could be executed during the data processing.
setEventInfo: If true, the event() method of the master module is called immediately, so that the event information is available to subsequent modules right away.

Definition at line 242 of file EventProcessor.cc.

243 {
244   LogSystem& logSystem = LogSystem::Instance();
246
247   m_processStatisticsPtr.registerInDataStore();
248   //TODO I might want to overwrite it in initialize (e.g. if read from file)
249   //     For parallel processing or subevents, I don't want that, though.
250   //     Maybe make this a function argument?
252   m_processStatisticsPtr.create();
253   m_processStatisticsPtr->startGlobal();
254
256
257   for (const ModulePtr& modPtr : modulePathList) {
258     Module* module = modPtr.get();
259
260     if (module->hasUnsetForcedParams()) {
261       //error message was printed by module
262       continue;
263     }
264
265     //Set the module dependent log level
266     logSystem.updateModule(&(module->getLogConfig()), module->getName());
268
269     //Do initialization
270     m_processStatisticsPtr->initModule(module);
271     m_processStatisticsPtr->startModule();
272     CALL_MODULE(module, initialize);
274
275     //Set the global log level
276     logSystem.updateModule(nullptr);
277
278     //Check whether this is the master module
279     if (!m_master && DataStore::Instance().getEntry(m_eventMetaDataPtr) != nullptr) {
280       B2DEBUG(100, "Found module providing EventMetaData: " << module->getName());
281       m_master = module;
282       if (setEventInfo) {
283         callEvent(module);
284         // update Database payloads: we now have valid event meta data unless
285         // we don't process any events
287       }
288     }
289
290     if (gSignalReceived != 0) {
291       throw StoppedBySignalException(gSignalReceived);
292     }
293   }
295 }
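processInitialize() picks the master module as the first module after whose initialize() the EventMetaData entry exists in the DataStore. A minimal sketch of that rule, assuming a hypothetical `Registry` (a set of entry names) in place of the DataStore and a hypothetical `SketchModule` type; skipping modules with unset forced parameters, statistics, and the optional first event() call are left out.

```cpp
#include <cassert>
#include <functional>
#include <set>
#include <string>
#include <vector>

// Hypothetical, simplified data store: just the names of registered entries.
using Registry = std::set<std::string>;

struct SketchModule {
  std::string name;
  std::function<void(Registry&)> initialize;  // may register entries
};

// Initializes every module in list order and returns the index of the
// master module, or -1 if no module provides "EventMetaData".
int initializeAndFindMaster(std::vector<SketchModule>& modules, Registry& store) {
  int master = -1;
  for (int i = 0; i < static_cast<int>(modules.size()); ++i) {
    modules[i].initialize(store);
    // Like the check in processInitialize(): only the first module after
    // whose initialize() the entry exists becomes the master.
    if (master < 0 && store.count("EventMetaData") != 0) master = i;
  }
  return master;
}
```

Because the check runs after every initialize() call, a second module registering the same entry later does not replace the master.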

◆ processTerminate()

void processTerminate (const ModulePtrList &modulePathList)
protected, inherited

Terminates the modules.

Loops in reverse order over all module instances specified in a list and calls their terminate() method.

Parameters
modulePathList: A list of all modules which could be executed during the data processing.

Definition at line 440 of file EventProcessor.cc.

441 {
443
444   LogSystem& logSystem = LogSystem::Instance();
445   ModulePtrList::const_reverse_iterator listIter;
446   m_processStatisticsPtr->startGlobal();
447
448   for (listIter = modulePathList.rbegin(); listIter != modulePathList.rend(); ++listIter) {
449     Module* module = listIter->get();
450
451     //Set the module dependent log level
452     logSystem.updateModule(&(module->getLogConfig()), module->getName());
453
454     //Do termination
455     m_processStatisticsPtr->startModule();
456     CALL_MODULE(module, terminate);
458
459     //Set the global log level
460     logSystem.updateModule(nullptr);
461   }
462
464 }
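The reverse traversal in processTerminate() can be shown in isolation with standard reverse iterators. `TermModule` and `terminateAll` are hypothetical names. A plausible reason for the reverse order (not stated in the source) is that modules initialized later may depend on resources set up by earlier ones, so those resources stay alive until their users have terminated.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical module that appends its name to a shared record on terminate().
struct TermModule {
  std::string name;
  std::vector<std::string>* calls;
  void terminate() { calls->push_back(name); }
};

using TermModuleList = std::vector<std::shared_ptr<TermModule>>;

// Terminate in reverse order of initialization, mirroring the
// const_reverse_iterator loop in processTerminate().
void terminateAll(const TermModuleList& modules) {
  for (auto it = modules.rbegin(); it != modules.rend(); ++it) {
    (*it)->terminate();
  }
}
```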

◆ release()

void release ()
private

Release the parent resource. This is needed after forking so that the resource is not closed twice.

Definition at line 436 of file HLTEventProcessor.cc.

437 {
438   for (auto& socket : m_sockets) {
439     socket.release();
440   }
441   m_parent.reset();
442 }
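The loop in release() relies on std::unique_ptr::release(), which empties the smart pointer without running the destructor of the owned object. In a forked child this means that the sockets inherited from the parent are never closed by the child. The sketch below demonstrates the effect with a hypothetical `SharedHandle` type that counts destructor calls; it does not model ZMQ or ZMQParent::reset().

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical resource whose destructor would "close" something that is
// shared with the parent process (a ZMQ socket in the real class).
struct SharedHandle {
  static int closedCount;  // number of destructor calls so far
  ~SharedHandle() { ++closedCount; }
};
int SharedHandle::closedCount = 0;

// Give up ownership of every handle without destroying it: the smart
// pointers become empty, so their destructors will not close anything.
// The raw pointers are returned only so that this sketch can clean up;
// the real release() simply drops them.
std::vector<SharedHandle*> releaseAll(std::vector<std::unique_ptr<SharedHandle>>& handles) {
  std::vector<SharedHandle*> raw;
  for (auto& h : handles) {
    raw.push_back(h.release());
  }
  return raw;
}
```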

◆ runWorkers()

void runWorkers (PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName=false)
private

Fork out as many workers as requested and, in each of them, run the given path using processCore.

Definition at line 208 of file HLTEventProcessor.cc.

209 {
210   for (unsigned int i = 0; i < numProcesses; i++) {
211     if (forkOut()) {
212       // Do only run in forked out worker process:
213       B2DEBUG(10, "Starting a new worker process");
214       // Reset the parent and sockets
215       release();
216
217       // Start the main loop with our signal handling and error catching
218       installMainSignalHandlers(storeSignal);
219       try {
220         if (appendProcessNumberToModuleName) {
221           for (const auto& module : m_moduleList) {
222             module->setName(std::to_string(g_processNumber) + std::string("_") + module->getName());
223             B2INFO("New worker name is " << module->getName());
224           }
225         }
226         processCore(path);
227       } catch (StoppedBySignalException& e) {
228         // close all open ROOT files, ROOT's exit handler will crash otherwise
229         gROOT->GetListOfFiles()->Delete();
230
231         B2ERROR(e.what());
232         exit(1);
233       } catch (...) {
235         B2ERROR("Exception occured in exp/run/evt: "
236                 << m_eventMetaDataPtr->getExperiment() << " / "
237                 << m_eventMetaDataPtr->getRun() << " / "
238                 << m_eventMetaDataPtr->getEvent());
239         throw;
240       }
241
242       B2DEBUG(10, "Ending a worker process here.");
243       // Ok, we are done here!
244       exit(0);
245     }
246   }
247 }
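The fork loop in runWorkers() follows the usual POSIX pattern: the child does its work and exits inside the loop body, so only the parent continues to the next iteration. The sketch below calls fork() and waitpid() directly instead of the class's forkOut() helper, and keeps the child PIDs in a vector (presumably similar to m_processList). It uses _exit() so that the children skip atexit handlers and stdio buffers inherited from the parent; the real code calls exit(). `forkWorkers` and `waitForWorkers` are hypothetical names.

```cpp
#include <cassert>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <vector>

// Fork numProcesses children. Each child runs `work` and terminates inside
// the loop body, so it never continues the parent's loop.
std::vector<pid_t> forkWorkers(unsigned int numProcesses, int (*work)(unsigned int)) {
  std::vector<pid_t> pids;
  for (unsigned int i = 0; i < numProcesses; ++i) {
    const pid_t pid = fork();
    if (pid < 0) {
      break;  // fork failed: stop spawning further workers
    }
    if (pid == 0) {
      _exit(work(i));  // child: do the work, then leave immediately
    }
    pids.push_back(pid);  // parent: remember the child's PID
  }
  return pids;
}

// Wait for all children and count those that exited with status 0.
int waitForWorkers(const std::vector<pid_t>& pids) {
  int succeeded = 0;
  for (pid_t pid : pids) {
    int status = 0;
    if (waitpid(pid, &status, 0) == pid && WIFEXITED(status) && WEXITSTATUS(status) == 0) {
      ++succeeded;
    }
  }
  return succeeded;
}
```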

◆ sendTerminatedMessage()

void sendTerminatedMessage (unsigned int pid, bool waitForConfirmation)
private

Send an unregister message to all sockets if the given PID has died. If requested, wait at most 10 s per socket for the confirmation.

Definition at line 57 of file HLTEventProcessor.cc.

58 {
59   for (auto& socket : m_sockets) {
60     auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_deleteWorkerMessage,
62     ZMQParent::send(socket, std::move(message));
63
64     if (not waitForConfirmation) {
65       continue;
66     }
67     if (ZMQParent::poll({socket.get()}, 10 * 1000)) {
68       auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
69       B2ASSERT("Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
70     } else {
71       B2FATAL("Did not receive a confirmation message!");
72     }
73   }
74 }
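sendTerminatedMessage() follows a send-then-optionally-wait pattern with a 10 s timeout (10 * 1000 ms passed to ZMQParent::poll). The sketch below reproduces that pattern with a POSIX pipe and poll() instead of ZMQ sockets, and returns false where the real method calls B2FATAL. `sendAndMaybeConfirm` and the message contents are hypothetical.

```cpp
#include <cassert>
#include <poll.h>
#include <string>
#include <unistd.h>

// Send `message` on writeFd. If waitForConfirmation is set, wait at most
// timeoutMs milliseconds for readFd to become readable and consume the reply.
// Returns false on a failed write or a missing confirmation.
bool sendAndMaybeConfirm(int writeFd, int readFd, const std::string& message,
                         bool waitForConfirmation, int timeoutMs = 10 * 1000) {
  if (write(writeFd, message.data(), message.size()) !=
      static_cast<ssize_t>(message.size())) {
    return false;  // short or failed write
  }
  if (!waitForConfirmation) {
    return true;  // fire and forget
  }
  pollfd pfd{};
  pfd.fd = readFd;
  pfd.events = POLLIN;
  if (poll(&pfd, 1, timeoutMs) <= 0) {
    return false;  // timeout (0) or poll error (-1)
  }
  char reply[64];
  return read(readFd, reply, sizeof(reply)) > 0;
}
```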

◆ setProfileModuleName()

void setProfileModuleName (const std::string &name)
inline, inherited

Set the name of the module we want to profile.

Parameters
name: Name of the module, as returned by getName()

Definition at line 57 of file EventProcessor.h.

57 { m_profileModuleName = name; }
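This section does not show how m_profileModuleName is turned into m_profileModule. Assuming a linear search over the module list by getName() (an assumption, not taken from the source), the lookup could look like the sketch below. `NamedModule` and `findProfileModule` are hypothetical; an empty name yields nullptr, matching the documented "no profiling requested" state.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical module type exposing only a name.
struct NamedModule {
  std::string name;
  const std::string& getName() const { return name; }
};

// Returns the module whose getName() equals profileName, or nullptr if the
// name is empty (no profiling requested) or no module matches.
NamedModule* findProfileModule(const std::vector<std::shared_ptr<NamedModule>>& modules,
                               const std::string& profileName) {
  if (profileName.empty()) return nullptr;
  for (const auto& m : modules) {
    if (m->getName() == profileName) return m.get();
  }
  return nullptr;
}
```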

◆ writeToStdErr()

void writeToStdErr (const char msg[])
static, inherited

Async-signal-safe method to write a message to STDERR.

Definition at line 72 of file EventProcessor.cc.

73 {
74   //signal handlers are called asynchronously, making many standard functions (including output) dangerous
75   //write() is, however, safe, so we'll use that to write to stderr.
76
77   //strlen() not explicitly in safe list, but doesn't have any error handling routines that might alter global state
78   const int len = strlen(msg);
79
80   int rc = write(STDERR_FILENO, msg, len);
81   (void) rc; //ignore return value (there's nothing we can do about a failed write)
82
83 }
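writeToStdErr() exists because it may be called from a signal handler. The sketch below shows such a handler: it stores the signal number in a `volatile std::sig_atomic_t`, similar in spirit to the storeSignal handler installed in runWorkers(), and reports it with write(). Handler installation uses POSIX sigaction(); whether installSignalHandler() does the same is not shown here. `recordSignal`, `installHandler`, and `writeToStdErrSketch` are hypothetical names.

```cpp
#include <cassert>
#include <csignal>
#include <cstring>
#include <signal.h>
#include <unistd.h>

// Last signal received; sig_atomic_t can safely be written from a handler.
volatile std::sig_atomic_t g_lastSignal = 0;

void writeToStdErrSketch(const char msg[]) {
  // write() is async-signal-safe; printf() and std::cerr are not.
  const ssize_t rc = write(STDERR_FILENO, msg, std::strlen(msg));
  (void) rc;  // nothing useful can be done about a failed write here
}

// Hypothetical handler: record the signal, report it with write() only.
void recordSignal(int sig) {
  g_lastSignal = sig;
  writeToStdErrSketch("signal received\n");
}

// Install `fn` for `sig` with POSIX sigaction(); returns true on success.
bool installHandler(int sig, void (*fn)(int)) {
  struct sigaction sa;
  std::memset(&sa, 0, sizeof(sa));
  sa.sa_handler = fn;
  sigemptyset(&sa.sa_mask);
  return sigaction(sig, &sa, nullptr) == 0;
}
```

Keeping the handler down to a flag assignment and a write() leaves all real work (such as throwing StoppedBySignalException) to normal code that checks the flag later.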

Member Data Documentation

◆ m_eventMetaDataPtr

StoreObjPtr<EventMetaData> m_eventMetaDataPtr
protected, inherited

EventMetaData is used by processEvent()/processCore().

Definition at line 163 of file EventProcessor.h.

◆ m_inRun

bool m_inRun
protected, inherited

Are we currently in a run? If yes, processEndRun() needs to do something.

Definition at line 172 of file EventProcessor.h.

◆ m_lastMetadataUpdate

double m_lastMetadataUpdate
protected, inherited

Time in seconds of last call for metadata update in event loop.

Definition at line 175 of file EventProcessor.h.

◆ m_master

const Module* m_master
protected, inherited

The master module that determines the experiment/run/event number.

Definition at line 153 of file EventProcessor.h.

◆ m_metadataUpdateInterval

double m_metadataUpdateInterval
protected, inherited

Minimal time difference in seconds for metadata updates in event loop.

Definition at line 178 of file EventProcessor.h.
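m_lastMetadataUpdate and m_metadataUpdateInterval suggest a simple time-based throttle in the event loop. The sketch below assumes that an update is skipped until at least the interval has elapsed since the last one; `MetadataThrottle` is hypothetical, and the event-loop code that uses these members is not shown in this section.

```cpp
#include <cassert>

// Hypothetical throttle: times are in seconds, as for m_lastMetadataUpdate
// and m_metadataUpdateInterval.
struct MetadataThrottle {
  explicit MetadataThrottle(double intervalSeconds) : interval(intervalSeconds) {}

  double lastUpdate = 0.0;  // compare m_lastMetadataUpdate
  double interval;          // compare m_metadataUpdateInterval

  // Returns true (and records `now`) if an update is due at time `now`.
  bool shouldUpdate(double now) {
    if (now - lastUpdate < interval) return false;
    lastUpdate = now;
    return true;
  }
};
```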

◆ m_moduleList

ModulePtrList m_moduleList
protected, inherited

List of all modules, in the order in which they were initialized.

Definition at line 154 of file EventProcessor.h.

◆ m_parent

ZMQParent m_parent
private

An instance of a ZMQParent to create sockets for unregistering workers.

Definition at line 55 of file HLTEventProcessor.h.

◆ m_previousEventMetaData

EventMetaData m_previousEventMetaData
protected, inherited

Stores state of EventMetaData before it was last changed.

Useful since processEndRun() needs info about which run it needs to end.

Definition at line 166 of file EventProcessor.h.

◆ m_processList

std::vector<int> m_processList
private

The current list of running processes (with their PIDs)

Definition at line 57 of file HLTEventProcessor.h.

◆ m_processStatisticsPtr

StoreObjPtr<ProcessStatistics> m_processStatisticsPtr
protected, inherited

Global and per-module timing and call statistics, updated during initialization, run changes, event processing, and termination.

Definition at line 169 of file EventProcessor.h.

◆ m_profileModule

Module* m_profileModule = nullptr
protected, inherited

Address of the module which we want to profile, nullptr if no profiling is requested.

Definition at line 160 of file EventProcessor.h.

◆ m_profileModuleName

std::string m_profileModuleName
protected, inherited

Name of the module which should be profiled, empty if no profiling is requested.

Definition at line 157 of file EventProcessor.h.

◆ m_sockets

std::vector<std::unique_ptr<zmq::socket_t> > m_sockets
private

The created sockets for unregistering workers. TODO: use connections.

Definition at line 59 of file HLTEventProcessor.h.

◆ m_steerRootInputModuleOn

bool m_steerRootInputModuleOn = false
protected, inherited

True if the SteerRootInputModule is in charge of event processing.

Definition at line 181 of file EventProcessor.h.


The documentation for this class was generated from the following files: