Belle II Software development
klm_channel_status.py
1
8
9"""Custom calibration strategy for KLM channel status."""
10
11import numpy
12
13import basf2
14import ROOT
15from ROOT import Belle2
16
17from caf.utils import AlgResult, IoV
18from caf.utils import runs_overlapping_iov, runs_from_vector
19from caf.utils import split_runs_by_exp
20from caf.strategies import AlgorithmStrategy, StrategyError
21from caf.state_machines import AlgorithmMachine
22from ROOT.Belle2 import KLMChannelStatusAlgorithm, KLMChannelIndex
23from klm_strategies_common import get_lowest_exprun, get_highest_exprun, \
24 calibration_result_string
25
26
27class KLMChannelStatus(AlgorithmStrategy):
28 """
29 Custom strategy for executing the KLM channel status. Requires complex
30 run merging rules.
31
32 This uses a `caf.state_machines.AlgorithmMachine` to actually execute
33 the various steps rather than operating on a CalibrationAlgorithm
34 C++ class directly.
35 """
36
37
39 usable_params = {'iov_coverage': IoV}
40
41 def __init__(self, algorithm):
42 """
43 """
44 super().__init__(algorithm)
45
48 self.machine = AlgorithmMachine(self.algorithm)
49
50 self.first_execution = True
51
52 def run(self, iov, iteration, queue):
53 """
54 Runs the algorithm machine over the collected data and
55 fills the results.
56 """
57 if not self.is_valid():
58 raise StrategyError('The strategy KLMChannelStatus was not '
59 'set up correctly.')
60
61 self.queue = queue
62
63 basf2.B2INFO(f'Setting up {self.__class__.__name__} strategy '
64 f'for {self.algorithm.name}')
65 # Add all the necessary parameters for a strategy to run.
66 machine_params = {}
67 machine_params['database_chain'] = self.database_chain
68 machine_params['dependent_databases'] = self.dependent_databases
69 machine_params['output_dir'] = self.output_dir
70 machine_params['output_database_dir'] = self.output_database_dir
71 machine_params['input_files'] = self.input_files
72 machine_params['ignored_runs'] = self.ignored_runs
73 self.machine.setup_from_dict(machine_params)
74 # Start moving through machine states.
75 basf2.B2INFO(f'Starting AlgorithmMachine of {self.algorithm.name}')
76 # This sets up the logging and database chain and assigns all
77 # input files from collector jobs.
78 self.machine.setup_algorithm(iteration=iteration)
79 # After this point, the logging is in the stdout of the algorithm.
80 basf2.B2INFO(f'Beginning execution of {self.algorithm.name} using '
81 f'strategy {self.__class__.__name__}')
82
83 # Select of runs for calibration.
84 runs = self.algorithm.algorithm.getRunListFromAllData()
85 all_runs_collected = set(runs_from_vector(runs))
86 # Select runs overlapping with the calibration IOV if it is specified.
87 if iov:
88 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
89 else:
90 runs_to_execute = all_runs_collected
91 # Remove the ignored runs.
92 if self.ignored_runs:
93 basf2.B2INFO(f'Removing the ignored_runs from the runs '
94 f'to execute for {self.algorithm.name}')
95 runs_to_execute.difference_update(set(self.ignored_runs))
96
97 # Creation of sorted run list split by experiment.
98 runs_to_execute = sorted(runs_to_execute)
99 runs_to_execute = split_runs_by_exp(runs_to_execute)
100
101 # Get IOV coverage,
102 iov_coverage = None
103 if 'iov_coverage' in self.algorithm.params:
104 iov_coverage = self.algorithm.params['iov_coverage']
105
106 # Iterate over experiment run lists.
107 number_of_experiments = len(runs_to_execute)
108 for i_exp, run_list in enumerate(runs_to_execute, start=1):
109 lowest_exprun = get_lowest_exprun(number_of_experiments, i_exp,
110 run_list, iov_coverage)
111 highest_exprun = get_highest_exprun(number_of_experiments, i_exp,
112 run_list, iov_coverage)
113 self.process_experiment(run_list[0].exp, run_list, iteration,
114 lowest_exprun, highest_exprun)
115
116 # Send final state and result to CAF.
117 self.send_result(self.machine.result)
118 if (self.machine.result.result == AlgResult.ok.value) or \
119 (self.machine.result.result == AlgResult.iterate.value):
120 self.send_final_state(self.COMPLETED)
121 else:
122 self.send_final_state(self.FAILED)
123
124 def execute_over_run_list(self, run_list, iteration, forced_calibration):
125 """
126 Execute over run list.
127 """
128 if not self.first_execution:
129 self.machine.setup_algorithm()
130 else:
131 self.first_execution = False
132 self.machine.algorithm.algorithm.setForcedCalibration(
133 forced_calibration)
134 self.machine.execute_runs(runs=run_list, iteration=iteration,
135 apply_iov=None)
136 if (self.machine.result.result == AlgResult.ok.value) or \
137 (self.machine.result.result == AlgResult.iterate.value):
138 self.machine.complete()
139 else:
140 self.machine.fail()
141
142 def process_experiment(self, experiment, experiment_runs, iteration,
143 lowest_exprun, highest_exprun):
144 """
145 Process runs from experiment.
146 """
147 # Run lists. They have the following format: run number,
148 # calibration result code, ExpRun, algorithm results,
149 # merge information, payload.
150 run_data = []
151 run_data_klm_excluded = []
152
153 # Initial run.
154 for exp_run in experiment_runs:
155 self.execute_over_run_list([exp_run], iteration, False)
156 result = self.machine.result.result
157 algorithm_results = KLMChannelStatusAlgorithm.Results(
158 self.machine.algorithm.algorithm.getResults())
159 payload = self.machine.algorithm.algorithm.getPayloadValues()
160 run_results = [
161 exp_run.run, result, [exp_run], algorithm_results, '', payload]
162 if (algorithm_results.getTotalHitNumber() > 0):
163 run_data.append(run_results)
164 else:
165 run_data_klm_excluded.append(run_results)
166 result_str = calibration_result_string(result)
167 basf2.B2INFO(f'Run {int(exp_run.run)}: {result_str}.')
168
169 # Sort by run number.
170 run_data.sort(key=lambda x: x[0])
171 run_data_klm_excluded.sort(key=lambda x: x[0])
172
173 # Create a tree with number of events.
174 save_channel_hit_map = False
175 save_module_hit_map = True
176 save_sector_hit_map = True
177 f_hit_map = ROOT.TFile('hit_map.root', 'recreate')
178 run = numpy.zeros(1, dtype=int)
179 calibration_result = numpy.zeros(1, dtype=int)
180 module = numpy.zeros(1, dtype=int)
181 subdetector = numpy.zeros(1, dtype=int)
182 section = numpy.zeros(1, dtype=int)
183 sector = numpy.zeros(1, dtype=int)
184 layer = numpy.zeros(1, dtype=int)
185 plane = numpy.zeros(1, dtype=int)
186 strip = numpy.zeros(1, dtype=int)
187 hits_total = numpy.zeros(1, dtype=int)
188 hits_module = numpy.zeros(1, dtype=int)
189 active_channels = numpy.zeros(1, dtype=int)
190 hit_map_channel = ROOT.TTree('hit_map_channel', '')
191 hit_map_channel.Branch('run', run, 'run/I')
192 hit_map_channel.Branch('calibration_result', calibration_result,
193 'calibration_result/I')
194 hit_map_channel.Branch('channel', module, 'channel/I')
195 hit_map_channel.Branch('subdetector', subdetector, 'subdetector/I')
196 hit_map_channel.Branch('section', section, 'section/I')
197 hit_map_channel.Branch('sector', sector, 'sector/I')
198 hit_map_channel.Branch('layer', layer, 'layer/I')
199 hit_map_channel.Branch('plane', plane, 'plane/I')
200 hit_map_channel.Branch('strip', strip, 'strip/I')
201 hit_map_channel.Branch('hits_total', hits_total, 'hits_total/I')
202 hit_map_channel.Branch('hits_channel', hits_module, 'hits_channel/I')
203 hit_map_module = ROOT.TTree('hit_map_module', '')
204 hit_map_module.Branch('run', run, 'run/I')
205 hit_map_module.Branch('calibration_result', calibration_result,
206 'calibration_result/I')
207 hit_map_module.Branch('module', module, 'module/I')
208 hit_map_module.Branch('subdetector', subdetector, 'subdetector/I')
209 hit_map_module.Branch('section', section, 'section/I')
210 hit_map_module.Branch('sector', sector, 'sector/I')
211 hit_map_module.Branch('layer', layer, 'layer/I')
212 hit_map_module.Branch('hits_total', hits_total, 'hits_total/I')
213 hit_map_module.Branch('hits_module', hits_module, 'hits_module/I')
214 hit_map_module.Branch('active_channels', active_channels,
215 'active_channels/I')
216 hit_map_sector = ROOT.TTree('hit_map_sector', '')
217 hit_map_sector.Branch('run', run, 'run/I')
218 hit_map_sector.Branch('calibration_result', calibration_result,
219 'calibration_result/I')
220 hit_map_sector.Branch('sector_global', module, 'sector_global/I')
221 hit_map_sector.Branch('subdetector', subdetector, 'subdetector/I')
222 hit_map_sector.Branch('section', section, 'section/I')
223 hit_map_sector.Branch('sector', sector, 'sector/I')
224 hit_map_sector.Branch('hits_total', hits_total, 'hits_total/I')
225 hit_map_sector.Branch('hits_sector', hits_module, 'hits_sector/I')
226 for i in range(0, len(run_data)):
227 run[0] = run_data[i][0]
228 calibration_result[0] = run_data[i][1]
229 hits_total[0] = run_data[i][3].getTotalHitNumber()
230 # Channel hit map.
231 if (save_channel_hit_map):
232 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelStrip)
233 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelStrip)
234 while (index != index2.end()):
235 module[0] = index.getKLMChannelNumber()
236 subdetector[0] = index.getSubdetector()
237 section[0] = index.getSection()
238 sector[0] = index.getSector()
239 layer[0] = index.getLayer()
240 plane[0] = index.getPlane()
241 strip[0] = index.getStrip()
242 hits_module[0] = run_data[i][3].getHitMapChannel(). \
243 getChannelData(int(module[0]))
244 hit_map_channel.Fill()
245 index.increment()
246 # Module hit map.
247 if (save_module_hit_map):
248 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelLayer)
249 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelLayer)
250 while (index != index2.end()):
251 module[0] = index.getKLMModuleNumber()
252 subdetector[0] = index.getSubdetector()
253 section[0] = index.getSection()
254 sector[0] = index.getSector()
255 layer[0] = index.getLayer()
256 hits_module[0] = run_data[i][3].getHitMapModule(). \
257 getChannelData(int(module[0]))
258 active_channels[0] = run_data[i][3]. \
259 getModuleActiveChannelMap(). \
260 getChannelData(int(module[0]))
261 hit_map_module.Fill()
262 index.increment()
263 # Sector hit map.
264 if (save_sector_hit_map):
265 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelSector)
266 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelSector)
267 while (index != index2.end()):
268 module[0] = index.getKLMSectorNumber()
269 subdetector[0] = index.getSubdetector()
270 section[0] = index.getSection()
271 sector[0] = index.getSector()
272 hits_module[0] = run_data[i][3].getHitMapSector(). \
273 getChannelData(int(module[0]))
274 hit_map_sector.Fill()
275 index.increment()
276 hit_map_channel.Write()
277 hit_map_module.Write()
278 hit_map_sector.Write()
279 f_hit_map.Close()
280
281 # Create list of runs that do not have enough data.
282 run_ranges = []
283 i = 0
284 while (i < len(run_data)):
285 if (run_data[i][1] == 2):
286 j = i
287 while (run_data[j][1] == 2):
288 j += 1
289 if (j >= len(run_data)):
290 break
291 run_ranges.append([i, j])
292 i = j
293 else:
294 i += 1
295
296 # Determine whether the runs with insufficient data can be merged
297 # to the next or previous normal run.
298 def can_merge(run_data, run_not_enough_data, run_normal):
299 return run_data[run_not_enough_data][3].getModuleStatus(). \
300 newNormalChannels(
301 run_data[run_normal][3].getModuleStatus()) == 0
302
303 for run_range in run_ranges:
304 next_run = run_range[1]
305 # To mark as 'none' at the end if there are no normal runs.
306 j = run_range[0]
307 i = next_run - 1
308 if (next_run < len(run_data)):
309 while (i >= run_range[0]):
310 if (can_merge(run_data, i, next_run)):
311 basf2.B2INFO(
312 f'Run {int(run_data[i][0])} (not enough data) can be merged into the next normal run ' +
313 f'{int(run_data[next_run][0])}.')
314 run_data[i][4] = 'next'
315 else:
316 basf2.B2INFO(
317 f'Run {int(run_data[i][0])} (not enough data) cannot be merged into the next normal run ' +
318 f'{int(run_data[next_run][0])}, will try the previous one.')
319 break
320 i -= 1
321 if (i < run_range[0]):
322 continue
323 previous_run = run_range[0] - 1
324 if (previous_run >= 0):
325 while (j <= i):
326 if (can_merge(run_data, j, previous_run)):
327 basf2.B2INFO(
328 f'Run {int(run_data[j][0])} (not enough data) can be merged into the previous normal run ' +
329 f'{int(run_data[previous_run][0])}.')
330 run_data[j][4] = 'previous'
331 else:
332 basf2.B2INFO(
333 f'Run {int(run_data[j][0])} (not enough data) cannot be merged into the previous normal run ' +
334 f'{int(run_data[previous_run][0])}.')
335 break
336 j += 1
337 if (j > i):
338 continue
339 basf2.B2INFO('A range of runs with not enough data is found that cannot be merged into neither previous nor ' +
340 f'next normal run: from {int(run_data[j][0])} to {int(run_data[i][0])}.')
341 while (j <= i):
342 run_data[j][4] = 'none'
343 j += 1
344
345 # Merge runs that do not have enough data. If both this and next
346 # run do not have enough data, then merge the collected data.
347 i = 0
348 j = 0
349 while (i < len(run_data) - 1):
350 while ((run_data[i][1] == 2) and (run_data[i + 1][1] == 2)):
351 if (run_data[i][4] != run_data[i + 1][4]):
352 break
353 basf2.B2INFO(f'Merging run {int(run_data[i + 1][0])} (not enough data) into run {int(run_data[i][0])} ' +
354 '(not enough data).')
355 run_data[i][2].extend(run_data[i + 1][2])
356 del run_data[i + 1]
357 self.execute_over_run_list(run_data[i][2], iteration, False)
358 run_data[i][1] = self.machine.result.result
359 run_data[i][3] = KLMChannelStatusAlgorithm.Results(
360 self.machine.algorithm.algorithm.getResults())
361 run_data[i][5] = \
362 self.machine.algorithm.algorithm.getPayloadValues()
363 result_str = calibration_result_string(run_data[i][1])
364 basf2.B2INFO(f'Run {int(run_data[i][0])}: {result_str}.')
365 if (i >= len(run_data) - 1):
366 break
367 i += 1
368
369 # Merge runs that do not have enough data into normal runs.
370 # Currently merging the data (TODO: consider result comparison).
371 def merge_runs(run_data, run_not_enough_data, run_normal, forced):
372 basf2.B2INFO(f'Merging run {int(run_data[run_not_enough_data][0])} (not enough data) into run ' +
373 f'{int(run_data[run_normal][0])} (normal).')
374 run_data[run_normal][2].extend(run_data[run_not_enough_data][2])
375 self.execute_over_run_list(run_data[run_normal][2], iteration,
376 forced)
377 run_data[run_normal][1] = self.machine.result.result
378 run_data[run_normal][3] = KLMChannelStatusAlgorithm.Results(
379 self.machine.algorithm.algorithm.getResults())
380 run_data[run_normal][5] = self.machine.algorithm.algorithm.getPayloadValues()
381 result_str = calibration_result_string(run_data[run_normal][1])
382 basf2.B2INFO(f'Run {int(run_data[run_normal][0])}: {result_str}.')
383 if (run_data[run_normal][1] != 0):
384 basf2.B2FATAL(f'Merging run {int(run_data[run_not_enough_data][0])} into run {int(run_data[run_normal][0])} ' +
385 'failed.')
386 del run_data[run_not_enough_data]
387
388 i = 0
389 while (i < len(run_data)):
390 if (run_data[i][1] == 2):
391 if (run_data[i][4] == 'next'):
392 merge_runs(run_data, i, i + 1, False)
393 elif (run_data[i][4] == 'previous'):
394 merge_runs(run_data, i, i - 1, False)
395 else:
396 i += 1
397 else:
398 i += 1
399 i = 0
400 while (i < len(run_data)):
401 if (run_data[i][1] == 2 and run_data[i][4] == 'none'):
402 new_modules_previous = -1
403 new_modules_next = -1
404 if (i < len(run_data) - 1):
405 new_modules_next = run_data[i][3].getModuleStatus(). \
406 newNormalChannels(run_data[i + 1][3].getModuleStatus())
407 basf2.B2INFO(f'There are {int(new_modules_next)} new active modules in run {int(run_data[i][0])} ' +
408 f'relatively to run {int(run_data[i + 1][0])}.')
409 if (i > 0):
410 new_modules_previous = run_data[i][3].getModuleStatus(). \
411 newNormalChannels(run_data[i - 1][3].getModuleStatus())
412 basf2.B2INFO(f'There are {int(new_modules_previous)} new active modules in run {int(run_data[i][0])} ' +
413 f'relatively to run {int(run_data[i - 1][0])}.')
414 run_for_merging = -1
415 # If a forced merge of the normal run with another run from
416 # a different range of runs with not enough data has already
417 # been performed, then the list of active modules may change
418 # and there would be 0 new modules. Consequently, the number
419 # of modules is checked to be greater or equal than 0. However,
420 # there is no guarantee that the same added module would be
421 # calibrated normally. Thus, a forced merge is performed anyway.
422 if (new_modules_previous >= 0 and new_modules_next < 0):
423 run_for_merging = i - 1
424 elif (new_modules_previous < 0 and new_modules_next >= 0):
425 run_for_merging = i + 1
426 elif (new_modules_previous >= 0 and new_modules_next >= 0):
427 if (new_modules_previous < new_modules_next):
428 run_for_merging = i - 1
429 else:
430 run_for_merging = i + 1
431 else:
432 basf2.B2INFO(f'Cannot determine run for merging for run {int(run_data[i][0])}, performing its forced ' +
433 'calibration.')
434 self.execute_over_run_list(run_data[i][2], iteration, True)
435 run_data[i][1] = self.machine.result.result
436 run_data[i][3] = KLMChannelStatusAlgorithm.Results(
437 self.machine.algorithm.algorithm.getResults())
438 run_data[i][5] = self.machine.algorithm.algorithm.getPayloadValues()
439 result_str = calibration_result_string(run_data[i][1])
440 basf2.B2INFO(f'Run {int(run_data[i][0])}: {result_str}.')
441 if (run_data[i][1] != 0):
442 basf2.B2FATAL(f'Forced calibration of run {int(run_data[i][0])} failed.')
443 if (run_for_merging >= 0):
444 merge_runs(run_data, i, run_for_merging, True)
445 else:
446 i += 1
447
448 # Write the results.
449 def commit_payload(run_data):
450 if (run_data[1] == 2):
451 basf2.B2INFO(f'Run {int(run_data[0])} has no calibration result, skipped.')
452 return
453 basf2.B2INFO(f'Writing run {int(run_data[0])}.')
454 self.machine.algorithm.algorithm.commit(run_data[5])
455
456 def write_result(run_data, run):
457 iov = run_data[run][5].front().iov
458 run_low = iov.getRunLow()
459 run_high = iov.getRunHigh()
460 j = 0
461 runs = []
462 while (j < len(run_data_klm_excluded)):
463 if (run_low < run_data_klm_excluded[j][0] and
464 ((run_data_klm_excluded[j][0] < run_high) or
465 (run_high == -1))):
466 runs.append([run_data_klm_excluded[j][0], 'klm_excluded'])
467 j += 1
468 if (len(runs) == 0):
469 commit_payload(run_data[run])
470 return
471 for r in run_data[run][2]:
472 runs.append([r.run, 'klm_included'])
473 runs.sort(key=lambda x: x[0])
474 run_first = 0
475 run_last = 0
476 while (run_last < len(runs)):
477 run_last = run_first
478 while (runs[run_last][1] == runs[run_first][1]):
479 run_last += 1
480 if (run_last >= len(runs)):
481 break
482 if (run_first == 0):
483 run1 = run_low
484 else:
485 run1 = runs[run_first][0]
486 if (run_last < len(runs)):
487 run2 = runs[run_last][0] - 1
488 else:
489 run2 = run_high
490 iov = Belle2.IntervalOfValidity(experiment, run1,
491 experiment, run2)
492 if (runs[run_first][1] == 'klm_included'):
493 run_data[run][5].front().iov = iov
494 commit_payload(run_data[run])
495 else:
496 run_data_klm_excluded[0][5].front().iov = iov
497 commit_payload(run_data_klm_excluded[0])
498 run_first = run_last
499
500 first_run = 0
501 for i in range(0, len(run_data)):
502 # Get first run again due to possible mergings.
503 run_data[i][2].sort(key=lambda x: x.run)
504 first_run = run_data[i][2][0].run
505 # Set IOV for the current run.
506 # The last run will be overwritten when writing the result.
507 run_data[i][5].front().iov = \
508 Belle2.IntervalOfValidity(experiment, first_run, experiment, -1)
509 # Compare with the previous run.
510 write_previous_run = True
511 if (i > 0):
512 if (run_data[i][1] == 0 and run_data[i - 1][1] == 0):
513 if (run_data[i][3].getChannelStatus() ==
514 run_data[i - 1][3].getChannelStatus()):
515 basf2.B2INFO(f'Run {int(run_data[i][0])}: result is the same as for the previous run ' +
516 f'{int(run_data[i - 1][0])}.')
517 if (previous_run >= 0):
518 iov = run_data[previous_run][5].front().iov
519 run_data[previous_run][5].front().iov = \
521 experiment, iov.getRunLow(),
522 experiment, first_run - 1)
523 write_previous_run = False
524 # Set IOV for the current run.
525 # The last run will be overwritten when writing the result.
526 run_data[i][5].front().iov = Belle2.IntervalOfValidity(experiment, first_run, experiment, -1)
527 # If the calibration result is different, write the previous run.
528 if (write_previous_run and (i > 0)):
529 iov = run_data[previous_run][5].front().iov
530 if (previous_run == 0):
531 run_data[previous_run][5].front().iov = Belle2.IntervalOfValidity(
532 lowest_exprun.exp, lowest_exprun.run,
533 experiment, first_run - 1)
534 else:
535 run_data[previous_run][5].front().iov = Belle2.IntervalOfValidity(experiment, iov.getRunLow(),
536 experiment, first_run - 1)
537 write_result(run_data, previous_run)
538 previous_run = i
539 if (i == 0):
540 previous_run = 0
541 # Write the current run if it is the last run.
542 if (i == len(run_data) - 1):
543 iov = run_data[i][5].front().iov
544 run_data[i][5].front().iov = Belle2.IntervalOfValidity(
545 experiment, iov.getRunLow(),
546 highest_exprun.exp, highest_exprun.run)
547 write_result(run_data, i)
A class that describes the interval of experiments/runs for which an object in the database is valid.
def process_experiment(self, experiment, experiment_runs, iteration, lowest_exprun, highest_exprun)
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm.
first_execution
Flag for the first execution of this AlgorithmStrategy.
def execute_over_run_list(self, run_list, iteration, forced_calibration)
queue
The multiprocessing queue used to pass back results one at a time.