3 """Custom calibration strategy for KLM channel status."""
11 from ROOT
import Belle2
13 from caf.utils
import ExpRun, IoV, AlgResult
14 from caf.utils
import runs_overlapping_iov, runs_from_vector
15 from caf.utils
import split_runs_by_exp
16 from caf.strategies
import AlgorithmStrategy
17 from caf.state_machines
import AlgorithmMachine
18 from ROOT.Belle2
import KLMChannelStatusAlgorithm, KLMChannelIndex
19 from klm_strategies_common
import get_lowest_exprun, get_highest_exprun, \
20 calibration_result_string
25 Custom strategy for executing the KLM channel status. Requires complex
28 This uses a `caf.state_machines.AlgorithmMachine` to actually execute
29 the various steps rather than operating on a CalibrationAlgorithm
35 usable_params = {
'iov_coverage': IoV}
44 self.
machine = AlgorithmMachine(self.algorithm)
48 def run(self, iov, iteration, queue):
50 Runs the algorithm machine over the collected data and
53 if not self.is_valid():
54 raise StrategyError(
'The strategy KLMChannelStatus was not '
59 basf2.B2INFO(f
'Setting up {self.__class__.__name__} strategy '
60 f
'for {self.algorithm.name}')
63 machine_params[
'database_chain'] = self.database_chain
64 machine_params[
'dependent_databases'] = self.dependent_databases
65 machine_params[
'output_dir'] = self.output_dir
66 machine_params[
'output_database_dir'] = self.output_database_dir
67 machine_params[
'input_files'] = self.input_files
68 machine_params[
'ignored_runs'] = self.ignored_runs
69 self.
machine.setup_from_dict(machine_params)
71 basf2.B2INFO(f
'Starting AlgorithmMachine of {self.algorithm.name}')
74 self.
machine.setup_algorithm(iteration=iteration)
76 basf2.B2INFO(f
'Beginning execution of {self.algorithm.name} using '
77 f
'strategy {self.__class__.__name__}')
80 runs = self.algorithm.algorithm.getRunListFromAllData()
81 all_runs_collected = set(runs_from_vector(runs))
84 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
86 runs_to_execute = all_runs_collected
89 basf2.B2INFO(f
'Removing the ignored_runs from the runs '
90 f
'to execute for {self.algorithm.name}')
91 runs_to_execute.difference_update(set(self.ignored_runs))
94 runs_to_execute = sorted(runs_to_execute)
95 runs_to_execute = split_runs_by_exp(runs_to_execute)
99 if 'iov_coverage' in self.algorithm.params:
100 iov_coverage = self.algorithm.params[
'iov_coverage']
103 number_of_experiments = len(runs_to_execute)
104 for i_exp, run_list
in enumerate(runs_to_execute, start=1):
105 lowest_exprun = get_lowest_exprun(number_of_experiments, i_exp,
106 run_list, iov_coverage)
107 highest_exprun = get_highest_exprun(number_of_experiments, i_exp,
108 run_list, iov_coverage)
110 lowest_exprun, highest_exprun)
113 self.send_result(self.
machine.result)
114 if (self.
machine.result.result == AlgResult.ok.value)
or \
115 (self.
machine.result.result == AlgResult.iterate.value):
116 self.send_final_state(self.COMPLETED)
118 self.send_final_state(self.FAILED)
122 Execute over run list.
128 self.
machine.algorithm.algorithm.setForcedCalibration(
130 self.
machine.execute_runs(runs=run_list, iteration=iteration,
132 if (self.
machine.result.result == AlgResult.ok.value)
or \
133 (self.
machine.result.result == AlgResult.iterate.value):
139 lowest_exprun, highest_exprun):
141 Process runs from experiment.
147 run_data_klm_excluded = []
150 for exp_run
in experiment_runs:
152 result = self.
machine.result.result
153 algorithm_results = KLMChannelStatusAlgorithm.Results(
154 self.
machine.algorithm.algorithm.getResults())
155 payload = self.
machine.algorithm.algorithm.getPayloadValues()
157 exp_run.run, result, [exp_run], algorithm_results,
'', payload]
158 if (algorithm_results.getTotalHitNumber() > 0):
159 run_data.append(run_results)
161 run_data_klm_excluded.append(run_results)
162 result_str = calibration_result_string(result)
163 basf2.B2INFO(
'Run %d: %s.' % (exp_run.run, result_str))
166 run_data.sort(key=
lambda x: x[0])
167 run_data_klm_excluded.sort(key=
lambda x: x[0])
170 save_channel_hit_map =
False
171 save_module_hit_map =
True
172 save_sector_hit_map =
True
173 f_hit_map = ROOT.TFile(
'hit_map.root',
'recreate')
174 run = numpy.zeros(1, dtype=int)
175 calibration_result = numpy.zeros(1, dtype=int)
176 module = numpy.zeros(1, dtype=int)
177 subdetector = numpy.zeros(1, dtype=int)
178 section = numpy.zeros(1, dtype=int)
179 sector = numpy.zeros(1, dtype=int)
180 layer = numpy.zeros(1, dtype=int)
181 plane = numpy.zeros(1, dtype=int)
182 strip = numpy.zeros(1, dtype=int)
183 hits_total = numpy.zeros(1, dtype=int)
184 hits_module = numpy.zeros(1, dtype=int)
185 active_channels = numpy.zeros(1, dtype=int)
186 hit_map_channel = ROOT.TTree(
'hit_map_channel',
'')
187 hit_map_channel.Branch(
'run', run,
'run/I')
188 hit_map_channel.Branch(
'calibration_result', calibration_result,
189 'calibration_result/I')
190 hit_map_channel.Branch(
'channel', module,
'channel/I')
191 hit_map_channel.Branch(
'subdetector', subdetector,
'subdetector/I')
192 hit_map_channel.Branch(
'section', section,
'section/I')
193 hit_map_channel.Branch(
'sector', sector,
'sector/I')
194 hit_map_channel.Branch(
'layer', layer,
'layer/I')
195 hit_map_channel.Branch(
'plane', plane,
'plane/I')
196 hit_map_channel.Branch(
'strip', strip,
'strip/I')
197 hit_map_channel.Branch(
'hits_total', hits_total,
'hits_total/I')
198 hit_map_channel.Branch(
'hits_channel', hits_module,
'hits_channel/I')
199 hit_map_module = ROOT.TTree(
'hit_map_module',
'')
200 hit_map_module.Branch(
'run', run,
'run/I')
201 hit_map_module.Branch(
'calibration_result', calibration_result,
202 'calibration_result/I')
203 hit_map_module.Branch(
'module', module,
'module/I')
204 hit_map_module.Branch(
'subdetector', subdetector,
'subdetector/I')
205 hit_map_module.Branch(
'section', section,
'section/I')
206 hit_map_module.Branch(
'sector', sector,
'sector/I')
207 hit_map_module.Branch(
'layer', layer,
'layer/I')
208 hit_map_module.Branch(
'hits_total', hits_total,
'hits_total/I')
209 hit_map_module.Branch(
'hits_module', hits_module,
'hits_module/I')
210 hit_map_module.Branch(
'active_channels', active_channels,
212 hit_map_sector = ROOT.TTree(
'hit_map_sector',
'')
213 hit_map_sector.Branch(
'run', run,
'run/I')
214 hit_map_sector.Branch(
'calibration_result', calibration_result,
215 'calibration_result/I')
216 hit_map_sector.Branch(
'sector_global', module,
'sector_global/I')
217 hit_map_sector.Branch(
'subdetector', subdetector,
'subdetector/I')
218 hit_map_sector.Branch(
'section', section,
'section/I')
219 hit_map_sector.Branch(
'sector', sector,
'sector/I')
220 hit_map_sector.Branch(
'hits_total', hits_total,
'hits_total/I')
221 hit_map_sector.Branch(
'hits_sector', hits_module,
'hits_sector/I')
222 for i
in range(0, len(run_data)):
223 run[0] = run_data[i][0]
224 calibration_result[0] = run_data[i][1]
225 hits_total[0] = run_data[i][3].getTotalHitNumber()
227 if (save_channel_hit_map):
228 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelStrip)
229 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelStrip)
230 while (index != index2.end()):
231 module[0] = index.getKLMChannelNumber()
232 subdetector[0] = index.getSubdetector()
233 section[0] = index.getSection()
234 sector[0] = index.getSector()
235 layer[0] = index.getLayer()
236 plane[0] = index.getPlane()
237 strip[0] = index.getStrip()
238 hits_module[0] = run_data[i][3].getHitMapChannel(). \
239 getChannelData(int(module[0]))
240 hit_map_channel.Fill()
243 if (save_module_hit_map):
244 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelLayer)
245 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelLayer)
246 while (index != index2.end()):
247 module[0] = index.getKLMModuleNumber()
248 subdetector[0] = index.getSubdetector()
249 section[0] = index.getSection()
250 sector[0] = index.getSector()
251 layer[0] = index.getLayer()
252 hits_module[0] = run_data[i][3].getHitMapModule(). \
253 getChannelData(int(module[0]))
254 active_channels[0] = run_data[i][3]. \
255 getModuleActiveChannelMap(). \
256 getChannelData(int(module[0]))
257 hit_map_module.Fill()
260 if (save_sector_hit_map):
261 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelSector)
262 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelSector)
263 while (index != index2.end()):
264 module[0] = index.getKLMSectorNumber()
265 subdetector[0] = index.getSubdetector()
266 section[0] = index.getSection()
267 sector[0] = index.getSector()
268 hits_module[0] = run_data[i][3].getHitMapSector(). \
269 getChannelData(int(module[0]))
270 hit_map_sector.Fill()
272 hit_map_channel.Write()
273 hit_map_module.Write()
274 hit_map_sector.Write()
280 while (i < len(run_data)):
281 if (run_data[i][1] == 2):
283 while (run_data[j][1] == 2):
285 if (j >= len(run_data)):
287 run_ranges.append([i, j])
294 def can_merge(run_data, run_not_enough_data, run_normal):
295 return run_data[run_not_enough_data][3].getModuleStatus(). \
297 run_data[run_normal][3].getModuleStatus()) == 0
299 for run_range
in run_ranges:
300 next_run = run_range[1]
304 if (next_run < len(run_data)):
305 while (i >= run_range[0]):
306 if (can_merge(run_data, i, next_run)):
308 'Run %d (not enough data) can be merged into '
309 'the next normal run %d.' %
310 (run_data[i][0], run_data[next_run][0]))
311 run_data[i][4] =
'next'
314 'Run %d (not enough data) cannot be merged into '
315 'the next normal run %d, will try the previous '
316 'one.' % (run_data[i][0], run_data[next_run][0]))
319 if (i < run_range[0]):
321 previous_run = run_range[0] - 1
322 if (previous_run >= 0):
324 if (can_merge(run_data, j, previous_run)):
326 'Run %d (not enough data) can be merged into '
327 'the previous normal run %d.' %
328 (run_data[j][0], run_data[previous_run][0]))
329 run_data[j][4] =
'previous'
332 'Run %d (not enough data) cannot be merged into '
333 'the previous normal run %d.' %
334 (run_data[j][0], run_data[previous_run][0]))
339 basf2.B2INFO(
'A range of runs with not enough data is found '
340 'that cannot be merged into neither previous nor '
341 'next normal run: from %d to %d.' %
342 (run_data[j][0], run_data[i][0]))
344 run_data[j][4] =
'none'
351 while (i < len(run_data) - 1):
352 while ((run_data[i][1] == 2)
and (run_data[i + 1][1] == 2)):
353 if (run_data[i][4] != run_data[i + 1][4]):
355 basf2.B2INFO(
'Merging run %d (not enough data) into '
356 'run %d (not enough data).' %
357 (run_data[i + 1][0], run_data[i][0]))
358 run_data[i][2].extend(run_data[i + 1][2])
361 run_data[i][1] = self.
machine.result.result
362 run_data[i][3] = KLMChannelStatusAlgorithm.Results(
363 self.
machine.algorithm.algorithm.getResults())
365 self.
machine.algorithm.algorithm.getPayloadValues()
366 result_str = calibration_result_string(run_data[i][1])
367 basf2.B2INFO(
'Run %d: %s.' % (run_data[i][0], result_str))
368 if (i >= len(run_data) - 1):
374 def merge_runs(run_data, run_not_enough_data, run_normal, forced):
375 basf2.B2INFO(
'Merging run %d (not enough data) into '
377 (run_data[run_not_enough_data][0],
378 run_data[run_normal][0]))
379 run_data[run_normal][2].extend(run_data[run_not_enough_data][2])
382 run_data[run_normal][1] = self.
machine.result.result
383 run_data[run_normal][3] = KLMChannelStatusAlgorithm.Results(
384 self.
machine.algorithm.algorithm.getResults())
385 run_data[run_normal][5] = self.
machine.algorithm.algorithm.getPayloadValues()
386 result_str = calibration_result_string(run_data[run_normal][1])
387 basf2.B2INFO(
'Run %d: %s.' % (run_data[run_normal][0], result_str))
388 if (run_data[run_normal][1] != 0):
389 basf2.B2FATAL(
'Merging run %d into run %d failed.' %
390 (run_data[run_not_enough_data][0],
391 run_data[run_normal][0]))
392 del run_data[run_not_enough_data]
395 while (i < len(run_data)):
396 if (run_data[i][1] == 2):
397 if (run_data[i][4] ==
'next'):
398 merge_runs(run_data, i, i + 1,
False)
399 elif (run_data[i][4] ==
'previous'):
400 merge_runs(run_data, i, i - 1,
False)
406 while (i < len(run_data)):
407 if (run_data[i][1] == 2
and run_data[i][4] ==
'none'):
408 new_modules_previous = -1
409 new_modules_next = -1
410 if (i < len(run_data) - 1):
411 new_modules_next = run_data[i][3].getModuleStatus(). \
412 newNormalChannels(run_data[i + 1][3].getModuleStatus())
413 basf2.B2INFO(
'There are %d new active modules in run %d '
414 'relatively to run %d.' %
415 (new_modules_next, run_data[i][0],
418 new_modules_previous = run_data[i][3].getModuleStatus(). \
419 newNormalChannels(run_data[i - 1][3].getModuleStatus())
420 basf2.B2INFO(
'There are %d new active modules in run %d '
421 'relatively to run %d.' %
422 (new_modules_previous, run_data[i][0],
432 if (new_modules_previous >= 0
and new_modules_next < 0):
433 run_for_merging = i - 1
434 elif (new_modules_previous < 0
and new_modules_next >= 0):
435 run_for_merging = i + 1
436 elif (new_modules_previous >= 0
and new_modules_next >= 0):
437 if (new_modules_previous < new_modules_next):
438 run_for_merging = i - 1
440 run_for_merging = i + 1
442 basf2.B2INFO(
'Cannot determine run for merging for run %d, '
443 'performing its forced calibration.' %
446 run_data[i][1] = self.
machine.result.result
447 run_data[i][3] = KLMChannelStatusAlgorithm.Results(
448 self.
machine.algorithm.algorithm.getResults())
449 run_data[i][5] = self.
machine.algorithm.algorithm.getPayloadValues()
450 result_str = calibration_result_string(run_data[i][1])
451 basf2.B2INFO(
'Run %d: %s.' % (run_data[i][0], result_str))
452 if (run_data[i][1] != 0):
453 basf2.B2FATAL(
'Forced calibration of run %d failed.' %
455 if (run_for_merging >= 0):
456 merge_runs(run_data, i, run_for_merging,
True)
461 def commit_payload(run_data):
462 if (run_data[1] == 2):
463 basf2.B2INFO(
'Run %d has no calibration result, skipped.' %
466 basf2.B2INFO(
'Writing run %d.' % (run_data[0]))
467 self.
machine.algorithm.algorithm.commit(run_data[5])
469 def write_result(run_data, run):
470 iov = run_data[run][5].front().iov
471 run_low = iov.getRunLow()
472 run_high = iov.getRunHigh()
475 while (j < len(run_data_klm_excluded)):
476 if (run_low < run_data_klm_excluded[j][0]
and
477 ((run_data_klm_excluded[j][0] < run_high)
or
479 runs.append([run_data_klm_excluded[j][0],
'klm_excluded'])
482 commit_payload(run_data[run])
484 for r
in run_data[run][2]:
485 runs.append([r.run,
'klm_included'])
486 runs.sort(key=
lambda x: x[0])
489 while (run_last < len(runs)):
491 while (runs[run_last][1] == runs[run_first][1]):
493 if (run_last >= len(runs)):
498 run1 = runs[run_first][0]
499 if (run_last < len(runs)):
500 run2 = runs[run_last][0] - 1
505 if (runs[run_first][1] ==
'klm_included'):
506 run_data[run][5].front().iov = iov
507 commit_payload(run_data[run])
509 run_data_klm_excluded[0][5].front().iov = iov
510 commit_payload(run_data_klm_excluded[0])
514 for i
in range(0, len(run_data)):
516 run_data[i][2].sort(key=
lambda x: x.run)
517 first_run = run_data[i][2][0].run
520 run_data[i][5].front().iov = \
523 write_previous_run =
True
525 if (run_data[i][1] == 0
and run_data[i - 1][1] == 0):
526 if (run_data[i][3].getChannelStatus() ==
527 run_data[i - 1][3].getChannelStatus()):
528 basf2.B2INFO(
'Run %d: result is the same as '
529 'for the previous run %d.' %
530 (run_data[i][0], run_data[i - 1][0]))
531 if (previous_run >= 0):
532 iov = run_data[previous_run][5].front().iov
533 run_data[previous_run][5].front().iov = \
535 experiment, iov.getRunLow(),
536 experiment, first_run - 1)
537 write_previous_run =
False
542 if (write_previous_run
and (i > 0)):
543 iov = run_data[previous_run][5].front().iov
544 if (previous_run == 0):
546 lowest_exprun.exp, lowest_exprun.run,
547 experiment, first_run - 1)
550 experiment, first_run - 1)
551 write_result(run_data, previous_run)
556 if (i == len(run_data) - 1):
557 iov = run_data[i][5].front().iov
559 experiment, iov.getRunLow(),
560 highest_exprun.exp, highest_exprun.run)
561 write_result(run_data, i)