Belle II Software release-05-02-19
distributed.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Thomas Keck 2017

"""
 This script can be used to train the FEI on a cluster such as the one available at KEKCC.
 All you need is a basf2 steering file (see analysis/examples/FEI/ ) and some MC, O(100) million events.
 The script will automatically create some directories:
  - collection, containing weight files, monitoring files and other output
  - jobs, containing temporary files produced during the training (can be deleted afterwards)

 The distributed script automatically spawns jobs on the cluster (or the local machine)
 and runs the steering file on the provided MC.
 Since a FEI training requires multiple runs over the same MC, it does so multiple times.
 The output of a run is passed as input to the next run (so your script has to use RootInput and RootOutput).

 In between, it calls the do_trainings function of the FEI to train the multivariate classifiers
 at each stage.

 At the end it produces summary outputs using printReporting.py and latexReporting.py
 (this will only work if you use the monitoring mode).
 In addition, a summary file for each MVA training is produced using basf2_mva_evaluate.

 If your training fails for some reason (e.g. a job fails on the cluster),
 the FEI will stop; you can fix the problem and resume the training using the -x option.
 This requires some expert knowledge, because you have to know how to fix the problem
 that occurred and at which step you have to resume the training.

 After the training, the weight files are stored in the localdb in the collection directory.
 You have to upload this local database to the Belle II Conditions Database if you want to use the FEI everywhere.
 Alternatively, you can copy the localdb somewhere and use it directly.

 Example:
 python3 ~/release/analysis/scripts/fei/distributed.py
   -s kekcc2
   -f ~/release/analysis/examples/FEI/B_generic.py
   -w /home/belle2/tkeck/group/B2TauNuWorkspace_2/new_fei
   -n 100
   -d $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000786/s00/e0000/4S/r00000/mixed/sub01/*.root
        | head -n 50)
      $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000788/s00/e0000/4S/r00000/charged/sub01/*.root
        | head -n 50)
"""
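# Note: the valid values for the -x (--skip-to) option correspond to the resume points
# handled in the __main__ block below: run, clean, update, merge, wait, submit, resubmit, report.
# For example (with hypothetical paths), to resubmit only the failed jobs of an existing training:
#   python3 ~/release/analysis/scripts/fei/distributed.py -s kekcc2 -f ... -w ... -n 100 -d ... -x resubmit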


import subprocess
import sys
import os
import argparse
import glob
import time
import stat
import shutil
import pickle
import json
import b2biiConversion
import fei

def getCommandLineOptions():
    """ Parses the command line options of the fei and returns the corresponding arguments. """
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--steeringFile', dest='steering', type=str, required=True,
                        help='Steering file. Calls fei.get_path()')
    parser.add_argument('-w', '--workingDirectory', dest='directory', type=str, required=True,
                        help='Working directory for basf2 jobs. On KEKCC, this must NOT be on HSM!')
    parser.add_argument('-l', '--largeDirectory', dest='large_dir', type=str, default='',
                        help='Directory to store large files')
    parser.add_argument('-n', '--nJobs', dest='nJobs', type=int, default=100,
                        help='Number of jobs')
    parser.add_argument('-d', '--data', dest='data', type=str, required=True, action='append', nargs='+',
                        help='Data files in bash expansion syntax or as process_url')
    parser.add_argument('-x', '--skip-to', dest='skip', type=str, default='',
                        help='Skip setup of directories and resume the training at the given step')
    parser.add_argument('-o', '--once', dest='once', action='store_true',
                        help='Execute just once, instead of waiting until a Summary is produced.')
    parser.add_argument('-s', '--site', dest='site', type=str, default='kekcc',
                        help='Site to use [kekcc|kekcc2|kitekp|local]')
    args = parser.parse_args()
    return args


def get_job_script(args, i):
    """
    Create a bash file which will be dispatched to the batch system.
    The file will run basf2 on the provided MC or the previous output
    using the provided steering file.
    """
    job_script = f"""
        if [ -f "{args.directory}/jobs/{i}/basf2_input.root" ]; then
          INPUT="{args.directory}/jobs/{i}/basf2_input.root"
        else
          INPUT="{args.directory}/jobs/{i}/input_*.root"
        fi
        time basf2 -l error {args.directory}/collection/basf2_steering_file.py -i "$INPUT" \
             -o {args.directory}/jobs/{i}/basf2_output.root &> my_output_hack.log || touch basf2_job_error
        touch basf2_finished_successfully
    """
    return job_script
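# For illustration, the script generated for job 0 in a hypothetical working
# directory /path/to/fei would look roughly like this:
#
#   if [ -f "/path/to/fei/jobs/0/basf2_input.root" ]; then
#       INPUT="/path/to/fei/jobs/0/basf2_input.root"     # output of the previous stage
#   else
#       INPUT="/path/to/fei/jobs/0/input_*.root"         # initial MC symlinks created by setup()
#   fi
#   time basf2 -l error /path/to/fei/collection/basf2_steering_file.py -i "$INPUT" \
#        -o /path/to/fei/jobs/0/basf2_output.root &> my_output_hack.log || touch basf2_job_error
#   touch basf2_finished_successfully
#
# The flag files basf2_job_error and basf2_finished_successfully are what
# jobs_finished() below polls to decide whether all jobs are done.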


def setup(args):
    """
    Set up all directories, create the job scripts, and split the MC into chunks
    which are processed by the individual jobs. Create symlinks for the databases.
    """
    os.chdir(args.directory)
    # Search and partition the data files into even chunks
    data_files = []

    for x in args.data:
        for y in x:
            if (y.startswith("http://") or y.startswith("https://")):
                data_files += b2biiConversion.parse_process_url(y)
            else:
                data_files += glob.glob(y)
    print(f'Found {len(data_files)} MC files')
    file_sizes = []
    for file in data_files:
        file_sizes.append(os.stat(file).st_size)
    data_files_sorted = [x for _, x in sorted(zip(file_sizes, data_files))]
    n = int(len(data_files) / args.nJobs)
    if n < 1:
        raise RuntimeError(f'Too few MC files {len(data_files)} for the given number of jobs {args.nJobs}')
    data_chunks = [data_files_sorted[i::args.nJobs] for i in range(args.nJobs)]

    # Create the needed directories
    print(f'Create environment in {args.directory}')
    shutil.rmtree('collection', ignore_errors=True)
    shutil.rmtree('jobs', ignore_errors=True)
    os.mkdir('collection')
    os.mkdir('collection/localdb')
    os.mkdir('jobs')
    if args.large_dir:
        if not os.path.isdir(args.large_dir):
            raise RuntimeError('Large dir does not exist. Please make sure it does.')

    shutil.copyfile(args.steering, 'collection/basf2_steering_file.py')

    for i in range(args.nJobs):
        # Create the job directory
        os.mkdir(f'jobs/{i}')
        with open(f'jobs/{i}/basf2_script.sh', 'w') as f:
            f.write(get_job_script(args, i))
            os.chmod(f.fileno(), stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
        # Symlink the initial input data files
        for j, data_input in enumerate(data_chunks[i]):
            os.symlink(data_input, f'jobs/{i}/input_{j}.root')
        # Symlink the weight directory and the basf2_path
        os.symlink(args.directory + '/collection/localdb', f'jobs/{i}/localdb')
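# After setup() the working directory contains roughly the following layout
# (a sketch, assuming nJobs job directories):
#
#   collection/
#       basf2_steering_file.py
#       localdb/
#   jobs/
#       0/
#           basf2_script.sh
#           input_0.root -> <MC file from chunk 0>
#           ...
#           localdb -> <workingDirectory>/collection/localdb
#       1/
#       ...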


def create_report(args):
    """
    Dumps Summary.pickle to JSON for easy inspection.
    Creates all reports for the FEI training and the individual MVA trainings.
    This will only work if
      1) the monitoring mode is used (see FeiConfiguration), and
      2) the system has enough memory to hold the training data for the MVA classifiers.
    If this fails, you can also copy the collection directory somewhere and
    execute the commands by hand.
    """
    os.chdir(args.directory + '/collection')
    with open('Summary.pickle', 'rb') as file:
        summary = pickle.load(file)

    summary_dict = {particle.identifier:
                    {'mvaConfig': particle.mvaConfig._asdict(),
                     'channels': [{field: (value._asdict() if field in ['mvaConfig', 'preCutConfig'] else value) for
                                   field, value in channel._asdict().items()} for channel in particle.channels],
                     'preCutConfig': particle.preCutConfig._asdict(),
                     'postCutConfig': particle.postCutConfig._asdict()}
                    for particle in summary[0]}
    summary_dict.update({'feiConfig': summary[1]._asdict()})

    with open('Summary.json', 'w') as summary_json_file:
        json.dump(summary_dict, summary_json_file, indent=4)

    ret = subprocess.call('basf2 fei/printReporting.py > ../summary.txt', shell=True)
    ret = subprocess.call('basf2 fei/latexReporting.py ../summary.tex', shell=True)
    for i in glob.glob("*.xml"):
        if not fei.core.Teacher.check_if_weightfile_is_fake(i):
            subprocess.call(f"basf2_mva_evaluate.py -id '{i[:-4]}.xml' -data '{i[:-4]}.root' "
                            f"--treename variables -o '../{i[:-4]}.zip'", shell=True)
    os.chdir(args.directory)
    return ret == 0
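# If create_report() fails (e.g. because of memory limits), the same reports can be
# produced by hand from a copy of the collection directory, roughly:
#
#   basf2 fei/printReporting.py > summary.txt
#   basf2 fei/latexReporting.py summary.tex
#   basf2_mva_evaluate.py -id <name>.xml -data <name>.root --treename variables -o <name>.zip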


def submit_job(args, i):
    """
    Submits a job to the desired batch system.
    Currently we can run on KEKCC (long queue), KEKCC (dedicated FEI queue),
    EKP @ KIT, or the local machine.
    """
    # Synchronize the summaries
    if os.path.isfile(args.directory + '/collection/Summary.pickle'):
        shutil.copyfile(args.directory + '/collection/Summary.pickle', args.directory + f'/jobs/{i}/Summary.pickle')
    os.chdir(args.directory + f'/jobs/{i}/')
    # The cut/sed pipelines extract the numeric job id printed by the scheduler and store it in basf2_jobid
    if args.site == 'kekcc':
        ret = subprocess.call("bsub -q l -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'kekcc2':
        ret = subprocess.call("bsub -q b2_fei -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'kitekp':
        ret = subprocess.call("qsub -cwd -q express,short,medium,long -e error.log -o output.log -V basf2_script.sh | cut -f 3 -d ' ' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'local':
        subprocess.Popen(['bash', './basf2_script.sh'])
        ret = 0
    else:
        raise RuntimeError(f'Given site {args.site} is not supported')
    os.chdir(args.directory)
    return ret == 0


def do_trainings(args):
    """
    Trains the multivariate classifiers for all training data available in
    the collection directory that has not been trained yet.
    This is called once per iteration.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    weightfiles = fei.do_trainings(particles, configuration)
    for i in range(args.nJobs):
        for weightfile_on_disk, _ in weightfiles:
            os.symlink(args.directory + '/collection/' + weightfile_on_disk,
                       args.directory + f'/jobs/{i}/' + weightfile_on_disk)
    # Check if some xml files are missing
    xmlfiles = glob.glob("*.xml")
    for i in range(args.nJobs):
        for xmlfile in xmlfiles:
            if not os.path.isfile(args.directory + f'/jobs/{i}/' + xmlfile):
                print("Added missing symlink to ", xmlfile, " in job directory ", i)
                os.symlink(args.directory + '/collection/' + xmlfile,
                           args.directory + f'/jobs/{i}/' + xmlfile)
    os.chdir(args.directory)


def jobs_finished(args):
    """
    Checks if all jobs have finished.
    Raises a RuntimeError if it detects an error in one of the jobs.
    """
    finished = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    failed = glob.glob(args.directory + '/jobs/*/basf2_job_error')

    if len(failed) > 0:
        raise RuntimeError(f'basf2 execution failed! Error occurred in: {str(failed)}')

    return len(finished) == args.nJobs


def merge_root_files(args):
    """
    Merges the ROOT files produced by all jobs
    and puts the merged ROOT files into the collection directory.
    This affects mostly
      - the training data for the multivariate classifiers
      - the monitoring files
    """
    rootfiles = []
    for f in glob.glob(args.directory + '/jobs/0/*.root'):
        f = os.path.basename(f)
        if f in ['basf2_input.root', 'basf2_output.root']:
            continue
        if f.startswith('input_'):
            continue
        if os.path.isfile(args.directory + '/collection/' + f):
            continue
        rootfiles.append(f)
    if len(rootfiles) == 0:
        print('There are no root files to merge')
    else:
        print('Merge the following files', rootfiles)
        for f in rootfiles:
            output = args.directory + '/collection/' + f
            inputs = [args.directory + f'/jobs/{i}/' + f for i in range(args.nJobs)]
            ret = subprocess.call(['fei_merge_files', output] + inputs)
            if ret != 0:
                raise RuntimeError('Error during merging root files')
            # Replace mcParticlesCount.root with the merged file in all job directories,
            # so that the individual jobs calculate the correct mcCounts and sampling rates
            if f == 'mcParticlesCount.root':
                for i in inputs:
                    os.remove(i)
                    os.symlink(output, i)


def update_input_files(args):
    """
    Updates the input files.
    For the first iteration the input files are the MC provided by the user.
    After each training, this function replaces the input with the output of the previous iteration.
    Effectively this caches the whole DataStore of basf2 between the iterations.
    """
    for i in range(args.nJobs):
        output_file = args.directory + '/jobs/' + str(i) + '/basf2_output.root'
        input_file = args.directory + '/jobs/' + str(i) + '/basf2_input.root'
        if args.large_dir:
            real_input_file = args.large_dir + '/basf2_input_' + str(i) + '.root'
            shutil.move(output_file, real_input_file)
            if os.path.isfile(input_file):
                os.remove(input_file)
            os.symlink(real_input_file, input_file)
        else:
            shutil.move(output_file, input_file)
    # Save the Summary.pickle of the first job to the collection directory,
    # so we can keep track of the reconstruction stage the FEI is currently in.
    shutil.copyfile(args.directory + '/jobs/0/Summary.pickle', args.directory + '/collection/Summary.pickle')
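# Between iterations the data flow for job i is roughly:
#
#   jobs/i/basf2_output.root  --(moved)-->  jobs/i/basf2_input.root
#
# or, if --largeDirectory is given:
#
#   jobs/i/basf2_output.root  --(moved)-->  <large_dir>/basf2_input_i.root  <--(symlinked as)--  jobs/i/basf2_input.root
#
# so the job script of the next stage picks up basf2_input.root instead of the original MC.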


def clean_job_directory(args):
    """
    Cleans the job directories for the next iteration,
    i.e. removes the success flags and all logs.
    """
    files = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    files += glob.glob(args.directory + '/jobs/*/error.log')
    files += glob.glob(args.directory + '/jobs/*/output.log')
    for f in files:
        os.remove(f)


def is_still_training(args):
    """
    Checks if the FEI training is still ongoing.
    The training is finished once the FEI has reached stage 7.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return True
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    os.chdir(args.directory)
    return configuration.cache != 7


if __name__ == '__main__':
    args = getCommandLineOptions()

    os.chdir(args.directory)

    # If the user wants to resume an existing training,
    # we check at which step to resume and
    # try to perform the necessary steps to do so
    if args.skip:
        print('Skipping setup')
        start = 0
        if args.skip == 'clean':
            start = 1
        elif args.skip == 'update':
            start = 2
        elif args.skip == 'merge':
            start = 3
        elif args.skip == 'wait':
            start = 4
        elif args.skip == 'submit':
            start = 5
        elif args.skip == 'resubmit':
            start = 6
        elif args.skip == 'report':
            start = 7
        elif args.skip == 'run':
            start = 0
        else:
            raise RuntimeError(f'Unknown skip parameter {args.skip}')

        if start == 7:
            create_report(args)
            sys.exit(0)

        # The user wants to submit the jobs again
        if start >= 5:
            print('Submitting jobs')
            for i in range(args.nJobs):
                # The user wants to resubmit jobs because the training of some jobs failed.
                # We check which jobs contain an error flag or were not successful;
                # these jobs are submitted again, all other jobs are skipped (continue)
                if start >= 6:
                    error_file = args.directory + f'/jobs/{i}/basf2_job_error'
                    success_file = args.directory + f'/jobs/{i}/basf2_finished_successfully'
                    if os.path.isfile(error_file) or not os.path.isfile(success_file):
                        print(f"Delete {error_file} and resubmit job")
                        if os.path.isfile(error_file):
                            os.remove(error_file)
                        if os.path.isfile(success_file):
                            os.remove(success_file)
                    else:
                        continue
                # Reset the Summary file
                shutil.copyfile(os.path.join(args.directory, 'collection/Summary.pickle'),
                                os.path.join(args.directory, f'jobs/{i}/Summary.pickle'))
                if not submit_job(args, i):
                    raise RuntimeError('Error during submitting job')

        if start >= 4:
            print('Wait for jobs to end')
            while not jobs_finished(args):
                time.sleep(40)

        if start >= 3:
            print('Merge ROOT files')
            merge_root_files(args)

        if start >= 2:
            print('Update input files')
            update_input_files(args)

        if start >= 1:
            print('Clean job directory')
            clean_job_directory(args)

    else:
        # This is a new training,
        # so we have to set up the whole directory (this will override any existing training)
        setup(args)

    # The main loop, which steers the whole FEI training on a batch system
    # 1. We check if the FEI still requires further steps
    # 2. We do all necessary trainings which we can perform at this point in time
    # 3. We submit new jobs which will use the new trainings to reconstruct the hierarchy further
    # 4. We wait until all jobs are finished
    # 5. We merge the output of the jobs
    # 6. We update the inputs of the jobs (the input of the next stage is the output of the current stage)
    # 7. We clean the job directories so they can be used again during the next stage
    while is_still_training(args):
        print('Do available trainings')
        do_trainings(args)

        print('Submitting jobs')
        for i in range(args.nJobs):
            if not submit_job(args, i):
                raise RuntimeError('Error during submitting jobs')

        print('Wait for jobs to end')
        while not jobs_finished(args):
            time.sleep(40)

        print('Merge ROOT files')
        merge_root_files(args)

        print('Update input files')
        update_input_files(args)

        print('Clean job directory')
        clean_job_directory(args)

        if args.once:
            break
    else:
        # This else branch is executed if the loop was not exited via break,
        # which means the training finished and we can create our summary reports.
        create_report(args)