Belle II Software development
PBS Class Reference
Inheritance diagram for PBS: PBS → Batch → Backend

Classes

class  PBSResult
 

Public Member Functions

 __init__ (self, *, backend_args=None)
 
 can_submit (self, njobs=1)
 
 qstat (cls, username="", job_ids=None)
 
 submit (self, job, check_can_submit=True, jobs_per_check=100)
 
 get_batch_submit_script_path (self, job)
 
 get_submit_script_path (self, job)
 

Static Public Member Functions

 create_job_record_from_element (job_elem)
 

Public Attributes

int global_job_limit = self.default_global_job_limit
 The active job limit.
 
int sleep_between_submission_checks = self.default_sleep_between_submission_checks
 Seconds we wait before checking if we can submit a list of jobs.
 
dict backend_args = {**self.default_backend_args, **backend_args}
 The backend args that will be applied to jobs unless the job specifies them itself.
 

Static Public Attributes

str cmd_wkdir = "#PBS -d"
 Working directory directive.
 
str cmd_stdout = "#PBS -o"
 stdout file directive
 
str cmd_stderr = "#PBS -e"
 stderr file directive
 
str cmd_queue = "#PBS -q"
 Queue directive.
 
str cmd_name = "#PBS -N"
 Job name directive.
 
list submission_cmds = []
 Shell command to submit a script, should be implemented in the derived class.
 
int default_global_job_limit = 1000
 Default global limit on the total number of submitted/running jobs that the user can have.
 
int default_sleep_between_submission_checks = 30
 Default time between re-checks of whether the number of active jobs is below the global job limit.
 
str submit_script = "submit.sh"
 Default submission script name.
 
str exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
 Default exit code file name.
 
dict default_backend_args = {}
 Default backend_args.
 

Protected Member Functions

 _add_batch_directives (self, job, batch_file)
 
 _create_job_result (cls, job, batch_output)
 
 _create_cmd (self, script_path)
 
 _submit_to_batch (cls, cmd)
 
 _create_parent_job_result (cls, parent)
 
 _make_submit_file (self, job, submit_file_path)
 
 _add_wrapper_script_setup (self, job, batch_file)
 
 _add_wrapper_script_teardown (self, job, batch_file)
 

Static Protected Member Functions

 _add_setup (job, batch_file)
 

Detailed Description

Backend for submitting calibration processes to a qsub batch system.

Definition at line 1357 of file backends.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
* ,
backend_args = None )
 

Definition at line 1378 of file backends.py.

1378 def __init__(self, *, backend_args=None):
1379 """
1380 """
1381 super().__init__(backend_args=backend_args)
1382

Member Function Documentation

◆ _add_batch_directives()

_add_batch_directives ( self,
job,
batch_file )
protected
Add PBS directives to submitted script.

Reimplemented from Batch.

Definition at line 1383 of file backends.py.

1383 def _add_batch_directives(self, job, batch_file):
1384 """
1385 Add PBS directives to submitted script.
1386 """
1387 job_backend_args = {**self.backend_args, **job.backend_args}
1388 batch_queue = job_backend_args["queue"]
1389 print("#!/bin/bash", file=batch_file)
1390 print("# --- Start PBS ---", file=batch_file)
1391 print(" ".join([PBS.cmd_queue, batch_queue]), file=batch_file)
1392 print(" ".join([PBS.cmd_name, job.name]), file=batch_file)
1393 print(" ".join([PBS.cmd_wkdir, job.working_dir.as_posix()]), file=batch_file)
1394 print(" ".join([PBS.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1395 print(" ".join([PBS.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1396 print("# --- End PBS ---", file=batch_file)
1397
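The header that `_add_batch_directives()` writes can be reproduced stand-alone. The sketch below mimics the same pattern with the documented `cmd_*` constants inlined and a plain function in place of the method; the job name, queue, and working directory are invented values:

```python
import io
from pathlib import Path

# Directive prefixes mirroring the documented class attributes.
CMD_QUEUE, CMD_NAME, CMD_WKDIR = "#PBS -q", "#PBS -N", "#PBS -d"

def write_directives(name, queue, working_dir, batch_file):
    # Same pattern as _add_batch_directives(): one "#PBS" line per setting.
    print("#!/bin/bash", file=batch_file)
    print("# --- Start PBS ---", file=batch_file)
    print(" ".join([CMD_QUEUE, queue]), file=batch_file)
    print(" ".join([CMD_NAME, name]), file=batch_file)
    print(" ".join([CMD_WKDIR, Path(working_dir).as_posix()]), file=batch_file)
    print("# --- End PBS ---", file=batch_file)

buf = io.StringIO()
write_directives("my_calib_job", "short", "/tmp/work", buf)
header = buf.getvalue()
```

Writing into a `StringIO` here stands in for the real `batch_file` handle; the actual method takes an open file object the same way.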

◆ _add_setup()

_add_setup ( job,
batch_file )
staticprotectedinherited
Adds setup lines to the shell script file.

Definition at line 807 of file backends.py.

807 def _add_setup(job, batch_file):
808 """
809 Adds setup lines to the shell script file.
810 """
811 for line in job.setup_cmds:
812 print(line, file=batch_file)
813

◆ _add_wrapper_script_setup()

_add_wrapper_script_setup ( self,
job,
batch_file )
protectedinherited
Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
`trap` statements for Ctrl-C situations.

Definition at line 814 of file backends.py.

814 def _add_wrapper_script_setup(self, job, batch_file):
815 """
816 Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
817 `trap` statements for Ctrl-C situations.
818 """
819 start_wrapper = f"""# ---
820# trap ctrl-c and call ctrl_c()
821trap '(ctrl_c 130)' SIGINT
822trap '(ctrl_c 143)' SIGTERM
823
824function write_exit_code() {{
825 echo "Writing $1 to exit status file"
826 echo "$1" > {self.exit_code_file}
827 exit $1
828}}
829
830function ctrl_c() {{
831 trap '' SIGINT SIGTERM
832 echo "** Trapped Ctrl-C **"
833 echo "$1" > {self.exit_code_file}
834 exit $1
835}}
836# ---"""
837 print(start_wrapper, file=batch_file)
838

◆ _add_wrapper_script_teardown()

_add_wrapper_script_teardown ( self,
job,
batch_file )
protectedinherited
Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
the exit code of the job cmd being written out to a file. This means we can know whether the command was
successful even if the backend server/monitoring database purges the data about our job, i.e. if PBS
removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
file.

Definition at line 839 of file backends.py.

839 def _add_wrapper_script_teardown(self, job, batch_file):
840 """
841 Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
842 the exit code of the job cmd being written out to a file. This means we can know whether the command was
843 successful even if the backend server/monitoring database purges the data about our job, i.e. if PBS
844 removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
845 file.
846 """
847 end_wrapper = """# ---
848write_exit_code $?"""
849 print(end_wrapper, file=batch_file)
850
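The consumer side of this pattern is not shown in the class, but a monitoring process would read the status file that `write_exit_code` produced. A minimal sketch, assuming only the documented `exit_code_file` name (the reader function itself is hypothetical):

```python
import tempfile
from pathlib import Path

def read_exit_code(working_dir, exit_code_file="__BACKEND_CMD_EXIT_STATUS__"):
    """Return the recorded exit code, or None if the file was never written."""
    path = Path(working_dir, exit_code_file)
    if not path.is_file():
        return None  # job still running, or the wrapper never got to teardown
    return int(path.read_text().strip())

with tempfile.TemporaryDirectory() as tmp:
    # Simulate the wrapper's "echo "$1" > exit_code_file" step.
    Path(tmp, "__BACKEND_CMD_EXIT_STATUS__").write_text("0\n")
    code = read_exit_code(tmp)
```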

◆ _create_cmd()

_create_cmd ( self,
script_path )
protected
 

Reimplemented from Batch.

Definition at line 1406 of file backends.py.

1406 def _create_cmd(self, script_path):
1407 """
1408 """
1409 submission_cmd = self.submission_cmds[:]
1410 submission_cmd.append(script_path.as_posix())
1411 return submission_cmd
1412
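The body above can be replayed stand-alone. Assuming the PBS subclass sets `submission_cmds = ["qsub"]` (the base class leaves the list empty, and `qsub` matches the documented backend), `_create_cmd()` just copies that list and appends the script path:

```python
from pathlib import Path

# Assumed value of the class attribute in the PBS subclass.
submission_cmds = ["qsub"]
script_path = Path("/tmp/job_dir/submit.sh")

submission_cmd = submission_cmds[:]           # copy, so the class attribute stays untouched
submission_cmd.append(script_path.as_posix())
```

The slice copy matters: appending to `submission_cmds` directly would mutate the shared class attribute for every later submission.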

◆ _create_job_result()

_create_job_result ( cls,
job,
batch_output )
protected
 

Reimplemented from Batch.

Definition at line 1399 of file backends.py.

1399 def _create_job_result(cls, job, batch_output):
1400 """
1401 """
1402 job_id = batch_output.replace("\n", "")
1403 B2INFO(f"Job ID of {job} recorded as: {job_id}")
1404 job.result = cls.PBSResult(job, job_id)
1405

◆ _create_parent_job_result()

_create_parent_job_result ( cls,
parent )
protected
We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
statuses and allows the use of ready().

Reimplemented from Backend.

Definition at line 1422 of file backends.py.

1422 def _create_parent_job_result(cls, parent):
1423 parent.result = cls.PBSResult(parent, None)
1424

◆ _make_submit_file()

_make_submit_file ( self,
job,
submit_file_path )
protectedinherited
Useful for the HTCondor backend where a submit file is needed instead of batch
directives pasted directly into the submission script. It should be overridden
if needed.

Reimplemented in HTCondor.

Definition at line 1180 of file backends.py.

1180 def _make_submit_file(self, job, submit_file_path):
1181 """
1182 Useful for the HTCondor backend where a submit file is needed instead of batch
1183 directives pasted directly into the submission script. It should be overridden
1184 if needed.
1185 """
1186

◆ _submit_to_batch()

_submit_to_batch ( cls,
cmd )
protected
Do the actual batch submission command and collect the output to find out the job id for later monitoring.

Reimplemented from Batch.

Definition at line 1414 of file backends.py.

1414 def _submit_to_batch(cls, cmd):
1415 """
1416 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1417 """
1418 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True)
1419 return sub_out
1420
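A minimal offline sketch of the same capture-and-clean pattern, with `echo` standing in for the real `qsub` invocation (which we cannot run here); the job id string is invented. It also shows the newline stripping that `_create_job_result()` applies to the captured output:

```python
import subprocess

# "echo" simulates qsub printing the new job id on stdout.
cmd = ["echo", "12345.pbs-server"]
sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT,
                                  universal_newlines=True)
job_id = sub_out.replace("\n", "")  # same cleanup as _create_job_result()
```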

◆ can_submit()

can_submit ( self,
njobs = 1 )
Checks the global number of jobs in PBS right now (submitted or running) for this user.
Returns True if the number is lower than the limit, False if it is higher.

Parameters:
    njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
        are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
        assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
        So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
        and check again before submitting more.

Reimplemented from Batch.

Definition at line 1514 of file backends.py.

1514 def can_submit(self, njobs=1):
1515 """
1516 Checks the global number of jobs in PBS right now (submitted or running) for this user.
1517 Returns True if the number is lower than the limit, False if it is higher.
1518
1519 Parameters:
1520 njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1521 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1522 assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1523 So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
1524 and check again before submitting more.
1525 """
1526 B2DEBUG(29, "Calling PBS().can_submit()")
1527 job_info = self.qstat(username=os.environ["USER"])
1528 total_jobs = job_info["NJOBS"]
1529 B2INFO(f"Total jobs active in the PBS system is currently {total_jobs}")
1530 if (total_jobs + njobs) > self.global_job_limit:
1531 B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1532 return False
1533 else:
1534 B2INFO("There is enough space to submit more jobs.")
1535 return True
1536
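The limit arithmetic can be restated stand-alone. The function name below is ours, and the `qstat` lookup is replaced by plain arguments; the default limit matches the documented `default_global_job_limit`:

```python
def within_job_limit(total_jobs, njobs, global_job_limit=1000):
    """Mirror the check in can_submit(): allow submission only while
    (active + requested) stays within the global limit."""
    return (total_jobs + njobs) <= global_job_limit
```

Note the boundary: exactly reaching the limit is still allowed, since the original only refuses when the sum strictly exceeds `global_job_limit`.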

◆ create_job_record_from_element()

create_job_record_from_element ( job_elem)
static
Creates a Job dictionary with various job information from the XML element returned by qstat.

Parameters:
    job_elem (xml.etree.ElementTree.Element): The XML Element of the Job

Returns:
    dict: JSON serialisable dictionary of the Job information we are interested in.

Definition at line 1606 of file backends.py.

1606 def create_job_record_from_element(job_elem):
1607 """
1608 Creates a Job dictionary with various job information from the XML element returned by qstat.
1609
1610 Parameters:
1611 job_elem (xml.etree.ElementTree.Element): The XML Element of the Job
1612
1613 Returns:
1614 dict: JSON serialisable dictionary of the Job information we are interested in.
1615 """
1616 job_dict = {}
1617 job_dict["Job_Id"] = job_elem.find("Job_Id").text
1618 job_dict["Job_Name"] = job_elem.find("Job_Name").text
1619 job_dict["Job_Owner"] = job_elem.find("Job_Owner").text
1620 job_dict["job_state"] = job_elem.find("job_state").text
1621 job_dict["queue"] = job_elem.find("queue").text
1622 return job_dict
1623
1624
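To see the method's input and output shapes concretely, here is a `<Job>` element of the form it expects, parsed with the same `ElementTree` lookups; the field values are invented, but the tag names are the ones the method reads:

```python
import xml.etree.ElementTree as ET

job_xml = """
<Job>
  <Job_Id>12345.pbs-server</Job_Id>
  <Job_Name>my_calib_job</Job_Name>
  <Job_Owner>alice@login01</Job_Owner>
  <job_state>R</job_state>
  <queue>short</queue>
</Job>
"""

job_elem = ET.fromstring(job_xml)
# Equivalent to the per-key find(...).text assignments in the method.
job_dict = {tag: job_elem.find(tag).text
            for tag in ("Job_Id", "Job_Name", "Job_Owner", "job_state", "queue")}
```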

◆ get_batch_submit_script_path()

get_batch_submit_script_path ( self,
job )
inherited
Construct the Path object of the script file that we will submit using the batch command.
For most batch backends this is the same script as the bash script we submit.
But some require a separate submission file that describes the job.
To support that, override this function in the derived Backend class.

Reimplemented in HTCondor.

Definition at line 1336 of file backends.py.

1336 def get_batch_submit_script_path(self, job):
1337 """
1338 Construct the Path object of the script file that we will submit using the batch command.
1339 For most batch backends this is the same script as the bash script we submit.
1340 But for some they require a separate submission file that describes the job.
1341 To implement that you can implement this function in the Backend class.
1342 """
1343 return Path(job.working_dir, self.submit_script)
1344

◆ get_submit_script_path()

get_submit_script_path ( self,
job )
inherited
Construct the Path object of the bash script file that we will submit. It will contain
the actual job command, wrapper commands, setup commands, and any batch directives.

Definition at line 860 of file backends.py.

860 def get_submit_script_path(self, job):
861 """
862 Construct the Path object of the bash script file that we will submit. It will contain
863 the actual job command, wrapper commands, setup commands, and any batch directives
864 """
865 return Path(job.working_dir, self.submit_script)
866
867

◆ qstat()

qstat ( cls,
username = "",
job_ids = None )
Simplistic interface to the ``qstat`` command. Lets you request information about all jobs, or only those matching
the ``job_ids`` filter or the username. The result is a JSON dictionary containing some of the useful job attributes
returned by qstat.

PBS is kind of annoying as depending on the configuration it can forget about jobs immediately. So the status of a
finished job is VERY hard to get. There are other commands that are sometimes included that may do a better job.
This one should work for Melbourne's cloud computing centre.

Keyword Args:
    username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
        will be in the output dictionary.
    job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
the output of this function will be only information about these jobs. If this argument is not given, then all jobs
        matching the other filters will be returned.

Returns:
    dict: JSON dictionary of the form (to save you parsing the XML that qstat returns).:

    .. code-block:: python

      {
        "NJOBS": int
        "JOBS":[
                {
                  <key: value>, ...
                }, ...
               ]
      }

Definition at line 1538 of file backends.py.

1538 def qstat(cls, username="", job_ids=None):
1539 """
1540 Simplistic interface to the ``qstat`` command. Lets you request information about all jobs or ones matching the filter
1541 ['job_id'] or for the username. The result is a JSON dictionary containing some of the useful job attributes returned
1542 by qstat.
1543
1544 PBS is kind of annoying as depending on the configuration it can forget about jobs immediately. So the status of a
1545 finished job is VERY hard to get. There are other commands that are sometimes included that may do a better job.
1546 This one should work for Melbourne's cloud computing centre.
1547
1548 Keyword Args:
1549 username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
1550 will be in the output dictionary.
1551 job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
1552 the output of this function will be only information about these jobs. If this argument is not given, then all jobs
1553 matching the other filters will be returned.
1554
1555 Returns:
1556 dict: JSON dictionary of the form (to save you parsing the XML that qstat returns).:
1557
1558 .. code-block:: python
1559
1560 {
1561 "NJOBS": int,
1562 "JOBS":[
1563 {
1564 <key: value>, ...
1565 }, ...
1566 ]
1567 }
1568 """
1569 B2DEBUG(29, f"Calling PBS.qstat(username='{username}', job_id={job_ids})")
1570 if not job_ids:
1571 job_ids = []
1572 job_ids = set(job_ids)
1573 cmd_list = ["qstat", "-x"]
1574 # We get an XML serialisable summary from qstat. Requires the shell argument.
1575 cmd = " ".join(cmd_list)
1576 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1577 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1578 jobs_dict = {"NJOBS": 0, "JOBS": []}
1579 jobs_xml = ET.fromstring(output)
1580
1581 # For a specific job_id we can be a bit more efficient in XML parsing
1582 if len(job_ids) == 1:
1583 job_elem = jobs_xml.find(f"./Job[Job_Id='{list(job_ids)[0]}']")
1584 if job_elem:
1585 jobs_dict["JOBS"].append(cls.create_job_record_from_element(job_elem))
1586 jobs_dict["NJOBS"] = 1
1587 return jobs_dict
1588
1589 # Since the username given is not exactly the same as the one that PBS stores (<username>@host)
1590 # we have to simply loop through rather than using XPATH.
1591 for job in jobs_xml.iterfind("Job"):
1592 job_owner = job.find("Job_Owner").text.split("@")[0]
1593 if username and username != job_owner:
1594 continue
1595 job_id = job.find("Job_Id").text
1596 if job_ids and job_id not in job_ids:
1597 continue
1598 jobs_dict["JOBS"].append(cls.create_job_record_from_element(job))
1599 jobs_dict["NJOBS"] += 1
1600 # Remove it so that we don't keep checking for it
1601 if job_id in job_ids:
1602 job_ids.remove(job_id)
1603 return jobs_dict
1604
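The filtering loop at the heart of `qstat()` can be walked through offline with a canned `qstat -x` style document. The owner and host values below are invented; the `<username>@host` splitting is the same trick the method uses because PBS stores the owner with a host suffix:

```python
import xml.etree.ElementTree as ET

sample_output = """<Data>
  <Job><Job_Id>1.pbs</Job_Id><Job_Owner>alice@login01</Job_Owner></Job>
  <Job><Job_Id>2.pbs</Job_Id><Job_Owner>bob@login01</Job_Owner></Job>
</Data>"""

username = "alice"
jobs_dict = {"NJOBS": 0, "JOBS": []}
for job in ET.fromstring(sample_output).iterfind("Job"):
    # Compare only the prefix, since PBS records <username>@host.
    job_owner = job.find("Job_Owner").text.split("@")[0]
    if username and username != job_owner:
        continue
    jobs_dict["JOBS"].append({"Job_Id": job.find("Job_Id").text})
    jobs_dict["NJOBS"] += 1
```

Only alice's job survives the filter, matching the documented `{"NJOBS": ..., "JOBS": [...]}` return shape.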

◆ submit()

submit ( self,
job,
check_can_submit = True,
jobs_per_check = 100 )
inherited
 

Reimplemented from Backend.

Definition at line 1205 of file backends.py.

1205 def submit(self, job, check_can_submit=True, jobs_per_check=100):
1206 """
1207 """
1208 raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
1209 "Did you submit a (Sub)Job?")
1210

Member Data Documentation

◆ backend_args

dict backend_args = {**self.default_backend_args, **backend_args}
inherited

The backend args that will be applied to jobs unless the job specifies them itself.

Definition at line 797 of file backends.py.

◆ cmd_name

str cmd_name = "#PBS -N"
static

Job name directive.

Definition at line 1370 of file backends.py.

◆ cmd_queue

str cmd_queue = "#PBS -q"
static

Queue directive.

Definition at line 1368 of file backends.py.

◆ cmd_stderr

str cmd_stderr = "#PBS -e"
static

stderr file directive

Definition at line 1366 of file backends.py.

◆ cmd_stdout

str cmd_stdout = "#PBS -o"
static

stdout file directive

Definition at line 1364 of file backends.py.

◆ cmd_wkdir

str cmd_wkdir = "#PBS -d"
static

Working directory directive.

Definition at line 1362 of file backends.py.

◆ default_backend_args

dict default_backend_args = {}
staticinherited

Default backend_args.

Definition at line 789 of file backends.py.

◆ default_global_job_limit

int default_global_job_limit = 1000
staticinherited

Default global limit on the total number of submitted/running jobs that the user can have.

This limit will not affect the total number of jobs that are eventually submitted. But the jobs won't actually be submitted until this limit can be respected i.e. until the number of total jobs in the Batch system goes down. Since we actually submit in chunks of N jobs, before checking this limit value again, this value needs to be a little lower than the real batch system limit. Otherwise you could accidentally go over during the N job submission if other processes are checking and submitting concurrently. This is quite common for the first submission of jobs from parallel calibrations.

Note that if there are other jobs already submitted for your account, then these will count towards this limit.

Definition at line 1156 of file backends.py.

◆ default_sleep_between_submission_checks

int default_sleep_between_submission_checks = 30
staticinherited

Default time between re-checks of whether the number of active jobs is below the global job limit.

Definition at line 1158 of file backends.py.

◆ exit_code_file

str exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
staticinherited

Default exit code file name.

Definition at line 787 of file backends.py.

◆ global_job_limit

int global_job_limit = self.default_global_job_limit
inherited

The active job limit.

This is 'global' because we want to prevent us accidentally submitting too many jobs from all current and previous submission scripts.

Definition at line 1167 of file backends.py.

◆ sleep_between_submission_checks

int sleep_between_submission_checks = self.default_sleep_between_submission_checks
inherited

Seconds we wait before checking if we can submit a list of jobs.

Only relevant once we hit the global limit of active jobs, which is a lot usually.

Definition at line 1170 of file backends.py.

◆ submission_cmds

list submission_cmds = []
staticinherited

Shell command to submit a script, should be implemented in the derived class.

Definition at line 1143 of file backends.py.

◆ submit_script

submit_script = "submit.sh"
staticinherited

Default submission script name.

Definition at line 785 of file backends.py.


The documentation for this class was generated from the following file: backends.py