Belle II Software development
state_machines.py
1#!/usr/bin/env python3
2
3
10
11import basf2
12
13from functools import partial
14from collections import defaultdict
15
16import pickle
17import glob
18import shutil
19import time
20from pathlib import Path
21import os
22import json
23
24from basf2 import create_path
25from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
26from basf2 import conditions as b2conditions
27from basf2.pickle_path import serialize_path
28
29from ROOT import Belle2 # noqa: make the Belle2 namespace available
30from ROOT.Belle2 import CalibrationAlgorithm
31
32from caf.utils import create_directories
33from caf.utils import method_dispatch
34from caf.utils import iov_from_runs
35from caf.utils import IoV_Result
36from caf.utils import get_iov_from_file
37from caf.backends import Job
38from caf.runners import AlgorithmsRunner
39
40
class State():
    """
    Basic State object that can take enter and exit state methods and records
    the state of a machine.

    You should assign the self.on_enter or self.on_exit attributes to callback functions
    or lists of them, if you need them.
    """

    def __init__(self, name, enter=None, exit=None):
        """
        Initialise State with a name and optional lists of callbacks.

        Parameters:
            name (str): Unique name of this state; used for equality and hashing.
            enter: A callable or list/tuple of callables run when the state is entered.
            exit: A callable or list/tuple of callables run when the state is exited.
                (The parameter name shadows the ``exit`` builtin but is kept for
                interface compatibility.)
        """
        #: Name of the State
        self.name = name
        #: Enter callback(s); validated and normalised to a list by the property setter
        self.on_enter = enter
        #: Exit callback(s); validated and normalised to a list by the property setter
        self.on_exit = exit

    @property
    def on_enter(self):
        """
        Runs callbacks when a state is entered.
        """
        return self._on_enter

    @on_enter.setter
    def on_enter(self, callbacks):
        """
        Validate the given callbacks (a single callable or a list/tuple of them)
        and store them as a list in the private attribute.
        """
        self._on_enter = []
        if callbacks:
            self._add_callbacks(callbacks, self._on_enter)

    @property
    def on_exit(self):
        """
        Runs callbacks when a state is exited.
        """
        return self._on_exit

    @on_exit.setter
    def on_exit(self, callbacks):
        """
        Validate the given callbacks (a single callable or a list/tuple of them)
        and store them as a list in the private attribute.
        """
        self._on_exit = []
        if callbacks:
            self._add_callbacks(callbacks, self._on_exit)

    @method_dispatch
    def _add_callbacks(self, callback, attribute):
        """
        Adds callback to a property.

        Base overload: appends a single callable to ``attribute``; anything
        non-callable is reported with B2ERROR and dropped.
        """
        if callable(callback):
            attribute.append(callback)
        else:
            B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")

    # Doxygen doesn't understand @method_dispatch overloads (def _) and emits
    # "no uniquely matching class member found" warnings. Hide them from doxygen.
    # @cond
    @_add_callbacks.register(tuple)
    @_add_callbacks.register(list)
    def _(self, callbacks, attribute):
        """
        Alternate method for lists and tuples of function objects.
        Each element is validated individually, like the single-callable overload.
        """
        if callbacks:
            for function in callbacks:
                if callable(function):
                    attribute.append(function)
                else:
                    B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")
    # @endcond

    def __str__(self):
        """
        The state's name.
        """
        return self.name

    def __repr__(self):
        """
        Debug representation showing the state name.
        """
        return f"State(name={self.name})"

    def __eq__(self, other):
        """
        States compare equal by name, against either a plain string or another
        State. (Assumes non-str operands have a ``name`` attribute.)
        """
        if isinstance(other, str):
            return self.name == other
        else:
            return self.name == other.name

    def __hash__(self):
        """
        Hash by name, consistent with __eq__, so States can be used in sets/dicts.
        """
        return hash(self.name)
142
143
144class Machine():
145 """
146 Parameters:
147 states (list[str]): A list of possible states of the machine.
148 initial_state (str):
149
150 Base class for a final state machine wrapper.
151 Implements the framework that a more complex machine can inherit from.
152
153 The `transitions` attribute is a dictionary of trigger name keys, each value of
154 which is another dictionary of 'source' states, 'dest' states, and 'conditions'
155 methods. 'conditions' should be a list of callables or a single one. A transition is
156 valid if it goes from an allowed state to an allowed state.
157 Conditions are optional but must be a callable that returns True or False based
158 on some state of the machine. They cannot have input arguments currently.
159
160 Every condition/before/after callback function MUST take ``**kwargs`` as the only
161 argument (except ``self`` if it's a class method). This is because it's basically
162 impossible to determine which arguments to pass to which functions for a transition.
163 Therefore this machine just enforces that every function should simply take ``**kwargs``
164 and use the dictionary of arguments (even if it doesn't need any arguments).
165
166 This also means that if you call a trigger with arguments e.g. ``machine.walk(speed=5)``
167 you MUST use the keyword arguments rather than positional ones. So ``machine.walk(5)``
168 will *not* work.
169 """
170
171 def __init__(self, states=None, initial_state="default_initial"):
172 """
173 Basic Setup of states and initial_state.
174 """
175
176 self.states = {}
177 if states:
178 for state in states:
179 self.add_state(state)
180 if initial_state != "default_initial":
181
182 self.initial_state = initial_state
183 else:
184 self.add_state(initial_state)
185
186 self._initial_state = State(initial_state)
187
188
190
191 self.transitions = defaultdict(list)
192
193 def add_state(self, state, enter=None, exit=None):
194 """
195 Adds a single state to the list of possible ones.
196 Should be a unique string or a State object with a unique name.
197 """
198 if isinstance(state, str):
199 self.add_state(State(state, enter, exit))
200 elif isinstance(state, State):
201 if state.name not in self.states.keys():
202 self.states[state.name] = state
203 else:
204 B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
205 else:
206 B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")
207
208 @property
209 def initial_state(self):
210 """
211 The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
212 """
213 return self._initial_state
214
215 @initial_state.setter
216 def initial_state(self, state):
217 """
218 """
219 if state in self.states.keys():
220 self._initial_state = self.states[state]
221
222 self._state = self.states[state]
223 else:
224 raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")
225
226 @property
227 def state(self):
228 """
229 The current state of the machine. Actually a `property` decorator. It will call the exit method of the
230 current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
231 either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
232 """
233 return self._state
234
235 @state.setter
236 def state(self, state):
237 """
238 """
239 if isinstance(state, str):
240 state_name = state
241 else:
242 state_name = state.name
243
244 try:
245 state = self.states[state_name]
246 # Run exit callbacks of current state
247 for callback in self.state.on_exit:
248 callback(prior_state=self.state, new_state=state)
249 # Run enter callbacks of new state
250 for callback in state.on_enter:
251 callback(prior_state=self.state, new_state=state)
252 # Set the state
253 self._state = state
254 except KeyError:
255 raise MachineError(f"Attempted to set state to '{state}' which not in the 'states' attribute!")
256
    @staticmethod
    def default_condition(**kwargs):
        """
        Method to always return True.

        Used as the fallback condition when a transition is added without any,
        so such transitions are always allowed.
        """
        return True
263
264 def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
265 """
266 Adds a single transition to the dictionary of possible ones.
267 Trigger is the method name that begins the transition between the
268 source state and the destination state.
269
270 The condition is an optional function that returns True or False
271 depending on the current state/input.
272 """
273 transition_dict = {}
274 try:
275 source = self.states[source]
276 dest = self.states[dest]
277 transition_dict["source"] = source
278 transition_dict["dest"] = dest
279 except KeyError as err:
280 B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
281 raise err
282 if conditions:
283 if isinstance(conditions, (list, tuple, set)):
284 transition_dict["conditions"] = list(conditions)
285 else:
286 transition_dict["conditions"] = [conditions]
287 else:
288 transition_dict["conditions"] = [Machine.default_condition]
289
290 if not before:
291 before = []
292 if isinstance(before, (list, tuple, set)):
293 transition_dict["before"] = list(before)
294 else:
295 transition_dict["before"] = [before]
296
297 if not after:
298 after = []
299 if isinstance(after, (list, tuple, set)):
300 transition_dict["after"] = list(after)
301 else:
302 transition_dict["after"] = [after]
303
304 self.transitions[trigger].append(transition_dict)
305
306 def __getattr__(self, name, **kwargs):
307 """
308 Allows us to create a new method for each trigger on the fly.
309 If there is no trigger name in the machine to match, then the normal
310 AttributeError is called.
311 """
312 possible_transitions = self.get_transitions(self.state)
313 if name not in possible_transitions:
314 raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
315 transition_dict = self.get_transition_dict(self.state, name)
316 # \cond silence doxygen warning about _trigger
317 return partial(self._trigger, name, transition_dict, **kwargs)
318 # \endcond
319
320 def _trigger(self, transition_name, transition_dict, **kwargs):
321 """
322 Runs the transition logic. Callbacks are evaluated in the order:
323 conditions -> before -> <new state set here> -> after.
324 """
325 dest, conditions, before_callbacks, after_callbacks = (
326 transition_dict["dest"],
327 transition_dict["conditions"],
328 transition_dict["before"],
329 transition_dict["after"]
330 )
331 # Returns True only if every condition returns True when called
332 if all(map(lambda condition: self._callback(condition, **kwargs), conditions)):
333 for before_func in before_callbacks:
334 self._callback(before_func, **kwargs)
335
336 self.state = dest
337 for after_func in after_callbacks:
338 self._callback(after_func, **kwargs)
339 else:
340 raise ConditionError(f"Transition '{transition_name}' called for but one or more conditions "
341 "evaluated False")
342
    @staticmethod
    def _callback(func, **kwargs):
        """
        Calls a condition/before/after.. function using arguments passed (or not).

        Parameters:
            func (callable): Must accept the keyword arguments it is given
                (all transition callbacks are required to take ``**kwargs``).

        Returns:
            The return value of ``func``; condition results are truth-tested
            by the caller.
        """
        return func(**kwargs)
349
350 def get_transitions(self, source):
351 """
352 Returns allowed transitions from a given state.
353 """
354 possible_transitions = []
355 for transition, transition_dicts in self.transitions.items():
356 for transition_dict in transition_dicts:
357 if transition_dict["source"] == source:
358 possible_transitions.append(transition)
359 return possible_transitions
360
361 def get_transition_dict(self, state, transition):
362 """
363 Returns the transition dictionary for a state and transition out of it.
364 """
365 transition_dicts = self.transitions[transition]
366 for transition_dict in transition_dicts:
367 if transition_dict["source"] == state:
368 return transition_dict
369 else:
370 raise KeyError(f"No transition from state {state} with the name {transition}.")
371
372 def save_graph(self, filename, graphname):
373 """
374 Does a simple dot file creation to visualise states and transiitons.
375 """
376 with open(filename, "w") as dotfile:
377 dotfile.write("digraph " + graphname + " {\n")
378 for state in self.states.keys():
379 dotfile.write('"' + state + '" [shape=ellipse, color=black]\n')
380 for trigger, transition_dicts in self.transitions.items():
381 for transition in transition_dicts:
382 dotfile.write('"' + transition["source"].name + '" -> "' +
383 transition["dest"].name + '" [label="' + trigger + '"]\n')
384 dotfile.write("}\n")
385
386
388 """
389 A state machine to handle `Calibration` objects and the flow of
390 processing for them.
391 """
392
393
    #: Subdirectory name (under <calibration>/<iteration>) where files that go
    #: *into* the collector jobs (serialized paths, config JSON) are placed
    collector_input_dir = 'collector_input'

    #: Subdirectory name where the collector jobs write their output
    collector_output_dir = 'collector_output'

    #: Subdirectory name for the algorithm stage output
    #: NOTE(review): the code visible here uses ``calibration.alg_output_dir``
    #: for algorithm output paths — confirm where this attribute is consumed.
    algorithm_output_dir = 'algorithm_output'
399
400 def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
401 """
402 Takes a Calibration object from the caf framework and lets you
403 set the initial state.
404 """
405
406 self.default_states = [State("init", enter=[self._update_cal_state,
407 self._log_new_state]),
408 State("running_collector", enter=[self._update_cal_state,
409 self._log_new_state]),
410 State("collector_failed", enter=[self._update_cal_state,
411 self._log_new_state]),
412 State("collector_completed", enter=[self._update_cal_state,
413 self._log_new_state]),
414 State("running_algorithms", enter=[self._update_cal_state,
415 self._log_new_state]),
416 State("algorithms_failed", enter=[self._update_cal_state,
417 self._log_new_state]),
418 State("algorithms_completed", enter=[self._update_cal_state,
419 self._log_new_state]),
420 State("completed", enter=[self._update_cal_state,
421 self._log_new_state]),
422 State("failed", enter=[self._update_cal_state,
423 self._log_new_state])
424 ]
425
426 super().__init__(self.default_states, initial_state)
427
428
429 self.calibration = calibration
430 # Monkey Patching for the win!
431
432 self.calibration.machine = self
433
434 self.iteration = iteration
435
437
439
441
442 self.iov_to_calibrate = iov_to_calibrate
443
444 self.root_dir = Path(os.getcwd(), calibration.name)
445
446
450
451
453
454 self.add_transition("submit_collector", "init", "running_collector",
455 conditions=self.dependencies_completed,
456 before=[self._make_output_dir,
458 self._build_iov_dicts,
461 self._dump_job_config])
462 self.add_transition("fail", "running_collector", "collector_failed",
463 conditions=self._collection_failed)
464 self.add_transition("complete", "running_collector", "collector_completed",
465 conditions=self._collection_completed)
466 self.add_transition("run_algorithms", "collector_completed", "running_algorithms",
468 after=[self._run_algorithms,
470 self.add_transition("complete", "running_algorithms", "algorithms_completed",
471 after=self.automatic_transition,
472 conditions=self._runner_not_failed)
473 self.add_transition("fail", "running_algorithms", "algorithms_failed",
474 conditions=self._runner_failed)
475 self.add_transition("iterate", "algorithms_completed", "init",
476 conditions=[self._require_iteration,
478 after=self._increment_iteration)
479 self.add_transition("finish", "algorithms_completed", "completed",
480 conditions=self._no_require_iteration,
481 before=self._prepare_final_db)
482 self.add_transition("fail_fully", "algorithms_failed", "failed")
483 self.add_transition("fail_fully", "collector_failed", "failed")
484
485 def _update_cal_state(self, **kwargs):
486 """update calibration state"""
487 self.calibration.state = str(kwargs["new_state"])
488
489 def files_containing_iov(self, file_paths, files_to_iovs, iov):
490 """
491 Lookup function that returns all files from the file_paths that
492 overlap with this IoV.
493 """
494 # Files that contain an Exp,Run range that overlaps with given IoV
495 overlapping_files = set()
496
497 for file_path, file_iov in files_to_iovs.items():
498 if file_iov.overlaps(iov) and (file_path in file_paths):
499 overlapping_files.add(file_path)
500 return overlapping_files
501
503 """
504 Dumps the `Job` object for the collections to JSON files so that it's configuration can be recovered
505 later in case of failure.
506 """
507 # Wait for jobs (+subjobs) to be submitted so that all information is filled. Since the parent CAF object asynchronously
508 # submits the jobs this might need to wait a while.
509 while any(map(lambda j: j.status == "init", self._collector_jobs.values())):
510 B2DEBUG(29, "Some Collector Jobs still in 'init' state. Waiting...")
511 time.sleep(5)
512
513 for collection_name, job in self._collector_jobs.items():
514 collector_job_output_file_name = self.calibration.collections[collection_name].job_config
515 output_file = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir,
516 collection_name, collector_job_output_file_name)
517 job.dump_to_json(output_file)
518
520 """
521 Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.
522 """
523 for collection_name, collection in self.calibration.collections.items():
524 output_file = self.root_dir.joinpath(str(self.iteration),
526 collection_name,
527 collection.job_config)
528 self._collector_jobs[collection_name] = Job.from_json(output_file)
529
530 def _iov_requested(self):
531 """
532 """
533 if self.iov_to_calibrate:
534 B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
535 return True
536 else:
537 B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
538 return False
539
541 """
542 """
543 pass
544
546 """
547 Build IoV file dictionary for each collection if required.
548 """
549 iov_requested = self._iov_requested()
550 if iov_requested or self.calibration.ignored_runs:
551 for coll_name, collection in self.calibration.collections.items():
552 if not collection.files_to_iovs:
553 B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
554 f" Calibration '{self.calibration.name} and Collection '{coll_name}'."
555 " Filling dictionary from input file metadata."
556 " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
557
558 files_to_iovs = {}
559 for file_path in collection.input_files:
560 files_to_iovs[file_path] = get_iov_from_file(file_path)
561 collection.files_to_iovs = files_to_iovs
562 else:
563 B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
564 f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
565 else:
566 B2INFO("No File to IoV mapping required.")
567
569 """
570 """
571 return self.iteration < self.calibration.max_iterations
572
574 """
575 """
576 self.iteration += 1
577 self.calibration.iteration = self.iteration
578
580 """
581 Did all the collections succeed?
582 """
583 B2DEBUG(29, "Checking for failed collector job.")
584 if self._collector_jobs_ready():
585 return all([job.status == "completed" for job in self._collector_jobs.values()])
586
588 """
589 Did any of the collections fail?
590 """
591 B2DEBUG(29, "Checking for failed collector job.")
592 if self._collector_jobs_ready():
593 return any([job.status == "failed" for job in self._collector_jobs.values()])
594
596 """
597 Returns:
598 bool: If AlgorithmsRunner succeeded return True.
599 """
600 return not self._runner_failed()
601
602 def _runner_failed(self):
603 """
604 Returns:
605 bool: If AlgorithmsRunner failed return True.
606 """
607 if self._runner_final_state == AlgorithmsRunner.FAILED:
608 return True
609 else:
610 return False
611
613 """
614 """
615 since_last_update = time.time() - self._collector_timing["last_update"]
616 if since_last_update > self.calibration.collector_full_update_interval:
617 B2INFO("Updating full collector job statuses.")
618 for job in self._collector_jobs.values():
619 job.update_status()
620 self._collector_timing["last_update"] = time.time()
621 if job.subjobs:
622 num_completed = sum((subjob.status in subjob.exit_statuses) for subjob in job.subjobs.values())
623 total_subjobs = len(job.subjobs)
624 B2INFO(f"{num_completed}/{total_subjobs} Collector SubJobs finished in"
625 f" Calibration {self.calibration.name} Job {job.name}.")
626 return all([job.ready() for job in self._collector_jobs.values()])
627
629 """
630 """
631 self.calibration.jobs_to_submit.extend(list(self._collector_jobs.values()))
632 self._collector_timing["start"] = time.time()
633 self._collector_timing["last_update"] = time.time()
634
636 """
637 """
638 if self._require_iteration() and self._below_max_iterations():
639 return False
640 elif self._require_iteration() and not self._below_max_iterations():
641 B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
642 return True
643 elif not self._require_iteration():
644 return True
645
647 """
648 """
649 iteration_called = False
650 for alg_name, results in self._algorithm_results[self.iteration].items():
651 for result in results:
652 if result.result == CalibrationAlgorithm.c_Iterate:
653 iteration_called = True
654 break
655 if iteration_called:
656 break
657 return iteration_called
658
    def _log_new_state(self, **kwargs):
        """
        Log this machine's state transition at INFO level.

        Expects ``new_state`` (a `State`) in ``kwargs``, as supplied by the
        state enter callbacks.
        """
        B2INFO(f"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")
663
665 """
666 Condition function to check that the dependencies of our calibration are in the 'completed' state.
667 Technically only need to check explicit dependencies.
668 """
669 for calibration in self.calibration.dependencies:
670 if not calibration.state == calibration.end_state:
671 return False
672 else:
673 return True
674
676 """
677 Automatically try all transitions out of this state once. Tries fail last.
678 """
679 possible_transitions = self.get_transitions(self.state)
680 for transition in possible_transitions:
681 try:
682 if transition != "fail":
683 getattr(self, transition)()
684 break
685 except ConditionError:
686 continue
687 else:
688 if "fail" in possible_transitions:
689 getattr(self, "fail")()
690 else:
691 raise MachineError(f"Failed to automatically transition out of {self.state} state.")
692
694 """
695 Creates the overall root directory of the Calibration. Will not overwrite if it already exists.
696 Also creates s
697 """
698 create_directories(self.root_dir, overwrite=False)
699
700 def _make_collector_path(self, name, collection):
701 """
702 Creates a basf2 path for the correct collector and serializes it in the
703 self.output_dir/<calibration_name>/<iteration>/paths directory
704 """
705 path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
706 # Automatically overwrite any previous directory
707 create_directories(path_output_dir)
708 path_file_name = collection.collector.name() + '.path'
709 path_file_name = path_output_dir / path_file_name
710 # Create empty path and add collector to it
711 coll_path = create_path()
712 coll_path.add_module(collection.collector)
713 # Dump the basf2 path to file
714 with open(path_file_name, 'bw') as serialized_path_file:
715 pickle.dump(serialize_path(coll_path), serialized_path_file)
716 # Return the pickle file path for addition to the input sandbox
717 return str(path_file_name.absolute())
718
719 def _make_pre_collector_path(self, name, collection):
720 """
721 Creates a basf2 path for the collectors setup path (Collection.pre_collector_path) and serializes it in the
722 self.output_dir/<calibration_name>/<iteration>/<colector_output>/<name> directory.
723 """
724 path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
725 coll_path = collection.pre_collector_path
726 path_file_name = 'pre_collector.path'
727 path_file_name = os.path.join(path_output_dir, path_file_name)
728 # Dump the basf2 path to file
729 with open(path_file_name, 'bw') as serialized_path_file:
730 pickle.dump(serialize_path(coll_path), serialized_path_file)
731 # Return the pickle file path for addition to the input sandbox
732 return path_file_name
733
735 """
736 Creates a Job object for the collections of this iteration, ready for submission
737 to backend.
738 """
739 for collection_name, collection in self.calibration.collections.items():
740 iteration_dir = self.root_dir.joinpath(str(self.iteration))
741 job = Job('_'.join([self.calibration.name, collection_name, 'Iteration', str(self.iteration)]))
742 job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
743 job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
744 # Remove previous failed attempt to avoid problems
745 if job.output_dir.exists():
746 B2INFO(f"Previous output directory for {self.calibration.name} collector {collection_name} exists."
747 f"Deleting {job.output_dir} before re-submitting.")
748 shutil.rmtree(job.output_dir)
749 job.cmd = collection.job_cmd
750 job.append_current_basf2_setup_cmds()
751 job.input_sandbox_files.append(collection.job_script)
752 collector_path_file = Path(self._make_collector_path(collection_name, collection))
753 job.input_sandbox_files.append(collector_path_file)
754 if collection.pre_collector_path:
755 pre_collector_path_file = Path(self._make_pre_collector_path(collection_name, collection))
756 job.input_sandbox_files.append(pre_collector_path_file)
757
758 # Want to figure out which local databases are required for this job and their paths
759 list_dependent_databases = []
760
761 # Here we add the finished databases of previous calibrations that we depend on.
762 # We can assume that the databases exist as we can't be here until they have returned
763 for dependency in self.calibration.dependencies:
764 database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
765 B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
766 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
767
768 # Add previous iteration databases from this calibration
769 if self.iteration > 0:
770 previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
771 database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
772 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
773 B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
774
775 # Let's use a directory to store some files later for input to the collector jobs. Should already exist from
776 # collector path
777 input_data_directory = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, collection_name)
778
779 # Need to pass setup info to collector which would be tricky as arguments
780 # We make a dictionary and pass it in as json
781 job_config = {}
782 # Apply the user-set Calibration database chain to the base of the overall chain.
783 json_db_chain = []
784 for database in collection.database_chain:
785 if database.db_type == 'local':
786 json_db_chain.append(('local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
787 elif database.db_type == 'central':
788 json_db_chain.append(('central', database.global_tag))
789 else:
790 raise ValueError(f"Unknown database type {database.db_type}.")
791 # CAF created ones for dependent calibrations and previous iterations of this calibration
792 for database in list_dependent_databases:
793 json_db_chain.append(('local', database))
794 job_config['database_chain'] = json_db_chain
795
796 job_config_file_path = input_data_directory.joinpath('collector_config.json').absolute()
797 with open(job_config_file_path, 'w') as job_config_file:
798 json.dump(job_config, job_config_file, indent=2)
799 job.input_sandbox_files.append(job_config_file_path)
800
801 # Define the input files
802 input_data_files = set(collection.input_files)
803 # Reduce the input data files to only those that overlap with the optional requested IoV
804 # Local variable avoids doxygen "no uniquely matching class member" warning
805 iov_to_calibrate = self.iov_to_calibrate
806 if iov_to_calibrate:
807 input_data_files = self.files_containing_iov(input_data_files,
808 collection.files_to_iovs,
809 iov_to_calibrate)
810 # Remove any files that ONLY contain runs from our optional ignored_runs list
811 files_to_ignore = set()
812 for exprun in self.calibration.ignored_runs:
813 for input_file in input_data_files:
814 file_iov = self.calibration.files_to_iovs[input_file]
815 if file_iov == exprun.make_iov():
816 B2INFO(f"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
817 f"Therefore the input file '{input_file}' from Collection '{collection_name}' "
818 "is being removed from input files list.")
819 files_to_ignore.add(input_file)
820 input_data_files.difference_update(files_to_ignore)
821
822 if not input_data_files:
823 raise MachineError(f"No valid input files for Calibration '{self.calibration.name}' "
824 f" and Collection '{collection_name}'.")
825 job.input_files = list(input_data_files)
826
827 job.splitter = collection.splitter
828 job.backend_args = collection.backend_args
829 # Output patterns to be returned from collector job
830 job.output_patterns = collection.output_patterns
831 B2DEBUG(20, f"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
832 self._collector_jobs[collection_name] = job
833
835 """check that collector output is valid"""
836 B2INFO("Checking that Collector output exists for all collector jobs "
837 f"using {self.calibration.name}.output_patterns.")
838 if not self._collector_jobs:
839 B2INFO("We're restarting so we'll recreate the collector Job object.")
841
842 for job in self._collector_jobs.values():
843 if not job.subjobs:
844 output_files = []
845 for pattern in job.output_patterns:
846 output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
847 if not output_files:
848 raise MachineError("No output files from Collector Job")
849 else:
850 for subjob in job.subjobs.values():
851 output_files = []
852 for pattern in subjob.output_patterns:
853 output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
854 if not output_files:
855 raise MachineError(f"No output files from Collector {subjob}")
856
858 """
859 Runs the Calibration Algorithms for this calibration machine.
860
861 Will run them sequentially locally (possible benefits to using a
862 processing pool for low memory algorithms later on.)
863 """
864 # Get an instance of the Runner for these algorithms and run it
865 algs_runner = self.calibration.algorithms_runner(name=self.calibration.name)
866 algs_runner.algorithms = self.calibration.algorithms
867 algorithm_output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
868 output_database_dir = algorithm_output_dir.joinpath("outputdb")
869 # Remove it, if we failed previously, to start clean
870 if algorithm_output_dir.exists():
871 B2INFO(f"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
872 f"Deleting and recreating {algorithm_output_dir}.")
873 create_directories(algorithm_output_dir)
874 B2INFO(f"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
875 algs_runner.output_database_dir = output_database_dir
876 algs_runner.output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
877 input_files = []
878
879 for job in self._collector_jobs.values():
880 if job.subjobs:
881 for subjob in job.subjobs.values():
882 for pattern in subjob.output_patterns:
883 input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
884 else:
885 for pattern in job.output_patterns:
886 input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
887
888 algs_runner.input_files = input_files
889
890 # Add any user defined database chain for this calibration
891 algs_runner.database_chain = self.calibration.database_chain
892
893 # Here we add the finished databases of previous calibrations that we depend on.
894 # We can assume that the databases exist as we can't be here until they have returned
895 list_dependent_databases = []
896 for dependency in self.calibration.dependencies:
897 database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
898 B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
899 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
900
901 # Add previous iteration databases from this calibration
902 if self.iteration > 0:
903 previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
904 database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
905 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
906 B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
907 algs_runner.dependent_databases = list_dependent_databases
908
909 algs_runner.ignored_runs = self.calibration.ignored_runs
910
911 try:
912 algs_runner.run(self.iov_to_calibrate, self.iteration)
913 except Exception as err:
914 print(err)
915 # We directly set the state without triggering the transition because normally we fail based on checking the algorithm
916 # results. But here we had an actual exception so we just force into failure instead.
917 self._state = State("algorithms_failed")
918 self._algorithm_results[self.iteration] = algs_runner.results
919 self._runner_final_state = algs_runner.final_state
920
922 """
923 Take the last iteration's outputdb and copy it to a more easily findable place.
924 """
925 database_location = self.root_dir.joinpath(str(self.iteration),
926 self.calibration.alg_output_dir,
927 'outputdb')
928 final_database_location = self.root_dir.joinpath('outputdb')
929 if final_database_location.exists():
930 B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
931 shutil.rmtree(final_database_location)
932 shutil.copytree(database_location, final_database_location)
933
934
936 """
937 A state machine to handle the logic of running the algorithm on the overall runs contained in the data.
938 """
939
940
942 required_attrs = ["algorithm",
943 "dependent_databases",
944 "database_chain",
945 "output_dir",
946 "output_database_dir",
947 "input_files"
948 ]
949
950
951 required_true_attrs = ["algorithm",
952 "output_dir",
953 "output_database_dir",
954 "input_files"
955 ]
956
957 def __init__(self, algorithm=None, initial_state="init"):
958 """
959 Takes an Algorithm object from the caf framework and defines the transitions.
960 """
961
962 self.default_states = [State("init"),
963 State("ready"),
964 State("running_algorithm"),
965 State("completed"),
966 State("failed")]
967
968 super().__init__(self.default_states, initial_state)
969
970
971 self.algorithm = algorithm
972
973 self.input_files = []
974
976
979
980 self.output_dir = ""
981
983
984 self.result = None
985
986 self.add_transition("setup_algorithm", "init", "ready",
987 before=[self._setup_logging,
990 self._set_input_data,
991 self._pre_algorithm])
992 self.add_transition("execute_runs", "ready", "running_algorithm",
993 after=self._execute_over_iov)
994 self.add_transition("complete", "running_algorithm", "completed")
995 self.add_transition("fail", "running_algorithm", "failed")
996 self.add_transition("fail", "ready", "failed")
997 self.add_transition("setup_algorithm", "completed", "ready")
998 self.add_transition("setup_algorithm", "failed", "ready")
999
1000 def setup_from_dict(self, params):
1001 """
1002 Parameters:
1003 params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.
1004 """
1005 for attribute_name, value in params.items():
1006 setattr(self, attribute_name, value)
1007
1008 def is_valid(self):
1009 """
1010 Returns:
1011 bool: Whether or not this machine has been set up correctly with all its necessary attributes.
1012 """
1013 B2INFO(f"Checking validity of current setup of AlgorithmMachine for {self.algorithm.name}.")
1014 # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
1015 for attribute_name in self.required_attrs:
1016 if not hasattr(self, attribute_name):
1017 B2ERROR(f"AlgorithmMachine attribute {attribute_name} doesn't exist.")
1018 return False
1019 # Check if any attributes that need actual values haven't been set or were empty
1020 for attribute_name in self.required_true_attrs:
1021 if not getattr(self, attribute_name):
1022 B2ERROR(f"AlgorithmMachine attribute {attribute_name} returned False.")
1023 return False
1024 return True
1025
1026 def _create_output_dir(self, **kwargs):
1027 """
1028 Create working/output directory of algorithm. Any old directory is overwritten.
1029 """
1030 create_directories(Path(self.output_dir), overwrite=True)
1031
    def _setup_database_chain(self, **kwargs):
        """
        Apply all databases in the correct order.

        Resets the global basf2 conditions configuration, then prepends (head-first):
        the user-set chain for this Calibration, then the local DBs of dependent
        calibrations, and finally configures the output local DB for new payloads.
        """
        # We deliberately override the normal database ordering because we don't want input files GTs to affect
        # the processing. Only explicit GTs and intermediate local DBs made by the CAF should be added here.
        b2conditions.reset()
        b2conditions.override_globaltags()

        # Apply all the databases in order, starting with the user-set chain for this Calibration
        # Local variable avoids doxygen "no uniquely matching class member" warning
        database_chain = self.database_chain
        for database in database_chain:
            if database.db_type == 'local':
                B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
                       f"for {self.algorithm.name}.")
                b2conditions.prepend_testing_payloads(database.filepath.as_posix())
            elif database.db_type == 'central':
                B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
                       f"for {self.algorithm.name}.")
                b2conditions.prepend_globaltag(database.global_tag)
            else:
                raise ValueError(f"Unknown database type {database.db_type}.")
        # Here we add the finished databases of previous calibrations that we depend on.
        # We can assume that the databases exist as we can't be here until they have returned
        # with OK status.
        # NOTE(review): the log message below says "(unknown)" even though 'filename' is in
        # scope — consider logging the actual filename; 'directory' is deliberately unused.
        for filename, directory in self.dependent_databases:
            B2INFO(f"Adding Local Database (unknown) to head of chain of local databases created by"
                   f" a dependent calibration, for {self.algorithm.name}.")
            b2conditions.prepend_testing_payloads(filename)

        # Create a directory to store the payloads of this algorithm
        # (overwrite=False: keep whatever is already there)
        create_directories(Path(self.output_database_dir), overwrite=False)

        # add local database to save payloads
        B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
        # Things have changed. We now need to do the expert settings to create a database directly.
        # LocalDB is readonly without this but we don't need 'use_local_database' during writing.
        b2conditions.expert_settings(save_payloads=str(self.output_database_dir.joinpath("database.txt")))
1071
1072 def _setup_logging(self, **kwargs):
1073 """
1074 """
1075 # add logfile for output
1076 log_file = os.path.join(self.output_dir, self.algorithm.name + '_stdout')
1077 B2INFO(f"Output log file at {log_file}.")
1078 basf2.reset_log()
1079 basf2.set_log_level(basf2.LogLevel.INFO)
1080 basf2.log_to_file(log_file)
1081
1082 def _change_working_dir(self, **kwargs):
1083 """
1084 """
1085 B2INFO(f"Changing current working directory to {self.output_dir}.")
1086 os.chdir(self.output_dir)
1087
1088 def _pre_algorithm(self, **kwargs):
1089 """
1090 Call the user defined algorithm setup function.
1091 """
1092 B2INFO("Running Pre-Algorithm function (if exists)")
1093 if self.algorithm.pre_algorithm:
1094 # We have to re-pass in the algorithm here because an outside user has created this method.
1095 # So the method isn't bound to the instance properly.
1096 self.algorithm.pre_algorithm(self.algorithm.algorithm, kwargs["iteration"])
1097
1098 def _execute_over_iov(self, **kwargs):
1099 """
1100 Does the actual execute of the algorithm on an IoV and records the result.
1101 """
1102 B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")
1103
1104 runs_to_execute = kwargs["runs"]
1105 iov = kwargs["apply_iov"]
1106 iteration = kwargs["iteration"]
1107 if not iov:
1108 iov = iov_from_runs(runs_to_execute)
1109 B2INFO(f"Execution will use {iov} for labelling payloads by default.")
1110 alg_result = self.algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
1111 self.result = IoV_Result(iov, alg_result)
1112
1113 def _set_input_data(self, **kwargs):
1114 """set input data"""
1115 self.algorithm.data_input(self.input_files)
1116
1117
class MachineError(Exception):
    """
    Base exception class for this module.

    The more specific machine errors defined below derive from this type, so a
    single except clause can catch them all.
    """
1122
1123
class ConditionError(MachineError):
    """
    Exception for when conditions fail during a transition.
    """
1128
1129
1131 """
1132 Exception for when transitions fail.
1133 """
STL class.
list required_true_attrs
Attributes that must have a value that returns True when tested.
str output_database_dir
The output database directory for the localdb that the algorithm will commit to.
list input_files
Collector output files, will contain all files returned by the output patterns.
list dependent_databases
CAF created local databases from previous calibrations that this calibration/algorithm depends on.
list database_chain
Assigned database chain to the overall Calibration object, or to the 'default' Collection.
list default_states
Default states for the AlgorithmMachine.
__init__(self, algorithm=None, initial_state="init")
list required_attrs
Required attributes that must exist before the machine can run properly.
algorithm
Algorithm() object whose state we are modelling.
result
IoV_Result object for a single execution, will be reset upon a new execution.
str output_dir
The algorithm output directory which is mostly used to store the stdout file.
root_dir
root directory for this Calibration
_make_pre_collector_path(self, name, collection)
dict _collector_timing
Times of various useful updates to the collector job e.g.
files_containing_iov(self, file_paths, files_to_iovs, iov)
_make_collector_path(self, name, collection)
_runner_final_state
Final state of the algorithm runner for the current iteration.
list default_states
States that are defaults to the CalibrationMachine (could override later)
str collector_output_dir
output directory of collector
iov_to_calibrate
IoV to be executed, currently will loop over all runs in IoV.
dict _collector_jobs
The collector jobs used for submission.
__init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0)
iteration
Which iteration step are we in.
str collector_input_dir
input directory of collector
collector_backend
Backend used for this calibration machine collector.
dict _algorithm_results
Results of each iteration for all algorithms of this calibration.
calibration
Calibration object whose state we are modelling.
add_transition(self, trigger, source, dest, conditions=None, before=None, after=None)
_trigger(self, transition_name, transition_dict, **kwargs)
dict states
Valid states for this machine.
save_graph(self, filename, graphname)
__init__(self, states=None, initial_state="default_initial")
transitions
Allowed transitions between states.
_callback(func, **kwargs)
add_state(self, state, enter=None, exit=None)
get_transitions(self, source)
dict _state
Actual attribute holding the Current state.
initial_state
Pointless docstring since it's a property.
dict _initial_state
Actual attribute holding initial state for this machine.
state
Current State of machine.
__getattr__(self, name, **kwargs)
get_transition_dict(self, state, transition)
_add_callbacks(self, callback, attribute)
__init__(self, name, enter=None, exit=None)
name
Name of the State.
on_enter
Callback list when entering state.
list _on_enter
set state as empty list when entering it
on_exit
Callback list when exiting state.
list _on_exit
set state as empty list when exiting it