6 from functools
import partial
7 from collections
import defaultdict
13 from pathlib
import Path
17 from basf2
import create_path
18 from basf2
import B2DEBUG, B2ERROR, B2INFO, B2WARNING
19 from basf2
import conditions
as b2conditions
22 from ROOT.Belle2
import CalibrationAlgorithm
24 from caf.utils
import create_directories
25 from caf.utils
import method_dispatch
26 from caf.utils
import iov_from_runs
27 from caf.utils
import IoV_Result
28 from caf.utils
import get_iov_from_file
29 from caf.backends
import Job
30 from caf.backends
import LSF
31 from caf.backends
import PBS
32 from caf.backends
import Local
33 from caf.runners
import AlgorithmsRunner
38 Basic State object that can take enter and exit state methods and records
39 the state of a machine.
41 You should assign the self.on_enter or self.on_exit attributes to callback functions
42 or lists of them, if you need them.
45 def __init__(self, name, enter=None, exit=None):
47 Initialise State with a name and optional lists of callbacks.
59 Runs callbacks when a state is entered.
74 Runs callbacks when a state is exited.
89 Adds callback to a property.
91 if callable(callback):
92 attribute.append(callback)
94 B2ERROR(f
"Something other than a function (callable) passed into State {self.name}.")
96 @_add_callbacks.register(tuple)
97 @_add_callbacks.register(list)
98 def _(self, callbacks, attribute):
100 Alternate method for lists and tuples of function objects.
103 for function
in callbacks:
104 if callable(function):
105 attribute.append(function)
107 B2ERROR(f
"Something other than a function (callable) passed into State {self.name}.")
117 return f
"State(name={self.name})"
122 if isinstance(other, str):
123 return self.
name == other
125 return self.
name == other.name
130 return hash(self.
name)
136 states (list[str]): A list of possible states of the machine.
139 Base class for a finite state machine wrapper.
140 Implements the framework that a more complex machine can inherit from.
142 The `transitions` attribute is a dictionary of trigger name keys, each value of
143 which is another dictionary of 'source' states, 'dest' states, and 'conditions'
144 methods. 'conditions' should be a list of callables or a single one. A transition is
145 valid if it goes from an allowed state to an allowed state.
146 Conditions are optional but must be a callable that returns True or False based
147 on some state of the machine. They cannot have input arguments currently.
149 Every condition/before/after callback function MUST take ``**kwargs`` as the only
150 argument (except ``self`` if it's a class method). This is because it's basically
151 impossible to determine which arguments to pass to which functions for a transition.
152 Therefore this machine just enforces that every function should simply take ``**kwargs``
153 and use the dictionary of arguments (even if it doesn't need any arguments).
155 This also means that if you call a trigger with arguments e.g. ``machine.walk(speed=5)``
156 you MUST use the keyword arguments rather than positional ones. So ``machine.walk(5)``
160 def __init__(self, states=None, initial_state="default_initial"):
162 Basic Setup of states and initial_state.
169 if initial_state !=
"default_initial":
184 Adds a single state to the list of possible ones.
185 Should be a unique string or a State object with a unique name.
187 if isinstance(state, str):
189 elif isinstance(state, State):
190 if state.name
not in self.
states.keys():
191 self.
states[state.name] = state
193 B2WARNING(f
"You asked to add a state {state} but it was already in the machine states.")
195 B2WARNING(f
"You asked to add a state {state} but it wasn't a State or str object")
200 The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
204 @initial_state.setter
208 if state
in self.
states.keys():
213 raise KeyError(f
"Attempted to set state to '{state}' which is not in the 'states' attribute!")
218 The current state of the machine. Actually a `property` decorator. It will call the exit method of the
219 current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
220 either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
228 if isinstance(state, str):
231 state_name = state.name
234 state = self.
states[state_name]
236 for callback
in self.
state.on_exit:
237 callback(prior_state=self.
state, new_state=state)
239 for callback
in state.on_enter:
240 callback(prior_state=self.
state, new_state=state)
244 raise MachineError(f
"Attempted to set state to '{state}' which not in the 'states' attribute!")
249 Method to always return True.
253 def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
255 Adds a single transition to the dictionary of possible ones.
256 Trigger is the method name that begins the transition between the
257 source state and the destination state.
259 The condition is an optional function that returns True or False
260 depending on the current state/input.
264 source = self.
states[source]
266 transition_dict[
"source"] = source
267 transition_dict[
"dest"] = dest
268 except KeyError
as err:
269 B2WARNING(
"Tried to add a transition where the source or dest isn't in the list of states")
272 if isinstance(conditions, (list, tuple, set)):
273 transition_dict[
"conditions"] = list(conditions)
275 transition_dict[
"conditions"] = [conditions]
277 transition_dict[
"conditions"] = [Machine.default_condition]
281 if isinstance(before, (list, tuple, set)):
282 transition_dict[
"before"] = list(before)
284 transition_dict[
"before"] = [before]
288 if isinstance(after, (list, tuple, set)):
289 transition_dict[
"after"] = list(after)
291 transition_dict[
"after"] = [after]
297 Allows us to create a new method for each trigger on the fly.
298 If there is no trigger name in the machine to match, then the normal
299 AttributeError is raised.
302 if name
not in possible_transitions:
303 raise AttributeError(f
"{name} does not exist in transitions for state {self.state}.")
305 return partial(self.
_trigger, name, transition_dict, **kwargs)
307 def _trigger(self, transition_name, transition_dict, **kwargs):
309 Runs the transition logic. Callbacks are evaluated in the order:
310 conditions -> before -> <new state set here> -> after.
312 source, dest, conditions, before_callbacks, after_callbacks = (transition_dict[
"source"],
313 transition_dict[
"dest"],
314 transition_dict[
"conditions"],
315 transition_dict[
"before"],
316 transition_dict[
"after"])
318 if all(map(
lambda condition: self.
_callback(condition, **kwargs), conditions)):
319 for before_func
in before_callbacks:
323 for after_func
in after_callbacks:
326 raise ConditionError((f
"Transition '{transition_name}' called for but one or more conditions "
332 Calls a condition/before/after.. function using arguments passed (or not).
334 return func(**kwargs)
338 Returns allowed transitions from a given state.
340 possible_transitions = []
341 for transition, transition_dicts
in self.
transitions.items():
342 for transition_dict
in transition_dicts:
343 if transition_dict[
"source"] == source:
344 possible_transitions.append(transition)
345 return possible_transitions
349 Returns the transition dictionary for a state and transition out of it.
352 for transition_dict
in transition_dicts:
353 if transition_dict[
"source"] == state:
354 return transition_dict
356 raise KeyError(f
"No transition from state {state} with the name {transition}.")
360 Does a simple dot file creation to visualise states and transitions.
362 with open(filename,
"w")
as dotfile:
363 dotfile.write(
"digraph " + graphname +
" {\n")
364 for state
in self.
states.keys():
365 dotfile.write(
'"' + state +
'" [shape=ellipse, color=black]\n')
366 for trigger, transition_dicts
in self.
transitions.items():
367 for transition
in transition_dicts:
368 dotfile.write(
'"' + transition[
"source"].name +
'" -> "' +
369 transition[
"dest"].name +
'" [label="' + trigger +
'"]\n')
375 A state machine to handle `Calibration` objects and the flow of
379 collector_input_dir =
'collector_input'
380 collector_output_dir =
'collector_output'
381 algorithm_output_dir =
'algorithm_output'
383 def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
385 Takes a Calibration object from the caf framework and lets you
386 set the initial state.
427 self.
root_dir = Path(os.getcwd(), calibration.name)
437 self.
add_transition(
"submit_collector",
"init",
"running_collector",
445 self.
add_transition(
"fail",
"running_collector",
"collector_failed",
447 self.
add_transition(
"complete",
"running_collector",
"collector_completed",
449 self.
add_transition(
"run_algorithms",
"collector_completed",
"running_algorithms",
453 self.
add_transition(
"complete",
"running_algorithms",
"algorithms_completed",
456 self.
add_transition(
"fail",
"running_algorithms",
"algorithms_failed",
462 self.
add_transition(
"finish",
"algorithms_completed",
"completed",
468 def _update_cal_state(self, **kwargs):
473 Lookup function that returns all files from the file_paths that
474 overlap with this IoV.
477 overlapping_files = set()
479 for file_path, file_iov
in files_to_iovs.items():
480 if file_iov.overlaps(iov)
and (file_path
in file_paths):
481 overlapping_files.add(file_path)
482 return overlapping_files
486 Dumps the `Job` object for the collections to JSON files so that its configuration can be recovered
487 later in case of failure.
491 while any(map(
lambda j: j.status ==
"init", self.
_collector_jobs.values())):
492 B2DEBUG(29,
"Some Collector Jobs still in 'init' state. Waiting...")
496 collector_job_output_file_name = self.
calibration.collections[collection_name].job_config
498 collection_name, collector_job_output_file_name)
499 job.dump_to_json(output_file)
503 Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.
505 for collection_name, collection
in self.
calibration.collections.items():
509 collection.job_config)
516 B2DEBUG(20, f
"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
519 B2DEBUG(20, f
"No overall IoV requested for calibration: {self.calibration.name}.")
529 Build IoV file dictionary for each collection if required.
531 iov_requested = self._iov_requested()
532 if iov_requested
or self.calibration.ignored_runs:
533 for coll_name, collection
in self.calibration.collections.items():
534 if not collection.files_to_iovs:
535 B2INFO(
"Creating IoV dictionaries to map files to (Exp,Run) ranges for"
536 f
" Calibration '{self.calibration.name} and Collection '{coll_name}'."
537 " Filling dictionary from input file metadata."
538 " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
541 for file_path
in collection.input_files:
542 files_to_iovs[file_path] = get_iov_from_file(file_path)
543 collection.files_to_iovs = files_to_iovs
545 B2INFO(
"Using File to IoV mapping from 'files_to_iovs' attribute for "
546 f
"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
548 B2INFO(
"No File to IoV mapping required.")
563 Did all the collections succeed?
565 B2DEBUG(29,
"Checking for failed collector job.")
567 return all([job.status ==
"completed" for job
in self.
_collector_jobs.values()])
571 Did any of the collections fail?
573 B2DEBUG(29,
"Checking for failed collector job.")
575 return any([job.status ==
"failed" for job
in self.
_collector_jobs.values()])
580 bool: If AlgorithmsRunner succeeded return True.
587 bool: If AlgorithmsRunner failed return True.
598 if since_last_update > self.
calibration.collector_full_update_interval:
599 B2INFO(
"Updating full collector job statuses.")
604 num_completed = sum((subjob.status
in subjob.exit_statuses)
for subjob
in job.subjobs.values())
605 total_subjobs = len(job.subjobs)
606 B2INFO(f
"{num_completed}/{total_subjobs} Collector SubJobs finished in"
607 f
" Calibration {self.calibration.name} Job {job.name}.")
623 B2INFO(f
"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
631 iteration_called =
False
633 for result
in results:
634 if result.result == CalibrationAlgorithm.c_Iterate:
635 iteration_called =
True
639 return iteration_called
644 B2INFO(f
"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")
648 Condition function to check that the dependencies of our calibration are in the 'completed' state.
649 Technically only need to check explicit dependencies.
652 if not calibration.state == calibration.end_state:
659 Automatically try all transitions out of this state once. Tries fail last.
662 for transition
in possible_transitions:
664 if transition !=
"fail":
665 getattr(self, transition)()
667 except ConditionError:
670 if "fail" in possible_transitions:
671 getattr(self,
"fail")()
673 raise MachineError(f
"Failed to automatically transition out of {self.state} state.")
677 Creates the overall root directory of the Calibration. Will not overwrite if it already exists.
680 create_directories(self.
root_dir, overwrite=
False)
684 Creates a basf2 path for the correct collector and serializes it in the
685 self.output_dir/<calibration_name>/<iteration>/paths directory
689 create_directories(path_output_dir)
690 path_file_name = collection.collector.name() +
'.path'
691 path_file_name = path_output_dir / path_file_name
693 coll_path = create_path()
694 coll_path.add_module(collection.collector)
696 with open(path_file_name,
'bw')
as serialized_path_file:
697 pickle.dump(serialize_path(coll_path), serialized_path_file)
699 return str(path_file_name.absolute())
703 Creates a basf2 path for the collectors setup path (Collection.pre_collector_path) and serializes it in the
704 self.output_dir/<calibration_name>/<iteration>/<collector_output>/<name> directory.
707 coll_path = collection.pre_collector_path
708 path_file_name =
'pre_collector.path'
709 path_file_name = os.path.join(path_output_dir, path_file_name)
711 with open(path_file_name,
'bw')
as serialized_path_file:
712 pickle.dump(serialize_path(coll_path), serialized_path_file)
714 return path_file_name
718 Creates a Job object for the collections of this iteration, ready for submission
721 for collection_name, collection
in self.
calibration.collections.items():
727 if job.output_dir.exists():
728 B2INFO(f
"Previous output directory for {self.calibration.name} collector {collection_name} exists."
729 f
"Deleting {job.output_dir} before re-submitting.")
730 shutil.rmtree(job.output_dir)
731 job.cmd = collection.job_cmd
732 job.append_current_basf2_setup_cmds()
733 job.input_sandbox_files.append(collection.job_script)
735 job.input_sandbox_files.append(collector_path_file)
736 if collection.pre_collector_path:
738 job.input_sandbox_files.append(pre_collector_path_file)
741 list_dependent_databases = []
746 database_dir = os.path.join(os.getcwd(), dependency.name,
'outputdb')
747 B2INFO(f
"Adding local database from {dependency.name} for use by {self.calibration.name}.")
748 list_dependent_databases.append((os.path.join(database_dir,
'database.txt'), database_dir))
753 database_dir = os.path.join(previous_iteration_dir, self.
calibration.alg_output_dir,
'outputdb')
754 list_dependent_databases.append((os.path.join(database_dir,
'database.txt'), database_dir))
755 B2INFO(f
"Adding local database from previous iteration of {self.calibration.name}.")
766 for database
in collection.database_chain:
767 if database.db_type ==
'local':
768 json_db_chain.append((
'local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
769 elif database.db_type ==
'central':
770 json_db_chain.append((
'central', database.global_tag))
772 raise ValueError(f
"Unknown database type {database.db_type}.")
774 for database
in list_dependent_databases:
775 json_db_chain.append((
'local', database))
776 job_config[
'database_chain'] = json_db_chain
778 job_config_file_path = input_data_directory.joinpath(
'collector_config.json').absolute()
779 with open(job_config_file_path,
'w')
as job_config_file:
780 json.dump(job_config, job_config_file, indent=2)
781 job.input_sandbox_files.append(job_config_file_path)
784 input_data_files = set(collection.input_files)
788 collection.files_to_iovs,
791 files_to_ignore = set()
793 for input_file
in input_data_files:
794 file_iov = self.
calibration.files_to_iovs[input_file]
795 if file_iov == exprun.make_iov():
796 B2INFO(f
"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
797 f
"Therefore the input file '{input_file}' from Collection '{collection_name}' "
798 "is being removed from input files list.")
799 files_to_ignore.add(input_file)
800 input_data_files.difference_update(files_to_ignore)
802 if not input_data_files:
803 raise MachineError(f
"No valid input files for Calibration '{self.calibration.name}' "
804 f
" and Collection '{collection_name}'.")
805 job.input_files = list(input_data_files)
807 job.splitter = collection.splitter
808 job.backend_args = collection.backend_args
810 job.output_patterns = collection.output_patterns
811 B2DEBUG(20, f
"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
814 def _check_valid_collector_output(self):
815 B2INFO(
"Checking that Collector output exists for all colector jobs "
816 f
"using {self.calibration.name}.output_patterns.")
818 B2INFO(
"We're restarting so we'll recreate the collector Job object.")
824 for pattern
in job.output_patterns:
825 output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
827 raise MachineError(
"No output files from Collector Job")
829 for subjob
in job.subjobs.values():
831 for pattern
in subjob.output_patterns:
832 output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
834 raise MachineError(f
"No output files from Collector {subjob}")
838 Runs the Calibration Algorithms for this calibration machine.
840 Will run them sequentially locally (possible benefits to using a
841 processing pool for low memory algorithms later on.)
845 algs_runner.algorithms = self.
calibration.algorithms
847 output_database_dir = algorithm_output_dir.joinpath(
"outputdb")
849 if algorithm_output_dir.exists():
850 B2INFO(f
"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
851 f
"Deleting and recreating {algorithm_output_dir}.")
852 create_directories(algorithm_output_dir)
853 B2INFO(f
"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
854 algs_runner.output_database_dir = output_database_dir
860 for subjob
in job.subjobs.values():
861 for pattern
in subjob.output_patterns:
862 input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
864 for pattern
in job.output_patterns:
865 input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
867 algs_runner.input_files = input_files
870 algs_runner.database_chain = self.
calibration.database_chain
874 list_dependent_databases = []
876 database_dir = os.path.join(os.getcwd(), dependency.name,
'outputdb')
877 B2INFO(f
"Adding local database from {dependency.name} for use by {self.calibration.name}.")
878 list_dependent_databases.append((os.path.join(database_dir,
'database.txt'), database_dir))
883 database_dir = os.path.join(previous_iteration_dir, self.
calibration.alg_output_dir,
'outputdb')
884 list_dependent_databases.append((os.path.join(database_dir,
'database.txt'), database_dir))
885 B2INFO(f
"Adding local database from previous iteration of {self.calibration.name}.")
886 algs_runner.dependent_databases = list_dependent_databases
888 algs_runner.ignored_runs = self.
calibration.ignored_runs
892 except Exception
as err:
902 Take the last iteration's outputdb and copy it to a more easily findable place.
907 final_database_location = self.
root_dir.joinpath(
'outputdb')
908 if final_database_location.exists():
909 B2INFO(f
"Removing previous final output database for {self.calibration.name} before copying new one.")
910 shutil.rmtree(final_database_location)
911 shutil.copytree(database_location, final_database_location)
916 A state machine to handle the logic of running the algorithm on the overall runs contained in the data.
921 required_attrs = [
"algorithm",
922 "dependent_databases",
925 "output_database_dir",
930 required_true_attrs = [
"algorithm",
932 "output_database_dir",
936 def __init__(self, algorithm=None, initial_state="init"):
938 Takes an Algorithm object from the caf framework and defines the transitions.
943 State(
"running_algorithm"),
982 params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.
984 for attribute_name, value
in params.items():
985 setattr(self, attribute_name, value)
990 bool: Whether or not this machine has been set up correctly with all its necessary attributes.
992 B2INFO(
"Checking validity of current setup of AlgorithmMachine for {}.".format(self.
algorithm.name))
995 if not hasattr(self, attribute_name):
996 B2ERROR(f
"AlgorithmMachine attribute {attribute_name} doesn't exist.")
1000 if not getattr(self, attribute_name):
1001 B2ERROR(f
"AlgorithmMachine attribute {attribute_name} returned False.")
1007 Create working/output directory of algorithm. Any old directory is overwritten.
1009 create_directories(Path(self.
output_dir), overwrite=
True)
1013 Apply all databases in the correct order.
1017 b2conditions.reset()
1018 b2conditions.override_globaltags()
1022 if database.db_type ==
'local':
1023 B2INFO(f
"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
1024 f
"for {self.algorithm.name}.")
1025 b2conditions.prepend_testing_payloads(database.filepath.as_posix())
1026 elif database.db_type ==
'central':
1027 B2INFO(f
"Adding Central database tag {database.global_tag} to head of GT chain, "
1028 f
"for {self.algorithm.name}.")
1029 b2conditions.prepend_globaltag(database.global_tag)
1031 raise ValueError(f
"Unknown database type {database.db_type}.")
1036 B2INFO((f
"Adding Local Database {filename} to head of chain of local databases created by"
1037 f
" a dependent calibration, for {self.algorithm.name}."))
1038 b2conditions.prepend_testing_payloads(filename)
1044 B2INFO(f
"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
1047 b2conditions.expert_settings(save_payloads=str(self.
output_database_dir.joinpath(
"database.txt")))
1054 B2INFO(f
"Output log file at {log_file}.")
1056 basf2.set_log_level(basf2.LogLevel.INFO)
1057 basf2.log_to_file(log_file)
1062 B2INFO(f
"Changing current working directory to {self.output_dir}.")
1067 Call the user defined algorithm setup function.
1069 B2INFO(
"Running Pre-Algorithm function (if exists)")
1077 Does the actual execution of the algorithm on an IoV and records the result.
1079 B2INFO(f
"Running {self.algorithm.name} in working directory {os.getcwd()}.")
1081 runs_to_execute = kwargs[
"runs"]
1082 iov = kwargs[
"apply_iov"]
1083 iteration = kwargs[
"iteration"]
1085 iov = iov_from_runs(runs_to_execute)
1086 B2INFO(f
"Execution will use {iov} for labelling payloads by default.")
1087 alg_result = self.
algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
1088 self.
result = IoV_Result(iov, alg_result)
1090 def _set_input_data(self, **kwargs):
1096 Base exception class for this module.
1100 class ConditionError(MachineError):
1102 Exception for when conditions fail during a transition.
1108 Exception for when transitions fail.