Belle II Software  release-06-01-15
klm_channel_status.py
1 # -*- coding: utf-8 -*-
2 
3 
10 
11 """Custom calibration strategy for KLM channel status."""
12 
13 import numpy
14 
15 import basf2
16 import ROOT
17 from ROOT import Belle2
18 
19 from caf.utils import AlgResult, IoV
20 from caf.utils import runs_overlapping_iov, runs_from_vector
21 from caf.utils import split_runs_by_exp
22 from caf.strategies import AlgorithmStrategy, StrategyError
23 from caf.state_machines import AlgorithmMachine
24 from ROOT.Belle2 import KLMChannelStatusAlgorithm, KLMChannelIndex
25 from klm_strategies_common import get_lowest_exprun, get_highest_exprun, \
26  calibration_result_string
27 
28 
class KLMChannelStatus(AlgorithmStrategy):
    """
    Custom strategy for executing the KLM channel status. Requires complex
    run merging rules.

    This uses a `caf.state_machines.AlgorithmMachine` to actually execute
    the various steps rather than operating on a CalibrationAlgorithm
    C++ class directly.

    Per experiment, every run is first calibrated on its own. Runs whose
    calibration reports "not enough data" are then merged into each other,
    into a neighbouring normal run, or calibrated in forced mode. Finally,
    consecutive runs with identical channel status share a single payload
    whose IOV covers all of them.

    Internally each run is described by a list with the format
    ``[run number, calibration result code, list of ExpRun,
    algorithm results, merge information, payload]``.
    """

    #: The params that you could set on the Algorithm object which this
    #: Strategy would use.
    usable_params = {'iov_coverage': IoV}

    #: Calibration result code of a successful calibration.
    _RESULT_OK = 0

    #: Calibration result code of a calibration without enough data.
    _RESULT_NOT_ENOUGH_DATA = 2

    def __init__(self, algorithm):
        """
        Create the strategy.

        Parameters:
            algorithm: CAF algorithm wrapper (provides ``name``, ``params``
                and the C++ ``algorithm``) executed by this strategy.
        """
        super().__init__(algorithm)
        #: :py:class:`caf.state_machines.AlgorithmMachine` used to help set up
        #: and execute CalibrationAlgorithm.
        self.machine = AlgorithmMachine(self.algorithm)
        #: Flag for the first execution of this AlgorithmStrategy.
        self.first_execution = True

    def run(self, iov, iteration, queue):
        """
        Runs the algorithm machine over the collected data and
        fills the results.

        Parameters:
            iov: IoV restricting the runs to calibrate; when falsy, all
                collected runs are used.
            iteration (int): CAF iteration number.
            queue: multiprocessing queue used to pass back results.

        Raises:
            StrategyError: if the strategy was not set up correctly.
        """
        if not self.is_valid():
            raise StrategyError('The strategy KLMChannelStatus was not '
                                'set up correctly.')

        #: The multiprocessing queue used to pass back results one at a time.
        self.queue = queue

        basf2.B2INFO(f'Setting up {self.__class__.__name__} strategy '
                     f'for {self.algorithm.name}')
        # Add all the necessary parameters for a strategy to run.
        machine_params = {
            'database_chain': self.database_chain,
            'dependent_databases': self.dependent_databases,
            'output_dir': self.output_dir,
            'output_database_dir': self.output_database_dir,
            'input_files': self.input_files,
            'ignored_runs': self.ignored_runs,
        }
        self.machine.setup_from_dict(machine_params)
        # Start moving through machine states.
        basf2.B2INFO(f'Starting AlgorithmMachine of {self.algorithm.name}')
        # This sets up the logging and database chain and assigns all
        # input files from collector jobs.
        self.machine.setup_algorithm(iteration=iteration)
        # After this point, the logging is in the stdout of the algorithm.
        basf2.B2INFO(f'Beginning execution of {self.algorithm.name} using '
                     f'strategy {self.__class__.__name__}')

        # Select runs for calibration.
        runs = self.algorithm.algorithm.getRunListFromAllData()
        all_runs_collected = set(runs_from_vector(runs))
        # Select runs overlapping with the calibration IOV if it is specified.
        if iov:
            runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
        else:
            runs_to_execute = all_runs_collected
        # Remove the ignored runs.
        if self.ignored_runs:
            basf2.B2INFO(f'Removing the ignored_runs from the runs '
                         f'to execute for {self.algorithm.name}')
            runs_to_execute.difference_update(set(self.ignored_runs))

        # Without any executed run there is no machine result to report;
        # previously this crashed when reading self.machine.result.
        if not runs_to_execute:
            basf2.B2ERROR(f'No runs to execute for {self.algorithm.name}.')
            self.send_final_state(self.FAILED)
            return

        # Creation of sorted run list split by experiment.
        runs_to_execute = split_runs_by_exp(sorted(runs_to_execute))

        # Get IOV coverage.
        iov_coverage = self.algorithm.params.get('iov_coverage')

        # Iterate over experiment run lists.
        number_of_experiments = len(runs_to_execute)
        for i_exp, run_list in enumerate(runs_to_execute, start=1):
            lowest_exprun = get_lowest_exprun(number_of_experiments, i_exp,
                                              run_list, iov_coverage)
            highest_exprun = get_highest_exprun(number_of_experiments, i_exp,
                                                run_list, iov_coverage)
            self.process_experiment(run_list[0].exp, run_list, iteration,
                                    lowest_exprun, highest_exprun)

        # Send final state and result to CAF.
        self.send_result(self.machine.result)
        if self.machine.result.result in (AlgResult.ok.value,
                                          AlgResult.iterate.value):
            self.send_final_state(self.COMPLETED)
        else:
            self.send_final_state(self.FAILED)

    def execute_over_run_list(self, run_list, iteration, forced_calibration):
        """
        Execute over run list.

        Parameters:
            run_list (list): ExpRun objects whose data are calibrated together.
            iteration (int): CAF iteration number.
            forced_calibration (bool): passed to ``setForcedCalibration`` of
                the C++ algorithm.
        """
        if self.first_execution:
            # run() has already set up the machine for the first execution.
            self.first_execution = False
        else:
            # Pass the iteration as run() does, so that per-iteration setup
            # in the machine receives the correct value.
            self.machine.setup_algorithm(iteration=iteration)
        self.machine.algorithm.algorithm.setForcedCalibration(
            forced_calibration)
        self.machine.execute_runs(runs=run_list, iteration=iteration,
                                  apply_iov=None)
        if self.machine.result.result in (AlgResult.ok.value,
                                          AlgResult.iterate.value):
            self.machine.complete()
        else:
            self.machine.fail()

    def process_experiment(self, experiment, experiment_runs, iteration,
                           lowest_exprun, highest_exprun):
        """
        Process runs from experiment.

        Parameters:
            experiment (int): experiment number.
            experiment_runs (list): ExpRun objects of this experiment.
            iteration (int): CAF iteration number.
            lowest_exprun: lower IOV bound (``.exp``, ``.run``) used for the
                first payload of the experiment.
            highest_exprun: upper IOV bound (``.exp``, ``.run``) used for the
                last payload of the experiment.
        """
        run_data, run_data_klm_excluded = self._calibrate_single_runs(
            experiment_runs, iteration)
        self._save_hit_maps(run_data)
        if not run_data:
            basf2.B2WARNING('Experiment %d: no runs with KLM hits, no '
                            'payloads are written.' % (experiment))
            return
        run_ranges = self._find_not_enough_data_ranges(run_data)
        self._assign_merge_directions(run_data, run_ranges)
        self._merge_consecutive_not_enough_data(run_data, iteration)
        self._merge_into_normal_runs(run_data, iteration)
        self._resolve_unmergeable_runs(run_data, iteration)
        self._write_payloads(run_data, run_data_klm_excluded, experiment,
                             lowest_exprun, highest_exprun)

    def _calibrate_run_list(self, run_list, iteration, forced_calibration):
        """Execute over ``run_list``; return (result code, results, payload)."""
        self.execute_over_run_list(run_list, iteration, forced_calibration)
        algorithm = self.machine.algorithm.algorithm
        result = self.machine.result.result
        algorithm_results = KLMChannelStatusAlgorithm.Results(
            algorithm.getResults())
        payload = algorithm.getPayloadValues()
        return result, algorithm_results, payload

    def _recalibrate_entry(self, entry, iteration, forced_calibration):
        """Recalibrate all runs of a run-data entry and store the outcome in it."""
        entry[1], entry[3], entry[5] = self._calibrate_run_list(
            entry[2], iteration, forced_calibration)
        result_str = calibration_result_string(entry[1])
        basf2.B2INFO('Run %d: %s.' % (entry[0], result_str))

    def _calibrate_single_runs(self, experiment_runs, iteration):
        """Calibrate every run separately; split runs with/without KLM hits."""
        run_data = []
        run_data_klm_excluded = []
        for exp_run in experiment_runs:
            result, algorithm_results, payload = self._calibrate_run_list(
                [exp_run], iteration, False)
            run_results = [
                exp_run.run, result, [exp_run], algorithm_results, '', payload]
            # Runs without any KLM hit are kept aside; their payload is used
            # later for the parts of an IOV covering such runs.
            if algorithm_results.getTotalHitNumber() > 0:
                run_data.append(run_results)
            else:
                run_data_klm_excluded.append(run_results)
            result_str = calibration_result_string(result)
            basf2.B2INFO('Run %d: %s.' % (exp_run.run, result_str))
        # Sort by run number.
        run_data.sort(key=lambda x: x[0])
        run_data_klm_excluded.sort(key=lambda x: x[0])
        return run_data, run_data_klm_excluded

    @staticmethod
    def _save_hit_maps(run_data, file_name='hit_map.root',
                       save_channel_hit_map=False, save_module_hit_map=True,
                       save_sector_hit_map=True):
        """
        Write per-run channel, module and sector hit maps to a ROOT file.

        The three trees are always created and written; the ``save_*`` flags
        only control whether they are filled.
        """
        def make_buffer():
            # '/I' leaves are 32-bit signed integers, so use int32 buffers.
            return numpy.zeros(1, dtype=numpy.int32)

        def make_tree(tree_name, branches):
            tree = ROOT.TTree(tree_name, '')
            for branch_name, branch_buffer in branches:
                tree.Branch(branch_name, branch_buffer, f'{branch_name}/I')
            return tree

        def indices(level):
            # The end-marker index is kept alive for the whole iteration.
            index = KLMChannelIndex(level)
            index_end = KLMChannelIndex(level)
            while index != index_end.end():
                yield index
                index.increment()

        run = make_buffer()
        calibration_result = make_buffer()
        # Channel, module or global sector number, depending on the tree.
        number = make_buffer()
        subdetector = make_buffer()
        section = make_buffer()
        sector = make_buffer()
        layer = make_buffer()
        plane = make_buffer()
        strip = make_buffer()
        hits_total = make_buffer()
        # Hits in the channel, module or sector, depending on the tree.
        hits = make_buffer()
        active_channels = make_buffer()

        f_hit_map = ROOT.TFile(file_name, 'recreate')
        try:
            hit_map_channel = make_tree('hit_map_channel', [
                ('run', run), ('calibration_result', calibration_result),
                ('channel', number), ('subdetector', subdetector),
                ('section', section), ('sector', sector), ('layer', layer),
                ('plane', plane), ('strip', strip),
                ('hits_total', hits_total), ('hits_channel', hits)])
            hit_map_module = make_tree('hit_map_module', [
                ('run', run), ('calibration_result', calibration_result),
                ('module', number), ('subdetector', subdetector),
                ('section', section), ('sector', sector), ('layer', layer),
                ('hits_total', hits_total), ('hits_module', hits),
                ('active_channels', active_channels)])
            hit_map_sector = make_tree('hit_map_sector', [
                ('run', run), ('calibration_result', calibration_result),
                ('sector_global', number), ('subdetector', subdetector),
                ('section', section), ('sector', sector),
                ('hits_total', hits_total), ('hits_sector', hits)])
            for entry in run_data:
                results = entry[3]
                run[0] = entry[0]
                calibration_result[0] = entry[1]
                hits_total[0] = results.getTotalHitNumber()
                # Channel hit map.
                if save_channel_hit_map:
                    channel_hits = results.getHitMapChannel()
                    for index in indices(KLMChannelIndex.c_IndexLevelStrip):
                        number[0] = index.getKLMChannelNumber()
                        subdetector[0] = index.getSubdetector()
                        section[0] = index.getSection()
                        sector[0] = index.getSector()
                        layer[0] = index.getLayer()
                        plane[0] = index.getPlane()
                        strip[0] = index.getStrip()
                        hits[0] = channel_hits.getChannelData(int(number[0]))
                        hit_map_channel.Fill()
                # Module hit map.
                if save_module_hit_map:
                    module_hits = results.getHitMapModule()
                    module_active = results.getModuleActiveChannelMap()
                    for index in indices(KLMChannelIndex.c_IndexLevelLayer):
                        number[0] = index.getKLMModuleNumber()
                        subdetector[0] = index.getSubdetector()
                        section[0] = index.getSection()
                        sector[0] = index.getSector()
                        layer[0] = index.getLayer()
                        hits[0] = module_hits.getChannelData(int(number[0]))
                        active_channels[0] = module_active.getChannelData(
                            int(number[0]))
                        hit_map_module.Fill()
                # Sector hit map.
                if save_sector_hit_map:
                    sector_hits = results.getHitMapSector()
                    for index in indices(KLMChannelIndex.c_IndexLevelSector):
                        number[0] = index.getKLMSectorNumber()
                        subdetector[0] = index.getSubdetector()
                        section[0] = index.getSection()
                        sector[0] = index.getSector()
                        hits[0] = sector_hits.getChannelData(int(number[0]))
                        hit_map_sector.Fill()
            hit_map_channel.Write()
            hit_map_module.Write()
            hit_map_sector.Write()
        finally:
            f_hit_map.Close()

    @classmethod
    def _find_not_enough_data_ranges(cls, run_data):
        """Return [first, end) index ranges of consecutive not-enough-data runs."""
        run_ranges = []
        i = 0
        while i < len(run_data):
            if run_data[i][1] != cls._RESULT_NOT_ENOUGH_DATA:
                i += 1
                continue
            j = i
            while (j < len(run_data) and
                   run_data[j][1] == cls._RESULT_NOT_ENOUGH_DATA):
                j += 1
            run_ranges.append([i, j])
            i = j
        return run_ranges

    @staticmethod
    def _assign_merge_directions(run_data, run_ranges):
        """
        Mark each not-enough-data run with 'next', 'previous' or 'none'.

        Runs at the end of a range are tried against the next normal run,
        runs at the start against the previous one; the remaining runs in
        the middle are marked 'none'.
        """
        def can_merge(run_not_enough_data, run_normal):
            # A merge is accepted when newNormalChannels() reports zero
            # relative to the module status of the normal run.
            return run_data[run_not_enough_data][3].getModuleStatus(). \
                newNormalChannels(
                    run_data[run_normal][3].getModuleStatus()) == 0

        for first, next_run in run_ranges:
            # Backward scan from the end of the range towards the next run.
            last = next_run - 1
            if next_run < len(run_data):
                while last >= first:
                    if can_merge(last, next_run):
                        basf2.B2INFO(
                            'Run %d (not enough data) can be merged into '
                            'the next normal run %d.' %
                            (run_data[last][0], run_data[next_run][0]))
                        run_data[last][4] = 'next'
                    else:
                        basf2.B2INFO(
                            'Run %d (not enough data) cannot be merged into '
                            'the next normal run %d, will try the previous '
                            'one.' % (run_data[last][0], run_data[next_run][0]))
                        break
                    last -= 1
                if last < first:
                    continue
            # Forward scan from the start of the range towards the previous run.
            current = first
            previous_run = first - 1
            if previous_run >= 0:
                while current <= last:
                    if can_merge(current, previous_run):
                        basf2.B2INFO(
                            'Run %d (not enough data) can be merged into '
                            'the previous normal run %d.' %
                            (run_data[current][0], run_data[previous_run][0]))
                        run_data[current][4] = 'previous'
                    else:
                        basf2.B2INFO(
                            'Run %d (not enough data) cannot be merged into '
                            'the previous normal run %d.' %
                            (run_data[current][0], run_data[previous_run][0]))
                        break
                    current += 1
                if current > last:
                    continue
            basf2.B2INFO('A range of runs with not enough data is found '
                         'that cannot be merged into neither previous nor '
                         'next normal run: from %d to %d.' %
                         (run_data[current][0], run_data[last][0]))
            for k in range(current, last + 1):
                run_data[k][4] = 'none'

    def _merge_consecutive_not_enough_data(self, run_data, iteration):
        """Merge neighbouring not-enough-data runs with the same merge mark."""
        not_enough_data = self._RESULT_NOT_ENOUGH_DATA
        i = 0
        while i < len(run_data) - 1:
            while (run_data[i][1] == not_enough_data and
                   run_data[i + 1][1] == not_enough_data and
                   run_data[i][4] == run_data[i + 1][4]):
                basf2.B2INFO('Merging run %d (not enough data) into '
                             'run %d (not enough data).' %
                             (run_data[i + 1][0], run_data[i][0]))
                run_data[i][2].extend(run_data[i + 1][2])
                del run_data[i + 1]
                self._recalibrate_entry(run_data[i], iteration, False)
                # Stop before run_data[i + 1] would be out of range.
                if i >= len(run_data) - 1:
                    break
            i += 1

    def _merge_run_into(self, run_data, run_not_enough_data, run_normal,
                        iteration, forced):
        """
        Merge the data of one run into another run and recalibrate it.

        Currently merging the data (TODO: consider result comparison).
        The merged run entry is removed from ``run_data``; a failed
        calibration of the merged data is fatal.
        """
        basf2.B2INFO('Merging run %d (not enough data) into '
                     'run %d (normal).' %
                     (run_data[run_not_enough_data][0],
                      run_data[run_normal][0]))
        run_data[run_normal][2].extend(run_data[run_not_enough_data][2])
        self._recalibrate_entry(run_data[run_normal], iteration, forced)
        if run_data[run_normal][1] != self._RESULT_OK:
            basf2.B2FATAL('Merging run %d into run %d failed.' %
                          (run_data[run_not_enough_data][0],
                           run_data[run_normal][0]))
        del run_data[run_not_enough_data]

    def _merge_into_normal_runs(self, run_data, iteration):
        """Merge 'next'/'previous'-marked runs into the adjacent normal run."""
        i = 0
        while i < len(run_data):
            entry = run_data[i]
            # After a merge the entry at index i changes, so i is kept.
            if entry[1] == self._RESULT_NOT_ENOUGH_DATA and entry[4] == 'next':
                self._merge_run_into(run_data, i, i + 1, iteration, False)
            elif (entry[1] == self._RESULT_NOT_ENOUGH_DATA and
                  entry[4] == 'previous'):
                self._merge_run_into(run_data, i, i - 1, iteration, False)
            else:
                i += 1

    def _resolve_unmergeable_runs(self, run_data, iteration):
        """Force-merge 'none'-marked runs into a neighbour or force-calibrate."""
        i = 0
        while i < len(run_data):
            entry = run_data[i]
            if not (entry[1] == self._RESULT_NOT_ENOUGH_DATA and
                    entry[4] == 'none'):
                i += 1
                continue
            # -1 marks a missing neighbour.
            new_modules_previous = -1
            new_modules_next = -1
            if i < len(run_data) - 1:
                new_modules_next = entry[3].getModuleStatus(). \
                    newNormalChannels(run_data[i + 1][3].getModuleStatus())
                basf2.B2INFO('There are %d new active modules in run %d '
                             'relatively to run %d.' %
                             (new_modules_next, entry[0], run_data[i + 1][0]))
            if i > 0:
                new_modules_previous = entry[3].getModuleStatus(). \
                    newNormalChannels(run_data[i - 1][3].getModuleStatus())
                basf2.B2INFO('There are %d new active modules in run %d '
                             'relatively to run %d.' %
                             (new_modules_previous, entry[0],
                              run_data[i - 1][0]))
            # If a forced merge of the normal run with another run from
            # a different range of runs with not enough data has already
            # been performed, then the list of active modules may change
            # and there would be 0 new modules. Consequently, the number
            # of modules is checked to be greater or equal than 0. However,
            # there is no guarantee that the same added module would be
            # calibrated normally. Thus, a forced merge is performed anyway.
            has_previous = new_modules_previous >= 0
            has_next = new_modules_next >= 0
            if has_previous and has_next:
                if new_modules_previous < new_modules_next:
                    run_for_merging = i - 1
                else:
                    run_for_merging = i + 1
            elif has_previous:
                run_for_merging = i - 1
            elif has_next:
                run_for_merging = i + 1
            else:
                basf2.B2INFO('Cannot determine run for merging for run %d, '
                             'performing its forced calibration.' %
                             (entry[0]))
                self._recalibrate_entry(entry, iteration, True)
                if entry[1] != self._RESULT_OK:
                    basf2.B2FATAL('Forced calibration of run %d failed.' %
                                  (entry[0]))
                i += 1
                continue
            self._merge_run_into(run_data, i, run_for_merging, iteration, True)

    def _write_payloads(self, run_data, run_data_klm_excluded, experiment,
                        lowest_exprun, highest_exprun):
        """
        Assign IOVs to the calibrated runs and commit their payloads.

        Consecutive successful runs with identical channel status share the
        payload of the first run of the group. The first payload starts at
        ``lowest_exprun`` and the last one ends at ``highest_exprun``.
        """
        def close_and_write(index, exp_high, run_high):
            # Fix the final IOV of run_data[index] and write it.
            payload = run_data[index][5]
            iov = payload.front().iov
            if index == 0:
                exp_low, run_low = lowest_exprun.exp, lowest_exprun.run
            else:
                exp_low, run_low = experiment, iov.getRunLow()
            payload.front().iov = Belle2.IntervalOfValidity(
                exp_low, run_low, exp_high, run_high)
            self._write_result(experiment, run_data[index],
                               run_data_klm_excluded)

        # Index of the run whose payload is still open (not written yet).
        previous_run = 0
        for i, entry in enumerate(run_data):
            # Get first run again due to possible mergings.
            entry[2].sort(key=lambda exp_run: exp_run.run)
            first_run = entry[2][0].run
            # Compare with the previous run.
            write_previous_run = i > 0
            if (i > 0 and entry[1] == self._RESULT_OK and
                    run_data[i - 1][1] == self._RESULT_OK and
                    entry[3].getChannelStatus() ==
                    run_data[i - 1][3].getChannelStatus()):
                basf2.B2INFO('Run %d: result is the same as '
                             'for the previous run %d.' %
                             (entry[0], run_data[i - 1][0]))
                write_previous_run = False
            # Open-ended IOV for the current run; the upper bound is fixed
            # when its payload is written.
            entry[5].front().iov = Belle2.IntervalOfValidity(
                experiment, first_run, experiment, -1)
            # If the calibration result is different, write the previous run.
            if write_previous_run:
                close_and_write(previous_run, experiment, first_run - 1)
                previous_run = i
        # Write the still open payload. This is the payload of the last group
        # of identical runs, not necessarily of the last run itself;
        # otherwise the open payload would never be committed.
        if run_data:
            close_and_write(previous_run, highest_exprun.exp,
                            highest_exprun.run)

    def _commit_payload(self, entry):
        """Commit the payload of a run-data entry unless it has no result."""
        if entry[1] == self._RESULT_NOT_ENOUGH_DATA:
            basf2.B2INFO('Run %d has no calibration result, skipped.' %
                         (entry[0]))
            return
        basf2.B2INFO('Writing run %d.' % (entry[0]))
        self.machine.algorithm.algorithm.commit(entry[5])

    @staticmethod
    def _iov_contains_run(iov, experiment, run):
        """
        Check whether run ``run`` of ``experiment`` lies inside ``iov``.

        Bounds are inclusive; an upper run (or experiment) bound of -1 is
        treated as open-ended.
        """
        exp_low, run_low = iov.getExperimentLow(), iov.getRunLow()
        exp_high, run_high = iov.getExperimentHigh(), iov.getRunHigh()
        above_low = (exp_low < experiment or
                     (exp_low == experiment and run_low <= run))
        below_high = (exp_high == -1 or exp_high > experiment or
                      (exp_high == experiment and
                       (run_high == -1 or run <= run_high)))
        return above_low and below_high

    def _write_result(self, experiment, entry, run_data_klm_excluded):
        """
        Commit the payload of ``entry``, split around KLM-excluded runs.

        The IOV currently set on the payload of ``entry`` is cut into pieces
        of consecutive KLM-included runs (committed with the payload of
        ``entry``) and KLM-excluded runs (committed with the payload of the
        first KLM-excluded run of the experiment).
        """
        iov = entry[5].front().iov
        exp_low, run_low = iov.getExperimentLow(), iov.getRunLow()
        exp_high, run_high = iov.getExperimentHigh(), iov.getRunHigh()
        runs = [[excluded[0], 'klm_excluded']
                for excluded in run_data_klm_excluded
                if self._iov_contains_run(iov, experiment, excluded[0])]
        if not runs:
            self._commit_payload(entry)
            return
        runs.extend([exp_run.run, 'klm_included'] for exp_run in entry[2])
        runs.sort(key=lambda x: x[0])
        run_first = 0
        while run_first < len(runs):
            run_last = run_first
            while (run_last < len(runs) and
                   runs[run_last][1] == runs[run_first][1]):
                run_last += 1
            # The first and last pieces keep the outer bounds of the IOV,
            # including their experiment numbers.
            if run_first == 0:
                piece_low = (exp_low, run_low)
            else:
                piece_low = (experiment, runs[run_first][0])
            if run_last < len(runs):
                piece_high = (experiment, runs[run_last][0] - 1)
            else:
                piece_high = (exp_high, run_high)
            piece_iov = Belle2.IntervalOfValidity(*piece_low, *piece_high)
            if runs[run_first][1] == 'klm_included':
                entry[5].front().iov = piece_iov
                self._commit_payload(entry)
            else:
                run_data_klm_excluded[0][5].front().iov = piece_iov
                self._commit_payload(run_data_klm_excluded[0])
            run_first = run_last
A class that describes the interval of experiments/runs for which an object in the database is valid.
def process_experiment(self, experiment, experiment_runs, iteration, lowest_exprun, highest_exprun)
def run(self, iov, iteration, queue)
machine
:py:class:`caf.state_machines.AlgorithmMachine` used to help set up and execute CalibrationAlgorithm.
first_execution
Flag for the first execution of this AlgorithmStrategy.
def execute_over_run_list(self, run_list, iteration, forced_calibration)
queue
The multiprocessing queue used to pass back results one at a time.