5 This module implements several objects/functions to configure and run calibrations.
6 These classes are used to construct the workflow of the calibration job.
7 The actual processing code is mostly in the `caf.state_machines` module.
10 __all__ = [
"CalibrationBase",
"Calibration",
"Algorithm",
"CAF"]
13 from threading
import Thread
14 from time
import sleep
15 from pathlib
import Path
19 from basf2
import B2ERROR, B2WARNING, B2INFO, B2FATAL, B2DEBUG
20 from basf2
import find_file
21 from basf2
import conditions
as b2conditions
23 from abc
import ABC, abstractmethod
26 from caf.utils
import B2INFO_MULTILINE
27 from caf.utils
import past_from_future_dependencies
28 from caf.utils
import topological_sort
29 from caf.utils
import all_dependencies
30 from caf.utils
import method_dispatch
31 from caf.utils
import temporary_workdir
32 from caf.utils
import find_int_dirs
33 from caf.utils
import LocalDatabase
34 from caf.utils
import CentralDatabase
35 from caf.utils
import parse_file_uri
37 import caf.strategies
as strategies
38 import caf.runners
as runners
39 from caf
import backends
40 from caf.backends
import MaxSubjobsSplitter, MaxFilesSplitter
41 from caf.state_machines
import CalibrationMachine, ConditionError, MachineError
42 from caf.database
import CAFDB
48 collector (str, basf2.Module): The collector module or module name for this `Collection`.
49 input_files (list[str]): The input files to be used for only this `Collection`.
50 pre_collector_path (basf2.Path): The reconstruction `basf2.Path` to be run prior to the Collector module.
51 database_chain (list[CentralDatabase, LocalDatabase]): The database chain to be used initially for this `Collection`.
52 output_patterns (list[str]): Output patterns of files produced by collector which will be used to pass to the
53 `Algorithm.data_input` function. Setting this here, replaces the default completely.
54 max_files_per_collector_job (int): Maximum number of input files sent to each collector subjob for this `Collection`.
55 Technically this sets the SubjobSplitter to be used. Not compatible with max_collector_jobs.
56 max_collector_jobs (int): Maximum number of collector subjobs for this `Collection`.
57 Input files are split evenly between them. Technically this sets the SubjobSplitter to be used. Not compatible with
58 max_files_per_collector_job.
59 backend_args (dict): The args for the backend submission of this `Collection`.
63 default_max_collector_jobs = 1000
66 job_config =
"collector_job.json"
71 pre_collector_path=None,
74 max_files_per_collector_job=None,
75 max_collector_jobs=None,
96 if pre_collector_path:
109 if max_files_per_collector_job
and max_collector_jobs:
110 B2FATAL(
"Cannot set both 'max_files_per_collector_job' and 'max_collector_jobs' of a collection!")
111 elif max_files_per_collector_job:
113 elif max_collector_jobs:
134 for tag
in reversed(b2conditions.default_globaltags):
137 self.
job_script = Path(find_file(
"calibration/scripts/caf/run_collector_path.py")).absolute()
138 """The basf2 steering file that will be used for Collector jobs run by this collection.
139 This script will be copied into subjob directories as part of the input sandbox."""
142 self.
job_cmd = [
"basf2", self.
job_script.name,
"--job-information job_info.json"]
146 Remove everything in the database_chain of this Calibration, including the default central database
147 tag automatically included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
154 global_tag (str): The central database global tag to use for this calibration.
156 Using this allows you to add a central database to the head of the global tag database chain for this collection.
157 The default database chain is just the central one from `basf2.conditions.default_globaltags`.
158 The input file global tag will always be overridden and never used unless explicitly set.
160 To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
161 and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
163 Alternatively you could set an empty list as the input database_chain when adding the Collection to the Calibration.
165 NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
166 All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
167 list which will be checked after all local database files have been checked.
169 So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the database chain, the real order
170 that basf2 will use them is ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is checked first.
172 central_db = CentralDatabase(global_tag)
178 filename (str): The path to the database.txt of the local database
179 directory (str): The path to the payloads directory for this local database.
181 Append a local database to the chain for this collection.
182 You can call this function multiple times and each database will be added to the chain IN ORDER.
183 The databases are applied to this collection ONLY.
185 NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
186 All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
187 list which will be checked after all local database files have been checked.
189 So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the database chain, the real order
190 that basf2 will use them is ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.
192 local_db = LocalDatabase(filename, directory)
199 input_file (str): A local file/glob pattern or XROOTD URI
202 list: A list of the URIs found from the initial string.
205 uri = parse_file_uri(input_file)
206 if uri.scheme ==
"file":
209 uris = [parse_file_uri(f).geturl()
for f
in glob(input_file)]
221 if isinstance(value, str):
224 elif isinstance(value, list):
227 for pattern
in value:
231 raise TypeError(
"Input files must be a list or string")
246 from basf2
import Module
247 if isinstance(collector, str):
248 from basf2
import register_module
249 collector = register_module(collector)
250 if not isinstance(collector, Module):
251 B2ERROR(
"Collector needs to be either a Module or the name of such a module")
262 def max_collector_jobs(self):
268 @max_collector_jobs.setter
269 def max_collector_jobs(self, value):
273 self.
splitter = MaxSubjobsSplitter(max_subjobs=value)
276 def max_files_per_collector_job(self):
278 return self.
splitter.max_files_per_subjob
282 @max_files_per_collector_job.setter
283 def max_files_per_collector_job(self, value):
287 self.
splitter = MaxFilesSplitter(max_files_per_subjob=value)
292 Abstract base class of Calibration types. The CAF implements the :py:class:`Calibration` class which inherits from
293 this and runs the C++ CalibrationCollectorModule and CalibrationAlgorithm classes. But by inheriting from this
294 class and providing the minimal necessary methods/attributes you could plug in your own Calibration types
295 that don't depend on the C++ CAF at all and run everything in your own way.
297 .. warning:: Writing your own class inheriting from :py:class:`CalibrationBase` class is not recommended!
298 But it's there if you really need it.
301 name (str): Name of this calibration object. Should be unique if you are going to run it.
304 input_files (list[str]): Input files for this calibration. May contain wildcard expressions useable by `glob.glob`.
308 end_state =
"completed"
311 fail_state =
"failed"
349 The most important method. Runs inside a new Thread and is called from `CalibrationBase.start`
350 once the dependencies of this `CalibrationBase` have returned with state == end_state i.e. "completed".
356 A simple method you should implement that will return True or False depending on whether
357 the Calibration has been set up correctly and can be run safely.
363 calibration (`CalibrationBase`): The Calibration object which will produce constants that this one depends on.
365 Adds dependency of this calibration on another i.e. This calibration
366 will not run until the dependency has completed, and the constants produced
367 will be used via the database chain.
369 You can define multiple dependencies for a single calibration simply
370 by calling this multiple times. Be careful when adding the calibration into
371 the `CAF` not to add a circular/cyclic dependency. If you do the sort will return an
372 empty order and the `CAF` processing will fail.
374 This function appends to the `CalibrationBase.dependencies` and `CalibrationBase.future_dependencies` attributes of this
375 `CalibrationBase` and the input one respectively. This prevents us having to do too much recalculation later on.
378 if self.name != calibration.name:
380 if calibration
not in self.dependencies:
381 self.dependencies.append(calibration)
382 if self
not in calibration.dependencies:
383 calibration.future_dependencies.append(self)
385 B2WARNING((f
"Tried to add {calibration} as a dependency for {self} but they have the same name."
386 "Dependency was not added."))
390 Checks if all of the Calibrations that this one depends on have reached a successful end state.
392 return all(map(
lambda x: x.state == x.end_state, self.
dependencies))
396 Returns the list of calibrations in our dependency list that have failed.
401 failed.append(calibration)
406 We pass in default calibration options from the `CAF` instance here if called.
407 Won't overwrite any options already set.
409 for key, value
in defaults.items():
411 if getattr(self, key)
is None:
412 setattr(self, key, value)
413 except AttributeError:
414 print(f
"The calibration {self.name} does not support the attribute {key}.")
419 Every Calibration object must have at least one collector and at least one algorithm.
420 You have the option to add in your collector/algorithm by argument here, or add them
421 later by changing the properties.
423 If you plan to use multiple `Collection` objects I recommend that you only set the name here and add the Collections
424 separately via `add_collection()`.
427 name (str): Name of this calibration. It should be unique for use in the `CAF`
429 collector (str, `basf2.Module`): Should be set to a CalibrationCollectorModule() or a string with the module name.
430 algorithms (list, ``ROOT.Belle2.CalibrationAlgorithm``): The algorithm(s) to use for this `Calibration`.
431 input_files (str, list[str]): Input files for use by this Calibration. May contain wildcards useable by `glob.glob`
433 A Calibration won't be valid in the `CAF` until it has all of these four attributes set. For example:
435 >>> cal = Calibration('TestCalibration1')
436 >>> col1 = register_module('CaTest')
437 >>> cal.add_collection('TestColl', col1)
441 >>> cal = Calibration('TestCalibration1', 'CaTest')
443 If you want to run a basf2 :py:class:`path <basf2.Path>` before your collector module when running over data
445 >>> cal.pre_collector_path = my_basf2_path
447 You don't have to put a RootInput module in this pre-collection path, but you can if
448 you need some special parameters. If you want to process sroot files then you have to explicitly add
449 SeqRootInput to your pre-collection path.
450 The inputFileNames parameter of (Seq)RootInput will be set by the CAF automatically for you.
453 You can use optional arguments to pass in some/all during initialisation of the `Calibration` class
455 >>> cal = Calibration( 'TestCalibration1', 'CaTest', [alg1,alg2], ['/path/to/file.root'])
457 you can change the input file list later on, before running with `CAF`
459 >>> cal.input_files = ['path/to/*.root', 'other/path/to/file2.root']
461 If you have multiple collections from calling `add_collection()` then you should instead set the pre_collector_path,
462 input_files, database chain etc from there. See `Collection`.
464 Adding the CalibrationAlgorithm(s) is easy
466 >>> alg1 = TestAlgo()
467 >>> cal.algorithms = alg1
471 >>> cal.algorithms = [alg1]
473 Or for multiple algorithms for one collector
475 >>> alg2 = TestAlgo()
476 >>> cal.algorithms = [alg1, alg2]
478 Note that when you set the algorithms, they are automatically wrapped and stored as a Python class
479 `Algorithm`. To access the C++ algorithm class underneath directly do:
481 >>> cal.algorithms[i].algorithm
483 If you have a setup function that you want to run before each of the algorithms, set that with
485 >>> cal.pre_algorithms = my_function_object
487 If you want a different setup for each algorithm use a list with the same number of elements
488 as your algorithm list.
490 >>> cal.pre_algorithms = [my_function1, my_function2, ...]
492 You can also specify the dependencies of the calibration on others
494 >>> cal.depends_on(cal2)
496 By doing this, the `CAF` will respect the ordering of the calibrations and will pass the
497 calibration constants created by earlier completed calibrations to dependent ones.
500 moves = [
"submit_collector",
"complete",
"run_algorithms",
"iterate",
"fail_fully"]
502 alg_output_dir =
"algorithm_output"
504 checkpoint_states = [
"init",
"collector_completed",
"completed"]
506 default_collection_name =
"default"
513 pre_collector_path=None,
515 output_patterns=None,
516 max_files_per_collector_job=None,
517 max_collector_jobs=None,
535 max_files_per_collector_job,
568 for tag
in reversed(b2conditions.default_globaltags):
592 name (str): Unique name of this `Collection` in the Calibration.
593 collection (`Collection`): `Collection` object to use.
595 Adds a new `Collection` object to the `Calibration`. Any valid Collection will be used in the Calibration.
596 A default Collection is automatically added but isn't valid and won't run unless you have assigned a collector
598 You can ignore the default one and only add your own custom Collections. You can configure the default from the
599 Calibration(...) arguments or after creating the Calibration object via directly setting the cal.collector, cal.input_files
605 B2WARNING(f
"A Collection with the name '{name}' already exists in this Calibration. It has not been added."
606 "Please use another name.")
610 A full calibration consists of a collector AND an associated algorithm AND input_files.
613 1) We are missing any of the above.
614 2) There are multiple Collections and the Collectors have mis-matched granularities.
615 3) Any of our Collectors have granularities that don't match what our Strategy can use.
618 B2WARNING(f
"Empty algorithm list for {self.name}.")
621 if not any([collection.is_valid()
for collection
in self.
collections.values()]):
622 B2WARNING(f
"No valid Collections for {self.name}.")
627 if collection.is_valid():
628 collector_params = collection.collector.available_params()
629 for param
in collector_params:
630 if param.name ==
"granularity":
631 granularities.append(param.values)
632 if len(set(granularities)) > 1:
633 B2WARNING(
"Multiple different granularities set for the Collections in this Calibration.")
637 alg_type = type(alg.algorithm).__name__
638 incorrect_gran = [granularity
not in alg.strategy.allowed_granularities
for granularity
in granularities]
639 if any(incorrect_gran):
640 B2WARNING(f
"Selected strategy for {alg_type} does not match a collector's granularity.")
647 apply_to_default_collection (bool): Should we also reset the default collection?
649 Remove everything in the database_chain of this Calibration, including the default central database
650 tag automatically included from `basf2.conditions.default_globaltags`. This will NOT affect the database chain of any
651 `Collection` other than the default one. You can prevent the default Collection from having its chain reset by setting
652 'apply_to_default_collection' to False.
661 global_tag (str): The central database global tag to use for this calibration.
664 apply_to_default_collection (bool): Should we also call use_central_database on the default collection (if it exists)
666 Using this allows you to append a central database to the database chain for this calibration.
667 The default database chain is just the central one from `basf2.conditions.default_globaltags`.
668 To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
669 and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
671 Note that the database chain attached to the `Calibration` will only affect the default `Collection` (if it exists),
672 and the algorithm processes. So calling:
674 >> cal.use_central_database("global_tag")
676 will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
679 >> cal.collections['default'].database_chain
683 >> cal.use_central_database("global_tag", False)
685 will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
686 So if you have multiple Collections in this Calibration *their database chains are separate*.
687 To specify an additional `CentralDatabase` for a different collection, you will have to call:
689 >> cal.collections['OtherCollection'].use_central_database("global_tag")
691 central_db = CentralDatabase(global_tag)
699 filename (str): The path to the database.txt of the local database
702 directory (str): The path to the payloads directory for this local database.
703 apply_to_default_collection (bool): Should we also call use_local_database on the default collection (if it exists)
705 Append a local database to the chain for this calibration.
706 You can call this function multiple times and each database will be added to the chain IN ORDER.
707 The databases are applied to this calibration ONLY.
708 The Local and Central databases applied via these functions are applied to the algorithm processes and optionally
709 the default `Collection` job as a database chain.
710 There are other databases applied to the processes later, checked by basf2 in this order:
712 1) Local Database from previous iteration of this Calibration.
713 2) Local Database chain from output of previous dependent Calibrations.
714 3) This chain of Local and Central databases where the last added is checked first.
716 Note that this function on the `Calibration` object will only affect the default `Collection` if it exists and if
717 'apply_to_default_collection' remains True. So calling:
719 >> cal.use_local_database(file_path, payload_dir)
721 will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
724 >> cal.collections['default'].database_chain
728 >> cal.use_local_database(file_path, payload_dir, False)
730 will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
732 If you have multiple Collections in this Calibration *their database chains are separate*.
733 To specify an additional `LocalDatabase` for a different collection, you will have to call:
735 >> cal.collections['OtherCollection'].use_local_database(file_path, payload_dir)
738 local_db = LocalDatabase(filename, directory)
743 def _get_default_collection_attribute(self, attr):
747 B2WARNING(f
"You tried to get the attribute '{attr}' from the Calibration '{self.name}', "
748 "but the default collection doesn't exist."
749 f
"You should use the cal.collections['CollectionName'].{attr} to access a custom "
750 "collection's attributes directly.")
753 def _set_default_collection_attribute(self, attr, value):
757 B2WARNING(f
"You tried to set the attribute '{attr}' from the Calibration '{self.name}', "
758 "but the default collection doesn't exist."
759 f
"You should use the cal.collections['CollectionName'].{attr} to access a custom "
760 "collection's attributes directly.")
774 from basf2
import Module
775 if isinstance(collector, str):
776 from basf2
import register_module
777 collector = register_module(collector)
778 if not isinstance(collector, Module):
779 B2ERROR(
"Collector needs to be either a Module or the name of such a module")
801 @files_to_iovs.setter
813 @pre_collector_path.setter
825 @output_patterns.setter
837 @max_files_per_collector_job.setter
849 @max_collector_jobs.setter
878 from ROOT.Belle2
import CalibrationAlgorithm
879 if isinstance(value, CalibrationAlgorithm):
882 B2ERROR(f
"Something other than CalibrationAlgorithm instance passed in ({type(value)}). "
883 "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
885 @algorithms.fset.register(tuple)
886 @algorithms.fset.register(list)
889 Alternate algorithms setter for lists and tuples of CalibrationAlgorithms.
891 from ROOT.Belle2
import CalibrationAlgorithm
895 if isinstance(alg, CalibrationAlgorithm):
898 B2ERROR((f
"Something other than CalibrationAlgorithm instance passed in {type(value)}."
899 "Algorithm needs to inherit from Belle2::CalibrationAlgorithm"))
904 Callback run prior to each algorithm iteration.
906 return [alg.pre_algorithm
for alg
in self.
algorithms]
908 @pre_algorithms.setter
915 alg.pre_algorithm = func
917 B2ERROR(
"Something evaluated as False passed in as pre_algorithm function.")
919 @pre_algorithms.fset.register(tuple)
920 @pre_algorithms.fset.register(list)
923 Alternate pre_algorithms setter for lists and tuples of functions, should be one per algorithm.
927 for func, alg
in zip(values, self.
algorithms):
928 alg.pre_algorithm = func
930 B2ERROR(
"Number of functions and number of algorithms doesn't match.")
932 B2ERROR(
"Empty container passed in for pre_algorithm functions")
937 The `caf.strategies.AlgorithmStrategy` or `list` of them used when running the algorithm(s).
939 return [alg.strategy
for alg
in self.
algorithms]
948 alg.strategy = strategy
950 B2ERROR(
"Something evaluated as False passed in as a strategy.")
952 @strategies.fset.register(tuple)
953 @strategies.fset.register(list)
956 Alternate strategies setter for lists and tuples of functions, should be one per algorithm.
960 for strategy, alg
in zip(strategies, self.
algorithms):
961 alg.strategy = strategy
963 B2ERROR(
"Number of strategies and number of algorithms doesn't match.")
965 B2ERROR(
"Empty container passed in for strategies list")
974 Main logic of the Calibration object.
975 Will be run in a new Thread by calling the start() method.
977 with CAFDB(self.
_db_path, read_only=
True)
as db:
978 initial_state = db.get_calibration_value(self.
name,
"checkpoint")
979 initial_iteration = db.get_calibration_value(self.
name,
"iteration")
980 B2INFO(
"Initial status of {} found to be state={}, iteration={}".format(self.
name,
983 self.
machine = CalibrationMachine(self,
984 iov_to_calibrate=self.
iov,
985 initial_state=initial_state,
986 iteration=initial_iteration)
987 self.
state = initial_state
988 self.
machine.root_dir = Path(os.getcwd(), self.
name)
993 all_iteration_paths = find_int_dirs(self.
machine.root_dir)
994 for iteration_path
in all_iteration_paths:
995 if int(iteration_path.name) > initial_iteration:
996 shutil.rmtree(iteration_path)
999 if self.
state ==
"init":
1001 B2INFO(f
"Attempting collector submission for calibration {self.name}.")
1003 except Exception
as err:
1009 if self.
state ==
"collector_failed":
1017 B2INFO(f
"Attempting to run algorithms for calibration {self.name}.")
1019 except MachineError
as err:
1024 if self.
machine.state ==
"algorithms_failed":
1031 while self.
state ==
"running_collector":
1035 except ConditionError:
1037 B2DEBUG(29, f
"Checking if collector jobs for calibration {self.name} have failed.")
1039 except ConditionError:
1046 The current major state of the calibration in the database file. The machine may have a different state.
1048 with CAFDB(self.
_db_path, read_only=
True)
as db:
1049 state = db.get_calibration_value(self.
name,
"state")
1053 def state(self, state):
1056 B2DEBUG(29, f
"Setting {self.name} to state {state}.")
1058 db.update_calibration_value(self.
name,
"state", str(state))
1060 db.update_calibration_value(self.
name,
"checkpoint", str(state))
1061 B2DEBUG(29, f
"{self.name} set to {state}.")
1066 Retrieves the current iteration number in the database file.
1069 int: The current iteration number
1071 with CAFDB(self.
_db_path, read_only=
True)
as db:
1072 iteration = db.get_calibration_value(self.
name,
"iteration")
1079 B2DEBUG(29, f
"Setting {self.name} to {iteration}.")
1081 db.update_calibration_value(self.
name,
"iteration", iteration)
1082 B2DEBUG(29, f
"{self.name} set to {self.iteration}.")
1088 algorithm: The CalibrationAlgorithm instance that we want to execute.
1090 data_input (types.FunctionType): An optional function that sets the input files of the algorithm.
1091 pre_algorithm (types.FunctionType): An optional function that runs just prior to execution of the algorithm.
1092 Useful for set up e.g. module initialisation
1094 This is a simple wrapper class around the C++ CalibrationAlgorithm class.
1095 It helps to add functionality to algorithms for use by the Calibration and CAF classes rather
1096 than separating the logic into those classes directly.
1098 This is **not** currently a class that a user should interact with much during `CAF`
1099 setup (unless you're doing something advanced).
1100 The `Calibration` class should be doing most of the creation of the defaults for these objects.
1102 Setting the `data_input` function might be necessary if you have set the `Calibration.output_patterns`.
1103 Also, setting the `pre_algorithm` to a function that should execute prior to each `strategies.AlgorithmStrategy`
1104 is often useful i.e. by calling for the Geometry module to initialise.
1107 def __init__(self, algorithm, data_input=None, pre_algorithm=None):
1113 self.
name = algorithm.__cppname__[algorithm.__cppname__.rfind(
'::') + 2:]
1136 Simple setup to set the input file names to the algorithm. Applied to the data_input attribute
1137 by default. This simply takes all files returned from the `Calibration.output_patterns` and filters
1138 for only the CollectorOutput.root files. Then it sets them as input files to the CalibrationAlgorithm class.
1140 collector_output_files = list(
filter(
lambda file_path:
"CollectorOutput.root" == Path(file_path).name,
1142 info_lines = [f
"Input files used in {self.name}:"]
1143 info_lines.extend(collector_output_files)
1144 B2INFO_MULTILINE(info_lines)
1145 self.
algorithm.setInputFileNames(collector_output_files)
1151 calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.
1153 >>> calibration_defaults={"max_iterations":2}
1155 This class holds `Calibration` objects and processes them. It defines the initial configuration/setup
1156 for the calibrations. But most of the real processing is done through the `caf.state_machines.CalibrationMachine`.
1158 The `CAF` class essentially does some initial setup, holds the `CalibrationBase` instances and calls the
1159 `CalibrationBase.start` when the dependencies are met.
1161 Much of the checking for consistency is done in this class so that no processing is done with an invalid
1162 setup. Choosing which files to use as input should be done from outside during the setup of the `CAF` and
1163 `CalibrationBase` instances.
1167 _db_name =
"caf_state.db"
1169 default_calibration_config = {
1170 "max_iterations": 5,
1194 if not calibration_defaults:
1195 calibration_defaults = {}
1204 Adds a `Calibration` that is to be used in this program to the list.
1205 Also adds an empty dependency list to the overall dictionary.
1206 You should not directly alter a `Calibration` object after it has been
1209 if calibration.is_valid():
1213 B2WARNING(f
"Tried to add a calibration with the name {calibration.name} twice.")
1215 B2WARNING((f
"Tried to add incomplete/invalid calibration ({calibration.name}) to the framwork."
1216 "It was not added and will not be part of the final process."))
1220 This checks the future and past dependencies of each `Calibration` in the `CAF`.
1221 If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
1224 calibration_names = [calibration.name
for calibration
in self.
calibrations.values()]
1226 def is_dependency_in_caf(dependency):
1228 Quick function to use with filter() and check dependencies against calibrations known to `CAF`
1230 dependency_in_caf = dependency.name
in calibration_names
1231 if not dependency_in_caf:
1232 B2WARNING(f
"The calibration {dependency.name} is a required dependency but is not in the CAF."
1233 " It has been removed as a dependency.")
1234 return dependency_in_caf
1239 filtered_future_dependencies = list(
filter(is_dependency_in_caf, calibration.future_dependencies))
1240 calibration.future_dependencies = filtered_future_dependencies
1242 filtered_dependencies = list(
filter(is_dependency_in_caf, calibration.dependencies))
1243 calibration.dependencies = filtered_dependencies
1247 - Uses dependency attributes of calibrations to create a dependency dictionary and passes it
1248 to a sorting algorithm.
1249 - Returns valid OrderedDict if sort was successful, empty one if it failed (most likely a cyclic dependency)
1256 future_dependencies_names = [dependency.name
for dependency
in calibration.future_dependencies]
1257 past_dependencies_names = [dependency.name
for dependency
in calibration.dependencies]
1260 self.
dependencies[calibration.name] = past_dependencies_names
1270 full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)
1273 full_deps = full_past_dependencies[calibration.name]
1274 explicit_deps = [cal.name
for cal
in calibration.dependencies]
1275 for dep
in full_deps:
1276 if dep
not in explicit_deps:
1277 calibration.dependencies.append(self.
calibrations[dep])
1280 ordered_dependency_list = []
1281 for ordered_calibration_name
in order:
1282 if ordered_calibration_name
in [dep.name
for dep
in calibration.dependencies]:
1283 ordered_dependency_list.append(self.
calibrations[ordered_calibration_name])
1284 calibration.dependencies = ordered_dependency_list
1285 order = ordered_full_dependencies
1291 Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
1292 one that is stored isn't a valid Backend object) we should create a default Local backend.
1294 if not isinstance(self.
_backend, caf.backends.Backend):
1300 Checks all current calibrations and removes any invalid Collections from their collections list.
1302 B2INFO(
"Checking for any invalid Collections in Calibrations.")
1304 valid_collections = {}
1305 for name, collection
in calibration.collections.items():
1306 if collection.is_valid():
1307 valid_collections[name] = collection
1309 B2WARNING(f
"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
1310 calibration.collections = valid_collections
1315 iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
1316 this IoV will be used in the collection step.
1318 This function runs the overall calibration job, saves the outputs to the output_dir directory,
1319 and creates database payloads.
1321 Upload of final databases is not done here. This simply creates the local databases in
1322 the output directory. You should check the validity of your new local database before uploading
1323 to the conditions DB via the basf2 tools/interface to the DB.
1326 B2FATAL(
"There were no Calibration objects to run. Maybe you tried to add invalid ones?")
1330 B2FATAL(
"Couldn't order the calibrations properly. Could be a cyclic dependency.")
1347 db_initial_calibrations = db.query(
"select * from calibrations").fetchall()
1351 calibration._db_path = self.
_db_path
1352 calibration.output_database_dir = Path(self.
output_dir, calibration.name,
"outputdb").as_posix()
1353 calibration.iov = iov
1354 if not calibration.backend:
1355 calibration.backend = self.
backend
1357 if calibration.name
not in [db_cal[0]
for db_cal
in db_initial_calibrations]:
1358 db.insert_calibration(calibration.name)
1361 for cal_info
in db_initial_calibrations:
1362 if cal_info[0] == calibration.name:
1363 cal_initial_state = cal_info[2]
1364 cal_initial_iteration = cal_info[3]
1365 B2INFO(f
"Previous entry in database found for {calibration.name}.")
1366 B2INFO(f
"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
1367 calibration.state = cal_initial_state
1368 B2INFO(f
"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
1369 calibration.iteration = cal_initial_iteration
1371 calibration.daemon =
True
1378 keep_running =
False
1380 remaining_calibrations = []
1384 if (calibration.state == CalibrationBase.end_state
or calibration.state == CalibrationBase.fail_state):
1386 if calibration.is_alive():
1387 B2DEBUG(29, f
"Joining {calibration.name}.")
1390 if calibration.dependencies_met():
1391 if not calibration.is_alive():
1392 B2DEBUG(29, f
"Starting {calibration.name}.")
1395 except RuntimeError:
1398 B2DEBUG(29, f
"{calibration.name} probably just finished, join it later.")
1399 remaining_calibrations.append(calibration)
1401 if not calibration.failed_dependencies():
1402 remaining_calibrations.append(calibration)
1403 if remaining_calibrations:
1408 for calibration
in remaining_calibrations:
1409 for job
in calibration.jobs_to_submit[:]:
1410 calibration.backend.submit(job)
1411 calibration.jobs_to_submit.remove(job)
1414 B2INFO(
"Printing summary of final CAF status.")
1415 with CAFDB(self.
_db_path, read_only=
True)
as db:
1416 print(db.output_calibration_table())
1421 The `backend <backends.Backend>` that runs the collector job.
1422 When set, this is checked that a `backends.Backend` class instance was passed in.
1430 if isinstance(backend, caf.backends.Backend):
1433 B2ERROR(
'Backend property must inherit from Backend class.')
1437 Creates the output directory. If it already exists we are now going to try and restart the program from the last state.
1440 str: The absolute path of the new output_dir
1444 B2INFO(f
"{p.as_posix()} output directory already exists. "
1445 "We will try to restart from the previous finishing state.")
1448 p.mkdir(parents=
True)
1452 raise FileNotFoundError(f
"Attempted to create output_dir {p.as_posix()}, but it didn't work.")
1456 Creates the CAF status database. If it already exists we don't overwrite it.
1460 B2INFO(f
"Previous CAF database found {self._db_path}")