Belle II Software light-2505-deimos
distributed.py
#!/usr/bin/env python3


"""
 This script can be used to train the FEI on a cluster like the one available at KEKCC.
 All you need is a basf2 steering file (see analysis/examples/FEI/ ) and some MC, O(100) million events.
 The script will automatically create some directories:
   - collection containing weight files, monitoring files and other outputs
   - jobs containing temporary files during the training (can be deleted afterwards)

 The distributed script automatically spawns jobs on the cluster (or local machine),
 and runs the steering file on the provided MC.
 Since a FEI training requires multiple runs over the same MC, it does so multiple times.
 The output of a run is passed as input to the next run (so your script has to use RootInput and RootOutput).

 In between it calls the do_trainings function of the FEI to train the multivariate classifiers
 at each stage.

 At the end it produces summary outputs using printReporting.py and latexReporting.py
 (this will only work if you use the monitoring mode).
 In addition, a summary file for each mva training is produced using basf2_mva_evaluate.

 If your training fails for some reason (e.g. a job fails on the cluster),
 the FEI will stop; you can fix the problem and resume the training using the -x option.
 This requires some expert knowledge, because you have to know how to fix the problem that occurred
 and at which step you have to resume the training.
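
 For example, if the training stopped while the job outputs were being merged, one could
 resume with (the available stages are run, clean, update, merge, wait, submit, resubmit and report;
 the "..." stands for the same required arguments as in the example below):
     python3 ~/release/analysis/scripts/fei/distributed.py ... -x merge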

 After the training the weight files will be stored in the localdb in the collection directory.
 You have to upload this local database to the Belle II Conditions Database if you want to use the FEI everywhere.
 Alternatively, you can just copy the localdb somewhere and use it directly.

 Example:
  python3 ~/release/analysis/scripts/fei/distributed.py
    -s kekcc2
    -f ~/release/analysis/examples/FEI/B_generic.py
    -w /home/belle2/tkeck/group/B2TauNuWorkspace_2/new_fei
    -n 100
    -d $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000786/s00/e0000/4S/r00000/mixed/sub01/*.root
         | head -n 50)
       $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000788/s00/e0000/4S/r00000/charged/sub01/*.root
         | head -n 50)
"""


import subprocess
import sys
import os
import argparse
import glob
import time
import stat
import shutil
import pickle
import json
import b2biiConversion
import fei


def getCommandLineOptions():
    """ Parses the command line options of the FEI and returns the corresponding arguments. """
    # FEI defines its own command line options, therefore we disable
    # the ROOT command line options, which otherwise interfere sometimes.
    # Always avoid the top-level 'import ROOT'.
    import ROOT  # noqa
    ROOT.PyConfig.IgnoreCommandLineOptions = True
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--steeringFile', dest='steering', type=str, required=True,
                        help='Steering file. Calls fei.get_path()')
    parser.add_argument('-w', '--workingDirectory', dest='directory', type=str, required=True,
                        help='Working directory for basf2 jobs. On KEKCC, this must NOT be on HSM!')
    parser.add_argument('-l', '--largeDirectory', dest='large_dir', type=str, default='',
                        help='Directory to store large files')
    parser.add_argument('-n', '--nJobs', dest='nJobs', type=int, default=100,
                        help='Number of jobs')
    parser.add_argument('-d', '--data', dest='data', type=str, required=True, action='append', nargs='+',
                        help='Data files in bash expansion syntax or as process_url')
    parser.add_argument('-x', '--skip-to', dest='skip', type=str, default='',
                        help='Skip setup of directories')
    parser.add_argument('-o', '--once', dest='once', action='store_true',
                        help='Execute just once, instead of waiting until a Summary is produced.')
    parser.add_argument('-s', '--site', dest='site', type=str, default='kekcc',
                        help='Site to use [kekcc|kitekp|local]')
    args = parser.parse_args()
    return args


def get_job_script(args, i):
    """
    Create a bash file which will be dispatched to the batch system.
    The file will run basf2 on the provided MC or the previous output
    using the provided steering file.
    """
    job_script = f"""
        if [ -f "{args.directory}/jobs/{i}/basf2_input.root" ]; then
          INPUT="{args.directory}/jobs/{i}/basf2_input.root"
        else
          INPUT="{args.directory}/jobs/{i}/input_*.root"
        fi
        time basf2 -l error {args.directory}/collection/basf2_steering_file.py -i "$INPUT" \
        -o {args.directory}/jobs/{i}/basf2_output.root &> my_output_hack.log || touch basf2_job_error
        touch basf2_finished_successfully
    """
    return job_script


def setup(args):
    """
    Sets up all directories, creates the job scripts and splits the MC into chunks
    which are processed by the individual jobs. Creates symlinks for the databases.
    """
    os.chdir(args.directory)
    # Search and partition data files into even chunks
    data_files = []

    for x in args.data:
        for y in x:
            if (y.startswith("http://") or y.startswith("https://")):
                data_files += b2biiConversion.parse_process_url(y)
            else:
                data_files += glob.glob(y)
    print(f'Found {len(data_files)} MC files')
    file_sizes = []
    for file in data_files:
        file_sizes.append(os.stat(file).st_size)
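    # Sort the files by size and distribute them round-robin over the jobs, so that
    # every job receives roughly the same total amount of input data.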
    data_files_sorted = [x for _, x in sorted(zip(file_sizes, data_files))]
    n = int(len(data_files) / args.nJobs)
    if n < 1:
        raise RuntimeError(f'Too few MC files {len(data_files)} for the given number of jobs {args.nJobs}')
    data_chunks = [data_files_sorted[i::args.nJobs] for i in range(args.nJobs)]

    # Create needed directories
    print(f'Create environment in {args.directory}')
    shutil.rmtree('collection', ignore_errors=True)
    shutil.rmtree('jobs', ignore_errors=True)
    os.mkdir('collection')
    os.mkdir('collection/localdb')
    os.mkdir('jobs')
    if args.large_dir:
        if not os.path.isdir(args.large_dir):
            raise RuntimeError('Large dir does not exist. Please make sure it does.')

    shutil.copyfile(args.steering, 'collection/basf2_steering_file.py')

    for i in range(args.nJobs):
        # Create job directory
        os.mkdir(f'jobs/{i}')
        with open(f'jobs/{i}/basf2_script.sh', 'w') as f:
            f.write(get_job_script(args, i))
            os.chmod(f.fileno(), stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
        # Symlink initial input data files
        for j, data_input in enumerate(data_chunks[i]):
            os.symlink(data_input, f'jobs/{i}/input_{j}.root')
        # Symlink weight directory and basf2_path
        os.symlink('../../collection/localdb', f'jobs/{i}/localdb')


def create_report(args):
    """
    Dumps Summary.pickle to JSON for easy inspection and
    creates all the reports for the FEI training and the individual mva trainings.
    This will only work if
    1) Monitoring mode is used (see FeiConfiguration)
    2) The system has enough memory to hold the training data for the mva classifiers
    If this fails you can also copy the collection directory somewhere and
    execute the commands by hand.
    """
    os.chdir(args.directory + '/collection')
    with open('Summary.pickle', 'rb') as file:
        summary = pickle.load(file)

    summary_dict = {particle.identifier:
                    {'mvaConfig': particle.mvaConfig._asdict(),
                     'channels': [{field: (value._asdict() if field in ['mvaConfig', 'preCutConfig'] else value) for
                                   field, value in channel._asdict().items()} for channel in particle.channels],
                     'preCutConfig': particle.preCutConfig._asdict(),
                     'postCutConfig': particle.postCutConfig._asdict()}
                    for particle in summary[0]}
    summary_dict.update({'feiConfig': summary[1]._asdict()})

    with open('Summary.json', 'w') as summary_json_file:
        json.dump(summary_dict, summary_json_file, indent=4)

    ret = subprocess.call('basf2 fei/printReporting.py > ../summary.txt', shell=True)
    ret = subprocess.call('basf2 fei/latexReporting.py ../summary.tex', shell=True)
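    # Evaluate each trained classifier; weight files that are only placeholders
    # (e.g. written for channels without enough training data) are skipped.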
    for i in glob.glob("*.xml"):
        if not fei.core.Teacher.check_if_weightfile_is_fake(i):
            subprocess.call(f"basf2_mva_evaluate.py -id '{i[:-4]}.xml' -data 'training_input.root' "
                            f"--treename '{i[:-4]} variables' -o '../{i[:-4]}.zip'", shell=True)
    os.chdir(args.directory)
    return ret == 0


def submit_job(args, i):
    """
    Submits a job to the desired batch system.
    Currently we can run on KEKCC (long, short, multi-core and dedicated FEI queues),
    EKP @ KIT, or your local machine.
    """
    # Synchronize summaries
    if os.path.isfile(args.directory + '/collection/Summary.pickle'):
        shutil.copyfile(args.directory + '/collection/Summary.pickle', args.directory + f'/jobs/{i}/Summary.pickle')
    os.chdir(args.directory + f'/jobs/{i}/')
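    # The shell pipelines below only extract the numeric job id from the scheduler's
    # submission message and store it in basf2_jobid; the id is not read back by this script.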
    if args.site == 'kekcc':
        ret = subprocess.call("bsub -q l -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'kekccs':
        ret = subprocess.call("bsub -q s -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif 'kekccX' in args.site:
        ret = subprocess.call(f"bsub -q l -n {args.site[6:]} -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'kekcc2':
        ret = subprocess.call("bsub -q b2_fei -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'kitekp':
        ret = subprocess.call("qsub -cwd -q express,short,medium,long -e error.log -o output.log -V basf2_script.sh | cut -f 3 -d ' ' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'local':
        subprocess.Popen(['bash', './basf2_script.sh'])
        ret = 0
    else:
        raise RuntimeError(f'Given site {args.site} is not supported')
    os.chdir(args.directory)
    return ret == 0


def do_trainings(args):
    """
    Trains the multivariate classifiers for all training data available in
    the collection directory that has not been trained yet.
    This is called once per iteration.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    weightfiles = fei.do_trainings(particles, configuration)
    os.chdir(args.directory)
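    # Make the newly trained weight files visible inside every job directory,
    # so that the next reconstruction pass can apply the classifiers.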
    for i in range(args.nJobs):
        for weightfile_on_disk, _ in weightfiles:
            os.symlink('../../collection/' + weightfile_on_disk, f'jobs/{i}/' + weightfile_on_disk)
    # Check if some xml files are missing
    os.chdir(args.directory + '/collection')
    xmlfiles = glob.glob("*.xml")
    os.chdir(args.directory)
    for i in range(args.nJobs):
        for xmlfile in xmlfiles:
            if not (os.path.exists(f'jobs/{i}/' + xmlfile) or os.path.islink(f'jobs/{i}/' + xmlfile)):
                print("Added missing symlink to ", xmlfile, " in job directory ", i)
                os.symlink('../../collection/' + xmlfile, f'jobs/{i}/' + xmlfile)


def jobs_finished(args):
    """
    Checks if all jobs have already finished.
    Throws a runtime error if it detects an error in one of the jobs.
    """
    finished = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    failed = glob.glob(args.directory + '/jobs/*/basf2_job_error')

    if len(failed) > 0:
        raise RuntimeError(f'basf2 execution failed! Error occurred in: {str(failed)}')

    return len(finished) == args.nJobs


def merge_root_files(args):
    """
    Merges all produced ROOT files of all jobs together
    and puts the merged ROOT files into the collection directory.
    This affects mostly
      - the training data for the multivariate classifiers
      - the monitoring files
    """
    rootfiles = []
    for f in glob.glob(args.directory + '/jobs/0/*.root'):
        f = os.path.basename(f)
        if f in ['basf2_input.root', 'basf2_output.root']:
            continue
        if f.startswith('input_'):
            continue
        # in case of training_input.root, append to already existing file
        if os.path.isfile(args.directory + '/collection/' + f) and not f == 'training_input.root':
            continue
        rootfiles.append(f)
    if len(rootfiles) == 0:
        print('There are no root files to merge')
    else:
        print('Merge the following files', rootfiles)
        for f in rootfiles:
            output = args.directory + '/collection/' + f
            inputs = [args.directory + f'/jobs/{i}/' + f for i in range(args.nJobs)]
            ret = subprocess.call(['analysis-fei-mergefiles', output] + inputs)
            if ret != 0:
                raise RuntimeError('Error during merging root files')
            # Replace mcParticlesCount.root with merged file in all directories
            # so that the individual jobs calculate the correct mcCounts and sampling rates
            if f == 'mcParticlesCount.root':
                for i in inputs:
                    os.remove(i)
                    os.symlink('../../collection/mcParticlesCount.root', i)


def update_input_files(args):
    """
    Updates the input files.
    For the first iteration the input files are the MC provided by the user.
    After each training this function replaces the input with the output of the previous iteration.
    Effectively this caches the whole DataStore of basf2 between the iterations.
    """
    for i in range(args.nJobs):
        output_file = args.directory + '/jobs/' + str(i) + '/basf2_output.root'
        input_file = args.directory + '/jobs/' + str(i) + '/basf2_input.root'
        if args.large_dir:
            real_input_file = args.large_dir + '/basf2_input_' + str(i) + '.root'
            shutil.move(output_file, real_input_file)
            if os.path.isfile(input_file):
                os.remove(input_file)
            os.symlink(real_input_file, input_file)
        else:
            shutil.move(output_file, input_file)
    # Saves the Summary.pickle of the first job to the collection directory
    # so we can keep track of which stage of the reconstruction the FEI is currently at.
    shutil.copyfile(args.directory + '/jobs/0/Summary.pickle', args.directory + '/collection/Summary.pickle')


def clean_job_directory(args):
    """
    Cleans the job directories for the next iteration,
    i.e. we remove the log files and the success marker files.
    """
    files = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    files += glob.glob(args.directory + '/jobs/*/error.log')
    files += glob.glob(args.directory + '/jobs/*/output.log')
    for f in files:
        os.remove(f)


def is_still_training(args):
    """
    Checks if the FEI training is still ongoing.
    The training is finished if the FEI reached stage 7.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return True
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    os.chdir(args.directory)
    return configuration.cache != 7


if __name__ == '__main__':
    args = getCommandLineOptions()

    os.chdir(args.directory)

    # If the user wants to resume an existing training
    # we check at which step they want to resume and
    # try to perform the necessary steps to do so
    if args.skip:
        print('Skipping setup')
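        # Map the skip argument onto the step from which processing resumes;
        # a larger value re-runs more of the recovery steps below before the
        # main training loop is entered again ('report' only creates the reports and exits).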
        start = 0
        if args.skip == 'clean':
            start = 1
        elif args.skip == 'update':
            start = 2
        elif args.skip == 'merge':
            start = 3
        elif args.skip == 'wait':
            start = 4
        elif args.skip == 'submit':
            start = 5
        elif args.skip == 'resubmit':
            start = 6
        elif args.skip == 'report':
            start = 7
        elif args.skip == 'run':
            start = 0
        else:
            raise RuntimeError(f'Unknown skip parameter {args.skip}')

        if start == 7:
            create_report(args)
            sys.exit(0)

        # The user wants to submit the jobs again
        if start >= 5:
            print('Submitting jobs')
            for i in range(args.nJobs):
                # The user wants to resubmit jobs, which means the training of some jobs failed.
                # We check which jobs contained an error flag or were not successful;
                # these jobs are submitted again, other jobs are skipped (continue)
                if start >= 6:
                    error_file = args.directory + f'/jobs/{i}/basf2_job_error'
                    success_file = args.directory + f'/jobs/{i}/basf2_finished_successfully'
                    if os.path.isfile(error_file) or not os.path.isfile(success_file):
                        print(f"Delete {error_file} and resubmit job")
                        if os.path.isfile(error_file):
                            os.remove(error_file)
                        if os.path.isfile(success_file):
                            os.remove(success_file)
                    else:
                        continue
                # Reset Summary file
                shutil.copyfile(os.path.join(args.directory, 'collection/Summary.pickle'),
                                os.path.join(args.directory, f'jobs/{i}/Summary.pickle'))
                if not submit_job(args, i):
                    raise RuntimeError('Error during submitting job')

        if start >= 4:
            print('Wait for jobs to end')
            while not jobs_finished(args):
                time.sleep(40)

        if start >= 3:
            print('Merge ROOT files')
            merge_root_files(args)

        if start >= 2:
            print('Update input files')
            update_input_files(args)

        if start >= 1:
            print('Clean job directory')
            clean_job_directory(args)

    else:
        # This is a new training,
        # so we have to set up the whole directory (this will overwrite any existing training)
        setup(args)

    # The main loop, which steers the whole FEI training on a batch system
    # 1. We check if the FEI still requires further steps
    # 2. We do all necessary trainings which we can perform at this point in time
    # 3. We submit new jobs which will use the new trainings to reconstruct the hierarchy further
    # 4. We wait until all jobs finished
    # 5. We merge the output of the jobs
    # 6. We update the inputs of the jobs (input of the next stage is the output of the current stage)
    # 7. We clean the job directories so they can be used during the next stage again
    while is_still_training(args):
        print('Do available trainings')
        do_trainings(args)

        print('Submitting jobs')
        for i in range(args.nJobs):
            if not submit_job(args, i):
                raise RuntimeError('Error during submitting jobs')

        print('Wait for jobs to end')
        while not jobs_finished(args):
            time.sleep(40)

        print('Merge ROOT files')
        merge_root_files(args)

        print('Update input files')
        update_input_files(args)

        print('Clean job directory')
        clean_job_directory(args)

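        # With -o/--once only a single iteration is performed; the training can
        # then be resumed later, e.g. with '-x run'.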
        if args.once:
            break
    else:
        # This else is reached if the loop was not exited by break.
        # This means the training finished and we can create our summary reports.
        create_report(args)