Belle II Software  release-08-01-10
clustercontroldrmaa.py
#!/usr/bin/env python3

# std
import sys

# ours
from clustercontrolbase import ClusterBase
from validationscript import Script


class Cluster(ClusterBase):
    """
    A class that provides the controls for running jobs on a (remote)
    Sun Grid Engine cluster. It provides two methods:
    - is_job_finished(job): Returns True or False, depending on whether
      the job has finished execution
    - execute(job): Takes a job and executes it by sending it to the cluster
    """

    @staticmethod
    def is_supported():
        """
        Check whether the drmaa library is available
        """
        try:
            import drmaa  # noqa

            return True
        except ImportError:
            print(
                "drmaa library is not installed, please use 'pip3 install "
                "drmaa'"
            )
            return False
        except RuntimeError as re:
            print("drmaa library not properly configured")
            print(str(re))
            return False

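    # Note (added, not part of the original file): the drmaa package locates
    # Grid Engine's libdrmaa.so via the DRMAA_LIBRARY_PATH environment
    # variable. If it is unset or wrong, "import drmaa" raises the
    # RuntimeError handled above. The library path below is only an example:
    #
    #   export DRMAA_LIBRARY_PATH=/opt/sge/lib/lx-amd64/libdrmaa.so
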
    @staticmethod
    def name():
        """
        Returns name of this job control
        """
        return "cluster-drmaa"

    @staticmethod
    def description():
        """
        Returns description of this job control
        """
        return "Batch submission via the drmaa interface to Grid Engine"

    def __init__(self):
        """!
        The default constructor.
        - Holds the current working directory, which is also the location of
          the shellscripts that are being sent to the cluster.
        - Initializes a logger which writes to validate_basf2.py's log.
        - Finds the revision of basf2 that will be set up on the cluster.
        """

        #: The command to submit a job
        self.native_spec = (
            "-l h_vmem={requirement_vmem}G,h_fsize={"
            "requirement_storage}G "
            "-q {queuename} -V"
        )

        #: required vmem by the job in GB, required on DESY NAF, otherwise
        #: jobs get killed due to memory consumption
        self.requirement_vmem = 4

        #: the storage IO in GB which can be performed by each job
        self.requirement_storage = 50

        #: Queue best suitable for execution at DESY NAF
        self.queuename = "short.q"

        # call the base constructor, which will setup the batch cluster
        # common stuff
        super().__init__()

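    # Illustrative note (added, not part of the original file): with the
    # defaults set above, native_spec.format(...) expands to
    #
    #   -l h_vmem=4G,h_fsize=50G -q short.q -V
    #
    # i.e. a 4 GB virtual memory limit, a 50 GB file size limit, submission
    # to the short.q queue and export of the submitter's environment (-V).
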
    # noinspection PyMethodMayBeStatic
    def adjust_path(self, path: str):
        """!
        This method can be used if path names are different on submission
        and execution hosts.
        @param path: The path that needs to be adjusted
        @return: The adjusted path
        """

        return path

    # noinspection PyMethodMayBeStatic
    def available(self):
        """!
        The cluster should always be available to accept new jobs.
        @return: Will always return True if the function can be called
        """

        return True

    def execute(self, job: Script, options="", dry=False, tag="current"):
        """!
        Takes a Script object and a string with options and runs it on the
        cluster, either with ROOT or with basf2, depending on the file type.

        @param job: The steering file object that should be executed
        @param options: Options that will be given to the basf2 command
        @param dry: Whether to perform a dry run or not
        @param tag: The folder within the results directory
        @return: None
        """

        # import here first so the whole module can also be imported on python
        # installations which have no drmaa at all
        import drmaa

        print(str(drmaa.Session()))

        with drmaa.Session() as session:
            print("got session ")
            print(str(session))

            shell_script_name = self.prepareSubmission(job, options, tag)

            # native specification with all the good settings for the batch
            # server
            native_spec_string = self.native_spec.format(
                requirement_storage=self.requirement_storage,
                requirement_vmem=self.requirement_vmem,
                queuename=self.queuename,
            )
            print(
                f"Creating job template for wrapper script {shell_script_name}"
            )
            jt = session.createJobTemplate()
            jt.remoteCommand = shell_script_name
            # merge the job's stderr into its stdout stream
            jt.joinFiles = True
            jt.nativeSpecification = native_spec_string

            if not dry:
                jobid = session.runJob(jt)
                self.logger.debug(
                    f"Script {job.name} started with job id {jobid}"
                )
                job.job_id = jobid

            session.deleteJobTemplate(jt)
        return

    def is_job_finished(self, job: Script):
        """!
        Checks whether the '.done'-file has been created for a job. If so, it
        returns True, else it returns False.
        Also deletes the '.done'-file once it has returned True.

        @param job: The job of which we want to know if it finished
        @return: (True if the job has finished, exit code). If we can't find
            the exit code in the '.done'-file, the returncode will be -654.
            If the job is not finished, the exit code is returned as 0.
        """

        # import here first so the whole module can also be imported on python
        # installations which have no drmaa at all
        import drmaa

        if job.job_id is None:
            print(
                "Job has not been started with cluster drmaa because "
                "job id is missing"
            )
            sys.exit(0)

        with drmaa.Session() as session:

            # some batch servers will forget completed jobs right away
            try:
                status = session.jobStatus(job.job_id)
            except drmaa.errors.InvalidJobException:
                print(
                    "Job info for jobid {} cannot be retrieved, assuming "
                    "job has terminated".format(job.job_id)
                )

                (donefile_exists, donefile_returncode) = self.checkDoneFile(job)

                # always report the job as complete: even if there is no done
                # file at this point, the job is no longer running/queued on
                # the cluster
                return [True, donefile_returncode]

            # Return that the job is finished + the return code for it.
            # Depending on when we look for the job, this might never be used,
            # because the jobs disappear from qstat before we can query them
            # ...
            if status == drmaa.JobState.DONE:
                # todo: return code
                return [True, 0]
            if status == drmaa.JobState.FAILED:
                return [True, 1]

            return [False, 0]
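
A minimal usage sketch (not part of the file above), assuming the basf2 validation framework is on the Python path. The Script constructor arguments shown here are illustrative assumptions only; check validationscript.Script for the real signature.

# sketch: submit one steering file and poll for completion
import time

from clustercontroldrmaa import Cluster
from validationscript import Script

if Cluster.is_supported():
    cluster = Cluster()

    # ASSUMPTION: constructor arguments are illustrative, not the real API
    script = Script("example_steering.py", "validation", None)

    # wraps the steering file in a shell script and submits it via drmaa
    cluster.execute(script, options="")

    # poll until the cluster (or the '.done'-file) reports completion
    finished, returncode = cluster.is_job_finished(script)
    while not finished:
        time.sleep(30)
        finished, returncode = cluster.is_job_finished(script)

    print(f"Job finished with return code {returncode}")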