Belle II Software  release-08-01-10
clustercontrolsge.py
#!/usr/bin/env python3

# std
import logging
import os
import subprocess
import stat
import shutil

# ours
import validationfunctions
from validationscript import Script


class Cluster:
    """
    A class that provides the controls for running jobs on a (remote)
    Sun Grid Engine cluster. It provides two methods:
    - is_job_finished(job): Returns True or False, depending on whether the
      job has finished execution
    - execute(job): Takes a job and executes it by sending it to the cluster
    """

    @staticmethod
    def is_supported():
        """
        Check if qsub is available
        """
        return shutil.which("qsub") is not None

    @staticmethod
    def name():
        """
        Returns the name of this job control
        """
        return "cluster-sge"

    @staticmethod
    def description():
        """
        Returns the description of this job control
        """
        return "Batch submission via command line to Grid Engine"

    def __init__(self):
        """!
        The default constructor.
        - Holds the current working directory, which is also the location of
          the shell scripts that are being sent to the cluster.
        - Initializes a logger which writes to validate_basf2.py's log.
        - Finds the revision of basf2 that will be set up on the cluster.
        """

        #: The command to submit a job
        self.submit_command = (
            "qsub -cwd -l h_vmem={requirement_vmem}G,"
            "h_fsize={requirement_storage}G "
            "-oo {logfile} -q {queuename} -V"
        )
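
        # Example (illustrative): with the default requirements set below, the
        # formatted template resolves to roughly
        #   qsub -cwd -l h_vmem=4G,h_fsize=50G -oo <logfile> -q short.q -V <script.sh>
        # where <logfile> and <script.sh> are filled in per job by execute().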

        #: vmem required by the job in GB; required on DESY NAF, otherwise
        #: jobs get killed due to their memory consumption
        self.requirement_vmem = 4

        #: The storage IO in GB which can be performed by each job
        self.requirement_storage = 50

        #: Queue best suited for execution at DESY NAF
        self.queuename = "short.q"

        #: The path where the helper files are being created
        self.path = os.getcwd()

        #: Contains a reference to the logger object from validate_basf2
        self.logger = logging.getLogger("validate_basf2")

        #: We need to set up the same environment on the cluster as on the
        #: local machine (path to the basf2 tools)
        self.tools = self.adjust_path(os.environ["BELLE2_TOOLS"])
        belle2_release_dir = os.environ.get("BELLE2_RELEASE_DIR", None)
        belle2_local_dir = os.environ.get("BELLE2_LOCAL_DIR", None)

        #: The command for b2setup (and b2code-option)
        self.b2setup = "b2setup"
        if belle2_release_dir is not None:
            self.b2setup += " " + belle2_release_dir.split("/")[-1]
        if belle2_local_dir is not None:
            self.b2setup = (
                "MY_BELLE2_DIR="
                + self.adjust_path(belle2_local_dir)
                + " "
                + self.b2setup
            )
        if os.environ.get("BELLE2_OPTION") != "debug":
            self.b2setup += "; b2code-option " + os.environ.get("BELLE2_OPTION")

        # Write to the log which revision we are using
        self.logger.debug(f"Setting up the following release: {self.b2setup}")

        # Define the folder in which the log of the cluster messages will be
        # stored (same folder as the log for validate_basf2.py)
        clusterlog_dir = "./html/logs/__general__/"
        if not os.path.exists(clusterlog_dir):
            os.makedirs(clusterlog_dir)

        #: The file object to which all cluster messages will be written
        self.clusterlog = open(clusterlog_dir + "clusterlog.log", "w+")

    # noinspection PyMethodMayBeStatic
    def adjust_path(self, path: str):
        """!
        This method can be used if path names are different on submission
        and execution hosts.
        @param path: The path that needs to be adjusted
        @return: The adjusted path
        """

        return path

    # noinspection PyMethodMayBeStatic
    def available(self):
        """!
        The cluster should always be available to accept new jobs.
        @return: Will always return True if the function can be called
        """

        return True

    def execute(self, job: Script, options="", dry=False, tag="current"):
        """!
        Takes a Script object and a string with options and runs it on the
        cluster, either with ROOT or with basf2, depending on the file type.

        @param job: The steering file object that should be executed
        @param options: Options that will be given to the basf2 command
        @param dry: Whether to perform a dry run or not
        @param tag: The folder within the results directory
        @return: None
        """

        # Define the folder in which the results (= the ROOT files) should be
        # created. This is where the files containing plots will end up. By
        # convention, data files will be stored in the parent dir.
        # Then make sure the folder exists (create it if it does not) and
        # change the cwd to this folder.
        output_dir = os.path.abspath(f"./results/{tag}/{job.package}")
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        # Path where the log file is supposed to be created
        log_file = output_dir + "/" + os.path.basename(job.path) + ".log"

        # Remove any leftover done files
        donefile_path = f"{self.path}/script_{job.name}.done"
        if os.path.isfile(donefile_path):
            os.remove(donefile_path)

        # Now we need to distinguish between .py and .C files:
        extension = os.path.splitext(job.path)[1]
        if extension == ".C":
            # .C files are executed with ROOT
            command = "root -b -q " + job.path
        else:
            # .py files are executed with basf2
            # 'options' contains an option string for basf2, e.g. '-n 100'
            params = validationfunctions.basf2_command_builder(
                job.path, options.split()
            )
            command = subprocess.list2cmdline(params)

        # Create a helper shell script which contains all the commands that
        # need to be executed by the cluster.
        # First, set up the basf2 tools and perform b2setup with the correct
        # revision. Then execute the command (i.e. run basf2 or ROOT on a
        # steering file). Write the return code of that into a *.done file.
        # Finally, delete the helper shell script.
        tmp_name = self.path + "/" + "script_" + job.name + ".sh"
        with open(tmp_name, "w+") as tmp_file:
            tmp_file.write(
                "#!/bin/bash \n\n"
                + "BELLE2_NO_TOOLS_CHECK=1 \n"
                + f"source {self.tools}/b2setup \n"
                + "cd {} \n".format(self.adjust_path(output_dir))
                + f"{command} \n"
                + "echo $? > {}/script_{}.done \n".format(self.path, job.name)
                + f"rm {tmp_name} \n"
            )
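
        # Illustrative example of the generated helper script, with
        # hypothetical values substituted for the placeholders:
        #
        #   #!/bin/bash
        #
        #   BELLE2_NO_TOOLS_CHECK=1
        #   source /path/to/tools/b2setup
        #   cd /path/to/results/current/<package>
        #   basf2 <steering_file>.py -n 100
        #   echo $? > /path/to/cwd/script_<job>.done
        #   rm /path/to/cwd/script_<job>.sh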

        # Make the helper shell script executable
        st = os.stat(tmp_name)
        os.chmod(tmp_name, st.st_mode | stat.S_IEXEC)

        # Prepare the command line for submission to the cluster
        params = self.submit_command.format(
            queuename=self.queuename,
            requirement_storage=self.requirement_storage,
            requirement_vmem=self.requirement_vmem,
            logfile=log_file,
        ).split() + [tmp_name]

        # Log the command we are about to execute
        self.logger.debug(subprocess.list2cmdline(params))

        # Submit it to the cluster. The steering file output will be written
        # to 'log_file' (see above).
        # If we are performing a dry run, don't send anything to the cluster;
        # just create the *.done file right away and delete the *.sh file.
        if not dry:
            process = subprocess.Popen(
                params, stdout=self.clusterlog, stderr=subprocess.STDOUT
            )

            # Check whether the submission succeeded
            if process.wait() != 0:
                job.status = "failed"
        else:
            os.system(f"echo 0 > {self.path}/script_{job.name}.done")
            os.unlink(tmp_name)

    def is_job_finished(self, job: Script):
        """!
        Checks whether the '.done' file has been created for a job. If so, it
        returns True, else it returns False.
        Also deletes the '.done' file once it has returned True.

        @param job: The job of which we want to know if it finished
        @return: True if the job has finished, otherwise False
        """

        # If there is a file indicating the job is done, this is its name:
        donefile_path = f"{self.path}/script_{job.name}.done"

        # Check if such a file exists. If so, this means that the job has
        # finished.
        if os.path.isfile(donefile_path):

            # Read the returncode/exit_status of the job from the *.done file
            with open(donefile_path) as f:
                try:
                    returncode = int(f.read().strip())
                except ValueError:
                    returncode = -654

            # Delete the *.done file
            os.remove(donefile_path)

            # Return that the job is finished + its return code
            return [True, returncode]

        # If no such file exists, the job has not yet finished
        else:
            return [False, 0]

    # noinspection PyMethodMayBeStatic
    def terminate(self, job: Script):
        """!
        Terminate a running job; not supported with this backend, so the
        call is ignored.
        """
        self.logger.error("Script termination not supported.")

logger: Contains a reference to the logger object from validate_basf2, so that cluster activity is written to validate_basf2.py's log.
queuename: Queue best suited for execution at DESY NAF.
tools: We need to set up the same environment on the cluster as on the local machine.
def available(self): The cluster should always be available to accept new jobs.
b2setup: The command for b2setup (and b2code-option).
clusterlog: The file object to which all cluster messages will be written.
def is_job_finished(self, Script job): Checks whether the '.done' file has been created for a job.
path: The path where the helper files are being created. Maybe there should be a special subfolder for them?
requirement_vmem: vmem required by the job in GB; required on DESY NAF, otherwise jobs get killed due to their memory consumption.
def execute(self, Script job, options="", dry=False, tag="current"): Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with basf2, depending on the file type.
requirement_storage: The storage IO in GB which can be performed by each job.
def terminate(self, Script job): Terminate a running job; not supported with this backend, so the call is ignored.
submit_command: The command to submit a job.
def __init__(self): The default constructor.
def adjust_path(self, str path): This method can be used if path names are different on submission and execution hosts.
List[str] basf2_command_builder(str steering_file, List[str] parameters, use_multi_processing=False): Builds the basf2 command line used by execute() (defined in validationfunctions).
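
For orientation, here is a minimal sketch of how this backend can be driven from the validation side. The helper function run_on_cluster, the polling interval, and the "-n 100" option are illustrative assumptions and not part of the Belle II code; the Script object is expected to come from the validation framework.

    import time
    from clustercontrolsge import Cluster
    from validationscript import Script

    def run_on_cluster(job: Script) -> int:
        """Submit a single validation job and wait for its return code."""
        cluster = Cluster()
        cluster.execute(job, options="-n 100")

        # Poll for the *.done file that the helper script writes on completion
        finished, returncode = cluster.is_job_finished(job)
        while not finished:
            time.sleep(10)
            finished, returncode = cluster.is_job_finished(job)
        return returncode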