Belle II Software  release-08-01-10
tensorflow_dnn_model.py
#!/usr/bin/env python3


import os
import sys
import time
import numpy as np

import tensorflow as tf
from tqdm import trange


class Layer(tf.Module):
    """
    definition of a layer object
    """

    def __init__(self, name, tf_activation_str, dim_input, dim_output, p_bias, p_w,
                 random_seed=None):
        """
        :param name: name of the layer.
        :param tf_activation_str: string, name of an available tensorflow activation function
        :param dim_input: dimension of the input
        :param dim_output: dimension of the output
        :param p_bias: initial bias
        :param p_w: stddev of the truncated normal distribution used to initialize the weights
        :param random_seed: random seed used in initialising the weights
        :return: None
        """

        super().__init__(name=name)

        tf_activation_dict = {
            'tanh': tf.nn.tanh,
            'sigmoid': tf.nn.sigmoid,
            'relu': tf.nn.relu,
            'leaky_relu': tf.nn.leaky_relu,
        }

        if tf_activation_str not in tf_activation_dict:
            raise ValueError

        #: activation function of the layer
        self.tf_activation = tf_activation_dict[tf_activation_str]

        #: layer shape [dim_input, dim_output]
        self.shape = [dim_input, dim_output]

        #: weight variables
        self.w = self._init_weight(self.shape, p_w, random_seed)

        #: bias variables
        self.b = self._init_bias(self.shape[1], p_bias)

        #: input of the layer
        self.input = None

        #: output of the layer
        self.output = None

    def _init_bias(self, width, init_val, name=None):
        """
        define bias variables
        """
        if name is None:
            name = self.name + '_b'
        initial = tf.constant(init_val, shape=[width], name=name)
        return tf.Variable(initial, name=name, trainable=True)

    def _init_weight(self, shape, stddev, operation_seed, name=None):
        """
        define weight variables
        """
        if name is None:
            name = self.name + '_w'
        initial = tf.random.truncated_normal(shape, stddev=stddev, seed=operation_seed, name=name)
        return tf.Variable(initial, name=name, trainable=True)

    @tf.function
    def __call__(self, x):
        """
        evaluate the layer
        """
        return self.tf_activation(tf.matmul(x, self.w) + self.b)

    def variable_to_summary(self, var, step, writer):
        """
        Passes information about each variable to the summary writer.
        """
        with writer.as_default():
            mean = tf.reduce_mean(var)
            stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean)))
            tf.summary.scalar(f'{var.name}_mean', mean, step=step)
            tf.summary.scalar(f'{var.name}_stddev', stddev, step=step)
            tf.summary.scalar(f'{var.name}_max', tf.reduce_max(var), step=step)
            tf.summary.scalar(f'{var.name}_min', tf.reduce_min(var), step=step)
            tf.summary.histogram(f'{var.name}_histogram', var, step=step)
            writer.flush()
        return

    def all_to_summary(self, step, writer):
        """
        Passes all layer variables to the tf.summary writer.
        """
        self.variable_to_summary(self.w, step=step, writer=writer)
        self.variable_to_summary(self.b, step=step, writer=writer)
        return


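# Hedged usage sketch (not part of the original file): constructing and evaluating a single
# Layer. The layer name, dimensions and initialisation values are illustrative assumptions.
#
#   layer = Layer('hidden0', 'tanh', dim_input=8, dim_output=16, p_bias=0.0, p_w=0.5)
#   x = tf.ones([32, 8], dtype=tf.float32)   # a batch of 32 events with 8 features each
#   y = layer(x)                             # activations with shape [32, 16]
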
class MultilayerPerceptron(tf.Module):
    """
    multilayer perceptron class.
    """

    def __init__(self, layers, name='mlp'):
        """
        initialization
        """
        super().__init__(name=name)

        #: layer objects of the network
        self.layers = layers

        #: weights of all layers
        self.w = None

        #: biases of all layers
        self.b = None

        #: check if the network is initialized
        self.is_initialized = False
        self.initialize()

    @classmethod
    def from_list(cls, layers):
        """
        define layers from list
        """
        layer_obj = []
        for layer in layers:
            layer_obj.append(Layer(*layer))

        mlp = cls(layer_obj)
        return mlp

    def _collect_weights_and_biases(self):
        """
        collect tunable parameters
        """
        self.w = []
        self.b = []
        for layer in self.layers:
            self.w.append(layer.w)
            self.b.append(layer.b)

    def initialize(self):
        """
        initialize. Checks that the layer dimensions align.
        """
        if self.is_initialized:
            raise RuntimeError

        # check shape
        for _idx in range(len(self.layers) - 1):
            assert self.layers[_idx].shape[1] == self.layers[_idx + 1].shape[0]

        self._collect_weights_and_biases()  # TODO - remove?

        self.is_initialized = True
        return

    @tf.function
    def __call__(self, x):
        """
        Run the events through all the layers
        """
        for layer in self.layers:
            x = layer(x)
        return x

    def variables_to_writer(self, step, writer):
        """
        passes all the MLP variables to the tf.summary writer
        """
        for layer in self.layers:
            layer.all_to_summary(step, writer)
        return


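# Hedged usage sketch (not part of the original file): building a small network with
# MultilayerPerceptron.from_list. Each entry is the argument list of one Layer; the output
# dimension of a layer has to match the input dimension of the next (checked in initialize()).
# The concrete layer names and dimensions are illustrative assumptions.
#
#   mlp = MultilayerPerceptron.from_list([
#       ('hidden0', 'tanh', 8, 16, 0.0, 0.5),
#       ('output', 'sigmoid', 16, 1, 0.0, 0.5),
#   ])
#   probabilities = mlp(tf.ones([32, 8], dtype=tf.float32))   # shape [32, 1]
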
class DefaultModel(tf.Module):
    """
    define the default model
    """

    def __init__(self, mlp,
                 mom_init=.9,
                 mom_max=.99,
                 mom_epochs=200,
                 lr_init=.05,
                 lr_min=1e-6,
                 lr_dec_rate=.976,
                 stop_epochs=10,
                 min_epochs=200,
                 max_epochs=1000,
                 wd_coeffs=None,
                 change_optimizer=None,
                 staircase=True,
                 smooth_cross_entropy=False):
        """
        initialization function
        :param mlp: network model.
        :param mom_init: initial momentum
        :param mom_max: maximum momentum
        :param mom_epochs: momentum epochs
        :param lr_init: initial learning rate
        :param lr_min: minimum learning rate
        :param lr_dec_rate: learning rate decay factor
        :param stop_epochs: number of epochs without improvement required for early termination
        :param min_epochs: minimum number of epochs for training
        :param max_epochs: maximum number of epochs for training
        :param wd_coeffs: weight decay coefficients. If not None must have one per mlp layer.
        :param change_optimizer: list of epochs at which the next booked optimizer is used
        :param staircase: if True, decay learning rate and momentum in discrete per-epoch steps
        :param smooth_cross_entropy: True for a small epsilon addition, False for a clipped network output
        """

        #: network model
        self.mlp = mlp

        if wd_coeffs is not None:
            assert len(wd_coeffs) == len(mlp.layers)

        #: weight decay coefficients
        self.wd_coeffs = wd_coeffs

        #: global step counter
        self.global_step = tf.Variable(0, trainable=False, name='global_step', dtype=tf.int64)

        # --optimizer params--

        #: initial momentum
        self.c_mom_init = tf.constant(mom_init, dtype=tf.float32)

        #: maximum momentum
        self.c_mom_max = tf.constant(mom_max, dtype=tf.float32)

        #: momentum epochs
        self.c_mom_epochs = tf.constant(mom_epochs, dtype=tf.float32)

        #: momentum increase per epoch
        self.c_mom_dec_rate = (self.c_mom_max - self.c_mom_init) / tf.cast(self.c_mom_epochs, tf.float32)

        #: initial learning rate
        self.c_lr_init = tf.constant(lr_init, dtype=tf.float32)

        #: minimum learning rate
        self.c_lr_min = tf.constant(lr_min, dtype=tf.float32)

        #: learning rate decay factor
        self.c_lr_dec_rate = tf.constant(lr_dec_rate, dtype=tf.float32)

        #: number of epochs without improvement for early termination
        self.c_stop_epochs = stop_epochs

        #: use staircase decay of learning rate and momentum
        self.c_staircase = staircase

        #: batches per epoch, unknown until initialize() is called with a data set
        self.batches_per_epoch = None

        #: list of booked optimizers
        self.optimizers = []

        # list with epochs in which optimizers will be changed; if None is given,
        # only the default optimizer will be used
        self.optimizer_change_epochs = change_optimizer
        if change_optimizer is not None:
            self.optimizer_change_epochs.insert(0, 0)
            self.optimizer_change_epochs.append(sys.maxsize)

        # termination criterion

        #: minimum number of epochs
        self.min_epochs = min_epochs

        #: maximum number of epochs
        self.max_epochs = max_epochs

        #: termination criterion
        self.termination_criterion = None

        #: recent parameters
        self.recent_params = []

        #: the best value will be set to a default start value, then updated with the termination criterion
        self.best_value = np.inf

        #: countdown of epochs without improvement
        self.step_countdown = self.c_stop_epochs

        #: True for a small epsilon addition, False for a clipped network output
        self.smooth_cross_entropy = smooth_cross_entropy

        #: check if initialized
        self.is_initialized = False

        return

    # needs to be called since parameters depend on the number of batches

    def initialize(self, data_set):
        """
        Finalises initialization based on data_set-specific information (number of batches per epoch)
        """
        if self.is_initialized:
            raise RuntimeError

        self.batches_per_epoch = data_set.batches

        # check layer dimensions align properly
        if not self.mlp.is_initialized:
            self.mlp.initialize()

        self._set_optimizer()
        self.termination_criterion = self._default_termination_criterion

        self.is_initialized = True
        return

    @tf.function
    def __call__(self, x):
        """
        Call the mlp
        """
        return self.mlp(x)

    def _default_termination_criterion(self,
                                       monitoring_param,
                                       epoch,
                                       prop_dec=1e-5):
        """
        early stopping criterion

        :param monitoring_param: the parameter to monitor for early termination
        :param epoch: the current epoch
        :param prop_dec: minimal relative decrease with respect to the best value that counts as an improvement
        :return: True if training should be stopped, False otherwise
        """
        if epoch < self.min_epochs:
            return False

        if monitoring_param < self.best_value * (1. - prop_dec):
            self.step_countdown = self.c_stop_epochs
            self.best_value = monitoring_param
        else:
            self.step_countdown -= 1

        if self.step_countdown > 0:
            return False
        return True

    def _get_learning_rate(self):
        """
        Returns the learning rate at the current global step.
        """
        p = tf.cast(self.global_step, tf.float32) / tf.cast(self.batches_per_epoch, tf.float32)
        if self.c_staircase:
            p = tf.floor(p)
        return tf.maximum(tf.multiply(self.c_lr_init, tf.pow(self.c_lr_dec_rate, p)), self.c_lr_min)

    def _get_momentum(self):
        """
        returns the momentum at the current global step.
        """
        t_batches_per_epoch = tf.constant(self.batches_per_epoch, dtype=tf.float32)
        global_step = tf.cast(self.global_step, tf.float32)

        if self.c_staircase:
            t_limited_mom = tf.minimum(self.c_mom_init + self.c_mom_dec_rate *
                                       tf.floor(global_step / t_batches_per_epoch), self.c_mom_max)
        else:
            t_limited_mom = tf.minimum(self.c_mom_init + self.c_mom_dec_rate *
                                       (global_step / t_batches_per_epoch), self.c_mom_max)
        return t_limited_mom

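    # Illustration (not part of the original file) of the two schedules above, assuming the
    # default hyperparameters and a hypothetical batches_per_epoch of 100. With staircase
    # decay the exponent p is the number of completed epochs, so after 10 epochs
    # (global_step = 1000):
    #
    #   learning_rate = max(0.05 * 0.976**10, 1e-6)            ~ 0.0392
    #   momentum      = min(0.9 + (0.99 - 0.9) / 200 * 10, 0.99) = 0.9045
    #
    # Without staircase, p = global_step / batches_per_epoch varies continuously within an epoch.
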
    def _set_optimizer(self):
        """
        set optimizers
        """
        self.optimizers.append(tf.optimizers.SGD(learning_rate=self._get_learning_rate,
                                                 momentum=self._get_momentum))
        self.optimizers.append(tf.optimizers.SGD(learning_rate=self._get_learning_rate))
        return

    def get_optimizer(self, epoch=0):
        """
        get the optimizer. If multiple optimizers are booked gets the one appropriate for the epoch.

        :param epoch: current epoch.
        """

        if self.optimizer_change_epochs is None:
            return self.optimizers[0]

        if len(self.optimizer_change_epochs) > len(self.optimizers) + 1:
            raise RuntimeError

        # switch optimizer for given epoch
        for i in range(1, len(self.optimizer_change_epochs)):
            if self.optimizer_change_epochs[i - 1] <= epoch < self.optimizer_change_epochs[i]:
                return self.optimizers[i - 1]

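    # Illustration (not part of the original file), assuming a hypothetical model built with
    # change_optimizer=[300]: __init__ pads the list to [0, 300, sys.maxsize], so
    #
    #   model.get_optimizer(epoch=10)   # -> optimizers[0], SGD with momentum schedule
    #   model.get_optimizer(epoch=300)  # -> optimizers[1], plain SGD with learning rate schedule only
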
    def loss(self, predict_y, true_y):
        """
        calculate the loss

        :param predict_y: predicted labels
        :param true_y: true labels
        """

        epsilon = 1e-10
        t_epsilon = tf.constant(epsilon)

        # sum over batch
        with tf.name_scope('cross_entropy'):
            if self.smooth_cross_entropy:
                cross_entropy = -tf.reduce_mean(tf.reduce_sum((true_y * tf.math.log(predict_y + t_epsilon) + (1 - true_y) *
                                                               tf.math.log(1 - predict_y + t_epsilon)), 1))
            else:
                cross_entropy = -tf.reduce_mean(tf.reduce_sum((true_y * tf.math.log(tf.clip_by_value(predict_y, epsilon, 1))) +
                                                              ((1 - true_y) * tf.math.log(tf.clip_by_value((1 - predict_y),
                                                                                                           epsilon, 1))), 1))

        loss = cross_entropy

        if self.wd_coeffs:
            wd_coeffs = self.wd_coeffs
            weights = self.mlp.w

            wd = [tf.constant(coeff) * tf.nn.l2_loss(w) for coeff, w in zip(wd_coeffs, weights)]
            loss += sum(wd)

        return loss, cross_entropy


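# Illustration (not part of the original file) of the two cross-entropy variants in
# DefaultModel.loss for a single event with true label y = 1 and prediction p = 0.8
# (epsilon = 1e-10):
#
#   smooth_cross_entropy=True:  -log(p + epsilon)          ~ 0.223
#   smooth_cross_entropy=False: -log(clip(p, epsilon, 1))  ~ 0.223
#
# The two variants only differ noticeably for predictions at (or numerically outside) the
# boundaries 0 and 1. If wd_coeffs is set, an L2 penalty coeff * l2_loss(w) per layer is
# added to the returned loss but not to the returned cross_entropy.
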
class Trainer:
    """
    handling the training of the network model
    """

    def __init__(self,
                 model,
                 data_set,
                 log_dir=None,
                 save_name=None,
                 monitoring_size=10000):
        """
        class to train a predefined model
        :param model: DefaultModel obj
        :param data_set: TFData obj
        :param log_dir: str, directory name of tensorboard logging
        :param save_name: str, path and name for saving the weightfiles
        :param monitoring_size: int, number of events of training fraction used for monitoring
        """

        #: timing
        self._time = time.time()

        #: network model
        self.model = model

        #: data set
        self.data_set = data_set
        self.model.initialize(data_set)

        #: number of events used for monitoring
        self.monitoring_size = monitoring_size

        #: tensorboard log directory
        self.log_dir = log_dir

        #: termination criterion
        self.termination_criterion = self.model.termination_criterion

        #: initialise current epoch
        self.current_epoch = 0

        #: initialise best epoch
        self.best_epoch = -np.inf

        if log_dir is not None:
            self._prepare_tensorboard(log_dir)

        if save_name is None:
            time_str = time.strftime("%Y%m%d-%H%M%S")
            save_name = os.path.join(os.getcwd(), '_'.join([time_str, 'model']))

        #: set the path and name for saving the weightfiles
        self.save_name = save_name

        self._prepare_monitoring()
        return

    def _prepare_monitoring(self):
        """
        checking dataset sizes for evaluation. These samples are used after each epoch to collect
        summary statistics and test early stopping criteria.
        """

        #: number of training events used for monitoring
        self.train_monitor = -1

        #: number of validation events used for monitoring
        self.valid_monitor = -1
        if self.data_set.train_events > self.monitoring_size:
            self.train_monitor = self.monitoring_size

        if self.data_set.valid_events > self.monitoring_size:
            self.valid_monitor = self.monitoring_size
        return

    def _prepare_tensorboard(self, log_dir):
        """
        prepare tensorboard
        """
        log_dir_train = os.path.join(log_dir, 'train')
        log_dir_valid = os.path.join(log_dir, 'valid')

        #: tf.summary.writer for training
        self.train_writer = tf.summary.create_file_writer(log_dir_train)

        #: tf.summary.writer for validation
        self.valid_writer = tf.summary.create_file_writer(log_dir_valid)
        return

    def _train_epoch(self, current_epoch):
        """
        train epoch
        """
        #: set optimizer for this epoch
        self.optimizer = self.model.get_optimizer(current_epoch)

        batch_iter = self.data_set.batch_iterator()

        t_range = trange(self.data_set.batches)
        t_range.set_description(f'Epoch {current_epoch:4d}')
        for i in t_range:

            batch = next(batch_iter)

            batch_x = batch[0]
            batch_y = batch[1]

            with tf.GradientTape() as tape:
                loss, _ = self.model.loss(self.model(batch_x), batch_y)
                grads = tape.gradient(loss, self.model.trainable_variables)

            self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))

            # write the learning rate and momentum to the tensorboard log
            if self.log_dir is not None:
                with self.train_writer.as_default():
                    tf.summary.scalar('learning_rate', self.model._get_learning_rate(), step=self.model.global_step)
                    tf.summary.scalar('momentum', self.model._get_momentum(), step=self.model.global_step)
                    self.train_writer.flush()

            self.model.global_step.assign_add(1)

        train_x = self.data_set.train_x[:self.train_monitor]
        train_y = self.data_set.train_y[:self.train_monitor]

        valid_x = self.data_set.valid_x[:self.valid_monitor]
        valid_y = self.data_set.valid_y[:self.valid_monitor]

        # run the training and validation samples to collect statistics
        train_loss, train_cross_entropy = self.model.loss(self.model(train_x), train_y)
        valid_loss, valid_cross_entropy = self.model.loss(self.model(valid_x), valid_y)

        # if we have a log_dir set write extra summary information
        if self.log_dir is not None:
            with self.train_writer.as_default():
                tf.summary.scalar('loss', train_loss, step=current_epoch)
                tf.summary.scalar('cross_entropy', train_cross_entropy, step=current_epoch)

                # this is now at the end of each epoch
                tf.summary.scalar('epoch_learning_rate', self.model._get_learning_rate(), step=current_epoch)
                tf.summary.scalar('epoch_momentum', self.model._get_momentum(), step=current_epoch)
                self.train_writer.flush()

            # write all the model parameters to the summary file too
            self.model.mlp.variables_to_writer(current_epoch, self.train_writer)

            with self.valid_writer.as_default():
                tf.summary.scalar('loss', valid_loss, step=current_epoch)
                tf.summary.scalar('cross_entropy', valid_cross_entropy, step=current_epoch)
                tf.summary.scalar('best_epoch', self.best_epoch, step=current_epoch)
                self.valid_writer.flush()

        # update time
        self._time = time.time()
        self.current_epoch += 1

        return valid_cross_entropy

    def _save_best_state(self, cross_entropy):
        """
        save model as a checkpoint only if a global minimum is reached on validation sample
        :return:
        """
        # current state - do we need this?
        checkpoint = tf.train.Checkpoint(self.model)
        checkpoint.save(self.save_name.replace('model', 'model_current'))

        # check for a not set best value
        if self.model.best_value == np.inf:
            return

        if cross_entropy < self.model.best_value:
            self.best_epoch = self.current_epoch
            checkpoint.save(self.save_name)
        return

    def _closing_ops(self):
        """
        closing operations
        """
        if self.log_dir is not None:
            self.train_writer.close()
            self.valid_writer.close()
        return

    def train_model(self):
        """
        train model
        """
        for epoch in range(self.model.max_epochs):
            valid_cross_entropy = self._train_epoch(epoch)

            self._save_best_state(valid_cross_entropy)

            if self.termination_criterion(valid_cross_entropy, epoch):
                break

        self._closing_ops()
        return
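

# Hedged end-to-end sketch (not part of the original file): how the classes above are
# typically combined. 'my_data_set' stands for a TFData-like object that provides the
# attributes used by Trainer (batches, batch_iterator(), train_x/train_y, valid_x/valid_y,
# train_events, valid_events); it is an assumption here, not defined in this module.
#
#   mlp = MultilayerPerceptron.from_list([
#       ('hidden0', 'tanh', 8, 16, 0.0, 0.5),
#       ('output', 'sigmoid', 16, 1, 0.0, 0.5),
#   ])
#   model = DefaultModel(mlp, lr_init=.05, max_epochs=1000)
#   trainer = Trainer(model, my_data_set, log_dir='./logs', save_name='./my_model')
#   trainer.train_model()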