Source code for caf.runners

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

##########################################################################
# basf2 (Belle II Analysis Software Framework)                           #
# Author: The Belle II Collaboration                                     #
#                                                                        #
# See git log for contributors and copyright holders.                    #
# This file is licensed under LGPL-3.0, see LICENSE.md.                  #
##########################################################################

from abc import ABC, abstractmethod
import time
from basf2 import B2DEBUG, B2ERROR, B2INFO
import multiprocessing


class Runner(ABC):
    """
    Abstract Base Class for Runner type object.

    A concrete runner must override :py:meth:`run`; the class cannot be
    instantiated until it does.
    """

    @abstractmethod
    def run(self):
        """
        Execute the work this runner is responsible for.

        Must be implemented by every concrete subclass. Subclasses may extend
        the signature with the arguments they need (for example
        `SeqAlgorithmsRunner.run` takes ``iov`` and ``iteration``).
        """
class AlgorithmsRunner(Runner):
    """
    Base class for `AlgorithmsRunner` classes. Defines the necessary information that will be provided to every
    `AlgorithmsRunner` used by the `framework.CAF`

    An `AlgorithmsRunner` will be given a list of `framework.Algorithm` objects defined during the setup of a
    `framework.Calibration` instance. The `AlgorithmsRunner` describes how to run each of the
    `strategies.AlgorithmStrategy` objects. As an example, assume that a single `framework.Calibration` was
    given a list of two `framework.Algorithm` instances to run.

    In this example the chosen :py:meth:`AlgorithmsRunner.run()` is simple and just loops over the list of
    `caf.framework.Algorithm` calling each one's :py:meth:`caf.strategies.AlgorithmStrategy.run()` methods in order.
    Thereby generating a localdb with the only communication between the `strategies.AlgorithmStrategy` instances
    coming from the database payloads being available from one algorithm to the next.

    But you could imagine a more complex situation. The `AlgorithmsRunner` might take the first `framework.Algorithm`
    and call its `AlgorithmStrategy.run` for only the first (exp,run) in the collected data. Then it might not commit
    the payloads to a localdb but instead pass some calculated values to the next algorithm to run on the same IoV.
    Then it might go back and re-run the first AlgorithmStrategy with new information and commit payloads this time.
    Then move onto the next IoV.

    Hopefully you can see that while the default provided `AlgorithmsRunner` and `AlgorithmStrategy` classes should be
    good for most situations, you have a lot of freedom to define your own strategy if needed. By following the basic
    requirements for the interface to the `framework.CAF` you can easily plugin a different special case, or mix and
    match a custom class with default CAF ones.

    The run(self) method should be defined for every derived `AlgorithmsRunner`. It will be called once and only once
    for each iteration of (collector -> algorithm).

    Input files are automatically given via the `framework.Calibration.output_patterns` which constructs
    a list of all files in the collector output directories that match the output_patterns. If you have multiple types
    of output data it is your job to filter through the input files and assign them correctly.

    A list of local database paths are given to the `AlgorithmsRunner` based on the `framework.Calibration`
    dependencies and any overall database chain given to the Calibration before running.
    By default you can call the "setup_algorithm" transition of the `caf.state_machines.AlgorithmMachine` to
    automatically set a database chain based on this list.
    But you have freedom to not call this at all in `run`, or to implement a different method to deal with this.
    """

    #: Value of `final_state` when the runner stopped because an algorithm failed
    FAILED = "FAILED"
    #: Value of `final_state` when the runner finished all of its algorithms
    COMPLETED = "COMPLETED"

    def __init__(self, name):
        """
        Create a runner with empty inputs; the attributes below are expected
        to be filled in by the caller before ``run`` is invoked.

        Parameters:
            name: The name of this runner instance (used in log messages).
        """
        #: The name of this runner instance
        self.name = name
        #: All of the output files made by the collector job and recovered by the "output_patterns"
        self.input_files = []
        #: User set databases, can be used to apply your own constants and global tags
        self.database_chain = []
        #: List of local databases created by previous CAF calibrations/iterations
        self.dependent_databases = []
        #: The directory of the local database we use to store algorithm payloads from this execution
        self.output_database_dir = ""
        #: Algorithm results from each algorithm we execute
        self.results = {}
        #: Final state of runner
        self.final_state = None
        #: The list of algorithms that this runner executes
        self.algorithms = None
        #: Output directory of these algorithms, for logging mostly
        self.output_dir = ""
        #: Runs handed to every strategy as "ignored_runs". Defined here so that `SeqAlgorithmsRunner.run`
        #: does not fail with AttributeError when the caller never assigned it.
        self.ignored_runs = []
class SeqAlgorithmsRunner(AlgorithmsRunner):
    """
    Runner that executes the strategy of each algorithm one after another.

    Every strategy is run in its own forked subprocess and reports back to
    this process through a queue. The runner stops at the first strategy that
    reports a failed final state.
    """

    #: Maximum number of seconds to wait for the strategy subprocess between two polls of its result queue
    result_poll_interval = 5

    def __init__(self, name):
        """
        Parameters:
            name: The name of this runner instance, forwarded to `AlgorithmsRunner`.
        """
        super().__init__(name)

    def run(self, iov, iteration):
        """
        Run the strategy of every algorithm in ``self.algorithms``, in order.

        Results sent back by a strategy are appended to
        ``self.results[<algorithm name>]``. On return ``self.final_state`` is
        ``FAILED`` if a strategy reported `AlgorithmStrategy.FAILED` (later
        strategies are then skipped), otherwise ``COMPLETED``.

        Parameters:
            iov: Forwarded unchanged to each strategy's ``run``.
            iteration: Forwarded unchanged to each strategy's ``run``.

        Raises:
            RunnerError: If a subprocess exits with a non-zero exit code, exits
                without sending a final state, or sends an unknown message type.
        """
        # Imported here rather than at module level, as in the original code
        from caf.strategies import AlgorithmStrategy
        B2INFO(f"SequentialAlgorithmsRunner begun for Calibration {self.name}.")
        # Forget the outcome of any earlier run() on this instance, otherwise a stale FAILED
        # would prevent COMPLETED from ever being set below.
        self.final_state = None
        # First we do the setup of algorithm strategies
        strategies = self._setup_strategies()

        # We then fork off a copy of this python process so that we don't affect the original with logging changes
        ctx = multiprocessing.get_context("fork")
        for strategy in strategies:
            algorithm_name = strategy.algorithm.name
            # Create the queue from the same context as the process so the two always match
            queue = ctx.SimpleQueue()
            child = ctx.Process(target=SeqAlgorithmsRunner._run_strategy,
                                args=(strategy, iov, iteration, queue))

            self.results[algorithm_name] = []
            B2INFO(f"Starting subprocess of AlgorithmStrategy for {algorithm_name}.")
            B2INFO("Logging will be diverted into algorithm output.")
            child.start()

            B2INFO(f"Collecting results for {algorithm_name}.")
            final_state = self._collect_results(algorithm_name, child, queue)

            # Exit early and don't continue strategies as this one failed
            if final_state == AlgorithmStrategy.FAILED:
                B2ERROR(f"AlgorithmStrategy for {algorithm_name} failed. We wil not proceed with any more algorithms")
                self.final_state = self.FAILED
                break

            B2DEBUG(29, f"Finished subprocess of AlgorithmStrategy for {algorithm_name}")

        if self.final_state != self.FAILED:
            B2INFO(f"SequentialAlgorithmsRunner finished for Calibration {self.name}")
            self.final_state = self.COMPLETED

    def _setup_strategies(self):
        """Create and configure one strategy instance per algorithm, preserving the algorithm order."""
        strategies = []
        for algorithm in self.algorithms:
            # Need to create an instance of the requested strategy and set the attributes
            strategy = algorithm.strategy(algorithm)
            # Now add all the necessary parameters for a strategy to run.
            # "ignored_runs" is read from this runner, so it must have been assigned before run() is called.
            strategy_params = {
                "database_chain": self.database_chain,
                "dependent_databases": self.dependent_databases,
                "output_dir": self.output_dir,
                "output_database_dir": self.output_database_dir,
                "input_files": self.input_files,
                "ignored_runs": self.ignored_runs,
            }
            strategy.setup_from_dict(strategy_params)
            strategies.append(strategy)
        return strategies

    def _collect_results(self, algorithm_name, child, queue):
        """Drain ``queue`` until ``child`` has exited and return the final state it reported."""
        final_state = None
        final_loop = False
        while True:
            # Do we have results?
            while not queue.empty():
                output = queue.get()
                B2DEBUG(29, f"Result from queue was {output}")
                if output["type"] == "result":
                    self.results[algorithm_name].append(output["value"])
                elif output["type"] == "final_state":
                    final_state = output["value"]
                else:
                    raise RunnerError(f"Unknown result output: {output}")

            # Still alive but no results at the moment? Wait before checking again.
            # join() with a timeout returns as soon as the child exits, so we never wait
            # longer than necessary, and we still come back regularly to drain the queue.
            if child.is_alive():
                child.join(timeout=self.result_poll_interval)
                continue

            # Reached a good ending of strategy
            if final_state:
                # Check the exitcode for failed Process()
                if child.exitcode == 0:
                    B2INFO(f"AlgorithmStrategy subprocess for {algorithm_name} exited")
                    return final_state
                raise RunnerError(f"Error during subprocess of AlgorithmStrategy for {algorithm_name}")

            # It might be possible that the subprocess has finished but all results weren't gathered yet.
            if final_loop:
                raise RunnerError((f"Strategy for {algorithm_name} "
                                   "exited subprocess but without a final state!"))
            # Go around once more since all results should be in the queue waiting
            final_loop = True

    @staticmethod
    def _run_strategy(strategy, iov, iteration, queue):
        """Runs the AlgorithmStrategy sends back the results

        This is the target of the forked subprocess; ``queue`` is handed to
        ``strategy.run`` so it can report back to the parent process.
        """
        strategy.run(iov, iteration, queue)
        # Get the return codes of the algorithm for the IoVs found by the Process
        B2INFO(f"Finished Strategy for {strategy.algorithm.name}.")
class RunnerError(Exception):
    """
    Base exception class for Runners.

    Raised by `SeqAlgorithmsRunner` when a strategy subprocess misbehaves:
    it exits with a non-zero exit code, exits without reporting a final state,
    or sends a message of unknown type.
    """