Belle II Software development
Framework.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pybasf2/Framework.h>
10
11#include <framework/core/PyObjConvUtils.h>
12#include <framework/core/Environment.h>
13#include <framework/core/RandomNumbers.h>
14#include <framework/core/EventProcessor.h>
15#include <framework/core/ModuleManager.h>
16#include <framework/datastore/DataStore.h>
17#include <framework/database/DBStore.h>
18#include <framework/database/Database.h>
19#include <framework/pcore/pEventProcessor.h>
20#include <framework/pcore/ZMQEventProcessor.h>
21#include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
22#include <framework/utilities/FileSystem.h>
23#include <framework/database/Configuration.h>
24
25#include <framework/logging/Logger.h>
26#include <framework/logging/LogSystem.h>
27
28#include <boost/algorithm/string.hpp>
29#include <boost/algorithm/string/join.hpp>
30#include <boost/python.hpp>
31
32#include <set>
33#include <vector>
34
35using namespace boost::python;
36using namespace Belle2;
37
38
40{
43
46 }
48}
49
50
52{
53 //empty module manager of modules
54 //since modules may contain shared pointers of Path objects created in Python,
55 //these shared pointers have special cleanup hooks that can cause crashes if run
56 //after Py_Finalize(). The framework object is cleaned up before, so this is a good place.
59 //Also the database configuration has things to cleanup before Py_Finalize()
61}
62
63
64void Framework::addModuleSearchPath(const std::string& path)
65{
67}
68
69
70void Framework::setExternalsPath(const std::string& path)
71{
73}
74
75
76ModulePtr Framework::registerModule(const std::string& moduleName)
77{
78 return ModuleManager::Instance().registerModule(moduleName);
79}
80
81
82ModulePtr Framework::registerModule(const std::string& moduleName, const std::string& sharedLibPath)
83{
84 return ModuleManager::Instance().registerModule(moduleName, sharedLibPath);
85}
86
87
88void Framework::process(PathPtr startPath, long maxEvent)
89{
90 if (Environment::Instance().getDryRun()) {
92 return; //processing disabled!
93 }
94
95 static bool already_executed = false;
96 static std::set<const Module*> previously_run_modules; //not a shared pointer to not screw up ownership
97 static int errors_from_previous_run = 0;
98 const auto moduleListUnique = startPath->buildModulePathList(true);
99 if (already_executed) {
100 B2WARNING("Calling process() more than once per steering file is still experimental, please check results carefully! Python modules especially should reinitialise their state in initialise() to avoid problems");
101 if (startPath->buildModulePathList(true) != startPath->buildModulePathList(false)) {
102 B2FATAL("Your path contains the same module instance in multiple places. Calling process() multiple times is not implemented for this case.");
103 }
104
105 //were any modules in moduleListUnique already run?
106 for (const auto& m : moduleListUnique) {
107 if (previously_run_modules.count(m.get()) > 0) {
108 //only clone if modules have been run before
109 startPath = std::static_pointer_cast<Path>(startPath->clone());
110 break;
111 }
112 }
113 }
114 for (const auto& m : moduleListUnique) {
115 previously_run_modules.insert(m.get());
116 }
117
119 if (numLogError != errors_from_previous_run) {
120 B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
121 }
122
123 try {
127
128 auto& environment = Environment::Instance();
129
130 already_executed = true;
131 if (environment.getNumberProcesses() == 0) {
132 EventProcessor processor;
133 processor.setProfileModuleName(environment.getProfileModuleName());
134 processor.process(startPath, maxEvent);
135 } else {
136 if (environment.getUseZMQ()) {
137 // If the user has not given any socket address, use a random one.
138 if (environment.getZMQSocketAddress().empty()) {
139 environment.setZMQSocketAddress(ZMQAddressUtils::randomSocketName());
140 }
141 ZMQEventProcessor processor;
142 processor.process(startPath, maxEvent);
143 } else {
144 pEventProcessor processor;
145 processor.process(startPath, maxEvent);
146 }
147 }
148 errors_from_previous_run = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
149
151 // Also, reset the Database connection itself. However don't reset the
152 // configuration, just the actual setup. In case the user runs process()
153 // again it will reinitialize correctly with the same settings.
155 } catch (std::exception& e) {
156 B2ERROR("Uncaught exception encountered: " << e.what()); //should show module name
157 DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
158 throw; //and let python's global handler do the rest
159 } catch (...) {
160 B2ERROR("Uncaught exception encountered!"); //should show module name
161 DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
162 throw; //and let python's global handler do the rest
163 //TODO: having a stack trace would be nicer, but somehow a handler I set using std::set_terminate() never gets called
164 }
165}
166
167
168void Framework::setNumberProcesses(int numProcesses)
169{
171}
172
173
175{
177}
178
179
180void Framework::setPicklePath(const std::string& path)
181{
183}
184
185
187{
189}
190
191void Framework::setStreamingObjects(const boost::python::list& streamingObjects)
192{
193 auto vec = PyObjConvUtils::convertPythonObject(streamingObjects, std::vector<std::string>());
195}
196
197void Framework::setRealm(const std::string& realm)
198{
199 int irealm = -1;
200 std::vector<std::string> realms;
201 for (int i = LogConfig::c_None; i <= LogConfig::c_Production; i++) {
202 std::string thisRealm = LogConfig::logRealmToString((LogConfig::ELogRealm)i);
203 realms.push_back(thisRealm);
204 if (boost::iequals(realm, thisRealm)) { //case-insensitive
205 irealm = i;
206 break;
207 }
208 }
209 if (irealm < 0) {
210 B2ERROR("Invalid realm! Needs to be one of " << boost::join(realms, ", "));
211 } else {
213 }
214}
215
217{
219}
220
222{
223 Environment::Instance().setRunType(Const::c_Cosmic);
224}
225
227{
228 Environment::Instance().setRunType(Const::c_Beam);
229}
230
232{
233 B2WARNING("basf2 will write the simulation steps of each event into output csv files. "
234 "This is fine if you are producing events for the Belle II Virtual Reality application, "
235 "otherwise this function should not be used since the exeuction time will significantly increase.");
237}
238
239std::string Framework::findFile(const std::string& filename, const std::string& type, bool ignore_errors)
240{
241 std::string result;
242 if (type.empty()) {
243 //behave like FileSystem.findFile by using it
244 result = FileSystem::findFile(filename, ignore_errors);
245 } else {
246 result = FileSystem::findFile(filename, type, ignore_errors);
247 }
248 if (!ignore_errors and result.empty()) {
249 // Still not found ... see if we raise an exception or not.
250 // We want a FileNotFoundError ... so lets fudge the errno to the correct
251 // error value and then create the correct exception in python
252 errno = ENOENT;
253 PyErr_SetFromErrnoWithFilename(PyExc_FileNotFoundError, filename.c_str());
254 boost::python::throw_error_already_set();
255 }
256 return result;
257}
258
259//=====================================================================
260// Python API
261//=====================================================================
262
264{
265 boost::python::list returnList;
266
267 for (const std::string& path : ModuleManager::Instance().getModuleSearchPaths())
268 returnList.append(boost::python::object(path));
269 return returnList;
270}
271
272
274{
275 boost::python::dict returnDict;
276 for (const auto& modulePair : ModuleManager::Instance().getAvailableModules())
277 returnDict[boost::python::object(modulePair.first)] = boost::python::object(modulePair.second);
278 return returnDict;
279}
280
281
283{
284 boost::python::list returnList;
285
286 for (const ModulePtr& mod : ModuleManager::Instance().getCreatedModules())
287 returnList.append(boost::python::object(mod));
288 return returnList;
289}
290
291
292#if !defined(__GNUG__) || defined(__ICC)
293#else
294#pragma GCC diagnostic push
295#pragma GCC diagnostic ignored "-Wunused-local-typedefs"
296#endif
297BOOST_PYTHON_FUNCTION_OVERLOADS(process_overloads, Framework::process, 1, 2)
298#if !defined(__GNUG__) || defined(__ICC)
299#else
300#pragma GCC diagnostic pop
301#endif
302
303namespace {
304 PyObject* PyExc_ModuleNotCreatedError{nullptr};
307 void moduleNotCreatedTranslator(const ModuleManager::ModuleNotCreatedError& e)
308 {
309 PyErr_SetString(PyExc_ModuleNotCreatedError, e.what());
310 }
311}
312
314{
315 PyExc_ModuleNotCreatedError = PyErr_NewExceptionWithDoc("basf2.ModuleNotCreatedError",
316 "This exception is raised when a basf2 module could not be created for any reason",
317 PyExc_RuntimeError, nullptr);
318 scope().attr("ModuleNotCreatedError") = handle<>(borrowed(PyExc_ModuleNotCreatedError));
319 register_exception_translator<ModuleManager::ModuleNotCreatedError>(moduleNotCreatedTranslator);
320 //Overloaded methods
321 ModulePtr(*registerModule1)(const std::string&) = &Framework::registerModule;
322 ModulePtr(*registerModule2)(const std::string&, const std::string&) = &Framework::registerModule;
323
324 //don't show c++ signature in python doc to keep it simple
325 docstring_options options(true, true, false);
326
327 //Expose framework class
328 class_<Framework, std::shared_ptr<Framework>, boost::noncopyable>("Framework", "Initialize and Cleanup functions", no_init);
329 std::shared_ptr<Framework> initguard{new Framework()};
330 scope().attr("__framework") = initguard;
331
332 def("add_module_search_path", &Framework::addModuleSearchPath, R"DOCSTRING(
333Add a directory in which to search for compiled basf2 C++ `Modules <Module>`.
334
335This directory needs to contain the shared libraries containing the compiled
336modules as well as companion files ending in ``.b2modmap`` which contain a list
337of the module names contained in each library.
338
339Note:
340 The newly added path will not override existing modules
341
342Parameters:
343 path (str): directory containing the modules.
344)DOCSTRING", args("path"));
345 def("set_externals_path", &Framework::setExternalsPath, R"DOCSTRING(
346Set the path to the externals to be used.
347
348Warning:
349 This will not change the library and executable paths but will just change
350 the directory where to look for certain data files like the Evtgen particle
351 definition file. Don't use this unless you really know what you are doing.
352
353Parameters:
354 path (str): new top level directory for the externals
355)DOCSTRING", args("path"));
356 def("list_module_search_paths", &Framework::getModuleSearchPathsPython, R"DOCSTRING(
357Return a python list containing all the directories included in the module
358search Path.
359
360See:
361 `add_module_search_path`
362)DOCSTRING");
363 def("list_available_modules", &Framework::getAvailableModulesPython, R"DOCSTRING(
364Return a dictionary containing the names of all known modules
365as keys and the name of the shared library containing these modules as values.
366)DOCSTRING");
367 def("list_registered_modules", &Framework::getRegisteredModulesPython, R"DOCSTRING(
368Return a list with pointers to all previously created module instances by calling `register_module()`
369)DOCSTRING");
370 def("get_pickle_path", &Framework::getPicklePath, R"DOCSTRING(
371Return the filename where the pickled path is or should be stored
372)DOCSTRING");
373 def("set_pickle_path", &Framework::setPicklePath, R"DOCSTRING(
374Set the filename where the pickled path should be stored or retrieved from
375)DOCSTRING", args("path"));
376 def("set_nprocesses", &Framework::setNumberProcesses, R"DOCSTRING(
377Sets number of worker processes for parallel processing.
378
379Can be overridden using the ``-p`` argument to basf2.
380
381Note:
382 Setting this to 1 will have one parallel worker job which is almost always
383 slower than just running without parallel processing but is still provided to
384 allow debugging of parallel execution.
385
386Parameters:
387 nproc (int): number of worker processes. 0 to disable parallel processing.
388)DOCSTRING");
389 def("get_nprocesses", &Framework::getNumberProcesses, R"DOCSTRING(
390Gets number of worker processes for parallel processing. 0 disables parallel processing
391)DOCSTRING");
392 def("set_streamobjs", &Framework::setStreamingObjects, R"DOCSTRING(
393Set the names of all DataStore objects which should be sent between the
394parallel processes. This can be used to improve parallel processing performance
395by removing objects not required.
396)DOCSTRING");
397 {
398 // The register_module function is overloaded with different signatures which makes
399 // the boost docstring very useless so we handcraft a docstring
400 docstring_options param_options(true, false, false);
401 def("_register_module", registerModule1);
402 def("_register_module", registerModule2, R"DOCSTRING(register_module(name, library=None)
403Register a new Module.
404
405This function will try to create a new instance of a module with the given name. If no library is given it will try to find the module by itself from the module search path. Optionally one can specify the name of a shared library containing the module code then this library will be loaded
406
407See:
408 `list_module_search_paths()`, `add_module_search_path()`
409
410Parameters:
411 name (str): Type of the module to create
412 library (str): Optional, name of a shared library containing the module
413
414Returns:
415 An instance of the module if successful.
416
417Raises:
418 will raise a `ModuleNotCreatedError` if there is any problem creating the module.
419)DOCSTRING");
420 def("set_realm", &Framework::setRealm, R"DOCSTRING(
421Set the basf2 execution realm.
422
423The severity of log messages sometimes depends on where basf2 runs. This is controlled by the execution realm.
424
425Usually the realm does not have to be set explicitly. On the HLT or express reco it should be set to 'online' and for official productions to 'production'.
426)DOCSTRING", args("realm"));
427 def("declare_cosmics", &Framework::setCosmicRun, R"DOCSTRING(
428Set that the run is for cosmics data
429)DOCSTRING");
430 def("declare_beam", &Framework::setBeamRun, R"DOCSTRING(
431Set that the run is for beam data
432)DOCSTRING");
433 def("write_simulation_steps", &Framework::writeSimulationSteps, R"DOCSTRING(
434Allow basf2 to write the simulation steps of each event into csv files.
435
436This function should not be used in production jobs because the exeuction time will significantly increase.
437)DOCSTRING");
438 def("_process", &Framework::process, process_overloads(R"DOCSTRING(process(path, num_events=0)
439Processes up to max_events events by starting with the first module in the specified path.
440
441 This method starts processing events only if there is a module in the path
442 which is capable of specifying the end of the data flow.
443
444 Parameters:
445 path (Path): The processing starts with the first module of this path.
446 max_events (int): The maximum number of events that will be processed.
447 If the number is smaller than 1, all events will be processed (default).
448)DOCSTRING"));
449 ;
450 }
451
452 def("find_file", &Framework::findFile, (arg("filename"), arg("data_type") = "", arg("silent") = false), R"DOC(
453 Try to find a file and return its full path
454
455 If ``data_type`` is empty this function will try to find the file
456
457 1. in ``$BELLE2_LOCAL_DIR``,
458 2. in ``$BELLE2_RELEASE_DIR``
459 3. relative to the current working directory.
460
461 Other known ``data_type`` values are
462
463 ``examples``
464 Example data for examples and tutorials. Will try to find the file
465
466 1. in ``$BELLE2_EXAMPLES_DATA_DIR``
467 2. relative to the current working directory
468
469 ``validation``
470 Data for Validation purposes. Will try to find the file in
471
472 1. in ``$BELLE2_VALIDATION_DATA_DIR``
473 2. relative to the current working directory
474
475 .. versionadded:: release-03-00-00
476
477 Arguments:
478 filename (str): relative filename to look for, either in a central place or
479 in the current working directory
480 data_type (str): case insensitive data type to find. Either empty string or
481 one of ``"examples"`` or ``"validation"``
482 silent (bool): If True don't print any errors and just return an empty
483 string if the file cannot be found
484 )DOC");
485}
static Configuration & getInstance()
Get a reference to the instance which will be used when the Database is initialized.
void reset()
Reset to default values.
ERunType
Enum for identifying run type (beam or cosmic)
Definition: Const.h:64
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:54
void setInitializeActive(bool active)
Setter for m_initializeActive.
Definition: DataStore.cc:94
static bool s_DoCleanup
Global flag to decide if we can do normal cleanup.
Definition: DataStore.h:100
void reset(EDurability durability)
Frees memory occupied by data store items and removes all objects from the map.
Definition: DataStore.cc:86
void setNumberProcesses(int number)
Sets the number of processes which should be used for the parallel processing.
Definition: Environment.h:152
void setRealm(LogConfig::ELogRealm realm)
Set the basf2 execution realm.
Definition: Environment.cc:61
void setJobInformation(const std::shared_ptr< Path > &path)
Set info from path executed by the framework.
Definition: Environment.cc:161
void setRunType(Const::ERunType runType)
Set the run type (beam or cosmic).
Definition: Environment.h:95
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:157
void setExternalsPath(const std::string &externalsPath)
Sets the path which points to the externals directory of the framework.
Definition: Environment.h:54
void setStreamingObjects(const std::vector< std::string > &strobjs)
Set list of streaming objects.
Definition: Environment.h:241
void setWriteSimSteps(const bool writeSimSteps)
Set the flag for writing the simulation steps into an output csv file.
Definition: Environment.h:217
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
void setPicklePath(const std::string &path)
Sets the path to the file where the pickled path is stored.
Definition: Environment.h:170
std::string getPicklePath() const
Returns the path to the file where the pickled path is stored.
Definition: Environment.h:177
provides the core event processing loop.
static std::string findFile(const std::string &path, bool silent=false)
Search for given file or directory in local or central release directory, and return absolute path if...
Definition: FileSystem.cc:151
static boost::python::list getRegisteredModulesPython()
Returns a list of all registered modules.
Definition: Framework.cc:282
static void writeSimulationSteps()
Function for writing the simulation steps of each event into csv files.
Definition: Framework.cc:231
static std::string findFile(const std::string &filename, const std::string &type, bool ignore_errors=false)
Find a file.
Definition: Framework.cc:239
static void setRunType(const Const::ERunType runType)
Function to set the run type (beam or cosmic)
Definition: Framework.cc:216
static void setBeamRun()
Function to set that the script is running on beam data (by default it runs on beam data,...
Definition: Framework.cc:226
static boost::python::list getModuleSearchPathsPython()
Returns a list of all module search paths known to the framework.
Definition: Framework.cc:263
static void setNumberProcesses(int numProcesses)
Function to set number of worker processes for parallel processing.
Definition: Framework.cc:168
static void exposePythonAPI()
Exposes methods of the Framework class to Python.
Definition: Framework.cc:313
static void setCosmicRun()
Function to set that the script is running on cosmics data (by default it runs on beam data)
Definition: Framework.cc:221
static void setRealm(const std::string &realm)
Function to set the execution realm.
Definition: Framework.cc:197
static boost::python::dict getAvailableModulesPython()
Returns a dictionary containing the found modules and the filenames of the shared libraries in which ...
Definition: Framework.cc:273
static void setStreamingObjects(const boost::python::list &streamingObjects)
Function to set streaming objects for Tx module.
Definition: Framework.cc:191
static std::string getPicklePath()
Function to get the path to the file where the pickled path is stored.
Definition: Framework.cc:186
static ModulePtr registerModule(const std::string &moduleName)
Registers a new module to the framework and returns a shared pointer.
Definition: Framework.cc:76
static void setPicklePath(const std::string &path)
Function to set the path to the file where the pickled path is stored.
Definition: Framework.cc:180
static void setExternalsPath(const std::string &path)
Sets the path in which the externals of the framework are located.
Definition: Framework.cc:70
virtual ~Framework()
Destructor.
Definition: Framework.cc:51
static int getNumberProcesses()
Function to get number of worker processes for parallel processing.
Definition: Framework.cc:174
Framework()
Constructor.
Definition: Framework.cc:39
static void addModuleSearchPath(const std::string &path)
Adds a new filepath to the list of filepaths which are searched for modules.
Definition: Framework.cc:64
static void process(PathPtr startPath, long maxEvent=0)
Processes up to maxEvent events by starting with the first module in the specified path.
Definition: Framework.cc:88
static const char * logRealmToString(ELogRealm realm)
Converts a log realm type to a string.
Definition: LogConfig.cc:49
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:30
ELogRealm
Definition of the supported execution realms.
Definition: LogConfig.h:48
@ c_None
No specific realm.
Definition: LogConfig.h:48
@ c_Production
Data production jobs.
Definition: LogConfig.h:50
void resetMessageCounter()
Resets the message counter and error log by setting all message counts to 0.
Definition: LogSystem.cc:150
void enableErrorSummary(bool on)
enable/disable error/warning summary after successful execution and B2FATAL.
Definition: LogSystem.h:181
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:161
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:31
std::shared_ptr< Module > registerModule(const std::string &moduleName, std::string sharedLibPath="") noexcept(false)
Creates an instance of a module and registers it to the ModuleManager.
static ModuleManager & Instance()
Exception is thrown if the requested module could not be created by the ModuleManager.
void reset()
Delete all created modules.
void addModuleSearchPath(const std::string &path)
Adds a new filepath to the list of filepaths which are searched for a requested module.
static bool isInitialized()
Truth that the random number generator has been initialized.
static void initialize()
Initialize the random number generator with a unique random seed.
static std::string randomSocketName()
Generate a random socket name in the form ipc:///socketname.
This class provides the core event processing loop for parallel processing with ZMQ.
This class provides the core event processing loop for parallel processing.
void reset(bool keepEntries=false)
Invalidate all payloads.
Definition: DBStore.cc:177
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:35
static Database & Instance()
Instance of a singleton Database.
Definition: Database.cc:42
static DBStore & Instance()
Instance of a singleton DBStore.
Definition: DBStore.cc:28
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:43
static void reset(bool keepConfig=false)
Reset the database instance.
Definition: Database.cc:50
Scalar convertPythonObject(const boost::python::object &pyObject, Scalar)
Convert from Python to given type.
Abstract base class for different kinds of events.