def run(self, iov, iteration, queue):
    """
    Runs the algorithm machine over the collected data and fills the results.

    The collected runs are optionally restricted to ``iov``, stripped of
    ``self.ignored_runs``, sorted and grouped per experiment. For every
    experiment the payload boundaries are determined (either taken from the
    ``payload_boundaries`` algorithm parameter or found by the algorithm
    itself), gaps between neighbouring boundary IoVs are closed, and the
    result is handed to ``self.execute_over_boundaries``. The final state
    (``self.FAILED`` or ``self.COMPLETED``) is reported through
    ``self.send_final_state``.

    Parameters:
        iov: If truthy, only the collected runs overlapping this IoV are
            executed; otherwise all collected runs are used.
        iteration: Forwarded to ``self.machine.setup_algorithm`` and to
            ``self.execute_over_boundaries``.
        queue: Stored as ``self.queue`` before anything is executed
            (presumably the channel used to report results -- confirm
            against ``send_final_state``).

    Raises:
        StrategyError: If ``self.is_valid()`` reports an incomplete setup.
    """
    if not self.is_valid():
        raise StrategyError("This AlgorithmStrategy was not set up correctly!")
    self.queue = queue
    B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")

    # Hand the machine every parameter it needs before moving through its states.
    machine_params = {}
    machine_params["database_chain"] = self.database_chain
    machine_params["dependent_databases"] = self.dependent_databases
    machine_params["output_dir"] = self.output_dir
    machine_params["output_database_dir"] = self.output_database_dir
    machine_params["input_files"] = self.input_files
    machine_params["ignored_runs"] = self.ignored_runs
    self.machine.setup_from_dict(machine_params)

    self.machine.setup_algorithm(iteration=iteration)

    B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
    all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())

    # If we were given a specific IoV we only execute over the runs inside it.
    if iov:
        runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
    else:
        runs_to_execute = all_runs_collected[:]

    if self.ignored_runs:
        B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
        # Build a set explicitly: the slice copy taken when no IoV is given is
        # a plain sequence and has no difference_update() method.
        runs_to_execute = set(runs_to_execute) - set(self.ignored_runs)

    # Sets are unordered, so go back to a sorted list before grouping.
    runs_to_execute = sorted(runs_to_execute)

    # From here on runs_to_execute holds one run list per experiment.
    runs_to_execute = split_runs_by_exp(runs_to_execute)

    iov_coverage = None
    if "iov_coverage" in self.algorithm.params:
        B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
        iov_coverage = self.algorithm.params["iov_coverage"]

    payload_boundaries = None
    if "payload_boundaries" in self.algorithm.params:
        B2INFO(f"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.")
        payload_boundaries = self.algorithm.params["payload_boundaries"]

    number_of_experiments = len(runs_to_execute)
    B2INFO(f"We are iterating over {number_of_experiments} experiments.")

    for i_exp, run_list in enumerate(runs_to_execute, start=1):
        B2DEBUG(26, f"Run List for this experiment={run_list}")
        current_experiment = run_list[0].exp
        B2INFO(f"Executing over data from experiment {current_experiment}")

        # Lower edge handed to execute_over_boundaries: for the first experiment
        # use the requested coverage (if any) or the first run with data; every
        # later experiment starts from run 0.
        if i_exp == 1:
            if iov_coverage:
                lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
            else:
                lowest_exprun = run_list[0]
        else:
            lowest_exprun = ExpRun(current_experiment, 0)

        # Upper edge: the last experiment ends at the requested coverage (if any)
        # or at the last run with data; earlier experiments use run -1
        # (presumably the open-ended "end of experiment" marker -- confirm).
        if iov_coverage and i_exp == number_of_experiments:
            highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
        elif i_exp < number_of_experiments:
            highest_exprun = ExpRun(current_experiment, -1)
        else:
            highest_exprun = run_list[-1]

        vec_run_list = vector_from_runs(run_list)
        if payload_boundaries is None:
            # No user-defined boundaries, so ask the algorithm to find them.
            B2INFO("Attempting to find payload boundaries.")
            vec_boundaries = self.algorithm.algorithm.findPayloadBoundaries(vec_run_list)

            if vec_boundaries.empty():
                B2ERROR("No boundaries found but we are in a strategy that requires them! Failing...")
                # Report the failure and stop; the break skips the loop's else clause.
                self.send_final_state(self.FAILED)
                break
            vec_boundaries = runs_from_vector(vec_boundaries)
        else:
            # Use the boundaries supplied through the algorithm parameters.
            B2INFO(f"Using as payload boundaries {payload_boundaries}.")
            vec_boundaries = [ExpRun(exp, run) for exp, run in payload_boundaries]

        # Only the boundaries belonging to the current experiment are relevant.
        run_boundaries = sorted([er for er in vec_boundaries if er.exp == current_experiment])

        # Make sure the experiment is covered from its very first run onwards.
        first_exprun = ExpRun(current_experiment, 0)
        if first_exprun not in run_boundaries:
            B2WARNING(f"No boundary found at ({current_experiment}, 0), adding it.")
            run_boundaries.insert(0, first_exprun)
        B2INFO(f"Found {len(run_boundaries)} boundaries for this experiment. "
               "Checking if we have some data for all boundary IoVs...")

        boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list)
        B2DEBUG(26, f"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}")
        # Drop the boundary IoVs that contain no runs at all.
        boundary_iovs_to_run_lists = {key: value for key, value in boundary_iovs_to_run_lists.items() if value}
        B2DEBUG(26, f"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}")

        # Dropping empty boundary IoVs may have opened gaps. Walk the remaining
        # ones pairwise and stretch each IoV up to the start of the next one.
        new_boundary_iovs_to_run_lists = {}
        previous_boundary_iov = None
        previous_boundary_run_list = None
        # A distinct name is used for the inner run list so that the outer loop
        # variable 'run_list' is not overwritten.
        for boundary_iov, boundary_run_list in boundary_iovs_to_run_lists.items():
            if not previous_boundary_iov:
                # First boundary IoV: nothing to compare against yet.
                previous_boundary_iov = boundary_iov
                previous_boundary_run_list = boundary_run_list
                continue

            if previous_boundary_iov.run_high != (boundary_iov.run_low - 1):
                B2WARNING("Gap in boundary IoVs found before execution! "
                          "Will correct it by extending the previous boundary up to the next one.")
                B2INFO(f"Original boundary IoV={previous_boundary_iov}")
                previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low,
                                            previous_boundary_iov.exp_high, boundary_iov.run_low - 1)
                B2INFO(f"New boundary IoV={previous_boundary_iov}")
            # Store the (possibly extended) previous IoV, then advance.
            new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
            previous_boundary_iov = boundary_iov
            previous_boundary_run_list = boundary_run_list

        # Store the final boundary IoV. The guard avoids inserting a bogus
        # {None: None} entry when no boundary IoV contained any data.
        if previous_boundary_iov is not None:
            new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
        boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists
        B2DEBUG(26, f"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}")

        success = self.execute_over_boundaries(boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
        if not success:
            # Report the failure and stop; the break skips the loop's else clause.
            self.send_final_state(self.FAILED)
            break
    else:
        # Only reached when no experiment broke out of the loop above.
        gaps = self.find_iov_gaps()
        if gaps:
            B2WARNING("There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.")
        # Always dump the gaps (even if empty) to a file for logging.
        with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
            json.dump(gaps, f)

        # Individual IoVs may still have failed even though every experiment ran.
        if self.any_failed_iov():
            self.send_final_state(self.FAILED)
        else:
            self.send_final_state(self.COMPLETED)