9 #include <framework/pcore/ProcHelper.h>
10 #include <framework/pcore/GlobalProcHandler.h>
11 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
12 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
13 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
14 #include <framework/pcore/PathUtils.h>
16 #include <framework/pcore/ZMQEventProcessor.h>
17 #include <framework/pcore/DataStoreStreamer.h>
18 #include <framework/pcore/RbTuple.h>
20 #include <framework/core/Environment.h>
21 #include <framework/logging/LogSystem.h>
23 #include <framework/database/DBStore.h>
24 #include <framework/database/Database.h>
25 #include <framework/core/RandomNumbers.h>
26 #include <framework/core/MetadataService.h>
27 #include <framework/gearbox/Unit.h>
28 #include <framework/utilities/Utils.h>
// Number of the first signal caught by storeSignal(); stays 0 while no signal has been received.
48 static int g_signalReceived = 0;
// Signal handler: asks the registered ZMQEventProcessor (if one exists) to clean up,
// then restores the default handler for the received signal.
// NOTE(review): braces and some statements are missing from this listing; given the name,
// the signal is presumably re-raised after SIG_DFL is restored -- confirm in the full source.
53 static void cleanupAndRaiseSignal(
int signalNumber)
55 if (g_eventProcessorForSignalHandling) {
56 g_eventProcessorForSignalHandling->
cleanup();
59 signal(signalNumber, SIG_DFL);
// Signal handler installed by the monitoring loop: prints a notice on SIGINT and records
// the signal number in g_signalReceived. Only the first signal is kept; later signals
// do not overwrite it.
63 static void storeSignal(
int signalNumber)
65 if (signalNumber == SIGINT) {
66 EventProcessor::writeToStdErr(
"\nStopping basf2 gracefully...\n");
// Keep only the first received signal.
70 if (g_signalReceived == 0) {
71 g_signalReceived = signalNumber;
// ZMQ socket address, copied from the Environment in the ZMQEventProcessor constructor so that
// deleteSocketFiles() can still find it when it runs at program exit.
76 std::string g_socketAddress =
"";
// Exit hook (registered with std::atexit in the ZMQEventProcessor constructor) that removes
// socket files left over from the ZMQ sockets.
// NOTE(review): the bodies of the two guards below are not part of this listing; they
// presumably return early. In that case files are only removed from the monitor or init
// process, and only when g_socketAddress has a non-empty part after "://".
// NOTE(review): the declarations of socketAddressList and `buffer` are not part of this listing.
// NOTE(review): "seperatorPos" / "socketAdressType" are misspelled local names (cosmetic only).
78 void deleteSocketFiles()
80 if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
85 const auto seperatorPos = g_socketAddress.find(
"://");
87 if (seperatorPos == std::string::npos or seperatorPos + 3 >= g_socketAddress.size()) {
// Everything after "://" is the base filename from which the socket file paths are derived.
91 const std::string filename(g_socketAddress.substr(seperatorPos + 3));
94 for (
const auto socketAdressType : socketAddressList) {
95 const std::string socketAddress(ZMQAddressUtils::getSocketAddress(filename, socketAdressType));
// Only remove paths that stat() reports as existing.
96 if (stat(socketAddress.c_str(), &buffer) == 0) {
97 remove(socketAddress.c_str());
// Constructor: registers this instance for the signal handlers (only one instance may exist
// at a time), remembers the configured ZMQ socket address, and schedules deleteSocketFiles()
// to run at program exit.
103 ZMQEventProcessor::ZMQEventProcessor()
105 B2ASSERT(
"You are having two instances of the ZMQEventProcessor running! This is not possible",
106 not g_eventProcessorForSignalHandling);
107 g_eventProcessorForSignalHandling =
this;
// Kept in a global so the atexit hook can access it after this object is gone.
110 g_socketAddress = Environment::Instance().getZMQSocketAddress();
111 std::atexit(deleteSocketFiles);
// Destructor: unregisters this instance from the signal handlers.
// NOTE(review): other statements of the destructor (if any) are not part of this listing.
114 ZMQEventProcessor::~ZMQEventProcessor()
117 g_eventProcessorForSignalHandling =
nullptr;
// Runs `path` with several processes (input, workers, output) connected via ZMQ.
// Steps visible in this listing:
//  - handles an empty path (handling not shown);
//  - aborts with B2FATAL if the configured number of processes is 0;
//  - splits the path into input, main and output parts;
//  - enables the ZMQ DAQ environment if the input path contains an "HLTZMQ2Ds" module;
//  - falls back to serial EventProcessor::process() if there is no (non-empty) main path;
//  - otherwise initializes all modules, forks and runs the processes, and finally calls
//    terminateAndCleanup().
// @param path      the full path to process
// @param maxEvent  maximum number of events, passed on to forkAndRun()
120 void ZMQEventProcessor::process(
const PathPtr& path,
long maxEvent)
128 if (path->isEmpty()) {
132 const int numProcesses = Environment::Instance().getNumberProcesses();
133 if (numProcesses == 0) {
134 B2FATAL(
"ZMQEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
138 PathPtr inputPath, mainPath, outputPath;
139 std::tie(inputPath, mainPath, outputPath) = PathUtils::splitPath(path);
140 const ModulePtr& histogramManager = PathUtils::getHistogramManager(inputPath, mainPath, outputPath);
// An HLTZMQ2Ds module in the input path switches on the ZMQ DAQ environment.
143 for (
const ModulePtr& module : inputPath->getModules()) {
144 if (module->getName() ==
"HLTZMQ2Ds") {
145 Environment::Instance().setZMQDAQEnvironment(
true);
146 B2INFO(
"ZMQEventProcessor : DAQ environment set");
// Nothing can run in parallel: fall back to the serial event processor.
151 if (not mainPath or mainPath->isEmpty()) {
152 B2WARNING(
"Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
153 EventProcessor::process(path, maxEvent);
158 const ModulePtrList& moduleList = PathUtils::preparePaths(inputPath, mainPath, outputPath);
161 initialize(moduleList, histogramManager);
166 const ModulePtrList& terminateGlobally = PathUtils::getTerminateGloballyModules(moduleList);
167 forkAndRun(maxEvent, inputPath, mainPath, outputPath, terminateGlobally);
169 installMainSignalHandlers(cleanupAndRaiseSignal);
171 terminateAndCleanup(histogramManager);
// NOTE(review): the signature of this function is not part of this listing. Judging from the
// call in process(), this is presumably ZMQEventProcessor::initialize(moduleList,
// histogramManager) -- confirm.
// Steps visible here:
//  - initializes the histogram manager (if present), then all modules;
//  - reports an error when no module provides event and run numbers (the condition is not
//    shown here);
//  - refuses to start event processing (B2FATAL) if any ERROR was logged;
//  - finally clears gROOT's list of files with the "nodelete" option.
176 if (histogramManager) {
177 histogramManager->initialize();
180 processInitialize(moduleList,
true);
182 B2INFO(
"ZMQEventProcessor : processInitialize done");
186 B2ERROR(
"There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
// Any ERROR logged so far (including during initialization) is fatal.
190 int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
191 if (numLogError != 0) {
192 B2FATAL(numLogError <<
" ERROR(S) occurred! The processing of events will not be started.");
203 gROOT->GetListOfFiles()->Clear(
"nodelete");
// Final step of process():
//  - merges the histogram files via RbTupleManager::hadd() when a histogram manager is used;
//  - if a signal was caught during processing, logs it (B2RESULT for SIGINT, B2ERROR
//    otherwise), restores the default handler for that signal and raises it again.
// @param histogramManager  histogram manager module; may be empty (checked before use)
206 void ZMQEventProcessor::terminateAndCleanup(
const ModulePtr& histogramManager)
210 if (histogramManager) {
211 B2INFO(
"HistoManager:: adding histogram files");
212 RbTupleManager::Instance().hadd();
216 if (g_signalReceived) {
217 if (g_signalReceived == SIGINT) {
218 B2RESULT(
"Processing aborted via signal " << g_signalReceived <<
219 ", terminating. Output files have been closed safely and should be readable.");
221 B2ERROR(
"Processing aborted via signal " << g_signalReceived <<
222 ", terminating. Output files have been closed safely and should be readable.");
// Re-raise with the default disposition so the caller sees the original signal.
225 installSignalHandler(g_signalReceived, SIG_DFL);
226 raise(g_signalReceived);
// Forks the input process and runs the input path in it.
// NOTE(review): some branch bodies are missing from this listing:
//  - the empty-input-path check presumably returns early;
//  - when startInputProcess() returns false (presumably in the parent), the event data in the
//    DataStore is invalidated.
// In the input process itself:
//  - main signal handlers are ignored;
//  - the process monitor is reset;
//  - DataStoreStreamer side effects are removed;
//  - the input path is processed.
230 void ZMQEventProcessor::runInput(
const PathPtr& inputPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
232 if (not inputPath or inputPath->isEmpty()) {
236 if (not GlobalProcHandler::startInputProcess()) {
238 DataStore::Instance().invalidateData(DataStore::c_Event);
243 installMainSignalHandlers(SIG_IGN);
245 m_processMonitor.reset();
246 DataStoreStreamer::removeSideEffects();
248 processPath(inputPath, terminateGlobally, maxEvent);
249 B2DEBUG(30,
"Finished an input process");
// Forks the output process and runs the output path in it. The first module of the output path
// is used as m_master.
// After processing, the output process streams the DataStore into a message and publishes it
// on the multicast as a statistics message. The pub/sub addresses are derived from the
// configured ZMQ socket address.
// NOTE(review): the early-return bodies and the declarations of zmqClient and streamer are not
// part of this listing.
253 void ZMQEventProcessor::runOutput(
const PathPtr& outputPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
255 const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
256 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_pub));
257 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_sub));
259 if (not outputPath or outputPath->isEmpty()) {
263 if (not GlobalProcHandler::startOutputProcess()) {
268 installMainSignalHandlers(SIG_IGN);
270 m_processMonitor.reset();
273 m_master = outputPath->getModules().begin()->get();
275 processPath(outputPath, terminateGlobally, maxEvent);
// Send this process's DataStore content to the multicast as a statistics message.
283 zmqClient.
initialize(pubSocketAddress, subSocketAddress);
286 const auto& evtMessage = streamer.
stream();
287 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_statisticMessage, evtMessage);
288 zmqClient.
publish(std::move(message));
290 B2DEBUG(30,
"Finished an output process");
// Forks `numProcesses` worker processes that each run the main path.
// NOTE(review): the remaining parameters (terminateGlobally, maxEvent, per the calls from the
// monitoring loop) and some branch bodies are not part of this listing.
// When startWorkerProcesses() returns false (presumably in the parent), it waits for the workers
// to be running, up to the configured maximal ZMQ waiting time.
// In each worker:
//  - main signal handlers are ignored;
//  - if an input path exists, the first module of the main path becomes m_master;
//  - then the main path is processed.
293 void ZMQEventProcessor::runWorker(
unsigned int numProcesses,
const PathPtr& inputPath,
const PathPtr& mainPath,
296 if (numProcesses == 0) {
300 if (not GlobalProcHandler::startWorkerProcesses(numProcesses)) {
304 m_processMonitor.waitForRunningWorker(Environment::Instance().getZMQMaximalWaitingTime());
309 installMainSignalHandlers(SIG_IGN);
311 if (inputPath and not inputPath->isEmpty()) {
313 m_master = mainPath->getModules().begin()->get();
316 m_processMonitor.reset();
317 DataStoreStreamer::removeSideEffects();
319 processPath(mainPath, terminateGlobally, maxEvent);
320 B2DEBUG(30,
"Finished a worker process");
// Runs the event loop for `localPath` in the current process, then terminates its modules.
// The current process type (input/worker/output) is passed to processCore() as flags. Modules
// from `terminateGlobally` are prepended to the module list (if not already present) before
// processTerminate().
// @param localPath          path to run in this process
// @param terminateGlobally  presumably modules that must be terminated in every process -- confirm
// @param maxEvent           event limit, adjusted via getMaximumEventNumber()
324 void ZMQEventProcessor::processPath(
const PathPtr& localPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
326 ModulePtrList localModules = localPath->buildModulePathList();
327 maxEvent = getMaximumEventNumber(maxEvent);
330 processCore(localPath, localModules, maxEvent, GlobalProcHandler::isProcess(ProcType::c_Input),
331 GlobalProcHandler::isProcess(ProcType::c_Worker),
332 GlobalProcHandler::isProcess(ProcType::c_Output));
334 B2DEBUG(30,
"terminate process...");
335 PathUtils::prependModulesIfNotPresent(&localModules, terminateGlobally);
336 processTerminate(localModules);
// NOTE(review): the signature of this function is not part of this listing. From the call in
// forkAndRun() it is presumably runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent).
// Monitoring process:
//  - starts the process monitor;
//  - waits up to 60 * 1000 (presumably ms) for the input process, then for the output process;
//  - installs storeSignal as signal handler and starts the workers;
//  - then loops: checks multicast messages, child processes and received signals, and stops
//    once the monitor has ended.
// Missing workers are restarted or treated as an error, depending on the ZMQ restart/fail settings.
343 if (not GlobalProcHandler::startMonitoringProcess()) {
347 const auto& environment = Environment::Instance();
349 B2DEBUG(30,
"Will now start process monitor...");
350 const int numProcesses = environment.getNumberProcesses();
351 m_processMonitor.initialize(numProcesses);
354 m_processMonitor.waitForRunningInput(60 * 1000);
355 if (m_processMonitor.hasEnded()) {
359 m_processMonitor.waitForRunningOutput(60 * 1000);
360 if (m_processMonitor.hasEnded()) {
// From here on, signals are only recorded (storeSignal) and handled in the main loop.
364 installMainSignalHandlers(storeSignal);
367 runWorker(m_processMonitor.needMoreWorkers(), inputPath, mainPath, terminateGlobally, maxEvent);
369 const auto& restartFailedWorkers = environment.getZMQRestartFailedWorkers();
370 const auto& failOnFailedWorkers = environment.getZMQFailOnFailedWorkers();
372 B2DEBUG(30,
"Will now start main loop...");
375 m_processMonitor.checkMulticast();
377 m_processMonitor.checkChildProcesses();
379 m_processMonitor.checkSignals(g_signalReceived);
382 if (m_processMonitor.hasEnded()) {
// Missing workers: restart them, fail, or go down if none are left (per configuration).
387 const unsigned int neededWorkers = m_processMonitor.needMoreWorkers();
388 if (neededWorkers > 0) {
389 if (restartFailedWorkers) {
390 runWorker(neededWorkers, inputPath, mainPath, terminateGlobally, maxEvent);
391 }
else if (failOnFailedWorkers) {
392 B2ERROR(
"A worker failed. Will try to end the process smoothly now.");
394 }
else if (not m_processMonitor.hasWorkers()) {
395 B2WARNING(
"All workers have died and you did not request to restart them. Going down now.");
401 B2DEBUG(30,
"Finished the monitoring process");
// Sets up the global process handler for the configured number of processes. It then installs
// cleanupAndRaiseSignal as signal handler, subscribes the process monitor to the multicast
// (pub/sub/control addresses), and calls runInput, runOutput and runMonitoring in turn.
// NOTE(review): the last parameter (terminateGlobally) and the definition of
// controlSocketAddress are not part of this listing.
404 void ZMQEventProcessor::forkAndRun(
long maxEvent,
const PathPtr& inputPath,
const PathPtr& mainPath,
const PathPtr& outputPath,
407 const int numProcesses = Environment::Instance().getNumberProcesses();
408 GlobalProcHandler::initialize(numProcesses);
410 const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
412 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_pub));
413 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_sub));
418 installMainSignalHandlers(cleanupAndRaiseSignal);
419 m_processMonitor.subscribe(pubSocketAddress, subSocketAddress, controlSocketAddress);
421 runInput(inputPath, terminateGlobally, maxEvent);
422 runOutput(outputPath, terminateGlobally, maxEvent);
423 runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent);
// Cleans up IPC resources. Only the monitor or init process does real work; any other process
// just logs a debug message.
// The real work: kill the child processes (argument 5000, presumably a timeout in ms), then
// terminate the process monitor.
// NOTE(review): the early return after the debug message is not part of this listing.
426 void ZMQEventProcessor::cleanup()
428 if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
429 B2DEBUG(30,
"Not running cleanup, as I am in process type " << GlobalProcHandler::getProcessName());
432 m_processMonitor.killProcesses(5000);
433 m_processMonitor.terminate();
// Event loop of one process. One of isInputProcess / isWorkerProcess / isOutputProcess selects
// how processEvent() is called:
//  - input:  the master module is skipped on the first event only;
//  - worker: the master module is never skipped; WorkerPath is set on the first event in a
//            ZMQ DAQ environment;
//  - output: the master module is never skipped; OutputPath is set on the first event in a
//            ZMQ DAQ environment.
// The loop ends when processEvent() returns true or when maxEvent (> 0) events are reached.
// The event DataStore is invalidated after each event.
// NOTE(review): the declarations of moduleIter and currEvent, and the end-of-loop handling, are
// only partially part of this listing.
438 void ZMQEventProcessor::processCore(
const PathPtr& startPath,
const ModulePtrList& modulePathList,
long maxEvent,
439 bool isInputProcess,
bool isWorkerProcess,
bool isOutputProcess)
443 DataStore::Instance().setInitializeActive(
false);
444 m_moduleList = modulePathList;
// Start as if the previous run had ended, so the first event triggers begin-run handling.
447 m_previousEventMetaData.setEndOfData();
449 const bool collectStats = !Environment::Instance().getNoStats();
453 bool endProcess =
false;
454 while (!endProcess) {
456 m_processStatisticsPtr->startGlobal();
462 if (isInputProcess) {
463 endProcess = ZMQEventProcessor::processEvent(moduleIter, isInputProcess && currEvent == 0);
464 }
else if (isWorkerProcess) {
465 endProcess = ZMQEventProcessor::processEvent(moduleIter,
false,
466 isWorkerProcess && currEvent == 0 && Environment::Instance().getZMQDAQEnvironment());
467 }
else if (isOutputProcess) {
468 endProcess = ZMQEventProcessor::processEvent(moduleIter,
false,
false,
469 isOutputProcess && currEvent == 0 && Environment::Instance().getZMQDAQEnvironment());
471 B2INFO(
"processCore : should not come here. Specified path is invalid");
479 DataStore::Instance().invalidateData(DataStore::c_Event);
482 if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess =
true;
484 m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_Event);
493 m_eventMetaDataPtr.create();
494 B2INFO(
"processCore : End Last Run. calling processEndRun()");
// Processes one event by walking the modules of the path with `moduleIter`.
// Returns true when the caller's event loop should end (see processCore()); the return
// statements themselves are not part of this listing.
// The basf2 status in the MetadataService is refreshed at most every m_metadataUpdateInterval.
// @param moduleIter        iterator over the modules of the path
// @param skipMasterModule  if true, the master module (m_master) is not executed
// @param WorkerPath        set by processCore() for a worker's first event in a ZMQ DAQ environment
// @param OutputPath        set by processCore() for the output's first event in a ZMQ DAQ environment
499 bool ZMQEventProcessor::processEvent(
PathIterator moduleIter,
bool skipMasterModule,
bool WorkerPath,
bool OutputPath)
501 double time = Utils::getClock() / Unit::s;
502 if (time > m_lastMetadataUpdate + m_metadataUpdateInterval) {
503 MetadataService::Instance().addBasf2Status(
"running event loop");
504 m_lastMetadataUpdate = time;
507 const bool collectStats = !Environment::Instance().getNoStats();
509 while (!moduleIter.
isDone()) {
514 if (module != m_master) {
518 }
else if (!skipMasterModule) {
523 B2INFO(
"Skipping execution of module " << module->getName());
525 if (!m_eventMetaDataPtr) {
531 if (m_eventMetaDataPtr->isEndOfData()) {
533 B2INFO(
"isEndOfData. Return");
// After the master module: event seeding, first-event DAQ handling, run transitions, DB update.
538 if (module == m_master && !skipMasterModule) {
541 RandomNumbers::initializeEvent();
// Worker in a ZMQ DAQ environment on its first event: if isZMQDAQFirstEvent() holds for this
// exp/run, advance to the "ZMQTxWorker" module (the rest of the path is skipped).
545 B2INFO(
"Worker Path and First Event!");
546 if (Environment::Instance().isZMQDAQFirstEvent(m_eventMetaDataPtr->getExperiment(), m_eventMetaDataPtr->getRun())) {
549 B2INFO(
"Worker path processing for ZMQDAQ first event.....Skip to the end of path");
550 B2INFO(
" --> exp = " << m_eventMetaDataPtr->getExperiment() <<
" run = " << m_eventMetaDataPtr->getRun());
552 module = moduleIter.
get();
554 if (module->getName() ==
"ZMQTxWorker")
break;
// Run transitions are handled here unless WorkerPath or OutputPath is set.
563 if (!WorkerPath && !OutputPath) {
564 if (m_eventMetaDataPtr->isEndOfRun()) {
565 B2INFO(
"===> EndOfRun : calling processEndRun(); isEndOfRun = " << m_eventMetaDataPtr->isEndOfRun());
568 m_previousEventMetaData = *m_eventMetaDataPtr;
// First event after an end-of-run / end-of-data marker: begin the new run (a "change" to the
// same run number is skipped).
571 }
else if (m_previousEventMetaData.isEndOfData() || m_previousEventMetaData.isEndOfRun()) {
572 if (m_eventMetaDataPtr->getRun() == m_previousEventMetaData.getRun()) {
573 B2INFO(
"===> EndOfData : ----> Run change request to the same run!!! Skip this event.");
576 B2INFO(
"===> EndOfData : calling processBeginRun(); isEndOfData = " << m_previousEventMetaData.isEndOfData() <<
577 " isEndOfRun = " << m_previousEventMetaData.isEndOfRun());
578 B2INFO(
"--> cur run = " << m_eventMetaDataPtr->getRun() <<
" <- prev run = " << m_previousEventMetaData.getRun());
579 B2INFO(
"--> cur evt = " << m_eventMetaDataPtr->getEvent() <<
" <- prev evt = " << m_previousEventMetaData.getEvent());
581 m_previousEventMetaData = *m_eventMetaDataPtr;
// Experiment/run changed without a preceding end-of-run/end-of-data marker: end the old run
// and begin the new one, with global statistics suspended meanwhile.
586 const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
587 (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
588 const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
589 and not m_previousEventMetaData.isEndOfRun();
591 if (runChangedWithoutNotice) {
593 m_processStatisticsPtr->suspendGlobal();
595 B2INFO(
"===> Run Change (possibly offline) : calling processEndRun() and processBeginRun()");
596 B2INFO(
"--> cur run = " << m_eventMetaDataPtr->getRun() <<
" <- prev run = " << m_previousEventMetaData.getRun());
597 B2INFO(
"--> cur evt = " << m_eventMetaDataPtr->getEvent() <<
" <- prev evt = " << m_previousEventMetaData.getEvent());
598 B2INFO(
"--> runChanged = " << runChanged <<
" runChangedWithoutNotice = " << runChangedWithoutNotice);
604 m_processStatisticsPtr->resumeGlobal();
607 m_previousEventMetaData = *m_eventMetaDataPtr;
609 B2INFO(
"Skipping begin/end run processing");
612 RandomNumbers::useEventDependent();
614 DBStore::Instance().updateEvent();
// For modules other than the executed master: a change of EventMetaData is fatal, since only
// one module may set it.
616 }
else if (!WorkerPath && !OutputPath) {
620 if (!skipMasterModule && m_eventMetaDataPtr &&
621 (*m_eventMetaDataPtr != m_previousEventMetaData)) {
622 B2FATAL(
"Two modules setting EventMetaData were discovered: " << m_master->getName() <<
" and " << module->getName());
// NOTE(review): the reaction to a received signal is not part of this listing.
626 if (g_signalReceived != 0) {
// Conditional paths: the module's condition may redirect processing to its condition path.
631 if (module->evalCondition()) {
632 PathPtr condPath = module->getConditionPath();
634 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
// Begin-run step, in order:
//  - sets the basf2 status to "beginning run";
//  - calls DBStore::update() unless `skipDB` is set;
//  - initializes the begin-run random numbers;
//  - then, for every module in m_moduleList, switches the log system to that module's log
//    configuration and records per-module c_BeginRun statistics.
// NOTE(review): the per-module begin-run call itself is not part of this listing.
// @param skipDB  if true, the database update is skipped
646 void ZMQEventProcessor::processBeginRun(
bool skipDB)
648 MetadataService::Instance().addBasf2Status(
"beginning run");
653 LogSystem& logSystem = LogSystem::Instance();
654 m_processStatisticsPtr->startGlobal();
656 if (!skipDB) DBStore::Instance().update();
659 RandomNumbers::initializeBeginRun();
661 for (
const ModulePtr& modPtr : m_moduleList) {
662 Module* module = modPtr.get();
665 logSystem.
updateModule(&(module->getLogConfig()), module->getName());
668 m_processStatisticsPtr->startModule();
671 m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_BeginRun);
677 m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_BeginRun);
// End-run step, in order:
//  - sets the basf2 status to "ending run";
//  - initializes the end-run random numbers;
//  - for every module in m_moduleList, switches the log system to that module's log
//    configuration and records per-module c_EndRun statistics;
//  - finally assigns newEventMetaData back to the EventMetaData object.
// NOTE(review): the per-module end-run call and the origin of newEventMetaData are not part of
// this listing.
681 void ZMQEventProcessor::processEndRun()
683 MetadataService::Instance().addBasf2Status(
"ending run");
689 LogSystem& logSystem = LogSystem::Instance();
690 m_processStatisticsPtr->startGlobal();
696 RandomNumbers::initializeEndRun();
698 for (
const ModulePtr& modPtr : m_moduleList) {
699 Module* module = modPtr.get();
702 logSystem.
updateModule(&(module->getLogConfig()), module->getName());
705 m_processStatisticsPtr->startModule();
708 m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_EndRun);
713 *m_eventMetaDataPtr = newEventMetaData;
715 m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_EndRun);
Exception thrown when execution is stopped by a signal.
Class for logging debug, info and error messages.
void updateModule(const LogConfig *moduleLogConfig=nullptr, const std::string &moduleName="")
Sets the log configuration to the given module log configuration and sets the module name. This method...
Iterator over a Path (returning Module pointers).
bool isDone() const
Are we finished iterating?
Module * get() const
dereference.
Helper class for data store serialization.
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
void publish(AZMQMessage message) const
Publish the message to the multicast.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
This class provides the core event processing loop for parallel processing with ZMQ.
void cleanup()
Clean up IPC resources (should only be called in one process).
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
@ c_sub
Multicast subscribe socket.
@ c_control
Multicast control socket.
Abstract base class for different kinds of events.