12 from typing
import Tuple
15 from validationscript
import Script
20 A class that provides the controls for running jobs on a (remote)
21 cluster. It provides two methods:
22 - is_job_finished(job): Returns a tuple (finished, exit code), where finished
23 is True or False, depending on whether the job has finished execution
24 - execute(job): Takes a job and executes it by sending it to the cluster
30 Check if the bsub command is available
32 return shutil.which(
"bsub")
is not None
37 Returns name of this job control
44 Returns description of this job control
46 return "Batch submission to bsub-based cluster"
50 The default constructor.
51 - Holds the current working directory, which is also the location of
52 the shell scripts that are being sent to the cluster.
53 - Initializes a logger which writes to validate_basf2.py's log.
54 - Finds the revision of basf2 that will be set up on the cluster.
65 self.
logger = logging.getLogger(
'validate_basf2')
73 belle2_release_dir = os.environ.get(
'BELLE2_RELEASE_DIR',
None)
74 belle2_local_dir = os.environ.get(
'BELLE2_LOCAL_DIR',
None)
78 if belle2_release_dir
is not None:
79 self.
b2setup +=
' ' + belle2_release_dir.split(
'/')[-1]
80 if belle2_local_dir
is not None:
81 self.
b2setup =
'MY_BELLE2_DIR=' + \
83 if os.environ.get(
'BELLE2_OPTION') !=
'debug':
84 self.
b2setup +=
'; b2code-option ' + \
85 os.environ.get(
'BELLE2_OPTION')
88 self.
logger.debug(f
'Setting up the following release: {self.b2setup}')
92 clusterlog_dir =
'./html/logs/__general__/'
93 if not os.path.exists(clusterlog_dir):
94 os.makedirs(clusterlog_dir)
99 This method can be used if path names are different on submission
101 @param path: The path that needs to be adjusted
102 @return: The adjusted path
110 The cluster should always be available to accept new jobs.
111 @return: Will always return True if the function can be called
116 def execute(self, job: Script, options=
'', dry=
False, tag=
'current'):
118 Takes a Script object and a string with options and runs it on the
119 cluster, either with ROOT or with basf2, depending on the file type.
121 @param job: The steering file object that should be executed
122 @param options: Options that will be given to the basf2 command
123 @param dry: Whether to perform a dry run or not
124 @param tag: The folder within the results directory
133 output_dir = os.path.abspath(f
'./results/{tag}/{job.package}')
134 if not os.path.exists(output_dir):
135 os.makedirs(output_dir)
137 log_file = output_dir +
'/' + os.path.basename(job.path) +
'.log'
140 donefile_path = f
"{self.path}/script_{job.name}.done"
141 if os.path.isfile(donefile_path):
142 os.remove(donefile_path)
144 extension = os.path.splitext(job.path)[1]
145 if extension ==
'.C':
147 command =
'root -b -q ' + job.path
151 command = f
'basf2 {job.path} {options}'
160 with open(tmp_name,
'w+')
as tmp_file:
161 tmp_file.write(
'#!/bin/bash \n\n' +
162 'BELLE2_NO_TOOLS_CHECK=1 \n' +
163 'source {0}/b2setup \n'.format(self.
tools) +
165 '{0} \n'.format(command) +
166 'echo $? > {0}/script_{1}.done \n'
167 .format(self.
path, job.name) +
168 'rm {0} \n'.format(tmp_name))
171 st = os.stat(tmp_name)
172 os.chmod(tmp_name, st.st_mode | stat.S_IEXEC)
176 "bsub",
"-oo", log_file,
"-q",
"l", tmp_name,
180 self.
logger.debug(subprocess.list2cmdline(params))
184 proc = subprocess.run(
186 stdout=subprocess.PIPE,
187 stderr=subprocess.PIPE,
188 universal_newlines=
True,
190 except subprocess.CalledProcessError:
191 job.status =
'failed'
192 self.
logger.error(
"Failed to submit job. Here's the traceback:")
193 self.
logger.error(traceback.format_exc())
194 self.
logger.error(
"Will attempt to cleanup job files.")
198 if proc.stdout.strip():
200 f
"Stdout of job submission: '{proc.stdout.strip()}'."
202 if proc.stderr.strip():
204 f
"Stderr of job submission: '{proc.stderr.strip()}'."
209 res = re.search(
"Job <([0-9]*)> is submitted", proc.stdout)
211 job.job_id = res.group(1)
214 f
"Could not find job id! Will not be able to terminate"
215 f
" this job, even if necessary. "
218 os.system(f
'echo 0 > {self.path}/script_{job.name}.done')
222 """ Clean up after job has finished. """
227 """ Name of temporary file used for job submission. """
228 return self.
path +
'/' +
'script_' + job.name +
'.sh'
232 Checks whether the '.done'-file has been created for a job. If so, it
233 returns True, else it returns False.
234 Also deletes the '.done'-file once it has returned True.
236 @param job: The job of which we want to know if it finished
237 @return: (True if the job has finished, exit code). If we can't find the
238 exit code in the '.done'-file, the returncode will be -666.
239 If the job is not finished, the exit code is returned as 0.
242 donefile_path = f
"{self.path}/script_{job.name}.done"
244 if os.path.isfile(donefile_path):
247 with open(donefile_path)
as f:
249 returncode = int(f.read().strip())
253 os.remove(donefile_path)
255 return True, returncode
262 """! Terminate a running job
265 params = [
"bkill", job.job_id]
266 self.
logger.debug(subprocess.list2cmdline(params))
268 proc = subprocess.run(
270 stdout=subprocess.PIPE,
271 stderr=subprocess.PIPE,
272 universal_newlines=
True,
274 except subprocess.CalledProcessError:
275 job.status =
'failed'
277 f
"Probably wasn't able to cancel job. Here's the traceback:"
279 self.
logger.error(traceback.format_exc())
281 if proc.stdout.strip():
283 f
"Stdout of job termination: '{proc.stdout.strip()}'."
285 if proc.stderr.strip():
287 f
"Stderr of job termination: '{proc.stderr.strip()}'."
293 "Termination of the job corresponding to steering file "
294 f
"{job.path} has been requested, but no job id is available."
295 f
" Can't do anything."