Belle II Software  release-08-01-10
state_machines.py
1 #!/usr/bin/env python3
2 
3 # disable doxygen check for this file
4 # @cond
5 
6 
13 
14 import basf2
15 
16 from functools import partial
17 from collections import defaultdict
18 
19 import pickle
20 import glob
21 import shutil
22 import time
23 from pathlib import Path
24 import os
25 import json
26 
27 from basf2 import create_path
28 from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
29 from basf2 import conditions as b2conditions
30 from basf2.pickle_path import serialize_path
31 
32 from ROOT.Belle2 import CalibrationAlgorithm
33 
34 from caf.utils import create_directories
35 from caf.utils import method_dispatch
36 from caf.utils import iov_from_runs
37 from caf.utils import IoV_Result
38 from caf.utils import get_iov_from_file
39 from caf.backends import Job
40 from caf.runners import AlgorithmsRunner
41 
42 
class State():
    """
    Basic State object that can take enter and exit state methods and records
    the state of a machine.

    You should assign the self.on_enter or self.on_exit attributes to callback functions
    or lists of them, if you need them.
    """

    def __init__(self, name, enter=None, exit=None):
        """
        Initialise State with a name and optional lists of callbacks.

        Parameters:
            name (str): Unique name of this state; also its identity for equality/hashing.
            enter: A callable or list/tuple of callables run when the state is entered.
            exit: A callable or list/tuple of callables run when the state is exited.
        """
        #: Name of the State
        self.name = name
        #: Enter callbacks; normalised to a list by the property setter below
        self.on_enter = enter
        #: Exit callbacks; normalised to a list by the property setter below
        self.on_exit = exit

    @property
    def on_enter(self):
        """
        Runs callbacks when a state is entered.
        """
        return self._on_enter

    @on_enter.setter
    def on_enter(self, callbacks):
        """
        Replaces the enter-callback list; accepts a single callable or a list/tuple of them.
        """
        self._on_enter = []
        if callbacks:
            self._add_callbacks(callbacks, self._on_enter)

    @property
    def on_exit(self):
        """
        Runs callbacks when a state is exited.
        """
        return self._on_exit

    @on_exit.setter
    def on_exit(self, callbacks):
        """
        Replaces the exit-callback list; accepts a single callable or a list/tuple of them.
        """
        self._on_exit = []
        if callbacks:
            self._add_callbacks(callbacks, self._on_exit)

    @method_dispatch
    def _add_callbacks(self, callback, attribute):
        """
        Adds callback to a property.
        """
        # Fallback overload (single callable). Non-callables are reported but
        # not raised, so a bad callback never breaks machine construction.
        if callable(callback):
            attribute.append(callback)
        else:
            B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")

    @_add_callbacks.register(tuple)
    @_add_callbacks.register(list)
    def _(self, callbacks, attribute):
        """
        Alternate method for lists and tuples of function objects.
        """
        if callbacks:
            for function in callbacks:
                if callable(function):
                    attribute.append(function)
                else:
                    B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")

    def __str__(self):
        """
        The state's name.
        """
        return self.name

    def __repr__(self):
        """
        Debug representation showing the state name.
        """
        return f"State(name={self.name})"

    def __eq__(self, other):
        """
        A State compares equal to another State with the same name, and also to
        that name given as a plain string.
        """
        if isinstance(other, str):
            return self.name == other
        else:
            return self.name == other.name

    def __hash__(self):
        """
        Hash on the name so equal states hash equally (States are used as dict keys).
        """
        return hash(self.name)
139 
class Machine():
    """
    Parameters:
        states (list[str]): A list of possible states of the machine.
        initial_state (str): The name of the state the machine starts in.

    Base class for a finite state machine wrapper.
    Implements the framework that a more complex machine can inherit from.

    The `transitions` attribute is a dictionary of trigger name keys, each value of
    which is another dictionary of 'source' states, 'dest' states, and 'conditions'
    methods. 'conditions' should be a list of callables or a single one. A transition is
    valid if it goes from an allowed state to an allowed state.
    Conditions are optional but must be a callable that returns True or False based
    on some state of the machine. They cannot have input arguments currently.

    Every condition/before/after callback function MUST take ``**kwargs`` as the only
    argument (except ``self`` if it's a class method). This is because it's basically
    impossible to determine which arguments to pass to which functions for a transition.
    Therefore this machine just enforces that every function should simply take ``**kwargs``
    and use the dictionary of arguments (even if it doesn't need any arguments).

    This also means that if you call a trigger with arguments e.g. ``machine.walk(speed=5)``
    you MUST use the keyword arguments rather than positional ones. So ``machine.walk(5)``
    will *not* work.
    """

    def __init__(self, states=None, initial_state="default_initial"):
        """
        Basic Setup of states and initial_state.
        """
        #: Valid states for this machine, keyed by state name
        self.states = {}
        if states:
            for state in states:
                self.add_state(state)
        if initial_state != "default_initial":
            # Goes through the property setter, which also sets _state
            self.initial_state = initial_state
        else:
            self.add_state(initial_state)
            #: Initial state object; set directly to avoid any callbacks
            self._initial_state = State(initial_state)

        #: Current state; assigned directly here so no enter callbacks run
        self._state = self.initial_state
        #: Allowed transitions between states, keyed by trigger name
        self.transitions = defaultdict(list)

    def add_state(self, state, enter=None, exit=None):
        """
        Adds a single state to the list of possible ones.
        Should be a unique string or a State object with a unique name.
        """
        if isinstance(state, str):
            self.add_state(State(state, enter, exit))
        elif isinstance(state, State):
            if state.name not in self.states.keys():
                self.states[state.name] = state
            else:
                B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
        else:
            B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")

    @property
    def initial_state(self):
        """
        The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
        """
        return self._initial_state

    @initial_state.setter
    def initial_state(self, state):
        """
        Sets both the initial and current state WITHOUT running any callbacks.

        Raises:
            KeyError: If the requested state isn't known to the machine.
        """
        if state in self.states.keys():
            self._initial_state = self.states[state]
            self._state = self.states[state]
        else:
            raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")

    @property
    def state(self):
        """
        The current state of the machine. Actually a `property` decorator. It will call the exit method of the
        current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
        either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
        """
        return self._state

    @state.setter
    def state(self, state):
        """
        Moves the machine into `state` (a name or a `State`), running the current
        state's exit callbacks followed by the new state's enter callbacks.

        Raises:
            MachineError: If the requested state isn't known to the machine.
        """
        if isinstance(state, str):
            state_name = state
        else:
            state_name = state.name

        # Keep the try narrow: only the lookup may raise KeyError here.
        # Previously the callbacks also ran inside this try, so a KeyError
        # raised *by a callback* was misreported as an unknown state.
        try:
            state = self.states[state_name]
        except KeyError:
            raise MachineError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")
        # Run exit callbacks of current state
        for callback in self.state.on_exit:
            callback(prior_state=self.state, new_state=state)
        # Run enter callbacks of new state
        for callback in state.on_enter:
            callback(prior_state=self.state, new_state=state)
        # Set the state
        self._state = state

    @staticmethod
    def default_condition(**kwargs):
        """
        Method to always return True.
        """
        return True

    def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
        """
        Adds a single transition to the dictionary of possible ones.
        Trigger is the method name that begins the transition between the
        source state and the destination state.

        The condition is an optional function that returns True or False
        depending on the current state/input.
        """
        transition_dict = {}
        try:
            source = self.states[source]
            dest = self.states[dest]
            transition_dict["source"] = source
            transition_dict["dest"] = dest
        except KeyError as err:
            B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
            raise err
        # Normalise the optional conditions argument to a non-empty list
        if conditions:
            if isinstance(conditions, (list, tuple, set)):
                transition_dict["conditions"] = list(conditions)
            else:
                transition_dict["conditions"] = [conditions]
        else:
            transition_dict["conditions"] = [Machine.default_condition]

        # Normalise before/after to (possibly empty) lists of callables
        if not before:
            before = []
        if isinstance(before, (list, tuple, set)):
            transition_dict["before"] = list(before)
        else:
            transition_dict["before"] = [before]

        if not after:
            after = []
        if isinstance(after, (list, tuple, set)):
            transition_dict["after"] = list(after)
        else:
            transition_dict["after"] = [after]

        self.transitions[trigger].append(transition_dict)

    def __getattr__(self, name):
        """
        Allows us to create a new method for each trigger on the fly.
        If there is no trigger name in the machine to match, then the normal
        AttributeError is called.

        (The dead ``**kwargs`` parameter was removed: Python only ever passes
        the attribute name to ``__getattr__``, so it could never be populated.)
        """
        possible_transitions = self.get_transitions(self.state)
        if name not in possible_transitions:
            raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
        transition_dict = self.get_transition_dict(self.state, name)
        return partial(self._trigger, name, transition_dict)

    def _trigger(self, transition_name, transition_dict, **kwargs):
        """
        Runs the transition logic. Callbacks are evaluated in the order:
        conditions -> before -> <new state set here> -> after.

        Raises:
            ConditionError: If one or more conditions evaluate False.
        """
        dest, conditions, before_callbacks, after_callbacks = (
            transition_dict["dest"],
            transition_dict["conditions"],
            transition_dict["before"],
            transition_dict["after"]
        )
        # Returns True only if every condition returns True when called
        if all(map(lambda condition: self._callback(condition, **kwargs), conditions)):
            for before_func in before_callbacks:
                self._callback(before_func, **kwargs)
            # Assignment goes through the `state` property setter, which runs
            # the exit/enter callbacks of the old/new states.
            self.state = dest
            for after_func in after_callbacks:
                self._callback(after_func, **kwargs)
        else:
            raise ConditionError(f"Transition '{transition_name}' called for but one or more conditions "
                                 "evaluated False")

    @staticmethod
    def _callback(func, **kwargs):
        """
        Calls a condition/before/after.. function using arguments passed (or not).
        """
        return func(**kwargs)

    def get_transitions(self, source):
        """
        Returns a list of trigger names for transitions allowed out of `source`.
        """
        return [trigger for trigger, transition_dicts in self.transitions.items()
                for transition_dict in transition_dicts
                if transition_dict["source"] == source]

    def get_transition_dict(self, state, transition):
        """
        Returns the transition dictionary for a state and transition out of it.

        Raises:
            KeyError: If no transition with this name leaves `state`.
        """
        transition_dicts = self.transitions[transition]
        for transition_dict in transition_dicts:
            if transition_dict["source"] == state:
                return transition_dict
        else:
            raise KeyError(f"No transition from state {state} with the name {transition}.")

    def save_graph(self, filename, graphname):
        """
        Does a simple dot file creation to visualise states and transitions.
        """
        with open(filename, "w") as dotfile:
            dotfile.write("digraph " + graphname + " {\n")
            for state in self.states.keys():
                dotfile.write('"' + state + '" [shape=ellipse, color=black]\n')
            for trigger, transition_dicts in self.transitions.items():
                for transition in transition_dicts:
                    dotfile.write('"' + transition["source"].name + '" -> "' +
                                  transition["dest"].name + '" [label="' + trigger + '"]\n')
            dotfile.write("}\n")
380 
381 class CalibrationMachine(Machine):
382  """
383  A state machine to handle `Calibration` objects and the flow of
384  processing for them.
385  """
386 
387  collector_input_dir = 'collector_input'
388  collector_output_dir = 'collector_output'
389  algorithm_output_dir = 'algorithm_output'
390 
    def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
        """
        Takes a Calibration object from the caf framework and lets you
        set the initial state.

        Parameters:
            calibration: The `Calibration` object this machine drives.
            iov_to_calibrate: Optional overall IoV restricting the input files used.
            initial_state (str): Name of the state to start in.
            iteration (int): Iteration number to start counting from.
        """
        #: Default states; every state mirrors itself onto the Calibration
        #: object and logs a message when entered.
        self.default_states = [State("init", enter=[self._update_cal_state,
                                                    self._log_new_state]),
                               State("running_collector", enter=[self._update_cal_state,
                                                                 self._log_new_state]),
                               State("collector_failed", enter=[self._update_cal_state,
                                                                self._log_new_state]),
                               State("collector_completed", enter=[self._update_cal_state,
                                                                   self._log_new_state]),
                               State("running_algorithms", enter=[self._update_cal_state,
                                                                  self._log_new_state]),
                               State("algorithms_failed", enter=[self._update_cal_state,
                                                                 self._log_new_state]),
                               State("algorithms_completed", enter=[self._update_cal_state,
                                                                    self._log_new_state]),
                               State("completed", enter=[self._update_cal_state,
                                                         self._log_new_state]),
                               State("failed", enter=[self._update_cal_state,
                                                      self._log_new_state])
                               ]

        super().__init__(self.default_states, initial_state)

        #: Calibration object whose processing this machine controls
        self.calibration = calibration
        # Monkey Patching for the win!
        #: Let the Calibration object reach back to its machine
        self.calibration.machine = self
        #: Current iteration number of this calibration
        self.iteration = iteration
        #: Backend used for the collector submission (filled in externally)
        self.collector_backend = None
        #: Algorithm results, keyed by iteration (read by _require_iteration)
        self._algorithm_results = {}
        #: Final state of the AlgorithmsRunner (checked by _runner_failed)
        self._runner_final_state = None
        #: Optional overall IoV to calibrate
        self.iov_to_calibrate = iov_to_calibrate
        #: Root working directory of this calibration (cwd/<calibration name>)
        self.root_dir = Path(os.getcwd(), calibration.name)

        #: Timing bookkeeping ("start", "last_update") for collector jobs
        self._collector_timing = {}

        #: Collector `Job` objects, keyed by collection name
        self._collector_jobs = {}

        # NOTE: registration order matters — automatic_transition tries
        # transitions in this insertion order (fail last).
        self.add_transition("submit_collector", "init", "running_collector",
                            conditions=self.dependencies_completed,
                            before=[self._make_output_dir,
                                    self._resolve_file_paths,
                                    self._build_iov_dicts,
                                    self._create_collector_jobs,
                                    self._submit_collections,
                                    self._dump_job_config])
        self.add_transition("fail", "running_collector", "collector_failed",
                            conditions=self._collection_failed)
        self.add_transition("complete", "running_collector", "collector_completed",
                            conditions=self._collection_completed)
        self.add_transition("run_algorithms", "collector_completed", "running_algorithms",
                            before=self._check_valid_collector_output,
                            after=[self._run_algorithms,
                                   self.automatic_transition])
        self.add_transition("complete", "running_algorithms", "algorithms_completed",
                            after=self.automatic_transition,
                            conditions=self._runner_not_failed)
        self.add_transition("fail", "running_algorithms", "algorithms_failed",
                            conditions=self._runner_failed)
        self.add_transition("iterate", "algorithms_completed", "init",
                            conditions=[self._require_iteration,
                                        self._below_max_iterations],
                            after=self._increment_iteration)
        self.add_transition("finish", "algorithms_completed", "completed",
                            conditions=self._no_require_iteration,
                            before=self._prepare_final_db)
        self.add_transition("fail_fully", "algorithms_failed", "failed")
        self.add_transition("fail_fully", "collector_failed", "failed")
475 
476  def _update_cal_state(self, **kwargs):
477  self.calibration.state = str(kwargs["new_state"])
478 
479  def files_containing_iov(self, file_paths, files_to_iovs, iov):
480  """
481  Lookup function that returns all files from the file_paths that
482  overlap with this IoV.
483  """
484  # Files that contain an Exp,Run range that overlaps with given IoV
485  overlapping_files = set()
486 
487  for file_path, file_iov in files_to_iovs.items():
488  if file_iov.overlaps(iov) and (file_path in file_paths):
489  overlapping_files.add(file_path)
490  return overlapping_files
491 
492  def _dump_job_config(self):
493  """
494  Dumps the `Job` object for the collections to JSON files so that it's configuration can be recovered
495  later in case of failure.
496  """
497  # Wait for jobs (+subjobs) to be submitted so that all information is filled. Since the parent CAF object asynchronously
498  # submits the jobs this might need to wait a while.
499  while any(map(lambda j: j.status == "init", self._collector_jobs.values())):
500  B2DEBUG(29, "Some Collector Jobs still in 'init' state. Waiting...")
501  time.sleep(5)
502 
503  for collection_name, job in self._collector_jobs.items():
504  collector_job_output_file_name = self.calibration.collections[collection_name].job_config
505  output_file = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir,
506  collection_name, collector_job_output_file_name)
507  job.dump_to_json(output_file)
508 
509  def _recover_collector_jobs(self):
510  """
511  Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.
512  """
513  for collection_name, collection in self.calibration.collections.items():
514  output_file = self.root_dir.joinpath(str(self.iteration),
515  self.collector_input_dir,
516  collection_name,
517  collection.job_config)
518  self._collector_jobs[collection_name] = Job.from_json(output_file)
519 
520  def _iov_requested(self):
521  """
522  """
523  if self.iov_to_calibrate:
524  B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
525  return True
526  else:
527  B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
528  return False
529 
    def _resolve_file_paths(self):
        """
        Intentionally a no-op here; run as a 'before' callback of the
        submit_collector transition. Presumably a hook point for resolving
        input file paths before collector jobs are created — confirm with callers.
        """
        pass
534 
535  def _build_iov_dicts(self):
536  """
537  Build IoV file dictionary for each collection if required.
538  """
539  iov_requested = self._iov_requested()
540  if iov_requested or self.calibration.ignored_runs:
541  for coll_name, collection in self.calibration.collections.items():
542  if not collection.files_to_iovs:
543  B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
544  f" Calibration '{self.calibration.name} and Collection '{coll_name}'."
545  " Filling dictionary from input file metadata."
546  " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
547 
548  files_to_iovs = {}
549  for file_path in collection.input_files:
550  files_to_iovs[file_path] = get_iov_from_file(file_path)
551  collection.files_to_iovs = files_to_iovs
552  else:
553  B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
554  f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
555  else:
556  B2INFO("No File to IoV mapping required.")
557 
558  def _below_max_iterations(self):
559  """
560  """
561  return self.iteration < self.calibration.max_iterations
562 
563  def _increment_iteration(self):
564  """
565  """
566  self.iteration += 1
567  self.calibration.iteration = self.iteration
568 
569  def _collection_completed(self):
570  """
571  Did all the collections succeed?
572  """
573  B2DEBUG(29, "Checking for failed collector job.")
574  if self._collector_jobs_ready():
575  return all([job.status == "completed" for job in self._collector_jobs.values()])
576 
577  def _collection_failed(self):
578  """
579  Did any of the collections fail?
580  """
581  B2DEBUG(29, "Checking for failed collector job.")
582  if self._collector_jobs_ready():
583  return any([job.status == "failed" for job in self._collector_jobs.values()])
584 
585  def _runner_not_failed(self):
586  """
587  Returns:
588  bool: If AlgorithmsRunner succeeded return True.
589  """
590  return not self._runner_failed()
591 
592  def _runner_failed(self):
593  """
594  Returns:
595  bool: If AlgorithmsRunner failed return True.
596  """
597  if self._runner_final_state == AlgorithmsRunner.FAILED:
598  return True
599  else:
600  return False
601 
    def _collector_jobs_ready(self):
        """
        True when every collector job reports ready.

        Also performs a throttled full status refresh of all jobs: only when
        more than `collector_full_update_interval` seconds have passed since
        the last refresh.
        """
        since_last_update = time.time() - self._collector_timing["last_update"]
        if since_last_update > self.calibration.collector_full_update_interval:
            B2INFO("Updating full collector job statuses.")
            for job in self._collector_jobs.values():
                job.update_status()
                # NOTE(review): the timestamp is refreshed per-job inside the
                # loop (last job wins) — presumably intended to mark the end of
                # the refresh; confirm this ordering is deliberate.
                self._collector_timing["last_update"] = time.time()
                if job.subjobs:
                    # Progress report: subjobs whose status is terminal count as finished
                    num_completed = sum((subjob.status in subjob.exit_statuses) for subjob in job.subjobs.values())
                    total_subjobs = len(job.subjobs)
                    B2INFO(f"{num_completed}/{total_subjobs} Collector SubJobs finished in"
                           f" Calibration {self.calibration.name} Job {job.name}.")
        return all([job.ready() for job in self._collector_jobs.values()])
617 
618  def _submit_collections(self):
619  """
620  """
621  self.calibration.jobs_to_submit.extend(list(self._collector_jobs.values()))
622  self._collector_timing["start"] = time.time()
623  self._collector_timing["last_update"] = time.time()
624 
625  def _no_require_iteration(self):
626  """
627  """
628  if self._require_iteration() and self._below_max_iterations():
629  return False
630  elif self._require_iteration() and not self._below_max_iterations():
631  B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
632  return True
633  elif not self._require_iteration():
634  return True
635 
636  def _require_iteration(self):
637  """
638  """
639  iteration_called = False
640  for alg_name, results in self._algorithm_results[self.iteration].items():
641  for result in results:
642  if result.result == CalibrationAlgorithm.c_Iterate:
643  iteration_called = True
644  break
645  if iteration_called:
646  break
647  return iteration_called
648 
649  def _log_new_state(self, **kwargs):
650  """
651  """
652  B2INFO(f"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")
653 
654  def dependencies_completed(self):
655  """
656  Condition function to check that the dependencies of our calibration are in the 'completed' state.
657  Technically only need to check explicit dependencies.
658  """
659  for calibration in self.calibration.dependencies:
660  if not calibration.state == calibration.end_state:
661  return False
662  else:
663  return True
664 
    def automatic_transition(self):
        """
        Automatically try all transitions out of this state once. Tries fail last.

        Raises:
            MachineError: If no transition (not even "fail") could be taken.
        """
        possible_transitions = self.get_transitions(self.state)
        for transition in possible_transitions:
            try:
                if transition != "fail":
                    # Trigger methods are synthesized on the fly by Machine.__getattr__
                    getattr(self, transition)()
                    break
            except ConditionError:
                # Conditions for this trigger not met; try the next one
                continue
        else:
            # for/else: no non-"fail" transition succeeded, so fail if we can
            if "fail" in possible_transitions:
                getattr(self, "fail")()
            else:
                raise MachineError(f"Failed to automatically transition out of {self.state} state.")
682 
    def _make_output_dir(self):
        """
        Creates the overall root directory of the Calibration. Will not overwrite if it already exists.
        """
        create_directories(self.root_dir, overwrite=False)
689 
690  def _make_collector_path(self, name, collection):
691  """
692  Creates a basf2 path for the correct collector and serializes it in the
693  self.output_dir/<calibration_name>/<iteration>/paths directory
694  """
695  path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
696  # Automatically overwrite any previous directory
697  create_directories(path_output_dir)
698  path_file_name = collection.collector.name() + '.path'
699  path_file_name = path_output_dir / path_file_name
700  # Create empty path and add collector to it
701  coll_path = create_path()
702  coll_path.add_module(collection.collector)
703  # Dump the basf2 path to file
704  with open(path_file_name, 'bw') as serialized_path_file:
705  pickle.dump(serialize_path(coll_path), serialized_path_file)
706  # Return the pickle file path for addition to the input sandbox
707  return str(path_file_name.absolute())
708 
709  def _make_pre_collector_path(self, name, collection):
710  """
711  Creates a basf2 path for the collectors setup path (Collection.pre_collector_path) and serializes it in the
712  self.output_dir/<calibration_name>/<iteration>/<colector_output>/<name> directory.
713  """
714  path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
715  coll_path = collection.pre_collector_path
716  path_file_name = 'pre_collector.path'
717  path_file_name = os.path.join(path_output_dir, path_file_name)
718  # Dump the basf2 path to file
719  with open(path_file_name, 'bw') as serialized_path_file:
720  pickle.dump(serialize_path(coll_path), serialized_path_file)
721  # Return the pickle file path for addition to the input sandbox
722  return path_file_name
723 
    def _create_collector_jobs(self):
        """
        Creates a Job object for the collections of this iteration, ready for submission
        to backend.

        For each collection this builds the working/output directories, the input
        sandbox (job script, pickled collector path, optional pre-collector path,
        JSON database-chain config) and the filtered list of input files, then
        stores the Job in self._collector_jobs keyed by collection name.

        Raises:
            MachineError: If a collection ends up with no valid input files.
            ValueError: If a database in the chain has an unknown db_type.
        """
        for collection_name, collection in self.calibration.collections.items():
            iteration_dir = self.root_dir.joinpath(str(self.iteration))
            job = Job('_'.join([self.calibration.name, collection_name, 'Iteration', str(self.iteration)]))
            job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
            job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
            # Remove previous failed attempt to avoid problems
            if job.output_dir.exists():
                B2INFO(f"Previous output directory for {self.calibration.name} collector {collection_name} exists."
                       f"Deleting {job.output_dir} before re-submitting.")
                shutil.rmtree(job.output_dir)
            job.cmd = collection.job_cmd
            job.append_current_basf2_setup_cmds()
            job.input_sandbox_files.append(collection.job_script)
            # Serialized collector path goes into the sandbox
            collector_path_file = Path(self._make_collector_path(collection_name, collection))
            job.input_sandbox_files.append(collector_path_file)
            if collection.pre_collector_path:
                pre_collector_path_file = Path(self._make_pre_collector_path(collection_name, collection))
                job.input_sandbox_files.append(pre_collector_path_file)

            # Want to figure out which local databases are required for this job and their paths
            list_dependent_databases = []

            # Here we add the finished databases of previous calibrations that we depend on.
            # We can assume that the databases exist as we can't be here until they have returned
            for dependency in self.calibration.dependencies:
                database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
                B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
                list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))

            # Add previous iteration databases from this calibration
            if self.iteration > 0:
                previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
                database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
                list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
                B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")

            # Let's use a directory to store some files later for input to the collector jobs. Should already exist from
            # collector path
            input_data_directory = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, collection_name)

            # Need to pass setup info to collector which would be tricky as arguments
            # We make a dictionary and pass it in as json
            job_config = {}
            # Apply the user-set Calibration database chain to the base of the overall chain.
            json_db_chain = []
            for database in collection.database_chain:
                if database.db_type == 'local':
                    json_db_chain.append(('local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
                elif database.db_type == 'central':
                    json_db_chain.append(('central', database.global_tag))
                else:
                    raise ValueError(f"Unknown database type {database.db_type}.")
            # CAF created ones for dependent calibrations and previous iterations of this calibration
            for database in list_dependent_databases:
                json_db_chain.append(('local', database))
            job_config['database_chain'] = json_db_chain

            job_config_file_path = input_data_directory.joinpath('collector_config.json').absolute()
            with open(job_config_file_path, 'w') as job_config_file:
                json.dump(job_config, job_config_file, indent=2)
            job.input_sandbox_files.append(job_config_file_path)

            # Define the input files
            input_data_files = set(collection.input_files)
            # Reduce the input data files to only those that overlap with the optional requested IoV
            if self.iov_to_calibrate:
                input_data_files = self.files_containing_iov(input_data_files,
                                                             collection.files_to_iovs,
                                                             self.iov_to_calibrate)
            # Remove any files that ONLY contain runs from our optional ignored_runs list
            files_to_ignore = set()
            for exprun in self.calibration.ignored_runs:
                for input_file in input_data_files:
                    # NOTE(review): reads the calibration-level files_to_iovs here while
                    # the IoV filter above uses the per-collection mapping — confirm intended.
                    file_iov = self.calibration.files_to_iovs[input_file]
                    if file_iov == exprun.make_iov():
                        B2INFO(f"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
                               f"Therefore the input file '{input_file}' from Collection '{collection_name}' "
                               "is being removed from input files list.")
                        files_to_ignore.add(input_file)
            input_data_files.difference_update(files_to_ignore)

            if not input_data_files:
                raise MachineError(f"No valid input files for Calibration '{self.calibration.name}' "
                                   f" and Collection '{collection_name}'.")
            job.input_files = list(input_data_files)

            job.splitter = collection.splitter
            job.backend_args = collection.backend_args
            # Output patterns to be returned from collector job
            job.output_patterns = collection.output_patterns
            B2DEBUG(20, f"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
            self._collector_jobs[collection_name] = job
821 
822  def _check_valid_collector_output(self):
823  B2INFO("Checking that Collector output exists for all colector jobs "
824  f"using {self.calibration.name}.output_patterns.")
825  if not self._collector_jobs:
826  B2INFO("We're restarting so we'll recreate the collector Job object.")
827  self._recover_collector_jobs()
828 
829  for job in self._collector_jobs.values():
830  if not job.subjobs:
831  output_files = []
832  for pattern in job.output_patterns:
833  output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
834  if not output_files:
835  raise MachineError("No output files from Collector Job")
836  else:
837  for subjob in job.subjobs.values():
838  output_files = []
839  for pattern in subjob.output_patterns:
840  output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
841  if not output_files:
842  raise MachineError(f"No output files from Collector {subjob}")
843 
844  def _run_algorithms(self):
845  """
846  Runs the Calibration Algorithms for this calibration machine.
847 
848  Will run them sequentially locally (possible benefits to using a
849  processing pool for low memory algorithms later on.)
850  """
851  # Get an instance of the Runner for these algorithms and run it
852  algs_runner = self.calibration.algorithms_runner(name=self.calibration.name)
853  algs_runner.algorithms = self.calibration.algorithms
854  algorithm_output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
855  output_database_dir = algorithm_output_dir.joinpath("outputdb")
856  # Remove it, if we failed previously, to start clean
857  if algorithm_output_dir.exists():
858  B2INFO(f"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
859  f"Deleting and recreating {algorithm_output_dir}.")
860  create_directories(algorithm_output_dir)
861  B2INFO(f"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
862  algs_runner.output_database_dir = output_database_dir
863  algs_runner.output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
864  input_files = []
865 
866  for job in self._collector_jobs.values():
867  if job.subjobs:
868  for subjob in job.subjobs.values():
869  for pattern in subjob.output_patterns:
870  input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
871  else:
872  for pattern in job.output_patterns:
873  input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
874 
875  algs_runner.input_files = input_files
876 
877  # Add any user defined database chain for this calibration
878  algs_runner.database_chain = self.calibration.database_chain
879 
880  # Here we add the finished databases of previous calibrations that we depend on.
881  # We can assume that the databases exist as we can't be here until they have returned
882  list_dependent_databases = []
883  for dependency in self.calibration.dependencies:
884  database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
885  B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
886  list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
887 
888  # Add previous iteration databases from this calibration
889  if self.iteration > 0:
890  previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
891  database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
892  list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
893  B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
894  algs_runner.dependent_databases = list_dependent_databases
895 
896  algs_runner.ignored_runs = self.calibration.ignored_runs
897 
898  try:
899  algs_runner.run(self.iov_to_calibrate, self.iteration)
900  except Exception as err:
901  print(err)
902  # We directly set the state without triggering the transition because normally we fail based on checking the algorithm
903  # results. But here we had an actual exception so we just force into failure instead.
904  self._state = State("algorithms_failed")
905  self._algorithm_results[self.iteration] = algs_runner.results
906  self._runner_final_state = algs_runner.final_state
907 
908  def _prepare_final_db(self):
909  """
910  Take the last iteration's outputdb and copy it to a more easily findable place.
911  """
912  database_location = self.root_dir.joinpath(str(self.iteration),
913  self.calibration.alg_output_dir,
914  'outputdb')
915  final_database_location = self.root_dir.joinpath('outputdb')
916  if final_database_location.exists():
917  B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
918  shutil.rmtree(final_database_location)
919  shutil.copytree(database_location, final_database_location)
920 
921 
class AlgorithmMachine(Machine):
    """
    A state machine to handle the logic of running the algorithm on the overall runs contained in the data.

    States: init -> ready -> running_algorithm -> completed/failed. The
    ``setup_algorithm`` transition (re)prepares logging, working directory,
    database chain and input data; ``execute_runs`` performs the actual
    CalibrationAlgorithm execution and records an IoV_Result in ``self.result``.
    """

    #: Attributes that must exist on the machine for it to be considered valid
    required_attrs = ["algorithm",
                      "dependent_databases",
                      "database_chain",
                      "output_dir",
                      "output_database_dir",
                      "input_files"
                      ]

    #: Attributes that must additionally have a truthy (non-empty) value before running
    required_true_attrs = ["algorithm",
                           "output_dir",
                           "output_database_dir",
                           "input_files"
                           ]

    def __init__(self, algorithm=None, initial_state="init"):
        """
        Takes an Algorithm object from the caf framework and defines the transitions.

        Parameters:
            algorithm: caf.framework.Algorithm object this machine will execute.
            initial_state (str): Name of the state the machine begins in.
        """
        #: Default states for this machine
        self.default_states = [State("init"),
                               State("ready"),
                               State("running_algorithm"),
                               State("completed"),
                               State("failed")]

        super().__init__(self.default_states, initial_state)

        #: Algorithm (from caf.framework) that this machine runs
        self.algorithm = algorithm
        #: Collector output files that will be given to the algorithm as input data
        self.input_files = []
        #: List of (database.txt filepath, directory) tuples from dependent calibrations
        self.dependent_databases = []
        #: User-defined database chain (local/central databases) applied first
        self.database_chain = []
        #: Working directory in which the algorithm is executed
        self.output_dir = ""
        #: Directory where the produced payloads are saved as a local database
        self.output_database_dir = ""
        #: IoV_Result of the most recent execution (None until a run happens)
        self.result = None

        self.add_transition("setup_algorithm", "init", "ready",
                            before=[self._setup_logging,
                                    self._change_working_dir,
                                    self._setup_database_chain,
                                    self._set_input_data,
                                    self._pre_algorithm])
        self.add_transition("execute_runs", "ready", "running_algorithm",
                            after=self._execute_over_iov)
        self.add_transition("complete", "running_algorithm", "completed")
        self.add_transition("fail", "running_algorithm", "failed")
        self.add_transition("fail", "ready", "failed")
        # Re-running after completion or failure goes back through setup
        self.add_transition("setup_algorithm", "completed", "ready")
        self.add_transition("setup_algorithm", "failed", "ready")

    def setup_from_dict(self, params):
        """
        Parameters:
            params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.
        """
        for attribute_name, value in params.items():
            setattr(self, attribute_name, value)

    def is_valid(self):
        """
        Returns:
            bool: Whether or not this machine has been set up correctly with all its necessary attributes.
        """
        # Consistent with the rest of this module: f-string instead of str.format
        B2INFO(f"Checking validity of current setup of AlgorithmMachine for {self.algorithm.name}.")
        # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
        for attribute_name in self.required_attrs:
            if not hasattr(self, attribute_name):
                B2ERROR(f"AlgorithmMachine attribute {attribute_name} doesn't exist.")
                return False
        # Check if any attributes that need actual values haven't been set or were empty
        for attribute_name in self.required_true_attrs:
            if not getattr(self, attribute_name):
                B2ERROR(f"AlgorithmMachine attribute {attribute_name} returned False.")
                return False
        return True

    def _create_output_dir(self, **kwargs):
        """
        Create working/output directory of algorithm. Any old directory is overwritten.
        """
        create_directories(Path(self.output_dir), overwrite=True)

    def _setup_database_chain(self, **kwargs):
        """
        Apply all databases in the correct order.
        """
        # We deliberately override the normal database ordering because we don't want input files GTs to affect
        # the processing. Only explicit GTs and intermediate local DBs made by the CAF should be added here.
        b2conditions.reset()
        b2conditions.override_globaltags()

        # Apply all the databases in order, starting with the user-set chain for this Calibration
        for database in self.database_chain:
            if database.db_type == 'local':
                B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
                       f"for {self.algorithm.name}.")
                b2conditions.prepend_testing_payloads(database.filepath.as_posix())
            elif database.db_type == 'central':
                B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
                       f"for {self.algorithm.name}.")
                b2conditions.prepend_globaltag(database.global_tag)
            else:
                raise ValueError(f"Unknown database type {database.db_type}.")
        # Here we add the finished databases of previous calibrations that we depend on.
        # We can assume that the databases exist as we can't be here until they have returned
        # with OK status.
        for filename, directory in self.dependent_databases:
            # Report the actual database file being prepended instead of the old "(unknown)" placeholder
            B2INFO(f"Adding Local Database {filename} to head of chain of local databases created by"
                   f" a dependent calibration, for {self.algorithm.name}.")
            b2conditions.prepend_testing_payloads(filename)

        # Create a directory to store the payloads of this algorithm
        create_directories(Path(self.output_database_dir), overwrite=False)

        # add local database to save payloads
        B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
        # Things have changed. We now need to do the expert settings to create a database directly.
        # LocalDB is readonly without this but we don't need 'use_local_database' during writing.
        b2conditions.expert_settings(save_payloads=str(self.output_database_dir.joinpath("database.txt")))

    def _setup_logging(self, **kwargs):
        """
        Redirect basf2 logging to a per-algorithm log file in the output directory.
        """
        # add logfile for output
        log_file = os.path.join(self.output_dir, self.algorithm.name + '_stdout')
        B2INFO(f"Output log file at {log_file}.")
        basf2.reset_log()
        basf2.set_log_level(basf2.LogLevel.INFO)
        basf2.log_to_file(log_file)

    def _change_working_dir(self, **kwargs):
        """
        Change the process working directory to this machine's output directory.
        """
        B2INFO(f"Changing current working directory to {self.output_dir}.")
        os.chdir(self.output_dir)

    def _pre_algorithm(self, **kwargs):
        """
        Call the user defined algorithm setup function.
        """
        B2INFO("Running Pre-Algorithm function (if exists)")
        if self.algorithm.pre_algorithm:
            # We have to re-pass in the algorithm here because an outside user has created this method.
            # So the method isn't bound to the instance properly.
            self.algorithm.pre_algorithm(self.algorithm.algorithm, kwargs["iteration"])

    def _execute_over_iov(self, **kwargs):
        """
        Does the actual execute of the algorithm on an IoV and records the result.

        Required kwargs: "runs" (runs to execute over), "apply_iov" (IoV used for
        payload labelling, may be falsy to derive it from the runs), "iteration".
        """
        B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")

        runs_to_execute = kwargs["runs"]
        iov = kwargs["apply_iov"]
        iteration = kwargs["iteration"]
        if not iov:
            # No explicit IoV given, derive one spanning the executed runs
            iov = iov_from_runs(runs_to_execute)
        B2INFO(f"Execution will use {iov} for labelling payloads by default.")
        alg_result = self.algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
        self.result = IoV_Result(iov, alg_result)

    def _set_input_data(self, **kwargs):
        """
        Hand the collected input files to the underlying CalibrationAlgorithm.
        """
        self.algorithm.data_input(self.input_files)
1101 
class MachineError(Exception):
    """
    Base exception class for this module. All state machine related errors
    raised here derive from this, so callers can catch it to handle any of them.
    """
1106 
1107 
class ConditionError(MachineError):
    """
    Exception for when conditions fail during a transition.
    """
1112 
1113 
class TransitionError(MachineError):
    """
    Exception for when transitions fail.
    """
1118 
1119 # @endcond