import signal
import threading

import numpy as np
import tensorflow as tf
# Returned by partial_fit; signal_handler flips it to False on SIGINT
# (presumably tells the calling framework to stop feeding data — confirm).
CONTINUE_ENQUEUE = True

# tf.train.Coordinator shared between the main thread and the training thread
# (execute_train_op polls it); assigned in get_model, None until a model is built.
COORD = None
17 def signal_handler(signal, frame):
19 Used to safely stopp training. This can only be used in main thread,
20 so depending on the training style, it can take a while before execution.
22 print(
'Stopping Training safely')
24 global CONTINUE_ENQUEUE
26 CONTINUE_ENQUEUE =
False
def execute_train_op(state):
    """
    Do the actual training multithreaded.

    Intended as the target of the background thread created in get_model. Each step
    dequeues one batch from ``state.dequeue``, runs ``state.optimizer`` on it and, every
    100 steps (starting at step 0), prints the training cost of that batch.

    The loop ends when the global ``COORD`` coordinator requests a stop, or when the
    queue is closed and cannot supply another batch (``tf.errors.OutOfRangeError``);
    in the latter case the coordinator is asked to stop as well.

    :param state: object providing ``session``, ``dequeue``, ``x``, ``y``,
        ``optimizer`` and ``cost``
    """
    epoch = 0
    try:
        while not COORD.should_stop():
            # The queue has exactly two components (features, targets).
            batch_x, batch_y = state.session.run(state.dequeue)
            feed_dict = {state.x: batch_x, state.y: batch_y}

            state.session.run(state.optimizer, feed_dict=feed_dict)

            if epoch % 100 == 0:
                train_eva = state.session.run(state.cost, feed_dict=feed_dict)
                print('Step %d: Train cost = %.4f' % (epoch, train_eva))

            epoch += 1

    except tf.errors.OutOfRangeError:
        print('No more items in closed queue, end of training.')
        COORD.request_stop()
def get_model(number_of_features, number_of_spectators, number_of_events, training_fraction, parameters):
    """
    Build the tensorflow graph, handle the parameters and initialise the variables.

    The network is fully connected: four hidden sigmoid layers of width 20 and a single
    sigmoid output unit. It is trained with Adam on the summed binary cross-entropy.
    Training rows are buffered in a ``tf.RandomShuffleQueue`` (filled through
    ``state.enqueue``, drained through ``state.dequeue``). The training thread is
    created here and runs ``execute_train_op``, but it is not started.

    :param number_of_features: width of the input placeholder
    :param number_of_spectators: unused here
    :param number_of_events: unused here
    :param training_fraction: unused here
    :param parameters: optional dict overriding ``capacity``, ``min_after_dequeue``
        and ``batch_size`` of the queue
    :return: State with placeholders, ops, session, queue ops and the unstarted thread
    """
    global COORD

    param = {'capacity': 1e6, 'min_after_dequeue': 800, 'batch_size': 500}
    if isinstance(parameters, dict):
        param.update(parameters)

    x = tf.placeholder("float", [None, number_of_features])
    y = tf.placeholder("float", [None, 1])

    def layer(inputs, shape, name, unit=tf.sigmoid):
        """Fully connected layer ``unit(inputs @ weights + biases)``; weights have ``shape``."""
        with tf.name_scope(name):
            # Initial weight spread scaled by 1/sqrt(fan_in), fan_in = shape[0].
            weights = tf.Variable(tf.truncated_normal(shape, stddev=1.0 / np.sqrt(float(shape[0]))),
                                  name='weights')
            biases = tf.Variable(tf.constant(0.0, shape=[shape[1]]), name='biases')
            activation = unit(tf.matmul(inputs, weights) + biases)
        return activation

    inference_hidden1 = layer(x, [number_of_features, 20], 'inference_hidden1')
    inference_hidden2 = layer(inference_hidden1, [20, 20], 'inference_hidden2')
    inference_hidden3 = layer(inference_hidden2, [20, 20], 'inference_hidden3')
    inference_hidden4 = layer(inference_hidden3, [20, 20], 'inference_hidden4')
    inference_activation = layer(inference_hidden4, [20, 1], 'inference_sigmoid', unit=tf.sigmoid)

    # Keeps tf.log finite when the sigmoid output saturates at 0 or 1.
    epsilon = 1e-5
    inference_loss = -tf.reduce_sum(y * tf.log(inference_activation + epsilon) +
                                    (1.0 - y) * tf.log(1 - inference_activation + epsilon))

    inference_optimizer = tf.train.AdamOptimizer()
    inference_global_step = tf.Variable(0, name='inference_global_step', trainable=False)
    inference_minimize = inference_optimizer.minimize(inference_loss, global_step=inference_global_step)

    # Created after minimize() so Adam's slot variables are initialised too.
    init = tf.global_variables_initializer()
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    session = tf.Session(config=config)
    session.run(init)

    # NOTE(review): State is not imported in the visible source — presumably
    # basf2_mva_python_interface.tensorflow.State; confirm the import exists.
    tmp_state = State(x, y, inference_activation, inference_loss, inference_minimize, session)

    # Queue feeding the training thread: rows of (features, target) are enqueued in
    # bulk from the placeholders and dequeued in shuffled batches of batch_size.
    tmp_state.queue = tf.RandomShuffleQueue(int(param['capacity']), int(param['min_after_dequeue']),
                                            [tf.float32, tf.float32],
                                            shapes=[[number_of_features], [1]])
    tmp_state.enqueue = tmp_state.queue.enqueue_many([x, y])
    tmp_state.dequeue = tmp_state.queue.dequeue_many(int(param['batch_size']))

    # Module-level coordinator so execute_train_op and signal_handler can see it.
    COORD = tf.train.Coordinator()
    tmp_state.thread = threading.Thread(target=execute_train_op, args=(tmp_state,))

    return tmp_state
def begin_fit(state, Xtest, Stest, ytest, wtest):
    """
    Starting training op async.

    Starts the background training thread stored on ``state.thread``, so training
    runs concurrently with subsequent enqueueing. The test-data arguments
    (``Xtest``, ``Stest``, ``ytest``, ``wtest``) are not used.

    :return: the same ``state`` object
    """
    state.thread.start()
    return state
def partial_fit(state, X, S, y, w, epoch):
    """
    Put data in the queue.

    Installs ``signal_handler`` for SIGINT and enqueues the features ``X`` with the
    targets ``y``, which are reshaped into a single column. It then prints the epoch
    and the current queue size. The spectators ``S`` and weights ``w`` are not used.

    :return: the current value of the global ``CONTINUE_ENQUEUE`` flag
    """
    # Re-registered on every call. Handlers must be installed from the main thread
    # (see signal_handler) — presumably the thread that calls partial_fit.
    signal.signal(signal.SIGINT, signal_handler)

    targets_column = np.reshape(y, ((len(y), 1)))
    state.session.run(state.enqueue, feed_dict={state.x: X, state.y: targets_column})

    queue_size = state.session.run(state.queue.size())
    print("Queue Epoch: %d, Queue Size: %d" % (epoch, queue_size))

    return CONTINUE_ENQUEUE
if __name__ == "__main__":
    # Script-only imports; both names are required by the training setup below.
    import json

    import basf2_mva
    from basf2 import conditions

    # Conditions payloads are read from this local database file.
    conditions.testing_payloads = [
        'localdb/database.txt'
    ]

    general_options = basf2_mva.GeneralOptions()
    general_options.m_datafiles = basf2_mva.vector("train.root")
    general_options.m_treename = "tree"
    variables = ['p', 'pt', 'pz',
                 'daughter(0, p)', 'daughter(0, pz)', 'daughter(0, pt)',
                 'daughter(1, p)', 'daughter(1, pz)', 'daughter(1, pt)',
                 'daughter(2, p)', 'daughter(2, pz)', 'daughter(2, pt)',
                 'chiProb', 'dr', 'dz',
                 'daughter(0, dr)', 'daughter(1, dr)',
                 'daughter(0, dz)', 'daughter(1, dz)',
                 'daughter(0, chiProb)', 'daughter(1, chiProb)', 'daughter(2, chiProb)',
                 'daughter(0, kaonID)', 'daughter(0, pionID)',
                 'daughterInvariantMass(0, 1)', 'daughterInvariantMass(0, 2)', 'daughterInvariantMass(1, 2)']
    general_options.m_variables = basf2_mva.vector(*variables)
    general_options.m_spectators = basf2_mva.vector('M')
    general_options.m_target_variable = "isSignal"

    specific_options = basf2_mva.PythonOptions()
    specific_options.m_framework = "tensorflow"
    specific_options.m_steering_file = 'mva/examples/tensorflow/multithreaded.py'
    specific_options.m_nIterations = 400
    specific_options.m_mini_batch_size = 100000

    general_options.m_identifier = "tensorflow_multithreaded"
    # Queue settings passed to get_model as its `parameters` dict (overriding its defaults).
    specific_options.m_config = json.dumps({'capacity': 2e3, 'min_after_dequeue': 600, 'batch_size': 500})
    basf2_mva.teacher(general_options, specific_options)