9 #include <framework/pcore/ProcHelper.h>
10 #include <framework/pcore/GlobalProcHandler.h>
11 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
12 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
13 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
14 #include <framework/pcore/PathUtils.h>
16 #include <framework/pcore/ZMQEventProcessor.h>
17 #include <framework/pcore/DataStoreStreamer.h>
18 #include <framework/pcore/RbTuple.h>
20 #include <framework/core/Environment.h>
21 #include <framework/logging/LogSystem.h>
23 #include <framework/database/DBStore.h>
24 #include <framework/database/Database.h>
25 #include <framework/core/RandomNumbers.h>
26 #include <framework/core/MetadataService.h>
27 #include <framework/gearbox/Unit.h>
28 #include <framework/utilities/Utils.h>
// Number of the first signal recorded by storeSignal(); 0 means none has been stored yet.
// It is read by terminateAndCleanup() and processEvent() and handed to m_processMonitor.checkSignals().
48 static int g_signalReceived = 0;
// Signal handler passed to installMainSignalHandlers() in process() and forkAndRun().
// If a ZMQEventProcessor is registered in g_eventProcessorForSignalHandling, its cleanup() is run;
// afterwards the disposition of signalNumber is reset to SIG_DFL.
// NOTE(review): this listing has gaps (no closing braces, skipped line numbers). Going by the function
// name, a raise(signalNumber) presumably follows the signal() call - confirm against the complete file.
53 static void cleanupAndRaiseSignal(
int signalNumber)
55 if (g_eventProcessorForSignalHandling) {
56 g_eventProcessorForSignalHandling->
cleanup();
// Restore the default action for this signal.
59 signal(signalNumber, SIG_DFL);
// Signal handler that only records the signal (installed by the monitoring code via
// installMainSignalHandlers(storeSignal)).
// For SIGINT a "stopping gracefully" notice is written to stderr through EventProcessor::writeToStdErr().
// NOTE(review): closing braces are not part of this listing, so the exact nesting of the two ifs
// cannot be read off here - confirm against the complete file.
63 static void storeSignal(
int signalNumber)
65 if (signalNumber == SIGINT) {
66 EventProcessor::writeToStdErr(
"\nStopping basf2 gracefully...\n");
// Keep only the first signal: a later one does not overwrite an already stored number.
70 if (g_signalReceived == 0) {
71 g_signalReceived = signalNumber;
// File-level copy of the ZMQ socket address. It is filled in the ZMQEventProcessor constructor from
// Environment::Instance().getZMQSocketAddress() and parsed by deleteSocketFiles().
76 std::string g_socketAddress =
"";
// Removes the files that belong to the ZMQ socket addresses derived from g_socketAddress.
// The ZMQEventProcessor constructor registers this function with std::atexit.
// NOTE(review): the bodies of the two guard ifs and the declarations of socketAddressList and buffer
// are missing from this listing; the guards presumably return early - confirm.
78 void deleteSocketFiles()
// Guard: the current process is neither the monitor nor the init process.
80 if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
// Locate the "://" separator in the address.
85 const auto seperatorPos = g_socketAddress.find(
"://");
// Guard: no separator found, or nothing follows it.
87 if (seperatorPos == std::string::npos or seperatorPos + 3 >= g_socketAddress.size()) {
// Everything after "://" is used as the base file name.
91 const std::string filename(g_socketAddress.substr(seperatorPos + 3));
// For every address type in socketAddressList: build the address and remove it if stat() succeeds on it.
94 for (
const auto socketAdressType : socketAddressList) {
95 const std::string socketAddress(ZMQAddressUtils::getSocketAddress(filename, socketAdressType));
96 if (stat(socketAddress.c_str(), &buffer) == 0) {
97 remove(socketAddress.c_str());
// Constructor: registers this instance as the one used by the signal handlers and arranges for the
// socket files to be deleted at program exit.
103 ZMQEventProcessor::ZMQEventProcessor()
// Only one instance may be registered at a time; the assertion requires the global pointer to be unset.
105 B2ASSERT(
"You are having two instances of the ZMQEventProcessor running! This is not possible",
106 not g_eventProcessorForSignalHandling);
107 g_eventProcessorForSignalHandling =
this;
// Remember the socket address in a file-level variable and register deleteSocketFiles() with std::atexit.
110 g_socketAddress = Environment::Instance().getZMQSocketAddress();
111 std::atexit(deleteSocketFiles);
// Destructor: unregisters the instance from the signal handling pointer.
// NOTE(review): original lines 115-116 are not part of this listing; more may happen here.
114 ZMQEventProcessor::~ZMQEventProcessor()
117 g_eventProcessorForSignalHandling =
nullptr;
// Entry point of the ZMQ based multi-process event processing for the given path.
// Visible steps: split the path into input/main/output parts, initialize the modules, fork and run the
// processes (forkAndRun) and finally terminate and clean up.
// Parameters:
//   path     - the path to process
//   maxEvent - forwarded unchanged to EventProcessor::process() and forkAndRun()
// NOTE(review): closing braces and the bodies of some ifs (e.g. for the empty path) are missing from
// this listing.
120 void ZMQEventProcessor::process(
const PathPtr& path,
long maxEvent)
// Guard for an empty path (its body is not shown here).
128 if (path->isEmpty()) {
// A process count of 0 is treated as a fatal error in this function.
132 const int numProcesses = Environment::Instance().getNumberProcesses();
133 if (numProcesses == 0) {
134 B2FATAL(
"ZMQEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
// Split the path into its three parts and look up the histogram manager among them.
138 PathPtr inputPath, mainPath, outputPath;
139 std::tie(inputPath, mainPath, outputPath) = PathUtils::splitPath(path);
140 const ModulePtr& histogramManager = PathUtils::getHistogramManager(inputPath, mainPath, outputPath);
// A module named "HLTZMQ2Ds" in the input path switches on the ZMQ DAQ environment flag.
143 for (
const ModulePtr& module : inputPath->getModules()) {
144 if (module->getName() ==
"HLTZMQ2Ds") {
145 Environment::Instance().setZMQDAQEnvironment(
true);
146 B2INFO(
"ZMQEventProcessor : DAQ environment set");
// Without a main path, warn and delegate to the base class implementation.
151 if (not mainPath or mainPath->isEmpty()) {
152 B2WARNING(
"Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
153 EventProcessor::process(path, maxEvent);
158 const ModulePtrList& moduleList = PathUtils::preparePaths(inputPath, mainPath, outputPath);
161 initialize(moduleList, histogramManager);
166 const ModulePtrList& terminateGlobally = PathUtils::getTerminateGloballyModules(moduleList);
167 forkAndRun(maxEvent, inputPath, mainPath, outputPath, terminateGlobally);
// The monitoring code installs storeSignal as handler; switch back to cleanupAndRaiseSignal here.
169 installMainSignalHandlers(cleanupAndRaiseSignal);
171 terminateAndCleanup(histogramManager);
// NOTE(review): the signature line of this function is missing from the listing. These statements
// presumably form the body of initialize(), which process() calls as
// initialize(moduleList, histogramManager) - confirm against the complete file.
// If a histogram manager is given, it is initialized on its own before processInitialize() is called.
176 if (histogramManager) {
177 histogramManager->initialize();
180 processInitialize(moduleList,
true);
182 B2INFO(
"ZMQEventProcessor : processInitialize done");
// The condition guarding this error message is not shown in the listing.
186 B2ERROR(
"There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
// Refuse to start event processing if any error message has been counted so far.
190 int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
191 if (numLogError != 0) {
192 B2FATAL(numLogError <<
" ERROR(S) occurred! The processing of events will not be started.");
// Clear ROOT's list of files with the "nodelete" option.
203 gROOT->GetListOfFiles()->Clear(
"nodelete");
// Final step of process(): merges histogram files when a histogram manager is present and, if a
// signal was recorded in g_signalReceived, reports it and raises it again with the default handler.
// Parameter:
//   histogramManager - histogram manager module; only tested for being set here
// NOTE(review): else lines and closing braces are missing from this listing. The B2ERROR below
// presumably sits in the else branch of the SIGINT check - confirm.
206 void ZMQEventProcessor::terminateAndCleanup(
const ModulePtr& histogramManager)
210 if (histogramManager) {
211 B2INFO(
"HistoManager:: adding histogram files");
212 RbTupleManager::Instance().hadd();
216 if (g_signalReceived) {
// SIGINT is reported via B2RESULT; the same text is also emitted via B2ERROR below.
217 if (g_signalReceived == SIGINT) {
218 B2RESULT(
"Processing aborted via signal " << g_signalReceived <<
219 ", terminating. Output files have been closed safely and should be readable.");
221 B2ERROR(
"Processing aborted via signal " << g_signalReceived <<
222 ", terminating. Output files have been closed safely and should be readable.");
// Put the default handler back and deliver the stored signal to this process again.
225 installSignalHandler(g_signalReceived, SIG_DFL);
226 raise(g_signalReceived);
// Starts the input process via GlobalProcHandler and processes inputPath in it.
// Parameters:
//   inputPath         - path handed to processPath()
//   terminateGlobally - module list forwarded to processPath()
//   maxEvent          - forwarded to processPath()
// NOTE(review): the bodies of the guards (presumably early returns) are not fully shown in this listing.
230 void ZMQEventProcessor::runInput(
const PathPtr& inputPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
// Guard for a missing or empty input path.
232 if (not inputPath or inputPath->isEmpty()) {
// When startInputProcess() returns false, the event data in the DataStore is invalidated.
236 if (not GlobalProcHandler::startInputProcess()) {
238 DataStore::Instance().invalidateData(DataStore::c_Event);
// Below here: ignore the main signals, reset the process monitor member and process the path.
243 installMainSignalHandlers(SIG_IGN);
245 m_processMonitor.reset();
246 DataStoreStreamer::removeSideEffects();
248 processPath(inputPath, terminateGlobally, maxEvent);
249 B2DEBUG(30,
"Finished an input process");
// Starts the output process via GlobalProcHandler, processes outputPath in it and afterwards
// publishes a statistics message over ZMQ.
// Parameters:
//   outputPath        - path handed to processPath(); its first module becomes m_master
//   terminateGlobally - module list forwarded to processPath()
//   maxEvent          - forwarded to processPath()
// NOTE(review): the declarations of zmqClient and streamer and the bodies of the guards are missing
// from this listing.
253 void ZMQEventProcessor::runOutput(
const PathPtr& outputPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
// Derive the publish and subscribe addresses from the configured socket address.
255 const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
256 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_pub));
257 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_sub));
// Guard for a missing or empty output path.
259 if (not outputPath or outputPath->isEmpty()) {
263 if (not GlobalProcHandler::startOutputProcess()) {
268 installMainSignalHandlers(SIG_IGN);
270 m_processMonitor.reset();
// The first module of the output path is used as master module.
273 m_master = outputPath->getModules().begin()->get();
275 processPath(outputPath, terminateGlobally, maxEvent);
// After processing: stream the data store and publish it as a c_statisticMessage.
283 zmqClient.
initialize(pubSocketAddress, subSocketAddress);
286 const auto& evtMessage = streamer.
stream();
287 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_statisticMessage, evtMessage);
288 zmqClient.
publish(std::move(message));
290 B2DEBUG(30,
"Finished an output process");
// Starts numProcesses worker processes via GlobalProcHandler and processes mainPath in them.
// NOTE(review): the parameter list is cut off in this listing. The body also uses terminateGlobally
// and maxEvent, so those are presumably the remaining parameters - confirm.
// Closing braces and the bodies of the guards are missing as well.
293 void ZMQEventProcessor::runWorker(
unsigned int numProcesses,
const PathPtr& inputPath,
const PathPtr& mainPath,
// Guard: nothing to start.
296 if (numProcesses == 0) {
// When startWorkerProcesses() returns false, wait for a running worker, using the waiting time
// configured in the Environment.
300 if (not GlobalProcHandler::startWorkerProcesses(numProcesses)) {
304 m_processMonitor.waitForRunningWorker(Environment::Instance().getZMQMaximalWaitingTime());
309 installMainSignalHandlers(SIG_IGN);
// With a non-empty input path, the first module of the main path becomes the master module.
311 if (inputPath and not inputPath->isEmpty()) {
313 m_master = mainPath->getModules().begin()->get();
316 m_processMonitor.reset();
317 DataStoreStreamer::removeSideEffects();
319 processPath(mainPath, terminateGlobally, maxEvent);
320 B2DEBUG(30,
"Finished a worker process");
// Runs the event loop (processCore) for localPath and afterwards terminates its modules.
// Parameters:
//   localPath         - the path this process works on
//   terminateGlobally - modules prepended to the local module list (if not yet present) before
//                       processTerminate() is called
//   maxEvent          - passed through getMaximumEventNumber() before use
324 void ZMQEventProcessor::processPath(
const PathPtr& localPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
326 ModulePtrList localModules = localPath->buildModulePathList();
327 maxEvent = getMaximumEventNumber(maxEvent);
// The three flags tell processCore() whether this is the input, a worker or the output process.
330 processCore(localPath, localModules, maxEvent, GlobalProcHandler::isProcess(ProcType::c_Input),
331 GlobalProcHandler::isProcess(ProcType::c_Worker),
332 GlobalProcHandler::isProcess(ProcType::c_Output));
334 B2DEBUG(30,
"terminate process...");
335 PathUtils::prependModulesIfNotPresent(&localModules, terminateGlobally);
336 processTerminate(localModules);
// NOTE(review): the signature line of this function is missing from the listing. These statements
// presumably form the body of runMonitoring(), which forkAndRun() calls as
// runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent) - confirm against the complete file.
// The loop header, closing braces and the bodies of several ifs are missing as well.
// Visible flow: start the monitoring process, initialize the process monitor, wait for input and
// output, start the workers, then repeatedly check multicast, child processes and signals.
343 if (not GlobalProcHandler::startMonitoringProcess()) {
347 const auto& environment = Environment::Instance();
349 B2DEBUG(30,
"Will now start process monitor...");
350 const int numProcesses = environment.getNumberProcesses();
351 m_processMonitor.initialize(numProcesses);
// Wait for the input and then the output process; the argument 60 * 1000 is presumably a timeout
// in milliseconds - TODO confirm.
354 m_processMonitor.waitForRunningInput(60 * 1000);
355 if (m_processMonitor.hasEnded()) {
359 m_processMonitor.waitForRunningOutput(60 * 1000);
360 if (m_processMonitor.hasEnded()) {
// From here on signals are only recorded in g_signalReceived by storeSignal().
364 installMainSignalHandlers(storeSignal);
// Start as many workers as the process monitor reports to be needed.
367 runWorker(m_processMonitor.needMoreWorkers(), inputPath, mainPath, terminateGlobally, maxEvent);
369 const auto& restartFailedWorkers = environment.getZMQRestartFailedWorkers();
370 const auto& failOnFailedWorkers = environment.getZMQFailOnFailedWorkers();
372 B2DEBUG(30,
"Will now start main loop...");
375 m_processMonitor.checkMulticast();
377 m_processMonitor.checkChildProcesses();
// Hand the recorded signal number (0 if none) to the process monitor.
379 m_processMonitor.checkSignals(g_signalReceived);
382 if (m_processMonitor.hasEnded()) {
// Handling of missing workers: restart them, or report the failure, or warn when no worker is left.
387 const unsigned int neededWorkers = m_processMonitor.needMoreWorkers();
388 if (neededWorkers > 0) {
389 B2DEBUG(30,
"restartFailedWorkers = " << restartFailedWorkers);
390 if (restartFailedWorkers) {
// NOTE(review): the restart is announced both as debug and as error message.
391 B2DEBUG(30,
".... Restarting a new worker");
392 B2ERROR(
".... Restarting a new worker process");
393 runWorker(neededWorkers, inputPath, mainPath, terminateGlobally, maxEvent);
394 }
else if (failOnFailedWorkers) {
395 B2ERROR(
"A worker failed. Will try to end the process smoothly now.");
397 }
else if (not m_processMonitor.hasWorkers()) {
398 B2WARNING(
"All workers have died and you did not request to restart them. Going down now.");
404 B2DEBUG(30,
"Finished the monitoring process");
// Initializes the process handling and runs the input, output and monitoring steps in that order.
// NOTE(review): the parameter list is cut off in this listing. The body uses terminateGlobally, and
// process() passes it as fifth argument, so it is presumably the last parameter - confirm.
// The declaration of controlSocketAddress is missing from the listing as well.
407 void ZMQEventProcessor::forkAndRun(
long maxEvent,
const PathPtr& inputPath,
const PathPtr& mainPath,
const PathPtr& outputPath,
410 const int numProcesses = Environment::Instance().getNumberProcesses();
411 GlobalProcHandler::initialize(numProcesses);
// Derive the publish and subscribe addresses from the configured socket address.
413 const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
415 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_pub));
416 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_sub));
// Install the cleanup handler before the process monitor subscribes and the processes are started.
421 installMainSignalHandlers(cleanupAndRaiseSignal);
422 m_processMonitor.subscribe(pubSocketAddress, subSocketAddress, controlSocketAddress);
424 runInput(inputPath, terminateGlobally, maxEvent);
425 runOutput(outputPath, terminateGlobally, maxEvent);
426 runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent);
// Kills the processes known to the process monitor and terminates the monitor.
// Called from the cleanupAndRaiseSignal() signal handler.
// NOTE(review): the closing brace of the guard is missing from this listing; it presumably returns
// after the debug message - confirm.
429 void ZMQEventProcessor::cleanup()
// Guard: the current process is neither the monitor nor the init process.
431 if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
432 B2DEBUG(30,
"Not running cleanup, as I am in process type " << GlobalProcHandler::getProcessName());
// The argument 5000 is presumably a timeout in milliseconds - TODO confirm.
435 m_processMonitor.killProcesses(5000);
436 m_processMonitor.terminate();
// Event loop of one process: calls processEvent() until it reports the end or maxEvent is reached.
// Parameters:
//   startPath       - path to iterate over (its use is on lines missing from this listing)
//   modulePathList  - stored in m_moduleList
//   maxEvent        - the loop ends once currEvent >= maxEvent; values <= 0 disable this check
//   isInputProcess / isWorkerProcess / isOutputProcess - select how processEvent() is called
// NOTE(review): the declarations of moduleIter and currEvent, the else line before the
// "should not come here" message and all closing braces are missing from this listing.
441 void ZMQEventProcessor::processCore(
const PathPtr& startPath,
const ModulePtrList& modulePathList,
long maxEvent,
442 bool isInputProcess,
bool isWorkerProcess,
bool isOutputProcess)
446 DataStore::Instance().setInitializeActive(
false);
447 m_moduleList = modulePathList;
// Mark the previous event meta data as end-of-data; processEvent() checks this flag for its
// begin-run handling.
450 m_previousEventMetaData.setEndOfData();
452 const bool collectStats = !Environment::Instance().getNoStats();
456 bool endProcess =
false;
457 while (!endProcess) {
459 m_processStatisticsPtr->startGlobal();
// Input process: the second argument (skipMasterModule) is true only while currEvent == 0.
465 if (isInputProcess) {
466 endProcess = ZMQEventProcessor::processEvent(moduleIter, isInputProcess && currEvent == 0);
467 }
else if (isWorkerProcess) {
// Worker process: the WorkerPath flag is set only while currEvent == 0 and the DAQ environment is on.
468 endProcess = ZMQEventProcessor::processEvent(moduleIter,
false,
469 isWorkerProcess && currEvent == 0 && Environment::Instance().getZMQDAQEnvironment());
470 }
else if (isOutputProcess) {
// Output process: same condition, passed as the OutputPath flag.
471 endProcess = ZMQEventProcessor::processEvent(moduleIter,
false,
false,
472 isOutputProcess && currEvent == 0 && Environment::Instance().getZMQDAQEnvironment());
474 B2INFO(
"processCore : should not come here. Specified path is invalid");
482 DataStore::Instance().invalidateData(DataStore::c_Event);
485 if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess =
true;
487 m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_Event);
// Last visible statements of the function; the processEndRun() call announced by the message is
// not part of this listing.
496 m_eventMetaDataPtr.create();
497 B2INFO(
"processCore : End Last Run. calling processEndRun()");
// Processes one event by iterating over the modules of moduleIter.
// Parameters:
//   moduleIter       - iterator over the modules to call
//   skipMasterModule - if true, the master module (m_master) is not executed for this event
//   WorkerPath       - flag set by processCore() for the first event of a worker in the DAQ environment
//   OutputPath       - flag set by processCore() for the first event of the output process in the
//                      DAQ environment
// Returns a bool; processCore() assigns it to endProcess, i.e. true ends the event loop.
// NOTE(review): a large part of this function is missing from the listing (return statements, module
// calls, else lines, closing braces). The comments below describe only the visible statements; the
// nesting of the branches cannot be verified here.
502 bool ZMQEventProcessor::processEvent(
PathIterator moduleIter,
bool skipMasterModule,
bool WorkerPath,
bool OutputPath)
// Refresh the basf2 status in the metadata service once the update interval has passed.
504 double time = Utils::getClock() / Unit::s;
505 if (time > m_lastMetadataUpdate + m_metadataUpdateInterval) {
506 MetadataService::Instance().addBasf2Status(
"running event loop");
507 m_lastMetadataUpdate = time;
510 const bool collectStats = !Environment::Instance().getNoStats();
512 while (!moduleIter.
isDone()) {
// Distinguish the master module from all other modules.
517 if (module != m_master) {
521 }
else if (!skipMasterModule) {
526 B2INFO(
"Skipping execution of module " << module->getName());
528 if (!m_eventMetaDataPtr) {
534 if (m_eventMetaDataPtr->isEndOfData()) {
536 B2INFO(
"isEndOfData. Return");
// Handling that applies only right after the master module was actually executed.
541 if (module == m_master && !skipMasterModule) {
544 RandomNumbers::initializeEvent();
548 B2INFO(
"Worker Path and First Event!");
549 if (Environment::Instance().isZMQDAQFirstEvent(m_eventMetaDataPtr->getExperiment(), m_eventMetaDataPtr->getRun())) {
552 B2INFO(
"Worker path processing for ZMQDAQ first event.....Skip to the end of path");
553 B2INFO(
" --> exp = " << m_eventMetaDataPtr->getExperiment() <<
" run = " << m_eventMetaDataPtr->getRun());
// Advance through the modules and stop at the one named "ZMQTxWorker".
555 module = moduleIter.
get();
557 if (module->getName() ==
"ZMQTxWorker")
break;
// Begin/end run handling, guarded by both first-event DAQ flags being false.
566 if (!WorkerPath && !OutputPath) {
567 if (m_eventMetaDataPtr->isEndOfRun()) {
568 B2INFO(
"===> EndOfRun : calling processEndRun(); isEndOfRun = " << m_eventMetaDataPtr->isEndOfRun());
571 m_previousEventMetaData = *m_eventMetaDataPtr;
574 }
else if (m_previousEventMetaData.isEndOfData() || m_previousEventMetaData.isEndOfRun()) {
// The previous event was flagged end-of-data or end-of-run; an event with the same run number gets
// the "Skip this event" message.
575 if (m_eventMetaDataPtr->getRun() == m_previousEventMetaData.getRun()) {
576 B2INFO(
"===> EndOfData : ----> Run change request to the same run!!! Skip this event.");
579 B2INFO(
"===> EndOfData : calling processBeginRun(); isEndOfData = " << m_previousEventMetaData.isEndOfData() <<
580 " isEndOfRun = " << m_previousEventMetaData.isEndOfRun());
581 B2INFO(
"--> cur run = " << m_eventMetaDataPtr->getRun() <<
" <- prev run = " << m_previousEventMetaData.getRun());
582 B2INFO(
"--> cur evt = " << m_eventMetaDataPtr->getEvent() <<
" <- prev evt = " << m_previousEventMetaData.getEvent());
584 m_previousEventMetaData = *m_eventMetaDataPtr;
// A run change "without notice": experiment or run number differs although the previous event was
// flagged neither end-of-data nor end-of-run.
589 const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
590 (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
591 const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
592 and not m_previousEventMetaData.isEndOfRun();
594 if (runChangedWithoutNotice) {
// The global statistics are suspended here and resumed further down.
596 m_processStatisticsPtr->suspendGlobal();
598 B2INFO(
"===> Run Change (possibly offline) : calling processEndRun() and processBeginRun()");
599 B2INFO(
"--> cur run = " << m_eventMetaDataPtr->getRun() <<
" <- prev run = " << m_previousEventMetaData.getRun());
600 B2INFO(
"--> cur evt = " << m_eventMetaDataPtr->getEvent() <<
" <- prev evt = " << m_previousEventMetaData.getEvent());
601 B2INFO(
"--> runChanged = " << runChanged <<
" runChangedWithoutNotice = " << runChangedWithoutNotice);
607 m_processStatisticsPtr->resumeGlobal();
610 m_previousEventMetaData = *m_eventMetaDataPtr;
612 B2INFO(
"Skipping begin/end run processing");
615 RandomNumbers::useEventDependent();
617 DBStore::Instance().updateEvent();
619 }
else if (!WorkerPath && !OutputPath) {
// Another module changed the EventMetaData although the master module was executed: fatal error.
623 if (!skipMasterModule && m_eventMetaDataPtr &&
624 (*m_eventMetaDataPtr != m_previousEventMetaData)) {
625 B2FATAL(
"Two modules setting EventMetaData were discovered: " << m_master->getName() <<
" and " << module->getName());
// Check for a signal recorded by storeSignal(); the reaction is not part of this listing.
629 if (g_signalReceived != 0) {
// Conditional paths: if the module's condition evaluates to true, its condition path is looked up.
634 if (module->evalCondition()) {
635 PathPtr condPath = module->getConditionPath();
637 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
// Begin-of-run handling for all modules in m_moduleList.
// Parameter:
//   skipDB - if true, DBStore::Instance().update() is not called
// NOTE(review): the per-module call between startModule() and stopModule() and the closing braces
// are missing from this listing.
649 void ZMQEventProcessor::processBeginRun(
bool skipDB)
651 MetadataService::Instance().addBasf2Status(
"beginning run");
656 LogSystem& logSystem = LogSystem::Instance();
657 m_processStatisticsPtr->startGlobal();
659 if (!skipDB) DBStore::Instance().update();
662 RandomNumbers::initializeBeginRun();
664 for (
const ModulePtr& modPtr : m_moduleList) {
665 Module* module = modPtr.get();
// Switch the log system to this module's log configuration and name.
668 logSystem.
updateModule(&(module->getLogConfig()), module->getName());
// Statistics for the module are recorded under c_BeginRun.
671 m_processStatisticsPtr->startModule();
674 m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_BeginRun);
680 m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_BeginRun);
// End-of-run handling for all modules in m_moduleList.
// NOTE(review): the declaration of newEventMetaData, the per-module call between startModule() and
// stopModule() and the closing braces are missing from this listing.
684 void ZMQEventProcessor::processEndRun()
686 MetadataService::Instance().addBasf2Status(
"ending run");
692 LogSystem& logSystem = LogSystem::Instance();
693 m_processStatisticsPtr->startGlobal();
699 RandomNumbers::initializeEndRun();
701 for (
const ModulePtr& modPtr : m_moduleList) {
702 Module* module = modPtr.get();
// Switch the log system to this module's log configuration and name.
705 logSystem.
updateModule(&(module->getLogConfig()), module->getName());
// Statistics for the module are recorded under c_EndRun.
708 m_processStatisticsPtr->startModule();
711 m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_EndRun);
// After the module loop, the event meta data object is overwritten with newEventMetaData.
716 *m_eventMetaDataPtr = newEventMetaData;
718 m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_EndRun);
Exception thrown when execution is stopped by a signal.
Class for logging debug, info and error messages.
void updateModule(const LogConfig *moduleLogConfig=nullptr, const std::string &moduleName="")
Sets the log configuration to the given module log configuration and sets the module name. This method...
Iterator over a Path (returning Module pointers).
bool isDone() const
Are we finished iterating?
Module * get() const
dereference.
Helper class for data store serialization.
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
void publish(AZMQMessage message) const
Publish the message to the multicast.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
This class provides the core event processing loop for parallel processing with ZMQ.
void cleanup()
clean up IPC resources (should only be called in one process).
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
@ c_sub
Multicast publish socket.
@ c_control
Multicast subscribe socket.
Abstract base class for different kinds of events.