This script can be used to train the FEI on a cluster like the one available at KEKCC.
All you need is a basf2 steering file (see analysis/examples/FEI/ ) and some MC, O(100) million events.
The script automatically creates two directories:
  - collection: contains the weight files, monitoring files and other outputs
  - jobs: contains temporary files produced during the training (can be deleted afterwards)
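
For orientation, the layout typically looks roughly like this after the first iteration
(names taken from the code below; the exact set of monitoring files depends on your configuration):
    <workingDirectory>/
        collection/   basf2_steering_file.py, localdb/, Summary.pickle, Summary.json,
                      training_input.root, *.xml weight files, monitoring files
        jobs/<i>/     basf2_script.sh, input_*.root or basf2_input.root, basf2_output.root,
                      Summary.pickle, localdb -> ../collection/localdb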
The distributed script automatically spawns jobs on the cluster (or on the local machine)
and runs the steering file on the provided MC.
Since an FEI training requires multiple passes over the same MC, it does so multiple times.
The output of one run is used as the input of the next run (so your steering file has to use RootInput and RootOutput).
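
For illustration, a rough sketch of such a steering file (see analysis/examples/FEI/ for the
complete, maintained examples; the prefix and the exact FEI configuration below are only placeholders):

    import basf2
    import fei

    path = basf2.create_path()
    path.add_module('RootInput')   # input files are passed via -i by the job script

    particles = fei.get_default_channels()
    configuration = fei.config.FeiConfiguration(prefix='FEI_example', training=True, monitor=True)
    feistate = fei.get_path(particles, configuration)
    path.add_path(feistate.path)

    path.add_module('RootOutput')  # the output of this run is the input of the next run
    basf2.process(path)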
In between the runs it calls the do_trainings function of the FEI to train the multivariate classifiers.
At the end it produces summary outputs using printReporting.py and latexReporting.py
(this only works if you run in monitoring mode).
In addition, a summary file for each mva training is produced using basf2_mva_evaluate.
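
If the automatic report creation fails, you can run the same commands by hand inside the
collection directory, e.g. (the weight file name below is only an example):
    basf2 fei/printReporting.py > summary.txt
    basf2 fei/latexReporting.py summary.tex
    basf2_mva_evaluate.py -id 'B+:generic.xml' -data 'training_input.root' --treename 'B+:generic variables' -o 'B+:generic.zip'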
If your training fails for some reason (e.g. a job fails on the cluster),
the FEI will stop; you can fix the problem and resume the training using the -x option.
This requires some expert knowledge, because you have to know how to fix the problem that occurred
and at which step the training has to be resumed.
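
Valid values for -x are run, clean, update, merge, wait, submit, resubmit and report
(see the main block at the bottom of this script). For example, to resubmit only the failed
jobs of the current stage and then continue as usual:
    python3 ~/release/analysis/scripts/fei/distributed.py -x resubmit <same -f/-w/-d arguments as before>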
After the training the weight files are stored in the localdb inside the collection directory.
You have to upload this local database to the Belle II Conditions Database if you want to use the FEI everywhere.
Alternatively you can just copy the localdb somewhere and use it directly.
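
One way to use the localdb directly in an analysis steering file (assuming a recent basf2
release; older releases used basf2.use_local_database instead):
    import basf2
    basf2.conditions.prepend_testing_payloads('/path/to/collection/localdb/database.txt')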
Usage example:

python3 ~/release/analysis/scripts/fei/distributed.py
    -f ~/release/analysis/examples/FEI/B_generic.py
    -w /home/belle2/tkeck/group/B2TauNuWorkspace_2/new_fei
    -d $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000786/s00/e0000/4S/r00000/mixed/sub01/*.root)
       $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000788/s00/e0000/4S/r00000/charged/sub01/*.root)
import argparse
import glob
import json
import os
import pickle
import shutil
import stat
import subprocess
import sys
import time

import fei
import b2biiConversion
def getCommandLineOptions():
    """ Parses the command line options of the fei and returns the corresponding arguments. """
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--steeringFile', dest='steering', type=str, required=True,
                        help='Steering file. Calls fei.get_path()')
    parser.add_argument('-w', '--workingDirectory', dest='directory', type=str, required=True,
                        help='Working directory for basf2 jobs. On KEKCC, this must NOT be on HSM!')
    parser.add_argument('-l', '--largeDirectory', dest='large_dir', type=str, default='',
                        help='Directory to store large files')
    parser.add_argument('-n', '--nJobs', dest='nJobs', type=int, default=100,
                        help='Number of jobs')
    parser.add_argument('-d', '--data', dest='data', type=str, required=True, action='append', nargs='+',
                        help='Data files in bash expansion syntax or as process_url')
    parser.add_argument('-x', '--skip-to', dest='skip', type=str, default='',
                        help='Skip setup of directories')
    parser.add_argument('-o', '--once', dest='once', action='store_true',
                        help='Execute just once, instead of waiting until a Summary is produced.')
    parser.add_argument('-s', '--site', dest='site', type=str, default='kekcc',
                        help='Site to use [kekcc|kitekp|local]')
    args = parser.parse_args()
    return args
def get_job_script(args, i):
    """
    Create a bash file which will be dispatched to the batch system.
    The file will run basf2 on the provided MC or the previous output
    using the provided steering file.
    """
    job_script = f"""
        if [ -f "{args.directory}/jobs/{i}/basf2_input.root" ]; then
          INPUT="{args.directory}/jobs/{i}/basf2_input.root"
        else
          INPUT="{args.directory}/jobs/{i}/input_*.root"
        fi
        time basf2 -l error {args.directory}/collection/basf2_steering_file.py -i "$INPUT" \
          -o {args.directory}/jobs/{i}/basf2_output.root &> my_output_hack.log || touch basf2_job_error
        touch basf2_finished_successfully
    """
    return job_script
def setup(args):
    """
    Setup all directories, create job_scripts, split up MC into chunks
    which are processed by each job. Create symlinks for databases.
    """
    os.chdir(args.directory)
    # Search the data files and partition them into chunks of similar size
    data_files = []
    for x in args.data:
        for y in x:
            if (y.startswith("http://") or y.startswith("https://")):
                data_files += b2biiConversion.parse_process_url(y)
            else:
                data_files += glob.glob(y)
    print(f'Found {len(data_files)} MC files')
    file_sizes = []
    for file in data_files:
        file_sizes.append(os.stat(file).st_size)
    data_files_sorted = [x for _, x in sorted(zip(file_sizes, data_files))]
    n = int(len(data_files) / args.nJobs)
    if n < 1:
        raise RuntimeError(f'Too few MC files {len(data_files)} for the given number of jobs {args.nJobs}')
    data_chunks = [data_files_sorted[i::args.nJobs] for i in range(args.nJobs)]

    # Create a fresh environment (this removes any previous training in the working directory)
    print(f'Create environment in {args.directory}')
    shutil.rmtree('collection', ignore_errors=True)
    shutil.rmtree('jobs', ignore_errors=True)
    os.mkdir('collection')
    os.mkdir('collection/localdb')
    os.mkdir('jobs')
    if args.large_dir != '':
        if not os.path.isdir(args.large_dir):
            raise RuntimeError('Large dir does not exist. Please make sure it does.')

    shutil.copyfile(args.steering, 'collection/basf2_steering_file.py')

    for i in range(args.nJobs):
        # Create the job directory with its batch script
        os.mkdir(f'jobs/{i}')
        with open(f'jobs/{i}/basf2_script.sh', 'w') as f:
            f.write(get_job_script(args, i))
            os.chmod(f.fileno(), stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
        # Symlink the assigned input data files
        for j, data_input in enumerate(data_chunks[i]):
            os.symlink(data_input, f'jobs/{i}/input_{j}.root')
        # Symlink the local database of basf2
        os.symlink(args.directory + '/collection/localdb', f'jobs/{i}/localdb')
def create_report(args):
    """
    Dumps Summary.pickle to JSON for easy inspection and creates all the reports
    for the FEI training and the individual mva trainings.
    This will only work if
      1) Monitoring mode is used (see FeiConfiguration)
      2) The system has enough memory to hold the training data for the mva classifiers
    If this fails you can also copy the collection directory somewhere and
    execute the commands by hand.
    """
    os.chdir(args.directory + '/collection')
    with open('Summary.pickle', 'rb') as file:
        summary = pickle.load(file)

    summary_dict = {particle.identifier:
                    {'mvaConfig': particle.mvaConfig._asdict(),
                     'channels': [{field: (value._asdict() if field in ['mvaConfig', 'preCutConfig']
                                           else value) for field, value in channel._asdict().items()}
                                  for channel in particle.channels],
                     'preCutConfig': particle.preCutConfig._asdict(),
                     'postCutConfig': particle.postCutConfig._asdict()}
                    for particle in summary[0]}
    summary_dict.update({'feiConfig': summary[1]._asdict()})

    with open('Summary.json', 'w') as summary_json_file:
        json.dump(summary_dict, summary_json_file, indent=4)

    ret = subprocess.call('basf2 fei/printReporting.py > ../summary.txt', shell=True)
    ret = subprocess.call('basf2 fei/latexReporting.py ../summary.tex', shell=True)
    for i in glob.glob("*.xml"):
        if not fei.core.Teacher.check_if_weightfile_is_fake(i):
            subprocess.call(f"basf2_mva_evaluate.py -id '{i[:-4]}.xml' -data 'training_input.root' "
                            f"--treename '{i[:-4]} variables' -o '../{i[:-4]}.zip'", shell=True)
    os.chdir(args.directory)
    return ret == 0
def submit_job(args, i):
    """
    Submits a job to the desired batch system.
    Currently we can run on KEKCC (long queue), KEKCC (dedicated FEI queue),
    EKP @ KIT, or the local machine.
    """
    # Synchronize the Summary.pickle of the collection with the job directory
    if os.path.isfile(args.directory + '/collection/Summary.pickle'):
        shutil.copyfile(args.directory + '/collection/Summary.pickle', args.directory + f'/jobs/{i}/Summary.pickle')
    os.chdir(args.directory + f'/jobs/{i}/')
    if args.site == 'kekcc':
        ret = subprocess.call("bsub -q l -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)
    elif args.site == 'kekcc2':
        ret = subprocess.call("bsub -q b2_fei -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)
    elif args.site == 'kitekp':
        ret = subprocess.call("qsub -cwd -q express,short,medium,long -e error.log -o output.log -V basf2_script.sh | cut -f 3 -d ' ' > basf2_jobid", shell=True)
    elif args.site == 'local':
        subprocess.Popen(['bash', './basf2_script.sh'])
        ret = 0
    else:
        raise RuntimeError(f'Given site {args.site} is not supported')
    os.chdir(args.directory)
    return ret == 0
def do_trainings(args):
    """
    Trains the multivariate classifiers for all available training data in
    the collection directory which have not been trained yet.
    This is called once per iteration.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    weightfiles = fei.do_trainings(particles, configuration)
    for i in range(args.nJobs):
        for weightfile_on_disk, _ in weightfiles:
            os.symlink(args.directory + '/collection/' + weightfile_on_disk,
                       args.directory + f'/jobs/{i}/' + weightfile_on_disk)
    # Check that every weight file is symlinked into every job directory, create missing symlinks
    xmlfiles = glob.glob("*.xml")
    for i in range(args.nJobs):
        for xmlfile in xmlfiles:
            if not os.path.isfile(args.directory + f'/jobs/{i}/' + xmlfile):
                print("Added missing symlink to ", xmlfile, " in job directory ", i)
                os.symlink(args.directory + '/collection/' + xmlfile,
                           args.directory + f'/jobs/{i}/' + xmlfile)
    os.chdir(args.directory)
def jobs_finished(args):
    """
    Check if all jobs have finished.
    Throws a runtime error if it detects an error in one of the jobs.
    """
    finished = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    failed = glob.glob(args.directory + '/jobs/*/basf2_job_error')

    if len(failed) > 0:
        raise RuntimeError(f'basf2 execution failed! Error occurred in: {str(failed)}')

    return len(finished) == args.nJobs
def merge_root_files(args):
    """
    Merges all produced ROOT files of all jobs together
    and puts the merged ROOT files into the collection directory.
    This includes:
      - the training data for the multivariate classifiers
      - the monitoring files
    """
    rootfiles = []
    for f in glob.glob(args.directory + '/jobs/0/*.root'):
        f = os.path.basename(f)
        if f in ['basf2_input.root', 'basf2_output.root']:
            continue
        if f.startswith('input_'):
            continue
        # Files which are already in the collection are not merged again, except for training_input.root
        if os.path.isfile(args.directory + '/collection/' + f) and not f == 'training_input.root':
            continue
        rootfiles.append(f)
    if len(rootfiles) == 0:
        print('There are no root files to merge')
    else:
        print('Merge the following files', rootfiles)
        for f in rootfiles:
            output = args.directory + '/collection/' + f
            inputs = [args.directory + f'/jobs/{i}/' + f for i in range(args.nJobs)]
            ret = subprocess.call(['analysis-fei-mergefiles', output] + inputs)
            if ret != 0:
                raise RuntimeError('Error during merging root files')
            # The merged mcParticlesCount.root has to be used by all jobs,
            # so replace the per-job files with symlinks to the merged one
            if f == 'mcParticlesCount.root':
                for i in inputs:
                    os.remove(i)
                    os.symlink(output, i)
def update_input_files(args):
    """
    Updates the input files.
    For the first iteration the input files are the MC provided by the user.
    After each training this function replaces the input with the output of the previous iteration.
    Effectively this caches the whole DataStore of basf2 between the iterations.
    """
    for i in range(args.nJobs):
        output_file = args.directory + '/jobs/' + str(i) + '/basf2_output.root'
        input_file = args.directory + '/jobs/' + str(i) + '/basf2_input.root'
        if args.large_dir != '':
            # Keep the potentially large cache files in the dedicated large directory
            real_input_file = args.large_dir + '/basf2_input_' + str(i) + '.root'
            shutil.move(output_file, real_input_file)
            if os.path.isfile(input_file):
                os.remove(input_file)
            os.symlink(real_input_file, input_file)
        else:
            shutil.move(output_file, input_file)
    # Store the Summary.pickle of the first job in the collection directory,
    # it contains the status of the current stage
    shutil.copyfile(args.directory + '/jobs/0/Summary.pickle',
                    args.directory + '/collection/Summary.pickle')
def clean_job_directory(args):
    """
    Cleans the job directory for the next iteration,
    meaning we remove all logs.
    """
    files = glob.glob(args.directory + '/jobs/*/basf2_finished_successfully')
    files += glob.glob(args.directory + '/jobs/*/error.log')
    files += glob.glob(args.directory + '/jobs/*/output.log')
    for f in files:
        os.remove(f)
def is_still_training(args):
    """
    Checks if the FEI training is still ongoing.
    The training is finished once the FEI reaches stage 7.
    """
    os.chdir(args.directory + '/collection')
    if not os.path.isfile('Summary.pickle'):
        return True
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    os.chdir(args.directory)
    return configuration.cache != 7
if __name__ == '__main__':
    args = getCommandLineOptions()

    os.chdir(args.directory)

    # If the user wants to resume an existing training, check at which step
    # to resume and perform only the remaining steps of the current iteration.
    if args.skip:
        print('Skipping setup')
        start = 0
        if args.skip == 'clean':
            start = 1
        elif args.skip == 'update':
            start = 2
        elif args.skip == 'merge':
            start = 3
        elif args.skip == 'wait':
            start = 4
        elif args.skip == 'submit':
            start = 5
        elif args.skip == 'resubmit':
            start = 6
        elif args.skip == 'report':
            start = 7
        elif args.skip == 'run':
            start = 0
        else:
            raise RuntimeError(f'Unknown skip parameter {args.skip}')
        # Only create the report and stop
        if start == 7:
            create_report(args)
            sys.exit(0)

        # (Re)submit the jobs of the current iteration
        if start >= 5:
            print('Submitting jobs')
            for i in range(args.nJobs):
                # In resubmit mode only failed or unfinished jobs are submitted again
                if start >= 6:
                    error_file = args.directory + f'/jobs/{i}/basf2_job_error'
                    success_file = args.directory + f'/jobs/{i}/basf2_finished_successfully'
                    if os.path.isfile(error_file) or not os.path.isfile(success_file):
                        print(f"Delete {error_file} and resubmit job")
                        if os.path.isfile(error_file):
                            os.remove(error_file)
                        if os.path.isfile(success_file):
                            os.remove(success_file)
                    else:
                        continue
                # Start the job from the current Summary.pickle of the collection
                shutil.copyfile(os.path.join(args.directory, 'collection/Summary.pickle'),
                                os.path.join(args.directory, f'jobs/{i}/Summary.pickle'))
                if not submit_job(args, i):
                    raise RuntimeError('Error during submitting job')
        if start >= 4:
            print('Wait for jobs to end')
            while not jobs_finished(args):
                time.sleep(40)

        if start >= 3:
            print('Merge ROOT files')
            merge_root_files(args)

        if start >= 2:
            print('Update input files')
            update_input_files(args)

        if start >= 1:
            print('Clean job directory')
            clean_job_directory(args)
    else:
        # This is a new training, so set up the whole directory structure
        # (this overwrites any existing training in the working directory)
        setup(args)
    # The main loop which steers the whole FEI training on the batch system:
    #   1. Check if the FEI still requires further stages
    #   2. Run all trainings which can be performed at this point
    #   3. Submit jobs which use the new trainings to reconstruct the hierarchy further
    #   4. Wait until all jobs have finished
    #   5. Merge the output of the jobs
    #   6. Update the job inputs (the output of this stage is the input of the next stage)
    #   7. Clean the job directories so they can be reused during the next stage
    while is_still_training(args):
        print('Do available trainings')
        do_trainings(args)

        print('Submitting jobs')
        for i in range(args.nJobs):
            if not submit_job(args, i):
                raise RuntimeError('Error during submitting jobs')

        print('Wait for jobs to end')
        while not jobs_finished(args):
            time.sleep(40)

        print('Merge ROOT files')
        merge_root_files(args)

        print('Update input files')
        update_input_files(args)

        print('Clean job directory')
        clean_job_directory(args)