Belle II Software  release-05-02-19
clustercontrol.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 # std
5 import logging
6 import os
7 import subprocess
8 import stat
9 import shutil
10 import re
11 import traceback
12 from typing import Tuple
13 
14 # ours
15 from validationscript import Script
16 
17 
class Cluster:
    """
    A class that provides the controls for running jobs on a (remote)
    bsub-based cluster. Its main entry points are:
    - execute(job): Takes a job and executes it by sending it to the cluster
    - is_job_finished(job): Returns whether the job has finished execution
      (and its exit code)
    - terminate(job): Kills a submitted job via bkill
    """

    @staticmethod
    def is_supported():
        """
        Check if the bsub command is available
        """
        return shutil.which("bsub") is not None

    @staticmethod
    def name():
        """
        Returns name of this job control
        """
        return "cluster"

    @staticmethod
    def description():
        """
        Returns description of this job control
        """
        return "Batch submission to bsub-based cluster"

    def __init__(self):
        """!
        The default constructor.
        - Holds the current working directory, which is also the location of
          the shellscripts that are being sent to the cluster.
        - Initializes a logger which writes to validate_basf2.py's log.
        - Finds the revision of basf2 that will be set up on the cluster.

        Reads $BELLE2_TOOLS (required, KeyError if unset) and the optional
        $BELLE2_RELEASE_DIR, $BELLE2_LOCAL_DIR and $BELLE2_OPTION. Creates
        ./html/logs/__general__/ if it does not exist yet.
        """

        ## The path, where the help files are being created.
        #  Maybe there should be a special subfolder for them?
        self.path = os.getcwd()

        ## Contains a reference to the logger-object from validate_basf2.
        self.logger = logging.getLogger('validate_basf2')

        # We need to set up the same environment on the cluster like on the
        # local machine. The information can be extracted from $BELLE2_TOOLS,
        # $BELLE2_RELEASE_DIR and $BELLE2_LOCAL_DIR

        ## Path to the basf2 tools and central/local release.
        self.tools = self.adjust_path(os.environ['BELLE2_TOOLS'])
        belle2_release_dir = os.environ.get('BELLE2_RELEASE_DIR')
        belle2_local_dir = os.environ.get('BELLE2_LOCAL_DIR')

        ## The command for b2setup (and b2code-option)
        self.b2setup = 'b2setup'
        if belle2_release_dir is not None:
            # Use the last path component as release name; strip a trailing
            # slash first so "/sw/release-x/" does not yield an empty name.
            self.b2setup += ' ' + belle2_release_dir.rstrip('/').split('/')[-1]
        if belle2_local_dir is not None:
            self.b2setup = 'MY_BELLE2_DIR=' + \
                self.adjust_path(belle2_local_dir) + ' ' + self.b2setup
        # 'debug' gets no explicit b2code-option call. An unset BELLE2_OPTION
        # is skipped as well (concatenating None would raise TypeError).
        belle2_option = os.environ.get('BELLE2_OPTION')
        if belle2_option and belle2_option != 'debug':
            self.b2setup += '; b2code-option ' + belle2_option

        # Write to log which revision we are using
        self.logger.debug(f'Setting up the following release: {self.b2setup}')

        # Define the folder in which the log of the cluster messages will be
        # stored
        clusterlog_dir = './html/logs/__general__/'
        os.makedirs(clusterlog_dir, exist_ok=True)

    # noinspection PyMethodMayBeStatic
    def adjust_path(self, path):
        """!
        This method can be used if path names are different on submission
        and execution hosts.
        @param path: The path that needs to be adjusted
        @return: The adjusted path (unchanged in this base implementation)
        """

        return path

    # noinspection PyMethodMayBeStatic
    def available(self):
        """!
        The cluster should always be available to accept new jobs.
        @return: Will always return True if the function can be called
        """

        return True

    def execute(self, job: "Script", options='', dry=False, tag='current'):
        """!
        Takes a Script object and a string with options and runs it on the
        cluster, either with ROOT or with basf2, depending on the file type.

        A helper shell script is written to self.path and submitted with
        bsub. When the job ends, that script writes the exit code of the
        command into the job's '.done' file and deletes itself.
        If the submission fails (bsub missing or returning a non-zero exit
        status), job.status is set to 'failed' and the helper script is
        removed.

        @param job: The steering file object that should be executed
        @param options: Options that will be given to the basf2 command
        @param dry: Whether to perform a dry run or not (a dry run writes an
            exit code of 0 to the '.done' file instead of submitting)
        @param tag: The folder within the results directory
        @return: None
        """

        # Define the folder in which the results (= the ROOT files) should be
        # created. This is where the files containing plots will end up. By
        # convention, data files will be stored in the parent dir.
        # Then make sure the folder exists (create if it does not exist).
        output_dir = os.path.abspath(f'./results/{tag}/{job.package}')
        os.makedirs(output_dir, exist_ok=True)

        log_file = os.path.join(output_dir, os.path.basename(job.path) + '.log')

        # Remove any left over done files
        donefile_path = self._get_donefile_name(job)
        if os.path.isfile(donefile_path):
            os.remove(donefile_path)

        if os.path.splitext(job.path)[1] == '.C':
            # .C files are executed with root
            command = 'root -b -q ' + job.path
        else:
            # .py files are executed with basf2
            # 'options' contains an option-string for basf2, e.g. '-n 100'
            command = f'basf2 {job.path} {options}'

        # Create a helpfile-shellscript, which contains all the commands that
        # need to be executed by the cluster: source the basf2 tools, change
        # into the output directory, run the command (basf2 or ROOT on a
        # steering file), write its return code into the *.done file and
        # finally delete the helpfile-shellscript itself.
        # NOTE(review): self.b2setup is computed in __init__ but is not
        # written into this script; only $BELLE2_TOOLS/b2setup is sourced.
        # Confirm whether the release setup command should be added here.
        tmp_name = self._get_tmp_name(job)
        with open(tmp_name, 'w+') as tmp_file:
            tmp_file.write(
                '#!/bin/bash \n\n'
                'BELLE2_NO_TOOLS_CHECK=1 \n'
                f'source {self.tools}/b2setup \n'
                f'cd {self.adjust_path(output_dir)} \n'
                f'{command} \n'
                f'echo $? > {donefile_path} \n'
                f'rm {tmp_name} \n'
            )

        # Make the helpfile-shellscript executable
        st = os.stat(tmp_name)
        os.chmod(tmp_name, st.st_mode | stat.S_IEXEC)

        # Prepare the command line command for submission to the cluster
        params = ["bsub", "-oo", log_file, "-q", "l", tmp_name]

        # Log the command we are about the execute
        self.logger.debug(subprocess.list2cmdline(params))

        if dry:
            # Pretend the job ran successfully without submitting anything.
            with open(donefile_path, 'w') as donefile:
                donefile.write('0\n')
            self._cleanup(job)
            return

        try:
            # check=True so that a non-zero bsub exit status raises
            # CalledProcessError; a missing bsub binary raises OSError.
            proc = subprocess.run(
                params,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                check=True,
            )
        except (subprocess.CalledProcessError, OSError) as e:
            job.status = 'failed'
            self.logger.error("Failed to submit job. Here's the traceback:")
            self.logger.error(traceback.format_exc())
            # CalledProcessError carries the captured stderr, OSError does not.
            stderr = getattr(e, 'stderr', None)
            if stderr and stderr.strip():
                self.logger.error(
                    f"Stderr of job submission: '{stderr.strip()}'."
                )
            self.logger.error("Will attempt to cleanup job files.")
            self._cleanup(job)
            return

        self._log_output("job submission", proc)

        # Submission succeeded. Get Job ID by parsing output, so that
        # we can terminate the job later.
        res = re.search(r"Job <([0-9]+)> is submitted", proc.stdout)
        if res:
            job.job_id = res.group(1)
        else:
            self.logger.error(
                "Could not find job id! Will not be able to terminate"
                " this job, even if necessary. "
            )

    def _log_output(self, action: str, proc) -> None:
        """ Log non-empty stdout/stderr of a completed subprocess at debug
        level, labelled with ``action`` (e.g. 'job submission'). """
        if proc.stdout.strip():
            self.logger.debug(
                f"Stdout of {action}: '{proc.stdout.strip()}'."
            )
        if proc.stderr.strip():
            self.logger.debug(
                f"Stderr of {action}: '{proc.stderr.strip()}'."
            )

    def _cleanup(self, job: "Script") -> None:
        """ Clean up after job has finished. """
        tmp_name = self._get_tmp_name(job)
        # The helper script removes itself at the end of a normal run, so it
        # may already be gone (e.g. when terminating a job that finished).
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass

    def _get_tmp_name(self, job: "Script") -> str:
        """ Name of temporary file used for job submission. """
        return self.path + '/' + 'script_' + job.name + '.sh'

    def _get_donefile_name(self, job: "Script") -> str:
        """ Name of the file the job writes its exit code into. """
        return f"{self.path}/script_{job.name}.done"

    def is_job_finished(self, job: "Script") -> Tuple[bool, int]:
        """!
        Checks whether the '.done'-file has been created for a job. If so, it
        returns True, else it returns False.
        Also deletes the .done-File once it has returned True.

        @param job: The job of which we want to know if it finished
        @return: (True if the job has finished, exit code). If we can't find the
            exit code in the '.done'-file, the returncode will be -666.
            If the job is not finished, the exit code is returned as 0.
        """

        donefile_path = self._get_donefile_name(job)

        if not os.path.isfile(donefile_path):
            # If no such file exists, the job has not yet finished
            return False, 0

        # Job finished. Read the returncode/exit_status
        with open(donefile_path) as f:
            try:
                returncode = int(f.read().strip())
            except ValueError:
                returncode = -666

        os.remove(donefile_path)

        return True, returncode

    def terminate(self, job: "Script"):
        """! Terminate a running job via bkill.

        Sets job.status to 'failed' if bkill is missing or returns a
        non-zero exit status. The helper script is removed in any case.
        @param job: The job to terminate (needs job.job_id from execute)
        """
        if not job.job_id:
            self.logger.error(
                "Termination of the job corresponding to steering file "
                f"{job.path} has been requested, but no job id is available."
                " Can't do anything."
            )
            return

        params = ["bkill", job.job_id]
        self.logger.debug(subprocess.list2cmdline(params))
        try:
            proc = subprocess.run(
                params,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                check=True,
            )
        except (subprocess.CalledProcessError, OSError):
            job.status = 'failed'
            self.logger.error(
                "Probably wasn't able to cancel job. Here's the traceback:"
            )
            self.logger.error(traceback.format_exc())
        else:
            self._log_output("job termination", proc)
        finally:
            self._cleanup(job)
clustercontrol.Cluster.b2setup
b2setup
The command for b2setup (and b2code-option)
Definition: clustercontrol.py:77
clustercontrol.Cluster
Definition: clustercontrol.py:18
clustercontrol.Cluster.description
def description()
Definition: clustercontrol.py:42
clustercontrol.Cluster.is_supported
def is_supported()
Definition: clustercontrol.py:28
clustercontrol.Cluster._cleanup
None _cleanup(self, Script job)
Definition: clustercontrol.py:221
clustercontrol.Cluster.adjust_path
def adjust_path(self, path)
This method can be used if path names are different on submission and execution hosts.
Definition: clustercontrol.py:97
clustercontrol.Cluster.available
def available(self)
The cluster should always be available to accept new jobs.
Definition: clustercontrol.py:108
clustercontrol.Cluster.logger
logger
Contains a reference to the logger-object from validate_basf2. Set up the logging functionality for th...
Definition: clustercontrol.py:65
clustercontrol.Cluster.is_job_finished
Tuple[bool, int] is_job_finished(self, Script job)
Checks whether the '.done'-file has been created for a job.
Definition: clustercontrol.py:230
clustercontrol.Cluster.path
path
The path where the help files are being created. Maybe there should be a special subfolder for them?
Definition: clustercontrol.py:59
clustercontrol.Cluster._get_tmp_name
str _get_tmp_name(self, Script job)
Definition: clustercontrol.py:226
clustercontrol.Cluster.execute
def execute(self, Script job, options='', dry=False, tag='current')
Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with ...
Definition: clustercontrol.py:116
clustercontrol.Cluster.name
def name()
Definition: clustercontrol.py:35
clustercontrol.Cluster.tools
tools
Path to the basf2 tools and central/local release.
Definition: clustercontrol.py:72
clustercontrol.Cluster.__init__
def __init__(self)
The default constructor.
Definition: clustercontrol.py:48
clustercontrol.Cluster.terminate
def terminate(self, Script job)
Terminate a running job.
Definition: clustercontrol.py:261