14 from basf2
import B2DEBUG, B2ERROR, B2INFO, B2WARNING
15 from caf.utils
import AlgResult
16 from caf.utils
import B2INFO_MULTILINE
17 from caf.utils
import runs_overlapping_iov, runs_from_vector
18 from caf.utils
import iov_from_runs, split_runs_by_exp, vector_from_runs
19 from caf.utils
import find_gaps_in_iov_list, grouper, find_run_lists_from_boundaries
20 from caf.utils
import IoV, ExpRun
21 from caf.state_machines
import AlgorithmMachine
23 from abc
import ABC, abstractmethod
27 class AlgorithmStrategy(ABC):
29 Base class for Algorithm strategies. These do the actual execution of a single
30 algorithm on collected data. Each strategy may be quite different in terms of how fast it may be,
31 how database payloads are passed between executions, and whether or not final payloads have an IoV
32 that is independent of the actual runs used to calculate them.
35 algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run
37 This base class defines the basic attributes and methods that will be automatically used by the selected AlgorithmRunner.
38 When defining a derived class you are free to use these attributes or to implement as much functionality as you want.
40 If you define your derived class with an __init__ method, then you should first call the base class
41 `AlgorithmStrategy.__init__()` method via super() e.g.
43 >>> def __init__(self):
44 >>> super().__init__()
46 The most important method to implement is :py:meth:`AlgorithmStrategy.run` which will take an algorithm and execute it
47 in the required way defined by the options you have selected/attributes set.
51 required_attrs = [
"algorithm",
53 "dependent_databases",
55 "output_database_dir",
61 required_true_attrs = [
"algorithm",
63 "output_database_dir",
68 allowed_granularities = [
"run",
"all"]
71 FINISHED_RESULTS =
"DONE"
74 COMPLETED =
"COMPLETED"
79 def __init__(self, algorithm):
83 self.algorithm = algorithm
89 self.output_database_dir =
""
91 self.database_chain = []
93 self.dependent_databases = []
96 self.ignored_runs = []
103 def run(self, iov, iteration, queue):
105 Abstract method that needs to be implemented. It will be called to actually execute the
def setup_from_dict(self, params):
    """
    Copy every entry of ``params`` onto this strategy as an attribute.

    Each key becomes the attribute name and each value the attribute value.
    No filtering or validation is done here, so an existing attribute with the
    same name is simply overwritten.

    Parameters:
        params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name.
    """
    for attribute_name, value in params.items():
        setattr(self, attribute_name, value)
120 bool: Whether or not this strategy has been set up correctly with all its necessary attributes.
122 B2INFO(
"Checking validity of current AlgorithmStrategy setup.")
124 for attribute_name
in self.required_attrs:
125 if not hasattr(self, attribute_name):
126 B2ERROR(f
"AlgorithmStrategy attribute {attribute_name} doesn't exist.")
129 for attribute_name
in self.required_true_attrs:
130 if not getattr(self, attribute_name):
131 B2ERROR(f
"AlgorithmStrategy attribute {attribute_name} returned False.")
135 def find_iov_gaps(self):
137 Finds and prints the current gaps between the IoVs of the strategy results. Basically these are the IoVs
138 not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps
139 within the same experiment are found.
144 iov_gaps = find_gaps_in_iov_list(sorted([result.iov
for result
in self.results]))
146 gap_msg = [
"Found gaps between IoVs of algorithm results (regardless of result)."]
147 gap_msg.append(
"You may have requested these gaps deliberately by not passing in data containing these runs.")
148 gap_msg.append(
"This may not be a problem, but you will not have payoads defined for these IoVs")
149 gap_msg.append(
"unless you edit the final database.txt yourself.")
150 B2INFO_MULTILINE(gap_msg)
152 B2INFO(f
"{iov} not covered by any execution of the algorithm.")
155 def any_failed_iov(self):
158 bool: If any result in the current results list has a failed algorithm code we return True
161 for result
in self.results:
162 if result.result == AlgResult.failure.value
or result.result == AlgResult.not_enough_data.value:
163 failed_results.append(result)
165 B2WARNING(
"Failed results found.")
166 for result
in failed_results:
167 if result.result == AlgResult.failure.value:
168 B2ERROR(f
"c_Failure returned for {result.iov}.")
169 elif result.result == AlgResult.not_enough_data.value:
170 B2WARNING(f
"c_NotEnoughData returned for {result.iov}.")
def send_result(self, result):
    """
    Put ``result`` on this strategy's queue, tagged as a "result" message.

    The queued item is a dictionary with the keys "type" (always "result")
    and "value" (the object passed in).

    Parameters:
        result: The object to send; it is stored unchanged under the "value" key.
    """
    self.queue.put({"type": "result", "value": result})
def send_final_state(self, state):
    """
    Put ``state`` on this strategy's queue, tagged as a "final_state" message.

    The queued item is a dictionary with the keys "type" (always "final_state")
    and "value" (the state passed in).

    Parameters:
        state: The final state to report; it is stored unchanged under the "value" key.
    """
    self.queue.put({"type": "final_state", "value": state})
182 class SingleIOV(AlgorithmStrategy):
183 """The fastest and simplest Algorithm strategy. Runs the algorithm only once over all of the input
184 data or only the data corresponding to the requested IoV. The payload IoV is then set to the same as the one
187 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
188 a CalibrationAlgorithm C++ class directly.
192 usable_params = {
"apply_iov": IoV}
194 def __init__(self, algorithm):
197 super().__init__(algorithm)
200 self.machine = AlgorithmMachine(self.algorithm)
202 def run(self, iov, iteration, queue):
204 Runs the algorithm machine over the collected data and fills the results.
206 if not self.is_valid():
207 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
210 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
213 machine_params[
"database_chain"] = self.database_chain
214 machine_params[
"dependent_databases"] = self.dependent_databases
215 machine_params[
"output_dir"] = self.output_dir
216 machine_params[
"output_database_dir"] = self.output_database_dir
217 machine_params[
"input_files"] = self.input_files
218 machine_params[
"ignored_runs"] = self.ignored_runs
219 self.machine.setup_from_dict(machine_params)
221 B2INFO(f
"Starting AlgorithmMachine of {self.algorithm.name}.")
222 self.machine.setup_algorithm(iteration=iteration)
224 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
226 all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
229 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
231 runs_to_execute = all_runs_collected
234 if self.ignored_runs:
235 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
236 runs_to_execute.difference_update(set(self.ignored_runs))
238 runs_to_execute = sorted(runs_to_execute)
240 if "apply_iov" in self.algorithm.params:
241 apply_iov = self.algorithm.params[
"apply_iov"]
242 self.machine.execute_runs(runs=runs_to_execute, iteration=iteration, apply_iov=apply_iov)
243 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
246 self.send_result(self.machine.result)
249 if (self.machine.result.result == AlgResult.ok.value)
or (self.machine.result.result == AlgResult.iterate.value):
251 self.machine.complete()
253 self.machine.algorithm.algorithm.commit()
254 self.send_final_state(self.COMPLETED)
258 self.send_final_state(self.FAILED)
261 class SequentialRunByRun(AlgorithmStrategy):
263 Algorithm strategy to do run-by-run calibration of collected data.
264 Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
265 If the algorithm returns 'not enough data' on the current run set, it won't commit the payloads, but instead adds
266 the next run's data and tries again.
268 Once an execution on a set of runs returns 'iterate' or 'ok', we move on to the next runs (if any are left)
269 and start the same procedure again. Committing of payloads to the outputdb only happens once we're sure that there
270 is enough data in the remaining runs to get a full execution. If there isn't enough data remaining, the last runs
271 are merged with the previous successful execution's runs and a final execution is performed on all remaining runs.
273 Additionally this strategy will automatically make sure that IoV gaps in your input data are covered by a payload.
274 This means that there shouldn't be any IoVs that don't get a new payload by the end of running an iteration.
276 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
277 a CalibrationAlgorithm C++ class directly.
282 "has_experiment_settings": bool,
288 allowed_granularities = [
"run"]
290 def __init__(self, algorithm):
293 super().__init__(algorithm)
296 self.machine = AlgorithmMachine(self.algorithm)
297 if "step_size" not in self.algorithm.params:
298 self.algorithm.params[
"step_size"] = 1
299 self.first_execution =
True
def apply_experiment_settings(self, algorithm, experiment):
    """
    Apply experiment-dependent settings.
    This is the default version, which does not do anything.
    If necessary, it should be reimplemented by derived classes.

    Parameters:
        algorithm: The algorithm object that a reimplementation would configure (unused here).
        experiment: The experiment that the settings would depend on (unused here).
    """
    # Deliberate no-op: this is only a hook for derived strategies to override.
    return None
309 def run(self, iov, iteration, queue):
311 Runs the algorithm machine over the collected data and fills the results.
313 if not self.is_valid():
314 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
316 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
319 machine_params[
"database_chain"] = self.database_chain
320 machine_params[
"dependent_databases"] = self.dependent_databases
321 machine_params[
"output_dir"] = self.output_dir
322 machine_params[
"output_database_dir"] = self.output_database_dir
323 machine_params[
"input_files"] = self.input_files
324 machine_params[
"ignored_runs"] = self.ignored_runs
325 self.machine.setup_from_dict(machine_params)
327 self.machine.setup_algorithm(iteration=iteration)
329 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
331 all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
334 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
336 runs_to_execute = all_runs_collected[:]
339 if self.ignored_runs:
340 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
341 runs_to_execute.difference_update(set(self.ignored_runs))
343 runs_to_execute = sorted(runs_to_execute)
348 runs_to_execute = split_runs_by_exp(runs_to_execute)
353 if "iov_coverage" in self.algorithm.params:
354 B2INFO(f
"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
355 iov_coverage = self.algorithm.params[
"iov_coverage"]
357 number_of_experiments = len(runs_to_execute)
359 for i_exp, run_list
in enumerate(runs_to_execute, start=1):
362 if "has_experiment_settings" in self.algorithm.params:
363 if self.algorithm.params[
"has_experiment_settings"]:
364 self.apply_experiment_settings(self.machine.algorithm.algorithm, run_list[0].exp)
371 lowest_exprun = ExpRun(run_list[0].exp, 0)
372 highest_exprun = ExpRun(run_list[-1].exp, -1)
375 lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
if iov_coverage
else run_list[0]
376 if i_exp == number_of_experiments:
377 highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
if iov_coverage
else run_list[-1]
379 self.execute_over_run_list(iteration, run_list, lowest_exprun, highest_exprun)
382 gaps = self.find_iov_gaps()
384 with open(f
"{self.algorithm.name}_iov_gaps.json",
"w")
as f:
388 if self.any_failed_iov():
389 self.send_final_state(self.FAILED)
391 self.send_final_state(self.COMPLETED)
393 def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun):
395 remaining_runs = run_list[:]
401 last_successful_payloads =
None
402 last_successful_result =
None
405 for expruns
in grouper(self.algorithm.params[
"step_size"], run_list):
407 if not self.first_execution:
408 self.machine.setup_algorithm()
410 self.first_execution =
False
413 current_runs.extend(expruns)
415 remaining_runs = [run
for run
in remaining_runs
if run
not in current_runs]
418 if not last_successful_result:
419 B2INFO(
"Detected that this will be the first payload of this experiment.")
423 apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1)
426 B2INFO(
"Detected that this will be the only payload of the experiment.")
427 apply_iov = IoV(*lowest_exprun, *highest_exprun)
430 if not remaining_runs:
431 B2INFO(
"Detected that there are no more runs to execute in this experiment after this next execution.")
432 apply_iov = IoV(*current_runs[0], *highest_exprun)
435 B2INFO(
"Detected that there are more runs to execute in this experiment after this next execution.")
436 apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1)
438 B2INFO(f
"Executing and applying {apply_iov} to the payloads.")
439 self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
440 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
443 if (self.machine.result.result == AlgResult.ok.value)
or (self.machine.result.result == AlgResult.iterate.value):
444 self.machine.complete()
447 if last_successful_payloads
and last_successful_result:
448 B2INFO(
"Saving this execution's payloads to be committed later.")
450 new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
451 new_successful_result = self.machine.result
452 B2INFO(
"We just succeded in execution of the Algorithm."
453 f
" Will now commit payloads from the previous success for {last_successful_result.iov}.")
454 self.machine.algorithm.algorithm.commit(last_successful_payloads)
455 self.results.append(last_successful_result)
456 self.send_result(last_successful_result)
459 last_successful_payloads = new_successful_payloads
460 last_successful_result = new_successful_result
463 B2INFO(
"We have no more runs to process. "
464 f
"Will now commit the most recent payloads for {new_successful_result.iov}.")
465 self.machine.algorithm.algorithm.commit(new_successful_payloads)
466 self.results.append(new_successful_result)
467 self.send_result(new_successful_result)
473 B2INFO(f
"Saving the most recent payloads for {self.machine.result.iov} to be committed later.")
475 last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
476 last_successful_result = self.machine.result
479 B2INFO(
"We just succeeded in execution of the Algorithm."
480 " No runs left to be processed, so we are committing results of this execution.")
481 self.machine.algorithm.algorithm.commit()
482 self.results.append(self.machine.result)
483 self.send_result(self.machine.result)
486 previous_runs = current_runs[:]
489 elif (self.machine.result.result == AlgResult.not_enough_data.value):
490 B2INFO(f
"There wasn't enough data in {self.machine.result.iov}.")
492 B2INFO(
"Some runs remain to be processed. "
493 f
"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.")
494 elif not remaining_runs
and not last_successful_result:
495 B2ERROR(
"There aren't any more runs remaining to merge with, and we never had a previous success."
496 " There wasn't enough data in the full input data requested.")
497 self.results.append(self.machine.result)
498 self.send_result(self.machine.result)
501 elif not remaining_runs
and last_successful_result:
502 B2INFO(
"There aren't any more runs remaining to merge with. But we had a previous success"
503 ", so we'll merge with the previous IoV.")
504 final_runs = current_runs[:]
505 current_runs = previous_runs
506 current_runs.extend(final_runs)
508 elif self.machine.result.result == AlgResult.failure.value:
509 B2ERROR(f
"{self.algorithm.name} returned failure exit code.")
510 self.results.append(self.machine.result)
511 self.send_result(self.machine.result)
517 self.machine.setup_algorithm()
518 apply_iov = IoV(last_successful_result.iov.exp_low,
519 last_successful_result.iov.run_low,
521 B2INFO(f
"Executing on {apply_iov}.")
522 self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
523 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
524 if (self.machine.result.result == AlgResult.ok.value)
or (
525 self.machine.result.result == AlgResult.iterate.value):
526 self.machine.complete()
528 self.machine.algorithm.algorithm.commit()
530 self.results.append(self.machine.result)
531 self.send_result(self.machine.result)
534 self.results.append(self.machine.result)
535 self.send_result(self.machine.result)
540 class SimpleRunByRun(AlgorithmStrategy):
542 Algorithm strategy to do run-by-run calibration of collected data.
543 Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
545 This strategy differs from `SequentialRunByRun` in that it *will not merge run data* if the algorithm returns
546 'not enough data' on the current run.
548 Once an execution on a run returns *any* result 'iterate', 'ok', 'not_enough_data', or 'failure', we move onto the
549 next run (if any are left).
550 Committing of payloads to the outputdb only happens for 'iterate' or 'ok' return codes.
552 .. important:: Unlike most other strategies, this one won't immediately fail and return if a run returns a 'failure' exit
554 The failure will prevent iteration/successful completion of the CAF though.
556 .. warning:: Since this strategy doesn't try to merge data from runs, if *any* run in your input data doesn't contain
557 enough data to complete the algorithm successfully, you won't be able to get a successful calibration.
558 The CAF then won't allow you to iterate this calibration, or pass the constants onward to another calibration.
559 However, you will still have the database created that covers all the successful runs.
561 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
562 a CalibrationAlgorithm C++ class directly.
565 allowed_granularities = [
"run"]
570 def __init__(self, algorithm):
573 super().__init__(algorithm)
576 self.machine = AlgorithmMachine(self.algorithm)
578 def run(self, iov, iteration, queue):
580 Runs the algorithm machine over the collected data and fills the results.
583 if not self.is_valid():
584 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
587 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
590 machine_params[
"database_chain"] = self.database_chain
591 machine_params[
"dependent_databases"] = self.dependent_databases
592 machine_params[
"output_dir"] = self.output_dir
593 machine_params[
"output_database_dir"] = self.output_database_dir
594 machine_params[
"input_files"] = self.input_files
595 machine_params[
"ignored_runs"] = self.ignored_runs
596 self.machine.setup_from_dict(machine_params)
598 B2INFO(f
"Starting AlgorithmMachine of {self.algorithm.name}.")
599 self.machine.setup_algorithm(iteration=iteration)
601 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
603 all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
606 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
608 runs_to_execute = all_runs_collected
611 if self.ignored_runs:
612 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
613 runs_to_execute.difference_update(set(self.ignored_runs))
615 runs_to_execute = sorted(runs_to_execute)
618 first_execution =
True
619 for exprun
in runs_to_execute:
620 if not first_execution:
621 self.machine.setup_algorithm()
622 current_runs = exprun
623 apply_iov = iov_from_runs([current_runs])
624 B2INFO(f
"Executing on IoV = {apply_iov}.")
625 self.machine.execute_runs(runs=[current_runs], iteration=iteration, apply_iov=apply_iov)
626 first_execution =
False
627 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
629 if (self.machine.result.result == AlgResult.ok.value)
or (self.machine.result.result == AlgResult.iterate.value):
631 B2INFO(f
"Committing payloads for {iov_from_runs([current_runs])}.")
632 self.machine.algorithm.algorithm.commit()
633 self.results.append(self.machine.result)
634 self.send_result(self.machine.result)
635 self.machine.complete()
637 elif (self.machine.result.result == AlgResult.not_enough_data.value):
638 B2INFO(f
"There wasn't enough data in the IoV {iov_from_runs([current_runs])}.")
639 self.results.append(self.machine.result)
640 self.send_result(self.machine.result)
642 elif self.machine.result.result == AlgResult.failure.value:
643 B2ERROR(f
"Failure exit code in the IoV {iov_from_runs([current_runs])}.")
644 self.results.append(self.machine.result)
645 self.send_result(self.machine.result)
649 gaps = self.find_iov_gaps()
651 with open(f
"{self.algorithm.name}_iov_gaps.json",
"w")
as f:
654 self.send_final_state(self.COMPLETED)
657 class SequentialBoundaries(AlgorithmStrategy):
659 Algorithm strategy to first calculate run boundaries where execution should be attempted.
660 Runs the algorithm over the input data contained within the requested IoV of the boundaries,
661 starting with the first boundary data only.
662 If the algorithm returns 'not enough data' on the current boundary IoV, it won't commit the payloads,
663 but instead adds the next boundary's data and tries again. Basically the same logic as `SequentialRunByRun`
664 but using run boundaries instead of runs directly.
665 Notice that boundaries cannot span multiple experiments.
667 By default this strategy will get the payload boundaries directly from the algorithm, which needs to
668 have implemented the function ``isBoundaryRequired``. If the desired boundaries are already known, it
669 is possible to pass them directly by setting the algorithm parameter ``payload_boundaries`` and avoid
670 the need to define the ``isBoundaryRequired`` function.
672 ``payload_boundaries`` is a list ``[(exp1, run1), (exp2, run2), ...]``. A boundary at the beginning of each
673 experiment will be added if not already present. An empty list will thus produce a single payload for each
674 experiment. A ``payload_boundaries`` set to ``None`` is equivalent to not passing it and restores the default
675 behaviour where the boundaries are computed in the ``isBoundaryRequired`` function of the algorithm.
681 "payload_boundaries": []
685 allowed_granularities = [
"run"]
687 def __init__(self, algorithm):
690 super().__init__(algorithm)
693 self.machine = AlgorithmMachine(self.algorithm)
694 self.first_execution =
True
696 def run(self, iov, iteration, queue):
698 Runs the algorithm machine over the collected data and fills the results.
700 if not self.is_valid():
701 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
703 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
706 machine_params[
"database_chain"] = self.database_chain
707 machine_params[
"dependent_databases"] = self.dependent_databases
708 machine_params[
"output_dir"] = self.output_dir
709 machine_params[
"output_database_dir"] = self.output_database_dir
710 machine_params[
"input_files"] = self.input_files
711 machine_params[
"ignored_runs"] = self.ignored_runs
712 self.machine.setup_from_dict(machine_params)
714 self.machine.setup_algorithm(iteration=iteration)
716 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
718 all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
721 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
723 runs_to_execute = all_runs_collected[:]
726 if self.ignored_runs:
727 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
728 runs_to_execute.difference_update(set(self.ignored_runs))
730 runs_to_execute = sorted(runs_to_execute)
735 runs_to_execute = split_runs_by_exp(runs_to_execute)
740 if "iov_coverage" in self.algorithm.params:
741 B2INFO(f
"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
742 iov_coverage = self.algorithm.params[
"iov_coverage"]
744 payload_boundaries =
None
745 if "payload_boundaries" in self.algorithm.params:
746 B2INFO(f
"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.")
747 payload_boundaries = self.algorithm.params[
"payload_boundaries"]
749 number_of_experiments = len(runs_to_execute)
750 B2INFO(f
"We are iterating over {number_of_experiments} experiments.")
753 for i_exp, run_list
in enumerate(runs_to_execute, start=1):
754 B2DEBUG(26, f
"Run List for this experiment={run_list}")
755 current_experiment = run_list[0].exp
756 B2INFO(f
"Executing over data from experiment {current_experiment}")
763 lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
765 lowest_exprun = run_list[0]
768 lowest_exprun = ExpRun(current_experiment, 0)
771 if iov_coverage
and i_exp == number_of_experiments:
772 highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
775 elif i_exp < number_of_experiments:
776 highest_exprun = ExpRun(current_experiment, -1)
779 highest_exprun = run_list[-1]
782 vec_run_list = vector_from_runs(run_list)
783 if payload_boundaries
is None:
785 B2INFO(
"Attempting to find payload boundaries.")
786 vec_boundaries = self.algorithm.algorithm.findPayloadBoundaries(vec_run_list)
789 if vec_boundaries.empty():
790 B2ERROR(
"No boundaries found but we are in a strategy that requires them! Failing...")
792 self.send_final_state(self.FAILED)
794 vec_boundaries = runs_from_vector(vec_boundaries)
797 B2INFO(f
"Using as payload boundaries {payload_boundaries}.")
798 vec_boundaries = [ExpRun(exp, run)
for exp, run
in payload_boundaries]
803 run_boundaries = sorted([er
for er
in vec_boundaries
if er.exp == current_experiment])
807 first_exprun = ExpRun(current_experiment, 0)
808 if first_exprun
not in run_boundaries:
809 B2WARNING(f
"No boundary found at ({current_experiment}, 0), adding it.")
810 run_boundaries[0:0] = [first_exprun]
811 B2INFO(f
"Found {len(run_boundaries)} boundaries for this experiment. "
812 "Checking if we have some data for all boundary IoVs...")
815 boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list)
816 B2DEBUG(26, f
"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}")
818 boundary_iovs_to_run_lists = {key: value
for key, value
in boundary_iovs_to_run_lists.items()
if value}
819 B2DEBUG(26, f
"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}")
821 new_boundary_iovs_to_run_lists = {}
822 previous_boundary_iov =
None
823 previous_boundary_run_list =
None
824 for boundary_iov, run_list
in boundary_iovs_to_run_lists.items():
825 if not previous_boundary_iov:
826 previous_boundary_iov = boundary_iov
827 previous_boundary_run_list = run_list
830 if previous_boundary_iov.run_high != (boundary_iov.run_low-1):
831 B2WARNING(
"Gap in boundary IoVs found before execution! "
832 "Will correct it by extending the previous boundary up to the next one.")
833 B2INFO(f
"Original boundary IoV={previous_boundary_iov}")
834 previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low,
835 previous_boundary_iov.exp_high, boundary_iov.run_low-1)
836 B2INFO(f
"New boundary IoV={previous_boundary_iov}")
837 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
838 previous_boundary_iov = boundary_iov
839 previous_boundary_run_list = run_list
841 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
842 boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists
843 B2DEBUG(26, f
"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}")
845 success = self.execute_over_boundaries(boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
848 self.send_final_state(self.FAILED)
853 gaps = self.find_iov_gaps()
855 B2WARNING(
"There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.")
857 with open(f
"{self.algorithm.name}_iov_gaps.json",
"w")
as f:
861 if self.any_failed_iov():
862 self.send_final_state(self.FAILED)
864 self.send_final_state(self.COMPLETED)
866 def execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration):
868 Take the previously found boundaries and the run lists they correspond to and actually perform the
869 Algorithm execution. This is assumed to be for a single experiment.
872 remaining_boundary_iovs = sorted(list(boundary_iovs_to_run_lists.keys())[:])
877 current_boundary_iov =
None
882 last_successful_payloads =
None
883 last_successful_result =
None
885 last_successful_runs = []
887 last_successful_iov =
None
891 if not last_successful_result:
894 if not remaining_boundary_iovs:
896 B2ERROR(
"No boundaries found for the current experiment's run list. Failing the strategy.")
899 B2INFO(
"This appears to be the first attempted execution of the experiment.")
901 current_boundary_iov = remaining_boundary_iovs.pop(0)
902 current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
904 if not remaining_boundary_iovs:
905 current_iov = IoV(*lowest_exprun, *highest_exprun)
907 current_iov = IoV(*lowest_exprun, current_boundary_iov.exp_high, current_boundary_iov.run_high)
911 if not remaining_boundary_iovs:
913 B2ERROR(
"Not enough data found for the current experiment's run list. Failing the strategy.")
916 B2INFO(
"There wasn't enough data previously. Merging with the runs from the next boundary.")
918 next_boundary_iov = remaining_boundary_iovs.pop(0)
919 current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
920 next_boundary_iov.exp_high, next_boundary_iov.run_high)
921 current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
923 if not remaining_boundary_iovs:
924 current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
926 current_iov = IoV(current_iov.exp_low, current_iov.run_low,
927 current_boundary_iov.exp_high, current_boundary_iov.run_high)
929 self.execute_runs(current_runs, iteration, current_iov)
932 if self.alg_success():
934 B2INFO(
"Found a success. Will save the payloads for later.")
936 last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
937 last_successful_result = self.machine.result
938 last_successful_runs = current_runs[:]
939 last_successful_iov = current_iov
942 current_boundary_iov =
None
944 self.machine.complete()
946 elif self.machine.result.result == AlgResult.not_enough_data.value:
947 B2INFO(
"Not Enough Data result.")
949 self.machine.complete()
952 B2ERROR(
"Hit a failure or some kind of result we can't continue from. Failing out...")
960 if not remaining_boundary_iovs:
962 B2INFO(
"Finished this experiment's boundaries. "
963 f
"Committing remaining payloads from {last_successful_result.iov}")
964 self.machine.algorithm.algorithm.commit(last_successful_payloads)
965 self.results.append(last_successful_result)
966 self.send_result(last_successful_result)
970 current_boundary_iov = remaining_boundary_iovs.pop(0)
971 current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
973 if not remaining_boundary_iovs:
974 current_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, *highest_exprun)
976 current_iov = current_boundary_iov
981 if not remaining_boundary_iovs:
982 B2INFO(
"We have no remaining runs to increase the amount of data. "
983 "Instead we will merge with the previous successful runs.")
985 new_current_runs = last_successful_runs[:]
986 new_current_runs.extend(current_runs)
987 current_runs = new_current_runs[:]
988 current_iov = IoV(last_successful_iov.exp_low, last_successful_iov.run_low,
989 current_iov.exp_high, current_iov.run_high)
991 last_successful_payloads = []
992 last_successful_result =
None
993 last_successful_runs = []
994 last_successful_iov =
None
997 B2INFO(
"Since there wasn't enough data previously, we will merge with the runs from the next boundary.")
999 next_boundary_iov = remaining_boundary_iovs.pop(0)
1000 current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
1001 next_boundary_iov.exp_high, next_boundary_iov.run_high)
1003 current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
1005 if not remaining_boundary_iovs:
1006 current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
1008 current_iov = IoV(current_iov.exp_low, current_iov.run_low,
1009 current_boundary_iov.exp_high, current_boundary_iov.run_high)
1011 self.execute_runs(current_runs, iteration, current_iov)
1014 if self.alg_success():
1016 B2INFO(
"Found a success.")
1017 if last_successful_result:
1018 B2INFO(
"Can now commit the previous success.")
1019 self.machine.algorithm.algorithm.commit(last_successful_payloads)
1020 self.results.append(last_successful_result)
1021 self.send_result(last_successful_result)
1023 last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
1024 last_successful_result = self.machine.result
1025 last_successful_runs = current_runs[:]
1026 last_successful_iov = current_iov
1029 current_boundary_iov =
None
1031 self.machine.complete()
1033 elif self.machine.result.result == AlgResult.not_enough_data.value:
1034 B2INFO(
"Not Enough Data result.")
1036 self.machine.complete()
1039 B2ERROR(
"Hit a failure or some other result we can't continue from. Failing out...")
1043 def execute_runs(self, runs, iteration, iov):
1045 if not self.first_execution:
1046 self.machine.setup_algorithm()
1048 self.first_execution =
False
1050 B2INFO(f
"Executing and applying {iov} to the payloads.")
1051 self.machine.execute_runs(runs=runs, iteration=iteration, apply_iov=iov)
1052 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
def alg_success(self):
    """
    Report whether the most recent machine result counts as a success.

    Returns:
        bool: True if ``self.machine.result.result`` equals ``AlgResult.ok.value``
        or ``AlgResult.iterate.value``, otherwise False.
    """
    result_code = self.machine.result.result
    return (result_code == AlgResult.ok.value) or (result_code == AlgResult.iterate.value)
class StrategyError(Exception):
    """
    Basic Exception for this type of class.

    Raised by the strategies' ``run`` methods when ``is_valid()`` reports that
    the strategy was not set up correctly.
    """