Belle II Software  release-08-01-10
1 #!/usr/bin/env python3
3 # disable doxygen check for this file
4 # @cond
14 """
15 This module implements several objects/functions to configure and run calibrations.
16 These classes are used to construct the workflow of the calibration job.
17 The actual processing code is mostly in the `caf.state_machines` module.
18 """
20 __all__ = ["CalibrationBase", "Calibration", "Algorithm", "CAF"]
22 import os
23 from threading import Thread
24 from time import sleep
25 from pathlib import Path
26 import shutil
27 from glob import glob
29 from basf2 import B2ERROR, B2WARNING, B2INFO, B2FATAL, B2DEBUG
30 from basf2 import find_file
31 from basf2 import conditions as b2conditions
33 from abc import ABC, abstractmethod
35 import caf
36 from caf.utils import B2INFO_MULTILINE
37 from caf.utils import past_from_future_dependencies
38 from caf.utils import topological_sort
39 from caf.utils import all_dependencies
40 from caf.utils import method_dispatch
41 from caf.utils import temporary_workdir
42 from caf.utils import find_int_dirs
43 from caf.utils import LocalDatabase
44 from caf.utils import CentralDatabase
45 from caf.utils import parse_file_uri
47 import caf.strategies as strategies
48 import caf.runners as runners
49 from caf.backends import MaxSubjobsSplitter, MaxFilesSplitter
50 from caf.state_machines import CalibrationMachine, ConditionError, MachineError
51 from caf.database import CAFDB
class Collection():
    """
    A single collector configuration (collector module, input files, database chain, job splitting) used by a
    `Calibration`.

    Keyword Arguments:
        collector (str, basf2.Module): The collector module or module name for this `Collection`.
        input_files (str, list[str], tuple[str]): The input files to be used for only this `Collection`.
        pre_collector_path (basf2.Path): The reconstruction `basf2.Path` to be run prior to the Collector module.
        database_chain (list[CentralDatabase, LocalDatabase]): The database chain to be used initially for this
            `Collection`. The list is copied, so extending this Collection's chain never modifies the caller's list.
        output_patterns (list[str]): Output patterns of files produced by collector which will be used to pass to the
            `Algorithm.data_input` function. Setting this here, replaces the default completely.
        max_files_per_collector_job (int): Maximum number of input files sent to each collector subjob for this
            `Collection`. Technically this sets the SubjobSplitter to be used, not compatible with max_collector_jobs.
        max_collector_jobs (int): Maximum number of collector subjobs for this `Collection`.
            Input files are split evenly between them. Technically this sets the SubjobSplitter to be used. Not
            compatible with max_files_per_collector_job.
        backend_args (dict): The args for the backend submission of this `Collection`.
    """

    #: Number of collector subjobs used when neither splitting option is given.
    default_max_collector_jobs = 1000

    #: Name of the collector job configuration file.
    job_config = "collector_job.json"

    def __init__(self,
                 collector=None,
                 input_files=None,
                 pre_collector_path=None,
                 database_chain=None,
                 output_patterns=None,
                 max_files_per_collector_job=None,
                 max_collector_jobs=None,
                 backend_args=None
                 ):
        #: Collector module of this collection (set through the validating property below).
        self.collector = collector
        #: Input file URIs of this collection (set through the globbing property below).
        self.input_files = []
        if input_files:
            self.input_files = input_files
        #: Mapping from input file to its IoV; starts empty.
        self.files_to_iovs = {}
        #: Optional basf2 path run before the collector module.
        self.pre_collector_path = None
        if pre_collector_path:
            self.pre_collector_path = pre_collector_path
        #: Output file patterns of the collector, passed on to `Algorithm.data_input`.
        self.output_patterns = ["CollectorOutput.root"]
        if output_patterns:
            self.output_patterns = output_patterns

        #: The SubjobSplitter used to split the input files into collector subjobs.
        self.splitter = None
        if max_files_per_collector_job and max_collector_jobs:
            B2FATAL("Cannot set both 'max_files_per_collector_job' and 'max_collector_jobs' of a collection!")
        elif max_files_per_collector_job:
            self.max_files_per_collector_job = max_files_per_collector_job
        elif max_collector_jobs:
            self.max_collector_jobs = max_collector_jobs
        else:
            self.max_collector_jobs = self.default_max_collector_jobs

        #: The args for the backend submission of this Collection.
        self.backend_args = {}
        if backend_args:
            self.backend_args = backend_args

        if database_chain:
            #: The database chain used for this Collection.
            # Copy the list: the same object is typically also handed to the owning Calibration, and sharing it
            # would make every later append show up in both chains (and in the caller's list).
            self.database_chain = list(database_chain)
        else:
            self.database_chain = []
            # This may seem weird but the changes to the DB interface mean that they have effectively swapped from being
            # described well by appending to a list to a deque. So we do bit of reversal to translate it back and make the
            # most important GT the last one encountered.
            for tag in reversed(b2conditions.default_globaltags):
                self.use_central_database(tag)

        #: The basf2 steering file that will be used for Collector jobs run by this collection.
        #: This script will be copied into subjob directories as part of the input sandbox.
        # NOTE(review): the script name was missing from the path; restored as run_collector_path.py — confirm.
        self.job_script = Path(find_file("calibration/scripts/caf/run_collector_path.py")).absolute()

        #: The command used to run each collector job: basf2 on the (sandboxed) steering script.
        self.job_cmd = ["basf2", self.job_script.name, "--job-information job_info.json"]

    def reset_database(self):
        """
        Remove everything in the database_chain of this Collection, including the default central database
        tag automatically included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
        """
        self.database_chain = []

    def use_central_database(self, global_tag):
        """
        Parameters:
            global_tag (str): The central database global tag to use for this collection.

        Using this allows you to add a central database to the head of the global tag database chain for this collection.
        The default database chain is just the central one from
        `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
        The input file global tag will always be overridden and never used unless explicitly set.

        To turn off central database completely or use a custom tag as the base, you should call `Collection.reset_database`
        and start adding databases with `Collection.use_local_database` and `Collection.use_central_database`.

        Alternatively you could set an empty list as the input database_chain when adding the Collection to the Calibration.

        NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
        All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
        list which will be checked after all local database files have been checked.

        So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the database chain, the real order
        that basf2 will use them is ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is checked first.
        """
        self.database_chain.append(CentralDatabase(global_tag))

    def use_local_database(self, filename, directory=""):
        """
        Parameters:
            filename (str): The path to the database.txt of the local database
            directory (str): The path to the payloads directory for this local database.

        Append a local database to the chain for this collection.
        You can call this function multiple times and each database will be added to the chain IN ORDER.
        The databases are applied to this collection ONLY.

        NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
        All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
        list which will be checked after all local database files have been checked.

        So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the database chain, the real order
        that basf2 will use them is ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.
        """
        self.database_chain.append(LocalDatabase(filename, directory))

    @staticmethod
    def uri_list_from_input_file(input_file):
        """
        Parameters:
            input_file (str): A local file/glob pattern or XROOTD URI

        Returns:
            list: A list of the URIs found from the initial string. For a local pattern that matches nothing this
            is an empty list.
        """
        # By default we assume it is a local file path if no "scheme" is given
        uri = parse_file_uri(input_file)
        if uri.scheme == "file":
            # For local files we also perform a glob just in case it is a wildcard pattern.
            # That way we will have all the uris of files separately
            return [parse_file_uri(f).geturl() for f in glob(input_file)]
        # Just let everything else through and hope the backend can figure it out
        return [input_file]

    @property
    def input_files(self):
        """The list of input file URIs of this Collection."""
        return self._input_files

    @input_files.setter
    def input_files(self, value):
        """
        Accepts a single file/pattern/URI string, or a list/tuple of them; each entry is expanded with
        `uri_list_from_input_file`.

        Raises:
            TypeError: If ``value`` is not a string, list or tuple.
        """
        if isinstance(value, str):
            # If it's a string, we convert to a list of URIs
            self._input_files = self.uri_list_from_input_file(value)
        elif isinstance(value, (list, tuple)):
            # If it's a sequence we expand every pattern in order
            total_files = []
            for pattern in value:
                total_files.extend(self.uri_list_from_input_file(pattern))
            self._input_files = total_files
        else:
            raise TypeError("Input files must be a list, tuple or string")

    @property
    def collector(self):
        """The collector module of this Collection (None if not set)."""
        return self._collector

    @collector.setter
    def collector(self, collector):
        """
        Accepts a ``basf2.Module`` or the name of a module, which is registered via ``basf2.register_module``.
        Anything else is reported with B2ERROR (but still stored). A falsy value is stored unchecked.
        """
        # check if collector is already a module or if we need to create one
        # from the name
        if collector:
            from basf2 import Module
            if isinstance(collector, str):
                from basf2 import register_module
                collector = register_module(collector)
            if not isinstance(collector, Module):
                B2ERROR("Collector needs to be either a Module or the name of such a module")

        self._collector = collector

    def is_valid(self):
        """Return True only if this Collection has both a collector and at least one input file."""
        return bool(self.collector) and bool(self.input_files)

    @property
    def max_collector_jobs(self):
        """Maximum number of collector subjobs as configured on the splitter, or None if no splitter is set."""
        if self.splitter:
            return self.splitter.max_subjobs
        return None

    @max_collector_jobs.setter
    def max_collector_jobs(self, value):
        """Install a `MaxSubjobsSplitter` (or remove the splitter when ``value`` is None)."""
        if value is None:
            self.splitter = None
        else:
            self.splitter = MaxSubjobsSplitter(max_subjobs=value)

    @property
    def max_files_per_collector_job(self):
        """Maximum files per collector subjob as configured on the splitter, or None if no splitter is set."""
        if self.splitter:
            return self.splitter.max_files_per_subjob
        return None

    @max_files_per_collector_job.setter
    def max_files_per_collector_job(self, value):
        """Install a `MaxFilesSplitter` (or remove the splitter when ``value`` is None)."""
        if value is None:
            self.splitter = None
        else:
            self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
class CalibrationBase(ABC, Thread):
    """
    Abstract base class of Calibration types. The CAF implements the :py:class:`Calibration` class which inherits from
    this and runs the C++ CalibrationCollectorModule and CalibrationAlgorithm classes. But by inheriting from this
    class and providing the minimal necessary methods/attributes you could plug in your own Calibration types
    that don't depend on the C++ CAF at all and run everything in your own way.

    .. warning:: Writing your own class inheriting from :py:class:`CalibrationBase` class is not recommended!
                 But it's there if you really need it.

    Parameters:
        name (str): Name of this calibration object. Should be unique if you are going to run it.

    Keyword Arguments:
        input_files (list[str]): Input files for this calibration. May contain wildcard expressions usable by `glob.glob`.
    """

    #: The name of the successful completion state; `dependencies_met` compares against it.
    end_state = "completed"

    #: The name of the failure state; `failed_dependencies` compares against it.
    fail_state = "failed"

    def __init__(self, name, input_files=None):
        """
        Initialise the underlying Thread and the bookkeeping attributes shared by all Calibration types.
        """
        super().__init__()
        #: Name of this calibration. As this class is a `threading.Thread`, this also sets the Thread's name.
        self.name = name
        #: Calibrations that depend on this one (appended to by their `depends_on` calls).
        self.future_dependencies = []
        #: Calibrations that this one depends on (appended to by `depends_on`).
        self.dependencies = []
        #: Mapping from input file to its IoV; starts empty.
        self.files_to_iovs = {}
        #: Input files for this calibration (subclasses may intercept this attribute with a property).
        self.input_files = input_files if input_files else []
        #: The IoV of this calibration; None until set elsewhere.
        self.iov = None
        #: Output database directory of this calibration; empty until set elsewhere.
        self.output_database_dir = ""
        #: Whether payloads are saved (True by default).
        self.save_payloads = True
        #: Jobs waiting to be submitted; starts empty.
        self.jobs_to_submit = []

    @abstractmethod
    def run(self):
        """
        The most important method. Runs inside a new Thread and is called from `CalibrationBase.start`
        once the dependencies of this `CalibrationBase` have returned with state == end_state i.e. "completed".
        """

    @abstractmethod
    def is_valid(self):
        """
        A simple method you should implement that will return True or False depending on whether
        the Calibration has been set up correctly and can be run safely.
        """

    def depends_on(self, calibration):
        """
        Parameters:
            calibration (`CalibrationBase`): The Calibration object which will produce constants that this one depends on.

        Adds dependency of this calibration on another i.e. This calibration
        will not run until the dependency has completed, and the constants produced
        will be used via the database chain.

        You can define multiple dependencies for a single calibration simply
        by calling this multiple times. Be careful when adding the calibration into
        the `CAF` not to add a circular/cyclic dependency. If you do the sort will return an
        empty order and the `CAF` processing will fail.

        This function appends to the `CalibrationBase.dependencies` and `CalibrationBase.future_dependencies` attributes of this
        `CalibrationBase` and the input one respectively. This prevents us having to do too much recalculation later on.
        Repeating a call with the same calibration adds nothing further. A calibration with the same name as this one
        is rejected with a warning.
        """
        # Check that we don't have two calibration names that are the same
        if self.name == calibration.name:
            B2WARNING(f"Tried to add {calibration} as a dependency for {self} but they have the same name. "
                      "Dependency was not added.")
            return
        # Tests if we have the calibrations added as dependencies already and adds if not
        if calibration not in self.dependencies:
            self.dependencies.append(calibration)
        # Check the reverse list itself, so repeated calls never duplicate the back-reference
        if self not in calibration.future_dependencies:
            calibration.future_dependencies.append(self)

    def dependencies_met(self):
        """
        Checks if all of the Calibrations that this one depends on have reached a successful end state.
        """
        return all(dependency.state == dependency.end_state for dependency in self.dependencies)

    def failed_dependencies(self):
        """
        Returns the list of calibrations in our dependency list that have failed.
        """
        return [dependency for dependency in self.dependencies if dependency.state == dependency.fail_state]

    def _apply_calibration_defaults(self, defaults):
        """
        We pass in default calibration options from the `CAF` instance here if called.
        Won't overwrite any options already set (i.e. only attributes currently equal to None are assigned).
        Unknown attributes are reported with a warning and skipped.
        """
        for key, value in defaults.items():
            try:
                if getattr(self, key) is None:
                    setattr(self, key, value)
            except AttributeError:
                B2WARNING(f"The calibration {self.name} does not support the attribute {key}.")
427 class Calibration(CalibrationBase):
428  """
    Every Calibration object must have at least one collector and at least one algorithm.
430  You have the option to add in your collector/algorithm by argument here, or add them
431  later by changing the properties.
433  If you plan to use multiple `Collection` objects I recommend that you only set the name here and add the Collections
434  separately via `add_collection()`.
436  Parameters:
437  name (str): Name of this calibration. It should be unique for use in the `CAF`
438  Keyword Arguments:
439  collector (str, `basf2.Module`): Should be set to a CalibrationCollectorModule() or a string with the module name.
440  algorithms (list, ``ROOT.Belle2.CalibrationAlgorithm``): The algorithm(s) to use for this `Calibration`.
441  input_files (str, list[str]): Input files for use by this Calibration. May contain wildcards useable by `glob.glob`
443  A Calibration won't be valid in the `CAF` until it has all of these four attributes set. For example:
445  >>> cal = Calibration('TestCalibration1')
446  >>> col1 = register_module('CaTest')
447  >>> cal.add_collection('TestColl', col1)
449  or equivalently
451  >>> cal = Calibration('TestCalibration1', 'CaTest')
453  If you want to run a basf2 :py:class:`path <basf2.Path>` before your collector module when running over data
455  >>> cal.pre_collector_path = my_basf2_path
457  You don't have to put a RootInput module in this pre-collection path, but you can if
    you need some special parameters. If you want to process sroot files then you have to explicitly add
459  SeqRootInput to your pre-collection path.
460  The inputFileNames parameter of (Seq)RootInput will be set by the CAF automatically for you.
463  You can use optional arguments to pass in some/all during initialisation of the `Calibration` class
465  >>> cal = Calibration( 'TestCalibration1', 'CaTest', [alg1,alg2], ['/path/to/file.root'])
467  you can change the input file list later on, before running with `CAF`
469  >>> cal.input_files = ['path/to/*.root', 'other/path/to/file2.root']
471  If you have multiple collections from calling `add_collection()` then you should instead set the pre_collector_path,
472  input_files, database chain etc from there. See `Collection`.
474  Adding the CalibrationAlgorithm(s) is easy
476  >>> alg1 = TestAlgo()
477  >>> cal.algorithms = alg1
479  Or equivalently
481  >>> cal.algorithms = [alg1]
483  Or for multiple algorithms for one collector
485  >>> alg2 = TestAlgo()
486  >>> cal.algorithms = [alg1, alg2]
488  Note that when you set the algorithms, they are automatically wrapped and stored as a Python class
    `Algorithm`. To access the C++ algorithm class underneath directly do:
491  >>> cal.algorithms[i].algorithm
493  If you have a setup function that you want to run before each of the algorithms, set that with
495  >>> cal.pre_algorithms = my_function_object
497  If you want a different setup for each algorithm use a list with the same number of elements
498  as your algorithm list.
500  >>> cal.pre_algorithms = [my_function1, my_function2, ...]
502  You can also specify the dependencies of the calibration on others
504  >>> cal.depends_on(cal2)
506  By doing this, the `CAF` will respect the ordering of the calibrations and will pass the
507  calibration constants created by earlier completed calibrations to dependent ones.
508  """
    #: Names of the transitions used to progress a calibration
    #: (presumably the triggers of `CalibrationMachine` — TODO confirm).
    moves = ["submit_collector", "complete", "run_algorithms", "iterate", "fail_fully"]
    #: Name of the directory for algorithm output (presumably relative to the calibration's working directory — confirm).
    alg_output_dir = "algorithm_output"
    #: States treated as checkpoints (presumably where a calibration can be resumed from — TODO confirm).
    checkpoint_states = ["init", "collector_completed", "completed"]
    #: Key under which the default `Collection` is stored in `collections`.
    default_collection_name = "default"
518  def __init__(self,
519  name,
520  collector=None,
521  algorithms=None,
522  input_files=None,
523  pre_collector_path=None,
524  database_chain=None,
525  output_patterns=None,
526  max_files_per_collector_job=None,
527  max_collector_jobs=None,
528  backend_args=None
529  ):
530  """
531  """
533  self.collections = {}
535  self._algorithms = []
537  # Default collection added, will have None type and requires setting later via `self.collector`, or will take the
538  # CollectorModule/module name directly.
539  self.add_collection(self.default_collection_name,
540  Collection(collector,
541  input_files,
542  pre_collector_path,
543  database_chain,
544  output_patterns,
545  max_files_per_collector_job,
546  max_collector_jobs,
547  backend_args
548  ))
550  super().__init__(name, input_files)
551  if algorithms:
555  self.algorithms = algorithms
557  self.results = {}
560  self.max_iterations = None
565  self.ignored_runs = None
566  if self.algorithms:
570  self.strategies = strategies.SingleIOV
571  if database_chain:
574  self.database_chain = database_chain
575  else:
576  self.database_chain = []
577  # This database is already applied to the `Collection` automatically, so don't do it again
578  for tag in reversed(b2conditions.default_globaltags):
579  self.use_central_database(tag, apply_to_default_collection=False)
583  self.algorithms_runner = runners.SeqAlgorithmsRunner
586  self.backend = None
591  self.collector_full_update_interval = 30
593  self.heartbeat = 3
595  self.machine = None
597  self._db_path = None
599  def add_collection(self, name, collection):
600  """
601  Parameters:
602  name (str): Unique name of this `Collection` in the Calibration.
603  collection (`Collection`): `Collection` object to use.
605  Adds a new `Collection` object to the `Calibration`. Any valid Collection will be used in the Calibration.
606  A default Collection is automatically added but isn't valid and won't run unless you have assigned a collector
607  + input files.
608  You can ignore the default one and only add your own custom Collections. You can configure the default from the
609  Calibration(...) arguments or after creating the Calibration object via directly setting the cal.collector, cal.input_files
610  attributes.
611  """
612  if name not in self.collections:
613  self.collections[name] = collection
614  else:
615  B2WARNING(f"A Collection with the name '{name}' already exists in this Calibration. It has not been added."
616  "Please use another name.")
618  def is_valid(self):
619  """
620  A full calibration consists of a collector AND an associated algorithm AND input_files.
622  Returns False if:
623  1) We are missing any of the above.
624  2) There are multiple Collections and the Collectors have mis-matched granularities.
625  3) Any of our Collectors have granularities that don't match what our Strategy can use.
626  """
627  if not self.algorithms:
628  B2WARNING(f"Empty algorithm list for {}.")
629  return False
631  if not any([collection.is_valid() for collection in self.collections.values()]):
632  B2WARNING(f"No valid Collections for {}.")
633  return False
635  granularities = []
636  for collection in self.collections.values():
637  if collection.is_valid():
638  collector_params = collection.collector.available_params()
639  for param in collector_params:
640  if == "granularity":
641  granularities.append(param.values)
642  if len(set(granularities)) > 1:
643  B2WARNING("Multiple different granularities set for the Collections in this Calibration.")
644  return False
646  for alg in self.algorithms:
647  alg_type = type(alg.algorithm).__name__
648  incorrect_gran = [granularity not in alg.strategy.allowed_granularities for granularity in granularities]
649  if any(incorrect_gran):
650  B2WARNING(f"Selected strategy for {alg_type} does not match a collector's granularity.")
651  return False
652  return True
654  def reset_database(self, apply_to_default_collection=True):
655  """
656  Keyword Arguments:
657  apply_to_default_collection (bool): Should we also reset the default collection?
659  Remove everything in the database_chain of this Calibration, including the default central database tag automatically
660  included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`. This will NOT affect the
661  database chain of any `Collection` other than the default one. You can prevent the default Collection from having its chain
662  reset by setting 'apply_to_default_collection' to False.
663  """
664  self.database_chain = []
665  if self.default_collection_name in self.collections and apply_to_default_collection:
666  self.collections[self.default_collection_name].reset_database()
668  def use_central_database(self, global_tag, apply_to_default_collection=True):
669  """
670  Parameters:
671  global_tag (str): The central database global tag to use for this calibration.
673  Keyword Arguments:
674  apply_to_default_collection (bool): Should we also call use_central_database on the default collection (if it exists)
676  Using this allows you to append a central database to the database chain for this calibration.
677  The default database chain is just the central one from
678  `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
679  To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
680  and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
682  Note that the database chain attached to the `Calibration` will only affect the default `Collection` (if it exists),
683  and the algorithm processes. So calling:
685  >> cal.use_central_database("global_tag")
687  will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
688  assigned to
690  >> cal.collections['default'].database_chain
692  But calling
694  >> cal.use_central_database(file_path, payload_dir, False)
696  will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
697  So if you have multiple Collections in this Calibration *their database chains are separate*.
698  To specify an additional `CentralDatabase` for a different collection, you will have to call:
700  >> cal.collections['OtherCollection'].use_central_database("global_tag")
701  """
702  central_db = CentralDatabase(global_tag)
703  self.database_chain.append(central_db)
704  if self.default_collection_name in self.collections and apply_to_default_collection:
705  self.collections[self.default_collection_name].use_central_database(global_tag)
707  def use_local_database(self, filename, directory="", apply_to_default_collection=True):
708  """
709  Parameters:
710  filename (str): The path to the database.txt of the local database
712  Keyword Argumemts:
713  directory (str): The path to the payloads directory for this local database.
714  apply_to_default_collection (bool): Should we also call use_local_database on the default collection (if it exists)
716  Append a local database to the chain for this calibration.
717  You can call this function multiple times and each database will be added to the chain IN ORDER.
718  The databases are applied to this calibration ONLY.
719  The Local and Central databases applied via these functions are applied to the algorithm processes and optionally
720  the default `Collection` job as a database chain.
721  There are other databases applied to the processes later, checked by basf2 in this order:
723  1) Local Database from previous iteration of this Calibration.
724  2) Local Database chain from output of previous dependent Calibrations.
725  3) This chain of Local and Central databases where the last added is checked first.
727  Note that this function on the `Calibration` object will only affect the default `Collection` if it exists and if
728  'apply_to_default_collection' remains True. So calling:
730  >> cal.use_local_database(file_path, payload_dir)
732  will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
733  assigned to
735  >> cal.collections['default'].database_chain
737  But calling
739  >> cal.use_local_database(file_path, payload_dir, False)
741  will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
743  If you have multiple Collections in this Calibration *their database chains are separate*.
744  To specify an additional `LocalDatabase` for a different collection, you will have to call:
746  >> cal.collections['OtherCollection'].use_local_database(file_path, payload_dir)
748  """
749  local_db = LocalDatabase(filename, directory)
750  self.database_chain.append(local_db)
751  if self.default_collection_name in self.collections and apply_to_default_collection:
752  self.collections[self.default_collection_name].use_local_database(filename, directory)
754  def _get_default_collection_attribute(self, attr):
755  if self.default_collection_name in self.collections:
756  return getattr(self.collections[self.default_collection_name], attr)
757  else:
758  B2WARNING(f"You tried to get the attribute '{attr}' from the Calibration '{}', "
759  "but the default collection doesn't exist."
760  f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
761  "collection's attributes directly.")
762  return None
764  def _set_default_collection_attribute(self, attr, value):
765  if self.default_collection_name in self.collections:
766  setattr(self.collections[self.default_collection_name], attr, value)
767  else:
768  B2WARNING(f"You tried to set the attribute '{attr}' from the Calibration '{}', "
769  "but the default collection doesn't exist."
770  f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
771  "collection's attributes directly.")
773  @property
774  def collector(self):
775  """
776  """
777  return self._get_default_collection_attribute("collector")
779  @collector.setter
780  def collector(self, collector):
781  """
782  """
783  # check if collector is already a module or if we need to create one
784  # from the name
785  from basf2 import Module
786  if isinstance(collector, str):
787  from basf2 import register_module
788  collector = register_module(collector)
789  if not isinstance(collector, Module):
790  B2ERROR("Collector needs to be either a Module or the name of such a module")
792  self._set_default_collection_attribute("collector", collector)
794  @property
795  def input_files(self):
796  """
797  """
798  return self._get_default_collection_attribute("input_files")
800  @input_files.setter
801  def input_files(self, files):
802  """
803  """
804  self._set_default_collection_attribute("input_files", files)
806  @property
807  def files_to_iovs(self):
808  """
809  """
810  return self._get_default_collection_attribute("files_to_iovs")
812  @files_to_iovs.setter
813  def files_to_iovs(self, file_map):
814  """
815  """
816  self._set_default_collection_attribute("files_to_iovs", file_map)
818  @property
819  def pre_collector_path(self):
820  """
821  """
822  return self._get_default_collection_attribute("pre_collector_path")
824  @pre_collector_path.setter
825  def pre_collector_path(self, path):
826  """
827  """
828  self._set_default_collection_attribute("pre_collector_path", path)
830  @property
831  def output_patterns(self):
832  """
833  """
834  return self._get_default_collection_attribute("output_patterns")
836  @output_patterns.setter
837  def output_patterns(self, patterns):
838  """
839  """
840  self._set_default_collection_attribute("output_patterns", patterns)
842  @property
843  def max_files_per_collector_job(self):
844  """
845  """
846  return self._get_default_collection_attribute("max_files_per_collector_job")
848  @max_files_per_collector_job.setter
849  def max_files_per_collector_job(self, max_files):
850  """
851  """
852  self._set_default_collection_attribute("max_files_per_collector_job", max_files)
854  @property
855  def max_collector_jobs(self):
856  """
857  """
858  return self._get_default_collection_attribute("max_collector_jobs")
860  @max_collector_jobs.setter
861  def max_collector_jobs(self, max_jobs):
862  """
863  """
864  self._set_default_collection_attribute("max_collector_jobs", max_jobs)
866  @property
867  def backend_args(self):
868  """
869  """
870  return self._get_default_collection_attribute("backend_args")
872  @backend_args.setter
873  def backend_args(self, args):
874  """
875  """
876  self._set_default_collection_attribute("backend_args", args)
878  @property
879  def algorithms(self):
880  """
881  """
882  return self._algorithms
884  @algorithms.setter
885  @method_dispatch
886  def algorithms(self, value):
887  """
888  """
889  from ROOT.Belle2 import CalibrationAlgorithm
890  if isinstance(value, CalibrationAlgorithm):
891  self._algorithms = [Algorithm(value)]
892  else:
893  B2ERROR(f"Something other than CalibrationAlgorithm instance passed in ({type(value)}). "
894  "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
896  @algorithms.fset.register(tuple)
897  @algorithms.fset.register(list)
898  def _(self, value):
899  """
900  Alternate algorithms setter for lists and tuples of CalibrationAlgorithms.
901  """
902  from ROOT.Belle2 import CalibrationAlgorithm
903  if value:
904  self._algorithms = []
905  for alg in value:
906  if isinstance(alg, CalibrationAlgorithm):
907  self._algorithms.append(Algorithm(alg))
908  else:
909  B2ERROR(f"Something other than CalibrationAlgorithm instance passed in {type(value)}."
910  "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
912  @property
913  def pre_algorithms(self):
914  """
915  Callback run prior to each algorithm iteration.
916  """
917  return [alg.pre_algorithm for alg in self.algorithms]
919  @pre_algorithms.setter
920  @method_dispatch
921  def pre_algorithms(self, func):
922  """
923  """
924  if func:
925  for alg in self.algorithms:
926  alg.pre_algorithm = func
927  else:
928  B2ERROR("Something evaluated as False passed in as pre_algorithm function.")
930  @pre_algorithms.fset.register(tuple)
931  @pre_algorithms.fset.register(list)
932  def _(self, values):
933  """
934  Alternate pre_algorithms setter for lists and tuples of functions, should be one per algorithm.
935  """
936  if values:
937  if len(values) == len(self.algorithms):
938  for func, alg in zip(values, self.algorithms):
939  alg.pre_algorithm = func
940  else:
941  B2ERROR("Number of functions and number of algorithms doesn't match.")
942  else:
943  B2ERROR("Empty container passed in for pre_algorithm functions")
945  @property
946  def strategies(self):
947  """
948  The `caf.strategies.AlgorithmStrategy` or `list` of them used when running the algorithm(s).
949  """
950  return [alg.strategy for alg in self.algorithms]
952  @strategies.setter
953  @method_dispatch
954  def strategies(self, strategy):
955  """
956  """
957  if strategy:
958  for alg in self.algorithms:
959  alg.strategy = strategy
960  else:
961  B2ERROR("Something evaluated as False passed in as a strategy.")
963  @strategies.fset.register(tuple)
964  @strategies.fset.register(list)
965  def _(self, values):
966  """
967  Alternate strategies setter for lists and tuples of functions, should be one per algorithm.
968  """
969  if values:
970  if len(values) == len(self.algorithms):
971  for strategy, alg in zip(strategies, self.algorithms):
972  alg.strategy = strategy
973  else:
974  B2ERROR("Number of strategies and number of algorithms doesn't match.")
975  else:
976  B2ERROR("Empty container passed in for strategies list")
978  def __repr__(self):
979  """
980  """
981  return
983  def run(self):
984  """
985  Main logic of the Calibration object.
986  Will be run in a new Thread by calling the start() method.
987  """
988  with CAFDB(self._db_path, read_only=True) as db:
989  initial_state = db.get_calibration_value(, "checkpoint")
990  initial_iteration = db.get_calibration_value(, "iteration")
991  B2INFO("Initial status of {} found to be state={}, iteration={}".format(,
992  initial_state,
993  initial_iteration))
994  self.machine = CalibrationMachine(self,
995  iov_to_calibrate=self.iov,
996  initial_state=initial_state,
997  iteration=initial_iteration)
998  self.state = initial_state
999  self.machine.root_dir = Path(os.getcwd(),
1000  self.machine.collector_backend = self.backend
1002  # Before we start running, let's clean up any iteration directories from iterations above our initial one.
1003  # Should prevent confusion between attempts if we fail again.
1004  all_iteration_paths = find_int_dirs(self.machine.root_dir)
1005  for iteration_path in all_iteration_paths:
1006  if int( > initial_iteration:
1007  shutil.rmtree(iteration_path)
1009  while self.state != self.end_state and self.state != self.fail_state:
1010  if self.state == "init":
1011  try:
1012  B2INFO(f"Attempting collector submission for calibration {}.")
1013  self.machine.submit_collector()
1014  except Exception as err:
1015  B2FATAL(str(err))
1017  self._poll_collector()
1019  # If we failed take us to the final fail state
1020  if self.state == "collector_failed":
1021  self.machine.fail_fully()
1022  return
1024  # It's possible that we might raise an error while attempting to run due
1025  # to some problems e.g. Missing collector output files
1026  # We catch the error and exit with failed state so the CAF will stop
1027  try:
1028  B2INFO(f"Attempting to run algorithms for calibration {}.")
1029  self.machine.run_algorithms()
1030  except MachineError as err:
1031  B2ERROR(str(err))
1034  # If we failed take us to the final fail state
1035  if self.machine.state == "algorithms_failed":
1036  self.machine.fail_fully()
1037  return
1039  def _poll_collector(self):
1040  """
1041  """
1042  while self.state == "running_collector":
1043  try:
1044  self.machine.complete()
1045  # ConditionError is thrown when the condtions for the transition have returned false, it's not serious.
1046  except ConditionError:
1047  try:
1048  B2DEBUG(29, f"Checking if collector jobs for calibration {} have failed.")
1050  except ConditionError:
1051  pass
1052  sleep(self.heartbeat) # Sleep until we want to check again
1054  @property
1055  def state(self):
1056  """
1057  The current major state of the calibration in the database file. The machine may have a different state.
1058  """
1059  with CAFDB(self._db_path, read_only=True) as db:
1060  state = db.get_calibration_value(, "state")
1061  return state
1063  @state.setter
1064  def state(self, state):
1065  """
1066  """
1067  B2DEBUG(29, f"Setting {} to state {state}.")
1068  with CAFDB(self._db_path) as db:
1069  db.update_calibration_value(, "state", str(state))
1070  if state in self.checkpoint_states:
1071  db.update_calibration_value(, "checkpoint", str(state))
1072  B2DEBUG(29, f"{} set to {state}.")
1074  @property
1075  def iteration(self):
1076  """
1077  Retrieves the current iteration number in the database file.
1079  Returns:
1080  int: The current iteration number
1081  """
1082  with CAFDB(self._db_path, read_only=True) as db:
1083  iteration = db.get_calibration_value(, "iteration")
1084  return iteration
1086  @iteration.setter
1087  def iteration(self, iteration):
1088  """
1089  """
1090  B2DEBUG(29, f"Setting {} to {iteration}.")
1091  with CAFDB(self._db_path) as db:
1092  db.update_calibration_value(, "iteration", iteration)
1093  B2DEBUG(29, f"{} set to {self.iteration}.")
class Algorithm():
    """
    Parameters:
        algorithm: The CalibrationAlgorithm instance that we want to execute.
    Keyword Arguments:
        data_input (types.FunctionType): An optional function that sets the input files of the algorithm.
        pre_algorithm (types.FunctionType): An optional function that runs just prior to execution of the algorithm.
            Useful for set up e.g. module initialisation

    This is a simple wrapper class around the C++ CalibrationAlgorithm class.
    It helps to add functionality to algorithms for use by the Calibration and CAF classes rather
    than separating the logic into those classes directly.

    This is **not** currently a class that a user should interact with much during `CAF`
    setup (unless you're doing something advanced).
    The `Calibration` class should be doing the most of the creation of the defaults for these objects.

    Setting the `data_input` function might be necessary if you have set the `Calibration.output_patterns`.
    Also, setting the `pre_algorithm` to a function that should execute prior to each `strategies.AlgorithmStrategy`
    is often useful i.e. by calling for the Geometry module to initialise.
    """

    def __init__(self, algorithm, data_input=None, pre_algorithm=None):
        """
        Wrap ``algorithm`` and fill in the defaults used by `Calibration` and `CAF`.

        ``data_input`` falls back to `default_inputdata_setup` when it is not given (or is falsy).
        The strategy defaults to ``strategies.SingleIOV`` and ``params`` starts empty.
        """
        #: The wrapped CalibrationAlgorithm instance (its type is expected to expose ``__cpp_name__``)
        self.algorithm = algorithm
        #: The name of the algorithm: its C++ class name without any namespace prefix
        self.name = self._strip_cpp_namespace(type(algorithm).__cpp_name__)
        #: Function that sets up the input files of the algorithm, defaults to `default_inputdata_setup`
        self.data_input = data_input
        if not self.data_input:
            self.data_input = self.default_inputdata_setup
        #: Optional function run just prior to execution of the algorithm (None if not given)
        self.pre_algorithm = pre_algorithm
        #: The algorithm strategy used when running the algorithm, defaults to ``strategies.SingleIOV``
        self.strategy = strategies.SingleIOV
        #: Extra parameters for this algorithm, empty by default
        #: (presumably consumed by the strategy/runner — confirm)
        self.params = {}

    @staticmethod
    def _strip_cpp_namespace(cpp_name):
        """
        Return ``cpp_name`` without its namespace qualification, e.g. ``"Belle2::MyAlgo"`` -> ``"MyAlgo"``.

        ``rpartition`` returns the whole string when there is no ``::``. A ``rfind('::') + 2``
        slice would drop the first character of such names.
        """
        return cpp_name.rpartition('::')[2]

    def default_inputdata_setup(self, input_file_paths):
        """
        Simple setup to set the input file names to the algorithm. Applied to the data_input attribute
        by default. This simply takes all files returned from the `Calibration.output_patterns` and filters
        for only the CollectorOutput.root files. Then it sets them as input files to the CalibrationAlgorithm class.
        """
        collector_output_files = [file_path for file_path in input_file_paths
                                  if Path(file_path).name == "CollectorOutput.root"]
        info_lines = [f"Input files used in {self.name}:"]
        info_lines.extend(collector_output_files)
        B2INFO_MULTILINE(info_lines)
        self.algorithm.setInputFileNames(collector_output_files)
class CAF():
    """
    Parameters:
        calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.

                                     >>> calibration_defaults={"max_iterations":2}

    This class holds `Calibration` objects and processes them. It defines the initial configuration/setup
    for the calibrations. But most of the real processing is done through the `caf.state_machines.CalibrationMachine`.

    The `CAF` class essentially does some initial setup, holds the `CalibrationBase` instances and calls the
    `CalibrationBase.start` when the dependencies are met.

    Much of the checking for consistency is done in this class so that no processing is done with an invalid
    setup. Choosing which files to use as input should be done from outside during the setup of the `CAF` and
    `CalibrationBase` instances.
    """

    #: File name of the SQLite DB (created inside `output_dir`) that stores the calibration states
    _db_name = "caf_state.db"
    #: Default calibration options; entries passed in ``calibration_defaults`` override these
    default_calibration_config = {
        "max_iterations": 5,
        "ignored_runs": []
    }

    def __init__(self, calibration_defaults=None):
        """
        Create an empty `CAF`.

        Keyword Arguments:
            calibration_defaults (dict): Options overriding `default_calibration_config`.
        """
        #: Calibrations known to this `CAF`, keyed by calibration name
        self.calibrations = {}
        #: Calibration name -> names in that calibration's ``future_dependencies`` (filled when ordering)
        self.future_dependencies = {}
        #: Calibration name -> names in that calibration's ``dependencies`` (filled when ordering)
        self.dependencies = {}
        #: Output path to store results of calibration and bookkeeping information
        self.output_dir = "calibration_results"
        #: Not assigned by `run` (which keeps its ordering in a local variable)
        self.order = None
        #: Private backend attribute, see the `backend` property
        self._backend = None
        #: Seconds to sleep between polls of the calibrations in `run`
        self.heartbeat = 5

        if not calibration_defaults:
            calibration_defaults = {}
        #: `default_calibration_config` merged with the user's ``calibration_defaults`` (user values win)
        self.calibration_defaults = {**self.default_calibration_config, **calibration_defaults}
        #: Absolute path of the CAF status database, set by `_make_database`
        self._db_path = None

    def add_calibration(self, calibration):
        """
        Adds a `Calibration` that is to be used in this program to the list.
        Also adds an empty dependency list to the overall dictionary.
        You should not directly alter a `Calibration` object after it has been
        added here.

        Invalid calibrations, and calibrations whose name is already registered,
        are rejected with a warning.
        """
        if not calibration.is_valid():
            B2WARNING(f"Tried to add incomplete/invalid calibration ({calibration.name}) to the framework. "
                      "It was not added and will not be part of the final process.")
            return
        if calibration.name in self.calibrations:
            B2WARNING(f"Tried to add a calibration with the name {calibration.name} twice.")
            return
        self.calibrations[calibration.name] = calibration

    def _remove_missing_dependencies(self):
        """
        This checks the future and past dependencies of each `Calibration` in the `CAF`.
        If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
        object directly.
        """
        # A set gives O(1) membership tests for every dependency checked below.
        calibration_names = {calibration.name for calibration in self.calibrations.values()}

        def is_dependency_in_caf(dependency):
            """
            Return whether ``dependency`` is known to the `CAF`, warning if it is not.
            """
            dependency_in_caf = dependency.name in calibration_names
            if not dependency_in_caf:
                B2WARNING(f"The calibration {dependency.name} is a required dependency but is not in the CAF."
                          " It has been removed as a dependency.")
            return dependency_in_caf

        # Check that there aren't dependencies on calibrations not added to the framework
        # Remove them from the calibration objects if there are.
        for calibration in self.calibrations.values():
            calibration.future_dependencies = [dependency for dependency in calibration.future_dependencies
                                               if is_dependency_in_caf(dependency)]
            calibration.dependencies = [dependency for dependency in calibration.dependencies
                                        if is_dependency_in_caf(dependency)]

    def _order_calibrations(self):
        """
        - Uses dependency attributes of calibrations to create a dependency dictionary and passes it
          to a sorting algorithm.
        - Returns the full (explicit + implicit) ordered dependencies produced by ``all_dependencies``
          if the sort was successful, or ``False`` if it failed (most likely a cyclic dependency).

        As a side effect, each calibration's ``dependencies`` list is extended with its implicit
        dependencies and reordered to follow the sorted order.
        """
        # First remove any dependencies on calibrations not added to the CAF
        self._remove_missing_dependencies()
        # Filling dependencies dictionaries of CAF for sorting, only explicit dependencies for now
        # Note that they currently use the names not the calibration objects.
        for calibration in self.calibrations.values():
            self.future_dependencies[calibration.name] = [dep.name for dep in calibration.future_dependencies]
            self.dependencies[calibration.name] = [dep.name for dep in calibration.dependencies]
        # Gives us a list of A (not THE) valid ordering and checks for cyclic dependencies
        order = topological_sort(self.future_dependencies)
        if not order:
            return False

        # Get an ordered dictionary of the sort order but including all implicit dependencies.
        ordered_full_dependencies = all_dependencies(self.future_dependencies, order)

        # Return all the implicit+explicit past dependencies
        full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)
        # Correct each calibration's dependency list to reflect the implicit dependencies
        for calibration in self.calibrations.values():
            explicit_deps = {dep.name for dep in calibration.dependencies}
            for dep in full_past_dependencies[calibration.name]:
                if dep not in explicit_deps:
                    calibration.dependencies.append(self.calibrations[dep])
            # At this point the calibration has its full dependencies but they aren't in topological
            # sort order. Correct that here.
            dependency_names = {dep.name for dep in calibration.dependencies}
            calibration.dependencies = [self.calibrations[name] for name in order if name in dependency_names]
        return ordered_full_dependencies

    def _check_backend(self):
        """
        Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
        one that is stored isn't a valid Backend object) we should create a default Local backend.
        """
        if not isinstance(self._backend, caf.backends.Backend):
            self.backend = caf.backends.Local()

    def _prune_invalid_collections(self):
        """
        Checks all current calibrations and removes any invalid Collections from their collections list.
        """
        B2INFO("Checking for any invalid Collections in Calibrations.")
        for calibration in self.calibrations.values():
            valid_collections = {}
            for name, collection in calibration.collections.items():
                if collection.is_valid():
                    valid_collections[name] = collection
                else:
                    B2WARNING(f"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
            calibration.collections = valid_collections

    def _prepare_calibrations(self, db, iov):
        """
        Configure every calibration for this run and synchronise it with the open CAF database ``db``.

        Each calibration gets:

        - the `CAF` defaults applied;
        - the database path and its output database directory;
        - the ``iov``, and the `CAF` backend if it has none of its own;
        - daemon thread status.

        A calibration without a database row gets a new one. A calibration with an existing row is
        reset to the state and iteration stored in that row (columns 2 and 3).
        """
        # Calibration name -> database row, built once rather than rescanning the rows per calibration.
        db_rows_by_name = {row[0]: row for row in db.query("select * from calibrations").fetchall()}
        for calibration in self.calibrations.values():
            # Apply defaults given to the `CAF` to the calibrations if they aren't set
            calibration._apply_calibration_defaults(self.calibration_defaults)
            calibration._db_path = self._db_path
            calibration.output_database_dir = Path(self.output_dir, calibration.name, "outputdb").as_posix()
            calibration.iov = iov
            if not calibration.backend:
                calibration.backend = self.backend
            # Do some checking of the db to see if we need to add an entry for this calibration
            previous_entry = db_rows_by_name.get(calibration.name)
            if previous_entry is None:
                db.insert_calibration(calibration.name)
                db.commit()
            else:
                cal_initial_state = previous_entry[2]
                cal_initial_iteration = previous_entry[3]
                B2INFO(f"Previous entry in database found for {calibration.name}.")
                B2INFO(f"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
                calibration.state = cal_initial_state
                B2INFO(f"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
                calibration.iteration = cal_initial_iteration
            # Daemonize so that it exits if the main program exits
            calibration.daemon = True

    def run(self, iov=None):
        """
        Keyword Arguments:
            iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
                                  this IoV will be used in the collection step.

        This function runs the overall calibration job, saves the outputs to the output_dir directory,
        and creates database payloads.

        Upload of final databases is not done here. This simply creates the local databases in
        the output directory. You should check the validity of your new local database before uploading
        to the conditions DB via the basf2 tools/interface to the DB.
        """
        if not self.calibrations:
            B2FATAL("There were no Calibration objects to run. Maybe you tried to add invalid ones?")
        # Checks whether the dependencies we've added will give a valid order
        order = self._order_calibrations()
        if not order:
            B2FATAL("Couldn't order the calibrations properly. Could be a cyclic dependency.")

        # Check that a backend has been set and use default Local() one if not
        self._check_backend()

        self._prune_invalid_collections()

        # Creates the overall output directory and reset the attribute to use an absolute path to it.
        self.output_dir = self._make_output_dir()

        # Creates a SQLite DB to save the status of the various calibrations
        self._make_database()

        # Enter the overall output dir during processing and open a connection to the DB.
        # Calibration threads are started inside this block, so they see the output dir as cwd.
        with temporary_workdir(self.output_dir):
            db = CAFDB(self._db_path)
            try:
                self._prepare_calibrations(db, iov)
            finally:
                # Release the connection even if preparing a calibration raised.
                db.close()

            # Is it possible to keep going?
            keep_running = True
            while keep_running:
                # Do we have calibrations that may yet complete?
                remaining_calibrations = []

                for calibration in self.calibrations.values():
                    # Find the currently ended calibrations (may not be joined yet)
                    if calibration.state in (CalibrationBase.end_state, CalibrationBase.fail_state):
                        # Search for any alive Calibrations and join them
                        if calibration.is_alive():
                            B2DEBUG(29, f"Joining {calibration.name}.")
                            calibration.join()
                    elif calibration.dependencies_met():
                        if not calibration.is_alive():
                            B2DEBUG(29, f"Starting {calibration.name}.")
                            try:
                                calibration.start()
                            except RuntimeError:
                                # Catch the case when the calibration just finished so it ended up here
                                # in the "else" and not above where it should have been joined.
                                B2DEBUG(29, f"{calibration.name} probably just finished, join it later.")
                        remaining_calibrations.append(calibration)
                    elif not calibration.failed_dependencies():
                        remaining_calibrations.append(calibration)

                keep_running = bool(remaining_calibrations)
                # Loop over jobs that the calibrations want submitted and submit them.
                # We do this here because some backends don't like us submitting in parallel from multiple
                # CalibrationThreads. So this is like a mini job queue without getting too clever with it.
                for calibration in remaining_calibrations:
                    for job in calibration.jobs_to_submit[:]:
                        calibration.backend.submit(job)
                        calibration.jobs_to_submit.remove(job)
                sleep(self.heartbeat)

            B2INFO("Printing summary of final CAF status.")
            with CAFDB(self._db_path, read_only=True) as summary_db:
                print(summary_db.output_calibration_table())

    @property
    def backend(self):
        """
        The `backend <backends.Backend>` that runs the collector job.
        When set, this is checked that a `backends.Backend` class instance was passed in.
        """
        return self._backend

    @backend.setter
    def backend(self, backend):
        """
        Store ``backend`` if it is a `caf.backends.Backend` instance, otherwise report an error.
        """
        if isinstance(backend, caf.backends.Backend):
            self._backend = backend
        else:
            B2ERROR('Backend property must inherit from Backend class.')

    def _make_output_dir(self):
        """
        Creates the output directory. If it already exists we are now going to try and restart the program from the last state.

        Returns:
            str: The absolute path of the new output_dir

        Raises:
            FileNotFoundError: If the directory still does not exist after creating it.
        """
        p = Path(self.output_dir).resolve()
        if p.is_dir():
            B2INFO(f"{p.as_posix()} output directory already exists. "
                   "We will try to restart from the previous finishing state.")
            return p.as_posix()
        p.mkdir(parents=True)
        if not p.is_dir():
            raise FileNotFoundError(f"Attempted to create output_dir {p.as_posix()}, but it didn't work.")
        return p.as_posix()

    def _make_database(self):
        """
        Creates the CAF status database. If it already exists we don't overwrite it.
        """
        self._db_path = Path(self.output_dir, self._db_name).absolute()
        if self._db_path.exists():
            B2INFO(f"Previous CAF database found {self._db_path}")
        # Will create a new database + tables, or do nothing but checks we can connect to existing one
        with CAFDB(self._db_path):
            pass
1477 # @endcond
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double >> &runs, double cut, std::map< ExpRun, std::pair< double, double >> &runsRemoved)
filter events to remove runs shorter than cut, it stores removed runs in runsRemoved