Belle II Software release-09-00-00
ZMQEventProcessor.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/ProcHelper.h>
10#include <framework/pcore/GlobalProcHandler.h>
11#include <framework/pcore/zmq/messages/ZMQDefinitions.h>
12#include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
13#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
14#include <framework/pcore/PathUtils.h>
15
16#include <framework/pcore/ZMQEventProcessor.h>
17#include <framework/pcore/DataStoreStreamer.h>
18#include <framework/pcore/RbTuple.h>
19
20#include <framework/core/Environment.h>
21#include <framework/logging/LogSystem.h>
22
23#include <framework/database/DBStore.h>
24#include <framework/database/Database.h>
25#include <framework/core/RandomNumbers.h>
26#include <framework/core/MetadataService.h>
27#include <framework/gearbox/Unit.h>
28#include <framework/utilities/Utils.h>
29
30#include <TROOT.h>
31
32#include <sys/stat.h>
33
34#include <csignal>
35#include <fstream>
36
37using namespace std;
38using namespace Belle2;
39
40namespace {
48 static int g_signalReceived = 0;
49
51 static ZMQEventProcessor* g_eventProcessorForSignalHandling = nullptr;
52
53 static void cleanupAndRaiseSignal(int signalNumber)
54 {
55 if (g_eventProcessorForSignalHandling) {
56 g_eventProcessorForSignalHandling->cleanup();
57 }
58 // uninstall current handler and call default one.
59 signal(signalNumber, SIG_DFL);
60 raise(signalNumber);
61 }
62
63 static void storeSignal(int signalNumber)
64 {
65 if (signalNumber == SIGINT) {
66 EventProcessor::writeToStdErr("\nStopping basf2 gracefully...\n");
67 }
68
69 // We do not want to remove the first signal
70 if (g_signalReceived == 0) {
71 g_signalReceived = signalNumber;
72 }
73 }
74
76 std::string g_socketAddress = "";
77
78 void deleteSocketFiles()
79 {
80 if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
81 return;
82 }
83
84 const std::vector<ZMQAddressType> socketAddressList = {ZMQAddressType::c_input, ZMQAddressType::c_output, ZMQAddressType::c_pub, ZMQAddressType::c_sub, ZMQAddressType::c_control};
85 const auto seperatorPos = g_socketAddress.find("://");
86
87 if (seperatorPos == std::string::npos or seperatorPos + 3 >= g_socketAddress.size()) {
88 return;
89 }
90
91 const std::string filename(g_socketAddress.substr(seperatorPos + 3));
92
93 struct stat buffer;
94 for (const auto socketAdressType : socketAddressList) {
95 const std::string socketAddress(ZMQAddressUtils::getSocketAddress(filename, socketAdressType));
96 if (stat(socketAddress.c_str(), &buffer) == 0) {
97 remove(socketAddress.c_str());
98 }
99 }
100 }
101} // namespace
102
104{
105 B2ASSERT("You are having two instances of the ZMQEventProcessor running! This is not possible",
106 not g_eventProcessorForSignalHandling);
107 g_eventProcessorForSignalHandling = this;
108
109 // Make sure to remove the sockets
110 g_socketAddress = Environment::Instance().getZMQSocketAddress();
111 std::atexit(deleteSocketFiles);
112}
113
115{
116 cleanup();
117 g_eventProcessorForSignalHandling = nullptr;
118}
119
120void ZMQEventProcessor::process(const PathPtr& path, long maxEvent)
121{
122 // Concerning signal handling:
123 // * During the initialization, we just raise the signal without doing any cleanup etc.
124 // * During the event execution, we will not allow for any signal in all processes except the parent process.
125 // Here, we catch sigint and clean up the processes AND WHAT DO WE DO IN THE OTHER CASES?
126 // * During cleanup, we will just ignore sigint, but the rest will be raised
127
128 if (path->isEmpty()) {
129 return;
130 }
131
132 const int numProcesses = Environment::Instance().getNumberProcesses();
133 if (numProcesses == 0) {
134 B2FATAL("ZMQEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
135 }
136
137 // Split the path into input, main and output. A nullptr means, the path should not be used
138 PathPtr inputPath, mainPath, outputPath;
139 std::tie(inputPath, mainPath, outputPath) = PathUtils::splitPath(path);
140 const ModulePtr& histogramManager = PathUtils::getHistogramManager(inputPath);
141
142 // Check for existence of HLTZMQ2Ds module in input path to set DAQ environment
143 for (const ModulePtr& module : inputPath->getModules()) {
144 if (module->getName() == "HLTZMQ2Ds") {
146 B2INFO("ZMQEventProcessor : DAQ environment set");
147 break;
148 }
149 }
150
151 if (not mainPath or mainPath->isEmpty()) {
152 B2WARNING("Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
153 EventProcessor::process(path, maxEvent);
154 return;
155 }
156
157 // inserts Rx/Tx modules into path (sets up IPC structures)
158 const ModulePtrList& moduleList = PathUtils::preparePaths(inputPath, mainPath, outputPath);
159
160 // Run the initialization of the modules and the histogram manager
161 initialize(moduleList, histogramManager);
162
163 // pause();
164
165 // The main part: fork into the different processes and run!
166 const ModulePtrList& terminateGlobally = PathUtils::getTerminateGloballyModules(moduleList);
167 forkAndRun(maxEvent, inputPath, mainPath, outputPath, terminateGlobally);
168
169 installMainSignalHandlers(cleanupAndRaiseSignal);
170 // Run the final termination and cleanup with error check
171 terminateAndCleanup(histogramManager);
172}
173
// Initialize the histogram manager (if one is given) and then all modules, and abort
// with B2FATAL before any event is processed if numLogError is non-zero.
//
// moduleList       : modules handed to processInitialize()
// histogramManager : may be empty; if set, its initialize() is called before the modules
//
// NOTE(review): this listing skips some original lines (the embedded numbering has gaps),
// so statements may be missing here — compare against the repository version.
174void ZMQEventProcessor::initialize(const ModulePtrList& moduleList, const ModulePtr& histogramManager)
175{
176 if (histogramManager) {
177 histogramManager->initialize();
178 }
179 // from now on the datastore is available
// The second argument is processInitialize()'s setEventInfo flag.
180 processInitialize(moduleList, true);
181
182 B2INFO("ZMQEventProcessor : processInitialize done");
183
184 // Don't start processing in case of no master module
// Only an error is logged at this point; the check on numLogError below turns it into a fatal stop.
185 if (!m_master) {
186 B2ERROR("There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
187 }
188
189 // Check if errors appeared. If yes, don't start the event processing.
// NOTE(review): the declaration of numLogError is not visible in this listing; presumably it is
// the count of error-level log messages (LogSystem::getMessageCounter) — confirm.
191 if (numLogError != 0) {
192 B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
193 }
194
195 // TODO: I do not really understand what is going on here...
201 // disable ROOT's management of TFiles
202 // clear list, but don't actually delete the objects
203 gROOT->GetListOfFiles()->Clear("nodelete");
204}
205
207{
208 cleanup();
209
210 if (histogramManager) {
211 B2INFO("HistoManager:: adding histogram files");
213 }
214
215 // did anything bad happen?
216 if (g_signalReceived) {
217 if (g_signalReceived == SIGINT) {
218 B2RESULT("Processing aborted via signal " << g_signalReceived <<
219 ", terminating. Output files have been closed safely and should be readable.");
220 } else {
221 B2ERROR("Processing aborted via signal " << g_signalReceived <<
222 ", terminating. Output files have been closed safely and should be readable.");
223 }
224 // re-raise the signal
225 installSignalHandler(g_signalReceived, SIG_DFL);
226 raise(g_signalReceived);
227 }
228}
229
230void ZMQEventProcessor::runInput(const PathPtr& inputPath, const ModulePtrList& terminateGlobally, long maxEvent)
231{
232 if (not inputPath or inputPath->isEmpty()) {
233 return;
234 }
235
237 // This is not the input process, clean up datastore to not contain the first event
239 return;
240 }
241
242 // The default will be to not do anything on signals...
244
247
248 processPath(inputPath, terminateGlobally, maxEvent);
249 B2DEBUG(30, "Finished an input process");
250 exit(0);
251}
252
253void ZMQEventProcessor::runOutput(const PathPtr& outputPath, const ModulePtrList& terminateGlobally, long maxEvent)
254{
255 const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
256 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
257 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
258
259 if (not outputPath or outputPath->isEmpty()) {
260 return;
261 }
262
264 return;
265 }
266
267 // The default will be to not do anything on signals...
269
271
272 // Set the rx module as main module
273 m_master = outputPath->getModules().begin()->get();
274
275 processPath(outputPath, terminateGlobally, maxEvent);
276
277 // Send the statistics to the process monitor
278 StreamHelper streamer;
279 ZMQClient zmqClient;
280
281 // TODO: true?
282 streamer.initialize(0, true);
283 zmqClient.initialize(pubSocketAddress, subSocketAddress);
284
285 // TODO: make sure to only send statistics!
286 const auto& evtMessage = streamer.stream();
287 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_statisticMessage, evtMessage);
288 zmqClient.publish(std::move(message));
289
290 B2DEBUG(30, "Finished an output process");
291 exit(0);
292}
// Start the requested number of worker processes and, inside a worker, run the main path.
//
// numProcesses      : number of workers to start; 0 makes this a no-op
// inputPath         : only inspected to decide whether m_master has to be replaced
// mainPath          : the path executed by the worker
// terminateGlobally : forwarded to processPath()
// maxEvent          : forwarded to processPath()
//
// A process for which startWorkerProcesses() returns false waits for a running worker and
// returns to the caller; every other process runs the path and ends with exit(0), i.e. it
// never returns from this function.
// NOTE(review): presumably "false" identifies the forking (monitoring) process — confirm
// against GlobalProcHandler. This listing also skips some original lines (the embedded
// numbering has gaps), so statements may be missing here.
293void ZMQEventProcessor::runWorker(unsigned int numProcesses, const PathPtr& inputPath, const PathPtr& mainPath,
294 const ModulePtrList& terminateGlobally, long maxEvent)
295{
296 if (numProcesses == 0) {
297 return;
298 }
299
300 if (not GlobalProcHandler::startWorkerProcesses(numProcesses)) {
301 // Make sure the worker process is running until we go on
302 // m_processMonitor.waitForRunningWorker(60);
303 // m_processMonitor.waitForRunningWorker(7200);
// The timeout is taken from the environment instead of the hard-coded values tried above.
304 m_processMonitor.waitForRunningWorker(Environment::Instance().getZMQMaximalWaitingTime());
305 return;
306 }
307
308 // The default will be to not do anything on signals...
310
// With a non-empty input path the first module of the main path takes over the master role.
311 if (inputPath and not inputPath->isEmpty()) {
312 // set Rx as master
313 m_master = mainPath->getModules().begin()->get();
314 }
315
318
319 processPath(mainPath, terminateGlobally, maxEvent);
320 B2DEBUG(30, "Finished a worker process");
// Leave the process here so that it never continues with the caller's code.
321 exit(0);
322}
323
324void ZMQEventProcessor::processPath(const PathPtr& localPath, const ModulePtrList& terminateGlobally, long maxEvent)
325{
326 ModulePtrList localModules = localPath->buildModulePathList();
327 maxEvent = getMaximumEventNumber(maxEvent);
328 // we are not using the default signal handler, so the processCore can not throw any exception because if sigint...
329 // processCore(localPath, localModules, maxEvent, GlobalProcHandler::isProcess(ProcType::c_Input));
330 processCore(localPath, localModules, maxEvent, GlobalProcHandler::isProcess(ProcType::c_Input),
333
334 B2DEBUG(30, "terminate process...");
335 PathUtils::prependModulesIfNotPresent(&localModules, terminateGlobally);
336 processTerminate(localModules);
337}
338
339
340void ZMQEventProcessor::runMonitoring(const PathPtr& inputPath, const PathPtr& mainPath, const ModulePtrList& terminateGlobally,
341 long maxEvent)
342{
344 return;
345 }
346
347 const auto& environment = Environment::Instance();
348
349 B2DEBUG(30, "Will now start process monitor...");
350 const int numProcesses = environment.getNumberProcesses();
351 m_processMonitor.initialize(numProcesses);
352
353 // Make sure the input process is running until we go on
356 return;
357 }
358 // Make sure the output process is running until we go on
361 return;
362 }
363
364 installMainSignalHandlers(storeSignal);
365
366 // at least start the number of workers requested
367 runWorker(m_processMonitor.needMoreWorkers(), inputPath, mainPath, terminateGlobally, maxEvent);
368
369 const auto& restartFailedWorkers = environment.getZMQRestartFailedWorkers();
370 const auto& failOnFailedWorkers = environment.getZMQFailOnFailedWorkers();
371
372 B2DEBUG(30, "Will now start main loop...");
373 while (true) {
374 // check multicast for messages and kill workers if requested
376 // check the child processes, if one has died
378 // check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
379 m_processMonitor.checkSignals(g_signalReceived);
380
381 // If we have received a SIGINT signal or the last process is gone, we can end smoothly
383 break;
384 }
385
386 // Test if we need more workers
387 const unsigned int neededWorkers = m_processMonitor.needMoreWorkers();
388 if (neededWorkers > 0) {
389 B2DEBUG(30, "restartFailedWorkers = " << restartFailedWorkers);
390 if (restartFailedWorkers) {
391 B2DEBUG(30, ".... Restarting a new worker");
392 B2ERROR(".... Restarting a new worker process");
393 runWorker(neededWorkers, inputPath, mainPath, terminateGlobally, maxEvent);
394 } else if (failOnFailedWorkers) {
395 B2ERROR("A worker failed. Will try to end the process smoothly now.");
396 break;
397 } else if (not m_processMonitor.hasWorkers()) {
398 B2WARNING("All workers have died and you did not request to restart them. Going down now.");
399 break;
400 }
401 }
402 }
403
404 B2DEBUG(30, "Finished the monitoring process");
405}
406
407void ZMQEventProcessor::forkAndRun(long maxEvent, const PathPtr& inputPath, const PathPtr& mainPath, const PathPtr& outputPath,
408 const ModulePtrList& terminateGlobally)
409{
410 const int numProcesses = Environment::Instance().getNumberProcesses();
411 GlobalProcHandler::initialize(numProcesses);
412
413 const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
414
415 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
416 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
417 const auto controlSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_control));
418
419 // We catch all signals and store them into a variable. This is used during the main loop then.
420 // From now on, we have to make sure to clean up behind us
421 installMainSignalHandlers(cleanupAndRaiseSignal);
422 m_processMonitor.subscribe(pubSocketAddress, subSocketAddress, controlSocketAddress);
423
424 runInput(inputPath, terminateGlobally, maxEvent);
425 runOutput(outputPath, terminateGlobally, maxEvent);
426 runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent);
427}
428
430{
432 B2DEBUG(30, "Not running cleanup, as I am in process type " << GlobalProcHandler::getProcessName());
433 return;
434 }
437
438 deleteSocketFiles();
439}
440
441void ZMQEventProcessor::processCore(const PathPtr& startPath, const ModulePtrList& modulePathList, long maxEvent,
442 bool isInputProcess, bool isWorkerProcess, bool isOutputProcess)
443{
444 // bool firstRound = true;
445
447 m_moduleList = modulePathList;
448
449 //Remember the previous event meta data, and identify end of data meta data
450 m_previousEventMetaData.setEndOfData(); //invalid start state
451
452 const bool collectStats = !Environment::Instance().getNoStats();
453
454 //Loop over the events
455 long currEvent = 0;
456 bool endProcess = false;
457 while (!endProcess) {
458 if (collectStats)
459 m_processStatisticsPtr->startGlobal();
460
461 // B2INFO ( "processCore:: currEvent = " << currEvent );
462
463 PathIterator moduleIter(startPath);
464
465 if (isInputProcess) {
466 endProcess = ZMQEventProcessor::processEvent(moduleIter, isInputProcess && currEvent == 0);
467 } else if (isWorkerProcess) {
468 endProcess = ZMQEventProcessor::processEvent(moduleIter, false,
469 isWorkerProcess && currEvent == 0 && Environment::Instance().getZMQDAQEnvironment());
470 } else if (isOutputProcess) {
471 endProcess = ZMQEventProcessor::processEvent(moduleIter, false, false,
472 isOutputProcess && currEvent == 0 && Environment::Instance().getZMQDAQEnvironment());
473 } else {
474 B2INFO("processCore : should not come here. Specified path is invalid");
475 return;
476 }
477
478 // Original code
479 // endProcess = ZMQEventProcessor::processEvent(moduleIter, isInputProcess && currEvent == 0);
480
481 //Delete event related data in DataStore
483
484 currEvent++;
485 if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess = true;
486 if (collectStats)
488
489 // firstRound = false;
490
491 // B2INFO ( "processCore :: event processed" );
492
493 } //end event loop
494
495 //End last run
496 m_eventMetaDataPtr.create();
497 B2INFO("processCore : End Last Run. calling processEndRun()");
499}
500
501
502bool ZMQEventProcessor::processEvent(PathIterator moduleIter, bool skipMasterModule, bool WorkerPath, bool OutputPath)
503{
504 double time = Utils::getClock() / Unit::s;
506 MetadataService::Instance().addBasf2Status("running event loop");
508 }
509
510 const bool collectStats = !Environment::Instance().getNoStats();
511
512 while (!moduleIter.isDone()) {
513 Module* module = moduleIter.get();
514 // B2INFO ("Starting event of " << module->getName() );
515
516 // run the module ... unless we don't want to
517 if (module != m_master) {
518 callEvent(module);
519 // B2INFO ( "not master. callEvent" );
520 // B2INFO ( "ZMQEventProcessor :: " <<module->getName() << " called. Not master" );
521 } else if (!skipMasterModule) {
522 callEvent(module);
523 // B2INFO ( "master but not skipModule. callEvent");
524 // B2INFO ( "ZMQEventProcessor :: " <<module->getName() << " called. Not skipMasterModule" );
525 } else
526 B2INFO("Skipping execution of module " << module->getName());
527
528 if (!m_eventMetaDataPtr) {
529 // B2INFO ( "No event metadata....." );
530 return false;
531 }
532
533 //Check for end of data
534 if (m_eventMetaDataPtr->isEndOfData()) {
535 // Immediately leave the loop and terminate (true)
536 B2INFO("isEndOfData. Return");
537 return true;
538 }
539
540 //Handle EventMetaData changes by master module
541 if (module == m_master && !skipMasterModule) {
542
543 //initialize random number state for the event
545
546 // Worker Path
547 if (WorkerPath) {
548 B2INFO("Worker Path and First Event!");
549 if (Environment::Instance().isZMQDAQFirstEvent(m_eventMetaDataPtr->getExperiment(), m_eventMetaDataPtr->getRun())) {
550 // if ( m_eventMetaDataPtr->getExperiment() == Environment::Instance().getZMQDAQFirstEventExp() &&
551 // m_eventMetaDataPtr->getRun() == Environment::Instance().getZMQDAQFirstEventRun() ) {
552 B2INFO("Worker path processing for ZMQDAQ first event.....Skip to the end of path");
553 B2INFO(" --> exp = " << m_eventMetaDataPtr->getExperiment() << " run = " << m_eventMetaDataPtr->getRun());
554 while (true) {
555 module = moduleIter.get();
556 // B2INFO ( "Module in the path = " << module->getName() );
557 if (module->getName() == "ZMQTxWorker") break;
558 moduleIter.next();
559 }
560 // B2INFO ( "ZMQTxWorker will be called" );
561 continue;
562 }
563 }
564
565 // Check for EndOfRun
566 if (!WorkerPath && !OutputPath) {
567 if (m_eventMetaDataPtr->isEndOfRun()) {
568 B2INFO("===> EndOfRun : calling processEndRun(); isEndOfRun = " << m_eventMetaDataPtr->isEndOfRun());
570 // Store the current event meta data for the next round
572 // Leave this event, but not the full processing (false)
573 return false;
576 B2INFO("===> EndOfData : ----> Run change request to the same run!!! Skip this event.");
577 return false;
578 }
579 B2INFO("===> EndOfData : calling processBeginRun(); isEndOfData = " << m_previousEventMetaData.isEndOfData() <<
580 " isEndOfRun = " << m_previousEventMetaData.isEndOfRun());
581 B2INFO("--> cur run = " << m_eventMetaDataPtr->getRun() << " <- prev run = " << m_previousEventMetaData.getRun());
582 B2INFO("--> cur evt = " << m_eventMetaDataPtr->getEvent() << " <- prev evt = " << m_previousEventMetaData.getEvent());
583 // The run number should not be 0
584 if (m_eventMetaDataPtr->getRun() != 0) {
587 } else {
588 return false;
589 }
590 }
591
592 const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
594 const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
596 // if (runChangedWithoutNotice && !g_first_round) {
597 if (runChangedWithoutNotice) {
598 if (collectStats)
599 m_processStatisticsPtr->suspendGlobal();
600
601 B2INFO("===> Run Change (possibly offline) : calling processEndRun() and processBeginRun()");
602 B2INFO("--> cur run = " << m_eventMetaDataPtr->getRun() << " <- prev run = " << m_previousEventMetaData.getRun());
603 B2INFO("--> cur evt = " << m_eventMetaDataPtr->getEvent() << " <- prev evt = " << m_previousEventMetaData.getEvent());
604 B2INFO("--> runChanged = " << runChanged << " runChangedWithoutNotice = " << runChangedWithoutNotice);
605
608
609 if (collectStats)
610 m_processStatisticsPtr->resumeGlobal();
611 }
613 } else
614 B2INFO("Skipping begin/end run processing");
615
616 //make sure we use the event dependent generator again
618
620
621 } else if (!WorkerPath && !OutputPath) {
622 //Check for a second master module. Cannot do this if we skipped the
623 //master module as the EventMetaData is probably set before we call this
624 //function
625 if (!skipMasterModule && m_eventMetaDataPtr &&
627 B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and " << module->getName());
628 }
629 }
630
631 if (g_signalReceived != 0) {
632 throw StoppedBySignalException(g_signalReceived);
633 }
634
635 //Check for the module conditions, evaluate them and if one is true switch to the new path
636 if (module->evalCondition()) {
637 PathPtr condPath = module->getConditionPath();
638 //continue with parent Path after condition path is executed?
639 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
640 moduleIter = PathIterator(condPath, moduleIter);
641 } else {
642 moduleIter = PathIterator(condPath);
643 }
644 } else {
645 moduleIter.next();
646 }
647 } //end module loop
648 return false;
649}
650
652{
653 MetadataService::Instance().addBasf2Status("beginning run");
654
655 m_inRun = true;
656 // auto dbsession = Database::Instance().createScopedUpdateSession();
657
658 LogSystem& logSystem = LogSystem::Instance();
659 m_processStatisticsPtr->startGlobal();
660
661 if (!skipDB) DBStore::Instance().update();
662
663 //initialize random generator for end run
665
666 for (const ModulePtr& modPtr : m_moduleList) {
667 Module* module = modPtr.get();
668
669 //Set the module dependent log level
670 logSystem.updateModule(&(module->getLogConfig()), module->getName());
671
672 //Do beginRun() call
673 m_processStatisticsPtr->startModule();
674 // CALL_MODULE(module, beginRun);
675 module->beginRun();
677
678 //Set the global log level
679 logSystem.updateModule(nullptr);
680 }
681
683}
684
685
687{
689
690 if (!m_inRun)
691 return;
692 m_inRun = false;
693
694 LogSystem& logSystem = LogSystem::Instance();
695 m_processStatisticsPtr->startGlobal();
696
697 const EventMetaData newEventMetaData = *m_eventMetaDataPtr;
698 // *m_eventMetaDataPtr = m_previousEventMetaData;
699
700 //initialize random generator for end run
702
703 for (const ModulePtr& modPtr : m_moduleList) {
704 Module* module = modPtr.get();
705
706 //Set the module dependent log level
707 logSystem.updateModule(&(module->getLogConfig()), module->getName());
708
709 //Do endRun() call
710 m_processStatisticsPtr->startModule();
711 // CALL_MODULE(module, endRun);
712 module->endRun();
714
715 //Set the global log level
716 logSystem.updateModule(nullptr);
717 }
718 *m_eventMetaDataPtr = newEventMetaData;
719
721}
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:54
void setInitializeActive(bool active)
Setter for m_initializeActive.
Definition: DataStore.cc:94
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
Definition: DataStore.cc:715
const std::string & getZMQSocketAddress() const
Socket address to use in ZMQ.
Definition: Environment.h:260
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:157
bool getNoStats() const
Disable collection of statistics during event processing.
Definition: Environment.h:199
void setZMQDAQEnvironment(bool zmqDAQ)
Set DAQ environment.
Definition: Environment.h:351
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
void setEndOfData()
Marks the end of the data processing.
int getRun() const
Run Getter.
unsigned int getEvent() const
Event Getter.
bool isEndOfRun() const
is end-of-run set? (see setEndOfRun()).
int getExperiment() const
Experiment Getter.
bool isEndOfData() const
is end-of-data set? (see setEndOfData()).
Exception thrown when execution is stopped by a signal.
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
bool m_inRun
Are we currently in a run? If yes, processEndRun() needs to do something.
double m_lastMetadataUpdate
Time in seconds of last call for metadata update in event loop.
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
void callEvent(Module *module)
Calls event() on one single module, setting up logging and statistics as needed.
void processTerminate(const ModulePtrList &modulePathList)
Terminates the modules.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
const Module * m_master
The master module that determines the experiment/run/event number.
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
EventMetaData is used by processEvent()/processCore().
ModulePtrList m_moduleList
List of all modules in order initialized.
long getMaximumEventNumber(long maxEvent) const
Calculate the maximum event number out of the argument from command line and the environment.
double m_metadataUpdateInterval
Minimal time difference in seconds for metadata updates in event loop.
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
static bool startOutputProcess(bool local=false)
Fork and initialize an output process.
static bool startInputProcess()
Fork and initialize an input process.
static void initialize(unsigned int nWorkerProc)
Create a new process handler, which will handle nWorkerProc processes.
static bool startMonitoringProcess()
Fork and initialize a monitoring process.
static bool startWorkerProcesses(unsigned int numProcesses)
Fork and initialize as many worker processes as requested.
static bool isProcess(ProcType procType)
Return true if the process is of type procType.
static std::string getProcessName()
Get a human readable name for this process. (input, event, output...).
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:30
Class for logging debug, info and error messages.
Definition: LogSystem.h:46
void updateModule(const LogConfig *moduleLogConfig=nullptr, const std::string &moduleName="")
Sets the log configuration to the given module log configuration and sets the module name This method...
Definition: LogSystem.h:191
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:161
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:31
void addBasf2Status(const std::string &message="")
Add metadata of basf2 status.
static MetadataService & Instance()
Static method to get a reference to the MetadataService instance.
@ c_EndRun
Counting time/calls in endRun()
@ c_BeginRun
Counting time/calls in beginRun()
@ c_Event
Counting time/calls in event()
Base class for Modules.
Definition: Module.h:72
const std::string & getName() const
Returns the name of the module.
Definition: Module.h:187
std::list< ModulePtr > getModules() const override
no submodules, return empty list
Definition: Module.h:506
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:26
void next()
increment.
Definition: PathIterator.h:49
bool isDone() const
Are we finished iterating?
Definition: PathIterator.h:72
Module * get() const
dereference.
Definition: PathIterator.h:75
static ModulePtrList getTerminateGloballyModules(const ModulePtrList &modules)
Return only modules which have the TerminateGlobally Module flag set.
Definition: PathUtils.cc:197
static ModulePtrList preparePaths(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Adds internal zmq modules to the paths.
Definition: PathUtils.cc:113
static std::tuple< PathPtr, PathPtr, PathPtr > splitPath(const PathPtr &path)
Split the given path into the input, main and output path (in this order) by looking onto the paralle...
Definition: PathUtils.cc:16
static ModulePtr getHistogramManager(PathPtr &inputPath)
Find the histogram manager in the paths and return it.
Definition: PathUtils.cc:98
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
Definition: PathUtils.cc:207
void checkMulticast(int timeout=0)
check multicast for messages and kill workers if requested
void waitForRunningWorker(int timeout)
Block until either the worker process is running or the timeout (in seconds) is raised.
bool hasEnded() const
If we have received a SIGINT signal or the last process is gone, we can end smoothly.
void checkSignals(int g_signalReceived)
check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
void waitForRunningInput(int timeout)
Block until either the input process is running or the timeout (in seconds) is raised.
void checkChildProcesses()
check the child processes, if one has died
void terminate()
Terminate the processing.
void waitForRunningOutput(int timeout)
Block until either the output process is running or the timeout (in seconds) is raised.
void subscribe(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &controlSocketAddress)
Start listening for process management information on the given address.
bool hasWorkers() const
Check if there is at least one running worker.
void killProcesses(unsigned int timeout)
Ask all processes to terminate. If not, kill them after timeout seconds.
void initialize(unsigned int requestedNumberOfWorkers)
Init the processing with that many workers.
unsigned int needMoreWorkers() const
Compare our current list of workers of how many we want to have.
void reset()
Reset the internal state.
static void initializeEndRun()
Initialize run independent random generator for end run.
static void initializeBeginRun()
Initialize run independent random generator for begin run.
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static RbTupleManager & Instance()
Access to singleton.
Definition: RbTuple.cc:40
int hadd(bool deleteflag=true)
Functions to add up all histogram files.
Definition: RbTuple.cc:138
Helper class for data store serialization.
Definition: StreamHelper.h:23
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
static const double s
[second]
Definition: Unit.h:95
static std::string getSocketAddress(const std::string &socketAddress, ZMQAddressType socketPart)
Create a full socket address for the given type from a random socket address, by adding a suffix.
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
Definition: ZMQClient.h:22
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
This class provides the core event processing loop for parallel processing with ZMQ.
ProcessMonitor m_processMonitor
Instance of the process monitor.
void processEndRun()
Calls EndRun function.
void runMonitoring(const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
Start the monitoring (without forking)
void processBeginRun(bool skipDB=false)
Calls BeginRun function.
void cleanup()
clean up IPC resources (should only be called in one process).
void process(const PathPtr &spath, long maxEvent)
Processes the full module chain using parallel processing, starting with the first module in the given path.
void terminateAndCleanup(const ModulePtr &histogramManager)
Last step in the process: run the termination and cleanup (kill all remaining processes)
void runWorker(unsigned int numProcesses, const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the N worker processes.
void initialize(const ModulePtrList &moduleList, const ModulePtr &histogramManager)
First step in the process: init the modules in the list.
bool processEvent(PathIterator moduleIter, bool skipMasterModule, bool Worker=false, bool output=false)
Calls Event function.
void processCore(const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true, bool isWorkerProcess=false, bool isOutputProcess=false)
Process modules in the path.
void runInput(const PathPtr &inputPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the input process.
void runOutput(const PathPtr &outputPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the output process.
void forkAndRun(long maxEvent, const PathPtr &inputPath, const PathPtr &mainPath, const PathPtr &outputPath, const ModulePtrList &terminateGlobally)
Second step in the process: fork out the processes we need to have and call the event loop.
virtual ~ZMQEventProcessor()
Make sure we remove all sockets cleanly.
EventMetaData m_previousEventMetaData
Stores previous eventMetaData.
void processPath(const PathPtr &localPath, const ModulePtrList &terminateGlobally, long maxEvent)
Basic function run in every process: process the event loop of the given path.
ZMQEventProcessor()
Init the socket cleaning at exit.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std shared pointer.
Definition: Path.h:35
static DBStore & Instance()
Instance of a singleton DBStore.
Definition: DBStore.cc:28
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std shared pointer.
Definition: Module.h:43
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:584
void updateEvent()
Updates all intra-run dependent objects.
Definition: DBStore.cc:142
void update()
Updates all objects that are outside their interval of validity.
Definition: DBStore.cc:79
@ c_Output
Output Process.
@ c_Worker
Worker/Reconstruction Process.
@ c_Monitor
Monitoring Process.
@ c_Input
Input Process.
@ c_Init
Before the forks, the process is in init state.
@ c_sub
Multicast subscribe socket.
@ c_control
Multicast control socket.
@ c_pub
Multicast publish socket.
double getClock()
Return current value of the real-time clock.
Definition: Utils.cc:66
Abstract base class for different kinds of events.
STL namespace.