Belle II Software development
HTCondor Class Reference
Inheritance diagram for HTCondor: HTCondor → Batch → Backend

Classes

class  HTCondorResult
 

Public Member Functions

 get_batch_submit_script_path (self, job)
 
 can_submit (self, njobs=1)
 
 condor_q (cls, class_ads=None, job_id="", username="")
 
 condor_history (cls, class_ads=None, job_id="", username="")
 
 submit (self, job, check_can_submit=True, jobs_per_check=100)
 
 get_submit_script_path (self, job)
 

Public Attributes

int global_job_limit = self.default_global_job_limit
 The active job limit.
 
int sleep_between_submission_checks = self.default_sleep_between_submission_checks
 Seconds we wait before checking if we can submit a list of jobs.
 
dict backend_args = {**self.default_backend_args, **backend_args}
 The backend args that will be applied to jobs unless the job specifies them itself.
 

Static Public Attributes

str batch_submit_script = "submit.sub"
 HTCondor batch script (different to the wrapper script of Backend.submit_script)
 
list default_class_ads = ["GlobalJobId", "JobStatus", "Owner"]
 Default ClassAd attributes to return from commands like condor_q.
 
list submission_cmds = []
 Shell command to submit a script; should be implemented in the derived class.
 
int default_global_job_limit = 1000
 Default global limit on the total number of submitted/running jobs that the user can have.
 
int default_sleep_between_submission_checks = 30
 Default time between re-checks of whether the number of active jobs is below the global job limit.
 
str submit_script = "submit.sh"
 Default submission script name.
 
str exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
 Default exit code file name.
 
dict default_backend_args = {}
 Default backend_args.
 

Protected Member Functions

 _make_submit_file (self, job, submit_file_path)
 
 _add_batch_directives (self, job, batch_file)
 
 _create_cmd (self, script_path)
 
 _submit_to_batch (cls, cmd)
 
 _create_job_result (cls, job, job_id)
 
 _create_parent_job_result (cls, parent)
 
 _ (self, job, check_can_submit=True, jobs_per_check=100)
 
 _ (self, job, check_can_submit=True, jobs_per_check=100)
 
 _ (self, jobs, check_can_submit=True, jobs_per_check=100)
 
 _add_wrapper_script_setup (self, job, batch_file)
 
 _add_wrapper_script_teardown (self, job, batch_file)
 

Static Protected Member Functions

 _add_setup (job, batch_file)
 

Detailed Description

Backend for submitting calibration processes to an HTCondor batch system.

Definition at line 1924 of file backends.py.
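
A minimal usage sketch (not part of the generated reference), assuming the caf.backends module path and a Job class behaving as the methods below use it; the constructor argument, the cmd attribute, and all values are hypothetical:

import time
from caf.backends import HTCondor, Job  # assumed module path

job = Job("example_job")                       # hypothetical job name
job.cmd = ["echo", "Hello HTCondor"]           # hypothetical command attribute
job.backend_args = {"request_memory": "2 GB"}  # per-job override, takes priority

# The keys below are exactly those read by _make_submit_file().
backend = HTCondor(backend_args={"universe": "vanilla", "getenv": "false", "extra_lines": []})
backend.submit(job)            # writes submit.sh and submit.sub, then runs the submission command
while not job.result.ready():  # HTCondorResult attached by _create_job_result()
    time.sleep(30)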

Member Function Documentation

◆ _() [1/3]

_ ( self,
job,
check_can_submit = True,
jobs_per_check = 100 )
protectedinherited
Submit method of Batch backend for a `SubJob`. Should take a `SubJob` object, create needed directories,
create the batch script, and send it off with the batch submission command.
It should apply the correct options (default and user requested).

Should set a Result object as an attribute of the job.

Definition at line 1211 of file backends.py.

1211 def _(self, job, check_can_submit=True, jobs_per_check=100):
1212 """
1213 Submit method of Batch backend for a `SubJob`. Should take a `SubJob` object, create needed directories,
1214 create the batch script, and send it off with the batch submission command.
1215 It should apply the correct options (default and user requested).
1216
1217 Should set a Result object as an attribute of the job.
1218 """
1219 # Make sure the output directory of the job is created, commented out due to permission issues
1220 # job.output_dir.mkdir(parents=True, exist_ok=True)
1221 # Make sure the working directory of the job is created
1222 job.working_dir.mkdir(parents=True, exist_ok=True)
1223 job.copy_input_sandbox_files_to_working_dir()
1224 job.dump_input_data()
1225 # Make submission file if needed
1226 batch_submit_script_path = self.get_batch_submit_script_path(job)
1227 self._make_submit_file(job, batch_submit_script_path)
1228 # Get the bash file we will actually run, might be the same file
1229 script_path = self.get_submit_script_path(job)
1230 # Construct the batch submission script (with directives if that is supported)
1231 with open(script_path, mode="w") as batch_file:
1232 self._add_batch_directives(job, batch_file)
1233 self._add_wrapper_script_setup(job, batch_file)
1234 self._add_setup(job, batch_file)
1235 print(job.full_command, file=batch_file)
1236 self._add_wrapper_script_teardown(job, batch_file)
1237 os.chmod(script_path, 0o755)
1238 B2INFO(f"Submitting {job}")
1239 # Do the actual batch submission
1240 cmd = self._create_cmd(batch_submit_script_path)
1241 output = self._submit_to_batch(cmd)
1242 self._create_job_result(job, output)
1243 job.status = "submitted"
1244 B2INFO(f"{job} submitted")
1245

◆ _() [2/3]

_ ( self,
job,
check_can_submit = True,
jobs_per_check = 100 )
protectedinherited
Submit method of Batch backend. Should take a job object, create needed directories, create the batch script,
and send it off with the batch submission command, applying the correct options (default and user requested).

Should set a Result object as an attribute of the job.

Definition at line 1247 of file backends.py.

1247 def _(self, job, check_can_submit=True, jobs_per_check=100):
1248 """
1249 Submit method of Batch backend. Should take a job object, create needed directories, create the batch script,
1250 and send it off with the batch submission command, applying the correct options (default and user requested).
1251
1252 Should set a Result object as an attribute of the job.
1253 """
1254 # Make sure the output directory of the job is created, commented out due to permissions issue
1255 # job.output_dir.mkdir(parents=True, exist_ok=True)
1256 # Make sure the working directory of the job is created
1257 job.working_dir.mkdir(parents=True, exist_ok=True)
1258 # Check if we have any valid input files
1259 job.check_input_data_files()
1260 # Add any required backend args that are missing (I'm a bit hesitant to actually merge with job.backend_args)
1261 # just in case you want to resubmit the same job with different backend settings later.
1262 # job_backend_args = {**self.backend_args, **job.backend_args}
1263
1264 # If there's no splitter then we just submit the Job with no SubJobs
1265 if not job.splitter:
1266 # Get all of the requested files for the input sandbox and copy them to the working directory
1267 job.copy_input_sandbox_files_to_working_dir()
1268 job.dump_input_data()
1269 # Make submission file if needed
1270 batch_submit_script_path = self.get_batch_submit_script_path(job)
1271 self._make_submit_file(job, batch_submit_script_path)
1272 # Get the bash file we will actually run
1273 script_path = self.get_submit_script_path(job)
1274 # Construct the batch submission script (with directives if that is supported)
1275 with open(script_path, mode="w") as batch_file:
1276 self._add_batch_directives(job, batch_file)
1277 self._add_wrapper_script_setup(job, batch_file)
1278 self._add_setup(job, batch_file)
1279 print(job.full_command, file=batch_file)
1280 self._add_wrapper_script_teardown(job, batch_file)
1281 os.chmod(script_path, 0o755)
1282 B2INFO(f"Submitting {job}")
1283 # Do the actual batch submission
1284 cmd = self._create_cmd(batch_submit_script_path)
1285 output = self._submit_to_batch(cmd)
1286 self._create_job_result(job, output)
1287 job.status = "submitted"
1288 B2INFO(f"{job} submitted")
1289 else:
1290 # Create subjobs according to the splitter's logic
1291 job.splitter.create_subjobs(job)
1292 # Submit the subjobs
1293 self.submit(list(job.subjobs.values()))
1294 # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1295 self._create_parent_job_result(job)
1296

◆ _() [3/3]

_ ( self,
jobs,
check_can_submit = True,
jobs_per_check = 100 )
protectedinherited
Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.

Definition at line 1298 of file backends.py.

1298 def _(self, jobs, check_can_submit=True, jobs_per_check=100):
1299 """
1300 Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.
1301 """
1302 B2INFO(f"Submitting a list of {len(jobs)} jobs to a Batch backend")
1303 # Technically this could be a list of Jobs or SubJobs. And if it is a list of Jobs then it might not
1304 # be necessary to check if we can submit right now. We could do it later during the submission of the
1305 # SubJob list. However in the interest of simpler code we just do the check here, and re-check again
1306 # if a SubJob list comes through this function. Slightly inefficient, but much simpler logic.
1307
1308 # The first thing to do is make sure that we are iterating through the jobs list in chunks that are
1309 # equal to or smaller than the global limit. Otherwise nothing will ever submit.
1310
1311 if jobs_per_check > self.global_job_limit:
1312 B2INFO(f"jobs_per_check (={jobs_per_check}) is higher than the global job "
1313 f"limit for this backend (={self.global_job_limit}). Will instead use the "
1314 "value of the global job limit.")
1315 jobs_per_check = self.global_job_limit
1316
1317 # We group the jobs list into chunks of length jobs_per_check
1318 for jobs_to_submit in grouper(jobs_per_check, jobs):
1319 # Wait until we are allowed to submit
1320 while not self.can_submit(njobs=len(jobs_to_submit)):
1321 B2INFO("Too many jobs are currently in the batch system globally. Waiting until submission can continue...")
1322 time.sleep(self.sleep_between_submission_checks)
1323 else:
1324 # We loop here since we have already checked if the number of jobs is low enough, we don't want to hit this
1325 # function again unless one of the jobs has subjobs.
1326 B2INFO(f"Submitting the next {len(jobs_to_submit)} jobs...")
1327 for job in jobs_to_submit:
1328 self.submit(job, check_can_submit, jobs_per_check)
1329 B2INFO(f"All {len(jobs)} requested jobs submitted")
1330
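
For illustration, a sketch of the chunking behaviour; grouper() is defined elsewhere in the module, and this stand-in assumes the usual itertools-style recipe that yields tuples of at most n items without padding:

from itertools import islice

def grouper(n, iterable):
    # Yield successive chunks of at most n items, without padding the last one.
    it = iter(iterable)
    while chunk := tuple(islice(it, n)):
        yield chunk

jobs = list(range(7))
print([len(chunk) for chunk in grouper(3, jobs)])  # [3, 3, 1]: one can_submit() check per chunk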

◆ _add_batch_directives()

_add_batch_directives ( self,
job,
batch_file )
protected
For HTCondor we only write the shebang line; the batch directives live in the separate submit file.

Reimplemented from Batch.

Definition at line 1971 of file backends.py.

1971 def _add_batch_directives(self, job, batch_file):
1972 """
1973 For HTCondor we only write the shebang line; the batch directives live in the separate submit file.
1974 """
1975 print('#!/bin/bash', file=batch_file)
1976

◆ _add_setup()

_add_setup ( job,
batch_file )
staticprotectedinherited
Adds setup lines to the shell script file.

Definition at line 806 of file backends.py.

806 def _add_setup(job, batch_file):
807 """
808 Adds setup lines to the shell script file.
809 """
810 for line in job.setup_cmds:
811 print(line, file=batch_file)
812

◆ _add_wrapper_script_setup()

_add_wrapper_script_setup ( self,
job,
batch_file )
protectedinherited
Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
`trap` statements for Ctrl-C situations.

Definition at line 813 of file backends.py.

813 def _add_wrapper_script_setup(self, job, batch_file):
814 """
815 Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
816 `trap` statements for Ctrl-C situations.
817 """
818 start_wrapper = f"""# ---
819# trap ctrl-c and call ctrl_c()
820trap '(ctrl_c 130)' SIGINT
821trap '(ctrl_c 143)' SIGTERM
822
823function write_exit_code() {{
824 echo "Writing $1 to exit status file"
825 echo "$1" > {self.exit_code_file}
826 exit $1
827}}
828
829function ctrl_c() {{
830 trap '' SIGINT SIGTERM
831 echo "** Trapped Ctrl-C **"
832 echo "$1" > {self.exit_code_file}
833 exit $1
834}}
835# ---"""
836 print(start_wrapper, file=batch_file)
837

◆ _add_wrapper_script_teardown()

_add_wrapper_script_teardown ( self,
job,
batch_file )
protectedinherited
Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
the exit code of the job command being written out to a file. This means that we can know whether the command
succeeded even if the backend server/monitoring database purges the data about our job, e.g. if PBS removes
job information too quickly we may never know whether a job succeeded or failed without some kind of exit
file.

Definition at line 838 of file backends.py.

838 def _add_wrapper_script_teardown(self, job, batch_file):
839 """
840 Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
841 the exit code of the job command being written out to a file. This means that we can know whether the command
842 succeeded even if the backend server/monitoring database purges the data about our job, e.g. if PBS removes
843 job information too quickly we may never know whether a job succeeded or failed without some kind of exit
844 file.
845 """
846 end_wrapper = """# ---
847write_exit_code $?"""
848 print(end_wrapper, file=batch_file)
849
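
A sketch of how the status written by write_exit_code can be recovered later, assuming HTCondor transfers the exit-code file back into the job's working directory:

from pathlib import Path

from caf.backends import HTCondor  # assumed module path

def read_backend_exit_code(job):
    # 0 on success; 130/143 if the SIGINT/SIGTERM traps fired first.
    status_file = Path(job.working_dir, HTCondor.exit_code_file)
    return int(status_file.read_text().strip())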

◆ _create_cmd()

_create_cmd ( self,
script_path )
protected
 

Construct the submission command by copying ``submission_cmds`` and appending the script path.

Reimplemented from Batch.

Definition at line 1977 of file backends.py.

1977 def _create_cmd(self, script_path):
1978 """
1979 """
1980 submission_cmd = self.submission_cmds[:]
1981 submission_cmd.append(script_path.as_posix())
1982 return submission_cmd
1983

◆ _create_job_result()

_create_job_result ( cls,
job,
job_id )
protected
 

Record the job id reported by the batch system and attach an ``HTCondorResult`` to the job.

Reimplemented from Batch.

Definition at line 2121 of file backends.py.

2121 def _create_job_result(cls, job, job_id):
2122 """
2123 """
2124 B2INFO(f"Job ID of {job} recorded as: {job_id}")
2125 job.result = cls.HTCondorResult(job, job_id)
2126

◆ _create_parent_job_result()

_create_parent_job_result ( cls,
parent )
protected
We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
statuses and allows the use of ready().

Reimplemented from Backend.

Definition at line 2128 of file backends.py.

2128 def _create_parent_job_result(cls, parent):
2129 parent.result = cls.HTCondorResult(parent, None)
2130

◆ _make_submit_file()

_make_submit_file ( self,
job,
submit_file_path )
protected
Fill HTCondor submission file.

Reimplemented from Batch.

Definition at line 1945 of file backends.py.

1945 def _make_submit_file(self, job, submit_file_path):
1946 """
1947 Fill HTCondor submission file.
1948 """
1949 # Find all files/directories in the working directory to copy on the worker node
1950
1951 files_to_transfer = [i.as_posix() for i in job.working_dir.iterdir()]
1952
1953 job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1954
1955 with open(submit_file_path, "w") as submit_file:
1956 print(f'executable = {self.get_submit_script_path(job)}', file=submit_file)
1957 print(f'log = {Path(job.output_dir, "htcondor.log").as_posix()}', file=submit_file)
1958 print(f'output = {Path(job.working_dir, _STDOUT_FILE).as_posix()}', file=submit_file)
1959 print(f'error = {Path(job.working_dir, _STDERR_FILE).as_posix()}', file=submit_file)
1960 print('transfer_input_files = ', ','.join(files_to_transfer), file=submit_file)
1961 print(f'universe = {job_backend_args["universe"]}', file=submit_file)
1962 print(f'getenv = {job_backend_args["getenv"]}', file=submit_file)
1963 print(f'request_memory = {job_backend_args["request_memory"]}', file=submit_file)
1964 print('should_transfer_files = Yes', file=submit_file)
1965 print('when_to_transfer_output = ON_EXIT', file=submit_file)
1966 # Any other lines in the backend args that we don't deal with explicitly but maybe someone wants to insert something
1967 for line in job_backend_args["extra_lines"]:
1968 print(line, file=submit_file)
1969 print('queue', file=submit_file)
1970
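
For illustration, a submit file written by this method might look as follows; all paths and values are hypothetical, and the stdout/stderr names stand in for the module constants _STDOUT_FILE and _STDERR_FILE:

executable = /work/example_job/submit.sh
log = /out/example_job/htcondor.log
output = /work/example_job/stdout
error = /work/example_job/stderr
transfer_input_files = /work/example_job/submit.sh,/work/example_job/input_data.json
universe = vanilla
getenv = false
request_memory = 2 GB
should_transfer_files = Yes
when_to_transfer_output = ON_EXIT
queue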

◆ _submit_to_batch()

_submit_to_batch ( cls,
cmd )
protected
Run the actual batch submission command and collect the output to find out the job id for later monitoring.

Reimplemented from Batch.

Definition at line 1991 of file backends.py.

1991 def _submit_to_batch(cls, cmd):
1992 """
1993 Run the actual batch submission command and collect the output to find out the job id for later monitoring.
1994 """
1995 job_dir = Path(cmd[-1]).parent.as_posix()
1996 sub_out = ""
1997 attempt = 0
1998 sleep_time = 30
1999
2000 while attempt < 3:
2001 try:
2002 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, cwd=job_dir)
2003 break
2004 except subprocess.CalledProcessError as e:
2005 attempt += 1
2006 if attempt == 3:
2007 B2ERROR(f"Error during condor_submit: {str(e)} occurred more than 3 times.")
2008 raise e
2009 else:
2010 B2ERROR(f"Error during condor_submit: {str(e)}, sleeping for {sleep_time} seconds.")
2011 time.sleep(sleep_time)
2012 return re.search(r"(\d+\.\d+) - \d+\.\d+", sub_out).groups()[0]
2013
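
A sketch of the job-id extraction in the final line; the regex expects output of the form "<ClusterId>.<ProcId> - <ClusterId>.<ProcId>" (as produced, e.g., by condor_submit -terse), and the sample string is hypothetical:

import re

sub_out = "1234.0 - 1234.0"
job_id = re.search(r"(\d+\.\d+) - \d+\.\d+", sub_out).groups()[0]
assert job_id == "1234.0"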

◆ can_submit()

can_submit ( self,
njobs = 1 )
Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
Returns True if the number is lower than the limit, False if it is higher.

Parameters:
    njobs (int): The number of jobs that we want to submit before checking again. Lets us check whether we
        are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
        assume that it is safe to submit many jobs at once, since there might be other processes also submitting jobs.
        So njobs shouldn't be pushed too high when you might be getting close to the limit, i.e. keep it <= 250
        and check again before submitting more.

Reimplemented from Batch.

Definition at line 2131 of file backends.py.

2131 def can_submit(self, njobs=1):
2132 """
2133 Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
2134 Returns True if the number is lower than the limit, False if it is higher.
2135
2136 Parameters:
2137 njobs (int): The number of jobs that we want to submit before checking again. Lets us check whether we
2138 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
2139 assume that it is safe to submit many jobs at once, since there might be other processes also submitting jobs.
2140 So njobs shouldn't be pushed too high when you might be getting close to the limit, i.e. keep it <= 250
2141 and check again before submitting more.
2142 """
2143 B2DEBUG(29, "Calling HTCondor().can_submit()")
2144 jobs_info = self.condor_q()
2145 total_jobs = jobs_info["NJOBS"]
2146 B2INFO(f"Total jobs active in the HTCondor system is currently {total_jobs}")
2147 if (total_jobs + njobs) > self.global_job_limit:
2148 B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
2149 return False
2150 else:
2151 B2INFO("There is enough space to submit more jobs.")
2152 return True
2153
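
A usage sketch mirroring the throttling loop that the list-submission overload performs internally:

import time

from caf.backends import HTCondor  # assumed module path

backend = HTCondor()
while not backend.can_submit(njobs=100):
    # Too many jobs active globally; wait before re-checking.
    time.sleep(backend.sleep_between_submission_checks)
# Now (somewhat) safe to submit the next 100 jobs.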

◆ condor_history()

condor_history ( cls,
class_ads = None,
job_id = "",
username = "" )
Simple interface to the ``condor_history`` command. Lets you request information about all jobs matching the filters
``job_id`` and ``username``. Note that setting ``job_id`` overrides ``username``, which is then ignored.
The result is a JSON dictionary filled from the output of the ``-json`` option of ``condor_history``.

Parameters:
    class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
        By default we use {cls.default_class_ads}; requesting more ClassAds increases the time taken
        by the condor_history call.
    job_id (str): String representation of the Job ID given by condor_submit during submission.
        If this argument is given then the output of this function will be only information about this job.
        If this argument is not given, then all jobs matching the other filters will be returned.
    username (str): By default we return information about only the current user's jobs. By giving
        a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
        receive job information from all known user jobs matching the other filters. This is limited to 10000 records
        and isn't recommended.

Returns:
    dict: JSON dictionary of the form:

    .. code-block:: python

      {
        "NJOBS":<number of records returned by command>,
        "JOBS":[
                {
                 <ClassAd: value>, ...
                }, ...
               ]
      }

Definition at line 2222 of file backends.py.

2222 def condor_history(cls, class_ads=None, job_id="", username=""):
2223 """
2224 Simple interface to the ``condor_history`` command. Lets you request information about all jobs matching the filters
2225 ``job_id`` and ``username``. Note that setting ``job_id`` overrides ``username``, which is then ignored.
2226 The result is a JSON dictionary filled from the output of the ``-json`` option of ``condor_history``.
2227
2228 Parameters:
2229 class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
2230 By default we use {cls.default_class_ads}; requesting more ClassAds increases the time taken
2231 by the condor_history call.
2232 job_id (str): String representation of the Job ID given by condor_submit during submission.
2233 If this argument is given then the output of this function will be only information about this job.
2234 If this argument is not given, then all jobs matching the other filters will be returned.
2235 username (str): By default we return information about only the current user's jobs. By giving
2236 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2237 receive job information from all known user jobs matching the other filters. This is limited to 10000 records
2238 and isn't recommended.
2239
2240 Returns:
2241 dict: JSON dictionary of the form:
2242
2243 .. code-block:: python
2244
2245 {
2246 "NJOBS":<number of records returned by command>,
2247 "JOBS":[
2248 {
2249 <ClassAd: value>, ...
2250 }, ...
2251 ]
2252 }
2253 """
2254 B2DEBUG(29, f"Calling HTCondor.condor_history(class_ads={class_ads}, job_id={job_id}, username={username})")
2255 if not class_ads:
2256 class_ads = cls.default_class_ads
2257 # Output fields should be comma separated.
2258 field_list_cmd = ",".join(class_ads)
2259 cmd_list = ["condor_history", "-json", "-attributes", field_list_cmd]
2260 # If job_id is set then we ignore all other filters
2261 if job_id:
2262 cmd_list.append(job_id)
2263 else:
2264 if not username:
2265 username = os.environ["USER"]
2266 # If the username is set to all it is a special case
2267 if username != "all":
2268 cmd_list.append(username)
2269 # We get a JSON serialisable summary from condor_q. But we will alter it slightly to be more similar to other backends
2270 cmd = " ".join(cmd_list)
2271 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2272 try:
2273 records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2274 except subprocess.CalledProcessError:
2275 records = None
2276
2277 if records:
2278 records = decode_json_string(records)
2279 else:
2280 records = []
2281
2282 jobs_info = {"JOBS": records}
2283 jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2284 return jobs_info
2285
2286
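
A usage sketch; the job id and the ExitCode ClassAd are hypothetical values chosen for illustration:

from caf.backends import HTCondor  # assumed module path

jobs_info = HTCondor.condor_history(
    class_ads=["GlobalJobId", "JobStatus", "ExitCode"],
    job_id="1234.0",
)
for record in jobs_info["JOBS"]:
    print(record.get("JobStatus"), record.get("ExitCode"))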

◆ condor_q()

condor_q ( cls,
class_ads = None,
job_id = "",
username = "" )
Simple interface to the ``condor_q`` command. Lets you request information about all jobs matching the filters
``job_id`` and ``username``. Note that setting ``job_id`` overrides ``username``, which is then ignored.
The result is the JSON dictionary built from the output of the ``-json`` condor_q option.

Parameters:
    class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
        By default we use {cls.default_class_ads}; requesting more ClassAds increases the time taken
        by the condor_q call.
    job_id (str): String representation of the Job ID given by condor_submit during submission.
        If this argument is given then the output of this function will be only information about this job.
        If this argument is not given, then all jobs matching the other filters will be returned.
    username (str): By default we return information about only the current user's jobs. By giving
        a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
        receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
        so it isn't recommended.

Returns:
    dict: JSON dictionary of the form:

    .. code-block:: python

      {
        "NJOBS":<number of records returned by command>,
        "JOBS":[
                {
                 <ClassAd: value>, ...
                }, ...
               ]
      }

Definition at line 2155 of file backends.py.

2155 def condor_q(cls, class_ads=None, job_id="", username=""):
2156 """
2157 Simple interface to the ``condor_q`` command. Lets you request information about all jobs matching the filters
2158 ``job_id`` and ``username``. Note that setting ``job_id`` overrides ``username``, which is then ignored.
2159 The result is the JSON dictionary built from the output of the ``-json`` condor_q option.
2160
2161 Parameters:
2162 class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
2163 By default we use {cls.default_class_ads}; requesting more ClassAds increases the time taken
2164 by the condor_q call.
2165 job_id (str): String representation of the Job ID given by condor_submit during submission.
2166 If this argument is given then the output of this function will be only information about this job.
2167 If this argument is not given, then all jobs matching the other filters will be returned.
2168 username (str): By default we return information about only the current user's jobs. By giving
2169 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2170 receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
2171 so it isn't recommended.
2172
2173 Returns:
2174 dict: JSON dictionary of the form:
2175
2176 .. code-block:: python
2177
2178 {
2179 "NJOBS":<number of records returned by command>,
2180 "JOBS":[
2181 {
2182 <ClassAd: value>, ...
2183 }, ...
2184 ]
2185 }
2186 """
2187 B2DEBUG(29, f"Calling HTCondor.condor_q(class_ads={class_ads}, job_id={job_id}, username={username})")
2188 if not class_ads:
2189 class_ads = cls.default_class_ads
2190 # Output fields should be comma separated.
2191 field_list_cmd = ",".join(class_ads)
2192 cmd_list = ["condor_q", "-json", "-attributes", field_list_cmd]
2193 # If job_id is set then we ignore all other filters
2194 if job_id:
2195 cmd_list.append(job_id)
2196 else:
2197 if not username:
2198 username = os.environ["USER"]
2199 # If the username is set to all it is a special case
2200 if username == "all":
2201 cmd_list.append("-allusers")
2202 else:
2203 cmd_list.append(username)
2204 # We get a JSON serialisable summary from condor_q. But we will alter it slightly to be more similar to other backends
2205 cmd = " ".join(cmd_list)
2206 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2207 # condor_q occasionally fails
2208 try:
2209 records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2210 except subprocess.CalledProcessError:
2211 records = None
2212
2213 if records:
2214 records = decode_json_string(records)
2215 else:
2216 records = []
2217 jobs_info = {"JOBS": records}
2218 jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2219 return jobs_info
2220
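
A usage sketch that tallies the current user's records by HTCondor's integer JobStatus codes (1 = Idle, 2 = Running):

from caf.backends import HTCondor  # assumed module path

jobs_info = HTCondor.condor_q()
idle = sum(1 for j in jobs_info["JOBS"] if j.get("JobStatus") == 1)
running = sum(1 for j in jobs_info["JOBS"] if j.get("JobStatus") == 2)
print(f"{idle} idle, {running} running, {jobs_info['NJOBS']} records total")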

◆ get_batch_submit_script_path()

get_batch_submit_script_path ( self,
job )
Construct the Path object of the .sub file that we will use to describe the job.

Reimplemented from Batch.

Definition at line 1984 of file backends.py.

1984 def get_batch_submit_script_path(self, job):
1985 """
1986 Construct the Path object of the .sub file that we will use to describe the job.
1987 """
1988 return Path(job.working_dir, self.batch_submit_script)
1989

◆ get_submit_script_path()

get_submit_script_path ( self,
job )
inherited
Construct the Path object of the bash script file that we will submit. It will contain
the actual job command, wrapper commands, setup commands, and any batch directives.

Definition at line 859 of file backends.py.

859 def get_submit_script_path(self, job):
860 """
861 Construct the Path object of the bash script file that we will submit. It will contain
862 the actual job command, wrapper commands, setup commands, and any batch directives.
863 """
864 return Path(job.working_dir, self.submit_script)
865
866

◆ submit()

submit ( self,
job,
check_can_submit = True,
jobs_per_check = 100 )
inherited
 

Fallback ``submit`` implementation that simply raises NotImplementedError; the type-specific overloads (listed above as ``_``) handle `Job`, `SubJob`, and lists of jobs.

Reimplemented from Backend.

Definition at line 1204 of file backends.py.

1204 def submit(self, job, check_can_submit=True, jobs_per_check=100):
1205 """
1206 """
1207 raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
1208 "Did you submit a (Sub)Job?")
1209

Member Data Documentation

◆ backend_args

dict backend_args = {**self.default_backend_args, **backend_args}
inherited

The backend args that will be applied to jobs unless the job specifies them itself.

Definition at line 796 of file backends.py.

◆ batch_submit_script

batch_submit_script = "submit.sub"
static

HTCondor batch script (different to the wrapper script of Backend.submit_script)

Definition at line 1929 of file backends.py.

◆ default_backend_args

dict default_backend_args = {}
staticinherited

Default backend_args.

Definition at line 788 of file backends.py.

◆ default_class_ads

list default_class_ads = ["GlobalJobId", "JobStatus", "Owner"]
static

Default ClassAd attributes to return from commands like condor_q.

Definition at line 1943 of file backends.py.

◆ default_global_job_limit

int default_global_job_limit = 1000
staticinherited

Default global limit on the total number of submitted/running jobs that the user can have.

This limit will not affect the total number of jobs that are eventually submitted, but jobs won't actually be submitted until this limit can be respected, i.e. until the total number of jobs in the batch system goes down. Since we submit in chunks of N jobs before checking this limit value again, this value needs to be a little lower than the real batch system limit. Otherwise you could accidentally go over during the N-job submission if other processes are checking and submitting concurrently. This is quite common for the first submission of jobs from parallel calibrations.

Note that if there are other jobs already submitted for your account, then these will count towards this limit.

Definition at line 1155 of file backends.py.
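
A back-of-the-envelope illustration of the headroom argument above (all numbers hypothetical):

real_cap = 1000            # the batch system's actual job cap
jobs_per_check = 100       # jobs submitted per chunk before re-checking
concurrent_submitters = 2  # e.g. parallel calibrations submitting at once
# Each submitter may add a full chunk after passing its check, so leave
# roughly one chunk of headroom per concurrent submitter:
safe_global_job_limit = real_cap - concurrent_submitters * jobs_per_check  # 800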

◆ default_sleep_between_submission_checks

int default_sleep_between_submission_checks = 30
staticinherited

Default time between re-checks of whether the number of active jobs is below the global job limit.

Definition at line 1157 of file backends.py.

◆ exit_code_file

str exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
staticinherited

Default exit code file name.

Definition at line 786 of file backends.py.

◆ global_job_limit

int global_job_limit = self.default_global_job_limit
inherited

The active job limit.

This is 'global' because we want to prevent ourselves from accidentally submitting too many jobs across all current and previous submission scripts.

Definition at line 1166 of file backends.py.

◆ sleep_between_submission_checks

sleep_between_submission_checks = self.default_sleep_between_submission_checks
inherited

Seconds we wait before checking if we can submit a list of jobs.

Only relevant once we hit the global limit of active jobs, which is usually large.

Definition at line 1169 of file backends.py.

◆ submission_cmds

list submission_cmds = []
staticinherited

Shell command to submit a script; should be implemented in the derived class.

Definition at line 1142 of file backends.py.

◆ submit_script

submit_script = "submit.sh"
staticinherited

Default submission script name.

Definition at line 784 of file backends.py.


The documentation for this class was generated from the following file: backends.py