Belle II Software development
strategies.py
1#!/usr/bin/env python3
2
3
10
11from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
12from caf.utils import AlgResult
13from caf.utils import B2INFO_MULTILINE
14from caf.utils import runs_overlapping_iov, runs_from_vector
15from caf.utils import iov_from_runs, split_runs_by_exp, vector_from_runs
16from caf.utils import find_gaps_in_iov_list, grouper, find_run_lists_from_boundaries
17from caf.utils import IoV, ExpRun
18from caf.state_machines import AlgorithmMachine
19
20from abc import ABC, abstractmethod
21import json
22
23
"""
Base class for Algorithm strategies. These do the actual execution of a single
algorithm on collected data. Each strategy may be quite different in terms of how fast it may be,
how database payloads are passed between executions, and whether or not final payloads have an IoV
that is independent of the actual runs used to calculate them.

Parameters:
    algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run

This base class defines the basic attributes and methods that will be automatically used by the selected AlgorithmRunner.
When defining a derived class you are free to use these attributes or to implement as much functionality as you want.

If you define your derived class with an __init__ method, then you should first call the base class
`AlgorithmStrategy.__init__()` method via super(), passing on the algorithm, e.g.

.. code-block:: python

    def __init__(self, algorithm):
        super().__init__(algorithm)

The most important method to implement is :py:meth:`AlgorithmStrategy.run` which will take an algorithm and execute it
in the required way defined by the options you have selected/attributes set.
"""

# Attribute names that must exist on the strategy for is_valid() to succeed
required_attrs = ["algorithm", "database_chain", "dependent_databases", "output_dir",
                  "output_database_dir", "input_files", "ignored_runs"]

# Subset of required_attrs whose values must also be truthy (non-empty) for is_valid() to succeed
required_true_attrs = ["algorithm", "output_dir", "output_database_dir", "input_files"]

# Granularities this strategy accepts; derived strategies narrow this to ["run"]
allowed_granularities = ["run", "all"]

# Marker string "DONE"; not referenced in this section of the file (presumably signals that all results were sent)
FINISHED_RESULTS = "DONE"

# Final state sent through send_final_state() after a successful run
COMPLETED = "COMPLETED"

# Final state sent through send_final_state() after a failed run
FAILED = "FAILED"
75
def __init__(self, algorithm):
    """
    Store the algorithm and create every strategy attribute with an empty default.

    All names listed in ``required_attrs`` are created here, so ``is_valid`` never finds one missing.
    ``output_dir``, ``output_database_dir`` and ``input_files`` start empty and must be given real values
    (e.g. via ``setup_from_dict``) before ``run``, because ``is_valid`` requires them to be truthy.

    Parameters:
        algorithm (:py:class:`caf.framework.Algorithm`): The algorithm this strategy will execute.
    """
    # The algorithm we are running
    self.algorithm = algorithm
    # Input data files handed to the algorithm machine (presumably the collector output)
    self.input_files = []
    # Output directory of the algorithm
    self.output_dir = ""
    # Output directory of the local database the payloads are committed to.
    # Listed in required_attrs and read in run(), so it must exist from construction on.
    self.output_database_dir = ""
    # Database chain passed to the algorithm machine (listed in required_attrs)
    self.database_chain = []
    # Databases this calibration depends on, passed to the algorithm machine (listed in required_attrs)
    self.dependent_databases = []
    # Runs removed from the run lists that the strategies execute
    self.ignored_runs = []
    # Results gathered during run()
    self.results = []
    # Queue used to send results and the final state back; assigned in run()
    self.queue = None
98
@abstractmethod
def run(self, iov, iteration, queue):
    """
    Execute the algorithm; every concrete strategy must provide this.

    Parameters:
        iov: IoV requested for this execution (the concrete strategies treat a falsy value as
            "use all collected runs").
        iteration (int): Current iteration number.
        queue: Object with a ``put`` method used to send results and the final state back.
    """
    ...
105
def setup_from_dict(self, params):
    """
    Copy every entry of ``params`` onto this strategy as an attribute of the same name.

    Parameters:
        params (dict): Mapping of attribute name to value; existing attributes are overwritten.
    """
    for name in params:
        setattr(self, name, params[name])
113
def is_valid(self):
    """
    Check that this strategy has everything it needs before running.

    Every name in ``required_attrs`` must exist as an attribute. Every name in
    ``required_true_attrs`` must additionally hold a truthy value. An error naming the first
    offending attribute is logged.

    Returns:
        bool: True when the setup is usable, False otherwise.
    """
    B2INFO("Checking validity of current AlgorithmStrategy setup.")
    # Existence check first (__init__ is meant to create all of these)
    missing = next((name for name in self.required_attrs if not hasattr(self, name)), None)
    if missing is not None:
        B2ERROR(f"AlgorithmStrategy attribute {missing} doesn't exist.")
        return False
    # Then reject attributes that need a real value but are empty/falsy
    empty = next((name for name in self.required_true_attrs if not getattr(self, name)), None)
    if empty is not None:
        B2ERROR(f"AlgorithmStrategy attribute {empty} returned False.")
        return False
    return True
131
def find_iov_gaps(self):
    """
    Finds and logs the current gaps between the IoVs of the strategy results. Basically these are the IoVs
    not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps
    within the same experiment are found.

    Returns:
        list[IoV]: The gaps, as returned by ``find_gaps_in_iov_list`` (falsy when there are none).
    """
    iov_gaps = find_gaps_in_iov_list(sorted(result.iov for result in self.results))
    if iov_gaps:
        gap_msg = [
            "Found gaps between IoVs of algorithm results (regardless of result).",
            "You may have requested these gaps deliberately by not passing in data containing these runs.",
            # Message text previously misspelled "payloads" as "payoads"
            "This may not be a problem, but you will not have payloads defined for these IoVs",
            "unless you edit the final database.txt yourself.",
        ]
        B2INFO_MULTILINE(gap_msg)
        for iov in iov_gaps:
            B2INFO(f"{iov} not covered by any execution of the algorithm.")
    return iov_gaps
151
def any_failed_iov(self):
    """
    Report whether any stored result ended in failure or with not enough data.

    Logs a warning, followed by one line per offending result: an error for ``c_Failure``
    and a warning for ``c_NotEnoughData``.

    Returns:
        bool: True if at least one result in ``self.results`` has a failure or not-enough-data code.
    """
    bad_codes = (AlgResult.failure.value, AlgResult.not_enough_data.value)
    failed_results = [res for res in self.results if res.result in bad_codes]
    if not failed_results:
        return False

    B2WARNING("Failed results found.")
    for res in failed_results:
        if res.result == AlgResult.failure.value:
            B2ERROR(f"c_Failure returned for {res.iov}.")
        elif res.result == AlgResult.not_enough_data.value:
            B2WARNING(f"c_NotEnoughData returned for {res.iov}.")
    return True
171
def send_result(self, result):
    """
    Put ``result`` on the strategy's queue, wrapped in a message of type ``"result"``.
    """
    message = {"type": "result", "value": result}
    self.queue.put(message)
175
def send_final_state(self, state):
    """
    Put the strategy's final ``state`` on the queue, wrapped in a message of type ``"final_state"``.
    """
    message = {"type": "final_state", "value": state}
    self.queue.put(message)
179
180
"""The fastest and simplest Algorithm strategy. Runs the algorithm only once, over all of the input
data or only over the data corresponding to the requested IoV. The payload IoV is set to the same IoV
as the one that was executed, unless an ``apply_iov`` algorithm parameter is given, in which case that
value is forwarded to the machine's ``execute_runs`` instead.

This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
a CalibrationAlgorithm C++ class directly.
"""

# Algorithm parameters this strategy understands, with the (presumed) expected type of each value
usable_params = dict(apply_iov=IoV)
192
def __init__(self, algorithm):
    """
    Create the strategy for ``algorithm`` together with the AlgorithmMachine that executes it.
    """
    super().__init__(algorithm)
    machine = AlgorithmMachine(self.algorithm)
    # State machine driving setup, execution and completion/failure of the algorithm
    self.machine = machine
200
def run(self, iov, iteration, queue):
    """
    Execute the algorithm once over all collected runs (or those overlapping ``iov``) and report back.

    The result is sent on the queue; it is not appended to ``self.results``. On 'ok'/'iterate' the
    machine is completed, the payloads are committed and COMPLETED is sent. Otherwise the machine is
    failed and FAILED is sent.

    Parameters:
        iov (IoV): If truthy, only runs overlapping this IoV are executed; otherwise all collected runs are.
        iteration (int): Current iteration number, passed to the machine.
        queue: Object with a ``put`` method used to send the result and final state back to the runner.

    Raises:
        StrategyError: If ``is_valid()`` reports an incomplete setup.
    """
    if not self.is_valid():
        raise StrategyError("This AlgorithmStrategy was not set up correctly!")
    self.queue = queue

    B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
    # Hand the machine everything it needs to prepare the algorithm
    self.machine.setup_from_dict({
        "database_chain": self.database_chain,
        "dependent_databases": self.dependent_databases,
        "output_dir": self.output_dir,
        "output_database_dir": self.output_database_dir,
        "input_files": self.input_files,
        "ignored_runs": self.ignored_runs,
    })
    B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
    self.machine.setup_algorithm(iteration=iteration)
    # From here on the algorithm logs to its own stdout
    B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")

    collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
    # A requested IoV restricts execution to the overlapping runs, which are all executed together
    runs_to_execute = runs_overlapping_iov(iov, collected) if iov else collected

    if self.ignored_runs:
        B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
        runs_to_execute.difference_update(set(self.ignored_runs))
    # Sets are unordered; execute the runs in sorted order
    runs_to_execute = sorted(runs_to_execute)

    apply_iov = self.algorithm.params["apply_iov"] if "apply_iov" in self.algorithm.params else None
    self.machine.execute_runs(runs=runs_to_execute, iteration=iteration, apply_iov=apply_iov)
    result = self.machine.result
    B2INFO(f"Finished execution with result code {result.result}.")

    # Report the result to the runner straight away
    self.send_result(result)

    if result.result in (AlgResult.ok.value, AlgResult.iterate.value):
        # Usable result: finish the machine, commit the payloads and report success
        self.machine.complete()
        self.machine.algorithm.algorithm.commit()
        self.send_final_state(self.COMPLETED)
    else:
        # Not enough data, or the algorithm failed
        self.machine.fail()
        self.send_final_state(self.FAILED)
258
259
"""
Algorithm strategy to do run-by-run calibration of collected data.
Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only
(more precisely, the first ``step_size`` runs; ``step_size`` defaults to 1).
If the algorithm returns 'not enough data' on the current run set, it won't commit the payloads, but instead adds
the next run's data and tries again.

Once an execution on a set of runs returns 'iterate' or 'ok' we move onto the next runs (if any are left)
and start the same procedure again. Committing of payloads to the outputdb only happens once we're sure that there
is enough data in the remaining runs to get a full execution. If there isn't enough data remaining, the last runs
are merged with the previous successful execution's runs and a final execution is performed on all remaining runs.

Additionally this strategy will automatically make sure that IoV gaps in your input data are covered by a payload.
This means that there shouldn't be any IoVs that don't get a new payload by the end of running an iteration.

This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
a CalibrationAlgorithm C++ class directly.
"""

# Algorithm parameters this strategy understands, with the (presumed) expected type of each value
usable_params = dict(has_experiment_settings=bool, iov_coverage=IoV, step_size=int)

# Granularities permitted for this strategy (run-level only)
allowed_granularities = ["run"]
288
def __init__(self, algorithm):
    """
    Set up the run-by-run strategy for ``algorithm``.

    Creates the AlgorithmMachine used for every execution. If no ``step_size`` parameter was
    provided, the algorithm is given a default ``step_size`` of 1 run.
    """
    super().__init__(algorithm)
    machine = AlgorithmMachine(self.algorithm)
    # State machine performing each execution of the algorithm
    self.machine = machine
    algorithm_params = self.algorithm.params
    # Without an explicit step size, runs are added one at a time
    if "step_size" not in algorithm_params:
        algorithm_params["step_size"] = 1
    # run() sets the machine up once before the first execution, so execute_over_run_list
    # must skip setup_algorithm() for that first execution only
    self.first_execution = True
300
def apply_experiment_settings(self, algorithm, experiment):
    """
    Hook for applying experiment-dependent settings before an experiment's runs are executed.

    ``run`` calls it once per experiment (with the underlying calibration algorithm object and the
    experiment number) when the ``has_experiment_settings`` algorithm parameter is true.
    This default implementation does nothing; derived classes may override it.
    """
    return None
308
def run(self, iov, iteration, queue):
    """
    Runs the algorithm machine over the collected data and fills the results.

    The collected runs are optionally restricted to those overlapping ``iov`` and reduced by any
    ``ignored_runs``. They are then split by experiment, and each experiment's runs are passed to
    ``execute_over_run_list``, so that no payload IoV crosses an experiment boundary. Afterwards the IoV
    gaps between results are written to ``<algorithm name>_iov_gaps.json``. Finally FAILED is sent if any
    result failed or had not enough data, and COMPLETED otherwise.

    Parameters:
        iov (IoV): If truthy, only runs overlapping this IoV are executed.
        iteration (int): Current iteration number, passed to the machine.
        queue: Object with a ``put`` method used to send results and the final state back to the runner.

    Raises:
        StrategyError: If ``is_valid()`` reports an incomplete setup.
    """
    if not self.is_valid():
        raise StrategyError("This AlgorithmStrategy was not set up correctly!")
    self.queue = queue
    B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
    # Now add all the necessary parameters for a strategy to run
    machine_params = {}
    machine_params["database_chain"] = self.database_chain
    machine_params["dependent_databases"] = self.dependent_databases
    machine_params["output_dir"] = self.output_dir
    machine_params["output_database_dir"] = self.output_database_dir
    machine_params["input_files"] = self.input_files
    machine_params["ignored_runs"] = self.ignored_runs
    self.machine.setup_from_dict(machine_params)
    # Start moving through machine states. This setup also serves the first execution
    # in execute_over_run_list (tracked by self.first_execution).
    self.machine.setup_algorithm(iteration=iteration)
    # After this point, the logging is in the stdout of the algorithm
    B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
    all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
    # If we were given a specific IoV to calibrate we just execute over runs in that IoV
    if iov:
        runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
    else:
        runs_to_execute = all_runs_collected[:]

    # Remove the ignored runs from our run list to execute
    if self.ignored_runs:
        B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
        # In the no-IoV branch runs_to_execute is a slice copy of the collected run sequence (not a set),
        # so calling difference_update() on it raised AttributeError. Set arithmetic works for both branches.
        runs_to_execute = set(runs_to_execute) - set(self.ignored_runs)
    # Go back to a sorted list so the runs are executed in order
    runs_to_execute = sorted(runs_to_execute)

    # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
    # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
    # separately and prevent IoVs from crossing the boundary.
    runs_to_execute = split_runs_by_exp(runs_to_execute)

    # Now iterate through the experiments, executing runs in blocks of 'step_size'. We DO NOT allow a payload IoV to
    # extend over multiple experiments, only multiple runs
    iov_coverage = None
    if "iov_coverage" in self.algorithm.params:
        B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
        iov_coverage = self.algorithm.params["iov_coverage"]

    number_of_experiments = len(runs_to_execute)
    # Iterate over experiment run lists
    for i_exp, run_list in enumerate(runs_to_execute, start=1):

        # Apply experiment-dependent settings.
        if "has_experiment_settings" in self.algorithm.params:
            if self.algorithm.params["has_experiment_settings"]:
                self.apply_experiment_settings(self.machine.algorithm.algorithm, run_list[0].exp)

        # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
        # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
        # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
        # This is only true for the lowest and highest experiments in our input data.
        # If we have multiple experiments the IoV spans the whole experiment (run 0 up to run -1, the
        # open upper end) to avoid gaps at the experiment boundaries.
        lowest_exprun = ExpRun(run_list[0].exp, 0)
        highest_exprun = ExpRun(run_list[-1].exp, -1)

        if i_exp == 1:
            lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low) if iov_coverage else run_list[0]
        if i_exp == number_of_experiments:
            highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high) if iov_coverage else run_list[-1]

        self.execute_over_run_list(iteration, run_list, lowest_exprun, highest_exprun)

    # Print any knowable gaps between result IoVs; if any are found there is a problem.
    gaps = self.find_iov_gaps()
    # Dump them to a file for logging
    with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
        json.dump(gaps, f)

    # If any results weren't successes we fail
    if self.any_failed_iov():
        self.send_final_state(self.FAILED)
    else:
        self.send_final_state(self.COMPLETED)
392
def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun):
    """
    Execute the algorithm over one experiment's runs, adding ``step_size`` runs at a time.

    Chunks of runs are accumulated in ``current_runs`` and executed.

    - 'ok'/'iterate': the payloads of the previous success are committed, and this execution's payloads
      are held back until the next success. When no runs remain, they are committed immediately.
    - 'not enough data': the next chunk is added and the execution is retried. If the data runs out after
      an earlier success, the leftover runs are merged with the previous successful runs, and one final
      execution (in the ``for``/``else`` clause) covers them up to ``highest_exprun``.
    - 'failure': processing of this run list stops.

    Committed results are appended to ``self.results`` and sent on the queue.

    Parameters:
        iteration (int): Current iteration number, passed to ``execute_runs``.
        run_list (list[ExpRun]): Sorted runs of a single experiment (as prepared by ``run``).
        lowest_exprun (ExpRun): Start of the first payload IoV of this experiment.
        highest_exprun (ExpRun): End of the last payload IoV of this experiment.
    """
    # The runs (data) we have left to execute from this run list
    remaining_runs = run_list[:]
    # The previous execution's runs
    previous_runs = []
    # The current runs we are executing
    current_runs = []
    # The last successful payload and result
    last_successful_payloads = None
    last_successful_result = None

    # Iterate over ExpRuns within an experiment in chunks of 'step_size'
    for expruns in grouper(self.algorithm.params["step_size"], run_list):
        # Already set up earlier the first time, so we shouldn't do it again
        # (run() calls setup_algorithm(iteration=iteration) before the first execution)
        if not self.first_execution:
            self.machine.setup_algorithm()
        else:
            self.first_execution = False

        # Add on the next step of runs
        current_runs.extend(expruns)
        # Remove them from our remaining runs
        remaining_runs = [run for run in remaining_runs if run not in current_runs]

        # Is this the first payload of the experiment
        if not last_successful_result:
            B2INFO("Detected that this will be the first payload of this experiment.")
            # If this is the first payload but we have other data, we need the IoV to cover from the
            # lowest IoV extent requested up to the ExpRun right before the next run in the remaining runs list.
            if remaining_runs:
                apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1)
            # If this is the first payload but there isn't more data, we set the IoV to cover the full range
            else:
                B2INFO("Detected that this will be the only payload of the experiment.")
                apply_iov = IoV(*lowest_exprun, *highest_exprun)
        # If there were previous successes
        else:
            # Last chunk: the IoV runs from the first current run up to the experiment's upper bound
            if not remaining_runs:
                B2INFO("Detected that there are no more runs to execute in this experiment after this next execution.")
                apply_iov = IoV(*current_runs[0], *highest_exprun)
            # Otherwise, it's just a normal IoV in the middle.
            else:
                B2INFO("Detected that there are more runs to execute in this experiment after this next execution.")
                apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1)

        B2INFO(f"Executing and applying {apply_iov} to the payloads.")
        self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
        B2INFO(f"Finished execution with result code {self.machine.result.result}.")

        # Does this count as a successful execution?
        if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
            self.machine.complete()
            # If we've succeeded but we have a previous success we can commit the previous payloads
            # since we have new ones ready
            if last_successful_payloads and last_successful_result:
                B2INFO("Saving this execution's payloads to be committed later.")
                # Save the payloads and result
                new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                new_successful_result = self.machine.result
                B2INFO("We just succeeded in execution of the Algorithm."
                       f" Will now commit payloads from the previous success for {last_successful_result.iov}.")
                self.machine.algorithm.algorithm.commit(last_successful_payloads)
                self.results.append(last_successful_result)
                self.send_result(last_successful_result)
                # If there are remaining runs we need to have the current payloads ready to commit after the next execution
                if remaining_runs:
                    last_successful_payloads = new_successful_payloads
                    last_successful_result = new_successful_result
                # If there's not more runs to process we should also commit the new ones
                else:
                    B2INFO("We have no more runs to process. "
                           f"Will now commit the most recent payloads for {new_successful_result.iov}.")
                    self.machine.algorithm.algorithm.commit(new_successful_payloads)
                    self.results.append(new_successful_result)
                    self.send_result(new_successful_result)
                    break
            # if there's no previous success this must be the first run executed
            else:
                # Need to save payloads for later if we have a success but runs remain
                if remaining_runs:
                    B2INFO(f"Saving the most recent payloads for {self.machine.result.iov} to be committed later.")
                    # Save the payloads and result
                    last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                    last_successful_result = self.machine.result
                # Need to commit and exit if we have a success and no remaining data
                else:
                    B2INFO("We just succeeded in execution of the Algorithm."
                           " No runs left to be processed, so we are committing results of this execution.")
                    self.machine.algorithm.algorithm.commit()
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                    break

            # Remember the runs behind last_successful_result (needed for a possible final merge)
            # and start the next chunk from scratch
            previous_runs = current_runs[:]
            current_runs = []
        # If it wasn't successful, was it due to lack of data in the runs?
        elif (self.machine.result.result == AlgResult.not_enough_data.value):
            B2INFO(f"There wasn't enough data in {self.machine.result.iov}.")
            if remaining_runs:
                # current_runs is kept, so the next chunk is appended to it and the execution is retried.
                # NOTE(review): unlike the other non-success branches, self.machine.fail() is not called here
                # before the next iteration calls setup_algorithm(); confirm AlgorithmMachine allows that.
                B2INFO("Some runs remain to be processed. "
                       f"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.")
            elif not remaining_runs and not last_successful_result:
                B2ERROR("There aren't any more runs remaining to merge with, and we never had a previous success."
                        " There wasn't enough data in the full input data requested.")
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                self.machine.fail()
                break
            elif not remaining_runs and last_successful_result:
                # Merge the dangling runs onto the previous successful runs; the loop then ends without
                # 'break', so the for/else clause below re-executes over the merged runs.
                B2INFO("There aren't any more runs remaining to merge with. But we had a previous success"
                       ", so we'll merge with the previous IoV.")
                final_runs = current_runs[:]
                current_runs = previous_runs
                current_runs.extend(final_runs)
                self.machine.fail()
        elif self.machine.result.result == AlgResult.failure.value:
            B2ERROR(f"{self.algorithm.name} returned failure exit code.")
            self.results.append(self.machine.result)
            self.send_result(self.machine.result)
            self.machine.fail()
            break
        # NOTE(review): any other result code matches no branch above; current_runs then keeps growing
        # into the next chunk. Confirm no other codes can be returned here.
    else:
        # Runs only when the loop finished without 'break'. current_runs is normally non-empty here only
        # after the 'not enough data' merge above, where last_successful_result is set.
        # Check if we need to run a final execution on the previous execution + dangling set of runs
        if current_runs:
            self.machine.setup_algorithm()
            # The merged payload starts where the previous (uncommitted) success started
            apply_iov = IoV(last_successful_result.iov.exp_low,
                            last_successful_result.iov.run_low,
                            *highest_exprun)
            B2INFO(f"Executing on {apply_iov}.")
            self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
            B2INFO(f"Finished execution with result code {self.machine.result.result}.")
            if (self.machine.result.result == AlgResult.ok.value) or (
                    self.machine.result.result == AlgResult.iterate.value):
                self.machine.complete()
                # Commit all the payloads and send out the results
                self.machine.algorithm.algorithm.commit()
                # Save the result
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
            else:
                # Save the result
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                # But failed
                self.machine.fail()
539
540
"""
Algorithm strategy to do run-by-run calibration of collected data.
Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.

This strategy differs from `SequentialRunByRun` in that it *will not merge run data* if the algorithm returns
'not enough data' on the current run.

Once an execution on a run returns *any* result 'iterate', 'ok', 'not_enough_data', or 'failure', we move onto the
next run (if any are left).
Payloads are only committed to the outputdb for 'iterate' or 'ok' return codes.

.. important:: Unlike most other strategies, this one won't immediately fail and return if a run returns a 'failure' exit
    code.
    The failure will prevent iteration/successful completion of the CAF though.

.. warning:: Since this strategy doesn't try to merge data from runs, if *any* run in your input data doesn't contain
    enough data to complete the algorithm successfully, you won't be able to get a successful calibration.
    The CAF then won't allow you to iterate this calibration, or pass the constants onward to another calibration.
    However, you will still have the database created that covers all the successful runs.

This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
a CalibrationAlgorithm C++ class directly.
"""

# Granularities permitted for this strategy (run-level only)
allowed_granularities = ["run"]

# This strategy accepts no extra algorithm parameters
usable_params = dict()
570
def __init__(self, algorithm):
    """
    Create the strategy for ``algorithm`` together with the AlgorithmMachine that executes each run.
    """
    super().__init__(algorithm)
    machine = AlgorithmMachine(self.algorithm)
    # State machine driving setup, execution and completion/failure of every single-run execution
    self.machine = machine
578
def run(self, iov, iteration, queue):
    """
    Execute the algorithm separately on every collected run and report each result.

    Each run is executed on its own, with a payload IoV computed from that single run. The runs are
    those overlapping ``iov`` when it is truthy, minus any ``ignored_runs``. Payloads are committed
    only for 'ok'/'iterate'. 'ok', 'iterate', 'not enough data' and 'failure' results are all appended
    to ``self.results`` and sent on the queue, and processing continues after a failing run. Finally
    the IoV gaps are written to ``<algorithm name>_iov_gaps.json``, and COMPLETED is sent regardless of
    the individual run results.

    Parameters:
        iov (IoV): If truthy, only runs overlapping this IoV are executed.
        iteration (int): Current iteration number, passed to the machine.
        queue: Object with a ``put`` method used to send results and the final state back to the runner.

    Raises:
        StrategyError: If ``is_valid()`` reports an incomplete setup.
    """
    if not self.is_valid():
        raise StrategyError("This AlgorithmStrategy was not set up correctly!")
    self.queue = queue

    B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
    # Hand the machine everything it needs to prepare the algorithm
    self.machine.setup_from_dict({
        "database_chain": self.database_chain,
        "dependent_databases": self.dependent_databases,
        "output_dir": self.output_dir,
        "output_database_dir": self.output_database_dir,
        "input_files": self.input_files,
        "ignored_runs": self.ignored_runs,
    })
    B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
    # This setup serves the first execution; later executions set the machine up again in the loop
    self.machine.setup_algorithm(iteration=iteration)
    # From here on the algorithm logs to its own stdout
    B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")

    collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
    runs_to_execute = runs_overlapping_iov(iov, collected) if iov else collected

    if self.ignored_runs:
        B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
        runs_to_execute.difference_update(set(self.ignored_runs))
    # Execute in run order (sets are unordered)
    runs_to_execute = sorted(runs_to_execute)

    for position, exprun in enumerate(runs_to_execute):
        if position:
            self.machine.setup_algorithm()
        run_iov = iov_from_runs([exprun])
        B2INFO(f"Executing on IoV = {run_iov}.")
        self.machine.execute_runs(runs=[exprun], iteration=iteration, apply_iov=run_iov)
        code = self.machine.result.result
        B2INFO(f"Finished execution with result code {code}.")

        if code in (AlgResult.ok.value, AlgResult.iterate.value):
            # Successful: commit this run's payloads and report the result
            B2INFO(f"Committing payloads for {run_iov}.")
            self.machine.algorithm.algorithm.commit()
            self.results.append(self.machine.result)
            self.send_result(self.machine.result)
            self.machine.complete()
        elif code == AlgResult.not_enough_data.value:
            # No merging with neighbouring runs: record the result and move on
            B2INFO(f"There wasn't enough data in the IoV {run_iov}.")
            self.results.append(self.machine.result)
            self.send_result(self.machine.result)
            self.machine.fail()
        elif code == AlgResult.failure.value:
            # Record the failure but keep processing the remaining runs
            B2ERROR(f"Failure exit code in the IoV {run_iov}.")
            self.results.append(self.machine.result)
            self.send_result(self.machine.result)
            self.machine.fail()

    # Log any gaps between the result IoVs and dump them to a file
    gaps = self.find_iov_gaps()
    with open(f"{self.algorithm.name}_iov_gaps.json", "w") as gaps_file:
        json.dump(gaps, gaps_file)

    self.send_final_state(self.COMPLETED)
656
657
"""
Algorithm strategy to first calculate run boundaries where execution should be attempted.
Runs the algorithm over the input data contained within the requested IoV of the boundaries,
starting with the first boundary data only.
If the algorithm returns 'not enough data' on the current boundary IoV, it won't commit the payloads,
but instead adds the next boundary's data and tries again. Basically the same logic as `SequentialRunByRun`
but using run boundaries instead of runs directly.
Notice that boundaries cannot span multiple experiments.

By default the payload boundaries are obtained directly from the algorithm, which then needs to
implement the function ``isBoundaryRequired``. If the desired boundaries are already known it
is possible to pass them directly by setting the algorithm parameter ``payload_boundaries``, which avoids
the need to define the ``isBoundaryRequired`` function.

``payload_boundaries`` is a list ``[(exp1, run1), (exp2, run2), ...]``. A boundary at the beginning of each
experiment will be added if not already present. An empty list will thus produce a single payload for each
experiment. A ``payload_boundaries`` set to ``None`` is equivalent to not passing it and restores the default
behaviour where the boundaries are computed in the ``isBoundaryRequired`` function of the algorithm.
"""

# Algorithm parameters this strategy understands. payload_boundaries has the form [(exp1, run1), (exp2, run2), ...].
# NOTE(review): unlike the other entries, payload_boundaries maps to an empty list rather than a type;
# confirm how usable_params values are consumed by the framework.
usable_params = dict(iov_coverage=IoV, payload_boundaries=[])

# Granularities permitted for this strategy (run-level only)
allowed_granularities = ["run"]
687
def __init__(self, algorithm):
    """
    Create the boundary-based strategy for ``algorithm`` and the AlgorithmMachine that executes it.
    """
    super().__init__(algorithm)
    machine = AlgorithmMachine(self.algorithm)
    # State machine performing each execution of the algorithm
    self.machine = machine
    # Tracks whether the next execution is the first one (starts out True)
    self.first_execution = True
697
698 def run(self, iov, iteration, queue):
699 """
700 Runs the algorithm machine over the collected data and fills the results.
701 """
702 if not self.is_valid():
703 raise StrategyError("This AlgorithmStrategy was not set up correctly!")
704 self.queue = queue
705 B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
706 # Now add all the necessary parameters for a strategy to run
707 machine_params = {}
708 machine_params["database_chain"] = self.database_chain
709 machine_params["dependent_databases"] = self.dependent_databases
710 machine_params["output_dir"] = self.output_dir
711 machine_params["output_database_dir"] = self.output_database_dir
712 machine_params["input_files"] = self.input_files
713 machine_params["ignored_runs"] = self.ignored_runs
714 self.machine.setup_from_dict(machine_params)
715 # Start moving through machine states
716 self.machine.setup_algorithm(iteration=iteration)
717 # After this point, the logging is in the stdout of the algorithm
718 B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
719 runs_to_execute = []
720 all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
721 # If we were given a specific IoV to calibrate we just execute over runs in that IoV
722 if iov:
723 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
724 else:
725 runs_to_execute = all_runs_collected[:]
726
727 # Remove the ignored runs from our run list to execute
728 if self.ignored_runs:
729 B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
730 runs_to_execute.difference_update(set(self.ignored_runs))
731 # Sets aren't ordered so lets go back to lists and sort
732 runs_to_execute = sorted(runs_to_execute)
733
734 # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
735 # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
736 # separately and prevent IoVs from crossing the boundary.
737 runs_to_execute = split_runs_by_exp(runs_to_execute)
738
739 # Now iterate through the experiments. We DO NOT allow a payload IoV to
740 # extend over multiple experiments, only multiple runs
741 iov_coverage = None
742 if "iov_coverage" in self.algorithm.params:
743 B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
744 iov_coverage = self.algorithm.params["iov_coverage"]
745
746 payload_boundaries = None
747 if "payload_boundaries" in self.algorithm.params:
748 B2INFO(f"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.")
749 payload_boundaries = self.algorithm.params["payload_boundaries"]
750
751 number_of_experiments = len(runs_to_execute)
752 B2INFO(f"We are iterating over {number_of_experiments} experiments.")
753
754 # Iterate over experiment run lists
755 for i_exp, run_list in enumerate(runs_to_execute, start=1):
756 B2DEBUG(26, f"Run List for this experiment={run_list}")
757 current_experiment = run_list[0].exp
758 B2INFO(f"Executing over data from experiment {current_experiment}")
759 # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
760 # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
761 # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
762 # This is only true for the lowest and highest experiments in our input data.
763 if i_exp == 1:
764 if iov_coverage:
765 lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
766 else:
767 lowest_exprun = run_list[0]
768 # We are calibrating across multiple experiments so we shouldn't start from the middle but from the 0th run
769 else:
770 lowest_exprun = ExpRun(current_experiment, 0)
771
772 # Override the normal value for the highest ExpRun (from data) if iov_coverage was set
773 if iov_coverage and i_exp == number_of_experiments:
774 highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
775 # If we have more experiments to execute then we will be setting the final payload IoV in this experiment
776 # to be unbounded
777 elif i_exp < number_of_experiments:
778 highest_exprun = ExpRun(current_experiment, -1)
779 # Otherwise just get the values from data
780 else:
781 highest_exprun = run_list[-1]
782
783 # Find the boundaries for this experiment's runs
784 vec_run_list = vector_from_runs(run_list)
785 if payload_boundaries is None:
786 # Find the boundaries using the findPayloadBoundaries implemented in the algorithm
787 B2INFO("Attempting to find payload boundaries.")
788 vec_boundaries = self.algorithm.algorithm.findPayloadBoundaries(vec_run_list)
789 # If this vector is empty then that's bad. Maybe the isBoundaryRequired function
790 # wasn't implemented? Either way we should stop.
791 if vec_boundaries.empty():
792 B2ERROR("No boundaries found but we are in a strategy that requires them! Failing...")
793 # Tell the Runner that we have failed
794 self.send_final_state(self.FAILED)
795 break
796 vec_boundaries = runs_from_vector(vec_boundaries)
797 else:
798 # Using boundaries set by user
799 B2INFO(f"Using as payload boundaries {payload_boundaries}.")
800 vec_boundaries = [ExpRun(exp, run) for exp, run in payload_boundaries]
801 # No need to check that vec_boundaries is not empty. In case it is we will anyway add
802 # a boundary at the first run of each experiment.
803 # Remove any boundaries not from the current experiment (only likely if they were set manually)
804 # We sort just to make everything easier later and just in case something mad happened.
805 run_boundaries = sorted([er for er in vec_boundaries if er.exp == current_experiment])
806 # In this strategy we consider separately each experiment. We then now check that the
807 # boundary (exp, 0) is present and if not we add it. It is indeed possible to miss it
808 # if the boundaries were given manually
809 first_exprun = ExpRun(current_experiment, 0)
810 if first_exprun not in run_boundaries:
811 B2WARNING(f"No boundary found at ({current_experiment}, 0), adding it.")
812 run_boundaries[0:0] = [first_exprun]
813 B2INFO(f"Found {len(run_boundaries)} boundaries for this experiment. "
814 "Checking if we have some data for all boundary IoVs...")
815 # First figure out the run lists to use for each execution (potentially different from the applied IoVs)
816 # We use the boundaries and the run_list
817 boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list)
818 B2DEBUG(26, f"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}")
819 # If there were any boundary IoVs with no run data, just remove them. Otherwise they will execute over all data.
820 boundary_iovs_to_run_lists = {key: value for key, value in boundary_iovs_to_run_lists.items() if value}
821 B2DEBUG(26, f"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}")
822 # If any were removed then we might have gaps between the boundary IoVs. Fix those now by merging IoVs.
823 new_boundary_iovs_to_run_lists = {}
824 previous_boundary_iov = None
825 previous_boundary_run_list = None
826 for boundary_iov, run_list in boundary_iovs_to_run_lists.items():
827 if not previous_boundary_iov:
828 previous_boundary_iov = boundary_iov
829 previous_boundary_run_list = run_list
830 continue
831 # We are definitely dealiing with IoVs from one experiment so we can make assumptions here
832 if previous_boundary_iov.run_high != (boundary_iov.run_low-1):
833 B2WARNING("Gap in boundary IoVs found before execution! "
834 "Will correct it by extending the previous boundary up to the next one.")
835 B2INFO(f"Original boundary IoV={previous_boundary_iov}")
836 previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low,
837 previous_boundary_iov.exp_high, boundary_iov.run_low-1)
838 B2INFO(f"New boundary IoV={previous_boundary_iov}")
839 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
840 previous_boundary_iov = boundary_iov
841 previous_boundary_run_list = run_list
842 else:
843 new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
844 boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists
845 B2DEBUG(26, f"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}")
846 # Actually execute now that we have an IoV list to apply
847 success = self.execute_over_boundaries(boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
848 if not success:
849 # Tell the Runner that we have failed
850 self.send_final_state(self.FAILED)
851 break
852 # Only executes if we didn't fail any experiment execution
853 else:
854 # Print any knowable gaps between result IoVs, if any are found there is a problem, but not necessarily too bad.
855 gaps = self.find_iov_gaps()
856 if gaps:
857 B2WARNING("There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.")
858 # Dump them to a file for logging
859 with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
860 json.dump(gaps, f)
861
862 # If any results weren't successes we fail
863 if self.any_failed_iov():
864 self.send_final_state(self.FAILED)
865 else:
866 self.send_final_state(self.COMPLETED)
867
def execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration):
    """
    Take the previously found boundaries and the run lists they correspond to and actually perform the
    Algorithm execution. This is assumed to be for a single experiment.

    Boundary IoVs are consumed in ascending order. After an execution that returns 'not enough data',
    the runs of the next boundary are appended and the algorithm is executed again. A successful
    execution is held back (not committed) until the next execution also succeeds or the boundaries
    run out. The hold-back exists because, if the final boundary still lacks data, its runs are merged
    back into the previous success and re-executed.

    Parameters:
        boundary_iovs_to_run_lists (dict): Maps each boundary IoV to the list of runs to execute over
            for that boundary. The dict and its lists are not modified by this method.
        lowest_exprun (ExpRun): Start of the IoV applied to the first payload of this experiment.
        highest_exprun (ExpRun): End of the IoV applied to the last payload of this experiment.
        iteration: Current calibration iteration, forwarded to each execution.

    Returns:
        bool: True if all boundaries were processed and the final payloads were committed.
        False if there were no boundaries, there was never enough data, or an execution
        produced a result we cannot continue from.
    """
    # Boundary IoVs still waiting to be executed, in ascending order
    remaining_boundary_iovs = sorted(boundary_iovs_to_run_lists)

    # The current runs we are executing. This is always our own list: extending a list taken
    # directly from boundary_iovs_to_run_lists would silently mutate the caller's data.
    current_runs = []
    # The IoV of the current boundary(s)
    current_boundary_iov = None
    # The current execution's applied IoV, may be different to the boundary IoV
    current_iov = None

    # The last successful payload list and result. We hold on to them so that we can commit or discard later.
    last_successful_payloads = None
    last_successful_result = None
    # The previous successful execution's runs
    last_successful_runs = []
    # The previous successful execution's applied IoV
    last_successful_iov = None

    while True:
        # No held-back success yet
        if not last_successful_result:
            if not current_runs:
                # Did we actually have any boundaries?
                if not remaining_boundary_iovs:
                    B2ERROR("No boundaries found for the current experiment's run list. Failing the strategy.")
                    return False

                B2INFO("This appears to be the first attempted execution of the experiment.")
                # Attempt to execute on the first boundary
                current_boundary_iov = remaining_boundary_iovs.pop(0)
                current_runs = list(boundary_iovs_to_run_lists[current_boundary_iov])
                # Only one boundary? Then this payload must also reach the highest exprun
                if not remaining_boundary_iovs:
                    current_iov = IoV(*lowest_exprun, *highest_exprun)
                else:
                    current_iov = IoV(*lowest_exprun, current_boundary_iov.exp_high, current_boundary_iov.run_high)
            # The first execution(s) returned not enough data
            else:
                if not remaining_boundary_iovs:
                    B2ERROR("Not enough data found for the current experiment's run list. Failing the strategy.")
                    return False

                B2INFO("There wasn't enough data previously. Merging with the runs from the next boundary.")
                next_boundary_iov = remaining_boundary_iovs.pop(0)
                current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
                                           next_boundary_iov.exp_high, next_boundary_iov.run_high)
                current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
                # At the last boundary? Need to apply the highest exprun
                if not remaining_boundary_iovs:
                    current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
                else:
                    current_iov = IoV(current_iov.exp_low, current_iov.run_low,
                                      current_boundary_iov.exp_high, current_boundary_iov.run_high)

            self.execute_runs(current_runs, iteration, current_iov)

            if self.alg_success():
                B2INFO("Found a success. Will save the payloads for later.")
                # Hold the success back; it is committed only once we know it will not be merged
                last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                last_successful_result = self.machine.result
                last_successful_runs = list(current_runs)
                last_successful_iov = current_iov
                # Reset values for next loop
                current_runs = []
                current_boundary_iov = None
                current_iov = None
                self.machine.complete()
                continue
            elif self.machine.result.result == AlgResult.not_enough_data.value:
                B2INFO("Not Enough Data result.")
                # Keep current_runs so the next loop merges more runs into them
                self.machine.complete()
                continue
            else:
                B2ERROR("Hit a failure or some kind of result we can't continue from. Failing out...")
                self.machine.fail()
                return False
        # A previous success is being held back
        else:
            # Previous loop was a success
            if not current_runs:
                if not remaining_boundary_iovs:
                    # Out of data, can now commit
                    B2INFO("Finished this experiment's boundaries. "
                           f"Committing remaining payloads from {last_successful_result.iov}")
                    self.machine.algorithm.algorithm.commit(last_successful_payloads)
                    self.results.append(last_successful_result)
                    self.send_result(last_successful_result)
                    return True

                # Remaining boundaries exist so we try to execute
                current_boundary_iov = remaining_boundary_iovs.pop(0)
                current_runs = list(boundary_iovs_to_run_lists[current_boundary_iov])
                # Last boundary? Need to apply the highest exprun
                if not remaining_boundary_iovs:
                    current_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, *highest_exprun)
                else:
                    current_iov = current_boundary_iov
            # Returned not enough data from last execution
            else:
                if not remaining_boundary_iovs:
                    B2INFO("We have no remaining runs to increase the amount of data. "
                           "Instead we will merge with the previous successful runs.")
                    # Merge with the held-back success's runs and IoV
                    current_runs = last_successful_runs + current_runs
                    current_iov = IoV(last_successful_iov.exp_low, last_successful_iov.run_low,
                                      current_iov.exp_high, current_iov.run_high)
                    # Drop the held-back success: its runs are now part of this execution
                    last_successful_payloads = None
                    last_successful_result = None
                    last_successful_runs = []
                    last_successful_iov = None
                else:
                    B2INFO("Since there wasn't enough data previously, we will merge with the runs from the next boundary.")
                    next_boundary_iov = remaining_boundary_iovs.pop(0)
                    current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
                                               next_boundary_iov.exp_high, next_boundary_iov.run_high)
                    # Extend previous execution's runs with the next set
                    current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
                    # At the last boundary? Need to apply the highest exprun
                    if not remaining_boundary_iovs:
                        current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
                    else:
                        current_iov = IoV(current_iov.exp_low, current_iov.run_low,
                                          current_boundary_iov.exp_high, current_boundary_iov.run_high)

            self.execute_runs(current_runs, iteration, current_iov)

            if self.alg_success():
                B2INFO("Found a success.")
                # The new success means the held-back one will not be merged any more: commit it
                if last_successful_result:
                    B2INFO("Can now commit the previous success.")
                    self.machine.algorithm.algorithm.commit(last_successful_payloads)
                    self.results.append(last_successful_result)
                    self.send_result(last_successful_result)
                # Replace last success
                last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                last_successful_result = self.machine.result
                last_successful_runs = list(current_runs)
                last_successful_iov = current_iov
                # Reset values for next loop
                current_runs = []
                current_boundary_iov = None
                current_iov = None
                self.machine.complete()
                continue
            elif self.machine.result.result == AlgResult.not_enough_data.value:
                B2INFO("Not Enough Data result.")
                # Keep current_runs so the next loop merges more runs into them
                self.machine.complete()
                continue
            else:
                B2ERROR("Hit a failure or some other result we can't continue from. Failing out...")
                self.machine.fail()
                return False
1044
def execute_runs(self, runs, iteration, iov):
    """
    Execute the algorithm once over ``runs`` and apply ``iov`` to the resulting payloads.

    The first call reuses the setup already performed in ``run()``. Every later call first re-runs
    ``self.machine.setup_algorithm`` so the machine is ready for another execution.

    Parameters:
        runs (list): The runs to execute over.
        iteration: Current calibration iteration, forwarded to the machine.
        iov (IoV): The IoV to apply to the payloads produced by this execution.
    """
    if self.first_execution:
        # Already set up earlier the first time, so we shouldn't do it again
        self.first_execution = False
    else:
        # Re-setup the same way run() does initially (setup_algorithm(iteration=iteration)),
        # so every execution's setup receives the current iteration, not just the first one.
        self.machine.setup_algorithm(iteration=iteration)

    B2INFO(f"Executing and applying {iov} to the payloads.")
    self.machine.execute_runs(runs=runs, iteration=iteration, apply_iov=iov)
    B2INFO(f"Finished execution with result code {self.machine.result.result}.")
1056
def alg_success(self):
    """
    Report whether the most recent execution counts as a success.

    Returns:
        bool: True when the machine's latest result code is ``AlgResult.ok`` or ``AlgResult.iterate``.
    """
    accepted_codes = (AlgResult.ok.value, AlgResult.iterate.value)
    return self.machine.result.result in accepted_codes
1060
1061
class StrategyError(Exception):
    """Basic exception type for the algorithm strategy classes."""
list required_true_attrs
Attributes that must have a value that returns True when tested by :py:meth:is_valid.
Definition strategies.py:58
list results
The list of results objects which will be sent out before the end.
Definition strategies.py:95
list input_files
Collector output files, will contain all files returned by the output patterns.
Definition strategies.py:82
list dependent_databases
CAF created local databases from previous calibrations that this calibration/algorithm depends on.
Definition strategies.py:90
list database_chain
User defined database chain i.e.
Definition strategies.py:88
run(self, iov, iteration, queue)
str COMPLETED
Completed state.
Definition strategies.py:71
list required_attrs
Required attributes that must exist before the strategy can run properly.
Definition strategies.py:48
algorithm
Algorithm() class that we're running.
Definition strategies.py:80
__init__(self, algorithm)
Definition strategies.py:76
str output_database_dir
The output database directory for the localdb that the algorithm will commit to.
Definition strategies.py:86
list ignored_runs
Runs that will not be included in ANY execution of the algorithm.
Definition strategies.py:93
str output_dir
The algorithm output directory which is mostly used to store the stdout file.
Definition strategies.py:84
queue
The multiprocessing Queue we use to pass back results one at a time.
Definition strategies.py:97
str FAILED
Failed state.
Definition strategies.py:74
execute_runs(self, runs, iteration, iov)
run(self, iov, iteration, queue)
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
bool first_execution
boolean storing whether this is the first time the algorithm is executed
execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun)
run(self, iov, iteration, queue)
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
bool first_execution
boolean storing whether this is the first time the algorithm is executed
apply_experiment_settings(self, algorithm, experiment)
run(self, iov, iteration, queue)
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
__init__(self, algorithm)
run(self, iov, iteration, queue)
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
__init__(self, algorithm)