Belle II Software  release-05-01-25
clustercontroldrmaa.py
1 #!/usr/bin/env python3
2 
3 # std
4 import sys
5 
6 # ours
7 from clustercontrolbase import ClusterBase
8 from validationscript import Script
9 
10 
12  """
13  A class that provides the controls for running jobs on a (remote)
14  Sun Grid Engine cluster. It provides two methods:
15  - is_job_finished(job): Returns True or False, depending on whether the job
16  has finished execution
17  - execute(job): Takes a job and executes it by sending it to the cluster
18  """
19 
@staticmethod
def is_supported():
    """
    Check whether the drmaa Python bindings can be imported.

    @return: True if ``import drmaa`` succeeds. False if the import raises
        ImportError or RuntimeError; a hint is printed in both cases.
    """
    try:
        # Imported only to probe availability; the name itself is unused.
        import drmaa  # noqa: F401
    except ImportError:
        print("drmaa library is not installed, please use 'pip3 install "
              "drmaa'")
        return False
    except RuntimeError as err:
        print("drmaa library not properly configured")
        print(str(err))
        return False
    return True
36 
@staticmethod
def name():
    """
    Returns name of this job control
    """
    return "cluster-drmaa"
43 
@staticmethod
def description():
    """
    Returns description of this job control
    """
    return "Batch submission via the drmaa interface to Grid Engine"
50 
def __init__(self):
    """!
    The default constructor.

    Stores the native-specification template and the queue name used for
    submission, then delegates to the base-class constructor for the
    setup that is common to all batch clusters.
    """

    #: The command to submit a job. The placeholders are filled in by
    #  execute() via str.format().
    self.native_spec = ('-l h_vmem={requirement_vmem}G,h_fsize={'
                        'requirement_storage}G '
                        '-q {queuename} -V')

    # NOTE(review): the listing this block was recovered from drops
    # original lines 66-68 and 71-73. Its cross-reference index places the
    # definitions of `self.requirement_vmem` (required vmem in GB,
    # line 68) and `self.requirement_storage` (storage IO in GB, line 73)
    # there. Their values are not visible, so they are not reproduced
    # here -- restore both assignments from the upstream file, since
    # execute() reads them.

    #: Queue best suitable for execution at DESY NAF.
    self.queuename = "short.q"

    # call the base constructor, which will setup the batch cluster
    # common stuff
    super().__init__()
81 
# noinspection PyMethodMayBeStatic
def adjust_path(self, path: str):
    """!
    This method can be used if path names are different on submission
    and execution hosts. This implementation returns the path unchanged.

    @param path: The path that needs to be adjusted
    @return: The adjusted path
    """

    return path
92 
# noinspection PyMethodMayBeStatic
def available(self):
    """!
    The cluster should always be available to accept new jobs.

    @return: Will always return True if the function can be called
    """

    return True
101 
def execute(self, job: Script, options='', dry=False, tag='current'):
    """!
    Takes a Script object and a string with options and submits it to the
    cluster through a drmaa session.

    The wrapper shell script that is submitted comes from
    prepareSubmission(); the native specification is built from
    self.native_spec, self.requirement_storage, self.requirement_vmem and
    self.queuename.

    @param job: The steering file object that should be executed
    @param options: Options that are forwarded to prepareSubmission()
    @param dry: Whether to perform a dry run or not. On a dry run the job
        template is created and deleted again, but nothing is submitted
        and job.job_id is left untouched.
    @param tag: The folder within the results directory
    @return: None
    """

    # import here first so the whole module can also be imported on python
    # installations which have no drmaa at all
    import drmaa

    with drmaa.Session() as session:
        print("got session ")
        print(str(session))

        shell_script_name = self.prepareSubmission(job, options, tag)

        # native specification with all the good settings for the batch
        # server
        native_spec_string = self.native_spec.format(
            requirement_storage=self.requirement_storage,
            requirement_vmem=self.requirement_vmem,
            queuename=self.queuename
        )
        print(
            f'Creating job template for wrapper script {shell_script_name}'
        )
        jt = session.createJobTemplate()
        try:
            jt.remoteCommand = shell_script_name
            jt.joinFiles = True
            jt.nativeSpecification = native_spec_string

            if not dry:
                jobid = session.runJob(jt)
                self.logger.debug(
                    f"Script {job.name} started with job id {jobid}"
                )
                # remembered so is_job_finished() can query the job later
                job.job_id = jobid
        finally:
            # release the template even if the submission raised
            session.deleteJobTemplate(jt)
151 
def is_job_finished(self, job: Script):
    """!
    Checks whether a job submitted through drmaa has finished.

    The drmaa session is asked for the job status. If the batch system no
    longer knows the job id, the job is assumed to have terminated and the
    return code reported by checkDoneFile() is used instead.

    If job.job_id is None a message is printed and the interpreter is
    terminated via sys.exit(0).

    @param job: The job of which we want to know if it finished
    @return: A two-element list [finished, return code]:
        - [True, <return code from checkDoneFile()>] if the job id is
          unknown to the batch system,
        - [True, 0] if the job state is DONE,
        - [True, 1] if the job state is FAILED,
        - [False, 0] otherwise.
    """

    # This guard does not need drmaa, so it runs before the lazy import.
    # NOTE(review): exit status 0 is kept from the original although this
    # is an error condition -- confirm whether callers rely on it.
    if job.job_id is None:
        print("Job has not been started with cluster drmaa because "
              "job id is missing")
        sys.exit(0)

    # import here first so the whole module can also be imported on python
    # installations which have no drmaa at all
    import drmaa

    with drmaa.Session() as session:

        # some batch server will forget completed jobs right away
        try:
            status = session.jobStatus(job.job_id)
        except drmaa.errors.InvalidJobException:
            print(f"Job info for jobid {job.job_id} cannot be retrieved, "
                  "assuming job has terminated")

            _, donefile_returncode = self.checkDoneFile(job)

            # always return the job as complete even if there is no done
            # file at this point, since the job is also no longer
            # running/queued on the cluster
            return [True, donefile_returncode]

        # Return that the job is finished + the return code for it.
        # Depending on when we look for the job this might never be used,
        # because the jobs disappear from qstat before we can query them.
        if status == drmaa.JobState.DONE:
            # todo: return code
            return [True, 0]
        if status == drmaa.JobState.FAILED:
            return [True, 1]

        return [False, 0]
clustercontrolbase.ClusterBase.logger
logger
Contains a reference to the logger-object from validate_basf2 Set up the logging functionality for th...
Definition: clustercontrolbase.py:32
clustercontroldrmaa.Cluster.__init__
def __init__(self)
The default constructor.
Definition: clustercontroldrmaa.py:51
clustercontroldrmaa.Cluster.requirement_vmem
requirement_vmem
required vmem by the job in GB, required on DESY NAF, otherwise jobs get killed due to memory consump...
Definition: clustercontroldrmaa.py:68
clustercontrolbase.ClusterBase
Definition: clustercontrolbase.py:14
clustercontroldrmaa.Cluster.is_job_finished
def is_job_finished(self, Script job)
Checks whether the '.done'-file has been created for a job.
Definition: clustercontroldrmaa.py:152
clustercontroldrmaa.Cluster.name
def name()
Definition: clustercontroldrmaa.py:38
clustercontrolbase.ClusterBase.checkDoneFile
def checkDoneFile(self, job)
Checks whether the '.done'-file has been created for a job.
Definition: clustercontrolbase.py:127
clustercontroldrmaa.Cluster.adjust_path
def adjust_path(self, str path)
This method can be used if path names are different on submission and execution hosts.
Definition: clustercontroldrmaa.py:83
clustercontrolbase.ClusterBase.prepareSubmission
def prepareSubmission(self, Script job, options, tag)
Setup output folders and create the wrapping shell script.
Definition: clustercontrolbase.py:71
clustercontroldrmaa.Cluster.execute
def execute(self, Script job, options='', dry=False, tag='current')
Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with ...
Definition: clustercontroldrmaa.py:102
clustercontroldrmaa.Cluster.native_spec
native_spec
The command to submit a job.
Definition: clustercontroldrmaa.py:62
clustercontroldrmaa.Cluster.description
def description()
Definition: clustercontroldrmaa.py:45
clustercontroldrmaa.Cluster.requirement_storage
requirement_storage
the storage IO in GB which can be performed by each job.
Definition: clustercontroldrmaa.py:73
clustercontroldrmaa.Cluster.available
def available(self)
The cluster should always be available to accept new jobs.
Definition: clustercontroldrmaa.py:94
clustercontroldrmaa.Cluster
Definition: clustercontroldrmaa.py:11
clustercontroldrmaa.Cluster.queuename
queuename
Queue best suitable for execution at DESY NAF.
Definition: clustercontroldrmaa.py:76
clustercontroldrmaa.Cluster.is_supported
def is_supported()
Definition: clustercontroldrmaa.py:21