This script can be used to train the FEI on a cluster such as the one available at KEKCC.
All you need is a basf2 steering file (see analysis/examples/FEI/) and some MC, O(100) million events.
The script automatically creates some directories:
  - collection: contains weight files, monitoring files and other outputs
  - jobs: contains temporary files used during the training (can be deleted afterwards)

The distributed script automatically spawns jobs on the cluster (or the local machine)
and runs the steering file on the provided MC.
Since an FEI training requires multiple runs over the same MC, it does so multiple times.
The output of each run is passed as input to the next run (so your steering file has to use RootInput and RootOutput).

In between, it calls the do_trainings function of the FEI to train the multivariate classifiers of the FEI.

At the end it produces summary outputs using printReporting.py and latexReporting.py
(this will only work if you use the monitoring mode).
In addition, a summary file for each mva training is produced using basf2_mva_evaluate.

If your training fails for some reason (e.g. a job fails on the cluster),
the FEI will stop; you can fix the problem and resume the training using the -x option.
This requires some expert knowledge, because you have to know how to fix the problem that occurred
and at which step you have to resume the training.

After the training, the weight files are stored in the localdb in the collection directory.
You have to upload this local database to the Belle II Conditions Database if you want to use the FEI everywhere.
Alternatively, you can just copy the localdb somewhere and use it directly.

A training is started like this:
    python3 ~/release/analysis/scripts/fei/distributed.py
      -f ~/release/analysis/examples/FEI/B_generic.py
      -w /home/belle2/tkeck/group/B2TauNuWorkspace_2/new_fei
      -d $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000786/s00/e0000/4S/r00000/mixed/sub01/*.root)
         $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000788/s00/e0000/4S/r00000/charged/sub01/*.root)
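
An interrupted training can be resumed with the -x (--skip-to) option.
The following is only a sketch; the step name to pass depends on where the previous run
stopped (see the skip handling in the __main__ block below):

    python3 ~/release/analysis/scripts/fei/distributed.py -x submit
      -f ~/release/analysis/examples/FEI/B_generic.py
      -w /home/belle2/tkeck/group/B2TauNuWorkspace_2/new_fei
      -d <same MC files as before>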
import argparse
import glob
import json
import os
import pickle
import shutil
import stat
import subprocess
import sys
import time

import ROOT

import fei
import b2biiConversion
def getCommandLineOptions():
    """ Parses the command line options of the fei and returns the corresponding arguments. """
    # ROOT should not steal the command-line options
    ROOT.PyConfig.IgnoreCommandLineOptions = True
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--steeringFile', dest='steering', type=str, required=True,
                        help='Steering file. Calls fei.get_path()')
    parser.add_argument('-w', '--workingDirectory', dest='directory', type=str, required=True,
                        help='Working directory for basf2 jobs. On KEKCC, this must NOT be on HSM!')
    parser.add_argument('-l', '--largeDirectory', dest='large_dir', type=str, default='',
                        help='Directory to store large files')
    parser.add_argument('-n', '--nJobs', dest='nJobs', type=int, default=100,
                        help='Number of jobs')
    parser.add_argument('-d', '--data', dest='data', type=str, required=True, action='append', nargs='+',
                        help='Data files in bash expansion syntax or as process_url')
    parser.add_argument('-x', '--skip-to', dest='skip', type=str, default='',
                        help='Skip setup of directories and resume the training at the given step')
    parser.add_argument('-o', '--once', dest='once', action='store_true',
                        help='Execute just once, instead of waiting until a Summary is produced.')
    parser.add_argument('-s', '--site', dest='site', type=str, default='kekcc',
                        help='Site to use [kekcc|kitekp|local]')
    args = parser.parse_args()
    return args
def get_job_script(args, i):
    """
    Create a bash file which will be dispatched to the batch system.
    The file will run basf2 on the provided MC or the previous output
    using the provided steering file.
    """
    job_script = f"""
        if [ -f "{args.directory}/jobs/{i}/basf2_input.root" ]; then
          INPUT="{args.directory}/jobs/{i}/basf2_input.root"
        else
          INPUT="{args.directory}/jobs/{i}/input_*.root"
        fi
        time basf2 -l error {args.directory}/collection/basf2_steering_file.py -i "$INPUT" \
          -o {args.directory}/jobs/{i}/basf2_output.root &> my_output_hack.log || touch basf2_job_error
        touch basf2_finished_successfully
    """
    return job_script
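
# For illustration, with a hypothetical working directory /path/to/fei_workdir and job index 0,
# the generated jobs/0/basf2_script.sh essentially runs the following. On the first iteration
# basf2_input.root does not exist yet, so the symlinked MC chunk (input_*.root) is used; on later
# iterations the previous output has been moved to basf2_input.root (see update_input_files below):
#
#   INPUT="/path/to/fei_workdir/jobs/0/input_*.root"   # or basf2_input.root, if present
#   time basf2 -l error /path/to/fei_workdir/collection/basf2_steering_file.py -i "$INPUT" \
#     -o /path/to/fei_workdir/jobs/0/basf2_output.root &> my_output_hack.log || touch basf2_job_error
#   touch basf2_finished_successfully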
def setup_directories(args):
    """
    Setup all directories, create job_scripts, split up MC into chunks
    which are processed by each job. Create symlinks for databases.
    """
    os.chdir(args.directory)
    # Collect the data files given on the command line (glob patterns or process_urls)
    data_files = []
    for x in args.data:
        for y in x:
            if (y.startswith("http://") or y.startswith("https://")):
                data_files += b2biiConversion.parse_process_url(y)
            else:
                data_files += glob.glob(y)
    print(f'Found {len(data_files)} MC files')
    file_sizes = []
    for file in data_files:
        file_sizes.append(os.stat(file).st_size)
    # Sort the files by size and distribute them round-robin, so that each job gets a similar amount of data
    data_files_sorted = [x for _, x in sorted(zip(file_sizes, data_files))]
    n = int(len(data_files) / args.nJobs)
    if n < 1:
        raise RuntimeError(f'Too few MC files {len(data_files)} for the given number of jobs {args.nJobs}')
    data_chunks = [data_files_sorted[i::args.nJobs] for i in range(args.nJobs)]

    # Create a clean environment for the training
    print(f'Create environment in {args.directory}')
    shutil.rmtree('collection', ignore_errors=True)
    shutil.rmtree('jobs', ignore_errors=True)
    os.mkdir('collection')
    os.mkdir('collection/localdb')
    os.mkdir('jobs')
    if args.large_dir:
        if not os.path.isdir(args.large_dir):
            raise RuntimeError('Large dir does not exist. Please make sure it does.')

    shutil.copyfile(args.steering, 'collection/basf2_steering_file.py')

    for i in range(args.nJobs):
        # Create the job directory with its script, its share of the MC files and a link to the localdb
        os.mkdir(f'jobs/{i}')
        with open(f'jobs/{i}/basf2_script.sh', 'w') as f:
            f.write(get_job_script(args, i))
            os.chmod(f.fileno(), stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
        for j, data_input in enumerate(data_chunks[i]):
            os.symlink(data_input, f'jobs/{i}/input_{j}.root')
        os.symlink(args.directory + '/collection/localdb', f'jobs/{i}/localdb')
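
# After setup_directories the working directory looks roughly like this
# (a sketch for --nJobs 2; names follow the code above):
#
#   collection/
#       basf2_steering_file.py
#       localdb/
#   jobs/
#       0/
#           basf2_script.sh
#           input_0.root -> <MC file>
#           localdb -> ../../collection/localdb
#       1/
#           ...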
def create_report(args):
    """
    Dumps Summary.pickle to JSON for easy inspection.
    Create all the reports for the FEI training and the individual mva trainings.
    This will only work if
      1) Monitoring mode is used (see FeiConfiguration)
      2) The system has enough memory to hold the training data for the mva classifiers
    If this fails you can also copy the collection directory somewhere and
    execute the commands by hand.
    """
    os.chdir(args.directory + '/collection')
    with open('Summary.pickle', 'rb') as file:
        summary = pickle.load(file)

    summary_dict = {particle.identifier:
                    {'mvaConfig': particle.mvaConfig._asdict(),
                     'channels': [{field: (value._asdict() if field in ['mvaConfig', 'preCutConfig'] else value)
                                   for field, value in channel._asdict().items()} for channel in particle.channels],
                     'preCutConfig': particle.preCutConfig._asdict(),
                     'postCutConfig': particle.postCutConfig._asdict()}
                    for particle in summary[0]}
    summary_dict.update({'feiConfig': summary[1]._asdict()})

    with open('Summary.json', 'w') as summary_json_file:
        json.dump(summary_dict, summary_json_file, indent=4)

    ret = subprocess.call('basf2 fei/printReporting.py > ../summary.txt', shell=True)
    ret = subprocess.call('basf2 fei/latexReporting.py ../summary.tex', shell=True)
    for i in glob.glob("*.xml"):
        if not fei.core.Teacher.check_if_weightfile_is_fake(i):
            subprocess.call(f"basf2_mva_evaluate.py -id '{i[:-4]}.xml' -data 'training_input.root' "
                            f"--treename '{i[:-4]} variables' -o '../{i[:-4]}.zip'", shell=True)
    os.chdir(args.directory)
    return ret == 0
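
# If the automatic report creation fails (e.g. not enough memory), the same commands can be run
# by hand from a copy of the collection directory (a sketch based on the calls above):
#
#   basf2 fei/printReporting.py > summary.txt
#   basf2 fei/latexReporting.py summary.tex
#   basf2_mva_evaluate.py -id '<weightfile>.xml' -data 'training_input.root' \
#       --treename '<weightfile> variables' -o '<weightfile>.zip'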
def submit_job(args, i):
    """
    Submits a job to the desired batch system.
    Currently we can run on KEKCC (long queue), KEKCC (dedicated FEI queue),
    EKP @ KIT, or your local machine.
    """
    # Synchronize the Summary.pickle of the collection with the job directory
    if os.path.isfile(args.directory + '/collection/Summary.pickle'):
        shutil.copyfile(args.directory + '/collection/Summary.pickle', args.directory + f'/jobs/{i}/Summary.pickle')
    os.chdir(args.directory + f'/jobs/{i}/')
    if args.site == 'kekcc':
        ret = subprocess.call("bsub -q l -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)
    elif args.site == 'kekcc2':
        ret = subprocess.call("bsub -q b2_fei -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)
    elif args.site == 'kitekp':
        ret = subprocess.call("qsub -cwd -q express,short,medium,long -e error.log -o output.log -V basf2_script.sh | cut -f 3 -d ' ' > basf2_jobid", shell=True)
    elif args.site == 'local':
        subprocess.Popen(['bash', './basf2_script.sh'])
        ret = 0
    else:
        raise RuntimeError(f'Given site {args.site} is not supported')
    os.chdir(args.directory)
    return ret == 0
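
# The batch job id is written to jobs/<i>/basf2_jobid by the commands above; it can be used to
# inspect a job manually, e.g. with bjobs <id> on LSF (KEKCC) or qstat on SGE (kitekp).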
def do_trainings(args):
    """
    Trains the multivariate classifiers for all available training data in
    the collection directory which has not been trained yet.
    This is called once per iteration.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    weightfiles = fei.do_trainings(particles, configuration)
    for i in range(args.nJobs):
        for weightfile_on_disk, _ in weightfiles:
            os.symlink(args.directory + '/collection/' + weightfile_on_disk,
                       args.directory + f'/jobs/{i}/' + weightfile_on_disk)
    # Check if some xml files are missing in the job directories and add symlinks for them
    xmlfiles = glob.glob("*.xml")
    for i in range(args.nJobs):
        for xmlfile in xmlfiles:
            if not os.path.isfile(args.directory + f'/jobs/{i}/' + xmlfile):
                print("Added missing symlink to ", xmlfile, " in job directory ", i)
                os.symlink(args.directory + '/collection/' + xmlfile,
                           args.directory + f'/jobs/{i}/' + xmlfile)
    os.chdir(args.directory)
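
# If a single training has to be redone by hand, the same call the function above uses can be
# issued from within the collection directory (a sketch, assuming a valid Summary.pickle exists):
#
#   import pickle
#   import fei
#   particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
#   fei.do_trainings(particles, configuration)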
def jobs_finished(args):
    """
    Check if all jobs have finished.
    Throws a runtime error if it detects an error in one of the jobs.
    """
    finished = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    failed = glob.glob(args.directory + '/jobs/*/basf2_job_error')

    if len(failed) > 0:
        raise RuntimeError(f'basf2 execution failed! Error occurred in: {str(failed)}')

    return len(finished) == args.nJobs
def merge_root_files(args):
    """
    Merges all produced ROOT files of all jobs together
    and puts the merged ROOT files into the collection directory.
    This affects mostly
      - the training data for the multivariate classifiers
      - the monitoring files
    """
    rootfiles = []
    for f in glob.glob(args.directory + '/jobs/0/*.root'):
        f = os.path.basename(f)
        if f in ['basf2_input.root', 'basf2_output.root']:
            continue
        if f.startswith('input_'):
            continue
        # training_input.root is merged in every iteration, all other files only once
        if os.path.isfile(args.directory + '/collection/' + f) and not f == 'training_input.root':
            continue
        rootfiles.append(f)
    if len(rootfiles) == 0:
        print('There are no root files to merge')
        return
    print('Merge the following files', rootfiles)
    for f in rootfiles:
        output = args.directory + '/collection/' + f
        inputs = [args.directory + f'/jobs/{i}/' + f for i in range(args.nJobs)]
        ret = subprocess.call(['analysis-fei-mergefiles', output] + inputs)
        if ret != 0:
            raise RuntimeError('Error during merging root files')
        # Replace the mcParticlesCount.root files in the job directories with symlinks
        # to the merged file, so that all jobs use the same MC counts
        if f == 'mcParticlesCount.root':
            for i in inputs:
                os.remove(i)
                os.symlink(output, i)
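
# The merge can also be repeated by hand, e.g. if it was interrupted
# (a sketch based on the call above; the output file comes first, then all inputs):
#
#   analysis-fei-mergefiles collection/training_input.root jobs/*/training_input.root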
def update_input_files(args):
    """
    Updates the input files.
    For the first iteration the input files are the MC provided by the user.
    After each training this function replaces the input with the output of the previous iteration.
    Effectively this caches the whole DataStore of basf2 between the iterations.
    """
    for i in range(args.nJobs):
        output_file = args.directory + '/jobs/' + str(i) + '/basf2_output.root'
        input_file = args.directory + '/jobs/' + str(i) + '/basf2_input.root'
        if args.large_dir:
            # Store the large input files in the dedicated directory and only symlink them into the job directory
            real_input_file = args.large_dir + '/basf2_input_' + str(i) + '.root'
            shutil.move(output_file, real_input_file)
            if os.path.isfile(input_file):
                os.remove(input_file)
            os.symlink(real_input_file, input_file)
        else:
            shutil.move(output_file, input_file)
    # Store the Summary.pickle of the first job in the collection directory,
    # it contains the state of the FEI training after the last iteration
    shutil.copyfile(args.directory + '/jobs/0/Summary.pickle', args.directory + '/collection/Summary.pickle')
def clean_job_directory(args):
    """
    Cleans the job directory for the next iteration,
    meaning we remove all logs.
    """
    files = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    files += glob.glob(args.directory + '/jobs/*/error.log')
    files += glob.glob(args.directory + '/jobs/*/output.log')
    for f in files:
        os.remove(f)
def is_still_training(args):
    """
    Checks if the FEI training is still ongoing.
    The training is finished once the FEI reached stage 7.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return True
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    os.chdir(args.directory)
    return configuration.cache != 7
if __name__ == '__main__':
    args = getCommandLineOptions()

    os.chdir(args.directory)

    # If the user wants to resume an existing training (-x), we determine at which step to resume
    # and perform only the remaining steps of the interrupted iteration before entering the main loop.
    if args.skip:
        print('Skipping setup')
        start = 0
        if args.skip == 'clean':
            start = 1
        elif args.skip == 'update':
            start = 2
        elif args.skip == 'merge':
            start = 3
        elif args.skip == 'wait':
            start = 4
        elif args.skip == 'submit':
            start = 5
        elif args.skip == 'resubmit':
            start = 6
        elif args.skip == 'report':
            start = 7
        elif args.skip == 'run':
            start = 0
        else:
            raise RuntimeError(f'Unknown skip parameter {args.skip}')

        # Only create the summary reports and exit
        if start == 7:
            create_report(args)
            sys.exit(0)

        # The user wants to submit the jobs (again)
        if start >= 5:
            print('Submitting jobs')
            for i in range(args.nJobs):
                # In resubmit mode only the jobs which failed (or never finished) are submitted again
                if start >= 6:
                    error_file = args.directory + f'/jobs/{i}/basf2_job_error'
                    success_file = args.directory + f'/jobs/{i}/basf2_finished_successfully'
                    if os.path.isfile(error_file) or not os.path.isfile(success_file):
                        print(f"Delete {error_file} and resubmit job")
                        if os.path.isfile(error_file):
                            os.remove(error_file)
                        if os.path.isfile(success_file):
                            os.remove(success_file)
                    else:
                        continue
                # Reset the Summary.pickle of the job to the current state of the collection
                shutil.copyfile(os.path.join(args.directory, 'collection/Summary.pickle'),
                                os.path.join(args.directory, f'jobs/{i}/Summary.pickle'))
                if not submit_job(args, i):
                    raise RuntimeError('Error during submitting job')

        if start >= 4:
            print('Wait for jobs to end')
            while not jobs_finished(args):
                time.sleep(40)

        if start >= 3:
            print('Merge ROOT files')
            merge_root_files(args)

        if start >= 2:
            print('Update input files')
            update_input_files(args)

        if start >= 1:
            print('Clean job directory')
            clean_job_directory(args)
    else:
        # This is a new training, so set up the whole directory structure
        # (this will overwrite any existing training in the working directory)
        setup_directories(args)

    # The main loop which steers the whole FEI training on the batch system:
    #   1. perform all trainings which are possible at this point
    #   2. submit the jobs which reconstruct the next part of the hierarchy
    #   3. wait until all jobs finished, then merge their output
    #   4. use the output of this iteration as input for the next one and clean the job directories
    while is_still_training(args):
        print('Do available trainings')
        do_trainings(args)

        print('Submitting jobs')
        for i in range(args.nJobs):
            if not submit_job(args, i):
                raise RuntimeError('Error during submitting jobs')

        print('Wait for jobs to end')
        while not jobs_finished(args):
            time.sleep(40)

        print('Merge ROOT files')
        merge_root_files(args)

        print('Update input files')
        update_input_files(args)

        print('Clean job directory')
        clean_job_directory(args)

        # With -o/--once we stop after a single iteration
        if args.once:
            break
def parse_process_url(url)