9 """Custom calibration strategy for KLM channel status."""
15 from ROOT
import Belle2
17 from caf.utils
import AlgResult, IoV
18 from caf.utils
import runs_overlapping_iov, runs_from_vector
19 from caf.utils
import split_runs_by_exp
20 from caf.strategies
import AlgorithmStrategy, StrategyError
21 from caf.state_machines
import AlgorithmMachine
22 from ROOT.Belle2
import KLMChannelStatusAlgorithm, KLMChannelIndex
23 from klm_strategies_common
import get_lowest_exprun, get_highest_exprun, \
24 calibration_result_string
29 Custom strategy for executing the KLM channel status. Requires complex
32 This uses a `caf.state_machines.AlgorithmMachine` to actually execute
33 the various steps rather than operating on a CalibrationAlgorithm
39 usable_params = {
'iov_coverage': IoV}
48 self.
machinemachine = AlgorithmMachine(self.algorithm)
52 def run(self, iov, iteration, queue):
54 Runs the algorithm machine over the collected data and
57 if not self.is_valid():
58 raise StrategyError(
'The strategy KLMChannelStatus was not '
63 basf2.B2INFO(f
'Setting up {self.__class__.__name__} strategy '
64 f
'for {self.algorithm.name}')
67 machine_params[
'database_chain'] = self.database_chain
68 machine_params[
'dependent_databases'] = self.dependent_databases
69 machine_params[
'output_dir'] = self.output_dir
70 machine_params[
'output_database_dir'] = self.output_database_dir
71 machine_params[
'input_files'] = self.input_files
72 machine_params[
'ignored_runs'] = self.ignored_runs
73 self.
machinemachine.setup_from_dict(machine_params)
75 basf2.B2INFO(f
'Starting AlgorithmMachine of {self.algorithm.name}')
78 self.
machinemachine.setup_algorithm(iteration=iteration)
80 basf2.B2INFO(f
'Beginning execution of {self.algorithm.name} using '
81 f
'strategy {self.__class__.__name__}')
84 runs = self.algorithm.algorithm.getRunListFromAllData()
85 all_runs_collected = set(runs_from_vector(runs))
88 runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
90 runs_to_execute = all_runs_collected
93 basf2.B2INFO(f
'Removing the ignored_runs from the runs '
94 f
'to execute for {self.algorithm.name}')
95 runs_to_execute.difference_update(set(self.ignored_runs))
98 runs_to_execute = sorted(runs_to_execute)
99 runs_to_execute = split_runs_by_exp(runs_to_execute)
103 if 'iov_coverage' in self.algorithm.params:
104 iov_coverage = self.algorithm.params[
'iov_coverage']
107 number_of_experiments = len(runs_to_execute)
108 for i_exp, run_list
in enumerate(runs_to_execute, start=1):
109 lowest_exprun = get_lowest_exprun(number_of_experiments, i_exp,
110 run_list, iov_coverage)
111 highest_exprun = get_highest_exprun(number_of_experiments, i_exp,
112 run_list, iov_coverage)
114 lowest_exprun, highest_exprun)
117 self.send_result(self.
machinemachine.result)
118 if (self.
machinemachine.result.result == AlgResult.ok.value)
or \
119 (self.
machinemachine.result.result == AlgResult.iterate.value):
120 self.send_final_state(self.COMPLETED)
122 self.send_final_state(self.FAILED)
126 Execute over run list.
129 self.
machinemachine.setup_algorithm()
132 self.
machinemachine.algorithm.algorithm.setForcedCalibration(
134 self.
machinemachine.execute_runs(runs=run_list, iteration=iteration,
136 if (self.
machinemachine.result.result == AlgResult.ok.value)
or \
137 (self.
machinemachine.result.result == AlgResult.iterate.value):
143 lowest_exprun, highest_exprun):
145 Process runs from experiment.
151 run_data_klm_excluded = []
154 for exp_run
in experiment_runs:
156 result = self.
machinemachine.result.result
157 algorithm_results = KLMChannelStatusAlgorithm.Results(
158 self.
machinemachine.algorithm.algorithm.getResults())
159 payload = self.
machinemachine.algorithm.algorithm.getPayloadValues()
161 exp_run.run, result, [exp_run], algorithm_results,
'', payload]
162 if (algorithm_results.getTotalHitNumber() > 0):
163 run_data.append(run_results)
165 run_data_klm_excluded.append(run_results)
166 result_str = calibration_result_string(result)
167 basf2.B2INFO(f
'Run {int(exp_run.run)}: {result_str}.')
170 run_data.sort(key=
lambda x: x[0])
171 run_data_klm_excluded.sort(key=
lambda x: x[0])
174 save_channel_hit_map =
False
175 save_module_hit_map =
True
176 save_sector_hit_map =
True
177 f_hit_map = ROOT.TFile(
'hit_map.root',
'recreate')
178 run = numpy.zeros(1, dtype=int)
179 calibration_result = numpy.zeros(1, dtype=int)
180 module = numpy.zeros(1, dtype=int)
181 subdetector = numpy.zeros(1, dtype=int)
182 section = numpy.zeros(1, dtype=int)
183 sector = numpy.zeros(1, dtype=int)
184 layer = numpy.zeros(1, dtype=int)
185 plane = numpy.zeros(1, dtype=int)
186 strip = numpy.zeros(1, dtype=int)
187 hits_total = numpy.zeros(1, dtype=int)
188 hits_module = numpy.zeros(1, dtype=int)
189 active_channels = numpy.zeros(1, dtype=int)
190 hit_map_channel = ROOT.TTree(
'hit_map_channel',
'')
191 hit_map_channel.Branch(
'run', run,
'run/I')
192 hit_map_channel.Branch(
'calibration_result', calibration_result,
193 'calibration_result/I')
194 hit_map_channel.Branch(
'channel', module,
'channel/I')
195 hit_map_channel.Branch(
'subdetector', subdetector,
'subdetector/I')
196 hit_map_channel.Branch(
'section', section,
'section/I')
197 hit_map_channel.Branch(
'sector', sector,
'sector/I')
198 hit_map_channel.Branch(
'layer', layer,
'layer/I')
199 hit_map_channel.Branch(
'plane', plane,
'plane/I')
200 hit_map_channel.Branch(
'strip', strip,
'strip/I')
201 hit_map_channel.Branch(
'hits_total', hits_total,
'hits_total/I')
202 hit_map_channel.Branch(
'hits_channel', hits_module,
'hits_channel/I')
203 hit_map_module = ROOT.TTree(
'hit_map_module',
'')
204 hit_map_module.Branch(
'run', run,
'run/I')
205 hit_map_module.Branch(
'calibration_result', calibration_result,
206 'calibration_result/I')
207 hit_map_module.Branch(
'module', module,
'module/I')
208 hit_map_module.Branch(
'subdetector', subdetector,
'subdetector/I')
209 hit_map_module.Branch(
'section', section,
'section/I')
210 hit_map_module.Branch(
'sector', sector,
'sector/I')
211 hit_map_module.Branch(
'layer', layer,
'layer/I')
212 hit_map_module.Branch(
'hits_total', hits_total,
'hits_total/I')
213 hit_map_module.Branch(
'hits_module', hits_module,
'hits_module/I')
214 hit_map_module.Branch(
'active_channels', active_channels,
216 hit_map_sector = ROOT.TTree(
'hit_map_sector',
'')
217 hit_map_sector.Branch(
'run', run,
'run/I')
218 hit_map_sector.Branch(
'calibration_result', calibration_result,
219 'calibration_result/I')
220 hit_map_sector.Branch(
'sector_global', module,
'sector_global/I')
221 hit_map_sector.Branch(
'subdetector', subdetector,
'subdetector/I')
222 hit_map_sector.Branch(
'section', section,
'section/I')
223 hit_map_sector.Branch(
'sector', sector,
'sector/I')
224 hit_map_sector.Branch(
'hits_total', hits_total,
'hits_total/I')
225 hit_map_sector.Branch(
'hits_sector', hits_module,
'hits_sector/I')
226 for i
in range(0, len(run_data)):
227 run[0] = run_data[i][0]
228 calibration_result[0] = run_data[i][1]
229 hits_total[0] = run_data[i][3].getTotalHitNumber()
231 if (save_channel_hit_map):
232 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelStrip)
233 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelStrip)
234 while (index != index2.end()):
235 module[0] = index.getKLMChannelNumber()
236 subdetector[0] = index.getSubdetector()
237 section[0] = index.getSection()
238 sector[0] = index.getSector()
239 layer[0] = index.getLayer()
240 plane[0] = index.getPlane()
241 strip[0] = index.getStrip()
242 hits_module[0] = run_data[i][3].getHitMapChannel(). \
243 getChannelData(int(module[0]))
244 hit_map_channel.Fill()
247 if (save_module_hit_map):
248 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelLayer)
249 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelLayer)
250 while (index != index2.end()):
251 module[0] = index.getKLMModuleNumber()
252 subdetector[0] = index.getSubdetector()
253 section[0] = index.getSection()
254 sector[0] = index.getSector()
255 layer[0] = index.getLayer()
256 hits_module[0] = run_data[i][3].getHitMapModule(). \
257 getChannelData(int(module[0]))
258 active_channels[0] = run_data[i][3]. \
259 getModuleActiveChannelMap(). \
260 getChannelData(int(module[0]))
261 hit_map_module.Fill()
264 if (save_sector_hit_map):
265 index = KLMChannelIndex(KLMChannelIndex.c_IndexLevelSector)
266 index2 = KLMChannelIndex(KLMChannelIndex.c_IndexLevelSector)
267 while (index != index2.end()):
268 module[0] = index.getKLMSectorNumber()
269 subdetector[0] = index.getSubdetector()
270 section[0] = index.getSection()
271 sector[0] = index.getSector()
272 hits_module[0] = run_data[i][3].getHitMapSector(). \
273 getChannelData(int(module[0]))
274 hit_map_sector.Fill()
276 hit_map_channel.Write()
277 hit_map_module.Write()
278 hit_map_sector.Write()
284 while (i < len(run_data)):
285 if (run_data[i][1] == 2):
287 while (run_data[j][1] == 2):
289 if (j >= len(run_data)):
291 run_ranges.append([i, j])
298 def can_merge(run_data, run_not_enough_data, run_normal):
299 return run_data[run_not_enough_data][3].getModuleStatus(). \
301 run_data[run_normal][3].getModuleStatus()) == 0
303 for run_range
in run_ranges:
304 next_run = run_range[1]
308 if (next_run < len(run_data)):
309 while (i >= run_range[0]):
310 if (can_merge(run_data, i, next_run)):
312 f
'Run {int(run_data[i][0])} (not enough data) can be merged into the next normal run ' +
313 f
'{int(run_data[next_run][0])}.')
314 run_data[i][4] =
'next'
317 f
'Run {int(run_data[i][0])} (not enough data) cannot be merged into the next normal run ' +
318 f
'{int(run_data[next_run][0])}, will try the previous one.')
321 if (i < run_range[0]):
323 previous_run = run_range[0] - 1
324 if (previous_run >= 0):
326 if (can_merge(run_data, j, previous_run)):
328 f
'Run {int(run_data[j][0])} (not enough data) can be merged into the previous normal run ' +
329 f
'{int(run_data[previous_run][0])}.')
330 run_data[j][4] =
'previous'
333 f
'Run {int(run_data[j][0])} (not enough data) cannot be merged into the previous normal run ' +
334 f
'{int(run_data[previous_run][0])}.')
339 basf2.B2INFO(
'A range of runs with not enough data is found that cannot be merged into neither previous nor ' +
340 f
'next normal run: from {int(run_data[j][0])} to {int(run_data[i][0])}.')
342 run_data[j][4] =
'none'
349 while (i < len(run_data) - 1):
350 while ((run_data[i][1] == 2)
and (run_data[i + 1][1] == 2)):
351 if (run_data[i][4] != run_data[i + 1][4]):
353 basf2.B2INFO(f
'Merging run {int(run_data[i + 1][0])} (not enough data) into run {int(run_data[i][0])} ' +
354 '(not enough data).')
355 run_data[i][2].extend(run_data[i + 1][2])
358 run_data[i][1] = self.
machinemachine.result.result
359 run_data[i][3] = KLMChannelStatusAlgorithm.Results(
360 self.
machinemachine.algorithm.algorithm.getResults())
362 self.
machinemachine.algorithm.algorithm.getPayloadValues()
363 result_str = calibration_result_string(run_data[i][1])
364 basf2.B2INFO(f
'Run {int(run_data[i][0])}: {result_str}.')
365 if (i >= len(run_data) - 1):
371 def merge_runs(run_data, run_not_enough_data, run_normal, forced):
372 basf2.B2INFO(f
'Merging run {int(run_data[run_not_enough_data][0])} (not enough data) into run ' +
373 f
'{int(run_data[run_normal][0])} (normal).')
374 run_data[run_normal][2].extend(run_data[run_not_enough_data][2])
377 run_data[run_normal][1] = self.
machinemachine.result.result
378 run_data[run_normal][3] = KLMChannelStatusAlgorithm.Results(
379 self.
machinemachine.algorithm.algorithm.getResults())
380 run_data[run_normal][5] = self.
machinemachine.algorithm.algorithm.getPayloadValues()
381 result_str = calibration_result_string(run_data[run_normal][1])
382 basf2.B2INFO(f
'Run {int(run_data[run_normal][0])}: {result_str}.')
383 if (run_data[run_normal][1] != 0):
384 basf2.B2FATAL(f
'Merging run {int(run_data[run_not_enough_data][0])} into run {int(run_data[run_normal][0])} ' +
386 del run_data[run_not_enough_data]
389 while (i < len(run_data)):
390 if (run_data[i][1] == 2):
391 if (run_data[i][4] ==
'next'):
392 merge_runs(run_data, i, i + 1,
False)
393 elif (run_data[i][4] ==
'previous'):
394 merge_runs(run_data, i, i - 1,
False)
400 while (i < len(run_data)):
401 if (run_data[i][1] == 2
and run_data[i][4] ==
'none'):
402 new_modules_previous = -1
403 new_modules_next = -1
404 if (i < len(run_data) - 1):
405 new_modules_next = run_data[i][3].getModuleStatus(). \
406 newNormalChannels(run_data[i + 1][3].getModuleStatus())
407 basf2.B2INFO(f
'There are {int(new_modules_next)} new active modules in run {int(run_data[i][0])} ' +
408 f
'relatively to run {int(run_data[i + 1][0])}.')
410 new_modules_previous = run_data[i][3].getModuleStatus(). \
411 newNormalChannels(run_data[i - 1][3].getModuleStatus())
412 basf2.B2INFO(f
'There are {int(new_modules_previous)} new active modules in run {int(run_data[i][0])} ' +
413 f
'relatively to run {int(run_data[i - 1][0])}.')
422 if (new_modules_previous >= 0
and new_modules_next < 0):
423 run_for_merging = i - 1
424 elif (new_modules_previous < 0
and new_modules_next >= 0):
425 run_for_merging = i + 1
426 elif (new_modules_previous >= 0
and new_modules_next >= 0):
427 if (new_modules_previous < new_modules_next):
428 run_for_merging = i - 1
430 run_for_merging = i + 1
432 basf2.B2INFO(f
'Cannot determine run for merging for run {int(run_data[i][0])}, performing its forced ' +
435 run_data[i][1] = self.
machinemachine.result.result
436 run_data[i][3] = KLMChannelStatusAlgorithm.Results(
437 self.
machinemachine.algorithm.algorithm.getResults())
438 run_data[i][5] = self.
machinemachine.algorithm.algorithm.getPayloadValues()
439 result_str = calibration_result_string(run_data[i][1])
440 basf2.B2INFO(f
'Run {int(run_data[i][0])}: {result_str}.')
441 if (run_data[i][1] != 0):
442 basf2.B2FATAL(f
'Forced calibration of run {int(run_data[i][0])} failed.')
443 if (run_for_merging >= 0):
444 merge_runs(run_data, i, run_for_merging,
True)
449 def commit_payload(run_data):
450 if (run_data[1] == 2):
451 basf2.B2INFO(f
'Run {int(run_data[0])} has no calibration result, skipped.')
453 basf2.B2INFO(f
'Writing run {int(run_data[0])}.')
454 self.
machinemachine.algorithm.algorithm.commit(run_data[5])
456 def write_result(run_data, run):
457 iov = run_data[run][5].front().iov
458 run_low = iov.getRunLow()
459 run_high = iov.getRunHigh()
462 while (j < len(run_data_klm_excluded)):
463 if (run_low < run_data_klm_excluded[j][0]
and
464 ((run_data_klm_excluded[j][0] < run_high)
or
466 runs.append([run_data_klm_excluded[j][0],
'klm_excluded'])
469 commit_payload(run_data[run])
471 for r
in run_data[run][2]:
472 runs.append([r.run,
'klm_included'])
473 runs.sort(key=
lambda x: x[0])
476 while (run_last < len(runs)):
478 while (runs[run_last][1] == runs[run_first][1]):
480 if (run_last >= len(runs)):
485 run1 = runs[run_first][0]
486 if (run_last < len(runs)):
487 run2 = runs[run_last][0] - 1
492 if (runs[run_first][1] ==
'klm_included'):
493 run_data[run][5].front().iov = iov
494 commit_payload(run_data[run])
496 run_data_klm_excluded[0][5].front().iov = iov
497 commit_payload(run_data_klm_excluded[0])
501 for i
in range(0, len(run_data)):
503 run_data[i][2].sort(key=
lambda x: x.run)
504 first_run = run_data[i][2][0].run
507 run_data[i][5].front().iov = \
510 write_previous_run =
True
512 if (run_data[i][1] == 0
and run_data[i - 1][1] == 0):
513 if (run_data[i][3].getChannelStatus() ==
514 run_data[i - 1][3].getChannelStatus()):
515 basf2.B2INFO(f
'Run {int(run_data[i][0])}: result is the same as for the previous run ' +
516 f
'{int(run_data[i - 1][0])}.')
517 if (previous_run >= 0):
518 iov = run_data[previous_run][5].front().iov
519 run_data[previous_run][5].front().iov = \
521 experiment, iov.getRunLow(),
522 experiment, first_run - 1)
523 write_previous_run =
False
528 if (write_previous_run
and (i > 0)):
529 iov = run_data[previous_run][5].front().iov
530 if (previous_run == 0):
532 lowest_exprun.exp, lowest_exprun.run,
533 experiment, first_run - 1)
536 experiment, first_run - 1)
537 write_result(run_data, previous_run)
542 if (i == len(run_data) - 1):
543 iov = run_data[i][5].front().iov
545 experiment, iov.getRunLow(),
546 highest_exprun.exp, highest_exprun.run)
547 write_result(run_data, i)
A class that describes the interval of experiments/runs for which an object in the database is valid.
def process_experiment(self, experiment, experiment_runs, iteration, lowest_exprun, highest_exprun)
def run(self, iov, iteration, queue)
machine
:py:class:caf.state_machines.AlgorithmMachine used to help set up and execute CalibrationAlgorithm.
first_execution
Flag for the first execution of this AlgorithmStrategy.
def __init__(self, algorithm)
def execute_over_run_list(self, run_list, iteration, forced_calibration)
queue
The multiprocessing queue used to pass back results one at a time.