103 def run(self, iov, iteration):
104 """
105 """
106 from caf.strategies import AlgorithmStrategy
107 B2INFO(f"SequentialAlgorithmsRunner begun for Calibration {self.name}.")
108
109 strategies = []
110 for algorithm in self.algorithms:
111
112 strategy = algorithm.strategy(algorithm)
113
114 strategy_params = {}
115 strategy_params["database_chain"] = self.database_chain
116 strategy_params["dependent_databases"] = self.dependent_databases
117 strategy_params["output_dir"] = self.output_dir
118 strategy_params["output_database_dir"] = self.output_database_dir
119 strategy_params["input_files"] = self.input_files
120 strategy_params["ignored_runs"] = self.ignored_runs
121 strategy.setup_from_dict(strategy_params)
122 strategies.append(strategy)
123
124
125 ctx = multiprocessing.get_context("fork")
126 for strategy in strategies:
127 queue = multiprocessing.SimpleQueue()
128 child = ctx.Process(target=SeqAlgorithmsRunner._run_strategy,
129 args=(strategy, iov, iteration, queue))
130
131 self.results[strategy.algorithm.name] = []
132 B2INFO(f"Starting subprocess of AlgorithmStrategy for {strategy.algorithm.name}.")
133 B2INFO("Logging will be diverted into algorithm output.")
134 child.start()
135 final_state = None
136 final_loop = False
137
138 B2INFO(f"Collecting results for {strategy.algorithm.name}.")
139 while True:
140
141 while not queue.empty():
142 output = queue.get()
143 B2DEBUG(29, f"Result from queue was {output}")
144 if output["type"] == "result":
145 self.results[strategy.algorithm.name].append(output["value"])
146 elif output["type"] == "final_state":
147 final_state = output["value"]
148 else:
149 raise RunnerError(f"Unknown result output: {output}")
150
151
152 if child.is_alive():
153 time.sleep(5)
154 continue
155 else:
156
157 if final_state:
158
159 if child.exitcode == 0:
160 B2INFO(f"AlgorithmStrategy subprocess for {strategy.algorithm.name} exited")
161 break
162 else:
163 raise RunnerError(f"Error during subprocess of AlgorithmStrategy for {strategy.algorithm.name}")
164
165 else:
166
167 if not final_loop:
168 final_loop = True
169 continue
170 else:
171 raise RunnerError(f"Strategy for {strategy.algorithm.name} "
172 "exited subprocess but without a final state!")
173
174
175 if final_state == AlgorithmStrategy.FAILED:
176 B2ERROR(f"AlgorithmStrategy for {strategy.algorithm.name} failed. We will not proceed with any more algorithms")
177 self.final_state = self.FAILED
178 break
179
180 B2DEBUG(29, f"Finished subprocess of AlgorithmStrategy for {strategy.algorithm.name}")
181
182 if self.final_state != self.FAILED:
183 B2INFO(f"SequentialAlgorithmsRunner finished for Calibration {self.name}")
184 self.final_state = self.COMPLETED
185