Source code for caf.strategies

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
from caf.utils import AlgResult
from caf.utils import B2INFO_MULTILINE
from caf.utils import runs_overlapping_iov, runs_from_vector
from caf.utils import iov_from_runs, split_runs_by_exp, vector_from_runs
from caf.utils import find_gaps_in_iov_list, grouper, find_run_lists_from_boundaries
from caf.utils import IoV, ExpRun
from caf.state_machines import AlgorithmMachine

from abc import ABC, abstractmethod
import json


[docs]class AlgorithmStrategy(ABC): """ Base class for Algorithm strategies. These do the actual execution of a single algorithm on collected data. Each strategy may be quite different in terms of how fast it may be, how database payloads are passed between executions, and whether or not final payloads have an IoV that is independent to the actual runs used to calculates them. Parameters: algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run This base class defines the basic attributes and methods that will be automatically used by the selected AlgorithmRunner. When defining a derived class you are free to use these attributes or to implement as much functionality as you want. If you define your derived class with an __init__ method, then you should first call the base class `AlgorithmStrategy.__init__()` method via super() e.g. >>> def __init__(self): >>> super().__init__() The most important method to implement is :py:meth:`AlgorithmStrategy.run` which will take an algorithm and execute it in the required way defined by the options you have selected/attributes set. """ #: Required attributes that must exist before the strategy can run properly. #: Some are allowed be values that return False whe tested e.g. "" or [] required_attrs = ["algorithm", "database_chain", "dependent_databases", "output_dir", "output_database_dir", "input_files", "ignored_runs" ] #: Attributes that must have a value that returns True when tested by :py:meth:`is_valid`. required_true_attrs = ["algorithm", "output_dir", "output_database_dir", "input_files" ] #: Granularity of collector that can be run by this algorithm properly allowed_granularities = ["run", "all"] #: Signal value that is put into the Queue when there are no more results left FINISHED_RESULTS = "DONE" #: Completed state COMPLETED = "COMPLETED" #: Failed state FAILED = "FAILED"
[docs] def __init__(self, algorithm): """ """ #: Algorithm() class that we're running self.algorithm = algorithm #: Collector output files, will contain all files retured by the output patterns self.input_files = [] #: The algorithm output directory which is mostly used to store the stdout file self.output_dir = "" #: The output database directory for the localdb that the algorithm will commit to self.output_database_dir = "" #: User defined database chain i.e. the default global tag, or if you have localdb's/tags for custom alignment etc self.database_chain = [] #: CAF created local databases from previous calibrations that this calibration/algorithm depends on self.dependent_databases = [] #: Runs that will not be included in ANY execution of the algorithm. Usually set by Calibration.ignored_runs. #: The different strategies may handle the resulting run gaps differently. self.ignored_runs = [] #: The list of results objects which will be sent out before the end self.results = [] #: The multiprocessing Queue we use to pass back results one at a time self.queue = None
[docs] @abstractmethod def run(self, iov, iteration, queue): """ Abstract method that needs to be implemented. It will be called to actually execute the algorithm. """
[docs] def setup_from_dict(self, params): """ Parameters: params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name. """ for attribute_name, value in params.items(): setattr(self, attribute_name, value)
[docs] def is_valid(self): """ Returns: bool: Whether or not this strategy has been set up correctly with all its necessary attributes. """ B2INFO("Checking validity of current AlgorithmStrategy setup.") # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init) for attribute_name in self.required_attrs: if not hasattr(self, attribute_name): B2ERROR(f"AlgorithmStrategy attribute {attribute_name} doesn't exist.") return False # Check if any attributes that need actual values haven't been set or were empty for attribute_name in self.required_true_attrs: if not getattr(self, attribute_name): B2ERROR(f"AlgorithmStrategy attribute {attribute_name} returned False.") return False return True
[docs] def find_iov_gaps(self): """ Finds and prints the current gaps between the IoVs of the strategy results. Basically these are the IoVs not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps within the same experiment are found. Returns: iov_gaps(list[IoV]) """ iov_gaps = find_gaps_in_iov_list(sorted([result.iov for result in self.results])) if iov_gaps: gap_msg = ["Found gaps between IoVs of algorithm results (regardless of result)."] gap_msg.append("You may have requested these gaps deliberately by not passing in data containing these runs.") gap_msg.append("This may not be a problem, but you will not have payoads defined for these IoVs") gap_msg.append("unless you edit the final database.txt yourself.") B2INFO_MULTILINE(gap_msg) for iov in iov_gaps: B2INFO(f"{iov} not covered by any execution of the algorithm.") return iov_gaps
[docs] def any_failed_iov(self): """ Returns: bool: If any result in the current results list has a failed algorithm code we return True """ failed_results = [] for result in self.results: if result.result == AlgResult.failure.value or result.result == AlgResult.not_enough_data.value: failed_results.append(result) if failed_results: B2WARNING("Failed results found.") for result in failed_results: if result.result == AlgResult.failure.value: B2ERROR(f"c_Failure returned for {result.iov}.") elif result.result == AlgResult.not_enough_data.value: B2WARNING(f"c_NotEnoughData returned for {result.iov}.") return True else: return False
def send_result(self, result): self.queue.put({"type": "result", "value": result}) def send_final_state(self, state): self.queue.put({"type": "final_state", "value": state})
[docs]class SingleIOV(AlgorithmStrategy): """The fastest and simplest Algorithm strategy. Runs the algorithm only once over all of the input data or only the data corresponding to the requested IoV. The payload IoV is the set to the same as the one that was executed. This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on a CalibrationAlgorithm C++ class directly. """ #: The params that you could set on the Algorithm object which this Strategy would use. #: Just here for documentation reasons. usable_params = {"apply_iov": IoV} def __init__(self, algorithm): """ """ super().__init__(algorithm) #: :py:class:`caf.state_machines.AlgorithmMachine` used to help set up and execute CalibrationAlgorithm #: It gets setup properly in :py:func:`run` self.machine = AlgorithmMachine(self.algorithm)
[docs] def run(self, iov, iteration, queue): """ Runs the algorithm machine over the collected data and fills the results. """ if not self.is_valid(): raise StrategyError("This AlgorithmStrategy was not set up correctly!") self.queue = queue B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.") # Now add all the necessary parameters for a strategy to run machine_params = {} machine_params["database_chain"] = self.database_chain machine_params["dependent_databases"] = self.dependent_databases machine_params["output_dir"] = self.output_dir machine_params["output_database_dir"] = self.output_database_dir machine_params["input_files"] = self.input_files machine_params["ignored_runs"] = self.ignored_runs self.machine.setup_from_dict(machine_params) # Start moving through machine states B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.") self.machine.setup_algorithm(iteration=iteration) # After this point, the logging is in the stdout of the algorithm B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.") all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())) # If we were given a specific IoV to calibrate we just execute all runs in that IoV at once if iov: runs_to_execute = runs_overlapping_iov(iov, all_runs_collected) else: runs_to_execute = all_runs_collected # Remove the ignored runs from our run list to execute if self.ignored_runs: B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.") runs_to_execute.difference_update(set(self.ignored_runs)) # Sets aren't ordered so lets go back to lists and sort runs_to_execute = sorted(runs_to_execute) apply_iov = None if "apply_iov" in self.algorithm.params: apply_iov = self.algorithm.params["apply_iov"] self.machine.execute_runs(runs=runs_to_execute, iteration=iteration, apply_iov=apply_iov) B2INFO(f"Finished execution with result code {self.machine.result.result}.") # Send out the result to the runner self.send_result(self.machine.result) # Make sure the algorithm state and commit is done if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value): # Valid exit codes mean we can complete properly self.machine.complete() # Commit all the payloads and send out the results self.machine.algorithm.algorithm.commit() self.send_final_state(self.COMPLETED) else: # Either there wasn't enough data or the algorithm failed self.machine.fail() self.send_final_state(self.FAILED)
[docs]class SequentialRunByRun(AlgorithmStrategy): """ Algorithm strategy to do run-by-run calibration of collected data. Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only. If the algorithm returns 'not enough data' on the current run set, it won't commit the payloads, but instead adds the next run's data and tries again. Once an execution on a set of runs return 'iterate' or 'ok' we move onto the next runs (if any are left) and start the same procedure again. Committing of payloads to the outputdb only happens once we're sure that there is enough data in the remaining runs to get a full execution. If there isn't enough data remaining, the last runs are merged with the previous successful execution's runs and a final execution is performed on all remaining runs. Additionally this strategy will automatically make sure that IoV gaps in your input data are covered by a payload. This means that there shouldn't be any IoVs that don't get a new payload by the end of runnning an iteration. This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on a CalibrationAlgorithm C++ class directly. """ #: The params that you could set on the Algorithm object which this Strategy would use. #: Just here for documentation reasons. usable_params = { "has_experiment_settings": bool, "iov_coverage": IoV, "step_size": int } #: Granularity of collector that can be run by this algorithm properly allowed_granularities = ["run"] def __init__(self, algorithm): """ """ super().__init__(algorithm) #: :py:class:`caf.state_machines.AlgorithmMachine` used to help set up and execute CalibrationAlgorithm #: It gets setup properly in :py:func:`run` self.machine = AlgorithmMachine(self.algorithm) if "step_size" not in self.algorithm.params: self.algorithm.params["step_size"] = 1 self.first_execution = True
[docs] def apply_experiment_settings(self, algorithm, experiment): """ Apply experiment-dependent settings. This is the default version, which does not do anything. If necessary, it should be reimplemented by derived classes. """ return
[docs] def run(self, iov, iteration, queue): """ Runs the algorithm machine over the collected data and fills the results. """ if not self.is_valid(): raise StrategyError("This AlgorithmStrategy was not set up correctly!") self.queue = queue B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.") # Now add all the necessary parameters for a strategy to run machine_params = {} machine_params["database_chain"] = self.database_chain machine_params["dependent_databases"] = self.dependent_databases machine_params["output_dir"] = self.output_dir machine_params["output_database_dir"] = self.output_database_dir machine_params["input_files"] = self.input_files machine_params["ignored_runs"] = self.ignored_runs self.machine.setup_from_dict(machine_params) # Start moving through machine states self.machine.setup_algorithm(iteration=iteration) # After this point, the logging is in the stdout of the algorithm B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.") runs_to_execute = [] all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()) # If we were given a specific IoV to calibrate we just execute over runs in that IoV if iov: runs_to_execute = runs_overlapping_iov(iov, all_runs_collected) else: runs_to_execute = all_runs_collected[:] # Remove the ignored runs from our run list to execute if self.ignored_runs: B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.") runs_to_execute.difference_update(set(self.ignored_runs)) # Sets aren't ordered so lets go back to lists and sort runs_to_execute = sorted(runs_to_execute) # We don't want to cross the boundary of Experiments accidentally. So we will split our run list # into separate lists, one for each experiment number contained. That way we can evaluate each experiment # separately and prevent IoVs from crossing the boundary. runs_to_execute = split_runs_by_exp(runs_to_execute) # Now iterate through the experiments, executing runs in blocks of 'step_size'. We DO NOT allow a payload IoV to # extend over multiple experiments, only multiple runs iov_coverage = None if "iov_coverage" in self.algorithm.params: B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.") iov_coverage = self.algorithm.params["iov_coverage"] number_of_experiments = len(runs_to_execute) # Iterate over experiment run lists for i_exp, run_list in enumerate(runs_to_execute, start=1): # Apply experiment-dependent settings. if "has_experiment_settings" in self.algorithm.params: if self.algorithm.params["has_experiment_settings"]: self.apply_experiment_settings(self.machine.algorithm.algorithm, run_list[0].exp) # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage. # This is only true for the lowest and highest experiments in our input data. if iov_coverage and i_exp == 1: lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low) else: lowest_exprun = run_list[0] if iov_coverage and i_exp == number_of_experiments: highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high) else: highest_exprun = run_list[-1] self.execute_over_run_list(iteration, run_list, lowest_exprun, highest_exprun) # Print any knowable gaps between result IoVs, if any are foun there is a problem. gaps = self.find_iov_gaps() # Dump them to a file for logging with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f: json.dump(gaps, f) # If any results weren't successes we fail if self.any_failed_iov(): self.send_final_state(self.FAILED) else: self.send_final_state(self.COMPLETED)
def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun): # The runs (data) we have left to execute from this run list remaining_runs = run_list[:] # The previous execution's runs previous_runs = [] # The current runs we are executing current_runs = [] # The last successful payload and result last_successful_payloads = None last_successful_result = None # Iterate over ExpRuns within an experiment in chunks of 'step_size' for expruns in grouper(self.algorithm.params["step_size"], run_list): # Already set up earlier the first time, so we shouldn't do it again if not self.first_execution: self.machine.setup_algorithm() else: self.first_execution = False # Add on the next step of runs current_runs.extend(expruns) # Remove them from our remaining runs remaining_runs = [run for run in remaining_runs if run not in current_runs] # Is this the first payload of the experiment if not last_successful_result: B2INFO("Detected that this will be the first payload of this experiment.") # If this is the first payload but we have other data, we need the IoV to cover from the # lowest IoV extent requested up to the ExpRun right before the next run in the remaining runs list. if remaining_runs: apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1) # If this is the first payload but there isn't more data, we set the IoV to cover the full range else: B2INFO("Detected that this will be the only payload of the experiment.") apply_iov = IoV(*lowest_exprun, *highest_exprun) # If there were previous successes else: if not remaining_runs: B2INFO("Detected that there are no more runs to execute in this experiment after this next execution.") apply_iov = IoV(*current_runs[0], *highest_exprun) # Othewise, it's just a normal IoV in the middle. else: B2INFO("Detected that there are more runs to execute in this experiment after this next execution.") apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1) B2INFO(f"Executing and applying {apply_iov} to the payloads.") self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov) B2INFO(f"Finished execution with result code {self.machine.result.result}.") # Does this count as a successful execution? if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value): self.machine.complete() # If we've succeeded but we have a previous success we can commit the previous payloads # since we have new ones ready if last_successful_payloads and last_successful_result: B2INFO("Saving this execution's payloads to be committed later.") # Save the payloads and result new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues() new_successful_result = self.machine.result B2INFO("We just succeded in execution of the Algorithm." f" Will now commit payloads from the previous success for {last_successful_result.iov}.") self.machine.algorithm.algorithm.commit(last_successful_payloads) self.results.append(last_successful_result) self.send_result(last_successful_result) # If there are remaining runs we need to have the current payloads ready to commit after the next execution if remaining_runs: last_successful_payloads = new_successful_payloads last_successful_result = new_successful_result # If there's not more runs to process we should also commit the new ones else: B2INFO("We have no more runs to process. " f"Will now commit the most recent payloads for {new_successful_result.iov}.") self.machine.algorithm.algorithm.commit(new_successful_payloads) self.results.append(new_successful_result) self.send_result(new_successful_result) break # if there's no previous success this must be the first run executed else: # Need to save payloads for later if we have a success but runs remain if remaining_runs: B2INFO(f"Saving the most recent payloads for {self.machine.result.iov} to be committed later.") # Save the payloads and result last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues() last_successful_result = self.machine.result # Need to commit and exit if we have a success and no remaining data else: B2INFO("We just succeeded in execution of the Algorithm." " No runs left to be processed, so we are committing results of this execution.") self.machine.algorithm.algorithm.commit() self.results.append(self.machine.result) self.send_result(self.machine.result) break previous_runs = current_runs[:] current_runs = [] # If it wasn't successful, was it due to lack of data in the runs? elif (self.machine.result.result == AlgResult.not_enough_data.value): B2INFO(f"There wasn't enough data in {self.machine.result.iov}.") if remaining_runs: B2INFO("Some runs remain to be processed. " f"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.") elif not remaining_runs and not last_successful_result: B2ERROR("There aren't any more runs remaining to merge with, and we never had a previous success." " There wasn't enough data in the full input data requested.") self.results.append(self.machine.result) self.send_result(self.machine.result) self.machine.fail() break elif not remaining_runs and last_successful_result: B2INFO("There aren't any more runs remaining to merge with. But we had a previous success" ", so we'll merge with the previous IoV.") final_runs = current_runs[:] current_runs = previous_runs current_runs.extend(final_runs) self.machine.fail() elif self.machine.result.result == AlgResult.failure.value: B2ERROR(f"{self.algorithm.name} returned failure exit code.") self.results.append(self.machine.result) self.send_result(self.machine.result) self.machine.fail() break else: # Check if we need to run a final execution on the previous execution + dangling set of runs if current_runs: self.machine.setup_algorithm() apply_iov = IoV(last_successful_result.iov.exp_low, last_successful_result.iov.run_low, *highest_exprun) B2INFO(f"Executing on {apply_iov}.") self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov) B2INFO(f"Finished execution with result code {self.machine.result.result}.") if (self.machine.result.result == AlgResult.ok.value) or ( self.machine.result.result == AlgResult.iterate.value): self.machine.complete() # Commit all the payloads and send out the results self.machine.algorithm.algorithm.commit() # Save the result self.results.append(self.machine.result) self.send_result(self.machine.result) else: # Save the result self.results.append(self.machine.result) self.send_result(self.machine.result) # But failed self.machine.fail()
[docs]class SimpleRunByRun(AlgorithmStrategy): """ Algorithm strategy to do run-by-run calibration of collected data. Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only. This strategy differs from `SequentialRunByRun` in that it *will not merge run data* if the algorithm returns 'not enough data' on the current run. Once an execution on a run returns *any* result 'iterate', 'ok', 'not_enough_data', or 'failure', we move onto the next run (if any are left). Committing of payloads to the outputdb only happens for 'iterate' or 'ok' return codes. .. important:: Unlike most other strategies, this one won't immediately fail and return if a run returns a 'failure' exit code. The failure will prevent iteration/successful completion of the CAF though. .. warning:: Since this strategy doesn't try to merge data from runs, if *any* run in your input data doesn't contain enough data to complete the algorithm successfully, you won't be able to get a successful calibration. The CAF then won't allow you to iterate this calibration, or pass the constants onward to another calibration. However, you will still have the database created that covers all the successfull runs. This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on a CalibrationAlgorithm C++ class directly. """ allowed_granularities = ["run"] #: The params that you could set on the Algorithm object which this Strategy would use. #: Just here for documentation reasons. usable_params = {} def __init__(self, algorithm): """ """ super().__init__(algorithm) #: :py:class:`caf.state_machines.AlgorithmMachine` used to help set up and execute CalibrationAlgorithm #: It gets setup properly in :py:func:`run` self.machine = AlgorithmMachine(self.algorithm)
[docs] def run(self, iov, iteration, queue): """ Runs the algorithm machine over the collected data and fills the results. """ if not self.is_valid(): raise StrategyError("This AlgorithmStrategy was not set up correctly!") self.queue = queue B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.") # Now add all the necessary parameters for a strategy to run machine_params = {} machine_params["database_chain"] = self.database_chain machine_params["dependent_databases"] = self.dependent_databases machine_params["output_dir"] = self.output_dir machine_params["output_database_dir"] = self.output_database_dir machine_params["input_files"] = self.input_files machine_params["ignored_runs"] = self.ignored_runs self.machine.setup_from_dict(machine_params) # Start moving through machine states B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.") self.machine.setup_algorithm(iteration=iteration) # After this point, the logging is in the stdout of the algorithm B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.") all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())) # If we were given a specific IoV to calibrate we just execute all runs in that IoV at once if iov: runs_to_execute = runs_overlapping_iov(iov, all_runs_collected) else: runs_to_execute = all_runs_collected # Remove the ignored runs from our run list to execute if self.ignored_runs: B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.") runs_to_execute.difference_update(set(self.ignored_runs)) # Sets aren't ordered so lets go back to lists and sort runs_to_execute = sorted(runs_to_execute) # Is this the first time executing the algorithm? first_execution = True for exprun in runs_to_execute: if not first_execution: self.machine.setup_algorithm() current_runs = exprun apply_iov = iov_from_runs([current_runs]) B2INFO(f"Executing on IoV = {apply_iov}.") self.machine.execute_runs(runs=[current_runs], iteration=iteration, apply_iov=apply_iov) first_execution = False B2INFO(f"Finished execution with result code {self.machine.result.result}.") # Does this count as a successful execution? if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value): # Commit the payloads and result B2INFO(f"Committing payloads for {iov_from_runs([current_runs])}.") self.machine.algorithm.algorithm.commit() self.results.append(self.machine.result) self.send_result(self.machine.result) self.machine.complete() # If it wasn't successful, was it due to lack of data in the runs? elif (self.machine.result.result == AlgResult.not_enough_data.value): B2INFO(f"There wasn't enough data in the IoV {iov_from_runs([current_runs])}.") self.results.append(self.machine.result) self.send_result(self.machine.result) self.machine.fail() elif self.machine.result.result == AlgResult.failure.value: B2ERROR(f"Failure exit code in the IoV {iov_from_runs([current_runs])}.") self.results.append(self.machine.result) self.send_result(self.machine.result) self.machine.fail() # Print any knowable gaps between result IoVs, if any are foun there is a problem. gaps = self.find_iov_gaps() # Dump them to a file for logging with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f: json.dump(gaps, f) self.send_final_state(self.COMPLETED)
[docs]class SequentialBoundaries(AlgorithmStrategy): """ Algorithm strategy to first calculate run boundaries where execution should be attempted. Runs the algorithm over the input data contained within the requested IoV of the boundaries, starting with the first boundary data only. If the algorithm returns 'not enough data' on the current boundary IoV, it won't commit the payloads, but instead adds the next boundarie's data and tries again. Basically the same logic as `SequentialRunByRun` but using run boundaries instead of runs directly. Notice that boundaries cannot span multiple experiments. By default the algorithm will get the payload boundaries directly from the algorithm that need to have inplemented the function ``isBoundaryRequired``. If the desired boundaries are already known it is possible to pass them directly setting the algorithm parameter ``payload_boundaries`` and avoid the need to define the ``isBoundaryRequired`` function. ``payload_boundaries`` is a list ``[(exp1, run1), (exp2, run2), ...]``. A boundary at the beginning of each experiment will be added if not already present. An empty list will thus produce a single payload for each experiment. A ``payload_boundaries`` set to ``None`` is equivalent to not passing it and restores the default behaviour where the boundaries are computed in the ``isBoundaryRequired`` function of the algorithm. """ #: The params that you could set on the Algorithm object which this Strategy would use. #: Just here for documentation reasons. usable_params = { "iov_coverage": IoV, "payload_boundaries": [] # [(exp1, run1), (exp2, run2), ...] } #: Granularity of collector that can be run by this algorithm properly allowed_granularities = ["run"] def __init__(self, algorithm): """ """ super().__init__(algorithm) #: :py:class:`caf.state_machines.AlgorithmMachine` used to help set up and execute CalibrationAlgorithm #: It gets setup properly in :py:func:`run` self.machine = AlgorithmMachine(self.algorithm) self.first_execution = True
[docs] def run(self, iov, iteration, queue): """ Runs the algorithm machine over the collected data and fills the results. """ if not self.is_valid(): raise StrategyError("This AlgorithmStrategy was not set up correctly!") self.queue = queue B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.") # Now add all the necessary parameters for a strategy to run machine_params = {} machine_params["database_chain"] = self.database_chain machine_params["dependent_databases"] = self.dependent_databases machine_params["output_dir"] = self.output_dir machine_params["output_database_dir"] = self.output_database_dir machine_params["input_files"] = self.input_files machine_params["ignored_runs"] = self.ignored_runs self.machine.setup_from_dict(machine_params) # Start moving through machine states self.machine.setup_algorithm(iteration=iteration) # After this point, the logging is in the stdout of the algorithm B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.") runs_to_execute = [] all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()) # If we were given a specific IoV to calibrate we just execute over runs in that IoV if iov: runs_to_execute = runs_overlapping_iov(iov, all_runs_collected) else: runs_to_execute = all_runs_collected[:] # Remove the ignored runs from our run list to execute if self.ignored_runs: B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.") runs_to_execute.difference_update(set(self.ignored_runs)) # Sets aren't ordered so lets go back to lists and sort runs_to_execute = sorted(runs_to_execute) # We don't want to cross the boundary of Experiments accidentally. So we will split our run list # into separate lists, one for each experiment number contained. That way we can evaluate each experiment # separately and prevent IoVs from crossing the boundary. runs_to_execute = split_runs_by_exp(runs_to_execute) # Now iterate through the experiments. We DO NOT allow a payload IoV to # extend over multiple experiments, only multiple runs iov_coverage = None if "iov_coverage" in self.algorithm.params: B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.") iov_coverage = self.algorithm.params["iov_coverage"] payload_boundaries = None if "payload_boundaries" in self.algorithm.params: B2INFO(f"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.") payload_boundaries = self.algorithm.params["payload_boundaries"] number_of_experiments = len(runs_to_execute) B2INFO(f"We are iterating over {number_of_experiments} experiments.") # Iterate over experiment run lists for i_exp, run_list in enumerate(runs_to_execute, start=1): B2DEBUG(26, f"Run List for this experiment={run_list}") current_experiment = run_list[0].exp B2INFO(f"Executing over data from experiment {current_experiment}") # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage. # This is only true for the lowest and highest experiments in our input data. if i_exp == 1: if iov_coverage: lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low) else: lowest_exprun = run_list[0] # We are calibrating across multiple experiments so we shouldn't start from the middle but from the 0th run else: lowest_exprun = ExpRun(current_experiment, 0) # Override the normal value for the highest ExpRun (from data) if iov_coverage was set if iov_coverage and i_exp == number_of_experiments: highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high) # If we have more experiments to execute then we wil be setting the final payload IoV in this experiment # to be unbounded elif i_exp < number_of_experiments: highest_exprun = ExpRun(current_experiment, -1) # Otherwise just get the values from data else: highest_exprun = run_list[-1] # Find the boundaries for this experiment's runs vec_run_list = vector_from_runs(run_list) if payload_boundaries is None: # Find the boundaries using the findPayloadBoundaries implemented in the algorithm B2INFO("Attempting to find payload boundaries.") vec_boundaries = self.algorithm.algorithm.findPayloadBoundaries(vec_run_list) # If this vector is empty then that's bad. Maybe the isBoundaryRequired function # wasn't implemented? Either way we should stop. if vec_boundaries.empty(): B2ERROR("No boundaries found but we are in a strategy that requires them! Failing...") # Tell the Runner that we have failed self.send_final_state(self.FAILED) break vec_boundaries = runs_from_vector(vec_boundaries) else: # Using boundaries set by user B2INFO(f"Using as payload boundaries {payload_boundaries}.") vec_boundaries = [ExpRun(exp, run) for exp, run in payload_boundaries] # No need to check that vec_boundaries is not empty. In case it is we will anyway add # a boundary at the first run of each experiment. # Remove any boundaries not from the current experiment (only likely if they were set manually) # We sort just to make everything easier later and just in case something mad happened. run_boundaries = sorted([er for er in vec_boundaries if er.exp == current_experiment]) # In this strategy we consider separately each experiment. We then now check that the # boundary (exp, 0) is present and if not we add it. It is indeed possible to miss it # if the boundaries were given manually first_exprun = ExpRun(current_experiment, 0) if first_exprun not in run_boundaries: B2WARNING(f"No boundary found at ({current_experiment}, 0), adding it.") run_boundaries[0:0] = [first_exprun] B2INFO((f"Found {len(run_boundaries)} boundaries for this experiment. " "Checking if we have some data for all boundary IoVs...")) # First figure out the run lists to use for each execution (potentially different from the applied IoVs) # We use the boundaries and the run_list boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list) B2DEBUG(26, f"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}") # If there were any boundary IoVs with no run data, just remove them. Otherwise they will execute over all data. boundary_iovs_to_run_lists = {key: value for key, value in boundary_iovs_to_run_lists.items() if value} B2DEBUG(26, f"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}") # If any were removed then we might have gaps between the boundary IoVs. Fix those now by merging IoVs. new_boundary_iovs_to_run_lists = {} previous_boundary_iov = None previous_boundary_run_list = None for boundary_iov, run_list in boundary_iovs_to_run_lists.items(): if not previous_boundary_iov: previous_boundary_iov = boundary_iov previous_boundary_run_list = run_list continue # We are definitely dealiing with IoVs from one experiment so we can make assumptions here if previous_boundary_iov.run_high != (boundary_iov.run_low-1): B2WARNING("Gap in boundary IoVs found before execution! " "Will correct it by extending the previous boundary up to the next one.") B2INFO(f"Original boundary IoV={previous_boundary_iov}") previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low, previous_boundary_iov.exp_high, boundary_iov.run_low-1) B2INFO(f"New boundary IoV={previous_boundary_iov}") new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list previous_boundary_iov = boundary_iov previous_boundary_run_list = run_list else: new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists B2DEBUG(26, f"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}") # Actually execute now that we have an IoV list to apply success = self.execute_over_boundaries(boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration) if not success: # Tell the Runner that we have failed self.send_final_state(self.FAILED) break # Only executes if we didn't fail any experiment execution else: # Print any knowable gaps between result IoVs, if any are found there is a problem, but not necessarily too bad. gaps = self.find_iov_gaps() if gaps: B2WARNING("There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.") # Dump them to a file for logging with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f: json.dump(gaps, f) # If any results weren't successes we fail if self.any_failed_iov(): self.send_final_state(self.FAILED) else: self.send_final_state(self.COMPLETED)
[docs] def execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration): """ Take the previously found boundaries and the run lists they correspond to and actually perform the Algorithm execution. This is assumed to be for a single experiment. """ # Copy of boundary IoVs remaining_boundary_iovs = sorted(list(boundary_iovs_to_run_lists.keys())[:]) # The current runs we are executing current_runs = [] # The IoV of the current boundary(s) current_boundary_iov = None # The current execution's applied IoV, may be different to the boundary IoV current_iov = None # The last successful payload list and result. We hold on to them so that we can commit or discard later. last_successful_payloads = None last_successful_result = None # The previous execution's runs last_successful_runs = [] # The previous execution's applied IoV last_successful_iov = None while True: # Do we have previous successes? if not last_successful_result: if not current_runs: # Did we actually have any boundaries? if not remaining_boundary_iovs: # Fail because we have no boundaries to use B2ERROR("No boundaries found for the current experiment's run list. Failing the strategy.") return False B2INFO("This appears to be the first attempted execution of the experiment.") # Attempt to execute on the first boundary current_boundary_iov = remaining_boundary_iovs.pop(0) current_runs = boundary_iovs_to_run_lists[current_boundary_iov] # What if there is only one boundary? Need to apply the highest exprun if not remaining_boundary_iovs: current_iov = IoV(*lowest_exprun, *highest_exprun) else: current_iov = IoV(*lowest_exprun, current_boundary_iov.exp_high, current_boundary_iov.run_high) # Returned not enough data from first execution else: # Any remaining boundaries? if not remaining_boundary_iovs: # Fail because we have no boundaries to use B2ERROR("Not enough data found for the current experiment's run list. Failing the strategy.") return False B2INFO("There wasn't enough data previously. Merging with the runs from the next boundary.") # Extend the previous run lists/iovs next_boundary_iov = remaining_boundary_iovs.pop(0) current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, next_boundary_iov.exp_high, next_boundary_iov.run_high) current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov]) # At the last boundary? Need to apply the highest exprun if not remaining_boundary_iovs: current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun) else: current_iov = IoV(current_iov.exp_low, current_iov.run_low, current_boundary_iov.exp_high, current_boundary_iov.run_high) self.execute_runs(current_runs, iteration, current_iov) # Does this count as a successful execution? if self.alg_success(): # Commit previous values we were holding onto B2INFO("Found a success. Will save the payloads for later.") # Save success last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues() last_successful_result = self.machine.result last_successful_runs = current_runs[:] last_successful_iov = current_iov # Reset values for next loop current_runs = [] current_boundary_iov = None current_iov = None self.machine.complete() continue elif self.machine.result.result == AlgResult.not_enough_data.value: B2INFO("Not Enough Data result.") # Just complete but leave the current runs alone for next loop self.machine.complete() continue else: B2ERROR("Hit a failure or some kind of result we can't continue from. Failing out...") self.machine.fail() return False # Previous result exists else: # Previous loop was a success if not current_runs: # Remaining boundaries? if not remaining_boundary_iovs: # Out of data, can now commit B2INFO("Finished this experiment's boundaries. " f"Committing remaining payloads from {last_successful_result.iov}") self.machine.algorithm.algorithm.commit(last_successful_payloads) self.results.append(last_successful_result) self.send_result(last_successful_result) return True # Remaining boundaries exist so we try to execute current_boundary_iov = remaining_boundary_iovs.pop(0) current_runs = boundary_iovs_to_run_lists[current_boundary_iov] # What if there is only one boundary? Need to apply the highest exprun if not remaining_boundary_iovs: current_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, *highest_exprun) else: current_iov = current_boundary_iov # Returned not enough data from last execution else: # Any remaining boundaries? if not remaining_boundary_iovs: B2INFO("We have no remaining runs to increase the amount of data. " "Instead we will merge with the previous successful runs.") # Merge with previous success IoV new_current_runs = last_successful_runs[:] new_current_runs.extend(current_runs) current_runs = new_current_runs[:] current_iov = IoV(last_successful_iov.exp_low, last_successful_iov.run_low, current_iov.exp_high, current_iov.run_high) # We reset the last successful stuff because we are dropping it last_successful_payloads = [] last_successful_result = None last_successful_runs = [] last_successful_iov = None else: B2INFO("Since there wasn't enough data previously, we will merge with the runs from the next boundary.") # Extend the previous run lists/iovs next_boundary_iov = remaining_boundary_iovs.pop(0) current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, next_boundary_iov.exp_high, next_boundary_iov.run_high) # Extend previous execution's runs with the next set current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov]) # At the last boundary? Need to apply the highest exprun if not remaining_boundary_iovs: current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun) else: current_iov = IoV(current_iov.exp_low, current_iov.run_low, current_boundary_iov.exp_high, current_boundary_iov.run_high) self.execute_runs(current_runs, iteration, current_iov) # Does this count as a successful execution? if self.alg_success(): # Commit previous values we were holding onto B2INFO("Found a success.") if last_successful_result: B2INFO("Can now commit the previous success.") self.machine.algorithm.algorithm.commit(last_successful_payloads) self.results.append(last_successful_result) self.send_result(last_successful_result) # Replace last success last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues() last_successful_result = self.machine.result last_successful_runs = current_runs[:] last_successful_iov = current_iov # Reset values for next loop current_runs = [] current_boundary_iov = None current_iov = None self.machine.complete() continue elif self.machine.result.result == AlgResult.not_enough_data.value: B2INFO("Not Enough Data result.") # Just complete but leave the current runs alone for next loop self.machine.complete() continue else: B2ERROR("Hit a failure or some other result we can't continue from. Failing out...") self.machine.fail() return False
def execute_runs(self, runs, iteration, iov): # Already set up earlier the first time, so we shouldn't do it again if not self.first_execution: self.machine.setup_algorithm() else: self.first_execution = False B2INFO(f"Executing and applying {iov} to the payloads.") self.machine.execute_runs(runs=runs, iteration=iteration, apply_iov=iov) B2INFO(f"Finished execution with result code {self.machine.result.result}.") def alg_success(self): return ((self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value))
class StrategyError(Exception): """ Basic Exception for this type of class. """