from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING

import os
import json
import shutil
import subprocess
import time
import multiprocessing as mp
import xml.etree.ElementTree as ET
from abc import ABC, abstractmethod
from collections import deque
from datetime import datetime, timedelta
from itertools import count, takewhile
from math import ceil
from pathlib import Path

from caf.utils import method_dispatch
from caf.utils import decode_json_string
from caf.utils import grouper
from caf.utils import parse_file_uri
33__all__ = [
"Job",
"SubJob",
"Backend",
"Local",
"Batch",
"LSF",
"PBS",
"HTCondor",
"get_input_data"]
36_input_data_file_path = Path(
"__BACKEND_INPUT_FILES__.json")
38_STDOUT_FILE =
"stdout"
40_STDERR_FILE =
"stderr"
42_backend_job_envvars = (
46 "BELLE2_EXTERNALS_TOPDIR",
47 "BELLE2_CONDB_METADATA",
def get_input_data():
    """
    Simple JSON load of the default input data file. Will contain a list of string file paths
    for use by the job process.

    Returns:
        list: The deserialised content of the input data JSON file (a list of file path strings).
    """
    with open(_input_data_file_path) as input_data_file:
        input_data = json.load(input_data_file)
    return input_data
def monitor_jobs(args, jobs):
    """
    Block until every job in ``jobs`` has finished, polling their statuses in a loop.

    Parameters:
        args: Parsed command line arguments; only ``args.heartbeat`` (the number of
            seconds to sleep between status checks) is read here.
        jobs (list): `Job` objects to monitor until they all reach an exit status.
    """
    unfinished_jobs = jobs[:]
    failed_jobs = 0
    while unfinished_jobs:
        B2INFO("Updating statuses of unfinished jobs...")
        for j in unfinished_jobs:
            j.update_status()
        B2INFO("Checking if jobs are ready...")
        # Iterate over a copy so that removing finished jobs is safe during iteration.
        for j in unfinished_jobs[:]:
            if j.ready():
                if j.status == "failed":
                    B2ERROR(f"{j} is failed")
                    failed_jobs += 1
                else:
                    B2INFO(f"{j} is finished")
                unfinished_jobs.remove(j)
        if unfinished_jobs:
            B2INFO(f"Not all jobs done yet, waiting {args.heartbeat} seconds before re-checking...")
            time.sleep(args.heartbeat)
    if failed_jobs > 0:
        B2ERROR(f"{failed_jobs} jobs failed")
    else:
        B2INFO('All jobs finished successfully')
89 Simple little class to hold a generator (uninitialised) and the necessary args/kwargs to
90 initialise it. This lets us reuse a generator by setting it up again fresh. This is not
91 optimal for expensive calculations, but it is nice for making large sequences of
92 Job input arguments on the fly.
95 def __init__(self, generator_function, *args, **kwargs):
98 generator_function (py:function): A function (callable) that contains a ``yield`` statement. This generator
99 should *not* be initialised i.e. you haven't called it with ``generator_function(*args, **kwargs)``
100 yet. That will happen when accessing the `ArgumentsGenerator.generator` property.
101 args (tuple): The positional arguments you want to send into the initialisation of the generator.
102 kwargs (dict): The keyword arguments you want to send into the initialisation of the generator.
115 generator: The initialised generator (using the args and kwargs for initialisation). It should be ready
116 to have ``next``/``send`` called on it.
def range_arguments(start=0, stop=None, step=1):
    """
    A simple example Arguments Generator function for use as a `ArgumentsGenerator.generator_function`.
    It will return increasing values using itertools.count. By default it is infinite and will not call `StopIteration`.
    The `SubJob` object is sent into this function with `send` but is not used.

    Parameters:
        start (int): The starting value that will be returned.
        stop (int): At this value the `StopIteration` will be thrown. If this is `None` then this generator will continue
            forever.
        step (int): The step size.

    Returns:
        tuple: A single-element argument tuple for each `SubJob`.
    """
    def should_stop(x):
        # Only a finite generator (stop is not None) can ever terminate.
        if stop is not None and x >= stop:
            return True
        else:
            return False
    # Prime the generator: the first next()/send(None) call runs to here and yields None.
    subjob = (yield None)
    for i in takewhile(lambda x: not should_stop(x), count(start, step)):
        args = (i,)
        B2DEBUG(29, f"{subjob} arguments will be {args}")
        subjob = (yield args)
155 Abstract base class. This class handles the logic of creating subjobs for a `Job` object.
156 The `create_subjobs` function should be implemented and used to construct
157 the subjobs of the parent job object.
160 arguments_generator (ArgumentsGenerator): Used to construct the generator function that will yield the argument
161 tuple for each `SubJob`. The splitter will iterate through the generator each time `create_subjobs` is
162 called. The `SubJob` will be sent into the generator with ``send(subjob)`` so that the generator can decide what
168 Derived classes should call `super` to run this.
176 Implement this method in derived classes to generate the `SubJob` objects.
181 Use the `arguments_generator` (if one exists) to assign the argument tuples to the
184 if self.arguments_generator:
185 arg_gen = self.arguments_generator.generator
187 for subjob
in sorted(job.subjobs.values(), key=
lambda sj: sj.id):
190 args = arg_gen.send(subjob)
191 except StopIteration:
192 B2ERROR(f
"StopIteration called when getting args for {subjob}, "
193 "setting all subsequent subjobs to have empty argument tuples.")
198 B2DEBUG(29, f
"Arguments for {subjob}: {args}")
202 B2INFO(f
"No ArgumentsGenerator assigned to the {self} so subjobs of {job} "
203 "won't automatically have arguments assigned.")
208 return f
"{self.__class__.__name__}"
216 def __init__(self, *, arguments_generator=None, max_files_per_subjob=1):
219 max_files_per_subjob (int): The maximum number of input files used per `SubJob` created.
221 super().
__init__(arguments_generator=arguments_generator)
227 This function creates subjobs for the parent job passed in. It creates as many subjobs as required
228 in order to prevent the number of input files per subjob going over the limit set by
229 `MaxFilesSplitter.max_files_per_subjob`.
231 if not job.input_files:
232 B2WARNING(f
"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
236 job.create_subjob(i, input_files=subjob_input_files)
240 B2INFO(f
"{self} created {i+1} Subjobs for {job}")
248 def __init__(self, *, arguments_generator=None, max_subjobs=1000):
251 max_subjobs (int): The maximum number ofsubjobs that will be created.
253 super().
__init__(arguments_generator=arguments_generator)
259 This function creates subjobs for the parent job passed in. It creates as many subjobs as required
260 by the number of input files up to the maximum set by `MaxSubjobsSplitter.max_subjobs`. If there are
261 more input files than `max_subjobs` it instead groups files by the minimum number per subjob in order to
262 respect the subjob limit e.g. If you have 11 input files and a maximum number of subjobs of 4, then it
263 will create 4 subjobs, 3 of them with 3 input files, and one with 2 input files.
265 if not job.input_files:
266 B2WARNING(f
"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
270 remaining_input_files = deque(job.input_files)
274 while remaining_input_files:
276 num_input_files = ceil(len(remaining_input_files) / available_subjobs)
278 subjob_input_files = []
279 for i
in range(num_input_files):
280 subjob_input_files.append(remaining_input_files.popleft())
282 job.create_subjob(subjob_i, input_files=subjob_input_files)
284 available_subjobs -= 1
287 B2INFO(f
"{self} created {subjob_i} Subjobs for {job}")
292 Creates SubJobs based on the given argument generator. The generator will be called until a `StopIteration` is issued.
293 Be VERY careful to not accidentally give an infinite generator! Otherwise it will simply create SubJobs until you run out
294 of memory. You can set the `ArgumentsSplitter.max_subjobs` parameter to try and prevent this and throw an exception.
296 This splitter is useful for MC production jobs where you don't have any input files, but you want to control the exp/run
297 numbers of subjobs. If you do have input files set for the parent `Job` objects, then the same input files will be
298 assigned to every `SubJob`.
301 arguments_generator (ArgumentsGenerator): The standard ArgumentsGenerator that is used to assign arguments
304 def __init__(self, *, arguments_generator=None, max_subjobs=None):
307 super().
__init__(arguments_generator=arguments_generator)
313 This function creates subjobs for the parent job passed in. It creates subjobs until the
314 `SubjobSplitter.arguments_generator` finishes.
316 If `ArgumentsSplitter.max_subjobs` is set, then it will throw an exception if more than this number of
323 raise SplitterError(f
"{self} tried to create more subjobs than the maximum (={self.max_subjobs}).")
325 subjob =
SubJob(job, i, job.input_files)
326 args = arg_gen.send(subjob)
327 B2INFO(f
"Creating {job}.{subjob}")
328 B2DEBUG(29, f
"Arguments for {subjob}: {args}")
330 job.subjobs[i] = subjob
331 except StopIteration:
333 B2INFO(f
"{self} created {i+1} Subjobs for {job}")
338 This generic Job object is used to tell a Backend what to do.
339 This object basically holds necessary information about a process you want to submit to a `Backend`.
340 It should *not* do anything that is backend specific, just hold the configuration for a job to be
341 successfully submitted and monitored using a backend. The result attribute is where backend
342 specific job monitoring goes.
345 name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF
347 .. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.
352 statuses = {
"init": 0,
"submitted": 1,
"running": 2,
"failed": 3,
"completed": 4}
355 exit_statuses = [
"failed",
"completed"]
390 self.
output_dir = Path(job_dict[
"output_dir"])
392 self.
cmd = job_dict[
"cmd"]
393 self.
args = job_dict[
"args"]
397 for subjob_dict
in job_dict[
"subjobs"]:
398 self.
create_subjob(subjob_dict[
"id"], input_files=subjob_dict[
"input_files"], args=subjob_dict[
"args"])
408 Representation of Job class (what happens when you print a Job() instance).
410 return f
"Job({self.name})"
414 Returns whether or not the Job has finished. If the job has subjobs then it will return true when they are all finished.
415 It will return False as soon as it hits the first failure. Meaning that you cannot guarantee that all subjobs will have
416 their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.
419 B2DEBUG(29, f
"You requested the ready() status for {self} but there is no result object set, returning False.")
426 Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
427 in the best way for the type of result object/backend.
430 B2DEBUG(29, f
"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
437 Creates a subjob Job object that references that parent Job.
438 Returns the SubJob object at the end.
441 B2INFO(f
"Creating {self}.Subjob({i})")
442 subjob =
SubJob(self, i, input_files)
448 B2WARNING(f
"{self} already contains SubJob({i})! This will not be created.")
453 Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
454 subjob status in the hierarchy of statuses in `Job.statuses`.
466 subjob_statuses = [subjob.status
for subjob
in self.
subjobs.values()]
467 status_level = min([self.
statuses[status]
for status
in subjob_statuses])
468 for status, level
in self.
statuses.items():
469 if level == status_level:
475 Sets the status of this Job.
478 if status ==
'failed':
479 B2ERROR(f
"Setting {self.name} status to failed")
481 B2INFO(f
"Setting {self.name} status to {status}")
488 return self._output_dir
492 self._output_dir = Path(value).absolute()
496 return self._working_dir
500 self._working_dir = Path(value).absolute()
504 return self._input_sandbox_files
506 @input_sandbox_files.setter
508 self._input_sandbox_files = [Path(p).absolute()
for p
in value]
512 return self._input_files
516 self._input_files = value
519 def max_subjobs(self):
523 def max_subjobs(self, value):
525 B2DEBUG(29, f
"Changed splitter to {self.splitter} for {self}.")
528 def max_files_per_subjob(self):
529 return self.
splitter.max_files_per_subjob
531 @max_files_per_subjob.setter
532 def max_files_per_subjob(self, value):
534 B2DEBUG(29, f
"Changed splitter to {self.splitter} for {self}.")
540 Dumps the Job object configuration to a JSON file so that it can be read in again later.
543 file_path(`basf2.Path`): The filepath we'll dump to
546 with open(file_path, mode=
"w")
as job_file:
547 json.dump(self.
job_dict, job_file, indent=2)
554 with open(file_path)
as job_file:
555 job_dict = json.load(job_file)
556 return cls(job_dict[
"name"], job_dict=job_dict)
562 dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
563 `Path <basf2.Path>` objects are converted to string via ``Path.as_posix()``.
566 job_dict[
"name"] = self.
name
568 job_dict[
"working_dir"] = self.
working_dir.as_posix()
569 job_dict[
"output_dir"] = self.
output_dir.as_posix()
571 job_dict[
"cmd"] = self.
cmd
572 job_dict[
"args"] = self.
args
576 job_dict[
"subjobs"] = [sj.job_dict
for sj
in self.
subjobs.values()]
581 Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
584 with open(Path(self.
working_dir, _input_data_file_path), mode=
"w")
as input_data_file:
585 json.dump(self.
input_files, input_data_file, indent=2)
589 Get all of the requested files for the input sandbox and copy them to the working directory.
590 Files like the submit.sh or input_data.json are not part of this process.
593 if file_path.is_dir():
594 shutil.copytree(file_path, Path(self.
working_dir, file_path.name))
600 Check the input files and make sure that there aren't any duplicates.
601 Also check if the files actually exist if possible.
603 existing_input_files = []
605 file_uri = parse_file_uri(file_path)
606 if file_uri.scheme ==
"file":
607 p = Path(file_uri.path)
609 if file_uri.geturl()
not in existing_input_files:
610 existing_input_files.append(file_uri.geturl())
612 B2WARNING(f
"Requested input file path {file_path} was already added, skipping it.")
614 B2WARNING(f
"Requested input file path {file_path} does not exist, skipping it.")
616 B2DEBUG(29, f
"{file_path} is not a local file URI. Skipping checking if file exists")
617 if file_path
not in existing_input_files:
618 existing_input_files.append(file_path)
620 B2WARNING(f
"Requested input file path {file_path} was already added, skipping it.")
622 B2WARNING(f
"No valid input file paths found for {self.name}, but some were requested.")
631 str: The full command that this job will run including any arguments.
633 all_components = self.
cmd[:]
634 all_components.extend(self.
args)
636 full_command =
" ".join(
map(str, all_components))
637 B2DEBUG(29, f
"Full command of {self} is '{full_command}'")
642 This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
643 It should detect if you are using a local release or CVMFS and append the correct commands
644 so that the job will have the same basf2 release environment. It should also detect
645 if a local release is not compiled with the ``opt`` option.
647 Note that this *doesn't mean that every environment variable is inherited* from the submitting
650 def append_environment_variable(cmds, envvar):
652 Append a command for setting an environment variable.
654 if envvar
in os.environ:
655 cmds.append(f
"""if [ -z "${{{envvar}}}" ]; then""")
656 cmds.append(f
" export {envvar}={os.environ[envvar]}")
659 if "BELLE2_TOOLS" not in os.environ:
660 raise BackendError(
"No BELLE2_TOOLS found in environment")
662 for envvar
in _backend_job_envvars:
663 append_environment_variable(self.
setup_cmds, envvar)
664 if "BELLE2_RELEASE" in os.environ:
665 self.
setup_cmds.append(f
"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
666 elif 'BELLE2_LOCAL_DIR' in os.environ:
667 self.
setup_cmds.append(
"export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
668 self.
setup_cmds.append(f
"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
669 self.
setup_cmds.append(f
"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
670 self.
setup_cmds.append(f
"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
671 self.
setup_cmds.append(
"pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
672 self.
setup_cmds.append(
"source $BACKEND_B2SETUP")
674 self.
setup_cmds.append(
"b2code-option $BACKEND_BELLE2_OPTION")
680 This mini-class simply holds basic information about which subjob you are
681 and a reference to the parent Job object to be able to access the main data there.
682 Rather than replicating all of the parent job's configuration again.
685 def __init__(self, job, subjob_id, input_files=None):
707 Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this."""
708 return Path(self.
parent.output_dir, str(self.
id))
712 """Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this."""
713 return Path(self.
parent.working_dir, str(self.
id))
717 """Getter for name of SubJob. Accesses the parent Job name to infer this."""
718 return "_".join((self.
parent.name, str(self.
id)))
723 Returns the status of this SubJob.
730 Sets the status of this Job.
733 if status ==
"failed":
734 B2ERROR(f
"Setting {self.name} status to failed")
736 B2INFO(f
"Setting {self.name} status to {status}")
742 A subjob cannot have subjobs. Always return empty list.
750 dict: A JSON serialisable representation of the `SubJob`. `Path <basf2.Path>` objects are converted to
751 `string` via ``Path.as_posix()``. Since Subjobs inherit most of the parent job's config
752 we only output the input files and arguments that are specific to this subjob and no other details.
755 job_dict[
"id"] = self.
id
757 job_dict[
"args"] = self.
args
762 Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
763 unless otherwise specified.
765 return getattr(self.
parent, attribute)
770 return f
"SubJob({self.name})"
775 Abstract base class for a valid backend.
776 Classes derived from this will implement their own submission of basf2 jobs
777 to whatever backend they describe.
778 Some common methods/attributes go into this base class.
780 For backend_args the priority from lowest to highest is:
782 backend.default_backend_args -> backend.backend_args -> job.backend_args
785 submit_script =
"submit.sh"
787 exit_code_file =
"__BACKEND_CMD_EXIT_STATUS__"
789 default_backend_args = {}
794 if backend_args
is None:
802 Base method for submitting collection jobs to the backend type. This MUST be
803 implemented for a correctly written backend class deriving from Backend().
809 Adds setup lines to the shell script file.
811 for line
in job.setup_cmds:
812 print(line, file=batch_file)
816 Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
817 `trap` statements for Ctrl-C situations.
819 start_wrapper = f
"""# ---
820# trap ctrl-c and call ctrl_c()
821trap '(ctrl_c 130)' SIGINT
822trap '(ctrl_c 143)' SIGTERM
824function write_exit_code() {{
825 echo "Writing $1 to exit status file"
826 echo "$1" > {self.exit_code_file}
831 trap '' SIGINT SIGTERM
832 echo "** Trapped Ctrl-C **"
833 echo "$1" > {self.exit_code_file}
837 print(start_wrapper, file=batch_file)
841 Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
842 an exit code of the job cmd being written out to a file. Which means that we can know if the command was
843 successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
844 removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
847 end_wrapper =
"""# ---
849 print(end_wrapper, file=batch_file)
854 We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
855 so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
856 statuses and allows the use of ready().
858 raise NotImplementedError
862 Construct the Path object of the bash script file that we will submit. It will contain
863 the actual job command, wrapper commands, setup commands, and any batch directives
870 Base class for Result objects. A Result is created for each `Job` (or `Job.SubJob`) object
871 submitted to a backend. It provides a way to query a job's status to find out if it's ready.
876 Pass in the job object to allow the result to access the job's properties and do post-processing.
890 Returns whether or not this job result is known to be ready. Doesn't actually change the job status. Just changes
891 the 'readiness' based on the known job status.
893 B2DEBUG(29, f
"Calling {self.job}.result.ready()")
896 elif self.
job.status
in self.
job.exit_statuses:
904 Update the job's (and subjobs') status so that `Result.ready` will return the up to date status. This call will have to
905 actually look up the job's status from some database/exit code file.
907 raise NotImplementedError
911 Read the exit code file to discover the exit status of the job command. Useful fallback if the job is no longer
912 known to the job database (batch system purged it for example). Since some backends may take time to download
913 the output files of the job back to the working directory we use a time limit on how long to wait.
917 exit_code_path = Path(self.
job.working_dir, Backend.exit_code_file)
918 with open(exit_code_path)
as f:
919 exit_code = int(f.read().strip())
920 B2DEBUG(29, f
"Exit code from file for {self.job} was {exit_code}")
926 Backend for local processes i.e. on the same machine but in a subprocess.
928 Note that you should call the self.join() method to close the pool and wait for any
929 running processes to finish before exiting the process. Once you've called join you will have to set up a new
930 instance of this backend to create a new pool. If you don't call `Local.join` or don't create a join yourself
931 somewhere, then the main python process might end before your pool is done.
934 max_processes (int): Integer that specifies the size of the process pool that spawns the subjobs, default=1.
935 It's the maximum simultaneous subjobs.
936 Try not to specify a large number or a number larger than the number of cores.
937 It won't crash the program but it will slow down and negatively impact performance.
940 def __init__(self, *, backend_args=None, max_processes=1):
943 super().
__init__(backend_args=backend_args)
951 Result class to help monitor status of jobs submitted by Local backend.
956 Pass in the job object and the multiprocessing result to allow the result to do monitoring and perform
957 post processing of the job.
968 return_code = self.
result.get()
970 self.
job.status =
"failed"
972 self.
job.status =
"completed"
976 Update the job's (or subjobs') status by calling the result object.
978 B2DEBUG(29, f
"Calling {self.job}.result.update_status()")
980 for subjob
in self.
job.subjobs.values():
981 subjob.result._update_result_status()
987 Closes and joins the Pool, letting you wait for all results currently
990 B2INFO(
"Joining Process Pool, waiting for results to finish...")
993 B2INFO(
"Process Pool joined.")
998 Getter for max_processes
1002 @max_processes.setter
1005 Setter for max_processes, we also check for a previous Pool(), wait for it to join
1006 and then create a new one with the new value of max_processes
1011 B2INFO(
"New max_processes requested. But a pool already exists.")
1013 B2INFO(f
"Starting up new Pool with {self.max_processes} processes")
1020 raise NotImplementedError(
"This is an abstract submit(job) method that shouldn't have been called. "
1021 "Did you submit a (Sub)Job?")
1023 @submit.register(SubJob)
1026 Submission of a `SubJob` for the Local backend
1029 job.output_dir.mkdir(parents=
True, exist_ok=
True)
1031 job.working_dir.mkdir(parents=
True, exist_ok=
True)
1032 job.copy_input_sandbox_files_to_working_dir()
1033 job.dump_input_data()
1036 with open(script_path, mode=
"w")
as batch_file:
1037 print(
"#!/bin/bash", file=batch_file)
1040 print(job.full_command, file=batch_file)
1042 B2INFO(f
"Submitting {job}")
1051 job.status =
"submitted"
1052 B2INFO(f
"{job} submitted")
1054 @submit.register(Job)
1057 Submission of a `Job` for the Local backend
1060 job.output_dir.mkdir(parents=
True, exist_ok=
True)
1062 job.working_dir.mkdir(parents=
True, exist_ok=
True)
1064 job.check_input_data_files()
1066 if not job.splitter:
1068 job.copy_input_sandbox_files_to_working_dir()
1069 job.dump_input_data()
1072 with open(script_path, mode=
"w")
as batch_file:
1073 print(
"#!/bin/bash", file=batch_file)
1076 print(job.full_command, file=batch_file)
1078 B2INFO(f
"Submitting {job}")
1087 B2INFO(f
"{job} submitted")
1090 job.splitter.create_subjobs(job)
1092 self.
submit(list(job.subjobs.values()))
1096 @submit.register(list)
1099 Submit method of Local() that takes a list of jobs instead of just one and submits each one.
1104 B2INFO(
"All requested jobs submitted.")
1107 def run_job(name, working_dir, output_dir, script):
1109 The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
1110 shell command in a subprocess and captures the stdout and stderr of the subprocess to files.
1112 B2INFO(f
"Starting Sub-process: {name}")
1113 from subprocess
import Popen
1114 stdout_file_path = Path(working_dir, _STDOUT_FILE)
1115 stderr_file_path = Path(working_dir, _STDERR_FILE)
1117 B2INFO(f
"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
1118 with open(stdout_file_path, mode=
"w", buffering=1)
as f_out, \
1119 open(stderr_file_path, mode=
"w", buffering=1)
as f_err:
1120 with Popen([
"/bin/bash", script.as_posix()],
1124 universal_newlines=
True,
1129 B2INFO(f
"Subprocess {name} finished.")
1139 Abstract Base backend for submitting to a local batch system. Batch system specific commands should be implemented
1140 in a derived class. Do not use this class directly!
1143 submission_cmds = []
1156 default_global_job_limit = 1000
1158 default_sleep_between_submission_checks = 30
1162 Init method for Batch Backend. Does some basic default setup.
1164 super().
__init__(backend_args=backend_args)
1174 Should be implemented in a derived class to write a batch submission script to the job.working_dir.
1175 You should think about where the stdout/err should go, and set the queue name.
1177 raise NotImplementedError(
"Need to implement a _add_batch_directives(self, job, file) "
1178 f
"method in {self.__class__.__name__} backend.")
1182 Useful for the HTCondor backend where a submit is needed instead of batch
1183 directives pasted directly into the submission script. It should be overwritten
1191 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1196 Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail.
1197 This is initially meant to make sure that we don't go over the global limits of jobs (submitted + running).
1200 bool: If the job submission can continue based on the current situation.
1205 def submit(self, job, check_can_submit=True, jobs_per_check=100):
1208 raise NotImplementedError(
"This is an abstract submit(job) method that shouldn't have been called. "
1209 "Did you submit a (Sub)Job?")
1214 @submit.register(SubJob)
1215 def _(self, job, check_can_submit=True, jobs_per_check=100):
1217 Submit method of Batch backend for a `SubJob`. Should take `SubJob` object, create needed directories,
1218 create batch script, and send it off with the batch submission command.
1219 It should apply the correct options (default and user requested).
1221 Should set a Result object as an attribute of the job.
1226 job.working_dir.mkdir(parents=
True, exist_ok=
True)
1227 job.copy_input_sandbox_files_to_working_dir()
1228 job.dump_input_data()
1235 with open(script_path, mode=
"w")
as batch_file:
1239 print(job.full_command, file=batch_file)
1241 os.chmod(script_path, 0o755)
1242 B2INFO(f
"Submitting {job}")
1247 job.status =
"submitted"
1248 B2INFO(f
"{job} submitted")
1250 @submit.register(Job)
1251 def _(self, job, check_can_submit=True, jobs_per_check=100):
1253 Submit method of Batch backend. Should take job object, create needed directories, create batch script,
1254 and send it off with the batch submission command, applying the correct options (default and user requested.)
1256 Should set a Result object as an attribute of the job.
1261 job.working_dir.mkdir(parents=
True, exist_ok=
True)
1263 job.check_input_data_files()
1269 if not job.splitter:
1271 job.copy_input_sandbox_files_to_working_dir()
1272 job.dump_input_data()
1279 with open(script_path, mode=
"w")
as batch_file:
1283 print(job.full_command, file=batch_file)
1285 os.chmod(script_path, 0o755)
1286 B2INFO(f
"Submitting {job}")
1291 job.status =
"submitted"
1292 B2INFO(f
"{job} submitted")
1295 job.splitter.create_subjobs(job)
1297 self.
submit(list(job.subjobs.values()))
1301 @submit.register(list)
1302 def _(self, jobs, check_can_submit=True, jobs_per_check=100):
1304 Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.
1306 B2INFO(f
"Submitting a list of {len(jobs)} jobs to a Batch backend")
1316 B2INFO(f
"jobs_per_check (={jobs_per_check}) but this is higher than the global job "
1317 f
"limit for this backend (={self.global_job_limit}). Will instead use the "
1318 " value of the global job limit.")
1322 for jobs_to_submit
in grouper(jobs_per_check, jobs):
1324 while not self.
can_submit(njobs=len(jobs_to_submit)):
1325 B2INFO(
"Too many jobs are currently in the batch system globally. Waiting until submission can continue...")
1330 B2INFO(f
"Submitting the next {len(jobs_to_submit)} jobs...")
1331 for job
in jobs_to_submit:
1332 self.
submit(job, check_can_submit, jobs_per_check)
1333 B2INFO(f
"All {len(jobs)} requested jobs submitted")
1338 Construct the Path object of the script file that we will submit using the batch command.
1339 For most batch backends this is the same script as the bash script we submit.
1340 But for some they require a separate submission file that describes the job.
1341 To implement that you can implement this function in the Backend class.
1359 Backend for submitting calibration processes to a qsub batch system.
1362 cmd_wkdir =
"#PBS -d"
1364 cmd_stdout =
"#PBS -o"
1366 cmd_stderr =
"#PBS -e"
1368 cmd_queue =
"#PBS -q"
1370 cmd_name =
"#PBS -N"
1372 submission_cmds = [
"qsub"]
1374 default_global_job_limit = 5000
1376 default_backend_args = {
"queue":
"short"}
1381 super().
__init__(backend_args=backend_args)
1385 Add PBS directives to submitted script.
1387 job_backend_args = {**self.
backend_args, **job.backend_args}
1388 batch_queue = job_backend_args[
"queue"]
1389 print(
"#!/bin/bash", file=batch_file)
1390 print(
"# --- Start PBS ---", file=batch_file)
1391 print(
" ".join([PBS.cmd_queue, batch_queue]), file=batch_file)
1392 print(
" ".join([PBS.cmd_name, job.name]), file=batch_file)
1393 print(
" ".join([PBS.cmd_wkdir, job.working_dir.as_posix()]), file=batch_file)
1394 print(
" ".join([PBS.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1395 print(
" ".join([PBS.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1396 print(
"# --- End PBS ---", file=batch_file)
1402 job_id = batch_output.replace(
"\n",
"")
1403 B2INFO(f
"Job ID of {job} recorded as: {job_id}")
1410 submission_cmd.append(script_path.as_posix())
1411 return submission_cmd
1416 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1418 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=
True)
1423 parent.result = cls.
PBSResult(parent,
None)
1427 Simple class to help monitor status of jobs submitted by `PBS` Backend.
1429 You pass in a `Job` object (or `SubJob`) and job id from a qsub command.
1430 When you call the `ready` method it runs bjobs to see whether or not the job has finished.
1434 backend_code_to_status = {
"R":
"running",
1436 "FINISHED":
"completed",
1447 Pass in the job object and the job id to allow the result to do monitoring and perform
1448 post processing of the job.
1456 Update the job's (or subjobs') status by calling qstat.
1458 B2DEBUG(29, f
"Calling {self.job}.result.update_status()")
1460 qstat_output = PBS.qstat()
1461 if self.
job.subjobs:
1462 for subjob
in self.
job.subjobs.values():
1463 subjob.result._update_result_status(qstat_output)
1470 qstat_output (dict): The JSON output of a previous call to qstat which we can reuse to find the
1471 status of this job. Obviously you should only be passing a JSON dict that contains the 'Job_Id' and
1472 'job_state' information, otherwise it is useless.
1480 B2DEBUG(29, f
"Checking of the exit code from file for {self.job}")
1483 except FileNotFoundError:
1486 B2ERROR(f
"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
1489 B2WARNING(f
"Exit code file for {self.job} missing, will wait longer.")
1492 backend_status =
"E"
1494 backend_status =
"C"
1498 except KeyError
as err:
1499 raise BackendError(f
"Unidentified backend status found for {self.job}: {backend_status}")
from err
1501 if new_job_status != self.
job.status:
1502 self.
job.status = new_job_status
1506 Get status from output
1508 for job_info
in output[
"JOBS"]:
1509 if job_info[
"Job_Id"] == self.
job_id:
1510 return job_info[
"job_state"]
1516 Checks the global number of jobs in PBS right now (submitted or running) for this user.
1517 Returns True if the number is lower that the limit, False if it is higher.
1520 njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1521 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1522 assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1523 So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
1524 and check again before submitting more.
1526 B2DEBUG(29,
"Calling PBS().can_submit()")
1527 job_info = self.
qstat(username=os.environ[
"USER"])
1528 total_jobs = job_info[
"NJOBS"]
1529 B2INFO(f
"Total jobs active in the PBS system is currently {total_jobs}")
1531 B2INFO(f
"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1534 B2INFO(
"There is enough space to submit more jobs.")
1538 def qstat(cls, username="", job_ids=None):
1540 Simplistic interface to the ``qstat`` command. Lets you request information about all jobs or ones matching the filter
1541 ['job_id'] or for the username. The result is a JSON dictionary containing come of the useful job attributes returned
1544 PBS is kind of annoying as depending on the configuration it can forget about jobs immediately. So the status of a
1545 finished job is VERY hard to get. There are other commands that are sometimes included that may do a better job.
1546 This one should work for Melbourne's cloud computing centre.
1549 username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
1550 will be in the output dictionary.
1551 job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
1552 the output of this function will be only information about this jobs. If this argument is not given, then all jobs
1553 matching the other filters will be returned.
1556 dict: JSON dictionary of the form (to save you parsing the XML that qstat returns).:
1558 .. code-block:: python
1569 B2DEBUG(29, f
"Calling PBS.qstat(username='{username}', job_id={job_ids})")
1572 job_ids = set(job_ids)
1573 cmd_list = [
"qstat",
"-x"]
1575 cmd =
" ".join(cmd_list)
1576 B2DEBUG(29, f
"Calling subprocess with command = '{cmd}'")
1577 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=
True, shell=
True)
1578 jobs_dict = {
"NJOBS": 0,
"JOBS": []}
1579 jobs_xml = ET.fromstring(output)
1582 if len(job_ids) == 1:
1583 job_elem = jobs_xml.find(f
"./Job[Job_Id='{list(job_ids)[0]}']")
1586 jobs_dict[
"NJOBS"] = 1
1591 for job
in jobs_xml.iterfind(
"Job"):
1592 job_owner = job.find(
"Job_Owner").text.split(
"@")[0]
1593 if username
and username != job_owner:
1595 job_id = job.find(
"Job_Id").text
1596 if job_ids
and job_id
not in job_ids:
1599 jobs_dict[
"NJOBS"] += 1
1601 if job_id
in job_ids:
1602 job_ids.remove(job_id)
1608 Creates a Job dictionary with various job information from the XML element returned by qstat.
1611 job_elem (xml.etree.ElementTree.Element): The XML Element of the Job
1614 dict: JSON serialisable dictionary of the Job information we are interested in.
1617 job_dict[
"Job_Id"] = job_elem.find(
"Job_Id").text
1618 job_dict[
"Job_Name"] = job_elem.find(
"Job_Name").text
1619 job_dict[
"Job_Owner"] = job_elem.find(
"Job_Owner").text
1620 job_dict[
"job_state"] = job_elem.find(
"job_state").text
1621 job_dict[
"queue"] = job_elem.find(
"queue").text
1627 Backend for submitting calibration processes to a qsub batch system.
1630 cmd_wkdir =
"#BSUB -cwd"
1632 cmd_stdout =
"#BSUB -o"
1634 cmd_stderr =
"#BSUB -e"
1636 cmd_queue =
"#BSUB -q"
1638 cmd_name =
"#BSUB -J"
1640 submission_cmds = [
"bsub",
"-env",
"\"none\"",
"<"]
1642 default_global_job_limit = 15000
1644 default_backend_args = {
"queue":
"s"}
1649 super().
__init__(backend_args=backend_args)
1653 Adds LSF BSUB directives for the job to a script.
1655 job_backend_args = {**self.
backend_args, **job.backend_args}
1656 batch_queue = job_backend_args[
"queue"]
1657 print(
"#!/bin/bash", file=batch_file)
1658 print(
"# --- Start LSF ---", file=batch_file)
1659 print(
" ".join([LSF.cmd_queue, batch_queue]), file=batch_file)
1660 print(
" ".join([LSF.cmd_name, job.name]), file=batch_file)
1661 print(
" ".join([LSF.cmd_wkdir, str(job.working_dir)]), file=batch_file)
1662 print(
" ".join([LSF.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1663 print(
" ".join([LSF.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1664 print(
"# --- End LSF ---", file=batch_file)
1670 submission_cmd.append(script_path.as_posix())
1671 submission_cmd =
" ".join(submission_cmd)
1672 return [submission_cmd]
1677 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1679 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=
True, shell=
True)
1684 Simple class to help monitor status of jobs submitted by LSF Backend.
1686 You pass in a `Job` object and job id from a bsub command.
1687 When you call the `ready` method it runs bjobs to see whether or not the job has finished.
1691 backend_code_to_status = {
"RUN":
"running",
1692 "DONE":
"completed",
1693 "FINISHED":
"completed",
1700 Pass in the job object and the job id to allow the result to do monitoring and perform
1701 post processing of the job.
1709 Update the job's (or subjobs') status by calling bjobs.
1711 B2DEBUG(29, f
"Calling {self.job.name}.result.update_status()")
1713 bjobs_output = LSF.bjobs(output_fields=[
"stat",
"id"])
1714 if self.
job.subjobs:
1715 for subjob
in self.
job.subjobs.values():
1716 subjob.result._update_result_status(bjobs_output)
1723 bjobs_output (dict): The JSON output of a previous call to bjobs which we can reuse to find the
1724 status of this job. Obviously you should only be passing a JSON dict that contains the 'stat' and
1725 'id' information, otherwise it is useless.
1733 bjobs_output = LSF.bjobs(output_fields=[
"stat",
"id"], job_id=str(self.
job_id))
1741 except FileNotFoundError:
1744 B2ERROR(f
"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
1747 B2WARNING(f
"Exit code file for {self.job} missing, will wait longer.")
1750 backend_status =
"EXIT"
1752 backend_status =
"FINISHED"
1755 except KeyError
as err:
1756 raise BackendError(f
"Unidentified backend status found for {self.job}: {backend_status}")
from err
1758 if new_job_status != self.
job.status:
1759 self.
job.status = new_job_status
1763 Get status from output
1765 if output[
"JOBS"]
and "ERROR" in output[
"JOBS"][0]:
1766 if output[
"JOBS"][0][
"ERROR"] == f
"Job <{self.job_id}> is not found":
1767 raise KeyError(f
"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1769 raise BackendError(f
"Unidentified Error during status check for {self.job}: {output}")
1771 for job_info
in output[
"JOBS"]:
1772 if job_info[
"JOBID"] == self.
job_id:
1773 return job_info[
"STAT"]
1775 raise KeyError(f
"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1779 parent.result = cls.
LSFResult(parent,
None)
1785 m = re.search(
r"Job <(\d+)>", str(batch_output))
1789 raise BackendError(f
"Failed to get the batch job ID of {job}. LSF output was:\n{batch_output}")
1791 B2INFO(f
"Job ID of {job} recorded as: {job_id}")
1796 Checks the global number of jobs in LSF right now (submitted or running) for this user.
1797 Returns True if the number is lower that the limit, False if it is higher.
1800 njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1801 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1802 assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1803 So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
1804 and check again before submitting more.
1806 B2DEBUG(29,
"Calling LSF().can_submit()")
1807 job_info = self.
bjobs(output_fields=[
"stat"])
1808 total_jobs = job_info[
"NJOBS"]
1809 B2INFO(f
"Total jobs active in the LSF system is currently {total_jobs}")
1811 B2INFO(f
"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1814 B2INFO(
"There is enough space to submit more jobs.")
1818 def bjobs(cls, output_fields=None, job_id="", username="", queue=""):
1820 Simplistic interface to the `bjobs` command. lets you request information about all jobs matching the filters
1821 'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by output of the ``-json`` bjobs option.
1824 output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
1825 job_id (str): String representation of the Job ID given by bsub during submission If this argument is given then
1826 the output of this function will be only information about this job. If this argument is not given, then all jobs
1827 matching the other filters will be returned.
1828 username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
1829 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
1830 receive job information from all known user jobs matching the other filters.
1831 queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.
1834 dict: JSON dictionary of the form:
1836 .. code-block:: python
1839 "NJOBS":<njobs returned by command>,
1842 <output field: value>, ...
1847 B2DEBUG(29, f
"Calling LSF.bjobs(output_fields={output_fields}, job_id={job_id}, username={username}, queue={queue})")
1849 if not output_fields:
1850 output_fields = [
"id"]
1852 field_list_cmd =
"\""
1853 field_list_cmd +=
" ".join(output_fields)
1854 field_list_cmd +=
"\""
1855 cmd_list = [
"bjobs",
"-o", field_list_cmd]
1858 cmd_list.extend([
"-q", queue])
1861 cmd_list.extend([
"-u", username])
1863 cmd_list.append(
"-json")
1866 cmd_list.append(job_id)
1868 cmd =
" ".join(cmd_list)
1869 B2DEBUG(29, f
"Calling subprocess with command = '{cmd}'")
1870 output = decode_json_string(subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=
True, shell=
True))
1871 output[
"NJOBS"] = output[
"JOBS"]
1872 output[
"JOBS"] = output[
"RECORDS"]
1873 del output[
"RECORDS"]
1874 del output[
"COMMAND"]
1878 def bqueues(cls, output_fields=None, queues=None):
1880 Simplistic interface to the `bqueues` command. lets you request information about all queues matching the filters.
1881 The result is the JSON dictionary returned by output of the ``-json`` bqueues option.
1884 output_fields (list[str]): A list of bqueues -o fields that you would like information about
1885 e.g. the default is ['queue_name' 'status' 'max' 'njobs' 'pend' 'run']
1886 queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
1887 By default you will receive information about all queues.
1890 dict: JSON dictionary of the form:
1892 .. code-block:: python
1895 "COMMAND":"bqueues",
1899 "QUEUE_NAME":"b2_beast",
1900 "STATUS":"Open:Active",
1908 B2DEBUG(29, f
"Calling LSF.bqueues(output_fields={output_fields}, queues={queues})")
1910 if not output_fields:
1911 output_fields = [
"queue_name",
"status",
"max",
"njobs",
"pend",
"run"]
1913 field_list_cmd =
"\""
1914 field_list_cmd +=
" ".join(output_fields)
1915 field_list_cmd +=
"\""
1916 cmd_list = [
"bqueues",
"-o", field_list_cmd]
1918 cmd_list.append(
"-json")
1921 cmd_list.extend(queues)
1923 cmd =
" ".join(cmd_list)
1924 B2DEBUG(29, f
"Calling subprocess with command = '{cmd}'")
1925 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=
True, shell=
True)
1926 return decode_json_string(output)
1931 Backend for submitting calibration processes to a HTCondor batch system.
1934 batch_submit_script =
"submit.sub"
1936 submission_cmds = [
"condor_submit",
"-terse"]
1938 default_global_job_limit = 10000
1940 default_backend_args = {
1941 "universe":
"vanilla",
1943 "request_memory":
"4 GB",
1948 default_class_ads = [
"GlobalJobId",
"JobStatus",
"Owner"]
1952 Fill HTCondor submission file.
1956 files_to_transfer = [i.as_posix()
for i
in job.working_dir.iterdir()]
1958 job_backend_args = {**self.
backend_args, **job.backend_args}
1960 with open(submit_file_path,
"w")
as submit_file:
1961 print(f
'executable = {self.get_submit_script_path(job)}', file=submit_file)
1962 print(f
'log = {Path(job.output_dir, "htcondor.log").as_posix()}', file=submit_file)
1963 print(f
'output = {Path(job.working_dir, _STDOUT_FILE).as_posix()}', file=submit_file)
1964 print(f
'error = {Path(job.working_dir, _STDERR_FILE).as_posix()}', file=submit_file)
1965 print(
'transfer_input_files = ',
','.join(files_to_transfer), file=submit_file)
1966 print(f
'universe = {job_backend_args["universe"]}', file=submit_file)
1967 print(f
'getenv = {job_backend_args["getenv"]}', file=submit_file)
1968 print(f
'request_memory = {job_backend_args["request_memory"]}', file=submit_file)
1969 print(
'should_transfer_files = Yes', file=submit_file)
1970 print(
'when_to_transfer_output = ON_EXIT', file=submit_file)
1972 for line
in job_backend_args[
"extra_lines"]:
1973 print(line, file=submit_file)
1974 print(
'queue', file=submit_file)
1978 For HTCondor leave empty as the directives are already included in the submit file.
1980 print(
'#!/bin/bash', file=batch_file)
1986 submission_cmd.append(script_path.as_posix())
1987 return submission_cmd
1991 Construct the Path object of the .sub file that we will use to describe the job.
1998 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
2000 job_dir = Path(cmd[-1]).parent.as_posix()
2007 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=
True, cwd=job_dir)
2009 except subprocess.CalledProcessError
as e:
2012 B2ERROR(f
"Error during condor_submit: {str(e)} occurred more than 3 times.")
2015 B2ERROR(f
"Error during condor_submit: {str(e)}, sleeping for {sleep_time} seconds.")
2017 return re.search(
r"(\d+\.\d+) - \d+\.\d+", sub_out).groups()[0]
2021 Simple class to help monitor status of jobs submitted by HTCondor Backend.
2023 You pass in a `Job` object and job id from a condor_submit command.
2024 When you call the `ready` method it runs condor_q and, if needed, ``condor_history``
2025 to see whether or not the job has finished.
2029 backend_code_to_status = {0:
"submitted",
2040 Pass in the job object and the job id to allow the result to do monitoring and perform
2041 post processing of the job.
2049 Update the job's (or subjobs') status by calling condor_q.
2051 B2DEBUG(29, f
"Calling {self.job.name}.result.update_status()")
2053 condor_q_output = HTCondor.condor_q()
2054 if self.
job.subjobs:
2055 for subjob
in self.
job.subjobs.values():
2056 subjob.result._update_result_status(condor_q_output)
2062 In order to be slightly more efficient we pass in a previous call to condor_q to see if it can work.
2063 If it is there we update the job's status. If not we are forced to start calling condor_q and, if needed,
2064 ``condor_history``, etc.
2067 condor_q_output (dict): The JSON output of a previous call to `HTCondor.condor_q` which we can reuse to find the
2068 status of this job if it was active when that command ran.
2070 B2DEBUG(29, f
"Calling {self.job}.result._update_result_status()")
2072 for job_record
in condor_q_output[
"JOBS"]:
2073 job_id = job_record[
"GlobalJobId"].split(
"#")[1]
2074 if job_id == self.
job_id:
2075 B2DEBUG(29, f
"Found {self.job_id} in condor_q_output.")
2076 jobs_info.append(job_record)
2082 except FileNotFoundError:
2085 B2ERROR(f
"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
2088 B2WARNING(f
"Exit code file for {self.job} missing, will wait longer.")
2091 jobs_info = [{
"JobStatus": 6,
"HoldReason":
None}]
2093 jobs_info = [{
"JobStatus": 4,
"HoldReason":
None}]
2097 jobs_info = HTCondor.condor_q(job_id=self.
job_id, class_ads=[
"JobStatus",
"HoldReason"])[
"JOBS"]
2103 jobs_info = HTCondor.condor_history(job_id=self.
job_id, class_ads=[
"JobStatus",
"HoldReason"])[
"JOBS"]
2105 hold_reason =
"No Reason Known"
2109 jobs_info = [{
"JobStatus": 6,
"HoldReason":
None}]
2111 job_info = jobs_info[0]
2112 backend_status = job_info[
"JobStatus"]
2114 if backend_status == 5:
2115 hold_reason = job_info.get(
"HoldReason",
None)
2116 B2WARNING(f
"{self.job} on hold because of {hold_reason}. Keep waiting.")
2120 except KeyError
as err:
2121 raise BackendError(f
"Unidentified backend status found for {self.job}: {backend_status}")
from err
2122 if new_job_status != self.
job.status:
2123 self.
job.status = new_job_status
2129 B2INFO(f
"Job ID of {job} recorded as: {job_id}")
2138 Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
2139 Returns True if the number is lower that the limit, False if it is higher.
2142 njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
2143 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
2144 assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
2145 So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
2146 and check again before submitting more.
2148 B2DEBUG(29,
"Calling HTCondor().can_submit()")
2150 total_jobs = jobs_info[
"NJOBS"]
2151 B2INFO(f
"Total jobs active in the HTCondor system is currently {total_jobs}")
2153 B2INFO(f
"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
2156 B2INFO(
"There is enough space to submit more jobs.")
2160 def condor_q(cls, class_ads=None, job_id="", username=""):
2162 Simplistic interface to the `condor_q` command. lets you request information about all jobs matching the filters
2163 'job_id' and 'username'. Note that setting job_id negates username so it is ignored.
2164 The result is the JSON dictionary returned by output of the ``-json`` condor_q option.
2167 class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
2168 By default we give {cls.default_class_ads}, increasing the amount of class_ads increase the time taken
2169 by the condor_q call.
2170 job_id (str): String representation of the Job ID given by condor_submit during submission.
2171 If this argument is given then the output of this function will be only information about this job.
2172 If this argument is not given, then all jobs matching the other filters will be returned.
2173 username (str): By default we return information about only the current user's jobs. By giving
2174 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2175 receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
2176 so it isn't recommended.
2179 dict: JSON dictionary of the form:
2181 .. code-block:: python
2184 "NJOBS":<number of records returned by command>,
2187 <ClassAd: value>, ...
2192 B2DEBUG(29, f
"Calling HTCondor.condor_q(class_ads={class_ads}, job_id={job_id}, username={username})")
2196 field_list_cmd =
",".join(class_ads)
2197 cmd_list = [
"condor_q",
"-json",
"-attributes", field_list_cmd]
2200 cmd_list.append(job_id)
2203 username = os.environ[
"USER"]
2205 if username ==
"all":
2206 cmd_list.append(
"-allusers")
2208 cmd_list.append(username)
2210 cmd =
" ".join(cmd_list)
2211 B2DEBUG(29, f
"Calling subprocess with command = '{cmd}'")
2214 records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=
True, shell=
True)
2215 except BaseException:
2219 records = decode_json_string(records)
2222 jobs_info = {
"JOBS": records}
2223 jobs_info[
"NJOBS"] = len(jobs_info[
"JOBS"])
2229 Simplistic interface to the ``condor_history`` command. lets you request information about all jobs matching the filters
2230 ``job_id`` and ``username``. Note that setting job_id negates username so it is ignored.
2231 The result is a JSON dictionary filled by output of the ``-json`` ``condor_history`` option.
2234 class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
2235 By default we give {cls.default_class_ads}, increasing the amount of class_ads increase the time taken
2236 by the condor_q call.
2237 job_id (str): String representation of the Job ID given by condor_submit during submission.
2238 If this argument is given then the output of this function will be only information about this job.
2239 If this argument is not given, then all jobs matching the other filters will be returned.
2240 username (str): By default we return information about only the current user's jobs. By giving
2241 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2242 receive job information from all known user jobs matching the other filters. This is limited to 10000 records
2243 and isn't recommended.
2246 dict: JSON dictionary of the form:
2248 .. code-block:: python
2251 "NJOBS":<number of records returned by command>,
2254 <ClassAd: value>, ...
2259 B2DEBUG(29, f
"Calling HTCondor.condor_history(class_ads={class_ads}, job_id={job_id}, username={username})")
2263 field_list_cmd =
",".join(class_ads)
2264 cmd_list = [
"condor_history",
"-json",
"-attributes", field_list_cmd]
2267 cmd_list.append(job_id)
2270 username = os.environ[
"USER"]
2272 if username !=
"all":
2273 cmd_list.append(username)
2275 cmd =
" ".join(cmd_list)
2276 B2DEBUG(29, f
"Calling subprocess with command = '{cmd}'")
2278 records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=
True, shell=
True)
2279 except BaseException:
2283 records = decode_json_string(records)
2287 jobs_info = {
"JOBS": records}
2288 jobs_info[
"NJOBS"] = len(jobs_info[
"JOBS"])
2294 Backend for submitting calibration processes to the grid.
2298class BackendError(Exception):
2300 Base exception class for Backend classes.
2306 Base exception class for Job objects.
2312 Base exception class for SubjobSplitter objects.
generator_function
Generator function that has not been 'primed'.
args
Positional argument tuple used to 'prime' the ArgumentsGenerator.generator_function.
kwargs
Keyword argument dictionary used to 'prime' the ArgumentsGenerator.generator_function.
__init__(self, generator_function, *args, **kwargs)
max_subjobs
If we try to create more than this many subjobs we throw an exception, if None then there is no maxim...
create_subjobs(self, job)
__init__(self, *, arguments_generator=None, max_subjobs=None)
_create_parent_job_result(cls, parent)
dict backend_args
The backend args that will be applied to jobs unless the job specifies them itself.
_add_wrapper_script_teardown(self, job, batch_file)
_add_wrapper_script_setup(self, job, batch_file)
dict default_backend_args
Default backend_args.
str submit_script
Default submission script name.
get_submit_script_path(self, job)
__init__(self, *, backend_args=None)
_add_setup(job, batch_file)
int default_global_job_limit
Default global limit on the total number of submitted/running jobs that the user can have.
_submit_to_batch(cls, cmd)
get_batch_submit_script_path(self, job)
int sleep_between_submission_checks
Seconds we wait before checking if we can submit a list of jobs.
list submission_cmds
Shell command to submit a script, should be implemented in the derived class.
_make_submit_file(self, job, submit_file_path)
int global_job_limit
The active job limit.
submit(self, job, check_can_submit=True, jobs_per_check=100)
can_submit(self, *args, **kwargs)
int default_sleep_between_submission_checks
Default time betweeon re-checking if the active jobs is below the global job limit.
_add_batch_directives(self, job, file)
__init__(self, *, backend_args=None)
_create_job_result(cls, job, batch_output)
_update_result_status(self, condor_q_output)
job_id
job id given by HTCondor
__init__(self, job, job_id)
dict backend_code_to_status
HTCondor statuses mapped to Job statuses.
_create_parent_job_result(cls, parent)
_submit_to_batch(cls, cmd)
get_batch_submit_script_path(self, job)
_create_cmd(self, script_path)
_add_batch_directives(self, job, batch_file)
can_submit(self, njobs=1)
_make_submit_file(self, job, submit_file_path)
_create_job_result(cls, job, job_id)
condor_q(cls, class_ads=None, job_id="", username="")
condor_history(cls, class_ads=None, job_id="", username="")
list default_class_ads
Default ClassAd attributes to return from commands like condor_q.
str batch_submit_script
HTCondor batch script (different to the wrapper script of Backend.submit_script)
status
Not a real attribute, it's a property.
_get_overall_status_from_subjobs(self)
check_input_data_files(self)
list input_sandbox_files
Files to be copied directly into the working directory (pathlib.Path).
list input_files
Input files to job (str), a list of these is copied to the working directory.
dict backend_args
Config dictionary for the backend to use when submitting the job.
__init__(self, name, job_dict=None)
working_dir
Working directory of the job (pathlib.Path).
dict statuses
Allowed Job status dictionary.
append_current_basf2_setup_cmds(self)
dump_to_json(self, file_path)
list args
The arguments that will be applied to the cmd (These are ignored by SubJobs as they have their own ar...
list setup_cmds
Bash commands to run before the main self.cmd (mainly used for batch system setup)
str _status
The actual status of the overall Job.
list cmd
Command and arguments as a list that will be run by the job on the backend.
result
The result object of this Job.
list output_patterns
Files that we produce during the job and want to be returned.
create_subjob(self, i, input_files=None, args=None)
dict subjobs
dict of subjobs assigned to this job
splitter
The SubjobSplitter used to create subjobs if necessary.
copy_input_sandbox_files_to_working_dir(self)
from_json(cls, file_path)
output_dir
Output directory (pathlib.Path), where we will download our output_files to.
job_id
job id given by LSF
_get_status_from_output(self, output)
_update_result_status(self, bjobs_output)
__init__(self, job, job_id)
dict backend_code_to_status
LSF statuses mapped to Job statuses.
_create_parent_job_result(cls, parent)
_submit_to_batch(cls, cmd)
_create_cmd(self, script_path)
_add_batch_directives(self, job, batch_file)
can_submit(self, njobs=1)
bqueues(cls, output_fields=None, queues=None)
__init__(self, *, backend_args=None)
_create_job_result(cls, job, batch_output)
bjobs(cls, output_fields=None, job_id="", username="", queue="")
__init__(self, job, result)
_update_result_status(self)
result
The underlying result from the backend.
_create_parent_job_result(cls, parent)
pool
The actual Pool object of this instance of the Backend.
_max_processes
Internal attribute of max_processes.
max_processes
The size of the multiprocessing process pool.
__init__(self, *, backend_args=None, max_processes=1)
run_job(name, working_dir, output_dir, script)
__init__(self, *, arguments_generator=None, max_files_per_subjob=1)
create_subjobs(self, job)
max_files_per_subjob
The maximum number of input files that will be used for each SubJob created.
__init__(self, *, arguments_generator=None, max_subjobs=1000)
max_subjobs
The maximum number of SubJob objects to be created, input files are split evenly between them.
create_subjobs(self, job)
job_id
job id given by PBS
_get_status_from_output(self, output)
__init__(self, job, job_id)
_update_result_status(self, qstat_output)
dict backend_code_to_status
PBS statuses mapped to Job statuses.
create_job_record_from_element(job_elem)
_create_parent_job_result(cls, parent)
_submit_to_batch(cls, cmd)
_create_cmd(self, script_path)
_add_batch_directives(self, job, batch_file)
can_submit(self, njobs=1)
qstat(cls, username="", job_ids=None)
__init__(self, *, backend_args=None)
_create_job_result(cls, job, batch_output)
job
Job object for result.
exit_code_file_initial_time
Time we started waiting for the exit code file to appear.
bool _is_ready
Quicker way to know if it's ready once it has already been found.
time_to_wait_for_exit_code_file
After our first attempt to view the exit code file once the job is 'finished', how long should we wai...
get_exit_code_from_file(self)
__init__(self, job, subjob_id, input_files=None)
parent
Job() instance of parent to this SubJob.
__getattr__(self, attribute)
assign_arguments(self, job)
__init__(self, *, arguments_generator=None)
arguments_generator
The ArgumentsGenerator used when creating subjobs.
create_subjobs(self, job)