11 """Custom calibration strategy for KLM strip efficiency."""
16 from ROOT
import Belle2
18 from caf.utils
import AlgResult, IoV
19 from caf.utils
import runs_overlapping_iov, runs_from_vector
20 from caf.utils
import split_runs_by_exp
21 from caf.strategies
import AlgorithmStrategy, StrategyError
22 from caf.state_machines
import AlgorithmMachine
23 from ROOT.Belle2
import KLMStripEfficiencyAlgorithm
24 from klm_strategies_common
import get_lowest_exprun, get_highest_exprun, \
25 calibration_result_string
30 Custom strategy for executing the KLM strip efficiency. Requires complex
33 This uses a `caf.state_machines.AlgorithmMachine` to actually execute
34 the various steps rather than operating on a CalibrationAlgorithm
40 usable_params = {
'iov_coverage': IoV}
49 self.
machinemachine = AlgorithmMachine(self.algorithm)
53 def run(self, iov, iteration, queue):
55 Runs the algorithm machine over the collected data and
58 if not self.is_valid():
59 raise StrategyError(
'The strategy KLMStripEfficiency was not '
64 basf2.B2INFO(f
'Setting up {self.__class__.__name__} strategy '
65 f
'for {self.algorithm.name}')
68 machine_params[
'database_chain'] = self.database_chain
69 machine_params[
'dependent_databases'] = self.dependent_databases
70 machine_params[
'output_dir'] = self.output_dir
71 machine_params[
'output_database_dir'] = self.output_database_dir
72 machine_params[
'input_files'] = self.input_files
73 machine_params[
'ignored_runs'] = self.ignored_runs
74 self.
machinemachine.setup_from_dict(machine_params)
76 basf2.B2INFO(f
'Starting AlgorithmMachine of {self.algorithm.name}')
77 self.algorithm.algorithm.setCalibrationStage(
78 KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement)
81 self.
machinemachine.setup_algorithm(iteration=iteration)
83 basf2.B2INFO(f
'Beginning execution of {self.algorithm.name} using '
84 f
'strategy {self.__class__.__name__}')
87 runs = self.algorithm.algorithm.getRunListFromAllData()
88 all_runs_collected = set(runs_from_vector(runs))
91 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
93 runs_to_execute = all_runs_collected
96 basf2.B2INFO(f
'Removing the ignored_runs from the runs '
97 f
'to execute for {self.algorithm.name}')
98 runs_to_execute.difference_update(set(self.ignored_runs))
101 runs_to_execute = sorted(runs_to_execute)
102 runs_to_execute = split_runs_by_exp(runs_to_execute)
106 if 'iov_coverage' in self.algorithm.params:
107 iov_coverage = self.algorithm.params[
'iov_coverage']
110 number_of_experiments = len(runs_to_execute)
111 for i_exp, run_list
in enumerate(runs_to_execute, start=1):
112 lowest_exprun = get_lowest_exprun(number_of_experiments, i_exp,
113 run_list, iov_coverage)
114 highest_exprun = get_highest_exprun(number_of_experiments, i_exp,
115 run_list, iov_coverage)
117 lowest_exprun, highest_exprun)
120 self.send_result(self.
machinemachine.result)
121 if (self.
machinemachine.result.result == AlgResult.ok.value)
or \
122 (self.
machinemachine.result.result == AlgResult.iterate.value):
123 self.send_final_state(self.COMPLETED)
125 self.send_final_state(self.FAILED)
128 calibration_stage, output_file):
130 Execute over run list.
133 self.
machinemachine.setup_algorithm()
136 self.
machinemachine.algorithm.algorithm.setForcedCalibration(
138 self.
machinemachine.algorithm.algorithm.setCalibrationStage(calibration_stage)
139 if (output_file
is not None):
140 self.
machinemachine.algorithm.algorithm.setOutputFileName(output_file)
141 self.
machinemachine.execute_runs(runs=run_list, iteration=iteration,
143 if (self.
machinemachine.result.result == AlgResult.ok.value)
or \
144 (self.
machinemachine.result.result == AlgResult.iterate.value):
150 lowest_exprun, highest_exprun):
152 Process runs from experiment.
160 for exp_run
in experiment_runs:
162 [exp_run], iteration,
False,
163 KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck,
None)
164 result = self.
machinemachine.result.result
165 algorithm_results = KLMStripEfficiencyAlgorithm.Results(
166 self.
machinemachine.algorithm.algorithm.getResults())
169 if (algorithm_results.getExtHits() > 0):
170 run_data.append([exp_run.run, result, [exp_run],
171 algorithm_results,
'',
None])
172 result_str = calibration_result_string(result)
173 basf2.B2INFO(
'Run %d: %s.' % (exp_run.run, result_str))
176 run_data.sort(key=
lambda x: x[0])
181 while (i < len(run_data)):
182 if (run_data[i][1] == 2):
184 while (run_data[j][1] == 2):
186 if (j >= len(run_data)):
188 run_ranges.append([i, j])
195 def can_merge(run_data, run_not_enough_data, run_normal):
196 return run_data[run_not_enough_data][3].newExtHitsPlanes(
197 run_data[run_normal][3].getExtHitsPlane()) == 0
199 for run_range
in run_ranges:
200 next_run = run_range[1]
204 if (next_run < len(run_data)):
205 while (i >= run_range[0]):
206 if (can_merge(run_data, i, next_run)):
207 basf2.B2INFO(
'Run %d (not enough data) can be merged '
208 'into the next normal run %d.' %
209 (run_data[i][0], run_data[next_run][0]))
210 run_data[i][4] =
'next'
212 basf2.B2INFO(
'Run %d (not enough data) cannot be '
213 'merged into the next normal run %d, '
214 'will try the previous one.' %
215 (run_data[i][0], run_data[next_run][0]))
218 if (i < run_range[0]):
220 previous_run = run_range[0] - 1
221 if (previous_run >= 0):
223 if (can_merge(run_data, j, previous_run)):
224 basf2.B2INFO(
'Run %d (not enough data) can be merged '
225 'into the previous normal run %d.' %
227 run_data[previous_run][0]))
228 run_data[j][4] =
'previous'
230 basf2.B2INFO(
'Run %d (not enough data) cannot be '
231 'merged into the previous normal run %d.' %
233 run_data[previous_run][0]))
238 basf2.B2INFO(
'A range of runs with not enough data is found '
239 'that cannot be merged into neither previous nor '
240 'next normal run: from %d to %d.' %
241 (run_data[j][0], run_data[i][0]))
243 run_data[j][4] =
'none'
250 while (i < len(run_data) - 1):
251 while ((run_data[i][1] == 2)
and (run_data[i + 1][1] == 2)):
252 if (run_data[i][4] != run_data[i + 1][4]):
254 basf2.B2INFO(
'Merging run %d (not enough data) into '
255 'run %d (not enough data).' %
256 (run_data[i + 1][0], run_data[i][0]))
257 run_data[i][2].extend(run_data[i + 1][2])
260 run_data[i][2], iteration,
False,
261 KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck,
None)
262 run_data[i][1] = self.
machinemachine.result.result
263 run_data[i][3] = KLMStripEfficiencyAlgorithm.Results(
264 self.
machinemachine.algorithm.algorithm.getResults())
265 result_str = calibration_result_string(run_data[i][1])
266 basf2.B2INFO(
'Run %d: %s.' % (run_data[i][0], result_str))
267 if (i >= len(run_data) - 1):
272 def merge_runs(run_data, run_not_enough_data, run_normal, forced):
273 basf2.B2INFO(
'Merging run %d (not enough data) into '
275 (run_data[run_not_enough_data][0],
276 run_data[run_normal][0]))
277 run_data[run_normal][2].extend(run_data[run_not_enough_data][2])
279 run_data[run_normal][2], iteration, forced,
280 KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck,
None)
281 run_data[run_normal][1] = self.
machinemachine.result.result
282 run_data[run_normal][3] = KLMStripEfficiencyAlgorithm.Results(
283 self.
machinemachine.algorithm.algorithm.getResults())
284 result_str = calibration_result_string(run_data[run_normal][1])
285 basf2.B2INFO(
'Run %d: %s.' % (run_data[run_normal][0], result_str))
286 if (run_data[run_normal][1] != 0):
287 basf2.B2FATAL(
'Merging run %d into run %d failed.' %
288 (run_data[run_not_enough_data][0],
289 run_data[run_normal][0]))
290 del run_data[run_not_enough_data]
293 while (i < len(run_data)):
294 if (run_data[i][1] == 2):
295 if (run_data[i][4] ==
'next'):
296 merge_runs(run_data, i, i + 1,
False)
297 elif (run_data[i][4] ==
'previous'):
298 merge_runs(run_data, i, i - 1,
False)
304 while (i < len(run_data)):
305 if (run_data[i][1] == 2
and run_data[i][4] ==
'none'):
306 new_planes_previous = -1
308 if (i < len(run_data) - 1):
309 new_planes_next = run_data[i][3].newExtHitsPlanes(
310 run_data[i + 1][3].getExtHitsPlane())
311 basf2.B2INFO(
'There are %d new active modules in run %d '
312 'relatively to run %d.' %
313 (new_planes_next, run_data[i][0],
316 new_planes_previous = run_data[i][3].newExtHitsPlanes(
317 run_data[i - 1][3].getExtHitsPlane())
318 basf2.B2INFO(
'There are %d new active modules in run %d '
319 'relatively to run %d.' %
320 (new_planes_previous,
321 run_data[i][0], run_data[i - 1][0]))
330 if (new_planes_previous >= 0
and new_planes_next < 0):
331 run_for_merging = i - 1
332 elif (new_planes_previous < 0
and new_planes_next >= 0):
333 run_for_merging = i + 1
334 elif (new_planes_previous >= 0
and new_planes_next >= 0):
335 if (new_planes_previous < new_planes_next):
336 run_for_merging = i - 1
338 run_for_merging = i + 1
340 basf2.B2INFO(
'Cannot determine run for merging for run %d, '
341 'performing its forced calibration.' %
344 run_data[i][2], iteration,
True,
345 KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck,
347 run_data[i][1] = self.
machinemachine.result.result
348 run_data[i][3] = KLMStripEfficiencyAlgorithm.Results(
349 self.
machinemachine.algorithm.algorithm.getResults())
350 result_str = calibration_result_string(run_data[i][1])
351 basf2.B2INFO(
'Run %d: %s.' % (run_data[i][0], result_str))
352 if (run_data[i][1] != 0):
353 basf2.B2FATAL(
'Forced calibration of run %d failed.' %
355 if (run_for_merging >= 0):
356 merge_runs(run_data, i, run_for_merging,
True)
365 while (i < len(run_data)):
367 while (j < len(run_data)):
368 planes_differ =
False
369 if (run_data[j][3].newMeasuredPlanes(
370 run_data[i][3].getEfficiency()) != 0):
372 if (run_data[i][3].newMeasuredPlanes(
373 run_data[j][3].getEfficiency()) != 0):
376 basf2.B2INFO(
'Run %d: the set of planes is different '
378 % (run_data[j][0], run_data[i][0]))
381 basf2.B2INFO(
'Run %d: the set of planes is the same '
383 % (run_data[j][0], run_data[i][0]))
385 run_ranges.append([i, j])
391 if (
not os.path.isdir(
'efficiency')):
392 os.mkdir(
'efficiency')
395 def merge_runs_2(run_data, run_1, run_2, forced):
396 basf2.B2INFO(
'Merging run %d into run %d.' %
397 (run_data[run_2][0], run_data[run_1][0]))
398 run_data[run_1][2].extend(run_data[run_2][2])
399 output_file =
'efficiency/efficiency_%d_%d.root' % \
400 (run_data[run_1][2][0].exp, run_data[run_1][2][0].run)
402 run_data[run_1][2], iteration, forced,
403 KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement,
405 run_data[run_1][1] = self.
machinemachine.result.result
406 run_data[run_1][3] = KLMStripEfficiencyAlgorithm.Results(
407 self.
machinemachine.algorithm.algorithm.getResults())
408 run_data[run_1][5] = \
409 self.
machinemachine.algorithm.algorithm.getPayloadValues()
410 result_str = calibration_result_string(run_data[run_1][1])
411 basf2.B2INFO(
'Run %d: %s; requested precision %f, achieved '
413 (run_data[run_1][0], result_str, 0.02,
415 run_data[run_1][3].getAchievedPrecision()))
417 for run_range
in run_ranges:
419 while (i < run_range[1]):
420 output_file =
'efficiency/efficiency_%d_%d.root' % \
421 (run_data[i][2][0].exp, run_data[i][2][0].run)
423 if (i == run_range[1] - 1):
424 forced_calibration =
True
426 forced_calibration =
False
428 run_data[i][2], iteration, forced_calibration,
429 KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement,
431 run_data[i][1] = self.
machinemachine.result.result
432 run_data[i][3] = KLMStripEfficiencyAlgorithm.Results(
433 self.
machinemachine.algorithm.algorithm.getResults())
435 self.
machinemachine.algorithm.algorithm.getPayloadValues()
436 result_str = calibration_result_string(run_data[i][1])
437 basf2.B2INFO(
'Run %d: %s; requested precision %f, achieved '
439 (run_data[i][0], result_str, 0.02,
441 run_data[i][3].getAchievedPrecision()))
442 if (run_data[i][1] == 2):
444 while (j < run_range[1]):
447 if (j == run_range[1] - 1):
448 forced_calibration =
True
450 forced_calibration =
False
451 merge_runs_2(run_data, i, j, forced_calibration)
454 if (run_data[i][1] == 0):
461 while (i < len(run_data)):
462 if (run_data[i][1] == -1):
468 def commit_payload(run_data, run):
469 basf2.B2INFO(
'Writing run %d.' % (run_data[run][0]))
470 self.
machinemachine.algorithm.algorithm.commit(run_data[run][5])
472 for i
in range(0, len(run_data)):
474 run_data[i][2].sort(key=
lambda x: x.run)
475 first_run = run_data[i][2][0].run
478 run_data[i][5].front().iov = \
482 iov = run_data[previous_run][5].front().iov
483 if (previous_run == 0):
484 run_data[previous_run][5].front().iov = \
486 lowest_exprun.exp, lowest_exprun.run,
487 experiment, first_run - 1)
489 run_data[previous_run][5].front().iov = \
491 experiment, first_run - 1)
492 commit_payload(run_data, previous_run)
497 if (i == len(run_data) - 1):
498 iov = run_data[i][5].front().iov
499 run_data[i][5].front().iov = \
501 experiment, iov.getRunLow(),
502 highest_exprun.exp, highest_exprun.run)
503 commit_payload(run_data, i)
A class that describes the interval of experiments/runs for which an object in the database is valid.
def process_experiment(self, experiment, experiment_runs, iteration, lowest_exprun, highest_exprun)
def run(self, iov, iteration, queue)
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm.
first_execution
Flag for the first execution of this AlgorithmStrategy.
def __init__(self, algorithm)
def execute_over_run_list(self, run_list, iteration, forced_calibration, calibration_stage, output_file)
queue
The multiprocessing queue used to pass back results one at a time.