10 Airflow script for automatic CDC dEdx calibration. It is currently for the electron based 
   11 calibration, where at present only RunGain, injection time, Cosine and WireGain are implemented. 
   12 The remaining two, 2D and 1D, will be implemented in the near future. 
   14 The second part, called "Hadron calibration", is not compatible with CAF/AirFlow 
   15 and will be done offline for a while. 
   19 from ROOT 
import gSystem
 
   20 from ROOT.Belle2 
import CDCDedxRunGainAlgorithm, CDCDedxCosineAlgorithm, CDCDedxWireGainAlgorithm
 
   21 from ROOT.Belle2 
import CDCDedxCosEdgeAlgorithm, CDCDedxBadWireAlgorithm, CDCDedxInjectTimeAlgorithm
 
   22 from ROOT.Belle2 
import CDCDedx1DCellAlgorithm
 
   24 from caf.framework 
import Calibration
 
   25 from caf.strategies 
import SequentialRunByRun, SequentialBoundaries
 
   26 from prompt 
import CalibrationSettings, INPUT_DATA_FILTERS
 
   27 import reconstruction 
as recon
 
   28 from random 
import seed
 
   31 gSystem.Load(
'libreconstruction.so')
 
   32 ROOT.gROOT.SetBatch(
True)
 
   34 settings = CalibrationSettings(
 
   36     expert_username=
"renu",
 
   38     input_data_formats=[
"cdst"],
 
   39     input_data_names=[
"bhabha_all_calib"],
 
   41         "payload_boundaries": [],
 
   42         "calib_datamode": 
False,
 
   46         "adjustment": 1.00798,
 
   48         "calibration_procedure": {
"rungain0": 0, 
"rungain1": 0, 
"rungain2": 0}
 
   52             INPUT_DATA_FILTERS[
'Run Type'][
'physics'],
 
   53             INPUT_DATA_FILTERS[
'Data Tag'][
'bhabha_all_calib'],
 
   54             INPUT_DATA_FILTERS[
'Data Quality Tag'][
'Good Or Recoverable'],
 
   55             INPUT_DATA_FILTERS[
'Magnet'][
'On'],
 
   56             INPUT_DATA_FILTERS[
'Beam Energy'][
'4S'],
 
   57             INPUT_DATA_FILTERS[
'Beam Energy'][
'Continuum'],
 
   58             INPUT_DATA_FILTERS[
'Beam Energy'][
'Scan']]},
 
   62 def get_calibrations(input_data, **kwargs):
 
   63     """ REQUIRED FUNCTION used by b2caf-prompt-run tool 
   64         This function returns a list of Calibration 
   65         objects we assign to the CAF process 
   69     file_to_iov_physics = input_data[
"bhabha_all_calib"]
 
   71     expert_config = kwargs.get(
"expert_config")
 
   72     calib_mode = expert_config[
"calib_mode"]
 
   75     fulldataMode = expert_config[
"calib_datamode"]
 
   76     adjustment = expert_config[
"adjustment"]
 
   79         input_files_rungain = list(file_to_iov_physics.keys())
 
   80         input_files_coscorr = list(file_to_iov_physics.keys())
 
   81         input_files_wiregain = list(file_to_iov_physics.keys())
 
   85         maxevt_rg = expert_config[
"maxevt_rg"]
 
   86         maxevt_cc = expert_config[
"maxevt_cc"]
 
   87         maxevt_wg = expert_config[
"maxevt_wg"]
 
   89         from prompt.utils import filter_by_max_events_per_run, filter_by_select_max_events_from_files
 
   92         max_files_for_maxevents = maxevt_rg  
 
   93         reduced_file_to_iov_rungain = filter_by_max_events_per_run(file_to_iov_physics, max_files_for_maxevents, 
True)
 
   94         input_files_rungain = list(reduced_file_to_iov_rungain.keys())
 
   95         basf2.B2INFO(f
"Total number of files used for rungains = {len(input_files_rungain)}")
 
   98         input_files_coscorr = filter_by_select_max_events_from_files(list(file_to_iov_physics.keys()), maxevt_cc)
 
   99         basf2.B2INFO(f
"Total number of files used for cosine = {len(input_files_coscorr)}")
 
  100         if not input_files_coscorr:
 
  102                 f
"Cosine: all requested ({maxevt_cc}) events not found")
 
  105         if maxevt_wg == maxevt_cc:
 
  106             input_files_wiregain = input_files_coscorr
 
  108             input_files_wiregain = filter_by_select_max_events_from_files(list(file_to_iov_physics.keys()), maxevt_wg)
 
  110         basf2.B2INFO(f
"Total number of files used for wiregains = {len(input_files_wiregain)}")
 
  111         if not input_files_wiregain:
 
  113                 f
"WireGain: all requested ({maxevt_wg}) events not found")
 
  115     requested_iov = kwargs.get(
"requested_iov", 
None)
 
  116     from caf.utils 
import ExpRun, IoV
 
  117     output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
 
  119     payload_boundaries = [ExpRun(output_iov.exp_low, output_iov.run_low)]
 
  120     payload_boundaries.extend([ExpRun(*boundary) 
for boundary 
in expert_config[
"payload_boundaries"]])
 
  121     basf2.B2INFO(f
"Expert set payload boundaries are: {expert_config['payload_boundaries']}")
 
  123     collector_granularity = 
'all' 
  124     if expert_config[
"payload_boundaries"] 
is not None:
 
  125         basf2.B2INFO(
'Found payload_boundaries: set collector granularity to run')
 
  126         collector_granularity = 
'run' 
  128     if calib_mode == 
"full":
 
  129         calibration_procedure = {
 
  142     elif calib_mode == 
"quick":
 
  143         calibration_procedure = {
 
  153     elif calib_mode == 
"manual":
 
  154         calibration_procedure = expert_config[
"calibration_procedure"]
 
  156         basf2.B2FATAL(f
"Calibration mode is not defined {calib_mode}, should be full, quick, or manual")
 
  158     calib_keys = list(calibration_procedure)
 
  159     cals = [
None]*len(calib_keys)
 
  160     basf2.B2INFO(f
"Run calibration mode = {calib_mode}:")
 
  162     for i 
in range(len(cals)):
 
  163         max_iter = calibration_procedure[calib_keys[i]]
 
  165         data_files = [input_files_rungain, input_files_coscorr, input_files_wiregain]
 
  166         cal_name = 
''.join([i 
for i 
in calib_keys[i] 
if not i.isdigit()])
 
  167         if cal_name == 
"rungain":
 
  168             alg = [rungain_algo(calib_keys[i], adjustment)]
 
  169         elif cal_name == 
"coscorr":
 
  171         elif cal_name == 
"cosedge":
 
  172             alg = [cosedge_algo()]
 
  173         elif cal_name == 
"timegain":
 
  174             alg = [injection_time_algo()]
 
  175         elif cal_name == 
"badwire":
 
  176             alg = [badwire_algo()]
 
  177         elif cal_name == 
"wiregain":
 
  178             alg = [wiregain_algo()]
 
  179         elif cal_name == 
"onedcell":
 
  180             alg = [onedcell_algo()]
 
  182             basf2.B2FATAL(f
"The calibration is not defined, check spelling: calib {i}: {calib_keys[i]}")
 
  184         basf2.B2INFO(f
"calibration for {calib_keys[i]} with number of iteration={max_iter}")
 
  186         cals[i] = CDCDedxCalibration(name=calib_keys[i],
 
  188                                      input_file_dict=data_files,
 
  189                                      max_iterations=max_iter,
 
  190                                      collector_granularity=collector_granularity,
 
  191                                      dependencies=[cals[i-1]] 
if i > 0 
else None 
  193         if payload_boundaries:
 
  194             basf2.B2INFO(
"Found payload_boundaries: calibration strategies set to SequentialBoundaries.")
 
  195             if cal_name == 
"rungain" or cal_name == 
"timegain":
 
  196                 cals[i].strategies = SequentialRunByRun
 
  197                 for algorithm 
in cals[i].algorithms:
 
  198                     algorithm.params = {
"iov_coverage": output_iov}
 
  199                 if calib_keys[i] == 
"rungain0" or calib_keys[i] == 
"rungain1" or calib_keys[i] == 
"timegain0":
 
  200                     cals[i].save_payloads = 
False 
  202                 cals[i].strategies = SequentialBoundaries
 
  203                 for algorithm 
in cals[i].algorithms:
 
  204                     algorithm.params = {
"iov_coverage": output_iov, 
"payload_boundaries": payload_boundaries}
 
  205                 if calib_keys[i] == 
"coscorr0":
 
  206                     cals[i].save_payloads = 
False 
  209             for algorithm 
in cals[i].algorithms:
 
  210                 algorithm.params = {
"apply_iov": output_iov}
 
  216 def pre_collector(name='rg'):
 
  218     Define pre collection. 
  220         name : name of the calibration 
  221                            rungain rungain0 by Default. 
  223         path : path for pre collection 
  226     reco_path = basf2.create_path()
 
  227     recon.prepare_cdst_analysis(path=reco_path)
 
  228     if (name == 
"timegain" or name == 
"onedcell"):
 
  229         trg_bhabhaskim = reco_path.add_module(
"TriggerSkim", triggerLines=[
"software_trigger_cut&skim&accept_radee"])
 
  230         trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
 
  231         ps_bhabhaskim = reco_path.add_module(
"Prescale", prescale=0.80)
 
  232         ps_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
 
  234     elif (name == 
"cosedge"):
 
  235         trg_bhabhaskim = reco_path.add_module(
 
  238                 "software_trigger_cut&skim&accept_bhabha",
 
  239                 "software_trigger_cut&filter&ee_flat_90_180",
 
  240                 "software_trigger_cut&filter&ee_flat_0_19"])
 
  241         trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
 
  243         trg_bhabhaskim = reco_path.add_module(
"TriggerSkim", triggerLines=[
"software_trigger_cut&skim&accept_bhabha"])
 
  244         trg_bhabhaskim.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
 
  246     reco_path.add_module(
 
  248         relativeCorrections=
False,
 
  261 def collector(granularity='all', name=''):
 
  263     Create a cdcdedx calibration collector 
  265         name : name of calibration 
  266         granularity : collector granularity, 'all' or 'run' 
  268         collector : collector module 
  271     from basf2 
import register_module
 
  272     col = register_module(
'CDCDedxElectronCollector', cleanupCuts=
True)
 
  273     if name == 
"timegain":
 
  274         CollParam = {
'isRun': 
True, 
'isInjTime': 
True, 
'granularity': 
'run'}
 
  276     elif name == 
"coscorr" or name == 
"cosedge":
 
  277         CollParam = {
'isCharge': 
True, 
'isCosth': 
True, 
'granularity': granularity}
 
  279     elif name == 
"badwire":
 
  281         CollParam = {
'isWire': 
True, 
'isDedxhit': isHit, 
'isADCcorr': 
not isHit, 
'granularity': granularity}
 
  283     elif name == 
"wiregain":
 
  284         CollParam = {
'isWire': 
True, 
'isDedxhit': 
True, 
'granularity': granularity}
 
  286     elif name == 
"onedcell":
 
  293             'granularity': granularity}
 
  296         CollParam = {
'isRun': 
True, 
'granularity': 
'run'}
 
  304 def rungain_algo(name, adjustment):
 
  306     Create a rungain calibration algorithm. 
  308         algo : rungain algorithm 
  310     algo = CDCDedxRunGainAlgorithm()
 
  311     algo.setMonitoringPlots(
True)
 
  312     if name == 
"rungain2":
 
  313         algo.setAdjustment(adjustment)
 
  319 def injection_time_algo():
 
  321     Create an injection time calibration algorithm. 
  323         algo : injection time algorithm 
  325     algo = CDCDedxInjectTimeAlgorithm()
 
  326     algo.setMonitoringPlots(
True)
 
  334     Create a cosine calibration algorithm. 
  336         algo : cosine algorithm 
  338     algo = CDCDedxCosineAlgorithm()
 
  339     algo.setMonitoringPlots(
True)
 
  347     Create a cosine edge calibration algorithm. 
  349         algo : cosine edge algorithm 
  351     algo = CDCDedxCosEdgeAlgorithm()
 
  352     algo.setMonitoringPlots(
True)
 
  360     Create a badwire calibration algorithm. 
  362         algo : badwire algorithm 
  364     algo = CDCDedxBadWireAlgorithm()
 
  366     algo.setHighFracThres(0.2)
 
  367     algo.setMeanThres(0.4)
 
  368     algo.setRMSThres(0.4)
 
  369     algo.setHistPars(150, 0, 5)
 
  370     algo.setMonitoringPlots(
True)
 
  378     Create a wire gain calibration algorithm. 
  380         algo : wiregain algorithm 
  382     algo = CDCDedxWireGainAlgorithm()
 
  383     algo.enableExtraPlots(
True)
 
  389     Create a oned cell calibration algorithm. 
  391         algo : oned cell correction algorithm 
  393     algo = CDCDedx1DCellAlgorithm()
 
  394     algo.enableExtraPlots(
True)
 
  395     algo.setMergePayload(
True)
 
  401     CDCDedxCalibration is a specialized calibration for cdcdedx. 
  410                  collector_granularity='All'):
 
  413             name: name of calibration 
  414             algorithms: algorithms of the calibration 
  415             input_file_dict: list of three input file lists, in the order [rungain, coscorr, wiregain] 
  416             max_iterations: maximum number of iterations 
  417             dependencies: calibrations this one depends on (the previous calibration) 
  418             collector_granularity: granularity : all or run 
  421                          algorithms=algorithms
 
  424         from caf.framework 
import Collection
 
  425         cal_name = 
''.join([i 
for i 
in name 
if not i.isdigit()])
 
  426         if cal_name == 
"badwire" or cal_name == 
"wiregain":
 
  427             collection = 
Collection(collector=
collector(granularity=collector_granularity, name=cal_name),
 
  428                                     input_files=input_file_dict[2],
 
  429                                     pre_collector_path=pre_collector(cal_name)
 
  431         elif cal_name == 
"coscorr" or cal_name == 
"cosedge" or cal_name == 
"onedcell":
 
  432             collection = 
Collection(collector=
collector(granularity=collector_granularity, name=cal_name),
 
  433                                     input_files=input_file_dict[1],
 
  434                                     pre_collector_path=pre_collector(cal_name)
 
  437             collection = 
Collection(collector=
collector(granularity=collector_granularity, name=cal_name),
 
  438                                     input_files=input_file_dict[0],
 
  439                                     pre_collector_path=pre_collector(cal_name)
 
  441         self.add_collection(name=cal_name, collection=collection)
 
  446         if dependencies 
is not None:
 
  447             for dep 
in dependencies:
 
max_iterations
maximum iterations
def __init__(self, name, algorithms, input_file_dict, max_iterations=5, dependencies=None, collector_granularity='All')