Belle II Software development
AlgorithmMachine Class Reference
Inherits Machine.

Public Member Functions

 __init__ (self, algorithm=None, initial_state="init")
 
 setup_from_dict (self, params)
 
 is_valid (self)
 
 initial_state (self)
 
 initial_state (self, state)
 
 state (self)
 
 state (self, state)
 
 add_state (self, state, enter=None, exit=None)
 
 add_transition (self, trigger, source, dest, conditions=None, before=None, after=None)
 
 __getattr__ (self, name, **kwargs)
 
 get_transitions (self, source)
 
 get_transition_dict (self, state, transition)
 
 save_graph (self, filename, graphname)
 

Static Public Member Functions

 default_condition (**kwargs)
 

Public Attributes

list default_states
 Default states for the AlgorithmMachine.
 
 algorithm = algorithm
 Algorithm() object whose state we are modelling.
 
list input_files = []
 Collector output files, will contain all files returned by the output patterns.
 
list dependent_databases = []
 CAF-created local databases from previous calibrations that this calibration/algorithm depends on.
 
list database_chain = []
 Database chain assigned to the overall Calibration object, or to the 'default' Collection.
 
str output_dir = ""
 The algorithm output directory which is mostly used to store the stdout file.
 
str output_database_dir = ""
 The output database directory for the localdb that the algorithm will commit to.
 
 result = None
 IoV_Result object for a single execution, will be reset upon a new execution.
 
dict states = {}
 Valid states for this machine.
 
 initial_state = initial_state
 Pointless docstring since it's a property.
 
 transitions = defaultdict(list)
 Allowed transitions between states.
 
 state = dest
 Current State of machine.
 

Static Public Attributes

list required_attrs
 Required attributes that must exist before the machine can run properly.
 
list required_true_attrs
 Attributes that must have a value that returns True when tested.
 

Protected Member Functions

 _create_output_dir (self, **kwargs)
 
 _setup_database_chain (self, **kwargs)
 
 _setup_logging (self, **kwargs)
 
 _change_working_dir (self, **kwargs)
 
 _pre_algorithm (self, **kwargs)
 
 _execute_over_iov (self, **kwargs)
 
 _set_input_data (self, **kwargs)
 
 _trigger (self, transition_name, transition_dict, **kwargs)
 

Static Protected Member Functions

 _callback (func, **kwargs)
 

Protected Attributes

dict _initial_state = State(initial_state)
 Actual attribute holding initial state for this machine.
 
dict _state = self.initial_state
 Actual attribute holding the Current state.
 

Detailed Description

A state machine to handle the logic of running the algorithm on the overall runs contained in the data.

Definition at line 935 of file state_machines.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
algorithm = None,
initial_state = "init" )
Takes an Algorithm object from the caf framework and defines the transitions.

Definition at line 957 of file state_machines.py.

957     def __init__(self, algorithm=None, initial_state="init"):
958         """
959         Takes an Algorithm object from the caf framework and defines the transitions.
960         """
961
962         self.default_states = [State("init"),
963                                State("ready"),
964                                State("running_algorithm"),
965                                State("completed"),
966                                State("failed")]
967
968         super().__init__(self.default_states, initial_state)
969
970
971         self.algorithm = algorithm
972
973         self.input_files = []
974
975         self.dependent_databases = []
976
978         self.database_chain = []
979
980         self.output_dir = ""
981
982         self.output_database_dir = ""
983
984         self.result = None
985
986         self.add_transition("setup_algorithm", "init", "ready",
987                             before=[self._setup_logging,
988                                     self._change_working_dir,
989                                     self._setup_database_chain,
990                                     self._set_input_data,
991                                     self._pre_algorithm])
992         self.add_transition("execute_runs", "ready", "running_algorithm",
993                             after=self._execute_over_iov)
994         self.add_transition("complete", "running_algorithm", "completed")
995         self.add_transition("fail", "running_algorithm", "failed")
996         self.add_transition("fail", "ready", "failed")
997         self.add_transition("setup_algorithm", "completed", "ready")
998         self.add_transition("setup_algorithm", "failed", "ready")
999
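
The transitions registered in the constructor can be read as a lookup table from (trigger, source state) to destination state. The following is a minimal sketch in plain Python, not the CAF implementation: `TRANSITIONS` and `next_state` are hypothetical names introduced only for illustration, and all callbacks are omitted.

```python
# Simplified stand-in for the transition table built in AlgorithmMachine.__init__.
# Keys are (trigger, source state); values are the destination state.
TRANSITIONS = {
    ("setup_algorithm", "init"): "ready",
    ("execute_runs", "ready"): "running_algorithm",
    ("complete", "running_algorithm"): "completed",
    ("fail", "running_algorithm"): "failed",
    ("fail", "ready"): "failed",
    ("setup_algorithm", "completed"): "ready",
    ("setup_algorithm", "failed"): "ready",
}


def next_state(state, trigger):
    """Return the destination state, or raise if the trigger is not allowed (hypothetical helper)."""
    try:
        return TRANSITIONS[(trigger, state)]
    except KeyError:
        raise ValueError(f"Trigger '{trigger}' is not allowed from state '{state}'.") from None


# Walk one full cycle and then re-arm the machine for another execution.
state = "init"
for trigger in ("setup_algorithm", "execute_runs", "complete", "setup_algorithm"):
    state = next_state(state, trigger)
    print(trigger, "->", state)
```

Note that "completed" and "failed" both lead back to "ready" through `setup_algorithm`, which is what allows the same machine to be reused for another execution.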

Member Function Documentation

◆ __getattr__()

__getattr__ ( self,
name,
** kwargs )
inherited
Allows us to create a new method for each trigger on the fly.
If no trigger in the machine matches the name for the current state, then the normal
AttributeError is raised.

Definition at line 306 of file state_machines.py.

306     def __getattr__(self, name, **kwargs):
307         """
308         Allows us to create a new method for each trigger on the fly.
309         If there is no trigger name in the machine to match, then the normal
310         AttributeError is called.
311         """
312         possible_transitions = self.get_transitions(self.state)
313         if name not in possible_transitions:
314             raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
315         transition_dict = self.get_transition_dict(self.state, name)
316         # \cond silence doxygen warning about _trigger
317         return partial(self._trigger, name, transition_dict, **kwargs)
318         # \endcond
319
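
To illustrate how `__getattr__` turns trigger names into callable methods, here is a minimal sketch. `TinyMachine` and its two transitions are hypothetical and much simpler than the real `Machine` class; only the use of `functools.partial` and the `AttributeError` for unavailable triggers follow the listing above.

```python
from functools import partial


class TinyMachine:
    """Hypothetical, stripped-down machine illustrating trigger lookup via __getattr__."""

    def __init__(self):
        self.state = "init"
        # trigger name -> {source state: destination state}
        self.transitions = {"setup": {"init": "ready"}, "run": {"ready": "running"}}

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails, e.g. for trigger names.
        destinations = self.transitions.get(name, {})
        if self.state not in destinations:
            raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
        # Bind the destination now; the caller invokes the result like a method.
        return partial(self._trigger, destinations[self.state])

    def _trigger(self, dest, **kwargs):
        self.state = dest


machine = TinyMachine()
machine.setup()           # resolved through __getattr__
print(machine.state)      # ready
try:
    machine.setup()       # 'setup' is not available from 'ready'
except AttributeError as err:
    print(err)
```

A consequence of this design is that the set of available "methods" depends on the current state, so `hasattr(machine, "setup")` changes its answer as the machine moves between states.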

◆ _callback()

_callback ( func,
** kwargs )
staticprotectedinherited
Calls a condition/before/after.. function using arguments passed (or not).

Definition at line 344 of file state_machines.py.

344     def _callback(func, **kwargs):
345         """
346         Calls a condition/before/after.. function using arguments passed (or not).
347         """
348         return func(**kwargs)
349

◆ _change_working_dir()

_change_working_dir ( self,
** kwargs )
protected
 

Definition at line 1082 of file state_machines.py.

1082     def _change_working_dir(self, **kwargs):
1083         """
1084         """
1085         B2INFO(f"Changing current working directory to {self.output_dir}.")
1086         os.chdir(self.output_dir)
1087

◆ _create_output_dir()

_create_output_dir ( self,
** kwargs )
protected
Create working/output directory of algorithm. Any old directory is overwritten.

Definition at line 1026 of file state_machines.py.

1026     def _create_output_dir(self, **kwargs):
1027         """
1028         Create working/output directory of algorithm. Any old directory is overwritten.
1029         """
1030         create_directories(Path(self.output_dir), overwrite=True)
1031

◆ _execute_over_iov()

_execute_over_iov ( self,
** kwargs )
protected
Does the actual execute of the algorithm on an IoV and records the result.

Definition at line 1098 of file state_machines.py.

1098     def _execute_over_iov(self, **kwargs):
1099         """
1100         Does the actual execute of the algorithm on an IoV and records the result.
1101         """
1102         B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")
1103
1104         runs_to_execute = kwargs["runs"]
1105         iov = kwargs["apply_iov"]
1106         iteration = kwargs["iteration"]
1107         if not iov:
1108             iov = iov_from_runs(runs_to_execute)
1109         B2INFO(f"Execution will use {iov} for labelling payloads by default.")
1110         alg_result = self.algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
1111         self.result = IoV_Result(iov, alg_result)
1112

◆ _pre_algorithm()

_pre_algorithm ( self,
** kwargs )
protected
Call the user defined algorithm setup function.

Definition at line 1088 of file state_machines.py.

1088     def _pre_algorithm(self, **kwargs):
1089         """
1090         Call the user defined algorithm setup function.
1091         """
1092         B2INFO("Running Pre-Algorithm function (if exists)")
1093         if self.algorithm.pre_algorithm:
1094             # We have to re-pass in the algorithm here because an outside user has created this method.
1095             # So the method isn't bound to the instance properly.
1096             self.algorithm.pre_algorithm(self.algorithm.algorithm, kwargs["iteration"])
1097

◆ _set_input_data()

_set_input_data ( self,
** kwargs )
protected
set input data

Definition at line 1113 of file state_machines.py.

1113     def _set_input_data(self, **kwargs):
1114         """set input data"""
1115         self.algorithm.data_input(self.input_files)
1116
1117

◆ _setup_database_chain()

_setup_database_chain ( self,
** kwargs )
protected
Apply all databases in the correct order.

Definition at line 1032 of file state_machines.py.

1032     def _setup_database_chain(self, **kwargs):
1033         """
1034         Apply all databases in the correct order.
1035         """
1036         # We deliberately override the normal database ordering because we don't want input files GTs to affect
1037         # the processing. Only explicit GTs and intermediate local DBs made by the CAF should be added here.
1038         b2conditions.reset()
1039         b2conditions.override_globaltags()
1040
1041         # Apply all the databases in order, starting with the user-set chain for this Calibration
1042         # Local variable avoids doxygen "no uniquely matching class member" warning
1043         database_chain = self.database_chain
1044         for database in database_chain:
1045             if database.db_type == 'local':
1046                 B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
1047                        f"for {self.algorithm.name}.")
1048                 b2conditions.prepend_testing_payloads(database.filepath.as_posix())
1049             elif database.db_type == 'central':
1050                 B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
1051                        f"for {self.algorithm.name}.")
1052                 b2conditions.prepend_globaltag(database.global_tag)
1053             else:
1054                 raise ValueError(f"Unknown database type {database.db_type}.")
1055         # Here we add the finished databases of previous calibrations that we depend on.
1056         # We can assume that the databases exist as we can't be here until they have returned
1057         # with OK status.
1058         for filename, directory in self.dependent_databases:
1059             B2INFO(f"Adding Local Database {filename} to head of chain of local databases created by"
1060                    f" a dependent calibration, for {self.algorithm.name}.")
1061             b2conditions.prepend_testing_payloads(filename)
1062
1063         # Create a directory to store the payloads of this algorithm
1064         create_directories(Path(self.output_database_dir), overwrite=False)
1065
1066         # add local database to save payloads
1067         B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
1068         # Things have changed. We now need to do the expert settings to create a database directly.
1069         # LocalDB is readonly without this but we don't need 'use_local_database' during writing.
1070         b2conditions.expert_settings(save_payloads=str(self.output_database_dir.joinpath("database.txt")))
1071
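
Because every database is prepended, the entry added last ends up at the head of its chain. The sketch below models this with plain Python lists and does not call the basf2 conditions API. It assumes that "prepend" places an entry at the head, as the log messages in the listing suggest. `build_chains` and the tuple layout of its inputs are hypothetical.

```python
def build_chains(database_chain, dependent_databases):
    """Hypothetical list-based model of the prepend order used above.

    database_chain: iterable of (db_type, value) with db_type 'local' or 'central'.
    dependent_databases: iterable of (filename, directory) pairs.
    Returns (globaltags, testing_payloads), each ordered head first.
    """
    globaltags, testing_payloads = [], []
    for db_type, value in database_chain:
        if db_type == "local":
            testing_payloads.insert(0, value)
        elif db_type == "central":
            globaltags.insert(0, value)
        else:
            raise ValueError(f"Unknown database type {db_type}.")
    # Databases from dependent calibrations are added last, so they end up at the head.
    for filename, _directory in dependent_databases:
        testing_payloads.insert(0, filename)
    return globaltags, testing_payloads


# Illustrative names only; none of these tags or files are taken from the source.
chain = [("central", "gt_a"), ("local", "user/database.txt"), ("central", "gt_b")]
dependents = [("previous/database.txt", "previous")]
globaltags, testing_payloads = build_chains(chain, dependents)
print(globaltags)
print(testing_payloads)
```

In this model the global tag listed last in the chain and the dependent calibration's local database sit at the head of their respective chains.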

◆ _setup_logging()

_setup_logging ( self,
** kwargs )
protected
 

Definition at line 1072 of file state_machines.py.

1072     def _setup_logging(self, **kwargs):
1073         """
1074         """
1075         # add logfile for output
1076         log_file = os.path.join(self.output_dir, self.algorithm.name + '_stdout')
1077         B2INFO(f"Output log file at {log_file}.")
1078         basf2.reset_log()
1079         basf2.set_log_level(basf2.LogLevel.INFO)
1080         basf2.log_to_file(log_file)
1081

◆ _trigger()

_trigger ( self,
transition_name,
transition_dict,
** kwargs )
protectedinherited
Runs the transition logic. Callbacks are evaluated in the order:
conditions -> before -> <new state set here> -> after.

Definition at line 320 of file state_machines.py.

320     def _trigger(self, transition_name, transition_dict, **kwargs):
321         """
322         Runs the transition logic. Callbacks are evaluated in the order:
323         conditions -> before -> <new state set here> -> after.
324         """
325         dest, conditions, before_callbacks, after_callbacks = (
326             transition_dict["dest"],
327             transition_dict["conditions"],
328             transition_dict["before"],
329             transition_dict["after"]
330         )
331         # Returns True only if every condition returns True when called
332         if all(map(lambda condition: self._callback(condition, **kwargs), conditions)):
333             for before_func in before_callbacks:
334                 self._callback(before_func, **kwargs)
335
336             self.state = dest
337             for after_func in after_callbacks:
338                 self._callback(after_func, **kwargs)
339         else:
340             raise ConditionError(f"Transition '{transition_name}' called for but one or more conditions "
341                                  "evaluated False")
342
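
The callback order can be checked with a small stand-alone sketch. `run_transition`, the `machine` dictionary and the three callbacks are hypothetical stand-ins; a plain `RuntimeError` replaces `ConditionError` so that the example runs without the CAF.

```python
calls = []
machine = {"state": "ready"}  # toy stand-in for the machine's current state


def run_transition(dest, conditions, before, after, **kwargs):
    """Hypothetical helper applying the order: conditions, before, state change, after."""
    if not all(condition(**kwargs) for condition in conditions):
        raise RuntimeError("One or more conditions evaluated False")
    for func in before:
        func(**kwargs)
    machine["state"] = dest  # the new state is set between 'before' and 'after'
    for func in after:
        func(**kwargs)


def has_runs(**kwargs):
    calls.append("condition")
    return bool(kwargs.get("runs"))


def before_callback(**kwargs):
    calls.append(f"before (state={machine['state']})")


def after_callback(**kwargs):
    calls.append(f"after (state={machine['state']})")


run_transition("running_algorithm", [has_runs], [before_callback], [after_callback], runs=[(1, 2)])
print(calls)
# ['condition', 'before (state=ready)', 'after (state=running_algorithm)']
```

The `before` callbacks still see the source state, while the `after` callbacks already see the destination state. This is why `_execute_over_iov` is registered as an `after` callback of `execute_runs`: it runs once the machine is in "running_algorithm".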

◆ add_state()

add_state ( self,
state,
enter = None,
exit = None )
inherited
Adds a single state to the list of possible ones.
Should be a unique string or a State object with a unique name.

Definition at line 193 of file state_machines.py.

193     def add_state(self, state, enter=None, exit=None):
194         """
195         Adds a single state to the list of possible ones.
196         Should be a unique string or a State object with a unique name.
197         """
198         if isinstance(state, str):
199             self.add_state(State(state, enter, exit))
200         elif isinstance(state, State):
201             if state.name not in self.states.keys():
202                 self.states[state.name] = state
203             else:
204                 B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
205         else:
206             B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")
207

◆ add_transition()

add_transition ( self,
trigger,
source,
dest,
conditions = None,
before = None,
after = None )
inherited
Adds a single transition to the dictionary of possible ones.
Trigger is the method name that begins the transition between the
source state and the destination state.

The condition is an optional function that returns True or False
depending on the current state/input.

Definition at line 264 of file state_machines.py.

264     def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
265         """
266         Adds a single transition to the dictionary of possible ones.
267         Trigger is the method name that begins the transition between the
268         source state and the destination state.
269
270         The condition is an optional function that returns True or False
271         depending on the current state/input.
272         """
273         transition_dict = {}
274         try:
275             source = self.states[source]
276             dest = self.states[dest]
277             transition_dict["source"] = source
278             transition_dict["dest"] = dest
279         except KeyError as err:
280             B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
281             raise err
282         if conditions:
283             if isinstance(conditions, (list, tuple, set)):
284                 transition_dict["conditions"] = list(conditions)
285             else:
286                 transition_dict["conditions"] = [conditions]
287         else:
288             transition_dict["conditions"] = [Machine.default_condition]
289
290         if not before:
291             before = []
292         if isinstance(before, (list, tuple, set)):
293             transition_dict["before"] = list(before)
294         else:
295             transition_dict["before"] = [before]
296
297         if not after:
298             after = []
299         if isinstance(after, (list, tuple, set)):
300             transition_dict["after"] = list(after)
301         else:
302             transition_dict["after"] = [after]
303
304         self.transitions[trigger].append(transition_dict)
305
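
The `before` and `after` arguments accept nothing, a single callable, or a collection of callables, and are always stored as a list. A minimal sketch of that normalisation follows; `as_callback_list` is a hypothetical helper, not part of the CAF.

```python
def as_callback_list(callbacks):
    """Hypothetical helper: normalise None, a single callable or a collection into a list."""
    if not callbacks:
        return []
    if isinstance(callbacks, (list, tuple, set)):
        return list(callbacks)
    return [callbacks]


print(as_callback_list(None))          # []
print(as_callback_list(len))           # a one-element list holding len
print(as_callback_list((len, print)))  # a two-element list, order preserved
```

Passing a `set` is accepted but gives no guaranteed callback order, so a list or tuple is the safer choice when the order of `before` callbacks matters, as it does for `setup_algorithm`.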

◆ default_condition()

default_condition ( ** kwargs)
staticinherited
Method to always return True.

Definition at line 258 of file state_machines.py.

258     def default_condition(**kwargs):
259         """
260         Method to always return True.
261         """
262         return True
263

◆ get_transition_dict()

get_transition_dict ( self,
state,
transition )
inherited
Returns the transition dictionary for a state and transition out of it.

Definition at line 361 of file state_machines.py.

361     def get_transition_dict(self, state, transition):
362         """
363         Returns the transition dictionary for a state and transition out of it.
364         """
365         transition_dicts = self.transitions[transition]
366         for transition_dict in transition_dicts:
367             if transition_dict["source"] == state:
368                 return transition_dict
369         else:
370             raise KeyError(f"No transition from state {state} with the name {transition}.")
371

◆ get_transitions()

get_transitions ( self,
source )
inherited
Returns allowed transitions from a given state.

Definition at line 350 of file state_machines.py.

350     def get_transitions(self, source):
351         """
352         Returns allowed transitions from a given state.
353         """
354         possible_transitions = []
355         for transition, transition_dicts in self.transitions.items():
356             for transition_dict in transition_dicts:
357                 if transition_dict["source"] == source:
358                     possible_transitions.append(transition)
359         return possible_transitions
360
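
Both lookup methods operate on the same structure: a `defaultdict(list)` keyed by trigger name, where each entry is a list of transition dictionaries. The sketch below uses plain strings for states instead of `State` objects; `triggers_from` and `lookup` are hypothetical names mirroring `get_transitions` and `get_transition_dict`.

```python
from collections import defaultdict

# trigger name -> list of transition dictionaries, one per source state
transitions = defaultdict(list)
transitions["fail"].append({"source": "running_algorithm", "dest": "failed"})
transitions["fail"].append({"source": "ready", "dest": "failed"})
transitions["complete"].append({"source": "running_algorithm", "dest": "completed"})


def triggers_from(source):
    """Names of all triggers that have a transition starting at 'source'."""
    return [trigger
            for trigger, transition_dicts in transitions.items()
            for transition_dict in transition_dicts
            if transition_dict["source"] == source]


def lookup(state, trigger):
    """The transition dictionary for 'trigger' out of 'state', or KeyError if none exists."""
    for transition_dict in transitions[trigger]:
        if transition_dict["source"] == state:
            return transition_dict
    raise KeyError(f"No transition from state {state} with the name {trigger}.")


print(triggers_from("running_algorithm"))  # ['fail', 'complete']
print(lookup("ready", "fail")["dest"])     # failed
```

One trigger name can therefore map to several transitions, and the source state selects among them. Because the container is a `defaultdict`, indexing it with an unknown trigger silently creates an empty entry before the `KeyError` is raised.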

◆ initial_state() [1/2]

initial_state ( self)
inherited
The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.

Definition at line 209 of file state_machines.py.

209     def initial_state(self):
210         """
211         The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
212         """
213         return self._initial_state
214

◆ initial_state() [2/2]

initial_state ( self,
state )
inherited
 

Definition at line 216 of file state_machines.py.

216     def initial_state(self, state):
217         """
218         """
219         if state in self.states.keys():
220             self._initial_state = self.states[state]
221
222             self._state = self.states[state]
223         else:
224             raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")
225

◆ is_valid()

is_valid ( self)
Returns:
    bool: Whether or not this machine has been set up correctly with all its necessary attributes.

Definition at line 1008 of file state_machines.py.

1008     def is_valid(self):
1009         """
1010         Returns:
1011             bool: Whether or not this machine has been set up correctly with all its necessary attributes.
1012         """
1013         B2INFO(f"Checking validity of current setup of AlgorithmMachine for {self.algorithm.name}.")
1014         # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
1015         for attribute_name in self.required_attrs:
1016             if not hasattr(self, attribute_name):
1017                 B2ERROR(f"AlgorithmMachine attribute {attribute_name} doesn't exist.")
1018                 return False
1019         # Check if any attributes that need actual values haven't been set or were empty
1020         for attribute_name in self.required_true_attrs:
1021             if not getattr(self, attribute_name):
1022                 B2ERROR(f"AlgorithmMachine attribute {attribute_name} returned False.")
1023                 return False
1024         return True
1025
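
The check has two stages: every name in `required_attrs` must exist, and every name in `required_true_attrs` must also hold a truthy value. A minimal sketch follows, in which `ToyMachine` is a hypothetical stand-in with shortened attribute lists, no logging, and illustrative file and directory names.

```python
class ToyMachine:
    """Hypothetical stand-in reproducing only the two-stage validity check."""

    required_attrs = ["algorithm", "dependent_databases", "output_dir", "input_files"]
    required_true_attrs = ["algorithm", "output_dir", "input_files"]

    def __init__(self):
        self.algorithm = None
        self.dependent_databases = []
        self.output_dir = ""
        self.input_files = []

    def is_valid(self):
        # Stage 1: every required attribute must exist, whatever its value.
        if not all(hasattr(self, name) for name in self.required_attrs):
            return False
        # Stage 2: a subset must also be truthy (not None, "" or []).
        return all(getattr(self, name) for name in self.required_true_attrs)


machine = ToyMachine()
print(machine.is_valid())  # False: nothing has been configured yet
machine.algorithm = object()
machine.output_dir = "algorithm_output"
machine.input_files = ["collector_output.root"]
print(machine.is_valid())  # True, even though dependent_databases is still empty
```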

◆ save_graph()

save_graph ( self,
filename,
graphname )
inherited
Does a simple dot file creation to visualise states and transitions.

Definition at line 372 of file state_machines.py.

372     def save_graph(self, filename, graphname):
373         """
374         Does a simple dot file creation to visualise states and transitions.
375         """
376         with open(filename, "w") as dotfile:
377             dotfile.write("digraph " + graphname + " {\n")
378             for state in self.states.keys():
379                 dotfile.write('"' + state + '" [shape=ellipse, color=black]\n')
380             for trigger, transition_dicts in self.transitions.items():
381                 for transition in transition_dicts:
382                     dotfile.write('"' + transition["source"].name + '" -> "' +
383                                   transition["dest"].name + '" [label="' + trigger + '"]\n')
384             dotfile.write("}\n")
385
386
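
To show what the generated file looks like, the sketch below writes the same DOT layout to an in-memory stream instead of a file. `write_dot` and its flat `(trigger, source, dest)` tuples are hypothetical simplifications of the method above, and only two of the machine's transitions are included.

```python
import io


def write_dot(stream, graphname, states, transitions):
    """Hypothetical helper writing states as nodes and transitions as labelled edges."""
    stream.write("digraph " + graphname + " {\n")
    for state in states:
        stream.write('"' + state + '" [shape=ellipse, color=black]\n')
    for trigger, source, dest in transitions:
        stream.write('"' + source + '" -> "' + dest + '" [label="' + trigger + '"]\n')
    stream.write("}\n")


buffer = io.StringIO()
write_dot(buffer,
          "AlgorithmMachine",
          ["init", "ready", "running_algorithm"],
          [("setup_algorithm", "init", "ready"),
           ("execute_runs", "ready", "running_algorithm")])
dot_text = buffer.getvalue()
print(dot_text)
```

The resulting text is a Graphviz digraph and can be rendered with the standard `dot` tool, for example `dot -Tpng machine.dot -o machine.png` once it has been saved to a file.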

◆ setup_from_dict()

setup_from_dict ( self,
params )
Parameters:
    params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.

Definition at line 1000 of file state_machines.py.

1000     def setup_from_dict(self, params):
1001         """
1002         Parameters:
1003             params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.
1004         """
1005         for attribute_name, value in params.items():
1006             setattr(self, attribute_name, value)
1007
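
A short sketch of how such a dictionary maps onto attributes is given below. `Configurable` is a hypothetical class with only two of the machine's attributes, and the file and directory names are illustrative.

```python
class Configurable:
    """Hypothetical object using the same setattr loop as setup_from_dict."""

    def __init__(self):
        self.output_dir = ""
        self.input_files = []

    def setup_from_dict(self, params):
        for attribute_name, value in params.items():
            setattr(self, attribute_name, value)


machine = Configurable()
machine.setup_from_dict({"output_dir": "algorithm_output",
                         "input_files": ["collector_output.root"],
                         "outptu_dir": "typo"})  # misspelled key, accepted silently
print(machine.output_dir)              # algorithm_output
print(hasattr(machine, "outptu_dir"))  # True
```

The loop performs no validation: a misspelled key creates a new attribute rather than raising an error, so a call to `is_valid()` afterwards is the place where an unset required attribute would be caught.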

◆ state() [1/2]

state ( self)
inherited
The current state of the machine. Actually a `property` decorator. It will call the exit method of the
current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).

Definition at line 227 of file state_machines.py.

227     def state(self):
228         """
229         The current state of the machine. Actually a `property` decorator. It will call the exit method of the
230         current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
231         either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
232         """
233         return self._state
234

◆ state() [2/2]

state ( self,
state )
inherited
 

Definition at line 236 of file state_machines.py.

236     def state(self, state):
237         """
238         """
239         if isinstance(state, str):
240             state_name = state
241         else:
242             state_name = state.name
243
244         try:
245             state = self.states[state_name]
246             # Run exit callbacks of current state
247             for callback in self.state.on_exit:
248                 callback(prior_state=self.state, new_state=state)
249             # Run enter callbacks of new state
250             for callback in state.on_enter:
251                 callback(prior_state=self.state, new_state=state)
252             # Set the state
253             self._state = state
254         except KeyError:
255             raise MachineError(f"Attempted to set state to '{state}' which not in the 'states' attribute!")
256
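
The setter runs the exit callbacks of the current state, then the enter callbacks of the new state, and only then stores the new state. The sketch below reproduces that order with a hypothetical `ToyState` class and `change_state` function; the real `State` class is not shown in this section, so its interface is assumed to expose `name`, `on_enter` and `on_exit` as the listing implies.

```python
class ToyState:
    """Hypothetical minimal state holding a name and two callback lists."""

    def __init__(self, name, on_enter=(), on_exit=()):
        self.name = name
        self.on_enter = list(on_enter)
        self.on_exit = list(on_exit)


def change_state(current, new):
    """Run exit callbacks of 'current', then enter callbacks of 'new', and return 'new'."""
    for callback in current.on_exit:
        callback(prior_state=current, new_state=new)
    for callback in new.on_enter:
        callback(prior_state=current, new_state=new)
    return new


log = []
init = ToyState("init", on_exit=[
    lambda prior_state, new_state: log.append(f"exit {prior_state.name}")])
ready = ToyState("ready", on_enter=[
    lambda prior_state, new_state: log.append(f"enter {new_state.name} from {prior_state.name}")])

current = change_state(init, ready)
print(log)  # ['exit init', 'enter ready from init']
```

Both kinds of callback receive the same `prior_state` and `new_state` keyword arguments, because the stored state is updated only after all callbacks have run.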

Member Data Documentation

◆ _initial_state

dict _initial_state = State(initial_state)
protectedinherited

Actual attribute holding initial state for this machine.

Definition at line 186 of file state_machines.py.

◆ _state

dict _state = self.initial_state
protectedinherited

Actual attribute holding the Current state.

Definition at line 189 of file state_machines.py.

◆ algorithm

algorithm = algorithm

Algorithm() object whose state we are modelling.

Definition at line 971 of file state_machines.py.

◆ database_chain

list database_chain = []

Database chain assigned to the overall Calibration object, or to the 'default' Collection.

Database chains for manually created Collections have no effect here.

Definition at line 978 of file state_machines.py.

◆ default_states

default_states
Initial value:
= [State("init"),
State("ready"),
State("running_algorithm"),
State("completed"),
State("failed")]

Default states for the AlgorithmMachine.

Definition at line 962 of file state_machines.py.

◆ dependent_databases

list dependent_databases = []

CAF-created local databases from previous calibrations that this calibration/algorithm depends on.

Definition at line 975 of file state_machines.py.

◆ initial_state

initial_state = initial_state
inherited

Pointless docstring since it's a property.

Definition at line 182 of file state_machines.py.

◆ input_files

input_files = []

Collector output files, will contain all files returned by the output patterns.

Definition at line 973 of file state_machines.py.

◆ output_database_dir

output_database_dir = ""

The output database directory for the localdb that the algorithm will commit to.

Definition at line 982 of file state_machines.py.

◆ output_dir

output_dir = ""

The algorithm output directory which is mostly used to store the stdout file.

Definition at line 980 of file state_machines.py.

◆ required_attrs

list required_attrs
static
Initial value:
= ["algorithm",
"dependent_databases",
"database_chain",
"output_dir",
"output_database_dir",
"input_files"
]

Required attributes that must exist before the machine can run properly.

Some are allowed to be values that return False when tested, e.g. "" or [].

Definition at line 942 of file state_machines.py.

◆ required_true_attrs

list required_true_attrs
static
Initial value:
= ["algorithm",
"output_dir",
"output_database_dir",
"input_files"
]

Attributes that must have a value that returns True when tested.

Definition at line 951 of file state_machines.py.

◆ result

result = None

IoV_Result object for a single execution, will be reset upon a new execution.

Definition at line 984 of file state_machines.py.

◆ state

state = dest
inherited

Current State of machine.

Definition at line 336 of file state_machines.py.

◆ states

dict states = {}
inherited

Valid states for this machine.

Definition at line 176 of file state_machines.py.

◆ transitions

transitions = defaultdict(list)
inherited

Allowed transitions between states.

Definition at line 191 of file state_machines.py.


The documentation for this class was generated from the following file: