from clustercontrolbase import ClusterBase
from validationscript import Script
A class that provides the controls for running jobs on a (remote)
Sun Grid Engine cluster. It provides two methods:
- is_job_finished(job): Returns True or False, depending on whether the job
  has finished execution
- execute(job): Takes a job and executes it by sending it to the cluster
Check if qsub is available
"drmaa library is not installed, please use 'pip3 install "
except RuntimeError as re:
    print("drmaa library not properly configured")
Returns name of this job control
return "cluster-drmaa"
Returns description of this job control
return "Batch submission via the drmaa interface to Grid Engine"
The default constructor.
- Holds the current working directory, which is also the location of
  the shellscripts that are being sent to the cluster.
- Initializes a logger which writes to validate_basf2.py's log.
- Finds the revision of basf2 that will be set up on the cluster.
"-l h_vmem={requirement_vmem}G,h_fsize={"
"requirement_storage}G "
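The placeholders in the native specification are presumably filled in with `str.format` before the string is handed to the job template as `native_spec_string`. A minimal sketch, using only the fragment of the specification visible in the listing; the numeric values are hypothetical, since the real defaults are not shown:

```python
# Only the part of the native specification visible in the listing is used.
native_spec = (
    "-l h_vmem={requirement_vmem}G,h_fsize={"
    "requirement_storage}G "
)

# Hypothetical values; the real defaults are not part of the listing.
native_spec_string = native_spec.format(
    requirement_vmem=4,
    requirement_storage=50,
)

print(native_spec_string)
```

Adjacent string literals are concatenated by Python, so the brace that ends the first literal and the name that starts the second form a single `{requirement_storage}` placeholder.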
This method can be used if path names are different on submission
and execution hosts.
@param path: The path that needs to be adjusted
@return: The adjusted path
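The listing does not show the body of `adjust_path`. One way such a hook could work is a prefix rewrite, sketched below as a standalone function; both prefixes are hypothetical and stand for the mount points on the submission and execution hosts:

```python
def adjust_path(
    path: str,
    submission_prefix: str = "/home/user",      # hypothetical
    execution_prefix: str = "/nfs/home/user",   # hypothetical
) -> str:
    """Map a submission-host path to its execution-host equivalent."""
    if path == submission_prefix or path.startswith(submission_prefix + "/"):
        return execution_prefix + path[len(submission_prefix):]
    # Paths outside the known prefix are returned unchanged.
    return path


print(adjust_path("/home/user/results/wrapper.sh"))
```

Matching on `submission_prefix + "/"` rather than on the bare prefix avoids rewriting unrelated paths such as `/home/username/...`.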
The cluster should always be available to accept new jobs.
@return: Will always return True if the function can be called
def execute(self, job: Script, options="", dry=False, tag="current"):
Takes a Script object and a string with options and runs it on the
cluster, either with ROOT or with basf2, depending on the file type.
@param job: The steering file object that should be executed
@param options: Options that will be given to the basf2 command
@param dry: Whether to perform a dry run or not
@param tag: The folder within the results directory
print(str(drmaa.Session()))
with drmaa.Session() as session:
    print("got session ")
f"Creating job template for wrapper script {shell_script_name}"
jt = session.createJobTemplate()
jt.remoteCommand = shell_script_name
jt.nativeSpecification = native_spec_string
jobid = session.runJob(jt)
f"Script {job.name} started with job id {jobid}"
session.deleteJobTemplate(jt)
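The submission steps above (create a job template, set the command and native specification, run the job, delete the template) can be sketched in isolation. The sketch uses only the session calls that appear in the listing. `FakeSession` is a hypothetical stand-in for `drmaa.Session` so that the example runs without a cluster, and the `try`/`finally` is a design choice of this sketch, not necessarily what the original code does:

```python
from types import SimpleNamespace


class FakeSession:
    """Hypothetical stand-in for drmaa.Session; no cluster is contacted."""

    def __init__(self):
        self.deleted = []
        self._next_id = 1

    def createJobTemplate(self):
        return SimpleNamespace(remoteCommand=None, nativeSpecification=None)

    def runJob(self, jt):
        job_id = str(self._next_id)
        self._next_id += 1
        return job_id

    def deleteJobTemplate(self, jt):
        self.deleted.append(jt)


def submit(session, shell_script_name, native_spec_string):
    """Submit one wrapper script and always release the job template."""
    jt = session.createJobTemplate()
    try:
        jt.remoteCommand = shell_script_name
        jt.nativeSpecification = native_spec_string
        return session.runJob(jt)
    finally:
        # Release the template even if the submission raised.
        session.deleteJobTemplate(jt)


session = FakeSession()
jobid = submit(session, "/tmp/wrapper.sh", "-l h_vmem=4G")
print(f"Script started with job id {jobid}")
```

With a real DRMAA installation, the same `submit` function could be called with the session obtained from `with drmaa.Session() as session:`.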
Checks whether the '.done'-file has been created for a job. If so, it
returns True, else it returns False.
Also deletes the .done-File once it has returned True.
@param job: The job of which we want to know if it finished
@return: (True if the job has finished, exit code). If we can't find the
    exit code in the '.done'-file, the returncode will be -654.
    If the job is not finished, the exit code is returned as 0.
if job.job_id is None:
"Job has not been started with cluster drmaa because "
with drmaa.Session() as session:
status = session.jobStatus(job.job_id)
except drmaa.errors.InvalidJobException:
f"Job info for jobid {job.job_id} cannot be retrieved, assuming job has terminated"
(donefile_exists, donefile_returncode) = self.checkDoneFile(job)
return [True, donefile_returncode]
if status == drmaa.JobState.DONE:
if status == drmaa.JobState.FAILED:
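The '.done'-file convention described in the `is_job_finished` docstring can be sketched as a standalone helper. The listing does not show the body of `checkDoneFile`, so this is an illustration under assumptions: the function name `check_done_file`, the file name, and the idea that the file contains the exit code as a plain integer are all hypothetical. Only the return conventions (`0` while unfinished, `-654` when the exit code cannot be read, deletion after a positive result) come from the docstring:

```python
import os
import tempfile

# Sentinel named in the docstring for "exit code could not be read".
UNKNOWN_RETURNCODE = -654


def check_done_file(donefile_path):
    """Return (finished, returncode) based on a job's '.done'-file."""
    if not os.path.isfile(donefile_path):
        # Job not finished yet: the exit code is reported as 0.
        return False, 0
    with open(donefile_path) as f:
        content = f.read().strip()
    try:
        # Assumption: the file holds the exit code as a plain integer.
        returncode = int(content)
    except ValueError:
        returncode = UNKNOWN_RETURNCODE
    # The '.done'-file is removed once the job is reported as finished.
    os.remove(donefile_path)
    return True, returncode


with tempfile.TemporaryDirectory() as tmp:
    donefile = os.path.join(tmp, "script.py.done")  # hypothetical file name
    before = check_done_file(donefile)
    with open(donefile, "w") as f:
        f.write("3\n")
    after = check_done_file(donefile)
    removed = not os.path.exists(donefile)
    with open(donefile, "w") as f:
        f.write("")
    unreadable = check_done_file(donefile)

print(before, after, removed, unreadable)
```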
logger
Contains a reference to the logger-object from validate_basf2. Set up the logging functionality for th...
def checkDoneFile(self, job)
Checks whether the '.done'-file has been created for a job.
def prepareSubmission(self, Script job, options, tag)
Setup output folders and create the wrapping shell script.
queuename
Queue best suitable for execution at DESY NAF.
def is_job_finished(self, Script job)
Checks whether the '.done'-file has been created for a job.
native_spec
The command to submit a job.
requirement_vmem
required vmem by the job in GB, required on DESY NAF, otherwise jobs get killed due to memory consump...
def execute(self, Script job, options="", dry=False, tag="current")
Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with ...
requirement_storage
the storage IO in GB which can be performed by each job.
def __init__(self)
The default constructor.
def adjust_path(self, str path)
This method can be used if path names are different on submission and execution hosts.