Belle II Software  release-06-02-00
strategies.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 
11 
12 from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
13 from caf.utils import AlgResult
14 from caf.utils import B2INFO_MULTILINE
15 from caf.utils import runs_overlapping_iov, runs_from_vector
16 from caf.utils import iov_from_runs, split_runs_by_exp, vector_from_runs
17 from caf.utils import find_gaps_in_iov_list, grouper, find_run_lists_from_boundaries
18 from caf.utils import IoV, ExpRun
19 from caf.state_machines import AlgorithmMachine
20 
21 from abc import ABC, abstractmethod
22 import json
23 
24 
25 class AlgorithmStrategy(ABC):
26  """
27  Base class for Algorithm strategies. These do the actual execution of a single
28  algorithm on collected data. Each strategy may be quite different in terms of how fast it may be,
29  how database payloads are passed between executions, and whether or not final payloads have an IoV
30  that is independent to the actual runs used to calculates them.
31 
32  Parameters:
33  algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run
34 
35  This base class defines the basic attributes and methods that will be automatically used by the selected AlgorithmRunner.
36  When defining a derived class you are free to use these attributes or to implement as much functionality as you want.
37 
38  If you define your derived class with an __init__ method, then you should first call the base class
39  `AlgorithmStrategy.__init__()` method via super() e.g.
40 
41  >>> def __init__(self):
42  >>> super().__init__()
43 
44  The most important method to implement is :py:meth:`AlgorithmStrategy.run` which will take an algorithm and execute it
45  in the required way defined by the options you have selected/attributes set.
46  """
47 
49  required_attrs = ["algorithm",
50  "database_chain",
51  "dependent_databases",
52  "output_dir",
53  "output_database_dir",
54  "input_files",
55  "ignored_runs"
56  ]
57 
58 
59  required_true_attrs = ["algorithm",
60  "output_dir",
61  "output_database_dir",
62  "input_files"
63  ]
64 
65 
66  allowed_granularities = ["run", "all"]
67 
68 
69  FINISHED_RESULTS = "DONE"
70 
71 
72  COMPLETED = "COMPLETED"
73 
74 
75  FAILED = "FAILED"
76 
77  def __init__(self, algorithm):
78  """
79  """
80 
81  self.algorithmalgorithm = algorithm
82 
83  self.input_filesinput_files = []
84 
85  self.output_diroutput_dir = ""
86 
87  self.output_database_diroutput_database_dir = ""
88 
89  self.database_chaindatabase_chain = []
90 
91  self.dependent_databasesdependent_databases = []
92 
94  self.ignored_runsignored_runs = []
95 
96  self.resultsresults = []
97 
98  self.queuequeue = None
99 
100  @abstractmethod
101  def run(self, iov, iteration, queue):
102  """
103  Abstract method that needs to be implemented. It will be called to actually execute the
104  algorithm.
105  """
106 
107  def setup_from_dict(self, params):
108  """
109  Parameters:
110  params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name.
111  """
112  for attribute_name, value in params.items():
113  setattr(self, attribute_name, value)
114 
115  def is_valid(self):
116  """
117  Returns:
118  bool: Whether or not this strategy has been set up correctly with all its necessary attributes.
119  """
120  B2INFO("Checking validity of current AlgorithmStrategy setup.")
121  # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
122  for attribute_name in self.required_attrsrequired_attrs:
123  if not hasattr(self, attribute_name):
124  B2ERROR(f"AlgorithmStrategy attribute {attribute_name} doesn't exist.")
125  return False
126  # Check if any attributes that need actual values haven't been set or were empty
127  for attribute_name in self.required_true_attrsrequired_true_attrs:
128  if not getattr(self, attribute_name):
129  B2ERROR(f"AlgorithmStrategy attribute {attribute_name} returned False.")
130  return False
131  return True
132 
133  def find_iov_gaps(self):
134  """
135  Finds and prints the current gaps between the IoVs of the strategy results. Basically these are the IoVs
136  not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps
137  within the same experiment are found.
138 
139  Returns:
140  iov_gaps(list[IoV])
141  """
142  iov_gaps = find_gaps_in_iov_list(sorted([result.iov for result in self.resultsresults]))
143  if iov_gaps:
144  gap_msg = ["Found gaps between IoVs of algorithm results (regardless of result)."]
145  gap_msg.append("You may have requested these gaps deliberately by not passing in data containing these runs.")
146  gap_msg.append("This may not be a problem, but you will not have payoads defined for these IoVs")
147  gap_msg.append("unless you edit the final database.txt yourself.")
148  B2INFO_MULTILINE(gap_msg)
149  for iov in iov_gaps:
150  B2INFO(f"{iov} not covered by any execution of the algorithm.")
151  return iov_gaps
152 
153  def any_failed_iov(self):
154  """
155  Returns:
156  bool: If any result in the current results list has a failed algorithm code we return True
157  """
158  failed_results = []
159  for result in self.resultsresults:
160  if result.result == AlgResult.failure.value or result.result == AlgResult.not_enough_data.value:
161  failed_results.append(result)
162  if failed_results:
163  B2WARNING("Failed results found.")
164  for result in failed_results:
165  if result.result == AlgResult.failure.value:
166  B2ERROR(f"c_Failure returned for {result.iov}.")
167  elif result.result == AlgResult.not_enough_data.value:
168  B2WARNING(f"c_NotEnoughData returned for {result.iov}.")
169  return True
170  else:
171  return False
172 
173  def send_result(self, result):
174  self.queuequeue.put({"type": "result", "value": result})
175 
176  def send_final_state(self, state):
177  self.queuequeue.put({"type": "final_state", "value": state})
178 
179 
181  """The fastest and simplest Algorithm strategy. Runs the algorithm only once over all of the input
182  data or only the data corresponding to the requested IoV. The payload IoV is the set to the same as the one
183  that was executed.
184 
185  This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
186  a CalibrationAlgorithm C++ class directly.
187 """
188 
190  usable_params = {"apply_iov": IoV}
191 
192  def __init__(self, algorithm):
193  """
194  """
195  super().__init__(algorithm)
196 
198  self.machinemachine = AlgorithmMachine(self.algorithmalgorithm)
199 
200  def run(self, iov, iteration, queue):
201  """
202  Runs the algorithm machine over the collected data and fills the results.
203  """
204  if not self.is_validis_valid():
205  raise StrategyError("This AlgorithmStrategy was not set up correctly!")
206  self.queuequeuequeue = queue
207 
208  B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
209  # Now add all the necessary parameters for a strategy to run
210  machine_params = {}
211  machine_params["database_chain"] = self.database_chaindatabase_chain
212  machine_params["dependent_databases"] = self.dependent_databasesdependent_databases
213  machine_params["output_dir"] = self.output_diroutput_dir
214  machine_params["output_database_dir"] = self.output_database_diroutput_database_dir
215  machine_params["input_files"] = self.input_filesinput_files
216  machine_params["ignored_runs"] = self.ignored_runsignored_runs
217  self.machinemachine.setup_from_dict(machine_params)
218  # Start moving through machine states
219  B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
220  self.machinemachine.setup_algorithm(iteration=iteration)
221  # After this point, the logging is in the stdout of the algorithm
222  B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
223 
224  all_runs_collected = set(runs_from_vector(self.algorithmalgorithm.algorithm.getRunListFromAllData()))
225  # If we were given a specific IoV to calibrate we just execute all runs in that IoV at once
226  if iov:
227  runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
228  else:
229  runs_to_execute = all_runs_collected
230 
231  # Remove the ignored runs from our run list to execute
232  if self.ignored_runsignored_runs:
233  B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
234  runs_to_execute.difference_update(set(self.ignored_runsignored_runs))
235  # Sets aren't ordered so lets go back to lists and sort
236  runs_to_execute = sorted(runs_to_execute)
237  apply_iov = None
238  if "apply_iov" in self.algorithmalgorithm.params:
239  apply_iov = self.algorithmalgorithm.params["apply_iov"]
240  self.machinemachine.execute_runs(runs=runs_to_execute, iteration=iteration, apply_iov=apply_iov)
241  B2INFO(f"Finished execution with result code {self.machine.result.result}.")
242 
243  # Send out the result to the runner
244  self.send_resultsend_result(self.machinemachine.result)
245 
246  # Make sure the algorithm state and commit is done
247  if (self.machinemachine.result.result == AlgResult.ok.value) or (self.machinemachine.result.result == AlgResult.iterate.value):
248  # Valid exit codes mean we can complete properly
249  self.machinemachine.complete()
250  # Commit all the payloads and send out the results
251  self.machinemachine.algorithm.algorithm.commit()
252  self.send_final_statesend_final_state(self.COMPLETEDCOMPLETED)
253  else:
254  # Either there wasn't enough data or the algorithm failed
255  self.machinemachine.fail()
256  self.send_final_statesend_final_state(self.FAILEDFAILED)
257 
258 
260  """
261  Algorithm strategy to do run-by-run calibration of collected data.
262  Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
263  If the algorithm returns 'not enough data' on the current run set, it won't commit the payloads, but instead adds
264  the next run's data and tries again.
265 
266  Once an execution on a set of runs return 'iterate' or 'ok' we move onto the next runs (if any are left)
267  and start the same procedure again. Committing of payloads to the outputdb only happens once we're sure that there
268  is enough data in the remaining runs to get a full execution. If there isn't enough data remaining, the last runs
269  are merged with the previous successful execution's runs and a final execution is performed on all remaining runs.
270 
271  Additionally this strategy will automatically make sure that IoV gaps in your input data are covered by a payload.
272  This means that there shouldn't be any IoVs that don't get a new payload by the end of runnning an iteration.
273 
274  This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
275  a CalibrationAlgorithm C++ class directly.
276 """
277 
279  usable_params = {
280  "has_experiment_settings": bool,
281  "iov_coverage": IoV,
282  "step_size": int
283  }
284 
285 
286  allowed_granularities = ["run"]
287 
288  def __init__(self, algorithm):
289  """
290  """
291  super().__init__(algorithm)
292 
294  self.machinemachine = AlgorithmMachine(self.algorithmalgorithm)
295  if "step_size" not in self.algorithmalgorithm.params:
296  self.algorithmalgorithm.params["step_size"] = 1
297  self.first_executionfirst_execution = True
298 
299  def apply_experiment_settings(self, algorithm, experiment):
300  """
301  Apply experiment-dependent settings.
302  This is the default version, which does not do anything.
303  If necessary, it should be reimplemented by derived classes.
304  """
305  return
306 
307  def run(self, iov, iteration, queue):
308  """
309  Runs the algorithm machine over the collected data and fills the results.
310  """
311  if not self.is_valid():
312  raise StrategyError("This AlgorithmStrategy was not set up correctly!")
313  self.queue = queue
314  B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
315  # Now add all the necessary parameters for a strategy to run
316  machine_params = {}
317  machine_params["database_chain"] = self.database_chain
318  machine_params["dependent_databases"] = self.dependent_databases
319  machine_params["output_dir"] = self.output_dir
320  machine_params["output_database_dir"] = self.output_database_dir
321  machine_params["input_files"] = self.input_files
322  machine_params["ignored_runs"] = self.ignored_runs
323  self.machine.setup_from_dict(machine_params)
324  # Start moving through machine states
325  self.machine.setup_algorithm(iteration=iteration)
326  # After this point, the logging is in the stdout of the algorithm
327  B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
328  runs_to_execute = []
329  all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
330  # If we were given a specific IoV to calibrate we just execute over runs in that IoV
331  if iov:
332  runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
333  else:
334  runs_to_execute = all_runs_collected[:]
335 
336  # Remove the ignored runs from our run list to execute
337  if self.ignored_runs:
338  B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
339  runs_to_execute.difference_update(set(self.ignored_runs))
340  # Sets aren't ordered so lets go back to lists and sort
341  runs_to_execute = sorted(runs_to_execute)
342 
343  # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
344  # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
345  # separately and prevent IoVs from crossing the boundary.
346  runs_to_execute = split_runs_by_exp(runs_to_execute)
347 
348  # Now iterate through the experiments, executing runs in blocks of 'step_size'. We DO NOT allow a payload IoV to
349  # extend over multiple experiments, only multiple runs
350  iov_coverage = None
351  if "iov_coverage" in self.algorithm.params:
352  B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
353  iov_coverage = self.algorithm.params["iov_coverage"]
354 
355  number_of_experiments = len(runs_to_execute)
356  # Iterate over experiment run lists
357  for i_exp, run_list in enumerate(runs_to_execute, start=1):
358 
359  # Apply experiment-dependent settings.
360  if "has_experiment_settings" in self.algorithm.params:
361  if self.algorithm.params["has_experiment_settings"]:
362  self.apply_experiment_settings(self.machine.algorithm.algorithm, run_list[0].exp)
363 
364  # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
365  # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
366  # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
367  # This is only true for the lowest and highest experiments in our input data.
368  # If we have multiple experiments the iov must be adjusted to avoid gaps at the iov boundaries
369  lowest_exprun = ExpRun(run_list[0].exp, 0)
370  highest_exprun = ExpRun(run_list[-1].exp, -1)
371 
372  if i_exp == 1:
373  lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low) if iov_coverage else run_list[0]
374  if i_exp == number_of_experiments:
375  highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high) if iov_coverage else run_list[-1]
376 
377  self.execute_over_run_list(iteration, run_list, lowest_exprun, highest_exprun)
378 
379  # Print any knowable gaps between result IoVs, if any are foun there is a problem.
380  gaps = self.find_iov_gaps()
381  # Dump them to a file for logging
382  with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
383  json.dump(gaps, f)
384 
385  # If any results weren't successes we fail
386  if self.any_failed_iov():
387  self.send_final_state(self.FAILED)
388  else:
389  self.send_final_state(self.COMPLETED)
390 
391  def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun):
392  # The runs (data) we have left to execute from this run list
393  remaining_runs = run_list[:]
394  # The previous execution's runs
395  previous_runs = []
396  # The current runs we are executing
397  current_runs = []
398  # The last successful payload and result
399  last_successful_payloads = None
400  last_successful_result = None
401 
402  # Iterate over ExpRuns within an experiment in chunks of 'step_size'
403  for expruns in grouper(self.algorithm.params["step_size"], run_list):
404  # Already set up earlier the first time, so we shouldn't do it again
405  if not self.first_execution:
406  self.machine.setup_algorithm()
407  else:
408  self.first_execution = False
409 
410  # Add on the next step of runs
411  current_runs.extend(expruns)
412  # Remove them from our remaining runs
413  remaining_runs = [run for run in remaining_runs if run not in current_runs]
414 
415  # Is this the first payload of the experiment
416  if not last_successful_result:
417  B2INFO("Detected that this will be the first payload of this experiment.")
418  # If this is the first payload but we have other data, we need the IoV to cover from the
419  # lowest IoV extent requested up to the ExpRun right before the next run in the remaining runs list.
420  if remaining_runs:
421  apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1)
422  # If this is the first payload but there isn't more data, we set the IoV to cover the full range
423  else:
424  B2INFO("Detected that this will be the only payload of the experiment.")
425  apply_iov = IoV(*lowest_exprun, *highest_exprun)
426  # If there were previous successes
427  else:
428  if not remaining_runs:
429  B2INFO("Detected that there are no more runs to execute in this experiment after this next execution.")
430  apply_iov = IoV(*current_runs[0], *highest_exprun)
431  # Othewise, it's just a normal IoV in the middle.
432  else:
433  B2INFO("Detected that there are more runs to execute in this experiment after this next execution.")
434  apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1)
435 
436  B2INFO(f"Executing and applying {apply_iov} to the payloads.")
437  self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
438  B2INFO(f"Finished execution with result code {self.machine.result.result}.")
439 
440  # Does this count as a successful execution?
441  if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
442  self.machine.complete()
443  # If we've succeeded but we have a previous success we can commit the previous payloads
444  # since we have new ones ready
445  if last_successful_payloads and last_successful_result:
446  B2INFO("Saving this execution's payloads to be committed later.")
447  # Save the payloads and result
448  new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
449  new_successful_result = self.machine.result
450  B2INFO("We just succeded in execution of the Algorithm."
451  f" Will now commit payloads from the previous success for {last_successful_result.iov}.")
452  self.machine.algorithm.algorithm.commit(last_successful_payloads)
453  self.results.append(last_successful_result)
454  self.send_result(last_successful_result)
455  # If there are remaining runs we need to have the current payloads ready to commit after the next execution
456  if remaining_runs:
457  last_successful_payloads = new_successful_payloads
458  last_successful_result = new_successful_result
459  # If there's not more runs to process we should also commit the new ones
460  else:
461  B2INFO("We have no more runs to process. "
462  f"Will now commit the most recent payloads for {new_successful_result.iov}.")
463  self.machine.algorithm.algorithm.commit(new_successful_payloads)
464  self.results.append(new_successful_result)
465  self.send_result(new_successful_result)
466  break
467  # if there's no previous success this must be the first run executed
468  else:
469  # Need to save payloads for later if we have a success but runs remain
470  if remaining_runs:
471  B2INFO(f"Saving the most recent payloads for {self.machine.result.iov} to be committed later.")
472  # Save the payloads and result
473  last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
474  last_successful_result = self.machine.result
475  # Need to commit and exit if we have a success and no remaining data
476  else:
477  B2INFO("We just succeeded in execution of the Algorithm."
478  " No runs left to be processed, so we are committing results of this execution.")
479  self.machine.algorithm.algorithm.commit()
480  self.results.append(self.machine.result)
481  self.send_result(self.machine.result)
482  break
483 
484  previous_runs = current_runs[:]
485  current_runs = []
486  # If it wasn't successful, was it due to lack of data in the runs?
487  elif (self.machine.result.result == AlgResult.not_enough_data.value):
488  B2INFO(f"There wasn't enough data in {self.machine.result.iov}.")
489  if remaining_runs:
490  B2INFO("Some runs remain to be processed. "
491  f"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.")
492  elif not remaining_runs and not last_successful_result:
493  B2ERROR("There aren't any more runs remaining to merge with, and we never had a previous success."
494  " There wasn't enough data in the full input data requested.")
495  self.results.append(self.machine.result)
496  self.send_result(self.machine.result)
497  self.machine.fail()
498  break
499  elif not remaining_runs and last_successful_result:
500  B2INFO("There aren't any more runs remaining to merge with. But we had a previous success"
501  ", so we'll merge with the previous IoV.")
502  final_runs = current_runs[:]
503  current_runs = previous_runs
504  current_runs.extend(final_runs)
505  self.machine.fail()
506  elif self.machine.result.result == AlgResult.failure.value:
507  B2ERROR(f"{self.algorithm.name} returned failure exit code.")
508  self.results.append(self.machine.result)
509  self.send_result(self.machine.result)
510  self.machine.fail()
511  break
512  else:
513  # Check if we need to run a final execution on the previous execution + dangling set of runs
514  if current_runs:
515  self.machine.setup_algorithm()
516  apply_iov = IoV(last_successful_result.iov.exp_low,
517  last_successful_result.iov.run_low,
518  *highest_exprun)
519  B2INFO(f"Executing on {apply_iov}.")
520  self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
521  B2INFO(f"Finished execution with result code {self.machine.result.result}.")
522  if (self.machine.result.result == AlgResult.ok.value) or (
523  self.machine.result.result == AlgResult.iterate.value):
524  self.machine.complete()
525  # Commit all the payloads and send out the results
526  self.machine.algorithm.algorithm.commit()
527  # Save the result
528  self.results.append(self.machine.result)
529  self.send_result(self.machine.result)
530  else:
531  # Save the result
532  self.results.append(self.machine.result)
533  self.send_result(self.machine.result)
534  # But failed
535  self.machine.fail()
536 
537 
539  """
540  Algorithm strategy to do run-by-run calibration of collected data.
541  Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
542 
543  This strategy differs from `SequentialRunByRun` in that it *will not merge run data* if the algorithm returns
544  'not enough data' on the current run.
545 
546  Once an execution on a run returns *any* result 'iterate', 'ok', 'not_enough_data', or 'failure', we move onto the
547  next run (if any are left).
548  Committing of payloads to the outputdb only happens for 'iterate' or 'ok' return codes.
549 
550  .. important:: Unlike most other strategies, this one won't immediately fail and return if a run returns a 'failure' exit
551  code.
552  The failure will prevent iteration/successful completion of the CAF though.
553 
554  .. warning:: Since this strategy doesn't try to merge data from runs, if *any* run in your input data doesn't contain
555  enough data to complete the algorithm successfully, you won't be able to get a successful calibration.
556  The CAF then won't allow you to iterate this calibration, or pass the constants onward to another calibration.
557  However, you will still have the database created that covers all the successfull runs.
558 
559  This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
560  a CalibrationAlgorithm C++ class directly.
561 """
562 
563  allowed_granularities = ["run"]
564 
566  usable_params = {}
567 
568  def __init__(self, algorithm):
569  """
570  """
571  super().__init__(algorithm)
572 
574  self.machinemachine = AlgorithmMachine(self.algorithmalgorithm)
575 
576  def run(self, iov, iteration, queue):
577  """
578  Runs the algorithm machine over the collected data and fills the results.
579  """
580 
581  if not self.is_validis_valid():
582  raise StrategyError("This AlgorithmStrategy was not set up correctly!")
583  self.queuequeuequeue = queue
584 
585  B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
586  # Now add all the necessary parameters for a strategy to run
587  machine_params = {}
588  machine_params["database_chain"] = self.database_chaindatabase_chain
589  machine_params["dependent_databases"] = self.dependent_databasesdependent_databases
590  machine_params["output_dir"] = self.output_diroutput_dir
591  machine_params["output_database_dir"] = self.output_database_diroutput_database_dir
592  machine_params["input_files"] = self.input_filesinput_files
593  machine_params["ignored_runs"] = self.ignored_runsignored_runs
594  self.machinemachine.setup_from_dict(machine_params)
595  # Start moving through machine states
596  B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
597  self.machinemachine.setup_algorithm(iteration=iteration)
598  # After this point, the logging is in the stdout of the algorithm
599  B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
600 
601  all_runs_collected = set(runs_from_vector(self.algorithmalgorithm.algorithm.getRunListFromAllData()))
602  # If we were given a specific IoV to calibrate we just execute all runs in that IoV at once
603  if iov:
604  runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
605  else:
606  runs_to_execute = all_runs_collected
607 
608  # Remove the ignored runs from our run list to execute
609  if self.ignored_runsignored_runs:
610  B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
611  runs_to_execute.difference_update(set(self.ignored_runsignored_runs))
612  # Sets aren't ordered so lets go back to lists and sort
613  runs_to_execute = sorted(runs_to_execute)
614 
615  # Is this the first time executing the algorithm?
616  first_execution = True
617  for exprun in runs_to_execute:
618  if not first_execution:
619  self.machinemachine.setup_algorithm()
620  current_runs = exprun
621  apply_iov = iov_from_runs([current_runs])
622  B2INFO(f"Executing on IoV = {apply_iov}.")
623  self.machinemachine.execute_runs(runs=[current_runs], iteration=iteration, apply_iov=apply_iov)
624  first_execution = False
625  B2INFO(f"Finished execution with result code {self.machine.result.result}.")
626  # Does this count as a successful execution?
627  if (self.machinemachine.result.result == AlgResult.ok.value) or (self.machinemachine.result.result == AlgResult.iterate.value):
628  # Commit the payloads and result
629  B2INFO(f"Committing payloads for {iov_from_runs([current_runs])}.")
630  self.machinemachine.algorithm.algorithm.commit()
631  self.resultsresults.append(self.machinemachine.result)
632  self.send_resultsend_result(self.machinemachine.result)
633  self.machinemachine.complete()
634  # If it wasn't successful, was it due to lack of data in the runs?
635  elif (self.machinemachine.result.result == AlgResult.not_enough_data.value):
636  B2INFO(f"There wasn't enough data in the IoV {iov_from_runs([current_runs])}.")
637  self.resultsresults.append(self.machinemachine.result)
638  self.send_resultsend_result(self.machinemachine.result)
639  self.machinemachine.fail()
640  elif self.machinemachine.result.result == AlgResult.failure.value:
641  B2ERROR(f"Failure exit code in the IoV {iov_from_runs([current_runs])}.")
642  self.resultsresults.append(self.machinemachine.result)
643  self.send_resultsend_result(self.machinemachine.result)
644  self.machinemachine.fail()
645 
646  # Print any knowable gaps between result IoVs, if any are foun there is a problem.
647  gaps = self.find_iov_gapsfind_iov_gaps()
648  # Dump them to a file for logging
649  with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
650  json.dump(gaps, f)
651 
652  self.send_final_statesend_final_state(self.COMPLETEDCOMPLETED)
653 
654 
656  """
657  Algorithm strategy to first calculate run boundaries where execution should be attempted.
658  Runs the algorithm over the input data contained within the requested IoV of the boundaries,
659  starting with the first boundary data only.
660  If the algorithm returns 'not enough data' on the current boundary IoV, it won't commit the payloads,
661  but instead adds the next boundarie's data and tries again. Basically the same logic as `SequentialRunByRun`
662  but using run boundaries instead of runs directly.
663  Notice that boundaries cannot span multiple experiments.
664 
665  By default the algorithm will get the payload boundaries directly from the algorithm that need to
666  have inplemented the function ``isBoundaryRequired``. If the desired boundaries are already known it
667  is possible to pass them directly setting the algorithm parameter ``payload_boundaries`` and avoid
668  the need to define the ``isBoundaryRequired`` function.
669 
670  ``payload_boundaries`` is a list ``[(exp1, run1), (exp2, run2), ...]``. A boundary at the beginning of each
671  experiment will be added if not already present. An empty list will thus produce a single payload for each
672  experiment. A ``payload_boundaries`` set to ``None`` is equivalent to not passing it and restores the default
673  behaviour where the boundaries are computed in the ``isBoundaryRequired`` function of the algorithm.
674  """
675 
677  usable_params = {
678  "iov_coverage": IoV,
679  "payload_boundaries": [] # [(exp1, run1), (exp2, run2), ...]
680  }
681 
682 
683  allowed_granularities = ["run"]
684 
685  def __init__(self, algorithm):
686  """
687  """
688  super().__init__(algorithm)
689 
691  self.machinemachine = AlgorithmMachine(self.algorithmalgorithm)
692  self.first_executionfirst_execution = True
693 
694  def run(self, iov, iteration, queue):
695  """
696  Runs the algorithm machine over the collected data and fills the results.
697  """
698  if not self.is_validis_valid():
699  raise StrategyError("This AlgorithmStrategy was not set up correctly!")
700  self.queuequeuequeue = queue
701  B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
702  # Now add all the necessary parameters for a strategy to run
703  machine_params = {}
704  machine_params["database_chain"] = self.database_chaindatabase_chain
705  machine_params["dependent_databases"] = self.dependent_databasesdependent_databases
706  machine_params["output_dir"] = self.output_diroutput_dir
707  machine_params["output_database_dir"] = self.output_database_diroutput_database_dir
708  machine_params["input_files"] = self.input_filesinput_files
709  machine_params["ignored_runs"] = self.ignored_runsignored_runs
710  self.machinemachine.setup_from_dict(machine_params)
711  # Start moving through machine states
712  self.machinemachine.setup_algorithm(iteration=iteration)
713  # After this point, the logging is in the stdout of the algorithm
714  B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
715  runs_to_execute = []
716  all_runs_collected = runs_from_vector(self.algorithmalgorithm.algorithm.getRunListFromAllData())
717  # If we were given a specific IoV to calibrate we just execute over runs in that IoV
718  if iov:
719  runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
720  else:
721  runs_to_execute = all_runs_collected[:]
722 
723  # Remove the ignored runs from our run list to execute
724  if self.ignored_runsignored_runs:
725  B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
726  runs_to_execute.difference_update(set(self.ignored_runsignored_runs))
727  # Sets aren't ordered so lets go back to lists and sort
728  runs_to_execute = sorted(runs_to_execute)
729 
730  # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
731  # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
732  # separately and prevent IoVs from crossing the boundary.
733  runs_to_execute = split_runs_by_exp(runs_to_execute)
734 
735  # Now iterate through the experiments. We DO NOT allow a payload IoV to
736  # extend over multiple experiments, only multiple runs
737  iov_coverage = None
738  if "iov_coverage" in self.algorithmalgorithm.params:
739  B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
740  iov_coverage = self.algorithmalgorithm.params["iov_coverage"]
741 
742  payload_boundaries = None
743  if "payload_boundaries" in self.algorithmalgorithm.params:
744  B2INFO(f"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.")
745  payload_boundaries = self.algorithmalgorithm.params["payload_boundaries"]
746 
747  number_of_experiments = len(runs_to_execute)
748  B2INFO(f"We are iterating over {number_of_experiments} experiments.")
749 
750  # Iterate over experiment run lists
751  for i_exp, run_list in enumerate(runs_to_execute, start=1):
752  B2DEBUG(26, f"Run List for this experiment={run_list}")
753  current_experiment = run_list[0].exp
754  B2INFO(f"Executing over data from experiment {current_experiment}")
755  # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
756  # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
757  # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
758  # This is only true for the lowest and highest experiments in our input data.
759  if i_exp == 1:
760  if iov_coverage:
761  lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
762  else:
763  lowest_exprun = run_list[0]
764  # We are calibrating across multiple experiments so we shouldn't start from the middle but from the 0th run
765  else:
766  lowest_exprun = ExpRun(current_experiment, 0)
767 
768  # Override the normal value for the highest ExpRun (from data) if iov_coverage was set
769  if iov_coverage and i_exp == number_of_experiments:
770  highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
771  # If we have more experiments to execute then we wil be setting the final payload IoV in this experiment
772  # to be unbounded
773  elif i_exp < number_of_experiments:
774  highest_exprun = ExpRun(current_experiment, -1)
775  # Otherwise just get the values from data
776  else:
777  highest_exprun = run_list[-1]
778 
779  # Find the boundaries for this experiment's runs
780  vec_run_list = vector_from_runs(run_list)
781  if payload_boundaries is None:
782  # Find the boundaries using the findPayloadBoundaries implemented in the algorithm
783  B2INFO("Attempting to find payload boundaries.")
784  vec_boundaries = self.algorithmalgorithm.algorithm.findPayloadBoundaries(vec_run_list)
785  # If this vector is empty then that's bad. Maybe the isBoundaryRequired function
786  # wasn't implemented? Either way we should stop.
787  if vec_boundaries.empty():
788  B2ERROR("No boundaries found but we are in a strategy that requires them! Failing...")
789  # Tell the Runner that we have failed
790  self.send_final_statesend_final_state(self.FAILEDFAILED)
791  break
792  vec_boundaries = runs_from_vector(vec_boundaries)
793  else:
794  # Using boundaries set by user
795  B2INFO(f"Using as payload boundaries {payload_boundaries}.")
796  vec_boundaries = [ExpRun(exp, run) for exp, run in payload_boundaries]
797  # No need to check that vec_boundaries is not empty. In case it is we will anyway add
798  # a boundary at the first run of each experiment.
799  # Remove any boundaries not from the current experiment (only likely if they were set manually)
800  # We sort just to make everything easier later and just in case something mad happened.
801  run_boundaries = sorted([er for er in vec_boundaries if er.exp == current_experiment])
802  # In this strategy we consider separately each experiment. We then now check that the
803  # boundary (exp, 0) is present and if not we add it. It is indeed possible to miss it
804  # if the boundaries were given manually
805  first_exprun = ExpRun(current_experiment, 0)
806  if first_exprun not in run_boundaries:
807  B2WARNING(f"No boundary found at ({current_experiment}, 0), adding it.")
808  run_boundaries[0:0] = [first_exprun]
809  B2INFO((f"Found {len(run_boundaries)} boundaries for this experiment. "
810  "Checking if we have some data for all boundary IoVs..."))
811  # First figure out the run lists to use for each execution (potentially different from the applied IoVs)
812  # We use the boundaries and the run_list
813  boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list)
814  B2DEBUG(26, f"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}")
815  # If there were any boundary IoVs with no run data, just remove them. Otherwise they will execute over all data.
816  boundary_iovs_to_run_lists = {key: value for key, value in boundary_iovs_to_run_lists.items() if value}
817  B2DEBUG(26, f"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}")
818  # If any were removed then we might have gaps between the boundary IoVs. Fix those now by merging IoVs.
819  new_boundary_iovs_to_run_lists = {}
820  previous_boundary_iov = None
821  previous_boundary_run_list = None
822  for boundary_iov, run_list in boundary_iovs_to_run_lists.items():
823  if not previous_boundary_iov:
824  previous_boundary_iov = boundary_iov
825  previous_boundary_run_list = run_list
826  continue
827  # We are definitely dealiing with IoVs from one experiment so we can make assumptions here
828  if previous_boundary_iov.run_high != (boundary_iov.run_low-1):
829  B2WARNING("Gap in boundary IoVs found before execution! "
830  "Will correct it by extending the previous boundary up to the next one.")
831  B2INFO(f"Original boundary IoV={previous_boundary_iov}")
832  previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low,
833  previous_boundary_iov.exp_high, boundary_iov.run_low-1)
834  B2INFO(f"New boundary IoV={previous_boundary_iov}")
835  new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
836  previous_boundary_iov = boundary_iov
837  previous_boundary_run_list = run_list
838  else:
839  new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
840  boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists
841  B2DEBUG(26, f"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}")
842  # Actually execute now that we have an IoV list to apply
843  success = self.execute_over_boundariesexecute_over_boundaries(boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
844  if not success:
845  # Tell the Runner that we have failed
846  self.send_final_statesend_final_state(self.FAILEDFAILED)
847  break
848  # Only executes if we didn't fail any experiment execution
849  else:
850  # Print any knowable gaps between result IoVs, if any are found there is a problem, but not necessarily too bad.
851  gaps = self.find_iov_gapsfind_iov_gaps()
852  if gaps:
853  B2WARNING("There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.")
854  # Dump them to a file for logging
855  with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
856  json.dump(gaps, f)
857 
858  # If any results weren't successes we fail
859  if self.any_failed_iovany_failed_iov():
860  self.send_final_statesend_final_state(self.FAILEDFAILED)
861  else:
862  self.send_final_statesend_final_state(self.COMPLETEDCOMPLETED)
863 
864  def execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration):
865  """
866  Take the previously found boundaries and the run lists they correspond to and actually perform the
867  Algorithm execution. This is assumed to be for a single experiment.
868  """
869  # Copy of boundary IoVs
870  remaining_boundary_iovs = sorted(list(boundary_iovs_to_run_lists.keys())[:])
871 
872  # The current runs we are executing
873  current_runs = []
874  # The IoV of the current boundary(s)
875  current_boundary_iov = None
876  # The current execution's applied IoV, may be different to the boundary IoV
877  current_iov = None
878 
879  # The last successful payload list and result. We hold on to them so that we can commit or discard later.
880  last_successful_payloads = None
881  last_successful_result = None
882  # The previous execution's runs
883  last_successful_runs = []
884  # The previous execution's applied IoV
885  last_successful_iov = None
886 
887  while True:
888  # Do we have previous successes?
889  if not last_successful_result:
890  if not current_runs:
891  # Did we actually have any boundaries?
892  if not remaining_boundary_iovs:
893  # Fail because we have no boundaries to use
894  B2ERROR("No boundaries found for the current experiment's run list. Failing the strategy.")
895  return False
896 
897  B2INFO("This appears to be the first attempted execution of the experiment.")
898  # Attempt to execute on the first boundary
899  current_boundary_iov = remaining_boundary_iovs.pop(0)
900  current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
901  # What if there is only one boundary? Need to apply the highest exprun
902  if not remaining_boundary_iovs:
903  current_iov = IoV(*lowest_exprun, *highest_exprun)
904  else:
905  current_iov = IoV(*lowest_exprun, current_boundary_iov.exp_high, current_boundary_iov.run_high)
906  # Returned not enough data from first execution
907  else:
908  # Any remaining boundaries?
909  if not remaining_boundary_iovs:
910  # Fail because we have no boundaries to use
911  B2ERROR("Not enough data found for the current experiment's run list. Failing the strategy.")
912  return False
913 
914  B2INFO("There wasn't enough data previously. Merging with the runs from the next boundary.")
915  # Extend the previous run lists/iovs
916  next_boundary_iov = remaining_boundary_iovs.pop(0)
917  current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
918  next_boundary_iov.exp_high, next_boundary_iov.run_high)
919  current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
920  # At the last boundary? Need to apply the highest exprun
921  if not remaining_boundary_iovs:
922  current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
923  else:
924  current_iov = IoV(current_iov.exp_low, current_iov.run_low,
925  current_boundary_iov.exp_high, current_boundary_iov.run_high)
926 
927  self.execute_runsexecute_runs(current_runs, iteration, current_iov)
928 
929  # Does this count as a successful execution?
930  if self.alg_successalg_success():
931  # Commit previous values we were holding onto
932  B2INFO("Found a success. Will save the payloads for later.")
933  # Save success
934  last_successful_payloads = self.machinemachine.algorithm.algorithm.getPayloadValues()
935  last_successful_result = self.machinemachine.result
936  last_successful_runs = current_runs[:]
937  last_successful_iov = current_iov
938  # Reset values for next loop
939  current_runs = []
940  current_boundary_iov = None
941  current_iov = None
942  self.machinemachine.complete()
943  continue
944  elif self.machinemachine.result.result == AlgResult.not_enough_data.value:
945  B2INFO("Not Enough Data result.")
946  # Just complete but leave the current runs alone for next loop
947  self.machinemachine.complete()
948  continue
949  else:
950  B2ERROR("Hit a failure or some kind of result we can't continue from. Failing out...")
951  self.machinemachine.fail()
952  return False
953  # Previous result exists
954  else:
955  # Previous loop was a success
956  if not current_runs:
957  # Remaining boundaries?
958  if not remaining_boundary_iovs:
959  # Out of data, can now commit
960  B2INFO("Finished this experiment's boundaries. "
961  f"Committing remaining payloads from {last_successful_result.iov}")
962  self.machinemachine.algorithm.algorithm.commit(last_successful_payloads)
963  self.resultsresults.append(last_successful_result)
964  self.send_resultsend_result(last_successful_result)
965  return True
966 
967  # Remaining boundaries exist so we try to execute
968  current_boundary_iov = remaining_boundary_iovs.pop(0)
969  current_runs = boundary_iovs_to_run_lists[current_boundary_iov]
970  # What if there is only one boundary? Need to apply the highest exprun
971  if not remaining_boundary_iovs:
972  current_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, *highest_exprun)
973  else:
974  current_iov = current_boundary_iov
975 
976  # Returned not enough data from last execution
977  else:
978  # Any remaining boundaries?
979  if not remaining_boundary_iovs:
980  B2INFO("We have no remaining runs to increase the amount of data. "
981  "Instead we will merge with the previous successful runs.")
982  # Merge with previous success IoV
983  new_current_runs = last_successful_runs[:]
984  new_current_runs.extend(current_runs)
985  current_runs = new_current_runs[:]
986  current_iov = IoV(last_successful_iov.exp_low, last_successful_iov.run_low,
987  current_iov.exp_high, current_iov.run_high)
988  # We reset the last successful stuff because we are dropping it
989  last_successful_payloads = []
990  last_successful_result = None
991  last_successful_runs = []
992  last_successful_iov = None
993 
994  else:
995  B2INFO("Since there wasn't enough data previously, we will merge with the runs from the next boundary.")
996  # Extend the previous run lists/iovs
997  next_boundary_iov = remaining_boundary_iovs.pop(0)
998  current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
999  next_boundary_iov.exp_high, next_boundary_iov.run_high)
1000  # Extend previous execution's runs with the next set
1001  current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
1002  # At the last boundary? Need to apply the highest exprun
1003  if not remaining_boundary_iovs:
1004  current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
1005  else:
1006  current_iov = IoV(current_iov.exp_low, current_iov.run_low,
1007  current_boundary_iov.exp_high, current_boundary_iov.run_high)
1008 
1009  self.execute_runsexecute_runs(current_runs, iteration, current_iov)
1010 
1011  # Does this count as a successful execution?
1012  if self.alg_successalg_success():
1013  # Commit previous values we were holding onto
1014  B2INFO("Found a success.")
1015  if last_successful_result:
1016  B2INFO("Can now commit the previous success.")
1017  self.machinemachine.algorithm.algorithm.commit(last_successful_payloads)
1018  self.resultsresults.append(last_successful_result)
1019  self.send_resultsend_result(last_successful_result)
1020  # Replace last success
1021  last_successful_payloads = self.machinemachine.algorithm.algorithm.getPayloadValues()
1022  last_successful_result = self.machinemachine.result
1023  last_successful_runs = current_runs[:]
1024  last_successful_iov = current_iov
1025  # Reset values for next loop
1026  current_runs = []
1027  current_boundary_iov = None
1028  current_iov = None
1029  self.machinemachine.complete()
1030  continue
1031  elif self.machinemachine.result.result == AlgResult.not_enough_data.value:
1032  B2INFO("Not Enough Data result.")
1033  # Just complete but leave the current runs alone for next loop
1034  self.machinemachine.complete()
1035  continue
1036  else:
1037  B2ERROR("Hit a failure or some other result we can't continue from. Failing out...")
1038  self.machinemachine.fail()
1039  return False
1040 
1041  def execute_runs(self, runs, iteration, iov):
1042  # Already set up earlier the first time, so we shouldn't do it again
1043  if not self.first_executionfirst_execution:
1044  self.machinemachine.setup_algorithm()
1045  else:
1046  self.first_executionfirst_execution = False
1047 
1048  B2INFO(f"Executing and applying {iov} to the payloads.")
1049  self.machinemachine.execute_runs(runs=runs, iteration=iteration, apply_iov=iov)
1050  B2INFO(f"Finished execution with result code {self.machine.result.result}.")
1051 
1052  def alg_success(self):
1053  return ((self.machinemachine.result.result == AlgResult.ok.value) or (self.machinemachine.result.result == AlgResult.iterate.value))
1054 
1055 
1056 class StrategyError(Exception):
1057  """
1058  Basic Exception for this type of class.
1059  """
list required_true_attrs
Attributes that must have a value that returns True when tested by :py:meth:is_valid.
Definition: strategies.py:59
output_database_dir
The output database directory for the localdb that the algorithm will commit to.
Definition: strategies.py:87
string COMPLETED
Completed state.
Definition: strategies.py:72
ignored_runs
Runs that will not be included in ANY execution of the algorithm.
Definition: strategies.py:94
def send_result(self, result)
Definition: strategies.py:173
input_files
Collector output files, will contain all files retured by the output patterns.
Definition: strategies.py:83
string FAILED
Failed state.
Definition: strategies.py:75
results
The list of results objects which will be sent out before the end.
Definition: strategies.py:96
def run(self, iov, iteration, queue)
Definition: strategies.py:101
def send_final_state(self, state)
Definition: strategies.py:176
list required_attrs
Required attributes that must exist before the strategy can run properly.
Definition: strategies.py:49
algorithm
Algorithm() class that we're running.
Definition: strategies.py:81
database_chain
User defined database chain i.e.
Definition: strategies.py:89
def setup_from_dict(self, params)
Definition: strategies.py:107
def __init__(self, algorithm)
Definition: strategies.py:77
dependent_databases
CAF created local databases from previous calibrations that this calibration/algorithm depends on.
Definition: strategies.py:91
output_dir
The algorithm output directory which is mostly used to store the stdout file.
Definition: strategies.py:85
queue
The multiprocessing Queue we use to pass back results one at a time.
Definition: strategies.py:98
def execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
Definition: strategies.py:864
def run(self, iov, iteration, queue)
Definition: strategies.py:694
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
Definition: strategies.py:691
def execute_runs(self, runs, iteration, iov)
Definition: strategies.py:1041
def __init__(self, algorithm)
Definition: strategies.py:685
def apply_experiment_settings(self, algorithm, experiment)
Definition: strategies.py:299
def run(self, iov, iteration, queue)
Definition: strategies.py:307
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
Definition: strategies.py:294
def __init__(self, algorithm)
Definition: strategies.py:288
def run(self, iov, iteration, queue)
Definition: strategies.py:576
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
Definition: strategies.py:574
def __init__(self, algorithm)
Definition: strategies.py:568
def run(self, iov, iteration, queue)
Definition: strategies.py:200
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm It...
Definition: strategies.py:198
def __init__(self, algorithm)
Definition: strategies.py:192