Belle II Software development
pEventProcessor Class Reference

This class provides the core event processing loop for parallel processing. More...

#include <pEventProcessor.h>

Inheritance diagram for pEventProcessor:
EventProcessor

Public Member Functions

 pEventProcessor ()
 Constructor.
 
virtual ~pEventProcessor ()
 Destructor.
 
void process (const PathPtr &spath, long maxEvent)
 Processes the full module chain, starting with the first module in the given path.
 
void gotSigINT ()
 signal handler for Ctrl+C (async-safe)
 
void killRingBuffers ()
 signal handler (async-safe)
 
void cleanup ()
 clean up IPC resources (should only be called in one process).
 
void setProfileModuleName (const std::string &name)
 Set the name of the module we want to profile.
 

Static Public Member Functions

static void writeToStdErr (const char msg[])
 async-safe method to write something to STDERR.
 
static void installSignalHandler (int sig, void(*fn)(int))
 Install a signal handler 'fn' for given signal.
 
static void installMainSignalHandlers (void(*fn)(int)=nullptr)
 Install signal handler for INT, TERM and QUIT signals.
 

Protected Member Functions

void processInitialize (const ModulePtrList &modulePathList, bool setEventInfo=true)
 Initializes the modules.
 
void processCore (const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
 Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.
 
bool processEvent (PathIterator moduleIter, bool skipMasterModule)
 Calls event() functions on all modules for the current event.
 
void callEvent (Module *module)
 Calls event() on one single module, setting up logging and statistics as needed.
 
void processTerminate (const ModulePtrList &modulePathList)
 Terminates the modules.
 
void processBeginRun (bool skipDB=false)
 Calls the begin run methods of all modules.
 
void processEndRun ()
 Calls the end run methods of all modules.
 
long getMaximumEventNumber (long maxEvent) const
 Calculate the maximum event number out of the argument from command line and the environment.
 

Protected Attributes

const Modulem_master
 The master module that determines the experiment/run/event number.
 
ModulePtrList m_moduleList
 List of all modules in order initialized.
 
std::string m_profileModuleName
 Name of the module which should be profiled, empty if no profiling is requested.
 
Modulem_profileModule = nullptr
 Address of the module which we want to profile, nullptr if no profiling is requested.
 
StoreObjPtr< EventMetaDatam_eventMetaDataPtr
 EventMetaData is used by processEvent()/processCore().
 
EventMetaData m_previousEventMetaData
 Stores state of EventMetaData before it was last changed.
 
StoreObjPtr< EventExtraInfom_eventExtraInfo
 event extra info object pointer
 
StoreObjPtr< ProcessStatisticsm_processStatisticsPtr
 Also used in a number of places.
 
bool m_inRun
 Are we currently in a run? If yes, processEndRun() needs to do something.
 
double m_lastMetadataUpdate
 Time in seconds of last call for metadata update in event loop.
 
double m_metadataUpdateInterval
 Minimal time difference in seconds for metadata updates in event loop.
 
bool m_steerRootInputModuleOn = false
 True if the SteerRootInputModule is in charge for event processing.
 

Private Member Functions

void analyzePath (const PathPtr &path)
 Analyze given path.
 
void preparePaths ()
 Adds internal modules to paths, prepare RingBuffers.
 
RingBufferconnectViaRingBuffer (const char *name, const PathPtr &a, PathPtr &b)
 Create RingBuffer with name from given environment variable, add Tx and Rx modules to a and b.
 
void dump_modules (const std::string &, const ModulePtrList &)
 Dump module names in the ModulePtrList.
 
void clearFileList ()
 TFiles are stored in a global list and cleaned up by root since this will happen in all forked processes, these will be corrupted if we don't clean the list!
 

Static Private Member Functions

static ModulePtrList getModulesWithFlag (const ModulePtrList &modules, Module::EModulePropFlags flag)
 Return only modules which have the given Module flag set.
 
static ModulePtrList getModulesWithoutFlag (const ModulePtrList &modules, Module::EModulePropFlags flag)
 Return only modules which do not have the given Module flag set.
 
static void prependModulesIfNotPresent (ModulePtrList *modules, const ModulePtrList &prependModules)
 Prepend given 'prependModules' to 'modules', if they're not already present.
 

Private Attributes

std::unique_ptr< ProcHandlerm_procHandler
 handler to fork and manage processes.
 
PathPtr m_inputPath
 Input path.
 
PathPtr m_mainPath
 Main (parallel section) path.
 
PathPtr m_outputPath
 Output path.
 
RingBufferm_rbin = nullptr
 input RingBuffer
 
RingBufferm_rbout = nullptr
 output RingBuffer
 
ModulePtr m_histoman
 Pointer to HistoManagerModule, or nullptr if not found.
 

Detailed Description

This class provides the core event processing loop for parallel processing.

Definition at line 28 of file pEventProcessor.h.

Constructor & Destructor Documentation

◆ pEventProcessor()

Constructor.

Definition at line 62 of file pEventProcessor.cc.

63 m_histoman(nullptr)
64{
65 g_pEventProcessor = this;
66}
EventProcessor()
Constructor.
ModulePtr m_histoman
Pointer to HistoManagerModule, or nullptr if not found.

◆ ~pEventProcessor()

~pEventProcessor ( )
virtual

Destructor.

Definition at line 69 of file pEventProcessor.cc.

70{
71 cleanup();
72 g_pEventProcessor = nullptr;
73}
void cleanup()
clean up IPC resources (should only be called in one process).

Member Function Documentation

◆ analyzePath()

void analyzePath ( const PathPtr path)
private

Analyze given path.

Fills m_*path objects.

Definition at line 296 of file pEventProcessor.cc.

297{
298 //modules that can be parallelised, but should not go into a parallel section by themselves
299 std::set<std::string> uselessParallelModules({"HistoManager", "Gearbox", "Geometry"});
300
301 PathPtr inpath(new Path);
302 PathPtr mainpath(new Path);
303 PathPtr outpath(new Path);
304
305 int stage = 0; //0: in, 1: event/main, 2: out
306 for (const ModulePtr& module : path->getModules()) {
307 bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
308 //entire conditional path must also be compatible
309 if (hasParallelFlag and module->hasCondition()) {
310 for (const auto& conditionPath : module->getAllConditionPaths()) {
312 hasParallelFlag = false;
313 }
314 }
315 }
316
317 //update stage?
318 if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
319 stage++;
320
321 if (stage == 2) {
322 bool path_is_useful = false;
323 for (const auto& parallelModule : mainpath->getModules()) {
324 if (uselessParallelModules.count(parallelModule->getType()) == 0) {
325 path_is_useful = true;
326 break;
327 }
328 }
329 if (not path_is_useful) {
330 //merge mainpath back into input path
331 inpath->addPath(mainpath);
332 mainpath.reset(new Path);
333 //and search for further parallel sections
334 stage = 0;
335 }
336 }
337 }
338
339 if (stage == 0) { //fill input path
340 inpath->addModule(module);
341
342 if (module->hasProperties(Module::c_HistogramManager)) {
343 // Initialize histogram manager if found in the path
344 m_histoman = module;
345
346 //add histoman to other paths
347 mainpath->addModule(m_histoman);
348 outpath->addModule(m_histoman);
349 }
350 }
351
352 if (stage == 1)
353 mainpath->addModule(module);
354 if (stage == 2)
355 outpath->addModule(module);
356 }
357
358 bool createAllPaths = false; //usually we might not need e.g. an output path
359 for (const ModulePtr& module : path->getModules()) {
360 if (module->hasProperties(Module::c_TerminateInAllProcesses))
361 createAllPaths = true; //ensure there are all kinds of processes
362 }
363
364 //if main path is empty, createAllPaths doesn't really matter, since we'll fall back to single-core processing
365 if (!mainpath->isEmpty())
366 m_mainPath = mainpath;
367 if (createAllPaths or !inpath->isEmpty())
368 m_inputPath = inpath;
369 if (createAllPaths or !outpath->isEmpty())
370 m_outputPath = outpath;
371}
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module > > &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
Definition: Module.h:81
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes().
Definition: Module.h:83
Implements a path consisting of Module and/or Path objects.
Definition: Path.h:38
PathPtr m_outputPath
Output path.
PathPtr m_mainPath
Main (parallel section) path.
PathPtr m_inputPath
Input path.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition: Path.h:35
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a boost shared pointer.
Definition: Module.h:43

◆ callEvent()

void callEvent ( Module module)
protectedinherited

Calls event() on one single module, setting up logging and statistics as needed.

Parameters
moduleModule to call the event() function

Definition at line 226 of file EventProcessor.cc.

227{
228 LogSystem& logSystem = LogSystem::Instance();
229 const bool collectStats = !Environment::Instance().getNoStats();
230 // set up logging
231 logSystem.updateModule(&(module->getLogConfig()), module->getName());
232 // set up statistics is requested
233 if (collectStats) m_processStatisticsPtr->startModule();
234 // call module
235 CALL_MODULE(module, event);
236 // stop timing
237 if (collectStats) m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_Event);
238 // reset logging
239 logSystem.updateModule(nullptr);
240};
bool getNoStats() const
Disable collection of statistics during event processing.
Definition: Environment.h:200
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
Class for logging debug, info and error messages.
Definition: LogSystem.h:46
void updateModule(const LogConfig *moduleLogConfig=nullptr, const std::string &moduleName="")
Sets the log configuration to the given module log configuration and sets the module name This method...
Definition: LogSystem.h:191
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:28
@ c_Event
Counting time/calls in event()

◆ cleanup()

void cleanup ( )

clean up IPC resources (should only be called in one process).

Definition at line 75 of file pEventProcessor.cc.

76{
78 delete m_rbin;
79 m_rbin = nullptr;
80 delete m_rbout;
81 m_rbout = nullptr;
82 }
83}
static bool isOutputProcess()
Return true if the process is an output process.
Definition: ProcHandler.cc:232
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
Definition: ProcHandler.cc:226
RingBuffer * m_rbin
input RingBuffer
RingBuffer * m_rbout
output RingBuffer

◆ clearFileList()

void clearFileList ( )
private

TFiles are stored in a global list and cleaned up by root since this will happen in all forked processes, these will be corrupted if we don't clean the list!

needs to be called at the end of every process.

Definition at line 99 of file pEventProcessor.cc.

100{
101 //B2WARNING("list of files: " << gROOT->GetListOfFiles()->GetEntries());
102 //clear list, but don't actually delete the objects
103 gROOT->GetListOfFiles()->Clear("nodelete");
104}

◆ connectViaRingBuffer()

RingBuffer * connectViaRingBuffer ( const char *  name,
const PathPtr a,
PathPtr b 
)
private

Create RingBuffer with name from given environment variable, add Tx and Rx modules to a and b.

Definition at line 373 of file pEventProcessor.cc.

374{
375 //create ringbuffers and add rx/tx where needed
376 const char* inrbname = getenv(name);
377 RingBuffer* rbuf;
378 if (inrbname == nullptr) {
379 rbuf = new RingBuffer();
380 } else {
381 string rbname(inrbname + to_string(0)); //currently at most one input, one output buffer
382 rbuf = new RingBuffer(rbname.c_str(), RingBuffer::c_DefaultSize);
383 }
384
385 // Insert Tx at the end of current path
386 ModulePtr txptr(new TxModule(rbuf));
387 a->addModule(txptr);
388 // Insert Rx at beginning of next path
389 ModulePtr rxptr(new RxModule(rbuf));
390 PathPtr newB(new Path());
391 newB->addModule(rxptr);
392 newB->addPath(b);
393 b.swap(newB);
394
395 return rbuf;
396}
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
static const int c_DefaultSize
Standard size of buffer, in integers (~60MB).
Definition: RingBuffer.h:42
Module to decode data store contents from RingBuffer.
Definition: RxModule.h:25
Module for encoding data store contents into a RingBuffer.
Definition: TxModule.h:25

◆ dump_modules()

void dump_modules ( const std::string &  title,
const ModulePtrList modlist 
)
private

Dump module names in the ModulePtrList.

Definition at line 413 of file pEventProcessor.cc.

414{
415 ModulePtrList::const_iterator it;
416 std::ostringstream strbuf;
417 strbuf << title << " : ";
418 for (it = modlist.begin(); it != modlist.end(); ++it) {
419 const Module* module = it->get();
420 strbuf << module->getName();
421 if (module->hasCondition()) {
422 for (const auto& conditionPath : module->getAllConditionPaths()) {
423 strbuf << "[->" << conditionPath.get() << "] ";
424 }
425 }
426 if (*it != modlist.back())
427 strbuf << " -> ";
428 }
429 B2INFO(strbuf.str());
430}
Base class for Modules.
Definition: Module.h:72

◆ getMaximumEventNumber()

long getMaximumEventNumber ( long  maxEvent) const
protectedinherited

Calculate the maximum event number out of the argument from command line and the environment.

Definition at line 113 of file EventProcessor.cc.

114{
115 //Check whether the number of events was set via command line argument
116 unsigned int numEventsArgument = Environment::Instance().getNumberEventsOverride();
117 if ((numEventsArgument > 0) && ((maxEvent == 0) || (maxEvent > numEventsArgument))) {
118 return numEventsArgument;
119 }
120 return maxEvent;
121}
unsigned int getNumberEventsOverride() const
Returns number of events in run 1 for EventInfoSetter module, or 0 for no override.
Definition: Environment.h:68

◆ getModulesWithFlag()

ModulePtrList getModulesWithFlag ( const ModulePtrList modules,
Module::EModulePropFlags  flag 
)
staticprivate

Return only modules which have the given Module flag set.

Definition at line 433 of file pEventProcessor.cc.

434{
435 ModulePtrList tmpModuleList;
436 for (const ModulePtr& m : modules) {
437 if (m->hasProperties(flag))
438 tmpModuleList.push_back(m);
439 }
440
441 return tmpModuleList;
442}
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:583

◆ getModulesWithoutFlag()

ModulePtrList getModulesWithoutFlag ( const ModulePtrList modules,
Module::EModulePropFlags  flag 
)
staticprivate

Return only modules which do not have the given Module flag set.

Definition at line 443 of file pEventProcessor.cc.

444{
445 ModulePtrList tmpModuleList;
446 for (const ModulePtr& m : modules) {
447 if (!m->hasProperties(flag))
448 tmpModuleList.push_back(m);
449 }
450 return tmpModuleList;
451}

◆ gotSigINT()

void gotSigINT ( )

signal handler for Ctrl+C (async-safe)

When called the first time, does nothing (input process handles SIGINT by itself). On subsequent calls, RingBuffers are cleared, discarding any events that have been partly produced (mostly equivalent to previous behaviour on Ctrl+C)

Definition at line 86 of file pEventProcessor.cc.

87{
88 EventProcessor::writeToStdErr("\nStopping basf2...\n");
90}
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
void killRingBuffers()
signal handler (async-safe)

◆ installMainSignalHandlers()

void installMainSignalHandlers ( void(*)(int)  fn = nullptr)
staticinherited

Install signal handler for INT, TERM and QUIT signals.

If argument is NULL, EventProcessor's own signal handler will be installed.

Definition at line 315 of file EventProcessor.cc.

316{
317 if (!fn)
318 fn = signalHandler;
319 installSignalHandler(SIGINT, fn);
320 installSignalHandler(SIGTERM, fn);
321 installSignalHandler(SIGQUIT, fn);
322}
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.

◆ installSignalHandler()

void installSignalHandler ( int  sig,
void(*)(int)  fn 
)
staticinherited

Install a signal handler 'fn' for given signal.

Definition at line 300 of file EventProcessor.cc.

301{
302 struct sigaction s;
303 memset(&s, '\0', sizeof(s));
304
305 s.sa_handler = fn;
306 sigemptyset(&s.sa_mask);
307 if (sig == SIGCHLD)
308 s.sa_flags |= SA_NOCLDSTOP; //don't produce signal when children are stopped
309
310 if (sigaction(sig, &s, nullptr) != 0) {
311 B2FATAL("Cannot setup signal handler for signal " << sig);
312 }
313}

◆ killRingBuffers()

void killRingBuffers ( )

signal handler (async-safe)

Fairly abrupt termination after the current event.

Definition at line 92 of file pEventProcessor.cc.

93{
94 m_rbin->kill();
95 //these might be locked by _this_ process, so we cannot escape
96 m_rbout->kill(); //atomic, so doesn't lock
97}
void kill()
Cause termination of reading processes (if they use isDead()).
Definition: RingBuffer.cc:389

◆ preparePaths()

void preparePaths ( )
private

Adds internal modules to paths, prepare RingBuffers.

Definition at line 398 of file pEventProcessor.cc.

399{
400 if (m_histoman) {
401 m_histoman->initialize();
402 }
403 if (not m_mainPath)
404 return; //we'll fall back to single-core
405
406 if (m_inputPath)
408 if (m_outputPath)
410}
RingBuffer * connectViaRingBuffer(const char *name, const PathPtr &a, PathPtr &b)
Create RingBuffer with name from given environment variable, add Tx and Rx modules to a and b.

◆ prependModulesIfNotPresent()

void prependModulesIfNotPresent ( ModulePtrList modules,
const ModulePtrList prependModules 
)
staticprivate

Prepend given 'prependModules' to 'modules', if they're not already present.

Definition at line 453 of file pEventProcessor.cc.

454{
455 for (const ModulePtr& m : prependModules) {
456 if (std::find(modules->begin(), modules->end(), m) == modules->end()) { //not present
457 modules->push_front(m);
458 }
459 }
460}

◆ process()

void process ( const PathPtr spath,
long  maxEvent 
)

Processes the full module chain, starting with the first module in the given path.

Processes all events for the given run number and for events from 0 to maxEvent.

Parameters
spathThe processing starts with the first module of this path.
maxEventThe maximum number of events that will be processed. If the number is smaller or equal 0, all events will be processed.

Definition at line 107 of file pEventProcessor.cc.

108{
109 if (spath->getModules().size() == 0) return;
110
111 const int numProcesses = Environment::Instance().getNumberProcesses();
112
113 //Check whether the number of events was set via command line argument
114 unsigned int numEventsArgument = Environment::Instance().getNumberEventsOverride();
115 if ((numEventsArgument > 0) && ((maxEvent == 0) || (maxEvent > numEventsArgument))) {
116 maxEvent = numEventsArgument;
117 }
118
119 if (numProcesses == 0)
120 B2FATAL("pEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
121
122 // 1. Analyze start path and split into parallel paths
123 analyzePath(spath);
124
125
126 if (m_inputPath) {
127 B2INFO("Input Path " << m_inputPath->getPathString());
128 }
129 if (m_mainPath) {
130 if (m_mainPath->getModules().size() <= 5) {
131 B2INFO("Main Path " << m_mainPath->getPathString());
132 } else {
133 B2INFO("Main Path [" << m_mainPath->getModules().front()->getName() << " -> ... (" << m_mainPath->getModules().size() - 2 <<
134 " further modules) ... -> " << m_mainPath->getModules().back()->getName() << " ]");
135 }
136 }
137 if (m_outputPath) {
138 B2INFO("Output Path " << m_outputPath->getPathString());
139 }
140 if (not m_mainPath) {
141 B2WARNING("Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
142 EventProcessor::process(spath, maxEvent);
143 return;
144 }
145
146 installMainSignalHandlers(cleanupAndStop);
147
148 //inserts Rx/Tx modules into path (sets up IPC structures)
149 preparePaths();
150
151 // ensure that we free the IPC resources!
152 atexit(cleanupIPC);
153
154
155 //init statistics
156 {
157 m_processStatisticsPtr.registerInDataStore();
159 m_processStatisticsPtr.create();
160 Path mergedPath;
161 if (m_inputPath)
162 mergedPath.addPath(m_inputPath);
163 mergedPath.addPath(m_mainPath);
164 if (m_outputPath)
165 mergedPath.addPath(m_outputPath);
166 for (const ModulePtr& module : mergedPath.buildModulePathList())
167 m_processStatisticsPtr->initModule(module.get());
168 }
169
170 // 2. Initialization
171 ModulePtrList modulelist = spath->buildModulePathList();
173 //dump_modules("Initializing globally: ", initGlobally);
174 processInitialize(initGlobally);
175
177
178 //Don't start processing in case of no master module
179 if (!m_master) {
180 B2ERROR("There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
181 }
182
183 //Check if errors appeared. If yes, don't start the event processing.
185 if (numLogError != 0) {
186 B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
187 }
188
189 //disable ROOT's management of TFiles
191
192
193 //install new signal handlers before forking
194 installMainSignalHandlers(parentSignalHandler);
195
196 //Path for current process
197 PathPtr localPath;
198
199 m_procHandler = std::make_unique<ProcHandler>(numProcesses);
200
201 // 3. Fork input path
202 m_procHandler->startInputProcess();
203 if (m_procHandler->isInputProcess()) { // In input process
204 localPath = m_inputPath;
205 } else {
206 // This is not the input path, clean up datastore to not contain the first event
208 }
209
210 if (localPath == nullptr) { //not forked yet
211 // 5. Fork out worker path (parallel section)
212 m_procHandler->startWorkerProcesses();
213 if (m_procHandler->isWorkerProcess()) {
214 localPath = m_mainPath;
215 m_master = localPath->getModules().begin()->get(); //set Rx as master
216 }
217 }
218
219 if (localPath == nullptr) { //not forked yet -> this process is the output process
220 m_procHandler->startOutputProcess();
221 if (m_outputPath) {
222 localPath = m_outputPath;
223 m_master = localPath->getModules().begin()->get(); //set Rx as master
224 }
225 }
226
227
228 if (!m_procHandler->isOutputProcess()) {
230 }
231
232 bool gotSigINT = false;
233 if (localPath != nullptr) {
234 ModulePtrList localModules = localPath->buildModulePathList();
235 ModulePtrList procinitmodules = getModulesWithFlag(localModules, Module::c_InternalSerializer);
236 //dump_modules("processInitialize for ", procinitmodules);
237 processInitialize(procinitmodules);
238
239 //input: handle signals normally, will slowly cascade down
240 if (m_procHandler->isInputProcess())
242 //workers will have to ignore the signals, there's no good way to do this safely
243 if (m_procHandler->isWorkerProcess())
245
246 try {
247 processCore(localPath, localModules, maxEvent, m_procHandler->isInputProcess());
248 } catch (StoppedBySignalException& e) {
249 if (e.signal != SIGINT) {
250 B2FATAL(e.what());
251 }
252 //in case of SIGINT, we move on to processTerminate() to shut down safely
253 gotSigINT = true;
254 }
255 prependModulesIfNotPresent(&localModules, terminateGlobally);
256 processTerminate(localModules);
257 }
258
259 B2INFO(m_procHandler->getProcessName() << " process finished.");
260
261 //all processes except output stop here
262 if (!m_procHandler->isOutputProcess()) {
263 if (gotSigINT) {
264 installSignalHandler(SIGINT, SIG_DFL);
265 raise(SIGINT);
266 } else {
267 exit(0);
268 }
269 }
270
271 //output process: do final cleanup
272 m_procHandler->waitForAllProcesses();
273 B2INFO("All processes completed");
274
275 //finished, disable handler again
276 installSignalHandler(SIGINT, SIG_IGN);
277
278 cleanup();
279 B2INFO("Global process: completed");
280
281 if (m_histoman) {
282 B2INFO("HistoManager:: adding histogram files");
284 }
285
286 //did anything bad happen?
287 if (gSignalReceived) {
288 B2ERROR("Processing aborted via signal " << gSignalReceived <<
289 ", terminating. Output files have been closed safely and should be readable.");
290 installSignalHandler(gSignalReceived, SIG_DFL);
291 raise(gSignalReceived);
292 }
293
294}
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:53
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
Definition: DataStore.cc:714
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:158
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
void processCore(const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
Processes the full module chain consisting of an arbitrary number of connected paths,...
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
void processTerminate(const ModulePtrList &modulePathList)
Terminates the modules.
const Module * m_master
The master module that determines the experiment/run/event number.
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:30
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:158
@ c_InternalSerializer
This module is an internal serializer/deserializer for parallel processing.
Definition: Module.h:82
std::list< ModulePtr > getModules() const override
no submodules, return empty list
Definition: Module.h:505
std::list< std::shared_ptr< Module > > buildModulePathList(bool unique=true) const
Builds a list of all modules which could be executed during the data processing.
Definition: Path.cc:67
void addPath(const PathPtr &path)
See 'pydoc3 basf2.Path'.
Definition: Path.cc:37
static RbTupleManager & Instance()
Access to singleton.
Definition: RbTuple.cc:38
int hadd(bool deleteflag=true)
Functions to add up all histogram files.
Definition: RbTuple.cc:136
void preparePaths()
Adds internal modules to paths, prepare RingBuffers.
static ModulePtrList getModulesWithFlag(const ModulePtrList &modules, Module::EModulePropFlags flag)
Return only modules which have the given Module flag set.
std::unique_ptr< ProcHandler > m_procHandler
handler to fork and manage processes.
void analyzePath(const PathPtr &path)
Analyze given path.
void gotSigINT()
signal handler for Ctrl+C (async-safe)
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
static ModulePtrList getModulesWithoutFlag(const ModulePtrList &modules, Module::EModulePropFlags flag)
Return only modules which do not have the given Module flag set.
void clearFileList()
TFiles are stored in a global list and cleaned up by root since this will happen in all forked proces...

◆ processBeginRun()

void processBeginRun ( bool  skipDB = false)
protectedinherited

Calls the begin run methods of all modules.

Loops over all module instances specified in a list and calls their beginRun() method. Please note: the beginRun() method of the module which triggered the beginRun() loop will also be called.

Definition at line 470 of file EventProcessor.cc.

471{
472 MetadataService::Instance().addBasf2Status("beginning run");
473
474 m_inRun = true;
475 auto dbsession = Database::Instance().createScopedUpdateSession(); // cppcheck-suppress unreadVariable
476
477 LogSystem& logSystem = LogSystem::Instance();
478 m_processStatisticsPtr->startGlobal();
479
480 if (!skipDB) DBStore::Instance().update();
481
482 //initialize random generator for end run
484
485 for (const ModulePtr& modPtr : m_moduleList) {
486 Module* module = modPtr.get();
487
488 //Set the module dependent log level
489 logSystem.updateModule(&(module->getLogConfig()), module->getName());
490
491 //Do beginRun() call
492 m_processStatisticsPtr->startModule();
493 CALL_MODULE(module, beginRun);
495
496 //Set the global log level
497 logSystem.updateModule(nullptr);
498 }
499
501}
bool m_inRun
Are we currently in a run? If yes, processEndRun() needs to do something.
ModulePtrList m_moduleList
List of all modules in order initialized.
void addBasf2Status(const std::string &message="")
Add metadata of basf2 status.
static MetadataService & Instance()
Static method to get a reference to the MetadataService instance.
@ c_BeginRun
Counting time/calls in beginRun()
static void initializeBeginRun()
Initialize run independent random generator for begin run.
static Database & Instance()
Instance of a singleton Database.
Definition: Database.cc:41
static DBStore & Instance()
Instance of a singleton DBStore.
Definition: DBStore.cc:26
void update()
Updates all objects that are outside their interval of validity.
Definition: DBStore.cc:77
ScopeGuard createScopedUpdateSession()
Make sure we have efficient http pipelinging during initialize/beginRun but don't keep session alive ...
Definition: Database.cc:61

◆ processCore()

void processCore ( const PathPtr startPath,
const ModulePtrList modulePathList,
long  maxEvent = 0,
bool  isInputProcess = true 
)
protectedinherited

Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.

Parameters
startPathThe processing starts with the first module of this path.
modulePathListA list of all modules which could be executed during the data processing (used for calling the beginRun() and endRun() method).
maxEventThe maximum number of events that will be processed. If the number is smaller or equal 0, all events are processed.
isInputProcesstrue when this is either the only or the input process

Definition at line 409 of file EventProcessor.cc.

410{
412 m_moduleList = modulePathList;
413 //Remember the previous event meta data, and identify end of data meta data
414 m_previousEventMetaData.setEndOfData(); //invalid start state
415
416 const bool collectStats = !Environment::Instance().getNoStats();
417
418 //Loop over the events
419 long currEvent = 0;
420 bool endProcess = false;
421 while (!endProcess) {
422 if (collectStats)
423 m_processStatisticsPtr->startGlobal();
424
425 PathIterator moduleIter(startPath);
426 endProcess = processEvent(moduleIter, isInputProcess && currEvent == 0);
427
428 //Delete event related data in DataStore
430
431 currEvent++;
432 if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess = true;
433 if (collectStats)
435 } //end event loop
436
437 //End last run
438 m_eventMetaDataPtr.create();
440}
void setInitializeActive(bool active)
Setter for m_initializeActive.
Definition: DataStore.cc:93
void setEndOfData()
Marks the end of the data processing.
void processEndRun()
Calls the end run methods of all modules.
bool processEvent(PathIterator moduleIter, bool skipMasterModule)
Calls event() functions on all modules for the current event.
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
EventMetaData is used by processEvent()/processCore().
EventMetaData m_previousEventMetaData
Stores state of EventMetaData before it was last changed.
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:26

◆ processEndRun()

void processEndRun ( )
protectedinherited

Calls the end run methods of all modules.

Loops over all module instances specified in a list and calls their endRun() method. Please note: the endRun() method of the module which triggered the endRun() loop will also be called.

Definition at line 504 of file EventProcessor.cc.

505{
507
508 if (!m_inRun)
509 return;
510 m_inRun = false;
511
512 LogSystem& logSystem = LogSystem::Instance();
513 m_processStatisticsPtr->startGlobal();
514
515 const EventMetaData newEventMetaData = *m_eventMetaDataPtr;
517
518 //initialize random generator for end run
520
521 for (const ModulePtr& modPtr : m_moduleList) {
522 Module* module = modPtr.get();
523
524 //Set the module dependent log level
525 logSystem.updateModule(&(module->getLogConfig()), module->getName());
526
527 //Do endRun() call
528 m_processStatisticsPtr->startModule();
529 CALL_MODULE(module, endRun);
531
532 //Set the global log level
533 logSystem.updateModule(nullptr);
534 }
535 *m_eventMetaDataPtr = newEventMetaData;
536
538}
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
@ c_EndRun
Counting time/calls in endRun()
static void initializeEndRun()
Initialize run independent random generator for end run.

◆ processEvent()

bool processEvent ( PathIterator  moduleIter,
bool  skipMasterModule 
)
protectedinherited

Calls event() functions on all modules for the current event.

Used by processCore.

Parameters
moduleIteriterator of the path containing all the modules
skipMasterModuleskip the execution of the master module, presumably because this is the first event and it's already been done in initialize()
Returns
true if execution should stop.

Definition at line 324 of file EventProcessor.cc.

325{
326 double time = Utils::getClock() / Unit::s;
328 MetadataService::Instance().addBasf2Status("running event loop");
330 }
331
332 const bool collectStats = !Environment::Instance().getNoStats();
333
334 while (!moduleIter.isDone()) {
335 Module* module = moduleIter.get();
336
337 // run the module ... unless we don't want to
338 if (!(skipMasterModule && module == m_master)) {
339 callEvent(module);
340 }
341
342 //Check for end of data
343 if ((m_eventMetaDataPtr && (m_eventMetaDataPtr->isEndOfData())) ||
344 ((module == m_master) && !m_eventMetaDataPtr)) {
345 if ((module != m_master) && !m_steerRootInputModuleOn) {
346 B2WARNING("Event processing stopped by module '" << module->getName() <<
347 "', which is not in control of event processing (does not provide EventMetaData)");
348 }
349 return true;
350 }
351
352 //Handle EventMetaData changes by master module
353 if (module == m_master) {
354
355 //initialize random number state for the event
357
358 //Check for a change of the run
359 if ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) ||
361
362 if (collectStats)
363 m_processStatisticsPtr->suspendGlobal();
364
366 processBeginRun(skipMasterModule);
367
368 if (collectStats)
369 m_processStatisticsPtr->resumeGlobal();
370 }
371
373
374 //make sure we use the event dependent generator again
376
378
379 } else {
380 //Check for a second master module. Cannot do this if we skipped the
381 //master module as the EventMetaData is probably set before we call this
382 //function
383 if (!skipMasterModule && m_eventMetaDataPtr &&
385 B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and " << module->getName());
386 }
387 }
388
389 if (gSignalReceived != 0) {
390 throw StoppedBySignalException(gSignalReceived);
391 }
392
393 //Check for the module conditions, evaluate them and if one is true switch to the new path
394 if (module->evalCondition()) {
395 PathPtr condPath = module->getConditionPath();
396 //continue with parent Path after condition path is executed?
397 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
398 moduleIter = PathIterator(condPath, moduleIter);
399 } else {
400 moduleIter = PathIterator(condPath);
401 }
402 } else {
403 moduleIter.next();
404 }
405 } //end module loop
406 return false;
407}
int getRun() const
Run Getter.
int getExperiment() const
Experiment Getter.
void processBeginRun(bool skipDB=false)
Calls the begin run methods of all modules.
double m_lastMetadataUpdate
Time in seconds of last call for metadata update in event loop.
void callEvent(Module *module)
Calls event() on one single module, setting up logging and statistics as needed.
bool m_steerRootInputModuleOn
True if the SteerRootInputModule is in charge for event processing.
double m_metadataUpdateInterval
Minimal time difference in seconds for metadata updates in event loop.
const std::string & getName() const
Returns the name of the module.
Definition: Module.h:186
void next()
increment.
Definition: PathIterator.h:49
bool isDone() const
Are we finished iterating?
Definition: PathIterator.h:72
Module * get() const
dereference.
Definition: PathIterator.h:75
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static const double s
[second]
Definition: Unit.h:95
void updateEvent()
Updates all intra-run dependent objects.
Definition: DBStore.cc:140
double getClock()
Return current value of the real-time clock.
Definition: Utils.cc:66

◆ processInitialize()

void processInitialize ( const ModulePtrList modulePathList,
bool  setEventInfo = true 
)
protectedinherited

Initializes the modules.

Loops over all module instances specified in a list and calls their initialize() method.

Parameters
modulePathListA list of all modules which could be executed during the data processing.
setEventInfoif true the first event call of the master module will be called immediately to load the event info right away so that it's available for subsequent modules

Definition at line 242 of file EventProcessor.cc.

243{
244 LogSystem& logSystem = LogSystem::Instance();
246
247 m_processStatisticsPtr.registerInDataStore();
248 //TODO I might want to overwrite it in initialize (e.g. if read from file)
249 // For parallel processing or subevents, I don't want that, though.
250 // Maybe make this a function argument?
252 m_processStatisticsPtr.create();
253 m_processStatisticsPtr->startGlobal();
254
256
257 // EventExtraInfo is needed in several modules so register it here
258 m_eventExtraInfo.registerInDataStore();
259
260 for (const ModulePtr& modPtr : modulePathList) {
261 Module* module = modPtr.get();
262
263 if (module->hasUnsetForcedParams()) {
264 //error message was printed by module
265 continue;
266 }
267
268 //Set the module dependent log level
269 logSystem.updateModule(&(module->getLogConfig()), module->getName());
271
272 //Do initialization
273 m_processStatisticsPtr->initModule(module);
274 m_processStatisticsPtr->startModule();
275 CALL_MODULE(module, initialize);
277
278 //Set the global log level
279 logSystem.updateModule(nullptr);
280
281 //Check whether this is the master module
282 if (!m_master && DataStore::Instance().getEntry(m_eventMetaDataPtr) != nullptr) {
283 B2DEBUG(100, "Found module providing EventMetaData: " << module->getName());
284 m_master = module;
285 if (setEventInfo) {
286 callEvent(module);
287 // update Database payloads: we now have valid event meta data unless
288 // we don't process any events
290 }
291 }
292
293 if (gSignalReceived != 0) {
294 throw StoppedBySignalException(gSignalReceived);
295 }
296 }
298}
DependencyMap & getDependencyMap()
Return map of dependencies between modules.
Definition: DataStore.h:524
void setModule(const Module &mod)
Set the current module (for getCurrentModuleInfo())
Definition: DependencyMap.h:60
StoreObjPtr< EventExtraInfo > m_eventExtraInfo
event extra info object pointer
@ c_Init
Counting time/calls in initialize()

◆ processTerminate()

void processTerminate ( const ModulePtrList modulePathList)
protectedinherited

Terminates the modules.

Loops over all module instances in reverse order specified in a list and calls their terminate() method.

Parameters
modulePathListA list of all modules which could be executed during the data processing.

Definition at line 443 of file EventProcessor.cc.

444{
446
447 LogSystem& logSystem = LogSystem::Instance();
448 ModulePtrList::const_reverse_iterator listIter;
449 m_processStatisticsPtr->startGlobal();
450
451 for (listIter = modulePathList.rbegin(); listIter != modulePathList.rend(); ++listIter) {
452 Module* module = listIter->get();
453
454 //Set the module dependent log level
455 logSystem.updateModule(&(module->getLogConfig()), module->getName());
456
457 //Do termination
458 m_processStatisticsPtr->startModule();
459 CALL_MODULE(module, terminate);
461
462 //Set the global log level
463 logSystem.updateModule(nullptr);
464 }
465
467}
@ c_Term
Counting time/calls in terminate()

◆ setProfileModuleName()

void setProfileModuleName ( const std::string &  name)
inlineinherited

Set the name of the module we want to profile.

Parameters
nameName of the module as returned by getName()

Definition at line 58 of file EventProcessor.h.

58{ m_profileModuleName = name; }
std::string m_profileModuleName
Name of the module which should be profiled, empty if no profiling is requested.

◆ writeToStdErr()

void writeToStdErr ( const char  msg[])
staticinherited

async-safe method to write something to STDERR.

Definition at line 72 of file EventProcessor.cc.

73{
74 //signal handlers are called asynchronously, making many standard functions (including output) dangerous
75 //write() is, however, safe, so we'll use that to write to stderr.
76
77 //strlen() not explicitly in safe list, but doesn't have any error handling routines that might alter global state
78 const int len = strlen(msg);
79
80 int rc = write(STDERR_FILENO, msg, len);
81 (void) rc; //ignore return value (there's nothing we can do about a failed write)
82
83}

Member Data Documentation

◆ m_eventExtraInfo

StoreObjPtr<EventExtraInfo> m_eventExtraInfo
protectedinherited

event extra info object pointer

Definition at line 170 of file EventProcessor.h.

◆ m_eventMetaDataPtr

StoreObjPtr<EventMetaData> m_eventMetaDataPtr
protectedinherited

EventMetaData is used by processEvent()/processCore().

Definition at line 164 of file EventProcessor.h.

◆ m_histoman

ModulePtr m_histoman
private

Pointer to HistoManagerModule, or nullptr if not found.

Definition at line 108 of file pEventProcessor.h.

◆ m_inputPath

PathPtr m_inputPath
private

Input path.

Definition at line 96 of file pEventProcessor.h.

◆ m_inRun

bool m_inRun
protectedinherited

Are we currently in a run? If yes, processEndRun() needs to do something.

Definition at line 176 of file EventProcessor.h.

◆ m_lastMetadataUpdate

double m_lastMetadataUpdate
protectedinherited

Time in seconds of last call for metadata update in event loop.

Definition at line 179 of file EventProcessor.h.

◆ m_mainPath

PathPtr m_mainPath
private

Main (parallel section) path.

Definition at line 98 of file pEventProcessor.h.

◆ m_master

const Module* m_master
protectedinherited

The master module that determines the experiment/run/event number.

Definition at line 154 of file EventProcessor.h.

◆ m_metadataUpdateInterval

double m_metadataUpdateInterval
protectedinherited

Minimal time difference in seconds for metadata updates in event loop.

Definition at line 182 of file EventProcessor.h.

◆ m_moduleList

ModulePtrList m_moduleList
protectedinherited

List of all modules in order initialized.

Definition at line 155 of file EventProcessor.h.

◆ m_outputPath

PathPtr m_outputPath
private

Output path.

Definition at line 100 of file pEventProcessor.h.

◆ m_previousEventMetaData

EventMetaData m_previousEventMetaData
protectedinherited

Stores state of EventMetaData before it was last changed.

Useful since processEndRun() needs info about which run it needs to end.

Definition at line 167 of file EventProcessor.h.

◆ m_processStatisticsPtr

StoreObjPtr<ProcessStatistics> m_processStatisticsPtr
protectedinherited

Also used in a number of places.

Definition at line 173 of file EventProcessor.h.

◆ m_procHandler

std::unique_ptr<ProcHandler> m_procHandler
private

handler to fork and manage processes.

Definition at line 93 of file pEventProcessor.h.

◆ m_profileModule

Module* m_profileModule = nullptr
protectedinherited

Address of the module which we want to profile, nullptr if no profiling is requested.

Definition at line 161 of file EventProcessor.h.

◆ m_profileModuleName

std::string m_profileModuleName
protectedinherited

Name of the module which should be profiled, empty if no profiling is requested.

Definition at line 158 of file EventProcessor.h.

◆ m_rbin

RingBuffer* m_rbin = nullptr
private

input RingBuffer

Definition at line 103 of file pEventProcessor.h.

◆ m_rbout

RingBuffer* m_rbout = nullptr
private

output RingBuffer

Definition at line 105 of file pEventProcessor.h.

◆ m_steerRootInputModuleOn

bool m_steerRootInputModuleOn = false
protectedinherited

True if the SteerRootInputModule is in charge for event processing.

Definition at line 185 of file EventProcessor.h.


The documentation for this class was generated from the following files: