12 from basf2
import B2DEBUG, B2ERROR, B2INFO, B2WARNING
13 from caf.utils
import AlgResult
14 from caf.utils
import B2INFO_MULTILINE
15 from caf.utils
import runs_overlapping_iov, runs_from_vector
16 from caf.utils
import iov_from_runs, split_runs_by_exp, vector_from_runs
17 from caf.utils
import find_gaps_in_iov_list, grouper, find_run_lists_from_boundaries
18 from caf.utils
import IoV, ExpRun
19 from caf.state_machines
import AlgorithmMachine
21 from abc
import ABC, abstractmethod
27 Base class for Algorithm strategies. These do the actual execution of a single
28 algorithm on collected data. Each strategy may be quite different in terms of how fast it may be,
29 how database payloads are passed between executions, and whether or not final payloads have an IoV
30 that is independent of the actual runs used to calculate them.
33 algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run
35 This base class defines the basic attributes and methods that will be automatically used by the selected AlgorithmRunner.
36 When defining a derived class you are free to use these attributes or to implement as much functionality as you want.
38 If you define your derived class with an __init__ method, then you should first call the base class
39 `AlgorithmStrategy.__init__()` method via super() e.g.
41 >>> def __init__(self):
42 >>> super().__init__()
44 The most important method to implement is :py:meth:`AlgorithmStrategy.run` which will take an algorithm and execute it
45 in the required way defined by the options you have selected/attributes set.
49 required_attrs = [
"algorithm",
51 "dependent_databases",
53 "output_database_dir",
59 required_true_attrs = [
"algorithm",
61 "output_database_dir",
66 allowed_granularities = [
"run",
"all"]
69 FINISHED_RESULTS =
"DONE"
72 COMPLETED =
"COMPLETED"
101 def run(self, iov, iteration, queue):
103 Abstract method that needs to be implemented. It will be called to actually execute the
110 params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name.
112 for attribute_name, value
in params.items():
113 setattr(self, attribute_name, value)
118 bool: Whether or not this strategy has been set up correctly with all its necessary attributes.
120 B2INFO(
"Checking validity of current AlgorithmStrategy setup.")
123 if not hasattr(self, attribute_name):
124 B2ERROR(f
"AlgorithmStrategy attribute {attribute_name} doesn't exist.")
128 if not getattr(self, attribute_name):
129 B2ERROR(f
"AlgorithmStrategy attribute {attribute_name} returned False.")
135 Finds and prints the current gaps between the IoVs of the strategy results. Basically these are the IoVs
136 not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps
137 within the same experiment are found.
142 iov_gaps = find_gaps_in_iov_list(sorted([result.iov
for result
in self.
resultsresults]))
144 gap_msg = [
"Found gaps between IoVs of algorithm results (regardless of result)."]
145 gap_msg.append(
"You may have requested these gaps deliberately by not passing in data containing these runs.")
146 gap_msg.append(
"This may not be a problem, but you will not have payoads defined for these IoVs")
147 gap_msg.append(
"unless you edit the final database.txt yourself.")
148 B2INFO_MULTILINE(gap_msg)
150 B2INFO(f
"{iov} not covered by any execution of the algorithm.")
156 bool: If any result in the current results list has a failed algorithm code we return True
159 for result
in self.
resultsresults:
160 if result.result == AlgResult.failure.value
or result.result == AlgResult.not_enough_data.value:
161 failed_results.append(result)
163 B2WARNING(
"Failed results found.")
164 for result
in failed_results:
165 if result.result == AlgResult.failure.value:
166 B2ERROR(f
"c_Failure returned for {result.iov}.")
167 elif result.result == AlgResult.not_enough_data.value:
168 B2WARNING(f
"c_NotEnoughData returned for {result.iov}.")
def send_result(self, result):
    """
    Pass one result back through the strategy's queue.

    The result is wrapped in a dictionary whose "type" entry is "result"
    and whose "value" entry is the result object itself.

    Parameters:
        result: The result object to put on ``self.queue``.
    """
    message = {"type": "result", "value": result}
    self.queue.put(message)
def send_final_state(self, state):
    """
    Pass the final state of the strategy back through its queue.

    The state is wrapped in a dictionary whose "type" entry is "final_state"
    and whose "value" entry is the state itself.

    Parameters:
        state: The final state value to put on ``self.queue``.
    """
    message = {"type": "final_state", "value": state}
    self.queue.put(message)
181 """The fastest and simplest Algorithm strategy. Runs the algorithm only once over all of the input
182 data or only the data corresponding to the requested IoV. The payload IoV is then set to the same as the one
185 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
186 a CalibrationAlgorithm C++ class directly.
190 usable_params = {
"apply_iov": IoV}
200 def run(self, iov, iteration, queue):
202 Runs the algorithm machine over the collected data and fills the results.
205 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
208 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
211 machine_params[
"database_chain"] = self.
database_chaindatabase_chain
213 machine_params[
"output_dir"] = self.
output_diroutput_dir
215 machine_params[
"input_files"] = self.
input_filesinput_files
216 machine_params[
"ignored_runs"] = self.
ignored_runsignored_runs
219 B2INFO(f
"Starting AlgorithmMachine of {self.algorithm.name}.")
220 self.
machinemachine.setup_algorithm(iteration=iteration)
222 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
224 all_runs_collected = set(runs_from_vector(self.
algorithmalgorithm.algorithm.getRunListFromAllData()))
227 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
229 runs_to_execute = all_runs_collected
233 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
234 runs_to_execute.difference_update(set(self.
ignored_runsignored_runs))
236 runs_to_execute = sorted(runs_to_execute)
238 if "apply_iov" in self.
algorithmalgorithm.params:
239 apply_iov = self.
algorithmalgorithm.params[
"apply_iov"]
240 self.
machinemachine.execute_runs(runs=runs_to_execute, iteration=iteration, apply_iov=apply_iov)
241 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
247 if (self.
machinemachine.result.result == AlgResult.ok.value)
or (self.
machinemachine.result.result == AlgResult.iterate.value):
251 self.
machinemachine.algorithm.algorithm.commit()
261 Algorithm strategy to do run-by-run calibration of collected data.
262 Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
263 If the algorithm returns 'not enough data' on the current run set, it won't commit the payloads, but instead adds
264 the next run's data and tries again.
266 Once an execution on a set of runs returns 'iterate' or 'ok' we move onto the next runs (if any are left)
267 and start the same procedure again. Committing of payloads to the outputdb only happens once we're sure that there
268 is enough data in the remaining runs to get a full execution. If there isn't enough data remaining, the last runs
269 are merged with the previous successful execution's runs and a final execution is performed on all remaining runs.
271 Additionally this strategy will automatically make sure that IoV gaps in your input data are covered by a payload.
272 This means that there shouldn't be any IoVs that don't get a new payload by the end of running an iteration.
274 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
275 a CalibrationAlgorithm C++ class directly.
280 "has_experiment_settings": bool,
286 allowed_granularities = [
"run"]
295 if "step_size" not in self.
algorithmalgorithm.params:
296 self.
algorithmalgorithm.params[
"step_size"] = 1
301 Apply experiment-dependent settings.
302 This is the default version, which does not do anything.
303 If necessary, it should be reimplemented by derived classes.
307 def run(self, iov, iteration, queue):
309 Runs the algorithm machine over the collected data and fills the results.
311 if not self.is_valid():
312 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
314 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
317 machine_params[
"database_chain"] = self.database_chain
318 machine_params[
"dependent_databases"] = self.dependent_databases
319 machine_params[
"output_dir"] = self.output_dir
320 machine_params[
"output_database_dir"] = self.output_database_dir
321 machine_params[
"input_files"] = self.input_files
322 machine_params[
"ignored_runs"] = self.ignored_runs
325 self.machine.setup_algorithm(iteration=iteration)
327 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
329 all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
332 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
334 runs_to_execute = all_runs_collected[:]
337 if self.ignored_runs:
338 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
339 runs_to_execute.difference_update(set(self.ignored_runs))
341 runs_to_execute = sorted(runs_to_execute)
346 runs_to_execute = split_runs_by_exp(runs_to_execute)
351 if "iov_coverage" in self.algorithm.params:
352 B2INFO(f
"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
353 iov_coverage = self.algorithm.params[
"iov_coverage"]
355 number_of_experiments = len(runs_to_execute)
357 for i_exp, run_list
in enumerate(runs_to_execute, start=1):
360 if "has_experiment_settings" in self.algorithm.params:
361 if self.algorithm.params[
"has_experiment_settings"]:
362 self.apply_experiment_settings(self.machine.algorithm.algorithm, run_list[0].exp)
369 lowest_exprun = ExpRun(run_list[0].exp, 0)
370 highest_exprun = ExpRun(run_list[-1].exp, -1)
373 lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
if iov_coverage
else run_list[0]
374 if i_exp == number_of_experiments:
375 highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
if iov_coverage
else run_list[-1]
377 self.execute_over_run_list(iteration, run_list, lowest_exprun, highest_exprun)
380 gaps = self.find_iov_gaps()
382 with open(f
"{self.algorithm.name}_iov_gaps.json",
"w")
as f:
386 if self.any_failed_iov():
387 self.send_final_state(self.FAILED)
389 self.send_final_state(self.COMPLETED)
391 def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun):
393 remaining_runs = run_list[:]
399 last_successful_payloads =
None
400 last_successful_result =
None
403 for expruns
in grouper(self.algorithm.params[
"step_size"], run_list):
405 if not self.first_execution:
406 self.machine.setup_algorithm()
408 self.first_execution =
False
411 current_runs.extend(expruns)
413 remaining_runs = [run
for run
in remaining_runs
if run
not in current_runs]
416 if not last_successful_result:
417 B2INFO(
"Detected that this will be the first payload of this experiment.")
421 apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1)
424 B2INFO(
"Detected that this will be the only payload of the experiment.")
425 apply_iov = IoV(*lowest_exprun, *highest_exprun)
428 if not remaining_runs:
429 B2INFO(
"Detected that there are no more runs to execute in this experiment after this next execution.")
430 apply_iov = IoV(*current_runs[0], *highest_exprun)
433 B2INFO(
"Detected that there are more runs to execute in this experiment after this next execution.")
434 apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1)
436 B2INFO(f
"Executing and applying {apply_iov} to the payloads.")
437 self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
438 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
441 if (self.machine.result.result == AlgResult.ok.value)
or (self.machine.result.result == AlgResult.iterate.value):
442 self.machine.complete()
445 if last_successful_payloads
and last_successful_result:
446 B2INFO(
"Saving this execution's payloads to be committed later.")
448 new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
449 new_successful_result = self.machine.result
450 B2INFO(
"We just succeded in execution of the Algorithm."
451 f
" Will now commit payloads from the previous success for {last_successful_result.iov}.")
452 self.machine.algorithm.algorithm.commit(last_successful_payloads)
453 self.results.append(last_successful_result)
454 self.send_result(last_successful_result)
457 last_successful_payloads = new_successful_payloads
458 last_successful_result = new_successful_result
461 B2INFO(
"We have no more runs to process. "
462 f
"Will now commit the most recent payloads for {new_successful_result.iov}.")
463 self.machine.algorithm.algorithm.commit(new_successful_payloads)
464 self.results.append(new_successful_result)
465 self.send_result(new_successful_result)
471 B2INFO(f
"Saving the most recent payloads for {self.machine.result.iov} to be committed later.")
473 last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
474 last_successful_result = self.machine.result
477 B2INFO(
"We just succeeded in execution of the Algorithm."
478 " No runs left to be processed, so we are committing results of this execution.")
479 self.machine.algorithm.algorithm.commit()
480 self.results.append(self.machine.result)
481 self.send_result(self.machine.result)
484 previous_runs = current_runs[:]
487 elif (self.machine.result.result == AlgResult.not_enough_data.value):
488 B2INFO(f
"There wasn't enough data in {self.machine.result.iov}.")
490 B2INFO(
"Some runs remain to be processed. "
491 f
"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.")
492 elif not remaining_runs
and not last_successful_result:
493 B2ERROR(
"There aren't any more runs remaining to merge with, and we never had a previous success."
494 " There wasn't enough data in the full input data requested.")
495 self.results.append(self.machine.result)
496 self.send_result(self.machine.result)
499 elif not remaining_runs
and last_successful_result:
500 B2INFO(
"There aren't any more runs remaining to merge with. But we had a previous success"
501 ", so we'll merge with the previous IoV.")
502 final_runs = current_runs[:]
503 current_runs = previous_runs
504 current_runs.extend(final_runs)
506 elif self.machine.result.result == AlgResult.failure.value:
507 B2ERROR(f
"{self.algorithm.name} returned failure exit code.")
508 self.results.append(self.machine.result)
509 self.send_result(self.machine.result)
515 self.machine.setup_algorithm()
516 apply_iov = IoV(last_successful_result.iov.exp_low,
517 last_successful_result.iov.run_low,
519 B2INFO(f
"Executing on {apply_iov}.")
520 self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
521 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
522 if (self.machine.result.result == AlgResult.ok.value)
or (
523 self.machine.result.result == AlgResult.iterate.value):
524 self.machine.complete()
526 self.machine.algorithm.algorithm.commit()
528 self.results.append(self.machine.result)
529 self.send_result(self.machine.result)
532 self.results.append(self.machine.result)
533 self.send_result(self.machine.result)
540 Algorithm strategy to do run-by-run calibration of collected data.
541 Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
543 This strategy differs from `SequentialRunByRun` in that it *will not merge run data* if the algorithm returns
544 'not enough data' on the current run.
546 Once an execution on a run returns *any* result 'iterate', 'ok', 'not_enough_data', or 'failure', we move onto the
547 next run (if any are left).
548 Committing of payloads to the outputdb only happens for 'iterate' or 'ok' return codes.
550 .. important:: Unlike most other strategies, this one won't immediately fail and return if a run returns a 'failure' exit
552 The failure will prevent iteration/successful completion of the CAF though.
554 .. warning:: Since this strategy doesn't try to merge data from runs, if *any* run in your input data doesn't contain
555 enough data to complete the algorithm successfully, you won't be able to get a successful calibration.
556 The CAF then won't allow you to iterate this calibration, or pass the constants onward to another calibration.
557 However, you will still have the database created that covers all the successful runs.
559 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
560 a CalibrationAlgorithm C++ class directly.
563 allowed_granularities = [
"run"]
576 def run(self, iov, iteration, queue):
578 Runs the algorithm machine over the collected data and fills the results.
582 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
585 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
588 machine_params[
"database_chain"] = self.
database_chaindatabase_chain
590 machine_params[
"output_dir"] = self.
output_diroutput_dir
592 machine_params[
"input_files"] = self.
input_filesinput_files
593 machine_params[
"ignored_runs"] = self.
ignored_runsignored_runs
596 B2INFO(f
"Starting AlgorithmMachine of {self.algorithm.name}.")
597 self.
machinemachine.setup_algorithm(iteration=iteration)
599 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
601 all_runs_collected = set(runs_from_vector(self.
algorithmalgorithm.algorithm.getRunListFromAllData()))
604 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
606 runs_to_execute = all_runs_collected
610 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
611 runs_to_execute.difference_update(set(self.
ignored_runsignored_runs))
613 runs_to_execute = sorted(runs_to_execute)
616 first_execution =
True
617 for exprun
in runs_to_execute:
618 if not first_execution:
619 self.
machinemachine.setup_algorithm()
620 current_runs = exprun
621 apply_iov = iov_from_runs([current_runs])
622 B2INFO(f
"Executing on IoV = {apply_iov}.")
623 self.
machinemachine.execute_runs(runs=[current_runs], iteration=iteration, apply_iov=apply_iov)
624 first_execution =
False
625 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
627 if (self.
machinemachine.result.result == AlgResult.ok.value)
or (self.
machinemachine.result.result == AlgResult.iterate.value):
629 B2INFO(f
"Committing payloads for {iov_from_runs([current_runs])}.")
630 self.
machinemachine.algorithm.algorithm.commit()
635 elif (self.
machinemachine.result.result == AlgResult.not_enough_data.value):
636 B2INFO(f
"There wasn't enough data in the IoV {iov_from_runs([current_runs])}.")
640 elif self.
machinemachine.result.result == AlgResult.failure.value:
641 B2ERROR(f
"Failure exit code in the IoV {iov_from_runs([current_runs])}.")
649 with open(f
"{self.algorithm.name}_iov_gaps.json",
"w")
as f:
657 Algorithm strategy to first calculate run boundaries where execution should be attempted.
658 Runs the algorithm over the input data contained within the requested IoV of the boundaries,
659 starting with the first boundary data only.
660 If the algorithm returns 'not enough data' on the current boundary IoV, it won't commit the payloads,
661 but instead adds the next boundary's data and tries again. Basically the same logic as `SequentialRunByRun`
662 but using run boundaries instead of runs directly.
663 Notice that boundaries cannot span multiple experiments.
665 By default the algorithm will get the payload boundaries directly from the algorithm that needs to
666 have implemented the function ``isBoundaryRequired``. If the desired boundaries are already known it
667 is possible to pass them directly setting the algorithm parameter ``payload_boundaries`` and avoid
668 the need to define the ``isBoundaryRequired`` function.
670 ``payload_boundaries`` is a list ``[(exp1, run1), (exp2, run2), ...]``. A boundary at the beginning of each
671 experiment will be added if not already present. An empty list will thus produce a single payload for each
672 experiment. A ``payload_boundaries`` set to ``None`` is equivalent to not passing it and restores the default
673 behaviour where the boundaries are computed in the ``isBoundaryRequired`` function of the algorithm.
679 "payload_boundaries": []
683 allowed_granularities = [
"run"]
694 def run(self, iov, iteration, queue):
696 Runs the algorithm machine over the collected data and fills the results.
699 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
701 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
704 machine_params[
"database_chain"] = self.
database_chaindatabase_chain
706 machine_params[
"output_dir"] = self.
output_diroutput_dir
708 machine_params[
"input_files"] = self.
input_filesinput_files
709 machine_params[
"ignored_runs"] = self.
ignored_runsignored_runs
712 self.
machinemachine.setup_algorithm(iteration=iteration)
714 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
716 all_runs_collected = runs_from_vector(self.
algorithmalgorithm.algorithm.getRunListFromAllData())
719 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
721 runs_to_execute = all_runs_collected[:]
725 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
726 runs_to_execute.difference_update(set(self.
ignored_runsignored_runs))
728 runs_to_execute = sorted(runs_to_execute)
733 runs_to_execute = split_runs_by_exp(runs_to_execute)
738 if "iov_coverage" in self.
algorithmalgorithm.params:
739 B2INFO(f
"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
740 iov_coverage = self.
algorithmalgorithm.params[
"iov_coverage"]
742 payload_boundaries =
None
743 if "payload_boundaries" in self.
algorithmalgorithm.params:
744 B2INFO(f
"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.")
745 payload_boundaries = self.
algorithmalgorithm.params[
"payload_boundaries"]
747 number_of_experiments = len(runs_to_execute)
748 B2INFO(f
"We are iterating over {number_of_experiments} experiments.")
751 for i_exp, run_list
in enumerate(runs_to_execute, start=1):
752 B2DEBUG(26, f
"Run List for this experiment={run_list}")
753 current_experiment = run_list[0].exp
754 B2INFO(f
"Executing over data from experiment {current_experiment}")
761 lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
763 lowest_exprun = run_list[0]
766 lowest_exprun = ExpRun(current_experiment, 0)
769 if iov_coverage
and i_exp == number_of_experiments:
770 highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
773 elif i_exp < number_of_experiments:
774 highest_exprun = ExpRun(current_experiment, -1)
777 highest_exprun = run_list[-1]
780 vec_run_list = vector_from_runs(run_list)
781 if payload_boundaries
is None:
783 B2INFO(
"Attempting to find payload boundaries.")
784 vec_boundaries = self.
algorithmalgorithm.algorithm.findPayloadBoundaries(vec_run_list)
787 if vec_boundaries.empty():
788 B2ERROR(
"No boundaries found but we are in a strategy that requires them! Failing...")
792 vec_boundaries = runs_from_vector(vec_boundaries)
795 B2INFO(f
"Using as payload boundaries {payload_boundaries}.")
796 vec_boundaries = [ExpRun(exp, run)
for exp, run
in payload_boundaries]
801 run_boundaries = sorted([er
for er
in vec_boundaries
if er.exp == current_experiment])
805 first_exprun = ExpRun(current_experiment, 0)
806 if first_exprun
not in run_boundaries:
807 B2WARNING(f
"No boundary found at ({current_experiment}, 0), adding it.")
808 run_boundaries[0:0] = [first_exprun]
809 B2INFO((f
"Found {len(run_boundaries)} boundaries for this experiment. "
810 "Checking if we have some data for all boundary IoVs..."))
813 boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list)
814 B2DEBUG(26, f
"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}")
816 boundary_iovs_to_run_lists = {key: value
for key, value
in boundary_iovs_to_run_lists.items()
if value}
817 B2DEBUG(26, f
"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}")
819 new_boundary_iovs_to_run_lists = {}
820 previous_boundary_iov =
None
821 previous_boundary_run_list =
None
822 for boundary_iov, run_list
in boundary_iovs_to_run_lists.items():
823 if not previous_boundary_iov:
824 previous_boundary_iov = boundary_iov
825 previous_boundary_run_list = run_list
828 if previous_boundary_iov.run_high != (boundary_iov.run_low-1):
829 B2WARNING(
"Gap in boundary IoVs found before execution! "
830 "Will correct it by extending the previous boundary up to the next one.")
831 B2INFO(f
"Original boundary IoV={previous_boundary_iov}")
832 previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low,
833 previous_boundary_iov.exp_high, boundary_iov.run_low-1)
834 B2INFO(f
"New boundary IoV={previous_boundary_iov}")
835 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
836 previous_boundary_iov = boundary_iov
837 previous_boundary_run_list = run_list
839 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
840 boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists
841 B2DEBUG(26, f
"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}")
843 success = self.
execute_over_boundariesexecute_over_boundaries(boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
853 B2WARNING(
"There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.")
855 with open(f
"{self.algorithm.name}_iov_gaps.json",
"w")
as f:
866 Take the previously found boundaries and the run lists they correspond to and actually perform the
867 Algorithm execution. This is assumed to be for a single experiment.
870 remaining_boundary_iovs = sorted(list(boundary_iovs_to_run_lists.keys())[:])
875 current_boundary_iov =
None
880 last_successful_payloads =
None
881 last_successful_result =
None
883 last_successful_runs = []
885 last_successful_iov =
None
889 if not last_successful_result:
892 if not remaining_boundary_iovs:
894 B2ERROR(
"No boundaries found for the current experiment's run list. Failing the strategy.")
897 B2INFO(
"This appears to be the first attempted execution of the experiment.")
899 current_boundary_iov = remaining_boundary_iovs.pop(0)
900 current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
902 if not remaining_boundary_iovs:
903 current_iov = IoV(*lowest_exprun, *highest_exprun)
905 current_iov = IoV(*lowest_exprun, current_boundary_iov.exp_high, current_boundary_iov.run_high)
909 if not remaining_boundary_iovs:
911 B2ERROR(
"Not enough data found for the current experiment's run list. Failing the strategy.")
914 B2INFO(
"There wasn't enough data previously. Merging with the runs from the next boundary.")
916 next_boundary_iov = remaining_boundary_iovs.pop(0)
917 current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
918 next_boundary_iov.exp_high, next_boundary_iov.run_high)
919 current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
921 if not remaining_boundary_iovs:
922 current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
924 current_iov = IoV(current_iov.exp_low, current_iov.run_low,
925 current_boundary_iov.exp_high, current_boundary_iov.run_high)
927 self.
execute_runsexecute_runs(current_runs, iteration, current_iov)
932 B2INFO(
"Found a success. Will save the payloads for later.")
934 last_successful_payloads = self.
machinemachine.algorithm.algorithm.getPayloadValues()
935 last_successful_result = self.
machinemachine.result
936 last_successful_runs = current_runs[:]
937 last_successful_iov = current_iov
940 current_boundary_iov =
None
944 elif self.
machinemachine.result.result == AlgResult.not_enough_data.value:
945 B2INFO(
"Not Enough Data result.")
950 B2ERROR(
"Hit a failure or some kind of result we can't continue from. Failing out...")
958 if not remaining_boundary_iovs:
960 B2INFO(
"Finished this experiment's boundaries. "
961 f
"Committing remaining payloads from {last_successful_result.iov}")
962 self.
machinemachine.algorithm.algorithm.commit(last_successful_payloads)
963 self.
resultsresults.append(last_successful_result)
964 self.
send_resultsend_result(last_successful_result)
968 current_boundary_iov = remaining_boundary_iovs.pop(0)
969 current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
971 if not remaining_boundary_iovs:
972 current_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, *highest_exprun)
974 current_iov = current_boundary_iov
979 if not remaining_boundary_iovs:
980 B2INFO(
"We have no remaining runs to increase the amount of data. "
981 "Instead we will merge with the previous successful runs.")
983 new_current_runs = last_successful_runs[:]
984 new_current_runs.extend(current_runs)
985 current_runs = new_current_runs[:]
986 current_iov = IoV(last_successful_iov.exp_low, last_successful_iov.run_low,
987 current_iov.exp_high, current_iov.run_high)
989 last_successful_payloads = []
990 last_successful_result =
None
991 last_successful_runs = []
992 last_successful_iov =
None
995 B2INFO(
"Since there wasn't enough data previously, we will merge with the runs from the next boundary.")
997 next_boundary_iov = remaining_boundary_iovs.pop(0)
998 current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
999 next_boundary_iov.exp_high, next_boundary_iov.run_high)
1001 current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
1003 if not remaining_boundary_iovs:
1004 current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
1006 current_iov = IoV(current_iov.exp_low, current_iov.run_low,
1007 current_boundary_iov.exp_high, current_boundary_iov.run_high)
1009 self.
execute_runsexecute_runs(current_runs, iteration, current_iov)
1014 B2INFO(
"Found a success.")
1015 if last_successful_result:
1016 B2INFO(
"Can now commit the previous success.")
1017 self.
machinemachine.algorithm.algorithm.commit(last_successful_payloads)
1018 self.
resultsresults.append(last_successful_result)
1019 self.
send_resultsend_result(last_successful_result)
1021 last_successful_payloads = self.
machinemachine.algorithm.algorithm.getPayloadValues()
1022 last_successful_result = self.
machinemachine.result
1023 last_successful_runs = current_runs[:]
1024 last_successful_iov = current_iov
1027 current_boundary_iov =
None
1029 self.
machinemachine.complete()
1031 elif self.
machinemachine.result.result == AlgResult.not_enough_data.value:
1032 B2INFO(
"Not Enough Data result.")
1034 self.
machinemachine.complete()
1037 B2ERROR(
"Hit a failure or some other result we can't continue from. Failing out...")
1041 def execute_runs(self, runs, iteration, iov):
1044 self.
machinemachine.setup_algorithm()
1048 B2INFO(f
"Executing and applying {iov} to the payloads.")
1049 self.
machinemachine.execute_runs(runs=runs, iteration=iteration, apply_iov=iov)
1050 B2INFO(f
"Finished execution with result code {self.machine.result.result}.")
def alg_success(self):
    """
    Report whether the most recent machine execution counts as a success.

    Returns:
        bool: True when ``self.machine.result.result`` equals the ``ok`` or the
        ``iterate`` value of ``AlgResult``, otherwise False.
    """
    outcome = self.machine.result.result
    if outcome == AlgResult.ok.value:
        return True
    return outcome == AlgResult.iterate.value
1058 Basic Exception for this type of class.
list required_true_attrs
Attributes that must have a value that returns True when tested by :py:meth:is_valid.
output_database_dir
The output database directory for the localdb that the algorithm will commit to.
string COMPLETED
Completed state.
ignored_runs
Runs that will not be included in ANY execution of the algorithm.
def send_result(self, result)
input_files
Collector output files, will contain all files returned by the output patterns.
string FAILED
Failed state.
results
The list of results objects which will be sent out before the end.
def run(self, iov, iteration, queue)
def send_final_state(self, state)
list required_attrs
Required attributes that must exist before the strategy can run properly.
algorithm
Algorithm() class that we're running.
database_chain
User defined database chain i.e.
def setup_from_dict(self, params)
def __init__(self, algorithm)
dependent_databases
CAF created local databases from previous calibrations that this calibration/algorithm depends on.
output_dir
The algorithm output directory which is mostly used to store the stdout file.
queue
The multiprocessing Queue we use to pass back results one at a time.
def execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
def run(self, iov, iteration, queue)
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
def execute_runs(self, runs, iteration, iov)
def __init__(self, algorithm)
def apply_experiment_settings(self, algorithm, experiment)
def run(self, iov, iteration, queue)
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
def __init__(self, algorithm)
def run(self, iov, iteration, queue)
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
def __init__(self, algorithm)
def run(self, iov, iteration, queue)
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
def __init__(self, algorithm)