4 from basf2
import B2DEBUG, B2ERROR, B2INFO, B2WARNING
5 from caf.utils
import AlgResult
6 from caf.utils
import B2INFO_MULTILINE
7 from caf.utils
import runs_overlapping_iov, runs_from_vector
8 from caf.utils
import iov_from_runs, split_runs_by_exp, vector_from_runs
9 from caf.utils
import find_gaps_in_iov_list, grouper, find_run_lists_from_boundaries
10 from caf.utils
import IoV, ExpRun
11 from caf.state_machines
import AlgorithmMachine
13 from abc
import ABC, abstractmethod
19 Base class for Algorithm strategies. These do the actual execution of a single
20 algorithm on collected data. Each strategy may be quite different in terms of how fast it may be,
21 how database payloads are passed between executions, and whether or not final payloads have an IoV
22 that is independent to the actual runs used to calculates them.
25 algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run
27 This base class defines the basic attributes and methods that will be automatically used by the selected AlgorithmRunner.
28 When defining a derived class you are free to use these attributes or to implement as much functionality as you want.
30 If you define your derived class with an __init__ method, then you should first call the base class
31 `AlgorithmStrategy.__init__()` method via super() e.g.
33 >>> def __init__(self):
34 >>> super().__init__()
36 The most important method to implement is :py:meth:`AlgorithmStrategy.run` which will take an algorithm and execute it
37 in the required way defined by the options you have selected/attributes set.
41 required_attrs = [
"algorithm",
43 "dependent_databases",
45 "output_database_dir",
51 required_true_attrs = [
"algorithm",
53 "output_database_dir",
58 allowed_granularities = [
"run",
"all"]
61 FINISHED_RESULTS =
"DONE"
64 COMPLETED =
"COMPLETED"
93 def run(self, iov, iteration, queue):
95 Abstract method that needs to be implemented. It will be called to actually execute the
102 params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name.
104 for attribute_name, value
in params.items():
105 setattr(self, attribute_name, value)
110 bool: Whether or not this strategy has been set up correctly with all its necessary attributes.
112 B2INFO(
"Checking validity of current AlgorithmStrategy setup.")
115 if not hasattr(self, attribute_name):
116 B2ERROR(f
"AlgorithmStrategy attribute {attribute_name} doesn't exist.")
120 if not getattr(self, attribute_name):
121 B2ERROR(f
"AlgorithmStrategy attribute {attribute_name} returned False.")
127 Finds and prints the current gaps between the IoVs of the strategy results. Basically these are the IoVs
128 not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps
129 within the same experiment are found.
134 iov_gaps = find_gaps_in_iov_list(sorted([result.iov
for result
in self.
results]))
136 gap_msg = [
"Found gaps between IoVs of algorithm results (regardless of result)."]
137 gap_msg.append(
"You may have requested these gaps deliberately by not passing in data containing these runs.")
138 gap_msg.append(
"This may not be a problem, but you will not have payoads defined for these IoVs")
139 gap_msg.append(
"unless you edit the final database.txt yourself.")
140 B2INFO_MULTILINE(gap_msg)
142 B2INFO(f
"{iov} not covered by any execution of the algorithm.")
148 bool: If any result in the current results list has a failed algorithm code we return True
152 if result.result == AlgResult.failure.value
or result.result == AlgResult.not_enough_data.value:
153 failed_results.append(result)
155 B2WARNING(
"Failed results found.")
156 for result
in failed_results:
157 if result.result == AlgResult.failure.value:
158 B2ERROR(f
"c_Failure returned for {result.iov}.")
159 elif result.result == AlgResult.not_enough_data.value:
160 B2WARNING(f
"c_NotEnoughData returned for {result.iov}.")
165 def send_result(self, result):
166 self.
queue.put({
"type":
"result",
"value": result})
168 def send_final_state(self, state):
169 self.
queue.put({
"type":
"final_state",
"value": state})
173 """The fastest and simplest Algorithm strategy. Runs the algorithm only once over all of the input
174 data or only the data corresponding to the requested IoV. The payload IoV is the set to the same as the one
177 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
178 a CalibrationAlgorithm C++ class directly.
182 usable_params = {
"apply_iov": IoV}
192 def run(self, iov, iteration, queue):
194 Runs the algorithm machine over the collected data and fills the results.
197 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
200 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
205 machine_params[
"output_dir"] = self.
output_dir
211 B2INFO(f
"Starting AlgorithmMachine of {self.algorithm.name}.")
212 self.
machine.setup_algorithm(iteration=iteration)
214 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
216 all_runs_collected = set(runs_from_vector(self.
algorithm.algorithm.getRunListFromAllData()))
219 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
221 runs_to_execute = all_runs_collected
225 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
226 runs_to_execute.difference_update(set(self.
ignored_runs))
228 runs_to_execute = sorted(runs_to_execute)
231 apply_iov = self.
algorithm.params[
"apply_iov"]
232 self.
machine.execute_runs(runs=runs_to_execute, iteration=iteration, apply_iov=apply_iov)
233 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
239 if (self.
machine.result.result == AlgResult.ok.value)
or (self.
machine.result.result == AlgResult.iterate.value):
243 self.
machine.algorithm.algorithm.commit()
253 Algorithm strategy to do run-by-run calibration of collected data.
254 Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
255 If the algorithm returns 'not enough data' on the current run set, it won't commit the payloads, but instead adds
256 the next run's data and tries again.
258 Once an execution on a set of runs return 'iterate' or 'ok' we move onto the next runs (if any are left)
259 and start the same procedure again. Committing of payloads to the outputdb only happens once we're sure that there
260 is enough data in the remaining runs to get a full execution. If there isn't enough data remaining, the last runs
261 are merged with the previous successful execution's runs and a final execution is performed on all remaining runs.
263 Additionally this strategy will automatically make sure that IoV gaps in your input data are covered by a payload.
264 This means that there shouldn't be any IoVs that don't get a new payload by the end of runnning an iteration.
266 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
267 a CalibrationAlgorithm C++ class directly.
272 "has_experiment_settings": bool,
278 allowed_granularities = [
"run"]
287 if "step_size" not in self.
algorithm.params:
293 Apply experiment-dependent settings.
294 This is the default version, which does not do anything.
295 If necessary, it should be reimplemented by derived classes.
299 def run(self, iov, iteration, queue):
301 Runs the algorithm machine over the collected data and fills the results.
303 if not self.is_valid():
304 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
306 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
309 machine_params[
"database_chain"] = self.database_chain
310 machine_params[
"dependent_databases"] = self.dependent_databases
311 machine_params[
"output_dir"] = self.output_dir
312 machine_params[
"output_database_dir"] = self.output_database_dir
313 machine_params[
"input_files"] = self.input_files
314 machine_params[
"ignored_runs"] = self.ignored_runs
317 self.machine.setup_algorithm(iteration=iteration)
319 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
321 all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
324 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
326 runs_to_execute = all_runs_collected[:]
329 if self.ignored_runs:
330 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
331 runs_to_execute.difference_update(set(self.ignored_runs))
333 runs_to_execute = sorted(runs_to_execute)
338 runs_to_execute = split_runs_by_exp(runs_to_execute)
343 if "iov_coverage" in self.algorithm.params:
344 B2INFO(f
"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
345 iov_coverage = self.algorithm.params[
"iov_coverage"]
347 number_of_experiments = len(runs_to_execute)
349 for i_exp, run_list
in enumerate(runs_to_execute, start=1):
352 if "has_experiment_settings" in self.algorithm.params:
353 if self.algorithm.params[
"has_experiment_settings"]:
354 self.apply_experiment_settings(self.machine.algorithm.algorithm, run_list[0].exp)
361 lowest_exprun = ExpRun(run_list[0].exp, 0)
362 highest_exprun = ExpRun(run_list[-1].exp, -1)
365 lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
if iov_coverage
else run_list[0]
366 if i_exp == number_of_experiments:
367 highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
if iov_coverage
else run_list[-1]
369 self.execute_over_run_list(iteration, run_list, lowest_exprun, highest_exprun)
372 gaps = self.find_iov_gaps()
374 with open(f
"{self.algorithm.name}_iov_gaps.json",
"w")
as f:
378 if self.any_failed_iov():
379 self.send_final_state(self.FAILED)
381 self.send_final_state(self.COMPLETED)
383 def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun):
385 remaining_runs = run_list[:]
391 last_successful_payloads =
None
392 last_successful_result =
None
395 for expruns
in grouper(self.algorithm.params[
"step_size"], run_list):
397 if not self.first_execution:
398 self.machine.setup_algorithm()
400 self.first_execution =
False
403 current_runs.extend(expruns)
405 remaining_runs = [run
for run
in remaining_runs
if run
not in current_runs]
408 if not last_successful_result:
409 B2INFO(
"Detected that this will be the first payload of this experiment.")
413 apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1)
416 B2INFO(
"Detected that this will be the only payload of the experiment.")
417 apply_iov = IoV(*lowest_exprun, *highest_exprun)
420 if not remaining_runs:
421 B2INFO(
"Detected that there are no more runs to execute in this experiment after this next execution.")
422 apply_iov = IoV(*current_runs[0], *highest_exprun)
425 B2INFO(
"Detected that there are more runs to execute in this experiment after this next execution.")
426 apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1)
428 B2INFO(f
"Executing and applying {apply_iov} to the payloads.")
429 self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
430 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
433 if (self.machine.result.result == AlgResult.ok.value)
or (self.machine.result.result == AlgResult.iterate.value):
434 self.machine.complete()
437 if last_successful_payloads
and last_successful_result:
438 B2INFO(
"Saving this execution's payloads to be committed later.")
440 new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
441 new_successful_result = self.machine.result
442 B2INFO(
"We just succeded in execution of the Algorithm."
443 f
" Will now commit payloads from the previous success for {last_successful_result.iov}.")
444 self.machine.algorithm.algorithm.commit(last_successful_payloads)
445 self.results.append(last_successful_result)
446 self.send_result(last_successful_result)
449 last_successful_payloads = new_successful_payloads
450 last_successful_result = new_successful_result
453 B2INFO(
"We have no more runs to process. "
454 f
"Will now commit the most recent payloads for {new_successful_result.iov}.")
455 self.machine.algorithm.algorithm.commit(new_successful_payloads)
456 self.results.append(new_successful_result)
457 self.send_result(new_successful_result)
463 B2INFO(f
"Saving the most recent payloads for {self.machine.result.iov} to be committed later.")
465 last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
466 last_successful_result = self.machine.result
469 B2INFO(
"We just succeeded in execution of the Algorithm."
470 " No runs left to be processed, so we are committing results of this execution.")
471 self.machine.algorithm.algorithm.commit()
472 self.results.append(self.machine.result)
473 self.send_result(self.machine.result)
476 previous_runs = current_runs[:]
479 elif (self.machine.result.result == AlgResult.not_enough_data.value):
480 B2INFO(f
"There wasn't enough data in {self.machine.result.iov}.")
482 B2INFO(
"Some runs remain to be processed. "
483 f
"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.")
484 elif not remaining_runs
and not last_successful_result:
485 B2ERROR(
"There aren't any more runs remaining to merge with, and we never had a previous success."
486 " There wasn't enough data in the full input data requested.")
487 self.results.append(self.machine.result)
488 self.send_result(self.machine.result)
491 elif not remaining_runs
and last_successful_result:
492 B2INFO(
"There aren't any more runs remaining to merge with. But we had a previous success"
493 ", so we'll merge with the previous IoV.")
494 final_runs = current_runs[:]
495 current_runs = previous_runs
496 current_runs.extend(final_runs)
498 elif self.machine.result.result == AlgResult.failure.value:
499 B2ERROR(f
"{self.algorithm.name} returned failure exit code.")
500 self.results.append(self.machine.result)
501 self.send_result(self.machine.result)
507 self.machine.setup_algorithm()
508 apply_iov = IoV(last_successful_result.iov.exp_low,
509 last_successful_result.iov.run_low,
511 B2INFO(f
"Executing on {apply_iov}.")
512 self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
513 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
514 if (self.machine.result.result == AlgResult.ok.value)
or (
515 self.machine.result.result == AlgResult.iterate.value):
516 self.machine.complete()
518 self.machine.algorithm.algorithm.commit()
520 self.results.append(self.machine.result)
521 self.send_result(self.machine.result)
524 self.results.append(self.machine.result)
525 self.send_result(self.machine.result)
532 Algorithm strategy to do run-by-run calibration of collected data.
533 Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
535 This strategy differs from `SequentialRunByRun` in that it *will not merge run data* if the algorithm returns
536 'not enough data' on the current run.
538 Once an execution on a run returns *any* result 'iterate', 'ok', 'not_enough_data', or 'failure', we move onto the
539 next run (if any are left).
540 Committing of payloads to the outputdb only happens for 'iterate' or 'ok' return codes.
542 .. important:: Unlike most other strategies, this one won't immediately fail and return if a run returns a 'failure' exit
544 The failure will prevent iteration/successful completion of the CAF though.
546 .. warning:: Since this strategy doesn't try to merge data from runs, if *any* run in your input data doesn't contain
547 enough data to complete the algorithm successfully, you won't be able to get a successful calibration.
548 The CAF then won't allow you to iterate this calibration, or pass the constants onward to another calibration.
549 However, you will still have the database created that covers all the successfull runs.
551 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
552 a CalibrationAlgorithm C++ class directly.
555 allowed_granularities = [
"run"]
568 def run(self, iov, iteration, queue):
570 Runs the algorithm machine over the collected data and fills the results.
574 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
577 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
582 machine_params[
"output_dir"] = self.
output_dir
588 B2INFO(f
"Starting AlgorithmMachine of {self.algorithm.name}.")
589 self.
machine.setup_algorithm(iteration=iteration)
591 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
593 all_runs_collected = set(runs_from_vector(self.
algorithm.algorithm.getRunListFromAllData()))
596 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
598 runs_to_execute = all_runs_collected
602 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
603 runs_to_execute.difference_update(set(self.
ignored_runs))
605 runs_to_execute = sorted(runs_to_execute)
608 first_execution =
True
609 for exprun
in runs_to_execute:
610 if not first_execution:
612 current_runs = exprun
613 apply_iov = iov_from_runs([current_runs])
614 B2INFO(f
"Executing on IoV = {apply_iov}.")
615 self.
machine.execute_runs(runs=[current_runs], iteration=iteration, apply_iov=apply_iov)
616 first_execution =
False
617 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
619 if (self.
machine.result.result == AlgResult.ok.value)
or (self.
machine.result.result == AlgResult.iterate.value):
621 B2INFO(f
"Committing payloads for {iov_from_runs([current_runs])}.")
622 self.
machine.algorithm.algorithm.commit()
627 elif (self.
machine.result.result == AlgResult.not_enough_data.value):
628 B2INFO(f
"There wasn't enough data in the IoV {iov_from_runs([current_runs])}.")
632 elif self.
machine.result.result == AlgResult.failure.value:
633 B2ERROR(f
"Failure exit code in the IoV {iov_from_runs([current_runs])}.")
641 with open(f
"{self.algorithm.name}_iov_gaps.json",
"w")
as f:
649 Algorithm strategy to first calculate run boundaries where execution should be attempted.
650 Runs the algorithm over the input data contained within the requested IoV of the boundaries,
651 starting with the first boundary data only.
652 If the algorithm returns 'not enough data' on the current boundary IoV, it won't commit the payloads,
653 but instead adds the next boundarie's data and tries again. Basically the same logic as `SequentialRunByRun`
654 but using run boundaries instead of runs directly.
655 Notice that boundaries cannot span multiple experiments.
657 By default the algorithm will get the payload boundaries directly from the algorithm that need to
658 have inplemented the function ``isBoundaryRequired``. If the desired boundaries are already known it
659 is possible to pass them directly setting the algorithm parameter ``payload_boundaries`` and avoid
660 the need to define the ``isBoundaryRequired`` function.
662 ``payload_boundaries`` is a list ``[(exp1, run1), (exp2, run2), ...]``. A boundary at the beginning of each
663 experiment will be added if not already present. An empty list will thus produce a single payload for each
664 experiment. A ``payload_boundaries`` set to ``None`` is equivalent to not passing it and restores the default
665 behaviour where the boundaries are computed in the ``isBoundaryRequired`` function of the algorithm.
671 "payload_boundaries": []
675 allowed_granularities = [
"run"]
686 def run(self, iov, iteration, queue):
688 Runs the algorithm machine over the collected data and fills the results.
691 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
693 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
698 machine_params[
"output_dir"] = self.
output_dir
704 self.
machine.setup_algorithm(iteration=iteration)
706 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
708 all_runs_collected = runs_from_vector(self.
algorithm.algorithm.getRunListFromAllData())
711 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
713 runs_to_execute = all_runs_collected[:]
717 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
718 runs_to_execute.difference_update(set(self.
ignored_runs))
720 runs_to_execute = sorted(runs_to_execute)
725 runs_to_execute = split_runs_by_exp(runs_to_execute)
730 if "iov_coverage" in self.
algorithm.params:
731 B2INFO(f
"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
732 iov_coverage = self.
algorithm.params[
"iov_coverage"]
734 payload_boundaries =
None
735 if "payload_boundaries" in self.
algorithm.params:
736 B2INFO(f
"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.")
737 payload_boundaries = self.
algorithm.params[
"payload_boundaries"]
739 number_of_experiments = len(runs_to_execute)
740 B2INFO(f
"We are iterating over {number_of_experiments} experiments.")
743 for i_exp, run_list
in enumerate(runs_to_execute, start=1):
744 B2DEBUG(26, f
"Run List for this experiment={run_list}")
745 current_experiment = run_list[0].exp
746 B2INFO(f
"Executing over data from experiment {current_experiment}")
753 lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
755 lowest_exprun = run_list[0]
758 lowest_exprun = ExpRun(current_experiment, 0)
761 if iov_coverage
and i_exp == number_of_experiments:
762 highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
765 elif i_exp < number_of_experiments:
766 highest_exprun = ExpRun(current_experiment, -1)
769 highest_exprun = run_list[-1]
772 vec_run_list = vector_from_runs(run_list)
773 if payload_boundaries
is None:
775 B2INFO(
"Attempting to find payload boundaries.")
776 vec_boundaries = self.
algorithm.algorithm.findPayloadBoundaries(vec_run_list)
779 if vec_boundaries.empty():
780 B2ERROR(
"No boundaries found but we are in a strategy that requires them! Failing...")
784 vec_boundaries = runs_from_vector(vec_boundaries)
787 B2INFO(f
"Using as payload boundaries {payload_boundaries}.")
788 vec_boundaries = [ExpRun(exp, run)
for exp, run
in payload_boundaries]
793 run_boundaries = sorted([er
for er
in vec_boundaries
if er.exp == current_experiment])
797 first_exprun = ExpRun(current_experiment, 0)
798 if first_exprun
not in run_boundaries:
799 B2WARNING(f
"No boundary found at ({current_experiment}, 0), adding it.")
800 run_boundaries[0:0] = [first_exprun]
801 B2INFO((f
"Found {len(run_boundaries)} boundaries for this experiment. "
802 "Checking if we have some data for all boundary IoVs..."))
805 boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list)
806 B2DEBUG(26, f
"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}")
808 boundary_iovs_to_run_lists = {key: value
for key, value
in boundary_iovs_to_run_lists.items()
if value}
809 B2DEBUG(26, f
"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}")
811 new_boundary_iovs_to_run_lists = {}
812 previous_boundary_iov =
None
813 previous_boundary_run_list =
None
814 for boundary_iov, run_list
in boundary_iovs_to_run_lists.items():
815 if not previous_boundary_iov:
816 previous_boundary_iov = boundary_iov
817 previous_boundary_run_list = run_list
820 if previous_boundary_iov.run_high != (boundary_iov.run_low-1):
821 B2WARNING(
"Gap in boundary IoVs found before execution! "
822 "Will correct it by extending the previous boundary up to the next one.")
823 B2INFO(f
"Original boundary IoV={previous_boundary_iov}")
824 previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low,
825 previous_boundary_iov.exp_high, boundary_iov.run_low-1)
826 B2INFO(f
"New boundary IoV={previous_boundary_iov}")
827 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
828 previous_boundary_iov = boundary_iov
829 previous_boundary_run_list = run_list
831 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
832 boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists
833 B2DEBUG(26, f
"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}")
845 B2WARNING(
"There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.")
847 with open(f
"{self.algorithm.name}_iov_gaps.json",
"w")
as f:
858 Take the previously found boundaries and the run lists they correspond to and actually perform the
859 Algorithm execution. This is assumed to be for a single experiment.
862 remaining_boundary_iovs = sorted(list(boundary_iovs_to_run_lists.keys())[:])
867 current_boundary_iov =
None
872 last_successful_payloads =
None
873 last_successful_result =
None
875 last_successful_runs = []
877 last_successful_iov =
None
881 if not last_successful_result:
884 if not remaining_boundary_iovs:
886 B2ERROR(
"No boundaries found for the current experiment's run list. Failing the strategy.")
889 B2INFO(
"This appears to be the first attempted execution of the experiment.")
891 current_boundary_iov = remaining_boundary_iovs.pop(0)
892 current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
894 if not remaining_boundary_iovs:
895 current_iov = IoV(*lowest_exprun, *highest_exprun)
897 current_iov = IoV(*lowest_exprun, current_boundary_iov.exp_high, current_boundary_iov.run_high)
901 if not remaining_boundary_iovs:
903 B2ERROR(
"Not enough data found for the current experiment's run list. Failing the strategy.")
906 B2INFO(
"There wasn't enough data previously. Merging with the runs from the next boundary.")
908 next_boundary_iov = remaining_boundary_iovs.pop(0)
909 current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
910 next_boundary_iov.exp_high, next_boundary_iov.run_high)
911 current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
913 if not remaining_boundary_iovs:
914 current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
916 current_iov = IoV(current_iov.exp_low, current_iov.run_low,
917 current_boundary_iov.exp_high, current_boundary_iov.run_high)
924 B2INFO(
"Found a success. Will save the payloads for later.")
926 last_successful_payloads = self.
machine.algorithm.algorithm.getPayloadValues()
927 last_successful_result = self.
machine.result
928 last_successful_runs = current_runs[:]
929 last_successful_iov = current_iov
932 current_boundary_iov =
None
936 elif self.
machine.result.result == AlgResult.not_enough_data.value:
937 B2INFO(
"Not Enough Data result.")
942 B2ERROR(
"Hit a failure or some kind of result we can't continue from. Failing out...")
950 if not remaining_boundary_iovs:
952 B2INFO(
"Finished this experiment's boundaries. "
953 f
"Committing remaining payloads from {last_successful_result.iov}")
954 self.
machine.algorithm.algorithm.commit(last_successful_payloads)
955 self.
results.append(last_successful_result)
960 current_boundary_iov = remaining_boundary_iovs.pop(0)
961 current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
963 if not remaining_boundary_iovs:
964 current_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, *highest_exprun)
966 current_iov = current_boundary_iov
971 if not remaining_boundary_iovs:
972 B2INFO(
"We have no remaining runs to increase the amount of data. "
973 "Instead we will merge with the previous successful runs.")
975 new_current_runs = last_successful_runs[:]
976 new_current_runs.extend(current_runs)
977 current_runs = new_current_runs[:]
978 current_iov = IoV(last_successful_iov.exp_low, last_successful_iov.run_low,
979 current_iov.exp_high, current_iov.run_high)
981 last_successful_payloads = []
982 last_successful_result =
None
983 last_successful_runs = []
984 last_successful_iov =
None
987 B2INFO(
"Since there wasn't enough data previously, we will merge with the runs from the next boundary.")
989 next_boundary_iov = remaining_boundary_iovs.pop(0)
990 current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
991 next_boundary_iov.exp_high, next_boundary_iov.run_high)
993 current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
995 if not remaining_boundary_iovs:
996 current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
998 current_iov = IoV(current_iov.exp_low, current_iov.run_low,
999 current_boundary_iov.exp_high, current_boundary_iov.run_high)
1001 self.
execute_runs(current_runs, iteration, current_iov)
1006 B2INFO(
"Found a success.")
1007 if last_successful_result:
1008 B2INFO(
"Can now commit the previous success.")
1009 self.
machine.algorithm.algorithm.commit(last_successful_payloads)
1010 self.
results.append(last_successful_result)
1013 last_successful_payloads = self.
machine.algorithm.algorithm.getPayloadValues()
1014 last_successful_result = self.
machine.result
1015 last_successful_runs = current_runs[:]
1016 last_successful_iov = current_iov
1019 current_boundary_iov =
None
1023 elif self.
machine.result.result == AlgResult.not_enough_data.value:
1024 B2INFO(
"Not Enough Data result.")
1029 B2ERROR(
"Hit a failure or some other result we can't continue from. Failing out...")
1033 def execute_runs(self, runs, iteration, iov):
1036 self.
machine.setup_algorithm()
1040 B2INFO(f
"Executing and applying {iov} to the payloads.")
1041 self.
machine.execute_runs(runs=runs, iteration=iteration, apply_iov=iov)
1042 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
1044 def alg_success(self):
1045 return ((self.
machine.result.result == AlgResult.ok.value)
or (self.
machine.result.result == AlgResult.iterate.value))
1050 Basic Exception for this type of class.