Belle II Software development
pEventProcessor.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/pEventProcessor.h>
10#include <framework/pcore/ProcHandler.h>
11#include <framework/pcore/RingBuffer.h>
12#include <framework/pcore/RxModule.h>
13#include <framework/pcore/TxModule.h>
14#include <framework/pcore/DataStoreStreamer.h>
15#include <framework/pcore/RbTuple.h>
16
17#include <framework/core/ModuleManager.h>
18#include <framework/core/Environment.h>
19#include <framework/logging/LogSystem.h>
20
21#include <TROOT.h>
22
23#include <csignal>
24#include <memory>
25
26
27using namespace std;
28using namespace Belle2;
29
30namespace {
31 static int gSignalReceived = 0;
32
33 static pEventProcessor* g_pEventProcessor = nullptr;
34
35 void cleanupIPC()
36 {
37 if (g_pEventProcessor)
38 g_pEventProcessor->cleanup();
39 }
40 void cleanupAndStop(int sig)
41 {
42 cleanupIPC();
43
44 //uninstall current handler and call default one.
45 signal(sig, SIG_DFL);
46 raise(sig);
47 }
48
49 static void parentSignalHandler(int signal)
50 {
51 //signal handlers are called asynchronously, making many standard functions (including output) dangerous
52 if (signal == SIGINT) {
53 g_pEventProcessor->gotSigINT();
54 } else if (signal == SIGTERM or signal == SIGQUIT) {
55 g_pEventProcessor->killRingBuffers();
56 }
57 if (gSignalReceived == 0)
58 gSignalReceived = signal;
59 }
60}
61
63 m_histoman(nullptr)
64{
65 g_pEventProcessor = this;
66}
67
68
70{
71 cleanup();
72 g_pEventProcessor = nullptr;
73}
74
76{
78 delete m_rbin;
79 m_rbin = nullptr;
80 delete m_rbout;
81 m_rbout = nullptr;
82 }
83}
84
85
87{
88 EventProcessor::writeToStdErr("\nStopping basf2...\n");
90}
91
93{
94 m_rbin->kill();
95 //these might be locked by _this_ process, so we cannot escape
96 m_rbout->kill(); //atomic, so doesn't lock
97}
98
100{
101 //B2WARNING("list of files: " << gROOT->GetListOfFiles()->GetEntries());
102 //clear list, but don't actually delete the objects
103 gROOT->GetListOfFiles()->Clear("nodelete");
104}
105
106
107void pEventProcessor::process(const PathPtr& spath, long maxEvent)
108{
109 if (spath->getModules().size() == 0) return;
110
111 const int numProcesses = Environment::Instance().getNumberProcesses();
112
113 //Check whether the number of events was set via command line argument
114 unsigned int numEventsArgument = Environment::Instance().getNumberEventsOverride();
115 if ((numEventsArgument > 0) && ((maxEvent == 0) || (maxEvent > numEventsArgument))) {
116 maxEvent = numEventsArgument;
117 }
118
119 if (numProcesses == 0)
120 B2FATAL("pEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
121
122 // 1. Analyze start path and split into parallel paths
123 analyzePath(spath);
124
125
126 if (m_inputPath) {
127 B2INFO("Input Path " << m_inputPath->getPathString());
128 }
129 if (m_mainPath) {
130 if (m_mainPath->getModules().size() <= 5) {
131 B2INFO("Main Path " << m_mainPath->getPathString());
132 } else {
133 B2INFO("Main Path [" << m_mainPath->getModules().front()->getName() << " -> ... (" << m_mainPath->getModules().size() - 2 <<
134 " further modules) ... -> " << m_mainPath->getModules().back()->getName() << " ]");
135 }
136 }
137 if (m_outputPath) {
138 B2INFO("Output Path " << m_outputPath->getPathString());
139 }
140 if (not m_mainPath) {
141 B2WARNING("Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
142 EventProcessor::process(spath, maxEvent);
143 return;
144 }
145
146 installMainSignalHandlers(cleanupAndStop);
147
148 //inserts Rx/Tx modules into path (sets up IPC structures)
149 preparePaths();
150
151 // ensure that we free the IPC resources!
152 atexit(cleanupIPC);
153
154
155 //init statistics
156 {
157 m_processStatisticsPtr.registerInDataStore();
159 m_processStatisticsPtr.create();
160 Path mergedPath;
161 if (m_inputPath)
162 mergedPath.addPath(m_inputPath);
163 mergedPath.addPath(m_mainPath);
164 if (m_outputPath)
165 mergedPath.addPath(m_outputPath);
166 for (const ModulePtr& module : mergedPath.buildModulePathList())
167 m_processStatisticsPtr->initModule(module.get());
168 }
169
170 // 2. Initialization
171 ModulePtrList modulelist = spath->buildModulePathList();
173 //dump_modules("Initializing globally: ", initGlobally);
174 processInitialize(initGlobally);
175
177
178 //Don't start processing in case of no master module
179 if (!m_master) {
180 B2ERROR("There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
181 }
182
183 //Check if errors appeared. If yes, don't start the event processing.
185 if (numLogError != 0) {
186 B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
187 }
188
189 //disable ROOT's management of TFiles
191
192
193 //install new signal handlers before forking
194 installMainSignalHandlers(parentSignalHandler);
195
196 //Path for current process
197 PathPtr localPath;
198
199 m_procHandler = std::make_unique<ProcHandler>(numProcesses);
200
201 // 3. Fork input path
202 m_procHandler->startInputProcess();
203 if (m_procHandler->isInputProcess()) { // In input process
204 localPath = m_inputPath;
205 } else {
206 // This is not the input path, clean up datastore to not contain the first event
208 }
209
210 if (localPath == nullptr) { //not forked yet
211 // 5. Fork out worker path (parallel section)
212 m_procHandler->startWorkerProcesses();
213 if (m_procHandler->isWorkerProcess()) {
214 localPath = m_mainPath;
215 m_master = localPath->getModules().begin()->get(); //set Rx as master
216 }
217 }
218
219 if (localPath == nullptr) { //not forked yet -> this process is the output process
220 m_procHandler->startOutputProcess();
221 if (m_outputPath) {
222 localPath = m_outputPath;
223 m_master = localPath->getModules().begin()->get(); //set Rx as master
224 }
225 }
226
227
228 if (!m_procHandler->isOutputProcess()) {
230 }
231
232 bool gotSigINT = false;
233 if (localPath != nullptr) {
234 ModulePtrList localModules = localPath->buildModulePathList();
235 ModulePtrList procinitmodules = getModulesWithFlag(localModules, Module::c_InternalSerializer);
236 //dump_modules("processInitialize for ", procinitmodules);
237 processInitialize(procinitmodules);
238
239 //input: handle signals normally, will slowly cascade down
240 if (m_procHandler->isInputProcess())
242 //workers will have to ignore the signals, there's no good way to do this safely
243 if (m_procHandler->isWorkerProcess())
245
246 try {
247 processCore(localPath, localModules, maxEvent, m_procHandler->isInputProcess());
248 } catch (StoppedBySignalException& e) {
249 if (e.signal != SIGINT) {
250 B2FATAL(e.what());
251 }
252 //in case of SIGINT, we move on to processTerminate() to shut down safely
253 gotSigINT = true;
254 }
255 prependModulesIfNotPresent(&localModules, terminateGlobally);
256 processTerminate(localModules);
257 }
258
259 B2INFO(m_procHandler->getProcessName() << " process finished.");
260
261 //all processes except output stop here
262 if (!m_procHandler->isOutputProcess()) {
263 if (gotSigINT) {
264 installSignalHandler(SIGINT, SIG_DFL);
265 raise(SIGINT);
266 } else {
267 exit(0);
268 }
269 }
270
271 //output process: do final cleanup
272 m_procHandler->waitForAllProcesses();
273 B2INFO("All processes completed");
274
275 //finished, disable handler again
276 installSignalHandler(SIGINT, SIG_IGN);
277
278 cleanup();
279 B2INFO("Global process: completed");
280
281 if (m_histoman) {
282 B2INFO("HistoManager:: adding histogram files");
284 }
285
286 //did anything bad happen?
287 if (gSignalReceived) {
288 B2ERROR("Processing aborted via signal " << gSignalReceived <<
289 ", terminating. Output files have been closed safely and should be readable.");
290 installSignalHandler(gSignalReceived, SIG_DFL);
291 raise(gSignalReceived);
292 }
293
294}
295
297{
298 //modules that can be parallelised, but should not go into a parallel section by themselves
299 std::set<std::string> uselessParallelModules({"HistoManager", "Gearbox", "Geometry"});
300
301 PathPtr inpath(new Path);
302 PathPtr mainpath(new Path);
303 PathPtr outpath(new Path);
304
305 int stage = 0; //0: in, 1: event/main, 2: out
306 for (const ModulePtr& module : path->getModules()) {
307 bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
308 //entire conditional path must also be compatible
309 if (hasParallelFlag and module->hasCondition()) {
310 for (const auto& conditionPath : module->getAllConditionPaths()) {
312 hasParallelFlag = false;
313 }
314 }
315 }
316
317 //update stage?
318 if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
319 stage++;
320
321 if (stage == 2) {
322 bool path_is_useful = false;
323 for (const auto& parallelModule : mainpath->getModules()) {
324 if (uselessParallelModules.count(parallelModule->getType()) == 0) {
325 path_is_useful = true;
326 break;
327 }
328 }
329 if (not path_is_useful) {
330 //merge mainpath back into input path
331 inpath->addPath(mainpath);
332 mainpath.reset(new Path);
333 //and search for further parallel sections
334 stage = 0;
335 }
336 }
337 }
338
339 if (stage == 0) { //fill input path
340 inpath->addModule(module);
341
342 if (module->hasProperties(Module::c_HistogramManager)) {
343 // Initialize histogram manager if found in the path
344 m_histoman = module;
345
346 //add histoman to other paths
347 mainpath->addModule(m_histoman);
348 outpath->addModule(m_histoman);
349 }
350 }
351
352 if (stage == 1)
353 mainpath->addModule(module);
354 if (stage == 2)
355 outpath->addModule(module);
356 }
357
358 bool createAllPaths = false; //usually we might not need e.g. an output path
359 for (const ModulePtr& module : path->getModules()) {
360 if (module->hasProperties(Module::c_TerminateInAllProcesses))
361 createAllPaths = true; //ensure there are all kinds of processes
362 }
363
364 //if main path is empty, createAllPaths doesn't really matter, since we'll fall back to single-core processing
365 if (!mainpath->isEmpty())
366 m_mainPath = mainpath;
367 if (createAllPaths or !inpath->isEmpty())
368 m_inputPath = inpath;
369 if (createAllPaths or !outpath->isEmpty())
370 m_outputPath = outpath;
371}
372
374{
375 //create ringbuffers and add rx/tx where needed
376 const char* inrbname = getenv(name);
377 RingBuffer* rbuf;
378 if (inrbname == nullptr) {
379 rbuf = new RingBuffer();
380 } else {
381 string rbname(inrbname + to_string(0)); //currently at most one input, one output buffer
382 rbuf = new RingBuffer(rbname.c_str(), RingBuffer::c_DefaultSize);
383 }
384
385 // Insert Tx at the end of current path
386 ModulePtr txptr(new TxModule(rbuf));
387 a->addModule(txptr);
388 // Insert Rx at beginning of next path
389 ModulePtr rxptr(new RxModule(rbuf));
390 PathPtr newB(new Path());
391 newB->addModule(rxptr);
392 newB->addPath(b);
393 b.swap(newB);
394
395 return rbuf;
396}
397
399{
400 if (m_histoman) {
401 m_histoman->initialize();
402 }
403 if (not m_mainPath)
404 return; //we'll fall back to single-core
405
406 if (m_inputPath)
408 if (m_outputPath)
410}
411
412
413void pEventProcessor::dump_modules(const std::string& title, const ModulePtrList& modlist)
414{
415 ModulePtrList::const_iterator it;
416 std::ostringstream strbuf;
417 strbuf << title << " : ";
418 for (it = modlist.begin(); it != modlist.end(); ++it) {
419 const Module* module = it->get();
420 strbuf << module->getName();
421 if (module->hasCondition()) {
422 for (const auto& conditionPath : module->getAllConditionPaths()) {
423 strbuf << "[->" << conditionPath.get() << "] ";
424 }
425 }
426 if (*it != modlist.back())
427 strbuf << " -> ";
428 }
429 B2INFO(strbuf.str());
430}
431
432
434{
435 ModulePtrList tmpModuleList;
436 for (const ModulePtr& m : modules) {
437 if (m->hasProperties(flag))
438 tmpModuleList.push_back(m);
439 }
440
441 return tmpModuleList;
442}
444{
445 ModulePtrList tmpModuleList;
446 for (const ModulePtr& m : modules) {
447 if (!m->hasProperties(flag))
448 tmpModuleList.push_back(m);
449 }
450 return tmpModuleList;
451}
452
454{
455 for (const ModulePtr& m : prependModules) {
456 if (std::find(modules->begin(), modules->end(), m) == modules->end()) { //not present
457 modules->push_front(m);
458 }
459 }
460}
static void removeSideEffects()
call clear() and removeSideEffects() for all Mergeable objects in datastore (for c_Persistent durabil...
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:54
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
Definition: DataStore.cc:715
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:157
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
unsigned int getNumberEventsOverride() const
Returns number of events in run 1 for EventInfoSetter module, or 0 for no override.
Definition: Environment.h:67
Exception thrown when execution is stopped by a signal.
provides the core event processing loop.
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
void processCore(const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true)
Processes the full module chain consisting of an arbitrary number of connected paths,...
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
void processTerminate(const ModulePtrList &modulePathList)
Terminates the modules.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
const Module * m_master
The master module that determines the experiment/run/event number.
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:30
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:161
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:31
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module > > &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
Base class for Modules.
Definition: Module.h:72
EModulePropFlags
Each module can be tagged with property flags, which indicate certain features of the module.
Definition: Module.h:77
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
Definition: Module.h:81
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
@ c_InternalSerializer
This module is an internal serializer/deserializer for parallel processing.
Definition: Module.h:82
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes.
Definition: Module.h:83
std::list< ModulePtr > getModules() const override
no submodules, return empty list
Definition: Module.h:506
Implements a path consisting of Module and/or Path objects.
Definition: Path.h:38
std::list< std::shared_ptr< Module > > buildModulePathList(bool unique=true) const
Builds a list of all modules which could be executed during the data processing.
Definition: Path.cc:67
void addPath(const PathPtr &path)
See 'pydoc3 basf2.Path'.
Definition: Path.cc:37
static bool isOutputProcess()
Return true if the process is an output process.
Definition: ProcHandler.cc:232
static bool parallelProcessingUsed()
Returns true if multiple processes have been spawned, false in single-core mode.
Definition: ProcHandler.cc:226
static RbTupleManager & Instance()
Access to singleton.
Definition: RbTuple.cc:40
int hadd(bool deleteflag=true)
Functions to add up all histogram files.
Definition: RbTuple.cc:138
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
static const int c_DefaultSize
Standard size of buffer, in integers (~60MB).
Definition: RingBuffer.h:42
void kill()
Cause termination of reading processes (if they use isDead()).
Definition: RingBuffer.cc:389
Module to decode data store contents from RingBuffer.
Definition: RxModule.h:25
Module for encoding data store contents into a RingBuffer.
Definition: TxModule.h:25
This class provides the core event processing loop for parallel processing.
void preparePaths()
Adds internal modules to paths, prepare RingBuffers.
void dump_modules(const std::string &, const ModulePtrList &)
Dump module names in the ModulePtrList.
PathPtr m_outputPath
Output path.
static ModulePtrList getModulesWithFlag(const ModulePtrList &modules, Module::EModulePropFlags flag)
Return only modules which have the given Module flag set.
RingBuffer * m_rbin
input RingBuffer
RingBuffer * m_rbout
output RingBuffer
std::unique_ptr< ProcHandler > m_procHandler
handler to fork and manage processes.
void cleanup()
clean up IPC resources (should only be called in one process).
void process(const PathPtr &spath, long maxEvent)
Processes the full module chain, starting with the first module in the given path.
RingBuffer * connectViaRingBuffer(const char *name, const PathPtr &a, PathPtr &b)
Create RingBuffer with name from given environment variable, add Tx and Rx modules to a and b.
pEventProcessor()
Constructor.
ModulePtr m_histoman
Pointer to HistoManagerModule, or nullptr if not found.
void analyzePath(const PathPtr &path)
Analyze given path.
void gotSigINT()
signal handler for Ctrl+C (async-safe)
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
virtual ~pEventProcessor()
Destructor.
PathPtr m_mainPath
Main (parallel section) path.
static ModulePtrList getModulesWithoutFlag(const ModulePtrList &modules, Module::EModulePropFlags flag)
Return only modules which do not have the given Module flag set.
PathPtr m_inputPath
Input path.
void killRingBuffers()
signal handler (async-safe)
void clearFileList()
TFiles are stored in a global list and cleaned up by root since this will happen in all forked proces...
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:35
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:43
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:584
Abstract base class for different kinds of events.
STL namespace.