9 """Custom calibration strategy for KLM strip efficiency."""
14 from ROOT
import Belle2
16 from caf.utils
import AlgResult, IoV
17 from caf.utils
import runs_overlapping_iov, runs_from_vector
18 from caf.utils
import split_runs_by_exp
19 from caf.strategies
import AlgorithmStrategy, StrategyError
20 from caf.state_machines
import AlgorithmMachine
21 from ROOT.Belle2
import KLMStripEfficiencyAlgorithm
22 from klm_strategies_common
import get_lowest_exprun, get_highest_exprun, \
23 calibration_result_string
28 Custom strategy for executing the KLM strip efficiency. Requires complex
31 This uses a `caf.state_machines.AlgorithmMachine` to actually execute
32 the various steps rather than operating on a CalibrationAlgorithm
38 usable_params = {
'iov_coverage': IoV}
47 self.
machinemachine = AlgorithmMachine(self.algorithm)
51 def run(self, iov, iteration, queue):
53 Runs the algorithm machine over the collected data and
56 if not self.is_valid():
57 raise StrategyError(
'The strategy KLMStripEfficiency was not '
62 basf2.B2INFO(f
'Setting up {self.__class__.__name__} strategy '
63 f
'for {self.algorithm.name}')
66 machine_params[
'database_chain'] = self.database_chain
67 machine_params[
'dependent_databases'] = self.dependent_databases
68 machine_params[
'output_dir'] = self.output_dir
69 machine_params[
'output_database_dir'] = self.output_database_dir
70 machine_params[
'input_files'] = self.input_files
71 machine_params[
'ignored_runs'] = self.ignored_runs
72 self.
machinemachine.setup_from_dict(machine_params)
74 basf2.B2INFO(f
'Starting AlgorithmMachine of {self.algorithm.name}')
75 self.algorithm.algorithm.setCalibrationStage(
76 KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement)
79 self.
machinemachine.setup_algorithm(iteration=iteration)
81 basf2.B2INFO(f
'Beginning execution of {self.algorithm.name} using '
82 f
'strategy {self.__class__.__name__}')
85 runs = self.algorithm.algorithm.getRunListFromAllData()
86 all_runs_collected = set(runs_from_vector(runs))
89 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
91 runs_to_execute = all_runs_collected
94 basf2.B2INFO(f
'Removing the ignored_runs from the runs '
95 f
'to execute for {self.algorithm.name}')
96 runs_to_execute.difference_update(set(self.ignored_runs))
99 runs_to_execute = sorted(runs_to_execute)
100 runs_to_execute = split_runs_by_exp(runs_to_execute)
104 if 'iov_coverage' in self.algorithm.params:
105 iov_coverage = self.algorithm.params[
'iov_coverage']
108 number_of_experiments = len(runs_to_execute)
109 for i_exp, run_list
in enumerate(runs_to_execute, start=1):
110 lowest_exprun = get_lowest_exprun(number_of_experiments, i_exp,
111 run_list, iov_coverage)
112 highest_exprun = get_highest_exprun(number_of_experiments, i_exp,
113 run_list, iov_coverage)
115 lowest_exprun, highest_exprun)
118 self.send_result(self.
machinemachine.result)
119 if (self.
machinemachine.result.result == AlgResult.ok.value)
or \
120 (self.
machinemachine.result.result == AlgResult.iterate.value):
121 self.send_final_state(self.COMPLETED)
123 self.send_final_state(self.FAILED)
126 calibration_stage, output_file):
128 Execute over run list.
131 self.
machinemachine.setup_algorithm()
134 self.
machinemachine.algorithm.algorithm.setForcedCalibration(
136 self.
machinemachine.algorithm.algorithm.setCalibrationStage(calibration_stage)
137 if (output_file
is not None):
138 self.
machinemachine.algorithm.algorithm.setOutputFileName(output_file)
139 self.
machinemachine.execute_runs(runs=run_list, iteration=iteration,
141 if (self.
machinemachine.result.result == AlgResult.ok.value)
or \
142 (self.
machinemachine.result.result == AlgResult.iterate.value):
148 lowest_exprun, highest_exprun):
150 Process runs from experiment.
158 for exp_run
in experiment_runs:
160 [exp_run], iteration,
False,
161 KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck,
None)
162 result = self.
machinemachine.result.result
163 algorithm_results = KLMStripEfficiencyAlgorithm.Results(
164 self.
machinemachine.algorithm.algorithm.getResults())
167 if (algorithm_results.getExtHits() > 0):
168 run_data.append([exp_run.run, result, [exp_run],
169 algorithm_results,
'',
None])
170 result_str = calibration_result_string(result)
171 basf2.B2INFO(f
'Run {int(exp_run.run)}: {result_str}.')
174 run_data.sort(key=
lambda x: x[0])
179 while (i < len(run_data)):
180 if (run_data[i][1] == 2):
182 while (run_data[j][1] == 2):
184 if (j >= len(run_data)):
186 run_ranges.append([i, j])
193 def can_merge(run_data, run_not_enough_data, run_normal):
194 return run_data[run_not_enough_data][3].newExtHitsPlanes(
195 run_data[run_normal][3].getExtHitsPlane()) == 0
197 for run_range
in run_ranges:
198 next_run = run_range[1]
202 if (next_run < len(run_data)):
203 while (i >= run_range[0]):
204 if (can_merge(run_data, i, next_run)):
205 basf2.B2INFO(f
'Run {int(run_data[i][0])} (not enough data) can be merged into the next normal run ' +
206 f
'{int(run_data[next_run][0])}.')
207 run_data[i][4] =
'next'
209 basf2.B2INFO(f
'Run {int(run_data[i][0])} (not enough data) cannot be merged into the next normal run ' +
210 f
'{int(run_data[next_run][0])}, will try the previous one.')
213 if (i < run_range[0]):
215 previous_run = run_range[0] - 1
216 if (previous_run >= 0):
218 if (can_merge(run_data, j, previous_run)):
219 basf2.B2INFO(f
'Run {int(run_data[j][0])} (not enough data) can be merged into the previous normal run ' +
220 f
'{int(run_data[previous_run][0])}.')
221 run_data[j][4] =
'previous'
223 basf2.B2INFO(f
'Run {int(run_data[j][0])} (not enough data) cannot be merged into the previous normal ' +
224 f
'run {int(run_data[previous_run][0])}.')
229 basf2.B2INFO(
'A range of runs with not enough data is found that cannot be merged into neither previous nor ' +
230 f
'next normal run: from {int(run_data[j][0])} to {int(run_data[i][0])}.')
232 run_data[j][4] =
'none'
239 while (i < len(run_data) - 1):
240 while ((run_data[i][1] == 2)
and (run_data[i + 1][1] == 2)):
241 if (run_data[i][4] != run_data[i + 1][4]):
243 basf2.B2INFO(f
'Merging run {int(run_data[i + 1][0])} (not enough data) into run {int(run_data[i][0])} ' +
244 '(not enough data).')
245 run_data[i][2].extend(run_data[i + 1][2])
248 run_data[i][2], iteration,
False,
249 KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck,
None)
250 run_data[i][1] = self.
machinemachine.result.result
251 run_data[i][3] = KLMStripEfficiencyAlgorithm.Results(
252 self.
machinemachine.algorithm.algorithm.getResults())
253 result_str = calibration_result_string(run_data[i][1])
254 basf2.B2INFO(f
'Run {int(run_data[i][0])}: {result_str}.')
255 if (i >= len(run_data) - 1):
260 def merge_runs(run_data, run_not_enough_data, run_normal, forced):
261 basf2.B2INFO(f
'Merging run {int(run_data[run_not_enough_data][0])} (not enough data) into run ' +
262 f
'{int(run_data[run_normal][0])} (normal).')
263 run_data[run_normal][2].extend(run_data[run_not_enough_data][2])
265 run_data[run_normal][2], iteration, forced,
266 KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck,
None)
267 run_data[run_normal][1] = self.
machinemachine.result.result
268 run_data[run_normal][3] = KLMStripEfficiencyAlgorithm.Results(
269 self.
machinemachine.algorithm.algorithm.getResults())
270 result_str = calibration_result_string(run_data[run_normal][1])
271 basf2.B2INFO(f
'Run {int(run_data[run_normal][0])}: {result_str}.')
272 if (run_data[run_normal][1] != 0):
273 basf2.B2FATAL(f
'Merging run {int(run_data[run_not_enough_data][0])} into ' +
274 f
'run {int(run_data[run_normal][0])} failed.')
275 del run_data[run_not_enough_data]
278 while (i < len(run_data)):
279 if (run_data[i][1] == 2):
280 if (run_data[i][4] ==
'next'):
281 merge_runs(run_data, i, i + 1,
False)
282 elif (run_data[i][4] ==
'previous'):
283 merge_runs(run_data, i, i - 1,
False)
289 while (i < len(run_data)):
290 if (run_data[i][1] == 2
and run_data[i][4] ==
'none'):
291 new_planes_previous = -1
293 if (i < len(run_data) - 1):
294 new_planes_next = run_data[i][3].newExtHitsPlanes(
295 run_data[i + 1][3].getExtHitsPlane())
296 basf2.B2INFO(f
'There are {int(new_planes_next)} new active modules in run {int(run_data[i][0])} ' +
297 f
'relatively to run {int(run_data[i + 1][0])}.')
299 new_planes_previous = run_data[i][3].newExtHitsPlanes(
300 run_data[i - 1][3].getExtHitsPlane())
301 basf2.B2INFO(f
'There are {int(new_planes_previous)} new active modules in run {int(run_data[i][0])} ' +
302 f
'relatively to run {int(run_data[i - 1][0])}.')
311 if (new_planes_previous >= 0
and new_planes_next < 0):
312 run_for_merging = i - 1
313 elif (new_planes_previous < 0
and new_planes_next >= 0):
314 run_for_merging = i + 1
315 elif (new_planes_previous >= 0
and new_planes_next >= 0):
316 if (new_planes_previous < new_planes_next):
317 run_for_merging = i - 1
319 run_for_merging = i + 1
321 basf2.B2INFO(f
'Cannot determine run for merging for run {int(run_data[i][0])}, performing its' +
322 ' forced calibration.')
324 run_data[i][2], iteration,
True,
325 KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck,
327 run_data[i][1] = self.
machinemachine.result.result
328 run_data[i][3] = KLMStripEfficiencyAlgorithm.Results(
329 self.
machinemachine.algorithm.algorithm.getResults())
330 result_str = calibration_result_string(run_data[i][1])
331 basf2.B2INFO(f
'Run {int(run_data[i][0])}: {result_str}.')
332 if (run_data[i][1] != 0):
333 basf2.B2FATAL(f
'Forced calibration of run {int(run_data[i][0])} failed.')
334 if (run_for_merging >= 0):
335 merge_runs(run_data, i, run_for_merging,
True)
344 while (i < len(run_data)):
346 while (j < len(run_data)):
347 planes_differ =
False
348 if (run_data[j][3].newMeasuredPlanes(
349 run_data[i][3].getEfficiency()) != 0):
351 if (run_data[i][3].newMeasuredPlanes(
352 run_data[j][3].getEfficiency()) != 0):
355 basf2.B2INFO(f
'Run {int(run_data[j][0])}: the set of planes is different from run {int(run_data[i][0])}.')
358 basf2.B2INFO(f
'Run {int(run_data[j][0])}: the set of planes is the same as for run {int(run_data[i][0])}.')
360 run_ranges.append([i, j])
366 if (
not os.path.isdir(
'efficiency')):
367 os.mkdir(
'efficiency')
370 def merge_runs_2(run_data, run_1, run_2, forced):
371 basf2.B2INFO(f
'Merging run {int(run_data[run_2][0])} into run {int(run_data[run_1][0])}.')
372 run_data[run_1][2].extend(run_data[run_2][2])
373 output_file = f
'efficiency/efficiency_{int(run_data[run_1][2][0].exp)}_{int(run_data[run_1][2][0].run)}.root'
375 run_data[run_1][2], iteration, forced,
376 KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement,
378 run_data[run_1][1] = self.
machinemachine.result.result
379 run_data[run_1][3] = KLMStripEfficiencyAlgorithm.Results(
380 self.
machinemachine.algorithm.algorithm.getResults())
381 run_data[run_1][5] = \
382 self.
machinemachine.algorithm.algorithm.getPayloadValues()
383 result_str = calibration_result_string(run_data[run_1][1])
384 basf2.B2INFO(f
'Run {int(run_data[run_1][0])}: {result_str}; requested precision {0.02:f}, achieved precision ' +
385 f
'{run_data[run_1][3].getAchievedPrecision():f}.')
387 for run_range
in run_ranges:
389 while (i < run_range[1]):
390 output_file = f
'efficiency/efficiency_{int(run_data[i][2][0].exp)}_{int(run_data[i][2][0].run)}.root'
392 if (i == run_range[1] - 1):
393 forced_calibration =
True
395 forced_calibration =
False
397 run_data[i][2], iteration, forced_calibration,
398 KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement,
400 run_data[i][1] = self.
machinemachine.result.result
401 run_data[i][3] = KLMStripEfficiencyAlgorithm.Results(
402 self.
machinemachine.algorithm.algorithm.getResults())
404 self.
machinemachine.algorithm.algorithm.getPayloadValues()
405 result_str = calibration_result_string(run_data[i][1])
406 basf2.B2INFO(f
'Run {int(run_data[i][0])}: {result_str}; requested precision {0.02:f}, achieved precision ' +
407 f
'{run_data[i][3].getAchievedPrecision():f}.')
408 if (run_data[i][1] == 2):
410 while (j < run_range[1]):
413 if (j == run_range[1] - 1):
414 forced_calibration =
True
416 forced_calibration =
False
417 merge_runs_2(run_data, i, j, forced_calibration)
420 if (run_data[i][1] == 0):
427 while (i < len(run_data)):
428 if (run_data[i][1] == -1):
434 def commit_payload(run_data, run):
435 basf2.B2INFO(f
'Writing run {int(run_data[run][0])}.')
436 self.
machinemachine.algorithm.algorithm.commit(run_data[run][5])
438 for i
in range(0, len(run_data)):
440 run_data[i][2].sort(key=
lambda x: x.run)
441 first_run = run_data[i][2][0].run
444 run_data[i][5].front().iov = \
448 iov = run_data[previous_run][5].front().iov
449 if (previous_run == 0):
450 run_data[previous_run][5].front().iov = \
452 lowest_exprun.exp, lowest_exprun.run,
453 experiment, first_run - 1)
455 run_data[previous_run][5].front().iov = \
457 experiment, first_run - 1)
458 commit_payload(run_data, previous_run)
463 if (i == len(run_data) - 1):
464 iov = run_data[i][5].front().iov
465 run_data[i][5].front().iov = \
467 experiment, iov.getRunLow(),
468 highest_exprun.exp, highest_exprun.run)
469 commit_payload(run_data, i)
A class that describes the interval of experiments/runs for which an object in the database is valid.
def process_experiment(self, experiment, experiment_runs, iteration, lowest_exprun, highest_exprun)
def run(self, iov, iteration, queue)
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm.
first_execution
Flag for the first execution of this AlgorithmStrategy.
def __init__(self, algorithm)
def execute_over_run_list(self, run_list, iteration, forced_calibration, calibration_stage, output_file)
queue
The multiprocessing queue used to pass back results one at a time.