Airflow script for automatic CDC dEdx calibration. It currently covers the electron-based
calibration, where at present only RunGain, injection time, Cosine and WireGain are implemented.
The remaining two, 2D and 1D, will be implemented in the near future.
The second part, called "Hadron calibration", is not compatible with CAF/AirFlow
and will be done offline for a while.
19 from ROOT
import gSystem
20 from ROOT.Belle2
import CDCDedxRunGainAlgorithm, CDCDedxCosineAlgorithm, CDCDedxWireGainAlgorithm
21 from ROOT.Belle2
import CDCDedxCosEdgeAlgorithm, CDCDedxBadWireAlgorithm, CDCDedxInjectTimeAlgorithm
22 from caf.framework
import Calibration
23 from caf.strategies
import SequentialRunByRun, SequentialBoundaries
24 from prompt
import CalibrationSettings, INPUT_DATA_FILTERS
25 import reconstruction
as recon
26 from random
import seed
29 gSystem.Load(
'libreconstruction.so')
30 ROOT.gROOT.SetBatch(
True)
32 settings = CalibrationSettings(
34 expert_username=
"renu",
36 input_data_formats=[
"cdst"],
37 input_data_names=[
"bhabha_all_calib"],
39 "payload_boundaries": [],
40 "calib_datamode":
False,
44 "adjustment": 1.00798,
46 "calibration_procedure": {
"rgtrail0": 0,
"rgpre0": 0,
"rg0": 0}
50 INPUT_DATA_FILTERS[
'Run Type'][
'physics'],
51 INPUT_DATA_FILTERS[
'Data Tag'][
'bhabha_all_calib'],
52 INPUT_DATA_FILTERS[
'Data Quality Tag'][
'Good Or Recoverable'],
53 INPUT_DATA_FILTERS[
'Magnet'][
'On'],
54 INPUT_DATA_FILTERS[
'Beam Energy'][
'4S'],
55 INPUT_DATA_FILTERS[
'Beam Energy'][
'Continuum'],
56 INPUT_DATA_FILTERS[
'Beam Energy'][
'Scan']]},
60 def get_calibrations(input_data, **kwargs):
    """ REQUIRED FUNCTION used by the b2caf-prompt-run tool.
    This function returns a list of Calibration
    objects that we assign to the CAF process.
67 file_to_iov_physics = input_data[
"bhabha_all_calib"]
69 expert_config = kwargs.get(
"expert_config")
70 calib_mode = expert_config[
"calib_mode"]
73 fulldataMode = expert_config[
"calib_datamode"]
74 adjustment = expert_config[
"adjustment"]
77 input_files_rungain = list(file_to_iov_physics.keys())
78 input_files_coscorr = list(file_to_iov_physics.keys())
79 input_files_wiregain = list(file_to_iov_physics.keys())
83 maxevt_rg = expert_config[
"maxevt_rg"]
84 maxevt_cc = expert_config[
"maxevt_cc"]
85 maxevt_wg = expert_config[
"maxevt_wg"]
87 from prompt.utils import filter_by_max_events_per_run, filter_by_select_max_events_from_files
90 max_files_for_maxevents = maxevt_rg
91 reduced_file_to_iov_rungain = filter_by_max_events_per_run(file_to_iov_physics, max_files_for_maxevents,
True)
92 input_files_rungain = list(reduced_file_to_iov_rungain.keys())
93 basf2.B2INFO(f
"Total number of files used for rungains = {len(input_files_rungain)}")
96 input_files_coscorr = filter_by_select_max_events_from_files(list(file_to_iov_physics.keys()), maxevt_cc)
97 basf2.B2INFO(f
"Total number of files used for cosine = {len(input_files_coscorr)}")
98 if not input_files_coscorr:
100 f
"Cosine: all requested ({maxevt_cc}) events not found")
103 if maxevt_wg == maxevt_cc:
104 input_files_wiregain = input_files_coscorr
106 input_files_wiregain = filter_by_select_max_events_from_files(list(file_to_iov_physics.keys()), maxevt_wg)
108 basf2.B2INFO(f
"Total number of files used for wiregains = {len(input_files_wiregain)}")
109 if not input_files_wiregain:
111 f
"WireGain: all requested ({maxevt_wg}) events not found")
113 requested_iov = kwargs.get(
"requested_iov",
None)
114 from caf.utils
import ExpRun, IoV
115 output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
117 payload_boundaries = [ExpRun(output_iov.exp_low, output_iov.run_low)]
118 payload_boundaries.extend([ExpRun(*boundary)
for boundary
in expert_config[
"payload_boundaries"]])
119 basf2.B2INFO(f
"Expert set payload boundaries are: {expert_config['payload_boundaries']}")
121 collector_granularity =
'all'
122 if expert_config[
"payload_boundaries"]
is not None:
123 basf2.B2INFO(
'Found payload_boundaries: set collector granularity to run')
124 collector_granularity =
'run'
126 if calib_mode ==
"full":
127 calibration_procedure = {
138 elif calib_mode ==
"quick":
139 calibration_procedure = {
147 elif calib_mode ==
"manual":
148 calibration_procedure = expert_config[
"calibration_procedure"]
150 basf2.B2FATAL(f
"Calibration mode is not defined {calib_mode}, should be quick, full, or manual")
152 calib_keys = list(calibration_procedure)
153 cals = [
None]*len(calib_keys)
154 basf2.B2INFO(f
"Run calibration mode = {calib_mode}:")
157 for i
in range(len(cals)):
158 max_iter = calibration_procedure[calib_keys[i]]
160 data_files = [input_files_rungain, input_files_coscorr, input_files_wiregain]
161 cal_name =
''.join([i
for i
in calib_keys[i]
if not i.isdigit()])
162 if cal_name ==
"rg" or cal_name ==
"rgtrail" or cal_name ==
"rgpre":
163 alg = [rungain_algo(cal_name, adjustment)]
164 elif cal_name ==
"cc":
166 elif cal_name ==
"ce":
167 alg = [cosedge_algo()]
168 elif cal_name ==
"tg" or cal_name ==
"tgpre":
169 alg = [injection_time_algo()]
170 elif cal_name ==
"bd":
171 alg = [badwire_algo()]
172 elif cal_name ==
"wg":
173 alg = [wiregain_algo()]
175 basf2.B2FATAL(f
"The calibration is not defined, check spelling: calib {i}: {calib_keys[i]}")
177 basf2.B2INFO(f
"calibration for {calib_keys[i]} with number of iteration={max_iter}")
179 cals[i] = CDCDedxCalibration(name=cal_name,
181 input_file_dict=data_files,
182 max_iterations=max_iter,
183 collector_granularity=collector_granularity,
184 dependencies=[cals[i-1]]
if i > 0
else None
186 if payload_boundaries:
187 basf2.B2INFO(
"Found payload_boundaries: calibration strategies set to SequentialBoundaries.")
188 if cal_name ==
"rg" or cal_name ==
"rgtrail" or cal_name ==
"rgpre" or cal_name ==
"tg" or cal_name ==
"tgpre":
189 cals[i].strategies = SequentialRunByRun
190 for algorithm
in cals[i].algorithms:
191 algorithm.params = {
"iov_coverage": output_iov}
192 if cal_name ==
"rgtrail" or cal_name ==
"rgpre" or cal_name ==
"tgpre":
193 cals[i].save_payloads =
False
195 cals[i].strategies = SequentialBoundaries
196 for algorithm
in cals[i].algorithms:
197 algorithm.params = {
"iov_coverage": output_iov,
"payload_boundaries": payload_boundaries}
200 for algorithm
in cals[i].algorithms:
201 algorithm.params = {
"apply_iov": output_iov}
207 def pre_collector(name='rg'):
209 Define pre collection.
211 name : name of the calibration
212 rungain rg0 by Default.
214 path : path for pre collection
217 reco_path = basf2.create_path()
218 recon.prepare_cdst_analysis(path=reco_path)
219 if(name ==
"tg" or name ==
"tgpre"):
220 trg_bhabhaskim = reco_path.add_module(
"TriggerSkim", triggerLines=[
"software_trigger_cut&skim&accept_radee"])
221 trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
226 trg_bhabhaskim = reco_path.add_module(
229 "software_trigger_cut&skim&accept_bhabha",
230 "software_trigger_cut&filter&ee_flat_90_180",
231 "software_trigger_cut&filter&ee_flat_0_19"])
232 trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
234 trg_bhabhaskim = reco_path.add_module(
"TriggerSkim", triggerLines=[
"software_trigger_cut&skim&accept_bhabha"])
235 trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
237 reco_path.add_module(
239 relativeCorrections=
False,
252 def collector(granularity='all', name=''):
254 Create a cdcdedx calibration collector
256 name : name of calibration
257 granularity : granularity : all or run
259 collector : collector module
262 from basf2
import register_module
263 col = register_module(
'CDCDedxElectronCollector', cleanupCuts=
True)
264 if name ==
"tg" or name ==
"tgpre":
265 CollParam = {
'isRun':
True,
'isInjTime':
True,
'granularity':
'run'}
267 elif name ==
"cc" or name ==
"ce":
268 CollParam = {
'isCharge':
True,
'isCosth':
True,
'granularity': granularity}
272 CollParam = {
'isWire':
True,
'isDedxhit': isHit,
'isADCcorr':
not isHit,
'granularity': granularity}
275 CollParam = {
'isWire':
True,
'isDedxhit':
True,
'granularity': granularity}
278 CollParam = {
'isRun':
True,
'granularity':
'run'}
286 def rungain_algo(name, adjustment):
288 Create a rungain calibration algorithm.
290 algo : rungain algorithm
292 algo = CDCDedxRunGainAlgorithm()
293 algo.setMonitoringPlots(
True)
295 algo.setAdjustment(adjustment)
def injection_time_algo():
    """
    Create an injection time calibration algorithm.

    Returns:
        algo : injection time algorithm (a CDCDedxInjectTimeAlgorithm
               instance with monitoring plots switched on)
    """
    algo = CDCDedxInjectTimeAlgorithm()
    algo.setMonitoringPlots(True)
    # The caller wraps the result in a list of algorithms, so the
    # configured instance has to be handed back explicitly.
    return algo
316 Create a cosine calibration algorithm.
318 algo : cosine algorithm
320 algo = CDCDedxCosineAlgorithm()
321 algo.setMonitoringPlots(
True)
329 Create a cosine edge calibration algorithm.
331 algo : cosine edge algorithm
333 algo = CDCDedxCosEdgeAlgorithm()
334 algo.setMonitoringPlots(
True)
342 Create a badwire calibration algorithm.
344 algo : badwire algorithm
346 algo = CDCDedxBadWireAlgorithm()
348 algo.setHighFracThres(0.2)
349 algo.setMeanThres(0.4)
350 algo.setRMSThres(0.4)
351 algo.setHistPars(150, 0, 5)
352 algo.setMonitoringPlots(
True)
360 Create a wire gain calibration algorithm.
362 algo : wiregain algorithm
364 algo = CDCDedxWireGainAlgorithm()
365 algo.enableExtraPlots(
True)
371 CDCDedxCalibration is a specialized calibration for cdcdedx.
380 collector_granularity='All'):
        name: name of calibration
        algorithms: algorithms of the calibration
        input_file_dict: input files list
        max_iterations: maximum number of iterations
        dependencies: calibrations this one depends on (the previous calibration)
        collector_granularity: granularity : all or run
391 algorithms=algorithms
394 from caf.framework
import Collection
396 if name ==
"bd" or name ==
"wg":
398 input_files=input_file_dict[2],
399 pre_collector_path=pre_collector(name)
401 elif name ==
"cc" or name ==
"ce":
403 input_files=input_file_dict[1],
404 pre_collector_path=pre_collector(name)
408 input_files=input_file_dict[0],
409 pre_collector_path=pre_collector(name)
411 self.add_collection(name=name, collection=collection)
416 if dependencies
is not None:
417 for dep
in dependencies:
max_iterations
maximum iterations
def __init__(self, name, algorithms, input_file_dict, max_iterations=5, dependencies=None, collector_granularity='All')