Belle II Software  release-05-01-25
clustercontrolsge.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 # std
5 import logging
6 import os
7 import subprocess
8 import stat
9 import shutil
10 
11 # ours
12 import validationfunctions
13 from validationscript import Script
14 
15 
class Cluster:
    """
    A class that provides the controls for running jobs on a (remote)
    Sun Grid Engine cluster. It provides two main methods:
    - is_job_finished(job): Returns ``[finished, returncode]``, i.e. whether
      the job has finished execution and, if so, its exit code
    - execute(job): Takes a job and executes it by sending it to the cluster
    """

    @staticmethod
    def is_supported():
        """
        Check if qsub is available
        """
        return shutil.which("qsub") is not None

    @staticmethod
    def name():
        """
        Returns name of this job control
        """
        return "cluster-sge"

    @staticmethod
    def description():
        """
        Returns description of this job control
        """
        return "Batch submission via command line to Grid Engine"

    def __init__(self):
        """!
        The default constructor.
        - Holds the current working directory, which is also the location of
          the shellscripts that are being sent to the cluster.
        - Initializes a logger which writes to validate_basf2.py's log.
        - Finds the revision of basf2 that will be set up on the cluster.
        - Opens ./html/logs/__general__/clusterlog.log (creating the folder
          if needed) to collect the output of the submission commands.
        """

        #: The command to submit a job. The placeholders are filled in by
        #: execute(); every whitespace-separated token of this template
        #: becomes exactly one command line argument.
        self.submit_command = ('qsub -cwd -l h_vmem={requirement_vmem}G,'
                               'h_fsize={requirement_storage}G '
                               '-oo {logfile} -q {queuename} -V')

        #: required vmem by the job in GB, required on DESY NAF, otherwise
        #: jobs get killed due to memory consumption.
        #: NOTE(review): this assignment was missing although execute() reads
        #: it; the value is restored from the upstream basf2 sources — confirm.
        self.requirement_vmem = 4

        #: the storage IO in GB which can be performed by each job.
        #: NOTE(review): restored like requirement_vmem above — confirm value.
        self.requirement_storage = 50

        #: Queue best suitable for execution at DESY NAF.
        self.queuename = "short.q"

        #: The path where the help files are being created.
        #: Maybe there should be a special subfolder for them?
        self.path = os.getcwd()

        #: Contains a reference to the logger-object from validate_basf2, so
        #: that execute() and is_job_finished() log into validate_basf2.py's
        #: log.
        self.logger = logging.getLogger('validate_basf2')

        # We need to set up the same environment on the cluster as on the
        # local machine. The information is taken from $BELLE2_TOOLS,
        # $BELLE2_RELEASE_DIR and $BELLE2_LOCAL_DIR.

        #: Path to the basf2 tools, as seen from the execution host.
        self.tools = self.adjust_path(os.environ['BELLE2_TOOLS'])
        belle2_release_dir = os.environ.get('BELLE2_RELEASE_DIR', None)
        belle2_local_dir = os.environ.get('BELLE2_LOCAL_DIR', None)

        #: The command for b2setup (and setoption)
        self.b2setup = 'b2setup'
        if belle2_release_dir is not None:
            self.b2setup += ' ' + belle2_release_dir.split('/')[-1]
        if belle2_local_dir is not None:
            self.b2setup = 'MY_BELLE2_DIR=' + \
                self.adjust_path(belle2_local_dir) + ' ' + self.b2setup
        # Only append a code option if one is actually set; concatenating an
        # unset (None) BELLE2_OPTION used to raise a TypeError here.
        belle2_option = os.environ.get('BELLE2_OPTION')
        if belle2_option is not None and belle2_option != 'debug':
            self.b2setup += '; b2code-option ' + belle2_option

        # Write to log which revision we are using
        self.logger.debug(f'Setting up the following release: {self.b2setup}')

        # Define the folder in which the log of the cluster messages will be
        # stored (same folder like the log for validate_basf2.py)
        clusterlog_dir = './html/logs/__general__/'
        os.makedirs(clusterlog_dir, exist_ok=True)

        #: The file object to which all cluster messages will be written.
        #: It is kept open because it serves as stdout/stderr of every
        #: submission command issued by execute().
        self.clusterlog = open(clusterlog_dir + 'clusterlog.log', 'w+')

    # noinspection PyMethodMayBeStatic
    def adjust_path(self, path: str) -> str:
        """!
        This method can be used if path names are different on submission
        and execution hosts.
        @param path: The path that needs to be adjusted
        @return: The adjusted path (this implementation returns it unchanged)
        """

        return path

    # noinspection PyMethodMayBeStatic
    def available(self) -> bool:
        """!
        The cluster should always be available to accept new jobs.
        @return: Will always return True if the function can be called
        """

        return True

    def execute(self, job: "Script", options='', dry=False, tag='current'):
        """!
        Takes a Script object and a string with options and runs it on the
        cluster, either with ROOT or with basf2, depending on the file type.

        A helper shell script ``<path>/script_<job.name>.sh`` is written and
        submitted with qsub. When it has run, it writes the exit code of the
        steering file into ``<path>/script_<job.name>.done`` and removes
        itself.

        @param job: The steering file object that should be executed
        @param options: Options that will be given to the basf2 command
        @param dry: Whether to perform a dry run or not. A dry run submits
            nothing, writes a ``.done`` file containing 0 and deletes the
            helper script right away.
        @param tag: The folder within the results directory
        @return: None. If the submission command fails, job.status is set
            to 'failed'.
        """

        # Define the folder in which the results (= the ROOT files) should be
        # created. This is where the files containing plots will end up. By
        # convention, data files will be stored in the parent dir.
        # Make sure the folder exists (create it if it does not exist); the
        # helper script changes into it on the cluster.
        output_dir = os.path.abspath(f'./results/{tag}/{job.package}')
        os.makedirs(output_dir, exist_ok=True)

        # Path where log file is supposed to be created
        log_file = output_dir + '/' + os.path.basename(job.path) + '.log'

        # Remove any left over done files
        donefile_path = f"{self.path}/script_{job.name}.done"
        if os.path.isfile(donefile_path):
            os.remove(donefile_path)

        # Now we need to distinguish between .C and all other files:
        extension = os.path.splitext(job.path)[1]
        if extension == '.C':
            # .C files are executed with root
            command = 'root -b -q ' + job.path
        else:
            # Everything else (.py) is executed with basf2.
            # 'options' contains an option-string for basf2, e.g. '-n 100'
            params = validationfunctions.basf2_command_builder(
                job.path, options.split()
            )
            command = subprocess.list2cmdline(params)

        # Create a helpfile-shellscript, which contains all the commands that
        # need to be executed by the cluster.
        # First, set up the basf2 tools, then change into the output folder
        # and execute the command (i.e. run basf2 or ROOT on a steering
        # file). Write the return code of that into a *.done file.
        # Delete the helpfile-shellscript.
        tmp_name = self.path + '/' + 'script_' + job.name + '.sh'
        with open(tmp_name, 'w+') as tmp_file:
            tmp_file.write('#!/bin/bash \n\n' +
                           'BELLE2_NO_TOOLS_CHECK=1 \n' +
                           'source {0}/b2setup \n'.format(self.tools) +
                           'cd {0} \n'.format(self.adjust_path(output_dir)) +
                           '{0} \n'.format(command) +
                           'echo $? > {0} \n'.format(donefile_path) +
                           'rm {0} \n'.format(tmp_name))

        # Make the helpfile-shellscript executable
        st = os.stat(tmp_name)
        os.chmod(tmp_name, st.st_mode | stat.S_IEXEC)

        # Prepare the command line command for submission to the cluster.
        # Split the template *before* substituting, so that a log file path
        # containing whitespace still ends up as a single argument.
        params = [
            token.format(
                queuename=self.queuename,
                requirement_storage=self.requirement_storage,
                requirement_vmem=self.requirement_vmem,
                logfile=log_file,
            )
            for token in self.submit_command.split()
        ] + [tmp_name]

        # Log the command we are about the execute
        self.logger.debug(subprocess.list2cmdline(params))

        # Submit it to the cluster. The steering
        # file output will be written to 'log_file' (see above).
        # If we are performing a dry run, don't send anything to the cluster
        # and just create the *.done file right away and delete the *.sh file.
        if not dry:
            returncode = subprocess.call(params, stdout=self.clusterlog,
                                         stderr=subprocess.STDOUT)

            # Check whether the submission succeeded
            if returncode != 0:
                job.status = 'failed'
                self.logger.error(
                    f'Submission of {job.name} failed with exit code '
                    f'{returncode}, see {self.clusterlog.name}'
                )
        else:
            # Same content as 'echo 0 > file', without spawning a shell.
            with open(donefile_path, 'w') as donefile:
                donefile.write('0\n')
            os.unlink(tmp_name)

    def is_job_finished(self, job: "Script") -> list:
        """!
        Checks whether the '.done'-file has been created for a job.
        Also deletes the .done-File once the job has been reported finished.

        @param job: The job of which we want to know if it finished
        @return: ``[True, returncode]`` if the job has finished (returncode
            is -666 if the .done-file does not contain an integer), otherwise
            ``[False, 0]``
        """

        # If there is a file indicating the job is done, that is its name:
        donefile_path = f"{self.path}/script_{job.name}.done"

        # If no such file exists, the job has not yet finished
        if not os.path.isfile(donefile_path):
            return [False, 0]

        # Read the returncode/exit_status for the job from the *.done-file
        with open(donefile_path) as f:
            try:
                returncode = int(f.read().strip())
            except ValueError:
                returncode = -666

        # Delete the *.done file
        os.remove(donefile_path)

        # Return that the job is finished + the return code for it
        return [True, returncode]

    # noinspection PyMethodMayBeStatic
    def terminate(self, job: "Script"):
        """!
        Terminate a running job. This is not supported by this backend, so
        the call only logs an error and is otherwise ignored.
        """
        self.logger.error("Script termination not supported.")
clustercontrolsge.Cluster.requirement_storage
requirement_storage
the storage IO in GB which can be performed by each job.
Definition: clustercontrolsge.py:68
clustercontrolsge.Cluster.is_job_finished
def is_job_finished(self, Script job)
Checks whether the '.done'-file has been created for a job.
Definition: clustercontrolsge.py:224
clustercontrolsge.Cluster.__init__
def __init__(self)
The default constructor.
Definition: clustercontrolsge.py:46
clustercontrolsge.Cluster.terminate
def terminate(self, Script job)
Terminate a running job; this is not supported by this backend, so the call is ignored.
Definition: clustercontrolsge.py:259
clustercontrolsge.Cluster.b2setup
b2setup
The command for b2setup (and setoption)
Definition: clustercontrolsge.py:93
clustercontrolsge.Cluster.submit_command
submit_command
The command to submit a job.
Definition: clustercontrolsge.py:57
clustercontrolsge.Cluster.is_supported
def is_supported()
Definition: clustercontrolsge.py:26
clustercontrolsge.Cluster
Definition: clustercontrolsge.py:16
clustercontrolsge.Cluster.adjust_path
def adjust_path(self, str path)
This method can be used if path names are different on submission and execution hosts.
Definition: clustercontrolsge.py:116
validationfunctions.basf2_command_builder
List[str] basf2_command_builder(str steering_file, List[str] parameters, use_multi_processing=False)
Definition: validationfunctions.py:63
clustercontrolsge.Cluster.execute
def execute(self, Script job, options='', dry=False, tag='current')
Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with ...
Definition: clustercontrolsge.py:135
clustercontrolsge.Cluster.available
def available(self)
The cluster should always be available to accept new jobs.
Definition: clustercontrolsge.py:127
clustercontrolsge.Cluster.description
def description()
Definition: clustercontrolsge.py:40
clustercontrolsge.Cluster.clusterlog
clusterlog
The file object to which all cluster messages will be written.
Definition: clustercontrolsge.py:113
clustercontrolsge.Cluster.name
def name()
Definition: clustercontrolsge.py:33
clustercontrolsge.Cluster.logger
logger
Contains a reference to the logger object from validate_basf2. Set up the logging functionality for th...
Definition: clustercontrolsge.py:81
clustercontrolsge.Cluster.requirement_vmem
requirement_vmem
required vmem by the job in GB, required on DESY NAF, otherwise jobs get killed due to memory consump...
Definition: clustercontrolsge.py:63
clustercontrolsge.Cluster.queuename
queuename
Queue best suitable for execution at DESY NAF.
Definition: clustercontrolsge.py:71
clustercontrolsge.Cluster.tools
tools
We need to set up the same environment on the cluster as on the local machine.
Definition: clustercontrolsge.py:88
clustercontrolsge.Cluster.path
path
The path where the help files are being created. Maybe there should be a special subfolder for them?
Definition: clustercontrolsge.py:75