Belle II Software  release-08-01-10
ZMQEventProcessor.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/ProcHelper.h>
10 #include <framework/pcore/GlobalProcHandler.h>
11 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
12 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
13 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
14 #include <framework/pcore/PathUtils.h>
15 
16 #include <framework/pcore/ZMQEventProcessor.h>
17 #include <framework/pcore/DataStoreStreamer.h>
18 #include <framework/pcore/RbTuple.h>
19 
20 #include <framework/core/Environment.h>
21 #include <framework/logging/LogSystem.h>
22 
23 #include <framework/database/DBStore.h>
24 #include <framework/database/Database.h>
25 #include <framework/core/RandomNumbers.h>
26 #include <framework/core/MetadataService.h>
27 #include <framework/gearbox/Unit.h>
28 #include <framework/utilities/Utils.h>
29 
30 #include <TROOT.h>
31 
32 #include <sys/stat.h>
33 
34 #include <csignal>
35 #include <fstream>
36 
37 using namespace std;
38 using namespace Belle2;
39 
40 namespace {
48  static int g_signalReceived = 0;
49 
51  static ZMQEventProcessor* g_eventProcessorForSignalHandling = nullptr;
52 
53  static void cleanupAndRaiseSignal(int signalNumber)
54  {
55  if (g_eventProcessorForSignalHandling) {
56  g_eventProcessorForSignalHandling->cleanup();
57  }
58  // uninstall current handler and call default one.
59  signal(signalNumber, SIG_DFL);
60  raise(signalNumber);
61  }
62 
63  static void storeSignal(int signalNumber)
64  {
65  if (signalNumber == SIGINT) {
66  EventProcessor::writeToStdErr("\nStopping basf2 gracefully...\n");
67  }
68 
69  // We do not want to remove the first signal
70  if (g_signalReceived == 0) {
71  g_signalReceived = signalNumber;
72  }
73  }
74 
76  std::string g_socketAddress = "";
77 
78  void deleteSocketFiles()
79  {
80  if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
81  return;
82  }
83 
84  const std::vector<ZMQAddressType> socketAddressList = {ZMQAddressType::c_input, ZMQAddressType::c_output, ZMQAddressType::c_pub, ZMQAddressType::c_sub, ZMQAddressType::c_control};
85  const auto seperatorPos = g_socketAddress.find("://");
86 
87  if (seperatorPos == std::string::npos or seperatorPos + 3 >= g_socketAddress.size()) {
88  return;
89  }
90 
91  const std::string filename(g_socketAddress.substr(seperatorPos + 3));
92 
93  struct stat buffer;
94  for (const auto socketAdressType : socketAddressList) {
95  const std::string socketAddress(ZMQAddressUtils::getSocketAddress(filename, socketAdressType));
96  if (stat(socketAddress.c_str(), &buffer) == 0) {
97  remove(socketAddress.c_str());
98  }
99  }
100  }
101 } // namespace
102 
103 ZMQEventProcessor::ZMQEventProcessor()
104 {
105  B2ASSERT("You are having two instances of the ZMQEventProcessor running! This is not possible",
106  not g_eventProcessorForSignalHandling);
107  g_eventProcessorForSignalHandling = this;
108 
109  // Make sure to remove the sockets
110  g_socketAddress = Environment::Instance().getZMQSocketAddress();
111  std::atexit(deleteSocketFiles);
112 }
113 
114 ZMQEventProcessor::~ZMQEventProcessor()
115 {
116  cleanup();
117  g_eventProcessorForSignalHandling = nullptr;
118 }
119 
120 void ZMQEventProcessor::process(const PathPtr& path, long maxEvent)
121 {
122  // Concerning signal handling:
123  // * During the initialization, we just raise the signal without doing any cleanup etc.
124  // * During the event execution, we will not allow for any signal in all processes except the parent process.
125  // Here, we catch sigint and clean up the processes AND WHAT DO WE DO IN THE OTHER CASES?
126  // * During cleanup, we will just ignore sigint, but the rest will be raised
127 
128  if (path->isEmpty()) {
129  return;
130  }
131 
132  const int numProcesses = Environment::Instance().getNumberProcesses();
133  if (numProcesses == 0) {
134  B2FATAL("ZMQEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
135  }
136 
137  // Split the path into input, main and output. A nullptr means, the path should not be used
138  PathPtr inputPath, mainPath, outputPath;
139  std::tie(inputPath, mainPath, outputPath) = PathUtils::splitPath(path);
140  const ModulePtr& histogramManager = PathUtils::getHistogramManager(inputPath, mainPath, outputPath);
141 
142  // Check for existence of HLTZMQ2Ds module in input path to set DAQ environment
143  for (const ModulePtr& module : inputPath->getModules()) {
144  if (module->getName() == "HLTZMQ2Ds") {
145  Environment::Instance().setZMQDAQEnvironment(true);
146  B2INFO("ZMQEventProcessor : DAQ environment set");
147  break;
148  }
149  }
150 
151  if (not mainPath or mainPath->isEmpty()) {
152  B2WARNING("Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
153  EventProcessor::process(path, maxEvent);
154  return;
155  }
156 
157  // inserts Rx/Tx modules into path (sets up IPC structures)
158  const ModulePtrList& moduleList = PathUtils::preparePaths(inputPath, mainPath, outputPath);
159 
160  // Run the initialization of the modules and the histogram manager
161  initialize(moduleList, histogramManager);
162 
163  // pause();
164 
165  // The main part: fork into the different processes and run!
166  const ModulePtrList& terminateGlobally = PathUtils::getTerminateGloballyModules(moduleList);
167  forkAndRun(maxEvent, inputPath, mainPath, outputPath, terminateGlobally);
168 
169  installMainSignalHandlers(cleanupAndRaiseSignal);
170  // Run the final termination and cleanup with error check
171  terminateAndCleanup(histogramManager);
172 }
173 
174 void ZMQEventProcessor::initialize(const ModulePtrList& moduleList, const ModulePtr& histogramManager)
175 {
176  if (histogramManager) {
177  histogramManager->initialize();
178  }
179  // from now on the datastore is available
180  processInitialize(moduleList, true);
181 
182  B2INFO("ZMQEventProcessor : processInitialize done");
183 
184  // Don't start processing in case of no master module
185  if (!m_master) {
186  B2ERROR("There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
187  }
188 
189  // Check if errors appeared. If yes, don't start the event processing.
190  int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
191  if (numLogError != 0) {
192  B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
193  }
194 
195  // TODO: I do not really understand what is going on here...
201  // disable ROOT's management of TFiles
202  // clear list, but don't actually delete the objects
203  gROOT->GetListOfFiles()->Clear("nodelete");
204 }
205 
206 void ZMQEventProcessor::terminateAndCleanup(const ModulePtr& histogramManager)
207 {
208  cleanup();
209 
210  if (histogramManager) {
211  B2INFO("HistoManager:: adding histogram files");
212  RbTupleManager::Instance().hadd();
213  }
214 
215  // did anything bad happen?
216  if (g_signalReceived) {
217  if (g_signalReceived == SIGINT) {
218  B2RESULT("Processing aborted via signal " << g_signalReceived <<
219  ", terminating. Output files have been closed safely and should be readable.");
220  } else {
221  B2ERROR("Processing aborted via signal " << g_signalReceived <<
222  ", terminating. Output files have been closed safely and should be readable.");
223  }
224  // re-raise the signal
225  installSignalHandler(g_signalReceived, SIG_DFL);
226  raise(g_signalReceived);
227  }
228 }
229 
230 void ZMQEventProcessor::runInput(const PathPtr& inputPath, const ModulePtrList& terminateGlobally, long maxEvent)
231 {
232  if (not inputPath or inputPath->isEmpty()) {
233  return;
234  }
235 
236  if (not GlobalProcHandler::startInputProcess()) {
237  // This is not the input process, clean up datastore to not contain the first event
238  DataStore::Instance().invalidateData(DataStore::c_Event);
239  return;
240  }
241 
242  // The default will be to not do anything on signals...
243  installMainSignalHandlers(SIG_IGN);
244 
245  m_processMonitor.reset();
246  DataStoreStreamer::removeSideEffects();
247 
248  processPath(inputPath, terminateGlobally, maxEvent);
249  B2DEBUG(30, "Finished an input process");
250  exit(0);
251 }
252 
253 void ZMQEventProcessor::runOutput(const PathPtr& outputPath, const ModulePtrList& terminateGlobally, long maxEvent)
254 {
255  const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
256  const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
257  const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
258 
259  if (not outputPath or outputPath->isEmpty()) {
260  return;
261  }
262 
263  if (not GlobalProcHandler::startOutputProcess()) {
264  return;
265  }
266 
267  // The default will be to not do anything on signals...
268  installMainSignalHandlers(SIG_IGN);
269 
270  m_processMonitor.reset();
271 
272  // Set the rx module as main module
273  m_master = outputPath->getModules().begin()->get();
274 
275  processPath(outputPath, terminateGlobally, maxEvent);
276 
277  // Send the statistics to the process monitor
278  StreamHelper streamer;
279  ZMQClient zmqClient;
280 
281  // TODO: true?
282  streamer.initialize(0, true);
283  zmqClient.initialize(pubSocketAddress, subSocketAddress);
284 
285  // TODO: make sure to only send statistics!
286  const auto& evtMessage = streamer.stream();
287  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_statisticMessage, evtMessage);
288  zmqClient.publish(std::move(message));
289 
290  B2DEBUG(30, "Finished an output process");
291  exit(0);
292 }
293 void ZMQEventProcessor::runWorker(unsigned int numProcesses, const PathPtr& inputPath, const PathPtr& mainPath,
294  const ModulePtrList& terminateGlobally, long maxEvent)
295 {
296  if (numProcesses == 0) {
297  return;
298  }
299 
300  if (not GlobalProcHandler::startWorkerProcesses(numProcesses)) {
301  // Make sure the worker process is running until we go on
302  // m_processMonitor.waitForRunningWorker(60);
303  // m_processMonitor.waitForRunningWorker(7200);
304  m_processMonitor.waitForRunningWorker(Environment::Instance().getZMQMaximalWaitingTime());
305  return;
306  }
307 
308  // The default will be to not do anything on signals...
309  installMainSignalHandlers(SIG_IGN);
310 
311  if (inputPath and not inputPath->isEmpty()) {
312  // set Rx as master
313  m_master = mainPath->getModules().begin()->get();
314  }
315 
316  m_processMonitor.reset();
317  DataStoreStreamer::removeSideEffects();
318 
319  processPath(mainPath, terminateGlobally, maxEvent);
320  B2DEBUG(30, "Finished a worker process");
321  exit(0);
322 }
323 
324 void ZMQEventProcessor::processPath(const PathPtr& localPath, const ModulePtrList& terminateGlobally, long maxEvent)
325 {
326  ModulePtrList localModules = localPath->buildModulePathList();
327  maxEvent = getMaximumEventNumber(maxEvent);
328  // we are not using the default signal handler, so the processCore can not throw any exception because if sigint...
329  // processCore(localPath, localModules, maxEvent, GlobalProcHandler::isProcess(ProcType::c_Input));
330  processCore(localPath, localModules, maxEvent, GlobalProcHandler::isProcess(ProcType::c_Input),
331  GlobalProcHandler::isProcess(ProcType::c_Worker),
332  GlobalProcHandler::isProcess(ProcType::c_Output));
333 
334  B2DEBUG(30, "terminate process...");
335  PathUtils::prependModulesIfNotPresent(&localModules, terminateGlobally);
336  processTerminate(localModules);
337 }
338 
339 
340 void ZMQEventProcessor::runMonitoring(const PathPtr& inputPath, const PathPtr& mainPath, const ModulePtrList& terminateGlobally,
341  long maxEvent)
342 {
343  if (not GlobalProcHandler::startMonitoringProcess()) {
344  return;
345  }
346 
347  const auto& environment = Environment::Instance();
348 
349  B2DEBUG(30, "Will now start process monitor...");
350  const int numProcesses = environment.getNumberProcesses();
351  m_processMonitor.initialize(numProcesses);
352 
353  // Make sure the input process is running until we go on
354  m_processMonitor.waitForRunningInput(60 * 1000);
355  if (m_processMonitor.hasEnded()) {
356  return;
357  }
358  // Make sure the output process is running until we go on
359  m_processMonitor.waitForRunningOutput(60 * 1000);
360  if (m_processMonitor.hasEnded()) {
361  return;
362  }
363 
364  installMainSignalHandlers(storeSignal);
365 
366  // at least start the number of workers requested
367  runWorker(m_processMonitor.needMoreWorkers(), inputPath, mainPath, terminateGlobally, maxEvent);
368 
369  const auto& restartFailedWorkers = environment.getZMQRestartFailedWorkers();
370  const auto& failOnFailedWorkers = environment.getZMQFailOnFailedWorkers();
371 
372  B2DEBUG(30, "Will now start main loop...");
373  while (true) {
374  // check multicast for messages and kill workers if requested
375  m_processMonitor.checkMulticast();
376  // check the child processes, if one has died
377  m_processMonitor.checkChildProcesses();
378  // check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
379  m_processMonitor.checkSignals(g_signalReceived);
380 
381  // If we have received a SIGINT signal or the last process is gone, we can end smoothly
382  if (m_processMonitor.hasEnded()) {
383  break;
384  }
385 
386  // Test if we need more workers
387  const unsigned int neededWorkers = m_processMonitor.needMoreWorkers();
388  if (neededWorkers > 0) {
389  B2DEBUG(30, "restartFailedWorkers = " << restartFailedWorkers);
390  if (restartFailedWorkers) {
391  B2DEBUG(30, ".... Restarting a new worker");
392  B2ERROR(".... Restarting a new worker process");
393  runWorker(neededWorkers, inputPath, mainPath, terminateGlobally, maxEvent);
394  } else if (failOnFailedWorkers) {
395  B2ERROR("A worker failed. Will try to end the process smoothly now.");
396  break;
397  } else if (not m_processMonitor.hasWorkers()) {
398  B2WARNING("All workers have died and you did not request to restart them. Going down now.");
399  break;
400  }
401  }
402  }
403 
404  B2DEBUG(30, "Finished the monitoring process");
405 }
406 
407 void ZMQEventProcessor::forkAndRun(long maxEvent, const PathPtr& inputPath, const PathPtr& mainPath, const PathPtr& outputPath,
408  const ModulePtrList& terminateGlobally)
409 {
410  const int numProcesses = Environment::Instance().getNumberProcesses();
411  GlobalProcHandler::initialize(numProcesses);
412 
413  const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
414 
415  const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
416  const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
417  const auto controlSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_control));
418 
419  // We catch all signals and store them into a variable. This is used during the main loop then.
420  // From now on, we have to make sure to clean up behind us
421  installMainSignalHandlers(cleanupAndRaiseSignal);
422  m_processMonitor.subscribe(pubSocketAddress, subSocketAddress, controlSocketAddress);
423 
424  runInput(inputPath, terminateGlobally, maxEvent);
425  runOutput(outputPath, terminateGlobally, maxEvent);
426  runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent);
427 }
428 
429 void ZMQEventProcessor::cleanup()
430 {
431  if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
432  B2DEBUG(30, "Not running cleanup, as I am in process type " << GlobalProcHandler::getProcessName());
433  return;
434  }
435  m_processMonitor.killProcesses(5000);
436  m_processMonitor.terminate();
437 
438  deleteSocketFiles();
439 }
440 
441 void ZMQEventProcessor::processCore(const PathPtr& startPath, const ModulePtrList& modulePathList, long maxEvent,
442  bool isInputProcess, bool isWorkerProcess, bool isOutputProcess)
443 {
444  // bool firstRound = true;
445 
446  DataStore::Instance().setInitializeActive(false);
447  m_moduleList = modulePathList;
448 
449  //Remember the previous event meta data, and identify end of data meta data
450  m_previousEventMetaData.setEndOfData(); //invalid start state
451 
452  const bool collectStats = !Environment::Instance().getNoStats();
453 
454  //Loop over the events
455  long currEvent = 0;
456  bool endProcess = false;
457  while (!endProcess) {
458  if (collectStats)
459  m_processStatisticsPtr->startGlobal();
460 
461  // B2INFO ( "processCore:: currEvent = " << currEvent );
462 
463  PathIterator moduleIter(startPath);
464 
465  if (isInputProcess) {
466  endProcess = ZMQEventProcessor::processEvent(moduleIter, isInputProcess && currEvent == 0);
467  } else if (isWorkerProcess) {
468  endProcess = ZMQEventProcessor::processEvent(moduleIter, false,
469  isWorkerProcess && currEvent == 0 && Environment::Instance().getZMQDAQEnvironment());
470  } else if (isOutputProcess) {
471  endProcess = ZMQEventProcessor::processEvent(moduleIter, false, false,
472  isOutputProcess && currEvent == 0 && Environment::Instance().getZMQDAQEnvironment());
473  } else {
474  B2INFO("processCore : should not come here. Specified path is invalid");
475  return;
476  }
477 
478  // Original code
479  // endProcess = ZMQEventProcessor::processEvent(moduleIter, isInputProcess && currEvent == 0);
480 
481  //Delete event related data in DataStore
482  DataStore::Instance().invalidateData(DataStore::c_Event);
483 
484  currEvent++;
485  if ((maxEvent > 0) && (currEvent >= maxEvent)) endProcess = true;
486  if (collectStats)
487  m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_Event);
488 
489  // firstRound = false;
490 
491  // B2INFO ( "processCore :: event processed" );
492 
493  } //end event loop
494 
495  //End last run
496  m_eventMetaDataPtr.create();
497  B2INFO("processCore : End Last Run. calling processEndRun()");
498  processEndRun();
499 }
500 
501 
502 bool ZMQEventProcessor::processEvent(PathIterator moduleIter, bool skipMasterModule, bool WorkerPath, bool OutputPath)
503 {
504  double time = Utils::getClock() / Unit::s;
505  if (time > m_lastMetadataUpdate + m_metadataUpdateInterval) {
506  MetadataService::Instance().addBasf2Status("running event loop");
507  m_lastMetadataUpdate = time;
508  }
509 
510  const bool collectStats = !Environment::Instance().getNoStats();
511 
512  while (!moduleIter.isDone()) {
513  Module* module = moduleIter.get();
514  // B2INFO ("Starting event of " << module->getName() );
515 
516  // run the module ... unless we don't want to
517  if (module != m_master) {
518  callEvent(module);
519  // B2INFO ( "not master. callEvent" );
520  // B2INFO ( "ZMQEventProcessor :: " <<module->getName() << " called. Not master" );
521  } else if (!skipMasterModule) {
522  callEvent(module);
523  // B2INFO ( "master but not skipModule. callEvent");
524  // B2INFO ( "ZMQEventProcessor :: " <<module->getName() << " called. Not skipMasterModule" );
525  } else
526  B2INFO("Skipping execution of module " << module->getName());
527 
528  if (!m_eventMetaDataPtr) {
529  // B2INFO ( "No event metadata....." );
530  return false;
531  }
532 
533  //Check for end of data
534  if (m_eventMetaDataPtr->isEndOfData()) {
535  // Immediately leave the loop and terminate (true)
536  B2INFO("isEndOfData. Return");
537  return true;
538  }
539 
540  //Handle EventMetaData changes by master module
541  if (module == m_master && !skipMasterModule) {
542 
543  //initialize random number state for the event
544  RandomNumbers::initializeEvent();
545 
546  // Worker Path
547  if (WorkerPath) {
548  B2INFO("Worker Path and First Event!");
549  if (Environment::Instance().isZMQDAQFirstEvent(m_eventMetaDataPtr->getExperiment(), m_eventMetaDataPtr->getRun())) {
550  // if ( m_eventMetaDataPtr->getExperiment() == Environment::Instance().getZMQDAQFirstEventExp() &&
551  // m_eventMetaDataPtr->getRun() == Environment::Instance().getZMQDAQFirstEventRun() ) {
552  B2INFO("Worker path processing for ZMQDAQ first event.....Skip to the end of path");
553  B2INFO(" --> exp = " << m_eventMetaDataPtr->getExperiment() << " run = " << m_eventMetaDataPtr->getRun());
554  while (true) {
555  module = moduleIter.get();
556  // B2INFO ( "Module in the path = " << module->getName() );
557  if (module->getName() == "ZMQTxWorker") break;
558  moduleIter.next();
559  }
560  // B2INFO ( "ZMQTxWorker will be called" );
561  continue;
562  }
563  }
564 
565  // Check for EndOfRun
566  if (!WorkerPath && !OutputPath) {
567  if (m_eventMetaDataPtr->isEndOfRun()) {
568  B2INFO("===> EndOfRun : calling processEndRun(); isEndOfRun = " << m_eventMetaDataPtr->isEndOfRun());
569  processEndRun();
570  // Store the current event meta data for the next round
571  m_previousEventMetaData = *m_eventMetaDataPtr;
572  // Leave this event, but not the full processing (false)
573  return false;
574  } else if (m_previousEventMetaData.isEndOfData() || m_previousEventMetaData.isEndOfRun()) {
575  if (m_eventMetaDataPtr->getRun() == m_previousEventMetaData.getRun()) {
576  B2INFO("===> EndOfData : ----> Run change request to the same run!!! Skip this event.");
577  return false;
578  }
579  B2INFO("===> EndOfData : calling processBeginRun(); isEndOfData = " << m_previousEventMetaData.isEndOfData() <<
580  " isEndOfRun = " << m_previousEventMetaData.isEndOfRun());
581  B2INFO("--> cur run = " << m_eventMetaDataPtr->getRun() << " <- prev run = " << m_previousEventMetaData.getRun());
582  B2INFO("--> cur evt = " << m_eventMetaDataPtr->getEvent() << " <- prev evt = " << m_previousEventMetaData.getEvent());
583  processBeginRun();
584  m_previousEventMetaData = *m_eventMetaDataPtr;
585  }
586 
587  //Check for a change of the run (should not come here)
588  else {
589  const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
590  (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
591  const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
592  and not m_previousEventMetaData.isEndOfRun();
593  // if (runChangedWithoutNotice && !g_first_round) {
594  if (runChangedWithoutNotice) {
595  if (collectStats)
596  m_processStatisticsPtr->suspendGlobal();
597 
598  B2INFO("===> Run Change (possibly offline) : calling processEndRun() and processBeginRun()");
599  B2INFO("--> cur run = " << m_eventMetaDataPtr->getRun() << " <- prev run = " << m_previousEventMetaData.getRun());
600  B2INFO("--> cur evt = " << m_eventMetaDataPtr->getEvent() << " <- prev evt = " << m_previousEventMetaData.getEvent());
601  B2INFO("--> runChanged = " << runChanged << " runChangedWithoutNotice = " << runChangedWithoutNotice);
602 
603  processEndRun();
604  processBeginRun();
605 
606  if (collectStats)
607  m_processStatisticsPtr->resumeGlobal();
608  }
609  }
610  m_previousEventMetaData = *m_eventMetaDataPtr;
611  } else
612  B2INFO("Skipping begin/end run processing");
613 
614  //make sure we use the event dependent generator again
615  RandomNumbers::useEventDependent();
616 
617  DBStore::Instance().updateEvent();
618 
619  } else if (!WorkerPath && !OutputPath) {
620  //Check for a second master module. Cannot do this if we skipped the
621  //master module as the EventMetaData is probably set before we call this
622  //function
623  if (!skipMasterModule && m_eventMetaDataPtr &&
624  (*m_eventMetaDataPtr != m_previousEventMetaData)) {
625  B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and " << module->getName());
626  }
627  }
628 
629  if (g_signalReceived != 0) {
630  throw StoppedBySignalException(g_signalReceived);
631  }
632 
633  //Check for the module conditions, evaluate them and if one is true switch to the new path
634  if (module->evalCondition()) {
635  PathPtr condPath = module->getConditionPath();
636  //continue with parent Path after condition path is executed?
637  if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
638  moduleIter = PathIterator(condPath, moduleIter);
639  } else {
640  moduleIter = PathIterator(condPath);
641  }
642  } else {
643  moduleIter.next();
644  }
645  } //end module loop
646  return false;
647 }
648 
649 void ZMQEventProcessor::processBeginRun(bool skipDB)
650 {
651  MetadataService::Instance().addBasf2Status("beginning run");
652 
653  m_inRun = true;
654  // auto dbsession = Database::Instance().createScopedUpdateSession();
655 
656  LogSystem& logSystem = LogSystem::Instance();
657  m_processStatisticsPtr->startGlobal();
658 
659  if (!skipDB) DBStore::Instance().update();
660 
661  //initialize random generator for end run
662  RandomNumbers::initializeBeginRun();
663 
664  for (const ModulePtr& modPtr : m_moduleList) {
665  Module* module = modPtr.get();
666 
667  //Set the module dependent log level
668  logSystem.updateModule(&(module->getLogConfig()), module->getName());
669 
670  //Do beginRun() call
671  m_processStatisticsPtr->startModule();
672  // CALL_MODULE(module, beginRun);
673  module->beginRun();
674  m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_BeginRun);
675 
676  //Set the global log level
677  logSystem.updateModule(nullptr);
678  }
679 
680  m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_BeginRun);
681 }
682 
683 
684 void ZMQEventProcessor::processEndRun()
685 {
686  MetadataService::Instance().addBasf2Status("ending run");
687 
688  if (!m_inRun)
689  return;
690  m_inRun = false;
691 
692  LogSystem& logSystem = LogSystem::Instance();
693  m_processStatisticsPtr->startGlobal();
694 
695  const EventMetaData newEventMetaData = *m_eventMetaDataPtr;
696  // *m_eventMetaDataPtr = m_previousEventMetaData;
697 
698  //initialize random generator for end run
699  RandomNumbers::initializeEndRun();
700 
701  for (const ModulePtr& modPtr : m_moduleList) {
702  Module* module = modPtr.get();
703 
704  //Set the module dependent log level
705  logSystem.updateModule(&(module->getLogConfig()), module->getName());
706 
707  //Do endRun() call
708  m_processStatisticsPtr->startModule();
709  // CALL_MODULE(module, endRun);
710  module->endRun();
711  m_processStatisticsPtr->stopModule(module, ModuleStatistics::c_EndRun);
712 
713  //Set the global log level
714  logSystem.updateModule(nullptr);
715  }
716  *m_eventMetaDataPtr = newEventMetaData;
717 
718  m_processStatisticsPtr->stopGlobal(ModuleStatistics::c_EndRun);
719 }
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
Exception thrown when execution is stopped by a signal.
Class for logging debug, info and error messages.
Definition: LogSystem.h:46
void updateModule(const LogConfig *moduleLogConfig=nullptr, const std::string &moduleName="")
Sets the log configuration to the given module log configuration and sets the module name This method...
Definition: LogSystem.h:191
Base class for Modules.
Definition: Module.h:72
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:26
void next()
increment.
Definition: PathIterator.h:49
bool isDone() const
Are we finished iterating?
Definition: PathIterator.h:72
Module * get() const
dereference.
Definition: PathIterator.h:75
Helper class for data store serialization.
Definition: StreamHelper.h:23
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
Definition: ZMQClient.h:22
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
This class provides the core event processing loop for parallel processing with ZMQ.
void cleanup()
clean up IPC resources (should only be called in one process).
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:28
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:40
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:584
@ c_sub
Multicast subscribe socket.
@ c_control
Control socket.
@ c_pub
Multicast publish socket.
@ c_output
Output socket.
Abstract base class for different kinds of events.