Belle II Software  release-08-01-10
9 """Custom calibration strategy for KLM channel status."""
11 import numpy
13 import basf2
14 import ROOT
15 from ROOT import Belle2
17 from caf.utils import AlgResult, IoV
18 from caf.utils import runs_overlapping_iov, runs_from_vector
19 from caf.utils import split_runs_by_exp
20 from caf.strategies import AlgorithmStrategy, StrategyError
21 from caf.state_machines import AlgorithmMachine
22 from ROOT.Belle2 import KLMChannelStatusAlgorithm, KLMChannelIndex
23 from klm_strategies_common import get_lowest_exprun, get_highest_exprun, \
24  calibration_result_string
27 class KLMChannelStatus(AlgorithmStrategy):
28  """
29  Custom strategy for executing the KLM channel status. Requires complex
30  run merging rules.
32  This uses a `caf.state_machines.AlgorithmMachine` to actually execute
33  the various steps rather than operating on a CalibrationAlgorithm
34  C++ class directly.
35  """
39  usable_params = {'iov_coverage': IoV}
41  def __init__(self, algorithm):
42  """
43  """
44  super().__init__(algorithm)
48  self.machinemachine = AlgorithmMachine(self.algorithm)
50  self.first_executionfirst_execution = True
52  def run(self, iov, iteration, queue):
53  """
54  Runs the algorithm machine over the collected data and
55  fills the results.
56  """
57  if not self.is_valid():
58  raise StrategyError('The strategy KLMChannelStatus was not '
59  'set up correctly.')
61  self.queuequeue = queue
63  basf2.B2INFO(f'Setting up {self.__class__.__name__} strategy '
64  f'for {}')
65  # Add all the necessary parameters for a strategy to run.
66  machine_params = {}
67  machine_params['database_chain'] = self.database_chain
68  machine_params['dependent_databases'] = self.dependent_databases
69  machine_params['output_dir'] = self.output_dir
70  machine_params['output_database_dir'] = self.output_database_dir
71  machine_params['input_files'] = self.input_files
72  machine_params['ignored_runs'] = self.ignored_runs
73  self.machinemachine.setup_from_dict(machine_params)
74  # Start moving through machine states.
75  basf2.B2INFO(f'Starting AlgorithmMachine of {}')
76  # This sets up the logging and database chain and assigns all
77  # input files from collector jobs.
78  self.machinemachine.setup_algorithm(iteration=iteration)
79  # After this point, the logging is in the stdout of the algorithm.
80  basf2.B2INFO(f'Beginning execution of {} using '
81  f'strategy {self.__class__.__name__}')
83  # Select of runs for calibration.
84  runs = self.algorithm.algorithm.getRunListFromAllData()
85  all_runs_collected = set(runs_from_vector(runs))
86  # Select runs overlapping with the calibration IOV if it is specified.
87  if iov:
88  runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
89  else:
90  runs_to_execute = all_runs_collected
91  # Remove the ignored runs.
92  if self.ignored_runs:
93  basf2.B2INFO(f'Removing the ignored_runs from the runs '
94  f'to execute for {}')
95  runs_to_execute.difference_update(set(self.ignored_runs))
97  # Creation of sorted run list split by experiment.
98  runs_to_execute = sorted(runs_to_execute)
99  runs_to_execute = split_runs_by_exp(runs_to_execute)
101  # Get IOV coverage,
102  iov_coverage = None
103  if 'iov_coverage' in self.algorithm.params:
104  iov_coverage = self.algorithm.params['iov_coverage']
106  # Iterate over experiment run lists.
107  number_of_experiments = len(runs_to_execute)
108  for i_exp, run_list in enumerate(runs_to_execute, start=1):
109  lowest_exprun = get_lowest_exprun(number_of_experiments, i_exp,
110  run_list, iov_coverage)
111  highest_exprun = get_highest_exprun(number_of_experiments, i_exp,
112  run_list, iov_coverage)
113  self.process_experimentprocess_experiment(run_list[0].exp, run_list, iteration,
114  lowest_exprun, highest_exprun)
116  # Send final state and result to CAF.
117  self.send_result(self.machinemachine.result)
118  if (self.machinemachine.result.result == AlgResult.ok.value) or \
119  (self.machinemachine.result.result == AlgResult.iterate.value):
120  self.send_final_state(self.COMPLETED)
121  else:
122  self.send_final_state(self.FAILED)
124  def execute_over_run_list(self, run_list, iteration, forced_calibration):
125  """
126  Execute over run list.
127  """
128  if not self.first_executionfirst_execution:
129  self.machinemachine.setup_algorithm()
130  else:
131  self.first_executionfirst_execution = False
132  self.machinemachine.algorithm.algorithm.setForcedCalibration(
133  forced_calibration)
134  self.machinemachine.execute_runs(runs=run_list, iteration=iteration,
135  apply_iov=None)
136  if (self.machinemachine.result.result == AlgResult.ok.value) or \
137  (self.machinemachine.result.result == AlgResult.iterate.value):
138  self.machinemachine.complete()
139  else:
142  def process_experiment(self, experiment, experiment_runs, iteration,
143  lowest_exprun, highest_exprun):
144  """
145  Process runs from experiment.
146  """
147  # Run lists. They have the following format: run number,
148  # calibration result code, ExpRun, algorithm results,
149  # merge information, payload.
150  run_data = []
151  run_data_klm_excluded = []
153  # Initial run.
154  for exp_run in experiment_runs:
155  self.execute_over_run_listexecute_over_run_list([exp_run], iteration, False)
156  result = self.machinemachine.result.result
157  algorithm_results = KLMChannelStatusAlgorithm.Results(
158  self.machinemachine.algorithm.algorithm.getResults())
159  payload = self.machinemachine.algorithm.algorithm.getPayloadValues()
160  run_results = [
161, result, [exp_run], algorithm_results, '', payload]
162  if (algorithm_results.getTotalHitNumber() > 0):
163  run_data.append(run_results)
164  else:
165  run_data_klm_excluded.append(run_results)
166  result_str = calibration_result_string(result)
167  basf2.B2INFO(f'Run {int(}: {result_str}.')
169  # Sort by run number.
170  run_data.sort(key=lambda x: x[0])
171  run_data_klm_excluded.sort(key=lambda x: x[0])
173  # Create a tree with number of events.
174  save_channel_hit_map = False
175  save_module_hit_map = True
176  save_sector_hit_map = True
177  f_hit_map = ROOT.TFile('hit_map.root', 'recreate')
178  run = numpy.zeros(1, dtype=int)
179  calibration_result = numpy.zeros(1, dtype=int)
180  module = numpy.zeros(1, dtype=int)
181  subdetector = numpy.zeros(1, dtype=int)
182  section = numpy.zeros(1, dtype=int)
183  sector = numpy.zeros(1, dtype=int)
184  layer = numpy.zeros(1, dtype=int)
185  plane = numpy.zeros(1, dtype=int)
186  strip = numpy.zeros(1, dtype=int)
187  hits_total = numpy.zeros(1, dtype=int)
188  hits_module = numpy.zeros(1, dtype=int)
189  active_channels = numpy.zeros(1, dtype=int)
190  hit_map_channel = ROOT.TTree('hit_map_channel', '')
191  hit_map_channel.Branch('run', run, 'run/I')
192  hit_map_channel.Branch('calibration_result', calibration_result,
193  'calibration_result/I')
194  hit_map_channel.Branch('channel', module, 'channel/I')
195  hit_map_channel.Branch('subdetector', subdetector, 'subdetector/I')
196  hit_map_channel.Branch('section', section, 'section/I')
197  hit_map_channel.Branch('sector', sector, 'sector/I')
198  hit_map_channel.Branch('layer', layer, 'layer/I')
199  hit_map_channel.Branch('plane', plane, 'plane/I')
200  hit_map_channel.Branch('strip', strip, 'strip/I')
201  hit_map_channel.Branch('hits_total', hits_total, 'hits_total/I')
202  hit_map_channel.Branch('hits_channel', hits_module, 'hits_channel/I')
203  hit_map_module = ROOT.TTree('hit_map_module', '')
204  hit_map_module.Branch('run', run, 'run/I')
205  hit_map_module.Branch('calibration_result', calibration_result,
206  'calibration_result/I')
207  hit_map_module.Branch('module', module, 'module/I')
208  hit_map_module.Branch('subdetector', subdetector, 'subdetector/I')
209  hit_map_module.Branch('section', section, 'section/I')
210  hit_map_module.Branch('sector', sector, 'sector/I')
211  hit_map_module.Branch('layer', layer, 'layer/I')
212  hit_map_module.Branch('hits_total', hits_total, 'hits_total/I')
213  hit_map_module.Branch('hits_module', hits_module, 'hits_module/I')
214  hit_map_module.Branch('active_channels', active_channels,
215  'active_channels/I')
216  hit_map_sector = ROOT.TTree('hit_map_sector', '')
217  hit_map_sector.Branch('run', run, 'run/I')
218  hit_map_sector.Branch('calibration_result', calibration_result,
219  'calibration_result/I')
220  hit_map_sector.Branch('sector_global', module, 'sector_global/I')
221  hit_map_sector.Branch('subdetector', subdetector, 'subdetector/I')
222  hit_map_sector.Branch('section', section, 'section/I')
223  hit_map_sector.Branch('sector', sector, 'sector/I')
224  hit_map_sector.Branch('hits_total', hits_total, 'hits_total/I')
225  hit_map_sector.Branch('hits_sector', hits_module, 'hits_sector/I')
226  for i in range(0, len(run_data)):
227  run[0] = run_data[i][0]
228  calibration_result[0] = run_data[i][1]
229  hits_total[0] = run_data[i][3].getTotalHitNumber()
230  # Channel hit map.
231  if (save_channel_hit_map):
232  index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelStrip)
233  index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelStrip)
234  while (index != index2.end()):
235  module[0] = index.getKLMChannelNumber()
236  subdetector[0] = index.getSubdetector()
237  section[0] = index.getSection()
238  sector[0] = index.getSector()
239  layer[0] = index.getLayer()
240  plane[0] = index.getPlane()
241  strip[0] = index.getStrip()
242  hits_module[0] = run_data[i][3].getHitMapChannel(). \
243  getChannelData(int(module[0]))
244  hit_map_channel.Fill()
245  index.increment()
246  # Module hit map.
247  if (save_module_hit_map):
248  index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelLayer)
249  index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelLayer)
250  while (index != index2.end()):
251  module[0] = index.getKLMModuleNumber()
252  subdetector[0] = index.getSubdetector()
253  section[0] = index.getSection()
254  sector[0] = index.getSector()
255  layer[0] = index.getLayer()
256  hits_module[0] = run_data[i][3].getHitMapModule(). \
257  getChannelData(int(module[0]))
258  active_channels[0] = run_data[i][3]. \
259  getModuleActiveChannelMap(). \
260  getChannelData(int(module[0]))
261  hit_map_module.Fill()
262  index.increment()
263  # Sector hit map.
264  if (save_sector_hit_map):
265  index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelSector)
266  index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelSector)
267  while (index != index2.end()):
268  module[0] = index.getKLMSectorNumber()
269  subdetector[0] = index.getSubdetector()
270  section[0] = index.getSection()
271  sector[0] = index.getSector()
272  hits_module[0] = run_data[i][3].getHitMapSector(). \
273  getChannelData(int(module[0]))
274  hit_map_sector.Fill()
275  index.increment()
276  hit_map_channel.Write()
277  hit_map_module.Write()
278  hit_map_sector.Write()
279  f_hit_map.Close()
281  # Create list of runs that do not have enough data.
282  run_ranges = []
283  i = 0
284  while (i < len(run_data)):
285  if (run_data[i][1] == 2):
286  j = i
287  while (run_data[j][1] == 2):
288  j += 1
289  if (j >= len(run_data)):
290  break
291  run_ranges.append([i, j])
292  i = j
293  else:
294  i += 1
296  # Determine whether the runs with insufficient data can be merged
297  # to the next or previous normal run.
298  def can_merge(run_data, run_not_enough_data, run_normal):
299  return run_data[run_not_enough_data][3].getModuleStatus(). \
300  newNormalChannels(
301  run_data[run_normal][3].getModuleStatus()) == 0
303  for run_range in run_ranges:
304  next_run = run_range[1]
305  # To mark as 'none' at the end if there are no normal runs.
306  j = run_range[0]
307  i = next_run - 1
308  if (next_run < len(run_data)):
309  while (i >= run_range[0]):
310  if (can_merge(run_data, i, next_run)):
311  basf2.B2INFO(
312  f'Run {int(run_data[i][0])} (not enough data) can be merged into the next normal run ' +
313  f'{int(run_data[next_run][0])}.')
314  run_data[i][4] = 'next'
315  else:
316  basf2.B2INFO(
317  f'Run {int(run_data[i][0])} (not enough data) cannot be merged into the next normal run ' +
318  f'{int(run_data[next_run][0])}, will try the previous one.')
319  break
320  i -= 1
321  if (i < run_range[0]):
322  continue
323  previous_run = run_range[0] - 1
324  if (previous_run >= 0):
325  while (j <= i):
326  if (can_merge(run_data, j, previous_run)):
327  basf2.B2INFO(
328  f'Run {int(run_data[j][0])} (not enough data) can be merged into the previous normal run ' +
329  f'{int(run_data[previous_run][0])}.')
330  run_data[j][4] = 'previous'
331  else:
332  basf2.B2INFO(
333  f'Run {int(run_data[j][0])} (not enough data) cannot be merged into the previous normal run ' +
334  f'{int(run_data[previous_run][0])}.')
335  break
336  j += 1
337  if (j > i):
338  continue
339  basf2.B2INFO('A range of runs with not enough data is found that cannot be merged into neither previous nor ' +
340  f'next normal run: from {int(run_data[j][0])} to {int(run_data[i][0])}.')
341  while (j <= i):
342  run_data[j][4] = 'none'
343  j += 1
345  # Merge runs that do not have enough data. If both this and next
346  # run do not have enough data, then merge the collected data.
347  i = 0
348  j = 0
349  while (i < len(run_data) - 1):
350  while ((run_data[i][1] == 2) and (run_data[i + 1][1] == 2)):
351  if (run_data[i][4] != run_data[i + 1][4]):
352  break
353  basf2.B2INFO(f'Merging run {int(run_data[i + 1][0])} (not enough data) into run {int(run_data[i][0])} ' +
354  '(not enough data).')
355  run_data[i][2].extend(run_data[i + 1][2])
356  del run_data[i + 1]
357  self.execute_over_run_listexecute_over_run_list(run_data[i][2], iteration, False)
358  run_data[i][1] = self.machinemachine.result.result
359  run_data[i][3] = KLMChannelStatusAlgorithm.Results(
360  self.machinemachine.algorithm.algorithm.getResults())
361  run_data[i][5] = \
362  self.machinemachine.algorithm.algorithm.getPayloadValues()
363  result_str = calibration_result_string(run_data[i][1])
364  basf2.B2INFO(f'Run {int(run_data[i][0])}: {result_str}.')
365  if (i >= len(run_data) - 1):
366  break
367  i += 1
369  # Merge runs that do not have enough data into normal runs.
370  # Currently merging the data (TODO: consider result comparison).
371  def merge_runs(run_data, run_not_enough_data, run_normal, forced):
372  basf2.B2INFO(f'Merging run {int(run_data[run_not_enough_data][0])} (not enough data) into run ' +
373  f'{int(run_data[run_normal][0])} (normal).')
374  run_data[run_normal][2].extend(run_data[run_not_enough_data][2])
375  self.execute_over_run_listexecute_over_run_list(run_data[run_normal][2], iteration,
376  forced)
377  run_data[run_normal][1] = self.machinemachine.result.result
378  run_data[run_normal][3] = KLMChannelStatusAlgorithm.Results(
379  self.machinemachine.algorithm.algorithm.getResults())
380  run_data[run_normal][5] = self.machinemachine.algorithm.algorithm.getPayloadValues()
381  result_str = calibration_result_string(run_data[run_normal][1])
382  basf2.B2INFO(f'Run {int(run_data[run_normal][0])}: {result_str}.')
383  if (run_data[run_normal][1] != 0):
384  basf2.B2FATAL(f'Merging run {int(run_data[run_not_enough_data][0])} into run {int(run_data[run_normal][0])} ' +
385  'failed.')
386  del run_data[run_not_enough_data]
388  i = 0
389  while (i < len(run_data)):
390  if (run_data[i][1] == 2):
391  if (run_data[i][4] == 'next'):
392  merge_runs(run_data, i, i + 1, False)
393  elif (run_data[i][4] == 'previous'):
394  merge_runs(run_data, i, i - 1, False)
395  else:
396  i += 1
397  else:
398  i += 1
399  i = 0
400  while (i < len(run_data)):
401  if (run_data[i][1] == 2 and run_data[i][4] == 'none'):
402  new_modules_previous = -1
403  new_modules_next = -1
404  if (i < len(run_data) - 1):
405  new_modules_next = run_data[i][3].getModuleStatus(). \
406  newNormalChannels(run_data[i + 1][3].getModuleStatus())
407  basf2.B2INFO(f'There are {int(new_modules_next)} new active modules in run {int(run_data[i][0])} ' +
408  f'relatively to run {int(run_data[i + 1][0])}.')
409  if (i > 0):
410  new_modules_previous = run_data[i][3].getModuleStatus(). \
411  newNormalChannels(run_data[i - 1][3].getModuleStatus())
412  basf2.B2INFO(f'There are {int(new_modules_previous)} new active modules in run {int(run_data[i][0])} ' +
413  f'relatively to run {int(run_data[i - 1][0])}.')
414  run_for_merging = -1
415  # If a forced merge of the normal run with another run from
416  # a different range of runs with not enough data has already
417  # been performed, then the list of active modules may change
418  # and there would be 0 new modules. Consequently, the number
419  # of modules is checked to be greater or equal than 0. However,
420  # there is no guarantee that the same added module would be
421  # calibrated normally. Thus, a forced merge is performed anyway.
422  if (new_modules_previous >= 0 and new_modules_next < 0):
423  run_for_merging = i - 1
424  elif (new_modules_previous < 0 and new_modules_next >= 0):
425  run_for_merging = i + 1
426  elif (new_modules_previous >= 0 and new_modules_next >= 0):
427  if (new_modules_previous < new_modules_next):
428  run_for_merging = i - 1
429  else:
430  run_for_merging = i + 1
431  else:
432  basf2.B2INFO(f'Cannot determine run for merging for run {int(run_data[i][0])}, performing its forced ' +
433  'calibration.')
434  self.execute_over_run_listexecute_over_run_list(run_data[i][2], iteration, True)
435  run_data[i][1] = self.machinemachine.result.result
436  run_data[i][3] = KLMChannelStatusAlgorithm.Results(
437  self.machinemachine.algorithm.algorithm.getResults())
438  run_data[i][5] = self.machinemachine.algorithm.algorithm.getPayloadValues()
439  result_str = calibration_result_string(run_data[i][1])
440  basf2.B2INFO(f'Run {int(run_data[i][0])}: {result_str}.')
441  if (run_data[i][1] != 0):
442  basf2.B2FATAL(f'Forced calibration of run {int(run_data[i][0])} failed.')
443  if (run_for_merging >= 0):
444  merge_runs(run_data, i, run_for_merging, True)
445  else:
446  i += 1
448  # Write the results.
449  def commit_payload(run_data):
450  if (run_data[1] == 2):
451  basf2.B2INFO(f'Run {int(run_data[0])} has no calibration result, skipped.')
452  return
453  basf2.B2INFO(f'Writing run {int(run_data[0])}.')
454  self.machinemachine.algorithm.algorithm.commit(run_data[5])
456  def write_result(run_data, run):
457  iov = run_data[run][5].front().iov
458  run_low = iov.getRunLow()
459  run_high = iov.getRunHigh()
460  j = 0
461  runs = []
462  while (j < len(run_data_klm_excluded)):
463  if (run_low < run_data_klm_excluded[j][0] and
464  ((run_data_klm_excluded[j][0] < run_high) or
465  (run_high == -1))):
466  runs.append([run_data_klm_excluded[j][0], 'klm_excluded'])
467  j += 1
468  if (len(runs) == 0):
469  commit_payload(run_data[run])
470  return
471  for r in run_data[run][2]:
472  runs.append([, 'klm_included'])
473  runs.sort(key=lambda x: x[0])
474  run_first = 0
475  run_last = 0
476  while (run_last < len(runs)):
477  run_last = run_first
478  while (runs[run_last][1] == runs[run_first][1]):
479  run_last += 1
480  if (run_last >= len(runs)):
481  break
482  if (run_first == 0):
483  run1 = run_low
484  else:
485  run1 = runs[run_first][0]
486  if (run_last < len(runs)):
487  run2 = runs[run_last][0] - 1
488  else:
489  run2 = run_high
490  iov = Belle2.IntervalOfValidity(experiment, run1,
491  experiment, run2)
492  if (runs[run_first][1] == 'klm_included'):
493  run_data[run][5].front().iov = iov
494  commit_payload(run_data[run])
495  else:
496  run_data_klm_excluded[0][5].front().iov = iov
497  commit_payload(run_data_klm_excluded[0])
498  run_first = run_last
500  first_run = 0
501  for i in range(0, len(run_data)):
502  # Get first run again due to possible mergings.
503  run_data[i][2].sort(key=lambda x:
504  first_run = run_data[i][2][0].run
505  # Set IOV for the current run.
506  # The last run will be overwritten when writing the result.
507  run_data[i][5].front().iov = \
508  Belle2.IntervalOfValidity(experiment, first_run, experiment, -1)
509  # Compare with the previous run.
510  write_previous_run = True
511  if (i > 0):
512  if (run_data[i][1] == 0 and run_data[i - 1][1] == 0):
513  if (run_data[i][3].getChannelStatus() ==
514  run_data[i - 1][3].getChannelStatus()):
515  basf2.B2INFO(f'Run {int(run_data[i][0])}: result is the same as for the previous run ' +
516  f'{int(run_data[i - 1][0])}.')
517  if (previous_run >= 0):
518  iov = run_data[previous_run][5].front().iov
519  run_data[previous_run][5].front().iov = \
521  experiment, iov.getRunLow(),
522  experiment, first_run - 1)
523  write_previous_run = False
524  # Set IOV for the current run.
525  # The last run will be overwritten when writing the result.
526  run_data[i][5].front().iov = Belle2.IntervalOfValidity(experiment, first_run, experiment, -1)
527  # If the calibration result is different, write the previous run.
528  if (write_previous_run and (i > 0)):
529  iov = run_data[previous_run][5].front().iov
530  if (previous_run == 0):
531  run_data[previous_run][5].front().iov = Belle2.IntervalOfValidity(
532  lowest_exprun.exp,,
533  experiment, first_run - 1)
534  else:
535  run_data[previous_run][5].front().iov = Belle2.IntervalOfValidity(experiment, iov.getRunLow(),
536  experiment, first_run - 1)
537  write_result(run_data, previous_run)
538  previous_run = i
539  if (i == 0):
540  previous_run = 0
541  # Write the current run if it is the last run.
542  if (i == len(run_data) - 1):
543  iov = run_data[i][5].front().iov
544  run_data[i][5].front().iov = Belle2.IntervalOfValidity(
545  experiment, iov.getRunLow(),
546  highest_exprun.exp,
547  write_result(run_data, i)
A class that describes the interval of experiments/runs for which an object in the database is valid.
def process_experiment(self, experiment, experiment_runs, iteration, lowest_exprun, highest_exprun)
def run(self, iov, iteration, queue)
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm.
Flag for the first execution of this AlgorithmStrategy.
def execute_over_run_list(self, run_list, iteration, forced_calibration)
The multiprocessing queue used to pass back results one at a time.