Belle II Software  release-08-01-10
clustercontrol.py
1 #!/usr/bin/env python3
2 
3 
10 
11 # std
12 import logging
13 import os
14 import subprocess
15 import stat
16 import shutil
17 import re
18 import traceback
19 from typing import Tuple
20 
21 # ours
22 from validationscript import Script
23 
24 
class Cluster:
    """
    A class that provides the controls for running jobs on a (remote)
    bsub-based cluster. Its two central methods are:
    - execute(job): Takes a job and executes it by sending it to the cluster
    - is_job_finished(job): Returns whether the job has finished execution,
      together with its exit code

    Communication with the running job happens through files in the
    directory in which this object was created: a helper shell script
    ``script_<name>.sh`` that is submitted, and a ``script_<name>.done``
    file into which the script writes the exit code of the command it ran.
    """

    @staticmethod
    def is_supported():
        """
        Check if the bsub command is available
        """
        return shutil.which("bsub") is not None

    @staticmethod
    def name():
        """
        Returns name of this job control
        """
        return "cluster"

    @staticmethod
    def description():
        """
        Returns description of this job control
        """
        return "Batch submission to bsub-based cluster"

    def __init__(self):
        """!
        The default constructor.
        - Holds the current working directory, which is also the location of
          the shellscripts that are being sent to the cluster.
        - Initializes a logger which writes to validate_basf2.py's log.
        - Finds the revision of basf2 that will be set up on the cluster.

        Raises KeyError if $BELLE2_TOOLS is not set.
        """

        ## The path, where the help files are being created.
        #  Maybe there should be a special subfolder for them?
        self.path = os.getcwd()

        ## Contains a reference to the logger-object from validate_basf2
        self.logger = logging.getLogger("validate_basf2")

        # We need to set up the same environment on the cluster like on the
        # local machine. The information can be extracted from $BELLE2_TOOLS,
        # $BELLE2_RELEASE_DIR and $BELLE2_LOCAL_DIR

        ## Path to the basf2 tools and central/local release
        self.tools = self.adjust_path(os.environ["BELLE2_TOOLS"])
        belle2_release_dir = os.environ.get("BELLE2_RELEASE_DIR")
        belle2_local_dir = os.environ.get("BELLE2_LOCAL_DIR")
        belle2_option = os.environ.get("BELLE2_OPTION")

        ## The command for b2setup (and b2code-option)
        self.b2setup = "b2setup"
        if belle2_release_dir is not None:
            self.b2setup += " " + belle2_release_dir.split("/")[-1]
        if belle2_local_dir is not None:
            self.b2setup = (
                "MY_BELLE2_DIR="
                + self.adjust_path(belle2_local_dir)
                + " "
                + self.b2setup
            )
        # An unset $BELLE2_OPTION must not be concatenated (it is None);
        # "debug" is skipped as before.
        if belle2_option is not None and belle2_option != "debug":
            self.b2setup += "; b2code-option " + belle2_option

        # Write to log which revision we are using
        self.logger.debug(
            "Setting up the following release: %s", self.b2setup
        )

        # Define the folder in which the log of the cluster messages will be
        # stored. exist_ok avoids failing if it appears concurrently.
        clusterlog_dir = "./html/logs/__general__/"
        os.makedirs(clusterlog_dir, exist_ok=True)

    # noinspection PyMethodMayBeStatic
    def adjust_path(self, path):
        """!
        This method can be used if path names are different on submission
        and execution hosts.
        @param path: The path that needs to be adjusted
        @return: The adjusted path (this implementation returns it unchanged)
        """

        return path

    # noinspection PyMethodMayBeStatic
    def available(self):
        """!
        The cluster should always be available to accept new jobs.
        @return: Will always return True if the function can be called
        """

        return True

    def execute(self, job: "Script", options="", dry=False, tag="current"):
        """!
        Takes a Script object and a string with options and runs it on the
        cluster, either with ROOT or with basf2, depending on the file type.

        On a failed submission (bsub cannot be started or exits with a
        non-zero status) job.status is set to "failed", the error is logged
        and the helper script is removed. On success the job id reported by
        bsub is stored in job.job_id.

        @param job: The steering file object that should be executed
        @param options: Options that will be given to the basf2 command
        @param dry: Whether to perform a dry run or not. A dry run submits
            nothing and immediately writes a '.done'-file with exit code 0.
        @param tag: The folder within the results directory
        @return: None
        """

        # Define the folder in which the results (= the ROOT files) should be
        # created. This is where the files containing plots will end up. By
        # convention, data files will be stored in the parent dir.
        # Then make sure the folder exists (create if it does not exist).
        output_dir = os.path.abspath(f"./results/{tag}/{job.package}")
        os.makedirs(output_dir, exist_ok=True)

        log_file = output_dir + "/" + os.path.basename(job.path) + ".log"

        # Remove any left over done files
        donefile_path = self._get_done_name(job)
        if os.path.isfile(donefile_path):
            os.remove(donefile_path)

        extension = os.path.splitext(job.path)[1]
        if extension == ".C":
            # .C files are executed with root
            command = "root -b -q " + job.path
        else:
            # .py files are executed with basf2
            # 'options' contains an option-string for basf2, e.g. '-n 100'
            command = f"basf2 {job.path} {options}"

        # Create a helpfile-shellscript, which contains all the commands that
        # need to be executed by the cluster.
        # First, set up the basf2 tools. Then execute the command (i.e. run
        # basf2 or ROOT on a steering file). Write the return code of that
        # into a *.done file. Finally the helpfile-shellscript deletes itself.
        # NOTE(review): self.b2setup is only logged in the constructor and is
        # not written into this script -- confirm whether the release should
        # be passed to b2setup here.
        tmp_name = self._get_tmp_name(job)
        with open(tmp_name, "w+") as tmp_file:
            tmp_file.write(
                "#!/bin/bash \n\n"
                "BELLE2_NO_TOOLS_CHECK=1 \n"
                f"source {self.tools}/b2setup \n"
                f"cd {self.adjust_path(output_dir)} \n"
                f"{command} \n"
                f"echo $? > {donefile_path} \n"
                f"rm {tmp_name} \n"
            )

        # Make the helpfile-shellscript executable
        st = os.stat(tmp_name)
        os.chmod(tmp_name, st.st_mode | stat.S_IEXEC)

        # Prepare the command line command for submission to the cluster:
        # bsub gets log_file via -oo and the queue name "l" via -q.
        params = [
            "bsub",
            "-oo",
            log_file,
            "-q",
            "l",
            tmp_name,
        ]

        # Log the command we are about the execute
        self.logger.debug(subprocess.list2cmdline(params))

        if dry:
            # Nothing is submitted: report success right away by writing the
            # exit code 0 ourselves, then remove the unused helper script.
            with open(donefile_path, "w") as done_file:
                done_file.write("0\n")
            self._cleanup(job)
            return

        try:
            # check=True is required for CalledProcessError to be raised on
            # a non-zero exit status; OSError covers a missing executable.
            proc = subprocess.run(
                params,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                check=True,
            )
        except (OSError, subprocess.CalledProcessError) as error:
            job.status = "failed"
            self.logger.error("Failed to submit job. Here's the traceback:")
            self.logger.error(traceback.format_exc())
            self._log_process_output(
                "submission",
                getattr(error, "stdout", None),
                getattr(error, "stderr", None),
                level=logging.ERROR,
            )
            self.logger.error("Will attempt to cleanup job files.")
            self._cleanup(job)
            return

        self._log_process_output("submission", proc.stdout, proc.stderr)

        # Submission succeeded. Get Job ID by parsing output, so that
        # we can terminate the job later.
        res = re.search(r"Job <([0-9]+)> is submitted", proc.stdout)
        if res:
            job.job_id = res.group(1)
        else:
            self.logger.error(
                "Could not find job id! Will not be able to terminate"
                " this job, even if necessary. "
            )

    def _log_process_output(self, action, stdout, stderr, level=logging.DEBUG):
        """ Log the non-empty stdout/stderr of a bsub/bkill call. """
        if stdout and stdout.strip():
            self.logger.log(
                level, "Stdout of job %s: '%s'.", action, stdout.strip()
            )
        if stderr and stderr.strip():
            self.logger.log(
                level, "Stderr of job %s: '%s'.", action, stderr.strip()
            )

    def _cleanup(self, job: "Script") -> None:
        """ Clean up after job has finished. """
        tmp_name = self._get_tmp_name(job)
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            # The helper script removes itself as its last step, so it may
            # legitimately be gone already.
            pass

    def _get_tmp_name(self, job: "Script") -> str:
        """ Name of temporary file used for job submission. """
        return self.path + "/" + "script_" + job.name + ".sh"

    def _get_done_name(self, job: "Script") -> str:
        """ Name of the file into which the job writes its exit code. """
        return f"{self.path}/script_{job.name}.done"

    def is_job_finished(self, job: "Script") -> Tuple[bool, int]:
        """!
        Checks whether the '.done'-file has been created for a job. If so, it
        returns True, else it returns False.
        Also deletes the .done-File once it has returned True.

        @param job: The job of which we want to know if it finished
        @return: (True if the job has finished, exit code). If we can't find the
            exit code in the '.done'-file, the returncode will be -654.
            If the job is not finished, the exit code is returned as 0.
        """

        donefile_path = self._get_done_name(job)

        if not os.path.isfile(donefile_path):
            # If no such file exists, the job has not yet finished
            return False, 0

        # Job finished.
        # Read the returncode/exit_status
        with open(donefile_path) as f:
            try:
                returncode = int(f.read().strip())
            except ValueError:
                returncode = -654

        os.remove(donefile_path)

        return True, returncode

    def terminate(self, job: "Script"):
        """! Terminate a running job

        Calls bkill with the job id stored by execute(). If bkill cannot be
        started or exits with a non-zero status, job.status is set to
        "failed" and the error is logged. The helper script is removed in
        either case. Without a job id only an error is logged.

        @param job: The job that should be terminated
        @return: None
        """
        if not job.job_id:
            self.logger.error(
                "Termination of the job corresponding to steering file "
                f"{job.path} has been requested, but no job id is available."
                " Can't do anything."
            )
            return

        params = ["bkill", str(job.job_id)]
        self.logger.debug(subprocess.list2cmdline(params))
        try:
            # check=True so that a failing bkill actually reaches the
            # except branch below.
            proc = subprocess.run(
                params,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                check=True,
            )
        except (OSError, subprocess.CalledProcessError) as error:
            job.status = "failed"
            self.logger.error(
                "Probably wasn't able to cancel job. Here's the traceback:"
            )
            self.logger.error(traceback.format_exc())
            self._log_process_output(
                "termination",
                getattr(error, "stdout", None),
                getattr(error, "stderr", None),
                level=logging.ERROR,
            )
        else:
            self._log_process_output("termination", proc.stdout, proc.stderr)
        finally:
            self._cleanup(job)
logger
Contains a reference to the logger-object from validate_basf2 Set up the logging functionality for th...
tools
Path to the basf2 tools and central/local release.
def available(self)
The cluster should always be available to accept new jobs.
None _cleanup(self, Script job)
b2setup
The command for b2setup (and b2code-option)
Tuple[bool, int] is_job_finished(self, Script job)
Checks whether the '.done'-file has been created for a job.
path
The path where the help files are being created. Maybe there should be a special subfolder for them?
def execute(self, Script job, options="", dry=False, tag="current")
Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with ...
str _get_tmp_name(self, Script job)
def adjust_path(self, path)
This method can be used if path names are different on submission and execution hosts.
def terminate(self, Script job)
Terminate a running job.
def __init__(self)
The default constructor.