Belle II Software  release-08-01-10
klm_strip_efficiency.py
1 
8 
9 """Custom calibration strategy for KLM strip efficiency."""
10 
11 import os
12 
13 import basf2
14 from ROOT import Belle2
15 
16 from caf.utils import AlgResult, IoV
17 from caf.utils import runs_overlapping_iov, runs_from_vector
18 from caf.utils import split_runs_by_exp
19 from caf.strategies import AlgorithmStrategy, StrategyError
20 from caf.state_machines import AlgorithmMachine
21 from ROOT.Belle2 import KLMStripEfficiencyAlgorithm
22 from klm_strategies_common import get_lowest_exprun, get_highest_exprun, \
23  calibration_result_string
24 
25 
class KLMStripEfficiency(AlgorithmStrategy):
    """
    Custom strategy for executing the KLM strip efficiency. Requires complex
    run merging rules.

    This uses a `caf.state_machines.AlgorithmMachine` to actually execute
    the various steps rather than operating on a CalibrationAlgorithm
    C++ class directly.
    """

    #: Parameters that may be set on the algorithm for this strategy.
    #: 'iov_coverage' is read from ``self.algorithm.params`` in run().
    usable_params = {'iov_coverage': IoV}

    def __init__(self, algorithm):
        """
        Create the strategy for the given algorithm.

        Args:
            algorithm: Passed unchanged to the base-class constructor; the
                resulting ``self.algorithm`` is wrapped in an
                AlgorithmMachine.
        """
        super().__init__(algorithm)

        #: :py:class:`caf.state_machines.AlgorithmMachine` used to help
        #: set up and execute CalibrationAlgorithm.
        self.machine = AlgorithmMachine(self.algorithm)

        #: Flag for the first execution of this AlgorithmStrategy.
        self.first_execution = True

    def run(self, iov, iteration, queue):
        """
        Runs the algorithm machine over the collected data and
        fills the results.

        Args:
            iov: If truthy, only runs overlapping with it are calibrated;
                otherwise all collected runs are used.
            iteration: Iteration number, forwarded to the algorithm machine.
            queue: The multiprocessing queue used to pass back results
                one at a time.

        Raises:
            StrategyError: If ``self.is_valid()`` returns a false value.
        """
        if not self.is_valid():
            raise StrategyError('The strategy KLMStripEfficiency was not '
                                'set up correctly.')

        #: The multiprocessing queue used to pass back results one at a time.
        self.queue = queue

        basf2.B2INFO(f'Setting up {self.__class__.__name__} strategy '
                     f'for {self.algorithm.name}')
        # Add all the necessary parameters for a strategy to run.
        machine_params = {
            'database_chain': self.database_chain,
            'dependent_databases': self.dependent_databases,
            'output_dir': self.output_dir,
            'output_database_dir': self.output_database_dir,
            'input_files': self.input_files,
            'ignored_runs': self.ignored_runs,
        }
        self.machine.setup_from_dict(machine_params)
        # Start moving through machine states.
        basf2.B2INFO(f'Starting AlgorithmMachine of {self.algorithm.name}')
        self.algorithm.algorithm.setCalibrationStage(
            KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement)
        # This sets up the logging and database chain and assigns all
        # input files from collector jobs.
        self.machine.setup_algorithm(iteration=iteration)
        # After this point, the logging is in the stdout of the algorithm.
        basf2.B2INFO(f'Beginning execution of {self.algorithm.name} using '
                     f'strategy {self.__class__.__name__}')

        # Selection of runs for calibration.
        runs = self.algorithm.algorithm.getRunListFromAllData()
        all_runs_collected = set(runs_from_vector(runs))
        # Select runs overlapping with the calibration IOV if it is specified.
        if iov:
            runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
        else:
            runs_to_execute = all_runs_collected
        # Remove the ignored runs.
        if self.ignored_runs:
            basf2.B2INFO(f'Removing the ignored_runs from the runs '
                         f'to execute for {self.algorithm.name}')
            runs_to_execute.difference_update(set(self.ignored_runs))

        # Creation of sorted run list split by experiment.
        runs_to_execute = sorted(runs_to_execute)
        runs_to_execute = split_runs_by_exp(runs_to_execute)

        # Get IOV coverage.
        iov_coverage = None
        if 'iov_coverage' in self.algorithm.params:
            iov_coverage = self.algorithm.params['iov_coverage']

        # Iterate over experiment run lists.
        number_of_experiments = len(runs_to_execute)
        for i_exp, run_list in enumerate(runs_to_execute, start=1):
            lowest_exprun = get_lowest_exprun(number_of_experiments, i_exp,
                                              run_list, iov_coverage)
            highest_exprun = get_highest_exprun(number_of_experiments, i_exp,
                                                run_list, iov_coverage)
            self.process_experiment(run_list[0].exp, run_list, iteration,
                                    lowest_exprun, highest_exprun)

        # Send final state and result to CAF.
        self.send_result(self.machine.result)
        if self.machine.result.result in (AlgResult.ok.value,
                                          AlgResult.iterate.value):
            self.send_final_state(self.COMPLETED)
        else:
            self.send_final_state(self.FAILED)

    def execute_over_run_list(self, run_list, iteration, forced_calibration,
                              calibration_stage, output_file):
        """
        Execute over run list.

        Args:
            run_list: Runs passed to ``AlgorithmMachine.execute_runs``.
            iteration: Iteration number passed to ``execute_runs``.
            forced_calibration: Value given to the algorithm's
                ``setForcedCalibration``.
            calibration_stage: Value given to the algorithm's
                ``setCalibrationStage``.
            output_file: Output file name for the algorithm, or None to
                leave the algorithm's output file name untouched.
        """
        # run() already calls setup_algorithm() once before the first
        # execution, so it is only repeated for subsequent executions.
        if not self.first_execution:
            self.machine.setup_algorithm()
        else:
            self.first_execution = False
        cpp_algorithm = self.machine.algorithm.algorithm
        cpp_algorithm.setForcedCalibration(forced_calibration)
        cpp_algorithm.setCalibrationStage(calibration_stage)
        if output_file is not None:
            cpp_algorithm.setOutputFileName(output_file)
        self.machine.execute_runs(runs=run_list, iteration=iteration,
                                  apply_iov=None)
        if self.machine.result.result in (AlgResult.ok.value,
                                          AlgResult.iterate.value):
            self.machine.complete()
        else:
            self.machine.fail()

    def _check_measurable_planes(self, entry, iteration, forced_calibration):
        """
        Run the measurable-plane check over the runs of one run_data entry.

        Updates the entry's result code (index 1) and algorithm results
        (index 3) in place and logs the outcome.
        """
        self.execute_over_run_list(
            entry[2], iteration, forced_calibration,
            KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck, None)
        entry[1] = self.machine.result.result
        entry[3] = KLMStripEfficiencyAlgorithm.Results(
            self.machine.algorithm.algorithm.getResults())
        result_str = calibration_result_string(entry[1])
        basf2.B2INFO(f'Run {int(entry[0])}: {result_str}.')

    def _measure_efficiency(self, entry, iteration, forced_calibration):
        """
        Run the efficiency measurement over the runs of one run_data entry.

        The output file is named after the first ExpRun of the entry.
        Updates the entry's result code (index 1), algorithm results
        (index 3) and payload (index 5) in place and logs the outcome.
        """
        first_exprun = entry[2][0]
        output_file = f'efficiency/efficiency_{int(first_exprun.exp)}_{int(first_exprun.run)}.root'
        self.execute_over_run_list(
            entry[2], iteration, forced_calibration,
            KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement,
            output_file)
        entry[1] = self.machine.result.result
        entry[3] = KLMStripEfficiencyAlgorithm.Results(
            self.machine.algorithm.algorithm.getResults())
        entry[5] = self.machine.algorithm.algorithm.getPayloadValues()
        result_str = calibration_result_string(entry[1])
        basf2.B2INFO(f'Run {int(entry[0])}: {result_str}; requested precision {0.02:f}, achieved precision '
                     f'{entry[3].getAchievedPrecision():f}.')

    def process_experiment(self, experiment, experiment_runs, iteration,
                           lowest_exprun, highest_exprun):
        """
        Process runs from experiment.

        The processing is done in stages: a measurable-plane check of every
        run with merging of the runs that do not have enough data,
        determination of maximal run ranges with the same set of planes,
        the final efficiency calibration, and writing of the payloads.

        Args:
            experiment: Experiment number used for the payload IOVs.
            experiment_runs: Runs of this experiment; each item provides
                ``exp`` and ``run`` attributes.
            iteration: Iteration number forwarded to the algorithm machine.
            lowest_exprun: Provides ``exp`` and ``run``; lower IOV bound of
                the first payload when more than one payload is written.
            highest_exprun: Provides ``exp`` and ``run``; upper IOV bound
                of the last payload.
        """
        # Run lists. They have the following format: run number,
        # calibration result code, ExpRun, algorithm results,
        # merge information, payload.
        # Throughout this method the result code 2 marks a run with not
        # enough data and 0 a successful calibration (see the log messages).
        run_data = []

        # Initial run.
        for exp_run in experiment_runs:
            self.execute_over_run_list(
                [exp_run], iteration, False,
                KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck, None)
            result = self.machine.result.result
            algorithm_results = KLMStripEfficiencyAlgorithm.Results(
                self.machine.algorithm.algorithm.getResults())
            # If number of hits is 0, then KLM is excluded. Such runs
            # can be ignored safely.
            if algorithm_results.getExtHits() > 0:
                run_data.append([exp_run.run, result, [exp_run],
                                 algorithm_results, '', None])
                result_str = calibration_result_string(result)
                basf2.B2INFO(f'Run {int(exp_run.run)}: {result_str}.')

        # Sort by run number.
        run_data.sort(key=lambda x: x[0])

        # Create list of runs that do not have enough data. Each range is
        # [first index, index after the last one].
        run_ranges = []
        i = 0
        while i < len(run_data):
            if run_data[i][1] == 2:
                j = i
                while run_data[j][1] == 2:
                    j += 1
                    if j >= len(run_data):
                        break
                run_ranges.append([i, j])
                i = j
            else:
                i += 1

        # Determine whether the runs with insufficient data can be merged to
        # the next or previous normal run.
        def can_merge(run_data, run_not_enough_data, run_normal):
            return run_data[run_not_enough_data][3].newExtHitsPlanes(
                run_data[run_normal][3].getExtHitsPlane()) == 0

        for run_range in run_ranges:
            next_run = run_range[1]
            # To mark as 'none' at the end if there are no normal runs.
            j = run_range[0]
            i = next_run - 1
            # Walk backwards from the end of the range while the runs can
            # be merged into the next normal run.
            if next_run < len(run_data):
                while i >= run_range[0]:
                    if can_merge(run_data, i, next_run):
                        basf2.B2INFO(f'Run {int(run_data[i][0])} (not enough data) can be merged into the next normal run '
                                     f'{int(run_data[next_run][0])}.')
                        run_data[i][4] = 'next'
                    else:
                        basf2.B2INFO(f'Run {int(run_data[i][0])} (not enough data) cannot be merged into the next normal run '
                                     f'{int(run_data[next_run][0])}, will try the previous one.')
                        break
                    i -= 1
            if i < run_range[0]:
                continue
            # Walk forwards from the start of the range while the remaining
            # runs can be merged into the previous normal run.
            previous_run = run_range[0] - 1
            if previous_run >= 0:
                while j <= i:
                    if can_merge(run_data, j, previous_run):
                        basf2.B2INFO(f'Run {int(run_data[j][0])} (not enough data) can be merged into the previous normal run '
                                     f'{int(run_data[previous_run][0])}.')
                        run_data[j][4] = 'previous'
                    else:
                        basf2.B2INFO(f'Run {int(run_data[j][0])} (not enough data) cannot be merged into the previous normal '
                                     f'run {int(run_data[previous_run][0])}.')
                        break
                    j += 1
            if j > i:
                continue
            basf2.B2INFO('A range of runs with not enough data is found that cannot be merged into neither previous nor '
                         f'next normal run: from {int(run_data[j][0])} to {int(run_data[i][0])}.')
            while j <= i:
                run_data[j][4] = 'none'
                j += 1

        # Merge runs that do not have enough data. If both this and next
        # run do not have enough data, then merge the collected data.
        i = 0
        while i < len(run_data) - 1:
            while (run_data[i][1] == 2) and (run_data[i + 1][1] == 2):
                if run_data[i][4] != run_data[i + 1][4]:
                    break
                basf2.B2INFO(f'Merging run {int(run_data[i + 1][0])} (not enough data) into run {int(run_data[i][0])} '
                             '(not enough data).')
                run_data[i][2].extend(run_data[i + 1][2])
                del run_data[i + 1]
                self._check_measurable_planes(run_data[i], iteration, False)
                # The list has shrunk; stop before indexing past its end.
                if i >= len(run_data) - 1:
                    break
            i += 1

        # Merge runs that do not have enough data into normal runs.
        def merge_runs(run_data, run_not_enough_data, run_normal, forced):
            basf2.B2INFO(f'Merging run {int(run_data[run_not_enough_data][0])} (not enough data) into run '
                         f'{int(run_data[run_normal][0])} (normal).')
            run_data[run_normal][2].extend(run_data[run_not_enough_data][2])
            self._check_measurable_planes(run_data[run_normal], iteration,
                                          forced)
            if run_data[run_normal][1] != 0:
                basf2.B2FATAL(f'Merging run {int(run_data[run_not_enough_data][0])} into '
                              f'run {int(run_data[run_normal][0])} failed.')
            del run_data[run_not_enough_data]

        i = 0
        while i < len(run_data):
            if run_data[i][1] == 2:
                # merge_runs() deletes entry i, so the index is advanced
                # only when nothing has been merged.
                if run_data[i][4] == 'next':
                    merge_runs(run_data, i, i + 1, False)
                elif run_data[i][4] == 'previous':
                    merge_runs(run_data, i, i - 1, False)
                else:
                    i += 1
            else:
                i += 1
        i = 0
        while i < len(run_data):
            if run_data[i][1] == 2 and run_data[i][4] == 'none':
                new_planes_previous = -1
                new_planes_next = -1
                if i < len(run_data) - 1:
                    new_planes_next = run_data[i][3].newExtHitsPlanes(
                        run_data[i + 1][3].getExtHitsPlane())
                    basf2.B2INFO(f'There are {int(new_planes_next)} new active modules in run {int(run_data[i][0])} '
                                 f'relatively to run {int(run_data[i + 1][0])}.')
                if i > 0:
                    new_planes_previous = run_data[i][3].newExtHitsPlanes(
                        run_data[i - 1][3].getExtHitsPlane())
                    basf2.B2INFO(f'There are {int(new_planes_previous)} new active modules in run {int(run_data[i][0])} '
                                 f'relatively to run {int(run_data[i - 1][0])}.')
                run_for_merging = -1
                # If a forced merge of the normal run with another run from
                # a different range of runs with not enough data has already
                # been performed, then the list of active modules may change
                # and there would be 0 new modules. Consequently, the number
                # of modules is checked to be greater or equal than 0. However,
                # there is no guarantee that the same added module would be
                # calibrated normally. Thus, a forced merge is performed anyway.
                if new_planes_previous >= 0 and new_planes_next < 0:
                    run_for_merging = i - 1
                elif new_planes_previous < 0 and new_planes_next >= 0:
                    run_for_merging = i + 1
                elif new_planes_previous >= 0 and new_planes_next >= 0:
                    if new_planes_previous < new_planes_next:
                        run_for_merging = i - 1
                    else:
                        run_for_merging = i + 1
                else:
                    basf2.B2INFO(f'Cannot determine run for merging for run {int(run_data[i][0])}, performing its'
                                 ' forced calibration.')
                    self._check_measurable_planes(run_data[i], iteration,
                                                  True)
                    if run_data[i][1] != 0:
                        basf2.B2FATAL(f'Forced calibration of run {int(run_data[i][0])} failed.')
                if run_for_merging >= 0:
                    merge_runs(run_data, i, run_for_merging, True)
            else:
                i += 1

        # Stage 2: determination of maximal run ranges.
        # The set of calibrated planes should be the same for all small
        # run ranges within the large run range.
        run_ranges.clear()
        i = 0
        while i < len(run_data):
            j = i + 1
            while j < len(run_data):
                # The comparison is made in both directions.
                planes_differ = False
                if run_data[j][3].newMeasuredPlanes(
                        run_data[i][3].getEfficiency()) != 0:
                    planes_differ = True
                if run_data[i][3].newMeasuredPlanes(
                        run_data[j][3].getEfficiency()) != 0:
                    planes_differ = True
                if planes_differ:
                    basf2.B2INFO(f'Run {int(run_data[j][0])}: the set of planes is different from run {int(run_data[i][0])}.')
                    break
                basf2.B2INFO(f'Run {int(run_data[j][0])}: the set of planes is the same as for run {int(run_data[i][0])}.')
                j = j + 1
            run_ranges.append([i, j])
            i = j

        # Stage 3: final calibration.

        # Output directory.
        os.makedirs('efficiency', exist_ok=True)

        # Merge runs.
        def merge_runs_2(run_data, run_1, run_2, forced):
            basf2.B2INFO(f'Merging run {int(run_data[run_2][0])} into run {int(run_data[run_1][0])}.')
            run_data[run_1][2].extend(run_data[run_2][2])
            self._measure_efficiency(run_data[run_1], iteration, forced)

        for run_range in run_ranges:
            i = run_range[0]
            while i < run_range[1]:
                # Force calibration if there are no more runs in the range.
                forced_calibration = (i == run_range[1] - 1)
                self._measure_efficiency(run_data[i], iteration,
                                         forced_calibration)
                if run_data[i][1] == 2:
                    j = i + 1
                    while j < run_range[1]:
                        # Force calibration if there are no more runs
                        # in the range.
                        forced_calibration = (j == run_range[1] - 1)
                        merge_runs_2(run_data, i, j, forced_calibration)
                        # Mark the merged entry for removal.
                        run_data[j][1] = -1
                        j = j + 1
                        if run_data[i][1] == 0:
                            break
                    i = j
                else:
                    i = i + 1

        # Remove the entries that have been merged into other ones.
        run_data[:] = [entry for entry in run_data if entry[1] != -1]

        # Stage 4: write the results to the database.
        def commit_payload(run_data, run):
            basf2.B2INFO(f'Writing run {int(run_data[run][0])}.')
            self.machine.algorithm.algorithm.commit(run_data[run][5])

        # NOTE(review): when only one payload is written, its IOV starts at
        # its own first run and lowest_exprun is never applied - confirm
        # that this is intended.
        previous_run = 0
        for i in range(len(run_data)):
            # Get first run again due to possible mergings.
            run_data[i][2].sort(key=lambda x: x.run)
            first_run = run_data[i][2][0].run
            # Set IOV for the current run.
            # The last run will be overwritten when writing the result.
            run_data[i][5].front().iov = \
                Belle2.IntervalOfValidity(experiment, first_run, experiment, -1)
            # Write the previous run.
            if i > 0:
                iov = run_data[previous_run][5].front().iov
                if previous_run == 0:
                    run_data[previous_run][5].front().iov = \
                        Belle2.IntervalOfValidity(
                            lowest_exprun.exp, lowest_exprun.run,
                            experiment, first_run - 1)
                else:
                    run_data[previous_run][5].front().iov = \
                        Belle2.IntervalOfValidity(experiment, iov.getRunLow(),
                                                  experiment, first_run - 1)
                commit_payload(run_data, previous_run)
            previous_run = i
            # Write the current run if it is the last run.
            if i == len(run_data) - 1:
                iov = run_data[i][5].front().iov
                run_data[i][5].front().iov = \
                    Belle2.IntervalOfValidity(
                        experiment, iov.getRunLow(),
                        highest_exprun.exp, highest_exprun.run)
                commit_payload(run_data, i)
A class that describes the interval of experiments/runs for which an object in the database is valid.
def process_experiment(self, experiment, experiment_runs, iteration, lowest_exprun, highest_exprun)
def run(self, iov, iteration, queue)
machine
:py:class:`caf.state_machines.AlgorithmMachine` used to help set up and execute CalibrationAlgorithm.
first_execution
Flag for the first execution of this AlgorithmStrategy.
def execute_over_run_list(self, run_list, iteration, forced_calibration, calibration_stage, output_file)
queue
The multiprocessing queue used to pass back results one at a time.