Source code for skim.utils.misc
#!/usr/bin/env python3
##########################################################################
# basf2 (Belle II Analysis Software Framework)                           #
# Author: The Belle II Collaboration                                     #
#                                                                        #
# See git log for contributors and copyright holders.                    #
# This file is licensed under LGPL-3.0, see LICENSE.md.                  #
##########################################################################
"""
Miscellaneous utility functions for skim experts.
"""
import subprocess
import json
import re
from pathlib import Path
from skim.registry import Registry
[docs]
def get_file_metadata(filename):
    """
    Read the metadata of a file by calling ``b2file-metadata-show --json``.

    Parameters:
        filename (str): Path of the file whose metadata should be read.

    Returns:
        dict: The JSON output of ``b2file-metadata-show``, parsed into a dict.

    Raises:
        FileNotFoundError: Raised if ``filename`` does not exist.
        subprocess.CalledProcessError: Raised if ``b2file-metadata-show`` exits
            with a non-zero return code.
    """
    if not Path(filename).exists():
        raise FileNotFoundError(f"Could not find file {filename}")

    command = ["b2file-metadata-show", "--json", str(filename)]
    # check=True turns a failing command into an exception rather than bad JSON input
    completed = subprocess.run(command, stdout=subprocess.PIPE, check=True)
    return json.loads(completed.stdout.decode("utf-8"))
[docs]
def get_eventN(filename):
    """
    Look up how many events a file contains, as reported in the ``nEvents`` field of
    its ``b2file-metadata-show`` metadata.

    Parameters:
        filename (str): Path of the file to inspect.

    Returns:
        int: Number of events in the file.
    """
    metadata = get_file_metadata(filename)
    return int(metadata["nEvents"])
[docs]
def resolve_skim_modules(SkimsOrModules, *, LocalModule=None):
    """
    Produce an ordered list of skims, by expanding any Python skim module names into a
    list of skims in that module. Also produce a dict of skims grouped by Python module.

    Parameters:
        SkimsOrModules (list(str)): Names of skims and/or skim modules. A name found
            in neither ``Registry.names`` nor ``Registry.modules`` is skipped.
        LocalModule (str): Optional. If given, all skims are grouped under this one
            module name (with a trailing ``.py`` extension removed) instead of under
            the modules listed in the registry.

    Returns:
        tuple(list(str), dict(str, list(str))): The skims in the order requested, and
        a dict mapping each module name to the sorted list of its requested skims.

    Raises:
        RuntimeError: Raised if a skim is listed twice.
        ValueError: Raised if ``LocalModule`` is passed and skims are normally expected
            from more than one module.
    """
    skims = []
    for name in SkimsOrModules:
        if name in Registry.names:
            skims.append(name)
        elif name in Registry.modules:
            skims.extend(Registry.get_skims_in_module(name))

    # Single pass over the list to find repeated skims
    seen = set()
    duplicates = set()
    for skim in skims:
        if skim in seen:
            duplicates.add(skim)
        seen.add(skim)
    if duplicates:
        plural = "s" if len(duplicates) > 1 else ""
        # Sorted so that the message does not depend on set iteration order
        raise RuntimeError(
            f"Skim{plural} requested more than once: {', '.join(sorted(duplicates))}"
        )

    # Group the skims by the module the registry assigns them to
    skims_by_module = {}
    for skim in skims:
        skims_by_module.setdefault(Registry.get_skim_module(skim), []).append(skim)

    if LocalModule:
        if len(skims_by_module) > 1:
            raise ValueError(
                f"Local module {LocalModule} specified, but the combined skim expects "
                "skims from more than one module. No steering file written."
            )
        # Remove only a literal ".py" suffix. str.rstrip(".py") would strip any
        # trailing run of the characters ".", "p" and "y" (e.g. "happy.py" -> "ha").
        if LocalModule.endswith(".py"):
            module_name = LocalModule[: -len(".py")]
        else:
            module_name = LocalModule
        modules = {module_name: sorted(skims)}
    else:
        modules = {
            module: sorted(skims_by_module[module])
            for module in sorted(skims_by_module)
        }

    return skims, modules
class _hashable_list(list):
    def __hash__(self):
        return hash(tuple(self))
def _sphinxify_decay(decay_string):
    """Format the given decay string by using LaTeX commands instead of plain-text.
    Output is formatted for use with Sphinx (ReStructured Text).
    This is a utility function for autogenerating skim documentation.
    Parameters:
        decay_string (str): A decay descriptor.
    Returns:
        sphinxed_string (str): LaTeX version of the decay descriptor.
    """
    decay_string = re.sub("^(B.):generic", "\\1_{\\\\text{had}}", decay_string)
    decay_string = decay_string.replace(":generic", "")
    decay_string = decay_string.replace(":semileptonic", "_{\\text{SL}}")
    decay_string = decay_string.replace(":FSP", "_{FSP}")
    decay_string = decay_string.replace(":V0", "_{V0}")
    decay_string = re.sub("_[0-9]+", "", decay_string)
    # Note: these are applied from top to bottom, so if you have
    # both B0 and anti-B0, put anti-B0 first.
    substitutes = [
        ("==>", "\\to"),
        ("->", "\\to"),
        ("gamma", "\\gamma"),
        ("p+", "p"),
        ("anti-p-", "\\bar{p}"),
        ("pi+", "\\pi^+"),
        ("pi-", "\\pi^-"),
        ("pi0", "\\pi^0"),
        ("K_S0", "K^0_S"),
        ("K_L0", "K^0_L"),
        ("mu+", "\\mu^+"),
        ("mu-", "\\mu^-"),
        ("tau+", "\\tau^+"),
        ("tau-", "\\tau^-"),
        ("nu", "\\nu"),
        ("K+", "K^+"),
        ("K-", "K^-"),
        ("e+", "e^+"),
        ("e-", "e^-"),
        ("J/psi", "J/\\psi"),
        ("anti-Lambda_c-", "\\Lambda^{-}_{c}"),
        ("anti-Sigma+", "\\overline{\\Sigma}^{+}"),
        ("anti-Lambda0", "\\overline{\\Lambda}^{0}"),
        ("anti-D0*", "\\overline{D}^{0*}"),
        ("anti-D*0", "\\overline{D}^{0*}"),
        ("anti-D0", "\\overline{D}^0"),
        ("anti-B0", "\\overline{B}^0"),
        ("Sigma+", "\\Sigma^{+}"),
        ("Lambda_c+", "\\Lambda^{+}_{c}"),
        ("Lambda0", "\\Lambda^{0}"),
        ("D+", "D^+"),
        ("D-", "D^-"),
        ("D0", "D^0"),
        ("D*+", "D^{+*}"),
        ("D*-", "D^{-*}"),
        ("D*0", "D^{0*}"),
        ("D_s+", "D^+_s"),
        ("D_s-", "D^-_s"),
        ("D_s*+", "D^{+*}_s"),
        ("D_s*-", "D^{-*}_s"),
        ("B+", "B^+"),
        ("B-", "B^-"),
        ("B0", "B^0"),
        ("B_s0", "B^0_s"),
        ("K*0", "K^{0*}"),
    ]
    tex_string = decay_string
    for (key, value) in substitutes:
        tex_string = tex_string.replace(key, value)
    return f":math:`{tex_string}`"
[docs]
def fancy_skim_header(SkimClass):
    """Class decorator that builds a summary header for a skim and prepends it to the
    skim's docstring. Add this just above the definition of a skim.

    The header lists the skim description, name, LFN code, category, authors and
    contacts, plus a note about the ``hlt_hadron`` selection if the skim applies it.

    The docstrings of the template functions ``load_standard_lists``, ``build_lists``,
    ``validation_histograms`` and ``additional_setup`` are set to an empty string
    when they are missing, so that the `BaseSkim` documentation of these functions is
    not repeated in every skim documentation.

    .. code-block:: python

        @fancy_skim_header
        class MySkimName(BaseSkim):
            # docstring here describing your skim, and explaining cuts.

    Parameters:
        SkimClass (type): The skim class to decorate.

    Returns:
        type: The same class, with its docstring updated in place.
    """
    # Separators recognised between names: commas, "and", "&", and newlines
    separators = r",\s+and\s+|\s+and\s+|,\s+&\s+|\s+&\s+|,\s+|\s*\n\s*"

    SkimName = SkimClass.__name__
    SkimCode = Registry.encode_skim_name(SkimName)
    authors = SkimClass.__authors__ or ["(no authors listed)"]
    description = SkimClass.__description__ or "(no description)"
    contact = SkimClass.__contact__ or "(no contact listed)"
    category = SkimClass.__category__ or "(no category listed)"

    if isinstance(authors, str):
        # Split a single string into individual names, then strip any whitespace
        # remaining either side of each name
        authors = [
            re.sub(r"^\s+|\s+$", "", author)
            for author in re.split(separators, authors)
        ]

    if isinstance(category, list):
        category = ", ".join(category)

    # Contacts of the form "NAME <EMAIL>" or "NAME (EMAIL)" become mailto links;
    # anything else is kept as written.
    contacts = []
    for entry in re.split(separators, contact):
        match = re.match("([^<>()`]+) [<(]([^<>()`]+@[^<>()`]+)[>)]", entry)
        if match:
            name, email = match[1], match[2]
            entry = f"`{name} <mailto:{email}>`_"
        contacts.append(entry)

    header = f"""
    Note:
        * **Skim description**: {description}
        * **Skim name**: {SkimName}
        * **Skim LFN code**: {SkimCode}
        * **Category**: {category}
        * **Author{"s"*(len(authors) > 1)}**: {", ".join(authors)}
        * **Contact{"s"*(len(contacts) > 1)}**: {", ".join(contacts)}
    """

    if SkimClass.ApplyHLTHadronCut:
        HLTLine = "*This skim includes a selection on the HLT flag* ``hlt_hadron``."
        header = f"{header.rstrip()}\n\n        {HLTLine}\n"

    # An empty or missing class docstring is replaced by the header alone
    existing_doc = SkimClass.__doc__
    SkimClass.__doc__ = (
        (header + "\n\n" + existing_doc.lstrip("\n")) if existing_doc else header
    )

    # If documentation of template functions not redefined, make sure BaseSkim docstring is not repeated
    template_functions = (
        "load_standard_lists",
        "build_lists",
        "validation_histograms",
        "additional_setup",
    )
    for function_name in template_functions:
        function = getattr(SkimClass, function_name)
        function.__doc__ = function.__doc__ or ""

    return SkimClass
[docs]
def dry_run_steering_file(SteeringFile):
    """
    Check if the steering file at the given path can be run with the "--dry-run" option.

    Parameters:
        SteeringFile (str): Path of the steering file to check.

    Raises:
        RuntimeError: Raised if ``basf2 --dry-run`` exits with a non-zero return
            code. The message contains the captured stdout and stderr.
    """
    command = ["basf2", "--dry-run", "-i", "i.root", "-o", "o.root", str(SteeringFile)]
    proc = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    if proc.returncode == 0:
        return

    stdout = proc.stdout.decode("utf-8")
    stderr = proc.stderr.decode("utf-8")
    raise RuntimeError(
        f"An error occurred while dry-running steering file {SteeringFile}\n"
        f"Script output:\n{stdout}\n{stderr}"
    )