This script can be used to train the FEI on a cluster like the one available at KEKCC.
All you need is a basf2 steering file (see analysis/examples/FEI/) and some MC, O(100) million events.
The script will automatically create some directories:
 - collection: contains the weight files, monitoring files and other outputs
 - jobs: contains temporary files produced during the training (can be deleted afterwards)

The distributed script automatically spawns jobs on the cluster (or local machine)
and runs the steering file on the provided MC.
Since a FEI training requires multiple runs over the same MC, it does so multiple times.
The output of a run is passed as input to the next run (so your steering file has to use RootInput and RootOutput).

In between, it calls the do_trainings function of the FEI to train the multivariate classifiers.

At the end it produces summary outputs using printReporting.py and latexReporting.py
(this only works if you use the monitoring mode).
In addition, a summary file for each mva training is produced using basf2_mva_evaluate.
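If the automatic report creation fails, you can also re-run the commands by hand from inside
the collection directory; roughly (the weightfile name is just a placeholder):

  basf2 fei/printReporting.py > summary.txt
  basf2 fei/latexReporting.py summary.tex
  basf2_mva_evaluate.py -id '<weightfile>.xml' -data '<weightfile>.root' --treename variables -o '<weightfile>.zip'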
If your training fails for some reason (e.g. a job fails on the cluster),
the FEI will stop; you can fix the problem and resume the training using the -x option.
This requires some expert knowledge, because you have to know how to fix the problem that occurred
and at which step you have to resume the training.

After the training the weight files will be stored in the localdb in the collection directory.
You have to upload this local database to the Belle II Conditions Database if you want to use the FEI everywhere.
Alternatively you can just copy the localdb somewhere and use it directly.
Example:

python3 ~/release/analysis/scripts/fei/distributed.py \
  -f ~/release/analysis/examples/FEI/B_generic.py \
  -w /home/belle2/tkeck/group/B2TauNuWorkspace_2/new_fei \
  -d $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000786/s00/e0000/4S/r00000/mixed/sub01/*.root) \
     $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000788/s00/e0000/4S/r00000/charged/sub01/*.root)
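If, for instance, some jobs crashed on the cluster, you could fix the underlying problem and
resubmit only the failed jobs with the same call plus -x resubmit; a sketch, reusing the options
from above (the data arguments are abbreviated here):

python3 ~/release/analysis/scripts/fei/distributed.py \
  -f ~/release/analysis/examples/FEI/B_generic.py \
  -w /home/belle2/tkeck/group/B2TauNuWorkspace_2/new_fei \
  -d ... \
  -x resubmit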
import argparse
import glob
import json
import os
import pickle
import shutil
import stat
import subprocess
import sys
import time

import fei
import b2biiConversion


def getCommandLineOptions():
    """ Parses the command line options of the fei and returns the corresponding arguments. """
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--steeringFile', dest='steering', type=str, required=True,
                        help='Steering file. Calls fei.get_path()')
    parser.add_argument('-w', '--workingDirectory', dest='directory', type=str, required=True,
                        help='Working directory for basf2 jobs. On KEKCC, this must NOT be on HSM!')
    parser.add_argument('-l', '--largeDirectory', dest='large_dir', type=str, default='',
                        help='Directory to store large files')
    parser.add_argument('-n', '--nJobs', dest='nJobs', type=int, default=100,
                        help='Number of jobs')
    parser.add_argument('-d', '--data', dest='data', type=str, required=True, action='append', nargs='+',
                        help='Data files in bash expansion syntax or as process_url')
    parser.add_argument('-x', '--skip-to', dest='skip', type=str, default='',
                        help='Skip setup of directories')
    parser.add_argument('-o', '--once', dest='once', action='store_true',
                        help='Execute just once, instead of waiting until a Summary is produced.')
    parser.add_argument('-s', '--site', dest='site', type=str, default='kekcc',
                        help='Site to use [kekcc|kitekp|local]')
    args = parser.parse_args()
    return args


def get_job_script(args, i):
    """
    Create a bash file which will be dispatched to the batch system.
    The file will run basf2 on the provided MC or the previous output
    using the provided steering file.
    """
    job_script = f"""
        if [ -f "{args.directory}/jobs/{i}/basf2_input.root" ]; then
            INPUT="{args.directory}/jobs/{i}/basf2_input.root"
        else
            INPUT="{args.directory}/jobs/{i}/input_*.root"
        fi
        time basf2 -l error {args.directory}/collection/basf2_steering_file.py -i "$INPUT" \
             -o {args.directory}/jobs/{i}/basf2_output.root &> my_output_hack.log || touch basf2_job_error
        touch basf2_finished_successfully
    """
    return job_script


def setup(args):
    """
    Setup all directories, create job_scripts, split up MC into chunks
    which are processed by each job. Create symlinks for databases.
    """
    os.chdir(args.directory)
    # Collect the provided data files and partition them into chunks of similar size
    data_files = []
    for x in args.data:
        for y in x:
            if (y.startswith("http://") or y.startswith("https://")):
                data_files += b2biiConversion.parse_process_url(y)
            else:
                data_files += glob.glob(y)
    print(f'Found {len(data_files)} MC files')
    file_sizes = []
    for file in data_files:
        file_sizes.append(os.stat(file).st_size)
    data_files_sorted = [x for _, x in sorted(zip(file_sizes, data_files))]
    n = int(len(data_files) / args.nJobs)
    if n < 1:
        raise RuntimeError(f'Too few MC files {len(data_files)} for the given number of jobs {args.nJobs}')
    data_chunks = [data_files_sorted[i::args.nJobs] for i in range(args.nJobs)]

    # Create the directory structure
    print(f'Create environment in {args.directory}')
    shutil.rmtree('collection', ignore_errors=True)
    shutil.rmtree('jobs', ignore_errors=True)
    os.mkdir('collection')
    os.mkdir('collection/localdb')
    os.mkdir('jobs')
    if args.large_dir:
        if not os.path.isdir(args.large_dir):
            raise RuntimeError('Large dir does not exist. Please make sure it does.')

    shutil.copyfile(args.steering, 'collection/basf2_steering_file.py')

    for i in range(args.nJobs):
        # Create the job directory with its executable job script
        os.mkdir(f'jobs/{i}')
        with open(f'jobs/{i}/basf2_script.sh', 'w') as f:
            f.write(get_job_script(args, i))
            os.chmod(f.fileno(), stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
        # Symlink the input data files assigned to this job
        for j, data_input in enumerate(data_chunks[i]):
            os.symlink(data_input, f'jobs/{i}/input_{j}.root')
        # Symlink the shared local database
        os.symlink(args.directory + '/collection/localdb', f'jobs/{i}/localdb')


def create_report(args):
    """
    Dumps Summary.pickle to JSON for easy inspection.
    Creates all the reports for the FEI training and the individual mva trainings.
    This will only work if
      1) Monitoring mode is used (see FeiConfiguration)
      2) The system has enough memory to hold the training data for the mva classifiers
    If this fails you can also copy the collection directory somewhere and
    execute the commands by hand.
    """
    os.chdir(args.directory + '/collection')
    with open('Summary.pickle', 'rb') as file:
        summary = pickle.load(file)

    summary_dict = {particle.identifier:
                    {'mvaConfig': particle.mvaConfig._asdict(),
                     'channels': [{field: (value._asdict() if field in ['mvaConfig', 'preCutConfig'] else value) for
                                   field, value in channel._asdict().items()} for channel in particle.channels],
                     'preCutConfig': particle.preCutConfig._asdict(),
                     'postCutConfig': particle.postCutConfig._asdict()}
                    for particle in summary[0]}
    summary_dict.update({'feiConfig': summary[1]._asdict()})

    with open('Summary.json', 'w') as summary_json_file:
        json.dump(summary_dict, summary_json_file, indent=4)

    ret = subprocess.call('basf2 fei/printReporting.py > ../summary.txt', shell=True)
    ret = subprocess.call('basf2 fei/latexReporting.py ../summary.tex', shell=True)
    for i in glob.glob("*.xml"):
        if not fei.core.Teacher.check_if_weightfile_is_fake(i):
            subprocess.call(f"basf2_mva_evaluate.py -id '{i[:-4]}.xml' -data '{i[:-4]}.root' "
                            f"--treename variables -o '../{i[:-4]}.zip'", shell=True)
    os.chdir(args.directory)
    return ret == 0


def submit_job(args, i):
    """
    Submits a job to the desired batch system.
    Currently we can run on KEKCC (long queue), KEKCC (dedicated FEI queue),
    EKP @ KIT, or your local machine.
    """
    # Synchronize the Summary.pickle of the collection with the job directory
    if os.path.isfile(args.directory + '/collection/Summary.pickle'):
        shutil.copyfile(args.directory + '/collection/Summary.pickle',
                        args.directory + f'/jobs/{i}/Summary.pickle')
    os.chdir(args.directory + f'/jobs/{i}/')
    if args.site == 'kekcc':
        ret = subprocess.call("bsub -q l -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)
    elif args.site == 'kekcc2':
        ret = subprocess.call("bsub -q b2_fei -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)
    elif args.site == 'kitekp':
        ret = subprocess.call("qsub -cwd -q express,short,medium,long -e error.log -o output.log -V basf2_script.sh | cut -f 3 -d ' ' > basf2_jobid", shell=True)
    elif args.site == 'local':
        subprocess.Popen(['bash', './basf2_script.sh'])
        ret = 0
    else:
        raise RuntimeError(f'Given site {args.site} is not supported')
    os.chdir(args.directory)
    return ret == 0


def do_trainings(args):
    """
    Trains the multivariate classifiers for all available training data in
    the collection directory which has not been used for a training yet.
    This is called once per iteration.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    weightfiles = fei.do_trainings(particles, configuration)
    for i in range(args.nJobs):
        for weightfile_on_disk, _ in weightfiles:
            os.symlink(args.directory + '/collection/' + weightfile_on_disk,
                       args.directory + f'/jobs/{i}/' + weightfile_on_disk)
    # Make sure every job directory has a symlink to every weight file in the collection
    xmlfiles = glob.glob("*.xml")
    for i in range(args.nJobs):
        for xmlfile in xmlfiles:
            if not os.path.isfile(args.directory + f'/jobs/{i}/' + xmlfile):
                print("Added missing symlink to ", xmlfile, " in job directory ", i)
                os.symlink(args.directory + '/collection/' + xmlfile,
                           args.directory + f'/jobs/{i}/' + xmlfile)
    os.chdir(args.directory)


def jobs_finished(args):
    """
    Check if all jobs have finished.
    Throws a runtime error if it detects an error in one of the jobs.
    """
    finished = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    failed = glob.glob(args.directory + '/jobs/*/basf2_job_error')

    if len(failed) > 0:
        raise RuntimeError(f'basf2 execution failed! Error occurred in: {str(failed)}')

    return len(finished) == args.nJobs


def merge_root_files(args):
    """
    Merges all produced ROOT files of all jobs together
    and puts the merged ROOT files into the collection directory.
    This includes:
      - the training data for the multivariate classifiers
      - the monitoring files
    """
    rootfiles = []
    for f in glob.glob(args.directory + '/jobs/0/*.root'):
        f = os.path.basename(f)
        if f in ['basf2_input.root', 'basf2_output.root']:
            continue
        if f.startswith('input_'):
            continue
        # Skip files which are already present in the collection directory
        if os.path.isfile(args.directory + '/collection/' + f):
            continue
        rootfiles.append(f)
    if len(rootfiles) == 0:
        print('There are no root files to merge')
    else:
        print('Merge the following files', rootfiles)
        for f in rootfiles:
            output = args.directory + '/collection/' + f
            inputs = [args.directory + f'/jobs/{i}/' + f for i in range(args.nJobs)]
            ret = subprocess.call(['fei_merge_files', output] + inputs)
            if ret != 0:
                raise RuntimeError('Error during merging root files')
            # The merged mcParticlesCount.root has to be visible to every job,
            # so replace the per-job files with symlinks to the merged file
            if f == 'mcParticlesCount.root':
                for i in inputs:
                    os.remove(i)
                    os.symlink(output, i)


def update_input_files(args):
    """
    Updates the input files.
    For the first iteration the input files are the MC provided by the user.
    After each training this function replaces the input with the output of the previous iteration.
    Effectively this caches the whole DataStore of basf2 between the iterations.
    """
    for i in range(args.nJobs):
        output_file = args.directory + '/jobs/' + str(i) + '/basf2_output.root'
        input_file = args.directory + '/jobs/' + str(i) + '/basf2_input.root'
        if args.large_dir:
            # Keep the large input files in the dedicated directory and symlink them
            real_input_file = args.large_dir + '/basf2_input_' + str(i) + '.root'
            shutil.move(output_file, real_input_file)
            if os.path.isfile(input_file):
                os.remove(input_file)
            os.symlink(real_input_file, input_file)
        else:
            shutil.move(output_file, input_file)
    # Store the Summary.pickle of the first job in the collection directory,
    # so the overall training state can be tracked (see is_still_training)
    shutil.copyfile(args.directory + '/jobs/0/Summary.pickle',
                    args.directory + '/collection/Summary.pickle')


def clean_job_directory(args):
    """
    Cleans the job directory for the next iteration,
    meaning we remove all logs.
    """
    files = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    files += glob.glob(args.directory + '/jobs/*/error.log')
    files += glob.glob(args.directory + '/jobs/*/output.log')
    for f in files:
        os.remove(f)


def is_still_training(args):
    """
    Checks if the FEI training is still ongoing.
    The training is finished once the FEI has reached stage 7.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return True
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    os.chdir(args.directory)
    return configuration.cache != 7


if __name__ == '__main__':
    args = getCommandLineOptions()

    os.chdir(args.directory)

    # If the user wants to resume an existing training,
    # check at which step to resume and continue from there.
    if args.skip:
        print('Skipping setup')
        start = 0
        if args.skip == 'clean':
            start = 1
        elif args.skip == 'update':
            start = 2
        elif args.skip == 'merge':
            start = 3
        elif args.skip == 'wait':
            start = 4
        elif args.skip == 'submit':
            start = 5
        elif args.skip == 'resubmit':
            start = 6
        elif args.skip == 'report':
            start = 7
        elif args.skip == 'run':
            start = 0
        else:
            raise RuntimeError(f'Unknown skip parameter {args.skip}')

        if start == 7:
            create_report(args)
            sys.exit(0)

        # Submit the jobs; in resubmit mode only failed jobs are submitted again
        if start >= 5:
            print('Submitting jobs')
            for i in range(args.nJobs):
                if start >= 6:
                    error_file = args.directory + f'/jobs/{i}/basf2_job_error'
                    success_file = args.directory + f'/jobs/{i}/basf2_finished_successfully'
                    if os.path.isfile(error_file) or not os.path.isfile(success_file):
                        print(f"Delete {error_file} and resubmit job")
                        if os.path.isfile(error_file):
                            os.remove(error_file)
                        if os.path.isfile(success_file):
                            os.remove(success_file)
                    else:
                        continue
                # Synchronize the Summary.pickle of the job with the collection directory
                shutil.copyfile(os.path.join(args.directory, 'collection/Summary.pickle'),
                                os.path.join(args.directory, f'jobs/{i}/Summary.pickle'))
                if not submit_job(args, i):
                    raise RuntimeError('Error during submitting job')

        if start >= 4:
            print('Wait for jobs to end')
            while not jobs_finished(args):
                time.sleep(40)

        if start >= 3:
            print('Merge ROOT files')
            merge_root_files(args)

        if start >= 2:
            print('Update input files')
            update_input_files(args)

        if start >= 1:
            print('Clean job directory')
            clean_job_directory(args)
    else:
        # This is a new training, set up the working directory from scratch
        setup(args)

    # Main loop which steers the FEI training: train the available classifiers,
    # submit jobs, wait for them, merge their output, update the inputs and
    # clean the job directories, until the FEI has reached its final stage.
    while is_still_training(args):
        print('Do available trainings')
        do_trainings(args)

        print('Submitting jobs')
        for i in range(args.nJobs):
            if not submit_job(args, i):
                raise RuntimeError('Error during submitting jobs')

        print('Wait for jobs to end')
        while not jobs_finished(args):
            time.sleep(40)

        print('Merge ROOT files')
        merge_root_files(args)

        print('Update input files')
        update_input_files(args)

        print('Clean job directory')
        clean_job_directory(args)