Belle II Software development
klm_channel_status.py
1
8
9"""Custom calibration strategy for KLM channel status."""
10
11import numpy
12
13import basf2
14import ROOT
15from ROOT import Belle2
16
17from caf.utils import AlgResult, IoV
18from caf.utils import runs_overlapping_iov, runs_from_vector
19from caf.utils import split_runs_by_exp
20from caf.strategies import AlgorithmStrategy, StrategyError
21from caf.state_machines import AlgorithmMachine
22from ROOT.Belle2 import KLMChannelStatusAlgorithm, KLMChannelIndex
23from klm_strategies_common import get_lowest_exprun, get_highest_exprun, \
24 calibration_result_string
25
26
27class KLMChannelStatus(AlgorithmStrategy):
28 """
29 Custom strategy for executing the KLM channel status. Requires complex
30 run merging rules.
31
32 This uses a `caf.state_machines.AlgorithmMachine` to actually execute
33 the various steps rather than operating on a CalibrationAlgorithm
34 C++ class directly.
35 """
36
37
39 usable_params = {'iov_coverage': IoV}
40
41
42 ignored_runs: list[int]
43
44 COMPLETED = AlgorithmStrategy.COMPLETED
45
46 FAILED = AlgorithmStrategy.FAILED
47
48 def __init__(self, algorithm):
49 """
50 """
51 super().__init__(algorithm)
52
55 self.machine = AlgorithmMachine(self.algorithm)
56
57 self.first_execution = True
58
59 def run(self, iov, iteration, queue):
60 """
61 Runs the algorithm machine over the collected data and
62 fills the results.
63 """
64 if not self.is_valid():
65 raise StrategyError('The strategy KLMChannelStatus was not '
66 'set up correctly.')
67
68 self.queue = queue
69
70 basf2.B2INFO(f'Setting up {self.__class__.__name__} strategy '
71 f'for {self.algorithm.name}')
72 # Add all the necessary parameters for a strategy to run.
73 machine_params = {}
74 machine_params['database_chain'] = self.database_chain
75 machine_params['dependent_databases'] = self.dependent_databases
76 machine_params['output_dir'] = self.output_dir
77 machine_params['output_database_dir'] = self.output_database_dir
78 machine_params['input_files'] = self.input_files
79 machine_params['ignored_runs'] = self.ignored_runs
80 self.machine.setup_from_dict(machine_params)
81 # Start moving through machine states.
82 basf2.B2INFO(f'Starting AlgorithmMachine of {self.algorithm.name}')
83 # This sets up the logging and database chain and assigns all
84 # input files from collector jobs.
85 self.machine.setup_algorithm(iteration=iteration)
86 # After this point, the logging is in the stdout of the algorithm.
87 basf2.B2INFO(f'Beginning execution of {self.algorithm.name} using '
88 f'strategy {self.__class__.__name__}')
89
90 # Select of runs for calibration.
91 runs = self.algorithm.algorithm.getRunListFromAllData()
92 all_runs_collected = set(runs_from_vector(runs))
93 # Select runs overlapping with the calibration IOV if it is specified.
94 if iov:
95 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
96 else:
97 runs_to_execute = all_runs_collected
98 # Remove the ignored runs.
99 if self.ignored_runs:
100 basf2.B2INFO(f'Removing the ignored_runs from the runs '
101 f'to execute for {self.algorithm.name}')
102 runs_to_execute.difference_update(set(self.ignored_runs))
103
104 # Creation of sorted run list split by experiment.
105 runs_to_execute = sorted(runs_to_execute)
106 runs_to_execute = split_runs_by_exp(runs_to_execute)
107
108 # Get IOV coverage,
109 iov_coverage = None
110 if 'iov_coverage' in self.algorithm.params:
111 iov_coverage = self.algorithm.params['iov_coverage']
112
113 # Iterate over experiment run lists.
114 number_of_experiments = len(runs_to_execute)
115 for i_exp, run_list in enumerate(runs_to_execute, start=1):
116 lowest_exprun = get_lowest_exprun(number_of_experiments, i_exp,
117 run_list, iov_coverage)
118 highest_exprun = get_highest_exprun(number_of_experiments, i_exp,
119 run_list, iov_coverage)
120 self.process_experiment(run_list[0].exp, run_list, iteration,
121 lowest_exprun, highest_exprun)
122
123 # Send final state and result to CAF.
124 self.send_result(self.machine.result)
125 if (self.machine.result.result == AlgResult.ok.value) or \
126 (self.machine.result.result == AlgResult.iterate.value):
127 self.send_final_state(self.COMPLETED)
128 else:
129 self.send_final_state(self.FAILED)
130
131 def execute_over_run_list(self, run_list, iteration, forced_calibration):
132 """
133 Execute over run list.
134 """
135 if not self.first_execution:
136 self.machine.setup_algorithm()
137 else:
138 self.first_execution = False
139 self.machine.algorithm.algorithm.setForcedCalibration(
140 forced_calibration)
141 self.machine.execute_runs(runs=run_list, iteration=iteration,
142 apply_iov=None)
143 if (self.machine.result.result == AlgResult.ok.value) or \
144 (self.machine.result.result == AlgResult.iterate.value):
145 self.machine.complete()
146 else:
147 self.machine.fail()
148
149 def process_experiment(self, experiment, experiment_runs, iteration,
150 lowest_exprun, highest_exprun):
151 """
152 Process runs from experiment.
153 """
154 # Run lists. They have the following format: run number,
155 # calibration result code, ExpRun, algorithm results,
156 # merge information, payload.
157 run_data = []
158 run_data_klm_excluded = []
159
160 # Initial run.
161 for exp_run in experiment_runs:
162 self.execute_over_run_list([exp_run], iteration, False)
163 result = self.machine.result.result
164 algorithm_results = KLMChannelStatusAlgorithm.Results(
165 self.machine.algorithm.algorithm.getResults())
166 payload = self.machine.algorithm.algorithm.getPayloadValues()
167 run_results = [
168 exp_run.run, result, [exp_run], algorithm_results, '', payload]
169 if (algorithm_results.getTotalHitNumber() > 0):
170 run_data.append(run_results)
171 else:
172 run_data_klm_excluded.append(run_results)
173 result_str = calibration_result_string(result)
174 basf2.B2INFO(f'Run {int(exp_run.run)}: {result_str}.')
175
176 # Sort by run number.
177 run_data.sort(key=lambda x: x[0])
178 run_data_klm_excluded.sort(key=lambda x: x[0])
179
180 # Create a tree with number of events.
181 save_channel_hit_map = False
182 save_module_hit_map = True
183 save_sector_hit_map = True
184 f_hit_map = ROOT.TFile('hit_map.root', 'recreate')
185 run = numpy.zeros(1, dtype=int)
186 calibration_result = numpy.zeros(1, dtype=int)
187 module = numpy.zeros(1, dtype=int)
188 subdetector = numpy.zeros(1, dtype=int)
189 section = numpy.zeros(1, dtype=int)
190 sector = numpy.zeros(1, dtype=int)
191 layer = numpy.zeros(1, dtype=int)
192 plane = numpy.zeros(1, dtype=int)
193 strip = numpy.zeros(1, dtype=int)
194 hits_total = numpy.zeros(1, dtype=int)
195 hits_module = numpy.zeros(1, dtype=int)
196 active_channels = numpy.zeros(1, dtype=int)
197 hit_map_channel = ROOT.TTree('hit_map_channel', '')
198 hit_map_channel.Branch('run', run, 'run/I')
199 hit_map_channel.Branch('calibration_result', calibration_result,
200 'calibration_result/I')
201 hit_map_channel.Branch('channel', module, 'channel/I')
202 hit_map_channel.Branch('subdetector', subdetector, 'subdetector/I')
203 hit_map_channel.Branch('section', section, 'section/I')
204 hit_map_channel.Branch('sector', sector, 'sector/I')
205 hit_map_channel.Branch('layer', layer, 'layer/I')
206 hit_map_channel.Branch('plane', plane, 'plane/I')
207 hit_map_channel.Branch('strip', strip, 'strip/I')
208 hit_map_channel.Branch('hits_total', hits_total, 'hits_total/I')
209 hit_map_channel.Branch('hits_channel', hits_module, 'hits_channel/I')
210 hit_map_module = ROOT.TTree('hit_map_module', '')
211 hit_map_module.Branch('run', run, 'run/I')
212 hit_map_module.Branch('calibration_result', calibration_result,
213 'calibration_result/I')
214 hit_map_module.Branch('module', module, 'module/I')
215 hit_map_module.Branch('subdetector', subdetector, 'subdetector/I')
216 hit_map_module.Branch('section', section, 'section/I')
217 hit_map_module.Branch('sector', sector, 'sector/I')
218 hit_map_module.Branch('layer', layer, 'layer/I')
219 hit_map_module.Branch('hits_total', hits_total, 'hits_total/I')
220 hit_map_module.Branch('hits_module', hits_module, 'hits_module/I')
221 hit_map_module.Branch('active_channels', active_channels,
222 'active_channels/I')
223 hit_map_sector = ROOT.TTree('hit_map_sector', '')
224 hit_map_sector.Branch('run', run, 'run/I')
225 hit_map_sector.Branch('calibration_result', calibration_result,
226 'calibration_result/I')
227 hit_map_sector.Branch('sector_global', module, 'sector_global/I')
228 hit_map_sector.Branch('subdetector', subdetector, 'subdetector/I')
229 hit_map_sector.Branch('section', section, 'section/I')
230 hit_map_sector.Branch('sector', sector, 'sector/I')
231 hit_map_sector.Branch('hits_total', hits_total, 'hits_total/I')
232 hit_map_sector.Branch('hits_sector', hits_module, 'hits_sector/I')
233 for i in range(0, len(run_data)):
234 run[0] = run_data[i][0]
235 calibration_result[0] = run_data[i][1]
236 hits_total[0] = run_data[i][3].getTotalHitNumber()
237 # Channel hit map.
238 if (save_channel_hit_map):
239 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelStrip)
240 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelStrip)
241 while (index != index2.end()):
242 module[0] = index.getKLMChannelNumber()
243 subdetector[0] = index.getSubdetector()
244 section[0] = index.getSection()
245 sector[0] = index.getSector()
246 layer[0] = index.getLayer()
247 plane[0] = index.getPlane()
248 strip[0] = index.getStrip()
249 hits_module[0] = run_data[i][3].getHitMapChannel(). \
250 getChannelData(int(module[0]))
251 hit_map_channel.Fill()
252 index.increment()
253 # Module hit map.
254 if (save_module_hit_map):
255 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelLayer)
256 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelLayer)
257 while (index != index2.end()):
258 module[0] = index.getKLMModuleNumber()
259 subdetector[0] = index.getSubdetector()
260 section[0] = index.getSection()
261 sector[0] = index.getSector()
262 layer[0] = index.getLayer()
263 hits_module[0] = run_data[i][3].getHitMapModule(). \
264 getChannelData(int(module[0]))
265 active_channels[0] = run_data[i][3]. \
266 getModuleActiveChannelMap(). \
267 getChannelData(int(module[0]))
268 hit_map_module.Fill()
269 index.increment()
270 # Sector hit map.
271 if (save_sector_hit_map):
272 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelSector)
273 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelSector)
274 while (index != index2.end()):
275 module[0] = index.getKLMSectorNumber()
276 subdetector[0] = index.getSubdetector()
277 section[0] = index.getSection()
278 sector[0] = index.getSector()
279 hits_module[0] = run_data[i][3].getHitMapSector(). \
280 getChannelData(int(module[0]))
281 hit_map_sector.Fill()
282 index.increment()
283 hit_map_channel.Write()
284 hit_map_module.Write()
285 hit_map_sector.Write()
286 f_hit_map.Close()
287
288 # Create list of runs that do not have enough data.
289 run_ranges = []
290 i = 0
291 while (i < len(run_data)):
292 if (run_data[i][1] == 2):
293 j = i
294 while (run_data[j][1] == 2):
295 j += 1
296 if (j >= len(run_data)):
297 break
298 run_ranges.append([i, j])
299 i = j
300 else:
301 i += 1
302
303 # Determine whether the runs with insufficient data can be merged
304 # to the next or previous normal run.
305 def can_merge(run_data, run_not_enough_data, run_normal):
306 return run_data[run_not_enough_data][3].getModuleStatus(). \
307 newNormalChannels(
308 run_data[run_normal][3].getModuleStatus()) == 0
309
310 for run_range in run_ranges:
311 next_run = run_range[1]
312 # To mark as 'none' at the end if there are no normal runs.
313 j = run_range[0]
314 i = next_run - 1
315 if (next_run < len(run_data)):
316 while (i >= run_range[0]):
317 if (can_merge(run_data, i, next_run)):
318 basf2.B2INFO(
319 f'Run {int(run_data[i][0])} (not enough data) can be merged into the next normal run ' +
320 f'{int(run_data[next_run][0])}.')
321 run_data[i][4] = 'next'
322 else:
323 basf2.B2INFO(
324 f'Run {int(run_data[i][0])} (not enough data) cannot be merged into the next normal run ' +
325 f'{int(run_data[next_run][0])}, will try the previous one.')
326 break
327 i -= 1
328 if (i < run_range[0]):
329 continue
330 previous_run = run_range[0] - 1
331 if (previous_run >= 0):
332 while (j <= i):
333 if (can_merge(run_data, j, previous_run)):
334 basf2.B2INFO(
335 f'Run {int(run_data[j][0])} (not enough data) can be merged into the previous normal run ' +
336 f'{int(run_data[previous_run][0])}.')
337 run_data[j][4] = 'previous'
338 else:
339 basf2.B2INFO(
340 f'Run {int(run_data[j][0])} (not enough data) cannot be merged into the previous normal run ' +
341 f'{int(run_data[previous_run][0])}.')
342 break
343 j += 1
344 if (j > i):
345 continue
346 basf2.B2INFO('A range of runs with not enough data is found that cannot be merged into neither previous nor ' +
347 f'next normal run: from {int(run_data[j][0])} to {int(run_data[i][0])}.')
348 while (j <= i):
349 run_data[j][4] = 'none'
350 j += 1
351
352 # Merge runs that do not have enough data. If both this and next
353 # run do not have enough data, then merge the collected data.
354 i = 0
355 j = 0
356 while (i < len(run_data) - 1):
357 while ((run_data[i][1] == 2) and (run_data[i + 1][1] == 2)):
358 if (run_data[i][4] != run_data[i + 1][4]):
359 break
360 basf2.B2INFO(f'Merging run {int(run_data[i + 1][0])} (not enough data) into run {int(run_data[i][0])} ' +
361 '(not enough data).')
362 run_data[i][2].extend(run_data[i + 1][2])
363 del run_data[i + 1]
364 self.execute_over_run_list(run_data[i][2], iteration, False)
365 run_data[i][1] = self.machine.result.result
366 run_data[i][3] = KLMChannelStatusAlgorithm.Results(
367 self.machine.algorithm.algorithm.getResults())
368 run_data[i][5] = \
369 self.machine.algorithm.algorithm.getPayloadValues()
370 result_str = calibration_result_string(run_data[i][1])
371 basf2.B2INFO(f'Run {int(run_data[i][0])}: {result_str}.')
372 if (i >= len(run_data) - 1):
373 break
374 i += 1
375
376 # Merge runs that do not have enough data into normal runs.
377 # Currently merging the data (TODO: consider result comparison).
378 def merge_runs(run_data, run_not_enough_data, run_normal, forced):
379 basf2.B2INFO(f'Merging run {int(run_data[run_not_enough_data][0])} (not enough data) into run ' +
380 f'{int(run_data[run_normal][0])} (normal).')
381 run_data[run_normal][2].extend(run_data[run_not_enough_data][2])
382 self.execute_over_run_list(run_data[run_normal][2], iteration,
383 forced)
384 run_data[run_normal][1] = self.machine.result.result
385 run_data[run_normal][3] = KLMChannelStatusAlgorithm.Results(
386 self.machine.algorithm.algorithm.getResults())
387 run_data[run_normal][5] = self.machine.algorithm.algorithm.getPayloadValues()
388 result_str = calibration_result_string(run_data[run_normal][1])
389 basf2.B2INFO(f'Run {int(run_data[run_normal][0])}: {result_str}.')
390 if (run_data[run_normal][1] != 0):
391 basf2.B2FATAL(f'Merging run {int(run_data[run_not_enough_data][0])} into run {int(run_data[run_normal][0])} ' +
392 'failed.')
393 del run_data[run_not_enough_data]
394
395 i = 0
396 while (i < len(run_data)):
397 if (run_data[i][1] == 2):
398 if (run_data[i][4] == 'next'):
399 merge_runs(run_data, i, i + 1, False)
400 elif (run_data[i][4] == 'previous'):
401 merge_runs(run_data, i, i - 1, False)
402 else:
403 i += 1
404 else:
405 i += 1
406 i = 0
407 while (i < len(run_data)):
408 if (run_data[i][1] == 2 and run_data[i][4] == 'none'):
409 new_modules_previous = -1
410 new_modules_next = -1
411 if (i < len(run_data) - 1):
412 new_modules_next = run_data[i][3].getModuleStatus(). \
413 newNormalChannels(run_data[i + 1][3].getModuleStatus())
414 basf2.B2INFO(f'There are {int(new_modules_next)} new active modules in run {int(run_data[i][0])} ' +
415 f'relatively to run {int(run_data[i + 1][0])}.')
416 if (i > 0):
417 new_modules_previous = run_data[i][3].getModuleStatus(). \
418 newNormalChannels(run_data[i - 1][3].getModuleStatus())
419 basf2.B2INFO(f'There are {int(new_modules_previous)} new active modules in run {int(run_data[i][0])} ' +
420 f'relatively to run {int(run_data[i - 1][0])}.')
421 run_for_merging = -1
422 # If a forced merge of the normal run with another run from
423 # a different range of runs with not enough data has already
424 # been performed, then the list of active modules may change
425 # and there would be 0 new modules. Consequently, the number
426 # of modules is checked to be greater or equal than 0. However,
427 # there is no guarantee that the same added module would be
428 # calibrated normally. Thus, a forced merge is performed anyway.
429 if (new_modules_previous >= 0 and new_modules_next < 0):
430 run_for_merging = i - 1
431 elif (new_modules_previous < 0 and new_modules_next >= 0):
432 run_for_merging = i + 1
433 elif (new_modules_previous >= 0 and new_modules_next >= 0):
434 if (new_modules_previous < new_modules_next):
435 run_for_merging = i - 1
436 else:
437 run_for_merging = i + 1
438 else:
439 basf2.B2INFO(f'Cannot determine run for merging for run {int(run_data[i][0])}, performing its forced ' +
440 'calibration.')
441 self.execute_over_run_list(run_data[i][2], iteration, True)
442 run_data[i][1] = self.machine.result.result
443 run_data[i][3] = KLMChannelStatusAlgorithm.Results(
444 self.machine.algorithm.algorithm.getResults())
445 run_data[i][5] = self.machine.algorithm.algorithm.getPayloadValues()
446 result_str = calibration_result_string(run_data[i][1])
447 basf2.B2INFO(f'Run {int(run_data[i][0])}: {result_str}.')
448 if (run_data[i][1] != 0):
449 basf2.B2FATAL(f'Forced calibration of run {int(run_data[i][0])} failed.')
450 if (run_for_merging >= 0):
451 merge_runs(run_data, i, run_for_merging, True)
452 else:
453 i += 1
454
455 # Write the results.
456 def commit_payload(run_data):
457 if (run_data[1] == 2):
458 basf2.B2INFO(f'Run {int(run_data[0])} has no calibration result, skipped.')
459 return
460 basf2.B2INFO(f'Writing run {int(run_data[0])}.')
461 self.machine.algorithm.algorithm.commit(run_data[5])
462
463 def write_result(run_data, run):
464 iov = run_data[run][5].front().iov
465 run_low = iov.getRunLow()
466 run_high = iov.getRunHigh()
467 j = 0
468 runs = []
469 while (j < len(run_data_klm_excluded)):
470 if (run_low < run_data_klm_excluded[j][0] and
471 ((run_data_klm_excluded[j][0] < run_high) or
472 (run_high == -1))):
473 runs.append([run_data_klm_excluded[j][0], 'klm_excluded'])
474 j += 1
475 if (len(runs) == 0):
476 commit_payload(run_data[run])
477 return
478 for r in run_data[run][2]:
479 runs.append([r.run, 'klm_included'])
480 runs.sort(key=lambda x: x[0])
481 run_first = 0
482 run_last = 0
483 while (run_last < len(runs)):
484 run_last = run_first
485 while (runs[run_last][1] == runs[run_first][1]):
486 run_last += 1
487 if (run_last >= len(runs)):
488 break
489 if (run_first == 0):
490 run1 = run_low
491 else:
492 run1 = runs[run_first][0]
493 if (run_last < len(runs)):
494 run2 = runs[run_last][0] - 1
495 else:
496 run2 = run_high
497 iov = Belle2.IntervalOfValidity(experiment, run1,
498 experiment, run2)
499 if (runs[run_first][1] == 'klm_included'):
500 run_data[run][5].front().iov = iov
501 commit_payload(run_data[run])
502 else:
503 run_data_klm_excluded[0][5].front().iov = iov
504 commit_payload(run_data_klm_excluded[0])
505 run_first = run_last
506
507 first_run = 0
508 for i in range(0, len(run_data)):
509 # Get first run again due to possible mergings.
510 run_data[i][2].sort(key=lambda x: x.run)
511 first_run = run_data[i][2][0].run
512 # Set IOV for the current run.
513 # The last run will be overwritten when writing the result.
514 run_data[i][5].front().iov = \
515 Belle2.IntervalOfValidity(experiment, first_run, experiment, -1)
516 # Compare with the previous run.
517 write_previous_run = True
518 if (i > 0):
519 if (run_data[i][1] == 0 and run_data[i - 1][1] == 0):
520 if (run_data[i][3].getChannelStatus() ==
521 run_data[i - 1][3].getChannelStatus()):
522 basf2.B2INFO(f'Run {int(run_data[i][0])}: result is the same as for the previous run ' +
523 f'{int(run_data[i - 1][0])}.')
524 if (previous_run >= 0):
525 iov = run_data[previous_run][5].front().iov
526 run_data[previous_run][5].front().iov = \
528 experiment, iov.getRunLow(),
529 experiment, first_run - 1)
530 write_previous_run = False
531 # Set IOV for the current run.
532 # The last run will be overwritten when writing the result.
533 run_data[i][5].front().iov = Belle2.IntervalOfValidity(experiment, first_run, experiment, -1)
534 # If the calibration result is different, write the previous run.
535 if (write_previous_run and (i > 0)):
536 iov = run_data[previous_run][5].front().iov
537 if (previous_run == 0):
538 run_data[previous_run][5].front().iov = Belle2.IntervalOfValidity(
539 lowest_exprun.exp, lowest_exprun.run,
540 experiment, first_run - 1)
541 else:
542 run_data[previous_run][5].front().iov = Belle2.IntervalOfValidity(experiment, iov.getRunLow(),
543 experiment, first_run - 1)
544 write_result(run_data, previous_run)
545 previous_run = i
546 if (i == 0):
547 previous_run = 0
548 # Write the current run if it is the last run.
549 if (i == len(run_data) - 1):
550 iov = run_data[i][5].front().iov
551 run_data[i][5].front().iov = Belle2.IntervalOfValidity(
552 experiment, iov.getRunLow(),
553 highest_exprun.exp, highest_exprun.run)
554 write_result(run_data, i)
A class that describes the interval of experiments/runs for which an object in the database is valid.
run(self, iov, iteration, queue)
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm.
process_experiment(self, experiment, experiment_runs, iteration, lowest_exprun, highest_exprun)
execute_over_run_list(self, run_list, iteration, forced_calibration)
bool first_execution
Flag for the first execution of this AlgorithmStrategy.
queue
The multiprocessing queue used to pass back results one at a time.