Belle II Software  light-2205-abys
distributed.py
#!/usr/bin/env python3

11 """
12  This script can be used to train the FEI on a cluster like available at KEKCC
13  All you need is a basf2 steering file (see analysis/examples/FEI/ ) and some MC O(100) million
14  The script will automatically create some directories
15  - collection containing weight files, monitoring files and other stuff
16  - jobs containing temporary files during the training (can be deleted afterwards)
17 
18  The distributed script automatically spawns jobs on the cluster (or local machine),
19  and runs the steering file on the provided MC.
20  Since a FEI training requires multiple runs over the same MC, it does so multiple times.
21  The output of a run is passed as input to the next run (so your script has to use RootInput and RootOutput).
22 
23  In between it calls the do_trainings function of the FEI, to train the multivariate classifiers of the FEI
24  at each stage.
25 
26  At the end it produces summary outputs using printReporting.py and latexReporting.py
27  (this will only work if you use the monitoring mode)
28  In addition, a summary file for each mva training is produced using basf2_mva_evaluate.
29 
30  If your training fails for some reason (e.g. a job fails on the cluster),
31  the FEI will stop, you can fix the problem and resume the training using the -x option.
32  This requires some expert knowledge, because you have to know how to fix the occurred problem
33  and at which step you have to resume the training.
34 
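 For illustration, resuming at the job submission step could look like this
 (the allowed -x values are listed in the skip-to handling at the bottom of this script):
 python3 ~/release/analysis/scripts/fei/distributed.py ... -x submit
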
 After the training the weight files will be stored in the localdb in the collection directory.
 You have to upload this local database to the Belle II Conditions Database if you want to use the FEI everywhere.
 Alternatively you can just copy the localdb somewhere and use it directly.

 Example:
 python3 ~/release/analysis/scripts/fei/distributed.py
 -s kekcc2
 -f ~/release/analysis/examples/FEI/B_generic.py
 -w /home/belle2/tkeck/group/B2TauNuWorkspace_2/new_fei
 -n 100
 -d $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000786/s00/e0000/4S/r00000/mixed/sub01/*.root
 | head -n 50)
 $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000788/s00/e0000/4S/r00000/charged/sub01/*.root
 | head -n 50)
"""


import subprocess
import sys
import os
import argparse
import glob
import time
import stat
import shutil
import pickle
import json
import b2biiConversion
import fei


def getCommandLineOptions():
    """ Parses the command line options of the fei and returns the corresponding arguments. """
    # FEI defines its own command line options, therefore we disable
    # the ROOT command line options, which otherwise interfere sometimes.
    import ROOT  # noqa
    ROOT.PyConfig.IgnoreCommandLineOptions = True
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--steeringFile', dest='steering', type=str, required=True,
                        help='Steering file. Calls fei.get_path()')
    parser.add_argument('-w', '--workingDirectory', dest='directory', type=str, required=True,
                        help='Working directory for basf2 jobs. On KEKCC, this must NOT be on HSM!')
    parser.add_argument('-l', '--largeDirectory', dest='large_dir', type=str, default='',
                        help='Directory to store large files')
    parser.add_argument('-n', '--nJobs', dest='nJobs', type=int, default=100,
                        help='Number of jobs')
    parser.add_argument('-d', '--data', dest='data', type=str, required=True, action='append', nargs='+',
                        help='Data files in bash expansion syntax or as process_url')
    parser.add_argument('-x', '--skip-to', dest='skip', type=str, default='',
                        help='Skip setup of directories')
    parser.add_argument('-o', '--once', dest='once', action='store_true',
                        help='Execute just once, instead of waiting until a Summary is produced.')
    parser.add_argument('-s', '--site', dest='site', type=str, default='kekcc',
                        help='Site to use [kekcc|kitekp|local]')
    args = parser.parse_args()
    return args


def get_job_script(args, i):
    """
    Create a bash file which will be dispatched to the batch system.
    The file will run basf2 on the provided MC or the previous output
    using the provided steering file.
    """
    job_script = f"""
        if [ -f "{args.directory}/jobs/{i}/basf2_input.root" ]; then
          INPUT="{args.directory}/jobs/{i}/basf2_input.root"
        else
          INPUT="{args.directory}/jobs/{i}/input_*.root"
        fi
        time basf2 -l error {args.directory}/collection/basf2_steering_file.py -i "$INPUT" \
        -o {args.directory}/jobs/{i}/basf2_output.root &> my_output_hack.log || touch basf2_job_error
        touch basf2_finished_successfully
    """
    return job_script


def setup(args):
    """
    Set up all directories, create job_scripts, and split the MC into chunks
    which are processed by the individual jobs. Create symlinks for databases.
    """
    os.chdir(args.directory)
    # Search and partition data files into even chunks
    data_files = []

    for x in args.data:
        for y in x:
            if (y.startswith("http://") or y.startswith("https://")):
                data_files += b2biiConversion.parse_process_url(y)
            else:
                data_files += glob.glob(y)
    print(f'Found {len(data_files)} MC files')
    file_sizes = []
    for file in data_files:
        file_sizes.append(os.stat(file).st_size)
    data_files_sorted = [x for _, x in sorted(zip(file_sizes, data_files))]
    n = int(len(data_files) / args.nJobs)
    if n < 1:
        raise RuntimeError(f'Too few MC files {len(data_files)} for the given number of jobs {args.nJobs}')
    data_chunks = [data_files_sorted[i::args.nJobs] for i in range(args.nJobs)]
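    # Note: the files were sorted by size above, so this stride-nJobs slicing spreads
    # small and large files across the jobs and keeps the per-job workload roughly even.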

    # Create needed directories
    print(f'Create environment in {args.directory}')
    shutil.rmtree('collection', ignore_errors=True)
    shutil.rmtree('jobs', ignore_errors=True)
    os.mkdir('collection')
    os.mkdir('collection/localdb')
    os.mkdir('jobs')
    if args.large_dir:
        if not os.path.isdir(args.large_dir):
            raise RuntimeError('Large dir does not exist. Please make sure it does.')

    shutil.copyfile(args.steering, 'collection/basf2_steering_file.py')

    for i in range(args.nJobs):
        # Create job directory
        os.mkdir(f'jobs/{i}')
        with open(f'jobs/{i}/basf2_script.sh', 'w') as f:
            f.write(get_job_script(args, i))
            os.chmod(f.fileno(), stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
        # Symlink initial input data files
        for j, data_input in enumerate(data_chunks[i]):
            os.symlink(data_input, f'jobs/{i}/input_{j}.root')
        # Symlink weight directory and basf2_path
        os.symlink(args.directory + '/collection/localdb', f'jobs/{i}/localdb')


def create_report(args):
    """
    Dumps Summary.pickle to JSON for easy inspection and
    creates all the reports for the FEI training and the individual mva trainings.
    This will only work if
      1) the monitoring mode is used (see FeiConfiguration)
      2) the system has enough memory to hold the training data for the mva classifiers
    If this fails you can also copy the collection directory somewhere and
    execute the commands by hand.
    """
    os.chdir(args.directory + '/collection')
    with open('Summary.pickle', 'rb') as file:
        summary = pickle.load(file)

    summary_dict = {particle.identifier:
                    {'mvaConfig': particle.mvaConfig._asdict(),
                     'channels': [{field: (value._asdict() if field in ['mvaConfig', 'preCutConfig'] else value) for
                                   field, value in channel._asdict().items()} for channel in particle.channels],
                     'preCutConfig': particle.preCutConfig._asdict(),
                     'postCutConfig': particle.postCutConfig._asdict()}
                    for particle in summary[0]}
    summary_dict.update({'feiConfig': summary[1]._asdict()})

    with open('Summary.json', 'w') as summary_json_file:
        json.dump(summary_dict, summary_json_file, indent=4)

    ret = subprocess.call('basf2 fei/printReporting.py > ../summary.txt', shell=True)
    ret = subprocess.call('basf2 fei/latexReporting.py ../summary.tex', shell=True)
    for i in glob.glob("*.xml"):
        if not fei.core.Teacher.check_if_weightfile_is_fake(i):
            subprocess.call(f"basf2_mva_evaluate.py -id '{i[:-4]}.xml' -data 'training_input.root' "
                            f"--treename '{i[:-4]} variables' -o '../{i[:-4]}.zip'", shell=True)
    os.chdir(args.directory)
    return ret == 0


def submit_job(args, i):
    """
    Submits a job to the desired batch system.
    Currently we can run on KEKCC (long queue), KEKCC (dedicated FEI queue),
    EKP @ KIT, or your local machine.
    """
    # Synchronize summaries
    if os.path.isfile(args.directory + '/collection/Summary.pickle'):
        shutil.copyfile(args.directory + '/collection/Summary.pickle', args.directory + f'/jobs/{i}/Summary.pickle')
    os.chdir(args.directory + f'/jobs/{i}/')
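    # The cut/sed pipelines below extract the numeric job id from the bsub/qsub output
    # (e.g. "Job <12345> is submitted ...") and store it in basf2_jobid for later inspection.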
    if args.site == 'kekcc':
        ret = subprocess.call("bsub -q l -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'kekcc2':
        ret = subprocess.call("bsub -q b2_fei -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'kitekp':
        ret = subprocess.call("qsub -cwd -q express,short,medium,long -e error.log -o output.log -V basf2_script.sh | cut -f 3 -d ' ' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'local':
        subprocess.Popen(['bash', './basf2_script.sh'])
        ret = 0
    else:
        raise RuntimeError(f'Given site {args.site} is not supported')
    os.chdir(args.directory)
    return ret == 0


def do_trainings(args):
    """
    Trains the multivariate classifiers for all available training data in
    the collection directory which have not been trained yet.
    This is called once per iteration.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    weightfiles = fei.do_trainings(particles, configuration)
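    # Make the freshly trained weight files visible in every job directory via symlinks,
    # so the next basf2 pass can pick up the new classifiers.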
    for i in range(args.nJobs):
        for weightfile_on_disk, _ in weightfiles:
            os.symlink(args.directory + '/collection/' + weightfile_on_disk,
                       args.directory + f'/jobs/{i}/' + weightfile_on_disk)
    # Check if some xml files are missing
    xmlfiles = glob.glob("*.xml")
    for i in range(args.nJobs):
        for xmlfile in xmlfiles:
            if not os.path.isfile(args.directory + f'/jobs/{i}/' + xmlfile):
                print("Added missing symlink to ", xmlfile, " in job directory ", i)
                os.symlink(args.directory + '/collection/' + xmlfile,
                           args.directory + f'/jobs/{i}/' + xmlfile)
    os.chdir(args.directory)


def jobs_finished(args):
    """
    Check if all jobs have already finished.
    Throws a runtime error if it detects an error in one of the jobs.
    """
    finished = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    failed = glob.glob(args.directory + '/jobs/*/basf2_job_error')

    if len(failed) > 0:
        raise RuntimeError(f'basf2 execution failed! Error occurred in: {str(failed)}')

    return len(finished) == args.nJobs


def merge_root_files(args):
    """
    Merges the produced ROOT files of all jobs
    and puts the merged ROOT files into the collection directory.
    This affects mostly
      - the training data for the multivariate classifiers
      - the monitoring files
    """
    rootfiles = []
    for f in glob.glob(args.directory + '/jobs/0/*.root'):
        f = os.path.basename(f)
        if f in ['basf2_input.root', 'basf2_output.root']:
            continue
        if f.startswith('input_'):
            continue
        # in case of training_input.root, append to the already existing file
        if os.path.isfile(args.directory + '/collection/' + f) and not f == 'training_input.root':
            continue
        rootfiles.append(f)
    if len(rootfiles) == 0:
        print('There are no root files to merge')
    else:
        print('Merge the following files', rootfiles)
        for f in rootfiles:
            output = args.directory + '/collection/' + f
            inputs = [args.directory + f'/jobs/{i}/' + f for i in range(args.nJobs)]
            ret = subprocess.call(['analysis-fei-mergefiles', output] + inputs)
            if ret != 0:
                raise RuntimeError('Error during merging root files')
            # Replace mcParticlesCount.root with the merged file in all job directories
            # so that the individual jobs calculate the correct mcCounts and sampling rates
            if f == 'mcParticlesCount.root':
                for i in inputs:
                    os.remove(i)
                    os.symlink(output, i)


def update_input_files(args):
    """
    Updates the input files.
    For the first iteration the input files are the MC provided by the user.
    After each training this function replaces the input with the output of the previous iteration.
    Effectively this caches the whole DataStore of basf2 between the iterations.
    """
    for i in range(args.nJobs):
        output_file = args.directory + '/jobs/' + str(i) + '/basf2_output.root'
        input_file = args.directory + '/jobs/' + str(i) + '/basf2_input.root'
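        # With --largeDirectory the (potentially large) cached DataStore files live outside
        # the working directory; the job directories only contain symlinks to them.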
        if args.large_dir:
            real_input_file = args.large_dir + '/basf2_input_' + str(i) + '.root'
            shutil.move(output_file, real_input_file)
            if os.path.isfile(input_file):
                os.remove(input_file)
            os.symlink(real_input_file, input_file)
        else:
            shutil.move(output_file, input_file)
    # Save the Summary.pickle of the first job to the collection directory
    # so we can keep track of the stage of the reconstruction the FEI is currently in.
    shutil.copyfile(args.directory + '/jobs/0/Summary.pickle', args.directory + '/collection/Summary.pickle')


def clean_job_directory(args):
    """
    Cleans the job directories for the next iteration,
    meaning we remove the success flags and all log files.
    """
    files = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    files += glob.glob(args.directory + '/jobs/*/error.log')
    files += glob.glob(args.directory + '/jobs/*/output.log')
    for f in files:
        os.remove(f)


def is_still_training(args):
    """
    Checks if the FEI training is still ongoing.
    The training is finished once the FEI has reached stage 7.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return True
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
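    # configuration.cache holds the FEI stage the training has progressed to;
    # once it is 7 the training is complete and the main loop below terminates.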
    os.chdir(args.directory)
    return configuration.cache != 7


if __name__ == '__main__':
    args = getCommandLineOptions()

    os.chdir(args.directory)

    # If the user wants to resume an existing training
    # we check at which step they want to resume and
    # try to perform the necessary steps to do so
    if args.skip:
        print('Skipping setup')
        start = 0
        if args.skip == 'clean':
            start = 1
        elif args.skip == 'update':
            start = 2
        elif args.skip == 'merge':
            start = 3
        elif args.skip == 'wait':
            start = 4
        elif args.skip == 'submit':
            start = 5
        elif args.skip == 'resubmit':
            start = 6
        elif args.skip == 'report':
            start = 7
        elif args.skip == 'run':
            start = 0
        else:
            raise RuntimeError(f'Unknown skip parameter {args.skip}')

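        # Note that the steps below are cumulative: resuming at e.g. 'submit' (start = 5)
        # also executes the subsequent wait, merge, update and clean steps before the main loop.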
        if start == 7:
            create_report(args)
            sys.exit(0)

        # The user wants to submit the jobs again
        if start >= 5:
            print('Submitting jobs')
            for i in range(args.nJobs):
                # The user wants to resubmit jobs, which means the training of some jobs failed.
                # We check which jobs contain an error flag or were not successful;
                # these jobs are submitted again, the other jobs are skipped (continue).
                if start >= 6:
                    error_file = args.directory + f'/jobs/{i}/basf2_job_error'
                    success_file = args.directory + f'/jobs/{i}/basf2_finished_successfully'
                    if os.path.isfile(error_file) or not os.path.isfile(success_file):
                        print(f"Delete {error_file} and resubmit job")
                        if os.path.isfile(error_file):
                            os.remove(error_file)
                        if os.path.isfile(success_file):
                            os.remove(success_file)
                    else:
                        continue
                # Reset Summary file
                shutil.copyfile(os.path.join(args.directory, 'collection/Summary.pickle'),
                                os.path.join(args.directory, f'jobs/{i}/Summary.pickle'))
                if not submit_job(args, i):
                    raise RuntimeError('Error during submitting job')

        if start >= 4:
            print('Wait for jobs to end')
            while not jobs_finished(args):
                time.sleep(40)

        if start >= 3:
            print('Merge ROOT files')
            merge_root_files(args)

        if start >= 2:
            print('Update input files')
            update_input_files(args)

        if start >= 1:
            print('Clean job directory')
            clean_job_directory(args)

    else:
        # This is a new training,
        # so we have to set up the whole directory (this will override any existing training).
        setup(args)

    # The main loop, which steers the whole FEI training on a batch system:
    # 1. We check if the FEI still requires further steps
    # 2. We do all necessary trainings which we can perform at this point in time
    # 3. We submit new jobs which will use the new trainings to reconstruct the hierarchy further
    # 4. We wait until all jobs have finished
    # 5. We merge the output of the jobs
    # 6. We update the inputs of the jobs (the input of the next stage is the output of the current stage)
    # 7. We clean the job directories so they can be used during the next stage again
    while is_still_training(args):
        print('Do available trainings')
        do_trainings(args)

        print('Submitting jobs')
        for i in range(args.nJobs):
            if not submit_job(args, i):
                raise RuntimeError('Error during submitting jobs')

        print('Wait for jobs to end')
        while not jobs_finished(args):
            time.sleep(40)

        print('Merge ROOT files')
        merge_root_files(args)

        print('Update input files')
        update_input_files(args)

        print('Clean job directory')
        clean_job_directory(args)

        if args.once:
            break
    else:
        # This else branch is executed if the loop was not exited via break,
        # which means the training finished and we can create our summary reports.
        create_report(args)