Belle II Software development
caf_pxd.py
1
8
"""
airflow script for PXD hot/dead pixel masking.
"""
12
13import basf2
14from pxd.calibration import hot_pixel_mask_calibration
15from prompt.utils import filter_by_max_files_per_run, filter_by_max_events_per_run
16from prompt import CalibrationSettings, INPUT_DATA_FILTERS
17from caf.utils import IoV
18from itertools import groupby
19from itertools import chain
20from math import ceil
21
22
# Prompt-processing settings for this calibration: which raw input data categories are requested,
# the run filters applied to each category, and the expert-tunable data reduction limits.
settings = CalibrationSettings(
    name="PXD hot/dead pixel calibration",
    expert_username="takaham",
    description=__doc__,
    input_data_formats=["raw"],
    input_data_names=["beamorphysics", "cosmic"],
    input_data_filters={
        "beamorphysics": (
            # Accepted "Data Tag" filter entries
            [INPUT_DATA_FILTERS["Data Tag"][tag]
             for tag in ("bhabha_all_calib", "gamma_gamma_calib", "hadron_calib", "offip_calib", "cosmic_calib")]
            # Accepted "Beam Energy" filter entries (including the "" entry)
            + [INPUT_DATA_FILTERS["Beam Energy"][energy]
               for energy in ("4S", "Continuum", "Scan", "")]
            # Run type and data quality requirements
            + [INPUT_DATA_FILTERS["Run Type"]["physics"],
               INPUT_DATA_FILTERS["Data Quality Tag"]["Good Or Recoverable"]]
        ),
        "cosmic": [INPUT_DATA_FILTERS["Run Type"]["cosmic"]],
    },
    expert_config={
        # Per-run event limit; get_calibrations falls back to the per-run file limit when this is <= 0
        "max_events_per_run": 400000,
        "max_files_per_run": 20,  # only valid when max_events/run <= 0
    },
    depends_on=[],
)
47
48
def _split_into_run_chunks(iov_set_physics, iov_set_cosmics):
    """
    Split run IoVs into chunks of consecutive runs sharing the same experiment and the same run category.

    Parameters:
        iov_set_physics (set): IoVs of the selected physics/beam runs.
        iov_set_cosmics (set): IoVs of the selected cosmic runs.

    Returns:
        tuple(list, list): ``(chunks_phy, chunks_cosmic)``. Each chunk is a list of IoVs in sorted order;
        chunks are ordered by experiment and then by run. An IoV present in both sets is treated as cosmic.
    """
    chunks_phy = []
    chunks_cosmic = []
    # Sorting places the runs of one experiment next to each other; this assumes IoV orders like a tuple
    # starting with exp_low, which the "<" and max() comparisons in get_calibrations already rely on.
    for _, exp_iovs in groupby(sorted(iov_set_cosmics | iov_set_physics), key=lambda iov: iov.exp_low):
        # Within one experiment, every uninterrupted stretch of runs of the same category is one chunk.
        # Set membership keeps this O(1) per run (the original scanned a sorted list each time).
        for is_cosmic, run_group in groupby(exp_iovs, key=lambda iov: iov in iov_set_cosmics):
            (chunks_cosmic if is_cosmic else chunks_phy).append(list(run_group))
    return chunks_phy, chunks_cosmic


def _files_in_chunk(file_to_iov, chunk):
    """Return the files of ``file_to_iov`` (in dictionary order) whose IoV is one of the IoVs in ``chunk``."""
    chunk_iovs = set(chunk)
    return [path for path, iov in file_to_iov.items() if iov in chunk_iovs]


def get_calibrations(input_data, **kwargs):
    """
    Build the PXD hot/dead pixel mask calibrations for the requested IoV.

    Parameters:
        input_data (dict): Should contain every name from the 'input_data_names' variable as a key
            ("beamorphysics" and "cosmic"). Each value is a dictionary with
            {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}. Useful for assigning to calibration.files_to_iov

        **kwargs: Configuration options to be sent in. Since this may change we use kwargs as a way to help prevent
            backwards compatibility problems. But you could use the correct arguments in b2caf-prompt-run for this
            release explicitly if you want to.

            The keys read here are:

            * ``requested_iov`` -- its ``exp_low``/``run_low`` define the start of the output IoV, which is
              open ended, e.g. IoV(3,4,-1,-1).
            * ``expert_config`` -- a dict that must provide ``max_events_per_run`` and ``max_files_per_run``
              and may provide ``kwargs``, a dict forwarded to every ``hot_pixel_mask_calibration`` call.

    Returns:
        list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process.
        Physics/beam calibrations come first, followed by cosmic ones.
    """
    # Set up config options
    requested_iov = kwargs.get("requested_iov", None)
    expert_config = kwargs.get("expert_config")
    cal_kwargs = expert_config.get("kwargs", {})
    # Open-ended output IoV starting at the first requested run
    output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
    max_events_per_run = expert_config["max_events_per_run"]
    max_files_per_run = expert_config["max_files_per_run"]
    min_files_per_chunk = 10  # physics/beam chunks with fewer input files are skipped
    min_events_per_file = 1000  # avoid empty files

    # Read input_data
    file_to_iov_physics = input_data["beamorphysics"]
    file_to_iov_cosmics = input_data["cosmic"]

    # Reduce the data of both categories: per-run event limit, or per-run file limit when the event limit is <= 0
    if max_events_per_run <= 0:
        basf2.B2INFO(f"Reducing to a maximum of {max_files_per_run} files per run.")
        reduced_file_to_iov_physics = filter_by_max_files_per_run(file_to_iov_physics,
                                                                  max_files_per_run, min_events_per_file)
        reduced_file_to_iov_cosmics = filter_by_max_files_per_run(file_to_iov_cosmics,
                                                                  max_files_per_run, min_events_per_file)
    else:
        basf2.B2INFO(f"Reducing to a maximum of {max_events_per_run} events per run.")
        reduced_file_to_iov_physics = filter_by_max_events_per_run(file_to_iov_physics,
                                                                   max_events_per_run, random_select=True)
        reduced_file_to_iov_cosmics = filter_by_max_events_per_run(file_to_iov_cosmics,
                                                                   max_events_per_run, random_select=True)

    # Create run chunks based on exp no. and run type
    chunks_phy, chunks_cosmic = _split_into_run_chunks(set(reduced_file_to_iov_physics.values()),
                                                       set(reduced_file_to_iov_cosmics.values()))

    cal_list = []

    # Physics or beam runs: payload IoVs are open ended
    for chunk in chunks_phy:
        first_iov = IoV(chunk[0].exp_low, chunk[0].run_low, -1, -1)
        last_iov = IoV(chunk[-1].exp_low, chunk[-1].run_low, -1, -1)
        if last_iov < output_iov:  # All the chunk iovs are earlier than the requested
            continue
        input_files = _files_in_chunk(reduced_file_to_iov_physics, chunk)
        # Check the minimum number of files in the physics/beam run chunk
        if len(input_files) < min_files_per_chunk:
            continue
        # The first calibration covers the output IoV from the requested start;
        # later ones start at the first run of their own chunk.
        specific_iov = first_iov if cal_list else output_iov
        basf2.B2INFO(f"Total number of files actually used as input = {len(input_files)} for the output {specific_iov}")
        cal = hot_pixel_mask_calibration(
            cal_name=f"{len(cal_list) + 1}_PXDHotPixelMaskCalibration_BeamorPhysics",
            input_files=input_files,
            **cal_kwargs)
        cal.algorithms[0].params = {"iov_coverage": specific_iov}
        cal_list.append(cal)

    # Cosmic runs: payload IoVs are closed, from the chunk's first run to its last run.
    # Note: no minimum file count is applied to cosmic chunks (unlike physics/beam chunks).
    n_cal_phy = len(cal_list)
    for chunk in chunks_cosmic:
        first_iov = IoV(chunk[0].exp_low, chunk[0].run_low, chunk[-1].exp_high, chunk[-1].run_high)
        last_iov = IoV(chunk[-1].exp_low, chunk[-1].run_low, chunk[-1].exp_high, chunk[-1].run_high)
        if last_iov < output_iov:  # All the chunk iovs are earlier than the requested
            continue
        if len(cal_list) == n_cal_phy:
            # First cosmic calibration: do not let its IoV start before the requested run
            specific_iov = max(first_iov, IoV(
                requested_iov.exp_low, requested_iov.run_low, chunk[-1].exp_high, chunk[-1].run_high))
        else:
            specific_iov = first_iov
        input_files = _files_in_chunk(reduced_file_to_iov_cosmics, chunk)
        basf2.B2INFO(f"Total number of files actually used as input = {len(input_files)} for the output {specific_iov}")
        cal = hot_pixel_mask_calibration(
            cal_name=f"{len(cal_list) + 1}_PXDHotPixelMaskCalibration_Cosmic",
            input_files=input_files,
            run_type='cosmic',
            **cal_kwargs)
        cal.algorithms[0].params = {"iov_coverage": specific_iov}  # Not valid when using SimpleRunByRun strategy
        cal_list.append(cal)

    # The number of calibrations depends on the 'chunking' above. We would like to make sure that the total number of
    # batch jobs submitted is approximately constant and reasonable, no matter how many files and chunks are used.
    # So we define 1000 total jobs and split this between the calibrations depending on the fraction of total input
    # files in the calibrations. (When cal_list is non-empty, total_input_files > 0, so the division is safe.)
    total_jobs = 1000
    total_input_files = len(reduced_file_to_iov_physics) + len(reduced_file_to_iov_cosmics)

    for cal in cal_list:
        fraction_of_input_files = len(cal.input_files) / total_input_files
        # Assign the max collector jobs to be roughly the same fraction of total jobs
        cal.max_collector_jobs = ceil(fraction_of_input_files * total_jobs)
        basf2.B2INFO(f"{cal.name} will submit a maximum of {cal.max_collector_jobs} batch jobs")

    return cal_list