Belle II Software  release-05-01-25
state_machines.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 import basf2
5 
6 from functools import partial
7 from collections import defaultdict
8 
9 import pickle
10 import glob
11 import shutil
12 import time
13 from pathlib import Path
14 import os
15 import json
16 
17 from basf2 import create_path
18 from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
19 from basf2 import conditions as b2conditions
20 from basf2.pickle_path import serialize_path
21 
22 from ROOT.Belle2 import CalibrationAlgorithm
23 
24 from caf.utils import create_directories
25 from caf.utils import method_dispatch
26 from caf.utils import iov_from_runs
27 from caf.utils import IoV_Result
28 from caf.utils import get_iov_from_file
29 from caf.backends import Job
30 from caf.backends import LSF
31 from caf.backends import PBS
32 from caf.backends import Local
33 from caf.runners import AlgorithmsRunner
34 
35 
36 class State():
37  """
38  Basic State object that can take enter and exit state methods and records
39  the state of a machine.
40 
41  You should assign the self.on_enter or self.on_exit attributes to callback functions
42  or lists of them, if you need them.
43  """
44 
45  def __init__(self, name, enter=None, exit=None):
46  """
47  Initialise State with a name and optional lists of callbacks.
48  """
49 
50  self.name = name
51 
52  self.on_enter = enter
53 
54  self.on_exit = exit
55 
56  @property
57  def on_enter(self):
58  """
59  Runs callbacks when a state is entered.
60  """
61  return self._on_enter
62 
63  @on_enter.setter
64  def on_enter(self, callbacks):
65  """
66  """
67  self._on_enter = []
68  if callbacks:
69  self._add_callbacks(callbacks, self._on_enter)
70 
71  @property
72  def on_exit(self):
73  """
74  Runs callbacks when a state is exited.
75  """
76  return self._on_exit
77 
78  @on_exit.setter
79  def on_exit(self, callbacks):
80  """
81  """
82  self._on_exit = []
83  if callbacks:
84  self._add_callbacks(callbacks, self._on_exit)
85 
86  @method_dispatch
87  def _add_callbacks(self, callback, attribute):
88  """
89  Adds callback to a property.
90  """
91  if callable(callback):
92  attribute.append(callback)
93  else:
94  B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")
95 
96  @_add_callbacks.register(tuple)
97  @_add_callbacks.register(list)
98  def _(self, callbacks, attribute):
99  """
100  Alternate method for lists and tuples of function objects.
101  """
102  if callbacks:
103  for function in callbacks:
104  if callable(function):
105  attribute.append(function)
106  else:
107  B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")
108 
109  def __str__(self):
110  """
111  """
112  return self.name
113 
114  def __repr__(self):
115  """
116  """
117  return f"State(name={self.name})"
118 
119  def __eq__(self, other):
120  """
121  """
122  if isinstance(other, str):
123  return self.name == other
124  else:
125  return self.name == other.name
126 
127  def __hash__(self):
128  """
129  """
130  return hash(self.name)
131 
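# Illustrative sketch (not part of the original file): creating a State with an
# enter callback. The names "announce" and "running" are made up; note the callback
# accepts **kwargs, as required when the State is used inside a Machine.
#
#   def announce(**kwargs):
#       print(f"Entering 'running' from '{kwargs['prior_state']}'")
#
#   running = State("running", enter=[announce])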
132 
133 class Machine():
134  """
135  Parameters:
136  states (list[str]): A list of possible states of the machine.
137  initial_state (str): The name of the state the machine starts in.
138 
139  Base class for a finite state machine wrapper.
140  Implements the framework that a more complex machine can inherit from.
141 
142  The `transitions` attribute is a dictionary of trigger name keys, each value of
143  which is another dictionary of 'source' states, 'dest' states, and 'conditions'
144  methods. 'conditions' should be a list of callables or a single one. A transition is
145  valid if it goes from an allowed state to an allowed state.
146  Conditions are optional but must be a callable that returns True or False based
147  on some state of the machine. They cannot have input arguments currently.
148 
149  Every condition/before/after callback function MUST take ``**kwargs`` as the only
150  argument (except ``self`` if it's a class method). This is because it's basically
151  impossible to determine which arguments to pass to which functions for a transition.
152  Therefore this machine just enforces that every function should simply take ``**kwargs``
153  and use the dictionary of arguments (even if it doesn't need any arguments).
154 
155  This also means that if you call a trigger with arguments e.g. ``machine.walk(speed=5)``
156  you MUST use the keyword arguments rather than positional ones. So ``machine.walk(5)``
157  will *not* work.
158  """
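# Illustrative sketch (not part of the original file): a minimal Machine with one
# transition. The state names "standing"/"walking", the trigger "walk" and the speed
# condition are hypothetical.
#
#   machine = Machine(states=["standing", "walking"], initial_state="standing")
#   machine.add_transition("walk", "standing", "walking",
#                          conditions=lambda **kwargs: kwargs.get("speed", 0) > 0)
#   machine.walk(speed=5)   # keyword arguments only; machine.walk(5) raises an error
#   print(machine.state)    # -> walking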
159 
160  def __init__(self, states=None, initial_state="default_initial"):
161  """
162  Basic Setup of states and initial_state.
163  """
164 
165  self.states = {}
166  if states:
167  for state in states:
168  self.add_state(state)
169  if initial_state != "default_initial":
170 
171  self.initial_state = initial_state
172  else:
173  self.add_state(initial_state)
174 
175  self._initial_state = State(initial_state)
176 
177 
178  self._state = self.initial_state
179 
180  self.transitions = defaultdict(list)
181 
182  def add_state(self, state, enter=None, exit=None):
183  """
184  Adds a single state to the list of possible ones.
185  Should be a unique string or a State object with a unique name.
186  """
187  if isinstance(state, str):
188  self.add_state(State(state, enter, exit))
189  elif isinstance(state, State):
190  if state.name not in self.states.keys():
191  self.states[state.name] = state
192  else:
193  B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
194  else:
195  B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")
196 
197  @property
198  def initial_state(self):
199  """
200  The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
201  """
202  return self._initial_state
203 
204  @initial_state.setter
205  def initial_state(self, state):
206  """
207  """
208  if state in self.states.keys():
209  self._initial_state = self.states[state]
210 
211  self._state = self.states[state]
212  else:
213  raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")
214 
215  @property
216  def state(self):
217  """
218  The current state of the machine. Actually a `property` decorator. It will call the exit method of the
219  current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
220  either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
221  """
222  return self._state
223 
224  @state.setter
225  def state(self, state):
226  """
227  """
228  if isinstance(state, str):
229  state_name = state
230  else:
231  state_name = state.name
232 
233  try:
234  state = self.states[state_name]
235  # Run exit callbacks of current state
236  for callback in self.state.on_exit:
237  callback(prior_state=self.state, new_state=state)
238  # Run enter callbacks of new state
239  for callback in state.on_enter:
240  callback(prior_state=self.state, new_state=state)
241  # Set the state
242  self._state = state
243  except KeyError:
244  raise MachineError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")
245 
246  @staticmethod
247  def default_condition(**kwargs):
248  """
249  Method to always return True.
250  """
251  return True
252 
253  def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
254  """
255  Adds a single transition to the dictionary of possible ones.
256  Trigger is the method name that begins the transition between the
257  source state and the destination state.
258 
259  The condition is an optional function that returns True or False
260  depending on the current state/input.
261  """
262  transition_dict = {}
263  try:
264  source = self.states[source]
265  dest = self.states[dest]
266  transition_dict["source"] = source
267  transition_dict["dest"] = dest
268  except KeyError as err:
269  B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
270  raise err
271  if conditions:
272  if isinstance(conditions, (list, tuple, set)):
273  transition_dict["conditions"] = list(conditions)
274  else:
275  transition_dict["conditions"] = [conditions]
276  else:
277  transition_dict["conditions"] = [Machine.default_condition]
278 
279  if not before:
280  before = []
281  if isinstance(before, (list, tuple, set)):
282  transition_dict["before"] = list(before)
283  else:
284  transition_dict["before"] = [before]
285 
286  if not after:
287  after = []
288  if isinstance(after, (list, tuple, set)):
289  transition_dict["after"] = list(after)
290  else:
291  transition_dict["after"] = [after]
292 
293  self.transitions[trigger].append(transition_dict)
294 
295  def __getattr__(self, name, **kwargs):
296  """
297  Allows us to create a new method for each trigger on the fly.
298  If there is no trigger name in the machine to match, then the usual
299  AttributeError is raised.
300  """
301  possible_transitions = self.get_transitions(self.state)
302  if name not in possible_transitions:
303  raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
304  transition_dict = self.get_transition_dict(self.state, name)
305  return partial(self._trigger, name, transition_dict, **kwargs)
306 
307  def _trigger(self, transition_name, transition_dict, **kwargs):
308  """
309  Runs the transition logic. Callbacks are evaluated in the order:
310  conditions -> before -> <new state set here> -> after.
311  """
312  source, dest, conditions, before_callbacks, after_callbacks = (transition_dict["source"],
313  transition_dict["dest"],
314  transition_dict["conditions"],
315  transition_dict["before"],
316  transition_dict["after"])
317  # Returns True only if every condition returns True when called
318  if all(map(lambda condition: self._callback(condition, **kwargs), conditions)):
319  for before_func in before_callbacks:
320  self._callback(before_func, **kwargs)
321 
322  self.state = dest
323  for after_func in after_callbacks:
324  self._callback(after_func, **kwargs)
325  else:
326  raise ConditionError((f"Transition '{transition_name}' called for but one or more conditions "
327  "evaluated False"))
328 
329  @staticmethod
330  def _callback(func, **kwargs):
331  """
332  Calls a condition/before/after callback function with the keyword arguments passed (if any).
333  """
334  return func(**kwargs)
335 
336  def get_transitions(self, source):
337  """
338  Returns allowed transitions from a given state.
339  """
340  possible_transitions = []
341  for transition, transition_dicts in self.transitions.items():
342  for transition_dict in transition_dicts:
343  if transition_dict["source"] == source:
344  possible_transitions.append(transition)
345  return possible_transitions
346 
347  def get_transition_dict(self, state, transition):
348  """
349  Returns the transition dictionary for a state and transition out of it.
350  """
351  transition_dicts = self.transitions[transition]
352  for transition_dict in transition_dicts:
353  if transition_dict["source"] == state:
354  return transition_dict
355  else:
356  raise KeyError(f"No transition from state {state} with the name {transition}.")
357 
358  def save_graph(self, filename, graphname):
359  """
360  Writes a simple dot file to visualise the states and transitions.
361  """
362  with open(filename, "w") as dotfile:
363  dotfile.write("digraph " + graphname + " {\n")
364  for state in self.states.keys():
365  dotfile.write('"' + state + '" [shape=ellipse, color=black]\n')
366  for trigger, transition_dicts in self.transitions.items():
367  for transition in transition_dicts:
368  dotfile.write('"' + transition["source"].name + '" -> "' +
369  transition["dest"].name + '" [label="' + trigger + '"]\n')
370  dotfile.write("}\n")
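# Illustrative usage sketch (not in the original file): dumping the transition graph
# and rendering it with GraphViz. The file names are arbitrary.
#
#   machine.save_graph("machine.dot", "MyMachine")
#   # then e.g. on the command line:  dot -Tpng machine.dot -o machine.png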
371 
372 
373 class CalibrationMachine(Machine):
374  """
375  A state machine to handle `Calibration` objects and the flow of
376  processing for them.
377  """
378 
379  collector_input_dir = 'collector_input'
380  collector_output_dir = 'collector_output'
381  algorithm_output_dir = 'algorithm_output'
382 
383  def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
384  """
385  Takes a Calibration object from the caf framework and lets you
386  set the initial state.
387  """
388 
389  self.default_states = [State("init", enter=[self._update_cal_state,
390  self._log_new_state]),
391  State("running_collector", enter=[self._update_cal_state,
392  self._log_new_state]),
393  State("collector_failed", enter=[self._update_cal_state,
394  self._log_new_state]),
395  State("collector_completed", enter=[self._update_cal_state,
396  self._log_new_state]),
397  State("running_algorithms", enter=[self._update_cal_state,
398  self._log_new_state]),
399  State("algorithms_failed", enter=[self._update_cal_state,
400  self._log_new_state]),
401  State("algorithms_completed", enter=[self._update_cal_state,
402  self._log_new_state]),
403  State("completed", enter=[self._update_cal_state,
404  self._log_new_state]),
405  State("failed", enter=[self._update_cal_state,
406  self._log_new_state])
407  ]
408 
409  super().__init__(self.default_states, initial_state)
410 
411 
412  self.calibration = calibration
413  # Monkey Patching for the win!
414 
415  self.calibration.machine = self
416 
417  self.iteration = iteration
418 
419  self.collector_backend = None
420 
421  self._algorithm_results = {}
422 
423  self._runner_final_state = None
424 
425  self.iov_to_calibrate = iov_to_calibrate
426 
427  self.root_dir = Path(os.getcwd(), calibration.name)
428 
429 
432  self._collector_timing = {}
433 
434 
435  self._collector_jobs = {}
436 
437  self.add_transition("submit_collector", "init", "running_collector",
438  conditions=self.dependencies_completed,
439  before=[self._make_output_dir,
440  self._resolve_file_paths,
441  self._build_iov_dicts,
442  self._create_collector_jobs,
443  self._submit_collections,
444  self._dump_job_config])
445  self.add_transition("fail", "running_collector", "collector_failed",
446  conditions=self._collection_failed)
447  self.add_transition("complete", "running_collector", "collector_completed",
448  conditions=self._collection_completed)
449  self.add_transition("run_algorithms", "collector_completed", "running_algorithms",
450  before=self._check_valid_collector_output,
451  after=[self._run_algorithms,
452  self.automatic_transition])
453  self.add_transition("complete", "running_algorithms", "algorithms_completed",
454  after=self.automatic_transition,
455  conditions=self._runner_not_failed)
456  self.add_transition("fail", "running_algorithms", "algorithms_failed",
457  conditions=self._runner_failed)
458  self.add_transition("iterate", "algorithms_completed", "init",
459  conditions=[self._require_iteration,
460  self._below_max_iterations],
461  after=self._increment_iteration)
462  self.add_transition("finish", "algorithms_completed", "completed",
463  conditions=self._no_require_iteration,
464  before=self._prepare_final_db)
465  self.add_transition("fail_fully", "algorithms_failed", "failed")
466  self.add_transition("fail_fully", "collector_failed", "failed")
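# Summary of the transition graph defined above (added for clarity, not part of the
# original file):
#
#   init --submit_collector--> running_collector
#   running_collector --complete--> collector_completed --run_algorithms--> running_algorithms
#   running_collector --fail--> collector_failed --fail_fully--> failed
#   running_algorithms --complete--> algorithms_completed
#   running_algorithms --fail--> algorithms_failed --fail_fully--> failed
#   algorithms_completed --iterate--> init        (another iteration required, below max)
#   algorithms_completed --finish--> completed    (no further iteration required)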
467 
468  def _update_cal_state(self, **kwargs):
469  self.calibration.state = str(kwargs["new_state"])
470 
471  def files_containing_iov(self, file_paths, files_to_iovs, iov):
472  """
473  Lookup function that returns all files from the file_paths that
474  overlap with this IoV.
475  """
476  # Files that contain an Exp,Run range that overlaps with given IoV
477  overlapping_files = set()
478 
479  for file_path, file_iov in files_to_iovs.items():
480  if file_iov.overlaps(iov) and (file_path in file_paths):
481  overlapping_files.add(file_path)
482  return overlapping_files
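# Illustrative sketch (not in the original file): how the IoV overlap lookup behaves,
# assuming caf.utils.IoV with (exp_low, run_low, exp_high, run_high) fields. The file
# paths and IoV values are hypothetical.
#
#   from caf.utils import IoV
#   files_to_iovs = {"/data/run1.root": IoV(0, 1, 0, 5),
#                    "/data/run8.root": IoV(0, 8, 0, 10)}
#   machine.files_containing_iov(files_to_iovs.keys(), files_to_iovs, IoV(0, 4, 0, 6))
#   # -> {"/data/run1.root"}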
483 
484  def _dump_job_config(self):
485  """
486  Dumps the `Job` object for the collections to JSON files so that its configuration can be recovered
487  later in case of failure.
488  """
489  # Wait for jobs (+subjobs) to be submitted so that all information is filled. Since the parent CAF object asynchronously
490  # submits the jobs this might need to wait a while.
491  while any(map(lambda j: j.status == "init", self._collector_jobs.values())):
492  B2DEBUG(29, "Some Collector Jobs still in 'init' state. Waiting...")
493  time.sleep(5)
494 
495  for collection_name, job in self._collector_jobs.items():
496  collector_job_output_file_name = self.calibration.collections[collection_name].job_config
497  output_file = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir,
498  collection_name, collector_job_output_file_name)
499  job.dump_to_json(output_file)
500 
501  def _recover_collector_jobs(self):
502  """
503  Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.
504  """
505  for collection_name, collection in self.calibration.collections.items():
506  output_file = self.root_dir.joinpath(str(self.iteration),
507  self.collector_input_dir,
508  collection_name,
509  collection.job_config)
510  self._collector_jobs[collection_name] = Job.from_json(output_file)
511 
512  def _iov_requested(self):
513  """
514  """
515  if self.iov_to_calibrate:
516  B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
517  return True
518  else:
519  B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
520  return False
521 
522  def _resolve_file_paths(self):
523  """
524  """
525  pass
526 
527  def _build_iov_dicts(self):
528  """
529  Build IoV file dictionary for each collection if required.
530  """
531  iov_requested = self._iov_requested()
532  if iov_requested or self.calibration.ignored_runs:
533  for coll_name, collection in self.calibration.collections.items():
534  if not collection.files_to_iovs:
535  B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
536  f" Calibration '{self.calibration.name}' and Collection '{coll_name}'."
537  " Filling dictionary from input file metadata."
538  " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
539 
540  files_to_iovs = {}
541  for file_path in collection.input_files:
542  files_to_iovs[file_path] = get_iov_from_file(file_path)
543  collection.files_to_iovs = files_to_iovs
544  else:
545  B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
546  f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
547  else:
548  B2INFO("No File to IoV mapping required.")
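# Illustrative sketch (not in the original file): pre-filling the mapping so that the
# per-file metadata lookup above is skipped. Paths and IoV values are hypothetical.
#
#   from caf.utils import IoV
#   collection.files_to_iovs = {"/data/exp0_run1.root": IoV(0, 1, 0, 1)}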
549 
550  def _below_max_iterations(self):
551  """
552  """
553  return self.iteration < self.calibration.max_iterations
554 
555  def _increment_iteration(self):
556  """
557  """
558  self.iteration += 1
559  self.calibration.iteration = self.iteration
560 
561  def _collection_completed(self):
562  """
563  Did all the collections succeed?
564  """
565  B2DEBUG(29, "Checking for completed collector jobs.")
566  if self._collector_jobs_ready():
567  return all([job.status == "completed" for job in self._collector_jobs.values()])
568 
569  def _collection_failed(self):
570  """
571  Did any of the collections fail?
572  """
573  B2DEBUG(29, "Checking for failed collector job.")
574  if self._collector_jobs_ready():
575  return any([job.status == "failed" for job in self._collector_jobs.values()])
576 
577  def _runner_not_failed(self):
578  """
579  Returns:
580  bool: If AlgorithmsRunner succeeded return True.
581  """
582  return not self._runner_failed()
583 
584  def _runner_failed(self):
585  """
586  Returns:
587  bool: If AlgorithmsRunner failed return True.
588  """
589  if self._runner_final_state == AlgorithmsRunner.FAILED:
590  return True
591  else:
592  return False
593 
594  def _collector_jobs_ready(self):
595  """
596  """
597  since_last_update = time.time() - self._collector_timing["last_update"]
598  if since_last_update > self.calibration.collector_full_update_interval:
599  B2INFO("Updating full collector job statuses.")
600  for job in self._collector_jobs.values():
601  job.update_status()
602  self._collector_timing["last_update"] = time.time()
603  if job.subjobs:
604  num_completed = sum((subjob.status in subjob.exit_statuses) for subjob in job.subjobs.values())
605  total_subjobs = len(job.subjobs)
606  B2INFO(f"{num_completed}/{total_subjobs} Collector SubJobs finished in"
607  f" Calibration {self.calibration.name} Job {job.name}.")
608  return all([job.ready() for job in self._collector_jobs.values()])
609 
610  def _submit_collections(self):
611  """
612  """
613  self.calibration.jobs_to_submit.extend(list(self._collector_jobs.values()))
614  self._collector_timing["start"] = time.time()
615  self._collector_timing["last_update"] = time.time()
616 
617  def _no_require_iteration(self):
618  """
619  """
620  if self._require_iteration() and self._below_max_iterations():
621  return False
622  elif self._require_iteration() and not self._below_max_iterations():
623  B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
624  return True
625  elif not self._require_iteration():
626  return True
627 
628  def _require_iteration(self):
629  """
630  """
631  iteration_called = False
632  for alg_name, results in self._algorithm_results[self.iteration].items():
633  for result in results:
634  if result.result == CalibrationAlgorithm.c_Iterate:
635  iteration_called = True
636  break
637  if iteration_called:
638  break
639  return iteration_called
640 
641  def _log_new_state(self, **kwargs):
642  """
643  """
644  B2INFO(f"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")
645 
646  def dependencies_completed(self):
647  """
648  Condition function to check that the dependencies of our calibration are in the 'completed' state.
649  Technically only need to check explicit dependencies.
650  """
651  for calibration in self.calibration.dependencies:
652  if not calibration.state == calibration.end_state:
653  return False
654  else:
655  return True
656 
657  def automatic_transition(self):
658  """
659  Automatically try all transitions out of this state once. The 'fail' transition is tried last.
660  """
661  possible_transitions = self.get_transitions(self.state)
662  for transition in possible_transitions:
663  try:
664  if transition != "fail":
665  getattr(self, transition)()
666  break
667  except ConditionError:
668  continue
669  else:
670  if "fail" in possible_transitions:
671  getattr(self, "fail")()
672  else:
673  raise MachineError(f"Failed to automatically transition out of {self.state} state.")
674 
675  def _make_output_dir(self):
676  """
677  Creates the overall root directory of the Calibration. Will not overwrite if it already exists.
678  Also creates s
679  """
680  create_directories(self.root_dir, overwrite=False)
681 
682  def _make_collector_path(self, name, collection):
683  """
684  Creates a basf2 path for the correct collector and serializes it in the
685  <calibration_name>/<iteration>/collector_input/<collection_name> directory.
686  """
687  path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
688  # Automatically overwrite any previous directory
689  create_directories(path_output_dir)
690  path_file_name = collection.collector.name() + '.path'
691  path_file_name = path_output_dir / path_file_name
692  # Create empty path and add collector to it
693  coll_path = create_path()
694  coll_path.add_module(collection.collector)
695  # Dump the basf2 path to file
696  with open(path_file_name, 'bw') as serialized_path_file:
697  pickle.dump(serialize_path(coll_path), serialized_path_file)
698  # Return the pickle file path for addition to the input sandbox
699  return str(path_file_name.absolute())
700 
701  def _make_pre_collector_path(self, name, collection):
702  """
703  Creates a basf2 path for the collector's setup path (Collection.pre_collector_path) and serializes it in the
704  <calibration_name>/<iteration>/collector_input/<collection_name> directory.
705  """
706  path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
707  coll_path = collection.pre_collector_path
708  path_file_name = 'pre_collector.path'
709  path_file_name = os.path.join(path_output_dir, path_file_name)
710  # Dump the basf2 path to file
711  with open(path_file_name, 'bw') as serialized_path_file:
712  pickle.dump(serialize_path(coll_path), serialized_path_file)
713  # Return the pickle file path for addition to the input sandbox
714  return path_file_name
715 
716  def _create_collector_jobs(self):
717  """
718  Creates a Job object for each collection of this iteration, ready for submission
719  to the backend.
720  """
721  for collection_name, collection in self.calibration.collections.items():
722  iteration_dir = self.root_dir.joinpath(str(self.iteration))
723  job = Job('_'.join([self.calibration.name, collection_name, 'Iteration', str(self.iteration)]))
724  job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
725  job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
726  # Remove previous failed attempt to avoid problems
727  if job.output_dir.exists():
728  B2INFO(f"Previous output directory for {self.calibration.name} collector {collection_name} exists. "
729  f"Deleting {job.output_dir} before re-submitting.")
730  shutil.rmtree(job.output_dir)
731  job.cmd = collection.job_cmd
732  job.append_current_basf2_setup_cmds()
733  job.input_sandbox_files.append(collection.job_script)
734  collector_path_file = Path(self._make_collector_path(collection_name, collection))
735  job.input_sandbox_files.append(collector_path_file)
736  if collection.pre_collector_path:
737  pre_collector_path_file = Path(self._make_pre_collector_path(collection_name, collection))
738  job.input_sandbox_files.append(pre_collector_path_file)
739 
740  # Want to figure out which local databases are required for this job and their paths
741  list_dependent_databases = []
742 
743  # Here we add the finished databases of previous calibrations that we depend on.
744  # We can assume that the databases exist as we can't be here until they have returned
745  for dependency in self.calibration.dependencies:
746  database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
747  B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
748  list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
749 
750  # Add previous iteration databases from this calibration
751  if self.iteration > 0:
752  previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
753  database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
754  list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
755  B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
756 
757  # Let's use a directory to store some files later for input to the collector jobs. Should already exist from
758  # collector path
759  input_data_directory = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, collection_name)
760 
761  # Need to pass setup info to collector which would be tricky as arguments
762  # We make a dictionary and pass it in as json
763  job_config = {}
764  # Apply the user-set Calibration database chain to the base of the overall chain.
765  json_db_chain = []
766  for database in collection.database_chain:
767  if database.db_type == 'local':
768  json_db_chain.append(('local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
769  elif database.db_type == 'central':
770  json_db_chain.append(('central', database.global_tag))
771  else:
772  raise ValueError(f"Unknown database type {database.db_type}.")
773  # CAF created ones for dependent calibrations and previous iterations of this calibration
774  for database in list_dependent_databases:
775  json_db_chain.append(('local', database))
776  job_config['database_chain'] = json_db_chain
777 
778  job_config_file_path = input_data_directory.joinpath('collector_config.json').absolute()
779  with open(job_config_file_path, 'w') as job_config_file:
780  json.dump(job_config, job_config_file, indent=2)
781  job.input_sandbox_files.append(job_config_file_path)
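# Illustrative sketch (not in the original file) of what collector_config.json may
# contain after the loop above; the global tag and paths are hypothetical.
#
#   {
#     "database_chain": [
#       ["central", "some_global_tag"],
#       ["local", ["/cwd/dep_calibration/outputdb/database.txt",
#                  "/cwd/dep_calibration/outputdb"]]
#     ]
#   }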
782 
783  # Define the input files
784  input_data_files = set(collection.input_files)
785  # Reduce the input data files to only those that overlap with the optional requested IoV
786  if self.iov_to_calibrate:
787  input_data_files = self.files_containing_iov(input_data_files,
788  collection.files_to_iovs,
789  self.iov_to_calibrate)
790  # Remove any files that ONLY contain runs from our optional ignored_runs list
791  files_to_ignore = set()
792  for exprun in self.calibration.ignored_runs:
793  for input_file in input_data_files:
794  file_iov = self.calibration.files_to_iovs[input_file]
795  if file_iov == exprun.make_iov():
796  B2INFO(f"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
797  f"Therefore the input file '{input_file}' from Collection '{collection_name}' "
798  "is being removed from input files list.")
799  files_to_ignore.add(input_file)
800  input_data_files.difference_update(files_to_ignore)
801 
802  if not input_data_files:
803  raise MachineError(f"No valid input files for Calibration '{self.calibration.name}' "
804  f" and Collection '{collection_name}'.")
805  job.input_files = list(input_data_files)
806 
807  job.splitter = collection.splitter
808  job.backend_args = collection.backend_args
809  # Output patterns to be returned from collector job
810  job.output_patterns = collection.output_patterns
811  B2DEBUG(20, f"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
812  self._collector_jobs[collection_name] = job
813 
814  def _check_valid_collector_output(self):
815  B2INFO("Checking that Collector output exists for all collector jobs "
816  f"using {self.calibration.name}.output_patterns.")
817  if not self._collector_jobs:
818  B2INFO("We're restarting so we'll recreate the collector Job object.")
819  self._recover_collector_jobs()
820 
821  for job in self._collector_jobs.values():
822  if not job.subjobs:
823  output_files = []
824  for pattern in job.output_patterns:
825  output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
826  if not output_files:
827  raise MachineError("No output files from Collector Job")
828  else:
829  for subjob in job.subjobs.values():
830  output_files = []
831  for pattern in subjob.output_patterns:
832  output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
833  if not output_files:
834  raise MachineError(f"No output files from Collector {subjob}")
835 
836  def _run_algorithms(self):
837  """
838  Runs the Calibration Algorithms for this calibration machine.
839 
840  Will run them sequentially locally (possible benefits to using a
841  processing pool for low memory algorithms later on.)
842  """
843  # Get an instance of the Runner for these algorithms and run it
844  algs_runner = self.calibration.algorithms_runner(name=self.calibration.name)
845  algs_runner.algorithms = self.calibration.algorithms
846  algorithm_output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
847  output_database_dir = algorithm_output_dir.joinpath("outputdb")
848  # Remove it, if we failed previously, to start clean
849  if algorithm_output_dir.exists():
850  B2INFO(f"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
851  f"Deleting and recreating {algorithm_output_dir}.")
852  create_directories(algorithm_output_dir)
853  B2INFO(f"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
854  algs_runner.output_database_dir = output_database_dir
855  algs_runner.output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
856  input_files = []
857 
858  for job in self._collector_jobs.values():
859  if job.subjobs:
860  for subjob in job.subjobs.values():
861  for pattern in subjob.output_patterns:
862  input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
863  else:
864  for pattern in job.output_patterns:
865  input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
866 
867  algs_runner.input_files = input_files
868 
869  # Add any user defined database chain for this calibration
870  algs_runner.database_chain = self.calibration.database_chain
871 
872  # Here we add the finished databases of previous calibrations that we depend on.
873  # We can assume that the databases exist as we can't be here until they have returned
874  list_dependent_databases = []
875  for dependency in self.calibration.dependencies:
876  database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
877  B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
878  list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
879 
880  # Add previous iteration databases from this calibration
881  if self.iteration > 0:
882  previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
883  database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
884  list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
885  B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
886  algs_runner.dependent_databases = list_dependent_databases
887 
888  algs_runner.ignored_runs = self.calibration.ignored_runs
889 
890  try:
891  algs_runner.run(self.iov_to_calibrate, self.iteration)
892  except Exception as err:
893  print(err)
894  # We directly set the state without triggering the transition because normally we fail based on checking the algorithm
895  # results. But here we had an actual exception so we just force into failure instead.
896  self._state = State("algorithms_failed")
897  self._algorithm_results[self.iteration] = algs_runner.results
898  self._runner_final_state = algs_runner.final_state
899 
900  def _prepare_final_db(self):
901  """
902  Take the last iteration's outputdb and copy it to a more easily findable place.
903  """
904  database_location = self.root_dir.joinpath(str(self.iteration),
905  self.calibration.alg_output_dir,
906  'outputdb')
907  final_database_location = self.root_dir.joinpath('outputdb')
908  if final_database_location.exists():
909  B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
910  shutil.rmtree(final_database_location)
911  shutil.copytree(database_location, final_database_location)
912 
913 
914 class AlgorithmMachine(Machine):
915  """
916  A state machine to handle the logic of running the algorithm on the overall runs contained in the data.
917  """
918 
919 
920  #: Required attributes that must exist before the machine can run properly
921  required_attrs = ["algorithm",
922  "dependent_databases",
923  "database_chain",
924  "output_dir",
925  "output_database_dir",
926  "input_files"
927  ]
928 
929 
930  required_true_attrs = ["algorithm",
931  "output_dir",
932  "output_database_dir",
933  "input_files"
934  ]
935 
936  def __init__(self, algorithm=None, initial_state="init"):
937  """
938  Takes an Algorithm object from the caf framework and defines the transitions.
939  """
940 
941  self.default_states = [State("init"),
942  State("ready"),
943  State("running_algorithm"),
944  State("completed"),
945  State("failed")]
946 
947  super().__init__(self.default_states, initial_state)
948 
949 
950  self.algorithm = algorithm
951 
952  self.input_files = []
953 
954  self.dependent_databases = []
955 
956  #: Assigned database chain to the overall Calibration object, or to the 'default' Collection
957  self.database_chain = []
958 
959  self.output_dir = ""
960 
961  self.output_database_dir = ""
962 
963  self.result = None
964 
965  self.add_transition("setup_algorithm", "init", "ready",
966  before=[self._setup_logging,
967  self._change_working_dir,
968  self._setup_database_chain,
969  self._set_input_data,
970  self._pre_algorithm])
971  self.add_transition("execute_runs", "ready", "running_algorithm",
972  after=self._execute_over_iov)
973  self.add_transition("complete", "running_algorithm", "completed")
974  self.add_transition("fail", "running_algorithm", "failed")
975  self.add_transition("fail", "ready", "failed")
976  self.add_transition("setup_algorithm", "completed", "ready")
977  self.add_transition("setup_algorithm", "failed", "ready")
978 
979  def setup_from_dict(self, params):
980  """
981  Parameters:
982  params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.
983  """
984  for attribute_name, value in params.items():
985  setattr(self, attribute_name, value)
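# Illustrative sketch (not in the original file): configuring an AlgorithmMachine via
# setup_from_dict() and driving it. All values, 'algorithm' and 'runs_to_execute' are
# hypothetical placeholders; triggers must be called with keyword arguments.
#
#   machine = AlgorithmMachine(algorithm)
#   machine.setup_from_dict({"database_chain": [],
#                            "dependent_databases": [],
#                            "output_dir": "/path/to/alg_output",
#                            "output_database_dir": Path("/path/to/alg_output/outputdb"),
#                            "input_files": ["CollectorOutput.root"]})
#   if machine.is_valid():
#       machine.setup_algorithm(iteration=0)
#       machine.execute_runs(runs=runs_to_execute, iteration=0, apply_iov=None)
#       print(machine.result)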
986 
987  def is_valid(self):
988  """
989  Returns:
990  bool: Whether or not this machine has been set up correctly with all its necessary attributes.
991  """
992  B2INFO("Checking validity of current setup of AlgorithmMachine for {}.".format(self.algorithm.name))
993  # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
994  for attribute_name in self.required_attrs:
995  if not hasattr(self, attribute_name):
996  B2ERROR(f"AlgorithmMachine attribute {attribute_name} doesn't exist.")
997  return False
998  # Check if any attributes that need actual values haven't been set or were empty
999  for attribute_name in self.required_true_attrs:
1000  if not getattr(self, attribute_name):
1001  B2ERROR(f"AlgorithmMachine attribute {attribute_name} returned False.")
1002  return False
1003  return True
1004 
1005  def _create_output_dir(self, **kwargs):
1006  """
1007  Create working/output directory of algorithm. Any old directory is overwritten.
1008  """
1009  create_directories(Path(self.output_dir), overwrite=True)
1010 
1011  def _setup_database_chain(self, **kwargs):
1012  """
1013  Apply all databases in the correct order.
1014  """
1015  # We deliberately override the normal database ordering because we don't want input files GTs to affect
1016  # the processing. Only explicit GTs and intermediate local DBs made by the CAF should be added here.
1017  b2conditions.reset()
1018  b2conditions.override_globaltags()
1019 
1020  # Apply all the databases in order, starting with the user-set chain for this Calibration
1021  for database in self.database_chain:
1022  if database.db_type == 'local':
1023  B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
1024  f"for {self.algorithm.name}.")
1025  b2conditions.prepend_testing_payloads(database.filepath.as_posix())
1026  elif database.db_type == 'central':
1027  B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
1028  f"for {self.algorithm.name}.")
1029  b2conditions.prepend_globaltag(database.global_tag)
1030  else:
1031  raise ValueError(f"Unknown database type {database.db_type}.")
1032  # Here we add the finished databases of previous calibrations that we depend on.
1033  # We can assume that the databases exist as we can't be here until they have returned
1034  # with OK status.
1035  for filename, directory in self.dependent_databases:
1036  B2INFO((f"Adding Local Database {filename} to head of chain of local databases created by"
1037  f" a dependent calibration, for {self.algorithm.name}."))
1038  b2conditions.prepend_testing_payloads(filename)
1039 
1040  # Create a directory to store the payloads of this algorithm
1041  create_directories(Path(self.output_database_dir), overwrite=False)
1042 
1043  # add local database to save payloads
1044  B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
1045  # Things have changed. We now need to do the expert settings to create a database directly.
1046  # LocalDB is readonly without this but we don't need 'use_local_database' during writing.
1047  b2conditions.expert_settings(save_payloads=str(self.output_database_dir.joinpath("database.txt")))
1048 
1049  def _setup_logging(self, **kwargs):
1050  """
1051  """
1052  # add logfile for output
1053  log_file = os.path.join(self.output_dir, self.algorithm.name + '_stdout')
1054  B2INFO(f"Output log file at {log_file}.")
1055  basf2.reset_log()
1056  basf2.set_log_level(basf2.LogLevel.INFO)
1057  basf2.log_to_file(log_file)
1058 
1059  def _change_working_dir(self, **kwargs):
1060  """
1061  """
1062  B2INFO(f"Changing current working directory to {self.output_dir}.")
1063  os.chdir(self.output_dir)
1064 
1065  def _pre_algorithm(self, **kwargs):
1066  """
1067  Call the user defined algorithm setup function.
1068  """
1069  B2INFO("Running Pre-Algorithm function (if exists)")
1070  if self.algorithm.pre_algorithm:
1071  # We have to re-pass in the algorithm here because an outside user has created this method.
1072  # So the method isn't bound to the instance properly.
1073  self.algorithm.pre_algorithm(self.algorithm.algorithm, kwargs["iteration"])
1074 
1075  def _execute_over_iov(self, **kwargs):
1076  """
1077  Executes the algorithm over the given runs/IoV and records the result.
1078  """
1079  B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")
1080 
1081  runs_to_execute = kwargs["runs"]
1082  iov = kwargs["apply_iov"]
1083  iteration = kwargs["iteration"]
1084  if not iov:
1085  iov = iov_from_runs(runs_to_execute)
1086  B2INFO(f"Execution will use {iov} for labelling payloads by default.")
1087  alg_result = self.algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
1088  self.result = IoV_Result(iov, alg_result)
1089 
1090  def _set_input_data(self, **kwargs):
1091  self.algorithm.data_input(self.input_files)
1092 
1093 
1094 class MachineError(Exception):
1095  """
1096  Base exception class for this module.
1097  """
1098 
1099 
1100 class ConditionError(MachineError):
1101  """
1102  Exception for when conditions fail during a transition.
1103  """
1104 
1105 
1106 class TransitionError(MachineError):
1107  """
1108  Exception for when transitions fail.
1109  """