11from abc
import ABC, abstractmethod
13from basf2
import B2DEBUG, B2ERROR, B2INFO
19 Abstract Base Class for Runner type object.
27class AlgorithmsRunner(Runner):
29 Base class for `AlgorithmsRunner` classes. Defines the necessary information that will be provided to every
30 `AlgorithmsRunner` used by the `framework.CAF`
32 An `AlgorithmsRunner` will be given a list of `framework.Algorithm` objects defined during the setup of a
33 `framework.Calibration` instance. The `AlgorithmsRunner` describes how to run each of the `strategies.AlgorithmStrategy`
34 objects. As an example, assume that a single `framework.Calibration` was given and a list of two `framework.Algorithm`
37 In this example the chosen :py:meth:`AlgorithmsRunner.run()` is simple and just loops over the list of `caf.framework.Algorithm`
38 calling each one's :py:meth:`caf.strategies.AlgorithmStrategy.run()` methods in order.
39 Thereby generating a localdb with the only communication between the `strategies.AlgorithmStrategy` instances coming from the
40 database payloads being available from one algorithm to the next.
42 But you could imagine a more complex situation. The `AlgorithmsRunner` might take the first `framework.Algorithm` and
43 call its `AlgorithmStrategy.run` for only the first (exp,run) in the collected data. Then it might not commit the payloads
44 to a localdb but instead pass some calculated values to the next algorithm to run on the same IoV. Then it might go back
45 and re-run the first AlgorithmStrategy with new information and commit payloads this time. Then move onto the next IoV.
47 Hopefully you can see that while the default provided `AlgorithmsRunner` and `AlgorithmStrategy` classes should be good for
48 most situations, you have a lot of freedom to define your own strategy if needed. By following the basic requirements for the
49 interface to the `framework.CAF` you can easily plug in a different special case, or mix and match a custom class with
52 The run(self) method should be defined for every derived `AlgorithmsRunner`. It will be called once and only once for each
53 iteration of (collector -> algorithm).
55 Input files are automatically given via the `framework.Calibration.output_patterns` which constructs
56 a list of all files in the collector output directories that match the output_patterns. If you have multiple types of
57 output data it is your job to filter through the input files and assign them correctly.
59 A list of local database paths is given to the `AlgorithmsRunner` based on the `framework.Calibration` dependencies and
60 any overall database chain given to the Calibration before running.
61 By default you can call the "setup_algorithm" transition of the `caf.state_machines.AlgorithmMachine` to automatically
62 set a database chain based on this list.
63 But you have freedom to not call this at all in `run`, or to implement a different method to deal with this.
69 COMPLETED =
"COMPLETED"
103 def run(self, iov, iteration):
106 from caf.strategies
import AlgorithmStrategy
107 B2INFO(f
"SequentialAlgorithmsRunner begun for Calibration {self.name}.")
112 strategy = algorithm.strategy(algorithm)
117 strategy_params[
"output_dir"] = self.
output_dir
120 strategy_params[
"ignored_runs"] = self.ignored_runs
121 strategy.setup_from_dict(strategy_params)
122 strategies.append(strategy)
125 ctx = multiprocessing.get_context(
"fork")
126 for strategy
in strategies:
127 queue = multiprocessing.SimpleQueue()
128 child = ctx.Process(target=SeqAlgorithmsRunner._run_strategy,
129 args=(strategy, iov, iteration, queue))
131 self.
results[strategy.algorithm.name] = []
132 B2INFO(f
"Starting subprocess of AlgorithmStrategy for {strategy.algorithm.name}.")
133 B2INFO(
"Logging will be diverted into algorithm output.")
138 B2INFO(f
"Collecting results for {strategy.algorithm.name}.")
141 while not queue.empty():
143 B2DEBUG(29, f
"Result from queue was {output}")
144 if output[
"type"] ==
"result":
145 self.
results[strategy.algorithm.name].append(output[
"value"])
146 elif output[
"type"] ==
"final_state":
147 final_state = output[
"value"]
149 raise RunnerError(f
"Unknown result output: {output}")
159 if child.exitcode == 0:
160 B2INFO(f
"AlgorithmStrategy subprocess for {strategy.algorithm.name} exited")
163 raise RunnerError(f
"Error during subprocess of AlgorithmStrategy for {strategy.algorithm.name}")
171 raise RunnerError(f
"Strategy for {strategy.algorithm.name} "
172 "exited subprocess but without a final state!")
175 if final_state == AlgorithmStrategy.FAILED:
176 B2ERROR(f
"AlgorithmStrategy for {strategy.algorithm.name} failed. We will not proceed with any more algorithms")
180 B2DEBUG(29, f
"Finished subprocess of AlgorithmStrategy for {strategy.algorithm.name}")
183 B2INFO(f
"SequentialAlgorithmsRunner finished for Calibration {self.name}")
188 """Runs the AlgorithmStrategy and sends back the results"""
189 strategy.run(iov, iteration, queue)
191 B2INFO(f
"Finished Strategy for {strategy.algorithm.name}.")
196 Base exception class for Runners
list input_files
All of the output files made by the collector job and recovered by the "output_patterns".
list dependent_databases
List of local databases created by previous CAF calibrations/iterations.
list database_chain
User set databases, can be used to apply your own constants and global tags.
dict results
Algorithm results from each algorithm we execute.
final_state
Final state of runner.
str output_database_dir
The directory of the local database we use to store algorithm payloads from this execution.
name
The name of this runner instance.
algorithms
The list of algorithms that this runner executes.
str output_dir
Output directory of these algorithms, for logging mostly.
run(self, iov, iteration)
_run_strategy(strategy, iov, iteration, queue)