Belle II Software development
tensorflow_dnn_interface.py
1#!/usr/bin/env python3
2
3
10
11import os
12import json
13import tempfile
14import numpy as np
15import tensorflow as tf
16import pandas
17
19
20from dft import binning
21
22from dft import tensorflow_dnn_model as tfm
23from dft.TfData import TfDataBasf2, TfDataBasf2Stub
24
25
26def get_tensorflow_model(number_of_features, parameters):
27 """
28 generates the tensorflow model
29 :param int number_of_features: number of features is handled separately
30 :param dictionary parameters: additional parameters passed to tensorflow_dnn_model.DefaultModel
31 :return:
32 """
33
34 layers = parameters.get('layers', None)
35 wd_coeffs = parameters.get('wd_coeffs', [])
36
37 lr_dec_rate = parameters.get('lr_dec_rate', 1 / (1 + 2e-7)**1.2e5)
38 lr_init = parameters.get('lr_init', .05)
39 mom_init = parameters.get('mom_init', .9)
40 min_epochs = parameters.get('min_epochs', 300)
41 max_epochs = parameters.get('max_epochs', 400)
42 stop_epochs = parameters.get('stop_epochs', 10)
43
44 if layers is None:
45 layers = [['h0', 'tanh', number_of_features, 300, .0001, 1.0 / np.sqrt(300)],
46 ['h1', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
47 ['h2', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
48 ['h3', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
49 ['h4', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
50 ['h5', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
51 ['h6', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
52 ['h7', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
53 ['y', 'sigmoid', 300, 1, .0001, 0.002 * 1.0 / np.sqrt(300)]]
54 else:
55 layers[0][2] = number_of_features
56
57 # None disables usage of wd_coeffs
58 if wd_coeffs is not None and not wd_coeffs:
59 wd_coeffs = [2e-5 for _ in layers]
60
61 mlp = tfm.MultilayerPerceptron.from_list(layers)
62 model = tfm.DefaultModel(mlp, lr_dec_rate=lr_dec_rate, lr_init=lr_init, mom_init=mom_init, wd_coeffs=wd_coeffs,
63 min_epochs=min_epochs, max_epochs=max_epochs, stop_epochs=stop_epochs)
64 return model
65
66
67def get_model(number_of_features, number_of_spectators, number_of_events, training_fraction, parameters):
68 """
69 specifies the and configures the tensorflow model
70 :param number_of_features:
71 :param number_of_spectators:
72 :param number_of_events:
73 :param training_fraction:
74 :param parameters: as dictionary encoded json object
75 :return: State obj
76 """
77
78 # get all parameters, if they are not available, use default values
79 if parameters is None:
80 parameters = {}
81 else:
82 if not isinstance(parameters, dict):
83 raise TypeError('parameters must be a dictionary')
84
85 cuda_mask = parameters.get('cuda_visible_devices', '3')
86 tensorboard_dir = parameters.get('tensorboard_dir', None)
87
88 batch_size = parameters.get('batch_size', 100)
89 seed = parameters.get('seed', None)
90
91 # postprocessing parameters, from dictionary
92 transform_to_probability = parameters.get('transform_to_probability', False)
93
94 # set random state
95 if seed:
96 print('Seed: ', seed)
97 tf.set_random_seed(seed)
98
99 # mask cuda devices
100 os.environ['CUDA_VISIBLE_DEVICES'] = cuda_mask
101 gpus = tf.config.list_physical_devices('GPU')
102 if gpus:
103 for gpu in gpus:
104 tf.config.experimental.set_memory_growth(gpu, True)
105
106 # using a stub data set since there is no data available at this state
107 stub_data_set = TfDataBasf2Stub(batch_size, number_of_features, number_of_events, training_fraction)
108
109 # set saving file name, unfortunately this is already required in partial_fit
110 save_dir = tempfile.TemporaryDirectory()
111 save_name = os.path.join(save_dir.name, 'mymodel')
112
113 model = get_tensorflow_model(number_of_features, parameters)
114 training = tfm.Trainer(model, stub_data_set, tensorboard_dir, save_name)
115
116 state = State(model)
117
118 # training object is required in partial fit
119 state.training = training
120 state.batch_size = batch_size
121 state.save_dir = save_dir
122
123 state.transform_to_probability = transform_to_probability
124
125 # save parameters
126 saved_parameters = parameters.copy()
127 saved_parameters['number_of_features'] = number_of_features
128 state.parameters = json.dumps(saved_parameters)
129 state.seed = seed
130 return state
131
132
133def apply(state, X):
134 """
135 modified apply function
136 """
137
138 binning.transform_ndarray(X, state.binning_parameters)
139 chunk_size = 1000000
140 if len(X) > chunk_size:
141 results = list()
142 for i in range(0, len(X), chunk_size):
143 results.append(state.model(X).numpy().flatten())
144 r = np.concatenate(results).flatten()
145 else:
146 r = state.model(X).numpy().flatten()
147 if state.transform_to_probability:
148 binning.transform_array_to_sf(r, state.sig_back_tuple, signal_fraction=.5)
149
150 return np.require(r, dtype=np.float32, requirements=['A', 'W', 'C', 'O'])
151
152
153def load(obj):
154 """
155 Load Tensorflow estimator into state
156 """
157 # tensorflow operations
158 gpus = tf.config.list_physical_devices('GPU')
159 if gpus:
160 for gpu in gpus:
161 tf.config.experimental.set_memory_growth(gpu, True)
162
163 parameters = json.loads(obj[0])
164
165 number_of_features = parameters.pop('number_of_features')
166
167 class DataStub:
168 """
169 simple stub obj
170 """
171 feature_number = number_of_features
172 batches = 1
173
174 model = get_tensorflow_model(number_of_features, parameters)
175 model.initialize(DataStub())
176
177 # tensorflow is a moving target, file loading and saving of mid-level api changes rapidly. so we use the legacy here
178 with tempfile.TemporaryDirectory() as path:
179 with open(os.path.join(path, obj[1] + '.data-00000-of-00001'), 'w+b') as file1, open(
180 os.path.join(path, obj[1] + '.index'), 'w+b') as file2:
181 file1.write(bytes(obj[2]))
182 file2.write(bytes(obj[3]))
183
184 checkpoint = tf.train.Checkpoint(model)
185 checkpoint.restore(os.path.join(path, obj[1]))
186
187 state = State(model)
188 # preprocessing parameters
189 state.binning_parameters = obj[4]
190
191 # postprocessing transform to probability, if pdf was sampled during training
192 state.transform_to_probability = obj[5]
193 state.sig_back_tuple = obj[6]
194
195 seed = obj[7]
196 print('Deep FlavorTagger loading... Training seed: ', seed)
197
198 return state
199
200
201def begin_fit(state, Xtest, Stest, ytest, wtest, nBatches):
202 """
203 use test sets for monitoring
204 """
205 # TODO: split this set to define an independent test set for transformations to probability
206 state.Xvalid = Xtest[:len(Xtest) // 2]
207 state.yvalid = ytest[:len(ytest) // 2]
208
209 state.Xtest = Xtest[len(Xtest) // 2:]
210 state.ytest = ytest[len(ytest) // 2:]
211
212 return state
213
214
215def partial_fit(state, X, S, y, w, epoch, batch):
216 """
217 returns fractions of training and testing dataset, also uses weights
218 :param X: unprocessed training dataset
219 :param Xtest: unprocessed validation dataset
220 :return: bool, True == continue, False == stop iterations
221 """
222
223 # the epochs and batches are handled internally by the Trainer. This is all done within 1 external epoch and 1 external batch.
224 if epoch > 0 or batch > 0:
225 raise RuntimeError
226
227 # preprocessing
228 state.binning_parameters = binning.get_ndarray_binning_parameters(X)
229
230 binning.transform_ndarray(X, state.binning_parameters)
231 binning.transform_ndarray(state.Xvalid, state.binning_parameters)
232
233 if np.any(np.isnan(X)):
234 raise ValueError('NaN values in Dataset. Preprocessing transformations failed.')
235
236 # replace stub dataset
237 data_set = TfDataBasf2(X, y, state.Xvalid, state.yvalid, state.batch_size, seed=state.seed)
238
239 state.training.data_set = data_set
240
241 # start training
242 state.training.train_model()
243
244 return False
245
246
247def end_fit(state):
248 """
249 save the trained model
250 :param state:
251 :return:
252 """
253 filename = state.training.save_name
254 # postfix -2 is needed (current state gets postfix -1)
255 with open(filename + '-2.data-00000-of-00001', 'rb') as file1, open(filename + '-2.index', 'rb') as file2:
256 data1 = file1.read()
257 data2 = file2.read()
258 binning_parameters = state.binning_parameters
259
260 # transform to probability has to be saved since state object has to return untransformed network output
261 transform_to_probability = state.transform_to_probability
262 state.transform_to_probability = False
263
264 # sample pdfs of trained model on test_dataset, return test df
265 y_hat = state.model(state.Xtest).numpy().flatten()
266 test_df = pandas.DataFrame.from_dict({'y': state.ytest.reshape(-1), 'y_hat': y_hat.reshape(-1)})
267 (sig_pdf, back_pdf) = binning.get_signal_background_pdf(test_df)
268 seed = state.seed
269 parameters = state.parameters
270 del state
271 return [parameters, os.path.basename(filename), data1, data2, binning_parameters, transform_to_probability,
272 (sig_pdf, back_pdf), seed]