Belle II Software development
CalibrationMachine Class Reference
Inheritance diagram for CalibrationMachine:
Machine

Public Member Functions

 __init__ (self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0)
 
 files_containing_iov (self, file_paths, files_to_iovs, iov)
 
 dependencies_completed (self)
 
 automatic_transition (self)
 
 initial_state (self)
 
 initial_state (self, state)
 
 state (self)
 
 state (self, state)
 
 add_state (self, state, enter=None, exit=None)
 
 add_transition (self, trigger, source, dest, conditions=None, before=None, after=None)
 
 __getattr__ (self, name, **kwargs)
 
 get_transitions (self, source)
 
 get_transition_dict (self, state, transition)
 
 save_graph (self, filename, graphname)
 

Static Public Member Functions

 default_condition (**kwargs)
 

Public Attributes

list default_states
 Default states for the CalibrationMachine (could be overridden later)
 
 calibration = calibration
 Calibration object whose state we are modelling.
 
 iteration = iteration
 The iteration step we are currently in.
 
 collector_backend = None
 Backend used for this calibration machine collector.
 
 iov_to_calibrate = iov_to_calibrate
 IoV to be executed, currently will loop over all runs in IoV.
 
 root_dir = Path(os.getcwd(), calibration.name)
 root directory for this Calibration
 
dict states = {}
 Valid states for this machine.
 
 initial_state = initial_state
 Pointless docstring since it's a property.
 
 transitions = defaultdict(list)
 Allowed transitions between states.
 
 state = dest
 Current State of machine.
 

Static Public Attributes

str collector_input_dir = 'collector_input'
 input directory of collector
 
str collector_output_dir = 'collector_output'
 output directory of collector
 
str algorithm_output_dir = 'algorithm_output'
 output directory of algorithm
 

Protected Member Functions

 _update_cal_state (self, **kwargs)
 
 _dump_job_config (self)
 
 _recover_collector_jobs (self)
 
 _iov_requested (self)
 
 _resolve_file_paths (self)
 
 _build_iov_dicts (self)
 
 _below_max_iterations (self)
 
 _increment_iteration (self)
 
 _collection_completed (self)
 
 _collection_failed (self)
 
 _runner_not_failed (self)
 
 _runner_failed (self)
 
 _collector_jobs_ready (self)
 
 _submit_collections (self)
 
 _no_require_iteration (self)
 
 _require_iteration (self)
 
 _log_new_state (self, **kwargs)
 
 _make_output_dir (self)
 
 _make_collector_path (self, name, collection)
 
 _make_pre_collector_path (self, name, collection)
 
 _create_collector_jobs (self)
 
 _check_valid_collector_output (self)
 
 _run_algorithms (self)
 
 _prepare_final_db (self)
 
 _trigger (self, transition_name, transition_dict, **kwargs)
 

Static Protected Member Functions

 _callback (func, **kwargs)
 

Protected Attributes

dict _algorithm_results = {}
 Results of each iteration for all algorithms of this calibration.
 
 _runner_final_state = None
 Final state of the algorithm runner for the current iteration.
 
dict _collector_timing = {}
 Times of various useful updates to the collector job, e.g. the "start" and "last_update" times.
 
dict _collector_jobs = {}
 The collector jobs used for submission.
 
dict _initial_state = State(initial_state)
 Actual attribute holding initial state for this machine.
 
dict _state = self.initial_state
 Actual attribute holding the Current state.
 

Detailed Description

A state machine to handle `Calibration` objects and the flow of
processing for them.

Definition at line 383 of file state_machines.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
calibration,
iov_to_calibrate = None,
initial_state = "init",
iteration = 0 )
Takes a Calibration object from the caf framework and lets you
set the initial state.

Definition at line 396 of file state_machines.py.

396 def __init__(self, calibration, iov_to_calibrate=None, initial_state="init", iteration=0):
397 """
398 Takes a Calibration object from the caf framework and lets you
399 set the initial state.
400 """
401
402 self.default_states = [State("init", enter=[self._update_cal_state,
403 self._log_new_state]),
404 State("running_collector", enter=[self._update_cal_state,
405 self._log_new_state]),
406 State("collector_failed", enter=[self._update_cal_state,
407 self._log_new_state]),
408 State("collector_completed", enter=[self._update_cal_state,
409 self._log_new_state]),
410 State("running_algorithms", enter=[self._update_cal_state,
411 self._log_new_state]),
412 State("algorithms_failed", enter=[self._update_cal_state,
413 self._log_new_state]),
414 State("algorithms_completed", enter=[self._update_cal_state,
415 self._log_new_state]),
416 State("completed", enter=[self._update_cal_state,
417 self._log_new_state]),
418 State("failed", enter=[self._update_cal_state,
419 self._log_new_state])
420 ]
421
422 super().__init__(self.default_states, initial_state)
423
424
425 self.calibration = calibration
426 # Monkey Patching for the win!
427
428 self.calibration.machine = self
429
430 self.iteration = iteration
431
432 self.collector_backend = None
433
434 self._algorithm_results = {}
435
436 self._runner_final_state = None
437
438 self.iov_to_calibrate = iov_to_calibrate
439
440 self.root_dir = Path(os.getcwd(), calibration.name)
441
442
445 self._collector_timing = {}
446
447
448 self._collector_jobs = {}
449
450 self.add_transition("submit_collector", "init", "running_collector",
451 conditions=self.dependencies_completed,
452 before=[self._make_output_dir,
453 self._resolve_file_paths,
454 self._build_iov_dicts,
455 self._create_collector_jobs,
456 self._submit_collections,
457 self._dump_job_config])
458 self.add_transition("fail", "running_collector", "collector_failed",
459 conditions=self._collection_failed)
460 self.add_transition("complete", "running_collector", "collector_completed",
461 conditions=self._collection_completed)
462 self.add_transition("run_algorithms", "collector_completed", "running_algorithms",
463 before=self._check_valid_collector_output,
464 after=[self._run_algorithms,
465 self.automatic_transition])
466 self.add_transition("complete", "running_algorithms", "algorithms_completed",
467 after=self.automatic_transition,
468 conditions=self._runner_not_failed)
469 self.add_transition("fail", "running_algorithms", "algorithms_failed",
470 conditions=self._runner_failed)
471 self.add_transition("iterate", "algorithms_completed", "init",
472 conditions=[self._require_iteration,
473 self._below_max_iterations],
474 after=self._increment_iteration)
475 self.add_transition("finish", "algorithms_completed", "completed",
476 conditions=self._no_require_iteration,
477 before=self._prepare_final_db)
478 self.add_transition("fail_fully", "algorithms_failed", "failed")
479 self.add_transition("fail_fully", "collector_failed", "failed")
480

Member Function Documentation

◆ __getattr__()

__getattr__ ( self,
name,
** kwargs )
inherited
Allows us to create a new method for each trigger on the fly.
If there is no trigger name in the machine to match, then the normal
AttributeError is called.

Definition at line 302 of file state_machines.py.

302 def __getattr__(self, name, **kwargs):
303 """
304 Allows us to create a new method for each trigger on the fly.
305 If there is no trigger name in the machine to match, then the normal
306 AttributeError is called.
307 """
308 possible_transitions = self.get_transitions(self.state)
309 if name not in possible_transitions:
310 raise AttributeError(f"{name} does not exist in transitions for state {self.state}.")
311 transition_dict = self.get_transition_dict(self.state, name)
312 # \cond silence doxygen warning about _trigger
313 return partial(self._trigger, name, transition_dict, **kwargs)
314 # \endcond
315

◆ _below_max_iterations()

_below_max_iterations ( self)
protected
 

Definition at line 564 of file state_machines.py.

564 def _below_max_iterations(self):
565 """
566 """
567 return self.iteration < self.calibration.max_iterations
568

◆ _build_iov_dicts()

_build_iov_dicts ( self)
protected
Build IoV file dictionary for each collection if required.

Definition at line 541 of file state_machines.py.

541 def _build_iov_dicts(self):
542 """
543 Build IoV file dictionary for each collection if required.
544 """
545 iov_requested = self._iov_requested()
546 if iov_requested or self.calibration.ignored_runs:
547 for coll_name, collection in self.calibration.collections.items():
548 if not collection.files_to_iovs:
549 B2INFO("Creating IoV dictionaries to map files to (Exp,Run) ranges for"
550 f" Calibration '{self.calibration.name} and Collection '{coll_name}'."
551 " Filling dictionary from input file metadata."
552 " If this is slow, set the 'files_to_iovs' attribute of each Collection before running.")
553
554 files_to_iovs = {}
555 for file_path in collection.input_files:
556 files_to_iovs[file_path] = get_iov_from_file(file_path)
557 collection.files_to_iovs = files_to_iovs
558 else:
559 B2INFO("Using File to IoV mapping from 'files_to_iovs' attribute for "
560 f"Calibration '{self.calibration.name}' and Collection '{coll_name}'.")
561 else:
562 B2INFO("No File to IoV mapping required.")
563

◆ _callback()

_callback ( func,
** kwargs )
staticprotectedinherited
Calls a condition/before/after.. function using arguments passed (or not).

Definition at line 340 of file state_machines.py.

340 def _callback(func, **kwargs):
341 """
342 Calls a condition/before/after.. function using arguments passed (or not).
343 """
344 return func(**kwargs)
345

◆ _check_valid_collector_output()

_check_valid_collector_output ( self)
protected
check that collector output is valid

Definition at line 828 of file state_machines.py.

828 def _check_valid_collector_output(self):
829 """check that collector output is valid"""
830 B2INFO("Checking that Collector output exists for all collector jobs "
831 f"using {self.calibration.name}.output_patterns.")
832 if not self._collector_jobs:
833 B2INFO("We're restarting so we'll recreate the collector Job object.")
834 self._recover_collector_jobs()
835
836 for job in self._collector_jobs.values():
837 if not job.subjobs:
838 output_files = []
839 for pattern in job.output_patterns:
840 output_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
841 if not output_files:
842 raise MachineError("No output files from Collector Job")
843 else:
844 for subjob in job.subjobs.values():
845 output_files = []
846 for pattern in subjob.output_patterns:
847 output_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
848 if not output_files:
849 raise MachineError(f"No output files from Collector {subjob}")
850

◆ _collection_completed()

_collection_completed ( self)
protected
Did all the collections succeed?

Definition at line 575 of file state_machines.py.

575 def _collection_completed(self):
576 """
577 Did all the collections succeed?
578 """
579 B2DEBUG(29, "Checking for failed collector job.")
580 if self._collector_jobs_ready():
581 return all([job.status == "completed" for job in self._collector_jobs.values()])
582

◆ _collection_failed()

_collection_failed ( self)
protected
Did any of the collections fail?

Definition at line 583 of file state_machines.py.

583 def _collection_failed(self):
584 """
585 Did any of the collections fail?
586 """
587 B2DEBUG(29, "Checking for failed collector job.")
588 if self._collector_jobs_ready():
589 return any([job.status == "failed" for job in self._collector_jobs.values()])
590

◆ _collector_jobs_ready()

_collector_jobs_ready ( self)
protected
 

Definition at line 608 of file state_machines.py.

608 def _collector_jobs_ready(self):
609 """
610 """
611 since_last_update = time.time() - self._collector_timing["last_update"]
612 if since_last_update > self.calibration.collector_full_update_interval:
613 B2INFO("Updating full collector job statuses.")
614 for job in self._collector_jobs.values():
615 job.update_status()
616 self._collector_timing["last_update"] = time.time()
617 if job.subjobs:
618 num_completed = sum((subjob.status in subjob.exit_statuses) for subjob in job.subjobs.values())
619 total_subjobs = len(job.subjobs)
620 B2INFO(f"{num_completed}/{total_subjobs} Collector SubJobs finished in"
621 f" Calibration {self.calibration.name} Job {job.name}.")
622 return all([job.ready() for job in self._collector_jobs.values()])
623

◆ _create_collector_jobs()

_create_collector_jobs ( self)
protected
Creates a Job object for the collections of this iteration, ready for submission
to backend.

Definition at line 730 of file state_machines.py.

730 def _create_collector_jobs(self):
731 """
732 Creates a Job object for the collections of this iteration, ready for submission
733 to backend.
734 """
735 for collection_name, collection in self.calibration.collections.items():
736 iteration_dir = self.root_dir.joinpath(str(self.iteration))
737 job = Job('_'.join([self.calibration.name, collection_name, 'Iteration', str(self.iteration)]))
738 job.output_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
739 job.working_dir = iteration_dir.joinpath(self.collector_output_dir, collection_name)
740 # Remove previous failed attempt to avoid problems
741 if job.output_dir.exists():
742 B2INFO(f"Previous output directory for {self.calibration.name} collector {collection_name} exists."
743 f"Deleting {job.output_dir} before re-submitting.")
744 shutil.rmtree(job.output_dir)
745 job.cmd = collection.job_cmd
746 job.append_current_basf2_setup_cmds()
747 job.input_sandbox_files.append(collection.job_script)
748 collector_path_file = Path(self._make_collector_path(collection_name, collection))
749 job.input_sandbox_files.append(collector_path_file)
750 if collection.pre_collector_path:
751 pre_collector_path_file = Path(self._make_pre_collector_path(collection_name, collection))
752 job.input_sandbox_files.append(pre_collector_path_file)
753
754 # Want to figure out which local databases are required for this job and their paths
755 list_dependent_databases = []
756
757 # Here we add the finished databases of previous calibrations that we depend on.
758 # We can assume that the databases exist as we can't be here until they have returned
759 for dependency in self.calibration.dependencies:
760 database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
761 B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
762 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
763
764 # Add previous iteration databases from this calibration
765 if self.iteration > 0:
766 previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
767 database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
768 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
769 B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
770
771 # Let's use a directory to store some files later for input to the collector jobs. Should already exist from
772 # collector path
773 input_data_directory = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, collection_name)
774
775 # Need to pass setup info to collector which would be tricky as arguments
776 # We make a dictionary and pass it in as json
777 job_config = {}
778 # Apply the user-set Calibration database chain to the base of the overall chain.
779 json_db_chain = []
780 for database in collection.database_chain:
781 if database.db_type == 'local':
782 json_db_chain.append(('local', (database.filepath.as_posix(), database.payload_dir.as_posix())))
783 elif database.db_type == 'central':
784 json_db_chain.append(('central', database.global_tag))
785 else:
786 raise ValueError(f"Unknown database type {database.db_type}.")
787 # CAF created ones for dependent calibrations and previous iterations of this calibration
788 for database in list_dependent_databases:
789 json_db_chain.append(('local', database))
790 job_config['database_chain'] = json_db_chain
791
792 job_config_file_path = input_data_directory.joinpath('collector_config.json').absolute()
793 with open(job_config_file_path, 'w') as job_config_file:
794 json.dump(job_config, job_config_file, indent=2)
795 job.input_sandbox_files.append(job_config_file_path)
796
797 # Define the input files
798 input_data_files = set(collection.input_files)
799 # Reduce the input data files to only those that overlap with the optional requested IoV
800 if self.iov_to_calibrate:
801 input_data_files = self.files_containing_iov(input_data_files,
802 collection.files_to_iovs,
803 self.iov_to_calibrate)
804 # Remove any files that ONLY contain runs from our optional ignored_runs list
805 files_to_ignore = set()
806 for exprun in self.calibration.ignored_runs:
807 for input_file in input_data_files:
808 file_iov = self.calibration.files_to_iovs[input_file]
809 if file_iov == exprun.make_iov():
810 B2INFO(f"You have asked for {exprun} to be ignored for Calibration '{self.calibration.name}'. "
811 f"Therefore the input file '{input_file}' from Collection '{collection_name}' "
812 "is being removed from input files list.")
813 files_to_ignore.add(input_file)
814 input_data_files.difference_update(files_to_ignore)
815
816 if not input_data_files:
817 raise MachineError(f"No valid input files for Calibration '{self.calibration.name}' "
818 f" and Collection '{collection_name}'.")
819 job.input_files = list(input_data_files)
820
821 job.splitter = collection.splitter
822 job.backend_args = collection.backend_args
823 # Output patterns to be returned from collector job
824 job.output_patterns = collection.output_patterns
825 B2DEBUG(20, f"Collector job for {self.calibration.name}:{collection_name}:\n{job}")
826 self._collector_jobs[collection_name] = job
827

◆ _dump_job_config()

_dump_job_config ( self)
protected
Dumps the `Job` object for the collections to JSON files so that its configuration can be recovered
later in case of failure.

Definition at line 498 of file state_machines.py.

498 def _dump_job_config(self):
499 """
500 Dumps the `Job` object for the collections to JSON files so that it's configuration can be recovered
501 later in case of failure.
502 """
503 # Wait for jobs (+subjobs) to be submitted so that all information is filled. Since the parent CAF object asynchronously
504 # submits the jobs this might need to wait a while.
505 while any(map(lambda j: j.status == "init", self._collector_jobs.values())):
506 B2DEBUG(29, "Some Collector Jobs still in 'init' state. Waiting...")
507 time.sleep(5)
508
509 for collection_name, job in self._collector_jobs.items():
510 collector_job_output_file_name = self.calibration.collections[collection_name].job_config
511 output_file = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir,
512 collection_name, collector_job_output_file_name)
513 job.dump_to_json(output_file)
514
STL class.

◆ _increment_iteration()

_increment_iteration ( self)
protected
 

Definition at line 569 of file state_machines.py.

569 def _increment_iteration(self):
570 """
571 """
572 self.iteration += 1
573 self.calibration.iteration = self.iteration
574

◆ _iov_requested()

_iov_requested ( self)
protected
 

Definition at line 526 of file state_machines.py.

526 def _iov_requested(self):
527 """
528 """
529 if self.iov_to_calibrate:
530 B2DEBUG(20, f"Overall IoV {self.iov_to_calibrate} requested for calibration: {self.calibration.name}.")
531 return True
532 else:
533 B2DEBUG(20, f"No overall IoV requested for calibration: {self.calibration.name}.")
534 return False
535

◆ _log_new_state()

_log_new_state ( self,
** kwargs )
protected
 

Definition at line 655 of file state_machines.py.

655 def _log_new_state(self, **kwargs):
656 """
657 """
658 B2INFO(f"Calibration Machine {self.calibration.name} moved to state {kwargs['new_state'].name}.")
659

◆ _make_collector_path()

_make_collector_path ( self,
name,
collection )
protected
Creates a basf2 path for the correct collector and serializes it in the
self.output_dir/<calibration_name>/<iteration>/paths directory

Definition at line 696 of file state_machines.py.

696 def _make_collector_path(self, name, collection):
697 """
698 Creates a basf2 path for the correct collector and serializes it in the
699 self.output_dir/<calibration_name>/<iteration>/paths directory
700 """
701 path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
702 # Automatically overwrite any previous directory
703 create_directories(path_output_dir)
704 path_file_name = collection.collector.name() + '.path'
705 path_file_name = path_output_dir / path_file_name
706 # Create empty path and add collector to it
707 coll_path = create_path()
708 coll_path.add_module(collection.collector)
709 # Dump the basf2 path to file
710 with open(path_file_name, 'bw') as serialized_path_file:
711 pickle.dump(serialize_path(coll_path), serialized_path_file)
712 # Return the pickle file path for addition to the input sandbox
713 return str(path_file_name.absolute())
714

◆ _make_output_dir()

_make_output_dir ( self)
protected
Creates the overall root directory of the Calibration. Will not overwrite if it already exists.

Definition at line 689 of file state_machines.py.

689 def _make_output_dir(self):
690 """
691 Creates the overall root directory of the Calibration. Will not overwrite if it already exists.
692 Also creates s
693 """
694 create_directories(self.root_dir, overwrite=False)
695

◆ _make_pre_collector_path()

_make_pre_collector_path ( self,
name,
collection )
protected
Creates a basf2 path for the collector's setup path (Collection.pre_collector_path) and serializes it in the
self.output_dir/<calibration_name>/<iteration>/<collector_output>/<name> directory.

Definition at line 715 of file state_machines.py.

715 def _make_pre_collector_path(self, name, collection):
716 """
717 Creates a basf2 path for the collectors setup path (Collection.pre_collector_path) and serializes it in the
718 self.output_dir/<calibration_name>/<iteration>/<colector_output>/<name> directory.
719 """
720 path_output_dir = self.root_dir.joinpath(str(self.iteration), self.collector_input_dir, name)
721 coll_path = collection.pre_collector_path
722 path_file_name = 'pre_collector.path'
723 path_file_name = os.path.join(path_output_dir, path_file_name)
724 # Dump the basf2 path to file
725 with open(path_file_name, 'bw') as serialized_path_file:
726 pickle.dump(serialize_path(coll_path), serialized_path_file)
727 # Return the pickle file path for addition to the input sandbox
728 return path_file_name
729

◆ _no_require_iteration()

_no_require_iteration ( self)
protected
 

Definition at line 631 of file state_machines.py.

631 def _no_require_iteration(self):
632 """
633 """
634 if self._require_iteration() and self._below_max_iterations():
635 return False
636 elif self._require_iteration() and not self._below_max_iterations():
637 B2INFO(f"Reached maximum number of iterations ({self.calibration.max_iterations}), will complete now.")
638 return True
639 elif not self._require_iteration():
640 return True
641

◆ _prepare_final_db()

_prepare_final_db ( self)
protected
Take the last iteration's outputdb and copy it to a more easily findable place.

Definition at line 915 of file state_machines.py.

915 def _prepare_final_db(self):
916 """
917 Take the last iteration's outputdb and copy it to a more easily findable place.
918 """
919 database_location = self.root_dir.joinpath(str(self.iteration),
920 self.calibration.alg_output_dir,
921 'outputdb')
922 final_database_location = self.root_dir.joinpath('outputdb')
923 if final_database_location.exists():
924 B2INFO(f"Removing previous final output database for {self.calibration.name} before copying new one.")
925 shutil.rmtree(final_database_location)
926 shutil.copytree(database_location, final_database_location)
927
928

◆ _recover_collector_jobs()

_recover_collector_jobs ( self)
protected
Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.

Definition at line 515 of file state_machines.py.

515 def _recover_collector_jobs(self):
516 """
517 Recovers the `Job` object for the collector from a JSON file in the event that we are starting from a reset.
518 """
519 for collection_name, collection in self.calibration.collections.items():
520 output_file = self.root_dir.joinpath(str(self.iteration),
521 self.collector_input_dir,
522 collection_name,
523 collection.job_config)
524 self._collector_jobs[collection_name] = Job.from_json(output_file)
525

◆ _require_iteration()

_require_iteration ( self)
protected
 

Definition at line 642 of file state_machines.py.

642 def _require_iteration(self):
643 """
644 """
645 iteration_called = False
646 for alg_name, results in self._algorithm_results[self.iteration].items():
647 for result in results:
648 if result.result == CalibrationAlgorithm.c_Iterate:
649 iteration_called = True
650 break
651 if iteration_called:
652 break
653 return iteration_called
654

◆ _resolve_file_paths()

_resolve_file_paths ( self)
protected
 

Definition at line 536 of file state_machines.py.

536 def _resolve_file_paths(self):
537 """
538 """
539 pass
540

◆ _run_algorithms()

_run_algorithms ( self)
protected
Runs the Calibration Algorithms for this calibration machine.

Will run them sequentially locally (possible benefits to using a
processing pool for low memory algorithms later on.)

Definition at line 851 of file state_machines.py.

851 def _run_algorithms(self):
852 """
853 Runs the Calibration Algorithms for this calibration machine.
854
855 Will run them sequentially locally (possible benefits to using a
856 processing pool for low memory algorithms later on.)
857 """
858 # Get an instance of the Runner for these algorithms and run it
859 algs_runner = self.calibration.algorithms_runner(name=self.calibration.name)
860 algs_runner.algorithms = self.calibration.algorithms
861 algorithm_output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
862 output_database_dir = algorithm_output_dir.joinpath("outputdb")
863 # Remove it, if we failed previously, to start clean
864 if algorithm_output_dir.exists():
865 B2INFO(f"Output directory for {self.calibration.name} already exists from a previous CAF attempt. "
866 f"Deleting and recreating {algorithm_output_dir}.")
867 create_directories(algorithm_output_dir)
868 B2INFO(f"Output local database for {self.calibration.name} will be stored at {output_database_dir}.")
869 algs_runner.output_database_dir = output_database_dir
870 algs_runner.output_dir = self.root_dir.joinpath(str(self.iteration), self.calibration.alg_output_dir)
871 input_files = []
872
873 for job in self._collector_jobs.values():
874 if job.subjobs:
875 for subjob in job.subjobs.values():
876 for pattern in subjob.output_patterns:
877 input_files.extend(glob.glob(os.path.join(subjob.output_dir, pattern)))
878 else:
879 for pattern in job.output_patterns:
880 input_files.extend(glob.glob(os.path.join(job.output_dir, pattern)))
881
882 algs_runner.input_files = input_files
883
884 # Add any user defined database chain for this calibration
885 algs_runner.database_chain = self.calibration.database_chain
886
887 # Here we add the finished databases of previous calibrations that we depend on.
888 # We can assume that the databases exist as we can't be here until they have returned
889 list_dependent_databases = []
890 for dependency in self.calibration.dependencies:
891 database_dir = os.path.join(os.getcwd(), dependency.name, 'outputdb')
892 B2INFO(f"Adding local database from {dependency.name} for use by {self.calibration.name}.")
893 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
894
895 # Add previous iteration databases from this calibration
896 if self.iteration > 0:
897 previous_iteration_dir = self.root_dir.joinpath(str(self.iteration - 1))
898 database_dir = os.path.join(previous_iteration_dir, self.calibration.alg_output_dir, 'outputdb')
899 list_dependent_databases.append((os.path.join(database_dir, 'database.txt'), database_dir))
900 B2INFO(f"Adding local database from previous iteration of {self.calibration.name}.")
901 algs_runner.dependent_databases = list_dependent_databases
902
903 algs_runner.ignored_runs = self.calibration.ignored_runs
904
905 try:
906 algs_runner.run(self.iov_to_calibrate, self.iteration)
907 except Exception as err:
908 print(err)
909 # We directly set the state without triggering the transition because normally we fail based on checking the algorithm
910 # results. But here we had an actual exception so we just force into failure instead.
911 self._state = State("algorithms_failed")
912 self._algorithm_results[self.iteration] = algs_runner.results
913 self._runner_final_state = algs_runner.final_state
914

◆ _runner_failed()

_runner_failed ( self)
protected
Returns:
    bool: If AlgorithmsRunner failed return True.

Definition at line 598 of file state_machines.py.

598 def _runner_failed(self):
599 """
600 Returns:
601 bool: If AlgorithmsRunner failed return True.
602 """
603 if self._runner_final_state == AlgorithmsRunner.FAILED:
604 return True
605 else:
606 return False
607

◆ _runner_not_failed()

_runner_not_failed ( self)
protected
Returns:
    bool: If AlgorithmsRunner succeeded return True.

Definition at line 591 of file state_machines.py.

591 def _runner_not_failed(self):
592 """
593 Returns:
594 bool: If AlgorithmsRunner succeeded return True.
595 """
596 return not self._runner_failed()
597

◆ _submit_collections()

_submit_collections ( self)
protected
 

Definition at line 624 of file state_machines.py.

624 def _submit_collections(self):
625 """
626 """
627 self.calibration.jobs_to_submit.extend(list(self._collector_jobs.values()))
628 self._collector_timing["start"] = time.time()
629 self._collector_timing["last_update"] = time.time()
630

◆ _trigger()

_trigger ( self,
transition_name,
transition_dict,
** kwargs )
protectedinherited
Runs the transition logic. Callbacks are evaluated in the order:
conditions -> before -> <new state set here> -> after.

Definition at line 316 of file state_machines.py.

316 def _trigger(self, transition_name, transition_dict, **kwargs):
317 """
318 Runs the transition logic. Callbacks are evaluated in the order:
319 conditions -> before -> <new state set here> -> after.
320 """
321 dest, conditions, before_callbacks, after_callbacks = (
322 transition_dict["dest"],
323 transition_dict["conditions"],
324 transition_dict["before"],
325 transition_dict["after"]
326 )
327 # Returns True only if every condition returns True when called
328 if all(map(lambda condition: self._callback(condition, **kwargs), conditions)):
329 for before_func in before_callbacks:
330 self._callback(before_func, **kwargs)
331
332 self.state = dest
333 for after_func in after_callbacks:
334 self._callback(after_func, **kwargs)
335 else:
336 raise ConditionError(f"Transition '{transition_name}' called for but one or more conditions "
337 "evaluated False")
338

◆ _update_cal_state()

_update_cal_state ( self,
** kwargs )
protected
update calibration state

Definition at line 481 of file state_machines.py.

481 def _update_cal_state(self, **kwargs):
482 """update calibration state"""
483 self.calibration.state = str(kwargs["new_state"])
484

◆ add_state()

add_state ( self,
state,
enter = None,
exit = None )
inherited
Adds a single state to the list of possible ones.
Should be a unique string or a State object with a unique name.

Definition at line 189 of file state_machines.py.

def add_state(self, state, enter=None, exit=None):
    """
    Adds a single state to the list of possible ones.
    Should be a unique string or a State object with a unique name.
    """
    if isinstance(state, str):
        # Promote the bare name to a State object and register that instead
        self.add_state(State(state, enter, exit))
        return
    if not isinstance(state, State):
        B2WARNING(f"You asked to add a state {state} but it wasn't a State or str object")
        return
    if state.name in self.states:
        B2WARNING(f"You asked to add a state {state} but it was already in the machine states.")
    else:
        self.states[state.name] = state

◆ add_transition()

add_transition ( self,
trigger,
source,
dest,
conditions = None,
before = None,
after = None )
inherited
Adds a single transition to the dictionary of possible ones.
Trigger is the method name that begins the transition between the
source state and the destination state.

The condition is an optional function that returns True or False
depending on the current state/input.

Definition at line 260 of file state_machines.py.

def add_transition(self, trigger, source, dest, conditions=None, before=None, after=None):
    """
    Adds a single transition to the dictionary of possible ones.
    Trigger is the method name that begins the transition between the
    source state and the destination state.

    The condition is an optional function that returns True or False
    depending on the current state/input.

    Parameters:
        trigger (str): Name under which the transition is registered.
        source (str): Name of an already-registered source state.
        dest (str): Name of an already-registered destination state.
        conditions: Optional callable or collection of callables; defaults
            to `Machine.default_condition` (always True) when empty.
        before: Optional callable or collection run before the state change.
        after: Optional callable or collection run after the state change.

    Raises:
        KeyError: If source or dest is not a registered state name.
    """
    def as_callback_list(callbacks):
        # Normalise None / single callable / collection into a plain list.
        # (Previously this logic was duplicated for each callback kind.)
        if not callbacks:
            return []
        if isinstance(callbacks, (list, tuple, set)):
            return list(callbacks)
        return [callbacks]

    transition_dict = {}
    try:
        transition_dict["source"] = self.states[source]
        transition_dict["dest"] = self.states[dest]
    except KeyError as err:
        B2WARNING("Tried to add a transition where the source or dest isn't in the list of states")
        raise err
    # An empty condition list would make all(...) trivially True anyway, but
    # keep the explicit default for parity with the documented behaviour.
    transition_dict["conditions"] = as_callback_list(conditions) or [Machine.default_condition]
    transition_dict["before"] = as_callback_list(before)
    transition_dict["after"] = as_callback_list(after)
    self.transitions[trigger].append(transition_dict)

◆ automatic_transition()

automatic_transition ( self)
Automatically try all transitions out of this state once; the 'fail' transition is tried last.

Definition at line 671 of file state_machines.py.

def automatic_transition(self):
    """
    Automatically try all transitions out of this state once; the 'fail'
    transition is kept back as a last resort.
    """
    possible_transitions = self.get_transitions(self.state)
    for transition in possible_transitions:
        if transition == "fail":
            continue
        try:
            getattr(self, transition)()
            return
        except ConditionError:
            continue
    # Nothing ordinary fired: fall back to 'fail' if the state offers one
    if "fail" in possible_transitions:
        getattr(self, "fail")()
    else:
        raise MachineError(f"Failed to automatically transition out of {self.state} state.")

◆ default_condition()

default_condition ( ** kwargs)
staticinherited
Method to always return True.

Definition at line 254 of file state_machines.py.

def default_condition(**kwargs):
    """
    Condition used when a transition declares none: always passes.
    """
    return True

◆ dependencies_completed()

dependencies_completed ( self)
Condition function to check that the dependencies of our calibration are in the 'completed' state.
Technically only need to check explicit dependencies.

Definition at line 660 of file state_machines.py.

def dependencies_completed(self):
    """
    Condition function to check that the dependencies of our calibration are in the 'completed' state.
    Technically only need to check explicit dependencies.
    """
    # Vacuously True when there are no dependencies, same as the loop form
    return all(dependency.state == dependency.end_state
               for dependency in self.calibration.dependencies)

◆ files_containing_iov()

files_containing_iov ( self,
file_paths,
files_to_iovs,
iov )
Lookup function that returns all files from the file_paths that
overlap with this IoV.

Definition at line 485 of file state_machines.py.

def files_containing_iov(self, file_paths, files_to_iovs, iov):
    """
    Lookup function that returns all files from the file_paths that
    overlap with this IoV.
    """
    # A file qualifies only if its Exp,Run range overlaps the requested IoV
    # AND it is one of the requested paths.
    return {file_path
            for file_path, file_iov in files_to_iovs.items()
            if file_iov.overlaps(iov) and file_path in file_paths}

◆ get_transition_dict()

get_transition_dict ( self,
state,
transition )
inherited
Returns the transition dictionary for a state and transition out of it.

Definition at line 357 of file state_machines.py.

def get_transition_dict(self, state, transition):
    """
    Returns the transition dictionary for a state and transition out of it.
    """
    for candidate in self.transitions[transition]:
        if candidate["source"] == state:
            return candidate
    # Fell through every registered dict: no such transition from this state
    raise KeyError(f"No transition from state {state} with the name {transition}.")

◆ get_transitions()

get_transitions ( self,
source )
inherited
Returns allowed transitions from a given state.

Definition at line 346 of file state_machines.py.

def get_transitions(self, source):
    """
    Returns allowed transitions from a given state.
    """
    # Preserve trigger registration order (dict iteration order)
    return [trigger
            for trigger, transition_dicts in self.transitions.items()
            for transition_dict in transition_dicts
            if transition_dict["source"] == source]

◆ initial_state() [1/2]

initial_state ( self)
inherited
The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.

Definition at line 205 of file state_machines.py.

def initial_state(self):
    """
    The initial state of the machine. Needs a special property to prevent trying to run on_enter callbacks when set.
    """
    return self._initial_state

◆ initial_state() [2/2]

initial_state ( self,
state )
inherited
 

Definition at line 212 of file state_machines.py.

def initial_state(self, state):
    """
    Setter for the initial state. Also resets the current state, bypassing
    any enter/exit callbacks.
    """
    if state not in self.states:
        raise KeyError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!")
    state_object = self.states[state]
    self._initial_state = state_object
    # Keep the current state in sync with the freshly chosen initial one
    self._state = state_object

◆ save_graph()

save_graph ( self,
filename,
graphname )
inherited
Does a simple dot file creation to visualise states and transitions.

Definition at line 368 of file state_machines.py.

def save_graph(self, filename, graphname):
    """
    Does a simple dot file creation to visualise states and transitions.
    """
    with open(filename, "w") as dotfile:
        dotfile.write(f"digraph {graphname} {{\n")
        # One node per state...
        for state_name in self.states:
            dotfile.write(f'"{state_name}" [shape=ellipse, color=black]\n')
        # ...and one labelled edge per registered transition
        for trigger, transition_dicts in self.transitions.items():
            for transition in transition_dicts:
                source_name = transition["source"].name
                dest_name = transition["dest"].name
                dotfile.write(f'"{source_name}" -> "{dest_name}" [label="{trigger}"]\n')
        dotfile.write("}\n")

◆ state() [1/2]

state ( self)
inherited
        The current state of the machine. Actually a `property` decorator. It will call the exit method of the
        current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
        either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).

Definition at line 223 of file state_machines.py.

def state(self):
    """
    The current state of the machine. Actually a `property` decorator. It will call the exit method of the
    current state and enter method of the new one. To get around the behaviour e.g. for setting initial states,
    either use the `initial_state` property or directly set the _state attribute itself (at your own risk!).
    """
    return self._state

◆ state() [2/2]

state ( self,
state )
inherited
 

Definition at line 232 of file state_machines.py.

def state(self, state):
    """
    Setter for the current state. Runs the exit callbacks of the state being
    left, then the enter callbacks of the new state, then records the change.

    Parameters:
        state (str or State): Target state, given by name or by an object
            with a ``name`` attribute; must be registered in `states`.

    Raises:
        MachineError: If the requested state is not in the 'states' attribute.
    """
    if isinstance(state, str):
        state_name = state
    else:
        state_name = state.name

    # Keep the try block narrow: only the lookup should map KeyError to
    # MachineError. Previously the callbacks ran inside this try, so a
    # KeyError raised by a callback was misreported as an unknown state.
    try:
        new_state = self.states[state_name]
    except KeyError as err:
        # Chain the original error for debuggability; message grammar fixed
        # ("which not in" -> "which is not in").
        raise MachineError(f"Attempted to set state to '{state}' which is not in the 'states' attribute!") from err

    # Run exit callbacks of current state
    for callback in self.state.on_exit:
        callback(prior_state=self.state, new_state=new_state)
    # Run enter callbacks of new state
    for callback in new_state.on_enter:
        callback(prior_state=self.state, new_state=new_state)
    # Set the state
    self._state = new_state

Member Data Documentation

◆ _algorithm_results

dict _algorithm_results = {}
protected

Results of each iteration for all algorithms of this calibration.

Definition at line 434 of file state_machines.py.

◆ _collector_jobs

dict _collector_jobs = {}
protected

The collector jobs used for submission.

Definition at line 448 of file state_machines.py.

◆ _collector_timing

dict _collector_timing = {}
protected

Times of various useful updates to the collector job, e.g.

start, elapsed, last update. Used to periodically call update_status on the collector job and find out an overall number of jobs remaining plus an estimated remaining time.

Definition at line 445 of file state_machines.py.

◆ _initial_state

dict _initial_state = State(initial_state)
protectedinherited

Actual attribute holding initial state for this machine.

Definition at line 182 of file state_machines.py.

◆ _runner_final_state

_runner_final_state = None
protected

Final state of the algorithm runner for the current iteration.

Definition at line 436 of file state_machines.py.

◆ _state

dict _state = self.initial_state
protectedinherited

Actual attribute holding the Current state.

Definition at line 185 of file state_machines.py.

◆ algorithm_output_dir

str algorithm_output_dir = 'algorithm_output'
static

output directory of algorithm

Definition at line 394 of file state_machines.py.

◆ calibration

calibration = calibration

Calibration object whose state we are modelling.

Definition at line 425 of file state_machines.py.

◆ collector_backend

collector_backend = None

Backend used for this calibration machine collector.

Definition at line 432 of file state_machines.py.

◆ collector_input_dir

str collector_input_dir = 'collector_input'
static

input directory of collector

Definition at line 390 of file state_machines.py.

◆ collector_output_dir

str collector_output_dir = 'collector_output'
static

output directory of collector

Definition at line 392 of file state_machines.py.

◆ default_states

default_states
Initial value:
= [State("init", enter=[self._update_cal_state,
self._log_new_state]),
State("running_collector", enter=[self._update_cal_state,
self._log_new_state]),
State("collector_failed", enter=[self._update_cal_state,
self._log_new_state]),
State("collector_completed", enter=[self._update_cal_state,
self._log_new_state]),
State("running_algorithms", enter=[self._update_cal_state,
self._log_new_state]),
State("algorithms_failed", enter=[self._update_cal_state,
self._log_new_state]),
State("algorithms_completed", enter=[self._update_cal_state,
self._log_new_state]),
State("completed", enter=[self._update_cal_state,
self._log_new_state]),
State("failed", enter=[self._update_cal_state,
self._log_new_state])
]

States that are defaults to the CalibrationMachine (could override later)

Definition at line 402 of file state_machines.py.

◆ initial_state

initial_state = initial_state
inherited

Pointless docstring since it's a property.

Definition at line 178 of file state_machines.py.

◆ iov_to_calibrate

iov_to_calibrate = iov_to_calibrate

IoV to be executed, currently will loop over all runs in IoV.

Definition at line 438 of file state_machines.py.

◆ iteration

iteration = iteration

Which iteration step are we in.

Definition at line 430 of file state_machines.py.

◆ root_dir

root_dir = Path(os.getcwd(), calibration.name)

root directory for this Calibration

Definition at line 440 of file state_machines.py.

◆ state

state = dest
inherited

Current State of machine.

Definition at line 332 of file state_machines.py.

◆ states

dict states = {}
inherited

Valid states for this machine.

Definition at line 172 of file state_machines.py.

◆ transitions

transitions = defaultdict(list)
inherited

Allowed transitions between states.

Definition at line 187 of file state_machines.py.


The documentation for this class was generated from the following file: