# Source code for sysvar

#!/usr/bin/env python3

##########################################################################
# basf2 (Belle II Analysis Software Framework)                           #
# Author: The Belle II Collaboration                                     #
#                                                                        #
# See git log for contributors and copyright holders.                    #
# This file is licensed under LGPL-3.0, see LICENSE.md.                  #
##########################################################################

import pandas as pd
import numpy as np
from dataclasses import dataclass
import matplotlib.pyplot as plt
import pdg
import warnings
import ast
from pandas.errors import PerformanceWarning

warnings.warn(
    "This module will soon be deprecated and eventually removed in a future release. "
    "Its functionality is being taken over by the standalone SysVar package: "
    "https://gitlab.desy.de/belle2/software/sysvar",
    FutureWarning,
    stacklevel=2
)

"""
A module that adds corrections to analysis dataframe.
It adds weight variations according to the total uncertainty for easier error propagation.
"""

_weight_cols = ['data_MC_ratio',
                'data_MC_uncertainty_stat_dn',
                'data_MC_uncertainty_stat_up',
                'data_MC_uncertainty_sys_dn',
                'data_MC_uncertainty_sys_up'
                ]

_correction_types = ['PID', 'FEI']
_fei_mode_col = 'dec_mode'


@dataclass
class ReweighterParticle:
    """
    Class that stores the information of a particle.
    """
    #: @var prefix
    #: Prefix of the particle in the ntuple
    prefix: str

    #: Type of the particle (PID or FEI)
    type: str

    #: Merged table of the weights
    merged_table: pd.DataFrame

    #: @var pdg_binning
    #: Kinematic binning of the weight table per particle
    pdg_binning: dict

    #: @var variable_aliases
    #: Variable aliases of the weight table
    variable_aliases: dict

    #: @var weight_name
    #: Weight column name that will be added to the ntuple
    weight_name: str

    #: Internal list of the names of the weight columns
    column_names: list = None

    #: Random seed for systematics
    sys_seed: int = None

    #: Covariance matrix corresponds to the total uncertainty
    cov: np.ndarray = None

    #: When true assume systematics are 100% correlated
    syscorr: bool = True

    #: Coverage of the user ntuple
    coverage: float = None

    #: Values for the plots
    plot_values: dict = None

    def get_varname(self, varname: str) -> str:
        """
        Returns the variable name with the prefix and use alias if defined.
        """
        name = varname
        if self.variable_aliases and varname in self.variable_aliases:
            name = self.variable_aliases[varname]
        # Avoid double-prefixing when the alias already carries the prefix.
        if name.startswith(self.prefix):
            return name
        return f'{self.prefix}{name}'

    def get_binning_variables(self) -> list:
        """
        Returns the list of variables that are used for the binning
        """
        # Union of the binning variable names over all (reco_pdg, mc_pdg) pairs.
        variables = set(sum([list(d.keys()) for d in self.pdg_binning.values()], []))
        return [f'{self.get_varname(var)}' for var in variables]

    def get_pdg_variables(self) -> list:
        """
        Returns the list of variables that are used for the PDG codes
        """
        pdg_vars = ['PDG']
        #: Add the mcPDG code requirement for PID particle
        if self.type == "PID":
            pdg_vars += ['mcPDG']
        return [f'{self.get_varname(var)}' for var in pdg_vars]

    def generate_variations(self,
                            n_variations: int,
                            rho_sys: np.ndarray = None,
                            rho_stat: np.ndarray = None) -> None:
        """
        Generates variations of weights according to the uncertainties

        Args:
            n_variations (int): Number of weight variations to generate.
            rho_sys (numpy.ndarray): Optional correlation matrix for systematic uncertainties.
            rho_stat (numpy.ndarray): Optional correlation matrix for statistical uncertainties.
        """
        # Use the larger of the up/down uncertainties as a symmetric error.
        self.merged_table['stat_error'] = self.merged_table[["data_MC_uncertainty_stat_up",
                                                             "data_MC_uncertainty_stat_dn"]].max(axis=1)
        self.merged_table['sys_error'] = self.merged_table[["data_MC_uncertainty_sys_up",
                                                            "data_MC_uncertainty_sys_dn"]].max(axis=1)
        self.merged_table["error"] = np.sqrt(self.merged_table["stat_error"] ** 2 + self.merged_table["sys_error"] ** 2)
        means = self.merged_table["data_MC_ratio"].values
        #: Names of the varied weight columns
        self.column_names = [f"{self.weight_name}_{i}" for i in range(n_variations)]
        cov = self.get_covariance(n_variations, rho_sys, rho_stat)
        # Each variation is the central weight shifted by a random draw.
        weights = cov + means
        self.merged_table[self.weight_name] = self.merged_table["data_MC_ratio"]
        self.merged_table[self.column_names] = weights.T
        # The nominal weight column goes first, followed by the variations.
        self.column_names.insert(0, self.weight_name)

    def get_covariance(self,
                       n_variations: int,
                       rho_sys: np.ndarray = None,
                       rho_stat: np.ndarray = None) -> np.ndarray:
        """
        Returns random draws distributed according to the covariance matrix of the weights,
        one row per variation.
        """
        len_means = len(self.merged_table["data_MC_ratio"])
        zeros = np.zeros(len_means)
        if self.cov is None:
            if rho_sys is None:
                # Default: fully correlated systematics if syscorr, else uncorrelated.
                if self.syscorr:
                    rho_sys = np.ones((len_means, len_means))
                else:
                    rho_sys = np.identity(len_means)
            if rho_stat is None:
                rho_stat = np.identity(len_means)
            # cov = D * rho * D with D = diag(errors)
            sys_cov = np.matmul(
                np.matmul(np.diag(self.merged_table['sys_error']), rho_sys), np.diag(self.merged_table['sys_error'])
            )
            stat_cov = np.matmul(
                np.matmul(np.diag(self.merged_table['stat_error']), rho_stat), np.diag(self.merged_table['stat_error'])
            )
            # Fixed seed (if provided) for the systematic part only; the
            # statistical part is always drawn with a fresh seed.
            np.random.seed(self.sys_seed)
            sys = np.random.multivariate_normal(zeros, sys_cov, n_variations)
            np.random.seed(None)
            stat = np.random.multivariate_normal(zeros, stat_cov, n_variations)
            return sys + stat
        # A user-provided total covariance matrix overrides the stat/sys split.
        errors = np.random.multivariate_normal(zeros, self.cov, n_variations)
        return errors

    def __str__(self) -> str:
        """
        Converts the object to a string.
        """
        separator = '------------------'
        title = 'ReweighterParticle'
        prefix_str = f'Type: {self.type} Prefix: {self.prefix}'
        columns = _weight_cols
        merged_table_str = f'Merged table:\n{self.merged_table[columns].describe()}'
        pdg_binning_str = 'PDG binning:\n'
        for pdgs in self.pdg_binning:
            pdg_binning_str += f'{pdgs}: {self.pdg_binning[pdgs]}\n'
        return '\n'.join([separator, title, prefix_str, merged_table_str, pdg_binning_str]) + separator

    def plot_coverage(self, fig=None, axs=None):
        """
        Plots the coverage of the ntuple.

        Args:
            fig: Optional matplotlib figure to draw on; created if None.
            axs: Optional axes grid matching (n_pdg_pairs, n_variables).
        """
        if self.plot_values is None:
            return
        plot_vars = set(sum([list(d.keys()) for d in self.plot_values.values()], []))
        if fig is None:
            fig, axs = plt.subplots(len(self.plot_values), len(plot_vars),
                                    figsize=(5*len(plot_vars), 3*len(self.plot_values)), dpi=120)
        axs = np.array(axs)
        # plt.subplots squeezes the axes array: a single axes is 0-d and a
        # single row/column is 1-d. Reshape to a 2-d grid in both cases so the
        # nested iteration below always works (the old check `ndim < 1` missed
        # the 1-d case and crashed for single-row/column layouts).
        if axs.ndim < 2:
            axs = axs.reshape(len(self.plot_values), len(plot_vars))
        bin_plt = {'linewidth': 3, 'linestyle': '--', 'color': '0.5'}
        fig.suptitle(f'{self.type} particle {self.prefix.strip("_")}')
        for (reco_pdg, mc_pdg), ax_row in zip(self.plot_values, axs):
            for var, ax in zip(self.plot_values[(reco_pdg, mc_pdg)], ax_row):
                ymin = 0
                ymax = self.plot_values[(reco_pdg, mc_pdg)][var][1].max()*1.1
                # Plot binning
                if self.type == 'PID':
                    ax.vlines(self.pdg_binning[(reco_pdg, mc_pdg)][var], ymin, ymax,
                              label='Binning',
                              alpha=0.8,
                              **bin_plt)
                elif self.type == 'FEI':
                    # FEI modes are labelled 'mode<N>'; extract the numeric IDs.
                    values = np.array([int(val[4:]) for val in self.pdg_binning[(reco_pdg, mc_pdg)][var]])
                    ax.bar(values+0.5,
                           np.ones(len(values))*ymax,
                           width=1,
                           alpha=0.5,
                           label='Binning',
                           **bin_plt)
                    rest = np.setdiff1d(self.plot_values[(reco_pdg, mc_pdg)][var][0], values)
                    ax.bar(rest+0.5,
                           np.ones(len(rest))*ymax,
                           width=1,
                           alpha=0.2,
                           label='Rest category',
                           **bin_plt)
                # Plot values
                widths = (self.plot_values[(reco_pdg, mc_pdg)][var][0][1:] - self.plot_values[(reco_pdg, mc_pdg)][var][0][:-1])
                centers = self.plot_values[(reco_pdg, mc_pdg)][var][0][:-1] + widths/2
                ax.bar(centers,
                       self.plot_values[(reco_pdg, mc_pdg)][var][1],
                       width=widths,
                       label='Values',
                       alpha=0.8)
                ax.set_title(f'True {pdg.to_name(mc_pdg)} to reco {pdg.to_name(reco_pdg)} coverage')
                ax.set_xlabel(var)
        axs[-1][-1].legend()
        fig.tight_layout()
        return fig, axs


class Reweighter:
    """
    Class that reweights the dataframe.

    Args:
        n_variations (int): Number of weight variations to generate.
        weight_name (str): Name of the weight column.
        evaluate_plots (bool): Flag to indicate if the plots should be evaluated.
        nbins (int): Number of bins for the plots.
    """

    def __init__(self,
                 n_variations: int = 100,
                 weight_name: str = "Weight",
                 evaluate_plots: bool = True,
                 nbins: int = 50,
                 fillna: float = 1.0) -> None:
        """
        Initializes the Reweighter class.
        """
        #: Number of weight variations to generate
        self.n_variations = n_variations
        #: List of particles
        self.particles = []
        #: Correlations between the particles
        self.correlations = []
        #: Name of the weight column
        self.weight_name = weight_name
        #: Flag to indicate if the weights have been generated
        self.weights_generated = False
        #: Flag to indicate if the plots should be evaluated
        self.evaluate_plots = evaluate_plots
        #: Number of bins for the plots
        self.nbins = nbins
        #: Value to fill NaN values
        self.fillna = fillna

    def get_bin_columns(self, weight_df) -> list:
        """
        Returns the kinematic bin columns of the dataframe.
        """
        return [col for col in weight_df.columns if col.endswith('_min') or col.endswith('_max')]

    def get_binning(self, weight_df) -> dict:
        """
        Returns the kinematic binning of the dataframe.
        """
        columns = self.get_bin_columns(weight_df)
        var_names = {'_'.join(col.split('_')[:-1]) for col in columns}
        bin_dict = {}
        for var_name in var_names:
            bin_dict[var_name] = []
            for col in columns:
                if col.startswith(var_name):
                    bin_dict[var_name] += list(weight_df[col].values)
            bin_dict[var_name] = np.array(sorted(set(bin_dict[var_name])))
        return bin_dict

    def get_fei_binning(self, weight_df) -> dict:
        """
        Returns the irregular binning of the dataframe.
        """
        return {_fei_mode_col: weight_df.loc[weight_df[_fei_mode_col].str.startswith('mode'),
                                             _fei_mode_col].value_counts().index.to_list()}

    def get_ntuple_variables(self,
                             ntuple_df: pd.DataFrame,
                             particle: ReweighterParticle) -> None:
        """
        Checks if the variables are in the ntuple and returns them.

        Args:
            ntuple_df (pandas.DataFrame): Dataframe containing the analysis ntuple.
            particle (ReweighterParticle): Particle object containing the necessary variables.
        """
        ntuple_variables = particle.get_binning_variables()
        ntuple_variables += particle.get_pdg_variables()
        for var in ntuple_variables:
            if var not in ntuple_df.columns:
                raise ValueError(f'Variable {var} is not in the ntuple! Required variables are {ntuple_variables}')
        return ntuple_variables

    def merge_pid_weight_tables(self,
                                weights_dict: dict,
                                pdg_pid_variable_dict: dict) -> pd.DataFrame:
        """
        Merges the efficiency and fake rate weight tables.

        Args:
            weights_dict (dict): Dictionary containing the weight tables.
            pdg_pid_variable_dict (dict): Dictionary containing the PDG codes and variable names.
        """
        weight_dfs = []
        for reco_pdg, mc_pdg in weights_dict:
            if reco_pdg not in pdg_pid_variable_dict:
                raise ValueError(f'Reconstructed PDG code {reco_pdg} not found in thresholds!')
            weight_df = weights_dict[(reco_pdg, mc_pdg)]
            weight_df['mcPDG'] = mc_pdg
            weight_df['PDG'] = reco_pdg
            # Check if these are legacy tables:
            if 'charge' in weight_df.columns:
                charge_dict = {'+': [0, 2], '-': [-2, 0]}
                weight_df[['charge_min', 'charge_max']] = [charge_dict[val] for val in weight_df['charge'].values]
                weight_df = weight_df.drop(columns=['charge'])
                # If iso_score is a single value, drop the min and max columns
                if 'iso_score_min' in weight_df.columns and len(weight_df['iso_score_min'].unique()) == 1:
                    weight_df = weight_df.drop(columns=['iso_score_min', 'iso_score_max'])
            pid_variable_name = pdg_pid_variable_dict[reco_pdg][0]
            threshold = pdg_pid_variable_dict[reco_pdg][1]
            selected_weights = weight_df.query(f'variable == "{pid_variable_name}" and threshold == {threshold}')
            if len(selected_weights) == 0:
                available_variables = weight_df['variable'].unique()
                available_thresholds = weight_df['threshold'].unique()
                raise ValueError(f'No weights found for PDG code {reco_pdg}, mcPDG {mc_pdg},'
                                 f' variable {pid_variable_name} and threshold {threshold}!\n'
                                 f' Available variables: {available_variables}\n'
                                 f' Available thresholds: {available_thresholds}')
            weight_dfs.append(selected_weights)
        return pd.concat(weight_dfs, ignore_index=True)

    def add_pid_weight_columns(self,
                               ntuple_df: pd.DataFrame,
                               particle: ReweighterParticle) -> None:
        """
        Adds a weight and uncertainty columns to the dataframe.

        Args:
            ntuple_df (pandas.DataFrame): Dataframe containing the analysis ntuple.
            particle (ReweighterParticle): Particle object.
        """
        # Apply a weight value from the weight table to the ntuple, based on the binning
        binning_df = pd.DataFrame(index=ntuple_df.index)
        # Take absolute value of mcPDG for binning because we have charge already
        binning_df['mcPDG'] = ntuple_df[f'{particle.get_varname("mcPDG")}'].abs()
        binning_df['PDG'] = ntuple_df[f'{particle.get_varname("PDG")}'].abs()
        plot_values = {}
        for reco_pdg, mc_pdg in particle.pdg_binning:
            ntuple_cut = f'abs({particle.get_varname("mcPDG")}) == {mc_pdg} and abs({particle.get_varname("PDG")}) == {reco_pdg}'
            if ntuple_df.query(ntuple_cut).empty:
                continue
            plot_values[(reco_pdg, mc_pdg)] = {}
            for var in particle.pdg_binning[(reco_pdg, mc_pdg)]:
                labels = [(particle.pdg_binning[(reco_pdg, mc_pdg)][var][i-1], particle.pdg_binning[(reco_pdg, mc_pdg)][var][i])
                          for i in range(1, len(particle.pdg_binning[(reco_pdg, mc_pdg)][var]))]
                binning_df.loc[(binning_df['mcPDG'] == mc_pdg) & (binning_df['PDG'] == reco_pdg), var] = pd.cut(ntuple_df.query(
                    ntuple_cut)[f'{particle.get_varname(var)}'],
                    particle.pdg_binning[(reco_pdg, mc_pdg)][var], labels=labels)
                binning_df.loc[(binning_df['mcPDG'] == mc_pdg) & (binning_df['PDG'] == reco_pdg),
                               f'{var}_min'] = binning_df.loc[(binning_df['mcPDG'] == mc_pdg) & (binning_df['PDG'] == reco_pdg),
                                                              var].str[0]
                binning_df.loc[(binning_df['mcPDG'] == mc_pdg) & (binning_df['PDG'] == reco_pdg),
                               f'{var}_max'] = binning_df.loc[(binning_df['mcPDG'] == mc_pdg) & (binning_df['PDG'] == reco_pdg),
                                                              var].str[1]
                binning_df.drop(var, axis=1, inplace=True)
                if self.evaluate_plots:
                    values = ntuple_df.query(ntuple_cut)[f'{particle.get_varname(var)}']
                    if len(values.unique()) < 2:
                        print(f'Skip {var} for plotting!')
                        continue
                    x_range = np.linspace(values.min(), values.max(), self.nbins)
                    plot_values[(reco_pdg, mc_pdg)][var] = x_range, np.histogram(values, bins=x_range, density=True)[0]
        # merge the weight table with the ntuple on binning columns
        weight_cols = _weight_cols
        if particle.column_names:
            weight_cols = particle.column_names
        binning_df = binning_df.merge(particle.merged_table[weight_cols + binning_df.columns.tolist()],
                                      on=binning_df.columns.tolist(), how='left')
        binning_df.index = ntuple_df.index
        particle.coverage = 1 - binning_df[weight_cols[0]].isna().sum() / len(binning_df)
        particle.plot_values = plot_values
        for col in weight_cols:
            ntuple_df[f'{particle.get_varname(col)}'] = binning_df[col]
            ntuple_df[f'{particle.get_varname(col)}'] = ntuple_df[f'{particle.get_varname(col)}'].fillna(self.fillna)

    def add_pid_particle(self,
                         prefix: str,
                         weights_dict: dict,
                         pdg_pid_variable_dict: dict,
                         variable_aliases: dict = None,
                         sys_seed: int = None,
                         syscorr: bool = True) -> None:
        """
        Adds weight variations according to the total uncertainty for easier error propagation.

        Args:
            prefix (str): Prefix for the new columns.
            weights_dict (pandas.DataFrame): Dataframe containing the efficiency weights.
            pdg_pid_variable_dict (dict): Dictionary containing the PID variables and thresholds.
            variable_aliases (dict): Dictionary containing variable aliases.
            sys_seed (int): Seed for the systematic variations.
            syscorr (bool): When true assume systematics are 100% correlated defaults to
        true. Note this is overridden by provision of a None value rho_sys
        """
        # Empty prefix means no prefix
        if prefix is None:
            prefix = ''
        # Add underscore if not present
        if prefix and not prefix.endswith('_'):
            prefix += '_'
        if self.get_particle(prefix):
            raise ValueError(f"Particle with prefix '{prefix}' already exists!")
        if variable_aliases is None:
            variable_aliases = {}
        merged_weight_df = self.merge_pid_weight_tables(weights_dict, pdg_pid_variable_dict)
        pdg_binning = {(reco_pdg, mc_pdg): self.get_binning(merged_weight_df.query(f'PDG == {reco_pdg} and mcPDG == {mc_pdg}'))
                       for reco_pdg, mc_pdg in merged_weight_df[['PDG', 'mcPDG']].value_counts().index.to_list()}
        particle = ReweighterParticle(prefix,
                                      type='PID',
                                      merged_table=merged_weight_df,
                                      pdg_binning=pdg_binning,
                                      variable_aliases=variable_aliases,
                                      weight_name=self.weight_name,
                                      sys_seed=sys_seed,
                                      syscorr=syscorr)
        self.particles += [particle]

    def get_particle(self, prefix: str) -> ReweighterParticle:
        """
        Get a particle by its prefix.
        """
        cands = [particle for particle in self.particles if particle.prefix.strip('_') == prefix.strip('_')]
        if len(cands) == 0:
            return None
        return cands[0]

    def convert_fei_table(self, table: pd.DataFrame, threshold: float):
        """
        Checks if the tables are provided in a legacy format and converts them to the standard format.
        """
        result = None
        str_to_pdg = {'B+': 521, 'B-': 521, 'B0': 511}
        if 'cal' in table.columns:
            result = pd.DataFrame(index=table.index)
            result['data_MC_ratio'] = table['cal']
            result['PDG'] = table['Btag'].apply(lambda x: str_to_pdg.get(x))
            # Assume these are only efficiency tables
            result['mcPDG'] = result['PDG']
            result['threshold'] = table['sig_prob_threshold']
            result[_fei_mode_col] = table[_fei_mode_col]
            result['data_MC_uncertainty_stat_dn'] = table['cal_stat_error']
            result['data_MC_uncertainty_stat_up'] = table['cal_stat_error']
            result['data_MC_uncertainty_sys_dn'] = table['cal_sys_error']
            result['data_MC_uncertainty_sys_up'] = table['cal_sys_error']
        elif 'cal factor' in table.columns:
            result = pd.DataFrame(index=table.index)
            result['data_MC_ratio'] = table['cal factor']
            result['PDG'] = table['Btype'].apply(lambda x: str_to_pdg.get(x))
            result['mcPDG'] = result['PDG']
            result['threshold'] = table['sig prob cut']
            # Assign the total error to the stat uncertainty and set syst. one to 0
            result['data_MC_uncertainty_stat_dn'] = table['error']
            result['data_MC_uncertainty_stat_up'] = table['error']
            result['data_MC_uncertainty_sys_dn'] = 0
            result['data_MC_uncertainty_sys_up'] = 0
            result[_fei_mode_col] = table['mode']
        elif 'dmID' in table.columns:
            result = pd.DataFrame(index=table.index)
            result['data_MC_ratio'] = table['central_value']
            result['PDG'] = table['PDG'].apply(ast.literal_eval).str[0].abs()
            result['mcPDG'] = result['PDG']
            result['threshold'] = table['sigProb']
            # Assign the total error to the stat uncertainty and set syst. one to 0
            result['data_MC_uncertainty_stat_dn'] = table['total_error']
            result['data_MC_uncertainty_stat_up'] = table['total_error']
            result['data_MC_uncertainty_sys_dn'] = 0
            result['data_MC_uncertainty_sys_up'] = 0
            result[_fei_mode_col] = ('mode' + table['dmID'].astype(str)).replace('mode999', 'rest')
        else:
            result = table
        result = result.query(f'threshold == {threshold}')
        if len(result) == 0:
            raise ValueError(f'No weights found for threshold {threshold}!')
        return result

    def add_fei_particle(self, prefix: str,
                         table: pd.DataFrame,
                         threshold: float,
                         cov: np.ndarray = None,
                         variable_aliases: dict = None,
                         ) -> None:
        """
        Adds weight variations according to the total uncertainty for easier error propagation.

        Args:
            prefix (str): Prefix for the new columns.
            table (pandas.DataFrame): Dataframe containing the efficiency weights.
            threshold (float): Threshold for the efficiency weights.
            cov (numpy.ndarray): Covariance matrix for the efficiency weights.
            variable_aliases (dict): Dictionary containing variable aliases.
        """
        # Empty prefix means no prefix
        if prefix is None:
            prefix = ''
        if prefix and not prefix.endswith('_'):
            prefix += '_'
        if self.get_particle(prefix):
            raise ValueError(f"Particle with prefix '{prefix}' already exists!")
        if variable_aliases is None:
            variable_aliases = {}
        if table is None or len(table) == 0:
            raise ValueError('No weights provided!')
        converted_table = self.convert_fei_table(table, threshold)
        pdg_binning = {(reco_pdg, mc_pdg): self.get_fei_binning(converted_table.query(f'PDG == {reco_pdg} and mcPDG == {mc_pdg}'))
                       for reco_pdg, mc_pdg in converted_table[['PDG', 'mcPDG']].value_counts().index.to_list()}
        particle = ReweighterParticle(prefix,
                                      type='FEI',
                                      merged_table=converted_table,
                                      pdg_binning=pdg_binning,
                                      variable_aliases=variable_aliases,
                                      weight_name=self.weight_name,
                                      cov=cov)
        self.particles += [particle]

    def add_fei_weight_columns(self, ntuple_df: pd.DataFrame, particle: ReweighterParticle):
        """
        Adds weight columns according to the FEI calibration tables
        """
        rest_str = 'rest'
        particle.merged_table[_fei_mode_col]
        # Apply a weight value from the weight table to the ntuple, based on the binning
        binning_df = pd.DataFrame(index=ntuple_df.index)
        # Take absolute value of mcPDG for binning because we have charge already
        binning_df['PDG'] = ntuple_df[f'{particle.get_varname("PDG")}'].abs()
        # Copy the mode ID from the ntuple
        binning_df['num_mode'] = ntuple_df[particle.get_varname(_fei_mode_col)].astype(int)
        # Default value in case if reco PDG is not a B-meson PDG
        binning_df[_fei_mode_col] = np.nan
        plot_values = {}
        for reco_pdg, mc_pdg in particle.pdg_binning:
            plot_values[(reco_pdg, mc_pdg)] = {}
            binning_df.loc[binning_df['PDG'] == reco_pdg, _fei_mode_col] = particle.merged_table.query(
                f'PDG == {reco_pdg} and {_fei_mode_col}.str.lower() == "{rest_str}"')[_fei_mode_col].values[0]
            for mode in particle.pdg_binning[(reco_pdg, mc_pdg)][_fei_mode_col]:
                binning_df.loc[(binning_df['PDG'] == reco_pdg) & (binning_df['num_mode'] == int(mode[4:])), _fei_mode_col] = mode
            if self.evaluate_plots:
                values = ntuple_df[f'{particle.get_varname(_fei_mode_col)}']
                x_range = np.linspace(values.min(), values.max(), int(values.max())+1)
                plot_values[(reco_pdg, mc_pdg)][_fei_mode_col] = x_range, np.histogram(values, bins=x_range, density=True)[0]

        # merge the weight table with the ntuple on binning columns
        weight_cols = _weight_cols
        if particle.column_names:
            weight_cols = particle.column_names
        binning_df = binning_df.merge(particle.merged_table[weight_cols + ['PDG', _fei_mode_col]],
                                      on=['PDG', _fei_mode_col], how='left')
        binning_df.index = ntuple_df.index
        particle.coverage = 1 - binning_df[weight_cols[0]].isna().sum() / len(binning_df)
        particle.plot_values = plot_values
        for col in weight_cols:
            ntuple_df[f'{particle.get_varname(col)}'] = binning_df[col]

    def reweight(self,
                 df: pd.DataFrame,
                 generate_variations: bool = True):
        """
        Reweights the dataframe according to the weight tables.

        Args:
            df (pandas.DataFrame): Dataframe containing the analysis ntuple.
            generate_variations (bool): When true generate weight variations.
        """
        for particle in self.particles:
            if particle.type not in _correction_types:
                raise ValueError(f'Particle type {particle.type} not supported!')
            print(f'Required variables: {self.get_ntuple_variables(df, particle)}')
            if generate_variations:
                particle.generate_variations(n_variations=self.n_variations)
            if particle.type == 'PID':
                self.add_pid_weight_columns(df, particle)
            elif particle.type == 'FEI':
                self.add_fei_weight_columns(df, particle)
        return df

    def print_coverage(self):
        """
        Prints the coverage of each particle.
        """
        print('Coverage:')
        for particle in self.particles:
            print(f'{particle.type} {particle.prefix.strip("_")}: {particle.coverage*100 :0.1f}%')

    def plot_coverage(self):
        """
        Plots the coverage of each particle.
        """
        for particle in self.particles:
            particle.plot_coverage()


def add_weights_to_dataframe(prefix: str,
                             df: pd.DataFrame,
                             systematic: str,
                             custom_tables: dict = None,
                             custom_thresholds: dict = None,
                             **kw_args) -> pd.DataFrame:
    """
    Helper method that adds weights to a dataframe.

    Args:
        prefix (str): Prefix for the new columns.
        df (pandas.DataFrame): Dataframe containing the analysis ntuple.
        systematic (str): Type of the systematic corrections, options: "custom_PID" and "custom_FEI".
        custom_tables (dict): Dictionary containing the custom efficiency weights.
        custom_thresholds (dict): Dictionary containing the custom thresholds for the custom efficiency weights.
        **kw_args: Additional arguments:
            n_variations (int): Number of variations to generate.
            generate_variations (bool): When true generate weight variations.
            weight_name (str): Name of the weight column.
            show_plots (bool): When true show the coverage plots.
            variable_aliases (dict): Dictionary containing variable aliases.
            cov_matrix (numpy.ndarray): Covariance matrix for the custom efficiency weights.
            fillna (int): Value to fill NaN values with.
            sys_seed (int): Seed for the systematic variations.
            syscorr (bool): When true assume systematics are 100% correlated defaults to true.
    """
    generate_variations = kw_args.get('generate_variations', True)
    n_variations = kw_args.get('n_variations', 100)
    weight_name = kw_args.get('weight_name', "Weight")
    fillna = kw_args.get('fillna', 1.0)
    # Catch performance warnings from pandas
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=PerformanceWarning)
        reweighter = Reweighter(n_variations=n_variations,
                                weight_name=weight_name,
                                fillna=fillna)
        variable_aliases = kw_args.get('variable_aliases')
        if systematic.lower() == 'custom_fei':
            cov_matrix = kw_args.get('cov_matrix')
            reweighter.add_fei_particle(prefix=prefix,
                                        table=custom_tables,
                                        threshold=custom_thresholds,
                                        variable_aliases=variable_aliases,
                                        cov=cov_matrix
                                        )
        elif systematic.lower() == 'custom_pid':
            sys_seed = kw_args.get('sys_seed')
            syscorr = kw_args.get('syscorr')
            if syscorr is None:
                syscorr = True
            reweighter.add_pid_particle(prefix=prefix,
                                        weights_dict=custom_tables,
                                        pdg_pid_variable_dict=custom_thresholds,
                                        variable_aliases=variable_aliases,
                                        sys_seed=sys_seed,
                                        syscorr=syscorr
                                        )
        else:
            raise ValueError(f'Systematic {systematic} is not supported!')
        result = reweighter.reweight(df, generate_variations=generate_variations)
        if kw_args.get('show_plots'):
            reweighter.print_coverage()
            reweighter.plot_coverage()
    return result