# Belle II Software  release-06-01-15
# multithreaded.py
1 #!/usr/bin/env python3
2 
3 
10 
11 import numpy as np
12 import tensorflow as tf
13 import threading
14 import signal
15 
17 
18 
# Flag polled by partial_fit() after each enqueue; the SIGINT handler
# flips it to False so the data feeder stops and training can wind down.
CONTINUE_ENQUEUE = True
20 
21 
def signal_handler(sig, frame):
    """
    Used to safely stop training. This can only be used in main thread,
    so depending on the training style, it can take a while before execution.

    Requests a stop on the global coordinator (observed by the training
    thread) and clears CONTINUE_ENQUEUE (observed by partial_fit).

    :param sig: delivered signal number (renamed from ``signal`` to avoid
        shadowing the stdlib ``signal`` module imported at file level).
    :param frame: current stack frame at the time the signal was delivered.
    """
    print('Stopping Training safely')
    global COORD
    global CONTINUE_ENQUEUE
    COORD.request_stop()
    CONTINUE_ENQUEUE = False
32 
33 
def execute_train_op(state):
    """
    Worker loop doing the actual multithreaded training: repeatedly pulls a
    mini-batch from the shuffle queue and runs one optimizer step, until the
    coordinator requests a stop or the (closed) queue runs empty.
    """
    global COORD
    step = 0
    try:
        while not COORD.should_stop():
            batch_x, batch_y = state.session.run(state.dequeue)
            feed = {state.x: batch_x, state.y: batch_y}

            state.session.run(state.optimizer, feed_dict=feed)

            # Report the training cost every 100 steps.
            if step % 100 == 0:
                train_eva = state.session.run(state.cost, feed_dict=feed)
                print('Step %d: Train cost = %.4f' % (step, train_eva))

            step += 1

    except tf.errors.OutOfRangeError:
        # Raised by dequeue_many once the queue is closed and drained.
        print('No more items in closed queue, end of training.')
        COORD.request_stop()
56 
57 
def get_model(number_of_features, number_of_spectators, number_of_events, training_fraction, parameters):
    """
    Build tensorflow graph, handles parameter and initialise variables.

    Builds a 4-hidden-layer sigmoid network (TF1 graph API), a cross-entropy
    loss with Adam, a RandomShuffleQueue for asynchronous data feeding and a
    (not yet started) training thread. Also creates the module-global COORD
    coordinator used by execute_train_op and signal_handler.

    :param parameters: optional dict overriding the queue defaults
        ('capacity', 'min_after_dequeue', 'batch_size').
    :return: State object (presumably provided by the basf2_mva framework —
        not defined in this file) extended with queue/enqueue/dequeue/thread.
    """

    # Queue defaults; any key can be overridden through 'parameters'.
    param = {'capacity': 1e6, 'min_after_dequeue': 800, 'batch_size': 500}

    if isinstance(parameters, dict):
        param.update(parameters)

    # Graph inputs: feature matrix and single-column target.
    x = tf.placeholder("float", [None, number_of_features])
    y = tf.placeholder("float", [None, 1])

    def layer(x, shape, name, unit=tf.sigmoid):
        # One fully connected layer: truncated-normal weights scaled by
        # 1/sqrt(fan_in), zero-initialised biases, activation 'unit'.
        with tf.name_scope(name):
            weights = tf.Variable(tf.truncated_normal(shape, stddev=1.0 / np.sqrt(float(shape[0]))), name='weights')
            biases = tf.Variable(tf.constant(0.0, shape=[shape[1]]), name='biases')
            layer = unit(tf.matmul(x, weights) + biases)
        return layer

    # Four hidden layers of width 20, then a single sigmoid output unit.
    inference_hidden1 = layer(x, [number_of_features, 20], 'inference_hidden1')
    inference_hidden2 = layer(inference_hidden1, [20, 20], 'inference_hidden2')
    inference_hidden3 = layer(inference_hidden2, [20, 20], 'inference_hidden3')
    inference_hidden4 = layer(inference_hidden3, [20, 20], 'inference_hidden4')
    inference_activation = layer(inference_hidden4, [20, 1], 'inference_sigmoid', unit=tf.sigmoid)

    # Binary cross-entropy; epsilon keeps log() away from 0.
    epsilon = 1e-5
    inference_loss = -tf.reduce_sum(y * tf.log(inference_activation + epsilon) +
                                    (1.0 - y) * tf.log(1 - inference_activation + epsilon))

    inference_optimizer = tf.train.AdamOptimizer()
    inference_global_step = tf.Variable(0, name='inference_global_step', trainable=False)
    inference_minimize = inference_optimizer.minimize(inference_loss, global_step=inference_global_step)

    # Initialise variables and open a session that grows GPU memory on demand.
    init = tf.global_variables_initializer()
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    session = tf.Session(config=config)
    session.run(init)
    tmp_state = State(x, y, inference_activation, inference_loss, inference_minimize, session)

    # All steps below are needed in state for training but not needed for mva_expert

    # defining queue, enqueue and dequeue-operation
    tmp_state.queue = tf.RandomShuffleQueue(int(param['capacity']), int(param['min_after_dequeue']), [tf.float32, tf.float32],
                                            shapes=[[number_of_features], [1]])
    tmp_state.enqueue = tmp_state.queue.enqueue_many([x, y])
    tmp_state.dequeue = tmp_state.queue.dequeue_many(int(param['batch_size']))

    # defining thread for training; started later by begin_fit()
    global COORD
    COORD = tf.train.Coordinator()
    tmp_state.thread = threading.Thread(target=execute_train_op, args=(tmp_state,))

    return tmp_state
113 
114 
def begin_fit(state, Xtest, Stest, ytest, wtest):
    """
    Launch the asynchronous training thread created in get_model().
    The test data arguments are accepted for interface compatibility
    but are not used here.
    """
    trainer = state.thread
    trainer.start()

    return state
122 
123 
def partial_fit(state, X, S, y, w, epoch):
    """
    Feed one chunk of training data into the shuffle queue.

    Also (re-)installs the SIGINT handler so Ctrl-C stops training safely.
    Returns the CONTINUE_ENQUEUE flag; the framework stops calling this
    once it turns False.
    """
    signal.signal(signal.SIGINT, signal_handler)

    # Targets must be a column vector to match the queue component shape.
    labels = np.reshape(y, (len(y), 1))
    state.session.run(state.enqueue, feed_dict={state.x: X, state.y: labels})

    print("Queue Epoch: %d, Queue Size: %d" % (epoch, state.session.run(state.queue.size())))

    return CONTINUE_ENQUEUE
135 
136 
if __name__ == "__main__":
    from basf2 import conditions
    # NOTE: do not use testing payloads in production! Any results obtained like this WILL NOT BE PUBLISHED
    conditions.testing_payloads = [
        'localdb/database.txt'
    ]

    import basf2_mva
    import json

    # Input data: ROOT file and tree used for training.
    general_options = basf2_mva.GeneralOptions()
    general_options.m_datafiles = basf2_mva.vector("train.root")
    general_options.m_treename = "tree"
    # Candidate/daughter kinematics, vertex and PID quantities used as inputs.
    variables = ['p', 'pt', 'pz',
                 'daughter(0, p)', 'daughter(0, pz)', 'daughter(0, pt)',
                 'daughter(1, p)', 'daughter(1, pz)', 'daughter(1, pt)',
                 'daughter(2, p)', 'daughter(2, pz)', 'daughter(2, pt)',
                 'chiProb', 'dr', 'dz',
                 'daughter(0, dr)', 'daughter(1, dr)',
                 'daughter(0, dz)', 'daughter(1, dz)',
                 'daughter(0, chiProb)', 'daughter(1, chiProb)', 'daughter(2, chiProb)',
                 'daughter(0, kaonID)', 'daughter(0, pionID)',
                 'daughterInvariantMass(0, 1)', 'daughterInvariantMass(0, 2)', 'daughterInvariantMass(1, 2)']

    general_options.m_variables = basf2_mva.vector(*variables)
    general_options.m_spectators = basf2_mva.vector('M')
    general_options.m_target_variable = "isSignal"

    # Python-side MVA options; this module itself is the steering file.
    specific_options = basf2_mva.PythonOptions()
    specific_options.m_framework = "tensorflow"
    specific_options.m_steering_file = 'mva/examples/tensorflow/multithreaded.py'
    specific_options.m_nIterations = 400
    specific_options.m_mini_batch_size = 100000

    general_options.m_identifier = "tensorflow_multithreaded"
    # Queue parameters forwarded to get_model() as a JSON config string.
    specific_options.m_config = json.dumps({'capacity': 2e3, 'min_after_dequeue': 600, 'batch_size': 500})
    basf2_mva.teacher(general_options, specific_options)