#!/usr/bin/env python3

# distributed.py - Belle II Software release-08-01-10

"""
 This script can be used to train the FEI on a cluster such as the one available at KEKCC.
 All you need is a basf2 steering file (see analysis/examples/FEI/ ) and some MC, O(100) million events.
 The script will automatically create some directories
 - collection containing weight files, monitoring files and other outputs
 - jobs containing temporary files during the training (can be deleted afterwards)

 The distributed script automatically spawns jobs on the cluster (or local machine)
 and runs the steering file on the provided MC.
 Since a FEI training requires multiple runs over the same MC, it does so multiple times.
 The output of a run is passed as input to the next run (so your script has to use RootInput and RootOutput).

 In between it calls the do_trainings function of the FEI to train the multivariate classifiers
 at each stage.

 At the end it produces summary outputs using printReporting.py and latexReporting.py
 (this will only work if you use the monitoring mode).
 In addition, a summary file for each mva training is produced using basf2_mva_evaluate.

 If your training fails for some reason (e.g. a job fails on the cluster),
 the FEI will stop; you can fix the problem and resume the training using the -x option.
 This requires some expert knowledge, because you have to know how to fix the problem that occurred
 and at which step you have to resume the training.

 After the training the weight files will be stored in the localdb in the collection directory.
 You have to upload this local database to the Belle II Conditions Database if you want to use the FEI everywhere.
 Alternatively you can just copy the localdb somewhere and use it directly.

 Example:
 python3 ~/release/analysis/scripts/fei/distributed.py
   -s kekcc2
   -f ~/release/analysis/examples/FEI/B_generic.py
   -w /home/belle2/tkeck/group/B2TauNuWorkspace_2/new_fei
   -n 100
   -d $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000786/s00/e0000/4S/r00000/mixed/sub01/*.root
        | head -n 50)
      $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000788/s00/e0000/4S/r00000/charged/sub01/*.root
        | head -n 50)
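
 The steering file passed via -f must build its path with fei.get_path() and read/write the
 cached DataStore with RootInput/RootOutput. A minimal sketch (illustrative only; the prefix,
 channel list and configuration flags are assumptions, see analysis/examples/FEI/B_generic.py
 for a maintained example):

     import basf2 as b2
     import fei

     particles = fei.get_default_channels()
     configuration = fei.config.FeiConfiguration(prefix='FEI_TEST', training=True, monitor=True)
     feistate = fei.get_path(particles, configuration)

     path = b2.create_path()
     path.add_module('RootInput')
     path.add_path(feistate.path)
     path.add_module('RootOutput')
     b2.process(path)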
"""


import subprocess
import sys
import os
import argparse
import glob
import time
import stat
import shutil
import pickle
import json
import b2biiConversion
import fei


def getCommandLineOptions():
    """ Parses the command line options of the FEI and returns the corresponding arguments. """
    # The FEI defines its own command line options, therefore we disable
    # the ROOT command line options, which otherwise interfere sometimes.
    # Always avoid the top-level 'import ROOT'.
    import ROOT  # noqa
    ROOT.PyConfig.IgnoreCommandLineOptions = True
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--steeringFile', dest='steering', type=str, required=True,
                        help='Steering file. Calls fei.get_path()')
    parser.add_argument('-w', '--workingDirectory', dest='directory', type=str, required=True,
                        help='Working directory for basf2 jobs. On KEKCC, this must NOT be on HSM!')
    parser.add_argument('-l', '--largeDirectory', dest='large_dir', type=str, default='',
                        help='Directory to store large files')
    parser.add_argument('-n', '--nJobs', dest='nJobs', type=int, default=100,
                        help='Number of jobs')
    parser.add_argument('-d', '--data', dest='data', type=str, required=True, action='append', nargs='+',
                        help='Data files in bash expansion syntax or as process_url')
    parser.add_argument('-x', '--skip-to', dest='skip', type=str, default='',
                        help='Skip setup of directories and resume the training at the given step')
    parser.add_argument('-o', '--once', dest='once', action='store_true',
                        help='Execute just one time, instead of waiting until a Summary is produced.')
    parser.add_argument('-s', '--site', dest='site', type=str, default='kekcc',
                        help='Site to use [kekcc|kekcc2|kitekp|local]')
    args = parser.parse_args()
    return args
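
# Note on the -d option (an illustration added here, not produced by the script itself):
# because '-d' is declared with action='append' and nargs='+', argparse collects the files
# into a list of lists, for example
#
#     -d a.root b.root -d 'mc/*.root'   ->   args.data == [['a.root', 'b.root'], ['mc/*.root']]
#
# which is why setup() below flattens it with two nested loops before globbing each entry.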


def get_job_script(args, i):
    """
    Create a bash file which will be dispatched to the batch system.
    The file will run basf2 on the provided MC or the previous output
    using the provided steering file.
    """
    job_script = f"""
        if [ -f "{args.directory}/jobs/{i}/basf2_input.root" ]; then
          INPUT="{args.directory}/jobs/{i}/basf2_input.root"
        else
          INPUT="{args.directory}/jobs/{i}/input_*.root"
        fi
        time basf2 -l error {args.directory}/collection/basf2_steering_file.py -i "$INPUT" \
        -o {args.directory}/jobs/{i}/basf2_output.root &> my_output_hack.log || touch basf2_job_error
        touch basf2_finished_successfully
    """
    return job_script
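
# For illustration (with a hypothetical working directory /path/to/workdir and job 0), the
# generated jobs/0/basf2_script.sh would read roughly:
#
#     if [ -f "/path/to/workdir/jobs/0/basf2_input.root" ]; then
#       INPUT="/path/to/workdir/jobs/0/basf2_input.root"
#     else
#       INPUT="/path/to/workdir/jobs/0/input_*.root"
#     fi
#     time basf2 -l error /path/to/workdir/collection/basf2_steering_file.py -i "$INPUT" \
#     -o /path/to/workdir/jobs/0/basf2_output.root &> my_output_hack.log || touch basf2_job_error
#     touch basf2_finished_successfully
#
# i.e. the first iteration runs over the symlinked input_*.root MC files, later iterations over
# the basf2_output.root of the previous iteration, renamed to basf2_input.root by update_input_files().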


def setup(args):
    """
    Set up all directories, create job_scripts, split up the MC into chunks
    which are processed by each job. Create symlinks for databases.
    """
    os.chdir(args.directory)
    # Search and partition data files into even chunks
    data_files = []

    for x in args.data:
        for y in x:
            if (y.startswith("http://") or y.startswith("https://")):
                data_files += b2biiConversion.parse_process_url(y)
            else:
                data_files += glob.glob(y)
    print(f'Found {len(data_files)} MC files')
    file_sizes = []
    for file in data_files:
        file_sizes.append(os.stat(file).st_size)
    data_files_sorted = [x for _, x in sorted(zip(file_sizes, data_files))]
    n = int(len(data_files) / args.nJobs)
    if n < 1:
        raise RuntimeError(f'Too few MC files {len(data_files)} for the given number of jobs {args.nJobs}')
    data_chunks = [data_files_sorted[i::args.nJobs] for i in range(args.nJobs)]

    # Create needed directories
    print(f'Create environment in {args.directory}')
    shutil.rmtree('collection', ignore_errors=True)
    shutil.rmtree('jobs', ignore_errors=True)
    os.mkdir('collection')
    os.mkdir('collection/localdb')
    os.mkdir('jobs')
    if args.large_dir:
        if not os.path.isdir(args.large_dir):
            raise RuntimeError('Large dir does not exist. Please make sure it does.')

    shutil.copyfile(args.steering, 'collection/basf2_steering_file.py')

    for i in range(args.nJobs):
        # Create job directory
        os.mkdir(f'jobs/{i}')
        with open(f'jobs/{i}/basf2_script.sh', 'w') as f:
            f.write(get_job_script(args, i))
            os.chmod(f.fileno(), stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
        # Symlink initial input data files
        for j, data_input in enumerate(data_chunks[i]):
            os.symlink(data_input, f'jobs/{i}/input_{j}.root')
        # Symlink weight directory and basf2_path
        os.symlink(args.directory + '/collection/localdb', f'jobs/{i}/localdb')
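
# Worked example of the chunking above (illustrative numbers, not taken from a real training):
# with six files sorted by size [f1, f2, f3, f4, f5, f6] and nJobs = 3, the slices
# data_files_sorted[i::3] yield
#
#     job 0: [f1, f4]    job 1: [f2, f5]    job 2: [f3, f6]
#
# so the size-sorted files are dealt out round-robin and every job receives a roughly
# balanced total input size.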


def create_report(args):
    """
    Dumps Summary.pickle to JSON for easy inspection.
    Creates all the reports for the FEI training and the individual mva trainings.
    This will only work if
      1) the monitoring mode is used (see FeiConfiguration) and
      2) the system has enough memory to hold the training data for the mva classifiers.
    If this fails you can also copy the collection directory somewhere and
    execute the commands by hand.
    """
    os.chdir(args.directory + '/collection')
    with open('Summary.pickle', 'rb') as file:
        summary = pickle.load(file)

    summary_dict = {particle.identifier:
                    {'mvaConfig': particle.mvaConfig._asdict(),
                     'channels': [{field: (value._asdict() if field in ['mvaConfig', 'preCutConfig'] else value) for
                                   field, value in channel._asdict().items()} for channel in particle.channels],
                     'preCutConfig': particle.preCutConfig._asdict(),
                     'postCutConfig': particle.postCutConfig._asdict()}
                    for particle in summary[0]}
    summary_dict.update({'feiConfig': summary[1]._asdict()})

    with open('Summary.json', 'w') as summary_json_file:
        json.dump(summary_dict, summary_json_file, indent=4)

    ret = subprocess.call('basf2 fei/printReporting.py > ../summary.txt', shell=True)
    ret = subprocess.call('basf2 fei/latexReporting.py ../summary.tex', shell=True)
    for i in glob.glob("*.xml"):
        if not fei.core.Teacher.check_if_weightfile_is_fake(i):
            subprocess.call(f"basf2_mva_evaluate.py -id '{i[:-4]}.xml' -data 'training_input.root' "
                            f"--treename '{i[:-4]} variables' -o '../{i[:-4]}.zip'", shell=True)
    os.chdir(args.directory)
    # Note: only the return code of the latexReporting call is propagated to the caller
    return ret == 0


def submit_job(args, i):
    """
    Submits a job to the desired batch system.
    Currently we can run on KEKCC (long queue), KEKCC (dedicated FEI queue),
    EKP @ KIT, or your local machine.
    """
    # Synchronize summaries
    if os.path.isfile(args.directory + '/collection/Summary.pickle'):
        shutil.copyfile(args.directory + '/collection/Summary.pickle', args.directory + f'/jobs/{i}/Summary.pickle')
    os.chdir(args.directory + f'/jobs/{i}/')
    if args.site == 'kekcc':
        ret = subprocess.call("bsub -q l -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'kekcc2':
        ret = subprocess.call("bsub -q b2_fei -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'kitekp':
        ret = subprocess.call("qsub -cwd -q express,short,medium,long -e error.log -o output.log -V basf2_script.sh | cut -f 3 -d ' ' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'local':
        subprocess.Popen(['bash', './basf2_script.sh'])
        ret = 0
    else:
        raise RuntimeError(f'Given site {args.site} is not supported')
    os.chdir(args.directory)
    return ret == 0
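
# Sketch of the job-id extraction above (assuming the usual LSF reply format, which this
# script does not verify): bsub answers with a line like
#
#     Job <12345> is submitted to queue <l>.
#
# so "cut -f 2 -d ' '" selects the second field "<12345>" and the two sed calls strip the
# angle brackets, leaving just "12345" in basf2_jobid.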


def do_trainings(args):
    """
    Trains the multivariate classifiers for all training data available in
    the collection directory which has not been trained on yet.
    This is called once per iteration.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    weightfiles = fei.do_trainings(particles, configuration)
    for i in range(args.nJobs):
        for weightfile_on_disk, _ in weightfiles:
            os.symlink(args.directory + '/collection/' + weightfile_on_disk,
                       args.directory + f'/jobs/{i}/' + weightfile_on_disk)
    # Check if some xml files are missing
    xmlfiles = glob.glob("*.xml")
    for i in range(args.nJobs):
        for xmlfile in xmlfiles:
            if not os.path.isfile(args.directory + f'/jobs/{i}/' + xmlfile):
                print("Added missing symlink to ", xmlfile, " in job directory ", i)
                os.symlink(args.directory + '/collection/' + xmlfile,
                           args.directory + f'/jobs/{i}/' + xmlfile)
    os.chdir(args.directory)


def jobs_finished(args):
    """
    Check if all jobs have already finished.
    Throws a runtime error if it detects an error in one of the jobs.
    """
    finished = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    failed = glob.glob(args.directory + '/jobs/*/basf2_job_error')

    if len(failed) > 0:
        raise RuntimeError(f'basf2 execution failed! Error occurred in: {str(failed)}')

    return len(finished) == args.nJobs


def merge_root_files(args):
    """
    Merges all produced ROOT files of all jobs together
    and puts the merged ROOT files into the collection directory.
    This affects mostly
     - the training data for the multivariate classifiers
     - the monitoring files
    """
    rootfiles = []
    for f in glob.glob(args.directory + '/jobs/0/*.root'):
        f = os.path.basename(f)
        if f in ['basf2_input.root', 'basf2_output.root']:
            continue
        if f.startswith('input_'):
            continue
        # in case of training_input.root, append to the already existing file
        if os.path.isfile(args.directory + '/collection/' + f) and not f == 'training_input.root':
            continue
        rootfiles.append(f)
    if len(rootfiles) == 0:
        print('There are no root files to merge')
    else:
        print('Merge the following files', rootfiles)
        for f in rootfiles:
            output = args.directory + '/collection/' + f
            inputs = [args.directory + f'/jobs/{i}/' + f for i in range(args.nJobs)]
            ret = subprocess.call(['analysis-fei-mergefiles', output] + inputs)
            if ret != 0:
                raise RuntimeError('Error during merging root files')
            # Replace mcParticlesCount.root with the merged file in all job directories
            # so that the individual jobs calculate the correct mcCounts and sampling rates
            if f == 'mcParticlesCount.root':
                for i in inputs:
                    os.remove(i)
                    os.symlink(output, i)


def update_input_files(args):
    """
    Updates the input files.
    For the first iteration the input files are the MC provided by the user.
    After each training this function replaces the input with the output of the previous iteration.
    Effectively this caches the whole DataStore of basf2 between the iterations.
    """
    for i in range(args.nJobs):
        output_file = args.directory + '/jobs/' + str(i) + '/basf2_output.root'
        input_file = args.directory + '/jobs/' + str(i) + '/basf2_input.root'
        if args.large_dir:
            real_input_file = args.large_dir + '/basf2_input_' + str(i) + '.root'
            shutil.move(output_file, real_input_file)
            if os.path.isfile(input_file):
                os.remove(input_file)
            os.symlink(real_input_file, input_file)
        else:
            shutil.move(output_file, input_file)
    # Save the Summary.pickle of the first job to the collection directory
    # so we can keep track of the stage of the reconstruction the FEI is currently at.
    shutil.copyfile(args.directory + '/jobs/0/Summary.pickle', args.directory + '/collection/Summary.pickle')


def clean_job_directory(args):
    """
    Cleans the job directories for the next iteration,
    meaning we remove all log files and the success flags.
    """
    files = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    files += glob.glob(args.directory + '/jobs/*/error.log')
    files += glob.glob(args.directory + '/jobs/*/output.log')
    for f in files:
        os.remove(f)


def is_still_training(args):
    """
    Checks if the FEI training is still ongoing.
    The training is finished if the FEI reached stage 7.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return True
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    os.chdir(args.directory)
    return configuration.cache != 7


if __name__ == '__main__':
    args = getCommandLineOptions()

    os.chdir(args.directory)

    # If the user wants to resume an existing training,
    # we check at which step they want to resume and
    # try to perform the necessary steps to do so
    if args.skip:
        print('Skipping setup')
        start = 0
        if args.skip == 'clean':
            start = 1
        elif args.skip == 'update':
            start = 2
        elif args.skip == 'merge':
            start = 3
        elif args.skip == 'wait':
            start = 4
        elif args.skip == 'submit':
            start = 5
        elif args.skip == 'resubmit':
            start = 6
        elif args.skip == 'report':
            start = 7
        elif args.skip == 'run':
            start = 0
        else:
            raise RuntimeError(f'Unknown skip parameter {args.skip}')
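
        # Example of resuming (hypothetical paths, added for illustration): to resubmit only the
        # failed jobs of an existing training, run the script again with the same arguments plus
        # "-x resubmit", e.g.
        #
        #     python3 distributed.py -x resubmit -s kekcc2 -f B_generic.py -w /path/to/workdir -n 100 -d <MC files>
        #
        # 'run' re-enters the main loop directly, 'report' only regenerates the summary reports.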

        if start == 7:
            create_report(args)
            sys.exit(0)

        # The user wants to submit the jobs again
        if start >= 5:
            print('Submitting jobs')
            for i in range(args.nJobs):
                # The user wants to resubmit jobs, which means the training of some jobs failed.
                # We check which jobs contained an error flag or were not successful.
                # These jobs are submitted again, other jobs are skipped (continue)
                if start >= 6:
                    error_file = args.directory + f'/jobs/{i}/basf2_job_error'
                    success_file = args.directory + f'/jobs/{i}/basf2_finished_successfully'
                    if os.path.isfile(error_file) or not os.path.isfile(success_file):
                        print(f"Delete {error_file} and resubmit job")
                        if os.path.isfile(error_file):
                            os.remove(error_file)
                        if os.path.isfile(success_file):
                            os.remove(success_file)
                    else:
                        continue
                # Reset Summary file
                shutil.copyfile(os.path.join(args.directory, 'collection/Summary.pickle'),
                                os.path.join(args.directory, f'jobs/{i}/Summary.pickle'))
                if not submit_job(args, i):
                    raise RuntimeError('Error during submitting job')

        if start >= 4:
            print('Wait for jobs to end')
            while not jobs_finished(args):
                time.sleep(40)

        if start >= 3:
            print('Merge ROOT files')
            merge_root_files(args)

        if start >= 2:
            print('Update input files')
            update_input_files(args)

        if start >= 1:
            print('Clean job directory')
            clean_job_directory(args)

    else:
        # This is a new training,
        # so we have to set up the whole directory (this will override any existing training).
        setup(args)

    # The main loop, which steers the whole FEI training on a batch system:
    # 1. We check if the FEI still requires further steps
    # 2. We do all necessary trainings which we can perform at this point in time
    # 3. We submit new jobs which will use the new trainings to reconstruct the hierarchy further
    # 4. We wait until all jobs finished
    # 5. We merge the output of the jobs
    # 6. We update the inputs of the jobs (the input of the next stage is the output of the current stage)
    # 7. We clean the job directories so they can be used during the next stage again
    while is_still_training(args):
        print('Do available trainings')
        do_trainings(args)

        print('Submitting jobs')
        for i in range(args.nJobs):
            if not submit_job(args, i):
                raise RuntimeError('Error during submitting jobs')

        print('Wait for jobs to end')
        while not jobs_finished(args):
            time.sleep(40)

        print('Merge ROOT files')
        merge_root_files(args)

        print('Update input files')
        update_input_files(args)

        print('Clean job directory')
        clean_job_directory(args)

        if args.once:
            break
    else:
        # This else branch is executed if the loop was not exited via break.
        # This means the training finished and we can create our summary reports.
        create_report(args)