14 from abc 
import ABC, abstractmethod
 
   16 from basf2 
import B2DEBUG, B2ERROR, B2INFO
 
   17 import multiprocessing
 
   22     Abstract Base Class for Runner type object. 
   30 class AlgorithmsRunner(Runner):
 
   32     Base class for `AlgorithmsRunner` classes. Defines the necessary information that will be provided to every 
   33     `AlgorithmsRunner` used by the `framework.CAF` 
   35     An `AlgorithmsRunner` will be given a list of `framework.Algorithm` objects defined during the setup of a 
   36     `framework.Calibration` instance. The `AlgorithmsRunner` describes how to run each of the `strategies.AlgorithmStrategy` 
   37     objects. As an example, assume that a single `framework.Calibration` was given and a list of two `framework.Algorithm` 
   40     In this example the chosen :py:meth:`AlgorithmsRunner.run()` is simple and just loops over the list of `caf.framework.Algorithm` 
   41     calling each one's :py:meth:`caf.strategies.AlgorithmStrategy.run()` methods in order. 
   42     Thereby generating a localdb with the only communication between the `strategies.AlgorithmStrategy` instances coming from the 
   43     database payloads being available from one algorithm to the next. 
   45     But you could imagine a more complex situation. The `AlgorithmsRunner` might take the first `framework.Algorithm` and 
   46     call its `AlgorithmStrategy.run` for only the first (exp,run) in the collected data. Then it might not commit the payloads 
   47     to a localdb but instead pass some calculated values to the next algorithm to run on the same IoV. Then it might go back 
   48     and re-run the first AlgorithmStrategy with new information and commit payloads this time. Then move onto the next IoV. 
   50     Hopefully you can see that while the default provided `AlgorithmsRunner` and `AlgorithmStrategy` classes should be good for 
   51     most situations, you have a lot of freedom to define your own strategy if needed. By following the basic requirements for the 
   52     interface to the `framework.CAF` you can easily plug in a different special case, or mix and match a custom class with 
   55     The run(self) method should be defined for every derived `AlgorithmsRunner`. It will be called once and only once for each 
   56     iteration of (collector -> algorithm). 
   58     Input files are automatically given via the `framework.Calibration.output_patterns` which constructs 
   59     a list of all files in the collector output directories that match the output_patterns. If you have multiple types of 
   60     output data it is your job to filter through the input files and assign them correctly. 
   62     A list of local database paths is given to the `AlgorithmsRunner` based on the `framework.Calibration` dependencies and 
   63     any overall database chain given to the Calibration before running. 
   64     By default you can call the "setup_algorithm" transition of the `caf.state_machines.AlgorithmMachine` to automatically 
   65     set a database chain based on this list. 
   66     But you have freedom to not call this at all in `run`, or to implement a different method to deal with this. 
   70     COMPLETED = 
"COMPLETED" 
   72     def __init__(self, name):
        # Initialise the attributes shared by every AlgorithmsRunner.
        # NOTE(review): this listing is lossy - the embedded line numbers skip 73-79,
        # 85-87 and everything after 90. SeqAlgorithmsRunner.run() reads self.name,
        # self.input_files, self.output_dir and self.results, none of which is assigned
        # in the visible lines; presumably they are set on the skipped lines - confirm
        # against the complete file.
 
        # Forwarded to each strategy as strategy_params["database_chain"] in run().
   80         self.database_chain = []
 
        # Forwarded to each strategy as strategy_params["dependent_databases"] in run().
   82         self.dependent_databases = []
 
        # Forwarded to each strategy as strategy_params["output_database_dir"] in run().
   84         self.output_database_dir = 
"" 
        # Set by run() to self.FAILED or self.COMPLETED; None until then.
   88         self.final_state = 
None 
        # Iterated over by run(); must be replaced by an iterable before run() is called.
   90         self.algorithms = 
None 
   95 class SeqAlgorithmsRunner(AlgorithmsRunner):
 
   99     def __init__(self, name):
 
  102         super().__init__(name)
 
  104     def run(self, iov, iteration):
        # Visible flow: build one strategy per entry of self.algorithms and configure it
        # with setup_from_dict(); then, for each strategy, create a "fork"-context Process
        # targeting _run_strategy (passing iov, iteration and a SimpleQueue) and read dict
        # messages from that queue: "result" values are appended to
        # self.results[<algorithm name>], the "final_state" value is kept, any other
        # "type" raises RunnerError. self.final_state ends as self.FAILED or self.COMPLETED.
        # NOTE(review): this listing is lossy - the embedded line numbers skip several
        # source lines (e.g. 114-115, 135-138, 140-141, 143, 149, 151-159), so some
        # statements of the real method are not shown here.
 
  107         from caf.strategies 
import AlgorithmStrategy
 
  108         B2INFO(f
"SequentialAlgorithmsRunner begun for Calibration {self.name}.")
 
  111         for algorithm 
in self.algorithms:
 
            # Each algorithm supplies its own strategy factory, called with the algorithm itself.
  113             strategy = algorithm.strategy(algorithm)
 
            # NOTE(review): `strategy_params` (and `strategies`, appended to below) are used
            # without a visible initialisation - presumably on the skipped lines; confirm.
  116             strategy_params[
"database_chain"] = self.database_chain
 
  117             strategy_params[
"dependent_databases"] = self.dependent_databases
 
  118             strategy_params[
"output_dir"] = self.output_dir
 
  119             strategy_params[
"output_database_dir"] = self.output_database_dir
 
  120             strategy_params[
"input_files"] = self.input_files
 
  121             strategy_params[
"ignored_runs"] = self.ignored_runs
 
  122             strategy.setup_from_dict(strategy_params)
 
  123             strategies.append(strategy)
 
        # Processes are created from an explicit "fork" start-method context.
  126         ctx = multiprocessing.get_context(
"fork")
 
  127         for strategy 
in strategies:
 
            # One fresh queue per strategy; it is handed to the child via `args`.
  128             queue = multiprocessing.SimpleQueue()
 
  129             child = ctx.Process(target=SeqAlgorithmsRunner._run_strategy,
 
  130                                 args=(strategy, iov, iteration, queue))
 
  132             self.results[strategy.algorithm.name] = []
 
  133             B2INFO(f
"Starting subprocess of AlgorithmStrategy for {strategy.algorithm.name}.")
 
  134             B2INFO(
"Logging will be diverted into algorithm output.")
 
            # NOTE(review): no child.start() and no initial value for `final_state`
            # appear in the visible lines - verify against the complete file.
  139             B2INFO(f
"Collecting results for {strategy.algorithm.name}.")
 
  142                 while not queue.empty():
 
                    # NOTE(review): `output` is read below but its assignment is not visible
                    # (embedded line 143 is skipped); presumably it is taken from `queue`.
  144                     B2DEBUG(29, f
"Result from queue was {output}")
 
  145                     if output[
"type"] == 
"result":
 
  146                         self.results[strategy.algorithm.name].append(output[
"value"])
 
  147                     elif output[
"type"] == 
"final_state":
 
  148                         final_state = output[
"value"]
 
  150                         raise RunnerError(f
"Unknown result output: {output}")
 
                        # A zero exit code of the child is logged as a normal exit.
  160                         if child.exitcode == 0:
 
  161                             B2INFO(f
"AlgorithmStrategy subprocess for {strategy.algorithm.name} exited")
 
  164                             raise RunnerError(f
"Error during subprocess of AlgorithmStrategy for {strategy.algorithm.name}")
 
  172                             raise RunnerError(f
"Strategy for {strategy.algorithm.name} " 
  173                                               "exited subprocess but without a final state!")
 
            # A FAILED strategy marks the whole runner as FAILED.
  176             if final_state == AlgorithmStrategy.FAILED:
 
                # NOTE(review): the message below contains the typo "wil"; left untouched
                # here because it is a runtime string.
  177                 B2ERROR(f
"AlgorithmStrategy for {strategy.algorithm.name} failed. We wil not proceed with any more algorithms")
 
  178                 self.final_state = self.FAILED
 
  181             B2DEBUG(29, f
"Finished subprocess of AlgorithmStrategy for {strategy.algorithm.name}")
 
        # Only report COMPLETED when no strategy set the runner to FAILED.
  183         if self.final_state != self.FAILED:
 
  184             B2INFO(f
"SequentialAlgorithmsRunner finished for Calibration {self.name}")
 
  185             self.final_state = self.COMPLETED
 
  188     def _run_strategy(strategy, iov, iteration, queue):
 
  189         """Runs the AlgorithmStrategy sends back the results""" 
  190         strategy.run(iov, iteration, queue)
 
  192         B2INFO(f
"Finished Strategy for {strategy.algorithm.name}.")
 
  195 class RunnerError(Exception):
 
  197     Base exception class for Runners