Belle II Software development
HLTEventProcessor Class Reference

EventProcessor to be used on the HLT with all specialities of the HLT processing.

#include <HLTEventProcessor.h>

Inheritance diagram for HLTEventProcessor:
EventProcessor

Public Member Functions

 HLTEventProcessor (const std::vector< std::string > &outputAddresses)
 Create a new event processor and store the ZMQ addresses where to unregister workers.
 
void process (PathPtr spath, bool restartFailedWorkers, bool appendProcessNumberToModuleName=false)
 Process the given path.
 
void process (const PathPtr &startPath, long maxEvent=0)
 Processes the full module chain, starting with the first module in the given path.
 
void setProfileModuleName (const std::string &name)
 Set the name of the module we want to profile.
 

Static Public Member Functions

static void writeToStdErr (const char msg[])
 Async-signal-safe method to write a message to STDERR.
 
static void installSignalHandler (int sig, void(*fn)(int))
 Install a signal handler 'fn' for given signal.
 
static void installMainSignalHandlers (void(*fn)(int)=nullptr)
 Install signal handler for INT, TERM and QUIT signals.
 

Protected Member Functions

void processInitialize (const ModulePtrList &modulePathList, bool setEventInfo=true)
 Initializes the modules.
 
void processCore (const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
 Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.
 
void callEvent (Module *module)
 Calls event() on one single module, setting up logging and statistics as needed.
 
void processTerminate (const ModulePtrList &modulePathList)
 Terminates the modules.
 
void processBeginRun (bool skipDB=false)
 Calls the begin run methods of all modules.
 
void processEndRun ()
 Calls the end run methods of all modules.
 
long getMaximumEventNumber (long maxEvent) const
 Calculate the maximum event number out of the argument from command line and the environment.
 

Protected Attributes

const Module * m_master
 The master module that determines the experiment/run/event number.
 
ModulePtrList m_moduleList
 List of all modules in order initialized.
 
std::string m_profileModuleName
 Name of the module which should be profiled, empty if no profiling is requested.
 
Module * m_profileModule = nullptr
 Address of the module which we want to profile, nullptr if no profiling is requested.
 
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
 EventMetaData is used by processEvent()/processCore().
 
EventMetaData m_previousEventMetaData
 Stores state of EventMetaData before it was last changed.
 
StoreObjPtr< EventExtraInfo > m_eventExtraInfo
 Event extra info object pointer.
 
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
 Also used in a number of places.
 
bool m_inRun
 Are we currently in a run? If yes, processEndRun() needs to do something.
 
double m_lastMetadataUpdate
 Time in seconds of last call for metadata update in event loop.
 
double m_metadataUpdateInterval
 Minimal time difference in seconds for metadata updates in event loop.
 
bool m_steerRootInputModuleOn = false
 True if the SteerRootInputModule is in charge of event processing.
 

Private Member Functions

void sendTerminatedMessage (unsigned int pid, bool waitForConfirmation)
 Send an unregister message to all sockets if the given PID died. Wait at most 10 s for the confirmation (if requested).
 
void runWorkers (PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName=false)
 Fork out as many workers as requested and, in each of them, run the given path using processCore.
 
void processCore (PathPtr path)
 Process the path by basically calling processEvent until a termination is requested.
 
bool processEvent (PathIterator moduleIter, bool firstRound)
 Process a single event by iterating through the module path once.
 
std::pair< unsigned int, unsigned int > checkChildProcesses ()
 Check if one of the started processes has died.
 
void release ()
 Release the parent resource, which is needed after forking to not close it twice.
 
bool forkOut ()
 Helper function to fork out. Sets the Python state correctly and adds the process to the internal state.
 

Private Attributes

ZMQParent m_parent
 An instance of a ZMQParent to create sockets for unregistering workers.
 
std::vector< int > m_processList
 The current list of running processes (with their PIDs)
 
std::vector< std::unique_ptr< zmq::socket_t > > m_sockets
 The created sockets for unregistering workers. TODO: use connections.
 

Detailed Description

EventProcessor to be used on the HLT with all specialities of the HLT processing:

  • no input or output path - just workers
  • multiprocessing forked out after initialization, which is before the first real event is processed
  • restart of terminated workers (configurable) and unregistration at the DQM server and collector
  • special run start/end handling: please check the HLTZMQ2DsModule for details
  • check of the child processes in an additional monitoring process (happens to be the parent process) by keeping track of the current state

This event processor is specialized for the HLT and should not be used outside of it. The event processor is exported as a Python module. It can be called with

import hbasf2
hbasf2.process(path, [address, ...], True)

or, if you want to append a number to each forked process,

hbasf2.processNumbered(path, [address, ...], True(, True))

Definition at line 37 of file HLTEventProcessor.h.

Constructor & Destructor Documentation

◆ HLTEventProcessor()

HLTEventProcessor ( const std::vector< std::string > &  outputAddresses)

Create a new event processor and store the ZMQ addresses where to unregister workers.

Definition at line 76 of file HLTEventProcessor.cc.

77{
78 m_sockets.reserve(outputAddresses.size());
79 for (const auto& address : outputAddresses) {
80 m_sockets.push_back(m_parent.createSocket<ZMQ_DEALER>(address, false));
81 }
82}
std::unique_ptr< zmq::socket_t > createSocket(const std::string &socketAddress, bool bind)
Create a socket of the given type with the given address and bind or not bind it.
Definition: ZMQParent.h:105

Member Function Documentation

◆ callEvent()

void callEvent ( Module * module)
protected inherited

Calls event() on one single module, setting up logging and statistics as needed.

Parameters
module: Module whose event() function is called.
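
The bracket around the module call (start statistics, call the module, stop statistics) can be illustrated with a small stand-alone sketch. It is not the framework code: ModuleStats and callTimed are hypothetical names, and a plain std::chrono timer stands in for the ProcessStatistics object.

```cpp
#include <cassert>
#include <chrono>
#include <functional>

// Hypothetical stand-in for the per-module statistics kept by the framework.
struct ModuleStats {
  long calls = 0;                                   // number of timed calls
  std::chrono::steady_clock::duration total{};      // accumulated run time
};

// Runs eventFn exactly once. Only when collectStats is true is the call
// counted and its duration added to the statistics.
inline void callTimed(ModuleStats& stats, bool collectStats, const std::function<void()>& eventFn)
{
  const auto start = std::chrono::steady_clock::now();
  eventFn();
  if (collectStats) {
    stats.calls += 1;
    stats.total += std::chrono::steady_clock::now() - start;
  }
}
```

With statistics disabled the module is still called; only the bookkeeping is skipped, which mirrors the role of the collectStats flag in the listing.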

Definition at line 226 of file EventProcessor.cc.

227{
228 LogSystem& logSystem = LogSystem::Instance();
229 const bool collectStats = !Environment::Instance().getNoStats();
230 // set up logging
231 logSystem.updateModule(&(module->getLogConfig()), module->getName());
232 // set up statistics if requested
233 if (collectStats) m_processStatisticsPtr->startModule();
234 // call module
235 CALL_MODULE(module, event);
236 // stop timing
237 if (collectStats) m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_Event);
238 // reset logging
239 logSystem.updateModule(nullptr);
240};
bool getNoStats() const
Disable collection of statistics during event processing.
Definition: Environment.h:200
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Class for logging debug, info and error messages.
Definition: LogSystem.h:46
void updateModule(const LogConfig *moduleLogConfig=nullptr, const std::string &moduleName="")
Sets the log configuration to the given module log configuration and sets the module name This method...
Definition: LogSystem.h:191
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:28
@ c_Event
Counting time/calls in event()

◆ checkChildProcesses()

std::pair< unsigned int, unsigned int > checkChildProcesses ( )
private

Check if one of the started processes has died.

If it has died with a non-zero exit code, increase the counter of workers to restart.

Returns a pair: the number of workers still alive and the number of workers that need to be restarted (that is, the number of workers that died with a non-zero exit code).
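
The non-blocking check can be sketched outside the framework as follows. This is a minimal POSIX illustration, not the class's code: spawnChild and reapChildren are hypothetical names, and the sketch additionally assumes that a child killed by a signal counts as failed.

```cpp
#include <cassert>
#include <cerrno>
#include <utility>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical helper: fork a child that exits immediately with exitCode.
inline pid_t spawnChild(int exitCode)
{
  const pid_t pid = fork();
  if (pid == 0) _exit(exitCode);  // child: leave without running atexit handlers
  return pid;                     // parent: child PID, or -1 if fork() failed
}

// Polls every PID in children without blocking. Finished children are removed
// from the list. Returns {number still alive, number that ended abnormally}.
inline std::pair<unsigned int, unsigned int> reapChildren(std::vector<pid_t>& children)
{
  unsigned int needToRestart = 0;
  for (auto iter = children.begin(); iter != children.end();) {
    int status = 0;
    const pid_t result = waitpid(*iter, &status, WNOHANG);
    if (result == 0 || (result == -1 && errno == EINTR)) {
      ++iter;  // still running, or interrupted: look again next time
      continue;
    }
    // Assumption of this sketch: anything but a normal exit with code 0 is a failure
    const bool cleanExit = (result > 0) && WIFEXITED(status) && WEXITSTATUS(status) == 0;
    if (!cleanExit) ++needToRestart;
    iter = children.erase(iter);
  }
  return {static_cast<unsigned int>(children.size()), needToRestart};
}
```

A caller would invoke reapChildren periodically and add the second element of the pair to its restart counter.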

Definition at line 396 of file HLTEventProcessor.cc.

397{
398 unsigned int needToRestart = 0;
399
400 // Check for processes which were there last time but are gone now (so they died)
401 for (auto iter = m_processList.begin(); iter != m_processList.end();) {
402 const auto& pid = *iter;
403
404 // check the status of this process pid
405 int status;
406 const int result = waitpid(pid, &status, WNOHANG);
407 if (result == -1) {
408 if (errno == EINTR) {
409 // interrupted, try again next time
410 ++iter;
411 continue;
412 } else {
413 B2FATAL("waitpid() failed.");
414 }
415 } else if (result == 0) {
416 // No change, so lets continue with the next worker
417 ++iter;
418 continue;
419 }
420
421 B2ASSERT("Do not understand the result of waitpid()", result == pid);
422
423 // state has changed, which means it is dead!
424 const auto exitCode = WEXITSTATUS(status);
425
426 // we only need to restart after unexpected deaths
427 if (exitCode != 0) {
428 B2WARNING("A worker process has died unexpected!");
429 needToRestart += 1;
430
431 sendTerminatedMessage(pid, true);
432 }
433
434 // once a process is gone from the global list, remove them from our own, too.
435 iter = m_processList.erase(iter);
436 }
437
438 return {m_processList.size(), needToRestart};
439}

◆ forkOut()

bool forkOut ( )
private

Helper function to fork out. Sets the Python state correctly and adds the process to the internal state.

Definition at line 449 of file HLTEventProcessor.cc.

450{
451 fflush(stdout);
452 fflush(stderr);
453
454 pid_t pid = fork();
455
456 if (pid > 0) {
457 g_processNumber++;
458 m_processList.push_back(pid);
459 return false;
460 } else if (pid < 0) {
461 B2FATAL("fork() failed: " << strerror(errno));
462 } else {
463 // Child process
464 // Reset some python state: signals, threads, gil in the child
465 PyOS_AfterFork_Child();
466 // InputController becomes useless in child process
468 // die when parent dies
469 prctl(PR_SET_PDEATHSIG, SIGHUP);
470
472 return true;
473 }
474}
static void resetForChildProcess()
Reset InputController (e.g.
static void setProcessID(int processID)
Set the process ID of this process.
Definition: ProcHandler.cc:250

◆ getMaximumEventNumber()

long getMaximumEventNumber ( long  maxEvent) const
protected inherited

Calculate the maximum event number out of the argument from command line and the environment.
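
The rule can be written as a small pure function. In this sketch resolveMaxEvents is a hypothetical name, and overrideCount stands in for the value the environment reports, with 0 meaning that no override was given.

```cpp
#include <cassert>

// requested:     value passed to process(); 0 means "no limit requested"
// overrideCount: command-line override; 0 means "no override"
// The override wins when it is set and is either the only limit or the smaller one.
inline long resolveMaxEvents(long requested, unsigned int overrideCount)
{
  const long overrideValue = static_cast<long>(overrideCount);
  if (overrideValue > 0 && (requested == 0 || requested > overrideValue)) {
    return overrideValue;
  }
  return requested;
}
```

For example, a requested limit of 10 with an override of 5 yields 5, while a requested limit of 3 with the same override stays at 3.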

Definition at line 113 of file EventProcessor.cc.

114{
115 //Check whether the number of events was set via command line argument
116 unsigned int numEventsArgument = Environment::Instance().getNumberEventsOverride();
117 if ((numEventsArgument > 0) && ((maxEvent == 0) || (maxEvent > numEventsArgument))) {
118 return numEventsArgument;
119 }
120 return maxEvent;
121}
unsigned int getNumberEventsOverride() const
Returns number of events in run 1 for EventInfoSetter module, or 0 for no override.
Definition: Environment.h:68

◆ installMainSignalHandlers()

void installMainSignalHandlers ( void(*)(int)  fn = nullptr)
static inherited

Install signal handler for INT, TERM and QUIT signals.

If argument is NULL, EventProcessor's own signal handler will be installed.

Definition at line 315 of file EventProcessor.cc.

316{
317 if (!fn)
318 fn = signalHandler;
319 installSignalHandler(SIGINT, fn);
320 installSignalHandler(SIGTERM, fn);
321 installSignalHandler(SIGQUIT, fn);
322}

◆ installSignalHandler()

void installSignalHandler ( int  sig,
void(*)(int)  fn 
)
static inherited

Install a signal handler 'fn' for given signal.
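
A common way to use such an installer is with a handler that only records the signal number and leaves all real work to the main loop. The following POSIX sketch shows that pattern; g_lastSignal, recordSignal and installHandler are hypothetical names, and unlike the member function the sketch reports failure through its return value.

```cpp
#include <cassert>
#include <cstring>
#include <signal.h>

// Last signal seen. volatile sig_atomic_t is the type that may safely be
// written from inside a signal handler.
static volatile sig_atomic_t g_lastSignal = 0;

// Handler that only records the signal number.
static void recordSignal(int sig)
{
  g_lastSignal = sig;
}

// Installs fn as handler for sig via sigaction(); returns true on success.
static bool installHandler(int sig, void (*fn)(int))
{
  struct sigaction action;
  std::memset(&action, 0, sizeof(action));
  action.sa_handler = fn;
  sigemptyset(&action.sa_mask);
  return sigaction(sig, &action, nullptr) == 0;
}
```

The main loop can then poll g_lastSignal between events and shut down in an orderly way.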

Definition at line 300 of file EventProcessor.cc.

301{
302 struct sigaction s;
303 memset(&s, '\0', sizeof(s));
304
305 s.sa_handler = fn;
306 sigemptyset(&s.sa_mask);
307 if (sig == SIGCHLD)
308 s.sa_flags |= SA_NOCLDSTOP; //don't produce signal when children are stopped
309
310 if (sigaction(sig, &s, nullptr) != 0) {
311 B2FATAL("Cannot setup signal handler for signal " << sig);
312 }
313}

◆ process() [1/2]

void process ( const PathPtr & startPath,
long  maxEvent = 0 
)
inherited

Processes the full module chain, starting with the first module in the given path.

Processes all events for the given run number and for events from 0 to maxEvent. If maxEvent is less than or equal to 0, the maximum number check is disabled and all events are processed. If runNumber is less than 0, the run number has to be set externally by a module and the given number is not used.

Parameters
startPath: The processing starts with the first module of this path.
maxEvent: Optional. The maximum number of events that will be processed. If the number is less than or equal to 0, all events will be processed.

Definition at line 123 of file EventProcessor.cc.

124{
125 maxEvent = getMaximumEventNumber(maxEvent);
126 // Make sure the NumberEventsOverride reflects the actual number if
127 // process(path, N) was used instead of -n and that it's reset to what it was
128 // after we're done with processing()
129 NumberEventsOverrideGuard numberOfEventsOverrideGuard(maxEvent);
130
131 //Get list of modules which could be executed during the data processing.
132 ModulePtrList moduleList = startPath->buildModulePathList();
133
134 //Find the address of the module we want to profile
135 if (!m_profileModuleName.empty()) {
136 for (const auto& module : moduleList) {
137 if (module->getName() == m_profileModuleName) {
138 m_profileModule = module.get();
139 break;
140 }
141 }
142 if (!m_profileModule)
143 B2FATAL("Module profiling was requested via --profile, but no module '" << m_profileModuleName << "' was found!");
144 }
145
146 //Initialize modules
147 processInitialize(moduleList);
148
149 // SteerRootInputModule might have changed the number of events to be processed
150 for (const auto& module : moduleList) {
151 if (module->getType() == "SteerRootInput") {
152 if (maxEvent != Environment::Instance().getNumberEventsOverride()) {
153 B2INFO("Module 'SteerRootInputModule' is controlling the number of processed events.");
156 }
157 break;
158 }
159 }
160
161 //do we want to visualize DataStore input/output?
162 if (Environment::Instance().getVisualizeDataFlow()) {
163 DataFlowVisualization v(&DataStore::Instance().getDependencyMap());
164 v.visualizePath("dataflow.dot", *startPath);
165 }
166
167 //Don't start processing in case of no master module
168 if (!m_master) {
169 B2ERROR("There is no module that provides event and run numbers (EventMetaData). You must add either the EventInfoSetter or an input module (e.g. RootInput) to the beginning of your path.");
170 }
171
172 //Check if errors appeared. If yes, don't start the event processing.
174 if ((numLogError == 0) && m_master) {
176 try {
177 processCore(startPath, moduleList, maxEvent); //Do the event processing
178 } catch (StoppedBySignalException& e) {
179 if (e.signal != SIGINT) {
180 // close all open ROOT files, ROOT's exit handler will crash otherwise
181 gROOT->GetListOfFiles()->Delete();
182
183 B2FATAL(e.what());
184 }
185 //in case of SIGINT, we move on to processTerminate() to shut down safely
186 } catch (...) {
188 B2ERROR("Exception occurred in exp/run/evt: "
189 << m_eventMetaDataPtr->getExperiment() << " / "
190 << m_eventMetaDataPtr->getRun() << " / "
191 << m_eventMetaDataPtr->getEvent());
192 throw;
193 }
194
195 } else {
196 B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
197 }
198
199 //Terminate modules
200 processTerminate(moduleList);
201
203
204 if (gSignalReceived == SIGINT) {
205 const auto msg = R"(Processing aborted via SIGINT, terminating.
206 Output files have been closed safely and should be readable. However
207 processing was NOT COMPLETE. The output files do contain only events
208 processed until this point.)";
210 B2ERROR(msg
211 << LogVar("last_experiment", m_eventMetaDataPtr->getExperiment())
212 << LogVar("last_run", m_eventMetaDataPtr->getRun())
213 << LogVar("last_event", m_eventMetaDataPtr->getEvent()));
214 else
215 B2ERROR(msg);
216 installSignalHandler(SIGINT, SIG_DFL);
217 raise(SIGINT);
218 }
219}
class to visualize data flow between modules.
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:53
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:30
void printErrorSummary()
Print error/warning summary at end of execution.
Definition: LogSystem.cc:203
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:158
Class to store variables with their name which were sent to the logging service.
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:583

◆ process() [2/2]

void process ( PathPtr  spath,
bool  restartFailedWorkers,
bool  appendProcessNumberToModuleName = false 
)

Process the given path.

Restarts failed workers if requested. The steps are:

  • first check if all modules have a parallel flag
  • then call initialize in the main process
  • fork out all workers and process the path
  • while monitoring their status
  • in the end kill remaining processes (if needed) and re-raise any collected signals
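
The decision taken in each round of the monitoring step can be sketched as a pure function. MonitorResult and monitorStep are hypothetical names; the two worker counts are the values that the child-process check returns.

```cpp
#include <cassert>

struct MonitorResult {
  bool stop;                  // leave the monitoring loop
  unsigned int startWorkers;  // number of replacement workers to fork now
};

// One round of the monitoring loop. restartedSoFar is updated in place.
inline MonitorResult monitorStep(unsigned int presentWorkers, unsigned int neededWorkers,
                                 bool restartFailedWorkers, unsigned int numProcesses,
                                 unsigned int& restartedSoFar)
{
  unsigned int toStart = 0;
  if (neededWorkers > 0) {
    if (!restartFailedWorkers) return {true, 0};  // a worker failed and restarts are disabled
    toStart = neededWorkers;
    restartedSoFar += neededWorkers;
  } else if (presentWorkers == 0) {
    return {true, 0};  // all workers have exited cleanly
  }
  // More restarts in total than configured processes is treated as abnormal
  if (restartedSoFar > numProcesses) return {true, toStart};
  return {false, toStart};
}
```

Keeping the decision separate from forking and sleeping makes the stop conditions easy to check in isolation.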

Definition at line 84 of file HLTEventProcessor.cc.

85{
86 using namespace std::chrono_literals;
87
88 m_moduleList = path->buildModulePathList();
89
90 // Assert path is what we want: fully parallel certified, not empty. Set first module to master module
91 B2ASSERT("You try to process an empty path!", not m_moduleList.empty());
92 for (const auto& module : m_moduleList) {
93 bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
94 // entire conditional path must also be compatible
95 if (hasParallelFlag and module->hasCondition()) {
96 for (const auto& conditionPath : module->getAllConditionPaths()) {
98 hasParallelFlag = false;
99 }
100 }
101 }
102 B2ASSERT("Module with name " << module->getName() << " does not have parallel flag!", hasParallelFlag);
103 }
104
105 // Initialize of all modules (including event() of master module)
108
109 // Don't start processing in case of no master module
110 if (not m_master) {
111 B2ERROR("There is no module that provides event and run numbers (EventMetaData). "
112 "You must add the specific HLT module as first module to the path.");
113 }
114
115 // Check if errors appeared. If yes, don't start the event processing.
117 if (numLogError != 0) {
118 B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
119 }
120
121 // Start the workers, which call the main loop
122 const int numProcesses = Environment::Instance().getNumberProcesses();
123 runWorkers(path, numProcesses, appendProcessNumberToModuleName);
124
125 installMainSignalHandlers(storeSignal);
126 // Back in the main process: wait for the processes and monitor them
127 int numberOfRestartedWorkers = 0;
128 while (true) {
129 // check if we have received any signal from the user or OS.
130 // Killing of the remaining processes happens after the loop.
131 if (g_signalReceived > 0) {
132 B2WARNING("Received a signal to go down.");
133 break;
134 }
135
136 // Test if we need more workers and if one has died
137 unsigned int presentWorkers;
138 unsigned int neededWorkers;
139
140 std::tie(presentWorkers, neededWorkers) = checkChildProcesses();
141 if (neededWorkers > 0) {
142 if (restartFailedWorkers) {
143 runWorkers(path, neededWorkers);
144 numberOfRestartedWorkers += neededWorkers;
145 } else {
146 B2ERROR("A worker failed. Will try to end the process smoothly now.");
147 break;
148 }
149 } else if (presentWorkers == 0) {
150 B2DEBUG(10, "All workers have cleanly exited. Will now also exit");
151 break;
152 }
153
154 if (numberOfRestartedWorkers > numProcesses) {
155 B2ERROR("I needed to restart on total " << numberOfRestartedWorkers << ", which I think is abnormal. "
156 "Will terminate the process now!");
157 break;
158 }
159
160 std::this_thread::sleep_for(10ms);
161 }
162
163 if (appendProcessNumberToModuleName) {
164 for (const int& pid : m_processList) {
165 B2INFO(g_processNumber << ": Send SIGINT to " << pid);
166 kill(pid, SIGINT);
167 }
168 for (const int& pid : m_processList) {
169 int count = 0;
170 while (true) {
171 // Do not allow internal SIGKILL to prevent data loss in case of a numbered process
172 if (kill(pid, 0) != 0) {
173 break;
174 }
175 B2DEBUG(10, g_processNumber << ": Checking process termination, count = " << count);
176 std::this_thread::sleep_for(1000ms);
177 if (count % 5 == 1) kill(pid, SIGINT);
178 ++count;
179 }
180 }
181 }
182
184
185 // if we still have/had processes, we should unregister them
186 std::this_thread::sleep_for(500ms);
187
188 for (const int& pid : m_processList) {
189 if (kill(pid, SIGKILL) >= 0) {
190 B2WARNING("Needed to hard kill process " << pid);
191 } else {
192 B2DEBUG(100, "no process " << pid << " found, already gone?");
193 }
194 sendTerminatedMessage(pid, false);
195 }
196 m_processList.clear();
197
198 B2DEBUG(10, "Done here");
199
200 // Normally, we would call terminate here, but not on HLT!
201 // Normally, we would print the error summary here, but not on HLT!
202 if (g_signalReceived == SIGINT) {
203 installSignalHandler(SIGINT, SIG_DFL);
204 raise(SIGINT);
205 }
206}
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:158
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module > > &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80

◆ processBeginRun()

void processBeginRun ( bool  skipDB = false)
protected inherited

Calls the begin run methods of all modules.

Loops over all module instances specified in a list and calls their beginRun() method. Please note: the beginRun() method of the module which triggered the beginRun() loop will also be called.

Definition at line 470 of file EventProcessor.cc.

471{
472 MetadataService::Instance().addBasf2Status("beginning run");
473
474 m_inRun = true;
475 auto dbsession = Database::Instance().createScopedUpdateSession(); // cppcheck-suppress unreadVariable
476
477 LogSystem& logSystem = LogSystem::Instance();
478 m_processStatisticsPtr->startGlobal();
479
480 if (!skipDB) DBStore::Instance().update();
481
482 //initialize random generator for begin run
484
485 for (const ModulePtr& modPtr : m_moduleList) {
486 Module* module = modPtr.get();
487
488 //Set the module dependent log level
489 logSystem.updateModule(&(module->getLogConfig()), module->getName());
490
491 //Do beginRun() call
492 m_processStatisticsPtr->startModule();
493 CALL_MODULE(module, beginRun);
495
496 //Set the global log level
497 logSystem.updateModule(nullptr);
498 }
499
501}
void addBasf2Status(const std::string &message="")
Add metadata of basf2 status.
static MetadataService & Instance()
Static method to get a reference to the MetadataService instance.
@ c_BeginRun
Counting time/calls in beginRun()
Base class for Modules.
Definition: Module.h:72
static void initializeBeginRun()
Initialize run independent random generator for begin run.
static Database & Instance()
Instance of a singleton Database.
Definition: Database.cc:41
static DBStore & Instance()
Instance of a singleton DBStore.
Definition: DBStore.cc:26
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:43
void update()
Updates all objects that are outside their interval of validity.
Definition: DBStore.cc:77
ScopeGuard createScopedUpdateSession()
Make sure we have efficient HTTP pipelining during initialize/beginRun but don't keep session alive ...
Definition: Database.cc:61

◆ processCore() [1/2]

void processCore ( const PathPtr & startPath,
const ModulePtrList & modulePathList,
long  maxEvent = 0,
bool  isInputProcess = true 
)
protected inherited

Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.

Parameters
startPath: The processing starts with the first module of this path.
modulePathList: A list of all modules which could be executed during the data processing (used for calling the beginRun() and endRun() methods).
maxEvent: The maximum number of events that will be processed. If the number is less than or equal to 0, all events are processed.
isInputProcess: True when this is either the only process or the input process.

Definition at line 409 of file EventProcessor.cc.

410{
412 m_moduleList = modulePathList;
413 //Remember the previous event meta data, and identify end of data meta data
414 m_previousEventMetaData.setEndOfData(); //invalid start state
415
416 const bool collectStats = !Environment::Instance().getNoStats();
417
418 //Loop over the events
419 long currEvent = 0;
420 bool endProcess = false;
421 while (!endProcess) {
422 if (collectStats)
423 m_processStatisticsPtr->startGlobal();
424
425 PathIterator moduleIter(startPath);
426 endProcess = processEvent(moduleIter, isInputProcess && currEvent == 0);
427
428 //Delete event related data in DataStore
430
431 currEvent++;
432 if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess = true;
433 if (collectStats)
435 } //end event loop
436
437 //End last run
438 m_eventMetaDataPtr.create();
440}
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
void setInitializeActive(bool active)
Setter for m_initializeActive.
Definition: DataStore.cc:93
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
Definition: DataStore.cc:714
void setEndOfData()
Marks the end of the data processing.
void processEndRun()
Calls the end run methods of all modules.
bool processEvent(PathIterator moduleIter, bool skipMasterModule)
Calls event() functions on all modules for the current event.
EventMetaData m_previousEventMetaData
Stores state of EventMetaData before it was last changed.
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:26

◆ processCore() [2/2]

void processCore ( PathPtr  path)
private

Process the path by basically calling processEvent until a termination is requested.

Does not do any initialization; it is assumed that this has already happened. At the end, terminate is called.
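
The shape of this loop can be sketched with two callbacks. runWorkerLoop is a hypothetical name; its processEvent callback receives the firstRound flag and returns true once termination is requested, and terminate stands in for the final termination step.

```cpp
#include <cassert>
#include <functional>

// Calls processEvent until it requests termination, then calls terminate once.
// Returns the number of rounds that were run.
inline int runWorkerLoop(const std::function<bool(bool)>& processEvent,
                         const std::function<void()>& terminate)
{
  int rounds = 0;
  bool firstRound = true;
  bool terminationRequested = false;
  while (!terminationRequested) {
    terminationRequested = processEvent(firstRound);
    firstRound = false;  // only the very first round is special
    ++rounds;
  }
  terminate();
  return rounds;
}
```

The firstRound flag is true for exactly one call, which lets the event step skip work that was already done during initialization.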

Definition at line 249 of file HLTEventProcessor.cc.

250{
251 bool terminationRequested = false;
252 bool firstRound = true;
253
254 // Initialisation is done
256
257 // Set the previous event meta data to something invalid
259
260 while (not terminationRequested) {
261 B2DEBUG(100, "Processing new event");
262
263 // Start the measurement
264 m_processStatisticsPtr->startGlobal();
265
266 // Main call to event() of the modules, and maybe beginRun() and endRun()
267 PathIterator moduleIter(path);
268 terminationRequested = processEvent(moduleIter, firstRound);
269
270 // Delete event related data in DataStore
272
273 // Stop the measurement
275
276 // We are surely not in the first round the next time
277 firstRound = false;
278 }
279
280 // End last run with a terminate. Yes, we are not calling a endRun() here and yes, we are calling this in the worker
281 B2DEBUG(10, "Calling terminate");
282 m_eventMetaDataPtr.create();
284}

◆ processEndRun()

void processEndRun ( )
protected inherited

Calls the end run methods of all modules.

Loops over all module instances specified in a list and calls their endRun() method. Please note: the endRun() method of the module which triggered the endRun() loop will also be called.

Definition at line 504 of file EventProcessor.cc.

505{
507
508 if (!m_inRun)
509 return;
510 m_inRun = false;
511
512 LogSystem& logSystem = LogSystem::Instance();
513 m_processStatisticsPtr->startGlobal();
514
515 const EventMetaData newEventMetaData = *m_eventMetaDataPtr;
517
518 //initialize random generator for end run
520
521 for (const ModulePtr& modPtr : m_moduleList) {
522 Module* module = modPtr.get();
523
524 //Set the module dependent log level
525 logSystem.updateModule(&(module->getLogConfig()), module->getName());
526
527 //Do endRun() call
528 m_processStatisticsPtr->startModule();
529 CALL_MODULE(module, endRun);
531
532 //Set the global log level
533 logSystem.updateModule(nullptr);
534 }
535 *m_eventMetaDataPtr = newEventMetaData;
536
538}
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
@ c_EndRun
Counting time/calls in endRun()
static void initializeEndRun()
Initialize run independent random generator for end run.

◆ processEvent()

bool processEvent ( PathIterator  moduleIter,
bool  firstRound 
)
private

Process a single event by iterating through the module path once.

In principle very similar to the EventProcessor::processEvent function, but with different assumptions about the run changes induced by the master module (which is always the ZMQ2Ds module in the HLT case).

The logic applied after the master module has run is the following:

  • if an EndOfData is set in the event meta data, just break out
  • if a HLT-specific EndOfRun is set, call the end of run methods of all modules (without begin run)
  • if the previous event meta data has the EndOfRun/Data set, call the begin run functions
  • if the run change was induced due to a changing run number, call begin and end run functions

The rest (e.g. module conditions, db store) is the same as the EventProcessor case.
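
The four cases listed for processEvent() can be read as a small decision function. The following is only a sketch under stated assumptions, not the basf2 implementation: MetaSketch, AfterMaster and decideAfterMaster are hypothetical names, and details such as the firstRound handling and the check for a run number of 0 are left out.

```cpp
#include <cassert>

// Hypothetical stand-in for the EventMetaData fields that matter here.
struct MetaSketch {
  int experiment = 0;
  int run = 0;
  bool endOfData = false;
  bool endOfRun = false;
};

// Hypothetical summary of what happens after the master module has run.
enum class AfterMaster { Terminate, EndRunOnly, BeginRunOnly, EndAndBeginRun, Continue };

AfterMaster decideAfterMaster(const MetaSketch& previous, const MetaSketch& current)
{
  // Case 1: end of data, leave the event loop.
  if (current.endOfData) return AfterMaster::Terminate;
  // Case 2: explicit end of run, call endRun() only.
  if (current.endOfRun) return AfterMaster::EndRunOnly;
  // Case 3: the previous event closed a run (or the data), so only beginRun() is due.
  if (previous.endOfRun || previous.endOfData) return AfterMaster::BeginRunOnly;
  // Case 4: experiment or run number changed without notice.
  const bool runChanged = current.experiment != previous.experiment || current.run != previous.run;
  return runChanged ? AfterMaster::EndAndBeginRun : AfterMaster::Continue;
}
```

Checking the flags of the current event before those of the previous event mirrors the order of the list above; an event that carries neither flag and no run change falls through to Continue.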

Definition at line 286 of file HLTEventProcessor.cc.

287{
288 while (not moduleIter.isDone()) {
289 Module* module = moduleIter.get();
290 B2DEBUG(10, "Starting event of " << module->getName());
291
292 // The actual call of the event function
293 if (module != m_master) {
294 // If this is not the master module it is quite simple: just call the event function
295 callEvent(module);
296
297 // Check for a second master module. Cannot do this if we are in the first round after initialize
298 // (as previous event meta data is not set properly here)
301 B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and "
302 << module->getName());
303 }
304 } else {
305 if (not firstRound) {
306 // Also call the event function for the master, but not the first time
307 callEvent(module);
308 }
309 // initialize random number state for the event handling after we have
310 // received the event information from the master module.
312 }
313
314 if (g_signalReceived != 0) {
315 if (g_signalReceived != SIGINT) {
316 throw StoppedBySignalException(g_signalReceived);
317 } else {
318 B2DEBUG(10, "Received a SIGINT in the worker process...");
319 return true;
320 }
321 }
322
323 B2ASSERT("The event meta data must always be valid at this stage!", m_eventMetaDataPtr and m_eventMetaDataPtr.isValid());
324
325 if (m_eventMetaDataPtr->isEndOfData()) {
326 // Immediately leave the loop and terminate (true)
327 return true;
328 }
329
330 if (module == m_master and not firstRound) {
331 if (m_eventMetaDataPtr->isEndOfRun()) {
332 B2DEBUG(10, "Calling endRun()");
333 // call endRun() of all modules (internally uses the previous event meta data) and skip to the next event
334 m_processStatisticsPtr->suspendGlobal();
335 m_inRun = true;
337 m_processStatisticsPtr->resumeGlobal();
338
339 // Store the current event meta data for the next round
341
342 // Leave this event, but not the full processing (false)
343 return false;
345 // The run has changed (or we never had one), so call beginRun() before going on
346 // The run number should not be 0
347 if (m_eventMetaDataPtr->getRun() != 0) {
348 m_processStatisticsPtr->suspendGlobal();
350 m_processStatisticsPtr->resumeGlobal();
351 } else {
352 return false;
353 }
354 }
355
356 const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
358 const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
360 if (runChangedWithoutNotice) {
361 m_processStatisticsPtr->suspendGlobal();
362
363 m_inRun = true;
366
367 m_processStatisticsPtr->resumeGlobal();
368 }
369
370 // make sure we use the event dependent generator again
372
373 // and the correct database
375
376 // Store the current event meta data for the next round
378 }
379
380 // Check for the module conditions, evaluate them and if one is true, switch to the new path
381 if (module->evalCondition()) {
382 PathPtr condPath = module->getConditionPath();
383 // continue with parent Path after condition path is executed?
384 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
385 moduleIter = PathIterator(condPath, moduleIter);
386 } else {
387 moduleIter = PathIterator(condPath);
388 }
389 } else {
390 moduleIter.next();
391 }
392 }
393 return false;
394}
int getRun() const
Run Getter.
bool isEndOfRun() const
is end-of-run set? (see setEndOfRun()).
int getExperiment() const
Experiment Getter.
bool isEndOfData() const
is end-of-data set? (see setEndOfData()).
void processBeginRun(bool skipDB=false)
Calls the begin run methods of all modules.
void callEvent(Module *module)
Calls event() on one single module, setting up logging and statistics as needed.
const std::string & getName() const
Returns the name of the module.
Definition: Module.h:186
void next()
increment.
Definition: PathIterator.h:49
bool isDone() const
Are we finished iterating?
Definition: PathIterator.h:72
Module * get() const
dereference.
Definition: PathIterator.h:75
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:35
void updateEvent()
Updates all intra-run dependent objects.
Definition: DBStore.cc:140

◆ processInitialize()

void processInitialize ( const ModulePtrList &  modulePathList,
bool  setEventInfo = true 
)
protected inherited

Initializes the modules.

Loops over all module instances specified in a list and calls their initialize() method.

Parameters
modulePathList  A list of all modules which could be executed during the data processing.
setEventInfo  If true, the first event call of the master module is made immediately, so that the event info is loaded right away and is available to subsequent modules.

Definition at line 242 of file EventProcessor.cc.

243{
244 LogSystem& logSystem = LogSystem::Instance();
246
247 m_processStatisticsPtr.registerInDataStore();
248 //TODO I might want to overwrite it in initialize (e.g. if read from file)
249 // For parallel processing or subevents, I don't want that, though.
250 // Maybe make this a function argument?
252 m_processStatisticsPtr.create();
253 m_processStatisticsPtr->startGlobal();
254
256
257 // EventExtraInfo is needed in several modules so register it here
258 m_eventExtraInfo.registerInDataStore();
259
260 for (const ModulePtr& modPtr : modulePathList) {
261 Module* module = modPtr.get();
262
263 if (module->hasUnsetForcedParams()) {
264 //error message was printed by module
265 continue;
266 }
267
268 //Set the module dependent log level
269 logSystem.updateModule(&(module->getLogConfig()), module->getName());
271
272 //Do initialization
273 m_processStatisticsPtr->initModule(module);
274 m_processStatisticsPtr->startModule();
275 CALL_MODULE(module, initialize);
277
278 //Set the global log level
279 logSystem.updateModule(nullptr);
280
281 //Check whether this is the master module
282 if (!m_master && DataStore::Instance().getEntry(m_eventMetaDataPtr) != nullptr) {
283 B2DEBUG(100, "Found module providing EventMetaData: " << module->getName());
284 m_master = module;
285 if (setEventInfo) {
286 callEvent(module);
287 // update Database payloads: we now have valid event meta data unless
288 // we don't process any events
290 }
291 }
292
293 if (gSignalReceived != 0) {
294 throw StoppedBySignalException(gSignalReceived);
295 }
296 }
298}
DependencyMap & getDependencyMap()
Return map of dependencies between modules.
Definition: DataStore.h:524
void setModule(const Module &mod)
Set the current module (for getCurrentModuleInfo())
Definition: DependencyMap.h:60
StoreObjPtr< EventExtraInfo > m_eventExtraInfo
event extra info object pointer
@ c_Init
Counting time/calls in initialize()

◆ processTerminate()

void processTerminate ( const ModulePtrList &  modulePathList)
protected inherited

Terminates the modules.

Loops in reverse order over all module instances specified in a list and calls their terminate() method.
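
The reverse-order traversal can be illustrated with a minimal sketch. DemoModule, DemoModuleList and terminateOrder are hypothetical names introduced for this example; the real ModulePtrList may be a different container type.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for a module; only the name is kept.
struct DemoModule {
  std::string name;
};

// Assumption: a vector of shared pointers, used here purely for illustration.
using DemoModuleList = std::vector<std::shared_ptr<DemoModule>>;

// Returns the module names in the order in which terminate() would be called.
std::vector<std::string> terminateOrder(const DemoModuleList& modules)
{
  std::vector<std::string> order;
  // rbegin()/rend() walk the list from the last initialized module to the first.
  for (auto it = modules.rbegin(); it != modules.rend(); ++it) {
    order.push_back((*it)->name);
  }
  return order;
}
```

Terminating in reverse order means that a module is shut down before the modules initialized ahead of it, which it may still depend on.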

Parameters
modulePathListA list of all modules which could be executed during the data processing.

Definition at line 443 of file EventProcessor.cc.

444{
446
447 LogSystem& logSystem = LogSystem::Instance();
448 ModulePtrList::const_reverse_iterator listIter;
449 m_processStatisticsPtr->startGlobal();
450
451 for (listIter = modulePathList.rbegin(); listIter != modulePathList.rend(); ++listIter) {
452 Module* module = listIter->get();
453
454 //Set the module dependent log level
455 logSystem.updateModule(&(module->getLogConfig()), module->getName());
456
457 //Do termination
458 m_processStatisticsPtr->startModule();
459 CALL_MODULE(module, terminate);
461
462 //Set the global log level
463 logSystem.updateModule(nullptr);
464 }
465
467}
@ c_Term
Counting time/calls in terminate()

◆ release()

void release ( )
private

Release the parent resource, which is needed after forking to not close it twice.
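
The effect of calling release() on a std::unique_ptr, as opposed to letting it go out of scope, can be shown in isolation. This is a sketch under assumptions: Resource and detachAll are hypothetical, and unlike HLTEventProcessor::release(), which discards the released pointers, the sketch returns them so that the example can clean up after itself.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-in for a socket: counts how often a destructor ran.
struct Resource {
  int* closedCounter;
  explicit Resource(int* counter) : closedCounter(counter) {}
  ~Resource() { ++*closedCounter; }
};

// Detaches every resource from its owning pointer without destroying it.
// Afterwards the unique_ptrs are empty and their destructors do nothing.
std::vector<Resource*> detachAll(std::vector<std::unique_ptr<Resource>>& owners)
{
  std::vector<Resource*> detached;
  for (auto& owner : owners) {
    // release() hands back the raw pointer and leaves owner == nullptr.
    detached.push_back(owner.release());
  }
  return detached;
}
```

In a forked worker the same idea prevents the child from closing handles that the parent process still owns.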

Definition at line 441 of file HLTEventProcessor.cc.

442{
443 for (auto& socket : m_sockets) {
444 socket.release();
445 }
446 m_parent.reset();
447}
void reset()
Expert function: Reset the parent without context closing. ATTENTION: which will not clean up properl...
Definition: ZMQParent.cc:27

◆ runWorkers()

void runWorkers ( PathPtr  path,
unsigned int  numProcesses,
bool  appendProcessNumberToModuleName = false 
)
private

Fork out as many workers as requested and, in each of them, run the given path using processCore.
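
The fork-per-worker pattern can be sketched with plain POSIX calls. This is not the basf2 code: forkWorkers and waitForExitCode are hypothetical helpers, the worker body is a simple function pointer instead of processCore(), and the sketch leaves the child with _exit() rather than exit().

```cpp
#include <cassert>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <vector>

// Forks numProcesses children. Each child runs work(i) and exits with its
// return value; the parent collects the child PIDs.
std::vector<pid_t> forkWorkers(unsigned int numProcesses, int (*work)(unsigned int))
{
  assert(work != nullptr);
  std::vector<pid_t> pids;
  for (unsigned int i = 0; i < numProcesses; i++) {
    const pid_t pid = fork();
    if (pid < 0) {
      break;  // fork failed: return the workers started so far
    }
    if (pid == 0) {
      // Child: never fall back into the parent's loop.
      _exit(work(i));
    }
    pids.push_back(pid);
  }
  return pids;
}

// Waits for one child and returns its exit code, or -1 if it did not exit normally.
int waitForExitCode(pid_t pid)
{
  int status = 0;
  if (waitpid(pid, &status, 0) != pid) return -1;
  return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

Leaving the child through an exit call inside the loop body is what keeps a worker from forking further workers of its own.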

Definition at line 208 of file HLTEventProcessor.cc.

209{
210 for (unsigned int i = 0; i < numProcesses; i++) {
211 if (forkOut()) {
212 // Do only run in forked out worker process:
213 B2DEBUG(10, "Starting a new worker process");
214 // Reset the parent and sockets
215 release();
216
217 // Start the main loop with our signal handling and error catching
218 installMainSignalHandlers(storeSignal);
219 try {
220 if (appendProcessNumberToModuleName) {
221 for (const auto& module : m_moduleList) {
222 module->setName(std::to_string(g_processNumber) + std::string("_") + module->getName());
223 B2INFO("New worker name is " << module->getName());
224 }
225 }
226 processCore(path);
227 } catch (StoppedBySignalException& e) {
228 // close all open ROOT files, ROOT's exit handler will crash otherwise
229 gROOT->GetListOfFiles()->Delete();
230
231 B2ERROR(e.what());
232 exit(1);
233 } catch (...) {
235 B2ERROR("Exception occured in exp/run/evt: "
236 << m_eventMetaDataPtr->getExperiment() << " / "
237 << m_eventMetaDataPtr->getRun() << " / "
238 << m_eventMetaDataPtr->getEvent());
239 throw;
240 }
241
242 B2DEBUG(10, "Ending a worker process here.");
243 // Ok, we are done here!
244 exit(0);
245 }
246 }
247}
bool forkOut()
Helper function to fork out. Sets the Python state correctly and adds the process to the internal sta...
void release()
Release the parent resource, which is needed after forking to not close it twice.
void processCore(PathPtr path)
Process the path by basically calling processEvent until a termination is requested.

◆ sendTerminatedMessage()

void sendTerminatedMessage ( unsigned int  pid,
bool  waitForConfirmation 
)
private

Send an unregister message to all sockets if the given PID died. Wait at most 10 s for the confirmation (if requested).
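
Waiting for a confirmation with an upper time limit can be sketched with the POSIX poll() call on a plain file descriptor. This is an illustration only: waitForReadable is a hypothetical helper, and a pipe stands in for the ZMQ socket that the real code polls through ZMQParent::poll.

```cpp
#include <cassert>
#include <poll.h>
#include <unistd.h>

// Returns true if fd becomes readable within timeoutMs milliseconds,
// false on timeout or error.
bool waitForReadable(int fd, int timeoutMs)
{
  assert(fd >= 0);
  pollfd entry{};
  entry.fd = fd;
  entry.events = POLLIN;
  // poll() returns the number of descriptors with events, 0 on timeout, -1 on error.
  const int ready = poll(&entry, 1, timeoutMs);
  return ready > 0 && (entry.revents & POLLIN) != 0;
}
```

A caller would pass 10 * 1000 to obtain the 10 s limit and treat a false return as a missing confirmation.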

Definition at line 57 of file HLTEventProcessor.cc.

58{
59 for (auto& socket : m_sockets) {
60 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_deleteWorkerMessage,
62 ZMQParent::send(socket, std::move(message));
63
64 if (not waitForConfirmation) {
65 continue;
66 }
67 if (ZMQParent::poll({socket.get()}, 10 * 1000)) {
68 auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
69 B2ASSERT("Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
70 } else {
71 B2FATAL("Did not receive a confirmation message! waitForConfirmation is " << waitForConfirmation);
72 }
73 }
74}
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
Definition: ZMQParent.cc:56
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the current processes PID...
Definition: ZMQParent.cc:32
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:153

◆ setProfileModuleName()

void setProfileModuleName ( const std::string &  name)
inline inherited

Set the name of the module we want to profile.

Parameters
name  Name of the module as returned by getName()

Definition at line 58 of file EventProcessor.h.

58{ m_profileModuleName = name; }

◆ writeToStdErr()

void writeToStdErr ( const char  msg[])
static inherited

async-safe method to write something to STDERR.

Definition at line 72 of file EventProcessor.cc.

73{
74 //signal handlers are called asynchronously, making many standard functions (including output) dangerous
75 //write() is, however, safe, so we'll use that to write to stderr.
76
77 //strlen() not explicitly in safe list, but doesn't have any error handling routines that might alter global state
78 const int len = strlen(msg);
79
80 int rc = write(STDERR_FILENO, msg, len);
81 (void) rc; //ignore return value (there's nothing we can do about a failed write)
82
83}
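
A signal handler that restricts itself to write() and to setting a flag might look like the following sketch. The names g_lastSignal, recordSignal and installHandler are hypothetical and not part of EventProcessor; only sigaction(), sigemptyset() and write() are real POSIX calls.

```cpp
#include <cassert>
#include <cstring>
#include <signal.h>
#include <unistd.h>

// Hypothetical flag: the only state the handler touches.
volatile sig_atomic_t g_lastSignal = 0;

// Handler: stores the signal number and reports it using write() only.
void recordSignal(int sig)
{
  g_lastSignal = sig;
  static const char msg[] = "signal received\n";
  const ssize_t rc = write(STDERR_FILENO, msg, sizeof(msg) - 1);
  (void) rc;  // nothing useful can be done about a failed write here
}

// Installs fn as handler for sig; returns true on success.
bool installHandler(int sig, void (*fn)(int))
{
  assert(fn != nullptr);
  struct sigaction action;
  std::memset(&action, 0, sizeof(action));
  action.sa_handler = fn;
  sigemptyset(&action.sa_mask);
  return sigaction(sig, &action, nullptr) == 0;
}
```

The event loop can then test the flag at a safe point between modules instead of doing any real work inside the handler.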

Member Data Documentation

◆ m_eventExtraInfo

StoreObjPtr<EventExtraInfo> m_eventExtraInfo
protected inherited

event extra info object pointer

Definition at line 170 of file EventProcessor.h.

◆ m_eventMetaDataPtr

StoreObjPtr<EventMetaData> m_eventMetaDataPtr
protected inherited

EventMetaData is used by processEvent()/processCore().

Definition at line 164 of file EventProcessor.h.

◆ m_inRun

bool m_inRun
protected inherited

Are we currently in a run? If yes, processEndRun() needs to do something.

Definition at line 176 of file EventProcessor.h.

◆ m_lastMetadataUpdate

double m_lastMetadataUpdate
protected inherited

Time in seconds of last call for metadata update in event loop.

Definition at line 179 of file EventProcessor.h.

◆ m_master

const Module* m_master
protected inherited

The master module that determines the experiment/run/event number.

Definition at line 154 of file EventProcessor.h.

◆ m_metadataUpdateInterval

double m_metadataUpdateInterval
protected inherited

Minimal time difference in seconds for metadata updates in event loop.

Definition at line 182 of file EventProcessor.h.
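
How a last-update time and a minimal interval combine into a rate limit can be sketched as follows. UpdateThrottle and shouldUpdate are hypothetical names; how EventProcessor actually uses m_lastMetadataUpdate and m_metadataUpdateInterval is not shown in this reference, so this is an assumption about the general pattern only.

```cpp
#include <cassert>

// Hypothetical throttle: allows an update only if at least minInterval
// seconds have passed since the last allowed update.
struct UpdateThrottle {
  double lastUpdate;   // time in seconds of the last allowed update
  double minInterval;  // minimal time difference in seconds

  bool shouldUpdate(double now)
  {
    assert(minInterval >= 0.0);
    if (now - lastUpdate < minInterval) {
      return false;  // too early, skip this update
    }
    lastUpdate = now;  // remember when the update happened
    return true;
  }
};
```

Such a throttle keeps per-event bookkeeping cheap when events arrive much faster than the metadata needs refreshing.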

◆ m_moduleList

ModulePtrList m_moduleList
protected inherited

List of all modules in the order in which they were initialized.

Definition at line 155 of file EventProcessor.h.

◆ m_parent

ZMQParent m_parent
private

An instance of a ZMQParent to create sockets for unregistering workers.

Definition at line 55 of file HLTEventProcessor.h.

◆ m_previousEventMetaData

EventMetaData m_previousEventMetaData
protected inherited

Stores state of EventMetaData before it was last changed.

Useful since processEndRun() needs info about which run it needs to end.

Definition at line 167 of file EventProcessor.h.
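
Why the previous meta data is kept can be illustrated with a sketch of the swap that an end-of-run call needs: the modules should see the run that is ending, not the one that has just started. RunMeta and endRunWithPreviousMeta are hypothetical names, and callbacks stand in for the modules' endRun() methods.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical stand-in for EventMetaData; only the run number is kept.
struct RunMeta {
  int run = 0;
};

// Calls every callback while current temporarily holds the previous meta data,
// then restores the meta data of the new event.
void endRunWithPreviousMeta(RunMeta& current, const RunMeta& previous,
                            const std::vector<std::function<void(const RunMeta&)>>& endRunCallbacks)
{
  const RunMeta saved = current;  // remember the new event's meta data
  current = previous;             // expose the run that is ending
  for (const auto& endRun : endRunCallbacks) {
    endRun(current);
  }
  current = saved;                // restore before normal processing continues
}
```

After the call the shared meta data object is unchanged, so the event that triggered the run change can be processed normally.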

◆ m_processList

std::vector<int> m_processList
private

The current list of running processes (with their PIDs)

Definition at line 57 of file HLTEventProcessor.h.

◆ m_processStatisticsPtr

StoreObjPtr<ProcessStatistics> m_processStatisticsPtr
protected inherited

Pointer to the ProcessStatistics object; also used in a number of places.

Definition at line 173 of file EventProcessor.h.

◆ m_profileModule

Module* m_profileModule = nullptr
protected inherited

Address of the module which we want to profile, nullptr if no profiling is requested.

Definition at line 161 of file EventProcessor.h.

◆ m_profileModuleName

std::string m_profileModuleName
protected inherited

Name of the module which should be profiled, empty if no profiling is requested.

Definition at line 158 of file EventProcessor.h.

◆ m_sockets

std::vector<std::unique_ptr<zmq::socket_t> > m_sockets
private

The created sockets for unregistering workers. TODO: use connections.

Definition at line 59 of file HLTEventProcessor.h.

◆ m_steerRootInputModuleOn

bool m_steerRootInputModuleOn = false
protected inherited

True if the SteerRootInputModule is in charge of event processing.

Definition at line 185 of file EventProcessor.h.


The documentation for this class was generated from the following files: