698 def run(self, iov, iteration, queue):
700 Runs the algorithm machine over the collected data and fills the results.
703 raise StrategyError(
"This AlgorithmStrategy was not set up correctly!")
705 B2INFO(f
"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
710 machine_params[
"output_dir"] = self.
output_dir
716 self.
machine.setup_algorithm(iteration=iteration)
718 B2INFO(f
"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
720 all_runs_collected = runs_from_vector(self.
algorithm.algorithm.getRunListFromAllData())
723 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
725 runs_to_execute = all_runs_collected[:]
729 B2INFO(f
"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
730 runs_to_execute.difference_update(set(self.
ignored_runs))
732 runs_to_execute = sorted(runs_to_execute)
737 runs_to_execute = split_runs_by_exp(runs_to_execute)
742 if "iov_coverage" in self.
algorithm.params:
743 B2INFO(f
"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
744 iov_coverage = self.
algorithm.params[
"iov_coverage"]
746 payload_boundaries =
None
747 if "payload_boundaries" in self.
algorithm.params:
748 B2INFO(f
"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.")
749 payload_boundaries = self.
algorithm.params[
"payload_boundaries"]
751 number_of_experiments = len(runs_to_execute)
752 B2INFO(f
"We are iterating over {number_of_experiments} experiments.")
755 for i_exp, run_list
in enumerate(runs_to_execute, start=1):
756 B2DEBUG(26, f
"Run List for this experiment={run_list}")
757 current_experiment = run_list[0].exp
758 B2INFO(f
"Executing over data from experiment {current_experiment}")
765 lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
767 lowest_exprun = run_list[0]
770 lowest_exprun = ExpRun(current_experiment, 0)
773 if iov_coverage
and i_exp == number_of_experiments:
774 highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
777 elif i_exp < number_of_experiments:
778 highest_exprun = ExpRun(current_experiment, -1)
781 highest_exprun = run_list[-1]
784 vec_run_list = vector_from_runs(run_list)
785 if payload_boundaries
is None:
787 B2INFO(
"Attempting to find payload boundaries.")
788 vec_boundaries = self.
algorithm.algorithm.findPayloadBoundaries(vec_run_list)
791 if vec_boundaries.empty():
792 B2ERROR(
"No boundaries found but we are in a strategy that requires them! Failing...")
796 vec_boundaries = runs_from_vector(vec_boundaries)
799 B2INFO(f
"Using as payload boundaries {payload_boundaries}.")
800 vec_boundaries = [ExpRun(exp, run)
for exp, run
in payload_boundaries]
805 run_boundaries = sorted([er
for er
in vec_boundaries
if er.exp == current_experiment])
809 first_exprun = ExpRun(current_experiment, 0)
810 if first_exprun
not in run_boundaries:
811 B2WARNING(f
"No boundary found at ({current_experiment}, 0), adding it.")
812 run_boundaries[0:0] = [first_exprun]
813 B2INFO(f
"Found {len(run_boundaries)} boundaries for this experiment. "
814 "Checking if we have some data for all boundary IoVs...")
817 boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list)
818 B2DEBUG(26, f
"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}")
820 boundary_iovs_to_run_lists = {key: value
for key, value
in boundary_iovs_to_run_lists.items()
if value}
821 B2DEBUG(26, f
"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}")
823 new_boundary_iovs_to_run_lists = {}
824 previous_boundary_iov =
None
825 previous_boundary_run_list =
None
826 for boundary_iov, run_list
in boundary_iovs_to_run_lists.items():
827 if not previous_boundary_iov:
828 previous_boundary_iov = boundary_iov
829 previous_boundary_run_list = run_list
832 if previous_boundary_iov.run_high != (boundary_iov.run_low-1):
833 B2WARNING(
"Gap in boundary IoVs found before execution! "
834 "Will correct it by extending the previous boundary up to the next one.")
835 B2INFO(f
"Original boundary IoV={previous_boundary_iov}")
836 previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low,
837 previous_boundary_iov.exp_high, boundary_iov.run_low-1)
838 B2INFO(f
"New boundary IoV={previous_boundary_iov}")
839 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
840 previous_boundary_iov = boundary_iov
841 previous_boundary_run_list = run_list
843 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
844 boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists
845 B2DEBUG(26, f
"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}")
857 B2WARNING(
"There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.")
859 with open(f
"{self.algorithm.name}_iov_gaps.json",
"w")
as f:
870 Take the previously found boundaries and the run lists they correspond to and actually perform the
871 Algorithm execution. This is assumed to be for a single experiment.
874 remaining_boundary_iovs = sorted(list(boundary_iovs_to_run_lists.keys())[:])
879 current_boundary_iov =
None
884 last_successful_payloads =
None
885 last_successful_result =
None
887 last_successful_runs = []
889 last_successful_iov =
None
893 if not last_successful_result:
896 if not remaining_boundary_iovs:
898 B2ERROR(
"No boundaries found for the current experiment's run list. Failing the strategy.")
901 B2INFO(
"This appears to be the first attempted execution of the experiment.")
903 current_boundary_iov = remaining_boundary_iovs.pop(0)
904 current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
906 if not remaining_boundary_iovs:
907 current_iov = IoV(*lowest_exprun, *highest_exprun)
909 current_iov = IoV(*lowest_exprun, current_boundary_iov.exp_high, current_boundary_iov.run_high)
913 if not remaining_boundary_iovs:
915 B2ERROR(
"Not enough data found for the current experiment's run list. Failing the strategy.")
918 B2INFO(
"There wasn't enough data previously. Merging with the runs from the next boundary.")
920 next_boundary_iov = remaining_boundary_iovs.pop(0)
921 current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
922 next_boundary_iov.exp_high, next_boundary_iov.run_high)
923 current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
925 if not remaining_boundary_iovs:
926 current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
928 current_iov = IoV(current_iov.exp_low, current_iov.run_low,
929 current_boundary_iov.exp_high, current_boundary_iov.run_high)
936 B2INFO(
"Found a success. Will save the payloads for later.")
938 last_successful_payloads = self.
machine.algorithm.algorithm.getPayloadValues()
939 last_successful_result = self.
machine.result
940 last_successful_runs = current_runs[:]
941 last_successful_iov = current_iov
944 current_boundary_iov =
None
948 elif self.
machine.result.result == AlgResult.not_enough_data.value:
949 B2INFO(
"Not Enough Data result.")
954 B2ERROR(
"Hit a failure or some kind of result we can't continue from. Failing out...")
962 if not remaining_boundary_iovs:
964 B2INFO(
"Finished this experiment's boundaries. "
965 f
"Committing remaining payloads from {last_successful_result.iov}")
966 self.
machine.algorithm.algorithm.commit(last_successful_payloads)
967 self.
results.append(last_successful_result)
972 current_boundary_iov = remaining_boundary_iovs.pop(0)
973 current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
975 if not remaining_boundary_iovs:
976 current_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, *highest_exprun)
978 current_iov = current_boundary_iov
983 if not remaining_boundary_iovs:
984 B2INFO(
"We have no remaining runs to increase the amount of data. "
985 "Instead we will merge with the previous successful runs.")
987 new_current_runs = last_successful_runs[:]
988 new_current_runs.extend(current_runs)
989 current_runs = new_current_runs[:]
990 current_iov = IoV(last_successful_iov.exp_low, last_successful_iov.run_low,
991 current_iov.exp_high, current_iov.run_high)
993 last_successful_payloads = []
994 last_successful_result =
None
995 last_successful_runs = []
996 last_successful_iov =
None
999 B2INFO(
"Since there wasn't enough data previously, we will merge with the runs from the next boundary.")
1001 next_boundary_iov = remaining_boundary_iovs.pop(0)
1002 current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
1003 next_boundary_iov.exp_high, next_boundary_iov.run_high)
1005 current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
1007 if not remaining_boundary_iovs:
1008 current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
1010 current_iov = IoV(current_iov.exp_low, current_iov.run_low,
1011 current_boundary_iov.exp_high, current_boundary_iov.run_high)
1013 self.
execute_runs(current_runs, iteration, current_iov)
1018 B2INFO(
"Found a success.")
1019 if last_successful_result:
1020 B2INFO(
"Can now commit the previous success.")
1021 self.
machine.algorithm.algorithm.commit(last_successful_payloads)
1022 self.
results.append(last_successful_result)
1025 last_successful_payloads = self.
machine.algorithm.algorithm.getPayloadValues()
1026 last_successful_result = self.
machine.result
1027 last_successful_runs = current_runs[:]
1028 last_successful_iov = current_iov
1031 current_boundary_iov =
None
1035 elif self.
machine.result.result == AlgResult.not_enough_data.value:
1036 B2INFO(
"Not Enough Data result.")
1041 B2ERROR(
"Hit a failure or some other result we can't continue from. Failing out...")