from clustercontrolbase import ClusterBase
from validationscript import Script
A class that provides the controls for running jobs on a (remote)
Sun Grid Engine cluster. It provides two methods:
- is_job_finished(job): Returns True or False, depending on whether the job
  has finished execution
- execute(job): Takes a job and executes it by sending it to the cluster
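In practice the two methods are meant to be used together: submit, then poll. A minimal sketch of such a driver loop, where `cluster`, `jobs`, and the sleep interval are assumptions for illustration, not part of the class:

import time

# `cluster` is an instance of this class and `jobs` a list of Script
# objects; both are assumed to exist for this sketch.
for job in jobs:
    cluster.execute(job)

pending = list(jobs)
while pending:
    for job in list(pending):
        finished, returncode = cluster.is_job_finished(job)
        if finished:
            print(f"{job.name} finished with exit code {returncode}")
            pending.remove(job)
    time.sleep(30)  # poll gently instead of hammering the scheduler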
# Check if qsub is available
try:
    import drmaa  # noqa -- importing drmaa loads libdrmaa, which can fail
except ImportError:
    print(
        "drmaa library is not installed, please use 'pip3 install "
        "drmaa'"
    )
except RuntimeError as re:
    print("drmaa library not properly configured")
Returns the name of this job control:

    return "cluster-drmaa"

Returns the description of this job control:

    return "Batch submission via the drmaa interface to Grid Engine"
The default constructor.
- Holds the current working directory, which is also the location of
  the shell scripts that are being sent to the cluster.
- Initializes a logger which writes to validate_basf2.py's log.
- Finds the revision of basf2 that will be set up on the cluster.

The native specification passed to the scheduler requests the per-job
memory and storage limits:

    "-l h_vmem={requirement_vmem}G,h_fsize={requirement_storage}G "
This method can be used if path names are different on submission
and execution hosts.
@param path: The path that needs to be adjusted
@return: The adjusted path
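As a hypothetical example of what such an adjustment can look like (the mount-point prefixes are invented for illustration; the excerpt does not show the actual mapping, and the real method may simply return the path unchanged):

def adjust_path(path: str) -> str:
    # Illustrative only: map a submission-host mount point onto the
    # path that the execution hosts see.
    return path.replace("/local/scratch/", "/nfs/scratch/")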
The cluster should always be available to accept new jobs.
@return: Will always return True if the function can be called
def execute(self, job: Script, options="", dry=False, tag="current"):
Takes a Script object and a string with options and runs it on the
cluster, either with ROOT or with basf2, depending on the file type.

@param job: The steering file object that should be executed
@param options: Options that will be given to the basf2 command
@param dry: Whether to perform a dry run or not
@param tag: The folder within the results directory
print(str(drmaa.Session()))

with drmaa.Session() as session:
    print("got session ")

    native_spec_string = self.native_spec.format(...)  # arguments elided in this excerpt

    # logging calls reconstructed around the messages from the excerpt
    self.logger.debug(
        f"Creating job template for wrapper script {shell_script_name}"
    )
    jt = session.createJobTemplate()
    jt.remoteCommand = shell_script_name
    jt.nativeSpecification = native_spec_string

    jobid = session.runJob(jt)
    self.logger.debug(f"Script {job.name} started with job id {jobid}")

    session.deleteJobTemplate(jt)
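Stripped of the class plumbing, the drmaa-python calls above follow a minimal, self-contained pattern; the wrapper script path and the native specification below are placeholder assumptions:

import drmaa

# Minimal Grid Engine submission via drmaa-python (a sketch, not the
# class's exact code): create a template, run it, then free the template.
with drmaa.Session() as session:
    jt = session.createJobTemplate()
    jt.remoteCommand = "/path/to/wrapper.sh"  # placeholder script
    jt.nativeSpecification = "-l h_vmem=4G,h_fsize=50G"  # example limits
    jobid = session.runJob(jt)
    print(f"started job {jobid}")
    session.deleteJobTemplate(jt)  # templates must be freed explicitly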
Checks whether the '.done' file has been created for a job. If so, it
returns True, else it returns False.
Also deletes the '.done' file once it has returned True.

@param job: The job of which we want to know if it finished
@return: (True if the job has finished, exit code). If we can't find the
exit code in the '.done' file, the returncode will be -654.
If the job is not finished, the exit code is returned as 0.
if job.job_id is None:
    print(
        "Job has not been started with cluster drmaa because "
        "drmaa is not installed"  # assumed continuation of the message
    )
    ...

with drmaa.Session() as session:
    # jobs may already have left the Grid Engine database, so the
    # status query can raise
    try:
        status = session.jobStatus(job.job_id)
    except drmaa.errors.InvalidJobException:
        print(
            "Job info for jobid {} cannot be retrieved, assuming "
            "job has terminated".format(job.job_id)
        )
        (donefile_exists, donefile_returncode) = self.checkDoneFile(job)
        ...
        return [True, donefile_returncode]

    if status == drmaa.JobState.DONE:
        ...
    if status == drmaa.JobState.FAILED:
        ...
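The '.done'-file contract described above can be sketched roughly as follows; the file location and its one-line format are assumptions for illustration, while the -654 sentinel and the deletion of the file once read come from the documentation:

import os

def check_done_file(job):
    """Sketch of the documented contract: the wrapper script writes the
    job's exit code into a '.done' file; -654 signals an unreadable code.
    The path convention below is an assumption for illustration."""
    donefile = os.path.join("results", job.name + ".done")  # assumed path
    if not os.path.isfile(donefile):
        return (False, 0)  # not finished yet: report exit code 0
    try:
        with open(donefile) as f:
            returncode = int(f.read().strip())
    except ValueError:
        returncode = -654  # documented sentinel for a missing exit code
    os.remove(donefile)  # the real method deletes the file once read
    return (True, returncode)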
Member summary:

logger
    Contains a reference to the logger object from validate_basf2.

def checkDoneFile(self, job)
    Checks whether the '.done' file has been created for a job.

def prepareSubmission(self, job: Script, options, tag)
    Set up output folders and create the wrapping shell script.

queuename
    Queue best suited for execution at DESY NAF.

def available(self)
    The cluster should always be available to accept new jobs.

def is_job_finished(self, job: Script)
    Checks whether the '.done' file has been created for a job.

native_spec
    The command to submit a job.

requirement_vmem
    Required vmem by the job in GB; required on DESY NAF, otherwise jobs
    get killed due to memory consumption.

def execute(self, job: Script, options="", dry=False, tag="current")
    Takes a Script object and a string with options and runs it on the
    cluster, either with ROOT or with basf2, depending on the file type.

requirement_storage
    The storage IO in GB which can be performed by each job.

def __init__(self)
    The default constructor.

def adjust_path(self, path: str)
    This method can be used if path names are different on submission and
    execution hosts.