Belle II Software development
clustercontroldrmaa.py
1#!/usr/bin/env python3
2
3
10
11# std
12import sys
13
14# ours
15from clustercontrolbase import ClusterBase
16from validationscript import Script
17
18
20 """
21 A class that provides the controls for running jobs on a (remote)
22 Sun Grid Engine cluster. It provides two methods:
23 - is_job_finished(job): Returns True or False, depending on whether the job
24 has finished execution
25 - execute(job): Takes a job and executes it by sending it to the cluster
26 """
27
28 @staticmethod
30 """
31 Check if the drmaa Python library is installed and properly configured
32 """
33 try:
34 import drmaa # noqa
35
36 return True
37 except ImportError:
38 print(
39 "drmaa library is not installed, please use 'pip3 install "
40 "drmaa'"
41 )
42 return False
43 except RuntimeError as re:
44 print("drmaa library not properly configured")
45 print(str(re))
46 return False
47
48 @staticmethod
def name():
    """
    Return the name of this job control.
    """
    control_name = "cluster-drmaa"
    return control_name
54
55 @staticmethod
57 """
58 Returns description of this job control
59 """
60 return "Batch submission via the drmaa interface to Grid Engine"
61
def __init__(self):
    """!
    The default constructor.
    - Holds the current working directory, which is also the location of
    the shellscripts that are being sent to the cluster.
    - Initializes a logger which writes to validate_basf2.py's log.
    - Finds the revision of basf2 that will be set up on the cluster.

    NOTE(review): none of the three items above is done by the statements
    visible in this constructor; presumably they are performed by the base
    constructor called at the end - confirm against ClusterBase.
    """

    # Template for the scheduler's native specification. execute() fills
    # the {requirement_vmem}, {requirement_storage} and {queuename}
    # placeholders via str.format() and hands the result to the drmaa job
    # template as its nativeSpecification.
    self.native_spec = (
        "-l h_vmem={requirement_vmem}G,h_fsize={"
        "requirement_storage}G "
        "-q {queuename} -V"
    )

    # NOTE(review): execute() also reads self.requirement_vmem and
    # self.requirement_storage (both inserted with a "G" suffix into the
    # template above). Their assignments are not visible in this block -
    # confirm that they are set before execute() is called.

    # Name of the batch queue substituted into the "-q" option of
    # native_spec.
    self.queuename = "short.q"

    # call the base constructor, which will setup the batch cluster
    # common stuff
    super().__init__()
94
95 # noinspection PyMethodMayBeStatic
def adjust_path(self, path: str):
    """!
    Hook for translating a path when path names differ between the
    submission host and the execution host. This implementation performs
    no translation.
    @param path: The path that needs to be adjusted
    @return: The adjusted path (here: the input, unchanged)
    """

    adjusted_path = path
    return adjusted_path
105
106 # noinspection PyMethodMayBeStatic
def available(self):
    """!
    Reports whether the cluster can accept new jobs. No check is
    performed; the cluster is considered available at all times.
    @return: Always True
    """

    accepts_new_jobs = True
    return accepts_new_jobs
114
def execute(self, job: Script, options="", dry=False, tag="current"):
    """!
    Takes a Script object and a string with options and submits it to the
    cluster through a drmaa session.

    The wrapper shell script is created by prepareSubmission(); a drmaa
    job template pointing at that script is then built and, unless this is
    a dry run, submitted. On submission the drmaa job id is stored in
    job.job_id so that is_job_finished() can query the job later.

    @param job: The steering file object that should be executed
    @param options: Options that will be given to the basf2 command
    @param dry: Whether to perform a dry run or not. On a dry run the job
        template is created but never submitted and job.job_id is left
        untouched.
    @param tag: The folder within the results directory
    @return: None
    """

    # import here first so the whole module can also be imported on python
    # installations which have no drmaa at all
    import drmaa

    with drmaa.Session() as session:
        print("got session ")
        print(str(session))

        shell_script_name = self.prepareSubmission(job, options, tag)

        # native specification with all the good settings for the batch
        # server
        native_spec_string = self.native_spec.format(
            requirement_storage=self.requirement_storage,
            requirement_vmem=self.requirement_vmem,
            queuename=self.queuename,
        )
        print(
            f"Creating job template for wrapper script {shell_script_name}"
        )
        jt = session.createJobTemplate()
        try:
            jt.remoteCommand = shell_script_name
            jt.joinFiles = True
            jt.nativeSpecification = native_spec_string

            if not dry:
                jobid = session.runJob(jt)
                self.logger.debug(
                    f"Script {job.name} started with job id {jobid}"
                )
                job.job_id = jobid
        finally:
            # release the template even if configuring or submitting the
            # job raised, otherwise it would leak inside the session
            session.deleteJobTemplate(jt)
163
def is_job_finished(self, job: Script):
    """!
    Checks whether a job submitted via execute() has finished by asking
    the drmaa session for the status of job.job_id.

    Some batch servers forget completed jobs right away. If the job id is
    no longer known to the batch system, the job is assumed to have
    terminated and its return code is taken from checkDoneFile().

    If job.job_id is None the job was never submitted through this
    control; this is treated as a fatal error and the process exits with a
    non-zero status.

    @param job: The job of which we want to know if it finished
    @return: A two-element list [finished, return code]:
        - [True, 0] if drmaa reports the job as DONE
        - [True, 1] if drmaa reports the job as FAILED
        - [True, <return code from checkDoneFile()>] if the batch system
          no longer knows the job
        - [False, 0] in every other state (job not finished yet)
    """

    # import here first so the whole module can also be imported on python
    # installations which have no drmaa at all
    import drmaa

    if job.job_id is None:
        print(
            "Job has not been started with cluster drmaa because "
            "job id is missing"
        )
        # this is an error condition, so do not report success to the
        # calling shell
        sys.exit(1)

    with drmaa.Session() as session:

        # some batch server will forget completed jobs right away
        try:
            status = session.jobStatus(job.job_id)
        except drmaa.errors.InvalidJobException:
            print(
                f"Job info for jobid {job.job_id} cannot be retrieved, assuming job has terminated"
            )

            # only the return code is of interest here: the job is
            # reported as complete even if there is no done file, because
            # at this point it is no longer running/queued on the cluster
            _, donefile_returncode = self.checkDoneFile(job)
            return [True, donefile_returncode]

        # Return that the job is finished + the return code for it.
        # Depending on when we look for the job this might never be used,
        # because the jobs can disappear from the batch system before we
        # can query them.
        if status == drmaa.JobState.DONE:
            # todo: return code
            return [True, 0]
        if status == drmaa.JobState.FAILED:
            return [True, 1]

        return [False, 0]
logger
Contains a reference to the logger-object from validate_basf2 Set up the logging functionality for th...
def checkDoneFile(self, job)
Checks whether the '.done'-file has been created for a job.
def prepareSubmission(self, Script job, options, tag)
Setup output folders and create the wrapping shell script.
queuename
Queue best suitable for execution at DESY NAF.
def is_job_finished(self, Script job)
Checks whether the '.done'-file has been created for a job.
native_spec
The command to submit a job.
requirement_vmem
required vmem by the job in GB, required on DESY NAF, otherwise jobs get killed due to memory consump...
def execute(self, Script job, options="", dry=False, tag="current")
Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with ...
requirement_storage
the storage IO in GB which can be performed by each job.
def __init__(self)
The default constructor.
def adjust_path(self, str path)
This method can be used if path names are different on submission and execution hosts.