Belle II Software development
runners.py
1#!/usr/bin/env python3
2
3# disable doxygen check for this file
4# @cond
5
6
13
14from abc import ABC, abstractmethod
15import time
16from basf2 import B2DEBUG, B2ERROR, B2INFO
17import multiprocessing
18
19
class Runner(ABC):
    """
    Abstract interface shared by every CAF runner type.

    A concrete runner cannot be instantiated until it overrides :py:meth:`run`; the arguments
    that ``run`` accepts are chosen by each concrete subclass.
    """

    @abstractmethod
    def run(self):
        """
        Execute the runner. Concrete subclasses must provide their own implementation.
        """
28
29
class AlgorithmsRunner(Runner):
    """
    Base class for `AlgorithmsRunner` classes. Defines the necessary information that will be provided to every
    `AlgorithmsRunner` used by the `framework.CAF`

    An `AlgorithmsRunner` will be given a list of `framework.Algorithm` objects defined during the setup of a
    `framework.Calibration` instance. The `AlgorithmsRunner` describes how to run each of the `strategies.AlgorithmStrategy`
    objects. As an example, assume that a single `framework.Calibration` was given a list of two `framework.Algorithm`
    instances to run.

    In this example the chosen :py:meth:`AlgorithmsRunner.run()` is simple and just loops over the list of `caf.framework.Algorithm`
    calling each one's :py:meth:`caf.strategies.AlgorithmStrategy.run()` methods in order.
    This generates a localdb, and the only communication between the `strategies.AlgorithmStrategy` instances comes from the
    database payloads being available from one algorithm to the next.

    But you could imagine a more complex situation. The `AlgorithmsRunner` might take the first `framework.Algorithm` and
    call its `AlgorithmStrategy.run` for only the first (exp,run) in the collected data. Then it might not commit the payloads
    to a localdb but instead pass some calculated values to the next algorithm to run on the same IoV. Then it might go back
    and re-run the first AlgorithmStrategy with new information and commit payloads this time. Then move onto the next IoV.

    Hopefully you can see that while the default provided `AlgorithmsRunner` and `AlgorithmStrategy` classes should be good for
    most situations, you have a lot of freedom to define your own strategy if needed. By following the basic requirements for the
    interface to the `framework.CAF` you can easily plug in a different special case, or mix and match a custom class with
    default CAF ones.

    The `run` method must be defined by every derived `AlgorithmsRunner`. It will be called once and only once for each
    iteration of (collector -> algorithm).

    Input files are automatically given via the `framework.Calibration.output_patterns` which constructs
    a list of all files in the collector output directories that match the output_patterns. If you have multiple types of
    output data it is your job to filter through the input files and assign them correctly.

    A list of local database paths is given to the `AlgorithmsRunner` based on the `framework.Calibration` dependencies and
    any overall database chain given to the Calibration before running.
    By default you can call the "setup_algorithm" transition of the `caf.state_machines.AlgorithmMachine` to automatically
    set a database chain based on this list.
    But you have freedom to not call this at all in `run`, or to implement a different method to deal with this.
    """

    #: Value stored in `final_state` when the runner failed.
    FAILED = "FAILED"
    #: Value stored in `final_state` when the runner finished successfully.
    COMPLETED = "COMPLETED"

    def __init__(self, name):
        """
        Parameters:
            name (str): Name of this runner instance (used in log messages, e.g. as the calibration name).

        All other attributes start out empty and are expected to be filled in by the caller before `run`.
        """

        #: The name of this runner instance
        self.name = name
        #: Input files for the algorithms, i.e. the collector output files matching the output patterns
        self.input_files = []
        #: Database chain handed to each algorithm strategy (user-set databases / global tags)
        self.database_chain = []
        #: Local databases created by the calibrations this one depends on
        self.dependent_databases = []
        #: Directory of the local database for the payloads of this execution (presumably; confirm with framework)
        self.output_database_dir = ""
        #: Results gathered by `run`, keyed by algorithm name
        self.results = {}
        #: None until `run` has finished, then `FAILED` or `COMPLETED`
        self.final_state = None
        #: The `framework.Algorithm` objects this runner executes
        self.algorithms = None
        #: Output directory handed to each algorithm strategy
        self.output_dir = ""
        #: Runs handed to each strategy as "ignored_runs". Normally set by the caller; defaulting
        #: to an empty list means derived runners that forward it never hit AttributeError.
        #: NOTE(review): assumes an empty list means "ignore nothing" -- confirm with framework.
        self.ignored_runs = []
93
94
class SeqAlgorithmsRunner(AlgorithmsRunner):
    """
    Runs the `strategies.AlgorithmStrategy` of each algorithm in ``algorithms``, one after another.

    Each strategy is executed in its own forked subprocess, so that logging changes made by the strategy
    do not affect this process, and its outputs are read back through a queue.

    If a strategy ends in ``AlgorithmStrategy.FAILED``, the remaining strategies are skipped and
    `final_state` is set to ``FAILED``. Otherwise `final_state` is set to ``COMPLETED``.
    """

    #: Maximum number of seconds to wait for a strategy subprocess before draining its queue again
    poll_interval = 5

    def __init__(self, name):
        """
        Parameters:
            name (str): Name of the calibration this runner belongs to (used in log messages).
        """
        super().__init__(name)

    def run(self, iov, iteration):
        """
        Set up and execute the strategy of every algorithm in ``self.algorithms``, in order.

        Parameters:
            iov: IoV passed unchanged to each strategy's ``run`` method.
            iteration: Iteration number passed unchanged to each strategy's ``run`` method.

        Results reported by each strategy are stored in ``self.results[<algorithm name>]``.

        Raises:
            RunnerError: If a strategy subprocess sends an unknown message type, exits without
                reporting a final state, or exits with a non-zero exit code.
        """
        from caf.strategies import AlgorithmStrategy
        B2INFO(f"SequentialAlgorithmsRunner begun for Calibration {self.name}.")
        # First we do the setup of all algorithm strategies
        strategies = [self._setup_strategy(algorithm) for algorithm in self.algorithms]

        # We fork off a copy of this python process for each strategy so that we don't affect
        # the original with logging changes
        ctx = multiprocessing.get_context("fork")
        for strategy in strategies:
            algorithm_name = strategy.algorithm.name
            # Build the queue from the same context as the process that writes to it
            queue = ctx.SimpleQueue()
            child = ctx.Process(target=SeqAlgorithmsRunner._run_strategy,
                                args=(strategy, iov, iteration, queue))

            self.results[algorithm_name] = []
            B2INFO(f"Starting subprocess of AlgorithmStrategy for {algorithm_name}.")
            B2INFO("Logging will be diverted into algorithm output.")
            child.start()
            try:
                final_state = self._collect_results(child, queue, algorithm_name)
            finally:
                # Never leave a running (non-daemon) child behind if collecting its results raised
                if child.is_alive():
                    child.terminate()
                child.join()

            # Exit early and don't continue strategies as this one failed
            if final_state == AlgorithmStrategy.FAILED:
                B2ERROR(f"AlgorithmStrategy for {algorithm_name} failed. We will not proceed with any more algorithms")
                self.final_state = self.FAILED
                break

            B2DEBUG(29, f"Finished subprocess of AlgorithmStrategy for {algorithm_name}")

        if self.final_state != self.FAILED:
            B2INFO(f"SequentialAlgorithmsRunner finished for Calibration {self.name}")
            self.final_state = self.COMPLETED

    def _setup_strategy(self, algorithm):
        """Create an instance of ``algorithm``'s requested strategy, configured from this runner's attributes."""
        strategy = algorithm.strategy(algorithm)
        strategy.setup_from_dict({
            "database_chain": self.database_chain,
            "dependent_databases": self.dependent_databases,
            "output_dir": self.output_dir,
            "output_database_dir": self.output_database_dir,
            "input_files": self.input_files,
            "ignored_runs": self.ignored_runs,
        })
        return strategy

    def _collect_results(self, child, queue, algorithm_name):
        """
        Read the outputs of a started strategy subprocess until it has exited.

        Returns:
            The final state reported by the strategy.

        Raises:
            RunnerError: If no final state was reported, the subprocess exited with a non-zero
                exit code, or an unknown message type was received.
        """
        B2INFO(f"Collecting results for {algorithm_name}.")
        final_state = None
        while child.is_alive():
            final_state = self._drain_queue(queue, algorithm_name, final_state)
            # Returns as soon as the child exits, but wakes up periodically so the queue is emptied
            # and the child never stays blocked writing to a full pipe.
            child.join(self.poll_interval)
        # SimpleQueue.put writes synchronously, so everything the child sent before exiting
        # is already waiting in the queue: drain it one last time.
        final_state = self._drain_queue(queue, algorithm_name, final_state)

        if final_state is None:
            raise RunnerError(f"Strategy for {algorithm_name} "
                              "exited subprocess but without a final state!")
        # Check the exitcode for failed Process()
        if child.exitcode != 0:
            raise RunnerError(f"Error during subprocess of AlgorithmStrategy for {algorithm_name}")
        B2INFO(f"AlgorithmStrategy subprocess for {algorithm_name} exited")
        return final_state

    def _drain_queue(self, queue, algorithm_name, final_state):
        """
        Consume every message currently waiting in ``queue``.

        "result" messages are appended to ``self.results[algorithm_name]``; a "final_state" message
        replaces ``final_state``. Returns the (possibly updated) final state.

        Raises:
            RunnerError: On a message of any other type.
        """
        while not queue.empty():
            output = queue.get()
            B2DEBUG(29, f"Result from queue was {output}")
            if output["type"] == "result":
                self.results[algorithm_name].append(output["value"])
            elif output["type"] == "final_state":
                final_state = output["value"]
            else:
                raise RunnerError(f"Unknown result output: {output}")
        return final_state

    @staticmethod
    def _run_strategy(strategy, iov, iteration, queue):
        """
        Subprocess entry point: run ``strategy`` for ``iov`` and ``iteration``.

        ``strategy.run`` is given ``queue`` and is expected to put its "result" and "final_state"
        messages on it; the parent process reads them back in `_drain_queue`.
        """
        strategy.run(iov, iteration, queue)
        B2INFO(f"Finished Strategy for {strategy.algorithm.name}.")
193
194
class RunnerError(Exception):
    """
    Base exception type for errors raised by the Runner classes, e.g. when an algorithm
    strategy subprocess misbehaves.
    """
199
200# @endcond