Belle II Software release-06-02-00
distributed.py
#!/usr/bin/env python3


"""
 This script can be used to train the FEI on a cluster like the one available at KEKCC.
 All you need is a basf2 steering file (see analysis/examples/FEI/) and some MC, O(100) million events.
 The script will automatically create some directories:
  - collection, containing weight files, monitoring files and other outputs
  - jobs, containing temporary files during the training (can be deleted afterwards)

 The distributed script automatically spawns jobs on the cluster (or the local machine)
 and runs the steering file on the provided MC.
 Since a FEI training requires multiple runs over the same MC, it does so multiple times.
 The output of a run is passed as input to the next run (so your steering file has to use RootInput and RootOutput).

 In between it calls the do_trainings function of the FEI to train the multivariate classifiers of the FEI
 at each stage.

 At the end it produces summary outputs using printReporting.py and latexReporting.py
 (this will only work if you use the monitoring mode).
 In addition, a summary file for each mva training is produced using basf2_mva_evaluate.

 If your training fails for some reason (e.g. a job fails on the cluster),
 the FEI will stop; you can fix the problem and resume the training using the -x option.
 This requires some expert knowledge, because you have to know how to fix the problem that occurred
 and at which step you have to resume the training.

 After the training the weight files will be stored in the localdb in the collection directory.
 You have to upload this local database to the Belle II Condition Database if you want to use the FEI everywhere.
 Alternatively you can just copy the localdb somewhere and use it directly.

 Example:
 python3 ~/release/analysis/scripts/fei/distributed.py
   -s kekcc2
   -f ~/release/analysis/examples/FEI/B_generic.py
   -w /home/belle2/tkeck/group/B2TauNuWorkspace_2/new_fei
   -n 100
   -d $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000786/s00/e0000/4S/r00000/mixed/sub01/*.root
        | head -n 50)
      $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000788/s00/e0000/4S/r00000/charged/sub01/*.root
        | head -n 50)
"""
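# A possible resume invocation (a sketch, assuming the same arguments as the original
# call): pass one of the step names understood by -x, i.e. run, clean, update, merge,
# wait, submit, resubmit or report, for example
#   python3 ~/release/analysis/scripts/fei/distributed.py -s kekcc2 -f ... -w ... -n 100 -d ... -x resubmit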
import subprocess
import sys
import os
import argparse
import glob
import time
import stat
import shutil
import pickle
import json
import b2biiConversion
import fei


def getCommandLineOptions():
    """ Parses the command line options of the fei and returns the corresponding arguments. """
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--steeringFile', dest='steering', type=str, required=True,
                        help='Steering file. Calls fei.get_path()')
    parser.add_argument('-w', '--workingDirectory', dest='directory', type=str, required=True,
                        help='Working directory for basf2 jobs. On KEKCC, this must NOT be on HSM!')
    parser.add_argument('-l', '--largeDirectory', dest='large_dir', type=str, default='',
                        help='Directory to store large files')
    parser.add_argument('-n', '--nJobs', dest='nJobs', type=int, default=100,
                        help='Number of jobs')
    parser.add_argument('-d', '--data', dest='data', type=str, required=True, action='append', nargs='+',
                        help='Data files in bash expansion syntax or as process_url')
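    # Note: because of action='append' together with nargs='+', args.data ends up
    # as a list of lists; setup() below flattens it before globbing.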
    parser.add_argument('-x', '--skip-to', dest='skip', type=str, default='',
                        help='Skip setup of directories')
    parser.add_argument('-o', '--once', dest='once', action='store_true',
                        help='Execute just once, instead of waiting until a Summary is produced.')
    parser.add_argument('-s', '--site', dest='site', type=str, default='kekcc',
                        help='Site to use [kekcc|kitekp|local]')
    args = parser.parse_args()
    return args


def get_job_script(args, i):
    """
    Create a bash file which will be dispatched to the batch system.
    The file will run basf2 on the provided MC or the previous output
    using the provided steering file.
    """
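    # On the first iteration only the input_*.root symlinks created by setup() exist,
    # so the wildcard branch below is taken; from the second iteration on,
    # update_input_files() provides a single basf2_input.root per job.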
    job_script = f"""
        if [ -f "{args.directory}/jobs/{i}/basf2_input.root" ]; then
          INPUT="{args.directory}/jobs/{i}/basf2_input.root"
        else
          INPUT="{args.directory}/jobs/{i}/input_*.root"
        fi
        time basf2 -l error {args.directory}/collection/basf2_steering_file.py -i "$INPUT" \
             -o {args.directory}/jobs/{i}/basf2_output.root &> my_output_hack.log || touch basf2_job_error
        touch basf2_finished_successfully
    """
    return job_script


def setup(args):
    """
    Setup all directories, create job_scripts, split up MC into chunks
    which are processed by each job. Create symlinks for databases.
    """
    os.chdir(args.directory)
    # Search and partition data files into even chunks
    data_files = []

    for x in args.data:
        for y in x:
            if (y.startswith("http://") or y.startswith("https://")):
                data_files += b2biiConversion.parse_process_url(y)
            else:
                data_files += glob.glob(y)
    print(f'Found {len(data_files)} MC files')
    file_sizes = []
    for file in data_files:
        file_sizes.append(os.stat(file).st_size)
    data_files_sorted = [x for _, x in sorted(zip(file_sizes, data_files))]
    n = int(len(data_files) / args.nJobs)
    if n < 1:
        raise RuntimeError(f'Too few MC files {len(data_files)} for the given number of jobs {args.nJobs}')
    data_chunks = [data_files_sorted[i::args.nJobs] for i in range(args.nJobs)]
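    # The files are sorted by size and distributed round-robin across the jobs,
    # so every chunk gets a comparable mix of small and large files.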

    # Create needed directories
    print(f'Create environment in {args.directory}')
    shutil.rmtree('collection', ignore_errors=True)
    shutil.rmtree('jobs', ignore_errors=True)
    os.mkdir('collection')
    os.mkdir('collection/localdb')
    os.mkdir('jobs')
    if args.large_dir:
        if not os.path.isdir(args.large_dir):
            raise RuntimeError('Large dir does not exist. Please make sure it does.')

    shutil.copyfile(args.steering, 'collection/basf2_steering_file.py')

    for i in range(args.nJobs):
        # Create job directory
        os.mkdir(f'jobs/{i}')
        with open(f'jobs/{i}/basf2_script.sh', 'w') as f:
            f.write(get_job_script(args, i))
            os.chmod(f.fileno(), stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
        # Symlink initial input data files
        for j, data_input in enumerate(data_chunks[i]):
            os.symlink(data_input, f'jobs/{i}/input_{j}.root')
        # Symlink weight directory and basf2_path
        os.symlink(args.directory + '/collection/localdb', f'jobs/{i}/localdb')


def create_report(args):
    """
    Dumps Summary.pickle to JSON for easy inspection.
    Create all the reports for the FEI training and the individual mva trainings.
    This will only work if
      1) Monitoring mode is used (see FeiConfiguration)
      2) The system has enough memory to hold the training data for the mva classifiers
    If this fails you can also copy the collection directory somewhere and
    execute the commands by hand.
    """
    os.chdir(args.directory + '/collection')
    with open('Summary.pickle', 'rb') as file:
        summary = pickle.load(file)

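    # Convert the namedtuple-based configuration into plain nested dicts so that
    # it can be serialized as JSON.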
    summary_dict = {particle.identifier:
                    {'mvaConfig': particle.mvaConfig._asdict(),
                     'channels': [{field: (value._asdict() if field in ['mvaConfig', 'preCutConfig'] else value) for
                                   field, value in channel._asdict().items()} for channel in particle.channels],
                     'preCutConfig': particle.preCutConfig._asdict(),
                     'postCutConfig': particle.postCutConfig._asdict()}
                    for particle in summary[0]}
    summary_dict.update({'feiConfig': summary[1]._asdict()})

    with open('Summary.json', 'w') as summary_json_file:
        json.dump(summary_dict, summary_json_file, indent=4)

    ret = subprocess.call('basf2 fei/printReporting.py > ../summary.txt', shell=True)
    ret = subprocess.call('basf2 fei/latexReporting.py ../summary.tex', shell=True)
    for i in glob.glob("*.xml"):
        if not fei.core.Teacher.check_if_weightfile_is_fake(i):
            subprocess.call(f"basf2_mva_evaluate.py -id '{i[:-4]}.xml' -data 'training_input.root' "
                            f"--treename '{i[:-4]} variables' -o '../{i[:-4]}.zip'", shell=True)
    os.chdir(args.directory)
    return ret == 0


def submit_job(args, i):
    """
    Submits a job to the desired batch system.
    Currently we can run on KEKCC (long queue), KEKCC (dedicated FEI queue),
    EKP @ KIT, or your local machine.
    """
    # Synchronize summaries
    if os.path.isfile(args.directory + '/collection/Summary.pickle'):
        shutil.copyfile(args.directory + '/collection/Summary.pickle', args.directory + f'/jobs/{i}/Summary.pickle')
    os.chdir(args.directory + f'/jobs/{i}/')
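    # The shell pipelines below extract the batch job id from the bsub/qsub output
    # and store it in basf2_jobid for later inspection.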
    if args.site == 'kekcc':
        ret = subprocess.call("bsub -q l -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'kekcc2':
        ret = subprocess.call("bsub -q b2_fei -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'kitekp':
        ret = subprocess.call("qsub -cwd -q express,short,medium,long -e error.log -o output.log -V basf2_script.sh | cut -f 3 -d ' ' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'local':
        subprocess.Popen(['bash', './basf2_script.sh'])
        ret = 0
    else:
        raise RuntimeError(f'Given site {args.site} is not supported')
    os.chdir(args.directory)
    return ret == 0


def do_trainings(args):
    """
    Trains the multivariate classifiers for all available training data in
    the collection directory which has not been used for a training yet.
    This is called once per iteration.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    weightfiles = fei.do_trainings(particles, configuration)
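    # Make the freshly trained weight files visible inside every job directory,
    # so the next round of basf2 jobs can pick up the new classifiers.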
    for i in range(args.nJobs):
        for weightfile_on_disk, _ in weightfiles:
            os.symlink(args.directory + '/collection/' + weightfile_on_disk,
                       args.directory + f'/jobs/{i}/' + weightfile_on_disk)
    # Check if some xml files are missing
    xmlfiles = glob.glob("*.xml")
    for i in range(args.nJobs):
        for xmlfile in xmlfiles:
            if not os.path.isfile(args.directory + f'/jobs/{i}/' + xmlfile):
                print("Added missing symlink to ", xmlfile, " in job directory ", i)
                os.symlink(args.directory + '/collection/' + xmlfile,
                           args.directory + f'/jobs/{i}/' + xmlfile)
    os.chdir(args.directory)


def jobs_finished(args):
    """
    Check if all jobs have already finished.
    Throws a runtime error if it detects an error in one of the jobs.
    """
    finished = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    failed = glob.glob(args.directory + '/jobs/*/basf2_job_error')

    if len(failed) > 0:
        raise RuntimeError(f'basf2 execution failed! Error occurred in: {str(failed)}')

    return len(finished) == args.nJobs


def merge_root_files(args):
    """
    Merges all produced ROOT files of all jobs together
    and puts the merged ROOT files into the collection directory.
    This affects mostly
      - the training data for the multivariate classifiers
      - the monitoring files
    """
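    # The set of files to merge is determined from the output of job 0;
    # all jobs produce the same set of file names.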
    rootfiles = []
    for f in glob.glob(args.directory + '/jobs/0/*.root'):
        f = os.path.basename(f)
        if f in ['basf2_input.root', 'basf2_output.root']:
            continue
        if f.startswith('input_'):
            continue
        # in case of training_input.root, append to already existing file
        if os.path.isfile(args.directory + '/collection/' + f) and not f == 'training_input.root':
            continue
        rootfiles.append(f)
    if len(rootfiles) == 0:
        print('There are no root files to merge')
    else:
        print('Merge the following files', rootfiles)
        for f in rootfiles:
            output = args.directory + '/collection/' + f
            inputs = [args.directory + f'/jobs/{i}/' + f for i in range(args.nJobs)]
            ret = subprocess.call(['analysis-fei-mergefiles', output] + inputs)
            if ret != 0:
                raise RuntimeError('Error during merging root files')
            # Replace mcParticlesCount.root with merged file in all directories
            # so that the individual jobs calculate the correct mcCounts and sampling rates
            if f == 'mcParticlesCount.root':
                for i in inputs:
                    os.remove(i)
                    os.symlink(output, i)


def update_input_files(args):
    """
    Updates the input files.
    For the first iteration the input files are the MC provided by the user.
    After each training this function replaces the input with the output of the previous iteration.
    Effectively this caches the whole DataStore of basf2 between the iterations.
    """
    for i in range(args.nJobs):
        output_file = args.directory + '/jobs/' + str(i) + '/basf2_output.root'
        input_file = args.directory + '/jobs/' + str(i) + '/basf2_input.root'
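        # With --largeDirectory the cached DataStore files are moved to a separate
        # (typically larger) filesystem and only symlinked into the job directory.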
        if args.large_dir:
            real_input_file = args.large_dir + '/basf2_input_' + str(i) + '.root'
            shutil.move(output_file, real_input_file)
            if os.path.isfile(input_file):
                os.remove(input_file)
            os.symlink(real_input_file, input_file)
        else:
            shutil.move(output_file, input_file)
    # Save the Summary.pickle of the first job to the collection directory
    # so we can keep track of the reconstruction stage the FEI is currently at.
    shutil.copyfile(args.directory + '/jobs/0/Summary.pickle', args.directory + '/collection/Summary.pickle')


def clean_job_directory(args):
    """
    Cleans the job directory for the next iteration,
    meaning we remove all logs.
    """
    files = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    files += glob.glob(args.directory + '/jobs/*/error.log')
    files += glob.glob(args.directory + '/jobs/*/output.log')
    for f in files:
        os.remove(f)


def is_still_training(args):
    """
    Checks if the FEI training is still ongoing.
    The training is finished if the FEI reached stage 7.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return True
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
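    # configuration.cache tracks the FEI stage reached so far; once it reaches
    # stage 7 (the final stage) the training loop below stops.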
    os.chdir(args.directory)
    return configuration.cache != 7


if __name__ == '__main__':
    args = getCommandLineOptions()

    os.chdir(args.directory)

    # If the user wants to resume an existing training
    # we check at which step they want to resume and
    # try to perform the necessary steps to do so
    if args.skip:
        print('Skipping setup')
        start = 0
        if args.skip == 'clean':
            start = 1
        elif args.skip == 'update':
            start = 2
        elif args.skip == 'merge':
            start = 3
        elif args.skip == 'wait':
            start = 4
        elif args.skip == 'submit':
            start = 5
        elif args.skip == 'resubmit':
            start = 6
        elif args.skip == 'report':
            start = 7
        elif args.skip == 'run':
            start = 0
        else:
            raise RuntimeError(f'Unknown skip parameter {args.skip}')

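        # Each value resumes at the corresponding step and then performs the
        # remaining steps of the current iteration before entering the main loop:
        # 7 only creates the report and exits, 6 resubmits failed jobs, 5 submits
        # all jobs again, 4 waits for running jobs, 3 merges the ROOT files,
        # 2 updates the input files, 1 cleans the job directories, 0 goes straight
        # to the main loop.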
        if start == 7:
            create_report(args)
            sys.exit(0)

        # The user wants to submit the jobs again
        if start >= 5:
            print('Submitting jobs')
            for i in range(args.nJobs):
                # The user wants to resubmit jobs, which means the training of some jobs failed.
                # We check which jobs contain an error flag or were not successful.
                # These jobs are submitted again, other jobs are skipped (continue)
                if start >= 6:
                    error_file = args.directory + f'/jobs/{i}/basf2_job_error'
                    success_file = args.directory + f'/jobs/{i}/basf2_finished_successfully'
                    if os.path.isfile(error_file) or not os.path.isfile(success_file):
                        print(f"Delete {error_file} and resubmit job")
                        if os.path.isfile(error_file):
                            os.remove(error_file)
                        if os.path.isfile(success_file):
                            os.remove(success_file)
                    else:
                        continue
                # Reset Summary file
                shutil.copyfile(os.path.join(args.directory, 'collection/Summary.pickle'),
                                os.path.join(args.directory, f'jobs/{i}/Summary.pickle'))
                if not submit_job(args, i):
                    raise RuntimeError('Error during submitting job')

        if start >= 4:
            print('Wait for jobs to end')
            while not jobs_finished(args):
                time.sleep(40)

        if start >= 3:
            print('Merge ROOT files')
            merge_root_files(args)

        if start >= 2:
            print('Update input files')
            update_input_files(args)

        if start >= 1:
            print('Clean job directory')
            clean_job_directory(args)

    else:
        # This is a new training,
        # so we have to set up the whole directory (this will override any existing training).
        setup(args)

    # The main loop, which steers the whole FEI training on a batch system:
    # 1. We check if the FEI still requires further steps
    # 2. We do all necessary trainings which we can perform at this point in time
    # 3. We submit new jobs which will use the new trainings to reconstruct the hierarchy further
    # 4. We wait until all jobs finished
    # 5. We merge the output of the jobs
    # 6. We update the inputs of the jobs (input of next stage is the output of the current stage)
    # 7. We clean the job directories so they can be used during the next stage again.
    while is_still_training(args):
        print('Do available trainings')
        do_trainings(args)

        print('Submitting jobs')
        for i in range(args.nJobs):
            if not submit_job(args, i):
                raise RuntimeError('Error during submitting jobs')

        print('Wait for jobs to end')
        while not jobs_finished(args):
            time.sleep(40)

        print('Merge ROOT files')
        merge_root_files(args)

        print('Update input files')
        update_input_files(args)

        print('Clean job directory')
        clean_job_directory(args)

        if args.once:
            break
    else:
        # This else is only reached if the loop was not exited by break,
        # meaning the training finished and we can create our summary reports.
        create_report(args)