Belle II Software  release-08-01-10
Framework.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pybasf2/Framework.h>
10 
11 #include <framework/core/PyObjConvUtils.h>
12 #include <framework/core/Environment.h>
13 #include <framework/core/RandomNumbers.h>
14 #include <framework/core/EventProcessor.h>
15 #include <framework/core/ModuleManager.h>
16 #include <framework/datastore/DataStore.h>
17 #include <framework/database/DBStore.h>
18 #include <framework/database/Database.h>
19 #include <framework/pcore/pEventProcessor.h>
20 #include <framework/pcore/ZMQEventProcessor.h>
21 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
22 #include <framework/utilities/FileSystem.h>
23 #include <framework/database/Configuration.h>
24 
25 #include <framework/logging/Logger.h>
26 #include <framework/logging/LogSystem.h>
27 
28 #include <boost/algorithm/string.hpp>
29 #include <boost/algorithm/string/join.hpp>
30 #include <boost/python.hpp>
31 
32 #include <set>
33 #include <vector>
34 
35 using namespace boost::python;
36 using namespace Belle2;
37 
38 
39 Framework::Framework()
40 {
41  DataStore::s_DoCleanup = true;
42  LogSystem::Instance().enableErrorSummary(true);
43 
44  if (!RandomNumbers::isInitialized()) {
45  RandomNumbers::initialize();
46  }
47  Environment::Instance();
48 }
49 
50 
51 Framework::~Framework()
52 {
53  //empty module manager of modules
54  //since modules may contain shared pointers of Path objects created in Python,
55  //these shared pointers have special cleanup hooks that can cause crashes if run
56  //after Py_Finalize(). The framework object is cleaned up before, so this is a good place.
57  ModuleManager::Instance().reset();
58  DataStore::s_DoCleanup = false;
59  //Also the database configuration has things to cleanup before Py_Finalize()
60  Conditions::Configuration::getInstance().reset();
61 }
62 
63 
64 void Framework::addModuleSearchPath(const std::string& path)
65 {
66  ModuleManager::Instance().addModuleSearchPath(path);
67 }
68 
69 
70 void Framework::setExternalsPath(const std::string& path)
71 {
72  Environment::Instance().setExternalsPath(path);
73 }
74 
75 
76 ModulePtr Framework::registerModule(const std::string& moduleName)
77 {
78  return ModuleManager::Instance().registerModule(moduleName);
79 }
80 
81 
82 ModulePtr Framework::registerModule(const std::string& moduleName, const std::string& sharedLibPath)
83 {
84  return ModuleManager::Instance().registerModule(moduleName, sharedLibPath);
85 }
86 
87 
88 void Framework::process(PathPtr startPath, long maxEvent)
89 {
90  if (Environment::Instance().getDryRun()) {
91  Environment::Instance().setJobInformation(startPath);
92  return; //processing disabled!
93  }
94 
95  static bool already_executed = false;
96  static std::set<const Module*> previously_run_modules; //not a shared pointer to not screw up ownership
97  static int errors_from_previous_run = 0;
98  const auto moduleListUnique = startPath->buildModulePathList(true);
99  if (already_executed) {
100  B2WARNING("Calling process() more than once per steering file is still experimental, please check results carefully! Python modules especially should reinitialise their state in initialise() to avoid problems");
101  if (startPath->buildModulePathList(true) != startPath->buildModulePathList(false)) {
102  B2FATAL("Your path contains the same module instance in multiple places. Calling process() multiple times is not implemented for this case.");
103  }
104 
105  //were any modules in moduleListUnique already run?
106  for (const auto& m : moduleListUnique) {
107  if (previously_run_modules.count(m.get()) > 0) {
108  //only clone if modules have been run before
109  startPath = std::static_pointer_cast<Path>(startPath->clone());
110  break;
111  }
112  }
113  }
114  for (const auto& m : moduleListUnique) {
115  previously_run_modules.insert(m.get());
116  }
117 
118  int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
119  if (numLogError != errors_from_previous_run) {
120  B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
121  }
122 
123  try {
124  LogSystem::Instance().resetMessageCounter();
125  DataStore::Instance().reset();
126  DataStore::Instance().setInitializeActive(true);
127 
128  auto& environment = Environment::Instance();
129 
130  already_executed = true;
131  if (environment.getNumberProcesses() == 0) {
132  EventProcessor processor;
133  processor.setProfileModuleName(environment.getProfileModuleName());
134  processor.process(startPath, maxEvent);
135  } else {
136  if (environment.getUseZMQ()) {
137  // If the user has not given any socket address, use a random one.
138  if (environment.getZMQSocketAddress().empty()) {
139  environment.setZMQSocketAddress(ZMQAddressUtils::randomSocketName());
140  }
141  ZMQEventProcessor processor;
142  processor.process(startPath, maxEvent);
143  } else {
144  pEventProcessor processor;
145  processor.process(startPath, maxEvent);
146  }
147  }
148  errors_from_previous_run = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
149 
150  DBStore::Instance().reset();
151  // Also, reset the Database connection itself. However don't reset the
152  // configuration, just the actual setup. In case the user runs process()
153  // again it will reinitialize correctly with the same settings.
154  Database::Instance().reset(true);
155  } catch (std::exception& e) {
156  B2ERROR("Uncaught exception encountered: " << e.what()); //should show module name
157  DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
158  throw; //and let python's global handler do the rest
159  } catch (...) {
160  B2ERROR("Uncaught exception encountered!"); //should show module name
161  DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
162  throw; //and let python's global handler do the rest
163  //TODO: having a stack trace would be nicer, but somehow a handler I set using std::set_terminate() never gets called
164  }
165 }
166 
167 
168 void Framework::setNumberProcesses(int numProcesses)
169 {
170  Environment::Instance().setNumberProcesses(numProcesses);
171 }
172 
173 
174 int Framework::getNumberProcesses()
175 {
176  return Environment::Instance().getNumberProcesses();
177 }
178 
179 
180 void Framework::setPicklePath(const std::string& path)
181 {
182  Environment::Instance().setPicklePath(path);
183 }
184 
185 
186 std::string Framework::getPicklePath()
187 {
188  return Environment::Instance().getPicklePath();
189 }
190 
191 void Framework::setStreamingObjects(const boost::python::list& streamingObjects)
192 {
193  auto vec = PyObjConvUtils::convertPythonObject(streamingObjects, std::vector<std::string>());
194  Environment::Instance().setStreamingObjects(vec);
195 }
196 
197 void Framework::setRealm(const std::string& realm)
198 {
199  int irealm = -1;
200  std::vector<std::string> realms;
201  for (int i = LogConfig::c_None; i <= LogConfig::c_Production; i++) {
202  std::string thisRealm = LogConfig::logRealmToString((LogConfig::ELogRealm)i);
203  realms.push_back(thisRealm);
204  if (boost::iequals(realm, thisRealm)) { //case-insensitive
205  irealm = i;
206  break;
207  }
208  }
209  if (irealm < 0) {
210  B2ERROR("Invalid realm! Needs to be one of " << boost::join(realms, ", "));
211  } else {
212  Environment::Instance().setRealm((LogConfig::ELogRealm)irealm);
213  }
214 }
215 
216 
217 std::string Framework::findFile(const std::string& filename, const std::string& type, bool ignore_errors)
218 {
219  std::string result;
220  if (type.empty()) {
221  //behave like FileSystem.findFile by using it
222  result = FileSystem::findFile(filename, ignore_errors);
223  } else {
224  result = FileSystem::findFile(filename, type, ignore_errors);
225  }
226  if (!ignore_errors and result.empty()) {
227  // Still not found ... see if we raise an exception or not.
228  // We want a FileNotFoundError ... so lets fudge the errno to the correct
229  // error value and then create the correct exception in python
230  errno = ENOENT;
231  PyErr_SetFromErrnoWithFilename(PyExc_FileNotFoundError, filename.c_str());
232  boost::python::throw_error_already_set();
233  }
234  return result;
235 }
236 
237 //=====================================================================
238 // Python API
239 //=====================================================================
240 
241 boost::python::list Framework::getModuleSearchPathsPython()
242 {
243  boost::python::list returnList;
244 
245  for (const std::string& path : ModuleManager::Instance().getModuleSearchPaths())
246  returnList.append(boost::python::object(path));
247  return returnList;
248 }
249 
250 
251 boost::python::dict Framework::getAvailableModulesPython()
252 {
253  boost::python::dict returnDict;
254  for (const auto& modulePair : ModuleManager::Instance().getAvailableModules())
255  returnDict[boost::python::object(modulePair.first)] = boost::python::object(modulePair.second);
256  return returnDict;
257 }
258 
259 
260 boost::python::list Framework::getRegisteredModulesPython()
261 {
262  boost::python::list returnList;
263 
264  for (const ModulePtr& mod : ModuleManager::Instance().getCreatedModules())
265  returnList.append(boost::python::object(mod));
266  return returnList;
267 }
268 
269 
270 #if !defined(__GNUG__) || defined(__ICC)
271 #else
272 #pragma GCC diagnostic push
273 #pragma GCC diagnostic ignored "-Wunused-local-typedefs"
274 #endif
275 BOOST_PYTHON_FUNCTION_OVERLOADS(process_overloads, Framework::process, 1, 2)
276 #if !defined(__GNUG__) || defined(__ICC)
277 #else
278 #pragma GCC diagnostic pop
279 #endif
280 
281 namespace {
282  PyObject* PyExc_ModuleNotCreatedError{nullptr};
285  void moduleNotCreatedTranslator(const ModuleManager::ModuleNotCreatedError& e)
286  {
287  PyErr_SetString(PyExc_ModuleNotCreatedError, e.what());
288  }
289 }
290 
/**
 * Export the framework interface to Python.
 *
 * Creates and registers the ModuleNotCreatedError exception, exposes the
 * Framework class (not constructible from Python), stores one Framework
 * instance in the current scope and defines the module level functions
 * together with their docstrings.
 */
291 void Framework::exposePythonAPI()
292 {
// Create the Python exception basf2.ModuleNotCreatedError with RuntimeError as
// base, publish it in the current scope and register the translator that maps
// the C++ ModuleManager::ModuleNotCreatedError onto it.
293  PyExc_ModuleNotCreatedError = PyErr_NewExceptionWithDoc("basf2.ModuleNotCreatedError",
294  "This exception is raised when a basf2 module could not be created for any reason",
295  PyExc_RuntimeError, nullptr);
296  scope().attr("ModuleNotCreatedError") = handle<>(borrowed(PyExc_ModuleNotCreatedError));
297  register_exception_translator<ModuleManager::ModuleNotCreatedError>(moduleNotCreatedTranslator);
298  //Overloaded methods
// Typed function pointers select one registerModule() overload each.
299  ModulePtr(*registerModule1)(const std::string&) = &Framework::registerModule;
300  ModulePtr(*registerModule2)(const std::string&, const std::string&) = &Framework::registerModule;
301 
302  //don't show c++ signature in python doc to keep it simple
// Arguments per the Boost.Python documentation: show user-defined docstrings,
// show Python signatures, show C++ signatures.
303  docstring_options options(true, true, false);
304 
305  //Expose framework class
// no_init: the class cannot be constructed from Python. A single instance is
// created here and stored as attribute "__framework" of the current scope.
// NOTE(review): presumably this ties the Framework destructor to the lifetime
// of that attribute so it runs before Py_Finalize() - confirm.
306  class_<Framework, std::shared_ptr<Framework>, boost::noncopyable>("Framework", "Initialize and Cleanup functions", no_init);
307  std::shared_ptr<Framework> initguard{new Framework()};
308  scope().attr("__framework") = initguard;
309 
310  def("add_module_search_path", &Framework::addModuleSearchPath, R"DOCSTRING(
311 Add a directory in which to search for compiled basf2 C++ `Modules <Module>`.
312 
313 This directory needs to contain the shared libraries containing the compiled
314 modules as well as companion files ending in ``.b2modmap`` which contain a list
315 of the module names contained in each library.
316 
317 Note:
318  The newly added path will not override existing modules
319 
320 Parameters:
321  path (str): directory containing the modules.
322 )DOCSTRING", args("path"));
323  def("set_externals_path", &Framework::setExternalsPath, R"DOCSTRING(
324 Set the path to the externals to be used.
325 
326 Warning:
327  This will not change the library and executable paths but will just change
328  the directory where to look for certain data files like the Evtgen particle
329  definition file. Don't use this unless you really know what you are doing.
330 
331 Parameters:
332  path (str): new top level directory for the externals
333 )DOCSTRING", args("path"));
334  def("list_module_search_paths", &Framework::getModuleSearchPathsPython, R"DOCSTRING(
335 Return a python list containing all the directories included in the module
336 search Path.
337 
338 See:
339  `add_module_search_path`
340 )DOCSTRING");
341  def("list_available_modules", &Framework::getAvailableModulesPython, R"DOCSTRING(
342 Return a dictionary containing the names of all known modules
343 as keys and the name of the shared library containing these modules as values.
344 )DOCSTRING");
345  def("list_registered_modules", &Framework::getRegisteredModulesPython, R"DOCSTRING(
346 Return a list with pointers to all previously created module instances by calling `register_module()`
347 )DOCSTRING");
348  def("get_pickle_path", &Framework::getPicklePath, R"DOCSTRING(
349 Return the filename where the pickled path is or should be stored
350 )DOCSTRING");
351  def("set_pickle_path", &Framework::setPicklePath, R"DOCSTRING(
352 Set the filename where the pickled path should be stored or retrieved from
353 )DOCSTRING", args("path"));
// NOTE(review): the docstring below documents a parameter "nproc", but unlike
// set_pickle_path no args(...) keyword name is passed to def() here.
354  def("set_nprocesses", &Framework::setNumberProcesses, R"DOCSTRING(
355 Sets number of worker processes for parallel processing.
356 
357 Can be overridden using the ``-p`` argument to basf2.
358 
359 Note:
360  Setting this to 1 will have one parallel worker job which is almost always
361  slower than just running without parallel processing but is still provided to
362  allow debugging of parallel execution.
363 
364 Parameters:
365  nproc (int): number of worker processes. 0 to disable parallel processing.
366 )DOCSTRING");
367  def("get_nprocesses", &Framework::getNumberProcesses, R"DOCSTRING(
368 Gets number of worker processes for parallel processing. 0 disables parallel processing
369 )DOCSTRING");
370  def("set_streamobjs", &Framework::setStreamingObjects, R"DOCSTRING(
371 Set the names of all DataStore objects which should be sent between the
372 parallel processes. This can be used to improve parallel processing performance
373 by removing objects not required.
374 )DOCSTRING");
375  {
376  // The register_module function is overloaded with different signatures which makes
377  // the boost docstring very useless so we handcraft a docstring
// Within this scope the second argument is false as well, so no generated
// Python signatures are added to the handwritten docstrings.
378  docstring_options param_options(true, false, false);
379  def("_register_module", registerModule1);
380  def("_register_module", registerModule2, R"DOCSTRING(register_module(name, library=None)
381 Register a new Module.
382 
383 This function will try to create a new instance of a module with the given name. If no library is given it will try to find the module by itself from the module search path. Optionally one can specify the name of a shared library containing the module code then this library will be loaded
384 
385 See:
386  `list_module_search_paths()`, `add_module_search_path()`
387 
388 Parameters:
389  name (str): Type of the module to create
390  library (str): Optional, name of a shared library containing the module
391 
392 Returns:
393  An instance of the module if successful.
394 
395 Raises:
396  will raise a `ModuleNotCreatedError` if there is any problem creating the module.
397 )DOCSTRING");
398  def("set_realm", &Framework::setRealm, R"DOCSTRING(
399 Set the basf2 execution realm.
400 
401 The severity of log messages sometimes depends on where basf2 runs. This is controlled by the execution realm.
402 
403 Usually the realm does not have to be set explicitly. On the HLT or express reco it should be set to 'online' and for official productions to 'production'.
404 )DOCSTRING", args("realm"));
// NOTE(review): the docstring headline calls the second parameter
// "num_events" while the text below it uses "max_events".
405  def("_process", &Framework::process, process_overloads(R"DOCSTRING(process(path, num_events=0)
406 Processes up to max_events events by starting with the first module in the specified path.
407 
408  This method starts processing events only if there is a module in the path
409  which is capable of specifying the end of the data flow.
410 
411  Parameters:
412  path (Path): The processing starts with the first module of this path.
413  max_events (int): The maximum number of events that will be processed.
414  If the number is smaller than 1, all events will be processed (default).
415 )DOCSTRING"));
// NOTE(review): the lone semicolon below is an empty statement.
416  ;
417  }
418 
// Back in the outer scope: find_file gets named arguments with defaults for
// data_type and silent.
419  def("find_file", &Framework::findFile, (arg("filename"), arg("data_type") = "", arg("silent") = false), R"DOC(
420  Try to find a file and return its full path
421 
422  If ``data_type`` is empty this function will try to find the file
423 
424  1. in ``$BELLE2_LOCAL_DIR``,
425  2. in ``$BELLE2_RELEASE_DIR``
426  3. relative to the current working directory.
427 
428  Other known ``data_type`` values are
429 
430  ``examples``
431  Example data for examples and tutorials. Will try to find the file
432 
433  1. in ``$BELLE2_EXAMPLES_DATA_DIR``
434  2. relative to the current working directory
435 
436  ``validation``
437  Data for Validation purposes. Will try to find the file in
438 
439  1. in ``$BELLE2_VALIDATION_DATA_DIR``
440  2. relative to the current working directory
441 
442  .. versionadded:: release-03-00-00
443 
444  Arguments:
445  filename (str): relative filename to look for, either in a central place or
446  in the current working directory
447  data_type (str): case insensitive data type to find. Either empty string or
448  one of ``"examples"`` or ``"validation"``
449  silent (bool): If True don't print any errors and just return an empty
450  string if the file cannot be found
451  )DOC");
452 }
This class provides the core event processing loop.
This class combines all subsystems of the framework, and exports the main interface to Python.
Definition: Framework.h:27
ELogRealm
Definition of the supported execution realms.
Definition: LogConfig.h:48
This class provides the core event processing loop for parallel processing with ZMQ.
This class provides the core event processing loop for parallel processing.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:28
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:40
Abstract base class for different kinds of events.