#!/usr/bin/env python3
# -*- coding: utf-8 -*-
##########################################################################
# basf2 (Belle II Analysis Software Framework) #
# Author: The Belle II Collaboration #
# #
# See git log for contributors and copyright holders. #
# This file is licensed under LGPL-3.0, see LICENSE.md. #
##########################################################################
"""
Miscellaneous utility functions for skim experts.
"""
import subprocess
import json
import re
from pathlib import Path
from skim.registry import Registry
def get_eventN(filename):
    """
    Return the number of events stored in a file.

    The count is read from the ``nEvents`` entry of the metadata returned by
    ``get_file_metadata`` (which, according to the original documentation of this
    function, is obtained with ``b2file-metadata-show``).

    Parameters:
        filename (str): File to get number of events from.

    Returns:
        int: Number of events in the file.
    """
    metadata = get_file_metadata(filename)
    # The metadata value is converted explicitly so that callers always get an int.
    event_count = metadata["nEvents"]
    return int(event_count)
def resolve_skim_modules(SkimsOrModules, *, LocalModule=None):
    """
    Produce an ordered list of skims, by expanding any Python skim module names into a
    list of skims in that module. Also produce a dict of skims grouped by Python module.

    Parameters:
        SkimsOrModules (iterable(str)): Names of skims and/or skim modules. A name that
            is found in neither ``Registry.names`` nor ``Registry.modules`` is ignored.
        LocalModule (str): Optional name of a local module which is used as the only
            key of the returned dict instead of the registered module name. A trailing
            ``.py`` suffix is removed from it.

    Returns:
        tuple(list(str), dict(str, list(str))): The list of skims in the order they
        were requested, and a dict mapping each module name to the sorted list of
        requested skims belonging to it.

    Raises:
        RuntimeError: Raised if a skim is listed twice.
        ValueError: Raised if ``LocalModule`` is passed and skims are normally expected
            from more than one module.
    """
    skims = []
    for name in SkimsOrModules:
        if name in Registry.names:
            skims.append(name)
        elif name in Registry.modules:
            skims.extend(Registry.get_skims_in_module(name))

    # Find repeated skims in a single pass instead of calling list.count per element.
    seen = set()
    duplicates = set()
    for skim in skims:
        if skim in seen:
            duplicates.add(skim)
        seen.add(skim)
    if duplicates:
        # Sorted so that the error message is reproducible between runs.
        raise RuntimeError(
            f"Skim{'s'*(len(duplicates)>1)} requested more than once: {', '.join(sorted(duplicates))}"
        )

    # Look up the module of each skim once; there are no duplicate skims at this point.
    module_of_skim = {skim: Registry.get_skim_module(skim) for skim in skims}
    module_names = sorted(set(module_of_skim.values()))

    if LocalModule:
        if len(module_names) > 1:
            raise ValueError(
                f"Local module {LocalModule} specified, but the combined skim expects "
                "skims from more than one module. No steering file written."
            )
        # Remove the ".py" suffix explicitly. str.rstrip(".py") would strip any
        # trailing '.', 'p' and 'y' characters (e.g. "happy.py" -> "ha").
        suffix = ".py"
        if LocalModule.endswith(suffix):
            local_name = LocalModule[: -len(suffix)]
        else:
            local_name = LocalModule
        modules = {local_name: sorted(skims)}
    else:
        modules = {
            module: sorted(skim for skim in skims if module_of_skim[skim] == module)
            for module in module_names
        }
    return skims, modules
class _hashable_list(list):
    """A ``list`` subclass that can be hashed.

    The hash is the hash of the tuple holding the same elements, so every element
    must itself be hashable. Equality is inherited unchanged from ``list``.
    """

    def __hash__(self):
        """Return the hash of the list contents viewed as a tuple."""
        frozen_items = tuple(self)
        return hash(frozen_items)
def _sphinxify_decay(decay_string):
    """Turn a plain-text decay descriptor into LaTeX wrapped in a Sphinx math role.

    Particle-list labels are converted to subscripts or dropped, numeric suffixes
    such as ``_1`` are removed, and particle names and decay arrows are replaced by
    their LaTeX equivalents. The result is ReStructured Text, for use in
    autogenerated skim documentation.

    Parameters:
        decay_string (str): A decay descriptor.

    Returns:
        sphinxed_string (str): LaTeX version of the decay descriptor, wrapped in
        a ``:math:`` role.
    """
    # A leading generic B list gets a "had" subscript; this must run before the
    # plain ":generic" label is dropped below.
    tex = re.sub(r"^(B.):generic", r"\1_{\\text{had}}", decay_string)

    # Remaining particle-list labels, applied in this order.
    label_rewrites = (
        (":generic", ""),
        (":semileptonic", r"_{\text{SL}}"),
        (":FSP", "_{FSP}"),
        (":V0", "_{V0}"),
    )
    for label, subscript in label_rewrites:
        tex = tex.replace(label, subscript)

    # Drop numeric suffixes of the form "_<digits>".
    tex = re.sub(r"_[0-9]+", "", tex)

    # Note: these are applied from top to bottom, so if you have
    # both B0 and anti-B0, put anti-B0 first.
    particle_rewrites = (
        ("==>", r"\to"),
        ("->", r"\to"),
        ("gamma", r"\gamma"),
        ("p+", "p"),
        ("anti-p-", r"\bar{p}"),
        ("pi+", r"\pi^+"),
        ("pi-", r"\pi^-"),
        ("pi0", r"\pi^0"),
        ("K_S0", "K^0_S"),
        ("K_L0", "K^0_L"),
        ("mu+", r"\mu^+"),
        ("mu-", r"\mu^-"),
        ("tau+", r"\tau^+"),
        ("tau-", r"\tau^-"),
        ("nu", r"\nu"),
        ("K+", "K^+"),
        ("K-", "K^-"),
        ("e+", "e^+"),
        ("e-", "e^-"),
        ("J/psi", r"J/\psi"),
        ("anti-Lambda_c-", r"\Lambda^{-}_{c}"),
        ("anti-Sigma+", r"\overline{\Sigma}^{+}"),
        ("anti-Lambda0", r"\overline{\Lambda}^{0}"),
        ("anti-D0*", r"\overline{D}^{0*}"),
        ("anti-D*0", r"\overline{D}^{0*}"),
        ("anti-D0", r"\overline{D}^0"),
        ("anti-B0", r"\overline{B}^0"),
        ("Sigma+", r"\Sigma^{+}"),
        ("Lambda_c+", r"\Lambda^{+}_{c}"),
        ("Lambda0", r"\Lambda^{0}"),
        ("D+", "D^+"),
        ("D-", "D^-"),
        ("D0", "D^0"),
        ("D*+", "D^{+*}"),
        ("D*-", "D^{-*}"),
        ("D*0", "D^{0*}"),
        ("D_s+", "D^+_s"),
        ("D_s-", "D^-_s"),
        ("D_s*+", "D^{+*}_s"),
        ("D_s*-", "D^{-*}_s"),
        ("B+", "B^+"),
        ("B-", "B^-"),
        ("B0", "B^0"),
        ("B_s0", "B^0_s"),
        ("K*0", "K^{0*}"),
    )
    for plain, latex in particle_rewrites:
        tex = tex.replace(plain, latex)

    return f":math:`{tex}`"
def dry_run_steering_file(SteeringFile):
    """
    Check if the steering file at the given path can be run with the "--dry-run" option.

    The command ``basf2 --dry-run -i i.root -o o.root <SteeringFile>`` is executed
    with its standard output and standard error captured.

    Parameters:
        SteeringFile (str, pathlib.Path): Path of the steering file to check.

    Raises:
        RuntimeError: Raised if the command exits with a non-zero return code. The
            message contains the captured standard output and standard error.
    """
    proc = subprocess.run(
        ["basf2", "--dry-run", "-i", "i.root", "-o", "o.root", str(SteeringFile)],
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    if proc.returncode != 0:
        # Decode leniently: bytes that are not valid UTF-8 must not raise a
        # UnicodeDecodeError that would hide the actual failure of the dry run.
        stdout = proc.stdout.decode("utf-8", errors="replace")
        stderr = proc.stderr.decode("utf-8", errors="replace")
        raise RuntimeError(
            f"An error occurred while dry-running steering file {SteeringFile}\n"
            f"Script output:\n{stdout}\n{stderr}"
        )