4 from basf2
import B2DEBUG, B2ERROR, B2INFO, B2WARNING
5 from caf.utils
import AlgResult
6 from caf.utils
import B2INFO_MULTILINE
7 from caf.utils
import runs_overlapping_iov, runs_from_vector
8 from caf.utils
import iov_from_runs, split_runs_by_exp, vector_from_runs
9 from caf.utils
import find_gaps_in_iov_list, grouper, find_run_lists_from_boundaries
10 from caf.utils
import IoV, ExpRun
11 from caf.state_machines
import AlgorithmMachine
13 from abc
import ABC, abstractmethod
19 Base class for Algorithm strategies. These do the actual execution of a single
20 algorithm on collected data. Each strategy may be quite different in terms of how fast it may be,
21 how database payloads are passed between executions, and whether or not final payloads have an IoV
22 that is independent of the actual runs used to calculate them.
25 algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run
27 This base class defines the basic attributes and methods that will be automatically used by the selected AlgorithmRunner.
28 When defining a derived class you are free to use these attributes or to implement as much functionality as you want.
30 If you define your derived class with an __init__ method, then you should first call the base class
31 `AlgorithmStrategy.__init__()` method via super() e.g.
33 >>> def __init__(self):
34 >>> super().__init__()
36 The most important method to implement is :py:meth:`AlgorithmStrategy.run` which will take an algorithm and execute it
37 in the required way defined by the options you have selected/attributes set.
41 required_attrs = [
"algorithm",
43 "dependent_databases",
45 "output_database_dir",
51 required_true_attrs = [
"algorithm",
53 "output_database_dir",
58 allowed_granularities = [
"run",
"all"]
61 FINISHED_RESULTS =
"DONE"
64 COMPLETED =
"COMPLETED"
93 def run(self, iov, iteration, queue):
95 Abstract method that needs to be implemented. It will be called to actually execute the
102 params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name.
104 for attribute_name, value
in params.items():
105 setattr(self, attribute_name, value)
110 bool: Whether or not this strategy has been set up correctly with all its necessary attributes.
112 B2INFO(
"Checking validity of current AlgorithmStrategy setup.")
115 if not hasattr(self, attribute_name):
116 B2ERROR(f
"AlgorithmStrategy attribute {attribute_name} doesn't exist.")
120 if not getattr(self, attribute_name):
121 B2ERROR(f
"AlgorithmStrategy attribute {attribute_name} returned False.")
127 Finds and prints the current gaps between the IoVs of the strategy results. Basically these are the IoVs
128 not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps
129 within the same experiment are found.
134 iov_gaps = find_gaps_in_iov_list(sorted([result.iov
for result
in self.
results]))
136 gap_msg = [
"Found gaps between IoVs of algorithm results (regardless of result)."]
137 gap_msg.append(
"You may have requested these gaps deliberately by not passing in data containing these runs.")
138 gap_msg.append(
"This may not be a problem, but you will not have payoads defined for these IoVs")
139 gap_msg.append(
"unless you edit the final database.txt yourself.")
140 B2INFO_MULTILINE(gap_msg)
142 B2INFO(f
"{iov} not covered by any execution of the algorithm.")
148 bool: If any result in the current results list has a failed algorithm code we return True
152 if result.result == AlgResult.failure.value
or result.result == AlgResult.not_enough_data.value:
153 failed_results.append(result)
155 B2WARNING(
"Failed results found.")
156 for result
in failed_results:
157 if result.result == AlgResult.failure.value:
158 B2ERROR(f
"c_Failure returned for {result.iov}.")
159 elif result.result == AlgResult.not_enough_data.value:
160 B2WARNING(f
"c_NotEnoughData returned for {result.iov}.")
def send_result(self, result):
    """
    Put an algorithm result onto this strategy's queue.

    The result is wrapped in a dictionary tagged with type "result" so that
    whoever reads the queue can tell it apart from a final-state message.

    Parameters:
        result: The result object to send; it is stored unchanged under the "value" key.
    """
    # NOTE(review): self.queue is presumably assigned in run() from its `queue` argument — confirm.
    message = {"type": "result", "value": result}
    self.queue.put(message)
def send_final_state(self, state):
    """
    Put the strategy's final state onto this strategy's queue.

    The state is wrapped in a dictionary tagged with type "final_state" so that
    whoever reads the queue can tell it apart from a result message.

    Parameters:
        state: The final state to report; it is stored unchanged under the "value" key.
    """
    # NOTE(review): self.queue is presumably assigned in run() from its `queue` argument — confirm.
    message = {"type": "final_state", "value": state}
    self.queue.put(message)
173 """The fastest and simplest Algorithm strategy. Runs the algorithm only once over all of the input
174 data or only the data corresponding to the requested IoV. The payload IoV is then set to the same as the one
177 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
178 a CalibrationAlgorithm C++ class directly.
182 usable_params = {
"apply_iov": IoV}
192 def run(self, iov, iteration, queue):
194 Runs the algorithm machine over the collected data and fills the results.
197 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
200 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
205 machine_params[
"output_dir"] = self.
output_dir
211 B2INFO(f
"Starting AlgorithmMachine of {self.algorithm.name}.")
212 self.
machine.setup_algorithm(iteration=iteration)
214 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
216 all_runs_collected = set(runs_from_vector(self.
algorithm.algorithm.getRunListFromAllData()))
219 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
221 runs_to_execute = all_runs_collected
225 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
226 runs_to_execute.difference_update(set(self.
ignored_runs))
228 runs_to_execute = sorted(runs_to_execute)
231 apply_iov = self.
algorithm.params[
"apply_iov"]
232 self.
machine.execute_runs(runs=runs_to_execute, iteration=iteration, apply_iov=apply_iov)
233 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
239 if (self.
machine.result.result == AlgResult.ok.value)
or (self.
machine.result.result == AlgResult.iterate.value):
243 self.
machine.algorithm.algorithm.commit()
253 Algorithm strategy to do run-by-run calibration of collected data.
254 Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
255 If the algorithm returns 'not enough data' on the current run set, it won't commit the payloads, but instead adds
256 the next run's data and tries again.
258 Once an execution on a set of runs returns 'iterate' or 'ok' we move on to the next runs (if any are left)
259 and start the same procedure again. Committing of payloads to the outputdb only happens once we're sure that there
260 is enough data in the remaining runs to get a full execution. If there isn't enough data remaining, the last runs
261 are merged with the previous successful execution's runs and a final execution is performed on all remaining runs.
263 Additionally this strategy will automatically make sure that IoV gaps in your input data are covered by a payload.
264 This means that there shouldn't be any IoVs that don't get a new payload by the end of running an iteration.
266 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
267 a CalibrationAlgorithm C++ class directly.
272 "has_experiment_settings": bool,
278 allowed_granularities = [
"run"]
287 if "step_size" not in self.
algorithm.params:
293 Apply experiment-dependent settings.
294 This is the default version, which does not do anything.
295 If necessary, it should be reimplemented by derived classes.
299 def run(self, iov, iteration, queue):
301 Runs the algorithm machine over the collected data and fills the results.
303 if not self.is_valid():
304 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
306 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
309 machine_params[
"database_chain"] = self.database_chain
310 machine_params[
"dependent_databases"] = self.dependent_databases
311 machine_params[
"output_dir"] = self.output_dir
312 machine_params[
"output_database_dir"] = self.output_database_dir
313 machine_params[
"input_files"] = self.input_files
314 machine_params[
"ignored_runs"] = self.ignored_runs
317 self.machine.setup_algorithm(iteration=iteration)
319 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
321 all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
324 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
326 runs_to_execute = all_runs_collected[:]
329 if self.ignored_runs:
330 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
331 runs_to_execute.difference_update(set(self.ignored_runs))
333 runs_to_execute = sorted(runs_to_execute)
338 runs_to_execute = split_runs_by_exp(runs_to_execute)
343 if "iov_coverage" in self.algorithm.params:
344 B2INFO(f
"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
345 iov_coverage = self.algorithm.params[
"iov_coverage"]
347 number_of_experiments = len(runs_to_execute)
349 for i_exp, run_list
in enumerate(runs_to_execute, start=1):
352 if "has_experiment_settings" in self.algorithm.params:
353 if self.algorithm.params[
"has_experiment_settings"]:
354 self.apply_experiment_settings(self.machine.algorithm.algorithm, run_list[0].exp)
360 if iov_coverage
and i_exp == 1:
361 lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
363 lowest_exprun = run_list[0]
365 if iov_coverage
and i_exp == number_of_experiments:
366 highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
368 highest_exprun = run_list[-1]
370 self.execute_over_run_list(iteration, run_list, lowest_exprun, highest_exprun)
373 gaps = self.find_iov_gaps()
375 with open(f
"{self.algorithm.name}_iov_gaps.json",
"w")
as f:
379 if self.any_failed_iov():
380 self.send_final_state(self.FAILED)
382 self.send_final_state(self.COMPLETED)
384 def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun):
386 remaining_runs = run_list[:]
392 last_successful_payloads =
None
393 last_successful_result =
None
396 for expruns
in grouper(self.algorithm.params[
"step_size"], run_list):
398 if not self.first_execution:
399 self.machine.setup_algorithm()
401 self.first_execution =
False
404 current_runs.extend(expruns)
406 remaining_runs = [run
for run
in remaining_runs
if run
not in current_runs]
409 if not last_successful_result:
410 B2INFO(
"Detected that this will be the first payload of this experiment.")
414 apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1)
417 B2INFO(
"Detected that this will be the only payload of the experiment.")
418 apply_iov = IoV(*lowest_exprun, *highest_exprun)
421 if not remaining_runs:
422 B2INFO(
"Detected that there are no more runs to execute in this experiment after this next execution.")
423 apply_iov = IoV(*current_runs[0], *highest_exprun)
426 B2INFO(
"Detected that there are more runs to execute in this experiment after this next execution.")
427 apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1)
429 B2INFO(f
"Executing and applying {apply_iov} to the payloads.")
430 self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
431 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
434 if (self.machine.result.result == AlgResult.ok.value)
or (self.machine.result.result == AlgResult.iterate.value):
435 self.machine.complete()
438 if last_successful_payloads
and last_successful_result:
439 B2INFO(
"Saving this execution's payloads to be committed later.")
441 new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
442 new_successful_result = self.machine.result
443 B2INFO(
"We just succeded in execution of the Algorithm."
444 f
" Will now commit payloads from the previous success for {last_successful_result.iov}.")
445 self.machine.algorithm.algorithm.commit(last_successful_payloads)
446 self.results.append(last_successful_result)
447 self.send_result(last_successful_result)
450 last_successful_payloads = new_successful_payloads
451 last_successful_result = new_successful_result
454 B2INFO(
"We have no more runs to process. "
455 f
"Will now commit the most recent payloads for {new_successful_result.iov}.")
456 self.machine.algorithm.algorithm.commit(new_successful_payloads)
457 self.results.append(new_successful_result)
458 self.send_result(new_successful_result)
464 B2INFO(f
"Saving the most recent payloads for {self.machine.result.iov} to be committed later.")
466 last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
467 last_successful_result = self.machine.result
470 B2INFO(
"We just succeeded in execution of the Algorithm."
471 " No runs left to be processed, so we are committing results of this execution.")
472 self.machine.algorithm.algorithm.commit()
473 self.results.append(self.machine.result)
474 self.send_result(self.machine.result)
477 previous_runs = current_runs[:]
480 elif (self.machine.result.result == AlgResult.not_enough_data.value):
481 B2INFO(f
"There wasn't enough data in {self.machine.result.iov}.")
483 B2INFO(
"Some runs remain to be processed. "
484 f
"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.")
485 elif not remaining_runs
and not last_successful_result:
486 B2ERROR(
"There aren't any more runs remaining to merge with, and we never had a previous success."
487 " There wasn't enough data in the full input data requested.")
488 self.results.append(self.machine.result)
489 self.send_result(self.machine.result)
492 elif not remaining_runs
and last_successful_result:
493 B2INFO(
"There aren't any more runs remaining to merge with. But we had a previous success"
494 ", so we'll merge with the previous IoV.")
495 final_runs = current_runs[:]
496 current_runs = previous_runs
497 current_runs.extend(final_runs)
499 elif self.machine.result.result == AlgResult.failure.value:
500 B2ERROR(f
"{self.algorithm.name} returned failure exit code.")
501 self.results.append(self.machine.result)
502 self.send_result(self.machine.result)
508 self.machine.setup_algorithm()
509 apply_iov = IoV(last_successful_result.iov.exp_low,
510 last_successful_result.iov.run_low,
512 B2INFO(f
"Executing on {apply_iov}.")
513 self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
514 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
515 if (self.machine.result.result == AlgResult.ok.value)
or (
516 self.machine.result.result == AlgResult.iterate.value):
517 self.machine.complete()
519 self.machine.algorithm.algorithm.commit()
521 self.results.append(self.machine.result)
522 self.send_result(self.machine.result)
525 self.results.append(self.machine.result)
526 self.send_result(self.machine.result)
533 Algorithm strategy to do run-by-run calibration of collected data.
534 Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
536 This strategy differs from `SequentialRunByRun` in that it *will not merge run data* if the algorithm returns
537 'not enough data' on the current run.
539 Once an execution on a run returns *any* result 'iterate', 'ok', 'not_enough_data', or 'failure', we move onto the
540 next run (if any are left).
541 Committing of payloads to the outputdb only happens for 'iterate' or 'ok' return codes.
543 .. important:: Unlike most other strategies, this one won't immediately fail and return if a run returns a 'failure' exit
545 The failure will prevent iteration/successful completion of the CAF though.
547 .. warning:: Since this strategy doesn't try to merge data from runs, if *any* run in your input data doesn't contain
548 enough data to complete the algorithm successfully, you won't be able to get a successful calibration.
549 The CAF then won't allow you to iterate this calibration, or pass the constants onward to another calibration.
550 However, you will still have the database created that covers all the successful runs.
552 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
553 a CalibrationAlgorithm C++ class directly.
556 allowed_granularities = [
"run"]
569 def run(self, iov, iteration, queue):
571 Runs the algorithm machine over the collected data and fills the results.
575 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
578 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
583 machine_params[
"output_dir"] = self.
output_dir
589 B2INFO(f
"Starting AlgorithmMachine of {self.algorithm.name}.")
590 self.
machine.setup_algorithm(iteration=iteration)
592 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
594 all_runs_collected = set(runs_from_vector(self.
algorithm.algorithm.getRunListFromAllData()))
597 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
599 runs_to_execute = all_runs_collected
603 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
604 runs_to_execute.difference_update(set(self.
ignored_runs))
606 runs_to_execute = sorted(runs_to_execute)
609 first_execution =
True
610 for exprun
in runs_to_execute:
611 if not first_execution:
613 current_runs = exprun
614 apply_iov = iov_from_runs([current_runs])
615 B2INFO(f
"Executing on IoV = {apply_iov}.")
616 self.
machine.execute_runs(runs=[current_runs], iteration=iteration, apply_iov=apply_iov)
617 first_execution =
False
618 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
620 if (self.
machine.result.result == AlgResult.ok.value)
or (self.
machine.result.result == AlgResult.iterate.value):
622 B2INFO(f
"Committing payloads for {iov_from_runs([current_runs])}.")
623 self.
machine.algorithm.algorithm.commit()
628 elif (self.
machine.result.result == AlgResult.not_enough_data.value):
629 B2INFO(f
"There wasn't enough data in the IoV {iov_from_runs([current_runs])}.")
633 elif self.
machine.result.result == AlgResult.failure.value:
634 B2ERROR(f
"Failure exit code in the IoV {iov_from_runs([current_runs])}.")
642 with open(f
"{self.algorithm.name}_iov_gaps.json",
"w")
as f:
650 Algorithm strategy to first calculate run boundaries where execution should be attempted.
651 Runs the algorithm over the input data contained within the requested IoV of the boundaries,
652 starting with the first boundary data only.
653 If the algorithm returns 'not enough data' on the current boundary IoV, it won't commit the payloads,
654 but instead adds the next boundary's data and tries again. Basically the same logic as `SequentialRunByRun`
655 but using run boundaries instead of runs directly.
656 Notice that boundaries cannot span multiple experiments.
658 By default the strategy will get the payload boundaries directly from the algorithm, which needs to
659 have implemented the function ``isBoundaryRequired``. If the desired boundaries are already known it
660 is possible to pass them directly setting the algorithm parameter ``payload_boundaries`` and avoid
661 the need to define the ``isBoundaryRequired`` function.
663 ``payload_boundaries`` is a list ``[(exp1, run1), (exp2, run2), ...]``. A boundary at the beginning of each
664 experiment will be added if not already present. An empty list will thus produce a single payload for each
665 experiment. A ``payload_boundaries`` set to ``None`` is equivalent to not passing it and restores the default
666 behaviour where the boundaries are computed in the ``isBoundaryRequired`` function of the algorithm.
672 "payload_boundaries": []
676 allowed_granularities = [
"run"]
687 def run(self, iov, iteration, queue):
689 Runs the algorithm machine over the collected data and fills the results.
692 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
694 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
699 machine_params[
"output_dir"] = self.
output_dir
705 self.
machine.setup_algorithm(iteration=iteration)
707 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
709 all_runs_collected = runs_from_vector(self.
algorithm.algorithm.getRunListFromAllData())
712 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
714 runs_to_execute = all_runs_collected[:]
718 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
719 runs_to_execute.difference_update(set(self.
ignored_runs))
721 runs_to_execute = sorted(runs_to_execute)
726 runs_to_execute = split_runs_by_exp(runs_to_execute)
731 if "iov_coverage" in self.
algorithm.params:
732 B2INFO(f
"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
733 iov_coverage = self.
algorithm.params[
"iov_coverage"]
735 payload_boundaries =
None
736 if "payload_boundaries" in self.
algorithm.params:
737 B2INFO(f
"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.")
738 payload_boundaries = self.
algorithm.params[
"payload_boundaries"]
740 number_of_experiments = len(runs_to_execute)
741 B2INFO(f
"We are iterating over {number_of_experiments} experiments.")
744 for i_exp, run_list
in enumerate(runs_to_execute, start=1):
745 B2DEBUG(26, f
"Run List for this experiment={run_list}")
746 current_experiment = run_list[0].exp
747 B2INFO(f
"Executing over data from experiment {current_experiment}")
754 lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
756 lowest_exprun = run_list[0]
759 lowest_exprun = ExpRun(current_experiment, 0)
762 if iov_coverage
and i_exp == number_of_experiments:
763 highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
766 elif i_exp < number_of_experiments:
767 highest_exprun = ExpRun(current_experiment, -1)
770 highest_exprun = run_list[-1]
773 vec_run_list = vector_from_runs(run_list)
774 if payload_boundaries
is None:
776 B2INFO(
"Attempting to find payload boundaries.")
777 vec_boundaries = self.
algorithm.algorithm.findPayloadBoundaries(vec_run_list)
780 if vec_boundaries.empty():
781 B2ERROR(
"No boundaries found but we are in a strategy that requires them! Failing...")
785 vec_boundaries = runs_from_vector(vec_boundaries)
788 B2INFO(f
"Using as payload boundaries {payload_boundaries}.")
789 vec_boundaries = [ExpRun(exp, run)
for exp, run
in payload_boundaries]
794 run_boundaries = sorted([er
for er
in vec_boundaries
if er.exp == current_experiment])
798 first_exprun = ExpRun(current_experiment, 0)
799 if first_exprun
not in run_boundaries:
800 B2WARNING(f
"No boundary found at ({current_experiment}, 0), adding it.")
801 run_boundaries[0:0] = [first_exprun]
802 B2INFO((f
"Found {len(run_boundaries)} boundaries for this experiment. "
803 "Checking if we have some data for all boundary IoVs..."))
806 boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list)
807 B2DEBUG(26, f
"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}")
809 boundary_iovs_to_run_lists = {key: value
for key, value
in boundary_iovs_to_run_lists.items()
if value}
810 B2DEBUG(26, f
"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}")
812 new_boundary_iovs_to_run_lists = {}
813 previous_boundary_iov =
None
814 previous_boundary_run_list =
None
815 for boundary_iov, run_list
in boundary_iovs_to_run_lists.items():
816 if not previous_boundary_iov:
817 previous_boundary_iov = boundary_iov
818 previous_boundary_run_list = run_list
821 if previous_boundary_iov.run_high != (boundary_iov.run_low-1):
822 B2WARNING(
"Gap in boundary IoVs found before execution! "
823 "Will correct it by extending the previous boundary up to the next one.")
824 B2INFO(f
"Original boundary IoV={previous_boundary_iov}")
825 previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low,
826 previous_boundary_iov.exp_high, boundary_iov.run_low-1)
827 B2INFO(f
"New boundary IoV={previous_boundary_iov}")
828 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
829 previous_boundary_iov = boundary_iov
830 previous_boundary_run_list = run_list
832 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
833 boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists
834 B2DEBUG(26, f
"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}")
846 B2WARNING(
"There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.")
848 with open(f
"{self.algorithm.name}_iov_gaps.json",
"w")
as f:
859 Take the previously found boundaries and the run lists they correspond to and actually perform the
860 Algorithm execution. This is assumed to be for a single experiment.
863 remaining_boundary_iovs = sorted(list(boundary_iovs_to_run_lists.keys())[:])
868 current_boundary_iov =
None
873 last_successful_payloads =
None
874 last_successful_result =
None
876 last_successful_runs = []
878 last_successful_iov =
None
882 if not last_successful_result:
885 if not remaining_boundary_iovs:
887 B2ERROR(
"No boundaries found for the current experiment's run list. Failing the strategy.")
890 B2INFO(
"This appears to be the first attempted execution of the experiment.")
892 current_boundary_iov = remaining_boundary_iovs.pop(0)
893 current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
895 if not remaining_boundary_iovs:
896 current_iov = IoV(*lowest_exprun, *highest_exprun)
898 current_iov = IoV(*lowest_exprun, current_boundary_iov.exp_high, current_boundary_iov.run_high)
902 if not remaining_boundary_iovs:
904 B2ERROR(
"Not enough data found for the current experiment's run list. Failing the strategy.")
907 B2INFO(
"There wasn't enough data previously. Merging with the runs from the next boundary.")
909 next_boundary_iov = remaining_boundary_iovs.pop(0)
910 current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
911 next_boundary_iov.exp_high, next_boundary_iov.run_high)
912 current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
914 if not remaining_boundary_iovs:
915 current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
917 current_iov = IoV(current_iov.exp_low, current_iov.run_low,
918 current_boundary_iov.exp_high, current_boundary_iov.run_high)
925 B2INFO(
"Found a success. Will save the payloads for later.")
927 last_successful_payloads = self.
machine.algorithm.algorithm.getPayloadValues()
928 last_successful_result = self.
machine.result
929 last_successful_runs = current_runs[:]
930 last_successful_iov = current_iov
933 current_boundary_iov =
None
937 elif self.
machine.result.result == AlgResult.not_enough_data.value:
938 B2INFO(
"Not Enough Data result.")
943 B2ERROR(
"Hit a failure or some kind of result we can't continue from. Failing out...")
951 if not remaining_boundary_iovs:
953 B2INFO(
"Finished this experiment's boundaries. "
954 f
"Committing remaining payloads from {last_successful_result.iov}")
955 self.
machine.algorithm.algorithm.commit(last_successful_payloads)
956 self.
results.append(last_successful_result)
961 current_boundary_iov = remaining_boundary_iovs.pop(0)
962 current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
964 if not remaining_boundary_iovs:
965 current_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, *highest_exprun)
967 current_iov = current_boundary_iov
972 if not remaining_boundary_iovs:
973 B2INFO(
"We have no remaining runs to increase the amount of data. "
974 "Instead we will merge with the previous successful runs.")
976 new_current_runs = last_successful_runs[:]
977 new_current_runs.extend(current_runs)
978 current_runs = new_current_runs[:]
979 current_iov = IoV(last_successful_iov.exp_low, last_successful_iov.run_low,
980 current_iov.exp_high, current_iov.run_high)
982 last_successful_payloads = []
983 last_successful_result =
None
984 last_successful_runs = []
985 last_successful_iov =
None
988 B2INFO(
"Since there wasn't enough data previously, we will merge with the runs from the next boundary.")
990 next_boundary_iov = remaining_boundary_iovs.pop(0)
991 current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
992 next_boundary_iov.exp_high, next_boundary_iov.run_high)
994 current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
996 if not remaining_boundary_iovs:
997 current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
999 current_iov = IoV(current_iov.exp_low, current_iov.run_low,
1000 current_boundary_iov.exp_high, current_boundary_iov.run_high)
1002 self.
execute_runs(current_runs, iteration, current_iov)
1007 B2INFO(
"Found a success.")
1008 if last_successful_result:
1009 B2INFO(
"Can now commit the previous success.")
1010 self.
machine.algorithm.algorithm.commit(last_successful_payloads)
1011 self.
results.append(last_successful_result)
1014 last_successful_payloads = self.
machine.algorithm.algorithm.getPayloadValues()
1015 last_successful_result = self.
machine.result
1016 last_successful_runs = current_runs[:]
1017 last_successful_iov = current_iov
1020 current_boundary_iov =
None
1024 elif self.
machine.result.result == AlgResult.not_enough_data.value:
1025 B2INFO(
"Not Enough Data result.")
1030 B2ERROR(
"Hit a failure or some other result we can't continue from. Failing out...")
1034 def execute_runs(self, runs, iteration, iov):
1037 self.
machine.setup_algorithm()
1041 B2INFO(f
"Executing and applying {iov} to the payloads.")
1042 self.
machine.execute_runs(runs=runs, iteration=iteration, apply_iov=iov)
1043 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
def alg_success(self):
    """
    Report whether the machine's most recent result counts as a success.

    Returns:
        bool: True if the result code of ``self.machine.result`` equals either
        ``AlgResult.ok.value`` or ``AlgResult.iterate.value``, otherwise False.
    """
    outcome = self.machine.result.result
    success_codes = (AlgResult.ok.value, AlgResult.iterate.value)
    return any(outcome == code for code in success_codes)
1051 Basic Exception for this type of class.