16 from functools 
import partial
 
   17 from collections 
import defaultdict
 
   23 from pathlib 
import Path
 
   27 from basf2 
import create_path
 
   28 from basf2 
import B2DEBUG, B2ERROR, B2INFO, B2WARNING
 
   29 from basf2 
import conditions 
as b2conditions
 
   32 from ROOT.Belle2 
import CalibrationAlgorithm
 
   34 from caf.utils 
import create_directories
 
   35 from caf.utils 
import method_dispatch
 
   36 from caf.utils 
import iov_from_runs
 
   37 from caf.utils 
import IoV_Result
 
   38 from caf.utils 
import get_iov_from_file
 
   39 from caf.backends 
import Job
 
   40 from caf.runners 
import AlgorithmsRunner
 
   45     Basic State object that can take enter and exit state methods and records 
   46     the state of a machine. 
   48     You should assign the self.on_enter or self.on_exit attributes to callback functions 
   49     or lists of them, if you need them. 
          # NOTE(review): this looks like a lossy, Doxygen-style source listing. The
          # original line numbers are embedded at the start of lines. Missing from it:
          # the `class State` header, the property/setter/dispatch decorators,
          # docstring quotes, `else:` lines and several statements (most of __init__).
          # It cannot run as-is; apply real changes to the actual source file.
   52     def __init__(self, name, enter=None, exit=None):

   54         Initialise State with a name and optional lists of callbacks. 
              # name: identifier used for equality and hashing (see __eq__/__hash__ below).
              # enter/exit: optional callable or list/tuple of callables, run on entering
              # or leaving the state. Presumably stored via the on_enter/on_exit setters
              # (those assignment lines are not visible here).
   66         Runs callbacks when a state is entered. 
          # on_enter setter: registers `callbacks` into self._on_enter via _add_callbacks.
          # The lines that reset self._on_enter and guard empty input are not visible.
   71     def on_enter(self, callbacks):

   76             self._add_callbacks(callbacks, self._on_enter)

   81         Runs callbacks when a state is exited. 
          # on_exit setter: same as on_enter, but fills self._on_exit.
   86     def on_exit(self, callbacks):

   91             self._add_callbacks(callbacks, self._on_exit)

   94     def _add_callbacks(self, callback, attribute):

   96         Adds callback to a property. 
              # Single-callable case: append to the `attribute` list if callable.
              # Otherwise log via B2ERROR (the `else:` line is missing here); nothing raises.
              # NOTE(review): the `.register(tuple)` / `.register(list)` overloads below imply
              # a single-dispatch decorator on this method (presumably the imported
              # `method_dispatch`). That decorator is not visible in this listing.
   98         if callable(callback):

   99             attribute.append(callback)

  101             B2ERROR(f
"Something other than a function (callable) passed into State {self.name}.")

  103     @_add_callbacks.register(tuple) 
  104     @_add_callbacks.register(list) 
  105     def _(self, callbacks, attribute):

  107         Alternate method for lists and tuples of function objects. 
              # list/tuple case: append every callable element. Non-callables are logged
              # via B2ERROR and skipped, not raised. A guard line before the loop
              # (original line 109) is missing from this listing.
  110             for function 
in callbacks:

  111                 if callable(function):

  112                     attribute.append(function)

  114                     B2ERROR(f
"Something other than a function (callable) passed into State {self.name}.")

          # Presumably __repr__ (its def line is not visible), e.g. "State(name=init)".
  124         return f
"State(name={self.name})" 
          # Equal to a str with the same name, or to any object with an equal `.name`.
          # NOTE(review): an object with no `.name` makes this raise AttributeError;
          # returning NotImplemented for such objects would be safer.
  126     def __eq__(self, other):

  129         if isinstance(other, str):

  130             return self.name == other

  132             return self.name == other.name

          # Presumably __hash__ (its def line is not visible). Hashing by name keeps it
          # consistent with __eq__, so a State and its name string share a dict/set slot.
  137         return hash(self.name)
 
  143       states (list[str]): A list of possible states of the machine. 
  146     Base class for a final state machine wrapper. 
  147     Implements the framwork that a more complex machine can inherit from. 
  149     The `transitions` attribute is a dictionary of trigger name keys, each value of 
  150     which is another dictionary of 'source' states, 'dest' states, and 'conditions' 
  151     methods. 'conditions' should be a list of callables or a single one. A transition is 
  152     valid if it goes from an allowed state to an allowed state. 
  153     Conditions are optional but must be a callable that returns True or False based 
  154     on some state of the machine. They cannot have input arguments currently. 
  156     Every condition/before/after callback function MUST take ``**kwargs`` as the only 
  157     argument (except ``self`` if it's a class method). This is because it's basically 
  158     impossible to determine which arguments to pass to which functions for a transition. 
  159     Therefore this machine just enforces that every function should simply take ``**kwargs`` 
  160     and use the dictionary of arguments (even if it doesn't need any arguments). 
  162     This also means that if you call a trigger with arguments e.g. ``machine.walk(speed=5)`` 
  163     you MUST use the keyword arguments rather than positional ones. So ``machine.walk(5)`` 
          # NOTE(review): lossy, Doxygen-style listing. The `class Machine` header,
          # decorators (@property, @<name>.setter, @staticmethod), docstring quotes,
          # try/else lines and some statements are missing, so it cannot run as-is.
          # Docstring typos to fix in the real source:
          #   "final state machine" -> "finite state machine"
          #   "framwork" -> "framework"
          #   "transtion" -> "transition"
          #   "transiitons" -> "transitions"
  167     def __init__(self, states=None, initial_state="default_initial"):

  169         Basic Setup of states and initial_state. 
              # Registers each entry of `states` (the loop header and the creation of
              # self.states are not visible), then chooses the initial state.
              # "default_initial" is a sentinel meaning "not given". In that branch (its
              # `else:` line is missing) the sentinel name is added as a state and stored
              # directly in _initial_state.
              # Assigning _state rather than `state` avoids running enter/exit callbacks.
  175                 self.add_state(state)

  176         if initial_state != 
"default_initial":

  178             self.initial_state = initial_state

  180             self.add_state(initial_state)

  182             self._initial_state = State(initial_state)

  185         self._state = self.initial_state

              # trigger name -> list of transition dicts (one per source state); see add_transition.
  187         self.transitions = defaultdict(list)

  189     def add_state(self, state, enter=None, exit=None):

  191         Adds a single state to the list of possible ones. 
  192         Should be a unique string or a State object with a unique name. 
              # str: wrapped in State(state, enter, exit) and re-dispatched.
              # State: stored under its name, unless that name already exists (warning only).
              # Anything else: warning only. Nothing is raised in any case.
  194         if isinstance(state, str):

  195             self.add_state(State(state, enter, exit))

  196         elif isinstance(state, State):

  197             if state.name 
not in self.states.keys():

  198                 self.states[state.name] = state

  200                 B2WARNING(f
"You asked to add a state {state} but it was already in the machine states.")

  202             B2WARNING(f
"You asked to add a state {state} but it wasn't a State or str object")

          # initial_state getter (its @property decorator is not visible).
  205     def initial_state(self):

  207         The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set. 
  209         return self._initial_state

  211     @initial_state.setter 
  212     def initial_state(self, state):

              # `state` must be a name present in self.states. Sets both _initial_state and
              # the current _state directly, so no enter/exit callbacks run. Unknown names
              # raise KeyError (the `else:` line is not visible).
  215         if state 
in self.states.keys():

  216             self._initial_state = self.states[state]

  218             self._state = self.states[state]

  220             raise KeyError(f
"Attempted to set state to '{state}' which is not in the 'states' attribute!")

  225                 The current state of the machine. Actually a `property` decorator. It will call the exit method of the 
  226                 current state and enter method of the new one. To get around the behaviour e.g. for setting initial states, 
  227                 either use the `initial_state` property or directly set the _state attribute itself (at your own risk!). 
  232     def state(self, state):

              # Accepts a state name or a State-like object with `.name`.
              # Runs the current state's on_exit callbacks, then the new state's on_enter
              # callbacks, each called with prior_state=<current> and new_state=<new>.
              # Not visible here: the assignment of the new state, and the presumed
              # try/except that turns an unknown name into MachineError.
              # NOTE(review): the error message below reads "which not in" (missing "is").
  235         if isinstance(state, str):

  238             state_name = state.name

  241             state = self.states[state_name]

  243             for callback 
in self.state.on_exit:

  244                 callback(prior_state=self.state, new_state=state)

  246             for callback 
in state.on_enter:

  247                 callback(prior_state=self.state, new_state=state)

  251             raise MachineError(f
"Attempted to set state to '{state}' which not in the 'states' attribute!")

  254     def default_condition(**kwargs):

  256         Method to always return True. 
              # Presumably a @staticmethod (no self). add_transition uses it as the default
              # condition. Its return line is not visible in this listing.
  260     def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):

  262         Adds a single transition to the dictionary of possible ones. 
  263         Trigger is the method name that begins the transtion between the 
  264         source state and the destination state. 
  266         The condition is an optional function that returns True or False 
  267         depending on the current state/input. 
              # Builds one transition dict and appends it to self.transitions[trigger]:
              #   "source"/"dest": State objects looked up by name in self.states
              #   "conditions": list of callables, default [Machine.default_condition]
              #   "before"/"after": lists of callbacks; a single callable is wrapped in a
              #     list (how None is handled is not visible)
              # NOTE(review): for an unknown source/dest only a warning is visible. What
              # happens with `err` afterwards (e.g. a re-raise) is not shown in this listing.
  271             source = self.states[source]

  272             dest = self.states[dest]

  273             transition_dict[
"source"] = source

  274             transition_dict[
"dest"] = dest

  275         except KeyError 
as err:

  276             B2WARNING(
"Tried to add a transition where the source or dest isn't in the list of states")

  279             if isinstance(conditions, (list, tuple, set)):

  280                 transition_dict[
"conditions"] = list(conditions)

  282                 transition_dict[
"conditions"] = [conditions]

  284             transition_dict[
"conditions"] = [Machine.default_condition]

  288         if isinstance(before, (list, tuple, set)):

  289             transition_dict[
"before"] = list(before)

  291             transition_dict[
"before"] = [before]

  295         if isinstance(after, (list, tuple, set)):

  296             transition_dict[
"after"] = list(after)

  298             transition_dict[
"after"] = [after]

  300         self.transitions[trigger].append(transition_dict)

  302     def __getattr__(self, name, **kwargs):

  304         Allows us to create a new method for each trigger on the fly. 
  305         If there is no trigger name in the machine to match, then the normal 
  306         AttributeError is called. 
              # Called only for attributes not found normally. If `name` is a trigger valid
              # from the current state, returns a partial of _trigger, so that
              # `machine.<trigger>(**kw)` runs that transition. Otherwise raises AttributeError.
              # NOTE(review): Python calls __getattr__ with just the name, so `**kwargs` is
              # always empty here; trigger arguments arrive when the partial is called.
  308         possible_transitions = self.get_transitions(self.state)

  309         if name 
not in possible_transitions:

  310             raise AttributeError(f
"{name} does not exist in transitions for state {self.state}.")

  311         transition_dict = self.get_transition_dict(self.state, name)

  312         return partial(self._trigger, name, transition_dict, **kwargs)

  314     def _trigger(self, transition_name, transition_dict, **kwargs):

  316         Runs the transition logic. Callbacks are evaluated in the order: 
  317         conditions -> before -> <new state set here> -> after. 
              # all(map(...)) short-circuits: conditions after the first falsy one are not
              # evaluated. The lines that set the new state (original ~329-330) are missing.
              # If any condition fails, ConditionError is raised; its message continues past
              # the visible text.
  319         dest, conditions, before_callbacks, after_callbacks = (

  320             transition_dict[
"dest"],

  321             transition_dict[
"conditions"],

  322             transition_dict[
"before"],

  323             transition_dict[
"after"]

  326         if all(map(
lambda condition: self._callback(condition, **kwargs), conditions)):

  327             for before_func 
in before_callbacks:

  328                 self._callback(before_func, **kwargs)

  331             for after_func 
in after_callbacks:

  332                 self._callback(after_func, **kwargs)

  334             raise ConditionError(f
"Transition '{transition_name}' called for but one or more conditions " 
  338     def _callback(func, **kwargs):

  340         Calls a condition/before/after.. function using arguments passed (or not). 
              # Presumably a @staticmethod (no self parameter; the decorator is not visible).
  342         return func(**kwargs)

  344     def get_transitions(self, source):

  346         Returns allowed transitions from a given state. 
              # Returns trigger names (not dicts) that have at least one transition whose
              # "source" equals `source`. A trigger appears once per matching dict.
  348         possible_transitions = []

  349         for transition, transition_dicts 
in self.transitions.items():

  350             for transition_dict 
in transition_dicts:

  351                 if transition_dict[
"source"] == source:

  352                     possible_transitions.append(transition)

  353         return possible_transitions

  355     def get_transition_dict(self, state, transition):

  357         Returns the transition dictionary for a state and transition out of it. 
              # Returns the first transition dict for `transition` whose source matches
              # `state`. The raise below is presumably the `else:` of a for/else (that
              # `else:` line is missing), i.e. it runs only when no dict matched.
              # NOTE(review): self.transitions is a defaultdict(list), so an unknown
              # `transition` name silently adds an empty entry before KeyError is raised.
  359         transition_dicts = self.transitions[transition]

  360         for transition_dict 
in transition_dicts:

  361             if transition_dict[
"source"] == state:

  362                 return transition_dict

  364             raise KeyError(f
"No transition from state {state} with the name {transition}.")

  366     def save_graph(self, filename, graphname):

  368         Does a simple dot file creation to visualise states and transiitons. 
              # Writes a Graphviz "dot" digraph: one ellipse node per state name and one
              # labelled edge per transition dict. The closing "}" write is not visible.
              # NOTE(review): names are not escaped, so a '"' in a state or trigger name
              # would produce invalid dot output.
  370         with open(filename, 
"w") 
as dotfile:

  371             dotfile.write(
"digraph " + graphname + 
" {\n")

  372             for state 
in self.states.keys():

  373                 dotfile.write(
'"' + state + 
'" [shape=ellipse, color=black]\n')

  374             for trigger, transition_dicts 
in self.transitions.items():

  375                 for transition 
in transition_dicts:

  376                     dotfile.write(
'"' + transition[
"source"].name + 
'" -> "' +

  377                                   transition[
"dest"].name + 
'" [label="' + trigger + 
'"]\n')
 
  381 class CalibrationMachine(Machine):

  383     A state machine to handle `Calibration` objects and the flow of 
          # NOTE(review): lossy listing; the docstring quotes and most of the class
          # docstring are missing.
          # Class-level sub-directory names. The methods of this class join
          # collector_input_dir and collector_output_dir under <root_dir>/<iteration>/.
          # NOTE(review): algorithm_output_dir is not referenced by the visible methods,
          # which use self.calibration.alg_output_dir instead.
  387     collector_input_dir = 
'collector_input' 
  388     collector_output_dir = 
'collector_output' 
  389     algorithm_output_dir = 
'algorithm_output' 
  391     def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):

  393         Takes a Calibration object from the caf framework and lets you 
  394         set the initial state. 
              # Every default state runs _update_cal_state (mirrors the state onto the
              # Calibration) and _log_new_state when entered.
  397         self.default_states = [State(
"init", enter=[self._update_cal_state,

  398                                                     self._log_new_state]),

  399                                State(
"running_collector", enter=[self._update_cal_state,

  400                                                                  self._log_new_state]),

  401                                State(
"collector_failed", enter=[self._update_cal_state,

  402                                                                 self._log_new_state]),

  403                                State(
"collector_completed", enter=[self._update_cal_state,

  404                                                                    self._log_new_state]),

  405                                State(
"running_algorithms", enter=[self._update_cal_state,

  406                                                                   self._log_new_state]),

  407                                State(
"algorithms_failed", enter=[self._update_cal_state,

  408                                                                  self._log_new_state]),

  409                                State(
"algorithms_completed", enter=[self._update_cal_state,

  410                                                                     self._log_new_state]),

  411                                State(
"completed", enter=[self._update_cal_state,

  412                                                          self._log_new_state]),

  413                                State(
"failed", enter=[self._update_cal_state,

  414                                                       self._log_new_state])

  417         super().__init__(self.default_states, initial_state)

  420         self.calibration = calibration

              # Back-reference so the Calibration can reach the machine driving it.
  423         self.calibration.machine = self

  425         self.iteration = iteration

  427         self.collector_backend = 
None 
  429         self._algorithm_results = {}

  431         self._runner_final_state = 
None 
  433         self.iov_to_calibrate = iov_to_calibrate

              # Working directory for this calibration: <cwd>/<calibration.name>.
  435         self.root_dir = Path(os.getcwd(), calibration.name)

  440         self._collector_timing = {}

  443         self._collector_jobs = {}

              # Transition graph:
              #   init --submit_collector--> running_collector
              #   running_collector --complete/fail--> collector_completed / collector_failed
              #   collector_completed --run_algorithms--> running_algorithms
              #   running_algorithms --complete/fail--> algorithms_completed / algorithms_failed
              #   algorithms_completed --iterate--> init, or --finish--> completed
              #   algorithms_failed / collector_failed --fail_fully--> failed
              # run_algorithms and the algorithms "complete" transition call
              # automatic_transition afterwards, so those steps advance without an
              # explicit trigger call.
  445         self.add_transition(
"submit_collector", 
"init", 
"running_collector",

  446                             conditions=self.dependencies_completed,

  447                             before=[self._make_output_dir,

  448                                     self._resolve_file_paths,

  449                                     self._build_iov_dicts,

  450                                     self._create_collector_jobs,

  451                                     self._submit_collections,

  452                                     self._dump_job_config])

  453         self.add_transition(
"fail", 
"running_collector", 
"collector_failed",

  454                             conditions=self._collection_failed)

  455         self.add_transition(
"complete", 
"running_collector", 
"collector_completed",

  456                             conditions=self._collection_completed)

  457         self.add_transition(
"run_algorithms", 
"collector_completed", 
"running_algorithms",

  458                             before=self._check_valid_collector_output,

  459                             after=[self._run_algorithms,

  460                                    self.automatic_transition])

  461         self.add_transition(
"complete", 
"running_algorithms", 
"algorithms_completed",

  462                             after=self.automatic_transition,

  463                             conditions=self._runner_not_failed)

  464         self.add_transition(
"fail", 
"running_algorithms", 
"algorithms_failed",

  465                             conditions=self._runner_failed)

  466         self.add_transition(
"iterate", 
"algorithms_completed", 
"init",

  467                             conditions=[self._require_iteration,

  468                                         self._below_max_iterations],

  469                             after=self._increment_iteration)

  470         self.add_transition(
"finish", 
"algorithms_completed", 
"completed",

  471                             conditions=self._no_require_iteration,

  472                             before=self._prepare_final_db)

  473         self.add_transition(
"fail_fully", 
"algorithms_failed", 
"failed")

  474         self.add_transition(
"fail_fully", 
"collector_failed", 
"failed")
 
  476     def _update_cal_state(self, **kwargs):
              # Enter-callback: stores the string form of the new State on the Calibration.
              # `new_state` is supplied by the Machine's state setter.

  477         self.calibration.state = str(kwargs[
"new_state"])
 
  479     def files_containing_iov(self, file_paths, files_to_iovs, iov):

  481         Lookup function that returns all files from the file_paths that 
  482         overlap with this IoV. 
              # Returns a set: the members of `file_paths` whose IoV in `files_to_iovs`
              # overlaps `iov`. Paths absent from files_to_iovs are never returned.
              # NOTE(review): `file_path in file_paths` is evaluated for every entry; pass a
              # set for O(1) membership tests.
  485         overlapping_files = set()

  487         for file_path, file_iov 
in files_to_iovs.items():

  488             if file_iov.overlaps(iov) 
and (file_path 
in file_paths):

  489                 overlapping_files.add(file_path)

  490         return overlapping_files
 
  492     def _dump_job_config(self):

  494         Dumps the `Job` object for the collections to JSON files so that it's configuration can be recovered 
  495         later in case of failure. 
              # Waits until no collector Job is still in "init" status. Then writes each Job
              # as JSON to <root_dir>/<iteration>/<collector_input_dir>/<collection>/<job_config>.
              # NOTE(review): the visible wait-loop body only logs. The delay between polls
              # (original lines 501-502) is missing from this listing; confirm a sleep exists,
              # otherwise this loop spins.
              # Docstring typo: "it's configuration" -> "its configuration".
  499         while any(map(
lambda j: j.status == 
"init", self._collector_jobs.values())):

  500             B2DEBUG(29, 
"Some Collector Jobs still in 'init' state. Waiting...")

  503         for collection_name, job 
in self._collector_jobs.items():

  504             collector_job_output_file_name = self.calibration.collections[collection_name].job_config

  505             output_file = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir,

  506                                                  collection_name, collector_job_output_file_name)

  507             job.dump_to_json(output_file)
 
  509     def _recover_collector_jobs(self):

  511         Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset. 
              # Reloads each collection's Job for the current iteration with Job.from_json,
              # reading the files that _dump_job_config writes.
              # One path component (original line 516, presumably the collection name) is
              # missing from this listing.
  513         for collection_name, collection 
in self.calibration.collections.items():

  514             output_file = self.root_dir.joinpath(str(self.iteration),

  515                                                  self.collector_input_dir,

  517                                                  collection.job_config)

  518             self._collector_jobs[collection_name] = Job.from_json(output_file)
 
  520     def _iov_requested(self):
              # Logs whether an overall IoV (self.iov_to_calibrate) was requested.
              # Its return statements are not visible here; _build_iov_dicts uses the
              # result as a boolean.

  523         if self.iov_to_calibrate:

  524             B2DEBUG(20, f
"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")

  527             B2DEBUG(20, f
"No overall IoV requested for calibration: {self.calibration.name}.")
 
  530     def _resolve_file_paths(self):
              # NOTE(review): the body (original lines 531-534) is not visible in this listing.
              # Registered as a `before` callback of the "submit_collector" transition.
 
  535     def _build_iov_dicts(self):

  537         Build IoV file dictionary for each collection if required. 
              # Runs only when an overall IoV is requested or some runs are ignored.
              # For each collection without files_to_iovs, builds the mapping by reading
              # every input file's IoV (get_iov_from_file). Otherwise it reuses the
              # user-supplied mapping.
              # Missing from this listing: the dict creation (original line 548) and the
              # `else:` lines.
              # NOTE(review): the first log message has an unbalanced quote
              # ("'{self.calibration.name} and ...").
  539         iov_requested = self._iov_requested()

  540         if iov_requested 
or self.calibration.ignored_runs:

  541             for coll_name, collection 
in self.calibration.collections.items():

  542                 if not collection.files_to_iovs:

  543                     B2INFO(
"Creating IoV dictionaries to map files to (Exp,Run) ranges for" 
  544                            f
" Calibration '{self.calibration.name} and Collection '{coll_name}'." 
  545                            " Filling dictionary from input file metadata." 
  546                            " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")

  549                     for file_path 
in collection.input_files:

  550                         files_to_iovs[file_path] = get_iov_from_file(file_path)

  551                     collection.files_to_iovs = files_to_iovs

  553                     B2INFO(
"Using File to IoV mapping from 'files_to_iovs' attribute for " 
  554                            f
"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")

  556             B2INFO(
"No File to IoV mapping required.")
 
  558     def _below_max_iterations(self):
              # Condition for "iterate": True while iteration < calibration.max_iterations.

  561         return self.iteration < self.calibration.max_iterations
 
  563     def _increment_iteration(self):
              # After-callback of "iterate": copies self.iteration onto calibration.iteration.
              # The increment itself is presumably in original lines 564-566, which are
              # not visible here.

  567         self.calibration.iteration = self.iteration
 
  569     def _collection_completed(self):

  571         Did all the collections succeed? 
              # Condition for "complete" from running_collector. True only once every
              # collector Job is ready and all have status "completed". While jobs are
              # still running it falls through and returns None (falsy).
              # NOTE(review): the debug message says "failed" even though this checks for
              # completion (copied from _collection_failed).
  573         B2DEBUG(29, 
"Checking for failed collector job.")

  574         if self._collector_jobs_ready():

  575             return all([job.status == 
"completed" for job 
in self._collector_jobs.values()])
 
  577     def _collection_failed(self):

  579         Did any of the collections fail? 
              # Condition for "fail" from running_collector. True once all Jobs are ready
              # and any has status "failed". Returns None (falsy) while jobs are still running.
  581         B2DEBUG(29, 
"Checking for failed collector job.")

  582         if self._collector_jobs_ready():

  583             return any([job.status == 
"failed" for job 
in self._collector_jobs.values()])
 
  585     def _runner_not_failed(self):
              # Condition for "complete" from running_algorithms: the negation of _runner_failed.

  588             bool: If AlgorithmsRunner succeeded return True. 
  590         return not self._runner_failed()
 
  592     def _runner_failed(self):
              # Condition for "fail" from running_algorithms: compares the stored
              # AlgorithmsRunner final state with AlgorithmsRunner.FAILED.
              # The return lines (original 598-600) are missing from this listing.

  595             bool: If AlgorithmsRunner failed return True. 
  597         if self._runner_final_state == AlgorithmsRunner.FAILED:
 
  602     def _collector_jobs_ready(self):
              # Returns True when every collector Job reports ready().
              # If more than calibration.collector_full_update_interval seconds have passed
              # since "last_update", it also logs per-Job subjob progress. Two lines in that
              # block are missing from this listing: original 609 (presumably a status
              # refresh) and 611 (presumably a subjob guard).
              # NOTE(review): "last_update" is reset inside the per-job loop; setting it once
              # after the loop would be equivalent and clearer.

  605         since_last_update = time.time() - self._collector_timing[
"last_update"]

  606         if since_last_update > self.calibration.collector_full_update_interval:

  607             B2INFO(
"Updating full collector job statuses.")

  608             for job 
in self._collector_jobs.values():

  610                 self._collector_timing[
"last_update"] = time.time()

  612                     num_completed = sum((subjob.status 
in subjob.exit_statuses) 
for subjob 
in job.subjobs.values())

  613                     total_subjobs = len(job.subjobs)

  614                     B2INFO(f
"{num_completed}/{total_subjobs} Collector SubJobs finished in" 
  615                            f
" Calibration {self.calibration.name} Job {job.name}.")

  616         return all([job.ready() 
for job 
in self._collector_jobs.values()])
 
  618     def _submit_collections(self):
              # Queues all collector Jobs on calibration.jobs_to_submit (the actual
              # submission presumably happens elsewhere). Records the "start" and
              # "last_update" timestamps that _collector_jobs_ready reads.

  621         self.calibration.jobs_to_submit.extend(list(self._collector_jobs.values()))

  622         self._collector_timing[
"start"] = time.time()

  623         self._collector_timing[
"last_update"] = time.time()
 
  625     def _no_require_iteration(self):
              # Condition for "finish". The three branches cover:
              #   - iteration requested and still allowed;
              #   - iteration requested but max_iterations reached (logged);
              #   - iteration not requested.
              # Their return statements are missing from this listing.
              # NOTE(review): _require_iteration() may be evaluated up to three times; caching
              # it in a local would avoid rescanning the algorithm results.

  628         if self._require_iteration() 
and self._below_max_iterations():

  630         elif self._require_iteration() 
and not self._below_max_iterations():

  631             B2INFO(f
"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")

  633         elif not self._require_iteration():
 
  636     def _require_iteration(self):
              # True if any algorithm result of the current iteration asked for another
              # iteration (result == CalibrationAlgorithm.c_Iterate).
              # self._algorithm_results[self.iteration] maps (presumably) algorithm name ->
              # list of results.
              # NOTE(review): alg_name is unused; iterating .values() and stopping at the
              # first c_Iterate would be simpler. Original lines 644-646 are missing, so
              # confirm nothing else happens in the loop.

  639         iteration_called = 
False 
  640         for alg_name, results 
in self._algorithm_results[self.iteration].items():

  641             for result 
in results:

  642                 if result.result == CalibrationAlgorithm.c_Iterate:

  643                     iteration_called = 
True 
  647         return iteration_called
 
  649     def _log_new_state(self, **kwargs):
              # Enter-callback: logs the name of the State passed as `new_state`.

  652         B2INFO(f
"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")
 
  654     def dependencies_completed(self):

  656         Condition function to check that the dependencies of our calibration are in the 'completed' state. 
  657         Technically only need to check explicit dependencies. 
              # Condition for "submit_collector": each dependency Calibration must be in its
              # end_state. The return statements are missing from this listing.
              # NOTE(review): the docstring says 'completed', but the check compares with
              # each dependency's `end_state`.
  659         for calibration 
in self.calibration.dependencies:

  660             if not calibration.state == calibration.end_state:
 
  665     def automatic_transition(self):

  667         Automatically try all transitions out of this state once. Tries fail last. 
              # Tries every non-"fail" trigger from the current state; a ConditionError moves
              # on to the next one. If none succeeds, it presumably falls back to "fail"
              # when that trigger exists, and otherwise raises MachineError.
              # The try/break/continue/else lines implementing this are missing here.
  669         possible_transitions = self.get_transitions(self.state)

  670         for transition 
in possible_transitions:

  672                 if transition != 
"fail":

  673                     getattr(self, transition)()

  675             except ConditionError:

  678             if "fail" in possible_transitions:

  679                 getattr(self, 
"fail")()

  681                 raise MachineError(f
"Failed to automatically transition out of {self.state} state.")
 
  683     def _make_output_dir(self):

  685         Creates the overall root directory of the Calibration. Will not overwrite if it already exists. 
              # First `before` callback of "submit_collector". overwrite=False keeps an
              # existing root_dir rather than recreating it.
  688         create_directories(self.root_dir, overwrite=
False)
 
  690     def _make_collector_path(self, name, collection):

  692         Creates a basf2 path for the correct collector and serializes it in the 
  693         self.output_dir/<calibration_name>/<iteration>/paths directory 
              # Writes the pickled serialize_path(...) output of a one-module path holding
              # collection.collector to
              #   <root_dir>/<iteration>/<collector_input_dir>/<name>/<collector name>.path
              # and returns that file's absolute path as a str.
              # NOTE(review): the docstring's ".../paths directory" does not match the
              # collector_input_dir location the code uses.
              # NOTE(review): pickle files should only ever be loaded from trusted locations.
  695         path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)

  697         create_directories(path_output_dir)

  698         path_file_name = collection.collector.name() + 
'.path' 
  699         path_file_name = path_output_dir / path_file_name

  701         coll_path = create_path()

  702         coll_path.add_module(collection.collector)

  704         with open(path_file_name, 
'bw') 
as serialized_path_file:

  705             pickle.dump(serialize_path(coll_path), serialized_path_file)

  707         return str(path_file_name.absolute())
 
  709     def _make_pre_collector_path(self, name, collection):

  711         Creates a basf2 path for the collectors setup path (Collection.pre_collector_path) and serializes it in the 
  712         self.output_dir/<calibration_name>/<iteration>/<colector_output>/<name> directory. 
              # Pickles serialize_path(collection.pre_collector_path) to
              #   <root_dir>/<iteration>/<collector_input_dir>/<name>/pre_collector.path
              # and returns that path as a str (the os.path.join result).
              # NOTE(review): this method does not create path_output_dir itself. It relies on
              # _make_collector_path having created it first, which is the call order in
              # _create_collector_jobs.
              # NOTE(review): the docstring says <colector_output>, but the code writes into
              # collector_input_dir.
  714         path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)

  715         coll_path = collection.pre_collector_path

  716         path_file_name = 
'pre_collector.path' 
  717         path_file_name = os.path.join(path_output_dir, path_file_name)

  719         with open(path_file_name, 
'bw') 
as serialized_path_file:

  720             pickle.dump(serialize_path(coll_path), serialized_path_file)

  722         return path_file_name
 
  724     def _create_collector_jobs(self):

  726         Creates a Job object for the collections of this iteration, ready for submission 
              # For each collection this method:
              #   - builds a Job named <calibration>_<collection>_Iteration_<n>;
              #   - deletes any previous output directory;
              #   - adds the basf2 setup commands, the job script and the pickled paths to
              #     the input sandbox;
              #   - writes collector_config.json containing the database chain;
              #   - selects input files (IoV filter, ignored runs);
              #   - stores the Job in self._collector_jobs.
              # The creation of job_config / json_db_chain (original lines 768-773) is
              # missing from this listing.
  729         for collection_name, collection 
in self.calibration.collections.items():

  730             iteration_dir = self.root_dir.joinpath(str(self.iteration))

  731             job = Job(
'_'.join([self.calibration.name, collection_name, 
'Iteration', str(self.iteration)]))

  732             job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)

  733             job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)

                  # Stale output from an earlier attempt is removed before re-submitting.
  735             if job.output_dir.exists():

  736                 B2INFO(f
"Previous output directory for {self.calibration.name} collector {collection_name} exists." 
  737                        f
"Deleting {job.output_dir} before re-submitting.")

  738                 shutil.rmtree(job.output_dir)

  739             job.cmd = collection.job_cmd

  740             job.append_current_basf2_setup_cmds()

  741             job.input_sandbox_files.append(collection.job_script)

                  # _make_collector_path must run before _make_pre_collector_path: it creates
                  # the directory that both files are written into.
  742             collector_path_file = Path(self._make_collector_path(collection_name, collection))

  743             job.input_sandbox_files.append(collector_path_file)

  744             if collection.pre_collector_path:

  745                 pre_collector_path_file = Path(self._make_pre_collector_path(collection_name, collection))

  746                 job.input_sandbox_files.append(pre_collector_path_file)

                  # Extra local databases (database.txt, payload dir), appended after the
                  # collection's own chain below. Order: each dependency's outputdb first,
                  # then this calibration's previous-iteration outputdb (when iteration > 0).
  749             list_dependent_databases = []

  753             for dependency 
in self.calibration.dependencies:

  754                 database_dir = os.path.join(os.getcwd(), dependency.name, 
'outputdb')

  755                 B2INFO(f
"Adding local database from {dependency.name} for use by {self.calibration.name}.")

  756                 list_dependent_databases.append((os.path.join(database_dir, 
'database.txt'), database_dir))

  759             if self.iteration > 0:

  760                 previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))

  761                 database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 
'outputdb')

  762                 list_dependent_databases.append((os.path.join(database_dir, 
'database.txt'), database_dir))

  763                 B2INFO(f
"Adding local database from previous iteration of {self.calibration.name}.")

  767             input_data_directory = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, collection_name)

                  # JSON-serialisable database chain: ('local', (filepath, payload_dir)) or
                  # ('central', global_tag). Any other db_type raises ValueError.
  774             for database 
in collection.database_chain:

  775                 if database.db_type == 
'local':

  776                     json_db_chain.append((
'local', (database.filepath.as_posix(), database.payload_dir.as_posix())))

  777                 elif database.db_type == 
'central':

  778                     json_db_chain.append((
'central', database.global_tag))

  780                     raise ValueError(f
"Unknown database type {database.db_type}.")

  782             for database 
in list_dependent_databases:

  783                 json_db_chain.append((
'local', database))

  784             job_config[
'database_chain'] = json_db_chain

  786             job_config_file_path = input_data_directory.joinpath(
'collector_config.json').absolute()

  787             with open(job_config_file_path, 
'w') 
as job_config_file:

  788                 json.dump(job_config, job_config_file, indent=2)

  789             job.input_sandbox_files.append(job_config_file_path)

  792             input_data_files = set(collection.input_files)

  794             if self.iov_to_calibrate:

  795                 input_data_files = self.files_containing_iov(input_data_files,

  796                                                              collection.files_to_iovs,

  797                                                              self.iov_to_calibrate)

                  # NOTE(review): ignored runs are looked up in self.calibration.files_to_iovs,
                  # while the IoV filter above uses collection.files_to_iovs (the mapping
                  # _build_iov_dicts fills per collection). Confirm the two agree for
                  # non-default collections.
                  # NOTE(review): a file is dropped only if its IoV *equals* the ignored run's
                  # IoV; multi-run files that contain the ignored run are kept.
  799             files_to_ignore = set()

  800             for exprun 
in self.calibration.ignored_runs:

  801                 for input_file 
in input_data_files:

  802                     file_iov = self.calibration.files_to_iovs[input_file]

  803                     if file_iov == exprun.make_iov():

  804                         B2INFO(f
"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. " 
  805                                f
"Therefore the input file '{input_file}' from Collection '{collection_name}' " 
  806                                "is being removed from input files list.")

  807                         files_to_ignore.add(input_file)

  808             input_data_files.difference_update(files_to_ignore)

                  # NOTE(review): the two message parts below join with a double space
                  # ("' " + " and").
  810             if not input_data_files:

  811                 raise MachineError(f
"No valid input files for Calibration '{self.calibration.name}' " 
  812                                    f
" and Collection '{collection_name}'.")

  813             job.input_files = list(input_data_files)

  815             job.splitter = collection.splitter

  816             job.backend_args = collection.backend_args

  818             job.output_patterns = collection.output_patterns

  819             B2DEBUG(20, f
"Collector job for {self.calibration.name}:{collection_name}:\n{job}")

  820             self._collector_jobs[collection_name] = job
 
  822     def _check_valid_collector_output(self):
              # `before` callback of "run_algorithms". After a restart it first recovers the
              # Jobs from JSON. It then checks that each Job (or each of its subjobs) produced
              # at least one file matching its output_patterns, raising MachineError otherwise.
              # The guard/initialisation lines (presumably `if not job.subjobs:`,
              # `output_files = []`, `if not output_files:`) are missing from this listing.
              # NOTE(review): the log text misspells "colector" and mentions
              # "<calibration>.output_patterns", while the check uses each Job's patterns.

  823         B2INFO(
"Checking that Collector output exists for all colector jobs " 
  824                f
"using {self.calibration.name}.output_patterns.")

  825         if not self._collector_jobs:

  826             B2INFO(
"We're restarting so we'll recreate the collector Job object.")

  827             self._recover_collector_jobs()

  829         for job 
in self._collector_jobs.values():

  832                 for pattern 
in job.output_patterns:

  833                     output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))

  835                     raise MachineError(
"No output files from Collector Job")

  837                 for subjob 
in job.subjobs.values():

  839                     for pattern 
in subjob.output_patterns:

  840                         output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))

  842                         raise MachineError(f
"No output files from Collector {subjob}")
 
  844     def _run_algorithms(self):
 
  846         Runs the Calibration Algorithms for this calibration machine. 
  848         Will run them sequentially locally (possible benefits to using a 
  849         processing pool for low memory algorithms later on.) 
  852         algs_runner = self.calibration.algorithms_runner(name=self.calibration.name)
 
  853         algs_runner.algorithms = self.calibration.algorithms
 
  854         algorithm_output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
 
  855         output_database_dir = algorithm_output_dir.joinpath(
"outputdb")
 
  857         if algorithm_output_dir.exists():
 
  858             B2INFO(f
"Output directory for {self.calibration.name} already exists from a previous CAF attempt. " 
  859                    f
"Deleting and recreating {algorithm_output_dir}.")
 
  860         create_directories(algorithm_output_dir)
 
  861         B2INFO(f
"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
 
  862         algs_runner.output_database_dir = output_database_dir
 
  863         algs_runner.output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
 
  866         for job 
in self._collector_jobs.values():
 
  868                 for subjob 
in job.subjobs.values():
 
  869                     for pattern 
in subjob.output_patterns:
 
  870                         input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
 
  872                 for pattern 
in job.output_patterns:
 
  873                     input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
 
  875         algs_runner.input_files = input_files
 
  878         algs_runner.database_chain = self.calibration.database_chain
 
  882         list_dependent_databases = []
 
  883         for dependency 
in self.calibration.dependencies:
 
  884             database_dir = os.path.join(os.getcwd(), dependency.name, 
'outputdb')
 
  885             B2INFO(f
"Adding local database from {dependency.name} for use by {self.calibration.name}.")
 
  886             list_dependent_databases.append((os.path.join(database_dir, 
'database.txt'), database_dir))
 
  889         if self.iteration > 0:
 
  890             previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
 
  891             database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 
'outputdb')
 
  892             list_dependent_databases.append((os.path.join(database_dir, 
'database.txt'), database_dir))
 
  893             B2INFO(f
"Adding local database from previous iteration of {self.calibration.name}.")
 
  894         algs_runner.dependent_databases = list_dependent_databases
 
  896         algs_runner.ignored_runs = self.calibration.ignored_runs
 
  899             algs_runner.run(self.iov_to_calibrate, self.iteration)
 
  900         except Exception 
as err:
 
  904             self._state = State(
"algorithms_failed")
 
  905         self._algorithm_results[self.iteration] = algs_runner.results
 
  906         self._runner_final_state = algs_runner.final_state
 
  908     def _prepare_final_db(self):
 
  910         Take the last iteration's outputdb and copy it to a more easily findable place. 
  912         database_location = self.root_dir.joinpath(str(self.iteration),
 
  913                                                    self.calibration.alg_output_dir,
 
  915         final_database_location = self.root_dir.joinpath(
'outputdb')
 
  916         if final_database_location.exists():
 
  917             B2INFO(f
"Removing previous final output database for {self.calibration.name} before copying new one.")
 
  918             shutil.rmtree(final_database_location)
 
  919         shutil.copytree(database_location, final_database_location)
 
  922 class AlgorithmMachine(Machine):
 
  924     A state machine to handle the logic of running the algorithm on the overall runs contained in the data. 
  929     required_attrs = [
"algorithm",
 
  930                       "dependent_databases",
 
  933                       "output_database_dir",
 
  938     required_true_attrs = [
"algorithm",
 
  940                            "output_database_dir",
 
  944     def __init__(self, algorithm=None, initial_state="init"):
 
  946         Takes an Algorithm object from the caf framework and defines the transitions. 
  949         self.default_states = [State(
"init"),
 
  951                                State(
"running_algorithm"),
 
  955         super().__init__(self.default_states, initial_state)
 
  958         self.algorithm = algorithm
 
  960         self.input_files = []
 
  962         self.dependent_databases = []
 
  965         self.database_chain = []
 
  969         self.output_database_dir = 
"" 
  973         self.add_transition(
"setup_algorithm", 
"init", 
"ready",
 
  974                             before=[self._setup_logging,
 
  975                                     self._change_working_dir,
 
  976                                     self._setup_database_chain,
 
  977                                     self._set_input_data,
 
  978                                     self._pre_algorithm])
 
  979         self.add_transition(
"execute_runs", 
"ready", 
"running_algorithm",
 
  980                             after=self._execute_over_iov)
 
  981         self.add_transition(
"complete", 
"running_algorithm", 
"completed")
 
  982         self.add_transition(
"fail", 
"running_algorithm", 
"failed")
 
  983         self.add_transition(
"fail", 
"ready", 
"failed")
 
  984         self.add_transition(
"setup_algorithm", 
"completed", 
"ready")
 
  985         self.add_transition(
"setup_algorithm", 
"failed", 
"ready")
 
  987     def setup_from_dict(self, params):
 
  990             params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name. 
  992         for attribute_name, value 
in params.items():
 
  993             setattr(self, attribute_name, value)
 
  998             bool: Whether or not this machine has been set up correctly with all its necessary attributes. 
 1000         B2INFO(
"Checking validity of current setup of AlgorithmMachine for {}.".format(self.algorithm.name))
 
 1002         for attribute_name 
in self.required_attrs:
 
 1003             if not hasattr(self, attribute_name):
 
 1004                 B2ERROR(f
"AlgorithmMachine attribute {attribute_name} doesn't exist.")
 
 1007         for attribute_name 
in self.required_true_attrs:
 
 1008             if not getattr(self, attribute_name):
 
 1009                 B2ERROR(f
"AlgorithmMachine attribute {attribute_name} returned False.")
 
 1013     def _create_output_dir(self, **kwargs):
 
 1015         Create working/output directory of algorithm. Any old directory is overwritten. 
 1017         create_directories(Path(self.output_dir), overwrite=
True)
 
 1019     def _setup_database_chain(self, **kwargs):
 
 1021         Apply all databases in the correct order. 
 1025         b2conditions.reset()
 
 1026         b2conditions.override_globaltags()
 
 1029         for database 
in self.database_chain:
 
 1030             if database.db_type == 
'local':
 
 1031                 B2INFO(f
"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, " 
 1032                        f
"for {self.algorithm.name}.")
 
 1033                 b2conditions.prepend_testing_payloads(database.filepath.as_posix())
 
 1034             elif database.db_type == 
'central':
 
 1035                 B2INFO(f
"Adding Central database tag {database.global_tag} to head of GT chain, " 
 1036                        f
"for {self.algorithm.name}.")
 
 1037                 b2conditions.prepend_globaltag(database.global_tag)
 
 1039                 raise ValueError(f
"Unknown database type {database.db_type}.")
 
 1043         for filename, directory 
in self.dependent_databases:
 
 1044             B2INFO(f
"Adding Local Database {filename} to head of chain of local databases created by" 
 1045                    f
" a dependent calibration, for {self.algorithm.name}.")
 
 1046             b2conditions.prepend_testing_payloads(filename)
 
 1049         create_directories(Path(self.output_database_dir), overwrite=
False)
 
 1052         B2INFO(f
"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
 
 1055         b2conditions.expert_settings(save_payloads=str(self.output_database_dir.joinpath(
"database.txt")))
 
 1057     def _setup_logging(self, **kwargs):
 
 1061         log_file = os.path.join(self.output_dir, self.algorithm.name + 
'_stdout')
 
 1062         B2INFO(f
"Output log file at {log_file}.")
 
 1064         basf2.set_log_level(basf2.LogLevel.INFO)
 
 1065         basf2.log_to_file(log_file)
 
 1067     def _change_working_dir(self, **kwargs):
 
 1070         B2INFO(f
"Changing current working directory to {self.output_dir}.")
 
 1071         os.chdir(self.output_dir)
 
 1073     def _pre_algorithm(self, **kwargs):
 
 1075         Call the user defined algorithm setup function. 
 1077         B2INFO(
"Running Pre-Algorithm function (if exists)")
 
 1078         if self.algorithm.pre_algorithm:
 
 1081             self.algorithm.pre_algorithm(self.algorithm.algorithm, kwargs[
"iteration"])
 
 1083     def _execute_over_iov(self, **kwargs):
 
 1085         Does the actual execute of the algorithm on an IoV and records the result. 
 1087         B2INFO(f
"Running {self.algorithm.name} in working directory {os.getcwd()}.")
 
 1089         runs_to_execute = kwargs[
"runs"]
 
 1090         iov = kwargs[
"apply_iov"]
 
 1091         iteration = kwargs[
"iteration"]
 
 1093             iov = iov_from_runs(runs_to_execute)
 
 1094         B2INFO(f
"Execution will use {iov} for labelling payloads by default.")
 
 1095         alg_result = self.algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
 
 1096         self.result = IoV_Result(iov, alg_result)
 
 1098     def _set_input_data(self, **kwargs):
 
 1099         self.algorithm.data_input(self.input_files)
 
 1102 class MachineError(Exception):
 
 1104     Base exception class for this module. 
 1108 class ConditionError(MachineError):
 
 1110     Exception for when conditions fail during a transition. 
 1114 class TransitionError(MachineError):
 
 1116     Exception for when transitions fail.