9#include <framework/pcore/ProcHelper.h>
10#include <framework/pcore/GlobalProcHandler.h>
11#include <framework/pcore/zmq/messages/ZMQDefinitions.h>
12#include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
13#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
14#include <framework/pcore/PathUtils.h>
16#include <framework/pcore/ZMQEventProcessor.h>
17#include <framework/pcore/DataStoreStreamer.h>
18#include <framework/pcore/RbTuple.h>
20#include <framework/core/Environment.h>
21#include <framework/logging/LogSystem.h>
23#include <framework/database/DBStore.h>
24#include <framework/database/Database.h>
25#include <framework/core/RandomNumbers.h>
26#include <framework/core/MetadataService.h>
27#include <framework/gearbox/Unit.h>
28#include <framework/utilities/Utils.h>
/**
 * Number of the first signal recorded by storeSignal(); stays 0 while no signal has been received.
 *
 * The variable is written from a signal-handler function and polled by the event loop.
 * It must therefore be `volatile sig_atomic_t`, for two reasons:
 * - It is the only plain object type a signal handler may write to without undefined behavior.
 * - `volatile` stops the compiler from caching the value across loop iterations.
 */
static volatile sig_atomic_t g_signalReceived = 0;
53 static void cleanupAndRaiseSignal(
int signalNumber)
55 if (g_eventProcessorForSignalHandling) {
56 g_eventProcessorForSignalHandling->
cleanup();
59 signal(signalNumber, SIG_DFL);
63 static void storeSignal(
int signalNumber)
65 if (signalNumber == SIGINT) {
70 if (g_signalReceived == 0) {
71 g_signalReceived = signalNumber;
/// ZMQ socket address used as the base for the per-type socket addresses (empty until assigned).
/// deleteSocketFiles() takes the part after "://" to locate the socket files it removes.
std::string g_socketAddress{};
78 void deleteSocketFiles()
84 const std::vector<ZMQAddressType> socketAddressList = {ZMQAddressType::c_input, ZMQAddressType::c_output, ZMQAddressType::c_pub, ZMQAddressType::c_sub, ZMQAddressType::c_control};
85 const auto seperatorPos = g_socketAddress.find(
"://");
87 if (seperatorPos == std::string::npos or seperatorPos + 3 >= g_socketAddress.size()) {
91 const std::string filename(g_socketAddress.substr(seperatorPos + 3));
94 for (
const auto socketAdressType : socketAddressList) {
96 if (stat(socketAddress.c_str(), &buffer) == 0) {
97 remove(socketAddress.c_str());
105 B2ASSERT(
"You are having two instances of the ZMQEventProcessor running! This is not possible",
106 not g_eventProcessorForSignalHandling);
107 g_eventProcessorForSignalHandling =
this;
111 std::atexit(deleteSocketFiles);
117 g_eventProcessorForSignalHandling =
nullptr;
128 if (path->isEmpty()) {
133 if (numProcesses == 0) {
134 B2FATAL(
"ZMQEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
138 PathPtr inputPath, mainPath, outputPath;
143 for (
const ModulePtr& module : inputPath->getModules()) {
144 if (module->getName() ==
"HLTZMQ2Ds") {
146 B2INFO(
"ZMQEventProcessor : DAQ environment set");
151 if (not mainPath or mainPath->isEmpty()) {
152 B2WARNING(
"Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
167 forkAndRun(maxEvent, inputPath, mainPath, outputPath, terminateGlobally);
176 if (histogramManager) {
177 histogramManager->initialize();
182 B2INFO(
"ZMQEventProcessor : processInitialize done");
186 B2ERROR(
"There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
191 if (numLogError != 0) {
192 B2FATAL(numLogError <<
" ERROR(S) occurred! The processing of events will not be started.");
203 gROOT->GetListOfFiles()->Clear(
"nodelete");
210 if (histogramManager) {
211 B2INFO(
"HistoManager:: adding histogram files");
216 if (g_signalReceived) {
217 if (g_signalReceived == SIGINT) {
218 B2RESULT(
"Processing aborted via signal " << g_signalReceived <<
219 ", terminating. Output files have been closed safely and should be readable.");
221 B2ERROR(
"Processing aborted via signal " << g_signalReceived <<
222 ", terminating. Output files have been closed safely and should be readable.");
226 raise(g_signalReceived);
232 if (not inputPath or inputPath->isEmpty()) {
248 processPath(inputPath, terminateGlobally, maxEvent);
249 B2DEBUG(30,
"Finished an input process");
259 if (not outputPath or outputPath->isEmpty()) {
275 processPath(outputPath, terminateGlobally, maxEvent);
283 zmqClient.
initialize(pubSocketAddress, subSocketAddress);
286 const auto& evtMessage = streamer.
stream();
288 zmqClient.
publish(std::move(message));
290 B2DEBUG(30,
"Finished an output process");
296 if (numProcesses == 0) {
311 if (inputPath and not inputPath->isEmpty()) {
319 processPath(mainPath, terminateGlobally, maxEvent);
320 B2DEBUG(30,
"Finished a worker process");
326 ModulePtrList localModules = localPath->buildModulePathList();
334 B2DEBUG(30,
"terminate process...");
349 B2DEBUG(30,
"Will now start process monitor...");
350 const int numProcesses = environment.getNumberProcesses();
369 const auto& restartFailedWorkers = environment.getZMQRestartFailedWorkers();
370 const auto& failOnFailedWorkers = environment.getZMQFailOnFailedWorkers();
372 B2DEBUG(30,
"Will now start main loop...");
388 if (neededWorkers > 0) {
389 B2DEBUG(30,
"restartFailedWorkers = " << restartFailedWorkers);
390 if (restartFailedWorkers) {
391 B2DEBUG(30,
".... Restarting a new worker");
392 B2ERROR(
".... Restarting a new worker process");
393 runWorker(neededWorkers, inputPath, mainPath, terminateGlobally, maxEvent);
394 }
else if (failOnFailedWorkers) {
395 B2ERROR(
"A worker failed. Will try to end the process smoothly now.");
398 B2WARNING(
"All workers have died and you did not request to restart them. Going down now.");
404 B2DEBUG(30,
"Finished the monitoring process");
424 runInput(inputPath, terminateGlobally, maxEvent);
425 runOutput(outputPath, terminateGlobally, maxEvent);
426 runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent);
442 bool isInputProcess,
bool isWorkerProcess,
bool isOutputProcess)
456 bool endProcess =
false;
457 while (!endProcess) {
465 if (isInputProcess) {
467 }
else if (isWorkerProcess) {
470 }
else if (isOutputProcess) {
474 B2INFO(
"processCore : should not come here. Specified path is invalid");
485 if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess =
true;
497 B2INFO(
"processCore : End Last Run. calling processEndRun()");
512 while (!moduleIter.
isDone()) {
521 }
else if (!skipMasterModule) {
526 B2INFO(
"Skipping execution of module " << module->getName());
536 B2INFO(
"isEndOfData. Return");
541 if (module ==
m_master && !skipMasterModule) {
548 B2INFO(
"Worker Path and First Event!");
552 B2INFO(
"Worker path processing for ZMQDAQ first event.....Skip to the end of path");
555 module = moduleIter.
get();
557 if (module->getName() ==
"ZMQTxWorker")
break;
566 if (!WorkerPath && !OutputPath) {
568 B2INFO(
"===> EndOfRun : calling processEndRun(); isEndOfRun = " <<
m_eventMetaDataPtr->isEndOfRun());
576 B2INFO(
"===> EndOfData : ----> Run change request to the same run!!! Skip this event.");
594 if (runChangedWithoutNotice) {
598 B2INFO(
"===> Run Change (possibly offline) : calling processEndRun() and processBeginRun()");
601 B2INFO(
"--> runChanged = " << runChanged <<
" runChangedWithoutNotice = " << runChangedWithoutNotice);
612 B2INFO(
"Skipping begin/end run processing");
619 }
else if (!WorkerPath && !OutputPath) {
625 B2FATAL(
"Two modules setting EventMetaData were discovered: " <<
m_master->
getName() <<
" and " << module->getName());
629 if (g_signalReceived != 0) {
634 if (module->evalCondition()) {
635 PathPtr condPath = module->getConditionPath();
637 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
665 Module* module = modPtr.get();
668 logSystem.
updateModule(&(module->getLogConfig()), module->getName());
702 Module* module = modPtr.get();
705 logSystem.
updateModule(&(module->getLogConfig()), module->getName());
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
static DataStore & Instance()
Instance of singleton Store.
void setInitializeActive(bool active)
Setter for m_initializeActive.
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
const std::string & getZMQSocketAddress() const
Socket address to use in ZMQ.
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
bool getNoStats() const
Disable collection of statistics during event processing.
void setZMQDAQEnvironment(bool zmqDAQ)
Set DAQ environment.
static Environment & Instance()
Static method to get a reference to the Environment instance.
Exception thrown when execution is stopped by a signal.
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
bool m_inRun
Are we currently in a run? If yes, processEndRun() needs to do something.
double m_lastMetadataUpdate
Time in seconds of last call for metadata update in event loop.
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
void callEvent(Module *module)
Calls event() on one single module, setting up logging and statistics as needed.
void processTerminate(const ModulePtrList &modulePathList)
Terminates the modules.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
const Module * m_master
The master module that determines the experiment/run/event number.
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
EventMetaData is used by processEvent()/processCore().
ModulePtrList m_moduleList
List of all modules in order initialized.
long getMaximumEventNumber(long maxEvent) const
Calculate the maximum event number out of the argument from command line and the environment.
double m_metadataUpdateInterval
Minimal time difference in seconds for metadata updates in event loop.
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
static bool startOutputProcess(bool local=false)
Fork and initialize an output process.
static bool startInputProcess()
Fork and initialize an input process.
static void initialize(unsigned int nWorkerProc)
Create a new process handler, which will handle nWorkerProc processes.
static bool startMonitoringProcess()
Fork and initialize a monitoring process.
static bool startWorkerProcesses(unsigned int numProcesses)
Fork and initialize as many worker processes as requested.
static bool isProcess(ProcType procType)
Return true if the process is of type procType.
static std::string getProcessName()
Get a human readable name for this process. (input, event, output...).
@ c_Error
Error: for things that went wrong and have to be fixed.
Class for logging debug, info and error messages.
void updateModule(const LogConfig *moduleLogConfig=nullptr, const std::string &moduleName="")
Sets the log configuration to the given module log configuration and sets the module name. This method...
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
@ c_EndRun
Counting time/calls in endRun()
@ c_BeginRun
Counting time/calls in beginRun()
@ c_Event
Counting time/calls in event()
const std::string & getName() const
Returns the name of the module.
std::list< ModulePtr > getModules() const override
no submodules, return empty list
Iterator over a Path (returning Module pointers).
bool isDone() const
Are we finished iterating?
Module * get() const
dereference.
static ModulePtrList getTerminateGloballyModules(const ModulePtrList &modules)
Return only modules which have the TerminateGlobally Module flag set.
static ModulePtrList preparePaths(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Adds internal zmq modules to the paths.
static std::tuple< PathPtr, PathPtr, PathPtr > splitPath(const PathPtr &path)
Split the given path into the input, main and output paths (in this order) by looking at the paralle...
static ModulePtr getHistogramManager(PathPtr &inputPath)
Find the histogram manager in the paths and return it.
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
void checkMulticast(int timeout=0)
check multicast for messages and kill workers if requested
void waitForRunningWorker(int timeout)
Block until either the worker process is running or the timeout (in seconds) has expired.
bool hasEnded() const
If we have received a SIGINT signal or the last process is gone, we can end smoothly.
void checkSignals(int g_signalReceived)
check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
void waitForRunningInput(int timeout)
Block until either the input process is running or the timeout (in seconds) has expired.
void checkChildProcesses()
Check the child processes to see whether one has died.
void terminate()
Terminate the processing.
void waitForRunningOutput(int timeout)
Block until either the output process is running or the timeout (in seconds) has expired.
void subscribe(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &controlSocketAddress)
Start listening for process management information on the given address.
bool hasWorkers() const
Check if there is at least one running worker.
void killProcesses(unsigned int timeout)
Ask all processes to terminate. If they do not, kill them after timeout seconds.
void initialize(unsigned int requestedNumberOfWorkers)
Init the processing with that many workers.
unsigned int needMoreWorkers() const
Compare our current list of workers with how many we want to have.
void reset()
Reset the internal state.
static void initializeEndRun()
Initialize run independent random generator for end run.
static void initializeBeginRun()
Initialize run independent random generator for begin run.
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static RbTupleManager & Instance()
Access to singleton.
int hadd(bool deleteflag=true)
Function to add up all histogram files.
Helper class for data store serialization.
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
static const double s
[second]
static std::string getSocketAddress(const std::string &socketAddress, ZMQAddressType socketPart)
Create a full socket address for the given type from a random socket address, by adding a suffix.
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
void publish(AZMQMessage message) const
Publish the message to the multicast.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
This class provides the core event processing loop for parallel processing with ZMQ.
ProcessMonitor m_processMonitor
Instance of the process monitor.
void processEndRun()
Calls EndRun function.
void runMonitoring(const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
Start the monitoring (without forking)
void processBeginRun(bool skipDB=false)
Calls BeginRun function.
void cleanup()
clean up IPC resources (should only be called in one process).
void process(const PathPtr &spath, long maxEvent)
Processes the full module chain using parallel processing, starting with the first module in the give...
void terminateAndCleanup(const ModulePtr &histogramManager)
Last step in the process: run the termination and cleanup (kill all remaining processes)
void runWorker(unsigned int numProcesses, const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the N worker processes.
void initialize(const ModulePtrList &moduleList, const ModulePtr &histogramManager)
First step in the process: initialize the modules in the list.
bool processEvent(PathIterator moduleIter, bool skipMasterModule, bool Worker=false, bool output=false)
Calls Event function.
void processCore(const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true, bool isWorkerProcess=false, bool isOutputProcess=false)
Process modules in the path.
void runInput(const PathPtr &inputPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the input process.
void runOutput(const PathPtr &outputPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the output process.
void forkAndRun(long maxEvent, const PathPtr &inputPath, const PathPtr &mainPath, const PathPtr &outputPath, const ModulePtrList &terminateGlobally)
Second step in the process: fork out the processes we need to have and call the event loop.
virtual ~ZMQEventProcessor()
Make sure we remove all sockets cleanly.
EventMetaData m_previousEventMetaData
Stores previous eventMetaData.
void processPath(const PathPtr &localPath, const ModulePtrList &terminateGlobally, long maxEvent)
Basic function run in every process: process the event loop of the given path.
ZMQEventProcessor()
Register the socket-file cleanup to run at program exit.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
static DBStore & Instance()
Instance of a singleton DBStore.
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
void updateEvent()
Updates all intra-run dependent objects.
void update()
Updates all objects that are outside their interval of validity.
@ c_Output
Output Process.
@ c_Worker
Worker/Reconstruction Process.
@ c_Monitor
Monitoring Process.
@ c_Init
Before the forks, the process is in init state.
@ c_sub
Multicast publish socket.
@ c_control
Multicast subscribe socket.
double getClock()
Return current value of the real-time clock.
Abstract base class for different kinds of events.