Belle II Software development
Batch Class Reference
Inheritance: Batch derives from Backend; HTCondor, LSF, and PBS derive from Batch.

Public Member Functions

 __init__ (self, *, backend_args=None)
 
 can_submit (self, *args, **kwargs)
 
 submit (self, job, check_can_submit=True, jobs_per_check=100)
 
 get_batch_submit_script_path (self, job)
 
 get_submit_script_path (self, job)
 

Public Attributes

int global_job_limit = self.default_global_job_limit
 The active job limit.
 
int sleep_between_submission_checks = self.default_sleep_between_submission_checks
 Seconds we wait before checking if we can submit a list of jobs.
 
dict backend_args = {**self.default_backend_args, **backend_args}
 The backend args that will be applied to jobs unless the job specifies them itself.
 

Static Public Attributes

list submission_cmds = []
 Shell command to submit a script, should be implemented in the derived class.
 
int default_global_job_limit = 1000
 Default global limit on the total number of submitted/running jobs that the user can have.
 
int default_sleep_between_submission_checks = 30
 Default time (in seconds) between re-checks of whether the number of active jobs is below the global job limit.
 
str submit_script = "submit.sh"
 Default submission script name.
 
str exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
 Default exit code file name.
 
dict default_backend_args = {}
 Default backend_args.
 

Protected Member Functions

 _add_batch_directives (self, job, file)
 
 _make_submit_file (self, job, submit_file_path)
 
 _submit_to_batch (cls, cmd)
 
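 _ (self, job, check_can_submit=True, jobs_per_check=100)
 Submit overload for a `SubJob`.
 
 _ (self, job, check_can_submit=True, jobs_per_check=100)
 Submit overload for a `Job`; submits it directly or, if it has a splitter, creates and submits its subjobs.
 
 _ (self, jobs, check_can_submit=True, jobs_per_check=100)
 Submit overload for a list of jobs.
 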
 _create_job_result (cls, job, batch_output)
 
 _create_cmd (self, job)
 
 _add_wrapper_script_setup (self, job, batch_file)
 
 _add_wrapper_script_teardown (self, job, batch_file)
 
 _create_parent_job_result (cls, parent)
 

Static Protected Member Functions

 _add_setup (job, batch_file)
 

Detailed Description

Abstract base backend for submitting jobs to a local batch system. Batch-system-specific commands should be implemented
in a derived class. Do not use this class directly!

Definition at line 1136 of file backends.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
* ,
backend_args = None )
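Init method for the Batch backend. Passes `backend_args` to the `Backend` constructor and initializes `global_job_limit` and `sleep_between_submission_checks` from their class defaults.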

Definition at line 1159 of file backends.py.

def __init__(self, *, backend_args=None):
    """
    Init method for Batch Backend. Does some basic default setup.
    """
    super().__init__(backend_args=backend_args)

    # The active job limit.
    self.global_job_limit = self.default_global_job_limit

    # Seconds we wait before checking if we can submit a list of jobs.
    self.sleep_between_submission_checks = self.default_sleep_between_submission_checks
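The constructor copies class-level defaults onto the instance, so one backend object can be tuned without touching the class or other instances. The classes below are hypothetical stand-ins (`ToyBatch`, `ToyPBS`) that mirror only the two attribute names shown above; they are a sketch, not the Belle II classes.

```python
class ToyBatch:
    """Hypothetical stand-in for Batch; only the two limits are modelled."""

    # Class-level defaults, shared by all instances
    default_global_job_limit = 1000
    default_sleep_between_submission_checks = 30

    def __init__(self):
        # Instance attributes start from the class defaults
        self.global_job_limit = self.default_global_job_limit
        self.sleep_between_submission_checks = self.default_sleep_between_submission_checks


class ToyPBS(ToyBatch):
    """Hypothetical subclass that only changes a default."""
    default_global_job_limit = 5000


a = ToyBatch()
b = ToyBatch()
a.global_job_limit = 200  # tunes this object only
print(a.global_job_limit, b.global_job_limit, ToyBatch.default_global_job_limit)  # 200 1000 1000
print(ToyPBS().global_job_limit)  # 5000
```

Reading the default through `self` rather than through a hard-coded class name means a derived backend can change it simply by redefining the class attribute.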

Member Function Documentation

◆ _() [1/3]

_ ( self,
job,
check_can_submit = True,
jobs_per_check = 100 )
protected
Submit method of the Batch backend for a `SubJob`. It takes the `SubJob` object, creates the needed directories,
creates the batch script, and sends it off with the batch submission command,
applying the correct options (default and user-requested).

It should also set a Result object as an attribute of the job.

Definition at line 1211 of file backends.py.

def _(self, job, check_can_submit=True, jobs_per_check=100):
    """
    Submit method of Batch backend for a `SubJob`. Should take `SubJob` object, create needed directories,
    create batch script, and send it off with the batch submission command.
    It should apply the correct options (default and user requested).

    Should set a Result object as an attribute of the job.
    """
    # Make sure the output directory of the job is created, commented out due to permission issues
    # job.output_dir.mkdir(parents=True, exist_ok=True)
    # Make sure the working directory of the job is created
    job.working_dir.mkdir(parents=True, exist_ok=True)
    job.copy_input_sandbox_files_to_working_dir()
    job.dump_input_data()
    # Make submission file if needed
    batch_submit_script_path = self.get_batch_submit_script_path(job)
    self._make_submit_file(job, batch_submit_script_path)
    # Get the bash file we will actually run, might be the same file
    script_path = self.get_submit_script_path(job)
    # Construct the batch submission script (with directives if that is supported)
    with open(script_path, mode="w") as batch_file:
        self._add_batch_directives(job, batch_file)
        self._add_wrapper_script_setup(job, batch_file)
        self._add_setup(job, batch_file)
        print(job.full_command, file=batch_file)
        self._add_wrapper_script_teardown(job, batch_file)
    os.chmod(script_path, 0o755)
    B2INFO(f"Submitting {job}")
    # Do the actual batch submission
    cmd = self._create_cmd(batch_submit_script_path)
    output = self._submit_to_batch(cmd)
    self._create_job_result(job, output)
    job.status = "submitted"
    B2INFO(f"{job} submitted")
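The order in which the submission script is assembled (directives, wrapper setup, setup commands, job command, wrapper teardown, then `chmod 0o755`) can be reproduced with plain file I/O. The sketch below uses a hypothetical `ToyJob` that carries only `working_dir`, `setup_cmds`, and `full_command`, and hard-coded strings in place of the backend hooks; it illustrates the ordering and is not the Belle II implementation.

```python
import os
import stat
import tempfile
from pathlib import Path


class ToyJob:
    """Hypothetical job holding only what the script writer needs."""

    def __init__(self, working_dir, setup_cmds, full_command):
        self.working_dir = Path(working_dir)
        self.setup_cmds = setup_cmds
        self.full_command = full_command


def write_submit_script(job, directives, submit_script="submit.sh"):
    """Write the script in the same order as the Batch submit overloads."""
    job.working_dir.mkdir(parents=True, exist_ok=True)
    script_path = Path(job.working_dir, submit_script)
    with open(script_path, mode="w") as batch_file:
        for line in directives:                              # stands in for _add_batch_directives
            print(line, file=batch_file)
        print("# --- wrapper setup", file=batch_file)        # stands in for _add_wrapper_script_setup
        for line in job.setup_cmds:                          # same loop as _add_setup
            print(line, file=batch_file)
        print(job.full_command, file=batch_file)             # the job command itself
        print("# --- wrapper teardown", file=batch_file)     # stands in for _add_wrapper_script_teardown
    os.chmod(script_path, 0o755)  # rwxr-xr-x so the batch system can execute it
    return script_path


with tempfile.TemporaryDirectory() as tmp:
    job = ToyJob(Path(tmp, "job0"), ["source setup.sh"], "basf2 steering.py")
    path = write_submit_script(job, ["#!/bin/bash", "#PBS -q short"])
    lines = path.read_text().splitlines()
    mode = stat.S_IMODE(path.stat().st_mode)

print("\n".join(lines))
print(oct(mode))  # 0o755
```

The queue name `short` and the command `basf2 steering.py` are placeholders chosen for the example.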

◆ _() [2/3]

_ ( self,
job,
check_can_submit = True,
jobs_per_check = 100 )
protected
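Submit method of the Batch backend for a `Job`. It creates the needed directories, creates the batch script,
and sends it off with the batch submission command, applying the correct options (default and user-requested).
If the job has a splitter, it instead creates the subjobs, submits them as a list, and then creates a parent result for the job.

It should set a Result object as an attribute of the job.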

Definition at line 1247 of file backends.py.

def _(self, job, check_can_submit=True, jobs_per_check=100):
    """
    Submit method of Batch backend. Should take job object, create needed directories, create batch script,
    and send it off with the batch submission command, applying the correct options (default and user requested.)

    Should set a Result object as an attribute of the job.
    """
    # Make sure the output directory of the job is created, commented out due to permissions issue
    # job.output_dir.mkdir(parents=True, exist_ok=True)
    # Make sure the working directory of the job is created
    job.working_dir.mkdir(parents=True, exist_ok=True)
    # Check if we have any valid input files
    job.check_input_data_files()
    # Add any required backend args that are missing (I'm a bit hesitant to actually merge with job.backend_args)
    # just in case you want to resubmit the same job with different backend settings later.
    # job_backend_args = {**self.backend_args, **job.backend_args}

    # If there's no splitter then we just submit the Job with no SubJobs
    if not job.splitter:
        # Get all of the requested files for the input sandbox and copy them to the working directory
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()
        # Make submission file if needed
        batch_submit_script_path = self.get_batch_submit_script_path(job)
        self._make_submit_file(job, batch_submit_script_path)
        # Get the bash file we will actually run
        script_path = self.get_submit_script_path(job)
        # Construct the batch submission script (with directives if that is supported)
        with open(script_path, mode="w") as batch_file:
            self._add_batch_directives(job, batch_file)
            self._add_wrapper_script_setup(job, batch_file)
            self._add_setup(job, batch_file)
            print(job.full_command, file=batch_file)
            self._add_wrapper_script_teardown(job, batch_file)
        os.chmod(script_path, 0o755)
        B2INFO(f"Submitting {job}")
        # Do the actual batch submission
        cmd = self._create_cmd(batch_submit_script_path)
        output = self._submit_to_batch(cmd)
        self._create_job_result(job, output)
        job.status = "submitted"
        B2INFO(f"{job} submitted")
    else:
        # Create subjobs according to the splitter's logic
        job.splitter.create_subjobs(job)
        # Submit the subjobs
        self.submit(list(job.subjobs.values()))
        # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
        self._create_parent_job_result(job)
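The three `_` overloads are selected by the type of the first argument (`SubJob`, `Job`, or a list of jobs), and the `Job` overload re-enters `submit()` with a list of subjobs. The sketch below reproduces that pattern with the standard library's `functools.singledispatchmethod` (Python 3.8+). Whether `backends.py` uses this decorator or a helper of its own is not shown on this page, and `Job`, `SubJob`, and `ToyBatch` here are hypothetical stand-ins; in particular, `splitter` is simplified to a plain subjob count.

```python
from functools import singledispatchmethod


class Job:
    """Hypothetical job; `splitter` is simplified to a number of subjobs."""

    def __init__(self, name, splitter=None):
        self.name = name
        self.splitter = splitter
        self.subjobs = {}


class SubJob(Job):
    """Hypothetical subjob, a specialised Job."""


class ToyBatch:
    def __init__(self):
        self.log = []

    @singledispatchmethod
    def submit(self, job):
        # Fallback for unsupported types, like the abstract Batch.submit()
        raise NotImplementedError("Did you submit a (Sub)Job?")

    @submit.register
    def _(self, job: SubJob):
        self.log.append(f"subjob {job.name}")

    @submit.register
    def _(self, job: Job):
        if not job.splitter:
            self.log.append(f"job {job.name}")
        else:
            # Create subjobs, then re-dispatch on the list
            for i in range(job.splitter):
                job.subjobs[i] = SubJob(f"{job.name}.{i}")
            self.submit(list(job.subjobs.values()))

    @submit.register
    def _(self, jobs: list):
        for job in jobs:
            self.submit(job)


backend = ToyBatch()
backend.submit(Job("a"))
backend.submit(Job("b", splitter=2))
print(backend.log)  # ['job a', 'subjob b.0', 'subjob b.1']
```

Because `SubJob` is a subclass of `Job` here, dispatch picks the most specific registered type, so subjobs never fall back into the splitting branch.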

◆ _() [3/3]

_ ( self,
jobs,
check_can_submit = True,
jobs_per_check = 100 )
protected
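Submit method of the Batch backend that takes a list of jobs instead of just one and submits each one. Jobs are submitted in chunks of at most `jobs_per_check` (capped at `global_job_limit`); before each chunk it waits, sleeping `sleep_between_submission_checks` seconds between checks, until `can_submit()` returns True.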

Definition at line 1298 of file backends.py.

def _(self, jobs, check_can_submit=True, jobs_per_check=100):
    """
    Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.
    """
    B2INFO(f"Submitting a list of {len(jobs)} jobs to a Batch backend")
    # Technically this could be a list of Jobs or SubJobs. And if it is a list of Jobs then it might not
    # be necessary to check if we can submit right now. We could do it later during the submission of the
    # SubJob list. However in the interest of simpler code we just do the check here, and re-check again
    # if a SubJob list comes through this function. Slightly inefficient, but much simpler logic.

    # The first thing to do is make sure that we are iterating through the jobs list in chunks that are
    # equal to or smaller than the global limit. Otherwise nothing will ever submit.

    if jobs_per_check > self.global_job_limit:
        B2INFO(f"jobs_per_check (={jobs_per_check}) but this is higher than the global job "
               f"limit for this backend (={self.global_job_limit}). Will instead use the "
               " value of the global job limit.")
        jobs_per_check = self.global_job_limit

    # We group the jobs list into chunks of length jobs_per_check
    for jobs_to_submit in grouper(jobs_per_check, jobs):
        # Wait until we are allowed to submit
        while not self.can_submit(njobs=len(jobs_to_submit)):
            B2INFO("Too many jobs are currently in the batch system globally. Waiting until submission can continue...")
            time.sleep(self.sleep_between_submission_checks)
        else:
            # We loop here since we have already checked if the number of jobs is low enough, we don't want to hit this
            # function again unless one of the jobs has subjobs.
            B2INFO(f"Submitting the next {len(jobs_to_submit)} jobs...")
            for job in jobs_to_submit:
                self.submit(job, check_can_submit, jobs_per_check)
    B2INFO(f"All {len(jobs)} requested jobs submitted")
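The throttling loop can be exercised in isolation. In this sketch `grouper` is written with `itertools.islice`; its behaviour (yield successive tuples of at most `n` items) is an assumption about the real helper, which is not documented on this page. `can_submit` is a hypothetical callable that simulates a busy batch system, and the sleep is set to zero so the example runs instantly.

```python
import itertools
import time


def grouper(n, iterable):
    """Yield successive tuples of at most n items (assumed behaviour of the real helper)."""
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk


def submit_in_chunks(jobs, submit_one, can_submit, global_job_limit,
                     jobs_per_check=100, sleep_between_checks=0):
    """Submit jobs chunk by chunk, waiting whenever can_submit() says no."""
    # A chunk larger than the limit could never pass the check, so cap it
    jobs_per_check = min(jobs_per_check, global_job_limit)
    waits = 0
    for chunk in grouper(jobs_per_check, jobs):
        while not can_submit(njobs=len(chunk)):
            waits += 1
            time.sleep(sleep_between_checks)
        # Equivalent to the while/else above: there is no `break`, so this runs once the check passes
        for job in chunk:
            submit_one(job)
    return waits


# The simulated batch system refuses twice, then accepts every chunk
answers = iter([False, False, True, True, True])
submitted = []
waits = submit_in_chunks(range(7), submitted.append,
                         can_submit=lambda njobs: next(answers),
                         global_job_limit=3, jobs_per_check=100)
print(submitted, waits)  # [0, 1, 2, 3, 4, 5, 6] 2
```

With `global_job_limit=3` the requested chunk size of 100 is capped to 3, giving chunks `(0, 1, 2)`, `(3, 4, 5)`, and `(6,)`.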

◆ _add_batch_directives()

_add_batch_directives ( self,
job,
file )
protected
Should be implemented in a derived class to write the batch directives into the submission script (`file`) in `job.working_dir`.
Decide where stdout/stderr should go, and set the queue name.

Reimplemented in HTCondor, LSF, and PBS.

Definition at line 1171 of file backends.py.

def _add_batch_directives(self, job, file):
    """
    Should be implemented in a derived class to write a batch submission script to the job.working_dir.
    You should think about where the stdout/err should go, and set the queue name.
    """
    raise NotImplementedError("Need to implement a _add_batch_directives(self, job, file) "
                              f"method in {self.__class__.__name__} backend.")
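A derived backend fills in `_add_batch_directives`. The sketch below is a hypothetical PBS-flavoured subclass written against a minimal stand-in base class (`MiniBatch`); the `queue` key in `backend_args` and the `stdout`/`stderr` file names are assumptions, not the Belle II PBS backend's actual choices. `#PBS -q`, `-o`, and `-e` are standard PBS directives for the queue, stdout, and stderr.

```python
import io
from pathlib import Path


class MiniBatch:
    """Minimal stand-in for Batch: the directive hook must be overridden."""

    def __init__(self, backend_args=None):
        self.backend_args = dict(backend_args or {})

    def _add_batch_directives(self, job, file):
        raise NotImplementedError("Need to implement a _add_batch_directives(self, job, file) "
                                  f"method in {self.__class__.__name__} backend.")


class ToyPBS(MiniBatch):
    """Hypothetical PBS-like backend."""

    def _add_batch_directives(self, job, file):
        # Job-level settings win over the backend defaults
        args = {**self.backend_args, **job.backend_args}
        print("#!/bin/bash", file=file)
        print(f"#PBS -q {args['queue']}", file=file)
        print(f"#PBS -o {Path(job.working_dir, 'stdout')}", file=file)
        print(f"#PBS -e {Path(job.working_dir, 'stderr')}", file=file)


class ToyJob:
    """Hypothetical job with only a working directory and backend_args."""

    def __init__(self, working_dir, backend_args=None):
        self.working_dir = working_dir
        self.backend_args = backend_args or {}


buf = io.StringIO()
backend = ToyPBS({"queue": "short"})
backend._add_batch_directives(ToyJob("/tmp/job0", {"queue": "long"}), buf)
print(buf.getvalue())
```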

◆ _add_setup()

_add_setup ( job,
batch_file )
static protected inherited
Adds setup lines to the shell script file.

Definition at line 806 of file backends.py.

@staticmethod
def _add_setup(job, batch_file):
    """
    Adds setup lines to the shell script file.
    """
    for line in job.setup_cmds:
        print(line, file=batch_file)

◆ _add_wrapper_script_setup()

_add_wrapper_script_setup ( self,
job,
batch_file )
protected inherited
Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
`trap` statements for Ctrl-C (SIGINT) and SIGTERM, which write the corresponding exit code to the exit code file.

Definition at line 813 of file backends.py.

def _add_wrapper_script_setup(self, job, batch_file):
    """
    Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
    `trap` statements for Ctrl-C situations.
    """
    start_wrapper = f"""# ---
# trap ctrl-c and call ctrl_c()
trap '(ctrl_c 130)' SIGINT
trap '(ctrl_c 143)' SIGTERM

function write_exit_code() {{
    echo "Writing $1 to exit status file"
    echo "$1" > {self.exit_code_file}
    exit $1
}}

function ctrl_c() {{
    trap '' SIGINT SIGTERM
    echo "** Trapped Ctrl-C **"
    echo "$1" > {self.exit_code_file}
    exit $1
}}
# ---"""
    print(start_wrapper, file=batch_file)
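Because `start_wrapper` is an f-string, the literal braces of the bash function bodies are written as `{{` and `}}`, while `{self.exit_code_file}` is interpolated. A reduced version, with the file name taken from the `exit_code_file` default documented on this page, shows what actually ends up in the script:

```python
exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"

# Doubled braces become literal braces; single braces interpolate a value
wrapper = f"""function write_exit_code() {{
    echo "$1" > {exit_code_file}
    exit $1
}}"""

print(wrapper)
```

The `$1` is untouched by Python because only braces are special inside an f-string; bash expands it later when the script runs.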

◆ _add_wrapper_script_teardown()

_add_wrapper_script_teardown ( self,
job,
batch_file )
protected inherited
Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can write
the exit code of the job command to a file. This lets us know whether the command succeeded even if the
backend server or monitoring database purges the data about our job; e.g. if PBS removes job information
too quickly, we might never know whether a job succeeded or failed without some kind of exit file.

Definition at line 838 of file backends.py.

def _add_wrapper_script_teardown(self, job, batch_file):
    """
    Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
    an exit code of the job cmd being written out to a file. Which means that we can know if the command was
    successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
    removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
    file.
    """
    end_wrapper = """# ---
write_exit_code $?"""
    print(end_wrapper, file=batch_file)
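Together, the setup and teardown wrappers let the exit status survive even if the batch system forgets the job. The sketch below writes a reduced wrapper around a failing command, runs it with `bash` through `subprocess`, and reads the status file back. It assumes `bash` is on `PATH`; the command `sh -c 'exit 3'` is a hypothetical stand-in for `job.full_command`.

```python
import subprocess
import tempfile
from pathlib import Path

EXIT_CODE_FILE = "__BACKEND_CMD_EXIT_STATUS__"

script = f"""function write_exit_code() {{
    echo "Writing $1 to exit status file"
    echo "$1" > {EXIT_CODE_FILE}
    exit $1
}}
sh -c 'exit 3'
# ---
write_exit_code $?
"""

with tempfile.TemporaryDirectory() as tmp:
    script_path = Path(tmp, "submit.sh")
    script_path.write_text(script)
    # Run from the working directory, as a batch job would
    proc = subprocess.run(["bash", str(script_path)], cwd=tmp,
                          capture_output=True, text=True)
    recorded = Path(tmp, EXIT_CODE_FILE).read_text().strip()

print(proc.returncode, recorded)  # 3 3
```

The comment line between the command and `write_exit_code` does not reset `$?`, so the status of the job command is what gets recorded.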

◆ _create_cmd()

_create_cmd ( self,
job )
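protected
Builds the batch submission command. The submit overloads call it with the path returned by `get_batch_submit_script_path()` and pass the result to `_submit_to_batch()`. The base implementation does nothing.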

Reimplemented in HTCondor, LSF, and PBS.

Definition at line 1347 of file backends.py.

def _create_cmd(self, job):
    """
    """

◆ _create_job_result()

_create_job_result ( cls,
job,
batch_output )
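protected
Creates the job's Result object from `batch_output`, the output returned by `_submit_to_batch()`. The base implementation does nothing.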

Reimplemented in HTCondor, LSF, and PBS.

Definition at line 1342 of file backends.py.

@classmethod
def _create_job_result(cls, job, batch_output):
    """
    """

◆ _create_parent_job_result()

_create_parent_job_result ( cls,
parent )
protected inherited
We want to be able to call `ready()` on the top-level `Job.result`, so this method must exist to create
that `Job.result` object. The object is mostly empty: it simply updates subjob statuses and allows the
use of `ready()`.

Reimplemented in HTCondor, Local, LSF, and PBS.

Definition at line 851 of file backends.py.

@classmethod
def _create_parent_job_result(cls, parent):
    """
    We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
    so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
    statuses and allows the use of ready().
    """
    raise NotImplementedError
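A parent `Job.result` only needs enough behaviour for `ready()` to work. One way to do that, sketched with hypothetical `SubResult` and `ParentResult` classes (not the Belle II `Result` API), is to treat the parent as ready once every subjob is ready:

```python
class SubResult:
    """Hypothetical per-subjob result with a settable finished flag."""

    def __init__(self):
        self.finished = False

    def ready(self):
        return self.finished


class ParentResult:
    """Hypothetical, mostly empty parent result: it only aggregates subjobs."""

    def __init__(self, subjob_results):
        self.subjob_results = list(subjob_results)

    def ready(self):
        # all() stops at the first subjob that is still running
        return all(r.ready() for r in self.subjob_results)


subs = [SubResult(), SubResult()]
parent = ParentResult(subs)
print(parent.ready())  # False
for s in subs:
    s.finished = True
print(parent.ready())  # True
```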

◆ _make_submit_file()

_make_submit_file ( self,
job,
submit_file_path )
protected
Useful for the HTCondor backend, where a submit file is needed instead of batch
directives pasted directly into the submission script. It should be overridden
if needed; the base implementation does nothing.

Reimplemented in HTCondor.

Definition at line 1179 of file backends.py.

def _make_submit_file(self, job, submit_file_path):
    """
    Useful for the HTCondor backend where a submit is needed instead of batch
    directives pasted directly into the submission script. It should be overwritten
    if needed.
    """
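For a backend like HTCondor, the batch command receives a separate submit description file that points at the bash script. A hypothetical override body could write one as below. The file names and the choice of keys are assumptions about what such a file might contain, not what the Belle II HTCondor backend writes; `executable`, `output`, `error`, `log`, and `queue` are standard HTCondor submit-description commands.

```python
import tempfile
from pathlib import Path


def make_condor_submit_file(working_dir, submit_file_path, script_name="submit.sh"):
    """Hypothetical _make_submit_file body: describe the job for condor_submit."""
    lines = [
        f"executable = {Path(working_dir, script_name)}",
        f"output = {Path(working_dir, 'stdout')}",
        f"error = {Path(working_dir, 'stderr')}",
        f"log = {Path(working_dir, 'condor.log')}",
        "queue",
    ]
    Path(submit_file_path).write_text("\n".join(lines) + "\n")


with tempfile.TemporaryDirectory() as tmp:
    sub = Path(tmp, "submit.sub")
    make_condor_submit_file(tmp, sub)
    content = sub.read_text()

print(content)
```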

◆ _submit_to_batch()

_submit_to_batch ( cls,
cmd )
protected
Do the actual batch submission command and collect the output to find out the job id for later monitoring.

Reimplemented in HTCondor, LSF, and PBS.

Definition at line 1188 of file backends.py.

@classmethod
def _submit_to_batch(cls, cmd):
    """
    Do the actual batch submission command and collect the output to find out the job id for later monitoring.
    """
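`_create_cmd` and `_submit_to_batch` split the work into building the command and running it to capture the job ID. A sketch with `subprocess` follows. It uses `echo` as a stand-in for a real submission command such as `qsub` (which prints an identifier of the form `<number>.<server>`), and the parsing rule in `parse_job_id` is a hypothetical example rather than the format every batch system uses.

```python
import subprocess


def create_cmd(script_path):
    """Hypothetical _create_cmd: build the argument list for the submit command."""
    # 'echo' just prints its arguments back, standing in for the real submit command
    return ["echo", "12345.pbs-server", str(script_path)]


def submit_to_batch(cmd):
    """Run the command and return its stdout, raising CalledProcessError on failure."""
    proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return proc.stdout


def parse_job_id(batch_output):
    """Hypothetical parser: the numeric part before the first '.' of the first token."""
    return batch_output.split()[0].split(".")[0]


output = submit_to_batch(create_cmd("/tmp/job0/submit.sh"))
print(parse_job_id(output))  # 12345
```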

◆ can_submit()

can_submit ( self,
* args,
** kwargs )
Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail.
Initially this is meant to make sure that we don't exceed the global limit on jobs (submitted + running).
The base implementation always returns True.

Returns:
    bool: Whether job submission can continue given the current situation.

Reimplemented in HTCondor, LSF, and PBS.

Definition at line 1193 of file backends.py.

def can_submit(self, *args, **kwargs):
    """
    Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail.
    This is initially meant to make sure that we don't go over the global limits of jobs (submitted + running).

    Returns:
        bool: If the job submission can continue based on the current situation.
    """
    return True
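A derived `can_submit` will typically compare the number of active jobs plus the ones about to be submitted against `global_job_limit`. Below, `count_active_jobs` is a hypothetical callable standing in for a query to the batch system (for example parsing the output of a status command); the `njobs` keyword matches how the list overload calls `can_submit`.

```python
class ThrottledBackend:
    """Hypothetical backend with a derived-class style can_submit()."""

    def __init__(self, count_active_jobs, global_job_limit=1000):
        self.count_active_jobs = count_active_jobs
        self.global_job_limit = global_job_limit

    def can_submit(self, njobs=1):
        """Return True if submitting njobs more jobs keeps us within the global limit."""
        active = self.count_active_jobs()
        return active + njobs <= self.global_job_limit


backend = ThrottledBackend(count_active_jobs=lambda: 950, global_job_limit=1000)
print(backend.can_submit(njobs=50))  # True  (950 + 50 = 1000)
print(backend.can_submit(njobs=51))  # False (950 + 51 > 1000)
```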

◆ get_batch_submit_script_path()

get_batch_submit_script_path ( self,
job )
Construct the Path object of the script file that we will submit using the batch command.
For most batch backends this is the same as the bash script we submit,
but some require a separate submission file that describes the job.
To support that, override this method in the derived backend class.

Reimplemented in HTCondor.

Definition at line 1331 of file backends.py.

def get_batch_submit_script_path(self, job):
    """
    Construct the Path object of the script file that we will submit using the batch command.
    For most batch backends this is the same script as the bash script we submit.
    But for some they require a separate submission file that describes the job.
    To implement that you can implement this function in the Backend class.
    """
    return Path(job.working_dir, self.submit_script)

◆ get_submit_script_path()

get_submit_script_path ( self,
job )
inherited
Construct the Path object of the bash script file that we will submit. It will contain
the actual job command, wrapper commands, setup commands, and any batch directives.

Definition at line 859 of file backends.py.

def get_submit_script_path(self, job):
    """
    Construct the Path object of the bash script file that we will submit. It will contain
    the actual job command, wrapper commands, setup commands, and any batch directives
    """
    return Path(job.working_dir, self.submit_script)
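The two path methods coincide in the base class and diverge only when a backend needs a separate description file. A hypothetical pair of classes makes the difference concrete; unlike the real methods, which take a `job` and use `job.working_dir`, these take the working directory directly, and the `submit.sub` name is an assumption.

```python
from pathlib import Path


class BasePaths:
    """Hypothetical mirror of the base-class behaviour."""
    submit_script = "submit.sh"

    def get_submit_script_path(self, working_dir):
        # The bash script that actually runs the job
        return Path(working_dir, self.submit_script)

    def get_batch_submit_script_path(self, working_dir):
        # What the batch command receives: the same script by default
        return Path(working_dir, self.submit_script)


class CondorLikePaths(BasePaths):
    """Hypothetical backend that submits a separate description file."""
    batch_submit_script = "submit.sub"

    def get_batch_submit_script_path(self, working_dir):
        return Path(working_dir, self.batch_submit_script)


for backend in (BasePaths(), CondorLikePaths()):
    print(backend.get_submit_script_path("/w").name,
          backend.get_batch_submit_script_path("/w").name)
# submit.sh submit.sh
# submit.sh submit.sub
```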

◆ submit()

submit ( self,
job,
check_can_submit = True,
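jobs_per_check = 100 )
Abstract entry point for submission. The registered `_()` overloads handle a `SubJob`, a `Job`, and a list of jobs; calling `submit()` with any other type raises `NotImplementedError`.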

Reimplemented from Backend.

Definition at line 1204 of file backends.py.

def submit(self, job, check_can_submit=True, jobs_per_check=100):
    """
    """
    raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
                              "Did you submit a (Sub)Job?")

Member Data Documentation

◆ backend_args

dict backend_args = {**self.default_backend_args, **backend_args}
inherited

The backend args that will be applied to jobs unless the job specifies them itself.

Definition at line 796 of file backends.py.
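The merge `{**self.default_backend_args, **backend_args}` relies on later keys overriding earlier ones, so user-supplied values win over the class defaults while unspecified defaults are kept. The keys and values below (`queue`, `walltime`) are hypothetical:

```python
default_backend_args = {"queue": "short", "walltime": "01:00:00"}
backend_args = {"queue": "long"}

# Later unpacking wins on key collisions, so user values override defaults
merged = {**default_backend_args, **backend_args}
print(merged)  # {'queue': 'long', 'walltime': '01:00:00'}
```

The merge builds a new dict, so the class-level default dict is never mutated.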

◆ default_backend_args

dict default_backend_args = {}
static inherited

Default backend_args.

Definition at line 788 of file backends.py.

◆ default_global_job_limit

int default_global_job_limit = 1000
static

Default global limit on the total number of submitted/running jobs that the user can have.

This limit does not affect the total number of jobs that are eventually submitted. However, the jobs are not actually submitted until this limit can be respected, i.e. until the total number of jobs in the batch system goes down. Since we submit in chunks of N jobs before checking this limit again, this value should be a little lower than the real batch system limit; otherwise you could accidentally exceed it during the N-job submission if other processes are checking and submitting concurrently. This is quite common for the first submission of jobs from parallel calibrations.

Note that if there are other jobs already submitted for your account, then these will count towards this limit.

Definition at line 1155 of file backends.py.
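A rough way to pick the value: if k processes check the limit at the same moment, each sees the same active count and each may then submit a full chunk of N jobs. Assuming a derived `can_submit` lets a chunk through when active + N <= limit (an assumption about derived backends, since the base class always returns True), the worst case is limit - N + k*N active jobs, so staying under a real batch-system cap R requires limit <= R - (k - 1)*N. A small helper:

```python
def safe_global_job_limit(real_limit, jobs_per_check, concurrent_processes):
    """Largest limit that cannot be overshot if all processes pass the check at once.

    Assumes each process submits a full chunk of jobs_per_check jobs after
    seeing active + jobs_per_check <= limit.
    """
    overshoot = (concurrent_processes - 1) * jobs_per_check
    return real_limit - overshoot


print(safe_global_job_limit(real_limit=1000, jobs_per_check=100, concurrent_processes=3))  # 800
```

With a limit of 800, the highest active count that still passes the check is 700; three simultaneous chunks of 100 then bring it to exactly 1000.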

◆ default_sleep_between_submission_checks

int default_sleep_between_submission_checks = 30
static

Default time (in seconds) between re-checks of whether the number of active jobs is below the global job limit.

Definition at line 1157 of file backends.py.

◆ exit_code_file

str exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
static inherited

Default exit code file name.

Definition at line 786 of file backends.py.

◆ global_job_limit

int global_job_limit = self.default_global_job_limit

The active job limit.

This is 'global' because we want to prevent ourselves from accidentally submitting too many jobs across all current and previous submission scripts.

Definition at line 1166 of file backends.py.

◆ sleep_between_submission_checks

int sleep_between_submission_checks = self.default_sleep_between_submission_checks

Seconds we wait before checking if we can submit a list of jobs.

Only relevant once we hit the global limit of active jobs, which is usually set high.

Definition at line 1169 of file backends.py.

◆ submission_cmds

list submission_cmds = []
static

Shell command to submit a script, should be implemented in the derived class.

Definition at line 1142 of file backends.py.

◆ submit_script

str submit_script = "submit.sh"
static inherited

Default submission script name.

Definition at line 784 of file backends.py.


The documentation for this class was generated from the following file: