Belle II Software development
distributed.py
#!/usr/bin/env python3


"""
 This script can be used to train the FEI on a cluster like the one available at KEKCC
 All you need is a basf2 steering file (see analysis/examples/FEI/ ) and some MC, O(100) million events
 The script will automatically create some directories
 - collection containing weight files, monitoring files and other artifacts
 - jobs containing temporary files during the training (can be deleted afterwards)

 The distributed script automatically spawns jobs on the cluster (or local machine),
 and runs the steering file on the provided MC.
 Since an FEI training requires multiple runs over the same MC, it does so multiple times.
 The output of a run is passed as input to the next run (so your script has to use RootInput and RootOutput).

 In between, it calls the do_trainings function of the FEI to train the multivariate classifiers
 at each stage.

 At the end it produces summary outputs using printReporting.py and latexReporting.py
 (this will only work if you use the monitoring mode).
 In addition, a summary file for each mva training is produced using basf2_mva_evaluate.

 If your training fails for some reason (e.g. a job fails on the cluster),
 the FEI will stop; you can fix the problem and resume the training using the -x option.
 This requires some expert knowledge, because you have to know how to fix the problem
 that occurred and at which step you have to resume the training.

 After the training, the weight files are stored in the localdb in the collection directory.
 You have to upload this local database to the Belle II Conditions Database if you want to use the FEI everywhere.
 Alternatively, you can just copy the localdb somewhere and use it directly.

 Example:
 python3 ~/release/analysis/scripts/fei/distributed.py
 -s kekcc2
 -f ~/release/analysis/examples/FEI/B_generic.py
 -w /home/belle2/tkeck/group/B2TauNuWorkspace_2/new_fei
 -n 100
 -d $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000786/s00/e0000/4S/r00000/mixed/sub01/*.root
 | head -n 50)
 $(ls /ghi/fs01/belle2/bdata/MC/release-00-07-02/DBxxxxxxxx/MC7/prod00000788/s00/e0000/4S/r00000/charged/sub01/*.root
 | head -n 50)
"""


import subprocess
import sys
import os
import argparse
import glob
import time
import stat
import shutil
import pickle
import json
import b2biiConversion
import fei
import basf2_mva


def getCommandLineOptions():
    """ Parses the command line options of the fei and returns the corresponding arguments. """
    # FEI defines its own command line options, therefore we disable
    # the ROOT command line options, which otherwise interfere sometimes.
    # Always avoid the top-level 'import ROOT'.
    import ROOT  # noqa
    ROOT.PyConfig.IgnoreCommandLineOptions = True
    parser = argparse.ArgumentParser()
    parser.add_argument('-f', '--steeringFile', dest='steering', type=str, required=True,
                        help='Steering file. Calls fei.get_path()')
    parser.add_argument('-w', '--workingDirectory', dest='directory', type=str, required=True,
                        help='Working directory for basf2 jobs. On KEKCC, this must NOT be on HSM!')
    parser.add_argument('-l', '--largeDirectory', dest='large_dir', type=str, default='',
                        help='Directory to store large files')
    parser.add_argument('-n', '--nJobs', dest='nJobs', type=int, default=100,
                        help='Number of jobs')
    parser.add_argument('-d', '--data', dest='data', type=str, required=True, action='append', nargs='+',
                        help='Data files in bash expansion syntax or as process_url')
    parser.add_argument('-x', '--skip-to', dest='skip', type=str, default='',
                        help='Skip setup of directories and resume at the given step')
    parser.add_argument('-o', '--once', dest='once', action='store_true',
                        help='Execute the main loop just once, instead of waiting until a Summary is produced.')
    parser.add_argument('-s', '--site', dest='site', type=str, default='kekcc',
                        help='Site to use [kekcc|kitekp|local]')
    parser.add_argument('-e', '--end', dest='end', type=int, default=6,
                        help='Stage at which to end the training')
    parser.add_argument('-r', '--retrain', dest='retrain', type=int, default=-1,
                        help='Stage from which to retrain')
    parser.add_argument('-v', '--validation', dest='validation', type=float,
                        default=0.2, help='Fraction of data to use for validation')
    args = parser.parse_args()
    return args


def get_job_script():
    """
    Create a bash file which will be dispatched to the batch system.
    The file will run basf2 on the provided MC or the previous output
    using the provided steering file.
    """
    job_script = """
        if [ -f "basf2_input.root" ]; then
          INPUT="basf2_input.root"
        else
          INPUT="input_*.root"
        fi
        time basf2 -l error ../../collection/basf2_steering_file.py -i "$INPUT" \
        -o basf2_output.root &> my_output_hack.log || touch basf2_job_error
        touch basf2_finished_successfully
    """
    return job_script
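
# The job script communicates with this steering process only through sentinel
# files in its working directory: basf2_job_error on failure and
# basf2_finished_successfully on completion. jobs_finished() below polls for
# exactly these files.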


def setup(args, fullOverwrite=True):
    """
    Setup all directories, create job_scripts, split up MC into chunks
    which are processed by each job. Create symlinks for databases.
    """
    print(f'FEI-distributed-setup: Setup environment in {args.directory} with overwrite {fullOverwrite}')
    currDir = os.getcwd()

    os.chdir(args.directory)
    # Search and partition data files into even chunks
    data_files = []

    for x in args.data:
        for y in x:
            if (y.startswith("http://") or y.startswith("https://")):
                data_files += b2biiConversion.parse_process_url(y)
            else:
                data_files += glob.glob(y)
    print(f'FEI-distributed-setup: Found {len(data_files)} MC files')
    file_sizes = []
    for file in data_files:
        file_sizes.append(os.stat(file).st_size)
    data_files_sorted = [x for _, x in sorted(zip(file_sizes, data_files))]
    n = int(len(data_files) / args.nJobs)
    if n < 1:
        raise RuntimeError(f'Too few MC files {len(data_files)} for the given number of jobs {args.nJobs}')
    data_chunks = [data_files_sorted[i::args.nJobs] for i in range(args.nJobs)]

    # Create needed directories
    print(f'FEI-distributed-setup: Create environment in {args.directory}')
    if fullOverwrite:
        shutil.rmtree('collection', ignore_errors=True)
        os.mkdir('collection')
        if os.path.isfile('collection/Summary.pickle'):
            os.remove('collection/Summary.pickle')
        os.mkdir('collection/localdb')
    shutil.rmtree('jobs', ignore_errors=True)
    os.mkdir('jobs')
    if args.large_dir:
        if not os.path.isdir(args.large_dir):
            raise RuntimeError('FEI-distributed-setup: Large dir does not exist. Please make sure it does.')

    shutil.copyfile(args.steering, 'collection/basf2_steering_file.py')

    for i in range(args.nJobs):
        # Create job directory
        os.mkdir(f'jobs/{i}')
        job_script = get_job_script()
        with open(f'jobs/{i}/basf2_script.sh', 'w') as f:
            f.write(job_script)
            os.chmod(f.fileno(), stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
        # Symlink initial input data files
        for j, data_input in enumerate(data_chunks[i]):
            os.symlink(data_input, f'jobs/{i}/input_{j}.root')
        # Symlink weight directory and basf2_path
        os.symlink('../../collection/localdb', f'jobs/{i}/localdb')
    os.chdir(currDir)
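
# Illustration of the size-balanced striping in setup() (made-up file names):
# with data_files_sorted = [small1, small2, mid1, mid2, big1, big2] and
# nJobs = 2, the chunks are data_files_sorted[0::2] = [small1, mid1, big1] and
# data_files_sorted[1::2] = [small2, mid2, big2], so every job receives a
# similar mix of small and large files.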


def create_report(args):
    """
    Dumps Summary.pickle to JSON for easy inspection.
    Create all the reports for the FEI training and the individual mva trainings.
    This will only work if
      1) Monitoring mode is used (see FeiConfiguration)
      2) The system has enough memory to hold the training data for the mva classifiers
    If this fails you can also copy the collection directory somewhere and
    execute the commands by hand.
    """
    import ROOT  # noqa
    print('FEI-distributed-report: Create report')
    currDir = os.getcwd()

    os.chdir(f'{args.directory}/collection')
    with open('Summary.pickle', 'rb') as file:
        summary = pickle.load(file)

    summary_dict = {particle.identifier:
                    {'mvaConfig': particle.mvaConfig._asdict(),
                     'channels': [{field: (value._asdict() if field in ['mvaConfig', 'preCutConfig'] else value) for
                                   field, value in channel._asdict().items()} for channel in particle.channels],
                     'preCutConfig': particle.preCutConfig._asdict(),
                     'postCutConfig': particle.postCutConfig._asdict()}
                    for particle in summary[0]}
    summary_dict.update({'feiConfig': summary[1]._asdict()})

    with open('Summary.json', 'w') as summary_json_file:
        json.dump(summary_dict, summary_json_file, indent=4)
    print('FEI-distributed-report: read Summary.pickle and wrote Summary.json')

    ret = subprocess.call('basf2 fei/printReporting.py summary.txt', shell=True)
    if ret != 0:
        raise RuntimeError('Error during printReporting.py')
    else:
        print('FEI-distributed-report: Created summary.txt')
    ret = subprocess.call('basf2 fei/latexReporting.py summary.tex', shell=True)
    if ret != 0:
        raise RuntimeError('Error during latexReporting.py')
    else:
        print('FEI-distributed-report: Created summary.tex')

    print('FEI-distributed-report: Creating *.zip files for the mva evaluation.')
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    stage_particles = fei.core.get_stages_from_particles(particles)
    ret = 0
    for istage in range(len(stage_particles)):
        stage = stage_particles[istage]
        for particle in stage:
            for channel in particle.channels:
                if basf2_mva.available(f'{channel.label}.xml') and not fei.core.Teacher.check_if_weightfile_is_fake(
                        f'{channel.label}.xml'):
                    treeName = ROOT.Belle2.MakeROOTCompatible.makeROOTCompatible(f'{channel.label} variables')
                    try:
                        result = subprocess.run(
                            f"basf2_mva_evaluate.py -id '{channel.label}.xml' -train 'training_input.root' "
                            f"-data 'validation_input.root' "
                            f"--treename '{treeName}' "
                            f"-o '../{channel.label}.zip'",
                            shell=True,
                            capture_output=True,
                            text=True,
                            check=True)
                        print(f"FEI-distributed-report: Created {channel.label}.zip")
                        print(result.stdout)
                        print(f"FEI-distributed-report: err-output of the evaluation of {channel.label}.xml")
                        print(result.stderr)
                    except subprocess.CalledProcessError as e:
                        print(f"FEI-distributed-report: Error during evaluation of {channel.label}.xml")
                        print(e.stderr)
                        ret = 1
                else:
                    print(f"FEI-distributed-report: Skipping evaluation of {channel.label}.xml as it is a fake weight file")
    os.chdir(currDir)
    print('FEI-distributed-report: DONE creating reports')
    return ret == 0
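
# If create_report() runs out of memory or fails, the same evaluation can be
# run by hand from the collection directory; a sketch with a placeholder
# channel label (the tree name must be the ROOT-compatible version of
# '<label> variables', as computed above):
#   basf2_mva_evaluate.py -id '<label>.xml' -train training_input.root \
#       -data validation_input.root --treename '<tree name>' -o ../<label>.zip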


def remove_objects(file_path, objectNames):
    """
    Remove objects from a ROOT file
    """
    if os.path.islink(file_path):
        file_path = os.readlink(file_path)
    import ROOT  # noqa
    root_file = ROOT.TFile.Open(file_path, "UPDATE")
    key_names = [key.GetName() for key in root_file.GetListOfKeys()]

    print(f'All keys in {file_path}: {key_names}')
    for obj_name in key_names:
        if any([ROOT.Belle2.MakeROOTCompatible.makeROOTCompatible(objectName) == obj_name for objectName in objectNames]):
            print(f'Removed {obj_name}')
            root_file.Delete(f'{obj_name};*')
    root_file.Write("", ROOT.TObject.kOverwrite)
    root_file.Close()
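
# Usage sketch (hypothetical names): drop the cached trees of one channel from
# a merged training file before a retraining, as done in clean_higher_stages():
#   remove_objects('training_input.root', ['<channel label> variables'])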


def remove_helper(args, channelLabel, configuration):
    """
    Helper function to remove all files related to a channelLabel
    """
    curDir = os.getcwd()
    os.chdir(f'{args.directory}/collection')
    print(f'FEI-REtrain: Cleaning {channelLabel}')
    # 3.2/ remove .zip, trained BDTs, logs, etc. in collection:
    removeFiles = glob.glob(f'{channelLabel}*')
    removeFiles += glob.glob(f'{channelLabel.split(" ")[0]}*.png')
    removeFiles += glob.glob(f'../{channelLabel}*.zip')
    print(f'FEI-REtrain: Removing {channelLabel}* files, {len(removeFiles)} in total')
    for f in removeFiles:
        os.remove(f)
    # 3.3/ clean localdb in collection: not for now

    os.chdir(args.directory)
    # 3.4/ clean symlinks:
    # - remove .xml symlinks with particle identifier and Summary.pickle
    symlinks = glob.glob(f'jobs/*/{channelLabel}.xml')
    print(f'FEI-REtrain: Removing symlinks for {channelLabel}, {len(symlinks)} in total')
    for f in symlinks:
        os.unlink(f)
    os.chdir(curDir)


def clean_higher_stages(args):
    """
    Cleans the higher stages of the training.
    This is needed if you want to retrain the BDTs
    """
    print('FEI-distributed-clean_higher_stages: Cleaning higher stages')
    currDir = os.getcwd()
    os.chdir(f'{args.directory}/collection')
    # 3.1/ basf2_input.root files are pruned before this, using a special run of jobs with roundMode == 3

    # Load the current configuration; the stages and channels are derived from it below
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    stages = fei.core.get_stages_from_particles(particles)
    channels_to_remove = []
    # list of all .xml weight files
    xmlfiles = glob.glob("*.xml")
    for i in range(args.retrain-1, len(stages)):
        print(f'FEI-distributed-clean_higher_stages: Cleaning stage {i}')
        for particle in stages[i]:
            for channel in particle.channels:
                if f'{channel.label}.xml' in xmlfiles:
                    xmlfiles.remove(f'{channel.label}.xml')  # this will force the retraining
                    remove_helper(args, channel.label, configuration)
                    # 3.5/ prune Monitoring training plots in collection:
                    channels_to_remove.append(channel.label)
    print(f"FEI-REtrain: Channels to remove: {channels_to_remove}")

    for xf in xmlfiles:
        print(f'FEI-REtrain: {xf} will not be retrained!')

    # 3.6/ prune Monitoring plots in collection:
    partIds = []
    for i in range(args.retrain-1, len(stages)):
        for particle in stages[i]:
            partIds.append(particle.identifier)
            if "J/psi" in particle.identifier:
                partIds.append(particle.identifier.replace("J/psi", "Jpsi"))
    print(f'FEI-REtrain: particles to remove: {partIds}')

    root_files = glob.glob('*.root')
    for f in root_files:
        if any([x in f for x in ['Monitor_PostReconstruction_AfterMVA', 'Monitor_Pre', 'Monitor_TrainingData']]):
            print(f'FEI-REtrain: Removing branches in {f}')
            remove_objects(f, objectNames=channels_to_remove)
        elif 'Monitor_PostReco' in f:
            print(f'FEI-REtrain: Removing branches in {f}')
            remove_objects(f, objectNames=partIds)
        elif 'Monitor_Final' in f:
            print(f'FEI-REtrain: Removing branches in {f}')
            remove_objects(f, objectNames=[f'{partId} variables' for partId in partIds])
        elif any([x in f for x in ['training_input', 'validation_input']]):
            print(f'FEI-REtrain: Removing branches in {f}')
            remove_objects(f, objectNames=[f'{ch} variables' for ch in channels_to_remove])

    # 3.7/ clean job directories:
    # - remove all root files except mcParticlesCount.root, basf2_input.root and basf2_output.root
    # - particles in the particle lists are removed in a special run of jobs using the RemoveParticlesNotInLists module
    os.chdir(args.directory)
    root_files = glob.glob('jobs/*/*.root')
    print('FEI-REtrain: Removing job root files (keeping mcParticlesCount.root, basf2_input.root and basf2_output.root)')
    for f in root_files:
        if 'mcParticlesCount.root' not in f and 'basf2_input.root' not in f and 'basf2_output.root' not in f:
            os.remove(f)
    os.chdir(currDir)
    print('FEI-distributed-clean_higher_stages: Done cleaning higher stages')


def clean_monitoring(args, ijob=None):
    """
    Cleans the monitoring files in the jobs directory and the training_input.root files
    """
    if ijob is None:
        files = glob.glob('jobs/*/Monitor_*.root')
        files += glob.glob('jobs/*/training_input.root')
    else:
        files = glob.glob(f'jobs/{ijob}/Monitor_*.root')
        files += glob.glob(f'jobs/{ijob}/training_input.root')

    for f in files:
        os.remove(f)


def clean_job_directory(args):
    """
    Cleans the job directory for the next iteration,
    meaning we remove all logs
    """
    files = glob.glob('jobs/*/basf2_finished_successfully')
    files += glob.glob('jobs/*/error.log')
    files += glob.glob('jobs/*/output.log')
    files += glob.glob('jobs/*/basf2_job_error')
    for f in files:
        os.remove(f)
    for i in range(args.nJobs):
        nHackLogs = len(glob.glob(f'jobs/{i}/my_output_hack.log.backup_*'))
        if os.path.isfile(f'jobs/{i}/my_output_hack.log'):
            os.rename(f'jobs/{i}/my_output_hack.log', f'jobs/{i}/my_output_hack.log.backup_{nHackLogs}')


def submit_job(args, i):
    """
    Submits a job to the desired batch system.
    Currently we can run on KEKCC (long queue), KEKCC (dedicated FEI queue),
    EKP @ KIT, or your local machine
    """
    # Synchronize summaries
    currDir = os.getcwd()
    os.chdir(args.directory)
    if os.path.isfile('collection/Summary.pickle'):
        shutil.copyfile('collection/Summary.pickle', f'jobs/{i}/Summary.pickle')

    os.chdir(f'{args.directory}/jobs/{i}/')
    if args.site == 'kekcc':
        ret = subprocess.call("bsub -q l -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'kekccs':
        ret = subprocess.call("bsub -q s -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif 'kekccX' in args.site:
        ret = subprocess.call(f"bsub -q l -n {args.site[6:]} -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'kekcc2':
        ret = subprocess.call("bsub -q b2_fei -e error.log -o output.log ./basf2_script.sh | cut -f 2 -d ' ' | sed 's/<//' | sed 's/>//' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'kitekp':
        ret = subprocess.call("qsub -cwd -q express,short,medium,long -e error.log -o output.log -V basf2_script.sh | cut -f 3 -d ' ' > basf2_jobid", shell=True)  # noqa
    elif args.site == 'local':
        subprocess.Popen(['bash', './basf2_script.sh'])
        ret = 0
    else:
        raise RuntimeError(f'Given site {args.site} is not supported')
    os.chdir(currDir)
    return ret == 0
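
# The bsub pipelines above rely on LSF announcing "Job <12345> is submitted to
# queue <...>"; the cut/sed chain reduces this to the bare job ID, which is
# stored in basf2_jobid. The ID is not used elsewhere in this script, but it is
# convenient for manual bjobs/bkill calls while a stage is running.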


def missingSymlinks(args, printing=False):
    """
    Check if all xml files are present in the job directories.
    If not, create a symlink to the collection directory
    """
    currDir = os.getcwd()
    os.chdir(f'{args.directory}/collection')
    xmlfiles = glob.glob("*.xml")
    for i in range(args.nJobs):
        for xmlfile in xmlfiles:
            if not os.path.isfile(f'../jobs/{i}/{xmlfile}'):
                if printing:
                    print("FEI-missing-symlinks: Added missing symlink to ", xmlfile, " in job directory ", i)
                os.symlink(f'../../collection/{xmlfile}', f'../jobs/{i}/{xmlfile}')
    os.chdir(currDir)


def do_trainings(args):
    """
    Trains the multivariate classifiers for all available training data in
    the collection directory that has not been used in a training yet.
    This is called once per iteration
    """
    currDir = os.getcwd()
    os.chdir(f'{args.directory}/collection')
    if not os.path.isfile('Summary.pickle'):
        os.chdir(currDir)
        return
    particles, configuration = pickle.load(open('Summary.pickle', 'rb'))
    weightfiles = fei.do_trainings(particles, configuration)
    print('FEI-distributed-do_trainings: Finished trainings!')

    os.chdir(args.directory)
    for i in range(args.nJobs):
        for weightfile_on_disk, _ in weightfiles:
            if not os.path.isfile(f'jobs/{i}/{weightfile_on_disk}'):
                os.symlink(f'../../collection/{weightfile_on_disk}', f'jobs/{i}/{weightfile_on_disk}')

    # Check if some xml files are missing
    missingSymlinks(args)
    if configuration.cache >= args.end:
        print('FEI-distributed-do_trainings: Finished last training')
        update_pickle(args, configuration.cache, 2)
    os.chdir(currDir)


def jobs_finished(args):
    """
    Check if all jobs have already finished.
    Throws a runtime error if it detects an error in one of the jobs
    """
    currDir = os.getcwd()
    os.chdir(args.directory)
    finished = glob.glob('jobs/*/basf2_finished_successfully')
    failed = glob.glob('jobs/*/basf2_job_error')
    if len(failed) > 0:
        raise RuntimeError(f'basf2 execution failed! Error occurred in: {str(failed)}')
    os.chdir(currDir)
    return len(finished) == args.nJobs


def merge_root_files(args, stage, roundMode=0):
    """
    Merges all produced ROOT files of all jobs together
    and puts the merged ROOT files into the collection directory.
    This affects mostly
     - the training data for the multivariate classifiers
     - the monitoring files
    """
    print('FEI-distributed-merge_root_files: Merging root files in stage', stage)
    if roundMode == 3:
        print(f'FEI-distributed-merge_root_files --> FEI-REtrain: Cleaning up training data after stage {args.retrain}')
        clean_higher_stages(args)
        return

    currDir = os.getcwd()
    os.chdir(args.directory)

    rootfiles = []
    for f in glob.glob('jobs/0/*.root'):
        f = os.path.basename(f)
        if f in ['basf2_input.root', 'basf2_output.root', 'validation_input.root']:
            continue
        if f.startswith('input_'):
            continue
        if os.path.isfile(f'collection/{f}') and f in ['mcParticlesCount.root', 'Monitor_FSPLoader.root']:
            # done only once, TODO potential problem if you add an additional particle PDG code on retraining?
            continue
        if (roundMode == 1) and (('PostReconstruction' in f) or (f == 'Monitor_Final.root')):
            # ignore when only preReco
            continue
        if (roundMode == 2) and (('PreReconstruction' in f) or (f in ['Monitor_TrainingData.root', 'training_input.root'])):
            # ignore when only postReco
            continue
        rootfiles.append(f)
    if len(rootfiles) == 0:
        print('There are no root files to merge in jobs/0/*.root')
    else:
        print('Merge the following files', rootfiles)
        os.chdir(f'{args.directory}/collection')
        for f in rootfiles:
            inputs = [f'../jobs/{i}/{f}' for i in range(args.nJobs)]
            cmd = f"analysis-fei-mergefiles -o {f} -i " + " ".join(inputs)
            if 'training_input' in f:
                cmd += f" -s {1.0-args.validation}"
            ret = subprocess.call(cmd, shell=True)
            if ret != 0:
                raise RuntimeError('Error during merging root files')

            # Replace mcParticlesCount.root with the merged file in all directories
            # so that the individual jobs calculate the correct mcCounts and sampling rates
            if f == 'mcParticlesCount.root':
                for i in inputs:
                    os.remove(i)
                    os.symlink('../../collection/mcParticlesCount.root', i)
    os.chdir(currDir)
    print('FEI-distributed-merge_root_files: Done merging root files in stage', stage)
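
# roundMode values, as used in this script (inferred from the code paths in
# merge_root_files, update_input_files and the main loop below):
#   0 - regular full round,
#   1 - only the PreReconstruction part of the round was run,
#   2 - final round: only the PostReconstruction part was run and the training
#       is complete (the main loop exits),
#   3 - special cleanup round that prunes data before a retraining.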


def update_input_files(args):
    """
    Updates the input files.
    For the first iteration the input files are the MC provided by the user.
    After each training this function replaces the input with the output of the previous iteration.
    Effectively this caches the whole DataStore of basf2 between the iterations.
    """
    currDir = os.getcwd()
    os.chdir(args.directory)
    print(f'FEI-distributed-update_input_files: Update input files, {os.getcwd()}')
    for i in range(args.nJobs):
        output_file = f'jobs/{i}/basf2_output.root'
        input_file = f'jobs/{i}/basf2_input.root'
        if args.large_dir:
            real_input_file = f'{args.large_dir}/basf2_input_{i}.root'
            shutil.move(output_file, real_input_file)
            if os.path.isfile(input_file):
                os.remove(input_file)
            os.symlink(real_input_file, input_file)
        else:
            shutil.move(output_file, input_file)
    # Save the Summary.pickle of the first job to the collection directory,
    # so we can keep track of the reconstruction stage the FEI is currently at.
    cache, roundMode = get_training_cache(args)
    if roundMode != 2:
        shutil.copyfile('jobs/0/Summary.pickle', 'collection/Summary.pickle')

    cache, roundMode = get_training_cache(args)
    if roundMode == 1:
        roundMode = 0
    if roundMode == 3:
        roundMode = 1
    update_pickle(args, roundMode=roundMode)
    print('FEI-distributed-update_input_files: Updated input files to stage',
          cache, 'with roundMode', roundMode, ". Don't do this step twice!")
    os.chdir(currDir)


def update_pickle(args, cache=None, roundMode=None):
    """
    Updates the pickle file in the collection directory.
    This is needed to keep track of the current stage of the training
    and the roundMode.
    """
    currDir = os.getcwd()
    os.chdir(args.directory)
    particles, configuration = pickle.load(open('collection/Summary.pickle', 'rb'))
    os.remove('collection/Summary.pickle')
    if cache is None:
        cache = configuration.cache
    if roundMode is None:
        roundMode = configuration.roundMode
    os.chdir(f'{args.directory}/collection')
    fei.save_summary(particles, configuration, cache, roundMode)
    os.chdir(currDir)
    return particles, configuration


def get_training_cache(args):
    """
    Returns the current stage (cache) and roundMode of the FEI training,
    or (-1, 0) if no training has been started yet.
    The training is finished once the FEI reached stage 7
    """
    if not os.path.isfile(f'{args.directory}/collection/Summary.pickle'):
        return -1, 0
    particles, configuration = pickle.load(open(f'{args.directory}/collection/Summary.pickle', 'rb'))
    return configuration.cache, configuration.roundMode
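
# Quick manual check of the training state, reading the same pickle as the
# functions above (run inside a basf2 environment from the working directory;
# the attribute names follow their usage in this script):
#   python3 -c "import pickle; p, c = pickle.load(open('collection/Summary.pickle', 'rb')); print(c.cache, c.roundMode)"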


# ================================================================
if __name__ == '__main__':
    pid = os.getpid()
    print(f'FEI training nohup job ID: {pid}', flush=True)

    args = getCommandLineOptions()
    os.chdir(args.directory)

    # If the user wants to resume an existing training,
    # we check at which step they want to resume and
    # try to perform the necessary steps to do so
    if args.skip:
        print('FEI-distributed: Skipping setup')
        start = 0
        if args.skip == 'clean':
            start = 1
        elif args.skip == 'update':
            start = 2
        elif args.skip == 'merge':
            start = 3
        elif args.skip == 'wait':
            start = 4
        elif args.skip == 'submit':
            start = 5
        elif args.skip == 'resubmit':
            start = 6
        elif args.skip == 'report':
            start = 7
        elif args.skip == 'run':
            start = 0
        elif args.skip == 'rebase':
            start = -1
        elif args.skip == 'retrain':
            start = -2
        else:
            raise RuntimeError(f'Unknown skip parameter {args.skip}')
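
        # The resume point acts like a waterfall: every step whose number is
        # <= start runs, from (re)submit (5/6) down through wait (4), merge (3),
        # update (2) and clean (1), before the main loop takes over. report (7)
        # only writes the reports and exits; rebase (-1) and retrain (-2)
        # prepare the directories instead of skipping steps.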

        if start == 7:
            print('FEI-distributed: (report) Create full report')
            create_report(args)
            sys.exit(0)

        # The user wants to submit the jobs again
        if start >= 5:
            print('Submitting jobs')
            # if we are not resubmitting, also retrain the BDTs if needed
            if start == 5:
                if get_training_cache(args)[0] is None or get_training_cache(
                        args)[0] <= 0 or get_training_cache(args)[1] == 3 or get_training_cache(args)[1] == 1:
                    print('FEI-distributed: no training, because cache is None or <= 0, or roundMode is 3 or 1')
                else:
                    print('FEI-distributed: (train) Do available trainings for stage: ', get_training_cache(args))
                    do_trainings(args)

            print('FEI-distributed: (submit) Submitting jobs in mode: ', get_training_cache(args))
            for i in range(args.nJobs):
                # The user wants to resubmit jobs, meaning the training of some jobs failed.
                # We check which jobs contained an error flag and were not successful;
                # these jobs are submitted again, other jobs are skipped (continue)
                error_file = f'jobs/{i}/basf2_job_error'
                success_file = f'jobs/{i}/basf2_finished_successfully'
                output_log_file = f'jobs/{i}/output.log'
                error_log_file = f'jobs/{i}/error.log'

                if start >= 6:
                    if (not os.path.isfile(error_file) and os.path.isfile(success_file)):
                        continue
                    else:
                        print(f'FEI-distributed: (resubmit) Resubmitting job {i}')
                        clean_monitoring(args, i)

                if os.path.isfile(error_file):
                    os.remove(error_file)
                if os.path.isfile(success_file):
                    os.remove(success_file)
                if os.path.isfile(output_log_file):
                    os.remove(output_log_file)
                if os.path.isfile(error_log_file):
                    os.remove(error_log_file)
                # Reset Summary file
                shutil.copyfile('collection/Summary.pickle', f'jobs/{i}/Summary.pickle')
                if not submit_job(args, i):
                    raise RuntimeError('Error during submitting job')

        if start >= 4:
            print('FEI-distributed: (wait) Wait for jobs to end')
            while not jobs_finished(args):
                time.sleep(40)

        if start >= 3:
            print('FEI-distributed: (merge) Merge ROOT files with training_input.root: ', get_training_cache(args))
            merge_root_files(args, get_training_cache(args)[0], roundMode=get_training_cache(args)[1])

        if start >= 2:
            print('FEI-distributed: (update) Update input files with summary: ', get_training_cache(args)[1])
            update_input_files(args)

        if start >= 1:
            print('FEI-distributed: (clean) Clean job directory')
            clean_job_directory(args)

        if start == 0:
            print('FEI-distributed: (run) Resuming training')
            tmp_cache, tmp_roundMode = get_training_cache(args)
            if tmp_roundMode == 2:
                update_pickle(args, roundMode=1)
            else:
                update_pickle(args)

        if start == -1:
            print('FEI-distributed: (setup-) Rebasing setup, but do not overwrite')
            setup(args, fullOverwrite=False)
            missingSymlinks(args, printing=False)

        if start == -2:
            print('FEI-distributed: Attempting to start a retraining')
            if args.retrain >= 0:
                if get_training_cache(args)[0] < 0:  # -1 means no Summary.pickle was found
                    raise RuntimeError(
                        f'FEI-REtrain: Cannot retrain! No training data found in {args.directory}/collection/Summary.pickle')
                if args.retrain-1 > get_training_cache(args)[0]:
                    raise RuntimeError(
                        f'FEI-REtrain: Cannot retrain! The training has not reached stage {args.retrain} yet, '
                        f'instead it is at {get_training_cache(args)}!'
                    )
                if args.retrain > args.end:
                    raise RuntimeError(
                        f'FEI-REtrain: Cannot retrain if you want to end the training before the retrain stage {args.retrain}!')
                print(f'FEI-REtrain: Retraining from stage {args.retrain}')
                # 3.0/ fix pickle
                # candidates for the retrain-stage BDT training are computed one stage earlier
                particles, configuration = update_pickle(args, args.retrain-1, 3)
                clean_job_directory(args)
            else:
                raise RuntimeError('FEI-REtrain: Cannot retrain! No retrain stage provided!')
    else:
        # This is a new training,
        # so we have to set up the whole directory (this will override any existing training)
        print('FEI-distributed: (setup+) Setup from scratch')
        setup(args, fullOverwrite=True)

    # The main loop, which steers the whole FEI training on a batch system:
    # 1. We check if the FEI still requires further steps
    # 2. We do all necessary trainings which we can perform at this point in time
    # 3. We submit new jobs which will use the new trainings to reconstruct the hierarchy further
    # 4. We wait until all jobs finished
    # 5. We merge the output of the jobs
    # 6. We update the inputs of the jobs (the input of the next stage is the output of the current stage)
    # 7. We clean the job directories so they can be used during the next stage again.
    if get_training_cache(args)[0] is not None and get_training_cache(args)[0] > args.end:
        raise RuntimeError(
            f"FEI-distributed: args.end ({args.end}) does not make sense: the training is already at {get_training_cache(args)}")

    while get_training_cache(args)[1] != 2:
        if get_training_cache(args)[0] is None or get_training_cache(
                args)[0] <= 0 or get_training_cache(args)[1] == 3 or get_training_cache(args)[1] == 1:
            print('FEI-distributed: no training, because cache is None or <= 0, or roundMode is 3 or 1')
        else:
            print('FEI-distributed: (train) Do available trainings for stage: ', get_training_cache(args))
            do_trainings(args)

        print('FEI-distributed: (submit) Submitting jobs in mode: ', get_training_cache(args))
        for i in range(args.nJobs):
            clean_monitoring(args, i)
            if not submit_job(args, i):
                raise RuntimeError('Error during submitting jobs')

        print('FEI-distributed: (wait) Wait for jobs to end')
        while not jobs_finished(args):
            time.sleep(40)

        print('FEI-distributed: (merge) Merge ROOT files, with training_input.root: ', get_training_cache(args)[1])
        merge_root_files(args, get_training_cache(args)[0], roundMode=get_training_cache(args)[1])

        print('FEI-distributed: (update) Update input files with summary: ', get_training_cache(args)[1])
        update_input_files(args)

        print('FEI-distributed: (clean) Clean job directory')
        clean_job_directory(args)

        if args.once:
            break
    else:
        # This else branch is only reached if the loop was not exited via break,
        # which means the training finished and we can create our summary reports.
        print('FEI-distributed: (report) Create report')
        create_report(args)