Belle II Software development
multithreaded.py
1#!/usr/bin/env python3
2
3
10
11import numpy as np
12import tensorflow as tf
13import threading
14import os
15import sys
16import tempfile
17
19
20
def execute_train_op(state):
    """
    Worker loop executed by each training thread.

    Repeatedly takes ``state.dequeue_batch_size`` events from ``state.queue``,
    evaluates the loss of ``state.model`` on them and applies one optimizer
    step. The loop ends when the module-level coordinator ``COORD`` reports a
    stop request, or when dequeuing raises ``tf.errors.OutOfRangeError``; in
    the latter case this thread requests the stop itself.
    """
    epoch = 0
    try:
        while True:
            if COORD.should_stop():
                break

            # The queue delivers the columns as (features, weights, targets).
            features, weights, targets = state.queue.dequeue_many(state.dequeue_batch_size)
            model = state.model

            with tf.GradientTape() as tape:
                avg_cost = model.loss(model(features), targets, weights)
                gradients = tape.gradient(avg_cost, model.trainable_variables)

            model.optimizer.apply_gradients(zip(gradients, model.trainable_variables))

            # Report the cost on every 100th step; the counter is local to this thread.
            if epoch % 100 == 0:
                print(f'Step {epoch:d}: Train cost = {avg_cost:.4f}')
            epoch += 1
    except tf.errors.OutOfRangeError:
        print('No more items in closed queue, end of training.')
        COORD.request_stop()
44
45
def get_model(number_of_features, number_of_spectators, number_of_events, training_fraction, parameters):
    """
    Create the network, the shuffle queue that feeds it and the training threads.

    ``parameters`` may be a dict overriding the defaults for 'capacity',
    'min_after_dequeue' and 'batch_size'; any non-dict value is ignored.
    ``number_of_spectators``, ``number_of_events`` and ``training_fraction``
    are not used by this function.

    Returns a State object holding the model, the queue (``state.queue``), the
    batch size (``state.dequeue_batch_size``) and two not yet started threads
    (``state.threads``). The module-level coordinator ``COORD`` is (re)created.
    """
    global COORD

    # Defaults first, then let a user-supplied dict override individual entries.
    param = {'capacity': 1e6, 'min_after_dequeue': 800, 'batch_size': 500}
    if isinstance(parameters, dict):
        param.update(parameters)

    # Enable memory growth on every visible GPU (the loop is a no-op without GPUs).
    for gpu in tf.config.list_physical_devices('GPU'):
        tf.config.experimental.set_memory_growth(gpu, True)

    class my_model(tf.Module):
        """
        Fully connected network with two ReLU layers and a single sigmoid output.
        """

        def __init__(self, **kwargs):
            super().__init__(**kwargs)

            self.optimizer = tf.optimizers.Adam(0.01)

            def _make_layer(shape, name, activation_function):
                # Weights are drawn with a spread of 1/sqrt(number of inputs); biases start at zero.
                stddev = 1.0 / np.sqrt(float(shape[0]))
                weights = tf.Variable(tf.random.truncated_normal(shape, stddev=stddev), name=f'{name}_weights')
                biases = tf.Variable(tf.zeros(shape=[shape[1]]), name=f'{name}_biases')
                return weights, biases, activation_function

            self.n_layers = 3
            self.layer_variables = []

            # All layers but the last keep the input width; the last one maps to one output.
            hidden_shape = [number_of_features, number_of_features]
            for i in range(self.n_layers - 1):
                self.layer_variables.append(_make_layer(hidden_shape, f'inference_hidden{i}', tf.nn.relu))
            self.layer_variables.append(_make_layer([number_of_features, 1], 'inference_sigmoid', tf.nn.sigmoid))

        @tf.function(input_signature=[tf.TensorSpec(shape=[None, number_of_features], dtype=tf.float32)])
        def __call__(self, x):
            # Apply the layers one after another: activation(x * W + b).
            for i in range(self.n_layers):
                weights, biases, activation_function = self.layer_variables[i]
                x = activation_function(tf.matmul(x, weights) + biases)
            return x

        @tf.function
        def loss(self, predicted_y, target_y, w):
            # Weighted cross entropy; the small offset keeps the logarithm finite.
            is_signal = target_y == 1.
            diff_from_truth = tf.where(is_signal, predicted_y, 1. - predicted_y)
            weighted_log = w * tf.math.log(diff_from_truth + 1e-5)
            return - tf.reduce_sum(weighted_log) / tf.reduce_sum(w)

    state = State(model=my_model())

    # Queue with three float columns per event: features, and two single-value columns.
    capacity = int(param['capacity'])
    min_after_dequeue = int(param['min_after_dequeue'])
    column_types = [tf.float32, tf.float32, tf.float32]
    column_shapes = [[number_of_features], [1], [1]]
    state.queue = tf.queue.RandomShuffleQueue(capacity, min_after_dequeue, column_types, shapes=column_shapes)

    state.dequeue_batch_size = int(param['batch_size'])

    # Two worker threads sharing the same state; they are only created here, not started.
    COORD = tf.train.Coordinator()
    state.threads = [threading.Thread(target=execute_train_op, args=(state,)) for _ in range(2)]
    # NOTE(review): the threads have not been started when join is called here - confirm this is intended.
    COORD.join(state.threads)
    return state
115
116
def begin_fit(state, Xtest, Stest, ytest, wtest, nBatches):
    """
    Launch the asynchronous training threads.

    Starts every thread stored in ``state.threads``. The test sample
    arguments and ``nBatches`` are accepted but not used here.
    Returns the state object that was passed in.
    """
    for worker in state.threads:
        worker.start()
    return state
124
125
def partial_fit(state, X, S, y, w, epoch, batch):
    """
    Feed one batch of events into the training queue.

    The columns are enqueued in the order features, weights, targets;
    ``S`` is not used. Afterwards the epoch, batch and current queue size are
    printed. Always returns True.
    """
    queue = state.queue
    queue.enqueue_many([X, w, y])
    queue_size = queue.size()
    print(f"Queue Epoch: {epoch:d}, Queue Batch: {batch:d}, Queue Size: {queue_size:d}")
    return True
133
134
def end_fit(state):
    """
    Finish the training and serialise the model with tf.saved_model.

    The queue is closed, training threads that are still running are waited
    for, and the model is saved into a temporary directory whose files are
    read back into memory.

    Returns a two-element list ``[file_names, files]``: the paths of the saved
    files relative to the saved-model directory and, in the same order, their
    contents as bytes.
    """
    try:
        import tensorflow as tf
    except ImportError:
        print("Please install tensorflow: pip3 install tensorflow")
        sys.exit(1)

    # close the queue allowing the dequeue operation to grab the last batches
    state.queue.close()

    # Wait for the workers to consume what is left in the closed queue, so the
    # variables are no longer being updated while the model is written out.
    # Threads that were never started or have already finished are skipped.
    for thread in getattr(state, 'threads', ()):
        if thread.is_alive():
            thread.join()

    with tempfile.TemporaryDirectory() as path:

        tf.saved_model.save(state.model, path)
        # tf.saved_model.save creates:
        # path/saved_model.pb
        # path/variables/variables.index
        # path/variables/variables.data-00000-of-00001
        # path/assets/* - This contains additional assets stored in the model.

        file_names = ['saved_model.pb',
                      'variables/variables.index',
                      'variables/variables.data-00000-of-00001']

        # we dont know what, if anything, is saved in assets/
        assets_path = os.path.join(path, 'assets/')
        if os.path.isdir(assets_path):
            with os.scandir(assets_path) as entries:
                file_names.extend(f'assets/{entry.name}' for entry in entries if entry.is_file())

        # The contents must be read before the temporary directory is removed.
        files = []
        for file_name in file_names:
            with open(os.path.join(path, file_name), 'rb') as file:
                files.append(file.read())

    return [file_names, files]
171
172
if __name__ == "__main__":
    # Example steering: train the multithreaded tensorflow method with the
    # basf2 mva teacher, apply it to the test sample and print timing and AUC.
    from basf2 import conditions, find_file
    # NOTE: do not use testing payloads in production! Any results obtained like this WILL NOT BE PUBLISHED
    conditions.testing_payloads = [
        'localdb/database.txt'
    ]

    import basf2_mva
    import json

    train_file = find_file("mva/train_D0toKpipi.root", "examples")
    test_file = find_file("mva/test_D0toKpipi.root", "examples")

    training_data = basf2_mva.vector(train_file)
    testing_data = basf2_mva.vector(test_file)

    general_options = basf2_mva.GeneralOptions()
    general_options.m_datafiles = training_data
    general_options.m_treename = "tree"
    variables = ['p', 'pt', 'pz',
                 'daughter(0, p)', 'daughter(0, pz)', 'daughter(0, pt)',
                 'daughter(1, p)', 'daughter(1, pz)', 'daughter(1, pt)',
                 'daughter(2, p)', 'daughter(2, pz)', 'daughter(2, pt)',
                 'chiProb', 'dr', 'dz',
                 'daughter(0, dr)', 'daughter(1, dr)',
                 'daughter(0, dz)', 'daughter(1, dz)',
                 'daughter(0, chiProb)', 'daughter(1, chiProb)', 'daughter(2, chiProb)',
                 'daughter(0, kaonID)', 'daughter(0, pionID)',
                 'daughterInvM(0, 1)', 'daughterInvM(0, 2)', 'daughterInvM(1, 2)']

    general_options.m_variables = basf2_mva.vector(*variables)
    general_options.m_spectators = basf2_mva.vector('M')
    general_options.m_target_variable = "isSignal"

    specific_options = basf2_mva.PythonOptions()
    specific_options.m_framework = "tensorflow"
    specific_options.m_steering_file = 'mva/examples/tensorflow/multithreaded.py'
    specific_options.m_nIterations = 100
    specific_options.m_mini_batch_size = 0

    general_options.m_identifier = "tensorflow_multithreaded"
    # This JSON string is the configuration handed to the method; it overrides the queue defaults.
    specific_options.m_config = json.dumps({'capacity': 2e3, 'min_after_dequeue': 500, 'batch_size': 500})

    import time
    import basf2_mva_util
    training_start = time.time()
    basf2_mva.teacher(general_options, specific_options)
    training_stop = time.time()
    training_time = training_stop - training_start
    method = basf2_mva_util.Method(general_options.m_identifier)
    inference_start = time.time()
    p, t = method.apply_expert(testing_data, general_options.m_treename)
    inference_stop = time.time()
    inference_time = inference_stop - inference_start
    # The AUC has to be computed from the expert output before it is printed;
    # without this assignment the print below fails with a NameError.
    auc = basf2_mva_util.calculate_roc_auc(p, t)
    print("Tensorflow", training_time, inference_time, auc)
def calculate_roc_auc(p, t)