Belle II Software  release-05-02-19
strategies.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
5 from caf.utils import AlgResult
6 from caf.utils import B2INFO_MULTILINE
7 from caf.utils import runs_overlapping_iov, runs_from_vector
8 from caf.utils import iov_from_runs, split_runs_by_exp, vector_from_runs
9 from caf.utils import find_gaps_in_iov_list, grouper, find_run_lists_from_boundaries
10 from caf.utils import IoV, ExpRun
11 from caf.state_machines import AlgorithmMachine
12 
13 from abc import ABC, abstractmethod
14 import json
15 
16 
class AlgorithmStrategy(ABC):
    """
    Base class for Algorithm strategies. These do the actual execution of a single
    algorithm on collected data. Each strategy may be quite different in terms of how fast it may be,
    how database payloads are passed between executions, and whether or not final payloads have an IoV
    that is independent to the actual runs used to calculate them.

    Parameters:
        algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run

    This base class defines the basic attributes and methods that will be automatically used by the selected AlgorithmRunner.
    When defining a derived class you are free to use these attributes or to implement as much functionality as you want.

    If you define your derived class with an __init__ method, then you should first call the base class
    `AlgorithmStrategy.__init__()` method via super() e.g.

    >>> def __init__(self, algorithm):
    >>>     super().__init__(algorithm)

    The most important method to implement is :py:meth:`AlgorithmStrategy.run` which will take an algorithm and execute it
    in the required way defined by the options you have selected/attributes set.
    """

    #: Attributes that must exist on the strategy before it can run; :py:meth:`is_valid` checks each one
    #: with ``hasattr``. They are allowed to hold values that test as False, e.g. ``""`` or ``[]``.
    required_attrs = ["algorithm",
                      "database_chain",
                      "dependent_databases",
                      "output_dir",
                      "output_database_dir",
                      "input_files",
                      "ignored_runs"
                      ]

    #: Attributes that must hold a value that tests as True when checked by :py:meth:`is_valid`.
    required_true_attrs = ["algorithm",
                           "output_dir",
                           "output_database_dir",
                           "input_files"
                           ]

    #: Collector granularities usable with this strategy.
    #: NOTE(review): not read anywhere in this class; presumably checked by the code that selects a strategy.
    allowed_granularities = ["run", "all"]

    #: NOTE(review): not used in this class; presumably a sentinel telling the queue consumer no results remain.
    FINISHED_RESULTS = "DONE"

    #: Final state value sent through :py:meth:`send_final_state` when the strategy completed
    COMPLETED = "COMPLETED"

    #: Final state value sent through :py:meth:`send_final_state` when the strategy failed
    FAILED = "FAILED"

    def __init__(self, algorithm):
        """
        Parameters:
            algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run
        """
        #: The algorithm object that this strategy executes
        self.algorithm = algorithm
        #: Input files for the algorithm; must be non-empty for :py:meth:`is_valid` to pass
        self.input_files = []
        #: Output directory of the algorithm; must be non-empty for :py:meth:`is_valid` to pass
        self.output_dir = ""
        #: Output database directory; must be non-empty for :py:meth:`is_valid` to pass.
        #: Initialised here so that the ``required_attrs`` existence check cannot fail.
        self.output_database_dir = ""
        #: Database chain handed on to the algorithm machine by the concrete strategies
        self.database_chain = []
        #: Dependent databases handed on to the algorithm machine by the concrete strategies.
        #: Initialised here so that the ``required_attrs`` existence check cannot fail.
        self.dependent_databases = []
        #: Runs that are removed from the list of runs to execute by the concrete strategies
        self.ignored_runs = []
        #: The result objects collected by this strategy
        self.results = []
        #: Queue-like object (anything with ``put``) used to pass results and the final state back
        self.queue = None

    @abstractmethod
    def run(self, iov, iteration, queue):
        """
        Abstract method that needs to be implemented. It will be called to actually execute the
        algorithm.
        """

    def setup_from_dict(self, params):
        """
        Parameters:
            params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name.
        """
        for attribute_name, value in params.items():
            setattr(self, attribute_name, value)

    def is_valid(self):
        """
        Returns:
            bool: Whether or not this strategy has been set up correctly with all its necessary attributes.
        """
        B2INFO("Checking validity of current AlgorithmStrategy setup.")
        # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
        for attribute_name in self.required_attrs:
            if not hasattr(self, attribute_name):
                B2ERROR(f"AlgorithmStrategy attribute {attribute_name} doesn't exist.")
                return False
        # Check if any attributes that need actual values haven't been set or were empty
        for attribute_name in self.required_true_attrs:
            if not getattr(self, attribute_name):
                B2ERROR(f"AlgorithmStrategy attribute {attribute_name} returned False.")
                return False
        return True

    def find_iov_gaps(self):
        """
        Finds and prints the current gaps between the IoVs of the strategy results. Basically these are the IoVs
        not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps
        within the same experiment are found.

        Returns:
            iov_gaps(list[IoV])
        """
        iov_gaps = find_gaps_in_iov_list(sorted([result.iov for result in self.results]))
        if iov_gaps:
            gap_msg = ["Found gaps between IoVs of algorithm results (regardless of result)."]
            gap_msg.append("You may have requested these gaps deliberately by not passing in data containing these runs.")
            gap_msg.append("This may not be a problem, but you will not have payoads defined for these IoVs")
            gap_msg.append("unless you edit the final database.txt yourself.")
            B2INFO_MULTILINE(gap_msg)
            for iov in iov_gaps:
                B2INFO(f"{iov} not covered by any execution of the algorithm.")
        return iov_gaps

    def any_failed_iov(self):
        """
        Logs every result whose code is a failure or 'not enough data'.

        Returns:
            bool: If any result in the current results list has a failed algorithm code we return True
        """
        failed_results = []
        for result in self.results:
            if result.result == AlgResult.failure.value or result.result == AlgResult.not_enough_data.value:
                failed_results.append(result)
        if failed_results:
            B2WARNING("Failed results found.")
            for result in failed_results:
                if result.result == AlgResult.failure.value:
                    B2ERROR(f"c_Failure returned for {result.iov}.")
                elif result.result == AlgResult.not_enough_data.value:
                    B2WARNING(f"c_NotEnoughData returned for {result.iov}.")
            return True
        else:
            return False

    def send_result(self, result):
        """
        Puts ``{"type": "result", "value": result}`` onto the queue.

        Parameters:
            result: The result object to pass back through ``self.queue``.
        """
        self.queue.put({"type": "result", "value": result})

    def send_final_state(self, state):
        """
        Puts ``{"type": "final_state", "value": state}`` onto the queue.

        Parameters:
            state: The final state to report, e.g. ``COMPLETED`` or ``FAILED``.
        """
        self.queue.put({"type": "final_state", "value": state})
170 
171 
173  """The fastest and simplest Algorithm strategy. Runs the algorithm only once over all of the input
174  data or only the data corresponding to the requested IoV. The payload IoV is then set to the same as the one
175  that was executed.
176 
177  This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
178  a CalibrationAlgorithm C++ class directly.
179 """
180 
    #: Algorithm parameters that this strategy reads in ``run``, mapped to the type expected for each.
    #: NOTE(review): nothing visible here enforces the types; this looks purely informational.
    usable_params = {"apply_iov": IoV}
183 
184  def __init__(self, algorithm):
185  """
186  """
187  super().__init__(algorithm)
188 
190  self.machine = AlgorithmMachine(self.algorithm)
191 
192  def run(self, iov, iteration, queue):
193  """
194  Runs the algorithm machine over the collected data and fills the results.
195  """
196  if not self.is_valid():
197  raise StrategyError("This AlgorithmStrategy was not set up correctly!")
198  self.queue = queue
199 
200  B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
201  # Now add all the necessary parameters for a strategy to run
202  machine_params = {}
203  machine_params["database_chain"] = self.database_chain
204  machine_params["dependent_databases"] = self.dependent_databases
205  machine_params["output_dir"] = self.output_dir
206  machine_params["output_database_dir"] = self.output_database_dir
207  machine_params["input_files"] = self.input_files
208  machine_params["ignored_runs"] = self.ignored_runs
209  self.machine.setup_from_dict(machine_params)
210  # Start moving through machine states
211  B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
212  self.machine.setup_algorithm(iteration=iteration)
213  # After this point, the logging is in the stdout of the algorithm
214  B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
215 
216  all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
217  # If we were given a specific IoV to calibrate we just execute all runs in that IoV at once
218  if iov:
219  runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
220  else:
221  runs_to_execute = all_runs_collected
222 
223  # Remove the ignored runs from our run list to execute
224  if self.ignored_runs:
225  B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
226  runs_to_execute.difference_update(set(self.ignored_runs))
227  # Sets aren't ordered so lets go back to lists and sort
228  runs_to_execute = sorted(runs_to_execute)
229  apply_iov = None
230  if "apply_iov" in self.algorithm.params:
231  apply_iov = self.algorithm.params["apply_iov"]
232  self.machine.execute_runs(runs=runs_to_execute, iteration=iteration, apply_iov=apply_iov)
233  B2INFO(f"Finished execution with result code {self.machine.result.result}.")
234 
235  # Send out the result to the runner
236  self.send_result(self.machine.result)
237 
238  # Make sure the algorithm state and commit is done
239  if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
240  # Valid exit codes mean we can complete properly
241  self.machine.complete()
242  # Commit all the payloads and send out the results
243  self.machine.algorithm.algorithm.commit()
244  self.send_final_state(self.COMPLETED)
245  else:
246  # Either there wasn't enough data or the algorithm failed
247  self.machine.fail()
248  self.send_final_state(self.FAILED)
249 
250 
252  """
253  Algorithm strategy to do run-by-run calibration of collected data.
254  Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
255  If the algorithm returns 'not enough data' on the current run set, it won't commit the payloads, but instead adds
256  the next run's data and tries again.
257 
258  Once an execution on a set of runs return 'iterate' or 'ok' we move onto the next runs (if any are left)
259  and start the same procedure again. Committing of payloads to the outputdb only happens once we're sure that there
260  is enough data in the remaining runs to get a full execution. If there isn't enough data remaining, the last runs
261  are merged with the previous successful execution's runs and a final execution is performed on all remaining runs.
262 
263  Additionally this strategy will automatically make sure that IoV gaps in your input data are covered by a payload.
264  This means that there shouldn't be any IoVs that don't get a new payload by the end of running an iteration.
265 
266  This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
267  a CalibrationAlgorithm C++ class directly.
268 """
269 
    #: Algorithm parameters that this strategy reads, mapped to the type expected for each:
    #: ``has_experiment_settings`` switches on the call to ``apply_experiment_settings`` per experiment,
    #: ``iov_coverage`` widens the first/last payload IoV, ``step_size`` is the number of runs added per execution.
    #: NOTE(review): nothing visible here enforces the types; this looks purely informational.
    usable_params = {
        "has_experiment_settings": bool,
        "iov_coverage": IoV,
        "step_size": int
    }

    #: Collector granularities usable with this strategy.
    #: NOTE(review): not read in the visible code; presumably checked by whatever selects the strategy.
    allowed_granularities = ["run"]
279 
280  def __init__(self, algorithm):
281  """
282  """
283  super().__init__(algorithm)
284 
286  self.machine = AlgorithmMachine(self.algorithm)
287  if "step_size" not in self.algorithm.params:
288  self.algorithm.params["step_size"] = 1
289  self.first_execution = True
290 
291  def apply_experiment_settings(self, algorithm, experiment):
292  """
293  Apply experiment-dependent settings.
294  This is the default version, which does not do anything.
295  If necessary, it should be reimplemented by derived classes.
296  """
297  return
298 
299  def run(self, iov, iteration, queue):
300  """
301  Runs the algorithm machine over the collected data and fills the results.
302  """
303  if not self.is_valid():
304  raise StrategyError("This AlgorithmStrategy was not set up correctly!")
305  self.queue = queue
306  B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
307  # Now add all the necessary parameters for a strategy to run
308  machine_params = {}
309  machine_params["database_chain"] = self.database_chain
310  machine_params["dependent_databases"] = self.dependent_databases
311  machine_params["output_dir"] = self.output_dir
312  machine_params["output_database_dir"] = self.output_database_dir
313  machine_params["input_files"] = self.input_files
314  machine_params["ignored_runs"] = self.ignored_runs
315  self.machine.setup_from_dict(machine_params)
316  # Start moving through machine states
317  self.machine.setup_algorithm(iteration=iteration)
318  # After this point, the logging is in the stdout of the algorithm
319  B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
320  runs_to_execute = []
321  all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
322  # If we were given a specific IoV to calibrate we just execute over runs in that IoV
323  if iov:
324  runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
325  else:
326  runs_to_execute = all_runs_collected[:]
327 
328  # Remove the ignored runs from our run list to execute
329  if self.ignored_runs:
330  B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
331  runs_to_execute.difference_update(set(self.ignored_runs))
332  # Sets aren't ordered so lets go back to lists and sort
333  runs_to_execute = sorted(runs_to_execute)
334 
335  # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
336  # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
337  # separately and prevent IoVs from crossing the boundary.
338  runs_to_execute = split_runs_by_exp(runs_to_execute)
339 
340  # Now iterate through the experiments, executing runs in blocks of 'step_size'. We DO NOT allow a payload IoV to
341  # extend over multiple experiments, only multiple runs
342  iov_coverage = None
343  if "iov_coverage" in self.algorithm.params:
344  B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
345  iov_coverage = self.algorithm.params["iov_coverage"]
346 
347  number_of_experiments = len(runs_to_execute)
348  # Iterate over experiment run lists
349  for i_exp, run_list in enumerate(runs_to_execute, start=1):
350 
351  # Apply experiment-dependent settings.
352  if "has_experiment_settings" in self.algorithm.params:
353  if self.algorithm.params["has_experiment_settings"]:
354  self.apply_experiment_settings(self.machine.algorithm.algorithm, run_list[0].exp)
355 
356  # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
357  # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
358  # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
359  # This is only true for the lowest and highest experiments in our input data.
360  # If we have multiple experiments the iov must be adjusted to avoid gaps at the iov boundaries
361  lowest_exprun = ExpRun(run_list[0].exp, 0)
362  highest_exprun = ExpRun(run_list[-1].exp, -1)
363 
364  if i_exp == 1:
365  lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low) if iov_coverage else run_list[0]
366  if i_exp == number_of_experiments:
367  highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high) if iov_coverage else run_list[-1]
368 
369  self.execute_over_run_list(iteration, run_list, lowest_exprun, highest_exprun)
370 
371  # Print any knowable gaps between result IoVs, if any are foun there is a problem.
372  gaps = self.find_iov_gaps()
373  # Dump them to a file for logging
374  with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
375  json.dump(gaps, f)
376 
377  # If any results weren't successes we fail
378  if self.any_failed_iov():
379  self.send_final_state(self.FAILED)
380  else:
381  self.send_final_state(self.COMPLETED)
382 
    def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun):
        """
        Executes the algorithm over the runs of one experiment in chunks of ``step_size`` runs.

        A successful execution's payloads are held back and only committed once the following
        execution has also succeeded, or no runs remain. When an execution returns 'not enough data'
        the next chunk of runs is added and the execution is repeated. If the data runs out after an
        earlier success, the dangling runs are merged with that previous success's runs and executed
        one final time (the ``else`` clause of the loop, reached only when the loop ends without ``break``).

        Each committed or failed result is appended to ``self.results`` and sent through the queue.

        Parameters:
            iteration: Iteration value forwarded to the machine's ``execute_runs`` calls.
            run_list: The ExpRuns to execute over; ``run`` passes one list per experiment.
            lowest_exprun: (exp, run) pair used as the lower edge of the first payload IoV.
            highest_exprun: (exp, run) pair used as the upper edge of the last payload IoV.
        """
        # The runs (data) we have left to execute from this run list
        remaining_runs = run_list[:]
        # The previous execution's runs
        previous_runs = []
        # The current runs we are executing
        current_runs = []
        # The last successful payload and result
        last_successful_payloads = None
        last_successful_result = None

        # Iterate over ExpRuns within an experiment in chunks of 'step_size'
        for expruns in grouper(self.algorithm.params["step_size"], run_list):
            # Already set up earlier the first time, so we shouldn't do it again
            if not self.first_execution:
                self.machine.setup_algorithm()
            else:
                self.first_execution = False

            # Add on the next step of runs
            current_runs.extend(expruns)
            # Remove them from our remaining runs
            remaining_runs = [run for run in remaining_runs if run not in current_runs]

            # Is this the first payload of the experiment
            if not last_successful_result:
                B2INFO("Detected that this will be the first payload of this experiment.")
                # If this is the first payload but we have other data, we need the IoV to cover from the
                # lowest IoV extent requested up to the ExpRun right before the next run in the remaining runs list.
                if remaining_runs:
                    apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1)
                # If this is the first payload but there isn't more data, we set the IoV to cover the full range
                else:
                    B2INFO("Detected that this will be the only payload of the experiment.")
                    apply_iov = IoV(*lowest_exprun, *highest_exprun)
            # If there were previous successes
            else:
                if not remaining_runs:
                    B2INFO("Detected that there are no more runs to execute in this experiment after this next execution.")
                    apply_iov = IoV(*current_runs[0], *highest_exprun)
                # Otherwise, it's just a normal IoV in the middle.
                else:
                    B2INFO("Detected that there are more runs to execute in this experiment after this next execution.")
                    apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1)

            B2INFO(f"Executing and applying {apply_iov} to the payloads.")
            self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
            B2INFO(f"Finished execution with result code {self.machine.result.result}.")

            # Does this count as a successful execution?
            if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
                self.machine.complete()
                # If we've succeeded but we have a previous success we can commit the previous payloads
                # since we have new ones ready
                if last_successful_payloads and last_successful_result:
                    B2INFO("Saving this execution's payloads to be committed later.")
                    # Save the payloads and result
                    new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                    new_successful_result = self.machine.result
                    B2INFO("We just succeded in execution of the Algorithm."
                           f" Will now commit payloads from the previous success for {last_successful_result.iov}.")
                    self.machine.algorithm.algorithm.commit(last_successful_payloads)
                    self.results.append(last_successful_result)
                    self.send_result(last_successful_result)
                    # If there are remaining runs we need to have the current payloads ready to commit after the next execution
                    if remaining_runs:
                        last_successful_payloads = new_successful_payloads
                        last_successful_result = new_successful_result
                    # If there's not more runs to process we should also commit the new ones
                    else:
                        B2INFO("We have no more runs to process. "
                               f"Will now commit the most recent payloads for {new_successful_result.iov}.")
                        self.machine.algorithm.algorithm.commit(new_successful_payloads)
                        self.results.append(new_successful_result)
                        self.send_result(new_successful_result)
                        break
                # if there's no previous success this must be the first run executed
                else:
                    # Need to save payloads for later if we have a success but runs remain
                    if remaining_runs:
                        B2INFO(f"Saving the most recent payloads for {self.machine.result.iov} to be committed later.")
                        # Save the payloads and result
                        last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                        last_successful_result = self.machine.result
                    # Need to commit and exit if we have a success and no remaining data
                    else:
                        B2INFO("We just succeeded in execution of the Algorithm."
                               " No runs left to be processed, so we are committing results of this execution.")
                        self.machine.algorithm.algorithm.commit()
                        self.results.append(self.machine.result)
                        self.send_result(self.machine.result)
                        break

                # Remember which runs made up this success in case dangling runs must be merged with them later
                previous_runs = current_runs[:]
                current_runs = []
            # If it wasn't successful, was it due to lack of data in the runs?
            elif (self.machine.result.result == AlgResult.not_enough_data.value):
                B2INFO(f"There wasn't enough data in {self.machine.result.iov}.")
                if remaining_runs:
                    B2INFO("Some runs remain to be processed. "
                           f"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.")
                elif not remaining_runs and not last_successful_result:
                    B2ERROR("There aren't any more runs remaining to merge with, and we never had a previous success."
                            " There wasn't enough data in the full input data requested.")
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                    self.machine.fail()
                    break
                elif not remaining_runs and last_successful_result:
                    B2INFO("There aren't any more runs remaining to merge with. But we had a previous success"
                           ", so we'll merge with the previous IoV.")
                    # Put the previous success's runs in front of the dangling runs for the final execution
                    final_runs = current_runs[:]
                    current_runs = previous_runs
                    current_runs.extend(final_runs)
                self.machine.fail()
            elif self.machine.result.result == AlgResult.failure.value:
                B2ERROR(f"{self.algorithm.name} returned failure exit code.")
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                self.machine.fail()
                break
        else:
            # Check if we need to run a final execution on the previous execution + dangling set of runs
            if current_runs:
                self.machine.setup_algorithm()
                apply_iov = IoV(last_successful_result.iov.exp_low,
                                last_successful_result.iov.run_low,
                                *highest_exprun)
                B2INFO(f"Executing on {apply_iov}.")
                self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
                B2INFO(f"Finished execution with result code {self.machine.result.result}.")
                if (self.machine.result.result == AlgResult.ok.value) or (
                        self.machine.result.result == AlgResult.iterate.value):
                    self.machine.complete()
                    # Commit all the payloads and send out the results
                    self.machine.algorithm.algorithm.commit()
                    # Save the result
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                else:
                    # Save the result
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                    # But failed
                    self.machine.fail()
528 
529 
531  """
532  Algorithm strategy to do run-by-run calibration of collected data.
533  Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
534 
535  This strategy differs from `SequentialRunByRun` in that it *will not merge run data* if the algorithm returns
536  'not enough data' on the current run.
537 
538  Once an execution on a run returns *any* result 'iterate', 'ok', 'not_enough_data', or 'failure', we move onto the
539  next run (if any are left).
540  Committing of payloads to the outputdb only happens for 'iterate' or 'ok' return codes.
541 
542  .. important:: Unlike most other strategies, this one won't immediately fail and return if a run returns a 'failure' exit
543  code.
544  The failure will prevent iteration/successful completion of the CAF though.
545 
546  .. warning:: Since this strategy doesn't try to merge data from runs, if *any* run in your input data doesn't contain
547  enough data to complete the algorithm successfully, you won't be able to get a successful calibration.
548  The CAF then won't allow you to iterate this calibration, or pass the constants onward to another calibration.
549  However, you will still have the database created that covers all the successful runs.
550 
551  This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
552  a CalibrationAlgorithm C++ class directly.
553 """
554 
    #: Collector granularities usable with this strategy.
    #: NOTE(review): not read in the visible code; presumably checked by whatever selects the strategy.
    allowed_granularities = ["run"]

    #: Algorithm parameters used by this strategy; empty because ``run`` reads none from ``algorithm.params``.
    usable_params = {}
559 
560  def __init__(self, algorithm):
561  """
562  """
563  super().__init__(algorithm)
564 
566  self.machine = AlgorithmMachine(self.algorithm)
567 
568  def run(self, iov, iteration, queue):
569  """
570  Runs the algorithm machine over the collected data and fills the results.
571  """
572 
573  if not self.is_valid():
574  raise StrategyError("This AlgorithmStrategy was not set up correctly!")
575  self.queue = queue
576 
577  B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
578  # Now add all the necessary parameters for a strategy to run
579  machine_params = {}
580  machine_params["database_chain"] = self.database_chain
581  machine_params["dependent_databases"] = self.dependent_databases
582  machine_params["output_dir"] = self.output_dir
583  machine_params["output_database_dir"] = self.output_database_dir
584  machine_params["input_files"] = self.input_files
585  machine_params["ignored_runs"] = self.ignored_runs
586  self.machine.setup_from_dict(machine_params)
587  # Start moving through machine states
588  B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
589  self.machine.setup_algorithm(iteration=iteration)
590  # After this point, the logging is in the stdout of the algorithm
591  B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
592 
593  all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
594  # If we were given a specific IoV to calibrate we just execute all runs in that IoV at once
595  if iov:
596  runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
597  else:
598  runs_to_execute = all_runs_collected
599 
600  # Remove the ignored runs from our run list to execute
601  if self.ignored_runs:
602  B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
603  runs_to_execute.difference_update(set(self.ignored_runs))
604  # Sets aren't ordered so lets go back to lists and sort
605  runs_to_execute = sorted(runs_to_execute)
606 
607  # Is this the first time executing the algorithm?
608  first_execution = True
609  for exprun in runs_to_execute:
610  if not first_execution:
611  self.machine.setup_algorithm()
612  current_runs = exprun
613  apply_iov = iov_from_runs([current_runs])
614  B2INFO(f"Executing on IoV = {apply_iov}.")
615  self.machine.execute_runs(runs=[current_runs], iteration=iteration, apply_iov=apply_iov)
616  first_execution = False
617  B2INFO(f"Finished execution with result code {self.machine.result.result}.")
618  # Does this count as a successful execution?
619  if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
620  # Commit the payloads and result
621  B2INFO(f"Committing payloads for {iov_from_runs([current_runs])}.")
622  self.machine.algorithm.algorithm.commit()
623  self.results.append(self.machine.result)
624  self.send_result(self.machine.result)
625  self.machine.complete()
626  # If it wasn't successful, was it due to lack of data in the runs?
627  elif (self.machine.result.result == AlgResult.not_enough_data.value):
628  B2INFO(f"There wasn't enough data in the IoV {iov_from_runs([current_runs])}.")
629  self.results.append(self.machine.result)
630  self.send_result(self.machine.result)
631  self.machine.fail()
632  elif self.machine.result.result == AlgResult.failure.value:
633  B2ERROR(f"Failure exit code in the IoV {iov_from_runs([current_runs])}.")
634  self.results.append(self.machine.result)
635  self.send_result(self.machine.result)
636  self.machine.fail()
637 
638  # Print any knowable gaps between result IoVs, if any are foun there is a problem.
639  gaps = self.find_iov_gaps()
640  # Dump them to a file for logging
641  with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
642  json.dump(gaps, f)
643 
644  self.send_final_state(self.COMPLETED)
645 
646 
648  """
649  Algorithm strategy to first calculate run boundaries where execution should be attempted.
650  Runs the algorithm over the input data contained within the requested IoV of the boundaries,
651  starting with the first boundary data only.
652  If the algorithm returns 'not enough data' on the current boundary IoV, it won't commit the payloads,
653  but instead adds the next boundary's data and tries again. Basically the same logic as `SequentialRunByRun`
654  but using run boundaries instead of runs directly.
655  Notice that boundaries cannot span multiple experiments.
656 
657  By default the strategy will get the payload boundaries directly from the algorithm, which needs to
658  have implemented the function ``isBoundaryRequired``. If the desired boundaries are already known it
659  is possible to pass them directly setting the algorithm parameter ``payload_boundaries`` and avoid
660  the need to define the ``isBoundaryRequired`` function.
661 
662  ``payload_boundaries`` is a list ``[(exp1, run1), (exp2, run2), ...]``. A boundary at the beginning of each
663  experiment will be added if not already present. An empty list will thus produce a single payload for each
664  experiment. A ``payload_boundaries`` set to ``None`` is equivalent to not passing it and restores the default
665  behaviour where the boundaries are computed in the ``isBoundaryRequired`` function of the algorithm.
666  """
667 
    #: Algorithm parameters that this strategy reads in ``run``: ``iov_coverage`` (an IoV) and
    #: ``payload_boundaries`` (a list of (exp, run) pairs).
    #: NOTE(review): nothing visible here enforces the types; this looks purely informational.
    usable_params = {
        "iov_coverage": IoV,
        "payload_boundaries": []  # [(exp1, run1), (exp2, run2), ...]
    }

    #: Collector granularities usable with this strategy.
    #: NOTE(review): not read in the visible code; presumably checked by whatever selects the strategy.
    allowed_granularities = ["run"]
676 
677  def __init__(self, algorithm):
678  """
679  """
680  super().__init__(algorithm)
681 
683  self.machine = AlgorithmMachine(self.algorithm)
684  self.first_execution = True
685 
def run(self, iov, iteration, queue):
    """
    Runs the algorithm machine over the collected data and fills the results.

    The collected runs are split into one run list per experiment. For every experiment the
    payload boundaries are taken either from ``self.algorithm.params["payload_boundaries"]`` or
    from the algorithm's ``findPayloadBoundaries()``, converted into boundary IoVs with their
    run lists, and then executed through :py:meth:`execute_over_boundaries`.

    Parameters:
        iov: If it evaluates to True, only the collected runs overlapping this IoV are
            executed. Otherwise all collected runs are used.
        iteration: Iteration value passed to the machine's ``setup_algorithm()`` and on to
            every execution.
        queue: Stored as ``self.queue``; the multiprocessing Queue used to pass results back.

    Raises:
        StrategyError: If ``is_valid()`` reports that the strategy was not set up correctly.
    """
    if not self.is_valid():
        raise StrategyError("This AlgorithmStrategy was not set up correctly!")
    self.queue = queue
    B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
    # Now add all the necessary parameters for a strategy to run
    machine_params = {
        "database_chain": self.database_chain,
        "dependent_databases": self.dependent_databases,
        "output_dir": self.output_dir,
        "output_database_dir": self.output_database_dir,
        "input_files": self.input_files,
        "ignored_runs": self.ignored_runs,
    }
    self.machine.setup_from_dict(machine_params)
    # Start moving through machine states
    self.machine.setup_algorithm(iteration=iteration)
    # After this point, the logging is in the stdout of the algorithm
    B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
    all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
    # If we were given a specific IoV to calibrate we just execute over runs in that IoV
    if iov:
        runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
    else:
        runs_to_execute = all_runs_collected[:]

    # Remove the ignored runs from our run list to execute.
    # We filter rather than call difference_update(): without an IoV, runs_to_execute is a
    # sliced copy of a sequence, and sequences have no difference_update() method.
    if self.ignored_runs:
        B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
        ignored_runs = set(self.ignored_runs)
        runs_to_execute = [run for run in runs_to_execute if run not in ignored_runs]
    # The runs may have come from an unordered container, so always end up with a sorted list
    runs_to_execute = sorted(runs_to_execute)

    # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
    # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
    # separately and prevent IoVs from crossing the boundary.
    runs_to_execute = split_runs_by_exp(runs_to_execute)

    # Now iterate through the experiments. We DO NOT allow a payload IoV to
    # extend over multiple experiments, only multiple runs
    iov_coverage = None
    if "iov_coverage" in self.algorithm.params:
        B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
        iov_coverage = self.algorithm.params["iov_coverage"]

    payload_boundaries = None
    if "payload_boundaries" in self.algorithm.params:
        B2INFO(f"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.")
        payload_boundaries = self.algorithm.params["payload_boundaries"]

    number_of_experiments = len(runs_to_execute)
    B2INFO(f"We are iterating over {number_of_experiments} experiments.")

    # Iterate over experiment run lists
    for i_exp, run_list in enumerate(runs_to_execute, start=1):
        B2DEBUG(26, f"Run List for this experiment={run_list}")
        current_experiment = run_list[0].exp
        B2INFO(f"Executing over data from experiment {current_experiment}")
        # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
        # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
        # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
        # This is only true for the lowest and highest experiments in our input data.
        if i_exp == 1:
            if iov_coverage:
                lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
            else:
                lowest_exprun = run_list[0]
        # We are calibrating across multiple experiments so we shouldn't start from the middle but from the 0th run
        else:
            lowest_exprun = ExpRun(current_experiment, 0)

        # Override the normal value for the highest ExpRun (from data) if iov_coverage was set
        if iov_coverage and i_exp == number_of_experiments:
            highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
        # If we have more experiments to execute then we will be setting the final payload IoV in this experiment
        # to be unbounded
        elif i_exp < number_of_experiments:
            highest_exprun = ExpRun(current_experiment, -1)
        # Otherwise just get the values from data
        else:
            highest_exprun = run_list[-1]

        # Find the boundaries for this experiment's runs
        vec_run_list = vector_from_runs(run_list)
        if payload_boundaries is None:
            # Find the boundaries using the findPayloadBoundaries implemented in the algorithm
            B2INFO("Attempting to find payload boundaries.")
            vec_boundaries = self.algorithm.algorithm.findPayloadBoundaries(vec_run_list)
            # If this vector is empty then that's bad. Maybe the isBoundaryRequired function
            # wasn't implemented? Either way we should stop.
            if vec_boundaries.empty():
                B2ERROR("No boundaries found but we are in a strategy that requires them! Failing...")
                # Tell the Runner that we have failed
                self.send_final_state(self.FAILED)
                break
            vec_boundaries = runs_from_vector(vec_boundaries)
        else:
            # Using boundaries set by user
            B2INFO(f"Using as payload boundaries {payload_boundaries}.")
            vec_boundaries = [ExpRun(exp, run) for exp, run in payload_boundaries]
            # No need to check that vec_boundaries is not empty. In case it is we will anyway add
            # a boundary at the first run of each experiment.
        # Remove any boundaries not from the current experiment (only likely if they were set manually)
        # We sort just to make everything easier later and just in case something mad happened.
        run_boundaries = sorted([er for er in vec_boundaries if er.exp == current_experiment])
        # In this strategy we consider each experiment separately. We therefore check that the
        # boundary (exp, 0) is present and if not we add it. It is indeed possible to miss it
        # if the boundaries were given manually
        first_exprun = ExpRun(current_experiment, 0)
        if first_exprun not in run_boundaries:
            B2WARNING(f"No boundary found at ({current_experiment}, 0), adding it.")
            run_boundaries.insert(0, first_exprun)
        B2INFO((f"Found {len(run_boundaries)} boundaries for this experiment. "
                "Checking if we have some data for all boundary IoVs..."))
        # First figure out the run lists to use for each execution (potentially different from the applied IoVs)
        # We use the boundaries and the run_list
        boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list)
        B2DEBUG(26, f"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}")
        # If there were any boundary IoVs with no run data, just remove them. Otherwise they will execute over all data.
        boundary_iovs_to_run_lists = {key: value for key, value in boundary_iovs_to_run_lists.items() if value}
        B2DEBUG(26, f"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}")
        # If any were removed then we might have gaps between the boundary IoVs. Fix those now by merging IoVs.
        # Each boundary IoV is only stored once its successor is known, because its upper edge
        # may first have to be stretched up to that successor.
        new_boundary_iovs_to_run_lists = {}
        previous_boundary_iov = None
        previous_boundary_run_list = None
        for boundary_iov, boundary_run_list in boundary_iovs_to_run_lists.items():
            if not previous_boundary_iov:
                previous_boundary_iov = boundary_iov
                previous_boundary_run_list = boundary_run_list
                continue
            # We are definitely dealing with IoVs from one experiment so we can make assumptions here
            if previous_boundary_iov.run_high != (boundary_iov.run_low - 1):
                B2WARNING("Gap in boundary IoVs found before execution! "
                          "Will correct it by extending the previous boundary up to the next one.")
                B2INFO(f"Original boundary IoV={previous_boundary_iov}")
                previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low,
                                            previous_boundary_iov.exp_high, boundary_iov.run_low - 1)
                B2INFO(f"New boundary IoV={previous_boundary_iov}")
            new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
            previous_boundary_iov = boundary_iov
            previous_boundary_run_list = boundary_run_list
        # Store the final boundary IoV, which has no successor. Skip this when nothing survived
        # the filtering above, so that we never insert a bogus {None: None} entry.
        if previous_boundary_iov is not None:
            new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
        boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists
        B2DEBUG(26, f"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}")
        # Actually execute now that we have an IoV list to apply
        success = self.execute_over_boundaries(boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
        if not success:
            # Tell the Runner that we have failed
            self.send_final_state(self.FAILED)
            break
    # Only executes if we didn't fail any experiment execution (i.e. the loop never hit a break)
    else:
        # Print any knowable gaps between result IoVs, if any are found there is a problem, but not necessarily too bad.
        gaps = self.find_iov_gaps()
        if gaps:
            B2WARNING("There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.")
            # Dump them to a file for logging
            with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
                json.dump(gaps, f)

        # If any results weren't successes we fail
        if self.any_failed_iov():
            self.send_final_state(self.FAILED)
        else:
            self.send_final_state(self.COMPLETED)
855 
def execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration):
    """
    Take the previously found boundaries and the run lists they correspond to and actually perform the
    Algorithm execution. This is assumed to be for a single experiment.

    Boundaries are processed in sorted order. A successful execution is not committed straight
    away: its payloads and result are held back until the next success (or the end of the
    boundaries), so that a trailing boundary without enough data can still be merged into it.

    Parameters:
        boundary_iovs_to_run_lists (dict): Maps each boundary IoV to the list of runs to execute
            for it. Neither the dict nor its lists are modified.
        lowest_exprun: Unpacked into the low edge of the first applied IoV.
        highest_exprun: Unpacked into the high edge of the applied IoV once the last boundary
            has been reached.
        iteration: Passed on to every call of :py:meth:`execute_runs`.

    Returns:
        bool: True once every boundary has been consumed and the last held-back payloads were
        committed. False if there were no boundaries, the data ran out while the algorithm still
        reported not enough data, or an execution returned any other non-success result.
    """
    # Sorted list of the boundary IoVs still to be consumed (iterating the dict yields its keys)
    remaining_boundary_iovs = sorted(boundary_iovs_to_run_lists)

    # The current runs we are executing
    current_runs = []
    # The IoV of the current boundary(s)
    current_boundary_iov = None
    # The current execution's applied IoV, may be different to the boundary IoV
    current_iov = None

    # The last successful payload list and result. We hold on to them so that we can commit or discard later.
    last_successful_payloads = None
    last_successful_result = None
    # The previous execution's runs
    last_successful_runs = []
    # The previous execution's applied IoV
    last_successful_iov = None

    while True:
        # Do we have previous successes?
        if not last_successful_result:
            if not current_runs:
                # Did we actually have any boundaries?
                if not remaining_boundary_iovs:
                    # Fail because we have no boundaries to use
                    B2ERROR("No boundaries found for the current experiment's run list. Failing the strategy.")
                    return False

                B2INFO("This appears to be the first attempted execution of the experiment.")
                # Attempt to execute on the first boundary
                current_boundary_iov = remaining_boundary_iovs.pop(0)
                # Copy, because current_runs may be extended later and the caller's list must stay untouched
                current_runs = list(boundary_iovs_to_run_lists[current_boundary_iov])
                # What if there is only one boundary? Need to apply the highest exprun
                if not remaining_boundary_iovs:
                    current_iov = IoV(*lowest_exprun, *highest_exprun)
                else:
                    current_iov = IoV(*lowest_exprun, current_boundary_iov.exp_high, current_boundary_iov.run_high)
            # Returned not enough data from first execution
            else:
                # Any remaining boundaries?
                if not remaining_boundary_iovs:
                    # Fail because we have no boundaries to use
                    B2ERROR("Not enough data found for the current experiment's run list. Failing the strategy.")
                    return False

                B2INFO("There wasn't enough data previously. Merging with the runs from the next boundary.")
                # Extend the previous run lists/iovs
                next_boundary_iov = remaining_boundary_iovs.pop(0)
                current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
                                           next_boundary_iov.exp_high, next_boundary_iov.run_high)
                current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
                # At the last boundary? Need to apply the highest exprun
                if not remaining_boundary_iovs:
                    current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
                else:
                    current_iov = IoV(current_iov.exp_low, current_iov.run_low,
                                      current_boundary_iov.exp_high, current_boundary_iov.run_high)

            self.execute_runs(current_runs, iteration, current_iov)

            # Does this count as a successful execution?
            if self.alg_success():
                # Nothing is being held back yet, so just remember this success
                B2INFO("Found a success. Will save the payloads for later.")
                # Save success
                last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                last_successful_result = self.machine.result
                last_successful_runs = current_runs[:]
                last_successful_iov = current_iov
                # Reset values for next loop
                current_runs = []
                current_boundary_iov = None
                current_iov = None
                self.machine.complete()
                continue
            elif self.machine.result.result == AlgResult.not_enough_data.value:
                B2INFO("Not Enough Data result.")
                # Just complete but leave the current runs alone for next loop
                self.machine.complete()
                continue
            else:
                B2ERROR("Hit a failure or some kind of result we can't continue from. Failing out...")
                self.machine.fail()
                return False
        # Previous result exists
        else:
            # Previous loop was a success
            if not current_runs:
                # Remaining boundaries?
                if not remaining_boundary_iovs:
                    # Out of data, can now commit
                    B2INFO("Finished this experiment's boundaries. "
                           f"Committing remaining payloads from {last_successful_result.iov}")
                    self.machine.algorithm.algorithm.commit(last_successful_payloads)
                    self.results.append(last_successful_result)
                    self.send_result(last_successful_result)
                    return True

                # Remaining boundaries exist so we try to execute
                current_boundary_iov = remaining_boundary_iovs.pop(0)
                # Copy, because current_runs may be extended later and the caller's list must stay untouched
                current_runs = list(boundary_iovs_to_run_lists[current_boundary_iov])
                # What if there is only one boundary? Need to apply the highest exprun
                if not remaining_boundary_iovs:
                    current_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, *highest_exprun)
                else:
                    current_iov = current_boundary_iov

            # Returned not enough data from last execution
            else:
                # Any remaining boundaries?
                if not remaining_boundary_iovs:
                    B2INFO("We have no remaining runs to increase the amount of data. "
                           "Instead we will merge with the previous successful runs.")
                    # Merge with previous success IoV
                    new_current_runs = last_successful_runs[:]
                    new_current_runs.extend(current_runs)
                    current_runs = new_current_runs[:]
                    current_iov = IoV(last_successful_iov.exp_low, last_successful_iov.run_low,
                                      current_iov.exp_high, current_iov.run_high)
                    # We reset the last successful stuff because we are dropping it
                    last_successful_payloads = []
                    last_successful_result = None
                    last_successful_runs = []
                    last_successful_iov = None

                else:
                    B2INFO("Since there wasn't enough data previously, we will merge with the runs from the next boundary.")
                    # Extend the previous run lists/iovs
                    next_boundary_iov = remaining_boundary_iovs.pop(0)
                    current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
                                               next_boundary_iov.exp_high, next_boundary_iov.run_high)
                    # Extend previous execution's runs with the next set
                    current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
                    # At the last boundary? Need to apply the highest exprun
                    if not remaining_boundary_iovs:
                        current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
                    else:
                        current_iov = IoV(current_iov.exp_low, current_iov.run_low,
                                          current_boundary_iov.exp_high, current_boundary_iov.run_high)

            self.execute_runs(current_runs, iteration, current_iov)

            # Does this count as a successful execution?
            if self.alg_success():
                # Commit previous values we were holding onto
                B2INFO("Found a success.")
                # The held-back success was dropped if we just merged with it, hence the check
                if last_successful_result:
                    B2INFO("Can now commit the previous success.")
                    self.machine.algorithm.algorithm.commit(last_successful_payloads)
                    self.results.append(last_successful_result)
                    self.send_result(last_successful_result)
                # Replace last success
                last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                last_successful_result = self.machine.result
                last_successful_runs = current_runs[:]
                last_successful_iov = current_iov
                # Reset values for next loop
                current_runs = []
                current_boundary_iov = None
                current_iov = None
                self.machine.complete()
                continue
            elif self.machine.result.result == AlgResult.not_enough_data.value:
                B2INFO("Not Enough Data result.")
                # Just complete but leave the current runs alone for next loop
                self.machine.complete()
                continue
            else:
                B2ERROR("Hit a failure or some other result we can't continue from. Failing out...")
                self.machine.fail()
                return False
1032 
def execute_runs(self, runs, iteration, iov):
    """
    Execute the algorithm machine once over ``runs`` and apply ``iov`` to the payloads.

    Parameters:
        runs: Runs handed to the machine's ``execute_runs()`` as ``runs``.
        iteration: Handed to the machine's ``execute_runs()`` as ``iteration``.
        iov: Handed to the machine's ``execute_runs()`` as ``apply_iov``.
    """
    if self.first_execution:
        # The machine was already set up before the first execution, so only
        # note that the first execution is now being used up.
        self.first_execution = False
    else:
        # Every later execution needs the machine to be set up again
        self.machine.setup_algorithm()

    B2INFO(f"Executing and applying {iov} to the payloads.")
    self.machine.execute_runs(runs=runs, iteration=iteration, apply_iov=iov)
    B2INFO(f"Finished execution with result code {self.machine.result.result}.")
1043 
def alg_success(self):
    """
    Tell whether the machine's current result counts as a successful execution.

    Returns:
        True when the result code equals the value of ``AlgResult.ok`` or of ``AlgResult.iterate``.
    """
    result_code = self.machine.result.result
    if result_code == AlgResult.ok.value:
        return True
    return result_code == AlgResult.iterate.value
1046 
1047 
class StrategyError(Exception):
    """
    Exception type raised by the strategy classes of this module,
    e.g. when a strategy is run without having been set up correctly.
    """
strategies.AlgorithmStrategy.database_chain
database_chain
User defined database chain i.e.
Definition: strategies.py:81
strategies.SingleIOV
Definition: strategies.py:172
strategies.AlgorithmStrategy.output_dir
output_dir
The algorithm output directory which is mostly used to store the stdout file.
Definition: strategies.py:77
strategies.AlgorithmStrategy.find_iov_gaps
def find_iov_gaps(self)
Definition: strategies.py:125
strategies.AlgorithmStrategy.send_final_state
def send_final_state(self, state)
Definition: strategies.py:168
strategies.AlgorithmStrategy.input_files
input_files
Collector output files, will contain all files returned by the output patterns.
Definition: strategies.py:75
strategies.AlgorithmStrategy.setup_from_dict
def setup_from_dict(self, params)
Definition: strategies.py:99
strategies.SequentialBoundaries.run
def run(self, iov, iteration, queue)
Definition: strategies.py:686
strategies.AlgorithmStrategy.COMPLETED
string COMPLETED
Completed state.
Definition: strategies.py:64
strategies.AlgorithmStrategy.algorithm
algorithm
Algorithm() class that we're running.
Definition: strategies.py:73
strategies.SequentialRunByRun
Definition: strategies.py:251
strategies.SequentialRunByRun.apply_experiment_settings
def apply_experiment_settings(self, algorithm, experiment)
Definition: strategies.py:291
strategies.AlgorithmStrategy.run
def run(self, iov, iteration, queue)
Definition: strategies.py:93
strategies.AlgorithmStrategy.FAILED
string FAILED
Failed state.
Definition: strategies.py:67
strategies.SequentialBoundaries.execute_runs
def execute_runs(self, runs, iteration, iov)
Definition: strategies.py:1033
strategies.AlgorithmStrategy.send_result
def send_result(self, result)
Definition: strategies.py:165
strategies.AlgorithmStrategy
Definition: strategies.py:17
strategies.AlgorithmStrategy.dependent_databases
dependent_databases
CAF created local databases from previous calibrations that this calibration/algorithm depends on.
Definition: strategies.py:83
strategies.SequentialRunByRun.run
def run(self, iov, iteration, queue)
Definition: strategies.py:299
strategies.SimpleRunByRun
Definition: strategies.py:530
strategies.AlgorithmStrategy.queue
queue
The multiprocessing Queue we use to pass back results one at a time.
Definition: strategies.py:90
strategies.SimpleRunByRun.run
def run(self, iov, iteration, queue)
Definition: strategies.py:568
strategies.AlgorithmStrategy.output_database_dir
output_database_dir
The output database directory for the localdb that the algorithm will commit to.
Definition: strategies.py:79
strategies.SequentialBoundaries.execute_over_boundaries
def execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
Definition: strategies.py:856
strategies.SequentialRunByRun.machine
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
Definition: strategies.py:286
strategies.AlgorithmStrategy.results
results
The list of results objects which will be sent out before the end.
Definition: strategies.py:88
strategies.SequentialBoundaries
Definition: strategies.py:647
strategies.SequentialRunByRun.first_execution
first_execution
Definition: strategies.py:289
strategies.SingleIOV.run
def run(self, iov, iteration, queue)
Definition: strategies.py:192
strategies.SimpleRunByRun.machine
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
Definition: strategies.py:566
strategies.AlgorithmStrategy.required_true_attrs
list required_true_attrs
Attributes that must have a value that returns True when tested by :py:meth:is_valid.
Definition: strategies.py:51
strategies.SimpleRunByRun.__init__
def __init__(self, algorithm)
Definition: strategies.py:560
strategies.SingleIOV.machine
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
Definition: strategies.py:190
strategies.SequentialBoundaries.first_execution
first_execution
Definition: strategies.py:684
strategies.AlgorithmStrategy.required_attrs
list required_attrs
Required attributes that must exist before the strategy can run properly.
Definition: strategies.py:41
strategies.SequentialBoundaries.machine
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
Definition: strategies.py:683
strategies.AlgorithmStrategy.ignored_runs
ignored_runs
Runs that will not be included in ANY execution of the algorithm.
Definition: strategies.py:86
strategies.AlgorithmStrategy.is_valid
def is_valid(self)
Definition: strategies.py:107
strategies.SingleIOV.__init__
def __init__(self, algorithm)
Definition: strategies.py:184
strategies.SequentialBoundaries.__init__
def __init__(self, algorithm)
Definition: strategies.py:677
strategies.SequentialBoundaries.alg_success
def alg_success(self)
Definition: strategies.py:1044
strategies.SequentialRunByRun.__init__
def __init__(self, algorithm)
Definition: strategies.py:280
strategies.StrategyError
Definition: strategies.py:1048
strategies.AlgorithmStrategy.any_failed_iov
def any_failed_iov(self)
Definition: strategies.py:145
strategies.AlgorithmStrategy.__init__
def __init__(self, algorithm)
Definition: strategies.py:69