Belle II Software  release-05-01-25
ZMQEventProcessor.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2018 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Anselm Baur, Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 
11 #include <framework/pcore/ProcHelper.h>
12 #include <framework/pcore/GlobalProcHandler.h>
13 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
14 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
15 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
16 #include <framework/pcore/PathUtils.h>
17 
18 #include <framework/pcore/ZMQEventProcessor.h>
19 #include <framework/pcore/DataStoreStreamer.h>
20 #include <framework/pcore/RbTuple.h>
21 
22 #include <framework/core/Environment.h>
23 #include <framework/logging/LogSystem.h>
24 
25 #include <TROOT.h>
26 
27 #include <sys/stat.h>
28 
29 #include <csignal>
30 #include <fstream>
31 
32 using namespace std;
33 using namespace Belle2;
34 
35 namespace {
42  static int g_signalReceived = 0;
44 
46  static ZMQEventProcessor* g_eventProcessorForSignalHandling = nullptr;
47 
48  static void cleanupAndRaiseSignal(int signalNumber)
49  {
50  if (g_eventProcessorForSignalHandling) {
51  g_eventProcessorForSignalHandling->cleanup();
52  }
53  // uninstall current handler and call default one.
54  signal(signalNumber, SIG_DFL);
55  raise(signalNumber);
56  }
57 
58  static void storeSignal(int signalNumber)
59  {
60  if (signalNumber == SIGINT) {
61  EventProcessor::writeToStdErr("\nStopping basf2 gracefully...\n");
62  }
63 
64  // We do not want to remove the first signal
65  if (g_signalReceived == 0) {
66  g_signalReceived = signalNumber;
67  }
68  }
69 
71  std::string g_socketAddress = "";
72 
73  void deleteSocketFiles()
74  {
75  if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
76  return;
77  }
78 
79  const std::vector<ZMQAddressType> socketAddressList = {ZMQAddressType::c_input, ZMQAddressType::c_output, ZMQAddressType::c_pub, ZMQAddressType::c_sub, ZMQAddressType::c_control};
80  const auto seperatorPos = g_socketAddress.find("://");
81 
82  if (seperatorPos == std::string::npos or seperatorPos + 3 >= g_socketAddress.size()) {
83  return;
84  }
85 
86  const std::string filename(g_socketAddress.substr(seperatorPos + 3));
87 
88  struct stat buffer;
89  for (const auto socketAdressType : socketAddressList) {
90  const std::string socketAddress(ZMQAddressUtils::getSocketAddress(filename, socketAdressType));
91  if (stat(socketAddress.c_str(), &buffer) == 0) {
92  remove(socketAddress.c_str());
93  }
94  }
95  }
96 } // namespace
97 
98 ZMQEventProcessor::ZMQEventProcessor()
99 {
100  B2ASSERT("You are having two instances of the ZMQEventProcessor running! This is not possible",
101  not g_eventProcessorForSignalHandling);
102  g_eventProcessorForSignalHandling = this;
103 
104  // Make sure to remove the sockets
105  g_socketAddress = Environment::Instance().getZMQSocketAddress();
106  std::atexit(deleteSocketFiles);
107 }
108 
109 ZMQEventProcessor::~ZMQEventProcessor()
110 {
111  cleanup();
112  g_eventProcessorForSignalHandling = nullptr;
113 }
114 
115 void ZMQEventProcessor::process(const PathPtr& path, long maxEvent)
116 {
117  // Concerning signal handling:
118  // * During the initialization, we just raise the signal without doing any cleanup etc.
119  // * During the event execution, we will not allow for any signal in all processes except the parent process.
120  // Here, we catch sigint and clean up the processes AND WHAT DO WE DO IN THE OTHER CASES?
121  // * During cleanup, we will just ignore sigint, but the rest will be raised
122 
123  if (path->isEmpty()) {
124  return;
125  }
126 
127  const int numProcesses = Environment::Instance().getNumberProcesses();
128  if (numProcesses == 0) {
129  B2FATAL("ZMQEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
130  }
131 
132  // Split the path into input, main and output. A nullptr means, the path should not be used
133  PathPtr inputPath, mainPath, outputPath;
134  std::tie(inputPath, mainPath, outputPath) = PathUtils::splitPath(path);
135  const ModulePtr& histogramManager = PathUtils::getHistogramManager(inputPath, mainPath, outputPath);
136 
137  if (not mainPath or mainPath->isEmpty()) {
138  B2WARNING("Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
139  EventProcessor::process(path, maxEvent);
140  return;
141  }
142 
143  // inserts Rx/Tx modules into path (sets up IPC structures)
144  const ModulePtrList& moduleList = PathUtils::preparePaths(inputPath, mainPath, outputPath);
145 
146  B2DEBUG(10, "Initialisation phase");
147  // Run the initialization of the modules and the histogram manager
148  initialize(moduleList, histogramManager);
149 
150  B2DEBUG(10, "Main phase");
151  // The main part: fork into the different processes and run!
152  const ModulePtrList& terminateGlobally = PathUtils::getTerminateGloballyModules(moduleList);
153  forkAndRun(maxEvent, inputPath, mainPath, outputPath, terminateGlobally);
154 
155  B2DEBUG(10, "Terminate phase");
156  installMainSignalHandlers(cleanupAndRaiseSignal);
157  // Run the final termination and cleanup with error check
158  terminateAndCleanup(histogramManager);
159 }
160 
161 void ZMQEventProcessor::initialize(const ModulePtrList& moduleList, const ModulePtr& histogramManager)
162 {
163  if (histogramManager) {
164  histogramManager->initialize();
165  }
166  // from now on the datastore is available
167  processInitialize(moduleList, true);
168 
169  // Don't start processing in case of no master module
170  if (!m_master) {
171  B2ERROR("There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
172  }
173 
174  // Check if errors appeared. If yes, don't start the event processing.
175  int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
176  if (numLogError != 0) {
177  B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
178  }
179 
180  // TODO: I do not really understand what is going on here...
186  // disable ROOT's management of TFiles
187  // clear list, but don't actually delete the objects
188  gROOT->GetListOfFiles()->Clear("nodelete");
189 }
190 
191 void ZMQEventProcessor::terminateAndCleanup(const ModulePtr& histogramManager)
192 {
193  cleanup();
194 
195  if (histogramManager) {
196  B2INFO("HistoManager:: adding histogram files");
197  RbTupleManager::Instance().hadd();
198  }
199 
200  // did anything bad happen?
201  if (g_signalReceived) {
202  if (g_signalReceived == SIGINT) {
203  B2RESULT("Processing aborted via signal " << g_signalReceived <<
204  ", terminating. Output files have been closed safely and should be readable.");
205  } else {
206  B2ERROR("Processing aborted via signal " << g_signalReceived <<
207  ", terminating. Output files have been closed safely and should be readable.");
208  }
209  // re-raise the signal
210  installSignalHandler(g_signalReceived, SIG_DFL);
211  raise(g_signalReceived);
212  }
213 }
214 
215 void ZMQEventProcessor::runInput(const PathPtr& inputPath, const ModulePtrList& terminateGlobally, long maxEvent)
216 {
217  if (not inputPath or inputPath->isEmpty()) {
218  return;
219  }
220 
221  if (not GlobalProcHandler::startInputProcess()) {
222  // This is not the input process, clean up datastore to not contain the first event
223  DataStore::Instance().invalidateData(DataStore::c_Event);
224  return;
225  }
226 
227  // The default will be to not do anything on signals...
228  installMainSignalHandlers(SIG_IGN);
229 
230  m_processMonitor.reset();
231  DataStoreStreamer::removeSideEffects();
232 
233  processPath(inputPath, terminateGlobally, maxEvent);
234  B2DEBUG(10, "Finished an input process");
235  exit(0);
236 }
237 
238 void ZMQEventProcessor::runOutput(const PathPtr& outputPath, const ModulePtrList& terminateGlobally, long maxEvent)
239 {
240  const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
241  const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
242  const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
243 
244  if (not outputPath or outputPath->isEmpty()) {
245  return;
246  }
247 
248  if (not GlobalProcHandler::startOutputProcess()) {
249  return;
250  }
251 
252  // The default will be to not do anything on signals...
253  installMainSignalHandlers(SIG_IGN);
254 
255  m_processMonitor.reset();
256 
257  // Set the rx module as main module
258  m_master = outputPath->getModules().begin()->get();
259 
260  processPath(outputPath, terminateGlobally, maxEvent);
261 
262  // Send the statistics to the process monitor
263  StreamHelper streamer;
264  ZMQClient zmqClient;
265 
266  // TODO: true?
267  streamer.initialize(0, true);
268  zmqClient.initialize(pubSocketAddress, subSocketAddress);
269 
270  // TODO: make sure to only send statistics!
271  const auto& evtMessage = streamer.stream();
272  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_statisticMessage, evtMessage);
273  zmqClient.publish(std::move(message));
274 
275  B2DEBUG(10, "Finished an output process");
276  exit(0);
277 }
278 void ZMQEventProcessor::runWorker(unsigned int numProcesses, const PathPtr& inputPath, const PathPtr& mainPath,
279  const ModulePtrList& terminateGlobally, long maxEvent)
280 {
281  if (numProcesses == 0) {
282  return;
283  }
284 
285  if (not GlobalProcHandler::startWorkerProcesses(numProcesses)) {
286  // Make sure the worker process is running until we go on
287  m_processMonitor.waitForRunningWorker(60);
288  return;
289  }
290 
291  // The default will be to not do anything on signals...
292  installMainSignalHandlers(SIG_IGN);
293 
294  if (inputPath and not inputPath->isEmpty()) {
295  // set Rx as master
296  m_master = mainPath->getModules().begin()->get();
297  }
298 
299  m_processMonitor.reset();
300  DataStoreStreamer::removeSideEffects();
301 
302  processPath(mainPath, terminateGlobally, maxEvent);
303  B2DEBUG(10, "Finished a worker process");
304  exit(0);
305 }
306 
307 void ZMQEventProcessor::processPath(const PathPtr& localPath, const ModulePtrList& terminateGlobally, long maxEvent)
308 {
309  ModulePtrList localModules = localPath->buildModulePathList();
310  maxEvent = getMaximumEventNumber(maxEvent);
311  // we are not using the default signal handler, so the processCore can not throw any exception because if sigint...
312  processCore(localPath, localModules, maxEvent, GlobalProcHandler::isProcess(ProcType::c_Input));
313 
314  B2DEBUG(10, "terminate process...");
315  PathUtils::prependModulesIfNotPresent(&localModules, terminateGlobally);
316  processTerminate(localModules);
317 }
318 
319 
320 void ZMQEventProcessor::runMonitoring(const PathPtr& inputPath, const PathPtr& mainPath, const ModulePtrList& terminateGlobally,
321  long maxEvent)
322 {
323  if (not GlobalProcHandler::startMonitoringProcess()) {
324  return;
325  }
326 
327  const auto& environment = Environment::Instance();
328 
329  B2DEBUG(10, "Will now start process monitor...");
330  const int numProcesses = environment.getNumberProcesses();
331  m_processMonitor.initialize(numProcesses);
332 
333  // Make sure the input process is running until we go on
334  m_processMonitor.waitForRunningInput(60);
335  if (m_processMonitor.hasEnded()) {
336  return;
337  }
338  // Make sure the output process is running until we go on
339  m_processMonitor.waitForRunningOutput(60);
340  if (m_processMonitor.hasEnded()) {
341  return;
342  }
343 
344  installMainSignalHandlers(storeSignal);
345 
346  // at least start the number of workers requested
347  runWorker(m_processMonitor.needMoreWorkers(), inputPath, mainPath, terminateGlobally, maxEvent);
348 
349  const auto& restartFailedWorkers = environment.getZMQRestartFailedWorkers();
350  const auto& failOnFailedWorkers = environment.getZMQFailOnFailedWorkers();
351 
352  B2DEBUG(10, "Will now start main loop...");
353  while (true) {
354  // check multicast for messages and kill workers if requested
355  m_processMonitor.checkMulticast();
356  // check the child processes, if one has died
357  m_processMonitor.checkChildProcesses();
358  // check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
359  m_processMonitor.checkSignals(g_signalReceived);
360 
361  // If we have received a SIGINT signal or the last process is gone, we can end smoothly
362  if (m_processMonitor.hasEnded()) {
363  break;
364  }
365 
366  // Test if we need more workers
367  const unsigned int neededWorkers = m_processMonitor.needMoreWorkers();
368  if (neededWorkers > 0) {
369  if (restartFailedWorkers) {
370  runWorker(neededWorkers, inputPath, mainPath, terminateGlobally, maxEvent);
371  } else if (failOnFailedWorkers) {
372  B2ERROR("A worker failed. Will try to end the process smoothly now.");
373  break;
374  } else if (not m_processMonitor.hasWorkers()) {
375  B2WARNING("All workers have died and you did not request to restart them. Going down now.");
376  break;
377  }
378  }
379  }
380 
381  B2DEBUG(10, "Finished the monitoring process");
382 }
383 
384 void ZMQEventProcessor::forkAndRun(long maxEvent, const PathPtr& inputPath, const PathPtr& mainPath, const PathPtr& outputPath,
385  const ModulePtrList& terminateGlobally)
386 {
387  const int numProcesses = Environment::Instance().getNumberProcesses();
388  GlobalProcHandler::initialize(numProcesses);
389 
390  const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
391 
392  const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
393  const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
394  const auto controlSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_control));
395 
396  // We catch all signals and store them into a variable. This is used during the main loop then.
397  // From now on, we have to make sure to clean up behind us
398  installMainSignalHandlers(cleanupAndRaiseSignal);
399  m_processMonitor.subscribe(pubSocketAddress, subSocketAddress, controlSocketAddress);
400 
401  B2DEBUG(10, "Starting input process...");
402  runInput(inputPath, terminateGlobally, maxEvent);
403  B2DEBUG(10, "Starting output process...");
404  runOutput(outputPath, terminateGlobally, maxEvent);
405 
406  B2DEBUG(10, "Starting monitoring process...");
407  runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent);
408 }
409 
410 void ZMQEventProcessor::cleanup()
411 {
412  if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
413  B2DEBUG(10, "Not running cleanup, as I am in process type " << GlobalProcHandler::getProcessName());
414  return;
415  }
416  m_processMonitor.killProcesses(5);
417  m_processMonitor.terminate();
418 
419  deleteSocketFiles();
420 }
Belle2::StreamHelper::stream
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:31
Belle2::StreamHelper::initialize
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:20
Belle2::c_output
@ c_output
Output socket.
Definition: ZMQAddressUtils.h:30
Belle2::c_sub
@ c_sub
Multicast subscribe socket.
Definition: ZMQAddressUtils.h:32
Belle2::ZMQEventProcessor
This class provides the core event processing loop for parallel processing with ZMQ.
Definition: ZMQEventProcessor.h:33
Belle2::StreamHelper
Helper class for data store serialization.
Definition: StreamHelper.h:33
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ModulePtrList
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:586
Belle2::PathPtr
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:30
Belle2::ZMQEventProcessor::cleanup
void cleanup()
clean up IPC resources (should only be called in one process).
Definition: ZMQEventProcessor.cc:410
Belle2::ZMQClient::publish
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:62
Belle2::ModulePtr
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:42
Belle2::ZMQClient::initialize
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketName, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:55
alignment.constraints_generator.filename
filename
File name.
Definition: constraints_generator.py:224
Belle2::c_pub
@ c_pub
Multicast publish socket.
Definition: ZMQAddressUtils.h:31
Belle2::ZMQClient
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
Definition: ZMQClient.h:32
Belle2::c_control
@ c_control
Control socket.
Definition: ZMQAddressUtils.h:33