12 airflow script for PXD gain calibration.
17 from prompt.utils import filter_by_max_files_per_run, filter_by_max_events_per_run
18 from prompt
import CalibrationSettings, INPUT_DATA_FILTERS
19 from caf.utils
import ExpRun, IoV
20 from itertools
import groupby
21 from itertools
import chain
22 from math
import ceil, inf
23 from prompt.calibrations.caf_beamspot
import settings
as beamspot_calibration
26 settings = CalibrationSettings(name=
"PXD gain calibration",
27 expert_username=
"qyliu",
29 input_data_formats=[
"cdst"],
30 input_data_names=[
"physics"],
33 INPUT_DATA_FILTERS[
"Data Tag"][
"bhabha_all_calib"],
34 INPUT_DATA_FILTERS[
"Beam Energy"][
"4S"],
35 INPUT_DATA_FILTERS[
"Beam Energy"][
"Continuum"],
36 INPUT_DATA_FILTERS[
"Beam Energy"][
"Scan"],
37 INPUT_DATA_FILTERS[
"Beam Energy"][
""],
38 INPUT_DATA_FILTERS[
"Run Type"][
"physics"],
39 INPUT_DATA_FILTERS[
"Data Quality Tag"][
"Good"]]},
43 "gain_method":
"analytic",
44 "min_files_per_chunk": 10,
45 "min_events_per_file": 1000,
46 "max_events_per_run": 4000000,
47 "max_files_per_run": 20,
48 "payload_boundaries": []
50 depends_on=[beamspot_calibration])
53 def get_calibrations(input_data, **kwargs):
56 input_data (dict): Should contain every name from the 'input_data_names' variable as a key.
57 Each value is a dictionary with {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}. Useful for
58 assigning to calibration.files_to_iov
60 **kwargs: Configuration options to be sent in. Since this may change we use kwargs as a way to help prevent
61 backwards compatibility problems. But you could use the correct arguments in b2caf-prompt-run for this
62 release explicitly if you want to.
64 Currently only kwargs["output_iov"] is used. This is the output IoV range that your payloads should
65 correspond to. Generally your highest ExpRun payload should be open ended e.g. IoV(3,4,-1,-1)
68 list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process
72 requested_iov = kwargs.get(
"requested_iov",
None)
73 output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
75 expert_config = kwargs.get(
"expert_config")
76 gain_method = expert_config[
"gain_method"]
77 debug = expert_config[
"debug"]
78 total_jobs = expert_config[
"total_jobs"]
79 max_events_per_run = expert_config[
"max_events_per_run"]
80 max_files_per_run = expert_config[
"max_files_per_run"]
81 min_files_per_chunk = expert_config[
"min_files_per_chunk"]
82 min_events_per_file = expert_config[
"min_events_per_file"]
83 cal_kwargs = expert_config.get(
"kwargs", {})
86 basf2.B2INFO(f
"Requested iov: {requested_iov} ")
87 basf2.B2INFO(f
"Expert config: {expert_config} ")
91 file_to_iov_physics = input_data[
"physics"]
95 if max_events_per_run < 0:
96 basf2.B2INFO(
"No file reduction applied.")
97 reduced_file_to_iov_physics = file_to_iov_physics
98 elif max_events_per_run == 0:
99 basf2.B2INFO(f
"Reducing to a maximum of {max_files_per_run} files per run.")
100 reduced_file_to_iov_physics = filter_by_max_files_per_run(file_to_iov_physics,
101 max_files_per_run, min_events_per_file)
103 basf2.B2INFO(f
"Reducing to a maximum of {max_events_per_run} events per run.")
104 reduced_file_to_iov_physics = filter_by_max_events_per_run(file_to_iov_physics,
105 max_events_per_run, random_select=
True)
108 input_iov_set_physics = set(reduced_file_to_iov_physics.values())
109 exp_set = set([iov.exp_low
for iov
in input_iov_set_physics])
112 payload_boundaries = [ExpRun(output_iov.exp_low, output_iov.run_low)]
113 payload_boundaries.extend([ExpRun(*boundary)
for boundary
in expert_config[
"payload_boundaries"]])
115 payload_boundaries.extend([ExpRun(exp, 0)
for exp
in sorted(exp_set)[1:]])
116 basf2.B2INFO(f
"Final Boundaries: {payload_boundaries}")
119 chunks_head = payload_boundaries
120 chunks_tail = payload_boundaries[1:] + [ExpRun(inf, inf)]
121 iov_chunks = [list(g)
for k, g
in groupby(sorted(input_iov_set_physics),
122 lambda x: [i
for i, j
in zip(chunks_head, chunks_tail)
if i <= x < j])]
125 input_file_to_iov = reduced_file_to_iov_physics
127 for ichunk, chunk
in enumerate(iov_chunks):
128 first_iov = IoV(chunk[0].exp_low, chunk[0].run_low, -1, -1)
129 last_iov = IoV(chunk[-1].exp_low, chunk[-1].run_low, -1, -1)
130 if last_iov < output_iov:
133 input_files = list(chain.from_iterable([list(g)
for k, g
in groupby(
134 input_file_to_iov,
lambda x: input_file_to_iov[x]
in chunk)
if k]))
136 if len(input_files) < min_files_per_chunk:
137 basf2.B2WARNING(f
"No enough file in sub run chunk [{chunk[0]},{chunk[-1]}]: {len(input_files)},\
138 but {min_files_per_chunk} required!")
141 specific_iov = first_iov
if iCal > 0
else output_iov
142 basf2.B2INFO(f
"Total number of files actually used as input = {len(input_files)} for the output {specific_iov}")
143 cal_name = f
"{ichunk+1}_PXDAnalyticGainCalibration"
145 cal = gain_calibration(
147 gain_method=gain_method,
149 input_files=input_files,
151 for alg
in cal.algorithms:
152 alg.params[
"iov_coverage"] = specific_iov
155 basf2.B2INFO(f
"Dry run on Calibration(name={cal_name})")
164 total_input_files = len(reduced_file_to_iov_physics)
167 fraction_of_input_files = len(cal.input_files) / total_input_files
169 cal.max_collector_jobs = ceil(fraction_of_input_files * total_jobs)
170 basf2.B2INFO(f
"{cal.name} will submit a maximum of {cal.max_collector_jobs} batch jobs")