 |
Belle II Software
release-05-01-25
|
9 #include <framework/pcore/pEventProcessor.h>
10 #include <framework/pcore/ProcHandler.h>
11 #include <framework/pcore/RingBuffer.h>
12 #include <framework/pcore/RxModule.h>
13 #include <framework/pcore/TxModule.h>
14 #include <framework/pcore/DataStoreStreamer.h>
15 #include <framework/pcore/RbTuple.h>
17 #include <framework/core/ModuleManager.h>
18 #include <framework/core/Environment.h>
19 #include <framework/logging/LogSystem.h>
31 static int gSignalReceived = 0;
37 if (g_pEventProcessor)
40 void cleanupAndStop(
int sig)
49 static void parentSignalHandler(
int signal)
52 if (signal == SIGINT) {
54 }
else if (signal == SIGTERM or signal == SIGQUIT) {
57 if (gSignalReceived == 0)
58 gSignalReceived = signal;
65 g_pEventProcessor =
this;
72 g_pEventProcessor =
nullptr;
103 gROOT->GetListOfFiles()->Clear(
"nodelete");
109 if (spath->getModules().size() == 0)
return;
115 if ((numEventsArgument > 0) && ((maxEvent == 0) || (maxEvent > numEventsArgument))) {
116 maxEvent = numEventsArgument;
119 if (numProcesses == 0)
120 B2FATAL(
"pEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
127 B2INFO(
"Input Path " <<
m_inputPath->getPathString());
131 B2INFO(
"Main Path " <<
m_mainPath->getPathString());
133 B2INFO(
"Main Path [" <<
m_mainPath->getModules().front()->getName() <<
" -> ... (" <<
m_mainPath->getModules().size() - 2 <<
134 " further modules) ... -> " <<
m_mainPath->getModules().back()->getName() <<
" ]");
138 B2INFO(
"Output Path " <<
m_outputPath->getPathString());
141 B2WARNING(
"Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
180 B2ERROR(
"There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
185 if (numLogError != 0) {
186 B2FATAL(numLogError <<
" ERROR(S) occurred! The processing of events will not be started.");
210 if (localPath ==
nullptr) {
219 if (localPath ==
nullptr) {
233 if (localPath !=
nullptr) {
234 ModulePtrList localModules = localPath->buildModulePathList();
249 if (e.signal != SIGINT) {
259 B2INFO(
m_procHandler->getProcessName() <<
" process finished.");
273 B2INFO(
"All processes completed");
279 B2INFO(
"Global process: completed");
282 B2INFO(
"HistoManager:: adding histogram files");
287 if (gSignalReceived) {
288 B2ERROR(
"Processing aborted via signal " << gSignalReceived <<
289 ", terminating. Output files have been closed safely and should be readable.");
291 raise(gSignalReceived);
299 std::set<std::string> uselessParallelModules({
"HistoManager",
"Gearbox",
"Geometry"});
306 for (
const ModulePtr& module : path->getModules()) {
309 if (hasParallelFlag and module->hasCondition()) {
310 for (
const auto& conditionPath : module->getAllConditionPaths()) {
312 hasParallelFlag =
false;
318 if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
322 bool path_is_useful =
false;
323 for (
const auto& parallelModule : mainpath->getModules()) {
324 if (uselessParallelModules.count(parallelModule->getType()) == 0) {
325 path_is_useful =
true;
329 if (not path_is_useful) {
331 inpath->addPath(mainpath);
332 mainpath.reset(
new Path);
340 inpath->addModule(module);
353 mainpath->addModule(module);
355 outpath->addModule(module);
358 bool createAllPaths =
false;
359 for (
const ModulePtr& module : path->getModules()) {
361 createAllPaths =
true;
365 if (!mainpath->isEmpty())
367 if (createAllPaths or !inpath->isEmpty())
369 if (createAllPaths or !outpath->isEmpty())
376 const char* inrbname = getenv(name);
378 if (inrbname ==
nullptr) {
381 string rbname(inrbname + to_string(0));
391 newB->addModule(rxptr);
415 ModulePtrList::const_iterator it;
416 std::ostringstream strbuf;
417 strbuf << title <<
" : ";
418 for (it = modlist.begin(); it != modlist.end(); ++it) {
419 const Module* module = it->get();
420 strbuf << module->getName();
421 if (module->hasCondition()) {
422 for (
const auto& conditionPath : module->getAllConditionPaths()) {
423 strbuf <<
"[->" << conditionPath.get() <<
"] ";
426 if (*it != modlist.back())
429 B2INFO(strbuf.str());
437 if (m->hasProperties(flag))
438 tmpModuleList.push_back(m);
441 return tmpModuleList;
447 if (!m->hasProperties(flag))
448 tmpModuleList.push_back(m);
450 return tmpModuleList;
455 for (
const ModulePtr& m : prependModules) {
456 if (std::find(modules->begin(), modules->end(), m) == modules->end()) {
457 modules->push_front(m);
void cleanup()
clean up IPC resources (should only be called in one process).
ModulePtr m_histoman
Pointer to HistoManagerModule, or nullptr if not found.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
static ModulePtrList getModulesWithoutFlag(const ModulePtrList &modules, Module::EModulePropFlags flag)
Return only modules which do not have the given Module flag set.
RingBuffer * m_rbin
input RingBuffer
void processTerminate(const ModulePtrList &modulePathList)
Terminates the modules.
static DataStore & Instance()
Instance of singleton Store.
Module to decode data store contents from RingBuffer.
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes().
Module for encoding data store contents into a RingBuffer.
This class provides the core event processing loop for parallel processing.
std::unique_ptr< ProcHandler > m_procHandler
handler to fork and manage processes.
void clearFileList()
TFiles are stored in a global list and cleaned up by root since this will happen in all forked proces...
static ModulePtrList getModulesWithFlag(const ModulePtrList &modules, Module::EModulePropFlags flag)
Return only modules which have the given Module flag set.
@ c_InternalSerializer
This module is an internal serializer/deserializer for parallel processing.
static bool isOutputProcess()
Return true if the process is an output process.
Exception thrown when execution is stopped by a signal.
void gotSigINT()
signal handler for Ctrl+C (async-safe)
PathPtr m_outputPath
Output path.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
Class to manage a Ring Buffer placed in an IPC shared memory.
void process(const PathPtr &spath, long maxEvent)
Processes the full module chain, starting with the first module in the given path.
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Abstract base class for different kinds of events.
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
void kill()
Cause termination of reading processes (if they use isDead()).
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a boost shared pointer.
@ c_Error
Error: for things that went wrong and have to be fixed.
void analyzePath(const PathPtr &path)
Analyze given path.
const static int c_DefaultSize
Standard size of buffer, in integers (~60MB).
std::list< ModulePtr > getModules() const override
no submodules, return empty list
static RbTupleManager & Instance()
Access to singleton.
virtual ~pEventProcessor()
Destructor.
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
std::list< std::shared_ptr< Module > > buildModulePathList(bool unique=true) const
Builds a list of all modules which could be executed during the data processing.
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
int hadd(bool deleteflag=true)
Functions to add up all histogram files.
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
EModulePropFlags
Each module can be tagged with property flags, which indicate certain features of the module.
void addPath(const PathPtr &path)
See 'pydoc3 basf2.Path'.
unsigned int getNumberEventsOverride() const
Returns number of events in run 1 for EventInfoSetter module, or 0 for no override.
void preparePaths()
Adds internal modules to paths, prepare RingBuffers.
void processCore(const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
Processes the full module chain consisting of an arbitrary number of connected paths,...
Implements a path consisting of Module and/or Path objects.
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
RingBuffer * connectViaRingBuffer(const char *name, const PathPtr &a, PathPtr &b)
Create RingBuffer with name from given environment variable, add Tx and Rx modules to a and b.
static Environment & Instance()
Static method to get a reference to the Environment instance.
void dump_modules(const std::string &, const ModulePtrList &)
Dump module names in the ModulePtrList.
const Module * m_master
The master module that determines the experiment/run/event number.
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
RingBuffer * m_rbout
output RingBuffer
provides the core event processing loop.
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
PathPtr m_inputPath
Input path.
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
PathPtr m_mainPath
Main (parallel section) path.
void killRingBuffers()
signal handler (async-safe)