Belle II Software  release-05-01-25
caf_pxd.py
1 # -*- coding: utf-8 -*-
2 
3 """
4 airflow script for PXD hot/dead pixel masking.
5 Author: qingyuan.liu@desy.de
6 """
7 
8 import basf2
9 from pxd.calibration import hot_pixel_mask_calibration
10 from prompt.utils import filter_by_max_files_per_run
11 from prompt import CalibrationSettings
12 from caf.utils import IoV
13 from itertools import groupby
14 from itertools import chain
15 from math import ceil
16 
17 
# Prompt-calibration metadata for this script: the calibration reads raw data
# from both beam/physics and cosmic runs and has no upstream calibration dependency.
settings = CalibrationSettings(
    name="PXD hot/dead pixel calibration",
    description=__doc__,
    expert_username="qyliu",
    depends_on=[],
    input_data_names=["beamorphysics", "cosmic"],
    input_data_formats=["raw"],
)
24 
25 
def _split_into_run_chunks(iovs_physics, iovs_cosmics):
    """
    Split the run IoVs into chunks of consecutive runs of the same type within one experiment.

    Parameters:
      iovs_physics (set): IoVs of the physics/beam runs.
      iovs_cosmics (set): IoVs of the cosmic runs. An IoV present in both sets is treated as cosmic.

    Returns:
      tuple(list, list): (physics chunks, cosmic chunks). Each chunk is a list of IoVs in sorted order;
      chunks are ordered by experiment number and then by position within the experiment.
    """
    chunks_phy = []
    chunks_cosmic = []
    all_iovs = sorted(iovs_physics | iovs_cosmics)
    # The IoVs are sorted with exp_low as leading field, so each experiment is one contiguous group.
    for _, exp_iovs in groupby(all_iovs, key=lambda iov: iov.exp_low):
        # Maximal runs of consecutive cosmic / non-cosmic IoVs form the chunks.
        for is_cosmic, chunk in groupby(exp_iovs, key=lambda iov: iov in iovs_cosmics):
            (chunks_cosmic if is_cosmic else chunks_phy).append(list(chunk))
    return chunks_phy, chunks_cosmic


def _files_in_chunk(file_to_iov, chunk):
    """Return the files of ``file_to_iov`` whose IoV belongs to ``chunk``, preserving the dict order."""
    chunk_iovs = set(chunk)
    return [path for path, iov in file_to_iov.items() if iov in chunk_iovs]


def _assign_max_collector_jobs(cal_list, total_input_files, total_jobs=1000):
    """
    Share ``total_jobs`` batch jobs between the calibrations according to their fraction of input files.

    Sets ``max_collector_jobs`` on every calibration in ``cal_list`` (rounded up).
    """
    for cal in cal_list:
        fraction_of_input_files = len(cal.input_files) / total_input_files
        # Assign the max collector jobs to be roughly the same fraction of total jobs
        cal.max_collector_jobs = ceil(fraction_of_input_files * total_jobs)
        basf2.B2INFO(f"{cal.name} will submit a maximum of {cal.max_collector_jobs} batch jobs")


def get_calibrations(input_data, **kwargs):
    """
    Build the PXD hot/dead pixel mask calibrations for the requested IoV.

    The input files are reduced with ``filter_by_max_files_per_run``. The runs are then grouped per
    experiment and split into chunks of consecutive physics/beam runs and consecutive cosmic runs.
    One calibration is created for every chunk that reaches into the requested IoV. Physics chunks with
    fewer than ``min_files_per_chunk`` files are skipped.

    Parameters:
      input_data (dict): Should contain every name from the 'input_data_names' variable as a key.
        Each value is a dictionary with {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}. Useful for
        assigning to calibration.files_to_iov

      **kwargs: Configuration options to be sent in. Since this may change we use kwargs as a way to help prevent
        backwards compatibility problems. But you could use the correct arguments in b2caf-prompt-run for this
        release explicitly if you want to.

        Keys read by this function:

        * ``requested_iov`` (required): its ``exp_low``/``run_low`` define the start of the open-ended
          output IoV, e.g. IoV(3,4,-1,-1).
        * ``expert_config`` (optional, may be None): a dict whose ``"kwargs"`` entry (default ``{}``) is
          forwarded as keyword arguments to ``hot_pixel_mask_calibration``.

    Returns:
      list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process

    Raises:
      ValueError: if ``requested_iov`` is not given.
    """
    # Set up config options
    requested_iov = kwargs.get("requested_iov", None)
    if requested_iov is None:
        raise ValueError("get_calibrations() requires the 'requested_iov' keyword argument")
    expert_config = kwargs.get("expert_config") or {}
    cal_kwargs = expert_config.get("kwargs", {})
    output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
    max_files_per_run = 20
    min_files_per_chunk = 10
    min_events_per_file = 1000  # avoid empty files

    # Read input_data
    file_to_iov_physics = input_data["beamorphysics"]
    file_to_iov_cosmics = input_data["cosmic"]

    # Reduce data
    reduced_file_to_iov_physics = filter_by_max_files_per_run(file_to_iov_physics, max_files_per_run, min_events_per_file)
    reduced_file_to_iov_cosmics = filter_by_max_files_per_run(file_to_iov_cosmics, max_files_per_run, min_events_per_file)

    # Create run chunks based on exp no. and run type
    chunks_phy, chunks_cosmic = _split_into_run_chunks(set(reduced_file_to_iov_physics.values()),
                                                       set(reduced_file_to_iov_cosmics.values()))

    cal_list = []

    # Physics or beam runs: payloads are open ended and start at the first run of the chunk
    for chunk in chunks_phy:
        first_iov = IoV(chunk[0].exp_low, chunk[0].run_low, -1, -1)
        last_iov = IoV(chunk[-1].exp_low, chunk[-1].run_low, -1, -1)
        if last_iov < output_iov:  # All the chunk iovs are earlier than the requested
            continue
        input_files = _files_in_chunk(reduced_file_to_iov_physics, chunk)
        # Check the minimum number of files in the physics/beam run chunk
        if len(input_files) < min_files_per_chunk:
            continue
        # The first calibration covers the requested IoV; later ones start at their first run
        specific_iov = first_iov if cal_list else output_iov
        basf2.B2INFO(f"Total number of files actually used as input = {len(input_files)} for the output {specific_iov}")
        cal = hot_pixel_mask_calibration(
            cal_name=f"{len(cal_list) + 1}_PXDHotPixelMaskCalibration_BeamorPhysics",
            input_files=input_files,
            **cal_kwargs)
        cal.algorithms[0].params = {"iov_coverage": specific_iov}
        cal_list.append(cal)

    # Cosmic runs: payloads are closed and end at the last run of the chunk
    n_physics_cals = len(cal_list)
    for chunk in chunks_cosmic:
        first_iov = IoV(chunk[0].exp_low, chunk[0].run_low, chunk[-1].exp_high, chunk[-1].run_high)
        last_iov = IoV(chunk[-1].exp_low, chunk[-1].run_low, chunk[-1].exp_high, chunk[-1].run_high)
        if last_iov < output_iov:  # All the chunk iovs are earlier than the requested
            continue
        if len(cal_list) == n_physics_cals:
            # First cosmic chunk within the requested range: do not start before the requested run
            specific_iov = max(first_iov, IoV(
                requested_iov.exp_low, requested_iov.run_low, chunk[-1].exp_high, chunk[-1].run_high))
        else:
            specific_iov = first_iov
        input_files = _files_in_chunk(reduced_file_to_iov_cosmics, chunk)
        basf2.B2INFO(f"Total number of files actually used as input = {len(input_files)} for the output {specific_iov}")
        cal = hot_pixel_mask_calibration(
            cal_name=f"{len(cal_list) + 1}_PXDHotPixelMaskCalibration_Cosmic",
            input_files=input_files,
            run_type='cosmic',
            **cal_kwargs)
        cal.algorithms[0].params = {"iov_coverage": specific_iov}  # Not valid when using SimpleRunByRun strategy
        cal_list.append(cal)

    # The number of calibrations depends on the 'chunking' above. We would like to make sure that the total number of
    # batch jobs submitted is approximately constant and reasonable, no matter how many files and chunks are used.
    # So we define 1000 total jobs and split this between the calibrations depending on the fraction of total input
    # files in the calibrations.
    total_input_files = len(reduced_file_to_iov_physics) + len(reduced_file_to_iov_cosmics)
    _assign_max_collector_jobs(cal_list, total_input_files, total_jobs=1000)

    return cal_list
pxd.calibration
Definition: calibration.py:1
prompt.utils
Definition: utils.py:1