Belle II Software development
state_machines.py
1#!/usr/bin/env python3
2
3# disable doxygen check for this file
4# @cond
5
6
13
14import basf2
15
16from functools import partial
17from collections import defaultdict
18
19import pickle
20import glob
21import shutil
22import time
23from pathlib import Path
24import os
25import json
26
27from basf2 import create_path
28from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
29from basf2 import conditions as b2conditions
30from basf2.pickle_path import serialize_path
31
32from ROOT import Belle2 # noqa: make the Belle2 namespace available
33from ROOT.Belle2 import CalibrationAlgorithm
34
35from caf.utils import create_directories
36from caf.utils import method_dispatch
37from caf.utils import iov_from_runs
38from caf.utils import IoV_Result
39from caf.utils import get_iov_from_file
40from caf.backends import Job
41from caf.runners import AlgorithmsRunner
42
43
class State():
    """
    Basic State object that can take enter and exit state methods and records
    the state of a machine.

    You should assign the self.on_enter or self.on_exit attributes to callback functions
    or lists of them, if you need them.
    """

    def __init__(self, name, enter=None, exit=None):
        """
        Initialise State with a name and optional lists of callbacks.
        """
        #: Unique name identifying this state
        self.name = name
        #: Entry callbacks; the property setter normalises to a list
        self.on_enter = enter
        #: Exit callbacks; the property setter normalises to a list
        self.on_exit = exit

    @property
    def on_enter(self):
        """
        Runs callbacks when a state is entered.
        """
        return self._on_enter

    @on_enter.setter
    def on_enter(self, callbacks):
        """
        Normalises the given callback(s) into the internal list.
        """
        holder = []
        self._on_enter = holder
        if callbacks:
            self._add_callbacks(callbacks, holder)

    @property
    def on_exit(self):
        """
        Runs callbacks when a state is exited.
        """
        return self._on_exit

    @on_exit.setter
    def on_exit(self, callbacks):
        """
        Normalises the given callback(s) into the internal list.
        """
        holder = []
        self._on_exit = holder
        if callbacks:
            self._add_callbacks(callbacks, holder)

    @method_dispatch
    def _add_callbacks(self, callback, attribute):
        """
        Adds callback to a property.
        """
        # Fallback: a single object, which must itself be callable
        if not callable(callback):
            B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")
            return
        attribute.append(callback)

    @_add_callbacks.register(tuple)
    @_add_callbacks.register(list)
    def _(self, callbacks, attribute):
        """
        Alternate method for lists and tuples of function objects.
        """
        for function in callbacks:
            if callable(function):
                attribute.append(function)
            else:
                B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")

    def __str__(self):
        """
        The state's name.
        """
        return self.name

    def __repr__(self):
        """
        Debug representation showing the state name.
        """
        return f"State(name={self.name})"

    def __eq__(self, other):
        """
        States compare equal to plain strings as well as to other State objects.
        """
        other_name = other if isinstance(other, str) else other.name
        return self.name == other_name

    def __hash__(self):
        """
        Hash on the name so a State can live in sets/dicts alongside its name string.
        """
        return hash(self.name)
139
140
class Machine():
    """
    Parameters:
        states (list[str]): A list of possible states of the machine.
        initial_state (str): Name of the state the machine starts in; defaults to a
            placeholder state called "default_initial".

    Base class for a final state machine wrapper.
    Implements the framework that a more complex machine can inherit from.

    The `transitions` attribute is a dictionary of trigger name keys, each value of
    which is another dictionary of 'source' states, 'dest' states, and 'conditions'
    methods. 'conditions' should be a list of callables or a single one. A transition is
    valid if it goes from an allowed state to an allowed state.
    Conditions are optional but must be a callable that returns True or False based
    on some state of the machine. They cannot have input arguments currently.

    Every condition/before/after callback function MUST take ``**kwargs`` as the only
    argument (except ``self`` if it's a class method). This is because it's basically
    impossible to determine which arguments to pass to which functions for a transition.
    Therefore this machine just enforces that every function should simply take ``**kwargs``
    and use the dictionary of arguments (even if it doesn't need any arguments).

    This also means that if you call a trigger with arguments e.g. ``machine.walk(speed=5)``
    you MUST use the keyword arguments rather than positional ones. So ``machine.walk(5)``
    will *not* work.
    """

    def __init__(self, states=None, initial_state="default_initial"):
        """
        Basic Setup of states and initial_state.
        """
        #: Mapping of state name -> `State` object for every allowed state
        self.states = {}
        if states:
            for state in states:
                self.add_state(state)
        if initial_state != "default_initial":
            # Goes through the property setter below, which checks the name is a known state
            self.initial_state = initial_state
        else:
            # No explicit initial state requested: register and use the placeholder one
            self.add_state(initial_state)
            self._initial_state = State(initial_state)

        # Assign the backing attribute directly so that no enter/exit callbacks fire here
        self._state = self.initial_state
        #: Trigger name -> list of transition dicts ('source', 'dest', 'conditions', 'before', 'after')
        self.transitions = defaultdict(list)

    def add_state(self, state, enter=None, exit=None):
        """
        Adds a single state to the list of possible ones.
        Should be a unique string or a State object with a unique name.

        Parameters:
            state (str or State): The state to add; a string is wrapped in a `State`.
            enter: Optional callback(s) run on entering (used only for the str form).
            exit: Optional callback(s) run on exiting (used only for the str form).
        """
        if isinstance(state, str):
            # Wrap the name in a State object and re-enter this method with it
            self.add_state(State(state, enter, exit))
        elif isinstance(state, State):
            if state.name not in self.states.keys():
                self.states[state.name] = state
            else:
                B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
        else:
            B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")

    @property
    def initial_state(self):
        """
        The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
        """
        return self._initial_state

    @initial_state.setter
    def initial_state(self, state):
        """
        Sets the initial (and current) state directly, without running any callbacks.

        Raises:
            KeyError: If the requested state name is not in `states`.
        """
        if state in self.states.keys():
            self._initial_state = self.states[state]
            # Also reset the current state via the backing attribute (no callbacks fire)
            self._state = self.states[state]
        else:
            raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")

    @property
    def state(self):
        """
        The current state of the machine. Actually a `property` decorator. It will call the exit method of the
        current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
        either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
        """
        return self._state

    @state.setter
    def state(self, state):
        """
        Sets the current state, running the current state's exit callbacks and then the
        new state's enter callbacks (each receives prior_state/new_state keyword args).

        Raises:
            MachineError: If the requested state is not in `states`.
        """
        # Accept either a state name or a State-like object with a .name
        if isinstance(state, str):
            state_name = state
        else:
            state_name = state.name

        try:
            state = self.states[state_name]
            # Run exit callbacks of current state
            for callback in self.state.on_exit:
                callback(prior_state=self.state, new_state=state)
            # Run enter callbacks of new state
            for callback in state.on_enter:
                callback(prior_state=self.state, new_state=state)
            # Set the state
            self._state = state
        except KeyError:
            raise MachineError(f"Attempted to set state to '{state}' which not in the 'states' attribute!")

    @staticmethod
    def default_condition(**kwargs):
        """
        Method to always return True. Used when a transition declares no conditions.
        """
        return True

    def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
        """
        Adds a single transition to the dictionary of possible ones.
        Trigger is the method name that begins the transition between the
        source state and the destination state.

        The condition is an optional function that returns True or False
        depending on the current state/input.

        Raises:
            KeyError: If `source` or `dest` is not a known state.
        """
        transition_dict = {}
        try:
            source = self.states[source]
            dest = self.states[dest]
            transition_dict["source"] = source
            transition_dict["dest"] = dest
        except KeyError as err:
            B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
            raise err
        # Normalise conditions to a non-empty list; default is the always-True condition
        if conditions:
            if isinstance(conditions, (list, tuple, set)):
                transition_dict["conditions"] = list(conditions)
            else:
                transition_dict["conditions"] = [conditions]
        else:
            transition_dict["conditions"] = [Machine.default_condition]

        # Normalise before/after callbacks to (possibly empty) lists
        if not before:
            before = []
        if isinstance(before, (list, tuple, set)):
            transition_dict["before"] = list(before)
        else:
            transition_dict["before"] = [before]

        if not after:
            after = []
        if isinstance(after, (list, tuple, set)):
            transition_dict["after"] = list(after)
        else:
            transition_dict["after"] = [after]

        self.transitions[trigger].append(transition_dict)

    def __getattr__(self, name, **kwargs):
        """
        Allows us to create a new method for each trigger on the fly.
        If there is no trigger name in the machine to match, then the normal
        AttributeError is called.
        """
        # Only invoked when normal attribute lookup fails, so 'name' is interpreted
        # as a trigger for a transition out of the current state.
        possible_transitions = self.get_transitions(self.state)
        if name not in possible_transitions:
            raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
        transition_dict = self.get_transition_dict(self.state, name)
        # Bind the trigger name and its transition dict; the caller's keyword
        # arguments are forwarded when the returned partial is invoked.
        return partial(self._trigger, name, transition_dict, **kwargs)

    def _trigger(self, transition_name, transition_dict, **kwargs):
        """
        Runs the transition logic. Callbacks are evaluated in the order:
        conditions -> before -> <new state set here> -> after.

        Raises:
            ConditionError: If any condition callback evaluates False.
        """
        dest, conditions, before_callbacks, after_callbacks = (
            transition_dict["dest"],
            transition_dict["conditions"],
            transition_dict["before"],
            transition_dict["after"]
        )
        # Returns True only if every condition returns True when called
        if all(map(lambda condition: self._callback(condition, **kwargs), conditions)):
            for before_func in before_callbacks:
                self._callback(before_func, **kwargs)
            # Assigning the property also runs the exit/enter callbacks of the states
            self.state = dest
            for after_func in after_callbacks:
                self._callback(after_func, **kwargs)
        else:
            raise ConditionError(f"Transition '{transition_name}' called for but one or more conditions "
                                 "evaluated False")

    @staticmethod
    def _callback(func, **kwargs):
        """
        Calls a condition/before/after.. function using arguments passed (or not).
        """
        return func(**kwargs)

    def get_transitions(self, source):
        """
        Returns allowed transitions from a given state.
        """
        possible_transitions = []
        for transition, transition_dicts in self.transitions.items():
            for transition_dict in transition_dicts:
                if transition_dict["source"] == source:
                    possible_transitions.append(transition)
        return possible_transitions

    def get_transition_dict(self, state, transition):
        """
        Returns the transition dictionary for a state and transition out of it.

        Raises:
            KeyError: If no transition with this name leaves the given state.
        """
        transition_dicts = self.transitions[transition]
        # for/else: the else only runs if no matching source state was returned
        for transition_dict in transition_dicts:
            if transition_dict["source"] == state:
                return transition_dict
        else:
            raise KeyError(f"No transition from state {state} with the name {transition}.")

    def save_graph(self, filename, graphname):
        """
        Does a simple dot file creation to visualise states and transitions.
        """
        with open(filename, "w") as dotfile:
            dotfile.write("digraph " + graphname + " {\n")
            # One node per state
            for state in self.states.keys():
                dotfile.write('"' + state + '" [shape=ellipse, color=black]\n')
            # One labelled edge per transition
            for trigger, transition_dicts in self.transitions.items():
                for transition in transition_dicts:
                    dotfile.write('"' + transition["source"].name + '" -> "' +
                                  transition["dest"].name + '" [label="' + trigger + '"]\n')
            dotfile.write("}\n")
380
381
class CalibrationMachine(Machine):
    """
    A state machine to handle `Calibration` objects and the flow of
    processing for them.
    """

    #: Subdirectory (inside each iteration directory) for files given *to* collector jobs
    collector_input_dir = 'collector_input'
    #: Subdirectory for the output produced by collector jobs
    collector_output_dir = 'collector_output'
    #: Subdirectory for the output of the algorithm step
    algorithm_output_dir = 'algorithm_output'
391
    def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
        """
        Takes a Calibration object from the caf framework and lets you
        set the initial state.

        Parameters:
            calibration: The Calibration object this machine drives.
            iov_to_calibrate: Optional overall IoV restricting which input files are used.
            initial_state (str): Name of the state to start in.
            iteration (int): Iteration number to start from.
        """
        # Every state logs the change and mirrors it onto the Calibration object on entry
        self.default_states = [State("init", enter=[self._update_cal_state,
                                                    self._log_new_state]),
                               State("running_collector", enter=[self._update_cal_state,
                                                                 self._log_new_state]),
                               State("collector_failed", enter=[self._update_cal_state,
                                                                self._log_new_state]),
                               State("collector_completed", enter=[self._update_cal_state,
                                                                   self._log_new_state]),
                               State("running_algorithms", enter=[self._update_cal_state,
                                                                  self._log_new_state]),
                               State("algorithms_failed", enter=[self._update_cal_state,
                                                                 self._log_new_state]),
                               State("algorithms_completed", enter=[self._update_cal_state,
                                                                    self._log_new_state]),
                               State("completed", enter=[self._update_cal_state,
                                                         self._log_new_state]),
                               State("failed", enter=[self._update_cal_state,
                                                      self._log_new_state])
                               ]

        super().__init__(self.default_states, initial_state)

        #: The Calibration object whose processing this machine controls
        self.calibration = calibration
        # Monkey Patching for the win!
        self.calibration.machine = self
        #: Current iteration number of this calibration
        self.iteration = iteration
        #: Backend for collector jobs; initialised to None here (presumably assigned
        #: by the owning CAF object before submission — TODO confirm)
        self.collector_backend = None
        #: iteration number -> {algorithm name: list of results}
        self._algorithm_results = {}
        #: Final state of the AlgorithmsRunner, consumed by the fail/complete conditions
        self._runner_final_state = None
        #: Optional overall IoV that the calibration is restricted to
        self.iov_to_calibrate = iov_to_calibrate
        #: Root working directory of this calibration (cwd/<calibration name>)
        self.root_dir = Path(os.getcwd(), calibration.name)

        #: Timing bookkeeping for collector jobs ("start" and "last_update" timestamps)
        self._collector_timing = {}

        #: Collection name -> collector `Job` object
        self._collector_jobs = {}

        # From "init": build directories/jobs and submit the collections
        self.add_transition("submit_collector", "init", "running_collector",
                            conditions=self.dependencies_completed,
                            before=[self._make_output_dir,
                                    self._resolve_file_paths,
                                    self._build_iov_dicts,
                                    self._create_collector_jobs,
                                    self._submit_collections,
                                    self._dump_job_config])
        # Collector monitoring outcomes
        self.add_transition("fail", "running_collector", "collector_failed",
                            conditions=self._collection_failed)
        self.add_transition("complete", "running_collector", "collector_completed",
                            conditions=self._collection_completed)
        # Algorithm running and its outcomes
        self.add_transition("run_algorithms", "collector_completed", "running_algorithms",
                            before=self._check_valid_collector_output,
                            after=[self._run_algorithms,
                                   self.automatic_transition])
        self.add_transition("complete", "running_algorithms", "algorithms_completed",
                            after=self.automatic_transition,
                            conditions=self._runner_not_failed)
        self.add_transition("fail", "running_algorithms", "algorithms_failed",
                            conditions=self._runner_failed)
        # Either iterate again (back to "init") or finish with a final database
        self.add_transition("iterate", "algorithms_completed", "init",
                            conditions=[self._require_iteration,
                                        self._below_max_iterations],
                            after=self._increment_iteration)
        self.add_transition("finish", "algorithms_completed", "completed",
                            conditions=self._no_require_iteration,
                            before=self._prepare_final_db)
        # Terminal failure states
        self.add_transition("fail_fully", "algorithms_failed", "failed")
        self.add_transition("fail_fully", "collector_failed", "failed")
476
477 def _update_cal_state(self, **kwargs):
478 self.calibration.state = str(kwargs["new_state"])
479
480 def files_containing_iov(self, file_paths, files_to_iovs, iov):
481 """
482 Lookup function that returns all files from the file_paths that
483 overlap with this IoV.
484 """
485 # Files that contain an Exp,Run range that overlaps with given IoV
486 overlapping_files = set()
487
488 for file_path, file_iov in files_to_iovs.items():
489 if file_iov.overlaps(iov) and (file_path in file_paths):
490 overlapping_files.add(file_path)
491 return overlapping_files
492
493 def _dump_job_config(self):
494 """
495 Dumps the `Job` object for the collections to JSON files so that it's configuration can be recovered
496 later in case of failure.
497 """
498 # Wait for jobs (+subjobs) to be submitted so that all information is filled. Since the parent CAF object asynchronously
499 # submits the jobs this might need to wait a while.
500 while any(map(lambda j: j.status == "init", self._collector_jobs.values())):
501 B2DEBUG(29, "Some Collector Jobs still in 'init' state. Waiting...")
502 time.sleep(5)
503
504 for collection_name, job in self._collector_jobs.items():
505 collector_job_output_file_name = self.calibration.collections[collection_name].job_config
506 output_file = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir,
507 collection_name, collector_job_output_file_name)
508 job.dump_to_json(output_file)
509
510 def _recover_collector_jobs(self):
511 """
512 Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.
513 """
514 for collection_name, collection in self.calibration.collections.items():
515 output_file = self.root_dir.joinpath(str(self.iteration),
516 self.collector_input_dir,
517 collection_name,
518 collection.job_config)
519 self._collector_jobs[collection_name] = Job.from_json(output_file)
520
521 def _iov_requested(self):
522 """
523 """
524 if self.iov_to_calibrate:
525 B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
526 return True
527 else:
528 B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
529 return False
530
    def _resolve_file_paths(self):
        """
        Hook run before collector job creation (see the "submit_collector" transition);
        currently a no-op placeholder.
        """
        pass
535
536 def _build_iov_dicts(self):
537 """
538 Build IoV file dictionary for each collection if required.
539 """
540 iov_requested = self._iov_requested()
541 if iov_requested or self.calibration.ignored_runs:
542 for coll_name, collection in self.calibration.collections.items():
543 if not collection.files_to_iovs:
544 B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
545 f" Calibration '{self.calibration.name} and Collection '{coll_name}'."
546 " Filling dictionary from input file metadata."
547 " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
548
549 files_to_iovs = {}
550 for file_path in collection.input_files:
551 files_to_iovs[file_path] = get_iov_from_file(file_path)
552 collection.files_to_iovs = files_to_iovs
553 else:
554 B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
555 f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
556 else:
557 B2INFO("No File to IoV mapping required.")
558
559 def _below_max_iterations(self):
560 """
561 """
562 return self.iteration < self.calibration.max_iterations
563
564 def _increment_iteration(self):
565 """
566 """
567 self.iteration += 1
568 self.calibration.iteration = self.iteration
569
570 def _collection_completed(self):
571 """
572 Did all the collections succeed?
573 """
574 B2DEBUG(29, "Checking for failed collector job.")
575 if self._collector_jobs_ready():
576 return all([job.status == "completed" for job in self._collector_jobs.values()])
577
578 def _collection_failed(self):
579 """
580 Did any of the collections fail?
581 """
582 B2DEBUG(29, "Checking for failed collector job.")
583 if self._collector_jobs_ready():
584 return any([job.status == "failed" for job in self._collector_jobs.values()])
585
586 def _runner_not_failed(self):
587 """
588 Returns:
589 bool: If AlgorithmsRunner succeeded return True.
590 """
591 return not self._runner_failed()
592
593 def _runner_failed(self):
594 """
595 Returns:
596 bool: If AlgorithmsRunner failed return True.
597 """
598 if self._runner_final_state == AlgorithmsRunner.FAILED:
599 return True
600 else:
601 return False
602
603 def _collector_jobs_ready(self):
604 """
605 """
606 since_last_update = time.time() - self._collector_timing["last_update"]
607 if since_last_update > self.calibration.collector_full_update_interval:
608 B2INFO("Updating full collector job statuses.")
609 for job in self._collector_jobs.values():
610 job.update_status()
611 self._collector_timing["last_update"] = time.time()
612 if job.subjobs:
613 num_completed = sum((subjob.status in subjob.exit_statuses) for subjob in job.subjobs.values())
614 total_subjobs = len(job.subjobs)
615 B2INFO(f"{num_completed}/{total_subjobs} Collector SubJobs finished in"
616 f" Calibration {self.calibration.name} Job {job.name}.")
617 return all([job.ready() for job in self._collector_jobs.values()])
618
619 def _submit_collections(self):
620 """
621 """
622 self.calibration.jobs_to_submit.extend(list(self._collector_jobs.values()))
623 self._collector_timing["start"] = time.time()
624 self._collector_timing["last_update"] = time.time()
625
626 def _no_require_iteration(self):
627 """
628 """
629 if self._require_iteration() and self._below_max_iterations():
630 return False
631 elif self._require_iteration() and not self._below_max_iterations():
632 B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
633 return True
634 elif not self._require_iteration():
635 return True
636
637 def _require_iteration(self):
638 """
639 """
640 iteration_called = False
641 for alg_name, results in self._algorithm_results[self.iteration].items():
642 for result in results:
643 if result.result == CalibrationAlgorithm.c_Iterate:
644 iteration_called = True
645 break
646 if iteration_called:
647 break
648 return iteration_called
649
650 def _log_new_state(self, **kwargs):
651 """
652 """
653 B2INFO(f"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")
654
655 def dependencies_completed(self):
656 """
657 Condition function to check that the dependencies of our calibration are in the 'completed' state.
658 Technically only need to check explicit dependencies.
659 """
660 for calibration in self.calibration.dependencies:
661 if not calibration.state == calibration.end_state:
662 return False
663 else:
664 return True
665
666 def automatic_transition(self):
667 """
668 Automatically try all transitions out of this state once. Tries fail last.
669 """
670 possible_transitions = self.get_transitions(self.state)
671 for transition in possible_transitions:
672 try:
673 if transition != "fail":
674 getattr(self, transition)()
675 break
676 except ConditionError:
677 continue
678 else:
679 if "fail" in possible_transitions:
680 getattr(self, "fail")()
681 else:
682 raise MachineError(f"Failed to automatically transition out of {self.state} state.")
683
    def _make_output_dir(self):
        """
        Creates the overall root directory of the Calibration. Will not overwrite if it already exists.
        """
        create_directories(self.root_dir, overwrite=False)
690
691 def _make_collector_path(self, name, collection):
692 """
693 Creates a basf2 path for the correct collector and serializes it in the
694 self.output_dir/<calibration_name>/<iteration>/paths directory
695 """
696 path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
697 # Automatically overwrite any previous directory
698 create_directories(path_output_dir)
699 path_file_name = collection.collector.name() + '.path'
700 path_file_name = path_output_dir / path_file_name
701 # Create empty path and add collector to it
702 coll_path = create_path()
703 coll_path.add_module(collection.collector)
704 # Dump the basf2 path to file
705 with open(path_file_name, 'bw') as serialized_path_file:
706 pickle.dump(serialize_path(coll_path), serialized_path_file)
707 # Return the pickle file path for addition to the input sandbox
708 return str(path_file_name.absolute())
709
710 def _make_pre_collector_path(self, name, collection):
711 """
712 Creates a basf2 path for the collectors setup path (Collection.pre_collector_path) and serializes it in the
713 self.output_dir/<calibration_name>/<iteration>/<colector_output>/<name> directory.
714 """
715 path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
716 coll_path = collection.pre_collector_path
717 path_file_name = 'pre_collector.path'
718 path_file_name = os.path.join(path_output_dir, path_file_name)
719 # Dump the basf2 path to file
720 with open(path_file_name, 'bw') as serialized_path_file:
721 pickle.dump(serialize_path(coll_path), serialized_path_file)
722 # Return the pickle file path for addition to the input sandbox
723 return path_file_name
724
725 def _create_collector_jobs(self):
726 """
727 Creates a Job object for the collections of this iteration, ready for submission
728 to backend.
729 """
730 for collection_name, collection in self.calibration.collections.items():
731 iteration_dir = self.root_dir.joinpath(str(self.iteration))
732 job = Job('_'.join([self.calibration.name, collection_name, 'Iteration', str(self.iteration)]))
733 job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
734 job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
735 # Remove previous failed attempt to avoid problems
736 if job.output_dir.exists():
737 B2INFO(f"Previous output directory for {self.calibration.name} collector {collection_name} exists."
738 f"Deleting {job.output_dir} before re-submitting.")
739 shutil.rmtree(job.output_dir)
740 job.cmd = collection.job_cmd
741 job.append_current_basf2_setup_cmds()
742 job.input_sandbox_files.append(collection.job_script)
743 collector_path_file = Path(self._make_collector_path(collection_name, collection))
744 job.input_sandbox_files.append(collector_path_file)
745 if collection.pre_collector_path:
746 pre_collector_path_file = Path(self._make_pre_collector_path(collection_name, collection))
747 job.input_sandbox_files.append(pre_collector_path_file)
748
749 # Want to figure out which local databases are required for this job and their paths
750 list_dependent_databases = []
751
752 # Here we add the finished databases of previous calibrations that we depend on.
753 # We can assume that the databases exist as we can't be here until they have returned
754 for dependency in self.calibration.dependencies:
755 database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
756 B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
757 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
758
759 # Add previous iteration databases from this calibration
760 if self.iteration > 0:
761 previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
762 database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
763 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
764 B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
765
766 # Let's use a directory to store some files later for input to the collector jobs. Should already exist from
767 # collector path
768 input_data_directory = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, collection_name)
769
770 # Need to pass setup info to collector which would be tricky as arguments
771 # We make a dictionary and pass it in as json
772 job_config = {}
773 # Apply the user-set Calibration database chain to the base of the overall chain.
774 json_db_chain = []
775 for database in collection.database_chain:
776 if database.db_type == 'local':
777 json_db_chain.append(('local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
778 elif database.db_type == 'central':
779 json_db_chain.append(('central', database.global_tag))
780 else:
781 raise ValueError(f"Unknown database type {database.db_type}.")
782 # CAF created ones for dependent calibrations and previous iterations of this calibration
783 for database in list_dependent_databases:
784 json_db_chain.append(('local', database))
785 job_config['database_chain'] = json_db_chain
786
787 job_config_file_path = input_data_directory.joinpath('collector_config.json').absolute()
788 with open(job_config_file_path, 'w') as job_config_file:
789 json.dump(job_config, job_config_file, indent=2)
790 job.input_sandbox_files.append(job_config_file_path)
791
792 # Define the input files
793 input_data_files = set(collection.input_files)
794 # Reduce the input data files to only those that overlap with the optional requested IoV
795 if self.iov_to_calibrate:
796 input_data_files = self.files_containing_iov(input_data_files,
797 collection.files_to_iovs,
798 self.iov_to_calibrate)
799 # Remove any files that ONLY contain runs from our optional ignored_runs list
800 files_to_ignore = set()
801 for exprun in self.calibration.ignored_runs:
802 for input_file in input_data_files:
803 file_iov = self.calibration.files_to_iovs[input_file]
804 if file_iov == exprun.make_iov():
805 B2INFO(f"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
806 f"Therefore the input file '{input_file}' from Collection '{collection_name}' "
807 "is being removed from input files list.")
808 files_to_ignore.add(input_file)
809 input_data_files.difference_update(files_to_ignore)
810
811 if not input_data_files:
812 raise MachineError(f"No valid input files for Calibration '{self.calibration.name}' "
813 f" and Collection '{collection_name}'.")
814 job.input_files = list(input_data_files)
815
816 job.splitter = collection.splitter
817 job.backend_args = collection.backend_args
818 # Output patterns to be returned from collector job
819 job.output_patterns = collection.output_patterns
820 B2DEBUG(20, f"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
821 self._collector_jobs[collection_name] = job
822
823 def _check_valid_collector_output(self):
824 B2INFO("Checking that Collector output exists for all collector jobs "
825 f"using {self.calibration.name}.output_patterns.")
826 if not self._collector_jobs:
827 B2INFO("We're restarting so we'll recreate the collector Job object.")
828 self._recover_collector_jobs()
829
830 for job in self._collector_jobs.values():
831 if not job.subjobs:
832 output_files = []
833 for pattern in job.output_patterns:
834 output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
835 if not output_files:
836 raise MachineError("No output files from Collector Job")
837 else:
838 for subjob in job.subjobs.values():
839 output_files = []
840 for pattern in subjob.output_patterns:
841 output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
842 if not output_files:
843 raise MachineError(f"No output files from Collector {subjob}")
844
    def _run_algorithms(self):
        """
        Runs the Calibration Algorithms for this calibration machine.

        Will run them sequentially locally (possible benefits to using a
        processing pool for low memory algorithms later on.)
        """
        # Get an instance of the Runner for these algorithms and run it
        algs_runner = self.calibration.algorithms_runner(name=self.calibration.name)
        algs_runner.algorithms = self.calibration.algorithms
        algorithm_output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
        output_database_dir = algorithm_output_dir.joinpath("outputdb")
        # Remove it, if we failed previously, to start clean
        if algorithm_output_dir.exists():
            B2INFO(f"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
                   f"Deleting and recreating {algorithm_output_dir}.")
        # (Re)create the output directory; create_directories overwrites by default,
        # as relied on in _make_collector_path above
        create_directories(algorithm_output_dir)
        B2INFO(f"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
        algs_runner.output_database_dir = output_database_dir
        algs_runner.output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
        input_files = []

        # Collector outputs (from each job, or each of its subjobs) become the algorithm inputs
        for job in self._collector_jobs.values():
            if job.subjobs:
                for subjob in job.subjobs.values():
                    for pattern in subjob.output_patterns:
                        input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
            else:
                for pattern in job.output_patterns:
                    input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))

        algs_runner.input_files = input_files

        # Add any user defined database chain for this calibration
        algs_runner.database_chain = self.calibration.database_chain

        # Here we add the finished databases of previous calibrations that we depend on.
        # We can assume that the databases exist as we can't be here until they have returned
        list_dependent_databases = []
        for dependency in self.calibration.dependencies:
            database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
            B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))

        # Add previous iteration databases from this calibration
        if self.iteration > 0:
            previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
            database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
            B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
        algs_runner.dependent_databases = list_dependent_databases

        algs_runner.ignored_runs = self.calibration.ignored_runs

        try:
            algs_runner.run(self.iov_to_calibrate, self.iteration)
        except Exception as err:
            # NOTE(review): the exception is only printed, not sent through basf2
            # logging (B2ERROR) — consider logging it instead; left unchanged here.
            print(err)
            # We directly set the state without triggering the transition because normally we fail based on checking the algorithm
            # results. But here we had an actual exception so we just force into failure instead.
            self._state = State("algorithms_failed")
        # Store the results and the runner's final state for the fail/complete condition checks
        self._algorithm_results[self.iteration] = algs_runner.results
        self._runner_final_state = algs_runner.final_state
908
909 def _prepare_final_db(self):
910 """
911 Take the last iteration's outputdb and copy it to a more easily findable place.
912 """
913 database_location = self.root_dir.joinpath(str(self.iteration),
914 self.calibration.alg_output_dir,
915 'outputdb')
916 final_database_location = self.root_dir.joinpath('outputdb')
917 if final_database_location.exists():
918 B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
919 shutil.rmtree(final_database_location)
920 shutil.copytree(database_location, final_database_location)
921
922
class AlgorithmMachine(Machine):
    """
    A state machine to handle the logic of running the algorithm on the overall runs contained in the data.
    """

    # Attributes that must exist on this machine before it can be run (checked by is_valid())
    required_attrs = ["algorithm",
                      "dependent_databases",
                      "database_chain",
                      "output_dir",
                      "output_database_dir",
                      "input_files"
                      ]

    # Attributes that must additionally evaluate to True before running (checked by is_valid())
    required_true_attrs = ["algorithm",
                           "output_dir",
                           "output_database_dir",
                           "input_files"
                           ]

    def __init__(self, algorithm=None, initial_state="init"):
        """
        Takes an Algorithm object from the caf framework and defines the transitions.
        """
        # The states this machine can be in
        self.default_states = [State("init"),
                               State("ready"),
                               State("running_algorithm"),
                               State("completed"),
                               State("failed")]

        super().__init__(self.default_states, initial_state)

        # Algorithm object whose wrapped CalibrationAlgorithm gets executed
        self.algorithm = algorithm
        # Collector output files handed to the algorithm as its input data
        self.input_files = []
        # (database.txt path, payload directory) pairs from calibrations we depend on
        self.dependent_databases = []
        # User defined database chain (local and/or central databases), applied first
        self.database_chain = []
        # Working directory in which the algorithm is executed (also holds the log file)
        self.output_dir = ""
        # Directory where the payloads produced by this algorithm are written
        self.output_database_dir = ""
        # IoV_Result of the most recent execution (set by _execute_over_iov)
        self.result = None

        self.add_transition("setup_algorithm", "init", "ready",
                            before=[self._setup_logging,
                                    self._change_working_dir,
                                    self._setup_database_chain,
                                    self._set_input_data,
                                    self._pre_algorithm])
        self.add_transition("execute_runs", "ready", "running_algorithm",
                            after=self._execute_over_iov)
        self.add_transition("complete", "running_algorithm", "completed")
        self.add_transition("fail", "running_algorithm", "failed")
        self.add_transition("fail", "ready", "failed")
        # Allow setting up again after a completed or failed execution
        self.add_transition("setup_algorithm", "completed", "ready")
        self.add_transition("setup_algorithm", "failed", "ready")

    def setup_from_dict(self, params):
        """
        Parameters:
            params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.
        """
        for attribute_name, value in params.items():
            setattr(self, attribute_name, value)

    def is_valid(self):
        """
        Returns:
            bool: Whether or not this machine has been set up correctly with all its necessary attributes.
        """
        B2INFO(f"Checking validity of current setup of AlgorithmMachine for {self.algorithm.name}.")
        # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
        for attribute_name in self.required_attrs:
            if not hasattr(self, attribute_name):
                B2ERROR(f"AlgorithmMachine attribute {attribute_name} doesn't exist.")
                return False
        # Check if any attributes that need actual values haven't been set or were empty
        for attribute_name in self.required_true_attrs:
            if not getattr(self, attribute_name):
                B2ERROR(f"AlgorithmMachine attribute {attribute_name} returned False.")
                return False
        return True

    def _create_output_dir(self, **kwargs):
        """
        Create working/output directory of algorithm. Any old directory is overwritten.
        """
        create_directories(Path(self.output_dir), overwrite=True)

    def _setup_database_chain(self, **kwargs):
        """
        Apply all databases in the correct order.
        """
        # We deliberately override the normal database ordering because we don't want input files GTs to affect
        # the processing. Only explicit GTs and intermediate local DBs made by the CAF should be added here.
        b2conditions.reset()
        b2conditions.override_globaltags()

        # Apply all the databases in order, starting with the user-set chain for this Calibration
        for database in self.database_chain:
            if database.db_type == 'local':
                B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
                       f"for {self.algorithm.name}.")
                b2conditions.prepend_testing_payloads(database.filepath.as_posix())
            elif database.db_type == 'central':
                B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
                       f"for {self.algorithm.name}.")
                b2conditions.prepend_globaltag(database.global_tag)
            else:
                raise ValueError(f"Unknown database type {database.db_type}.")
        # Here we add the finished databases of previous calibrations that we depend on.
        # We can assume that the databases exist as we can't be here until they have returned
        # with OK status.
        for filename, _ in self.dependent_databases:
            # Log the actual database file being added (previously this printed a "(unknown)" placeholder)
            B2INFO(f"Adding Local Database {filename} to head of chain of local databases created by"
                   f" a dependent calibration, for {self.algorithm.name}.")
            b2conditions.prepend_testing_payloads(filename)

        # Create a directory to store the payloads of this algorithm
        create_directories(Path(self.output_database_dir), overwrite=False)

        # add local database to save payloads
        B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
        # Things have changed. We now need to do the expert settings to create a database directly.
        # LocalDB is readonly without this but we don't need 'use_local_database' during writing.
        b2conditions.expert_settings(save_payloads=str(self.output_database_dir.joinpath("database.txt")))

    def _setup_logging(self, **kwargs):
        """
        Redirect the basf2 log output of this algorithm into a file in its output directory.
        """
        # add logfile for output
        log_file = os.path.join(self.output_dir, self.algorithm.name + '_stdout')
        B2INFO(f"Output log file at {log_file}.")
        basf2.reset_log()
        basf2.set_log_level(basf2.LogLevel.INFO)
        basf2.log_to_file(log_file)

    def _change_working_dir(self, **kwargs):
        """
        Change the process working directory to this algorithm's output directory.
        """
        B2INFO(f"Changing current working directory to {self.output_dir}.")
        os.chdir(self.output_dir)

    def _pre_algorithm(self, **kwargs):
        """
        Call the user defined algorithm setup function.
        """
        B2INFO("Running Pre-Algorithm function (if exists)")
        if self.algorithm.pre_algorithm:
            # We have to re-pass in the algorithm here because an outside user has created this method.
            # So the method isn't bound to the instance properly.
            self.algorithm.pre_algorithm(self.algorithm.algorithm, kwargs["iteration"])

    def _execute_over_iov(self, **kwargs):
        """
        Does the actual execute of the algorithm on an IoV and records the result.
        """
        B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")

        runs_to_execute = kwargs["runs"]
        iov = kwargs["apply_iov"]
        iteration = kwargs["iteration"]
        # When no explicit IoV is given, derive one from the runs being executed
        if not iov:
            iov = iov_from_runs(runs_to_execute)
        B2INFO(f"Execution will use {iov} for labelling payloads by default.")
        alg_result = self.algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
        self.result = IoV_Result(iov, alg_result)

    def _set_input_data(self, **kwargs):
        """
        Hand the collected input files over to the underlying algorithm.
        """
        self.algorithm.data_input(self.input_files)
1102
class MachineError(Exception):
    """
    Common base class for the exceptions raised by this module.
    """
1107
1108
class ConditionError(MachineError):
    """
    Raised when a transition's condition checks fail.
    """
1113
1114
class TransitionError(MachineError):
    """
    Raised when a state transition itself fails.
    """
1119
1120# @endcond
STL class.