This script can be used to train the FEI on a cluster like the one available at KEKCC.
All you need is a basf2 steering file (see analysis/examples/FEI/) and some MC, O(100) million events.
The script will automatically create some directories:
 - collection: contains weight files, monitoring files and other outputs
 - jobs: contains temporary files used during the training (can be deleted afterwards)

The distributed script automatically spawns jobs on the cluster (or the local machine)
and runs the steering file on the provided MC.
Since an FEI training requires multiple runs over the same MC, it does so multiple times.
The output of one run is passed as input to the next run (so your steering file has to use RootInput and RootOutput).

In between, it calls the do_trainings function of the FEI to train the multivariate classifiers of the FEI.

At the end it produces summary outputs using printReporting.py and latexReporting.py
(this will only work if you use the monitoring mode).
In addition, a summary file for each MVA training is produced using basf2_mva_evaluate.

If your training fails for some reason (e.g. a job fails on the cluster),
the FEI will stop; you can fix the problem and resume the training using the -x option.
This requires some expert knowledge, because you have to know how to fix the problem that occurred
and at which step you have to resume the training.

After the training, the weight files will be stored in the localdb in the collection directory.
You have to upload this local database to the Belle II Conditions Database if you want to use the FEI everywhere.
Alternatively, you can just copy the localdb somewhere and use it directly.
python3 ~/release/analysis/scripts/fei/distributed.py
  -f ~/release/analysis/examples/FEI/B_generic.py
  -w /home/belle2/tkeck/group/B2TauNuWorkspace_2/new_fei
  -d $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000786/s00/e0000/4S/r00000/mixed/sub01/*.root
     $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000788/s00/e0000/4S/r00000/charged/sub01/*.root
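
The steering file given via -f has to set up a path that reads the current input
with RootInput and writes the full DataStore back with RootOutput, so that each
iteration can consume the output of the previous one. A minimal sketch, loosely
based on analysis/examples/FEI/B_generic.py (the channel list and the
FeiConfiguration arguments shown here are placeholders for your own training):

    import basf2 as b2
    import fei

    path = b2.create_path()
    path.add_module('RootInput')   # actual input files are passed via 'basf2 -i' by this script
    particles = fei.get_default_channels()
    configuration = fei.config.FeiConfiguration(prefix='FEI_TEST', monitor=True)
    feistate = fei.get_path(particles, configuration)
    path.add_path(feistate.path)
    path.add_module('RootOutput')  # becomes basf2_output.root, the input of the next iteration
    b2.process(path)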
import b2biiConversion
def getCommandLineOptions():
    """ Parses the command line options of the FEI and returns the corresponding arguments. """
    # Prevent ROOT from hijacking the command line options of argparse
    ROOT.PyConfig.IgnoreCommandLineOptions = True
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--steeringFile', dest='steering', type=str, required=True,
                        help='Steering file. Calls fei.get_path()')
    parser.add_argument('-w', '--workingDirectory', dest='directory', type=str, required=True,
                        help='Working directory for basf2 jobs. On KEKCC, this must NOT be on HSM!')
    parser.add_argument('-l', '--largeDirectory', dest='large_dir', type=str, default='',
                        help='Directory to store large files')
    parser.add_argument('-n', '--nJobs', dest='nJobs', type=int, default=100,
                        help='Number of jobs')
    parser.add_argument('-d', '--data', dest='data', type=str, required=True, action='append', nargs='+',
                        help='Data files in bash expansion syntax or as process_url')
    parser.add_argument('-x', '--skip-to', dest='skip', type=str, default='',
                        help='Skip setup of directories and resume at the given step')
    parser.add_argument('-o', '--once', dest='once', action='store_true',
                        help='Execute just once, instead of waiting until a Summary is produced.')
    parser.add_argument('-s', '--site', dest='site', type=str, default='kekcc',
                        help='Site to use [kekcc|kitekp|local]')
    args = parser.parse_args()
    return args
def get_job_script(args, i):
    """
    Create a bash file which will be dispatched to the batch system.
    The file will run basf2 on the provided MC or the previous output
    using the provided steering file.
    """
    job_script = f"""
        if [ -f "{args.directory}/jobs/{i}/basf2_input.root" ]; then
          INPUT="{args.directory}/jobs/{i}/basf2_input.root"
        else
          INPUT="{args.directory}/jobs/{i}/input_*.root"
        fi
        time basf2 -l error {args.directory}/collection/basf2_steering_file.py -i "$INPUT" \\
          -o {args.directory}/jobs/{i}/basf2_output.root &> my_output_hack.log || touch basf2_job_error
        touch basf2_finished_successfully
    """
    return job_script
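
# For illustration only: with '-w /path/to/workdir' (a placeholder path), the script
# written to jobs/0/basf2_script.sh by get_job_script() would look roughly like this:
#
#   if [ -f "/path/to/workdir/jobs/0/basf2_input.root" ]; then
#     INPUT="/path/to/workdir/jobs/0/basf2_input.root"
#   else
#     INPUT="/path/to/workdir/jobs/0/input_*.root"
#   fi
#   time basf2 -l error /path/to/workdir/collection/basf2_steering_file.py -i "$INPUT" \
#     -o /path/to/workdir/jobs/0/basf2_output.root &> my_output_hack.log || touch basf2_job_error
#   touch basf2_finished_successfully
#
# The flag files basf2_job_error and basf2_finished_successfully are what jobs_finished()
# polls later to decide whether an iteration is done or has failed.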
def setup(args):
    """
    Setup all directories, create job_scripts, split up MC into chunks
    which are processed by each job. Create symlinks for databases.
    """
    os.chdir(args.directory)
    # Collect the input files given via -d: either process URLs resolved by
    # b2biiConversion or glob patterns on the local filesystem.
    data_files = []
    for x in args.data:
        for y in x:
            if (y.startswith("http://") or y.startswith("https://")):
                data_files += b2biiConversion.parse_process_url(y)
            else:
                data_files += glob.glob(y)
    print(f'Found {len(data_files)} MC files')

    # Sort the files by size so that the chunks handed to the jobs are balanced.
    file_sizes = []
    for file in data_files:
        file_sizes.append(os.stat(file).st_size)
    data_files_sorted = [x for _, x in sorted(zip(file_sizes, data_files))]
    n = int(len(data_files) / args.nJobs)
    if n < 1:
        raise RuntimeError(f'Too few MC files {len(data_files)} for the given number of jobs {args.nJobs}')
    data_chunks = [data_files_sorted[i::args.nJobs] for i in range(args.nJobs)]

    # Create a clean environment for this training.
    print(f'Create environment in {args.directory}')
    shutil.rmtree('collection', ignore_errors=True)
    shutil.rmtree('jobs', ignore_errors=True)
    os.mkdir('collection')
    os.mkdir('collection/localdb')
    os.mkdir('jobs')

    if args.large_dir:
        if not os.path.isdir(args.large_dir):
            raise RuntimeError('Large dir does not exist. Please make sure it does.')

    shutil.copyfile(args.steering, 'collection/basf2_steering_file.py')

    for i in range(args.nJobs):
        os.mkdir(f'jobs/{i}')
        with open(f'jobs/{i}/basf2_script.sh', 'w') as f:
            f.write(get_job_script(args, i))
            os.chmod(f.fileno(), stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
        # Symlink the input data files assigned to this job
        for j, data_input in enumerate(data_chunks[i]):
            os.symlink(data_input, f'jobs/{i}/input_{j}.root')
        # Symlink the localdb of the collection directory into each job directory
        os.symlink(args.directory + '/collection/localdb', f'jobs/{i}/localdb')
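
# Illustration of the chunking in setup() (hypothetical numbers): with 7 input files
# sorted by ascending size [f0 .. f6] and nJobs = 3, the stride data_files_sorted[i::nJobs]
# yields
#   job 0: [f0, f3, f6]
#   job 1: [f1, f4]
#   job 2: [f2, f5]
# so every job receives a mix of small and large files and the total workload stays balanced.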
def create_report(args):
    """
    Dumps Summary.pickle to JSON for easy inspection.
    Create all the reports for the FEI training and the individual mva trainings.
    This will only work if
      1) Monitoring mode is used (see FeiConfiguration)
      2) The system has enough memory to hold the training data for the mva classifiers
    If this fails you can also copy the collection directory somewhere and
    execute the commands by hand.
    """
    os.chdir(args.directory + '/collection')
    with open('Summary.pickle', 'rb') as file:
        summary = pickle.load(file)

    summary_dict = {particle.identifier:
                    {'mvaConfig': particle.mvaConfig._asdict(),
                     'channels': [{field: (value._asdict() if field in ['mvaConfig', 'preCutConfig'] else value) for
                                   field, value in channel._asdict().items()} for channel in particle.channels],
                     'preCutConfig': particle.preCutConfig._asdict(),
                     'postCutConfig': particle.postCutConfig._asdict()}
                    for particle in summary[0]}
    summary_dict.update({'feiConfig': summary[1]._asdict()})

    with open('Summary.json', 'w') as summary_json_file:
        json.dump(summary_dict, summary_json_file, indent=4)

    ret = subprocess.call('basf2 fei/printReporting.py > ../summary.txt', shell=True)
    ret = subprocess.call('basf2 fei/latexReporting.py ../summary.tex', shell=True)
    for i in glob.glob("*.xml"):
        if not fei.core.Teacher.check_if_weightfile_is_fake(i):
            subprocess.call(f"basf2_mva_evaluate.py -id '{i[:-4]}.xml' -data 'training_input.root' "
                            f"--treename '{i[:-4]} variables' -o '../{i[:-4]}.zip'", shell=True)
    os.chdir(args.directory)
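
# If create_report() fails (e.g. it runs out of memory), the same reports can be produced
# by hand from (a copy of) the collection directory with the commands used above, e.g.
# (MVA_NAME is a placeholder for one of the non-fake *.xml weight files):
#
#   basf2 fei/printReporting.py > summary.txt
#   basf2 fei/latexReporting.py summary.tex
#   basf2_mva_evaluate.py -id 'MVA_NAME.xml' -data 'training_input.root' \
#       --treename 'MVA_NAME variables' -o 'MVA_NAME.zip'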
def submit_job(args, i):
    """
    Submits a job to the desired batch system.
    Currently we can run on KEKCC (long queue), KEKCC (dedicated FEI queue),
    EKP @ KIT, or your local machine.
    """
    # Synchronize the Summary.pickle of the collection directory into the job directory
    if os.path.isfile(args.directory + '/collection/Summary.pickle'):
        shutil.copyfile(args.directory + '/collection/Summary.pickle', args.directory + f'/jobs/{i}/Summary.pickle')
    os.chdir(args.directory + f'/jobs/{i}/')
    if args.site == 'kekcc':
        ret = subprocess.call("bsub -q l -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)
    elif args.site == 'kekcc2':
        ret = subprocess.call("bsub -q b2_fei -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)
    elif args.site == 'kitekp':
        ret = subprocess.call("qsub -cwd -q express,short,medium,long -e error.log -o output.log -V basf2_script.sh | cut -f 3 -d ' ' > basf2_jobid", shell=True)
    elif args.site == 'local':
        subprocess.Popen(['bash', './basf2_script.sh'])
        ret = 0
    else:
        raise RuntimeError(f'Given site {args.site} is not supported')
    os.chdir(args.directory)
    return ret == 0
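
# Supporting another batch system amounts to one more elif branch in submit_job().
# A hypothetical sketch for a Slurm site (the site name 'slurm' and the queue handling
# are assumptions, not part of this script):
#
#   elif args.site == 'slurm':
#       ret = subprocess.call("sbatch -e error.log -o output.log ./basf2_script.sh "
#                             "| awk '{print $4}' > basf2_jobid", shell=True)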
def do_trainings(args):
    """
    Trains the multivariate classifiers for all available training data in
    the collection directory which have not been trained yet.
    This is called once per iteration.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    weightfiles = fei.do_trainings(particles, configuration)
    for i in range(args.nJobs):
        for weightfile_on_disk, _ in weightfiles:
            os.symlink(args.directory + '/collection/' + weightfile_on_disk,
                       args.directory + f'/jobs/{i}/' + weightfile_on_disk)
    # Make sure every job directory sees all produced weight files
    xmlfiles = glob.glob("*.xml")
    for i in range(args.nJobs):
        for xmlfile in xmlfiles:
            if not os.path.isfile(args.directory + f'/jobs/{i}/' + xmlfile):
                print("Added missing symlink to ", xmlfile, " in job directory ", i)
                os.symlink(args.directory + '/collection/' + xmlfile,
                           args.directory + f'/jobs/{i}/' + xmlfile)
    os.chdir(args.directory)
def jobs_finished(args):
    """
    Check if all jobs already finished.
    Throws a runtime error if it detects an error in one of the jobs.
    """
    finished = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    failed = glob.glob(args.directory + '/jobs/*/basf2_job_error')

    if len(failed) > 0:
        raise RuntimeError(f'basf2 execution failed! Error occurred in: {str(failed)}')

    return len(finished) == args.nJobs
def merge_root_files(args):
    """
    Merges all produced ROOT files of all jobs together
    and puts the merged ROOT files into the collection directory.
    This is done for:
      - the training data for the multivariate classifiers
      - the monitoring files
    """
    rootfiles = []
    for f in glob.glob(args.directory + '/jobs/0/*.root'):
        f = os.path.basename(f)
        if f in ['basf2_input.root', 'basf2_output.root']:
            continue
        if f.startswith('input_'):
            continue
        # Files already present in the collection are skipped, except training_input.root,
        # which is merged again in every iteration.
        if os.path.isfile(args.directory + '/collection/' + f) and not f == 'training_input.root':
            continue
        rootfiles.append(f)
    if len(rootfiles) == 0:
        print('There are no root files to merge')
    else:
        print('Merge the following files', rootfiles)
        for f in rootfiles:
            output = args.directory + '/collection/' + f
            inputs = [args.directory + f'/jobs/{i}/' + f for i in range(args.nJobs)]
            ret = subprocess.call(['analysis-fei-mergefiles', output] + inputs)
            if ret != 0:
                raise RuntimeError('Error during merging root files')
            # Replace the per-job mcParticlesCount.root with a symlink to the merged file,
            # so that all jobs use the same MC particle counts.
            if f == 'mcParticlesCount.root':
                for i in inputs:
                    os.remove(i)
                    os.symlink(output, i)
def update_input_files(args):
    """
    Updates the input files.
    For the first iteration the input files are the MC provided by the user.
    After each training this function replaces the input with the output of the previous iteration.
    Effectively this caches the whole DataStore of basf2 between the iterations.
    """
    for i in range(args.nJobs):
        output_file = args.directory + '/jobs/' + str(i) + '/basf2_output.root'
        input_file = args.directory + '/jobs/' + str(i) + '/basf2_input.root'
        if args.large_dir:
            # Keep the large cached DataStore files in the dedicated large directory
            real_input_file = args.large_dir + '/basf2_input_' + str(i) + '.root'
            shutil.move(output_file, real_input_file)
            if os.path.isfile(input_file):
                os.remove(input_file)
            os.symlink(real_input_file, input_file)
        else:
            shutil.move(output_file, input_file)
    # Store the Summary.pickle of the first job in the collection directory,
    # it contains the latest state of the training.
    shutil.copyfile(args.directory + '/jobs/0/Summary.pickle', args.directory + '/collection/Summary.pickle')
def clean_job_directory(args):
    """
    Cleans the job directory for the next iteration,
    meaning we remove all logs.
    """
    files = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    files += glob.glob(args.directory + '/jobs/*/error.log')
    files += glob.glob(args.directory + '/jobs/*/output.log')
    for f in files:
        os.remove(f)
def is_still_training(args):
    """
    Checks if the FEI training is still ongoing.
    The training is finished if the FEI reached stage 7.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return True
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    os.chdir(args.directory)
    return configuration.cache != 7
if __name__ == '__main__':
    args = getCommandLineOptions()

    os.chdir(args.directory)

    # If the user resumes an existing training (-x), determine at which
    # step to pick up the work again.
    if args.skip:
        print('Skipping setup')
        start = 0
        if args.skip == 'clean':
            start = 1
        elif args.skip == 'update':
            start = 2
        elif args.skip == 'merge':
            start = 3
        elif args.skip == 'wait':
            start = 4
        elif args.skip == 'submit':
            start = 5
        elif args.skip == 'resubmit':
            start = 6
        elif args.skip == 'report':
            start = 7
        elif args.skip == 'run':
            start = 0
        else:
            raise RuntimeError(f'Unknown skip parameter {args.skip}')
        # Submit (or resubmit) the jobs
        if start >= 5:
            print('Submitting jobs')
            for i in range(args.nJobs):
                # In resubmit mode only failed or unfinished jobs are submitted again,
                # finished jobs are left untouched.
                if start >= 6:
                    error_file = args.directory + f'/jobs/{i}/basf2_job_error'
                    success_file = args.directory + f'/jobs/{i}/basf2_finished_successfully'
                    if os.path.isfile(error_file) or not os.path.isfile(success_file):
                        print(f"Delete {error_file} and resubmit job")
                        if os.path.isfile(error_file):
                            os.remove(error_file)
                        if os.path.isfile(success_file):
                            os.remove(success_file)
                    else:
                        continue
                # Provide the job with the current state of the training
                shutil.copyfile(os.path.join(args.directory, 'collection/Summary.pickle'),
                                os.path.join(args.directory, f'jobs/{i}/Summary.pickle'))
                if not submit_job(args, i):
                    raise RuntimeError('Error during submitting job')
        if start >= 4:
            print('Wait for jobs to end')
            while not jobs_finished(args):
                time.sleep(40)

        if start >= 3:
            print('Merge ROOT files')
            merge_root_files(args)

        if start >= 2:
            print('Update input files')
            update_input_files(args)

        if start >= 1:
            print('Clean job directory')
            clean_job_directory(args)
    else:
        # This is a new training, so set up the whole working directory
        # (this will override any previous training in it).
        setup(args)

    # Main loop: train the available classifiers, run the jobs, merge their output
    # and update the inputs, until the FEI reached its final stage.
    while is_still_training(args):
        print('Do available trainings')
        do_trainings(args)

        print('Submitting jobs')
        for i in range(args.nJobs):
            if not submit_job(args, i):
                raise RuntimeError('Error during submitting jobs')

        print('Wait for jobs to end')
        while not jobs_finished(args):
            time.sleep(40)

        print('Merge ROOT files')
        merge_root_files(args)

        print('Update input files')
        update_input_files(args)

        print('Clean job directory')
        clean_job_directory(args)