Belle II Software development
ZMQEventProcessor Class Reference

This class provides the core event processing loop for parallel processing with ZMQ.

#include <ZMQEventProcessor.h>

Inheritance diagram for ZMQEventProcessor:
EventProcessor

Public Member Functions

 ZMQEventProcessor ()
 Registers the socket cleanup to run at exit.
 
virtual ~ZMQEventProcessor ()
 Make sure we remove all sockets cleanly.
 
void process (const PathPtr &spath, long maxEvent)
 Processes the full module chain using parallel processing, starting with the first module in the given path.
 
void cleanup ()
 clean up IPC resources (should only be called in one process).
 
void setProfileModuleName (const std::string &name)
 Set the name of the module we want to profile.
 

Static Public Member Functions

static void writeToStdErr (const char msg[])
 async-safe method to write something to STDERR.
 
static void installSignalHandler (int sig, void(*fn)(int))
 Install a signal handler 'fn' for given signal.
 
static void installMainSignalHandlers (void(*fn)(int)=nullptr)
 Install signal handler for INT, TERM and QUIT signals.
 

Protected Member Functions

void processInitialize (const ModulePtrList &modulePathList, bool setEventInfo=true)
 Initializes the modules.
 
void processCore (const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
 Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.
 
bool processEvent (PathIterator moduleIter, bool skipMasterModule)
 Calls event() functions on all modules for the current event.
 
void callEvent (Module *module)
 Calls event() on one single module, setting up logging and statistics as needed.
 
void processTerminate (const ModulePtrList &modulePathList)
 Terminates the modules.
 
long getMaximumEventNumber (long maxEvent) const
 Calculate the maximum event number out of the argument from command line and the environment.
 

Protected Attributes

const Module * m_master
 The master module that determines the experiment/run/event number.
 
ModulePtrList m_moduleList
 List of all modules in order initialized.
 
std::string m_profileModuleName
 Name of the module which should be profiled, empty if no profiling is requested.
 
Module * m_profileModule = nullptr
 Address of the module which we want to profile, nullptr if no profiling is requested.
 
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
 EventMetaData is used by processEvent()/processCore().
 
StoreObjPtr< EventExtraInfo > m_eventExtraInfo
 Event extra info object pointer.
 
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
 Also used in a number of places.
 
bool m_inRun
 Are we currently in a run? If yes, processEndRun() needs to do something.
 
double m_lastMetadataUpdate
 Time in seconds of last call for metadata update in event loop.
 
double m_metadataUpdateInterval
 Minimal time difference in seconds for metadata updates in event loop.
 
bool m_steerRootInputModuleOn = false
 True if the SteerRootInputModule is in charge of event processing.
 

Private Member Functions

void initialize (const ModulePtrList &moduleList, const ModulePtr &histogramManager)
 First step in the process: initialize the modules in the list.
 
void forkAndRun (long maxEvent, const PathPtr &inputPath, const PathPtr &mainPath, const PathPtr &outputPath, const ModulePtrList &terminateGlobally)
 Second step in the process: fork out the processes we need to have and call the event loop.
 
void terminateAndCleanup (const ModulePtr &histogramManager)
 Last step in the process: run the termination and cleanup (kill all remaining processes)
 
void runMonitoring (const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
 Start the monitoring (without forking)
 
void runInput (const PathPtr &inputPath, const ModulePtrList &terminateGlobally, long maxEvent)
 Fork out the input process.
 
void runOutput (const PathPtr &outputPath, const ModulePtrList &terminateGlobally, long maxEvent)
 Fork out the output process.
 
void runWorker (unsigned int numProcesses, const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
 Fork out the N worker processes.
 
void processPath (const PathPtr &localPath, const ModulePtrList &terminateGlobally, long maxEvent)
 Basic function run in every process: process the event loop of the given path.
 
void processCore (const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true, bool isWorkerProcess=false, bool isOutputProcess=false)
 Process modules in the path.
 
bool processEvent (PathIterator moduleIter, bool skipMasterModule, bool Worker=false, bool output=false)
 Calls Event function.
 
void processBeginRun (bool skipDB=false)
 Calls BeginRun function.
 
void processEndRun ()
 Calls EndRun function.
 

Private Attributes

ProcessMonitor m_processMonitor
 Instance of the process monitor.
 
EventMetaData m_previousEventMetaData
 Stores previous eventMetaData.
 

Detailed Description

This class provides the core event processing loop for parallel processing with ZMQ.

Definition at line 23 of file ZMQEventProcessor.h.

Constructor & Destructor Documentation

◆ ZMQEventProcessor()

Registers the socket cleanup to run at exit.

Definition at line 101 of file ZMQEventProcessor.cc.

102{
103 B2ASSERT("You are having two instances of the ZMQEventProcessor running! This is not possible",
104 not g_eventProcessorForSignalHandling);
105 g_eventProcessorForSignalHandling = this;
106
107 // Make sure to remove the sockets
108 g_socketAddress = Environment::Instance().getZMQSocketAddress();
109 std::atexit(deleteSocketFiles);
110}
const std::string & getZMQSocketAddress() const
Socket address to use in ZMQ.
Definition: Environment.h:261
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
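
The constructor listing combines two patterns: a global pointer that enforces a single live instance, and an exit hook registered with std::atexit. The following is a minimal sketch of that combination under stated assumptions. `SingleInstance`, `g_instance`, `g_hookRegistered`, `exitHook`, and `instanceAlive` are hypothetical names, and the hook body is only a placeholder for the socket file deletion.

```cpp
#include <cassert>
#include <cstdlib>

class SingleInstance;

// Hypothetical stand-in for g_eventProcessorForSignalHandling.
static SingleInstance* g_instance = nullptr;
static bool g_hookRegistered = false;

// Placeholder for the real cleanup work (deleting socket files).
static void exitHook() {}

class SingleInstance {
public:
  SingleInstance()
  {
    // Mirrors the B2ASSERT: a second live instance is a programming error.
    assert(g_instance == nullptr);
    g_instance = this;
    // Register the exit hook once; std::atexit returns 0 on success.
    if (!g_hookRegistered) g_hookRegistered = (std::atexit(exitHook) == 0);
  }
  ~SingleInstance() { g_instance = nullptr; }

  // Copying would defeat the single-instance guarantee.
  SingleInstance(const SingleInstance&) = delete;
  SingleInstance& operator=(const SingleInstance&) = delete;
};

// True while an instance exists.
bool instanceAlive() { return g_instance != nullptr; }
```

Unlike the listing above, this sketch registers the hook only once per program run, which is a design choice of the sketch and not a statement about the framework.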

◆ ~ZMQEventProcessor()

~ZMQEventProcessor ( )
virtual

Make sure we remove all sockets cleanly.

Definition at line 112 of file ZMQEventProcessor.cc.

113{
114 cleanup();
115 g_eventProcessorForSignalHandling = nullptr;
116}
void cleanup()
clean up IPC resources (should only be called in one process).

Member Function Documentation

◆ callEvent()

void callEvent ( Module * module)
protected inherited

Calls event() on one single module, setting up logging and statistics as needed.

Parameters
module  Module to call the event() function on

Definition at line 226 of file EventProcessor.cc.

227{
228 LogSystem& logSystem = LogSystem::Instance();
229 const bool collectStats = !Environment::Instance().getNoStats();
230 // set up logging
231 logSystem.updateModule(&(module->getLogConfig()), module->getName());
232 // set up statistics is requested
233 if (collectStats) m_processStatisticsPtr->startModule();
234 // call module
235 CALL_MODULE(module, event);
236 // stop timing
237 if (collectStats) m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_Event);
238 // reset logging
239 logSystem.updateModule(nullptr);
240};
bool getNoStats() const
Disable collection of statistics during event processing.
Definition: Environment.h:200
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
Class for logging debug, info and error messages.
Definition: LogSystem.h:46
void updateModule(const LogConfig *moduleLogConfig=nullptr, const std::string &moduleName="")
Sets the log configuration to the given module log configuration and sets the module name This method...
Definition: LogSystem.h:191
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:28
@ c_Event
Counting time/calls in event()
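
The bracketing pattern in callEvent() (start statistics, call the module, stop statistics, and skip both when statistics are disabled) can be shown without any framework types. This is a hedged sketch only: `CallStats` and `timedCall` are hypothetical names, and std::chrono stands in for whatever timing ProcessStatistics performs internally.

```cpp
#include <cassert>
#include <chrono>
#include <functional>

// Hypothetical stand-in for the per-module statistics record.
struct CallStats {
  long calls = 0;
  double seconds = 0.0;
};

// Runs fn, recording the call count and elapsed time only when
// collectStats is true. fn is always executed.
void timedCall(const std::function<void()>& fn, bool collectStats, CallStats& stats)
{
  using Clock = std::chrono::steady_clock;
  const Clock::time_point start = Clock::now();
  fn();
  if (collectStats) {
    stats.calls += 1;
    stats.seconds += std::chrono::duration<double>(Clock::now() - start).count();
  }
}
```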

◆ cleanup()

void cleanup ( )

clean up IPC resources (should only be called in one process).

Definition at line 422 of file ZMQEventProcessor.cc.

423{
425 B2DEBUG(30, "Not running cleanup, as I am in process type " << GlobalProcHandler::getProcessName());
426 return;
427 }
430
431 deleteSocketFiles();
432}
static bool isProcess(ProcType procType)
Return true if the process is of type procType.
static std::string getProcessName()
Get a human readable name for this process. (input, event, output...).
void terminate()
Terminate the processing.
void killProcesses(unsigned int timeout)
Ask all processes to terminate. If not, kill them after timeout seconds.
ProcessMonitor m_processMonitor
Instance of the process monitor.
@ c_Monitor
Monitoring Process.
@ c_Init
Before the forks, the process is in init state.

◆ forkAndRun()

void forkAndRun ( long  maxEvent,
const PathPtr & inputPath,
const PathPtr & mainPath,
const PathPtr & outputPath,
const ModulePtrList & terminateGlobally 
)
private

Second step in the process: fork out the processes we need to have and call the event loop.

Definition at line 400 of file ZMQEventProcessor.cc.

402{
403 const int numProcesses = Environment::Instance().getNumberProcesses();
404 GlobalProcHandler::initialize(numProcesses);
405
406 const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
407
408 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
409 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
410 const auto controlSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_control));
411
412 // We catch all signals and store them into a variable. This is used during the main loop then.
413 // From now on, we have to make sure to clean up behind us
414 installMainSignalHandlers(cleanupAndRaiseSignal);
415 m_processMonitor.subscribe(pubSocketAddress, subSocketAddress, controlSocketAddress);
416
417 runInput(inputPath, terminateGlobally, maxEvent);
418 runOutput(outputPath, terminateGlobally, maxEvent);
419 runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent);
420}
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:158
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
static void initialize(unsigned int nWorkerProc)
Create a new process handler, which will handle nWorkerProc processes.
void subscribe(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &controlSocketAddress)
Start listening for process management information on the given address.
static std::string getSocketAddress(const std::string &socketAddress, ZMQAddressType socketPart)
Create a full socket address for the given type from a random socket address, by adding a suffix.
void runMonitoring(const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
Start the monitoring (without forking)
void runInput(const PathPtr &inputPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the input process.
void runOutput(const PathPtr &outputPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the output process.
@ c_pub
Multicast publish socket.
@ c_sub
Multicast subscribe socket.
@ c_control
Control socket.
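
forkAndRun() forks one process per role and keeps the parent as the monitor. The general POSIX shape of that step is sketched below under stated assumptions: `forkChild` and `waitForChild` are hypothetical helpers, and the parent simply waits with waitpid(), whereas the class described here supervises its children through ProcessMonitor and ZMQ sockets.

```cpp
#include <cassert>
#include <functional>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Forks a child that runs body and exits with its return value.
// Returns the child's pid in the parent, or -1 if fork() failed.
pid_t forkChild(const std::function<int()>& body)
{
  const pid_t pid = fork();
  if (pid == 0) {
    // Child: run the role-specific work, then leave with _exit() so that
    // exit handlers registered by the parent are not run in the child.
    _exit(body());
  }
  return pid;
}

// Blocks until pid ends; returns its exit code, or -1 if it did not
// terminate normally.
int waitForChild(pid_t pid)
{
  int status = 0;
  if (waitpid(pid, &status, 0) != pid) return -1;
  return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

Leaving the child through _exit() matters when the parent has registered cleanup with std::atexit, as the constructor of this class does: only one process should remove shared resources.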

◆ getMaximumEventNumber()

long getMaximumEventNumber ( long  maxEvent) const
protected inherited

Calculate the maximum event number out of the argument from command line and the environment.

Definition at line 113 of file EventProcessor.cc.

114{
115 //Check whether the number of events was set via command line argument
116 unsigned int numEventsArgument = Environment::Instance().getNumberEventsOverride();
117 if ((numEventsArgument > 0) && ((maxEvent == 0) || (maxEvent > numEventsArgument))) {
118 return numEventsArgument;
119 }
120 return maxEvent;
121}
unsigned int getNumberEventsOverride() const
Returns number of events in run 1 for EventInfoSetter module, or 0 for no override.
Definition: Environment.h:68
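
The override rule in getMaximumEventNumber() can be isolated as a pure function. The sketch below is a hedged restatement, not the framework code: `effectiveMaxEvent` and its `overrideEvents` parameter are hypothetical names standing in for the member function and for the value returned by Environment::getNumberEventsOverride().

```cpp
#include <cassert>

// Returns the effective event limit. For both inputs, 0 means "no limit".
// A positive override wins when no limit was requested or when the
// requested limit is larger than the override.
long effectiveMaxEvent(long maxEvent, unsigned int overrideEvents)
{
  const long overrideLimit = static_cast<long>(overrideEvents);
  if (overrideLimit > 0 && (maxEvent == 0 || maxEvent > overrideLimit)) {
    return overrideLimit;
  }
  return maxEvent;
}
```

The explicit cast to long avoids the mixed signed/unsigned comparison that appears in the listing above.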

◆ initialize()

void initialize ( const ModulePtrList & moduleList,
const ModulePtr & histogramManager 
)
private

First step in the process: initialize the modules in the list.

TFiles are stored in a global list and cleaned up by ROOT. Since this cleanup happens in every forked process, the files will be corrupted if the list is not cleared first.

needs to be called at the end of every process.

Definition at line 170 of file ZMQEventProcessor.cc.

171{
172 if (histogramManager) {
173 histogramManager->initialize();
174 }
175 // from now on the datastore is available
176 processInitialize(moduleList, true);
177
178 B2INFO("ZMQEventProcessor : processInitialize done");
179
180 // Don't start processing in case of no master module
181 if (!m_master) {
182 B2ERROR("There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
183 }
184
185 // Check if errors appeared. If yes, don't start the event processing.
187 if (numLogError != 0) {
188 B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
189 }
190
191 // TODO: I do not really understand what is going on here...
197 // disable ROOT's management of TFiles
198 // clear list, but don't actually delete the objects
199 gROOT->GetListOfFiles()->Clear("nodelete");
200}
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
const Module * m_master
The master module that determines the experiment/run/event number.
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:30
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:158

◆ installMainSignalHandlers()

void installMainSignalHandlers ( void(*)(int)  fn = nullptr)
static inherited

Install signal handler for INT, TERM and QUIT signals.

If argument is NULL, EventProcessor's own signal handler will be installed.

Definition at line 315 of file EventProcessor.cc.

316{
317 if (!fn)
318 fn = signalHandler;
319 installSignalHandler(SIGINT, fn);
320 installSignalHandler(SIGTERM, fn);
321 installSignalHandler(SIGQUIT, fn);
322}
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.

◆ installSignalHandler()

void installSignalHandler ( int  sig,
void(*)(int)  fn 
)
static inherited

Install a signal handler 'fn' for given signal.

Definition at line 300 of file EventProcessor.cc.

301{
302 struct sigaction s;
303 memset(&s, '\0', sizeof(s));
304
305 s.sa_handler = fn;
306 sigemptyset(&s.sa_mask);
307 if (sig == SIGCHLD)
308 s.sa_flags |= SA_NOCLDSTOP; //don't produce signal when children are stopped
309
310 if (sigaction(sig, &s, nullptr) != 0) {
311 B2FATAL("Cannot setup signal handler for signal " << sig);
312 }
313}
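
The sigaction() pattern used by installSignalHandler() can be exercised in isolation. The following is a minimal sketch, assuming a POSIX system: `installHandler`, `recordSignal`, `lastSignal`, and `g_lastSignal` are hypothetical names, and the helper reports failure through its return value instead of calling B2FATAL.

```cpp
#include <cassert>
#include <csignal>
#include <cstring>
#include <signal.h>

// Written from the handler; volatile sig_atomic_t is the type that is
// safe to assign inside a signal handler.
static volatile std::sig_atomic_t g_lastSignal = 0;

// Handler that only records which signal arrived.
void recordSignal(int sig) { g_lastSignal = sig; }

// Installs fn as the handler for sig; returns true on success.
bool installHandler(int sig, void (*fn)(int))
{
  struct sigaction s;
  std::memset(&s, 0, sizeof(s));
  s.sa_handler = fn;
  sigemptyset(&s.sa_mask);
  return sigaction(sig, &s, nullptr) == 0;
}

// Returns the number of the most recently recorded signal, or 0.
int lastSignal() { return g_lastSignal; }
```

Keeping the handler down to a single assignment follows the same reasoning as the async-safe writeToStdErr() member: very little is safe to do inside a signal handler.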

◆ process()

void process ( const PathPtr & spath,
long  maxEvent 
)

Processes the full module chain using parallel processing, starting with the first module in the given path.

Definition at line 118 of file ZMQEventProcessor.cc.

119{
120 // Concerning signal handling:
121 // * During the initialization, we just raise the signal without doing any cleanup etc.
122 // * During the event execution, we will not allow for any signal in all processes except the parent process.
123 // Here, we catch sigint and clean up the processes AND WHAT DO WE DO IN THE OTHER CASES?
124 // * During cleanup, we will just ignore sigint, but the rest will be raised
125
126 if (path->isEmpty()) {
127 return;
128 }
129
130 const int numProcesses = Environment::Instance().getNumberProcesses();
131 if (numProcesses == 0) {
132 B2FATAL("ZMQEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
133 }
134
135 // Split the path into input, main and output. A nullptr means, the path should not be used
136 PathPtr inputPath, mainPath, outputPath;
137 std::tie(inputPath, mainPath, outputPath) = PathUtils::splitPath(path);
138 const ModulePtr& histogramManager = PathUtils::getHistogramManager(inputPath);
139
140 // Check for existence of HLTZMQ2Ds module in input path to set DAQ environment
141 for (const ModulePtr& module : inputPath->getModules()) {
142 if (module->getName() == "HLTZMQ2Ds") {
144 B2INFO("ZMQEventProcessor : DAQ environment set");
145 break;
146 }
147 }
148
149 if (not mainPath or mainPath->isEmpty()) {
150 B2WARNING("Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
151 EventProcessor::process(path, maxEvent);
152 return;
153 }
154
155 // inserts Rx/Tx modules into path (sets up IPC structures)
156 const ModulePtrList& moduleList = PathUtils::preparePaths(inputPath, mainPath, outputPath);
157
158 // Run the initialization of the modules and the histogram manager
159 initialize(moduleList, histogramManager);
160
161 // The main part: fork into the different processes and run!
162 const ModulePtrList& terminateGlobally = PathUtils::getTerminateGloballyModules(moduleList);
163 forkAndRun(maxEvent, inputPath, mainPath, outputPath, terminateGlobally);
164
165 installMainSignalHandlers(cleanupAndRaiseSignal);
166 // Run the final termination and cleanup with error check
167 terminateAndCleanup(histogramManager);
168}
void setZMQDAQEnvironment(bool zmqDAQ)
Set DAQ environment.
Definition: Environment.h:352
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
static ModulePtrList getTerminateGloballyModules(const ModulePtrList &modules)
Return only modules which have the TerminateGlobally Module flag set.
Definition: PathUtils.cc:196
static ModulePtrList preparePaths(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Adds internal zmq modules to the paths.
Definition: PathUtils.cc:112
static std::tuple< PathPtr, PathPtr, PathPtr > splitPath(const PathPtr &path)
Split the given path into the input, main and output path (in this order) by looking onto the paralle...
Definition: PathUtils.cc:16
static ModulePtr getHistogramManager(PathPtr &inputPath)
Find the histogram manager in the paths and return it.
Definition: PathUtils.cc:97
void terminateAndCleanup(const ModulePtr &histogramManager)
Last step in the process: run the termination and cleanup (kill all remaining processes)
void initialize(const ModulePtrList &moduleList, const ModulePtr &histogramManager)
First step in the process: initialize the modules in the list.
void forkAndRun(long maxEvent, const PathPtr &inputPath, const PathPtr &mainPath, const PathPtr &outputPath, const ModulePtrList &terminateGlobally)
Second step in the process: fork out the processes we need to have and call the event loop.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:35
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:43
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:583
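
process() relies on the path being split into an input, a main, and an output part, and falls back to single-core mode when the main part is empty. One plausible splitting rule is sketched below. It is an assumption made for illustration and not necessarily what PathUtils::splitPath does; `ModuleInfo`, `ModuleList`, and `splitByStage` are hypothetical names, and `parallelCertified` stands in for the c_ParallelProcessingCertified flag mentioned in the warning message.

```cpp
#include <cassert>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical module description: a name plus a parallel-certified flag.
struct ModuleInfo {
  std::string name;
  bool parallelCertified;
};

using ModuleList = std::vector<ModuleInfo>;

// Assumed rule: leading non-certified modules form the input part, the
// following run of certified modules forms the main part, and everything
// from the next non-certified module onward forms the output part.
std::tuple<ModuleList, ModuleList, ModuleList> splitByStage(const ModuleList& path)
{
  ModuleList inputPart, mainPart, outputPart;
  int stage = 0; // 0 = input, 1 = main, 2 = output
  for (const ModuleInfo& m : path) {
    if (stage == 0 && m.parallelCertified) stage = 1;
    else if (stage == 1 && !m.parallelCertified) stage = 2;
    (stage == 0 ? inputPart : stage == 1 ? mainPart : outputPart).push_back(m);
  }
  return std::make_tuple(inputPart, mainPart, outputPart);
}
```

Under this rule a path without any certified module yields an empty main part, which corresponds to the single-core fallback branch in the listing above.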

◆ processBeginRun()

void processBeginRun ( bool  skipDB = false)
private

Calls BeginRun function.

Definition at line 622 of file ZMQEventProcessor.cc.

623{
624 MetadataService::Instance().addBasf2Status("beginning run");
625
626 m_inRun = true;
627
628 LogSystem& logSystem = LogSystem::Instance();
629 m_processStatisticsPtr->startGlobal();
630
631 if (!skipDB) DBStore::Instance().update();
632
633 // initialize random generator for begin run
635
636 for (const ModulePtr& modPtr : m_moduleList) {
637 Module* module = modPtr.get();
638
639 //Set the module dependent log level
640 logSystem.updateModule(&(module->getLogConfig()), module->getName());
641
642 //Do beginRun() call
643 m_processStatisticsPtr->startModule();
644 module->beginRun();
646
647 //Set the global log level
648 logSystem.updateModule(nullptr);
649 }
650
652}
bool m_inRun
Are we currently in a run? If yes, processEndRun() needs to do something.
ModulePtrList m_moduleList
List of all modules in order initialized.
void addBasf2Status(const std::string &message="")
Add metadata of basf2 status.
static MetadataService & Instance()
Static method to get a reference to the MetadataService instance.
@ c_BeginRun
Counting time/calls in beginRun()
Base class for Modules.
Definition: Module.h:72
static void initializeBeginRun()
Initialize run independent random generator for begin run.
static DBStore & Instance()
Instance of a singleton DBStore.
Definition: DBStore.cc:26
void update()
Updates all objects that are outside their interval of validity.
Definition: DBStore.cc:77

◆ processCore() [1/2]

void processCore ( const PathPtr & startPath,
const ModulePtrList & modulePathList,
long  maxEvent = 0,
bool  isInputProcess = true 
)
protected inherited

Processes the full module chain consisting of an arbitrary number of connected paths, starting with the first module in the specified path.

Parameters
startPath  The processing starts with the first module of this path.
modulePathList  A list of all modules which could be executed during the data processing (used for calling the beginRun() and endRun() methods).
maxEvent  The maximum number of events that will be processed. If the number is less than or equal to 0, all events are processed.
isInputProcess  true when this is either the only process or the input process.

Definition at line 409 of file EventProcessor.cc.

410{
412 m_moduleList = modulePathList;
413 //Remember the previous event meta data, and identify end of data meta data
414 m_previousEventMetaData.setEndOfData(); //invalid start state
415
416 const bool collectStats = !Environment::Instance().getNoStats();
417
418 //Loop over the events
419 long currEvent = 0;
420 bool endProcess = false;
421 while (!endProcess) {
422 if (collectStats)
423 m_processStatisticsPtr->startGlobal();
424
425 PathIterator moduleIter(startPath);
426 endProcess = processEvent(moduleIter, isInputProcess && currEvent == 0);
427
428 //Delete event related data in DataStore
430
431 currEvent++;
432 if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess = true;
433 if (collectStats)
435 } //end event loop
436
437 //End last run
438 m_eventMetaDataPtr.create();
440}
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:53
void setInitializeActive(bool active)
Setter for m_initializeActive.
Definition: DataStore.cc:93
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
Definition: DataStore.cc:714
void setEndOfData()
Marks the end of the data processing.
void processEndRun()
Calls the end run methods of all modules.
bool processEvent(PathIterator moduleIter, bool skipMasterModule)
Calls event() functions on all modules for the current event.
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
EventMetaData is used by processEvent()/processCore().
EventMetaData m_previousEventMetaData
Stores state of EventMetaData before it was last changed.
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:26
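
The termination logic of the event loop (stop on end of data, or once maxEvent events have been handled, with maxEvent <= 0 meaning no limit) is easy to get wrong by one. A hedged, framework-free sketch follows; `runEventLoop` and `processOne` are hypothetical names, with `processOne` playing the role of processEvent() and returning true when processing should stop.

```cpp
#include <cassert>
#include <functional>

// Calls processOne until it returns true (end of data) or until maxEvent
// calls have been made; maxEvent <= 0 means "no limit".
// Returns the number of calls made, including the one that signalled the end.
long runEventLoop(const std::function<bool()>& processOne, long maxEvent)
{
  long currEvent = 0;
  bool endProcess = false;
  while (!endProcess) {
    endProcess = processOne();
    currEvent++;
    if (maxEvent > 0 && currEvent >= maxEvent) endProcess = true;
  }
  return currEvent;
}
```

As in the listing above, the counter is incremented after the call, so the limit check uses >= and not >.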

◆ processCore() [2/2]

void processCore ( const PathPtr & startPath,
const ModulePtrList & modulePathList,
long  maxEvent = 0,
bool  isInputProcess = true,
bool  isWorkerProcess = false,
bool  isOutputProcess = false 
)
private

Process modules in the path.

Definition at line 434 of file ZMQEventProcessor.cc.

436{
438 m_moduleList = modulePathList;
439
440 //Remember the previous event meta data, and identify end of data meta data
441 m_previousEventMetaData.setEndOfData(); //invalid start state
442
443 const bool collectStats = !Environment::Instance().getNoStats();
444
445 //Loop over the events
446 long currEvent = 0;
447 bool endProcess = false;
448 while (!endProcess) {
449 if (collectStats)
450 m_processStatisticsPtr->startGlobal();
451
452 PathIterator moduleIter(startPath);
453
454 if (isInputProcess) {
455 endProcess = ZMQEventProcessor::processEvent(moduleIter, isInputProcess && currEvent == 0);
456 } else if (isWorkerProcess) {
457 endProcess = ZMQEventProcessor::processEvent(moduleIter, false,
458 isWorkerProcess && currEvent == 0 && Environment::Instance().getZMQDAQEnvironment());
459 } else if (isOutputProcess) {
460 endProcess = ZMQEventProcessor::processEvent(moduleIter, false, false,
461 isOutputProcess && currEvent == 0 && Environment::Instance().getZMQDAQEnvironment());
462 } else {
463 B2INFO("processCore : should not come here. Specified path is invalid");
464 return;
465 }
466
467 //Delete event related data in DataStore
469
470 currEvent++;
471 if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess = true;
472 if (collectStats)
474 } //end event loop
475
476 //End last run
477 m_eventMetaDataPtr.create();
478 B2INFO("processCore : End Last Run. calling processEndRun()");
480}
void processEndRun()
Calls EndRun function.
bool processEvent(PathIterator moduleIter, bool skipMasterModule, bool Worker=false, bool output=false)
Calls Event function.
EventMetaData m_previousEventMetaData
Stores previous eventMetaData.

◆ processEndRun()

void processEndRun ( )
private

Calls EndRun function.

Definition at line 655 of file ZMQEventProcessor.cc.

656{
658
659 if (!m_inRun)
660 return;
661 m_inRun = false;
662
663 LogSystem& logSystem = LogSystem::Instance();
664 m_processStatisticsPtr->startGlobal();
665
666 const EventMetaData newEventMetaData = *m_eventMetaDataPtr;
667
668 //initialize random generator for end run
670
671 for (const ModulePtr& modPtr : m_moduleList) {
672 Module* module = modPtr.get();
673
674 //Set the module dependent log level
675 logSystem.updateModule(&(module->getLogConfig()), module->getName());
676
677 //Do endRun() call
678 m_processStatisticsPtr->startModule();
679 module->endRun();
681
682 //Set the global log level
683 logSystem.updateModule(nullptr);
684 }
685 *m_eventMetaDataPtr = newEventMetaData;
686
688}
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
@ c_EndRun
Counting time/calls in endRun()
static void initializeEndRun()
Initialize run independent random generator for end run.

◆ processEvent() [1/2]

bool processEvent ( PathIterator  moduleIter,
bool  skipMasterModule 
)
protected inherited

Calls event() functions on all modules for the current event.

Used by processCore.

Parameters
moduleIter  Iterator of the path containing all the modules.
skipMasterModule  Skip the execution of the master module, presumably because this is the first event and it has already been run in initialize().
Returns
true if execution should stop.

Definition at line 324 of file EventProcessor.cc.

325{
326 double time = Utils::getClock() / Unit::s;
328 MetadataService::Instance().addBasf2Status("running event loop");
330 }
331
332 const bool collectStats = !Environment::Instance().getNoStats();
333
334 while (!moduleIter.isDone()) {
335 Module* module = moduleIter.get();
336
337 // run the module ... unless we don't want to
338 if (!(skipMasterModule && module == m_master)) {
339 callEvent(module);
340 }
341
342 //Check for end of data
343 if ((m_eventMetaDataPtr && (m_eventMetaDataPtr->isEndOfData())) ||
344 ((module == m_master) && !m_eventMetaDataPtr)) {
345 if ((module != m_master) && !m_steerRootInputModuleOn) {
346 B2WARNING("Event processing stopped by module '" << module->getName() <<
347 "', which is not in control of event processing (does not provide EventMetaData)");
348 }
349 return true;
350 }
351
352 //Handle EventMetaData changes by master module
353 if (module == m_master) {
354
355 //initialize random number state for the event
357
358 //Check for a change of the run
359 if ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) ||
361
362 if (collectStats)
363 m_processStatisticsPtr->suspendGlobal();
364
366 processBeginRun(skipMasterModule);
367
368 if (collectStats)
369 m_processStatisticsPtr->resumeGlobal();
370 }
371
373
374 //make sure we use the event dependent generator again
376
378
379 } else {
380 //Check for a second master module. Cannot do this if we skipped the
381 //master module as the EventMetaData is probably set before we call this
382 //function
383 if (!skipMasterModule && m_eventMetaDataPtr &&
385 B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and " << module->getName());
386 }
387 }
388
389 if (gSignalReceived != 0) {
390 throw StoppedBySignalException(gSignalReceived);
391 }
392
393 //Check for the module conditions, evaluate them and if one is true switch to the new path
394 if (module->evalCondition()) {
395 PathPtr condPath = module->getConditionPath();
396 //continue with parent Path after condition path is executed?
397 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
398 moduleIter = PathIterator(condPath, moduleIter);
399 } else {
400 moduleIter = PathIterator(condPath);
401 }
402 } else {
403 moduleIter.next();
404 }
405 } //end module loop
406 return false;
407}
int getRun() const
Run Getter.
int getExperiment() const
Experiment Getter.
void processBeginRun(bool skipDB=false)
Calls the begin run methods of all modules.
double m_lastMetadataUpdate
Time in seconds of last call for metadata update in event loop.
void callEvent(Module *module)
Calls event() on one single module, setting up logging and statistics as needed.
bool m_steerRootInputModuleOn
True if the SteerRootInputModule is in charge of event processing.
double m_metadataUpdateInterval
Minimal time difference in seconds for metadata updates in event loop.
const std::string & getName() const
Returns the name of the module.
Definition: Module.h:186
void next()
increment.
Definition: PathIterator.h:49
bool isDone() const
Are we finished iterating?
Definition: PathIterator.h:72
Module * get() const
dereference.
Definition: PathIterator.h:75
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static const double s
[second]
Definition: Unit.h:95
void updateEvent()
Updates all intra-run dependent objects.
Definition: DBStore.cc:140
double getClock()
Return current value of the real-time clock.
Definition: Utils.cc:66
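
Run-change detection in processEvent() compares the current event metadata with the previous one. The sketch below illustrates that comparison under the assumption that a run boundary is defined by a change in either the experiment or the run number. `RunId`, `runChanged`, and `countBeginRuns` are hypothetical names, and the invalid starting state plays the role that setEndOfData() plays for m_previousEventMetaData.

```cpp
#include <cassert>
#include <vector>

// Hypothetical reduced form of the event metadata: only the compared fields.
struct RunId {
  int experiment;
  int run;
};

// A run boundary is assumed whenever the experiment or the run number differs.
bool runChanged(const RunId& current, const RunId& previous)
{
  return current.experiment != previous.experiment || current.run != previous.run;
}

// Counts how many times a begin-of-run step would be triggered for a
// sequence of events. The "previous" state starts out invalid, so the
// first event always opens a run.
int countBeginRuns(const std::vector<RunId>& events)
{
  RunId previous{-1, -1};
  int begins = 0;
  for (const RunId& id : events) {
    if (runChanged(id, previous)) ++begins;
    previous = id;
  }
  return begins;
}
```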

◆ processEvent() [2/2]

bool processEvent ( PathIterator  moduleIter,
bool  skipMasterModule,
bool  Worker = false,
bool  output = false 
)
private

Calls Event function.

Definition at line 483 of file ZMQEventProcessor.cc.

484{
485 double time = Utils::getClock() / Unit::s;
487 MetadataService::Instance().addBasf2Status("running event loop");
489 }
490
491 const bool collectStats = !Environment::Instance().getNoStats();
492
493 while (!moduleIter.isDone()) {
494 Module* module = moduleIter.get();
495
496 // run the module ... unless we don't want to
497 if (module != m_master) {
498 callEvent(module);
499 } else if (!skipMasterModule) {
500 callEvent(module);
501 } else
502 B2INFO("Skipping execution of module " << module->getName());
503
504 if (!m_eventMetaDataPtr) {
505 return false;
506 }
507
508 //Check for end of data
509 if (m_eventMetaDataPtr->isEndOfData()) {
510 // Immediately leave the loop and terminate (true)
511 B2INFO("isEndOfData. Return");
512 return true;
513 }
514
515 //Handle EventMetaData changes by master module
516 if (module == m_master && !skipMasterModule) {
517
518 //initialize random number state for the event
520
521 // Worker Path
522 if (WorkerPath) {
523 B2INFO("Worker Path and First Event!");
524 if (Environment::Instance().isZMQDAQFirstEvent(m_eventMetaDataPtr->getExperiment(), m_eventMetaDataPtr->getRun())) {
525 B2INFO("Worker path processing for ZMQDAQ first event.....Skip to the end of path");
526 B2INFO(" --> exp = " << m_eventMetaDataPtr->getExperiment() << " run = " << m_eventMetaDataPtr->getRun());
527 while (true) {
528 module = moduleIter.get();
529 if (module->getName() == "ZMQTxWorker") break;
530 moduleIter.next();
531 }
532 continue;
533 }
534 }
535
536 // Check for EndOfRun
537 if (!WorkerPath && !OutputPath) {
538 if (m_eventMetaDataPtr->isEndOfRun()) {
539 B2INFO("===> EndOfRun : calling processEndRun(); isEndOfRun = " << m_eventMetaDataPtr->isEndOfRun());
541 // Store the current event meta data for the next round
543 // Leave this event, but not the full processing (false)
544 return false;
547 B2INFO("===> EndOfData : ----> Run change request to the same run!!! Skip this event.");
548 return false;
549 }
550 B2INFO("===> EndOfData : calling processBeginRun(); isEndOfData = " << m_previousEventMetaData.isEndOfData() <<
551 " isEndOfRun = " << m_previousEventMetaData.isEndOfRun());
552 B2INFO("--> cur run = " << m_eventMetaDataPtr->getRun() << " <- prev run = " << m_previousEventMetaData.getRun());
553 B2INFO("--> cur evt = " << m_eventMetaDataPtr->getEvent() << " <- prev evt = " << m_previousEventMetaData.getEvent());
554 // The run number should not be 0
555 if (m_eventMetaDataPtr->getRun() != 0) {
558 } else {
559 return false;
560 }
561 }
562
563 const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
565 const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
567 // if (runChangedWithoutNotice && !g_first_round) {
568 if (runChangedWithoutNotice) {
569 if (collectStats)
570 m_processStatisticsPtr->suspendGlobal();
571
572 B2INFO("===> Run Change (possibly offline) : calling processEndRun() and processBeginRun()");
573 B2INFO("--> cur run = " << m_eventMetaDataPtr->getRun() << " <- prev run = " << m_previousEventMetaData.getRun());
574 B2INFO("--> cur evt = " << m_eventMetaDataPtr->getEvent() << " <- prev evt = " << m_previousEventMetaData.getEvent());
575 B2INFO("--> runChanged = " << runChanged << " runChangedWithoutNotice = " << runChangedWithoutNotice);
576
579
580 if (collectStats)
581 m_processStatisticsPtr->resumeGlobal();
582 }
584 } else
585 B2INFO("Skipping begin/end run processing");
586
587 //make sure we use the event dependent generator again
589
591
592 } else if (!WorkerPath && !OutputPath) {
593 //Check for a second master module. Cannot do this if we skipped the
594 //master module as the EventMetaData is probably set before we call this
595 //function
596 if (!skipMasterModule && m_eventMetaDataPtr &&
598 B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and " << module->getName());
599 }
600 }
601
602 if (g_signalReceived != 0) {
603 throw StoppedBySignalException(g_signalReceived);
604 }
605
606 //Check for the module conditions, evaluate them and if one is true switch to the new path
607 if (module->evalCondition()) {
608 PathPtr condPath = module->getConditionPath();
609 //continue with parent Path after condition path is executed?
610 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
611 moduleIter = PathIterator(condPath, moduleIter);
612 } else {
613 moduleIter = PathIterator(condPath);
614 }
615 } else {
616 moduleIter.next();
617 }
618 } //end module loop
619 return false;
620}
unsigned int getEvent() const
Event Getter.
bool isEndOfRun() const
is end-of-run set? (see setEndOfRun()).
bool isEndOfData() const
is end-of-data set? (see setEndOfData()).
void processBeginRun(bool skipDB=false)
Calls BeginRun function.

◆ processInitialize()

void processInitialize ( const ModulePtrList modulePathList,
bool  setEventInfo = true 
)
protected, inherited

Initializes the modules.

Loops over all module instances specified in a list and calls their initialize() method.

Parameters
modulePathList  A list of all modules which could be executed during the data processing.
setEventInfo  If true, event() of the master module is called immediately so that the event info is loaded right away and is available to subsequent modules.

Definition at line 242 of file EventProcessor.cc.

243{
244 LogSystem& logSystem = LogSystem::Instance();
246
247 m_processStatisticsPtr.registerInDataStore();
248 //TODO I might want to overwrite it in initialize (e.g. if read from file)
249 // For parallel processing or subevents, I don't want that, though.
250 // Maybe make this a function argument?
252 m_processStatisticsPtr.create();
253 m_processStatisticsPtr->startGlobal();
254
256
257 // EventExtraInfo is needed in several modules so register it here
258 m_eventExtraInfo.registerInDataStore();
259
260 for (const ModulePtr& modPtr : modulePathList) {
261 Module* module = modPtr.get();
262
263 if (module->hasUnsetForcedParams()) {
264 //error message was printed by module
265 continue;
266 }
267
268 //Set the module dependent log level
269 logSystem.updateModule(&(module->getLogConfig()), module->getName());
271
272 //Do initialization
273 m_processStatisticsPtr->initModule(module);
274 m_processStatisticsPtr->startModule();
275 CALL_MODULE(module, initialize);
277
278 //Set the global log level
279 logSystem.updateModule(nullptr);
280
281 //Check whether this is the master module
282 if (!m_master && DataStore::Instance().getEntry(m_eventMetaDataPtr) != nullptr) {
283 B2DEBUG(100, "Found module providing EventMetaData: " << module->getName());
284 m_master = module;
285 if (setEventInfo) {
286 callEvent(module);
287 // update Database payloads: we now have valid event meta data unless
288 // we don't process any events
290 }
291 }
292
293 if (gSignalReceived != 0) {
294 throw StoppedBySignalException(gSignalReceived);
295 }
296 }
298}
DependencyMap & getDependencyMap()
Return map of dependencies between modules.
Definition: DataStore.h:524
void setModule(const Module &mod)
Set the current module (for getCurrentModuleInfo())
Definition: DependencyMap.h:60
StoreObjPtr< EventExtraInfo > m_eventExtraInfo
event extra info object pointer
@ c_Init
Counting time/calls in initialize()
static Database & Instance()
Instance of a singleton Database.
Definition: Database.cc:41
ScopeGuard createScopedUpdateSession()
Make sure we have efficient http pipelining during initialize/beginRun but don't keep session alive ...
Definition: Database.cc:61

◆ processPath()

void processPath ( const PathPtr localPath,
const ModulePtrList terminateGlobally,
long  maxEvent 
)
private

Basic function run in every process: process the event loop of the given path.

Definition at line 318 of file ZMQEventProcessor.cc.

319{
320 ModulePtrList localModules = localPath->buildModulePathList();
321 maxEvent = getMaximumEventNumber(maxEvent);
322 // we are not using the default signal handler, so the processCore can not throw any exception because if sigint...
323 processCore(localPath, localModules, maxEvent, GlobalProcHandler::isProcess(ProcType::c_Input),
326
327 B2DEBUG(30, "terminate process...");
328 PathUtils::prependModulesIfNotPresent(&localModules, terminateGlobally);
329 processTerminate(localModules);
330}
void processTerminate(const ModulePtrList &modulePathList)
Terminates the modules.
long getMaximumEventNumber(long maxEvent) const
Calculate the maximum event number out of the argument from command line and the environment.
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
Definition: PathUtils.cc:206
void processCore(const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true, bool isWorkerProcess=false, bool isOutputProcess=false)
Process modules in the path.
@ c_Output
Output Process.
@ c_Worker
Worker/Reconstruction Process.
@ c_Input
Input Process.

◆ processTerminate()

void processTerminate ( const ModulePtrList modulePathList)
protected, inherited

Terminates the modules.

Loops in reverse order over all module instances specified in a list and calls their terminate() method.

Parameters
modulePathList  A list of all modules which could be executed during the data processing.

Definition at line 443 of file EventProcessor.cc.

444{
446
447 LogSystem& logSystem = LogSystem::Instance();
448 ModulePtrList::const_reverse_iterator listIter;
449 m_processStatisticsPtr->startGlobal();
450
451 for (listIter = modulePathList.rbegin(); listIter != modulePathList.rend(); ++listIter) {
452 Module* module = listIter->get();
453
454 //Set the module dependent log level
455 logSystem.updateModule(&(module->getLogConfig()), module->getName());
456
457 //Do termination
458 m_processStatisticsPtr->startModule();
459 CALL_MODULE(module, terminate);
461
462 //Set the global log level
463 logSystem.updateModule(nullptr);
464 }
465
467}
@ c_Term
Counting time/calls in terminate()
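
The reverse-order loop in the listing above can be illustrated with a minimal, self-contained sketch. ToyModule, ToyModulePtrList and terminateAll are hypothetical stand-ins invented for this example; they are not basf2 classes, and the real processTerminate() additionally handles logging and statistics.

```cpp
#include <cassert>
#include <list>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for a module: records its name when terminated.
struct ToyModule {
  std::string name;
  std::vector<std::string>* log;  // where terminate() records the call order
  void terminate() { log->push_back(name); }
};

using ToyModulePtrList = std::list<std::shared_ptr<ToyModule>>;

// Call terminate() on every module, last-initialized first.
void terminateAll(const ToyModulePtrList& modules)
{
  for (auto it = modules.rbegin(); it != modules.rend(); ++it) {
    assert(*it != nullptr);  // the sketch assumes no empty pointers in the list
    (*it)->terminate();
  }
}
```

Terminating in reverse order means a module is shut down before the modules that ran ahead of it in the path, which is presumably why the listing iterates with rbegin()/rend().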

◆ runInput()

void runInput ( const PathPtr inputPath,
const ModulePtrList terminateGlobally,
long  maxEvent 
)
private

Fork out the input process.

Definition at line 226 of file ZMQEventProcessor.cc.

227{
228 if (not inputPath or inputPath->isEmpty()) {
229 return;
230 }
231
233 // This is not the input process, clean up datastore to not contain the first event
235 return;
236 }
237
238 // The default will be to not do anything on signals...
240
243
244 processPath(inputPath, terminateGlobally, maxEvent);
245 B2DEBUG(30, "Finished an input process");
246 exit(0);
247}
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
static bool startInputProcess()
Fork and initialize an input process.
void reset()
Reset the internal state.
void processPath(const PathPtr &localPath, const ModulePtrList &terminateGlobally, long maxEvent)
Basic function run in every process: process the event loop of the given path.
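
The listing above follows a common fork pattern: the parent returns early, while the child runs its path and then exits. The sketch below shows that pattern with plain POSIX calls. runInChild and waitForExitCode are hypothetical helper names, not part of GlobalProcHandler, and the sketch uses _exit() instead of exit() so the child does not flush stdio buffers inherited from the parent.

```cpp
#include <cassert>
#include <functional>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Fork a child that runs 'work' and then exits with status 0.
// Returns the child's pid in the parent; never returns in the child.
pid_t runInChild(const std::function<void()>& work)
{
  const pid_t pid = fork();
  assert(pid >= 0);  // the sketch treats a failed fork as a programming error
  if (pid == 0) {
    work();
    _exit(0);
  }
  return pid;
}

// Block until the child ends; return its exit code, or -1 if it did not exit normally.
int waitForExitCode(pid_t pid)
{
  int status = 0;
  if (waitpid(pid, &status, 0) != pid) {
    return -1;
  }
  return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

A caller would typically keep the returned pid and later pass it to waitForExitCode(), which is roughly the role a process monitor plays for its children.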

◆ runMonitoring()

void runMonitoring ( const PathPtr inputPath,
const PathPtr mainPath,
const ModulePtrList terminateGlobally,
long  maxEvent 
)
private

Start the monitoring (without forking)

Definition at line 333 of file ZMQEventProcessor.cc.

335{
337 return;
338 }
339
340 const auto& environment = Environment::Instance();
341
342 B2DEBUG(30, "Will now start process monitor...");
343 const int numProcesses = environment.getNumberProcesses();
344 m_processMonitor.initialize(numProcesses);
345
346 // Make sure the input process is running until we go on
349 return;
350 }
351 // Make sure the output process is running until we go on
354 return;
355 }
356
357 installMainSignalHandlers(storeSignal);
358
359 // at least start the number of workers requested
360 runWorker(m_processMonitor.needMoreWorkers(), inputPath, mainPath, terminateGlobally, maxEvent);
361
362 const auto& restartFailedWorkers = environment.getZMQRestartFailedWorkers();
363 const auto& failOnFailedWorkers = environment.getZMQFailOnFailedWorkers();
364
365 B2DEBUG(30, "Will now start main loop...");
366 while (true) {
367 // check multicast for messages and kill workers if requested
369 // check the child processes, if one has died
371 // check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
372 m_processMonitor.checkSignals(g_signalReceived);
373
374 // If we have received a SIGINT signal or the last process is gone, we can end smoothly
376 break;
377 }
378
379 // Test if we need more workers
380 const unsigned int neededWorkers = m_processMonitor.needMoreWorkers();
381 if (neededWorkers > 0) {
382 B2DEBUG(30, "restartFailedWorkers = " << restartFailedWorkers);
383 if (restartFailedWorkers) {
384 B2DEBUG(30, ".... Restarting a new worker");
385 B2ERROR(".... Restarting a new worker process");
386 runWorker(neededWorkers, inputPath, mainPath, terminateGlobally, maxEvent);
387 } else if (failOnFailedWorkers) {
388 B2ERROR("A worker failed. Will try to end the process smoothly now.");
389 break;
390 } else if (not m_processMonitor.hasWorkers()) {
391 B2WARNING("All workers have died and you did not request to restart them. Going down now.");
392 break;
393 }
394 }
395 }
396
397 B2DEBUG(30, "Finished the monitoring process");
398}
static bool startMonitoringProcess()
Fork and initialize a monitoring process.
void checkMulticast(int timeout=0)
check multicast for messages and kill workers if requested
bool hasEnded() const
If we have received a SIGINT signal or the last process is gone, we can end smoothly.
void checkSignals(int g_signalReceived)
check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
void waitForRunningInput(int timeout)
Block until either the input process is running or the timeout (in seconds) is raised.
void checkChildProcesses()
check the child processes, if one has died
void waitForRunningOutput(int timeout)
Block until either the output process is running or the timeout (in seconds) is raised.
bool hasWorkers() const
Check if there is at least one running worker.
void initialize(unsigned int requestedNumberOfWorkers)
Init the processing with that many workers.
unsigned int needMoreWorkers() const
Compare our current list of workers with how many we want to have.
void runWorker(unsigned int numProcesses, const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the N worker processes.
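
The branch at listing lines 380 to 394 of runMonitoring() decides what to do when workers are missing. A minimal sketch of that decision as a pure function is given below. WorkerAction and decideWorkerAction are hypothetical names introduced for illustration; the parameters mirror the values the listing reads from the environment and the process monitor.

```cpp
#include <cassert>

// Hypothetical result type: what the monitoring loop should do next.
enum class WorkerAction { c_Continue, c_Restart, c_Stop };

// Mirrors the if / else-if chain in the listing:
// restart wins over fail-on-failure, which wins over "no workers left".
WorkerAction decideWorkerAction(unsigned int neededWorkers, bool restartFailedWorkers,
                                bool failOnFailedWorkers, bool hasWorkers)
{
  if (neededWorkers == 0) {
    return WorkerAction::c_Continue;  // nothing is missing, keep looping
  }
  if (restartFailedWorkers) {
    return WorkerAction::c_Restart;   // fork replacements for the missing workers
  }
  if (failOnFailedWorkers) {
    return WorkerAction::c_Stop;      // a single failed worker ends the processing
  }
  if (!hasWorkers) {
    return WorkerAction::c_Stop;      // every worker is gone and none will be restarted
  }
  return WorkerAction::c_Continue;    // some workers are left, carry on with fewer
}
```

Note the last case: with neither option set, the loop keeps running with a reduced number of workers until none are left.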

◆ runOutput()

void runOutput ( const PathPtr outputPath,
const ModulePtrList terminateGlobally,
long  maxEvent 
)
private

Fork out the output process.

Definition at line 249 of file ZMQEventProcessor.cc.

250{
251 const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
252 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
253 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
254
255 if (not outputPath or outputPath->isEmpty()) {
256 return;
257 }
258
260 return;
261 }
262
263 // The default will be to not do anything on signals...
265
267
268 // Set the rx module as main module
269 m_master = outputPath->getModules().begin()->get();
270
271 processPath(outputPath, terminateGlobally, maxEvent);
272
273 // Send the statistics to the process monitor
274 StreamHelper streamer;
275 ZMQClient zmqClient;
276
277 // TODO: true?
278 streamer.initialize(0, true);
279 zmqClient.initialize(pubSocketAddress, subSocketAddress);
280
281 // TODO: make sure to only send statistics!
282 const auto& evtMessage = streamer.stream();
283 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_statisticMessage, evtMessage);
284 zmqClient.publish(std::move(message));
285
286 B2DEBUG(30, "Finished an output process");
287 exit(0);
288}
static bool startOutputProcess(bool local=false)
Fork and initialize an output process.
std::list< ModulePtr > getModules() const override
no submodules, return empty list
Definition: Module.h:505
Helper class for data store serialization.
Definition: StreamHelper.h:23
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
Definition: ZMQClient.h:22
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.

◆ runWorker()

void runWorker ( unsigned int  numProcesses,
const PathPtr inputPath,
const PathPtr mainPath,
const ModulePtrList terminateGlobally,
long  maxEvent 
)
private

Fork out the N worker processes.

Definition at line 289 of file ZMQEventProcessor.cc.

291{
292 if (numProcesses == 0) {
293 return;
294 }
295
296 if (not GlobalProcHandler::startWorkerProcesses(numProcesses)) {
297 // Make sure the worker process is running until we go on
298 m_processMonitor.waitForRunningWorker(Environment::Instance().getZMQMaximalWaitingTime());
299 return;
300 }
301
302 // The default will be to not do anything on signals...
304
305 if (inputPath and not inputPath->isEmpty()) {
306 // set Rx as master
307 m_master = mainPath->getModules().begin()->get();
308 }
309
312
313 processPath(mainPath, terminateGlobally, maxEvent);
314 B2DEBUG(30, "Finished a worker process");
315 exit(0);
316}
static bool startWorkerProcesses(unsigned int numProcesses)
Fork and initialize as many worker processes as requested.
void waitForRunningWorker(int timeout)
Block until either the worker process is running or the timeout (in seconds) is raised.

◆ setProfileModuleName()

void setProfileModuleName ( const std::string &  name)
inline, inherited

Set the name of the module we want to profile.

Parameters
name  Name of the module as returned by getName()

Definition at line 58 of file EventProcessor.h.

58{ m_profileModuleName = name; }
std::string m_profileModuleName
Name of the module which should be profiled, empty if no profiling is requested.

◆ terminateAndCleanup()

void terminateAndCleanup ( const ModulePtr histogramManager)
private

Last step in the process: run the termination and cleanup (kill all remaining processes)

Definition at line 202 of file ZMQEventProcessor.cc.

203{
204 cleanup();
205
206 if (histogramManager) {
207 B2INFO("HistoManager:: adding histogram files");
209 }
210
211 // did anything bad happen?
212 if (g_signalReceived) {
213 if (g_signalReceived == SIGINT) {
214 B2RESULT("Processing aborted via signal " << g_signalReceived <<
215 ", terminating. Output files have been closed safely and should be readable.");
216 } else {
217 B2ERROR("Processing aborted via signal " << g_signalReceived <<
218 ", terminating. Output files have been closed safely and should be readable.");
219 }
220 // re-raise the signal
221 installSignalHandler(g_signalReceived, SIG_DFL);
222 raise(g_signalReceived);
223 }
224}
static RbTupleManager & Instance()
Access to singleton.
Definition: RbTuple.cc:38
int hadd(bool deleteflag=true)
Functions to add up all histogram files.
Definition: RbTuple.cc:136
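
The last step of the listing above restores the default handler and raises the signal again, so that the process ends with the status a caller would expect for that signal. The sketch below shows this idiom with plain POSIX calls. reraiseWithDefault and signalThatKilledChild are hypothetical helpers, and the check runs in a forked child so the calling process survives.

```cpp
#include <cassert>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Restore the default disposition for 'sig' and deliver it to this process.
void reraiseWithDefault(int sig)
{
  assert(sig > 0);
  signal(sig, SIG_DFL);
  raise(sig);
}

// Demo helper: re-raise 'sig' in a child process and report which signal
// terminated it (0 if it exited normally, -1 on error).
int signalThatKilledChild(int sig)
{
  const pid_t pid = fork();
  if (pid < 0) {
    return -1;
  }
  if (pid == 0) {
    signal(sig, SIG_IGN);     // pretend a custom disposition was installed earlier
    reraiseWithDefault(sig);  // default action for SIGINT/SIGTERM terminates the child
    _exit(0);                 // only reached if the default action does not terminate
  }
  int status = 0;
  if (waitpid(pid, &status, 0) != pid) {
    return -1;
  }
  return WIFSIGNALED(status) ? WTERMSIG(status) : 0;
}
```

A parent that inspects the wait status can then tell "killed by SIGINT" apart from a normal exit, which would not be possible if the process simply returned after its cleanup.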

◆ writeToStdErr()

void writeToStdErr ( const char  msg[])
static, inherited

async-safe method to write something to STDERR.

Definition at line 72 of file EventProcessor.cc.

73{
74 //signal handlers are called asynchronously, making many standard functions (including output) dangerous
75 //write() is, however, safe, so we'll use that to write to stderr.
76
77 //strlen() not explicitly in safe list, but doesn't have any error handling routines that might alter global state
78 const int len = strlen(msg);
79
80 int rc = write(STDERR_FILENO, msg, len);
81 (void) rc; //ignore return value (there's nothing we can do about a failed write)
82
83}
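
As a usage illustration for the function above, the following sketch installs a handler that only stores the signal number and reports it through write(). All names prefixed with toy are hypothetical and exist only in this example; the handler that basf2 actually installs may do more or less than this.

```cpp
#include <cassert>
#include <cstring>
#include <signal.h>
#include <unistd.h>

// Last signal seen by the handler; sig_atomic_t is safe to write from a handler.
volatile sig_atomic_t g_toyLastSignal = 0;

// Same idea as writeToStdErr(): write() is on the POSIX async-signal-safe list.
void toyWriteToStdErr(const char msg[])
{
  const ssize_t rc = write(STDERR_FILENO, msg, std::strlen(msg));
  (void) rc;  // nothing useful can be done about a failed write here
}

// Handler: remember the signal and emit a short notice, nothing else.
void toyStoreSignal(int sig)
{
  g_toyLastSignal = sig;
  toyWriteToStdErr("signal received\n");
}

// Install 'fn' as the handler for 'sig' using sigaction().
void toyInstallHandler(int sig, void (*fn)(int))
{
  assert(fn != nullptr);
  struct sigaction action;
  std::memset(&action, 0, sizeof(action));
  action.sa_handler = fn;
  sigemptyset(&action.sa_mask);
  sigaction(sig, &action, nullptr);
}
```

The main loop can then poll g_toyLastSignal at a convenient point, in the same way the listings on this page check a stored signal value between modules.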

Member Data Documentation

◆ m_eventExtraInfo

StoreObjPtr<EventExtraInfo> m_eventExtraInfo
protected, inherited

event extra info object pointer

Definition at line 170 of file EventProcessor.h.

◆ m_eventMetaDataPtr

StoreObjPtr<EventMetaData> m_eventMetaDataPtr
protected, inherited

EventMetaData is used by processEvent()/processCore().

Definition at line 164 of file EventProcessor.h.

◆ m_inRun

bool m_inRun
protected, inherited

Are we currently in a run? If yes, processEndRun() needs to do something.

Definition at line 176 of file EventProcessor.h.

◆ m_lastMetadataUpdate

double m_lastMetadataUpdate
protected, inherited

Time in seconds of last call for metadata update in event loop.

Definition at line 179 of file EventProcessor.h.

◆ m_master

const Module* m_master
protected, inherited

The master module that determines the experiment/run/event number.

Definition at line 154 of file EventProcessor.h.

◆ m_metadataUpdateInterval

double m_metadataUpdateInterval
protected, inherited

Minimal time difference in seconds for metadata updates in event loop.

Definition at line 182 of file EventProcessor.h.

◆ m_moduleList

ModulePtrList m_moduleList
protected, inherited

List of all modules in order initialized.

Definition at line 155 of file EventProcessor.h.

◆ m_previousEventMetaData

EventMetaData m_previousEventMetaData
private

Stores previous eventMetaData.

Definition at line 82 of file ZMQEventProcessor.h.

◆ m_processMonitor

ProcessMonitor m_processMonitor
private

Instance of the process monitor.

Definition at line 79 of file ZMQEventProcessor.h.

◆ m_processStatisticsPtr

StoreObjPtr<ProcessStatistics> m_processStatisticsPtr
protected, inherited

ProcessStatistics object pointer; also used in a number of places.

Definition at line 173 of file EventProcessor.h.

◆ m_profileModule

Module* m_profileModule = nullptr
protected, inherited

Address of the module which we want to profile, nullptr if no profiling is requested.

Definition at line 161 of file EventProcessor.h.

◆ m_profileModuleName

std::string m_profileModuleName
protected, inherited

Name of the module which should be profiled, empty if no profiling is requested.

Definition at line 158 of file EventProcessor.h.

◆ m_steerRootInputModuleOn

bool m_steerRootInputModuleOn = false
protected, inherited

True if the SteerRootInputModule is in charge of event processing.

Definition at line 185 of file EventProcessor.h.


The documentation for this class was generated from the following files: