#!/usr/bin/env python3

# Belle II Software development: state_machines.py

# disable doxygen check for this file
# @cond


import basf2

from functools import partial
from collections import defaultdict

import pickle
import glob
import shutil
import time
from pathlib import Path
import os
import json

from basf2 import create_path
from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
from basf2 import conditions as b2conditions
from basf2.pickle_path import serialize_path

from ROOT import Belle2  # noqa: make the Belle2 namespace available
from ROOT.Belle2 import CalibrationAlgorithm

from caf.utils import create_directories
from caf.utils import method_dispatch
from caf.utils import iov_from_runs
from caf.utils import IoV_Result
from caf.utils import get_iov_from_file
from caf.backends import Job
from caf.runners import AlgorithmsRunner
42
43
class State():
    """
    Basic State object that can take enter and exit state methods and records
    the state of a machine.

    You should assign the self.on_enter or self.on_exit attributes to callback functions
    or lists of them, if you need them.
    """

    def __init__(self, name, enter=None, exit=None):
        """
        Initialise State with a name and optional lists of callbacks.
        """
        #: Unique name identifying this state
        self.name = name
        #: Callbacks run when this state is entered (setter normalises input)
        self.on_enter = enter
        #: Callbacks run when this state is exited (setter normalises input)
        self.on_exit = exit

    @property
    def on_enter(self):
        """
        Runs callbacks when a state is entered.
        """
        return self._on_enter

    @on_enter.setter
    def on_enter(self, callbacks):
        """
        Accepts a single callable or a list/tuple of callables.
        """
        self._on_enter = []
        if callbacks:
            self._add_callbacks(callbacks, self._on_enter)

    @property
    def on_exit(self):
        """
        Runs callbacks when a state is exited.
        """
        return self._on_exit

    @on_exit.setter
    def on_exit(self, callbacks):
        """
        Accepts a single callable or a list/tuple of callables.
        """
        self._on_exit = []
        if callbacks:
            self._add_callbacks(callbacks, self._on_exit)

    def _add_callbacks(self, callback, attribute):
        """
        Adds callback(s) to a property. Accepts either one callable or a
        list/tuple of callables; anything non-callable is reported and skipped.
        """
        if isinstance(callback, (tuple, list)):
            for function in callback:
                if callable(function):
                    attribute.append(function)
                else:
                    B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")
        elif callable(callback):
            attribute.append(callback)
        else:
            B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")

    def __str__(self):
        """
        The state's name is its string form.
        """
        return self.name

    def __repr__(self):
        """
        Debug representation showing the state name.
        """
        return f"State(name={self.name})"

    def __eq__(self, other):
        """
        States compare equal to plain strings of the same name as well as to
        other State objects with the same name.
        """
        if isinstance(other, str):
            return self.name == other
        return self.name == other.name

    def __hash__(self):
        """
        Hash on the name so State objects and equal strings hash alike.
        """
        return hash(self.name)
139
140
class Machine():
    """
    Parameters:
        states (list[str]): A list of possible states of the machine.
        initial_state (str):

    Base class for a final state machine wrapper.
    Implements the framework that a more complex machine can inherit from.

    The `transitions` attribute is a dictionary of trigger name keys, each value of
    which is another dictionary of 'source' states, 'dest' states, and 'conditions'
    methods. 'conditions' should be a list of callables or a single one. A transition is
    valid if it goes from an allowed state to an allowed state.
    Conditions are optional but must be a callable that returns True or False based
    on some state of the machine. They cannot have input arguments currently.

    Every condition/before/after callback function MUST take ``**kwargs`` as the only
    argument (except ``self`` if it's a class method). This is because it's basically
    impossible to determine which arguments to pass to which functions for a transition.
    Therefore this machine just enforces that every function should simply take ``**kwargs``
    and use the dictionary of arguments (even if it doesn't need any arguments).
    This also means that if you call a trigger with arguments e.g. ``machine.walk(speed=5)``
    you MUST use the keyword arguments rather than positional ones. So ``machine.walk(5)``
    will *not* work.
    """

    def __init__(self, states=None, initial_state="default_initial"):
        """
        Basic Setup of states and initial_state.
        """
        #: Mapping of valid state names to `State` objects
        self.states = {}
        if states:
            for state in states:
                self.add_state(state)
        if initial_state != "default_initial":
            # Goes through the property setter, which validates the name but
            # does NOT fire any enter callbacks.
            self.initial_state = initial_state
        else:
            self.add_state(initial_state)
            #: Actual attribute holding the initial state (set directly, no callbacks)
            self._initial_state = State(initial_state)

        #: Actual attribute holding the current state (set directly, no callbacks)
        self._state = self.initial_state
        #: Allowed transitions between states, keyed by trigger name
        self.transitions = defaultdict(list)

    def add_state(self, state, enter=None, exit=None):
        """
        Adds a single state to the list of possible ones.
        Should be a unique string or a State object with a unique name.
        """
        if isinstance(state, str):
            self.add_state(State(state, enter, exit))
        elif isinstance(state, State):
            if state.name not in self.states.keys():
                self.states[state.name] = state
            else:
                B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
        else:
            B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")

    @property
    def initial_state(self):
        """
        The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
        """
        return self._initial_state

    @initial_state.setter
    def initial_state(self, state):
        """
        Validates the name and also resets the current state without firing callbacks.
        """
        if state in self.states.keys():
            self._initial_state = self.states[state]
            # Current state follows the initial state; bypasses the `state` property
            # so no enter/exit callbacks run.
            self._state = self.states[state]
        else:
            raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")

    @property
    def state(self):
        """
        The current state of the machine. Actually a `property` decorator. It will call the exit method of the
        current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
        either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
        """
        return self._state

    @state.setter
    def state(self, state):
        """
        Runs exit callbacks of the current state and enter callbacks of the new one.
        """
        if isinstance(state, str):
            state_name = state
        else:
            state_name = state.name

        try:
            state = self.states[state_name]
            # Run exit callbacks of current state
            for callback in self.state.on_exit:
                callback(prior_state=self.state, new_state=state)
            # Run enter callbacks of new state
            for callback in state.on_enter:
                callback(prior_state=self.state, new_state=state)
            # Set the state
            self._state = state
        except KeyError:
            # Fixed grammar of the error message ("which not in" -> "which is not in")
            raise MachineError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")

    @staticmethod
    def default_condition(**kwargs):
        """
        Method to always return True.
        """
        return True

    def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
        """
        Adds a single transition to the dictionary of possible ones.
        Trigger is the method name that begins the transition between the
        source state and the destination state.

        The condition is an optional function that returns True or False
        depending on the current state/input.
        """
        transition_dict = {}
        try:
            source = self.states[source]
            dest = self.states[dest]
            transition_dict["source"] = source
            transition_dict["dest"] = dest
        except KeyError as err:
            B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
            raise err
        if conditions:
            if isinstance(conditions, (list, tuple, set)):
                transition_dict["conditions"] = list(conditions)
            else:
                transition_dict["conditions"] = [conditions]
        else:
            # No conditions given: transition is always allowed
            transition_dict["conditions"] = [Machine.default_condition]

        if not before:
            before = []
        if isinstance(before, (list, tuple, set)):
            transition_dict["before"] = list(before)
        else:
            transition_dict["before"] = [before]

        if not after:
            after = []
        if isinstance(after, (list, tuple, set)):
            transition_dict["after"] = list(after)
        else:
            transition_dict["after"] = [after]

        self.transitions[trigger].append(transition_dict)

    def __getattr__(self, name, **kwargs):
        """
        Allows us to create a new method for each trigger on the fly.
        If there is no trigger name in the machine to match, then the normal
        AttributeError is called.
        """
        possible_transitions = self.get_transitions(self.state)
        if name not in possible_transitions:
            raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
        transition_dict = self.get_transition_dict(self.state, name)
        return partial(self._trigger, name, transition_dict, **kwargs)

    def _trigger(self, transition_name, transition_dict, **kwargs):
        """
        Runs the transition logic. Callbacks are evaluated in the order:
        conditions -> before -> <new state set here> -> after.
        """
        dest, conditions, before_callbacks, after_callbacks = (
            transition_dict["dest"],
            transition_dict["conditions"],
            transition_dict["before"],
            transition_dict["after"]
        )
        # Returns True only if every condition returns True when called
        if all(map(lambda condition: self._callback(condition, **kwargs), conditions)):
            for before_func in before_callbacks:
                self._callback(before_func, **kwargs)
            # Assignment fires the exit/enter callbacks via the `state` property
            self.state = dest
            for after_func in after_callbacks:
                self._callback(after_func, **kwargs)
        else:
            raise ConditionError(f"Transition '{transition_name}' called for but one or more conditions "
                                 "evaluated False")

    @staticmethod
    def _callback(func, **kwargs):
        """
        Calls a condition/before/after.. function using arguments passed (or not).
        """
        return func(**kwargs)

    def get_transitions(self, source):
        """
        Returns allowed transitions from a given state.
        """
        possible_transitions = []
        for transition, transition_dicts in self.transitions.items():
            for transition_dict in transition_dicts:
                if transition_dict["source"] == source:
                    possible_transitions.append(transition)
        return possible_transitions

    def get_transition_dict(self, state, transition):
        """
        Returns the transition dictionary for a state and transition out of it.

        Raises:
            KeyError: If no transition with that name leaves the given state.
        """
        # Use .get() instead of indexing: `transitions` is a defaultdict, and a
        # plain lookup of an unknown trigger would silently insert an empty list.
        transition_dicts = self.transitions.get(transition, [])
        for transition_dict in transition_dicts:
            if transition_dict["source"] == state:
                return transition_dict
        else:
            raise KeyError(f"No transition from state {state} with the name {transition}.")

    def save_graph(self, filename, graphname):
        """
        Does a simple dot file creation to visualise states and transitions.
        """
        with open(filename, "w") as dotfile:
            dotfile.write("digraph " + graphname + " {\n")
            for state in self.states.keys():
                dotfile.write('"' + state + '" [shape=ellipse, color=black]\n')
            for trigger, transition_dicts in self.transitions.items():
                for transition in transition_dicts:
                    dotfile.write('"' + transition["source"].name + '" -> "' +
                                  transition["dest"].name + '" [label="' + trigger + '"]\n')
            dotfile.write("}\n")
379
380
381class CalibrationMachine(Machine):
382 """
383 A state machine to handle `Calibration` objects and the flow of
384 processing for them.
385 """
386
387 collector_input_dir = 'collector_input'
388 collector_output_dir = 'collector_output'
389 algorithm_output_dir = 'algorithm_output'
390
    def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
        """
        Takes a Calibration object from the caf framework and lets you
        set the initial state.
        """
        # Every state shares the same enter callbacks: first mirror the new state
        # onto the wrapped Calibration object, then log the change.
        #: Default states for the CalibrationMachine
        self.default_states = [State("init", enter=[self._update_cal_state,
                                                    self._log_new_state]),
                               State("running_collector", enter=[self._update_cal_state,
                                                                 self._log_new_state]),
                               State("collector_failed", enter=[self._update_cal_state,
                                                                self._log_new_state]),
                               State("collector_completed", enter=[self._update_cal_state,
                                                                   self._log_new_state]),
                               State("running_algorithms", enter=[self._update_cal_state,
                                                                  self._log_new_state]),
                               State("algorithms_failed", enter=[self._update_cal_state,
                                                                 self._log_new_state]),
                               State("algorithms_completed", enter=[self._update_cal_state,
                                                                    self._log_new_state]),
                               State("completed", enter=[self._update_cal_state,
                                                         self._log_new_state]),
                               State("failed", enter=[self._update_cal_state,
                                                      self._log_new_state])
                               ]

        super().__init__(self.default_states, initial_state)

        #: Calibration object whose processing this machine controls
        self.calibration = calibration
        # Monkey Patching for the win!
        # Give the calibration a reference back to this machine so it can drive triggers
        self.calibration.machine = self
        #: Which iteration of the calibration collector+algorithm step we are currently on
        self.iteration = iteration
        #: Backend used to submit the collector jobs (assigned externally by the CAF)
        self.collector_backend = None
        #: Results of each algorithm, keyed by iteration number
        self._algorithm_results = {}
        #: Final state of the AlgorithmsRunner after it has finished
        self._runner_final_state = None
        #: Optional overall IoV restricting which input files will be used
        self.iov_to_calibrate = iov_to_calibrate
        #: Root working directory of this calibration (under the current working directory)
        self.root_dir = Path(os.getcwd(), calibration.name)

        # Times of collector job submission and last full status update; used to
        # throttle how often expensive full status updates are performed.
        self._collector_timing = {}

        #: Collector `Job` objects, keyed by collection name
        self._collector_jobs = {}

        # Wire up the allowed transitions of the calibration state machine.
        # "submit_collector" does the bulk of the setup work in its `before` callbacks.
        self.add_transition("submit_collector", "init", "running_collector",
                            conditions=self.dependencies_completed,
                            before=[self._make_output_dir,
                                    self._resolve_file_paths,
                                    self._build_iov_dicts,
                                    self._create_collector_jobs,
                                    self._submit_collections,
                                    self._dump_job_config])
        self.add_transition("fail", "running_collector", "collector_failed",
                            conditions=self._collection_failed)
        self.add_transition("complete", "running_collector", "collector_completed",
                            conditions=self._collection_completed)
        self.add_transition("run_algorithms", "collector_completed", "running_algorithms",
                            before=self._check_valid_collector_output,
                            after=[self._run_algorithms,
                                   self.automatic_transition])
        self.add_transition("complete", "running_algorithms", "algorithms_completed",
                            after=self.automatic_transition,
                            conditions=self._runner_not_failed)
        self.add_transition("fail", "running_algorithms", "algorithms_failed",
                            conditions=self._runner_failed)
        # Iterating loops back to "init" so the collector can run again
        self.add_transition("iterate", "algorithms_completed", "init",
                            conditions=[self._require_iteration,
                                        self._below_max_iterations],
                            after=self._increment_iteration)
        self.add_transition("finish", "algorithms_completed", "completed",
                            conditions=self._no_require_iteration,
                            before=self._prepare_final_db)
        self.add_transition("fail_fully", "algorithms_failed", "failed")
        self.add_transition("fail_fully", "collector_failed", "failed")
475
476 def _update_cal_state(self, **kwargs):
477 self.calibration.state = str(kwargs["new_state"])
478
479 def files_containing_iov(self, file_paths, files_to_iovs, iov):
480 """
481 Lookup function that returns all files from the file_paths that
482 overlap with this IoV.
483 """
484 # Files that contain an Exp,Run range that overlaps with given IoV
485 overlapping_files = set()
486
487 for file_path, file_iov in files_to_iovs.items():
488 if file_iov.overlaps(iov) and (file_path in file_paths):
489 overlapping_files.add(file_path)
490 return overlapping_files
491
492 def _dump_job_config(self):
493 """
494 Dumps the `Job` object for the collections to JSON files so that it's configuration can be recovered later in case of failure.
495 """
496 # Wait for jobs (+subjobs) to be submitted so that all information is filled. Since the parent CAF object asynchronously
497 # submits the jobs this might need to wait a while.
498 while any(map(lambda j: j.status == "init", self._collector_jobs.values())):
499 B2DEBUG(29, "Some Collector Jobs still in 'init' state. Waiting...")
500 time.sleep(5)
501
502 for collection_name, job in self._collector_jobs.items():
503 collector_job_output_file_name = self.calibration.collections[collection_name].job_config
504 output_file = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir,
505 collection_name, collector_job_output_file_name)
506 job.dump_to_json(output_file)
507
508 def _recover_collector_jobs(self):
509 """
510 Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.
511 """
512 for collection_name, collection in self.calibration.collections.items():
513 output_file = self.root_dir.joinpath(str(self.iteration),
514 self.collector_input_dir,
515 collection_name,
516 collection.job_config)
517 self._collector_jobs[collection_name] = Job.from_json(output_file)
518
519 def _iov_requested(self):
520 """
521 """
522 if self.iov_to_calibrate:
523 B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
524 return True
525 else:
526 B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
527 return False
528
    def _resolve_file_paths(self):
        """
        Placeholder hook run before collector job creation; currently does nothing.
        """
        pass
533
534 def _build_iov_dicts(self):
535 """
536 Build IoV file dictionary for each collection if required.
537 """
538 iov_requested = self._iov_requested()
539 if iov_requested or self.calibration.ignored_runs:
540 for coll_name, collection in self.calibration.collections.items():
541 if not collection.files_to_iovs:
542 B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
543 f" Calibration '{self.calibration.name} and Collection '{coll_name}'."
544 " Filling dictionary from input file metadata."
545 " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
546
547 files_to_iovs = {}
548 for file_path in collection.input_files:
549 files_to_iovs[file_path] = get_iov_from_file(file_path)
550 collection.files_to_iovs = files_to_iovs
551 else:
552 B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
553 f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
554 else:
555 B2INFO("No File to IoV mapping required.")
556
557 def _below_max_iterations(self):
558 """
559 """
560 return self.iteration < self.calibration.max_iterations
561
562 def _increment_iteration(self):
563 """
564 """
565 self.iteration += 1
566 self.calibration.iteration = self.iteration
567
568 def _collection_completed(self):
569 """
570 Did all the collections succeed?
571 """
572 B2DEBUG(29, "Checking for failed collector job.")
573 if self._collector_jobs_ready():
574 return all([job.status == "completed" for job in self._collector_jobs.values()])
575
576 def _collection_failed(self):
577 """
578 Did any of the collections fail?
579 """
580 B2DEBUG(29, "Checking for failed collector job.")
581 if self._collector_jobs_ready():
582 return any([job.status == "failed" for job in self._collector_jobs.values()])
583
584 def _runner_not_failed(self):
585 """
586 Returns:
587 bool: If AlgorithmsRunner succeeded return True.
588 """
589 return not self._runner_failed()
590
591 def _runner_failed(self):
592 """
593 Returns:
594 bool: If AlgorithmsRunner failed return True.
595 """
596 if self._runner_final_state == AlgorithmsRunner.FAILED:
597 return True
598 else:
599 return False
600
    def _collector_jobs_ready(self):
        """
        Returns True once every collector job reports itself ready (finished).
        Periodically performs a full status update of all jobs, throttled by the
        calibration's `collector_full_update_interval`.
        """
        since_last_update = time.time() - self._collector_timing["last_update"]
        if since_last_update > self.calibration.collector_full_update_interval:
            B2INFO("Updating full collector job statuses.")
            for job in self._collector_jobs.values():
                job.update_status()
                # Timestamp refreshed per job so a long update loop doesn't
                # immediately trigger another full update.
                self._collector_timing["last_update"] = time.time()
                if job.subjobs:
                    # Progress report: count subjobs already in a final (exit) status
                    num_completed = sum((subjob.status in subjob.exit_statuses) for subjob in job.subjobs.values())
                    total_subjobs = len(job.subjobs)
                    B2INFO(f"{num_completed}/{total_subjobs} Collector SubJobs finished in"
                           f" Calibration {self.calibration.name} Job {job.name}.")
        return all([job.ready() for job in self._collector_jobs.values()])
616
617 def _submit_collections(self):
618 """
619 """
620 self.calibration.jobs_to_submit.extend(list(self._collector_jobs.values()))
621 self._collector_timing["start"] = time.time()
622 self._collector_timing["last_update"] = time.time()
623
624 def _no_require_iteration(self):
625 """
626 """
627 if self._require_iteration() and self._below_max_iterations():
628 return False
629 elif self._require_iteration() and not self._below_max_iterations():
630 B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
631 return True
632 elif not self._require_iteration():
633 return True
634
635 def _require_iteration(self):
636 """
637 """
638 iteration_called = False
639 for alg_name, results in self._algorithm_results[self.iteration].items():
640 for result in results:
641 if result.result == CalibrationAlgorithm.c_Iterate:
642 iteration_called = True
643 break
644 if iteration_called:
645 break
646 return iteration_called
647
648 def _log_new_state(self, **kwargs):
649 """
650 """
651 B2INFO(f"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")
652
653 def dependencies_completed(self):
654 """
655 Condition function to check that the dependencies of our calibration are in the 'completed' state.
656 Technically only need to check explicit dependencies.
657 """
658 for calibration in self.calibration.dependencies:
659 if not calibration.state == calibration.end_state:
660 return False
661 else:
662 return True
663
    def automatic_transition(self):
        """
        Automatically try all transitions out of this state once. Tries fail last.

        Raises:
            MachineError: If no transition (including 'fail') could be taken.
        """
        possible_transitions = self.get_transitions(self.state)
        for transition in possible_transitions:
            try:
                # Skip the 'fail' trigger here; it is only used as a last resort below
                if transition != "fail":
                    getattr(self, transition)()
                    break
            except ConditionError:
                # Conditions for this transition evaluated False: try the next one
                continue
        else:
            # for/else: no non-fail transition succeeded, so fall back to 'fail'
            if "fail" in possible_transitions:
                getattr(self, "fail")()
            else:
                raise MachineError(f"Failed to automatically transition out of {self.state} state.")
681
    def _make_output_dir(self):
        """
        Creates the overall root directory of the Calibration. Will not overwrite if it already exists.
        """
        create_directories(self.root_dir, overwrite=False)
688
689 def _make_collector_path(self, name, collection):
690 """
691 Creates a basf2 path for the correct collector and serializes it in the
692 self.output_dir/<calibration_name>/<iteration>/paths directory
693 """
694 path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
695 # Automatically overwrite any previous directory
696 create_directories(path_output_dir)
697 path_file_name = collection.collector.name() + '.path'
698 path_file_name = path_output_dir / path_file_name
699 # Create empty path and add collector to it
700 coll_path = create_path()
701 coll_path.add_module(collection.collector)
702 # Dump the basf2 path to file
703 with open(path_file_name, 'bw') as serialized_path_file:
704 pickle.dump(serialize_path(coll_path), serialized_path_file)
705 # Return the pickle file path for addition to the input sandbox
706 return str(path_file_name.absolute())
707
708 def _make_pre_collector_path(self, name, collection):
709 """
710 Creates a basf2 path for the collectors setup path (Collection.pre_collector_path) and serializes it in the
711 self.output_dir/<calibration_name>/<iteration>/<colector_output>/<name> directory.
712 """
713 path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
714 coll_path = collection.pre_collector_path
715 path_file_name = 'pre_collector.path'
716 path_file_name = os.path.join(path_output_dir, path_file_name)
717 # Dump the basf2 path to file
718 with open(path_file_name, 'bw') as serialized_path_file:
719 pickle.dump(serialize_path(coll_path), serialized_path_file)
720 # Return the pickle file path for addition to the input sandbox
721 return path_file_name
722
    def _create_collector_jobs(self):
        """
        Creates a Job object for the collections of this iteration, ready for submission
        to backend.

        For each collection this sets up the working/output directories, the input
        sandbox (job script, serialized collector path, optional pre-collector path,
        JSON config with the database chain) and the filtered list of input files.
        The resulting jobs are stored in self._collector_jobs keyed by collection name.
        """
        for collection_name, collection in self.calibration.collections.items():
            iteration_dir = self.root_dir.joinpath(str(self.iteration))
            job = Job('_'.join([self.calibration.name, collection_name, 'Iteration', str(self.iteration)]))
            job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
            job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
            # Remove previous failed attempt to avoid problems
            if job.output_dir.exists():
                # NOTE(review): message is missing a space between the two sentences
                B2INFO(f"Previous output directory for {self.calibration.name} collector {collection_name} exists."
                       f"Deleting {job.output_dir} before re-submitting.")
                shutil.rmtree(job.output_dir)
            job.cmd = collection.job_cmd
            job.append_current_basf2_setup_cmds()
            job.input_sandbox_files.append(collection.job_script)
            # Serialized collector path goes into the sandbox so the job can run it
            collector_path_file = Path(self._make_collector_path(collection_name, collection))
            job.input_sandbox_files.append(collector_path_file)
            if collection.pre_collector_path:
                pre_collector_path_file = Path(self._make_pre_collector_path(collection_name, collection))
                job.input_sandbox_files.append(pre_collector_path_file)

            # Want to figure out which local databases are required for this job and their paths
            list_dependent_databases = []

            # Here we add the finished databases of previous calibrations that we depend on.
            # We can assume that the databases exist as we can't be here until they have returned
            for dependency in self.calibration.dependencies:
                database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
                B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
                list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))

            # Add previous iteration databases from this calibration
            if self.iteration > 0:
                previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
                database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
                list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
                B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")

            # Let's use a directory to store some files later for input to the collector jobs. Should already exist from
            # collector path
            input_data_directory = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, collection_name)

            # Need to pass setup info to collector which would be tricky as arguments
            # We make a dictionary and pass it in as json
            job_config = {}
            # Apply the user-set Calibration database chain to the base of the overall chain.
            json_db_chain = []
            for database in collection.database_chain:
                if database.db_type == 'local':
                    json_db_chain.append(('local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
                elif database.db_type == 'central':
                    json_db_chain.append(('central', database.global_tag))
                else:
                    raise ValueError(f"Unknown database type {database.db_type}.")
            # CAF created ones for dependent calibrations and previous iterations of this calibration
            for database in list_dependent_databases:
                json_db_chain.append(('local', database))
            job_config['database_chain'] = json_db_chain

            job_config_file_path = input_data_directory.joinpath('collector_config.json').absolute()
            with open(job_config_file_path, 'w') as job_config_file:
                json.dump(job_config, job_config_file, indent=2)
            job.input_sandbox_files.append(job_config_file_path)

            # Define the input files
            input_data_files = set(collection.input_files)
            # Reduce the input data files to only those that overlap with the optional requested IoV
            if self.iov_to_calibrate:
                input_data_files = self.files_containing_iov(input_data_files,
                                                             collection.files_to_iovs,
                                                             self.iov_to_calibrate)
            # Remove any files that ONLY contain runs from our optional ignored_runs list
            files_to_ignore = set()
            for exprun in self.calibration.ignored_runs:
                for input_file in input_data_files:
                    # NOTE(review): this reads self.calibration.files_to_iovs, while the IoV
                    # filtering above used collection.files_to_iovs — confirm both are filled.
                    file_iov = self.calibration.files_to_iovs[input_file]
                    if file_iov == exprun.make_iov():
                        B2INFO(f"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
                               f"Therefore the input file '{input_file}' from Collection '{collection_name}' "
                               "is being removed from input files list.")
                        files_to_ignore.add(input_file)
            input_data_files.difference_update(files_to_ignore)

            if not input_data_files:
                raise MachineError(f"No valid input files for Calibration '{self.calibration.name}' "
                                   f" and Collection '{collection_name}'.")
            job.input_files = list(input_data_files)

            job.splitter = collection.splitter
            job.backend_args = collection.backend_args
            # Output patterns to be returned from collector job
            job.output_patterns = collection.output_patterns
            B2DEBUG(20, f"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
            self._collector_jobs[collection_name] = job
820
821 def _check_valid_collector_output(self):
822 B2INFO("Checking that Collector output exists for all collector jobs "
823 f"using {self.calibration.name}.output_patterns.")
824 if not self._collector_jobs:
825 B2INFO("We're restarting so we'll recreate the collector Job object.")
826 self._recover_collector_jobs()
827
828 for job in self._collector_jobs.values():
829 if not job.subjobs:
830 output_files = []
831 for pattern in job.output_patterns:
832 output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
833 if not output_files:
834 raise MachineError("No output files from Collector Job")
835 else:
836 for subjob in job.subjobs.values():
837 output_files = []
838 for pattern in subjob.output_patterns:
839 output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
840 if not output_files:
841 raise MachineError(f"No output files from Collector {subjob}")
842
    def _run_algorithms(self):
        """
        Runs the Calibration Algorithms for this calibration machine.

        Will run them sequentially locally (possible benefits to using a
        processing pool for low memory algorithms later on.)
        """
        # Get an instance of the Runner for these algorithms and run it
        algs_runner = self.calibration.algorithms_runner(name=self.calibration.name)
        algs_runner.algorithms = self.calibration.algorithms
        algorithm_output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
        output_database_dir = algorithm_output_dir.joinpath("outputdb")
        # Remove it, if we failed previously, to start clean
        if algorithm_output_dir.exists():
            B2INFO(f"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
                   f"Deleting and recreating {algorithm_output_dir}.")
        # create_directories recreates the directory regardless of the branch above
        create_directories(algorithm_output_dir)
        B2INFO(f"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
        algs_runner.output_database_dir = output_database_dir
        algs_runner.output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
        input_files = []

        # Collect all collector output files (from subjobs if the job was split)
        for job in self._collector_jobs.values():
            if job.subjobs:
                for subjob in job.subjobs.values():
                    for pattern in subjob.output_patterns:
                        input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
            else:
                for pattern in job.output_patterns:
                    input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))

        algs_runner.input_files = input_files

        # Add any user defined database chain for this calibration
        algs_runner.database_chain = self.calibration.database_chain

        # Here we add the finished databases of previous calibrations that we depend on.
        # We can assume that the databases exist as we can't be here until they have returned
        list_dependent_databases = []
        for dependency in self.calibration.dependencies:
            database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
            B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))

        # Add previous iteration databases from this calibration
        if self.iteration > 0:
            previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
            database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
            B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
        algs_runner.dependent_databases = list_dependent_databases

        algs_runner.ignored_runs = self.calibration.ignored_runs

        try:
            algs_runner.run(self.iov_to_calibrate, self.iteration)
        except Exception as err:
            # NOTE(review): consider B2ERROR with traceback instead of a bare print
            print(err)
            # We directly set the state without triggering the transition because normally we fail based on checking the algorithm
            # results. But here we had an actual exception so we just force into failure instead.
            self._state = State("algorithms_failed")
        # Results/final state are recorded even after an exception, for later inspection
        self._algorithm_results[self.iteration] = algs_runner.results
        self._runner_final_state = algs_runner.final_state
906
907 def _prepare_final_db(self):
908 """
909 Take the last iteration's outputdb and copy it to a more easily findable place.
910 """
911 database_location = self.root_dir.joinpath(str(self.iteration),
912 self.calibration.alg_output_dir,
913 'outputdb')
914 final_database_location = self.root_dir.joinpath('outputdb')
915 if final_database_location.exists():
916 B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
917 shutil.rmtree(final_database_location)
918 shutil.copytree(database_location, final_database_location)
919
920
class AlgorithmMachine(Machine):
    """
    A state machine to handle the logic of running the algorithm on the overall runs contained in the data.
    """

    #: Attributes that are required to exist on this machine before it can be set up.
    required_attrs = ["algorithm",
                      "dependent_databases",
                      "database_chain",
                      "output_dir",
                      "output_database_dir",
                      "input_files"
                      ]

    #: Attributes that must additionally evaluate to True (non-empty) before running.
    required_true_attrs = ["algorithm",
                           "output_dir",
                           "output_database_dir",
                           "input_files"
                           ]

    def __init__(self, algorithm=None, initial_state="init"):
        """
        Takes an Algorithm object from the caf framework and defines the transitions.

        Parameters:
            algorithm: caf framework Algorithm object this machine will execute.
            initial_state (str): Name of the state the machine starts in.
        """
        #: Default states of this machine
        self.default_states = [State("init"),
                               State("ready"),
                               State("running_algorithm"),
                               State("completed"),
                               State("failed")]

        super().__init__(self.default_states, initial_state)

        #: Algorithm object whose underlying CalibrationAlgorithm we will execute
        self.algorithm = algorithm
        #: Collector output files used as input to the algorithm
        self.input_files = []
        #: List of (database.txt path, directory) tuples from calibrations we depend on
        self.dependent_databases = []
        #: User defined database chain (local databases and/or central global tags)
        self.database_chain = []
        #: Working/output directory of the algorithm
        self.output_dir = ""
        #: Directory where this algorithm's output payloads are stored
        self.output_database_dir = ""
        #: IoV_Result of the latest execution, None until execute_runs has run
        self.result = None

        self.add_transition("setup_algorithm", "init", "ready",
                            before=[self._setup_logging,
                                    self._change_working_dir,
                                    self._setup_database_chain,
                                    self._set_input_data,
                                    self._pre_algorithm])
        self.add_transition("execute_runs", "ready", "running_algorithm",
                            after=self._execute_over_iov)
        self.add_transition("complete", "running_algorithm", "completed")
        self.add_transition("fail", "running_algorithm", "failed")
        self.add_transition("fail", "ready", "failed")
        self.add_transition("setup_algorithm", "completed", "ready")
        self.add_transition("setup_algorithm", "failed", "ready")

    def setup_from_dict(self, params):
        """
        Parameters:
            params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name. """
        for attribute_name, value in params.items():
            setattr(self, attribute_name, value)

    def is_valid(self):
        """
        Returns:
            bool: Whether or not this machine has been set up correctly with all its necessary attributes.
        """
        B2INFO(f"Checking validity of current setup of AlgorithmMachine for {self.algorithm.name}.")
        # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
        for attribute_name in self.required_attrs:
            if not hasattr(self, attribute_name):
                B2ERROR(f"AlgorithmMachine attribute {attribute_name} doesn't exist.")
                return False
        # Check if any attributes that need actual values haven't been set or were empty
        for attribute_name in self.required_true_attrs:
            if not getattr(self, attribute_name):
                B2ERROR(f"AlgorithmMachine attribute {attribute_name} returned False.")
                return False
        return True

    def _create_output_dir(self, **kwargs):
        """
        Create working/output directory of algorithm. Any old directory is overwritten.
        """
        create_directories(Path(self.output_dir), overwrite=True)

    def _setup_database_chain(self, **kwargs):
        """
        Apply all databases in the correct order.
        """
        # We deliberately override the normal database ordering because we don't want input files GTs to affect
        # the processing. Only explicit GTs and intermediate local DBs made by the CAF should be added here.
        b2conditions.reset()
        b2conditions.override_globaltags()

        # Apply all the databases in order, starting with the user-set chain for this Calibration
        for database in self.database_chain:
            if database.db_type == 'local':
                B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
                       f"for {self.algorithm.name}.")
                b2conditions.prepend_testing_payloads(database.filepath.as_posix())
            elif database.db_type == 'central':
                B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
                       f"for {self.algorithm.name}.")
                b2conditions.prepend_globaltag(database.global_tag)
            else:
                raise ValueError(f"Unknown database type {database.db_type}.")
        # Here we add the finished databases of previous calibrations that we depend on.
        # We can assume that the databases exist as we can't be here until they have returned
        # with OK status.
        for filename, directory in self.dependent_databases:
            # Log the actual payload file being prepended instead of a "(unknown)" placeholder.
            B2INFO(f"Adding Local Database {filename} to head of chain of local databases created by"
                   f" a dependent calibration, for {self.algorithm.name}.")
            b2conditions.prepend_testing_payloads(filename)

        # Create a directory to store the payloads of this algorithm
        create_directories(Path(self.output_database_dir), overwrite=False)

        # add local database to save payloads
        B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
        # Things have changed. We now need to do the expert settings to create a database directly.
        # LocalDB is readonly without this but we don't need 'use_local_database' during writing.
        b2conditions.expert_settings(save_payloads=str(self.output_database_dir.joinpath("database.txt")))

    def _setup_logging(self, **kwargs):
        """
        Direct basf2 logging output into a per-algorithm log file inside the output directory.
        """
        # add logfile for output
        log_file = os.path.join(self.output_dir, self.algorithm.name + '_stdout')
        B2INFO(f"Output log file at {log_file}.")
        basf2.reset_log()
        basf2.set_log_level(basf2.LogLevel.INFO)
        basf2.log_to_file(log_file)

    def _change_working_dir(self, **kwargs):
        """
        Change the process working directory to the algorithm's output directory.
        """
        B2INFO(f"Changing current working directory to {self.output_dir}.")
        os.chdir(self.output_dir)

    def _pre_algorithm(self, **kwargs):
        """
        Call the user defined algorithm setup function.
        """
        B2INFO("Running Pre-Algorithm function (if exists)")
        if self.algorithm.pre_algorithm:
            # We have to re-pass in the algorithm here because an outside user has created this method.
            # So the method isn't bound to the instance properly.
            self.algorithm.pre_algorithm(self.algorithm.algorithm, kwargs["iteration"])

    def _execute_over_iov(self, **kwargs):
        """
        Does the actual execute of the algorithm on an IoV and records the result.
        """
        B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")

        runs_to_execute = kwargs["runs"]
        iov = kwargs["apply_iov"]
        iteration = kwargs["iteration"]
        # Without an explicit IoV the payloads are labelled by the IoV spanning the executed runs
        if not iov:
            iov = iov_from_runs(runs_to_execute)
        B2INFO(f"Execution will use {iov} for labelling payloads by default.")
        alg_result = self.algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
        self.result = IoV_Result(iov, alg_result)

    def _set_input_data(self, **kwargs):
        """
        Pass the collected input files to the algorithm's data input function.
        """
        self.algorithm.data_input(self.input_files)
1098
1099
class MachineError(Exception):
    """
    Base exception class for errors raised by the state machines in this module.
    """
1104
1105
class ConditionError(MachineError):
    """
    Raised when a transition's condition checks fail.
    """
1110
1111
class TransitionError(MachineError):
    """
    Raised when a state transition cannot be carried out.
    """
1116
1117# @endcond
1118