16 from functools
import partial
17 from collections
import defaultdict
23 from pathlib
import Path
27 from basf2
import create_path
28 from basf2
import B2DEBUG, B2ERROR, B2INFO, B2WARNING
29 from basf2
import conditions
as b2conditions
32 from ROOT.Belle2
import CalibrationAlgorithm
34 from caf.utils
import create_directories
35 from caf.utils
import method_dispatch
36 from caf.utils
import iov_from_runs
37 from caf.utils
import IoV_Result
38 from caf.utils
import get_iov_from_file
39 from caf.backends
import Job
40 from caf.runners
import AlgorithmsRunner
45 Basic State object that can take enter and exit state methods and records
46 the state of a machine.
48 You should assign the self.on_enter or self.on_exit attributes to callback functions
49 or lists of them, if you need them.
def __init__(self, name, enter=None, exit=None):
    """
    Initialise State with a name and optional lists of callbacks.
    """
    #: Name of the State
    self.name = name
    #: Callback list run when entering the state
    self._on_enter = []
    self.on_enter = enter
    #: Callback list run when exiting the state
    self._on_exit = []
    self.on_exit = exit
@property
def on_enter(self):
    """
    Runs callbacks when a state is entered.
    """
    return self._on_enter

@on_enter.setter
def on_enter(self, callbacks):
    """
    Replace the enter-callback list with the given callback(s).
    """
    self._on_enter = []
    if callbacks:
        self._add_callbacks(callbacks, self._on_enter)
@property
def on_exit(self):
    """
    Runs callbacks when a state is exited.
    """
    return self._on_exit

@on_exit.setter
def on_exit(self, callbacks):
    """
    Replace the exit-callback list with the given callback(s).
    """
    self._on_exit = []
    if callbacks:
        self._add_callbacks(callbacks, self._on_exit)
@method_dispatch
def _add_callbacks(self, callback, attribute):
    """
    Adds callback to a property.

    Base (single-callable) implementation; list/tuple inputs are handled by
    the registered overload below.
    """
    if callable(callback):
        attribute.append(callback)
    else:
        B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")
@_add_callbacks.register(tuple)
@_add_callbacks.register(list)
def _(self, callbacks, attribute):
    """
    Alternate method for lists and tuples of function objects.
    """
    if callbacks:
        for function in callbacks:
            if callable(function):
                attribute.append(function)
            else:
                B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")
def __repr__(self):
    """
    Debug representation showing the state's name.
    """
    return f"State(name={self.name})"
def __eq__(self, other):
    """
    Equality is based on the state name; a plain string compares
    directly against this state's name.
    """
    if isinstance(other, str):
        return self.name == other
    else:
        return self.name == other.name
137 return hash(self.name)
143 states (list[str]): A list of possible states of the machine.
146 Base class for a final state machine wrapper.
147 Implements the framework that a more complex machine can inherit from.
149 The `transitions` attribute is a dictionary of trigger name keys, each value of
150 which is another dictionary of 'source' states, 'dest' states, and 'conditions'
151 methods. 'conditions' should be a list of callables or a single one. A transition is
152 valid if it goes from an allowed state to an allowed state.
153 Conditions are optional but must be a callable that returns True or False based
154 on some state of the machine. They cannot have input arguments currently.
156 Every condition/before/after callback function MUST take ``**kwargs`` as the only
157 argument (except ``self`` if it's a class method). This is because it's basically
158 impossible to determine which arguments to pass to which functions for a transition.
159 Therefore this machine just enforces that every function should simply take ``**kwargs``
160 and use the dictionary of arguments (even if it doesn't need any arguments).
162 This also means that if you call a trigger with arguments e.g. ``machine.walk(speed=5)``
163 you MUST use the keyword arguments rather than positional ones. So ``machine.walk(5)``
def __init__(self, states=None, initial_state="default_initial"):
    """
    Basic Setup of states and initial_state.
    """
    #: Valid states for this machine, keyed by state name
    self.states = {}
    if states:
        for state in states:
            self.add_state(state)
    if initial_state != "default_initial":
        #: Initial state (uses the property setter, no enter callbacks run)
        self.initial_state = initial_state
    else:
        self.add_state(initial_state)
        #: Actual attribute holding the initial state
        self._initial_state = State(initial_state)

    #: Actual attribute holding the current state
    self._state = self.initial_state
    #: Allowed transitions between states, keyed by trigger name
    self.transitions = defaultdict(list)
def add_state(self, state, enter=None, exit=None):
    """
    Adds a single state to the list of possible ones.
    Should be a unique string or a State object with a unique name.
    """
    if isinstance(state, str):
        # Promote the bare name to a State carrying the callbacks
        self.add_state(State(state, enter, exit))
    elif isinstance(state, State):
        if state.name not in self.states.keys():
            self.states[state.name] = state
        else:
            B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
    else:
        B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")
@property
def initial_state(self):
    """
    The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
    """
    return self._initial_state

@initial_state.setter
def initial_state(self, state):
    """
    Set the initial state (and the current state) without running callbacks.
    """
    if state in self.states.keys():
        self._initial_state = self.states[state]
        # Also reset the current state; deliberately bypasses the state setter
        self._state = self.states[state]
    else:
        raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")
@property
def state(self):
    """
    The current state of the machine. Actually a `property` decorator. It will call the exit method of the
    current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
    either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
    """
    return self._state

@state.setter
def state(self, state):
    """
    Set a new current state, running the old state's exit callbacks and
    the new state's enter callbacks (in that order).
    """
    if isinstance(state, str):
        state_name = state
    else:
        state_name = state.name

    try:
        state = self.states[state_name]
        # Run exit callbacks of current state
        for callback in self.state.on_exit:
            callback(prior_state=self.state, new_state=state)
        # Run enter callbacks of new state
        for callback in state.on_enter:
            callback(prior_state=self.state, new_state=state)
        # Set the state
        self._state = state
    except KeyError:
        # Fixed message: was "which not in" — now grammatical and consistent
        # with the initial_state setter's error text.
        raise MachineError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")
@staticmethod
def default_condition(**kwargs):
    """
    Method to always return True.
    """
    return True
def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
    """
    Adds a single transition to the dictionary of possible ones.
    Trigger is the method name that begins the transition between the
    source state and the destination state.

    The condition is an optional function that returns True or False
    depending on the current state/input.
    """
    transition_dict = {}
    try:
        source = self.states[source]
        dest = self.states[dest]
        transition_dict["source"] = source
        transition_dict["dest"] = dest
    except KeyError as err:
        B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
        raise err

    # Normalise conditions into a list, defaulting to the always-True condition
    if conditions:
        if isinstance(conditions, (list, tuple, set)):
            transition_dict["conditions"] = list(conditions)
        else:
            transition_dict["conditions"] = [conditions]
    else:
        transition_dict["conditions"] = [Machine.default_condition]

    # Normalise the before callbacks into a list
    if not before:
        before = []
    if isinstance(before, (list, tuple, set)):
        transition_dict["before"] = list(before)
    else:
        transition_dict["before"] = [before]

    # Normalise the after callbacks into a list
    if not after:
        after = []
    if isinstance(after, (list, tuple, set)):
        transition_dict["after"] = list(after)
    else:
        transition_dict["after"] = [after]

    self.transitions[trigger].append(transition_dict)
def __getattr__(self, name, **kwargs):
    """
    Allows us to create a new method for each trigger on the fly.
    If there is no trigger name in the machine to match, then the normal
    AttributeError is called.
    """
    possible_transitions = self.get_transitions(self.state)
    if name not in possible_transitions:
        raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
    transition_dict = self.get_transition_dict(self.state, name)
    # Bind the trigger name and its transition so calling the attribute runs it
    return partial(self._trigger, name, transition_dict, **kwargs)
def _trigger(self, transition_name, transition_dict, **kwargs):
    """
    Runs the transition logic. Callbacks are evaluated in the order:
    conditions -> before -> <new state set here> -> after.
    """
    dest, conditions, before_callbacks, after_callbacks = (
        transition_dict["dest"],
        transition_dict["conditions"],
        transition_dict["before"],
        transition_dict["after"]
    )
    # Proceed only if every condition evaluates to True
    if all(map(lambda condition: self._callback(condition, **kwargs), conditions)):
        for before_func in before_callbacks:
            self._callback(before_func, **kwargs)
        # Setting the state property runs exit/enter callbacks of the states
        self.state = dest
        for after_func in after_callbacks:
            self._callback(after_func, **kwargs)
    else:
        raise ConditionError(f"Transition '{transition_name}' called for but one or more conditions "
                             "returned False!")
@staticmethod
def _callback(func, **kwargs):
    """
    Calls a condition/before/after.. function using arguments passed (or not).
    """
    return func(**kwargs)
def get_transitions(self, source):
    """
    Returns allowed transitions from a given state.
    """
    possible_transitions = []
    for transition, transition_dicts in self.transitions.items():
        for transition_dict in transition_dicts:
            if transition_dict["source"] == source:
                possible_transitions.append(transition)
    return possible_transitions
def get_transition_dict(self, state, transition):
    """
    Returns the transition dictionary for a state and transition out of it.
    """
    transition_dicts = self.transitions[transition]
    for transition_dict in transition_dicts:
        if transition_dict["source"] == state:
            return transition_dict
    else:
        raise KeyError(f"No transition from state {state} with the name {transition}.")
def save_graph(self, filename, graphname):
    """
    Does a simple dot file creation to visualise states and transitions.

    Parameters:
        filename (str): Path of the dot file to write.
        graphname (str): Name given to the digraph in the dot output.
    """
    with open(filename, "w") as dotfile:
        dotfile.write("digraph " + graphname + " {\n")
        # One node per state
        for state in self.states.keys():
            dotfile.write('"' + state + '" [shape=ellipse, color=black]\n')
        # One edge per transition, labelled with its trigger name
        for trigger, transition_dicts in self.transitions.items():
            for transition in transition_dicts:
                dotfile.write('"' + transition["source"].name + '" -> "' +
                              transition["dest"].name + '" [label="' + trigger + '"]\n')
        # Close the digraph — without this the dot file is invalid
        dotfile.write("}\n")
class CalibrationMachine(Machine):
    """
    A state machine to handle `Calibration` objects and the flow of
    processing for them.
    """

    #: Subdirectory name where the collector input files are placed
    collector_input_dir = 'collector_input'
    #: Subdirectory name where collector job output appears
    collector_output_dir = 'collector_output'
    #: Subdirectory name where algorithm output appears
    algorithm_output_dir = 'algorithm_output'
def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
    """
    Takes a Calibration object from the caf framework and lets you
    set the initial state.
    """
    #: Default states for the CalibrationMachine; every state mirrors itself
    #: into the Calibration object and logs the change on entry
    self.default_states = [State("init", enter=[self._update_cal_state,
                                                self._log_new_state]),
                           State("running_collector", enter=[self._update_cal_state,
                                                             self._log_new_state]),
                           State("collector_failed", enter=[self._update_cal_state,
                                                            self._log_new_state]),
                           State("collector_completed", enter=[self._update_cal_state,
                                                               self._log_new_state]),
                           State("running_algorithms", enter=[self._update_cal_state,
                                                              self._log_new_state]),
                           State("algorithms_failed", enter=[self._update_cal_state,
                                                             self._log_new_state]),
                           State("algorithms_completed", enter=[self._update_cal_state,
                                                                self._log_new_state]),
                           State("completed", enter=[self._update_cal_state,
                                                     self._log_new_state]),
                           State("failed", enter=[self._update_cal_state,
                                                  self._log_new_state])
                           ]

    super().__init__(self.default_states, initial_state)

    #: Calibration object whose processing this machine models
    self.calibration = calibration
    # Let the calibration object hold a reference back to this machine
    self.calibration.machine = self
    #: Which iteration step we are in
    self.iteration = iteration
    #: Backend used for the collector submission
    self.collector_backend = None
    #: Results of each iteration for all algorithms of this calibration
    self._algorithm_results = {}
    #: Final state of the algorithm runner for the current iteration
    self._runner_final_state = None
    #: Optional overall IoV to calibrate
    self.iov_to_calibrate = iov_to_calibrate
    #: Root working directory for this Calibration
    self.root_dir = Path(os.getcwd(), calibration.name)
    #: Timing bookkeeping for the collector jobs (e.g. start, last_update)
    self._collector_timing = {}
    #: Collector `Job` objects used for submission, keyed by collection name
    self._collector_jobs = {}

    self.add_transition("submit_collector", "init", "running_collector",
                        conditions=self.dependencies_completed,
                        before=[self._make_output_dir,
                                self._resolve_file_paths,
                                self._build_iov_dicts,
                                self._create_collector_jobs,
                                self._submit_collections,
                                self._dump_job_config])
    self.add_transition("fail", "running_collector", "collector_failed",
                        conditions=self._collection_failed)
    self.add_transition("complete", "running_collector", "collector_completed",
                        conditions=self._collection_completed)
    self.add_transition("run_algorithms", "collector_completed", "running_algorithms",
                        before=self._check_valid_collector_output,
                        after=[self._run_algorithms,
                               self.automatic_transition])
    self.add_transition("complete", "running_algorithms", "algorithms_completed",
                        after=self.automatic_transition,
                        conditions=self._runner_not_failed)
    self.add_transition("fail", "running_algorithms", "algorithms_failed",
                        conditions=self._runner_failed)
    self.add_transition("iterate", "algorithms_completed", "init",
                        conditions=[self._require_iteration,
                                    self._below_max_iterations],
                        after=self._increment_iteration)
    self.add_transition("finish", "algorithms_completed", "completed",
                        conditions=self._no_require_iteration,
                        before=self._prepare_final_db)
    self.add_transition("fail_fully", "algorithms_failed", "failed")
    self.add_transition("fail_fully", "collector_failed", "failed")
def _update_cal_state(self, **kwargs):
    """
    Mirror the machine's new state name into the wrapped Calibration object.
    """
    self.calibration.state = str(kwargs["new_state"])
def files_containing_iov(self, file_paths, files_to_iovs, iov):
    """
    Lookup function that returns all files from the file_paths that
    overlap with this IoV.
    """
    # Files that overlap with the requested IoV AND are in the candidate list
    overlapping_files = set()
    for file_path, file_iov in files_to_iovs.items():
        if file_iov.overlaps(iov) and (file_path in file_paths):
            overlapping_files.add(file_path)
    return overlapping_files
def _dump_job_config(self):
    """
    Dumps the `Job` object for the collections to JSON files so that it's configuration can be recovered
    later in case of failure.
    """
    # Wait until submission has moved the jobs out of the 'init' state so the
    # serialized configuration is complete
    while any(map(lambda j: j.status == "init", self._collector_jobs.values())):
        B2DEBUG(29, "Some Collector Jobs still in 'init' state. Waiting...")
        time.sleep(5)

    for collection_name, job in self._collector_jobs.items():
        collector_job_output_file_name = self.calibration.collections[collection_name].job_config
        output_file = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir,
                                             collection_name, collector_job_output_file_name)
        job.dump_to_json(output_file)
def _recover_collector_jobs(self):
    """
    Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.
    """
    for collection_name, collection in self.calibration.collections.items():
        output_file = self.root_dir.joinpath(str(self.iteration),
                                             self.collector_input_dir,
                                             collection_name,
                                             collection.job_config)
        self._collector_jobs[collection_name] = Job.from_json(output_file)
def _iov_requested(self):
    """
    Returns:
        bool: Whether an overall IoV was requested for this calibration.
    """
    if self.iov_to_calibrate:
        B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
        return True
    else:
        B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
        return False
530 def _resolve_file_paths(self):
def _build_iov_dicts(self):
    """
    Build IoV file dictionary for each collection if required.
    """
    iov_requested = self._iov_requested()
    # Only needed when an overall IoV or ignored runs require file filtering
    if iov_requested or self.calibration.ignored_runs:
        for coll_name, collection in self.calibration.collections.items():
            if not collection.files_to_iovs:
                B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
                       f" Calibration '{self.calibration.name} and Collection '{coll_name}'."
                       " Filling dictionary from input file metadata."
                       " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
                files_to_iovs = {}
                for file_path in collection.input_files:
                    files_to_iovs[file_path] = get_iov_from_file(file_path)
                collection.files_to_iovs = files_to_iovs
            else:
                B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
                       f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
    else:
        B2INFO("No File to IoV mapping required.")
def _below_max_iterations(self):
    """
    Returns:
        bool: True while we have not reached the calibration's maximum iteration count.
    """
    return self.iteration < self.calibration.max_iterations
def _increment_iteration(self):
    """
    Advance to the next iteration, keeping the wrapped Calibration in sync.
    """
    # Without this increment the machine would re-run the same iteration forever;
    # the visible fragment only mirrored the value into the calibration.
    self.iteration += 1
    self.calibration.iteration = self.iteration
def _collection_completed(self):
    """
    Did all the collections succeed?
    """
    B2DEBUG(29, "Checking for failed collector job.")
    if self._collector_jobs_ready():
        return all([job.status == "completed" for job in self._collector_jobs.values()])
def _collection_failed(self):
    """
    Did any of the collections fail?
    """
    B2DEBUG(29, "Checking for failed collector job.")
    if self._collector_jobs_ready():
        return any([job.status == "failed" for job in self._collector_jobs.values()])
def _runner_not_failed(self):
    """
    Returns:
        bool: If AlgorithmsRunner succeeded return True.
    """
    return not self._runner_failed()
def _runner_failed(self):
    """
    Returns:
        bool: If AlgorithmsRunner failed return True.
    """
    if self._runner_final_state == AlgorithmsRunner.FAILED:
        return True
    else:
        return False
def _collector_jobs_ready(self):
    """
    Returns:
        bool: True once every collector job reports ready.
    """
    since_last_update = time.time() - self._collector_timing["last_update"]
    # Throttle expensive full status updates to the configured interval
    if since_last_update > self.calibration.collector_full_update_interval:
        B2INFO("Updating full collector job statuses.")
        for job in self._collector_jobs.values():
            job.update_status()
            self._collector_timing["last_update"] = time.time()
            if job.subjobs:
                num_completed = sum((subjob.status in subjob.exit_statuses)
                                    for subjob in job.subjobs.values())
                total_subjobs = len(job.subjobs)
                B2INFO(f"{num_completed}/{total_subjobs} Collector SubJobs finished in"
                       f" Calibration {self.calibration.name} Job {job.name}.")
    return all([job.ready() for job in self._collector_jobs.values()])
def _submit_collections(self):
    """
    Queue all collector jobs for submission and start the timing bookkeeping.
    """
    self.calibration.jobs_to_submit.extend(list(self._collector_jobs.values()))
    self._collector_timing["start"] = time.time()
    self._collector_timing["last_update"] = time.time()
def _no_require_iteration(self):
    """
    Returns:
        bool: True when no further iteration should run (either none was
        requested, or the maximum number of iterations was reached).
    """
    if self._require_iteration() and self._below_max_iterations():
        return False
    elif self._require_iteration() and not self._below_max_iterations():
        B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
        return True
    elif not self._require_iteration():
        return True
def _require_iteration(self):
    """
    Returns:
        bool: True if any algorithm result of the current iteration asked to iterate.
    """
    iteration_called = False
    for alg_name, results in self._algorithm_results[self.iteration].items():
        for result in results:
            if result.result == CalibrationAlgorithm.c_Iterate:
                iteration_called = True
                break
        if iteration_called:
            break
    return iteration_called
def _log_new_state(self, **kwargs):
    """
    Log every state change of this machine.
    """
    B2INFO(f"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")
def dependencies_completed(self):
    """
    Condition function to check that the dependencies of our calibration are in the 'completed' state.
    Technically only need to check explicit dependencies.
    """
    for calibration in self.calibration.dependencies:
        if not calibration.state == calibration.end_state:
            return False
    return True
def automatic_transition(self):
    """
    Automatically try all transitions out of this state once. Tries fail last.
    """
    possible_transitions = self.get_transitions(self.state)
    for transition in possible_transitions:
        try:
            if transition != "fail":
                getattr(self, transition)()
                break
        except ConditionError:
            continue
    else:
        # Nothing else fired; fall back to the fail transition if one exists
        if "fail" in possible_transitions:
            getattr(self, "fail")()
        else:
            raise MachineError(f"Failed to automatically transition out of {self.state} state.")
def _make_output_dir(self):
    """
    Creates the overall root directory of the Calibration. Will not overwrite if it already exists.
    """
    create_directories(self.root_dir, overwrite=False)
def _make_collector_path(self, name, collection):
    """
    Creates a basf2 path for the correct collector and serializes it in the
    self.output_dir/<calibration_name>/<iteration>/paths directory
    """
    path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
    # Automatically overwrite any previous directory
    create_directories(path_output_dir)
    path_file_name = collection.collector.name() + '.path'
    path_file_name = path_output_dir / path_file_name

    # Build a fresh path containing only the collector module
    coll_path = create_path()
    coll_path.add_module(collection.collector)

    with open(path_file_name, 'bw') as serialized_path_file:
        pickle.dump(serialize_path(coll_path), serialized_path_file)

    return str(path_file_name.absolute())
def _make_pre_collector_path(self, name, collection):
    """
    Creates a basf2 path for the collectors setup path (Collection.pre_collector_path) and serializes it in the
    self.output_dir/<calibration_name>/<iteration>/<collector_output>/<name> directory.
    """
    path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
    coll_path = collection.pre_collector_path
    path_file_name = 'pre_collector.path'
    path_file_name = os.path.join(path_output_dir, path_file_name)

    with open(path_file_name, 'bw') as serialized_path_file:
        pickle.dump(serialize_path(coll_path), serialized_path_file)

    return path_file_name
def _create_collector_jobs(self):
    """
    Creates a Job object for the collections of this iteration, ready for submission
    to the backend.
    """
    for collection_name, collection in self.calibration.collections.items():
        iteration_dir = self.root_dir.joinpath(str(self.iteration))
        job = Job('_'.join([self.calibration.name, collection_name, 'Iteration', str(self.iteration)]))
        job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
        job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
        # Remove the output of any previous (failed) attempt before re-submitting
        if job.output_dir.exists():
            B2INFO(f"Previous output directory for {self.calibration.name} collector {collection_name} exists."
                   f"Deleting {job.output_dir} before re-submitting.")
            shutil.rmtree(job.output_dir)
        job.cmd = collection.job_cmd
        job.append_current_basf2_setup_cmds()
        job.input_sandbox_files.append(collection.job_script)
        collector_path_file = Path(self._make_collector_path(collection_name, collection))
        job.input_sandbox_files.append(collector_path_file)
        if collection.pre_collector_path:
            pre_collector_path_file = Path(self._make_pre_collector_path(collection_name, collection))
            job.input_sandbox_files.append(pre_collector_path_file)

        # Local databases this job depends on (from dependency calibrations)
        list_dependent_databases = []

        # Databases of previously completed calibrations we depend on; they must
        # exist because we cannot reach this point before they finish
        for dependency in self.calibration.dependencies:
            database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
            B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))

        # Databases from the previous iteration of this very calibration
        if self.iteration > 0:
            previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
            database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
            B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")

        # Directory holding extra input files for the collector job
        input_data_directory = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, collection_name)

        # Collector setup info is passed in via a JSON config file
        job_config = {}
        json_db_chain = []
        for database in collection.database_chain:
            if database.db_type == 'local':
                json_db_chain.append(('local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
            elif database.db_type == 'central':
                json_db_chain.append(('central', database.global_tag))
            else:
                raise ValueError(f"Unknown database type {database.db_type}.")
        # Dependent databases go last so they take precedence
        for database in list_dependent_databases:
            json_db_chain.append(('local', database))
        job_config['database_chain'] = json_db_chain

        job_config_file_path = input_data_directory.joinpath('collector_config.json').absolute()
        with open(job_config_file_path, 'w') as job_config_file:
            json.dump(job_config, job_config_file, indent=2)
        job.input_sandbox_files.append(job_config_file_path)

        # Define the input data files
        input_data_files = set(collection.input_files)
        # If an overall IoV was requested, keep only files overlapping with it
        if self.iov_to_calibrate:
            input_data_files = self.files_containing_iov(input_data_files,
                                                         collection.files_to_iovs,
                                                         self.iov_to_calibrate)
        # Drop files whose IoV matches an explicitly ignored run
        files_to_ignore = set()
        for exprun in self.calibration.ignored_runs:
            for input_file in input_data_files:
                file_iov = self.calibration.files_to_iovs[input_file]
                if file_iov == exprun.make_iov():
                    B2INFO(f"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
                           f"Therefore the input file '{input_file}' from Collection '{collection_name}' "
                           "is being removed from input files list.")
                    files_to_ignore.add(input_file)
        input_data_files.difference_update(files_to_ignore)

        if not input_data_files:
            raise MachineError(f"No valid input files for Calibration '{self.calibration.name}' "
                               f" and Collection '{collection_name}'.")
        job.input_files = list(input_data_files)

        job.splitter = collection.splitter
        job.backend_args = collection.backend_args
        # Output patterns to be returned from the collector job
        job.output_patterns = collection.output_patterns
        B2DEBUG(20, f"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
        self._collector_jobs[collection_name] = job
def _check_valid_collector_output(self):
    """
    Check that every collector job (or each of its subjobs) produced at
    least one output file matching the job's output patterns; raise
    MachineError otherwise.
    """
    # Fixed typo in the log message: "colector" -> "collector"
    B2INFO("Checking that Collector output exists for all collector jobs "
           f"using {self.calibration.name}.output_patterns.")
    if not self._collector_jobs:
        B2INFO("We're restarting so we'll recreate the collector Job object.")
        self._recover_collector_jobs()

    for job in self._collector_jobs.values():
        if not job.subjobs:
            output_files = []
            for pattern in job.output_patterns:
                output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
            if not output_files:
                raise MachineError("No output files from Collector Job")
        else:
            for subjob in job.subjobs.values():
                output_files = []
                for pattern in subjob.output_patterns:
                    output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
                if not output_files:
                    raise MachineError(f"No output files from Collector {subjob}")
def _run_algorithms(self):
    """
    Runs the Calibration Algorithms for this calibration machine.

    Will run them sequentially locally (possible benefits to using a
    processing pool for low memory algorithms later on.)
    """
    # Instantiate the runner type configured on the calibration
    algs_runner = self.calibration.algorithms_runner(name=self.calibration.name)
    algs_runner.algorithms = self.calibration.algorithms
    algorithm_output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
    output_database_dir = algorithm_output_dir.joinpath("outputdb")
    # Recreate the output directory cleanly if a previous attempt left one behind
    if algorithm_output_dir.exists():
        B2INFO(f"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
               f"Deleting and recreating {algorithm_output_dir}.")
    create_directories(algorithm_output_dir)
    B2INFO(f"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
    algs_runner.output_database_dir = output_database_dir
    algs_runner.output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)

    # Gather all collector output files (from subjobs if present)
    input_files = []
    for job in self._collector_jobs.values():
        if job.subjobs:
            for subjob in job.subjobs.values():
                for pattern in subjob.output_patterns:
                    input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
        else:
            for pattern in job.output_patterns:
                input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
    algs_runner.input_files = input_files

    # Apply any user-defined database chain for this calibration
    algs_runner.database_chain = self.calibration.database_chain

    # Databases of dependency calibrations; they must exist by the time we run
    list_dependent_databases = []
    for dependency in self.calibration.dependencies:
        database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
        B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
        list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))

    # Databases from a previous iteration of this calibration
    if self.iteration > 0:
        previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
        database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
        list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
        B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
    algs_runner.dependent_databases = list_dependent_databases

    algs_runner.ignored_runs = self.calibration.ignored_runs

    try:
        algs_runner.run(self.iov_to_calibrate, self.iteration)
    except Exception as err:
        print(err)
        # An unexpected exception: force the failed state directly rather than
        # going through a transition (which would re-check conditions)
        self._state = State("algorithms_failed")
    self._algorithm_results[self.iteration] = algs_runner.results
    self._runner_final_state = algs_runner.final_state
def _prepare_final_db(self):
    """
    Take the last iteration's outputdb and copy it to a more easily findable place.
    """
    database_location = self.root_dir.joinpath(str(self.iteration),
                                               self.calibration.alg_output_dir,
                                               'outputdb')
    final_database_location = self.root_dir.joinpath('outputdb')
    if final_database_location.exists():
        B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
        shutil.rmtree(final_database_location)
    shutil.copytree(database_location, final_database_location)
class AlgorithmMachine(Machine):
    """
    A state machine to handle the logic of running the algorithm on the overall runs contained in the data.
    """

    #: Attributes that must exist before the machine can run properly.
    #: Their values may still be falsy (e.g. "" or [])
    required_attrs = ["algorithm",
                      "dependent_databases",
                      "database_chain",
                      "output_dir",
                      "output_database_dir",
                      "input_files"
                      ]

    #: Attributes whose values must test True before running
    required_true_attrs = ["algorithm",
                           "output_dir",
                           "output_database_dir",
                           "input_files"
                           ]
def __init__(self, algorithm=None, initial_state="init"):
    """
    Takes an Algorithm object from the caf framework and defines the transitions.
    """
    #: Default states for the AlgorithmMachine
    self.default_states = [State("init"),
                           State("ready"),
                           State("running_algorithm"),
                           State("completed"),
                           State("failed")]

    super().__init__(self.default_states, initial_state)

    #: Algorithm object whose execution this machine models
    self.algorithm = algorithm
    #: Collector output files used as input
    self.input_files = []
    #: Local databases created by calibrations this one depends on
    self.dependent_databases = []
    #: Database chain assigned to the overall Calibration object
    self.database_chain = []
    #: Directory of the local database storing payloads from this execution
    self.output_database_dir = ""
    #: Directory we move into for running
    self.output_dir = ""

    self.add_transition("setup_algorithm", "init", "ready",
                        before=[self._setup_logging,
                                self._change_working_dir,
                                self._setup_database_chain,
                                self._set_input_data,
                                self._pre_algorithm])
    self.add_transition("execute_runs", "ready", "running_algorithm",
                        after=self._execute_over_iov)
    self.add_transition("complete", "running_algorithm", "completed")
    self.add_transition("fail", "running_algorithm", "failed")
    self.add_transition("fail", "ready", "failed")
    self.add_transition("setup_algorithm", "completed", "ready")
    self.add_transition("setup_algorithm", "failed", "ready")
def setup_from_dict(self, params):
    """
    Parameters:
        params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.
    """
    for attribute_name, value in params.items():
        setattr(self, attribute_name, value)
def is_valid(self):
    """
    Returns:
        bool: Whether or not this machine has been set up correctly with all its necessary attributes.
    """
    B2INFO("Checking validity of current setup of AlgorithmMachine for {}.".format(self.algorithm.name))
    # Missing attributes (shouldn't happen since __init__ creates them all)
    for attribute_name in self.required_attrs:
        if not hasattr(self, attribute_name):
            B2ERROR(f"AlgorithmMachine attribute {attribute_name} doesn't exist.")
            return False
    # Attributes that exist but were never given a usable value
    for attribute_name in self.required_true_attrs:
        if not getattr(self, attribute_name):
            B2ERROR(f"AlgorithmMachine attribute {attribute_name} returned False.")
            return False
    return True
def _create_output_dir(self, **kwargs):
    """
    Create working/output directory of algorithm. Any old directory is overwritten.
    """
    create_directories(Path(self.output_dir), overwrite=True)
def _setup_database_chain(self, **kwargs):
    """
    Apply all databases in the correct order.
    """
    # Clean the conditions configuration before building our chain
    b2conditions.reset()
    b2conditions.override_globaltags()

    # Apply the user-set database chain for this calibration
    for database in self.database_chain:
        if database.db_type == 'local':
            B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
                   f"for {self.algorithm.name}.")
            b2conditions.prepend_testing_payloads(database.filepath.as_posix())
        elif database.db_type == 'central':
            B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
                   f"for {self.algorithm.name}.")
            b2conditions.prepend_globaltag(database.global_tag)
        else:
            raise ValueError(f"Unknown database type {database.db_type}.")

    # Databases created by dependent calibrations take precedence
    for filename, directory in self.dependent_databases:
        B2INFO(f"Adding Local Database (unknown) to head of chain of local databases created by"
               f" a dependent calibration, for {self.algorithm.name}.")
        b2conditions.prepend_testing_payloads(filename)

    # Create the output database directory (never overwrite earlier payloads)
    create_directories(Path(self.output_database_dir), overwrite=False)

    B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
    b2conditions.expert_settings(save_payloads=str(self.output_database_dir.joinpath("database.txt")))
def _setup_logging(self, **kwargs):
    """
    Redirect basf2 logging into a per-algorithm file in the output directory.
    """
    log_file = os.path.join(self.output_dir, self.algorithm.name + '_stdout')
    B2INFO(f"Output log file at {log_file}.")
    basf2.set_log_level(basf2.LogLevel.INFO)
    basf2.log_to_file(log_file)
def _change_working_dir(self, **kwargs):
    """
    Move into the algorithm's output directory before running.
    """
    B2INFO(f"Changing current working directory to {self.output_dir}.")
    os.chdir(self.output_dir)
def _pre_algorithm(self, **kwargs):
    """
    Call the user defined algorithm setup function.
    """
    B2INFO("Running Pre-Algorithm function (if exists)")
    if self.algorithm.pre_algorithm:
        # pre_algorithm functions are plain functions, not class methods,
        # so the underlying algorithm object is passed in explicitly
        self.algorithm.pre_algorithm(self.algorithm.algorithm, kwargs["iteration"])
def _execute_over_iov(self, **kwargs):
    """
    Does the actual execute of the algorithm on an IoV and records the result.
    """
    B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")

    runs_to_execute = kwargs["runs"]
    iov = kwargs["apply_iov"]
    iteration = kwargs["iteration"]
    if not iov:
        # Derive the payload IoV from the runs when none was given explicitly
        iov = iov_from_runs(runs_to_execute)
    B2INFO(f"Execution will use {iov} for labelling payloads by default.")
    alg_result = self.algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
    #: Result of the latest execution, as an IoV_Result
    self.result = IoV_Result(iov, alg_result)
def _set_input_data(self, **kwargs):
    """
    Hand the collected input file list to the underlying algorithm.
    """
    self.algorithm.data_input(self.input_files)
class MachineError(Exception):
    """
    Base exception class for this module.
    """
class ConditionError(MachineError):
    """
    Exception for when conditions fail during a transition.
    """
class TransitionError(MachineError):
    """
    Exception for when transitions fail.
    """