Belle II Software development
SeqAlgorithmsRunner Class Reference
Inheritance diagram for SeqAlgorithmsRunner:
SeqAlgorithmsRunner inherits from AlgorithmsRunner, which inherits from Runner.

Public Member Functions

 __init__ (self, name)
 
 run (self, iov, iteration)
 

Public Attributes

 name = name
 The name of this runner instance.
 
list input_files = []
 All of the output files made by the collector job and recovered by the "output_patterns".
 
list database_chain = []
 User-set databases; these can be used to apply your own constants and global tags.
 
list dependent_databases = []
 List of local databases created by previous CAF calibrations/iterations.
 
str output_database_dir = ""
 The directory of the local database we use to store algorithm payloads from this execution.
 
dict results = {}
 Algorithm results from each algorithm we execute.
 
 final_state = None
 Final state of runner.
 
 algorithms = None
 The list of algorithms that this runner executes.
 
str output_dir = ""
 Output directory of these algorithms, for logging mostly.
 

Static Public Attributes

str FAILED = "FAILED"
 State value indicating that the runner failed.
 
str COMPLETED = "COMPLETED"
 State value indicating that the runner completed successfully.
 

Static Protected Member Functions

 _run_strategy (strategy, iov, iteration, queue)
 

Detailed Description

 

Definition at line 94 of file runners.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
name )
 

Definition at line 98 of file runners.py.

    def __init__(self, name):
        """
        """
        super().__init__(name)

Member Function Documentation

◆ _run_strategy()

_run_strategy ( strategy,
iov,
iteration,
queue )
static protected
Runs the AlgorithmStrategy and sends back the results.

Definition at line 187 of file runners.py.

    def _run_strategy(strategy, iov, iteration, queue):
        """Runs the AlgorithmStrategy and sends back the results"""
        strategy.run(iov, iteration, queue)
        # Get the return codes of the algorithm for the IoVs found by the Process
        B2INFO(f"Finished Strategy for {strategy.algorithm.name}.")
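The dispatch in run() suggests that a strategy reports back through the queue as dictionaries whose "type" key is either "result" or "final_state". The following is a minimal sketch of a producer that follows that message shape. `ToyStrategy`, its payload strings, and the `drain` helper are hypothetical stand-ins, not the real `caf.strategies.AlgorithmStrategy`, and the standard-library `queue.SimpleQueue` is used only so that the sketch runs in a single process.

```python
import queue


class ToyStrategy:
    """Hypothetical stand-in for an AlgorithmStrategy (not the real caf class)."""

    COMPLETED = "COMPLETED"
    FAILED = "FAILED"

    def __init__(self, payloads):
        # Hypothetical per-IoV results that this toy strategy will report.
        self.payloads = list(payloads)

    def run(self, iov, iteration, out_queue):
        # One "result" message per payload, followed by one "final_state".
        for payload in self.payloads:
            out_queue.put({"type": "result", "value": payload})
        out_queue.put({"type": "final_state", "value": self.COMPLETED})


def drain(out_queue):
    """Collect messages using the same type dispatch as the runner's inner loop."""
    results, final_state = [], None
    while not out_queue.empty():
        message = out_queue.get()
        if message["type"] == "result":
            results.append(message["value"])
        elif message["type"] == "final_state":
            final_state = message["value"]
        else:
            raise ValueError(f"Unknown result output: {message}")
    return results, final_state


q = queue.SimpleQueue()
ToyStrategy(["iov-0", "iov-1"]).run(iov=None, iteration=0, out_queue=q)
results, final_state = drain(q)
```

Sending the final state as the last message lets the consumer distinguish a strategy that finished from one whose process died part-way through.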

◆ run()

run ( self,
iov,
iteration )
 

Reimplemented from Runner.

Definition at line 103 of file runners.py.

    def run(self, iov, iteration):
        """
        """
        from caf.strategies import AlgorithmStrategy
        B2INFO(f"SequentialAlgorithmsRunner begun for Calibration {self.name}.")
        # First we do the setup of algorithm strategies
        strategies = []
        for algorithm in self.algorithms:
            # Need to create an instance of the requested strategy and set the attributes
            strategy = algorithm.strategy(algorithm)
            # Now add all the necessary parameters for a strategy to run
            strategy_params = {}
            strategy_params["database_chain"] = self.database_chain
            strategy_params["dependent_databases"] = self.dependent_databases
            strategy_params["output_dir"] = self.output_dir
            strategy_params["output_database_dir"] = self.output_database_dir
            strategy_params["input_files"] = self.input_files
            strategy_params["ignored_runs"] = self.ignored_runs
            strategy.setup_from_dict(strategy_params)
            strategies.append(strategy)

        # We then fork off a copy of this python process so that we don't affect the original with logging changes
        ctx = multiprocessing.get_context("fork")
        for strategy in strategies:
            queue = multiprocessing.SimpleQueue()
            child = ctx.Process(target=SeqAlgorithmsRunner._run_strategy,
                                args=(strategy, iov, iteration, queue))

            self.results[strategy.algorithm.name] = []
            B2INFO(f"Starting subprocess of AlgorithmStrategy for {strategy.algorithm.name}.")
            B2INFO("Logging will be diverted into algorithm output.")
            child.start()
            final_state = None
            final_loop = False

            B2INFO(f"Collecting results for {strategy.algorithm.name}.")
            while True:
                # Do we have results?
                while not queue.empty():
                    output = queue.get()
                    B2DEBUG(29, f"Result from queue was {output}")
                    if output["type"] == "result":
                        self.results[strategy.algorithm.name].append(output["value"])
                    elif output["type"] == "final_state":
                        final_state = output["value"]
                    else:
                        raise RunnerError(f"Unknown result output: {output}")

                # Still alive but no results at the moment? Wait a few seconds before checking again.
                if child.is_alive():
                    time.sleep(5)
                    continue
                else:
                    # Reached a good ending of strategy
                    if final_state:
                        # Check the exitcode for failed Process()
                        if child.exitcode == 0:
                            B2INFO(f"AlgorithmStrategy subprocess for {strategy.algorithm.name} exited")
                            break
                        else:
                            raise RunnerError(f"Error during subprocess of AlgorithmStrategy for {strategy.algorithm.name}")
                    # It might be possible that the subprocess has finished but all results weren't gathered yet.
                    else:
                        # Go around once more since all results should be in the queue waiting
                        if not final_loop:
                            final_loop = True
                            continue
                        else:
                            raise RunnerError(f"Strategy for {strategy.algorithm.name} "
                                              "exited subprocess but without a final state!")

            # Exit early and don't continue strategies as this one failed
            if final_state == AlgorithmStrategy.FAILED:
                B2ERROR(f"AlgorithmStrategy for {strategy.algorithm.name} failed. We will not proceed with any more algorithms")
                self.final_state = self.FAILED
                break

            B2DEBUG(29, f"Finished subprocess of AlgorithmStrategy for {strategy.algorithm.name}")

        if self.final_state != self.FAILED:
            B2INFO(f"SequentialAlgorithmsRunner finished for Calibration {self.name}")
            self.final_state = self.COMPLETED
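The control flow of run() can be reduced to a small stand-alone sketch: fork one child per job, poll a queue while the child is alive, make one extra pass to drain late messages, and stop at the first failure. Everything named here (`worker`, `run_sequentially`, the job tuples, the short poll interval, the use of `RuntimeError`) is an assumption made for illustration and is not part of the CAF API. The sketch also creates the queue from the same "fork" context as the process, which is a choice made here and not something the listing does.

```python
import multiprocessing
import time


def worker(name, fail, out_queue):
    """Child-process target: report one result, then a final state."""
    out_queue.put({"type": "result", "value": f"{name}-payload"})
    out_queue.put({"type": "final_state", "value": "FAILED" if fail else "COMPLETED"})


def run_sequentially(jobs, poll_seconds=0.05):
    """Run jobs one at a time in forked children; stop at the first failure."""
    ctx = multiprocessing.get_context("fork")  # "fork" is unavailable on Windows
    results = {}
    for name, fail in jobs:
        out_queue = ctx.SimpleQueue()
        child = ctx.Process(target=worker, args=(name, fail, out_queue))
        results[name] = []
        child.start()
        final_state = None
        final_loop = False
        while True:
            # Drain whatever the child has sent so far.
            while not out_queue.empty():
                message = out_queue.get()
                if message["type"] == "result":
                    results[name].append(message["value"])
                elif message["type"] == "final_state":
                    final_state = message["value"]
            if child.is_alive():
                time.sleep(poll_seconds)
                continue
            if final_state:
                if child.exitcode != 0:
                    raise RuntimeError(f"Subprocess for {name} failed")
                break
            if final_loop:
                raise RuntimeError(f"{name} exited without a final state")
            final_loop = True  # one more pass to pick up late messages
        child.join()
        if final_state == "FAILED":
            return "FAILED", results
    return "COMPLETED", results


state, results = run_sequentially([("a", False), ("b", True), ("c", False)])
```

In this usage the third job is never started, mirroring how the runner stops processing further algorithms once one strategy reports a failed final state.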

Member Data Documentation

◆ algorithms

algorithms = None
inherited

The list of algorithms that this runner executes.

Definition at line 89 of file runners.py.

◆ COMPLETED

str COMPLETED = "COMPLETED"
static inherited

State value indicating that the runner completed successfully.

Definition at line 69 of file runners.py.

◆ database_chain

list database_chain = []
inherited

User-set databases; these can be used to apply your own constants and global tags.

Definition at line 79 of file runners.py.

◆ dependent_databases

list dependent_databases = []
inherited

List of local databases created by previous CAF calibrations/iterations.

Definition at line 81 of file runners.py.

◆ FAILED

str FAILED = "FAILED"
static inherited

State value indicating that the runner failed.

Definition at line 67 of file runners.py.

◆ final_state

final_state = None
inherited

Final state of runner.

Definition at line 87 of file runners.py.

◆ input_files

list input_files = []
inherited

All of the output files made by the collector job and recovered by the "output_patterns".

Definition at line 77 of file runners.py.

◆ name

name = name
inherited

The name of this runner instance.

Definition at line 75 of file runners.py.

◆ output_database_dir

str output_database_dir = ""
inherited

The directory of the local database we use to store algorithm payloads from this execution.

Definition at line 83 of file runners.py.

◆ output_dir

str output_dir = ""
inherited

Output directory of these algorithms, for logging mostly.

Definition at line 91 of file runners.py.

◆ results

dict results = {}
inherited

Algorithm results from each algorithm we execute.

Definition at line 85 of file runners.py.


The documentation for this class was generated from the following file: