19 from typing 
import Tuple
 
   22 from validationscript 
import Script
 
   27     A class that provides the controls for running jobs on a (remote) 
   28     cluster. It provides two methods: 
   29     - is_job_finished(job): Returns True or False, depending on whether the job 
   30         has finished execution 
   31     - execute(job): Takes a job and executes it by sending it to the cluster 
   37         Check if the bsub command is available 
   39         return shutil.which(
"bsub") 
is not None 
   44         Returns name of this job control 
   51         Returns description of this job control 
   53         return "Batch submission to bsub-based cluster" 
   57         The default constructor. 
   58         - Holds the current working directory, which is also the location of 
   59           the shellscripts that are being sent to the cluster. 
   60         - Initializes a logger which writes to validate_basf2.py's log. 
   61         - Finds the revision of basf2 that will be set up on the cluster. 
   66         self.
pathpath = os.getcwd()
 
   72         self.
loggerlogger = logging.getLogger(
"validate_basf2")
 
   80         belle2_release_dir = os.environ.get(
"BELLE2_RELEASE_DIR", 
None)
 
   81         belle2_local_dir = os.environ.get(
"BELLE2_LOCAL_DIR", 
None)
 
   85         if belle2_release_dir 
is not None:
 
   86             self.
b2setupb2setup += 
" " + belle2_release_dir.split(
"/")[-1]
 
   87         if belle2_local_dir 
is not None:
 
   94         if os.environ.get(
"BELLE2_OPTION") != 
"debug":
 
   95             self.
b2setupb2setup += 
"; b2code-option " + os.environ.get(
"BELLE2_OPTION")
 
   98         self.
loggerlogger.debug(f
"Setting up the following release: {self.b2setup}")
 
  102         clusterlog_dir = 
"./html/logs/__general__/" 
  103         if not os.path.exists(clusterlog_dir):
 
  104             os.makedirs(clusterlog_dir)
 
  109         This method can be used if path names are different on submission 
  111         @param path: The path that needs to be adjusted 
  112         @return: The adjusted path 
  120         The cluster should always be available to accept new jobs. 
  121         @return: Will always return True if the function can be called 
  126     def execute(self, job: Script, options=
"", dry=
False, tag=
"current"):
 
  128         Takes a Script object and a string with options and runs it on the 
  129         cluster, either with ROOT or with basf2, depending on the file type. 
  131         @param job: The steering file object that should be executed 
  132         @param options: Options that will be given to the basf2 command 
  133         @param dry: Whether to perform a dry run or not 
  134         @param tag: The folder within the results directory 
  143         output_dir = os.path.abspath(f
"./results/{tag}/{job.package}")
 
  144         if not os.path.exists(output_dir):
 
  145             os.makedirs(output_dir)
 
  147         log_file = output_dir + 
"/" + os.path.basename(job.path) + 
".log" 
  150         donefile_path = f
"{self.path}/script_{job.name}.done" 
  151         if os.path.isfile(donefile_path):
 
  152             os.remove(donefile_path)
 
  154         extension = os.path.splitext(job.path)[1]
 
  155         if extension == 
".C":
 
  157             command = 
"root -b -q " + job.path
 
  161             command = f
"basf2 {job.path} {options}" 
  170         with open(tmp_name, 
"w+") 
as tmp_file:
 
  173                 + 
"BELLE2_NO_TOOLS_CHECK=1 \n" 
  174                 + f
"source {self.tools}/b2setup \n" 
  175                 + 
"cd {} \n".format(self.
adjust_pathadjust_path(output_dir))
 
  177                 + 
"echo $? > {}/script_{}.done \n".format(self.
pathpath, job.name)
 
  178                 + f
"rm {tmp_name} \n" 
  182         st = os.stat(tmp_name)
 
  183         os.chmod(tmp_name, st.st_mode | stat.S_IEXEC)
 
  196         self.
loggerlogger.debug(subprocess.list2cmdline(params))
 
  200                 proc = subprocess.run(
 
  202                     stdout=subprocess.PIPE,
 
  203                     stderr=subprocess.PIPE,
 
  204                     universal_newlines=
True,
 
  206             except subprocess.CalledProcessError:
 
  207                 job.status = 
"failed" 
  208                 self.
loggerlogger.error(
"Failed to submit job. Here's the traceback:")
 
  209                 self.
loggerlogger.error(traceback.format_exc())
 
  210                 self.
loggerlogger.error(
"Will attempt to cleanup job files.")
 
  214                 if proc.stdout.strip():
 
  216                         f
"Stdout of job submission: '{proc.stdout.strip()}'." 
  218                 if proc.stderr.strip():
 
  220                         f
"Stderr of job submission: '{proc.stderr.strip()}'." 
  225                 res = re.search(
"Job <([0-9]*)> is submitted", proc.stdout)
 
  227                     job.job_id = res.group(1)
 
  230                         "Could not find job id! Will not be able to terminate" 
  231                         " this job, even if necessary. " 
  234             os.system(f
"echo 0 > {self.path}/script_{job.name}.done")
 
  238         """ Clean up after job has finished. """ 
  243         """ Name of temporary file used for job submission. """ 
  244         return self.
pathpath + 
"/" + 
"script_" + job.name + 
".sh" 
  248         Checks whether the '.done'-file has been created for a job. If so, it 
  249         returns True, else it returns False. 
  250         Also deletes the .done-File once it has returned True. 
  252         @param job: The job of which we want to know if it finished 
  253         @return: (True if the job has finished, exit code). If we can't find the 
  254             exit code in the '.done'-file, the returncode will be -654. 
  255             If the job is not finished, the exit code is returned as 0. 
  258         donefile_path = f
"{self.path}/script_{job.name}.done" 
  260         if os.path.isfile(donefile_path):
 
  263             with open(donefile_path) 
as f:
 
  265                     returncode = int(f.read().strip())
 
  269             os.remove(donefile_path)
 
  271             return True, returncode
 
  278         """! Terminate a running job 
  281             params = [
"bkill", job.job_id]
 
  282             self.
loggerlogger.debug(subprocess.list2cmdline(params))
 
  284                 proc = subprocess.run(
 
  286                     stdout=subprocess.PIPE,
 
  287                     stderr=subprocess.PIPE,
 
  288                     universal_newlines=
True,
 
  290             except subprocess.CalledProcessError:
 
  291                 job.status = 
"failed" 
  293                     "Probably wasn't able to cancel job. Here's the traceback:" 
  295                 self.
loggerlogger.error(traceback.format_exc())
 
  297                 if proc.stdout.strip():
 
  299                         f
"Stdout of job termination: '{proc.stdout.strip()}'." 
  301                 if proc.stderr.strip():
 
  303                         f
"Stderr of job termination: '{proc.stderr.strip()}'." 
  309                 "Termination of the job corresponding to steering file " 
  310                 f
"{job.path} has been requested, but no job id is available." 
  311                 " Can't do anything." 
logger
Contains a reference to the logger-object from validate_basf2. Set up the logging functionality for th...
tools
Path to the basf2 tools and central/local release.
def available(self)
The cluster should always be available to accept new jobs.
None _cleanup(self, Script job)
b2setup
The command for b2setup (and b2code-option)
Tuple[bool, int] is_job_finished(self, Script job)
Checks whether the '.done'-file has been created for a job.
path
The path where the help files are being created. Maybe there should be a special subfolder for them?
def execute(self, Script job, options="", dry=False, tag="current")
Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with ...
str _get_tmp_name(self, Script job)
def adjust_path(self, path)
This method can be used if path names are different on submission and execution hosts.
def terminate(self, Script job)
Terminate a running job.
def __init__(self)
The default constructor.