Belle II Software development
caf_pxd.py
1
8
9"""
10airflow script for PXD hot/dead pixel masking.
11"""
12
13import basf2
14from pxd.calibration import hot_pixel_mask_calibration
15from prompt.utils import filter_by_max_files_per_run, filter_by_max_events_per_run
16from prompt import CalibrationSettings, INPUT_DATA_FILTERS
17from caf.utils import IoV
18from itertools import groupby
19from itertools import chain
20from math import ceil
21
22
23settings = CalibrationSettings(name="PXD hot/dead pixel calibration",
24 expert_username="maiko.takahashi",
25 subsystem="pxd",
26 description=__doc__,
27 input_data_formats=["raw"],
28 input_data_names=["beamorphysics", "cosmic"],
29 input_data_filters={
30 "beamorphysics": [
31 INPUT_DATA_FILTERS["Data Tag"]["bhabha_all_calib"],
32 INPUT_DATA_FILTERS["Data Tag"]["gamma_gamma_calib"],
33 INPUT_DATA_FILTERS["Data Tag"]["hadron_calib"],
34 INPUT_DATA_FILTERS["Data Tag"]["offip_calib"],
35 INPUT_DATA_FILTERS["Data Tag"]["cosmic_calib"],
36 INPUT_DATA_FILTERS["Beam Energy"]["4S"],
37 INPUT_DATA_FILTERS["Beam Energy"]["Continuum"],
38 INPUT_DATA_FILTERS["Beam Energy"]["Scan"],
39 INPUT_DATA_FILTERS["Beam Energy"][""],
40 INPUT_DATA_FILTERS["Run Type"]["physics"],
41 INPUT_DATA_FILTERS["Data Quality Tag"]["Good Or Recoverable"]],
42 "cosmic": [INPUT_DATA_FILTERS["Run Type"]["cosmic"]]},
43 expert_config={
44 "max_events_per_run": 400000,
45 "max_files_per_run": 20, # only valid when max_events/run <= 0
46 },
47 depends_on=[],
48 produced_payloads=["PXDDeadPixelPar", "PXDMaskedPixelPar"])
49
50
51def get_calibrations(input_data, **kwargs):
52 """
53 Parameters:
54 input_data (dict): Should contain every name from the 'input_data_names' variable as a key.
55 Each value is a dictionary with {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}. Useful for
56 assigning to calibration.files_to_iov
57
58 **kwargs: Configuration options to be sent in. Since this may change we use kwargs as a way to help prevent
59 backwards compatibility problems. But you could use the correct arguments in b2caf-prompt-run for this
60 release explicitly if you want to.
61
62 Currently only kwargs["output_iov"] is used. This is the output IoV range that your payloads should
63 correspond to. Generally your highest ExpRun payload should be open ended e.g. IoV(3,4,-1,-1)
64
65 Returns:
66 list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process
67 """
68
69 # Set up config options
70 requested_iov = kwargs.get("requested_iov", None)
71 expert_config = kwargs.get("expert_config")
72 cal_kwargs = expert_config.get("kwargs", {})
73 output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
74 expert_config = kwargs.get("expert_config")
75 max_events_per_run = expert_config["max_events_per_run"]
76 max_files_per_run = expert_config["max_files_per_run"]
77 min_files_per_chunk = 10
78 min_events_per_file = 1000 # avoid empty files
79
80 # Read input_data
81 file_to_iov_physics = input_data["beamorphysics"]
82 file_to_iov_cosmics = input_data["cosmic"]
83
84 # Reduce data and create calibration instances for different data categories
85 cal_list = []
86 if max_events_per_run <= 0:
87 basf2.B2INFO(f"Reducing to a maximum of {max_files_per_run} files per run.")
88 reduced_file_to_iov_physics = filter_by_max_files_per_run(file_to_iov_physics,
89 max_files_per_run, min_events_per_file)
90 reduced_file_to_iov_cosmics = filter_by_max_files_per_run(file_to_iov_cosmics,
91 max_files_per_run, min_events_per_file)
92 else:
93 basf2.B2INFO(f"Reducing to a maximum of {max_events_per_run} events per run.")
94 reduced_file_to_iov_physics = filter_by_max_events_per_run(file_to_iov_physics,
95 max_events_per_run, random_select=True)
96 reduced_file_to_iov_cosmics = filter_by_max_events_per_run(file_to_iov_cosmics,
97 max_events_per_run, random_select=True)
98
99 # Create run chunks based on exp no. and run type
100 iov_set_physics = set(reduced_file_to_iov_physics.values())
101 iov_set_cosmics = set(reduced_file_to_iov_cosmics.values())
102
103 iov_list_cosmics = list(sorted(iov_set_cosmics))
104 # iov_list_physics = list(sorted(iov_set_physics))
105 iov_list_all = list(sorted(iov_set_cosmics | iov_set_physics))
106
107 exp_set = {iov.exp_low for iov in iov_list_all}
108 chunks_exp = []
109 for exp in sorted(exp_set):
110 chunks_exp += [list(g) for k, g in groupby(iov_list_all, lambda x: x.exp_low == exp) if k]
111
112 chunks_phy = []
113 chunks_cosmic = []
114 for chunk_exp in chunks_exp:
115 chunks_phy += [list(g) for k, g in groupby(chunk_exp, lambda x: x in iov_list_cosmics) if not k]
116 chunks_cosmic += [list(g) for k, g in groupby(chunk_exp, lambda x: x in iov_list_cosmics) if k]
117
118 # Create calibrations
119
120 # Physics or beam run
121 chunk_list = chunks_phy
122 input_data = reduced_file_to_iov_physics
123 iCal = 0
124 for ichunk, chunk in enumerate(chunk_list):
125 first_iov = IoV(chunk[0].exp_low, chunk[0].run_low, -1, -1)
126 last_iov = IoV(chunk[-1].exp_low, chunk[-1].run_low, -1, -1)
127 if last_iov < output_iov: # All the chunk iovs are earlier than the requested
128 continue
129 else:
130 input_files = list(chain.from_iterable([list(g) for k, g in groupby(
131 input_data, lambda x: input_data[x] in chunk) if k]))
132 # Check the minimum number of files in the physics/beam run chunk
133 if len(input_files) < min_files_per_chunk:
134 continue
135 # From the second chunk within the requested range, we have the iov defined by the first run
136 specific_iov = first_iov if iCal > 0 else output_iov
137 basf2.B2INFO(f"Total number of files actually used as input = {len(input_files)} for the output {specific_iov}")
138 cal = hot_pixel_mask_calibration(
139 cal_name=f"{iCal + 1}_PXDHotPixelMaskCalibration_BeamorPhysics",
140 input_files=input_files,
141 **cal_kwargs)
142 cal.algorithms[0].params = {"iov_coverage": specific_iov}
143 cal_list.append(cal)
144 iCal += 1
145
146 # Cosmic run
147 nCal_phy = iCal
148 chunk_list = chunks_cosmic
149 input_data = reduced_file_to_iov_cosmics
150 for ichunk, chunk in enumerate(chunk_list):
151 first_iov = IoV(chunk[0].exp_low, chunk[0].run_low, chunk[-1].exp_high, chunk[-1].run_high)
152 last_iov = IoV(chunk[-1].exp_low, chunk[-1].run_low, chunk[-1].exp_high, chunk[-1].run_high)
153 if last_iov < output_iov: # All the chunk iovs are earlier than the requested
154 continue
155 # From the first chunk within the requested range, we have the iov defined by the first run
156 if iCal == nCal_phy:
157 specific_iov = max(first_iov, IoV(
158 requested_iov.exp_low, requested_iov.run_low, chunk[-1].exp_high, chunk[-1].run_high))
159 else:
160 specific_iov = first_iov
161 input_files = list(chain.from_iterable([list(g) for k, g in groupby(
162 input_data, lambda x: input_data[x] in chunk) if k]))
163 basf2.B2INFO(f"Total number of files actually used as input = {len(input_files)} for the output {specific_iov}")
164 cal = hot_pixel_mask_calibration(
165 cal_name=f"{iCal + 1}_PXDHotPixelMaskCalibration_Cosmic",
166 input_files=input_files,
167 run_type='cosmic',
168 **cal_kwargs)
169 cal.algorithms[0].params = {"iov_coverage": specific_iov} # Not valid when using SimpleRunByRun strategy
170 cal_list.append(cal)
171 iCal += 1
172
173 # The number of calibrations depends on the 'chunking' above. We would like to make sure that the total number of
174 # batch jobs submitted is approximately constant and reasonable, no matter how many files and chunks are used.
175 # So we define 1000 total jobs and split this between the calibrations depending on the fraction of total input files
176 # in the calibrations.
177
178 total_jobs = 1000
179 total_input_files = len(reduced_file_to_iov_physics) + len(reduced_file_to_iov_cosmics)
180
181 for cal in cal_list:
182 fraction_of_input_files = len(cal.input_files)/total_input_files
183 # Assign the max collector jobs to be roughly the same fraction of total jobs
184 cal.max_collector_jobs = ceil(fraction_of_input_files * total_jobs)
185 basf2.B2INFO(f"{cal.name} will submit a maximum of {cal.max_collector_jobs} batch jobs")
186
187 return cal_list