Belle II Software  release-05-01-25
klm_strip_efficiency.py
1 # -*- coding: utf-8 -*-
2 
3 """Custom calibration strategy for KLM strip efficiency."""
4 
import collections
import os

import basf2
from ROOT import Belle2

from caf.utils import ExpRun, IoV, AlgResult
from caf.utils import runs_overlapping_iov, runs_from_vector
from caf.utils import split_runs_by_exp
from caf.strategies import AlgorithmStrategy, StrategyError
from caf.state_machines import AlgorithmMachine
from ROOT.Belle2 import KLMStripEfficiencyAlgorithm
from klm_strategies_common import get_lowest_exprun, get_highest_exprun, \
    calibration_result_string
19 
20 
class KLMStripEfficiency(AlgorithmStrategy):
    """
    Custom strategy for executing the KLM strip efficiency. Requires complex
    run merging rules.

    This uses a `caf.state_machines.AlgorithmMachine` to actually execute
    the various steps rather than operating on a CalibrationAlgorithm
    C++ class directly.
    """

    #: The params that could be set on the Algorithm object which this
    #: strategy uses; ``iov_coverage`` is read in `run`.
    usable_params = {'iov_coverage': IoV}

    def __init__(self, algorithm):
        """
        Initialize the strategy and create the
        `caf.state_machines.AlgorithmMachine` that executes ``algorithm``.
        """
        super().__init__(algorithm)
        #: :py:class:`caf.state_machines.AlgorithmMachine` used to help set up
        #: and execute CalibrationAlgorithm.
        self.machine = AlgorithmMachine(self.algorithm)
        #: Flag for the first execution of this AlgorithmStrategy.
        self.first_execution = True

    def run(self, iov, iteration, queue):
        """
        Runs the algorithm machine over the collected data and
        fills the results.

        Parameters:
            iov: calibration IoV; if it evaluates to true, only the collected
                runs overlapping it are calibrated, otherwise all collected
                runs are used.
            iteration: iteration number passed to the algorithm machine.
            queue: multiprocessing queue used to pass back the result and the
                final state.

        Raises:
            StrategyError: if the strategy was not set up correctly.
        """
        if not self.is_valid():
            raise StrategyError('The strategy KLMStripEfficiency was not '
                                'set up correctly.')
        #: The multiprocessing queue used to pass back results one at a time.
        self.queue = queue

        basf2.B2INFO(f'Setting up {self.__class__.__name__} strategy '
                     f'for {self.algorithm.name}')
        # Add all the necessary parameters for a strategy to run.
        machine_params = {
            'database_chain': self.database_chain,
            'dependent_databases': self.dependent_databases,
            'output_dir': self.output_dir,
            'output_database_dir': self.output_database_dir,
            'input_files': self.input_files,
            'ignored_runs': self.ignored_runs,
        }
        self.machine.setup_from_dict(machine_params)
        # Start moving through machine states.
        basf2.B2INFO(f'Starting AlgorithmMachine of {self.algorithm.name}')
        self.algorithm.algorithm.setCalibrationStage(
            KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement)
        # This sets up the logging and database chain and assigns all
        # input files from collector jobs.
        self.machine.setup_algorithm(iteration=iteration)
        # After this point, the logging is in the stdout of the algorithm.
        basf2.B2INFO(f'Beginning execution of {self.algorithm.name} using '
                     f'strategy {self.__class__.__name__}')

        # Select of runs for calibration.
        runs = self.algorithm.algorithm.getRunListFromAllData()
        all_runs_collected = set(runs_from_vector(runs))
        # Select runs overlapping with the calibration IOV if it is specified.
        if iov:
            runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
        else:
            runs_to_execute = all_runs_collected
        # Remove the ignored runs.
        if self.ignored_runs:
            basf2.B2INFO(f'Removing the ignored_runs from the runs '
                         f'to execute for {self.algorithm.name}')
            runs_to_execute.difference_update(set(self.ignored_runs))

        # Without any run the algorithm is never executed and
        # self.machine.result would still be unset, so report a failure
        # instead of crashing below.
        if not runs_to_execute:
            basf2.B2ERROR(f'No runs to execute for {self.algorithm.name}.')
            self.send_final_state(self.FAILED)
            return

        # Creation of sorted run list split by experiment.
        runs_to_execute = split_runs_by_exp(sorted(runs_to_execute))

        # Get IOV coverage (None if it is not set).
        iov_coverage = self.algorithm.params.get('iov_coverage')

        # Iterate over experiment run lists.
        number_of_experiments = len(runs_to_execute)
        for i_exp, run_list in enumerate(runs_to_execute, start=1):
            lowest_exprun = get_lowest_exprun(number_of_experiments, i_exp,
                                              run_list, iov_coverage)
            highest_exprun = get_highest_exprun(number_of_experiments, i_exp,
                                                run_list, iov_coverage)
            self.process_experiment(run_list[0].exp, run_list, iteration,
                                    lowest_exprun, highest_exprun)

        # Send final state and result to CAF.
        self.send_result(self.machine.result)
        if self.machine.result.result in (AlgResult.ok.value,
                                          AlgResult.iterate.value):
            self.send_final_state(self.COMPLETED)
        else:
            self.send_final_state(self.FAILED)

    def execute_over_run_list(self, run_list, iteration, forced_calibration,
                              calibration_stage, output_file):
        """
        Execute the algorithm over a run list.

        Parameters:
            run_list: list of ExpRun to execute over.
            iteration: iteration number passed to the algorithm machine.
            forced_calibration: value passed to setForcedCalibration().
            calibration_stage: KLMStripEfficiencyAlgorithm calibration stage
                (c_MeasurablePlaneCheck or c_EfficiencyMeasurement here).
            output_file: output file name passed to setOutputFileName(), or
                None to leave it unchanged.
        """
        # The first execution reuses the setup performed in run(); every
        # later execution sets the algorithm up again.
        if self.first_execution:
            self.first_execution = False
        else:
            self.machine.setup_algorithm()
        algorithm = self.machine.algorithm.algorithm
        algorithm.setForcedCalibration(forced_calibration)
        algorithm.setCalibrationStage(calibration_stage)
        if output_file is not None:
            algorithm.setOutputFileName(output_file)
        self.machine.execute_runs(runs=run_list, iteration=iteration,
                                  apply_iov=None)
        if self.machine.result.result in (AlgResult.ok.value,
                                          AlgResult.iterate.value):
            self.machine.complete()
        else:
            self.machine.fail()

    def _check_measurable_planes(self, run_list, iteration, forced):
        """
        Execute the c_MeasurablePlaneCheck stage over ``run_list``.

        Returns:
            tuple (calibration result code, KLMStripEfficiencyAlgorithm.Results).
        """
        self.execute_over_run_list(
            run_list, iteration, forced,
            KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck, None)
        result = self.machine.result.result
        results = KLMStripEfficiencyAlgorithm.Results(
            self.machine.algorithm.algorithm.getResults())
        return result, results

    def _measure_efficiency(self, run_list, iteration, forced, output_file):
        """
        Execute the c_EfficiencyMeasurement stage over ``run_list``.

        Returns:
            tuple (calibration result code,
            KLMStripEfficiencyAlgorithm.Results, payload values).
        """
        self.execute_over_run_list(
            run_list, iteration, forced,
            KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement, output_file)
        algorithm = self.machine.algorithm.algorithm
        result = self.machine.result.result
        results = KLMStripEfficiencyAlgorithm.Results(algorithm.getResults())
        payload = algorithm.getPayloadValues()
        return result, results, payload

    @staticmethod
    def _efficiency_file_name(exp_run):
        """Return the efficiency output file name for the run ``exp_run``."""
        return 'efficiency/efficiency_%d_%d.root' % (exp_run.exp, exp_run.run)

    def process_experiment(self, experiment, experiment_runs, iteration,
                           lowest_exprun, highest_exprun):
        """
        Process runs from experiment.

        The processing is done in four stages:

        1. The measurable-plane check is executed for every run; runs
           without extrapolated hits are dropped. Runs with not enough data
           are merged into the next or previous normal run if that adds no
           new active planes. Otherwise they are merged with forced
           calibration into the neighbour relative to which they have fewer
           new active planes.
        2. Consecutive runs with the same set of measured planes are grouped
           into run ranges.
        3. The efficiency is measured within each range. A run with not
           enough data absorbs the following runs of its range until its
           result is ok; the last run of a range is always calibrated with
           forced calibration. Output files go to the ``efficiency``
           directory.
        4. The payloads are committed. Each one is valid from the first run
           of its entry to the run before the next entry; the first starts
           at ``lowest_exprun`` and the last ends at ``highest_exprun``.

        Parameters:
            experiment: experiment number of all ``experiment_runs``.
            experiment_runs: list of ExpRun of this experiment.
            iteration: iteration number passed to the algorithm machine.
            lowest_exprun: ExpRun where the first payload starts.
            highest_exprun: ExpRun where the last payload ends.
        """
        result_ok = AlgResult.ok.value
        result_not_enough_data = AlgResult.not_enough_data.value
        # Marker for runs merged into another run during stage 3.
        result_merged = -1

        # Run lists. They have the following format: run number,
        # calibration result code, list of ExpRun, algorithm results,
        # merge information, payload.
        run_data = []

        # Initial run.
        for exp_run in experiment_runs:
            result, algorithm_results = self._check_measurable_planes(
                [exp_run], iteration, False)
            # If number of hits is 0, then KLM is excluded. Such runs
            # can be ignored safely.
            if algorithm_results.getExtHits() > 0:
                run_data.append([exp_run.run, result, [exp_run],
                                 algorithm_results, '', None])
            result_str = calibration_result_string(result)
            basf2.B2INFO('Run %d: %s.' % (exp_run.run, result_str))

        # Sort by run number.
        run_data.sort(key=lambda x: x[0])

        # Create list of ranges [first, end) of consecutive runs that do not
        # have enough data.
        run_ranges = []
        i = 0
        while i < len(run_data):
            if run_data[i][1] == result_not_enough_data:
                j = i
                while run_data[j][1] == result_not_enough_data:
                    j += 1
                    if j >= len(run_data):
                        break
                run_ranges.append([i, j])
                i = j
            else:
                i += 1

        # Determine whether the runs with insufficient data can be merged to
        # the next or previous normal run.
        def can_merge(run_data, run_not_enough_data, run_normal):
            # Mergeable if newExtHitsPlanes() reports no new planes of the
            # run relative to the normal one.
            return run_data[run_not_enough_data][3].newExtHitsPlanes(
                run_data[run_normal][3].getExtHitsPlane()) == 0

        for run_range in run_ranges:
            next_normal = run_range[1]
            # To mark as 'none' at the end if there are no normal runs.
            j = run_range[0]
            i = next_normal - 1
            # Walk backwards from the end of the range towards its start.
            if next_normal < len(run_data):
                while i >= run_range[0]:
                    if can_merge(run_data, i, next_normal):
                        basf2.B2INFO('Run %d (not enough data) can be merged '
                                     'into the next normal run %d.' %
                                     (run_data[i][0], run_data[next_normal][0]))
                        run_data[i][4] = 'next'
                    else:
                        basf2.B2INFO('Run %d (not enough data) cannot be '
                                     'merged into the next normal run %d, '
                                     'will try the previous one.' %
                                     (run_data[i][0], run_data[next_normal][0]))
                        break
                    i -= 1
            if i < run_range[0]:
                continue
            # Walk forwards over the runs not assigned to the next run.
            previous_normal = run_range[0] - 1
            if previous_normal >= 0:
                while j <= i:
                    if can_merge(run_data, j, previous_normal):
                        basf2.B2INFO('Run %d (not enough data) can be merged '
                                     'into the previous normal run %d.' %
                                     (run_data[j][0],
                                      run_data[previous_normal][0]))
                        run_data[j][4] = 'previous'
                    else:
                        basf2.B2INFO('Run %d (not enough data) cannot be '
                                     'merged into the previous normal run %d.' %
                                     (run_data[j][0],
                                      run_data[previous_normal][0]))
                        break
                    j += 1
            if j > i:
                continue
            basf2.B2INFO('A range of runs with not enough data is found '
                         'that cannot be merged into neither previous nor '
                         'next normal run: from %d to %d.' %
                         (run_data[j][0], run_data[i][0]))
            while j <= i:
                run_data[j][4] = 'none'
                j += 1

        # Merge runs that do not have enough data. If both this and next
        # run do not have enough data, then merge the collected data.
        i = 0
        while i < len(run_data) - 1:
            while (run_data[i][1] == result_not_enough_data and
                   run_data[i + 1][1] == result_not_enough_data):
                if run_data[i][4] != run_data[i + 1][4]:
                    break
                basf2.B2INFO('Merging run %d (not enough data) into '
                             'run %d (not enough data).' %
                             (run_data[i + 1][0], run_data[i][0]))
                run_data[i][2].extend(run_data[i + 1][2])
                del run_data[i + 1]
                run_data[i][1], run_data[i][3] = \
                    self._check_measurable_planes(run_data[i][2], iteration,
                                                  False)
                result_str = calibration_result_string(run_data[i][1])
                basf2.B2INFO('Run %d: %s.' % (run_data[i][0], result_str))
                if i >= len(run_data) - 1:
                    break
            i += 1

        # Merge runs that do not have enough data into normal runs.
        def merge_runs(run_data, run_not_enough_data, run_normal, forced):
            basf2.B2INFO('Merging run %d (not enough data) into '
                         'run %d (normal).' %
                         (run_data[run_not_enough_data][0],
                          run_data[run_normal][0]))
            run_data[run_normal][2].extend(run_data[run_not_enough_data][2])
            run_data[run_normal][1], run_data[run_normal][3] = \
                self._check_measurable_planes(run_data[run_normal][2],
                                              iteration, forced)
            result_str = calibration_result_string(run_data[run_normal][1])
            basf2.B2INFO('Run %d: %s.' % (run_data[run_normal][0], result_str))
            if run_data[run_normal][1] != result_ok:
                basf2.B2FATAL('Merging run %d into run %d failed.' %
                              (run_data[run_not_enough_data][0],
                               run_data[run_normal][0]))
            del run_data[run_not_enough_data]

        # The merged entry is deleted, so the index is only advanced when
        # nothing was merged.
        i = 0
        while i < len(run_data):
            merge_direction = run_data[i][4]
            is_not_enough_data = run_data[i][1] == result_not_enough_data
            if is_not_enough_data and merge_direction == 'next':
                merge_runs(run_data, i, i + 1, False)
            elif is_not_enough_data and merge_direction == 'previous':
                merge_runs(run_data, i, i - 1, False)
            else:
                i += 1

        i = 0
        while i < len(run_data):
            if (run_data[i][1] != result_not_enough_data or
                    run_data[i][4] != 'none'):
                i += 1
                continue
            # -1 means that there is no neighbouring run on that side.
            new_planes_previous = -1
            new_planes_next = -1
            if i < len(run_data) - 1:
                new_planes_next = run_data[i][3].newExtHitsPlanes(
                    run_data[i + 1][3].getExtHitsPlane())
                basf2.B2INFO('There are %d new active modules in run %d '
                             'relatively to run %d.' %
                             (new_planes_next, run_data[i][0],
                              run_data[i + 1][0]))
            if i > 0:
                new_planes_previous = run_data[i][3].newExtHitsPlanes(
                    run_data[i - 1][3].getExtHitsPlane())
                basf2.B2INFO('There are %d new active modules in run %d '
                             'relatively to run %d.' %
                             (new_planes_previous,
                              run_data[i][0], run_data[i - 1][0]))
            run_for_merging = -1
            # If a forced merge of the normal run with another run from
            # a different range of runs with not enough data has already
            # been performed, then the list of active modules may change
            # and there would be 0 new modules. Consequently, the number
            # of modules is checked to be greater or equal than 0. However,
            # there is no guarantee that the same added module would be
            # calibrated normally. Thus, a forced merge is performed anyway.
            if new_planes_previous >= 0 and new_planes_next < 0:
                run_for_merging = i - 1
            elif new_planes_previous < 0 and new_planes_next >= 0:
                run_for_merging = i + 1
            elif new_planes_previous >= 0 and new_planes_next >= 0:
                if new_planes_previous < new_planes_next:
                    run_for_merging = i - 1
                else:
                    run_for_merging = i + 1
            else:
                basf2.B2INFO('Cannot determine run for merging for run %d, '
                             'performing its forced calibration.' %
                             (run_data[i][0]))
                run_data[i][1], run_data[i][3] = \
                    self._check_measurable_planes(run_data[i][2], iteration,
                                                  True)
                result_str = calibration_result_string(run_data[i][1])
                basf2.B2INFO('Run %d: %s.' % (run_data[i][0], result_str))
                if run_data[i][1] != result_ok:
                    basf2.B2FATAL('Forced calibration of run %d failed.' %
                                  (run_data[i][0]))
            if run_for_merging >= 0:
                merge_runs(run_data, i, run_for_merging, True)

        # Stage 2: determination of maximal run ranges.
        # The set of calibrated planes should be the same for all small
        # run ranges within the large run range.
        run_ranges = []
        i = 0
        while i < len(run_data):
            j = i + 1
            while j < len(run_data):
                planes_differ = False
                if (run_data[j][3].newMeasuredPlanes(
                        run_data[i][3].getEfficiency()) != 0):
                    planes_differ = True
                if (run_data[i][3].newMeasuredPlanes(
                        run_data[j][3].getEfficiency()) != 0):
                    planes_differ = True
                if planes_differ:
                    basf2.B2INFO('Run %d: the set of planes is different '
                                 'from run %d.'
                                 % (run_data[j][0], run_data[i][0]))
                    break
                basf2.B2INFO('Run %d: the set of planes is the same '
                             'as for run %d.'
                             % (run_data[j][0], run_data[i][0]))
                j += 1
            run_ranges.append([i, j])
            i = j

        # Stage 3: final calibration.

        # Output directory.
        os.makedirs('efficiency', exist_ok=True)

        def log_efficiency_result(run_data, index):
            result_str = calibration_result_string(run_data[index][1])
            # NOTE(review): the requested precision is logged as a fixed
            # 0.02 rather than queried from the results object; confirm it
            # matches the algorithm configuration.
            basf2.B2INFO('Run %d: %s; requested precision %f, achieved '
                         'precision %f.' %
                         (run_data[index][0], result_str, 0.02,
                          run_data[index][3].getAchievedPrecision()))

        # Merge runs.
        def merge_runs_2(run_data, run_1, run_2, forced):
            basf2.B2INFO('Merging run %d into run %d.' %
                         (run_data[run_2][0], run_data[run_1][0]))
            run_data[run_1][2].extend(run_data[run_2][2])
            output_file = self._efficiency_file_name(run_data[run_1][2][0])
            run_data[run_1][1], run_data[run_1][3], run_data[run_1][5] = \
                self._measure_efficiency(run_data[run_1][2], iteration,
                                         forced, output_file)
            log_efficiency_result(run_data, run_1)

        for range_begin, range_end in run_ranges:
            i = range_begin
            while i < range_end:
                output_file = self._efficiency_file_name(run_data[i][2][0])
                # Force calibration if there are no more runs in the range.
                forced_calibration = (i == range_end - 1)
                run_data[i][1], run_data[i][3], run_data[i][5] = \
                    self._measure_efficiency(run_data[i][2], iteration,
                                             forced_calibration, output_file)
                log_efficiency_result(run_data, i)
                if run_data[i][1] == result_not_enough_data:
                    j = i + 1
                    while j < range_end:
                        # Force calibration if there are no more runs
                        # in the range.
                        forced_calibration = (j == range_end - 1)
                        merge_runs_2(run_data, i, j, forced_calibration)
                        run_data[j][1] = result_merged
                        j += 1
                        if run_data[i][1] == result_ok:
                            break
                    i = j
                else:
                    i += 1

        # Drop the runs merged into other runs.
        run_data = [entry for entry in run_data
                    if entry[1] != result_merged]

        # Stage 4: write the results to the database.
        # Get first runs again due to possible mergings.
        for entry in run_data:
            entry[2].sort(key=lambda x: x.run)
        last_index = len(run_data) - 1
        for i, entry in enumerate(run_data):
            # The first payload starts at lowest_exprun (also when it is the
            # only one); the others start at their own first run.
            if i == 0:
                low = lowest_exprun
            else:
                low = ExpRun(experiment, entry[2][0].run)
            # The last payload ends at highest_exprun; the others end just
            # before the first run of the next entry.
            if i == last_index:
                high = highest_exprun
            else:
                high = ExpRun(experiment, run_data[i + 1][2][0].run - 1)
            entry[5].front().iov = Belle2.IntervalOfValidity(
                low.exp, low.run, high.exp, high.run)
            basf2.B2INFO('Writing run %d.' % (entry[0]))
            self.machine.algorithm.algorithm.commit(entry[5])
Belle2::IntervalOfValidity
A class that describes the interval of experiments/runs for which an object in the database is valid.
Definition: IntervalOfValidity.h:35
klm_strip_efficiency.KLMStripEfficiency.run
def run(self, iov, iteration, queue)
Definition: klm_strip_efficiency.py:46
klm_strip_efficiency.KLMStripEfficiency.process_experiment
def process_experiment(self, experiment, experiment_runs, iteration, lowest_exprun, highest_exprun)
Definition: klm_strip_efficiency.py:142
klm_strip_efficiency.KLMStripEfficiency.__init__
def __init__(self, algorithm)
Definition: klm_strip_efficiency.py:35
klm_strip_efficiency.KLMStripEfficiency.queue
queue
The multiprocessing queue used to pass back results one at a time.
Definition: klm_strip_efficiency.py:55
klm_strip_efficiency.KLMStripEfficiency.first_execution
first_execution
Flag for the first execution of this AlgorithmStrategy.
Definition: klm_strip_efficiency.py:44
klm_strip_efficiency.KLMStripEfficiency
Definition: klm_strip_efficiency.py:21
klm_strip_efficiency.KLMStripEfficiency.machine
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm.
Definition: klm_strip_efficiency.py:42
klm_strip_efficiency.KLMStripEfficiency.execute_over_run_list
def execute_over_run_list(self, run_list, iteration, forced_calibration, calibration_stage, output_file)
Definition: klm_strip_efficiency.py:120