12 airflow script for PXD hot/dead pixel masking.
17 from prompt.utils import filter_by_max_files_per_run, filter_by_max_events_per_run
18 from prompt
import CalibrationSettings, INPUT_DATA_FILTERS
19 from caf.utils
import IoV
20 from itertools
import groupby
21 from itertools
import chain
25 settings = CalibrationSettings(name=
"PXD hot/dead pixel calibration",
26 expert_username=
"qyliu",
28 input_data_formats=[
"raw"],
29 input_data_names=[
"beamorphysics",
"cosmic"],
32 INPUT_DATA_FILTERS[
"Data Tag"][
"bhabha_all_calib"],
33 INPUT_DATA_FILTERS[
"Data Tag"][
"gamma_gamma_calib"],
34 INPUT_DATA_FILTERS[
"Data Tag"][
"hadron_calib"],
35 INPUT_DATA_FILTERS[
"Data Tag"][
"offip_calib"],
36 INPUT_DATA_FILTERS[
"Data Tag"][
"cosmic_calib"],
37 INPUT_DATA_FILTERS[
"Beam Energy"][
"4S"],
38 INPUT_DATA_FILTERS[
"Beam Energy"][
"Continuum"],
39 INPUT_DATA_FILTERS[
"Beam Energy"][
"Scan"],
40 INPUT_DATA_FILTERS[
"Beam Energy"][
""],
41 INPUT_DATA_FILTERS[
"Run Type"][
"physics"],
42 INPUT_DATA_FILTERS[
"Data Quality Tag"][
"Good Or Recoverable"]],
43 "cosmic": [INPUT_DATA_FILTERS[
"Run Type"][
"cosmic"]]},
45 "max_events_per_run": 400000,
46 "max_files_per_run": 20,
51 def get_calibrations(input_data, **kwargs):
54 input_data (dict): Should contain every name from the 'input_data_names' variable as a key.
55 Each value is a dictionary with {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}. Useful for
56 assigning to calibration.files_to_iov
58 **kwargs: Configuration options to be sent in. Since this may change we use kwargs as a way to help prevent
59 backwards compatibility problems. But you could use the correct arguments in b2caf-prompt-run for this
60 release explicitly if you want to.
62 Currently kwargs["requested_iov"] and kwargs["expert_config"] are used. The output IoV range that your payloads should
63 correspond to is built from the requested IoV as IoV(exp_low, run_low, -1, -1), i.e. it is open ended like IoV(3,4,-1,-1)
66 list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process
70 requested_iov = kwargs.get(
"requested_iov",
None)
71 expert_config = kwargs.get(
"expert_config")
72 cal_kwargs = expert_config.get(
"kwargs", {})
73 output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
74 expert_config = kwargs.get(
"expert_config")
75 max_events_per_run = expert_config[
"max_events_per_run"]
76 max_files_per_run = expert_config[
"max_files_per_run"]
77 min_files_per_chunk = 10
78 min_events_per_file = 1000
81 file_to_iov_physics = input_data[
"beamorphysics"]
82 file_to_iov_cosmics = input_data[
"cosmic"]
86 if max_events_per_run <= 0:
87 basf2.B2INFO(f
"Reducing to a maximum of {max_files_per_run} files per run.")
88 reduced_file_to_iov_physics = filter_by_max_files_per_run(file_to_iov_physics,
89 max_files_per_run, min_events_per_file)
90 reduced_file_to_iov_cosmics = filter_by_max_files_per_run(file_to_iov_cosmics,
91 max_files_per_run, min_events_per_file)
93 basf2.B2INFO(f
"Reducing to a maximum of {max_events_per_run} events per run.")
94 reduced_file_to_iov_physics = filter_by_max_events_per_run(file_to_iov_physics,
95 max_events_per_run, random_select=
True)
96 reduced_file_to_iov_cosmics = filter_by_max_events_per_run(file_to_iov_cosmics,
97 max_events_per_run, random_select=
True)
100 iov_set_physics = set(reduced_file_to_iov_physics.values())
101 iov_set_cosmics = set(reduced_file_to_iov_cosmics.values())
103 iov_list_cosmics = list(sorted(iov_set_cosmics))
105 iov_list_all = list(sorted(iov_set_cosmics | iov_set_physics))
107 exp_set = set([iov.exp_low
for iov
in iov_list_all])
109 for exp
in sorted(exp_set):
110 chunks_exp += [list(g)
for k, g
in groupby(iov_list_all,
lambda x: x.exp_low == exp)
if k]
114 for chunk_exp
in chunks_exp:
115 chunks_phy += [list(g)
for k, g
in groupby(chunk_exp,
lambda x: x
in iov_list_cosmics)
if not k]
116 chunks_cosmic += [list(g)
for k, g
in groupby(chunk_exp,
lambda x: x
in iov_list_cosmics)
if k]
121 chunk_list = chunks_phy
122 input_data = reduced_file_to_iov_physics
124 for ichunk, chunk
in enumerate(chunk_list):
125 first_iov = IoV(chunk[0].exp_low, chunk[0].run_low, -1, -1)
126 last_iov = IoV(chunk[-1].exp_low, chunk[-1].run_low, -1, -1)
127 if last_iov < output_iov:
130 input_files = list(chain.from_iterable([list(g)
for k, g
in groupby(
131 input_data,
lambda x: input_data[x]
in chunk)
if k]))
133 if len(input_files) < min_files_per_chunk:
136 specific_iov = first_iov
if iCal > 0
else output_iov
137 basf2.B2INFO(f
"Total number of files actually used as input = {len(input_files)} for the output {specific_iov}")
138 cal = hot_pixel_mask_calibration(
139 cal_name=
"{}_PXDHotPixelMaskCalibration_BeamorPhysics".format(iCal + 1),
140 input_files=input_files,
142 cal.algorithms[0].params = {
"iov_coverage": specific_iov}
148 chunk_list = chunks_cosmic
149 input_data = reduced_file_to_iov_cosmics
150 for ichunk, chunk
in enumerate(chunk_list):
151 first_iov = IoV(chunk[0].exp_low, chunk[0].run_low, chunk[-1].exp_high, chunk[-1].run_high)
152 last_iov = IoV(chunk[-1].exp_low, chunk[-1].run_low, chunk[-1].exp_high, chunk[-1].run_high)
153 if last_iov < output_iov:
157 specific_iov = max(first_iov, IoV(
158 requested_iov.exp_low, requested_iov.run_low, chunk[-1].exp_high, chunk[-1].run_high))
160 specific_iov = first_iov
161 input_files = list(chain.from_iterable([list(g)
for k, g
in groupby(
162 input_data,
lambda x: input_data[x]
in chunk)
if k]))
163 basf2.B2INFO(f
"Total number of files actually used as input = {len(input_files)} for the output {specific_iov}")
164 cal = hot_pixel_mask_calibration(
165 cal_name=
"{}_PXDHotPixelMaskCalibration_Cosmic".format(iCal + 1),
166 input_files=input_files,
169 cal.algorithms[0].params = {
"iov_coverage": specific_iov}
179 total_input_files = len(reduced_file_to_iov_physics) + len(reduced_file_to_iov_cosmics)
182 fraction_of_input_files = len(cal.input_files)/total_input_files
184 cal.max_collector_jobs = ceil(fraction_of_input_files * total_jobs)
185 basf2.B2INFO(f
"{cal.name} will submit a maximum of {cal.max_collector_jobs} batch jobs")