9 #include <framework/pcore/pEventProcessor.h> 
   10 #include <framework/pcore/ProcHandler.h> 
   11 #include <framework/pcore/RingBuffer.h> 
   12 #include <framework/pcore/RxModule.h> 
   13 #include <framework/pcore/TxModule.h> 
   14 #include <framework/pcore/DataStoreStreamer.h> 
   15 #include <framework/pcore/RbTuple.h> 
   17 #include <framework/core/ModuleManager.h> 
   18 #include <framework/core/Environment.h> 
   19 #include <framework/logging/LogSystem.h> 
   31   static int gSignalReceived = 0;
 
   37     if (g_pEventProcessor)
 
   40   void cleanupAndStop(
int sig)
 
   49   static void parentSignalHandler(
int signal)
 
   52     if (signal == SIGINT) {
 
   54     } 
else if (signal == SIGTERM or signal == SIGQUIT) {
 
   57     if (gSignalReceived == 0)
 
   58       gSignalReceived = signal;
 
   65   g_pEventProcessor = 
this;
 
   72   g_pEventProcessor = 
nullptr;
 
  103   gROOT->GetListOfFiles()->Clear(
"nodelete");
 
  109   if (spath->getModules().size() == 0) 
return;
 
  115   if ((numEventsArgument > 0) && ((maxEvent == 0) || (maxEvent > numEventsArgument))) {
 
  116     maxEvent = numEventsArgument;
 
  119   if (numProcesses == 0)
 
  120     B2FATAL(
"pEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
 
  127     B2INFO(
"Input Path " << 
m_inputPath->getPathString());
 
  131       B2INFO(
"Main Path " << 
m_mainPath->getPathString());
 
  133       B2INFO(
"Main Path [" << 
m_mainPath->getModules().front()->getName() << 
" -> ... (" << 
m_mainPath->getModules().size() - 2 <<
 
  134              " further modules) ... -> " << 
m_mainPath->getModules().back()->getName() << 
" ]");
 
  138     B2INFO(
"Output Path " << 
m_outputPath->getPathString());
 
  141     B2WARNING(
"Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
 
  180     B2ERROR(
"There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
 
  185   if (numLogError != 0) {
 
  186     B2FATAL(numLogError << 
" ERROR(S) occurred! The processing of events will not be started.");
 
  210   if (localPath == 
nullptr) { 
 
  219   if (localPath == 
nullptr) { 
 
  233   if (localPath != 
nullptr) {
 
  234     ModulePtrList localModules = localPath->buildModulePathList();
 
  259   B2INFO(
m_procHandler->getProcessName() << 
" process finished.");
 
  273   B2INFO(
"All processes completed");
 
  279   B2INFO(
"Global process: completed");
 
  282     B2INFO(
"HistoManager:: adding histogram files");
 
  287   if (gSignalReceived) {
 
  288     B2ERROR(
"Processing aborted via signal " << gSignalReceived <<
 
  289             ", terminating. Output files have been closed safely and should be readable.");
 
  291     raise(gSignalReceived);
 
  299   std::set<std::string> uselessParallelModules({
"HistoManager", 
"Gearbox", 
"Geometry"});
 
  306   for (
const ModulePtr& module : path->getModules()) {
 
  309     if (hasParallelFlag and module->hasCondition()) {
 
  310       for (
const auto& conditionPath : module->getAllConditionPaths()) {
 
  312           hasParallelFlag = 
false;
 
  318     if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
 
  322         bool path_is_useful = 
false;
 
  323         for (
const auto& parallelModule : mainpath->getModules()) {
 
  324           if (uselessParallelModules.count(parallelModule->getType()) == 0) {
 
  325             path_is_useful = 
true;
 
  329         if (not path_is_useful) {
 
  331           inpath->addPath(mainpath);
 
  332           mainpath.reset(
new Path);
 
  340       inpath->addModule(module);
 
  353       mainpath->addModule(module);
 
  355       outpath->addModule(module);
 
  358   bool createAllPaths = 
false; 
 
  359   for (
const ModulePtr& module : path->getModules()) {
 
  361       createAllPaths = 
true; 
 
  365   if (!mainpath->isEmpty())
 
  367   if (createAllPaths or !inpath->isEmpty())
 
  369   if (createAllPaths or !outpath->isEmpty())
 
  376   const char* inrbname = getenv(name);
 
  378   if (inrbname == 
nullptr) {
 
  381     string rbname(inrbname + to_string(0)); 
 
  391   newB->addModule(rxptr);
 
  415   ModulePtrList::const_iterator it;
 
  416   std::ostringstream strbuf;
 
  417   strbuf << title << 
" : ";
 
  418   for (it = modlist.begin(); it != modlist.end(); ++it) {
 
  419     const Module* module = it->get();
 
  420     strbuf << module->getName();
 
  421     if (module->hasCondition()) {
 
  422       for (
const auto& conditionPath : module->getAllConditionPaths()) {
 
  423         strbuf << 
"[->" << conditionPath.get() << 
"] ";
 
  426     if (*it != modlist.back())
 
  429   B2INFO(strbuf.str());
 
  437     if (m->hasProperties(flag))
 
  438       tmpModuleList.push_back(m);
 
  441   return tmpModuleList;
 
  447     if (!m->hasProperties(flag))
 
  448       tmpModuleList.push_back(m);
 
  450   return tmpModuleList;
 
  455   for (
const ModulePtr& m : prependModules) {
 
  456     if (std::find(modules->begin(), modules->end(), m) == modules->end()) { 
 
  457       modules->push_front(m);
 
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
static DataStore & Instance()
Instance of singleton Store.
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
static Environment & Instance()
Static method to get a reference to the Environment instance.
unsigned int getNumberEventsOverride() const
Returns number of events in run 1 for EventInfoSetter module, or 0 for no override.
Exception thrown when execution is stopped by a signal.
int signal
see 'man 7 signal'.
provides the core event processing loop.
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
void processCore(const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
Processes the full module chain consisting of an arbitrary number of connected paths,...
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
void processTerminate(const ModulePtrList &modulePathList)
Terminates the modules.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
const Module * m_master
The master module that determines the experiment/run/event number.
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
@ c_Error
Error: for things that went wrong and have to be fixed.
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
std::list< ModulePtr > getModules() const override
no submodules, return empty list
EModulePropFlags
Each module can be tagged with property flags, which indicate certain features of the module.
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
@ c_InternalSerializer
This module is an internal serializer/deserializer for parallel processing.
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes().
Implements a path consisting of Module and/or Path objects.
std::list< std::shared_ptr< Module > > buildModulePathList(bool unique=true) const
Builds a list of all modules which could be executed during the data processing.
void addPath(const PathPtr &path)
See 'pydoc3 basf2.Path'.
static bool isOutputProcess()
Return true if the process is an output process.
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
static RbTupleManager & Instance()
Access to singleton.
int hadd(bool deleteflag=true)
Functions to add up all histogram files.
Class to manage a Ring Buffer placed in an IPC shared memory.
static const int c_DefaultSize
Standard size of buffer, in integers (~60MB).
void kill()
Cause termination of reading processes (if they use isDead()).
Module to decode data store contents from RingBuffer.
Module for encoding data store contents into a RingBuffer.
This class provides the core event processing loop for parallel processing.
void preparePaths()
Adds internal modules to paths, prepare RingBuffers.
void dump_modules(const std::string &, const ModulePtrList &)
Dump module names in the ModulePtrList.
PathPtr m_outputPath
Output path.
static ModulePtrList getModulesWithFlag(const ModulePtrList &modules, Module::EModulePropFlags flag)
Return only modules which have the given Module flag set.
RingBuffer * m_rbin
input RingBuffer
RingBuffer * m_rbout
output RingBuffer
std::unique_ptr< ProcHandler > m_procHandler
handler to fork and manage processes.
void cleanup()
clean up IPC resources (should only be called in one process).
void process(const PathPtr &spath, long maxEvent)
Processes the full module chain, starting with the first module in the given path.
RingBuffer * connectViaRingBuffer(const char *name, const PathPtr &a, PathPtr &b)
Create RingBuffer with name from given environment variable, add Tx and Rx modules to a and b.
ModulePtr m_histoman
Pointer to HistoManagerModule, or nullptr if not found.
void analyzePath(const PathPtr &path)
Analyze given path.
void gotSigINT()
signal handler for Ctrl+C (async-safe)
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
virtual ~pEventProcessor()
Destructor.
PathPtr m_mainPath
Main (parallel section) path.
static ModulePtrList getModulesWithoutFlag(const ModulePtrList &modules, Module::EModulePropFlags flag)
Return only modules which do not have the given Module flag set.
PathPtr m_inputPath
Input path.
void killRingBuffers()
signal handler (async-safe)
void clearFileList()
TFiles are stored in a global list and cleaned up by root since this will happen in all forked proces...
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a boost shared pointer.
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Abstract base class for different kinds of events.