Belle II Software development
CalibrationMachine Class Reference
Inheritance diagram for CalibrationMachine:
Machine

Public Member Functions

 __init__ (self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0)
 
 files_containing_iov (self, file_paths, files_to_iovs, iov)
 
 dependencies_completed (self)
 
 automatic_transition (self)
 
 initial_state (self)
 
 initial_state (self, state)
 
 state (self)
 
 state (self, state)
 
 add_state (self, state, enter=None, exit=None)
 
 add_transition (self, trigger, source, dest, conditions=None, before=None, after=None)
 
 __getattr__ (self, name, **kwargs)
 
 get_transitions (self, source)
 
 get_transition_dict (self, state, transition)
 
 save_graph (self, filename, graphname)
 

Static Public Member Functions

 default_condition (**kwargs)
 

Public Attributes

list default_states
 States that are defaults for the CalibrationMachine (could be overridden later)
 
 calibration = calibration
 Calibration object whose state we are modelling.
 
 iteration = iteration
 Which iteration step are we in.
 
 collector_backend = None
 Backend used for this calibration machine collector.
 
 iov_to_calibrate = iov_to_calibrate
 IoV to be executed, currently will loop over all runs in IoV.
 
 root_dir = Path(os.getcwd(), calibration.name)
 root directory for this Calibration
 
dict states = {}
 Valid states for this machine.
 
 initial_state = initial_state
 Initial state of the machine; accessed and set via the initial_state property.
 
 transitions = defaultdict(list)
 Allowed transitions between states.
 
 state = dest
 Current State of machine.
 

Static Public Attributes

str collector_input_dir = 'collector_input'
 input directory of collector
 
str collector_output_dir = 'collector_output'
 output directory of collector
 
str algorithm_output_dir = 'algorithm_output'
 output directory of algorithm
 

Protected Member Functions

 _update_cal_state (self, **kwargs)
 
 _dump_job_config (self)
 
 _recover_collector_jobs (self)
 
 _iov_requested (self)
 
 _resolve_file_paths (self)
 
 _build_iov_dicts (self)
 
 _below_max_iterations (self)
 
 _increment_iteration (self)
 
 _collection_completed (self)
 
 _collection_failed (self)
 
 _runner_not_failed (self)
 
 _runner_failed (self)
 
 _collector_jobs_ready (self)
 
 _submit_collections (self)
 
 _no_require_iteration (self)
 
 _require_iteration (self)
 
 _log_new_state (self, **kwargs)
 
 _make_output_dir (self)
 
 _make_collector_path (self, name, collection)
 
 _make_pre_collector_path (self, name, collection)
 
 _create_collector_jobs (self)
 
 _check_valid_collector_output (self)
 
 _run_algorithms (self)
 
 _prepare_final_db (self)
 
 _trigger (self, transition_name, transition_dict, **kwargs)
 

Static Protected Member Functions

 _callback (func, **kwargs)
 

Protected Attributes

dict _algorithm_results = {}
 Results of each iteration for all algorithms of this calibration.
 
 _runner_final_state = None
 Final state of the algorithm runner for the current iteration.
 
dict _collector_timing = {}
 Times of various useful updates to the collector job, e.g. the submission start time and the time of the last status update.
 
dict _collector_jobs = {}
 The collector jobs used for submission.
 
dict _initial_state = State(initial_state)
 Actual attribute holding initial state for this machine.
 
dict _state = self.initial_state
 Actual attribute holding the Current state.
 

Detailed Description

A state machine to handle `Calibration` objects and the flow of
processing for them.

Definition at line 387 of file state_machines.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
calibration,
iov_to_calibrate = None,
initial_state = "init",
iteration = 0 )
Takes a Calibration object from the caf framework and lets you
set the initial state.

Definition at line 400 of file state_machines.py.

400 def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
401 """
402 Takes a Calibration object from the caf framework and lets you
403 set the initial state.
404 """
405
406 self.default_states = [State("init", enter=[self._update_cal_state,
407 self._log_new_state]),
408 State("running_collector", enter=[self._update_cal_state,
409 self._log_new_state]),
410 State("collector_failed", enter=[self._update_cal_state,
411 self._log_new_state]),
412 State("collector_completed", enter=[self._update_cal_state,
413 self._log_new_state]),
414 State("running_algorithms", enter=[self._update_cal_state,
415 self._log_new_state]),
416 State("algorithms_failed", enter=[self._update_cal_state,
417 self._log_new_state]),
418 State("algorithms_completed", enter=[self._update_cal_state,
419 self._log_new_state]),
420 State("completed", enter=[self._update_cal_state,
421 self._log_new_state]),
422 State("failed", enter=[self._update_cal_state,
423 self._log_new_state])
424 ]
425
426 super().__init__(self.default_states, initial_state)
427
428
429 self.calibration = calibration
430 # Monkey Patching for the win!
431
432 self.calibration.machine = self
433
434 self.iteration = iteration
435
436 self.collector_backend = None
437
438 self._algorithm_results = {}
439
440 self._runner_final_state = None
441
442 self.iov_to_calibrate = iov_to_calibrate
443
444 self.root_dir = Path(os.getcwd(), calibration.name)
445
446
449 self._collector_timing = {}
450
451
452 self._collector_jobs = {}
453
454 self.add_transition("submit_collector", "init", "running_collector",
455 conditions=self.dependencies_completed,
456 before=[self._make_output_dir,
457 self._resolve_file_paths,
458 self._build_iov_dicts,
459 self._create_collector_jobs,
460 self._submit_collections,
461 self._dump_job_config])
462 self.add_transition("fail", "running_collector", "collector_failed",
463 conditions=self._collection_failed)
464 self.add_transition("complete", "running_collector", "collector_completed",
465 conditions=self._collection_completed)
466 self.add_transition("run_algorithms", "collector_completed", "running_algorithms",
467 before=self._check_valid_collector_output,
468 after=[self._run_algorithms,
469 self.automatic_transition])
470 self.add_transition("complete", "running_algorithms", "algorithms_completed",
471 after=self.automatic_transition,
472 conditions=self._runner_not_failed)
473 self.add_transition("fail", "running_algorithms", "algorithms_failed",
474 conditions=self._runner_failed)
475 self.add_transition("iterate", "algorithms_completed", "init",
476 conditions=[self._require_iteration,
477 self._below_max_iterations],
478 after=self._increment_iteration)
479 self.add_transition("finish", "algorithms_completed", "completed",
480 conditions=self._no_require_iteration,
481 before=self._prepare_final_db)
482 self.add_transition("fail_fully", "algorithms_failed", "failed")
483 self.add_transition("fail_fully", "collector_failed", "failed")
484

Member Function Documentation

◆ __getattr__()

__getattr__ ( self,
name,
** kwargs )
inherited
Allows us to create a new method for each trigger on the fly.
If there is no trigger name in the machine to match, then the normal
AttributeError is raised.

Definition at line 306 of file state_machines.py.

306 def __getattr__(self, name, **kwargs):
307 """
308 Allows us to create a new method for each trigger on the fly.
309 If there is no trigger name in the machine to match, then the normal
310 AttributeError is called.
311 """
312 possible_transitions = self.get_transitions(self.state)
313 if name not in possible_transitions:
314 raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
315 transition_dict = self.get_transition_dict(self.state, name)
316 # \cond silence doxygen warning about _trigger
317 return partial(self._trigger, name, transition_dict, **kwargs)
318 # \endcond
319

◆ _below_max_iterations()

_below_max_iterations ( self)
protected
 

Definition at line 568 of file state_machines.py.

568 def _below_max_iterations(self):
569 """
570 """
571 return self.iteration < self.calibration.max_iterations
572

◆ _build_iov_dicts()

_build_iov_dicts ( self)
protected
Build IoV file dictionary for each collection if required.

Definition at line 545 of file state_machines.py.

545 def _build_iov_dicts(self):
546 """
547 Build IoV file dictionary for each collection if required.
548 """
549 iov_requested = self._iov_requested()
550 if iov_requested or self.calibration.ignored_runs:
551 for coll_name, collection in self.calibration.collections.items():
552 if not collection.files_to_iovs:
553 B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
554 f" Calibration '{self.calibration.name} and Collection '{coll_name}'."
555 " Filling dictionary from input file metadata."
556 " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
557
558 files_to_iovs = {}
559 for file_path in collection.input_files:
560 files_to_iovs[file_path] = get_iov_from_file(file_path)
561 collection.files_to_iovs = files_to_iovs
562 else:
563 B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
564 f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
565 else:
566 B2INFO("No File to IoV mapping required.")
567

◆ _callback()

_callback ( func,
** kwargs )
staticprotectedinherited
Calls a condition/before/after.. function using arguments passed (or not).

Definition at line 344 of file state_machines.py.

344 def _callback(func, **kwargs):
345 """
346 Calls a condition/before/after.. function using arguments passed (or not).
347 """
348 return func(**kwargs)
349

◆ _check_valid_collector_output()

_check_valid_collector_output ( self)
protected
check that collector output is valid

Definition at line 834 of file state_machines.py.

834 def _check_valid_collector_output(self):
835 """check that collector output is valid"""
836 B2INFO("Checking that Collector output exists for all collector jobs "
837 f"using {self.calibration.name}.output_patterns.")
838 if not self._collector_jobs:
839 B2INFO("We're restarting so we'll recreate the collector Job object.")
840 self._recover_collector_jobs()
841
842 for job in self._collector_jobs.values():
843 if not job.subjobs:
844 output_files = []
845 for pattern in job.output_patterns:
846 output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
847 if not output_files:
848 raise MachineError("No output files from Collector Job")
849 else:
850 for subjob in job.subjobs.values():
851 output_files = []
852 for pattern in subjob.output_patterns:
853 output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
854 if not output_files:
855 raise MachineError(f"No output files from Collector {subjob}")
856

◆ _collection_completed()

_collection_completed ( self)
protected
Did all the collections succeed?

Definition at line 579 of file state_machines.py.

579 def _collection_completed(self):
580 """
581 Did all the collections succeed?
582 """
583 B2DEBUG(29, "Checking for failed collector job.")
584 if self._collector_jobs_ready():
585 return all([job.status == "completed" for job in self._collector_jobs.values()])
586

◆ _collection_failed()

_collection_failed ( self)
protected
Did any of the collections fail?

Definition at line 587 of file state_machines.py.

587 def _collection_failed(self):
588 """
589 Did any of the collections fail?
590 """
591 B2DEBUG(29, "Checking for failed collector job.")
592 if self._collector_jobs_ready():
593 return any([job.status == "failed" for job in self._collector_jobs.values()])
594

◆ _collector_jobs_ready()

_collector_jobs_ready ( self)
protected
 

Definition at line 612 of file state_machines.py.

612 def _collector_jobs_ready(self):
613 """
614 """
615 since_last_update = time.time() - self._collector_timing["last_update"]
616 if since_last_update > self.calibration.collector_full_update_interval:
617 B2INFO("Updating full collector job statuses.")
618 for job in self._collector_jobs.values():
619 job.update_status()
620 self._collector_timing["last_update"] = time.time()
621 if job.subjobs:
622 num_completed = sum((subjob.status in subjob.exit_statuses) for subjob in job.subjobs.values())
623 total_subjobs = len(job.subjobs)
624 B2INFO(f"{num_completed}/{total_subjobs} Collector SubJobs finished in"
625 f" Calibration {self.calibration.name} Job {job.name}.")
626 return all([job.ready() for job in self._collector_jobs.values()])
627

◆ _create_collector_jobs()

_create_collector_jobs ( self)
protected
Creates a Job object for the collections of this iteration, ready for submission
to backend.

Definition at line 734 of file state_machines.py.

734 def _create_collector_jobs(self):
735 """
736 Creates a Job object for the collections of this iteration, ready for submission
737 to backend.
738 """
739 for collection_name, collection in self.calibration.collections.items():
740 iteration_dir = self.root_dir.joinpath(str(self.iteration))
741 job = Job('_'.join([self.calibration.name, collection_name, 'Iteration', str(self.iteration)]))
742 job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
743 job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
744 # Remove previous failed attempt to avoid problems
745 if job.output_dir.exists():
746 B2INFO(f"Previous output directory for {self.calibration.name} collector {collection_name} exists."
747 f"Deleting {job.output_dir} before re-submitting.")
748 shutil.rmtree(job.output_dir)
749 job.cmd = collection.job_cmd
750 job.append_current_basf2_setup_cmds()
751 job.input_sandbox_files.append(collection.job_script)
752 collector_path_file = Path(self._make_collector_path(collection_name, collection))
753 job.input_sandbox_files.append(collector_path_file)
754 if collection.pre_collector_path:
755 pre_collector_path_file = Path(self._make_pre_collector_path(collection_name, collection))
756 job.input_sandbox_files.append(pre_collector_path_file)
757
758 # Want to figure out which local databases are required for this job and their paths
759 list_dependent_databases = []
760
761 # Here we add the finished databases of previous calibrations that we depend on.
762 # We can assume that the databases exist as we can't be here until they have returned
763 for dependency in self.calibration.dependencies:
764 database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
765 B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
766 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
767
768 # Add previous iteration databases from this calibration
769 if self.iteration > 0:
770 previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
771 database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
772 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
773 B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
774
775 # Let's use a directory to store some files later for input to the collector jobs. Should already exist from
776 # collector path
777 input_data_directory = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, collection_name)
778
779 # Need to pass setup info to collector which would be tricky as arguments
780 # We make a dictionary and pass it in as json
781 job_config = {}
782 # Apply the user-set Calibration database chain to the base of the overall chain.
783 json_db_chain = []
784 for database in collection.database_chain:
785 if database.db_type == 'local':
786 json_db_chain.append(('local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
787 elif database.db_type == 'central':
788 json_db_chain.append(('central', database.global_tag))
789 else:
790 raise ValueError(f"Unknown database type {database.db_type}.")
791 # CAF created ones for dependent calibrations and previous iterations of this calibration
792 for database in list_dependent_databases:
793 json_db_chain.append(('local', database))
794 job_config['database_chain'] = json_db_chain
795
796 job_config_file_path = input_data_directory.joinpath('collector_config.json').absolute()
797 with open(job_config_file_path, 'w') as job_config_file:
798 json.dump(job_config, job_config_file, indent=2)
799 job.input_sandbox_files.append(job_config_file_path)
800
801 # Define the input files
802 input_data_files = set(collection.input_files)
803 # Reduce the input data files to only those that overlap with the optional requested IoV
804 # Local variable avoids doxygen "no uniquely matching class member" warning
805 iov_to_calibrate = self.iov_to_calibrate
806 if iov_to_calibrate:
807 input_data_files = self.files_containing_iov(input_data_files,
808 collection.files_to_iovs,
809 iov_to_calibrate)
810 # Remove any files that ONLY contain runs from our optional ignored_runs list
811 files_to_ignore = set()
812 for exprun in self.calibration.ignored_runs:
813 for input_file in input_data_files:
814 file_iov = self.calibration.files_to_iovs[input_file]
815 if file_iov == exprun.make_iov():
816 B2INFO(f"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
817 f"Therefore the input file '{input_file}' from Collection '{collection_name}' "
818 "is being removed from input files list.")
819 files_to_ignore.add(input_file)
820 input_data_files.difference_update(files_to_ignore)
821
822 if not input_data_files:
823 raise MachineError(f"No valid input files for Calibration '{self.calibration.name}' "
824 f" and Collection '{collection_name}'.")
825 job.input_files = list(input_data_files)
826
827 job.splitter = collection.splitter
828 job.backend_args = collection.backend_args
829 # Output patterns to be returned from collector job
830 job.output_patterns = collection.output_patterns
831 B2DEBUG(20, f"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
832 self._collector_jobs[collection_name] = job
833

◆ _dump_job_config()

_dump_job_config ( self)
protected
Dumps the `Job` object for the collections to JSON files so that its configuration can be recovered
later in case of failure.

Definition at line 502 of file state_machines.py.

502 def _dump_job_config(self):
503 """
504 Dumps the `Job` object for the collections to JSON files so that it's configuration can be recovered
505 later in case of failure.
506 """
507 # Wait for jobs (+subjobs) to be submitted so that all information is filled. Since the parent CAF object asynchronously
508 # submits the jobs this might need to wait a while.
509 while any(map(lambda j: j.status == "init", self._collector_jobs.values())):
510 B2DEBUG(29, "Some Collector Jobs still in 'init' state. Waiting...")
511 time.sleep(5)
512
513 for collection_name, job in self._collector_jobs.items():
514 collector_job_output_file_name = self.calibration.collections[collection_name].job_config
515 output_file = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir,
516 collection_name, collector_job_output_file_name)
517 job.dump_to_json(output_file)
518
STL class.

◆ _increment_iteration()

_increment_iteration ( self)
protected
 

Definition at line 573 of file state_machines.py.

573 def _increment_iteration(self):
574 """
575 """
576 self.iteration += 1
577 self.calibration.iteration = self.iteration
578

◆ _iov_requested()

_iov_requested ( self)
protected
 

Definition at line 530 of file state_machines.py.

530 def _iov_requested(self):
531 """
532 """
533 if self.iov_to_calibrate:
534 B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
535 return True
536 else:
537 B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
538 return False
539

◆ _log_new_state()

_log_new_state ( self,
** kwargs )
protected
 

Definition at line 659 of file state_machines.py.

659 def _log_new_state(self, **kwargs):
660 """
661 """
662 B2INFO(f"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")
663

◆ _make_collector_path()

_make_collector_path ( self,
name,
collection )
protected
Creates a basf2 path for the correct collector and serializes it in the
self.output_dir/<calibration_name>/<iteration>/paths directory

Definition at line 700 of file state_machines.py.

700 def _make_collector_path(self, name, collection):
701 """
702 Creates a basf2 path for the correct collector and serializes it in the
703 self.output_dir/<calibration_name>/<iteration>/paths directory
704 """
705 path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
706 # Automatically overwrite any previous directory
707 create_directories(path_output_dir)
708 path_file_name = collection.collector.name() + '.path'
709 path_file_name = path_output_dir / path_file_name
710 # Create empty path and add collector to it
711 coll_path = create_path()
712 coll_path.add_module(collection.collector)
713 # Dump the basf2 path to file
714 with open(path_file_name, 'bw') as serialized_path_file:
715 pickle.dump(serialize_path(coll_path), serialized_path_file)
716 # Return the pickle file path for addition to the input sandbox
717 return str(path_file_name.absolute())
718

◆ _make_output_dir()

_make_output_dir ( self)
protected
Creates the overall root directory of the Calibration. Will not overwrite if it already exists.

Definition at line 693 of file state_machines.py.

693 def _make_output_dir(self):
694 """
695 Creates the overall root directory of the Calibration. Will not overwrite if it already exists.
696 Also creates s
697 """
698 create_directories(self.root_dir, overwrite=False)
699

◆ _make_pre_collector_path()

_make_pre_collector_path ( self,
name,
collection )
protected
Creates a basf2 path for the collector's setup path (Collection.pre_collector_path) and serializes it in the
self.output_dir/<calibration_name>/<iteration>/<collector_output>/<name> directory.

Definition at line 719 of file state_machines.py.

719 def _make_pre_collector_path(self, name, collection):
720 """
721 Creates a basf2 path for the collectors setup path (Collection.pre_collector_path) and serializes it in the
722 self.output_dir/<calibration_name>/<iteration>/<colector_output>/<name> directory.
723 """
724 path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
725 coll_path = collection.pre_collector_path
726 path_file_name = 'pre_collector.path'
727 path_file_name = os.path.join(path_output_dir, path_file_name)
728 # Dump the basf2 path to file
729 with open(path_file_name, 'bw') as serialized_path_file:
730 pickle.dump(serialize_path(coll_path), serialized_path_file)
731 # Return the pickle file path for addition to the input sandbox
732 return path_file_name
733

◆ _no_require_iteration()

_no_require_iteration ( self)
protected
 

Definition at line 635 of file state_machines.py.

635 def _no_require_iteration(self):
636 """
637 """
638 if self._require_iteration() and self._below_max_iterations():
639 return False
640 elif self._require_iteration() and not self._below_max_iterations():
641 B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
642 return True
643 elif not self._require_iteration():
644 return True
645

◆ _prepare_final_db()

_prepare_final_db ( self)
protected
Take the last iteration's outputdb and copy it to a more easily findable place.

Definition at line 921 of file state_machines.py.

921 def _prepare_final_db(self):
922 """
923 Take the last iteration's outputdb and copy it to a more easily findable place.
924 """
925 database_location = self.root_dir.joinpath(str(self.iteration),
926 self.calibration.alg_output_dir,
927 'outputdb')
928 final_database_location = self.root_dir.joinpath('outputdb')
929 if final_database_location.exists():
930 B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
931 shutil.rmtree(final_database_location)
932 shutil.copytree(database_location, final_database_location)
933
934

◆ _recover_collector_jobs()

_recover_collector_jobs ( self)
protected
Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.

Definition at line 519 of file state_machines.py.

519 def _recover_collector_jobs(self):
520 """
521 Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.
522 """
523 for collection_name, collection in self.calibration.collections.items():
524 output_file = self.root_dir.joinpath(str(self.iteration),
525 self.collector_input_dir,
526 collection_name,
527 collection.job_config)
528 self._collector_jobs[collection_name] = Job.from_json(output_file)
529

◆ _require_iteration()

_require_iteration ( self)
protected
 

Definition at line 646 of file state_machines.py.

646 def _require_iteration(self):
647 """
648 """
649 iteration_called = False
650 for alg_name, results in self._algorithm_results[self.iteration].items():
651 for result in results:
652 if result.result == CalibrationAlgorithm.c_Iterate:
653 iteration_called = True
654 break
655 if iteration_called:
656 break
657 return iteration_called
658

◆ _resolve_file_paths()

_resolve_file_paths ( self)
protected
 

Definition at line 540 of file state_machines.py.

540 def _resolve_file_paths(self):
541 """
542 """
543 pass
544

◆ _run_algorithms()

_run_algorithms ( self)
protected
Runs the Calibration Algorithms for this calibration machine.

Will run them sequentially locally (there may be benefits to using a
processing pool for low-memory algorithms later on).

Definition at line 857 of file state_machines.py.

857 def _run_algorithms(self):
858 """
859 Runs the Calibration Algorithms for this calibration machine.
860
861 Will run them sequentially locally (possible benefits to using a
862 processing pool for low memory algorithms later on.)
863 """
864 # Get an instance of the Runner for these algorithms and run it
865 algs_runner = self.calibration.algorithms_runner(name=self.calibration.name)
866 algs_runner.algorithms = self.calibration.algorithms
867 algorithm_output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
868 output_database_dir = algorithm_output_dir.joinpath("outputdb")
869 # Remove it, if we failed previously, to start clean
870 if algorithm_output_dir.exists():
871 B2INFO(f"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
872 f"Deleting and recreating {algorithm_output_dir}.")
873 create_directories(algorithm_output_dir)
874 B2INFO(f"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
875 algs_runner.output_database_dir = output_database_dir
876 algs_runner.output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
877 input_files = []
878
879 for job in self._collector_jobs.values():
880 if job.subjobs:
881 for subjob in job.subjobs.values():
882 for pattern in subjob.output_patterns:
883 input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
884 else:
885 for pattern in job.output_patterns:
886 input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
887
888 algs_runner.input_files = input_files
889
890 # Add any user defined database chain for this calibration
891 algs_runner.database_chain = self.calibration.database_chain
892
893 # Here we add the finished databases of previous calibrations that we depend on.
894 # We can assume that the databases exist as we can't be here until they have returned
895 list_dependent_databases = []
896 for dependency in self.calibration.dependencies:
897 database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
898 B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
899 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
900
901 # Add previous iteration databases from this calibration
902 if self.iteration > 0:
903 previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
904 database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
905 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
906 B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
907 algs_runner.dependent_databases = list_dependent_databases
908
909 algs_runner.ignored_runs = self.calibration.ignored_runs
910
911 try:
912 algs_runner.run(self.iov_to_calibrate, self.iteration)
913 except Exception as err:
914 print(err)
915 # We directly set the state without triggering the transition because normally we fail based on checking the algorithm
916 # results. But here we had an actual exception so we just force into failure instead.
917 self._state = State("algorithms_failed")
918 self._algorithm_results[self.iteration] = algs_runner.results
919 self._runner_final_state = algs_runner.final_state
920

◆ _runner_failed()

_runner_failed ( self)
protected
Returns:
    bool: If AlgorithmsRunner failed return True.

Definition at line 602 of file state_machines.py.

602 def _runner_failed(self):
603 """
604 Returns:
605 bool: If AlgorithmsRunner failed return True.
606 """
607 if self._runner_final_state == AlgorithmsRunner.FAILED:
608 return True
609 else:
610 return False
611

◆ _runner_not_failed()

_runner_not_failed ( self)
protected
Returns:
    bool: If AlgorithmsRunner succeeded return True.

Definition at line 595 of file state_machines.py.

595 def _runner_not_failed(self):
596 """
597 Returns:
598 bool: If AlgorithmsRunner succeeded return True.
599 """
600 return not self._runner_failed()
601

◆ _submit_collections()

_submit_collections ( self)
protected
 

Definition at line 628 of file state_machines.py.

628 def _submit_collections(self):
629 """
630 """
631 self.calibration.jobs_to_submit.extend(list(self._collector_jobs.values()))
632 self._collector_timing["start"] = time.time()
633 self._collector_timing["last_update"] = time.time()
634

◆ _trigger()

_trigger ( self,
transition_name,
transition_dict,
** kwargs )
protectedinherited
Runs the transition logic. Callbacks are evaluated in the order:
conditions -> before -> <new state set here> -> after.

Definition at line 320 of file state_machines.py.

320 def _trigger(self, transition_name, transition_dict, **kwargs):
321 """
322 Runs the transition logic. Callbacks are evaluated in the order:
323 conditions -> before -> <new state set here> -> after.
324 """
325 dest, conditions, before_callbacks, after_callbacks = (
326 transition_dict["dest"],
327 transition_dict["conditions"],
328 transition_dict["before"],
329 transition_dict["after"]
330 )
331 # Returns True only if every condition returns True when called
332 if all(map(lambda condition: self._callback(condition, **kwargs), conditions)):
333 for before_func in before_callbacks:
334 self._callback(before_func, **kwargs)
335
336 self.state = dest
337 for after_func in after_callbacks:
338 self._callback(after_func, **kwargs)
339 else:
340 raise ConditionError(f"Transition '{transition_name}' called for but one or more conditions "
341 "evaluated False")
342

◆ _update_cal_state()

_update_cal_state ( self,
** kwargs )
protected
update calibration state

Definition at line 485 of file state_machines.py.

485 def _update_cal_state(self, **kwargs):
486 """update calibration state"""
487 self.calibration.state = str(kwargs["new_state"])
488

◆ add_state()

add_state ( self,
state,
enter = None,
exit = None )
inherited
Adds a single state to the list of possible ones.
Should be a unique string or a State object with a unique name.

Definition at line 193 of file state_machines.py.

193 def add_state(self, state, enter=None, exit=None):
194 """
195 Adds a single state to the list of possible ones.
196 Should be a unique string or a State object with a unique name.
197 """
198 if isinstance(state, str):
199 self.add_state(State(state, enter, exit))
200 elif isinstance(state, State):
201 if state.name not in self.states.keys():
202 self.states[state.name] = state
203 else:
204 B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
205 else:
206 B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")
207

◆ add_transition()

add_transition ( self,
trigger,
source,
dest,
conditions = None,
before = None,
after = None )
inherited
Adds a single transition to the dictionary of possible ones.
Trigger is the method name that begins the transition between the
source state and the destination state.

The condition is an optional function that returns True or False
depending on the current state/input.

Definition at line 264 of file state_machines.py.

264 def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
265 """
266 Adds a single transition to the dictionary of possible ones.
267 Trigger is the method name that begins the transition between the
268 source state and the destination state.
269
270 The condition is an optional function that returns True or False
271 depending on the current state/input.
272 """
273 transition_dict = {}
274 try:
275 source = self.states[source]
276 dest = self.states[dest]
277 transition_dict["source"] = source
278 transition_dict["dest"] = dest
279 except KeyError as err:
280 B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
281 raise err
282 if conditions:
283 if isinstance(conditions, (list, tuple, set)):
284 transition_dict["conditions"] = list(conditions)
285 else:
286 transition_dict["conditions"] = [conditions]
287 else:
288 transition_dict["conditions"] = [Machine.default_condition]
289
290 if not before:
291 before = []
292 if isinstance(before, (list, tuple, set)):
293 transition_dict["before"] = list(before)
294 else:
295 transition_dict["before"] = [before]
296
297 if not after:
298 after = []
299 if isinstance(after, (list, tuple, set)):
300 transition_dict["after"] = list(after)
301 else:
302 transition_dict["after"] = [after]
303
304 self.transitions[trigger].append(transition_dict)
305

◆ automatic_transition()

automatic_transition ( self)
Automatically try all transitions out of this state once. The 'fail' transition is tried last.

Definition at line 675 of file state_machines.py.

675 def automatic_transition(self):
676 """
677 Automatically try all transitions out of this state once. The 'fail' transition is tried last.
678 """
679 possible_transitions = self.get_transitions(self.state)
680 for transition in possible_transitions:
681 try:
682 if transition != "fail":
683 getattr(self, transition)()
684 break
685 except ConditionError:
686 continue
687 else:
688 if "fail" in possible_transitions:
689 getattr(self, "fail")()
690 else:
691 raise MachineError(f"Failed to automatically transition out of {self.state} state.")
692

◆ default_condition()

default_condition ( ** kwargs)
staticinherited
Method to always return True.

Definition at line 258 of file state_machines.py.

258 def default_condition(**kwargs):
259 """
260 Method to always return True.
261 """
262 return True
263

◆ dependencies_completed()

dependencies_completed ( self)
Condition function to check that the dependencies of our calibration are in the 'completed' state.
Technically only need to check explicit dependencies.

Definition at line 664 of file state_machines.py.

664 def dependencies_completed(self):
665 """
666 Condition function to check that the dependencies of our calibration are in the 'completed' state.
667 Technically only need to check explicit dependencies.
668 """
669 for calibration in self.calibration.dependencies:
670 if not calibration.state == calibration.end_state:
671 return False
672 else:
673 return True
674

◆ files_containing_iov()

files_containing_iov ( self,
file_paths,
files_to_iovs,
iov )
Lookup function that returns all files from the file_paths that
overlap with this IoV.

Definition at line 489 of file state_machines.py.

489 def files_containing_iov(self, file_paths, files_to_iovs, iov):
490 """
491 Lookup function that returns all files from the file_paths that
492 overlap with this IoV.
493 """
494 # Files that contain an Exp,Run range that overlaps with given IoV
495 overlapping_files = set()
496
497 for file_path, file_iov in files_to_iovs.items():
498 if file_iov.overlaps(iov) and (file_path in file_paths):
499 overlapping_files.add(file_path)
500 return overlapping_files
501

◆ get_transition_dict()

get_transition_dict ( self,
state,
transition )
inherited
Returns the transition dictionary for a state and transition out of it.

Definition at line 361 of file state_machines.py.

361 def get_transition_dict(self, state, transition):
362 """
363 Returns the transition dictionary for a state and transition out of it.
364 """
365 transition_dicts = self.transitions[transition]
366 for transition_dict in transition_dicts:
367 if transition_dict["source"] == state:
368 return transition_dict
369 else:
370 raise KeyError(f"No transition from state {state} with the name {transition}.")
371

◆ get_transitions()

get_transitions ( self,
source )
inherited
Returns allowed transitions from a given state.

Definition at line 350 of file state_machines.py.

350 def get_transitions(self, source):
351 """
352 Returns allowed transitions from a given state.
353 """
354 possible_transitions = []
355 for transition, transition_dicts in self.transitions.items():
356 for transition_dict in transition_dicts:
357 if transition_dict["source"] == source:
358 possible_transitions.append(transition)
359 return possible_transitions
360

◆ initial_state() [1/2]

initial_state ( self)
inherited
The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.

Definition at line 209 of file state_machines.py.

209 def initial_state(self):
210 """
211 The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
212 """
213 return self._initial_state
214

◆ initial_state() [2/2]

initial_state ( self,
state )
inherited
 

Definition at line 216 of file state_machines.py.

216 def initial_state(self, state):
217 """
218 """
219 if state in self.states.keys():
220 self._initial_state = self.states[state]
221
222 self._state = self.states[state]
223 else:
224 raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")
225

◆ save_graph()

save_graph ( self,
filename,
graphname )
inherited
Does a simple dot file creation to visualise states and transitions.

Definition at line 372 of file state_machines.py.

372 def save_graph(self, filename, graphname):
373 """
374 Does a simple dot file creation to visualise states and transitions.
375 """
376 with open(filename, "w") as dotfile:
377 dotfile.write("digraph " + graphname + " {\n")
378 for state in self.states.keys():
379 dotfile.write('"' + state + '" [shape=ellipse, color=black]\n')
380 for trigger, transition_dicts in self.transitions.items():
381 for transition in transition_dicts:
382 dotfile.write('"' + transition["source"].name + '" -> "' +
383 transition["dest"].name + '" [label="' + trigger + '"]\n')
384 dotfile.write("}\n")
385
386

◆ state() [1/2]

state ( self)
inherited
        The current state of the machine. Actually a `property` decorator. It will call the exit method of the
        current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
        either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).

Definition at line 227 of file state_machines.py.

227 def state(self):
228 """
229 The current state of the machine. Actually a `property` decorator. It will call the exit method of the
230 current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
231 either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
232 """
233 return self._state
234

◆ state() [2/2]

state ( self,
state )
inherited
 

Definition at line 236 of file state_machines.py.

236 def state(self, state):
237 """
238 """
239 if isinstance(state, str):
240 state_name = state
241 else:
242 state_name = state.name
243
244 try:
245 state = self.states[state_name]
246 # Run exit callbacks of current state
247 for callback in self.state.on_exit:
248 callback(prior_state=self.state, new_state=state)
249 # Run enter callbacks of new state
250 for callback in state.on_enter:
251 callback(prior_state=self.state, new_state=state)
252 # Set the state
253 self._state = state
254 except KeyError:
255 raise MachineError(f"Attempted to set state to '{state}' which not in the 'states' attribute!")
256

Member Data Documentation

◆ _algorithm_results

dict _algorithm_results = {}
protected

Results of each iteration for all algorithms of this calibration.

Definition at line 438 of file state_machines.py.

◆ _collector_jobs

dict _collector_jobs = {}
protected

The collector jobs used for submission.

Definition at line 452 of file state_machines.py.

◆ _collector_timing

dict _collector_timing = {}
protected

Times of various useful updates to the collector job e.g.

start, elapsed, last update. Used to periodically call update_status on the collector job and find out an overall number of jobs remaining + estimated remaining time.

Definition at line 449 of file state_machines.py.

◆ _initial_state

dict _initial_state = State(initial_state)
protectedinherited

Actual attribute holding initial state for this machine.

Definition at line 186 of file state_machines.py.

◆ _runner_final_state

_runner_final_state = None
protected

Final state of the algorithm runner for the current iteration.

Definition at line 440 of file state_machines.py.

◆ _state

dict _state = self.initial_state
protectedinherited

Actual attribute holding the Current state.

Definition at line 189 of file state_machines.py.

◆ algorithm_output_dir

str algorithm_output_dir = 'algorithm_output'
static

output directory of algorithm

Definition at line 398 of file state_machines.py.

◆ calibration

calibration = calibration

Calibration object whose state we are modelling.

Definition at line 429 of file state_machines.py.

◆ collector_backend

collector_backend = None

Backend used for this calibration machine collector.

Definition at line 436 of file state_machines.py.

◆ collector_input_dir

str collector_input_dir = 'collector_input'
static

input directory of collector

Definition at line 394 of file state_machines.py.

◆ collector_output_dir

str collector_output_dir = 'collector_output'
static

output directory of collector

Definition at line 396 of file state_machines.py.

◆ default_states

default_states
Initial value:
= [State("init", enter=[self._update_cal_state,
self._log_new_state]),
State("running_collector", enter=[self._update_cal_state,
self._log_new_state]),
State("collector_failed", enter=[self._update_cal_state,
self._log_new_state]),
State("collector_completed", enter=[self._update_cal_state,
self._log_new_state]),
State("running_algorithms", enter=[self._update_cal_state,
self._log_new_state]),
State("algorithms_failed", enter=[self._update_cal_state,
self._log_new_state]),
State("algorithms_completed", enter=[self._update_cal_state,
self._log_new_state]),
State("completed", enter=[self._update_cal_state,
self._log_new_state]),
State("failed", enter=[self._update_cal_state,
self._log_new_state])
]

States that are defaults to the CalibrationMachine (could override later)

Definition at line 406 of file state_machines.py.

◆ initial_state

initial_state = initial_state
inherited

Pointless docstring since it's a property.

Definition at line 182 of file state_machines.py.

◆ iov_to_calibrate

iov_to_calibrate = iov_to_calibrate

IoV to be executed, currently will loop over all runs in IoV.

Definition at line 442 of file state_machines.py.

◆ iteration

iteration = iteration

Which iteration step are we in.

Definition at line 434 of file state_machines.py.

◆ root_dir

root_dir = Path(os.getcwd(), calibration.name)

root directory for this Calibration

Definition at line 444 of file state_machines.py.

◆ state

state = dest
inherited

Current State of machine.

Definition at line 336 of file state_machines.py.

◆ states

dict states = {}
inherited

Valid states for this machine.

Definition at line 176 of file state_machines.py.

◆ transitions

transitions = defaultdict(list)
inherited

Allowed transitions between states.

Definition at line 191 of file state_machines.py.


The documentation for this class was generated from the following file: