Belle II Software development
klm_strip_efficiency.py
1
8
9"""Custom calibration strategy for KLM strip efficiency."""
10
11import os
12
13import basf2
14from ROOT import Belle2
15
16from caf.utils import AlgResult, IoV
17from caf.utils import runs_overlapping_iov, runs_from_vector
18from caf.utils import split_runs_by_exp
19from caf.strategies import AlgorithmStrategy, StrategyError
20from caf.state_machines import AlgorithmMachine
21from ROOT.Belle2 import KLMStripEfficiencyAlgorithm
22from klm_strategies_common import get_lowest_exprun, get_highest_exprun, \
23 calibration_result_string
24
25
class KLMStripEfficiency(AlgorithmStrategy):
    """
    Custom strategy for executing the KLM strip efficiency. Requires complex
    run merging rules.

    This uses a `caf.state_machines.AlgorithmMachine` to actually execute
    the various steps rather than operating on a CalibrationAlgorithm
    C++ class directly.
    """

    #: The params that you could set on the Algorithm object which this
    #: Strategy would use. ``iov_coverage`` (an `IoV`) is forwarded to
    #: ``get_lowest_exprun``/``get_highest_exprun`` to obtain the outer
    #: IOV bounds of the first and last payloads of each experiment.
    usable_params = {'iov_coverage': IoV}

    def __init__(self, algorithm):
        """
        Create the strategy and the `caf.state_machines.AlgorithmMachine`
        that executes ``algorithm``.
        """
        super().__init__(algorithm)
        #: :py:class:`caf.state_machines.AlgorithmMachine` used to help set
        #: up and execute the CalibrationAlgorithm.
        self.machine = AlgorithmMachine(self.algorithm)
        #: Flag for the first execution of this AlgorithmStrategy. The first
        #: execution reuses the machine setup already performed in `run`.
        self.first_execution = True

    def run(self, iov, iteration, queue):
        """
        Runs the algorithm machine over the collected data and
        fills the results.

        Parameters:
            iov: Calibration IoV. If it is set, only the collected runs
                overlapping with it are calibrated; otherwise all
                collected runs are used.
            iteration (int): CAF iteration number.
            queue: Multiprocessing queue used to pass results back.

        Raises:
            StrategyError: If the strategy was not set up correctly.
        """
        if not self.is_valid():
            raise StrategyError('The strategy KLMStripEfficiency was not '
                                'set up correctly.')

        #: The multiprocessing queue used to pass back results one at a time.
        self.queue = queue

        basf2.B2INFO(f'Setting up {self.__class__.__name__} strategy '
                     f'for {self.algorithm.name}')
        # Add all the necessary parameters for a strategy to run.
        machine_params = {
            'database_chain': self.database_chain,
            'dependent_databases': self.dependent_databases,
            'output_dir': self.output_dir,
            'output_database_dir': self.output_database_dir,
            'input_files': self.input_files,
            'ignored_runs': self.ignored_runs,
        }
        self.machine.setup_from_dict(machine_params)
        # Start moving through machine states.
        basf2.B2INFO(f'Starting AlgorithmMachine of {self.algorithm.name}')
        self.algorithm.algorithm.setCalibrationStage(
            KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement)
        # This sets up the logging and database chain and assigns all
        # input files from collector jobs.
        self.machine.setup_algorithm(iteration=iteration)
        # After this point, the logging is in the stdout of the algorithm.
        basf2.B2INFO(f'Beginning execution of {self.algorithm.name} using '
                     f'strategy {self.__class__.__name__}')

        # Select of runs for calibration.
        runs = self.algorithm.algorithm.getRunListFromAllData()
        all_runs_collected = set(runs_from_vector(runs))
        # Select runs overlapping with the calibration IOV if it is specified.
        if iov:
            runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
        else:
            runs_to_execute = all_runs_collected
        # Remove the ignored runs.
        if self.ignored_runs:
            basf2.B2INFO(f'Removing the ignored_runs from the runs '
                         f'to execute for {self.algorithm.name}')
            runs_to_execute.difference_update(set(self.ignored_runs))

        # Creation of sorted run list split by experiment.
        runs_to_execute = split_runs_by_exp(sorted(runs_to_execute))

        # Get IOV coverage (None if it is not set).
        iov_coverage = self.algorithm.params.get('iov_coverage')

        # Iterate over experiment run lists.
        number_of_experiments = len(runs_to_execute)
        for i_exp, run_list in enumerate(runs_to_execute, start=1):
            lowest_exprun = get_lowest_exprun(number_of_experiments, i_exp,
                                              run_list, iov_coverage)
            highest_exprun = get_highest_exprun(number_of_experiments, i_exp,
                                                run_list, iov_coverage)
            self.process_experiment(run_list[0].exp, run_list, iteration,
                                    lowest_exprun, highest_exprun)

        # Send final state and result to CAF.
        self.send_result(self.machine.result)
        if self.machine.result.result in (AlgResult.ok.value,
                                          AlgResult.iterate.value):
            self.send_final_state(self.COMPLETED)
        else:
            self.send_final_state(self.FAILED)

    def execute_over_run_list(self, run_list, iteration, forced_calibration,
                              calibration_stage, output_file):
        """
        Execute the algorithm once over the given runs taken together.

        Parameters:
            run_list (list): Runs (ExpRun) to execute over together.
            iteration (int): CAF iteration number.
            forced_calibration (bool): Passed to ``setForcedCalibration``.
            calibration_stage: ``KLMStripEfficiencyAlgorithm`` calibration
                stage.
            output_file (str or None): Output file name; left unchanged
                if None.
        """
        if self.first_execution:
            # run() has already set up the machine for the first execution.
            self.first_execution = False
        else:
            self.machine.setup_algorithm()
        algorithm = self.machine.algorithm.algorithm
        algorithm.setForcedCalibration(forced_calibration)
        algorithm.setCalibrationStage(calibration_stage)
        if output_file is not None:
            algorithm.setOutputFileName(output_file)
        self.machine.execute_runs(runs=run_list, iteration=iteration,
                                  apply_iov=None)
        if self.machine.result.result in (AlgResult.ok.value,
                                          AlgResult.iterate.value):
            self.machine.complete()
        else:
            self.machine.fail()

    def _execute_and_get_results(self, run_list, iteration,
                                 forced_calibration, calibration_stage,
                                 output_file=None):
        """
        Run `execute_over_run_list` and return the pair
        (result code, ``KLMStripEfficiencyAlgorithm.Results``).
        """
        self.execute_over_run_list(run_list, iteration, forced_calibration,
                                   calibration_stage, output_file)
        result = self.machine.result.result
        algorithm_results = KLMStripEfficiencyAlgorithm.Results(
            self.machine.algorithm.algorithm.getResults())
        return result, algorithm_results

    def process_experiment(self, experiment, experiment_runs, iteration,
                           lowest_exprun, highest_exprun):
        """
        Process runs from experiment.

        Stage 1: every run is checked individually with the measurable plane
        check. Runs without extrapolated hits are dropped. Runs with not
        enough data are merged into the next normal run, the previous normal
        run, or (forced) a neighbouring run.
        Stage 2: the runs are grouped into maximal ranges with the same set
        of measured planes.
        Stage 3: the efficiency is measured; runs inside a range are merged
        until the calibration succeeds or the range is exhausted.
        Stage 4: the payloads are committed. The first payload starts at
        ``lowest_exprun`` and the last one ends at ``highest_exprun``.

        Parameters:
            experiment (int): Experiment number.
            experiment_runs (list): Runs (ExpRun) of this experiment.
            iteration (int): CAF iteration number.
            lowest_exprun: Lower IOV bound (``.exp``, ``.run``).
            highest_exprun: Upper IOV bound (``.exp``, ``.run``).
        """
        not_enough_data = AlgResult.not_enough_data.value
        ok = AlgResult.ok.value
        # Local marker for entries merged into another entry in stage 3.
        merged = -1
        measurable_plane_check = \
            KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck
        efficiency_measurement = \
            KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement

        # Run lists. They have the following format: run number,
        # calibration result code, ExpRun list, algorithm results,
        # merge information, payload.
        run_data = []

        # Initial run.
        for exp_run in experiment_runs:
            result, algorithm_results = self._execute_and_get_results(
                [exp_run], iteration, False, measurable_plane_check)
            # If number of hits is 0, then KLM is excluded. Such runs
            # can be ignored safely.
            if algorithm_results.getExtHits() > 0:
                run_data.append([exp_run.run, result, [exp_run],
                                 algorithm_results, '', None])
            result_str = calibration_result_string(result)
            basf2.B2INFO(f'Run {int(exp_run.run)}: {result_str}.')

        # Sort by run number.
        run_data.sort(key=lambda x: x[0])

        # Create list of ranges [first, end) of consecutive runs that do
        # not have enough data; end is the next normal run or len(run_data).
        run_ranges = []
        i = 0
        while i < len(run_data):
            if run_data[i][1] == not_enough_data:
                j = i
                while (j < len(run_data) and
                       run_data[j][1] == not_enough_data):
                    j += 1
                run_ranges.append([i, j])
                i = j
            else:
                i += 1

        # Determine whether the runs with insufficient data can be merged to
        # the next or previous normal run. Presumably newExtHitsPlanes counts
        # planes with extrapolated hits absent from the normal run.
        def can_merge(run_data, run_not_enough_data, run_normal):
            return run_data[run_not_enough_data][3].newExtHitsPlanes(
                run_data[run_normal][3].getExtHitsPlane()) == 0

        for first_index, next_run in run_ranges:
            # [j, i] is the part of the range that is still unassigned.
            j = first_index
            i = next_run - 1
            # Assign runs to the next normal run, starting from the end.
            if next_run < len(run_data):
                while i >= first_index:
                    if can_merge(run_data, i, next_run):
                        basf2.B2INFO(f'Run {int(run_data[i][0])} (not enough data) can be merged into the next normal run ' +
                                     f'{int(run_data[next_run][0])}.')
                        run_data[i][4] = 'next'
                    else:
                        basf2.B2INFO(f'Run {int(run_data[i][0])} (not enough data) cannot be merged into the next normal run ' +
                                     f'{int(run_data[next_run][0])}, will try the previous one.')
                        break
                    i -= 1
                if i < first_index:
                    continue
            # Assign the remaining runs to the previous normal run, starting
            # from the beginning.
            previous_run = first_index - 1
            if previous_run >= 0:
                while j <= i:
                    if can_merge(run_data, j, previous_run):
                        basf2.B2INFO(f'Run {int(run_data[j][0])} (not enough data) can be merged into the previous normal run ' +
                                     f'{int(run_data[previous_run][0])}.')
                        run_data[j][4] = 'previous'
                    else:
                        basf2.B2INFO(f'Run {int(run_data[j][0])} (not enough data) cannot be merged into the previous normal ' +
                                     f'run {int(run_data[previous_run][0])}.')
                        break
                    j += 1
                if j > i:
                    continue
            basf2.B2INFO('A range of runs with not enough data is found that cannot be merged into neither previous nor ' +
                         f'next normal run: from {int(run_data[j][0])} to {int(run_data[i][0])}.')
            for k in range(j, i + 1):
                run_data[k][4] = 'none'

        # Merge runs that do not have enough data. If both this and next
        # run do not have enough data (and the same merge target), then
        # merge the collected data.
        i = 0
        while i < len(run_data) - 1:
            while (run_data[i][1] == not_enough_data and
                   run_data[i + 1][1] == not_enough_data):
                if run_data[i][4] != run_data[i + 1][4]:
                    break
                basf2.B2INFO(f'Merging run {int(run_data[i + 1][0])} (not enough data) into run {int(run_data[i][0])} ' +
                             '(not enough data).')
                run_data[i][2].extend(run_data[i + 1][2])
                del run_data[i + 1]
                run_data[i][1], run_data[i][3] = \
                    self._execute_and_get_results(
                        run_data[i][2], iteration, False,
                        measurable_plane_check)
                result_str = calibration_result_string(run_data[i][1])
                basf2.B2INFO(f'Run {int(run_data[i][0])}: {result_str}.')
                if i >= len(run_data) - 1:
                    break
            i += 1

        # Merge runs that do not have enough data into normal runs.
        def merge_runs(run_data, run_not_enough_data, run_normal, forced):
            basf2.B2INFO(f'Merging run {int(run_data[run_not_enough_data][0])} (not enough data) into run ' +
                         f'{int(run_data[run_normal][0])} (normal).')
            run_data[run_normal][2].extend(run_data[run_not_enough_data][2])
            run_data[run_normal][1], run_data[run_normal][3] = \
                self._execute_and_get_results(
                    run_data[run_normal][2], iteration, forced,
                    measurable_plane_check)
            result_str = calibration_result_string(run_data[run_normal][1])
            basf2.B2INFO(f'Run {int(run_data[run_normal][0])}: {result_str}.')
            if run_data[run_normal][1] != ok:
                basf2.B2FATAL(f'Merging run {int(run_data[run_not_enough_data][0])} into ' +
                              f'run {int(run_data[run_normal][0])} failed.')
            del run_data[run_not_enough_data]

        # Deleting the merged entry shifts the list, so the index is only
        # advanced when nothing was deleted.
        i = 0
        while i < len(run_data):
            if run_data[i][1] == not_enough_data and run_data[i][4] == 'next':
                merge_runs(run_data, i, i + 1, False)
            elif (run_data[i][1] == not_enough_data and
                  run_data[i][4] == 'previous'):
                merge_runs(run_data, i, i - 1, False)
            else:
                i += 1

        # Runs that cannot be merged normally: forced merge into the
        # neighbour with fewer new planes, or forced calibration if there
        # is no neighbour.
        i = 0
        while i < len(run_data):
            if not (run_data[i][1] == not_enough_data and
                    run_data[i][4] == 'none'):
                i += 1
                continue
            # -1 means that there is no such neighbour.
            new_planes_previous = -1
            new_planes_next = -1
            if i < len(run_data) - 1:
                new_planes_next = run_data[i][3].newExtHitsPlanes(
                    run_data[i + 1][3].getExtHitsPlane())
                basf2.B2INFO(f'There are {int(new_planes_next)} new active modules in run {int(run_data[i][0])} ' +
                             f'relatively to run {int(run_data[i + 1][0])}.')
            if i > 0:
                new_planes_previous = run_data[i][3].newExtHitsPlanes(
                    run_data[i - 1][3].getExtHitsPlane())
                basf2.B2INFO(f'There are {int(new_planes_previous)} new active modules in run {int(run_data[i][0])} ' +
                             f'relatively to run {int(run_data[i - 1][0])}.')
            run_for_merging = -1
            # If a forced merge of the normal run with another run from
            # a different range of runs with not enough data has already
            # been performed, then the list of active modules may change
            # and there would be 0 new modules. Consequently, the number
            # of modules is checked to be greater or equal than 0. However,
            # there is no guarantee that the same added module would be
            # calibrated normally. Thus, a forced merge is performed anyway.
            if new_planes_previous >= 0 and new_planes_next < 0:
                run_for_merging = i - 1
            elif new_planes_previous < 0 and new_planes_next >= 0:
                run_for_merging = i + 1
            elif new_planes_previous >= 0 and new_planes_next >= 0:
                if new_planes_previous < new_planes_next:
                    run_for_merging = i - 1
                else:
                    run_for_merging = i + 1
            else:
                basf2.B2INFO(f'Cannot determine run for merging for run {int(run_data[i][0])}, performing its' +
                             ' forced calibration.')
                run_data[i][1], run_data[i][3] = \
                    self._execute_and_get_results(
                        run_data[i][2], iteration, True,
                        measurable_plane_check)
                result_str = calibration_result_string(run_data[i][1])
                basf2.B2INFO(f'Run {int(run_data[i][0])}: {result_str}.')
                if run_data[i][1] != ok:
                    basf2.B2FATAL(f'Forced calibration of run {int(run_data[i][0])} failed.')
            if run_for_merging >= 0:
                merge_runs(run_data, i, run_for_merging, True)

        # Stage 2: determination of maximal run ranges.
        # The set of calibrated planes should be the same for all small
        # run ranges within the large run range.
        run_ranges.clear()
        i = 0
        while i < len(run_data):
            j = i + 1
            while j < len(run_data):
                # Both directions are evaluated, as in a symmetric check.
                difference_ji = run_data[j][3].newMeasuredPlanes(
                    run_data[i][3].getEfficiency())
                difference_ij = run_data[i][3].newMeasuredPlanes(
                    run_data[j][3].getEfficiency())
                if difference_ji != 0 or difference_ij != 0:
                    basf2.B2INFO(f'Run {int(run_data[j][0])}: the set of planes is different from run {int(run_data[i][0])}.')
                    break
                basf2.B2INFO(f'Run {int(run_data[j][0])}: the set of planes is the same as for run {int(run_data[i][0])}.')
                j += 1
            run_ranges.append([i, j])
            i = j

        # Stage 3: final calibration.

        # Output directory.
        os.makedirs('efficiency', exist_ok=True)

        def calibrate_efficiency(index, forced):
            # The output file is named after the first run of the entry.
            first = run_data[index][2][0]
            output_file = f'efficiency/efficiency_{int(first.exp)}_{int(first.run)}.root'
            run_data[index][1], run_data[index][3] = \
                self._execute_and_get_results(
                    run_data[index][2], iteration, forced,
                    efficiency_measurement, output_file)
            run_data[index][5] = \
                self.machine.algorithm.algorithm.getPayloadValues()
            result_str = calibration_result_string(run_data[index][1])
            basf2.B2INFO(f'Run {int(run_data[index][0])}: {result_str}; requested precision {0.02:f}, achieved precision ' +
                         f'{run_data[index][3].getAchievedPrecision():f}.')

        # Merge runs.
        def merge_runs_2(run_1, run_2, forced):
            basf2.B2INFO(f'Merging run {int(run_data[run_2][0])} into run {int(run_data[run_1][0])}.')
            run_data[run_1][2].extend(run_data[run_2][2])
            calibrate_efficiency(run_1, forced)

        for first_index, end_index in run_ranges:
            i = first_index
            while i < end_index:
                # Force calibration if there are no more runs in the range.
                calibrate_efficiency(i, i == end_index - 1)
                if run_data[i][1] == not_enough_data:
                    j = i + 1
                    while j < end_index:
                        # Force calibration if there are no more runs
                        # in the range.
                        merge_runs_2(i, j, j == end_index - 1)
                        run_data[j][1] = merged
                        j += 1
                        if run_data[i][1] == ok:
                            break
                    i = j
                else:
                    i += 1

        # Drop the entries that were merged into another one.
        run_data[:] = [entry for entry in run_data if entry[1] != merged]

        # Stage 4: write the results to the database.
        def commit_payload(index):
            basf2.B2INFO(f'Writing run {int(run_data[index][0])}.')
            self.machine.algorithm.algorithm.commit(run_data[index][5])

        last_index = len(run_data) - 1
        for i, entry in enumerate(run_data):
            # Get first run again due to possible mergings.
            entry[2].sort(key=lambda x: x.run)
            first_run = entry[2][0].run
            # Set IOV for the current run.
            # The last run will be overwritten when writing the result.
            entry[5].front().iov = \
                Belle2.IntervalOfValidity(experiment, first_run, experiment, -1)
            # Write the previous run; it ends right before this one.
            if i > 0:
                previous = run_data[i - 1]
                if i - 1 == 0:
                    previous[5].front().iov = Belle2.IntervalOfValidity(
                        lowest_exprun.exp, lowest_exprun.run,
                        experiment, first_run - 1)
                else:
                    iov = previous[5].front().iov
                    previous[5].front().iov = Belle2.IntervalOfValidity(
                        experiment, iov.getRunLow(),
                        experiment, first_run - 1)
                commit_payload(i - 1)
            # Write the current run if it is the last run.
            if i == last_index:
                # A single payload is also the first one, so it must start
                # at the lowest covered run like every other first payload.
                if i == 0:
                    low_exp, low_run = lowest_exprun.exp, lowest_exprun.run
                else:
                    low_exp, low_run = experiment, first_run
                entry[5].front().iov = Belle2.IntervalOfValidity(
                    low_exp, low_run,
                    highest_exprun.exp, highest_exprun.run)
                commit_payload(i)
A class that describes the interval of experiments/runs for which an object in the database is valid.
def process_experiment(self, experiment, experiment_runs, iteration, lowest_exprun, highest_exprun)
machine
:py:class:`caf.state_machines.AlgorithmMachine` used to help set up and execute CalibrationAlgorithm.
first_execution
Flag for the first execution of this AlgorithmStrategy.
def execute_over_run_list(self, run_list, iteration, forced_calibration, calibration_stage, output_file)
queue
The multiprocessing queue used to pass back results one at a time.