Belle II Software development
tensorflow_dnn_model.py
#!/usr/bin/env python3

import os
import sys
import time
import numpy as np

import tensorflow as tf
from tqdm import trange


class Layer(tf.Module):
    """
    definition of a layer object
    """

    def __init__(self, name, tf_activation_str, dim_input, dim_output, p_bias, p_w,
                 random_seed=None):
        """
        :param name: name of the layer.
        :param tf_activation_str: string, name of an available tensorflow activation function
        :param dim_input: dimension of the input
        :param dim_output: dimension of the output
        :param p_bias: initial bias
        :param p_w: stddev of the truncated normal distribution used to initialise the weights
        :param random_seed: random seed used in initialising the weights
        :return: None
        """
        super().__init__(name=name)

        tf_activation_dict = {
            'tanh': tf.nn.tanh,
            'sigmoid': tf.nn.sigmoid,
            'relu': tf.nn.relu,
            'leaky_relu': tf.nn.leaky_relu,
        }

        if tf_activation_str not in tf_activation_dict:
            raise ValueError(f'Unknown activation function: {tf_activation_str}')

        # activation function of the layer
        self.tf_activation = tf_activation_dict[tf_activation_str]

        # layer shape: [dim_input, dim_output]
        self.shape = [dim_input, dim_output]

        # weight variables
        self.w = self._init_weight(self.shape, p_w, random_seed)

        # bias variables
        self.b = self._init_bias(self.shape[1], p_bias)

        # input tensor
        self.input = None

        # output tensor
        self.output = None

    def _init_bias(self, width, init_val, name=None):
        """
        define bias variables
        """
        if name is None:
            name = self.name + '_b'
        initial = tf.constant(init_val, shape=[width], name=name)
        return tf.Variable(initial, name=name, trainable=True)

    def _init_weight(self, shape, stddev, operation_seed, name=None):
        """
        define weight variables
        """
        if name is None:
            name = self.name + '_w'
        initial = tf.random.truncated_normal(shape, stddev=stddev, seed=operation_seed, name=name)
        return tf.Variable(initial, name=name, trainable=True)

    @tf.function
    def __call__(self, x):
        """
        evaluate the layer
        """
        return self.tf_activation(tf.matmul(x, self.w) + self.b)

    def variable_to_summary(self, var, step, writer):
        """
        Passes information about each variable to the summary writer.
        """
        with writer.as_default():
            mean = tf.reduce_mean(var)
            stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean)))
            tf.summary.scalar(f'{var.name}_mean', mean, step=step)
            tf.summary.scalar(f'{var.name}_stddev', stddev, step=step)
            tf.summary.scalar(f'{var.name}_max', tf.reduce_max(var), step=step)
            tf.summary.scalar(f'{var.name}_min', tf.reduce_min(var), step=step)
            tf.summary.histogram(f'{var.name}_histogram', var, step=step)
            writer.flush()
        return

    def all_to_summary(self, step, writer):
        """
        Passes all layer variables to the tf.summary writer.
        """
        self.variable_to_summary(self.w, step=step, writer=writer)
        self.variable_to_summary(self.b, step=step, writer=writer)
        return

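
# Illustrative usage sketch (not part of the module): a Layer maps a batch of shape
# (n_events, dim_input) to (n_events, dim_output), e.g.
#
#   hidden = Layer('hidden1', 'tanh', dim_input=10, dim_output=50, p_bias=0.0, p_w=0.05)
#   out = hidden(tf.zeros([32, 10], dtype=tf.float32))   # -> shape (32, 50)
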
class MultilayerPerceptron(tf.Module):
    """
    multilayer perceptron class.
    """

    def __init__(self, layers, name='mlp'):
        """
        initialization
        """
        super().__init__(name=name)

        # list of Layer objects
        self.layers = layers

        # weights of all layers
        self.w = None

        # biases of all layers
        self.b = None

        # check if the network is initialized
        self.is_initialized = False
        self.initialize()

    @classmethod
    def from_list(cls, layers):
        """
        define layers from a list, with one tuple of Layer constructor arguments per layer
        """
        layer_obj = []
        for layer in layers:
            layer_obj.append(Layer(*layer))

        mlp = cls(layer_obj)
        return mlp

    def _collect_weights_and_biases(self):
        """
        collect tunable parameters
        """
        self.w = []
        self.b = []
        for layer in self.layers:
            self.w.append(layer.w)
            self.b.append(layer.b)

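    # Illustrative sketch (the layer tuples follow the Layer.__init__ signature; the
    # values below are made-up examples, not defaults of this module):
    #
    #   mlp = MultilayerPerceptron.from_list([
    #       ('hidden1', 'tanh', 35, 50, 0.0, 0.05),
    #       ('output', 'sigmoid', 50, 1, 0.0, 0.05),
    #   ])
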
    def initialize(self):
        """
        initialize. Checks that the layer dimensions align.
        """
        if self.is_initialized:
            raise RuntimeError('MultilayerPerceptron is already initialized.')

        # check that consecutive layer shapes align
        for _idx in range(len(self.layers) - 1):
            assert self.layers[_idx].shape[1] == self.layers[_idx + 1].shape[0]

        self._collect_weights_and_biases()  # TODO - remove?

        self.is_initialized = True
        return

    @tf.function
    def __call__(self, x):
        """
        Run the events through all the layers
        """
        for layer in self.layers:
            x = layer(x)
        return x

    def variables_to_writer(self, step, writer):
        """
        passes all the MLP variables to the tf.summary writer
        """
        for layer in self.layers:
            layer.all_to_summary(step, writer)
        return


class DefaultModel(tf.Module):
    """
    define the default model
    """

    def __init__(self, mlp,
                 mom_init=.9,
                 mom_max=.99,
                 mom_epochs=200,
                 lr_init=.05,
                 lr_min=1e-6,
                 lr_dec_rate=.976,
                 stop_epochs=10,
                 min_epochs=200,
                 max_epochs=1000,
                 wd_coeffs=None,
                 change_optimizer=None,
                 staircase=True,
                 smooth_cross_entropy=False):
        """
        initialization function
        :param mlp: network model.
        :param mom_init: initial momentum
        :param mom_max: maximum momentum
        :param mom_epochs: number of epochs over which the momentum is ramped up
        :param lr_init: initial learning rate
        :param lr_min: minimum learning rate
        :param lr_dec_rate: learning rate decay factor
        :param stop_epochs: number of epochs without improvement required for early termination
        :param min_epochs: minimum number of epochs for training
        :param max_epochs: maximum number of epochs for training
        :param wd_coeffs: weight decay coefficients. If not None, must have one per mlp layer.
        :param change_optimizer: list of epochs at which the next booked optimizer is activated
        :param staircase: if True, apply the learning rate and momentum schedules in discrete per-epoch steps
        :param smooth_cross_entropy: True for a small epsilon addition, False for a clipped network output
        """

        # network model
        self.mlp = mlp

        if wd_coeffs is not None:
            assert len(wd_coeffs) == len(mlp.layers)

        # weight decay coefficients
        self.wd_coeffs = wd_coeffs

        # global training step
        self.global_step = tf.Variable(0, trainable=False, name='global_step', dtype=tf.int64)

        # --optimizer params--
        # initial momentum
        self.c_mom_init = tf.constant(mom_init, dtype=tf.float32)

        # maximum momentum
        self.c_mom_max = tf.constant(mom_max, dtype=tf.float32)

        # number of epochs over which the momentum is ramped up
        self.c_mom_epochs = tf.constant(mom_epochs, dtype=tf.float32)

        # momentum increase per epoch
        self.c_mom_dec_rate = (self.c_mom_max - self.c_mom_init) / tf.cast(self.c_mom_epochs, tf.float32)

        # initial learning rate
        self.c_lr_init = tf.constant(lr_init, dtype=tf.float32)

        # minimum learning rate
        self.c_lr_min = tf.constant(lr_min, dtype=tf.float32)

        # learning rate decay factor
        self.c_lr_dec_rate = tf.constant(lr_dec_rate, dtype=tf.float32)

        # number of epochs without improvement for early termination
        self.c_stop_epochs = stop_epochs

        # apply the schedules in per-epoch steps
        self.c_staircase = staircase

        # booked optimizers
        self.optimizers = []
        # list with epochs in which optimizers will be changed; if None is given, only the default optimizer will be used
        self.optimizer_change_epochs = change_optimizer
        if change_optimizer is not None:
            self.optimizer_change_epochs.insert(0, 0)
            self.optimizer_change_epochs.append(sys.maxsize)

        # --termination criterion--
        # minimum number of epochs for training
        self.min_epochs = min_epochs

        # maximum number of epochs for training
        self.max_epochs = max_epochs

        # batches per epoch, unknown until initialize() is called with a data set
        self.batches_per_epoch = None

        # early-stopping criterion used by the Trainer
        self.termination_criterion = self._default_termination_criterion

        # the best value will be set to a default start value, then updated by the termination criterion
        self.best_value = np.inf

        # countdown of epochs without improvement before early termination
        self.step_countdown = self.c_stop_epochs

        # True for a small epsilon addition, False for a clipped network output
        self.smooth_cross_entropy = smooth_cross_entropy

        # check if initialized
        self.is_initialized = False

        return

    # needs to be called since some parameters depend on the number of batches
    def initialize(self, data_set):
        """
        Finalises the initialization based on data_set specific information (number of batches per epoch)
        """
        if self.is_initialized:
            raise RuntimeError('DefaultModel is already initialized.')

        self.batches_per_epoch = data_set.batches

        # check that the layer dimensions align properly
        if not self.mlp.is_initialized:
            self.mlp.initialize()

        self._set_optimizer()

        self.is_initialized = True
        return

    @tf.function
    def __call__(self, x):
        """
        Call the mlp
        """
        return self.mlp(x)

    def _default_termination_criterion(self,
                                       monitoring_param,
                                       epoch,
                                       prop_dec=1e-5):
        """
        early stopping criterion

        :param monitoring_param: the parameter to monitor for early termination
        :param epoch: the current epoch
        :param prop_dec: minimum proportional decrease required to count as an improvement
        :return: True if the training should be terminated, False otherwise
        """
        if epoch < self.min_epochs:
            return False

        if monitoring_param < self.best_value * (1. - prop_dec):
            self.step_countdown = self.c_stop_epochs
            self.best_value = monitoring_param
        else:
            self.step_countdown -= 1

        if self.step_countdown > 0:
            return False
        return True
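
    # Sketch of the countdown logic (illustrative numbers): with stop_epochs=10 and
    # prop_dec=1e-5, the countdown is reset to 10 whenever the monitored value falls
    # below best_value * (1 - 1e-5); after 10 consecutive epochs without such an
    # improvement (and only once epoch >= min_epochs), the criterion returns True.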

    def _get_learning_rate(self):
        """
        Returns the learning rate at the current global step.
        """
        p = tf.cast(self.global_step, tf.float32) / tf.cast(self.batches_per_epoch, tf.float32)
        if self.c_staircase:
            p = tf.floor(p)
        return tf.maximum(tf.multiply(self.c_lr_init, tf.pow(self.c_lr_dec_rate, p)), self.c_lr_min)

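    # The schedule is an exponential decay with a lower bound,
    #   lr = max(lr_init * lr_dec_rate**p, lr_min),
    # where p is the training progress in epochs (floored if c_staircase) derived from the
    # global step. Illustrative numbers: with the defaults lr_init=0.05 and lr_dec_rate=0.976,
    # the learning rate after 100 epochs is roughly 0.05 * 0.976**100 ≈ 0.0044.
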
    def _get_momentum(self):
        """
        returns the momentum at the current global step.
        """
        t_batches_per_epoch = tf.constant(self.batches_per_epoch, dtype=tf.float32)
        global_step = tf.cast(self.global_step, tf.float32)

        if self.c_staircase:
            t_limited_mom = tf.minimum(self.c_mom_init + self.c_mom_dec_rate *
                                       tf.floor(global_step / t_batches_per_epoch), self.c_mom_max)
        else:
            t_limited_mom = tf.minimum(self.c_mom_init + self.c_mom_dec_rate *
                                       (global_step / t_batches_per_epoch), self.c_mom_max)
        return t_limited_mom

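    # The momentum rises linearly from mom_init to mom_max over mom_epochs epochs:
    #   momentum = min(mom_init + (mom_max - mom_init) / mom_epochs * epoch, mom_max)
    # With the defaults this ramps from 0.9 to 0.99 over the first 200 epochs.
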
    def _set_optimizer(self):
        """
        set optimizers
        """
        self.optimizers.append(tf.optimizers.SGD(learning_rate=self._get_learning_rate,
                                                 momentum=self._get_momentum))
        self.optimizers.append(tf.optimizers.SGD(learning_rate=self._get_learning_rate))
        return

    def get_optimizer(self, epoch=0):
        """
        get the optimizer. If multiple optimizers are booked, gets the one appropriate for the epoch.

        :param epoch: current epoch.
        """
        if self.optimizer_change_epochs is None:
            return self.optimizers[0]

        if len(self.optimizer_change_epochs) > len(self.optimizers) + 1:
            raise RuntimeError('Number of booked optimizers does not match the optimizer change epochs.')

        # switch optimizer for the given epoch
        for i in range(1, len(self.optimizer_change_epochs)):
            if self.optimizer_change_epochs[i - 1] <= epoch < self.optimizer_change_epochs[i]:
                return self.optimizers[i - 1]

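    # Illustrative example: with change_optimizer=[500], optimizer_change_epochs becomes
    # [0, 500, sys.maxsize], so the momentum SGD optimizer is used for epochs 0-499 and
    # the plain SGD optimizer from epoch 500 onwards.
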
    def loss(self, predict_y, true_y):
        """
        calculate the loss

        :param predict_y: predicted labels
        :param true_y: true labels
        """
        epsilon = 1e-10
        t_epsilon = tf.constant(epsilon)

        # sum over the batch
        with tf.name_scope('cross_entropy'):
            if self.smooth_cross_entropy:
                cross_entropy = -tf.reduce_mean(tf.reduce_sum((true_y * tf.math.log(predict_y + t_epsilon) + (1 - true_y) *
                                                               tf.math.log(1 - predict_y + t_epsilon)), 1))
            else:
                cross_entropy = -tf.reduce_mean(tf.reduce_sum((true_y * tf.math.log(tf.clip_by_value(predict_y, epsilon, 1))) +
                                                              ((1 - true_y) * tf.math.log(tf.clip_by_value((1 - predict_y),
                                                                                                           epsilon, 1))), 1))
        loss = cross_entropy

        if self.wd_coeffs:
            wd_coeffs = self.wd_coeffs
            weights = self.mlp.w

            wd = [tf.constant(coeff) * tf.nn.l2_loss(w) for coeff, w in zip(wd_coeffs, weights)]
            loss += sum(wd)

        return loss, cross_entropy

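    # The returned loss is the batch-averaged binary cross entropy, summed over the output
    # dimensions, optionally plus L2 weight decay:
    #   loss = -mean( sum( y*log(p) + (1-y)*log(1-p) ) ) + sum_l( c_l * ||w_l||^2 / 2 )
    # where the c_l are the wd_coeffs and tf.nn.l2_loss(w) = sum(w**2) / 2.
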

class Trainer:
    """
    handling the training of the network model
    """

    def __init__(self,
                 model,
                 data_set,
                 log_dir=None,
                 save_name=None,
                 monitoring_size=10000):
        """
        class to train a predefined model
        :param model: DefaultModel object
        :param data_set: TFData object
        :param log_dir: str, directory name for the tensorboard logging
        :param save_name: str, path and name for saving the weight files
        :param monitoring_size: int, number of events of the training fraction used for monitoring
        """

        # time of the last epoch
        self._time = time.time()

        # network model
        self.model = model

        # data set providing the training and validation samples
        self.data_set = data_set
        self.model.initialize(data_set)

        # number of events used for monitoring
        self.monitoring_size = monitoring_size

        # tensorboard log directory
        self.log_dir = log_dir

        # termination criterion taken from the model
        self.termination_criterion = self.model.termination_criterion

        # initialise the current epoch
        self.current_epoch = 0

        # epoch of the best validation result so far
        self.best_epoch = -np.inf

        if log_dir is not None:
            self._prepare_tensorboard(log_dir)

        if save_name is None:
            time_str = time.strftime("%Y%m%d-%H%M%S")
            save_name = os.path.join(os.getcwd(), '_'.join([time_str, 'model']))

        # set the path and name for saving the weight files
        self.save_name = save_name

        # determine the monitoring sample sizes
        self._prepare_monitoring()
        return

509 """
510 checking dataset sizes for evaluation. These samples are used after each epoch to collect
511 summary statistics and test early stopping criteria.
512 """
513
515
517 if self.data_set.train_events > self.monitoring_size:
519
520 if self.data_set.valid_events > self.monitoring_size:
522 return
523
    def _prepare_tensorboard(self, log_dir):
        """
        prepare the tensorboard writers
        """
        log_dir_train = os.path.join(log_dir, 'train')
        log_dir_valid = os.path.join(log_dir, 'valid')

        # tf.summary writer for the training sample
        self.train_writer = tf.summary.create_file_writer(log_dir_train)

        # tf.summary writer for the validation sample
        self.valid_writer = tf.summary.create_file_writer(log_dir_valid)
        return

    def _train_epoch(self, current_epoch):
        """
        train one epoch
        """
        # set the optimizer for this epoch
        self.optimizer = self.model.get_optimizer(current_epoch)

        batch_iter = self.data_set.batch_iterator()

        t_range = trange(self.data_set.batches)
        t_range.set_description(f'Epoch {current_epoch:4d}')
        for i in t_range:

            batch = next(batch_iter)

            batch_x = batch[0]
            batch_y = batch[1]

            with tf.GradientTape() as tape:
                loss, _ = self.model.loss(self.model(batch_x), batch_y)
            grads = tape.gradient(loss, self.model.trainable_variables)

            self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))

            # write the learning rate and momentum to the tensorboard log
            if self.log_dir is not None:
                with self.train_writer.as_default():
                    tf.summary.scalar('learning_rate', self.model._get_learning_rate(), step=self.model.global_step)
                    tf.summary.scalar('momentum', self.model._get_momentum(), step=self.model.global_step)
                self.train_writer.flush()

            self.model.global_step.assign_add(1)

        train_x = self.data_set.train_x[:self.train_monitor]
        train_y = self.data_set.train_y[:self.train_monitor]

        valid_x = self.data_set.valid_x[:self.valid_monitor]
        valid_y = self.data_set.valid_y[:self.valid_monitor]

        # run the training and validation samples to collect statistics
        train_loss, train_cross_entropy = self.model.loss(self.model(train_x), train_y)
        valid_loss, valid_cross_entropy = self.model.loss(self.model(valid_x), valid_y)

        # if we have a log_dir set, write extra summary information
        if self.log_dir is not None:
            with self.train_writer.as_default():
                tf.summary.scalar('loss', train_loss, step=current_epoch)
                tf.summary.scalar('cross_entropy', train_cross_entropy, step=current_epoch)

                # this is now at the end of each epoch
                tf.summary.scalar('epoch_learning_rate', self.model._get_learning_rate(), step=current_epoch)
                tf.summary.scalar('epoch_momentum', self.model._get_momentum(), step=current_epoch)
                self.train_writer.flush()

            # write all the model parameters to the summary file too
            self.model.mlp.variables_to_writer(current_epoch, self.train_writer)

            with self.valid_writer.as_default():
                tf.summary.scalar('loss', valid_loss, step=current_epoch)
                tf.summary.scalar('cross_entropy', valid_cross_entropy, step=current_epoch)
                tf.summary.scalar('best_epoch', self.best_epoch, step=current_epoch)
            self.valid_writer.flush()

        # update time
        self._time = time.time()
        self.current_epoch += 1

        return valid_cross_entropy

    def _save_best_state(self, cross_entropy):
        """
        save the model as a checkpoint only if a global minimum is reached on the validation sample
        :return: None
        """
        # current state - do we need this?
        checkpoint = tf.train.Checkpoint(self.model)
        checkpoint.save(self.save_name.replace('model', 'model_current'))

        # check for a not yet set best value
        if self.model.best_value == np.inf:
            return

        if cross_entropy < self.model.best_value:
            self.best_epoch = self.current_epoch
            checkpoint.save(self.save_name)
        return

    def _closing_ops(self):
        """
        closing operations
        """
        if self.log_dir is not None:
            self.train_writer.close()
            self.valid_writer.close()
        return

    def train_model(self):
        """
        train the model
        """
        for epoch in range(self.model.max_epochs):
            valid_cross_entropy = self._train_epoch(epoch)

            self._save_best_state(valid_cross_entropy)

            if self.termination_criterion(valid_cross_entropy, epoch):
                break

        self._closing_ops()
        return
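

# Illustrative end-to-end usage sketch (an assumption, not part of this module): given a
# data-set object exposing the interface used above (batches, batch_iterator(),
# train_x/train_y, valid_x/valid_y, train_events, valid_events), training could look like:
#
#   layers = [('hidden1', 'tanh', n_features, 100, 0.0, 0.05),
#             ('output', 'sigmoid', 100, 1, 0.0, 0.05)]
#   mlp = MultilayerPerceptron.from_list(layers)
#   model = DefaultModel(mlp, wd_coeffs=[1e-5, 1e-5])
#   trainer = Trainer(model, data_set, log_dir='./logs', save_name='./model')
#   trainer.train_model()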