"""
This module implements several objects/functions to configure and run calibrations.
These classes are used to construct the workflow of the calibration job.
The actual processing code is mostly in the `caf.state_machines` module.
"""
# Names exported by a star-import of this module.
__all__ = [
    "CalibrationBase",
    "Calibration",
    "Algorithm",
    "CAF",
]
23from threading
import Thread
25from pathlib
import Path
29from basf2
import B2ERROR, B2WARNING, B2INFO, B2FATAL, B2DEBUG
30from basf2
import find_file
31from basf2
import conditions
as b2conditions
33from abc
import ABC, abstractmethod
36from caf.utils
import B2INFO_MULTILINE
37from caf.utils
import past_from_future_dependencies
38from caf.utils
import topological_sort
39from caf.utils
import all_dependencies
40from caf.utils
import method_dispatch
41from caf.utils
import temporary_workdir
42from caf.utils
import find_int_dirs
43from caf.utils
import LocalDatabase
44from caf.utils
import CentralDatabase
45from caf.utils
import parse_file_uri
47import caf.strategies
as strategies
48import caf.runners
as runners
49from caf.backends
import MaxSubjobsSplitter, MaxFilesSplitter
50from caf.state_machines
import CalibrationMachine, ConditionError, MachineError
51from caf.database
import CAFDB
57 collector (str, basf2.Module): The collector module or module name
for this `Collection`.
58 input_files (list[str]): The input files to be used
for only this `Collection`.
59 pre_collection_path (basf2.Path): The reconstruction `basf2.Path` to be run prior to the Collector module.
60 database_chain (list[CentralDatabase, LocalDatabase]): The database chain to be used initially
for this `Collection`.
61 output_patterns (list[str]): Output patterns of files produced by collector which will be used to
pass to the
62 `Algorithm.data_input` function. Setting this here, replaces the default completely.
63 max_files_for_collector_job (int): Maximum number of input files sent to each collector subjob
for this `Collection`.
64 Technically this sets the SubjobSplitter to be used,
not compatible
with max_collector_jobs.
65 max_collector_jobs (int): Maximum number of collector subjobs
for this `Collection`.
66 Input files are split evenly between them. Technically this sets the SubjobSplitter to be used. Not compatible
with
67 max_files_for_collector_job.
68 backend_args (dict): The args
for the backend submission of this `Collection`.
72 default_max_collector_jobs = 1000
75 job_config = "collector_job.json"
80 pre_collector_path=None,
83 max_files_per_collector_job=None,
84 max_collector_jobs=None,
88 self.collector = collector
92 self.input_files = input_files
99 self.files_to_iovs = {}
104 self.pre_collector_path =
None
105 if pre_collector_path:
106 self.pre_collector_path = pre_collector_path
111 self.output_patterns = [
"CollectorOutput.root"]
113 self.output_patterns = output_patterns
118 if max_files_per_collector_job
and max_collector_jobs:
119 B2FATAL(
"Cannot set both 'max_files_per_collector_job' and 'max_collector_jobs' of a collection!")
120 elif max_files_per_collector_job:
121 self.max_files_per_collector_job = max_files_per_collector_job
122 elif max_collector_jobs:
123 self.max_collector_jobs = max_collector_jobs
125 self.max_collector_jobs = self.default_max_collector_jobs
129 self.backend_args = {}
131 self.backend_args = backend_args
137 self.database_chain = database_chain
139 self.database_chain = []
143 for tag
in reversed(b2conditions.default_globaltags):
144 self.use_central_database(tag)
146 self.job_script = Path(find_file(
"calibration/scripts/caf/run_collector_path.py")).absolute()
147 """The basf2 steering file that will be used for Collector jobs run by this collection.
148This script will be copied into subjob directories as part of the input sandbox."""
151 self.job_cmd = [
"basf2", self.job_script.name,
"--job-information job_info.json"]
def reset_database(self):
    """
    Remove everything in the database_chain of this object, including the default central database
    tag automatically included from
    `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.

    The attribute is rebound to a new empty list; any list object previously
    stored there is left unmodified.
    """
    self.database_chain = []
def use_central_database(self, global_tag):
    """
    Parameters:
        global_tag (str): The central database global tag to use for this calibration.

    Using this allows you to add a central database to the head of the global tag database chain
    for this collection. The default database chain is just the central one from
    `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
    The input file global tag will always be overridden and never used unless explicitly set.

    To turn off central database completely or use a custom tag as the base, you should call
    `Calibration.reset_database` and start adding databases with `Calibration.use_local_database`
    and `Calibration.use_central_database`.

    Alternatively you could set an empty list as the input database_chain when adding the Collection
    to the Calibration.

    NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
    All local database files MUST now be at the head of the 'chain', with all central database
    global tags in their own list which will be checked after all local database files have been checked.

    So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the database
    chain, the real order that basf2 will use them is
    ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is checked first.
    """
    # Wrap the tag name and put it at the end of this object's chain.
    self.database_chain.append(CentralDatabase(global_tag))
def use_local_database(self, filename, directory=""):
    """
    Parameters:
        filename (str): The path to the database.txt of the local database
        directory (str): The path to the payloads directory for this local database.

    Append a local database to the chain for this collection.
    You can call this function multiple times and each database will be added to the chain IN ORDER.
    The databases are applied to this collection ONLY.

    NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
    All local database files MUST now be at the head of the 'chain', with all central database
    global tags in their own list which will be checked after all local database files have been checked.

    So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the database
    chain, the real order that basf2 will use them is
    ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.
    """
    # Wrap the file/directory pair and put it at the end of this object's chain.
    self.database_chain.append(LocalDatabase(filename, directory))
206 def uri_list_from_input_file(input_file):
209 input_file (str): A local file/glob pattern or XROOTD URI
212 list: A list of the URIs found
from the initial string.
215 uri = parse_file_uri(input_file)
216 if uri.scheme ==
"file":
219 uris = [parse_file_uri(f).geturl()
for f
in glob(input_file)]
226 def input_files(self):
227 return self._input_files
230 def input_files(self, value):
231 if isinstance(value, str):
233 self._input_files = self.uri_list_from_input_file(value)
234 elif isinstance(value, list):
237 for pattern
in value:
238 total_files.extend(self.uri_list_from_input_file(pattern))
239 self._input_files = total_files
241 raise TypeError(
"Input files must be a list or string")
247 return self._collector
256 from basf2
import Module
257 if isinstance(collector, str):
258 from basf2
import register_module
259 collector = register_module(collector)
260 if not isinstance(collector, Module):
261 B2ERROR(
"Collector needs to be either a Module or the name of such a module")
263 self._collector = collector
266 if (
not self.collector
or not self.input_files):
272 def max_collector_jobs(self):
274 return self.splitter.max_subjobs
278 @max_collector_jobs.setter
279 def max_collector_jobs(self, value):
283 self.splitter = MaxSubjobsSplitter(max_subjobs=value)
286 def max_files_per_collector_job(self):
288 return self.splitter.max_files_per_subjob
292 @max_files_per_collector_job.setter
293 def max_files_per_collector_job(self, value):
297 self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
300class CalibrationBase(ABC, Thread):
    Abstract base class of Calibration types. The CAF implements the :py:class:`Calibration` class which
    inherits from this and runs the C++ CalibrationCollectorModule and CalibrationAlgorithm classes.
    But by inheriting from this class and providing the minimal necessary methods/attributes you could
    plug in your own Calibration types that don't depend on the C++ CAF at all and run everything in
    your own way.

    .. warning:: Writing your own class inheriting from :py:class:`CalibrationBase` class is not recommended!
                 But it's there if you really need it.
310 name (str): Name of this calibration object. Should be unique if you are going to run it.
313 input_files (list[str]): Input files
for this calibration. May contain wildcard expressions usable by `glob.glob`.
317 end_state = "completed"
320 fail_state =
"failed"
322 def __init__(self, name, input_files=None):
329 self.future_dependencies = []
331 self.dependencies = []
338 self.files_to_iovs = {}
341 self.input_files = input_files
343 self.input_files = []
348 self.output_database_dir =
""
351 self.save_payloads =
True
353 self.jobs_to_submit = []
358 The most important method. Runs inside a new Thread and is called
from `CalibrationBase.start`
359 once the dependencies of this `CalibrationBase` have returned
with state == end_state i.e.
"completed".
365 A simple method you should implement that will return True or False depending on whether
366 the Calibration has been set up correctly
and can be run safely.
def depends_on(self, calibration):
    """
    Parameters:
        calibration (`CalibrationBase`): The Calibration object which will produce constants that
            this one depends on.

    Adds dependency of this calibration on another i.e. This calibration
    will not run until the dependency has completed, and the constants produced
    will be used via the database chain.

    You can define multiple dependencies for a single calibration simply
    by calling this multiple times. Be careful when adding the calibration into
    the `CAF` not to add a circular/cyclic dependency. If you do the sort will return an
    empty order and the `CAF` processing will fail.

    This function appends to the `CalibrationBase.dependencies` and
    `CalibrationBase.future_dependencies` attributes of this `CalibrationBase` and the input one
    respectively. This prevents us having to do too much recalculation later on.
    Calling it again with the same calibration leaves both lists unchanged.
    """
    # Two calibrations sharing a name are refused: warn and leave both objects untouched.
    if self.name == calibration.name:
        B2WARNING(f"Tried to add {calibration} as a dependency for {self} but they have the same name. "
                  "Dependency was not added.")
        return

    if calibration not in self.dependencies:
        self.dependencies.append(calibration)
    # The duplicate check must look at the same list we append to; checking
    # `calibration.dependencies` here would let repeated calls add `self` again.
    if self not in calibration.future_dependencies:
        calibration.future_dependencies.append(self)
def dependencies_met(self):
    """
    Checks if all of the Calibrations that this one depends on have reached a successful end state.

    Returns:
        bool: True when every entry of ``self.dependencies`` has ``state`` equal to its own
        ``end_state`` (also True when there are no dependencies), otherwise False.
    """
    return all(dependency.state == dependency.end_state for dependency in self.dependencies)
def failed_dependencies(self):
    """
    Returns the list of calibrations in our dependency list that have failed.

    A dependency counts as failed when its ``state`` equals this object's ``fail_state``.
    The order of ``self.dependencies`` is preserved in the result.
    """
    return [calibration for calibration in self.dependencies if calibration.state == self.fail_state]
def _apply_calibration_defaults(self, defaults):
    """
    We pass in default calibration options from the `CAF` instance here if called.
    Won't overwrite any options already set.

    Parameters:
        defaults (dict): Maps attribute names to the default value to apply. A value is only
            applied when the attribute currently exists and is None. Names that are not
            attributes of this object are reported with ``print`` and skipped.
    """
    for key, value in defaults.items():
        try:
            # getattr raises AttributeError for unsupported keys, which is reported below.
            if getattr(self, key) is None:
                setattr(self, key, value)
        except AttributeError:
            print(f"The calibration {self.name} does not support the attribute {key}.")
    Every Calibration object must have at least one collector and at least one algorithm.
428 You have the option to add in your collector/algorithm by argument here,
or add them
429 later by changing the properties.
431 If you plan to use multiple `Collection` objects I recommend that you only set the name here
and add the Collections
432 separately via `add_collection()`.
435 name (str): Name of this calibration. It should be unique
for use
in the `CAF`
437 collector (str, `basf2.Module`): Should be set to a CalibrationCollectorModule()
or a string
with the module name.
438 algorithms (list, ``ROOT.Belle2.CalibrationAlgorithm``): The algorithm(s) to use
for this `Calibration`.
439 input_files (str, list[str]): Input files
for use by this Calibration. May contain wildcards usable by `glob.glob`
441 A Calibration won
't be valid in the `CAF` until it has all of these four attributes set. For example:
444 >>> col1 = register_module(
'CaTest')
445 >>> cal.add_collection(
'TestColl', col1)
449 >>> cal =
Calibration(
'TestCalibration1',
'CaTest')
451 If you want to run a basf2 :py:
class:`path <basf2.Path>` before your collector module when running over data
453 >>> cal.pre_collector_path = my_basf2_path
455 You don
't have to put a RootInput module in this pre-collection path, but you can if
    you need some special parameters. If you want to process sroot files then you have to explicitly add
457 SeqRootInput to your pre-collection path.
458 The inputFileNames parameter of (Seq)RootInput will be set by the CAF automatically for you.
461 You can use optional arguments to
pass in some/all during initialisation of the `Calibration`
class
463 >>> cal =
Calibration(
'TestCalibration1',
'CaTest', [alg1,alg2], [
'/path/to/file.root'])
465 you can change the input file list later on, before running
with `CAF`
467 >>> cal.input_files = [
'path/to/*.root',
'other/path/to/file2.root']
469 If you have multiple collections
from calling `add_collection()` then you should instead set the pre_collector_path,
470 input_files, database chain etc
from there. See `Collection`.
472 Adding the CalibrationAlgorithm(s)
is easy
474 >>> alg1 = TestAlgo()
475 >>> cal.algorithms = alg1
479 >>> cal.algorithms = [alg1]
481 Or
for multiple algorithms
for one collector
483 >>> alg2 = TestAlgo()
484 >>> cal.algorithms = [alg1, alg2]
486 Note that when you set the algorithms, they are automatically wrapped
and stored
as a Python
class
    `Algorithm`. To access the C++ algorithm class underneath directly do:
489 >>> cal.algorithms[i].algorithm
491 If you have a setup function that you want to run before each of the algorithms, set that
with
493 >>> cal.pre_algorithms = my_function_object
495 If you want a different setup
for each algorithm use a list
with the same number of elements
496 as your algorithm list.
498 >>> cal.pre_algorithms = [my_function1, my_function2, ...]
500 You can also specify the dependencies of the calibration on others
502 >>> cal.depends_on(cal2)
504 By doing this, the `CAF` will respect the ordering of the calibrations
and will
pass the
505 calibration constants created by earlier completed calibrations to dependent ones.
508 moves = ["submit_collector",
"complete",
"run_algorithms",
"iterate",
"fail_fully"]
510 alg_output_dir =
"algorithm_output"
512 checkpoint_states = [
"init",
"collector_completed",
"completed"]
514 default_collection_name =
"default"
521 pre_collector_path=None,
523 output_patterns=None,
524 max_files_per_collector_job=None,
525 max_collector_jobs=None,
531 self.collections = {}
533 self._algorithms = []
537 self.add_collection(self.default_collection_name,
543 max_files_per_collector_job,
548 super().__init__(name, input_files)
553 self.algorithms = algorithms
558 self.max_iterations =
None
563 self.ignored_runs =
None
568 self.strategies = strategies.SingleIOV
572 self.database_chain = database_chain
574 self.database_chain = []
576 for tag
in reversed(b2conditions.default_globaltags):
577 self.use_central_database(tag, apply_to_default_collection=
False)
581 self.algorithms_runner = runners.SeqAlgorithmsRunner
589 self.collector_full_update_interval = 30
def add_collection(self, name, collection):
    """
    Parameters:
        name (str): Unique name of this `Collection` in the Calibration.
        collection (`Collection`): `Collection` object to use.

    Adds a new `Collection` object to the `Calibration`. Any valid Collection will be used in the
    Calibration. A default Collection is automatically added but isn't valid and won't run unless
    you have configured it.

    You can ignore the default one and only add your own custom Collections. You can configure the
    default from the Calibration(...) arguments or after creating the Calibration object via
    directly setting the cal.collector, cal.input_files etc.

    If a Collection with the same name already exists, the new one is NOT added and a warning is
    logged instead.
    """
    if name in self.collections:
        B2WARNING(f"A Collection with the name '{name}' already exists in this Calibration. It has not been added. "
                  "Please use another name.")
        return
    self.collections[name] = collection
618 A full calibration consists of a collector AND an associated algorithm AND input_files.
621 1) We are missing any of the above.
622 2) There are multiple Collections
and the Collectors have mis-matched granularities.
623 3) Any of our Collectors have granularities that don
't match what our Strategy can use. """
624 if not self.algorithms:
625 B2WARNING(f
"Empty algorithm list for {self.name}.")
628 if not any([collection.is_valid()
for collection
in self.collections.values()]):
629 B2WARNING(f
"No valid Collections for {self.name}.")
633 for collection
in self.collections.values():
634 if collection.is_valid():
635 collector_params = collection.collector.available_params()
636 for param
in collector_params:
637 if param.name ==
"granularity":
638 granularities.append(param.values)
639 if len(set(granularities)) > 1:
640 B2WARNING(
"Multiple different granularities set for the Collections in this Calibration.")
643 for alg
in self.algorithms:
644 alg_type = type(alg.algorithm).__name__
645 incorrect_gran = [granularity
not in alg.strategy.allowed_granularities
for granularity
in granularities]
646 if any(incorrect_gran):
647 B2WARNING(f
"Selected strategy for {alg_type} does not match a collector's granularity.")
def reset_database(self, apply_to_default_collection=True):
    """
    Keyword Arguments:
        apply_to_default_collection (bool): Should we also reset the default collection?

    Remove everything in the database_chain of this Calibration, including the default central
    database tag automatically included from
    `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`. This will NOT
    affect the database chain of any `Collection` other than the default one. You can prevent the
    default Collection from having its chain reset by setting 'apply_to_default_collection' to False.
    """
    self.database_chain = []
    # Only forward to the default collection when it exists and the caller did not opt out.
    if apply_to_default_collection and self.default_collection_name in self.collections:
        self.collections[self.default_collection_name].reset_database()
def use_central_database(self, global_tag, apply_to_default_collection=True):
    """
    Parameters:
        global_tag (str): The central database global tag to use for this calibration.

    Keyword Arguments:
        apply_to_default_collection (bool): Should we also call use_central_database on the default
            collection (if it exists)

    Using this allows you to append a central database to the database chain for this calibration.
    The default database chain is just the central one from
    `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
    To turn off central database completely or use a custom tag as the base, you should call
    `Calibration.reset_database` and start adding databases with `Calibration.use_local_database`
    and `Calibration.use_central_database`.

    Note that the database chain attached to the `Calibration` will only affect the default
    `Collection` (if it exists), and the algorithm processes. So calling:

    >> cal.use_central_database("global_tag")

    will modify the database chain used by all the algorithms assigned to this `Calibration`, and
    modifies the database chain of the default Collection, i.e.

    >> cal.collections['default'].database_chain

    But calling

    >> cal.use_central_database("global_tag", False)

    will add the database to the Algorithm processes, but leave the default Collection database
    chain untouched. So if you have multiple Collections in this Calibration *their database chains
    are separate*. To specify an additional `CentralDatabase` for a different collection, you will
    have to call:

    >> cal.collections['OtherCollection'].use_central_database("global_tag")
    """
    self.database_chain.append(CentralDatabase(global_tag))
    # Mirror the addition on the default collection unless it is absent or the caller opted out.
    if apply_to_default_collection and self.default_collection_name in self.collections:
        self.collections[self.default_collection_name].use_central_database(global_tag)
def use_local_database(self, filename, directory="", apply_to_default_collection=True):
    """
    Parameters:
        filename (str): The path to the database.txt of the local database

    Keyword Arguments:
        directory (str): The path to the payloads directory for this local database.
        apply_to_default_collection (bool): Should we also call use_local_database on the default
            collection (if it exists)

    Append a local database to the chain for this calibration.
    You can call this function multiple times and each database will be added to the chain IN ORDER.
    The databases are applied to this calibration ONLY.
    The Local and Central databases applied via these functions are applied to the algorithm
    processes and optionally the default `Collection` job as a database chain.
    There are other databases applied to the processes later, checked by basf2 in this order:

    1) Local Database from previous iteration of this Calibration.
    2) Local Database chain from output of previous dependent Calibrations.
    3) This chain of Local and Central databases where the last added is checked first.

    Note that this function on the `Calibration` object will only affect the default `Collection`
    if it exists and if 'apply_to_default_collection' remains True. So calling:

    >> cal.use_local_database(file_path, payload_dir)

    will modify the database chain used by all the algorithms assigned to this `Calibration`, and
    modifies the database chain of the default Collection, i.e.

    >> cal.collections['default'].database_chain

    But calling

    >> cal.use_local_database(file_path, payload_dir, False)

    will add the database to the Algorithm processes, but leave the default Collection database
    chain untouched.

    If you have multiple Collections in this Calibration *their database chains are separate*.
    To specify an additional `LocalDatabase` for a different collection, you will have to call:

    >> cal.collections['OtherCollection'].use_local_database(file_path, payload_dir)
    """
    self.database_chain.append(LocalDatabase(filename, directory))
    # Mirror the addition on the default collection unless it is absent or the caller opted out.
    if apply_to_default_collection and self.default_collection_name in self.collections:
        self.collections[self.default_collection_name].use_local_database(filename, directory)
def _get_default_collection_attribute(self, attr):
    """
    Read an attribute from the default collection of this Calibration.

    Parameters:
        attr (str): Name of the attribute to read from
            ``self.collections[self.default_collection_name]``.

    Returns:
        The attribute value, or None (after logging a warning) when the default collection
        is not present in ``self.collections``.
    """
    if self.default_collection_name in self.collections:
        return getattr(self.collections[self.default_collection_name], attr)
    B2WARNING(f"You tried to get the attribute '{attr}' from the Calibration '{self.name}', "
              "but the default collection doesn't exist. "
              f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
              "collection's attributes directly.")
    return None
def _set_default_collection_attribute(self, attr, value):
    """
    Set an attribute on the default collection of this Calibration.

    Parameters:
        attr (str): Name of the attribute to set on
            ``self.collections[self.default_collection_name]``.
        value: The value to assign.

    When the default collection is not present in ``self.collections`` nothing is set and a
    warning is logged instead.
    """
    if self.default_collection_name in self.collections:
        setattr(self.collections[self.default_collection_name], attr, value)
        return
    B2WARNING(f"You tried to set the attribute '{attr}' from the Calibration '{self.name}', "
              "but the default collection doesn't exist. "
              f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
              "collection's attributes directly.")
774 return self._get_default_collection_attribute(
"collector")
782 from basf2
import Module
783 if isinstance(collector, str):
784 from basf2
import register_module
785 collector = register_module(collector)
786 if not isinstance(collector, Module):
787 B2ERROR(
"Collector needs to be either a Module or the name of such a module")
789 self._set_default_collection_attribute(
"collector", collector)
792 def input_files(self):
795 return self._get_default_collection_attribute(
"input_files")
798 def input_files(self, files):
801 self._set_default_collection_attribute("input_files", files)
804 def files_to_iovs(self):
807 return self._get_default_collection_attribute(
"files_to_iovs")
809 @files_to_iovs.setter
810 def files_to_iovs(self, file_map):
813 self._set_default_collection_attribute("files_to_iovs", file_map)
816 def pre_collector_path(self):
819 return self._get_default_collection_attribute(
"pre_collector_path")
821 @pre_collector_path.setter
822 def pre_collector_path(self, path):
825 self._set_default_collection_attribute("pre_collector_path", path)
828 def output_patterns(self):
831 return self._get_default_collection_attribute(
"output_patterns")
833 @output_patterns.setter
834 def output_patterns(self, patterns):
837 self._set_default_collection_attribute("output_patterns", patterns)
840 def max_files_per_collector_job(self):
843 return self._get_default_collection_attribute(
"max_files_per_collector_job")
845 @max_files_per_collector_job.setter
846 def max_files_per_collector_job(self, max_files):
849 self._set_default_collection_attribute("max_files_per_collector_job", max_files)
852 def max_collector_jobs(self):
855 return self._get_default_collection_attribute(
"max_collector_jobs")
857 @max_collector_jobs.setter
858 def max_collector_jobs(self, max_jobs):
861 self._set_default_collection_attribute("max_collector_jobs", max_jobs)
864 def backend_args(self):
867 return self._get_default_collection_attribute(
"backend_args")
870 def backend_args(self, args):
873 self._set_default_collection_attribute("backend_args", args)
876 def algorithms(self):
879 return self._algorithms
883 def algorithms(self, value):
886 from ROOT
import Belle2
887 from ROOT.Belle2
import CalibrationAlgorithm
888 if isinstance(value, CalibrationAlgorithm):
889 self._algorithms = [Algorithm(value)]
891 B2ERROR(f
"Something other than CalibrationAlgorithm instance passed in ({type(value)}). "
892 "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
894 @algorithms.fset.register(tuple)
895 @algorithms.fset.register(list)
898 Alternate algorithms setter for lists
and tuples of CalibrationAlgorithms.
900 from ROOT
import Belle2
901 from ROOT.Belle2
import CalibrationAlgorithm
903 self._algorithms = []
905 if isinstance(alg, CalibrationAlgorithm):
906 self._algorithms.append(Algorithm(alg))
908 B2ERROR(f
"Something other than CalibrationAlgorithm instance passed in {type(value)}."
909 "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
912 def pre_algorithms(self):
914 Callback run prior to each algorithm iteration.
916 return [alg.pre_algorithm
for alg
in self.algorithms]
918 @pre_algorithms.setter
920 def pre_algorithms(self, func):
924 for alg
in self.algorithms:
925 alg.pre_algorithm = func
927 B2ERROR(
"Something evaluated as False passed in as pre_algorithm function.")
929 @pre_algorithms.fset.register(tuple)
930 @pre_algorithms.fset.register(list)
933 Alternate pre_algorithms setter for lists
and tuples of functions, should be one per algorithm.
936 if len(values) == len(self.algorithms):
937 for func, alg
in zip(values, self.algorithms):
938 alg.pre_algorithm = func
940 B2ERROR(
"Number of functions and number of algorithms doesn't match.")
942 B2ERROR(
"Empty container passed in for pre_algorithm functions")
947 The `caf.strategies.AlgorithmStrategy` or `list` of them used when running the algorithm(s).
949 return [alg.strategy
for alg
in self.algorithms]
957 for alg
in self.algorithms:
958 alg.strategy = strategy
960 B2ERROR(
"Something evaluated as False passed in as a strategy.")
@strategies.fset.register(tuple)
@strategies.fset.register(list)
def _(self, values):
    """
    Alternate strategies setter for lists and tuples of functions, should be one per algorithm.

    Parameters:
        values (list, tuple): One strategy per entry of ``self.algorithms``, in the same order.

    An empty container, or one whose length differs from the number of algorithms, is reported
    with B2ERROR and no algorithm is modified.
    """
    # NOTE(review): the `def` line and the outer emptiness check were not visible in the
    # reviewed text; they are reconstructed from the error messages below - confirm.
    if not values:
        B2ERROR("Empty container passed in for strategies list")
    elif len(values) != len(self.algorithms):
        B2ERROR("Number of strategies and number of algorithms doesn't match.")
    else:
        # Pair up the passed-in container (not the name `strategies`) with the algorithms.
        for strategy, alg in zip(values, self.algorithms):
            alg.strategy = strategy
984 Main logic of the Calibration object.
985 Will be run in a new Thread by calling the start() method.
987 with CAFDB(self._db_path, read_only=
True)
as db:
988 initial_state = db.get_calibration_value(self.name,
"checkpoint")
989 initial_iteration = db.get_calibration_value(self.name,
"iteration")
990 B2INFO(f
"Initial status of {self.name} found to be state={initial_state}, iteration={initial_iteration}")
991 self.machine = CalibrationMachine(self,
992 iov_to_calibrate=self.iov,
993 initial_state=initial_state,
994 iteration=initial_iteration)
995 self.state = initial_state
996 self.machine.root_dir = Path(os.getcwd(), self.name)
997 self.machine.collector_backend = self.backend
1001 all_iteration_paths = find_int_dirs(self.machine.root_dir)
1002 for iteration_path
in all_iteration_paths:
1003 if int(iteration_path.name) > initial_iteration:
1004 shutil.rmtree(iteration_path)
1006 while self.state != self.end_state
and self.state != self.fail_state:
1007 if self.state ==
"init":
1009 B2INFO(f
"Attempting collector submission for calibration {self.name}.")
1011 except Exception
as err:
1014 self._poll_collector()
1017 if self.state ==
"collector_failed":
1018 self.machine.fail_fully()
1025 B2INFO(f
"Attempting to run algorithms for calibration {self.name}.")
1026 self.machine.run_algorithms()
1027 except MachineError
as err:
1032 if self.machine.state ==
"algorithms_failed":
1033 self.machine.fail_fully()
1036 def _poll_collector(self):
1039 while self.state ==
"running_collector":
1041 self.machine.complete()
1043 except ConditionError:
1045 B2DEBUG(29, f
"Checking if collector jobs for calibration {self.name} have failed.")
1047 except ConditionError:
1049 sleep(self.heartbeat)
1054 The current major state of the calibration in the database file. The machine may have a different state.
1056 with CAFDB(self._db_path, read_only=
True)
as db:
1057 state = db.get_calibration_value(self.name,
"state")
def state(self, state):
    """
    Store the state of this calibration in the database file at ``self._db_path``.

    Parameters:
        state: The new state; it is written as ``str(state)``. When it is one of
            ``self.checkpoint_states`` the "checkpoint" value is updated as well.
    """
    B2DEBUG(29, f"Setting {self.name} to state {state}.")
    with CAFDB(self._db_path) as db:
        state_text = str(state)
        db.update_calibration_value(self.name, "state", state_text)
        if state in self.checkpoint_states:
            db.update_calibration_value(self.name, "checkpoint", state_text)
    B2DEBUG(29, f"{self.name} set to {state}.")
1072 def iteration(self):
1074 Retrieves the current iteration number in the database file.
1077 int: The current iteration number
1079 with CAFDB(self._db_path, read_only=
True)
as db:
1080 iteration = db.get_calibration_value(self.name,
"iteration")
def iteration(self, iteration):
    """
    Store the iteration number of this calibration in the database file at ``self._db_path``.

    Parameters:
        iteration: The value written to the "iteration" entry for ``self.name``.
    """
    B2DEBUG(29, f"Setting {self.name} to {iteration}.")
    with CAFDB(self._db_path) as db:
        db.update_calibration_value(self.name, "iteration", iteration)
    # The confirmation message deliberately reads `self.iteration`, not the argument.
    B2DEBUG(29, f"{self.name} set to {self.iteration}.")
1096 algorithm: The CalibrationAlgorithm instance that we want to execute.
1098 data_input : An optional function that sets the input files of the algorithm.
1099 pre_algorithm : An optional function that runs just prior to execution of the algorithm.
1100 Useful for set up e.g. module initialisation
1102 This
is a simple wrapper
class around the C++ CalibrationAlgorithm
class.
1103 It helps to add functionality to algorithms
for use by the Calibration
and CAF classes rather
1104 than separating the logic into those classes directly.
1106 This
is **
not** currently a
class that a user should interact with much during `CAF`
1107 setup (unless you
're doing something advanced).
    The `Calibration` class should do most of the creation of the defaults for these objects.
1110 Setting the `data_input` function might be necessary
if you have set the `Calibration.output_patterns`.
    Also, setting the `pre_algorithm` to a function that should execute prior to each `strategies.AlgorithmStrategy`
    is often useful, e.g. to call for the Geometry module to initialise.
def __init__(self, algorithm, data_input=None, pre_algorithm=None):
    """
    Wrap a CalibrationAlgorithm instance and set up its default hooks.

    Parameters:
        algorithm: The CalibrationAlgorithm instance that we want to execute.

    Keyword Arguments:
        data_input: Optional function that sets the input files of the algorithm.
            When nothing (or a falsy value) is given, `default_inputdata_setup` is used.
        pre_algorithm: Optional function stored unchanged in ``self.pre_algorithm``.
    """
    self.algorithm = algorithm
    # Use the unqualified C++ class name: everything after the last '::'.
    # rpartition also copes with a name that has no namespace at all, where
    # slicing from rfind('::') + 2 would cut off the first character.
    self.name = type(algorithm).__cpp_name__.rpartition('::')[2]
    self.data_input = data_input if data_input else self.default_inputdata_setup
    self.pre_algorithm = pre_algorithm
    # Default strategy for running the algorithm over the collected data.
    self.strategy = strategies.SingleIOV
def default_inputdata_setup(self, input_file_paths):
    """
    Default ``data_input`` hook: pass the collector output files to the algorithm.

    Only paths whose final component is exactly ``CollectorOutput.root`` are kept.
    The selection is logged and then set as the input file names of the wrapped
    CalibrationAlgorithm.

    Parameters:
        input_file_paths: Iterable of file paths to select from.
    """
    collector_output_files = [
        candidate for candidate in input_file_paths
        if Path(candidate).name == "CollectorOutput.root"
    ]
    # One header line followed by one line per selected file.
    B2INFO_MULTILINE([f"Input files used in {self.name}:", *collector_output_files])
    self.algorithm.setInputFileNames(collector_output_files)
1160 calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.
1162 >>> calibration_defaults={
"max_iterations":2}
1164 This
class holds `Calibration` objects
and processes them. It defines the initial configuration/setup
1165 for the calibrations. But most of the real processing
is done through the `caf.state_machines.CalibrationMachine`.
1167 The `CAF`
class essentially does some initial setup, holds the `CalibrationBase` instances
and calls the
1168 `CalibrationBase.start` when the dependencies are met.
    Much of the checking for consistency is done in this class so that no processing is done with an
    invalid setup. Choosing which files to use as input should be done from outside, during the setup
    of the `CAF` and `CalibrationBase` instances.
1175 _db_name = "caf_state.db"
1177 default_calibration_config = {
1178 "max_iterations": 5,
1182 def __init__(self, calibration_defaults=None):
1186 self.calibrations = {}
1189 self.future_dependencies = {}
1192 self.dependencies = {}
1194 self.output_dir = "calibration_results"
1198 self._backend =
None
1202 if not calibration_defaults:
1203 calibration_defaults = {}
1206 self.calibration_defaults = {**self.default_calibration_config, **calibration_defaults}
1208 self._db_path =
None
1210 def add_calibration(self, calibration):
1212 Adds a `Calibration` that is to be used
in this program to the list.
1213 Also adds an empty dependency list to the overall dictionary.
1214 You should
not directly alter a `Calibration` object after it has been
1217 if calibration.is_valid():
1218 if calibration.name
not in self.calibrations:
1219 self.calibrations[calibration.name] = calibration
1221 B2WARNING(f
"Tried to add a calibration with the name {calibration.name} twice.")
1223 B2WARNING(f
"Tried to add incomplete/invalid calibration ({calibration.name}) to the framework."
1224 "It was not added and will not be part of the final process.")
1226 def _remove_missing_dependencies(self):
1228 This checks the future and past dependencies of each `Calibration`
in the `CAF`.
1229 If any dependencies are
not known to the `CAF` then they are removed
from the `Calibration`
1232 calibration_names = [calibration.name for calibration
in self.calibrations.values()]
1234 def is_dependency_in_caf(dependency):
1236 Quick function to use with filter()
and check dependencies against calibrations known to `CAF`
1238 dependency_in_caf = dependency.name in calibration_names
1239 if not dependency_in_caf:
1240 B2WARNING(f
"The calibration {dependency.name} is a required dependency but is not in the CAF."
1241 " It has been removed as a dependency.")
1242 return dependency_in_caf
1246 for calibration
in self.calibrations.values():
1247 filtered_future_dependencies = list(filter(is_dependency_in_caf, calibration.future_dependencies))
1248 calibration.future_dependencies = filtered_future_dependencies
1250 filtered_dependencies = list(filter(is_dependency_in_caf, calibration.dependencies))
1251 calibration.dependencies = filtered_dependencies
def _order_calibrations(self):
    """
    Work out a valid processing order for the calibrations.

    - Uses the dependency attributes of the calibrations to fill
      ``self.future_dependencies`` and ``self.dependencies`` (by name) and passes
      the future dependencies to a sorting algorithm.
    - Extends every calibration's ``dependencies`` with the ones returned by
      `past_from_future_dependencies` and re-sorts them to follow the sort order.

    Returns:
        The result of `all_dependencies` for the sort order if the sort was
        successful. If the sort produced nothing (most likely a cyclic
        dependency) that falsy result is returned as-is.
    """
    # Dependencies on calibrations unknown to the CAF cannot be ordered.
    self._remove_missing_dependencies()

    # Only the explicit dependencies for now, stored by name rather than by object.
    for calibration in self.calibrations.values():
        self.future_dependencies[calibration.name] = [
            dependency.name for dependency in calibration.future_dependencies
        ]
        self.dependencies[calibration.name] = [
            dependency.name for dependency in calibration.dependencies
        ]

    order = topological_sort(self.future_dependencies)
    if not order:
        # NOTE(review): presumably an empty result means a cyclic dependency —
        # confirm against caf.utils.topological_sort.
        return order

    ordered_full_dependencies = all_dependencies(self.future_dependencies, order)
    full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)

    for calibration in self.calibrations.values():
        # A set of names, built once per calibration, gives cheap membership tests.
        dependency_names = {dep.name for dep in calibration.dependencies}
        for dep_name in full_past_dependencies[calibration.name]:
            if dep_name not in dependency_names:
                calibration.dependencies.append(self.calibrations[dep_name])
                dependency_names.add(dep_name)
        # The list is complete now but not sorted; rebuild it following `order`.
        calibration.dependencies = [
            self.calibrations[name] for name in order if name in dependency_names
        ]

    return ordered_full_dependencies
def _check_backend(self):
    """
    Make sure this `CAF` holds a usable backend.

    When the stored ``_backend`` is not a `caf.backends.Backend` instance
    (for example because none was set), a default `caf.backends.Local` backend
    is assigned through the ``backend`` attribute.
    """
    if isinstance(self._backend, caf.backends.Backend):
        return
    self.backend = caf.backends.Local()
def _prune_invalid_collections(self):
    """
    Remove invalid Collections from every calibration's ``collections``.

    Each calibration gets a new dictionary holding only the entries whose
    ``is_valid()`` returns true; a warning is emitted for every entry left out.
    """
    B2INFO("Checking for any invalid Collections in Calibrations.")
    for calibration in self.calibrations.values():
        kept = {}
        for name, collection in calibration.collections.items():
            if not collection.is_valid():
                B2WARNING(f"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
                continue
            kept[name] = collection
        calibration.collections = kept
1319 def run(self, iov=None):
1322 iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
1323 this IoV will be used
in the collection step.
1325 This function runs the overall calibration job, saves the outputs to the output_dir directory,
1326 and creates database payloads.
1328 Upload of final databases
is not done here. This simply creates the local databases
in
1329 the output directory. You should check the validity of your new local database before uploading
1330 to the conditions DB via the basf2 tools/interface to the DB.
1332 if not self.calibrations:
1333 B2FATAL(
"There were no Calibration objects to run. Maybe you tried to add invalid ones?")
1335 order = self._order_calibrations()
1337 B2FATAL(
"Couldn't order the calibrations properly. Could be a cyclic dependency.")
1340 self._check_backend()
1342 self._prune_invalid_collections()
1345 self.output_dir = self._make_output_dir()
1348 self._make_database()
1351 with temporary_workdir(self.output_dir):
1352 db = CAFDB(self._db_path)
1354 db_initial_calibrations = db.query(
"select * from calibrations").fetchall()
1355 for calibration
in self.calibrations.values():
1357 calibration._apply_calibration_defaults(self.calibration_defaults)
1358 calibration._db_path = self._db_path
1359 calibration.output_database_dir = Path(self.output_dir, calibration.name,
"outputdb").as_posix()
1360 calibration.iov = iov
1361 if not calibration.backend:
1362 calibration.backend = self.backend
1364 if calibration.name
not in [db_cal[0]
for db_cal
in db_initial_calibrations]:
1365 db.insert_calibration(calibration.name)
1368 for cal_info
in db_initial_calibrations:
1369 if cal_info[0] == calibration.name:
1370 cal_initial_state = cal_info[2]
1371 cal_initial_iteration = cal_info[3]
1372 B2INFO(f
"Previous entry in database found for {calibration.name}.")
1373 B2INFO(f
"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
1374 calibration.state = cal_initial_state
1375 B2INFO(f
"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
1376 calibration.iteration = cal_initial_iteration
1378 calibration.daemon =
True
1385 keep_running =
False
1387 remaining_calibrations = []
1389 for calibration
in self.calibrations.values():
1391 if (calibration.state == CalibrationBase.end_state
or calibration.state == CalibrationBase.fail_state):
1393 if calibration.is_alive():
1394 B2DEBUG(29, f
"Joining {calibration.name}.")
1397 if calibration.dependencies_met():
1398 if not calibration.is_alive():
1399 B2DEBUG(29, f
"Starting {calibration.name}.")
1402 except RuntimeError:
1405 B2DEBUG(29, f
"{calibration.name} probably just finished, join it later.")
1406 remaining_calibrations.append(calibration)
1408 if not calibration.failed_dependencies():
1409 remaining_calibrations.append(calibration)
1410 if remaining_calibrations:
1415 for calibration
in remaining_calibrations:
1416 for job
in calibration.jobs_to_submit[:]:
1417 calibration.backend.submit(job)
1418 calibration.jobs_to_submit.remove(job)
1419 sleep(self.heartbeat)
1421 B2INFO(
"Printing summary of final CAF status.")
1422 with CAFDB(self._db_path, read_only=
True)
as db:
1423 print(db.output_calibration_table())
1428 The `backend <backends.Backend>` that runs the collector job.
1429 When set, this is checked that a `backends.Backend`
class instance was passed in.
1431 return self._backend
def backend(self, backend):
    """
    Store the backend, provided it is a `caf.backends.Backend` instance.

    Anything else is rejected with an error message and the currently stored
    backend is left untouched.
    """
    if not isinstance(backend, caf.backends.Backend):
        B2ERROR('Backend property must inherit from Backend class.')
        return
    self._backend = backend
1442 def _make_output_dir(self):
1444 Creates the output directory. If it already exists we are now going to try and restart the program
from the last state.
1447 str: The absolute path of the new output_dir
1449 p = Path(self.output_dir).resolve()
1451 B2INFO(f
"{p.as_posix()} output directory already exists. "
1452 "We will try to restart from the previous finishing state.")
1455 p.mkdir(parents=
True)
1459 raise FileNotFoundError(f
"Attempted to create output_dir {p.as_posix()}, but it didn't work.")
1461 def _make_database(self):
1463 Creates the CAF status database. If it already exists we don't overwrite it. """
1464 self._db_path = Path(self.output_dir, self._db_name).absolute()
1465 if self._db_path.exists():
1466 B2INFO(f
"Previous CAF database found {self._db_path}")
1468 with CAFDB(self._db_path):