import os
import glob
import json
import pickle
import shutil
import time

from functools import partial
from collections import defaultdict

from pathlib import Path

import basf2
from basf2 import create_path
from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
from basf2 import conditions as b2conditions

from ROOT.Belle2 import CalibrationAlgorithm

from caf.utils import create_directories
from caf.utils import method_dispatch
from caf.utils import iov_from_runs
from caf.utils import IoV_Result
from caf.utils import get_iov_from_file
from caf.backends import Job
from caf.runners import AlgorithmsRunner
Basic State object that can take enter and exit state methods and records
the state of a machine.

You should assign the self.on_enter or self.on_exit attributes to callback functions,
or lists of them, if you need them.

def __init__(self, name, enter=None, exit=None):
    Initialise State with a name and optional lists of callbacks.

Runs callbacks when a state is entered.

Runs callbacks when a state is exited.

Adds callback to a property.
    if callable(callback):
        attribute.append(callback)
    else:
        B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")
@_add_callbacks.register(tuple)
@_add_callbacks.register(list)
def _(self, callbacks, attribute):
    Alternate method for lists and tuples of function objects.
    for function in callbacks:
        if callable(function):
            attribute.append(function)
        else:
            B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")
return f"State(name={self.name})"

if isinstance(other, str):
    return self.name == other
return self.name == other.name

return hash(self.name)
states (list[str]): A list of possible states of the machine.

Base class for a finite state machine wrapper.
Implements the framework that a more complex machine can inherit from.

The `transitions` attribute is a dictionary of trigger name keys, each value of
which is another dictionary of 'source' states, 'dest' states, and 'conditions'
methods. 'conditions' should be a list of callables or a single one. A transition is
valid if it goes from an allowed state to an allowed state.
Conditions are optional but must be a callable that returns True or False based
on some state of the machine. They cannot have input arguments currently.

Every condition/before/after callback function MUST take ``**kwargs`` as the only
argument (except ``self`` if it's a class method). This is because it's basically
impossible to determine which arguments to pass to which functions for a transition.
Therefore this machine just enforces that every function should simply take ``**kwargs``
and use the dictionary of arguments (even if it doesn't need any arguments).

This also means that if you call a trigger with arguments e.g. ``machine.walk(speed=5)``
you MUST use the keyword arguments rather than positional ones. So ``machine.walk(5)``
will *not* work.
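For orientation, a minimal usage sketch of this base class. The state and trigger names here ("standing", "walking", "walk") are invented for the example and are not states used by the CAF machines below:

    # Every condition/before/after callback must accept **kwargs only.
    def fast_enough(**kwargs):
        return kwargs.get("speed", 0) > 0

    machine = Machine(states=["standing", "walking"], initial_state="standing")
    machine.add_transition("walk", "standing", "walking", conditions=fast_enough)

    # Triggers are created on the fly via __getattr__ and must be called with
    # keyword arguments, never positional ones.
    machine.walk(speed=5)
    assert machine.state == "walking"   # State.__eq__ supports comparison with the name string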
def __init__(self, states=None, initial_state="default_initial"):
    Basic setup of states and initial_state.

    if initial_state != "default_initial":
Adds a single state to the list of possible ones.
Should be a unique string or a State object with a unique name.

    if isinstance(state, str):
        self.add_state(State(state, enter, exit))
    elif isinstance(state, State):
        if state.name not in self.states.keys():
            self.states[state.name] = state
        else:
            B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
    else:
        B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")
The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.

@initial_state.setter

    if state in self.states.keys():
        ...
    else:
        raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")
The current state of the machine. Actually a `property` decorator. It will call the exit method of the
current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).

    if isinstance(state, str):
        state_name = state
    else:
        state_name = state.name

    state = self.states[state_name]
    for callback in state.on_enter:
        ...

    raise MachineError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")

Method to always return True.
def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
    Adds a single transition to the dictionary of possible ones.
    Trigger is the method name that begins the transition between the
    source state and the destination state.

    The condition is an optional function that returns True or False
    depending on the current state/input.

    transition_dict = {}
    try:
        source = self.states[source]
        dest = self.states[dest]
        transition_dict["source"] = source
        transition_dict["dest"] = dest
    except KeyError as err:
        B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")

    if isinstance(conditions, (list, tuple, set)):
        transition_dict["conditions"] = list(conditions)
    elif conditions:
        transition_dict["conditions"] = [conditions]
    else:
        transition_dict["conditions"] = [Machine.default_condition]

    if isinstance(before, (list, tuple, set)):
        transition_dict["before"] = list(before)
    elif before:
        transition_dict["before"] = [before]

    if isinstance(after, (list, tuple, set)):
        transition_dict["after"] = list(after)
    elif after:
        transition_dict["after"] = [after]

    self.transitions[trigger].append(transition_dict)
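As a concrete illustration of the bookkeeping above (the "solid"/"liquid"/"melt" names are invented for the example):

    m = Machine(states=["solid", "liquid"], initial_state="solid")
    m.add_transition("melt", "solid", "liquid",
                     conditions=lambda **kw: kw.get("temperature", 0) > 0,
                     before=[lambda **kw: print("about to melt")],
                     after=[lambda **kw: print("melted")])

    # self.transitions["melt"] now holds one dictionary with "source"/"dest" State
    # objects and lists for "conditions"/"before"/"after".
    print(m.transitions["melt"][0]["conditions"])   # single callable wrapped in a list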
Allows us to create a new method for each trigger on the fly.
If there is no trigger name in the machine to match, then the normal
AttributeError is raised.

    if name not in possible_transitions:
        raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
    return partial(self._trigger, name, transition_dict, **kwargs)
def _trigger(self, transition_name, transition_dict, **kwargs):
    Runs the transition logic. Callbacks are evaluated in the order:
    conditions -> before -> <new state set here> -> after.

    dest, conditions, before_callbacks, after_callbacks = (
        transition_dict["dest"],
        transition_dict["conditions"],
        transition_dict["before"],
        transition_dict["after"]
    )
    if all(map(lambda condition: self._callback(condition, **kwargs), conditions)):
        for before_func in before_callbacks:
            self._callback(before_func, **kwargs)
        self.state = dest
        for after_func in after_callbacks:
            self._callback(after_func, **kwargs)
    else:
        raise ConditionError(f"Transition '{transition_name}' called for but one or more conditions "
                             "evaluated to False.")

Calls a condition/before/after function using arguments passed (or not).
    return func(**kwargs)
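A sketch of the evaluation order enforced by ``_trigger``; the callbacks and the "stopped"/"moving"/"go" names are hypothetical, and every callback takes only ``**kwargs``:

    def check_speed(**kwargs):      # condition: evaluated first
        return kwargs["speed"] > 0

    def log_start(**kwargs):        # before: runs prior to the state change
        print("starting to move")

    def log_done(**kwargs):         # after: runs once the machine is in the dest state
        print("arrived with speed", kwargs["speed"])

    m = Machine(states=["stopped", "moving"], initial_state="stopped")
    m.add_transition("go", "stopped", "moving",
                     conditions=check_speed, before=log_start, after=log_done)

    # From "stopped", m.go(speed=0) would raise ConditionError since check_speed returns False.
    m.go(speed=3)   # conditions -> before -> state change -> after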
Returns allowed transitions from a given state.

    possible_transitions = []
    for transition, transition_dicts in self.transitions.items():
        for transition_dict in transition_dicts:
            if transition_dict["source"] == source:
                possible_transitions.append(transition)
    return possible_transitions
Returns the transition dictionary for a state and transition out of it.

    transition_dicts = self.transitions[transition]
    for transition_dict in transition_dicts:
        if transition_dict["source"] == state:
            return transition_dict
    raise KeyError(f"No transition from state {state} with the name {transition}.")
Does a simple dot file creation to visualise states and transitions.

    with open(filename, "w") as dotfile:
        dotfile.write("digraph " + graphname + " {\n")
        for state in self.states.keys():
            dotfile.write('"' + state + '" [shape=ellipse, color=black]\n')
        for trigger, transition_dicts in self.transitions.items():
            for transition in transition_dicts:
                dotfile.write('"' + transition["source"].name + '" -> "' +
                              transition["dest"].name + '" [label="' + trigger + '"]\n')
        dotfile.write("}\n")
A state machine to handle `Calibration` objects and the flow of
processing for them.

collector_input_dir = 'collector_input'
collector_output_dir = 'collector_output'
algorithm_output_dir = 'algorithm_output'
def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
    Takes a Calibration object from the caf framework and lets you
    set the initial state.

    self.root_dir = Path(os.getcwd(), calibration.name)

    self.add_transition("submit_collector", "init", "running_collector", ...)
    self.add_transition("fail", "running_collector", "collector_failed", ...)
    self.add_transition("complete", "running_collector", "collector_completed", ...)
    self.add_transition("run_algorithms", "collector_completed", "running_algorithms", ...)
    self.add_transition("complete", "running_algorithms", "algorithms_completed", ...)
    self.add_transition("fail", "running_algorithms", "algorithms_failed", ...)
    self.add_transition("iterate", "algorithms_completed", "init", ...)
    self.add_transition("finish", "algorithms_completed", "completed", ...)
    self.add_transition("fail_fully", "algorithms_failed", "failed")
    self.add_transition("fail_fully", "collector_failed", "failed")

def _update_cal_state(self, **kwargs):
    self.calibration.state = str(kwargs["new_state"])
Lookup function that returns all files from the file_paths that
overlap with this IoV.

    overlapping_files = set()
    for file_path, file_iov in files_to_iovs.items():
        if file_iov.overlaps(iov) and (file_path in file_paths):
            overlapping_files.add(file_path)
    return overlapping_files
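A small usage sketch: the file names and run ranges are invented, ``machine`` stands for a constructed CalibrationMachine, and the ``IoV`` helper from ``caf.utils`` (with ``exp_low``/``run_low``/``exp_high``/``run_high`` fields) is assumed:

    from caf.utils import IoV

    files_to_iovs = {
        "/data/run1.root": IoV(exp_low=12, run_low=1, exp_high=12, run_high=10),
        "/data/run2.root": IoV(exp_low=12, run_low=11, exp_high=12, run_high=20),
    }
    requested = IoV(exp_low=12, run_low=5, exp_high=12, run_high=8)

    # Only files whose IoV overlaps the requested one (and that are in file_paths) are kept.
    machine.files_containing_iov(files_to_iovs.keys(), files_to_iovs, requested)
    # -> {"/data/run1.root"}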
Dumps the `Job` object for the collections to JSON files so that its configuration can be recovered
later in case of failure.

    while any(map(lambda j: j.status == "init", self._collector_jobs.values())):
        B2DEBUG(29, "Some Collector Jobs still in 'init' state. Waiting...")

    for collection_name, job in self._collector_jobs.items():
        collector_job_output_file_name = self.calibration.collections[collection_name].job_config
            collection_name, collector_job_output_file_name)
        job.dump_to_json(output_file)

Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.

    for collection_name, collection in self.calibration.collections.items():
            collection.job_config)
        self._collector_jobs[collection_name] = Job.from_json(output_file)
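The recovery relies on the JSON round trip of `Job` objects; a minimal sketch, with an illustrative job name and file path:

    from caf.backends import Job

    job = Job("MyCal_default_Iteration_0")
    job.output_dir = "collector_output/default"        # illustrative settings
    job.dump_to_json("collector_job.json")             # written once the job has left the 'init' state

    # Later, e.g. after a CAF restart, the same configuration can be reloaded:
    recovered = Job.from_json("collector_job.json")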
B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")

B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
Build IoV file dictionary for each collection if required.

    iov_requested = self._iov_requested()
    if iov_requested or self.calibration.ignored_runs:
        for coll_name, collection in self.calibration.collections.items():
            if not collection.files_to_iovs:
                B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
                       f" Calibration '{self.calibration.name}' and Collection '{coll_name}'."
                       " Filling dictionary from input file metadata."
                       " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
                files_to_iovs = {}
                for file_path in collection.input_files:
                    files_to_iovs[file_path] = get_iov_from_file(file_path)
                collection.files_to_iovs = files_to_iovs
            else:
                B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
                       f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
    else:
        B2INFO("No File to IoV mapping required.")
Did all the collections succeed?

    B2DEBUG(29, "Checking for failed collector job.")
    return all([job.status == "completed" for job in self._collector_jobs.values()])

Did any of the collections fail?

    B2DEBUG(29, "Checking for failed collector job.")
    return any([job.status == "failed" for job in self._collector_jobs.values()])

bool: If AlgorithmsRunner succeeded return True.

bool: If AlgorithmsRunner failed return True.
    since_last_update = time.time() - self._collector_timing["last_update"]
    if since_last_update > self.calibration.collector_full_update_interval:
        B2INFO("Updating full collector job statuses.")
        num_completed = sum((subjob.status in subjob.exit_statuses) for subjob in job.subjobs.values())
        total_subjobs = len(job.subjobs)
        B2INFO(f"{num_completed}/{total_subjobs} Collector SubJobs finished in"
               f" Calibration {self.calibration.name} Job {job.name}.")
    return all([job.ready() for job in self._collector_jobs.values()])
    B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")

    iteration_called = False
    for result in results:
        if result.result == CalibrationAlgorithm.c_Iterate:
            iteration_called = True
    return iteration_called
    B2INFO(f"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")

Condition function to check that the dependencies of our calibration are in the 'completed' state.
Technically we only need to check explicit dependencies.

    for calibration in self.calibration.dependencies:
        if not calibration.state == calibration.end_state:
            return False
    return True
Automatically try all transitions out of this state once. The 'fail' transition is tried last.

    for transition in possible_transitions:
        try:
            if transition != "fail":
                getattr(self, transition)()
                break
        except ConditionError:
            continue
    else:
        if "fail" in possible_transitions:
            getattr(self, "fail")()
        else:
            raise MachineError(f"Failed to automatically transition out of {self.state} state.")
Creates the overall root directory of the Calibration. Will not overwrite if it already exists.

    create_directories(self.root_dir, overwrite=False)
Creates a basf2 path for the correct collector and serializes it in the
self.output_dir/<calibration_name>/<iteration>/paths directory.

    create_directories(path_output_dir)
    path_file_name = collection.collector.name() + '.path'
    path_file_name = path_output_dir / path_file_name

    coll_path = create_path()
    coll_path.add_module(collection.collector)

    with open(path_file_name, 'bw') as serialized_path_file:
        pickle.dump(serialize_path(coll_path), serialized_path_file)

    return str(path_file_name.absolute())
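On the collector side the pickled path is loaded back and executed. A sketch of the reverse step, assuming the ``deserialize_path`` helper that pairs with ``serialize_path`` in ``basf2.pickle_path``, and with an illustrative file name:

    import pickle
    import basf2
    from basf2.pickle_path import deserialize_path   # assumed counterpart of serialize_path

    with open("CollectorModuleName.path", "rb") as serialized_path_file:
        coll_path = deserialize_path(pickle.load(serialized_path_file))
    basf2.process(coll_path)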
Creates a basf2 path for the collector's setup path (Collection.pre_collector_path) and serializes it in the
self.output_dir/<calibration_name>/<iteration>/<collector_output>/<name> directory.

    coll_path = collection.pre_collector_path
    path_file_name = 'pre_collector.path'
    path_file_name = os.path.join(path_output_dir, path_file_name)

    with open(path_file_name, 'bw') as serialized_path_file:
        pickle.dump(serialize_path(coll_path), serialized_path_file)

    return path_file_name
Creates a Job object for the collections of this iteration, ready for submission.

    for collection_name, collection in self.calibration.collections.items():
        job = Job('_'.join([self.calibration.name, collection_name, 'Iteration', str(self.iteration)]))
        job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
        job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)

        if job.output_dir.exists():
            B2INFO(f"Previous output directory for {self.calibration.name} collector {collection_name} exists. "
                   f"Deleting {job.output_dir} before re-submitting.")
            shutil.rmtree(job.output_dir)

        job.cmd = collection.job_cmd
        job.append_current_basf2_setup_cmds()
        job.input_sandbox_files.append(collection.job_script)
        collector_path_file = Path(self._make_collector_path(collection_name, collection))
        job.input_sandbox_files.append(collector_path_file)
        if collection.pre_collector_path:
            job.input_sandbox_files.append(pre_collector_path_file)

        list_dependent_databases = []
        for dependency in self.calibration.dependencies:
            database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
            B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))

        previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
        database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
        list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
        B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")

        for database in collection.database_chain:
            if database.db_type == 'local':
                json_db_chain.append(('local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
            elif database.db_type == 'central':
                json_db_chain.append(('central', database.global_tag))
            else:
                raise ValueError(f"Unknown database type {database.db_type}.")
        for database in list_dependent_databases:
            json_db_chain.append(('local', database))
        job_config['database_chain'] = json_db_chain

        job_config_file_path = input_data_directory.joinpath('collector_config.json').absolute()
        with open(job_config_file_path, 'w') as job_config_file:
            json.dump(job_config, job_config_file, indent=2)
        job.input_sandbox_files.append(job_config_file_path)

        input_data_files = set(collection.input_files)
            collection.files_to_iovs,

        files_to_ignore = set()
        for exprun in self.calibration.ignored_runs:
            for input_file in input_data_files:
                file_iov = self.calibration.files_to_iovs[input_file]
                if file_iov == exprun.make_iov():
                    B2INFO(f"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
                           f"Therefore the input file '{input_file}' from Collection '{collection_name}' "
                           "is being removed from the input files list.")
                    files_to_ignore.add(input_file)
        input_data_files.difference_update(files_to_ignore)

        if not input_data_files:
            raise MachineError(f"No valid input files for Calibration '{self.calibration.name}' "
                               f"and Collection '{collection_name}'.")
        job.input_files = list(input_data_files)

        job.splitter = collection.splitter
        job.backend_args = collection.backend_args
        job.output_patterns = collection.output_patterns
        B2DEBUG(20, f"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
def _check_valid_collector_output(self):
    B2INFO("Checking that Collector output exists for all collector jobs "
           f"using {self.calibration.name}.output_patterns.")

    B2INFO("We're restarting so we'll recreate the collector Job object.")

    for pattern in job.output_patterns:
        output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
    if not output_files:
        raise MachineError("No output files from Collector Job")

    for subjob in job.subjobs.values():
        for pattern in subjob.output_patterns:
            output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
        if not output_files:
            raise MachineError(f"No output files from Collector {subjob}")
Runs the Calibration Algorithms for this calibration machine.

Will run them sequentially locally (possible benefits to using a
processing pool for low memory algorithms later on.)

    algs_runner.algorithms = self.calibration.algorithms

    output_database_dir = algorithm_output_dir.joinpath("outputdb")

    if algorithm_output_dir.exists():
        B2INFO(f"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
               f"Deleting and recreating {algorithm_output_dir}.")
    create_directories(algorithm_output_dir)
    B2INFO(f"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
    algs_runner.output_database_dir = output_database_dir

    for subjob in job.subjobs.values():
        for pattern in subjob.output_patterns:
            input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))

    for pattern in job.output_patterns:
        input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))

    algs_runner.input_files = input_files

    algs_runner.database_chain = self.calibration.database_chain
    list_dependent_databases = []
    for dependency in self.calibration.dependencies:
        database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
        B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
        list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))

    previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
    database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
    list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
    B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
    algs_runner.dependent_databases = list_dependent_databases

    algs_runner.ignored_runs = self.calibration.ignored_runs

    except Exception as err:
Take the last iteration's outputdb and copy it to a more easily findable place.

    final_database_location = self.root_dir.joinpath('outputdb')
    if final_database_location.exists():
        B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
        shutil.rmtree(final_database_location)
    shutil.copytree(database_location, final_database_location)
A state machine to handle the logic of running the algorithm on the overall runs contained in the data.

required_attrs = [
    "algorithm",
    "dependent_databases",
    "output_database_dir",
    ...
]

required_true_attrs = [
    "algorithm",
    "output_database_dir",
    ...
]
def __init__(self, algorithm=None, initial_state="init"):
    Takes an Algorithm object from the caf framework and defines the transitions.

    State("running_algorithm"),

    self.add_transition("setup_algorithm", "init", "ready", ...)
    self.add_transition("execute_runs", "ready", "running_algorithm", ...)
    self.add_transition("complete", "running_algorithm", "completed")
    self.add_transition("fail", "running_algorithm", "failed")
    self.add_transition("setup_algorithm", "completed", "ready")
    self.add_transition("setup_algorithm", "failed", "ready")
params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.

    for attribute_name, value in params.items():
        setattr(self, attribute_name, value)
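A usage sketch: the attribute names come from ``required_attrs``, while the ``algorithm``/``calibration`` variables and the directory names are placeholders of the example:

    machine = AlgorithmMachine(algorithm)
    machine.setup_from_dict({
        "database_chain": calibration.database_chain,
        "dependent_databases": [],
        "output_dir": "algorithm_output",                 # placeholder paths
        "output_database_dir": "algorithm_output/outputdb",
        "input_files": ["CollectorOutput.root"],
    })
    # These attributes are then validated by the setup check documented just below.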
bool: Whether or not this machine has been set up correctly with all its necessary attributes.

    B2INFO("Checking validity of current setup of AlgorithmMachine for {}.".format(self.algorithm.name))
    if not hasattr(self, attribute_name):
        B2ERROR(f"AlgorithmMachine attribute {attribute_name} doesn't exist.")
    if not getattr(self, attribute_name):
        B2ERROR(f"AlgorithmMachine attribute {attribute_name} returned False.")

Create working/output directory of algorithm. Any old directory is overwritten.

    create_directories(Path(self.output_dir), overwrite=True)
Apply all databases in the correct order.

    b2conditions.reset()
    b2conditions.override_globaltags()

    for database in self.database_chain:
        if database.db_type == 'local':
            B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
                   f"for {self.algorithm.name}.")
            b2conditions.prepend_testing_payloads(database.filepath.as_posix())
        elif database.db_type == 'central':
            B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
                   f"for {self.algorithm.name}.")
            b2conditions.prepend_globaltag(database.global_tag)
        else:
            raise ValueError(f"Unknown database type {database.db_type}.")

    B2INFO(f"Adding Local Database {filename} to head of chain of local databases created by"
           f" a dependent calibration, for {self.algorithm.name}.")
    b2conditions.prepend_testing_payloads(filename)

    B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
    b2conditions.expert_settings(save_payloads=str(self.output_database_dir.joinpath("database.txt")))
    log_file = os.path.join(self.output_dir, self.algorithm.name + '_stdout')
    B2INFO(f"Output log file at {log_file}.")
    basf2.set_log_level(basf2.LogLevel.INFO)
    basf2.log_to_file(log_file)

    B2INFO(f"Changing current working directory to {self.output_dir}.")
Call the user defined algorithm setup function.

    B2INFO("Running Pre-Algorithm function (if exists)")
    if self.algorithm.pre_algorithm:
        self.algorithm.pre_algorithm(self.algorithm.algorithm, kwargs["iteration"])
Does the actual execution of the algorithm on an IoV and records the result.

    B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")
    runs_to_execute = kwargs["runs"]
    iov = kwargs["apply_iov"]
    iteration = kwargs["iteration"]
    if not iov:
        iov = iov_from_runs(runs_to_execute)
    B2INFO(f"Execution will use {iov} for labelling payloads by default.")
    alg_result = self.algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
    self.result = IoV_Result(iov, alg_result)
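The recorded ``IoV_Result`` pairs the IoV with the return code of the C++ algorithm. A sketch of how such a result is typically inspected, with ``machine`` standing for an AlgorithmMachine instance:

    from ROOT.Belle2 import CalibrationAlgorithm

    result = machine.result          # IoV_Result(iov=<IoV>, result=<CalibrationAlgorithm return code>)
    if result.result == CalibrationAlgorithm.c_OK:
        print(f"Payloads for {result.iov} are good.")
    elif result.result == CalibrationAlgorithm.c_Iterate:
        print("Algorithm requested another iteration.")   # picked up by _require_iteration()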
def _set_input_data(self, **kwargs):

Base exception class for this module.

class ConditionError(MachineError):
    Exception for when conditions fail during a transition.

class TransitionError(MachineError):
    Exception for when transitions fail.
def _setup_database_chain(self, **kwargs)
list required_true_attrs
Attributes that must have a value that returns True when tested.
output_database_dir
The output database directory for the localdb that the algorithm will commit to.
input_files
Collector output files, will contain all files returned by the output patterns.
default_states
Default states for the AlgorithmMachine.
def _change_working_dir(self, **kwargs)
def __init__(self, algorithm=None, initial_state="init")
list required_attrs
Required attributes that must exist before the machine can run properly.
algorithm
Algorithm() object whose state we are modelling.
database_chain
Assigned database chain to the overall Calibration object, or to the 'default' Collection.
result
IoV_Result object for a single execution, will be reset upon a new execution.
def setup_from_dict(self, params)
def _set_input_data(self, **kwargs)
dependent_databases
CAF created local databases from previous calibrations that this calibration/algorithm depends on.
def _setup_logging(self, **kwargs)
def _execute_over_iov(self, **kwargs)
def _pre_algorithm(self, **kwargs)
output_dir
The algorithm output directory which is mostly used to store the stdout file.
def _create_output_dir(self, **kwargs)
def _build_iov_dicts(self)
def _log_new_state(self, **kwargs)
def _resolve_file_paths(self)
root_dir
root directory for this Calibration
def _recover_collector_jobs(self)
def _runner_not_failed(self)
def _update_cal_state(self, **kwargs)
def _create_collector_jobs(self)
_runner_final_state
Final state of the algorithm runner for the current iteration.
def _dump_job_config(self)
def _below_max_iterations(self)
default_states
States that are defaults to the CalibrationMachine (could override later)
def _run_algorithms(self)
_algorithm_results
Results of each iteration for all algorithms of this calibration.
def automatic_transition(self)
def _collection_failed(self)
iov_to_calibrate
IoV to be executed, currently will loop over all runs in IoV.
def files_containing_iov(self, file_paths, files_to_iovs, iov)
def _require_iteration(self)
iteration
Allows calibration object to hold a reference to the machine controlling it.
def _make_collector_path(self, name, collection)
def _make_pre_collector_path(self, name, collection)
_collector_timing
Times of various useful updates to the collector job e.g.
def _no_require_iteration(self)
collector_backend
Backend used for this calibration machine collector.
def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0)
def _make_output_dir(self)
def _collector_jobs_ready(self)
def _increment_iteration(self)
def dependencies_completed(self)
string collector_input_dir
_collector_jobs
The collector jobs used for submission.
calibration
Calibration object whose state we are modelling.
def _submit_collections(self)
string collector_output_dir
def _collection_completed(self)
def _prepare_final_db(self)
def _check_valid_collector_output(self)
def add_state(self, state, enter=None, exit=None)
def get_transitions(self, source)
def _callback(func, **kwargs)
transitions
Allowed transitions between states.
def save_graph(self, filename, graphname)
def get_transition_dict(self, state, transition)
def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None)
def default_condition(**kwargs)
def _trigger(self, transition_name, transition_dict, **kwargs)
def __init__(self, states=None, initial_state="default_initial")
states
Valid states for this machine.
initial_state
Pointless docstring since it's a property.
def initial_state(self, state)
_initial_state
Actual attribute holding initial state for this machine.
def __getattr__(self, name, **kwargs)
state
Current State of machine.
_state
Actual attribute holding the Current state.
def __init__(self, name, enter=None, exit=None)
def on_exit(self, callbacks)
def on_enter(self, callbacks)
def _add_callbacks(self, callback, attribute)
on_enter
Callback list when entering state.
on_exit
Callback list when exiting state.
def _(self, callbacks, attribute)