11 """Custom calibration strategy for KLM channel status."""
17 from ROOT
import Belle2
19 from caf.utils
import AlgResult, IoV
20 from caf.utils
import runs_overlapping_iov, runs_from_vector
21 from caf.utils
import split_runs_by_exp
22 from caf.strategies
import AlgorithmStrategy, StrategyError
23 from caf.state_machines
import AlgorithmMachine
24 from ROOT.Belle2
import KLMChannelStatusAlgorithm, KLMChannelIndex
25 from klm_strategies_common
import get_lowest_exprun, get_highest_exprun, \
26 calibration_result_string
31 Custom strategy for executing the KLM channel status. Requires complex
34 This uses a `caf.state_machines.AlgorithmMachine` to actually execute
35 the various steps rather than operating on a CalibrationAlgorithm
41 usable_params = {
'iov_coverage': IoV}
50 self.
machinemachine = AlgorithmMachine(self.algorithm)
54 def run(self, iov, iteration, queue):
56 Runs the algorithm machine over the collected data and
59 if not self.is_valid():
60 raise StrategyError(
'The strategy KLMChannelStatus was not '
65 basf2.B2INFO(f
'Setting up {self.__class__.__name__} strategy '
66 f
'for {self.algorithm.name}')
69 machine_params[
'database_chain'] = self.database_chain
70 machine_params[
'dependent_databases'] = self.dependent_databases
71 machine_params[
'output_dir'] = self.output_dir
72 machine_params[
'output_database_dir'] = self.output_database_dir
73 machine_params[
'input_files'] = self.input_files
74 machine_params[
'ignored_runs'] = self.ignored_runs
75 self.
machinemachine.setup_from_dict(machine_params)
77 basf2.B2INFO(f
'Starting AlgorithmMachine of {self.algorithm.name}')
80 self.
machinemachine.setup_algorithm(iteration=iteration)
82 basf2.B2INFO(f
'Beginning execution of {self.algorithm.name} using '
83 f
'strategy {self.__class__.__name__}')
86 runs = self.algorithm.algorithm.getRunListFromAllData()
87 all_runs_collected = set(runs_from_vector(runs))
90 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
92 runs_to_execute = all_runs_collected
95 basf2.B2INFO(f
'Removing the ignored_runs from the runs '
96 f
'to execute for {self.algorithm.name}')
97 runs_to_execute.difference_update(set(self.ignored_runs))
100 runs_to_execute = sorted(runs_to_execute)
101 runs_to_execute = split_runs_by_exp(runs_to_execute)
105 if 'iov_coverage' in self.algorithm.params:
106 iov_coverage = self.algorithm.params[
'iov_coverage']
109 number_of_experiments = len(runs_to_execute)
110 for i_exp, run_list
in enumerate(runs_to_execute, start=1):
111 lowest_exprun = get_lowest_exprun(number_of_experiments, i_exp,
112 run_list, iov_coverage)
113 highest_exprun = get_highest_exprun(number_of_experiments, i_exp,
114 run_list, iov_coverage)
116 lowest_exprun, highest_exprun)
119 self.send_result(self.
machinemachine.result)
120 if (self.
machinemachine.result.result == AlgResult.ok.value)
or \
121 (self.
machinemachine.result.result == AlgResult.iterate.value):
122 self.send_final_state(self.COMPLETED)
124 self.send_final_state(self.FAILED)
128 Execute over run list.
131 self.
machinemachine.setup_algorithm()
134 self.
machinemachine.algorithm.algorithm.setForcedCalibration(
136 self.
machinemachine.execute_runs(runs=run_list, iteration=iteration,
138 if (self.
machinemachine.result.result == AlgResult.ok.value)
or \
139 (self.
machinemachine.result.result == AlgResult.iterate.value):
145 lowest_exprun, highest_exprun):
147 Process runs from experiment.
153 run_data_klm_excluded = []
156 for exp_run
in experiment_runs:
158 result = self.
machinemachine.result.result
159 algorithm_results = KLMChannelStatusAlgorithm.Results(
160 self.
machinemachine.algorithm.algorithm.getResults())
161 payload = self.
machinemachine.algorithm.algorithm.getPayloadValues()
163 exp_run.run, result, [exp_run], algorithm_results,
'', payload]
164 if (algorithm_results.getTotalHitNumber() > 0):
165 run_data.append(run_results)
167 run_data_klm_excluded.append(run_results)
168 result_str = calibration_result_string(result)
169 basf2.B2INFO(
'Run %d: %s.' % (exp_run.run, result_str))
172 run_data.sort(key=
lambda x: x[0])
173 run_data_klm_excluded.sort(key=
lambda x: x[0])
176 save_channel_hit_map =
False
177 save_module_hit_map =
True
178 save_sector_hit_map =
True
179 f_hit_map = ROOT.TFile(
'hit_map.root',
'recreate')
180 run = numpy.zeros(1, dtype=int)
181 calibration_result = numpy.zeros(1, dtype=int)
182 module = numpy.zeros(1, dtype=int)
183 subdetector = numpy.zeros(1, dtype=int)
184 section = numpy.zeros(1, dtype=int)
185 sector = numpy.zeros(1, dtype=int)
186 layer = numpy.zeros(1, dtype=int)
187 plane = numpy.zeros(1, dtype=int)
188 strip = numpy.zeros(1, dtype=int)
189 hits_total = numpy.zeros(1, dtype=int)
190 hits_module = numpy.zeros(1, dtype=int)
191 active_channels = numpy.zeros(1, dtype=int)
192 hit_map_channel = ROOT.TTree(
'hit_map_channel',
'')
193 hit_map_channel.Branch(
'run', run,
'run/I')
194 hit_map_channel.Branch(
'calibration_result', calibration_result,
195 'calibration_result/I')
196 hit_map_channel.Branch(
'channel', module,
'channel/I')
197 hit_map_channel.Branch(
'subdetector', subdetector,
'subdetector/I')
198 hit_map_channel.Branch(
'section', section,
'section/I')
199 hit_map_channel.Branch(
'sector', sector,
'sector/I')
200 hit_map_channel.Branch(
'layer', layer,
'layer/I')
201 hit_map_channel.Branch(
'plane', plane,
'plane/I')
202 hit_map_channel.Branch(
'strip', strip,
'strip/I')
203 hit_map_channel.Branch(
'hits_total', hits_total,
'hits_total/I')
204 hit_map_channel.Branch(
'hits_channel', hits_module,
'hits_channel/I')
205 hit_map_module = ROOT.TTree(
'hit_map_module',
'')
206 hit_map_module.Branch(
'run', run,
'run/I')
207 hit_map_module.Branch(
'calibration_result', calibration_result,
208 'calibration_result/I')
209 hit_map_module.Branch(
'module', module,
'module/I')
210 hit_map_module.Branch(
'subdetector', subdetector,
'subdetector/I')
211 hit_map_module.Branch(
'section', section,
'section/I')
212 hit_map_module.Branch(
'sector', sector,
'sector/I')
213 hit_map_module.Branch(
'layer', layer,
'layer/I')
214 hit_map_module.Branch(
'hits_total', hits_total,
'hits_total/I')
215 hit_map_module.Branch(
'hits_module', hits_module,
'hits_module/I')
216 hit_map_module.Branch(
'active_channels', active_channels,
218 hit_map_sector = ROOT.TTree(
'hit_map_sector',
'')
219 hit_map_sector.Branch(
'run', run,
'run/I')
220 hit_map_sector.Branch(
'calibration_result', calibration_result,
221 'calibration_result/I')
222 hit_map_sector.Branch(
'sector_global', module,
'sector_global/I')
223 hit_map_sector.Branch(
'subdetector', subdetector,
'subdetector/I')
224 hit_map_sector.Branch(
'section', section,
'section/I')
225 hit_map_sector.Branch(
'sector', sector,
'sector/I')
226 hit_map_sector.Branch(
'hits_total', hits_total,
'hits_total/I')
227 hit_map_sector.Branch(
'hits_sector', hits_module,
'hits_sector/I')
228 for i
in range(0, len(run_data)):
229 run[0] = run_data[i][0]
230 calibration_result[0] = run_data[i][1]
231 hits_total[0] = run_data[i][3].getTotalHitNumber()
233 if (save_channel_hit_map):
234 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelStrip)
235 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelStrip)
236 while (index != index2.end()):
237 module[0] = index.getKLMChannelNumber()
238 subdetector[0] = index.getSubdetector()
239 section[0] = index.getSection()
240 sector[0] = index.getSector()
241 layer[0] = index.getLayer()
242 plane[0] = index.getPlane()
243 strip[0] = index.getStrip()
244 hits_module[0] = run_data[i][3].getHitMapChannel(). \
245 getChannelData(int(module[0]))
246 hit_map_channel.Fill()
249 if (save_module_hit_map):
250 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelLayer)
251 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelLayer)
252 while (index != index2.end()):
253 module[0] = index.getKLMModuleNumber()
254 subdetector[0] = index.getSubdetector()
255 section[0] = index.getSection()
256 sector[0] = index.getSector()
257 layer[0] = index.getLayer()
258 hits_module[0] = run_data[i][3].getHitMapModule(). \
259 getChannelData(int(module[0]))
260 active_channels[0] = run_data[i][3]. \
261 getModuleActiveChannelMap(). \
262 getChannelData(int(module[0]))
263 hit_map_module.Fill()
266 if (save_sector_hit_map):
267 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelSector)
268 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelSector)
269 while (index != index2.end()):
270 module[0] = index.getKLMSectorNumber()
271 subdetector[0] = index.getSubdetector()
272 section[0] = index.getSection()
273 sector[0] = index.getSector()
274 hits_module[0] = run_data[i][3].getHitMapSector(). \
275 getChannelData(int(module[0]))
276 hit_map_sector.Fill()
278 hit_map_channel.Write()
279 hit_map_module.Write()
280 hit_map_sector.Write()
286 while (i < len(run_data)):
287 if (run_data[i][1] == 2):
289 while (run_data[j][1] == 2):
291 if (j >= len(run_data)):
293 run_ranges.append([i, j])
300 def can_merge(run_data, run_not_enough_data, run_normal):
301 return run_data[run_not_enough_data][3].getModuleStatus(). \
303 run_data[run_normal][3].getModuleStatus()) == 0
305 for run_range
in run_ranges:
306 next_run = run_range[1]
310 if (next_run < len(run_data)):
311 while (i >= run_range[0]):
312 if (can_merge(run_data, i, next_run)):
314 'Run %d (not enough data) can be merged into '
315 'the next normal run %d.' %
316 (run_data[i][0], run_data[next_run][0]))
317 run_data[i][4] =
'next'
320 'Run %d (not enough data) cannot be merged into '
321 'the next normal run %d, will try the previous '
322 'one.' % (run_data[i][0], run_data[next_run][0]))
325 if (i < run_range[0]):
327 previous_run = run_range[0] - 1
328 if (previous_run >= 0):
330 if (can_merge(run_data, j, previous_run)):
332 'Run %d (not enough data) can be merged into '
333 'the previous normal run %d.' %
334 (run_data[j][0], run_data[previous_run][0]))
335 run_data[j][4] =
'previous'
338 'Run %d (not enough data) cannot be merged into '
339 'the previous normal run %d.' %
340 (run_data[j][0], run_data[previous_run][0]))
345 basf2.B2INFO(
'A range of runs with not enough data is found '
346 'that cannot be merged into neither previous nor '
347 'next normal run: from %d to %d.' %
348 (run_data[j][0], run_data[i][0]))
350 run_data[j][4] =
'none'
357 while (i < len(run_data) - 1):
358 while ((run_data[i][1] == 2)
and (run_data[i + 1][1] == 2)):
359 if (run_data[i][4] != run_data[i + 1][4]):
361 basf2.B2INFO(
'Merging run %d (not enough data) into '
362 'run %d (not enough data).' %
363 (run_data[i + 1][0], run_data[i][0]))
364 run_data[i][2].extend(run_data[i + 1][2])
367 run_data[i][1] = self.
machinemachine.result.result
368 run_data[i][3] = KLMChannelStatusAlgorithm.Results(
369 self.
machinemachine.algorithm.algorithm.getResults())
371 self.
machinemachine.algorithm.algorithm.getPayloadValues()
372 result_str = calibration_result_string(run_data[i][1])
373 basf2.B2INFO(
'Run %d: %s.' % (run_data[i][0], result_str))
374 if (i >= len(run_data) - 1):
380 def merge_runs(run_data, run_not_enough_data, run_normal, forced):
381 basf2.B2INFO(
'Merging run %d (not enough data) into '
383 (run_data[run_not_enough_data][0],
384 run_data[run_normal][0]))
385 run_data[run_normal][2].extend(run_data[run_not_enough_data][2])
388 run_data[run_normal][1] = self.
machinemachine.result.result
389 run_data[run_normal][3] = KLMChannelStatusAlgorithm.Results(
390 self.
machinemachine.algorithm.algorithm.getResults())
391 run_data[run_normal][5] = self.
machinemachine.algorithm.algorithm.getPayloadValues()
392 result_str = calibration_result_string(run_data[run_normal][1])
393 basf2.B2INFO(
'Run %d: %s.' % (run_data[run_normal][0], result_str))
394 if (run_data[run_normal][1] != 0):
395 basf2.B2FATAL(
'Merging run %d into run %d failed.' %
396 (run_data[run_not_enough_data][0],
397 run_data[run_normal][0]))
398 del run_data[run_not_enough_data]
401 while (i < len(run_data)):
402 if (run_data[i][1] == 2):
403 if (run_data[i][4] ==
'next'):
404 merge_runs(run_data, i, i + 1,
False)
405 elif (run_data[i][4] ==
'previous'):
406 merge_runs(run_data, i, i - 1,
False)
412 while (i < len(run_data)):
413 if (run_data[i][1] == 2
and run_data[i][4] ==
'none'):
414 new_modules_previous = -1
415 new_modules_next = -1
416 if (i < len(run_data) - 1):
417 new_modules_next = run_data[i][3].getModuleStatus(). \
418 newNormalChannels(run_data[i + 1][3].getModuleStatus())
419 basf2.B2INFO(
'There are %d new active modules in run %d '
420 'relatively to run %d.' %
421 (new_modules_next, run_data[i][0],
424 new_modules_previous = run_data[i][3].getModuleStatus(). \
425 newNormalChannels(run_data[i - 1][3].getModuleStatus())
426 basf2.B2INFO(
'There are %d new active modules in run %d '
427 'relatively to run %d.' %
428 (new_modules_previous, run_data[i][0],
438 if (new_modules_previous >= 0
and new_modules_next < 0):
439 run_for_merging = i - 1
440 elif (new_modules_previous < 0
and new_modules_next >= 0):
441 run_for_merging = i + 1
442 elif (new_modules_previous >= 0
and new_modules_next >= 0):
443 if (new_modules_previous < new_modules_next):
444 run_for_merging = i - 1
446 run_for_merging = i + 1
448 basf2.B2INFO(
'Cannot determine run for merging for run %d, '
449 'performing its forced calibration.' %
452 run_data[i][1] = self.
machinemachine.result.result
453 run_data[i][3] = KLMChannelStatusAlgorithm.Results(
454 self.
machinemachine.algorithm.algorithm.getResults())
455 run_data[i][5] = self.
machinemachine.algorithm.algorithm.getPayloadValues()
456 result_str = calibration_result_string(run_data[i][1])
457 basf2.B2INFO(
'Run %d: %s.' % (run_data[i][0], result_str))
458 if (run_data[i][1] != 0):
459 basf2.B2FATAL(
'Forced calibration of run %d failed.' %
461 if (run_for_merging >= 0):
462 merge_runs(run_data, i, run_for_merging,
True)
467 def commit_payload(run_data):
468 if (run_data[1] == 2):
469 basf2.B2INFO(
'Run %d has no calibration result, skipped.' %
472 basf2.B2INFO(
'Writing run %d.' % (run_data[0]))
473 self.
machinemachine.algorithm.algorithm.commit(run_data[5])
475 def write_result(run_data, run):
476 iov = run_data[run][5].front().iov
477 run_low = iov.getRunLow()
478 run_high = iov.getRunHigh()
481 while (j < len(run_data_klm_excluded)):
482 if (run_low < run_data_klm_excluded[j][0]
and
483 ((run_data_klm_excluded[j][0] < run_high)
or
485 runs.append([run_data_klm_excluded[j][0],
'klm_excluded'])
488 commit_payload(run_data[run])
490 for r
in run_data[run][2]:
491 runs.append([r.run,
'klm_included'])
492 runs.sort(key=
lambda x: x[0])
495 while (run_last < len(runs)):
497 while (runs[run_last][1] == runs[run_first][1]):
499 if (run_last >= len(runs)):
504 run1 = runs[run_first][0]
505 if (run_last < len(runs)):
506 run2 = runs[run_last][0] - 1
511 if (runs[run_first][1] ==
'klm_included'):
512 run_data[run][5].front().iov = iov
513 commit_payload(run_data[run])
515 run_data_klm_excluded[0][5].front().iov = iov
516 commit_payload(run_data_klm_excluded[0])
520 for i
in range(0, len(run_data)):
522 run_data[i][2].sort(key=
lambda x: x.run)
523 first_run = run_data[i][2][0].run
526 run_data[i][5].front().iov = \
529 write_previous_run =
True
531 if (run_data[i][1] == 0
and run_data[i - 1][1] == 0):
532 if (run_data[i][3].getChannelStatus() ==
533 run_data[i - 1][3].getChannelStatus()):
534 basf2.B2INFO(
'Run %d: result is the same as '
535 'for the previous run %d.' %
536 (run_data[i][0], run_data[i - 1][0]))
537 if (previous_run >= 0):
538 iov = run_data[previous_run][5].front().iov
539 run_data[previous_run][5].front().iov = \
541 experiment, iov.getRunLow(),
542 experiment, first_run - 1)
543 write_previous_run =
False
548 if (write_previous_run
and (i > 0)):
549 iov = run_data[previous_run][5].front().iov
550 if (previous_run == 0):
552 lowest_exprun.exp, lowest_exprun.run,
553 experiment, first_run - 1)
556 experiment, first_run - 1)
557 write_result(run_data, previous_run)
562 if (i == len(run_data) - 1):
563 iov = run_data[i][5].front().iov
565 experiment, iov.getRunLow(),
566 highest_exprun.exp, highest_exprun.run)
567 write_result(run_data, i)
A class that describes the interval of experiments/runs for which an object in the database is valid.
def process_experiment(self, experiment, experiment_runs, iteration, lowest_exprun, highest_exprun)
def run(self, iov, iteration, queue)
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm.
first_execution
Flag for the first execution of this AlgorithmStrategy.
def __init__(self, algorithm)
def execute_over_run_list(self, run_list, iteration, forced_calibration)
queue
The multiprocessing queue used to pass back results one at a time.