Belle II Software development
strategies.py
1#!/usr/bin/env python3
2
3# disable doxygen check for this file
4# @cond
5
6
13
14from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
15from caf.utils import AlgResult
16from caf.utils import B2INFO_MULTILINE
17from caf.utils import runs_overlapping_iov, runs_from_vector
18from caf.utils import iov_from_runs, split_runs_by_exp, vector_from_runs
19from caf.utils import find_gaps_in_iov_list, grouper, find_run_lists_from_boundaries
20from caf.utils import IoV, ExpRun
21from caf.state_machines import AlgorithmMachine
22
23from abc import ABC, abstractmethod
24import json
25
26
class AlgorithmStrategy(ABC):
    """
    Base class for Algorithm strategies. These do the actual execution of a single
    algorithm on collected data. Each strategy may be quite different in terms of how fast it may be,
    how database payloads are passed between executions, and whether or not final payloads have an IoV
    that is independent to the actual runs used to calculate them.

    Parameters:
        algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run

    This base class defines the basic attributes and methods that will be automatically used by the selected AlgorithmRunner.
    When defining a derived class you are free to use these attributes or to implement as much functionality as you want.

    If you define your derived class with an ``__init__`` method, then you should first call the base class
    ``AlgorithmStrategy.__init__()`` method via ``super()``, passing the algorithm on, e.g. ::

        def __init__(self, algorithm):
            super().__init__(algorithm)

    The most important method to implement is :py:meth:`AlgorithmStrategy.run` which will take an algorithm and execute it
    in the required way defined by the options you have selected/attributes set.
    """

    #: Attributes that must exist on the strategy for :py:meth:`is_valid` to pass.
    #: Values that test False (e.g. ``""`` or ``[]``) are acceptable for these.
    required_attrs = ["algorithm",
                      "database_chain",
                      "dependent_databases",
                      "output_dir",
                      "output_database_dir",
                      "input_files",
                      "ignored_runs"
                      ]

    #: Attributes that must additionally test True for :py:meth:`is_valid` to pass.
    required_true_attrs = ["algorithm",
                           "output_dir",
                           "output_database_dir",
                           "input_files"
                           ]

    #: Granularities supported by this strategy (presumably checked by the caller -- confirm).
    allowed_granularities = ["run", "all"]

    #: NOTE(review): not referenced in this class; presumably used by the runner to mark the end of results.
    FINISHED_RESULTS = "DONE"

    #: Final state passed to :py:meth:`send_final_state` when a strategy finishes successfully.
    COMPLETED = "COMPLETED"

    #: Final state passed to :py:meth:`send_final_state` when a strategy fails.
    FAILED = "FAILED"

    def __init__(self, algorithm):
        """
        Parameters:
            algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run
        """
        #: The algorithm this strategy executes
        self.algorithm = algorithm
        #: Input data files; must be non-empty for :py:meth:`is_valid`
        self.input_files = []
        #: Output directory; must be non-empty for :py:meth:`is_valid`
        self.output_dir = ""
        #: Output database directory; must be non-empty for :py:meth:`is_valid`
        self.output_database_dir = ""
        #: Database chain handed on to the execution machinery by subclasses
        self.database_chain = []
        #: Dependent databases handed on to the execution machinery by subclasses
        self.dependent_databases = []
        #: Runs to drop from the collected run list before executing
        self.ignored_runs = []
        #: Results of the executions performed so far
        self.results = []
        #: Queue-like object used by send_result/send_final_state; subclasses assign it in run()
        self.queue = None

    @abstractmethod
    def run(self, iov, iteration, queue):
        """
        Abstract method that needs to be implemented. It will be called to actually execute the
        algorithm.

        Parameters:
            iov: The IoV to restrict execution to (subclasses treat a falsy value as "all collected runs").
            iteration (int): The current iteration number.
            queue: Object with a ``put`` method used to send results and the final state back.
        """

    def setup_from_dict(self, params):
        """
        Parameters:
            params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name.
        """
        for attribute_name, value in params.items():
            setattr(self, attribute_name, value)

    def is_valid(self):
        """
        Returns:
            bool: Whether or not this strategy has been set up correctly with all its necessary attributes.
        """
        B2INFO("Checking validity of current AlgorithmStrategy setup.")
        # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
        for attribute_name in self.required_attrs:
            if not hasattr(self, attribute_name):
                B2ERROR(f"AlgorithmStrategy attribute {attribute_name} doesn't exist.")
                return False
        # Check if any attributes that need actual values haven't been set or were empty
        for attribute_name in self.required_true_attrs:
            if not getattr(self, attribute_name):
                B2ERROR(f"AlgorithmStrategy attribute {attribute_name} returned False.")
                return False
        return True

    def find_iov_gaps(self):
        """
        Finds and prints the current gaps between the IoVs of the strategy results. Basically these are the IoVs
        not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps
        within the same experiment are found.

        Returns:
            iov_gaps(list[IoV])
        """
        iov_gaps = find_gaps_in_iov_list(sorted([result.iov for result in self.results]))
        if iov_gaps:
            gap_msg = ["Found gaps between IoVs of algorithm results (regardless of result)."]
            gap_msg.append("You may have requested these gaps deliberately by not passing in data containing these runs.")
            gap_msg.append("This may not be a problem, but you will not have payloads defined for these IoVs")
            gap_msg.append("unless you edit the final database.txt yourself.")
            B2INFO_MULTILINE(gap_msg)
            for iov in iov_gaps:
                B2INFO(f"{iov} not covered by any execution of the algorithm.")
        return iov_gaps

    def any_failed_iov(self):
        """
        Log every result whose code is ``failure`` (as an error) or ``not_enough_data`` (as a warning).

        Returns:
            bool: True if any result in the current results list has one of those codes, otherwise False.
        """
        failure_codes = (AlgResult.failure.value, AlgResult.not_enough_data.value)
        failed_results = [result for result in self.results if result.result in failure_codes]
        if not failed_results:
            return False
        B2WARNING("Failed results found.")
        for result in failed_results:
            if result.result == AlgResult.failure.value:
                B2ERROR(f"c_Failure returned for {result.iov}.")
            elif result.result == AlgResult.not_enough_data.value:
                B2WARNING(f"c_NotEnoughData returned for {result.iov}.")
        return True

    def send_result(self, result):
        """
        Put ``{"type": "result", "value": result}`` onto :py:attr:`queue`.

        Parameters:
            result: The execution result to report.
        """
        self.queue.put({"type": "result", "value": result})

    def send_final_state(self, state):
        """
        Put ``{"type": "final_state", "value": state}`` onto :py:attr:`queue`.

        Parameters:
            state (str): Usually :py:attr:`COMPLETED` or :py:attr:`FAILED`.
        """
        self.queue.put({"type": "final_state", "value": state})
180
181
class SingleIOV(AlgorithmStrategy):
    """
    The fastest and simplest Algorithm strategy. It executes the algorithm exactly once, either over all of the
    input data or only over the data belonging to the requested IoV. The payload IoV matches the executed one,
    unless the algorithm params provide an explicit ``apply_iov``.

    Execution is driven by a `caf.state_machines.AlgorithmMachine` rather than by operating on
    a CalibrationAlgorithm C++ class directly.
    """

    #: Algorithm params read by this strategy, mapped to their expected types.
    usable_params = {"apply_iov": IoV}

    def __init__(self, algorithm):
        """
        Parameters:
            algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run
        """
        super().__init__(algorithm)
        #: :py:class:`caf.state_machines.AlgorithmMachine` that sets up and executes the algorithm in :py:meth:`run`.
        self.machine = AlgorithmMachine(self.algorithm)

    def run(self, iov, iteration, queue):
        """
        Execute the algorithm once over every selected run, then report the result and final state.

        Parameters:
            iov: If truthy, only collected runs overlapping this IoV are executed; otherwise all collected runs.
            iteration (int): The current iteration number, forwarded to the machine.
            queue: Object with a ``put`` method that receives the result and final state.

        Raises:
            StrategyError: If the strategy attributes were not set up correctly.
        """
        if not self.is_valid():
            raise StrategyError("This AlgorithmStrategy was not set up correctly!")
        self.queue = queue

        B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
        # Hand the strategy's configuration over to the machine in one go.
        self.machine.setup_from_dict({
            "database_chain": self.database_chain,
            "dependent_databases": self.dependent_databases,
            "output_dir": self.output_dir,
            "output_database_dir": self.output_database_dir,
            "input_files": self.input_files,
            "ignored_runs": self.ignored_runs,
        })
        B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
        self.machine.setup_algorithm(iteration=iteration)
        # From here on the algorithm writes its own logging output.
        B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")

        collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
        selected = runs_overlapping_iov(iov, collected) if iov else collected

        if self.ignored_runs:
            B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
            selected.difference_update(set(self.ignored_runs))

        # The selection is unordered, so hand the machine a sorted list.
        ordered_runs = sorted(selected)
        params = self.algorithm.params
        forced_iov = params["apply_iov"] if "apply_iov" in params else None
        self.machine.execute_runs(runs=ordered_runs, iteration=iteration, apply_iov=forced_iov)

        outcome = self.machine.result
        B2INFO(f"Finished execution with result code {outcome.result}.")
        self.send_result(outcome)

        succeeded = outcome.result in (AlgResult.ok.value, AlgResult.iterate.value)
        if not succeeded:
            # Not enough data, or the algorithm failed outright.
            self.machine.fail()
            self.send_final_state(self.FAILED)
            return

        self.machine.complete()
        self.machine.algorithm.algorithm.commit()
        self.send_final_state(self.COMPLETED)
259
260
class SequentialRunByRun(AlgorithmStrategy):
    """
    Algorithm strategy to do run-by-run calibration of collected data.
    Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
    If the algorithm returns 'not enough data' on the current run set, it won't commit the payloads, but instead adds
    the next run's data and tries again.

    Once an execution on a set of runs returns 'iterate' or 'ok' we move onto the next runs (if any are left)
    and start the same procedure again. Committing of payloads to the outputdb only happens once we're sure that there
    is enough data in the remaining runs to get a full execution. If there isn't enough data remaining, the last runs
    are merged with the previous successful execution's runs and a final execution is performed on all remaining runs.

    Additionally this strategy will automatically make sure that IoV gaps in your input data are covered by a payload.
    This means that there shouldn't be any IoVs that don't get a new payload by the end of running an iteration.

    This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
    a CalibrationAlgorithm C++ class directly.
    """

    #: Algorithm params read by this strategy, mapped to their expected types.
    usable_params = {
        "has_experiment_settings": bool,
        "iov_coverage": IoV,
        "step_size": int
    }

    #: Only run granularity is supported by this strategy.
    allowed_granularities = ["run"]

    def __init__(self, algorithm):
        """
        Parameters:
            algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run

        Sets ``step_size`` to 1 in ``algorithm.params`` if it was not already given.
        """
        super().__init__(algorithm)
        #: :py:class:`caf.state_machines.AlgorithmMachine` that sets up and executes the algorithm.
        self.machine = AlgorithmMachine(self.algorithm)
        if "step_size" not in self.algorithm.params:
            self.algorithm.params["step_size"] = 1
        #: run() calls setup_algorithm(iteration=...) once up front, so execute_over_run_list()
        #: must skip re-setting up the machine for the very first execution.
        self.first_execution = True

    def apply_experiment_settings(self, algorithm, experiment):
        """
        Apply experiment-dependent settings.
        This is the default version, which does not do anything.
        If necessary, it should be reimplemented by derived classes.
        """
        return

    def run(self, iov, iteration, queue):
        """
        Runs the algorithm machine over the collected data and fills the results.

        Parameters:
            iov: If truthy, only collected runs overlapping this IoV are executed; otherwise all collected runs.
            iteration (int): The current iteration number, forwarded to the machine.
            queue: Object with a ``put`` method that receives the results and final state.

        Raises:
            StrategyError: If the strategy attributes were not set up correctly.
        """
        if not self.is_valid():
            raise StrategyError("This AlgorithmStrategy was not set up correctly!")
        self.queue = queue
        B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
        # Now add all the necessary parameters for a strategy to run
        machine_params = {
            "database_chain": self.database_chain,
            "dependent_databases": self.dependent_databases,
            "output_dir": self.output_dir,
            "output_database_dir": self.output_database_dir,
            "input_files": self.input_files,
            "ignored_runs": self.ignored_runs,
        }
        self.machine.setup_from_dict(machine_params)
        # Start moving through machine states
        self.machine.setup_algorithm(iteration=iteration)
        # After this point, the logging is in the stdout of the algorithm
        B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
        all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
        # If we were given a specific IoV to calibrate we just execute over runs in that IoV
        if iov:
            runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
        else:
            runs_to_execute = all_runs_collected[:]

        # Remove the ignored runs from our run list to execute.
        # runs_to_execute is a plain list copy when no IoV was given, and a list has no
        # difference_update(), so filter with a comprehension that works for a set or a list.
        if self.ignored_runs:
            B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
            ignored = set(self.ignored_runs)
            runs_to_execute = [exprun for exprun in runs_to_execute if exprun not in ignored]
        # The selection may be an unordered set, so go back to a sorted list
        runs_to_execute = sorted(runs_to_execute)

        # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
        # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
        # separately and prevent IoVs from crossing the boundary.
        runs_to_execute = split_runs_by_exp(runs_to_execute)

        # Now iterate through the experiments, executing runs in blocks of 'step_size'. We DO NOT allow a payload IoV to
        # extend over multiple experiments, only multiple runs
        iov_coverage = None
        if "iov_coverage" in self.algorithm.params:
            B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
            iov_coverage = self.algorithm.params["iov_coverage"]

        number_of_experiments = len(runs_to_execute)
        # Iterate over experiment run lists
        for i_exp, run_list in enumerate(runs_to_execute, start=1):

            # Apply experiment-dependent settings.
            if "has_experiment_settings" in self.algorithm.params:
                if self.algorithm.params["has_experiment_settings"]:
                    self.apply_experiment_settings(self.machine.algorithm.algorithm, run_list[0].exp)

            # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
            # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
            # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
            # This is only true for the lowest and highest experiments in our input data.
            # For the experiments in between, the IoV spans the whole experiment from run 0 to run -1
            # (presumably -1 is the open upper end in the IoV convention -- confirm) to avoid gaps at the boundaries.
            lowest_exprun = ExpRun(run_list[0].exp, 0)
            highest_exprun = ExpRun(run_list[-1].exp, -1)

            if i_exp == 1:
                lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low) if iov_coverage else run_list[0]
            if i_exp == number_of_experiments:
                highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high) if iov_coverage else run_list[-1]

            self.execute_over_run_list(iteration, run_list, lowest_exprun, highest_exprun)

        # Print any knowable gaps between result IoVs, if any are found there is a problem.
        gaps = self.find_iov_gaps()
        # Dump them to a file for logging
        with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
            json.dump(gaps, f)

        # If any results weren't successes we fail
        if self.any_failed_iov():
            self.send_final_state(self.FAILED)
        else:
            self.send_final_state(self.COMPLETED)

    def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun):
        """
        Execute the algorithm over the runs of one experiment in chunks of ``step_size`` runs.

        A chunk that returns 'not enough data' is merged with the following chunk. Payloads of a successful
        execution are held back until the next execution also succeeds. This way, a final chunk without
        enough data can instead be merged into the previous successful one and re-executed.

        Parameters:
            iteration (int): The current iteration number, forwarded to ``execute_runs``.
            run_list (list[ExpRun]): Sorted runs of a single experiment.
            lowest_exprun (ExpRun): Lower edge of the IoV given to the first payload.
            highest_exprun (ExpRun): Upper edge of the IoV given to the last payload.
        """
        # The runs (data) we have left to execute from this run list
        remaining_runs = run_list[:]
        # The runs of the previous successful execution
        previous_runs = []
        # The current runs we are executing
        current_runs = []
        # The last successful payloads and result, held back until we know they won't need merging
        last_successful_payloads = None
        last_successful_result = None

        # Iterate over ExpRuns within an experiment in chunks of 'step_size'
        for expruns in grouper(self.algorithm.params["step_size"], run_list):
            # Already set up in run() the first time, so we shouldn't do it again
            if not self.first_execution:
                self.machine.setup_algorithm()
            else:
                self.first_execution = False

            # Add on the next step of runs
            current_runs.extend(expruns)
            # Remove them from our remaining runs
            remaining_runs = [run for run in remaining_runs if run not in current_runs]

            # Is this the first payload of the experiment
            if not last_successful_result:
                B2INFO("Detected that this will be the first payload of this experiment.")
                # If this is the first payload but we have other data, we need the IoV to cover from the
                # lowest IoV extent requested up to the ExpRun right before the next run in the remaining runs list.
                if remaining_runs:
                    apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1)
                # If this is the first payload but there isn't more data, we set the IoV to cover the full range
                else:
                    B2INFO("Detected that this will be the only payload of the experiment.")
                    apply_iov = IoV(*lowest_exprun, *highest_exprun)
            # If there were previous successes
            else:
                if not remaining_runs:
                    B2INFO("Detected that there are no more runs to execute in this experiment after this next execution.")
                    apply_iov = IoV(*current_runs[0], *highest_exprun)
                # Otherwise, it's just a normal IoV in the middle.
                else:
                    B2INFO("Detected that there are more runs to execute in this experiment after this next execution.")
                    apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1)

            B2INFO(f"Executing and applying {apply_iov} to the payloads.")
            self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
            B2INFO(f"Finished execution with result code {self.machine.result.result}.")

            # Does this count as a successful execution?
            if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
                self.machine.complete()
                # If we've succeeded but we have a previous success we can commit the previous payloads
                # since we have new ones ready.
                # NOTE(review): this also requires the saved payloads to be truthy; if getPayloadValues()
                # could return an empty container, the previous success would be treated as absent -- confirm.
                if last_successful_payloads and last_successful_result:
                    B2INFO("Saving this execution's payloads to be committed later.")
                    # Save the payloads and result
                    new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                    new_successful_result = self.machine.result
                    B2INFO("We just succeeded in execution of the Algorithm."
                           f" Will now commit payloads from the previous success for {last_successful_result.iov}.")
                    self.machine.algorithm.algorithm.commit(last_successful_payloads)
                    self.results.append(last_successful_result)
                    self.send_result(last_successful_result)
                    # If there are remaining runs we need to have the current payloads ready to commit after the next execution
                    if remaining_runs:
                        last_successful_payloads = new_successful_payloads
                        last_successful_result = new_successful_result
                    # If there's not more runs to process we should also commit the new ones
                    else:
                        B2INFO("We have no more runs to process. "
                               f"Will now commit the most recent payloads for {new_successful_result.iov}.")
                        self.machine.algorithm.algorithm.commit(new_successful_payloads)
                        self.results.append(new_successful_result)
                        self.send_result(new_successful_result)
                        break
                # if there's no previous success this must be the first run executed
                else:
                    # Need to save payloads for later if we have a success but runs remain
                    if remaining_runs:
                        B2INFO(f"Saving the most recent payloads for {self.machine.result.iov} to be committed later.")
                        # Save the payloads and result
                        last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                        last_successful_result = self.machine.result
                    # Need to commit and exit if we have a success and no remaining data
                    else:
                        B2INFO("We just succeeded in execution of the Algorithm."
                               " No runs left to be processed, so we are committing results of this execution.")
                        self.machine.algorithm.algorithm.commit()
                        self.results.append(self.machine.result)
                        self.send_result(self.machine.result)
                        break

                previous_runs = current_runs[:]
                current_runs = []
            # If it wasn't successful, was it due to lack of data in the runs?
            elif (self.machine.result.result == AlgResult.not_enough_data.value):
                B2INFO(f"There wasn't enough data in {self.machine.result.iov}.")
                if remaining_runs:
                    B2INFO("Some runs remain to be processed. "
                           f"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.")
                elif not remaining_runs and not last_successful_result:
                    B2ERROR("There aren't any more runs remaining to merge with, and we never had a previous success."
                            " There wasn't enough data in the full input data requested.")
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                    self.machine.fail()
                    break
                elif not remaining_runs and last_successful_result:
                    B2INFO("There aren't any more runs remaining to merge with. But we had a previous success"
                           ", so we'll merge with the previous IoV.")
                    # The uncommitted previous success is replaced by one execution over the merged runs,
                    # performed in the for-else below once the loop runs out of chunks.
                    final_runs = current_runs[:]
                    current_runs = previous_runs
                    current_runs.extend(final_runs)
                    self.machine.fail()
            elif self.machine.result.result == AlgResult.failure.value:
                B2ERROR(f"{self.algorithm.name} returned failure exit code.")
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                self.machine.fail()
                break
        else:
            # Reached only when the loop finished without a break: execute once more over any runs still
            # pending, e.g. a final data-starved chunk merged with the previous successful execution's runs.
            if current_runs:
                self.machine.setup_algorithm()
                apply_iov = IoV(last_successful_result.iov.exp_low,
                                last_successful_result.iov.run_low,
                                *highest_exprun)
                B2INFO(f"Executing on {apply_iov}.")
                self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
                B2INFO(f"Finished execution with result code {self.machine.result.result}.")
                if (self.machine.result.result == AlgResult.ok.value) or (
                        self.machine.result.result == AlgResult.iterate.value):
                    self.machine.complete()
                    # Commit all the payloads and send out the results
                    self.machine.algorithm.algorithm.commit()
                    # Save the result
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                else:
                    # Save the result
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                    # But failed
                    self.machine.fail()
538
539
class SimpleRunByRun(AlgorithmStrategy):
    """
    Algorithm strategy to do run-by-run calibration of collected data.
    Every selected run is executed on its own, in ascending order, within the requested IoV.

    Unlike `SequentialRunByRun`, this strategy *never merges run data* when the algorithm returns
    'not enough data' for a run.

    Whatever a run returns ('iterate', 'ok', 'not_enough_data' or 'failure'), the strategy moves on to the
    next run (if any are left). Payloads are committed to the outputdb only for 'iterate' or 'ok'.

    .. important:: Unlike most other strategies, this one won't immediately fail and return if a run returns a 'failure'
       exit code. The failure will prevent iteration/successful completion of the CAF though.

    .. warning:: Because run data is never merged, a single run without enough data for the algorithm is enough to
       prevent a successful calibration. The CAF then won't allow you to iterate this calibration, or pass the
       constants onward to another calibration. The database covering all the successful runs is still created.

    Execution is driven by a `caf.state_machines.AlgorithmMachine` rather than by operating on
    a CalibrationAlgorithm C++ class directly.
    """

    #: Only run granularity is supported by this strategy.
    allowed_granularities = ["run"]

    #: This strategy reads no algorithm params.
    usable_params = {}

    def __init__(self, algorithm):
        """
        Parameters:
            algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run
        """
        super().__init__(algorithm)
        #: :py:class:`caf.state_machines.AlgorithmMachine` that sets up and executes the algorithm in :py:meth:`run`.
        self.machine = AlgorithmMachine(self.algorithm)

    def run(self, iov, iteration, queue):
        """
        Execute the algorithm separately for every selected run, reporting each result, and finish
        by logging IoV gaps, writing them to ``<algorithm name>_iov_gaps.json`` and sending ``COMPLETED``.

        Parameters:
            iov: If truthy, only collected runs overlapping this IoV are executed; otherwise all collected runs.
            iteration (int): The current iteration number, forwarded to the machine.
            queue: Object with a ``put`` method that receives the results and final state.

        Raises:
            StrategyError: If the strategy attributes were not set up correctly.
        """

        if not self.is_valid():
            raise StrategyError("This AlgorithmStrategy was not set up correctly!")
        self.queue = queue

        B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
        # Hand the strategy's configuration over to the machine in one go.
        self.machine.setup_from_dict({
            "database_chain": self.database_chain,
            "dependent_databases": self.dependent_databases,
            "output_dir": self.output_dir,
            "output_database_dir": self.output_database_dir,
            "input_files": self.input_files,
            "ignored_runs": self.ignored_runs,
        })
        B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
        self.machine.setup_algorithm(iteration=iteration)
        # From here on the algorithm writes its own logging output.
        B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")

        collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
        selected = runs_overlapping_iov(iov, collected) if iov else collected

        if self.ignored_runs:
            B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
            selected.difference_update(set(self.ignored_runs))

        success_codes = (AlgResult.ok.value, AlgResult.iterate.value)
        # The selection is unordered, so walk it in sorted order.
        for position, exprun in enumerate(sorted(selected)):
            # The very first execution reuses the set-up performed above.
            if position > 0:
                self.machine.setup_algorithm()
            run_iov = iov_from_runs([exprun])
            B2INFO(f"Executing on IoV = {run_iov}.")
            self.machine.execute_runs(runs=[exprun], iteration=iteration, apply_iov=run_iov)

            outcome = self.machine.result
            code = outcome.result
            B2INFO(f"Finished execution with result code {code}.")

            if code in success_codes:
                B2INFO(f"Committing payloads for {run_iov}.")
                self.machine.algorithm.algorithm.commit()
                self.results.append(outcome)
                self.send_result(outcome)
                self.machine.complete()
                continue

            if code == AlgResult.not_enough_data.value:
                B2INFO(f"There wasn't enough data in the IoV {run_iov}.")
            elif code == AlgResult.failure.value:
                B2ERROR(f"Failure exit code in the IoV {run_iov}.")
            else:
                continue
            # Unsuccessful runs are still recorded and reported, but never committed.
            self.results.append(outcome)
            self.send_result(outcome)
            self.machine.fail()

        # Report any knowable gaps between result IoVs and keep a copy on disk.
        gaps = self.find_iov_gaps()
        with open(f"{self.algorithm.name}_iov_gaps.json", "w") as gaps_file:
            json.dump(gaps, gaps_file)

        self.send_final_state(self.COMPLETED)
655
656
657class SequentialBoundaries(AlgorithmStrategy):
658 """
659 Algorithm strategy to first calculate run boundaries where execution should be attempted.
660 Runs the algorithm over the input data contained within the requested IoV of the boundaries,
661 starting with the first boundary data only.
662 If the algorithm returns 'not enough data' on the current boundary IoV, it won't commit the payloads,
663 but instead adds the next boundarie's data and tries again. Basically the same logic as `SequentialRunByRun`
664 but using run boundaries instead of runs directly.
665 Notice that boundaries cannot span multiple experiments.
666
667 By default the algorithm will get the payload boundaries directly from the algorithm that need to
668 have implemented the function ``isBoundaryRequired``. If the desired boundaries are already known it
669 is possible to pass them directly setting the algorithm parameter ``payload_boundaries`` and avoid
670 the need to define the ``isBoundaryRequired`` function.
671
672 ``payload_boundaries`` is a list ``[(exp1, run1), (exp2, run2), ...]``. A boundary at the beginning of each
673 experiment will be added if not already present. An empty list will thus produce a single payload for each
674 experiment. A ``payload_boundaries`` set to ``None`` is equivalent to not passing it and restores the default
675 behaviour where the boundaries are computed in the ``isBoundaryRequired`` function of the algorithm.
676 """
677
679 usable_params = {
680 "iov_coverage": IoV,
681 "payload_boundaries": [] # [(exp1, run1), (exp2, run2), ...]
682 }
683
684
685 allowed_granularities = ["run"]
686
687 def __init__(self, algorithm):
688 """
689 """
690 super().__init__(algorithm)
691
693 self.machine = AlgorithmMachine(self.algorithm)
694 self.first_execution = True
695
696 def run(self, iov, iteration, queue):
697 """
698 Runs the algorithm machine over the collected data and fills the results.
699 """
700 if not self.is_valid():
701 raise StrategyError("This AlgorithmStrategy was not set up correctly!")
702 self.queue = queue
703 B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
704 # Now add all the necessary parameters for a strategy to run
705 machine_params = {}
706 machine_params["database_chain"] = self.database_chain
707 machine_params["dependent_databases"] = self.dependent_databases
708 machine_params["output_dir"] = self.output_dir
709 machine_params["output_database_dir"] = self.output_database_dir
710 machine_params["input_files"] = self.input_files
711 machine_params["ignored_runs"] = self.ignored_runs
712 self.machine.setup_from_dict(machine_params)
713 # Start moving through machine states
714 self.machine.setup_algorithm(iteration=iteration)
715 # After this point, the logging is in the stdout of the algorithm
716 B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
717 runs_to_execute = []
718 all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
719 # If we were given a specific IoV to calibrate we just execute over runs in that IoV
720 if iov:
721 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
722 else:
723 runs_to_execute = all_runs_collected[:]
724
725 # Remove the ignored runs from our run list to execute
726 if self.ignored_runs:
727 B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
728 runs_to_execute.difference_update(set(self.ignored_runs))
729 # Sets aren't ordered so lets go back to lists and sort
730 runs_to_execute = sorted(runs_to_execute)
731
732 # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
733 # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
734 # separately and prevent IoVs from crossing the boundary.
735 runs_to_execute = split_runs_by_exp(runs_to_execute)
736
737 # Now iterate through the experiments. We DO NOT allow a payload IoV to
738 # extend over multiple experiments, only multiple runs
739 iov_coverage = None
740 if "iov_coverage" in self.algorithm.params:
741 B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
742 iov_coverage = self.algorithm.params["iov_coverage"]
743
744 payload_boundaries = None
745 if "payload_boundaries" in self.algorithm.params:
746 B2INFO(f"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.")
747 payload_boundaries = self.algorithm.params["payload_boundaries"]
748
749 number_of_experiments = len(runs_to_execute)
750 B2INFO(f"We are iterating over {number_of_experiments} experiments.")
751
752 # Iterate over experiment run lists
753 for i_exp, run_list in enumerate(runs_to_execute, start=1):
754 B2DEBUG(26, f"Run List for this experiment={run_list}")
755 current_experiment = run_list[0].exp
756 B2INFO(f"Executing over data from experiment {current_experiment}")
757 # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
758 # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
759 # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
760 # This is only true for the lowest and highest experiments in our input data.
761 if i_exp == 1:
762 if iov_coverage:
763 lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
764 else:
765 lowest_exprun = run_list[0]
766 # We are calibrating across multiple experiments so we shouldn't start from the middle but from the 0th run
767 else:
768 lowest_exprun = ExpRun(current_experiment, 0)
769
770 # Override the normal value for the highest ExpRun (from data) if iov_coverage was set
771 if iov_coverage and i_exp == number_of_experiments:
772 highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
773 # If we have more experiments to execute then we will be setting the final payload IoV in this experiment
774 # to be unbounded
775 elif i_exp < number_of_experiments:
776 highest_exprun = ExpRun(current_experiment, -1)
777 # Otherwise just get the values from data
778 else:
779 highest_exprun = run_list[-1]
780
781 # Find the boundaries for this experiment's runs
782 vec_run_list = vector_from_runs(run_list)
783 if payload_boundaries is None:
784 # Find the boundaries using the findPayloadBoundaries implemented in the algorithm
785 B2INFO("Attempting to find payload boundaries.")
786 vec_boundaries = self.algorithm.algorithm.findPayloadBoundaries(vec_run_list)
787 # If this vector is empty then that's bad. Maybe the isBoundaryRequired function
788 # wasn't implemented? Either way we should stop.
789 if vec_boundaries.empty():
790 B2ERROR("No boundaries found but we are in a strategy that requires them! Failing...")
791 # Tell the Runner that we have failed
792 self.send_final_state(self.FAILED)
793 break
794 vec_boundaries = runs_from_vector(vec_boundaries)
795 else:
796 # Using boundaries set by user
797 B2INFO(f"Using as payload boundaries {payload_boundaries}.")
798 vec_boundaries = [ExpRun(exp, run) for exp, run in payload_boundaries]
799 # No need to check that vec_boundaries is not empty. In case it is we will anyway add
800 # a boundary at the first run of each experiment.
801 # Remove any boundaries not from the current experiment (only likely if they were set manually)
802 # We sort just to make everything easier later and just in case something mad happened.
803 run_boundaries = sorted([er for er in vec_boundaries if er.exp == current_experiment])
804 # In this strategy we consider separately each experiment. We then now check that the
805 # boundary (exp, 0) is present and if not we add it. It is indeed possible to miss it
806 # if the boundaries were given manually
807 first_exprun = ExpRun(current_experiment, 0)
808 if first_exprun not in run_boundaries:
809 B2WARNING(f"No boundary found at ({current_experiment}, 0), adding it.")
810 run_boundaries[0:0] = [first_exprun]
811 B2INFO(f"Found {len(run_boundaries)} boundaries for this experiment. "
812 "Checking if we have some data for all boundary IoVs...")
813 # First figure out the run lists to use for each execution (potentially different from the applied IoVs)
814 # We use the boundaries and the run_list
815 boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list)
816 B2DEBUG(26, f"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}")
817 # If there were any boundary IoVs with no run data, just remove them. Otherwise they will execute over all data.
818 boundary_iovs_to_run_lists = {key: value for key, value in boundary_iovs_to_run_lists.items() if value}
819 B2DEBUG(26, f"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}")
820 # If any were removed then we might have gaps between the boundary IoVs. Fix those now by merging IoVs.
821 new_boundary_iovs_to_run_lists = {}
822 previous_boundary_iov = None
823 previous_boundary_run_list = None
824 for boundary_iov, run_list in boundary_iovs_to_run_lists.items():
825 if not previous_boundary_iov:
826 previous_boundary_iov = boundary_iov
827 previous_boundary_run_list = run_list
828 continue
829 # We are definitely dealiing with IoVs from one experiment so we can make assumptions here
830 if previous_boundary_iov.run_high != (boundary_iov.run_low-1):
831 B2WARNING("Gap in boundary IoVs found before execution! "
832 "Will correct it by extending the previous boundary up to the next one.")
833 B2INFO(f"Original boundary IoV={previous_boundary_iov}")
834 previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low,
835 previous_boundary_iov.exp_high, boundary_iov.run_low-1)
836 B2INFO(f"New boundary IoV={previous_boundary_iov}")
837 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
838 previous_boundary_iov = boundary_iov
839 previous_boundary_run_list = run_list
840 else:
841 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
842 boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists
843 B2DEBUG(26, f"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}")
844 # Actually execute now that we have an IoV list to apply
845 success = self.execute_over_boundaries(boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
846 if not success:
847 # Tell the Runner that we have failed
848 self.send_final_state(self.FAILED)
849 break
850 # Only executes if we didn't fail any experiment execution
851 else:
852 # Print any knowable gaps between result IoVs, if any are found there is a problem, but not necessarily too bad.
853 gaps = self.find_iov_gaps()
854 if gaps:
855 B2WARNING("There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.")
856 # Dump them to a file for logging
857 with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
858 json.dump(gaps, f)
859
860 # If any results weren't successes we fail
861 if self.any_failed_iov():
862 self.send_final_state(self.FAILED)
863 else:
864 self.send_final_state(self.COMPLETED)
865
866 def execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration):
867 """
868 Take the previously found boundaries and the run lists they correspond to and actually perform the
869 Algorithm execution. This is assumed to be for a single experiment.
870 """
871 # Copy of boundary IoVs
872 remaining_boundary_iovs = sorted(list(boundary_iovs_to_run_lists.keys())[:])
873
874 # The current runs we are executing
875 current_runs = []
876 # The IoV of the current boundary(s)
877 current_boundary_iov = None
878 # The current execution's applied IoV, may be different to the boundary IoV
879 current_iov = None
880
881 # The last successful payload list and result. We hold on to them so that we can commit or discard later.
882 last_successful_payloads = None
883 last_successful_result = None
884 # The previous execution's runs
885 last_successful_runs = []
886 # The previous execution's applied IoV
887 last_successful_iov = None
888
889 while True:
890 # Do we have previous successes?
891 if not last_successful_result:
892 if not current_runs:
893 # Did we actually have any boundaries?
894 if not remaining_boundary_iovs:
895 # Fail because we have no boundaries to use
896 B2ERROR("No boundaries found for the current experiment's run list. Failing the strategy.")
897 return False
898
899 B2INFO("This appears to be the first attempted execution of the experiment.")
900 # Attempt to execute on the first boundary
901 current_boundary_iov = remaining_boundary_iovs.pop(0)
902 current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
903 # What if there is only one boundary? Need to apply the highest exprun
904 if not remaining_boundary_iovs:
905 current_iov = IoV(*lowest_exprun, *highest_exprun)
906 else:
907 current_iov = IoV(*lowest_exprun, current_boundary_iov.exp_high, current_boundary_iov.run_high)
908 # Returned not enough data from first execution
909 else:
910 # Any remaining boundaries?
911 if not remaining_boundary_iovs:
912 # Fail because we have no boundaries to use
913 B2ERROR("Not enough data found for the current experiment's run list. Failing the strategy.")
914 return False
915
916 B2INFO("There wasn't enough data previously. Merging with the runs from the next boundary.")
917 # Extend the previous run lists/iovs
918 next_boundary_iov = remaining_boundary_iovs.pop(0)
919 current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
920 next_boundary_iov.exp_high, next_boundary_iov.run_high)
921 current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
922 # At the last boundary? Need to apply the highest exprun
923 if not remaining_boundary_iovs:
924 current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
925 else:
926 current_iov = IoV(current_iov.exp_low, current_iov.run_low,
927 current_boundary_iov.exp_high, current_boundary_iov.run_high)
928
929 self.execute_runs(current_runs, iteration, current_iov)
930
931 # Does this count as a successful execution?
932 if self.alg_success():
933 # Commit previous values we were holding onto
934 B2INFO("Found a success. Will save the payloads for later.")
935 # Save success
936 last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
937 last_successful_result = self.machine.result
938 last_successful_runs = current_runs[:]
939 last_successful_iov = current_iov
940 # Reset values for next loop
941 current_runs = []
942 current_boundary_iov = None
943 current_iov = None
944 self.machine.complete()
945 continue
946 elif self.machine.result.result == AlgResult.not_enough_data.value:
947 B2INFO("Not Enough Data result.")
948 # Just complete but leave the current runs alone for next loop
949 self.machine.complete()
950 continue
951 else:
952 B2ERROR("Hit a failure or some kind of result we can't continue from. Failing out...")
953 self.machine.fail()
954 return False
955 # Previous result exists
956 else:
957 # Previous loop was a success
958 if not current_runs:
959 # Remaining boundaries?
960 if not remaining_boundary_iovs:
961 # Out of data, can now commit
962 B2INFO("Finished this experiment's boundaries. "
963 f"Committing remaining payloads from {last_successful_result.iov}")
964 self.machine.algorithm.algorithm.commit(last_successful_payloads)
965 self.results.append(last_successful_result)
966 self.send_result(last_successful_result)
967 return True
968
969 # Remaining boundaries exist so we try to execute
970 current_boundary_iov = remaining_boundary_iovs.pop(0)
971 current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
972 # What if there is only one boundary? Need to apply the highest exprun
973 if not remaining_boundary_iovs:
974 current_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, *highest_exprun)
975 else:
976 current_iov = current_boundary_iov
977
978 # Returned not enough data from last execution
979 else:
980 # Any remaining boundaries?
981 if not remaining_boundary_iovs:
982 B2INFO("We have no remaining runs to increase the amount of data. "
983 "Instead we will merge with the previous successful runs.")
984 # Merge with previous success IoV
985 new_current_runs = last_successful_runs[:]
986 new_current_runs.extend(current_runs)
987 current_runs = new_current_runs[:]
988 current_iov = IoV(last_successful_iov.exp_low, last_successful_iov.run_low,
989 current_iov.exp_high, current_iov.run_high)
990 # We reset the last successful stuff because we are dropping it
991 last_successful_payloads = []
992 last_successful_result = None
993 last_successful_runs = []
994 last_successful_iov = None
995
996 else:
997 B2INFO("Since there wasn't enough data previously, we will merge with the runs from the next boundary.")
998 # Extend the previous run lists/iovs
999 next_boundary_iov = remaining_boundary_iovs.pop(0)
1000 current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
1001 next_boundary_iov.exp_high, next_boundary_iov.run_high)
1002 # Extend previous execution's runs with the next set
1003 current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
1004 # At the last boundary? Need to apply the highest exprun
1005 if not remaining_boundary_iovs:
1006 current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
1007 else:
1008 current_iov = IoV(current_iov.exp_low, current_iov.run_low,
1009 current_boundary_iov.exp_high, current_boundary_iov.run_high)
1010
1011 self.execute_runs(current_runs, iteration, current_iov)
1012
1013 # Does this count as a successful execution?
1014 if self.alg_success():
1015 # Commit previous values we were holding onto
1016 B2INFO("Found a success.")
1017 if last_successful_result:
1018 B2INFO("Can now commit the previous success.")
1019 self.machine.algorithm.algorithm.commit(last_successful_payloads)
1020 self.results.append(last_successful_result)
1021 self.send_result(last_successful_result)
1022 # Replace last success
1023 last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
1024 last_successful_result = self.machine.result
1025 last_successful_runs = current_runs[:]
1026 last_successful_iov = current_iov
1027 # Reset values for next loop
1028 current_runs = []
1029 current_boundary_iov = None
1030 current_iov = None
1031 self.machine.complete()
1032 continue
1033 elif self.machine.result.result == AlgResult.not_enough_data.value:
1034 B2INFO("Not Enough Data result.")
1035 # Just complete but leave the current runs alone for next loop
1036 self.machine.complete()
1037 continue
1038 else:
1039 B2ERROR("Hit a failure or some other result we can't continue from. Failing out...")
1040 self.machine.fail()
1041 return False
1042
1043 def execute_runs(self, runs, iteration, iov):
1044 # Already set up earlier the first time, so we shouldn't do it again
1045 if not self.first_execution:
1046 self.machine.setup_algorithm()
1047 else:
1048 self.first_execution = False
1049
1050 B2INFO(f"Executing and applying {iov} to the payloads.")
1051 self.machine.execute_runs(runs=runs, iteration=iteration, apply_iov=iov)
1052 B2INFO(f"Finished execution with result code {self.machine.result.result}.")
1053
1054 def alg_success(self):
1055 return ((self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value))
1056
1057
class StrategyError(Exception):
    """
    Error raised by an AlgorithmStrategy, e.g. when the strategy was not set up correctly before running.
    """
1062
1063# @endcond