Belle II Software  release-08-01-10
strategies.py
1 #!/usr/bin/env python3
2 
3 # disable doxygen check for this file
4 # @cond
5 
6 
13 
14 from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
15 from caf.utils import AlgResult
16 from caf.utils import B2INFO_MULTILINE
17 from caf.utils import runs_overlapping_iov, runs_from_vector
18 from caf.utils import iov_from_runs, split_runs_by_exp, vector_from_runs
19 from caf.utils import find_gaps_in_iov_list, grouper, find_run_lists_from_boundaries
20 from caf.utils import IoV, ExpRun
21 from caf.state_machines import AlgorithmMachine
22 
23 from abc import ABC, abstractmethod
24 import json
25 
26 
class AlgorithmStrategy(ABC):
    """
    Common base class of all Algorithm strategies.

    A strategy performs the actual execution of a single algorithm over collected data.
    Concrete strategies can differ a lot in how quickly they run, in how database payloads
    are handed from one execution to the next, and in whether the final payload IoVs are
    independent of the runs that were used to calculate them.

    Parameters:
        algorithm (:py:class:`caf.framework.Algorithm`): The algorithm we will run

    The attributes and methods defined here are the ones that the selected AlgorithmRunner
    uses automatically. A derived class may rely on them or implement as much extra
    functionality as it needs.

    A derived class that defines its own ``__init__`` should call the base class
    `AlgorithmStrategy.__init__()` first via super(), passing the algorithm along, e.g.

    >>> def __init__(self, algorithm):
    >>>     super().__init__(algorithm)

    The most important method to implement is :py:meth:`AlgorithmStrategy.run`, which
    executes the algorithm in the way defined by the options/attributes that were set.
    """

    # Names of the attributes that `is_valid` requires to exist on the instance.
    required_attrs = [
        "algorithm",
        "database_chain",
        "dependent_databases",
        "output_dir",
        "output_database_dir",
        "input_files",
        "ignored_runs",
    ]

    # Names of the attributes that `is_valid` requires to exist AND to be truthy.
    required_true_attrs = [
        "algorithm",
        "output_dir",
        "output_database_dir",
        "input_files",
    ]

    # Granularities of collected data this strategy accepts
    # (NOTE(review): consumed outside this class; confirm the exact meaning with the caller).
    allowed_granularities = ["run", "all"]

    # Signal value, not used inside this class
    # (presumably marks the end of the results stream; verify against the runner).
    FINISHED_RESULTS = "DONE"

    # Final state value passed to `send_final_state` after a successful strategy run.
    COMPLETED = "COMPLETED"

    # Final state value passed to `send_final_state` after an unsuccessful strategy run.
    FAILED = "FAILED"

    def __init__(self, algorithm):
        """
        Store the algorithm and initialise every other attribute to an empty default.
        """
        # The algorithm this strategy executes
        self.algorithm = algorithm
        # Queue used by `send_result`/`send_final_state`; concrete `run` methods assign it
        self.queue = None
        # Results accumulated by the strategy (read by `find_iov_gaps` and `any_failed_iov`)
        self.results = []
        # Settings forwarded to the AlgorithmMachine by the concrete strategies
        self.input_files = []
        self.output_dir = ""
        self.output_database_dir = ""
        self.database_chain = []
        self.dependent_databases = []
        # Runs that concrete strategies remove from the list of runs to execute
        self.ignored_runs = []

    @abstractmethod
    def run(self, iov, iteration, queue):
        """
        Abstract method that every concrete strategy has to implement. It is the entry point
        called to actually execute the algorithm.
        """

    def setup_from_dict(self, params):
        """
        Assign each value of ``params`` to the strategy attribute of the same name.

        Parameters:
            params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name.
        """
        for name in params:
            setattr(self, name, params[name])

    def is_valid(self):
        """
        Returns:
            bool: Whether or not this strategy has been set up correctly with all its necessary attributes.
        """
        B2INFO("Checking validity of current AlgorithmStrategy setup.")
        # A required attribute should never be missing, since __init__ creates them all,
        # but we stop at the first one that is.
        missing_name = next((name for name in self.required_attrs if not hasattr(self, name)), None)
        if missing_name is not None:
            B2ERROR(f"AlgorithmStrategy attribute {missing_name} doesn't exist.")
            return False
        # Attributes that need a real value must not be left unset or empty
        for name in self.required_true_attrs:
            if getattr(self, name):
                continue
            B2ERROR(f"AlgorithmStrategy attribute {name} returned False.")
            return False
        return True

    def find_iov_gaps(self):
        """
        Finds and prints the gaps between the IoVs of the strategy results, i.e. the IoVs that
        are not covered by any payload. Gaps across an experiment boundary CANNOT be found;
        only gaps within the same experiment are reported.

        Returns:
            iov_gaps(list[IoV])
        """
        result_iovs = [result.iov for result in self.results]
        result_iovs.sort()
        iov_gaps = find_gaps_in_iov_list(result_iovs)
        if iov_gaps:
            B2INFO_MULTILINE([
                "Found gaps between IoVs of algorithm results (regardless of result).",
                "You may have requested these gaps deliberately by not passing in data containing these runs.",
                "This may not be a problem, but you will not have payoads defined for these IoVs",
                "unless you edit the final database.txt yourself.",
            ])
            for iov in iov_gaps:
                B2INFO(f"{iov} not covered by any execution of the algorithm.")
        return iov_gaps

    def any_failed_iov(self):
        """
        Log every result whose code is 'failure' or 'not enough data'.

        Returns:
            bool: If any result in the current results list has a failed algorithm code we return True
        """
        unsuccessful = [
            result for result in self.results
            if result.result in (AlgResult.failure.value, AlgResult.not_enough_data.value)
        ]
        if not unsuccessful:
            return False

        B2WARNING("Failed results found.")
        for result in unsuccessful:
            code = result.result
            if code == AlgResult.failure.value:
                B2ERROR(f"c_Failure returned for {result.iov}.")
            elif code == AlgResult.not_enough_data.value:
                B2WARNING(f"c_NotEnoughData returned for {result.iov}.")
        return True

    def send_result(self, result):
        """
        Put ``result`` onto the queue, wrapped in a message of type "result".
        """
        message = {"type": "result", "value": result}
        self.queue.put(message)

    def send_final_state(self, state):
        """
        Put ``state`` onto the queue, wrapped in a message of type "final_state".
        """
        message = {"type": "final_state", "value": state}
        self.queue.put(message)
181 
class SingleIOV(AlgorithmStrategy):
    """
    The fastest and simplest Algorithm strategy. The algorithm is executed exactly once, either
    over all of the input data or over only the data overlapping the requested IoV. Unless the
    ``apply_iov`` algorithm parameter is given, no explicit payload IoV is passed to the machine.

    This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
    a CalibrationAlgorithm C++ class directly.
    """

    # Algorithm parameters understood by this strategy, mapped to their expected type.
    usable_params = {"apply_iov": IoV}

    def __init__(self, algorithm):
        """
        Set up the base class and create the state machine that will drive the algorithm.
        """
        super().__init__(algorithm)
        # State machine used to step the algorithm through setup/execution/completion
        self.machine = AlgorithmMachine(self.algorithm)

    def run(self, iov, iteration, queue):
        """
        Runs the algorithm machine over the collected data and fills the results.

        Parameters:
            iov: If truthy, only runs overlapping this IoV are executed; otherwise all collected runs.
            iteration: Iteration value forwarded to the machine's setup and execution steps.
            queue: Object with a ``put`` method that receives the result and the final state.
        """
        if not self.is_valid():
            raise StrategyError("This AlgorithmStrategy was not set up correctly!")
        self.queue = queue

        B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
        # Hand the machine everything it needs before it can run
        self.machine.setup_from_dict({
            "database_chain": self.database_chain,
            "dependent_databases": self.dependent_databases,
            "output_dir": self.output_dir,
            "output_database_dir": self.output_database_dir,
            "input_files": self.input_files,
            "ignored_runs": self.ignored_runs,
        })
        # Start moving through machine states
        B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
        self.machine.setup_algorithm(iteration=iteration)
        # After this point, the logging is in the stdout of the algorithm
        B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")

        all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
        # With a specific IoV requested we execute all the runs inside it in one go
        runs_to_execute = runs_overlapping_iov(iov, all_runs_collected) if iov else all_runs_collected

        # Drop any runs the user asked us to ignore
        if self.ignored_runs:
            B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
            runs_to_execute.difference_update(set(self.ignored_runs))
        # Sets carry no order, so convert to a sorted list before executing
        runs_to_execute = sorted(runs_to_execute)

        # Optional user-requested IoV to apply to the payloads
        apply_iov = self.algorithm.params["apply_iov"] if "apply_iov" in self.algorithm.params else None
        self.machine.execute_runs(runs=runs_to_execute, iteration=iteration, apply_iov=apply_iov)
        B2INFO(f"Finished execution with result code {self.machine.result.result}.")

        # Send out the result to the runner
        self.send_result(self.machine.result)

        # Only 'ok' and 'iterate' are exit codes that allow us to complete properly
        succeeded = self.machine.result.result in (AlgResult.ok.value, AlgResult.iterate.value)
        if not succeeded:
            # Either there wasn't enough data or the algorithm failed
            self.machine.fail()
            self.send_final_state(self.FAILED)
            return

        self.machine.complete()
        # Commit all the payloads and report the final state
        self.machine.algorithm.algorithm.commit()
        self.send_final_state(self.COMPLETED)
259 
260 
class SequentialRunByRun(AlgorithmStrategy):
    """
    Algorithm strategy to do run-by-run calibration of collected data.
    Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
    If the algorithm returns 'not enough data' on the current run set, it won't commit the payloads, but instead adds
    the next run's data and tries again.

    Once an execution on a set of runs return 'iterate' or 'ok' we move onto the next runs (if any are left)
    and start the same procedure again. Committing of payloads to the outputdb only happens once we're sure that there
    is enough data in the remaining runs to get a full execution. If there isn't enough data remaining, the last runs
    are merged with the previous successful execution's runs and a final execution is performed on all remaining runs.

    Additionally this strategy will automatically make sure that IoV gaps in your input data are covered by a payload.
    This means that there shouldn't be any IoVs that don't get a new payload by the end of running an iteration.

    This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
    a CalibrationAlgorithm C++ class directly.
    """

    # Algorithm parameters understood by this strategy, mapped to their expected type.
    usable_params = {
        "has_experiment_settings": bool,
        "iov_coverage": IoV,
        "step_size": int
    }

    # Granularities of collected data this strategy accepts.
    allowed_granularities = ["run"]

    def __init__(self, algorithm):
        """
        Set up the base class, create the state machine and default ``step_size`` to 1 if unset.
        """
        super().__init__(algorithm)
        # State machine used to step the algorithm through setup/execution/completion
        self.machine = AlgorithmMachine(self.algorithm)
        if "step_size" not in self.algorithm.params:
            self.algorithm.params["step_size"] = 1
        # True until the first execution in `execute_over_run_list`; `run` already sets the
        # algorithm up once, so the first execution must not set it up again.
        self.first_execution = True

    def apply_experiment_settings(self, algorithm, experiment):
        """
        Apply experiment-dependent settings.
        This is the default version, which does not do anything.
        If necessary, it should be reimplemented by derived classes.

        Parameters:
            algorithm: The object passed by `run` as ``self.machine.algorithm.algorithm``.
            experiment: Experiment number of the run list that is about to be executed.
        """
        return

    def run(self, iov, iteration, queue):
        """
        Runs the algorithm machine over the collected data and fills the results.

        Parameters:
            iov: If truthy, only runs overlapping this IoV are executed; otherwise all collected runs.
            iteration: Iteration value forwarded to the machine's setup and execution steps.
            queue: Object with a ``put`` method that receives the results and the final state.

        Raises:
            StrategyError: If `is_valid` reports that the strategy was not set up correctly.
        """
        if not self.is_valid():
            raise StrategyError("This AlgorithmStrategy was not set up correctly!")
        self.queue = queue
        B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
        # Now add all the necessary parameters for a strategy to run
        machine_params = {}
        machine_params["database_chain"] = self.database_chain
        machine_params["dependent_databases"] = self.dependent_databases
        machine_params["output_dir"] = self.output_dir
        machine_params["output_database_dir"] = self.output_database_dir
        machine_params["input_files"] = self.input_files
        machine_params["ignored_runs"] = self.ignored_runs
        self.machine.setup_from_dict(machine_params)
        # Start moving through machine states
        self.machine.setup_algorithm(iteration=iteration)
        # After this point, the logging is in the stdout of the algorithm
        B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
        all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
        # If we were given a specific IoV to calibrate we just execute over runs in that IoV
        if iov:
            runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
        else:
            runs_to_execute = all_runs_collected[:]

        # Remove the ignored runs from our run list to execute.
        # 'runs_to_execute' is a plain list when no IoV was requested, and lists have no
        # set-only 'difference_update' method, so we filter instead. This works for any iterable.
        if self.ignored_runs:
            B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
            ignored_runs = set(self.ignored_runs)
            runs_to_execute = [run for run in runs_to_execute if run not in ignored_runs]
        # Whatever container we hold at this point, continue with a sorted list
        runs_to_execute = sorted(runs_to_execute)

        # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
        # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
        # separately and prevent IoVs from crossing the boundary.
        runs_to_execute = split_runs_by_exp(runs_to_execute)

        # Now iterate through the experiments, executing runs in blocks of 'step_size'. We DO NOT allow a payload IoV to
        # extend over multiple experiments, only multiple runs
        iov_coverage = None
        if "iov_coverage" in self.algorithm.params:
            B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
            iov_coverage = self.algorithm.params["iov_coverage"]

        number_of_experiments = len(runs_to_execute)
        # Iterate over experiment run lists
        for i_exp, run_list in enumerate(runs_to_execute, start=1):

            # Apply experiment-dependent settings.
            if "has_experiment_settings" in self.algorithm.params:
                if self.algorithm.params["has_experiment_settings"]:
                    self.apply_experiment_settings(self.machine.algorithm.algorithm, run_list[0].exp)

            # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
            # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
            # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
            # This is only true for the lowest and highest experiments in our input data.
            # If we have multiple experiments the iov must be adjusted to avoid gaps at the iov boundaries
            lowest_exprun = ExpRun(run_list[0].exp, 0)
            highest_exprun = ExpRun(run_list[-1].exp, -1)

            if i_exp == 1:
                lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low) if iov_coverage else run_list[0]
            if i_exp == number_of_experiments:
                highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high) if iov_coverage else run_list[-1]

            self.execute_over_run_list(iteration, run_list, lowest_exprun, highest_exprun)

        # Print any knowable gaps between result IoVs, if any are found there is a problem.
        gaps = self.find_iov_gaps()
        # Dump them to a file for logging
        with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
            json.dump(gaps, f)

        # If any results weren't successes we fail
        if self.any_failed_iov():
            self.send_final_state(self.FAILED)
        else:
            self.send_final_state(self.COMPLETED)

    def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun):
        """
        Execute the algorithm over the runs of one experiment in chunks of ``step_size`` runs.

        Payloads of a successful execution are held back and only committed once the next
        execution has succeeded too (or no runs remain). A trailing set of runs without enough
        data is merged with the runs of the previous success and executed once more.

        Parameters:
            iteration: Iteration value forwarded to every ``execute_runs`` call.
            run_list (list): The runs of a single experiment, in execution order.
            lowest_exprun: (exp, run) pair used as the lower edge of the first payload IoV.
            highest_exprun: (exp, run) pair used as the upper edge of the last payload IoV.
        """
        # The runs (data) we have left to execute from this run list
        remaining_runs = run_list[:]
        # The previous execution's runs
        previous_runs = []
        # The current runs we are executing
        current_runs = []
        # The last successful payload and result
        last_successful_payloads = None
        last_successful_result = None

        # Iterate over ExpRuns within an experiment in chunks of 'step_size'
        for expruns in grouper(self.algorithm.params["step_size"], run_list):
            # Already set up earlier the first time, so we shouldn't do it again
            if not self.first_execution:
                self.machine.setup_algorithm()
            else:
                self.first_execution = False

            # Add on the next step of runs
            current_runs.extend(expruns)
            # Remove them from our remaining runs
            remaining_runs = [run for run in remaining_runs if run not in current_runs]

            # Is this the first payload of the experiment
            if not last_successful_result:
                B2INFO("Detected that this will be the first payload of this experiment.")
                # If this is the first payload but we have other data, we need the IoV to cover from the
                # lowest IoV extent requested up to the ExpRun right before the next run in the remaining runs list.
                if remaining_runs:
                    apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1)
                # If this is the first payload but there isn't more data, we set the IoV to cover the full range
                else:
                    B2INFO("Detected that this will be the only payload of the experiment.")
                    apply_iov = IoV(*lowest_exprun, *highest_exprun)
            # If there were previous successes
            else:
                if not remaining_runs:
                    B2INFO("Detected that there are no more runs to execute in this experiment after this next execution.")
                    apply_iov = IoV(*current_runs[0], *highest_exprun)
                # Otherwise, it's just a normal IoV in the middle.
                else:
                    B2INFO("Detected that there are more runs to execute in this experiment after this next execution.")
                    apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1)

            B2INFO(f"Executing and applying {apply_iov} to the payloads.")
            self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
            B2INFO(f"Finished execution with result code {self.machine.result.result}.")

            # Does this count as a successful execution?
            if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
                self.machine.complete()
                # If we've succeeded but we have a previous success we can commit the previous payloads
                # since we have new ones ready
                if last_successful_payloads and last_successful_result:
                    B2INFO("Saving this execution's payloads to be committed later.")
                    # Save the payloads and result
                    new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                    new_successful_result = self.machine.result
                    B2INFO("We just succeded in execution of the Algorithm."
                           f" Will now commit payloads from the previous success for {last_successful_result.iov}.")
                    self.machine.algorithm.algorithm.commit(last_successful_payloads)
                    self.results.append(last_successful_result)
                    self.send_result(last_successful_result)
                    # If there are remaining runs we need to have the current payloads ready to commit after the next execution
                    if remaining_runs:
                        last_successful_payloads = new_successful_payloads
                        last_successful_result = new_successful_result
                    # If there's not more runs to process we should also commit the new ones
                    else:
                        B2INFO("We have no more runs to process. "
                               f"Will now commit the most recent payloads for {new_successful_result.iov}.")
                        self.machine.algorithm.algorithm.commit(new_successful_payloads)
                        self.results.append(new_successful_result)
                        self.send_result(new_successful_result)
                        break
                # if there's no previous success this must be the first run executed
                else:
                    # Need to save payloads for later if we have a success but runs remain
                    if remaining_runs:
                        B2INFO(f"Saving the most recent payloads for {self.machine.result.iov} to be committed later.")
                        # Save the payloads and result
                        last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                        last_successful_result = self.machine.result
                    # Need to commit and exit if we have a success and no remaining data
                    else:
                        B2INFO("We just succeeded in execution of the Algorithm."
                               " No runs left to be processed, so we are committing results of this execution.")
                        self.machine.algorithm.algorithm.commit()
                        self.results.append(self.machine.result)
                        self.send_result(self.machine.result)
                        break

                # Remember what we just executed in case a later chunk must be merged into it
                previous_runs = current_runs[:]
                current_runs = []
            # If it wasn't successful, was it due to lack of data in the runs?
            elif (self.machine.result.result == AlgResult.not_enough_data.value):
                B2INFO(f"There wasn't enough data in {self.machine.result.iov}.")
                if remaining_runs:
                    B2INFO("Some runs remain to be processed. "
                           f"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.")
                elif not remaining_runs and not last_successful_result:
                    B2ERROR("There aren't any more runs remaining to merge with, and we never had a previous success."
                            " There wasn't enough data in the full input data requested.")
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                    self.machine.fail()
                    break
                elif not remaining_runs and last_successful_result:
                    B2INFO("There aren't any more runs remaining to merge with. But we had a previous success"
                           ", so we'll merge with the previous IoV.")
                    final_runs = current_runs[:]
                    current_runs = previous_runs
                    current_runs.extend(final_runs)
                # Reached by both non-breaking branches above: the failed attempt is closed
                # before the algorithm is set up again for the retry/merged execution.
                self.machine.fail()
            elif self.machine.result.result == AlgResult.failure.value:
                B2ERROR(f"{self.algorithm.name} returned failure exit code.")
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                self.machine.fail()
                break
        else:
            # Only reached when the loop above ran to the end without a 'break'.
            # Check if we need to run a final execution on the previous execution + dangling set of runs
            if current_runs:
                self.machine.setup_algorithm()
                apply_iov = IoV(last_successful_result.iov.exp_low,
                                last_successful_result.iov.run_low,
                                *highest_exprun)
                B2INFO(f"Executing on {apply_iov}.")
                self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
                B2INFO(f"Finished execution with result code {self.machine.result.result}.")
                if (self.machine.result.result == AlgResult.ok.value) or (
                        self.machine.result.result == AlgResult.iterate.value):
                    self.machine.complete()
                    # Commit all the payloads and send out the results
                    self.machine.algorithm.algorithm.commit()
                    # Save the result
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                else:
                    # Save the result
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                    # But failed
                    self.machine.fail()
538 
539 
class SimpleRunByRun(AlgorithmStrategy):
    """
    Algorithm strategy to do run-by-run calibration of collected data.
    The algorithm is executed on each run of the input data inside the requested IoV, one run at a time,
    beginning with the first run.

    In contrast to `SequentialRunByRun`, this strategy *will not merge run data* when the algorithm returns
    'not enough data' for the current run.

    Whatever an execution on a run returns ('iterate', 'ok', 'not_enough_data' or 'failure'), we carry on with
    the next run (if any are left).
    Payloads are only committed to the outputdb for the 'iterate' and 'ok' return codes.

    .. important:: Unlike most other strategies, this one won't immediately fail and return if a run returns a 'failure' exit
                   code. `run` always reports the COMPLETED final state; unsuccessful runs are only visible in the
                   individual results that are sent out.
                   NOTE(review): the CAF is expected to refuse iteration/successful completion when it sees
                   such results -- confirm against the runner.

    .. warning:: Since this strategy doesn't try to merge data from runs, if *any* run in your input data doesn't contain
                 enough data to complete the algorithm successfully, you won't be able to get a successful calibration.
                 The CAF then won't allow you to iterate this calibration, or pass the constants onward to another calibration.
                 However, you will still have the database created that covers all the successful runs.

    This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
    a CalibrationAlgorithm C++ class directly.
    """

    # Granularities of collected data this strategy accepts.
    allowed_granularities = ["run"]

    # This strategy takes no algorithm parameters of its own.
    usable_params = {}

    def __init__(self, algorithm):
        """
        Set up the base class and create the state machine that will drive the algorithm.
        """
        super().__init__(algorithm)
        # State machine used to step the algorithm through setup/execution/completion
        self.machine = AlgorithmMachine(self.algorithm)

    def run(self, iov, iteration, queue):
        """
        Runs the algorithm machine over the collected data and fills the results.

        Parameters:
            iov: If truthy, only runs overlapping this IoV are executed; otherwise all collected runs.
            iteration: Iteration value forwarded to the machine's setup and execution steps.
            queue: Object with a ``put`` method that receives the results and the final state.
        """
        if not self.is_valid():
            raise StrategyError("This AlgorithmStrategy was not set up correctly!")
        self.queue = queue

        B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
        # Hand the machine everything it needs before it can run
        self.machine.setup_from_dict({
            "database_chain": self.database_chain,
            "dependent_databases": self.dependent_databases,
            "output_dir": self.output_dir,
            "output_database_dir": self.output_database_dir,
            "input_files": self.input_files,
            "ignored_runs": self.ignored_runs,
        })
        # Start moving through machine states
        B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
        self.machine.setup_algorithm(iteration=iteration)
        # After this point, the logging is in the stdout of the algorithm
        B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")

        all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
        # With a specific IoV requested we restrict ourselves to the runs inside it
        runs_to_execute = runs_overlapping_iov(iov, all_runs_collected) if iov else all_runs_collected

        # Drop any runs the user asked us to ignore
        if self.ignored_runs:
            B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
            runs_to_execute.difference_update(set(self.ignored_runs))
        # Sets carry no order, so convert to a sorted list before executing
        runs_to_execute = sorted(runs_to_execute)

        for run_index, exprun in enumerate(runs_to_execute):
            # The algorithm was already set up above for the very first execution
            if run_index > 0:
                self.machine.setup_algorithm()
            single_run = [exprun]
            apply_iov = iov_from_runs(single_run)
            B2INFO(f"Executing on IoV = {apply_iov}.")
            self.machine.execute_runs(runs=[exprun], iteration=iteration, apply_iov=apply_iov)
            B2INFO(f"Finished execution with result code {self.machine.result.result}.")

            result_code = self.machine.result.result
            if result_code in (AlgResult.ok.value, AlgResult.iterate.value):
                # A successful execution: commit the payloads and record the result
                B2INFO(f"Committing payloads for {iov_from_runs([exprun])}.")
                self.machine.algorithm.algorithm.commit()
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                self.machine.complete()
            elif result_code == AlgResult.not_enough_data.value:
                # Not enough data in this run: record it, but nothing is committed
                B2INFO(f"There wasn't enough data in the IoV {iov_from_runs([exprun])}.")
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                self.machine.fail()
            elif result_code == AlgResult.failure.value:
                # Outright failure: record it and carry on with the next run
                B2ERROR(f"Failure exit code in the IoV {iov_from_runs([exprun])}.")
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                self.machine.fail()

        # Print any knowable gaps between result IoVs; finding some indicates a problem.
        gaps = self.find_iov_gaps()
        # Keep a copy of them on disk for logging
        with open(f"{self.algorithm.name}_iov_gaps.json", "w") as gaps_file:
            json.dump(gaps, gaps_file)

        self.send_final_state(self.COMPLETED)
655 
656 
657 class SequentialBoundaries(AlgorithmStrategy):
658  """
659  Algorithm strategy to first calculate run boundaries where execution should be attempted.
660  Runs the algorithm over the input data contained within the requested IoV of the boundaries,
661  starting with the first boundary data only.
662  If the algorithm returns 'not enough data' on the current boundary IoV, it won't commit the payloads,
663  but instead adds the next boundarie's data and tries again. Basically the same logic as `SequentialRunByRun`
664  but using run boundaries instead of runs directly.
665  Notice that boundaries cannot span multiple experiments.
666 
667  By default the algorithm will get the payload boundaries directly from the algorithm that need to
668  have inplemented the function ``isBoundaryRequired``. If the desired boundaries are already known it
669  is possible to pass them directly setting the algorithm parameter ``payload_boundaries`` and avoid
670  the need to define the ``isBoundaryRequired`` function.
671 
672  ``payload_boundaries`` is a list ``[(exp1, run1), (exp2, run2), ...]``. A boundary at the beginning of each
673  experiment will be added if not already present. An empty list will thus produce a single payload for each
674  experiment. A ``payload_boundaries`` set to ``None`` is equivalent to not passing it and restores the default
675  behaviour where the boundaries are computed in the ``isBoundaryRequired`` function of the algorithm.
676  """
677 
679  usable_params = {
680  "iov_coverage": IoV,
681  "payload_boundaries": [] # [(exp1, run1), (exp2, run2), ...]
682  }
683 
684 
685  allowed_granularities = ["run"]
686 
687  def __init__(self, algorithm):
688  """
689  """
690  super().__init__(algorithm)
691 
693  self.machine = AlgorithmMachine(self.algorithm)
694  self.first_execution = True
695 
def run(self, iov, iteration, queue):
    """
    Runs the algorithm machine over the collected data and fills the results.

    The collected runs are split into one run list per experiment. For each experiment the
    payload boundaries are taken either from the algorithm's ``findPayloadBoundaries`` or from
    the ``payload_boundaries`` algorithm parameter, boundary IoVs without any run data are
    dropped (and the resulting gaps closed), and `execute_over_boundaries` is called.
    A final state (``self.FAILED`` or ``self.COMPLETED``) is sent via ``send_final_state``.

    Parameters:
        iov: If truthy, only the collected runs overlapping this IoV are executed,
            otherwise all collected runs are used.
        iteration: Passed to the machine's ``setup_algorithm`` and to `execute_over_boundaries`.
        queue: Stored as ``self.queue``.

    Raises:
        StrategyError: If ``self.is_valid()`` is false.
    """
    if not self.is_valid():
        raise StrategyError("This AlgorithmStrategy was not set up correctly!")
    self.queue = queue
    B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
    # Now add all the necessary parameters for a strategy to run
    machine_params = {
        "database_chain": self.database_chain,
        "dependent_databases": self.dependent_databases,
        "output_dir": self.output_dir,
        "output_database_dir": self.output_database_dir,
        "input_files": self.input_files,
        "ignored_runs": self.ignored_runs,
    }
    self.machine.setup_from_dict(machine_params)
    # Start moving through machine states
    self.machine.setup_algorithm(iteration=iteration)
    # After this point, the logging is in the stdout of the algorithm
    B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
    all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
    # If we were given a specific IoV to calibrate we just execute over runs in that IoV
    if iov:
        runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
    else:
        runs_to_execute = all_runs_collected[:]

    # Remove the ignored runs from our run list to execute
    if self.ignored_runs:
        B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
        # Build a new set rather than calling difference_update() in place: when no IoV was given
        # runs_to_execute is a slice copy of a sequence, which has no difference_update() method.
        runs_to_execute = set(runs_to_execute).difference(self.ignored_runs)
    # Sets aren't ordered so let's go back to lists and sort
    runs_to_execute = sorted(runs_to_execute)

    # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
    # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
    # separately and prevent IoVs from crossing the boundary.
    runs_to_execute = split_runs_by_exp(runs_to_execute)

    # Now iterate through the experiments. We DO NOT allow a payload IoV to
    # extend over multiple experiments, only multiple runs
    iov_coverage = None
    if "iov_coverage" in self.algorithm.params:
        B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
        iov_coverage = self.algorithm.params["iov_coverage"]

    payload_boundaries = None
    if "payload_boundaries" in self.algorithm.params:
        B2INFO(f"Detected that you have set payload_boundaries to {self.algorithm.params['payload_boundaries']}.")
        payload_boundaries = self.algorithm.params["payload_boundaries"]

    number_of_experiments = len(runs_to_execute)
    B2INFO(f"We are iterating over {number_of_experiments} experiments.")

    # Iterate over experiment run lists
    for i_exp, run_list in enumerate(runs_to_execute, start=1):
        B2DEBUG(26, f"Run List for this experiment={run_list}")
        current_experiment = run_list[0].exp
        B2INFO(f"Executing over data from experiment {current_experiment}")
        # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
        # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
        # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
        # This is only true for the lowest and highest experiments in our input data.
        if i_exp == 1:
            if iov_coverage:
                lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low)
            else:
                lowest_exprun = run_list[0]
        # We are calibrating across multiple experiments so we shouldn't start from the middle but from the 0th run
        else:
            lowest_exprun = ExpRun(current_experiment, 0)

        # Override the normal value for the highest ExpRun (from data) if iov_coverage was set
        if iov_coverage and i_exp == number_of_experiments:
            highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high)
        # If we have more experiments to execute then we will be setting the final payload IoV in this experiment
        # to be unbounded
        elif i_exp < number_of_experiments:
            highest_exprun = ExpRun(current_experiment, -1)
        # Otherwise just get the values from data
        else:
            highest_exprun = run_list[-1]

        # Find the boundaries for this experiment's runs
        vec_run_list = vector_from_runs(run_list)
        if payload_boundaries is None:
            # Find the boundaries using the findPayloadBoundaries implemented in the algorithm
            B2INFO("Attempting to find payload boundaries.")
            vec_boundaries = self.algorithm.algorithm.findPayloadBoundaries(vec_run_list)
            # If this vector is empty then that's bad. Maybe the isBoundaryRequired function
            # wasn't implemented? Either way we should stop.
            if vec_boundaries.empty():
                B2ERROR("No boundaries found but we are in a strategy that requires them! Failing...")
                # Tell the Runner that we have failed
                self.send_final_state(self.FAILED)
                break
            vec_boundaries = runs_from_vector(vec_boundaries)
        else:
            # Using boundaries set by user
            B2INFO(f"Using as payload boundaries {payload_boundaries}.")
            vec_boundaries = [ExpRun(exp, run) for exp, run in payload_boundaries]
            # No need to check that vec_boundaries is not empty. In case it is we will anyway add
            # a boundary at the first run of each experiment.
        # Remove any boundaries not from the current experiment (only likely if they were set manually)
        # We sort just to make everything easier later and just in case something mad happened.
        run_boundaries = sorted([er for er in vec_boundaries if er.exp == current_experiment])
        # In this strategy we consider each experiment separately. We therefore check that the
        # boundary (exp, 0) is present and if not we add it. It is indeed possible to miss it
        # if the boundaries were given manually
        first_exprun = ExpRun(current_experiment, 0)
        if first_exprun not in run_boundaries:
            B2WARNING(f"No boundary found at ({current_experiment}, 0), adding it.")
            run_boundaries.insert(0, first_exprun)
        B2INFO(f"Found {len(run_boundaries)} boundaries for this experiment. "
               "Checking if we have some data for all boundary IoVs...")
        # First figure out the run lists to use for each execution (potentially different from the applied IoVs)
        # We use the boundaries and the run_list
        boundary_iovs_to_run_lists = find_run_lists_from_boundaries(run_boundaries, run_list)
        B2DEBUG(26, f"Boundary IoVs before checking data = {boundary_iovs_to_run_lists}")
        # If there were any boundary IoVs with no run data, just remove them. Otherwise they will execute over all data.
        boundary_iovs_to_run_lists = {key: value for key, value in boundary_iovs_to_run_lists.items() if value}
        B2DEBUG(26, f"Boundary IoVs after checking data = {boundary_iovs_to_run_lists}")
        # If any were removed then we might have gaps between the boundary IoVs. Fix those now by merging IoVs.
        new_boundary_iovs_to_run_lists = {}
        previous_boundary_iov = None
        previous_boundary_run_list = None
        # The loop variable is deliberately NOT called run_list, so the experiment's run list is not shadowed
        for boundary_iov, boundary_runs in boundary_iovs_to_run_lists.items():
            if not previous_boundary_iov:
                previous_boundary_iov = boundary_iov
                previous_boundary_run_list = boundary_runs
                continue
            # We are definitely dealing with IoVs from one experiment so we can make assumptions here
            if previous_boundary_iov.run_high != (boundary_iov.run_low - 1):
                B2WARNING("Gap in boundary IoVs found before execution! "
                          "Will correct it by extending the previous boundary up to the next one.")
                B2INFO(f"Original boundary IoV={previous_boundary_iov}")
                previous_boundary_iov = IoV(previous_boundary_iov.exp_low, previous_boundary_iov.run_low,
                                            previous_boundary_iov.exp_high, boundary_iov.run_low - 1)
                B2INFO(f"New boundary IoV={previous_boundary_iov}")
            new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
            previous_boundary_iov = boundary_iov
            previous_boundary_run_list = boundary_runs
        # The last boundary IoV seen has not been stored yet. Skip this if there were no boundary IoVs
        # at all, so that we never insert a meaningless {None: None} entry.
        if previous_boundary_iov:
            new_boundary_iovs_to_run_lists[previous_boundary_iov] = previous_boundary_run_list
        boundary_iovs_to_run_lists = new_boundary_iovs_to_run_lists
        B2DEBUG(26, f"Boundary IoVs after fixing gaps = {boundary_iovs_to_run_lists}")
        # Actually execute now that we have an IoV list to apply
        success = self.execute_over_boundaries(boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration)
        if not success:
            # Tell the Runner that we have failed
            self.send_final_state(self.FAILED)
            break
    # Only executes if we didn't fail any experiment execution (i.e. the loop was never left via break)
    else:
        # Print any knowable gaps between result IoVs, if any are found there is a problem, but not necessarily too bad.
        gaps = self.find_iov_gaps()
        if gaps:
            B2WARNING("There were gaps between the output IoV payloads! See the JSON file in the algorithm output directory.")
            # Dump them to a file for logging
            with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
                json.dump(gaps, f)

        # If any results weren't successes we fail
        if self.any_failed_iov():
            self.send_final_state(self.FAILED)
        else:
            self.send_final_state(self.COMPLETED)
865 
def execute_over_boundaries(self, boundary_iovs_to_run_lists, lowest_exprun, highest_exprun, iteration):
    """
    Take the previously found boundaries and the run lists they correspond to and actually perform the
    Algorithm execution. This is assumed to be for a single experiment.

    Boundary IoVs are processed in sorted order. A successful execution is held back and only committed
    once a later execution succeeds or the boundaries are exhausted. If an execution returns
    "not enough data", its runs are merged with those of the next boundary; if no boundary remains, they
    are merged with the runs of the held-back success, which is then discarded and re-executed.

    Parameters:
        boundary_iovs_to_run_lists (dict): Maps each boundary IoV to the list of runs executed for it.
            The mapping and its lists are not modified.
        lowest_exprun: Unpacked as the (exp, run) start of the first applied IoV.
        highest_exprun: Unpacked as the (exp, run) end of the last applied IoV.
        iteration: Passed on to `execute_runs`.

    Returns:
        bool: True once all boundaries were processed and the last held-back payloads were committed,
        False if there were no boundaries, not enough data overall, or an unrecoverable result.
    """
    # Sorted copy of the boundary IoVs that still have to be executed
    remaining_boundary_iovs = sorted(boundary_iovs_to_run_lists)

    # The current runs we are executing
    current_runs = []
    # The IoV of the current boundary(s)
    current_boundary_iov = None
    # The current execution's applied IoV, may be different to the boundary IoV
    current_iov = None

    # The last successful payload list and result. We hold on to them so that we can commit or discard later.
    last_successful_payloads = None
    last_successful_result = None
    # The previous execution's runs
    last_successful_runs = []
    # The previous execution's applied IoV
    last_successful_iov = None

    while True:
        # Do we have previous successes?
        if not last_successful_result:
            if not current_runs:
                # Did we actually have any boundaries?
                if not remaining_boundary_iovs:
                    # Fail because we have no boundaries to use
                    B2ERROR("No boundaries found for the current experiment's run list. Failing the strategy.")
                    return False

                B2INFO("This appears to be the first attempted execution of the experiment.")
                # Attempt to execute on the first boundary
                current_boundary_iov = remaining_boundary_iovs.pop(0)
                # Copy the run list: it may be extended below and must not alter the caller's mapping
                current_runs = list(boundary_iovs_to_run_lists[current_boundary_iov])
                # What if there is only one boundary? Need to apply the highest exprun
                if not remaining_boundary_iovs:
                    current_iov = IoV(*lowest_exprun, *highest_exprun)
                else:
                    current_iov = IoV(*lowest_exprun, current_boundary_iov.exp_high, current_boundary_iov.run_high)
            # Returned not enough data from first execution
            else:
                # Any remaining boundaries?
                if not remaining_boundary_iovs:
                    # Fail because we have no boundaries to use
                    B2ERROR("Not enough data found for the current experiment's run list. Failing the strategy.")
                    return False

                B2INFO("There wasn't enough data previously. Merging with the runs from the next boundary.")
                # Extend the previous run lists/iovs
                next_boundary_iov = remaining_boundary_iovs.pop(0)
                current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
                                           next_boundary_iov.exp_high, next_boundary_iov.run_high)
                current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
                # At the last boundary? Need to apply the highest exprun
                if not remaining_boundary_iovs:
                    current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
                else:
                    current_iov = IoV(current_iov.exp_low, current_iov.run_low,
                                      current_boundary_iov.exp_high, current_boundary_iov.run_high)

            self.execute_runs(current_runs, iteration, current_iov)

            # Does this count as a successful execution?
            if self.alg_success():
                # Hold on to this success; it is committed later
                B2INFO("Found a success. Will save the payloads for later.")
                # Save success
                last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                last_successful_result = self.machine.result
                last_successful_runs = current_runs[:]
                last_successful_iov = current_iov
                # Reset values for next loop
                current_runs = []
                current_boundary_iov = None
                current_iov = None
                self.machine.complete()
                continue
            elif self.machine.result.result == AlgResult.not_enough_data.value:
                B2INFO("Not Enough Data result.")
                # Just complete but leave the current runs alone for next loop
                self.machine.complete()
                continue
            else:
                B2ERROR("Hit a failure or some kind of result we can't continue from. Failing out...")
                self.machine.fail()
                return False
        # Previous result exists
        else:
            # Previous loop was a success
            if not current_runs:
                # Remaining boundaries?
                if not remaining_boundary_iovs:
                    # Out of data, can now commit
                    B2INFO("Finished this experiment's boundaries. "
                           f"Committing remaining payloads from {last_successful_result.iov}")
                    self.machine.algorithm.algorithm.commit(last_successful_payloads)
                    self.results.append(last_successful_result)
                    self.send_result(last_successful_result)
                    return True

                # Remaining boundaries exist so we try to execute
                current_boundary_iov = remaining_boundary_iovs.pop(0)
                # Copy the run list: it may be extended below and must not alter the caller's mapping
                current_runs = list(boundary_iovs_to_run_lists[current_boundary_iov])
                # What if there is only one boundary? Need to apply the highest exprun
                if not remaining_boundary_iovs:
                    current_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low, *highest_exprun)
                else:
                    current_iov = current_boundary_iov

            # Returned not enough data from last execution
            else:
                # Any remaining boundaries?
                if not remaining_boundary_iovs:
                    B2INFO("We have no remaining runs to increase the amount of data. "
                           "Instead we will merge with the previous successful runs.")
                    # Merge with previous success IoV
                    new_current_runs = last_successful_runs[:]
                    new_current_runs.extend(current_runs)
                    current_runs = new_current_runs[:]
                    current_iov = IoV(last_successful_iov.exp_low, last_successful_iov.run_low,
                                      current_iov.exp_high, current_iov.run_high)
                    # We reset the last successful stuff because we are dropping it
                    last_successful_payloads = []
                    last_successful_result = None
                    last_successful_runs = []
                    last_successful_iov = None

                else:
                    B2INFO("Since there wasn't enough data previously, we will merge with the runs from the next boundary.")
                    # Extend the previous run lists/iovs
                    next_boundary_iov = remaining_boundary_iovs.pop(0)
                    current_boundary_iov = IoV(current_boundary_iov.exp_low, current_boundary_iov.run_low,
                                               next_boundary_iov.exp_high, next_boundary_iov.run_high)
                    # Extend previous execution's runs with the next set
                    current_runs.extend(boundary_iovs_to_run_lists[next_boundary_iov])
                    # At the last boundary? Need to apply the highest exprun
                    if not remaining_boundary_iovs:
                        current_iov = IoV(current_iov.exp_low, current_iov.run_low, *highest_exprun)
                    else:
                        current_iov = IoV(current_iov.exp_low, current_iov.run_low,
                                          current_boundary_iov.exp_high, current_boundary_iov.run_high)

            self.execute_runs(current_runs, iteration, current_iov)

            # Does this count as a successful execution?
            if self.alg_success():
                # Commit previous values we were holding onto
                B2INFO("Found a success.")
                # last_successful_result is None here if we just merged with (and dropped) the previous success
                if last_successful_result:
                    B2INFO("Can now commit the previous success.")
                    self.machine.algorithm.algorithm.commit(last_successful_payloads)
                    self.results.append(last_successful_result)
                    self.send_result(last_successful_result)
                # Replace last success
                last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                last_successful_result = self.machine.result
                last_successful_runs = current_runs[:]
                last_successful_iov = current_iov
                # Reset values for next loop
                current_runs = []
                current_boundary_iov = None
                current_iov = None
                self.machine.complete()
                continue
            elif self.machine.result.result == AlgResult.not_enough_data.value:
                B2INFO("Not Enough Data result.")
                # Just complete but leave the current runs alone for next loop
                self.machine.complete()
                continue
            else:
                B2ERROR("Hit a failure or some other result we can't continue from. Failing out...")
                self.machine.fail()
                return False
1042 
def execute_runs(self, runs, iteration, iov):
    """
    Execute the algorithm machine over ``runs``, applying ``iov`` to the payloads.

    On the first call only the ``first_execution`` flag is cleared; on every later call
    ``self.machine.setup_algorithm()`` is invoked before the execution.
    """
    if self.first_execution:
        # The machine was already set up before the first execution, so don't do it again
        self.first_execution = False
    else:
        self.machine.setup_algorithm()

    B2INFO(f"Executing and applying {iov} to the payloads.")
    self.machine.execute_runs(
        runs=runs,
        iteration=iteration,
        apply_iov=iov,
    )
    B2INFO(f"Finished execution with result code {self.machine.result.result}.")
1053 
def alg_success(self):
    """
    Return True if the machine's latest result code equals ``AlgResult.ok`` or
    ``AlgResult.iterate``, i.e. the execution counts as a success; False otherwise.
    """
    successful_codes = (AlgResult.ok.value, AlgResult.iterate.value)
    return self.machine.result.result in successful_codes
1056 
1057 
class StrategyError(Exception):
    """
    Exception type raised by the strategy classes in this module, e.g. when a strategy
    is run without having been set up correctly.
    """
1062 
1063 # @endcond