19from typing
import Tuple
22from validationscript
import Script
27 A class that provides the controls for running jobs on a (remote)
28 cluster. It provides two methods:
29 - is_job_finished(job): Returns True or False, depending on whether the job
30 has finished execution
31 - execute(job): Takes a job and executes it by sending it to the cluster
37 Check if the bsub command
is available
39 return shutil.which(
"bsub")
is not None
44 Returns name of this job control
51 Returns description of this job control
53 return "Batch submission to bsub-based cluster"
57 The default constructor.
58 - Holds the current working directory, which is also the location of
59 the shellscripts that are being sent to the cluster.
60 - Initializes a logger which writes to validate_basf2.py's log.
61 - Finds the revision of basf2 that will be set up on the cluster.
72 self.logger = logging.getLogger("validate_basf2")
80 belle2_release_dir = os.environ.get(
"BELLE2_RELEASE_DIR",
None)
81 belle2_local_dir = os.environ.get(
"BELLE2_LOCAL_DIR",
None)
85 if belle2_release_dir
is not None:
86 self.
b2setup +=
" " + belle2_release_dir.split(
"/")[-1]
87 if belle2_local_dir
is not None:
94 if os.environ.get(
"BELLE2_OPTION") !=
"debug":
95 self.
b2setup +=
"; b2code-option " + os.environ.get(
"BELLE2_OPTION")
98 self.
logger.debug(f
"Setting up the following release: {self.b2setup}")
102 clusterlog_dir =
"./html/logs/__general__/"
103 if not os.path.exists(clusterlog_dir):
104 os.makedirs(clusterlog_dir)
109 This method can be used if path names are different on submission and execution hosts.
111 @param path: The path that needs to be adjusted
112 @return: The adjusted path
120 The cluster should always be available to accept new jobs.
121 @return: Will always return True if the function can be called
126 def execute(self, job: Script, options=
"", dry=
False, tag=
"current"):
128 Takes a Script object and a string with options and runs it on the
129 cluster, either with ROOT or with basf2, depending on the file type.
131 @param job: The steering file object that should be executed
132 @param options: Options that will be given to the basf2 command
133 @param dry: Whether to perform a dry run or not
134 @param tag: The folder within the results directory
143 output_dir = os.path.abspath(f
"./results/{tag}/{job.package}")
144 if not os.path.exists(output_dir):
145 os.makedirs(output_dir)
147 log_file = output_dir +
"/" + os.path.basename(job.path) +
".log"
150 donefile_path = f
"{self.path}/script_{job.name}.done"
151 if os.path.isfile(donefile_path):
152 os.remove(donefile_path)
154 extension = os.path.splitext(job.path)[1]
155 if extension ==
".C":
157 command =
"root -b -q " + job.path
161 command = f
"basf2 {job.path} {options}"
170 with open(tmp_name,
"w+")
as tmp_file:
173 +
"BELLE2_NO_TOOLS_CHECK=1 \n"
174 + f
"source {self.tools}/b2setup \n"
175 + f
"cd {self.adjust_path(output_dir)} \n"
177 + f
"echo $? > {self.path}/script_{job.name}.done \n"
178 + f
"rm {tmp_name} \n"
182 st = os.stat(tmp_name)
183 os.chmod(tmp_name, st.st_mode | stat.S_IEXEC)
196 self.
logger.debug(subprocess.list2cmdline(params))
200 proc = subprocess.run(
202 stdout=subprocess.PIPE,
203 stderr=subprocess.PIPE,
204 universal_newlines=
True,
206 except subprocess.CalledProcessError:
207 job.status =
"failed"
208 self.
logger.error(
"Failed to submit job. Here's the traceback:")
209 self.
logger.error(traceback.format_exc())
210 self.
logger.error(
"Will attempt to cleanup job files.")
214 if proc.stdout.strip():
216 f
"Stdout of job submission: '{proc.stdout.strip()}'."
218 if proc.stderr.strip():
220 f
"Stderr of job submission: '{proc.stderr.strip()}'."
225 res = re.search(
"Job <([0-9]*)> is submitted", proc.stdout)
227 job.job_id = res.group(1)
230 "Could not find job id! Will not be able to terminate"
231 " this job, even if necessary. "
234 os.system(f
"echo 0 > {self.path}/script_{job.name}.done")
238 """ Clean up after job has finished. """
243 """ Name of temporary file used for job submission. """
244 return self.
path +
"/" +
"script_" + job.name +
".sh"
248 Checks whether the '.done'-file has been created for a job. If so, it
249 returns True, else it returns False.
250 Also deletes the '.done'-file once it has returned True.
252 @param job: The job of which we want to know if it finished
253 @return: (True if the job has finished, exit code). If we can't find the
254 exit code in the '.done'-file, the returncode will be -654.
255 If the job is not finished, the exit code is returned as 0.
258 donefile_path = f"{self.path}/script_{job.name}.done"
260 if os.path.isfile(donefile_path):
263 with open(donefile_path)
as f:
265 returncode = int(f.read().strip())
269 os.remove(donefile_path)
271 return True, returncode
278 """! Terminate a running job
281 params = [
"bkill", job.job_id]
282 self.
logger.debug(subprocess.list2cmdline(params))
284 proc = subprocess.run(
286 stdout=subprocess.PIPE,
287 stderr=subprocess.PIPE,
288 universal_newlines=
True,
290 except subprocess.CalledProcessError:
291 job.status =
"failed"
293 "Probably wasn't able to cancel job. Here's the traceback:"
295 self.
logger.error(traceback.format_exc())
297 if proc.stdout.strip():
299 f
"Stdout of job termination: '{proc.stdout.strip()}'."
301 if proc.stderr.strip():
303 f
"Stderr of job termination: '{proc.stderr.strip()}'."
309 "Termination of the job corresponding to steering file "
310 f
"{job.path} has been requested, but no job id is available."
311 " Can't do anything."
logger
Contains a reference to the logger-object from validate_basf2 Set up the logging functionality for th...
tools
Path to the basf2 tools and central/local release.
None _cleanup(self, Script job)
b2setup
The command for b2setup (and b2code-option)
Tuple[bool, int] is_job_finished(self, Script job)
Checks whether the '.done'-file has been created for a job.
path
The path where the help files are being created. Maybe there should be a special subfolder for them?
def execute(self, Script job, options="", dry=False, tag="current")
Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with ...
str _get_tmp_name(self, Script job)
def adjust_path(self, path)
This method can be used if path names are different on submission and execution hosts.
def terminate(self, Script job)
Terminate a running job.
def __init__(self)
The default constructor.