Belle II Software  release-06-02-00
caf_pxd.py
1 # -*- coding: utf-8 -*-
2 
3 
10 
11 """
12 airflow script for PXD hot/dead pixel masking.
13 """
14 
15 import basf2
16 from pxd.calibration import hot_pixel_mask_calibration
17 from prompt.utils import filter_by_max_files_per_run, filter_by_max_events_per_run
18 from prompt import CalibrationSettings, INPUT_DATA_FILTERS
19 from caf.utils import IoV
20 from itertools import groupby
21 from itertools import chain
22 from math import ceil
23 
24 
# Settings read by the prompt-calibration framework for this script.
# The "beamorphysics" filter list is assembled group by group (data tags,
# beam energies, run type, data quality); the resulting order is significant
# only in that it reproduces the explicit listing it replaces.
settings = CalibrationSettings(
    name="PXD hot/dead pixel calibration",
    expert_username="qyliu",
    description=__doc__,
    input_data_formats=["raw"],
    input_data_names=["beamorphysics", "cosmic"],
    input_data_filters={
        "beamorphysics": [
            *(INPUT_DATA_FILTERS["Data Tag"][tag] for tag in (
                "bhabha_all_calib",
                "gamma_gamma_calib",
                "hadron_calib",
                "offip_calib",
                "cosmic_calib",
            )),
            *(INPUT_DATA_FILTERS["Beam Energy"][energy] for energy in (
                "4S",
                "Continuum",
                "Scan",
                "",
            )),
            INPUT_DATA_FILTERS["Run Type"]["physics"],
            INPUT_DATA_FILTERS["Data Quality Tag"]["Good Or Recoverable"],
        ],
        "cosmic": [
            INPUT_DATA_FILTERS["Run Type"]["cosmic"],
        ],
    },
    expert_config={
        "max_events_per_run": 400000,
        # get_calibrations() only uses this when max_events_per_run <= 0
        "max_files_per_run": 20,
    },
    depends_on=[],
)
49 
50 
def _files_in_chunk(file_to_iov, chunk):
    """Return the files of ``file_to_iov`` whose IoV is in ``chunk``, keeping the mapping's iteration order."""
    chunk_iovs = set(chunk)
    return [path for path, iov in file_to_iov.items() if iov in chunk_iovs]


def get_calibrations(input_data, **kwargs):
    """
    Build one hot-pixel-mask calibration per chunk of consecutive physics/beam runs and
    per chunk of consecutive cosmic runs.

    Parameters:
      input_data (dict): Should contain every name from the 'input_data_names' variable as a key.
        Each value is a dictionary with {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}. Useful for
        assigning to calibration.files_to_iov

      **kwargs: Configuration options to be sent in. Since this may change we use kwargs as a way to help prevent
        backwards compatibility problems. But you could use the correct arguments in b2caf-prompt-run for this
        release explicitly if you want to.

        Two entries are read, and both are dereferenced unconditionally, so they must be supplied:

        * kwargs["requested_iov"]: only its exp_low/run_low are used. They define the start of the
          open-ended output IoV (exp_low, run_low, -1, -1); chunks that end before it are skipped.
        * kwargs["expert_config"]: must provide "max_events_per_run" and "max_files_per_run"; an optional
          "kwargs" dictionary is forwarded to every hot_pixel_mask_calibration() call.

    Returns:
      list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process
    """

    # Set up config options
    requested_iov = kwargs.get("requested_iov", None)
    expert_config = kwargs.get("expert_config")
    cal_kwargs = expert_config.get("kwargs", {})
    output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
    max_events_per_run = expert_config["max_events_per_run"]
    max_files_per_run = expert_config["max_files_per_run"]
    min_files_per_chunk = 10
    min_events_per_file = 1000  # avoid empty files

    # Read input_data
    file_to_iov_physics = input_data["beamorphysics"]
    file_to_iov_cosmics = input_data["cosmic"]

    # Reduce data: the per-run file limit is only used when the event limit is disabled (<= 0).
    # Physics is always reduced before cosmics so the order of the helper calls is unchanged.
    if max_events_per_run <= 0:
        basf2.B2INFO(f"Reducing to a maximum of {max_files_per_run} files per run.")
        reduced_file_to_iov_physics = filter_by_max_files_per_run(
            file_to_iov_physics, max_files_per_run, min_events_per_file)
        reduced_file_to_iov_cosmics = filter_by_max_files_per_run(
            file_to_iov_cosmics, max_files_per_run, min_events_per_file)
    else:
        basf2.B2INFO(f"Reducing to a maximum of {max_events_per_run} events per run.")
        reduced_file_to_iov_physics = filter_by_max_events_per_run(
            file_to_iov_physics, max_events_per_run, random_select=True)
        reduced_file_to_iov_cosmics = filter_by_max_events_per_run(
            file_to_iov_cosmics, max_events_per_run, random_select=True)

    # Create run chunks based on exp no. and run type
    iov_set_physics = set(reduced_file_to_iov_physics.values())
    iov_set_cosmics = set(reduced_file_to_iov_cosmics.values())
    iov_list_all = sorted(iov_set_cosmics | iov_set_physics)

    # One chunk per contiguous stretch of IoVs of the same experiment in the sorted list
    chunks_exp = []
    for exp in sorted({iov.exp_low for iov in iov_list_all}):
        chunks_exp += [list(group)
                       for same_exp, group in groupby(iov_list_all, lambda iov, exp=exp: iov.exp_low == exp)
                       if same_exp]

    # Within each experiment, split into alternating stretches of cosmic and non-cosmic IoVs.
    # An IoV that appears in the cosmic input is treated as cosmic even if it is also in the physics input.
    chunks_phy = []
    chunks_cosmic = []
    for chunk_exp in chunks_exp:
        for is_cosmic, group in groupby(chunk_exp, lambda iov: iov in iov_set_cosmics):
            (chunks_cosmic if is_cosmic else chunks_phy).append(list(group))

    # Create calibrations
    cal_list = []
    n_cal = 0

    # Physics or beam run
    for chunk in chunks_phy:
        first_iov = IoV(chunk[0].exp_low, chunk[0].run_low, -1, -1)
        last_iov = IoV(chunk[-1].exp_low, chunk[-1].run_low, -1, -1)
        if last_iov < output_iov:  # All the chunk iovs are earlier than the requested
            continue
        input_files = _files_in_chunk(reduced_file_to_iov_physics, chunk)
        # Check the minimum number of files in the physics/beam run chunk
        if len(input_files) < min_files_per_chunk:
            continue
        # From the second chunk within the requested range, we have the iov defined by the first run
        specific_iov = first_iov if n_cal > 0 else output_iov
        basf2.B2INFO(f"Total number of files actually used as input = {len(input_files)} for the output {specific_iov}")
        cal = hot_pixel_mask_calibration(
            cal_name=f"{n_cal + 1}_PXDHotPixelMaskCalibration_BeamorPhysics",
            input_files=input_files,
            **cal_kwargs)
        cal.algorithms[0].params = {"iov_coverage": specific_iov}
        cal_list.append(cal)
        n_cal += 1

    # Cosmic run (numbering continues after the physics calibrations; no minimum file count is applied)
    n_cal_phy = n_cal
    for chunk in chunks_cosmic:
        chunk_exp_high = chunk[-1].exp_high
        chunk_run_high = chunk[-1].run_high
        first_iov = IoV(chunk[0].exp_low, chunk[0].run_low, chunk_exp_high, chunk_run_high)
        last_iov = IoV(chunk[-1].exp_low, chunk[-1].run_low, chunk_exp_high, chunk_run_high)
        if last_iov < output_iov:  # All the chunk iovs are earlier than the requested
            continue
        # From the first chunk within the requested range, we have the iov defined by the first run
        if n_cal == n_cal_phy:
            # First cosmic calibration: do not start before the requested IoV
            specific_iov = max(first_iov, IoV(
                requested_iov.exp_low, requested_iov.run_low, chunk_exp_high, chunk_run_high))
        else:
            specific_iov = first_iov
        input_files = _files_in_chunk(reduced_file_to_iov_cosmics, chunk)
        basf2.B2INFO(f"Total number of files actually used as input = {len(input_files)} for the output {specific_iov}")
        cal = hot_pixel_mask_calibration(
            cal_name=f"{n_cal + 1}_PXDHotPixelMaskCalibration_Cosmic",
            input_files=input_files,
            run_type='cosmic',
            **cal_kwargs)
        cal.algorithms[0].params = {"iov_coverage": specific_iov}  # Not valid when using SimpleRunByRun strategy
        cal_list.append(cal)
        n_cal += 1

    # The number of calibrations depends on the 'chunking' above. We would like to make sure that the total number of
    # batch jobs submitted is approximately constant and reasonable, no matter how many files and chunks are used.
    # So we define 1000 total jobs and split this between the calibrations depending on the fraction of total input files
    # in the calibrations.

    total_jobs = 1000
    total_input_files = len(reduced_file_to_iov_physics) + len(reduced_file_to_iov_cosmics)

    for cal in cal_list:
        fraction_of_input_files = len(cal.input_files) / total_input_files
        # Assign the max collector jobs to be roughly the same fraction of total jobs
        cal.max_collector_jobs = ceil(fraction_of_input_files * total_jobs)
        basf2.B2INFO(f"{cal.name} will submit a maximum of {cal.max_collector_jobs} batch jobs")

    return cal_list