Belle II Software development
caf_pxd_gain.py
1
8
9"""
10airflow script for PXD gain calibration.
11"""
12
13import basf2
14from pxd.calibration import gain_calibration
15from prompt.utils import filter_by_max_files_per_run, filter_by_max_events_per_run
16from prompt import CalibrationSettings, INPUT_DATA_FILTERS
17from caf.utils import ExpRun, IoV
18from itertools import groupby
19from itertools import chain
20from math import ceil, inf
21from prompt.calibrations.caf_beamspot import settings as beamspot_calibration
22
23
# Settings object describing this calibration: its name, responsible expert,
# accepted input data and the expert-tunable options read by get_calibrations().
settings = CalibrationSettings(
    name="PXD gain calibration",
    expert_username="maiko.takahashi",
    subsystem="pxd",
    description=__doc__,
    input_data_formats=["cdst"],
    input_data_names=["physics"],
    input_data_filters={
        "physics": [
            INPUT_DATA_FILTERS["Data Tag"]["bhabha_all_calib"],
            INPUT_DATA_FILTERS["Beam Energy"]["4S"],
            INPUT_DATA_FILTERS["Beam Energy"]["Continuum"],
            INPUT_DATA_FILTERS["Beam Energy"]["Scan"],
            INPUT_DATA_FILTERS["Beam Energy"][""],
            INPUT_DATA_FILTERS["Run Type"]["physics"],
            INPUT_DATA_FILTERS["Data Quality Tag"]["Good"],
        ],
    },
    expert_config={
        # When True, get_calibrations() only logs a dry run and creates no calibration
        "debug": False,
        # Collector job budget shared between the calibrations by input file fraction
        "total_jobs": 1000,
        "gain_method": "analytic",
        # Run chunks with fewer input files than this are skipped with a warning
        "min_files_per_chunk": 10,
        "min_events_per_file": 1000,  # avoid empty files
        # < 0: no reduction, 0: limit files per run instead, > 0: limit events per run
        "max_events_per_run": 4000000,
        "max_files_per_run": 20,  # only valid when max_events/run = 0
        # Extra (exp, run) pairs at which a new run chunk (payload) starts
        "payload_boundaries": [],
    },
    depends_on=[beamspot_calibration],
    produced_payloads=["PXDGainMapPar"],
)
51
52
def get_calibrations(input_data, **kwargs):
    """
    Build one PXD gain calibration per run chunk of the (optionally reduced) physics input data.

    Parameters:
      input_data (dict): Should contain every name from the 'input_data_names' variable as a key.
        Each value is a dictionary with {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}. Useful for
        assigning to calibration.files_to_iov

      **kwargs: Configuration options to be sent in. Since this may change we use kwargs as a way to help prevent
        backwards compatibility problems. But you could use the correct arguments in b2caf-prompt-run for this
        release explicitly if you want to.

        Two entries are read here:

        * kwargs["requested_iov"]: its (exp_low, run_low) is the start of the open-ended output IoV
          assigned to the first calibration that is created.
        * kwargs["expert_config"]: dictionary providing "gain_method", "debug", "total_jobs",
          "max_events_per_run", "max_files_per_run", "min_files_per_chunk", "min_events_per_file",
          "payload_boundaries" and, optionally, "kwargs" (extra keyword arguments forwarded to
          gain_calibration).

    Returns:
      list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process.
      The list is empty when expert_config["debug"] is set (dry run) or when no chunk has enough files.
    """

    # Set up config options
    requested_iov = kwargs.get("requested_iov", None)
    output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
    # expert config
    expert_config = kwargs.get("expert_config")
    gain_method = expert_config["gain_method"]
    debug = expert_config["debug"]
    total_jobs = expert_config["total_jobs"]
    max_events_per_run = expert_config["max_events_per_run"]
    max_files_per_run = expert_config["max_files_per_run"]
    min_files_per_chunk = expert_config["min_files_per_chunk"]
    min_events_per_file = expert_config["min_events_per_file"]
    cal_kwargs = expert_config.get("kwargs", {})

    # print all config
    basf2.B2INFO(f"Requested iov: {requested_iov} ")
    basf2.B2INFO(f"Expert config: {expert_config} ")

    # Read input_data
    file_to_iov_physics = input_data["physics"]

    # Reduce data: negative -> keep everything, zero -> limit files per run, positive -> limit events per run
    if max_events_per_run < 0:
        basf2.B2INFO("No file reduction applied.")
        reduced_file_to_iov_physics = file_to_iov_physics
    elif max_events_per_run == 0:
        basf2.B2INFO(f"Reducing to a maximum of {max_files_per_run} files per run.")
        reduced_file_to_iov_physics = filter_by_max_files_per_run(file_to_iov_physics,
                                                                  max_files_per_run, min_events_per_file)
    else:
        basf2.B2INFO(f"Reducing to a maximum of {max_events_per_run} events per run.")
        reduced_file_to_iov_physics = filter_by_max_events_per_run(file_to_iov_physics,
                                                                   max_events_per_run, random_select=True)

    input_iov_set_physics = set(reduced_file_to_iov_physics.values())
    exp_set = {iov.exp_low for iov in input_iov_set_physics}

    # boundaries setting for run chunks (At certain runs, gain was tuned)
    boundary_set = {ExpRun(output_iov.exp_low, output_iov.run_low)}
    boundary_set.update(ExpRun(*boundary) for boundary in expert_config["payload_boundaries"])
    # We don't need run 0 for the first exp as it's handled by output_iov
    boundary_set.update(ExpRun(exp, 0) for exp in sorted(exp_set)[1:])
    # The chunk intervals below are built from consecutive boundaries, so the boundaries have to be
    # in ascending order; otherwise intervals overlap or come out empty.
    payload_boundaries = sorted(boundary_set)
    basf2.B2INFO(f"Final Boundaries: {payload_boundaries}")

    # run chunk creation: half-open intervals [head, tail), the last one being open ended
    chunks_head = payload_boundaries
    chunks_tail = payload_boundaries[1:] + [ExpRun(inf, inf)]
    intervals = list(zip(chunks_head, chunks_tail))
    # Consecutive (sorted) IoVs that fall into the same interval form one chunk
    iov_chunks = [list(g) for k, g in groupby(sorted(input_iov_set_physics),
                                              lambda x: [i for i, j in intervals if i <= x < j])]

    # Create calibrations from chunks
    cal_list = []
    input_file_to_iov = reduced_file_to_iov_physics
    iCal = 0  # number of chunks accepted so far (also counted in debug mode)
    for ichunk, chunk in enumerate(iov_chunks):
        first_iov = IoV(chunk[0].exp_low, chunk[0].run_low, -1, -1)
        last_iov = IoV(chunk[-1].exp_low, chunk[-1].run_low, -1, -1)
        if last_iov < output_iov:  # All the chunk iovs are earlier than the requested
            continue

        # Keep the files whose IoV belongs to this chunk, preserving the input ordering
        chunk_iovs = set(chunk)
        input_files = [path for path, iov in input_file_to_iov.items() if iov in chunk_iovs]
        # Check the minimum number of files in the physics/beam run chunk
        if len(input_files) < min_files_per_chunk:
            basf2.B2WARNING(f"No enough file in sub run chunk [{chunk[0]},{chunk[-1]}]: {len(input_files)},"
                            f"but {min_files_per_chunk} required!")
            continue

        # From the second chunk within the requested range, we have the iov defined by the first run
        specific_iov = first_iov if iCal > 0 else output_iov
        basf2.B2INFO(f"Total number of files actually used as input = {len(input_files)} for the output {specific_iov}")
        cal_name = f"{ichunk+1}_PXDAnalyticGainCalibration"
        if debug:
            basf2.B2INFO(f"Dry run on Calibration(name={cal_name})")
        else:
            cal = gain_calibration(
                cal_name=cal_name,
                gain_method=gain_method,
                input_files=input_files,
                **cal_kwargs)
            for alg in cal.algorithms:
                alg.params["iov_coverage"] = specific_iov
            cal_list.append(cal)
        iCal += 1

    # The number of calibrations depends on the 'chunking' above. We would like to make sure that the total number of
    # batch jobs submitted is approximately constant and reasonable, no matter how many files and chunks are used.
    # So we take expert_config["total_jobs"] and split this between the calibrations depending on the fraction of
    # total input files in the calibrations.
    total_input_files = len(reduced_file_to_iov_physics)

    for cal in cal_list:
        fraction_of_input_files = len(cal.input_files) / total_input_files
        # Assign the max collector jobs to be roughly the same fraction of total jobs
        cal.max_collector_jobs = ceil(fraction_of_input_files * total_jobs)
        basf2.B2INFO(f"{cal.name} will submit a maximum of {cal.max_collector_jobs} batch jobs")

    return cal_list