from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING

import os
import re
import json
import time
import shutil
import subprocess
from math import ceil

from abc import ABC, abstractmethod

import xml.etree.ElementTree as ET

from pathlib import Path
from collections import deque
from itertools import count, takewhile

from datetime import datetime, timedelta

import multiprocessing as mp

from caf.utils import method_dispatch
from caf.utils import decode_json_string
from caf.utils import grouper
from caf.utils import parse_file_uri
__all__ = ["Job", "SubJob", "Backend", "Local", "Batch", "LSF", "PBS", "HTCondor", "get_input_data"]

#: Default name of the input data file passed to backend jobs
_input_data_file_path = Path("__BACKEND_INPUT_FILES__.json")

#: Default name of the file that the job's stdout is written to
_STDOUT_FILE = "stdout"

#: Default name of the file that the job's stderr is written to
_STDERR_FILE = "stderr"
def get_input_data():
    """
    Simple JSON load of the default input data file. Will contain a list of string file paths
    for use by the job process.
    """
    with open(_input_data_file_path, mode="r") as input_data_file:
        input_data = json.load(input_data_file)
    return input_data
def monitor_jobs(args, jobs):
    unfinished_jobs = jobs[:]
    failed_jobs = 0
    while unfinished_jobs:
        B2INFO("Updating statuses of unfinished jobs...")
        for j in unfinished_jobs:
            j.update_status()
        B2INFO("Checking if jobs are ready...")
        for j in unfinished_jobs[:]:
            if j.ready():
                if j.status == "failed":
                    B2ERROR(f"{j} is failed")
                    failed_jobs += 1
                else:
                    B2INFO(f"{j} is finished")
                unfinished_jobs.remove(j)
        if unfinished_jobs:
            B2INFO(f"Not all jobs done yet, waiting {args.heartbeat} seconds before re-checking...")
            time.sleep(args.heartbeat)
    if failed_jobs > 0:
        B2ERROR(f"{failed_jobs} jobs failed")
    else:
        B2INFO('All jobs finished successfully')
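

# Illustrative usage sketch (not part of the original module): submitting a job with the
# Local backend and polling it with monitor_jobs(). The command and heartbeat value are
# invented placeholders.
def _example_monitor_jobs():
    import argparse
    job = Job("example_job")
    job.cmd = ["echo", "hello"]
    backend = Local(max_processes=1)
    backend.submit(job)
    # monitor_jobs only needs an object with a 'heartbeat' attribute (seconds between checks)
    monitor_jobs(argparse.Namespace(heartbeat=10), [job])
    backend.join()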
class ArgumentsGenerator():
    def __init__(self, generator_function, *args, **kwargs):
        """
        Simple little class to hold a generator (uninitialised) and the necessary args/kwargs to
        initialise it. This lets us re-use a generator by setting it up again fresh. This is not
        optimal for expensive calculations, but it is nice for making large sequences of
        Job input arguments on the fly.

        Parameters:
            generator_function (py:function): A function (callable) that contains a ``yield`` statement. This generator
                should *not* be initialised i.e. you haven't called it with ``generator_function(*args, **kwargs)``
                yet. That will happen when accessing the `ArgumentsGenerator.generator` property.
            args (tuple): The positional arguments you want to send into the initialisation of the generator.
            kwargs (dict): The keyword arguments you want to send into the initialisation of the generator.
        """
        #: Generator function that has not been 'primed' yet
        self.generator_function = generator_function
        #: Positional argument tuple used to initialise the generator
        self.args = args
        #: Keyword argument dictionary used to initialise the generator
        self.kwargs = kwargs

    @property
    def generator(self):
        """
        Returns:
            generator: The initialised generator (using the args and kwargs for initialisation). It should be ready
                to have ``next``/``send`` called on it.
        """
        gen = self.generator_function(*self.args, **self.kwargs)
        gen.send(None)  # Prime it
        return gen
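

# Illustrative sketch (not part of the original module): a minimal generator function that
# could be wrapped by ArgumentsGenerator. The experiment number and run list are invented.
def _example_arguments_generator():
    def exp_run_arguments(experiment, runs):
        # The first yield is primed by the ArgumentsGenerator.generator property and then
        # receives the first SubJob sent in by a splitter.
        subjob = (yield None)
        for run in runs:
            subjob = (yield (experiment, run))

    return ArgumentsGenerator(exp_run_arguments, 7, runs=[1, 2, 3])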
def range_arguments(start=0, stop=None, step=1):
    """
    A simple example Arguments Generator function for use as a `ArgumentsGenerator.generator_function`.
    It will return increasing values using itertools.count. By default it is infinite and will not call `StopIteration`.
    The `SubJob` object is sent into this function with `send` but is not used.

    Parameters:
        start (int): The starting value that will be returned.
        stop (int): At this value the `StopIteration` will be thrown. If this is `None` then this generator will continue
            forever.
        step (int): The step size.
    """
    def should_stop(x):
        if stop is not None and x >= stop:
            return True
        return False
    # The first yield handles the initial send(None) used to prime the generator
    subjob = (yield None)
    for i in takewhile(lambda x: not should_stop(x), count(start, step)):
        args = (i,)
        B2DEBUG(29, f"{subjob} arguments will be {args}")
        subjob = (yield args)
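

# Illustrative sketch (not part of the original module): driving range_arguments by hand to
# show the send/yield protocol that the splitters use. Sending None stands in for the SubJob.
def _example_range_arguments():
    gen = range_arguments(start=0, stop=3, step=1)
    next(gen)  # Prime the generator; the first yield returns None
    return [gen.send(None) for _ in range(3)]  # e.g. [(0,), (1,), (2,)]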
class SubjobSplitter(ABC):
    """
    Abstract base class. This class handles the logic of creating subjobs for a `Job` object.
    The `create_subjobs` function should be implemented and used to construct
    the subjobs of the parent job object.

    Parameters:
        arguments_generator (ArgumentsGenerator): Used to construct the generator function that will yield the argument
            tuple for each `SubJob`. The splitter will iterate through the generator each time `create_subjobs` is
            called. The `SubJob` will be sent into the generator with ``send(subjob)`` so that the generator can decide what
            arguments to assign.
    """

    def __init__(self, *, arguments_generator=None):
        """
        Derived classes should call `super` to run this.
        """
        #: The `ArgumentsGenerator` used when creating subjobs.
        self.arguments_generator = arguments_generator

    @abstractmethod
    def create_subjobs(self, job):
        """
        Implement this method in derived classes to generate the `SubJob` objects.
        """

    def assign_arguments(self, job):
        """
        Use the `arguments_generator` (if one exists) to assign the argument tuples to the subjobs.
        """
        if self.arguments_generator:
            arg_gen = self.arguments_generator.generator
            args = tuple()
            for subjob in sorted(job.subjobs.values(), key=lambda sj: sj.id):
                try:
                    args = arg_gen.send(subjob)
                except StopIteration:
                    B2ERROR((f"StopIteration called when getting args for {subjob}, "
                             "setting all subsequent subjobs to have empty argument tuples."))
                    args = tuple()
                B2DEBUG(29, f"Arguments for {subjob}: {args}")
                subjob.args = args
            return

        B2INFO((f"No ArgumentsGenerator assigned to the {self} so subjobs of {job} "
                "won't automatically have arguments assigned."))

    def __str__(self):
        return f"{self.__class__.__name__}"
class MaxFilesSplitter(SubjobSplitter):

    def __init__(self, *, arguments_generator=None, max_files_per_subjob=1):
        """
        Parameters:
            max_files_per_subjob (int): The maximum number of input files used per `SubJob` created.
        """
        super().__init__(arguments_generator=arguments_generator)
        #: The maximum number of input files that will be used for each `SubJob` created.
        self.max_files_per_subjob = max_files_per_subjob

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates as many subjobs as required
        in order to prevent the number of input files per subjob going over the limit set by
        `MaxFilesSplitter.max_files_per_subjob`.
        """
        if not job.input_files:
            B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
            return

        for i, subjob_input_files in enumerate(grouper(self.max_files_per_subjob, job.input_files)):
            subjob = job.create_subjob(i, input_files=subjob_input_files)

        self.assign_arguments(job)
        B2INFO(f"{self} created {i+1} Subjobs for {job}")
class MaxSubjobsSplitter(SubjobSplitter):

    def __init__(self, *, arguments_generator=None, max_subjobs=1000):
        """
        Parameters:
            max_subjobs (int): The maximum number of subjobs that will be created.
        """
        super().__init__(arguments_generator=arguments_generator)
        #: The maximum number of `SubJob` objects to be created.
        self.max_subjobs = max_subjobs

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates as many subjobs as required
        by the number of input files up to the maximum set by `MaxSubjobsSplitter.max_subjobs`. If there are
        more input files than `max_subjobs` it instead groups files by the minimum number per subjob in order to
        respect the subjob limit e.g. If you have 11 input files and a maximum number of subjobs of 4, then it
        will create 4 subjobs, 3 of them with 3 input files, and one with 2 input files.
        """
        if not job.input_files:
            B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
            return

        # Since we will be popping from the left of the input file list, we construct a deque
        remaining_input_files = deque(job.input_files)
        # The initial number of available subjobs, will go down as we create them
        available_subjobs = self.max_subjobs
        subjob_i = 0
        while remaining_input_files:
            # How many files should we use for this subjob?
            num_input_files = ceil(len(remaining_input_files) / available_subjobs)
            subjob_input_files = []
            for i in range(num_input_files):
                subjob_input_files.append(remaining_input_files.popleft())
            subjob = job.create_subjob(subjob_i, input_files=subjob_input_files)
            subjob_i += 1
            available_subjobs -= 1

        self.assign_arguments(job)
        B2INFO(f"{self} created {subjob_i} Subjobs for {job}")
class ArgumentsSplitter(SubjobSplitter):
    """
    Creates SubJobs based on the given argument generator. The generator will be called until a `StopIteration` is issued.
    Be VERY careful to not accidentally give an infinite generator! Otherwise it will simply create SubJobs until you run out
    of memory. You can set the `ArgumentsSplitter.max_subjobs` parameter to try and prevent this and throw an exception.

    This splitter is useful for MC production jobs where you don't have any input files, but you want to control the exp/run
    numbers of subjobs. If you do have input files set for the parent `Job` objects, then the same input files will be
    assigned to every `SubJob`.

    Parameters:
        arguments_generator (ArgumentsGenerator): The standard ArgumentsGenerator that is used to assign arguments
            to each `SubJob`.
    """

    def __init__(self, *, arguments_generator=None, max_subjobs=None):
        super().__init__(arguments_generator=arguments_generator)
        #: If we try to create more than this many subjobs we throw an exception, `None` means no maximum.
        self.max_subjobs = max_subjobs

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates subjobs until the
        `SubjobSplitter.arguments_generator` finishes.

        If `ArgumentsSplitter.max_subjobs` is set, then it will throw an exception if more than this number of
        subjobs are created.
        """
        arg_gen = self.arguments_generator.generator
        for i in count():
            if self.max_subjobs and i >= self.max_subjobs:
                raise SplitterError(f"{self} tried to create more subjobs than the maximum (={self.max_subjobs}).")
            try:
                subjob = SubJob(job, i, job.input_files)
                args = arg_gen.send(subjob)
                B2INFO(f"Creating {job}.{subjob}")
                B2DEBUG(29, f"Arguments for {subjob}: {args}")
                subjob.args = args
                job.subjobs[i] = subjob
            except StopIteration:
                break
        B2INFO(f"{self} created {i+1} Subjobs for {job}")
class Job:
    """
    This generic Job object is used to tell a Backend what to do.
    This object basically holds necessary information about a process you want to submit to a `Backend`.
    It should *not* do anything that is backend specific, just hold the configuration for a job to be
    successfully submitted and monitored using a backend. The result attribute is where backend
    specific job monitoring goes.

    Parameters:
        name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF

    .. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.
    """

    #: Allowed Job status dictionary. The lowest level of a subjob is the overall status of the parent job.
    statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}

    #: Job statuses that correspond to the Job being finished (successfully or not)
    exit_statuses = ["failed", "completed"]

    def __init__(self, name, job_dict=None):
        #: Job object's name. Only descriptive, not necessarily unique.
        self.name = name
        #: The result object of this Job (set by backends upon submission)
        self.result = None
        #: dict of subjobs assigned to this job
        self.subjobs = {}
        #: Status of the job (private, see the `status` property)
        self._status = "init"
        if job_dict:
            self.working_dir = Path(job_dict["working_dir"])
            self.output_dir = Path(job_dict["output_dir"])
            self.cmd = job_dict["cmd"]
            self.args = job_dict["args"]
            for subjob_dict in job_dict["subjobs"]:
                self.create_subjob(subjob_dict["id"], input_files=subjob_dict["input_files"], args=subjob_dict["args"])

    def __repr__(self):
        """
        Representation of Job class (what happens when you print a Job() instance).
        """
        return f"Job({self.name})"
    def ready(self):
        """
        Returns whether or not the Job has finished. If the job has subjobs then it will return true when they are all finished.
        It will return False as soon as it hits the first failure. Meaning that you cannot guarantee that all subjobs will have
        their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.
        """
        if not self.result:
            B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")
            return False
        return self.result.ready()

    def update_status(self):
        """
        Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
        in the best way for the type of result object/backend.
        """
        if not self.result:
            B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
            return
        self.result.update_status()

    def create_subjob(self, i, input_files=None, args=None):
        """
        Creates a subjob Job object that references that parent Job.
        Returns the SubJob object at the end.
        """
        if i not in self.subjobs:
            B2INFO(f"Creating {self}.Subjob({i})")
            subjob = SubJob(self, i, input_files)
            if args:
                subjob.args = args
            self.subjobs[i] = subjob
            return subjob
        B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")
    @property
    def status(self):
        """
        Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
        subjob status in the hierarchy of statuses in `Job.statuses`.
        """
        if self.subjobs:
            return self._get_overall_status_from_subjobs()
        return self._status

    def _get_overall_status_from_subjobs(self):
        subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
        status_level = min([self.statuses[status] for status in subjob_statuses])
        for status, level in self.statuses.items():
            if level == status_level:
                return status

    @status.setter
    def status(self, status):
        """
        Sets the status of this Job.
        """
        if status == 'failed':
            B2ERROR(f"Setting {self.name} status to failed")
        else:
            B2INFO(f"Setting {self.name} status to {status}")
        self._status = status
    @input_sandbox_files.setter
    def input_sandbox_files(self, value):
        self._input_sandbox_files = value
    @property
    def max_subjobs(self):
        return self.splitter.max_subjobs

    @max_subjobs.setter
    def max_subjobs(self, value):
        self.splitter = MaxSubjobsSplitter(max_subjobs=value)
        B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")

    @property
    def max_files_per_subjob(self):
        return self.splitter.max_files_per_subjob

    @max_files_per_subjob.setter
    def max_files_per_subjob(self, value):
        self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
        B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
    def dump_to_json(self, file_path):
        """
        Dumps the Job object configuration to a JSON file so that it can be read in again later.

        Parameters:
            file_path(`Path`): The filepath we'll dump to
        """
        with open(file_path, mode="w") as job_file:
            json.dump(self.job_dict, job_file, indent=2)

    @classmethod
    def from_json(cls, file_path):
        with open(file_path, mode="r") as job_file:
            job_dict = json.load(job_file)
        return cls(job_dict["name"], job_dict=job_dict)
    @property
    def job_dict(self):
        """
        Returns:
            dict: A JSON serialisable representation of the `Job` and its `SubJob` objects. `Path` objects are converted to
            string via ``Path.as_posix()``.
        """
        job_dict = {}
        job_dict["name"] = self.name
        job_dict["working_dir"] = self.working_dir.as_posix()
        job_dict["output_dir"] = self.output_dir.as_posix()
        job_dict["cmd"] = self.cmd
        job_dict["args"] = self.args
        job_dict["subjobs"] = [sj.job_dict for sj in self.subjobs.values()]
        return job_dict
    def dump_input_data(self):
        """
        Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
        string file paths.
        """
        with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
            json.dump(self.input_files, input_data_file, indent=2)
    def copy_input_sandbox_files_to_working_dir(self):
        """
        Get all of the requested files for the input sandbox and copy them to the working directory.
        Files like the submit.sh or input_data.json are not part of this process.
        """
        for file_path in self.input_sandbox_files:
            if file_path.is_dir():
                shutil.copytree(file_path, Path(self.working_dir, file_path.name))
            else:
                shutil.copy(file_path, self.working_dir)
    def check_input_data_files(self):
        """
        Check the input files and make sure that there aren't any duplicates.
        Also check if the files actually exist if possible.
        """
        existing_input_files = []  # We use a list to keep the ordering of the requested files
        for file_path in self.input_files:
            file_uri = parse_file_uri(file_path)
            if file_uri.scheme == "file":
                p = Path(file_uri.path)
                if p.is_file():
                    if file_uri.geturl() not in existing_input_files:
                        existing_input_files.append(file_uri.geturl())
                    else:
                        B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
                else:
                    B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
            else:
                B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
                if file_path not in existing_input_files:
                    existing_input_files.append(file_path)
                else:
                    B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
        if not existing_input_files:
            B2WARNING(f"No valid input file paths found for {self}, but some were requested.")
        self.input_files = existing_input_files
    @property
    def full_command(self):
        """
        Returns:
            str: The full command that this job will run including any arguments.
        """
        all_components = self.cmd[:]
        all_components.extend(self.args)
        # Convert everything to strings in case arguments were generated as other types
        full_command = " ".join(map(str, all_components))
        B2DEBUG(29, f"Full command of {self} is '{full_command}'")
        return full_command
    def append_current_basf2_setup_cmds(self):
        """
        This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
        It should detect if you are using a local release or CVMFS and append the correct commands
        so that the job will have the same basf2 release environment. It should also detect
        if a local release is not compiled with the ``opt`` option.

        Note that this *doesn't mean that every environment variable is inherited* from the submitting
        process.
        """
        if "BELLE2_TOOLS" not in os.environ:
            raise BackendError("No BELLE2_TOOLS found in environment")
        if "BELLE2_RELEASE" in os.environ:
            self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
        elif 'BELLE2_LOCAL_DIR' in os.environ:
            self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
            self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
            self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
            self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
            self.setup_cmds.append("pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
            self.setup_cmds.append("source $BACKEND_B2SETUP")
            self.setup_cmds.append("b2code-option $BACKEND_BELLE2_OPTION")
class SubJob(Job):
    """
    This mini-class simply holds basic information about which subjob you are
    and a reference to the parent Job object to be able to access the main data there.
    Rather than replicating all of the parent job's configuration again.
    """

    def __init__(self, job, subjob_id, input_files=None):
        #: Id of Subjob
        self.id = subjob_id
        #: Parent Job object
        self.parent = job
        #: Input files specific to this subjob
        if not input_files:
            input_files = []
        self.input_files = input_files
        #: The result object of this SubJob (set by backends upon submission)
        self.result = None
        #: Arguments specific to this SubJob
        self.args = []
        #: Status of the subjob (private, see the `status` property)
        self._status = "init"

    @property
    def output_dir(self):
        """Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this."""
        return Path(self.parent.output_dir, str(self.id))

    @property
    def working_dir(self):
        """Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this."""
        return Path(self.parent.working_dir, str(self.id))

    @property
    def name(self):
        """Getter for name of SubJob. Accesses the parent Job name to infer this."""
        return "_".join((self.parent.name, str(self.id)))
    @property
    def status(self):
        """
        Returns the status of this SubJob.
        """
        return self._status

    @status.setter
    def status(self, status):
        """
        Sets the status of this Job.
        """
        if status == "failed":
            B2ERROR(f"Setting {self.name} status to failed")
        else:
            B2INFO(f"Setting {self.name} status to {status}")
        self._status = status
    @property
    def subjobs(self):
        """
        A subjob cannot have subjobs. Always return empty list.
        """
        return []

    @property
    def job_dict(self):
        """
        Returns:
            dict: A JSON serialisable representation of the `SubJob`. `Path` objects are converted to
            `string` via ``Path.as_posix()``. Since Subjobs inherit most of the parent job's config
            we only output the input files and arguments that are specific to this subjob and no other details.
        """
        job_dict = {}
        job_dict["id"] = self.id
        job_dict["input_files"] = self.input_files
        job_dict["args"] = self.args
        return job_dict

    def __getattr__(self, attribute):
        """
        Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
        unless otherwise specified.
        """
        return getattr(self.parent, attribute)

    def __repr__(self):
        return f"SubJob({self.name})"
class Backend(ABC):
    """
    Abstract base class for a valid backend.
    Classes derived from this will implement their own submission of basf2 jobs
    to whatever backend they describe.
    Some common methods/attributes go into this base class.

    For backend_args the priority from lowest to highest is:

    backend.default_backend_args -> backend.backend_args -> job.backend_args
    """
    #: Default submission script name
    submit_script = "submit.sh"
    #: Default exit code file name
    exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
    #: Default backend_args
    default_backend_args = {}

    def __init__(self, *, backend_args=None):
        if backend_args is None:
            backend_args = {}
        #: The backend args that will be applied to jobs unless the job specifies them itself
        self.backend_args = {**self.default_backend_args, **backend_args}
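
    # Illustrative sketch (not part of the original class): the backend_args priority described
    # above is a plain dict merge where later entries win. The values are invented.
    #
    #   LSF.default_backend_args == {"queue": "s"}
    #   backend = LSF(backend_args={"queue": "l"})   # backend.backend_args -> {"queue": "l"}
    #   job.backend_args = {"queue": "b2_beast"}     # the job's value wins when it is submitted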
    @abstractmethod
    def submit(self, job):
        """
        Base method for submitting collection jobs to the backend type. This MUST be
        implemented for a correctly written backend class deriving from Backend().
        """

    def _add_setup(self, job, batch_file):
        """
        Adds setup lines to the shell script file.
        """
        for line in job.setup_cmds:
            print(line, file=batch_file)

    def _add_wrapper_script_setup(self, job, batch_file):
        """
        Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
        `trap` statements for Ctrl-C situations.
        """
        start_wrapper = f"""# ---
# trap ctrl-c and call ctrl_c()
trap '(ctrl_c 130)' SIGINT
trap '(ctrl_c 143)' SIGTERM

function write_exit_code() {{
  echo "Writing $1 to exit status file"
  echo "$1" > {self.exit_code_file}
  exit $1
}}

function ctrl_c() {{
  trap '' SIGINT SIGTERM
  echo "** Trapped Ctrl-C **"
  echo "$1" > {self.exit_code_file}
  exit $1
}}"""
        print(start_wrapper, file=batch_file)

    def _add_wrapper_script_teardown(self, job, batch_file):
        """
        Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
        an exit code of the job cmd being written out to a file. Which means that we can know if the command was
        successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
        removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
        code file.
        """
        end_wrapper = """# ---
write_exit_code $?"""
        print(end_wrapper, file=batch_file)

    @classmethod
    @abstractmethod
    def _create_parent_job_result(cls, parent):
        """
        We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
        so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
        statuses and allows the use of ready().
        """
        raise NotImplementedError

    def get_submit_script_path(self, job):
        """
        Construct the Path object of the bash script file that we will submit. It will contain
        the actual job command, wrapper commands, setup commands, and any batch directives.
        """
        return Path(job.working_dir, self.submit_script)
class Result():
    """
    Base class for Result objects. A Result is created for each `Job` (or `Job.SubJob`) object
    submitted to a backend. It provides a way to query a job's status to find out if it's ready.
    """

    def __init__(self, job):
        """
        Pass in the job object to allow the result to access the job's properties and do post-processing.
        """
        #: Job object for this result
        self.job = job
        #: Quicker way to know if it's ready once it has already been found to be ready
        self._is_ready = False
        #: How long to wait for the exit code file to appear once the job seems to be finished
        self.time_to_wait_for_exit_code_file = timedelta(minutes=5)
        #: Time we started waiting for the exit code file to appear
        self.exit_code_file_initial_time = None

    def ready(self):
        """
        Returns whether or not this job result is known to be ready. Doesn't actually change the job status. Just changes
        the 'readiness' based on the known job status.
        """
        B2DEBUG(29, f"Calling {self.job}.result.ready()")
        if self._is_ready:
            return True
        elif self.job.status in self.job.exit_statuses:
            self._is_ready = True
            return True
        return False

    def update_status(self):
        """
        Update the job's (and subjobs') status so that `Result.ready` will return the up to date status. This call will have to
        actually look up the job's status from some database/exit code file.
        """
        raise NotImplementedError

    def get_exit_code_from_file(self):
        """
        Read the exit code file to discover the exit status of the job command. Useful fallback if the job is no longer
        known to the job database (batch system purged it for example). Since some backends may take time to download
        the output files of the job back to the working directory we use a time limit on how long to wait.
        """
        exit_code_path = Path(self.job.working_dir, Backend.exit_code_file)
        with open(exit_code_path, "r") as f:
            exit_code = int(f.read().strip())
            B2DEBUG(29, f"Exit code from file for {self.job} was {exit_code}")
            return exit_code
class Local(Backend):
    """
    Backend for local processes i.e. on the same machine but in a subprocess.

    Note that you should call the self.join() method to close the pool and wait for any
    running processes to finish before exiting the process. Once you've called join you will have to set up a new
    instance of this backend to create a new pool. If you don't call `Local.join` or don't create a join yourself
    somewhere, then the main python process might end before your pool is done.

    Parameters:
        max_processes (int): Integer that specifies the size of the process pool that spawns the subjobs, default=1.
            It's the maximum simultaneous subjobs.
            Try not to specify a large number or a number larger than the number of cores.
            It won't crash the program but it will slow down and negatively impact performance.
    """

    def __init__(self, *, backend_args=None, max_processes=1):
        super().__init__(backend_args=backend_args)
        #: The actual `Pool` object of this instance of the Backend
        self.pool = None
        #: The size of the multiprocessing process pool
        self.max_processes = max_processes
    class LocalResult(Result):
        """
        Result class to help monitor status of jobs submitted by Local backend.
        """

        def __init__(self, job, result):
            """
            Pass in the job object and the multiprocessing result to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: The underlying result from the multiprocessing pool
            self.result = result

        def _update_result_status(self):
            return_code = self.result.get()
            if return_code:
                self.job.status = "failed"
            else:
                self.job.status = "completed"

        def update_status(self):
            """
            Update the job's (or subjobs') status by calling the result object.
            """
            B2DEBUG(29, f"Calling {self.job}.result.update_status()")
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status()
            else:
                self._update_result_status()

    def join(self):
        """
        Closes and joins the Pool, letting you wait for all results currently
        still processing.
        """
        B2INFO("Joining Process Pool, waiting for results to finish...")
        self.pool.close()
        self.pool.join()
        B2INFO("Process Pool joined.")

    @property
    def max_processes(self):
        """
        Getter for max_processes
        """
        return self._max_processes

    @max_processes.setter
    def max_processes(self, value):
        """
        Setter for max_processes, we also check for a previous Pool(), wait for it to join
        and then create a new one with the new value of max_processes
        """
        self._max_processes = value
        if self.pool:
            B2INFO("New max_processes requested. But a pool already exists.")
            self.join()
        B2INFO(f"Starting up new Pool with {self.max_processes} processes")
        self.pool = mp.Pool(processes=self.max_processes)
    @method_dispatch
    def submit(self, job):
        raise NotImplementedError(("This is an abstract submit(job) method that shouldn't have been called. "
                                   "Did you submit a (Sub)Job?"))

    @submit.register(SubJob)
    def _(self, job):
        """
        Submission of a `SubJob` for the Local backend
        """
        job.output_dir.mkdir(parents=True, exist_ok=True)
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()
        script_path = self.get_submit_script_path(job)
        with open(script_path, mode="w") as batch_file:
            print("#!/bin/bash", file=batch_file)
            self._add_setup(job, batch_file)
            self._add_wrapper_script_setup(job, batch_file)
            print(job.full_command, file=batch_file)
            self._add_wrapper_script_teardown(job, batch_file)
        B2INFO(f"Submitting {job}")
        job.result = Local.LocalResult(job,
                                       self.pool.apply_async(self.run_job,
                                                             (job.name, job.working_dir, job.output_dir, script_path)))
        job.status = "submitted"
        B2INFO(f"{job} submitted")
    @submit.register(Job)
    def _(self, job):
        """
        Submission of a `Job` for the Local backend
        """
        job.output_dir.mkdir(parents=True, exist_ok=True)
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.check_input_data_files()

        if not job.splitter:
            job.copy_input_sandbox_files_to_working_dir()
            job.dump_input_data()
            script_path = self.get_submit_script_path(job)
            with open(script_path, mode="w") as batch_file:
                print("#!/bin/bash", file=batch_file)
                self._add_setup(job, batch_file)
                self._add_wrapper_script_setup(job, batch_file)
                print(job.full_command, file=batch_file)
                self._add_wrapper_script_teardown(job, batch_file)
            B2INFO(f"Submitting {job}")
            job.result = Local.LocalResult(job,
                                           self.pool.apply_async(self.run_job,
                                                                 (job.name, job.working_dir, job.output_dir, script_path)))
            job.status = "submitted"
            B2INFO(f"{job} submitted")
        else:
            # Create subjobs and submit them instead
            job.splitter.create_subjobs(job)
            self.submit(list(job.subjobs.values()))
    @submit.register(list)
    def _(self, jobs):
        """
        Submit method of Local() that takes a list of jobs instead of just one and submits each one.
        """
        for job in jobs:
            self.submit(job)
        B2INFO("All requested jobs submitted.")
    @staticmethod
    def run_job(name, working_dir, output_dir, script):
        """
        The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
        shell command in a subprocess and captures the stdout and stderr of the subprocess to files.
        """
        B2INFO(f"Starting Sub-process: {name}")
        from subprocess import Popen
        stdout_file_path = Path(working_dir, _STDOUT_FILE)
        stderr_file_path = Path(working_dir, _STDERR_FILE)
        B2INFO(f"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
        with open(stdout_file_path, mode="w", buffering=1) as f_out, \
                open(stderr_file_path, mode="w", buffering=1) as f_err:
            with Popen(["/bin/bash", script.as_posix()],
                       stdout=f_out,
                       stderr=f_err,
                       universal_newlines=True,
                       cwd=working_dir) as p:
                # Block until the subprocess finishes so that the return code is set
                p.wait()
        B2INFO(f"Subprocess {name} finished.")
        return p.returncode
class Batch(Backend):
    """
    Abstract Base backend for submitting to a local batch system. Batch system specific commands should be implemented
    in a derived class. Do not use this class directly!
    """
    #: Shell command to submit a script, should be implemented in the derived class
    submission_cmds = []
    default_global_job_limit = 1000
    default_sleep_between_submission_checks = 30

    def __init__(self, *, backend_args=None):
        """
        Init method for Batch Backend. Does some basic default setup.
        """
        super().__init__(backend_args=backend_args)
        self.global_job_limit = self.default_global_job_limit
        self.sleep_between_submission_checks = self.default_sleep_between_submission_checks
    def _add_batch_directives(self, job, file):
        """
        Should be implemented in a derived class to write a batch submission script to the job.working_dir.
        You should think about where the stdout/err should go, and set the queue name.
        """
        raise NotImplementedError(("Need to implement a _add_batch_directives(self, job, file) "
                                   f"method in {self.__class__.__name__} backend."))

    def _make_submit_file(self, job, submit_file_path):
        """
        Useful for the HTCondor backend where a submit is needed instead of batch
        directives pasted directly into the submission script. It should be overwritten
        if needed.
        """

    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """

    def can_submit(self, *args, **kwargs):
        """
        Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail.
        This is initially meant to make sure that we don't go over the global limits of jobs (submitted + running).

        Returns:
            bool: If the job submission can continue based on the current situation.
        """
        return True
    @method_dispatch
    def submit(self, job, check_can_submit=True, jobs_per_check=100):
        raise NotImplementedError(("This is an abstract submit(job) method that shouldn't have been called. "
                                   "Did you submit a (Sub)Job?"))

    @submit.register(SubJob)
    def _(self, job, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch backend for a `SubJob`. Should take `SubJob` object, create needed directories,
        create batch script, and send it off with the batch submission command.
        It should apply the correct options (default and user requested).

        Should set a Result object as an attribute of the job.
        """
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()
        script_path = self.get_submit_script_path(job)
        with open(script_path, mode="w") as batch_file:
            self._add_batch_directives(job, batch_file)
            self._add_setup(job, batch_file)
            self._add_wrapper_script_setup(job, batch_file)
            print(job.full_command, file=batch_file)
            self._add_wrapper_script_teardown(job, batch_file)
        os.chmod(script_path, 0o755)
        B2INFO(f"Submitting {job}")
        # Do the actual batch submission
        cmd = self._create_cmd(script_path)
        output = self._submit_to_batch(cmd)
        self._create_job_result(job, output)
        job.status = "submitted"
        B2INFO(f"{job} submitted")
    @submit.register(Job)
    def _(self, job, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch backend. Should take job object, create needed directories, create batch script,
        and send it off with the batch submission command, applying the correct options (default and user requested).

        Should set a Result object as an attribute of the job.
        """
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.check_input_data_files()
        job_backend_args = {**self.backend_args, **job.backend_args}

        if not job.splitter:
            job.copy_input_sandbox_files_to_working_dir()
            job.dump_input_data()
            script_path = self.get_submit_script_path(job)
            with open(script_path, mode="w") as batch_file:
                self._add_batch_directives(job, batch_file)
                self._add_setup(job, batch_file)
                self._add_wrapper_script_setup(job, batch_file)
                print(job.full_command, file=batch_file)
                self._add_wrapper_script_teardown(job, batch_file)
            os.chmod(script_path, 0o755)
            B2INFO(f"Submitting {job}")
            cmd = self._create_cmd(script_path)
            output = self._submit_to_batch(cmd)
            self._create_job_result(job, output)
            job.status = "submitted"
            B2INFO(f"{job} submitted")
        else:
            # Create subjobs and submit them instead
            job.splitter.create_subjobs(job)
            self.submit(list(job.subjobs.values()))
            self._create_parent_job_result(job)
    @submit.register(list)
    def _(self, jobs, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.
        """
        B2INFO(f"Submitting a list of {len(jobs)} jobs to a Batch backend")
        if jobs_per_check > self.global_job_limit:
            B2INFO((f"jobs_per_check (={jobs_per_check}) but this is higher than the global job "
                    f"limit for this backend (={self.global_job_limit}). Will instead use the "
                    "value of the global job limit."))
            jobs_per_check = self.global_job_limit

        # Group the jobs so that we don't submit more than jobs_per_check before re-checking the limit
        for jobs_to_submit in grouper(jobs_per_check, jobs):
            # Wait until we are allowed to submit
            while not self.can_submit(njobs=len(jobs_to_submit)):
                B2INFO("Too many jobs are currently in the batch system globally. Waiting until submission can continue...")
                time.sleep(self.sleep_between_submission_checks)
            B2INFO(f"Submitting the next {len(jobs_to_submit)} jobs...")
            for job in jobs_to_submit:
                self.submit(job, check_can_submit, jobs_per_check)
        B2INFO(f"All {len(jobs)} requested jobs submitted")
    def get_batch_submit_script_path(self, job):
        """
        Construct the Path object of the script file that we will submit using the batch command.
        For most batch backends this is the same script as the bash script we submit.
        But for some they require a separate submission file that describes the job.
        To implement that you can implement this function in the Backend class.
        """
        return Path(job.working_dir, self.submit_script)
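

# Illustrative usage sketch (not part of the original module): submitting through a batch
# backend. LSF is used here but PBS/HTCondor follow the same pattern; the queue name and
# steering file are invented.
def _example_batch_submission():
    job = Job("example_batch")
    job.cmd = ["basf2", "example_steering.py"]
    job.backend_args = {"queue": "s"}  # overrides the backend's default queue for this job
    backend = LSF()
    backend.submit(job)
    return job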
class PBS(Batch):
    """
    Backend for submitting calibration processes to a qsub batch system.
    """
    #: Working directory directive
    cmd_wkdir = "#PBS -d"
    #: stdout file directive
    cmd_stdout = "#PBS -o"
    #: stderr file directive
    cmd_stderr = "#PBS -e"
    #: Queue directive
    cmd_queue = "#PBS -q"
    #: Job name directive
    cmd_name = "#PBS -N"
    #: Batch submission command
    submission_cmds = ["qsub"]
    default_global_job_limit = 5000
    default_backend_args = {"queue": "short"}

    def __init__(self, *, backend_args=None):
        super().__init__(backend_args=backend_args)
    def _add_batch_directives(self, job, batch_file):
        """
        Add PBS directives to submitted script.
        """
        job_backend_args = {**self.backend_args, **job.backend_args}
        batch_queue = job_backend_args["queue"]
        print("#!/bin/bash", file=batch_file)
        print("# --- Start PBS ---", file=batch_file)
        print(" ".join([PBS.cmd_queue, batch_queue]), file=batch_file)
        print(" ".join([PBS.cmd_name, job.name]), file=batch_file)
        print(" ".join([PBS.cmd_wkdir, job.working_dir.as_posix()]), file=batch_file)
        print(" ".join([PBS.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
        print(" ".join([PBS.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
        print("# --- End PBS ---", file=batch_file)
    @classmethod
    def _create_job_result(cls, job, batch_output):
        job_id = batch_output.replace("\n", "")
        B2INFO(f"Job ID of {job} recorded as: {job_id}")
        job.result = cls.PBSResult(job, job_id)

    def _create_cmd(self, script_path):
        submission_cmd = self.submission_cmds[:]
        submission_cmd.append(script_path.as_posix())
        return submission_cmd

    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """
        sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True)
        return sub_out

    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.PBSResult(parent, None)
    class PBSResult(Result):
        """
        Simple class to help monitor status of jobs submitted by `PBS` Backend.

        You pass in a `Job` object (or `SubJob`) and job id from a qsub command.
        When you call the `ready` method it runs bjobs to see whether or not the job has finished.
        """

        #: PBS statuses mapped to Job statuses
        backend_code_to_status = {"R": "running",
                                  "C": "completed",
                                  "FINISHED": "completed",
                                  "E": "failed",
                                  "Q": "submitted",
                                  }

        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: job id given by PBS
            self.job_id = job_id

        def update_status(self):
            """
            Update the job's (or subjobs') status by calling qstat.
            """
            B2DEBUG(29, f"Calling {self.job}.result.update_status()")
            # Get all jobs info and reuse it for each status update to minimise time spent querying
            qstat_output = PBS.qstat()
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(qstat_output)
            else:
                self._update_result_status(qstat_output)

        def _update_result_status(self, qstat_output):
            """
            Parameters:
                qstat_output (dict): The JSON output of a previous call to qstat which we can re-use to find the
                    status of this job. Obviously you should only be passing a JSON dict that contains the 'Job_Id' and
                    'job_state' information, otherwise it is useless.
            """
            try:
                backend_status = self._get_status_from_output(qstat_output)
            except KeyError:
                # Job no longer shows up in qstat, fall back to the exit code file
                B2DEBUG(29, f"Checking of the exit code from file for {self.job}")
                try:
                    exit_code = self.get_exit_code_from_file()
                except FileNotFoundError:
                    if not self.exit_code_file_initial_time:
                        self.exit_code_file_initial_time = datetime.now()
                    elapsed_time = datetime.now() - self.exit_code_file_initial_time
                    if elapsed_time > self.time_to_wait_for_exit_code_file:
                        B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                        exit_code = 1
                    else:
                        B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                        return
                if exit_code:
                    backend_status = "E"
                else:
                    backend_status = "C"

            try:
                new_job_status = self.backend_code_to_status[backend_status]
            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err

            if new_job_status != self.job.status:
                self.job.status = new_job_status

        def _get_status_from_output(self, output):
            for job_info in output["JOBS"]:
                if job_info["Job_Id"] == self.job_id:
                    return job_info["job_state"]
            raise KeyError
    def can_submit(self, njobs=1):
        """
        Checks the global number of jobs in PBS right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.
        """
        B2DEBUG(29, "Calling PBS().can_submit()")
        job_info = self.qstat(username=os.environ["USER"])
        total_jobs = job_info["NJOBS"]
        B2INFO(f"Total jobs active in the PBS system is currently {total_jobs}")
        if (total_jobs + njobs) > self.global_job_limit:
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
            return False
        else:
            B2INFO("There is enough space to submit more jobs.")
            return True
    @classmethod
    def qstat(cls, username="", job_ids=None):
        """
        Simplistic interface to the ``qstat`` command. Lets you request information about all jobs or ones matching the filter
        ['job_id'] or for the username. The result is a JSON dictionary containing some of the useful job attributes returned
        by qstat.

        PBS is kind of annoying as depending on the configuration it can forget about jobs immediately. So the status of a
        finished job is VERY hard to get. There are other commands that are sometimes included that may do a better job.
        This one should work for Melbourne's cloud computing centre.

        Parameters:
            username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
                will be in the output dictionary.
            job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
                the output of this function will be only information about these jobs. If this argument is not given, then all jobs
                matching the other filters will be returned.

        Returns:
            dict: JSON dictionary of the form (to save you parsing the XML that qstat returns).:

            .. code-block:: python

              {
                "NJOBS": <number of records returned>,
                "JOBS": [
                          {
                            "Job_Id": str, "Job_Name": str, "Job_Owner": str, "job_state": str, "queue": str
                          }, ...
                        ]
              }
        """
        B2DEBUG(29, f"Calling PBS.qstat(username='{username}', job_id={job_ids})")
        if not job_ids:
            job_ids = []
        job_ids = set(job_ids)
        cmd_list = ["qstat", "-x"]
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        jobs_dict = {"NJOBS": 0, "JOBS": []}
        jobs_xml = ET.fromstring(output)

        # For a single job_id we can be more efficient with an XPATH lookup
        if len(job_ids) == 1:
            job_elem = jobs_xml.find(f"./Job[Job_Id='{list(job_ids)[0]}']")
            if job_elem is not None:
                jobs_dict["JOBS"].append(cls.create_job_record_from_element(job_elem))
                jobs_dict["NJOBS"] = 1
            return jobs_dict

        # The username stored by PBS is <username>@host, so we loop rather than use XPATH
        for job in jobs_xml.iterfind("Job"):
            job_owner = job.find("Job_Owner").text.split("@")[0]
            if username and username != job_owner:
                continue
            job_id = job.find("Job_Id").text
            if job_ids and job_id not in job_ids:
                continue
            jobs_dict["JOBS"].append(cls.create_job_record_from_element(job))
            jobs_dict["NJOBS"] += 1
            # Remove it so that we don't keep checking for it
            if job_id in job_ids:
                job_ids.remove(job_id)
        return jobs_dict
    @staticmethod
    def create_job_record_from_element(job_elem):
        """
        Creates a Job dictionary with various job information from the XML element returned by qstat.

        Parameters:
            job_elem (xml.etree.ElementTree.Element): The XML Element of the Job

        Returns:
            dict: JSON serialisable dictionary of the Job information we are interested in.
        """
        job_dict = {}
        job_dict["Job_Id"] = job_elem.find("Job_Id").text
        job_dict["Job_Name"] = job_elem.find("Job_Name").text
        job_dict["Job_Owner"] = job_elem.find("Job_Owner").text
        job_dict["job_state"] = job_elem.find("job_state").text
        job_dict["queue"] = job_elem.find("queue").text
        return job_dict
class LSF(Batch):
    """
    Backend for submitting calibration processes to a bsub batch system.
    """
    #: Working directory directive
    cmd_wkdir = "#BSUB -cwd"
    #: stdout file directive
    cmd_stdout = "#BSUB -o"
    #: stderr file directive
    cmd_stderr = "#BSUB -e"
    #: Queue directive
    cmd_queue = "#BSUB -q"
    #: Job name directive
    cmd_name = "#BSUB -J"
    #: Batch submission command
    submission_cmds = ["bsub", "-env", "\"none\"", "<"]
    default_global_job_limit = 15000
    default_backend_args = {"queue": "s"}

    def __init__(self, *, backend_args=None):
        super().__init__(backend_args=backend_args)
    def _add_batch_directives(self, job, batch_file):
        """
        Adds LSF BSUB directives for the job to a script.
        """
        job_backend_args = {**self.backend_args, **job.backend_args}
        batch_queue = job_backend_args["queue"]
        print("#!/bin/bash", file=batch_file)
        print("# --- Start LSF ---", file=batch_file)
        print(" ".join([LSF.cmd_queue, batch_queue]), file=batch_file)
        print(" ".join([LSF.cmd_name, job.name]), file=batch_file)
        print(" ".join([LSF.cmd_wkdir, str(job.working_dir)]), file=batch_file)
        print(" ".join([LSF.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
        print(" ".join([LSF.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
        print("# --- End LSF ---", file=batch_file)
    def _create_cmd(self, script_path):
        submission_cmd = self.submission_cmds[:]
        submission_cmd.append(script_path.as_posix())
        submission_cmd = " ".join(submission_cmd)
        return [submission_cmd]

    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """
        sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        return sub_out
    class LSFResult(Result):
        """
        Simple class to help monitor status of jobs submitted by LSF Backend.

        You pass in a `Job` object and job id from a bsub command.
        When you call the `ready` method it runs bjobs to see whether or not the job has finished.
        """

        #: LSF statuses mapped to Job statuses
        backend_code_to_status = {"RUN": "running",
                                  "DONE": "completed",
                                  "FINISHED": "completed",
                                  "EXIT": "failed",
                                  }

        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: job id given by LSF
            self.job_id = job_id

        def update_status(self):
            """
            Update the job's (or subjobs') status by calling bjobs.
            """
            B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
            # Get all jobs info and reuse it for each status update to minimise time spent querying
            bjobs_output = LSF.bjobs(output_fields=["stat", "id"])
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(bjobs_output)
            else:
                self._update_result_status(bjobs_output)
        def _update_result_status(self, bjobs_output):
            """
            Parameters:
                bjobs_output (dict): The JSON output of a previous call to bjobs which we can re-use to find the
                    status of this job. Obviously you should only be passing a JSON dict that contains the 'stat' and
                    'id' information, otherwise it is useless.
            """
            try:
                backend_status = self._get_status_from_output(bjobs_output)
            except KeyError:
                # The job wasn't in the passed-in bjobs output, so ask for it directly
                try:
                    bjobs_output = LSF.bjobs(output_fields=["stat", "id"], job_id=str(self.job_id))
                    backend_status = self._get_status_from_output(bjobs_output)
                except KeyError:
                    # Probably an old, finished job, so fall back to the exit code file
                    try:
                        exit_code = self.get_exit_code_from_file()
                    except FileNotFoundError:
                        if not self.exit_code_file_initial_time:
                            self.exit_code_file_initial_time = datetime.now()
                        elapsed_time = datetime.now() - self.exit_code_file_initial_time
                        if elapsed_time > self.time_to_wait_for_exit_code_file:
                            B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                            exit_code = 1
                        else:
                            B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                            return
                    if exit_code:
                        backend_status = "EXIT"
                    else:
                        backend_status = "FINISHED"

            try:
                new_job_status = self.backend_code_to_status[backend_status]
            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err

            if new_job_status != self.job.status:
                self.job.status = new_job_status
        def _get_status_from_output(self, output):
            if output["JOBS"] and "ERROR" in output["JOBS"][0]:
                if output["JOBS"][0]["ERROR"] == f"Job <{self.job_id}> is not found":
                    raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
                else:
                    raise BackendError(f"Unidentified Error during status check for {self.job}: {output}")
            for job_info in output["JOBS"]:
                if job_info["JOBID"] == self.job_id:
                    return job_info["STAT"]
            raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.LSFResult(parent, None)

    @classmethod
    def _create_job_result(cls, job, batch_output):
        m = re.search(r"Job <(\d+)>", str(batch_output))
        if m:
            job_id = m.group(1)
        else:
            raise BackendError(f"Failed to get the batch job ID of {job}. LSF output was:\n{batch_output}")
        B2INFO(f"Job ID of {job} recorded as: {job_id}")
        job.result = cls.LSFResult(job, job_id)
    def can_submit(self, njobs=1):
        """
        Checks the global number of jobs in LSF right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.
        """
        B2DEBUG(29, "Calling LSF().can_submit()")
        job_info = self.bjobs(output_fields=["stat"])
        total_jobs = job_info["NJOBS"]
        B2INFO(f"Total jobs active in the LSF system is currently {total_jobs}")
        if (total_jobs + njobs) > self.global_job_limit:
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
            return False
        else:
            B2INFO("There is enough space to submit more jobs.")
            return True
    @classmethod
    def bjobs(cls, output_fields=None, job_id="", username="", queue=""):
        """
        Simplistic interface to the `bjobs` command. lets you request information about all jobs matching the filters
        'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by output of the ``-json`` bjobs option.

        Parameters:
            output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
            job_id (str): String representation of the Job ID given by bsub during submission. If this argument is given then
                the output of this function will be only information about this job. If this argument is not given, then all jobs
                matching the other filters will be returned.
            username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters.
            queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.

        Returns:
            dict: JSON dictionary of the form:

            .. code-block:: python

              {
                "NJOBS": <njobs returned by command>,
                "JOBS": [
                          {
                            <output field>: <value>, ...
                          }, ...
                        ]
              }
        """
        B2DEBUG(29, f"Calling LSF.bjobs(output_fields={output_fields}, job_id={job_id}, username={username}, queue={queue})")
        # Set up the default output fields if not given
        if not output_fields:
            output_fields = ["id"]
        # Output fields should be quoted and space separated
        field_list_cmd = "\""
        field_list_cmd += " ".join(output_fields)
        field_list_cmd += "\""
        cmd_list = ["bjobs", "-o", field_list_cmd]
        # If the queue name is set then we add it to the command options
        if queue:
            cmd_list.extend(["-q", queue])
        # If the username is set then we add it to the command options
        if username:
            cmd_list.extend(["-u", username])
        # Can now add the json option before the final positional job id argument
        cmd_list.append("-json")
        if job_id:
            cmd_list.append(job_id)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = decode_json_string(subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True))
        output["NJOBS"] = output["JOBS"]
        output["JOBS"] = output["RECORDS"]
        del output["RECORDS"]
        del output["COMMAND"]
        return output
    @classmethod
    def bqueues(cls, output_fields=None, queues=None):
        """
        Simplistic interface to the `bqueues` command. lets you request information about all queues matching the filters.
        The result is the JSON dictionary returned by output of the ``-json`` bqueues option.

        Parameters:
            output_fields (list[str]): A list of bqueues -o fields that you would like information about
                e.g. the default is ['queue_name' 'status' 'max' 'njobs' 'pend' 'run']
            queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
                By default you will receive information about all queues.

        Returns:
            dict: JSON dictionary of the form:

            .. code-block:: python

              {
                "COMMAND": "bqueues",
                "QUEUES": <number of queues>,
                "RECORDS": [
                             {
                               "QUEUE_NAME": "b2_beast",
                               "STATUS": "Open:Active",
                               ...
                             }, ...
                           ]
              }
        """
        B2DEBUG(29, f"Calling LSF.bqueues(output_fields={output_fields}, queues={queues})")
        # Set up the default output fields if not given
        if not output_fields:
            output_fields = ["queue_name", "status", "max", "njobs", "pend", "run"]
        # Output fields should be quoted and space separated
        field_list_cmd = "\""
        field_list_cmd += " ".join(output_fields)
        field_list_cmd += "\""
        cmd_list = ["bqueues", "-o", field_list_cmd]
        cmd_list.append("-json")
        # If queue names are requested then we add them to the end of the command
        if queues:
            cmd_list.extend(queues)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        return decode_json_string(output)
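

# Illustrative usage sketch (not part of the original module): querying LSF for the current
# user's jobs and for queue occupancy. The queue name is invented.
def _example_lsf_queries():
    my_jobs = LSF.bjobs(output_fields=["stat", "id", "queue"])
    queue_info = LSF.bqueues(output_fields=["queue_name", "njobs", "pend", "run"], queues=["s"])
    return my_jobs["NJOBS"], queue_info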
class HTCondor(Batch):
    """
    Backend for submitting calibration processes to a HTCondor batch system.
    """
    #: HTCondor batch script (different to the wrapper script of `Backend.submit_script`)
    batch_submit_script = "submit.sub"
    #: Batch submission command
    submission_cmds = ["condor_submit", "-terse"]
    default_global_job_limit = 10000
    #: Default backend args for HTCondor
    default_backend_args = {
        "universe": "vanilla",
        "request_memory": "4 GB",
    }
    #: Default ClassAd attributes to return from commands like condor_q
    default_class_ads = ["GlobalJobId", "JobStatus", "Owner"]
    def _make_submit_file(self, job, submit_file_path):
        """
        Fill HTCondor submission file.
        """
        # Find all files/directories in the working directory to transfer to the worker node
        files_to_transfer = [i.as_posix() for i in job.working_dir.iterdir()]
        job_backend_args = {**self.backend_args, **job.backend_args}
        with open(submit_file_path, "w") as submit_file:
            print(f'executable = {self.get_submit_script_path(job)}', file=submit_file)
            print(f'log = {Path(job.output_dir, "htcondor.log").as_posix()}', file=submit_file)
            print(f'output = {Path(job.working_dir, _STDOUT_FILE).as_posix()}', file=submit_file)
            print(f'error = {Path(job.working_dir, _STDERR_FILE).as_posix()}', file=submit_file)
            print('transfer_input_files = ', ','.join(files_to_transfer), file=submit_file)
            print(f'universe = {job_backend_args["universe"]}', file=submit_file)
            print(f'getenv = {job_backend_args["getenv"]}', file=submit_file)
            print(f'request_memory = {job_backend_args["request_memory"]}', file=submit_file)
            print('should_transfer_files = Yes', file=submit_file)
            print('when_to_transfer_output = ON_EXIT', file=submit_file)
            # Any extra lines the user requested via the backend args
            for line in job_backend_args["extra_lines"]:
                print(line, file=submit_file)
            print('queue', file=submit_file)
    def _add_batch_directives(self, job, batch_file):
        """
        For HTCondor leave empty as the directives are already included in the submit file.
        """
        print('#!/bin/bash', file=batch_file)

    def _create_cmd(self, script_path):
        submission_cmd = self.submission_cmds[:]
        submission_cmd.append(script_path.as_posix())
        return submission_cmd
    def get_batch_submit_script_path(self, job):
        """
        Construct the Path object of the .sub file that we will use to describe the job.
        """
        return Path(job.working_dir, self.batch_submit_script)

    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """
        job_dir = Path(cmd[-1]).parent.as_posix()
        # Submission can fail sporadically, so retry a few times before giving up
        for retry in range(3):
            try:
                sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, cwd=job_dir)
                break
            except subprocess.CalledProcessError as e:
                if retry == 2:
                    B2ERROR(f"Error during condor_submit: {str(e)} occurred more than 3 times.")
                    raise e
                sleep_time = 30  # arbitrary backoff value; the original value is not preserved here
                B2ERROR(f"Error during condor_submit: {str(e)}, sleeping for {sleep_time} seconds.")
                time.sleep(sleep_time)
        return sub_out.split()[0]
    class HTCondorResult(Result):
        """
        Simple class to help monitor status of jobs submitted by HTCondor Backend.

        You pass in a `Job` object and job id from a condor_submit command.
        When you call the `ready` method it runs condor_q and, if needed, ``condor_history``
        to see whether or not the job has finished.
        """

        #: HTCondor job status codes mapped to Job statuses
        backend_code_to_status = {0: "submitted",
                                  1: "submitted",
                                  2: "running",
                                  3: "failed",
                                  4: "completed",
                                  5: "submitted",
                                  6: "failed",
                                  }

        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: job id given by HTCondor
            self.job_id = job_id

        def update_status(self):
            """
            Update the job's (or subjobs') status by calling condor_q.
            """
            B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
            condor_q_output = HTCondor.condor_q()
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(condor_q_output)
            else:
                self._update_result_status(condor_q_output)
        def _update_result_status(self, condor_q_output):
            """
            In order to be slightly more efficient we pass in a previous call to condor_q to see if it can work.
            If it is there we update the job's status. If not we are forced to start calling condor_q and, if needed,
            ``condor_history``, etc.

            Parameters:
                condor_q_output (dict): The JSON output of a previous call to `HTCondor.condor_q` which we can re-use to find the
                    status of this job if it was active when that command ran.
            """
            B2DEBUG(29, f"Calling {self.job}.result._update_result_status()")
            jobs_info = []
            for job_record in condor_q_output["JOBS"]:
                job_id = job_record["GlobalJobId"].split("#")[1]
                if job_id == self.job_id:
                    B2DEBUG(29, f"Found {self.job_id} in condor_q_output.")
                    jobs_info.append(job_record)

            # Not in the passed-in condor_q output, so first look for the exit code file
            if not jobs_info:
                try:
                    exit_code = self.get_exit_code_from_file()
                except FileNotFoundError:
                    if not self.exit_code_file_initial_time:
                        self.exit_code_file_initial_time = datetime.now()
                    elapsed_time = datetime.now() - self.exit_code_file_initial_time
                    if elapsed_time > self.time_to_wait_for_exit_code_file:
                        B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                        exit_code = 1
                    else:
                        B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                        return
                if exit_code:
                    jobs_info = [{"JobStatus": 6, "HoldReason": None}]  # Treat as failed
                else:
                    jobs_info = [{"JobStatus": 4, "HoldReason": None}]  # Treat as completed

            # Still nothing, so ask condor_q about this job directly
            if not jobs_info:
                jobs_info = HTCondor.condor_q(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]

            # If still no info, the job may have left the queue; try condor_history
            if not jobs_info:
                jobs_info = HTCondor.condor_history(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]

            hold_reason = "No Reason Known"

            # If we still know nothing, treat the job as failed
            if not jobs_info:
                jobs_info = [{"JobStatus": 6, "HoldReason": None}]

            job_info = jobs_info[0]
            backend_status = job_info["JobStatus"]
            # If the job is held (JobStatus == 5) then report why and keep waiting
            if backend_status == 5:
                hold_reason = job_info.get("HoldReason", None)
                B2WARNING(f"{self.job} on hold because of {hold_reason}. Keep waiting.")
            try:
                new_job_status = self.backend_code_to_status[backend_status]
            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
            if new_job_status != self.job.status:
                self.job.status = new_job_status
        B2INFO(f"Job ID of {job} recorded as: {job_id}")
    def can_submit(self, njobs=1):
        """
        Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.
        """
        B2DEBUG(29, "Calling HTCondor().can_submit()")
        jobs_info = self.condor_q()
        total_jobs = jobs_info["NJOBS"]
        B2INFO(f"Total jobs active in the HTCondor system is currently {total_jobs}")
        if (total_jobs + njobs) > self.global_job_limit:
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
            return False
        else:
            B2INFO("There is enough space to submit more jobs.")
            return True
    @classmethod
    def condor_q(cls, class_ads=None, job_id="", username=""):
        """
        Simplistic interface to the `condor_q` command. lets you request information about all jobs matching the filters
        'job_id' and 'username'. Note that setting job_id negates username so it is ignored.
        The result is the JSON dictionary returned by output of the ``-json`` condor_q option.

        Parameters:
            class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
                By default we give {cls.default_class_ads}, increasing the amount of class_ads increases the time taken
                by the condor_q call.
            job_id (str): String representation of the Job ID given by condor_submit during submission.
                If this argument is given then the output of this function will be only information about this job.
                If this argument is not given, then all jobs matching the other filters will be returned.
            username (str): By default we return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
                so it isn't recommended.

        Returns:
            dict: JSON dictionary of the form:

            .. code-block:: python

              {
                "NJOBS": <number of records returned by command>,
                "JOBS": [
                          {
                            <ClassAd>: <value>, ...
                          }, ...
                        ]
              }
        """
        B2DEBUG(29, f"Calling HTCondor.condor_q(class_ads={class_ads}, job_id={job_id}, username={username})")
        if not class_ads:
            class_ads = cls.default_class_ads
        # Output fields should be comma separated
        field_list_cmd = ",".join(class_ads)
        cmd_list = ["condor_q", "-json", "-attributes", field_list_cmd]
        # If job_id is set then we ignore the username and just ask for this job
        if job_id:
            cmd_list.append(job_id)
        else:
            if not username:
                username = os.environ["USER"]
            if username == "all":
                cmd_list.append("-allusers")
            else:
                cmd_list.append(username)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        # condor_q can return a non-zero exit code when there are no jobs, so be permissive here
        try:
            records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        except BaseException:
            records = None

        if records:
            records = decode_json_string(records)
        else:
            records = []
        jobs_info = {"JOBS": records}
        jobs_info["NJOBS"] = len(jobs_info["JOBS"])
        return jobs_info
    @classmethod
    def condor_history(cls, class_ads=None, job_id="", username=""):
        """
        Simplistic interface to the ``condor_history`` command. lets you request information about all jobs matching the filters
        ``job_id`` and ``username``. Note that setting job_id negates username so it is ignored.
        The result is a JSON dictionary filled by output of the ``-json`` ``condor_history`` option.

        Parameters:
            class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
                By default we give {cls.default_class_ads}, increasing the amount of class_ads increases the time taken
                by the condor_q call.
            job_id (str): String representation of the Job ID given by condor_submit during submission.
                If this argument is given then the output of this function will be only information about this job.
                If this argument is not given, then all jobs matching the other filters will be returned.
            username (str): By default we return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters. This is limited to 10000 records
                and isn't recommended.

        Returns:
            dict: JSON dictionary of the form:

            .. code-block:: python

              {
                "NJOBS": <number of records returned by command>,
                "JOBS": [
                          {
                            <ClassAd>: <value>, ...
                          }, ...
                        ]
              }
        """
        B2DEBUG(29, f"Calling HTCondor.condor_history(class_ads={class_ads}, job_id={job_id}, username={username})")
        if not class_ads:
            class_ads = cls.default_class_ads
        # Output fields should be comma separated
        field_list_cmd = ",".join(class_ads)
        cmd_list = ["condor_history", "-json", "-attributes", field_list_cmd]
        # If job_id is set then we ignore the username and just ask for this job
        if job_id:
            cmd_list.append(job_id)
        else:
            if not username:
                username = os.environ["USER"]
            if username != "all":
                cmd_list.append(username)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        try:
            records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        except BaseException:
            records = None

        if records:
            records = decode_json_string(records)
        else:
            records = []
        jobs_info = {"JOBS": records}
        jobs_info["NJOBS"] = len(jobs_info["JOBS"])
        return jobs_info
class DIRAC(Backend):
    """
    Backend for submitting calibration processes to the grid.
    """


class BackendError(Exception):
    """
    Base exception class for Backend classes.
    """


class JobError(Exception):
    """
    Base exception class for Job objects.
    """


class SplitterError(Exception):
    """
    Base exception class for this module.
    """