Belle II Software  release-05-02-19
multithreaded.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Dennis Weyland 2017

import numpy as np
import tensorflow as tf
import threading
import signal

from basf2_mva_python_interface.tensorflow import State


CONTINUE_ENQUEUE = True

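# Training is split between two threads: the basf2_mva framework calls partial_fit()
# in the main thread to fill a RandomShuffleQueue, while execute_train_op() runs in a
# background thread (started in begin_fit) that dequeues batches and optimises the
# network. A tf.train.Coordinator together with the CONTINUE_ENQUEUE flag is used to
# shut both down cleanly, e.g. on SIGINT.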

def signal_handler(signal, frame):
    """
    Used to safely stop training. This can only be used in the main thread,
    so depending on the training style, it can take a while before it is executed.
    """
    print('Stopping Training safely')
    global COORD
    global CONTINUE_ENQUEUE
    COORD.request_stop()
    CONTINUE_ENQUEUE = False


def execute_train_op(state):
    """
    Do the actual training, multithreaded: dequeue batches and run the optimizer
    until the coordinator requests a stop or the queue is closed.
    """
    global COORD
    try:
        epoch = 0
        while not COORD.should_stop():
            # Fetch one batch from the queue and run a single optimisation step on it.
            xy_list = state.session.run(state.dequeue)
            feed_dict = {state.x: xy_list[0], state.y: xy_list[1]}

            state.session.run(state.optimizer, feed_dict=feed_dict)

            if epoch % 100 == 0:
                train_eva = state.session.run(state.cost, feed_dict=feed_dict)
                print('Step %d: Train cost = %.4f' % (epoch, train_eva))

            epoch += 1

    except tf.errors.OutOfRangeError:
        print('No more items in closed queue, end of training.')
        COORD.request_stop()


def get_model(number_of_features, number_of_spectators, number_of_events, training_fraction, parameters):
    """
    Build the TensorFlow graph, handle the parameters and initialise the variables.
    """

    # Default queue parameters; overridden by the 'parameters' argument if it is a dict.
    param = {'capacity': 1e6, 'min_after_dequeue': 800, 'batch_size': 500}

    if isinstance(parameters, dict):
        param.update(parameters)

    x = tf.placeholder("float", [None, number_of_features])
    y = tf.placeholder("float", [None, 1])

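    # Helper to create one fully connected layer; it is used below to stack four
    # hidden layers of 20 sigmoid units and a single sigmoid output node.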
    def layer(x, shape, name, unit=tf.sigmoid):
        with tf.name_scope(name):
            weights = tf.Variable(tf.truncated_normal(shape, stddev=1.0 / np.sqrt(float(shape[0]))), name='weights')
            biases = tf.Variable(tf.constant(0.0, shape=[shape[1]]), name='biases')
            layer = unit(tf.matmul(x, weights) + biases)
        return layer

    inference_hidden1 = layer(x, [number_of_features, 20], 'inference_hidden1')
    inference_hidden2 = layer(inference_hidden1, [20, 20], 'inference_hidden2')
    inference_hidden3 = layer(inference_hidden2, [20, 20], 'inference_hidden3')
    inference_hidden4 = layer(inference_hidden3, [20, 20], 'inference_hidden4')
    inference_activation = layer(inference_hidden4, [20, 1], 'inference_sigmoid', unit=tf.sigmoid)

    # Binary cross-entropy loss; epsilon avoids log(0).
    epsilon = 1e-5
    inference_loss = -tf.reduce_sum(y * tf.log(inference_activation + epsilon) +
                                    (1.0 - y) * tf.log(1 - inference_activation + epsilon))

    inference_optimizer = tf.train.AdamOptimizer()
    inference_global_step = tf.Variable(0, name='inference_global_step', trainable=False)
    inference_minimize = inference_optimizer.minimize(inference_loss, global_step=inference_global_step)

    init = tf.global_variables_initializer()
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    session = tf.Session(config=config)
    session.run(init)
    tmp_state = State(x, y, inference_activation, inference_loss, inference_minimize, session)

    # All steps below are needed in state for training but not needed for mva_expert

    # defining queue, enqueue and dequeue-operation
    tmp_state.queue = tf.RandomShuffleQueue(int(param['capacity']), int(param['min_after_dequeue']), [tf.float32, tf.float32],
                                            shapes=[[number_of_features], [1]])
    tmp_state.enqueue = tmp_state.queue.enqueue_many([x, y])
    tmp_state.dequeue = tmp_state.queue.dequeue_many(int(param['batch_size']))

    # defining thread for training
    global COORD
    COORD = tf.train.Coordinator()
    tmp_state.thread = threading.Thread(target=execute_train_op, args=(tmp_state,))

    return tmp_state


def begin_fit(state, Xtest, Stest, ytest, wtest):
    """
    Start the training thread, which runs the training op asynchronously.
    """
    state.thread.start()

    return state


def partial_fit(state, X, S, y, w, epoch):
    """
    Put data in the queue.
    """
    signal.signal(signal.SIGINT, signal_handler)

    state.session.run(state.enqueue, feed_dict={state.x: X, state.y: np.reshape(y, (len(y), 1))})

    print("Queue Epoch: %d, Queue Size: %d" % (epoch, state.session.run(state.queue.size())))

    return CONTINUE_ENQUEUE


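# The block below only runs when this file is executed directly. It configures a
# basf2_mva training that uses this very file as the steering file, so the hook
# functions defined above (get_model, begin_fit, partial_fit, ...) drive the fit.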
if __name__ == "__main__":
    from basf2 import conditions
    # NOTE: do not use testing payloads in production! Any results obtained like this WILL NOT BE PUBLISHED
    conditions.testing_payloads = [
        'localdb/database.txt'
    ]

    import basf2_mva
    import json

    general_options = basf2_mva.GeneralOptions()
    general_options.m_datafiles = basf2_mva.vector("train.root")
    general_options.m_treename = "tree"
    variables = ['p', 'pt', 'pz',
                 'daughter(0, p)', 'daughter(0, pz)', 'daughter(0, pt)',
                 'daughter(1, p)', 'daughter(1, pz)', 'daughter(1, pt)',
                 'daughter(2, p)', 'daughter(2, pz)', 'daughter(2, pt)',
                 'chiProb', 'dr', 'dz',
                 'daughter(0, dr)', 'daughter(1, dr)',
                 'daughter(0, dz)', 'daughter(1, dz)',
                 'daughter(0, chiProb)', 'daughter(1, chiProb)', 'daughter(2, chiProb)',
                 'daughter(0, kaonID)', 'daughter(0, pionID)',
                 'daughterInvariantMass(0, 1)', 'daughterInvariantMass(0, 2)', 'daughterInvariantMass(1, 2)']

    general_options.m_variables = basf2_mva.vector(*variables)
    general_options.m_spectators = basf2_mva.vector('M')
    general_options.m_target_variable = "isSignal"

    specific_options = basf2_mva.PythonOptions()
    specific_options.m_framework = "tensorflow"
    specific_options.m_steering_file = 'mva/examples/tensorflow/multithreaded.py'
    specific_options.m_nIterations = 400
    specific_options.m_mini_batch_size = 100000

    general_options.m_identifier = "tensorflow_multithreaded"
    specific_options.m_config = json.dumps({'capacity': 2e3, 'min_after_dequeue': 600, 'batch_size': 500})
    basf2_mva.teacher(general_options, specific_options)
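
    # A possible follow-up (a sketch, not part of this example): apply the trained
    # identifier to an independent sample with basf2_mva.expert, e.g.
    # basf2_mva.expert(basf2_mva.vector("tensorflow_multithreaded"),
    #                  basf2_mva.vector("test.root"), "tree", "expert.root")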