#!/usr/bin/env python3
##########################################################################
# basf2 (Belle II Analysis Software Framework)                           #
# Author: The Belle II Collaboration                                     #
#                                                                        #
# See git log for contributors and copyright holders.                    #
# This file is licensed under LGPL-3.0, see LICENSE.md.                  #
##########################################################################
import numpy as np
import pandas as pd
import h5py
import uproot
def _make_const_lists():
"""Moving this code into a function to avoid a top-level ROOT import."""
from ROOT import Belle2 # noqa: make Belle2 namespace available
import ROOT.Belle2
PARTICLES, PDG_CODES = [], []
for i in range(len(ROOT.Belle2.Const.chargedStableSet)):
particle = ROOT.Belle2.Const.chargedStableSet.at(i)
name = (particle.__repr__()[7:-1]
.replace("-", "")
.replace("+", "")
.replace("euteron", ""))
PARTICLES.append(name)
PDG_CODES.append(particle.getPDGCode())
# PARTICLES = ["e", "mu", "pi", "K", "p", "d"]
# PDG_CODES = [11, 13, 211, 321, 2212, 1000010020]
DETECTORS = []
for det in ROOT.Belle2.Const.PIDDetectors.set():
DETECTORS.append(ROOT.Belle2.Const.parseDetectors(det))
# DETECTORS = ["SVD", "CDC", "TOP", "ARICH", "ECL", "KLM"]
return PARTICLES, PDG_CODES, DETECTORS
# PARTICLES, PDG_CODES, DETECTORS = _make_const_lists()
PARTICLES = ["e", "mu", "pi", "K", "p", "d"]
PDG_CODES = [11, 13, 211, 321, 2212, 1000010020]
DETECTORS = ["SVD", "CDC", "TOP", "ARICH", "ECL", "KLM"]
P_BINS = np.array([0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.5])
THETA_BINS = np.radians(np.array([17, 28, 40, 60, 77, 96, 115, 133, 150]))
def _column(particle, detector):
"""Default column names for detector log-likelihoods.
Args:
particle (str): particle name
detector (str): detector name
Returns:
str: Corresponding column name.
"""
return f"{detector}_{particle}"
def root_column(particle, detector):
"""Column names for detector log-likelihoods found in our ROOT datafiles.
Args:
particle (str): particle name
detector (str): detector name
Returns:
str: Corresponding column name.
"""
pdg = PDG_CODES[PARTICLES.index(particle)]
return f"pidLogLikelyhoodOf{pdg}From{detector}"
def read_root(root_filenames):
"""Reads one or several ROOT datafiles into a DataFrame.
Args:
root_filenames (list(str) or str): If only one filename, can be given as
a string. If more than one, should be given as a list or tuple.
Returns:
pandas.DataFrame: DataFrame containing the data of the ROOT datafile(s).
"""
return uproot.concatenate(root_filenames, library='pd')
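
# Example usage (hypothetical filenames). uproot accepts a single path or an
# iterable of paths; depending on the file, the tree may need to be given
# explicitly as "path.root:treename":
#
#     >>> df = read_root("ntuple.root:my_tree")
#     >>> df = read_root(["ntuple_1.root:my_tree", "ntuple_2.root:my_tree"])
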
def make_h5(df, tags, out_filename, pdg=None, column=root_column):
"""Make an HDF5 file in our 'slim' format from the given DataFrame.
Args:
df (pandas.DataFrame): The DataFrame containing the data.
tags (list(str) or str): The particle tags used as a prefix for desired
columns. e.g. for kaons in a D* decay, this is 'DST_D0_K'. One or
more can be given.
out_filename (str): Output filename for the h5 file that will be
written.
pdg (int or None): The PDG code for the particles being
extracted. If None, uses the values found in the 'mcPDG' column of
the DataFrame. Defaults to None.
column: A function which, given the particle and
detector names, returns the column name for the corresponding
detector log-likelihood. Defaults to root_column, which assumes
column names are of the format
f'pidLogLikelyhoodOf{pdg}From{detector}'.
"""
if isinstance(tags, str):
tags = [tags]
def _concat(arrs):
return np.concatenate(arrs) if len(arrs) > 1 else arrs[0]
def _get_all(col):
return _concat([df[f"{tag}_{col}"].values for tag in tags])
with h5py.File(out_filename, "w") as f:
if pdg is not None:
pdg_values = np.ones(len(df) * len(tags)) * pdg
else:
pdg_values = np.abs(_get_all("mcPDG"))
f.create_dataset("pdg", data=pdg_values)
f.create_dataset("p", data=_get_all("p"))
f.create_dataset("theta", data=np.arccos(_get_all("cosTheta")))
f.create_dataset("phi", data=_get_all("phi"))
for det in DETECTORS:
for p in PARTICLES:
data = _get_all(column(p, det))
data[np.isnan(data)] = 0
f.create_dataset(f"{det}/{p}", data=data)
def merge_h5s(filenames, out_filename, pdgs=None):
"""Merge several HDF5 files in our 'slim' format together.
Args:
filenames (list(str)): Filenames of HDF5 files to be merged.
out_filename (str): Output filename.
pdgs (list(int)): The PDG tags for the particle types, one per
filename, to overwrite the 'pdg' columns in those files when
merging. If None, simply uses the 'pdg' columns from the files.
Defaults to None.
"""
fs = [h5py.File(fname, "r") for fname in filenames]
m = h5py.File(out_filename, "w")
keys = ["p", "theta", "phi"]
keys += [f"{d}/{p}" for d in DETECTORS for p in PARTICLES]
for key in keys:
m.create_dataset(key, data=np.concatenate([f[key][()] for f in fs]))
if pdgs is not None:
# replace 'pdg' data with kinematic tags
m.create_dataset(
"pdg",
data=np.concatenate(
[np.ones_like(f["pdg"][()]) * pdg for f, pdg in zip(fs, pdgs)]
),
)
else:
m.create_dataset("pdg", data=np.concatenate([f["pdg"][()] for f in fs]))
for f in fs:
f.close()
m.close()
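
# Example usage (hypothetical filenames): merge per-species slim files and
# overwrite their 'pdg' datasets with the species PDG codes:
#
#     >>> merge_h5s(["slim_kaons.h5", "slim_pions.h5"], "slim_all.h5",
#     ...           pdgs=[321, 211])
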
def split_h5(
filename,
output_dir,
train_size=0.8,
val_size=0.1,
test_size=0.1,
shuffle=True,
random_state=None,
):
"""Split the data in an HDF5 'slim' format file in train, validation, and
test sets, stored in .npz files for ease of weight training.
Args:
filename (str): Filename of HDF5 input file.
output_dir (str): Name of output directory, in which the train,
validation, and test sets will be written. Will be created if it
does not already exist.
train_size (float): Fraction of the dataset to use for
training. Defaults to 0.8.
val_size (float): Fraction of the dataset to use for
validation. Defaults to 0.1.
test_size (float): Fraction of the dataset to use for testing.
Defaults to 0.1.
shuffle (bool): Whether to shuffle the dataset before
splitting. Defaults to True.
random_state (int or None): Random state for the shuffling.
Defaults to None.
"""
from sklearn.model_selection import train_test_split
from os.path import join
from os import makedirs
assert train_size > 0, f"train_size ({train_size}) must be positive"
assert val_size >= 0, f"val_size ({val_size}) may not be negative"
assert test_size >= 0, f"test_size ({test_size}) may not be negative"
assert val_size + test_size != 0, "val_size and test_size cannot both be zero"
if val_size == 0:
val_size = test_size
test_size = 0
if train_size + val_size + test_size != 1:
total = train_size + val_size + test_size
train_size = train_size / total
val_size = val_size / total
test_size = test_size / total
# read data
with h5py.File(filename, "r") as f:
data = np.stack(
[f[det][p][()] for p in PARTICLES for det in DETECTORS], axis=-1
)
p_data = f["p"][()]
theta_data = f["theta"][()]
labels = np.abs(f["pdg"][()])
for i, p in enumerate(PDG_CODES):
labels[labels == p] = i
    # keep only rows whose label was matched to one of the six particle types
    mask = labels < 6
X = data[mask]
y = labels[mask]
p = p_data[mask]
t = theta_data[mask]
makedirs(output_dir, exist_ok=True)
kw = dict(shuffle=shuffle, random_state=random_state)
# split once
(X_0, X, y_0, y, p_0, p, t_0, t) = train_test_split(
X, y, p, t, train_size=train_size, **kw
)
np.savez(join(output_dir, "train.npz"), X=X_0, y=y_0, p=p_0, theta=t_0)
# split again if desired
if test_size != 0:
size = val_size / (1 - train_size)
(X_1, X_2, y_1, y_2, p_1, p_2, t_1, t_2) = train_test_split(
X, y, p, t, train_size=size, **kw
)
np.savez(join(output_dir, "val.npz"), X=X_1, y=y_1, p=p_1, theta=t_1)
np.savez(join(output_dir, "test.npz"), X=X_2, y=y_2, p=p_2, theta=t_2)
else:
np.savez(join(output_dir, "val.npz"), X=X, y=y, p=p, theta=t)
def softmax(x):
"""Performs softmax calculation with corrections to help prevent overflow.
Note:
This is the calculation used to convert log-likelihoods to likelihood
ratios. Implementation following
https://stackoverflow.com/a/67112412/18837571
Args:
x (:func:`numpy.array`): Data to be softmaxed. Softmax is calculated over the last
dimension.
Returns:
:func:`numpy.array`: Softmaxed data.
"""
maxes = np.amax(x, axis=-1, keepdims=True)
x_exp = np.exp(x - maxes)
return x_exp / np.sum(x_exp, axis=-1, keepdims=True)
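
# Worked illustration: since softmax(x)_i = exp(x_i) / sum_j exp(x_j),
# applying it to summed log-likelihoods yields normalized likelihood ratios.
# Subtracting the row maximum first leaves the result unchanged but keeps
# exp() from overflowing:
#
#     >>> softmax(np.array([np.log(1.0), np.log(3.0)]))
#     array([0.25, 0.75])
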
def make_labels(df):
"""Make a 'labels' column for the DataFrame. The 'labels' column contains
the particle type labels for each event: 0 for electron, 1 for muon, and so
on.
Args:
df (pandas.DataFrame): DataFrame that 'labels' column will be added to. Must
not have NaN values in the 'pdg' column.
"""
labels = np.abs(df["pdg"].values)
if np.count_nonzero(~np.isfinite(labels)):
print(
'Warning: dataset contains NaN values in the "pdg" column. '
'This means the "labels" column cannot be made, so most of the '
"pidplots methods will fail."
)
labels = np.ones_like(labels) * np.nan
else:
for i, p in enumerate(PDG_CODES):
labels[labels == p] = i
labels[labels >= 6] = -1
df["labels"] = labels
def make_bins(df, p_bins=P_BINS, theta_bins=THETA_BINS):
"""Make 'p_bin' and 'theta_bin' column in the given DataFrame.
Args:
df (pandas.DataFrame): The DataFrame to add bins columns to.
p_bins (:func:`numpy.array`): The edges of the momentum bins in GeV.
Defaults to P_BINS, [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.5] GeV.
theta_bins (:func:`numpy.array`): The edges of the theta bins in radians.
Defaults to THETA_BINS, [17, 28, 40, 60, 77, 96, 115, 133, 150]
degrees.
"""
df["p_bin"] = np.digitize(df["p"].values, p_bins) - 1
df["theta_bin"] = np.digitize(df["theta"].values, theta_bins) - 1
def make_lrs(df, column=_column):
"""Makes likelihood ratio columns for each of the six particle types in the
given DataFrame.
Args:
df (pandas.DataFrame): DataFrame to which the columns will be added.
        column: A function which, given the particle and
            detector names, returns the corresponding detector log-likelihood
column name. Defaults to _column, which gives column names of the
format f"{detector}_{particle}".
"""
# hypothesis log-likelihoods
h_logls = np.stack(
[
np.sum(df[[column(p, det) for det in DETECTORS]].values, -1)
for p in PARTICLES
],
-1,
)
# compute likelihood ratios
lrs = softmax(h_logls)
for i, p in enumerate(PARTICLES):
df[f"lr_{p}"] = lrs[:, i]
def make_binary_lrs(df, column=_column):
"""Makes binary likelihood ratio columns for each of the five non-pion
particle type hypotheses in the given DataFrame.
Args:
df (pandas.DataFrame): DataFrame to which the columns will be added.
        column: A function which, given the particle and
            detector names, returns the corresponding detector log-likelihood
column name. Defaults to _column, which gives column names of the
format f"{detector}_{particle}".
"""
for h in PARTICLES:
if h == "pi":
continue
h_logls = np.stack(
[
np.sum(df[[column(p, det) for det in DETECTORS]].values, -1)
for p in [h, "pi"]
],
-1,
)
lrs = softmax(h_logls)
df[f"binary_lr_{h}"] = lrs[:, 0]
def make_pid(df):
"""Makes a 'pid' column in the given DataFrame. The 'pid' column is the
predicted particle type. Requires likelihood ratio columns to exist.
Args:
df (pandas.DataFrame): DataFrame to which the 'pid' column will be added.
"""
lrs = np.stack([df[f"lr_{p}"].values for p in PARTICLES], axis=-1)
pids = np.argmax(lrs, axis=-1)
df["pid"] = pids
def compute_det_lrs(d, det, column=_column):
"""Computes single-detector likelihood ratios from the given DataFrame.
Args:
d (pandas.DataFrame): DataFrame containing the detector log-likelihoods.
det (str): The name of the detector for which the single-detector
likelihood ratios will be calculated.
        column: A function which, given the particle and
            detector names, returns the corresponding detector log-likelihood
column name. Defaults to _column, which gives column names of the
format f"{detector}_{particle}".
Returns:
:func:`numpy.array`: The detector likelihood ratios.
"""
h_logls = d[[column(p, det) for p in PARTICLES]].values
lrs = softmax(h_logls)
return lrs
def make_pid_det(df, column=_column):
"""Makes single-detector PID columns for each of the detectors in the given DataFrame.
Args:
df (pandas.DataFrame): DataFrame to which the columns will be added.
        column: A function which, given the particle and
            detector names, returns the corresponding detector log-likelihood
column name. Defaults to _column, which gives column names of the
format f"{detector}_{particle}".
"""
for det in DETECTORS:
mask = df[column("e", det)] == 0 # TODO: make more robust
lrs = compute_det_lrs(df, det, column=column)
pids = np.argmax(lrs, axis=-1)
pids[mask] = -1
df[f"pid_{det}"] = pids
def compute_abl_lrs(d, det, column=_column):
"""Computes ablation likelihood ratios from the given DataFrame.
Args:
d (pandas.DataFrame): DataFrame containing the detector log-likelihoods.
det (str): The name of the detector to be omitted for the ablation.
        column: A function which, given the particle and
            detector names, returns the corresponding detector log-likelihood
column name. Defaults to _column, which gives column names of the
format f"{detector}_{particle}".
Returns:
:func:`numpy.array`: The ablation likelihood ratios.
"""
def _cols(p):
others = [det2 for det2 in DETECTORS if det2 != det]
return [column(p, det2) for det2 in others]
h_logls = np.stack([np.sum(d[_cols(p)].values, -1) for p in PARTICLES], -1)
lrs = softmax(h_logls)
return lrs
def make_pid_abl(df, column=_column):
"""Makes ablation PID columns for each of the detectors in the given
DataFrame.
Args:
df (pandas.DataFrame): DataFrame to which the columns will be added.
        column: A function which, given the particle and
            detector names, returns the corresponding detector log-likelihood
column name. Defaults to _column, which gives column names of the
format f"{detector}_{particle}".
"""
for det in DETECTORS:
lrs = compute_abl_lrs(df, det, column=column)
pids = np.argmax(lrs, axis=-1)
df[f"pid_no_{det}"] = pids
def compute_contrib(d, corr=True):
"""Computes the detector contributions.
Args:
d (pandas.DataFrame): DataFrame containing the likelihood ratio data.
corr (bool): Whether to compute contribution to the likelihood
ratio of the _correct_ hypothesis (True) or the _chosen_ hypothesis
(False). Defaults to True.
Returns:
dict[str, :func:`numpy.array`]: The contributions of each detector.
"""
out = dict()
for det in DETECTORS:
reg_lrs = d[[f"lr_{p}" for p in PARTICLES]].values
abl_lrs = compute_abl_lrs(d, det)
idx = d["labels" if corr else "pid"].values.astype(int)
reg_lr = reg_lrs[np.arange(len(idx)), idx]
abl_lr = abl_lrs[np.arange(len(idx)), idx]
ctrb = reg_lr - abl_lr
out[det] = ctrb
return out
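
# A detector's contribution is defined here as the drop in the likelihood
# ratio of the correct (or chosen) hypothesis when that detector is ablated:
# ctrb_det = lr_full - lr_without_det. Positive values mean the detector
# pushes the ratio toward that hypothesis. Sketch of usage (assumes the
# 'labels' and lr_* columns exist, e.g. via make_columns):
#
#     >>> ctrbs = compute_contrib(df)
#     >>> ctrbs["TOP"].mean()  # average TOP contribution, for instance
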
def make_contrib(df, corr=True):
"""Makes columns for the detector contributions in the given DataFrame.
Args:
df (pandas.DataFrame): DataFrame to which the columns will be added.
corr (bool): Whether to compute contribution to the likelihood
ratio of the _correct_ hypothesis (True) or the _chosen_ hypothesis
(False). Defaults to True.
"""
ctrbs = compute_contrib(df, corr=corr)
for det, ctrb in ctrbs.items():
df[f"contrib_{det}"] = ctrb
def make_columns(
df,
p_bins=P_BINS,
theta_bins=THETA_BINS,
contrib_corr=True,
column=_column,
):
"""Makes all the additional columns for a given DataFrame.
Args:
df (pandas.DataFrame): DataFrame to which the columns will be added.
p_bins (:func:`numpy.array`): The edges of the momentum bins in GeV.
Defaults to P_BINS, [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.5] GeV.
theta_bins (:func:`numpy.array`): The edges of the theta bins in radians.
Defaults to THETA_BINS, [17, 28, 40, 60, 77, 96, 115, 133, 150]
degrees.
contrib_corr (bool): Whether to compute contribution to the
likelihood ratio of the _correct_ hypothesis (True) or the _chosen_
hypothesis (False). Defaults to True.
        column: A function which, given the particle and
            detector names, returns the corresponding detector log-likelihood
column name. Defaults to _column, which gives column names of the
format f"{detector}_{particle}".
"""
make_labels(df)
make_bins(df, p_bins=p_bins, theta_bins=theta_bins)
make_lrs(df, column=column)
make_binary_lrs(df, column=column)
make_pid(df)
make_pid_det(df, column=column)
    make_pid_abl(df, column=column)
make_contrib(df, corr=contrib_corr)
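
# Example usage (hedged): for a DataFrame in the slim format (columns like
# 'TOP_K'), a single call adds labels, bins, likelihood ratios, PID
# predictions, ablation PIDs, and detector contributions:
#
#     >>> df = read_h5("slim_all.h5")  # hypothetical filename
#     >>> make_columns(df)
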
def apply_weights(df, weights, p_bins=P_BINS, theta_bins=THETA_BINS, column=_column):
"""Applies the given weights to the log-likelihood data in the DataFrame.
Args:
df (pandas.DataFrame): DataFrame to which the weights are applied.
weights (dict[tuple(int), :func:`numpy.array`] or :func:`numpy.array`): The calibration weight
values. If a dict, keys should be a tuple of ints, and each value is
the six-by-six array of weights for the bin. If a single np.array,
should be a six-by-six array of weights to be applied globally.
p_bins (:func:`numpy.array`): The edges of the momentum bins in GeV.
Defaults to P_BINS, [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.5] GeV.
theta_bins (:func:`numpy.array`): The edges of the theta bins in radians.
Defaults to THETA_BINS, [17, 28, 40, 60, 77, 96, 115, 133, 150]
degrees.
        column: A function which, given the particle and
            detector names, returns the corresponding detector log-likelihood
column name. Defaults to _column, which gives column names of the
format f"{detector}_{particle}".
"""
if weights is None:
return
# per-bin weights
if isinstance(weights, dict):
for p in range(len(p_bins) - 1):
p_lo, p_hi = p_bins[p], p_bins[p + 1]
p_mask = (df["p"] >= p_lo) & (df["p"] <= p_hi)
for theta in range(len(theta_bins) - 1):
t_lo, t_hi = theta_bins[theta], theta_bins[theta + 1]
t_mask = (df["theta"] >= t_lo) & (df["theta"] <= t_hi)
for i, h in enumerate(PARTICLES):
for j, d in enumerate(DETECTORS):
df.loc[(p_mask & t_mask), column(h, d)] *= weights[p, theta][
i, j
]
# global weights
else:
for i, h in enumerate(PARTICLES):
for j, d in enumerate(DETECTORS):
df[column(h, d)] *= weights[i, j]
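
# Example (hedged sketch): a global weight array is six-by-six and indexed
# [particle, detector] in the order of PARTICLES and DETECTORS; per-bin
# weights instead use (p_bin, theta_bin) tuples as dict keys:
#
#     >>> w = np.ones((6, 6))
#     >>> w[PARTICLES.index("K"), DETECTORS.index("TOP")] = 1.1
#     >>> apply_weights(df, w)  # scales the TOP_K column by 1.1
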
def cut_particles(df, allowed_particles, column=_column):
"""Cuts the log-likelihood data associated with given particle types.
Args:
df (pandas.DataFrame): DataFrame to which the cuts will be applied.
allowed_particles (list(str)): List of allowed particle types. Any
particle types not present will be cut, unless the list is empty (in
which case no cuts are applied).
        column: A function which, given the particle and
            detector names, returns the corresponding detector log-likelihood
column name. Defaults to _column, which gives column names of the
format f"{detector}_{particle}".
"""
if len(allowed_particles) == 0:
return
for p in PARTICLES:
        if p not in allowed_particles:
for d in DETECTORS:
df[column(p, d)] = -1e10
def read_h5(filename):
"""Read an HDF5 file in our 'slim' format into a DataFrame.
Args:
filename (str): Input filename.
Returns:
pandas.DataFrame: DataFrame containing data.
"""
df = pd.DataFrame()
with h5py.File(filename, "r") as f:
for key in ["pdg", "p", "theta", "phi"]:
df[key] = f[key][()]
for key in [f"{d}/{p}" for d in DETECTORS for p in PARTICLES]:
df_key = key.replace("/", "_")
df[df_key] = f[key][()]
df[df_key] = df[df_key].fillna(0)
return df
def read_npz(filename):
"""Read an npz file in our training format into a DataFrame.
Args:
filename (str): Input filename.
Returns:
pandas.DataFrame: DataFrame containing data.
"""
data = np.load(filename)
df = pd.DataFrame(
data=data["X"], columns=[f"{d}_{p}" for p in PARTICLES for d in DETECTORS],
)
df["labels"] = data["y"]
df["p"] = data["p"]
df["theta"] = data["theta"]
df["pdg"] = df["labels"]
for i, pdg in enumerate(PDG_CODES):
df.loc[df["labels"] == i, "pdg"] = pdg
return df
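
# Example usage (hypothetical path): load the training split written by
# split_h5 and recover a DataFrame with labels, kinematics, and pdg columns:
#
#     >>> train_df = read_npz("slim_split/train.npz")
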
def produce_analysis_df(
df,
compute_cols=True,
drop_nans=True,
drop_outside_bins=True,
weights=None,
allowed_particles=[],
p_bins=P_BINS,
theta_bins=THETA_BINS,
column=None,
):
"""Prepares a DataFrame for PID analysis by applying weights, computing and
adding additional columns, cutting NaNs, and more.
Args:
df (pandas.DataFrame): DataFrame to prepare for analysis.
compute_cols (bool): Whether to compute and add additional
columns. Defaults to True.
drop_nans (bool): Whether to drop rows that contain NaNs.
Defaults to True.
drop_outside_bins (bool): Whether to drop rows for particles
outside of the momentum and theta bins. Defaults to True.
weights (:func:`numpy.array`): Calibration weights to be applied to the
detector log-likelihoods. Defaults to None.
allowed_particles (list(str)): If not empty, specifies the
allowed particle types. Any not allowed particle types will be
excluded from the PID calculations. If empty, all particle types are
considered. Defaults to [].
p_bins (:func:`numpy.array`): The edges of the momentum bins in GeV.
Defaults to P_BINS, [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.5] GeV.
theta_bins (:func:`numpy.array`): The edges of the theta bins in radians.
Defaults to THETA_BINS, [17, 28, 40, 60, 77, 96, 115, 133, 150]
degrees.
        column: A function which, given the particle and detector names,
            returns the corresponding detector log-likelihood column name
            in the input DataFrame. If given, the data is first copied into
            columns of the format f"{detector}_{particle}". Defaults to
            None, in which case the log-likelihood columns are assumed to
            already be named in that format.
Returns:
        pandas.DataFrame: The prepared DataFrame. (Not all modifications in
            this method are in-place, so use the returned DataFrame rather
            than the input one.)
"""
if column is not None:
for p in PARTICLES:
for d in DETECTORS:
df[f"{d}_{p}"] = df[column(p, d)]
apply_weights(df, weights, p_bins=p_bins, theta_bins=theta_bins)
cut_particles(df, allowed_particles)
if compute_cols:
make_columns(
df,
p_bins=p_bins,
theta_bins=theta_bins,
contrib_corr=True,
)
if drop_outside_bins:
df = df.loc[
np.logical_and.reduce(
[
df["p_bin"].values >= 0,
df["p_bin"].values < len(p_bins) - 1,
df["theta_bin"].values >= 0,
df["theta_bin"].values < len(theta_bins) - 1,
]
)
]
if drop_nans:
df = df.dropna()
df = df[df["labels"] >= 0]
return df
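
# End-to-end sketch (hedged; the filename is hypothetical): load a slim-format
# file and prepare it for analysis. Note that the return value must be used,
# since not all of the modifications are in-place:
#
#     >>> df = read_h5("slim_all.h5")
#     >>> df = produce_analysis_df(df)
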