Belle II Software development
LSF Class Reference
Inheritance diagram for LSF: LSF inherits from Batch, which inherits from Backend.

Classes

class  LSFResult
 

Public Member Functions

 __init__ (self, *, backend_args=None)
 
 can_submit (self, njobs=1)
 
 bjobs (cls, output_fields=None, job_id="", username="", queue="")
 
 bqueues (cls, output_fields=None, queues=None)
 
 submit (self, job, check_can_submit=True, jobs_per_check=100)
 
 get_batch_submit_script_path (self, job)
 
 get_submit_script_path (self, job)
 

Public Attributes

int global_job_limit = self.default_global_job_limit
 The active job limit.
 
int sleep_between_submission_checks = self.default_sleep_between_submission_checks
 Seconds we wait before checking if we can submit a list of jobs.
 
dict backend_args = {**self.default_backend_args, **backend_args}
 The backend args that will be applied to jobs unless the job specifies them itself.
 

Static Public Attributes

str cmd_wkdir = "#BSUB -cwd"
 Working directory directive.
 
str cmd_stdout = "#BSUB -o"
 stdout file directive
 
str cmd_stderr = "#BSUB -e"
 stderr file directive
 
str cmd_queue = "#BSUB -q"
 Queue directive.
 
str cmd_name = "#BSUB -J"
 Job name directive.
 
list submission_cmds = []
 Shell command to submit a script; it should be implemented in the derived class.
 
int default_global_job_limit = 1000
 Default global limit on the total number of submitted/running jobs that the user can have.
 
int default_sleep_between_submission_checks = 30
 Default time between re-checks that the number of active jobs is below the global job limit.
 
str submit_script = "submit.sh"
 Default submission script name.
 
str exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
 Default exit code file name.
 
dict default_backend_args = {}
 Default backend_args.
 

Protected Member Functions

 _add_batch_directives (self, job, batch_file)
 
 _create_cmd (self, script_path)
 
 _submit_to_batch (cls, cmd)
 
 _create_parent_job_result (cls, parent)
 
 _create_job_result (cls, job, batch_output)
 
 _make_submit_file (self, job, submit_file_path)
 
 _add_wrapper_script_setup (self, job, batch_file)
 
 _add_wrapper_script_teardown (self, job, batch_file)
 

Static Protected Member Functions

 _add_setup (job, batch_file)
 

Detailed Description

Backend for submitting calibration processes to an LSF batch system.

Definition at line 1625 of file backends.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
* ,
backend_args = None )
 

Definition at line 1646 of file backends.py.

1646 def __init__(self, *, backend_args=None):
1647 """
1648 """
1649 super().__init__(backend_args=backend_args)
1650

Member Function Documentation

◆ _add_batch_directives()

_add_batch_directives ( self,
job,
batch_file )
protected
Adds LSF BSUB directives for the job to a script.

Reimplemented from Batch.

Definition at line 1651 of file backends.py.

1651 def _add_batch_directives(self, job, batch_file):
1652 """
1653 Adds LSF BSUB directives for the job to a script.
1654 """
1655 job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1656 batch_queue = job_backend_args["queue"]
1657 print("#!/bin/bash", file=batch_file)
1658 print("# --- Start LSF ---", file=batch_file)
1659 print(" ".join([LSF.cmd_queue, batch_queue]), file=batch_file)
1660 print(" ".join([LSF.cmd_name, job.name]), file=batch_file)
1661 print(" ".join([LSF.cmd_wkdir, str(job.working_dir)]), file=batch_file)
1662 print(" ".join([LSF.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1663 print(" ".join([LSF.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1664 print("# --- End LSF ---", file=batch_file)
1665
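As an illustration of the directives this method writes, the following standalone sketch reproduces the header construction with hypothetical job settings (the queue name, job name, working directory, and stdout/stderr file names here are placeholders, not values from the real Job class):

```python
from pathlib import Path

# Hypothetical job settings; in the real method these come from the Job
# object and the merged backend_args.
batch_queue = "s"
job_name = "my_calibration_job"
working_dir = Path("/group/belle2/users/me/job0")

header_lines = [
    "#!/bin/bash",
    "# --- Start LSF ---",
    " ".join(["#BSUB -q", batch_queue]),
    " ".join(["#BSUB -J", job_name]),
    " ".join(["#BSUB -cwd", str(working_dir)]),
    " ".join(["#BSUB -o", Path(working_dir, "stdout").as_posix()]),
    " ".join(["#BSUB -e", Path(working_dir, "stderr").as_posix()]),
    "# --- End LSF ---",
]
header = "\n".join(header_lines)
```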

◆ _add_setup()

_add_setup ( job,
batch_file )
staticprotectedinherited
Adds setup lines to the shell script file.

Definition at line 807 of file backends.py.

807 def _add_setup(job, batch_file):
808 """
809 Adds setup lines to the shell script file.
810 """
811 for line in job.setup_cmds:
812 print(line, file=batch_file)
813

◆ _add_wrapper_script_setup()

_add_wrapper_script_setup ( self,
job,
batch_file )
protectedinherited
Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
`trap` statements for Ctrl-C situations.

Definition at line 814 of file backends.py.

814 def _add_wrapper_script_setup(self, job, batch_file):
815 """
816 Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
817 `trap` statements for Ctrl-C situations.
818 """
819 start_wrapper = f"""# ---
820# trap ctrl-c and call ctrl_c()
821trap '(ctrl_c 130)' SIGINT
822trap '(ctrl_c 143)' SIGTERM
823
824function write_exit_code() {{
825 echo "Writing $1 to exit status file"
826 echo "$1" > {self.exit_code_file}
827 exit $1
828}}
829
830function ctrl_c() {{
831 trap '' SIGINT SIGTERM
832 echo "** Trapped Ctrl-C **"
833 echo "$1" > {self.exit_code_file}
834 exit $1
835}}
836# ---"""
837 print(start_wrapper, file=batch_file)
838

◆ _add_wrapper_script_teardown()

_add_wrapper_script_teardown ( self,
job,
batch_file )
protectedinherited
Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that the exit code
of the job cmd is written out to a file. This means we can know whether the command was successful even if the
backend server/monitoring database purges the data about our job, e.g. if PBS removes job information too
quickly we might never know whether a job succeeded or failed without some kind of exit file.

Definition at line 839 of file backends.py.

839 def _add_wrapper_script_teardown(self, job, batch_file):
840 """
841 Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that the exit code
842 of the job cmd is written out to a file. This means we can know whether the command was successful even if the
843 backend server/monitoring database purges the data about our job, e.g. if PBS
844 removes job information too quickly we might never know whether a job succeeded or failed without some kind of exit
845 file.
846 """
847 end_wrapper = """# ---
848write_exit_code $?"""
849 print(end_wrapper, file=batch_file)
850

◆ _create_cmd()

_create_cmd ( self,
script_path )
protected
 

Reimplemented from Batch.

Definition at line 1666 of file backends.py.

1666 def _create_cmd(self, script_path):
1667 """
1668 """
1669 submission_cmd = self.submission_cmds[:]
1670 submission_cmd.append(script_path.as_posix())
1671 submission_cmd = " ".join(submission_cmd)
1672 return [submission_cmd]
1673
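The method above joins the class-level submission_cmds with the script path into a single shell string. A minimal sketch, assuming a hypothetical submission_cmds value (the base-class default shown earlier is empty, and the exact value used by the real LSF class may differ):

```python
from pathlib import Path

# Hypothetical submission command pieces; the real LSF class defines its
# own submission_cmds.
submission_cmds = ["bsub", "<"]
script_path = Path("/tmp/job0/submit.sh")

# Copy the class-level list so we do not mutate it, then append the script.
submission_cmd = submission_cmds[:]
submission_cmd.append(script_path.as_posix())
cmd = [" ".join(submission_cmd)]
```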

◆ _create_job_result()

_create_job_result ( cls,
job,
batch_output )
protected
 

Reimplemented from Batch.

Definition at line 1782 of file backends.py.

1782 def _create_job_result(cls, job, batch_output):
1783 """
1784 """
1785 m = re.search(r"Job <(\d+)>", str(batch_output))
1786 if m:
1787 job_id = m.group(1)
1788 else:
1789 raise BackendError(f"Failed to get the batch job ID of {job}. LSF output was:\n{batch_output}")
1790
1791 B2INFO(f"Job ID of {job} recorded as: {job_id}")
1792 job.result = cls.LSFResult(job, job_id)
1793
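The job ID extraction can be exercised on its own; the bsub output line below is a hypothetical example of the typical format, not captured from a real cluster:

```python
import re

# Hypothetical bsub output; real output comes from the batch system.
batch_output = "Job <123456> is submitted to default queue <s>."

# Same pattern as _create_job_result: the job ID is the integer in <...>.
match = re.search(r"Job <(\d+)>", str(batch_output))
job_id = match.group(1) if match else None
```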

◆ _create_parent_job_result()

_create_parent_job_result ( cls,
parent )
protected
We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
statuses and allows the use of ready().

Reimplemented from Backend.

Definition at line 1778 of file backends.py.

1778 def _create_parent_job_result(cls, parent):
1779 parent.result = cls.LSFResult(parent, None)
1780

◆ _make_submit_file()

_make_submit_file ( self,
job,
submit_file_path )
protectedinherited
Useful for the HTCondor backend where a submit file is needed instead of batch
directives pasted directly into the submission script. It should be overridden
if needed.

Reimplemented in HTCondor.

Definition at line 1180 of file backends.py.

1180 def _make_submit_file(self, job, submit_file_path):
1181 """
1182 Useful for the HTCondor backend where a submit file is needed instead of batch
1183 directives pasted directly into the submission script. It should be overridden
1184 if needed.
1185 """
1186

◆ _submit_to_batch()

_submit_to_batch ( cls,
cmd )
protected
Do the actual batch submission command and collect the output to find out the job id for later monitoring.

Reimplemented from Batch.

Definition at line 1675 of file backends.py.

1675 def _submit_to_batch(cls, cmd):
1676 """
1677 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1678 """
1679 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1680 return sub_out
1681

◆ bjobs()

bjobs ( cls,
output_fields = None,
job_id = "",
username = "",
queue = "" )
Simplistic interface to the `bjobs` command. Lets you request information about all jobs matching the filters
'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by the ``-json`` option of bjobs.

Parameters:
    output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
    job_id (str): String representation of the Job ID given by bsub during submission. If this argument is given then
        the output of this function will be only information about this job. If this argument is not given, then all jobs
        matching the other filters will be returned.
    username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
        a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
        receive job information from all known user jobs matching the other filters.
    queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.

Returns:
    dict: JSON dictionary of the form:

    .. code-block:: python

      {
        "NJOBS":<njobs returned by command>,
        "JOBS":[
                {
                  <output field: value>, ...
                }, ...
               ]
      }

Definition at line 1818 of file backends.py.

1818 def bjobs(cls, output_fields=None, job_id="", username="", queue=""):
1819 """
1820 Simplistic interface to the `bjobs` command. Lets you request information about all jobs matching the filters
1821 'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by the ``-json`` option of bjobs.
1822
1823 Parameters:
1824 output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
1825 job_id (str): String representation of the Job ID given by bsub during submission. If this argument is given then
1826 the output of this function will be only information about this job. If this argument is not given, then all jobs
1827 matching the other filters will be returned.
1828 username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
1829 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
1830 receive job information from all known user jobs matching the other filters.
1831 queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.
1832
1833 Returns:
1834 dict: JSON dictionary of the form:
1835
1836 .. code-block:: python
1837
1838 {
1839 "NJOBS":<njobs returned by command>,
1840 "JOBS":[
1841 {
1842 <output field: value>, ...
1843 }, ...
1844 ]
1845 }
1846 """
1847 B2DEBUG(29, f"Calling LSF.bjobs(output_fields={output_fields}, job_id={job_id}, username={username}, queue={queue})")
1848 # We must always return at least one output field when using JSON and -o options. So we choose the job id
1849 if not output_fields:
1850 output_fields = ["id"]
1851 # Output fields should be space separated but in a string.
1852 field_list_cmd = "\""
1853 field_list_cmd += " ".join(output_fields)
1854 field_list_cmd += "\""
1855 cmd_list = ["bjobs", "-o", field_list_cmd]
1856 # If the queue name is set then we add to the command options
1857 if queue:
1858 cmd_list.extend(["-q", queue])
1859 # If the username is set then we add to the command options
1860 if username:
1861 cmd_list.extend(["-u", username])
1862 # Can now add the json option before the final positional argument (if used)
1863 cmd_list.append("-json")
1864 # If the job id is set then we add to the end of the command
1865 if job_id:
1866 cmd_list.append(job_id)
1867 # We get a JSON serialisable summary from bjobs. Requires the shell argument.
1868 cmd = " ".join(cmd_list)
1869 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1870 output = decode_json_string(subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True))
1871 output["NJOBS"] = output["JOBS"]
1872 output["JOBS"] = output["RECORDS"]
1873 del output["RECORDS"]
1874 del output["COMMAND"]
1875 return output
1876
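The command string that bjobs() builds can be reproduced without touching a cluster. A sketch with hypothetical filter values:

```python
# Reproduces the command-string construction from bjobs() with
# hypothetical filter values; nothing is executed here.
output_fields = ["stat", "name", "id"]

# Output fields are space-separated inside a quoted string for -o.
field_list_cmd = '"' + " ".join(output_fields) + '"'
cmd_list = ["bjobs", "-o", field_list_cmd]

queue = "s"        # hypothetical queue name
username = "all"   # 'all' requests every user's jobs
if queue:
    cmd_list.extend(["-q", queue])
if username:
    cmd_list.extend(["-u", username])
cmd_list.append("-json")
cmd = " ".join(cmd_list)
```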

◆ bqueues()

bqueues ( cls,
output_fields = None,
queues = None )
Simplistic interface to the `bqueues` command. Lets you request information about all queues matching the filters.
The result is the JSON dictionary returned by the ``-json`` option of bqueues.

Parameters:
    output_fields (list[str]): A list of bqueues -o fields that you would like information about
        e.g. the default is ['queue_name', 'status', 'max', 'njobs', 'pend', 'run']
    queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
        By default you will receive information about all queues.

Returns:
    dict: JSON dictionary of the form:

    .. code-block:: python

      {
        "COMMAND":"bqueues",
        "QUEUES":46,
        "RECORDS":[
          {
            "QUEUE_NAME":"b2_beast",
            "STATUS":"Open:Active",
            "MAX":"200",
            "NJOBS":"0",
            "PEND":"0",
            "RUN":"0"
          }, ...
        ]
      }

Definition at line 1878 of file backends.py.

1878 def bqueues(cls, output_fields=None, queues=None):
1879 """
1880 Simplistic interface to the `bqueues` command. Lets you request information about all queues matching the filters.
1881 The result is the JSON dictionary returned by the ``-json`` option of bqueues.
1882
1883 Parameters:
1884 output_fields (list[str]): A list of bqueues -o fields that you would like information about
1885 e.g. the default is ['queue_name', 'status', 'max', 'njobs', 'pend', 'run']
1886 queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
1887 By default you will receive information about all queues.
1888
1889 Returns:
1890 dict: JSON dictionary of the form:
1891
1892 .. code-block:: python
1893
1894 {
1895 "COMMAND":"bqueues",
1896 "QUEUES":46,
1897 "RECORDS":[
1898 {
1899 "QUEUE_NAME":"b2_beast",
1900 "STATUS":"Open:Active",
1901 "MAX":"200",
1902 "NJOBS":"0",
1903 "PEND":"0",
1904 "RUN":"0"
1905 }, ...
1906 }
1907 """
1908 B2DEBUG(29, f"Calling LSF.bqueues(output_fields={output_fields}, queues={queues})")
1909 # We must always return at least one output field when using JSON and -o options. So we choose a default set of queue fields
1910 if not output_fields:
1911 output_fields = ["queue_name", "status", "max", "njobs", "pend", "run"]
1912 # Output fields should be space separated but in a string.
1913 field_list_cmd = "\""
1914 field_list_cmd += " ".join(output_fields)
1915 field_list_cmd += "\""
1916 cmd_list = ["bqueues", "-o", field_list_cmd]
1917 # Can now add the json option before the final positional argument (if used)
1918 cmd_list.append("-json")
1919 # If the queue name is set then we add to the end of the command
1920 if queues:
1921 cmd_list.extend(queues)
1922 # We get a JSON serialisable summary from bqueues. Requires the shell argument.
1923 cmd = " ".join(cmd_list)
1924 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1925 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1926 return decode_json_string(output)
1927
1928
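Since bqueues() returns the raw -json dictionary unchanged, callers typically filter the RECORDS list themselves. A sketch over a hypothetical payload in the documented shape:

```python
# Hypothetical bqueues -json payload; real values come from the cluster.
payload = {
    "COMMAND": "bqueues",
    "QUEUES": 2,
    "RECORDS": [
        {"QUEUE_NAME": "s", "STATUS": "Open:Active", "MAX": "200",
         "NJOBS": "10", "PEND": "2", "RUN": "8"},
        {"QUEUE_NAME": "l", "STATUS": "Closed:Inact", "MAX": "50",
         "NJOBS": "0", "PEND": "0", "RUN": "0"},
    ],
}

# Keep only queues that are currently open for submission.
open_queues = [record["QUEUE_NAME"] for record in payload["RECORDS"]
               if record["STATUS"].startswith("Open")]
```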

◆ can_submit()

can_submit ( self,
njobs = 1 )
Checks the global number of jobs in LSF right now (submitted or running) for this user.
Returns True if the number is lower than the limit, False if it is higher.

Parameters:
    njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
        are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
        assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
        So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
        and check again before submitting more.

Reimplemented from Batch.

Definition at line 1794 of file backends.py.

1794 def can_submit(self, njobs=1):
1795 """
1796 Checks the global number of jobs in LSF right now (submitted or running) for this user.
1797 Returns True if the number is lower than the limit, False if it is higher.
1798
1799 Parameters:
1800 njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1801 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1802 assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1803 So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
1804 and check again before submitting more.
1805 """
1806 B2DEBUG(29, "Calling LSF().can_submit()")
1807 job_info = self.bjobs(output_fields=["stat"])
1808 total_jobs = job_info["NJOBS"]
1809 B2INFO(f"Total jobs active in the LSF system is currently {total_jobs}")
1810 if (total_jobs + njobs) > self.global_job_limit:
1811 B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1812 return False
1813 else:
1814 B2INFO("There is enough space to submit more jobs.")
1815 return True
1816
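The limit check itself reduces to simple arithmetic. A standalone sketch that substitutes a fixed job count for the live bjobs call:

```python
# Hypothetical numbers standing in for self.global_job_limit and the
# NJOBS value that bjobs would report.
global_job_limit = 1000
total_jobs = 990

def can_submit(njobs=1):
    # True only if submitting njobs more jobs stays within the limit.
    return (total_jobs + njobs) <= global_job_limit
```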

◆ get_batch_submit_script_path()

get_batch_submit_script_path ( self,
job )
inherited
Construct the Path object of the script file that we will submit using the batch command.
For most batch backends this is the same bash script that we submit directly.
But some backends require a separate submission file that describes the job.
To support that, this function can be overridden in the derived Backend class.

Reimplemented in HTCondor.

Definition at line 1336 of file backends.py.

1336 def get_batch_submit_script_path(self, job):
1337 """
1338 Construct the Path object of the script file that we will submit using the batch command.
1339 For most batch backends this is the same bash script that we submit directly.
1340 But some backends require a separate submission file that describes the job.
1341 To support that, this function can be overridden in the derived Backend class.
1342 """
1343 return Path(job.working_dir, self.submit_script)
1344

◆ get_submit_script_path()

get_submit_script_path ( self,
job )
inherited
Construct the Path object of the bash script file that we will submit. It will contain
the actual job command, wrapper commands, setup commands, and any batch directives.

Definition at line 860 of file backends.py.

860 def get_submit_script_path(self, job):
861 """
862 Construct the Path object of the bash script file that we will submit. It will contain
863 the actual job command, wrapper commands, setup commands, and any batch directives.
864 """
865 return Path(job.working_dir, self.submit_script)
866
867

◆ submit()

submit ( self,
job,
check_can_submit = True,
jobs_per_check = 100 )
inherited
 

Reimplemented from Backend.

Definition at line 1205 of file backends.py.

1205 def submit(self, job, check_can_submit=True, jobs_per_check=100):
1206 """
1207 """
1208 raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
1209 "Did you submit a (Sub)Job?")
1210

Member Data Documentation

◆ backend_args

dict backend_args = {**self.default_backend_args, **backend_args}
inherited

The backend args that will be applied to jobs unless the job specifies them itself.

Definition at line 797 of file backends.py.

◆ cmd_name

str cmd_name = "#BSUB -J"
static

Job name directive.

Definition at line 1638 of file backends.py.

◆ cmd_queue

str cmd_queue = "#BSUB -q"
static

Queue directive.

Definition at line 1636 of file backends.py.

◆ cmd_stderr

str cmd_stderr = "#BSUB -e"
static

stderr file directive

Definition at line 1634 of file backends.py.

◆ cmd_stdout

str cmd_stdout = "#BSUB -o"
static

stdout file directive

Definition at line 1632 of file backends.py.

◆ cmd_wkdir

str cmd_wkdir = "#BSUB -cwd"
static

Working directory directive.

Definition at line 1630 of file backends.py.

◆ default_backend_args

dict default_backend_args = {}
staticinherited

Default backend_args.

Definition at line 789 of file backends.py.

◆ default_global_job_limit

int default_global_job_limit = 1000
staticinherited

Default global limit on the total number of submitted/running jobs that the user can have.

This limit will not affect the total number of jobs that are eventually submitted. But the jobs won't actually be submitted until this limit can be respected, i.e. until the number of total jobs in the batch system goes down. Since we actually submit in chunks of N jobs before checking this limit value again, this value needs to be a little lower than the real batch system limit. Otherwise you could accidentally go over during the N-job submission if other processes are checking and submitting concurrently. This is quite common for the first submission of jobs from parallel calibrations.

Note that if there are other jobs already submitted for your account, then these will count towards this limit.

Definition at line 1156 of file backends.py.
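The headroom requirement can be made concrete: if jobs are submitted in chunks of N between limit checks, the configured limit should sit at least one chunk below the real system limit. The numbers below are hypothetical:

```python
real_system_limit = 1100   # hypothetical hard limit of the batch system
jobs_per_check = 100       # chunk size submitted between limit checks

# Keep the configured limit a full chunk below the hard limit so one
# in-flight chunk cannot push the total over it.
configured_limit = real_system_limit - jobs_per_check
```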

◆ default_sleep_between_submission_checks

int default_sleep_between_submission_checks = 30
staticinherited

Default time between re-checks that the number of active jobs is below the global job limit.

Definition at line 1158 of file backends.py.

◆ exit_code_file

str exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
staticinherited

Default exit code file name.

Definition at line 787 of file backends.py.

◆ global_job_limit

int global_job_limit = self.default_global_job_limit
inherited

The active job limit.

This is 'global' because we want to prevent us accidentally submitting too many jobs from all current and previous submission scripts.

Definition at line 1167 of file backends.py.

◆ sleep_between_submission_checks

int sleep_between_submission_checks = self.default_sleep_between_submission_checks
inherited

Seconds we wait before checking if we can submit a list of jobs.

Only relevant once we hit the global limit of active jobs, which is a lot usually.

Definition at line 1170 of file backends.py.

◆ submission_cmds

list submission_cmds = []
staticinherited

Shell command to submit a script; it should be implemented in the derived class.

Definition at line 1143 of file backends.py.

◆ submit_script

str submit_script = "submit.sh"
staticinherited

Default submission script name.

Definition at line 785 of file backends.py.


The documentation for this class was generated from the following file: