9#include <framework/pcore/ProcHelper.h>
10#include <framework/pcore/GlobalProcHandler.h>
11#include <framework/pcore/zmq/messages/ZMQDefinitions.h>
12#include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
13#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
14#include <framework/pcore/PathUtils.h>
16#include <framework/pcore/ZMQEventProcessor.h>
17#include <framework/pcore/DataStoreStreamer.h>
18#include <framework/pcore/RbTuple.h>
20#include <framework/core/Environment.h>
21#include <framework/logging/LogSystem.h>
23#include <framework/database/DBStore.h>
24#include <framework/core/RandomNumbers.h>
25#include <framework/core/MetadataService.h>
26#include <framework/gearbox/Unit.h>
27#include <framework/utilities/Utils.h>
46 static int g_signalReceived = 0; // Number of the first signal recorded by storeSignal() (later signals do not overwrite it); 0 means none received. After cleanup, the recorded signal is re-raised.
51 static void cleanupAndRaiseSignal(
int signalNumber)
53 if (g_eventProcessorForSignalHandling) {
54 g_eventProcessorForSignalHandling->
cleanup();
57 signal(signalNumber, SIG_DFL);
61 static void storeSignal(
int signalNumber)
63 if (signalNumber == SIGINT) {
68 if (g_signalReceived == 0) {
69 g_signalReceived = signalNumber;
74 std::string g_socketAddress =
""; // Base ZMQ socket address, expected to contain "://". deleteSocketFiles() takes the part after "://" as the base for the socket files it removes (presumably set from Environment::getZMQSocketAddress() -- confirm).
76 void deleteSocketFiles()
82 const std::vector<ZMQAddressType> socketAddressList = {ZMQAddressType::c_input, ZMQAddressType::c_output, ZMQAddressType::c_pub, ZMQAddressType::c_sub, ZMQAddressType::c_control};
83 const auto seperatorPos = g_socketAddress.find(
"://");
85 if (seperatorPos == std::string::npos or seperatorPos + 3 >= g_socketAddress.size()) {
89 const std::string filename(g_socketAddress.substr(seperatorPos + 3));
92 for (
const auto socketAdressType : socketAddressList) {
94 if (stat(socketAddress.c_str(), &buffer) == 0) {
95 remove(socketAddress.c_str());
103 B2ASSERT(
"You are having two instances of the ZMQEventProcessor running! This is not possible",
104 not g_eventProcessorForSignalHandling);
105 g_eventProcessorForSignalHandling =
this;
109 std::atexit(deleteSocketFiles);
115 g_eventProcessorForSignalHandling =
nullptr;
126 if (path->isEmpty()) {
131 if (numProcesses == 0) {
132 B2FATAL(
"ZMQEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
136 PathPtr inputPath, mainPath, outputPath;
141 for (
const ModulePtr& module : inputPath->getModules()) {
142 if (module->getName() ==
"HLTZMQ2Ds") {
144 B2INFO(
"ZMQEventProcessor : DAQ environment set");
149 if (not mainPath or mainPath->isEmpty()) {
150 B2WARNING(
"Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
163 forkAndRun(maxEvent, inputPath, mainPath, outputPath, terminateGlobally);
172 if (histogramManager) {
173 histogramManager->initialize();
178 B2INFO(
"ZMQEventProcessor : processInitialize done");
182 B2ERROR(
"There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
187 if (numLogError != 0) {
188 B2FATAL(numLogError <<
" ERROR(S) occurred! The processing of events will not be started.");
199 gROOT->GetListOfFiles()->Clear(
"nodelete");
206 if (histogramManager) {
207 B2INFO(
"HistoManager:: adding histogram files");
212 if (g_signalReceived) {
213 if (g_signalReceived == SIGINT) {
214 B2RESULT(
"Processing aborted via signal " << g_signalReceived <<
215 ", terminating. Output files have been closed safely and should be readable.");
217 B2ERROR(
"Processing aborted via signal " << g_signalReceived <<
218 ", terminating. Output files have been closed safely and should be readable.");
222 raise(g_signalReceived);
228 if (not inputPath or inputPath->isEmpty()) {
244 processPath(inputPath, terminateGlobally, maxEvent);
245 B2DEBUG(30,
"Finished an input process");
255 if (not outputPath or outputPath->isEmpty()) {
271 processPath(outputPath, terminateGlobally, maxEvent);
279 zmqClient.
initialize(pubSocketAddress, subSocketAddress);
282 const auto& evtMessage = streamer.
stream();
284 zmqClient.
publish(std::move(message));
286 B2DEBUG(30,
"Finished an output process");
292 if (numProcesses == 0) {
305 if (inputPath and not inputPath->isEmpty()) {
307 m_master = mainPath->getModules().begin()->get();
313 processPath(mainPath, terminateGlobally, maxEvent);
314 B2DEBUG(30,
"Finished a worker process");
320 ModulePtrList localModules = localPath->buildModulePathList();
327 B2DEBUG(30,
"terminate process...");
342 B2DEBUG(30,
"Will now start process monitor...");
343 const int numProcesses = environment.getNumberProcesses();
362 const auto& restartFailedWorkers = environment.getZMQRestartFailedWorkers();
363 const auto& failOnFailedWorkers = environment.getZMQFailOnFailedWorkers();
365 B2DEBUG(30,
"Will now start main loop...");
381 if (neededWorkers > 0) {
382 B2DEBUG(30,
"restartFailedWorkers = " << restartFailedWorkers);
383 if (restartFailedWorkers) {
384 B2DEBUG(30,
".... Restarting a new worker");
385 B2ERROR(
".... Restarting a new worker process");
386 runWorker(neededWorkers, inputPath, mainPath, terminateGlobally, maxEvent);
387 }
else if (failOnFailedWorkers) {
388 B2ERROR(
"A worker failed. Will try to end the process smoothly now.");
391 B2WARNING(
"All workers have died and you did not request to restart them. Going down now.");
397 B2DEBUG(30,
"Finished the monitoring process");
417 runInput(inputPath, terminateGlobally, maxEvent);
418 runOutput(outputPath, terminateGlobally, maxEvent);
419 runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent);
435 bool isInputProcess,
bool isWorkerProcess,
bool isOutputProcess)
447 bool endProcess =
false;
448 while (!endProcess) {
454 if (isInputProcess) {
456 }
else if (isWorkerProcess) {
459 }
else if (isOutputProcess) {
463 B2INFO(
"processCore : should not come here. Specified path is invalid");
471 if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess =
true;
478 B2INFO(
"processCore : End Last Run. calling processEndRun()");
493 while (!moduleIter.
isDone()) {
499 }
else if (!skipMasterModule) {
502 B2INFO(
"Skipping execution of module " << module->getName());
511 B2INFO(
"isEndOfData. Return");
516 if (module ==
m_master && !skipMasterModule) {
523 B2INFO(
"Worker Path and First Event!");
525 B2INFO(
"Worker path processing for ZMQDAQ first event.....Skip to the end of path");
528 module = moduleIter.
get();
529 if (module->getName() ==
"ZMQTxWorker")
break;
537 if (!WorkerPath && !OutputPath) {
539 B2INFO(
"===> EndOfRun : calling processEndRun(); isEndOfRun = " <<
m_eventMetaDataPtr->isEndOfRun());
547 B2INFO(
"===> EndOfData : ----> Run change request to the same run!!! Skip this event.");
568 if (runChangedWithoutNotice) {
572 B2INFO(
"===> Run Change (possibly offline) : calling processEndRun() and processBeginRun()");
575 B2INFO(
"--> runChanged = " << runChanged <<
" runChangedWithoutNotice = " << runChangedWithoutNotice);
585 B2INFO(
"Skipping begin/end run processing");
592 }
else if (!WorkerPath && !OutputPath) {
598 B2FATAL(
"Two modules setting EventMetaData were discovered: " <<
m_master->
getName() <<
" and " << module->getName());
602 if (g_signalReceived != 0) {
607 if (module->evalCondition()) {
608 PathPtr condPath = module->getConditionPath();
610 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
637 Module* module = modPtr.get();
640 logSystem.
updateModule(&(module->getLogConfig()), module->getName());
672 Module* module = modPtr.get();
675 logSystem.
updateModule(&(module->getLogConfig()), module->getName());
static void removeSideEffects()
Call clear() and removeSideEffects() for all Mergeable objects in the DataStore (for c_Persistent durability).
@ c_Event
Different object in each event; all objects/arrays are invalidated after the event() function has been called.
static DataStore & Instance()
Instance of singleton Store.
void setInitializeActive(bool active)
Setter for m_initializeActive.
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
const std::string & getZMQSocketAddress() const
Socket address to use in ZMQ.
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
bool getNoStats() const
Disable collection of statistics during event processing.
void setZMQDAQEnvironment(bool zmqDAQ)
Set DAQ environment.
static Environment & Instance()
Static method to get a reference to the Environment instance.
Exception thrown when execution is stopped by a signal.
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
bool m_inRun
Are we currently in a run? If yes, processEndRun() needs to do something.
double m_lastMetadataUpdate
Time in seconds of last call for metadata update in event loop.
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
void callEvent(Module *module)
Calls event() on one single module, setting up logging and statistics as needed.
void processTerminate(const ModulePtrList &modulePathList)
Terminates the modules.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
const Module * m_master
The master module that determines the experiment/run/event number.
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
EventMetaData is used by processEvent()/processCore().
ModulePtrList m_moduleList
List of all modules in order initialized.
long getMaximumEventNumber(long maxEvent) const
Calculate the maximum event number out of the argument from command line and the environment.
double m_metadataUpdateInterval
Minimal time difference in seconds for metadata updates in event loop.
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
static bool startOutputProcess(bool local=false)
Fork and initialize an output process.
static bool startInputProcess()
Fork and initialize an input process.
static void initialize(unsigned int nWorkerProc)
Create a new process handler, which will handle nWorkerProc processes.
static bool startMonitoringProcess()
Fork and initialize a monitoring process.
static bool startWorkerProcesses(unsigned int numProcesses)
Fork and initialize as many worker processes as requested.
static bool isProcess(ProcType procType)
Return true if the process is of type procType.
static std::string getProcessName()
Get a human readable name for this process. (input, event, output...).
@ c_Error
Error: for things that went wrong and have to be fixed.
Class for logging debug, info and error messages.
void updateModule(const LogConfig *moduleLogConfig=nullptr, const std::string &moduleName="")
Sets the log configuration to the given module log configuration and sets the module name. This method...
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
@ c_EndRun
Counting time/calls in endRun()
@ c_BeginRun
Counting time/calls in beginRun()
@ c_Event
Counting time/calls in event()
const std::string & getName() const
Returns the name of the module.
std::list< ModulePtr > getModules() const override
no submodules, return empty list
Iterator over a Path (returning Module pointers).
bool isDone() const
Are we finished iterating?
Module * get() const
dereference.
static ModulePtrList getTerminateGloballyModules(const ModulePtrList &modules)
Return only modules which have the TerminateGlobally Module flag set.
static ModulePtrList preparePaths(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Adds internal zmq modules to the paths.
static std::tuple< PathPtr, PathPtr, PathPtr > splitPath(const PathPtr &path)
Split the given path into the input, main and output path (in this order) by looking at the parallel processing flags of its modules.
static ModulePtr getHistogramManager(PathPtr &inputPath)
Find the histogram manager in the paths and return it.
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
void checkMulticast(int timeout=0)
Check the multicast for messages and kill workers if requested.
void waitForRunningWorker(int timeout)
Block until either the worker process is running or the timeout (in seconds) is reached.
bool hasEnded() const
If we have received a SIGINT signal or the last process is gone, we can end smoothly.
void checkSignals(int g_signalReceived)
check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
void waitForRunningInput(int timeout)
Block until either the input process is running or the timeout (in seconds) is reached.
void checkChildProcesses()
check the child processes, if one has died
void terminate()
Terminate the processing.
void waitForRunningOutput(int timeout)
Block until either the output process is running or the timeout (in seconds) is reached.
void subscribe(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &controlSocketAddress)
Start listening for process management information on the given address.
bool hasWorkers() const
Check if there is at least one running worker.
void killProcesses(unsigned int timeout)
Ask all processes to terminate; kill any that are still alive after timeout seconds.
void initialize(unsigned int requestedNumberOfWorkers)
Init the processing with that many workers.
unsigned int needMoreWorkers() const
Compare the current number of workers with how many we want to have.
void reset()
Reset the internal state.
static void initializeEndRun()
Initialize run independent random generator for end run.
static void initializeBeginRun()
Initialize run independent random generator for begin run.
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static RbTupleManager & Instance()
Access to singleton.
int hadd(bool deleteflag=true)
Functions to add up all histogram files.
Helper class for data store serialization.
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
static const double s
[second]
static std::string getSocketAddress(const std::string &socketAddress, ZMQAddressType socketPart)
Create a full socket address for the given type from a random socket address, by adding a suffix.
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
void publish(AZMQMessage message) const
Publish the message to the multicast.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
This class provides the core event processing loop for parallel processing with ZMQ.
ProcessMonitor m_processMonitor
Instance of the process monitor.
void processEndRun()
Calls EndRun function.
void runMonitoring(const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
Start the monitoring (without forking)
void processBeginRun(bool skipDB=false)
Calls BeginRun function.
void cleanup()
clean up IPC resources (should only be called in one process).
void process(const PathPtr &spath, long maxEvent)
Processes the full module chain using parallel processing, starting with the first module in the given path.
void terminateAndCleanup(const ModulePtr &histogramManager)
Last step in the process: run the termination and cleanup (kill all remaining processes)
void runWorker(unsigned int numProcesses, const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the N worker process.
void initialize(const ModulePtrList &moduleList, const ModulePtr &histogramManager)
First step in the process: initialize the modules in the list.
bool processEvent(PathIterator moduleIter, bool skipMasterModule, bool Worker=false, bool output=false)
Calls Event function.
void processCore(const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true, bool isWorkerProcess=false, bool isOutputProcess=false)
Process modules in the path.
void runInput(const PathPtr &inputPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the input process.
void runOutput(const PathPtr &outputPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the output process.
void forkAndRun(long maxEvent, const PathPtr &inputPath, const PathPtr &mainPath, const PathPtr &outputPath, const ModulePtrList &terminateGlobally)
Second step in the process: fork out the processes we need to have and call the event loop.
virtual ~ZMQEventProcessor()
Make sure we remove all sockets cleanly.
EventMetaData m_previousEventMetaData
Stores previous eventMetaData.
void processPath(const PathPtr &localPath, const ModulePtrList &terminateGlobally, long maxEvent)
Basic function run in every process: process the event loop of the given path.
ZMQEventProcessor()
Register the socket-file cleanup to run at program exit.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
static DBStore & Instance()
Instance of a singleton DBStore.
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
void updateEvent()
Updates all intra-run dependent objects.
void update()
Updates all objects that are outside their interval of validity.
@ c_Output
Output Process.
@ c_Worker
Worker/Reconstruction Process.
@ c_Monitor
Monitoring Process.
@ c_Init
Before the forks, the process is in init state.
@ c_sub
Multicast subscribe socket.
@ c_control
Multicast control socket.
double getClock()
Return current value of the real-time clock.
Abstract base class for different kinds of events.