Belle II Software  release-08-01-10
tensorflow_dnn_interface.py
#!/usr/bin/env python3

import os
import json
import tempfile
import numpy as np
import tensorflow as tf
import pandas

from basf2_mva_python_interface.tensorflow import State

from dft import binning

from dft import tensorflow_dnn_model as tfm
from dft.TfData import TfDataBasf2, TfDataBasf2Stub


def get_tensorflow_model(number_of_features, parameters):
    """
    generates the tensorflow model
    :param int number_of_features: number of features, handled separately from the other parameters
    :param dict parameters: additional parameters passed to tensorflow_dnn_model.DefaultModel
    :return: the configured DefaultModel
    """

    layers = parameters.get('layers', None)
    wd_coeffs = parameters.get('wd_coeffs', [])

    lr_dec_rate = parameters.get('lr_dec_rate', 1 / (1 + 2e-7)**1.2e5)
    lr_init = parameters.get('lr_init', .05)
    mom_init = parameters.get('mom_init', .9)
    min_epochs = parameters.get('min_epochs', 300)
    max_epochs = parameters.get('max_epochs', 400)
    stop_epochs = parameters.get('stop_epochs', 10)

    if layers is None:
        layers = [['h0', 'tanh', number_of_features, 300, .0001, 1.0 / np.sqrt(300)],
                  ['h1', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
                  ['h2', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
                  ['h3', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
                  ['h4', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
                  ['h5', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
                  ['h6', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
                  ['h7', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
                  ['y', 'sigmoid', 300, 1, .0001, 0.002 * 1.0 / np.sqrt(300)]]
    else:
        # the input size of the first layer always has to match the number of features
        layers[0][2] = number_of_features

    # an empty list selects the default weight-decay coefficients, None disables usage of wd_coeffs
    if wd_coeffs is not None and not wd_coeffs:
        wd_coeffs = [2e-5 for _ in layers]

    mlp = tfm.MultilayerPerceptron.from_list(layers)
    model = tfm.DefaultModel(mlp, lr_dec_rate=lr_dec_rate, lr_init=lr_init, mom_init=mom_init, wd_coeffs=wd_coeffs,
                             min_epochs=min_epochs, max_epochs=max_epochs, stop_epochs=stop_epochs)
    return model
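
# A hedged configuration example (not part of the original module; layer names and
# sizes are placeholders): the 'parameters' dictionary understood by
# get_tensorflow_model() could look like the snippet below. Each layer entry is
# [name, activation, n_input, n_output, <two initialization constants>], where the
# exact meaning of the constants is defined by
# tensorflow_dnn_model.MultilayerPerceptron.from_list. The input size of the first
# layer is always overwritten with the actual number of features.
#
#   example_parameters = {
#       'layers': [['h0', 'tanh', 0, 100, .0001, 1.0 / np.sqrt(100)],
#                  ['y', 'sigmoid', 100, 1, .0001, 0.002 * 1.0 / np.sqrt(100)]],
#       'lr_init': .05,
#       'max_epochs': 400,
#   }
#   model = get_tensorflow_model(50, example_parameters)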


def get_model(number_of_features, number_of_spectators, number_of_events, training_fraction, parameters):
    """
    specifies and configures the tensorflow model
    :param number_of_features: number of input features
    :param number_of_spectators: number of spectator variables
    :param number_of_events: total number of events
    :param training_fraction: fraction of events used for training
    :param parameters: dictionary decoded from the json configuration object
    :return: State object
    """

    # get all parameters, if they are not available, use default values
    if parameters is None:
        parameters = {}
    else:
        if not isinstance(parameters, dict):
            raise TypeError('parameters must be a dictionary')

    cuda_mask = parameters.get('cuda_visible_devices', '3')
    tensorboard_dir = parameters.get('tensorboard_dir', None)

    batch_size = parameters.get('batch_size', 100)
    seed = parameters.get('seed', None)

    # postprocessing parameters, from dictionary
    transform_to_probability = parameters.get('transform_to_probability', False)

    # set random state
    if seed:
        print('Seed: ', seed)
        tf.random.set_seed(seed)

    # mask cuda devices
    os.environ['CUDA_VISIBLE_DEVICES'] = cuda_mask
    gpus = tf.config.list_physical_devices('GPU')
    if gpus:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)

    # using a stub data set since there is no data available at this stage
    stub_data_set = TfDataBasf2Stub(batch_size, number_of_features, number_of_events, training_fraction)

    # set the saving file name, unfortunately this is already required in partial_fit
    save_dir = tempfile.TemporaryDirectory()
    save_name = os.path.join(save_dir.name, 'mymodel')

    model = get_tensorflow_model(number_of_features, parameters)
    training = tfm.Trainer(model, stub_data_set, tensorboard_dir, save_name)

    state = State(model)

    # training object is required in partial_fit
    state.training = training
    state.batch_size = batch_size
    state.save_dir = save_dir

    state.transform_to_probability = transform_to_probability

    # save parameters
    saved_parameters = parameters.copy()
    saved_parameters['number_of_features'] = number_of_features
    state.parameters = json.dumps(saved_parameters)
    state.seed = seed
    return state


def apply(state, X):
    """
    apply function, modified to run the binning preprocessing and the optional
    transformation of the network output to a probability
    """

    binning.transform_ndarray(X, state.binning_parameters)
    chunk_size = 1000000
    if len(X) > chunk_size:
        results = list()
        # evaluate the model in chunks to limit the memory footprint
        for i in range(0, len(X), chunk_size):
            results.append(state.model(X[i:i + chunk_size]).numpy().flatten())
        r = np.concatenate(results).flatten()
    else:
        r = state.model(X).numpy().flatten()
    if state.transform_to_probability:
        binning.transform_array_to_sf(r, state.sig_back_tuple, signal_fraction=.5)

    return np.require(r, dtype=np.float32, requirements=['A', 'W', 'C', 'O'])


def load(obj):
    """
    Load the tensorflow estimator into a State object
    """
    # obj layout, see the return value of end_fit:
    # [parameters json, checkpoint basename, checkpoint data bytes, checkpoint index bytes,
    #  binning parameters, transform_to_probability flag, (sig_pdf, back_pdf), seed]

    # tensorflow operations
    gpus = tf.config.list_physical_devices('GPU')
    if gpus:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)

    parameters = json.loads(obj[0])

    number_of_features = parameters.pop('number_of_features')

    class DataStub:
        """
        simple data stub with the attributes required to initialize the model
        """
        feature_number = number_of_features
        batches = 1

    model = get_tensorflow_model(number_of_features, parameters)
    model.initialize(DataStub())

    # tensorflow is a moving target, file loading and saving of the mid-level api changes rapidly,
    # so the raw checkpoint files are written to a temporary directory and restored from there
    with tempfile.TemporaryDirectory() as path:
        with open(os.path.join(path, obj[1] + '.data-00000-of-00001'), 'w+b') as file1, open(
                os.path.join(path, obj[1] + '.index'), 'w+b') as file2:
            file1.write(bytes(obj[2]))
            file2.write(bytes(obj[3]))

        checkpoint = tf.train.Checkpoint(model)
        checkpoint.restore(os.path.join(path, obj[1]))

    state = State(model)
    # preprocessing parameters
    state.binning_parameters = obj[4]

    # postprocessing transform to probability, if pdf was sampled during training
    state.transform_to_probability = obj[5]
    state.sig_back_tuple = obj[6]

    seed = obj[7]
    print('Deep FlavorTagger loading... Training seed: ', seed)

    return state


def begin_fit(state, Xtest, Stest, ytest, wtest, nBatches):
    """
    use test sets for monitoring
    """
    # TODO: split this set to define an independent test set for transformations to probability
    state.Xvalid = Xtest[:len(Xtest) // 2]
    state.yvalid = ytest[:len(ytest) // 2]

    state.Xtest = Xtest[len(Xtest) // 2:]
    state.ytest = ytest[len(ytest) // 2:]

    return state


def partial_fit(state, X, S, y, w, epoch, batch):
    """
    performs the complete training on the unprocessed dataset X, using the unprocessed
    validation dataset stored in the state by begin_fit; the event weights are handled by the Trainer
    :param X: unprocessed training dataset
    :return: bool, True == continue, False == stop iterations
    """

    # the epochs and batches are handled internally by the Trainer, everything runs within 1 external epoch and 1 external batch
    if epoch > 0 or batch > 0:
        raise RuntimeError

    # preprocessing
    state.binning_parameters = binning.get_ndarray_binning_parameters(X)

    binning.transform_ndarray(X, state.binning_parameters)
    binning.transform_ndarray(state.Xvalid, state.binning_parameters)

    if np.any(np.isnan(X)):
        raise ValueError('NaN values in Dataset. Preprocessing transformations failed.')

    # replace the stub dataset
    data_set = TfDataBasf2(X, y, state.Xvalid, state.yvalid, state.batch_size, seed=state.seed)

    state.training.data_set = data_set

    # start training
    state.training.train_model()

    return False
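
# Note on the call sequence (a summary, assuming the standard basf2 MVA python
# interface): the framework calls get_model() once, then begin_fit() with the
# test sample, then partial_fit() with the training sample, and finally
# end_fit() to obtain the serialized model; load() and apply() are used later
# by the expert. Since the Trainer loops over epochs internally, partial_fit()
# is expected to run exactly once, which is why it returns False.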


def end_fit(state):
    """
    save the trained model
    :param state: the current State object
    :return: list with the serialized model and the pre/postprocessing information
    """
    filename = state.training.save_name
    # postfix -2 is needed (current state gets postfix -1)
    with open(filename + '-2.data-00000-of-00001', 'rb') as file1, open(filename + '-2.index', 'rb') as file2:
        data1 = file1.read()
        data2 = file2.read()
    binning_parameters = state.binning_parameters

    # transform to probability has to be saved since the state object has to return the untransformed network output
    transform_to_probability = state.transform_to_probability
    state.transform_to_probability = False

    # sample the pdfs of the trained model on the test dataset, build the test df
    y_hat = state.model(state.Xtest).numpy().flatten()
    test_df = pandas.DataFrame.from_dict({'y': state.ytest.reshape(-1), 'y_hat': y_hat.reshape(-1)})
    (sig_pdf, back_pdf) = binning.get_signal_background_pdf(test_df)
    seed = state.seed
    parameters = state.parameters
    del state
    return [parameters, os.path.basename(filename), data1, data2, binning_parameters, transform_to_probability,
            (sig_pdf, back_pdf), seed]
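

# Hedged usage sketch (an illustration, not part of the original interface; file
# names, variable names and settings are placeholders): this module is not run
# directly, it is loaded by the basf2 MVA framework as a python steering file.
# A training could roughly be steered like this:
#
#   import json
#   import basf2_mva
#
#   general_options = basf2_mva.GeneralOptions()
#   general_options.m_datafiles = basf2_mva.vector('train.root')
#   general_options.m_treename = 'tree'
#   general_options.m_identifier = 'DeepFlavorTagger.xml'
#   general_options.m_variables = basf2_mva.vector('var0', 'var1')
#   general_options.m_target_variable = 'target'
#
#   specific_options = basf2_mva.PythonOptions()
#   specific_options.m_framework = 'tensorflow'
#   specific_options.m_steering_file = 'dft/tensorflow_dnn_interface.py'
#   specific_options.m_config = json.dumps({'batch_size': 100, 'seed': 42})
#
#   basf2_mva.teacher(general_options, specific_options)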