This script can be used to train the FEI on a cluster such as the one available at KEKCC.
All you need is a basf2 steering file (see analysis/examples/FEI/ ) and some MC, O(100) million events.
The script will automatically create some directories:
  - collection: contains weight files, monitoring files and other outputs
  - jobs: contains temporary files produced during the training (can be deleted afterwards)

The distributed script automatically spawns jobs on the cluster (or local machine)
and runs the steering file on the provided MC.
Since an FEI training requires multiple runs over the same MC, it does so multiple times.
The output of one run is passed as input to the next run (so your steering file has to use RootInput and RootOutput).

In between, it calls the do_trainings function of the FEI to train the multivariate classifiers.
At the end it produces summary outputs using printReporting.py and latexReporting.py
(this will only work if you use the monitoring mode).
In addition, a summary file for each MVA training is produced using basf2_mva_evaluate.

If your training fails for some reason (e.g. a job fails on the cluster),
the FEI will stop; you can fix the problem and resume the training using the -x option.
This requires some expert knowledge, because you have to know how to fix the problem
that occurred and at which step you have to resume the training.
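
For example, to resubmit crashed jobs and then continue the training, one would
use something like the following (a sketch; combine -x with the same -f/-w/-d
arguments as in the full example below):

python3 ~/release/analysis/scripts/fei/distributed.py -x resubmit ...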

After the training the weight files will be stored in the localdb in the collection directory.
You have to upload this local database to the Belle II Conditions Database if you want to use the FEI everywhere.
Alternatively, you can just copy the localdb to somewhere and use it directly.

python3 ~/release/analysis/scripts/fei/distributed.py
    -f ~/release/analysis/examples/FEI/B_generic.py
    -w /home/belle2/tkeck/group/B2TauNuWorkspace_2/new_fei
    -d $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000786/s00/e0000/4S/r00000/mixed/sub01/*.root)
       $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000788/s00/e0000/4S/r00000/charged/sub01/*.root)
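
A minimal steering file might look roughly like this (a sketch only; the prefix,
channel selection and monitoring flag are placeholders, see
analysis/examples/FEI/B_generic.py for a maintained example):

    import basf2 as b2
    import fei

    path = b2.create_path()
    path.add_module('RootInput')   # distributed.py passes the input files via -i
    particles = fei.get_default_channels()
    configuration = fei.config.FeiConfiguration(prefix='FEI_TEST', monitor=True)
    feistate = fei.get_path(particles, configuration)
    path.add_path(feistate.path)
    path.add_module('RootOutput')  # distributed.py passes the output file via -o
    b2.process(path)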

import argparse
import glob
import json
import os
import pickle
import shutil
import stat
import subprocess
import sys
import time

import ROOT

import basf2_mva
import fei
import fei.core


def getCommandLineOptions():
    """ Parses the command line options of the fei and returns the corresponding arguments. """
    # Prevent PyROOT from stealing the command line arguments from argparse
    ROOT.PyConfig.IgnoreCommandLineOptions = True
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--steeringFile', dest='steering', type=str, required=True,
                        help='Steering file. Calls fei.get_path()')
    parser.add_argument('-w', '--workingDirectory', dest='directory', type=str, required=True,
                        help='Working directory for basf2 jobs. On KEKCC, this must NOT be on HSM!')
    parser.add_argument('-l', '--largeDirectory', dest='large_dir', type=str, default='',
                        help='Directory to store large files')
    parser.add_argument('-n', '--nJobs', dest='nJobs', type=int, default=100,
                        help='Number of jobs')
    parser.add_argument('-d', '--data', dest='data', type=str, required=True, action='append', nargs='+',
                        help='Data files in bash expansion syntax or as process_url')
    parser.add_argument('-x', '--skip-to', dest='skip', type=str, default='',
                        help='Skip setup of directories')
    parser.add_argument('-o', '--once', dest='once', action='store_true',
                        help='Execute just once, instead of waiting until a Summary is produced.')
    parser.add_argument('-s', '--site', dest='site', type=str, default='kekcc',
                        help='Site to use [kekcc|kitekp|local]')
    parser.add_argument('-e', '--end', dest='end', type=int, default=6,
                        help='Stage at which to end the training')
    parser.add_argument('-r', '--retrain', dest='retrain', type=int, default=-1,
                        help='Stage at which to retrain the training')
    parser.add_argument('-v', '--validation', dest='validation', type=float, default=0.2,
                        help='Fraction of data to use for validation')
    args = parser.parse_args()
    return args
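
# Note on --data: because it uses action='append' together with nargs='+',
# args.data is a list of lists, e.g. "-d a.root b.root -d c.root" parses to
# [['a.root', 'b.root'], ['c.root']]; setup() below therefore flattens it
# with a nested loop.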


def get_job_script():
    """
    Create a bash file which will be dispatched to the batch system.
    The file will run basf2 on the provided MC or the previous output
    using the provided steering file.
    """
    job_script = """
        if [ -f "basf2_input.root" ]; then
          INPUT="basf2_input.root"
        else
          INPUT="input_*.root"
        fi
        time basf2 -l error ../../collection/basf2_steering_file.py -i "$INPUT" -o basf2_output.root &> my_output_hack.log || touch basf2_job_error
        touch basf2_finished_successfully
    """
    return job_script
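
# The sentinel files written by the job script are what the driver polls for:
# basf2_finished_successfully marks a clean exit and basf2_job_error a failed
# basf2 call (see jobs_finished below), while my_output_hack.log keeps the
# full basf2 log of the job.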


def setup(args, fullOverwrite=True):
    """
    Setup all directories, create job_scripts, split up MC into chunks
    which are processed by each job. Create symlinks for databases.
    """
    print(f'FEI-distributed-setup: Setup environment in {args.directory} with overwrite {fullOverwrite}')
    currDir = os.getcwd()
    os.chdir(args.directory)
    # args.data is a list of lists (see the note below getCommandLineOptions)
    data_files = []
    for x in args.data:
        for y in x:
            if (y.startswith("http://") or y.startswith("https://")):
                data_files.append(y)
            else:
                data_files += glob.glob(y)
    print(f'FEI-distributed-setup: Found {len(data_files)} MC files')
    file_sizes = []
    for file in data_files:
        file_sizes.append(os.stat(file).st_size)
    # Sort the files by size and stride them over the jobs, so every job
    # receives a comparable amount of data
    data_files_sorted = [x for _, x in sorted(zip(file_sizes, data_files))]
    n = int(len(data_files) / args.nJobs)
    if n < 1:
        raise RuntimeError(f'Too few MC files {len(data_files)} for the given number of jobs {args.nJobs}')
    data_chunks = [data_files_sorted[i::args.nJobs] for i in range(args.nJobs)]
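    # Illustration of the striding above: with files [a, b, c, d, e] sorted by
    # size and nJobs=2, the chunks are [a, c, e] and [b, d], so every job gets
    # a similar mix of small and large files.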
    print(f'FEI-distributed-setup: Create environment in {args.directory}')
    if fullOverwrite:
        shutil.rmtree('collection', ignore_errors=True)
        os.mkdir('collection')
    if os.path.isfile('collection/Summary.pickle'):
        os.remove('collection/Summary.pickle')
    if not os.path.isdir('collection/localdb'):
        os.mkdir('collection/localdb')
    shutil.rmtree('jobs', ignore_errors=True)
    os.mkdir('jobs')
    if args.large_dir:
        if not os.path.isdir(args.large_dir):
            raise RuntimeError('FEI-distributed-setup: Large dir does not exist. Please make sure it does.')
    shutil.copyfile(args.steering, 'collection/basf2_steering_file.py')
    for i in range(args.nJobs):
        os.mkdir(f'jobs/{i}')
        job_script = get_job_script()
        with open(f'jobs/{i}/basf2_script.sh', 'w') as f:
            f.write(job_script)
            os.chmod(f.fileno(), stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
        # Provide the input files and the local database to each job
        for j, data_input in enumerate(data_chunks[i]):
            os.symlink(data_input, f'jobs/{i}/input_{j}.root')
        os.symlink('../../collection/localdb', f'jobs/{i}/localdb')
    os.chdir(currDir)


def create_report(args):
    """
    Dumps Summary.pickle to JSON for easy inspection.
    Create all the reports for the FEI training and the individual mva trainings.
    This will only work if
    1) Monitoring mode is used (see FeiConfiguration)
    2) The system has enough memory to hold the training data for the mva classifiers
    If this fails you can also copy the collection directory somewhere and
    execute the commands by hand.
    """
    print('FEI-distributed-report: Create report')
    currDir = os.getcwd()
    os.chdir(f'{args.directory}/collection')
    with open('Summary.pickle', 'rb') as file:
        summary = pickle.load(file)
    summary_dict = {
        particle.identifier: {
            'mvaConfig': particle.mvaConfig._asdict(),
            'channels': [{field: (value._asdict() if field in ['mvaConfig', 'preCutConfig'] else value)
                          for field, value in channel._asdict().items()} for channel in particle.channels],
            'preCutConfig': particle.preCutConfig._asdict(),
            'postCutConfig': particle.postCutConfig._asdict()}
        for particle in summary[0]}
    summary_dict.update({'feiConfig': summary[1]._asdict()})

    with open('Summary.json', 'w') as summary_json_file:
        json.dump(summary_dict, summary_json_file, indent=4)
    print('FEI-distributed-report: read Summary.pickle and wrote Summary.json')

    ret = subprocess.call('basf2 fei/printReporting.py summary.txt', shell=True)
    if ret != 0:
        raise RuntimeError('Error during printReporting.py')
    print('FEI-distributed-report: Created summary.txt')
    ret = subprocess.call('basf2 fei/latexReporting.py summary.tex', shell=True)
    if ret != 0:
        raise RuntimeError('Error during latexReporting.py')
    print('FEI-distributed-report: Created summary.tex')
    print('FEI-distributed-report: Creating *.zip files for the mva evaluation.')
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    stage_particles = fei.core.get_stages_from_particles(particles)
    for istage in range(len(stage_particles)):
        stage = stage_particles[istage]
        for particle in stage:
            for channel in particle.channels:
                if basf2_mva.available(f'{channel.label}.xml') and not fei.core.Teacher.check_if_weightfile_is_fake(
                        f'{channel.label}.xml'):
                    treeName = ROOT.Belle2.MakeROOTCompatible.makeROOTCompatible(f'{channel.label} variables')
                    try:
                        result = subprocess.run(
                            f"basf2_mva_evaluate.py -id '{channel.label}.xml' -train 'training_input.root' "
                            f"-data 'validation_input.root' "
                            f"--treename '{treeName}' "
                            f"-o '../{channel.label}.zip'",
                            shell=True, check=True, capture_output=True)
                        print(f"FEI-distributed-report: Created {channel.label}.zip, success status: {result.returncode}")
                        if result.stderr:
                            print(f"FEI-distributed-report: err-output evaluation of {channel.label}.xml")
                            print(result.stderr)
                    except subprocess.CalledProcessError as e:
                        print(f"FEI-distributed-report: Error during evaluation of {channel.label}.xml")
                        print(e.stderr)
                else:
                    print(f"FEI-distributed-report: Skipping evaluation of {channel.label}.xml as it is a fake weight file")
    os.chdir(currDir)
    print('FEI-distributed-report: DONE creating reports')


def remove_objects(file_path, objectNames):
    """
    Remove objects from a ROOT file
    """
    if os.path.islink(file_path):
        file_path = os.readlink(file_path)
    root_file = ROOT.TFile.Open(file_path, "UPDATE")
    key_names = [key.GetName() for key in root_file.GetListOfKeys()]
    print(f'All keys in {file_path}: {key_names}')
    for obj_name in key_names:
        if any([ROOT.Belle2.MakeROOTCompatible.makeROOTCompatible(objectName) == obj_name for objectName in objectNames]):
            print(f'Removed {obj_name}')
            # Delete all cycles of the object
            root_file.Delete(f'{obj_name};*')
    root_file.Write("", ROOT.TObject.kOverwrite)
    root_file.Close()


def remove_helper(args, channelLabel, configuration):
    """
    Helper function to remove all files related to a channelLabel
    """
    os.chdir(f'{args.directory}/collection')
    print(f'FEI-REtrain: Cleaning {channelLabel}')
    removeFiles = glob.glob(f'{channelLabel}*')
    removeFiles += glob.glob(f'{channelLabel.split(" ")[0]}*.png')
    removeFiles += glob.glob(f'../{channelLabel}*.zip')
    print(f'FEI-REtrain: Removing {channelLabel}* files in total {len(removeFiles)}')
    for f in removeFiles:
        os.remove(f)
    os.chdir(args.directory)
    symlinks = glob.glob(f'jobs/*/{channelLabel}.xml')
    print(f'FEI-REtrain: Removing symlinks for {channelLabel} in total {len(symlinks)}')
    for symlink in symlinks:
        os.remove(symlink)


def clean_higher_stages(args):
    """
    Cleans the higher stages of the training.
    This is needed if you want to retrain the BDTs
    """
    print('FEI-distributed-clean_higher_stages: Cleaning higher stages')
    currDir = os.getcwd()
    os.chdir(f'{args.directory}/collection')
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    stages = fei.core.get_stages_from_particles(particles)
    channels_to_remove = []
    xmlfiles = glob.glob("*.xml")
    for i in range(args.retrain-1, len(stages)):
        print(f'FEI-distributed-clean_higher_stages: Cleaning stage {i}')
        for particle in stages[i]:
            for channel in particle.channels:
                if f'{channel.label}.xml' in xmlfiles:
                    xmlfiles.remove(f'{channel.label}.xml')
                remove_helper(args, channel.label, configuration)
                channels_to_remove.append(channel.label)
    print(f"FEI-REtrain: Channels to remove: {channels_to_remove}")
    for xf in xmlfiles:
        print(f'FEI-REtrain: {xf} will not be retrained!')
    partIds = []
    for i in range(args.retrain-1, len(stages)):
        for particle in stages[i]:
            partIds.append(particle.identifier)
            if "J/psi" in particle.identifier:
                partIds.append(particle.identifier.replace("J/psi", "Jpsi"))
    print(f'FEI-REtrain: particles to remove: {partIds}')
    os.chdir(f'{args.directory}/collection')
    root_files = glob.glob('*.root')
    for f in root_files:
        if any([x in f for x in ['Monitor_PostReconstruction_AfterMVA', 'Monitor_Pre', 'Monitor_TrainingData']]):
            print(f'FEI-REtrain: Removing branches in {f}')
            remove_objects(f, objectNames=channels_to_remove)
        elif 'Monitor_PostReco' in f:
            print(f'FEI-REtrain: Removing branches in {f}')
            remove_objects(f, objectNames=partIds)
        elif 'Monitor_Final' in f:
            print(f'FEI-REtrain: Removing branches in {f}')
            remove_objects(f, objectNames=[f'{partId} variables' for partId in partIds])
        elif any([x in f for x in ['training_input', 'validation_input']]):
            print(f'FEI-REtrain: Removing branches in {f}')
            remove_objects(f, objectNames=[f'{ch} variables' for ch in channels_to_remove])
    os.chdir(args.directory)
    root_files = glob.glob('jobs/*/*.root')
    print('FEI-REtrain: Removing job root files (not mcParticlesCount.root, basf2_input.root or basf2_output.root)')
    for f in root_files:
        if 'mcParticlesCount.root' not in f and 'basf2_input.root' not in f and 'basf2_output.root' not in f:
            os.remove(f)
    os.chdir(currDir)
    print('FEI-distributed-clean_higher_stages: Done cleaning higher stages')


def clean_monitoring(args, ijob=None):
    """
    Cleans the monitoring files in the jobs directory and the training_input.root files
    """
    if ijob is None:
        files = glob.glob('jobs/*/Monitor_*.root')
        files += glob.glob('jobs/*/training_input.root')
    else:
        files = glob.glob(f'jobs/{ijob}/Monitor_*.root')
        files += glob.glob(f'jobs/{ijob}/training_input.root')
    for f in files:
        os.remove(f)


def clean_job_directory(args):
    """
    Cleans the job directory for the next iteration
    Meaning we remove all logs
    """
    files = glob.glob('jobs/*/basf2_finished_successfully')
    files += glob.glob('jobs/*/error.log')
    files += glob.glob('jobs/*/output.log')
    files += glob.glob('jobs/*/basf2_job_error')
    for f in files:
        os.remove(f)
    # Keep the basf2 log of every iteration by rotating my_output_hack.log
    for i in range(args.nJobs):
        nHackLogs = len(glob.glob(f'jobs/{i}/my_output_hack.log.backup_*'))
        if os.path.isfile(f'jobs/{i}/my_output_hack.log'):
            os.rename(f'jobs/{i}/my_output_hack.log', f'jobs/{i}/my_output_hack.log.backup_{nHackLogs}')


def submit_job(args, i):
    """
    Submits a job to the desired batch system.
    Currently we can run on KEKCC (long queue), KEKCC (dedicated FEI queue),
    EKP @ KIT, or your local machine
    """
    currDir = os.getcwd()
    os.chdir(args.directory)
    # Synchronize the summary to the job directory
    if os.path.isfile('collection/Summary.pickle'):
        shutil.copyfile('collection/Summary.pickle', f'jobs/{i}/Summary.pickle')
    os.chdir(f'{args.directory}/jobs/{i}/')
    # bsub prints "Job <id> is submitted ..."; the cut/sed pipeline strips
    # everything but the bare job id and stores it in basf2_jobid
    if args.site == 'kekcc':
        ret = subprocess.call("bsub -q l -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)
    elif args.site == 'kekccs':
        ret = subprocess.call("bsub -q s -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)
    elif 'kekccX' in args.site:
        ret = subprocess.call(f"bsub -q l -n {args.site[6:]} -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)
    elif args.site == 'kekcc2':
        ret = subprocess.call("bsub -q b2_fei -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)
    elif args.site == 'kitekp':
        ret = subprocess.call("qsub -cwd -q express,short,medium,long -e error.log -o output.log -V basf2_script.sh | cut -f 3 -d ' ' > basf2_jobid", shell=True)
    elif args.site == 'local':
        subprocess.Popen(['bash', './basf2_script.sh'])
        ret = 0
    else:
        raise RuntimeError(f'Given site {args.site} is not supported')
    os.chdir(currDir)
    return ret == 0


def missingSymlinks(args, printing=False):
    """
    Check if all xml files are present in the job directories.
    If not, create a symlink to the collection directory
    """
    currDir = os.getcwd()
    os.chdir(f'{args.directory}/collection')
    xmlfiles = glob.glob("*.xml")
    for i in range(args.nJobs):
        for xmlfile in xmlfiles:
            if not os.path.isfile(f'../jobs/{i}/{xmlfile}'):
                if printing:
                    print("FEI-missing-symlinks: Added missing symlink to ", xmlfile, " in job directory ", i)
                os.symlink(f'../../collection/{xmlfile}', f'../jobs/{i}/{xmlfile}')
    os.chdir(currDir)


def do_trainings(args):
    """
    Trains the multivariate classifiers for all available training data in
    the collection directory, which wasn't trained yet.
    This is called once per iteration
    """
    currDir = os.getcwd()
    os.chdir(f'{args.directory}/collection')
    if not os.path.isfile('Summary.pickle'):
        return
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    weightfiles = fei.do_trainings(particles, configuration)
    print('FEI-distributed-do_trainings: Finished trainings!!')
    # Make the new weight files visible to all jobs
    os.chdir(args.directory)
    for i in range(args.nJobs):
        for weightfile_on_disk, _ in weightfiles:
            if not os.path.isfile(f'jobs/{i}/{weightfile_on_disk}'):
                os.symlink(f'../../collection/{weightfile_on_disk}', f'jobs/{i}/{weightfile_on_disk}')
    missingSymlinks(args)
    if configuration.cache >= args.end:
        print('FEI-distributed-do_trainings: Finished last training')
        update_pickle(args, configuration.cache, 2)
    os.chdir(currDir)


def jobs_finished(args):
    """
    Check if all jobs already finished.
    Throws a runtime error if it detects an error in one of the jobs
    """
    currDir = os.getcwd()
    os.chdir(args.directory)
    finished = glob.glob('jobs/*/basf2_finished_successfully')
    failed = glob.glob('jobs/*/basf2_job_error')
    if len(failed) > 0:
        raise RuntimeError(f'basf2 execution failed! Error occurred in: {str(failed)}')
    os.chdir(currDir)
    return len(finished) == args.nJobs


def merge_root_files(args, stage, roundMode=0):
    """
    Merges all produced ROOT files of all jobs together
    and puts the merged ROOT files into the collection directory.
    This includes:
    - the training data for the multivariate classifiers
    - the monitoring files
    """
    print('FEI-distributed-merge_root_files: Merging root files in stage', stage)
    if roundMode == 3:
        print(f'FEI-distributed-merge_root_files --> FEI-REtrain: Cleaning up training data after stage {args.retrain}')
        clean_higher_stages(args)
    currDir = os.getcwd()
    os.chdir(args.directory)
    rootfiles = []
    for f in glob.glob('jobs/0/*.root'):
        f = os.path.basename(f)
        if f in ['basf2_input.root', 'basf2_output.root', 'validation_input.root']:
            continue
        if f.startswith('input_'):
            continue
        # Skip files that were already merged into the collection once
        if os.path.isfile(f'collection/{f}') and f in ['mcParticlesCount.root', 'Monitor_FSPLoader.root']:
            continue
        if (roundMode == 1) and (('PostReconstruction' in f) or (f == 'Monitor_Final.root')):
            continue
        if (roundMode == 2) and (('PreReconstruction' in f) or (f in ['Monitor_TrainingData.root', 'training_input.root'])):
            continue
        rootfiles.append(f)
    if len(rootfiles) == 0:
        print('There are no root files to merge in jobs/0/*.root')
    else:
        print('Merge the following files', rootfiles)
    os.chdir(f'{args.directory}/collection')
    for f in rootfiles:
        inputs = [f'../jobs/{i}/{f}' for i in range(args.nJobs)]
        cmd = f"analysis-fei-mergefiles -o {f} -i " + " ".join(inputs)
        # Split the merged training data into a training and a validation part
        if 'training_input' in f:
            cmd += f" -s {1.0-args.validation}"
        ret = subprocess.call(cmd, shell=True)
        if ret != 0:
            raise RuntimeError('Error during merging root files')
        # Replace the per-job mcParticlesCount.root by a symlink to the merged one
        if f == 'mcParticlesCount.root':
            for i in glob.glob('../jobs/*/mcParticlesCount.root'):
                os.remove(i)
                os.symlink('../../collection/mcParticlesCount.root', i)
    os.chdir(currDir)
    print('FEI-distributed-merge_root_files: Done merging root files in stage', stage)
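
# Worked example: with nJobs=2 and the default validation fraction of 0.2,
# the command built above for f='training_input.root' is
#   analysis-fei-mergefiles -o training_input.root -i ../jobs/0/training_input.root ../jobs/1/training_input.root -s 0.8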


def update_input_files(args):
    """
    Updates the input files.
    For the first iteration the input files are the MC provided by the user.
    After each training this function replaces the input with the output of the previous iteration.
    Effectively this caches the whole DataStore of basf2 between the iterations.
    """
    currDir = os.getcwd()
    os.chdir(args.directory)
    print(f'FEI-distributed-update_input_files: Update input files, {os.getcwd()}')
    for i in range(args.nJobs):
        output_file = f'jobs/{i}/basf2_output.root'
        input_file = f'jobs/{i}/basf2_input.root'
        if args.large_dir:
            real_input_file = f'{args.large_dir}/basf2_input_{i}.root'
            shutil.move(output_file, real_input_file)
            if os.path.isfile(input_file):
                os.remove(input_file)
            os.symlink(real_input_file, input_file)
        else:
            shutil.move(output_file, input_file)
    # Adopt the Summary.pickle of the first job and keep track of the new stage
    cache, roundMode = get_training_cache(args)
    shutil.copyfile('jobs/0/Summary.pickle', 'collection/Summary.pickle')
    # Re-read the stage information after adopting the new summary
    cache, roundMode = get_training_cache(args)
    update_pickle(args, roundMode=roundMode)
    print('FEI-distributed-update_input_files: Updated input files to stage ', cache,
          'with roundMode', roundMode, ". DON'T do this again!")
    os.chdir(currDir)


def update_pickle(args, cache=None, roundMode=None):
    """
    Updates the pickle file in the collection directory.
    This is needed to keep track of the current stage of the training
    """
    currDir = os.getcwd()
    os.chdir(args.directory)
    particles, configuration = pickle.load(open('collection/Summary.pickle', 'rb'))
    os.remove('collection/Summary.pickle')
    if cache is None:
        cache = configuration.cache
    if roundMode is None:
        roundMode = configuration.roundMode
    os.chdir(f'{args.directory}/collection')
    fei.save_summary(particles, configuration, cache, roundMode)
    os.chdir(currDir)
    return particles, configuration


def get_training_cache(args):
    """
    Checks if the FEI training is still ongoing.
    The training is finished if the FEI reached stage 7
    """
    if not os.path.isfile(f'{args.directory}/collection/Summary.pickle'):
        return None, None
    particles, configuration = pickle.load(open(f'{args.directory}/collection/Summary.pickle', 'rb'))
    return configuration.cache, configuration.roundMode
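
# roundMode conventions, as inferred from the call sites in this script:
# 1 and 3 suppress the next round of trainings (3 additionally triggers the
# retraining cleanup in merge_root_files), while 2 marks the training as
# finished and terminates the main loop below.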


if __name__ == '__main__':
    pid = os.getpid()
    print(f'FEI training nohup job ID: {pid}', flush=True)
    args = getCommandLineOptions()
    os.chdir(args.directory)

    if args.skip:
        print('FEI-distributed: Skipping setup')
        # Validate the requested resume mode; the actual handling follows below
        if args.skip == 'clean':
            pass
        elif args.skip == 'update':
            pass
        elif args.skip == 'merge':
            pass
        elif args.skip == 'wait':
            pass
        elif args.skip == 'submit':
            pass
        elif args.skip == 'resubmit':
            pass
        elif args.skip == 'report':
            pass
        elif args.skip == 'run':
            pass
        elif args.skip == 'rebase':
            pass
        elif args.skip == 'retrain':
            pass
        else:
            raise RuntimeError(f'Unknown skip parameter {args.skip}')

        if args.skip == 'report':
            print('FEI-distributed: (report) Create full report')
            create_report(args)
            sys.exit(0)

        if args.skip in ['submit', 'resubmit']:
            print('Submitting jobs')
            if (get_training_cache(args)[0] is None or get_training_cache(args)[0] <= 0
                    or get_training_cache(args)[1] == 3 or get_training_cache(args)[1] == 1):
                print('FEI-distributed: no training, because cache is None or <= 0, or roundMode is 3 or 1')
            else:
                print('FEI-distributed: (train) Do available trainings for stage: ', get_training_cache(args))
                do_trainings(args)
            print('FEI-distributed: (submit) Submitting jobs in mode: ', get_training_cache(args))
            for i in range(args.nJobs):
                error_file = f'jobs/{i}/basf2_job_error'
                success_file = f'jobs/{i}/basf2_finished_successfully'
                output_log_file = f'jobs/{i}/output.log'
                error_log_file = f'jobs/{i}/error.log'
                # Jobs which already finished successfully are not resubmitted
                if (not os.path.isfile(error_file) and os.path.isfile(success_file)):
                    continue
                print(f'FEI-distributed: (resubmit) Resubmitting job {i}')
                clean_monitoring(args, i)
                if os.path.isfile(error_file):
                    os.remove(error_file)
                if os.path.isfile(success_file):
                    os.remove(success_file)
                if os.path.isfile(output_log_file):
                    os.remove(output_log_file)
                if os.path.isfile(error_log_file):
                    os.remove(error_log_file)
                shutil.copyfile('collection/Summary.pickle', f'jobs/{i}/Summary.pickle')
                if not submit_job(args, i):
                    raise RuntimeError('Error during submitting job')

        if args.skip in ['submit', 'resubmit', 'wait']:
            print('FEI-distributed: (wait) Wait for jobs to end')
            while not jobs_finished(args):
                time.sleep(40)

        if args.skip in ['submit', 'resubmit', 'wait', 'merge']:
            print('FEI-distributed: (merge) Merge ROOT files with training_input.root: ', get_training_cache(args))
            merge_root_files(args, get_training_cache(args)[0], roundMode=get_training_cache(args)[1])

        if args.skip in ['submit', 'resubmit', 'wait', 'merge', 'update']:
            print('FEI-distributed: (update) Update input files with summary: ', get_training_cache(args)[1])
            update_input_files(args)

        if args.skip in ['submit', 'resubmit', 'wait', 'merge', 'update', 'clean']:
            print('FEI-distributed: (clean) Clean job directory')
            clean_job_directory(args)

        if args.skip == 'run':
            print('FEI-distributed: (run) Resuming training')
            tmp_cache, tmp_roundMode = get_training_cache(args)
            # Reopen a training that was already marked as finished, so the
            # main loop below runs again
            if tmp_roundMode == 2:
                update_pickle(args, roundMode=1)

        if args.skip == 'rebase':
            print('FEI-distributed: (setup-) Rebasing setup, but do not overwrite')
            setup(args, fullOverwrite=False)
            missingSymlinks(args, printing=False)

        if args.skip == 'retrain':
            print('FEI-distributed: Attempting to start a retraining')
            if args.retrain >= 0:
                if get_training_cache(args)[0] is None:
                    raise RuntimeError(
                        f'FEI-REtrain: Cannot retrain! No training data found in {args.directory}/collection/Summary.pickle')
                if args.retrain-1 > get_training_cache(args)[0]:
                    raise RuntimeError(
                        f'FEI-REtrain: Cannot retrain! Training has not reached the stage {args.retrain} yet, '
                        f'instead it is at {get_training_cache(args)}!')
                if args.retrain > args.end:
                    raise RuntimeError(
                        f'FEI-REtrain: Cannot retrain if you want to end the training before the retrain stage {args.retrain}!')
                print(f'FEI-REtrain: Retraining from stage {args.retrain}')
                # Reset the summary to the stage before the retraining and
                # request the retrain cleanup (roundMode 3)
                particles, configuration = update_pickle(args, args.retrain-1, 3)
                clean_job_directory(args)
            else:
                raise RuntimeError('FEI-REtrain: Cannot retrain! No retrain stage provided!')
    else:
        print('FEI-distributed: (setup+) Setup from scratch')
        setup(args, fullOverwrite=True)

    # Sanity check: the training cannot already be beyond the requested end stage
    if get_training_cache(args)[0] is not None and get_training_cache(args)[0] > args.end:
        raise RuntimeError(
            f"FEI-distributed: Check args.end this doesn't make sense: {args.end}, training is at {get_training_cache(args)}")

    # Main loop: one pass per iteration, until the training is marked as finished (roundMode 2)
    while get_training_cache(args)[1] != 2:
        if (get_training_cache(args)[0] is None or get_training_cache(args)[0] <= 0
                or get_training_cache(args)[1] == 3 or get_training_cache(args)[1] == 1):
            print('FEI-distributed: no training, because cache is None or <= 0, or roundMode is 3 or 1')
        else:
            print('FEI-distributed: (train) Do available trainings for stage: ', get_training_cache(args))
            do_trainings(args)

        print('FEI-distributed: (submit) Submitting jobs in mode: ', get_training_cache(args))
        for i in range(args.nJobs):
            clean_monitoring(args, i)
            if not submit_job(args, i):
                raise RuntimeError('Error during submitting jobs')

        print('FEI-distributed: (wait) Wait for jobs to end')
        while not jobs_finished(args):
            time.sleep(40)

        print('FEI-distributed: (merge) Merge ROOT files, with training_input.root: ', get_training_cache(args)[1])
        merge_root_files(args, get_training_cache(args)[0], roundMode=get_training_cache(args)[1])

        print('FEI-distributed: (update) Update input files with summary: ', get_training_cache(args)[1])
        update_input_files(args)

        print('FEI-distributed: (clean) Clean job directory')
        clean_job_directory(args)

        if args.once:
            break

    print('FEI-distributed: (report) Create report')
    create_report(args)