12This module implements several objects/functions to configure and run calibrations.
13These classes are used to construct the workflow of the calibration job.
14The actual processing code is mostly in the `caf.state_machines` module.
17__all__ = [
"CalibrationBase",
"Calibration",
"Algorithm",
"CAF"]
20from threading
import Thread
22from pathlib
import Path
26from basf2
import B2ERROR, B2WARNING, B2INFO, B2FATAL, B2DEBUG
27from basf2
import find_file
28from basf2
import conditions
as b2conditions
30from abc
import ABC, abstractmethod
33from caf.utils
import B2INFO_MULTILINE
34from caf.utils
import past_from_future_dependencies
35from caf.utils
import topological_sort
36from caf.utils
import all_dependencies
37from caf.utils
import method_dispatch
38from caf.utils
import temporary_workdir
39from caf.utils
import find_int_dirs
40from caf.utils
import LocalDatabase
41from caf.utils
import CentralDatabase
42from caf.utils
import parse_file_uri
44import caf.strategies
as strategies
45import caf.runners
as runners
46from caf.backends
import MaxSubjobsSplitter, MaxFilesSplitter
47from caf.state_machines
import CalibrationMachine, ConditionError, MachineError
48from caf.database
import CAFDB
54 collector (str, basf2.Module): The collector module or module name for this `Collection`.
55 input_files (list[str]): The input files to be used for only this `Collection`.
56 pre_collection_path (basf2.Path): The reconstruction `basf2.Path` to be run prior to the Collector module.
57 database_chain (list[CentralDatabase, LocalDatabase]): The database chain to be used initially for this `Collection`.
58 output_patterns (list[str]): Output patterns of files produced by collector which will be used to pass to the
59 `Algorithm.data_input` function. Setting this here, replaces the default completely.
60 max_files_for_collector_job (int): Maximum number of input files sent to each collector subjob for this `Collection`.
61 Technically this sets the SubjobSplitter to be used, not compatible with max_collector_jobs.
62 max_collector_jobs (int): Maximum number of collector subjobs for this `Collection`.
63 Input files are split evenly between them. Technically this sets the SubjobSplitter to be used. Not compatible with
64 max_files_for_collector_job.
65 backend_args (dict): The args for the backend submission of this `Collection`.
69 default_max_collector_jobs = 1000
72 job_config =
"collector_job.json"
77 pre_collector_path=None,
80 max_files_per_collector_job=None,
81 max_collector_jobs=None,
104 if pre_collector_path:
117 if max_files_per_collector_job
and max_collector_jobs:
118 B2FATAL(
"Cannot set both 'max_files_per_collector_job' and 'max_collector_jobs' of a collection!")
120 elif max_files_per_collector_job:
122 elif max_collector_jobs:
144 for tag
in reversed(b2conditions.default_globaltags):
148 self.
job_script = Path(find_file(
"calibration/scripts/caf/run_collector_path.py")).absolute()
149 """The basf2 steering file that will be used for Collector jobs run by this collection.
150This script will be copied into subjob directories as part of the input sandbox."""
157 Remove everything in the database_chain of this Calibration, including the default central database
158 tag automatically included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
165 global_tag (str): The central database global tag to use for this calibration.
167 Using this allows you to add a central database to the head of the global tag database chain for this collection.
168 The default database chain is just the central one from
169 `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
170 The input file global tag will always be overridden and never used unless explicitly set.
172 To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
173 and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
175 Alternatively you could set an empty list as the input database_chain when adding the Collection to the Calibration.
177 NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
178 All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
179 list which will be checked after all local database files have been checked.
181 So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the database chain, the real order
182 that basf2 will use them is ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is checked first.
184 central_db = CentralDatabase(global_tag)
190 filename (str): The path to the database.txt of the local database
191 directory (str): The path to the payloads directory for this local database.
193 Append a local database to the chain for this collection.
194 You can call this function multiple times and each database will be added to the chain IN ORDER.
195 The databases are applied to this collection ONLY.
197 NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
198 All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
199 list which will be checked after all local database files have been checked.
201 So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the database chain, the real order
202 that basf2 will use them is ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.
204 local_db = LocalDatabase(filename, directory)
211 input_file (str): A local file/glob pattern or XROOTD URI
214 list: A list of the URIs found from the initial string.
217 uri = parse_file_uri(input_file)
218 if uri.scheme ==
"file":
221 uris = [parse_file_uri(f).geturl()
for f
in glob(input_file)]
237 if isinstance(value, str):
241 elif isinstance(value, list):
244 for pattern
in value:
248 raise TypeError(
"Input files must be a list or string")
263 from basf2
import Module
264 if isinstance(collector, str):
265 from basf2
import register_module
266 collector = register_module(collector)
267 if not isinstance(collector, Module):
268 B2ERROR(
"Collector needs to be either a Module or the name of such a module")
289 @max_collector_jobs.setter
296 self.
splitter = MaxSubjobsSplitter(max_subjobs=value)
303 return self.
splitter.max_files_per_subjob
307 @max_files_per_collector_job.setter
314 self.
splitter = MaxFilesSplitter(max_files_per_subjob=value)
319 Abstract base class of Calibration types. The CAF implements the :py:class:`Calibration` class which inherits from
320 this and runs the C++ CalibrationCollectorModule and CalibrationAlgorithm classes. But by inheriting from this
321 class and providing the minimal necessary methods/attributes you could plug in your own Calibration types
322 that don't depend on the C++ CAF at all and run everything in your own way.
324 .. warning:: Writing your own class inheriting from :py:class:`CalibrationBase` class is not recommended!
325 But it's there if you really need it.
328 name (str): Name of this calibration object. Should be unique if you are going to run it.
331 input_files (list[str]): Input files for this calibration. May contain wildcard expressions usable by `glob.glob`.
335 end_state =
"completed"
338 fail_state =
"failed"
376 The most important method. Runs inside a new Thread and is called from `CalibrationBase.start`
377 once the dependencies of this `CalibrationBase` have returned with state == end_state i.e. "completed".
383 A simple method you should implement that will return True or False depending on whether
384 the Calibration has been set up correctly and can be run safely.
390 calibration (`CalibrationBase`): The Calibration object which will produce constants that this one depends on.
392 Adds dependency of this calibration on another i.e. This calibration
393 will not run until the dependency has completed, and the constants produced
394 will be used via the database chain.
396 You can define multiple dependencies for a single calibration simply
397 by calling this multiple times. Be careful when adding the calibration into
398 the `CAF` not to add a circular/cyclic dependency. If you do, the sort will return an
399 empty order and the `CAF` processing will fail.
401 This function appends to the `CalibrationBase.dependencies` and `CalibrationBase.future_dependencies` attributes of this
402 `CalibrationBase` and the input one respectively. This prevents us having to do too much recalculation later on.
405 if self.name != calibration.name:
407 if calibration
not in self.dependencies:
408 self.dependencies.append(calibration)
409 if self
not in calibration.dependencies:
410 calibration.future_dependencies.append(self)
412 B2WARNING(f
"Tried to add {calibration} as a dependency for {self} but they have the same name."
413 "Dependency was not added.")
417 Checks if all of the Calibrations that this one depends on have reached a successful end state.
423 Returns the list of calibrations in our dependency list that have failed.
429 for calibration
in dependencies:
431 failed.append(calibration)
436 We pass in default calibration options from the `CAF` instance here if called.
437 Won't overwrite any options already set.
439 for key, value
in defaults.items():
441 if getattr(self, key)
is None:
442 setattr(self, key, value)
443 except AttributeError:
444 print(f
"The calibration {self.name} does not support the attribute {key}.")
449 Every Calibration object must have at least one collector and at least one algorithm.
450 You have the option to add in your collector/algorithm by argument here, or add them
451 later by changing the properties.
453 If you plan to use multiple `Collection` objects I recommend that you only set the name here and add the Collections
454 separately via `add_collection()`.
457 name (str): Name of this calibration. It should be unique for use in the `CAF`
459 collector (str, `basf2.Module`): Should be set to a CalibrationCollectorModule() or a string with the module name.
460 algorithms (list, ``ROOT.Belle2.CalibrationAlgorithm``): The algorithm(s) to use for this `Calibration`.
461 input_files (str, list[str]): Input files for use by this Calibration. May contain wildcards usable by `glob.glob`
463 A Calibration won't be valid in the `CAF` until it has all of these four attributes set. For example:
465 >>> cal = Calibration('TestCalibration1')
466 >>> col1 = register_module('CaTest')
467 >>> cal.add_collection('TestColl', col1)
471 >>> cal = Calibration('TestCalibration1', 'CaTest')
473 If you want to run a basf2 :py:class:`path <basf2.Path>` before your collector module when running over data
475 >>> cal.pre_collector_path = my_basf2_path
477 You don't have to put a RootInput module in this pre-collection path, but you can if
478 you need some special parameters. If you want to process sroot files then you have to explicitly add
479 SeqRootInput to your pre-collection path.
480 The inputFileNames parameter of (Seq)RootInput will be set by the CAF automatically for you.
483 You can use optional arguments to pass in some/all during initialisation of the `Calibration` class
485 >>> cal = Calibration( 'TestCalibration1', 'CaTest', [alg1,alg2], ['/path/to/file.root'])
487 you can change the input file list later on, before running with `CAF`
489 >>> cal.input_files = ['path/to/*.root', 'other/path/to/file2.root']
491 If you have multiple collections from calling `add_collection()` then you should instead set the pre_collector_path,
492 input_files, database chain etc from there. See `Collection`.
494 Adding the CalibrationAlgorithm(s) is easy
496 >>> alg1 = TestAlgo()
497 >>> cal.algorithms = alg1
501 >>> cal.algorithms = [alg1]
503 Or for multiple algorithms for one collector
505 >>> alg2 = TestAlgo()
506 >>> cal.algorithms = [alg1, alg2]
508 Note that when you set the algorithms, they are automatically wrapped and stored as a Python class
509 `Algorithm`. To access the C++ algorithm class underneath directly do:
511 >>> cal.algorithms[i].algorithm
513 If you have a setup function that you want to run before each of the algorithms, set that with
515 >>> cal.pre_algorithms = my_function_object
517 If you want a different setup for each algorithm use a list with the same number of elements
518 as your algorithm list.
520 >>> cal.pre_algorithms = [my_function1, my_function2, ...]
522 You can also specify the dependencies of the calibration on others
524 >>> cal.depends_on(cal2)
526 By doing this, the `CAF` will respect the ordering of the calibrations and will pass the
527 calibration constants created by earlier completed calibrations to dependent ones.
530 moves = [
"submit_collector",
"complete",
"run_algorithms",
"iterate",
"fail_fully"]
532 alg_output_dir =
"algorithm_output"
534 checkpoint_states = [
"init",
"collector_completed",
"completed"]
536 default_collection_name =
"default"
543 pre_collector_path=None,
545 output_patterns=None,
546 max_files_per_collector_job=None,
547 max_collector_jobs=None,
565 max_files_per_collector_job,
598 for tag
in reversed(b2conditions.default_globaltags):
622 name (str): Unique name of this `Collection` in the Calibration.
623 collection (`Collection`): `Collection` object to use.
625 Adds a new `Collection` object to the `Calibration`. Any valid Collection will be used in the Calibration.
626 A default Collection is automatically added but isn't valid and won't run unless you have assigned a collector
628 You can ignore the default one and only add your own custom Collections. You can configure the default from the
629 Calibration(...) arguments or after creating the Calibration object via directly setting the cal.collector, cal.input_files
635 B2WARNING(f
"A Collection with the name '{name}' already exists in this Calibration. It has not been added."
636 "Please use another name.")
640 A full calibration consists of a collector AND an associated algorithm AND input_files.
643 1) We are missing any of the above.
644 2) There are multiple Collections and the Collectors have mis-matched granularities.
645 3) Any of our Collectors have granularities that don't match what our Strategy can use.
648 B2WARNING(f
"Empty algorithm list for {self.name}.")
651 if not any([collection.is_valid()
for collection
in self.
collections.values()]):
652 B2WARNING(f
"No valid Collections for {self.name}.")
657 if collection.is_valid():
658 collector_params = collection.collector.available_params()
659 for param
in collector_params:
660 if param.name ==
"granularity":
661 granularities.append(param.values)
662 if len(set(granularities)) > 1:
663 B2WARNING(
"Multiple different granularities set for the Collections in this Calibration.")
667 alg_type = type(alg.algorithm).__name__
668 incorrect_gran = [granularity
not in alg.strategy.allowed_granularities
for granularity
in granularities]
669 if any(incorrect_gran):
670 B2WARNING(f
"Selected strategy for {alg_type} does not match a collector's granularity.")
677 apply_to_default_collection (bool): Should we also reset the default collection?
679 Remove everything in the database_chain of this Calibration, including the default central database tag automatically
680 included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`. This will NOT affect the
681 database chain of any `Collection` other than the default one. You can prevent the default Collection from having its chain
682 reset by setting 'apply_to_default_collection' to False.
691 global_tag (str): The central database global tag to use for this calibration.
694 apply_to_default_collection (bool): Should we also call use_central_database on the default collection (if it exists)
696 Using this allows you to append a central database to the database chain for this calibration.
697 The default database chain is just the central one from
698 `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
699 To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
700 and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
702 Note that the database chain attached to the `Calibration` will only affect the default `Collection` (if it exists),
703 and the algorithm processes. So calling:
705 >> cal.use_central_database("global_tag")
707 will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
710 >> cal.collections['default'].database_chain
714 >> cal.use_central_database("global_tag", False)
716 will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
717 So if you have multiple Collections in this Calibration *their database chains are separate*.
718 To specify an additional `CentralDatabase` for a different collection, you will have to call:
720 >> cal.collections['OtherCollection'].use_central_database("global_tag")
722 central_db = CentralDatabase(global_tag)
730 filename (str): The path to the database.txt of the local database
733 directory (str): The path to the payloads directory for this local database.
734 apply_to_default_collection (bool): Should we also call use_local_database on the default collection (if it exists)
736 Append a local database to the chain for this calibration.
737 You can call this function multiple times and each database will be added to the chain IN ORDER.
738 The databases are applied to this calibration ONLY.
739 The Local and Central databases applied via these functions are applied to the algorithm processes and optionally
740 the default `Collection` job as a database chain.
741 There are other databases applied to the processes later, checked by basf2 in this order:
743 1) Local Database from previous iteration of this Calibration.
744 2) Local Database chain from output of previous dependent Calibrations.
745 3) This chain of Local and Central databases where the last added is checked first.
747 Note that this function on the `Calibration` object will only affect the default `Collection` if it exists and if
748 'apply_to_default_collection' remains True. So calling:
750 >> cal.use_local_database(file_path, payload_dir)
752 will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
755 >> cal.collections['default'].database_chain
759 >> cal.use_local_database(file_path, payload_dir, False)
761 will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
763 If you have multiple Collections in this Calibration *their database chains are separate*.
764 To specify an additional `LocalDatabase` for a different collection, you will have to call:
766 >> cal.collections['OtherCollection'].use_local_database(file_path, payload_dir)
769 local_db = LocalDatabase(filename, directory)
780 B2WARNING(f
"You tried to get the attribute '{attr}' from the Calibration '{self.name}', "
781 "but the default collection doesn't exist."
782 f
"You should use the cal.collections['CollectionName'].{attr} to access a custom "
783 "collection's attributes directly.")
792 B2WARNING(f
"You tried to set the attribute '{attr}' from the Calibration '{self.name}', "
793 "but the default collection doesn't exist."
794 f
"You should use the cal.collections['CollectionName'].{attr} to access a custom "
795 "collection's attributes directly.")
809 from basf2
import Module
810 if isinstance(collector, str):
811 from basf2
import register_module
812 collector = register_module(collector)
813 if not isinstance(collector, Module):
814 B2ERROR(
"Collector needs to be either a Module or the name of such a module")
836 @files_to_iovs.setter
848 @pre_collector_path.setter
860 @output_patterns.setter
872 @max_files_per_collector_job.setter
884 @max_collector_jobs.setter
913 from ROOT
import Belle2
914 from ROOT.Belle2
import CalibrationAlgorithm
915 if isinstance(value, CalibrationAlgorithm):
918 B2ERROR(f
"Something other than CalibrationAlgorithm instance passed in ({type(value)}). "
919 "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
924 @algorithms.fset.register(tuple)
925 @algorithms.fset.register(list)
928 Alternate algorithms setter for lists and tuples of CalibrationAlgorithms.
930 from ROOT
import Belle2
931 from ROOT.Belle2
import CalibrationAlgorithm
935 if isinstance(alg, CalibrationAlgorithm):
938 B2ERROR(f
"Something other than CalibrationAlgorithm instance passed in {type(value)}."
939 "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
945 Callback run prior to each algorithm iteration.
947 return [alg.pre_algorithm
for alg
in self.
algorithms]
949 @pre_algorithms.setter
956 alg.pre_algorithm = func
958 B2ERROR(
"Something evaluated as False passed in as pre_algorithm function.")
961 @pre_algorithms.fset.register(tuple)
962 @pre_algorithms.fset.register(list)
965 Alternate pre_algorithms setter for lists and tuples of functions, should be one per algorithm.
969 for func, alg
in zip(values, self.
algorithms):
970 alg.pre_algorithm = func
972 B2ERROR(
"Number of functions and number of algorithms doesn't match.")
974 B2ERROR(
"Empty container passed in for pre_algorithm functions")
980 The `caf.strategies.AlgorithmStrategy` or `list` of them used when running the algorithm(s).
982 return [alg.strategy
for alg
in self.
algorithms]
991 alg.strategy = strategy
993 B2ERROR(
"Something evaluated as False passed in as a strategy.")
996 @strategies.fset.register(tuple)
997 @strategies.fset.register(list)
1000 Alternate strategies setter for lists and tuples of strategies, should be one per algorithm.
1004 for strategy, alg
in zip(strategies, self.
algorithms):
1005 alg.strategy = strategy
1007 B2ERROR(
"Number of strategies and number of algorithms doesn't match.")
1009 B2ERROR(
"Empty container passed in for strategies list")
1019 Main logic of the Calibration object.
1020 Will be run in a new Thread by calling the start() method.
1022 with CAFDB(self.
_db_path, read_only=
True)
as db:
1023 initial_state = db.get_calibration_value(self.
name,
"checkpoint")
1024 initial_iteration = db.get_calibration_value(self.
name,
"iteration")
1025 B2INFO(f
"Initial status of {self.name} found to be state={initial_state}, iteration={initial_iteration}")
1026 self.
machine = CalibrationMachine(self,
1027 iov_to_calibrate=self.
iov,
1028 initial_state=initial_state,
1029 iteration=initial_iteration)
1032 self.
machine.root_dir = Path(os.getcwd(), self.
name)
1037 all_iteration_paths = find_int_dirs(self.
machine.root_dir)
1038 for iteration_path
in all_iteration_paths:
1039 if int(iteration_path.name) > initial_iteration:
1040 shutil.rmtree(iteration_path)
1043 if self.
state ==
"init":
1045 B2INFO(f
"Attempting collector submission for calibration {self.name}.")
1047 except Exception
as err:
1053 if self.
state ==
"collector_failed":
1061 B2INFO(f
"Attempting to run algorithms for calibration {self.name}.")
1063 except MachineError
as err:
1068 if self.
machine.state ==
"algorithms_failed":
1075 while self.
state ==
"running_collector":
1079 except ConditionError:
1081 B2DEBUG(29, f
"Checking if collector jobs for calibration {self.name} have failed.")
1083 except ConditionError:
1090 The current major state of the calibration in the database file. The machine may have a different state.
1092 with CAFDB(self.
_db_path, read_only=
True)
as db:
1093 state = db.get_calibration_value(self.
name,
"state")
1100 B2DEBUG(29, f
"Setting {self.name} to state {state}.")
1102 db.update_calibration_value(self.
name,
"state", str(state))
1104 db.update_calibration_value(self.
name,
"checkpoint", str(state))
1105 B2DEBUG(29, f
"{self.name} set to {state}.")
1110 Retrieves the current iteration number in the database file.
1113 int: The current iteration number
1115 with CAFDB(self.
_db_path, read_only=
True)
as db:
1116 iteration = db.get_calibration_value(self.
name,
"iteration")
1123 B2DEBUG(29, f
"Setting {self.name} to {iteration}.")
1125 db.update_calibration_value(self.
name,
"iteration", iteration)
1126 B2DEBUG(29, f
"{self.name} set to {self.iteration}.")
1132 algorithm: The CalibrationAlgorithm instance that we want to execute.
1134 data_input : An optional function that sets the input files of the algorithm.
1135 pre_algorithm : An optional function that runs just prior to execution of the algorithm.
1136 Useful for set up e.g. module initialisation
1138 This is a simple wrapper class around the C++ CalibrationAlgorithm class.
1139 It helps to add functionality to algorithms for use by the Calibration and CAF classes rather
1140 than separating the logic into those classes directly.
1142 This is **not** currently a class that a user should interact with much during `CAF`
1143 setup (unless you're doing something advanced).
1144 The `Calibration` class should be doing most of the creation of the defaults for these objects.
1146 Setting the `data_input` function might be necessary if you have set the `Calibration.output_patterns`.
1147 Also, setting the `pre_algorithm` to a function that should execute prior to each `strategies.AlgorithmStrategy`
1148 is often useful i.e. by calling for the Geometry module to initialise.
1151 def __init__(self, algorithm, data_input=None, pre_algorithm=None):
1157 cppname = type(algorithm).__cpp_name__
1159 self.
name = cppname[cppname.rfind(
'::') + 2:]
1182 Simple setup to set the input file names to the algorithm. Applied to the data_input attribute
1183 by default. This simply takes all files returned from the `Calibration.output_patterns` and filters
1184 for only the CollectorOutput.root files. Then it sets them as input files to the CalibrationAlgorithm class.
1186 collector_output_files = list(filter(
lambda file_path:
"CollectorOutput.root" == Path(file_path).name,
1188 info_lines = [f
"Input files used in {self.name}:"]
1189 info_lines.extend(collector_output_files)
1190 B2INFO_MULTILINE(info_lines)
1191 self.
algorithm.setInputFileNames(collector_output_files)
1197 calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.
1199 >>> calibration_defaults={"max_iterations":2}
1201 This class holds `Calibration` objects and processes them. It defines the initial configuration/setup
1202 for the calibrations. But most of the real processing is done through the `caf.state_machines.CalibrationMachine`.
1204 The `CAF` class essentially does some initial setup, holds the `CalibrationBase` instances and calls the
1205 `CalibrationBase.start` when the dependencies are met.
1207 Much of the checking for consistency is done in this class so that no processing is done with an invalid
1208 setup. Choosing which files to use as input should be done from outside during the setup of the `CAF` and
1209 `CalibrationBase` instances.
1213 _db_name =
"caf_state.db"
1215 default_calibration_config = {
1216 "max_iterations": 5,
1240 if not calibration_defaults:
1241 calibration_defaults = {}
1250 Adds a `Calibration` that is to be used in this program to the list.
1251 Also adds an empty dependency list to the overall dictionary.
1252 You should not directly alter a `Calibration` object after it has been
1255 if calibration.is_valid():
1259 B2WARNING(f
"Tried to add a calibration with the name {calibration.name} twice.")
1261 B2WARNING(f
"Tried to add incomplete/invalid calibration ({calibration.name}) to the framework."
1262 "It was not added and will not be part of the final process.")
1266 This checks the future and past dependencies of each `Calibration` in the `CAF`.
1267 If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
1270 calibration_names = [calibration.name
for calibration
in self.
calibrations.values()]
1272 def is_dependency_in_caf(dependency):
1274 Quick function to use with filter() and check dependencies against calibrations known to `CAF`
1276 dependency_in_caf = dependency.name
in calibration_names
1277 if not dependency_in_caf:
1278 B2WARNING(f
"The calibration {dependency.name} is a required dependency but is not in the CAF."
1279 " It has been removed as a dependency.")
1280 return dependency_in_caf
1285 filtered_future_dependencies = list(filter(is_dependency_in_caf, calibration.future_dependencies))
1286 calibration.future_dependencies = filtered_future_dependencies
1288 filtered_dependencies = list(filter(is_dependency_in_caf, calibration.dependencies))
1289 calibration.dependencies = filtered_dependencies
1293 - Uses dependency attributes of calibrations to create a dependency dictionary and passes it
1294 to a sorting algorithm.
1295 - Returns valid OrderedDict if sort was successful, empty one if it failed (most likely a cyclic dependency)
1302 future_dependencies_names = [dependency.name
for dependency
in calibration.future_dependencies]
1303 past_dependencies_names = [dependency.name
for dependency
in calibration.dependencies]
1306 self.
dependencies[calibration.name] = past_dependencies_names
1316 full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)
1319 full_deps = full_past_dependencies[calibration.name]
1320 explicit_deps = [cal.name
for cal
in calibration.dependencies]
1321 for dep
in full_deps:
1322 if dep
not in explicit_deps:
1323 calibration.dependencies.append(self.
calibrations[dep])
1326 ordered_dependency_list = []
1327 for ordered_calibration_name
in order:
1328 if ordered_calibration_name
in [dep.name
for dep
in calibration.dependencies]:
1329 ordered_dependency_list.append(self.
calibrations[ordered_calibration_name])
1330 calibration.dependencies = ordered_dependency_list
1331 order = ordered_full_dependencies
1337 Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
1338 one that is stored isn't a valid Backend object) we should create a default Local backend.
1340 if not isinstance(self.
_backend, caf.backends.Backend):
1346 Checks all current calibrations and removes any invalid Collections from their collections list.
1348 B2INFO(
"Checking for any invalid Collections in Calibrations.")
1350 valid_collections = {}
1351 for name, collection
in calibration.collections.items():
1352 if collection.is_valid():
1353 valid_collections[name] = collection
1355 B2WARNING(f
"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
1356 calibration.collections = valid_collections
1361 iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
1362 this IoV will be used in the collection step.
1364 This function runs the overall calibration job, saves the outputs to the output_dir directory,
1365 and creates database payloads.
1367 Upload of final databases is not done here. This simply creates the local databases in
1368 the output directory. You should check the validity of your new local database before uploading
1369 to the conditions DB via the basf2 tools/interface to the DB.
1372 B2FATAL(
"There were no Calibration objects to run. Maybe you tried to add invalid ones?")
1376 B2FATAL(
"Couldn't order the calibrations properly. Could be a cyclic dependency.")
1393 db_initial_calibrations = db.query(
"select * from calibrations").fetchall()
1397 calibration._db_path = self.
_db_path
1398 calibration.output_database_dir = Path(self.
output_dir, calibration.name,
"outputdb").as_posix()
1399 calibration.iov = iov
1400 if not calibration.backend:
1401 calibration.backend = self.
backend
1403 if calibration.name
not in [db_cal[0]
for db_cal
in db_initial_calibrations]:
1404 db.insert_calibration(calibration.name)
1407 for cal_info
in db_initial_calibrations:
1408 if cal_info[0] == calibration.name:
1409 cal_initial_state = cal_info[2]
1410 cal_initial_iteration = cal_info[3]
1411 B2INFO(f
"Previous entry in database found for {calibration.name}.")
1412 B2INFO(f
"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
1413 calibration.state = cal_initial_state
1414 B2INFO(f
"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
1415 calibration.iteration = cal_initial_iteration
1417 calibration.daemon =
True
1424 keep_running =
False
1426 remaining_calibrations = []
1430 if (calibration.state == CalibrationBase.end_state
or calibration.state == CalibrationBase.fail_state):
1432 if calibration.is_alive():
1433 B2DEBUG(29, f
"Joining {calibration.name}.")
1436 if calibration.dependencies_met():
1437 if not calibration.is_alive():
1438 B2DEBUG(29, f
"Starting {calibration.name}.")
1441 except RuntimeError:
1444 B2DEBUG(29, f
"{calibration.name} probably just finished, join it later.")
1445 remaining_calibrations.append(calibration)
1447 if not calibration.failed_dependencies():
1448 remaining_calibrations.append(calibration)
1449 if remaining_calibrations:
1454 for calibration
in remaining_calibrations:
1455 for job
in calibration.jobs_to_submit[:]:
1456 calibration.backend.submit(job)
1457 calibration.jobs_to_submit.remove(job)
1460 B2INFO(
"Printing summary of final CAF status.")
1461 with CAFDB(self.
_db_path, read_only=
True)
as db:
1462 print(db.output_calibration_table())
1467 The `backend <backends.Backend>` that runs the collector job.
1468 When set, this is checked that a `backends.Backend` class instance was passed in.
1476 if isinstance(backend, caf.backends.Backend):
1479 B2ERROR(
'Backend property must inherit from Backend class.')
1483 Creates the output directory. If it already exists we are now going to try and restart the program from the last state.
1486 str: The absolute path of the new output_dir
1490 B2INFO(f
"{p.as_posix()} output directory already exists. "
1491 "We will try to restart from the previous finishing state.")
1494 p.mkdir(parents=
True)
1498 raise FileNotFoundError(f
"Attempted to create output_dir {p.as_posix()}, but it didn't work.")
1502 Creates the CAF status database. If it already exists we don't overwrite it.
1506 B2INFO(f
"Previous CAF database found {self._db_path}")
pre_algorithm
Function called after data_input but before algorithm.execute to do any remaining setup.
__init__(self, algorithm, data_input=None, pre_algorithm=None)
data_input
Function called before the pre_algorithm method to setup the input data that the CalibrationAlgorithm...
dict params
Parameters that could be used in the execution of the algorithm strategy/runner to modify behaviour.
algorithm
CalibrationAlgorithm instance (assumed to be true since the Calibration class checks)
strategy
The algorithm strategy that will be used when running over the collected data.
default_inputdata_setup(self, input_file_paths)
dict dependencies
Dictionary of dependencies of Calibration objects, where value is the list of Calibration objects tha...
add_calibration(self, calibration)
_remove_missing_dependencies(self)
_order_calibrations(self)
int heartbeat
The heartbeat (seconds) between polling for Calibrations that are finished.
dict default_calibration_config
The defaults for Calibrations.
dict calibrations
Dictionary of calibrations for this CAF instance.
dict calibration_defaults
Default options applied to each calibration known to the CAF, if the Calibration has these defined by...
_backend
Private backend attribute.
dict future_dependencies
Dictionary of future dependencies of Calibration objects, where the value is all calibrations that wi...
_db_path
The path of the SQLite DB.
order
The ordering and explicit future dependencies of calibrations.
__init__(self, calibration_defaults=None)
str _db_name
The name of the SQLite DB that gets created.
_prune_invalid_collections(self)
str output_dir
Output path to store results of calibration and bookkeeping information.
list input_files
Files used for collection procedure.
list dependencies
List of calibration objects, where each one is a dependency of this one.
failed_dependencies(self)
dict files_to_iovs
File -> Iov dictionary, should be :
__init__(self, name, input_files=None)
_apply_calibration_defaults(self, defaults)
list future_dependencies
List of calibration objects that depend on this one.
str end_state
The name of the successful completion state.
str fail_state
The name of the failure state.
bool save_payloads
Marks this Calibration as one which has payloads that should be copied and uploaded.
list jobs_to_submit
A simple list of jobs that this Calibration wants submitted at some point.
str output_database_dir
The directory where we'll store the local database payloads from this calibration.
name
Name of calibration object.
depends_on(self, calibration)
iov
IoV which will be calibrated.
ignored_runs
List of ExpRun that will be ignored by this Calibration.
use_central_database(self, global_tag, apply_to_default_collection=True)
list database_chain
The database chain that is applied to the algorithms.
strategies
The strategy that the algorithm(s) will be run against.
dict results
Output results of algorithms for each iteration.
int heartbeat
This calibration's sleep time before rechecking to see if it can move state.
int collector_full_update_interval
While checking if the collector is finished we don't bother wastefully checking every subjob's status...
machine
The caf.state_machines.CalibrationMachine that we will run to process this calibration start to finis...
list checkpoint_states
Checkpoint states which we are allowed to restart from.
_db_path
Location of a SQLite database that will save the state of the calibration so that it can be restarted...
max_iterations
Variable to define the maximum number of iterations for this calibration specifically.
use_local_database(self, filename, directory="", apply_to_default_collection=True)
_get_default_collection_attribute(self, attr)
list _algorithms
Internal calibration algorithms stored for this calibration.
reset_database(self, apply_to_default_collection=True)
add_collection(self, name, collection)
algorithms
Algorithm classes that will be run by this Calibration.
backend
The backend <backends.Backend> we'll use for our collector submission in this calibration.
algorithms_runner
The class that runs all the algorithms in this Calibration using their assigned :py:class:caf....
max_files_per_collector_job(self)
dict collections
Collections stored for this calibration.
__init__(self, name, collector=None, algorithms=None, input_files=None, pre_collector_path=None, database_chain=None, output_patterns=None, max_files_per_collector_job=None, max_collector_jobs=None, backend_args=None)
str default_collection_name
Default collection name.
_set_default_collection_attribute(self, attr, value)
list job_cmd
The Collector caf.backends.Job.cmd attribute.
int default_max_collector_jobs
The default maximum number of collector jobs to create.
list input_files
Internal input_files stored for this calibration.
dict backend_args
Dictionary passed to the collector Job object to configure how the caf.backends.Backend instance shou...
_collector
Internal storage of collector attribute.
dict files_to_iovs
File -> Iov dictionary, should be :
list database_chain
The database chain used for this Collection.
__init__(self, collector=None, input_files=None, pre_collector_path=None, database_chain=None, output_patterns=None, max_files_per_collector_job=None, max_collector_jobs=None, backend_args=None)
list output_patterns
Output patterns of files produced by collector which will be used to pass to the Algorithm....
splitter
The SubjobSplitter to use when constructing collector subjobs from the overall Job object.
_input_files
Internal storage of the input files that were set.
pre_collector_path
Since many collectors require some different setup, if you set this attribute to a basf2....
max_files_per_collector_job(self)
collector
Collector module of this collection.
uri_list_from_input_file(input_file)
use_local_database(self, filename, directory="")
use_central_database(self, global_tag)