# Belle II Software  release-06-01-15
# state_machines.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 
11 
12 import basf2
13 
14 from functools import partial
15 from collections import defaultdict
16 
17 import pickle
18 import glob
19 import shutil
20 import time
21 from pathlib import Path
22 import os
23 import json
24 
25 from basf2 import create_path
26 from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
27 from basf2 import conditions as b2conditions
28 from basf2.pickle_path import serialize_path
29 
30 from ROOT.Belle2 import CalibrationAlgorithm
31 
32 from caf.utils import create_directories
33 from caf.utils import method_dispatch
34 from caf.utils import iov_from_runs
35 from caf.utils import IoV_Result
36 from caf.utils import get_iov_from_file
37 from caf.backends import Job
38 from caf.runners import AlgorithmsRunner
39 
40 
class State():
    """
    Basic State object that can take enter and exit state methods and records
    the state of a machine.

    You should assign the self.on_enter or self.on_exit attributes to callback functions
    or lists of them, if you need them.
    """

    def __init__(self, name, enter=None, exit=None):
        """
        Initialise State with a name and optional lists of callbacks.
        """
        #: Name of the State
        self.name = name
        #: Callback(s) run when entering the state (normalised to a list by the setter)
        self.on_enter = enter
        #: Callback(s) run when exiting the state (normalised to a list by the setter)
        self.on_exit = exit

    @property
    def on_enter(self):
        """
        Runs callbacks when a state is entered.
        """
        return self._on_enter

    @on_enter.setter
    def on_enter(self, callbacks):
        """
        Sets the list of enter callbacks, accepting a single callable or a list/tuple.
        """
        self._on_enter = []
        if callbacks:
            self._add_callbacks(callbacks, self._on_enter)

    @property
    def on_exit(self):
        """
        Runs callbacks when a state is exited.
        """
        return self._on_exit

    @on_exit.setter
    def on_exit(self, callbacks):
        """
        Sets the list of exit callbacks, accepting a single callable or a list/tuple.
        """
        self._on_exit = []
        if callbacks:
            self._add_callbacks(callbacks, self._on_exit)

    @method_dispatch
    def _add_callbacks(self, callback, attribute):
        """
        Adds callback to a property.
        """
        if callable(callback):
            attribute.append(callback)
        else:
            B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")

    @_add_callbacks.register(tuple)
    @_add_callbacks.register(list)
    def _(self, callbacks, attribute):
        """
        Alternate method for lists and tuples of function objects.
        """
        if callbacks:
            for function in callbacks:
                if callable(function):
                    attribute.append(function)
                else:
                    B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")

    def __str__(self):
        """
        The state's name is its string form.
        """
        return self.name

    def __repr__(self):
        """
        Debug representation showing the state name.
        """
        return f"State(name={self.name})"

    def __eq__(self, other):
        """
        States compare equal to other States with the same name, or to the bare name string.
        """
        if isinstance(other, str):
            return self.name == other
        else:
            return self.name == other.name

    def __hash__(self):
        """
        Hash on the name, consistent with __eq__.
        """
        return hash(self.name)
137 
class Machine():
    """
    Parameters:
      states (list[str]): A list of possible states of the machine.
      initial_state (str):

    Base class for a final state machine wrapper.
    Implements the framework that a more complex machine can inherit from.

    The `transitions` attribute is a dictionary of trigger name keys, each value of
    which is another dictionary of 'source' states, 'dest' states, and 'conditions'
    methods. 'conditions' should be a list of callables or a single one. A transition is
    valid if it goes from an allowed state to an allowed state.
    Conditions are optional but must be a callable that returns True or False based
    on some state of the machine. They cannot have input arguments currently.

    Every condition/before/after callback function MUST take ``**kwargs`` as the only
    argument (except ``self`` if it's a class method). This is because it's basically
    impossible to determine which arguments to pass to which functions for a transition.
    Therefore this machine just enforces that every function should simply take ``**kwargs``
    and use the dictionary of arguments (even if it doesn't need any arguments).

    This also means that if you call a trigger with arguments e.g. ``machine.walk(speed=5)``
    you MUST use the keyword arguments rather than positional ones. So ``machine.walk(5)``
    will *not* work.
    """

    def __init__(self, states=None, initial_state="default_initial"):
        """
        Basic Setup of states and initial_state.
        """
        #: Valid states for this machine, keyed by state name
        self.states = {}
        if states:
            for state in states:
                self.add_state(state)
        if initial_state != "default_initial":
            # Goes through the property setter so no enter callbacks run
            self.initial_state = initial_state
        else:
            self.add_state(initial_state)
            #: Actual attribute holding the initial state
            self._initial_state = State(initial_state)

        #: Actual attribute holding the current state
        self._state = self.initial_state
        #: Allowed transitions between states, keyed by trigger name
        self.transitions = defaultdict(list)

    def add_state(self, state, enter=None, exit=None):
        """
        Adds a single state to the list of possible ones.
        Should be a unique string or a State object with a unique name.
        """
        if isinstance(state, str):
            self.add_state(State(state, enter, exit))
        elif isinstance(state, State):
            if state.name not in self.states.keys():
                self.states[state.name] = state
            else:
                B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
        else:
            B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")

    @property
    def initial_state(self):
        """
        The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
        """
        return self._initial_state

    @initial_state.setter
    def initial_state(self, state):
        """
        Sets the initial state (and current state) without running any callbacks.
        """
        if state in self.states.keys():
            self._initial_state = self.states[state]
            # Bypass the `state` property so no exit/enter callbacks fire
            self._state = self.states[state]
        else:
            raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")

    @property
    def state(self):
        """
        The current state of the machine. Actually a `property` decorator. It will call the exit method of the
        current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
        either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
        """
        return self._state

    @state.setter
    def state(self, state):
        """
        Sets the current state, running exit callbacks of the old state and enter callbacks of the new one.
        """
        if isinstance(state, str):
            state_name = state
        else:
            state_name = state.name

        try:
            state = self.states[state_name]
            # Run exit callbacks of current state
            for callback in self.state.on_exit:
                callback(prior_state=self.state, new_state=state)
            # Run enter callbacks of new state
            for callback in state.on_enter:
                callback(prior_state=self.state, new_state=state)
            # Set the state
            self._state = state
        except KeyError:
            raise MachineError(f"Attempted to set state to '{state}' which not in the 'states' attribute!")

    @staticmethod
    def default_condition(**kwargs):
        """
        Method to always return True.
        """
        return True

    def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
        """
        Adds a single transition to the dictionary of possible ones.
        Trigger is the method name that begins the transition between the
        source state and the destination state.

        The condition is an optional function that returns True or False
        depending on the current state/input.
        """
        transition_dict = {}
        try:
            source = self.states[source]
            dest = self.states[dest]
            transition_dict["source"] = source
            transition_dict["dest"] = dest
        except KeyError as err:
            B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
            raise err
        if conditions:
            if isinstance(conditions, (list, tuple, set)):
                transition_dict["conditions"] = list(conditions)
            else:
                transition_dict["conditions"] = [conditions]
        else:
            transition_dict["conditions"] = [Machine.default_condition]

        if not before:
            before = []
        if isinstance(before, (list, tuple, set)):
            transition_dict["before"] = list(before)
        else:
            transition_dict["before"] = [before]

        if not after:
            after = []
        if isinstance(after, (list, tuple, set)):
            transition_dict["after"] = list(after)
        else:
            transition_dict["after"] = [after]

        self.transitions[trigger].append(transition_dict)

    def __getattr__(self, name, **kwargs):
        """
        Allows us to create a new method for each trigger on the fly.
        If there is no trigger name in the machine to match, then the normal
        AttributeError is called.
        """
        possible_transitions = self.get_transitions(self.state)
        if name not in possible_transitions:
            raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
        transition_dict = self.get_transition_dict(self.state, name)
        return partial(self._trigger, name, transition_dict, **kwargs)

    def _trigger(self, transition_name, transition_dict, **kwargs):
        """
        Runs the transition logic. Callbacks are evaluated in the order:
        conditions -> before -> <new state set here> -> after.
        """
        dest, conditions, before_callbacks, after_callbacks = (
            transition_dict["dest"],
            transition_dict["conditions"],
            transition_dict["before"],
            transition_dict["after"]
        )
        # Returns True only if every condition returns True when called
        if all(map(lambda condition: self._callback(condition, **kwargs), conditions)):
            for before_func in before_callbacks:
                self._callback(before_func, **kwargs)
            # Setting the state runs the exit/enter callbacks of the old/new states
            self.state = dest
            for after_func in after_callbacks:
                self._callback(after_func, **kwargs)
        else:
            raise ConditionError((f"Transition '{transition_name}' called for but one or more conditions "
                                  "evaluated False"))

    @staticmethod
    def _callback(func, **kwargs):
        """
        Calls a condition/before/after.. function using arguments passed (or not).
        """
        return func(**kwargs)

    def get_transitions(self, source):
        """
        Returns allowed transitions from a given state.
        """
        possible_transitions = []
        for transition, transition_dicts in self.transitions.items():
            for transition_dict in transition_dicts:
                if transition_dict["source"] == source:
                    possible_transitions.append(transition)
        return possible_transitions

    def get_transition_dict(self, state, transition):
        """
        Returns the transition dictionary for a state and transition out of it.
        """
        transition_dicts = self.transitions[transition]
        for transition_dict in transition_dicts:
            if transition_dict["source"] == state:
                return transition_dict
        else:
            raise KeyError(f"No transition from state {state} with the name {transition}.")

    def save_graph(self, filename, graphname):
        """
        Does a simple dot file creation to visualise states and transitions.
        """
        with open(filename, "w") as dotfile:
            dotfile.write("digraph " + graphname + " {\n")
            for state in self.states.keys():
                dotfile.write('"' + state + '" [shape=ellipse, color=black]\n')
            for trigger, transition_dicts in self.transitions.items():
                for transition in transition_dicts:
                    dotfile.write('"' + transition["source"].name + '" -> "' +
                                  transition["dest"].name + '" [label="' + trigger + '"]\n')
            dotfile.write("}\n")
377 
378 
380  """
381  A state machine to handle `Calibration` objects and the flow of
382  processing for them.
383  """
384 
385  collector_input_dir = 'collector_input'
386  collector_output_dir = 'collector_output'
387  algorithm_output_dir = 'algorithm_output'
388 
389  def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
390  """
391  Takes a Calibration object from the caf framework and lets you
392  set the initial state.
393  """
394 
395  self.default_statesdefault_states = [State("init", enter=[self._update_cal_state_update_cal_state,
396  self._log_new_state_log_new_state]),
397  State("running_collector", enter=[self._update_cal_state_update_cal_state,
398  self._log_new_state_log_new_state]),
399  State("collector_failed", enter=[self._update_cal_state_update_cal_state,
400  self._log_new_state_log_new_state]),
401  State("collector_completed", enter=[self._update_cal_state_update_cal_state,
402  self._log_new_state_log_new_state]),
403  State("running_algorithms", enter=[self._update_cal_state_update_cal_state,
404  self._log_new_state_log_new_state]),
405  State("algorithms_failed", enter=[self._update_cal_state_update_cal_state,
406  self._log_new_state_log_new_state]),
407  State("algorithms_completed", enter=[self._update_cal_state_update_cal_state,
408  self._log_new_state_log_new_state]),
409  State("completed", enter=[self._update_cal_state_update_cal_state,
410  self._log_new_state_log_new_state]),
411  State("failed", enter=[self._update_cal_state_update_cal_state,
412  self._log_new_state_log_new_state])
413  ]
414 
415  super().__init__(self.default_statesdefault_states, initial_state)
416 
417 
418  self.calibrationcalibration = calibration
419  # Monkey Patching for the win!
420 
421  self.calibrationcalibration.machine = self
422 
423  self.iterationiteration = iteration
424 
425  self.collector_backendcollector_backend = None
426 
427  self._algorithm_results_algorithm_results = {}
428 
429  self._runner_final_state_runner_final_state = None
430 
431  self.iov_to_calibrateiov_to_calibrate = iov_to_calibrate
432 
433  self.root_dirroot_dir = Path(os.getcwd(), calibration.name)
434 
435 
438  self._collector_timing_collector_timing = {}
439 
440 
441  self._collector_jobs_collector_jobs = {}
442 
443  self.add_transitionadd_transition("submit_collector", "init", "running_collector",
444  conditions=self.dependencies_completeddependencies_completed,
445  before=[self._make_output_dir_make_output_dir,
446  self._resolve_file_paths_resolve_file_paths,
447  self._build_iov_dicts_build_iov_dicts,
448  self._create_collector_jobs_create_collector_jobs,
449  self._submit_collections_submit_collections,
450  self._dump_job_config_dump_job_config])
451  self.add_transitionadd_transition("fail", "running_collector", "collector_failed",
452  conditions=self._collection_failed_collection_failed)
453  self.add_transitionadd_transition("complete", "running_collector", "collector_completed",
454  conditions=self._collection_completed_collection_completed)
455  self.add_transitionadd_transition("run_algorithms", "collector_completed", "running_algorithms",
456  before=self._check_valid_collector_output_check_valid_collector_output,
457  after=[self._run_algorithms_run_algorithms,
458  self.automatic_transitionautomatic_transition])
459  self.add_transitionadd_transition("complete", "running_algorithms", "algorithms_completed",
460  after=self.automatic_transitionautomatic_transition,
461  conditions=self._runner_not_failed_runner_not_failed)
462  self.add_transitionadd_transition("fail", "running_algorithms", "algorithms_failed",
463  conditions=self._runner_failed_runner_failed)
464  self.add_transitionadd_transition("iterate", "algorithms_completed", "init",
465  conditions=[self._require_iteration_require_iteration,
466  self._below_max_iterations_below_max_iterations],
467  after=self._increment_iteration_increment_iteration)
468  self.add_transitionadd_transition("finish", "algorithms_completed", "completed",
469  conditions=self._no_require_iteration_no_require_iteration,
470  before=self._prepare_final_db_prepare_final_db)
471  self.add_transitionadd_transition("fail_fully", "algorithms_failed", "failed")
472  self.add_transitionadd_transition("fail_fully", "collector_failed", "failed")
473 
474  def _update_cal_state(self, **kwargs):
475  self.calibrationcalibration.state = str(kwargs["new_state"])
476 
477  def files_containing_iov(self, file_paths, files_to_iovs, iov):
478  """
479  Lookup function that returns all files from the file_paths that
480  overlap with this IoV.
481  """
482  # Files that contain an Exp,Run range that overlaps with given IoV
483  overlapping_files = set()
484 
485  for file_path, file_iov in files_to_iovs.items():
486  if file_iov.overlaps(iov) and (file_path in file_paths):
487  overlapping_files.add(file_path)
488  return overlapping_files
489 
490  def _dump_job_config(self):
491  """
492  Dumps the `Job` object for the collections to JSON files so that it's configuration can be recovered
493  later in case of failure.
494  """
495  # Wait for jobs (+subjobs) to be submitted so that all information is filled. Since the parent CAF object asynchronously
496  # submits the jobs this might need to wait a while.
497  while any(map(lambda j: j.status == "init", self._collector_jobs_collector_jobs.values())):
498  B2DEBUG(29, "Some Collector Jobs still in 'init' state. Waiting...")
499  time.sleep(5)
500 
501  for collection_name, job in self._collector_jobs_collector_jobs.items():
502  collector_job_output_file_name = self.calibrationcalibration.collections[collection_name].job_config
503  output_file = self.root_dirroot_dir.joinpath(str(self.iterationiteration), self.collector_input_dircollector_input_dir,
504  collection_name, collector_job_output_file_name)
505  job.dump_to_json(output_file)
506 
508  """
509  Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.
510  """
511  for collection_name, collection in self.calibrationcalibration.collections.items():
512  output_file = self.root_dirroot_dir.joinpath(str(self.iterationiteration),
513  self.collector_input_dircollector_input_dir,
514  collection_name,
515  collection.job_config)
516  self._collector_jobs_collector_jobs[collection_name] = Job.from_json(output_file)
517 
518  def _iov_requested(self):
519  """
520  """
521  if self.iov_to_calibrateiov_to_calibrate:
522  B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
523  return True
524  else:
525  B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
526  return False
527 
529  """
530  """
531  pass
532 
533  def _build_iov_dicts(self):
534  """
535  Build IoV file dictionary for each collection if required.
536  """
537  iov_requested = self._iov_requested()
538  if iov_requested or self.calibration.ignored_runs:
539  for coll_name, collection in self.calibration.collections.items():
540  if not collection.files_to_iovs:
541  B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
542  f" Calibration '{self.calibration.name} and Collection '{coll_name}'."
543  " Filling dictionary from input file metadata."
544  " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
545 
546  files_to_iovs = {}
547  for file_path in collection.input_files:
548  files_to_iovs[file_path] = get_iov_from_file(file_path)
549  collection.files_to_iovs = files_to_iovs
550  else:
551  B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
552  f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
553  else:
554  B2INFO("No File to IoV mapping required.")
555 
557  """
558  """
559  return self.iterationiteration < self.calibrationcalibration.max_iterations
560 
562  """
563  """
564  self.iterationiteration += 1
565  self.calibrationcalibration.iteration = self.iterationiteration
566 
568  """
569  Did all the collections succeed?
570  """
571  B2DEBUG(29, "Checking for failed collector job.")
572  if self._collector_jobs_ready_collector_jobs_ready():
573  return all([job.status == "completed" for job in self._collector_jobs_collector_jobs.values()])
574 
576  """
577  Did any of the collections fail?
578  """
579  B2DEBUG(29, "Checking for failed collector job.")
580  if self._collector_jobs_ready_collector_jobs_ready():
581  return any([job.status == "failed" for job in self._collector_jobs_collector_jobs.values()])
582 
584  """
585  Returns:
586  bool: If AlgorithmsRunner succeeded return True.
587  """
588  return not self._runner_failed_runner_failed()
589 
590  def _runner_failed(self):
591  """
592  Returns:
593  bool: If AlgorithmsRunner failed return True.
594  """
595  if self._runner_final_state_runner_final_state == AlgorithmsRunner.FAILED:
596  return True
597  else:
598  return False
599 
601  """
602  """
603  since_last_update = time.time() - self._collector_timing_collector_timing["last_update"]
604  if since_last_update > self.calibrationcalibration.collector_full_update_interval:
605  B2INFO("Updating full collector job statuses.")
606  for job in self._collector_jobs_collector_jobs.values():
607  job.update_status()
608  self._collector_timing_collector_timing["last_update"] = time.time()
609  if job.subjobs:
610  num_completed = sum((subjob.status in subjob.exit_statuses) for subjob in job.subjobs.values())
611  total_subjobs = len(job.subjobs)
612  B2INFO(f"{num_completed}/{total_subjobs} Collector SubJobs finished in"
613  f" Calibration {self.calibration.name} Job {job.name}.")
614  return all([job.ready() for job in self._collector_jobs_collector_jobs.values()])
615 
617  """
618  """
619  self.calibrationcalibration.jobs_to_submit.extend(list(self._collector_jobs_collector_jobs.values()))
620  self._collector_timing_collector_timing["start"] = time.time()
621  self._collector_timing_collector_timing["last_update"] = time.time()
622 
624  """
625  """
626  if self._require_iteration_require_iteration() and self._below_max_iterations_below_max_iterations():
627  return False
628  elif self._require_iteration_require_iteration() and not self._below_max_iterations_below_max_iterations():
629  B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
630  return True
631  elif not self._require_iteration_require_iteration():
632  return True
633 
635  """
636  """
637  iteration_called = False
638  for alg_name, results in self._algorithm_results_algorithm_results[self.iterationiteration].items():
639  for result in results:
640  if result.result == CalibrationAlgorithm.c_Iterate:
641  iteration_called = True
642  break
643  if iteration_called:
644  break
645  return iteration_called
646 
647  def _log_new_state(self, **kwargs):
648  """
649  """
650  B2INFO(f"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")
651 
653  """
654  Condition function to check that the dependencies of our calibration are in the 'completed' state.
655  Technically only need to check explicit dependencies.
656  """
657  for calibration in self.calibrationcalibration.dependencies:
658  if not calibration.state == calibration.end_state:
659  return False
660  else:
661  return True
662 
664  """
665  Automatically try all transitions out of this state once. Tries fail last.
666  """
667  possible_transitions = self.get_transitionsget_transitions(self.statestatestatestate)
668  for transition in possible_transitions:
669  try:
670  if transition != "fail":
671  getattr(self, transition)()
672  break
673  except ConditionError:
674  continue
675  else:
676  if "fail" in possible_transitions:
677  getattr(self, "fail")()
678  else:
679  raise MachineError(f"Failed to automatically transition out of {self.state} state.")
680 
681  def _make_output_dir(self):
682  """
683  Creates the overall root directory of the Calibration. Will not overwrite if it already exists.
684  Also creates s
685  """
686  create_directories(self.root_dirroot_dir, overwrite=False)
687 
688  def _make_collector_path(self, name, collection):
689  """
690  Creates a basf2 path for the correct collector and serializes it in the
691  self.output_dir/<calibration_name>/<iteration>/paths directory
692  """
693  path_output_dir = self.root_dirroot_dir.joinpath(str(self.iterationiteration), self.collector_input_dircollector_input_dir, name)
694  # Automatically overwrite any previous directory
695  create_directories(path_output_dir)
696  path_file_name = collection.collector.name() + '.path'
697  path_file_name = path_output_dir / path_file_name
698  # Create empty path and add collector to it
699  coll_path = create_path()
700  coll_path.add_module(collection.collector)
701  # Dump the basf2 path to file
702  with open(path_file_name, 'bw') as serialized_path_file:
703  pickle.dump(serialize_path(coll_path), serialized_path_file)
704  # Return the pickle file path for addition to the input sandbox
705  return str(path_file_name.absolute())
706 
707  def _make_pre_collector_path(self, name, collection):
708  """
709  Creates a basf2 path for the collectors setup path (Collection.pre_collector_path) and serializes it in the
710  self.output_dir/<calibration_name>/<iteration>/<colector_output>/<name> directory.
711  """
712  path_output_dir = self.root_dirroot_dir.joinpath(str(self.iterationiteration), self.collector_input_dircollector_input_dir, name)
713  coll_path = collection.pre_collector_path
714  path_file_name = 'pre_collector.path'
715  path_file_name = os.path.join(path_output_dir, path_file_name)
716  # Dump the basf2 path to file
717  with open(path_file_name, 'bw') as serialized_path_file:
718  pickle.dump(serialize_path(coll_path), serialized_path_file)
719  # Return the pickle file path for addition to the input sandbox
720  return path_file_name
721 
723  """
724  Creates a Job object for the collections of this iteration, ready for submission
725  to backend.
726  """
727  for collection_name, collection in self.calibrationcalibration.collections.items():
728  iteration_dir = self.root_dirroot_dir.joinpath(str(self.iterationiteration))
729  job = Job('_'.join([self.calibrationcalibration.name, collection_name, 'Iteration', str(self.iterationiteration)]))
730  job.output_dir = iteration_dir.joinpath(self.collector_output_dircollector_output_dir, collection_name)
731  job.working_dir = iteration_dir.joinpath(self.collector_output_dircollector_output_dir, collection_name)
732  # Remove previous failed attempt to avoid problems
733  if job.output_dir.exists():
734  B2INFO(f"Previous output directory for {self.calibration.name} collector {collection_name} exists."
735  f"Deleting {job.output_dir} before re-submitting.")
736  shutil.rmtree(job.output_dir)
737  job.cmd = collection.job_cmd
738  job.append_current_basf2_setup_cmds()
739  job.input_sandbox_files.append(collection.job_script)
740  collector_path_file = Path(self._make_collector_path_make_collector_path(collection_name, collection))
741  job.input_sandbox_files.append(collector_path_file)
742  if collection.pre_collector_path:
743  pre_collector_path_file = Path(self._make_pre_collector_path_make_pre_collector_path(collection_name, collection))
744  job.input_sandbox_files.append(pre_collector_path_file)
745 
746  # Want to figure out which local databases are required for this job and their paths
747  list_dependent_databases = []
748 
749  # Here we add the finished databases of previous calibrations that we depend on.
750  # We can assume that the databases exist as we can't be here until they have returned
751  for dependency in self.calibrationcalibration.dependencies:
752  database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
753  B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
754  list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
755 
756  # Add previous iteration databases from this calibration
757  if self.iterationiteration > 0:
758  previous_iteration_dir = self.root_dirroot_dir.joinpath(str(self.iterationiteration - 1))
759  database_dir = os.path.join(previous_iteration_dir, self.calibrationcalibration.alg_output_dir, 'outputdb')
760  list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
761  B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
762 
763  # Let's use a directory to store some files later for input to the collector jobs. Should already exist from
764  # collector path
765  input_data_directory = self.root_dirroot_dir.joinpath(str(self.iterationiteration), self.collector_input_dircollector_input_dir, collection_name)
766 
767  # Need to pass setup info to collector which would be tricky as arguments
768  # We make a dictionary and pass it in as json
769  job_config = {}
770  # Apply the user-set Calibration database chain to the base of the overall chain.
771  json_db_chain = []
772  for database in collection.database_chain:
773  if database.db_type == 'local':
774  json_db_chain.append(('local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
775  elif database.db_type == 'central':
776  json_db_chain.append(('central', database.global_tag))
777  else:
778  raise ValueError(f"Unknown database type {database.db_type}.")
779  # CAF created ones for dependent calibrations and previous iterations of this calibration
780  for database in list_dependent_databases:
781  json_db_chain.append(('local', database))
782  job_config['database_chain'] = json_db_chain
783 
784  job_config_file_path = input_data_directory.joinpath('collector_config.json').absolute()
785  with open(job_config_file_path, 'w') as job_config_file:
786  json.dump(job_config, job_config_file, indent=2)
787  job.input_sandbox_files.append(job_config_file_path)
788 
789  # Define the input files
790  input_data_files = set(collection.input_files)
791  # Reduce the input data files to only those that overlap with the optional requested IoV
792  if self.iov_to_calibrateiov_to_calibrate:
793  input_data_files = self.files_containing_iovfiles_containing_iov(input_data_files,
794  collection.files_to_iovs,
795  self.iov_to_calibrateiov_to_calibrate)
796  # Remove any files that ONLY contain runs from our optional ignored_runs list
797  files_to_ignore = set()
798  for exprun in self.calibrationcalibration.ignored_runs:
799  for input_file in input_data_files:
800  file_iov = self.calibrationcalibration.files_to_iovs[input_file]
801  if file_iov == exprun.make_iov():
802  B2INFO(f"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
803  f"Therefore the input file '{input_file}' from Collection '{collection_name}' "
804  "is being removed from input files list.")
805  files_to_ignore.add(input_file)
806  input_data_files.difference_update(files_to_ignore)
807 
808  if not input_data_files:
809  raise MachineError(f"No valid input files for Calibration '{self.calibration.name}' "
810  f" and Collection '{collection_name}'.")
811  job.input_files = list(input_data_files)
812 
813  job.splitter = collection.splitter
814  job.backend_args = collection.backend_args
815  # Output patterns to be returned from collector job
816  job.output_patterns = collection.output_patterns
817  B2DEBUG(20, f"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
818  self._collector_jobs_collector_jobs[collection_name] = job
819 
820  def _check_valid_collector_output(self):
821  B2INFO("Checking that Collector output exists for all colector jobs "
822  f"using {self.calibration.name}.output_patterns.")
823  if not self._collector_jobs_collector_jobs:
824  B2INFO("We're restarting so we'll recreate the collector Job object.")
825  self._recover_collector_jobs_recover_collector_jobs()
826 
827  for job in self._collector_jobs_collector_jobs.values():
828  if not job.subjobs:
829  output_files = []
830  for pattern in job.output_patterns:
831  output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
832  if not output_files:
833  raise MachineError("No output files from Collector Job")
834  else:
835  for subjob in job.subjobs.values():
836  output_files = []
837  for pattern in subjob.output_patterns:
838  output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
839  if not output_files:
840  raise MachineError(f"No output files from Collector {subjob}")
841 
842  def _run_algorithms(self):
843  """
844  Runs the Calibration Algorithms for this calibration machine.
845 
846  Will run them sequentially locally (possible benefits to using a
847  processing pool for low memory algorithms later on.)
848  """
849  # Get an instance of the Runner for these algorithms and run it
850  algs_runner = self.calibrationcalibration.algorithms_runner(name=self.calibrationcalibration.name)
851  algs_runner.algorithms = self.calibrationcalibration.algorithms
852  algorithm_output_dir = self.root_dirroot_dir.joinpath(str(self.iterationiteration), self.calibrationcalibration.alg_output_dir)
853  output_database_dir = algorithm_output_dir.joinpath("outputdb")
854  # Remove it, if we failed previously, to start clean
855  if algorithm_output_dir.exists():
856  B2INFO(f"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
857  f"Deleting and recreating {algorithm_output_dir}.")
858  create_directories(algorithm_output_dir)
859  B2INFO(f"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
860  algs_runner.output_database_dir = output_database_dir
861  algs_runner.output_dir = self.root_dirroot_dir.joinpath(str(self.iterationiteration), self.calibrationcalibration.alg_output_dir)
862  input_files = []
863 
864  for job in self._collector_jobs_collector_jobs.values():
865  if job.subjobs:
866  for subjob in job.subjobs.values():
867  for pattern in subjob.output_patterns:
868  input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
869  else:
870  for pattern in job.output_patterns:
871  input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
872 
873  algs_runner.input_files = input_files
874 
875  # Add any user defined database chain for this calibration
876  algs_runner.database_chain = self.calibrationcalibration.database_chain
877 
878  # Here we add the finished databases of previous calibrations that we depend on.
879  # We can assume that the databases exist as we can't be here until they have returned
880  list_dependent_databases = []
881  for dependency in self.calibrationcalibration.dependencies:
882  database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
883  B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
884  list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
885 
886  # Add previous iteration databases from this calibration
887  if self.iterationiteration > 0:
888  previous_iteration_dir = self.root_dirroot_dir.joinpath(str(self.iterationiteration - 1))
889  database_dir = os.path.join(previous_iteration_dir, self.calibrationcalibration.alg_output_dir, 'outputdb')
890  list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
891  B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
892  algs_runner.dependent_databases = list_dependent_databases
893 
894  algs_runner.ignored_runs = self.calibrationcalibration.ignored_runs
895 
896  try:
897  algs_runner.run(self.iov_to_calibrateiov_to_calibrate, self.iterationiteration)
898  except Exception as err:
899  print(err)
900  # We directly set the state without triggering the transition because normally we fail based on checking the algorithm
901  # results. But here we had an actual exception so we just force into failure instead.
902  self._state_state_state = State("algorithms_failed")
903  self._algorithm_results_algorithm_results[self.iterationiteration] = algs_runner.results
904  self._runner_final_state_runner_final_state = algs_runner.final_state
905 
906  def _prepare_final_db(self):
907  """
908  Take the last iteration's outputdb and copy it to a more easily findable place.
909  """
910  database_location = self.root_dirroot_dir.joinpath(str(self.iterationiteration),
911  self.calibrationcalibration.alg_output_dir,
912  'outputdb')
913  final_database_location = self.root_dirroot_dir.joinpath('outputdb')
914  if final_database_location.exists():
915  B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
916  shutil.rmtree(final_database_location)
917  shutil.copytree(database_location, final_database_location)
918 
919 
921  """
922  A state machine to handle the logic of running the algorithm on the overall runs contained in the data.
923  """
924 
925 
    # Required attributes that must exist before the machine can run properly.
    # They are all initialised in __init__, so this mainly guards against deletion.
    required_attrs = ["algorithm",
                      "dependent_databases",
                      "database_chain",
                      "output_dir",
                      "output_database_dir",
                      "input_files"
                      ]

    # Attributes that must also hold a truthy (non-empty) value before running,
    # checked by is_valid().
    required_true_attrs = ["algorithm",
                           "output_dir",
                           "output_database_dir",
                           "input_files"
                           ]
941 
942  def __init__(self, algorithm=None, initial_state="init"):
943  """
944  Takes an Algorithm object from the caf framework and defines the transitions.
945  """
946 
947  self.default_statesdefault_states = [State("init"),
948  State("ready"),
949  State("running_algorithm"),
950  State("completed"),
951  State("failed")]
952 
953  super().__init__(self.default_statesdefault_states, initial_state)
954 
955 
956  self.algorithmalgorithm = algorithm
957 
958  self.input_filesinput_files = []
959 
960  self.dependent_databasesdependent_databases = []
961 
963  self.database_chaindatabase_chain = []
964 
965  self.output_diroutput_dir = ""
966 
967  self.output_database_diroutput_database_dir = ""
968 
969  self.resultresult = None
970 
971  self.add_transitionadd_transition("setup_algorithm", "init", "ready",
972  before=[self._setup_logging_setup_logging,
973  self._change_working_dir_change_working_dir,
974  self._setup_database_chain_setup_database_chain,
975  self._set_input_data_set_input_data,
976  self._pre_algorithm_pre_algorithm])
977  self.add_transitionadd_transition("execute_runs", "ready", "running_algorithm",
978  after=self._execute_over_iov_execute_over_iov)
979  self.add_transitionadd_transition("complete", "running_algorithm", "completed")
980  self.add_transitionadd_transition("fail", "running_algorithm", "failed")
981  self.add_transitionadd_transition("fail", "ready", "failed")
982  self.add_transitionadd_transition("setup_algorithm", "completed", "ready")
983  self.add_transitionadd_transition("setup_algorithm", "failed", "ready")
984 
985  def setup_from_dict(self, params):
986  """
987  Parameters:
988  params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.
989  """
990  for attribute_name, value in params.items():
991  setattr(self, attribute_name, value)
992 
993  def is_valid(self):
994  """
995  Returns:
996  bool: Whether or not this machine has been set up correctly with all its necessary attributes.
997  """
998  B2INFO("Checking validity of current setup of AlgorithmMachine for {}.".format(self.algorithmalgorithm.name))
999  # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
1000  for attribute_name in self.required_attrsrequired_attrs:
1001  if not hasattr(self, attribute_name):
1002  B2ERROR(f"AlgorithmMachine attribute {attribute_name} doesn't exist.")
1003  return False
1004  # Check if any attributes that need actual values haven't been set or were empty
1005  for attribute_name in self.required_true_attrsrequired_true_attrs:
1006  if not getattr(self, attribute_name):
1007  B2ERROR(f"AlgorithmMachine attribute {attribute_name} returned False.")
1008  return False
1009  return True
1010 
1011  def _create_output_dir(self, **kwargs):
1012  """
1013  Create working/output directory of algorithm. Any old directory is overwritten.
1014  """
1015  create_directories(Path(self.output_diroutput_dir), overwrite=True)
1016 
    def _setup_database_chain(self, **kwargs):
        """
        Apply all databases in the correct order.
        """
        # We deliberately override the normal database ordering because we don't want input files GTs to affect
        # the processing. Only explicit GTs and intermediate local DBs made by the CAF should be added here.
        b2conditions.reset()
        b2conditions.override_globaltags()

        # Apply all the databases in order, starting with the user-set chain for this Calibration
        for database in self.database_chain:
            if database.db_type == 'local':
                B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
                       f"for {self.algorithm.name}.")
                b2conditions.prepend_testing_payloads(database.filepath.as_posix())
            elif database.db_type == 'central':
                B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
                       f"for {self.algorithm.name}.")
                b2conditions.prepend_globaltag(database.global_tag)
            else:
                raise ValueError(f"Unknown database type {database.db_type}.")
        # Here we add the finished databases of previous calibrations that we depend on.
        # We can assume that the databases exist as we can't be here until they have returned
        # with OK status.
        # NOTE(review): only the payload file path is prepended; the unpacked 'directory' is unused here.
        for filename, directory in self.dependent_databases:
            B2INFO((f"Adding Local Database (unknown) to head of chain of local databases created by"
                    f" a dependent calibration, for {self.algorithm.name}."))
            b2conditions.prepend_testing_payloads(filename)

        # Create a directory to store the payloads of this algorithm
        create_directories(Path(self.output_database_dir), overwrite=False)

        # add local database to save payloads
        B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
        # Things have changed. We now need to do the expert settings to create a database directly.
        # LocalDB is readonly without this but we don't need 'use_local_database' during writing.
        b2conditions.expert_settings(save_payloads=str(self.output_database_dir.joinpath("database.txt")))
1054 
1055  def _setup_logging(self, **kwargs):
1056  """
1057  """
1058  # add logfile for output
1059  log_file = os.path.join(self.output_diroutput_dir, self.algorithmalgorithm.name + '_stdout')
1060  B2INFO(f"Output log file at {log_file}.")
1061  basf2.reset_log()
1062  basf2.set_log_level(basf2.LogLevel.INFO)
1063  basf2.log_to_file(log_file)
1064 
    def _change_working_dir(self, **kwargs):
        """
        Change the process working directory to this algorithm's output directory.
        """
        B2INFO(f"Changing current working directory to {self.output_dir}.")
        os.chdir(self.output_dir)
1070 
1071  def _pre_algorithm(self, **kwargs):
1072  """
1073  Call the user defined algorithm setup function.
1074  """
1075  B2INFO("Running Pre-Algorithm function (if exists)")
1076  if self.algorithmalgorithm.pre_algorithm:
1077  # We have to re-pass in the algorithm here because an outside user has created this method.
1078  # So the method isn't bound to the instance properly.
1079  self.algorithmalgorithm.pre_algorithm(self.algorithmalgorithm.algorithm, kwargs["iteration"])
1080 
1081  def _execute_over_iov(self, **kwargs):
1082  """
1083  Does the actual execute of the algorithm on an IoV and records the result.
1084  """
1085  B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")
1086 
1087  runs_to_execute = kwargs["runs"]
1088  iov = kwargs["apply_iov"]
1089  iteration = kwargs["iteration"]
1090  if not iov:
1091  iov = iov_from_runs(runs_to_execute)
1092  B2INFO(f"Execution will use {iov} for labelling payloads by default.")
1093  alg_result = self.algorithmalgorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
1094  self.resultresult = IoV_Result(iov, alg_result)
1095 
    def _set_input_data(self, **kwargs):
        """
        Pass the collected input files to the algorithm's data input handler.
        """
        self.algorithm.data_input(self.input_files)
1098 
1099 
class MachineError(Exception):
    """
    Base exception class for this module; all machine-related errors derive from it.
    """
1104 
1105 
class ConditionError(MachineError):
    """
    Exception for when conditions fail during a transition.
    """
1110 
1111 
1113  """
1114  Exception for when transitions fail.
1115  """
def _setup_database_chain(self, **kwargs)
list required_true_attrs
Attributes that must have a value that returns True when tested.
output_database_dir
The output database directory for the localdb that the algorithm will commit to.
input_files
Collector output files, will contain all files returned by the output patterns.
default_states
Default states for the AlgorithmMachine.
def _change_working_dir(self, **kwargs)
def __init__(self, algorithm=None, initial_state="init")
list required_attrs
Required attributes that must exist before the machine can run properly.
algorithm
Algorithm() object whose state we are modelling.
database_chain
Assigned database chain to the overall Calibration object, or to the 'default' Collection.
result
IoV_Result object for a single execution, will be reset upon a new execution.
def _set_input_data(self, **kwargs)
dependent_databases
CAF created local databases from previous calibrations that this calibration/algorithm depends on.
def _setup_logging(self, **kwargs)
def _execute_over_iov(self, **kwargs)
def _pre_algorithm(self, **kwargs)
output_dir
The algorithm output directory which is mostly used to store the stdout file.
def _create_output_dir(self, **kwargs)
def _log_new_state(self, **kwargs)
root_dir
root directory for this Calibration
def _update_cal_state(self, **kwargs)
_runner_final_state
Final state of the algorithm runner for the current iteration.
default_states
States that are defaults to the CalibrationMachine (could override later)
_algorithm_results
Results of each iteration for all algorithms of this calibration.
iov_to_calibrate
IoV to be executed, currently will loop over all runs in IoV.
def files_containing_iov(self, file_paths, files_to_iovs, iov)
iteration
Allows calibration object to hold a reference to the machine controlling it.
def _make_collector_path(self, name, collection)
def _make_pre_collector_path(self, name, collection)
_collector_timing
Times of various useful updates to the collector job e.g.
collector_backend
Backend used for this calibration machine collector.
def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0)
_collector_jobs
The collector jobs used for submission.
calibration
Calibration object whose state we are modelling.
def add_state(self, state, enter=None, exit=None)
def get_transitions(self, source)
def _callback(func, **kwargs)
transitions
Allowed transitions between states.
def save_graph(self, filename, graphname)
def get_transition_dict(self, state, transition)
def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None)
def default_condition(**kwargs)
def _trigger(self, transition_name, transition_dict, **kwargs)
def __init__(self, states=None, initial_state="default_initial")
states
Valid states for this machine.
initial_state
Pointless docstring since it's a property.
def state(self, state)
def initial_state(self, state)
_initial_state
Actual attribute holding initial state for this machine.
def __getattr__(self, name, **kwargs)
state
Current State of machine.
_state
Actual attribute holding the Current state.
def __init__(self, name, enter=None, exit=None)
def on_exit(self, callbacks)
def on_enter(self, callbacks)
def _add_callbacks(self, callback, attribute)
name
Name of the State.
on_enter
Callback list when entering state.
def __eq__(self, other)
on_exit
Callback list when exiting state.
def _(self, callbacks, attribute)