Belle II Software  release-08-01-10
pEventProcessor.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/pEventProcessor.h>
10 #include <framework/pcore/ProcHandler.h>
11 #include <framework/pcore/RingBuffer.h>
12 #include <framework/pcore/RxModule.h>
13 #include <framework/pcore/TxModule.h>
14 #include <framework/pcore/DataStoreStreamer.h>
15 #include <framework/pcore/RbTuple.h>
16 
17 #include <framework/core/ModuleManager.h>
18 #include <framework/core/Environment.h>
19 #include <framework/logging/LogSystem.h>
20 
21 #include <TROOT.h>
22 
23 #include <csignal>
24 #include <memory>
25 
26 
27 using namespace std;
28 using namespace Belle2;
29 
30 namespace {
31  static int gSignalReceived = 0;
32 
33  static pEventProcessor* g_pEventProcessor = nullptr;
34 
35  void cleanupIPC()
36  {
37  if (g_pEventProcessor)
38  g_pEventProcessor->cleanup();
39  }
40  void cleanupAndStop(int sig)
41  {
42  cleanupIPC();
43 
44  //uninstall current handler and call default one.
45  signal(sig, SIG_DFL);
46  raise(sig);
47  }
48 
49  static void parentSignalHandler(int signal)
50  {
51  //signal handlers are called asynchronously, making many standard functions (including output) dangerous
52  if (signal == SIGINT) {
53  g_pEventProcessor->gotSigINT();
54  } else if (signal == SIGTERM or signal == SIGQUIT) {
55  g_pEventProcessor->killRingBuffers();
56  }
57  if (gSignalReceived == 0)
58  gSignalReceived = signal;
59  }
60 }
61 
62 pEventProcessor::pEventProcessor() : EventProcessor(),
63  m_histoman(nullptr)
64 {
65  g_pEventProcessor = this;
66 }
67 
68 
// NOTE(review): the signature line (listing line 69) is absent from this listing;
// presumably this is pEventProcessor::~pEventProcessor() -- confirm against the real source.
// Runs cleanup() and then clears the file-local g_pEventProcessor pointer.
70 {
71  cleanup();
72  g_pEventProcessor = nullptr;
73 }
74 
// NOTE(review): listing lines 75 and 77 are absent; presumably line 75 is the signature of
// pEventProcessor::cleanup() and line 77 opens the block that is closed at listing line 82 -- confirm.
76 {
 // Delete both ring buffers and null the pointers, so running this code again does not delete them twice.
78  delete m_rbin;
79  m_rbin = nullptr;
80  delete m_rbout;
81  m_rbout = nullptr;
82  }
83 }
84 
85 
// NOTE(review): listing lines 86 (signature) and 89 are absent; presumably this is
// pEventProcessor::gotSigINT(), which parentSignalHandler() calls on SIGINT -- confirm.
87 {
 // Prints the stop notice through EventProcessor::writeToStdErr() instead of the logging macros.
88  EventProcessor::writeToStdErr("\nStopping basf2...\n");
90 }
91 
// NOTE(review): the signature line (listing line 92) is absent; presumably this is
// pEventProcessor::killRingBuffers(), which parentSignalHandler() calls on SIGTERM/SIGQUIT -- confirm.
// NOTE(review): m_rbin and m_rbout are dereferenced without a null check, while other code in
// this file sets both to nullptr after deleting them -- verify this cannot run afterwards.
93 {
94  m_rbin->kill();
95  //these might be locked by _this_ process, so we cannot escape
96  m_rbout->kill(); //atomic, so doesn't lock
97 }
98 
// NOTE(review): the signature line (listing line 99) is absent; presumably this is
// pEventProcessor::clearFileList(), which process() calls before forking -- confirm.
// Empties ROOT's global list of files with the "nodelete" option, i.e. the list entries are
// dropped but the file objects themselves are left alone.
100 {
101  //B2WARNING("list of files: " << gROOT->GetListOfFiles()->GetEntries());
102  //clear list, but don't actually delete the objects
103  gROOT->GetListOfFiles()->Clear("nodelete");
104 }
105 
106 
/**
 * Run the given path in parallel-processing mode.
 *
 * Steps visible below: apply the event-count override from Environment, split spath with
 * analyzePath(), fall back to EventProcessor::process() if there is no main (parallel) path,
 * set up the paths with preparePaths(), initialise statistics and modules, start the input,
 * worker and output processes through m_procHandler, run processCore() and processTerminate()
 * on the path belonging to the current process, and finally (output process only) wait for
 * all other processes and call cleanup().
 *
 * NOTE(review): several statements of this function are absent from this listing (gaps in the
 * embedded line numbers at 158, 172, 184, 207, 229, 241 and 283). Each gap is marked below.
 *
 * @param spath    Path to process; the function returns immediately if it contains no modules.
 * @param maxEvent Event limit handed on to processCore() / EventProcessor::process(); the
 *                 override check below treats 0 as "not set".
 */
107 void pEventProcessor::process(const PathPtr& spath, long maxEvent)
108 {
109  if (spath->getModules().size() == 0) return;
110 
111  const int numProcesses = Environment::Instance().getNumberProcesses();
112 
113  //Check whether the number of events was set via command line argument
114  unsigned int numEventsArgument = Environment::Instance().getNumberEventsOverride();
 // The override replaces maxEvent when maxEvent is 0 or larger than the override.
 // NOTE(review): this compares a long with an unsigned int -- check the behaviour for negative maxEvent.
115  if ((numEventsArgument > 0) && ((maxEvent == 0) || (maxEvent > numEventsArgument))) {
116  maxEvent = numEventsArgument;
117  }
118 
119  if (numProcesses == 0)
120  B2FATAL("pEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
121 
122  // 1. Analyze start path and split into parallel paths
123  analyzePath(spath);
124 
125 
126  if (m_inputPath) {
127  B2INFO("Input Path " << m_inputPath->getPathString());
128  }
129  if (m_mainPath) {
 // Short paths are printed in full; longer ones as first module, count of the rest, last module.
130  if (m_mainPath->getModules().size() <= 5) {
131  B2INFO("Main Path " << m_mainPath->getPathString());
132  } else {
 // size() - 2 cannot wrap around here: this branch is only taken when size() > 5.
133  B2INFO("Main Path [" << m_mainPath->getModules().front()->getName() << " -> ... (" << m_mainPath->getModules().size() - 2 <<
134  " further modules) ... -> " << m_mainPath->getModules().back()->getName() << " ]");
135  }
136  }
137  if (m_outputPath) {
138  B2INFO("Output Path " << m_outputPath->getPathString());
139  }
140  if (not m_mainPath) {
141  B2WARNING("Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
142  EventProcessor::process(spath, maxEvent);
143  return;
144  }
145 
 // From here until the handlers are replaced further down, INT/TERM/QUIT run cleanupAndStop().
146  installMainSignalHandlers(cleanupAndStop);
147 
148  //inserts Rx/Tx modules into path (sets up IPC structures)
149  preparePaths();
150 
151  // ensure that we free the IPC resources!
152  atexit(cleanupIPC);
153 
154 
155  //init statistics
156  {
157  m_processStatisticsPtr.registerInDataStore();
 // NOTE(review): listing line 158 is absent here.
159  m_processStatisticsPtr.create();
 // Statistics are initialised for every module of the input, main and output paths combined.
160  Path mergedPath;
161  if (m_inputPath)
162  mergedPath.addPath(m_inputPath);
163  mergedPath.addPath(m_mainPath);
164  if (m_outputPath)
165  mergedPath.addPath(m_outputPath);
166  for (const ModulePtr& module : mergedPath.buildModulePathList())
167  m_processStatisticsPtr->initModule(module.get());
168  }
169 
170  // 2. Initialization
171  ModulePtrList modulelist = spath->buildModulePathList();
 // NOTE(review): listing line 172 is absent; 'initGlobally' used below is presumably declared there.
173  //dump_modules("Initializing globally: ", initGlobally);
174  processInitialize(initGlobally);
175 
176  ModulePtrList terminateGlobally = getModulesWithFlag(modulelist, Module::c_TerminateInAllProcesses);
177 
178  //Don't start processing in case of no master module
179  if (!m_master) {
180  B2ERROR("There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
181  }
182 
183  //Check if errors appeared. If yes, don't start the event processing.
 // NOTE(review): listing line 184 is absent; 'numLogError' is presumably declared there.
185  if (numLogError != 0) {
186  B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
187  }
188 
189  //disable ROOT's management of TFiles
190  clearFileList();
191 
192 
193  //install new signal handlers before forking
194  installMainSignalHandlers(parentSignalHandler);
195 
196  //Path for current process
 // Stays nullptr until this process has been assigned one of the three paths below.
197  PathPtr localPath;
198 
199  m_procHandler = std::make_unique<ProcHandler>(numProcesses);
200 
201  // 3. Fork input path
202  m_procHandler->startInputProcess();
203  if (m_procHandler->isInputProcess()) { // In input process
204  localPath = m_inputPath;
205  } else {
206  // This is not the input path, clean up datastore to not contain the first event
 // NOTE(review): listing line 207 (the statement of this else branch) is absent.
208  }
209 
210  if (localPath == nullptr) { //not forked yet
211  // 5. Fork out worker path (parallel section)
212  m_procHandler->startWorkerProcesses();
213  if (m_procHandler->isWorkerProcess()) {
214  localPath = m_mainPath;
215  m_master = localPath->getModules().begin()->get(); //set Rx as master
216  }
217  }
218 
219  if (localPath == nullptr) { //not forked yet -> this process is the output process
220  m_procHandler->startOutputProcess();
221  if (m_outputPath) {
222  localPath = m_outputPath;
223  m_master = localPath->getModules().begin()->get(); //set Rx as master
224  }
225  }
226 
227 
228  if (!m_procHandler->isOutputProcess()) {
 // NOTE(review): listing line 229 (the statement of this block) is absent.
230  }
231 
232  bool gotSigINT = false;
233  if (localPath != nullptr) {
234  ModulePtrList localModules = localPath->buildModulePathList();
 // Only the modules flagged c_InternalSerializer are initialised again in this process.
235  ModulePtrList procinitmodules = getModulesWithFlag(localModules, Module::c_InternalSerializer);
236  //dump_modules("processInitialize for ", procinitmodules);
237  processInitialize(procinitmodules);
238 
239  //input: handle signals normally, will slowly cascade down
240  if (m_procHandler->isInputProcess())
 // NOTE(review): listing line 241 (the statement controlled by the 'if' above) is absent; as
 // printed, the 'if' would wrongly appear to govern the worker check that follows.
242  //workers will have to ignore the signals, there's no good way to do this safely
243  if (m_procHandler->isWorkerProcess())
244  installMainSignalHandlers(SIG_IGN);
245 
246  try {
247  processCore(localPath, localModules, maxEvent, m_procHandler->isInputProcess());
248  } catch (StoppedBySignalException& e) {
249  if (e.signal != SIGINT) {
250  B2FATAL(e.what());
251  }
252  //in case of SIGINT, we move on to processTerminate() to shut down safely
253  gotSigINT = true;
254  }
 // Modules flagged c_TerminateInAllProcesses are terminated here even if they are not on the local path.
255  prependModulesIfNotPresent(&localModules, terminateGlobally);
256  processTerminate(localModules);
257  }
258 
259  B2INFO(m_procHandler->getProcessName() << " process finished.");
260 
261  //all processes except output stop here
 // After a SIGINT the signal is re-raised with the default handler; otherwise the process exits with 0.
262  if (!m_procHandler->isOutputProcess()) {
263  if (gotSigINT) {
264  installSignalHandler(SIGINT, SIG_DFL);
265  raise(SIGINT);
266  } else {
267  exit(0);
268  }
269  }
270 
271  //output process: do final cleanup
272  m_procHandler->waitForAllProcesses();
273  B2INFO("All processes completed");
274 
275  //finished, disable handler again
276  installSignalHandler(SIGINT, SIG_IGN);
277 
278  cleanup();
279  B2INFO("Global process: completed");
280 
281  if (m_histoman) {
282  B2INFO("HistoManager:: adding histogram files");
 // NOTE(review): listing line 283 (the statement following the log message) is absent.
284  }
285 
286  //did anything bad happen?
 // gSignalReceived holds the first signal recorded by parentSignalHandler(); it is re-raised
 // with the default handler so that the process ends as that signal normally ends it.
287  if (gSignalReceived) {
288  B2ERROR("Processing aborted via signal " << gSignalReceived <<
289  ", terminating. Output files have been closed safely and should be readable.");
290  installSignalHandler(gSignalReceived, SIG_DFL);
291  raise(gSignalReceived);
292  }
293 
294 }
295 
// NOTE(review): the signature line (listing line 296) is absent; presumably this is
// pEventProcessor::analyzePath(const PathPtr& path), which process() calls -- confirm.
// Walks the modules of 'path' once and distributes them over three new paths (input, main,
// output) using the 'stage' counter below; non-empty results are stored in m_inputPath,
// m_mainPath and m_outputPath at the end.
297 {
298  //modules that can be parallelised, but should not go into a parallel section by themselves
299  std::set<std::string> uselessParallelModules({"HistoManager", "Gearbox", "Geometry"});
300 
301  PathPtr inpath(new Path);
302  PathPtr mainpath(new Path);
303  PathPtr outpath(new Path);
304 
305  int stage = 0; //0: in, 1: event/main, 2: out
306  for (const ModulePtr& module : path->getModules()) {
307  bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
308  //entire conditional path must also be compatible
309  if (hasParallelFlag and module->hasCondition()) {
310  for (const auto& conditionPath : module->getAllConditionPaths()) {
 // NOTE(review): listing line 311 is absent; presumably it holds the condition under which
 // the flag is cleared and opens the block closed at listing line 313 -- confirm.
312  hasParallelFlag = false;
313  }
314  }
315  }
316 
317  //update stage?
 // 0 -> 1 at the first parallel-certified module, 1 -> 2 at the first non-certified module after that.
318  if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
319  stage++;
320 
321  if (stage == 2) {
 // The parallel section just ended: it only counts as useful if at least one of its
 // modules has a type that is not listed in uselessParallelModules.
322  bool path_is_useful = false;
323  for (const auto& parallelModule : mainpath->getModules()) {
324  if (uselessParallelModules.count(parallelModule->getType()) == 0) {
325  path_is_useful = true;
326  break;
327  }
328  }
329  if (not path_is_useful) {
330  //merge mainpath back into input path
331  inpath->addPath(mainpath);
332  mainpath.reset(new Path);
333  //and search for further parallel sections
334  stage = 0;
335  }
336  }
337  }
338 
339  if (stage == 0) { //fill input path
340  inpath->addModule(module);
341 
342  if (module->hasProperties(Module::c_HistogramManager)) {
343  // Initialize histogram manager if found in the path
344  m_histoman = module;
345 
346  //add histoman to other paths
347  mainpath->addModule(m_histoman);
348  outpath->addModule(m_histoman);
349  }
350  }
351 
352  if (stage == 1)
353  mainpath->addModule(module);
354  if (stage == 2)
355  outpath->addModule(module);
356  }
357 
358  bool createAllPaths = false; //usually we might not need e.g. an output path
359  for (const ModulePtr& module : path->getModules()) {
360  if (module->hasProperties(Module::c_TerminateInAllProcesses))
361  createAllPaths = true; //ensure there are all kinds of processes
362  }
363 
364  //if main path is empty, createAllPaths doesn't really matter, since we'll fall back to single-core processing
 // The members are only assigned here, so a path that stays empty leaves its member untouched
 // (unless createAllPaths forces the input/output paths to be set).
365  if (!mainpath->isEmpty())
366  m_mainPath = mainpath;
367  if (createAllPaths or !inpath->isEmpty())
368  m_inputPath = inpath;
369  if (createAllPaths or !outpath->isEmpty())
370  m_outputPath = outpath;
371 }
372 
// NOTE(review): the signature line (listing line 373) is absent; the body uses 'name', 'a'
// and 'b', so presumably this is
// RingBuffer* pEventProcessor::connectViaRingBuffer(const char* name, const PathPtr& a, PathPtr& b) -- confirm.
// Creates a RingBuffer, appends a TxModule for it to 'a', and replaces 'b' by a new path that
// starts with the matching RxModule followed by the old contents of 'b'.
// The RingBuffer is allocated with 'new' and returned as a raw pointer; it is not freed here.
374 {
375  //create ringbuffers and add rx/tx where needed
 // 'name' is the name of an environment variable; if it is unset, a default-constructed buffer is used.
376  const char* inrbname = getenv(name);
377  RingBuffer* rbuf;
378  if (inrbname == nullptr) {
379  rbuf = new RingBuffer();
380  } else {
 // The buffer name is the variable's value with "0" appended.
381  string rbname(inrbname + to_string(0)); //currently at most one input, one output buffer
382  rbuf = new RingBuffer(rbname.c_str(), RingBuffer::c_DefaultSize);
383  }
384 
385  // Insert Tx at the end of current path
386  ModulePtr txptr(new TxModule(rbuf));
387  a->addModule(txptr);
388  // Insert Rx at beginning of next path
389  ModulePtr rxptr(new RxModule(rbuf));
390  PathPtr newB(new Path());
391  newB->addModule(rxptr);
392  newB->addPath(b);
 // Swap so that the caller's 'b' now refers to the path that begins with the Rx module.
393  b.swap(newB);
394 
395  return rbuf;
396 }
397 
// NOTE(review): the signature line (listing line 398) is absent; presumably this is
// pEventProcessor::preparePaths(), which process() calls -- confirm.
399 {
 // A histogram-manager module found by the path analysis is initialised first.
400  if (m_histoman) {
401  m_histoman->initialize();
402  }
403  if (not m_mainPath)
404  return; //we'll fall back to single-core
405 
406  if (m_inputPath)
 // NOTE(review): listing line 407 (the statement controlled by the 'if' above) is absent.
408  if (m_outputPath)
 // NOTE(review): listing line 409 (the statement controlled by the 'if' above) is absent.
410 }
411 
412 
413 void pEventProcessor::dump_modules(const std::string& title, const ModulePtrList& modlist)
414 {
415  ModulePtrList::const_iterator it;
416  std::ostringstream strbuf;
417  strbuf << title << " : ";
418  for (it = modlist.begin(); it != modlist.end(); ++it) {
419  const Module* module = it->get();
420  strbuf << module->getName();
421  if (module->hasCondition()) {
422  for (const auto& conditionPath : module->getAllConditionPaths()) {
423  strbuf << "[->" << conditionPath.get() << "] ";
424  }
425  }
426  if (*it != modlist.back())
427  strbuf << " -> ";
428  }
429  B2INFO(strbuf.str());
430 }
431 
432 
// NOTE(review): the signature line (listing line 433) is absent; the body uses 'modules' and
// 'flag', so presumably this is pEventProcessor::getModulesWithFlag(modules, flag) -- confirm.
// Returns a new list with those entries of 'modules' for which hasProperties(flag) is true,
// in their original order.
434 {
435  ModulePtrList tmpModuleList;
436  for (const ModulePtr& m : modules) {
437  if (m->hasProperties(flag))
438  tmpModuleList.push_back(m);
439  }
440 
441  return tmpModuleList;
442 }
// NOTE(review): the signature line (listing line 443) is absent; the body uses 'modules' and
// 'flag', so presumably this is pEventProcessor::getModulesWithoutFlag(modules, flag) -- confirm.
// Returns a new list with those entries of 'modules' for which hasProperties(flag) is false,
// in their original order.
444 {
445  ModulePtrList tmpModuleList;
446  for (const ModulePtr& m : modules) {
447  if (!m->hasProperties(flag))
448  tmpModuleList.push_back(m);
449  }
450  return tmpModuleList;
451 }
452 
// NOTE(review): the signature line (listing line 453) is absent; the body uses 'modules' (a
// pointer) and 'prependModules', so presumably this is
// pEventProcessor::prependModulesIfNotPresent(ModulePtrList* modules, const ModulePtrList& prependModules) -- confirm.
// Puts every entry of 'prependModules' that is not yet in '*modules' at the front of '*modules'.
// Each one is pushed to the front in turn, so newly added entries end up in reverse order
// relative to 'prependModules'.
454 {
455  for (const ModulePtr& m : prependModules) {
456  if (std::find(modules->begin(), modules->end(), m) == modules->end()) { //not present
457  modules->push_front(m);
458  }
459  }
460 }
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:54
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
Definition: DataStore.cc:715
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:145
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
unsigned int getNumberEventsOverride() const
Returns number of events in run 1 for EventInfoSetter module, or 0 for no override.
Definition: Environment.h:66
Exception thrown when execution is stopped by a signal.
provides the core event processing loop.
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
void processCore(const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
Processes the full module chain consisting of an arbitrary number of connected paths,...
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
void processTerminate(const ModulePtrList &modulePathList)
Terminates the modules.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
const Module * m_master
The master module that determines the experiment/run/event number.
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:30
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:161
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:31
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
Base class for Modules.
Definition: Module.h:72
std::list< ModulePtr > getModules() const override
no submodules, return empty list
Definition: Module.h:506
EModulePropFlags
Each module can be tagged with property flags, which indicate certain features of the module.
Definition: Module.h:77
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
Definition: Module.h:81
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
@ c_InternalSerializer
This module is an internal serializer/deserializer for parallel processing.
Definition: Module.h:82
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes.
Definition: Module.h:83
Implements a path consisting of Module and/or Path objects.
Definition: Path.h:38
std::list< std::shared_ptr< Module > > buildModulePathList(bool unique=true) const
Builds a list of all modules which could be executed during the data processing.
Definition: Path.cc:67
void addPath(const PathPtr &path)
See 'pydoc3 basf2.Path'.
Definition: Path.cc:37
static bool isOutputProcess()
Return true if the process is an output process.
Definition: ProcHandler.cc:232
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
Definition: ProcHandler.cc:226
static RbTupleManager & Instance()
Access to singleton.
Definition: RbTuple.cc:40
int hadd(bool deleteflag=true)
Functions to add up all histogram files.
Definition: RbTuple.cc:138
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
static const int c_DefaultSize
Standard size of buffer, in integers (~60MB).
Definition: RingBuffer.h:42
void kill()
Cause termination of reading processes (if they use isDead()).
Definition: RingBuffer.cc:389
Module to decode data store contents from RingBuffer.
Definition: RxModule.h:25
Module for encoding data store contents into a RingBuffer.
Definition: TxModule.h:25
This class provides the core event processing loop for parallel processing.
void preparePaths()
Adds internal modules to paths, prepare RingBuffers.
void dump_modules(const std::string &, const ModulePtrList &)
Dump module names in the ModulePtrList.
PathPtr m_outputPath
Output path.
static ModulePtrList getModulesWithFlag(const ModulePtrList &modules, Module::EModulePropFlags flag)
Return only modules which have the given Module flag set.
RingBuffer * m_rbin
input RingBuffer
RingBuffer * m_rbout
output RingBuffer
std::unique_ptr< ProcHandler > m_procHandler
handler to fork and manage processes.
void cleanup()
clean up IPC resources (should only be called in one process).
void process(const PathPtr &spath, long maxEvent)
Processes the full module chain, starting with the first module in the given path.
RingBuffer * connectViaRingBuffer(const char *name, const PathPtr &a, PathPtr &b)
Create RingBuffer with name from given environment variable, add Tx and Rx modules to a and b.
ModulePtr m_histoman
Pointer to HistoManagerModule, or nullptr if not found.
void analyzePath(const PathPtr &path)
Analyze given path.
void gotSigINT()
signal handler for Ctrl+C (async-safe)
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
virtual ~pEventProcessor()
Destructor.
PathPtr m_mainPath
Main (parallel section) path.
static ModulePtrList getModulesWithoutFlag(const ModulePtrList &modules, Module::EModulePropFlags flag)
Return only modules which do not have the given Module flag set.
PathPtr m_inputPath
Input path.
void killRingBuffers()
signal handler (async-safe)
void clearFileList()
TFiles are stored in a global list and cleaned up by root since this will happen in all forked proces...
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:28
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:40
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:584
Abstract base class for different kinds of events.