Skim using gbasf2
Finally, let us look at the skim task whose output files make up the input mdst lists for the reconstruction.
# @cond
import b2luigi as luigi
from b2luigi.basf2_helper.tasks import Basf2PathTask
import os
import json
import basf2 as b2
import modularAnalysis as ma
import vertex as vx
class SkimTask(Basf2PathTask):
    """Skim a dataset with gbasf2, keeping B0 -> D- pi+ candidates.

    The basf2 path returned by :meth:`create_path` reconstructs
    ``D- -> K+ pi- pi-`` and ``B0 -> D- pi+`` and stores the surviving
    ``B0:PiD-toK2Pi`` candidates in ``skim.udst.root``.
    """

    # Run this task through the gbasf2 batch system.
    batch_system = "gbasf2"

    # --- gbasf2 settings -------------------------------------------------
    gbasf2_project_name_prefix = luigi.Parameter(significant=False)
    gbasf2_input_dataset = luigi.Parameter(hashed=True)
    # To pin a release, set: gbasf2_release = "<release name>"
    # (defaults to the current basf2 release if not specified)
    gbasf2_print_status_updates = True
    gbasf2_max_retries = 10
    gbasf2_download_dataset = True
    gbasf2_download_logs = False

    # Forwarded as the ``mc`` flag of the udst output.
    runningOnMC = luigi.BoolParameter()

    def output(self):
        """Declare ``skim.udst.root`` as the single output of this task."""
        # udst.skimmed_udst_output will add ending .udst.root if different
        yield self.add_to_output("skim.udst.root")

    def create_path(self):
        """Build and return the basf2 path that performs the skim."""
        b0_list = 'B0:PiD-toK2Pi'
        skim_path = b2.create_path()

        ma.inputMdstList(
            filelist=self.gbasf2_input_dataset,
            path=skim_path,
            entrySequences=['0:10'],
        )

        # Final-state particles: kaons additionally require a loose kaonID.
        ma.fillParticleList(
            decayString='K+:my',
            cut="dr < 0.5 and abs(dz) < 3 and thetaInCDCAcceptance and kaonID > 0.01",
            path=skim_path,
        )
        ma.fillParticleList(
            decayString='pi+:my',
            cut="dr < 0.5 and abs(dz) < 3 and thetaInCDCAcceptance",
            path=skim_path,
        )

        # Intermediate D- and signal B0 candidates.
        ma.reconstructDecay(
            decayString="D-:K2Pi -> K+:my pi-:my pi-:my",
            cut="1.5 < M < 2.2",
            path=skim_path,
        )
        ma.reconstructDecay(
            decayString='B0:PiD-toK2Pi -> D-:K2Pi pi+:my',
            cut='5.0 < Mbc and abs(deltaE) < 1.0',
            path=skim_path,
        )

        # Fit the decay tree, then tighten the B0 selection.
        vx.treeFit(
            b0_list,
            0,
            path=skim_path,
            updateAllDaughters=False,
            ipConstraint=True,
            massConstraint=[411],
        )
        ma.applyCuts(b0_list, '5.2 < Mbc and abs(deltaE) < 0.5', path=skim_path)

        import udst

        # Dump in UDST format. basf2 currently does not support pickling
        # paths with skimmed udst outputs, so udst.add_udst_output is used
        # here instead of udst.add_skimmed_udst_output.
        # WARNING: the file name is given literally; do not use
        # self.get_output_file_name here.
        udst.add_udst_output(
            path=skim_path,
            filename="skim.udst.root",
            particleLists=[b0_list],
            mc=self.runningOnMC,
        )
        return skim_path
class BatchesToTextFile(luigi.Task):
    """Split the files of a finished skim into ``NumBatches`` JSON file lists.

    The task requires a :class:`SkimTask`, lists the files found in its
    ``skim.udst.root`` output directory and writes one ``batch<i>.json``
    per batch, each containing a JSON list of file paths.
    """

    batch_system = 'local'
    skim = luigi.Parameter(hashed=True)
    projectName = luigi.Parameter()
    runningOnMC = luigi.BoolParameter()
    # Number of batch files written per skim.
    NumBatches = 3

    def requires(self):
        """Require the skim whose output files are to be batched."""
        yield SkimTask(
            runningOnMC=self.runningOnMC,
            gbasf2_project_name_prefix=self.projectName,
            gbasf2_input_dataset=self.skim
        )

    def get_batch_file_names(self, key="skim.udst.root"):
        """Return a mapping ``"batch<i>.json" -> list of skim file paths``.

        The files are split into ``NumBatches`` consecutive slices of equal
        size; the last batch additionally receives the remainder. If there
        are fewer files than batches, all files end up in the last batch and
        the other batches are empty.

        :param key: output key of the required task whose directory is listed.
        :return: dict with one entry per batch, in batch order.
        """
        inputdir = self._transform_input(self.input(), key)[0]
        # os.listdir returns entries in arbitrary order; sort them so that
        # the assignment of files to batches is reproducible between runs.
        skimfiles = sorted(f"{inputdir}/{name}" for name in os.listdir(inputdir))
        binwidth = len(skimfiles) // self.NumBatches
        last_batch = self.NumBatches - 1

        batches = {}
        for batch in range(self.NumBatches):
            start = binwidth * batch
            # An open-ended slice lets the last batch pick up the remainder.
            stop = None if batch == last_batch else start + binwidth
            batches[f"batch{batch}.json"] = skimfiles[start:stop]
        return batches

    def output(self):
        """Declare one ``batch<i>.json`` output per batch."""
        for batch in range(self.NumBatches):
            yield self.add_to_output(f"batch{batch}.json")

    def run(self):
        """Write every batch's file list as JSON to its output file."""
        for key, file_list in self.get_batch_file_names().items():
            # If the task carries a ``keys`` attribute, only the batch files
            # named there are written.
            if hasattr(self, "keys") and key not in self.keys:
                continue
            with open(self.get_output_file_name(key), "w") as f:
                json.dump(file_list, f)
# @endcond
The BatchesToTextFile task fills JSON text files with lists of skim output file paths, where NumBatches specifies the number of batches per skim. Instead of going through text files, one could pass a batch directly to the reconstruction by merging the corresponding skim output files. Although more direct, this would require a significant amount of additional storage space.
The SkimTask task again employs b2luigi.basf2_helper.Basf2PathTask to run a minimal steering script on the specified datasets and sets a number of parameters specific to gbasf2, such as batch_system = "gbasf2". b2luigi will automatically set up a proxy, reschedule failed gbasf2 jobs a maximum of gbasf2_max_retries times, download the skims and check them for completeness. Make sure that you are using the latest release of b2luigi, which may not yet be available on the Python Package Index (PyPI).