13 This module implements several objects/functions to configure and run calibrations.
14 These classes are used to construct the workflow of the calibration job.
15 The actual processing code is mostly in the `caf.state_machines` module.
18 __all__ = [
"CalibrationBase",
"Calibration",
"Algorithm",
"CAF"]
21 from threading
import Thread
22 from time
import sleep
23 from pathlib
import Path
27 from basf2
import B2ERROR, B2WARNING, B2INFO, B2FATAL, B2DEBUG
28 from basf2
import find_file
29 from basf2
import conditions
as b2conditions
31 from abc
import ABC, abstractmethod
34 from caf.utils
import B2INFO_MULTILINE
35 from caf.utils
import past_from_future_dependencies
36 from caf.utils
import topological_sort
37 from caf.utils
import all_dependencies
38 from caf.utils
import method_dispatch
39 from caf.utils
import temporary_workdir
40 from caf.utils
import find_int_dirs
41 from caf.utils
import LocalDatabase
42 from caf.utils
import CentralDatabase
43 from caf.utils
import parse_file_uri
45 import caf.strategies
as strategies
46 import caf.runners
as runners
47 from caf.backends
import MaxSubjobsSplitter, MaxFilesSplitter
48 from caf.state_machines
import CalibrationMachine, ConditionError, MachineError
49 from caf.database
import CAFDB
55 collector (str, basf2.Module): The collector module or module name for this `Collection`.
56 input_files (list[str]): The input files to be used for only this `Collection`.
57 pre_collector_path (basf2.Path): The reconstruction `basf2.Path` to be run prior to the Collector module.
58 database_chain (list[CentralDatabase, LocalDatabase]): The database chain to be used initially for this `Collection`.
59 output_patterns (list[str]): Output patterns of files produced by collector which will be used to pass to the
60 `Algorithm.data_input` function. Setting this here, replaces the default completely.
61 max_files_per_collector_job (int): Maximum number of input files sent to each collector subjob for this `Collection`.
62 Technically this sets the SubjobSplitter to be used, not compatible with max_collector_jobs.
63 max_collector_jobs (int): Maximum number of collector subjobs for this `Collection`.
64 Input files are split evenly between them. Technically this sets the SubjobSplitter to be used. Not compatible with
65 max_files_per_collector_job.
66 backend_args (dict): The args for the backend submission of this `Collection`.
70 default_max_collector_jobs = 1000
73 job_config =
"collector_job.json"
78 pre_collector_path=None,
81 max_files_per_collector_job=None,
82 max_collector_jobs=None,
103 if pre_collector_path:
116 if max_files_per_collector_job
and max_collector_jobs:
117 B2FATAL(
"Cannot set both 'max_files_per_collector_job' and 'max_collector_jobs' of a collection!")
118 elif max_files_per_collector_job:
120 elif max_collector_jobs:
141 for tag
in reversed(b2conditions.default_globaltags):
144 self.
job_scriptjob_script = Path(find_file(
"calibration/scripts/caf/run_collector_path.py")).absolute()
145 """The basf2 steering file that will be used for Collector jobs run by this collection.
146 This script will be copied into subjob directories as part of the input sandbox."""
149 self.
job_cmdjob_cmd = [
"basf2", self.
job_scriptjob_script.name,
"--job-information job_info.json"]
153 Remove everything in the database_chain of this Calibration, including the default central database
154 tag automatically included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
161 global_tag (str): The central database global tag to use for this calibration.
163 Using this allows you to add a central database to the head of the global tag database chain for this collection.
164 The default database chain is just the central one from
165 `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
166 The input file global tag will always be overridden and never used unless explicitly set.
168 To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
169 and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
171 Alternatively you could set an empty list as the input database_chain when adding the Collection to the Calibration.
173 NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
174 All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
175 list which will be checked after all local database files have been checked.
177 So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the database chain, the real order
178 that basf2 will use them is ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is checked first.
180 central_db = CentralDatabase(global_tag)
186 filename (str): The path to the database.txt of the local database
187 directory (str): The path to the payloads directory for this local database.
189 Append a local database to the chain for this collection.
190 You can call this function multiple times and each database will be added to the chain IN ORDER.
191 The databases are applied to this collection ONLY.
193 NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
194 All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
195 list which will be checked after all local database files have been checked.
197 So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the database chain, the real order
198 that basf2 will use them is ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.
200 local_db = LocalDatabase(filename, directory)
207 input_file (str): A local file/glob pattern or XROOTD URI
210 list: A list of the URIs found from the initial string.
213 uri = parse_file_uri(input_file)
214 if uri.scheme ==
"file":
217 uris = [parse_file_uri(f).geturl()
for f
in glob(input_file)]
229 if isinstance(value, str):
232 elif isinstance(value, list):
235 for pattern
in value:
239 raise TypeError(
"Input files must be a list or string")
254 from basf2
import Module
255 if isinstance(collector, str):
256 from basf2
import register_module
257 collector = register_module(collector)
258 if not isinstance(collector, Module):
259 B2ERROR(
"Collector needs to be either a Module or the name of such a module")
270 def max_collector_jobs(self):
272 return self.
splittersplitter.max_subjobs
276 @max_collector_jobs.setter
277 def max_collector_jobs(self, value):
281 self.
splittersplitter = MaxSubjobsSplitter(max_subjobs=value)
284 def max_files_per_collector_job(self):
286 return self.
splittersplitter.max_files_per_subjob
290 @max_files_per_collector_job.setter
291 def max_files_per_collector_job(self, value):
295 self.
splittersplitter = MaxFilesSplitter(max_files_per_subjob=value)
300 Abstract base class of Calibration types. The CAF implements the :py:class:`Calibration` class which inherits from
301 this and runs the C++ CalibrationCollectorModule and CalibrationAlgorithm classes. But by inheriting from this
302 class and providing the minimal necessary methods/attributes you could plug in your own Calibration types
303 that don't depend on the C++ CAF at all and run everything in your own way.
305 .. warning:: Writing your own class inheriting from :py:class:`CalibrationBase` class is not recommended!
306 But it's there if you really need it.
309 name (str): Name of this calibration object. Should be unique if you are going to run it.
312 input_files (list[str]): Input files for this calibration. May contain wildcard expressions useable by `glob.glob`.
316 end_state =
"completed"
319 fail_state =
"failed"
357 The most important method. Runs inside a new Thread and is called from `CalibrationBase.start`
358 once the dependencies of this `CalibrationBase` have returned with state == end_state i.e. "completed".
364 A simple method you should implement that will return True or False depending on whether
365 the Calibration has been set up correctly and can be run safely.
371 calibration (`CalibrationBase`): The Calibration object which will produce constants that this one depends on.
373 Adds dependency of this calibration on another i.e. This calibration
374 will not run until the dependency has completed, and the constants produced
375 will be used via the database chain.
377 You can define multiple dependencies for a single calibration simply
378 by calling this multiple times. Be careful when adding the calibration into
379 the `CAF` not to add a circular/cyclic dependency. If you do the sort will return an
380 empty order and the `CAF` processing will fail.
382 This function appends to the `CalibrationBase.dependencies` and `CalibrationBase.future_dependencies` attributes of this
383 `CalibrationBase` and the input one respectively. This prevents us having to do too much recalculation later on.
386 if self.name != calibration.name:
388 if calibration
not in self.dependencies:
389 self.dependencies.append(calibration)
390 if self
not in calibration.dependencies:
391 calibration.future_dependencies.append(self)
393 B2WARNING((f
"Tried to add {calibration} as a dependency for {self} but they have the same name."
394 "Dependency was not added."))
398 Checks if all of the Calibrations that this one depends on have reached a successful end state.
400 return all(map(
lambda x: x.state == x.end_state, self.
dependenciesdependencies))
404 Returns the list of calibrations in our dependency list that have failed.
408 if calibration.state == self.
fail_statefail_state:
409 failed.append(calibration)
414 We pass in default calibration options from the `CAF` instance here if called.
415 Won't overwrite any options already set.
417 for key, value
in defaults.items():
419 if getattr(self, key)
is None:
420 setattr(self, key, value)
421 except AttributeError:
422 print(f
"The calibration {self.name} does not support the attribute {key}.")
427 Every Calibration object must have at least one collector and at least one algorithm.
428 You have the option to add in your collector/algorithm by argument here, or add them
429 later by changing the properties.
431 If you plan to use multiple `Collection` objects I recommend that you only set the name here and add the Collections
432 separately via `add_collection()`.
435 name (str): Name of this calibration. It should be unique for use in the `CAF`
437 collector (str, `basf2.Module`): Should be set to a CalibrationCollectorModule() or a string with the module name.
438 algorithms (list, ``ROOT.Belle2.CalibrationAlgorithm``): The algorithm(s) to use for this `Calibration`.
439 input_files (str, list[str]): Input files for use by this Calibration. May contain wildcards useable by `glob.glob`
441 A Calibration won't be valid in the `CAF` until it has all of these four attributes set. For example:
443 >>> cal = Calibration('TestCalibration1')
444 >>> col1 = register_module('CaTest')
445 >>> cal.add_collection('TestColl', col1)
449 >>> cal = Calibration('TestCalibration1', 'CaTest')
451 If you want to run a basf2 :py:class:`path <basf2.Path>` before your collector module when running over data
453 >>> cal.pre_collector_path = my_basf2_path
455 You don't have to put a RootInput module in this pre-collection path, but you can if
456 you need some special parameters. If you want to process sroot files then you have to explicitly add
457 SeqRootInput to your pre-collection path.
458 The inputFileNames parameter of (Seq)RootInput will be set by the CAF automatically for you.
461 You can use optional arguments to pass in some/all during initialisation of the `Calibration` class
463 >>> cal = Calibration( 'TestCalibration1', 'CaTest', [alg1,alg2], ['/path/to/file.root'])
465 you can change the input file list later on, before running with `CAF`
467 >>> cal.input_files = ['path/to/*.root', 'other/path/to/file2.root']
469 If you have multiple collections from calling `add_collection()` then you should instead set the pre_collector_path,
470 input_files, database chain etc from there. See `Collection`.
472 Adding the CalibrationAlgorithm(s) is easy
474 >>> alg1 = TestAlgo()
475 >>> cal.algorithms = alg1
479 >>> cal.algorithms = [alg1]
481 Or for multiple algorithms for one collector
483 >>> alg2 = TestAlgo()
484 >>> cal.algorithms = [alg1, alg2]
486 Note that when you set the algorithms, they are automatically wrapped and stored as a Python class
487 `Algorithm`. To access the C++ algorithm class underneath directly do:
489 >>> cal.algorithms[i].algorithm
491 If you have a setup function that you want to run before each of the algorithms, set that with
493 >>> cal.pre_algorithms = my_function_object
495 If you want a different setup for each algorithm use a list with the same number of elements
496 as your algorithm list.
498 >>> cal.pre_algorithms = [my_function1, my_function2, ...]
500 You can also specify the dependencies of the calibration on others
502 >>> cal.depends_on(cal2)
504 By doing this, the `CAF` will respect the ordering of the calibrations and will pass the
505 calibration constants created by earlier completed calibrations to dependent ones.
508 moves = [
"submit_collector",
"complete",
"run_algorithms",
"iterate",
"fail_fully"]
510 alg_output_dir =
"algorithm_output"
512 checkpoint_states = [
"init",
"collector_completed",
"completed"]
514 default_collection_name =
"default"
521 pre_collector_path=None,
523 output_patterns=None,
524 max_files_per_collector_job=None,
525 max_collector_jobs=None,
543 max_files_per_collector_job,
576 for tag
in reversed(b2conditions.default_globaltags):
600 name (str): Unique name of this `Collection` in the Calibration.
601 collection (`Collection`): `Collection` object to use.
603 Adds a new `Collection` object to the `Calibration`. Any valid Collection will be used in the Calibration.
604 A default Collection is automatically added but isn't valid and won't run unless you have assigned a collector
606 You can ignore the default one and only add your own custom Collections. You can configure the default from the
607 Calibration(...) arguments or after creating the Calibration object via directly setting the cal.collector, cal.input_files
613 B2WARNING(f
"A Collection with the name '{name}' already exists in this Calibration. It has not been added."
614 "Please use another name.")
618 A full calibration consists of a collector AND an associated algorithm AND input_files.
621 1) We are missing any of the above.
622 2) There are multiple Collections and the Collectors have mis-matched granularities.
623 3) Any of our Collectors have granularities that don't match what our Strategy can use.
626 B2WARNING(f
"Empty algorithm list for {self.name}.")
629 if not any([collection.is_valid()
for collection
in self.
collectionscollections.values()]):
630 B2WARNING(f
"No valid Collections for {self.name}.")
634 for collection
in self.
collectionscollections.values():
635 if collection.is_valid():
636 collector_params = collection.collector.available_params()
637 for param
in collector_params:
638 if param.name ==
"granularity":
639 granularities.append(param.values)
640 if len(set(granularities)) > 1:
641 B2WARNING(
"Multiple different granularities set for the Collections in this Calibration.")
645 alg_type = type(alg.algorithm).__name__
646 incorrect_gran = [granularity
not in alg.strategy.allowed_granularities
for granularity
in granularities]
647 if any(incorrect_gran):
648 B2WARNING(f
"Selected strategy for {alg_type} does not match a collector's granularity.")
655 apply_to_default_collection (bool): Should we also reset the default collection?
657 Remove everything in the database_chain of this Calibration, including the default central database tag automatically
658 included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`. This will NOT affect the
659 database chain of any `Collection` other than the default one. You can prevent the default Collection from having its chain
660 reset by setting 'apply_to_default_collection' to False.
669 global_tag (str): The central database global tag to use for this calibration.
672 apply_to_default_collection (bool): Should we also call use_central_database on the default collection (if it exists)
674 Using this allows you to append a central database to the database chain for this calibration.
675 The default database chain is just the central one from
676 `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
677 To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
678 and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
680 Note that the database chain attached to the `Calibration` will only affect the default `Collection` (if it exists),
681 and the algorithm processes. So calling:
683 >> cal.use_central_database("global_tag")
685 will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
688 >> cal.collections['default'].database_chain
692 >> cal.use_central_database("global_tag", False)
694 will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
695 So if you have multiple Collections in this Calibration *their database chains are separate*.
696 To specify an additional `CentralDatabase` for a different collection, you will have to call:
698 >> cal.collections['OtherCollection'].use_central_database("global_tag")
700 central_db = CentralDatabase(global_tag)
708 filename (str): The path to the database.txt of the local database
711 directory (str): The path to the payloads directory for this local database.
712 apply_to_default_collection (bool): Should we also call use_local_database on the default collection (if it exists)
714 Append a local database to the chain for this calibration.
715 You can call this function multiple times and each database will be added to the chain IN ORDER.
716 The databases are applied to this calibration ONLY.
717 The Local and Central databases applied via these functions are applied to the algorithm processes and optionally
718 the default `Collection` job as a database chain.
719 There are other databases applied to the processes later, checked by basf2 in this order:
721 1) Local Database from previous iteration of this Calibration.
722 2) Local Database chain from output of previous dependent Calibrations.
723 3) This chain of Local and Central databases where the last added is checked first.
725 Note that this function on the `Calibration` object will only affect the default `Collection` if it exists and if
726 'apply_to_default_collection' remains True. So calling:
728 >> cal.use_local_database(file_path, payload_dir)
730 will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
733 >> cal.collections['default'].database_chain
737 >> cal.use_local_database(file_path, payload_dir, False)
739 will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
741 If you have multiple Collections in this Calibration *their database chains are separate*.
742 To specify an additional `LocalDatabase` for a different collection, you will have to call:
744 >> cal.collections['OtherCollection'].use_local_database(file_path, payload_dir)
747 local_db = LocalDatabase(filename, directory)
752 def _get_default_collection_attribute(self, attr):
756 B2WARNING(f
"You tried to get the attribute '{attr}' from the Calibration '{self.name}', "
757 "but the default collection doesn't exist."
758 f
"You should use the cal.collections['CollectionName'].{attr} to access a custom "
759 "collection's attributes directly.")
762 def _set_default_collection_attribute(self, attr, value):
766 B2WARNING(f
"You tried to set the attribute '{attr}' from the Calibration '{self.name}', "
767 "but the default collection doesn't exist."
768 f
"You should use the cal.collections['CollectionName'].{attr} to access a custom "
769 "collection's attributes directly.")
783 from basf2
import Module
784 if isinstance(collector, str):
785 from basf2
import register_module
786 collector = register_module(collector)
787 if not isinstance(collector, Module):
788 B2ERROR(
"Collector needs to be either a Module or the name of such a module")
810 @files_to_iovs.setter
822 @pre_collector_path.setter
834 @output_patterns.setter
846 @max_files_per_collector_job.setter
858 @max_collector_jobs.setter
887 from ROOT.Belle2
import CalibrationAlgorithm
888 if isinstance(value, CalibrationAlgorithm):
891 B2ERROR(f
"Something other than CalibrationAlgorithm instance passed in ({type(value)}). "
892 "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
894 @algorithms.fset.register(tuple)
895 @algorithms.fset.register(list)
898 Alternate algorithms setter for lists and tuples of CalibrationAlgorithms.
900 from ROOT.Belle2
import CalibrationAlgorithm
904 if isinstance(alg, CalibrationAlgorithm):
907 B2ERROR((f
"Something other than CalibrationAlgorithm instance passed in {type(value)}."
908 "Algorithm needs to inherit from Belle2::CalibrationAlgorithm"))
913 Callback run prior to each algorithm iteration.
917 @pre_algorithms.setter
924 alg.pre_algorithm = func
926 B2ERROR(
"Something evaluated as False passed in as pre_algorithm function.")
928 @pre_algorithms.fset.register(tuple)
929 @pre_algorithms.fset.register(list)
932 Alternate pre_algorithms setter for lists and tuples of functions, should be one per algorithm.
937 alg.pre_algorithm = func
939 B2ERROR(
"Number of functions and number of algorithms doesn't match.")
941 B2ERROR(
"Empty container passed in for pre_algorithm functions")
946 The `caf.strategies.AlgorithmStrategy` or `list` of them used when running the algorithm(s).
957 alg.strategy = strategy
959 B2ERROR(
"Something evaluated as False passed in as a strategy.")
961 @strategies.fset.register(tuple)
962 @strategies.fset.register(list)
965 Alternate strategies setter for lists and tuples of strategies, should be one per algorithm.
970 alg.strategy = strategy
972 B2ERROR(
"Number of strategies and number of algorithms doesn't match.")
974 B2ERROR(
"Empty container passed in for strategies list")
983 Main logic of the Calibration object.
984 Will be run in a new Thread by calling the start() method.
986 with CAFDB(self.
_db_path_db_path, read_only=
True)
as db:
987 initial_state = db.get_calibration_value(self.
namename,
"checkpoint")
988 initial_iteration = db.get_calibration_value(self.
namename,
"iteration")
989 B2INFO(
"Initial status of {} found to be state={}, iteration={}".format(self.
namename,
992 self.
machinemachine = CalibrationMachine(self,
993 iov_to_calibrate=self.
ioviov,
994 initial_state=initial_state,
995 iteration=initial_iteration)
997 self.
machinemachine.root_dir = Path(os.getcwd(), self.
namename)
1002 all_iteration_paths = find_int_dirs(self.
machinemachine.root_dir)
1003 for iteration_path
in all_iteration_paths:
1004 if int(iteration_path.name) > initial_iteration:
1005 shutil.rmtree(iteration_path)
1010 B2INFO(f
"Attempting collector submission for calibration {self.name}.")
1012 except Exception
as err:
1019 self.
machinemachine.fail_fully()
1026 B2INFO(f
"Attempting to run algorithms for calibration {self.name}.")
1027 self.
machinemachine.run_algorithms()
1028 except MachineError
as err:
1033 if self.
machinemachine.state ==
"algorithms_failed":
1034 self.
machinemachine.fail_fully()
1042 self.
machinemachine.complete()
1044 except ConditionError:
1046 B2DEBUG(29, f
"Checking if collector jobs for calibration {self.name} have failed.")
1048 except ConditionError:
1055 The current major state of the calibration in the database file. The machine may have a different state.
1057 with CAFDB(self.
_db_path_db_path, read_only=
True)
as db:
1058 state = db.get_calibration_value(self.
namename,
"state")
1062 def state(self, state):
1065 B2DEBUG(29, f
"Setting {self.name} to state {state}.")
1066 with CAFDB(self.
_db_path_db_path)
as db:
1067 db.update_calibration_value(self.
namename,
"state", str(state))
1069 db.update_calibration_value(self.
namename,
"checkpoint", str(state))
1070 B2DEBUG(29, f
"{self.name} set to {state}.")
1075 Retrieves the current iteration number in the database file.
1078 int: The current iteration number
1080 with CAFDB(self.
_db_path_db_path, read_only=
True)
as db:
1081 iteration = db.get_calibration_value(self.
namename,
"iteration")
1088 B2DEBUG(29, f
"Setting {self.name} to {iteration}.")
1089 with CAFDB(self.
_db_path_db_path)
as db:
1090 db.update_calibration_value(self.
namename,
"iteration", iteration)
1091 B2DEBUG(29, f
"{self.name} set to {self.iteration}.")
1097 algorithm: The CalibrationAlgorithm instance that we want to execute.
1099 data_input (types.FunctionType): An optional function that sets the input files of the algorithm.
1100 pre_algorithm (types.FunctionType): An optional function that runs just prior to execution of the algorithm.
1101 Useful for set up e.g. module initialisation
1103 This is a simple wrapper class around the C++ CalibrationAlgorithm class.
1104 It helps to add functionality to algorithms for use by the Calibration and CAF classes rather
1105 than separating the logic into those classes directly.
1107 This is **not** currently a class that a user should interact with much during `CAF`
1108 setup (unless you're doing something advanced).
1109 The `Calibration` class should be doing the most of the creation of the defaults for these objects.
1111 Setting the `data_input` function might be necessary if you have set the `Calibration.output_patterns`.
1112 Also, setting the `pre_algorithm` to a function that should execute prior to each `strategies.AlgorithmStrategy`
1113 is often useful i.e. by calling for the Geometry module to initialise.
1116 def __init__(self, algorithm, data_input=None, pre_algorithm=None):
1122 cppname = type(algorithm).__cpp_name__
1123 self.
namename = cppname[cppname.rfind(
'::') + 2:]
1146 Simple setup to set the input file names to the algorithm. Applied to the data_input attribute
1147 by default. This simply takes all files returned from the `Calibration.output_patterns` and filters
1148 for only the CollectorOutput.root files. Then it sets them as input files to the CalibrationAlgorithm class.
1150 collector_output_files = list(
filter(
lambda file_path:
"CollectorOutput.root" == Path(file_path).name,
1152 info_lines = [f
"Input files used in {self.name}:"]
1153 info_lines.extend(collector_output_files)
1154 B2INFO_MULTILINE(info_lines)
1155 self.
algorithmalgorithm.setInputFileNames(collector_output_files)
1161 calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.
1163 >>> calibration_defaults={"max_iterations":2}
1165 This class holds `Calibration` objects and processes them. It defines the initial configuration/setup
1166 for the calibrations. But most of the real processing is done through the `caf.state_machines.CalibrationMachine`.
1168 The `CAF` class essentially does some initial setup, holds the `CalibrationBase` instances and calls the
1169 `CalibrationBase.start` when the dependencies are met.
1171 Much of the checking for consistency is done in this class so that no processing is done with an invalid
1172 setup. Choosing which files to use as input should be done from outside during the setup of the `CAF` and
1173 `CalibrationBase` instances.
1177 _db_name =
"caf_state.db"
1179 default_calibration_config = {
1180 "max_iterations": 5,
1204 if not calibration_defaults:
1205 calibration_defaults = {}
1214 Adds a `Calibration` that is to be used in this program to the list.
1215 Also adds an empty dependency list to the overall dictionary.
1216 You should not directly alter a `Calibration` object after it has been
1219 if calibration.is_valid():
1220 if calibration.name
not in self.
calibrationscalibrations:
1221 self.
calibrationscalibrations[calibration.name] = calibration
1223 B2WARNING(f
"Tried to add a calibration with the name {calibration.name} twice.")
1225 B2WARNING((f
"Tried to add incomplete/invalid calibration ({calibration.name}) to the framwork."
1226 "It was not added and will not be part of the final process."))
1230 This checks the future and past dependencies of each `Calibration` in the `CAF`.
1231 If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
1234 calibration_names = [calibration.name
for calibration
in self.
calibrationscalibrations.values()]
1236 def is_dependency_in_caf(dependency):
1238 Quick function to use with filter() and check dependencies against calibrations known to `CAF`
1240 dependency_in_caf = dependency.name
in calibration_names
1241 if not dependency_in_caf:
1242 B2WARNING(f
"The calibration {dependency.name} is a required dependency but is not in the CAF."
1243 " It has been removed as a dependency.")
1244 return dependency_in_caf
1248 for calibration
in self.
calibrationscalibrations.values():
1249 filtered_future_dependencies = list(
filter(is_dependency_in_caf, calibration.future_dependencies))
1250 calibration.future_dependencies = filtered_future_dependencies
1252 filtered_dependencies = list(
filter(is_dependency_in_caf, calibration.dependencies))
1253 calibration.dependencies = filtered_dependencies
1257 - Uses dependency attributes of calibrations to create a dependency dictionary and passes it
1258 to a sorting algorithm.
1259 - Returns valid OrderedDict if sort was successful, empty one if it failed (most likely a cyclic dependency)
1265 for calibration
in self.
calibrationscalibrations.values():
1266 future_dependencies_names = [dependency.name
for dependency
in calibration.future_dependencies]
1267 past_dependencies_names = [dependency.name
for dependency
in calibration.dependencies]
1270 self.
dependenciesdependencies[calibration.name] = past_dependencies_names
1277 ordered_full_dependencies = all_dependencies(self.
future_dependenciesfuture_dependencies, order)
1280 full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)
1282 for calibration
in self.
calibrationscalibrations.values():
1283 full_deps = full_past_dependencies[calibration.name]
1284 explicit_deps = [cal.name
for cal
in calibration.dependencies]
1285 for dep
in full_deps:
1286 if dep
not in explicit_deps:
1287 calibration.dependencies.append(self.
calibrationscalibrations[dep])
1290 ordered_dependency_list = []
1291 for ordered_calibration_name
in order:
1292 if ordered_calibration_name
in [dep.name
for dep
in calibration.dependencies]:
1293 ordered_dependency_list.append(self.
calibrationscalibrations[ordered_calibration_name])
1294 calibration.dependencies = ordered_dependency_list
1295 order = ordered_full_dependencies
1301 Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
1302 one that is stored isn't a valid Backend object) we should create a default Local backend.
1304 if not isinstance(self.
_backend_backend, caf.backends.Backend):
1310 Checks all current calibrations and removes any invalid Collections from their collections list.
1312 B2INFO(
"Checking for any invalid Collections in Calibrations.")
1313 for calibration
in self.
calibrationscalibrations.values():
1314 valid_collections = {}
1315 for name, collection
in calibration.collections.items():
1316 if collection.is_valid():
1317 valid_collections[name] = collection
1319 B2WARNING(f
"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
1320 calibration.collections = valid_collections
1325 iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
1326 this IoV will be used in the collection step.
1328 This function runs the overall calibration job, saves the outputs to the output_dir directory,
1329 and creates database payloads.
1331 Upload of final databases is not done here. This simply creates the local databases in
1332 the output directory. You should check the validity of your new local database before uploading
1333 to the conditions DB via the basf2 tools/interface to the DB.
1336 B2FATAL(
"There were no Calibration objects to run. Maybe you tried to add invalid ones?")
1340 B2FATAL(
"Couldn't order the calibrations properly. Could be a cyclic dependency.")
1354 with temporary_workdir(self.
output_diroutput_dir):
1357 db_initial_calibrations = db.query(
"select * from calibrations").fetchall()
1358 for calibration
in self.
calibrationscalibrations.values():
1361 calibration._db_path = self.
_db_path_db_path
1362 calibration.output_database_dir = Path(self.
output_diroutput_dir, calibration.name,
"outputdb").as_posix()
1363 calibration.iov = iov
1364 if not calibration.backend:
1367 if calibration.name
not in [db_cal[0]
for db_cal
in db_initial_calibrations]:
1368 db.insert_calibration(calibration.name)
1371 for cal_info
in db_initial_calibrations:
1372 if cal_info[0] == calibration.name:
1373 cal_initial_state = cal_info[2]
1374 cal_initial_iteration = cal_info[3]
1375 B2INFO(f
"Previous entry in database found for {calibration.name}.")
1376 B2INFO(f
"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
1377 calibration.state = cal_initial_state
1378 B2INFO(f
"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
1379 calibration.iteration = cal_initial_iteration
1381 calibration.daemon =
True
1388 keep_running =
False
1390 remaining_calibrations = []
1392 for calibration
in self.
calibrationscalibrations.values():
1394 if (calibration.state == CalibrationBase.end_state
or calibration.state == CalibrationBase.fail_state):
1396 if calibration.is_alive():
1397 B2DEBUG(29, f
"Joining {calibration.name}.")
1400 if calibration.dependencies_met():
1401 if not calibration.is_alive():
1402 B2DEBUG(29, f
"Starting {calibration.name}.")
1405 except RuntimeError:
1408 B2DEBUG(29, f
"{calibration.name} probably just finished, join it later.")
1409 remaining_calibrations.append(calibration)
1411 if not calibration.failed_dependencies():
1412 remaining_calibrations.append(calibration)
1413 if remaining_calibrations:
1418 for calibration
in remaining_calibrations:
1419 for job
in calibration.jobs_to_submit[:]:
1420 calibration.backend.submit(job)
1421 calibration.jobs_to_submit.remove(job)
1424 B2INFO(
"Printing summary of final CAF status.")
1425 with CAFDB(self.
_db_path_db_path, read_only=
True)
as db:
1426 print(db.output_calibration_table())
1431 The `backend <backends.Backend>` that runs the collector job.
1432 When set, the value is checked to be a `backends.Backend` class instance.
1440 if isinstance(backend, caf.backends.Backend):
1443 B2ERROR(
'Backend property must inherit from Backend class.')
1447 Creates the output directory. If it already exists we are now going to try and restart the program from the last state.
1450 str: The absolute path of the new output_dir
1452 p = Path(self.
output_diroutput_dir).resolve()
1454 B2INFO(f
"{p.as_posix()} output directory already exists. "
1455 "We will try to restart from the previous finishing state.")
1458 p.mkdir(parents=
True)
1462 raise FileNotFoundError(f
"Attempted to create output_dir {p.as_posix()}, but it didn't work.")
1466 Creates the CAF status database. If it already exists we don't overwrite it.
1470 B2INFO(f
"Previous CAF database found {self._db_path}")
pre_algorithm
Function called after data_input but before algorithm.execute to do any remaining setup.
data_input
Function called before the pre_algorithm method to set up the input data that the CalibrationAlgorithm...
params
Parameters that could be used in the execution of the algorithm strategy/runner to modify behaviour.
def __init__(self, algorithm, data_input=None, pre_algorithm=None)
algorithm
CalibrationAlgorithm instance (assumed to be true since the Calibration class checks)
strategy
The algorithm strategy that will be used when running over the collected data.
name
The name of the algorithm, default is the Algorithm class name.
def default_inputdata_setup(self, input_file_paths)
def backend(self, backend)
def _prune_invalid_collections(self)
calibrations
Dictionary of calibrations for this CAF instance.
heartbeat
The heartbeat (seconds) between polling for Calibrations that are finished.
future_dependencies
Dictionary of future dependencies of Calibration objects, where the value is all calibrations that wi...
calibration_defaults
Default options applied to each calibration known to the CAF, if the Calibration has these defined by...
_backend
Private backend attribute.
_db_path
The path of the SQLite DB.
def _order_calibrations(self)
order
The ordering and explicit future dependencies of calibrations.
string _db_name
The name of the SQLite DB that gets created.
def _make_output_dir(self)
def add_calibration(self, calibration)
dictionary default_calibration_config
The defaults for Calibrations.
output_dir
Output path to store results of calibration and bookkeeping information.
dependencies
Dictionary of dependencies of Calibration objects, where value is the list of Calibration objects tha...
def _remove_missing_dependencies(self)
def __init__(self, calibration_defaults=None)
def __init__(self, name, input_files=None)
output_database_dir
The directory where we'll store the local database payloads from this calibration.
input_files
Files used for collection procedure.
save_payloads
Marks this Calibration as one which has payloads that should be copied and uploaded.
def depends_on(self, calibration)
files_to_iovs
File -> Iov dictionary, should be : {absolute_file_path:iov} : Where iov is a :py:class:IoV <caf....
future_dependencies
List of calibration objects that depend on this one.
name
Name of calibration object.
jobs_to_submit
A simple list of jobs that this Calibration wants submitted at some point.
string end_state
The name of the successful completion state.
string fail_state
The name of the failure state.
def _apply_calibration_defaults(self, defaults)
def dependencies_met(self)
def failed_dependencies(self)
iov
IoV which will be calibrated.
dependencies
List of calibration objects, where each one is a dependency of this one.
def _poll_collector(self)
def algorithms(self, value)
def use_central_database(self, global_tag, apply_to_default_collection=True)
collections
Collections stored for this calibration.
ignored_runs
List of ExpRun that will be ignored by this Calibration.
results
Output results of algorithms for each iteration.
strategies
The strategy that the algorithm(s) will be run against.
def __init__(self, name, collector=None, algorithms=None, input_files=None, pre_collector_path=None, database_chain=None, output_patterns=None, max_files_per_collector_job=None, max_collector_jobs=None, backend_args=None)
_algorithms
Internal calibration algorithms stored for this calibration.
def use_local_database(self, filename, directory="", apply_to_default_collection=True)
def _get_default_collection_attribute(self, attr)
heartbeat
This calibration's sleep time before rechecking to see if it can move state.
def strategies(self, strategy)
machine
The caf.state_machines.CalibrationMachine that we will run to process this calibration start to finis...
def max_files_per_collector_job(self)
def _set_default_collection_attribute(self, attr, value)
list checkpoint_states
Checkpoint states which we are allowed to restart from.
database_chain
The database chain that is applied to the algorithms.
_db_path
Location of a SQLite database that will save the state of the calibration so that it can be restarted...
max_iterations
Variable to define the maximum number of iterations for this calibration specifically.
def pre_collector_path(self)
def add_collection(self, name, collection)
collector_full_update_interval
While checking if the collector is finished we don't bother wastefully checking every subjob's status...
def reset_database(self, apply_to_default_collection=True)
def output_patterns(self)
algorithms
Algorithm classes that will be run by this Calibration.
backend
The backend <backends.Backend> we'll use for our collector submission in this calibration.
algorithms_runner
The class that runs all the algorithms in this Calibration using their assigned :py:class:caf....
string default_collection_name
Default collection name.
def max_collector_jobs(self)
int default_max_collector_jobs
The default maximum number of collector jobs to create.
input_files
Internal input_files stored for this calibration.
_collector
Internal storage of collector attribute.
def max_collector_jobs(self, value)
files_to_iovs
File -> Iov dictionary, should be : {absolute_file_path:iov} : Where iov is a :py:class:IoV <caf....
def max_files_per_collector_job(self)
database_chain
The database chain used for this Collection.
def input_files(self, value)
output_patterns
Output patterns of files produced by collector which will be used to pass to the Algorithm....
splitter
The SubjobSplitter to use when constructing collector subjobs from the overall Job object.
job_cmd
The Collector caf.backends.Job.cmd attribute.
def use_local_database(self, filename, directory="")
backend_args
Dictionary passed to the collector Job object to configure how the caf.backends.Backend instance shou...
def max_files_per_collector_job(self, value)
pre_collector_path
Since many collectors require some different setup, if you set this attribute to a basf2....
max_files_per_collector_job
collector
Collector module of this collection.
def uri_list_from_input_file(input_file)
def collector(self, collector)
def max_collector_jobs(self)
def use_central_database(self, global_tag)
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double >> &runs, double cut, std::map< ExpRun, std::pair< double, double >> &runsRemoved)
Filter events to remove runs shorter than cut; the removed runs are stored in runsRemoved.