12 from abc
import ABC, abstractmethod
14 from basf2
import B2DEBUG, B2ERROR, B2INFO
15 import multiprocessing
20 Abstract Base Class for Runner type object.
28 class AlgorithmsRunner(Runner):
30 Base class for `AlgorithmsRunner` classes. Defines the necessary information that will be provided to every
31 `AlgorithmsRunner` used by the `framework.CAF`
33 An `AlgorithmsRunner` will be given a list of `framework.Algorithm` objects defined during the setup of a
34 `framework.Calibration` instance. The `AlgorithmsRunner` describes how to run each of the `strategies.AlgorithmStrategy`
35 objects. As an example, assume that a single `framework.Calibration` was given and list of two `framework.Algorithm`
38 In this example the chosen :py:meth:`AlgorithmsRunner.run()` is simple and just loops over the list of `caf.framework.Algorithm`
39 calling each one's :py:meth:`caf.strategies.AlgorithmStrategy.run()` methods in order.
40 Thereby generating a localdb with the only communication between the `strategies.AlgorithmStrategy` instances coming from the
41 database payloads being available from one algorithm to the next.
43 But you could imagine a more complex situation. The `AlgorithmsRunner` might take the first `framework.Algorithm` and
44 call its `AlgorithmStrategy.run` for only the first (exp,run) in the collected data. Then it might not commit the payloads
45 to a localdb but instead pass some calculated values to the next algorithm to run on the same IoV. Then it might go back
46 and re-run the first AlgorithmStrategy with new information and commit payloads this time. Then move onto the next IoV.
48 Hopefully you can see that while the default provided `AlgorithmsRunner` and `AlgorithmStrategy` classes should be good for
49 most situations, you have lot of freedom to define your own strategy if needed. By following the basic requirements for the
50 interface to the `framework.CAF` you can easily plugin a different special case, or mix and match a custom class with
53 The run(self) method should be defined for every derived `AlgorithmsRunner`. It will be called once and only once for each
54 iteration of (collector -> algorithm).
56 Input files are automatically given via the `framework.Calibration.output_patterns` which constructs
57 a list of all files in the collector output directories that match the output_patterns. If you have multiple types of
58 output data it is your job to filter through the input files and assign them correctly.
60 A list of local database paths are given to the `AlgorithmsRunner` based on the `framework.Calibration` dependencies and
61 any overall database chain given to the Calibration before running.
62 By default you can call the "setup_algorithm" transition of the `caf.state_machines.AlgorithmMachine` to automatically
63 set a database chain based on this list.
64 But you have freedom to not call this at all in `run`, or to implement a different method to deal with this.
68 COMPLETED =
"COMPLETED"
102 def run(self, iov, iteration):
105 from caf.strategies
import AlgorithmStrategy
106 B2INFO(f
"SequentialAlgorithmsRunner begun for Calibration {self.name}.")
111 strategy = algorithm.strategy(algorithm)
114 strategy_params[
"database_chain"] = self.
database_chaindatabase_chain
116 strategy_params[
"output_dir"] = self.
output_diroutput_dir
118 strategy_params[
"input_files"] = self.
input_filesinput_files
119 strategy_params[
"ignored_runs"] = self.ignored_runs
120 strategy.setup_from_dict(strategy_params)
121 strategies.append(strategy)
124 ctx = multiprocessing.get_context(
"fork")
125 for strategy
in strategies:
126 queue = multiprocessing.SimpleQueue()
127 child = ctx.Process(target=SeqAlgorithmsRunner._run_strategy,
128 args=(strategy, iov, iteration, queue))
130 self.
resultsresults[strategy.algorithm.name] = []
131 B2INFO(f
"Starting subprocess of AlgorithmStrategy for {strategy.algorithm.name}.")
132 B2INFO(
"Logging will be diverted into algorithm output.")
137 B2INFO(f
"Collecting results for {strategy.algorithm.name}.")
140 while not queue.empty():
142 B2DEBUG(29, f
"Result from queue was {output}")
143 if output[
"type"] ==
"result":
144 self.
resultsresults[strategy.algorithm.name].append(output[
"value"])
145 elif output[
"type"] ==
"final_state":
146 final_state = output[
"value"]
148 raise RunnerError(f
"Unknown result output: {output}")
158 if child.exitcode == 0:
159 B2INFO(f
"AlgorithmStrategy subprocess for {strategy.algorithm.name} exited")
162 raise RunnerError(f
"Error during subprocess of AlgorithmStrategy for {strategy.algorithm.name}")
170 raise RunnerError((f
"Strategy for {strategy.algorithm.name} "
171 "exited subprocess but without a final state!"))
174 if final_state == AlgorithmStrategy.FAILED:
175 B2ERROR(f
"AlgorithmStrategy for {strategy.algorithm.name} failed. We wil not proceed with any more algorithms")
179 B2DEBUG(29, f
"Finished subprocess of AlgorithmStrategy for {strategy.algorithm.name}")
182 B2INFO(f
"SequentialAlgorithmsRunner finished for Calibration {self.name}")
187 """Runs the AlgorithmStrategy sends back the results"""
188 strategy.run(iov, iteration, queue)
190 B2INFO(f
"Finished Strategy for {strategy.algorithm.name}.")
195 Base exception class for Runners
output_database_dir
The directory of the local database we use to store algorithm payloads from this execution.
input_files
All of the output files made by the collector job and recovered by the "output_patterns".
results
Algorithm results from each algorithm we execute.
final_state
Final state of runner.
database_chain
User set databases, can be used to apply your own constants and global tags.
name
The name of this runner instance.
dependent_databases
List of local databases created by previous CAF calibrations/iterations.
algorithms
The list of algorithms that this runner executes.
output_dir
Output directory of these algorithms, for logging mostly.
def run(self, iov, iteration)
def _run_strategy(strategy, iov, iteration, queue)