10 Airflow script for automatic CDC dEdx calibration. It is currently for the electron based
11 calibration, where at present only RunGain, injection time, Cosine and WireGain are implimented.
12 The remaining two 2D and 1D will be implimented in the near future.
14 Second part called "Hadron calibration" are not compitable with CAF/AirFlow
15 and will be done offline for a while.
19 from ROOT
import gSystem
20 from ROOT.Belle2
import CDCDedxRunGainAlgorithm, CDCDedxCosineAlgorithm, CDCDedxWireGainAlgorithm
21 from ROOT.Belle2
import CDCDedxCosEdgeAlgorithm, CDCDedxBadWireAlgorithm, CDCDedxInjectTimeAlgorithm
22 from caf.framework
import Calibration
23 from caf.strategies
import SequentialRunByRun, SequentialBoundaries
24 from prompt
import CalibrationSettings, INPUT_DATA_FILTERS
25 import reconstruction
as recon
26 from random
import seed
29 gSystem.Load(
'libreconstruction.so')
30 ROOT.gROOT.SetBatch(
True)
32 settings = CalibrationSettings(
34 expert_username=
"renu",
36 input_data_formats=[
"cdst"],
37 input_data_names=[
"bhabha_all_calib"],
39 "payload_boundaries": [],
40 "calib_datamode":
False,
44 "adjustment": 1.00798,
46 "calibration_procedure": {
"rungain0": 0,
"rungain1": 0,
"rungain2": 0}
50 INPUT_DATA_FILTERS[
'Run Type'][
'physics'],
51 INPUT_DATA_FILTERS[
'Data Tag'][
'bhabha_all_calib'],
52 INPUT_DATA_FILTERS[
'Data Quality Tag'][
'Good Or Recoverable'],
53 INPUT_DATA_FILTERS[
'Magnet'][
'On'],
54 INPUT_DATA_FILTERS[
'Beam Energy'][
'4S'],
55 INPUT_DATA_FILTERS[
'Beam Energy'][
'Continuum'],
56 INPUT_DATA_FILTERS[
'Beam Energy'][
'Scan']]},
60 def get_calibrations(input_data, **kwargs):
61 """ REQUIRED FUNCTION used by b2caf-prompt-run tool
62 This function return a list of Calibration
63 objects we assign to the CAF process
67 file_to_iov_physics = input_data[
"bhabha_all_calib"]
69 expert_config = kwargs.get(
"expert_config")
70 calib_mode = expert_config[
"calib_mode"]
73 fulldataMode = expert_config[
"calib_datamode"]
74 adjustment = expert_config[
"adjustment"]
77 input_files_rungain = list(file_to_iov_physics.keys())
78 input_files_coscorr = list(file_to_iov_physics.keys())
79 input_files_wiregain = list(file_to_iov_physics.keys())
83 maxevt_rg = expert_config[
"maxevt_rg"]
84 maxevt_cc = expert_config[
"maxevt_cc"]
85 maxevt_wg = expert_config[
"maxevt_wg"]
87 from prompt.utils import filter_by_max_events_per_run, filter_by_select_max_events_from_files
90 max_files_for_maxevents = maxevt_rg
91 reduced_file_to_iov_rungain = filter_by_max_events_per_run(file_to_iov_physics, max_files_for_maxevents,
True)
92 input_files_rungain = list(reduced_file_to_iov_rungain.keys())
93 basf2.B2INFO(f
"Total number of files used for rungains = {len(input_files_rungain)}")
96 input_files_coscorr = filter_by_select_max_events_from_files(list(file_to_iov_physics.keys()), maxevt_cc)
97 basf2.B2INFO(f
"Total number of files used for cosine = {len(input_files_coscorr)}")
98 if not input_files_coscorr:
100 f
"Cosine: all requested ({maxevt_cc}) events not found")
103 if maxevt_wg == maxevt_cc:
104 input_files_wiregain = input_files_coscorr
106 input_files_wiregain = filter_by_select_max_events_from_files(list(file_to_iov_physics.keys()), maxevt_wg)
108 basf2.B2INFO(f
"Total number of files used for wiregains = {len(input_files_wiregain)}")
109 if not input_files_wiregain:
111 f
"WireGain: all requested ({maxevt_wg}) events not found")
113 requested_iov = kwargs.get(
"requested_iov",
None)
114 from caf.utils
import ExpRun, IoV
115 output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
117 payload_boundaries = [ExpRun(output_iov.exp_low, output_iov.run_low)]
118 payload_boundaries.extend([ExpRun(*boundary)
for boundary
in expert_config[
"payload_boundaries"]])
119 basf2.B2INFO(f
"Expert set payload boundaries are: {expert_config['payload_boundaries']}")
121 collector_granularity =
'all'
122 if expert_config[
"payload_boundaries"]
is not None:
123 basf2.B2INFO(
'Found payload_boundaries: set collector granularity to run')
124 collector_granularity =
'run'
126 if calib_mode ==
"full":
127 calibration_procedure = {
139 elif calib_mode ==
"quick":
140 calibration_procedure = {
150 elif calib_mode ==
"manual":
151 calibration_procedure = expert_config[
"calibration_procedure"]
153 basf2.B2FATAL(f
"Calibration mode is not defined {calib_mode}, should be full, quick, or manual")
155 calib_keys = list(calibration_procedure)
156 cals = [
None]*len(calib_keys)
157 basf2.B2INFO(f
"Run calibration mode = {calib_mode}:")
159 for i
in range(len(cals)):
160 max_iter = calibration_procedure[calib_keys[i]]
162 data_files = [input_files_rungain, input_files_coscorr, input_files_wiregain]
163 cal_name =
''.join([i
for i
in calib_keys[i]
if not i.isdigit()])
164 if cal_name ==
"rungain":
165 alg = [rungain_algo(calib_keys[i], adjustment)]
166 elif cal_name ==
"coscorr":
168 elif cal_name ==
"cosedge":
169 alg = [cosedge_algo()]
170 elif cal_name ==
"timegain":
171 alg = [injection_time_algo()]
172 elif cal_name ==
"badwire":
173 alg = [badwire_algo()]
174 elif cal_name ==
"wiregain":
175 alg = [wiregain_algo()]
177 basf2.B2FATAL(f
"The calibration is not defined, check spelling: calib {i}: {calib_keys[i]}")
179 basf2.B2INFO(f
"calibration for {calib_keys[i]} with number of iteration={max_iter}")
181 cals[i] = CDCDedxCalibration(name=calib_keys[i],
183 input_file_dict=data_files,
184 max_iterations=max_iter,
185 collector_granularity=collector_granularity,
186 dependencies=[cals[i-1]]
if i > 0
else None
188 if payload_boundaries:
189 basf2.B2INFO(
"Found payload_boundaries: calibration strategies set to SequentialBoundaries.")
190 if cal_name ==
"rungain" or cal_name ==
"timegain":
191 cals[i].strategies = SequentialRunByRun
192 for algorithm
in cals[i].algorithms:
193 algorithm.params = {
"iov_coverage": output_iov}
194 if calib_keys[i] ==
"rungain0" or calib_keys[i] ==
"rungain1" or calib_keys[i] ==
"timegain0":
195 cals[i].save_payloads =
False
197 cals[i].strategies = SequentialBoundaries
198 for algorithm
in cals[i].algorithms:
199 algorithm.params = {
"iov_coverage": output_iov,
"payload_boundaries": payload_boundaries}
200 if calib_keys[i] ==
"coscorr0":
201 cals[i].save_payloads =
False
204 for algorithm
in cals[i].algorithms:
205 algorithm.params = {
"apply_iov": output_iov}
211 def pre_collector(name='rg'):
213 Define pre collection.
215 name : name of the calibration
216 rungain rungain0 by Default.
218 path : path for pre collection
221 reco_path = basf2.create_path()
222 recon.prepare_cdst_analysis(path=reco_path)
223 if (name ==
"timegain"):
224 trg_bhabhaskim = reco_path.add_module(
"TriggerSkim", triggerLines=[
"software_trigger_cut&skim&accept_radee"])
225 trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
229 elif (name ==
"cosedge"):
230 trg_bhabhaskim = reco_path.add_module(
233 "software_trigger_cut&skim&accept_bhabha",
234 "software_trigger_cut&filter&ee_flat_90_180",
235 "software_trigger_cut&filter&ee_flat_0_19"])
236 trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
238 trg_bhabhaskim = reco_path.add_module(
"TriggerSkim", triggerLines=[
"software_trigger_cut&skim&accept_bhabha"])
239 trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
241 reco_path.add_module(
243 relativeCorrections=
False,
256 def collector(granularity='all', name=''):
258 Create a cdcdedx calibration collector
260 name : name of calibration
261 granularity : granularity : all or run
263 collector : collector module
266 from basf2
import register_module
267 col = register_module(
'CDCDedxElectronCollector', cleanupCuts=
True)
268 if name ==
"timegain":
269 CollParam = {
'isRun':
True,
'isInjTime':
True,
'granularity':
'run'}
271 elif name ==
"coscorr" or name ==
"cosedge":
272 CollParam = {
'isCharge':
True,
'isCosth':
True,
'granularity': granularity}
274 elif name ==
"badwire":
276 CollParam = {
'isWire':
True,
'isDedxhit': isHit,
'isADCcorr':
not isHit,
'granularity': granularity}
278 elif name ==
"wiregain":
279 CollParam = {
'isWire':
True,
'isDedxhit':
True,
'granularity': granularity}
282 CollParam = {
'isRun':
True,
'granularity':
'run'}
290 def rungain_algo(name, adjustment):
292 Create a rungain calibration algorithm.
294 algo : rungain algorithm
296 algo = CDCDedxRunGainAlgorithm()
297 algo.setMonitoringPlots(
True)
298 if name ==
"rungain2":
299 algo.setAdjustment(adjustment)
305 def injection_time_algo():
307 Create a injection time calibration algorithm.
309 algo : injection time algorithm
311 algo = CDCDedxInjectTimeAlgorithm()
312 algo.setMonitoringPlots(
True)
320 Create a cosine calibration algorithm.
322 algo : cosine algorithm
324 algo = CDCDedxCosineAlgorithm()
325 algo.setMonitoringPlots(
True)
333 Create a cosine edge calibration algorithm.
335 algo : cosine edge algorithm
337 algo = CDCDedxCosEdgeAlgorithm()
338 algo.setMonitoringPlots(
True)
346 Create a badwire calibration algorithm.
348 algo : badwire algorithm
350 algo = CDCDedxBadWireAlgorithm()
352 algo.setHighFracThres(0.2)
353 algo.setMeanThres(0.4)
354 algo.setRMSThres(0.4)
355 algo.setHistPars(150, 0, 5)
356 algo.setMonitoringPlots(
True)
364 Create a wire gain calibration algorithm.
366 algo : wiregain algorithm
368 algo = CDCDedxWireGainAlgorithm()
369 algo.enableExtraPlots(
True)
375 CDCDedxCalibration is a specialized calibration for cdcdedx.
384 collector_granularity='All'):
387 name: name of calibration
388 algorithims: algorithm of calibration
389 input_file_dict: input files list
390 max_iterations: maximum number of iterations
391 dependenices: depends on the previous calibration
392 collector_granularity: granularity : all or run
395 algorithms=algorithms
398 from caf.framework
import Collection
399 cal_name =
''.join([i
for i
in name
if not i.isdigit()])
400 if cal_name ==
"badwire" or cal_name ==
"wiregain":
401 collection =
Collection(collector=
collector(granularity=collector_granularity, name=cal_name),
402 input_files=input_file_dict[2],
403 pre_collector_path=pre_collector(cal_name)
405 elif cal_name ==
"coscorr" or cal_name ==
"cosedge":
406 collection =
Collection(collector=
collector(granularity=collector_granularity, name=cal_name),
407 input_files=input_file_dict[1],
408 pre_collector_path=pre_collector(cal_name)
411 collection =
Collection(collector=
collector(granularity=collector_granularity, name=cal_name),
412 input_files=input_file_dict[0],
413 pre_collector_path=pre_collector(cal_name)
415 self.add_collection(name=cal_name, collection=collection)
420 if dependencies
is not None:
421 for dep
in dependencies:
max_iterations
maximum iterations
def __init__(self, name, algorithms, input_file_dict, max_iterations=5, dependencies=None, collector_granularity='All')