16from functools
import partial
17from collections
import defaultdict
23from pathlib
import Path
27from basf2
import create_path
28from basf2
import B2DEBUG, B2ERROR, B2INFO, B2WARNING
29from basf2
import conditions
as b2conditions
32from ROOT
import Belle2
33from ROOT.Belle2
import CalibrationAlgorithm
35from caf.utils
import create_directories
36from caf.utils
import method_dispatch
37from caf.utils
import iov_from_runs
38from caf.utils
import IoV_Result
39from caf.utils
import get_iov_from_file
40from caf.backends
import Job
41from caf.runners
import AlgorithmsRunner
46 Basic State object that can take enter and exit state methods and records
47 the state of a machine.
49 You should assign the self.on_enter or self.on_exit attributes to callback functions
50 or lists of them, if you need them.
53 def __init__(self, name, enter=None, exit=None):
55 Initialise State with a name and optional lists of callbacks.
@property
def on_enter(self):
    """
    Runs callbacks when a state is entered.
    """
    return self._on_enter

@on_enter.setter
def on_enter(self, callbacks):
    """
    Assign a callable (or list of callables) to be run when this state is entered.
    """
    # NOTE(review): parts of this setter were elided in the available view;
    # reconstruction assumes the usual reset-then-register pattern — confirm.
    self._on_enter = []
    if callbacks:
        self._add_callbacks(callbacks, self._on_enter)
@property
def on_exit(self):
    """
    Runs callbacks when a state is exited.
    """
    return self._on_exit

@on_exit.setter
def on_exit(self, callbacks):
    """
    Assign a callable (or list of callables) to be run when this state is exited.
    """
    # NOTE(review): parts of this setter were elided in the available view;
    # reconstruction assumes the usual reset-then-register pattern — confirm.
    self._on_exit = []
    if callbacks:
        self._add_callbacks(callbacks, self._on_exit)
@method_dispatch
def _add_callbacks(self, callback, attribute):
    """
    Adds callback to a property.

    Base implementation of the dispatch: handles a single callable.
    The list/tuple variants are registered separately.
    """
    if callable(callback):
        attribute.append(callback)
    else:
        B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")
@_add_callbacks.register(tuple)
@_add_callbacks.register(list)
def _(self, callbacks, attribute):
    """
    Alternate method for lists and tuples of function objects.
    """
    # Register each entry individually; non-callables are reported, not raised.
    for function in callbacks:
        if callable(function):
            attribute.append(function)
        else:
            B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")
def __repr__(self):
    """
    Debug-friendly representation showing the state name.
    """
    return f"State(name={self.name})"

def __eq__(self, other):
    """
    States compare equal to plain strings carrying the same name,
    as well as to other State objects with the same name.
    """
    if isinstance(other, str):
        return self.name == other
    else:
        return self.name == other.name

def __hash__(self):
    """
    Hash on the name so States can live in sets/dict keys consistently with __eq__.
    """
    return hash(self.name)
144 states (list[str]): A list of possible states of the machine.
Base class for a finite state machine wrapper.
Implements the framework that a more complex machine can inherit from.
150 The `transitions` attribute is a dictionary of trigger name keys, each value of
151 which is another dictionary of 'source' states, 'dest' states, and 'conditions'
152 methods. 'conditions' should be a list of callables or a single one. A transition is
153 valid if it goes from an allowed state to an allowed state.
154 Conditions are optional but must be a callable that returns True or False based
155 on some state of the machine. They cannot have input arguments currently.
157 Every condition/before/after callback function MUST take ``**kwargs`` as the only
158 argument (except ``self`` if it's a class method). This is because it's basically
159 impossible to determine which arguments to pass to which functions for a transition.
160 Therefore this machine just enforces that every function should simply take ``**kwargs``
161 and use the dictionary of arguments (even if it doesn't need any arguments).
163 This also means that if you call a trigger with arguments e.g. ``machine.walk(speed=5)``
164 you MUST use the keyword arguments rather than positional ones. So ``machine.walk(5)``
def __init__(self, states=None, initial_state="default_initial"):
    """
    Basic Setup of states and initial_state.
    """
    # Map of state name -> State object for every valid state of this machine
    self.states = {}
    if states:
        for state in states:
            self.add_state(state)
    if initial_state != "default_initial":
        # Route through the property so validation applies
        self.initial_state = initial_state
    else:
        self.add_state(initial_state)
        # Actual attribute holding the initial state for this machine
        self._initial_state = State(initial_state)

    # Actual attribute holding the current state
    self._state = self.initial_state
    # Allowed transitions between states, keyed by trigger name
    self.transitions = defaultdict(list)
def add_state(self, state, enter=None, exit=None):
    """
    Adds a single state to the list of possible ones.
    Should be a unique string or a State object with a unique name.
    """
    if isinstance(state, str):
        # Wrap the bare name and re-enter with a real State object
        self.add_state(State(state, enter, exit))
    elif isinstance(state, State):
        if state.name not in self.states.keys():
            self.states[state.name] = state
        else:
            B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
    else:
        B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")
@property
def initial_state(self):
    """
    The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
    """
    return self._initial_state

@initial_state.setter
def initial_state(self, state):
    """
    Set the initial state by name, bypassing any enter/exit callbacks.
    """
    if state in self.states.keys():
        self._initial_state = self.states[state]
        # Deliberately set the private attribute: no callbacks fire here
        self._state = self.states[state]
    else:
        raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")
@property
def state(self):
    """
    The current state of the machine. Actually a `property` decorator. It will call the exit method of the
    current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
    either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
    """
    return self._state

@state.setter
def state(self, state):
    """
    Transition to a new state, running exit callbacks of the old state and enter callbacks of the new one.
    """
    # Accept either a state name or a State object
    if isinstance(state, str):
        state_name = state
    else:
        state_name = state.name

    try:
        state = self.states[state_name]
        # Run exit callbacks of current state
        for callback in self.state.on_exit:
            callback(prior_state=self.state, new_state=state)
        # Actually switch state between the exit and enter callbacks
        self._state = state
        # Run enter callbacks of new state
        for callback in state.on_enter:
            callback(prior_state=self.state, new_state=state)
    except KeyError:
        raise MachineError(f"Attempted to set state to '{state}' which not in the 'states' attribute!")
def default_condition(**kwargs):
    """
    Method to always return True.

    Used as the fallback condition when a transition declares none.
    """
    return True
def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
    """
    Adds a single transition to the dictionary of possible ones.
    Trigger is the method name that begins the transition between the
    source state and the destination state.

    The condition is an optional function that returns True or False
    depending on the current state/input.
    """
    transition_dict = {}
    try:
        source = self.states[source]
        dest = self.states[dest]
        transition_dict["source"] = source
        transition_dict["dest"] = dest
    except KeyError as err:
        B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
        raise err

    # Normalise conditions to a list; default to the always-True condition
    if conditions:
        if isinstance(conditions, (list, tuple, set)):
            transition_dict["conditions"] = list(conditions)
        else:
            transition_dict["conditions"] = [conditions]
    else:
        transition_dict["conditions"] = [Machine.default_condition]

    # Normalise before callbacks to a list
    if not before:
        before = []
    if isinstance(before, (list, tuple, set)):
        transition_dict["before"] = list(before)
    else:
        transition_dict["before"] = [before]

    # Normalise after callbacks to a list
    if not after:
        after = []
    if isinstance(after, (list, tuple, set)):
        transition_dict["after"] = list(after)
    else:
        transition_dict["after"] = [after]

    self.transitions[trigger].append(transition_dict)
def __getattr__(self, name, **kwargs):
    """
    Allows us to create a new method for each trigger on the fly.
    If there is no trigger name in the machine to match, then the normal
    AttributeError is called.
    """
    possible_transitions = self.get_transitions(self.state)
    if name not in possible_transitions:
        raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
    transition_dict = self.get_transition_dict(self.state, name)
    # Bind the trigger name and its transition dict so the returned callable
    # just needs the user's keyword arguments.
    return partial(self._trigger, name, transition_dict, **kwargs)
def _trigger(self, transition_name, transition_dict, **kwargs):
    """
    Runs the transition logic. Callbacks are evaluated in the order:
    conditions -> before -> <new state set here> -> after.
    """
    dest, conditions, before_callbacks, after_callbacks = (
        transition_dict["dest"],
        transition_dict["conditions"],
        transition_dict["before"],
        transition_dict["after"]
    )
    # Every condition must evaluate True for the transition to proceed
    if all(map(lambda condition: self._callback(condition, **kwargs), conditions)):
        for before_func in before_callbacks:
            self._callback(before_func, **kwargs)
        # Setting the state property runs the exit/enter callbacks of the states
        self.state = dest
        for after_func in after_callbacks:
            self._callback(after_func, **kwargs)
    else:
        # NOTE(review): the tail of this message was elided in the available view — confirm wording.
        raise ConditionError(f"Transition '{transition_name}' called for but one or more conditions "
                             "returned False.")
@staticmethod
def _callback(func, **kwargs):
    """
    Calls a condition/before/after.. function using arguments passed (or not).
    """
    return func(**kwargs)
def get_transitions(self, source):
    """
    Returns allowed transitions from a given state.

    Parameters:
        source: State (or state name, via State.__eq__) to look up.

    Returns:
        list: Trigger names usable from ``source`` (a trigger appears once
        per transition dict registered for it from this source).
    """
    # Flattened comprehension replaces the manual nested append loop.
    return [trigger
            for trigger, transition_dicts in self.transitions.items()
            for transition_dict in transition_dicts
            if transition_dict["source"] == source]
def get_transition_dict(self, state, transition):
    """
    Returns the transition dictionary for a state and transition out of it.

    Raises:
        KeyError: If no registered transition dict for ``transition`` starts at ``state``.
    """
    for candidate in self.transitions[transition]:
        if candidate["source"] == state:
            return candidate
    raise KeyError(f"No transition from state {state} with the name {transition}.")
def save_graph(self, filename, graphname):
    """
    Does a simple dot file creation to visualise states and transitions.

    Parameters:
        filename (str): Path of the .dot file to write.
        graphname (str): Name given to the digraph in the dot output.
    """
    # f-strings replace the original chained '+' concatenation (same output bytes);
    # docstring typo "transiitons" fixed.
    with open(filename, "w") as dotfile:
        dotfile.write(f"digraph {graphname} {{\n")
        # One ellipse node per known state
        for state in self.states.keys():
            dotfile.write(f'"{state}" [shape=ellipse, color=black]\n')
        # One labelled edge per registered transition
        for trigger, transition_dicts in self.transitions.items():
            for transition in transition_dicts:
                dotfile.write(f'"{transition["source"].name}" -> '
                              f'"{transition["dest"].name}" [label="{trigger}"]\n')
        dotfile.write("}\n")
class CalibrationMachine(Machine):
    """
    A state machine to handle `Calibration` objects and the flow of
    processing for them.
    """

    #: Subdirectory name for the collector input files
    collector_input_dir = 'collector_input'
    #: Subdirectory name for the collector output files
    collector_output_dir = 'collector_output'
    #: Subdirectory name for the algorithm output
    algorithm_output_dir = 'algorithm_output'
def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
    """
    Takes a Calibration object from the caf framework and lets you
    set the initial state.
    """
    # Every default state logs and pushes its name onto the Calibration object on entry
    state_callbacks = [self._update_cal_state, self._log_new_state]
    #: States that are defaults to the `CalibrationMachine`
    self.default_states = [State("init", enter=list(state_callbacks)),
                           State("running_collector", enter=list(state_callbacks)),
                           State("collector_failed", enter=list(state_callbacks)),
                           State("collector_completed", enter=list(state_callbacks)),
                           State("running_algorithms", enter=list(state_callbacks)),
                           State("algorithms_failed", enter=list(state_callbacks)),
                           State("algorithms_completed", enter=list(state_callbacks)),
                           State("completed", enter=list(state_callbacks)),
                           State("failed", enter=list(state_callbacks))
                           ]

    super().__init__(self.default_states, initial_state)

    #: Calibration object whose processing this machine models
    self.calibration = calibration
    # Let the calibration object hold a reference back to the machine controlling it
    self.calibration.machine = self
    #: Which iteration step we are in
    self.iteration = iteration
    #: Backend used for the collector submission
    self.collector_backend = None
    #: Results of each iteration for all algorithms of this calibration
    self._algorithm_results = {}
    #: Final state of the algorithm runner for the current iteration
    self._runner_final_state = None
    #: Optional IoV to be calibrated
    self.iov_to_calibrate = iov_to_calibrate
    #: Root directory for this Calibration's files
    self.root_dir = Path(os.getcwd(), calibration.name)
    #: Timing bookkeeping for collector job status updates (start/last_update)
    self._collector_timing = {}
    #: The collector jobs used for submission, keyed by collection name
    self._collector_jobs = {}

    self.add_transition("submit_collector", "init", "running_collector",
                        conditions=self.dependencies_completed,
                        before=[self._make_output_dir,
                                self._resolve_file_paths,
                                self._build_iov_dicts,
                                self._create_collector_jobs,
                                self._submit_collections,
                                self._dump_job_config])
    self.add_transition("fail", "running_collector", "collector_failed",
                        conditions=self._collection_failed)
    self.add_transition("complete", "running_collector", "collector_completed",
                        conditions=self._collection_completed)
    self.add_transition("run_algorithms", "collector_completed", "running_algorithms",
                        before=self._check_valid_collector_output,
                        after=[self._run_algorithms,
                               self.automatic_transition])
    self.add_transition("complete", "running_algorithms", "algorithms_completed",
                        after=self.automatic_transition,
                        conditions=self._runner_not_failed)
    self.add_transition("fail", "running_algorithms", "algorithms_failed",
                        conditions=self._runner_failed)
    self.add_transition("iterate", "algorithms_completed", "init",
                        conditions=[self._require_iteration,
                                    self._below_max_iterations],
                        after=self._increment_iteration)
    self.add_transition("finish", "algorithms_completed", "completed",
                        conditions=self._no_require_iteration,
                        before=self._prepare_final_db)
    self.add_transition("fail_fully", "algorithms_failed", "failed")
    self.add_transition("fail_fully", "collector_failed", "failed")
def _update_cal_state(self, **kwargs):
    """
    Record the newly-entered state's name on the Calibration object itself.
    """
    self.calibration.state = str(kwargs["new_state"])
def files_containing_iov(self, file_paths, files_to_iovs, iov):
    """
    Lookup function that returns all files from the file_paths that
    overlap with this IoV.

    Parameters:
        file_paths: Container of candidate file paths.
        files_to_iovs (dict): Mapping of file path -> IoV of that file.
        iov: The IoV to test for overlap against each file's IoV.

    Returns:
        set: File paths whose IoV overlaps ``iov``.
    """
    # Set comprehension replaces the manual loop+add; same membership logic.
    return {file_path for file_path, file_iov in files_to_iovs.items()
            if file_iov.overlaps(iov) and file_path in file_paths}
def _dump_job_config(self):
    """
    Dumps the `Job` object for the collections to JSON files so that its configuration can be recovered
    later in case of failure.
    """
    # Wait for the collector jobs to leave the 'init' state (i.e. be submitted properly)
    while any(j.status == "init" for j in self._collector_jobs.values()):
        B2DEBUG(29, "Some Collector Jobs still in 'init' state. Waiting...")
        # NOTE(review): sleep interval line was elided in the available view — confirm value.
        time.sleep(5)

    for collection_name, job in self._collector_jobs.items():
        collector_job_output_file_name = self.calibration.collections[collection_name].job_config
        output_file = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir,
                                             collection_name, collector_job_output_file_name)
        job.dump_to_json(output_file)
def _recover_collector_jobs(self):
    """
    Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.
    """
    for collection_name, collection in self.calibration.collections.items():
        output_file = self.root_dir.joinpath(str(self.iteration),
                                             self.collector_input_dir,
                                             collection_name,
                                             collection.job_config)
        self._collector_jobs[collection_name] = Job.from_json(output_file)
def _iov_requested(self):
    """
    Returns:
        bool: Whether an overall IoV was explicitly requested for this calibration.
    """
    if self.iov_to_calibrate:
        B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
        return True
    else:
        B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
        return False
def _resolve_file_paths(self):
    """
    Resolve input file paths before collector job creation.
    """
    # NOTE(review): body was elided in the available view — appears to be a
    # deliberate no-op hook; confirm against the original source.
    pass
def _build_iov_dicts(self):
    """
    Build IoV file dictionary for each collection if required.
    """
    iov_requested = self._iov_requested()
    # Only needed when we must select files by IoV or filter out ignored runs
    if iov_requested or self.calibration.ignored_runs:
        for coll_name, collection in self.calibration.collections.items():
            if not collection.files_to_iovs:
                B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
                       f" Calibration '{self.calibration.name} and Collection '{coll_name}'."
                       " Filling dictionary from input file metadata."
                       " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
                files_to_iovs = {}
                for file_path in collection.input_files:
                    files_to_iovs[file_path] = get_iov_from_file(file_path)
                collection.files_to_iovs = files_to_iovs
            else:
                B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
                       f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
    else:
        B2INFO("No File to IoV mapping required.")
def _below_max_iterations(self):
    """
    Returns:
        bool: True while the current iteration count is below the configured maximum.
    """
    return self.iteration < self.calibration.max_iterations
def _increment_iteration(self):
    """
    Advance the iteration counter and mirror it onto the Calibration object.
    """
    self.iteration += 1
    self.calibration.iteration = self.iteration
def _collection_completed(self):
    """
    Did all the collections succeed?

    Returns:
        bool: True if the jobs are ready and every collector job completed;
        implicitly None (falsy) while jobs are not ready yet.
    """
    # Message fixed: the original was copy-pasted from the failure check
    # ("Checking for failed collector job.") which was misleading here.
    B2DEBUG(29, "Checking for completed collector jobs.")
    if self._collector_jobs_ready():
        # Generator instead of a list so all() can short-circuit
        return all(job.status == "completed" for job in self._collector_jobs.values())
def _collection_failed(self):
    """
    Did any of the collections fail?

    Returns:
        bool: True if the jobs are ready and any collector job failed;
        implicitly None (falsy) while jobs are not ready yet.
    """
    B2DEBUG(29, "Checking for failed collector job.")
    if self._collector_jobs_ready():
        # Generator instead of a list so any() can short-circuit on the first failure
        return any(job.status == "failed" for job in self._collector_jobs.values())
def _runner_not_failed(self):
    """
    Returns:
        bool: If AlgorithmsRunner succeeded return True.
    """
    return not self._runner_failed()
def _runner_failed(self):
    """
    Returns:
        bool: If AlgorithmsRunner failed return True.
    """
    # Direct boolean expression replaces the if/else returning True/False.
    return self._runner_final_state == AlgorithmsRunner.FAILED
def _collector_jobs_ready(self):
    """
    Returns True when every collector job reports ready; periodically forces a
    full status refresh based on the configured update interval.
    """
    since_last_update = time.time() - self._collector_timing["last_update"]
    if since_last_update > self.calibration.collector_full_update_interval:
        B2INFO("Updating full collector job statuses.")
        for job in self._collector_jobs.values():
            # NOTE(review): the explicit status refresh call here was elided
            # in the available view — confirm against the original source.
            job.update_status()
            self._collector_timing["last_update"] = time.time()
            if job.subjobs:
                num_completed = sum((subjob.status in subjob.exit_statuses)
                                    for subjob in job.subjobs.values())
                total_subjobs = len(job.subjobs)
                B2INFO(f"{num_completed}/{total_subjobs} Collector SubJobs finished in"
                       f" Calibration {self.calibration.name} Job {job.name}.")
    return all([job.ready() for job in self._collector_jobs.values()])
def _submit_collections(self):
    """
    Queue all collector jobs for submission and start the timing bookkeeping.
    """
    self.calibration.jobs_to_submit.extend(list(self._collector_jobs.values()))
    self._collector_timing["start"] = time.time()
    self._collector_timing["last_update"] = time.time()
def _no_require_iteration(self):
    """
    Returns:
        bool: True when no further iteration should happen — either none was
        requested by any algorithm, or the maximum number of iterations is reached.
    """
    # Hoisted: the original evaluated _require_iteration() in up to three
    # branch conditions, each a full scan over the algorithm results.
    require = self._require_iteration()
    if require and self._below_max_iterations():
        return False
    if require:
        B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
        return True
    return True
def _require_iteration(self):
    """
    Returns:
        bool: True if any algorithm result of the current iteration requested
        another iteration (CalibrationAlgorithm.c_Iterate).
    """
    # any() over a flattened generator short-circuits on the first c_Iterate,
    # replacing the manual flag + nested loops (and the unused alg_name binding).
    return any(result.result == CalibrationAlgorithm.c_Iterate
               for results in self._algorithm_results[self.iteration].values()
               for result in results)
def _log_new_state(self, **kwargs):
    """
    Log the state change of this machine at INFO level.
    """
    B2INFO(f"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")
def dependencies_completed(self):
    """
    Condition function to check that the dependencies of our calibration are in the 'completed' state.
    Technically only need to check explicit dependencies.

    Returns:
        bool: True only if every dependency has reached its end state.
    """
    # all() replaces the manual early-return loop; short-circuits identically.
    return all(calibration.state == calibration.end_state
               for calibration in self.calibration.dependencies)
def automatic_transition(self):
    """
    Automatically try all transitions out of this state once. Tries fail last.
    """
    possible_transitions = self.get_transitions(self.state)
    for transition in possible_transitions:
        try:
            if transition != "fail":
                getattr(self, transition)()
                # First successful non-fail transition wins
                break
        except ConditionError:
            continue
    else:
        # No non-fail transition fired; fall back to "fail" if it exists
        if "fail" in possible_transitions:
            getattr(self, "fail")()
        else:
            raise MachineError(f"Failed to automatically transition out of {self.state} state.")
def _make_output_dir(self):
    """
    Creates the overall root directory of the Calibration. Will not overwrite if it already exists.
    """
    create_directories(self.root_dir, overwrite=False)
def _make_collector_path(self, name, collection):
    """
    Creates a basf2 path for the correct collector and serializes it in the
    self.output_dir/<calibration_name>/<iteration>/paths directory
    """
    path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
    # Automatically overwrites any previous directory
    create_directories(path_output_dir)
    path_file_name = collection.collector.name() + '.path'
    path_file_name = path_output_dir / path_file_name

    # Empty path with just the collector module attached
    coll_path = create_path()
    coll_path.add_module(collection.collector)

    # Serialize the path so the collector job can unpickle and run it
    with open(path_file_name, 'bw') as serialized_path_file:
        pickle.dump(serialize_path(coll_path), serialized_path_file)

    return str(path_file_name.absolute())
def _make_pre_collector_path(self, name, collection):
    """
    Creates a basf2 path for the collectors setup path (Collection.pre_collector_path) and serializes it in the
    self.output_dir/<calibration_name>/<iteration>/<colector_output>/<name> directory.
    """
    path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
    coll_path = collection.pre_collector_path
    path_file_name = 'pre_collector.path'
    path_file_name = os.path.join(path_output_dir, path_file_name)

    # Serialize the user-provided setup path for the collector job
    with open(path_file_name, 'bw') as serialized_path_file:
        pickle.dump(serialize_path(coll_path), serialized_path_file)

    return path_file_name
def _create_collector_jobs(self):
    """
    Creates a Job object for the collections of this iteration, ready for submission
    to backend.
    """
    # NOTE(review): several interior lines of this method were elided in the
    # available view; the reconstruction below follows the visible statements —
    # confirm against the original source.
    for collection_name, collection in self.calibration.collections.items():
        iteration_dir = self.root_dir.joinpath(str(self.iteration))
        job = Job('_'.join([self.calibration.name, collection_name, 'Iteration', str(self.iteration)]))
        job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
        job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
        # Remove any previous attempt's output so we start clean
        if job.output_dir.exists():
            B2INFO(f"Previous output directory for {self.calibration.name} collector {collection_name} exists."
                   f"Deleting {job.output_dir} before re-submitting.")
            shutil.rmtree(job.output_dir)
        job.cmd = collection.job_cmd
        job.append_current_basf2_setup_cmds()
        job.input_sandbox_files.append(collection.job_script)
        collector_path_file = Path(self._make_collector_path(collection_name, collection))
        job.input_sandbox_files.append(collector_path_file)
        if collection.pre_collector_path:
            pre_collector_path_file = Path(self._make_pre_collector_path(collection_name, collection))
            job.input_sandbox_files.append(pre_collector_path_file)

        # Local databases required by this job and their paths
        list_dependent_databases = []

        # Finished databases of previous calibrations that we depend on;
        # they must exist since we cannot be here until those calibrations returned.
        for dependency in self.calibration.dependencies:
            database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
            B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))

        # Previous iteration's output database of this calibration
        if self.iteration > 0:
            previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
            database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
            B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")

        # Directory for files that are input to the collector job (already created by the path step)
        input_data_directory = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, collection_name)

        # Setup info is passed to the collector process as a JSON dictionary
        job_config = {}
        json_db_chain = []
        for database in collection.database_chain:
            if database.db_type == 'local':
                json_db_chain.append(('local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
            elif database.db_type == 'central':
                json_db_chain.append(('central', database.global_tag))
            else:
                raise ValueError(f"Unknown database type {database.db_type}.")
        # CAF-created databases from dependencies/previous iterations
        for database in list_dependent_databases:
            json_db_chain.append(('local', database))
        job_config['database_chain'] = json_db_chain

        job_config_file_path = input_data_directory.joinpath('collector_config.json').absolute()
        with open(job_config_file_path, 'w') as job_config_file:
            json.dump(job_config, job_config_file, indent=2)
        job.input_sandbox_files.append(job_config_file_path)

        # Define the input files
        input_data_files = set(collection.input_files)
        # Restrict to files overlapping the requested IoV, if any
        if self.iov_to_calibrate:
            input_data_files = self.files_containing_iov(input_data_files,
                                                         collection.files_to_iovs,
                                                         self.iov_to_calibrate)
        # Drop files that exactly match an ignored (Exp,Run)
        files_to_ignore = set()
        for exprun in self.calibration.ignored_runs:
            for input_file in input_data_files:
                file_iov = self.calibration.files_to_iovs[input_file]
                if file_iov == exprun.make_iov():
                    B2INFO(f"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
                           f"Therefore the input file '{input_file}' from Collection '{collection_name}' "
                           "is being removed from input files list.")
                    files_to_ignore.add(input_file)
        input_data_files.difference_update(files_to_ignore)

        if not input_data_files:
            raise MachineError(f"No valid input files for Calibration '{self.calibration.name}' "
                               f" and Collection '{collection_name}'.")
        job.input_files = list(input_data_files)

        job.splitter = collection.splitter
        job.backend_args = collection.backend_args
        # Output patterns to be returned from collector job
        job.output_patterns = collection.output_patterns
        B2DEBUG(20, f"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
        self._collector_jobs[collection_name] = job
def _check_valid_collector_output(self):
    """
    Verify that every collector job (or each of its subjobs) produced at least
    one output file matching its output patterns; raise MachineError otherwise.
    """
    B2INFO("Checking that Collector output exists for all collector jobs "
           f"using {self.calibration.name}.output_patterns.")
    # If restarting, the Job objects must be recovered from their JSON dumps
    if not self._collector_jobs:
        B2INFO("We're restarting so we'll recreate the collector Job object.")
        self._recover_collector_jobs()

    for job in self._collector_jobs.values():
        if not job.subjobs:
            output_files = []
            for pattern in job.output_patterns:
                output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
            if not output_files:
                raise MachineError("No output files from Collector Job")
        else:
            for subjob in job.subjobs.values():
                output_files = []
                for pattern in subjob.output_patterns:
                    output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
                if not output_files:
                    raise MachineError(f"No output files from Collector {subjob}")
def _run_algorithms(self):
    """
    Runs the Calibration Algorithms for this calibration machine.

    Will run them sequentially locally (possible benefits to using a
    processing pool for low memory algorithms later on.)
    """
    # Instantiate the runner for these algorithms
    algs_runner = self.calibration.algorithms_runner(name=self.calibration.name)
    algs_runner.algorithms = self.calibration.algorithms
    algorithm_output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
    output_database_dir = algorithm_output_dir.joinpath("outputdb")
    # Remove any leftovers from a previous attempt so we start clean
    if algorithm_output_dir.exists():
        B2INFO(f"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
               f"Deleting and recreating {algorithm_output_dir}.")
    create_directories(algorithm_output_dir)
    B2INFO(f"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
    algs_runner.output_database_dir = output_database_dir
    algs_runner.output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)

    # Gather the collector output files (from subjobs when present, otherwise the job itself)
    input_files = []
    for job in self._collector_jobs.values():
        if job.subjobs:
            for subjob in job.subjobs.values():
                for pattern in subjob.output_patterns:
                    input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
        else:
            for pattern in job.output_patterns:
                input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
    algs_runner.input_files = input_files

    # User-defined database chain for this calibration
    algs_runner.database_chain = self.calibration.database_chain

    # Finished databases of previous calibrations that we depend on;
    # they must exist since we cannot be here until those calibrations returned.
    list_dependent_databases = []
    for dependency in self.calibration.dependencies:
        database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
        B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
        list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))

    # Previous iteration's output database of this calibration
    if self.iteration > 0:
        previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
        database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
        list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
        B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
    algs_runner.dependent_databases = list_dependent_databases

    algs_runner.ignored_runs = self.calibration.ignored_runs

    try:
        algs_runner.run(self.iov_to_calibrate, self.iteration)
    except Exception as err:
        print(err)
        # Normally failure is decided from the algorithm results; here we had an
        # actual exception, so force the failed state without triggering a transition.
        self._state = State("algorithms_failed")
    self._algorithm_results[self.iteration] = algs_runner.results
    self._runner_final_state = algs_runner.final_state
def _prepare_final_db(self):
    """
    Take the last iteration's outputdb and copy it to a more easily findable place.
    """
    database_location = self.root_dir.joinpath(str(self.iteration),
                                               self.calibration.alg_output_dir,
                                               'outputdb')
    final_database_location = self.root_dir.joinpath('outputdb')
    # Replace any previously published final database
    if final_database_location.exists():
        B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
        shutil.rmtree(final_database_location)
    shutil.copytree(database_location, final_database_location)
class AlgorithmMachine(Machine):
    """
    A state machine to handle the logic of running the algorithm on the overall runs contained in the data.
    """

    # NOTE(review): some entries of these lists were elided in the available
    # view — confirm contents against the original source.
    #: Required attributes that must exist before the machine can run properly.
    required_attrs = ["algorithm",
                      "dependent_databases",
                      "database_chain",
                      "output_dir",
                      "output_database_dir",
                      "input_files"
                      ]

    #: Attributes that must have a value that returns True when tested.
    required_true_attrs = ["algorithm",
                           "output_dir",
                           "output_database_dir",
                           "input_files"
                           ]
def __init__(self, algorithm=None, initial_state="init"):
    """
    Takes an Algorithm object from the caf framework and defines the transitions.
    """
    #: Default states for the AlgorithmMachine
    self.default_states = [State("init"),
                           State("ready"),
                           State("running_algorithm"),
                           State("completed"),
                           State("failed")]

    super().__init__(self.default_states, initial_state)

    #: Algorithm() object whose state we are modelling
    self.algorithm = algorithm
    #: Collector output files for the algorithm to run over
    self.input_files = []
    #: CAF-created local databases from calibrations this algorithm depends on
    self.dependent_databases = []
    #: Database chain assigned to the overall Calibration
    self.database_chain = []
    #: Output database directory for the localdb that the algorithm will commit to
    self.output_database_dir = ""
    #: Working/output directory for the algorithm
    self.output_dir = ""

    self.add_transition("setup_algorithm", "init", "ready",
                        before=[self._setup_logging,
                                self._change_working_dir,
                                self._setup_database_chain,
                                self._set_input_data,
                                self._pre_algorithm])
    self.add_transition("execute_runs", "ready", "running_algorithm",
                        after=self._execute_over_iov)
    self.add_transition("complete", "running_algorithm", "completed")
    self.add_transition("fail", "running_algorithm", "failed")
    self.add_transition("fail", "ready", "failed")
    self.add_transition("setup_algorithm", "completed", "ready")
    self.add_transition("setup_algorithm", "failed", "ready")
def setup_from_dict(self, params):
    """
    Parameters:
        params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.
    """
    for attribute_name, value in params.items():
        setattr(self, attribute_name, value)
def is_valid(self):
    """
    Returns:
        bool: Whether or not this machine has been set up correctly with all its necessary attributes.
    """
    B2INFO(f"Checking validity of current setup of AlgorithmMachine for {self.algorithm.name}.")
    # Missing attributes (unlikely, since __init__ creates them all)
    for attribute_name in self.required_attrs:
        if not hasattr(self, attribute_name):
            B2ERROR(f"AlgorithmMachine attribute {attribute_name} doesn't exist.")
            return False
    # Attributes that must hold a truthy value
    for attribute_name in self.required_true_attrs:
        if not getattr(self, attribute_name):
            B2ERROR(f"AlgorithmMachine attribute {attribute_name} returned False.")
            return False
    return True
def _create_output_dir(self, **kwargs):
    """
    Create working/output directory of algorithm. Any old directory is overwritten.
    """
    create_directories(Path(self.output_dir), overwrite=True)
def _setup_database_chain(self, **kwargs):
    """
    Apply all databases in the correct order.
    """
    # Override the normal ordering: only explicit GTs and CAF-made local DBs apply
    b2conditions.reset()
    b2conditions.override_globaltags()

    # User-set chain for this Calibration, applied in order
    for database in self.database_chain:
        if database.db_type == 'local':
            B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
                   f"for {self.algorithm.name}.")
            b2conditions.prepend_testing_payloads(database.filepath.as_posix())
        elif database.db_type == 'central':
            B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
                   f"for {self.algorithm.name}.")
            b2conditions.prepend_globaltag(database.global_tag)
        else:
            raise ValueError(f"Unknown database type {database.db_type}.")

    # Finished databases of calibrations we depend on
    for filename, directory in self.dependent_databases:
        B2INFO("Adding Local Database (unknown) to head of chain of local databases created by"
               f" a dependent calibration, for {self.algorithm.name}.")
        b2conditions.prepend_testing_payloads(filename)

    # Directory to store the payloads of this algorithm
    create_directories(Path(self.output_database_dir), overwrite=False)

    # Local database to receive the payloads this algorithm commits
    B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
    b2conditions.expert_settings(save_payloads=str(self.output_database_dir.joinpath("database.txt")))
def _setup_logging(self, **kwargs):
    """
    Redirect basf2 logging for this algorithm into a dedicated log file.
    """
    # Log file sits next to the algorithm's other output
    log_file = os.path.join(self.output_dir, self.algorithm.name + '_stdout')
    B2INFO(f"Output log file at {log_file}.")
    basf2.set_log_level(basf2.LogLevel.INFO)
    basf2.log_to_file(log_file)
def _change_working_dir(self, **kwargs):
    """
    Change the process working directory to the algorithm's output directory.
    """
    B2INFO(f"Changing current working directory to {self.output_dir}.")
    os.chdir(self.output_dir)
def _pre_algorithm(self, **kwargs):
    """
    Call the user defined algorithm setup function.
    """
    B2INFO("Running Pre-Algorithm function (if exists)")
    if self.algorithm.pre_algorithm:
        # The pre_algorithm function was created by an outside user, so it is not
        # bound to the algorithm instance — pass the algorithm in explicitly.
        self.algorithm.pre_algorithm(self.algorithm.algorithm, kwargs["iteration"])
def _execute_over_iov(self, **kwargs):
    """
    Does the actual execute of the algorithm on an IoV and records the result.
    """
    B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")

    runs_to_execute = kwargs["runs"]
    iov = kwargs["apply_iov"]
    iteration = kwargs["iteration"]
    # When no explicit IoV is given, derive one from the runs being executed
    if not iov:
        iov = iov_from_runs(runs_to_execute)
    B2INFO(f"Execution will use {iov} for labelling payloads by default.")
    alg_result = self.algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
    self.result = IoV_Result(iov, alg_result)
def _set_input_data(self, **kwargs):
    """
    Hand the collected input files to the algorithm's data input function.
    """
    self.algorithm.data_input(self.input_files)
class MachineError(Exception):
    """
    Base exception class for this module.
    """
class ConditionError(MachineError):
    """
    Exception for when conditions fail during a transition.
    """
class TransitionError(MachineError):
    """
    Exception for when transitions fail.
    """