import glob
import json
import os
import pickle
import shutil
import time
from collections import defaultdict
from functools import partial
from pathlib import Path

import basf2
from basf2 import create_path
from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
from basf2 import conditions as b2conditions
from basf2.pickle_path import serialize_path

from ROOT.Belle2 import CalibrationAlgorithm

from caf.utils import create_directories
from caf.utils import method_dispatch
from caf.utils import iov_from_runs
from caf.utils import IoV_Result
from caf.utils import get_iov_from_file
from caf.backends import Job
from caf.runners import AlgorithmsRunner
class State:
    """
    Basic State object that can take enter and exit state methods and records
    the state of a machine.

    You should assign the self.on_enter or self.on_exit attributes to callback functions
    or lists of them, if you need them.
    """

    def __init__(self, name, enter=None, exit=None):
        """
        Initialise State with a name and optional lists of callbacks.

        Parameters:
            name (str): Name of the state. Used for equality, hashing and ``str()``.
            enter (callable or list/tuple of callables, optional): Callback(s) for entering the state.
            exit (callable or list/tuple of callables, optional): Callback(s) for exiting the state.
        """
        #: Name of the State
        self.name = name
        #: Callback list when entering state (assignment goes through the property setter below)
        self.on_enter = enter
        #: Callback list when exiting state (assignment goes through the property setter below)
        self.on_exit = exit

    @property
    def on_enter(self):
        """
        List of callbacks to run when the state is entered.
        """
        return self._on_enter

    @on_enter.setter
    def on_enter(self, callbacks):
        """
        Replace the enter callbacks with ``callbacks`` (a callable or a list/tuple of them).
        A falsy value leaves the list empty.
        """
        self._on_enter = []
        if callbacks:
            self._add_callbacks(callbacks, self._on_enter)

    @property
    def on_exit(self):
        """
        List of callbacks to run when the state is exited.
        """
        return self._on_exit

    @on_exit.setter
    def on_exit(self, callbacks):
        """
        Replace the exit callbacks with ``callbacks`` (a callable or a list/tuple of them).
        A falsy value leaves the list empty.
        """
        self._on_exit = []
        if callbacks:
            self._add_callbacks(callbacks, self._on_exit)

    def _add_callbacks(self, callback, attribute):
        """
        Append ``callback`` to the ``attribute`` list.

        ``callback`` may be a single callable or a list/tuple of callables. Anything that is
        not callable is reported with B2ERROR and skipped.
        """
        # Lists/tuples are unpacked element by element; anything else is a single candidate.
        candidates = callback if isinstance(callback, (list, tuple)) else (callback,)
        for function in candidates:
            if callable(function):
                attribute.append(function)
            else:
                B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")

    def __str__(self):
        """
        The state's name.
        """
        return self.name

    def __repr__(self):
        """
        Debug representation, e.g. ``State(name=init)``.
        """
        return f"State(name={self.name})"

    def __eq__(self, other):
        """
        A State equals a str with the same name or another State with the same name.
        Comparisons with any other type are delegated to Python (and so are unequal)
        instead of raising AttributeError.
        """
        if isinstance(other, str):
            return self.name == other
        if isinstance(other, State):
            return self.name == other.name
        return NotImplemented

    def __hash__(self):
        """
        Hash by name so that a State and its name string hash identically (consistent with ``__eq__``).
        """
        return hash(self.name)
class Machine():
    """
    Base class for a finite state machine wrapper.
    Implements the framework that a more complex machine can inherit from.

    Parameters:
        states (list[str or State]): A list of possible states of the machine.
        initial_state (str): The initial state of the machine. When left as ``"default_initial"``
            a state of that name is added and used.

    The `transitions` attribute is a dictionary of trigger name keys. Each value is a list of
    dictionaries holding the 'source' and 'dest' states and lists of 'conditions', 'before'
    and 'after' callbacks. A transition is valid if it goes from an allowed state to an
    allowed state. Conditions are optional callables that return True or False based on
    some state of the machine. Without conditions, a transition is always allowed.

    Condition/before/after callbacks are called with exactly the keyword arguments given to
    the trigger, so they should accept ``**kwargs``. It is basically impossible to determine
    which arguments to pass to which functions for a transition. Therefore every callback
    receives the whole dictionary of arguments, even if it doesn't need any. This also means
    that a trigger must be called with keyword arguments, e.g. ``machine.walk(speed=5)``.
    ``machine.walk(5)`` will not work.
    """

    def __init__(self, states=None, initial_state="default_initial"):
        """
        Basic Setup of states and initial_state.
        """
        #: Valid states for this machine, keyed by state name
        self.states = {}
        if states:
            for state in states:
                self.add_state(state)
        if initial_state != "default_initial":
            #: Initial state of the machine (a property, see below)
            self.initial_state = initial_state
        else:
            self.add_state(initial_state)
            #: Actual attribute holding initial state for this machine.
            # Use the registered State object so it is the same object as self.states[initial_state].
            self._initial_state = self.states[initial_state]

        #: Actual attribute holding the Current state
        self._state = self.initial_state
        #: Allowed transitions between states, keyed by trigger name
        self.transitions = defaultdict(list)

    def add_state(self, state, enter=None, exit=None):
        """
        Adds a single state to the list of possible ones.
        Should be a unique string or a State object with a unique name.
        Duplicates and objects of other types are reported with B2WARNING and ignored.
        """
        if isinstance(state, str):
            self.add_state(State(state, enter, exit))
        elif isinstance(state, State):
            if state.name not in self.states:
                self.states[state.name] = state
            else:
                B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
        else:
            B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")

    @property
    def initial_state(self):
        """
        The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
        """
        return self._initial_state

    @initial_state.setter
    def initial_state(self, state):
        """
        Set both the initial and the current state by name, without running any callbacks.

        Raises:
            KeyError: If ``state`` is not one of ``self.states``.
        """
        if state in self.states.keys():
            self._initial_state = self.states[state]
            #: Current state; set directly so that no enter/exit callbacks run
            self._state = self.states[state]
        else:
            raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")

    @property
    def state(self):
        """
        The current state of the machine. Actually a `property` decorator. It will call the exit method of the
        current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
        either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
        """
        return self._state

    @state.setter
    def state(self, state):
        """
        Move to ``state`` (a name or a State), running the current state's exit callbacks
        and then the new state's enter callbacks, each with ``prior_state``/``new_state`` kwargs.

        Raises:
            MachineError: If ``state`` is not one of ``self.states``.
        """
        state_name = state if isinstance(state, str) else state.name
        # Only the lookup is guarded: a KeyError raised by a callback must not be
        # misreported as an unknown state.
        try:
            new_state = self.states[state_name]
        except KeyError:
            raise MachineError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!") from None
        for callback in self.state.on_exit:
            callback(prior_state=self.state, new_state=new_state)
        for callback in new_state.on_enter:
            callback(prior_state=self.state, new_state=new_state)
        self._state = new_state

    @staticmethod
    def default_condition(**kwargs):
        """
        Method to always return True.
        """
        return True

    @staticmethod
    def _to_callback_list(callbacks, default):
        """
        Normalise None / a callable / a list, tuple or set of callables into a new list.
        Returns a copy of ``default`` when ``callbacks`` is falsy.
        """
        if not callbacks:
            return list(default)
        if isinstance(callbacks, (list, tuple, set)):
            return list(callbacks)
        return [callbacks]

    def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
        """
        Adds a single transition to the dictionary of possible ones.
        Trigger is the method name that begins the transition between the
        source state and the destination state.

        The condition is an optional function that returns True or False
        depending on the current state/input.

        Raises:
            KeyError: If ``source`` or ``dest`` is not one of ``self.states``.
        """
        transition_dict = {}
        try:
            transition_dict["source"] = self.states[source]
            transition_dict["dest"] = self.states[dest]
        except KeyError:
            B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
            raise
        transition_dict["conditions"] = self._to_callback_list(conditions, [Machine.default_condition])
        transition_dict["before"] = self._to_callback_list(before, [])
        transition_dict["after"] = self._to_callback_list(after, [])
        self.transitions[trigger].append(transition_dict)

    def __getattr__(self, name, **kwargs):
        """
        Allows us to create a new method for each trigger on the fly.
        If there is no trigger name in the machine to match, then the normal
        AttributeError is raised.
        """
        # __getattr__ only runs when normal lookup fails. Without these attributes (e.g. on an
        # instance created without __init__, as copy/pickle do), looking up self.state would
        # re-enter __getattr__ and recurse forever, so fail with a plain AttributeError instead.
        instance_attributes = vars(self)
        if "_state" not in instance_attributes or "transitions" not in instance_attributes:
            raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
        possible_transitions = self.get_transitions(self.state)
        if name not in possible_transitions:
            raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
        transition_dict = self.get_transition_dict(self.state, name)
        return partial(self._trigger, name, transition_dict, **kwargs)

    def _trigger(self, transition_name, transition_dict, **kwargs):
        """
        Runs the transition logic. Callbacks are evaluated in the order:
        conditions -> before -> <new state set here> -> after.

        Raises:
            ConditionError: If any condition returns a falsy value. Nothing else runs in that case.
        """
        dest = transition_dict["dest"]
        # all() short-circuits, so conditions after the first failing one are not called.
        if not all(self._callback(condition, **kwargs) for condition in transition_dict["conditions"]):
            raise ConditionError(f"Transition '{transition_name}' called for but one or more conditions "
                                 "evaluated False")
        for before_func in transition_dict["before"]:
            self._callback(before_func, **kwargs)
        # Assign through the `state` property so that exit/enter callbacks run.
        self.state = dest
        for after_func in transition_dict["after"]:
            self._callback(after_func, **kwargs)

    @staticmethod
    def _callback(func, **kwargs):
        """
        Calls a condition/before/after.. function using arguments passed (or not).
        """
        return func(**kwargs)

    def get_transitions(self, source):
        """
        Returns allowed transitions from a given state.
        """
        possible_transitions = []
        for transition, transition_dicts in self.transitions.items():
            for transition_dict in transition_dicts:
                if transition_dict["source"] == source:
                    possible_transitions.append(transition)
        return possible_transitions

    def get_transition_dict(self, state, transition):
        """
        Returns the transition dictionary for a state and transition out of it.

        Raises:
            KeyError: If no transition called ``transition`` leaves ``state``.
        """
        for transition_dict in self.transitions[transition]:
            if transition_dict["source"] == state:
                return transition_dict
        raise KeyError(f"No transition from state {state} with the name {transition}.")

    def save_graph(self, filename, graphname):
        """
        Does a simple dot file creation to visualise states and transitions.

        Parameters:
            filename (str): Path of the dot file to (over)write.
            graphname (str): Name used in the ``digraph`` header.
        """
        with open(filename, "w") as dotfile:
            dotfile.write(f"digraph {graphname} {{\n")
            for state in self.states.keys():
                dotfile.write(f'"{state}" [shape=ellipse, color=black]\n')
            for trigger, transition_dicts in self.transitions.items():
                for transition in transition_dicts:
                    dotfile.write(f'"{transition["source"].name}" -> "{transition["dest"].name}" '
                                  f'[label="{trigger}"]\n')
            dotfile.write("}\n")
class CalibrationMachine(Machine):
    """
    A state machine to handle `Calibration` objects and the flow of
    processing for them.
    """

    #: Directory name (inside each iteration directory) holding the collector input files
    collector_input_dir = 'collector_input'
    #: Directory name (inside each iteration directory) holding the collector output files
    collector_output_dir = 'collector_output'
    #: Algorithm output directory name (NOTE: the methods below use ``calibration.alg_output_dir`` instead)
    algorithm_output_dir = 'algorithm_output'

    def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
        """
        Takes a Calibration object from the caf framework and lets you
        set the initial state.

        Parameters:
            calibration: The `Calibration` this machine drives. Its ``machine`` attribute is set to this machine.
            iov_to_calibrate: Optional overall IoV. When set, only input files overlapping it are used.
            initial_state (str): Name of the state to start in.
            iteration (int): Iteration number to start from.
        """
        state_names = ("init", "running_collector", "collector_failed", "collector_completed",
                       "running_algorithms", "algorithms_failed", "algorithms_completed",
                       "completed", "failed")
        #: States that are defaults to the CalibrationMachine (could override later).
        #: Every state updates the Calibration's state and logs when it is entered.
        self.default_states = [State(state_name, enter=[self._update_cal_state, self._log_new_state])
                               for state_name in state_names]

        super().__init__(self.default_states, initial_state)

        #: Calibration object whose state we are modelling
        self.calibration = calibration
        # Let the calibration hold a reference to the machine controlling it
        self.calibration.machine = self
        #: Which iteration step are we in
        self.iteration = iteration
        #: Backend used for this calibration machine collector
        self.collector_backend = None
        #: Results of each iteration for all algorithms of this calibration, keyed by iteration
        self._algorithm_results = {}
        #: Final state of the algorithm runner for the current iteration
        self._runner_final_state = None
        #: IoV to be executed, currently will loop over all runs in IoV
        self.iov_to_calibrate = iov_to_calibrate
        #: root directory for this Calibration
        self.root_dir = Path(os.getcwd(), calibration.name)

        #: Times ("start", "last_update") used to throttle full collector job status updates
        self._collector_timing = {}

        #: The collector jobs used for submission, keyed by collection name
        self._collector_jobs = {}

        self.add_transition("submit_collector", "init", "running_collector",
                            conditions=self.dependencies_completed,
                            before=[self._make_output_dir,
                                    self._resolve_file_paths,
                                    self._build_iov_dicts,
                                    self._create_collector_jobs,
                                    self._submit_collections,
                                    self._dump_job_config])
        self.add_transition("fail", "running_collector", "collector_failed",
                            conditions=self._collection_failed)
        self.add_transition("complete", "running_collector", "collector_completed",
                            conditions=self._collection_completed)
        self.add_transition("run_algorithms", "collector_completed", "running_algorithms",
                            before=self._check_valid_collector_output,
                            after=[self._run_algorithms,
                                   self.automatic_transition])
        self.add_transition("complete", "running_algorithms", "algorithms_completed",
                            after=self.automatic_transition,
                            conditions=self._runner_not_failed)
        self.add_transition("fail", "running_algorithms", "algorithms_failed",
                            conditions=self._runner_failed)
        self.add_transition("iterate", "algorithms_completed", "init",
                            conditions=[self._require_iteration,
                                        self._below_max_iterations],
                            after=self._increment_iteration)
        self.add_transition("finish", "algorithms_completed", "completed",
                            conditions=self._no_require_iteration,
                            before=self._prepare_final_db)
        self.add_transition("fail_fully", "algorithms_failed", "failed")
        self.add_transition("fail_fully", "collector_failed", "failed")

    def _update_cal_state(self, **kwargs):
        """
        Enter callback: copy the new state's name onto the Calibration's ``state``.
        """
        self.calibration.state = str(kwargs["new_state"])

    def files_containing_iov(self, file_paths, files_to_iovs, iov):
        """
        Lookup function that returns all files from the file_paths that
        overlap with this IoV.

        Returns:
            set: The members of ``file_paths`` whose IoV in ``files_to_iovs`` overlaps ``iov``.
                Files absent from ``files_to_iovs`` are not returned.
        """
        return {file_path for file_path, file_iov in files_to_iovs.items()
                if file_path in file_paths and file_iov.overlaps(iov)}

    def _dump_job_config(self):
        """
        Dumps the `Job` object for the collections to JSON files so that its configuration
        can be recovered later in case of failure.
        """
        # Wait until no collector job is still in the 'init' status, so that submission has filled
        # in the job information before it is dumped. Polls every 5 seconds.
        while any(job.status == "init" for job in self._collector_jobs.values()):
            B2DEBUG(29, "Some Collector Jobs still in 'init' state. Waiting...")
            time.sleep(5)

        for collection_name, job in self._collector_jobs.items():
            collector_job_output_file_name = self.calibration.collections[collection_name].job_config
            output_file = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir,
                                                 collection_name, collector_job_output_file_name)
            job.dump_to_json(output_file)

    def _recover_collector_jobs(self):
        """
        Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.
        """
        for collection_name, collection in self.calibration.collections.items():
            output_file = self.root_dir.joinpath(str(self.iteration),
                                                 self.collector_input_dir,
                                                 collection_name,
                                                 collection.job_config)
            self._collector_jobs[collection_name] = Job.from_json(output_file)

    def _iov_requested(self):
        """
        Returns:
            bool: True if an overall IoV to calibrate was given, False otherwise (both logged at debug level).
        """
        if self.iov_to_calibrate:
            B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
            return True
        B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
        return False

    def _resolve_file_paths(self):
        """
        Currently a no-op.
        """
        pass

    def _build_iov_dicts(self):
        """
        Build IoV file dictionary for each collection if required.

        The mapping is only needed when an overall IoV is requested or runs are ignored. Collections that
        already have ``files_to_iovs`` set keep it. Otherwise the IoV of each input file is read from its metadata.
        """
        iov_requested = self._iov_requested()
        if not (iov_requested or self.calibration.ignored_runs):
            B2INFO("No File to IoV mapping required.")
            return
        for coll_name, collection in self.calibration.collections.items():
            if collection.files_to_iovs:
                B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
                       f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
                continue
            B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
                   f" Calibration '{self.calibration.name}' and Collection '{coll_name}'."
                   " Filling dictionary from input file metadata."
                   " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
            collection.files_to_iovs = {file_path: get_iov_from_file(file_path)
                                        for file_path in collection.input_files}

    def _below_max_iterations(self):
        """
        Returns:
            bool: True while the current iteration is below the Calibration's ``max_iterations``.
        """
        return self.iteration < self.calibration.max_iterations

    def _increment_iteration(self):
        """
        Advance to the next iteration and mirror the number onto the Calibration.
        """
        self.iteration += 1
        self.calibration.iteration = self.iteration

    def _collection_completed(self):
        """
        Did all the collections succeed?

        Returns:
            bool: False while any collector job is not ready, otherwise whether every job completed.
        """
        B2DEBUG(29, "Checking for completed collector jobs.")
        if not self._collector_jobs_ready():
            return False
        return all(job.status == "completed" for job in self._collector_jobs.values())

    def _collection_failed(self):
        """
        Did any of the collections fail?

        Returns:
            bool: False while any collector job is not ready, otherwise whether any job failed.
        """
        B2DEBUG(29, "Checking for failed collector job.")
        if not self._collector_jobs_ready():
            return False
        return any(job.status == "failed" for job in self._collector_jobs.values())

    def _runner_not_failed(self):
        """
        Returns:
            bool: If AlgorithmsRunner succeeded return True.
        """
        return not self._runner_failed()

    def _runner_failed(self):
        """
        Returns:
            bool: If AlgorithmsRunner failed return True.
        """
        return self._runner_final_state == AlgorithmsRunner.FAILED

    def _collector_jobs_ready(self):
        """
        Returns:
            bool: True if every collector job reports ``ready()``.

        At most once per ``calibration.collector_full_update_interval`` seconds, the statuses of all jobs are
        refreshed with ``update_status()``, and subjob progress is logged for jobs that have subjobs.
        """
        since_last_update = time.time() - self._collector_timing["last_update"]
        if since_last_update > self.calibration.collector_full_update_interval:
            B2INFO("Updating full collector job statuses.")
            for job in self._collector_jobs.values():
                job.update_status()
                self._collector_timing["last_update"] = time.time()
                # Only split jobs have subjob progress worth reporting
                if job.subjobs:
                    num_completed = sum((subjob.status in subjob.exit_statuses) for subjob in job.subjobs.values())
                    total_subjobs = len(job.subjobs)
                    B2INFO(f"{num_completed}/{total_subjobs} Collector SubJobs finished in"
                           f" Calibration {self.calibration.name} Job {job.name}.")
        return all(job.ready() for job in self._collector_jobs.values())

    def _submit_collections(self):
        """
        Queue the collector jobs on the Calibration's ``jobs_to_submit`` list and start the collector timers.
        """
        self.calibration.jobs_to_submit.extend(list(self._collector_jobs.values()))
        self._collector_timing["start"] = time.time()
        self._collector_timing["last_update"] = time.time()

    def _no_require_iteration(self):
        """
        Returns:
            bool: True if the calibration can finish. This happens when no algorithm asked for another iteration,
            or when one did but the maximum number of iterations has been reached (logged).
        """
        if not self._require_iteration():
            return True
        if self._below_max_iterations():
            return False
        B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
        return True

    def _require_iteration(self):
        """
        Returns:
            bool: True if any algorithm result of the current iteration is ``CalibrationAlgorithm.c_Iterate``.
        """
        return any(result.result == CalibrationAlgorithm.c_Iterate
                   for results in self._algorithm_results[self.iteration].values()
                   for result in results)

    def _log_new_state(self, **kwargs):
        """
        Enter callback: log the name of the state just entered.
        """
        B2INFO(f"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")

    def dependencies_completed(self):
        """
        Condition function to check that the dependencies of our calibration are in the 'completed' state.
        Technically only need to check explicit dependencies.

        Returns:
            bool: True if every dependency's ``state`` equals its ``end_state``.
        """
        return all(dependency.state == dependency.end_state for dependency in self.calibration.dependencies)

    def automatic_transition(self):
        """
        Automatically try all transitions out of this state once. Tries fail last.

        The first non-"fail" transition whose conditions pass is taken. If none succeeds, the
        "fail" transition is taken if one leaves this state.

        Raises:
            MachineError: If no transition out of the current state could be made.
        """
        possible_transitions = self.get_transitions(self.state)
        for transition in possible_transitions:
            if transition == "fail":
                continue
            try:
                getattr(self, transition)()
                break
            except ConditionError:
                continue
        else:
            if "fail" in possible_transitions:
                getattr(self, "fail")()
            else:
                raise MachineError(f"Failed to automatically transition out of {self.state} state.")

    def _make_output_dir(self):
        """
        Creates the overall root directory of the Calibration. Will not overwrite if it already exists.
        """
        create_directories(self.root_dir, overwrite=False)

    def _make_collector_path(self, name, collection):
        """
        Creates a basf2 path for the correct collector and serializes it in the
        <root_dir>/<iteration>/<collector_input_dir>/<name> directory.

        Returns:
            str: Absolute path of the pickled path file.
        """
        path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
        create_directories(path_output_dir)
        path_file_name = path_output_dir / (collection.collector.name() + '.path')
        coll_path = create_path()
        coll_path.add_module(collection.collector)
        with open(path_file_name, 'bw') as serialized_path_file:
            pickle.dump(serialize_path(coll_path), serialized_path_file)
        return str(path_file_name.absolute())

    def _make_pre_collector_path(self, name, collection):
        """
        Serializes the collector's setup path (Collection.pre_collector_path) as 'pre_collector.path' in the
        <root_dir>/<iteration>/<collector_input_dir>/<name> directory. That directory is not created here;
        `_make_collector_path` creates it first in `_create_collector_jobs`.

        Returns:
            str: Path of the pickled path file.
        """
        path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
        path_file_name = path_output_dir / 'pre_collector.path'
        with open(path_file_name, 'bw') as serialized_path_file:
            pickle.dump(serialize_path(collection.pre_collector_path), serialized_path_file)
        return str(path_file_name)

    def _list_dependent_databases(self):
        """
        Return the ``(database.txt path, database directory)`` pairs of the local databases this iteration
        depends on. These are the final 'outputdb' of each dependency, found under the current working
        directory, plus (from iteration 1 on) the previous iteration's algorithm output database.
        """
        list_dependent_databases = []
        for dependency in self.calibration.dependencies:
            database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
            B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
        if self.iteration > 0:
            previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
            database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
            B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
        return list_dependent_databases

    def _write_collector_config(self, collection, input_data_directory):
        """
        Write 'collector_config.json' (the collector's database chain) into ``input_data_directory``.
        The chain is the collection's own databases followed by the dependent local databases.

        Returns:
            Path: Absolute path of the written file.

        Raises:
            ValueError: If a database in the collection's chain has an unknown type.
        """
        list_dependent_databases = self._list_dependent_databases()
        json_db_chain = []
        for database in collection.database_chain:
            if database.db_type == 'local':
                json_db_chain.append(('local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
            elif database.db_type == 'central':
                json_db_chain.append(('central', database.global_tag))
            else:
                raise ValueError(f"Unknown database type {database.db_type}.")
        for database in list_dependent_databases:
            json_db_chain.append(('local', database))
        job_config = {'database_chain': json_db_chain}

        job_config_file_path = input_data_directory.joinpath('collector_config.json').absolute()
        with open(job_config_file_path, 'w') as job_config_file:
            json.dump(job_config, job_config_file, indent=2)
        return job_config_file_path

    def _select_input_files(self, collection_name, collection):
        """
        Return the input files of ``collection`` to use in this iteration. Files not overlapping the requested
        IoV (if any) are dropped, and so are files whose IoV is exactly one of the Calibration's ignored runs.

        Raises:
            MachineError: If no input files remain.
        """
        input_data_files = set(collection.input_files)
        if self.iov_to_calibrate:
            input_data_files = self.files_containing_iov(input_data_files,
                                                         collection.files_to_iovs,
                                                         self.iov_to_calibrate)
        files_to_ignore = set()
        for exprun in self.calibration.ignored_runs:
            ignored_iov = exprun.make_iov()
            for input_file in input_data_files:
                # Use this collection's own mapping (filled by _build_iov_dicts), not the default collection's.
                if collection.files_to_iovs[input_file] == ignored_iov:
                    B2INFO(f"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
                           f"Therefore the input file '{input_file}' from Collection '{collection_name}' "
                           "is being removed from input files list.")
                    files_to_ignore.add(input_file)
        input_data_files.difference_update(files_to_ignore)

        if not input_data_files:
            raise MachineError(f"No valid input files for Calibration '{self.calibration.name}' "
                               f"and Collection '{collection_name}'.")
        return input_data_files

    def _create_collector_jobs(self):
        """
        Creates a Job object for the collections of this iteration, ready for submission
        to backend. The jobs are stored in ``self._collector_jobs`` keyed by collection name.

        Raises:
            MachineError: If a collection has no valid input files.
            ValueError: If a collection's database chain contains an unknown database type.
        """
        for collection_name, collection in self.calibration.collections.items():
            iteration_dir = self.root_dir.joinpath(str(self.iteration))
            job = Job('_'.join([self.calibration.name, collection_name, 'Iteration', str(self.iteration)]))
            job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
            job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
            # Clear output left by a previous attempt before re-submitting
            if job.output_dir.exists():
                B2INFO(f"Previous output directory for {self.calibration.name} collector {collection_name} exists. "
                       f"Deleting {job.output_dir} before re-submitting.")
                shutil.rmtree(job.output_dir)
            job.cmd = collection.job_cmd
            job.append_current_basf2_setup_cmds()
            job.input_sandbox_files.append(collection.job_script)
            collector_path_file = Path(self._make_collector_path(collection_name, collection))
            job.input_sandbox_files.append(collector_path_file)
            if collection.pre_collector_path:
                pre_collector_path_file = Path(self._make_pre_collector_path(collection_name, collection))
                job.input_sandbox_files.append(pre_collector_path_file)

            input_data_directory = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, collection_name)
            job.input_sandbox_files.append(self._write_collector_config(collection, input_data_directory))

            job.input_files = list(self._select_input_files(collection_name, collection))
            job.splitter = collection.splitter
            job.backend_args = collection.backend_args
            job.output_patterns = collection.output_patterns
            B2DEBUG(20, f"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
            self._collector_jobs[collection_name] = job

    @staticmethod
    def _collector_output_files(job):
        """
        Return the files in ``job.output_dir`` that match any of ``job.output_patterns`` (for a job or a subjob).
        """
        output_files = []
        for pattern in job.output_patterns:
            output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
        return output_files

    def _check_valid_collector_output(self):
        """
        Check that every collector job (or each of its subjobs, if split) produced output files.
        The jobs are recovered from JSON first if we are restarting.

        Raises:
            MachineError: If a job or subjob has no output files.
        """
        B2INFO("Checking that Collector output exists for all collector jobs "
               f"using {self.calibration.name}.output_patterns.")
        if not self._collector_jobs:
            B2INFO("We're restarting so we'll recreate the collector Job object.")
            self._recover_collector_jobs()

        for job in self._collector_jobs.values():
            if not job.subjobs:
                if not self._collector_output_files(job):
                    raise MachineError("No output files from Collector Job")
            else:
                for subjob in job.subjobs.values():
                    if not self._collector_output_files(subjob):
                        raise MachineError(f"No output files from Collector {subjob}")

    def _run_algorithms(self):
        """
        Runs the Calibration Algorithms for this calibration machine.

        Will run them sequentially locally (possible benefits to using a
        processing pool for low memory algorithms later on.)
        """
        algs_runner = self.calibration.algorithms_runner(name=self.calibration.name)
        algs_runner.algorithms = self.calibration.algorithms
        algorithm_output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
        output_database_dir = algorithm_output_dir.joinpath("outputdb")
        if algorithm_output_dir.exists():
            B2INFO(f"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
                   f"Deleting and recreating {algorithm_output_dir}.")
        create_directories(algorithm_output_dir)
        B2INFO(f"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
        algs_runner.output_database_dir = output_database_dir
        algs_runner.output_dir = algorithm_output_dir

        # Collector output is in the subjob directories for split jobs, otherwise in the job's own.
        input_files = []
        for job in self._collector_jobs.values():
            sources = job.subjobs.values() if job.subjobs else [job]
            for source in sources:
                input_files.extend(self._collector_output_files(source))
        algs_runner.input_files = input_files

        # Add any user defined database chain for this calibration
        algs_runner.database_chain = self.calibration.database_chain
        algs_runner.dependent_databases = self._list_dependent_databases()
        algs_runner.ignored_runs = self.calibration.ignored_runs

        try:
            algs_runner.run(self.iov_to_calibrate, self.iteration)
        except Exception as err:
            B2ERROR(f"Running the algorithms of {self.calibration.name} raised an exception: {err}")
            # Force the failed state directly, without the 'fail' transition or its callbacks. The
            # automatic_transition that runs after this callback then leaves from 'algorithms_failed'.
            self._state = self.states["algorithms_failed"]
        self._algorithm_results[self.iteration] = algs_runner.results
        self._runner_final_state = algs_runner.final_state

    def _prepare_final_db(self):
        """
        Take the last iteration's outputdb and copy it to a more easily findable place
        (<root_dir>/outputdb), replacing any previous copy.
        """
        database_location = self.root_dir.joinpath(str(self.iteration),
                                                   self.calibration.alg_output_dir,
                                                   'outputdb')
        final_database_location = self.root_dir.joinpath('outputdb')
        if final_database_location.exists():
            B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
            shutil.rmtree(final_database_location)
        shutil.copytree(database_location, final_database_location)
class AlgorithmMachine(Machine):
    """
    A state machine to handle the logic of running the algorithm on the overall runs contained in the data.
    """

    #: Attributes that must exist before the machine can run (they may hold falsy values such as "" or [])
    required_attrs = ["algorithm",
                      "dependent_databases",
                      "database_chain",
                      "output_dir",
                      "output_database_dir",
                      "input_files"
                      ]

    #: Attributes that must hold a value that is truthy
    required_true_attrs = ["algorithm",
                           "output_dir",
                           "output_database_dir",
                           "input_files"
                           ]

    def __init__(self, algorithm=None, initial_state="init"):
        """
        Takes an Algorithm object from the caf framework and defines the transitions.
        """
        #: Default states for the AlgorithmMachine
        self.default_states = [State("init"),
                               State("ready"),
                               State("running_algorithm"),
                               State("completed"),
                               State("failed")]

        super().__init__(self.default_states, initial_state)

        #: Algorithm() object whose state we are modelling
        self.algorithm = algorithm
        #: Collector output files passed to the algorithm as input data
        self.input_files = []
        #: Local databases ``(database.txt path, directory)`` from calibrations this one depends on
        self.dependent_databases = []
        #: Database chain (local/central entries) applied before the dependent databases
        self.database_chain = []
        #: Algorithm working/output directory (log file location)
        self.output_dir = ""
        #: Output directory of the local database the algorithm writes payloads to (str or Path)
        self.output_database_dir = ""
        #: IoV_Result of the most recent execution
        self.result = None

        self.add_transition("setup_algorithm", "init", "ready",
                            before=[self._setup_logging,
                                    self._change_working_dir,
                                    self._setup_database_chain,
                                    self._set_input_data,
                                    self._pre_algorithm])
        self.add_transition("execute_runs", "ready", "running_algorithm",
                            after=self._execute_over_iov)
        self.add_transition("complete", "running_algorithm", "completed")
        self.add_transition("fail", "running_algorithm", "failed")
        self.add_transition("fail", "ready", "failed")
        self.add_transition("setup_algorithm", "completed", "ready")
        self.add_transition("setup_algorithm", "failed", "ready")

    def setup_from_dict(self, params):
        """
        Parameters:
            params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.
        """
        for attribute_name, value in params.items():
            setattr(self, attribute_name, value)

    def is_valid(self):
        """
        Returns:
            bool: Whether or not this machine has been set up correctly with all its necessary attributes.
            The first problem found is reported with B2ERROR.
        """
        B2INFO(f"Checking validity of current setup of AlgorithmMachine for {self.algorithm.name}.")
        for attribute_name in self.required_attrs:
            if not hasattr(self, attribute_name):
                B2ERROR(f"AlgorithmMachine attribute {attribute_name} doesn't exist.")
                return False
        for attribute_name in self.required_true_attrs:
            if not getattr(self, attribute_name):
                B2ERROR(f"AlgorithmMachine attribute {attribute_name} returned False.")
                return False
        return True

    def _create_output_dir(self, **kwargs):
        """
        Create working/output directory of algorithm. Any old directory is overwritten.
        """
        create_directories(Path(self.output_dir), overwrite=True)

    def _setup_database_chain(self, **kwargs):
        """
        Apply all databases in the correct order.

        The conditions configuration is reset and global tags are overridden with none. Then every entry of
        ``database_chain`` is prepended in order, then every dependent database. Payloads are saved to
        <output_database_dir>/database.txt.

        Raises:
            ValueError: If a database in ``database_chain`` has an unknown type.
        """
        b2conditions.reset()
        b2conditions.override_globaltags()

        for database in self.database_chain:
            if database.db_type == 'local':
                B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
                       f"for {self.algorithm.name}.")
                b2conditions.prepend_testing_payloads(database.filepath.as_posix())
            elif database.db_type == 'central':
                B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
                       f"for {self.algorithm.name}.")
                b2conditions.prepend_globaltag(database.global_tag)
            else:
                raise ValueError(f"Unknown database type {database.db_type}.")

        for filename, _ in self.dependent_databases:
            B2INFO(f"Adding Local Database {filename} to head of chain of local databases created by"
                   f" a dependent calibration, for {self.algorithm.name}.")
            b2conditions.prepend_testing_payloads(filename)

        # output_database_dir may be a str (it defaults to ""), so normalise once before using Path methods.
        output_database_dir = Path(self.output_database_dir)
        create_directories(output_database_dir, overwrite=False)
        B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
        b2conditions.expert_settings(save_payloads=str(output_database_dir.joinpath("database.txt")))

    def _setup_logging(self, **kwargs):
        """
        Log at INFO level to <output_dir>/<algorithm name>_stdout.
        """
        log_file = os.path.join(self.output_dir, self.algorithm.name + '_stdout')
        B2INFO(f"Output log file at {log_file}.")
        basf2.set_log_level(basf2.LogLevel.INFO)
        basf2.log_to_file(log_file)

    def _change_working_dir(self, **kwargs):
        """
        Change the process's current working directory to ``output_dir``.
        """
        B2INFO(f"Changing current working directory to {self.output_dir}.")
        os.chdir(self.output_dir)

    def _pre_algorithm(self, **kwargs):
        """
        Call the user defined algorithm setup function (if set) with the algorithm and ``kwargs["iteration"]``.
        """
        B2INFO("Running Pre-Algorithm function (if exists)")
        if self.algorithm.pre_algorithm:
            self.algorithm.pre_algorithm(self.algorithm.algorithm, kwargs["iteration"])

    def _execute_over_iov(self, **kwargs):
        """
        Does the actual execute of the algorithm on an IoV and records the result.

        Expects ``runs``, ``apply_iov`` and ``iteration`` keyword arguments. When ``apply_iov`` is falsy,
        the IoV used for labelling payloads is built from ``runs``.
        """
        B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")

        runs_to_execute = kwargs["runs"]
        iov = kwargs["apply_iov"]
        iteration = kwargs["iteration"]
        if not iov:
            iov = iov_from_runs(runs_to_execute)
            B2INFO(f"Execution will use {iov} for labelling payloads by default.")
        alg_result = self.algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
        self.result = IoV_Result(iov, alg_result)

    def _set_input_data(self, **kwargs):
        """
        Pass ``input_files`` to the algorithm as its input data.
        """
        self.algorithm.data_input(self.input_files)
class MachineError(Exception):
    """
    Root of the exception hierarchy raised by the state machines in this module.
    """
class ConditionError(MachineError):
    """
    Raised when a transition is triggered but at least one of its conditions evaluates False.
    """
class TransitionError(MachineError):
    """
    Raised when a transition between states cannot be carried out.
    """