Belle II Software release-09-00-00
state_machines.py
1#!/usr/bin/env python3
2
3# disable doxygen check for this file
4# @cond
5
6
13
14import basf2
15
16from functools import partial
17from collections import defaultdict
18
19import pickle
20import glob
21import shutil
22import time
23from pathlib import Path
24import os
25import json
26
27from basf2 import create_path
28from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
29from basf2 import conditions as b2conditions
30from basf2.pickle_path import serialize_path
31
32from ROOT.Belle2 import CalibrationAlgorithm
33
34from caf.utils import create_directories
35from caf.utils import method_dispatch
36from caf.utils import iov_from_runs
37from caf.utils import IoV_Result
38from caf.utils import get_iov_from_file
39from caf.backends import Job
40from caf.runners import AlgorithmsRunner
41
42
class State():
    """
    Basic State object that can take enter and exit state methods and records
    the state of a machine.

    You should assign the self.on_enter or self.on_exit attributes to callback functions
    or lists of them, if you need them.
    """

    def __init__(self, name, enter=None, exit=None):
        """
        Initialise State with a name and optional lists of callbacks.

        Parameters:
            name (str): Unique name of this state (also used for equality and hashing).
            enter: A callable, or list/tuple of callables, run when the state is entered.
            exit: A callable, or list/tuple of callables, run when the state is exited.
        """
        # Unique name identifying this state
        self.name = name
        # Callbacks run on entering the state (normalised to a list by the setter below)
        self.on_enter = enter
        # Callbacks run on exiting the state (normalised to a list by the setter below)
        self.on_exit = exit

    @property
    def on_enter(self):
        """
        Runs callbacks when a state is entered.
        """
        return self._on_enter

    @on_enter.setter
    def on_enter(self, callbacks):
        """
        Replaces the stored enter callbacks; accepts a single callable or a list/tuple of them.
        """
        self._on_enter = []
        if callbacks:
            self._add_callbacks(callbacks, self._on_enter)

    @property
    def on_exit(self):
        """
        Runs callbacks when a state is exited.
        """
        return self._on_exit

    @on_exit.setter
    def on_exit(self, callbacks):
        """
        Replaces the stored exit callbacks; accepts a single callable or a list/tuple of them.
        """
        self._on_exit = []
        if callbacks:
            self._add_callbacks(callbacks, self._on_exit)

    @method_dispatch
    def _add_callbacks(self, callback, attribute):
        """
        Adds callback to a property.

        Base overload for a single callable; `method_dispatch` (from caf.utils) selects
        the list/tuple overload registered below based on the type of `callback`.
        """
        if callable(callback):
            attribute.append(callback)
        else:
            B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")

    @_add_callbacks.register(tuple)
    @_add_callbacks.register(list)
    def _(self, callbacks, attribute):
        """
        Alternate method for lists and tuples of function objects.
        """
        if callbacks:
            for function in callbacks:
                if callable(function):
                    attribute.append(function)
                else:
                    B2ERROR(f"Something other than a function (callable) passed into State {self.name}.")

    def __str__(self):
        """
        The state's name is its string form.
        """
        return self.name

    def __repr__(self):
        """
        Debug representation, e.g. State(name=init).
        """
        return f"State(name={self.name})"

    def __eq__(self, other):
        """
        States compare equal to other States with the same name, or to the bare name string.
        """
        if isinstance(other, str):
            return self.name == other
        else:
            return self.name == other.name

    def __hash__(self):
        """
        Hash by name, consistent with __eq__.
        """
        return hash(self.name)
138
139
class Machine():
    """
    Parameters:
        states (list[str]): A list of possible states of the machine.
        initial_state (str):

    Base class for a final state machine wrapper.
    Implements the framework that a more complex machine can inherit from.

    The `transitions` attribute is a dictionary of trigger name keys, each value of
    which is another dictionary of 'source' states, 'dest' states, and 'conditions'
    methods. 'conditions' should be a list of callables or a single one. A transition is
    valid if it goes from an allowed state to an allowed state.
    Conditions are optional but must be a callable that returns True or False based
    on some state of the machine. They cannot have input arguments currently.

    Every condition/before/after callback function MUST take ``**kwargs`` as the only
    argument (except ``self`` if it's a class method). This is because it's basically
    impossible to determine which arguments to pass to which functions for a transition.
    Therefore this machine just enforces that every function should simply take ``**kwargs``
    and use the dictionary of arguments (even if it doesn't need any arguments).
    This also means that if you call a trigger with arguments e.g. ``machine.walk(speed=5)``
    you MUST use the keyword arguments rather than positional ones. So ``machine.walk(5)``
    will *not* work.
    """

    def __init__(self, states=None, initial_state="default_initial"):
        """
        Basic Setup of states and initial_state.
        """
        # Mapping of state name -> State object for every valid state
        self.states = {}
        if states:
            for state in states:
                self.add_state(state)
        if initial_state != "default_initial":
            # Caller-specified initial state; property setter requires it to be a known state
            self.initial_state = initial_state
        else:
            # No initial state given: register the default one and start there
            self.add_state(initial_state)
            self._initial_state = State(initial_state)

        # Current state; assigned directly so no enter callbacks fire at construction
        self._state = self.initial_state
        # Trigger name -> list of transition dictionaries (source/dest/conditions/before/after)
        self.transitions = defaultdict(list)
187
188 def add_state(self, state, enter=None, exit=None):
189 """
190 Adds a single state to the list of possible ones.
191 Should be a unique string or a State object with a unique name.
192 """
193 if isinstance(state, str):
194 self.add_state(State(state, enter, exit))
195 elif isinstance(state, State):
196 if state.name not in self.states.keys():
197 self.states[state.name] = state
198 else:
199 B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
200 else:
201 B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")
202
203 @property
204 def initial_state(self):
205 """
206 The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
207 """
208 return self._initial_state
209
210 @initial_state.setter
211 def initial_state(self, state):
212 """
213 """
214 if state in self.states.keys():
215 self._initial_state = self.states[state]
216
217 self._state = self.states[state]
218 else:
219 raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")
220
221 @property
222 def state(self):
223 """
224 The current state of the machine. Actually a `property` decorator. It will call the exit method of the
225 current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
226 either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
227 """
228 return self._state
229
230 @state.setter
231 def state(self, state):
232 """
233 """
234 if isinstance(state, str):
235 state_name = state
236 else:
237 state_name = state.name
238
239 try:
240 state = self.states[state_name]
241 # Run exit callbacks of current state
242 for callback in self.state.on_exit:
243 callback(prior_state=self.state, new_state=state)
244 # Run enter callbacks of new state
245 for callback in state.on_enter:
246 callback(prior_state=self.state, new_state=state)
247 # Set the state
248 self._state = state
249 except KeyError:
250 raise MachineError(f"Attempted to set state to '{state}' which not in the 'states' attribute!")
251
252 @staticmethod
253 def default_condition(**kwargs):
254 """
255 Method to always return True.
256 """
257 return True
258
259 def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
260 """
261 Adds a single transition to the dictionary of possible ones.
262 Trigger is the method name that begins the transtion between the
263 source state and the destination state.
264
265 The condition is an optional function that returns True or False
266 depending on the current state/input.
267 """
268 transition_dict = {}
269 try:
270 source = self.states[source]
271 dest = self.states[dest]
272 transition_dict["source"] = source
273 transition_dict["dest"] = dest
274 except KeyError as err:
275 B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
276 raise err
277 if conditions:
278 if isinstance(conditions, (list, tuple, set)):
279 transition_dict["conditions"] = list(conditions)
280 else:
281 transition_dict["conditions"] = [conditions]
282 else:
283 transition_dict["conditions"] = [Machine.default_condition]
284
285 if not before:
286 before = []
287 if isinstance(before, (list, tuple, set)):
288 transition_dict["before"] = list(before)
289 else:
290 transition_dict["before"] = [before]
291
292 if not after:
293 after = []
294 if isinstance(after, (list, tuple, set)):
295 transition_dict["after"] = list(after)
296 else:
297 transition_dict["after"] = [after]
298
299 self.transitions[trigger].append(transition_dict)
300
    def __getattr__(self, name, **kwargs):
        """
        Allows us to create a new method for each trigger on the fly.
        If there is no trigger name in the machine to match, then the normal
        AttributeError is called.
        """
        # Only transitions valid from the *current* state are exposed as callables
        possible_transitions = self.get_transitions(self.state)
        if name not in possible_transitions:
            raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
        transition_dict = self.get_transition_dict(self.state, name)
        # Bind the trigger name and its transition data; call-time kwargs are forwarded by partial
        return partial(self._trigger, name, transition_dict, **kwargs)
312
313 def _trigger(self, transition_name, transition_dict, **kwargs):
314 """
315 Runs the transition logic. Callbacks are evaluated in the order:
316 conditions -> before -> <new state set here> -> after.
317 """
318 dest, conditions, before_callbacks, after_callbacks = (
319 transition_dict["dest"],
320 transition_dict["conditions"],
321 transition_dict["before"],
322 transition_dict["after"]
323 )
324 # Returns True only if every condition returns True when called
325 if all(map(lambda condition: self._callback(condition, **kwargs), conditions)):
326 for before_func in before_callbacks:
327 self._callback(before_func, **kwargs)
328
329 self.state = dest
330 for after_func in after_callbacks:
331 self._callback(after_func, **kwargs)
332 else:
333 raise ConditionError(f"Transition '{transition_name}' called for but one or more conditions "
334 "evaluated False")
335
336 @staticmethod
337 def _callback(func, **kwargs):
338 """
339 Calls a condition/before/after.. function using arguments passed (or not).
340 """
341 return func(**kwargs)
342
343 def get_transitions(self, source):
344 """
345 Returns allowed transitions from a given state.
346 """
347 possible_transitions = []
348 for transition, transition_dicts in self.transitions.items():
349 for transition_dict in transition_dicts:
350 if transition_dict["source"] == source:
351 possible_transitions.append(transition)
352 return possible_transitions
353
354 def get_transition_dict(self, state, transition):
355 """
356 Returns the transition dictionary for a state and transition out of it.
357 """
358 transition_dicts = self.transitions[transition]
359 for transition_dict in transition_dicts:
360 if transition_dict["source"] == state:
361 return transition_dict
362 else:
363 raise KeyError(f"No transition from state {state} with the name {transition}.")
364
365 def save_graph(self, filename, graphname):
366 """
367 Does a simple dot file creation to visualise states and transiitons.
368 """
369 with open(filename, "w") as dotfile:
370 dotfile.write("digraph " + graphname + " {\n")
371 for state in self.states.keys():
372 dotfile.write('"' + state + '" [shape=ellipse, color=black]\n')
373 for trigger, transition_dicts in self.transitions.items():
374 for transition in transition_dicts:
375 dotfile.write('"' + transition["source"].name + '" -> "' +
376 transition["dest"].name + '" [label="' + trigger + '"]\n')
377 dotfile.write("}\n")
378
379
class CalibrationMachine(Machine):
    """
    A state machine to handle `Calibration` objects and the flow of
    processing for them.
    """

    # Subdirectory (under <root_dir>/<iteration>/) holding serialized collector paths and job config
    collector_input_dir = 'collector_input'
    # Subdirectory used as working/output directory of the collector jobs
    collector_output_dir = 'collector_output'
    # Subdirectory name for algorithm output (NOTE(review): methods below use
    # self.calibration.alg_output_dir instead — confirm whether this attribute is still used)
    algorithm_output_dir = 'algorithm_output'
389
390 def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
391 """
392 Takes a Calibration object from the caf framework and lets you
393 set the initial state.
394 """
395
396 self.default_states = [State("init", enter=[self._update_cal_state,
397 self._log_new_state]),
398 State("running_collector", enter=[self._update_cal_state,
399 self._log_new_state]),
400 State("collector_failed", enter=[self._update_cal_state,
401 self._log_new_state]),
402 State("collector_completed", enter=[self._update_cal_state,
403 self._log_new_state]),
404 State("running_algorithms", enter=[self._update_cal_state,
405 self._log_new_state]),
406 State("algorithms_failed", enter=[self._update_cal_state,
407 self._log_new_state]),
408 State("algorithms_completed", enter=[self._update_cal_state,
409 self._log_new_state]),
410 State("completed", enter=[self._update_cal_state,
411 self._log_new_state]),
412 State("failed", enter=[self._update_cal_state,
413 self._log_new_state])
414 ]
415
416 super().__init__(self.default_states, initial_state)
417
418
419 self.calibration = calibration
420 # Monkey Patching for the win!
421
422 self.calibration.machine = self
423
424 self.iteration = iteration
425
426 self.collector_backend = None
427
428 self._algorithm_results = {}
429
430 self._runner_final_state = None
431
432 self.iov_to_calibrate = iov_to_calibrate
433
434 self.root_dir = Path(os.getcwd(), calibration.name)
435
436
439 self._collector_timing = {}
440
441
442 self._collector_jobs = {}
443
444 self.add_transition("submit_collector", "init", "running_collector",
445 conditions=self.dependencies_completed,
446 before=[self._make_output_dir,
447 self._resolve_file_paths,
448 self._build_iov_dicts,
449 self._create_collector_jobs,
450 self._submit_collections,
451 self._dump_job_config])
452 self.add_transition("fail", "running_collector", "collector_failed",
453 conditions=self._collection_failed)
454 self.add_transition("complete", "running_collector", "collector_completed",
455 conditions=self._collection_completed)
456 self.add_transition("run_algorithms", "collector_completed", "running_algorithms",
457 before=self._check_valid_collector_output,
458 after=[self._run_algorithms,
459 self.automatic_transition])
460 self.add_transition("complete", "running_algorithms", "algorithms_completed",
461 after=self.automatic_transition,
462 conditions=self._runner_not_failed)
463 self.add_transition("fail", "running_algorithms", "algorithms_failed",
464 conditions=self._runner_failed)
465 self.add_transition("iterate", "algorithms_completed", "init",
466 conditions=[self._require_iteration,
467 self._below_max_iterations],
468 after=self._increment_iteration)
469 self.add_transition("finish", "algorithms_completed", "completed",
470 conditions=self._no_require_iteration,
471 before=self._prepare_final_db)
472 self.add_transition("fail_fully", "algorithms_failed", "failed")
473 self.add_transition("fail_fully", "collector_failed", "failed")
474
475 def _update_cal_state(self, **kwargs):
476 self.calibration.state = str(kwargs["new_state"])
477
478 def files_containing_iov(self, file_paths, files_to_iovs, iov):
479 """
480 Lookup function that returns all files from the file_paths that
481 overlap with this IoV.
482 """
483 # Files that contain an Exp,Run range that overlaps with given IoV
484 overlapping_files = set()
485
486 for file_path, file_iov in files_to_iovs.items():
487 if file_iov.overlaps(iov) and (file_path in file_paths):
488 overlapping_files.add(file_path)
489 return overlapping_files
490
    def _dump_job_config(self):
        """
        Dumps the `Job` object for the collections to JSON files so that it's configuration can be recovered later in case of failure.
        """
        # Wait for jobs (+subjobs) to be submitted so that all information is filled. Since the parent CAF object asynchronously
        # submits the jobs this might need to wait a while.
        while any(map(lambda j: j.status == "init", self._collector_jobs.values())):
            B2DEBUG(29, "Some Collector Jobs still in 'init' state. Waiting...")
            time.sleep(5)

        # One JSON file per collection, written into that collection's collector_input directory
        for collection_name, job in self._collector_jobs.items():
            collector_job_output_file_name = self.calibration.collections[collection_name].job_config
            output_file = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir,
                                                 collection_name, collector_job_output_file_name)
            job.dump_to_json(output_file)
506
507 def _recover_collector_jobs(self):
508 """
509 Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.
510 """
511 for collection_name, collection in self.calibration.collections.items():
512 output_file = self.root_dir.joinpath(str(self.iteration),
513 self.collector_input_dir,
514 collection_name,
515 collection.job_config)
516 self._collector_jobs[collection_name] = Job.from_json(output_file)
517
518 def _iov_requested(self):
519 """
520 """
521 if self.iov_to_calibrate:
522 B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
523 return True
524 else:
525 B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
526 return False
527
    def _resolve_file_paths(self):
        """
        Placeholder hook run before collector job creation; currently does nothing.
        """
        pass
532
533 def _build_iov_dicts(self):
534 """
535 Build IoV file dictionary for each collection if required.
536 """
537 iov_requested = self._iov_requested()
538 if iov_requested or self.calibration.ignored_runs:
539 for coll_name, collection in self.calibration.collections.items():
540 if not collection.files_to_iovs:
541 B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
542 f" Calibration '{self.calibration.name} and Collection '{coll_name}'."
543 " Filling dictionary from input file metadata."
544 " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
545
546 files_to_iovs = {}
547 for file_path in collection.input_files:
548 files_to_iovs[file_path] = get_iov_from_file(file_path)
549 collection.files_to_iovs = files_to_iovs
550 else:
551 B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
552 f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
553 else:
554 B2INFO("No File to IoV mapping required.")
555
    def _below_max_iterations(self):
        """
        Condition: True while the current iteration number is below `Calibration.max_iterations`.
        """
        return self.iteration < self.calibration.max_iterations
560
    def _increment_iteration(self):
        """
        Bumps the iteration counter on both this machine and its Calibration object.
        """
        self.iteration += 1
        self.calibration.iteration = self.iteration
566
567 def _collection_completed(self):
568 """
569 Did all the collections succeed?
570 """
571 B2DEBUG(29, "Checking for failed collector job.")
572 if self._collector_jobs_ready():
573 return all([job.status == "completed" for job in self._collector_jobs.values()])
574
575 def _collection_failed(self):
576 """
577 Did any of the collections fail?
578 """
579 B2DEBUG(29, "Checking for failed collector job.")
580 if self._collector_jobs_ready():
581 return any([job.status == "failed" for job in self._collector_jobs.values()])
582
    def _runner_not_failed(self):
        """
        Returns:
            bool: If AlgorithmsRunner succeeded return True.
        """
        # Simple negation so both polarities can be used as transition conditions
        return not self._runner_failed()
589
590 def _runner_failed(self):
591 """
592 Returns:
593 bool: If AlgorithmsRunner failed return True.
594 """
595 if self._runner_final_state == AlgorithmsRunner.FAILED:
596 return True
597 else:
598 return False
599
    def _collector_jobs_ready(self):
        """
        Returns True once every collector job reports ready. A full (expensive) status
        refresh is only done when more than `collector_full_update_interval` seconds
        have passed since the last one.
        """
        since_last_update = time.time() - self._collector_timing["last_update"]
        if since_last_update > self.calibration.collector_full_update_interval:
            B2INFO("Updating full collector job statuses.")
            for job in self._collector_jobs.values():
                job.update_status()
                self._collector_timing["last_update"] = time.time()
                if job.subjobs:
                    # Report how many subjobs have reached one of their final (exit) statuses
                    num_completed = sum((subjob.status in subjob.exit_statuses) for subjob in job.subjobs.values())
                    total_subjobs = len(job.subjobs)
                    B2INFO(f"{num_completed}/{total_subjobs} Collector SubJobs finished in"
                           f" Calibration {self.calibration.name} Job {job.name}.")
        return all([job.ready() for job in self._collector_jobs.values()])
615
616 def _submit_collections(self):
617 """
618 """
619 self.calibration.jobs_to_submit.extend(list(self._collector_jobs.values()))
620 self._collector_timing["start"] = time.time()
621 self._collector_timing["last_update"] = time.time()
622
    def _no_require_iteration(self):
        """
        Condition: True when no further iteration should happen — either no algorithm
        asked for one, or the maximum number of iterations has been reached.
        Returns False when an iteration is both requested and still allowed.
        """
        if self._require_iteration() and self._below_max_iterations():
            return False
        elif self._require_iteration() and not self._below_max_iterations():
            B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
            return True
        elif not self._require_iteration():
            return True
633
634 def _require_iteration(self):
635 """
636 """
637 iteration_called = False
638 for alg_name, results in self._algorithm_results[self.iteration].items():
639 for result in results:
640 if result.result == CalibrationAlgorithm.c_Iterate:
641 iteration_called = True
642 break
643 if iteration_called:
644 break
645 return iteration_called
646
    def _log_new_state(self, **kwargs):
        """
        State-entry callback: logs the state this machine has just moved into.
        """
        B2INFO(f"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")
651
652 def dependencies_completed(self):
653 """
654 Condition function to check that the dependencies of our calibration are in the 'completed' state.
655 Technically only need to check explicit dependencies.
656 """
657 for calibration in self.calibration.dependencies:
658 if not calibration.state == calibration.end_state:
659 return False
660 else:
661 return True
662
    def automatic_transition(self):
        """
        Automatically try all transitions out of this state once. Tries fail last.
        """
        possible_transitions = self.get_transitions(self.state)
        for transition in possible_transitions:
            try:
                # "fail" is deliberately skipped here and only used as a last resort below
                if transition != "fail":
                    getattr(self, transition)()
                    break
            except ConditionError:
                continue
        else:
            # No non-fail transition fired; fall back to "fail" if the state has one
            if "fail" in possible_transitions:
                getattr(self, "fail")()
            else:
                raise MachineError(f"Failed to automatically transition out of {self.state} state.")
680
    def _make_output_dir(self):
        """
        Creates the overall root directory of the Calibration. Will not overwrite if it already exists.
        """
        create_directories(self.root_dir, overwrite=False)
687
688 def _make_collector_path(self, name, collection):
689 """
690 Creates a basf2 path for the correct collector and serializes it in the
691 self.output_dir/<calibration_name>/<iteration>/paths directory
692 """
693 path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
694 # Automatically overwrite any previous directory
695 create_directories(path_output_dir)
696 path_file_name = collection.collector.name() + '.path'
697 path_file_name = path_output_dir / path_file_name
698 # Create empty path and add collector to it
699 coll_path = create_path()
700 coll_path.add_module(collection.collector)
701 # Dump the basf2 path to file
702 with open(path_file_name, 'bw') as serialized_path_file:
703 pickle.dump(serialize_path(coll_path), serialized_path_file)
704 # Return the pickle file path for addition to the input sandbox
705 return str(path_file_name.absolute())
706
707 def _make_pre_collector_path(self, name, collection):
708 """
709 Creates a basf2 path for the collectors setup path (Collection.pre_collector_path) and serializes it in the
710 self.output_dir/<calibration_name>/<iteration>/<colector_output>/<name> directory.
711 """
712 path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
713 coll_path = collection.pre_collector_path
714 path_file_name = 'pre_collector.path'
715 path_file_name = os.path.join(path_output_dir, path_file_name)
716 # Dump the basf2 path to file
717 with open(path_file_name, 'bw') as serialized_path_file:
718 pickle.dump(serialize_path(coll_path), serialized_path_file)
719 # Return the pickle file path for addition to the input sandbox
720 return path_file_name
721
722 def _create_collector_jobs(self):
723 """
724 Creates a Job object for the collections of this iteration, ready for submission
725 to backend.
726 """
727 for collection_name, collection in self.calibration.collections.items():
728 iteration_dir = self.root_dir.joinpath(str(self.iteration))
729 job = Job('_'.join([self.calibration.name, collection_name, 'Iteration', str(self.iteration)]))
730 job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
731 job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
732 # Remove previous failed attempt to avoid problems
733 if job.output_dir.exists():
734 B2INFO(f"Previous output directory for {self.calibration.name} collector {collection_name} exists."
735 f"Deleting {job.output_dir} before re-submitting.")
736 shutil.rmtree(job.output_dir)
737 job.cmd = collection.job_cmd
738 job.append_current_basf2_setup_cmds()
739 job.input_sandbox_files.append(collection.job_script)
740 collector_path_file = Path(self._make_collector_path(collection_name, collection))
741 job.input_sandbox_files.append(collector_path_file)
742 if collection.pre_collector_path:
743 pre_collector_path_file = Path(self._make_pre_collector_path(collection_name, collection))
744 job.input_sandbox_files.append(pre_collector_path_file)
745
746 # Want to figure out which local databases are required for this job and their paths
747 list_dependent_databases = []
748
749 # Here we add the finished databases of previous calibrations that we depend on.
750 # We can assume that the databases exist as we can't be here until they have returned
751 for dependency in self.calibration.dependencies:
752 database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
753 B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
754 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
755
756 # Add previous iteration databases from this calibration
757 if self.iteration > 0:
758 previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
759 database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
760 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
761 B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
762
763 # Let's use a directory to store some files later for input to the collector jobs. Should already exist from
764 # collector path
765 input_data_directory = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, collection_name)
766
767 # Need to pass setup info to collector which would be tricky as arguments
768 # We make a dictionary and pass it in as json
769 job_config = {}
770 # Apply the user-set Calibration database chain to the base of the overall chain.
771 json_db_chain = []
772 for database in collection.database_chain:
773 if database.db_type == 'local':
774 json_db_chain.append(('local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
775 elif database.db_type == 'central':
776 json_db_chain.append(('central', database.global_tag))
777 else:
778 raise ValueError(f"Unknown database type {database.db_type}.")
779 # CAF created ones for dependent calibrations and previous iterations of this calibration
780 for database in list_dependent_databases:
781 json_db_chain.append(('local', database))
782 job_config['database_chain'] = json_db_chain
783
784 job_config_file_path = input_data_directory.joinpath('collector_config.json').absolute()
785 with open(job_config_file_path, 'w') as job_config_file:
786 json.dump(job_config, job_config_file, indent=2)
787 job.input_sandbox_files.append(job_config_file_path)
788
789 # Define the input files
790 input_data_files = set(collection.input_files)
791 # Reduce the input data files to only those that overlap with the optional requested IoV
792 if self.iov_to_calibrate:
793 input_data_files = self.files_containing_iov(input_data_files,
794 collection.files_to_iovs,
795 self.iov_to_calibrate)
796 # Remove any files that ONLY contain runs from our optional ignored_runs list
797 files_to_ignore = set()
798 for exprun in self.calibration.ignored_runs:
799 for input_file in input_data_files:
800 file_iov = self.calibration.files_to_iovs[input_file]
801 if file_iov == exprun.make_iov():
802 B2INFO(f"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
803 f"Therefore the input file '{input_file}' from Collection '{collection_name}' "
804 "is being removed from input files list.")
805 files_to_ignore.add(input_file)
806 input_data_files.difference_update(files_to_ignore)
807
808 if not input_data_files:
809 raise MachineError(f"No valid input files for Calibration '{self.calibration.name}' "
810 f" and Collection '{collection_name}'.")
811 job.input_files = list(input_data_files)
812
813 job.splitter = collection.splitter
814 job.backend_args = collection.backend_args
815 # Output patterns to be returned from collector job
816 job.output_patterns = collection.output_patterns
817 B2DEBUG(20, f"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
818 self._collector_jobs[collection_name] = job
819
820 def _check_valid_collector_output(self):
821 B2INFO("Checking that Collector output exists for all colector jobs "
822 f"using {self.calibration.name}.output_patterns.")
823 if not self._collector_jobs:
824 B2INFO("We're restarting so we'll recreate the collector Job object.")
825 self._recover_collector_jobs()
826
827 for job in self._collector_jobs.values():
828 if not job.subjobs:
829 output_files = []
830 for pattern in job.output_patterns:
831 output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
832 if not output_files:
833 raise MachineError("No output files from Collector Job")
834 else:
835 for subjob in job.subjobs.values():
836 output_files = []
837 for pattern in subjob.output_patterns:
838 output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
839 if not output_files:
840 raise MachineError(f"No output files from Collector {subjob}")
841
    def _run_algorithms(self):
        """
        Runs the Calibration Algorithms for this calibration machine.

        Will run them sequentially locally (possible benefits to using a
        processing pool for low memory algorithms later on.)
        """
        # Get an instance of the Runner for these algorithms and run it
        algs_runner = self.calibration.algorithms_runner(name=self.calibration.name)
        algs_runner.algorithms = self.calibration.algorithms
        algorithm_output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
        output_database_dir = algorithm_output_dir.joinpath("outputdb")
        # Remove it, if we failed previously, to start clean
        if algorithm_output_dir.exists():
            B2INFO(f"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
                   f"Deleting and recreating {algorithm_output_dir}.")
        create_directories(algorithm_output_dir)
        B2INFO(f"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
        algs_runner.output_database_dir = output_database_dir
        algs_runner.output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
        input_files = []

        # Collector output files (per subjob when the job was split) become the algorithm inputs
        for job in self._collector_jobs.values():
            if job.subjobs:
                for subjob in job.subjobs.values():
                    for pattern in subjob.output_patterns:
                        input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
            else:
                for pattern in job.output_patterns:
                    input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))

        algs_runner.input_files = input_files

        # Add any user defined database chain for this calibration
        algs_runner.database_chain = self.calibration.database_chain

        # Here we add the finished databases of previous calibrations that we depend on.
        # We can assume that the databases exist as we can't be here until they have returned
        list_dependent_databases = []
        for dependency in self.calibration.dependencies:
            database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
            B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))

        # Add previous iteration databases from this calibration
        if self.iteration > 0:
            previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
            database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
            list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
            B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
        algs_runner.dependent_databases = list_dependent_databases

        algs_runner.ignored_runs = self.calibration.ignored_runs

        try:
            algs_runner.run(self.iov_to_calibrate, self.iteration)
        except Exception as err:
            # NOTE(review): the exception is only printed to stdout, not logged via B2ERROR — confirm intended
            print(err)
            # We directly set the state without triggering the transition because normally we fail based on checking the algorithm
            # results. But here we had an actual exception so we just force into failure instead.
            self._state = State("algorithms_failed")
        self._algorithm_results[self.iteration] = algs_runner.results
        self._runner_final_state = algs_runner.final_state
905
906 def _prepare_final_db(self):
907 """
908 Take the last iteration's outputdb and copy it to a more easily findable place.
909 """
910 database_location = self.root_dir.joinpath(str(self.iteration),
911 self.calibration.alg_output_dir,
912 'outputdb')
913 final_database_location = self.root_dir.joinpath('outputdb')
914 if final_database_location.exists():
915 B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
916 shutil.rmtree(final_database_location)
917 shutil.copytree(database_location, final_database_location)
918
919
class AlgorithmMachine(Machine):
    """
    A state machine to handle the logic of running the algorithm on the overall runs contained in the data.
    """

    # Attributes that must at least exist on this machine before it is considered valid
    required_attrs = ["algorithm",
                      "dependent_databases",
                      "database_chain",
                      "output_dir",
                      "output_database_dir",
                      "input_files"
                      ]

    # Attributes that must exist AND evaluate truthy (i.e. be non-empty/set) to be valid
    required_true_attrs = ["algorithm",
                           "output_dir",
                           "output_database_dir",
                           "input_files"
                           ]

    def __init__(self, algorithm=None, initial_state="init"):
        """
        Takes an Algorithm object from the caf framework and defines the transitions.
        """
        # Default states for the AlgorithmMachine
        self.default_states = [State("init"),
                               State("ready"),
                               State("running_algorithm"),
                               State("completed"),
                               State("failed")]

        super().__init__(self.default_states, initial_state)

        # Algorithm object whose execution this machine models
        self.algorithm = algorithm
        # Collector output files to be fed into the algorithm
        self.input_files = []
        # (payload file, directory) pairs of local databases created by calibrations we depend on
        self.dependent_databases = []
        # User-defined chain of local/central databases applied first
        self.database_chain = []
        # Working directory for the algorithm process
        self.output_dir = ""
        # Directory where the local database of newly created payloads is written
        self.output_database_dir = ""
        # IoV_Result of the most recent execution (set by _execute_over_iov)
        self.result = None

        # Setup runs all preparation callbacks before entering "ready"
        self.add_transition("setup_algorithm", "init", "ready",
                            before=[self._setup_logging,
                                    self._change_working_dir,
                                    self._setup_database_chain,
                                    self._set_input_data,
                                    self._pre_algorithm])
        self.add_transition("execute_runs", "ready", "running_algorithm",
                            after=self._execute_over_iov)
        self.add_transition("complete", "running_algorithm", "completed")
        self.add_transition("fail", "running_algorithm", "failed")
        self.add_transition("fail", "ready", "failed")
        # Allow the machine to be set up again after completing or failing a run
        self.add_transition("setup_algorithm", "completed", "ready")
        self.add_transition("setup_algorithm", "failed", "ready")

    def setup_from_dict(self, params):
        """
        Assign attributes of this machine from a dictionary.

        Parameters:
            params (dict): Dictionary containing values to be assigned to the machine's attributes of the same name.
        """
        for attribute_name, value in params.items():
            setattr(self, attribute_name, value)

    def is_valid(self):
        """
        Returns:
            bool: Whether or not this machine has been set up correctly with all its necessary attributes.
        """
        B2INFO(f"Checking validity of current setup of AlgorithmMachine for {self.algorithm.name}.")
        # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
        for attribute_name in self.required_attrs:
            if not hasattr(self, attribute_name):
                B2ERROR(f"AlgorithmMachine attribute {attribute_name} doesn't exist.")
                return False
        # Check if any attributes that need actual values haven't been set or were empty
        for attribute_name in self.required_true_attrs:
            if not getattr(self, attribute_name):
                B2ERROR(f"AlgorithmMachine attribute {attribute_name} returned False.")
                return False
        return True

    def _create_output_dir(self, **kwargs):
        """
        Create working/output directory of algorithm. Any old directory is overwritten.
        """
        create_directories(Path(self.output_dir), overwrite=True)

    def _setup_database_chain(self, **kwargs):
        """
        Apply all databases in the correct order.
        """
        # We deliberately override the normal database ordering because we don't want input files GTs to affect
        # the processing. Only explicit GTs and intermediate local DBs made by the CAF should be added here.
        b2conditions.reset()
        b2conditions.override_globaltags()

        # Apply all the databases in order, starting with the user-set chain for this Calibration
        for database in self.database_chain:
            if database.db_type == 'local':
                B2INFO(f"Adding Local Database {database.filepath.as_posix()} to head of chain of local databases, "
                       f"for {self.algorithm.name}.")
                b2conditions.prepend_testing_payloads(database.filepath.as_posix())
            elif database.db_type == 'central':
                B2INFO(f"Adding Central database tag {database.global_tag} to head of GT chain, "
                       f"for {self.algorithm.name}.")
                b2conditions.prepend_globaltag(database.global_tag)
            else:
                raise ValueError(f"Unknown database type {database.db_type}.")
        # Here we add the finished databases of previous calibrations that we depend on.
        # We can assume that the databases exist as we can't be here until they have returned
        # with OK status.
        # Note: interpolate the actual payload file in the log message (previously it
        # printed the literal "(unknown)"); the directory part of the pair is unused here.
        for filename, _ in self.dependent_databases:
            B2INFO(f"Adding Local Database {filename} to head of chain of local databases created by"
                   f" a dependent calibration, for {self.algorithm.name}.")
            b2conditions.prepend_testing_payloads(filename)

        # Create a directory to store the payloads of this algorithm
        create_directories(Path(self.output_database_dir), overwrite=False)

        # add local database to save payloads
        B2INFO(f"Output local database for {self.algorithm.name} stored at {self.output_database_dir}.")
        # Things have changed. We now need to do the expert settings to create a database directly.
        # LocalDB is readonly without this but we don't need 'use_local_database' during writing.
        b2conditions.expert_settings(save_payloads=str(self.output_database_dir.joinpath("database.txt")))

    def _setup_logging(self, **kwargs):
        """
        Redirect basf2 logging into a dedicated log file for this algorithm.
        """
        # add logfile for output
        log_file = os.path.join(self.output_dir, self.algorithm.name + '_stdout')
        B2INFO(f"Output log file at {log_file}.")
        basf2.reset_log()
        basf2.set_log_level(basf2.LogLevel.INFO)
        basf2.log_to_file(log_file)

    def _change_working_dir(self, **kwargs):
        """
        Change the process working directory to the algorithm's output directory.
        """
        B2INFO(f"Changing current working directory to {self.output_dir}.")
        os.chdir(self.output_dir)

    def _pre_algorithm(self, **kwargs):
        """
        Call the user defined algorithm setup function.
        """
        B2INFO("Running Pre-Algorithm function (if exists)")
        if self.algorithm.pre_algorithm:
            # We have to re-pass in the algorithm here because an outside user has created this method.
            # So the method isn't bound to the instance properly.
            self.algorithm.pre_algorithm(self.algorithm.algorithm, kwargs["iteration"])

    def _execute_over_iov(self, **kwargs):
        """
        Does the actual execute of the algorithm on an IoV and records the result.
        """
        B2INFO(f"Running {self.algorithm.name} in working directory {os.getcwd()}.")

        runs_to_execute = kwargs["runs"]
        iov = kwargs["apply_iov"]
        iteration = kwargs["iteration"]
        # If no explicit IoV for labelling payloads was requested, derive one from the runs
        if not iov:
            iov = iov_from_runs(runs_to_execute)
        B2INFO(f"Execution will use {iov} for labelling payloads by default.")
        alg_result = self.algorithm.algorithm.execute(runs_to_execute, iteration, iov._cpp_iov)
        self.result = IoV_Result(iov, alg_result)

    def _set_input_data(self, **kwargs):
        """
        Hand the collected input files over to the underlying algorithm.
        """
        self.algorithm.data_input(self.input_files)
1097
1098
class MachineError(Exception):
    """Base class for all exceptions raised by this state machine module."""
1103
1104
class ConditionError(MachineError):
    """Raised when a transition's condition checks fail."""
1109
1110
class TransitionError(MachineError):
    """Raised when a state transition itself fails."""
1115
1116# @endcond
1117