Belle II Software  release-08-01-10
multithreaded.py
1 #!/usr/bin/env python3
2 
3 
10 
11 import numpy as np
12 import tensorflow as tf
13 import threading
14 import os
15 import sys
16 import tempfile
17 
19 
20 
def execute_train_op(state):
    """
    Worker loop executed by each training thread.

    Repeatedly dequeues state.dequeue_batch_size events from state.queue and applies one
    optimizer step per batch for as long as the module-level COORD does not ask for a stop.
    When the queue raises OutOfRangeError the stop is requested through COORD.
    """
    step = 0
    try:
        while not COORD.should_stop():
            # the queue holds (features, weights, targets) in the order they are enqueued in partial_fit
            features, weights, targets = state.queue.dequeue_many(state.dequeue_batch_size)
            model = state.model

            with tf.GradientTape() as tape:
                prediction = model(features)
                avg_cost = model.loss(prediction, targets, weights)
                grads = tape.gradient(avg_cost, model.trainable_variables)

            model.optimizer.apply_gradients(zip(grads, model.trainable_variables))

            # report the cost of every 100th step taken by this thread
            if step % 100 == 0:
                print(f'Step {step:d}: Train cost = {avg_cost:.4f}')
            step += 1

    except tf.errors.OutOfRangeError:
        print('No more items in closed queue, end of training.')
        COORD.request_stop()
44 
45 
def get_model(number_of_features, number_of_spectators, number_of_events, training_fraction, parameters):
    """
    Set up the network, the shuffle queue that feeds it and the training threads.

    The defaults for 'capacity', 'min_after_dequeue' and 'batch_size' are overridden by the
    entries of parameters if it is a dict. number_of_spectators, number_of_events and
    training_fraction are accepted but not used in this function.

    Returns the State object holding the model, the queue, the dequeue batch size and the
    (not yet started) training threads.
    """
    global COORD

    defaults = {'capacity': 1e6, 'min_after_dequeue': 800, 'batch_size': 500}
    param = {**defaults, **parameters} if isinstance(parameters, dict) else defaults

    # enable memory growth on every GPU tensorflow can see
    for gpu in tf.config.list_physical_devices('GPU'):
        tf.config.experimental.set_memory_growth(gpu, True)

    class my_model(tf.Module):
        """
        Fully connected network: two relu layers of width number_of_features followed by a
        single sigmoid output node.
        """

        def __init__(self, **kwargs):
            super().__init__(**kwargs)

            self.optimizer = tf.optimizers.Adam(0.01)
            self.n_layers = 3

            def build_layer(n_in, n_out, name, activation_function):
                # weights start from a truncated normal scaled by 1/sqrt(number of inputs), biases from zero
                initial_weights = tf.random.truncated_normal([n_in, n_out], stddev=1.0 / np.sqrt(float(n_in)))
                weights = tf.Variable(initial_weights, name=f'{name}_weights')
                biases = tf.Variable(tf.zeros(shape=[n_out]), name=f'{name}_biases')
                return weights, biases, activation_function

            # (name, number of outputs, activation) of every layer, in the order they are applied
            layer_specs = [(f'inference_hidden{i}', number_of_features, tf.nn.relu)
                           for i in range(self.n_layers - 1)]
            layer_specs.append(('inference_sigmoid', 1, tf.nn.sigmoid))

            self.layer_variables = []
            for name, n_out, activation_function in layer_specs:
                self.layer_variables.append(build_layer(number_of_features, n_out, name, activation_function))

        @tf.function(input_signature=[tf.TensorSpec(shape=[None, number_of_features], dtype=tf.float32)])
        def __call__(self, x):
            """
            Propagate x through all layers and return the output of the last one.
            """
            for index in range(self.n_layers):
                weights, biases, activation_function = self.layer_variables[index]
                x = activation_function(tf.matmul(x, weights) + biases)
            return x

        @tf.function
        def loss(self, predicted_y, target_y, w):
            """
            Weighted cross entropy, normalised to the sum of the weights.
            """
            epsilon = 1e-5  # keeps the argument of the logarithm away from zero
            # prediction for target 1, one minus the prediction otherwise
            probability_of_truth = tf.where(target_y == 1., predicted_y, 1. - predicted_y)
            weighted_log_sum = tf.reduce_sum(w * tf.math.log(probability_of_truth + epsilon))
            return - weighted_log_sum / tf.reduce_sum(w)

    state = State(model=my_model())

    # queue of (features, weight, target) triplets shared by partial_fit and the training threads
    state.queue = tf.queue.RandomShuffleQueue(
        capacity=int(param['capacity']),
        min_after_dequeue=int(param['min_after_dequeue']),
        dtypes=[tf.float32, tf.float32, tf.float32],
        shapes=[[number_of_features], [1], [1]])

    state.dequeue_batch_size = int(param['batch_size'])

    # two training threads sharing one coordinator; they are started in begin_fit
    COORD = tf.train.Coordinator()
    state.threads = [threading.Thread(target=execute_train_op, args=(state,)) for _ in range(2)]
    # NOTE(review): none of the threads is running yet at this point - confirm this join is needed
    COORD.join(state.threads)
    return state
115 
116 
def begin_fit(state, Xtest, Stest, ytest, wtest, nBatches):
    """
    Launch the asynchronous training.

    Starts every thread stored in state.threads and hands the state back.
    The test data and nBatches are not used.
    """
    workers = state.threads
    for worker in workers:
        worker.start()
    return state
124 
125 
def partial_fit(state, X, S, y, w, epoch, batch):
    """
    Feed one batch into the queue.

    Features, weights and targets are enqueued in this order; the spectators S are not used.
    Prints the current epoch, batch and queue size and always returns True.
    """
    queue = state.queue
    queue.enqueue_many([X, w, y])
    queue_size = queue.size()
    print(f"Queue Epoch: {epoch:d}, Queue Batch: {batch:d}, Queue Size: {queue_size:d}")
    return True
133 
134 
def end_fit(state):
    """
    Finish the training and return the model serialised as a tensorflow SavedModel.

    The queue is closed so the training threads can process the remaining batches. The
    threads are then joined before the model is written, so the stored variables are no
    longer being updated while they are saved.

    Returns [file_names, files]: the paths of the stored files relative to the SavedModel
    directory and the content of each of these files as bytes.
    """
    # close the queue allowing the dequeue operation to grab the last batches
    state.queue.close()

    # Wait until the training threads have emptied the closed queue and left their loop.
    # Threads that were never started or have already finished are skipped.
    for thread in state.threads:
        if thread.is_alive():
            thread.join()

    with tempfile.TemporaryDirectory() as path:

        tf.saved_model.save(state.model, path)
        # tf.saved_model.save creates:
        # path/saved_model.pb
        # path/variables/variables.index
        # path/variables/variables.data-00000-of-00001
        # path/assets/* - This contains additional assets stored in the model.

        file_names = ['saved_model.pb',
                      'variables/variables.index',
                      'variables/variables.data-00000-of-00001']

        # we dont know what, if anything, is saved in assets/
        assets_path = os.path.join(path, 'assets')
        if os.path.isdir(assets_path):
            with os.scandir(assets_path) as entries:
                # sorted to get a reproducible file order
                file_names.extend(sorted(f'assets/{entry.name}' for entry in entries if entry.is_file()))

        # read everything back while the temporary directory still exists
        files = []
        for file_name in file_names:
            with open(os.path.join(path, file_name), 'rb') as file:
                files.append(file.read())

    return [file_names, files]
171 
172 
if __name__ == "__main__":
    # Example: train this multithreaded tensorflow method with the basf2 mva teacher,
    # apply it to the test sample and print training time, inference time and ROC AUC.
    from basf2 import conditions, find_file
    # NOTE: do not use testing payloads in production! Any results obtained like this WILL NOT BE PUBLISHED
    conditions.testing_payloads = [
        'localdb/database.txt'
    ]

    import basf2_mva
    import json

    train_file = find_file("mva/train_D0toKpipi.root", "examples")
    test_file = find_file("mva/test_D0toKpipi.root", "examples")

    training_data = basf2_mva.vector(train_file)
    testing_data = basf2_mva.vector(test_file)

    general_options = basf2_mva.GeneralOptions()
    general_options.m_datafiles = training_data
    general_options.m_treename = "tree"
    variables = ['p', 'pt', 'pz',
                 'daughter(0, p)', 'daughter(0, pz)', 'daughter(0, pt)',
                 'daughter(1, p)', 'daughter(1, pz)', 'daughter(1, pt)',
                 'daughter(2, p)', 'daughter(2, pz)', 'daughter(2, pt)',
                 'chiProb', 'dr', 'dz',
                 'daughter(0, dr)', 'daughter(1, dr)',
                 'daughter(0, dz)', 'daughter(1, dz)',
                 'daughter(0, chiProb)', 'daughter(1, chiProb)', 'daughter(2, chiProb)',
                 'daughter(0, kaonID)', 'daughter(0, pionID)',
                 'daughterInvM(0, 1)', 'daughterInvM(0, 2)', 'daughterInvM(1, 2)']

    general_options.m_variables = basf2_mva.vector(*variables)
    general_options.m_spectators = basf2_mva.vector('M')
    general_options.m_target_variable = "isSignal"

    specific_options = basf2_mva.PythonOptions()
    specific_options.m_framework = "tensorflow"
    specific_options.m_steering_file = 'mva/examples/tensorflow/multithreaded.py'
    specific_options.m_nIterations = 100
    specific_options.m_mini_batch_size = 0

    general_options.m_identifier = "tensorflow_multithreaded"
    # these values are passed to get_model as 'parameters' and override its queue defaults
    specific_options.m_config = json.dumps({'capacity': 2e3, 'min_after_dequeue': 500, 'batch_size': 500})

    import time
    import basf2_mva_util
    training_start = time.time()
    basf2_mva.teacher(general_options, specific_options)
    training_stop = time.time()
    training_time = training_stop - training_start
    method = basf2_mva_util.Method(general_options.m_identifier)
    inference_start = time.time()
    p, t = method.apply_expert(testing_data, general_options.m_treename)
    inference_stop = time.time()
    inference_time = inference_stop - inference_start
    # auc has to be computed before it is printed, otherwise the print raises a NameError
    auc = basf2_mva_util.calculate_roc_auc(p, t)
    print("Tensorflow", training_time, inference_time, auc)
def calculate_roc_auc(p, t)