Belle II Software  release-08-01-10
skim.py
1 # @cond
2 import b2luigi as luigi
3 from b2luigi.basf2_helper.tasks import Basf2PathTask
4 
5 import os
6 import json
7 import basf2 as b2
8 import modularAnalysis as ma
9 import vertex as vx
10 
11 
12 class SkimTask(Basf2PathTask):
13  batch_system = "gbasf2"
14  gbasf2_project_name_prefix = luigi.Parameter(significant=False)
15  gbasf2_input_dataset = luigi.Parameter(hashed=True)
16 # gbasf2_release = "<release name>" #defaults to current basf2 release if not specified
17  gbasf2_print_status_updates = True
18  gbasf2_max_retries = 10
19  gbasf2_download_dataset = True
20  gbasf2_download_logs = False
21 
22  runningOnMC = luigi.BoolParameter()
23 
24  def output(self):
25  yield self.add_to_output("skim.udst.root") # udst.skimmed_udst_output will add ending .udst.root if different
26 
27  def create_path(self):
28  mypath = b2.create_path()
29  ma.inputMdstList(filelist=self.gbasf2_input_dataset, path=mypath, entrySequences=['0:10'])
30 
31  ma.fillParticleList(
32  decayString='K+:my',
33  cut="dr < 0.5 and abs(dz) < 3 and thetaInCDCAcceptance and kaonID > 0.01",
34  path=mypath)
35  ma.fillParticleList(decayString='pi+:my', cut="dr < 0.5 and abs(dz) < 3 and thetaInCDCAcceptance", path=mypath)
36 
37  ma.reconstructDecay(decayString="D-:K2Pi -> K+:my pi-:my pi-:my", cut="1.5 < M < 2.2", path=mypath)
38 
39  ma.reconstructDecay(decayString='B0:PiD-toK2Pi -> D-:K2Pi pi+:my', cut='5.0 < Mbc and abs(deltaE) < 1.0', path=mypath)
40  vx.treeFit('B0:PiD-toK2Pi', 0, path=mypath, updateAllDaughters=False, ipConstraint=True, massConstraint=[411])
41  ma.applyCuts('B0:PiD-toK2Pi', '5.2 < Mbc and abs(deltaE) < 0.5', path=mypath)
42 
43  import udst
44  # dump in UDST format
45  # basf2 currently does not support pickling paths with skimmed udst outputs
46  udst.add_udst_output(path=mypath, filename="skim.udst.root", particleLists=['B0:PiD-toK2Pi'], mc=self.runningOnMC)
47 
48 # udst.add_skimmed_udst_output(mypath, skimDecayMode="BtoPiD", skimParticleLists=['B0:PiD-toK2Pi'], mc=self.runningOnMC,
49 # outputFile="skim.udst.root" # WARNING: here do not use self.get_output_file_name
50 # )
51  return mypath
52 
53 
54 class BatchesToTextFile(luigi.Task):
55  batch_system = 'local'
56  skim = luigi.Parameter(hashed=True)
57  projectName = luigi.Parameter()
58  runningOnMC = luigi.BoolParameter()
59  NumBatches = 3
60 
61  def requires(self):
62  yield SkimTask(
63  runningOnMC=self.runningOnMC,
64  gbasf2_project_name_prefix=self.projectName,
65  gbasf2_input_dataset=self.skim
66  )
67 
68  def get_batch_file_names(self, key="skim.udst.root"):
69  inputdir = self._transform_input(self.input(), key)[0]
70  skimfiles = [f"{inputdir}/{file}" for file in os.listdir(inputdir)]
71 
72  binwidth = int(len(skimfiles)/self.NumBatches)
73 
74  batches = {}
75  for batch in range(self.NumBatches):
76  if(batch == self.NumBatches - 1):
77  batches.update({f"batch{batch}.json": list(skimfiles[binwidth*batch:])})
78  else:
79  batches.update({f"batch{batch}.json": list(skimfiles[binwidth*batch:binwidth*(batch+1)])})
80  return batches
81 
82  def output(self):
83  for batch in range(self.NumBatches):
84  yield self.add_to_output(f"batch{batch}.json")
85 
86  def run(self):
87 
88  for key, file_list in self.get_batch_file_names().items():
89  if hasattr(self, "keys") and key not in self.keys:
90  continue
91 
92  with open(self.get_output_file_name(key), "w+") as f:
93  f.write(json.dumps(file_list))
94 # @endcond
def add_udst_output(path, filename, particleLists=None, additionalBranches=None, dataDescription=None, mc=True)
Definition: udst.py:27