10Airflow script for automatic CDC dEdx calibration. It is currently for the electron based
11calibration, where at present only RunGain, injection time, Cosine and WireGain are implemented.
12The remaining two 2D and 1D will be implemented in the near future.
14Second part called "Hadron calibration" are not compatible with CAF/AirFlow
15and will be done offline for a while.
19from ROOT
import gSystem
20from ROOT.Belle2
import CDCDedxRunGainAlgorithm, CDCDedxCosineAlgorithm, CDCDedxWireGainAlgorithm
21from ROOT.Belle2
import CDCDedxCosEdgeAlgorithm, CDCDedxBadWireAlgorithm, CDCDedxInjectTimeAlgorithm
22from ROOT.Belle2
import CDCDedx1DCellAlgorithm
24from caf.framework
import Calibration
25from caf.strategies
import SequentialRunByRun, SequentialBoundaries
26from prompt
import CalibrationSettings, INPUT_DATA_FILTERS
27import reconstruction
as recon
28from random
import seed
31gSystem.Load(
'libreconstruction.so')
32ROOT.gROOT.SetBatch(
True)
34settings = CalibrationSettings(
36 expert_username=
"renu",
38 input_data_formats=[
"cdst"],
39 input_data_names=[
"bhabha_all_calib"],
41 "payload_boundaries": [],
42 "calib_datamode":
False,
46 "adjustment": 1.00798,
48 "calibration_procedure": {
"rungain0": 0,
"rungain1": 0,
"rungain2": 0}
52 INPUT_DATA_FILTERS[
'Run Type'][
'physics'],
53 INPUT_DATA_FILTERS[
'Data Tag'][
'bhabha_all_calib'],
54 INPUT_DATA_FILTERS[
'Data Quality Tag'][
'Good Or Recoverable'],
55 INPUT_DATA_FILTERS[
'Magnet'][
'On'],
56 INPUT_DATA_FILTERS[
'Beam Energy'][
'4S'],
57 INPUT_DATA_FILTERS[
'Beam Energy'][
'Continuum'],
58 INPUT_DATA_FILTERS[
'Beam Energy'][
'Scan']]},
62def get_calibrations(input_data, **kwargs):
63 """ REQUIRED FUNCTION used by b2caf-prompt-run tool
64 This function return a list of Calibration
65 objects we assign to the CAF process
69 file_to_iov_physics = input_data[
"bhabha_all_calib"]
71 expert_config = kwargs.get(
"expert_config")
72 calib_mode = expert_config[
"calib_mode"]
75 fulldataMode = expert_config[
"calib_datamode"]
76 adjustment = expert_config[
"adjustment"]
79 input_files_rungain = list(file_to_iov_physics.keys())
80 input_files_coscorr = list(file_to_iov_physics.keys())
81 input_files_wiregain = list(file_to_iov_physics.keys())
85 maxevt_rg = expert_config[
"maxevt_rg"]
86 maxevt_cc = expert_config[
"maxevt_cc"]
87 maxevt_wg = expert_config[
"maxevt_wg"]
89 from prompt.utils import filter_by_max_events_per_run, filter_by_select_max_events_from_files
92 max_files_for_maxevents = maxevt_rg
93 reduced_file_to_iov_rungain = filter_by_max_events_per_run(file_to_iov_physics, max_files_for_maxevents,
True)
94 input_files_rungain = list(reduced_file_to_iov_rungain.keys())
95 basf2.B2INFO(f
"Total number of files used for rungains = {len(input_files_rungain)}")
98 input_files_coscorr = filter_by_select_max_events_from_files(list(file_to_iov_physics.keys()), maxevt_cc)
99 basf2.B2INFO(f
"Total number of files used for cosine = {len(input_files_coscorr)}")
100 if not input_files_coscorr:
102 f
"Cosine: all requested ({maxevt_cc}) events not found")
105 if maxevt_wg == maxevt_cc:
106 input_files_wiregain = input_files_coscorr
108 input_files_wiregain = filter_by_select_max_events_from_files(list(file_to_iov_physics.keys()), maxevt_wg)
110 basf2.B2INFO(f
"Total number of files used for wiregains = {len(input_files_wiregain)}")
111 if not input_files_wiregain:
113 f
"WireGain: all requested ({maxevt_wg}) events not found")
115 requested_iov = kwargs.get(
"requested_iov",
None)
116 from caf.utils
import ExpRun, IoV
117 output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
119 payload_boundaries = [ExpRun(output_iov.exp_low, output_iov.run_low)]
120 payload_boundaries.extend([ExpRun(*boundary)
for boundary
in expert_config[
"payload_boundaries"]])
121 basf2.B2INFO(f
"Expert set payload boundaries are: {expert_config['payload_boundaries']}")
123 collector_granularity =
'all'
124 if expert_config[
"payload_boundaries"]
is not None:
125 basf2.B2INFO(
'Found payload_boundaries: set collector granularity to run')
126 collector_granularity =
'run'
128 if calib_mode ==
"full":
129 calibration_procedure = {
142 elif calib_mode ==
"quick":
143 calibration_procedure = {
153 elif calib_mode ==
"manual":
154 calibration_procedure = expert_config[
"calibration_procedure"]
156 basf2.B2FATAL(f
"Calibration mode is not defined {calib_mode}, should be full, quick, or manual")
158 calib_keys = list(calibration_procedure)
159 cals = [
None]*len(calib_keys)
160 basf2.B2INFO(f
"Run calibration mode = {calib_mode}:")
162 for i
in range(len(cals)):
163 max_iter = calibration_procedure[calib_keys[i]]
165 data_files = [input_files_rungain, input_files_coscorr, input_files_wiregain]
166 cal_name =
''.join([i
for i
in calib_keys[i]
if not i.isdigit()])
167 if cal_name ==
"rungain":
168 alg = [rungain_algo(calib_keys[i], adjustment)]
169 elif cal_name ==
"coscorr":
171 elif cal_name ==
"cosedge":
172 alg = [cosedge_algo()]
173 elif cal_name ==
"timegain":
174 alg = [injection_time_algo()]
175 elif cal_name ==
"badwire":
176 alg = [badwire_algo()]
177 elif cal_name ==
"wiregain":
178 alg = [wiregain_algo()]
179 elif cal_name ==
"onedcell":
180 alg = [onedcell_algo()]
182 basf2.B2FATAL(f
"The calibration is not defined, check spelling: calib {i}: {calib_keys[i]}")
184 basf2.B2INFO(f
"calibration for {calib_keys[i]} with number of iteration={max_iter}")
186 cals[i] = CDCDedxCalibration(name=calib_keys[i],
188 input_file_dict=data_files,
189 max_iterations=max_iter,
190 collector_granularity=collector_granularity,
191 dependencies=[cals[i-1]]
if i > 0
else None
193 if payload_boundaries:
194 basf2.B2INFO(
"Found payload_boundaries: calibration strategies set to SequentialBoundaries.")
195 if cal_name ==
"rungain" or cal_name ==
"timegain":
196 cals[i].strategies = SequentialRunByRun
197 for algorithm
in cals[i].algorithms:
198 algorithm.params = {
"iov_coverage": output_iov}
199 if calib_keys[i] ==
"rungain0" or calib_keys[i] ==
"rungain1" or calib_keys[i] ==
"timegain0":
200 cals[i].save_payloads =
False
202 cals[i].strategies = SequentialBoundaries
203 for algorithm
in cals[i].algorithms:
204 algorithm.params = {
"iov_coverage": output_iov,
"payload_boundaries": payload_boundaries}
205 if calib_keys[i] ==
"coscorr0":
206 cals[i].save_payloads =
False
209 for algorithm
in cals[i].algorithms:
210 algorithm.params = {
"apply_iov": output_iov}
216def pre_collector(name='rg'):
218 Define pre collection.
220 name : name of the calibration
221 rungain rungain0 by Default.
223 path : path for pre collection
226 reco_path = basf2.create_path()
227 recon.prepare_cdst_analysis(path=reco_path)
228 if (name ==
"timegain" or name ==
"onedcell"):
229 trg_bhabhaskim = reco_path.add_module(
"TriggerSkim", triggerLines=[
"software_trigger_cut&skim&accept_radee"])
230 trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
231 ps_bhabhaskim = reco_path.add_module(
"Prescale", prescale=0.80)
232 ps_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
234 elif (name ==
"cosedge"):
235 trg_bhabhaskim = reco_path.add_module(
238 "software_trigger_cut&skim&accept_bhabha",
239 "software_trigger_cut&filter&ee_flat_90_180",
240 "software_trigger_cut&filter&ee_flat_0_19"])
241 trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
243 trg_bhabhaskim = reco_path.add_module(
"TriggerSkim", triggerLines=[
"software_trigger_cut&skim&accept_bhabha"])
244 trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
246 reco_path.add_module(
248 relativeCorrections=
False,
261def collector(granularity='all', name=''):
263 Create a cdcdedx calibration collector
265 name : name of calibration
266 granularity : granularity : all or run
268 collector : collector module
271 from basf2
import register_module
272 col = register_module(
'CDCDedxElectronCollector', cleanupCuts=
True)
273 if name ==
"timegain":
274 CollParam = {
'isRun':
True,
'isInjTime':
True,
'granularity':
'run'}
276 elif name ==
"coscorr" or name ==
"cosedge":
277 CollParam = {
'isCharge':
True,
'isCosth':
True,
'granularity': granularity}
279 elif name ==
"badwire":
281 CollParam = {
'isWire':
True,
'isDedxhit': isHit,
'isADCcorr':
not isHit,
'granularity': granularity}
283 elif name ==
"wiregain":
284 CollParam = {
'isWire':
True,
'isDedxhit':
True,
'granularity': granularity}
286 elif name ==
"onedcell":
293 'granularity': granularity}
296 CollParam = {
'isRun':
True,
'granularity':
'run'}
304def rungain_algo(name, adjustment):
306 Create a rungain calibration algorithm.
308 algo : rungain algorithm
310 algo = CDCDedxRunGainAlgorithm()
311 algo.setMonitoringPlots(True)
312 if name ==
"rungain2":
313 algo.setAdjustment(adjustment)
319def injection_time_algo():
321 Create a injection time calibration algorithm.
323 algo : injection time algorithm
325 algo = CDCDedxInjectTimeAlgorithm()
326 algo.setMonitoringPlots(True)
334 Create a cosine calibration algorithm.
336 algo : cosine algorithm
338 algo = CDCDedxCosineAlgorithm()
339 algo.setMonitoringPlots(True)
347 Create a cosine edge calibration algorithm.
349 algo : cosine edge algorithm
351 algo = CDCDedxCosEdgeAlgorithm()
352 algo.setMonitoringPlots(True)
360 Create a badwire calibration algorithm.
362 algo : badwire algorithm
364 algo = CDCDedxBadWireAlgorithm()
366 algo.setHighFracThres(0.2)
367 algo.setMeanThres(0.4)
368 algo.setRMSThres(0.4)
369 algo.setHistPars(150, 0, 5)
370 algo.setMonitoringPlots(
True)
378 Create a wire gain calibration algorithm.
380 algo : wiregain algorithm
382 algo = CDCDedxWireGainAlgorithm()
383 algo.enableExtraPlots(True)
389 Create oned cell calibration algorithm.
391 algo : oned cell correction algorithm
393 algo = CDCDedx1DCellAlgorithm()
394 algo.enableExtraPlots(True)
395 algo.setMergePayload(
True)
401 CDCDedxCalibration is a specialized calibration
for cdcdedx.
410 collector_granularity='All'):
413 name: name of calibration
414 algorithms: algorithm of calibration
415 input_file_dict: input files list
416 max_iterations: maximum number of iterations
417 dependencies: depends on the previous calibration
418 collector_granularity: granularity : all or run
421 algorithms=algorithms
424 from caf.framework
import Collection
425 cal_name =
''.join([i
for i
in name
if not i.isdigit()])
426 if cal_name ==
"badwire" or cal_name ==
"wiregain":
427 collection =
Collection(collector=
collector(granularity=collector_granularity, name=cal_name),
428 input_files=input_file_dict[2],
429 pre_collector_path=pre_collector(cal_name)
431 elif cal_name ==
"coscorr" or cal_name ==
"cosedge" or cal_name ==
"onedcell":
432 collection =
Collection(collector=
collector(granularity=collector_granularity, name=cal_name),
433 input_files=input_file_dict[1],
434 pre_collector_path=pre_collector(cal_name)
437 collection =
Collection(collector=
collector(granularity=collector_granularity, name=cal_name),
438 input_files=input_file_dict[0],
439 pre_collector_path=pre_collector(cal_name)
441 self.add_collection(name=cal_name, collection=collection)
446 if dependencies
is not None:
447 for dep
in dependencies:
max_iterations
maximum iterations
def __init__(self, name, algorithms, input_file_dict, max_iterations=5, dependencies=None, collector_granularity='All')