Belle II Software  release-05-01-25
runners.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 from abc import ABC, abstractmethod
5 import time
6 from basf2 import B2DEBUG, B2ERROR, B2INFO
7 import multiprocessing
8 
9 
class Runner(ABC):
    """
    Abstract interface shared by every Runner type object.

    A concrete runner has to override `run`; until it does, the class cannot
    be instantiated.
    """

    @abstractmethod
    def run(self):
        """
        Execute the runner. Must be implemented by every derived class.
        """
18 
19 
class AlgorithmsRunner(Runner):
    """
    Base class for `AlgorithmsRunner` classes. Defines the necessary information that will be
    provided to every `AlgorithmsRunner` used by the `framework.CAF`

    An `AlgorithmsRunner` will be given a list of `framework.Algorithm` objects defined during the
    setup of a `framework.Calibration` instance. The `AlgorithmsRunner` describes how to run each
    of the `strategies.AlgorithmStrategy` objects. As an example, assume that a single
    `framework.Calibration` was given a list of two `framework.Algorithm` instances to run.

    In this example the chosen :py:meth:`AlgorithmsRunner.run()` is simple and just loops over the
    list of `caf.framework.Algorithm` calling each one's
    :py:meth:`caf.strategies.AlgorithmStrategy.run()` methods in order.
    Thereby generating a localdb with the only communication between the
    `strategies.AlgorithmStrategy` instances coming from the database payloads being available
    from one algorithm to the next.

    But you could imagine a more complex situation. The `AlgorithmsRunner` might take the first
    `framework.Algorithm` and call its `AlgorithmStrategy.run` for only the first (exp,run) in the
    collected data. Then it might not commit the payloads to a localdb but instead pass some
    calculated values to the next algorithm to run on the same IoV. Then it might go back
    and re-run the first AlgorithmStrategy with new information and commit payloads this time.
    Then move onto the next IoV.

    Hopefully you can see that while the default provided `AlgorithmsRunner` and
    `AlgorithmStrategy` classes should be good for most situations, you have a lot of freedom to
    define your own strategy if needed. By following the basic requirements for the interface to
    the `framework.CAF` you can easily plug in a different special case, or mix and match a custom
    class with default CAF ones.

    The run(self) method should be defined for every derived `AlgorithmsRunner`. It will be called
    once and only once for each iteration of (collector -> algorithm).

    Input files are automatically given via the `framework.Calibration.output_patterns` which
    constructs a list of all files in the collector output directories that match the
    output_patterns. If you have multiple types of output data it is your job to filter through
    the input files and assign them correctly.

    A list of local database paths are given to the `AlgorithmsRunner` based on the
    `framework.Calibration` dependencies and any overall database chain given to the Calibration
    before running.
    By default you can call the "setup_algorithm" transition of the
    `caf.state_machines.AlgorithmMachine` to automatically set a database chain based on this list.
    But you have freedom to not call this at all in `run`, or to implement a different method to
    deal with this.
    """

    #: Value stored in `final_state` when the runner did not succeed
    FAILED = "FAILED"
    #: Value stored in `final_state` when the runner finished all of its work
    COMPLETED = "COMPLETED"

    def __init__(self, name):
        """
        Create a runner with empty defaults; the caller fills in the attributes before `run`.

        Parameters:
            name: The name of this runner instance.
        """
        #: The name of this runner instance
        self.name = name
        #: All of the output files made by the collector job and recovered by the "output_patterns"
        self.input_files = []
        #: User set databases, can be used to apply your own constants and global tags
        self.database_chain = []
        #: List of local databases created by previous CAF calibrations/iterations
        self.dependent_databases = []
        #: The directory of the local database we use to store algorithm payloads from this execution
        self.output_database_dir = ""
        #: Runs handed to each strategy as "ignored_runs". Defaults to an empty list so that a
        #: runner whose caller never sets it does not fail with an AttributeError.
        #: NOTE(review): element type is defined by the strategies -- confirm against the caller.
        self.ignored_runs = []
        #: Algorithm results from each algorithm we execute
        self.results = {}
        #: Final state of runner
        self.final_state = None
        #: The list of algorithms that this runner executes
        self.algorithms = None
        #: Output directory of these algorithms, for logging mostly
        self.output_dir = ""
83 
84 
class SeqAlgorithmsRunner(AlgorithmsRunner):
    """
    Runner that executes the algorithm strategies one after another.

    For every entry of `algorithms` a strategy object is created and configured, then run in its
    own forked subprocess. The next strategy is only started once the previous subprocess has
    ended. Everything a strategy sends back through its queue is collected in `results`, keyed
    by the algorithm name. As soon as one strategy reports a failed final state the remaining
    algorithms are skipped and `final_state` is set to `FAILED`.
    """

    #: Longest time (seconds) to wait on a running subprocess before checking its queue again
    _POLL_INTERVAL = 5

    def __init__(self, name):
        """
        Parameters:
            name: The name of this runner instance, forwarded to `AlgorithmsRunner.__init__`.
        """
        super().__init__(name)

    def run(self, iov, iteration):
        """
        Run every algorithm strategy in sequence, each inside its own forked subprocess.

        Parameters:
            iov: Passed unchanged to each strategy's ``run``.
            iteration: Passed unchanged to each strategy's ``run``.

        Raises:
            RunnerError: If a subprocess sends an unknown message type, exits with a non-zero
                exit code, or exits without ever reporting a final state.

        On return `final_state` is `FAILED` if a strategy reported failure, else `COMPLETED`.
        """
        # Imported here rather than at module level, as in the original code.
        from .strategies import AlgorithmStrategy
        B2INFO(f"SequentialAlgorithmsRunner begun for Calibration {self.name}.")
        # First we do the setup of algorithm strategies
        strategies = self._setup_strategies()

        # We then fork off a copy of this python process so that we don't affect the original with logging changes
        ctx = multiprocessing.get_context("fork")
        for strategy in strategies:
            algorithm_name = strategy.algorithm.name
            # Build the queue from the same context as the Process so that both ends are
            # guaranteed to belong to one start method.
            queue = ctx.SimpleQueue()
            child = ctx.Process(target=SeqAlgorithmsRunner._run_strategy,
                                args=(strategy, iov, iteration, queue))

            self.results[algorithm_name] = []
            B2INFO(f"Starting subprocess of AlgorithmStrategy for {algorithm_name}.")
            B2INFO("Logging will be diverted into algorithm output.")
            child.start()

            B2INFO(f"Collecting results for {algorithm_name}.")
            final_state = self._collect_results(child, queue, algorithm_name)

            # Exit early and don't continue strategies as this one failed
            if final_state == AlgorithmStrategy.FAILED:
                B2ERROR(f"AlgorithmStrategy for {algorithm_name} failed. We wil not proceed with any more algorithms")
                self.final_state = self.FAILED
                break

            B2DEBUG(29, f"Finished subprocess of AlgorithmStrategy for {algorithm_name}")

        if self.final_state != self.FAILED:
            B2INFO(f"SequentialAlgorithmsRunner finished for Calibration {self.name}")
            self.final_state = self.COMPLETED

    def _setup_strategies(self):
        """Create and configure one strategy object per entry of `algorithms`, in order."""
        strategies = []
        for algorithm in self.algorithms:
            # Need to create an instance of the requested strategy and set the attributes
            strategy = algorithm.strategy(algorithm)
            # Now add all the necessary parameters for a strategy to run
            strategy.setup_from_dict({
                "database_chain": self.database_chain,
                "dependent_databases": self.dependent_databases,
                "output_dir": self.output_dir,
                "output_database_dir": self.output_database_dir,
                "input_files": self.input_files,
                "ignored_runs": self.ignored_runs,
            })
            strategies.append(strategy)
        return strategies

    def _collect_results(self, child, queue, algorithm_name):
        """Drain `queue` into `results` until `child` has ended; return its reported final state."""
        final_state = None
        final_loop = False
        while True:
            # Do we have results?
            while not queue.empty():
                output = queue.get()
                B2DEBUG(29, f"Result from queue was {output}")
                if output["type"] == "result":
                    self.results[algorithm_name].append(output["value"])
                elif output["type"] == "final_state":
                    final_state = output["value"]
                else:
                    raise RunnerError(f"Unknown result output: {output}")

            # Still alive but no results at the moment? Wait before checking again. join() with a
            # timeout returns as soon as the subprocess ends instead of always sleeping in full.
            if child.is_alive():
                child.join(self._POLL_INTERVAL)
                continue

            # Reached a good ending of strategy
            if final_state:
                # Check the exitcode for failed Process()
                if child.exitcode == 0:
                    B2INFO(f"AlgorithStrategy subprocess for {algorithm_name} exited")
                    return final_state
                raise RunnerError(f"Error during subprocess of AlgorithmStrategy for {algorithm_name}")

            # It might be possible that the subprocess has finished but all results weren't
            # gathered yet. Go around once more since all results should be in the queue waiting.
            if final_loop:
                raise RunnerError((f"Strategy for {algorithm_name} "
                                   "exited subprocess but without a final state!"))
            final_loop = True

    @staticmethod
    def _run_strategy(strategy, iov, iteration, queue):
        """Runs the AlgorithmStrategy, which sends back its results through the queue.

        This is the target executed inside the forked subprocess.
        """
        strategy.run(iov, iteration, queue)
        # Get the return codes of the algorithm for the IoVs found by the Process
        B2INFO(f"Finished Strategy for {strategy.algorithm.name}.")
183 
184 
class RunnerError(Exception):
    """
    Root exception type for errors raised by Runners.
    """
runners.AlgorithmsRunner.output_dir
output_dir
Output directory of these algorithms, for logging mostly.
Definition: runners.py:82
runners.AlgorithmsRunner.output_database_dir
output_database_dir
The directory of the local database we use to store algorithm payloads from this execution.
Definition: runners.py:74
runners.Runner.run
def run(self)
Definition: runners.py:15
runners.AlgorithmsRunner.name
name
The name of this runner instance.
Definition: runners.py:66
runners.SeqAlgorithmsRunner.run
def run(self, iov, iteration)
Definition: runners.py:94
runners.AlgorithmsRunner
Definition: runners.py:20
runners.AlgorithmsRunner.__init__
def __init__(self, name)
Definition: runners.py:62
runners.SeqAlgorithmsRunner
Definition: runners.py:85
runners.AlgorithmsRunner.results
results
Algorithm results from each algorithm we execute.
Definition: runners.py:76
runners.Runner
Definition: runners.py:10
runners.AlgorithmsRunner.database_chain
database_chain
User set databases, can be used to apply your own constants and global tags.
Definition: runners.py:70
runners.SeqAlgorithmsRunner._run_strategy
def _run_strategy(strategy, iov, iteration, queue)
Definition: runners.py:178
runners.AlgorithmsRunner.COMPLETED
string COMPLETED
Definition: runners.py:60
runners.AlgorithmsRunner.algorithms
algorithms
The list of algorithms that this runner executes.
Definition: runners.py:80
runners.AlgorithmsRunner.FAILED
string FAILED
Definition: runners.py:59
runners.AlgorithmsRunner.final_state
final_state
Final state of runner.
Definition: runners.py:78
runners.AlgorithmsRunner.dependent_databases
dependent_databases
List of local databases created by previous CAF calibrations/iterations.
Definition: runners.py:72
runners.RunnerError
Definition: runners.py:185
runners.SeqAlgorithmsRunner.__init__
def __init__(self, name)
Definition: runners.py:89
runners.AlgorithmsRunner.input_files
input_files
All of the output files made by the collector job and recovered by the "output_patterns".
Definition: runners.py:68