Belle II Software  release-06-00-14
klm_strip_efficiency.py
1 # -*- coding: utf-8 -*-
2 
3 
10 
11 """Custom calibration strategy for KLM strip efficiency."""
12 
13 import os
14 
15 import basf2
16 from ROOT import Belle2
17 
18 from caf.utils import AlgResult, IoV
19 from caf.utils import runs_overlapping_iov, runs_from_vector
20 from caf.utils import split_runs_by_exp
21 from caf.strategies import AlgorithmStrategy, StrategyError
22 from caf.state_machines import AlgorithmMachine
23 from ROOT.Belle2 import KLMStripEfficiencyAlgorithm
24 from klm_strategies_common import get_lowest_exprun, get_highest_exprun, \
25  calibration_result_string
26 
27 
class KLMStripEfficiency(AlgorithmStrategy):
    """
    Custom strategy for executing the KLM strip efficiency. Requires complex
    run merging rules.

    This uses a `caf.state_machines.AlgorithmMachine` to actually execute
    the various steps rather than operating on a CalibrationAlgorithm
    C++ class directly.
    """

    #: The params that you could set on the Algorithm object which this
    #: Strategy would use.
    usable_params = {'iov_coverage': IoV}

    def __init__(self, algorithm):
        """
        Create the strategy for the given CAF algorithm.

        Parameters:
            algorithm: CAF algorithm object; its ``algorithm`` attribute is
                the C++ ``KLMStripEfficiencyAlgorithm`` that is configured
                and executed by this strategy.
        """
        super().__init__(algorithm)
        #: :py:class:`caf.state_machines.AlgorithmMachine` used to help set up
        #: and execute CalibrationAlgorithm.
        self.machine = AlgorithmMachine(self.algorithm)
        #: Flag for the first execution of this AlgorithmStrategy.
        self.first_execution = True

    def run(self, iov, iteration, queue):
        """
        Runs the algorithm machine over the collected data and
        fills the results.

        Parameters:
            iov: calibration IoV; if set, only runs overlapping it are used.
            iteration: CAF iteration number.
            queue: multiprocessing queue used to pass results back to CAF.

        Raises:
            StrategyError: if the strategy was not set up correctly.
        """
        if not self.is_valid():
            raise StrategyError('The strategy KLMStripEfficiency was not '
                                'set up correctly.')

        #: The multiprocessing queue used to pass back results one at a time.
        self.queue = queue

        basf2.B2INFO(f'Setting up {self.__class__.__name__} strategy '
                     f'for {self.algorithm.name}')
        # Add all the necessary parameters for a strategy to run.
        machine_params = {}
        machine_params['database_chain'] = self.database_chain
        machine_params['dependent_databases'] = self.dependent_databases
        machine_params['output_dir'] = self.output_dir
        machine_params['output_database_dir'] = self.output_database_dir
        machine_params['input_files'] = self.input_files
        machine_params['ignored_runs'] = self.ignored_runs
        self.machine.setup_from_dict(machine_params)
        # Start moving through machine states.
        basf2.B2INFO(f'Starting AlgorithmMachine of {self.algorithm.name}')
        self.algorithm.algorithm.setCalibrationStage(
            KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement)
        # This sets up the logging and database chain and assigns all
        # input files from collector jobs.
        self.machine.setup_algorithm(iteration=iteration)
        # After this point, the logging is in the stdout of the algorithm.
        basf2.B2INFO(f'Beginning execution of {self.algorithm.name} using '
                     f'strategy {self.__class__.__name__}')

        # Select runs for calibration.
        runs = self.algorithm.algorithm.getRunListFromAllData()
        all_runs_collected = set(runs_from_vector(runs))
        # Select runs overlapping with the calibration IOV if it is specified.
        if iov:
            runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
        else:
            runs_to_execute = all_runs_collected
        # Remove the ignored runs.
        if self.ignored_runs:
            basf2.B2INFO(f'Removing the ignored_runs from the runs '
                         f'to execute for {self.algorithm.name}')
            runs_to_execute.difference_update(set(self.ignored_runs))

        # Creation of sorted run list split by experiment.
        runs_to_execute = sorted(runs_to_execute)
        runs_to_execute = split_runs_by_exp(runs_to_execute)

        # Get IOV coverage.
        iov_coverage = None
        if 'iov_coverage' in self.algorithm.params:
            iov_coverage = self.algorithm.params['iov_coverage']

        # Iterate over experiment run lists.
        number_of_experiments = len(runs_to_execute)
        for i_exp, run_list in enumerate(runs_to_execute, start=1):
            lowest_exprun = get_lowest_exprun(number_of_experiments, i_exp,
                                              run_list, iov_coverage)
            highest_exprun = get_highest_exprun(number_of_experiments, i_exp,
                                                run_list, iov_coverage)
            self.process_experiment(run_list[0].exp, run_list, iteration,
                                    lowest_exprun, highest_exprun)

        # Send final state and result to CAF.
        self.send_result(self.machine.result)
        if (self.machine.result.result == AlgResult.ok.value) or \
                (self.machine.result.result == AlgResult.iterate.value):
            self.send_final_state(self.COMPLETED)
        else:
            self.send_final_state(self.FAILED)

    def execute_over_run_list(self, run_list, iteration, forced_calibration,
                              calibration_stage, output_file):
        """
        Execute over run list.

        Parameters:
            run_list: list of ExpRun objects to execute over.
            iteration: CAF iteration number.
            forced_calibration: whether to force the calibration.
            calibration_stage: KLMStripEfficiencyAlgorithm calibration stage.
            output_file: output file name, or None to leave it unchanged.
        """
        # The first execution reuses the setup performed in run().
        if not self.first_execution:
            self.machine.setup_algorithm()
        else:
            self.first_execution = False
        self.machine.algorithm.algorithm.setForcedCalibration(
            forced_calibration)
        self.machine.algorithm.algorithm.setCalibrationStage(calibration_stage)
        if output_file is not None:
            self.machine.algorithm.algorithm.setOutputFileName(output_file)
        self.machine.execute_runs(runs=run_list, iteration=iteration,
                                  apply_iov=None)
        if (self.machine.result.result == AlgResult.ok.value) or \
                (self.machine.result.result == AlgResult.iterate.value):
            self.machine.complete()
        else:
            self.machine.fail()

    def process_experiment(self, experiment, experiment_runs, iteration,
                           lowest_exprun, highest_exprun):
        """
        Process runs from experiment.

        Stage 1: every run is checked for measurable planes; runs without
        enough data are merged into neighbouring runs.
        Stage 2: maximal run ranges with the same set of measured planes
        are determined.
        Stage 3: the efficiency is measured within each range, merging runs
        until the calibration succeeds.
        Stage 4: the payloads are committed with contiguous IOVs spanning
        from lowest_exprun to highest_exprun.

        Parameters:
            experiment: experiment number.
            experiment_runs: list of ExpRun objects of this experiment.
            iteration: CAF iteration number.
            lowest_exprun: lower bound of the IOV of the first payload.
            highest_exprun: upper bound of the IOV of the last payload.
        """
        # Calibration result codes used below.
        result_ok = AlgResult.ok.value
        result_not_enough_data = AlgResult.not_enough_data.value
        # Marker for runs that were merged into another run in stage 3.
        result_merged = -1

        # Run lists. They have the following format: run number,
        # calibration result code, list of ExpRun (grows when runs are
        # merged), algorithm results, merge information ('', 'next',
        # 'previous' or 'none'), payload.
        run_data = []

        # Initial run.
        for exp_run in experiment_runs:
            self.execute_over_run_list(
                [exp_run], iteration, False,
                KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck, None)
            result = self.machine.result.result
            algorithm_results = KLMStripEfficiencyAlgorithm.Results(
                self.machine.algorithm.algorithm.getResults())
            # If number of hits is 0, then KLM is excluded. Such runs
            # can be ignored safely.
            if algorithm_results.getExtHits() > 0:
                run_data.append([exp_run.run, result, [exp_run],
                                 algorithm_results, '', None])
            result_str = calibration_result_string(result)
            basf2.B2INFO('Run %d: %s.' % (exp_run.run, result_str))

        # Sort by run number.
        run_data.sort(key=lambda x: x[0])

        # Create list of ranges [first, end) of consecutive runs that do not
        # have enough data; end is the index of the next normal run (or
        # len(run_data) if the range reaches the end).
        run_ranges = []
        i = 0
        while i < len(run_data):
            if run_data[i][1] == result_not_enough_data:
                j = i
                while run_data[j][1] == result_not_enough_data:
                    j += 1
                    if j >= len(run_data):
                        break
                run_ranges.append([i, j])
                i = j
            else:
                i += 1

        # Determine whether the runs with insufficient data can be merged to
        # the next or previous normal run.
        def can_merge(run_data, run_not_enough_data, run_normal):
            # Mergeable if the run adds no new planes with extrapolated hits.
            return run_data[run_not_enough_data][3].newExtHitsPlanes(
                run_data[run_normal][3].getExtHitsPlane()) == 0

        for run_range in run_ranges:
            next_run = run_range[1]
            # To mark as 'none' at the end if there are no normal runs.
            j = run_range[0]
            i = next_run - 1
            # Walk backwards from the end of the range towards its start,
            # marking runs that can be merged into the next normal run.
            if next_run < len(run_data):
                while i >= run_range[0]:
                    if can_merge(run_data, i, next_run):
                        basf2.B2INFO('Run %d (not enough data) can be merged '
                                     'into the next normal run %d.' %
                                     (run_data[i][0], run_data[next_run][0]))
                        run_data[i][4] = 'next'
                    else:
                        basf2.B2INFO('Run %d (not enough data) cannot be '
                                     'merged into the next normal run %d, '
                                     'will try the previous one.' %
                                     (run_data[i][0], run_data[next_run][0]))
                        break
                    i -= 1
            if i < run_range[0]:
                continue
            # Walk forwards over the remaining runs, marking those that can
            # be merged into the previous normal run.
            previous_run = run_range[0] - 1
            if previous_run >= 0:
                while j <= i:
                    if can_merge(run_data, j, previous_run):
                        basf2.B2INFO('Run %d (not enough data) can be merged '
                                     'into the previous normal run %d.' %
                                     (run_data[j][0],
                                      run_data[previous_run][0]))
                        run_data[j][4] = 'previous'
                    else:
                        basf2.B2INFO('Run %d (not enough data) cannot be '
                                     'merged into the previous normal run %d.' %
                                     (run_data[j][0],
                                      run_data[previous_run][0]))
                        break
                    j += 1
            if j > i:
                continue
            basf2.B2INFO('A range of runs with not enough data is found '
                         'that cannot be merged into neither previous nor '
                         'next normal run: from %d to %d.' %
                         (run_data[j][0], run_data[i][0]))
            while j <= i:
                run_data[j][4] = 'none'
                j += 1

        # Merge runs that do not have enough data. If both this and next
        # run do not have enough data, then merge the collected data.
        i = 0
        while i < len(run_data) - 1:
            while ((run_data[i][1] == result_not_enough_data) and
                   (run_data[i + 1][1] == result_not_enough_data)):
                # Only merge runs with the same merge direction.
                if run_data[i][4] != run_data[i + 1][4]:
                    break
                basf2.B2INFO('Merging run %d (not enough data) into '
                             'run %d (not enough data).' %
                             (run_data[i + 1][0], run_data[i][0]))
                run_data[i][2].extend(run_data[i + 1][2])
                del run_data[i + 1]
                self.execute_over_run_list(
                    run_data[i][2], iteration, False,
                    KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck, None)
                run_data[i][1] = self.machine.result.result
                run_data[i][3] = KLMStripEfficiencyAlgorithm.Results(
                    self.machine.algorithm.algorithm.getResults())
                result_str = calibration_result_string(run_data[i][1])
                basf2.B2INFO('Run %d: %s.' % (run_data[i][0], result_str))
                if i >= len(run_data) - 1:
                    break
            i += 1

        # Merge runs that do not have enough data into normal runs.
        def merge_runs(run_data, run_not_enough_data, run_normal, forced):
            basf2.B2INFO('Merging run %d (not enough data) into '
                         'run %d (normal).' %
                         (run_data[run_not_enough_data][0],
                          run_data[run_normal][0]))
            run_data[run_normal][2].extend(run_data[run_not_enough_data][2])
            self.execute_over_run_list(
                run_data[run_normal][2], iteration, forced,
                KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck, None)
            run_data[run_normal][1] = self.machine.result.result
            run_data[run_normal][3] = KLMStripEfficiencyAlgorithm.Results(
                self.machine.algorithm.algorithm.getResults())
            result_str = calibration_result_string(run_data[run_normal][1])
            basf2.B2INFO('Run %d: %s.' % (run_data[run_normal][0], result_str))
            if run_data[run_normal][1] != result_ok:
                basf2.B2FATAL('Merging run %d into run %d failed.' %
                              (run_data[run_not_enough_data][0],
                               run_data[run_normal][0]))
            del run_data[run_not_enough_data]

        # merge_runs() deletes entry i, so i is only advanced when nothing
        # was merged.
        i = 0
        while i < len(run_data):
            if run_data[i][1] == result_not_enough_data:
                if run_data[i][4] == 'next':
                    merge_runs(run_data, i, i + 1, False)
                elif run_data[i][4] == 'previous':
                    merge_runs(run_data, i, i - 1, False)
                else:
                    i += 1
            else:
                i += 1

        # Runs marked 'none': force-merge into the neighbour that gains
        # fewer new planes, or force-calibrate the run alone.
        i = 0
        while i < len(run_data):
            if (run_data[i][1] == result_not_enough_data and
                    run_data[i][4] == 'none'):
                new_planes_previous = -1
                new_planes_next = -1
                if i < len(run_data) - 1:
                    new_planes_next = run_data[i][3].newExtHitsPlanes(
                        run_data[i + 1][3].getExtHitsPlane())
                    basf2.B2INFO('There are %d new active modules in run %d '
                                 'relatively to run %d.' %
                                 (new_planes_next, run_data[i][0],
                                  run_data[i + 1][0]))
                if i > 0:
                    new_planes_previous = run_data[i][3].newExtHitsPlanes(
                        run_data[i - 1][3].getExtHitsPlane())
                    basf2.B2INFO('There are %d new active modules in run %d '
                                 'relatively to run %d.' %
                                 (new_planes_previous,
                                  run_data[i][0], run_data[i - 1][0]))
                run_for_merging = -1
                # If a forced merge of the normal run with another run from
                # a different range of runs with not enough data has already
                # been performed, then the list of active modules may change
                # and there would be 0 new modules. Consequently, the number
                # of modules is checked to be greater or equal than 0. However,
                # there is no guarantee that the same added module would be
                # calibrated normally. Thus, a forced merge is performed anyway.
                if new_planes_previous >= 0 and new_planes_next < 0:
                    run_for_merging = i - 1
                elif new_planes_previous < 0 and new_planes_next >= 0:
                    run_for_merging = i + 1
                elif new_planes_previous >= 0 and new_planes_next >= 0:
                    if new_planes_previous < new_planes_next:
                        run_for_merging = i - 1
                    else:
                        run_for_merging = i + 1
                else:
                    basf2.B2INFO('Cannot determine run for merging for run %d, '
                                 'performing its forced calibration.' %
                                 (run_data[i][0]))
                    self.execute_over_run_list(
                        run_data[i][2], iteration, True,
                        KLMStripEfficiencyAlgorithm.c_MeasurablePlaneCheck,
                        None)
                    run_data[i][1] = self.machine.result.result
                    run_data[i][3] = KLMStripEfficiencyAlgorithm.Results(
                        self.machine.algorithm.algorithm.getResults())
                    result_str = calibration_result_string(run_data[i][1])
                    basf2.B2INFO('Run %d: %s.' % (run_data[i][0], result_str))
                    if run_data[i][1] != result_ok:
                        basf2.B2FATAL('Forced calibration of run %d failed.' %
                                      (run_data[i][0]))
                if run_for_merging >= 0:
                    merge_runs(run_data, i, run_for_merging, True)
            else:
                i += 1

        # Stage 2: determination of maximal run ranges.
        # The set of calibrated planes should be the same for all small
        # run ranges within the large run range.
        run_ranges.clear()
        i = 0
        while i < len(run_data):
            j = i + 1
            while j < len(run_data):
                planes_differ = False
                if run_data[j][3].newMeasuredPlanes(
                        run_data[i][3].getEfficiency()) != 0:
                    planes_differ = True
                if run_data[i][3].newMeasuredPlanes(
                        run_data[j][3].getEfficiency()) != 0:
                    planes_differ = True
                if planes_differ:
                    basf2.B2INFO('Run %d: the set of planes is different '
                                 'from run %d.'
                                 % (run_data[j][0], run_data[i][0]))
                    break
                else:
                    basf2.B2INFO('Run %d: the set of planes is the same '
                                 'as for run %d.'
                                 % (run_data[j][0], run_data[i][0]))
                j = j + 1
            run_ranges.append([i, j])
            i = j

        # Stage 3: final calibration.

        # Output directory; exist_ok avoids a check-then-create race.
        os.makedirs('efficiency', exist_ok=True)

        # Merge runs.
        def merge_runs_2(run_data, run_1, run_2, forced):
            basf2.B2INFO('Merging run %d into run %d.' %
                         (run_data[run_2][0], run_data[run_1][0]))
            run_data[run_1][2].extend(run_data[run_2][2])
            output_file = 'efficiency/efficiency_%d_%d.root' % \
                (run_data[run_1][2][0].exp, run_data[run_1][2][0].run)
            self.execute_over_run_list(
                run_data[run_1][2], iteration, forced,
                KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement,
                output_file)
            run_data[run_1][1] = self.machine.result.result
            run_data[run_1][3] = KLMStripEfficiencyAlgorithm.Results(
                self.machine.algorithm.algorithm.getResults())
            run_data[run_1][5] = \
                self.machine.algorithm.algorithm.getPayloadValues()
            result_str = calibration_result_string(run_data[run_1][1])
            # NOTE: the requested precision is logged as the fixed value 0.02.
            basf2.B2INFO('Run %d: %s; requested precision %f, achieved '
                         'precision %f.' %
                         (run_data[run_1][0], result_str, 0.02,
                          run_data[run_1][3].getAchievedPrecision()))

        for run_range in run_ranges:
            i = run_range[0]
            while i < run_range[1]:
                output_file = 'efficiency/efficiency_%d_%d.root' % \
                    (run_data[i][2][0].exp, run_data[i][2][0].run)
                # Force calibration if there are no more runs in the range.
                forced_calibration = (i == run_range[1] - 1)
                self.execute_over_run_list(
                    run_data[i][2], iteration, forced_calibration,
                    KLMStripEfficiencyAlgorithm.c_EfficiencyMeasurement,
                    output_file)
                run_data[i][1] = self.machine.result.result
                run_data[i][3] = KLMStripEfficiencyAlgorithm.Results(
                    self.machine.algorithm.algorithm.getResults())
                run_data[i][5] = \
                    self.machine.algorithm.algorithm.getPayloadValues()
                result_str = calibration_result_string(run_data[i][1])
                # NOTE: the requested precision is logged as the fixed
                # value 0.02.
                basf2.B2INFO('Run %d: %s; requested precision %f, achieved '
                             'precision %f.' %
                             (run_data[i][0], result_str, 0.02,
                              run_data[i][3].getAchievedPrecision()))
                if run_data[i][1] == result_not_enough_data:
                    # Keep merging following runs of the range into run i
                    # until the calibration succeeds.
                    j = i + 1
                    while j < run_range[1]:
                        # Force calibration if there are no more runs
                        # in the range.
                        forced_calibration = (j == run_range[1] - 1)
                        merge_runs_2(run_data, i, j, forced_calibration)
                        run_data[j][1] = result_merged
                        j = j + 1
                        if run_data[i][1] == result_ok:
                            break
                    i = j
                else:
                    i = i + 1

        # Drop the runs that were merged into other runs.
        run_data = [data for data in run_data if data[1] != result_merged]

        # Stage 4: write the results to the database.
        def commit_payload(run_data, run):
            basf2.B2INFO('Writing run %d.' % (run_data[run][0]))
            self.machine.algorithm.algorithm.commit(run_data[run][5])

        # Get first runs again due to possible mergings.
        for data in run_data:
            data[2].sort(key=lambda x: x.run)
        first_runs = [data[2][0].run for data in run_data]
        number_of_payloads = len(run_data)
        for i in range(number_of_payloads):
            # The first payload starts at the lower bound of the coverage
            # (also when it is the only payload); the others start at their
            # own first run.
            if i == 0:
                low_exp, low_run = lowest_exprun.exp, lowest_exprun.run
            else:
                low_exp, low_run = experiment, first_runs[i]
            # Each payload is valid up to the run preceding the next payload;
            # the last one extends to the upper bound of the coverage.
            if i < number_of_payloads - 1:
                high_exp, high_run = experiment, first_runs[i + 1] - 1
            else:
                high_exp, high_run = highest_exprun.exp, highest_exprun.run
            run_data[i][5].front().iov = Belle2.IntervalOfValidity(
                low_exp, low_run, high_exp, high_run)
            commit_payload(run_data, i)
A class that describes the interval of experiments/runs for which an object in the database is valid.
def process_experiment(self, experiment, experiment_runs, iteration, lowest_exprun, highest_exprun)
def run(self, iov, iteration, queue)
machine
:py:class:`caf.state_machines.AlgorithmMachine` used to help set up and execute CalibrationAlgorithm.
first_execution
Flag for the first execution of this AlgorithmStrategy.
def execute_over_run_list(self, run_list, iteration, forced_calibration, calibration_stage, output_file)
queue
The multiprocessing queue used to pass back results one at a time.