Belle II Software  release-06-01-15
core.py
#!/usr/bin/env python3


# @cond SUPPRESS_DOXYGEN

13 """
14  The Full Event Interpretation Algorithm
15 
16  Some basic facts:
17  - The algorithm will automatically reconstruct B mesons and calculate a signal probability for each candidate.
18  - It can be used for hadronic and semileptonic tagging.
19  - The algorithm has to be trained on MC, and can afterwards be applied on data.
20  - The training requires O(100) million MC events
21  - The weight files are stored in the Belle 2 Condition database
22 
23  Read this file if you want to understand the technical details of the FEI.
24 
25  The FEI follows a hierarchical approach.
26  There are 7 stages:
27  (Stage -1: Write out information about the provided data sample)
28  Stage 0: Final State Particles (FSP)
29  Stage 1: pi0, J/Psi, Lambda0
30  Stage 2: K_S0, Sigma+
31  Stage 3: D and Lambda_c mesons
32  Stage 4: D* mesons
33  Stage 5: B mesons
34  Stage 6: Finish
35 
36  Most stages consists of:
37  - Create Particle Candidates
38  - Apply Cuts
39  - Do vertex Fitting
40  - Apply a multivariate classification method
41  - Apply more Cuts
42 
43  The FEI will reconstruct these 7 stages during the training phase,
44  since the stages depend on one another, you have to run basf2 multiple (7) times on the same data
45  to train all the necessary multivariate classifiers.
46 """
47 
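# A minimal sketch of how this module is typically driven from a steering file
# (file names and the FEI prefix are placeholders, not part of this module):
#
#   import basf2
#   import fei
#   particles = fei.get_default_channels()
#   configuration = fei.config.FeiConfiguration(prefix='FEI_TEST', training=True)
#   feistate = fei.get_path(particles, configuration)
#   path = basf2.create_path()
#   path.add_module('RootInput', inputFileNames=['mc.root'])
#   path.add_path(feistate.path)
#   basf2.process(path)
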
# The FEI defines its own command line options, therefore we disable
# the ROOT command line options, which otherwise sometimes interfere.
from ROOT import PyConfig
PyConfig.IgnoreCommandLineOptions = True  # noqa

# The FEI uses multi-threading for parallel execution of tasks, therefore
# the ROOT gui-thread is disabled, which otherwise sometimes interferes.
PyConfig.StartGuiThread = False  # noqa
import ROOT
from ROOT import Belle2

# Import basf2
import basf2
from basf2 import B2INFO, B2WARNING
import pybasf2
import modularAnalysis as ma
import b2bii

import basf2_mva

# Should come after basf2 import
import pdg

from fei import config

# Standard python modules
import collections
import os
import shutil
import typing
import pickle
import re
import functools
import subprocess
import multiprocessing

# Simple object containing the output of fei
FeiState = collections.namedtuple('FeiState', 'path, stage, plists')


class TrainingDataInformation:
    """
    Contains the relevant information about the used training data.
    Basically we write out the number of MC particles in the whole dataset.
    These numbers can be used to calculate what fraction of candidates we have to write
    out as TrainingData to get a reasonable amount of candidates to train on
    (too few candidates will lead to heavy overtraining, too many won't fit into memory).
    Secondly, we can use this information for the generation of the monitoring PDFs,
    where we calculate reconstruction efficiencies.
    """

    def __init__(self, particles: typing.Sequence[config.Particle]):
        """
        Create a new TrainingDataInformation object
        @param particles list of config.Particle objects
        """

        self.particles = particles

        self.filename = 'mcParticlesCount.root'

    def available(self) -> bool:
        """
        Check if the relevant information is already available
        """
        return os.path.isfile(self.filename)

    def reconstruct(self) -> pybasf2.Path:
        """
        Returns pybasf2.Path which counts the number of MCParticles in each event.
        """
        # Unique absolute pdg-codes of all particles
        pdgs = {abs(pdg.from_name(particle.name)) for particle in self.particles}

        path = basf2.create_path()
        module = basf2.register_module('VariablesToHistogram')
        module.set_name("VariablesToHistogram_MCCount")
        module.param('variables', [(f'NumberOfMCParticlesInEvent({pdg})', 100, -0.5, 99.5) for pdg in pdgs])
        module.param('fileName', self.filename)
        path.add_module(module)
        return path

    def get_mc_counts(self):
        """
        Read out the number of MC particles from the file created by reconstruct
        """
        root_file = ROOT.TFile.Open(self.filename, 'read')
        mc_counts = {}

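        # Each histogram stores, per event, how many MC particles of one pdg-code
        # occurred; summing bin_center * bin_content over all bins therefore gives
        # the total number of such MC particles in the whole sample.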
        for key in root_file.GetListOfKeys():
            variable = Belle2.invertMakeROOTCompatible(key.GetName())
            pdg = abs(int(variable[len('NumberOfMCParticlesInEvent('):-len(")")]))
            hist = key.ReadObj()
            mc_counts[pdg] = {}
            mc_counts[pdg]['sum'] = sum(hist.GetXaxis().GetBinCenter(bin + 1) * hist.GetBinContent(bin + 1)
                                        for bin in range(hist.GetNbinsX()))
            mc_counts[pdg]['std'] = hist.GetStdDev()
            mc_counts[pdg]['avg'] = hist.GetMean()
            mc_counts[pdg]['max'] = hist.GetXaxis().GetBinCenter(hist.FindLastBinAbove(0.0))
            mc_counts[pdg]['min'] = hist.GetXaxis().GetBinCenter(hist.FindFirstBinAbove(0.0))

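        # The total number of events is stored under the artificial pdg-code 0;
        # every histogram is filled once per event, so the entry count of the last
        # histogram from the loop above equals the number of processed events.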
        mc_counts[0] = {}
        mc_counts[0]['sum'] = hist.GetEntries()
        root_file.Close()
        return mc_counts

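# A short sketch of how the counts can be inspected interactively once the
# counting stage has been processed (pdg-code 511 is the B0; illustrative only):
#
#   info = TrainingDataInformation(particles)
#   if info.available():
#       counts = info.get_mc_counts()
#       print(counts[511]['sum'])  # total number of B0 MC particles in the sample
#       print(counts[0]['sum'])    # total number of processed events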

class FSPLoader:
    """
    Steers the loading of FSP particles.
    This does NOT include RootInput, Geometry or anything else required before loading FSPs;
    the user has to add these themselves (because they depend on the MC campaign and on whether
    you run over Belle or Belle II data).
    """

    def __init__(self, particles: typing.Sequence[config.Particle], config: config.FeiConfiguration):
        """
        Create a new FSPLoader object
        @param particles list of config.Particle objects
        @param config config.FeiConfiguration object
        """

        self.particles = particles

        self.config = config

    def reconstruct(self) -> pybasf2.Path:
        """
        Returns pybasf2.Path which loads the FSP Particles
        """
        path = basf2.create_path()

        if b2bii.isB2BII():
            ma.fillParticleLists([('K+:FSP', ''), ('pi+:FSP', ''), ('e+:FSP', ''),
                                  ('mu+:FSP', ''), ('p+:FSP', '')], writeOut=True, path=path)
            for outputList, inputList in [('gamma:FSP', 'gamma:mdst'), ('K_S0:V0', 'K_S0:mdst'),
                                          ('Lambda0:V0', 'Lambda0:mdst'), ('K_L0:FSP', 'K_L0:mdst'),
                                          ('pi0:FSP', 'pi0:mdst'), ('gamma:V0', 'gamma:v0mdst')]:
                ma.copyParticles(outputList, inputList, writeOut=True, path=path)
        else:
            ma.fillParticleLists([('K+:FSP', ''), ('pi+:FSP', ''), ('e+:FSP', ''),
                                  ('mu+:FSP', ''), ('gamma:FSP', ''),
                                  ('p+:FSP', ''), ('K_L0:FSP', '')], writeOut=True, loadPhotonBeamBackgroundMVA=False, path=path)
            ma.fillParticleList('K_S0:V0 -> pi+ pi-', '', writeOut=True, path=path)
            ma.fillParticleList('Lambda0:V0 -> p+ pi-', '', writeOut=True, path=path)
            ma.fillConvertedPhotonsList('gamma:V0 -> e+ e-', '', writeOut=True, path=path)

        if self.config.monitor:
            names = ['e+', 'K+', 'pi+', 'mu+', 'gamma', 'K_S0', 'p+', 'K_L0', 'Lambda0', 'pi0']
            filename = 'Monitor_FSPLoader.root'
            pdgs = {abs(pdg.from_name(name)) for name in names}
            variables = [(f'NumberOfMCParticlesInEvent({pdg})', 100, -0.5, 99.5) for pdg in pdgs]
            ma.variablesToHistogram('', variables=variables, filename=filename, path=path)
        return path


class TrainingData:
    """
    Steers the creation of the training data.
    The training data is used to train a multivariate classifier for each channel.
    The training of the FEI at its core is just generating this training data for each channel.
    After the training data for a stage has been created, we have to train the classifiers (see Teacher class further down).
    """

    def __init__(self, particles: typing.Sequence[config.Particle], config: config.FeiConfiguration,
                 mc_counts: typing.Mapping[int, typing.Mapping[str, float]]):
        """
        Create a new TrainingData object
        @param particles list of config.Particle objects
        @param config config.FeiConfiguration object
        @param mc_counts containing number of MC Particles
        """

        self.particles = particles

        self.config = config

        self.mc_counts = mc_counts

    def reconstruct(self) -> pybasf2.Path:
        """
        Returns pybasf2.Path which creates the training data for the given particles
        """
        path = basf2.create_path()

        for particle in self.particles:
            pdgcode = abs(pdg.from_name(particle.name))
            nSignal = self.mc_counts[pdgcode]['sum']
            # For D mesons we usually have an efficiency of 10^-3 including the branching fraction
            if pdgcode > 400:
                nSignal /= 1000
            # For B mesons we usually have an efficiency of 10^-4 including the branching fraction
            if pdgcode > 500:
                nSignal /= 10000

            for channel in particle.channels:
                filename = 'training_input.root'

                nBackground = self.mc_counts[0]['sum'] * channel.preCutConfig.bestCandidateCut
                inverseSamplingRates = {}
                # For some very pure channels (Jpsi), this sampling can be too aggressive and the training fails.
                # It can therefore be disabled in the preCutConfig.
                if nBackground > Teacher.MaximumNumberOfMVASamples and not channel.preCutConfig.noBackgroundSampling:
                    inverseSamplingRates[0] = int(nBackground / Teacher.MaximumNumberOfMVASamples) + 1
                if nSignal > Teacher.MaximumNumberOfMVASamples:
                    inverseSamplingRates[1] = int(nSignal / Teacher.MaximumNumberOfMVASamples) + 1

                spectators = [channel.mvaConfig.target]
                if channel.mvaConfig.sPlotVariable is not None:
                    spectators.append(channel.mvaConfig.sPlotVariable)

                if self.config.monitor:
                    hist_variables = ['mcErrors', 'mcParticleStatus'] + channel.mvaConfig.variables + spectators
                    hist_variables_2d = [(x, channel.mvaConfig.target)
                                         for x in channel.mvaConfig.variables + spectators if x is not channel.mvaConfig.target]
                    hist_filename = 'Monitor_TrainingData.root'
                    ma.variablesToHistogram(channel.name, variables=config.variables2binnings(hist_variables),
                                            variables_2d=config.variables2binnings_2d(hist_variables_2d),
                                            filename=config.removeJPsiSlash(hist_filename),
                                            directory=config.removeJPsiSlash(f'{channel.label}'), path=path)

                teacher = basf2.register_module('VariablesToNtuple')
                teacher.set_name('VariablesToNtuple_' + channel.name)
                teacher.param('fileName', filename)
                teacher.param('treeName', f'{channel.label} variables')
                teacher.param('variables', channel.mvaConfig.variables + spectators)
                teacher.param('particleList', channel.name)
                teacher.param('sampling', (channel.mvaConfig.target, inverseSamplingRates))
                path.add_module(teacher)
        return path

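# A worked example of the sampling logic above (numbers are illustrative only):
# with 100 million MC events and a bestCandidateCut of 20, a channel can produce up to
#   nBackground = 1e8 * 20 = 2e9 candidates,
# which exceeds Teacher.MaximumNumberOfMVASamples = 1e7, so
#   inverseSamplingRates[0] = int(2e9 / 1e7) + 1 = 201,
# i.e. only every 201st background candidate is written out to training_input.root.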

class PreReconstruction:
    """
    Steers the reconstruction phase before the mva method is applied.
    It includes:
    - The ParticleCombination (for each particle and channel we create candidates using
      the daughter candidates from the previous stages)
    - MC Matching
    - Vertex Fitting (this is the slowest part of the whole FEI; KFit is used by default,
      but you can use fastFit as a drop-in replacement https://github.com/thomaskeck/FastFit/,
      which will speed up the whole FEI by a factor of 2-3)
    """

    def __init__(self, particles: typing.Sequence[config.Particle], config: config.FeiConfiguration):
        """
        Create a new PreReconstruction object
        @param particles list of config.Particle objects
        @param config config.FeiConfiguration object
        """

        self.particles = particles

        self.config = config

    def reconstruct(self) -> pybasf2.Path:
        """
        Returns pybasf2.Path which reconstructs the particles and does the vertex fitting if necessary
        """
        path = basf2.create_path()

        for particle in self.particles:
            for channel in particle.channels:

                if len(channel.daughters) == 1:
                    ma.cutAndCopyList(channel.name, channel.daughters[0], channel.preCutConfig.userCut, writeOut=True, path=path)
                    v2EI = basf2.register_module('VariablesToExtraInfo')
                    v2EI.set_name('VariablesToExtraInfo_' + channel.name)
                    v2EI.param('particleList', channel.name)
                    v2EI.param('variables', {f'constant({channel.decayModeID})': 'decayModeID'})
                    # suppress warning that decay mode ID won't be overwritten if it already exists
                    v2EI.set_log_level(basf2.logging.log_level.ERROR)
                    path.add_module(v2EI)
                else:
                    ma.reconstructDecay(channel.decayString, channel.preCutConfig.userCut, channel.decayModeID,
                                        writeOut=True, path=path)
                if self.config.monitor:
                    ma.matchMCTruth(channel.name, path=path)
                    bc_variable = channel.preCutConfig.bestCandidateVariable
                    hist_variables = [bc_variable, 'mcErrors', 'mcParticleStatus', channel.mvaConfig.target]
                    hist_variables_2d = [(bc_variable, channel.mvaConfig.target),
                                         (bc_variable, 'mcErrors'),
                                         (bc_variable, 'mcParticleStatus')]
                    filename = 'Monitor_PreReconstruction_BeforeRanking.root'
                    ma.variablesToHistogram(channel.name,
                                            variables=config.variables2binnings(hist_variables),
                                            variables_2d=config.variables2binnings_2d(hist_variables_2d),
                                            filename=filename, directory=f'{channel.label}', path=path)

                if channel.preCutConfig.bestCandidateMode == 'lowest':
                    ma.rankByLowest(channel.name,
                                    channel.preCutConfig.bestCandidateVariable,
                                    channel.preCutConfig.bestCandidateCut,
                                    'preCut_rank',
                                    path=path)
                elif channel.preCutConfig.bestCandidateMode == 'highest':
                    ma.rankByHighest(channel.name,
                                     channel.preCutConfig.bestCandidateVariable,
                                     channel.preCutConfig.bestCandidateCut,
                                     'preCut_rank',
                                     path=path)
                else:
                    raise RuntimeError("Unknown bestCandidateMode " + repr(channel.preCutConfig.bestCandidateMode))

                if self.config.monitor:
                    filename = 'Monitor_PreReconstruction_AfterRanking.root'
                    hist_variables += ['extraInfo(preCut_rank)']
                    hist_variables_2d += [('extraInfo(preCut_rank)', channel.mvaConfig.target),
                                          ('extraInfo(preCut_rank)', 'mcErrors'),
                                          ('extraInfo(preCut_rank)', 'mcParticleStatus')]
                    ma.variablesToHistogram(channel.name,
                                            variables=config.variables2binnings(hist_variables),
                                            variables_2d=config.variables2binnings_2d(hist_variables_2d),
                                            filename=filename, directory=f'{channel.label}', path=path)
                # If we are not in monitor mode we do the mc matching now,
                # otherwise we already did it above.
                elif self.config.training:
                    ma.matchMCTruth(channel.name, path=path)

                if b2bii.isB2BII() and particle.name in ['K_S0', 'Lambda0']:
                    pvfit = basf2.register_module('ParticleVertexFitter')
                    pvfit.set_name('ParticleVertexFitter_' + channel.name)
                    pvfit.param('listName', channel.name)
                    pvfit.param('confidenceLevel', channel.preCutConfig.vertexCut)
                    pvfit.param('vertexFitter', 'KFit')
                    pvfit.param('fitType', 'vertex')
                    pvfit.set_log_level(basf2.logging.log_level.ERROR)  # let's not produce gigabytes of uninteresting warnings
                    path.add_module(pvfit)
                elif re.findall(r"[\w']+", channel.decayString).count('pi0') > 1 and particle.name != 'pi0':
                    basf2.B2INFO(f"Ignoring vertex fit for {channel.name} because multiple pi0 are not supported yet.")
                elif len(channel.daughters) > 1:
                    pvfit = basf2.register_module('ParticleVertexFitter')
                    pvfit.set_name('ParticleVertexFitter_' + channel.name)
                    pvfit.param('listName', channel.name)
                    pvfit.param('confidenceLevel', channel.preCutConfig.vertexCut)
                    pvfit.param('vertexFitter', 'KFit')
                    pvfit.param('fitType', 'vertex')
                    pvfit.set_log_level(basf2.logging.log_level.ERROR)  # let's not produce gigabytes of uninteresting warnings
                    path.add_module(pvfit)

                if self.config.monitor:
                    hist_variables = ['chiProb', 'mcErrors', 'mcParticleStatus', channel.mvaConfig.target]
                    hist_variables_2d = [('chiProb', channel.mvaConfig.target),
                                         ('chiProb', 'mcErrors'),
                                         ('chiProb', 'mcParticleStatus')]
                    filename = 'Monitor_PreReconstruction_AfterVertex.root'
                    ma.variablesToHistogram(channel.name,
                                            variables=config.variables2binnings(hist_variables),
                                            variables_2d=config.variables2binnings_2d(hist_variables_2d),
                                            filename=filename, directory=f'{channel.label}', path=path)

        return path

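# For illustration: a typical preCutConfig for a D meson channel ranks candidates
# by bestCandidateVariable='abs(dM)' with bestCandidateMode='lowest' and keeps the
# bestCandidateCut=20 best candidates per event; the actual values are defined per
# channel in default_channels.py, not here.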

class PostReconstruction:
    """
    Steers the reconstruction phase after the mva method has been applied.
    It includes:
    - The application of the mva method itself
    - Copying all channel lists into a common one for each particle defined in particles
    - Tagging unique signal candidates, to avoid double counting of channels with overlap
    """

    def __init__(self, particles: typing.Sequence[config.Particle], config: config.FeiConfiguration):
        """
        Create a new PostReconstruction object
        @param particles list of config.Particle objects
        @param config config.FeiConfiguration object
        """

        self.particles = particles

        self.config = config

    def get_missing_channels(self) -> typing.Sequence[str]:
        """
        Returns all channels for which the weight file is missing
        """
        missing = []
        for particle in self.particles:
            for channel in particle.channels:
                # weightfile = self.config.prefix + '_' + channel.label
                weightfile = channel.label + '.xml'
                if not basf2_mva.available(weightfile):
                    missing += [channel.label]
        return missing

    def available(self) -> bool:
        """
        Check if the relevant information is already available
        """
        return len(self.get_missing_channels()) == 0

    def reconstruct(self) -> pybasf2.Path:
        """
        Returns pybasf2.Path which applies the mva methods and merges the channel lists of each particle
        """
        path = basf2.create_path()

        for particle in self.particles:
            for channel in particle.channels:
                expert = basf2.register_module('MVAExpert')
                expert.set_name('MVAExpert_' + channel.name)
                if self.config.training:
                    expert.param('identifier', channel.label + '.xml')
                else:
                    expert.param('identifier', self.config.prefix + '_' + channel.label)
                expert.param('extraInfoName', 'SignalProbability')
                expert.param('listNames', [channel.name])
                # suppress warning that signal probability won't be overwritten if it already exists
                expert.set_log_level(basf2.logging.log_level.ERROR)
                path.add_module(expert)

                uniqueSignal = basf2.register_module('TagUniqueSignal')
                uniqueSignal.param('particleList', channel.name)
                uniqueSignal.param('target', channel.mvaConfig.target)
                uniqueSignal.param('extraInfoName', 'uniqueSignal')
                uniqueSignal.set_name('TagUniqueSignal_' + channel.name)
                # suppress warning that unique signal extra info won't be overwritten if it already exists
                uniqueSignal.set_log_level(basf2.logging.log_level.ERROR)
                path.add_module(uniqueSignal)

                if self.config.monitor:
                    hist_variables = ['mcErrors', 'mcParticleStatus', 'extraInfo(uniqueSignal)', 'extraInfo(SignalProbability)',
                                      channel.mvaConfig.target, 'extraInfo(decayModeID)']
                    hist_variables_2d = [('extraInfo(SignalProbability)', channel.mvaConfig.target),
                                         ('extraInfo(SignalProbability)', 'mcErrors'),
                                         ('extraInfo(SignalProbability)', 'mcParticleStatus'),
                                         ('extraInfo(decayModeID)', channel.mvaConfig.target),
                                         ('extraInfo(decayModeID)', 'mcErrors'),
                                         ('extraInfo(decayModeID)', 'extraInfo(uniqueSignal)'),
                                         ('extraInfo(decayModeID)', 'mcParticleStatus')]
                    filename = 'Monitor_PostReconstruction_AfterMVA.root'
                    ma.variablesToHistogram(channel.name,
                                            variables=config.variables2binnings(hist_variables),
                                            variables_2d=config.variables2binnings_2d(hist_variables_2d),
                                            filename=filename, directory=f'{channel.label}', path=path)

            cutstring = ''
            if particle.postCutConfig.value > 0.0:
                cutstring = str(particle.postCutConfig.value) + ' < extraInfo(SignalProbability)'

            ma.mergeListsWithBestDuplicate(particle.identifier, [c.name for c in particle.channels],
                                           variable='particleSource', writeOut=True, path=path)

            if self.config.monitor:
                hist_variables = ['mcErrors', 'mcParticleStatus', 'extraInfo(uniqueSignal)', 'extraInfo(SignalProbability)',
                                  particle.mvaConfig.target, 'extraInfo(decayModeID)']
                hist_variables_2d = [('extraInfo(decayModeID)', particle.mvaConfig.target),
                                     ('extraInfo(decayModeID)', 'mcErrors'),
                                     ('extraInfo(decayModeID)', 'mcParticleStatus')]
                filename = 'Monitor_PostReconstruction_BeforePostCut.root'
                ma.variablesToHistogram(
                    particle.identifier,
                    variables=config.variables2binnings(hist_variables),
                    variables_2d=config.variables2binnings_2d(hist_variables_2d),
                    filename=config.removeJPsiSlash(filename),
                    directory=config.removeJPsiSlash(f'{particle.identifier}'),
                    path=path)

            ma.applyCuts(particle.identifier, cutstring, path=path)

            if self.config.monitor:
                filename = 'Monitor_PostReconstruction_BeforeRanking.root'
                ma.variablesToHistogram(
                    particle.identifier,
                    variables=config.variables2binnings(hist_variables),
                    variables_2d=config.variables2binnings_2d(hist_variables_2d),
                    filename=config.removeJPsiSlash(filename),
                    directory=config.removeJPsiSlash(f'{particle.identifier}'),
                    path=path)

            ma.rankByHighest(particle.identifier, 'extraInfo(SignalProbability)',
                             particle.postCutConfig.bestCandidateCut, 'postCut_rank', path=path)

            if self.config.monitor:
                hist_variables += ['extraInfo(postCut_rank)']
                hist_variables_2d += [('extraInfo(decayModeID)', 'extraInfo(postCut_rank)'),
                                      (particle.mvaConfig.target, 'extraInfo(postCut_rank)'),
                                      ('mcErrors', 'extraInfo(postCut_rank)'),
                                      ('mcParticleStatus', 'extraInfo(postCut_rank)')]
                filename = 'Monitor_PostReconstruction_AfterRanking.root'
                ma.variablesToHistogram(
                    particle.identifier,
                    variables=config.variables2binnings(hist_variables),
                    variables_2d=config.variables2binnings_2d(hist_variables_2d),
                    filename=config.removeJPsiSlash(filename),
                    directory=config.removeJPsiSlash(f'{particle.identifier}'),
                    path=path)

                variables = ['extraInfo(SignalProbability)', 'mcErrors', 'mcParticleStatus', particle.mvaConfig.target,
                             'extraInfo(uniqueSignal)', 'extraInfo(decayModeID)']

                if 'B_s0' == particle.name:
                    variables += ['Mbc']
                elif 'B' in particle.name:
                    variables += ['Mbc', 'cosThetaBetweenParticleAndNominalB']

                filename = 'Monitor_Final.root'
                ma.variablesToNtuple(particle.identifier, variables, treename=config.removeJPsiSlash(
                    f'{particle.identifier} variables'), filename=config.removeJPsiSlash(filename), path=path)
        return path

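# Sketch of the post-cut logic above (illustrative numbers): a postCutConfig with
# value=0.01 and bestCandidateCut=10 first removes candidates with
# SignalProbability <= 0.01, then keeps at most the 10 highest-ranked candidates
# per event; the actual values come from each particle's postCutConfig.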

class Teacher:
    """
    Performs all necessary trainings for all training data files which are
    available but for which no weight file is available yet.
    This class is usually used by the do_trainings function below, to perform the necessary trainings after each stage.
    The trainings are run in parallel using Python's multiprocessing module.
    Each training is done by a subprocess call; the training command (passed via config.externTeacher) can be either
      * basf2_mva_teacher, in which case the training runs directly on the local machine
      * externClustTeacher, in which case the training is submitted to the batch system of KEKCC
    """

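    #: Maximum number of samples per class used in an MVA training;
    #: if the training data contains more candidates, they are sampled down (see TrainingData above)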
    MaximumNumberOfMVASamples = int(1e7)

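    #: Minimum number of samples per class required for an MVA training;
    #: channels with fewer signal or background events get a fake weight file (see do_all_trainings below)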
    MinimumNumberOfMVASamples = int(5e2)

    def __init__(self, particles: typing.Sequence[config.Particle], config: config.FeiConfiguration):
        """
        Create a new Teacher object
        @param particles list of config.Particle objects
        @param config config.FeiConfiguration object
        """

        self.particles = particles

        self.config = config

    @staticmethod
    def create_fake_weightfile(channel: str):
        """
        Create a fake weight file using the trivial method; it will always return 0.0
        @param channel for which we create a fake weight file
        """
        content = f"""
            <?xml version="1.0" encoding="utf-8"?>
            <method>Trivial</method>
            <weightfile>{channel}.xml</weightfile>
            <treename>tree</treename>
            <target_variable>isSignal</target_variable>
            <weight_variable>__weight__</weight_variable>
            <signal_class>1</signal_class>
            <max_events>0</max_events>
            <number_feature_variables>1</number_feature_variables>
            <variable0>M</variable0>
            <number_spectator_variables>0</number_spectator_variables>
            <number_data_files>1</number_data_files>
            <datafile0>train.root</datafile0>
            <Trivial_version>1</Trivial_version>
            <Trivial_output>0</Trivial_output>
            <signal_fraction>0.066082567</signal_fraction>
        """
        with open(channel + ".xml", "w") as f:
            f.write(content)

    @staticmethod
    def check_if_weightfile_is_fake(filename: str):
        """
        Checks if the provided filename is a fake weight file or not
        @param filename the filename of the weight file
        """
        try:
            return '<method>Trivial</method>' in open(filename).readlines()[2]
        except BaseException:
            return True

    def upload(self, channel: str):
        """
        Upload the weight file into the condition database
        @param channel whose weight file is uploaded
        """
        disk = channel + '.xml'
        dbase = self.config.prefix + '_' + channel
        basf2_mva.upload(disk, dbase)
        return (disk, dbase)

    def do_all_trainings(self):
        """
        Do all trainings for which we find training data
        """
        job_list = []
        filename = 'training_input.root'
        if not os.path.isfile(filename):
            B2WARNING("Training of MVC failed. Couldn't find ROOT file. "
                      "No weight files will be provided.")
        else:
            f = ROOT.TFile.Open(filename, 'read')
            if f.IsZombie():
                B2WARNING("Training of MVC failed. ROOT file corrupt. "
                          "No weight files will be provided.")
            elif len([k.GetName() for k in f.GetListOfKeys()]) == 0:
                B2WARNING("Training of MVC failed. ROOT file does not contain any trees. "
                          "No weight files will be provided.")
            else:
                for particle in self.particles:
                    for channel in particle.channels:
                        weightfile = channel.label + '.xml'
                        if not basf2_mva.available(weightfile):
                            keys = [m for m in f.GetListOfKeys() if f"{channel.label}" in m.GetName()]
                            if not keys:
                                continue
                            tree = keys[0].ReadObj()
                            nSig = tree.GetEntries(channel.mvaConfig.target + ' == 1.0')
                            nBg = tree.GetEntries(channel.mvaConfig.target + ' != 1.0')
                            if nSig < Teacher.MinimumNumberOfMVASamples:
                                B2WARNING("Training of MVC failed. "
                                          f"Tree contains too few signal events {nSig}. Ignoring channel {channel}.")
                                self.create_fake_weightfile(channel.label)
                                self.upload(channel.label)
                                continue
                            if nBg < Teacher.MinimumNumberOfMVASamples:
                                B2WARNING("Training of MVC failed. "
                                          f"Tree contains too few background events {nBg}. Ignoring channel {channel}.")
                                self.create_fake_weightfile(channel.label)
                                self.upload(channel.label)
                                continue
                            variable_str = "' '".join(channel.mvaConfig.variables)

                            command = (f"{self.config.externTeacher}"
                                       f" --method '{channel.mvaConfig.method}'"
                                       f" --target_variable '{channel.mvaConfig.target}'"
                                       f" --treename '{channel.label} variables' --datafile 'training_input.root'"
                                       f" --signal_class 1 --variables '{variable_str}'"
                                       f" --identifier '{channel.label}.xml'"
                                       f" {channel.mvaConfig.config} > '{channel.label}'.log 2>&1")
                            B2INFO(f"Used following command to invoke teacher: \n {command}")
                            job_list.append((channel.label, command))
            f.Close()
        p = multiprocessing.Pool(None, maxtasksperchild=1)
        func = functools.partial(subprocess.call, shell=True)
        p.map(func, [c for _, c in job_list])
        p.close()
        p.join()
        weightfiles = []
        for name, _ in job_list:
            if not basf2_mva.available(name + '.xml'):
                B2WARNING("Training of MVC failed. For unknown reasons, check the logfile")
                self.create_fake_weightfile(name)
            weightfiles.append(self.upload(name))
        return weightfiles

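# For illustration, a generated teacher command looks roughly like this (the label,
# method and variables are placeholders taken from the channel's mvaConfig; FastBDT
# is the usual default method):
#
#   basf2_mva_teacher --method 'FastBDT' --target_variable 'isSignal' \
#       --treename '<channel label> variables' --datafile 'training_input.root' \
#       --signal_class 1 --variables 'p' 'pt' 'chiProb' \
#       --identifier '<channel label>.xml' <mvaConfig.config> > '<channel label>'.log 2>&1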

def convert_legacy_training(particles: typing.Sequence[config.Particle], configuration: config.FeiConfiguration):
    """
    Convert an old FEI training into the new format.
    The old format used hashes for the weight files; the hashes can be converted to the new naming scheme
    using the Summary.pickle file written out by the FEIv3. This file must be passed via the parameter configuration.legacy.
    @param particles list of config.Particle objects
    @param configuration config.FeiConfiguration object
    """
    summary = pickle.load(open(configuration.legacy, 'rb'))
    channel2lists = {k: v[2] for k, v in summary['channel2lists'].items()}

    teacher = Teacher(particles, configuration)

    for particle in particles:
        for channel in particle.channels:
            new_weightfile = configuration.prefix + '_' + channel.label
            old_weightfile = configuration.prefix + '_' + channel2lists[channel.label.replace('Jpsi', 'J/psi')]
            if not basf2_mva.available(new_weightfile):
                if old_weightfile is None or not basf2_mva.available(old_weightfile):
                    Teacher.create_fake_weightfile(channel.label)
                    teacher.upload(channel.label)
                else:
                    basf2_mva.download(old_weightfile, channel.label + '.xml')
                    teacher.upload(channel.label)


def get_stages_from_particles(particles: typing.Sequence[config.Particle]):
    """
    Returns the hierarchical structure of the FEI.
    Each stage depends on the particles in the previous stage.
    The final stage is empty (meaning everything is done, and the training is finished at this point).
    @param particles list of config.Particle objects
    """
    stages = [
        [p for p in particles if p.name in ['e+', 'K+', 'pi+', 'mu+', 'gamma', 'p+', 'K_L0']],
        [p for p in particles if p.name in ['pi0', 'J/psi', 'Lambda0']],
        [p for p in particles if p.name in ['K_S0', 'Sigma+']],
        [p for p in particles if p.name in ['D+', 'D0', 'D_s+', 'Lambda_c+']],
        [p for p in particles if p.name in ['D*+', 'D*0', 'D_s*+']],
        [p for p in particles if p.name in ['B0', 'B+', 'B_s0']],
        []
    ]

    for p in particles:
        if p.name not in [q.name for stage in stages for q in stage]:
            raise RuntimeError(f"Unknown particle {p.name}: Not implemented in FEI")

    return stages

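# A small sketch to inspect the hierarchy (purely illustrative):
#
#   for i, stage in enumerate(get_stages_from_particles(particles)):
#       print(f"Stage {i}:", [p.name for p in stage])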

def do_trainings(particles: typing.Sequence[config.Particle], configuration: config.FeiConfiguration):
    """
    Performs the training of mva classifiers for all available training data.
    This function must either be called by the user after each stage of the FEI during training,
    or (more likely) it is called by the distributed.py script after merging the outputs of all jobs.
    @param particles list of config.Particle objects
    @param configuration config.FeiConfiguration object
    @return list of tuples with weight file on disk and identifier in database for all trained classifiers
    """
    teacher = Teacher(particles, configuration)
    return teacher.do_all_trainings()


def save_summary(particles: typing.Sequence[config.Particle], configuration: config.FeiConfiguration, cache: int):
    """
    Creates the Summary.pickle, which is used to keep track of the stage during the training,
    and can be used later to investigate which configuration was used exactly to create the training.
    @param particles list of config.Particle objects
    @param configuration config.FeiConfiguration object
    @param cache current cache level
    """
    configuration = config.FeiConfiguration(configuration.prefix, cache,
                                            configuration.monitor, configuration.legacy, configuration.externTeacher,
                                            configuration.training)
    # Backup existing Summary.pickle files
    for i in [8, 7, 6, 5, 4, 3, 2, 1, 0]:
        if os.path.isfile(f'Summary.pickle.backup_{i}'):
            shutil.copyfile(f'Summary.pickle.backup_{i}', f'Summary.pickle.backup_{i + 1}')
    if os.path.isfile('Summary.pickle'):
        shutil.copyfile('Summary.pickle', 'Summary.pickle.backup_0')
    pickle.dump((particles, configuration), open('Summary.pickle', 'wb'))

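# The pickle can be read back to resume or inspect a training; this mirrors the
# load performed in get_path below:
#
#   particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
#   print(configuration.cache)  # stage at which the training will resume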

def get_path(particles: typing.Sequence[config.Particle], configuration: config.FeiConfiguration) -> FeiState:
    """
    The most important function of the FEI.
    This creates the FEI path for training/fitting (both terms are used interchangeably),
    and for application/inference (both terms are used interchangeably).
    The whole FEI is defined by the particles which are reconstructed (see default_channels.py)
    and the configuration (see config.py).

    TRAINING
    For training this function is called multiple times; each time the FEI reconstructs one more stage
    in the hierarchical structure, i.e. we start with FSPs, then pi0, K_S0, D, D*, and finally B mesons.
    You have to set configuration.training to True for training mode.
    All weight files created during the training will be stored in your local database.
    If you want to use the FEI training everywhere without copying this database by hand, you have to upload your local database
    to the central database first (see the documentation of the Belle II Conditions Database).

    APPLICATION
    For application you call this function once, and it returns the whole path which will reconstruct B mesons
    with an associated signal probability. You have to set configuration.training to False for application mode.

    MONITORING
    You can always turn on the monitoring (configuration.monitor = True)
    to write out ROOT histograms of many quantities for each stage;
    from these histograms the printReporting.py or latexReporting.py scripts can automatically create pdf files.

    LEGACY
    This function can also use old FEI trainings (version 3): just pass the Summary.pickle file of the old training,
    and the weight files will be automatically converted to the new naming scheme.

    @param particles list of config.Particle objects
    @param configuration config.FeiConfiguration object
    """
    print(r"""
    ____ _  _ _    _       ____ _  _ ____ _  _ ___    _ _  _ ___ ____ ____ ___  ____ ____ ___ ____ ___ _ ____ _  _
    |___ |  | |    |       |___ |  | |___ |\ |  |     | |\ |  |  |___ |__/ |__] |__/ |___  |  |__|  |  | |  | |\ |
    |    |__| |___ |___    |___  \/  |___ | \|  |     | | \|  |  |___ |  \ |    |  \ |___  |  |  |  |  | |__| | \|

    Author: Thomas Keck 2014 - 2017
    Please cite my PhD thesis
    """)

    # The cache parameter of the configuration object is used during training to keep track of
    # which reconstruction steps have already been performed.
    # For fitting/training we start by default with -1, meaning we still have to create the TrainingDataInformation,
    # which is used to determine the number of candidates we have to write out for the FSP trainings in stage 0.
    # For inference/application we start by default with 0, because we don't need the TrainingDataInformation in stage 0.
    # During the training we save the particles and configuration (including the current cache stage) in the Summary.pickle object.
    if configuration.cache is None:
        if os.path.isfile('Summary.pickle'):
            print("Cache: Replaced particles and configuration with the ones from Summary.pickle!")
            particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
            cache = configuration.cache
        else:
            if configuration.training:
                cache = -1
            else:
                cache = 0
    else:
        cache = configuration.cache

    # Now we start building the training or application path
    path = basf2.create_path()

    # There are in total 7 stages.
    # For training we start with -1 and advance one stage at a time.
    # For application we can run stages 0 to 6 at once.
    stages = get_stages_from_particles(particles)

    # If the user provided a Summary.pickle file of a FEIv3 training we
    # convert the old weight files (with hashes) to the new naming scheme.
    # Afterwards the algorithm runs as usual.
    if configuration.legacy is not None:
        convert_legacy_training(particles, configuration)

    # During the training we require the number of MC particles in the whole processed
    # data sample, because we don't want to write out billions of e.g. pion candidates.
    # Knowing the total amount of MC particles we can write out only e.g. every 10th candidate.
    # That's why we have to write out the TrainingDataInformation before doing anything else during the training phase.
    # During application we only need this if we run in monitor mode and want to write out a summary in the end;
    # the summary contains efficiencies, and the efficiency calculation requires the total number of MC particles.
    training_data_information = TrainingDataInformation(particles)
    if cache < 0 and configuration.training:
        print("Stage 0: Run over all files to count the number of events and McParticles")
        path.add_path(training_data_information.reconstruct())
        if configuration.training:
            save_summary(particles, configuration, 0)
        return FeiState(path, 0, [])
    elif not configuration.training and configuration.monitor:
        path.add_path(training_data_information.reconstruct())

    # We load the Final State particles.
    # It is assumed that the user takes care of adding RootInput, Geometry, and everything else
    # which is required to read in data, so we directly start to load the FSP particles
    # used by the FEI.
    loader = FSPLoader(particles, configuration)
    if cache < 1:
        print("Stage 0: Load FSP particles")
        path.add_path(loader.reconstruct())

    # Now we reconstruct each stage one after another.
    # Each stage consists of two parts:
    # PreReconstruction (before the mva method is applied):
    #   - Particle combination
    #   - Vertex fitting
    #   - Some simple cuts and best candidate selection
    # PostReconstruction (after the mva method is applied):
    #   - Apply the mva method
    #   - Apply cuts on the mva output and best candidate selection
    #
    # If the weight files for the PostReconstruction are not available for the current stage and we are in training mode,
    # we have to create the training data. The training itself is done by the do_trainings function, which is called
    # either by the user after each step, or by the distributed.py script.
    #
    # If the weight files for the PostReconstruction are not available for the current stage and we are not in training mode,
    # we keep going; as soon as the user calls process on the produced path, they will get an error message that the
    # weight files are missing.
    #
    # Finally, we keep track of the ParticleLists we use, so the user can run the RemoveParticles module to reduce the size of
    # the intermediate output of RootOutput.
    used_lists = []
    for stage, stage_particles in enumerate(stages):
        pre_reconstruction = PreReconstruction(stage_particles, configuration)
        if cache <= stage:
            print(f"Stage {stage}: PreReconstruct particles: ", [p.name for p in stage_particles])
            path.add_path(pre_reconstruction.reconstruct())

        post_reconstruction = PostReconstruction(stage_particles, configuration)
        if configuration.training and not post_reconstruction.available():
            print(f"Stage {stage}: Create training data for particles: ", [p.name for p in stage_particles])
            mc_counts = training_data_information.get_mc_counts()
            training_data = TrainingData(stage_particles, configuration, mc_counts)
            path.add_path(training_data.reconstruct())
            used_lists += [channel.name for particle in stage_particles for channel in particle.channels]
            break
        if cache <= stage + 1:
            path.add_path(post_reconstruction.reconstruct())
            used_lists += [particle.identifier for particle in stage_particles]

    # If we run in monitor mode we are interested in the ModuleStatistics;
    # these statistics contain the runtime of each module which was run.
    if configuration.monitor:
        output = basf2.register_module('RootOutput')
        output.param('outputFileName', 'Monitor_ModuleStatistics.root')
        output.param('branchNames', ['EventMetaData'])  # cannot be removed, but has only a small effect on the file size
        output.param('branchNamesPersistent', ['ProcessStatistics'])
        output.param('ignoreCommandLineOverride', True)
        path.add_module(output)

    # As mentioned above, the FEI keeps track of the stages which are already reconstructed during the training,
    # so we write out the Summary.pickle here and increase the stage by one.
    if configuration.training or configuration.monitor:
        save_summary(particles, configuration, stage + 1)

    # Finally we return the path, the stage, and the used lists to the user.
    return FeiState(path, stage + 1, plists=used_lists)

# @endcond