Belle II Software  release-05-02-19
caf_pxd_gain.py
1 # -*- coding: utf-8 -*-
2 
3 """
4 airflow script for PXD gain calibration.
5 Author: qingyuan.liu@desy.de
6 """
7 
8 import basf2
9 from pxd.calibration import gain_calibration
10 from prompt.utils import filter_by_max_files_per_run, filter_by_max_events_per_run
11 from prompt import CalibrationSettings, input_data_filters
12 from caf.utils import ExpRun, IoV
13 from itertools import groupby
14 from itertools import chain
15 from math import ceil, inf
16 from prompt.calibrations.caf_beamspot import settings as beamspot_calibration
17 
18 
#: Settings read by the prompt calibration framework for the PXD gain calibration.
#: The calibration runs after the beamspot calibration (see ``depends_on``).
settings = CalibrationSettings(
    name="PXD gain calibration",
    expert_username="qyliu",
    description=__doc__,
    input_data_formats=["cdst"],
    input_data_names=["physics"],
    input_data_filters={
        "physics": [
            input_data_filters["Data Tag"]["bhabha_all_calib"],
            # One filter per accepted beam-energy category, in this order.
            *(input_data_filters["Beam Energy"][category]
              for category in ("4S", "Continuum", "Scan")),
            input_data_filters["Run Type"]["physics"],
            input_data_filters["Data Quality Tag"]["Good"],
        ],
    },
    expert_config={
        "debug": False,
        "total_jobs": 1000,
        "gain_method": "analytic",
        "min_files_per_chunk": 10,
        # Files with fewer events are skipped, which avoids empty files.
        "min_events_per_file": 1000,
        "max_events_per_run": 4000000,
        # Only taken into account when max_events_per_run is 0.
        "max_files_per_run": 20,
        "payload_boundaries": [],
    },
    depends_on=[beamspot_calibration],
)
43 
44 
def get_calibrations(input_data, **kwargs):
    """
    Build the PXD gain calibrations, one per chunk of runs, for the CAF process.

    Parameters:
      input_data (dict): Should contain every name from the 'input_data_names' variable as a key.
        Each value is a dictionary with {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}. Useful for
        assigning to calibration.files_to_iov. Only input_data["physics"] is read here.

      **kwargs: Configuration options to be sent in. Since this may change we use kwargs as a way to help prevent
        backwards compatibility problems. But you could use the correct arguments in b2caf-prompt-run for this
        release explicitly if you want to.

        The following entries are read:

        * kwargs["requested_iov"]: only its lowest (exp, run) is used. The output IoV always starts there and
          is open ended, i.e. IoV(exp_low, run_low, -1, -1).
        * kwargs["expert_config"] (dict): must provide "gain_method", "debug", "total_jobs",
          "max_events_per_run", "max_files_per_run", "min_files_per_chunk", "min_events_per_file" and
          "payload_boundaries". An optional "kwargs" dictionary is forwarded to gain_calibration().

    Returns:
      list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process.
      The list is empty when expert_config["debug"] is true (dry run).
    """

    # Set up config options
    requested_iov = kwargs.get("requested_iov", None)
    output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
    # expert config
    expert_config = kwargs.get("expert_config")
    gain_method = expert_config["gain_method"]
    debug = expert_config["debug"]
    total_jobs = expert_config["total_jobs"]
    max_events_per_run = expert_config["max_events_per_run"]
    max_files_per_run = expert_config["max_files_per_run"]
    min_files_per_chunk = expert_config["min_files_per_chunk"]
    min_events_per_file = expert_config["min_events_per_file"]
    cal_kwargs = expert_config.get("kwargs", {})

    # print all config
    basf2.B2INFO(f"Requested iov: {requested_iov} ")
    basf2.B2INFO(f"Expert config: {expert_config} ")

    # Read input_data
    file_to_iov_physics = input_data["physics"]

    # Reduce data: negative -> keep everything, zero -> limit files per run, positive -> limit events per run
    if max_events_per_run < 0:
        basf2.B2INFO("No file reduction applied.")
        reduced_file_to_iov_physics = file_to_iov_physics
    elif max_events_per_run == 0:
        basf2.B2INFO(f"Reducing to a maximum of {max_files_per_run} files per run.")
        reduced_file_to_iov_physics = filter_by_max_files_per_run(file_to_iov_physics,
                                                                  max_files_per_run, min_events_per_file)
    else:
        basf2.B2INFO(f"Reducing to a maximum of {max_events_per_run} events per run.")
        reduced_file_to_iov_physics = filter_by_max_events_per_run(file_to_iov_physics,
                                                                   max_events_per_run, random_select=True)

    input_iov_set_physics = set(reduced_file_to_iov_physics.values())
    exp_set = {iov.exp_low for iov in input_iov_set_physics}

    # boundaries setting for run chunks (At certain runs, gain was tuned)
    payload_boundaries = [ExpRun(output_iov.exp_low, output_iov.run_low)]
    payload_boundaries.extend([ExpRun(*boundary) for boundary in expert_config["payload_boundaries"]])
    # We don't need run 0 for the first exp as it's handled by output_iov
    payload_boundaries.extend([ExpRun(exp, 0) for exp in sorted(exp_set)[1:]])
    basf2.B2INFO(f"Final Boundaries: {payload_boundaries}")

    # run chunk creation: every boundary opens a half-open range [boundary, next boundary),
    # the last range being closed by an infinite (exp, run).
    boundary_ranges = list(zip(payload_boundaries, payload_boundaries[1:] + [ExpRun(inf, inf)]))

    def boundaries_containing(iov):
        """Return the start boundaries of all ranges that contain the given IoV (the grouping key)."""
        return [low for low, high in boundary_ranges if low <= iov < high]

    # Consecutive (sorted) IoVs sharing the same key end up in the same chunk
    iov_chunks = [list(group) for _, group in groupby(sorted(input_iov_set_physics), boundaries_containing)]

    # Create calibrations from chunks
    cal_list = []
    input_file_to_iov = reduced_file_to_iov_physics
    n_accepted_chunks = 0
    for ichunk, chunk in enumerate(iov_chunks):
        first_iov = IoV(chunk[0].exp_low, chunk[0].run_low, -1, -1)
        last_iov = IoV(chunk[-1].exp_low, chunk[-1].run_low, -1, -1)
        if last_iov < output_iov:  # All the chunk iovs are earlier than the requested
            continue

        # Select the files of this chunk, keeping the order of the input mapping.
        # A set gives constant-time membership tests instead of scanning the chunk list per file.
        chunk_iovs = set(chunk)
        input_files = [file_name for file_name in input_file_to_iov
                       if input_file_to_iov[file_name] in chunk_iovs]

        # Check the minimum number of files in the physics/beam run chunk
        if len(input_files) < min_files_per_chunk:
            basf2.B2WARNING(f"No enough file in sub run chunk [{chunk[0]},{chunk[-1]}]: {len(input_files)},"
                            f"but {min_files_per_chunk} required!")
            continue

        # The first accepted chunk covers the requested start; from the second accepted chunk on,
        # the iov is defined by the first run of the chunk
        specific_iov = first_iov if n_accepted_chunks > 0 else output_iov
        basf2.B2INFO(f"Total number of files actually used as input = {len(input_files)} for the output {specific_iov}")
        cal_name = f"{ichunk+1}_PXDAnalyticGainCalibration"
        if debug:
            # Dry run: only report what would have been created
            basf2.B2INFO(f"Dry run on Calibration(name={cal_name})")
        else:
            cal = gain_calibration(
                cal_name=cal_name,
                gain_method=gain_method,
                input_files=input_files,
                **cal_kwargs)
            for alg in cal.algorithms:
                alg.params["iov_coverage"] = specific_iov
            cal_list.append(cal)
        n_accepted_chunks += 1

    # The number of calibrations depends on the 'chunking' above. We would like to make sure that the total number of
    # batch jobs submitted is approximately constant and reasonable, no matter how many files and chunks are used.
    # So the configured total number of jobs is split between the calibrations depending on the fraction of total
    # input files in the calibrations.
    total_input_files = len(reduced_file_to_iov_physics)

    for cal in cal_list:
        fraction_of_input_files = len(cal.input_files) / total_input_files
        # Assign the max collector jobs to be roughly the same fraction of total jobs
        cal.max_collector_jobs = ceil(fraction_of_input_files * total_jobs)
        basf2.B2INFO(f"{cal.name} will submit a maximum of {cal.max_collector_jobs} batch jobs")

    return cal_list
pxd.calibration
Definition: __init__.py:1
prompt.utils
Definition: utils.py:1