"""
Airflow script for automatic CDC dEdx calibration. It is the electron-based
calibration, where at present only RunGain, injection time, Cosine, WireGain and 1D are implemented.
The remaining two 2D calibrations will be implemented in the near future.
"""
from random import seed

import ROOT
from ROOT import gSystem
from ROOT.Belle2 import CDCDedxRunGainAlgorithm, CDCDedxCosineAlgorithm, CDCDedxWireGainAlgorithm
from ROOT.Belle2 import CDCDedxCosEdgeAlgorithm, CDCDedxBadWireAlgorithm, CDCDedxInjectTimeAlgorithm
from ROOT.Belle2 import CDCDedx1DCellAlgorithm

from caf.framework import Calibration
from caf.strategies import SequentialRunByRun, SequentialBoundaries
from prompt import CalibrationSettings, INPUT_DATA_FILTERS
import reconstruction as recon
import basf2
# Load the reconstruction library so the ROOT.Belle2 dE/dx algorithm classes
# imported above are available, and switch ROOT to batch mode.
gSystem.Load('libreconstruction.so')
ROOT.gROOT.SetBatch(True)
31settings = CalibrationSettings(
33 expert_username=
"renu",
35 input_data_formats=[
"cdst"],
36 input_data_names=[
"bhabha_all_calib"],
38 "payload_boundaries": [],
39 "calib_datamode":
False,
43 "adjustment": 1.00798,
45 "calibration_procedure": {
"rungain0": 0,
"rungain1": 0,
"rungain2": 0}
49 INPUT_DATA_FILTERS[
'Run Type'][
'physics'],
50 INPUT_DATA_FILTERS[
'Data Tag'][
'bhabha_all_calib'],
51 INPUT_DATA_FILTERS[
'Data Quality Tag'][
'Good Or Recoverable'],
52 INPUT_DATA_FILTERS[
'Magnet'][
'On'],
53 INPUT_DATA_FILTERS[
'Beam Energy'][
'4S'],
54 INPUT_DATA_FILTERS[
'Beam Energy'][
'Continuum'],
55 INPUT_DATA_FILTERS[
'Beam Energy'][
'Scan']]},
59def get_calibrations(input_data, **kwargs):
60 """ REQUIRED FUNCTION used by b2caf-prompt-run tool
61 This function return a list of Calibration
62 objects we assign to the CAF process
66 file_to_iov_physics = input_data[
"bhabha_all_calib"]
68 expert_config = kwargs.get(
"expert_config")
69 calib_mode = expert_config[
"calib_mode"]
72 fulldataMode = expert_config[
"calib_datamode"]
73 adjustment = expert_config[
"adjustment"]
76 input_files_rungain = list(file_to_iov_physics.keys())
77 input_files_coscorr = list(file_to_iov_physics.keys())
78 input_files_wiregain = list(file_to_iov_physics.keys())
82 maxevt_rg = expert_config[
"maxevt_rg"]
83 maxevt_cc = expert_config[
"maxevt_cc"]
84 maxevt_wg = expert_config[
"maxevt_wg"]
86 from prompt.utils import filter_by_max_events_per_run, filter_by_select_max_events_from_files
89 max_files_for_maxevents = maxevt_rg
90 reduced_file_to_iov_rungain = filter_by_max_events_per_run(file_to_iov_physics, max_files_for_maxevents,
True)
91 input_files_rungain = list(reduced_file_to_iov_rungain.keys())
92 basf2.B2INFO(f
"Total number of files used for rungains = {len(input_files_rungain)}")
95 input_files_coscorr = filter_by_select_max_events_from_files(list(file_to_iov_physics.keys()), maxevt_cc)
96 basf2.B2INFO(f
"Total number of files used for cosine = {len(input_files_coscorr)}")
97 if not input_files_coscorr:
99 f
"Cosine: all requested ({maxevt_cc}) events not found")
102 if maxevt_wg == maxevt_cc:
103 input_files_wiregain = input_files_coscorr
105 input_files_wiregain = filter_by_select_max_events_from_files(list(file_to_iov_physics.keys()), maxevt_wg)
107 basf2.B2INFO(f
"Total number of files used for wiregains = {len(input_files_wiregain)}")
108 if not input_files_wiregain:
110 f
"WireGain: all requested ({maxevt_wg}) events not found")
112 requested_iov = kwargs.get(
"requested_iov",
None)
113 from caf.utils
import ExpRun, IoV
114 output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
116 payload_boundaries = [ExpRun(output_iov.exp_low, output_iov.run_low)]
117 payload_boundaries.extend([ExpRun(*boundary)
for boundary
in expert_config[
"payload_boundaries"]])
118 basf2.B2INFO(f
"Expert set payload boundaries are: {expert_config['payload_boundaries']}")
120 collector_granularity =
'all'
121 if expert_config[
"payload_boundaries"]
is not None:
122 basf2.B2INFO(
'Found payload_boundaries: set collector granularity to run')
123 collector_granularity =
'run'
125 if calib_mode ==
"full":
126 calibration_procedure = {
139 elif calib_mode ==
"quick":
140 calibration_procedure = {
150 elif calib_mode ==
"manual":
151 calibration_procedure = expert_config[
"calibration_procedure"]
153 basf2.B2FATAL(f
"Calibration mode is not defined {calib_mode}, should be full, quick, or manual")
155 calib_keys = list(calibration_procedure)
156 cals = [
None]*len(calib_keys)
157 basf2.B2INFO(f
"Run calibration mode = {calib_mode}:")
159 for i
in range(len(cals)):
160 max_iter = calibration_procedure[calib_keys[i]]
162 data_files = [input_files_rungain, input_files_coscorr, input_files_wiregain]
163 cal_name =
''.join([i
for i
in calib_keys[i]
if not i.isdigit()])
164 if cal_name ==
"rungain":
165 alg = [rungain_algo(calib_keys[i], adjustment)]
166 elif cal_name ==
"coscorr":
168 elif cal_name ==
"cosedge":
169 alg = [cosedge_algo()]
170 elif cal_name ==
"timegain":
171 alg = [injection_time_algo()]
172 elif cal_name ==
"badwire":
173 alg = [badwire_algo()]
174 elif cal_name ==
"wiregain":
175 alg = [wiregain_algo()]
176 elif cal_name ==
"onedcell":
177 alg = [onedcell_algo()]
179 basf2.B2FATAL(f
"The calibration is not defined, check spelling: calib {i}: {calib_keys[i]}")
181 basf2.B2INFO(f
"calibration for {calib_keys[i]} with number of iteration={max_iter}")
183 cals[i] = CDCDedxCalibration(name=calib_keys[i],
185 input_file_dict=data_files,
186 max_iterations=max_iter,
187 collector_granularity=collector_granularity,
188 dependencies=[cals[i-1]]
if i > 0
else None
190 if payload_boundaries:
191 basf2.B2INFO(
"Found payload_boundaries: calibration strategies set to SequentialBoundaries.")
192 if cal_name ==
"rungain" or cal_name ==
"timegain":
193 cals[i].strategies = SequentialRunByRun
194 for algorithm
in cals[i].algorithms:
195 algorithm.params = {
"iov_coverage": output_iov}
196 if calib_keys[i] ==
"rungain0" or calib_keys[i] ==
"rungain1" or calib_keys[i] ==
"timegain0":
197 cals[i].save_payloads =
False
199 cals[i].strategies = SequentialBoundaries
200 for algorithm
in cals[i].algorithms:
201 algorithm.params = {
"iov_coverage": output_iov,
"payload_boundaries": payload_boundaries}
202 if calib_keys[i] ==
"coscorr0":
203 cals[i].save_payloads =
False
206 for algorithm
in cals[i].algorithms:
207 algorithm.params = {
"apply_iov": output_iov}
213def pre_collector(name='rg'):
215 Define pre collection.
217 name : name of the calibration
218 rungain rungain0 by Default.
220 path : path for pre collection
223 reco_path = basf2.create_path()
224 recon.prepare_cdst_analysis(path=reco_path)
225 if (name ==
"timegain" or name ==
"onedcell"):
226 trg_bhabhaskim = reco_path.add_module(
"TriggerSkim", triggerLines=[
"software_trigger_cut&skim&accept_radee"])
227 trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
228 ps_bhabhaskim = reco_path.add_module(
"Prescale", prescale=0.80)
229 ps_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
231 elif (name ==
"cosedge"):
232 trg_bhabhaskim = reco_path.add_module(
235 "software_trigger_cut&skim&accept_bhabha",
236 "software_trigger_cut&filter&ee_flat_90_180",
237 "software_trigger_cut&filter&ee_flat_0_19"])
238 trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
240 trg_bhabhaskim = reco_path.add_module(
"TriggerSkim", triggerLines=[
"software_trigger_cut&skim&accept_bhabha"])
241 trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
243 reco_path.add_module(
245 relativeCorrections=
False,
258def collector(granularity='all', name=''):
260 Create a cdcdedx calibration collector
262 name : name of calibration
263 granularity : granularity : all or run
265 collector : collector module
268 from basf2
import register_module
269 col = register_module(
'CDCDedxElectronCollector', cleanupCuts=
True)
270 if name ==
"timegain":
271 CollParam = {
'isRun':
True,
'isInjTime':
True,
'granularity':
'run'}
273 elif name ==
"coscorr" or name ==
"cosedge":
274 CollParam = {
'isCharge':
True,
'isCosth':
True,
'granularity': granularity}
276 elif name ==
"badwire":
278 CollParam = {
'isWire':
True,
'isDedxhit': isHit,
'isADCcorr':
not isHit,
'granularity': granularity}
280 elif name ==
"wiregain":
281 CollParam = {
'isWire':
True,
'isDedxhit':
True,
'granularity': granularity}
283 elif name ==
"onedcell":
290 'granularity': granularity}
293 CollParam = {
'isRun':
True,
'granularity':
'run'}
def rungain_algo(name, adjustment):
    """
    Create a rungain calibration algorithm.

    Parameters:
        name : key of the calibration step (callers pass e.g. "rungain0", "rungain2")
        adjustment : value handed to setAdjustment; only used when name is "rungain2"

    Returns:
        algo : rungain algorithm
    """
    algo = CDCDedxRunGainAlgorithm()
    algo.setMonitoringPlots(True)
    # The adjustment is applied on the "rungain2" step only; every other
    # rungain step keeps the algorithm's own default.
    if name == "rungain2":
        algo.setAdjustment(adjustment)
    # The caller wraps the result in a list of algorithms, so it must be returned.
    return algo
def injection_time_algo():
    """
    Create an injection time calibration algorithm.

    Returns:
        algo : injection time algorithm
    """
    algo = CDCDedxInjectTimeAlgorithm()
    algo.setMonitoringPlots(True)
    return algo
# NOTE(review): the `def` line of this function and its call site (the
# "coscorr" branch of get_calibrations) are both missing from the listing;
# the name `cos_algo` is assumed by analogy with the sibling factories
# (cosedge_algo, badwire_algo, ...) - confirm against the call site.
def cos_algo():
    """
    Create a cosine calibration algorithm.

    Returns:
        algo : cosine algorithm
    """
    algo = CDCDedxCosineAlgorithm()
    algo.setMonitoringPlots(True)
    return algo
def cosedge_algo():
    """
    Create a cosine edge calibration algorithm.

    Returns:
        algo : cosine edge algorithm
    """
    algo = CDCDedxCosEdgeAlgorithm()
    algo.setMonitoringPlots(True)
    return algo
def badwire_algo():
    """
    Create a badwire calibration algorithm.

    Returns:
        algo : badwire algorithm
    """
    algo = CDCDedxBadWireAlgorithm()
    # Thresholds and histogram parameters are kept exactly as configured in
    # the original script.
    # NOTE(review): one line between the constructor and setHighFracThres is
    # missing from the listing - presumably a comment; confirm.
    algo.setHighFracThres(0.2)
    algo.setMeanThres(0.4)
    algo.setRMSThres(0.4)
    algo.setHistPars(150, 0, 5)
    algo.setMonitoringPlots(True)
    return algo
def wiregain_algo():
    """
    Create a wire gain calibration algorithm.

    Returns:
        algo : wiregain algorithm
    """
    algo = CDCDedxWireGainAlgorithm()
    algo.enableExtraPlots(True)
    return algo
def onedcell_algo():
    """
    Create a oned cell calibration algorithm.

    Returns:
        algo : oned cell correction algorithm
    """
    algo = CDCDedx1DCellAlgorithm()
    algo.enableExtraPlots(True)
    algo.setMergePayload(True)
    return algo
398 CDCDedxCalibration is a specialized calibration
for cdcdedx.
407 collector_granularity='All'):
410 name: name of calibration
411 algorithms: algorithm of calibration
412 input_file_dict: input files list
413 max_iterations: maximum number of iterations
414 dependencies: depends on the previous calibration
415 collector_granularity: granularity : all or run
418 algorithms=algorithms
421 from caf.framework
import Collection
422 cal_name =
''.join([i
for i
in name
if not i.isdigit()])
423 if cal_name ==
"badwire" or cal_name ==
"wiregain":
424 collection =
Collection(collector=
collector(granularity=collector_granularity, name=cal_name),
425 input_files=input_file_dict[2],
426 pre_collector_path=pre_collector(cal_name)
428 elif cal_name ==
"coscorr" or cal_name ==
"cosedge" or cal_name ==
"onedcell":
429 collection =
Collection(collector=
collector(granularity=collector_granularity, name=cal_name),
430 input_files=input_file_dict[1],
431 pre_collector_path=pre_collector(cal_name)
434 collection =
Collection(collector=
collector(granularity=collector_granularity, name=cal_name),
435 input_files=input_file_dict[0],
436 pre_collector_path=pre_collector(cal_name)
438 self.add_collection(name=cal_name, collection=collection)
443 if dependencies
is not None:
444 for dep
in dependencies:
max_iterations
maximum iterations
def __init__(self, name, algorithms, input_file_dict, max_iterations=5, dependencies=None, collector_granularity='All')