9 #include <framework/pcore/ProcHelper.h>
10 #include <framework/pcore/GlobalProcHandler.h>
11 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
12 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
13 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
14 #include <framework/pcore/PathUtils.h>
16 #include <framework/pcore/ZMQEventProcessor.h>
17 #include <framework/pcore/DataStoreStreamer.h>
18 #include <framework/pcore/RbTuple.h>
20 #include <framework/core/Environment.h>
21 #include <framework/logging/LogSystem.h>
23 #include <framework/database/DBStore.h>
24 #include <framework/database/Database.h>
25 #include <framework/core/RandomNumbers.h>
26 #include <framework/core/MetadataService.h>
27 #include <framework/gearbox/Unit.h>
28 #include <framework/utilities/Utils.h>
48 static int g_signalReceived = 0;
53 static void cleanupAndRaiseSignal(
int signalNumber)
55 if (g_eventProcessorForSignalHandling) {
56 g_eventProcessorForSignalHandling->
cleanup();
59 signal(signalNumber, SIG_DFL);
63 static void storeSignal(
int signalNumber)
65 if (signalNumber == SIGINT) {
66 EventProcessor::writeToStdErr(
"\nStopping basf2 gracefully...\n");
70 if (g_signalReceived == 0) {
71 g_signalReceived = signalNumber;
76 std::string g_socketAddress =
"";
78 void deleteSocketFiles()
80 if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
85 const auto seperatorPos = g_socketAddress.find(
"://");
87 if (seperatorPos == std::string::npos or seperatorPos + 3 >= g_socketAddress.size()) {
91 const std::string filename(g_socketAddress.substr(seperatorPos + 3));
94 for (
const auto socketAdressType : socketAddressList) {
95 const std::string socketAddress(ZMQAddressUtils::getSocketAddress(filename, socketAdressType));
96 if (stat(socketAddress.c_str(), &buffer) == 0) {
97 remove(socketAddress.c_str());
103 ZMQEventProcessor::ZMQEventProcessor()
105 B2ASSERT(
"You are having two instances of the ZMQEventProcessor running! This is not possible",
106 not g_eventProcessorForSignalHandling);
107 g_eventProcessorForSignalHandling =
this;
110 g_socketAddress = Environment::Instance().getZMQSocketAddress();
111 std::atexit(deleteSocketFiles);
114 ZMQEventProcessor::~ZMQEventProcessor()
117 g_eventProcessorForSignalHandling =
nullptr;
120 void ZMQEventProcessor::process(
const PathPtr& path,
long maxEvent)
128 if (path->isEmpty()) {
132 const int numProcesses = Environment::Instance().getNumberProcesses();
133 if (numProcesses == 0) {
134 B2FATAL(
"ZMQEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
138 PathPtr inputPath, mainPath, outputPath;
139 std::tie(inputPath, mainPath, outputPath) = PathUtils::splitPath(path);
140 const ModulePtr& histogramManager = PathUtils::getHistogramManager(inputPath, mainPath, outputPath);
143 for (
const ModulePtr& module : inputPath->getModules()) {
144 if (module->getName() ==
"HLTZMQ2Ds") {
145 Environment::Instance().setZMQDAQEnvironment(
true);
146 B2INFO(
"ZMQEventProcessor : DAQ environment set");
151 if (not mainPath or mainPath->isEmpty()) {
152 B2WARNING(
"Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
153 EventProcessor::process(path, maxEvent);
158 const ModulePtrList& moduleList = PathUtils::preparePaths(inputPath, mainPath, outputPath);
161 initialize(moduleList, histogramManager);
166 const ModulePtrList& terminateGlobally = PathUtils::getTerminateGloballyModules(moduleList);
167 forkAndRun(maxEvent, inputPath, mainPath, outputPath, terminateGlobally);
169 installMainSignalHandlers(cleanupAndRaiseSignal);
171 terminateAndCleanup(histogramManager);
176 if (histogramManager) {
177 histogramManager->initialize();
180 processInitialize(moduleList,
true);
182 B2INFO(
"ZMQEventProcessor : processInitialize done");
186 B2ERROR(
"There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
190 int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
191 if (numLogError != 0) {
192 B2FATAL(numLogError <<
" ERROR(S) occurred! The processing of events will not be started.");
203 gROOT->GetListOfFiles()->Clear(
"nodelete");
206 void ZMQEventProcessor::terminateAndCleanup(
const ModulePtr& histogramManager)
210 if (histogramManager) {
211 B2INFO(
"HistoManager:: adding histogram files");
212 RbTupleManager::Instance().hadd();
216 if (g_signalReceived) {
217 if (g_signalReceived == SIGINT) {
218 B2RESULT(
"Processing aborted via signal " << g_signalReceived <<
219 ", terminating. Output files have been closed safely and should be readable.");
221 B2ERROR(
"Processing aborted via signal " << g_signalReceived <<
222 ", terminating. Output files have been closed safely and should be readable.");
225 installSignalHandler(g_signalReceived, SIG_DFL);
226 raise(g_signalReceived);
230 void ZMQEventProcessor::runInput(
const PathPtr& inputPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
232 if (not inputPath or inputPath->isEmpty()) {
236 if (not GlobalProcHandler::startInputProcess()) {
238 DataStore::Instance().invalidateData(DataStore::c_Event);
243 installMainSignalHandlers(SIG_IGN);
245 m_processMonitor.reset();
246 DataStoreStreamer::removeSideEffects();
248 processPath(inputPath, terminateGlobally, maxEvent);
249 B2DEBUG(30,
"Finished an input process");
253 void ZMQEventProcessor::runOutput(
const PathPtr& outputPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
255 const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
256 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_pub));
257 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_sub));
259 if (not outputPath or outputPath->isEmpty()) {
263 if (not GlobalProcHandler::startOutputProcess()) {
268 installMainSignalHandlers(SIG_IGN);
270 m_processMonitor.reset();
273 m_master = outputPath->getModules().begin()->get();
275 processPath(outputPath, terminateGlobally, maxEvent);
283 zmqClient.
initialize(pubSocketAddress, subSocketAddress);
286 const auto& evtMessage = streamer.
stream();
287 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_statisticMessage, evtMessage);
288 zmqClient.
publish(std::move(message));
290 B2DEBUG(30,
"Finished an output process");
293 void ZMQEventProcessor::runWorker(
unsigned int numProcesses,
const PathPtr& inputPath,
const PathPtr& mainPath,
296 if (numProcesses == 0) {
300 if (not GlobalProcHandler::startWorkerProcesses(numProcesses)) {
304 m_processMonitor.waitForRunningWorker(Environment::Instance().getZMQMaximalWaitingTime());
309 installMainSignalHandlers(SIG_IGN);
311 if (inputPath and not inputPath->isEmpty()) {
313 m_master = mainPath->getModules().begin()->get();
316 m_processMonitor.reset();
317 DataStoreStreamer::removeSideEffects();
319 processPath(mainPath, terminateGlobally, maxEvent);
320 B2DEBUG(30,
"Finished a worker process");
324 void ZMQEventProcessor::processPath(
const PathPtr& localPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
326 ModulePtrList localModules = localPath->buildModulePathList();
327 maxEvent = getMaximumEventNumber(maxEvent);
330 processCore(localPath, localModules, maxEvent, GlobalProcHandler::isProcess(ProcType::c_Input),
331 GlobalProcHandler::isProcess(ProcType::c_Worker),
332 GlobalProcHandler::isProcess(ProcType::c_Output));
334 B2DEBUG(30,
"terminate process...");
335 PathUtils::prependModulesIfNotPresent(&localModules, terminateGlobally);
336 processTerminate(localModules);
343 if (not GlobalProcHandler::startMonitoringProcess()) {
347 const auto& environment = Environment::Instance();
349 B2DEBUG(30,
"Will now start process monitor...");
350 const int numProcesses = environment.getNumberProcesses();
351 m_processMonitor.initialize(numProcesses);
354 m_processMonitor.waitForRunningInput(60 * 1000);
355 if (m_processMonitor.hasEnded()) {
359 m_processMonitor.waitForRunningOutput(60 * 1000);
360 if (m_processMonitor.hasEnded()) {
364 installMainSignalHandlers(storeSignal);
367 runWorker(m_processMonitor.needMoreWorkers(), inputPath, mainPath, terminateGlobally, maxEvent);
369 const auto& restartFailedWorkers = environment.getZMQRestartFailedWorkers();
370 const auto& failOnFailedWorkers = environment.getZMQFailOnFailedWorkers();
372 B2DEBUG(30,
"Will now start main loop...");
375 m_processMonitor.checkMulticast();
377 m_processMonitor.checkChildProcesses();
379 m_processMonitor.checkSignals(g_signalReceived);
382 if (m_processMonitor.hasEnded()) {
387 const unsigned int neededWorkers = m_processMonitor.needMoreWorkers();
388 if (neededWorkers > 0) {
389 B2DEBUG(30,
"restartFailedWorkers = " << restartFailedWorkers);
390 if (restartFailedWorkers) {
391 B2DEBUG(30,
".... Restarting a new worker");
392 B2ERROR(
".... Restarting a new worker process");
393 runWorker(neededWorkers, inputPath, mainPath, terminateGlobally, maxEvent);
394 }
else if (failOnFailedWorkers) {
395 B2ERROR(
"A worker failed. Will try to end the process smoothly now.");
397 }
else if (not m_processMonitor.hasWorkers()) {
398 B2WARNING(
"All workers have died and you did not request to restart them. Going down now.");
404 B2DEBUG(30,
"Finished the monitoring process");
407 void ZMQEventProcessor::forkAndRun(
long maxEvent,
const PathPtr& inputPath,
const PathPtr& mainPath,
const PathPtr& outputPath,
410 const int numProcesses = Environment::Instance().getNumberProcesses();
411 GlobalProcHandler::initialize(numProcesses);
413 const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
415 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_pub));
416 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_sub));
421 installMainSignalHandlers(cleanupAndRaiseSignal);
422 m_processMonitor.subscribe(pubSocketAddress, subSocketAddress, controlSocketAddress);
424 runInput(inputPath, terminateGlobally, maxEvent);
425 runOutput(outputPath, terminateGlobally, maxEvent);
426 runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent);
429 void ZMQEventProcessor::cleanup()
431 if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
432 B2DEBUG(30,
"Not running cleanup, as I am in process type " << GlobalProcHandler::getProcessName());
435 m_processMonitor.killProcesses(5000);
436 m_processMonitor.terminate();
441 void ZMQEventProcessor::processCore(
const PathPtr& startPath,
const ModulePtrList& modulePathList,
long maxEvent,
442 bool isInputProcess,
bool isWorkerProcess,
bool isOutputProcess)
446 DataStore::Instance().setInitializeActive(
false);
447 m_moduleList = modulePathList;
450 m_previousEventMetaData.setEndOfData();
452 const bool collectStats = !Environment::Instance().getNoStats();
456 bool endProcess =
false;
457 while (!endProcess) {
459 m_processStatisticsPtr->startGlobal();
465 if (isInputProcess) {
466 endProcess = ZMQEventProcessor::processEvent(moduleIter, isInputProcess && currEvent == 0);
467 }
else if (isWorkerProcess) {
468 endProcess = ZMQEventProcessor::processEvent(moduleIter,
false,
469 isWorkerProcess && currEvent == 0 && Environment::Instance().getZMQDAQEnvironment());
470 }
else if (isOutputProcess) {
471 endProcess = ZMQEventProcessor::processEvent(moduleIter,
false,
false,
472 isOutputProcess && currEvent == 0 && Environment::Instance().getZMQDAQEnvironment());
474 B2INFO(
"processCore : should not come here. Specified path is invalid");
482 DataStore::Instance().invalidateData(DataStore::c_Event);
485 if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess =
true;
487 m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_Event);
496 m_eventMetaDataPtr.create();
497 B2INFO(
"processCore : End Last Run. calling processEndRun()");
502 bool ZMQEventProcessor::processEvent(
PathIterator moduleIter,
bool skipMasterModule,
bool WorkerPath,
bool OutputPath)
504 double time = Utils::getClock() / Unit::s;
505 if (time > m_lastMetadataUpdate + m_metadataUpdateInterval) {
506 MetadataService::Instance().addBasf2Status(
"running event loop");
507 m_lastMetadataUpdate = time;
510 const bool collectStats = !Environment::Instance().getNoStats();
512 while (!moduleIter.
isDone()) {
517 if (module != m_master) {
521 }
else if (!skipMasterModule) {
526 B2INFO(
"Skipping execution of module " << module->getName());
528 if (!m_eventMetaDataPtr) {
534 if (m_eventMetaDataPtr->isEndOfData()) {
536 B2INFO(
"isEndOfData. Return");
541 if (module == m_master && !skipMasterModule) {
544 RandomNumbers::initializeEvent();
548 B2INFO(
"Worker Path and First Event!");
549 if (Environment::Instance().isZMQDAQFirstEvent(m_eventMetaDataPtr->getExperiment(), m_eventMetaDataPtr->getRun())) {
552 B2INFO(
"Worker path processing for ZMQDAQ first event.....Skip to the end of path");
553 B2INFO(
" --> exp = " << m_eventMetaDataPtr->getExperiment() <<
" run = " << m_eventMetaDataPtr->getRun());
555 module = moduleIter.
get();
557 if (module->getName() ==
"ZMQTxWorker")
break;
566 if (!WorkerPath && !OutputPath) {
567 if (m_eventMetaDataPtr->isEndOfRun()) {
568 B2INFO(
"===> EndOfRun : calling processEndRun(); isEndOfRun = " << m_eventMetaDataPtr->isEndOfRun());
571 m_previousEventMetaData = *m_eventMetaDataPtr;
574 }
else if (m_previousEventMetaData.isEndOfData() || m_previousEventMetaData.isEndOfRun()) {
575 if (m_eventMetaDataPtr->getRun() == m_previousEventMetaData.getRun()) {
576 B2INFO(
"===> EndOfData : ----> Run change request to the same run!!! Skip this event.");
579 B2INFO(
"===> EndOfData : calling processBeginRun(); isEndOfData = " << m_previousEventMetaData.isEndOfData() <<
580 " isEndOfRun = " << m_previousEventMetaData.isEndOfRun());
581 B2INFO(
"--> cur run = " << m_eventMetaDataPtr->getRun() <<
" <- prev run = " << m_previousEventMetaData.getRun());
582 B2INFO(
"--> cur evt = " << m_eventMetaDataPtr->getEvent() <<
" <- prev evt = " << m_previousEventMetaData.getEvent());
584 if (m_eventMetaDataPtr->getRun() != 0) {
586 m_previousEventMetaData = *m_eventMetaDataPtr;
592 const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
593 (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
594 const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
595 and not m_previousEventMetaData.isEndOfRun();
597 if (runChangedWithoutNotice) {
599 m_processStatisticsPtr->suspendGlobal();
601 B2INFO(
"===> Run Change (possibly offline) : calling processEndRun() and processBeginRun()");
602 B2INFO(
"--> cur run = " << m_eventMetaDataPtr->getRun() <<
" <- prev run = " << m_previousEventMetaData.getRun());
603 B2INFO(
"--> cur evt = " << m_eventMetaDataPtr->getEvent() <<
" <- prev evt = " << m_previousEventMetaData.getEvent());
604 B2INFO(
"--> runChanged = " << runChanged <<
" runChangedWithoutNotice = " << runChangedWithoutNotice);
610 m_processStatisticsPtr->resumeGlobal();
612 m_previousEventMetaData = *m_eventMetaDataPtr;
614 B2INFO(
"Skipping begin/end run processing");
617 RandomNumbers::useEventDependent();
619 DBStore::Instance().updateEvent();
621 }
else if (!WorkerPath && !OutputPath) {
625 if (!skipMasterModule && m_eventMetaDataPtr &&
626 (*m_eventMetaDataPtr != m_previousEventMetaData)) {
627 B2FATAL(
"Two modules setting EventMetaData were discovered: " << m_master->getName() <<
" and " << module->getName());
631 if (g_signalReceived != 0) {
636 if (module->evalCondition()) {
637 PathPtr condPath = module->getConditionPath();
639 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
651 void ZMQEventProcessor::processBeginRun(
bool skipDB)
653 MetadataService::Instance().addBasf2Status(
"beginning run");
658 LogSystem& logSystem = LogSystem::Instance();
659 m_processStatisticsPtr->startGlobal();
661 if (!skipDB) DBStore::Instance().update();
664 RandomNumbers::initializeBeginRun();
666 for (
const ModulePtr& modPtr : m_moduleList) {
667 Module* module = modPtr.get();
670 logSystem.
updateModule(&(module->getLogConfig()), module->getName());
673 m_processStatisticsPtr->startModule();
676 m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_BeginRun);
682 m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_BeginRun);
686 void ZMQEventProcessor::processEndRun()
688 MetadataService::Instance().addBasf2Status(
"ending run");
694 LogSystem& logSystem = LogSystem::Instance();
695 m_processStatisticsPtr->startGlobal();
701 RandomNumbers::initializeEndRun();
703 for (
const ModulePtr& modPtr : m_moduleList) {
704 Module* module = modPtr.get();
707 logSystem.
updateModule(&(module->getLogConfig()), module->getName());
710 m_processStatisticsPtr->startModule();
713 m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_EndRun);
718 *m_eventMetaDataPtr = newEventMetaData;
720 m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_EndRun);
Exception thrown when execution is stopped by a signal.
Class for logging debug, info and error messages.
void updateModule(const LogConfig *moduleLogConfig=nullptr, const std::string &moduleName="")
Sets the log configuration to the given module log configuration and sets the module name This method...
Iterator over a Path (returning Module pointers).
bool isDone() const
Are we finished iterating?
Module * get() const
dereference.
Helper class for data store serialization.
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
void publish(AZMQMessage message) const
Publish the message to the multicast.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
This class provides the core event processing loop for parallel processing with ZMQ.
void cleanup()
clean up IPC resources (should only be called in one process).
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a boost shared pointer.
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
@ c_sub
Multicast publish socket.
@ c_control
Multicast subscribe socket.
Abstract base class for different kinds of events.