Belle II Software  release-08-01-10
runners.py
1 #!/usr/bin/env python3
2 
3 # disable doxygen check for this file
4 # @cond
5 
6 
13 
14 from abc import ABC, abstractmethod
15 import time
16 from basf2 import B2DEBUG, B2ERROR, B2INFO
17 import multiprocessing
18 
19 
class Runner(ABC):
    """
    Abstract interface shared by all Runner type objects.

    The class cannot be instantiated directly; a subclass has to override `run` first.
    """

    @abstractmethod
    def run(self):
        """
        Perform the work of this runner. Has no default behaviour and must be overridden.

        Subclasses may extend the signature: `SeqAlgorithmsRunner.run` in this module
        additionally takes ``iov`` and ``iteration`` arguments.
        """
28 
29 
class AlgorithmsRunner(Runner):
    """
    Base class for `AlgorithmsRunner` classes. Defines the necessary information that will be provided to every
    `AlgorithmsRunner` used by the `framework.CAF`

    An `AlgorithmsRunner` will be given a list of `framework.Algorithm` objects defined during the setup of a
    `framework.Calibration` instance. The `AlgorithmsRunner` describes how to run each of the `strategies.AlgorithmStrategy`
    objects. As an example, assume that a single `framework.Calibration` was given a list of two `framework.Algorithm`
    instances to run.

    In this example the chosen :py:meth:`AlgorithmsRunner.run()` is simple and just loops over the list of `caf.framework.Algorithm`
    calling each one's :py:meth:`caf.strategies.AlgorithmStrategy.run()` methods in order.
    Thereby generating a localdb with the only communication between the `strategies.AlgorithmStrategy` instances coming from the
    database payloads being available from one algorithm to the next.

    But you could imagine a more complex situation. The `AlgorithmsRunner` might take the first `framework.Algorithm` and
    call its `AlgorithmStrategy.run` for only the first (exp,run) in the collected data. Then it might not commit the payloads
    to a localdb but instead pass some calculated values to the next algorithm to run on the same IoV. Then it might go back
    and re-run the first AlgorithmStrategy with new information and commit payloads this time. Then move onto the next IoV.

    Hopefully you can see that while the default provided `AlgorithmsRunner` and `AlgorithmStrategy` classes should be good for
    most situations, you have a lot of freedom to define your own strategy if needed. By following the basic requirements for the
    interface to the `framework.CAF` you can easily plugin a different special case, or mix and match a custom class with
    default CAF ones.

    The run(self) method should be defined for every derived `AlgorithmsRunner`. It will be called once and only once for each
    iteration of (collector -> algorithm).

    Input files are automatically given via the `framework.Calibration.output_patterns` which constructs
    a list of all files in the collector output directories that match the output_patterns. If you have multiple types of
    output data it is your job to filter through the input files and assign them correctly.

    A list of local database paths are given to the `AlgorithmsRunner` based on the `framework.Calibration` dependencies and
    any overall database chain given to the Calibration before running.
    By default you can call the "setup_algorithm" transition of the `caf.state_machines.AlgorithmMachine` to automatically
    set a database chain based on this list.
    But you have freedom to not call this at all in `run`, or to implement a different method to deal with this.
    """

    # Value of `final_state` once an algorithm strategy has failed
    FAILED = "FAILED"
    # Value of `final_state` once every algorithm strategy has been run without failure
    COMPLETED = "COMPLETED"

    def __init__(self, name):
        """
        Store the name and give every attribute that is handed to the strategies an empty default.

        The attributes are expected to be filled in by whoever creates the runner before `run` is called.

        Parameters:
            name (str): Name of this runner; `SeqAlgorithmsRunner` reports it as the Calibration name in its log messages.
        """
        # Name of this runner instance
        self.name = name
        # Input files forwarded to each algorithm strategy
        self.input_files = []
        # Database chain forwarded to each algorithm strategy
        self.database_chain = []
        # Dependent (local) databases forwarded to each algorithm strategy
        self.dependent_databases = []
        # Directory of the output database, forwarded to each algorithm strategy
        self.output_database_dir = ""
        # Results sent back by the strategies, keyed by algorithm name
        self.results = {}
        # Either None (not finished), `FAILED` or `COMPLETED`
        self.final_state = None
        # The algorithms this runner executes; must be set before `run` is called
        self.algorithms = None
        # Output directory forwarded to each algorithm strategy
        self.output_dir = ""
        # Runs to be ignored, forwarded to each algorithm strategy. `SeqAlgorithmsRunner.run` reads this
        # attribute, so it needs a default to avoid an AttributeError when the caller never assigns it.
        # NOTE(review): an empty list mirrors the other list attributes -- confirm that the strategies
        # treat it as "ignore nothing".
        self.ignored_runs = []
93 
94 
class SeqAlgorithmsRunner(AlgorithmsRunner):
    """
    Runner that executes the algorithm strategies one after another, each inside its own forked subprocess.

    The results each strategy sends back are stored in `results` under the algorithm name, and `final_state`
    is set to `FAILED` or `COMPLETED` once `run` returns normally.
    """

    # Upper limit in seconds on how long we wait for a running subprocess before draining its queue again
    _POLL_INTERVAL = 5

    def __init__(self, name):
        """
        Parameters:
            name (str): Passed unchanged to `AlgorithmsRunner.__init__`.
        """
        super().__init__(name)

    def run(self, iov, iteration):
        """
        Run the strategy of every algorithm in `algorithms` sequentially and collect what they send back.

        Execution stops at the first strategy reporting `AlgorithmStrategy.FAILED`; the remaining
        algorithms are then skipped and `final_state` becomes `FAILED`.

        Parameters:
            iov: Forwarded unchanged to each strategy's ``run``.
            iteration: Forwarded unchanged to each strategy's ``run``.

        Raises:
            RunnerError: If a subprocess sends an unknown message type, exits with a non-zero exit code,
                or exits without ever sending a final state.
        """
        from caf.strategies import AlgorithmStrategy
        B2INFO(f"SequentialAlgorithmsRunner begun for Calibration {self.name}.")
        # First we do the setup of algorithm strategies
        strategies = self._setup_strategies()

        # We then fork off a copy of this python process so that we don't affect the original with logging changes
        ctx = multiprocessing.get_context("fork")
        for strategy in strategies:
            # The queue has to come from the same context as the process that is going to use it
            queue = ctx.SimpleQueue()
            child = ctx.Process(target=SeqAlgorithmsRunner._run_strategy,
                                args=(strategy, iov, iteration, queue))

            self.results[strategy.algorithm.name] = []
            B2INFO(f"Starting subprocess of AlgorithmStrategy for {strategy.algorithm.name}.")
            B2INFO("Logging will be diverted into algorithm output.")
            child.start()

            B2INFO(f"Collecting results for {strategy.algorithm.name}.")
            final_state = self._collect_results(strategy, child, queue)

            # Exit early and don't continue strategies as this one failed
            if final_state == AlgorithmStrategy.FAILED:
                B2ERROR(f"AlgorithmStrategy for {strategy.algorithm.name} failed. We wil not proceed with any more algorithms")
                self.final_state = self.FAILED
                break

            B2DEBUG(29, f"Finished subprocess of AlgorithmStrategy for {strategy.algorithm.name}")

        if self.final_state != self.FAILED:
            B2INFO(f"SequentialAlgorithmsRunner finished for Calibration {self.name}")
            self.final_state = self.COMPLETED

    def _setup_strategies(self):
        """Create one configured strategy instance per algorithm, in the order of `algorithms`."""
        strategies = []
        for algorithm in self.algorithms:
            # Need to create an instance of the requested strategy and set the attributes
            strategy = algorithm.strategy(algorithm)
            # Now add all the necessary parameters for a strategy to run
            strategy.setup_from_dict({
                "database_chain": self.database_chain,
                "dependent_databases": self.dependent_databases,
                "output_dir": self.output_dir,
                "output_database_dir": self.output_database_dir,
                "input_files": self.input_files,
                "ignored_runs": self.ignored_runs,
            })
            strategies.append(strategy)
        return strategies

    def _collect_results(self, strategy, child, queue):
        """Drain ``queue`` into `results` until ``child`` has exited, then return the final state it sent."""
        algorithm_name = strategy.algorithm.name
        final_state = None
        final_loop = False
        while True:
            # Do we have results?
            while not queue.empty():
                output = queue.get()
                B2DEBUG(29, f"Result from queue was {output}")
                if output["type"] == "result":
                    self.results[algorithm_name].append(output["value"])
                elif output["type"] == "final_state":
                    final_state = output["value"]
                else:
                    raise RunnerError(f"Unknown result output: {output}")

            # Still alive but no results at the moment? Wait before checking again. Unlike a plain sleep,
            # join() returns as soon as the subprocess terminates, so a finished strategy doesn't cost
            # us a full polling interval.
            if child.is_alive():
                child.join(self._POLL_INTERVAL)
                continue

            # Reached a good ending of strategy
            if final_state:
                # Check the exitcode for failed Process()
                if child.exitcode == 0:
                    B2INFO(f"AlgorithmStrategy subprocess for {algorithm_name} exited")
                    return final_state
                raise RunnerError(f"Error during subprocess of AlgorithmStrategy for {algorithm_name}")

            # It might be possible that the subprocess has finished but all results weren't gathered yet.
            if final_loop:
                raise RunnerError(f"Strategy for {algorithm_name} "
                                  "exited subprocess but without a final state!")
            # Go around once more since all results should be in the queue waiting
            final_loop = True

    @staticmethod
    def _run_strategy(strategy, iov, iteration, queue):
        """
        Entry point of the subprocess: runs the AlgorithmStrategy, which sends its results back via ``queue``.

        Parameters:
            strategy: The configured strategy instance to run.
            iov: Forwarded unchanged to ``strategy.run``.
            iteration: Forwarded unchanged to ``strategy.run``.
            queue: Queue handed to ``strategy.run`` for reporting back to the parent process.
        """
        strategy.run(iov, iteration, queue)
        # Get the return codes of the algorithm for the IoVs found by the Process
        B2INFO(f"Finished Strategy for {strategy.algorithm.name}.")
193 
194 
class RunnerError(Exception):
    """
    Exception raised by the Runner classes of this module.

    Serves as the common base for any Runner specific error.
    """
199 
200 # @endcond