Belle II Software  release-06-01-15
tensorflow_dnn_interface.py
#!/usr/bin/env python3


import os
import json
import tempfile
import numpy as np
import tensorflow as tf
import pandas

# was still important for some shared libraries at some point
from basf2_mva_python_interface.tensorflow import State

from dft import binning

from dft import tensorflow_dnn_model as tfm
from dft.TfData import TfDataBasf2, TfDataBasf2Stub


def get_tensorflow_model(number_of_features, parameters):
    """
    generates the tensorflow model
    :param number_of_features: int, number of features is handled separately
    :param parameters: dictionary of model hyperparameters (layers, learning rate, epochs, ...)
    :return: configured tfm.DefaultModel
    """

    layers = parameters.get('layers', None)
    wd_coeffs = parameters.get('wd_coeffs', [])

    lr_dec_rate = parameters.get('lr_dec_rate', 1 / (1 + 2e-7)**1.2e5)
    lr_init = parameters.get('lr_init', .05)
    mom_init = parameters.get('mom_init', .9)
    min_epochs = parameters.get('min_epochs', 300)
    max_epochs = parameters.get('max_epochs', 400)
    stop_epochs = parameters.get('stop_epochs', 10)

    if layers is None:
        layers = [['h0', 'tanh', number_of_features, 300, .0001, 1.0 / np.sqrt(300)],
                  ['h1', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
                  ['h2', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
                  ['h3', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
                  ['h4', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
                  ['h5', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
                  ['h6', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
                  ['h7', 'tanh', 300, 300, .0001, 1.0 / np.sqrt(300)],
                  ['y', 'sigmoid', 300, 1, .0001, 0.002 * 1.0 / np.sqrt(300)]]
    else:
        layers[0][2] = number_of_features

    # None disables usage of wd_coeffs
    if wd_coeffs is not None and not wd_coeffs:
        wd_coeffs = [2e-5 for _ in layers]

    mlp = tfm.MultilayerPerceptron.from_list(layers)
    model = tfm.DefaultModel(mlp, lr_dec_rate=lr_dec_rate, lr_init=lr_init, mom_init=mom_init, wd_coeffs=wd_coeffs,
                             min_epochs=min_epochs, max_epochs=max_epochs, stop_epochs=stop_epochs)

    return model

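# A minimal sketch of how get_tensorflow_model can be parametrised. The values
# below are illustrative, not a recommended configuration; the layer format
# [name, activation, n_in, n_out, ...] follows the default list above, where the
# last two entries presumably control bias and weight initialisation. The input
# size of the first layer is overwritten with number_of_features in any case.
#
# example_parameters = {
#     'layers': [['h0', 'tanh', -1, 100, .0001, 1.0 / np.sqrt(100)],
#                ['y', 'sigmoid', 100, 1, .0001, 0.002 / np.sqrt(100)]],
#     'wd_coeffs': [2e-5, 2e-5],
#     'lr_init': .05,
#     'max_epochs': 400,
# }
# example_model = get_tensorflow_model(40, example_parameters)
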
def get_model(number_of_features, number_of_spectators, number_of_events, training_fraction, parameters):
    """
    specifies and configures the tensorflow model
    :param number_of_features:
    :param number_of_spectators:
    :param number_of_events:
    :param training_fraction:
    :param parameters: parameters as a dictionary (decoded json object)
    :return: State object
    """

    # get all parameters, if they are not available, use default values
    if parameters is None:
        parameters = {}
    else:
        if not isinstance(parameters, dict):
            raise TypeError('parameters must be a dictionary')

    cuda_mask = parameters.get('cuda_visible_devices', '3')
    tensorboard_dir = parameters.get('tensorboard_dir', None)

    batch_size = parameters.get('batch_size', 100)
    seed = parameters.get('seed', None)

    # postprocessing parameters, from dictionary
    transform_to_probability = parameters.get('transform_to_probability', False)

    # initialize session
    tf.reset_default_graph()
    gpu_options = tf.GPUOptions(allow_growth=True)

    # set random state
    if seed:
        print('Seed: ', seed)
        tf.set_random_seed(seed)

    # mask cuda devices
    os.environ['CUDA_VISIBLE_DEVICES'] = cuda_mask
    session = tf.Session(config=tf.ConfigProto(gpu_options=gpu_options))

    # initialize model
    x = tf.placeholder(tf.float32, [None, number_of_features])
    y = tf.placeholder(tf.float32, [None, 1])

    # using a stub data set since there is no data available at this stage
    stub_data_set = TfDataBasf2Stub(batch_size, number_of_features, number_of_events, training_fraction)

    # set saving file name, unfortunately this is already required in partial_fit
    save_dir = tempfile.TemporaryDirectory()
    save_name = os.path.join(save_dir.name, 'mymodel')

    model = get_tensorflow_model(number_of_features, parameters)
    training = tfm.Trainer(model, stub_data_set, session, tensorboard_dir, save_name, input_placeholders=[x, y])

    state = State(x, y, session=session)

    # training object is required in partial_fit
    state.training = training
    state.batch_size = batch_size
    state.save_dir = save_dir

    state.transform_to_probability = transform_to_probability

    # save parameters
    saved_parameters = parameters.copy()
    saved_parameters['number_of_features'] = number_of_features
    state.parameters = json.dumps(saved_parameters)
    state.seed = seed
    return state

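# Hypothetical direct call of the hook above, e.g. for a quick standalone test.
# Normally the basf2 mva framework supplies these arguments; the numbers are
# placeholders, not values used by the Deep Flavor Tagger:
#
# test_state = get_model(number_of_features=40, number_of_spectators=0,
#                        number_of_events=100000, training_fraction=0.9,
#                        parameters={'batch_size': 100, 'seed': 42})
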
def apply(state, X):
    """
    modified apply function: evaluates the network on X in chunks to limit memory usage
    """

    binning.transform_ndarray(X, state.binning_parameters)
    chunk_size = 1000000
    if len(X) > chunk_size:
        results = list()
        for i in range(0, len(X), chunk_size):
            results.append(state.session.run(state.activation, feed_dict={state.x: X[i: i + chunk_size]}))
        r = np.concatenate(results).flatten()
    else:
        r = state.session.run(state.activation, feed_dict={state.x: X}).flatten()
    if state.transform_to_probability:
        binning.transform_array_to_sf(r, state.sig_back_tuple, signal_fraction=.5)

    return np.require(r, dtype=np.float32, requirements=['A', 'W', 'C', 'O'])


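# Sketch of applying a loaded state to new data (hypothetical shapes; assumes
# `state` was produced by load() and the columns of X match the training
# features). Note that transform_ndarray modifies X in place, so pass a copy if
# the original array is still needed:
#
# X = np.random.rand(10, 40).astype(np.float32)
# network_output = apply(state, X.copy())
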
def load(obj):
    """
    Load Tensorflow estimator into state
    """
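    # layout of the serialized object, as assembled by end_fit() below:
    #   obj[0] json string with the training parameters (incl. number_of_features)
    #   obj[1] checkpoint basename
    #   obj[2], obj[3] checkpoint .data / .index file contents
    #   obj[4] binning parameters
    #   obj[5] transform_to_probability flag
    #   obj[6] (signal_pdf, background_pdf) tuple
    #   obj[7] training seed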
    # tensorflow operations
    tf.reset_default_graph()
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    session = tf.Session(config=config)

    parameters = json.loads(obj[0])

    number_of_features = parameters.pop('number_of_features')

    x = tf.placeholder(tf.float32, [None, number_of_features])
    y = tf.placeholder(tf.float32, [None, 1])

    class DataStub:
        """
        simple stub obj
        """
        feature_number = number_of_features
        batches = 1

    model = get_tensorflow_model(number_of_features, parameters)
    model.initialize(DataStub(), [x, y])
    saver = tf.train.Saver()

    # tensorflow is a moving target, file loading and saving in the mid-level api change rapidly, so we use the legacy approach here
    with tempfile.TemporaryDirectory() as path:
        with open(os.path.join(path, obj[1] + '.data-00000-of-00001'), 'w+b') as file1, open(
                os.path.join(path, obj[1] + '.index'), 'w+b') as file2:
            file1.write(bytes(obj[2]))
            file2.write(bytes(obj[3]))
        tf.train.update_checkpoint_state(path, obj[1])
        saver.restore(session, os.path.join(path, obj[1]))

    # load and initialize required objects
    state = State(x, y, session=session)
    state.activation = model.mlp.output

    # preprocessing parameters
    state.binning_parameters = obj[4]

    # postprocessing transform to probability, if pdf was sampled during training
    state.transform_to_probability = obj[5]
    state.sig_back_tuple = obj[6]

    seed = obj[7]
    print('Deep FlavorTagger loading... Training seed: ', seed)

    return state


def begin_fit(state, Xtest, Stest, ytest, wtest):
    """
    use test sets for monitoring
    """
    # TODO: split this set to define an independent test set for transformations to probability

    state.Xvalid = Xtest[:len(Xtest) // 2]
    state.yvalid = ytest[:len(ytest) // 2]

    state.Xtest = Xtest[len(Xtest) // 2:]
    state.ytest = ytest[len(ytest) // 2:]

    return state


def partial_fit(state, X, S, y, w, epoch):
    """
    performs the complete training on the preprocessed dataset in a single call
    :param X: unprocessed training dataset
    :param epoch: epoch number, only a single epoch (0) is supported
    :return: bool, True == continue, False == stop iterations
    """

    # training is performed in a single epoch
    if epoch > 0:
        raise RuntimeError('Training of the deep flavor tagger is performed in a single epoch')

    # preprocessing
    state.binning_parameters = binning.get_ndarray_binning_parameters(X)

    binning.transform_ndarray(X, state.binning_parameters)
    binning.transform_ndarray(state.Xvalid, state.binning_parameters)

    if np.all(np.isnan(X)):
        raise ValueError('NaN values in Dataset. Preprocessing transformations failed.')

    # replace stub dataset
    data_set = TfDataBasf2(X, y, state.Xvalid, state.yvalid, state.batch_size, seed=state.seed)

    state.training.data_set = data_set

    # start training
    state.training.train_model()

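    # returning False tells the calling framework to stop requesting further
    # epochs, since the full training loop already ran inside train_model()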
    return False


def end_fit(state):
    """
    save the trained model
    :param state:
    :return: list with the serialized model and the postprocessing information
    """
    filename = state.training.save_name
    with open(filename + '.data-00000-of-00001', 'rb') as file1, open(filename + '.index', 'rb') as file2:
        data1 = file1.read()
        data2 = file2.read()
    binning_parameters = state.binning_parameters

    # transform to probability has to be saved since state object has to return untransformed network output
    transform_to_probability = state.transform_to_probability
    state.transform_to_probability = False

    # sample pdfs of trained model on test_dataset, return test df
    state.get_from_collection()
    y_hat = apply(state, state.Xtest)
    test_df = pandas.DataFrame.from_dict({'y': state.ytest.reshape(-1), 'y_hat': y_hat.reshape(-1)})
    (sig_pdf, back_pdf) = binning.get_signal_background_pdf(test_df)
    seed = state.seed
    parameters = state.parameters
    del state
    return [parameters, os.path.basename(filename), data1, data2, binning_parameters, transform_to_probability,
            (sig_pdf, back_pdf), seed]
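
# Sketch of the assumed call sequence when this interface is used through the
# basf2 mva framework (not part of this module, shown for orientation only):
#
#   training:  get_model() -> begin_fit() -> partial_fit() -> end_fit()
#   inference: load() -> apply()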