4 from abc
import ABC, abstractmethod
6 from basf2
import B2DEBUG, B2ERROR, B2INFO
12 Abstract Base Class for Runner type object.
20 class AlgorithmsRunner(Runner):
22 Base class for `AlgorithmsRunner` classes. Defines the necessary information that will be provided to every
23 `AlgorithmsRunner` used by the `framework.CAF`
25 An `AlgorithmsRunner` will be given a list of `framework.Algorithm` objects defined during the setup of a
26 `framework.Calibration` instance. The `AlgorithmsRunner` describes how to run each of the `strategies.AlgorithmStrategy`
27 objects. As an example, assume that a single `framework.Calibration` was given and list of two `framework.Algorithm`
30 In this example the chosen :py:meth:`AlgorithmsRunner.run()` is simple and just loops over the list of `caf.framework.Algorithm`
31 calling each one's :py:meth:`caf.strategies.AlgorithmStrategy.run()` methods in order.
32 Thereby generating a localdb with the only communication between the `strategies.AlgorithmStrategy` instances coming from the
33 database payloads being available from one algorithm to the next.
35 But you could imagine a more complex situation. The `AlgorithmsRunner` might take the first `framework.Algorithm` and
36 call its `AlgorithmStrategy.run` for only the first (exp,run) in the collected data. Then it might not commit the payloads
37 to a localdb but instead pass some calculated values to the next algorithm to run on the same IoV. Then it might go back
38 and re-run the first AlgorithmStrategy with new information and commit payloads this time. Then move onto the next IoV.
40 Hopefully you can see that while the default provided `AlgorithmsRunner` and `AlgorithmStrategy` classes should be good for
41 most situations, you have lot of freedom to define your own strategy if needed. By following the basic requirements for the
42 interface to the `framework.CAF` you can easily plugin a different special case, or mix and match a custom class with
45 The run(self) method should be defined for every derived `AlgorithmsRunner`. It will be called once and only once for each
46 iteration of (collector -> algorithm).
48 Input files are automatically given via the `framework.Calibration.output_patterns` which constructs
49 a list of all files in the collector output directories that match the output_patterns. If you have multiple types of
50 output data it is your job to filter through the input files and assign them correctly.
52 A list of local database paths are given to the `AlgorithmsRunner` based on the `framework.Calibration` dependencies and
53 any overall database chain given to the Calibration before running.
54 By default you can call the "setup_algorithm" transition of the `caf.state_machines.AlgorithmMachine` to automatically
55 set a database chain based on this list.
56 But you have freedom to not call this at all in `run`, or to implement a different method to deal with this.
60 COMPLETED =
"COMPLETED"
94 def run(self, iov, iteration):
97 from .strategies
import AlgorithmStrategy
98 B2INFO(f
"SequentialAlgorithmsRunner begun for Calibration {self.name}.")
103 strategy = algorithm.strategy(algorithm)
108 strategy_params[
"output_dir"] = self.
output_dir
111 strategy_params[
"ignored_runs"] = self.ignored_runs
112 strategy.setup_from_dict(strategy_params)
113 strategies.append(strategy)
116 ctx = multiprocessing.get_context(
"fork")
117 for strategy
in strategies:
118 queue = multiprocessing.SimpleQueue()
119 child = ctx.Process(target=SeqAlgorithmsRunner._run_strategy,
120 args=(strategy, iov, iteration, queue))
122 self.
results[strategy.algorithm.name] = []
123 B2INFO(f
"Starting subprocess of AlgorithmStrategy for {strategy.algorithm.name}.")
124 B2INFO(
"Logging will be diverted into algorithm output.")
129 B2INFO(f
"Collecting results for {strategy.algorithm.name}.")
132 while not queue.empty():
134 B2DEBUG(29, f
"Result from queue was {output}")
135 if output[
"type"] ==
"result":
136 self.
results[strategy.algorithm.name].append(output[
"value"])
137 elif output[
"type"] ==
"final_state":
138 final_state = output[
"value"]
140 raise RunnerError(f
"Unknown result output: {output}")
150 if child.exitcode == 0:
151 B2INFO(f
"AlgorithStrategy subprocess for {strategy.algorithm.name} exited")
154 raise RunnerError(f
"Error during subprocess of AlgorithmStrategy for {strategy.algorithm.name}")
162 raise RunnerError((f
"Strategy for {strategy.algorithm.name} "
163 "exited subprocess but without a final state!"))
166 if final_state == AlgorithmStrategy.FAILED:
167 B2ERROR(f
"AlgorithmStrategy for {strategy.algorithm.name} failed. We wil not proceed with any more algorithms")
171 B2DEBUG(29, f
"Finished subprocess of AlgorithmStrategy for {strategy.algorithm.name}")
174 B2INFO(f
"SequentialAlgorithmsRunner finished for Calibration {self.name}")
179 """Runs the AlgorithmStrategy sends back the results"""
180 strategy.run(iov, iteration, queue)
182 B2INFO(f
"Finished Strategy for {strategy.algorithm.name}.")
187 Base exception class for Runners