# NOTE(review): this text appears to be a lossy extraction of a Python source listing: original source
# line numbers are embedded at the start of most lines, long statements are split across lines, and many
# lines (docstring delimiters, `else:`/`try:` lines, decorators, blank lines, comments) are missing.
# `os`, `shutil` and `glob` are used further down but are not imported in the visible lines -- presumably
# they are imported on the missing original lines 26-28. Comments below describe only what is visible.
15 This module implements several objects/functions to configure and run calibrations.
16 These classes are used to construct the workflow of the calibration job.
17 The actual processing code is mostly in the `caf.state_machines` module.
# Public names exported by a star-import of this module.
# NOTE(review): `Collection` appears to be defined in this module (it is used and documented throughout)
# but is not listed here; confirm whether that is intentional.
20 __all__ = [
"CalibrationBase",
"Calibration",
"Algorithm",
"CAF"]
23 from threading
import Thread
24 from time
import sleep
25 from pathlib
import Path
29 from basf2
import B2ERROR, B2WARNING, B2INFO, B2FATAL, B2DEBUG
30 from basf2
import find_file
31 from basf2
import conditions
as b2conditions
33 from abc
import ABC, abstractmethod
36 from caf.utils
import B2INFO_MULTILINE
37 from caf.utils
import past_from_future_dependencies
38 from caf.utils
import topological_sort
39 from caf.utils
import all_dependencies
40 from caf.utils
import method_dispatch
41 from caf.utils
import temporary_workdir
42 from caf.utils
import find_int_dirs
43 from caf.utils
import LocalDatabase
44 from caf.utils
import CentralDatabase
45 from caf.utils
import parse_file_uri
47 import caf.strategies
as strategies
48 import caf.runners
as runners
49 from caf.backends
import MaxSubjobsSplitter, MaxFilesSplitter
50 from caf.state_machines
import CalibrationMachine, ConditionError, MachineError
51 from caf.database
import CAFDB
# NOTE(review): the `class Collection` header and the docstring delimiters are not visible; the next lines
# up to `default_max_collector_jobs` are the tail of the Collection class docstring.
57 collector (str, basf2.Module): The collector module or module name for this `Collection`.
58 input_files (list[str]): The input files to be used for only this `Collection`.
59 pre_collector_path (basf2.Path): The reconstruction `basf2.Path` to be run prior to the Collector module.
60 database_chain (list[CentralDatabase, LocalDatabase]): The database chain to be used initially for this `Collection`.
61 output_patterns (list[str]): Output patterns of files produced by collector which will be used to pass to the
62 `Algorithm.data_input` function. Setting this here, replaces the default completely.
63 max_files_per_collector_job (int): Maximum number of input files sent to each collector subjob for this `Collection`.
64 Technically this sets the SubjobSplitter to be used, not compatible with max_collector_jobs.
65 max_collector_jobs (int): Maximum number of collector subjobs for this `Collection`.
66 Input files are split evenly between them. Technically this sets the SubjobSplitter to be used. Not compatible with
67 max_files_per_collector_job.
68 backend_args (dict): The args for the backend submission of this `Collection`.
# Fallback used by __init__ when neither max_files_per_collector_job nor max_collector_jobs is given.
72 default_max_collector_jobs = 1000
# presumably the file name under which the collector job configuration is stored -- confirm its usage.
75 job_config =
"collector_job.json"
# NOTE(review): the `def __init__(self,` line and the parameter lines for `collector`, `input_files`,
# `database_chain`, `output_patterns` and `backend_args` are not visible; those names are used in the body below.
80 pre_collector_path=None,
83 max_files_per_collector_job=None,
84 max_collector_jobs=None,
# presumably routed through the `collector` / `input_files` property setters defined further down,
# which normalise the values (the surrounding if/else lines for input_files are not visible).
88 self.collector = collector
92 self.input_files = input_files
# Mapping keyed by input file; starts empty.
99 self.files_to_iovs = {}
# Optional basf2.Path run before the collector module; stays None unless one is given.
104 self.pre_collector_path =
None
105 if pre_collector_path:
106 self.pre_collector_path = pre_collector_path
# Default collector output pattern; replaced completely when output_patterns is given
# (the guarding `if output_patterns:` line is not visible).
111 self.output_patterns = [
"CollectorOutput.root"]
113 self.output_patterns = output_patterns
# At most one splitting option may be given: both -> B2FATAL, neither -> default_max_collector_jobs
# (the final `else:` line is not visible). The property setters replace self.splitter.
118 if max_files_per_collector_job
and max_collector_jobs:
119 B2FATAL(
"Cannot set both 'max_files_per_collector_job' and 'max_collector_jobs' of a collection!")
120 elif max_files_per_collector_job:
121 self.max_files_per_collector_job = max_files_per_collector_job
122 elif max_collector_jobs:
123 self.max_collector_jobs = max_collector_jobs
125 self.max_collector_jobs = self.default_max_collector_jobs
# Backend submission arguments default to an empty dict.
129 self.backend_args = {}
131 self.backend_args = backend_args
# A given database chain is used as-is; otherwise start empty and add every default global tag via
# use_central_database, iterating in reverse -- presumably so that, with later-added entries taking
# priority, the default tags keep their original priority order.
137 self.database_chain = database_chain
139 self.database_chain = []
143 for tag
in reversed(b2conditions.default_globaltags):
144 self.use_central_database(tag)
146 self.job_script = Path(find_file(
"calibration/scripts/caf/run_collector_path.py")).absolute()
147 """The basf2 steering file that will be used for Collector jobs run by this collection.
148 This script will be copied into subjob directories as part of the input sandbox."""
# Command used for collector jobs: basf2 running the job script by file name.
151 self.job_cmd = [
"basf2", self.job_script.name,
"--job-information job_info.json"]
def reset_database(self):
    """
    Remove everything in the database_chain of this Collection, including the default central database
    tag(s) automatically included from
    `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.

    Afterwards, databases can be added again with `Collection.use_local_database` and
    `Collection.use_central_database`.
    """
    self.database_chain = []
def use_central_database(self, global_tag):
    """
    Append a central database global tag to the database chain of this `Collection`.

    Parameters:
        global_tag (str): The central database global tag to use for this collection.

    The default database chain is just the central one(s) from
    `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
    The input file global tag will always be overridden and never used unless explicitly set.

    To turn off the central database completely or use a custom tag as the base, you should call
    `Collection.reset_database` and start adding databases with `Collection.use_local_database` and
    `Collection.use_central_database`.

    Alternatively you could pass an empty list as the database_chain when creating the Collection.

    NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
    All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
    list which will be checked after all local database files have been checked.

    So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the database chain, the real order
    that basf2 will use them is ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is checked first.
    """
    central_db = CentralDatabase(global_tag)
    self.database_chain.append(central_db)
def use_local_database(self, filename, directory=""):
    """
    Append a local database to the database chain of this collection.

    Parameters:
        filename (str): The path to the database.txt of the local database
        directory (str): The path to the payloads directory for this local database.

    Repeated calls append one database each, keeping the order of the calls.
    The databases are applied to this collection ONLY.

    NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
    All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
    list which will be checked after all local database files have been checked.

    So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the database chain, the real order
    that basf2 will use them is ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.
    """
    self.database_chain.append(LocalDatabase(filename, directory))
# NOTE(review): the decorator line above this def is not visible. The function takes no `self` but is called
# as `self.uri_list_from_input_file(...)` in the input_files setter, so it is presumably a @staticmethod.
# Expands one input-file string into a list of URIs: for "file"-scheme inputs the string is glob-expanded
# and each match converted back to a URI string. The branch for other schemes and the final `return`
# are on lines missing from this listing.
206 def uri_list_from_input_file(input_file):
209 input_file (str): A local file/glob pattern or XROOTD URI
212 list: A list of the URIs found from the initial string.
215 uri = parse_file_uri(input_file)
216 if uri.scheme ==
"file":
219 uris = [parse_file_uri(f).geturl()
for f
in glob(input_file)]
# `input_files` property (decorator lines not visible): the getter returns the stored URI list.
226 def input_files(self):
227 return self._input_files
# Setter: a single string is expanded with uri_list_from_input_file; a list has each entry expanded and
# the results concatenated in order (the `total_files` initialisation is on a missing line); any other
# type raises TypeError (its `else:` line is not visible).
230 def input_files(self, value):
231 if isinstance(value, str):
233 self._input_files = self.uri_list_from_input_file(value)
234 elif isinstance(value, list):
237 for pattern
in value:
238 total_files.extend(self.uri_list_from_input_file(pattern))
239 self._input_files = total_files
241 raise TypeError(
"Input files must be a list or string")
# `collector` property getter (its decorator/def lines are not visible): returns the stored module.
247 return self._collector
# Collector setter (def line not visible): a module-name string is turned into a module with
# basf2.register_module, and anything that is still not a basf2.Module is reported with B2ERROR.
# NOTE(review): original line 262, between the type check and the assignment, is not visible -- confirm
# whether an invalid collector is still stored.
256 from basf2
import Module
257 if isinstance(collector, str):
258 from basf2
import register_module
259 collector = register_module(collector)
260 if not isinstance(collector, Module):
261 B2ERROR(
"Collector needs to be either a Module or the name of such a module")
263 self._collector = collector
# is_valid (def line not visible): a Collection needs both a collector and input files.
# The return statements are on lines missing from this listing.
266 if (
not self.collector
or not self.input_files):
# Splitter-backed properties: max_collector_jobs uses a MaxSubjobsSplitter and max_files_per_collector_job
# a MaxFilesSplitter. Both setters replace self.splitter, so the last option set wins.
# NOTE(review): original lines 273, 275-277, 287 and 289-291 are not visible. Each getter reads an
# attribute presumably provided by only one splitter type -- confirm that they guard against a None or
# different splitter.
272 def max_collector_jobs(self):
274 return self.splitter.max_subjobs
278 @max_collector_jobs.setter
279 def max_collector_jobs(self, value):
283 self.splitter = MaxSubjobsSplitter(max_subjobs=value)
286 def max_files_per_collector_job(self):
288 return self.splitter.max_files_per_subjob
292 @max_files_per_collector_job.setter
293 def max_files_per_collector_job(self, value):
297 self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
# Abstract base of all calibration types. It inherits threading.Thread, so each calibration's run()
# executes in its own thread.
300 class CalibrationBase(ABC, Thread):
302 Abstract base class of Calibration types. The CAF implements the :py:class:`Calibration` class which inherits from
303 this and runs the C++ CalibrationCollectorModule and CalibrationAlgorithm classes. But by inheriting from this
304 class and providing the minimal necessary methods/attributes you could plug in your own Calibration types
305 that don't depend on the C++ CAF at all and run everything in your own way.
307 .. warning:: Writing your own class inheriting from :py:class:`CalibrationBase` class is not recommended!
308 But it's there if you really need it.
311 name (str): Name of this calibration object. Should be unique if you are going to run it.
314 input_files (list[str]): Input files for this calibration. May contain wildcard expressions usable by `glob.glob`.
# State value that means success; dependencies_met() compares each dependency's state against it.
318 end_state =
"completed"
# State value that means failure; failed_dependencies() and Calibration.run() compare against it.
321 fail_state =
"failed"
# NOTE(review): original lines 324-329 (docstring / base-class initialisation) are not visible.
323 def __init__(self, name, input_files=None):
# Calibrations that depend on this one; filled by depends_on() when called on those calibrations.
330 self.future_dependencies = []
# Calibrations this one depends on; filled by depends_on().
332 self.dependencies = []
# Mapping keyed by input file; starts empty.
339 self.files_to_iovs = {}
# input_files falls back to an empty list when not given (the if/else lines are not visible).
342 self.input_files = input_files
344 self.input_files = []
349 self.output_database_dir =
""
352 self.save_payloads =
True
354 self.jobs_to_submit = []
# NOTE(review): the decorator/def lines of the abstract run() method are not visible; the next two lines
# are its docstring.
359 The most important method. Runs inside a new Thread and is called from `CalibrationBase.start`
360 once the dependencies of this `CalibrationBase` have returned with state == end_state i.e. "completed".
# NOTE(review): the decorator/def lines of the abstract is_valid() method are not visible; the next two
# lines are its docstring.
366 A simple method you should implement that will return True or False depending on whether
367 the Calibration has been set up correctly and can be run safely.
def depends_on(self, calibration):
    """
    Parameters:
        calibration (`CalibrationBase`): The Calibration object which will produce constants that this one depends on.

    Adds dependency of this calibration on another i.e. This calibration
    will not run until the dependency has completed, and the constants produced
    will be used via the database chain.

    You can define multiple dependencies for a single calibration simply
    by calling this multiple times. Be careful when adding the calibration into
    the `CAF` not to add a circular/cyclic dependency. If you do, the sort will return an
    empty order and the `CAF` processing will fail.

    This function appends to the `CalibrationBase.dependencies` and `CalibrationBase.future_dependencies` attributes of this
    `CalibrationBase` and the input one respectively. This prevents us having to do too much recalculation later on.
    Calling it again with the same calibration does not add duplicate entries.

    A calibration with the same name as this one is rejected with a warning.
    """
    if self.name == calibration.name:
        B2WARNING(f"Tried to add {calibration} as a dependency for {self} but they have the same name. "
                  "Dependency was not added.")
        return
    if calibration not in self.dependencies:
        self.dependencies.append(calibration)
    # Test membership in the list we append to, so repeated calls never duplicate entries.
    if self not in calibration.future_dependencies:
        calibration.future_dependencies.append(self)
def dependencies_met(self):
    """
    Report whether every calibration this one depends on has reached its own successful end state.

    Returns:
        bool: True if all dependencies have ``state == end_state`` (also True when there are none).
    """
    for dependency in self.dependencies:
        if not (dependency.state == dependency.end_state):
            return False
    return True
def failed_dependencies(self):
    """
    Collect the dependencies that have failed.

    Returns:
        list: The calibrations in ``self.dependencies`` whose state equals this calibration's ``fail_state``,
        in dependency order.
    """
    return [dependency for dependency in self.dependencies if dependency.state == self.fail_state]
414 def _apply_calibration_defaults(self, defaults):
416 We pass in default calibration options from the `CAF` instance here if called.
417 Won't overwrite any options already set.
419 for key, value
in defaults.items():
421 if getattr(self, key)
is None:
422 setattr(self, key, value)
423 except AttributeError:
424 print(f
"The calibration {self.name} does not support the attribute {key}.")
# NOTE(review): the `class Calibration(...)` header and the docstring opening are not visible; the following
# lines up to the `moves` attribute are the Calibration class docstring.
429 Every Calibration object must have at least one collector and at least one algorithm.
430 You have the option to add in your collector/algorithm by argument here, or add them
431 later by changing the properties.
433 If you plan to use multiple `Collection` objects I recommend that you only set the name here and add the Collections
434 separately via `add_collection()`.
437 name (str): Name of this calibration. It should be unique for use in the `CAF`
439 collector (str, `basf2.Module`): Should be set to a CalibrationCollectorModule() or a string with the module name.
440 algorithms (list, ``ROOT.Belle2.CalibrationAlgorithm``): The algorithm(s) to use for this `Calibration`.
441 input_files (str, list[str]): Input files for use by this Calibration. May contain wildcards usable by `glob.glob`
443 A Calibration won't be valid in the `CAF` until it has all of these four attributes set. For example:
445 >>> cal = Calibration('TestCalibration1')
446 >>> col1 = register_module('CaTest')
447 >>> cal.add_collection('TestColl', Collection(collector=col1))
451 >>> cal = Calibration('TestCalibration1', 'CaTest')
453 If you want to run a basf2 :py:class:`path <basf2.Path>` before your collector module when running over data
455 >>> cal.pre_collector_path = my_basf2_path
457 You don't have to put a RootInput module in this pre-collection path, but you can if
458 you need some special parameters. If you want to process sroot files then you have to explicitly add
459 SeqRootInput to your pre-collection path.
460 The inputFileNames parameter of (Seq)RootInput will be set by the CAF automatically for you.
463 You can use optional arguments to pass in some/all during initialisation of the `Calibration` class
465 >>> cal = Calibration( 'TestCalibration1', 'CaTest', [alg1,alg2], ['/path/to/file.root'])
467 you can change the input file list later on, before running with `CAF`
469 >>> cal.input_files = ['path/to/*.root', 'other/path/to/file2.root']
471 If you have multiple collections from calling `add_collection()` then you should instead set the pre_collector_path,
472 input_files, database chain etc from there. See `Collection`.
474 Adding the CalibrationAlgorithm(s) is easy
476 >>> alg1 = TestAlgo()
477 >>> cal.algorithms = alg1
481 >>> cal.algorithms = [alg1]
483 Or for multiple algorithms for one collector
485 >>> alg2 = TestAlgo()
486 >>> cal.algorithms = [alg1, alg2]
488 Note that when you set the algorithms, they are automatically wrapped and stored as a Python class
489 `Algorithm`. To access the C++ algorithm class underneath directly do:
491 >>> cal.algorithms[i].algorithm
493 If you have a setup function that you want to run before each of the algorithms, set that with
495 >>> cal.pre_algorithms = my_function_object
497 If you want a different setup for each algorithm use a list with the same number of elements
498 as your algorithm list.
500 >>> cal.pre_algorithms = [my_function1, my_function2, ...]
502 You can also specify the dependencies of the calibration on others
504 >>> cal.depends_on(cal2)
506 By doing this, the `CAF` will respect the ordering of the calibrations and will pass the
507 calibration constants created by earlier completed calibrations to dependent ones.
# Names of the state-machine moves; several of them (complete, run_algorithms, fail_fully) are called on
# self.machine in run() and _poll_collector().
510 moves = [
"submit_collector",
"complete",
"run_algorithms",
"iterate",
"fail_fully"]
# presumably the sub-directory name used for algorithm output -- confirm where it is read.
512 alg_output_dir =
"algorithm_output"
# States that the `state` setter also records as the "checkpoint" value; run() restarts from that checkpoint.
514 checkpoint_states = [
"init",
"collector_completed",
"completed"]
# Key under which the default Collection is stored in self.collections.
516 default_collection_name =
"default"
# NOTE(review): the start of the __init__ signature is not visible; only these parameter lines are.
523 pre_collector_path=None,
525 output_patterns=None,
526 max_files_per_collector_job=None,
527 max_collector_jobs=None,
# `collections` is created (and the default Collection added) before super().__init__ because the base
# initialiser assigns self.files_to_iovs and self.input_files, which this class forwards to the default Collection.
533 self.collections = {}
535 self._algorithms = []
539 self.add_collection(self.default_collection_name,
545 max_files_per_collector_job,
550 super().__init__(name, input_files)
# Goes through the `algorithms` setter, which wraps each CalibrationAlgorithm in an `Algorithm`.
555 self.algorithms = algorithms
# None means "not set": _apply_calibration_defaults() only fills attributes that are None
# (e.g. from CAF(calibration_defaults={"max_iterations": 2})).
560 self.max_iterations =
None
565 self.ignored_runs =
None
# Default strategy, applied to every algorithm through the `strategies` property setter.
570 self.strategies = strategies.SingleIOV
# Same default-chain logic as Collection: use a given chain, else start from the default global tags
# (added in reverse). apply_to_default_collection=False -- presumably because the default Collection
# builds its own chain.
574 self.database_chain = database_chain
576 self.database_chain = []
578 for tag
in reversed(b2conditions.default_globaltags):
579 self.use_central_database(tag, apply_to_default_collection=
False)
# Runner class for the algorithm step (presumably sequential, by its name).
583 self.algorithms_runner = runners.SeqAlgorithmsRunner
# NOTE(review): unit not visible here -- presumably seconds; confirm.
591 self.collector_full_update_interval = 30
def add_collection(self, name, collection):
    """
    Add a `Collection` object to this `Calibration` under a unique name.

    Parameters:
        name (str): Unique name of this `Collection` in the Calibration.
        collection (`Collection`): `Collection` object to use.

    Any valid Collection will be used in the Calibration. A default Collection is automatically added, but it
    isn't valid and won't run unless you have assigned it a collector and input files.
    You can ignore the default one and only add your own custom Collections. You can configure the default from the
    Calibration(...) arguments or after creating the Calibration object by directly setting ``cal.collector``,
    ``cal.input_files`` etc.

    If a Collection with the same name already exists, a warning is logged and the existing one is kept.
    """
    if name in self.collections:
        B2WARNING(f"A Collection with the name '{name}' already exists in this Calibration. It has not been added. "
                  "Please use another name.")
        return
    self.collections[name] = collection
# NOTE(review): the `def is_valid(self):` line and the docstring delimiters are not visible; the next four
# lines are its docstring.
620 A full calibration consists of a collector AND an associated algorithm AND input_files.
623 1) We are missing any of the above.
624 2) There are multiple Collections and the Collectors have mis-matched granularities.
625 3) Any of our Collectors have granularities that don't match what our Strategy can use.
# Each failed check logs a warning; the `return` statements and the `granularities = []` initialisation
# are on lines missing from this listing.
627 if not self.algorithms:
628 B2WARNING(f
"Empty algorithm list for {self.name}.")
631 if not any([collection.is_valid()
for collection
in self.collections.values()]):
632 B2WARNING(f
"No valid Collections for {self.name}.")
# Gather the 'granularity' parameter value of every valid collection's collector module.
636 for collection
in self.collections.values():
637 if collection.is_valid():
638 collector_params = collection.collector.available_params()
639 for param
in collector_params:
640 if param.name ==
"granularity":
641 granularities.append(param.values)
# NOTE(review): set() requires the granularity values to be hashable -- presumably strings; confirm.
642 if len(set(granularities)) > 1:
643 B2WARNING(
"Multiple different granularities set for the Collections in this Calibration.")
# Every algorithm's strategy must accept every collected granularity.
646 for alg
in self.algorithms:
647 alg_type = type(alg.algorithm).__name__
648 incorrect_gran = [granularity
not in alg.strategy.allowed_granularities
for granularity
in granularities]
649 if any(incorrect_gran):
650 B2WARNING(f
"Selected strategy for {alg_type} does not match a collector's granularity.")
# Clears this Calibration's database chain and, unless told otherwise, the default Collection's chain too.
654 def reset_database(self, apply_to_default_collection=True):
657 apply_to_default_collection (bool): Should we also reset the default collection?
659 Remove everything in the database_chain of this Calibration, including the default central database tag automatically
660 included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`. This will NOT affect the
661 database chain of any `Collection` other than the default one. You can prevent the default Collection from having its chain
662 reset by setting 'apply_to_default_collection' to False.
664 self.database_chain = []
# Only the default Collection is touched, and only if it exists.
665 if self.default_collection_name
in self.collections
and apply_to_default_collection:
666 self.collections[self.default_collection_name].reset_database()
# Appends a central global tag to this Calibration's chain and optionally to the default Collection's chain.
668 def use_central_database(self, global_tag, apply_to_default_collection=True):
671 global_tag (str): The central database global tag to use for this calibration.
674 apply_to_default_collection (bool): Should we also call use_central_database on the default collection (if it exists)
676 Using this allows you to append a central database to the database chain for this calibration.
677 The default database chain is just the central one from
678 `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
679 To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
680 and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
682 Note that the database chain attached to the `Calibration` will only affect the default `Collection` (if it exists),
683 and the algorithm processes. So calling:
685 >> cal.use_central_database("global_tag")
687 will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
690 >> cal.collections['default'].database_chain
694 >> cal.use_central_database("global_tag", False)
696 will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
697 So if you have multiple Collections in this Calibration *their database chains are separate*.
698 To specify an additional `CentralDatabase` for a different collection, you will have to call:
700 >> cal.collections['OtherCollection'].use_central_database("global_tag")
702 central_db = CentralDatabase(global_tag)
703 self.database_chain.append(central_db)
704 if self.default_collection_name
in self.collections
and apply_to_default_collection:
705 self.collections[self.default_collection_name].use_central_database(global_tag)
# Appends a local database to this Calibration's chain and optionally to the default Collection's chain.
707 def use_local_database(self, filename, directory="", apply_to_default_collection=True):
710 filename (str): The path to the database.txt of the local database
713 directory (str): The path to the payloads directory for this local database.
714 apply_to_default_collection (bool): Should we also call use_local_database on the default collection (if it exists)
716 Append a local database to the chain for this calibration.
717 You can call this function multiple times and each database will be added to the chain IN ORDER.
718 The databases are applied to this calibration ONLY (and, unless disabled, to its default `Collection`).
719 The Local and Central databases applied via these functions are applied to the algorithm processes and optionally
720 the default `Collection` job as a database chain.
721 There are other databases applied to the processes later, checked by basf2 in this order:
723 1) Local Database from previous iteration of this Calibration.
724 2) Local Database chain from output of previous dependent Calibrations.
725 3) This chain of Local and Central databases where the last added is checked first.
727 Note that this function on the `Calibration` object will only affect the default `Collection` if it exists and if
728 'apply_to_default_collection' remains True. So calling:
730 >> cal.use_local_database(file_path, payload_dir)
732 will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
735 >> cal.collections['default'].database_chain
739 >> cal.use_local_database(file_path, payload_dir, False)
741 will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
743 If you have multiple Collections in this Calibration *their database chains are separate*.
744 To specify an additional `LocalDatabase` for a different collection, you will have to call:
746 >> cal.collections['OtherCollection'].use_local_database(file_path, payload_dir)
749 local_db = LocalDatabase(filename, directory)
750 self.database_chain.append(local_db)
751 if self.default_collection_name
in self.collections
and apply_to_default_collection:
752 self.collections[self.default_collection_name].use_local_database(filename, directory)
754 def _get_default_collection_attribute(self, attr):
755 if self.default_collection_name
in self.collections:
756 return getattr(self.collections[self.default_collection_name], attr)
758 B2WARNING(f
"You tried to get the attribute '{attr}' from the Calibration '{self.name}', "
759 "but the default collection doesn't exist."
760 f
"You should use the cal.collections['CollectionName'].{attr} to access a custom "
761 "collection's attributes directly.")
764 def _set_default_collection_attribute(self, attr, value):
765 if self.default_collection_name
in self.collections:
766 setattr(self.collections[self.default_collection_name], attr, value)
768 B2WARNING(f
"You tried to set the attribute '{attr}' from the Calibration '{self.name}', "
769 "but the default collection doesn't exist."
770 f
"You should use the cal.collections['CollectionName'].{attr} to access a custom "
771 "collection's attributes directly.")
# The properties below forward to the attribute of the same name on the default Collection through
# _get_default_collection_attribute / _set_default_collection_attribute (some decorator/def lines are not visible).
777 return self._get_default_collection_attribute(
"collector")
# Collector setter: a module-name string is turned into a module before forwarding; a non-Module is
# reported with B2ERROR.
# NOTE(review): this duplicates the conversion in Collection's collector setter; original line 791, between
# the check and the forwarding call, is not visible.
785 from basf2
import Module
786 if isinstance(collector, str):
787 from basf2
import register_module
788 collector = register_module(collector)
789 if not isinstance(collector, Module):
790 B2ERROR(
"Collector needs to be either a Module or the name of such a module")
792 self._set_default_collection_attribute(
"collector", collector)
795 def input_files(self):
798 return self._get_default_collection_attribute(
"input_files")
801 def input_files(self, files):
804 self._set_default_collection_attribute(
"input_files", files)
807 def files_to_iovs(self):
810 return self._get_default_collection_attribute(
"files_to_iovs")
812 @files_to_iovs.setter
813 def files_to_iovs(self, file_map):
816 self._set_default_collection_attribute(
"files_to_iovs", file_map)
819 def pre_collector_path(self):
822 return self._get_default_collection_attribute(
"pre_collector_path")
824 @pre_collector_path.setter
825 def pre_collector_path(self, path):
828 self._set_default_collection_attribute(
"pre_collector_path", path)
831 def output_patterns(self):
834 return self._get_default_collection_attribute(
"output_patterns")
836 @output_patterns.setter
837 def output_patterns(self, patterns):
840 self._set_default_collection_attribute(
"output_patterns", patterns)
843 def max_files_per_collector_job(self):
846 return self._get_default_collection_attribute(
"max_files_per_collector_job")
848 @max_files_per_collector_job.setter
849 def max_files_per_collector_job(self, max_files):
852 self._set_default_collection_attribute(
"max_files_per_collector_job", max_files)
855 def max_collector_jobs(self):
858 return self._get_default_collection_attribute(
"max_collector_jobs")
860 @max_collector_jobs.setter
861 def max_collector_jobs(self, max_jobs):
864 self._set_default_collection_attribute(
"max_collector_jobs", max_jobs)
867 def backend_args(self):
870 return self._get_default_collection_attribute(
"backend_args")
873 def backend_args(self, args):
876 self._set_default_collection_attribute(
"backend_args", args)
# `algorithms` property: the getter returns the list of `Algorithm` wrappers.
879 def algorithms(self):
882 return self._algorithms
# Single-value setter (its decorator lines are not visible; `.fset.register` below suggests a dispatching
# setter): a CalibrationAlgorithm is wrapped in `Algorithm` and replaces the list; anything else logs B2ERROR.
886 def algorithms(self, value):
889 from ROOT.Belle2
import CalibrationAlgorithm
890 if isinstance(value, CalibrationAlgorithm):
891 self._algorithms = [Algorithm(value)]
893 B2ERROR(f
"Something other than CalibrationAlgorithm instance passed in ({type(value)}). "
894 "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
# list/tuple setter (its def line and the `for alg in ...` loop line are not visible): rebuilds the list,
# wrapping each CalibrationAlgorithm and logging B2ERROR for other elements.
896 @algorithms.fset.register(tuple)
897 @algorithms.fset.register(list)
900 Alternate algorithms setter for lists and tuples of CalibrationAlgorithms.
902 from ROOT.Belle2
import CalibrationAlgorithm
904 self._algorithms = []
906 if isinstance(alg, CalibrationAlgorithm):
907 self._algorithms.append(Algorithm(alg))
# NOTE(review): the message reports type(value) (the container) rather than type(alg) (the offending element),
# and the two concatenated strings have no space between the sentences.
909 B2ERROR(f
"Something other than CalibrationAlgorithm instance passed in {type(value)}."
910 "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
# `pre_algorithms` property: the getter returns each algorithm's pre_algorithm callback, in algorithm order.
913 def pre_algorithms(self):
915 Callback run prior to each algorithm iteration.
917 return [alg.pre_algorithm
for alg
in self.algorithms]
# Single-function setter: applies the same callback to every algorithm; a falsy value logs B2ERROR
# (the guarding lines are not visible).
919 @pre_algorithms.setter
921 def pre_algorithms(self, func):
925 for alg
in self.algorithms:
926 alg.pre_algorithm = func
928 B2ERROR(
"Something evaluated as False passed in as pre_algorithm function.")
# list/tuple setter: one callback per algorithm, matched by position; a length mismatch or an empty
# container logs B2ERROR instead.
930 @pre_algorithms.fset.register(tuple)
931 @pre_algorithms.fset.register(list)
934 Alternate pre_algorithms setter for lists and tuples of functions, should be one per algorithm.
937 if len(values) == len(self.algorithms):
938 for func, alg
in zip(values, self.algorithms):
939 alg.pre_algorithm = func
941 B2ERROR(
"Number of functions and number of algorithms doesn't match.")
943 B2ERROR(
"Empty container passed in for pre_algorithm functions")
# `strategies` property (decorator/def lines not visible): the getter returns each algorithm's strategy.
948 The `caf.strategies.AlgorithmStrategy` or `list` of them used when running the algorithm(s).
950 return [alg.strategy
for alg
in self.algorithms]
# Single-strategy setter: applies the same strategy to every algorithm; a falsy value logs B2ERROR.
958 for alg
in self.algorithms:
959 alg.strategy = strategy
961 B2ERROR(
"Something evaluated as False passed in as a strategy.")
@strategies.fset.register(tuple)
@strategies.fset.register(list)
def _(self, values):
    """
    Alternate strategies setter for lists and tuples of strategies, should be one per algorithm.

    Each strategy is assigned to the algorithm at the same position. A length mismatch or an empty
    container is reported with B2ERROR and no strategy is changed.
    """
    if values:
        if len(values) == len(self.algorithms):
            # Iterate `values`: inside a method body the bare name `strategies` resolves to the
            # module-level `caf.strategies` import, not to this class's `strategies` property.
            for strategy, alg in zip(values, self.algorithms):
                alg.strategy = strategy
        else:
            B2ERROR("Number of strategies and number of algorithms doesn't match.")
    else:
        B2ERROR("Empty container passed in for strategies list")
# NOTE(review): the `def run(self):` line and docstring delimiters are not visible; the next two lines are its docstring.
985 Main logic of the Calibration object.
986 Will be run in a new Thread by calling the start() method.
# Restart point: read the stored checkpoint state and iteration for this calibration (read-only DB access).
# NOTE(review): self._db_path, self.iov, self.backend and self.heartbeat are not set in the visible
# __init__ -- presumably assigned by CAF before start(); confirm.
988 with CAFDB(self._db_path, read_only=
True)
as db:
989 initial_state = db.get_calibration_value(self.name,
"checkpoint")
990 initial_iteration = db.get_calibration_value(self.name,
"iteration")
991 B2INFO(
"Initial status of {} found to be state={}, iteration={}".format(self.name,
# The state machine starts from the checkpointed state/iteration, and works under <cwd>/<calibration name>.
994 self.machine = CalibrationMachine(self,
995 iov_to_calibrate=self.iov,
996 initial_state=initial_state,
997 iteration=initial_iteration)
998 self.state = initial_state
999 self.machine.root_dir = Path(os.getcwd(), self.name)
1000 self.machine.collector_backend = self.backend
# Delete integer-named iteration directories newer than the checkpointed iteration -- presumably so a
# restart does not pick up output from iterations that will be redone.
1004 all_iteration_paths = find_int_dirs(self.machine.root_dir)
1005 for iteration_path
in all_iteration_paths:
1006 if int(iteration_path.name) > initial_iteration:
1007 shutil.rmtree(iteration_path)
# Main loop: advance the machine until the end or fail state is reached.
# NOTE(review): the `try:` lines, the submission call and the `except Exception as err:` body
# (original lines 1011, 1013, 1015-1016) are not visible -- confirm that submission errors are not swallowed.
1009 while self.state != self.end_state
and self.state != self.fail_state:
1010 if self.state ==
"init":
1012 B2INFO(f
"Attempting collector submission for calibration {self.name}.")
1014 except Exception
as err:
1017 self._poll_collector()
1020 if self.state ==
"collector_failed":
1021 self.machine.fail_fully()
1028 B2INFO(f
"Attempting to run algorithms for calibration {self.name}.")
1029 self.machine.run_algorithms()
1030 except MachineError
as err:
1035 if self.machine.state ==
"algorithms_failed":
1036 self.machine.fail_fully()
# Blocks while the collector is running: try to complete it, otherwise check for failures (the bodies of
# both except branches are partly not visible), then sleep for self.heartbeat between polls.
1039 def _poll_collector(self):
1042 while self.state ==
"running_collector":
1044 self.machine.complete()
1046 except ConditionError:
1048 B2DEBUG(29, f
"Checking if collector jobs for calibration {self.name} have failed.")
1050 except ConditionError:
1052 sleep(self.heartbeat)
# `state` property (decorator/def lines not visible): read from the CAF database on every access.
1057 The current major state of the calibration in the database file. The machine may have a different state.
1059 with CAFDB(self._db_path, read_only=
True)
as db:
1060 state = db.get_calibration_value(self.name,
"state")
# Setter: stores the state as a string; states in checkpoint_states are also stored as the "checkpoint".
1064 def state(self, state):
1067 B2DEBUG(29, f
"Setting {self.name} to state {state}.")
1068 with CAFDB(self._db_path)
as db:
1069 db.update_calibration_value(self.name,
"state", str(state))
1070 if state
in self.checkpoint_states:
1071 db.update_calibration_value(self.name,
"checkpoint", str(state))
1072 B2DEBUG(29, f
"{self.name} set to {state}.")
# `iteration` property: read from the CAF database on every access.
1075 def iteration(self):
1077 Retrieves the current iteration number in the database file.
1080 int: The current iteration number
1082 with CAFDB(self._db_path, read_only=
True)
as db:
1083 iteration = db.get_calibration_value(self.name,
"iteration")
# Setter: writes the iteration to the database; the final debug message re-reads it via the getter.
1087 def iteration(self, iteration):
1090 B2DEBUG(29, f
"Setting {self.name} to {iteration}.")
1091 with CAFDB(self._db_path)
as db:
1092 db.update_calibration_value(self.name,
"iteration", iteration)
1093 B2DEBUG(29, f
"{self.name} set to {self.iteration}.")
# NOTE(review): the `class Algorithm` header and the docstring delimiters are not visible; the lines up to
# `def __init__` are the Algorithm class docstring.
1099 algorithm: The CalibrationAlgorithm instance that we want to execute.
1101 data_input (types.FunctionType): An optional function that sets the input files of the algorithm.
1102 pre_algorithm (types.FunctionType): An optional function that runs just prior to execution of the algorithm.
1103 Useful for setup, e.g. module initialisation.
1105 This is a simple wrapper class around the C++ CalibrationAlgorithm class.
1106 It helps to add functionality to algorithms for use by the Calibration and CAF classes rather
1107 than separating the logic into those classes directly.
1109 This is **not** currently a class that a user should interact with much during `CAF`
1110 setup (unless you're doing something advanced).
1111 The `Calibration` class should be doing most of the creation of the defaults for these objects.
1113 Setting the `data_input` function might be necessary if you have set the `Calibration.output_patterns`.
1114 Also, setting the `pre_algorithm` to a function that should execute prior to each `strategies.AlgorithmStrategy`
1115 is often useful, e.g. to initialise the Geometry module.
1118 def __init__(self, algorithm, data_input=None, pre_algorithm=None):
1122 self.algorithm = algorithm
# Short name = C++ class name without its namespace (text after the last '::').
# NOTE(review): if the name contains no '::', rfind returns -1 and the slice starts at 1, dropping the
# first character -- fine only if every wrapped algorithm is namespaced (e.g. Belle2::...).
1124 cppname = type(algorithm).__cpp_name__
1125 self.name = cppname[cppname.rfind(
'::') + 2:]
# Fall back to default_inputdata_setup when no data_input function is given.
1129 self.data_input = data_input
1130 if not self.data_input:
1131 self.data_input = self.default_inputdata_setup
1135 self.pre_algorithm = pre_algorithm
1138 self.strategy = strategies.SingleIOV
1146 def default_inputdata_setup(self, input_file_paths):
1148 Simple setup to set the input file names to the algorithm. Applied to the data_input attribute
1149 by default. This simply takes all files returned from the `Calibration.output_patterns` and filters
1150 for only the CollectorOutput.root files. Then it sets them as input files to the CalibrationAlgorithm class.
1152 collector_output_files = list(
filter(
lambda file_path:
"CollectorOutput.root" == Path(file_path).name,
1154 info_lines = [f
"Input files used in {self.name}:"]
1155 info_lines.extend(collector_output_files)
1156 B2INFO_MULTILINE(info_lines)
1157 self.algorithm.setInputFileNames(collector_output_files)
1163 calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.
1165 >>> calibration_defaults={"max_iterations":2}
1167 This class holds `Calibration` objects and processes them. It defines the initial configuration/setup
1168 for the calibrations. But most of the real processing is done through the `caf.state_machines.CalibrationMachine`.
1170 The `CAF` class essentially does some initial setup, holds the `CalibrationBase` instances and calls the
1171 `CalibrationBase.start` when the dependencies are met.
1173 Much of the checking for consistency is done in this class so that no processing is done with an invalid
1174 setup. Choosing which files to use as input should be done from outside during the setup of the `CAF` and
1175 `CalibrationBase` instances.
1179 _db_name =
"caf_state.db"
1181 default_calibration_config = {
1182 "max_iterations": 5,
1186 def __init__(self, calibration_defaults=None):
1190 self.calibrations = {}
1193 self.future_dependencies = {}
1196 self.dependencies = {}
1198 self.output_dir =
"calibration_results"
1202 self._backend =
None
1206 if not calibration_defaults:
1207 calibration_defaults = {}
1210 self.calibration_defaults = {**self.default_calibration_config, **calibration_defaults}
1212 self._db_path =
None
1214 def add_calibration(self, calibration):
1216 Adds a `Calibration` that is to be used in this program to the list.
1217 Also adds an empty dependency list to the overall dictionary.
1218 You should not directly alter a `Calibration` object after it has been
1221 if calibration.is_valid():
1222 if calibration.name
not in self.calibrations:
1223 self.calibrations[calibration.name] = calibration
1225 B2WARNING(f
"Tried to add a calibration with the name {calibration.name} twice.")
1227 B2WARNING(f
"Tried to add incomplete/invalid calibration ({calibration.name}) to the framwork."
1228 "It was not added and will not be part of the final process.")
1230 def _remove_missing_dependencies(self):
1232 This checks the future and past dependencies of each `Calibration` in the `CAF`.
1233 If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
1236 calibration_names = [calibration.name
for calibration
in self.calibrations.values()]
1238 def is_dependency_in_caf(dependency):
1240 Quick function to use with filter() and check dependencies against calibrations known to `CAF`
1242 dependency_in_caf = dependency.name
in calibration_names
1243 if not dependency_in_caf:
1244 B2WARNING(f
"The calibration {dependency.name} is a required dependency but is not in the CAF."
1245 " It has been removed as a dependency.")
1246 return dependency_in_caf
1250 for calibration
in self.calibrations.values():
1251 filtered_future_dependencies = list(
filter(is_dependency_in_caf, calibration.future_dependencies))
1252 calibration.future_dependencies = filtered_future_dependencies
1254 filtered_dependencies = list(
filter(is_dependency_in_caf, calibration.dependencies))
1255 calibration.dependencies = filtered_dependencies
1257 def _order_calibrations(self):
1259 - Uses dependency atrributes of calibrations to create a dependency dictionary and passes it
1260 to a sorting algorithm.
1261 - Returns valid OrderedDict if sort was succesful, empty one if it failed (most likely a cyclic dependency)
1264 self._remove_missing_dependencies()
1267 for calibration
in self.calibrations.values():
1268 future_dependencies_names = [dependency.name
for dependency
in calibration.future_dependencies]
1269 past_dependencies_names = [dependency.name
for dependency
in calibration.dependencies]
1271 self.future_dependencies[calibration.name] = future_dependencies_names
1272 self.dependencies[calibration.name] = past_dependencies_names
1274 order = topological_sort(self.future_dependencies)
1279 ordered_full_dependencies = all_dependencies(self.future_dependencies, order)
1282 full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)
1284 for calibration
in self.calibrations.values():
1285 full_deps = full_past_dependencies[calibration.name]
1286 explicit_deps = [cal.name
for cal
in calibration.dependencies]
1287 for dep
in full_deps:
1288 if dep
not in explicit_deps:
1289 calibration.dependencies.append(self.calibrations[dep])
1292 ordered_dependency_list = []
1293 for ordered_calibration_name
in order:
1294 if ordered_calibration_name
in [dep.name
for dep
in calibration.dependencies]:
1295 ordered_dependency_list.append(self.calibrations[ordered_calibration_name])
1296 calibration.dependencies = ordered_dependency_list
1297 order = ordered_full_dependencies
1301 def _check_backend(self):
1303 Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
1304 one that is stored isn't a valid Backend object) we should create a default Local backend.
1306 if not isinstance(self._backend, caf.backends.Backend):
1308 self.backend = caf.backends.Local()
1310 def _prune_invalid_collections(self):
1312 Checks all current calibrations and removes any invalid Collections from their collections list.
1314 B2INFO(
"Checking for any invalid Collections in Calibrations.")
1315 for calibration
in self.calibrations.values():
1316 valid_collections = {}
1317 for name, collection
in calibration.collections.items():
1318 if collection.is_valid():
1319 valid_collections[name] = collection
1321 B2WARNING(f
"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
1322 calibration.collections = valid_collections
1324 def run(self, iov=None):
1327 iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
1328 this IoV will be used in the collection step.
1330 This function runs the overall calibration job, saves the outputs to the output_dir directory,
1331 and creates database payloads.
1333 Upload of final databases is not done here. This simply creates the local databases in
1334 the output directory. You should check the validity of your new local database before uploading
1335 to the conditions DB via the basf2 tools/interface to the DB.
1337 if not self.calibrations:
1338 B2FATAL(
"There were no Calibration objects to run. Maybe you tried to add invalid ones?")
1340 order = self._order_calibrations()
1342 B2FATAL(
"Couldn't order the calibrations properly. Could be a cyclic dependency.")
1345 self._check_backend()
1347 self._prune_invalid_collections()
1350 self.output_dir = self._make_output_dir()
1353 self._make_database()
1356 with temporary_workdir(self.output_dir):
1357 db = CAFDB(self._db_path)
1359 db_initial_calibrations = db.query(
"select * from calibrations").fetchall()
1360 for calibration
in self.calibrations.values():
1362 calibration._apply_calibration_defaults(self.calibration_defaults)
1363 calibration._db_path = self._db_path
1364 calibration.output_database_dir = Path(self.output_dir, calibration.name,
"outputdb").as_posix()
1365 calibration.iov = iov
1366 if not calibration.backend:
1367 calibration.backend = self.backend
1369 if calibration.name
not in [db_cal[0]
for db_cal
in db_initial_calibrations]:
1370 db.insert_calibration(calibration.name)
1373 for cal_info
in db_initial_calibrations:
1374 if cal_info[0] == calibration.name:
1375 cal_initial_state = cal_info[2]
1376 cal_initial_iteration = cal_info[3]
1377 B2INFO(f
"Previous entry in database found for {calibration.name}.")
1378 B2INFO(f
"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
1379 calibration.state = cal_initial_state
1380 B2INFO(f
"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
1381 calibration.iteration = cal_initial_iteration
1383 calibration.daemon =
True
1390 keep_running =
False
1392 remaining_calibrations = []
1394 for calibration
in self.calibrations.values():
1396 if (calibration.state == CalibrationBase.end_state
or calibration.state == CalibrationBase.fail_state):
1398 if calibration.is_alive():
1399 B2DEBUG(29, f
"Joining {calibration.name}.")
1402 if calibration.dependencies_met():
1403 if not calibration.is_alive():
1404 B2DEBUG(29, f
"Starting {calibration.name}.")
1407 except RuntimeError:
1410 B2DEBUG(29, f
"{calibration.name} probably just finished, join it later.")
1411 remaining_calibrations.append(calibration)
1413 if not calibration.failed_dependencies():
1414 remaining_calibrations.append(calibration)
1415 if remaining_calibrations:
1420 for calibration
in remaining_calibrations:
1421 for job
in calibration.jobs_to_submit[:]:
1422 calibration.backend.submit(job)
1423 calibration.jobs_to_submit.remove(job)
1424 sleep(self.heartbeat)
1426 B2INFO(
"Printing summary of final CAF status.")
1427 with CAFDB(self._db_path, read_only=
True)
as db:
1428 print(db.output_calibration_table())
1433 The `backend <backends.Backend>` that runs the collector job.
1434 When set, this is checked that a `backends.Backend` class instance was passed in.
1436 return self._backend
1439 def backend(self, backend):
1442 if isinstance(backend, caf.backends.Backend):
1443 self._backend = backend
1445 B2ERROR(
'Backend property must inherit from Backend class.')
1447 def _make_output_dir(self):
1449 Creates the output directory. If it already exists we are now going to try and restart the program from the last state.
1452 str: The absolute path of the new output_dir
1454 p = Path(self.output_dir).resolve()
1456 B2INFO(f
"{p.as_posix()} output directory already exists. "
1457 "We will try to restart from the previous finishing state.")
1460 p.mkdir(parents=
True)
1464 raise FileNotFoundError(f
"Attempted to create output_dir {p.as_posix()}, but it didn't work.")
1466 def _make_database(self):
1468 Creates the CAF status database. If it already exists we don't overwrite it.
1470 self._db_path = Path(self.output_dir, self._db_name).absolute()
1471 if self._db_path.exists():
1472 B2INFO(f
"Previous CAF database found {self._db_path}")
1474 with CAFDB(self._db_path):
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double >> &runs, double cut, std::map< ExpRun, std::pair< double, double >> &runsRemoved)
filter events to remove runs shorter than cut, it stores removed runs in runsRemoved