Belle II Software  release-06-00-14
ZMQEventProcessor.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/ProcHelper.h>
10 #include <framework/pcore/GlobalProcHandler.h>
11 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
12 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
13 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
14 #include <framework/pcore/PathUtils.h>
15 
16 #include <framework/pcore/ZMQEventProcessor.h>
17 #include <framework/pcore/DataStoreStreamer.h>
18 #include <framework/pcore/RbTuple.h>
19 
20 #include <framework/core/Environment.h>
21 #include <framework/logging/LogSystem.h>
22 
23 #include <TROOT.h>
24 
25 #include <sys/stat.h>
26 
27 #include <csignal>
28 #include <fstream>
29 
30 using namespace std;
31 using namespace Belle2;
32 
33 namespace {
41  static int g_signalReceived = 0;
42 
44  static ZMQEventProcessor* g_eventProcessorForSignalHandling = nullptr;
45 
46  static void cleanupAndRaiseSignal(int signalNumber)
47  {
48  if (g_eventProcessorForSignalHandling) {
49  g_eventProcessorForSignalHandling->cleanup();
50  }
51  // uninstall current handler and call default one.
52  signal(signalNumber, SIG_DFL);
53  raise(signalNumber);
54  }
55 
56  static void storeSignal(int signalNumber)
57  {
58  if (signalNumber == SIGINT) {
59  EventProcessor::writeToStdErr("\nStopping basf2 gracefully...\n");
60  }
61 
62  // We do not want to remove the first signal
63  if (g_signalReceived == 0) {
64  g_signalReceived = signalNumber;
65  }
66  }
67 
/// ZMQ socket address stored by the ZMQEventProcessor constructor and read by deleteSocketFiles(); empty until set.
std::string g_socketAddress;
70 
71  void deleteSocketFiles()
72  {
73  if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
74  return;
75  }
76 
77  const std::vector<ZMQAddressType> socketAddressList = {ZMQAddressType::c_input, ZMQAddressType::c_output, ZMQAddressType::c_pub, ZMQAddressType::c_sub, ZMQAddressType::c_control};
78  const auto seperatorPos = g_socketAddress.find("://");
79 
80  if (seperatorPos == std::string::npos or seperatorPos + 3 >= g_socketAddress.size()) {
81  return;
82  }
83 
84  const std::string filename(g_socketAddress.substr(seperatorPos + 3));
85 
86  struct stat buffer;
87  for (const auto socketAdressType : socketAddressList) {
88  const std::string socketAddress(ZMQAddressUtils::getSocketAddress(filename, socketAdressType));
89  if (stat(socketAddress.c_str(), &buffer) == 0) {
90  remove(socketAddress.c_str());
91  }
92  }
93  }
94 } // namespace
95 
96 ZMQEventProcessor::ZMQEventProcessor()
97 {
98  B2ASSERT("You are having two instances of the ZMQEventProcessor running! This is not possible",
99  not g_eventProcessorForSignalHandling);
100  g_eventProcessorForSignalHandling = this;
101 
102  // Make sure to remove the sockets
103  g_socketAddress = Environment::Instance().getZMQSocketAddress();
104  std::atexit(deleteSocketFiles);
105 }
106 
107 ZMQEventProcessor::~ZMQEventProcessor()
108 {
109  cleanup();
110  g_eventProcessorForSignalHandling = nullptr;
111 }
112 
113 void ZMQEventProcessor::process(const PathPtr& path, long maxEvent)
114 {
115  // Concerning signal handling:
116  // * During the initialization, we just raise the signal without doing any cleanup etc.
117  // * During the event execution, we will not allow for any signal in all processes except the parent process.
118  // Here, we catch sigint and clean up the processes AND WHAT DO WE DO IN THE OTHER CASES?
119  // * During cleanup, we will just ignore sigint, but the rest will be raised
120 
121  if (path->isEmpty()) {
122  return;
123  }
124 
125  const int numProcesses = Environment::Instance().getNumberProcesses();
126  if (numProcesses == 0) {
127  B2FATAL("ZMQEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
128  }
129 
130  // Split the path into input, main and output. A nullptr means, the path should not be used
131  PathPtr inputPath, mainPath, outputPath;
132  std::tie(inputPath, mainPath, outputPath) = PathUtils::splitPath(path);
133  const ModulePtr& histogramManager = PathUtils::getHistogramManager(inputPath, mainPath, outputPath);
134 
135  if (not mainPath or mainPath->isEmpty()) {
136  B2WARNING("Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
137  EventProcessor::process(path, maxEvent);
138  return;
139  }
140 
141  // inserts Rx/Tx modules into path (sets up IPC structures)
142  const ModulePtrList& moduleList = PathUtils::preparePaths(inputPath, mainPath, outputPath);
143 
144  B2DEBUG(10, "Initialisation phase");
145  // Run the initialization of the modules and the histogram manager
146  initialize(moduleList, histogramManager);
147 
148  B2DEBUG(10, "Main phase");
149  // The main part: fork into the different processes and run!
150  const ModulePtrList& terminateGlobally = PathUtils::getTerminateGloballyModules(moduleList);
151  forkAndRun(maxEvent, inputPath, mainPath, outputPath, terminateGlobally);
152 
153  B2DEBUG(10, "Terminate phase");
154  installMainSignalHandlers(cleanupAndRaiseSignal);
155  // Run the final termination and cleanup with error check
156  terminateAndCleanup(histogramManager);
157 }
158 
159 void ZMQEventProcessor::initialize(const ModulePtrList& moduleList, const ModulePtr& histogramManager)
160 {
161  if (histogramManager) {
162  histogramManager->initialize();
163  }
164  // from now on the datastore is available
165  processInitialize(moduleList, true);
166 
167  // Don't start processing in case of no master module
168  if (!m_master) {
169  B2ERROR("There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
170  }
171 
172  // Check if errors appeared. If yes, don't start the event processing.
173  int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
174  if (numLogError != 0) {
175  B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
176  }
177 
178  // TODO: I do not really understand what is going on here...
184  // disable ROOT's management of TFiles
185  // clear list, but don't actually delete the objects
186  gROOT->GetListOfFiles()->Clear("nodelete");
187 }
188 
189 void ZMQEventProcessor::terminateAndCleanup(const ModulePtr& histogramManager)
190 {
191  cleanup();
192 
193  if (histogramManager) {
194  B2INFO("HistoManager:: adding histogram files");
195  RbTupleManager::Instance().hadd();
196  }
197 
198  // did anything bad happen?
199  if (g_signalReceived) {
200  if (g_signalReceived == SIGINT) {
201  B2RESULT("Processing aborted via signal " << g_signalReceived <<
202  ", terminating. Output files have been closed safely and should be readable.");
203  } else {
204  B2ERROR("Processing aborted via signal " << g_signalReceived <<
205  ", terminating. Output files have been closed safely and should be readable.");
206  }
207  // re-raise the signal
208  installSignalHandler(g_signalReceived, SIG_DFL);
209  raise(g_signalReceived);
210  }
211 }
212 
213 void ZMQEventProcessor::runInput(const PathPtr& inputPath, const ModulePtrList& terminateGlobally, long maxEvent)
214 {
215  if (not inputPath or inputPath->isEmpty()) {
216  return;
217  }
218 
219  if (not GlobalProcHandler::startInputProcess()) {
220  // This is not the input process, clean up datastore to not contain the first event
221  DataStore::Instance().invalidateData(DataStore::c_Event);
222  return;
223  }
224 
225  // The default will be to not do anything on signals...
226  installMainSignalHandlers(SIG_IGN);
227 
228  m_processMonitor.reset();
229  DataStoreStreamer::removeSideEffects();
230 
231  processPath(inputPath, terminateGlobally, maxEvent);
232  B2DEBUG(10, "Finished an input process");
233  exit(0);
234 }
235 
236 void ZMQEventProcessor::runOutput(const PathPtr& outputPath, const ModulePtrList& terminateGlobally, long maxEvent)
237 {
238  const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
239  const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
240  const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
241 
242  if (not outputPath or outputPath->isEmpty()) {
243  return;
244  }
245 
246  if (not GlobalProcHandler::startOutputProcess()) {
247  return;
248  }
249 
250  // The default will be to not do anything on signals...
251  installMainSignalHandlers(SIG_IGN);
252 
253  m_processMonitor.reset();
254 
255  // Set the rx module as main module
256  m_master = outputPath->getModules().begin()->get();
257 
258  processPath(outputPath, terminateGlobally, maxEvent);
259 
260  // Send the statistics to the process monitor
261  StreamHelper streamer;
262  ZMQClient zmqClient;
263 
264  // TODO: true?
265  streamer.initialize(0, true);
266  zmqClient.initialize(pubSocketAddress, subSocketAddress);
267 
268  // TODO: make sure to only send statistics!
269  const auto& evtMessage = streamer.stream();
270  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_statisticMessage, evtMessage);
271  zmqClient.publish(std::move(message));
272 
273  B2DEBUG(10, "Finished an output process");
274  exit(0);
275 }
276 void ZMQEventProcessor::runWorker(unsigned int numProcesses, const PathPtr& inputPath, const PathPtr& mainPath,
277  const ModulePtrList& terminateGlobally, long maxEvent)
278 {
279  if (numProcesses == 0) {
280  return;
281  }
282 
283  if (not GlobalProcHandler::startWorkerProcesses(numProcesses)) {
284  // Make sure the worker process is running until we go on
285  m_processMonitor.waitForRunningWorker(60);
286  return;
287  }
288 
289  // The default will be to not do anything on signals...
290  installMainSignalHandlers(SIG_IGN);
291 
292  if (inputPath and not inputPath->isEmpty()) {
293  // set Rx as master
294  m_master = mainPath->getModules().begin()->get();
295  }
296 
297  m_processMonitor.reset();
298  DataStoreStreamer::removeSideEffects();
299 
300  processPath(mainPath, terminateGlobally, maxEvent);
301  B2DEBUG(10, "Finished a worker process");
302  exit(0);
303 }
304 
305 void ZMQEventProcessor::processPath(const PathPtr& localPath, const ModulePtrList& terminateGlobally, long maxEvent)
306 {
307  ModulePtrList localModules = localPath->buildModulePathList();
308  maxEvent = getMaximumEventNumber(maxEvent);
309  // we are not using the default signal handler, so the processCore can not throw any exception because if sigint...
310  processCore(localPath, localModules, maxEvent, GlobalProcHandler::isProcess(ProcType::c_Input));
311 
312  B2DEBUG(10, "terminate process...");
313  PathUtils::prependModulesIfNotPresent(&localModules, terminateGlobally);
314  processTerminate(localModules);
315 }
316 
317 
318 void ZMQEventProcessor::runMonitoring(const PathPtr& inputPath, const PathPtr& mainPath, const ModulePtrList& terminateGlobally,
319  long maxEvent)
320 {
321  if (not GlobalProcHandler::startMonitoringProcess()) {
322  return;
323  }
324 
325  const auto& environment = Environment::Instance();
326 
327  B2DEBUG(10, "Will now start process monitor...");
328  const int numProcesses = environment.getNumberProcesses();
329  m_processMonitor.initialize(numProcesses);
330 
331  // Make sure the input process is running until we go on
332  m_processMonitor.waitForRunningInput(60);
333  if (m_processMonitor.hasEnded()) {
334  return;
335  }
336  // Make sure the output process is running until we go on
337  m_processMonitor.waitForRunningOutput(60);
338  if (m_processMonitor.hasEnded()) {
339  return;
340  }
341 
342  installMainSignalHandlers(storeSignal);
343 
344  // at least start the number of workers requested
345  runWorker(m_processMonitor.needMoreWorkers(), inputPath, mainPath, terminateGlobally, maxEvent);
346 
347  const auto& restartFailedWorkers = environment.getZMQRestartFailedWorkers();
348  const auto& failOnFailedWorkers = environment.getZMQFailOnFailedWorkers();
349 
350  B2DEBUG(10, "Will now start main loop...");
351  while (true) {
352  // check multicast for messages and kill workers if requested
353  m_processMonitor.checkMulticast();
354  // check the child processes, if one has died
355  m_processMonitor.checkChildProcesses();
356  // check if we have received any signal from the user or OS. Kill the processes if not SIGINT.
357  m_processMonitor.checkSignals(g_signalReceived);
358 
359  // If we have received a SIGINT signal or the last process is gone, we can end smoothly
360  if (m_processMonitor.hasEnded()) {
361  break;
362  }
363 
364  // Test if we need more workers
365  const unsigned int neededWorkers = m_processMonitor.needMoreWorkers();
366  if (neededWorkers > 0) {
367  if (restartFailedWorkers) {
368  runWorker(neededWorkers, inputPath, mainPath, terminateGlobally, maxEvent);
369  } else if (failOnFailedWorkers) {
370  B2ERROR("A worker failed. Will try to end the process smoothly now.");
371  break;
372  } else if (not m_processMonitor.hasWorkers()) {
373  B2WARNING("All workers have died and you did not request to restart them. Going down now.");
374  break;
375  }
376  }
377  }
378 
379  B2DEBUG(10, "Finished the monitoring process");
380 }
381 
382 void ZMQEventProcessor::forkAndRun(long maxEvent, const PathPtr& inputPath, const PathPtr& mainPath, const PathPtr& outputPath,
383  const ModulePtrList& terminateGlobally)
384 {
385  const int numProcesses = Environment::Instance().getNumberProcesses();
386  GlobalProcHandler::initialize(numProcesses);
387 
388  const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
389 
390  const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
391  const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
392  const auto controlSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_control));
393 
394  // We catch all signals and store them into a variable. This is used during the main loop then.
395  // From now on, we have to make sure to clean up behind us
396  installMainSignalHandlers(cleanupAndRaiseSignal);
397  m_processMonitor.subscribe(pubSocketAddress, subSocketAddress, controlSocketAddress);
398 
399  B2DEBUG(10, "Starting input process...");
400  runInput(inputPath, terminateGlobally, maxEvent);
401  B2DEBUG(10, "Starting output process...");
402  runOutput(outputPath, terminateGlobally, maxEvent);
403 
404  B2DEBUG(10, "Starting monitoring process...");
405  runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent);
406 }
407 
408 void ZMQEventProcessor::cleanup()
409 {
410  if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
411  B2DEBUG(10, "Not running cleanup, as I am in process type " << GlobalProcHandler::getProcessName());
412  return;
413  }
414  m_processMonitor.killProcesses(5);
415  m_processMonitor.terminate();
416 
417  deleteSocketFiles();
418 }
Helper class for data store serialization.
Definition: StreamHelper.h:23
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
Definition: ZMQClient.h:22
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketName, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:52
This class provides the core event processing loop for parallel processing with ZMQ.
void cleanup()
clean up IPC resources (should only be called in one process).
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:28
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:40
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:584
@ c_sub
Multicast subscribe socket.
@ c_control
Multicast control socket.
@ c_pub
Multicast publish socket.
@ c_output
Output socket.
Abstract base class for different kinds of events.