Belle II Software development
state_machines.py
1#!/usr/bin/env python3
2
3
10
11import basf2
12
13from functools import partial
14from collections import defaultdict
15
16import pickle
17import glob
18import shutil
19import time
20from pathlib import Path
21import os
22import json
23
24from basf2 import create_path
25from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
26from basf2 import conditions as b2conditions
27from basf2.pickle_path import serialize_path
28
29from ROOT import Belle2 # noqa: make the Belle2 namespace available
30from ROOT.Belle2 import CalibrationAlgorithm
31
32from caf.utils import create_directories
33from caf.utils import method_dispatch
34from caf.utils import iov_from_runs
35from caf.utils import IoV_Result
36from caf.utils import get_iov_from_file
37from caf.backends import Job
38from caf.runners import AlgorithmsRunner
39
40
class State():
    """
    Basic State object that can take enter and exit state methods and records
    the state of a machine.

    You should assign the self.on_enter or self.on_exit attributes to callback functions
    or lists of them, if you need them.

    Two states compare equal when their names are equal, and a state also compares
    equal to a plain string holding its name. The hash is the hash of the name, so a
    State and its name string address the same dictionary entry.
    """

    def __init__(self, name, enter=None, exit=None):
        """
        Initialise State with a name and optional lists of callbacks.

        Parameters:
            name (str): Name of the state.
            enter: A callable, or a list/tuple of callables, stored in ``on_enter``.
            exit: A callable, or a list/tuple of callables, stored in ``on_exit``.
        """
        self.name = name
        self.on_enter = enter
        self.on_exit = exit

    @property
    def on_enter(self):
        """
        List of callbacks registered for entering this state.
        """
        return self._on_enter

    @on_enter.setter
    def on_enter(self, callbacks):
        """
        Replace the enter callbacks. A falsy value leaves the list empty.
        """
        self._on_enter = []
        if callbacks:
            self._add_callbacks(callbacks, self._on_enter)

    @property
    def on_exit(self):
        """
        List of callbacks registered for leaving this state.
        """
        return self._on_exit

    @on_exit.setter
    def on_exit(self, callbacks):
        """
        Replace the exit callbacks. A falsy value leaves the list empty.
        """
        self._on_exit = []
        if callbacks:
            self._add_callbacks(callbacks, self._on_exit)

    def _add_callbacks(self, callback, attribute):
        """
        Appends callback(s) to one of the callback lists.

        Parameters:
            callback: A single callable, or a list/tuple of callables.
            attribute (list): The list to append to.

        Anything that is not callable is reported with B2ERROR and not stored.
        """
        # A bare callable is handled as a one-element sequence so that the
        # validation is written only once.
        candidates = callback if isinstance(callback, (list, tuple)) else [callback]
        for function in candidates:
            if callable(function):
                attribute.append(function)
            else:
                B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")

    def __str__(self):
        """
        Returns the name of the state.
        """
        return self.name

    def __repr__(self):
        """
        Returns a short developer representation of the state.
        """
        return f"State(name={self.name})"

    def __eq__(self, other):
        """
        Compares by name against a string or any object exposing ``name``.

        Returns NotImplemented for objects without a ``name`` attribute, so that
        ``state == 5`` evaluates to False instead of raising AttributeError.
        """
        if isinstance(other, str):
            return self.name == other
        try:
            return self.name == other.name
        except AttributeError:
            return NotImplemented

    def __hash__(self):
        """
        Hash of the state name, consistent with `__eq__`.
        """
        return hash(self.name)
138
139
class Machine():
    """
    Base class for a finite state machine wrapper.
    Implements the framework that a more complex machine can inherit from.

    Parameters:
        states (list[str]): A list of possible states of the machine. `State` objects are accepted too.
        initial_state (str): Name of the state the machine starts in. It must be one of ``states``,
            unless the default ``"default_initial"`` is used, in which case that state is added for you.

    The `transitions` attribute is a dictionary of trigger name keys, each value of
    which is a list of dictionaries holding 'source' state, 'dest' state, and 'conditions',
    'before' and 'after' callbacks. 'conditions' should be a list of callables or a single one.
    A transition is valid if it goes from an allowed state to an allowed state.
    Conditions are optional but must be a callable that returns True or False based
    on some state of the machine.

    Every condition/before/after callback function MUST take ``**kwargs`` as the only
    argument (except ``self`` if it's a class method). This is because it's basically
    impossible to determine which arguments to pass to which functions for a transition.
    Therefore this machine just enforces that every function should simply take ``**kwargs``
    and use the dictionary of arguments (even if it doesn't need any arguments).

    This also means that if you call a trigger with arguments e.g. ``machine.walk(speed=5)``
    you MUST use the keyword arguments rather than positional ones. So ``machine.walk(5)``
    will *not* work.
    """

    def __init__(self, states=None, initial_state="default_initial"):
        """
        Basic Setup of states and initial_state.

        Raises:
            KeyError: If a non-default ``initial_state`` is not among ``states``.
        """
        # Valid states of this machine, keyed by state name
        self.states = {}
        if states:
            for state in states:
                self.add_state(state)
        if initial_state == "default_initial":
            # The placeholder state has to be registered before it can be selected
            self.add_state(initial_state)
        # The property setter fills both _initial_state and _state from the registered
        # State object and does not run any enter/exit callbacks.
        self.initial_state = initial_state
        # Allowed transitions between states, keyed by trigger name
        self.transitions = defaultdict(list)

    def add_state(self, state, enter=None, exit=None):
        """
        Adds a single state to the list of possible ones.
        Should be a unique string or a State object with a unique name.

        Duplicates and objects of any other type are reported with B2WARNING and ignored.
        ``enter`` and ``exit`` are only used when ``state`` is given as a string.
        """
        if isinstance(state, str):
            self.add_state(State(state, enter, exit))
        elif isinstance(state, State):
            if state.name not in self.states:
                self.states[state.name] = state
            else:
                B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
        else:
            B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")

    @property
    def initial_state(self):
        """
        The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
        """
        return self._initial_state

    @initial_state.setter
    def initial_state(self, state):
        """
        Sets the initial and the current state to a registered state, without running callbacks.

        Raises:
            KeyError: If ``state`` is not in the ``states`` attribute.
        """
        if state in self.states:
            registered = self.states[state]
            self._initial_state = registered
            self._state = registered
        else:
            raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")

    @property
    def state(self):
        """
        The current state of the machine. Actually a `property` decorator. It will call the exit method of the
        current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
        either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
        """
        return self._state

    @state.setter
    def state(self, state):
        """
        Moves to a registered state, running the exit callbacks of the current state and then the
        enter callbacks of the new one. Callbacks receive ``prior_state`` and ``new_state`` keywords.

        Raises:
            MachineError: If the requested state is not in the ``states`` attribute.
        """
        state_name = state if isinstance(state, str) else state.name

        # Only the lookup is guarded: a KeyError raised inside a callback must not be
        # reported as an unknown state.
        try:
            new_state = self.states[state_name]
        except KeyError as err:
            raise MachineError(f"Attempted to set state to '{state_name}' which not in the 'states' attribute!") from err

        # Run exit callbacks of current state
        for callback in self.state.on_exit:
            callback(prior_state=self.state, new_state=new_state)
        # Run enter callbacks of new state
        for callback in new_state.on_enter:
            callback(prior_state=self.state, new_state=new_state)
        # Set the state
        self._state = new_state

    @staticmethod
    def default_condition(**kwargs):
        """
        Method to always return True.
        """
        return True

    @staticmethod
    def _as_callback_list(callbacks):
        """
        Normalises a callback argument (nothing, one callable, or a list/tuple/set of them) to a list.
        """
        if not callbacks:
            return []
        if isinstance(callbacks, (list, tuple, set)):
            return list(callbacks)
        return [callbacks]

    def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
        """
        Adds a single transition to the dictionary of possible ones.
        Trigger is the method name that begins the transition between the
        source state and the destination state.

        The condition is an optional function that returns True or False
        depending on the current state/input. Without conditions the transition
        is always allowed. ``before`` and ``after`` callbacks are optional.

        Raises:
            KeyError: If ``source`` or ``dest`` is not a registered state.
        """
        transition_dict = {}
        try:
            transition_dict["source"] = self.states[source]
            transition_dict["dest"] = self.states[dest]
        except KeyError:
            B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
            raise

        transition_dict["conditions"] = self._as_callback_list(conditions) or [Machine.default_condition]
        transition_dict["before"] = self._as_callback_list(before)
        transition_dict["after"] = self._as_callback_list(after)

        self.transitions[trigger].append(transition_dict)

    def __getattr__(self, name, **kwargs):
        """
        Allows us to create a new method for each trigger on the fly.
        If there is no trigger name in the machine to match, then the normal
        AttributeError is called.
        """
        # This hook is also reached while the instance is not fully set up. Looking at
        # self.state here would then re-enter this method without end, so bail out early.
        instance_attrs = self.__dict__
        if "_state" not in instance_attrs or "transitions" not in instance_attrs:
            raise AttributeError(name)

        possible_transitions = self.get_transitions(self.state)
        if name not in possible_transitions:
            raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
        transition_dict = self.get_transition_dict(self.state, name)
        # \cond silence doxygen warning about _trigger
        return partial(self._trigger, name, transition_dict, **kwargs)
        # \endcond

    def _trigger(self, transition_name, transition_dict, **kwargs):
        """
        Runs the transition logic. Callbacks are evaluated in the order:
        conditions -> before -> <new state set here> -> after.

        Raises:
            ConditionError: If any condition evaluates False.
        """
        dest = transition_dict["dest"]
        conditions = transition_dict["conditions"]
        before_callbacks = transition_dict["before"]
        after_callbacks = transition_dict["after"]

        # Stops at the first condition that returns a falsy value
        if not all(self._callback(condition, **kwargs) for condition in conditions):
            raise ConditionError(f"Transition '{transition_name}' called for but one or more conditions "
                                 "evaluated False")

        for before_func in before_callbacks:
            self._callback(before_func, **kwargs)
        # Goes through the property setter, so exit/enter callbacks run here
        self.state = dest
        for after_func in after_callbacks:
            self._callback(after_func, **kwargs)

    @staticmethod
    def _callback(func, **kwargs):
        """
        Calls a condition/before/after.. function using arguments passed (or not).
        """
        return func(**kwargs)

    def get_transitions(self, source):
        """
        Returns allowed transitions from a given state.
        """
        return [
            trigger
            for trigger, transition_dicts in self.transitions.items()
            for transition_dict in transition_dicts
            if transition_dict["source"] == source
        ]

    def get_transition_dict(self, state, transition):
        """
        Returns the transition dictionary for a state and transition out of it.

        Raises:
            KeyError: If there is no such transition out of ``state``.
        """
        # .get() avoids creating an empty entry in the defaultdict for unknown triggers
        for transition_dict in self.transitions.get(transition, ()):
            if transition_dict["source"] == state:
                return transition_dict
        raise KeyError(f"No transition from state {state} with the name {transition}.")

    def save_graph(self, filename, graphname):
        """
        Does a simple dot file creation to visualise states and transitions.
        """
        with open(filename, "w") as dotfile:
            dotfile.write(f"digraph {graphname} {{\n")
            for state in self.states:
                dotfile.write(f'"{state}" [shape=ellipse, color=black]\n')
            for trigger, transition_dicts in self.transitions.items():
                for transition in transition_dicts:
                    source_name = transition["source"].name
                    dest_name = transition["dest"].name
                    dotfile.write(f'"{source_name}" -> "{dest_name}" [label="{trigger}"]\n')
            dotfile.write("}\n")
381
382
    """
    A state machine to handle `Calibration` objects and the flow of
    processing for them.
    """

    # Name of the directory below ``<root_dir>/<iteration>`` that receives the collector job inputs
    collector_input_dir = 'collector_input'
    # Name of the directory below ``<root_dir>/<iteration>`` used as collector job output and working directory
    collector_output_dir = 'collector_output'
    # NOTE(review): not referenced in the visible code, which uses ``calibration.alg_output_dir`` instead
    algorithm_output_dir = 'algorithm_output'
395
    def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
        """
        Takes a Calibration object from the caf framework and lets you
        set the initial state.

        Parameters:
            calibration: The Calibration object to drive. It is stored as ``self.calibration`` and
                gets this machine assigned to its ``machine`` attribute.
            iov_to_calibrate: Optional overall IoV; when set, only input files overlapping it are used.
            initial_state (str): Name of the state to start in, one of the default states below.
            iteration (int): Iteration number to start from.
        """
        # Every default state copies its name onto the calibration and logs the move when it is entered
        self.default_states = [State("init", enter=[self._update_cal_state,
                                                    self._log_new_state]),
                               State("running_collector", enter=[self._update_cal_state,
                                                                 self._log_new_state]),
                               State("collector_failed", enter=[self._update_cal_state,
                                                                self._log_new_state]),
                               State("collector_completed", enter=[self._update_cal_state,
                                                                   self._log_new_state]),
                               State("running_algorithms", enter=[self._update_cal_state,
                                                                  self._log_new_state]),
                               State("algorithms_failed", enter=[self._update_cal_state,
                                                                 self._log_new_state]),
                               State("algorithms_completed", enter=[self._update_cal_state,
                                                                    self._log_new_state]),
                               State("completed", enter=[self._update_cal_state,
                                                         self._log_new_state]),
                               State("failed", enter=[self._update_cal_state,
                                                      self._log_new_state])
                               ]

        super().__init__(self.default_states, initial_state)

        # Calibration object whose state we are modelling
        self.calibration = calibration
        # Monkey Patching for the win!
        # Gives the calibration a handle back to its machine
        self.calibration.machine = self
        # Which iteration step are we in
        self.iteration = iteration

        # NOTE(review): other methods read self._collector_jobs, self._collector_timing,
        # self._algorithm_results and self._runner_final_state; their initialisation is not
        # visible in this listing - confirm against the full file.

        # Overall IoV requested for this calibration (may be None)
        self.iov_to_calibrate = iov_to_calibrate
        # Root directory of this calibration: <current working directory>/<calibration name>
        self.root_dir = Path(os.getcwd(), calibration.name)

        # Transitions of the calibration flow.
        # NOTE(review): several lines of the calls below are missing from this listing
        # (unterminated ``after=[`` / ``conditions=[`` lists); the visible text is kept as is.
        self.add_transition("submit_collector", "init", "running_collector",
                            conditions=self.dependencies_completed,
                            before=[self._make_output_dir,
                                    self._build_iov_dicts,
                                    self._dump_job_config])
        self.add_transition("fail", "running_collector", "collector_failed",
                            conditions=self._collection_failed)
        self.add_transition("complete", "running_collector", "collector_completed",
                            conditions=self._collection_completed)
        self.add_transition("run_algorithms", "collector_completed", "running_algorithms",
                            after=[self._run_algorithms,
        self.add_transition("complete", "running_algorithms", "algorithms_completed",
                            after=self.automatic_transition,
                            conditions=self._runner_not_failed)
        self.add_transition("fail", "running_algorithms", "algorithms_failed",
                            conditions=self._runner_failed)
        self.add_transition("iterate", "algorithms_completed", "init",
                            conditions=[self._require_iteration,
                            after=self._increment_iteration)
        self.add_transition("finish", "algorithms_completed", "completed",
                            conditions=self._no_require_iteration,
                            before=self._prepare_final_db)
        self.add_transition("fail_fully", "algorithms_failed", "failed")
        self.add_transition("fail_fully", "collector_failed", "failed")
480
481 def _update_cal_state(self, **kwargs):
482 """update calibration state"""
483 self.calibration.state = str(kwargs["new_state"])
484
485 def files_containing_iov(self, file_paths, files_to_iovs, iov):
486 """
487 Lookup function that returns all files from the file_paths that
488 overlap with this IoV.
489 """
490 # Files that contain an Exp,Run range that overlaps with given IoV
491 overlapping_files = set()
492
493 for file_path, file_iov in files_to_iovs.items():
494 if file_iov.overlaps(iov) and (file_path in file_paths):
495 overlapping_files.add(file_path)
496 return overlapping_files
497
        """
        Dumps the `Job` object for the collections to JSON files so that its configuration can be recovered
        later in case of failure.

        Blocks, polling every 5 seconds, for as long as any collector job still has the status "init".
        Each job is then written to
        ``<root_dir>/<iteration>/<collector_input_dir>/<collection name>/<collection.job_config>``.
        """
        # Wait for jobs (+subjobs) to be submitted so that all information is filled. Since the parent CAF object asynchronously
        # submits the jobs this might need to wait a while.
        while any(map(lambda j: j.status == "init", self._collector_jobs.values())):
            B2DEBUG(29, "Some Collector Jobs still in 'init' state. Waiting...")
            time.sleep(5)

        for collection_name, job in self._collector_jobs.items():
            # File name comes from the matching Collection's job_config attribute
            collector_job_output_file_name = self.calibration.collections[collection_name].job_config
            output_file = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir,
                                                 collection_name, collector_job_output_file_name)
            job.dump_to_json(output_file)
514
        """
        Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.

        One `Job` is rebuilt with ``Job.from_json`` per collection of the calibration and stored in
        ``self._collector_jobs`` under the collection name.
        """
        for collection_name, collection in self.calibration.collections.items():
            # NOTE(review): the JSON dump code writes below ``self.collector_input_dir``; a path
            # component may be missing here in this listing - confirm both paths agree.
            output_file = self.root_dir.joinpath(str(self.iteration),
                                                 collection_name,
                                                 collection.job_config)
            self._collector_jobs[collection_name] = Job.from_json(output_file)
525
526 def _iov_requested(self):
527 """
528 """
529 if self.iov_to_calibrate:
530 B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
531 return True
532 else:
533 B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
534 return False
535
        """
        Does nothing at present.

        NOTE(review): looks like a placeholder hook; its name and intended purpose are not visible here.
        """
        pass
540
        """
        Build IoV file dictionary for each collection if required.

        A mapping is only needed when an overall IoV was requested or when the calibration has
        ignored runs. For every collection without a ``files_to_iovs`` mapping, one is created by
        reading the IoV of each input file with ``get_iov_from_file`` and stored on the collection.
        Collections that already have a mapping are left untouched.
        """
        iov_requested = self._iov_requested()
        if iov_requested or self.calibration.ignored_runs:
            for coll_name, collection in self.calibration.collections.items():
                if not collection.files_to_iovs:
                    B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
                           f" Calibration '{self.calibration.name} and Collection '{coll_name}'."
                           " Filling dictionary from input file metadata."
                           " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")

                    # One metadata lookup per input file of this collection
                    files_to_iovs = {}
                    for file_path in collection.input_files:
                        files_to_iovs[file_path] = get_iov_from_file(file_path)
                    collection.files_to_iovs = files_to_iovs
                else:
                    B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
                           f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
        else:
            B2INFO("No File to IoV mapping required.")
563
        """
        Returns:
            bool: True while ``self.iteration`` is smaller than ``self.calibration.max_iterations``.
        """
        return self.iteration < self.calibration.max_iterations
568
        """
        Advance the iteration counter by one and copy the new value to ``self.calibration.iteration``.
        """
        self.iteration += 1
        self.calibration.iteration = self.iteration
574
        """
        Did all the collections succeed?

        Returns:
            True/False whether every collector job has the status "completed", once all collector
            jobs are ready. While they are not ready nothing is returned (i.e. None, which is falsy).
        """
        # Same debug text as used by the check for failed jobs
        B2DEBUG(29, "Checking for failed collector job.")
        if self._collector_jobs_ready():
            return all([job.status == "completed" for job in self._collector_jobs.values()])
582
        """
        Did any of the collections fail?

        Returns:
            True/False whether at least one collector job has the status "failed", once all collector
            jobs are ready. While they are not ready nothing is returned (i.e. None, which is falsy).
        """
        B2DEBUG(29, "Checking for failed collector job.")
        if self._collector_jobs_ready():
            return any([job.status == "failed" for job in self._collector_jobs.values()])
590
        """
        Returns:
            bool: If AlgorithmsRunner succeeded return True. This is the negation of `_runner_failed`.
        """
        return not self._runner_failed()
597
598 def _runner_failed(self):
599 """
600 Returns:
601 bool: If AlgorithmsRunner failed return True.
602 """
603 if self._runner_final_state == AlgorithmsRunner.FAILED:
604 return True
605 else:
606 return False
607
        """
        Report whether all collector jobs are ready, refreshing their status at most once per
        ``calibration.collector_full_update_interval`` seconds.

        When the interval since the last refresh has passed, every job's status is updated, the
        refresh time is recorded, and for jobs with subjobs the number of finished subjobs is logged.

        Returns:
            bool: True if ``ready()`` is true for every collector job.
        """
        since_last_update = time.time() - self._collector_timing["last_update"]
        if since_last_update > self.calibration.collector_full_update_interval:
            B2INFO("Updating full collector job statuses.")
            for job in self._collector_jobs.values():
                job.update_status()
                self._collector_timing["last_update"] = time.time()
                if job.subjobs:
                    # A subjob counts as finished when its status is one of its exit statuses
                    num_completed = sum((subjob.status in subjob.exit_statuses) for subjob in job.subjobs.values())
                    total_subjobs = len(job.subjobs)
                    B2INFO(f"{num_completed}/{total_subjobs} Collector SubJobs finished in"
                           f" Calibration {self.calibration.name} Job {job.name}.")
        return all([job.ready() for job in self._collector_jobs.values()])
623
        """
        Queue all collector jobs on ``self.calibration.jobs_to_submit`` and record the current time
        as both the "start" and the "last_update" collector timing.
        """
        self.calibration.jobs_to_submit.extend(list(self._collector_jobs.values()))
        self._collector_timing["start"] = time.time()
        self._collector_timing["last_update"] = time.time()
630
        """
        Decide whether the calibration can finish without another iteration.

        Returns:
            bool: False if an iteration is required and the maximum number of iterations has not been
            reached. True if no iteration is required, or if one is required but the maximum was
            reached (this case is logged).
        """
        if self._require_iteration() and self._below_max_iterations():
            return False
        elif self._require_iteration() and not self._below_max_iterations():
            B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
            return True
        elif not self._require_iteration():
            return True
641
        """
        Check the algorithm results of the current iteration for a request to iterate.

        Returns:
            bool: True if any result stored for ``self.iteration`` has a ``result`` equal to
            ``CalibrationAlgorithm.c_Iterate``.
        """
        iteration_called = False
        for alg_name, results in self._algorithm_results[self.iteration].items():
            for result in results:
                if result.result == CalibrationAlgorithm.c_Iterate:
                    iteration_called = True
                    break
            # One request is enough, so stop scanning the remaining algorithms
            if iteration_called:
                break
        return iteration_called
654
655 def _log_new_state(self, **kwargs):
656 """
657 """
658 B2INFO(f"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")
659
        """
        Condition function to check that the dependencies of our calibration are in the 'completed' state.
        Technically only need to check explicit dependencies.

        Returns:
            bool: False as soon as one dependency has a ``state`` different from its ``end_state``;
            True otherwise, including when there are no dependencies.
        """
        for calibration in self.calibration.dependencies:
            if not calibration.state == calibration.end_state:
                return False
        else:
            # Reached only when the loop found no unfinished dependency
            return True
670
        """
        Automatically try all transitions out of this state once. Tries fail last.

        A transition whose conditions are not met (ConditionError) is skipped and the next one is tried.

        Raises:
            MachineError: If no transition was taken and the state has no "fail" transition.
        """
        possible_transitions = self.get_transitions(self.state)
        for transition in possible_transitions:
            try:
                if transition != "fail":
                    getattr(self, transition)()
                # NOTE(review): indentation reconstructed from a flattened listing - confirm whether
                # this break belongs to the ``if`` above or to the ``try`` body.
                break
            except ConditionError:
                continue
        else:
            # The loop ended without a break, so fall back to "fail" if it exists
            if "fail" in possible_transitions:
                getattr(self, "fail")()
            else:
                raise MachineError(f"Failed to automatically transition out of {self.state} state.")
688
        """
        Creates the overall root directory of the Calibration. Will not overwrite if it already exists.
        """
        create_directories(self.root_dir, overwrite=False)
695
696 def _make_collector_path(self, name, collection):
697 """
698 Creates a basf2 path for the correct collector and serializes it in the
699 self.output_dir/<calibration_name>/<iteration>/paths directory
700 """
701 path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
702 # Automatically overwrite any previous directory
703 create_directories(path_output_dir)
704 path_file_name = collection.collector.name() + '.path'
705 path_file_name = path_output_dir / path_file_name
706 # Create empty path and add collector to it
707 coll_path = create_path()
708 coll_path.add_module(collection.collector)
709 # Dump the basf2 path to file
710 with open(path_file_name, 'bw') as serialized_path_file:
711 pickle.dump(serialize_path(coll_path), serialized_path_file)
712 # Return the pickle file path for addition to the input sandbox
713 return str(path_file_name.absolute())
714
715 def _make_pre_collector_path(self, name, collection):
716 """
717 Creates a basf2 path for the collectors setup path (Collection.pre_collector_path) and serializes it in the
718 self.output_dir/<calibration_name>/<iteration>/<colector_output>/<name> directory.
719 """
720 path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
721 coll_path = collection.pre_collector_path
722 path_file_name = 'pre_collector.path'
723 path_file_name = os.path.join(path_output_dir, path_file_name)
724 # Dump the basf2 path to file
725 with open(path_file_name, 'bw') as serialized_path_file:
726 pickle.dump(serialize_path(coll_path), serialized_path_file)
727 # Return the pickle file path for addition to the input sandbox
728 return path_file_name
729
        """
        Creates a Job object for the collections of this iteration, ready for submission
        to backend.

        For every collection of the calibration this
        - sets up a `Job` whose output and working directory is
          ``<root_dir>/<iteration>/<collector_output_dir>/<collection name>`` (an existing one is deleted),
        - serialises the collector path (and the pre-collector path, if any) into the input sandbox,
        - writes a ``collector_config.json`` holding the database chain,
        - selects the input files (restricted to the requested IoV, minus ignored runs),
        - stores the job in ``self._collector_jobs`` under the collection name.

        Raises:
            ValueError: If a database in the collection's chain is neither 'local' nor 'central'.
            MachineError: If no input files remain for a collection.
        """
        for collection_name, collection in self.calibration.collections.items():
            iteration_dir = self.root_dir.joinpath(str(self.iteration))
            job = Job('_'.join([self.calibration.name, collection_name, 'Iteration', str(self.iteration)]))
            job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
            job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
            # Remove previous failed attempt to avoid problems
            if job.output_dir.exists():
                B2INFO(f"Previous output directory for {self.calibration.name} collector {collection_name} exists."
                       f"Deleting {job.output_dir} before re-submitting.")
                shutil.rmtree(job.output_dir)
            job.cmd = collection.job_cmd
            job.append_current_basf2_setup_cmds()
            job.input_sandbox_files.append(collection.job_script)
            collector_path_file = Path(self._make_collector_path(collection_name, collection))
            job.input_sandbox_files.append(collector_path_file)
            if collection.pre_collector_path:
                pre_collector_path_file = Path(self._make_pre_collector_path(collection_name, collection))
                job.input_sandbox_files.append(pre_collector_path_file)

            # Want to figure out which local databases are required for this job and their paths
            # Each entry is a (database.txt file, payload directory) pair
            list_dependent_databases = []

            # Here we add the finished databases of previous calibrations that we depend on.
            # We can assume that the databases exist as we can't be here until they have returned
            for dependency in self.calibration.dependencies:
                database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
                B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
                list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))

            # Add previous iteration databases from this calibration
            if self.iteration > 0:
                previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
                database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
                list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
                B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")

            # Let's use a directory to store some files later for input to the collector jobs. Should already exist from
            # collector path
            input_data_directory = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, collection_name)

            # Need to pass setup info to collector which would be tricky as arguments
            # We make a dictionary and pass it in as json
            job_config = {}
            # Apply the user-set Calibration database chain to the base of the overall chain.
            json_db_chain = []
            for database in collection.database_chain:
                if database.db_type == 'local':
                    json_db_chain.append(('local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
                elif database.db_type == 'central':
                    json_db_chain.append(('central', database.global_tag))
                else:
                    raise ValueError(f"Unknown database type {database.db_type}.")
            # CAF created ones for dependent calibrations and previous iterations of this calibration
            for database in list_dependent_databases:
                json_db_chain.append(('local', database))
            job_config['database_chain'] = json_db_chain

            job_config_file_path = input_data_directory.joinpath('collector_config.json').absolute()
            with open(job_config_file_path, 'w') as job_config_file:
                json.dump(job_config, job_config_file, indent=2)
            job.input_sandbox_files.append(job_config_file_path)

            # Define the input files
            input_data_files = set(collection.input_files)
            # Reduce the input data files to only those that overlap with the optional requested IoV
            if self.iov_to_calibrate:
                input_data_files = self.files_containing_iov(input_data_files,
                                                             collection.files_to_iovs,
                                                             self.iov_to_calibrate)
            # Remove any files that ONLY contain runs from our optional ignored_runs list
            files_to_ignore = set()
            for exprun in self.calibration.ignored_runs:
                for input_file in input_data_files:
                    # NOTE(review): reads the calibration-level mapping, while the IoV filter above
                    # uses collection.files_to_iovs - confirm this is intended.
                    file_iov = self.calibration.files_to_iovs[input_file]
                    if file_iov == exprun.make_iov():
                        B2INFO(f"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
                               f"Therefore the input file '{input_file}' from Collection '{collection_name}' "
                               "is being removed from input files list.")
                        files_to_ignore.add(input_file)
            # Removal happens after the loops so the set is not changed while it is iterated
            input_data_files.difference_update(files_to_ignore)

            if not input_data_files:
                raise MachineError(f"No valid input files for Calibration '{self.calibration.name}' "
                                   f" and Collection '{collection_name}'.")
            job.input_files = list(input_data_files)

            job.splitter = collection.splitter
            job.backend_args = collection.backend_args
            # Output patterns to be returned from collector job
            job.output_patterns = collection.output_patterns
            B2DEBUG(20, f"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
            self._collector_jobs[collection_name] = job
827
        """
        Check that collector output is valid.

        For every collector job (or, if it has subjobs, for each subjob) the output directory is
        searched with the job's output patterns.

        Raises:
            MachineError: If a job or subjob produced no file matching any of its output patterns.
        """
        B2INFO("Checking that Collector output exists for all collector jobs "
               f"using {self.calibration.name}.output_patterns.")
        if not self._collector_jobs:
            # NOTE(review): only the log message is visible here; the step that recreates the
            # Job objects appears to be missing from this listing - confirm against the full file.
            B2INFO("We're restarting so we'll recreate the collector Job object.")

        for job in self._collector_jobs.values():
            if not job.subjobs:
                output_files = []
                for pattern in job.output_patterns:
                    output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
                if not output_files:
                    raise MachineError("No output files from Collector Job")
            else:
                # With subjobs, each subjob has to have produced output on its own
                for subjob in job.subjobs.values():
                    output_files = []
                    for pattern in subjob.output_patterns:
                        output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
                    if not output_files:
                        raise MachineError(f"No output files from Collector {subjob}")
850
        """
        Runs the Calibration Algorithms for this calibration machine.

        Will run them sequentially locally (possible benefits to using a
        processing pool for low memory algorithms later on.)

        The runner is configured with the algorithms, output directories, the collector output
        files found on disk, the database chain, the dependent local databases and the ignored
        runs of the calibration. Afterwards the runner's results are stored in
        ``self._algorithm_results[self.iteration]`` and its final state in ``self._runner_final_state``.
        """
        # Get an instance of the Runner for these algorithms and run it
        algs_runner = self.calibration.algorithms_runner(name=self.calibration.name)
        algs_runner.algorithms = self.calibration.algorithms
        algorithm_output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
        output_database_dir = algorithm_output_dir.joinpath("outputdb")
        # Remove it, if we failed previously, to start clean
        if algorithm_output_dir.exists():
            B2INFO(f"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
                   f"Deleting and recreating {algorithm_output_dir}.")
        # NOTE(review): presumably create_directories overwrites an existing directory by default,
        # which would do the deletion announced above - confirm in caf.utils.
        create_directories(algorithm_output_dir)
        B2INFO(f"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
        algs_runner.output_database_dir = output_database_dir
        algs_runner.output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
        input_files = []

        # Collect the collector output files; subjob outputs are used when a job was split
        for job in self._collector_jobs.values():
            if job.subjobs:
                for subjob in job.subjobs.values():
                    for pattern in subjob.output_patterns:
                        input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
            else:
                for pattern in job.output_patterns:
                    input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))

        algs_runner.input_files = input_files

        # Add any user defined database chain for this calibration
        algs_runner.database_chain = self.calibration.database_chain

        # Here we add the finished databases of previous calibrations that we depend on.
        # We can assume that the databases exist as we can't be here until they have returned
        list_dependent_databases = []
        for dependency in self.calibration.dependencies:
            database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
            B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))

        # Add previous iteration databases from this calibration
        if self.iteration > 0:
            previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
            database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
            B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
        algs_runner.dependent_databases = list_dependent_databases

        algs_runner.ignored_runs = self.calibration.ignored_runs

        try:
            algs_runner.run(self.iov_to_calibrate, self.iteration)
        except Exception as err:
            print(err)
            # We directly set the state without triggering the transition because normally we fail based on checking the algorithm
            # results. But here we had an actual exception so we just force into failure instead.
            # Note that this is a new State object without enter/exit callbacks, not the registered one.
            self._state = State("algorithms_failed")
        self._algorithm_results[self.iteration] = algs_runner.results
        self._runner_final_state = algs_runner.final_state
914
916 """
917 Take the last iteration's outputdb and copy it to a more easily findable place.
918 """
919 database_location = self.root_dir.joinpath(str(self.iteration),
920 self.calibration.alg_output_dir,
921 'outputdb')
922 final_database_location = self.root_dir.joinpath('outputdb')
923 if final_database_location.exists():
924 B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
925 shutil.rmtree(final_database_location)
926 shutil.copytree(database_location, final_database_location)
927
928
930 """
931 A state machine to handle the logic of running the algorithm on the overall runs contained in the data.
932 """
933
934
# Attributes that must exist on the machine before it can run properly.
# is_valid() tests each one with hasattr().
required_attrs = [
    "algorithm",
    "dependent_databases",
    "database_chain",
    "output_dir",
    "output_database_dir",
    "input_files",
]

# Attributes that must have a value that evaluates to True when tested.
# is_valid() tests each one with getattr().
required_true_attrs = [
    "algorithm",
    "output_dir",
    "output_database_dir",
    "input_files",
]
950
def __init__(self, algorithm=None, initial_state="init"):
    """
    Takes an Algorithm object from the caf framework and defines the transitions.

    Parameters:
        algorithm: Algorithm object whose state is modelled. Defaults to None.
        initial_state (str): Name of the state the machine starts in.
    """
    # Default states for the AlgorithmMachine
    self.default_states = [State("init"),
                           State("ready"),
                           State("running_algorithm"),
                           State("completed"),
                           State("failed")]

    super().__init__(self.default_states, initial_state)

    # Algorithm() object whose state we are modelling
    self.algorithm = algorithm
    # Collector output files, will contain all files returned by the output patterns
    self.input_files = []
    # CAF created local databases from previous calibrations that this
    # calibration/algorithm depends on. Listed in required_attrs and iterated
    # by _setup_database_chain, so it must always exist.
    self.dependent_databases = []
    # Database chain assigned to the overall Calibration object. Listed in
    # required_attrs and iterated by _setup_database_chain.
    self.database_chain = []
    # The algorithm output directory which is mostly used to store the stdout file
    self.output_dir = ""
    # The output database directory for the localdb that the algorithm will
    # commit to. Listed in required_attrs and required_true_attrs.
    self.output_database_dir = ""
    # IoV_Result object for a single execution, will be reset upon a new execution
    self.result = None

    # Every setup callback defined on this machine is registered here, so that
    # the working directory and conditions database are configured as part of
    # the "setup_algorithm" transition.
    self.add_transition("setup_algorithm", "init", "ready",
                        before=[self._setup_logging,
                                self._change_working_dir,
                                self._setup_database_chain,
                                self._set_input_data,
                                self._pre_algorithm])
    self.add_transition("execute_runs", "ready", "running_algorithm",
                        after=self._execute_over_iov)
    self.add_transition("complete", "running_algorithm", "completed")
    self.add_transition("fail", "running_algorithm", "failed")
    self.add_transition("fail", "ready", "failed")
    self.add_transition("setup_algorithm", "completed", "ready")
    self.add_transition("setup_algorithm", "failed", "ready")
993
def setup_from_dict(self, params):
    """
    Copy every entry of a dictionary onto this machine as an attribute.

    Parameters:
        params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.
    """
    for key in params:
        setattr(self, key, params[key])
1001
def is_valid(self):
    """
    Check that every attribute the machine needs is present and usable.

    Returns:
        bool: Whether or not this machine has been set up correctly with all its necessary attributes.
    """
    # The algorithm defaults to None in __init__, so look its name up
    # defensively: an unset algorithm must be reported by the checks below
    # rather than crash this log call with an AttributeError.
    algorithm_name = getattr(getattr(self, "algorithm", None), "name", None)
    B2INFO(f"Checking validity of current setup of AlgorithmMachine for {algorithm_name}.")
    # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
    for attribute_name in self.required_attrs:
        if not hasattr(self, attribute_name):
            B2ERROR(f"AlgorithmMachine attribute {attribute_name} doesn't exist.")
            return False
    # Check if any attributes that need actual values haven't been set or were empty
    for attribute_name in self.required_true_attrs:
        if not getattr(self, attribute_name):
            B2ERROR(f"AlgorithmMachine attribute {attribute_name} returned False.")
            return False
    return True
1019
def _create_output_dir(self, **kwargs):
    """
    Create the working/output directory of the algorithm, overwriting any old one.
    """
    output_path = Path(self.output_dir)
    create_directories(output_path, overwrite=True)
1025
def _setup_database_chain(self, **kwargs):
    """
    Apply all databases in the correct order.

    Resets the basf2 conditions configuration, prepends every entry of
    ``database_chain``, then the local databases listed in
    ``dependent_databases``, and finally configures the output local database
    that payloads are saved to.

    Raises:
        ValueError: If a ``database_chain`` entry has a ``db_type`` other than 'local' or 'central'.
    """
    # We deliberately override the normal database ordering because we don't want input files GTs to affect
    # the processing. Only explicit GTs and intermediate local DBs made by the CAF should be added here.
    b2conditions.reset()
    b2conditions.override_globaltags()

    # Apply all the databases in order, starting with the user-set chain for this Calibration
    for database in self.database_chain:
        if database.db_type == 'local':
            B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
                   f"for {self.algorithm.name}.")
            b2conditions.prepend_testing_payloads(database.filepath.as_posix())
        elif database.db_type == 'central':
            B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
                   f"for {self.algorithm.name}.")
            b2conditions.prepend_globaltag(database.global_tag)
        else:
            raise ValueError(f"Unknown database type {database.db_type}.")
    # Here we add the finished databases of previous calibrations that we depend on.
    # We can assume that the databases exist as we can't be here until they have returned
    # with OK status.
    # Each entry is a (database file, directory) pair; only the file is needed here.
    for filename, _ in self.dependent_databases:
        B2INFO(f"Adding Local Database {filename} to head of chain of local databases created by"
               f" a dependent calibration, for {self.algorithm.name}.")
        b2conditions.prepend_testing_payloads(filename)

    # Coerce once so that both a str and a Path value work for every use below.
    output_database_dir = Path(self.output_database_dir)

    # Create a directory to store the payloads of this algorithm
    create_directories(output_database_dir, overwrite=False)

    # add local database to save payloads
    B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
    # Things have changed. We now need to do the expert settings to create a database directly.
    # LocalDB is readonly without this but we don't need 'use_local_database' during writing.
    b2conditions.expert_settings(save_payloads=str(output_database_dir.joinpath("database.txt")))
1063
def _setup_logging(self, **kwargs):
    """
    Reset the basf2 logging, set it to INFO level and log to a file in the output directory.
    """
    # The log file is named after the algorithm and lives in the output directory
    stdout_name = self.algorithm.name + '_stdout'
    log_file = os.path.join(self.output_dir, stdout_name)
    B2INFO(f"Output log file at {log_file}.")
    basf2.reset_log()
    basf2.set_log_level(basf2.LogLevel.INFO)
    basf2.log_to_file(log_file)
1073
def _change_working_dir(self, **kwargs):
    """
    Make the algorithm output directory the current working directory of the process.
    """
    target_dir = self.output_dir
    B2INFO(f"Changing current working directory to {target_dir}.")
    os.chdir(target_dir)
1079
def _pre_algorithm(self, **kwargs):
    """
    Run the user defined algorithm setup function, if one was provided.

    Reads the ``iteration`` keyword argument and forwards it to the user function.
    """
    B2INFO("Running Pre-Algorithm function (if exists)")
    user_setup = self.algorithm.pre_algorithm
    if not user_setup:
        return
    # The function was created by an outside user, so it isn't bound to the
    # instance properly and the algorithm has to be passed in again explicitly.
    user_setup(self.algorithm.algorithm, kwargs["iteration"])
1089
def _execute_over_iov(self, **kwargs):
    """
    Execute the algorithm over the requested runs and store the outcome in ``self.result``.

    Reads the ``runs``, ``apply_iov`` and ``iteration`` keyword arguments.
    """
    B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")

    runs = kwargs["runs"]
    payload_iov = kwargs["apply_iov"]
    iteration = kwargs["iteration"]
    # Without an explicit IoV, derive one from the runs being executed
    if not payload_iov:
        payload_iov = iov_from_runs(runs)
    B2INFO(f"Execution will use {payload_iov} for labelling payloads by default.")
    wrapped_algorithm = self.algorithm.algorithm
    alg_result = wrapped_algorithm.execute(runs, iteration, payload_iov._cpp_iov)
    self.result = IoV_Result(payload_iov, alg_result)
1104
1105 def _set_input_data(self, **kwargs):
1106 """set input data"""
1107 self.algorithm.data_input(self.input_files)
1108
1109
class MachineError(Exception):
    """
    Common base class for the exceptions defined in this module.
    """
1114
1115
class ConditionError(MachineError):
    """
    Raised when the conditions of a transition fail.
    """
1120
1121
1123 """
1124 Exception for when transitions fail.
1125 """
STL class.
list required_true_attrs
Attributes that must have a value that returns True when tested.
str output_database_dir
The output database directory for the localdb that the algorithm will commit to.
list input_files
Collector output files, will contain all files returned by the output patterns.
list dependent_databases
CAF created local databases from previous calibrations that this calibration/algorithm depends on.
list database_chain
Assigned database chain to the overall Calibration object, or to the 'default' Collection.
list default_states
Default states for the AlgorithmMachine.
__init__(self, algorithm=None, initial_state="init")
list required_attrs
Required attributes that must exist before the machine can run properly.
algorithm
Algorithm() object whose state we are modelling.
result
IoV_Result object for a single execution, will be reset upon a new execution.
str output_dir
The algorithm output directory which is mostly used to store the stdout file.
root_dir
root directory for this Calibration
_make_pre_collector_path(self, name, collection)
dict _collector_timing
Times of various useful updates to the collector job e.g.
files_containing_iov(self, file_paths, files_to_iovs, iov)
_make_collector_path(self, name, collection)
_runner_final_state
Final state of the algorithm runner for the current iteration.
list default_states
States that are defaults to the CalibrationMachine (could override later)
str collector_output_dir
output directory of collector
iov_to_calibrate
IoV to be executed, currently will loop over all runs in IoV.
dict _collector_jobs
The collector jobs used for submission.
__init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0)
iteration
Which iteration step are we in.
str collector_input_dir
input directory of collector
collector_backend
Backend used for this calibration machine collector.
dict _algorithm_results
Results of each iteration for all algorithms of this calibration.
calibration
Calibration object whose state we are modelling.
add_transition(self, trigger, source, dest, conditions=None, before=None, after=None)
_trigger(self, transition_name, transition_dict, **kwargs)
dict states
Valid states for this machine.
save_graph(self, filename, graphname)
__init__(self, states=None, initial_state="default_initial")
transitions
Allowed transitions between states.
_callback(func, **kwargs)
add_state(self, state, enter=None, exit=None)
get_transitions(self, source)
dict _state
Actual attribute holding the Current state.
initial_state
Pointless docstring since it's a property.
dict _initial_state
Actual attribute holding initial state for this machine.
state
Current State of machine.
__getattr__(self, name, **kwargs)
get_transition_dict(self, state, transition)
_add_callbacks(self, callback, attribute)
__init__(self, name, enter=None, exit=None)
name
Name of the State.
on_enter
Callback list when entering state.
list _on_enter
set state as empty list when entering it
on_exit
Callback list when exiting state.
list _on_exit
set state as empty list when exiting it
_(self, callbacks, attribute)