Belle II Software  light-2403-persian
Framework.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pybasf2/Framework.h>
10 
11 #include <framework/core/PyObjConvUtils.h>
12 #include <framework/core/Environment.h>
13 #include <framework/core/RandomNumbers.h>
14 #include <framework/core/EventProcessor.h>
15 #include <framework/core/ModuleManager.h>
16 #include <framework/datastore/DataStore.h>
17 #include <framework/database/DBStore.h>
18 #include <framework/database/Database.h>
19 #include <framework/pcore/pEventProcessor.h>
20 #include <framework/pcore/ZMQEventProcessor.h>
21 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
22 #include <framework/utilities/FileSystem.h>
23 #include <framework/database/Configuration.h>
24 
25 #include <framework/logging/Logger.h>
26 #include <framework/logging/LogSystem.h>
27 
28 #include <boost/algorithm/string.hpp>
29 #include <boost/algorithm/string/join.hpp>
30 #include <boost/python.hpp>
31 
32 #include <set>
33 #include <vector>
34 
35 using namespace boost::python;
36 using namespace Belle2;
37 
38 
39 Framework::Framework()
40 {
41  DataStore::s_DoCleanup = true;
42  LogSystem::Instance().enableErrorSummary(true);
43 
44  if (!RandomNumbers::isInitialized()) {
45  RandomNumbers::initialize();
46  }
47  Environment::Instance();
48 }
49 
50 
51 Framework::~Framework()
52 {
53  //empty module manager of modules
54  //since modules may contain shared pointers of Path objects created in Python,
55  //these shared pointers have special cleanup hooks that can cause crashes if run
56  //after Py_Finalize(). The framework object is cleaned up before, so this is a good place.
57  ModuleManager::Instance().reset();
58  DataStore::s_DoCleanup = false;
59  //Also the database configuration has things to cleanup before Py_Finalize()
60  Conditions::Configuration::getInstance().reset();
61 }
62 
63 
64 void Framework::addModuleSearchPath(const std::string& path)
65 {
66  ModuleManager::Instance().addModuleSearchPath(path);
67 }
68 
69 
70 void Framework::setExternalsPath(const std::string& path)
71 {
72  Environment::Instance().setExternalsPath(path);
73 }
74 
75 
76 ModulePtr Framework::registerModule(const std::string& moduleName)
77 {
78  return ModuleManager::Instance().registerModule(moduleName);
79 }
80 
81 
82 ModulePtr Framework::registerModule(const std::string& moduleName, const std::string& sharedLibPath)
83 {
84  return ModuleManager::Instance().registerModule(moduleName, sharedLibPath);
85 }
86 
87 
88 void Framework::process(PathPtr startPath, long maxEvent)
89 {
90  if (Environment::Instance().getDryRun()) {
91  Environment::Instance().setJobInformation(startPath);
92  return; //processing disabled!
93  }
94 
95  static bool already_executed = false;
96  static std::set<const Module*> previously_run_modules; //not a shared pointer to not screw up ownership
97  static int errors_from_previous_run = 0;
98  const auto moduleListUnique = startPath->buildModulePathList(true);
99  if (already_executed) {
100  B2WARNING("Calling process() more than once per steering file is still experimental, please check results carefully! Python modules especially should reinitialise their state in initialise() to avoid problems");
101  if (startPath->buildModulePathList(true) != startPath->buildModulePathList(false)) {
102  B2FATAL("Your path contains the same module instance in multiple places. Calling process() multiple times is not implemented for this case.");
103  }
104 
105  //were any modules in moduleListUnique already run?
106  for (const auto& m : moduleListUnique) {
107  if (previously_run_modules.count(m.get()) > 0) {
108  //only clone if modules have been run before
109  startPath = std::static_pointer_cast<Path>(startPath->clone());
110  break;
111  }
112  }
113  }
114  for (const auto& m : moduleListUnique) {
115  previously_run_modules.insert(m.get());
116  }
117 
118  int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
119  if (numLogError != errors_from_previous_run) {
120  B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
121  }
122 
123  try {
124  LogSystem::Instance().resetMessageCounter();
125  DataStore::Instance().reset();
126  DataStore::Instance().setInitializeActive(true);
127 
128  auto& environment = Environment::Instance();
129 
130  already_executed = true;
131  if (environment.getNumberProcesses() == 0) {
132  EventProcessor processor;
133  processor.setProfileModuleName(environment.getProfileModuleName());
134  processor.process(startPath, maxEvent);
135  } else {
136  if (environment.getUseZMQ()) {
137  // If the user has not given any socket address, use a random one.
138  if (environment.getZMQSocketAddress().empty()) {
139  environment.setZMQSocketAddress(ZMQAddressUtils::randomSocketName());
140  }
141  ZMQEventProcessor processor;
142  processor.process(startPath, maxEvent);
143  } else {
144  pEventProcessor processor;
145  processor.process(startPath, maxEvent);
146  }
147  }
148  errors_from_previous_run = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
149 
150  DBStore::Instance().reset();
151  // Also, reset the Database connection itself. However don't reset the
152  // configuration, just the actual setup. In case the user runs process()
153  // again it will reinitialize correctly with the same settings.
154  Database::Instance().reset(true);
155  } catch (std::exception& e) {
156  B2ERROR("Uncaught exception encountered: " << e.what()); //should show module name
157  DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
158  throw; //and let python's global handler do the rest
159  } catch (...) {
160  B2ERROR("Uncaught exception encountered!"); //should show module name
161  DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
162  throw; //and let python's global handler do the rest
163  //TODO: having a stack trace would be nicer, but somehow a handler I set using std::set_terminate() never gets called
164  }
165 }
166 
167 
168 void Framework::setNumberProcesses(int numProcesses)
169 {
170  Environment::Instance().setNumberProcesses(numProcesses);
171 }
172 
173 
174 int Framework::getNumberProcesses()
175 {
176  return Environment::Instance().getNumberProcesses();
177 }
178 
179 
180 void Framework::setPicklePath(const std::string& path)
181 {
182  Environment::Instance().setPicklePath(path);
183 }
184 
185 
186 std::string Framework::getPicklePath()
187 {
188  return Environment::Instance().getPicklePath();
189 }
190 
191 void Framework::setStreamingObjects(const boost::python::list& streamingObjects)
192 {
193  auto vec = PyObjConvUtils::convertPythonObject(streamingObjects, std::vector<std::string>());
194  Environment::Instance().setStreamingObjects(vec);
195 }
196 
197 void Framework::setRealm(const std::string& realm)
198 {
199  int irealm = -1;
200  std::vector<std::string> realms;
201  for (int i = LogConfig::c_None; i <= LogConfig::c_Production; i++) {
202  std::string thisRealm = LogConfig::logRealmToString((LogConfig::ELogRealm)i);
203  realms.push_back(thisRealm);
204  if (boost::iequals(realm, thisRealm)) { //case-insensitive
205  irealm = i;
206  break;
207  }
208  }
209  if (irealm < 0) {
210  B2ERROR("Invalid realm! Needs to be one of " << boost::join(realms, ", "));
211  } else {
212  Environment::Instance().setRealm((LogConfig::ELogRealm)irealm);
213  }
214 }
215 
216 void Framework::writeSimulationSteps()
217 {
218  B2WARNING("basf2 will write the simulation steps of each event into output csv files. "
219  "This is fine if you are producing events for the Belle II Virtual Reality application, "
220  "otherwise this function should not be used since the exeuction time will significantly increase.");
221  Environment::Instance().setWriteSimSteps(true);
222 }
223 
224 std::string Framework::findFile(const std::string& filename, const std::string& type, bool ignore_errors)
225 {
226  std::string result;
227  if (type.empty()) {
228  //behave like FileSystem.findFile by using it
229  result = FileSystem::findFile(filename, ignore_errors);
230  } else {
231  result = FileSystem::findFile(filename, type, ignore_errors);
232  }
233  if (!ignore_errors and result.empty()) {
234  // Still not found ... see if we raise an exception or not.
235  // We want a FileNotFoundError ... so lets fudge the errno to the correct
236  // error value and then create the correct exception in python
237  errno = ENOENT;
238  PyErr_SetFromErrnoWithFilename(PyExc_FileNotFoundError, filename.c_str());
239  boost::python::throw_error_already_set();
240  }
241  return result;
242 }
243 
244 //=====================================================================
245 // Python API
246 //=====================================================================
247 
248 boost::python::list Framework::getModuleSearchPathsPython()
249 {
250  boost::python::list returnList;
251 
252  for (const std::string& path : ModuleManager::Instance().getModuleSearchPaths())
253  returnList.append(boost::python::object(path));
254  return returnList;
255 }
256 
257 
258 boost::python::dict Framework::getAvailableModulesPython()
259 {
260  boost::python::dict returnDict;
261  for (const auto& modulePair : ModuleManager::Instance().getAvailableModules())
262  returnDict[boost::python::object(modulePair.first)] = boost::python::object(modulePair.second);
263  return returnDict;
264 }
265 
266 
267 boost::python::list Framework::getRegisteredModulesPython()
268 {
269  boost::python::list returnList;
270 
271  for (const ModulePtr& mod : ModuleManager::Instance().getCreatedModules())
272  returnList.append(boost::python::object(mod));
273  return returnList;
274 }
275 
276 
277 #if !defined(__GNUG__) || defined(__ICC)
278 #else
279 #pragma GCC diagnostic push
280 #pragma GCC diagnostic ignored "-Wunused-local-typedefs"
281 #endif
282 BOOST_PYTHON_FUNCTION_OVERLOADS(process_overloads, Framework::process, 1, 2)
283 #if !defined(__GNUG__) || defined(__ICC)
284 #else
285 #pragma GCC diagnostic pop
286 #endif
287 
288 namespace {
289  PyObject* PyExc_ModuleNotCreatedError{nullptr};
292  void moduleNotCreatedTranslator(const ModuleManager::ModuleNotCreatedError& e)
293  {
294  PyErr_SetString(PyExc_ModuleNotCreatedError, e.what());
295  }
296 }
297 
298 void Framework::exposePythonAPI()
299 {
300  PyExc_ModuleNotCreatedError = PyErr_NewExceptionWithDoc("basf2.ModuleNotCreatedError",
301  "This exception is raised when a basf2 module could not be created for any reason",
302  PyExc_RuntimeError, nullptr);
303  scope().attr("ModuleNotCreatedError") = handle<>(borrowed(PyExc_ModuleNotCreatedError));
304  register_exception_translator<ModuleManager::ModuleNotCreatedError>(moduleNotCreatedTranslator);
305  //Overloaded methods
306  ModulePtr(*registerModule1)(const std::string&) = &Framework::registerModule;
307  ModulePtr(*registerModule2)(const std::string&, const std::string&) = &Framework::registerModule;
308 
309  //don't show c++ signature in python doc to keep it simple
310  docstring_options options(true, true, false);
311 
312  //Expose framework class
313  class_<Framework, std::shared_ptr<Framework>, boost::noncopyable>("Framework", "Initialize and Cleanup functions", no_init);
314  std::shared_ptr<Framework> initguard{new Framework()};
315  scope().attr("__framework") = initguard;
316 
317  def("add_module_search_path", &Framework::addModuleSearchPath, R"DOCSTRING(
318 Add a directory in which to search for compiled basf2 C++ `Modules <Module>`.
319 
320 This directory needs to contain the shared libraries containing the compiled
321 modules as well as companion files ending in ``.b2modmap`` which contain a list
322 of the module names contained in each library.
323 
324 Note:
325  The newly added path will not override existing modules
326 
327 Parameters:
328  path (str): directory containing the modules.
329 )DOCSTRING", args("path"));
330  def("set_externals_path", &Framework::setExternalsPath, R"DOCSTRING(
331 Set the path to the externals to be used.
332 
333 Warning:
334  This will not change the library and executable paths but will just change
335  the directory where to look for certain data files like the Evtgen particle
336  definition file. Don't use this unless you really know what you are doing.
337 
338 Parameters:
339  path (str): new top level directory for the externals
340 )DOCSTRING", args("path"));
341  def("list_module_search_paths", &Framework::getModuleSearchPathsPython, R"DOCSTRING(
342 Return a python list containing all the directories included in the module
343 search Path.
344 
345 See:
346  `add_module_search_path`
347 )DOCSTRING");
348  def("list_available_modules", &Framework::getAvailableModulesPython, R"DOCSTRING(
349 Return a dictionary containing the names of all known modules
350 as keys and the name of the shared library containing these modules as values.
351 )DOCSTRING");
352  def("list_registered_modules", &Framework::getRegisteredModulesPython, R"DOCSTRING(
353 Return a list with pointers to all previously created module instances by calling `register_module()`
354 )DOCSTRING");
355  def("get_pickle_path", &Framework::getPicklePath, R"DOCSTRING(
356 Return the filename where the pickled path is or should be stored
357 )DOCSTRING");
358  def("set_pickle_path", &Framework::setPicklePath, R"DOCSTRING(
359 Set the filename where the pickled path should be stored or retrieved from
360 )DOCSTRING", args("path"));
361  def("set_nprocesses", &Framework::setNumberProcesses, R"DOCSTRING(
362 Sets number of worker processes for parallel processing.
363 
364 Can be overridden using the ``-p`` argument to basf2.
365 
366 Note:
367  Setting this to 1 will have one parallel worker job which is almost always
368  slower than just running without parallel processing but is still provided to
369  allow debugging of parallel execution.
370 
371 Parameters:
372  nproc (int): number of worker processes. 0 to disable parallel processing.
373 )DOCSTRING");
374  def("get_nprocesses", &Framework::getNumberProcesses, R"DOCSTRING(
375 Gets number of worker processes for parallel processing. 0 disables parallel processing
376 )DOCSTRING");
377  def("set_streamobjs", &Framework::setStreamingObjects, R"DOCSTRING(
378 Set the names of all DataStore objects which should be sent between the
379 parallel processes. This can be used to improve parallel processing performance
380 by removing objects not required.
381 )DOCSTRING");
382  {
383  // The register_module function is overloaded with different signatures which makes
384  // the boost docstring very useless so we handcraft a docstring
385  docstring_options param_options(true, false, false);
386  def("_register_module", registerModule1);
387  def("_register_module", registerModule2, R"DOCSTRING(register_module(name, library=None)
388 Register a new Module.
389 
390 This function will try to create a new instance of a module with the given name. If no library is given it will try to find the module by itself from the module search path. Optionally one can specify the name of a shared library containing the module code then this library will be loaded
391 
392 See:
393  `list_module_search_paths()`, `add_module_search_path()`
394 
395 Parameters:
396  name (str): Type of the module to create
397  library (str): Optional, name of a shared library containing the module
398 
399 Returns:
400  An instance of the module if successful.
401 
402 Raises:
403  will raise a `ModuleNotCreatedError` if there is any problem creating the module.
404 )DOCSTRING");
405  def("set_realm", &Framework::setRealm, R"DOCSTRING(
406 Set the basf2 execution realm.
407 
408 The severity of log messages sometimes depends on where basf2 runs. This is controlled by the execution realm.
409 
410 Usually the realm does not have to be set explicitly. On the HLT or express reco it should be set to 'online' and for official productions to 'production'.
411 )DOCSTRING", args("realm"));
412  def("write_simulation_steps", &Framework::writeSimulationSteps, R"DOCSTRING(
413 Allow basf2 to write the simulation steps of each event into csv files.
414 
415 This function should not be used in production jobs because the exeuction time will significantly increase.
416 )DOCSTRING");
417  def("_process", &Framework::process, process_overloads(R"DOCSTRING(process(path, num_events=0)
418 Processes up to max_events events by starting with the first module in the specified path.
419 
420  This method starts processing events only if there is a module in the path
421  which is capable of specifying the end of the data flow.
422 
423  Parameters:
424  path (Path): The processing starts with the first module of this path.
425  max_events (int): The maximum number of events that will be processed.
426  If the number is smaller than 1, all events will be processed (default).
427 )DOCSTRING"));
428  ;
429  }
430 
431  def("find_file", &Framework::findFile, (arg("filename"), arg("data_type") = "", arg("silent") = false), R"DOC(
432  Try to find a file and return its full path
433 
434  If ``data_type`` is empty this function will try to find the file
435 
436  1. in ``$BELLE2_LOCAL_DIR``,
437  2. in ``$BELLE2_RELEASE_DIR``
438  3. relative to the current working directory.
439 
440  Other known ``data_type`` values are
441 
442  ``examples``
443  Example data for examples and tutorials. Will try to find the file
444 
445  1. in ``$BELLE2_EXAMPLES_DATA_DIR``
446  2. relative to the current working directory
447 
448  ``validation``
449  Data for Validation purposes. Will try to find the file in
450 
451  1. in ``$BELLE2_VALIDATION_DATA_DIR``
452  2. relative to the current working directory
453 
454  .. versionadded:: release-03-00-00
455 
456  Arguments:
457  filename (str): relative filename to look for, either in a central place or
458  in the current working directory
459  data_type (str): case insensitive data type to find. Either empty string or
460  one of ``"examples"`` or ``"validation"``
461  silent (bool): If True don't print any errors and just return an empty
462  string if the file cannot be found
463  )DOC");
464 }
This class provides the core event processing loop.
void setProfileModuleName(const std::string &name)
Set the name of the module we want to profile.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
This class combines all subsystems of the framework, and exports the main interface to Python.
Definition: Framework.h:27
ELogRealm
Definition of the supported execution realms.
Definition: LogConfig.h:48
This class provides the core event processing loop for parallel processing with ZMQ.
void process(const PathPtr &spath, long maxEvent)
Processes the full module chain using parallel processing, starting with the first module in the given path.
This class provides the core event processing loop for parallel processing.
void process(const PathPtr &spath, long maxEvent)
Processes the full module chain, starting with the first module in the given path.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:28
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:40
Abstract base class for different kinds of events.
Definition: ClusterUtils.h:24