from abc import ABC, abstractmethod
import json

from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
from caf.state_machines import AlgorithmMachine
from caf.utils import AlgResult
from caf.utils import B2INFO_MULTILINE
from caf.utils import IoV, ExpRun
from caf.utils import find_gaps_in_iov_list, grouper, find_run_lists_from_boundaries
from caf.utils import iov_from_runs, split_runs_by_exp, vector_from_runs
from caf.utils import runs_overlapping_iov, runs_from_vector
class AlgorithmStrategy(ABC):
    """
    Base class for Algorithm strategies. These do the actual execution of a single
    algorithm on collected data. Each strategy may be quite different in terms of how fast it may be,
    how database payloads are passed between executions, and whether or not final payloads have an IoV
    that is independent of the actual runs used to calculate them.

    Parameters:
        algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run

    This base class defines the basic attributes and methods that will be automatically used by the
    selected AlgorithmRunner. When defining a derived class you are free to use these attributes or to
    implement as much functionality as you want.

    If you define your derived class with an __init__ method, then you should first call the base class
    `AlgorithmStrategy.__init__()` method via super() e.g.

    >>> def __init__(self):
    >>>     super().__init__()

    The most important method to implement is :py:meth:`AlgorithmStrategy.run` which will take an
    algorithm and execute it in the required way defined by the options you have selected/attributes set.
    """

    #: Attributes that must exist on the strategy before it can run (checked with ``hasattr``
    #: in :py:meth:`is_valid`). They may hold falsy values such as "" or [].
    required_attrs = [
        "algorithm",
        "database_chain",
        "dependent_databases",
        "output_dir",
        "output_database_dir",
        "input_files",
        "ignored_runs",
    ]

    #: Attributes that must additionally be truthy for :py:meth:`is_valid` to succeed.
    required_true_attrs = [
        "algorithm",
        "output_dir",
        "output_database_dir",
        "input_files",
    ]

    #: Collector granularities this strategy can work with.
    allowed_granularities = ["run", "all"]

    #: Sentinel string. Not used by the code in this class; presumably consumed by the
    #: runner reading the queue -- TODO confirm.
    FINISHED_RESULTS = "DONE"

    #: Final state sent through the queue when the strategy finished successfully.
    COMPLETED = "COMPLETED"

    #: Final state sent through the queue when the strategy failed.
    FAILED = "FAILED"

    def __init__(self, algorithm):
        """
        Store the algorithm and initialise every attribute listed in `required_attrs`
        (plus the result bookkeeping) to an empty default.
        """
        #: The algorithm object this strategy runs
        self.algorithm = algorithm
        #: Input files handed on to the machine as ``input_files``
        self.input_files = []
        #: Output directory handed on to the machine as ``output_dir``
        self.output_dir = ""
        #: Output database directory handed on to the machine as ``output_database_dir``
        self.output_database_dir = ""
        #: Database chain handed on to the machine as ``database_chain``
        self.database_chain = []
        #: Databases handed on to the machine as ``dependent_databases``
        self.dependent_databases = []
        #: Runs removed from the run list before any execution
        self.ignored_runs = []
        #: Result objects accumulated by the strategy; inspected by
        #: :py:meth:`find_iov_gaps` and :py:meth:`any_failed_iov`
        self.results = []
        #: Queue-like object (needs ``put``) used to send results out; set by ``run``
        self.queue = None

    @abstractmethod
    def run(self, iov, iteration, queue):
        """
        Abstract method that needs to be implemented. It will be called to actually execute the
        algorithm.
        """

    def setup_from_dict(self, params):
        """
        Parameters:
            params (dict): Dictionary containing values to be assigned to the strategy attributes
                of the same name.
        """
        for attribute_name, value in params.items():
            setattr(self, attribute_name, value)

    def is_valid(self):
        """
        Returns:
            bool: Whether or not this strategy has been set up correctly with all its necessary
            attributes. Stops at (and logs) the first problem found.
        """
        B2INFO("Checking validity of current AlgorithmStrategy setup.")
        # Every required attribute has to exist at all ...
        for attribute_name in self.required_attrs:
            if not hasattr(self, attribute_name):
                B2ERROR(f"AlgorithmStrategy attribute {attribute_name} doesn't exist.")
                return False
        # ... and a subset of them must also hold a truthy value.
        for attribute_name in self.required_true_attrs:
            if not getattr(self, attribute_name):
                B2ERROR(f"AlgorithmStrategy attribute {attribute_name} returned False.")
                return False
        return True

    def find_iov_gaps(self):
        """
        Finds and prints the current gaps between the IoVs of the strategy results. Basically these
        are the IoVs not covered by any payload. It CANNOT find gaps if they exist across an
        experiment boundary. Only gaps within the same experiment are found.

        Returns:
            The gaps as returned by `caf.utils.find_gaps_in_iov_list` for the sorted result IoVs.
        """
        iov_gaps = find_gaps_in_iov_list(sorted(result.iov for result in self.results))
        if iov_gaps:
            gap_msg = ["Found gaps between IoVs of algorithm results (regardless of result)."]
            gap_msg.append("You may have requested these gaps deliberately by not passing in data containing these runs.")
            gap_msg.append("This may not be a problem, but you will not have payoads defined for these IoVs")
            gap_msg.append("unless you edit the final database.txt yourself.")
            B2INFO_MULTILINE(gap_msg)
            for iov in iov_gaps:
                B2INFO(f"{iov} not covered by any execution of the algorithm.")
        return iov_gaps

    def any_failed_iov(self):
        """
        Returns:
            bool: If any result in the current results list has a failed algorithm code
            (failure or not-enough-data) we return True
        """
        failed_results = [
            result for result in self.results
            if result.result in (AlgResult.failure.value, AlgResult.not_enough_data.value)
        ]
        if not failed_results:
            return False

        B2WARNING("Failed results found.")
        for result in failed_results:
            if result.result == AlgResult.failure.value:
                B2ERROR(f"c_Failure returned for {result.iov}.")
            elif result.result == AlgResult.not_enough_data.value:
                B2WARNING(f"c_NotEnoughData returned for {result.iov}.")
        return True

    def send_result(self, result):
        """Put a single result object on the queue, tagged as ``"result"``."""
        self.queue.put({"type": "result", "value": result})

    def send_final_state(self, state):
        """Put the final state (e.g. `COMPLETED` or `FAILED`) on the queue, tagged as ``"final_state"``."""
        self.queue.put({"type": "final_state", "value": state})
class SingleIOV(AlgorithmStrategy):
    """The fastest and simplest Algorithm strategy. Runs the algorithm only once over all of the input
    data or only the data corresponding to the requested IoV.

    If the algorithm parameter ``apply_iov`` is set it is passed to the execution as the IoV to
    apply; otherwise ``None`` is passed.

    This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather
    than operating on a CalibrationAlgorithm C++ class directly.
    """

    #: The params that can be set on the Algorithm object and are read by this strategy.
    usable_params = {"apply_iov": IoV}

    def __init__(self, algorithm):
        """
        Parameters:
            algorithm: The algorithm object to run; forwarded to `AlgorithmStrategy.__init__`.
        """
        super().__init__(algorithm)
        #: `caf.state_machines.AlgorithmMachine` that performs setup/execution; configured in `run`
        self.machine = AlgorithmMachine(self.algorithm)

    def run(self, iov, iteration, queue):
        """
        Runs the algorithm machine over the collected data and fills the results.

        Parameters:
            iov: If truthy, only collected runs overlapping this IoV are executed.
            iteration: Iteration number forwarded to the machine.
            queue: Queue-like object (needs ``put``) that receives the result and final state.

        Raises:
            StrategyError: If `is_valid` reports an incomplete setup.
        """
        if not self.is_valid():
            raise StrategyError("This AlgorithmStrategy was not set up correctly!")
        self.queue = queue

        B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
        # Hand the strategy configuration over to the machine
        machine_params = {
            "database_chain": self.database_chain,
            "dependent_databases": self.dependent_databases,
            "output_dir": self.output_dir,
            "output_database_dir": self.output_database_dir,
            "input_files": self.input_files,
            "ignored_runs": self.ignored_runs,
        }
        self.machine.setup_from_dict(machine_params)

        B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
        self.machine.setup_algorithm(iteration=iteration)
        B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")

        all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
        # Restrict to the requested IoV if one was given. Always work on a set so that
        # difference_update() below is available whatever the helper returns.
        if iov:
            runs_to_execute = set(runs_overlapping_iov(iov, all_runs_collected))
        else:
            runs_to_execute = all_runs_collected

        if self.ignored_runs:
            B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
            runs_to_execute.difference_update(set(self.ignored_runs))
        # Sets are unordered, go back to a sorted list for execution
        runs_to_execute = sorted(runs_to_execute)

        apply_iov = None
        if "apply_iov" in self.algorithm.params:
            apply_iov = self.algorithm.params["apply_iov"]
        self.machine.execute_runs(runs=runs_to_execute, iteration=iteration, apply_iov=apply_iov)
        B2INFO(f"Finished execution with result code {self.machine.result.result}.")

        # The single result is always reported, whatever its code
        self.send_result(self.machine.result)

        if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
            # Valid exit codes: finish the machine, commit the payloads and report success
            self.machine.complete()
            self.machine.algorithm.algorithm.commit()
            self.send_final_state(self.COMPLETED)
        else:
            # Anything else counts as a failed strategy.
            # NOTE(review): fail() is assumed to be the AlgorithmMachine counterpart of
            # complete() -- confirm against caf.state_machines.
            self.machine.fail()
            self.send_final_state(self.FAILED)
class SequentialRunByRun(AlgorithmStrategy):
    """
    Algorithm strategy to do run-by-run calibration of collected data.
    Runs the algorithm over the input data contained within the requested IoV, starting with the
    first run's data only. If the algorithm returns 'not enough data' on the current run set, it
    won't commit the payloads, but instead adds the next run's data and tries again.

    Once an execution on a set of runs returns 'iterate' or 'ok' we move onto the next runs (if any
    are left) and start the same procedure again. Committing of payloads to the outputdb only
    happens once we're sure that there is enough data in the remaining runs to get a full execution.
    If there isn't enough data remaining, the last runs are merged with the previous successful
    execution's runs and a final execution is performed on all remaining runs.

    Additionally this strategy will automatically make sure that IoV gaps in your input data are
    covered by a payload. This means that there shouldn't be any IoVs that don't get a new payload
    by the end of running an iteration.

    This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather
    than operating on a CalibrationAlgorithm C++ class directly.
    """

    #: The params that can be set on the Algorithm object and are read by this strategy.
    usable_params = {
        "has_experiment_settings": bool,
        "iov_coverage": IoV,
        "step_size": int,
    }

    #: Collector granularities this strategy can work with.
    allowed_granularities = ["run"]

    def __init__(self, algorithm):
        """
        Parameters:
            algorithm: The algorithm object to run. Its ``params["step_size"]`` defaults to 1
                if not already set.
        """
        super().__init__(algorithm)
        #: `caf.state_machines.AlgorithmMachine` that performs setup/execution; configured in `run`
        self.machine = AlgorithmMachine(self.algorithm)
        if "step_size" not in self.algorithm.params:
            self.algorithm.params["step_size"] = 1
        #: True until the first execution; `run` already calls setup_algorithm() once, so the
        #: first pass through `execute_over_run_list` must not repeat it
        self.first_execution = True

    def apply_experiment_settings(self, algorithm, experiment):
        """
        Apply experiment-dependent settings.
        This is the default version, which does not do anything.
        If necessary, it should be reimplemented by derived classes.
        """
        return

    def run(self, iov, iteration, queue):
        """
        Runs the algorithm machine over the collected data and fills the results.

        Parameters:
            iov: If truthy, only collected runs overlapping this IoV are executed.
            iteration: Iteration number forwarded to the machine.
            queue: Queue-like object (needs ``put``) that receives results and the final state.

        Raises:
            StrategyError: If `is_valid` reports an incomplete setup.
        """
        if not self.is_valid():
            raise StrategyError("This AlgorithmStrategy was not set up correctly!")
        self.queue = queue
        B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
        # Hand the strategy configuration over to the machine
        machine_params = {
            "database_chain": self.database_chain,
            "dependent_databases": self.dependent_databases,
            "output_dir": self.output_dir,
            "output_database_dir": self.output_database_dir,
            "input_files": self.input_files,
            "ignored_runs": self.ignored_runs,
        }
        self.machine.setup_from_dict(machine_params)
        self.machine.setup_algorithm(iteration=iteration)
        B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")

        all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
        # Always build a set: difference_update() below does not exist on the list that a plain
        # copy of all_runs_collected would give us when no IoV is requested.
        if iov:
            runs_to_execute = set(runs_overlapping_iov(iov, all_runs_collected))
        else:
            runs_to_execute = set(all_runs_collected)

        if self.ignored_runs:
            B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
            runs_to_execute.difference_update(set(self.ignored_runs))
        # Sets are unordered, go back to a sorted list
        runs_to_execute = sorted(runs_to_execute)

        # One run list per experiment, so that every experiment is evaluated separately and
        # no payload IoV is built across an experiment boundary.
        runs_to_execute = split_runs_by_exp(runs_to_execute)

        iov_coverage = None
        if "iov_coverage" in self.algorithm.params:
            B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
            iov_coverage = self.algorithm.params["iov_coverage"]

        number_of_experiments = len(runs_to_execute)
        for i_exp, run_list in enumerate(runs_to_execute, start=1):
            if "has_experiment_settings" in self.algorithm.params:
                if self.algorithm.params["has_experiment_settings"]:
                    self.apply_experiment_settings(self.machine.algorithm.algorithm, run_list[0].exp)

            # Default extent for an experiment in the middle: run 0 up to run -1 of that experiment.
            lowest_exprun = ExpRun(run_list[0].exp, 0)
            highest_exprun = ExpRun(run_list[-1].exp, -1)

            # The first/last experiments are bounded by iov_coverage if set, else by the data itself.
            if i_exp == 1:
                lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low) if iov_coverage else run_list[0]
            if i_exp == number_of_experiments:
                highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high) if iov_coverage else run_list[-1]

            self.execute_over_run_list(iteration, run_list, lowest_exprun, highest_exprun)

        # Log any gaps between the result IoVs and dump them to a JSON file
        gaps = self.find_iov_gaps()
        with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
            json.dump(gaps, f)

        # Any non-successful result fails the whole strategy
        if self.any_failed_iov():
            self.send_final_state(self.FAILED)
        else:
            self.send_final_state(self.COMPLETED)

    def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun):
        """
        Execute the algorithm over the runs of a single experiment in steps of
        ``params["step_size"]`` runs, merging steps while there is not enough data.

        Parameters:
            iteration: Iteration number forwarded to the machine.
            run_list: Sorted ExpRuns of one experiment.
            lowest_exprun: Lower bound of the first payload IoV of this experiment.
            highest_exprun: Upper bound of the last payload IoV of this experiment.

        NOTE(review): every self.machine.fail() call is assumed to be the AlgorithmMachine
        counterpart of complete(), needed before the next setup_algorithm() -- confirm against
        caf.state_machines.
        """
        # Runs of this experiment not yet handed to an execution
        remaining_runs = run_list[:]
        # Runs of the last successful execution (needed for a final merge)
        previous_runs = []
        # Runs of the execution currently being built up
        current_runs = []
        # Payloads/result of the last success, held back until we know they can be committed
        last_successful_payloads = None
        last_successful_result = None

        for expruns in grouper(self.algorithm.params["step_size"], run_list):
            # run() already set the algorithm up for the very first execution
            if not self.first_execution:
                self.machine.setup_algorithm()
            else:
                self.first_execution = False

            current_runs.extend(expruns)
            remaining_runs = [run for run in remaining_runs if run not in current_runs]

            # Work out the IoV to apply to the payloads of this execution
            if not last_successful_result:
                B2INFO("Detected that this will be the first payload of this experiment.")
                if remaining_runs:
                    # Cover from the lowest requested extent up to just before the next remaining run
                    apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1)
                else:
                    B2INFO("Detected that this will be the only payload of the experiment.")
                    apply_iov = IoV(*lowest_exprun, *highest_exprun)
            else:
                if not remaining_runs:
                    B2INFO("Detected that there are no more runs to execute in this experiment after this next execution.")
                    apply_iov = IoV(*current_runs[0], *highest_exprun)
                else:
                    B2INFO("Detected that there are more runs to execute in this experiment after this next execution.")
                    apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1)

            B2INFO(f"Executing and applying {apply_iov} to the payloads.")
            self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
            B2INFO(f"Finished execution with result code {self.machine.result.result}.")

            if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
                self.machine.complete()
                if last_successful_payloads and last_successful_result:
                    # A new success means the previously held-back success can be committed
                    B2INFO("Saving this execution's payloads to be committed later.")
                    new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                    new_successful_result = self.machine.result
                    B2INFO("We just succeeded in execution of the Algorithm."
                           f" Will now commit payloads from the previous success for {last_successful_result.iov}.")
                    self.machine.algorithm.algorithm.commit(last_successful_payloads)
                    self.results.append(last_successful_result)
                    self.send_result(last_successful_result)
                    if remaining_runs:
                        # Hold the new success back until the next execution is known
                        last_successful_payloads = new_successful_payloads
                        last_successful_result = new_successful_result
                    else:
                        B2INFO("We have no more runs to process. "
                               f"Will now commit the most recent payloads for {new_successful_result.iov}.")
                        self.machine.algorithm.algorithm.commit(new_successful_payloads)
                        self.results.append(new_successful_result)
                        self.send_result(new_successful_result)
                        break
                else:
                    if remaining_runs:
                        B2INFO(f"Saving the most recent payloads for {self.machine.result.iov} to be committed later.")
                        last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                        last_successful_result = self.machine.result
                    else:
                        B2INFO("We just succeeded in execution of the Algorithm."
                               " No runs left to be processed, so we are committing results of this execution.")
                        self.machine.algorithm.algorithm.commit()
                        self.results.append(self.machine.result)
                        self.send_result(self.machine.result)
                        break

                previous_runs = current_runs[:]
                current_runs = []
            elif (self.machine.result.result == AlgResult.not_enough_data.value):
                B2INFO(f"There wasn't enough data in {self.machine.result.iov}.")
                if remaining_runs:
                    B2INFO("Some runs remain to be processed. "
                           f"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.")
                elif not remaining_runs and not last_successful_result:
                    B2ERROR("There aren't any more runs remaining to merge with, and we never had a previous success."
                            " There wasn't enough data in the full input data requested.")
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                    self.machine.fail()
                    break
                elif not remaining_runs and last_successful_result:
                    B2INFO("There aren't any more runs remaining to merge with. But we had a previous success"
                           ", so we'll merge with the previous IoV.")
                    # Prepend the previous success' runs; executed in the for-else below
                    final_runs = current_runs[:]
                    current_runs = previous_runs
                    current_runs.extend(final_runs)
                self.machine.fail()
            elif self.machine.result.result == AlgResult.failure.value:
                B2ERROR(f"{self.algorithm.name} returned failure exit code.")
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                self.machine.fail()
                break
        else:
            # Loop ended without break: run a final execution on the previous success' runs
            # merged with the dangling runs, if there are any.
            if current_runs:
                self.machine.setup_algorithm()
                apply_iov = IoV(last_successful_result.iov.exp_low,
                                last_successful_result.iov.run_low,
                                *highest_exprun)
                B2INFO(f"Executing on {apply_iov}.")
                self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
                B2INFO(f"Finished execution with result code {self.machine.result.result}.")
                if (self.machine.result.result == AlgResult.ok.value) or (
                        self.machine.result.result == AlgResult.iterate.value):
                    self.machine.complete()
                    self.machine.algorithm.algorithm.commit()
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                else:
                    # Record the unsuccessful result as well, then fail the machine
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                    self.machine.fail()
class SimpleRunByRun(AlgorithmStrategy):
    """
    Algorithm strategy to do run-by-run calibration of collected data.
    Runs the algorithm over the input data contained within the requested IoV, starting with the
    first run's data only.
    This strategy differs from `SequentialRunByRun` in that it *will not merge run data* if the
    algorithm returns 'not enough data' on the current run.

    Once an execution on a run returns *any* result 'iterate', 'ok', 'not_enough_data', or
    'failure', we move onto the next run (if any are left).
    Committing of payloads to the outputdb only happens for 'iterate' or 'ok' return codes.

    .. important:: Unlike most other strategies, this one won't immediately fail and return if a
                   run returns a 'failure' exit code.
                   The failure will prevent iteration/successful completion of the CAF though.

    .. warning:: Since this strategy doesn't try to merge data from runs, if *any* run in your
                 input data doesn't contain enough data to complete the algorithm successfully, you
                 won't be able to get a successful calibration.
                 The CAF then won't allow you to iterate this calibration, or pass the constants
                 onward to another calibration.
                 However, you will still have the database created that covers all the successful
                 runs.

    This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather
    than operating on a CalibrationAlgorithm C++ class directly.
    """

    #: Collector granularities this strategy can work with.
    allowed_granularities = ["run"]

    #: This strategy reads no algorithm params; defined for consistency with the other strategies.
    usable_params = {}

    def __init__(self, algorithm):
        """
        Parameters:
            algorithm: The algorithm object to run; forwarded to `AlgorithmStrategy.__init__`.
        """
        super().__init__(algorithm)
        #: `caf.state_machines.AlgorithmMachine` that performs setup/execution; configured in `run`
        self.machine = AlgorithmMachine(self.algorithm)

    def run(self, iov, iteration, queue):
        """
        Runs the algorithm machine over the collected data and fills the results.

        Parameters:
            iov: If truthy, only collected runs overlapping this IoV are executed.
            iteration: Iteration number forwarded to the machine.
            queue: Queue-like object (needs ``put``) that receives results and the final state.

        Raises:
            StrategyError: If `is_valid` reports an incomplete setup.
        """
        if not self.is_valid():
            raise StrategyError("This AlgorithmStrategy was not set up correctly!")
        self.queue = queue

        B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
        # Hand the strategy configuration over to the machine
        machine_params = {
            "database_chain": self.database_chain,
            "dependent_databases": self.dependent_databases,
            "output_dir": self.output_dir,
            "output_database_dir": self.output_database_dir,
            "input_files": self.input_files,
            "ignored_runs": self.ignored_runs,
        }
        self.machine.setup_from_dict(machine_params)

        B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
        self.machine.setup_algorithm(iteration=iteration)
        B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")

        all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
        # Always work on a set so that difference_update() below is available
        if iov:
            runs_to_execute = set(runs_overlapping_iov(iov, all_runs_collected))
        else:
            runs_to_execute = all_runs_collected

        if self.ignored_runs:
            B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
            runs_to_execute.difference_update(set(self.ignored_runs))
        # Sets are unordered, go back to a sorted list
        runs_to_execute = sorted(runs_to_execute)

        # setup_algorithm() was already called above for the first run
        first_execution = True
        for exprun in runs_to_execute:
            if not first_execution:
                self.machine.setup_algorithm()
            current_runs = exprun
            apply_iov = iov_from_runs([current_runs])
            B2INFO(f"Executing on IoV = {apply_iov}.")
            self.machine.execute_runs(runs=[current_runs], iteration=iteration, apply_iov=apply_iov)
            first_execution = False
            B2INFO(f"Finished execution with result code {self.machine.result.result}.")

            # NOTE(review): the fail() calls below are assumed to be the AlgorithmMachine
            # counterpart of complete(), needed before the next setup_algorithm() -- confirm
            # against caf.state_machines.
            if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
                # Only successful runs get their payloads committed
                B2INFO(f"Committing payloads for {iov_from_runs([current_runs])}.")
                self.machine.algorithm.algorithm.commit()
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                self.machine.complete()
            elif (self.machine.result.result == AlgResult.not_enough_data.value):
                B2INFO(f"There wasn't enough data in the IoV {iov_from_runs([current_runs])}.")
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                self.machine.fail()
            elif self.machine.result.result == AlgResult.failure.value:
                B2ERROR(f"Failure exit code in the IoV {iov_from_runs([current_runs])}.")
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                self.machine.fail()

        # Log any gaps between the result IoVs and dump them to a JSON file
        gaps = self.find_iov_gaps()
        with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
            json.dump(gaps, f)

        # This strategy always reports COMPLETED; individual failures are only visible
        # through the results that were sent.
        self.send_final_state(self.COMPLETED)
class SequentialBoundaries(AlgorithmStrategy):
    """
    Algorithm strategy to first calculate run boundaries where execution should be attempted.
    Runs the algorithm over the input data contained within the requested IoV of the boundaries,
    starting with the first boundary data only.
    If the algorithm returns 'not enough data' on the current boundary IoV, it won't commit the
    payloads, but instead adds the next boundary's data and tries again. Basically the same logic as
    `SequentialRunByRun` but using run boundaries instead of runs directly.
    Notice that boundaries cannot span multiple experiments.

    By default the algorithm will get the payload boundaries directly from the algorithm that need
    to have implemented the function ``isBoundaryRequired``. If the desired boundaries are already
    known it is possible to pass them directly setting the algorithm parameter
    ``payload_boundaries`` and avoid the need to define the ``isBoundaryRequired`` function.

    ``payload_boundaries`` is a list ``[(exp1, run1), (exp2, run2), ...]``. A boundary at the
    beginning of each experiment will be added if not already present. An empty list will thus
    produce a single payload for each experiment. A ``payload_boundaries`` set to ``None`` is
    equivalent to not passing it and restores the default behaviour where the boundaries are
    computed in the ``isBoundaryRequired`` function of the algorithm.
    """

    #: The params that can be set on the Algorithm object and are read by this strategy.
    usable_params = {
        "iov_coverage": IoV,
        "payload_boundaries": []  # [(exp1, run1), (exp2, run2), ...]
    }

    #: Collector granularities this strategy can work with.
    allowed_granularities = ["run"]

    def __init__(self, algorithm):
        """
        Parameters:
            algorithm: The algorithm object to run; forwarded to `AlgorithmStrategy.__init__`.
        """
        super().__init__(algorithm)
        #: `caf.state_machines.AlgorithmMachine` that performs setup/execution; configured in `run`
        self.machine = AlgorithmMachine(self.algorithm)
        #: True until the first execution; `run` already calls setup_algorithm() once, so the
        #: first call of `execute_runs` must not repeat it
        self.first_execution = True

    def run(self, iov, iteration, queue):
        """
        Runs the algorithm machine over the collected data and fills the results.

        Parameters:
            iov: If truthy, only collected runs overlapping this IoV are executed.
            iteration: Iteration number forwarded to the machine.
            queue: Queue-like object (needs ``put``) that receives results and the final state.

        Raises:
            StrategyError: If `is_valid` reports an incomplete setup.
        """
        if not self.is_valid():
            raise StrategyError("This AlgorithmStrategy was not set up correctly!")
        self.queue = queue
        B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
        # Hand the strategy configuration over to the machine
        machine_params = {
            "database_chain": self.database_chain,
            "dependent_databases": self.dependent_databases,
            "output_dir": self.output_dir,
            "output_database_dir": self.output_database_dir,
            "input_files": self.input_files,
            "ignored_runs": self.ignored_runs,
        }
        self.machine.setup_from_dict(machine_params)
        self.machine.setup_algorithm(iteration=iteration)
        B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")

        all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
        # Always build a set: difference_update() below does not exist on the list that a plain
        # copy of all_runs_collected would give us when no IoV is requested.
        if iov:
            runs_to_execute = set(runs_overlapping_iov(iov, all_runs_collected))
        else:
            runs_to_execute = set(all_runs_collected)

        if self.ignored_runs:
            B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
            runs_to_execute.difference_update(set(self.ignored_runs))
        # Sets are unordered, go back to a sorted list
        runs_to_execute = sorted(runs_to_execute)

        # One run list per experiment, so that every experiment is evaluated separately and
        # no payload IoV is built across an experiment boundary.
        runs_to_execute = split_runs_by_exp(runs_to_execute)

        iov_coverage = None
        if "iov_coverage" in self.algorithm.params:
            B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
            iov_coverage = self.algorithm.params["iov_coverage"]

        payload_boundaries = None
        if "payload_boundaries" in self.algorithm.params:
            B2INFO(f"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.")
            payload_boundaries = self.algorithm.params["payload_boundaries"]

        number_of_experiments = len(runs_to_execute)
        B2INFO(f"We are iterating over {number_of_experiments} experiments.")

        for i_exp, run_list in enumerate(runs_to_execute, start=1):
            B2DEBUG(26, f"Run List for this experiment={run_list}")
            current_experiment = run_list[0].exp
            B2INFO(f"Executing over data from experiment {current_experiment}")

            # Lower bound: iov_coverage (or the first run) for the first experiment, run 0 otherwise
            if i_exp == 1:
                if iov_coverage:
                    lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
                else:
                    lowest_exprun = run_list[0]
            else:
                lowest_exprun = ExpRun(current_experiment, 0)

            # Upper bound: iov_coverage for the last experiment if set, run -1 while more
            # experiments follow, else the last run of the data
            if iov_coverage and i_exp == number_of_experiments:
                highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
            elif i_exp < number_of_experiments:
                highest_exprun = ExpRun(current_experiment, -1)
            else:
                highest_exprun = run_list[-1]

            vec_run_list = vector_from_runs(run_list)
            if payload_boundaries is None:
                # Ask the algorithm itself for the boundaries
                B2INFO("Attempting to find payload boundaries.")
                vec_boundaries = self.algorithm.algorithm.findPayloadBoundaries(vec_run_list)
                # Without any boundary this strategy cannot work
                if vec_boundaries.empty():
                    B2ERROR("No boundaries found but we are in a strategy that requires them! Failing...")
                    self.send_final_state(self.FAILED)
                    break
                vec_boundaries = runs_from_vector(vec_boundaries)
            else:
                # User supplied boundaries; may be empty because (exp, 0) is added below anyway
                B2INFO(f"Using as payload boundaries {payload_boundaries}.")
                vec_boundaries = [ExpRun(exp, run) for exp, run in payload_boundaries]

            # Keep only this experiment's boundaries, sorted
            run_boundaries = sorted([er for er in vec_boundaries if er.exp == current_experiment])
            # Every experiment must start with a boundary at run 0
            first_exprun = ExpRun(current_experiment, 0)
            if first_exprun not in run_boundaries:
                B2WARNING(f"No boundary found at ({current_experiment}, 0), adding it.")
                run_boundaries[0:0] = [first_exprun]
            B2INFO(f"Found {len(run_boundaries)} boundaries for this experiment. "
                   "Checking if we have some data for all boundary IoVs...")

            boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list)
            B2DEBUG(26, f"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}")
            # Drop boundary IoVs that contain no runs
            boundary_iovs_to_run_lists = {key: value for key, value in boundary_iovs_to_run_lists.items() if value}
            B2DEBUG(26, f"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}")

            # Dropping IoVs may have opened gaps; close them by extending the previous IoV
            new_boundary_iovs_to_run_lists = {}
            previous_boundary_iov = None
            previous_boundary_run_list = None
            for boundary_iov, boundary_run_list in boundary_iovs_to_run_lists.items():
                if not previous_boundary_iov:
                    previous_boundary_iov = boundary_iov
                    previous_boundary_run_list = boundary_run_list
                    continue
                if previous_boundary_iov.run_high != (boundary_iov.run_low-1):
                    B2WARNING("Gap in boundary IoVs found before execution! "
                              "Will correct it by extending the previous boundary up to the next one.")
                    B2INFO(f"Original boundary IoV={previous_boundary_iov}")
                    previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low,
                                                previous_boundary_iov.exp_high, boundary_iov.run_low-1)
                    B2INFO(f"New boundary IoV={previous_boundary_iov}")
                new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
                previous_boundary_iov = boundary_iov
                previous_boundary_run_list = boundary_run_list
            # Store the last boundary; skipped when there was none at all so that no
            # None key ends up in the mapping.
            if previous_boundary_iov:
                new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
            boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists
            B2DEBUG(26, f"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}")

            success = self.execute_over_boundaries(boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
            if not success:
                self.send_final_state(self.FAILED)
                break
        else:
            # Only reached when no experiment failed (no break above)
            gaps = self.find_iov_gaps()
            if gaps:
                B2WARNING("There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.")
            with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
                json.dump(gaps, f)

            # Any non-successful result fails the whole strategy
            if self.any_failed_iov():
                self.send_final_state(self.FAILED)
            else:
                self.send_final_state(self.COMPLETED)

    def execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration):
        """
        Take the previously found boundaries and the run lists they correspond to and actually
        perform the Algorithm execution. This is assumed to be for a single experiment.

        Parameters:
            boundary_iovs_to_run_lists (dict): Boundary IoV -> list of runs inside that boundary.
            lowest_exprun: Lower bound of the first payload IoV of this experiment.
            highest_exprun: Upper bound of the last payload IoV of this experiment.
            iteration: Iteration number forwarded to the machine.

        Returns:
            bool: True once all boundaries were processed and the last success was committed,
            False if there were no boundaries, not enough data overall, or a failure result.

        NOTE(review): the self.machine.fail() calls are assumed to be the AlgorithmMachine
        counterpart of complete() -- confirm against caf.state_machines.
        """
        # Boundary IoVs still to be processed, in order
        remaining_boundary_iovs = sorted(boundary_iovs_to_run_lists)

        # State of the execution currently being built up
        current_runs = []
        current_boundary_iov = None
        # The IoV applied to the payloads; may differ from the boundary IoV
        current_iov = None

        # State of the last success, held back until we know it can be committed
        last_successful_payloads = None
        last_successful_result = None
        last_successful_runs = []
        last_successful_iov = None

        while True:
            if not last_successful_result:
                if not current_runs:
                    # Very first attempt of this experiment
                    if not remaining_boundary_iovs:
                        B2ERROR("No boundaries found for the current experiment's run list. Failing the strategy.")
                        return False

                    B2INFO("This appears to be the first attempted execution of the experiment.")
                    current_boundary_iov = remaining_boundary_iovs.pop(0)
                    # Copy so that extending current_runs later does not mutate the caller's lists
                    current_runs = list(boundary_iovs_to_run_lists[current_boundary_iov])
                    if not remaining_boundary_iovs:
                        current_iov = IoV(*lowest_exprun, *highest_exprun)
                    else:
                        current_iov = IoV(*lowest_exprun, current_boundary_iov.exp_high, current_boundary_iov.run_high)
                else:
                    # Previous attempt had not enough data and there never was a success
                    if not remaining_boundary_iovs:
                        B2ERROR("Not enough data found for the current experiment's run list. Failing the strategy.")
                        return False

                    B2INFO("There wasn't enough data previously. Merging with the runs from the next boundary.")
                    next_boundary_iov = remaining_boundary_iovs.pop(0)
                    current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
                                               next_boundary_iov.exp_high, next_boundary_iov.run_high)
                    current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
                    # The last boundary has to reach up to the highest requested ExpRun
                    if not remaining_boundary_iovs:
                        current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
                    else:
                        current_iov = IoV(current_iov.exp_low, current_iov.run_low,
                                          current_boundary_iov.exp_high, current_boundary_iov.run_high)

                self.execute_runs(current_runs, iteration, current_iov)

                if self.alg_success():
                    B2INFO("Found a success. Will save the payloads for later.")
                    last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                    last_successful_result = self.machine.result
                    last_successful_runs = current_runs[:]
                    last_successful_iov = current_iov
                    # Reset the current execution for the next loop
                    current_runs = []
                    current_boundary_iov = None
                    current_iov = None
                    self.machine.complete()
                    continue
                elif self.machine.result.result == AlgResult.not_enough_data.value:
                    B2INFO("Not Enough Data result.")
                    # Keep the current runs so that the next loop can extend them
                    self.machine.complete()
                    continue
                else:
                    B2ERROR("Hit a failure or some kind of result we can't continue from. Failing out...")
                    self.machine.fail()
                    return False
            else:
                if not current_runs:
                    # Previous loop was a success
                    if not remaining_boundary_iovs:
                        B2INFO("Finished this experiment's boundaries. "
                               f"Committing remaining payloads from {last_successful_result.iov}")
                        self.machine.algorithm.algorithm.commit(last_successful_payloads)
                        self.results.append(last_successful_result)
                        self.send_result(last_successful_result)
                        return True

                    current_boundary_iov = remaining_boundary_iovs.pop(0)
                    # Copy so that extending current_runs later does not mutate the caller's lists
                    current_runs = list(boundary_iovs_to_run_lists[current_boundary_iov])
                    # The last boundary has to reach up to the highest requested ExpRun
                    if not remaining_boundary_iovs:
                        current_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, *highest_exprun)
                    else:
                        current_iov = current_boundary_iov
                else:
                    # Previous loop had not enough data, but an earlier success exists
                    if not remaining_boundary_iovs:
                        B2INFO("We have no remaining runs to increase the amount of data. "
                               "Instead we will merge with the previous successful runs.")
                        new_current_runs = last_successful_runs[:]
                        new_current_runs.extend(current_runs)
                        current_runs = new_current_runs[:]
                        current_iov = IoV(last_successful_iov.exp_low, last_successful_iov.run_low,
                                          current_iov.exp_high, current_iov.run_high)
                        # The held-back success is dropped, the merged execution replaces it
                        last_successful_payloads = []
                        last_successful_result = None
                        last_successful_runs = []
                        last_successful_iov = None
                    else:
                        B2INFO("Since there wasn't enough data previously, we will merge with the runs from the next boundary.")
                        next_boundary_iov = remaining_boundary_iovs.pop(0)
                        current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
                                                   next_boundary_iov.exp_high, next_boundary_iov.run_high)
                        current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
                        # The last boundary has to reach up to the highest requested ExpRun
                        if not remaining_boundary_iovs:
                            current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
                        else:
                            current_iov = IoV(current_iov.exp_low, current_iov.run_low,
                                              current_boundary_iov.exp_high, current_boundary_iov.run_high)

                self.execute_runs(current_runs, iteration, current_iov)

                if self.alg_success():
                    B2INFO("Found a success.")
                    if last_successful_result:
                        B2INFO("Can now commit the previous success.")
                        self.machine.algorithm.algorithm.commit(last_successful_payloads)
                        self.results.append(last_successful_result)
                        self.send_result(last_successful_result)
                    # The new success replaces the held-back one
                    last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                    last_successful_result = self.machine.result
                    last_successful_runs = current_runs[:]
                    last_successful_iov = current_iov
                    # Reset the current execution for the next loop
                    current_runs = []
                    current_boundary_iov = None
                    current_iov = None
                    self.machine.complete()
                    continue
                elif self.machine.result.result == AlgResult.not_enough_data.value:
                    B2INFO("Not Enough Data result.")
                    # Keep the current runs so that the next loop can extend them
                    self.machine.complete()
                    continue
                else:
                    B2ERROR("Hit a failure or some other result we can't continue from. Failing out...")
                    self.machine.fail()
                    return False

    def execute_runs(self, runs, iteration, iov):
        """
        Execute the machine on ``runs`` applying ``iov`` to the payloads. Re-runs
        setup_algorithm() first on every call except the very first one.
        """
        if not self.first_execution:
            self.machine.setup_algorithm()
        else:
            self.first_execution = False

        B2INFO(f"Executing and applying {iov} to the payloads.")
        self.machine.execute_runs(runs=runs, iteration=iteration, apply_iov=iov)
        B2INFO(f"Finished execution with result code {self.machine.result.result}.")

    def alg_success(self):
        """
        Returns:
            bool: True if the machine's current result code is 'ok' or 'iterate'.
        """
        return ((self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value))
class StrategyError(Exception):
    """
    Basic Exception for this type of class.

    Raised by the strategies' ``run`` methods when ``is_valid()`` reports an incomplete setup.
    """