Belle II Software development
test_sysvar.py
1#!/usr/bin/env python3
2
3
10
11
12import unittest
13import numpy as np
14import pandas as pd
15from sysvar import Reweighter
16import itertools
17
18
19class TestSysVar(unittest.TestCase):
20 """Test case of SysVar module"""
21
22 def __init__(self, *args, **kwargs):
23 """Constructor"""
24 super().__init__(*args, **kwargs)
25 cosTheta_min = [-0.7, -0.5, 0, 0.2, 0.5, 0.8, 0.9]
26 cosTheta_max = [-0.5, 0, 0.2, 0.5, 0.8, 0.9, 0.95]
27 cosTheta_var = [list(c) for c in zip(cosTheta_min, cosTheta_max)]
28 p_min = [0.5, 1, 2, 3, 4, 5]
29 p_max = [1, 2, 3, 4, 5, 6]
30 charge_var = [[-2, 0], [0, 2]]
31 p_var = [list(p) for p in zip(p_min, p_max)]
32 eff_binning = np.array([list(c) for c in itertools.product(p_var, cosTheta_var, charge_var)]).reshape(-1, 6)
33 p_min = [0.5, 1, 2, 3, 4]
34 p_max = [1, 2, 3, 4, 5]
35 p_var = [list(p) for p in zip(p_min, p_max)]
36 fake_binning = np.array([list(c) for c in itertools.product(p_var, cosTheta_var, charge_var)]).reshape(-1, 6)
37 np.random.seed(0)
38 eff_df = pd.DataFrame(eff_binning, columns=['p_min', 'p_max', 'cosTheta_min', 'cosTheta_max', 'charge_min', 'charge_max'])
39 eff_df['data_MC_ratio'] = np.random.normal(1, 0.1, size=len(eff_df))
40 eff_df['data_MC_uncertainty_stat_dn'] = np.random.uniform(0.001, 0.01, size=len(eff_df))
41 eff_df['data_MC_uncertainty_stat_up'] = np.random.uniform(0.001, 0.01, size=len(eff_df))
42 eff_df['data_MC_uncertainty_sys_up'] = np.random.uniform(0.001, 0.01, size=len(eff_df))
43 eff_df['data_MC_uncertainty_sys_dn'] = np.random.uniform(0.001, 0.01, size=len(eff_df))
44 eff_df['variable'] = 'electronID'
45 eff_df['threshold'] = 0.8
46
47 fake_df = pd.DataFrame(fake_binning, columns=['p_min', 'p_max', 'cosTheta_min', 'cosTheta_max', 'charge_min', 'charge_max'])
48 fake_df['data_MC_ratio'] = np.random.normal(2, 0.2, size=len(fake_df))
49 fake_df['data_MC_uncertainty_stat_dn'] = np.random.uniform(0.01, 0.02, size=len(fake_df))
50 fake_df['data_MC_uncertainty_stat_up'] = np.random.uniform(0.01, 0.02, size=len(fake_df))
51 fake_df['data_MC_uncertainty_sys_up'] = np.random.uniform(0.01, 0.02, size=len(fake_df))
52 fake_df['data_MC_uncertainty_sys_dn'] = np.random.uniform(0.01, 0.02, size=len(fake_df))
53 fake_df['variable'] = 'electronID'
54 fake_df['threshold'] = 0.8
55
56 self.pid_tables = {(11, 11): eff_df, (11, 211): fake_df}
57
58 user_data_dict = {
59 'cosTheta': np.array([0.25, 0.55, 0.85, 0.925]),
60 'p': np.array([1.5, 2.5, 3.5, 4.5]),
61 'charge': np.array([-1, 1, 1, 1]),
62 'electronID': np.array([0.85, 0.9, 0.95, 0.975]),
63 'PDG': np.array([11, -11, -11, -11]),
64 'mcPDG': np.array([11, 211, -11, 211]),
65 'B0_dec_mode': [0, 1, 2, 3],
66 'B0_PDG': [511, -511, 511, -511],
67 'B0_sigProb': [0.02, 0.03, 0.02, 0.9]
68 }
69
70 self.user_data = pd.DataFrame(user_data_dict)
71 print(self.pid_tables)
72 print(self.user_data)
73
74 fei_dict = {'dec_mode': ['All', 'Rest', 'mode0', 'mode2'],
75 'cal': [0.8, 0.85, 0.75, 0.82],
76 'cal_stat_error': [0.1, 0.02, 0.03, 0.05],
77 'cal_sys_error': [0.1, 0.02, 0.03, 0.05],
78 'sig_prob_threshold': [0.01, 0.01, 0.01, 0.01],
79 'Btag': ['B0', 'B0', 'B0', 'B0']
80 }
81
82 self.fei_table = pd.DataFrame(fei_dict)
83
85 """Tests merging of PID tables"""
86 reweighter = Reweighter()
87 thresholds = {11: ('electronID', 0.8)}
88 merged_table = reweighter.merge_pid_weight_tables(self.pid_tables, thresholds)
89 self.assertEqual(len(merged_table), 154)
90 self.assertIn('PDG', merged_table.columns)
91 self.assertIn('mcPDG', merged_table.columns)
92 with self.assertRaises(ValueError):
93 wrong_thresholds = {11: ('electronID', 0.9)}
94 reweighter.merge_pid_weight_tables(self.pid_tables, wrong_thresholds)
95
96 def test_binning(self):
97 """Tests binning of PID tables"""
98 reweighter = Reweighter()
99 binning = reweighter.get_binning(self.pid_tables[(11, 11)])
100 print(binning)
101 self.assertEqual(len(binning), 3)
102 self.assertCountEqual(binning.keys(), ['p', 'cosTheta', 'charge'])
103 self.assertEqual(len(binning['p']), 7)
104 self.assertEqual(len(binning['cosTheta']), 8)
105 self.assertEqual(len(binning['charge']), 3)
106 binning = reweighter.get_binning(self.pid_tables[(11, 211)])
107 self.assertEqual(len(binning), 3)
108 self.assertEqual(len(binning['p']), 6)
109
111 """Tests adding PID particle"""
112 reweighter = Reweighter()
113 thresholds = {11: ('electronID', 0.8)}
114 reweighter.add_pid_particle('', self.pid_tables, thresholds)
115 particle = reweighter.get_particle('')
116 self.assertIsNotNone(particle)
117 self.assertEqual(len(particle.merged_table), 154)
118 self.assertCountEqual(particle.get_binning_variables(), ['p', 'cosTheta', 'charge'])
119 print(particle)
120 with self.assertRaises(ValueError):
121 reweighter.add_pid_particle('', self.pid_tables, thresholds)
122
124 """Tests add_fei_particle method"""
125 print('==================================================')
126 reweighter = Reweighter()
127 reweighter.add_fei_particle('', self.fei_table, 0.01, None)
128 particle = reweighter.get_particle('')
129 self.assertEqual(len(particle.merged_table), 4)
130 self.assertCountEqual(particle.get_binning_variables(), ['dec_mode'])
131 print(particle)
132 with self.assertRaises(ValueError):
133 reweighter.add_fei_particle('', self.fei_table, 0.001, None)
134
135 def test_reweight(self):
136 """Tests reweighting of PID and FEI particles"""
137 n_variations = 50
138 reweighter = Reweighter(n_variations=n_variations)
139 thresholds = {11: ('electronID', 0.8)}
140 reweighter.add_pid_particle('', self.pid_tables, thresholds)
141 reweighter.add_fei_particle('B0', self.fei_table, 0.01, None)
142 local_data = self.user_data.copy(deep=True)
143 reweighter.reweight(local_data)
144 self.assertEqual(len(local_data), 4)
145 self.assertIn('Weight', local_data.columns)
146 self.assertIn('B0_Weight', local_data.columns)
147 cols = [f'Weight_{i}' for i in range(0, n_variations)]
148 self.assertCountEqual(cols, [col for col in local_data.columns if col.startswith('Weight_')])
149 self.assertTrue((local_data.query('abs(mcPDG) == 11')[['Weight']+cols] < 1.5).all().all())
150 self.assertTrue((local_data.query('abs(mcPDG) != 11')[['Weight']+cols] > 1.5).all().all())
151 # print(local_data.query('abs(mcPDG) != 11')[['Weight']+cols].std(axis=1))
152 self.assertTrue((local_data.query('abs(mcPDG) == 11')[['Weight']+cols].std(axis=1) < 0.015).all())
153 self.assertTrue((local_data.query('abs(mcPDG) != 11')[['Weight']+cols].std(axis=1) > 0.015).all())
154
155 cols = [f'B0_Weight_{i}' for i in range(0, n_variations)]
156 self.assertCountEqual(cols, [col for col in local_data.columns if col.startswith('B0_Weight_')])
157 # print(local_data[['B0_Weight']+cols])
158 self.assertFalse((local_data[['B0_Weight']+cols].isna()).any().any())
159 self.assertTrue((local_data[['B0_Weight']+cols] > 0).all().all())
160
161 print('==================================================')
162 print('==================================================')
163 print('Shift index test:')
164 local_data_shift = self.user_data.copy(deep=True)
165 local_data_shift.index += 100
166 reweighter.reweight(local_data_shift)
167 self.assertIn('Weight', local_data_shift.columns)
168 cols = [f'Weight_{i}' for i in range(0, n_variations)]
169 print(local_data[['Weight']+cols])
170 print(local_data_shift[['Weight']+cols])
171 self.assertFalse((local_data_shift[['Weight']+cols].isna()).any().any())
172 cols = [f'B0_Weight_{i}' for i in range(0, n_variations)]
173 self.assertFalse((local_data_shift[['B0_Weight']+cols].isna()).any().any())
174
175
176if __name__ == '__main__':
177 unittest.main()
def test_add_pid_particle(self)
Definition: test_sysvar.py:110
def __init__(self, *args, **kwargs)
Definition: test_sysvar.py:22
fei_table
FEI tables for testing.
Definition: test_sysvar.py:82
pid_tables
PID tables for testing.
Definition: test_sysvar.py:56
def test_add_fei_particle(self)
Definition: test_sysvar.py:123
user_data
User data for testing.
Definition: test_sysvar.py:70
def test_merge_tables(self)
Definition: test_sysvar.py:84
STL namespace.