Belle II Software  release-05-01-25
strategies.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
5 from caf.utils import AlgResult
6 from caf.utils import B2INFO_MULTILINE
7 from caf.utils import runs_overlapping_iov, runs_from_vector
8 from caf.utils import iov_from_runs, split_runs_by_exp, vector_from_runs
9 from caf.utils import find_gaps_in_iov_list, grouper, find_run_lists_from_boundaries
10 from caf.utils import IoV, ExpRun
11 from caf.state_machines import AlgorithmMachine
12 
13 from abc import ABC, abstractmethod
14 import json
15 
16 
17 class AlgorithmStrategy(ABC):
18  """
19  Base class for Algorithm strategies. These do the actual execution of a single
20  algorithm on collected data. Each strategy may be quite different in terms of how fast it may be,
21  how database payloads are passed between executions, and whether or not final payloads have an IoV
22  that is independent to the actual runs used to calculates them.
23 
24  Parameters:
25  algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run
26 
27  This base class defines the basic attributes and methods that will be automatically used by the selected AlgorithmRunner.
28  When defining a derived class you are free to use these attributes or to implement as much functionality as you want.
29 
30  If you define your derived class with an __init__ method, then you should first call the base class
31  `AlgorithmStrategy.__init__()` method via super() e.g.
32 
33  >>> def __init__(self):
34  >>> super().__init__()
35 
36  The most important method to implement is :py:meth:`AlgorithmStrategy.run` which will take an algorithm and execute it
37  in the required way defined by the options you have selected/attributes set.
38  """
39 
41  required_attrs = ["algorithm",
42  "database_chain",
43  "dependent_databases",
44  "output_dir",
45  "output_database_dir",
46  "input_files",
47  "ignored_runs"
48  ]
49 
50 
51  required_true_attrs = ["algorithm",
52  "output_dir",
53  "output_database_dir",
54  "input_files"
55  ]
56 
57 
58  allowed_granularities = ["run", "all"]
59 
60 
61  FINISHED_RESULTS = "DONE"
62 
63 
64  COMPLETED = "COMPLETED"
65 
66 
67  FAILED = "FAILED"
68 
69  def __init__(self, algorithm):
70  """
71  """
72 
73  self.algorithm = algorithm
74 
75  self.input_files = []
76 
77  self.output_dir = ""
78 
80 
81  self.database_chain = []
82 
84 
86  self.ignored_runs = []
87 
88  self.results = []
89 
90  self.queue = None
91 
92  @abstractmethod
93  def run(self, iov, iteration, queue):
94  """
95  Abstract method that needs to be implemented. It will be called to actually execute the
96  algorithm.
97  """
98 
99  def setup_from_dict(self, params):
100  """
101  Parameters:
102  params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name.
103  """
104  for attribute_name, value in params.items():
105  setattr(self, attribute_name, value)
106 
107  def is_valid(self):
108  """
109  Returns:
110  bool: Whether or not this strategy has been set up correctly with all its necessary attributes.
111  """
112  B2INFO("Checking validity of current AlgorithmStrategy setup.")
113  # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
114  for attribute_name in self.required_attrs:
115  if not hasattr(self, attribute_name):
116  B2ERROR(f"AlgorithmStrategy attribute {attribute_name} doesn't exist.")
117  return False
118  # Check if any attributes that need actual values haven't been set or were empty
119  for attribute_name in self.required_true_attrs:
120  if not getattr(self, attribute_name):
121  B2ERROR(f"AlgorithmStrategy attribute {attribute_name} returned False.")
122  return False
123  return True
124 
125  def find_iov_gaps(self):
126  """
127  Finds and prints the current gaps between the IoVs of the strategy results. Basically these are the IoVs
128  not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps
129  within the same experiment are found.
130 
131  Returns:
132  iov_gaps(list[IoV])
133  """
134  iov_gaps = find_gaps_in_iov_list(sorted([result.iov for result in self.results]))
135  if iov_gaps:
136  gap_msg = ["Found gaps between IoVs of algorithm results (regardless of result)."]
137  gap_msg.append("You may have requested these gaps deliberately by not passing in data containing these runs.")
138  gap_msg.append("This may not be a problem, but you will not have payoads defined for these IoVs")
139  gap_msg.append("unless you edit the final database.txt yourself.")
140  B2INFO_MULTILINE(gap_msg)
141  for iov in iov_gaps:
142  B2INFO(f"{iov} not covered by any execution of the algorithm.")
143  return iov_gaps
144 
145  def any_failed_iov(self):
146  """
147  Returns:
148  bool: If any result in the current results list has a failed algorithm code we return True
149  """
150  failed_results = []
151  for result in self.results:
152  if result.result == AlgResult.failure.value or result.result == AlgResult.not_enough_data.value:
153  failed_results.append(result)
154  if failed_results:
155  B2WARNING("Failed results found.")
156  for result in failed_results:
157  if result.result == AlgResult.failure.value:
158  B2ERROR(f"c_Failure returned for {result.iov}.")
159  elif result.result == AlgResult.not_enough_data.value:
160  B2WARNING(f"c_NotEnoughData returned for {result.iov}.")
161  return True
162  else:
163  return False
164 
165  def send_result(self, result):
166  self.queue.put({"type": "result", "value": result})
167 
168  def send_final_state(self, state):
169  self.queue.put({"type": "final_state", "value": state})
170 
171 
173  """The fastest and simplest Algorithm strategy. Runs the algorithm only once over all of the input
174  data or only the data corresponding to the requested IoV. The payload IoV is the set to the same as the one
175  that was executed.
176 
177  This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
178  a CalibrationAlgorithm C++ class directly.
179 """
180 
182  usable_params = {"apply_iov": IoV}
183 
184  def __init__(self, algorithm):
185  """
186  """
187  super().__init__(algorithm)
188 
190  self.machine = AlgorithmMachine(self.algorithm)
191 
192  def run(self, iov, iteration, queue):
193  """
194  Runs the algorithm machine over the collected data and fills the results.
195  """
196  if not self.is_valid():
197  raise StrategyError("This AlgorithmStrategy was not set up correctly!")
198  self.queue = queue
199 
200  B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
201  # Now add all the necessary parameters for a strategy to run
202  machine_params = {}
203  machine_params["database_chain"] = self.database_chain
204  machine_params["dependent_databases"] = self.dependent_databases
205  machine_params["output_dir"] = self.output_dir
206  machine_params["output_database_dir"] = self.output_database_dir
207  machine_params["input_files"] = self.input_files
208  machine_params["ignored_runs"] = self.ignored_runs
209  self.machine.setup_from_dict(machine_params)
210  # Start moving through machine states
211  B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
212  self.machine.setup_algorithm(iteration=iteration)
213  # After this point, the logging is in the stdout of the algorithm
214  B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
215 
216  all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
217  # If we were given a specific IoV to calibrate we just execute all runs in that IoV at once
218  if iov:
219  runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
220  else:
221  runs_to_execute = all_runs_collected
222 
223  # Remove the ignored runs from our run list to execute
224  if self.ignored_runs:
225  B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
226  runs_to_execute.difference_update(set(self.ignored_runs))
227  # Sets aren't ordered so lets go back to lists and sort
228  runs_to_execute = sorted(runs_to_execute)
229  apply_iov = None
230  if "apply_iov" in self.algorithm.params:
231  apply_iov = self.algorithm.params["apply_iov"]
232  self.machine.execute_runs(runs=runs_to_execute, iteration=iteration, apply_iov=apply_iov)
233  B2INFO(f"Finished execution with result code {self.machine.result.result}.")
234 
235  # Send out the result to the runner
236  self.send_result(self.machine.result)
237 
238  # Make sure the algorithm state and commit is done
239  if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
240  # Valid exit codes mean we can complete properly
241  self.machine.complete()
242  # Commit all the payloads and send out the results
243  self.machine.algorithm.algorithm.commit()
244  self.send_final_state(self.COMPLETED)
245  else:
246  # Either there wasn't enough data or the algorithm failed
247  self.machine.fail()
248  self.send_final_state(self.FAILED)
249 
250 
252  """
253  Algorithm strategy to do run-by-run calibration of collected data.
254  Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
255  If the algorithm returns 'not enough data' on the current run set, it won't commit the payloads, but instead adds
256  the next run's data and tries again.
257 
258  Once an execution on a set of runs return 'iterate' or 'ok' we move onto the next runs (if any are left)
259  and start the same procedure again. Committing of payloads to the outputdb only happens once we're sure that there
260  is enough data in the remaining runs to get a full execution. If there isn't enough data remaining, the last runs
261  are merged with the previous successful execution's runs and a final execution is performed on all remaining runs.
262 
263  Additionally this strategy will automatically make sure that IoV gaps in your input data are covered by a payload.
264  This means that there shouldn't be any IoVs that don't get a new payload by the end of runnning an iteration.
265 
266  This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
267  a CalibrationAlgorithm C++ class directly.
268 """
269 
271  usable_params = {
272  "has_experiment_settings": bool,
273  "iov_coverage": IoV,
274  "step_size": int
275  }
276 
277 
278  allowed_granularities = ["run"]
279 
280  def __init__(self, algorithm):
281  """
282  """
283  super().__init__(algorithm)
284 
286  self.machine = AlgorithmMachine(self.algorithm)
287  if "step_size" not in self.algorithm.params:
288  self.algorithm.params["step_size"] = 1
289  self.first_execution = True
290 
291  def apply_experiment_settings(self, algorithm, experiment):
292  """
293  Apply experiment-dependent settings.
294  This is the default version, which does not do anything.
295  If necessary, it should be reimplemented by derived classes.
296  """
297  return
298 
299  def run(self, iov, iteration, queue):
300  """
301  Runs the algorithm machine over the collected data and fills the results.
302  """
303  if not self.is_valid():
304  raise StrategyError("This AlgorithmStrategy was not set up correctly!")
305  self.queue = queue
306  B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
307  # Now add all the necessary parameters for a strategy to run
308  machine_params = {}
309  machine_params["database_chain"] = self.database_chain
310  machine_params["dependent_databases"] = self.dependent_databases
311  machine_params["output_dir"] = self.output_dir
312  machine_params["output_database_dir"] = self.output_database_dir
313  machine_params["input_files"] = self.input_files
314  machine_params["ignored_runs"] = self.ignored_runs
315  self.machine.setup_from_dict(machine_params)
316  # Start moving through machine states
317  self.machine.setup_algorithm(iteration=iteration)
318  # After this point, the logging is in the stdout of the algorithm
319  B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
320  runs_to_execute = []
321  all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
322  # If we were given a specific IoV to calibrate we just execute over runs in that IoV
323  if iov:
324  runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
325  else:
326  runs_to_execute = all_runs_collected[:]
327 
328  # Remove the ignored runs from our run list to execute
329  if self.ignored_runs:
330  B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
331  runs_to_execute.difference_update(set(self.ignored_runs))
332  # Sets aren't ordered so lets go back to lists and sort
333  runs_to_execute = sorted(runs_to_execute)
334 
335  # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
336  # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
337  # separately and prevent IoVs from crossing the boundary.
338  runs_to_execute = split_runs_by_exp(runs_to_execute)
339 
340  # Now iterate through the experiments, executing runs in blocks of 'step_size'. We DO NOT allow a payload IoV to
341  # extend over multiple experiments, only multiple runs
342  iov_coverage = None
343  if "iov_coverage" in self.algorithm.params:
344  B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
345  iov_coverage = self.algorithm.params["iov_coverage"]
346 
347  number_of_experiments = len(runs_to_execute)
348  # Iterate over experiment run lists
349  for i_exp, run_list in enumerate(runs_to_execute, start=1):
350 
351  # Apply experiment-dependent settings.
352  if "has_experiment_settings" in self.algorithm.params:
353  if self.algorithm.params["has_experiment_settings"]:
354  self.apply_experiment_settings(self.machine.algorithm.algorithm, run_list[0].exp)
355 
356  # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
357  # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
358  # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
359  # This is only true for the lowest and highest experiments in our input data.
360  if iov_coverage and i_exp == 1:
361  lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
362  else:
363  lowest_exprun = run_list[0]
364 
365  if iov_coverage and i_exp == number_of_experiments:
366  highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
367  else:
368  highest_exprun = run_list[-1]
369 
370  self.execute_over_run_list(iteration, run_list, lowest_exprun, highest_exprun)
371 
372  # Print any knowable gaps between result IoVs, if any are foun there is a problem.
373  gaps = self.find_iov_gaps()
374  # Dump them to a file for logging
375  with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
376  json.dump(gaps, f)
377 
378  # If any results weren't successes we fail
379  if self.any_failed_iov():
380  self.send_final_state(self.FAILED)
381  else:
382  self.send_final_state(self.COMPLETED)
383 
384  def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun):
385  # The runs (data) we have left to execute from this run list
386  remaining_runs = run_list[:]
387  # The previous execution's runs
388  previous_runs = []
389  # The current runs we are executing
390  current_runs = []
391  # The last successful payload and result
392  last_successful_payloads = None
393  last_successful_result = None
394 
395  # Iterate over ExpRuns within an experiment in chunks of 'step_size'
396  for expruns in grouper(self.algorithm.params["step_size"], run_list):
397  # Already set up earlier the first time, so we shouldn't do it again
398  if not self.first_execution:
399  self.machine.setup_algorithm()
400  else:
401  self.first_execution = False
402 
403  # Add on the next step of runs
404  current_runs.extend(expruns)
405  # Remove them from our remaining runs
406  remaining_runs = [run for run in remaining_runs if run not in current_runs]
407 
408  # Is this the first payload of the experiment
409  if not last_successful_result:
410  B2INFO("Detected that this will be the first payload of this experiment.")
411  # If this is the first payload but we have other data, we need the IoV to cover from the
412  # lowest IoV extent requested up to the ExpRun right before the next run in the remaining runs list.
413  if remaining_runs:
414  apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1)
415  # If this is the first payload but there isn't more data, we set the IoV to cover the full range
416  else:
417  B2INFO("Detected that this will be the only payload of the experiment.")
418  apply_iov = IoV(*lowest_exprun, *highest_exprun)
419  # If there were previous successes
420  else:
421  if not remaining_runs:
422  B2INFO("Detected that there are no more runs to execute in this experiment after this next execution.")
423  apply_iov = IoV(*current_runs[0], *highest_exprun)
424  # Othewise, it's just a normal IoV in the middle.
425  else:
426  B2INFO("Detected that there are more runs to execute in this experiment after this next execution.")
427  apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1)
428 
429  B2INFO(f"Executing and applying {apply_iov} to the payloads.")
430  self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
431  B2INFO(f"Finished execution with result code {self.machine.result.result}.")
432 
433  # Does this count as a successful execution?
434  if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
435  self.machine.complete()
436  # If we've succeeded but we have a previous success we can commit the previous payloads
437  # since we have new ones ready
438  if last_successful_payloads and last_successful_result:
439  B2INFO("Saving this execution's payloads to be committed later.")
440  # Save the payloads and result
441  new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
442  new_successful_result = self.machine.result
443  B2INFO("We just succeded in execution of the Algorithm."
444  f" Will now commit payloads from the previous success for {last_successful_result.iov}.")
445  self.machine.algorithm.algorithm.commit(last_successful_payloads)
446  self.results.append(last_successful_result)
447  self.send_result(last_successful_result)
448  # If there are remaining runs we need to have the current payloads ready to commit after the next execution
449  if remaining_runs:
450  last_successful_payloads = new_successful_payloads
451  last_successful_result = new_successful_result
452  # If there's not more runs to process we should also commit the new ones
453  else:
454  B2INFO("We have no more runs to process. "
455  f"Will now commit the most recent payloads for {new_successful_result.iov}.")
456  self.machine.algorithm.algorithm.commit(new_successful_payloads)
457  self.results.append(new_successful_result)
458  self.send_result(new_successful_result)
459  break
460  # if there's no previous success this must be the first run executed
461  else:
462  # Need to save payloads for later if we have a success but runs remain
463  if remaining_runs:
464  B2INFO(f"Saving the most recent payloads for {self.machine.result.iov} to be committed later.")
465  # Save the payloads and result
466  last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
467  last_successful_result = self.machine.result
468  # Need to commit and exit if we have a success and no remaining data
469  else:
470  B2INFO("We just succeeded in execution of the Algorithm."
471  " No runs left to be processed, so we are committing results of this execution.")
472  self.machine.algorithm.algorithm.commit()
473  self.results.append(self.machine.result)
474  self.send_result(self.machine.result)
475  break
476 
477  previous_runs = current_runs[:]
478  current_runs = []
479  # If it wasn't successful, was it due to lack of data in the runs?
480  elif (self.machine.result.result == AlgResult.not_enough_data.value):
481  B2INFO(f"There wasn't enough data in {self.machine.result.iov}.")
482  if remaining_runs:
483  B2INFO("Some runs remain to be processed. "
484  f"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.")
485  elif not remaining_runs and not last_successful_result:
486  B2ERROR("There aren't any more runs remaining to merge with, and we never had a previous success."
487  " There wasn't enough data in the full input data requested.")
488  self.results.append(self.machine.result)
489  self.send_result(self.machine.result)
490  self.machine.fail()
491  break
492  elif not remaining_runs and last_successful_result:
493  B2INFO("There aren't any more runs remaining to merge with. But we had a previous success"
494  ", so we'll merge with the previous IoV.")
495  final_runs = current_runs[:]
496  current_runs = previous_runs
497  current_runs.extend(final_runs)
498  self.machine.fail()
499  elif self.machine.result.result == AlgResult.failure.value:
500  B2ERROR(f"{self.algorithm.name} returned failure exit code.")
501  self.results.append(self.machine.result)
502  self.send_result(self.machine.result)
503  self.machine.fail()
504  break
505  else:
506  # Check if we need to run a final execution on the previous execution + dangling set of runs
507  if current_runs:
508  self.machine.setup_algorithm()
509  apply_iov = IoV(last_successful_result.iov.exp_low,
510  last_successful_result.iov.run_low,
511  *highest_exprun)
512  B2INFO(f"Executing on {apply_iov}.")
513  self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
514  B2INFO(f"Finished execution with result code {self.machine.result.result}.")
515  if (self.machine.result.result == AlgResult.ok.value) or (
516  self.machine.result.result == AlgResult.iterate.value):
517  self.machine.complete()
518  # Commit all the payloads and send out the results
519  self.machine.algorithm.algorithm.commit()
520  # Save the result
521  self.results.append(self.machine.result)
522  self.send_result(self.machine.result)
523  else:
524  # Save the result
525  self.results.append(self.machine.result)
526  self.send_result(self.machine.result)
527  # But failed
528  self.machine.fail()
529 
530 
532  """
533  Algorithm strategy to do run-by-run calibration of collected data.
534  Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
535 
536  This strategy differs from `SequentialRunByRun` in that it *will not merge run data* if the algorithm returns
537  'not enough data' on the current run.
538 
539  Once an execution on a run returns *any* result 'iterate', 'ok', 'not_enough_data', or 'failure', we move onto the
540  next run (if any are left).
541  Committing of payloads to the outputdb only happens for 'iterate' or 'ok' return codes.
542 
543  .. important:: Unlike most other strategies, this one won't immediately fail and return if a run returns a 'failure' exit
544  code.
545  The failure will prevent iteration/successful completion of the CAF though.
546 
547  .. warning:: Since this strategy doesn't try to merge data from runs, if *any* run in your input data doesn't contain
548  enough data to complete the algorithm successfully, you won't be able to get a successful calibration.
549  The CAF then won't allow you to iterate this calibration, or pass the constants onward to another calibration.
550  However, you will still have the database created that covers all the successfull runs.
551 
552  This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
553  a CalibrationAlgorithm C++ class directly.
554 """
555 
556  allowed_granularities = ["run"]
557 
559  usable_params = {}
560 
561  def __init__(self, algorithm):
562  """
563  """
564  super().__init__(algorithm)
565 
567  self.machine = AlgorithmMachine(self.algorithm)
568 
569  def run(self, iov, iteration, queue):
570  """
571  Runs the algorithm machine over the collected data and fills the results.
572  """
573 
574  if not self.is_valid():
575  raise StrategyError("This AlgorithmStrategy was not set up correctly!")
576  self.queue = queue
577 
578  B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
579  # Now add all the necessary parameters for a strategy to run
580  machine_params = {}
581  machine_params["database_chain"] = self.database_chain
582  machine_params["dependent_databases"] = self.dependent_databases
583  machine_params["output_dir"] = self.output_dir
584  machine_params["output_database_dir"] = self.output_database_dir
585  machine_params["input_files"] = self.input_files
586  machine_params["ignored_runs"] = self.ignored_runs
587  self.machine.setup_from_dict(machine_params)
588  # Start moving through machine states
589  B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
590  self.machine.setup_algorithm(iteration=iteration)
591  # After this point, the logging is in the stdout of the algorithm
592  B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
593 
594  all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
595  # If we were given a specific IoV to calibrate we just execute all runs in that IoV at once
596  if iov:
597  runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
598  else:
599  runs_to_execute = all_runs_collected
600 
601  # Remove the ignored runs from our run list to execute
602  if self.ignored_runs:
603  B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
604  runs_to_execute.difference_update(set(self.ignored_runs))
605  # Sets aren't ordered so lets go back to lists and sort
606  runs_to_execute = sorted(runs_to_execute)
607 
608  # Is this the first time executing the algorithm?
609  first_execution = True
610  for exprun in runs_to_execute:
611  if not first_execution:
612  self.machine.setup_algorithm()
613  current_runs = exprun
614  apply_iov = iov_from_runs([current_runs])
615  B2INFO(f"Executing on IoV = {apply_iov}.")
616  self.machine.execute_runs(runs=[current_runs], iteration=iteration, apply_iov=apply_iov)
617  first_execution = False
618  B2INFO(f"Finished execution with result code {self.machine.result.result}.")
619  # Does this count as a successful execution?
620  if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
621  # Commit the payloads and result
622  B2INFO(f"Committing payloads for {iov_from_runs([current_runs])}.")
623  self.machine.algorithm.algorithm.commit()
624  self.results.append(self.machine.result)
625  self.send_result(self.machine.result)
626  self.machine.complete()
627  # If it wasn't successful, was it due to lack of data in the runs?
628  elif (self.machine.result.result == AlgResult.not_enough_data.value):
629  B2INFO(f"There wasn't enough data in the IoV {iov_from_runs([current_runs])}.")
630  self.results.append(self.machine.result)
631  self.send_result(self.machine.result)
632  self.machine.fail()
633  elif self.machine.result.result == AlgResult.failure.value:
634  B2ERROR(f"Failure exit code in the IoV {iov_from_runs([current_runs])}.")
635  self.results.append(self.machine.result)
636  self.send_result(self.machine.result)
637  self.machine.fail()
638 
639  # Print any knowable gaps between result IoVs, if any are foun there is a problem.
640  gaps = self.find_iov_gaps()
641  # Dump them to a file for logging
642  with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
643  json.dump(gaps, f)
644 
645  self.send_final_state(self.COMPLETED)
646 
647 
649  """
650  Algorithm strategy to first calculate run boundaries where execution should be attempted.
651  Runs the algorithm over the input data contained within the requested IoV of the boundaries,
652  starting with the first boundary data only.
653  If the algorithm returns 'not enough data' on the current boundary IoV, it won't commit the payloads,
654  but instead adds the next boundarie's data and tries again. Basically the same logic as `SequentialRunByRun`
655  but using run boundaries instead of runs directly.
656  Notice that boundaries cannot span multiple experiments.
657 
658  By default the algorithm will get the payload boundaries directly from the algorithm that need to
659  have inplemented the function ``isBoundaryRequired``. If the desired boundaries are already known it
660  is possible to pass them directly setting the algorithm parameter ``payload_boundaries`` and avoid
661  the need to define the ``isBoundaryRequired`` function.
662 
663  ``payload_boundaries`` is a list ``[(exp1, run1), (exp2, run2), ...]``. A boundary at the beginning of each
664  experiment will be added if not already present. An empty list will thus produce a single payload for each
665  experiment. A ``payload_boundaries`` set to ``None`` is equivalent to not passing it and restores the default
666  behaviour where the boundaries are computed in the ``isBoundaryRequired`` function of the algorithm.
667  """
668 
670  usable_params = {
671  "iov_coverage": IoV,
672  "payload_boundaries": [] # [(exp1, run1), (exp2, run2), ...]
673  }
674 
675 
676  allowed_granularities = ["run"]
677 
678  def __init__(self, algorithm):
679  """
680  """
681  super().__init__(algorithm)
682 
684  self.machine = AlgorithmMachine(self.algorithm)
685  self.first_execution = True
686 
687  def run(self, iov, iteration, queue):
688  """
689  Runs the algorithm machine over the collected data and fills the results.
690  """
691  if not self.is_valid():
692  raise StrategyError("This AlgorithmStrategy was not set up correctly!")
693  self.queue = queue
694  B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
695  # Now add all the necessary parameters for a strategy to run
696  machine_params = {}
697  machine_params["database_chain"] = self.database_chain
698  machine_params["dependent_databases"] = self.dependent_databases
699  machine_params["output_dir"] = self.output_dir
700  machine_params["output_database_dir"] = self.output_database_dir
701  machine_params["input_files"] = self.input_files
702  machine_params["ignored_runs"] = self.ignored_runs
703  self.machine.setup_from_dict(machine_params)
704  # Start moving through machine states
705  self.machine.setup_algorithm(iteration=iteration)
706  # After this point, the logging is in the stdout of the algorithm
707  B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
708  runs_to_execute = []
709  all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
710  # If we were given a specific IoV to calibrate we just execute over runs in that IoV
711  if iov:
712  runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
713  else:
714  runs_to_execute = all_runs_collected[:]
715 
716  # Remove the ignored runs from our run list to execute
717  if self.ignored_runs:
718  B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
719  runs_to_execute.difference_update(set(self.ignored_runs))
720  # Sets aren't ordered so lets go back to lists and sort
721  runs_to_execute = sorted(runs_to_execute)
722 
723  # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
724  # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
725  # separately and prevent IoVs from crossing the boundary.
726  runs_to_execute = split_runs_by_exp(runs_to_execute)
727 
728  # Now iterate through the experiments. We DO NOT allow a payload IoV to
729  # extend over multiple experiments, only multiple runs
730  iov_coverage = None
731  if "iov_coverage" in self.algorithm.params:
732  B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
733  iov_coverage = self.algorithm.params["iov_coverage"]
734 
735  payload_boundaries = None
736  if "payload_boundaries" in self.algorithm.params:
737  B2INFO(f"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.")
738  payload_boundaries = self.algorithm.params["payload_boundaries"]
739 
740  number_of_experiments = len(runs_to_execute)
741  B2INFO(f"We are iterating over {number_of_experiments} experiments.")
742 
743  # Iterate over experiment run lists
744  for i_exp, run_list in enumerate(runs_to_execute, start=1):
745  B2DEBUG(26, f"Run List for this experiment={run_list}")
746  current_experiment = run_list[0].exp
747  B2INFO(f"Executing over data from experiment {current_experiment}")
748  # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
749  # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
750  # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
751  # This is only true for the lowest and highest experiments in our input data.
752  if i_exp == 1:
753  if iov_coverage:
754  lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
755  else:
756  lowest_exprun = run_list[0]
757  # We are calibrating across multiple experiments so we shouldn't start from the middle but from the 0th run
758  else:
759  lowest_exprun = ExpRun(current_experiment, 0)
760 
761  # Override the normal value for the highest ExpRun (from data) if iov_coverage was set
762  if iov_coverage and i_exp == number_of_experiments:
763  highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
764  # If we have more experiments to execute then we wil be setting the final payload IoV in this experiment
765  # to be unbounded
766  elif i_exp < number_of_experiments:
767  highest_exprun = ExpRun(current_experiment, -1)
768  # Otherwise just get the values from data
769  else:
770  highest_exprun = run_list[-1]
771 
772  # Find the boundaries for this experiment's runs
773  vec_run_list = vector_from_runs(run_list)
774  if payload_boundaries is None:
775  # Find the boundaries using the findPayloadBoundaries implemented in the algorithm
776  B2INFO("Attempting to find payload boundaries.")
777  vec_boundaries = self.algorithm.algorithm.findPayloadBoundaries(vec_run_list)
778  # If this vector is empty then that's bad. Maybe the isBoundaryRequired function
779  # wasn't implemented? Either way we should stop.
780  if vec_boundaries.empty():
781  B2ERROR("No boundaries found but we are in a strategy that requires them! Failing...")
782  # Tell the Runner that we have failed
783  self.send_final_state(self.FAILED)
784  break
785  vec_boundaries = runs_from_vector(vec_boundaries)
786  else:
787  # Using boundaries set by user
788  B2INFO(f"Using as payload boundaries {payload_boundaries}.")
789  vec_boundaries = [ExpRun(exp, run) for exp, run in payload_boundaries]
790  # No need to check that vec_boundaries is not empty. In case it is we will anyway add
791  # a boundary at the first run of each experiment.
792  # Remove any boundaries not from the current experiment (only likely if they were set manually)
793  # We sort just to make everything easier later and just in case something mad happened.
794  run_boundaries = sorted([er for er in vec_boundaries if er.exp == current_experiment])
795  # In this strategy we consider separately each experiment. We then now check that the
796  # boundary (exp, 0) is present and if not we add it. It is indeed possible to miss it
797  # if the boundaries were given manually
798  first_exprun = ExpRun(current_experiment, 0)
799  if first_exprun not in run_boundaries:
800  B2WARNING(f"No boundary found at ({current_experiment}, 0), adding it.")
801  run_boundaries[0:0] = [first_exprun]
802  B2INFO((f"Found {len(run_boundaries)} boundaries for this experiment. "
803  "Checking if we have some data for all boundary IoVs..."))
804  # First figure out the run lists to use for each execution (potentially different from the applied IoVs)
805  # We use the boundaries and the run_list
806  boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list)
807  B2DEBUG(26, f"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}")
808  # If there were any boundary IoVs with no run data, just remove them. Otherwise they will execute over all data.
809  boundary_iovs_to_run_lists = {key: value for key, value in boundary_iovs_to_run_lists.items() if value}
810  B2DEBUG(26, f"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}")
811  # If any were removed then we might have gaps between the boundary IoVs. Fix those now by merging IoVs.
812  new_boundary_iovs_to_run_lists = {}
813  previous_boundary_iov = None
814  previous_boundary_run_list = None
815  for boundary_iov, run_list in boundary_iovs_to_run_lists.items():
816  if not previous_boundary_iov:
817  previous_boundary_iov = boundary_iov
818  previous_boundary_run_list = run_list
819  continue
820  # We are definitely dealiing with IoVs from one experiment so we can make assumptions here
821  if previous_boundary_iov.run_high != (boundary_iov.run_low-1):
822  B2WARNING("Gap in boundary IoVs found before execution! "
823  "Will correct it by extending the previous boundary up to the next one.")
824  B2INFO(f"Original boundary IoV={previous_boundary_iov}")
825  previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low,
826  previous_boundary_iov.exp_high, boundary_iov.run_low-1)
827  B2INFO(f"New boundary IoV={previous_boundary_iov}")
828  new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
829  previous_boundary_iov = boundary_iov
830  previous_boundary_run_list = run_list
831  else:
832  new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
833  boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists
834  B2DEBUG(26, f"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}")
835  # Actually execute now that we have an IoV list to apply
836  success = self.execute_over_boundaries(boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
837  if not success:
838  # Tell the Runner that we have failed
839  self.send_final_state(self.FAILED)
840  break
841  # Only executes if we didn't fail any experiment execution
842  else:
843  # Print any knowable gaps between result IoVs, if any are found there is a problem, but not necessarily too bad.
844  gaps = self.find_iov_gaps()
845  if gaps:
846  B2WARNING("There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.")
847  # Dump them to a file for logging
848  with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
849  json.dump(gaps, f)
850 
851  # If any results weren't successes we fail
852  if self.any_failed_iov():
853  self.send_final_state(self.FAILED)
854  else:
855  self.send_final_state(self.COMPLETED)
856 
857  def execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration):
858  """
859  Take the previously found boundaries and the run lists they correspond to and actually perform the
860  Algorithm execution. This is assumed to be for a single experiment.
861  """
862  # Copy of boundary IoVs
863  remaining_boundary_iovs = sorted(list(boundary_iovs_to_run_lists.keys())[:])
864 
865  # The current runs we are executing
866  current_runs = []
867  # The IoV of the current boundary(s)
868  current_boundary_iov = None
869  # The current execution's applied IoV, may be different to the boundary IoV
870  current_iov = None
871 
872  # The last successful payload list and result. We hold on to them so that we can commit or discard later.
873  last_successful_payloads = None
874  last_successful_result = None
875  # The previous execution's runs
876  last_successful_runs = []
877  # The previous execution's applied IoV
878  last_successful_iov = None
879 
880  while True:
881  # Do we have previous successes?
882  if not last_successful_result:
883  if not current_runs:
884  # Did we actually have any boundaries?
885  if not remaining_boundary_iovs:
886  # Fail because we have no boundaries to use
887  B2ERROR("No boundaries found for the current experiment's run list. Failing the strategy.")
888  return False
889 
890  B2INFO("This appears to be the first attempted execution of the experiment.")
891  # Attempt to execute on the first boundary
892  current_boundary_iov = remaining_boundary_iovs.pop(0)
893  current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
894  # What if there is only one boundary? Need to apply the highest exprun
895  if not remaining_boundary_iovs:
896  current_iov = IoV(*lowest_exprun, *highest_exprun)
897  else:
898  current_iov = IoV(*lowest_exprun, current_boundary_iov.exp_high, current_boundary_iov.run_high)
899  # Returned not enough data from first execution
900  else:
901  # Any remaining boundaries?
902  if not remaining_boundary_iovs:
903  # Fail because we have no boundaries to use
904  B2ERROR("Not enough data found for the current experiment's run list. Failing the strategy.")
905  return False
906 
907  B2INFO("There wasn't enough data previously. Merging with the runs from the next boundary.")
908  # Extend the previous run lists/iovs
909  next_boundary_iov = remaining_boundary_iovs.pop(0)
910  current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
911  next_boundary_iov.exp_high, next_boundary_iov.run_high)
912  current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
913  # At the last boundary? Need to apply the highest exprun
914  if not remaining_boundary_iovs:
915  current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
916  else:
917  current_iov = IoV(current_iov.exp_low, current_iov.run_low,
918  current_boundary_iov.exp_high, current_boundary_iov.run_high)
919 
920  self.execute_runs(current_runs, iteration, current_iov)
921 
922  # Does this count as a successful execution?
923  if self.alg_success():
924  # Commit previous values we were holding onto
925  B2INFO("Found a success. Will save the payloads for later.")
926  # Save success
927  last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
928  last_successful_result = self.machine.result
929  last_successful_runs = current_runs[:]
930  last_successful_iov = current_iov
931  # Reset values for next loop
932  current_runs = []
933  current_boundary_iov = None
934  current_iov = None
935  self.machine.complete()
936  continue
937  elif self.machine.result.result == AlgResult.not_enough_data.value:
938  B2INFO("Not Enough Data result.")
939  # Just complete but leave the current runs alone for next loop
940  self.machine.complete()
941  continue
942  else:
943  B2ERROR("Hit a failure or some kind of result we can't continue from. Failing out...")
944  self.machine.fail()
945  return False
946  # Previous result exists
947  else:
948  # Previous loop was a success
949  if not current_runs:
950  # Remaining boundaries?
951  if not remaining_boundary_iovs:
952  # Out of data, can now commit
953  B2INFO("Finished this experiment's boundaries. "
954  f"Committing remaining payloads from {last_successful_result.iov}")
955  self.machine.algorithm.algorithm.commit(last_successful_payloads)
956  self.results.append(last_successful_result)
957  self.send_result(last_successful_result)
958  return True
959 
960  # Remaining boundaries exist so we try to execute
961  current_boundary_iov = remaining_boundary_iovs.pop(0)
962  current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
963  # What if there is only one boundary? Need to apply the highest exprun
964  if not remaining_boundary_iovs:
965  current_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, *highest_exprun)
966  else:
967  current_iov = current_boundary_iov
968 
969  # Returned not enough data from last execution
970  else:
971  # Any remaining boundaries?
972  if not remaining_boundary_iovs:
973  B2INFO("We have no remaining runs to increase the amount of data. "
974  "Instead we will merge with the previous successful runs.")
975  # Merge with previous success IoV
976  new_current_runs = last_successful_runs[:]
977  new_current_runs.extend(current_runs)
978  current_runs = new_current_runs[:]
979  current_iov = IoV(last_successful_iov.exp_low, last_successful_iov.run_low,
980  current_iov.exp_high, current_iov.run_high)
981  # We reset the last successful stuff because we are dropping it
982  last_successful_payloads = []
983  last_successful_result = None
984  last_successful_runs = []
985  last_successful_iov = None
986 
987  else:
988  B2INFO("Since there wasn't enough data previously, we will merge with the runs from the next boundary.")
989  # Extend the previous run lists/iovs
990  next_boundary_iov = remaining_boundary_iovs.pop(0)
991  current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
992  next_boundary_iov.exp_high, next_boundary_iov.run_high)
993  # Extend previous execution's runs with the next set
994  current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
995  # At the last boundary? Need to apply the highest exprun
996  if not remaining_boundary_iovs:
997  current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
998  else:
999  current_iov = IoV(current_iov.exp_low, current_iov.run_low,
1000  current_boundary_iov.exp_high, current_boundary_iov.run_high)
1001 
1002  self.execute_runs(current_runs, iteration, current_iov)
1003 
1004  # Does this count as a successful execution?
1005  if self.alg_success():
1006  # Commit previous values we were holding onto
1007  B2INFO("Found a success.")
1008  if last_successful_result:
1009  B2INFO("Can now commit the previous success.")
1010  self.machine.algorithm.algorithm.commit(last_successful_payloads)
1011  self.results.append(last_successful_result)
1012  self.send_result(last_successful_result)
1013  # Replace last success
1014  last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
1015  last_successful_result = self.machine.result
1016  last_successful_runs = current_runs[:]
1017  last_successful_iov = current_iov
1018  # Reset values for next loop
1019  current_runs = []
1020  current_boundary_iov = None
1021  current_iov = None
1022  self.machine.complete()
1023  continue
1024  elif self.machine.result.result == AlgResult.not_enough_data.value:
1025  B2INFO("Not Enough Data result.")
1026  # Just complete but leave the current runs alone for next loop
1027  self.machine.complete()
1028  continue
1029  else:
1030  B2ERROR("Hit a failure or some other result we can't continue from. Failing out...")
1031  self.machine.fail()
1032  return False
1033 
1034  def execute_runs(self, runs, iteration, iov):
1035  # Already set up earlier the first time, so we shouldn't do it again
1036  if not self.first_execution:
1037  self.machine.setup_algorithm()
1038  else:
1039  self.first_execution = False
1040 
1041  B2INFO(f"Executing and applying {iov} to the payloads.")
1042  self.machine.execute_runs(runs=runs, iteration=iteration, apply_iov=iov)
1043  B2INFO(f"Finished execution with result code {self.machine.result.result}.")
1044 
1045  def alg_success(self):
1046  return ((self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value))
1047 
1048 
1049 class StrategyError(Exception):
1050  """
1051  Basic Exception for this type of class.
1052  """
strategies.AlgorithmStrategy.database_chain
database_chain
User defined database chain i.e.
Definition: strategies.py:81
strategies.SingleIOV
Definition: strategies.py:172
strategies.AlgorithmStrategy.output_dir
output_dir
The algorithm output directory which is mostly used to store the stdout file.
Definition: strategies.py:77
strategies.AlgorithmStrategy.find_iov_gaps
def find_iov_gaps(self)
Definition: strategies.py:125
strategies.AlgorithmStrategy.send_final_state
def send_final_state(self, state)
Definition: strategies.py:168
strategies.AlgorithmStrategy.input_files
input_files
Collector output files, will contain all files retured by the output patterns.
Definition: strategies.py:75
strategies.AlgorithmStrategy.setup_from_dict
def setup_from_dict(self, params)
Definition: strategies.py:99
strategies.SequentialBoundaries.run
def run(self, iov, iteration, queue)
Definition: strategies.py:687
strategies.AlgorithmStrategy.COMPLETED
string COMPLETED
Completed state.
Definition: strategies.py:64
strategies.AlgorithmStrategy.algorithm
algorithm
Algorithm() class that we're running.
Definition: strategies.py:73
strategies.SequentialRunByRun
Definition: strategies.py:251
strategies.SequentialRunByRun.apply_experiment_settings
def apply_experiment_settings(self, algorithm, experiment)
Definition: strategies.py:291
strategies.AlgorithmStrategy.run
def run(self, iov, iteration, queue)
Definition: strategies.py:93
strategies.AlgorithmStrategy.FAILED
string FAILED
Failed state.
Definition: strategies.py:67
strategies.SequentialBoundaries.execute_runs
def execute_runs(self, runs, iteration, iov)
Definition: strategies.py:1034
strategies.AlgorithmStrategy.send_result
def send_result(self, result)
Definition: strategies.py:165
strategies.AlgorithmStrategy
Definition: strategies.py:17
strategies.AlgorithmStrategy.dependent_databases
dependent_databases
CAF created local databases from previous calibrations that this calibration/algorithm depends on.
Definition: strategies.py:83
strategies.SequentialRunByRun.run
def run(self, iov, iteration, queue)
Definition: strategies.py:299
strategies.SimpleRunByRun
Definition: strategies.py:531
strategies.AlgorithmStrategy.queue
queue
The multiprocessing Queue we use to pass back results one at a time.
Definition: strategies.py:90
strategies.SimpleRunByRun.run
def run(self, iov, iteration, queue)
Definition: strategies.py:569
strategies.AlgorithmStrategy.output_database_dir
output_database_dir
The output database directory for the localdb that the algorithm will commit to.
Definition: strategies.py:79
strategies.SequentialBoundaries.execute_over_boundaries
def execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
Definition: strategies.py:857
strategies.SequentialRunByRun.machine
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
Definition: strategies.py:286
strategies.AlgorithmStrategy.results
results
The list of results objects which will be sent out before the end.
Definition: strategies.py:88
strategies.SequentialBoundaries
Definition: strategies.py:648
strategies.SequentialRunByRun.first_execution
first_execution
Definition: strategies.py:289
strategies.SingleIOV.run
def run(self, iov, iteration, queue)
Definition: strategies.py:192
strategies.SimpleRunByRun.machine
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
Definition: strategies.py:567
strategies.AlgorithmStrategy.required_true_attrs
list required_true_attrs
Attributes that must have a value that returns True when tested by :py:meth:is_valid.
Definition: strategies.py:51
strategies.SimpleRunByRun.__init__
def __init__(self, algorithm)
Definition: strategies.py:561
strategies.SingleIOV.machine
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
Definition: strategies.py:190
strategies.SequentialBoundaries.first_execution
first_execution
Definition: strategies.py:685
strategies.AlgorithmStrategy.required_attrs
list required_attrs
Required attributes that must exist before the strategy can run properly.
Definition: strategies.py:41
strategies.SequentialBoundaries.machine
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
Definition: strategies.py:684
strategies.AlgorithmStrategy.ignored_runs
ignored_runs
Runs that will not be included in ANY execution of the algorithm.
Definition: strategies.py:86
strategies.AlgorithmStrategy.is_valid
def is_valid(self)
Definition: strategies.py:107
strategies.SingleIOV.__init__
def __init__(self, algorithm)
Definition: strategies.py:184
strategies.SequentialBoundaries.__init__
def __init__(self, algorithm)
Definition: strategies.py:678
strategies.SequentialBoundaries.alg_success
def alg_success(self)
Definition: strategies.py:1045
strategies.SequentialRunByRun.__init__
def __init__(self, algorithm)
Definition: strategies.py:280
strategies.StrategyError
Definition: strategies.py:1049
strategies.AlgorithmStrategy.any_failed_iov
def any_failed_iov(self)
Definition: strategies.py:145
strategies.AlgorithmStrategy.__init__
def __init__(self, algorithm)
Definition: strategies.py:69