Belle II Software  release-06-02-00
runners.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 
11 
12 from abc import ABC, abstractmethod
13 import time
14 from basf2 import B2DEBUG, B2ERROR, B2INFO
15 import multiprocessing
16 
17 
class Runner(ABC):
    """
    Abstract interface shared by all Runner type objects.

    A concrete runner must provide its own implementation of `run`.
    """

    @abstractmethod
    def run(self):
        """
        Execute the runner. Abstract here: subclasses are required to override it.
        """
26 
27 
class AlgorithmsRunner(Runner):
    """
    Base class for `AlgorithmsRunner` classes. Defines the necessary information that will be provided to every
    `AlgorithmsRunner` used by the `framework.CAF`

    An `AlgorithmsRunner` will be given a list of `framework.Algorithm` objects defined during the setup of a
    `framework.Calibration` instance. The `AlgorithmsRunner` describes how to run each of the `strategies.AlgorithmStrategy`
    objects. As an example, assume that a single `framework.Calibration` was given a list of two `framework.Algorithm`
    instances to run.

    In this example the chosen :py:meth:`AlgorithmsRunner.run()` is simple and just loops over the list of
    `caf.framework.Algorithm` calling each one's :py:meth:`caf.strategies.AlgorithmStrategy.run()` methods in order.
    Thereby generating a localdb with the only communication between the `strategies.AlgorithmStrategy` instances coming
    from the database payloads being available from one algorithm to the next.

    But you could imagine a more complex situation. The `AlgorithmsRunner` might take the first `framework.Algorithm` and
    call its `AlgorithmStrategy.run` for only the first (exp,run) in the collected data. Then it might not commit the payloads
    to a localdb but instead pass some calculated values to the next algorithm to run on the same IoV. Then it might go back
    and re-run the first AlgorithmStrategy with new information and commit payloads this time. Then move onto the next IoV.

    Hopefully you can see that while the default provided `AlgorithmsRunner` and `AlgorithmStrategy` classes should be good
    for most situations, you have a lot of freedom to define your own strategy if needed. By following the basic requirements
    for the interface to the `framework.CAF` you can easily plug in a different special case, or mix and match a custom class
    with default CAF ones.

    The run(self) method should be defined for every derived `AlgorithmsRunner`. It will be called once and only once for each
    iteration of (collector -> algorithm).

    Input files are automatically given via the `framework.Calibration.output_patterns` which constructs
    a list of all files in the collector output directories that match the output_patterns. If you have multiple types of
    output data it is your job to filter through the input files and assign them correctly.

    A list of local database paths are given to the `AlgorithmsRunner` based on the `framework.Calibration` dependencies and
    any overall database chain given to the Calibration before running.
    By default you can call the "setup_algorithm" transition of the `caf.state_machines.AlgorithmMachine` to automatically
    set a database chain based on this list.
    But you have freedom to not call this at all in `run`, or to implement a different method to deal with this.
    """

    #: Value of `final_state` used when an algorithm strategy failed
    FAILED = "FAILED"
    #: Value of `final_state` used when the runner finished without a failure
    COMPLETED = "COMPLETED"

    def __init__(self, name):
        """
        Initialise every attribute of the runner to an empty default.

        Apart from ``name`` the attributes are expected to be filled in by the caller before `run` is invoked.

        Parameters:
            name: The name of this runner instance.
        """
        #: The name of this runner instance
        self.name = name
        #: All of the output files made by the collector job and recovered by the "output_patterns"
        self.input_files = []
        #: User set databases, can be used to apply your own constants and global tags
        self.database_chain = []
        #: List of local databases created by previous CAF calibrations/iterations
        self.dependent_databases = []
        #: The directory of the local database we use to store algorithm payloads from this execution
        self.output_database_dir = ""
        #: Algorithm results from each algorithm we execute
        self.results = {}
        #: Final state of runner
        self.final_state = None
        #: The list of algorithms that this runner executes
        self.algorithms = None
        #: Output directory of these algorithms, for logging mostly
        self.output_dir = ""
        #: Runs handed to each strategy as "ignored_runs". Given a default so that reading it never raises
        #: AttributeError; an empty list is assumed to mean "ignore nothing" -- TODO confirm against the strategies.
        self.ignored_runs = []
91 
92 
94  """
95  """
96 
97  def __init__(self, name):
98  """
99  """
100  super().__init__(name)
101 
102  def run(self, iov, iteration):
103  """
104  """
105  from caf.strategies import AlgorithmStrategy
106  B2INFO(f"SequentialAlgorithmsRunner begun for Calibration {self.name}.")
107  # First we do the setup of algorithm strategies
108  strategies = []
109  for algorithm in self.algorithmsalgorithms:
110  # Need to create an instance of the requested strategy and set the attributes
111  strategy = algorithm.strategy(algorithm)
112  # Now add all the necessary parameters for a strategy to run
113  strategy_params = {}
114  strategy_params["database_chain"] = self.database_chaindatabase_chain
115  strategy_params["dependent_databases"] = self.dependent_databasesdependent_databases
116  strategy_params["output_dir"] = self.output_diroutput_dir
117  strategy_params["output_database_dir"] = self.output_database_diroutput_database_dir
118  strategy_params["input_files"] = self.input_filesinput_files
119  strategy_params["ignored_runs"] = self.ignored_runs
120  strategy.setup_from_dict(strategy_params)
121  strategies.append(strategy)
122 
123  # We then fork off a copy of this python process so that we don't affect the original with logging changes
124  ctx = multiprocessing.get_context("fork")
125  for strategy in strategies:
126  queue = multiprocessing.SimpleQueue()
127  child = ctx.Process(target=SeqAlgorithmsRunner._run_strategy,
128  args=(strategy, iov, iteration, queue))
129 
130  self.resultsresults[strategy.algorithm.name] = []
131  B2INFO(f"Starting subprocess of AlgorithmStrategy for {strategy.algorithm.name}.")
132  B2INFO("Logging will be diverted into algorithm output.")
133  child.start()
134  final_state = None
135  final_loop = False
136 
137  B2INFO(f"Collecting results for {strategy.algorithm.name}.")
138  while True:
139  # Do we have results?
140  while not queue.empty():
141  output = queue.get()
142  B2DEBUG(29, f"Result from queue was {output}")
143  if output["type"] == "result":
144  self.resultsresults[strategy.algorithm.name].append(output["value"])
145  elif output["type"] == "final_state":
146  final_state = output["value"]
147  else:
148  raise RunnerError(f"Unknown result output: {output}")
149 
150  # Still alive but not results at the moment? Wait a few seconds before checking.
151  if child.is_alive():
152  time.sleep(5)
153  continue
154  else:
155  # Reached a good ending of strategy
156  if final_state:
157  # Check the exitcode for failed Process()
158  if child.exitcode == 0:
159  B2INFO(f"AlgorithmStrategy subprocess for {strategy.algorithm.name} exited")
160  break
161  else:
162  raise RunnerError(f"Error during subprocess of AlgorithmStrategy for {strategy.algorithm.name}")
163  # It might be possible that the subprocess has finished but all results weren't gathered yet.
164  else:
165  # Go around once more since all results should be in the queue waiting
166  if not final_loop:
167  final_loop = True
168  continue
169  else:
170  raise RunnerError((f"Strategy for {strategy.algorithm.name} "
171  "exited subprocess but without a final state!"))
172 
173  # Exit early and don't continue strategies as this one failed
174  if final_state == AlgorithmStrategy.FAILED:
175  B2ERROR(f"AlgorithmStrategy for {strategy.algorithm.name} failed. We wil not proceed with any more algorithms")
176  self.final_statefinal_statefinal_state = self.FAILEDFAILED
177  break
178 
179  B2DEBUG(29, f"Finished subprocess of AlgorithmStrategy for {strategy.algorithm.name}")
180 
181  if self.final_statefinal_statefinal_state != self.FAILEDFAILED:
182  B2INFO(f"SequentialAlgorithmsRunner finished for Calibration {self.name}")
183  self.final_statefinal_statefinal_state = self.COMPLETEDCOMPLETED
184 
185  @staticmethod
186  def _run_strategy(strategy, iov, iteration, queue):
187  """Runs the AlgorithmStrategy sends back the results"""
188  strategy.run(iov, iteration, queue)
189  # Get the return codes of the algorithm for the IoVs found by the Process
190  B2INFO(f"Finished Strategy for {strategy.algorithm.name}.")
191 
192 
class RunnerError(Exception):
    """
    Base class of the exceptions raised by Runners.
    """
output_database_dir
The directory of the local database we use to store algorithm payloads from this execution.
Definition: runners.py:82
input_files
All of the output files made by the collector job and recovered by the "output_patterns".
Definition: runners.py:76
results
Algorithm results from each algorithm we execute.
Definition: runners.py:84
final_state
Final state of runner.
Definition: runners.py:86
database_chain
User set databases, can be used to apply your own constants and global tags.
Definition: runners.py:78
name
The name of this runner instance.
Definition: runners.py:74
def __init__(self, name)
Definition: runners.py:70
dependent_databases
List of local databases created by previous CAF calibrations/iterations.
Definition: runners.py:80
algorithms
The list of algorithms that this runner executes.
Definition: runners.py:88
output_dir
Output directory of these algorithms, for logging mostly.
Definition: runners.py:90
def run(self)
Definition: runners.py:23
def run(self, iov, iteration)
Definition: runners.py:102
def _run_strategy(strategy, iov, iteration, queue)
Definition: runners.py:186
def __init__(self, name)
Definition: runners.py:97