from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING

from abc import ABC, abstractmethod
import xml.etree.ElementTree as ET
from pathlib import Path
from collections import deque
from itertools import count, takewhile
from datetime import datetime, timedelta
import multiprocessing as mp

from caf.utils import method_dispatch
from caf.utils import decode_json_string
from caf.utils import grouper
from caf.utils import parse_file_uri


__all__ = ["Job", "SubJob", "Backend", "Local", "Batch", "LSF", "PBS", "HTCondor", "get_input_data"]

_input_data_file_path = Path("__BACKEND_INPUT_FILES__.json")

_STDOUT_FILE = "stdout"

_STDERR_FILE = "stderr"

_backend_job_envvars = (
    "BELLE2_EXTERNALS_TOPDIR",
    "BELLE2_CONDB_METADATA",

    Simple JSON load of the default input data file. Will contain a list of string file paths
    for use by the job process.

    with open(_input_data_file_path) as input_data_file:
        input_data = json.load(input_data_file)
def monitor_jobs(args, jobs):
    unfinished_jobs = jobs[:]
    while unfinished_jobs:
        B2INFO("Updating statuses of unfinished jobs...")
        for j in unfinished_jobs:

        B2INFO("Checking if jobs are ready...")
        for j in unfinished_jobs[:]:
            if j.status == "failed":
                B2ERROR(f"{j} is failed")

                B2INFO(f"{j} is finished")
                unfinished_jobs.remove(j)

        B2INFO(f"Not all jobs done yet, waiting {args.heartbeat} seconds before re-checking...")
        time.sleep(args.heartbeat)

    B2ERROR(f"{failed_jobs} jobs failed")

    B2INFO('All jobs finished successfully')
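As a usage illustration (not part of the original module), ``monitor_jobs`` only needs an object with a ``heartbeat`` attribute (seconds between status checks) and a list of submitted `Job` objects. A minimal sketch, assuming a basf2 environment where ``caf.backends`` is importable; the job name, paths, and command below are hypothetical:

    .. code-block:: python

        from pathlib import Path
        from types import SimpleNamespace

        from caf import backends

        # Hypothetical job that just runs a short shell command via the Local backend.
        job = backends.Job("example_job")
        job.working_dir = Path("example_job/work").absolute()
        job.output_dir = Path("example_job/output").absolute()
        job.cmd = ["echo", "hello world"]

        backend = backends.Local(max_processes=1)
        backend.submit(job)

        # monitor_jobs polls every ``heartbeat`` seconds until all jobs reach an exit status.
        backends.monitor_jobs(SimpleNamespace(heartbeat=5), [job])
        backend.join()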
    Simple little class to hold a generator (uninitialised) and the necessary args/kwargs to
    initialise it. This lets us reuse a generator by setting it up again fresh. This is not
    optimal for expensive calculations, but it is nice for making large sequences of
    Job input arguments on the fly.

    def __init__(self, generator_function, *args, **kwargs):

            generator_function (py:function): A function (callable) that contains a ``yield`` statement. This generator
                should *not* be initialised i.e. you haven't called it with ``generator_function(*args, **kwargs)``
                yet. That will happen when accessing the `ArgumentsGenerator.generator` property.
            args (tuple): The positional arguments you want to send into the initialisation of the generator.
            kwargs (dict): The keyword arguments you want to send into the initialisation of the generator.

            generator: The initialised generator (using the args and kwargs for initialisation). It should be ready
                to have ``next``/``send`` called on it.
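As a hedged sketch of how this class is meant to be used (the generator function below is hypothetical, not from the source): the function is stored uninitialised, and accessing the ``generator`` property initialises it with the stored args/kwargs so that ``send``/``next`` can be called on it, as described above.

    .. code-block:: python

        from caf.backends import ArgumentsGenerator

        def experiment_run_generator(experiment):
            # Hypothetical generator: receives a SubJob via send() and yields (experiment, run) tuples.
            subjob = (yield None)
            run = 0
            while True:
                run += 1
                subjob = (yield (experiment, run))

        arg_gen = ArgumentsGenerator(experiment_run_generator, 12)
        gen = arg_gen.generator      # initialises experiment_run_generator(12), ready for send()
        print(gen.send("subjob_0"))  # -> (12, 1)
        print(gen.send("subjob_1"))  # -> (12, 2)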
def range_arguments(start=0, stop=None, step=1):
    A simple example Arguments Generator function for use as an `ArgumentsGenerator.generator_function`.
    It will return increasing values using itertools.count. By default it is infinite and will not raise `StopIteration`.
    The `SubJob` object is sent into this function with `send` but is not used.

        start (int): The starting value that will be returned.
        stop (int): At this value `StopIteration` will be raised. If this is `None` then this generator will continue
            forever.
        step (int): The step size.

    if stop is not None and x >= stop:

    subjob = (yield None)
    for i in takewhile(lambda x: not should_stop(x), count(start, step)):
        B2DEBUG(29, f"{subjob} arguments will be {args}")
        subjob = (yield args)
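A small sketch of driving ``range_arguments`` by hand (normally a splitter does this by sending each `SubJob` in). This assumes each yielded value is a one-element argument tuple, which the debug output above suggests but the fragment does not show explicitly:

    .. code-block:: python

        from caf.backends import range_arguments

        gen = range_arguments(start=5, stop=8, step=1)
        next(gen)                    # prime the generator; the first yield returns None
        print(gen.send("subjob_0"))  # assumed -> (5,)
        print(gen.send("subjob_1"))  # assumed -> (6,)
        print(gen.send("subjob_2"))  # assumed -> (7,)
        # The next send() raises StopIteration because stop=8 has been reached.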
    Abstract base class. This class handles the logic of creating subjobs for a `Job` object.
    The `create_subjobs` function should be implemented and used to construct
    the subjobs of the parent job object.

        arguments_generator (ArgumentsGenerator): Used to construct the generator function that will yield the argument
            tuple for each `SubJob`. The splitter will iterate through the generator each time `create_subjobs` is
            called. The `SubJob` will be sent into the generator with ``send(subjob)`` so that the generator can decide what

        Derived classes should call `super` to run this.

        Implement this method in derived classes to generate the `SubJob` objects.

        Use the `arguments_generator` (if one exists) to assign the argument tuples to the

        if self.arguments_generator:
            arg_gen = self.arguments_generator.generator
            for subjob in sorted(job.subjobs.values(), key=lambda sj: sj.id):
                    args = arg_gen.send(subjob)
                except StopIteration:
                    B2ERROR(f"StopIteration called when getting args for {subjob}, "
                            "setting all subsequent subjobs to have empty argument tuples.")
                B2DEBUG(29, f"Arguments for {subjob}: {args}")

            B2INFO(f"No ArgumentsGenerator assigned to the {self} so subjobs of {job} "
                   "won't automatically have arguments assigned.")

        return f"{self.__class__.__name__}"

    def __init__(self, *, arguments_generator=None, max_files_per_subjob=1):

            max_files_per_subjob (int): The maximum number of input files used per `SubJob` created.

        super().__init__(arguments_generator=arguments_generator)

        This function creates subjobs for the parent job passed in. It creates as many subjobs as required
        in order to prevent the number of input files per subjob going over the limit set by
        `MaxFilesSplitter.max_files_per_subjob`.
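A usage sketch (hypothetical file names and command, not from the source): with five input files and ``max_files_per_subjob=2`` the splitter should create three SubJobs holding 2, 2, and 1 files respectively.

    .. code-block:: python

        from pathlib import Path
        from caf.backends import Job, MaxFilesSplitter

        job = Job("split_by_files")
        job.working_dir = Path("split_by_files/work").absolute()
        job.output_dir = Path("split_by_files/output").absolute()
        job.cmd = ["basf2", "analysis.py"]
        job.input_files = [f"/data/file_{i}.root" for i in range(5)]

        job.splitter = MaxFilesSplitter(max_files_per_subjob=2)
        job.splitter.create_subjobs(job)
        for subjob in job.subjobs.values():
            print(subjob.id, subjob.input_files)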
        if not job.input_files:
            B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")

            job.create_subjob(i, input_files=subjob_input_files)

        B2INFO(f"{self} created {i+1} Subjobs for {job}")
    def __init__(self, *, arguments_generator=None, max_subjobs=1000):

            max_subjobs (int): The maximum number of subjobs that will be created.

        super().__init__(arguments_generator=arguments_generator)

        This function creates subjobs for the parent job passed in. It creates as many subjobs as required
        by the number of input files up to the maximum set by `MaxSubjobsSplitter.max_subjobs`. If there are
        more input files than `max_subjobs` it instead groups files by the minimum number per subjob in order to
        respect the subjob limit, e.g. if you have 11 input files and a maximum number of subjobs of 4, then it
        will create 4 subjobs, 3 of them with 3 input files, and one with 2 input files.
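The grouping described above can be sketched in isolation (this mirrors the ceil-based loop below, it is not the class itself): 11 files with a maximum of 4 subjobs are distributed as 3, 3, 3, 2.

    .. code-block:: python

        from collections import deque
        from math import ceil

        remaining = deque(f"file_{i}.root" for i in range(11))
        available_subjobs = 4
        groups = []
        while remaining:
            num = ceil(len(remaining) / available_subjobs)
            groups.append([remaining.popleft() for _ in range(num)])
            available_subjobs -= 1
        print([len(g) for g in groups])  # -> [3, 3, 3, 2]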
        if not job.input_files:
            B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")

        remaining_input_files = deque(job.input_files)

        while remaining_input_files:
            num_input_files = ceil(len(remaining_input_files) / available_subjobs)
            subjob_input_files = []
            for i in range(num_input_files):
                subjob_input_files.append(remaining_input_files.popleft())
            job.create_subjob(subjob_i, input_files=subjob_input_files)
            available_subjobs -= 1

        B2INFO(f"{self} created {subjob_i} Subjobs for {job}")
    Creates SubJobs based on the given argument generator. The generator will be called until a `StopIteration` is issued.
    Be VERY careful to not accidentally give an infinite generator! Otherwise it will simply create SubJobs until you run out
    of memory. You can set the `ArgumentsSplitter.max_subjobs` parameter to prevent this by raising an exception.

    This splitter is useful for MC production jobs where you don't have any input files, but you want to control the exp/run
    numbers of subjobs. If you do have input files set for the parent `Job` objects, then the same input files will be
    assigned to every `SubJob`.
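A hedged usage sketch for an MC-style job with no input files (names and steering file are hypothetical); each SubJob receives one argument tuple from ``range_arguments`` until `StopIteration`, with ``max_subjobs`` acting as a safety cap:

    .. code-block:: python

        from pathlib import Path
        from caf.backends import ArgumentsGenerator, ArgumentsSplitter, Job, range_arguments

        job = Job("mc_production")
        job.working_dir = Path("mc_production/work").absolute()
        job.output_dir = Path("mc_production/output").absolute()
        job.cmd = ["basf2", "generate.py"]

        arg_gen = ArgumentsGenerator(range_arguments, start=0, stop=10, step=1)
        job.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=100)
        job.splitter.create_subjobs(job)   # expected: 10 SubJobs with args (0,), (1,), ..., (9,)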
        arguments_generator (ArgumentsGenerator): The standard ArgumentsGenerator that is used to assign arguments

    def __init__(self, *, arguments_generator=None, max_subjobs=None):

        super().__init__(arguments_generator=arguments_generator)

        This function creates subjobs for the parent job passed in. It creates subjobs until the
        `SubjobSplitter.arguments_generator` finishes.

        If `ArgumentsSplitter.max_subjobs` is set, then it will throw an exception if more than this number of
        subjobs are created.

                raise SplitterError(f"{self} tried to create more subjobs than the maximum (={self.max_subjobs}).")
            subjob = SubJob(job, i, job.input_files)
            args = arg_gen.send(subjob)
            B2INFO(f"Creating {job}.{subjob}")
            B2DEBUG(29, f"Arguments for {subjob}: {args}")
            job.subjobs[i] = subjob
        except StopIteration:

        B2INFO(f"{self} created {i+1} Subjobs for {job}")
    This generic Job object is used to tell a Backend what to do.
    This object basically holds necessary information about a process you want to submit to a `Backend`.
    It should *not* do anything that is backend specific, just hold the configuration for a job to be
    successfully submitted and monitored using a backend. The result attribute is where backend
    specific job monitoring goes.

        name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF

    .. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.
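A configuration sketch for a single `Job` (all names, paths, and arguments below are hypothetical); absolute paths are used, as the warning above recommends:

    .. code-block:: python

        from pathlib import Path
        from caf.backends import Job

        job = Job("example_analysis")
        job.output_dir = Path("example_analysis/output").absolute()
        job.working_dir = Path("example_analysis/work").absolute()
        job.cmd = ["basf2", "analysis_script.py"]
        job.args = ["--experiment", "12"]
        job.input_files = ["/data/sample_0.root", "/data/sample_1.root"]
        job.input_sandbox_files = ["analysis_script.py"]
        job.backend_args = {"queue": "short"}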
    statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}

    exit_statuses = ["failed", "completed"]
        self.output_dir = Path(job_dict["output_dir"])
        self.cmd = job_dict["cmd"]
        self.args = job_dict["args"]
        for subjob_dict in job_dict["subjobs"]:
            self.create_subjob(subjob_dict["id"], input_files=subjob_dict["input_files"], args=subjob_dict["args"])

        Representation of Job class (what happens when you print a Job() instance).

        return f"Job({self.name})"
        Returns whether or not the Job has finished. If the job has subjobs then it will return True when they are all finished.
        It will return False as soon as it hits the first failure, meaning that you cannot guarantee that all subjobs will have
        their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.

        B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")

        Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
        in the best way for the type of result object/backend.

        B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
        Creates a subjob Job object that references that parent Job.
        Returns the SubJob object at the end.

        B2INFO(f"Creating {self}.Subjob({i})")
        subjob = SubJob(self, i, input_files)

        B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")

        Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
        subjob status in the hierarchy of statuses in `Job.statuses`.

        subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
        status_level = min([self.statuses[status] for status in subjob_statuses])
        for status, level in self.statuses.items():
            if level == status_level:

        Sets the status of this Job.

        if status == 'failed':
            B2ERROR(f"Setting {self.name} status to failed")
        B2INFO(f"Setting {self.name} status to {status}")
        return self._output_dir

        self._output_dir = Path(value).absolute()

        return self._working_dir

        self._working_dir = Path(value).absolute()

        return self._input_sandbox_files

    @input_sandbox_files.setter
        self._input_sandbox_files = [Path(p).absolute() for p in value]

        return self._input_files

        self._input_files = value

    def max_subjobs(self):

    def max_subjobs(self, value):
        B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")

    def max_files_per_subjob(self):
        return self.splitter.max_files_per_subjob

    @max_files_per_subjob.setter
    def max_files_per_subjob(self, value):
        B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")

        Dumps the Job object configuration to a JSON file so that it can be read in again later.

            file_path (`basf2.Path`): The filepath we'll dump to

        with open(file_path, mode="w") as job_file:
            json.dump(self.job_dict, job_file, indent=2)

        with open(file_path) as job_file:
            job_dict = json.load(job_file)
        return cls(job_dict["name"], job_dict=job_dict)
            dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
            `Path <basf2.Path>` objects are converted to string via ``Path.as_posix()``.

        job_dict["name"] = self.name
        job_dict["working_dir"] = self.working_dir.as_posix()
        job_dict["output_dir"] = self.output_dir.as_posix()
        job_dict["cmd"] = self.cmd
        job_dict["args"] = self.args
        job_dict["subjobs"] = [sj.job_dict for sj in self.subjobs.values()]
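Since `job_dict` is what `dump_to_json` writes out, the configuration can be round-tripped through a file. A sketch with hypothetical paths, assuming the remaining `Job` attributes are default-initialised by the constructor:

    .. code-block:: python

        from pathlib import Path
        from caf.backends import Job

        job = Job("roundtrip")
        job.working_dir = Path("roundtrip/work").absolute()
        job.output_dir = Path("roundtrip/output").absolute()
        job.cmd = ["echo", "done"]
        job.dump_to_json("roundtrip_job.json")

        restored = Job.from_json("roundtrip_job.json")
        print(restored.name, restored.cmd)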
        Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of

        with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
            json.dump(self.input_files, input_data_file, indent=2)

        Get all of the requested files for the input sandbox and copy them to the working directory.
        Files like the submit.sh or input_data.json are not part of this process.

        if file_path.is_dir():
            shutil.copytree(file_path, Path(self.working_dir, file_path.name))
        Check the input files and make sure that there aren't any duplicates.
        Also check if the files actually exist if possible.

        existing_input_files = []
            file_uri = parse_file_uri(file_path)
            if file_uri.scheme == "file":
                p = Path(file_uri.path)
                    if file_uri.geturl() not in existing_input_files:
                        existing_input_files.append(file_uri.geturl())
                        B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
                    B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
                B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
                if file_path not in existing_input_files:
                    existing_input_files.append(file_path)
                    B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
            B2WARNING(f"No valid input file paths found for {self.name}, but some were requested.")

            str: The full command that this job will run including any arguments.
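A short sketch of what this property composes (hypothetical cmd and args):

    .. code-block:: python

        from caf.backends import Job

        job = Job("cmd_demo")
        job.cmd = ["basf2", "steering.py"]
        job.args = ["--experiment", 12]
        print(job.full_command)  # -> "basf2 steering.py --experiment 12"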
        all_components = self.cmd[:]
        all_components.extend(self.args)
        full_command = " ".join(map(str, all_components))
        B2DEBUG(29, f"Full command of {self} is '{full_command}'")
        This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
        It should detect if you are using a local release or CVMFS and append the correct commands
        so that the job will have the same basf2 release environment. It should also detect
        if a local release is not compiled with the ``opt`` option.

        Note that this *doesn't mean that every environment variable is inherited* from the submitting

        def append_environment_variable(cmds, envvar):
            Append a command for setting an environment variable.

            if envvar in os.environ:
                cmds.append(f"""if [ -z "${{{envvar}}}" ]; then""")
                cmds.append(f" export {envvar}={os.environ[envvar]}")

        if "BELLE2_TOOLS" not in os.environ:
            raise BackendError("No BELLE2_TOOLS found in environment")
        for envvar in _backend_job_envvars:
            append_environment_variable(self.setup_cmds, envvar)
        if "BELLE2_RELEASE" in os.environ:
            self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
        elif 'BELLE2_LOCAL_DIR' in os.environ:
            self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
            self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
            self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
            self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
            self.setup_cmds.append("pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
            self.setup_cmds.append("source $BACKEND_B2SETUP")
            self.setup_cmds.append("b2code-option $BACKEND_BELLE2_OPTION")
    This mini-class simply holds basic information about which subjob you are
    and a reference to the parent Job object to be able to access the main data there,
    rather than replicating all of the parent job's configuration again.

    def __init__(self, job, subjob_id, input_files=None):

        Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this."""
        return Path(self.parent.output_dir, str(self.id))

        """Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this."""
        return Path(self.parent.working_dir, str(self.id))

        """Getter for name of SubJob. Accesses the parent Job name to infer this."""
        return "_".join((self.parent.name, str(self.id)))

        Returns the status of this SubJob.

        Sets the status of this SubJob.

        if status == "failed":
            B2ERROR(f"Setting {self.name} status to failed")
        B2INFO(f"Setting {self.name} status to {status}")

        A subjob cannot have subjobs. Always return empty list.

            dict: A JSON serialisable representation of the `SubJob`. `Path <basf2.Path>` objects are converted to
            `string` via ``Path.as_posix()``. Since SubJobs inherit most of the parent job's config
            we only output the input files and arguments that are specific to this subjob and no other details.

        job_dict["id"] = self.id
        job_dict["args"] = self.args

        Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
        unless otherwise specified.

        return getattr(self.parent, attribute)

        return f"SubJob({self.name})"
    Abstract base class for a valid backend.
    Classes derived from this will implement their own submission of basf2 jobs
    to whatever backend they describe.
    Some common methods/attributes go into this base class.

    For backend_args the priority from lowest to highest is:

    backend.default_backend_args -> backend.backend_args -> job.backend_args

    submit_script = "submit.sh"

    exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"

    default_backend_args = {}

        if backend_args is None:

        Base method for submitting collection jobs to the backend type. This MUST be
        implemented for a correctly written backend class deriving from Backend().

        Adds setup lines to the shell script file.

        for line in job.setup_cmds:
            print(line, file=batch_file)

        Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
        `trap` statements for Ctrl-C situations.

        start_wrapper = f"""# ---
# trap ctrl-c and call ctrl_c()
trap '(ctrl_c 130)' SIGINT
trap '(ctrl_c 143)' SIGTERM

function write_exit_code() {{
  echo "Writing $1 to exit status file"
  echo "$1" > {self.exit_code_file}

  trap '' SIGINT SIGTERM
  echo "** Trapped Ctrl-C **"
  echo "$1" > {self.exit_code_file}

        print(start_wrapper, file=batch_file)
        Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
        an exit code of the job cmd being written out to a file, which means that we can know if the command was
        successful or not even if the backend server/monitoring database purges the data about our job, i.e. if PBS
        removes job information too quickly we may never know if a job succeeded or failed without some kind of exit

        end_wrapper = """# ---

        print(end_wrapper, file=batch_file)
        We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
        so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
        statuses and allows the use of ready().

        raise NotImplementedError

        Construct the Path object of the bash script file that we will submit. It will contain
        the actual job command, wrapper commands, setup commands, and any batch directives

    Base class for Result objects. A Result is created for each `Job` (or `Job.SubJob`) object
    submitted to a backend. It provides a way to query a job's status to find out if it's ready.

        Pass in the job object to allow the result to access the job's properties and do post-processing.

        Returns whether or not this job result is known to be ready. Doesn't actually change the job status. Just changes
        the 'readiness' based on the known job status.

        B2DEBUG(29, f"Calling {self.job}.result.ready()")

        elif self.job.status in self.job.exit_statuses:

        Update the job's (and subjobs') status so that `Result.ready` will return the up to date status. This call will have to
        actually look up the job's status from some database/exit code file.

        raise NotImplementedError

        Read the exit code file to discover the exit status of the job command. Useful fallback if the job is no longer
        known to the job database (batch system purged it for example). Since some backends may take time to download
        the output files of the job back to the working directory we use a time limit on how long to wait.

        exit_code_path = Path(self.job.working_dir, Backend.exit_code_file)
        with open(exit_code_path) as f:
            exit_code = int(f.read().strip())
        B2DEBUG(29, f"Exit code from file for {self.job} was {exit_code}")

    Backend for local processes i.e. on the same machine but in a subprocess.

    Note that you should call the self.join() method to close the pool and wait for any
    running processes to finish before exiting the process. Once you've called join you will have to set up a new
    instance of this backend to create a new pool. If you don't call `Local.join` or don't create a join yourself
    somewhere, then the main python process might end before your pool is done.

        max_processes (int): Integer that specifies the size of the process pool that spawns the subjobs, default=1.
            It's the maximum simultaneous subjobs.
            Try not to specify a large number or a number larger than the number of cores.
            It won't crash the program but it will slow down and negatively impact performance.

    def __init__(self, *, backend_args=None, max_processes=1):

        super().__init__(backend_args=backend_args)
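A hedged end-to-end sketch for the Local backend (hypothetical job, run inside a basf2 environment): submit, poll until ready, then join the pool before the script exits.

    .. code-block:: python

        import time
        from pathlib import Path
        from caf.backends import Job, Local

        job = Job("local_test")
        job.working_dir = Path("local_test/work").absolute()
        job.output_dir = Path("local_test/output").absolute()
        job.cmd = ["echo", "hello from the Local backend"]

        backend = Local(max_processes=2)
        backend.submit(job)
        while not job.ready():
            time.sleep(5)
        backend.join()
        print(job.status)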
        Result class to help monitor status of jobs submitted by Local backend.

            Pass in the job object and the multiprocessing result to allow the result to do monitoring and perform
            post processing of the job.

            return_code = self.result.get()
                self.job.status = "failed"
                self.job.status = "completed"

            Update the job's (or subjobs') status by calling the result object.

            B2DEBUG(29, f"Calling {self.job}.result.update_status()")
            for subjob in self.job.subjobs.values():
                subjob.result._update_result_status()

        Closes and joins the Pool, letting you wait for all results currently

        B2INFO("Joining Process Pool, waiting for results to finish...")
        B2INFO("Process Pool joined.")

        Getter for max_processes

    @max_processes.setter

        Setter for max_processes, we also check for a previous Pool(), wait for it to join
        and then create a new one with the new value of max_processes

        B2INFO("New max_processes requested. But a pool already exists.")
        B2INFO(f"Starting up new Pool with {self.max_processes} processes")
        raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
                                  "Did you submit a (Sub)Job?")

    @submit.register(SubJob)

        Submission of a `SubJob` for the Local backend

        job.output_dir.mkdir(parents=True, exist_ok=True)
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()

        with open(script_path, mode="w") as batch_file:
            print("#!/bin/bash", file=batch_file)
            print(job.full_command, file=batch_file)
        B2INFO(f"Submitting {job}")

        job.status = "submitted"
        B2INFO(f"{job} submitted")

    @submit.register(Job)

        Submission of a `Job` for the Local backend

        job.output_dir.mkdir(parents=True, exist_ok=True)
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.check_input_data_files()

        if not job.splitter:
            job.copy_input_sandbox_files_to_working_dir()
            job.dump_input_data()

            with open(script_path, mode="w") as batch_file:
                print("#!/bin/bash", file=batch_file)
                print(job.full_command, file=batch_file)
            B2INFO(f"Submitting {job}")

            B2INFO(f"{job} submitted")

            job.splitter.create_subjobs(job)
            self.submit(list(job.subjobs.values()))

    @submit.register(list)

        Submit method of Local() that takes a list of jobs instead of just one and submits each one.

        B2INFO("All requested jobs submitted.")

    def run_job(name, working_dir, output_dir, script):

        The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
        shell command in a subprocess and captures the stdout and stderr of the subprocess to files.

        B2INFO(f"Starting Sub-process: {name}")
        from subprocess import Popen
        stdout_file_path = Path(working_dir, _STDOUT_FILE)
        stderr_file_path = Path(working_dir, _STDERR_FILE)

        B2INFO(f"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
        with open(stdout_file_path, mode="w", buffering=1) as f_out, \
             open(stderr_file_path, mode="w", buffering=1) as f_err:
            with Popen(["/bin/bash", script.as_posix()],
                       universal_newlines=True,

        B2INFO(f"Subprocess {name} finished.")
    Abstract Base backend for submitting to a local batch system. Batch system specific commands should be implemented
    in a derived class. Do not use this class directly!

    submission_cmds = []

    default_global_job_limit = 1000

    default_sleep_between_submission_checks = 30

        Init method for Batch Backend. Does some basic default setup.

        super().__init__(backend_args=backend_args)
        Should be implemented in a derived class to write a batch submission script to the job.working_dir.
        You should think about where the stdout/err should go, and set the queue name.

        raise NotImplementedError("Need to implement a _add_batch_directives(self, job, file) "
                                  f"method in {self.__class__.__name__} backend.")

        Useful for the HTCondor backend where a separate submit file is needed instead of batch
        directives pasted directly into the submission script. It should be overwritten

        Do the actual batch submission command and collect the output to find out the job id for later monitoring.

        Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail.
        This is initially meant to make sure that we don't go over the global limits of jobs (submitted + running).

            bool: If the job submission can continue based on the current situation.
    def submit(self, job, check_can_submit=True, jobs_per_check=100):

        raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
                                  "Did you submit a (Sub)Job?")

    @submit.register(SubJob)
    def _(self, job, check_can_submit=True, jobs_per_check=100):

        Submit method of Batch backend for a `SubJob`. Should take `SubJob` object, create needed directories,
        create batch script, and send it off with the batch submission command.
        It should apply the correct options (default and user requested).

        Should set a Result object as an attribute of the job.

        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()

        with open(script_path, mode="w") as batch_file:
            print(job.full_command, file=batch_file)
        os.chmod(script_path, 0o755)
        B2INFO(f"Submitting {job}")

        job.status = "submitted"
        B2INFO(f"{job} submitted")
    @submit.register(Job)
    def _(self, job, check_can_submit=True, jobs_per_check=100):

        Submit method of Batch backend. Should take job object, create needed directories, create batch script,
        and send it off with the batch submission command, applying the correct options (default and user requested).

        Should set a Result object as an attribute of the job.

        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.check_input_data_files()

        if not job.splitter:
            job.copy_input_sandbox_files_to_working_dir()
            job.dump_input_data()

            with open(script_path, mode="w") as batch_file:
                print(job.full_command, file=batch_file)
            os.chmod(script_path, 0o755)
            B2INFO(f"Submitting {job}")

            job.status = "submitted"
            B2INFO(f"{job} submitted")
        else:
            job.splitter.create_subjobs(job)
            self.submit(list(job.subjobs.values()))
    @submit.register(list)
    def _(self, jobs, check_can_submit=True, jobs_per_check=100):

        Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.

        B2INFO(f"Submitting a list of {len(jobs)} jobs to a Batch backend")

            B2INFO(f"jobs_per_check (={jobs_per_check}) but this is higher than the global job "
                   f"limit for this backend (={self.global_job_limit}). Will instead use the "
                   "value of the global job limit.")

        for jobs_to_submit in grouper(jobs_per_check, jobs):
            while not self.can_submit(njobs=len(jobs_to_submit)):
                B2INFO("Too many jobs are currently in the batch system globally. Waiting until submission can continue...")

            B2INFO(f"Submitting the next {len(jobs_to_submit)} jobs...")
            for job in jobs_to_submit:
                self.submit(job, check_can_submit, jobs_per_check)
        B2INFO(f"All {len(jobs)} requested jobs submitted")
        Construct the Path object of the script file that we will submit using the batch command.
        For most batch backends this is the same script as the bash script we submit.
        But some backends require a separate submission file that describes the job.
        To support that, override this function in the derived backend class.
    Backend for submitting calibration processes to a qsub batch system.

    cmd_wkdir = "#PBS -d"

    cmd_stdout = "#PBS -o"

    cmd_stderr = "#PBS -e"

    cmd_queue = "#PBS -q"

    cmd_name = "#PBS -N"

    submission_cmds = ["qsub"]

    default_global_job_limit = 5000

    default_backend_args = {"queue": "short"}

        super().__init__(backend_args=backend_args)
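Because of the backend_args priority described for `Backend` (default -> backend instance -> job), the default ``{"queue": "short"}`` can be overridden per backend instance or per job. A sketch with a hypothetical queue name:

    .. code-block:: python

        from caf.backends import PBS

        backend = PBS(backend_args={"queue": "long"})   # instance-level override (queue name is site specific)
        print(backend.backend_args)
        # A job's own job.backend_args = {"queue": "short"} would take priority over this
        # when the batch directives are written.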
        Add PBS directives to submitted script.

        job_backend_args = {**self.backend_args, **job.backend_args}
        batch_queue = job_backend_args["queue"]
        print("#!/bin/bash", file=batch_file)
        print("# --- Start PBS ---", file=batch_file)
        print(" ".join([PBS.cmd_queue, batch_queue]), file=batch_file)
        print(" ".join([PBS.cmd_name, job.name]), file=batch_file)
        print(" ".join([PBS.cmd_wkdir, job.working_dir.as_posix()]), file=batch_file)
        print(" ".join([PBS.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
        print(" ".join([PBS.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
        print("# --- End PBS ---", file=batch_file)

        job_id = batch_output.replace("\n", "")
        B2INFO(f"Job ID of {job} recorded as: {job_id}")

        submission_cmd.append(script_path.as_posix())
        return submission_cmd

        Do the actual batch submission command and collect the output to find out the job id for later monitoring.

        sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True)

        parent.result = cls.PBSResult(parent, None)
        Simple class to help monitor status of jobs submitted by `PBS` Backend.

        You pass in a `Job` object (or `SubJob`) and job id from a qsub command.
        When you call the `ready` method it runs qstat to see whether or not the job has finished.

        backend_code_to_status = {"R": "running",
                                  "FINISHED": "completed",
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.

            Update the job's (or subjobs') status by calling qstat.

            B2DEBUG(29, f"Calling {self.job}.result.update_status()")
            qstat_output = PBS.qstat()
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(qstat_output)

                qstat_output (dict): The JSON output of a previous call to qstat which we can reuse to find the
                    status of this job. Obviously you should only be passing a JSON dict that contains the 'Job_Id' and
                    'job_state' information, otherwise it is useless.

            B2DEBUG(29, f"Checking of the exit code from file for {self.job}")
            except FileNotFoundError:
                    B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                    B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                backend_status = "E"
                backend_status = "C"

            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
            if new_job_status != self.job.status:
                self.job.status = new_job_status

            Get status from output

            for job_info in output["JOBS"]:
                if job_info["Job_Id"] == self.job_id:
                    return job_info["job_state"]
        Checks the global number of jobs in PBS right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.

        B2DEBUG(29, "Calling PBS().can_submit()")
        job_info = self.qstat(username=os.environ["USER"])
        total_jobs = job_info["NJOBS"]
        B2INFO(f"Total jobs active in the PBS system is currently {total_jobs}")
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
        B2INFO("There is enough space to submit more jobs.")

    def qstat(cls, username="", job_ids=None):

        Simplistic interface to the ``qstat`` command. Lets you request information about all jobs or ones matching the filter
        ['job_id'] or for the username. The result is a JSON dictionary containing some of the useful job attributes returned

        PBS is kind of annoying as depending on the configuration it can forget about jobs immediately. So the status of a
        finished job is VERY hard to get. There are other commands that are sometimes included that may do a better job.
        This one should work for Melbourne's cloud computing centre.

            username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
                will be in the output dictionary.
            job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
                the output of this function will be only information about these jobs. If this argument is not given, then all jobs
                matching the other filters will be returned.

            dict: JSON dictionary of the form (to save you parsing the XML that qstat returns):

            .. code-block:: python

        B2DEBUG(29, f"Calling PBS.qstat(username='{username}', job_id={job_ids})")
        job_ids = set(job_ids)
        cmd_list = ["qstat", "-x"]
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        jobs_dict = {"NJOBS": 0, "JOBS": []}
        jobs_xml = ET.fromstring(output)

        if len(job_ids) == 1:
            job_elem = jobs_xml.find(f"./Job[Job_Id='{list(job_ids)[0]}']")
            jobs_dict["NJOBS"] = 1

        for job in jobs_xml.iterfind("Job"):
            job_owner = job.find("Job_Owner").text.split("@")[0]
            if username and username != job_owner:
                continue
            job_id = job.find("Job_Id").text
            if job_ids and job_id not in job_ids:
                continue
            jobs_dict["NJOBS"] += 1
            if job_id in job_ids:
                job_ids.remove(job_id)
        Creates a Job dictionary with various job information from the XML element returned by qstat.

            job_elem (xml.etree.ElementTree.Element): The XML Element of the Job

            dict: JSON serialisable dictionary of the Job information we are interested in.

        job_dict["Job_Id"] = job_elem.find("Job_Id").text
        job_dict["Job_Name"] = job_elem.find("Job_Name").text
        job_dict["Job_Owner"] = job_elem.find("Job_Owner").text
        job_dict["job_state"] = job_elem.find("job_state").text
        job_dict["queue"] = job_elem.find("queue").text
    Backend for submitting calibration processes to a bsub batch system.

    cmd_wkdir = "#BSUB -cwd"

    cmd_stdout = "#BSUB -o"

    cmd_stderr = "#BSUB -e"

    cmd_queue = "#BSUB -q"

    cmd_name = "#BSUB -J"

    submission_cmds = ["bsub", "-env", "\"none\"", "<"]

    default_global_job_limit = 15000

    default_backend_args = {"queue": "s"}

        super().__init__(backend_args=backend_args)
        Adds LSF BSUB directives for the job to a script.

        job_backend_args = {**self.backend_args, **job.backend_args}
        batch_queue = job_backend_args["queue"]
        print("#!/bin/bash", file=batch_file)
        print("# --- Start LSF ---", file=batch_file)
        print(" ".join([LSF.cmd_queue, batch_queue]), file=batch_file)
        print(" ".join([LSF.cmd_name, job.name]), file=batch_file)
        print(" ".join([LSF.cmd_wkdir, str(job.working_dir)]), file=batch_file)
        print(" ".join([LSF.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
        print(" ".join([LSF.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
        print("# --- End LSF ---", file=batch_file)

        submission_cmd.append(script_path.as_posix())
        submission_cmd = " ".join(submission_cmd)
        return [submission_cmd]

        Do the actual batch submission command and collect the output to find out the job id for later monitoring.

        sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)

        Simple class to help monitor status of jobs submitted by LSF Backend.

        You pass in a `Job` object and job id from a bsub command.
        When you call the `ready` method it runs bjobs to see whether or not the job has finished.

        backend_code_to_status = {"RUN": "running",
                                  "DONE": "completed",
                                  "FINISHED": "completed",

            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.

            Update the job's (or subjobs') status by calling bjobs.

            B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
            bjobs_output = LSF.bjobs(output_fields=["stat", "id"])
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(bjobs_output)

                bjobs_output (dict): The JSON output of a previous call to bjobs which we can reuse to find the
                    status of this job. Obviously you should only be passing a JSON dict that contains the 'stat' and
                    'id' information, otherwise it is useless.

                bjobs_output = LSF.bjobs(output_fields=["stat", "id"], job_id=str(self.job_id))

            except FileNotFoundError:
                    B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                    B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                backend_status = "EXIT"
                backend_status = "FINISHED"

            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
            if new_job_status != self.job.status:
                self.job.status = new_job_status

            Get status from output

            if output["JOBS"] and "ERROR" in output["JOBS"][0]:
                if output["JOBS"][0]["ERROR"] == f"Job <{self.job_id}> is not found":
                    raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
                raise BackendError(f"Unidentified Error during status check for {self.job}: {output}")

            for job_info in output["JOBS"]:
                if job_info["JOBID"] == self.job_id:
                    return job_info["STAT"]
            raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")

        parent.result = cls.LSFResult(parent, None)

        m = re.search(r"Job <(\d+)>", str(batch_output))

        raise BackendError(f"Failed to get the batch job ID of {job}. LSF output was:\n{batch_output}")

        B2INFO(f"Job ID of {job} recorded as: {job_id}")
        Checks the global number of jobs in LSF right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.

        B2DEBUG(29, "Calling LSF().can_submit()")
        job_info = self.bjobs(output_fields=["stat"])
        total_jobs = job_info["NJOBS"]
        B2INFO(f"Total jobs active in the LSF system is currently {total_jobs}")
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
        B2INFO("There is enough space to submit more jobs.")

    def bjobs(cls, output_fields=None, job_id="", username="", queue=""):

        Simplistic interface to the `bjobs` command. Lets you request information about all jobs matching the filters
        'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by output of the ``-json`` bjobs option.

            output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
            job_id (str): String representation of the Job ID given by bsub during submission. If this argument is given then
                the output of this function will be only information about this job. If this argument is not given, then all jobs
                matching the other filters will be returned.
            username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters.
            queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.

            dict: JSON dictionary of the form:

            .. code-block:: python

                {
                  "NJOBS": <njobs returned by command>,
                  "JOBS": [
                        {
                          <output field: value>, ...
                        }, ...
                      ]
                }

        B2DEBUG(29, f"Calling LSF.bjobs(output_fields={output_fields}, job_id={job_id}, username={username}, queue={queue})")
        if not output_fields:
            output_fields = ["id"]
        field_list_cmd = "\""
        field_list_cmd += " ".join(output_fields)
        field_list_cmd += "\""
        cmd_list = ["bjobs", "-o", field_list_cmd]
        if queue:
            cmd_list.extend(["-q", queue])
        if username:
            cmd_list.extend(["-u", username])
        cmd_list.append("-json")
        if job_id:
            cmd_list.append(job_id)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = decode_json_string(subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True))
        output["NJOBS"] = output["JOBS"]
        output["JOBS"] = output["RECORDS"]
        del output["RECORDS"]
        del output["COMMAND"]

    def bqueues(cls, output_fields=None, queues=None):

        Simplistic interface to the `bqueues` command. Lets you request information about all queues matching the filters.
        The result is the JSON dictionary returned by output of the ``-json`` bqueues option.

            output_fields (list[str]): A list of bqueues -o fields that you would like information about
                e.g. the default is ['queue_name', 'status', 'max', 'njobs', 'pend', 'run']
            queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
                By default you will receive information about all queues.

            dict: JSON dictionary of the form:

            .. code-block:: python

                {
                  "COMMAND": "bqueues",
                  ...
                  "RECORDS": [
                      {
                        "QUEUE_NAME": "b2_beast",
                        "STATUS": "Open:Active",
                        ...
                      }, ...
                  ]
                }

        B2DEBUG(29, f"Calling LSF.bqueues(output_fields={output_fields}, queues={queues})")
        if not output_fields:
            output_fields = ["queue_name", "status", "max", "njobs", "pend", "run"]
        field_list_cmd = "\""
        field_list_cmd += " ".join(output_fields)
        field_list_cmd += "\""
        cmd_list = ["bqueues", "-o", field_list_cmd]
        cmd_list.append("-json")
        if queues:
            cmd_list.extend(queues)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        return decode_json_string(output)
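Both helpers are classmethods, so they can be called directly to inspect the batch system. A sketch (this requires a working LSF installation on the submission host; the queue names below are site specific and purely illustrative):

    .. code-block:: python

        from caf.backends import LSF

        my_jobs = LSF.bjobs(output_fields=["stat", "name", "id"])
        print(my_jobs["NJOBS"], "jobs found")

        queues = LSF.bqueues(queues=["s", "l"])
        print(queues)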
    Backend for submitting calibration processes to an HTCondor batch system.

    batch_submit_script = "submit.sub"

    submission_cmds = ["condor_submit", "-terse"]

    default_global_job_limit = 10000

    default_backend_args = {
        "universe": "vanilla",
        "request_memory": "4 GB",

    default_class_ads = ["GlobalJobId", "JobStatus", "Owner"]
        Fill HTCondor submission file.

        files_to_transfer = [i.as_posix() for i in job.working_dir.iterdir()]

        job_backend_args = {**self.backend_args, **job.backend_args}

        with open(submit_file_path, "w") as submit_file:
            print(f'executable = {self.get_submit_script_path(job)}', file=submit_file)
            print(f'log = {Path(job.output_dir, "htcondor.log").as_posix()}', file=submit_file)
            print(f'output = {Path(job.working_dir, _STDOUT_FILE).as_posix()}', file=submit_file)
            print(f'error = {Path(job.working_dir, _STDERR_FILE).as_posix()}', file=submit_file)
            print('transfer_input_files = ', ','.join(files_to_transfer), file=submit_file)
            print(f'universe = {job_backend_args["universe"]}', file=submit_file)
            print(f'getenv = {job_backend_args["getenv"]}', file=submit_file)
            print(f'request_memory = {job_backend_args["request_memory"]}', file=submit_file)
            print('should_transfer_files = Yes', file=submit_file)
            print('when_to_transfer_output = ON_EXIT', file=submit_file)
            for line in job_backend_args["extra_lines"]:
                print(line, file=submit_file)
            print('queue', file=submit_file)

        For HTCondor leave empty as the directives are already included in the submit file.

        print('#!/bin/bash', file=batch_file)

        submission_cmd.append(script_path.as_posix())
        return submission_cmd

        Construct the Path object of the .sub file that we will use to describe the job.

        Do the actual batch submission command and collect the output to find out the job id for later monitoring.

        job_dir = Path(cmd[-1]).parent.as_posix()
                sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, cwd=job_dir)
            except subprocess.CalledProcessError as e:
                    B2ERROR(f"Error during condor_submit: {str(e)} occurred more than 3 times.")
                    B2ERROR(f"Error during condor_submit: {str(e)}, sleeping for {sleep_time} seconds.")
        return re.search(r"(\d+\.\d+) - \d+\.\d+", sub_out).groups()[0]

        Simple class to help monitor status of jobs submitted by HTCondor Backend.

        You pass in a `Job` object and job id from a condor_submit command.
        When you call the `ready` method it runs condor_q and, if needed, ``condor_history``
        to see whether or not the job has finished.

        backend_code_to_status = {0: "submitted",

            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.

            Update the job's (or subjobs') status by calling condor_q.

            B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
            condor_q_output = HTCondor.condor_q()
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(condor_q_output)

            In order to be slightly more efficient we pass in a previous call to condor_q to see if it can work.
            If it is there we update the job's status. If not we are forced to start calling condor_q and, if needed,
            ``condor_history``, etc.

                condor_q_output (dict): The JSON output of a previous call to `HTCondor.condor_q` which we can reuse to find the
                    status of this job if it was active when that command ran.

            B2DEBUG(29, f"Calling {self.job}.result._update_result_status()")
            for job_record in condor_q_output["JOBS"]:
                job_id = job_record["GlobalJobId"].split("#")[1]
                if job_id == self.job_id:
                    B2DEBUG(29, f"Found {self.job_id} in condor_q_output.")
                    jobs_info.append(job_record)

            except FileNotFoundError:
                    B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                    B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                    jobs_info = [{"JobStatus": 6, "HoldReason": None}]
                jobs_info = [{"JobStatus": 4, "HoldReason": None}]

                jobs_info = HTCondor.condor_q(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
                jobs_info = HTCondor.condor_history(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
                hold_reason = "No Reason Known"
                jobs_info = [{"JobStatus": 6, "HoldReason": None}]

            job_info = jobs_info[0]
            backend_status = job_info["JobStatus"]
            if backend_status == 5:
                hold_reason = job_info.get("HoldReason", None)
                B2WARNING(f"{self.job} on hold because of {hold_reason}. Keep waiting.")

            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
            if new_job_status != self.job.status:
                self.job.status = new_job_status

        B2INFO(f"Job ID of {job} recorded as: {job_id}")
        Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.

        B2DEBUG(29, "Calling HTCondor().can_submit()")
        total_jobs = jobs_info["NJOBS"]
        B2INFO(f"Total jobs active in the HTCondor system is currently {total_jobs}")
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
        B2INFO("There is enough space to submit more jobs.")
    def condor_q(cls, class_ads=None, job_id="", username=""):

        Simplistic interface to the `condor_q` command. Lets you request information about all jobs matching the filters
        'job_id' and 'username'. Note that setting job_id negates username so it is ignored.
        The result is the JSON dictionary returned by output of the ``-json`` condor_q option.

            class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
                By default we give {cls.default_class_ads}; increasing the number of class_ads increases the time taken
                by the condor_q call.
            job_id (str): String representation of the Job ID given by condor_submit during submission.
                If this argument is given then the output of this function will be only information about this job.
                If this argument is not given, then all jobs matching the other filters will be returned.
            username (str): By default we return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
                so it isn't recommended.

            dict: JSON dictionary of the form:

            .. code-block:: python

                {
                  "NJOBS": <number of records returned by command>,
                  "JOBS": [
                        {
                          <ClassAd: value>, ...
                        }, ...
                      ]
                }
        B2DEBUG(29, f"Calling HTCondor.condor_q(class_ads={class_ads}, job_id={job_id}, username={username})")
        field_list_cmd = ",".join(class_ads)
        cmd_list = ["condor_q", "-json", "-attributes", field_list_cmd]
            cmd_list.append(job_id)
            username = os.environ["USER"]
        if username == "all":
            cmd_list.append("-allusers")
            cmd_list.append(username)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
            records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        except BaseException:
        records = decode_json_string(records)
        jobs_info = {"JOBS": records}
        jobs_info["NJOBS"] = len(jobs_info["JOBS"])
        Simplistic interface to the ``condor_history`` command. Lets you request information about all jobs matching the filters
        ``job_id`` and ``username``. Note that setting job_id negates username so it is ignored.
        The result is a JSON dictionary filled by output of the ``-json`` ``condor_history`` option.

            class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
                By default we give {cls.default_class_ads}; increasing the number of class_ads increases the time taken
                by the condor_q call.
            job_id (str): String representation of the Job ID given by condor_submit during submission.
                If this argument is given then the output of this function will be only information about this job.
                If this argument is not given, then all jobs matching the other filters will be returned.
            username (str): By default we return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters. This is limited to 10000 records
                and isn't recommended.

            dict: JSON dictionary of the form:

            .. code-block:: python

                {
                  "NJOBS": <number of records returned by command>,
                  "JOBS": [
                        {
                          <ClassAd: value>, ...
                        }, ...
                      ]
                }
        B2DEBUG(29, f"Calling HTCondor.condor_history(class_ads={class_ads}, job_id={job_id}, username={username})")
        field_list_cmd = ",".join(class_ads)
        cmd_list = ["condor_history", "-json", "-attributes", field_list_cmd]
            cmd_list.append(job_id)
            username = os.environ["USER"]
        if username != "all":
            cmd_list.append(username)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
            records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        except BaseException:
        records = decode_json_string(records)
        jobs_info = {"JOBS": records}
        jobs_info["NJOBS"] = len(jobs_info["JOBS"])
    Backend for submitting calibration processes to the grid.


class BackendError(Exception):
    Base exception class for Backend classes.

    Base exception class for Job objects.

    Base exception class for SubjobSplitter objects.