Belle II Software development
strategies.py
1#!/usr/bin/env python3
2
3# disable doxygen check for this file
4# @cond
5
6
13
14from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
15from caf.utils import AlgResult
16from caf.utils import B2INFO_MULTILINE
17from caf.utils import runs_overlapping_iov, runs_from_vector
18from caf.utils import iov_from_runs, split_runs_by_exp, vector_from_runs
19from caf.utils import find_gaps_in_iov_list, grouper, find_run_lists_from_boundaries
20from caf.utils import IoV, ExpRun
21from caf.state_machines import AlgorithmMachine
22
23from abc import ABC, abstractmethod
24import json
25
26
27class AlgorithmStrategy(ABC):
28 """
29 Base class for Algorithm strategies. These do the actual execution of a single
30 algorithm on collected data. Each strategy may be quite different in terms of how fast it may be,
31 how database payloads are passed between executions, and whether or not final payloads have an IoV
32 that is independent to the actual runs used to calculates them.
33
34 Parameters:
35 algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run
36
37 This base class defines the basic attributes and methods that will be automatically used by the selected AlgorithmRunner.
38 When defining a derived class you are free to use these attributes or to implement as much functionality as you want.
39
40 If you define your derived class with an __init__ method, then you should first call the base class `AlgorithmStrategy.__init__()` method via super() e.g.
41
42 >>> def __init__(self):
43 >>> super().__init__()
44
45 The most important method to implement is :py:meth:`AlgorithmStrategy.run` which will take an algorithm and execute it
46 in the required way defined by the options you have selected/attributes set.
47 """
48
50 required_attrs = ["algorithm",
51 "database_chain",
52 "dependent_databases",
53 "output_dir",
54 "output_database_dir",
55 "input_files",
56 "ignored_runs"
57 ]
58
59
60 required_true_attrs = ["algorithm",
61 "output_dir",
62 "output_database_dir",
63 "input_files"
64 ]
65
66
67 allowed_granularities = ["run", "all"]
68
69
70 FINISHED_RESULTS = "DONE"
71
72
73 COMPLETED = "COMPLETED"
74
75
76 FAILED = "FAILED"
77
78 def __init__(self, algorithm):
79 """
80 """
81
82 self.algorithm = algorithm
83
84 self.input_files = []
85
86 self.output_dir = ""
87
88 self.output_database_dir = ""
89
90 self.database_chain = []
91
92 self.dependent_databases = []
93
95 self.ignored_runs = []
96
97 self.results = []
98
99 self.queue = None
100
101 @abstractmethod
102 def run(self, iov, iteration, queue):
103 """
104 Abstract method that needs to be implemented. It will be called to actually execute the
105 algorithm.
106 """
107
108 def setup_from_dict(self, params):
109 """
110 Parameters:
111 params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name.
112 """
113 for attribute_name, value in params.items():
114 setattr(self, attribute_name, value)
115
116 def is_valid(self):
117 """
118 Returns:
119 bool: Whether or not this strategy has been set up correctly with all its necessary attributes.
120 """
121 B2INFO("Checking validity of current AlgorithmStrategy setup.")
122 # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
123 for attribute_name in self.required_attrs:
124 if not hasattr(self, attribute_name):
125 B2ERROR(f"AlgorithmStrategy attribute {attribute_name} doesn't exist.")
126 return False
127 # Check if any attributes that need actual values haven't been set or were empty
128 for attribute_name in self.required_true_attrs:
129 if not getattr(self, attribute_name):
130 B2ERROR(f"AlgorithmStrategy attribute {attribute_name} returned False.")
131 return False
132 return True
133
134 def find_iov_gaps(self):
135 """
136 Finds and prints the current gaps between the IoVs of the strategy results. Basically these are the IoVs
137 not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps
138 within the same experiment are found.
139
140 Returns:
141 iov_gaps(list[IoV])
142 """
143 iov_gaps = find_gaps_in_iov_list(sorted([result.iov for result in self.results]))
144 if iov_gaps:
145 gap_msg = ["Found gaps between IoVs of algorithm results (regardless of result)."]
146 gap_msg.append("You may have requested these gaps deliberately by not passing in data containing these runs.")
147 gap_msg.append("This may not be a problem, but you will not have payoads defined for these IoVs")
148 gap_msg.append("unless you edit the final database.txt yourself.")
149 B2INFO_MULTILINE(gap_msg)
150 for iov in iov_gaps:
151 B2INFO(f"{iov} not covered by any execution of the algorithm.")
152 return iov_gaps
153
154 def any_failed_iov(self):
155 """
156 Returns:
157 bool: If any result in the current results list has a failed algorithm code we return True
158 """
159 failed_results = []
160 for result in self.results:
161 if result.result == AlgResult.failure.value or result.result == AlgResult.not_enough_data.value:
162 failed_results.append(result)
163 if failed_results:
164 B2WARNING("Failed results found.")
165 for result in failed_results:
166 if result.result == AlgResult.failure.value:
167 B2ERROR(f"c_Failure returned for {result.iov}.")
168 elif result.result == AlgResult.not_enough_data.value:
169 B2WARNING(f"c_NotEnoughData returned for {result.iov}.")
170 return True
171 else:
172 return False
173
174 def send_result(self, result):
175 self.queue.put({"type": "result", "value": result})
176
177 def send_final_state(self, state):
178 self.queue.put({"type": "final_state", "value": state})
179
180
181class SingleIOV(AlgorithmStrategy):
182 """The fastest and simplest Algorithm strategy. Runs the algorithm only once over all of the input
183 data or only the data corresponding to the requested IoV. The payload IoV is the set to the same as the one
184 that was executed.
185
186 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
187 a CalibrationAlgorithm C++ class directly.
188"""
189
191 usable_params = {"apply_iov": IoV}
192
193 def __init__(self, algorithm):
194 """
195 """
196 super().__init__(algorithm)
197
199 self.machine = AlgorithmMachine(self.algorithm)
200
201 def run(self, iov, iteration, queue):
202 """
203 Runs the algorithm machine over the collected data and fills the results.
204 """
205 if not self.is_valid():
206 raise StrategyError("This AlgorithmStrategy was not set up correctly!")
207 self.queue = queue
208
209 B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
210 # Now add all the necessary parameters for a strategy to run
211 machine_params = {}
212 machine_params["database_chain"] = self.database_chain
213 machine_params["dependent_databases"] = self.dependent_databases
214 machine_params["output_dir"] = self.output_dir
215 machine_params["output_database_dir"] = self.output_database_dir
216 machine_params["input_files"] = self.input_files
217 machine_params["ignored_runs"] = self.ignored_runs
218 self.machine.setup_from_dict(machine_params)
219 # Start moving through machine states
220 B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
221 self.machine.setup_algorithm(iteration=iteration)
222 # After this point, the logging is in the stdout of the algorithm
223 B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
224
225 all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
226 # If we were given a specific IoV to calibrate we just execute all runs in that IoV at once
227 if iov:
228 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
229 else:
230 runs_to_execute = all_runs_collected
231
232 # Remove the ignored runs from our run list to execute
233 if self.ignored_runs:
234 B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
235 runs_to_execute.difference_update(set(self.ignored_runs))
236 # Sets aren't ordered so lets go back to lists and sort
237 runs_to_execute = sorted(runs_to_execute)
238 apply_iov = None
239 if "apply_iov" in self.algorithm.params:
240 apply_iov = self.algorithm.params["apply_iov"]
241 self.machine.execute_runs(runs=runs_to_execute, iteration=iteration, apply_iov=apply_iov)
242 B2INFO(f"Finished execution with result code {self.machine.result.result}.")
243
244 # Send out the result to the runner
245 self.send_result(self.machine.result)
246
247 # Make sure the algorithm state and commit is done
248 if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
249 # Valid exit codes mean we can complete properly
250 self.machine.complete()
251 # Commit all the payloads and send out the results
252 self.machine.algorithm.algorithm.commit()
253 self.send_final_state(self.COMPLETED)
254 else:
255 # Either there wasn't enough data or the algorithm failed
256 self.machine.fail()
257 self.send_final_state(self.FAILED)
258
259
260class SequentialRunByRun(AlgorithmStrategy):
261 """
262 Algorithm strategy to do run-by-run calibration of collected data.
263 Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only. If the algorithm returns 'not enough data' on the current run set, it won't commit the payloads, but instead adds
264 the next run's data and tries again.
265
266 Once an execution on a set of runs return 'iterate' or 'ok' we move onto the next runs (if any are left)
267 and start the same procedure again. Committing of payloads to the outputdb only happens once we're sure that there
268 is enough data in the remaining runs to get a full execution. If there isn't enough data remaining, the last runs
269 are merged with the previous successful execution's runs and a final execution is performed on all remaining runs.
270
271 Additionally this strategy will automatically make sure that IoV gaps in your input data are covered by a payload.
272 This means that there shouldn't be any IoVs that don't get a new payload by the end of runnning an iteration.
273
274 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
275 a CalibrationAlgorithm C++ class directly.
276"""
277
279 usable_params = {
280 "has_experiment_settings": bool,
281 "iov_coverage": IoV,
282 "step_size": int
283 }
284
285
286 allowed_granularities = ["run"]
287
288 def __init__(self, algorithm):
289 """
290 """
291 super().__init__(algorithm)
292
294 self.machine = AlgorithmMachine(self.algorithm)
295 if "step_size" not in self.algorithm.params:
296 self.algorithm.params["step_size"] = 1
297 self.first_execution = True
298
299 def apply_experiment_settings(self, algorithm, experiment):
300 """
301 Apply experiment-dependent settings.
302 This is the default version, which does not do anything.
303 If necessary, it should be reimplemented by derived classes.
304 """
305 return
306
307 def run(self, iov, iteration, queue):
308 """
309 Runs the algorithm machine over the collected data and fills the results.
310 """
311 if not self.is_valid():
312 raise StrategyError("This AlgorithmStrategy was not set up correctly!")
313 self.queue = queue
314 B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
315 # Now add all the necessary parameters for a strategy to run
316 machine_params = {}
317 machine_params["database_chain"] = self.database_chain
318 machine_params["dependent_databases"] = self.dependent_databases
319 machine_params["output_dir"] = self.output_dir
320 machine_params["output_database_dir"] = self.output_database_dir
321 machine_params["input_files"] = self.input_files
322 machine_params["ignored_runs"] = self.ignored_runs
323 self.machine.setup_from_dict(machine_params)
324 # Start moving through machine states
325 self.machine.setup_algorithm(iteration=iteration)
326 # After this point, the logging is in the stdout of the algorithm
327 B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
328 runs_to_execute = []
329 all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
330 # If we were given a specific IoV to calibrate we just execute over runs in that IoV
331 if iov:
332 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
333 else:
334 runs_to_execute = all_runs_collected[:]
335
336 # Remove the ignored runs from our run list to execute
337 if self.ignored_runs:
338 B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
339 runs_to_execute.difference_update(set(self.ignored_runs))
340 # Sets aren't ordered so lets go back to lists and sort
341 runs_to_execute = sorted(runs_to_execute)
342
343 # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
344 # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
345 # separately and prevent IoVs from crossing the boundary.
346 runs_to_execute = split_runs_by_exp(runs_to_execute)
347
348 # Now iterate through the experiments, executing runs in blocks of 'step_size'. We DO NOT allow a payload IoV to
349 # extend over multiple experiments, only multiple runs
350 iov_coverage = None
351 if "iov_coverage" in self.algorithm.params:
352 B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
353 iov_coverage = self.algorithm.params["iov_coverage"]
354
355 number_of_experiments = len(runs_to_execute)
356 # Iterate over experiment run lists
357 for i_exp, run_list in enumerate(runs_to_execute, start=1):
358
359 # Apply experiment-dependent settings.
360 if "has_experiment_settings" in self.algorithm.params:
361 if self.algorithm.params["has_experiment_settings"]:
362 self.apply_experiment_settings(self.machine.algorithm.algorithm, run_list[0].exp)
363
364 # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
365 # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
366 # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
367 # This is only true for the lowest and highest experiments in our input data.
368 # If we have multiple experiments the iov must be adjusted to avoid gaps at the iov boundaries
369 lowest_exprun = ExpRun(run_list[0].exp, 0)
370 highest_exprun = ExpRun(run_list[-1].exp, -1)
371
372 if i_exp == 1:
373 lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low) if iov_coverage else run_list[0]
374 if i_exp == number_of_experiments:
375 highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high) if iov_coverage else run_list[-1]
376
377 self.execute_over_run_list(iteration, run_list, lowest_exprun, highest_exprun)
378
379 # Print any knowable gaps between result IoVs, if any are foun there is a problem.
380 gaps = self.find_iov_gaps()
381 # Dump them to a file for logging
382 with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
383 json.dump(gaps, f)
384
385 # If any results weren't successes we fail
386 if self.any_failed_iov():
387 self.send_final_state(self.FAILED)
388 else:
389 self.send_final_state(self.COMPLETED)
390
391 def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun):
392 # The runs (data) we have left to execute from this run list
393 remaining_runs = run_list[:]
394 # The previous execution's runs
395 previous_runs = []
396 # The current runs we are executing
397 current_runs = []
398 # The last successful payload and result
399 last_successful_payloads = None
400 last_successful_result = None
401
402 # Iterate over ExpRuns within an experiment in chunks of 'step_size'
403 for expruns in grouper(self.algorithm.params["step_size"], run_list):
404 # Already set up earlier the first time, so we shouldn't do it again
405 if not self.first_execution:
406 self.machine.setup_algorithm()
407 else:
408 self.first_execution = False
409
410 # Add on the next step of runs
411 current_runs.extend(expruns)
412 # Remove them from our remaining runs
413 remaining_runs = [run for run in remaining_runs if run not in current_runs]
414
415 # Is this the first payload of the experiment
416 if not last_successful_result:
417 B2INFO("Detected that this will be the first payload of this experiment.")
418 # If this is the first payload but we have other data, we need the IoV to cover from the
419 # lowest IoV extent requested up to the ExpRun right before the next run in the remaining runs list.
420 if remaining_runs:
421 apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1)
422 # If this is the first payload but there isn't more data, we set the IoV to cover the full range
423 else:
424 B2INFO("Detected that this will be the only payload of the experiment.")
425 apply_iov = IoV(*lowest_exprun, *highest_exprun)
426 # If there were previous successes
427 else:
428 if not remaining_runs:
429 B2INFO("Detected that there are no more runs to execute in this experiment after this next execution.")
430 apply_iov = IoV(*current_runs[0], *highest_exprun)
431 # Othewise, it's just a normal IoV in the middle.
432 else:
433 B2INFO("Detected that there are more runs to execute in this experiment after this next execution.")
434 apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1)
435
436 B2INFO(f"Executing and applying {apply_iov} to the payloads.")
437 self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
438 B2INFO(f"Finished execution with result code {self.machine.result.result}.")
439
440 # Does this count as a successful execution?
441 if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
442 self.machine.complete()
443 # If we've succeeded but we have a previous success we can commit the previous payloads
444 # since we have new ones ready
445 if last_successful_payloads and last_successful_result:
446 B2INFO("Saving this execution's payloads to be committed later.")
447 # Save the payloads and result
448 new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
449 new_successful_result = self.machine.result
450 B2INFO("We just succeded in execution of the Algorithm."
451 f" Will now commit payloads from the previous success for {last_successful_result.iov}.")
452 self.machine.algorithm.algorithm.commit(last_successful_payloads)
453 self.results.append(last_successful_result)
454 self.send_result(last_successful_result)
455 # If there are remaining runs we need to have the current payloads ready to commit after the next execution
456 if remaining_runs:
457 last_successful_payloads = new_successful_payloads
458 last_successful_result = new_successful_result
459 # If there's not more runs to process we should also commit the new ones
460 else:
461 B2INFO("We have no more runs to process. "
462 f"Will now commit the most recent payloads for {new_successful_result.iov}.")
463 self.machine.algorithm.algorithm.commit(new_successful_payloads)
464 self.results.append(new_successful_result)
465 self.send_result(new_successful_result)
466 break
467 # if there's no previous success this must be the first run executed
468 else:
469 # Need to save payloads for later if we have a success but runs remain
470 if remaining_runs:
471 B2INFO(f"Saving the most recent payloads for {self.machine.result.iov} to be committed later.")
472 # Save the payloads and result
473 last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
474 last_successful_result = self.machine.result
475 # Need to commit and exit if we have a success and no remaining data
476 else:
477 B2INFO("We just succeeded in execution of the Algorithm."
478 " No runs left to be processed, so we are committing results of this execution.")
479 self.machine.algorithm.algorithm.commit()
480 self.results.append(self.machine.result)
481 self.send_result(self.machine.result)
482 break
483
484 previous_runs = current_runs[:]
485 current_runs = []
486 # If it wasn't successful, was it due to lack of data in the runs?
487 elif (self.machine.result.result == AlgResult.not_enough_data.value):
488 B2INFO(f"There wasn't enough data in {self.machine.result.iov}.")
489 if remaining_runs:
490 B2INFO("Some runs remain to be processed. "
491 f"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.")
492 elif not remaining_runs and not last_successful_result:
493 B2ERROR("There aren't any more runs remaining to merge with, and we never had a previous success."
494 " There wasn't enough data in the full input data requested.")
495 self.results.append(self.machine.result)
496 self.send_result(self.machine.result)
497 self.machine.fail()
498 break
499 elif not remaining_runs and last_successful_result:
500 B2INFO("There aren't any more runs remaining to merge with. But we had a previous success"
501 ", so we'll merge with the previous IoV.")
502 final_runs = current_runs[:]
503 current_runs = previous_runs
504 current_runs.extend(final_runs)
505 self.machine.fail()
506 elif self.machine.result.result == AlgResult.failure.value:
507 B2ERROR(f"{self.algorithm.name} returned failure exit code.")
508 self.results.append(self.machine.result)
509 self.send_result(self.machine.result)
510 self.machine.fail()
511 break
512 else:
513 # Check if we need to run a final execution on the previous execution + dangling set of runs
514 if current_runs:
515 self.machine.setup_algorithm()
516 apply_iov = IoV(last_successful_result.iov.exp_low,
517 last_successful_result.iov.run_low,
518 *highest_exprun)
519 B2INFO(f"Executing on {apply_iov}.")
520 self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
521 B2INFO(f"Finished execution with result code {self.machine.result.result}.")
522 if (self.machine.result.result == AlgResult.ok.value) or (
523 self.machine.result.result == AlgResult.iterate.value):
524 self.machine.complete()
525 # Commit all the payloads and send out the results
526 self.machine.algorithm.algorithm.commit()
527 # Save the result
528 self.results.append(self.machine.result)
529 self.send_result(self.machine.result)
530 else:
531 # Save the result
532 self.results.append(self.machine.result)
533 self.send_result(self.machine.result)
534 # But failed
535 self.machine.fail()
536
537
538class SimpleRunByRun(AlgorithmStrategy):
539 """
540 Algorithm strategy to do run-by-run calibration of collected data.
541 Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
542 This strategy differs from `SequentialRunByRun` in that it *will not merge run data* if the algorithm returns
543 'not enough data' on the current run.
544
545 Once an execution on a run returns *any* result 'iterate', 'ok', 'not_enough_data', or 'failure', we move onto the
546 next run (if any are left).
547 Committing of payloads to the outputdb only happens for 'iterate' or 'ok' return codes.
548
549 .. important:: Unlike most other strategies, this one won't immediately fail and return if a run returns a 'failure' exit
550 code.
551 The failure will prevent iteration/successful completion of the CAF though.
552
553 .. warning:: Since this strategy doesn't try to merge data from runs, if *any* run in your input data doesn't contain
554 enough data to complete the algorithm successfully, you won't be able to get a successful calibration.
555 The CAF then won't allow you to iterate this calibration, or pass the constants onward to another calibration.
556 However, you will still have the database created that covers all the successfull runs.
557
558 This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
559 a CalibrationAlgorithm C++ class directly.
560"""
561
562 allowed_granularities = ["run"]
563
565 usable_params = {}
566
567 def __init__(self, algorithm):
568 """
569 """
570 super().__init__(algorithm)
571
573 self.machine = AlgorithmMachine(self.algorithm)
574
575 def run(self, iov, iteration, queue):
576 """
577 Runs the algorithm machine over the collected data and fills the results.
578 """
579
580 if not self.is_valid():
581 raise StrategyError("This AlgorithmStrategy was not set up correctly!")
582 self.queue = queue
583
584 B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
585 # Now add all the necessary parameters for a strategy to run
586 machine_params = {}
587 machine_params["database_chain"] = self.database_chain
588 machine_params["dependent_databases"] = self.dependent_databases
589 machine_params["output_dir"] = self.output_dir
590 machine_params["output_database_dir"] = self.output_database_dir
591 machine_params["input_files"] = self.input_files
592 machine_params["ignored_runs"] = self.ignored_runs
593 self.machine.setup_from_dict(machine_params)
594 # Start moving through machine states
595 B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
596 self.machine.setup_algorithm(iteration=iteration)
597 # After this point, the logging is in the stdout of the algorithm
598 B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
599
600 all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
601 # If we were given a specific IoV to calibrate we just execute all runs in that IoV at once
602 if iov:
603 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
604 else:
605 runs_to_execute = all_runs_collected
606
607 # Remove the ignored runs from our run list to execute
608 if self.ignored_runs:
609 B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
610 runs_to_execute.difference_update(set(self.ignored_runs))
611 # Sets aren't ordered so lets go back to lists and sort
612 runs_to_execute = sorted(runs_to_execute)
613
614 # Is this the first time executing the algorithm?
615 first_execution = True
616 for exprun in runs_to_execute:
617 if not first_execution:
618 self.machine.setup_algorithm()
619 current_runs = exprun
620 apply_iov = iov_from_runs([current_runs])
621 B2INFO(f"Executing on IoV = {apply_iov}.")
622 self.machine.execute_runs(runs=[current_runs], iteration=iteration, apply_iov=apply_iov)
623 first_execution = False
624 B2INFO(f"Finished execution with result code {self.machine.result.result}.")
625 # Does this count as a successful execution?
626 if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
627 # Commit the payloads and result
628 B2INFO(f"Committing payloads for {iov_from_runs([current_runs])}.")
629 self.machine.algorithm.algorithm.commit()
630 self.results.append(self.machine.result)
631 self.send_result(self.machine.result)
632 self.machine.complete()
633 # If it wasn't successful, was it due to lack of data in the runs?
634 elif (self.machine.result.result == AlgResult.not_enough_data.value):
635 B2INFO(f"There wasn't enough data in the IoV {iov_from_runs([current_runs])}.")
636 self.results.append(self.machine.result)
637 self.send_result(self.machine.result)
638 self.machine.fail()
639 elif self.machine.result.result == AlgResult.failure.value:
640 B2ERROR(f"Failure exit code in the IoV {iov_from_runs([current_runs])}.")
641 self.results.append(self.machine.result)
642 self.send_result(self.machine.result)
643 self.machine.fail()
644
645 # Print any knowable gaps between result IoVs, if any are foun there is a problem.
646 gaps = self.find_iov_gaps()
647 # Dump them to a file for logging
648 with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
649 json.dump(gaps, f)
650
651 self.send_final_state(self.COMPLETED)
652
653
654class SequentialBoundaries(AlgorithmStrategy):
655 """
656 Algorithm strategy to first calculate run boundaries where execution should be attempted.
657 Runs the algorithm over the input data contained within the requested IoV of the boundaries,
658 starting with the first boundary data only.
659 If the algorithm returns 'not enough data' on the current boundary IoV, it won't commit the payloads, but instead adds the next boundarie's data and tries again. Basically the same logic as `SequentialRunByRun`
660 but using run boundaries instead of runs directly.
661 Notice that boundaries cannot span multiple experiments.
662
663 By default the algorithm will get the payload boundaries directly from the algorithm that need to
664 have inplemented the function ``isBoundaryRequired``. If the desired boundaries are already known it
665 is possible to pass them directly setting the algorithm parameter ``payload_boundaries`` and avoid
666 the need to define the ``isBoundaryRequired`` function.
667
668 ``payload_boundaries`` is a list ``[(exp1, run1), (exp2, run2), ...]``. A boundary at the beginning of each
669 experiment will be added if not already present. An empty list will thus produce a single payload for each
670 experiment. A ``payload_boundaries`` set to ``None`` is equivalent to not passing it and restores the default
671 behaviour where the boundaries are computed in the ``isBoundaryRequired`` function of the algorithm.
672 """
673
675 usable_params = {
676 "iov_coverage": IoV,
677 "payload_boundaries": [] # [(exp1, run1), (exp2, run2), ...]
678 }
679
680
681 allowed_granularities = ["run"]
682
683 def __init__(self, algorithm):
684 """
685 """
686 super().__init__(algorithm)
687
689 self.machine = AlgorithmMachine(self.algorithm)
690 self.first_execution = True
691
692 def run(self, iov, iteration, queue):
693 """
694 Runs the algorithm machine over the collected data and fills the results.
695 """
696 if not self.is_valid():
697 raise StrategyError("This AlgorithmStrategy was not set up correctly!")
698 self.queue = queue
699 B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
700 # Now add all the necessary parameters for a strategy to run
701 machine_params = {}
702 machine_params["database_chain"] = self.database_chain
703 machine_params["dependent_databases"] = self.dependent_databases
704 machine_params["output_dir"] = self.output_dir
705 machine_params["output_database_dir"] = self.output_database_dir
706 machine_params["input_files"] = self.input_files
707 machine_params["ignored_runs"] = self.ignored_runs
708 self.machine.setup_from_dict(machine_params)
709 # Start moving through machine states
710 self.machine.setup_algorithm(iteration=iteration)
711 # After this point, the logging is in the stdout of the algorithm
712 B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
713 runs_to_execute = []
714 all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
715 # If we were given a specific IoV to calibrate we just execute over runs in that IoV
716 if iov:
717 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
718 else:
719 runs_to_execute = all_runs_collected[:]
720
721 # Remove the ignored runs from our run list to execute
722 if self.ignored_runs:
723 B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
724 runs_to_execute.difference_update(set(self.ignored_runs))
725 # Sets aren't ordered so lets go back to lists and sort
726 runs_to_execute = sorted(runs_to_execute)
727
728 # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
729 # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
730 # separately and prevent IoVs from crossing the boundary.
731 runs_to_execute = split_runs_by_exp(runs_to_execute)
732
733 # Now iterate through the experiments. We DO NOT allow a payload IoV to
734 # extend over multiple experiments, only multiple runs
735 iov_coverage = None
736 if "iov_coverage" in self.algorithm.params:
737 B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
738 iov_coverage = self.algorithm.params["iov_coverage"]
739
740 payload_boundaries = None
741 if "payload_boundaries" in self.algorithm.params:
742 B2INFO(f"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.")
743 payload_boundaries = self.algorithm.params["payload_boundaries"]
744
745 number_of_experiments = len(runs_to_execute)
746 B2INFO(f"We are iterating over {number_of_experiments} experiments.")
747
748 # Iterate over experiment run lists
749 for i_exp, run_list in enumerate(runs_to_execute, start=1):
750 B2DEBUG(26, f"Run List for this experiment={run_list}")
751 current_experiment = run_list[0].exp
752 B2INFO(f"Executing over data from experiment {current_experiment}")
753 # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
754 # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
755 # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
756 # This is only true for the lowest and highest experiments in our input data.
757 if i_exp == 1:
758 if iov_coverage:
759 lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
760 else:
761 lowest_exprun = run_list[0]
762 # We are calibrating across multiple experiments so we shouldn't start from the middle but from the 0th run
763 else:
764 lowest_exprun = ExpRun(current_experiment, 0)
765
766 # Override the normal value for the highest ExpRun (from data) if iov_coverage was set
767 if iov_coverage and i_exp == number_of_experiments:
768 highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
769 # If we have more experiments to execute then we wil be setting the final payload IoV in this experiment
770 # to be unbounded
771 elif i_exp < number_of_experiments:
772 highest_exprun = ExpRun(current_experiment, -1)
773 # Otherwise just get the values from data
774 else:
775 highest_exprun = run_list[-1]
776
777 # Find the boundaries for this experiment's runs
778 vec_run_list = vector_from_runs(run_list)
779 if payload_boundaries is None:
780 # Find the boundaries using the findPayloadBoundaries implemented in the algorithm
781 B2INFO("Attempting to find payload boundaries.")
782 vec_boundaries = self.algorithm.algorithm.findPayloadBoundaries(vec_run_list)
783 # If this vector is empty then that's bad. Maybe the isBoundaryRequired function
784 # wasn't implemented? Either way we should stop.
785 if vec_boundaries.empty():
786 B2ERROR("No boundaries found but we are in a strategy that requires them! Failing...")
787 # Tell the Runner that we have failed
788 self.send_final_state(self.FAILED)
789 break
790 vec_boundaries = runs_from_vector(vec_boundaries)
791 else:
792 # Using boundaries set by user
793 B2INFO(f"Using as payload boundaries {payload_boundaries}.")
794 vec_boundaries = [ExpRun(exp, run) for exp, run in payload_boundaries]
795 # No need to check that vec_boundaries is not empty. In case it is we will anyway add
796 # a boundary at the first run of each experiment.
797 # Remove any boundaries not from the current experiment (only likely if they were set manually)
798 # We sort just to make everything easier later and just in case something mad happened.
799 run_boundaries = sorted([er for er in vec_boundaries if er.exp == current_experiment])
800 # In this strategy we consider separately each experiment. We then now check that the
801 # boundary (exp, 0) is present and if not we add it. It is indeed possible to miss it
802 # if the boundaries were given manually
803 first_exprun = ExpRun(current_experiment, 0)
804 if first_exprun not in run_boundaries:
805 B2WARNING(f"No boundary found at ({current_experiment}, 0), adding it.")
806 run_boundaries[0:0] = [first_exprun]
807 B2INFO(f"Found {len(run_boundaries)} boundaries for this experiment. "
808 "Checking if we have some data for all boundary IoVs...")
809 # First figure out the run lists to use for each execution (potentially different from the applied IoVs)
810 # We use the boundaries and the run_list
811 boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list)
812 B2DEBUG(26, f"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}")
813 # If there were any boundary IoVs with no run data, just remove them. Otherwise they will execute over all data.
814 boundary_iovs_to_run_lists = {key: value for key, value in boundary_iovs_to_run_lists.items() if value}
815 B2DEBUG(26, f"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}")
816 # If any were removed then we might have gaps between the boundary IoVs. Fix those now by merging IoVs.
817 new_boundary_iovs_to_run_lists = {}
818 previous_boundary_iov = None
819 previous_boundary_run_list = None
820 for boundary_iov, run_list in boundary_iovs_to_run_lists.items():
821 if not previous_boundary_iov:
822 previous_boundary_iov = boundary_iov
823 previous_boundary_run_list = run_list
824 continue
825 # We are definitely dealiing with IoVs from one experiment so we can make assumptions here
826 if previous_boundary_iov.run_high != (boundary_iov.run_low-1):
827 B2WARNING("Gap in boundary IoVs found before execution! "
828 "Will correct it by extending the previous boundary up to the next one.")
829 B2INFO(f"Original boundary IoV={previous_boundary_iov}")
830 previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low,
831 previous_boundary_iov.exp_high, boundary_iov.run_low-1)
832 B2INFO(f"New boundary IoV={previous_boundary_iov}")
833 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
834 previous_boundary_iov = boundary_iov
835 previous_boundary_run_list = run_list
836 else:
837 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
838 boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists
839 B2DEBUG(26, f"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}")
840 # Actually execute now that we have an IoV list to apply
841 success = self.execute_over_boundaries(boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
842 if not success:
843 # Tell the Runner that we have failed
844 self.send_final_state(self.FAILED)
845 break
846 # Only executes if we didn't fail any experiment execution
847 else:
848 # Print any knowable gaps between result IoVs, if any are found there is a problem, but not necessarily too bad.
849 gaps = self.find_iov_gaps()
850 if gaps:
851 B2WARNING("There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.")
852 # Dump them to a file for logging
853 with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
854 json.dump(gaps, f)
855
856 # If any results weren't successes we fail
857 if self.any_failed_iov():
858 self.send_final_state(self.FAILED)
859 else:
860 self.send_final_state(self.COMPLETED)
861
862 def execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration):
863 """
864 Take the previously found boundaries and the run lists they correspond to and actually perform the
865 Algorithm execution. This is assumed to be for a single experiment.
866 """
867 # Copy of boundary IoVs
868 remaining_boundary_iovs = sorted(list(boundary_iovs_to_run_lists.keys())[:])
869
870 # The current runs we are executing
871 current_runs = []
872 # The IoV of the current boundary(s)
873 current_boundary_iov = None
874 # The current execution's applied IoV, may be different to the boundary IoV
875 current_iov = None
876
877 # The last successful payload list and result. We hold on to them so that we can commit or discard later.
878 last_successful_payloads = None
879 last_successful_result = None
880 # The previous execution's runs
881 last_successful_runs = []
882 # The previous execution's applied IoV
883 last_successful_iov = None
884
885 while True:
886 # Do we have previous successes?
887 if not last_successful_result:
888 if not current_runs:
889 # Did we actually have any boundaries?
890 if not remaining_boundary_iovs:
891 # Fail because we have no boundaries to use
892 B2ERROR("No boundaries found for the current experiment's run list. Failing the strategy.")
893 return False
894
895 B2INFO("This appears to be the first attempted execution of the experiment.")
896 # Attempt to execute on the first boundary
897 current_boundary_iov = remaining_boundary_iovs.pop(0)
898 current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
899 # What if there is only one boundary? Need to apply the highest exprun
900 if not remaining_boundary_iovs:
901 current_iov = IoV(*lowest_exprun, *highest_exprun)
902 else:
903 current_iov = IoV(*lowest_exprun, current_boundary_iov.exp_high, current_boundary_iov.run_high)
904 # Returned not enough data from first execution
905 else:
906 # Any remaining boundaries?
907 if not remaining_boundary_iovs:
908 # Fail because we have no boundaries to use
909 B2ERROR("Not enough data found for the current experiment's run list. Failing the strategy.")
910 return False
911
912 B2INFO("There wasn't enough data previously. Merging with the runs from the next boundary.")
913 # Extend the previous run lists/iovs
914 next_boundary_iov = remaining_boundary_iovs.pop(0)
915 current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
916 next_boundary_iov.exp_high, next_boundary_iov.run_high)
917 current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
918 # At the last boundary? Need to apply the highest exprun
919 if not remaining_boundary_iovs:
920 current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
921 else:
922 current_iov = IoV(current_iov.exp_low, current_iov.run_low,
923 current_boundary_iov.exp_high, current_boundary_iov.run_high)
924
925 self.execute_runs(current_runs, iteration, current_iov)
926
927 # Does this count as a successful execution?
928 if self.alg_success():
929 # Commit previous values we were holding onto
930 B2INFO("Found a success. Will save the payloads for later.")
931 # Save success
932 last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
933 last_successful_result = self.machine.result
934 last_successful_runs = current_runs[:]
935 last_successful_iov = current_iov
936 # Reset values for next loop
937 current_runs = []
938 current_boundary_iov = None
939 current_iov = None
940 self.machine.complete()
941 continue
942 elif self.machine.result.result == AlgResult.not_enough_data.value:
943 B2INFO("Not Enough Data result.")
944 # Just complete but leave the current runs alone for next loop
945 self.machine.complete()
946 continue
947 else:
948 B2ERROR("Hit a failure or some kind of result we can't continue from. Failing out...")
949 self.machine.fail()
950 return False
951 # Previous result exists
952 else:
953 # Previous loop was a success
954 if not current_runs:
955 # Remaining boundaries?
956 if not remaining_boundary_iovs:
957 # Out of data, can now commit
958 B2INFO("Finished this experiment's boundaries. "
959 f"Committing remaining payloads from {last_successful_result.iov}")
960 self.machine.algorithm.algorithm.commit(last_successful_payloads)
961 self.results.append(last_successful_result)
962 self.send_result(last_successful_result)
963 return True
964
965 # Remaining boundaries exist so we try to execute
966 current_boundary_iov = remaining_boundary_iovs.pop(0)
967 current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
968 # What if there is only one boundary? Need to apply the highest exprun
969 if not remaining_boundary_iovs:
970 current_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, *highest_exprun)
971 else:
972 current_iov = current_boundary_iov
973
974 # Returned not enough data from last execution
975 else:
976 # Any remaining boundaries?
977 if not remaining_boundary_iovs:
978 B2INFO("We have no remaining runs to increase the amount of data. "
979 "Instead we will merge with the previous successful runs.")
980 # Merge with previous success IoV
981 new_current_runs = last_successful_runs[:]
982 new_current_runs.extend(current_runs)
983 current_runs = new_current_runs[:]
984 current_iov = IoV(last_successful_iov.exp_low, last_successful_iov.run_low,
985 current_iov.exp_high, current_iov.run_high)
986 # We reset the last successful stuff because we are dropping it
987 last_successful_payloads = []
988 last_successful_result = None
989 last_successful_runs = []
990 last_successful_iov = None
991
992 else:
993 B2INFO("Since there wasn't enough data previously, we will merge with the runs from the next boundary.")
994 # Extend the previous run lists/iovs
995 next_boundary_iov = remaining_boundary_iovs.pop(0)
996 current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
997 next_boundary_iov.exp_high, next_boundary_iov.run_high)
998 # Extend previous execution's runs with the next set
999 current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
1000 # At the last boundary? Need to apply the highest exprun
1001 if not remaining_boundary_iovs:
1002 current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
1003 else:
1004 current_iov = IoV(current_iov.exp_low, current_iov.run_low,
1005 current_boundary_iov.exp_high, current_boundary_iov.run_high)
1006
1007 self.execute_runs(current_runs, iteration, current_iov)
1008
1009 # Does this count as a successful execution?
1010 if self.alg_success():
1011 # Commit previous values we were holding onto
1012 B2INFO("Found a success.")
1013 if last_successful_result:
1014 B2INFO("Can now commit the previous success.")
1015 self.machine.algorithm.algorithm.commit(last_successful_payloads)
1016 self.results.append(last_successful_result)
1017 self.send_result(last_successful_result)
1018 # Replace last success
1019 last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
1020 last_successful_result = self.machine.result
1021 last_successful_runs = current_runs[:]
1022 last_successful_iov = current_iov
1023 # Reset values for next loop
1024 current_runs = []
1025 current_boundary_iov = None
1026 current_iov = None
1027 self.machine.complete()
1028 continue
1029 elif self.machine.result.result == AlgResult.not_enough_data.value:
1030 B2INFO("Not Enough Data result.")
1031 # Just complete but leave the current runs alone for next loop
1032 self.machine.complete()
1033 continue
1034 else:
1035 B2ERROR("Hit a failure or some other result we can't continue from. Failing out...")
1036 self.machine.fail()
1037 return False
1038
1039 def execute_runs(self, runs, iteration, iov):
1040 # Already set up earlier the first time, so we shouldn't do it again
1041 if not self.first_execution:
1042 self.machine.setup_algorithm()
1043 else:
1044 self.first_execution = False
1045
1046 B2INFO(f"Executing and applying {iov} to the payloads.")
1047 self.machine.execute_runs(runs=runs, iteration=iteration, apply_iov=iov)
1048 B2INFO(f"Finished execution with result code {self.machine.result.result}.")
1049
1050 def alg_success(self):
1051 return ((self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value))
1052
1053
1054class StrategyError(Exception):
1055 """
1056 Basic Exception for this type of class.
1057 """
1058
1059# @endcond
1060