Belle II Software development
AlgorithmMachine Class Reference
Inheritance diagram for AlgorithmMachine:
Machine

Public Member Functions

 __init__ (self, algorithm=None, initial_state="init")
 
 setup_from_dict (self, params)
 
 is_valid (self)
 
 initial_state (self)
 
 initial_state (self, state)
 
 state (self)
 
 state (self, state)
 
 add_state (self, state, enter=None, exit=None)
 
 add_transition (self, trigger, source, dest, conditions=None, before=None, after=None)
 
 __getattr__ (self, name, **kwargs)
 
 get_transitions (self, source)
 
 get_transition_dict (self, state, transition)
 
 save_graph (self, filename, graphname)
 

Static Public Member Functions

 default_condition (**kwargs)
 

Public Attributes

list default_states
 Default states for the AlgorithmMachine.
 
 algorithm = algorithm
 Algorithm() object whose state we are modelling.
 
list input_files = []
 Collector output files; will contain all files returned by the output patterns.
 
list dependent_databases = []
 CAF created local databases from previous calibrations that this calibration/algorithm depends on.
 
list database_chain = []
 Database chain assigned to the overall Calibration object, or to the 'default' Collection.
 
str output_dir = ""
 The algorithm output directory which is mostly used to store the stdout file.
 
str output_database_dir = ""
 The output database directory for the localdb that the algorithm will commit to.
 
 result = None
 IoV_Result object for a single execution, will be reset upon a new execution.
 
dict states = {}
 Valid states for this machine.
 
 initial_state = initial_state
 Pointless docstring since it's a property.
 
 transitions = defaultdict(list)
 Allowed transitions between states.
 
 state = dest
 Current State of machine.
 

Static Public Attributes

list required_attrs
 Required attributes that must exist before the machine can run properly.
 
list required_true_attrs
 Attributes that must have a value that returns True when tested.
 

Protected Member Functions

 _create_output_dir (self, **kwargs)
 
 _setup_database_chain (self, **kwargs)
 
 _setup_logging (self, **kwargs)
 
 _change_working_dir (self, **kwargs)
 
 _pre_algorithm (self, **kwargs)
 
 _execute_over_iov (self, **kwargs)
 
 _set_input_data (self, **kwargs)
 
 _trigger (self, transition_name, transition_dict, **kwargs)
 

Static Protected Member Functions

 _callback (func, **kwargs)
 

Protected Attributes

dict _initial_state = State(initial_state)
 Actual attribute holding initial state for this machine.
 
dict _state = self.initial_state
 Actual attribute holding the Current state.
 

Detailed Description

A state machine to handle the logic of running the algorithm on the overall runs contained in the data.

Definition at line 929 of file state_machines.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
algorithm = None,
initial_state = "init" )
Takes an Algorithm object from the caf framework and defines the transitions.

Definition at line 951 of file state_machines.py.

951 def __init__(self, algorithm=None, initial_state="init"):
952 """
953 Takes an Algorithm object from the caf framework and defines the transitions.
954 """
955
956 self.default_states = [State("init"),
957 State("ready"),
958 State("running_algorithm"),
959 State("completed"),
960 State("failed")]
961
962 super().__init__(self.default_states, initial_state)
963
964
965 self.algorithm = algorithm
966
967 self.input_files = []
968
969 self.dependent_databases = []
970
972 self.database_chain = []
973
974 self.output_dir = ""
975
976 self.output_database_dir = ""
977
978 self.result = None
979
980 self.add_transition("setup_algorithm", "init", "ready",
981 before=[self._setup_logging,
982 self._change_working_dir,
983 self._setup_database_chain,
984 self._set_input_data,
985 self._pre_algorithm])
986 self.add_transition("execute_runs", "ready", "running_algorithm",
987 after=self._execute_over_iov)
988 self.add_transition("complete", "running_algorithm", "completed")
989 self.add_transition("fail", "running_algorithm", "failed")
990 self.add_transition("fail", "ready", "failed")
991 self.add_transition("setup_algorithm", "completed", "ready")
992 self.add_transition("setup_algorithm", "failed", "ready")
993
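The transitions registered by the constructor can be read as a small lookup table. The following is a minimal sketch, assuming a plain dictionary in place of the real Machine bookkeeping; the names TRANSITIONS and walk are hypothetical and not part of state_machines.py.

```python
# Hypothetical stand-in: (trigger, source state) -> destination state,
# copied from the add_transition calls in the constructor listing.
TRANSITIONS = {
    ("setup_algorithm", "init"): "ready",
    ("execute_runs", "ready"): "running_algorithm",
    ("complete", "running_algorithm"): "completed",
    ("fail", "running_algorithm"): "failed",
    ("fail", "ready"): "failed",
    ("setup_algorithm", "completed"): "ready",
    ("setup_algorithm", "failed"): "ready",
}


def walk(triggers, state="init"):
    """Apply triggers in order and return the list of visited state names."""
    visited = [state]
    for trigger in triggers:
        # A KeyError here means the trigger is not allowed from this state.
        state = TRANSITIONS[(trigger, state)]
        visited.append(state)
    return visited


path = walk(["setup_algorithm", "execute_runs", "complete", "setup_algorithm"])
```

Note that "setup_algorithm" is registered from three source states, which is what lets a completed or failed machine be prepared again for another execution.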

Member Function Documentation

◆ __getattr__()

__getattr__ ( self,
name,
** kwargs )
inherited
Allows us to create a new method for each trigger on the fly.
If no trigger in the machine matches the name, the normal AttributeError is raised.

Definition at line 302 of file state_machines.py.

302 def __getattr__(self, name, **kwargs):
303 """
304 Allows us to create a new method for each trigger on the fly.
305 If there is no trigger name in the machine to match, then the normal
306 AttributeError is called.
307 """
308 possible_transitions = self.get_transitions(self.state)
309 if name not in possible_transitions:
310 raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
311 transition_dict = self.get_transition_dict(self.state, name)
312 # \cond silence doxygen warning about _trigger
313 return partial(self._trigger, name, transition_dict, **kwargs)
314 # \endcond
315
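The dynamic trigger lookup can be illustrated in isolation. This is a simplified sketch, assuming a stand-in class (TinyMachine is hypothetical, not the basf2 Machine) that keeps only the parts needed to show how `__getattr__` and `functools.partial` turn a trigger name into a callable.

```python
from functools import partial


class TinyMachine:
    """Hypothetical stand-in, not the basf2 Machine class."""

    def __init__(self):
        self.state = "init"
        # trigger name -> {source state: destination state}
        self.transitions = {"setup_algorithm": {"init": "ready"}}

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails, so regular
        # attributes and methods are unaffected.
        transitions = self.__dict__.get("transitions", {})
        state = self.__dict__.get("state")
        if state not in transitions.get(name, {}):
            raise AttributeError(f"{name} does not exist in transitions for state {state}.")
        # Bind the trigger name and destination; call-time kwargs pass through.
        return partial(self._trigger, name, transitions[name][state])

    def _trigger(self, name, dest, **kwargs):
        self.state = dest
        return name, kwargs


machine = TinyMachine()
called = machine.setup_algorithm(iteration=0)
```

Because the lookup depends on the current state, the same attribute name can exist in one state and raise AttributeError in another.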

◆ _callback()

_callback ( func,
** kwargs )
staticprotectedinherited
Calls a condition, before, or after function using the keyword arguments passed (if any).

Definition at line 340 of file state_machines.py.

340 def _callback(func, **kwargs):
341 """
342 Calls a condition/before/after.. function using arguments passed (or not).
343 """
344 return func(**kwargs)
345

◆ _change_working_dir()

_change_working_dir ( self,
** kwargs )
protected
 

Definition at line 1074 of file state_machines.py.

1074 def _change_working_dir(self, **kwargs):
1075 """
1076 """
1077 B2INFO(f"Changing current working directory to {self.output_dir}.")
1078 os.chdir(self.output_dir)
1079

◆ _create_output_dir()

_create_output_dir ( self,
** kwargs )
protected
Create working/output directory of algorithm. Any old directory is overwritten.

Definition at line 1020 of file state_machines.py.

1020 def _create_output_dir(self, **kwargs):
1021 """
1022 Create working/output directory of algorithm. Any old directory is overwritten.
1023 """
1024 create_directories(Path(self.output_dir), overwrite=True)
1025

◆ _execute_over_iov()

_execute_over_iov ( self,
** kwargs )
protected
Executes the algorithm over an IoV and records the result.

Definition at line 1090 of file state_machines.py.

1090 def _execute_over_iov(self, **kwargs):
1091 """
1092 Does the actual execute of the algorithm on an IoV and records the result.
1093 """
1094 B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")
1095
1096 runs_to_execute = kwargs["runs"]
1097 iov = kwargs["apply_iov"]
1098 iteration = kwargs["iteration"]
1099 if not iov:
1100 iov = iov_from_runs(runs_to_execute)
1101 B2INFO(f"Execution will use {iov} for labelling payloads by default.")
1102 alg_result = self.algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
1103 self.result = IoV_Result(iov, alg_result)
1104

◆ _pre_algorithm()

_pre_algorithm ( self,
** kwargs )
protected
Call the user defined algorithm setup function.

Definition at line 1080 of file state_machines.py.

1080 def _pre_algorithm(self, **kwargs):
1081 """
1082 Call the user defined algorithm setup function.
1083 """
1084 B2INFO("Running Pre-Algorithm function (if exists)")
1085 if self.algorithm.pre_algorithm:
1086 # We have to re-pass in the algorithm here because an outside user has created this method.
1087 # So the method isn't bound to the instance properly.
1088 self.algorithm.pre_algorithm(self.algorithm.algorithm, kwargs["iteration"])
1089

◆ _set_input_data()

_set_input_data ( self,
** kwargs )
protected
Sets the input data of the algorithm from input_files.

Definition at line 1105 of file state_machines.py.

1105 def _set_input_data(self, **kwargs):
1106 """set input data"""
1107 self.algorithm.data_input(self.input_files)
1108
1109

◆ _setup_database_chain()

_setup_database_chain ( self,
** kwargs )
protected
Apply all databases in the correct order.

Definition at line 1026 of file state_machines.py.

1026 def _setup_database_chain(self, **kwargs):
1027 """
1028 Apply all databases in the correct order.
1029 """
1030 # We deliberately override the normal database ordering because we don't want input files GTs to affect
1031 # the processing. Only explicit GTs and intermediate local DBs made by the CAF should be added here.
1032 b2conditions.reset()
1033 b2conditions.override_globaltags()
1034
1035 # Apply all the databases in order, starting with the user-set chain for this Calibration
1036 for database in self.database_chain:
1037 if database.db_type == 'local':
1038 B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
1039 f"for {self.algorithm.name}.")
1040 b2conditions.prepend_testing_payloads(database.filepath.as_posix())
1041 elif database.db_type == 'central':
1042 B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
1043 f"for {self.algorithm.name}.")
1044 b2conditions.prepend_globaltag(database.global_tag)
1045 else:
1046 raise ValueError(f"Unknown database type {database.db_type}.")
1047 # Here we add the finished databases of previous calibrations that we depend on.
1048 # We can assume that the databases exist as we can't be here until they have returned
1049 # with OK status.
1050 for filename, directory in self.dependent_databases:
1051 B2INFO(f"Adding Local Database {filename} to head of chain of local databases created by"
1052 f" a dependent calibration, for {self.algorithm.name}.")
1053 b2conditions.prepend_testing_payloads(filename)
1054
1055 # Create a directory to store the payloads of this algorithm
1056 create_directories(Path(self.output_database_dir), overwrite=False)
1057
1058 # add local database to save payloads
1059 B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
1060 # Things have changed. We now need to do the expert settings to create a database directly.
1061 # LocalDB is readonly without this but we don't need 'use_local_database' during writing.
1062 b2conditions.expert_settings(save_payloads=str(self.output_database_dir.joinpath("database.txt")))
1063
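Since every database is prepended, the entry added last ends up at the head of its chain. The sketch below models only that ordering with plain lists, assuming that each prepend call places its argument at the head (as the log messages in the listing state); build_chain and the tuple layout are hypothetical and no basf2 call is made.

```python
def build_chain(database_chain, dependent_databases):
    """Return (local_dbs, global_tags), each ordered head first."""
    local_dbs, global_tags = [], []
    for db_type, value in database_chain:
        if db_type == "local":
            local_dbs.insert(0, value)      # models prepending a local database
        elif db_type == "central":
            global_tags.insert(0, value)    # models prepending a global tag
        else:
            raise ValueError(f"Unknown database type {db_type}.")
    # Databases from dependent calibrations are prepended after the user chain,
    # so they end up ahead of the user-set local databases.
    for filename, _directory in dependent_databases:
        local_dbs.insert(0, filename)
    return local_dbs, global_tags


local_dbs, global_tags = build_chain(
    [("central", "gt_a"), ("local", "db1.txt"), ("central", "gt_b")],
    [("dep.txt", "dep_dir")],
)
```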

◆ _setup_logging()

_setup_logging ( self,
** kwargs )
protected
 

Definition at line 1064 of file state_machines.py.

1064 def _setup_logging(self, **kwargs):
1065 """
1066 """
1067 # add logfile for output
1068 log_file = os.path.join(self.output_dir, self.algorithm.name + '_stdout')
1069 B2INFO(f"Output log file at {log_file}.")
1070 basf2.reset_log()
1071 basf2.set_log_level(basf2.LogLevel.INFO)
1072 basf2.log_to_file(log_file)
1073

◆ _trigger()

_trigger ( self,
transition_name,
transition_dict,
** kwargs )
protectedinherited
Runs the transition logic. Callbacks are evaluated in the order:
conditions -> before -> <new state set here> -> after.

Definition at line 316 of file state_machines.py.

316 def _trigger(self, transition_name, transition_dict, **kwargs):
317 """
318 Runs the transition logic. Callbacks are evaluated in the order:
319 conditions -> before -> <new state set here> -> after.
320 """
321 dest, conditions, before_callbacks, after_callbacks = (
322 transition_dict["dest"],
323 transition_dict["conditions"],
324 transition_dict["before"],
325 transition_dict["after"]
326 )
327 # Returns True only if every condition returns True when called
328 if all(map(lambda condition: self._callback(condition, **kwargs), conditions)):
329 for before_func in before_callbacks:
330 self._callback(before_func, **kwargs)
331
332 self.state = dest
333 for after_func in after_callbacks:
334 self._callback(after_func, **kwargs)
335 else:
336 raise ConditionError(f"Transition '{transition_name}' called for but one or more conditions "
337 "evaluated False")
338
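The callback ordering can be checked with a small experiment. This is a sketch under the assumption that a dictionary can stand in for the machine; the function trigger and the local ConditionError are illustrative stand-ins, not the basf2 implementations.

```python
class ConditionError(Exception):
    """Stand-in for the exception raised when a condition fails."""


def trigger(machine, transition, **kwargs):
    """Run conditions -> before -> set state -> after, in that order."""
    if not all(cond(**kwargs) for cond in transition["conditions"]):
        raise ConditionError("one or more conditions evaluated False")
    for func in transition["before"]:
        func(**kwargs)
    machine["state"] = transition["dest"]
    for func in transition["after"]:
        func(**kwargs)


log = []
machine = {"state": "ready"}
transition = {
    "dest": "running_algorithm",
    # list.append returns None, so "or True" makes the condition pass.
    "conditions": [lambda **kw: log.append("condition") or True],
    "before": [lambda **kw: log.append(f"before:{machine['state']}")],
    "after": [lambda **kw: log.append(f"after:{machine['state']}")],
}
trigger(machine, transition, iteration=0)
```

The before callback still sees the source state, while the after callback already sees the destination state. This is why the constructor attaches `_execute_over_iov` as an after callback: it runs once the machine is in "running_algorithm".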

◆ add_state()

add_state ( self,
state,
enter = None,
exit = None )
inherited
Adds a single state to the list of possible ones.
Should be a unique string or a State object with a unique name.

Definition at line 189 of file state_machines.py.

189 def add_state(self, state, enter=None, exit=None):
190 """
191 Adds a single state to the list of possible ones.
192 Should be a unique string or a State object with a unique name.
193 """
194 if isinstance(state, str):
195 self.add_state(State(state, enter, exit))
196 elif isinstance(state, State):
197 if state.name not in self.states.keys():
198 self.states[state.name] = state
199 else:
200 B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
201 else:
202 B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")
203

◆ add_transition()

add_transition ( self,
trigger,
source,
dest,
conditions = None,
before = None,
after = None )
inherited
Adds a single transition to the dictionary of possible ones.
Trigger is the method name that begins the transition between the
source state and the destination state.

The condition is an optional function that returns True or False
depending on the current state/input.

Definition at line 260 of file state_machines.py.

260 def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
261 """
262 Adds a single transition to the dictionary of possible ones.
263 Trigger is the method name that begins the transition between the
264 source state and the destination state.
265
266 The condition is an optional function that returns True or False
267 depending on the current state/input.
268 """
269 transition_dict = {}
270 try:
271 source = self.states[source]
272 dest = self.states[dest]
273 transition_dict["source"] = source
274 transition_dict["dest"] = dest
275 except KeyError as err:
276 B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
277 raise err
278 if conditions:
279 if isinstance(conditions, (list, tuple, set)):
280 transition_dict["conditions"] = list(conditions)
281 else:
282 transition_dict["conditions"] = [conditions]
283 else:
284 transition_dict["conditions"] = [Machine.default_condition]
285
286 if not before:
287 before = []
288 if isinstance(before, (list, tuple, set)):
289 transition_dict["before"] = list(before)
290 else:
291 transition_dict["before"] = [before]
292
293 if not after:
294 after = []
295 if isinstance(after, (list, tuple, set)):
296 transition_dict["after"] = list(after)
297 else:
298 transition_dict["after"] = [after]
299
300 self.transitions[trigger].append(transition_dict)
301
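The three callback arguments are normalised in the same way: nothing, a single callable, or a collection all become a list. A minimal sketch of that normalisation, assuming a helper named as_callback_list (hypothetical, the real method inlines this logic):

```python
def as_callback_list(callbacks, default=None):
    """Return callbacks as a list; a single callable is wrapped in a list."""
    if not callbacks:
        # Conditions fall back to a default callable; before/after to nothing.
        return [] if default is None else [default]
    if isinstance(callbacks, (list, tuple, set)):
        return list(callbacks)
    return [callbacks]


def always_true(**kwargs):
    """Plays the role of the default condition."""
    return True


single = as_callback_list(print)
several = as_callback_list((print, len))
nothing = as_callback_list(None)
conditions = as_callback_list(None, default=always_true)
```

Passing a set is accepted, but a set has no defined order, so a list or tuple is the safer choice when the callbacks must run in sequence.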

◆ default_condition()

default_condition ( ** kwargs)
staticinherited
Method to always return True.

Definition at line 254 of file state_machines.py.

254 def default_condition(**kwargs):
255 """
256 Method to always return True.
257 """
258 return True
259

◆ get_transition_dict()

get_transition_dict ( self,
state,
transition )
inherited
Returns the transition dictionary for a state and transition out of it.

Definition at line 357 of file state_machines.py.

357 def get_transition_dict(self, state, transition):
358 """
359 Returns the transition dictionary for a state and transition out of it.
360 """
361 transition_dicts = self.transitions[transition]
362 for transition_dict in transition_dicts:
363 if transition_dict["source"] == state:
364 return transition_dict
365 else:
366 raise KeyError(f"No transition from state {state} with the name {transition}.")
367

◆ get_transitions()

get_transitions ( self,
source )
inherited
Returns allowed transitions from a given state.

Definition at line 346 of file state_machines.py.

346 def get_transitions(self, source):
347 """
348 Returns allowed transitions from a given state.
349 """
350 possible_transitions = []
351 for transition, transition_dicts in self.transitions.items():
352 for transition_dict in transition_dicts:
353 if transition_dict["source"] == source:
354 possible_transitions.append(transition)
355 return possible_transitions
356

◆ initial_state() [1/2]

initial_state ( self)
inherited
The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.

Definition at line 205 of file state_machines.py.

205 def initial_state(self):
206 """
207 The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
208 """
209 return self._initial_state
210

◆ initial_state() [2/2]

initial_state ( self,
state )
inherited
 

Definition at line 212 of file state_machines.py.

212 def initial_state(self, state):
213 """
214 """
215 if state in self.states.keys():
216 self._initial_state = self.states[state]
217
218 self._state = self.states[state]
219 else:
220 raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")
221

◆ is_valid()

is_valid ( self)
Returns:
    bool: Whether or not this machine has been set up correctly with all its necessary attributes.

Definition at line 1002 of file state_machines.py.

1002 def is_valid(self):
1003 """
1004 Returns:
1005 bool: Whether or not this machine has been set up correctly with all its necessary attributes.
1006 """
1007 B2INFO(f"Checking validity of current setup of AlgorithmMachine for {self.algorithm.name}.")
1008 # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
1009 for attribute_name in self.required_attrs:
1010 if not hasattr(self, attribute_name):
1011 B2ERROR(f"AlgorithmMachine attribute {attribute_name} doesn't exist.")
1012 return False
1013 # Check if any attributes that need actual values haven't been set or were empty
1014 for attribute_name in self.required_true_attrs:
1015 if not getattr(self, attribute_name):
1016 B2ERROR(f"AlgorithmMachine attribute {attribute_name} returned False.")
1017 return False
1018 return True
1019
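The two-stage check (attribute exists, then attribute is truthy) can be sketched without basf2. The example assumes a SimpleNamespace in place of the machine and a hypothetical helper validation_problems that collects every problem, whereas the method above returns False at the first one.

```python
from types import SimpleNamespace

REQUIRED_ATTRS = ["algorithm", "dependent_databases", "database_chain",
                  "output_dir", "output_database_dir", "input_files"]
REQUIRED_TRUE_ATTRS = ["algorithm", "output_dir", "output_database_dir", "input_files"]


def validation_problems(machine):
    """Return a list of messages; an empty list means the setup looks valid."""
    problems = [f"{name} doesn't exist"
                for name in REQUIRED_ATTRS if not hasattr(machine, name)]
    # A missing attribute that must also be truthy is reported by both checks.
    problems += [f"{name} returned False"
                 for name in REQUIRED_TRUE_ATTRS if not getattr(machine, name, None)]
    return problems


machine = SimpleNamespace(algorithm=object(), dependent_databases=[], database_chain=[],
                          output_dir="algorithm_output", output_database_dir="outputdb",
                          input_files=[])
problems = validation_problems(machine)
```

Here the empty dependent_databases and database_chain are accepted, but the empty input_files is not, matching the split between the two attribute lists.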

◆ save_graph()

save_graph ( self,
filename,
graphname )
inherited
Does a simple dot file creation to visualise states and transitions.

Definition at line 368 of file state_machines.py.

368 def save_graph(self, filename, graphname):
369 """
370 Does a simple dot file creation to visualise states and transitions.
371 """
372 with open(filename, "w") as dotfile:
373 dotfile.write("digraph " + graphname + " {\n")
374 for state in self.states.keys():
375 dotfile.write('"' + state + '" [shape=ellipse, color=black]\n')
376 for trigger, transition_dicts in self.transitions.items():
377 for transition in transition_dicts:
378 dotfile.write('"' + transition["source"].name + '" -> "' +
379 transition["dest"].name + '" [label="' + trigger + '"]\n')
380 dotfile.write("}\n")
381
382
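To see what the generated dot text looks like, the same layout can be written to an in-memory buffer. This is a sketch, assuming plain strings for states and (trigger, source, dest) tuples for transitions; write_dot is a hypothetical helper, not part of state_machines.py.

```python
import io


def write_dot(out, graphname, states, transitions):
    """Write a digraph with one node per state and one edge per transition."""
    out.write(f"digraph {graphname} {{\n")
    for state in states:
        out.write(f'"{state}" [shape=ellipse, color=black]\n')
    for trigger, source, dest in transitions:
        out.write(f'"{source}" -> "{dest}" [label="{trigger}"]\n')
    out.write("}\n")


buffer = io.StringIO()
write_dot(buffer, "algorithm", ["init", "ready"],
          [("setup_algorithm", "init", "ready")])
dot_text = buffer.getvalue()
```

The resulting text can be rendered with the Graphviz `dot` tool if it is installed.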

◆ setup_from_dict()

setup_from_dict ( self,
params )
Parameters:
    params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.

Definition at line 994 of file state_machines.py.

994 def setup_from_dict(self, params):
995 """
996 Parameters:
997 params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.
998 """
999 for attribute_name, value in params.items():
1000 setattr(self, attribute_name, value)
1001
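Because the assignment uses `setattr` without checking the names, a misspelled key creates a new attribute instead of raising an error. A minimal sketch of that behaviour, assuming a stand-in class (Config is hypothetical) with two of the attributes listed above:

```python
class Config:
    """Hypothetical stand-in holding two of the machine's attributes."""

    def __init__(self):
        self.output_dir = ""
        self.input_files = []

    def setup_from_dict(self, params):
        # Every key becomes an attribute, whether or not it already exists.
        for attribute_name, value in params.items():
            setattr(self, attribute_name, value)


config = Config()
# "input_file" is a deliberate misspelling of "input_files".
config.setup_from_dict({"output_dir": "algorithm_output", "input_file": ["a.root"]})
```

After this call input_files is still empty, which is the kind of mistake a subsequent is_valid() check would report.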

◆ state() [1/2]

state ( self)
inherited
The current state of the machine, implemented as a `property`. Setting it calls the exit callbacks of the
current state and the enter callbacks of the new one. To bypass this behaviour, e.g. when setting initial states,
either use the `initial_state` property or set the `_state` attribute directly (at your own risk!).

Definition at line 223 of file state_machines.py.

223 def state(self):
224 """
225 The current state of the machine. Actually a `property` decorator. It will call the exit method of the
226 current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
227 either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
228 """
229 return self._state
230

◆ state() [2/2]

state ( self,
state )
inherited
 

Definition at line 232 of file state_machines.py.

232 def state(self, state):
233 """
234 """
235 if isinstance(state, str):
236 state_name = state
237 else:
238 state_name = state.name
239
240 try:
241 state = self.states[state_name]
242 # Run exit callbacks of current state
243 for callback in self.state.on_exit:
244 callback(prior_state=self.state, new_state=state)
245 # Run enter callbacks of new state
246 for callback in state.on_enter:
247 callback(prior_state=self.state, new_state=state)
248 # Set the state
249 self._state = state
250 except KeyError:
251 raise MachineError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")
252
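The effect of the setter on exit and enter callbacks can be shown with a reduced model. The State and TinyMachine classes below are simplified stand-ins written for this sketch (their constructors are assumptions, not the basf2 signatures); only the callback order and keyword arguments follow the listing.

```python
class State:
    """Stand-in state holding lists of enter and exit callbacks."""

    def __init__(self, name, enter=None, exit=None):
        self.name = name
        self.on_enter = [enter] if enter else []
        self.on_exit = [exit] if exit else []


class TinyMachine:
    def __init__(self, states, initial):
        self.states = {state.name: state for state in states}
        self._state = self.states[initial]  # set directly, so no callbacks run

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, name):
        new_state = self.states[name]
        for callback in self._state.on_exit:
            callback(prior_state=self._state, new_state=new_state)
        for callback in new_state.on_enter:
            callback(prior_state=self._state, new_state=new_state)
        self._state = new_state  # only now does the current state change


events = []


def record(label):
    """Build a callback that logs its label and the two state names."""
    return lambda prior_state, new_state: events.append(
        (label, prior_state.name, new_state.name))


machine = TinyMachine([State("init", exit=record("exit")),
                       State("ready", enter=record("enter"))], "init")
machine.state = "ready"
```

Both callbacks receive the old state as prior_state, because the current state is replaced only after the enter callbacks have run.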

Member Data Documentation

◆ _initial_state

dict _initial_state = State(initial_state)
protectedinherited

Actual attribute holding initial state for this machine.

Definition at line 182 of file state_machines.py.

◆ _state

dict _state = self.initial_state
protectedinherited

Actual attribute holding the Current state.

Definition at line 185 of file state_machines.py.

◆ algorithm

algorithm = algorithm

Algorithm() object whose state we are modelling.

Definition at line 965 of file state_machines.py.

◆ database_chain

list database_chain = []

Database chain assigned to the overall Calibration object, or to the 'default' Collection.

Database chains for manually created Collections have no effect here.

Definition at line 972 of file state_machines.py.

◆ default_states

default_states
Initial value:
= [State("init"),
State("ready"),
State("running_algorithm"),
State("completed"),
State("failed")]

Default states for the AlgorithmMachine.

Definition at line 956 of file state_machines.py.

◆ dependent_databases

list dependent_databases = []

CAF created local databases from previous calibrations that this calibration/algorithm depends on.

Definition at line 969 of file state_machines.py.

◆ initial_state

initial_state = initial_state
inherited

Pointless docstring since it's a property.

Definition at line 178 of file state_machines.py.

◆ input_files

input_files = []

Collector output files; will contain all files returned by the output patterns.

Definition at line 967 of file state_machines.py.

◆ output_database_dir

output_database_dir = ""

The output database directory for the localdb that the algorithm will commit to.

Definition at line 976 of file state_machines.py.

◆ output_dir

output_dir = ""

The algorithm output directory which is mostly used to store the stdout file.

Definition at line 974 of file state_machines.py.

◆ required_attrs

list required_attrs
static
Initial value:
= ["algorithm",
"dependent_databases",
"database_chain",
"output_dir",
"output_database_dir",
"input_files"
]

Required attributes that must exist before the machine can run properly.

Some are allowed to have values that evaluate to False when tested, e.g. "" or [].

Definition at line 936 of file state_machines.py.

◆ required_true_attrs

list required_true_attrs
static
Initial value:
= ["algorithm",
"output_dir",
"output_database_dir",
"input_files"
]

Attributes that must have a value that returns True when tested.

Definition at line 945 of file state_machines.py.

◆ result

result = None

IoV_Result object for a single execution, will be reset upon a new execution.

Definition at line 978 of file state_machines.py.

◆ state

state = dest
inherited

Current State of machine.

Definition at line 332 of file state_machines.py.

◆ states

dict states = {}
inherited

Valid states for this machine.

Definition at line 172 of file state_machines.py.

◆ transitions

transitions = defaultdict(list)
inherited

Allowed transitions between states.

Definition at line 187 of file state_machines.py.


The documentation for this class was generated from the following file: