Belle II Software  light-2311-nebelung
ZMQEventProcessor.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/ProcHelper.h>
10 #include <framework/pcore/GlobalProcHandler.h>
11 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
12 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
13 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
14 #include <framework/pcore/PathUtils.h>
15 
16 #include <framework/pcore/ZMQEventProcessor.h>
17 #include <framework/pcore/DataStoreStreamer.h>
18 #include <framework/pcore/RbTuple.h>
19 
20 #include <framework/core/Environment.h>
21 #include <framework/logging/LogSystem.h>
22 
23 #include <framework/database/DBStore.h>
24 #include <framework/database/Database.h>
25 #include <framework/core/RandomNumbers.h>
26 #include <framework/core/MetadataService.h>
27 #include <framework/gearbox/Unit.h>
28 #include <framework/utilities/Utils.h>
29 
30 #include <TROOT.h>
31 
32 #include <sys/stat.h>
33 
34 #include <csignal>
35 #include <fstream>
36 
37 using namespace std;
38 using namespace Belle2;
39 
40 namespace {
48  static int g_signalReceived = 0;
49 
51  static ZMQEventProcessor* g_eventProcessorForSignalHandling = nullptr;
52 
53  static void cleanupAndRaiseSignal(int signalNumber)
54  {
55  if (g_eventProcessorForSignalHandling) {
56  g_eventProcessorForSignalHandling->cleanup();
57  }
58  // uninstall current handler and call default one.
59  signal(signalNumber, SIG_DFL);
60  raise(signalNumber);
61  }
62 
63  static void storeSignal(int signalNumber)
64  {
65  if (signalNumber == SIGINT) {
66  EventProcessor::writeToStdErr("\nStopping basf2 gracefully...\n");
67  }
68 
69  // We do not want to remove the first signal
70  if (g_signalReceived == 0) {
71  g_signalReceived = signalNumber;
72  }
73  }
74 
76  std::string g_socketAddress = "";
77 
78  void deleteSocketFiles()
79  {
80  if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
81  return;
82  }
83 
84  const std::vector<ZMQAddressType> socketAddressList = {ZMQAddressType::c_input, ZMQAddressType::c_output, ZMQAddressType::c_pub, ZMQAddressType::c_sub, ZMQAddressType::c_control};
85  const auto seperatorPos = g_socketAddress.find("://");
86 
87  if (seperatorPos == std::string::npos or seperatorPos + 3 >= g_socketAddress.size()) {
88  return;
89  }
90 
91  const std::string filename(g_socketAddress.substr(seperatorPos + 3));
92 
93  struct stat buffer;
94  for (const auto socketAdressType : socketAddressList) {
95  const std::string socketAddress(ZMQAddressUtils::getSocketAddress(filename, socketAdressType));
96  if (stat(socketAddress.c_str(), &buffer) == 0) {
97  remove(socketAddress.c_str());
98  }
99  }
100  }
101 } // namespace
102 
103 ZMQEventProcessor::ZMQEventProcessor()
104 {
105  B2ASSERT("You are having two instances of the ZMQEventProcessor running! This is not possible",
106  not g_eventProcessorForSignalHandling);
107  g_eventProcessorForSignalHandling = this;
108 
109  // Make sure to remove the sockets
110  g_socketAddress = Environment::Instance().getZMQSocketAddress();
111  std::atexit(deleteSocketFiles);
112 }
113 
114 ZMQEventProcessor::~ZMQEventProcessor()
115 {
116  cleanup();
117  g_eventProcessorForSignalHandling = nullptr;
118 }
119 
120 void ZMQEventProcessor::process(const PathPtr& path, long maxEvent)
121 {
122  // Concerning signal handling:
123  // * During the initialization, we just raise the signal without doing any cleanup etc.
124  // * During the event execution, we will not allow for any signal in all processes except the parent process.
125  // Here, we catch sigint and clean up the processes AND WHAT DO WE DO IN THE OTHER CASES?
126  // * During cleanup, we will just ignore sigint, but the rest will be raised
127 
128  if (path->isEmpty()) {
129  return;
130  }
131 
132  const int numProcesses = Environment::Instance().getNumberProcesses();
133  if (numProcesses == 0) {
134  B2FATAL("ZMQEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
135  }
136 
137  // Split the path into input, main and output. A nullptr means, the path should not be used
138  PathPtr inputPath, mainPath, outputPath;
139  std::tie(inputPath, mainPath, outputPath) = PathUtils::splitPath(path);
140  const ModulePtr& histogramManager = PathUtils::getHistogramManager(inputPath, mainPath, outputPath);
141 
142  // Check for existence of HLTZMQ2Ds module in input path to set DAQ environment
143  for (const ModulePtr& module : inputPath->getModules()) {
144  if (module->getName() == "HLTZMQ2Ds") {
145  Environment::Instance().setZMQDAQEnvironment(true);
146  B2INFO("ZMQEventProcessor : DAQ environment set");
147  break;
148  }
149  }
150 
151  if (not mainPath or mainPath->isEmpty()) {
152  B2WARNING("Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
153  EventProcessor::process(path, maxEvent);
154  return;
155  }
156 
157  // inserts Rx/Tx modules into path (sets up IPC structures)
158  const ModulePtrList& moduleList = PathUtils::preparePaths(inputPath, mainPath, outputPath);
159 
160  // Run the initialization of the modules and the histogram manager
161  initialize(moduleList, histogramManager);
162 
163  // pause();
164 
165  // The main part: fork into the different processes and run!
166  const ModulePtrList& terminateGlobally = PathUtils::getTerminateGloballyModules(moduleList);
167  forkAndRun(maxEvent, inputPath, mainPath, outputPath, terminateGlobally);
168 
169  installMainSignalHandlers(cleanupAndRaiseSignal);
170  // Run the final termination and cleanup with error check
171  terminateAndCleanup(histogramManager);
172 }
173 
174 void ZMQEventProcessor::initialize(const ModulePtrList& moduleList, const ModulePtr& histogramManager)
175 {
176  if (histogramManager) {
177  histogramManager->initialize();
178  }
179  // from now on the datastore is available
180  processInitialize(moduleList, true);
181 
182  B2INFO("ZMQEventProcessor : processInitialize done");
183 
184  // Don't start processing in case of no master module
185  if (!m_master) {
186  B2ERROR("There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
187  }
188 
189  // Check if errors appeared. If yes, don't start the event processing.
190  int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
191  if (numLogError != 0) {
192  B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
193  }
194 
195  // TODO: I do not really understand what is going on here...
201  // disable ROOT's management of TFiles
202  // clear list, but don't actually delete the objects
203  gROOT->GetListOfFiles()->Clear("nodelete");
204 }
205 
206 void ZMQEventProcessor::terminateAndCleanup(const ModulePtr& histogramManager)
207 {
208  cleanup();
209 
210  if (histogramManager) {
211  B2INFO("HistoManager:: adding histogram files");
212  RbTupleManager::Instance().hadd();
213  }
214 
215  // did anything bad happen?
216  if (g_signalReceived) {
217  if (g_signalReceived == SIGINT) {
218  B2RESULT("Processing aborted via signal " << g_signalReceived <<
219  ", terminating. Output files have been closed safely and should be readable.");
220  } else {
221  B2ERROR("Processing aborted via signal " << g_signalReceived <<
222  ", terminating. Output files have been closed safely and should be readable.");
223  }
224  // re-raise the signal
225  installSignalHandler(g_signalReceived, SIG_DFL);
226  raise(g_signalReceived);
227  }
228 }
229 
230 void ZMQEventProcessor::runInput(const PathPtr& inputPath, const ModulePtrList& terminateGlobally, long maxEvent)
231 {
232  if (not inputPath or inputPath->isEmpty()) {
233  return;
234  }
235 
236  if (not GlobalProcHandler::startInputProcess()) {
237  // This is not the input process, clean up datastore to not contain the first event
238  DataStore::Instance().invalidateData(DataStore::c_Event);
239  return;
240  }
241 
242  // The default will be to not do anything on signals...
243  installMainSignalHandlers(SIG_IGN);
244 
245  m_processMonitor.reset();
246  DataStoreStreamer::removeSideEffects();
247 
248  processPath(inputPath, terminateGlobally, maxEvent);
249  B2DEBUG(30, "Finished an input process");
250  exit(0);
251 }
252 
253 void ZMQEventProcessor::runOutput(const PathPtr& outputPath, const ModulePtrList& terminateGlobally, long maxEvent)
254 {
255  const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
256  const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
257  const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
258 
259  if (not outputPath or outputPath->isEmpty()) {
260  return;
261  }
262 
263  if (not GlobalProcHandler::startOutputProcess()) {
264  return;
265  }
266 
267  // The default will be to not do anything on signals...
268  installMainSignalHandlers(SIG_IGN);
269 
270  m_processMonitor.reset();
271 
272  // Set the rx module as main module
273  m_master = outputPath->getModules().begin()->get();
274 
275  processPath(outputPath, terminateGlobally, maxEvent);
276 
277  // Send the statistics to the process monitor
278  StreamHelper streamer;
279  ZMQClient zmqClient;
280 
281  // TODO: true?
282  streamer.initialize(0, true);
283  zmqClient.initialize(pubSocketAddress, subSocketAddress);
284 
285  // TODO: make sure to only send statistics!
286  const auto& evtMessage = streamer.stream();
287  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_statisticMessage, evtMessage);
288  zmqClient.publish(std::move(message));
289 
290  B2DEBUG(30, "Finished an output process");
291  exit(0);
292 }
293 void ZMQEventProcessor::runWorker(unsigned int numProcesses, const PathPtr& inputPath, const PathPtr& mainPath,
294  const ModulePtrList& terminateGlobally, long maxEvent)
295 {
296  if (numProcesses == 0) {
297  return;
298  }
299 
300  if (not GlobalProcHandler::startWorkerProcesses(numProcesses)) {
301  // Make sure the worker process is running until we go on
302  // m_processMonitor.waitForRunningWorker(60);
303  // m_processMonitor.waitForRunningWorker(7200);
304  m_processMonitor.waitForRunningWorker(Environment::Instance().getZMQMaximalWaitingTime());
305  return;
306  }
307 
308  // The default will be to not do anything on signals...
309  installMainSignalHandlers(SIG_IGN);
310 
311  if (inputPath and not inputPath->isEmpty()) {
312  // set Rx as master
313  m_master = mainPath->getModules().begin()->get();
314  }
315 
316  m_processMonitor.reset();
317  DataStoreStreamer::removeSideEffects();
318 
319  processPath(mainPath, terminateGlobally, maxEvent);
320  B2DEBUG(30, "Finished a worker process");
321  exit(0);
322 }
323 
324 void ZMQEventProcessor::processPath(const PathPtr& localPath, const ModulePtrList& terminateGlobally, long maxEvent)
325 {
326  ModulePtrList localModules = localPath->buildModulePathList();
327  maxEvent = getMaximumEventNumber(maxEvent);
328  // we are not using the default signal handler, so the processCore can not throw any exception because if sigint...
329  // processCore(localPath, localModules, maxEvent, GlobalProcHandler::isProcess(ProcType::c_Input));
330  processCore(localPath, localModules, maxEvent, GlobalProcHandler::isProcess(ProcType::c_Input),
331  GlobalProcHandler::isProcess(ProcType::c_Worker),
332  GlobalProcHandler::isProcess(ProcType::c_Output));
333 
334  B2DEBUG(30, "terminate process...");
335  PathUtils::prependModulesIfNotPresent(&localModules, terminateGlobally);
336  processTerminate(localModules);
337 }
338 
339 
340 void ZMQEventProcessor::runMonitoring(const PathPtr& inputPath, const PathPtr& mainPath, const ModulePtrList& terminateGlobally,
341  long maxEvent)
342 {
343  if (not GlobalProcHandler::startMonitoringProcess()) {
344  return;
345  }
346 
347  const auto& environment = Environment::Instance();
348 
349  B2DEBUG(30, "Will now start process monitor...");
350  const int numProcesses = environment.getNumberProcesses();
351  m_processMonitor.initialize(numProcesses);
352 
353  // Make sure the input process is running until we go on
354  m_processMonitor.waitForRunningInput(60 * 1000);
355  if (m_processMonitor.hasEnded()) {
356  return;
357  }
358  // Make sure the output process is running until we go on
359  m_processMonitor.waitForRunningOutput(60 * 1000);
360  if (m_processMonitor.hasEnded()) {
361  return;
362  }
363 
364  installMainSignalHandlers(storeSignal);
365 
366  // at least start the number of workers requested
367  runWorker(m_processMonitor.needMoreWorkers(), inputPath, mainPath, terminateGlobally, maxEvent);
368 
369  const auto& restartFailedWorkers = environment.getZMQRestartFailedWorkers();
370  const auto& failOnFailedWorkers = environment.getZMQFailOnFailedWorkers();
371 
372  B2DEBUG(30, "Will now start main loop...");
373  while (true) {
374  // check multicast for messages and kill workers if requested
375  m_processMonitor.checkMulticast();
376  // check the child processes, if one has died
377  m_processMonitor.checkChildProcesses();
378  // check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
379  m_processMonitor.checkSignals(g_signalReceived);
380 
381  // If we have received a SIGINT signal or the last process is gone, we can end smoothly
382  if (m_processMonitor.hasEnded()) {
383  break;
384  }
385 
386  // Test if we need more workers
387  const unsigned int neededWorkers = m_processMonitor.needMoreWorkers();
388  if (neededWorkers > 0) {
389  if (restartFailedWorkers) {
390  runWorker(neededWorkers, inputPath, mainPath, terminateGlobally, maxEvent);
391  } else if (failOnFailedWorkers) {
392  B2ERROR("A worker failed. Will try to end the process smoothly now.");
393  break;
394  } else if (not m_processMonitor.hasWorkers()) {
395  B2WARNING("All workers have died and you did not request to restart them. Going down now.");
396  break;
397  }
398  }
399  }
400 
401  B2DEBUG(30, "Finished the monitoring process");
402 }
403 
404 void ZMQEventProcessor::forkAndRun(long maxEvent, const PathPtr& inputPath, const PathPtr& mainPath, const PathPtr& outputPath,
405  const ModulePtrList& terminateGlobally)
406 {
407  const int numProcesses = Environment::Instance().getNumberProcesses();
408  GlobalProcHandler::initialize(numProcesses);
409 
410  const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
411 
412  const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
413  const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
414  const auto controlSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_control));
415 
416  // We catch all signals and store them into a variable. This is used during the main loop then.
417  // From now on, we have to make sure to clean up behind us
418  installMainSignalHandlers(cleanupAndRaiseSignal);
419  m_processMonitor.subscribe(pubSocketAddress, subSocketAddress, controlSocketAddress);
420 
421  runInput(inputPath, terminateGlobally, maxEvent);
422  runOutput(outputPath, terminateGlobally, maxEvent);
423  runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent);
424 }
425 
426 void ZMQEventProcessor::cleanup()
427 {
428  if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
429  B2DEBUG(30, "Not running cleanup, as I am in process type " << GlobalProcHandler::getProcessName());
430  return;
431  }
432  m_processMonitor.killProcesses(5000);
433  m_processMonitor.terminate();
434 
435  deleteSocketFiles();
436 }
437 
438 void ZMQEventProcessor::processCore(const PathPtr& startPath, const ModulePtrList& modulePathList, long maxEvent,
439  bool isInputProcess, bool isWorkerProcess, bool isOutputProcess)
440 {
441  // bool firstRound = true;
442 
443  DataStore::Instance().setInitializeActive(false);
444  m_moduleList = modulePathList;
445 
446  //Remember the previous event meta data, and identify end of data meta data
447  m_previousEventMetaData.setEndOfData(); //invalid start state
448 
449  const bool collectStats = !Environment::Instance().getNoStats();
450 
451  //Loop over the events
452  long currEvent = 0;
453  bool endProcess = false;
454  while (!endProcess) {
455  if (collectStats)
456  m_processStatisticsPtr->startGlobal();
457 
458  // B2INFO ( "processCore:: currEvent = " << currEvent );
459 
460  PathIterator moduleIter(startPath);
461 
462  if (isInputProcess) {
463  endProcess = ZMQEventProcessor::processEvent(moduleIter, isInputProcess && currEvent == 0);
464  } else if (isWorkerProcess) {
465  endProcess = ZMQEventProcessor::processEvent(moduleIter, false,
466  isWorkerProcess && currEvent == 0 && Environment::Instance().getZMQDAQEnvironment());
467  } else if (isOutputProcess) {
468  endProcess = ZMQEventProcessor::processEvent(moduleIter, false, false,
469  isOutputProcess && currEvent == 0 && Environment::Instance().getZMQDAQEnvironment());
470  } else {
471  B2INFO("processCore : should not come here. Specified path is invalid");
472  return;
473  }
474 
475  // Original code
476  // endProcess = ZMQEventProcessor::processEvent(moduleIter, isInputProcess && currEvent == 0);
477 
478  //Delete event related data in DataStore
479  DataStore::Instance().invalidateData(DataStore::c_Event);
480 
481  currEvent++;
482  if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess = true;
483  if (collectStats)
484  m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_Event);
485 
486  // firstRound = false;
487 
488  // B2INFO ( "processCore :: event processed" );
489 
490  } //end event loop
491 
492  //End last run
493  m_eventMetaDataPtr.create();
494  B2INFO("processCore : End Last Run. calling processEndRun()");
495  processEndRun();
496 }
497 
498 
499 bool ZMQEventProcessor::processEvent(PathIterator moduleIter, bool skipMasterModule, bool WorkerPath, bool OutputPath)
500 {
501  double time = Utils::getClock() / Unit::s;
502  if (time > m_lastMetadataUpdate + m_metadataUpdateInterval) {
503  MetadataService::Instance().addBasf2Status("running event loop");
504  m_lastMetadataUpdate = time;
505  }
506 
507  const bool collectStats = !Environment::Instance().getNoStats();
508 
509  while (!moduleIter.isDone()) {
510  Module* module = moduleIter.get();
511  // B2INFO ("Starting event of " << module->getName() );
512 
513  // run the module ... unless we don't want to
514  if (module != m_master) {
515  callEvent(module);
516  // B2INFO ( "not master. callEvent" );
517  // B2INFO ( "ZMQEventProcessor :: " <<module->getName() << " called. Not master" );
518  } else if (!skipMasterModule) {
519  callEvent(module);
520  // B2INFO ( "master but not skipModule. callEvent");
521  // B2INFO ( "ZMQEventProcessor :: " <<module->getName() << " called. Not skipMasterModule" );
522  } else
523  B2INFO("Skipping execution of module " << module->getName());
524 
525  if (!m_eventMetaDataPtr) {
526  // B2INFO ( "No event metadata....." );
527  return false;
528  }
529 
530  //Check for end of data
531  if (m_eventMetaDataPtr->isEndOfData()) {
532  // Immediately leave the loop and terminate (true)
533  B2INFO("isEndOfData. Return");
534  return true;
535  }
536 
537  //Handle EventMetaData changes by master module
538  if (module == m_master && !skipMasterModule) {
539 
540  //initialize random number state for the event
541  RandomNumbers::initializeEvent();
542 
543  // Worker Path
544  if (WorkerPath) {
545  B2INFO("Worker Path and First Event!");
546  if (Environment::Instance().isZMQDAQFirstEvent(m_eventMetaDataPtr->getExperiment(), m_eventMetaDataPtr->getRun())) {
547  // if ( m_eventMetaDataPtr->getExperiment() == Environment::Instance().getZMQDAQFirstEventExp() &&
548  // m_eventMetaDataPtr->getRun() == Environment::Instance().getZMQDAQFirstEventRun() ) {
549  B2INFO("Worker path processing for ZMQDAQ first event.....Skip to the end of path");
550  B2INFO(" --> exp = " << m_eventMetaDataPtr->getExperiment() << " run = " << m_eventMetaDataPtr->getRun());
551  while (true) {
552  module = moduleIter.get();
553  // B2INFO ( "Module in the path = " << module->getName() );
554  if (module->getName() == "ZMQTxWorker") break;
555  moduleIter.next();
556  }
557  // B2INFO ( "ZMQTxWorker will be called" );
558  continue;
559  }
560  }
561 
562  // Check for EndOfRun
563  if (!WorkerPath && !OutputPath) {
564  if (m_eventMetaDataPtr->isEndOfRun()) {
565  B2INFO("===> EndOfRun : calling processEndRun(); isEndOfRun = " << m_eventMetaDataPtr->isEndOfRun());
566  processEndRun();
567  // Store the current event meta data for the next round
568  m_previousEventMetaData = *m_eventMetaDataPtr;
569  // Leave this event, but not the full processing (false)
570  return false;
571  } else if (m_previousEventMetaData.isEndOfData() || m_previousEventMetaData.isEndOfRun()) {
572  if (m_eventMetaDataPtr->getRun() == m_previousEventMetaData.getRun()) {
573  B2INFO("===> EndOfData : ----> Run change request to the same run!!! Skip this event.");
574  return false;
575  }
576  B2INFO("===> EndOfData : calling processBeginRun(); isEndOfData = " << m_previousEventMetaData.isEndOfData() <<
577  " isEndOfRun = " << m_previousEventMetaData.isEndOfRun());
578  B2INFO("--> cur run = " << m_eventMetaDataPtr->getRun() << " <- prev run = " << m_previousEventMetaData.getRun());
579  B2INFO("--> cur evt = " << m_eventMetaDataPtr->getEvent() << " <- prev evt = " << m_previousEventMetaData.getEvent());
580  processBeginRun();
581  m_previousEventMetaData = *m_eventMetaDataPtr;
582  }
583 
584  //Check for a change of the run (should not come here)
585  else {
586  const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
587  (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
588  const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
589  and not m_previousEventMetaData.isEndOfRun();
590  // if (runChangedWithoutNotice && !g_first_round) {
591  if (runChangedWithoutNotice) {
592  if (collectStats)
593  m_processStatisticsPtr->suspendGlobal();
594 
595  B2INFO("===> Run Change (possibly offline) : calling processEndRun() and processBeginRun()");
596  B2INFO("--> cur run = " << m_eventMetaDataPtr->getRun() << " <- prev run = " << m_previousEventMetaData.getRun());
597  B2INFO("--> cur evt = " << m_eventMetaDataPtr->getEvent() << " <- prev evt = " << m_previousEventMetaData.getEvent());
598  B2INFO("--> runChanged = " << runChanged << " runChangedWithoutNotice = " << runChangedWithoutNotice);
599 
600  processEndRun();
601  processBeginRun();
602 
603  if (collectStats)
604  m_processStatisticsPtr->resumeGlobal();
605  }
606  }
607  m_previousEventMetaData = *m_eventMetaDataPtr;
608  } else
609  B2INFO("Skipping begin/end run processing");
610 
611  //make sure we use the event dependent generator again
612  RandomNumbers::useEventDependent();
613 
614  DBStore::Instance().updateEvent();
615 
616  } else if (!WorkerPath && !OutputPath) {
617  //Check for a second master module. Cannot do this if we skipped the
618  //master module as the EventMetaData is probably set before we call this
619  //function
620  if (!skipMasterModule && m_eventMetaDataPtr &&
621  (*m_eventMetaDataPtr != m_previousEventMetaData)) {
622  B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and " << module->getName());
623  }
624  }
625 
626  if (g_signalReceived != 0) {
627  throw StoppedBySignalException(g_signalReceived);
628  }
629 
630  //Check for the module conditions, evaluate them and if one is true switch to the new path
631  if (module->evalCondition()) {
632  PathPtr condPath = module->getConditionPath();
633  //continue with parent Path after condition path is executed?
634  if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
635  moduleIter = PathIterator(condPath, moduleIter);
636  } else {
637  moduleIter = PathIterator(condPath);
638  }
639  } else {
640  moduleIter.next();
641  }
642  } //end module loop
643  return false;
644 }
645 
646 void ZMQEventProcessor::processBeginRun(bool skipDB)
647 {
648  MetadataService::Instance().addBasf2Status("beginning run");
649 
650  m_inRun = true;
651  // auto dbsession = Database::Instance().createScopedUpdateSession();
652 
653  LogSystem& logSystem = LogSystem::Instance();
654  m_processStatisticsPtr->startGlobal();
655 
656  if (!skipDB) DBStore::Instance().update();
657 
658  //initialize random generator for end run
659  RandomNumbers::initializeBeginRun();
660 
661  for (const ModulePtr& modPtr : m_moduleList) {
662  Module* module = modPtr.get();
663 
664  //Set the module dependent log level
665  logSystem.updateModule(&(module->getLogConfig()), module->getName());
666 
667  //Do beginRun() call
668  m_processStatisticsPtr->startModule();
669  // CALL_MODULE(module, beginRun);
670  module->beginRun();
671  m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_BeginRun);
672 
673  //Set the global log level
674  logSystem.updateModule(nullptr);
675  }
676 
677  m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_BeginRun);
678 }
679 
680 
681 void ZMQEventProcessor::processEndRun()
682 {
683  MetadataService::Instance().addBasf2Status("ending run");
684 
685  if (!m_inRun)
686  return;
687  m_inRun = false;
688 
689  LogSystem& logSystem = LogSystem::Instance();
690  m_processStatisticsPtr->startGlobal();
691 
692  const EventMetaData newEventMetaData = *m_eventMetaDataPtr;
693  // *m_eventMetaDataPtr = m_previousEventMetaData;
694 
695  //initialize random generator for end run
696  RandomNumbers::initializeEndRun();
697 
698  for (const ModulePtr& modPtr : m_moduleList) {
699  Module* module = modPtr.get();
700 
701  //Set the module dependent log level
702  logSystem.updateModule(&(module->getLogConfig()), module->getName());
703 
704  //Do endRun() call
705  m_processStatisticsPtr->startModule();
706  // CALL_MODULE(module, endRun);
707  module->endRun();
708  m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_EndRun);
709 
710  //Set the global log level
711  logSystem.updateModule(nullptr);
712  }
713  *m_eventMetaDataPtr = newEventMetaData;
714 
715  m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_EndRun);
716 }
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
Exception thrown when execution is stopped by a signal.
Class for logging debug, info and error messages.
Definition: LogSystem.h:46
void updateModule(const LogConfig *moduleLogConfig=nullptr, const std::string &moduleName="")
Sets the log configuration to the given module log configuration and sets the module name This method...
Definition: LogSystem.h:191
Base class for Modules.
Definition: Module.h:72
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:26
void next()
increment.
Definition: PathIterator.h:49
bool isDone() const
Are we finished iterating?
Definition: PathIterator.h:72
Module * get() const
dereference.
Definition: PathIterator.h:75
Helper class for data store serialization.
Definition: StreamHelper.h:23
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
Definition: ZMQClient.h:22
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
This class provides the core event processing loop for parallel processing with ZMQ.
void cleanup()
clean up IPC resources (should only be called in one process).
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:28
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:40
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:584
@ c_sub
Multicast subscribe socket.
@ c_control
Control socket.
@ c_pub
Multicast publish socket.
@ c_output
Output socket.
Abstract base class for different kinds of events.
Definition: ClusterUtils.h:24