Belle II Software development
SimpleRunByRun Class Reference
Inheritance diagram for SimpleRunByRun:
AlgorithmStrategy

Public Member Functions

 __init__ (self, algorithm)
 
 run (self, iov, iteration, queue)
 
 setup_from_dict (self, params)
 
 is_valid (self)
 
 find_iov_gaps (self)
 
 any_failed_iov (self)
 
 send_result (self, result)
 
 send_final_state (self, state)
 

Public Attributes

 machine = AlgorithmMachine(self.algorithm)
 `caf.state_machines.AlgorithmMachine` used to help set up and execute the CalibrationAlgorithm. It gets set up properly in `run`.
 
 algorithm = algorithm
 Algorithm() class that we're running.
 
list input_files = []
 Collector output files, will contain all files returned by the output patterns.
 
str output_dir = ""
 The algorithm output directory which is mostly used to store the stdout file.
 
str output_database_dir = ""
 The output database directory for the localdb that the algorithm will commit to.
 
list database_chain = []
 User defined database chain, i.e. the default global tag, or localdbs/tags if you have them for custom alignment etc.
 
list dependent_databases = []
 CAF created local databases from previous calibrations that this calibration/algorithm depends on.
 
list ignored_runs = []
 Runs that will not be included in ANY execution of the algorithm.
 
list results = []
 The list of results objects which will be sent out before the end.
 
 queue = None
 The multiprocessing Queue we use to pass back results one at a time.
 

Static Public Attributes

dict usable_params = {}
 The params that you could set on the Algorithm object which this Strategy would use.
 
list required_attrs
 Required attributes that must exist before the strategy can run properly.
 
list required_true_attrs
 Attributes that must have a value that returns True when tested by `is_valid`.
 
list allowed_granularities = ["run", "all"]
 Granularity of collector that can be run by this algorithm properly.
 
str FINISHED_RESULTS = "DONE"
 Signal value that is put into the Queue when there are no more results left.
 
str COMPLETED = "COMPLETED"
 Completed state.
 
str FAILED = "FAILED"
 Failed state.
 

Detailed Description

Algorithm strategy to do run-by-run calibration of collected data.
Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.

This strategy differs from `SequentialRunByRun` in that it *will not merge run data* if the algorithm returns
'not enough data' on the current run.

Once an execution on a run returns *any* result ('iterate', 'ok', 'not_enough_data', or 'failure'), we move on to the
next run (if any are left).
Committing of payloads to the outputdb only happens for 'iterate' or 'ok' return codes.

.. important:: Unlike most other strategies, this one won't immediately fail and return if a run returns a 'failure' exit
               code.
               The failure will prevent iteration/successful completion of the CAF though.

.. warning:: Since this strategy doesn't try to merge data from runs, if *any* run in your input data doesn't contain
             enough data to complete the algorithm successfully, you won't be able to get a successful calibration.
             The CAF then won't allow you to iterate this calibration, or pass the constants onward to another calibration.
             However, you will still have the database created that covers all the successful runs.

This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
a CalibrationAlgorithm C++ class directly.
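
The control flow described above can be sketched without any basf2 dependency. This is a minimal illustration, not the CAF implementation: the `Result` enum, the `simple_run_by_run` function and the `execute` callable are hypothetical stand-ins for `AlgResult`, the strategy and the `AlgorithmMachine` execution step.

```python
from enum import Enum


class Result(Enum):
    """Stand-in for the CAF result codes (hypothetical, not the real AlgResult)."""
    ok = "ok"
    iterate = "iterate"
    not_enough_data = "not_enough_data"
    failure = "failure"


def simple_run_by_run(runs, execute):
    """Execute each run on its own; never merge a run into the next one.

    `execute` is a hypothetical callable mapping one run to a Result.
    Returns (committed_runs, all_results).
    """
    committed, results = [], []
    for run in sorted(runs):
        code = execute(run)
        results.append((run, code))
        # Only 'ok' and 'iterate' lead to a payload commit.
        if code in (Result.ok, Result.iterate):
            committed.append(run)
        # 'not_enough_data' and 'failure' are recorded, then we move on.
    return committed, results


# Toy per-run outcomes keyed by (experiment, run).
outcomes = {
    (1, 1): Result.ok,
    (1, 2): Result.not_enough_data,
    (1, 3): Result.failure,
    (1, 4): Result.iterate,
}
committed, results = simple_run_by_run(outcomes, outcomes.get)
```

In this toy input every run produces a result entry, but only the first and last runs would have payloads committed, which mirrors the warning above about runs with too little data.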

Definition at line 541 of file strategies.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
algorithm )
 

Definition at line 571 of file strategies.py.

def __init__(self, algorithm):
    """
    """
    super().__init__(algorithm)

    # AlgorithmMachine used to set up and execute the CalibrationAlgorithm; configured in run()
    self.machine = AlgorithmMachine(self.algorithm)


Member Function Documentation

◆ any_failed_iov()

any_failed_iov ( self)
inherited
Returns:
    bool: If any result in the current results list has a failed algorithm code we return True

Definition at line 152 of file strategies.py.

def any_failed_iov(self):
    """
    Returns:
        bool: If any result in the current results list has a failed algorithm code we return True
    """
    failed_results = []
    for result in self.results:
        if result.result == AlgResult.failure.value or result.result == AlgResult.not_enough_data.value:
            failed_results.append(result)
    if failed_results:
        B2WARNING("Failed results found.")
        for result in failed_results:
            if result.result == AlgResult.failure.value:
                B2ERROR(f"c_Failure returned for {result.iov}.")
            elif result.result == AlgResult.not_enough_data.value:
                B2WARNING(f"c_NotEnoughData returned for {result.iov}.")
        return True
    else:
        return False

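
Note that the listing treats both 'failure' and 'not_enough_data' as failed. A minimal standalone sketch of that check follows; the `AlgResult` enum here is a stand-in with illustrative values (assumed, not taken from basf2), and `Result` and `any_failed` are hypothetical names.

```python
from collections import namedtuple
from enum import Enum


class AlgResult(Enum):
    """Stand-in enum; the numeric values are illustrative assumptions."""
    ok = 0
    iterate = 1
    not_enough_data = 2
    failure = 3


# Hypothetical result record with the two fields the check reads.
Result = namedtuple("Result", ["iov", "result"])


def any_failed(results):
    """Return True if any result carries a failure or not_enough_data code."""
    bad_codes = {AlgResult.failure.value, AlgResult.not_enough_data.value}
    return any(r.result in bad_codes for r in results)


results = [
    Result("exp 1, run 1", AlgResult.ok.value),
    Result("exp 1, run 2", AlgResult.not_enough_data.value),
]
flag_mixed = any_failed(results)      # one run lacked data
flag_clean = any_failed(results[:1])  # only the successful run
```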

◆ find_iov_gaps()

find_iov_gaps ( self)
inherited
Finds and prints the current gaps between the IoVs of the strategy results. Basically these are the IoVs
not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps
within the same experiment are found.

Returns:
    iov_gaps(list[IoV])

Definition at line 132 of file strategies.py.

def find_iov_gaps(self):
    """
    Finds and prints the current gaps between the IoVs of the strategy results. Basically these are the IoVs
    not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps
    within the same experiment are found.

    Returns:
        iov_gaps(list[IoV])
    """
    iov_gaps = find_gaps_in_iov_list(sorted([result.iov for result in self.results]))
    if iov_gaps:
        gap_msg = ["Found gaps between IoVs of algorithm results (regardless of result)."]
        gap_msg.append("You may have requested these gaps deliberately by not passing in data containing these runs.")
        gap_msg.append("This may not be a problem, but you will not have payloads defined for these IoVs")
        gap_msg.append("unless you edit the final database.txt yourself.")
        B2INFO_MULTILINE(gap_msg)
        for iov in iov_gaps:
            B2INFO(f"{iov} not covered by any execution of the algorithm.")
    return iov_gaps

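
To illustrate what "gaps within the same experiment" means, here is a simplified sketch. It is not the `find_gaps_in_iov_list` implementation from the CAF; the `IoV` namedtuple and the `find_gaps` helper are hypothetical, and closed (non open-ended) IoVs are assumed.

```python
from collections import namedtuple

# Hypothetical IoV record: (first experiment, first run, last experiment, last run).
IoV = namedtuple("IoV", ["exp_low", "run_low", "exp_high", "run_high"])


def find_gaps(iovs):
    """Return the uncovered run ranges between consecutive IoVs of one experiment."""
    gaps = []
    ordered = sorted(iovs)
    for previous, current in zip(ordered, ordered[1:]):
        # Gaps across an experiment boundary cannot be determined, so skip them.
        same_experiment = previous.exp_high == current.exp_low
        if same_experiment and current.run_low > previous.run_high + 1:
            gaps.append(IoV(previous.exp_high, previous.run_high + 1,
                            current.exp_low, current.run_low - 1))
    return gaps


covered = [IoV(1, 1, 1, 1), IoV(1, 2, 1, 2), IoV(1, 5, 1, 5), IoV(2, 3, 2, 3)]
gaps = find_gaps(covered)
```

Here runs 3 and 4 of experiment 1 are reported as a gap, while the missing runs between experiment 1 and experiment 2 are not, because the last run of an experiment is not known from the IoVs alone.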

◆ is_valid()

is_valid ( self)
inherited
Returns:
    bool: Whether or not this strategy has been set up correctly with all its necessary attributes.

Definition at line 114 of file strategies.py.

def is_valid(self):
    """
    Returns:
        bool: Whether or not this strategy has been set up correctly with all its necessary attributes.
    """
    B2INFO("Checking validity of current AlgorithmStrategy setup.")
    # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
    for attribute_name in self.required_attrs:
        if not hasattr(self, attribute_name):
            B2ERROR(f"AlgorithmStrategy attribute {attribute_name} doesn't exist.")
            return False
    # Check if any attributes that need actual values haven't been set or were empty
    for attribute_name in self.required_true_attrs:
        if not getattr(self, attribute_name):
            B2ERROR(f"AlgorithmStrategy attribute {attribute_name} returned False.")
            return False
    return True

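
The two-stage check (attribute exists, then attribute is truthy) can be tried on a toy class. `ToyStrategy` and its shortened attribute lists are hypothetical and only mimic the pattern of the listing above.

```python
class ToyStrategy:
    """Hypothetical stand-in that copies only the validity-check pattern."""

    # Must exist, but may be empty (e.g. "" or []).
    required_attrs = ["algorithm", "output_dir", "input_files", "ignored_runs"]
    # Must exist and be truthy.
    required_true_attrs = ["algorithm", "output_dir", "input_files"]

    def __init__(self):
        self.algorithm = "toy_algorithm"
        self.output_dir = ""
        self.input_files = []
        self.ignored_runs = []

    def is_valid(self):
        if any(not hasattr(self, name) for name in self.required_attrs):
            return False
        return all(getattr(self, name) for name in self.required_true_attrs)


strategy = ToyStrategy()
before = strategy.is_valid()  # output_dir and input_files are still empty
strategy.output_dir = "algorithm_output"
strategy.input_files = ["CollectorOutput.root"]
after = strategy.is_valid()   # ignored_runs may stay empty
```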

◆ run()

run ( self,
iov,
iteration,
queue )
Runs the algorithm machine over the collected data and fills the results.

Reimplemented from AlgorithmStrategy.

Definition at line 579 of file strategies.py.

def run(self, iov, iteration, queue):
    """
    Runs the algorithm machine over the collected data and fills the results.
    """

    if not self.is_valid():
        raise StrategyError("This AlgorithmStrategy was not set up correctly!")
    self.queue = queue

    B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
    # Now add all the necessary parameters for a strategy to run
    machine_params = {}
    machine_params["database_chain"] = self.database_chain
    machine_params["dependent_databases"] = self.dependent_databases
    machine_params["output_dir"] = self.output_dir
    machine_params["output_database_dir"] = self.output_database_dir
    machine_params["input_files"] = self.input_files
    machine_params["ignored_runs"] = self.ignored_runs
    self.machine.setup_from_dict(machine_params)
    # Start moving through machine states
    B2INFO(f"Starting AlgorithmMachine of {self.algorithm.name}.")
    self.machine.setup_algorithm(iteration=iteration)
    # After this point, the logging is in the stdout of the algorithm
    B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")

    all_runs_collected = set(runs_from_vector(self.algorithm.algorithm.getRunListFromAllData()))
    # If we were given a specific IoV to calibrate, we only execute the runs overlapping that IoV
    if iov:
        runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
    else:
        runs_to_execute = all_runs_collected

    # Remove the ignored runs from our run list to execute
    if self.ignored_runs:
        B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
        runs_to_execute.difference_update(set(self.ignored_runs))
    # Sets aren't ordered so let's go back to lists and sort
    runs_to_execute = sorted(runs_to_execute)

    # Is this the first time executing the algorithm?
    first_execution = True
    for exprun in runs_to_execute:
        if not first_execution:
            self.machine.setup_algorithm()
        current_runs = exprun
        apply_iov = iov_from_runs([current_runs])
        B2INFO(f"Executing on IoV = {apply_iov}.")
        self.machine.execute_runs(runs=[current_runs], iteration=iteration, apply_iov=apply_iov)
        first_execution = False
        B2INFO(f"Finished execution with result code {self.machine.result.result}.")
        # Does this count as a successful execution?
        if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
            # Commit the payloads and result
            B2INFO(f"Committing payloads for {iov_from_runs([current_runs])}.")
            self.machine.algorithm.algorithm.commit()
            self.results.append(self.machine.result)
            self.send_result(self.machine.result)
            self.machine.complete()
        # If it wasn't successful, was it due to lack of data in the runs?
        elif (self.machine.result.result == AlgResult.not_enough_data.value):
            B2INFO(f"There wasn't enough data in the IoV {iov_from_runs([current_runs])}.")
            self.results.append(self.machine.result)
            self.send_result(self.machine.result)
            self.machine.fail()
        elif self.machine.result.result == AlgResult.failure.value:
            B2ERROR(f"Failure exit code in the IoV {iov_from_runs([current_runs])}.")
            self.results.append(self.machine.result)
            self.send_result(self.machine.result)
            self.machine.fail()

    # Print any knowable gaps between result IoVs; if any are found, there is a problem.
    gaps = self.find_iov_gaps()
    # Dump them to a file for logging
    with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
        json.dump(gaps, f)

    self.send_final_state(self.COMPLETED)

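
The run selection at the start of `run` (restrict to the requested IoV, drop ignored runs, sort) can be sketched in isolation. This is an illustration under assumptions: `ExpRun`, `IoV`, `runs_in_iov` and `select_runs` are hypothetical stand-ins, not the CAF utilities, and only closed IoVs are handled.

```python
from collections import namedtuple

# Hypothetical records; only closed IoVs are handled in this sketch.
ExpRun = namedtuple("ExpRun", ["exp", "run"])
IoV = namedtuple("IoV", ["exp_low", "run_low", "exp_high", "run_high"])


def runs_in_iov(iov, runs):
    """Return the set of runs lying inside the closed IoV."""
    low = (iov.exp_low, iov.run_low)
    high = (iov.exp_high, iov.run_high)
    return {r for r in runs if low <= (r.exp, r.run) <= high}


def select_runs(all_runs, iov=None, ignored_runs=()):
    """Pick the runs to execute one by one, in sorted order."""
    runs = runs_in_iov(iov, all_runs) if iov else set(all_runs)
    runs.difference_update(ignored_runs)
    # Sets are unordered, so sort into a list before iterating.
    return sorted(runs)


collected = {ExpRun(1, 3), ExpRun(1, 1), ExpRun(1, 2), ExpRun(2, 1)}
selected = select_runs(collected, iov=IoV(1, 1, 1, 3), ignored_runs=[ExpRun(1, 2)])
everything = select_runs(collected)
```

Each entry of `selected` would then be executed on its own, with its own single-run IoV applied to any committed payloads.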

◆ send_final_state()

send_final_state ( self,
state )
inherited
Sends the final state of the strategy to the queue as a message of type "final_state".

Definition at line 176 of file strategies.py.

def send_final_state(self, state):
    """send final state"""
    self.queue.put({"type": "final_state", "value": state})


◆ send_result()

send_result ( self,
result )
inherited
Sends a single result to the queue as a message of type "result".

Definition at line 172 of file strategies.py.

def send_result(self, result):
    """send result"""
    self.queue.put({"type": "result", "value": result})

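
Both send methods put plain dictionaries with a "type" and a "value" key on the queue. A consumer could therefore be sketched as below. This is an assumption about how one might read the queue, not the CAF's own consumer; `drain` is a hypothetical helper and `queue.Queue` stands in for the multiprocessing queue.

```python
import queue


def drain(q):
    """Collect result messages until a final_state message arrives."""
    results, final_state = [], None
    while final_state is None:
        message = q.get(timeout=1)
        if message["type"] == "result":
            results.append(message["value"])
        elif message["type"] == "final_state":
            final_state = message["value"]
    return results, final_state


# Simulate what a strategy would send: one message per run, then the final state.
q = queue.Queue()
q.put({"type": "result", "value": "result for run 1"})
q.put({"type": "result", "value": "result for run 2"})
q.put({"type": "final_state", "value": "COMPLETED"})
results, final_state = drain(q)
```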

◆ setup_from_dict()

setup_from_dict ( self,
params )
inherited
Parameters:
    params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name.

Definition at line 106 of file strategies.py.

def setup_from_dict(self, params):
    """
    Parameters:
        params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name.
    """
    for attribute_name, value in params.items():
        setattr(self, attribute_name, value)

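
Because the method uses `setattr` for every key, a misspelled key does not raise an error; it creates a new attribute instead. A small sketch with a hypothetical `ToyStrategy` class shows this behaviour:

```python
class ToyStrategy:
    """Hypothetical stand-in that copies only the setup_from_dict pattern."""

    def __init__(self):
        self.output_dir = ""
        self.ignored_runs = []

    def setup_from_dict(self, params):
        for attribute_name, value in params.items():
            setattr(self, attribute_name, value)


strategy = ToyStrategy()
strategy.setup_from_dict({
    "output_dir": "algorithm_output",
    "ignored_runs": [(1, 2)],
    "outptu_dir": "typo",  # misspelled key: silently becomes a new attribute
})
```

Checking the dictionary keys against the known attribute names before calling the method is one way to catch such typos early.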

Member Data Documentation

◆ algorithm

algorithm = algorithm
inherited

Algorithm() class that we're running.

Definition at line 80 of file strategies.py.

◆ allowed_granularities

list allowed_granularities = ["run", "all"]
static, inherited

Granularity of collector that can be run by this algorithm properly.

Definition at line 65 of file strategies.py.

◆ COMPLETED

str COMPLETED = "COMPLETED"
static, inherited

Completed state.

Definition at line 71 of file strategies.py.

◆ database_chain

list database_chain = []
inherited

User defined database chain, i.e. the default global tag, or localdbs/tags if you have them for custom alignment etc.

Definition at line 88 of file strategies.py.

◆ dependent_databases

list dependent_databases = []
inherited

CAF created local databases from previous calibrations that this calibration/algorithm depends on.

Definition at line 90 of file strategies.py.

◆ FAILED

str FAILED = "FAILED"
static, inherited

Failed state.

Definition at line 74 of file strategies.py.

◆ FINISHED_RESULTS

str FINISHED_RESULTS = "DONE"
static, inherited

Signal value that is put into the Queue when there are no more results left.

Definition at line 68 of file strategies.py.

◆ ignored_runs

list ignored_runs = []
inherited

Runs that will not be included in ANY execution of the algorithm.

Usually set by Calibration.ignored_runs. The different strategies may handle the resulting run gaps differently.

Definition at line 93 of file strategies.py.

◆ input_files

list input_files = []
inherited

Collector output files, will contain all files returned by the output patterns.

Definition at line 82 of file strategies.py.

◆ machine

machine = AlgorithmMachine(self.algorithm)

`caf.state_machines.AlgorithmMachine` used to help set up and execute the CalibrationAlgorithm. It gets set up properly in `run`.

Definition at line 577 of file strategies.py.

◆ output_database_dir

str output_database_dir = ""
inherited

The output database directory for the localdb that the algorithm will commit to.

Definition at line 86 of file strategies.py.

◆ output_dir

str output_dir = ""
inherited

The algorithm output directory which is mostly used to store the stdout file.

Definition at line 84 of file strategies.py.

◆ queue

queue = None
inherited

The multiprocessing Queue we use to pass back results one at a time.

Definition at line 97 of file strategies.py.

◆ required_attrs

list required_attrs
static, inherited
Initial value:
= ["algorithm",
"database_chain",
"dependent_databases",
"output_dir",
"output_database_dir",
"input_files",
"ignored_runs"
]

Required attributes that must exist before the strategy can run properly.

Some are allowed to be values that return False when tested, e.g. "" or [].

Definition at line 48 of file strategies.py.

◆ required_true_attrs

list required_true_attrs
static, inherited
Initial value:
= ["algorithm",
"output_dir",
"output_database_dir",
"input_files"
]

Attributes that must have a value that returns True when tested by `is_valid`.

Definition at line 58 of file strategies.py.

◆ results

list results = []
inherited

The list of results objects which will be sent out before the end.

Definition at line 95 of file strategies.py.

◆ usable_params

dict usable_params = {}
static

The params that you could set on the Algorithm object which this Strategy would use.

Just here for documentation reasons.

Definition at line 569 of file strategies.py.


The documentation for this class was generated from the following file: