16from functools
import partial
17from collections
import defaultdict
23from pathlib
import Path
27from basf2
import create_path
28from basf2
import B2DEBUG, B2ERROR, B2INFO, B2WARNING
29from basf2
import conditions
as b2conditions
32from ROOT
import Belle2
33from ROOT.Belle2
import CalibrationAlgorithm
35from caf.utils
import create_directories
36from caf.utils
import method_dispatch
37from caf.utils
import iov_from_runs
38from caf.utils
import IoV_Result
39from caf.utils
import get_iov_from_file
40from caf.backends
import Job
41from caf.runners
import AlgorithmsRunner
46 Basic State object that can take enter and exit state methods
and records
47 the state of a machine.
49 You should assign the self.on_enter
or self.on_exit attributes to callback functions
50 or lists of them,
if you need them.
53 def __init__(self, name, enter=None, exit=None):
55 Initialise State with a name
and optional lists of callbacks.
67 Runs callbacks when a state is entered.
72 def on_enter(self, callbacks):
77 self._add_callbacks(callbacks, self._on_enter)
82 Runs callbacks when a state is exited.
87 def on_exit(self, callbacks):
92 self._add_callbacks(callbacks, self._on_exit)
95 def _add_callbacks(self, callback, attribute):
97 Adds callback to a property.
99 if callable(callback):
100 attribute.append(callback)
102 B2ERROR(f
"Something other than a function (callable) passed into State {self.name}.")
104 @_add_callbacks.register(tuple)
105 @_add_callbacks.register(list)
106 def _(self, callbacks, attribute):
108 Alternate method for lists
and tuples of function objects.
111 for function
in callbacks:
112 if callable(function):
113 attribute.append(function)
115 B2ERROR(f
"Something other than a function (callable) passed into State {self.name}.")
125 return f
"State(name={self.name})"
def __eq__(self, other):
    """
    Compare this State by name.

    A plain string is compared against ``self.name`` directly; any other object is
    compared through its ``name`` attribute.

    Returns:
        bool or NotImplemented: ``NotImplemented`` when ``other`` is not a string and has
        no ``name`` attribute, so Python can fall back to its default comparison
        instead of raising ``AttributeError``.
    """
    if isinstance(other, str):
        return self.name == other
    # Sentinel distinguishes "no name attribute" from a name that happens to be None.
    missing = object()
    other_name = getattr(other, "name", missing)
    if other_name is missing:
        return NotImplemented
    return self.name == other_name
138 return hash(self.name)
144 states (list[str]): A list of possible states of the machine.
147 Base class for a final state machine
wrapper.
148 Implements the framework that a more complex machine can inherit
from.
150 The `transitions` attribute
is a dictionary of trigger name keys, each value of
151 which
is another dictionary of
'source' states,
'dest' states,
and 'conditions'
152 methods.
'conditions' should be a list of callables
or a single one. A transition
is
153 valid
if it goes
from an allowed state to an allowed state.
154 Conditions are optional but must be a callable that returns
True or False based
155 on some state of the machine. They cannot have input arguments currently.
157 Every condition/before/after callback function MUST take ``**kwargs``
as the only
158 argument (
except ``self``
if it
's a class method). This is because it's basically
159 impossible to determine which arguments to
pass to which functions
for a transition.
160 Therefore this machine just enforces that every function should simply take ``**kwargs``
161 and use the dictionary of arguments (even
if it doesn
't need any arguments).
162 This also means that if you call a trigger
with arguments e.g. ``machine.walk(speed=5)``
163 you MUST use the keyword arguments rather than positional ones. So ``machine.walk(5)``
167 def __init__(self, states=None, initial_state="default_initial"):
169 Basic Setup of states and initial_state.
175 self.add_state(state)
176 if initial_state !=
"default_initial":
178 self.initial_state = initial_state
180 self.add_state(initial_state)
182 self._initial_state = State(initial_state)
185 self._state = self.initial_state
187 self.transitions = defaultdict(list)
def add_state(self, state, enter=None, exit=None):
    """
    Register one state with the machine.

    Parameters:
        state (str or State): A state name, or a State object. Names must be unique;
            a duplicate or an object of any other type only produces a warning.
        enter: Callback(s) handed to the State constructor when ``state`` is a string.
        exit: Callback(s) handed to the State constructor when ``state`` is a string.
    """
    if isinstance(state, str):
        # Wrap the bare name and go through the State branch below.
        self.add_state(State(state, enter, exit))
        return
    if not isinstance(state, State):
        B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")
        return
    if state.name in self.states.keys():
        B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
        return
    self.states[state.name] = state
205 def initial_state(self):
207 The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
209 return self._initial_state
211 @initial_state.setter
212 def initial_state(self, state):
215 if state
in self.states.keys():
216 self._initial_state = self.states[state]
218 self._state = self.states[state]
220 raise KeyError(f
"Attempted to set state to '{state}' which is not in the 'states' attribute!")
225 The current state of the machine. Actually a `property` decorator. It will call the exit method of the
226 current state and enter method of the new one. To get around the behaviour e.g.
for setting initial states,
227 either use the `initial_state` property
or directly set the _state attribute itself (at your own risk!).
232 def state(self, state):
235 if isinstance(state, str):
238 state_name = state.name
241 state = self.states[state_name]
243 for callback
in self.state.on_exit:
244 callback(prior_state=self.state, new_state=state)
246 for callback
in state.on_enter:
247 callback(prior_state=self.state, new_state=state)
251 raise MachineError(f
"Attempted to set state to '{state}' which not in the 'states' attribute!")
254 def default_condition(**kwargs):
256 Method to always return True.
def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
    """
    Adds a single transition to the dictionary of possible ones.
    Trigger is the method name that begins the transition between the
    source state and the destination state.

    The condition is an optional function that returns True or False
    depending on the current state/input.

    Parameters:
        trigger (str): Name under which the transition is stored in ``self.transitions``.
        source (str): Name of the source state, looked up in ``self.states``.
        dest (str): Name of the destination state, looked up in ``self.states``.
        conditions: A callable or a list/tuple/set of callables. When empty or None the
            always-true ``Machine.default_condition`` is used.
        before: A callable or a list/tuple/set of callables; empty or None gives no callbacks.
        after: A callable or a list/tuple/set of callables; empty or None gives no callbacks.

    Raises:
        KeyError: If ``source`` or ``dest`` is not a key of ``self.states`` (a warning is
            logged first, then the original error is re-raised).
    """
    def as_callback_list(callbacks):
        # Normalise None / a single callable / a collection of callables to a list.
        if not callbacks:
            return []
        if isinstance(callbacks, (list, tuple, set)):
            return list(callbacks)
        return [callbacks]

    try:
        transition_dict = {"source": self.states[source], "dest": self.states[dest]}
    except KeyError:
        B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
        raise

    # The default condition is only looked up when the caller supplied none.
    transition_dict["conditions"] = as_callback_list(conditions) or [Machine.default_condition]
    transition_dict["before"] = as_callback_list(before)
    transition_dict["after"] = as_callback_list(after)

    self.transitions[trigger].append(transition_dict)
def __getattr__(self, name, **kwargs):
    """
    Resolve an unknown attribute as a trigger of the current state.

    When ``name`` is one of the transitions leaving ``self.state``, a ``partial`` of
    ``self._trigger`` bound to that transition is returned, so ``machine.<trigger>(...)``
    runs the transition. Any other name raises the usual AttributeError.
    """
    allowed_triggers = self.get_transitions(self.state)
    if name in allowed_triggers:
        chosen = self.get_transition_dict(self.state, name)
        return partial(self._trigger, name, chosen, **kwargs)
    raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
314 def _trigger(self, transition_name, transition_dict, **kwargs):
316 Runs the transition logic. Callbacks are evaluated in the order:
317 conditions -> before -> <new state set here> -> after.
319 dest, conditions, before_callbacks, after_callbacks = (
320 transition_dict["dest"],
321 transition_dict[
"conditions"],
322 transition_dict[
"before"],
323 transition_dict[
"after"]
326 if all(map(
lambda condition: self._callback(condition, **kwargs), conditions)):
327 for before_func
in before_callbacks:
328 self._callback(before_func, **kwargs)
331 for after_func
in after_callbacks:
332 self._callback(after_func, **kwargs)
334 raise ConditionError(f
"Transition '{transition_name}' called for but one or more conditions "
def _callback(func, **kwargs):
    """
    Invoke a condition/before/after callback, forwarding all keyword arguments.

    Returns:
        Whatever ``func`` returns.
    """
    outcome = func(**kwargs)
    return outcome
def get_transitions(self, source):
    """
    List the trigger names that have a transition leaving ``source``.

    A trigger appears once for every registered transition dictionary whose
    "source" equals ``source``.
    """
    return [
        trigger
        for trigger, candidates in self.transitions.items()
        for candidate in candidates
        if candidate["source"] == source
    ]
def get_transition_dict(self, state, transition):
    """
    Fetch the first transition dictionary registered under ``transition`` whose
    "source" equals ``state``.

    Raises:
        KeyError: If no such transition dictionary exists.
    """
    not_found = object()
    candidates = self.transitions[transition]
    match = next((entry for entry in candidates if entry["source"] == state), not_found)
    if match is not_found:
        raise KeyError(f"No transition from state {state} with the name {transition}.")
    return match
def save_graph(self, filename, graphname):
    """
    Write a simple Graphviz dot file to visualise the states and transitions.

    Parameters:
        filename: Path of the dot file to create (overwritten if it exists).
        graphname (str): Name given to the ``digraph``.

    Every key of ``self.states`` becomes a node and every registered transition
    becomes an edge labelled with its trigger name. The graph is terminated with a
    closing brace so that the file is a complete ``digraph`` statement.
    """
    lines = [f"digraph {graphname} {{\n"]
    lines.extend(f'"{state}" [shape=ellipse, color=black]\n' for state in self.states)
    for trigger, transition_dicts in self.transitions.items():
        for transition in transition_dicts:
            source_name = transition["source"].name
            dest_name = transition["dest"].name
            lines.append(f'"{source_name}" -> "{dest_name}" [label="{trigger}"]\n')
    lines.append("}\n")
    # One batched write instead of one write call per node/edge.
    with open(filename, "w") as dotfile:
        dotfile.writelines(lines)
381class CalibrationMachine(Machine):
383 A state machine to handle `Calibration` objects and the flow of
387 collector_input_dir = 'collector_input'
388 collector_output_dir =
'collector_output'
389 algorithm_output_dir =
'algorithm_output'
391 def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
393 Takes a Calibration object from the caf framework
and lets you
394 set the initial state.
397 self.default_states = [State("init", enter=[self._update_cal_state,
398 self._log_new_state]),
399 State(
"running_collector", enter=[self._update_cal_state,
400 self._log_new_state]),
401 State(
"collector_failed", enter=[self._update_cal_state,
402 self._log_new_state]),
403 State(
"collector_completed", enter=[self._update_cal_state,
404 self._log_new_state]),
405 State(
"running_algorithms", enter=[self._update_cal_state,
406 self._log_new_state]),
407 State(
"algorithms_failed", enter=[self._update_cal_state,
408 self._log_new_state]),
409 State(
"algorithms_completed", enter=[self._update_cal_state,
410 self._log_new_state]),
411 State(
"completed", enter=[self._update_cal_state,
412 self._log_new_state]),
413 State(
"failed", enter=[self._update_cal_state,
414 self._log_new_state])
417 super().__init__(self.default_states, initial_state)
420 self.calibration = calibration
423 self.calibration.machine = self
425 self.iteration = iteration
427 self.collector_backend =
None
429 self._algorithm_results = {}
431 self._runner_final_state =
None
433 self.iov_to_calibrate = iov_to_calibrate
435 self.root_dir = Path(os.getcwd(), calibration.name)
440 self._collector_timing = {}
443 self._collector_jobs = {}
445 self.add_transition(
"submit_collector",
"init",
"running_collector",
446 conditions=self.dependencies_completed,
447 before=[self._make_output_dir,
448 self._resolve_file_paths,
449 self._build_iov_dicts,
450 self._create_collector_jobs,
451 self._submit_collections,
452 self._dump_job_config])
453 self.add_transition(
"fail",
"running_collector",
"collector_failed",
454 conditions=self._collection_failed)
455 self.add_transition(
"complete",
"running_collector",
"collector_completed",
456 conditions=self._collection_completed)
457 self.add_transition(
"run_algorithms",
"collector_completed",
"running_algorithms",
458 before=self._check_valid_collector_output,
459 after=[self._run_algorithms,
460 self.automatic_transition])
461 self.add_transition(
"complete",
"running_algorithms",
"algorithms_completed",
462 after=self.automatic_transition,
463 conditions=self._runner_not_failed)
464 self.add_transition(
"fail",
"running_algorithms",
"algorithms_failed",
465 conditions=self._runner_failed)
466 self.add_transition(
"iterate",
"algorithms_completed",
"init",
467 conditions=[self._require_iteration,
468 self._below_max_iterations],
469 after=self._increment_iteration)
470 self.add_transition(
"finish",
"algorithms_completed",
"completed",
471 conditions=self._no_require_iteration,
472 before=self._prepare_final_db)
473 self.add_transition(
"fail_fully",
"algorithms_failed",
"failed")
474 self.add_transition(
"fail_fully",
"collector_failed",
"failed")
def _update_cal_state(self, **kwargs):
    """
    State-entry callback: copy the new state, as a string, onto ``self.calibration.state``.

    Expects the ``new_state`` keyword argument.
    """
    entered = kwargs["new_state"]
    self.calibration.state = str(entered)
def files_containing_iov(self, file_paths, files_to_iovs, iov):
    """
    Select the files from ``file_paths`` whose IoV overlaps ``iov``.

    Parameters:
        file_paths: Container of candidate file paths (only membership is tested).
        files_to_iovs (dict): Maps each file path to an object with an ``overlaps(iov)`` method.
        iov: The IoV to test against.

    Returns:
        set: The file paths that are in ``file_paths`` and whose IoV overlaps ``iov``.
    """
    return {
        path
        for path, path_iov in files_to_iovs.items()
        if path_iov.overlaps(iov) and (path in file_paths)
    }
492 def _dump_job_config(self):
494 Dumps the `Job` object for the collections to JSON files so that it
's configuration can be recovered later in case of failure.
498 while any(map(
lambda j: j.status ==
"init", self._collector_jobs.values())):
499 B2DEBUG(29,
"Some Collector Jobs still in 'init' state. Waiting...")
502 for collection_name, job
in self._collector_jobs.items():
503 collector_job_output_file_name = self.calibration.collections[collection_name].job_config
504 output_file = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir,
505 collection_name, collector_job_output_file_name)
506 job.dump_to_json(output_file)
def _recover_collector_jobs(self):
    """
    Rebuild the collector `Job` objects from their JSON files, for use when the
    machine is started from a reset and ``self._collector_jobs`` is empty.

    One file per collection is read from
    ``<root_dir>/<iteration>/<collector_input_dir>/<collection_name>/<collection.job_config>``.
    """
    iteration_dir = str(self.iteration)
    for collection_name, collection in self.calibration.collections.items():
        config_file = self.root_dir.joinpath(
            iteration_dir,
            self.collector_input_dir,
            collection_name,
            collection.job_config,
        )
        self._collector_jobs[collection_name] = Job.from_json(config_file)
def _iov_requested(self):
    """
    Report (and log at debug level) whether an overall IoV was requested for this calibration.

    Returns:
        bool: True if ``self.iov_to_calibrate`` is set (truthy), otherwise False.
    """
    requested = bool(self.iov_to_calibrate)
    if requested:
        B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
    else:
        B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
    return requested
529 def _resolve_file_paths(self):
def _build_iov_dicts(self):
    """
    Build IoV file dictionary for each collection if required.

    A mapping is only needed when an overall IoV was requested or when the calibration
    has ignored runs. Collections that already carry a ``files_to_iovs`` mapping keep it;
    for the others the mapping is filled by calling ``get_iov_from_file`` on every input file.
    """
    iov_requested = self._iov_requested()
    if not (iov_requested or self.calibration.ignored_runs):
        B2INFO("No File to IoV mapping required.")
        return

    for coll_name, collection in self.calibration.collections.items():
        if collection.files_to_iovs:
            B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
                   f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
            continue
        # The closing quote after the calibration name was missing from this message.
        B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
               f" Calibration '{self.calibration.name}' and Collection '{coll_name}'."
               " Filling dictionary from input file metadata."
               " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
        collection.files_to_iovs = {
            file_path: get_iov_from_file(file_path)
            for file_path in collection.input_files
        }
def _below_max_iterations(self):
    """
    Condition: is the current iteration number still below ``calibration.max_iterations``?
    """
    iteration_limit = self.calibration.max_iterations
    return self.iteration < iteration_limit
562 def _increment_iteration(self):
566 self.calibration.iteration = self.iteration
def _collection_completed(self):
    """
    Did all the collections succeed?

    Returns:
        bool: True only when the collector jobs are ready and every one of them has
        status "completed"; False otherwise (including while jobs are not ready yet).
    """
    # The message used to say "failed", copied from the sibling failure check.
    B2DEBUG(29, "Checking for completed collector jobs.")
    if not self._collector_jobs_ready():
        return False
    return all(job.status == "completed" for job in self._collector_jobs.values())
def _collection_failed(self):
    """
    Did any of the collections fail?

    Returns:
        bool: True only when the collector jobs are ready and at least one of them has
        status "failed"; False otherwise (including while jobs are not ready yet,
        where the method previously fell through and returned None).
    """
    B2DEBUG(29, "Checking for failed collector job.")
    if not self._collector_jobs_ready():
        return False
    return any(job.status == "failed" for job in self._collector_jobs.values())
def _runner_not_failed(self):
    """
    Condition: negation of ``_runner_failed``.

    Returns:
        bool: If AlgorithmsRunner succeeded return True.
    """
    runner_failed = self._runner_failed()
    return not runner_failed
def _runner_failed(self):
    """
    Condition: did the AlgorithmsRunner end in its FAILED state?

    Returns:
        bool: If AlgorithmsRunner failed return True.
    """
    failed = self._runner_final_state == AlgorithmsRunner.FAILED
    return True if failed else False
601 def _collector_jobs_ready(self):
604 since_last_update = time.time() - self._collector_timing["last_update"]
605 if since_last_update > self.calibration.collector_full_update_interval:
606 B2INFO(
"Updating full collector job statuses.")
607 for job
in self._collector_jobs.values():
609 self._collector_timing[
"last_update"] = time.time()
611 num_completed = sum((subjob.status
in subjob.exit_statuses)
for subjob
in job.subjobs.values())
612 total_subjobs = len(job.subjobs)
613 B2INFO(f
"{num_completed}/{total_subjobs} Collector SubJobs finished in"
614 f
" Calibration {self.calibration.name} Job {job.name}.")
615 return all([job.ready()
for job
in self._collector_jobs.values()])
def _submit_collections(self):
    """
    Queue every collector job on ``self.calibration.jobs_to_submit`` and record the
    current time as both the "start" and "last_update" collector timestamps.
    """
    pending_jobs = list(self._collector_jobs.values())
    self.calibration.jobs_to_submit.extend(pending_jobs)
    timing = self._collector_timing
    timing["start"] = time.time()
    timing["last_update"] = time.time()
def _no_require_iteration(self):
    """
    Condition for finishing: True when no further iteration will be run.

    That is the case when no algorithm result asks for an iteration, or when an
    iteration is asked for but the maximum number of iterations has been reached
    (which is logged).

    Returns:
        bool: False only if an iteration is required and still allowed.
    """
    # Evaluate the (looping) requirement check once instead of once per branch.
    if not self._require_iteration():
        return True
    if self._below_max_iterations():
        return False
    B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
    return True
def _require_iteration(self):
    """
    Condition: did any algorithm result of the current iteration request another iteration?

    Returns:
        bool: True if at least one result stored in
        ``self._algorithm_results[self.iteration]`` equals ``CalibrationAlgorithm.c_Iterate``.
    """
    results_by_algorithm = self._algorithm_results[self.iteration]
    return any(
        result.result == CalibrationAlgorithm.c_Iterate
        for results in results_by_algorithm.values()
        for result in results
    )
def _log_new_state(self, **kwargs):
    """
    State-entry callback: log the name of the state the machine has just moved to.

    Expects the ``new_state`` keyword argument to be an object with a ``name`` attribute.
    """
    calibration_name = self.calibration.name
    state_name = kwargs['new_state'].name
    B2INFO(f"Calibration Machine {calibration_name} moved to state {state_name}.")
def dependencies_completed(self):
    """
    Condition function: have all calibrations we depend on reached their end state?

    Only the explicit entries of ``self.calibration.dependencies`` are checked.

    Returns:
        bool: True if every dependency has ``state == end_state`` (also when there are
        no dependencies), otherwise False.
    """
    return all(
        dependency.state == dependency.end_state
        for dependency in self.calibration.dependencies
    )
664 def automatic_transition(self):
666 Automatically try all transitions out of this state once. Tries fail last.
668 possible_transitions = self.get_transitions(self.state)
669 for transition
in possible_transitions:
671 if transition !=
"fail":
672 getattr(self, transition)()
674 except ConditionError:
677 if "fail" in possible_transitions:
678 getattr(self,
"fail")()
680 raise MachineError(f
"Failed to automatically transition out of {self.state} state.")
def _make_output_dir(self):
    """
    Create ``self.root_dir``, the overall root directory of the Calibration.

    ``create_directories`` is called with ``overwrite=False``.
    """
    root_directory = self.root_dir
    create_directories(root_directory, overwrite=False)
def _make_collector_path(self, name, collection):
    """
    Build a basf2 path containing only the collection's collector module and pickle its
    serialized form to ``<root_dir>/<iteration>/<collector_input_dir>/<name>/<collector name>.path``.

    Returns:
        str: Absolute path of the file that was written.
    """
    target_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
    create_directories(target_dir)
    serialized_file = target_dir / (collection.collector.name() + '.path')

    collector_path = create_path()
    collector_path.add_module(collection.collector)

    with open(serialized_file, 'bw') as out_file:
        pickle.dump(serialize_path(collector_path), out_file)

    return str(serialized_file.absolute())
def _make_pre_collector_path(self, name, collection):
    """
    Pickle the serialized form of the collection's setup path (``Collection.pre_collector_path``)
    to ``<root_dir>/<iteration>/<collector_input_dir>/<name>/pre_collector.path``.

    Returns:
        str: Path of the file that was written.
    """
    target_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
    serialized_file = os.path.join(target_dir, 'pre_collector.path')

    with open(serialized_file, 'bw') as out_file:
        pickle.dump(serialize_path(collection.pre_collector_path), out_file)

    return serialized_file
723 def _create_collector_jobs(self):
725 Creates a Job object for the collections of this iteration, ready
for submission
728 for collection_name, collection
in self.calibration.collections.items():
729 iteration_dir = self.root_dir.joinpath(str(self.iteration))
730 job = Job(
'_'.join([self.calibration.name, collection_name,
'Iteration', str(self.iteration)]))
731 job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
732 job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
734 if job.output_dir.exists():
735 B2INFO(f
"Previous output directory for {self.calibration.name} collector {collection_name} exists."
736 f
"Deleting {job.output_dir} before re-submitting.")
737 shutil.rmtree(job.output_dir)
738 job.cmd = collection.job_cmd
739 job.append_current_basf2_setup_cmds()
740 job.input_sandbox_files.append(collection.job_script)
741 collector_path_file = Path(self._make_collector_path(collection_name, collection))
742 job.input_sandbox_files.append(collector_path_file)
743 if collection.pre_collector_path:
744 pre_collector_path_file = Path(self._make_pre_collector_path(collection_name, collection))
745 job.input_sandbox_files.append(pre_collector_path_file)
748 list_dependent_databases = []
752 for dependency
in self.calibration.dependencies:
753 database_dir = os.path.join(os.getcwd(), dependency.name,
'outputdb')
754 B2INFO(f
"Adding local database from {dependency.name} for use by {self.calibration.name}.")
755 list_dependent_databases.append((os.path.join(database_dir,
'database.txt'), database_dir))
758 if self.iteration > 0:
759 previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
760 database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir,
'outputdb')
761 list_dependent_databases.append((os.path.join(database_dir,
'database.txt'), database_dir))
762 B2INFO(f
"Adding local database from previous iteration of {self.calibration.name}.")
766 input_data_directory = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, collection_name)
773 for database
in collection.database_chain:
774 if database.db_type ==
'local':
775 json_db_chain.append((
'local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
776 elif database.db_type ==
'central':
777 json_db_chain.append((
'central', database.global_tag))
779 raise ValueError(f
"Unknown database type {database.db_type}.")
781 for database
in list_dependent_databases:
782 json_db_chain.append((
'local', database))
783 job_config[
'database_chain'] = json_db_chain
785 job_config_file_path = input_data_directory.joinpath(
'collector_config.json').absolute()
786 with open(job_config_file_path,
'w')
as job_config_file:
787 json.dump(job_config, job_config_file, indent=2)
788 job.input_sandbox_files.append(job_config_file_path)
791 input_data_files = set(collection.input_files)
793 if self.iov_to_calibrate:
794 input_data_files = self.files_containing_iov(input_data_files,
795 collection.files_to_iovs,
796 self.iov_to_calibrate)
798 files_to_ignore = set()
799 for exprun
in self.calibration.ignored_runs:
800 for input_file
in input_data_files:
801 file_iov = self.calibration.files_to_iovs[input_file]
802 if file_iov == exprun.make_iov():
803 B2INFO(f
"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
804 f
"Therefore the input file '{input_file}' from Collection '{collection_name}' "
805 "is being removed from input files list.")
806 files_to_ignore.add(input_file)
807 input_data_files.difference_update(files_to_ignore)
809 if not input_data_files:
810 raise MachineError(f
"No valid input files for Calibration '{self.calibration.name}' "
811 f
" and Collection '{collection_name}'.")
812 job.input_files = list(input_data_files)
814 job.splitter = collection.splitter
815 job.backend_args = collection.backend_args
817 job.output_patterns = collection.output_patterns
818 B2DEBUG(20, f
"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
819 self._collector_jobs[collection_name] = job
821 def _check_valid_collector_output(self):
822 B2INFO(
"Checking that Collector output exists for all collector jobs "
823 f
"using {self.calibration.name}.output_patterns.")
824 if not self._collector_jobs:
825 B2INFO(
"We're restarting so we'll recreate the collector Job object.")
826 self._recover_collector_jobs()
828 for job
in self._collector_jobs.values():
831 for pattern
in job.output_patterns:
832 output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
834 raise MachineError(
"No output files from Collector Job")
836 for subjob
in job.subjobs.values():
838 for pattern
in subjob.output_patterns:
839 output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
841 raise MachineError(f
"No output files from Collector {subjob}")
843 def _run_algorithms(self):
845 Runs the Calibration Algorithms for this calibration machine.
847 Will run them sequentially locally (possible benefits to using a
848 processing pool
for low memory algorithms later on.)
851 algs_runner = self.calibration.algorithms_runner(name=self.calibration.name)
852 algs_runner.algorithms = self.calibration.algorithms
853 algorithm_output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
854 output_database_dir = algorithm_output_dir.joinpath(
"outputdb")
856 if algorithm_output_dir.exists():
857 B2INFO(f
"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
858 f
"Deleting and recreating {algorithm_output_dir}.")
859 create_directories(algorithm_output_dir)
860 B2INFO(f
"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
861 algs_runner.output_database_dir = output_database_dir
862 algs_runner.output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
865 for job
in self._collector_jobs.values():
867 for subjob
in job.subjobs.values():
868 for pattern
in subjob.output_patterns:
869 input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
871 for pattern
in job.output_patterns:
872 input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
874 algs_runner.input_files = input_files
877 algs_runner.database_chain = self.calibration.database_chain
881 list_dependent_databases = []
882 for dependency
in self.calibration.dependencies:
883 database_dir = os.path.join(os.getcwd(), dependency.name,
'outputdb')
884 B2INFO(f
"Adding local database from {dependency.name} for use by {self.calibration.name}.")
885 list_dependent_databases.append((os.path.join(database_dir,
'database.txt'), database_dir))
888 if self.iteration > 0:
889 previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
890 database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir,
'outputdb')
891 list_dependent_databases.append((os.path.join(database_dir,
'database.txt'), database_dir))
892 B2INFO(f
"Adding local database from previous iteration of {self.calibration.name}.")
893 algs_runner.dependent_databases = list_dependent_databases
895 algs_runner.ignored_runs = self.calibration.ignored_runs
898 algs_runner.run(self.iov_to_calibrate, self.iteration)
899 except Exception
as err:
903 self._state = State(
"algorithms_failed")
904 self._algorithm_results[self.iteration] = algs_runner.results
905 self._runner_final_state = algs_runner.final_state
def _prepare_final_db(self):
    """
    Copy the last iteration's ``outputdb`` directory to ``<root_dir>/outputdb`` so it is
    easier to find. An ``outputdb`` already present there is removed first.
    """
    iteration_db = self.root_dir / str(self.iteration) / self.calibration.alg_output_dir / 'outputdb'
    final_db = self.root_dir / 'outputdb'
    if final_db.exists():
        B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
        shutil.rmtree(final_db)
    shutil.copytree(iteration_db, final_db)
921class AlgorithmMachine(Machine):
923 A state machine to handle the logic of running the algorithm on the overall runs contained in the data.
928 required_attrs = ["algorithm",
929 "dependent_databases",
932 "output_database_dir",
937 required_true_attrs = [
"algorithm",
939 "output_database_dir",
943 def __init__(self, algorithm=None, initial_state="init"):
945 Takes an Algorithm object from the caf framework
and defines the transitions.
948 self.default_states = [State("init"),
950 State(
"running_algorithm"),
954 super().__init__(self.default_states, initial_state)
957 self.algorithm = algorithm
959 self.input_files = []
961 self.dependent_databases = []
964 self.database_chain = []
968 self.output_database_dir =
""
972 self.add_transition(
"setup_algorithm",
"init",
"ready",
973 before=[self._setup_logging,
974 self._change_working_dir,
975 self._setup_database_chain,
976 self._set_input_data,
977 self._pre_algorithm])
978 self.add_transition(
"execute_runs",
"ready",
"running_algorithm",
979 after=self._execute_over_iov)
980 self.add_transition(
"complete",
"running_algorithm",
"completed")
981 self.add_transition(
"fail",
"running_algorithm",
"failed")
982 self.add_transition(
"fail",
"ready",
"failed")
983 self.add_transition(
"setup_algorithm",
"completed",
"ready")
984 self.add_transition(
"setup_algorithm",
"failed",
"ready")
def setup_from_dict(self, params):
    """
    Parameters:
        params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.
    """
    for name_and_value in params.items():
        setattr(self, *name_and_value)
996 bool: Whether or not this machine has been set up correctly
with all its necessary attributes.
998 B2INFO(f"Checking validity of current setup of AlgorithmMachine for {self.algorithm.name}.")
1000 for attribute_name
in self.required_attrs:
1001 if not hasattr(self, attribute_name):
1002 B2ERROR(f
"AlgorithmMachine attribute {attribute_name} doesn't exist.")
1005 for attribute_name
in self.required_true_attrs:
1006 if not getattr(self, attribute_name):
1007 B2ERROR(f
"AlgorithmMachine attribute {attribute_name} returned False.")
def _create_output_dir(self, **kwargs):
    """
    Create the working/output directory of the algorithm at ``self.output_dir``.

    ``create_directories`` is called with ``overwrite=True``.
    """
    output_path = Path(self.output_dir)
    create_directories(output_path, overwrite=True)
def _setup_database_chain(self, **kwargs):
    """
    Apply all databases in the correct order.

    The conditions configuration is reset first, then every entry of
    ``self.database_chain`` is prepended ('local' entries as testing payloads, 'central'
    entries as globaltags), followed by the local databases of dependent calibrations.
    Finally the output database directory is created and registered as the place where
    new payloads are saved.

    Raises:
        ValueError: If a database in the chain has a ``db_type`` other than
            'local' or 'central'.
    """
    b2conditions.reset()
    b2conditions.override_globaltags()

    for database in self.database_chain:
        if database.db_type == 'local':
            B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
                   f"for {self.algorithm.name}.")
            b2conditions.prepend_testing_payloads(database.filepath.as_posix())
        elif database.db_type == 'central':
            B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
                   f"for {self.algorithm.name}.")
            b2conditions.prepend_globaltag(database.global_tag)
        else:
            raise ValueError(f"Unknown database type {database.db_type}.")

    # Each entry is a (database file, payload directory) pair; only the file is needed here.
    for filename, _directory in self.dependent_databases:
        B2INFO(f"Adding Local Database {filename} to head of chain of local databases created by"
               f" a dependent calibration, for {self.algorithm.name}.")
        b2conditions.prepend_testing_payloads(filename)

    # `output_database_dir` may be a plain string (its default is ""), so convert it once
    # and use the Path for both directory creation and the database file location.
    output_database_dir = Path(self.output_database_dir)
    create_directories(output_database_dir, overwrite=False)

    B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
    b2conditions.expert_settings(save_payloads=str(output_database_dir.joinpath("database.txt")))
1055 def _setup_logging(self, **kwargs):
1059 log_file = os.path.join(self.output_dir, self.algorithm.name +
'_stdout')
1060 B2INFO(f
"Output log file at {log_file}.")
1062 basf2.set_log_level(basf2.LogLevel.INFO)
1063 basf2.log_to_file(log_file)
def _change_working_dir(self, **kwargs):
    """
    Log the move and change the process working directory to ``self.output_dir``.
    """
    target_dir = self.output_dir
    B2INFO(f"Changing current working directory to {target_dir}.")
    os.chdir(target_dir)
1071 def _pre_algorithm(self, **kwargs):
1073 Call the user defined algorithm setup function.
1075 B2INFO("Running Pre-Algorithm function (if exists)")
1076 if self.algorithm.pre_algorithm:
1079 self.algorithm.pre_algorithm(self.algorithm.algorithm, kwargs[
"iteration"])
def _execute_over_iov(self, **kwargs):
    """
    Does the actual execute of the algorithm on an IoV and records the result.

    Keyword arguments:
        runs: The runs passed to the algorithm's ``execute``.
        apply_iov: IoV used to label the payloads. When empty/None an IoV is derived
            from ``runs`` with ``iov_from_runs``.
        iteration: Iteration number passed to the algorithm's ``execute``.

    The outcome is stored in ``self.result`` as an ``IoV_Result``.
    """
    B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")

    runs_to_execute = kwargs["runs"]
    iov = kwargs["apply_iov"]
    iteration = kwargs["iteration"]
    # Only fall back to the run-derived IoV when the caller did not request one;
    # otherwise the requested IoV would be silently discarded.
    if not iov:
        iov = iov_from_runs(runs_to_execute)
    B2INFO(f"Execution will use {iov} for labelling payloads by default.")
    alg_result = self.algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
    self.result = IoV_Result(iov, alg_result)
def _set_input_data(self, **kwargs):
    """
    Pass ``self.input_files`` to the algorithm through its ``data_input`` method.
    """
    files = self.input_files
    self.algorithm.data_input(files)
class MachineError(Exception):
    """
    Root exception type for the state machines defined in this module.
    """
class ConditionError(MachineError):
    """
    Raised when a transition's conditions are not satisfied.
    """
class TransitionError(MachineError):
    """
    Raised when a transition itself fails.
    """