15This module implements several objects/functions to configure and run calibrations.
16These classes are used to construct the workflow of the calibration job.
17The actual processing code is mostly in the `caf.state_machines` module.
20__all__ = [
"CalibrationBase",
"Calibration",
"Algorithm",
"CAF"]
23from threading
import Thread
25from pathlib
import Path
29from basf2
import B2ERROR, B2WARNING, B2INFO, B2FATAL, B2DEBUG
30from basf2
import find_file
31from basf2
import conditions
as b2conditions
33from abc
import ABC, abstractmethod
36from caf.utils
import B2INFO_MULTILINE
37from caf.utils
import past_from_future_dependencies
38from caf.utils
import topological_sort
39from caf.utils
import all_dependencies
40from caf.utils
import method_dispatch
41from caf.utils
import temporary_workdir
42from caf.utils
import find_int_dirs
43from caf.utils
import LocalDatabase
44from caf.utils
import CentralDatabase
45from caf.utils
import parse_file_uri
47import caf.strategies
as strategies
48import caf.runners
as runners
49from caf.backends
import MaxSubjobsSplitter, MaxFilesSplitter
50from caf.state_machines
import CalibrationMachine, ConditionError, MachineError
51from caf.database
import CAFDB
57 collector (str, basf2.Module): The collector module or module name
for this `Collection`.
58 input_files (list[str]): The input files to be used
for only this `Collection`.
59 pre_collector_path (basf2.Path): The reconstruction `basf2.Path` to be run prior to the Collector module.
60 database_chain (list[CentralDatabase, LocalDatabase]): The database chain to be used initially
for this `Collection`.
61 output_patterns (list[str]): Output patterns of files produced by collector which will be used to
pass to the
62 `Algorithm.data_input` function. Setting this here, replaces the default completely.
63 max_files_per_collector_job (int): Maximum number of input files sent to each collector subjob
for this `Collection`.
64 Technically this sets the SubjobSplitter to be used,
not compatible
with max_collector_jobs.
65 max_collector_jobs (int): Maximum number of collector subjobs
for this `Collection`.
66 Input files are split evenly between them. Technically this sets the SubjobSplitter to be used. Not compatible
with
67 max_files_per_collector_job.
68 backend_args (dict): The args
for the backend submission of this `Collection`.
72 default_max_collector_jobs = 1000
75 job_config = "collector_job.json"
80 pre_collector_path=None,
83 max_files_per_collector_job=None,
84 max_collector_jobs=None,
88 self.collector = collector
92 self.input_files = input_files
99 self.files_to_iovs = {}
104 self.pre_collector_path =
None
105 if pre_collector_path:
106 self.pre_collector_path = pre_collector_path
111 self.output_patterns = [
"CollectorOutput.root"]
113 self.output_patterns = output_patterns
118 if max_files_per_collector_job
and max_collector_jobs:
119 B2FATAL(
"Cannot set both 'max_files_per_collector_job' and 'max_collector_jobs' of a collection!")
120 elif max_files_per_collector_job:
121 self.max_files_per_collector_job = max_files_per_collector_job
122 elif max_collector_jobs:
123 self.max_collector_jobs = max_collector_jobs
125 self.max_collector_jobs = self.default_max_collector_jobs
129 self.backend_args = {}
131 self.backend_args = backend_args
137 self.database_chain = database_chain
139 self.database_chain = []
143 for tag
in reversed(b2conditions.default_globaltags):
144 self.use_central_database(tag)
146 self.job_script = Path(find_file(
"calibration/scripts/caf/run_collector_path.py")).absolute()
147 """The basf2 steering file that will be used for Collector jobs run by this collection.
148This script will be copied into subjob directories as part of the input sandbox."""
151 self.job_cmd = [
"basf2", self.job_script.name,
"--job-information job_info.json"]
def reset_database(self):
    """
    Empty the ``database_chain`` of this object.

    Everything is removed, including the default central database tags that are
    automatically included from
    `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
    """
    # Rebind to a brand new list rather than clearing the old one in place.
    self.database_chain = list()
def use_central_database(self, global_tag):
    """
    Parameters:
        global_tag (str): The central database global tag to use for this collection.

    Wraps ``global_tag`` in a `CentralDatabase` and appends it to ``database_chain``.

    The default database chain is just the central one from
    `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
    The input file global tag will always be overridden and never used unless explicitly set.

    To turn off central database completely or use a custom tag as the base, you should call
    `Calibration.reset_database` and start adding databases with `Calibration.use_local_database`
    and `Calibration.use_central_database`.

    Alternatively you could set an empty list as the input database_chain when adding the
    Collection to the Calibration.

    NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
    All local database files MUST now be at the head of the 'chain', with all central database
    global tags in their own list which will be checked after all local database files have
    been checked.

    So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the
    database chain, the real order that basf2 will use them is
    ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is checked first.
    """
    self.database_chain.append(CentralDatabase(global_tag))
def use_local_database(self, filename, directory=""):
    """
    Parameters:
        filename (str): The path to the database.txt of the local database
        directory (str): The path to the payloads directory for this local database.

    Wraps the arguments in a `LocalDatabase` and appends it to ``database_chain``.
    You can call this function multiple times and each database will be added to the
    chain IN ORDER. The databases are applied to this collection ONLY.

    NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
    All local database files MUST now be at the head of the 'chain', with all central database
    global tags in their own list which will be checked after all local database files have
    been checked.

    So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the
    database chain, the real order that basf2 will use them is
    ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.
    """
    self.database_chain.append(LocalDatabase(filename, directory))
206 def uri_list_from_input_file(input_file):
209 input_file (str): A local file/glob pattern or XROOTD URI
212 list: A list of the URIs found
from the initial string.
215 uri = parse_file_uri(input_file)
216 if uri.scheme ==
"file":
219 uris = [parse_file_uri(f).geturl()
for f
in glob(input_file)]
def input_files(self):
    """Return the input files currently stored in ``_input_files``."""
    stored_files = self._input_files
    return stored_files
230 def input_files(self, value):
231 if isinstance(value, str):
233 self._input_files = self.uri_list_from_input_file(value)
234 elif isinstance(value, list):
237 for pattern
in value:
238 total_files.extend(self.uri_list_from_input_file(pattern))
239 self._input_files = total_files
241 raise TypeError(
"Input files must be a list or string")
247 return self._collector
256 from basf2
import Module
257 if isinstance(collector, str):
258 from basf2
import register_module
259 collector = register_module(collector)
260 if not isinstance(collector, Module):
261 B2ERROR(
"Collector needs to be either a Module or the name of such a module")
263 self._collector = collector
266 if (
not self.collector
or not self.input_files):
272 def max_collector_jobs(self):
274 return self.splitter.max_subjobs
278 @max_collector_jobs.setter
279 def max_collector_jobs(self, value):
283 self.splitter = MaxSubjobsSplitter(max_subjobs=value)
286 def max_files_per_collector_job(self):
288 return self.splitter.max_files_per_subjob
292 @max_files_per_collector_job.setter
293 def max_files_per_collector_job(self, value):
297 self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
300class CalibrationBase(ABC, Thread):
302 Abstract base class of
Calibration types. The CAF implements the :py:
class:`Calibration`
class which inherits from this
and runs the C++ CalibrationCollectorModule
and CalibrationAlgorithm classes. But by inheriting
from this
303 class and providing the minimal necessary methods/attributes you could plug
in your own Calibration types
304 that doesn
't depend on the C++ CAF at all and run everything in your own way.
306 .. warning:: Writing your own class inheriting from :py:
class:`CalibrationBase`
class is not recommended!
307 But it
's there if you really need it.
310 name (str): Name of this calibration object. Should be unique if you are going to run it.
313 input_files (list[str]): Input files
for this calibration. May contain wildcard expressions useable by `glob.glob`.
317 end_state = "completed"
320 fail_state =
"failed"
322 def __init__(self, name, input_files=None):
329 self.future_dependencies = []
331 self.dependencies = []
338 self.files_to_iovs = {}
341 self.input_files = input_files
343 self.input_files = []
348 self.output_database_dir =
""
351 self.save_payloads =
True
353 self.jobs_to_submit = []
358 The most important method. Runs inside a new Thread and is called
from `CalibrationBase.start`
359 once the dependencies of this `CalibrationBase` have returned
with state == end_state i.e.
"completed".
365 A simple method you should implement that will return True or False depending on whether
366 the Calibration has been set up correctly
and can be run safely.
def depends_on(self, calibration):
    """
    Parameters:
        calibration (`CalibrationBase`): The Calibration object which will produce constants
            that this one depends on.

    Adds dependency of this calibration on another i.e. This calibration
    will not run until the dependency has completed, and the constants produced
    will be used via the database chain.

    You can define multiple dependencies for a single calibration simply
    by calling this multiple times. Be careful when adding the calibration into
    the `CAF` not to add a circular/cyclic dependency. If you do the sort will return an
    empty order and the `CAF` processing will fail.

    This function appends to the `CalibrationBase.dependencies` and
    `CalibrationBase.future_dependencies` attributes of this `CalibrationBase` and the input
    one respectively. This prevents us having to do too much recalculation later on.
    Calling it repeatedly with the same calibration leaves both lists unchanged.
    """
    if self.name == calibration.name:
        B2WARNING(f"Tried to add {calibration} as a dependency for {self} but they have the same name. "
                  "Dependency was not added.")
        return

    # Only add each link once, checking the very list that is about to be appended to.
    if calibration not in self.dependencies:
        self.dependencies.append(calibration)
    if self not in calibration.future_dependencies:
        calibration.future_dependencies.append(self)
def dependencies_met(self):
    """
    Checks if all of the Calibrations that this one depends on have reached a successful end state.

    Returns:
        bool: True when every entry of ``self.dependencies`` has ``state`` equal to its own
        ``end_state`` (also True when there are no dependencies).
    """
    return all(dependency.state == dependency.end_state for dependency in self.dependencies)
def failed_dependencies(self):
    """
    Returns the list of calibrations in our dependency list that have failed.

    Returns:
        list: every entry of ``self.dependencies`` whose ``state`` equals this object's
        ``fail_state``, in their original order.
    """
    return [calibration for calibration in self.dependencies if calibration.state == self.fail_state]
def _apply_calibration_defaults(self, defaults):
    """
    We pass in default calibration options from the `CAF` instance here if called.
    Won't overwrite any options already set.

    Parameters:
        defaults (dict): Maps attribute names to the value each one should receive when the
            attribute currently holds None.
    """
    for option, default_value in defaults.items():
        try:
            still_unset = getattr(self, option) is None
            if still_unset:
                setattr(self, option, default_value)
        except AttributeError:
            # The attribute cannot be read (or written) on this object: report and carry on.
            print(f"The calibration {self.name} does not support the attribute {option}.")
427 Every Calibration object must have at least one collector and at least one algorithm.
428 You have the option to add in your collector/algorithm by argument here,
or add them
429 later by changing the properties.
431 If you plan to use multiple `Collection` objects I recommend that you only set the name here
and add the Collections
432 separately via `add_collection()`.
435 name (str): Name of this calibration. It should be unique
for use
in the `CAF`
437 collector (str, `basf2.Module`): Should be set to a CalibrationCollectorModule()
or a string
with the module name.
438 algorithms (list, ``ROOT.Belle2.CalibrationAlgorithm``): The algorithm(s) to use
for this `Calibration`.
439 input_files (str, list[str]): Input files
for use by this Calibration. May contain wildcards useable by `glob.glob`
441 A Calibration won
't be valid in the `CAF` until it has all of these four attributes set. For example:
444 >>> col1 = register_module(
'CaTest')
445 >>> cal.add_collection(
'TestColl', col1)
449 >>> cal =
Calibration(
'TestCalibration1',
'CaTest')
451 If you want to run a basf2 :py:
class:`path <basf2.Path>` before your collector module when running over data
453 >>> cal.pre_collector_path = my_basf2_path
455 You don
't have to put a RootInput module in this pre-collection path, but you can if
456 you need some special parameters. If you want to process sroot files then you have to explicitly add
457 SeqRootInput to your pre-collection path.
458 The inputFileNames parameter of (Seq)RootInput will be set by the CAF automatically for you.
461 You can use optional arguments to
pass in some/all during initialisation of the `Calibration`
class
463 >>> cal =
Calibration(
'TestCalibration1',
'CaTest', [alg1,alg2], [
'/path/to/file.root'])
465 you can change the input file list later on, before running
with `CAF`
467 >>> cal.input_files = [
'path/to/*.root',
'other/path/to/file2.root']
469 If you have multiple collections
from calling `add_collection()` then you should instead set the pre_collector_path,
470 input_files, database chain etc
from there. See `Collection`.
472 Adding the CalibrationAlgorithm(s)
is easy
474 >>> alg1 = TestAlgo()
475 >>> cal.algorithms = alg1
479 >>> cal.algorithms = [alg1]
481 Or
for multiple algorithms
for one collector
483 >>> alg2 = TestAlgo()
484 >>> cal.algorithms = [alg1, alg2]
486 Note that when you set the algorithms, they are automatically wrapped
and stored
as a Python
class
487 `Algorithm`. To access the C++ algorithm class underneath directly do:
489 >>> cal.algorithms[i].algorithm
491 If you have a setup function that you want to run before each of the algorithms, set that
with
493 >>> cal.pre_algorithms = my_function_object
495 If you want a different setup
for each algorithm use a list
with the same number of elements
496 as your algorithm list.
498 >>> cal.pre_algorithms = [my_function1, my_function2, ...]
500 You can also specify the dependencies of the calibration on others
502 >>> cal.depends_on(cal2)
504 By doing this, the `CAF` will respect the ordering of the calibrations
and will
pass the
505 calibration constants created by earlier completed calibrations to dependent ones.
508 moves = ["submit_collector",
"complete",
"run_algorithms",
"iterate",
"fail_fully"]
510 alg_output_dir =
"algorithm_output"
512 checkpoint_states = [
"init",
"collector_completed",
"completed"]
514 default_collection_name =
"default"
521 pre_collector_path=None,
523 output_patterns=None,
524 max_files_per_collector_job=None,
525 max_collector_jobs=None,
531 self.collections = {}
533 self._algorithms = []
537 self.add_collection(self.default_collection_name,
543 max_files_per_collector_job,
548 super().__init__(name, input_files)
553 self.algorithms = algorithms
558 self.max_iterations =
None
563 self.ignored_runs =
None
568 self.strategies = strategies.SingleIOV
572 self.database_chain = database_chain
574 self.database_chain = []
576 for tag
in reversed(b2conditions.default_globaltags):
577 self.use_central_database(tag, apply_to_default_collection=
False)
581 self.algorithms_runner = runners.SeqAlgorithmsRunner
589 self.collector_full_update_interval = 30
def add_collection(self, name, collection):
    """
    Parameters:
        name (str): Unique name of this `Collection` in the Calibration.
        collection (`Collection`): `Collection` object to use.

    Adds a new `Collection` object to the `Calibration`. Any valid Collection will be used in
    the Calibration. A default Collection is automatically added but isn't valid and won't run
    unless you have assigned a collector.

    You can ignore the default one and only add your own custom Collections. You can configure
    the default from the Calibration(...) arguments or after creating the Calibration object via
    directly setting the cal.collector, cal.input_files.

    If ``name`` is already present in ``self.collections`` the existing entry is kept and a
    warning is issued instead.
    """
    if name in self.collections:
        B2WARNING(f"A Collection with the name '{name}' already exists in this Calibration. It has not been added. "
                  "Please use another name.")
        return
    self.collections[name] = collection
618 A full calibration consists of a collector AND an associated algorithm AND input_files.
621 1) We are missing any of the above.
622 2) There are multiple Collections
and the Collectors have mis-matched granularities.
623 3) Any of our Collectors have granularities that don
't match what our Strategy can use. """
624 if not self.algorithms:
625 B2WARNING(f
"Empty algorithm list for {self.name}.")
628 if not any([collection.is_valid()
for collection
in self.collections.values()]):
629 B2WARNING(f
"No valid Collections for {self.name}.")
633 for collection
in self.collections.values():
634 if collection.is_valid():
635 collector_params = collection.collector.available_params()
636 for param
in collector_params:
637 if param.name ==
"granularity":
638 granularities.append(param.values)
639 if len(set(granularities)) > 1:
640 B2WARNING(
"Multiple different granularities set for the Collections in this Calibration.")
643 for alg
in self.algorithms:
644 alg_type = type(alg.algorithm).__name__
645 incorrect_gran = [granularity
not in alg.strategy.allowed_granularities
for granularity
in granularities]
646 if any(incorrect_gran):
647 B2WARNING(f
"Selected strategy for {alg_type} does not match a collector's granularity.")
def reset_database(self, apply_to_default_collection=True):
    """
    Keyword Arguments:
        apply_to_default_collection (bool): Should we also reset the default collection?

    Remove everything in the database_chain of this Calibration, including the default central
    database tag automatically included from
    `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
    This will NOT affect the database chain of any `Collection` other than the default one.
    You can prevent the default Collection from having its chain reset by setting
    'apply_to_default_collection' to False.
    """
    self.database_chain = list()
    default_name = self.default_collection_name
    if default_name in self.collections and apply_to_default_collection:
        default_collection = self.collections[default_name]
        default_collection.reset_database()
def use_central_database(self, global_tag, apply_to_default_collection=True):
    """
    Parameters:
        global_tag (str): The central database global tag to use for this calibration.

    Keyword Arguments:
        apply_to_default_collection (bool): Should we also call use_central_database on the
            default collection (if it exists)

    Using this allows you to append a central database to the database chain for this
    calibration. The default database chain is just the central one from
    `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
    To turn off central database completely or use a custom tag as the base, you should call
    `Calibration.reset_database` and start adding databases with `Calibration.use_local_database`
    and `Calibration.use_central_database`.

    Note that the database chain attached to the `Calibration` will only affect the default
    `Collection` (if it exists), and the algorithm processes. So calling:

    >> cal.use_central_database("global_tag")

    will modify the database chain used by all the algorithms assigned to this `Calibration`,
    and modifies the database chain assigned to

    >> cal.collections['default'].database_chain

    But calling

    >> cal.use_central_database("global_tag", False)

    will add the database to the Algorithm processes, but leave the default Collection database
    chain untouched. So if you have multiple Collections in this Calibration *their database
    chains are separate*. To specify an additional `CentralDatabase` for a different collection,
    you will have to call:

    >> cal.collections['OtherCollection'].use_central_database("global_tag")
    """
    self.database_chain.append(CentralDatabase(global_tag))
    default_name = self.default_collection_name
    if default_name in self.collections and apply_to_default_collection:
        default_collection = self.collections[default_name]
        default_collection.use_central_database(global_tag)
def use_local_database(self, filename, directory="", apply_to_default_collection=True):
    """
    Parameters:
        filename (str): The path to the database.txt of the local database

    Keyword Arguments:
        directory (str): The path to the payloads directory for this local database.
        apply_to_default_collection (bool): Should we also call use_local_database on the
            default collection (if it exists)

    Append a local database to the chain for this calibration.
    You can call this function multiple times and each database will be added to the chain
    IN ORDER. The databases are applied to this calibration ONLY.
    The Local and Central databases applied via these functions are applied to the algorithm
    processes and optionally the default `Collection` job as a database chain.
    There are other databases applied to the processes later, checked by basf2 in this order:

    1) Local Database from previous iteration of this Calibration.
    2) Local Database chain from output of previous dependent Calibrations.
    3) This chain of Local and Central databases where the last added is checked first.

    Note that this function on the `Calibration` object will only affect the default
    `Collection` if it exists and if 'apply_to_default_collection' remains True. So calling:

    >> cal.use_local_database(file_path, payload_dir)

    will modify the database chain used by all the algorithms assigned to this `Calibration`,
    and modifies the database chain assigned to

    >> cal.collections['default'].database_chain

    But calling

    >> cal.use_local_database(file_path, payload_dir, False)

    will add the database to the Algorithm processes, but leave the default Collection database
    chain untouched.

    If you have multiple Collections in this Calibration *their database chains are separate*.
    To specify an additional `LocalDatabase` for a different collection, you will have to call:

    >> cal.collections['OtherCollection'].use_local_database(file_path, payload_dir)
    """
    self.database_chain.append(LocalDatabase(filename, directory))
    default_name = self.default_collection_name
    if default_name in self.collections and apply_to_default_collection:
        default_collection = self.collections[default_name]
        default_collection.use_local_database(filename, directory)
def _get_default_collection_attribute(self, attr):
    """
    Read an attribute from the default collection of this calibration.

    Parameters:
        attr (str): Name of the attribute to read.

    Returns:
        The value of ``attr`` on ``self.collections[self.default_collection_name]``. When no
        collection is registered under the default name a warning is issued and None is returned.
    """
    if self.default_collection_name not in self.collections:
        B2WARNING(f"You tried to get the attribute '{attr}' from the Calibration '{self.name}', "
                  "but the default collection doesn't exist. "
                  f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
                  "collection's attributes directly.")
        return None
    return getattr(self.collections[self.default_collection_name], attr)
def _set_default_collection_attribute(self, attr, value):
    """
    Assign an attribute on the default collection of this calibration.

    Parameters:
        attr (str): Name of the attribute to assign.
        value: The value to store.

    When no collection is registered under ``self.default_collection_name`` nothing is
    assigned and a warning is issued instead.
    """
    if self.default_collection_name not in self.collections:
        B2WARNING(f"You tried to set the attribute '{attr}' from the Calibration '{self.name}', "
                  "but the default collection doesn't exist. "
                  f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
                  "collection's attributes directly.")
        return
    setattr(self.collections[self.default_collection_name], attr, value)
774 return self._get_default_collection_attribute(
"collector")
782 from basf2
import Module
783 if isinstance(collector, str):
784 from basf2
import register_module
785 collector = register_module(collector)
786 if not isinstance(collector, Module):
787 B2ERROR(
"Collector needs to be either a Module or the name of such a module")
789 self._set_default_collection_attribute(
"collector", collector)
def input_files(self):
    """Return the ``input_files`` attribute looked up through ``_get_default_collection_attribute``."""
    attribute_name = "input_files"
    return self._get_default_collection_attribute(attribute_name)
def input_files(self, files):
    """Store ``files`` as ``input_files`` through ``_set_default_collection_attribute``."""
    self._set_default_collection_attribute(attr="input_files", value=files)
def files_to_iovs(self):
    """Return the ``files_to_iovs`` attribute looked up through ``_get_default_collection_attribute``."""
    attribute_name = "files_to_iovs"
    return self._get_default_collection_attribute(attribute_name)
@files_to_iovs.setter
def files_to_iovs(self, file_map):
    """Store ``file_map`` as ``files_to_iovs`` through ``_set_default_collection_attribute``."""
    self._set_default_collection_attribute(attr="files_to_iovs", value=file_map)
def pre_collector_path(self):
    """Return the ``pre_collector_path`` attribute looked up through ``_get_default_collection_attribute``."""
    attribute_name = "pre_collector_path"
    return self._get_default_collection_attribute(attribute_name)
@pre_collector_path.setter
def pre_collector_path(self, path):
    """Store ``path`` as ``pre_collector_path`` through ``_set_default_collection_attribute``."""
    self._set_default_collection_attribute(attr="pre_collector_path", value=path)
def output_patterns(self):
    """Return the ``output_patterns`` attribute looked up through ``_get_default_collection_attribute``."""
    attribute_name = "output_patterns"
    return self._get_default_collection_attribute(attribute_name)
@output_patterns.setter
def output_patterns(self, patterns):
    """Store ``patterns`` as ``output_patterns`` through ``_set_default_collection_attribute``."""
    self._set_default_collection_attribute(attr="output_patterns", value=patterns)
def max_files_per_collector_job(self):
    """Return ``max_files_per_collector_job`` looked up through ``_get_default_collection_attribute``."""
    attribute_name = "max_files_per_collector_job"
    return self._get_default_collection_attribute(attribute_name)
@max_files_per_collector_job.setter
def max_files_per_collector_job(self, max_files):
    """Store ``max_files`` as ``max_files_per_collector_job`` through ``_set_default_collection_attribute``."""
    self._set_default_collection_attribute(attr="max_files_per_collector_job", value=max_files)
def max_collector_jobs(self):
    """Return ``max_collector_jobs`` looked up through ``_get_default_collection_attribute``."""
    attribute_name = "max_collector_jobs"
    return self._get_default_collection_attribute(attribute_name)
@max_collector_jobs.setter
def max_collector_jobs(self, max_jobs):
    """Store ``max_jobs`` as ``max_collector_jobs`` through ``_set_default_collection_attribute``."""
    self._set_default_collection_attribute(attr="max_collector_jobs", value=max_jobs)
def backend_args(self):
    """Return the ``backend_args`` attribute looked up through ``_get_default_collection_attribute``."""
    attribute_name = "backend_args"
    return self._get_default_collection_attribute(attribute_name)
def backend_args(self, args):
    """Store ``args`` as ``backend_args`` through ``_set_default_collection_attribute``."""
    self._set_default_collection_attribute(attr="backend_args", value=args)
def algorithms(self):
    """Return the object currently stored in ``_algorithms``."""
    wrapped_algorithms = self._algorithms
    return wrapped_algorithms
def algorithms(self, value):
    """
    Setter for a single algorithm.

    Parameters:
        value: Expected to be an instance of ``ROOT.Belle2.CalibrationAlgorithm``. It is wrapped
            in an `Algorithm` and stored as a one-element list in ``_algorithms``. Anything else
            is reported with an error message and ``_algorithms`` is left as it was.
    """
    # Imported here rather than at module level, as in the original code.
    from ROOT.Belle2 import CalibrationAlgorithm
    if not isinstance(value, CalibrationAlgorithm):
        B2ERROR(f"Something other than CalibrationAlgorithm instance passed in ({type(value)}). "
                "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
        return
    self._algorithms = [Algorithm(value)]
893 @algorithms.fset.register(tuple)
894 @algorithms.fset.register(list)
897 Alternate algorithms setter for lists
and tuples of CalibrationAlgorithms.
899 from ROOT.Belle2
import CalibrationAlgorithm
901 self._algorithms = []
903 if isinstance(alg, CalibrationAlgorithm):
904 self._algorithms.append(Algorithm(alg))
906 B2ERROR(f
"Something other than CalibrationAlgorithm instance passed in {type(value)}."
907 "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
def pre_algorithms(self):
    """
    Callback run prior to each algorithm iteration.

    Returns:
        list: The ``pre_algorithm`` attribute of every entry in ``self.algorithms``, in order.
    """
    callbacks = []
    for algorithm in self.algorithms:
        callbacks.append(algorithm.pre_algorithm)
    return callbacks
916 @pre_algorithms.setter
918 def pre_algorithms(self, func):
922 for alg
in self.algorithms:
923 alg.pre_algorithm = func
925 B2ERROR(
"Something evaluated as False passed in as pre_algorithm function.")
927 @pre_algorithms.fset.register(tuple)
928 @pre_algorithms.fset.register(list)
931 Alternate pre_algorithms setter for lists
and tuples of functions, should be one per algorithm.
934 if len(values) == len(self.algorithms):
935 for func, alg
in zip(values, self.algorithms):
936 alg.pre_algorithm = func
938 B2ERROR(
"Number of functions and number of algorithms doesn't match.")
940 B2ERROR(
"Empty container passed in for pre_algorithm functions")
945 The `caf.strategies.AlgorithmStrategy` or `list` of them used when running the algorithm(s).
947 return [alg.strategy
for alg
in self.algorithms]
955 for alg
in self.algorithms:
956 alg.strategy = strategy
958 B2ERROR(
"Something evaluated as False passed in as a strategy.")
@strategies.fset.register(tuple)
@strategies.fset.register(list)
def _(self, values):
    """
    Alternate strategies setter for lists and tuples of strategies, should be one per algorithm.

    Parameters:
        values (list, tuple): The strategies to assign. Entry ``i`` is stored as the ``strategy``
            attribute of ``self.algorithms[i]``. An empty container, or one whose length differs
            from the number of algorithms, is reported with an error message and nothing is
            assigned.
    """
    if not values:
        B2ERROR("Empty container passed in for strategies list")
        return
    if len(values) != len(self.algorithms):
        B2ERROR("Number of strategies and number of algorithms doesn't match.")
        return
    # Iterate the argument itself; inside this function the bare name ``strategies`` is the
    # module-level import, which cannot be zipped.
    for strategy, alg in zip(values, self.algorithms):
        alg.strategy = strategy
982 Main logic of the Calibration object.
983 Will be run in a new Thread by calling the start() method.
985 with CAFDB(self._db_path, read_only=
True)
as db:
986 initial_state = db.get_calibration_value(self.name,
"checkpoint")
987 initial_iteration = db.get_calibration_value(self.name,
"iteration")
988 B2INFO(f
"Initial status of {self.name} found to be state={initial_state}, iteration={initial_iteration}")
989 self.machine = CalibrationMachine(self,
990 iov_to_calibrate=self.iov,
991 initial_state=initial_state,
992 iteration=initial_iteration)
993 self.state = initial_state
994 self.machine.root_dir = Path(os.getcwd(), self.name)
995 self.machine.collector_backend = self.backend
999 all_iteration_paths = find_int_dirs(self.machine.root_dir)
1000 for iteration_path
in all_iteration_paths:
1001 if int(iteration_path.name) > initial_iteration:
1002 shutil.rmtree(iteration_path)
1004 while self.state != self.end_state
and self.state != self.fail_state:
1005 if self.state ==
"init":
1007 B2INFO(f
"Attempting collector submission for calibration {self.name}.")
1009 except Exception
as err:
1012 self._poll_collector()
1015 if self.state ==
"collector_failed":
1016 self.machine.fail_fully()
1023 B2INFO(f
"Attempting to run algorithms for calibration {self.name}.")
1024 self.machine.run_algorithms()
1025 except MachineError
as err:
1030 if self.machine.state ==
"algorithms_failed":
1031 self.machine.fail_fully()
1034 def _poll_collector(self):
1037 while self.state ==
"running_collector":
1039 self.machine.complete()
1041 except ConditionError:
1043 B2DEBUG(29, f
"Checking if collector jobs for calibration {self.name} have failed.")
1045 except ConditionError:
1047 sleep(self.heartbeat)
1052 The current major state of the calibration in the database file. The machine may have a different state.
1054 with CAFDB(self._db_path, read_only=
True)
as db:
1055 state = db.get_calibration_value(self.name,
"state")
def state(self, state):
    """
    Record a new state for this calibration in the database file.

    Parameters:
        state: The new state. Its ``str()`` form is written under the "state" key, and also under
            the "checkpoint" key when ``state`` is one of ``self.checkpoint_states``.
    """
    calibration_name = self.name
    B2DEBUG(29, f"Setting {calibration_name} to state {state}.")
    with CAFDB(self._db_path) as db:
        db.update_calibration_value(calibration_name, "state", str(state))
        is_checkpoint = state in self.checkpoint_states
        if is_checkpoint:
            db.update_calibration_value(calibration_name, "checkpoint", str(state))
    B2DEBUG(29, f"{calibration_name} set to {state}.")
def iteration(self):
    """
    Retrieves the current iteration number in the database file.

    Returns:
        int: The current iteration number
    """
    # Opened read-only: this accessor only looks the value up.
    with CAFDB(self._db_path, read_only=True) as db:
        iteration = db.get_calibration_value(self.name, "iteration")
    return iteration
def iteration(self, iteration):
    """
    Write a new iteration number for this calibration to the database file.

    Parameters:
        iteration: The value stored under the "iteration" key for this calibration.
    """
    calibration_name = self.name
    B2DEBUG(29, f"Setting {calibration_name} to {iteration}.")
    with CAFDB(self._db_path) as db:
        db.update_calibration_value(calibration_name, "iteration", iteration)
    # The confirmation message reads the value back through ``self.iteration``.
    B2DEBUG(29, f"{calibration_name} set to {self.iteration}.")
class Algorithm:
    """
    Parameters:
        algorithm: The CalibrationAlgorithm instance that we want to execute.
    Keyword Arguments:
        data_input : An optional function that sets the input files of the algorithm.
        pre_algorithm : An optional function that runs just prior to execution of the algorithm.
            Useful for set up e.g. module initialisation

    This is a simple wrapper class around the C++ CalibrationAlgorithm class.
    It helps to add functionality to algorithms for use by the Calibration and CAF classes rather
    than separating the logic into those classes directly.

    This is **not** currently a class that a user should interact with much during `CAF`
    setup (unless you're doing something advanced).
    The `Calibration` class should be doing most of the creation of the defaults for these objects.

    Setting the `data_input` function might be necessary if you have set the `Calibration.output_patterns`.
    Also, setting the `pre_algorithm` to a function that should execute prior to each `strategies.AlgorithmStrategy`
    is often useful i.e. by calling for the Geometry module to initialise.
    """

    def __init__(self, algorithm, data_input=None, pre_algorithm=None):
        """
        Store the wrapped algorithm and fill in defaults for anything not supplied.
        """
        #: The wrapped CalibrationAlgorithm instance
        self.algorithm = algorithm
        #: Name of the algorithm: the C++ class name of ``algorithm`` without its namespace
        self.name = self._short_class_name(type(algorithm).__cpp_name__)
        #: Function that sets up the input data of the algorithm. Falls back to
        #: `default_inputdata_setup` when no (truthy) function is given.
        self.data_input = data_input if data_input else self.default_inputdata_setup
        #: Optional function to run just prior to execution of the algorithm (may be None)
        self.pre_algorithm = pre_algorithm
        #: The strategy class used for this algorithm, `strategies.SingleIOV` by default
        self.strategy = strategies.SingleIOV

    @staticmethod
    def _short_class_name(cpp_name):
        """Return ``cpp_name`` without any leading ``namespace::`` qualification."""
        # rpartition leaves names without a '::' untouched, whereas slicing from
        # ``rfind('::') + 2`` would cut off their first character (rfind gives -1).
        return cpp_name.rpartition("::")[2]

    @staticmethod
    def _collector_output_files(input_file_paths):
        """Return the paths whose file name is exactly ``CollectorOutput.root``, in input order."""
        return [file_path for file_path in input_file_paths
                if Path(file_path).name == "CollectorOutput.root"]

    def default_inputdata_setup(self, input_file_paths):
        """
        Simple setup to set the input file names to the algorithm. Applied to the data_input attribute
        by default. This simply takes all files returned from the `Calibration.output_patterns` and filters
        for only the CollectorOutput.root files. Then it sets them as input files to the CalibrationAlgorithm class.
        """
        collector_output_files = self._collector_output_files(input_file_paths)
        info_lines = [f"Input files used in {self.name}:"]
        info_lines.extend(collector_output_files)
        B2INFO_MULTILINE(info_lines)
        self.algorithm.setInputFileNames(collector_output_files)
    calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.

        >>> calibration_defaults={"max_iterations":2}
    This class holds `Calibration` objects and processes them. It defines the initial configuration/setup
    for the calibrations. But most of the real processing is done through the `caf.state_machines.CalibrationMachine`.
    The `CAF` class essentially does some initial setup, holds the `CalibrationBase` instances and calls the
    `CalibrationBase.start` when the dependencies are met.
    Much of the checking for consistency is done in this class so that no processing is done with an invalid
    setup. Choosing which files to use as input should be done from outside during the setup of the `CAF` and
    `CalibrationBase` instances.
#: File name of the CAF status database that is created inside the output directory
_db_name = "caf_state.db"
#: Defaults that are merged with the user supplied ``calibration_defaults``
#: NOTE(review): the literal was not terminated in this copy of the file; only the
#: entry that is visible is kept here. Confirm that no further defaults were lost.
default_calibration_config = {
    "max_iterations": 5,
}
def __init__(self, calibration_defaults=None):
    """
    Set up an empty framework instance.

    Keyword Arguments:
        calibration_defaults (dict): Options that override the entries of
            ``default_calibration_config``. ``None`` (or an empty dict) keeps the defaults.
    """
    #: Dictionary of the added calibrations, keyed by calibration name
    self.calibrations = {}
    #: Names of the future dependencies of each calibration, filled while ordering the calibrations
    self.future_dependencies = {}
    #: Names of the (past) dependencies of each calibration, filled while ordering the calibrations
    self.dependencies = {}
    #: Output directory; `run` replaces this with the absolute path
    self.output_dir = "calibration_results"
    #: Backing field of the `backend` property
    self._backend = None
    #: Value passed to ``sleep()`` between two polls of the calibrations in `run`.
    #: NOTE(review): `run` reads this attribute but nothing visible defined it; the
    #: default of 5 is an assumption to be confirmed.
    self.heartbeat = 5
    #: Class level defaults with the user supplied values layered on top
    self.calibration_defaults = {**self.default_calibration_config, **(calibration_defaults or {})}
    #: Path of the status database, set by `_make_database`
    self._db_path = None
def add_calibration(self, calibration):
    """
    Adds a `Calibration` that is to be used in this program to the ``calibrations``
    dictionary, keyed by its name.

    A calibration whose ``is_valid()`` returns False, or whose name is already
    registered, is not added; a warning is printed instead.

    You should not directly alter a `Calibration` object after it has been added.
    """
    if not calibration.is_valid():
        B2WARNING(f"Tried to add incomplete/invalid calibration ({calibration.name}) to the framework. "
                  "It was not added and will not be part of the final process.")
        return
    if calibration.name in self.calibrations:
        # Keep the calibration that was registered first.
        B2WARNING(f"Tried to add a calibration with the name {calibration.name} twice.")
        return
    self.calibrations[calibration.name] = calibration
def _remove_missing_dependencies(self):
    """
    This checks the future and past dependencies of each `Calibration` in the `CAF`.
    If any dependencies are not known to the `CAF` then they are removed from the
    `Calibration` object directly, and a warning is printed for each of them.
    """
    known_names = [known.name for known in self.calibrations.values()]

    def is_dependency_in_caf(dependency):
        """
        Predicate that checks a dependency against the calibrations known to the `CAF`
        and warns about the ones that are unknown.
        """
        if dependency.name in known_names:
            return True
        B2WARNING(f"The calibration {dependency.name} is a required dependency but is not in the CAF."
                  " It has been removed as a dependency.")
        return False

    for cal in self.calibrations.values():
        # Future dependencies are filtered (and assigned) before the past ones.
        cal.future_dependencies = [dep for dep in cal.future_dependencies if is_dependency_in_caf(dep)]
        cal.dependencies = [dep for dep in cal.dependencies if is_dependency_in_caf(dep)]
def _order_calibrations(self):
    """
    - Uses dependency attributes of calibrations to create a dependency dictionary and passes it
      to a sorting algorithm.
    - Extends the ``dependencies`` of every calibration by its implicit dependencies and puts
      them into the sorted order.
    - Returns the ordered full dependencies if the sort was successful, an empty (falsy) result
      if it failed (most likely a cyclic dependency).
    """
    # Dependencies on calibrations that were never added would break the sort.
    self._remove_missing_dependencies()

    # Translate the dependency objects into plain names for the sorting helpers.
    for calibration in self.calibrations.values():
        self.future_dependencies[calibration.name] = [dep.name for dep in calibration.future_dependencies]
        self.dependencies[calibration.name] = [dep.name for dep in calibration.dependencies]

    order = topological_sort(self.future_dependencies)
    if not order:
        # NOTE(review): early exit restored from the description above ("empty one if it
        # failed"); `run` only tests the result for truthiness.
        return order

    # The sort order again, but including all implicit dependencies.
    ordered_full_dependencies = all_dependencies(self.future_dependencies, order)
    # All the implicit+explicit past dependencies.
    full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)

    for calibration in self.calibrations.values():
        # Add every dependency that was only implied so far.
        explicit_deps = {dep.name for dep in calibration.dependencies}
        for dep_name in full_past_dependencies[calibration.name]:
            if dep_name not in explicit_deps:
                calibration.dependencies.append(self.calibrations[dep_name])
        # The list is complete now but not in sort order yet: rebuild it following `order`.
        dependency_names = {dep.name for dep in calibration.dependencies}
        calibration.dependencies = [self.calibrations[name] for name in order if name in dependency_names]

    return ordered_full_dependencies
def _check_backend(self):
    """
    Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
    one that is stored isn't a valid Backend object) we should create a default Local backend.
    """
    if isinstance(self._backend, caf.backends.Backend):
        return
    # Assign through the property rather than to the private attribute directly.
    self.backend = caf.backends.Local()
def _prune_invalid_collections(self):
    """
    Checks all current calibrations and removes any invalid Collections from their collections list.

    Every calibration gets a new ``collections`` dictionary that only contains the
    entries whose ``is_valid()`` returned True. A warning is printed for each entry
    that is dropped.
    """
    B2INFO("Checking for any invalid Collections in Calibrations.")
    for calibration in self.calibrations.values():
        valid_collections = {}
        for name, collection in calibration.collections.items():
            if collection.is_valid():
                valid_collections[name] = collection
            else:
                B2WARNING(f"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
        calibration.collections = valid_collections
def run(self, iov=None):
    """
    Keyword Arguments:
        iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
            this IoV will be used in the collection step.

    This function runs the overall calibration job, saves the outputs to the output_dir directory,
    and creates database payloads.

    Upload of final databases is not done here. This simply creates the local databases in
    the output directory. You should check the validity of your new local database before uploading
    to the conditions DB via the basf2 tools/interface to the DB.
    """
    if not self.calibrations:
        B2FATAL("There were no Calibration objects to run. Maybe you tried to add invalid ones?")
    # Checks whether the dependencies that were added give a valid order.
    order = self._order_calibrations()
    if not order:
        B2FATAL("Couldn't order the calibrations properly. Could be a cyclic dependency.")

    # Falls back to a default backend if none was set.
    self._check_backend()
    self._prune_invalid_collections()
    # From here on output_dir holds the value returned by _make_output_dir.
    self.output_dir = self._make_output_dir()
    self._make_database()

    with temporary_workdir(self.output_dir):
        # The context manager closes the database handle even if the setup below fails.
        with CAFDB(self._db_path) as db:
            db_initial_calibrations = db.query("select * from calibrations").fetchall()
            # Column 0 of a row is compared with the calibration name below.
            previous_entries = {cal_info[0]: cal_info for cal_info in db_initial_calibrations}
            for calibration in self.calibrations.values():
                # Apply the defaults given to the `CAF` to the calibration.
                calibration._apply_calibration_defaults(self.calibration_defaults)
                calibration._db_path = self._db_path
                calibration.output_database_dir = Path(self.output_dir, calibration.name, "outputdb").as_posix()
                calibration.iov = iov
                if not calibration.backend:
                    calibration.backend = self.backend
                if calibration.name not in previous_entries:
                    db.insert_calibration(calibration.name)
                    # NOTE(review): commit restored by inference. The state/iteration
                    # setters below open their own connection to the same file, which
                    # presumably must not find a pending write here. Confirm against CAFDB.
                    db.commit()
                else:
                    # Restart: restore what the previous run stored for this calibration.
                    # Columns 2 and 3 are read as the stored state and iteration.
                    cal_info = previous_entries[calibration.name]
                    cal_initial_state = cal_info[2]
                    cal_initial_iteration = cal_info[3]
                    B2INFO(f"Previous entry in database found for {calibration.name}.")
                    B2INFO(f"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
                    calibration.state = cal_initial_state
                    B2INFO(f"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
                    calibration.iteration = cal_initial_iteration
                calibration.daemon = True

        # NOTE(review): the loop statement and the start()/join() calls were restored
        # from the surrounding debug messages and the `keep_running` flag; confirm
        # against the upstream file.
        keep_running = True
        while keep_running:
            keep_running = False
            # Calibrations that may still complete.
            remaining_calibrations = []

            for calibration in self.calibrations.values():
                if (calibration.state == CalibrationBase.end_state
                        or calibration.state == CalibrationBase.fail_state):
                    # Ended, but possibly not joined yet.
                    if calibration.is_alive():
                        B2DEBUG(29, f"Joining {calibration.name}.")
                        calibration.join()
                elif calibration.dependencies_met():
                    if not calibration.is_alive():
                        B2DEBUG(29, f"Starting {calibration.name}.")
                        try:
                            calibration.start()
                        except RuntimeError:
                            # The calibration was started before and is not alive any more.
                            B2DEBUG(29, f"{calibration.name} probably just finished, join it later.")
                    remaining_calibrations.append(calibration)
                elif not calibration.failed_dependencies():
                    # Still waiting for dependencies, none of which has failed.
                    remaining_calibrations.append(calibration)

            if remaining_calibrations:
                keep_running = True
                # Submit the jobs requested by the calibrations from this single place.
                # Iterating over a copy allows removing entries while looping.
                for calibration in remaining_calibrations:
                    for job in calibration.jobs_to_submit[:]:
                        calibration.backend.submit(job)
                        calibration.jobs_to_submit.remove(job)
            sleep(self.heartbeat)

        B2INFO("Printing summary of final CAF status.")
        with CAFDB(self._db_path, read_only=True) as db:
            print(db.output_calibration_table())
@property
def backend(self):
    """
    The `backend <backends.Backend>` that runs the collector job.
    When set, this is checked that a `backends.Backend` class instance was passed in.
    """
    return self._backend


@backend.setter
def backend(self, backend):
    """
    Store ``backend`` if it is a `caf.backends.Backend` instance. Anything else is
    rejected with an error message and the stored backend is left unchanged.
    """
    if isinstance(backend, caf.backends.Backend):
        self._backend = backend
    else:
        B2ERROR('Backend property must inherit from Backend class.')
def _make_output_dir(self):
    """
    Creates the output directory. If it already exists we are now going to try and restart the
    program from the last state.

    Returns:
        str: The absolute path of the new output_dir

    Raises:
        FileNotFoundError: If the directory does not exist after trying to create it.
    """
    p = Path(self.output_dir).resolve()
    if p.is_dir():
        B2INFO(f"{p.as_posix()} output directory already exists. "
               "We will try to restart from the previous finishing state.")
        return p.as_posix()

    p.mkdir(parents=True)
    if not p.is_dir():
        raise FileNotFoundError(f"Attempted to create output_dir {p.as_posix()}, but it didn't work.")
    return p.as_posix()
1459 def _make_database(self):
1461 Creates the CAF status database. If it already exists we don't overwrite it. """
1462 self._db_path = Path(self.output_dir, self._db_name).absolute()
1463 if self._db_path.exists():
1464 B2INFO(f
"Previous CAF database found {self._db_path}")
1466 with CAFDB(self._db_path):