from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING

import os
import re
import json
import time
import shutil
import subprocess
from math import ceil
from abc import ABC, abstractmethod
import xml.etree.ElementTree as ET
from pathlib import Path
from collections import deque
from itertools import count, takewhile
from datetime import datetime, timedelta
import multiprocessing as mp

from caf.utils import method_dispatch
from caf.utils import decode_json_string
from caf.utils import grouper
from caf.utils import parse_file_uri
__all__ = ["Job", "SubJob", "Backend", "Local", "Batch", "LSF", "PBS", "HTCondor", "get_input_data"]
_input_data_file_path = Path("__BACKEND_INPUT_FILES__.json")

_STDOUT_FILE = "stdout"

_STDERR_FILE = "stderr"
def get_input_data():
    """
    Simple JSON load of the default input data file. Will contain a list of string file paths
    for use by the job process.
    """
    with open(_input_data_file_path, mode="r") as input_data_file:
        input_data = json.load(input_data_file)
    return input_data
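# A minimal sketch (illustrative, not part of the original module) of how `get_input_data` pairs
# with `Job.dump_input_data`: the backend dumps the job's input file list to
# __BACKEND_INPUT_FILES__.json in the working directory and the running job process reads it back.
def _example_read_input_data():
    input_files = get_input_data()  # list of file path strings for this (sub)job
    for path in input_files:
        print("Would process input file:", path)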
def monitor_jobs(args, jobs):
    """
    Monitor a list of submitted jobs until they are all finished, printing status updates as we go.
    """
    unfinished_jobs = jobs[:]
    failed_jobs = 0
    while unfinished_jobs:
        B2INFO("Updating statuses of unfinished jobs...")
        for j in unfinished_jobs:
            j.update_status()
        B2INFO("Checking if jobs are ready...")
        for j in unfinished_jobs[:]:
            if j.ready():
                if j.status == "failed":
                    B2ERROR(f"{j} is failed")
                    failed_jobs += 1
                else:
                    B2INFO(f"{j} is finished")
                unfinished_jobs.remove(j)
        if unfinished_jobs:
            B2INFO(f"Not all jobs done yet, waiting {args.heartbeat} seconds before re-checking...")
            time.sleep(args.heartbeat)
    if failed_jobs > 0:
        B2ERROR(f"{failed_jobs} jobs failed")
    else:
        B2INFO('All jobs finished successfully')
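# A minimal usage sketch (assumption: `args` only needs a `heartbeat` attribute in seconds,
# as used above; everything else is illustrative).
def _example_monitor(jobs):
    from argparse import Namespace
    monitor_jobs(Namespace(heartbeat=30), jobs)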
class ArgumentsGenerator():
    def __init__(self, generator_function, *args, **kwargs):
        """
        Simple little class to hold a generator (uninitialised) and the necessary args/kwargs to
        initialise it. This lets us re-use a generator by setting it up again fresh. This is not
        optimal for expensive calculations, but it is nice for making large sequences of
        Job input arguments on the fly.

        Parameters:
            generator_function (py:function): A function (callable) that contains a ``yield`` statement. This generator
                should *not* be initialised i.e. you haven't called it with ``generator_function(*args, **kwargs)``
                yet. That will happen when accessing the `ArgumentsGenerator.generator` property.
            args (tuple): The positional arguments you want to send into the initialisation of the generator.
            kwargs (dict): The keyword arguments you want to send into the initialisation of the generator.
        """
        #: Generator function that has not been 'primed'.
        self.generator_function = generator_function
        #: Positional argument tuple used to 'prime' the generator function.
        self.args = args
        #: Keyword argument dictionary used to 'prime' the generator function.
        self.kwargs = kwargs

    @property
    def generator(self):
        """
        Returns:
            generator: The initialised generator (using the args and kwargs for initialisation). It should be ready
            to have ``next``/``send`` called on it.
        """
        gen = self.generator_function(*self.args, **self.kwargs)
        gen.send(None)  # Prime it so it is ready to receive the first send(subjob)
        return gen
def range_arguments(start=0, stop=None, step=1):
    """
    A simple example Arguments Generator function for use as an `ArgumentsGenerator.generator_function`.
    It will return increasing values using itertools.count. By default it is infinite and will not call `StopIteration`.
    The `SubJob` object is sent into this function with `send` but is not used.

    Parameters:
        start (int): The starting value that will be returned.
        stop (int): At this value the `StopIteration` will be thrown. If this is `None` then this generator will continue
            forever.
        step (int): The step size.
    """
    def should_stop(x):
        return stop is not None and x >= stop

    # The first yield is used only for priming the generator via send(None)
    subjob = (yield None)
    for i in takewhile(lambda x: not should_stop(x), count(start, step)):
        args = (i,)
        B2DEBUG(29, f"{subjob} arguments will be {args}")
        subjob = (yield args)
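# A minimal usage sketch (illustrative): wrap `range_arguments` in an `ArgumentsGenerator`
# so a splitter can pull one argument tuple per SubJob via send().
def _example_arguments_generator():
    arg_gen = ArgumentsGenerator(range_arguments, start=3, stop=6, step=1)
    gen = arg_gen.generator        # fresh, primed generator
    print(gen.send("subjob_0"))    # -> (3,)
    print(gen.send("subjob_1"))    # -> (4,)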
class SubjobSplitter(ABC):
    """
    Abstract base class. This class handles the logic of creating subjobs for a `Job` object.
    The `create_subjobs` function should be implemented and used to construct
    the subjobs of the parent job object.

    Parameters:
        arguments_generator (ArgumentsGenerator): Used to construct the generator function that will yield the argument
            tuple for each `SubJob`. The splitter will iterate through the generator each time `create_subjobs` is
            called. The `SubJob` will be sent into the generator with ``send(subjob)`` so that the generator can decide what
            arguments to return.
    """

    def __init__(self, *, arguments_generator=None):
        """
        Derived classes should call `super` to run this.
        """
        #: The `ArgumentsGenerator` used when creating subjobs.
        self.arguments_generator = arguments_generator

    @abstractmethod
    def create_subjobs(self, job):
        """
        Implement this method in derived classes to generate the `SubJob` objects.
        """

    def assign_arguments(self, job):
        """
        Use the `arguments_generator` (if one exists) to assign the argument tuples to the
        subjobs.
        """
        if self.arguments_generator:
            arg_gen = self.arguments_generator.generator
            generating = True
            for subjob in sorted(job.subjobs.values(), key=lambda sj: sj.id):
                if generating:
                    try:
                        args = arg_gen.send(subjob)
                    except StopIteration:
                        B2ERROR((f"StopIteration called when getting args for {subjob}, "
                                 "setting all subsequent subjobs to have empty argument tuples."))
                        generating = False
                        args = tuple()
                else:
                    args = tuple()
                B2DEBUG(29, f"Arguments for {subjob}: {args}")
                subjob.args = args
            return

        B2INFO((f"No ArgumentsGenerator assigned to the {self} so subjobs of {job} "
                "won't automatically have arguments assigned."))

    def __repr__(self):
        return f"{self.__class__.__name__}"
class MaxFilesSplitter(SubjobSplitter):
    def __init__(self, *, arguments_generator=None, max_files_per_subjob=1):
        """
        Parameters:
            max_files_per_subjob (int): The maximum number of input files used per `SubJob` created.
        """
        super().__init__(arguments_generator=arguments_generator)
        #: The maximum number of input files per `SubJob`.
        self.max_files_per_subjob = max_files_per_subjob

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates as many subjobs as required
        in order to prevent the number of input files per subjob going over the limit set by
        `MaxFilesSplitter.max_files_per_subjob`.
        """
        if not job.input_files:
            B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
            return

        for i, subjob_input_files in enumerate(grouper(self.max_files_per_subjob, job.input_files)):
            job.create_subjob(i, input_files=subjob_input_files)

        self.assign_arguments(job)
        B2INFO(f"{self} created {i+1} Subjobs for {job}")
class MaxSubjobsSplitter(SubjobSplitter):
    def __init__(self, *, arguments_generator=None, max_subjobs=1000):
        """
        Parameters:
            max_subjobs (int): The maximum number of subjobs that will be created.
        """
        super().__init__(arguments_generator=arguments_generator)
        #: The maximum number of `SubJob` objects to create when splitting.
        self.max_subjobs = max_subjobs

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates as many subjobs as required
        by the number of input files up to the maximum set by `MaxSubjobsSplitter.max_subjobs`. If there are
        more input files than `max_subjobs` it instead groups files by the minimum number per subjob in order to
        respect the subjob limit e.g. If you have 11 input files and a maximum number of subjobs of 4, then it
        will create 4 subjobs, 3 of them with 3 input files, and one with 2 input files.
        """
        if not job.input_files:
            B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
            return

        # Shuffle the remaining input files into subjobs, reducing the available subjob count each time
        remaining_input_files = deque(job.input_files)
        available_subjobs = self.max_subjobs
        subjob_i = 0
        while remaining_input_files:
            # How many input files should this subjob take to still respect the limit?
            num_input_files = ceil(len(remaining_input_files) / available_subjobs)
            subjob_input_files = []
            for i in range(num_input_files):
                subjob_input_files.append(remaining_input_files.popleft())
            job.create_subjob(subjob_i, input_files=subjob_input_files)
            subjob_i += 1
            available_subjobs -= 1

        self.assign_arguments(job)
        B2INFO(f"{self} created {subjob_i} Subjobs for {job}")
class ArgumentsSplitter(SubjobSplitter):
    """
    Creates SubJobs based on the given argument generator. The generator will be called until a `StopIteration` is issued.
    Be VERY careful to not accidentally give an infinite generator! Otherwise it will simply create SubJobs until you run out
    of memory. You can set the `ArgumentsSplitter.max_subjobs` parameter to try and prevent this and throw an exception.

    This splitter is useful for MC production jobs where you don't have any input files, but you want to control the exp/run
    numbers of subjobs. If you do have input files set for the parent `Job` objects, then the same input files will be
    assigned to every `SubJob`.

    Parameters:
        arguments_generator (ArgumentsGenerator): The standard ArgumentsGenerator that is used to assign arguments
        max_subjobs (int): If we try to create more than this many subjobs we throw an exception. If None, there is
            no maximum.
    """

    def __init__(self, *, arguments_generator=None, max_subjobs=None):
        super().__init__(arguments_generator=arguments_generator)
        #: If we try to create more subjobs than this, throw an exception.
        self.max_subjobs = max_subjobs

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates subjobs until the
        `SubjobSplitter.arguments_generator` finishes.

        If `ArgumentsSplitter.max_subjobs` is set, then it will throw an exception if more than this number of
        subjobs are created.
        """
        arg_gen = self.arguments_generator.generator
        for i in count():
            try:
                if self.max_subjobs is not None and i >= self.max_subjobs:
                    raise SplitterError(f"{self} tried to create more subjobs than the maximum (={self.max_subjobs}).")
                subjob = SubJob(job, i, job.input_files)
                args = arg_gen.send(subjob)
                B2INFO(f"Creating {job}.{subjob}")
                B2DEBUG(29, f"Arguments for {subjob}: {args}")
                subjob.args = args
                job.subjobs[i] = subjob
            except StopIteration:
                break
        B2INFO(f"{self} created {i+1} Subjobs for {job}")
class Job:
    """
    This generic Job object is used to tell a Backend what to do.
    This object basically holds necessary information about a process you want to submit to a `Backend`.
    It should *not* do anything that is backend specific, just hold the configuration for a job to be
    successfully submitted and monitored using a backend. The result attribute is where backend
    specific job monitoring goes.

    Parameters:
        name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF.

    .. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.
    """

    #: Allowed Job status dictionary. The key is the status name and the value is its level. The lowest level
    #: of all subjobs is taken as the overall status of the parent job.
    statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}

    #: Job statuses that correspond to the Job being finished (successfully or not)
    exit_statuses = ["failed", "completed"]

    def __init__(self, name, job_dict=None):
        #: Job object's name. Only descriptive, not necessarily unique.
        self.name = name
        #: The `SubjobSplitter` used to create subjobs if necessary
        self.splitter = None
        #: dict of subjobs assigned to this job
        self.subjobs = {}
        #: The result object of this Job. Set by the backend during submission.
        self.result = None
        #: The actual status of the overall Job
        self._status = "init"
        if not job_dict:
            #: Files to be copied into the working directory of the job
            self.input_sandbox_files = []
            #: Input files used by the job, a list of these is dumped to JSON for the job process
            self.input_files = []
            #: Command to run as a list of strings
            self.cmd = []
            #: The arguments that will be applied to the cmd
            self.args = []
            #: Bash commands to run before the main self.cmd (mainly used for batch system setup)
            self.setup_cmds = []
            #: Config dictionary for the backend to use when submitting the job
            self.backend_args = {}
        else:
            # Restore the configuration of a previously dumped Job (see `Job.job_dict`)
            self.cmd = job_dict["cmd"]
            self.args = job_dict["args"]
            self.setup_cmds = job_dict["setup_cmds"]
            self.backend_args = job_dict["backend_args"]
            for subjob_dict in job_dict["subjobs"]:
                self.create_subjob(subjob_dict["id"],
                                   input_files=subjob_dict["input_files"],
                                   args=subjob_dict["args"])
    def __repr__(self):
        """
        Representation of Job class (what happens when you print a Job() instance).
        """
        return f"Job({self.name})"
    def ready(self):
        """
        Returns whether or not the Job has finished. If the job has subjobs then it will return true when they are all finished.
        It will return False as soon as it hits the first failure. Meaning that you cannot guarantee that all subjobs will have
        their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.
        """
        if not self.result:
            B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")
            return False
        return self.result.ready()

    def update_status(self):
        """
        Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
        in the best way for the type of result object/backend.
        """
        if not self.result:
            B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
            return
        self.result.update_status()
    def create_subjob(self, i, input_files=None, args=None):
        """
        Creates a subjob Job object that references that parent Job.
        Returns the SubJob object at the end.
        """
        if i not in self.subjobs:
            B2INFO(f"Creating {self}.Subjob({i})")
            subjob = SubJob(self, i, input_files)
            if args:
                subjob.args = args
            self.subjobs[i] = subjob
            return subjob
        else:
            B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")
    @property
    def status(self):
        """
        Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
        subjob status in the hierarchy of statuses in `Job.statuses`.
        """
        if self.subjobs:
            job_status = self._get_overall_status_from_subjobs()
            if job_status != self._status:
                self.status = job_status
        return self._status

    def _get_overall_status_from_subjobs(self):
        subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
        status_level = min([self.statuses[status] for status in subjob_statuses])
        for status, level in self.statuses.items():
            if level == status_level:
                return status
    @status.setter
    def status(self, status):
        """
        Sets the status of this Job.
        """
        # Print an error only when a job fails
        if status == "failed":
            B2ERROR(f"Setting {self.name} status to failed")
        else:
            B2INFO(f"Setting {self.name} status to {status}")
        self._status = status
    @output_dir.setter
    def output_dir(self, value):
        self._output_dir = Path(value).absolute()

    @input_sandbox_files.setter
    def input_sandbox_files(self, value):
        self._input_sandbox_files = value
    @property
    def max_subjobs(self):
        return self.splitter.max_subjobs

    @max_subjobs.setter
    def max_subjobs(self, value):
        self.splitter = MaxSubjobsSplitter(max_subjobs=value)
        B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")

    @property
    def max_files_per_subjob(self):
        return self.splitter.max_files_per_subjob

    @max_files_per_subjob.setter
    def max_files_per_subjob(self, value):
        self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
        B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
    def dump_to_json(self, file_path):
        """
        Dumps the Job object configuration to a JSON file so that it can be read in again later.

        Parameters:
            file_path (`basf2.Path`): The filepath we'll dump to.
        """
        with open(file_path, mode="w") as job_file:
            json.dump(self.job_dict, job_file, indent=2)

    @classmethod
    def from_json(cls, file_path):
        with open(file_path, mode="r") as job_file:
            job_dict = json.load(job_file)
        return cls(job_dict["name"], job_dict=job_dict)
    @property
    def job_dict(self):
        """
        Returns:
            dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
            `Path <basf2.Path>` objects are converted to string via ``Path.as_posix()``.
        """
        job_dict = {}
        job_dict["name"] = self.name
        job_dict["cmd"] = self.cmd
        job_dict["args"] = self.args
        job_dict["setup_cmds"] = self.setup_cmds
        job_dict["backend_args"] = self.backend_args
        job_dict["subjobs"] = [sj.job_dict for sj in self.subjobs.values()]
        return job_dict
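# A minimal usage sketch (illustrative, the command and file names are placeholders): configure
# a Job and round-trip it through JSON via `dump_to_json`/`from_json`.
def _example_job_json_roundtrip():
    job = Job("example_job")
    job.cmd = ["basf2", "steering.py"]
    job.dump_to_json("example_job.json")
    same_job = Job.from_json("example_job.json")
    print(same_job)  # -> Job(example_job)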
    def dump_input_data(self):
        """
        Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
        string file paths.
        """
        with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
            json.dump(self.input_files, input_data_file, indent=2)

    def copy_input_sandbox_files_to_working_dir(self):
        """
        Get all of the requested files for the input sandbox and copy them to the working directory.
        Files like the submit.sh or input_data.json are not part of this process.
        """
        for file_path in self.input_sandbox_files:
            if file_path.is_dir():
                shutil.copytree(file_path, Path(self.working_dir, file_path.name))
            else:
                shutil.copy(file_path, self.working_dir)
    def check_input_data_files(self):
        """
        Check the input files and make sure that there aren't any duplicates.
        Also check if the files actually exist if possible.
        """
        existing_input_files = []  # Use a list rather than a set to preserve ordering
        for file_path in self.input_files:
            file_uri = parse_file_uri(file_path)
            if file_uri.scheme == "file":
                p = Path(file_uri.path)
                if p.is_file():
                    if file_uri.geturl() not in existing_input_files:
                        existing_input_files.append(file_uri.geturl())
                    else:
                        B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
                else:
                    B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
            else:
                B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
                if file_path not in existing_input_files:
                    existing_input_files.append(file_path)
                else:
                    B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
        if not existing_input_files:
            B2WARNING(f"No valid input file paths found for {self.name}, but some were requested.")
        # Problem files are dropped from the final list
        self.input_files = existing_input_files
    @property
    def full_command(self):
        """
        Returns:
            str: The full command that this job will run including any arguments.
        """
        all_components = self.cmd[:]
        all_components.extend(self.args)
        # Convert everything to strings in case arguments were generated as other types
        full_command = " ".join(map(str, all_components))
        B2DEBUG(29, f"Full command of {self} is '{full_command}'")
        return full_command
    def append_current_basf2_setup_cmds(self):
        """
        This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
        It should detect if you are using a local release or CVMFS and append the correct commands
        so that the job will have the same basf2 release environment. It should also detect
        if a local release is not compiled with the ``opt`` option.

        Note that this *doesn't mean that every environment variable is inherited* from the submitting
        process.
        """
        if "BELLE2_TOOLS" not in os.environ:
            raise BackendError("No BELLE2_TOOLS found in environment")
        if "BELLE2_CONFIG_DIR" in os.environ:
            self.setup_cmds.append(f"""if [ -z "${{BELLE2_CONFIG_DIR}}" ]; then
  export BELLE2_CONFIG_DIR={os.environ['BELLE2_CONFIG_DIR']}
fi""")
        if "VO_BELLE2_SW_DIR" in os.environ:
            self.setup_cmds.append(f"""if [ -z "${{VO_BELLE2_SW_DIR}}" ]; then
  export VO_BELLE2_SW_DIR={os.environ['VO_BELLE2_SW_DIR']}
fi""")

        if "BELLE2_RELEASE" in os.environ:
            # CVMFS release setup
            self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
        elif 'BELLE2_LOCAL_DIR' in os.environ:
            # Local release setup
            self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
            self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
            self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
            self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
            self.setup_cmds.append("pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
            self.setup_cmds.append("source $BACKEND_B2SETUP")
            self.setup_cmds.append("b2code-option $BACKEND_BELLE2_OPTION")
            self.setup_cmds.append("popd > /dev/null")
class SubJob(Job):
    """
    This mini-class simply holds basic information about which subjob you are
    and a reference to the parent Job object to be able to access the main data there,
    rather than replicating all of the parent job's configuration again.
    """

    def __init__(self, job, subjob_id, input_files=None):
        #: Id of Subjob
        self.id = subjob_id
        #: Job() instance of the parent of this SubJob
        self.parent = job
        #: Input files specific to this subjob
        if not input_files:
            input_files = []
        self.input_files = input_files

    @property
    def output_dir(self):
        """Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this."""
        return Path(self.parent.output_dir, str(self.id))

    @property
    def working_dir(self):
        """Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this."""
        return Path(self.parent.working_dir, str(self.id))

    @property
    def name(self):
        """Getter for name of SubJob. Accesses the parent Job name to infer this."""
        return "_".join((self.parent.name, str(self.id)))
    @property
    def status(self):
        """
        Returns the status of this SubJob.
        """
        return self._status

    @status.setter
    def status(self, status):
        """
        Sets the status of this SubJob.
        """
        # Print an error only when a subjob fails
        if status == "failed":
            B2ERROR(f"Setting {self.name} status to failed")
        else:
            B2INFO(f"Setting {self.name} status to {status}")
        self._status = status
    @property
    def subjobs(self):
        """
        A subjob cannot have subjobs. Always return empty list.
        """
        return []

    @property
    def job_dict(self):
        """
        Returns:
            dict: A JSON serialisable representation of the `SubJob`. `Path <basf2.Path>` objects are converted to
            `string` via ``Path.as_posix()``. Since SubJobs inherit most of the parent job's config
            we only output the input files and arguments that are specific to this subjob and no other details.
        """
        job_dict = {}
        job_dict["id"] = self.id
        job_dict["input_files"] = self.input_files
        job_dict["args"] = self.args
        return job_dict

    def __getattr__(self, attribute):
        """
        Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
        unless otherwise specified.
        """
        return getattr(self.parent, attribute)

    def __repr__(self):
        return f"SubJob({self.name})"
class Backend(ABC):
    """
    Abstract base class for a valid backend.
    Classes derived from this will implement their own submission of basf2 jobs
    to whatever backend they describe.
    Some common methods/attributes go into this base class.

    For backend_args the priority from lowest to highest is:

    backend.default_backend_args -> backend.backend_args -> job.backend_args
    """
    #: Default submission script name
    submit_script = "submit.sh"
    #: Default exit code file name
    exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
    #: Default backend_args
    default_backend_args = {}

    def __init__(self, *, backend_args=None):
        if backend_args is None:
            backend_args = {}
        #: The backend args that will be applied to jobs unless the job specifies them itself
        self.backend_args = {**self.default_backend_args, **backend_args}
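# A short sketch (illustrative values) of the backend_args precedence described above: later
# dictionaries win, so job-level settings override backend-level ones, which override the defaults.
def _example_backend_args_priority():
    default_backend_args = {"queue": "short", "request_memory": "2 GB"}
    backend_args = {"queue": "long"}                # passed when constructing the Backend
    job_backend_args = {"request_memory": "8 GB"}   # set on an individual Job
    final_args = {**default_backend_args, **backend_args, **job_backend_args}
    return final_args  # -> {'queue': 'long', 'request_memory': '8 GB'}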
    @abstractmethod
    def submit(self, job):
        """
        Base method for submitting collection jobs to the backend type. This MUST be
        implemented for a correctly written backend class deriving from Backend().
        """

    @staticmethod
    def _add_setup(job, batch_file):
        """
        Adds setup lines to the shell script file.
        """
        for line in job.setup_cmds:
            print(line, file=batch_file)

    def _add_wrapper_script_setup(self, job, batch_file):
        """
        Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
        `trap` statements for Ctrl-C situations.
        """
        start_wrapper = f"""# ---
# trap ctrl-c and call ctrl_c()
trap '(ctrl_c 130)' SIGINT
trap '(ctrl_c 143)' SIGTERM

function write_exit_code() {{
  echo "Writing $1 to exit status file"
  echo "$1" > {self.exit_code_file}
  exit $1
}}

function ctrl_c() {{
  trap '' SIGINT SIGTERM
  echo "** Trapped Ctrl-C **"
  echo "$1" > {self.exit_code_file}
  exit $1
}}
# ---"""
        print(start_wrapper, file=batch_file)

    def _add_wrapper_script_teardown(self, job, batch_file):
        """
        Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
        an exit code of the job cmd being written out to a file. Which means that we can know if the command was
        successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
        removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
        code file.
        """
        end_wrapper = """# ---
write_exit_code $?"""
        print(end_wrapper, file=batch_file)

    @classmethod
    @abstractmethod
    def _create_parent_job_result(cls, parent):
        """
        We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
        so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
        statuses and allows the use of ready().
        """
        raise NotImplementedError
    def get_submit_script_path(self, job):
        """
        Construct the Path object of the bash script file that we will submit. It will contain
        the actual job command, wrapper commands, setup commands, and any batch directives.
        """
        return Path(job.working_dir, self.submit_script)
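# A sketch (illustrative) of how the submit script is assembled by the submit() implementations
# further down: wrapper setup (signal traps + write_exit_code), the job's setup_cmds, the job
# command itself, and finally the teardown line that records the exit code.
def _example_write_submit_script(backend, job):
    with open(backend.get_submit_script_path(job), mode="w") as batch_file:
        print("#!/bin/bash", file=batch_file)
        backend._add_wrapper_script_setup(job, batch_file)
        backend._add_setup(job, batch_file)
        print(job.full_command, file=batch_file)
        backend._add_wrapper_script_teardown(job, batch_file)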
class Result():
    """
    Base class for Result objects. A Result is created for each `Job` (or `SubJob`) object
    submitted to a backend. It provides a way to query a job's status to find out if it's ready.
    """

    def __init__(self, job):
        """
        Pass in the job object to allow the result to access the job's properties and do post-processing.
        """
        #: Job object for this result
        self.job = job
        #: Quicker way to know if it's ready once it has already been found
        self._is_ready = False
        #: Time we started waiting for the exit code file to appear
        self.exit_code_file_initial_time = datetime.now()
        #: How long we wait for the exit code file to appear before declaring the job failed
        self.time_to_wait_for_exit_code_file = timedelta(minutes=5)

    def ready(self):
        """
        Returns whether or not this job result is known to be ready. Doesn't actually change the job status. Just changes
        the 'readiness' based on the known job status.
        """
        B2DEBUG(29, f"Calling {self.job}.result.ready()")
        if self._is_ready:
            return True
        elif self.job.status in self.job.exit_statuses:
            self._is_ready = True
            return True
        else:
            return False

    def update_status(self):
        """
        Update the job's (and subjobs') status so that `Result.ready` will return the up to date status. This call will have to
        actually look up the job's status from some database/exit code file.
        """
        raise NotImplementedError

    def get_exit_code_from_file(self):
        """
        Read the exit code file to discover the exit status of the job command. Useful fallback if the job is no longer
        known to the job database (batch system purged it for example). Since some backends may take time to download
        the output files of the job back to the working directory we use a time limit on how long to wait.
        """
        exit_code_path = Path(self.job.working_dir, Backend.exit_code_file)
        with open(exit_code_path, "r") as f:
            exit_code = int(f.read().strip())
            B2DEBUG(29, f"Exit code from file for {self.job} was {exit_code}")
            return exit_code
class Local(Backend):
    """
    Backend for local processes i.e. on the same machine but in a subprocess.

    Note that you should call the self.join() method to close the pool and wait for any
    running processes to finish before exiting the process. Once you've called join you will have to set up a new
    instance of this backend to create a new pool. If you don't call `Local.join` or don't create a join yourself
    somewhere, then the main python process might end before your pool is done.

    Keyword Arguments:
        max_processes (int): Integer that specifies the size of the process pool that spawns the subjobs, default=1.
            It's the maximum number of simultaneous subjobs.
            Try not to specify a large number or a number larger than the number of cores.
            It won't crash the program but it will slow down and negatively impact performance.
    """

    def __init__(self, *, backend_args=None, max_processes=1):
        super().__init__(backend_args=backend_args)
        #: The actual Pool object of this instance of the Backend
        self.pool = None
        #: The size of the multiprocessing process pool
        self.max_processes = max_processes
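# A minimal end-to-end sketch (illustrative, the command and paths are placeholders): create a
# Job, submit it with the Local backend, wait for it, and clean up the process pool.
def _example_local_submission():
    job = Job("example_local")
    job.cmd = ["echo", "hello world"]
    job.working_dir = Path("example_local").absolute()
    job.output_dir = Path("example_local").absolute()
    backend = Local(max_processes=2)
    backend.submit(job)
    while not job.ready():
        time.sleep(5)
        job.update_status()
    backend.join()  # Always join the pool so the main process waits for running subprocesses
    print(job.status)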
    class LocalResult(Result):
        """
        Result class to help monitor status of jobs submitted by Local backend.
        """

        def __init__(self, job, result):
            """
            Pass in the job object and the multiprocessing result to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: The underlying result from the multiprocessing pool
            self.result = result

        def _update_result_status(self):
            if self.result.ready() and (self.job.status not in self.job.exit_statuses):
                return_code = self.result.get()
                if return_code:
                    self.job.status = "failed"
                else:
                    self.job.status = "completed"

        def update_status(self):
            """
            Update the job's (or subjobs') status by calling the result object.
            """
            B2DEBUG(29, f"Calling {self.job}.result.update_status()")
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status()
            else:
                self._update_result_status()
    def join(self):
        """
        Closes and joins the Pool, letting you wait for all results currently
        still processing.
        """
        B2INFO("Joining Process Pool, waiting for results to finish...")
        self.pool.close()
        self.pool.join()
        B2INFO("Process Pool joined.")

    @property
    def max_processes(self):
        """
        Getter for max_processes
        """
        return self._max_processes

    @max_processes.setter
    def max_processes(self, value):
        """
        Setter for max_processes, we also check for a previous Pool(), wait for it to join
        and then create a new one with the new value of max_processes
        """
        #: Internal attribute of max_processes
        self._max_processes = value
        if self.pool:
            B2INFO("New max_processes requested. But a pool already exists.")
            self.join()
        B2INFO(f"Starting up new Pool with {self.max_processes} processes")
        self.pool = mp.Pool(processes=self.max_processes)
    @method_dispatch
    def submit(self, job):
        raise NotImplementedError(("This is an abstract submit(job) method that shouldn't have been called. "
                                   "Did you submit a (Sub)Job?"))

    @submit.register(SubJob)
    def _(self, job):
        """
        Submission of a `SubJob` for the Local backend
        """
        # Make sure the output directory of the job is created
        job.output_dir.mkdir(parents=True, exist_ok=True)
        # Make sure the working directory of the job is created
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()
        # Get the path of the bash script we will run
        script_path = self.get_submit_script_path(job)
        with open(script_path, mode="w") as batch_file:
            print("#!/bin/bash", file=batch_file)
            self._add_wrapper_script_setup(job, batch_file)
            self._add_setup(job, batch_file)
            print(job.full_command, file=batch_file)
            self._add_wrapper_script_teardown(job, batch_file)
        B2INFO(f"Submitting {job}")
        job.result = Local.LocalResult(job,
                                       self.pool.apply_async(self.run_job,
                                                             (job.name, job.working_dir, job.output_dir, script_path)))
        job.status = "submitted"
        B2INFO(f"{job} submitted")
    @submit.register(Job)
    def _(self, job):
        """
        Submission of a `Job` for the Local backend
        """
        # Make sure the output directory of the job is created
        job.output_dir.mkdir(parents=True, exist_ok=True)
        # Make sure the working directory of the job is created
        job.working_dir.mkdir(parents=True, exist_ok=True)
        # Check if we have any valid input files
        job.check_input_data_files()

        if not job.splitter:
            job.copy_input_sandbox_files_to_working_dir()
            job.dump_input_data()
            # Get the path of the bash script we will run
            script_path = self.get_submit_script_path(job)
            with open(script_path, mode="w") as batch_file:
                print("#!/bin/bash", file=batch_file)
                self._add_wrapper_script_setup(job, batch_file)
                self._add_setup(job, batch_file)
                print(job.full_command, file=batch_file)
                self._add_wrapper_script_teardown(job, batch_file)
            B2INFO(f"Submitting {job}")
            job.result = Local.LocalResult(job,
                                           self.pool.apply_async(self.run_job,
                                                                 (job.name, job.working_dir, job.output_dir, script_path)))
            job.status = "submitted"
            B2INFO(f"{job} submitted")
        else:
            # Create subjobs according to the splitter and submit them instead
            job.splitter.create_subjobs(job)
            self.submit(list(job.subjobs.values()))
            self._create_parent_job_result(job)
    @submit.register(list)
    def _(self, jobs):
        """
        Submit method of Local() that takes a list of jobs instead of just one and submits each one.
        """
        for job in jobs:
            self.submit(job)
        B2INFO("All requested jobs submitted.")
    @staticmethod
    def run_job(name, working_dir, output_dir, script):
        """
        The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
        shell command in a subprocess and captures the stdout and stderr of the subprocess to files.
        """
        B2INFO(f"Starting Sub-process: {name}")
        from subprocess import Popen
        stdout_file_path = Path(working_dir, _STDOUT_FILE)
        stderr_file_path = Path(working_dir, _STDERR_FILE)
        B2INFO(f"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
        with open(stdout_file_path, mode="w", buffering=1) as f_out, \
                open(stderr_file_path, mode="w", buffering=1) as f_err:
            with Popen(["/bin/bash", script.as_posix()],
                       stdout=f_out,
                       stderr=f_err,
                       universal_newlines=True,
                       cwd=working_dir) as p:
                # Block here and wait so that the return code will be set
                p.wait()
        B2INFO(f"Subprocess {name} finished.")
        return p.returncode
    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.LocalResult(parent, None)
class Batch(Backend):
    """
    Abstract Base backend for submitting to a local batch system. Batch system specific commands should be implemented
    in a derived class. Do not use this class directly!
    """
    #: Shell command to submit a script, should be implemented in the derived class
    submission_cmds = []
    #: Default global limit on the total number of submitted/running jobs that the user can have
    default_global_job_limit = 1000
    #: Default time between re-checking whether the number of active jobs is below the global job limit
    default_sleep_between_submission_checks = 30

    def __init__(self, *, backend_args=None):
        """
        Init method for Batch Backend. Does some basic default setup.
        """
        super().__init__(backend_args=backend_args)
        #: The active job limit
        self.global_job_limit = self.default_global_job_limit
        #: Seconds we wait before checking if we can submit a list of jobs
        self.sleep_between_submission_checks = self.default_sleep_between_submission_checks
    def _add_batch_directives(self, job, file):
        """
        Should be implemented in a derived class to write a batch submission script to the job.working_dir.
        You should think about where the stdout/err should go, and set the queue name.
        """
        raise NotImplementedError(("Need to implement a _add_batch_directives(self, job, file) "
                                   f"method in {self.__class__.__name__} backend."))

    def _make_submit_file(self, job, submit_file_path):
        """
        Useful for the HTCondor backend where a submit file is needed instead of batch
        directives pasted directly into the submission script. It should be overwritten
        if needed.
        """

    @classmethod
    @abstractmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """

    def can_submit(self, *args, **kwargs):
        """
        Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail.
        This is initially meant to make sure that we don't go over the global limits of jobs (submitted + running).

        Returns:
            bool: If the job submission can continue based on the current situation.
        """
        return True
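# A minimal sketch (illustrative, not a real backend) of what a derived Batch class provides:
# the submission command and the directives written into the script. `my_submit_command` and
# the `#MYBATCH` directive syntax are hypothetical placeholders.
class _ExampleBatch(Batch):
    submission_cmds = ["my_submit_command"]

    def _add_batch_directives(self, job, batch_file):
        print("#!/bin/bash", file=batch_file)
        print(f"#MYBATCH -N {job.name}", file=batch_file)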
    @method_dispatch
    def submit(self, job, check_can_submit=True, jobs_per_check=100):
        raise NotImplementedError(("This is an abstract submit(job) method that shouldn't have been called. "
                                   "Did you submit a (Sub)Job?"))

    @submit.register(SubJob)
    def _(self, job, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch backend for a `SubJob`. Should take `SubJob` object, create needed directories,
        create batch script, and send it off with the batch submission command.
        It should apply the correct options (default and user requested).

        Should set a Result object as an attribute of the job.
        """
        # Make sure the working directory of the job is created
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()
        # Make the submission file if needed (e.g. for HTCondor)
        batch_submit_script_path = self.get_batch_submit_script_path(job)
        self._make_submit_file(job, batch_submit_script_path)
        # Get the bash file we will actually run
        script_path = self.get_submit_script_path(job)
        with open(script_path, mode="w") as batch_file:
            self._add_batch_directives(job, batch_file)
            self._add_wrapper_script_setup(job, batch_file)
            self._add_setup(job, batch_file)
            print(job.full_command, file=batch_file)
            self._add_wrapper_script_teardown(job, batch_file)
        os.chmod(script_path, 0o755)
        B2INFO(f"Submitting {job}")
        # Do the actual batch submission
        cmd = self._create_cmd(batch_submit_script_path)
        output = self._submit_to_batch(cmd)
        self._create_job_result(job, output)
        job.status = "submitted"
        B2INFO(f"{job} submitted")
    @submit.register(Job)
    def _(self, job, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch backend. Should take job object, create needed directories, create batch script,
        and send it off with the batch submission command, applying the correct options (default and user requested).

        Should set a Result object as an attribute of the job.
        """
        # Make sure the working directory of the job is created
        job.working_dir.mkdir(parents=True, exist_ok=True)
        # Check if we have any valid input files
        job.check_input_data_files()

        if not job.splitter:
            job.copy_input_sandbox_files_to_working_dir()
            job.dump_input_data()
            # Make the submission file if needed (e.g. for HTCondor)
            batch_submit_script_path = self.get_batch_submit_script_path(job)
            self._make_submit_file(job, batch_submit_script_path)
            # Get the bash file we will actually run
            script_path = self.get_submit_script_path(job)
            with open(script_path, mode="w") as batch_file:
                self._add_batch_directives(job, batch_file)
                self._add_wrapper_script_setup(job, batch_file)
                self._add_setup(job, batch_file)
                print(job.full_command, file=batch_file)
                self._add_wrapper_script_teardown(job, batch_file)
            os.chmod(script_path, 0o755)
            B2INFO(f"Submitting {job}")
            # Do the actual batch submission
            cmd = self._create_cmd(batch_submit_script_path)
            output = self._submit_to_batch(cmd)
            self._create_job_result(job, output)
            job.status = "submitted"
            B2INFO(f"{job} submitted")
        else:
            # Create subjobs according to the splitter and submit them instead
            job.splitter.create_subjobs(job)
            self.submit(list(job.subjobs.values()))
            self._create_parent_job_result(job)
    @submit.register(list)
    def _(self, jobs, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.
        """
        B2INFO(f"Submitting a list of {len(jobs)} jobs to a Batch backend")
        # Make sure the chunk size between submission checks doesn't exceed the global limit
        if jobs_per_check > self.global_job_limit:
            B2INFO((f"jobs_per_check (={jobs_per_check}) but this is higher than the global job "
                    f"limit for this backend (={self.global_job_limit}). Will instead use the "
                    "value of the global job limit."))
            jobs_per_check = self.global_job_limit

        # Group the jobs and submit in chunks, waiting whenever the batch system is too full
        for jobs_to_submit in grouper(jobs_per_check, jobs):
            while not self.can_submit(njobs=len(jobs_to_submit)):
                B2INFO("Too many jobs are currently in the batch system globally. Waiting until submission can continue...")
                time.sleep(self.sleep_between_submission_checks)
            B2INFO(f"Submitting the next {len(jobs_to_submit)} jobs...")
            for job in jobs_to_submit:
                self.submit(job, check_can_submit, jobs_per_check)
        B2INFO(f"All {len(jobs)} requested jobs submitted")
    def get_batch_submit_script_path(self, job):
        """
        Construct the Path object of the script file that we will submit using the batch command.
        For most batch backends this is the same script as the bash script we submit.
        But some require a separate submission file that describes the job.
        To implement that, override this function in the derived backend class.
        """
        return Path(job.working_dir, self.submit_script)
class PBS(Batch):
    """
    Backend for submitting calibration processes to a qsub batch system.
    """
    #: Working directory directive
    cmd_wkdir = "#PBS -d"
    #: stdout file directive
    cmd_stdout = "#PBS -o"
    #: stderr file directive
    cmd_stderr = "#PBS -e"
    #: Queue directive
    cmd_queue = "#PBS -q"
    #: Job name directive
    cmd_name = "#PBS -N"
    #: Batch submission commands for PBS
    submission_cmds = ["qsub"]
    #: Default global limit on the number of jobs in the system at any one time
    default_global_job_limit = 5000
    #: Default backend_args for PBS
    default_backend_args = {"queue": "short"}

    def __init__(self, *, backend_args=None):
        super().__init__(backend_args=backend_args)
    def _add_batch_directives(self, job, batch_file):
        """
        Add PBS directives to submitted script.
        """
        job_backend_args = {**self.backend_args, **job.backend_args}
        batch_queue = job_backend_args["queue"]
        print("#!/bin/bash", file=batch_file)
        print("# --- Start PBS ---", file=batch_file)
        print(" ".join([PBS.cmd_queue, batch_queue]), file=batch_file)
        print(" ".join([PBS.cmd_name, job.name]), file=batch_file)
        print(" ".join([PBS.cmd_wkdir, job.working_dir.as_posix()]), file=batch_file)
        print(" ".join([PBS.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
        print(" ".join([PBS.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
        print("# --- End PBS ---", file=batch_file)
    @classmethod
    def _create_job_result(cls, job, batch_output):
        job_id = batch_output.replace("\n", "")
        B2INFO(f"Job ID of {job} recorded as: {job_id}")
        job.result = cls.PBSResult(job, job_id)

    def _create_cmd(self, script_path):
        submission_cmd = self.submission_cmds[:]
        submission_cmd.append(script_path.as_posix())
        return submission_cmd
    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """
        sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True)
        return sub_out

    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.PBSResult(parent, None)
    class PBSResult(Result):
        """
        Simple class to help monitor status of jobs submitted by `PBS` Backend.

        You pass in a `Job` object (or `SubJob`) and job id from a qsub command.
        When you call the `ready` method it runs qstat to see whether or not the job has finished.
        """

        #: PBS statuses mapped to Job statuses
        backend_code_to_status = {"R": "running",
                                  "C": "completed",
                                  "FINISHED": "completed",
                                  "E": "failed",
                                  }

        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: job id given by PBS
            self.job_id = job_id
        def update_status(self):
            """
            Update the job's (or subjobs') status by calling qstat.
            """
            B2DEBUG(29, f"Calling {self.job}.result.update_status()")
            # Get all job info once and re-use it for each status update to minimise time spent on this
            qstat_output = PBS.qstat()
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(qstat_output)
            else:
                self._update_result_status(qstat_output)
        def _update_result_status(self, qstat_output):
            """
            Parameters:
                qstat_output (dict): The JSON output of a previous call to qstat which we can re-use to find the
                    status of this job. Obviously you should only be passing a JSON dict that contains the 'Job_Id' and
                    'job_state' information, otherwise it is useless.
            """
            try:
                backend_status = self._get_status_from_output(qstat_output)
            except KeyError:
                # The job id wasn't in the passed in qstat output, maybe because it already finished.
                # Fall back to the exit code file in the working directory.
                B2DEBUG(29, f"Checking of the exit code from file for {self.job}")
                try:
                    exit_code = self.get_exit_code_from_file()
                except FileNotFoundError:
                    if datetime.now() > self.exit_code_file_initial_time + self.time_to_wait_for_exit_code_file:
                        B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                        exit_code = 1
                    else:
                        B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                        return
                if exit_code:
                    backend_status = "E"
                else:
                    backend_status = "C"

            try:
                new_job_status = self.backend_code_to_status[backend_status]
            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
            if new_job_status != self.job.status:
                self.job.status = new_job_status
        def _get_status_from_output(self, output):
            for job_info in output["JOBS"]:
                if job_info["Job_Id"] == self.job_id:
                    return job_info["job_state"]
            raise KeyError
    def can_submit(self, njobs=1):
        """
        Checks the global number of jobs in PBS right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.
        """
        B2DEBUG(29, "Calling PBS().can_submit()")
        job_info = self.qstat(username=os.environ["USER"])
        total_jobs = job_info["NJOBS"]
        B2INFO(f"Total jobs active in the PBS system is currently {total_jobs}")
        if (total_jobs + njobs) > self.global_job_limit:
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
            return False
        else:
            B2INFO("There is enough space to submit more jobs.")
            return True
    @classmethod
    def qstat(cls, username="", job_ids=None):
        """
        Simplistic interface to the ``qstat`` command. Lets you request information about all jobs or ones matching the filter
        ['job_id'] or for the username. The result is a JSON dictionary containing some of the useful job attributes returned
        by qstat.

        PBS is kind of annoying as depending on the configuration it can forget about jobs immediately. So the status of a
        finished job is VERY hard to get. There are other commands that are sometimes included that may do a better job.
        This one should work for Melbourne's cloud computing centre.

        Parameters:
            username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
                will be in the output dictionary.
            job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
                the output of this function will be only information about these jobs. If this argument is not given, then all
                jobs matching the other filters will be returned.

        Returns:
            dict: JSON dictionary of the form (to save you parsing the XML that qstat returns):

            .. code-block:: python

              {
                "NJOBS": <number of jobs found>,
                "JOBS": [
                          {
                            <job attribute>: <value>, ...
                          }, ...
                        ]
              }
        """
        B2DEBUG(29, f"Calling PBS.qstat(username='{username}', job_id={job_ids})")
        if not job_ids:
            job_ids = []
        job_ids = set(job_ids)
        cmd_list = ["qstat", "-x"]
        # We get an XML summary from qstat; requires the shell argument
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        jobs_dict = {"NJOBS": 0, "JOBS": []}
        jobs_xml = ET.fromstring(output)

        # For a single job id we can search the XML more directly
        if len(job_ids) == 1:
            job_elem = jobs_xml.find(f"./Job[Job_Id='{list(job_ids)[0]}']")
            if job_elem is not None:
                jobs_dict["JOBS"].append(cls.create_job_record_from_element(job_elem))
                jobs_dict["NJOBS"] = 1
            return jobs_dict

        # Otherwise iterate through all jobs and apply the filters
        for job in jobs_xml.iterfind("Job"):
            job_owner = job.find("Job_Owner").text.split("@")[0]
            if username and username != job_owner:
                continue
            job_id = job.find("Job_Id").text
            if job_ids and job_id not in job_ids:
                continue
            jobs_dict["JOBS"].append(cls.create_job_record_from_element(job))
            jobs_dict["NJOBS"] += 1
            # Remove it so that we don't keep checking for it
            if job_id in job_ids:
                job_ids.remove(job_id)
        return jobs_dict
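# A minimal usage sketch (illustrative): check how many of the current user's jobs PBS
# currently knows about, re-using the same parsed output for several checks.
def _example_pbs_qstat():
    info = PBS.qstat(username=os.environ["USER"])
    print(f"{info['NJOBS']} jobs known to PBS")
    for record in info["JOBS"]:
        print(record["Job_Id"], record["job_state"])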
    @staticmethod
    def create_job_record_from_element(job_elem):
        """
        Creates a Job dictionary with various job information from the XML element returned by qstat.

        Parameters:
            job_elem (xml.etree.ElementTree.Element): The XML Element of the Job

        Returns:
            dict: JSON serialisable dictionary of the Job information we are interested in.
        """
        job_dict = {}
        job_dict["Job_Id"] = job_elem.find("Job_Id").text
        job_dict["Job_Name"] = job_elem.find("Job_Name").text
        job_dict["Job_Owner"] = job_elem.find("Job_Owner").text
        job_dict["job_state"] = job_elem.find("job_state").text
        job_dict["queue"] = job_elem.find("queue").text
        return job_dict
class LSF(Batch):
    """
    Backend for submitting calibration processes to a bsub (LSF) batch system.
    """
    #: Working directory directive
    cmd_wkdir = "#BSUB -cwd"
    #: stdout file directive
    cmd_stdout = "#BSUB -o"
    #: stderr file directive
    cmd_stderr = "#BSUB -e"
    #: Queue directive
    cmd_queue = "#BSUB -q"
    #: Job name directive
    cmd_name = "#BSUB -J"
    #: Batch submission commands for LSF
    submission_cmds = ["bsub", "-env", "\"none\"", "<"]
    #: Default global limit on the number of jobs in the system at any one time
    default_global_job_limit = 15000
    #: Default backend_args for LSF
    default_backend_args = {"queue": "s"}

    def __init__(self, *, backend_args=None):
        super().__init__(backend_args=backend_args)
    def _add_batch_directives(self, job, batch_file):
        """
        Adds LSF BSUB directives for the job to a script.
        """
        job_backend_args = {**self.backend_args, **job.backend_args}
        batch_queue = job_backend_args["queue"]
        print("#!/bin/bash", file=batch_file)
        print("# --- Start LSF ---", file=batch_file)
        print(" ".join([LSF.cmd_queue, batch_queue]), file=batch_file)
        print(" ".join([LSF.cmd_name, job.name]), file=batch_file)
        print(" ".join([LSF.cmd_wkdir, str(job.working_dir)]), file=batch_file)
        print(" ".join([LSF.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
        print(" ".join([LSF.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
        print("# --- End LSF ---", file=batch_file)
    def _create_cmd(self, script_path):
        submission_cmd = self.submission_cmds[:]
        submission_cmd.append(script_path.as_posix())
        # LSF needs the whole command as one shell string because of the '<' redirection
        submission_cmd = " ".join(submission_cmd)
        return [submission_cmd]

    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """
        sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        return sub_out
    class LSFResult(Result):
        """
        Simple class to help monitor status of jobs submitted by LSF Backend.

        You pass in a `Job` object and job id from a bsub command.
        When you call the `ready` method it runs bjobs to see whether or not the job has finished.
        """

        #: LSF statuses mapped to Job statuses
        backend_code_to_status = {"RUN": "running",
                                  "DONE": "completed",
                                  "FINISHED": "completed",
                                  "EXIT": "failed",
                                  "PEND": "submitted",
                                  }

        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: job id given by LSF
            self.job_id = job_id
        def update_status(self):
            """
            Update the job's (or subjobs') status by calling bjobs.
            """
            B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
            # Get all job info once and re-use it for each status update to minimise time spent on this
            bjobs_output = LSF.bjobs(output_fields=["stat", "id"])
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(bjobs_output)
            else:
                self._update_result_status(bjobs_output)
        def _update_result_status(self, bjobs_output):
            """
            Parameters:
                bjobs_output (dict): The JSON output of a previous call to bjobs which we can re-use to find the
                    status of this job. Obviously you should only be passing a JSON dict that contains the 'stat' and
                    'id' information, otherwise it is useless.
            """
            try:
                backend_status = self._get_status_from_output(bjobs_output)
            except KeyError:
                # The job id wasn't in the passed in bjobs output, maybe because it already finished.
                # Ask bjobs directly for this job id, then fall back to the exit code file.
                bjobs_output = LSF.bjobs(output_fields=["stat", "id"], job_id=str(self.job_id))
                try:
                    backend_status = self._get_status_from_output(bjobs_output)
                except KeyError:
                    try:
                        exit_code = self.get_exit_code_from_file()
                    except FileNotFoundError:
                        if datetime.now() > self.exit_code_file_initial_time + self.time_to_wait_for_exit_code_file:
                            B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                            exit_code = 1
                        else:
                            B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                            return
                    if exit_code:
                        backend_status = "EXIT"
                    else:
                        backend_status = "FINISHED"

            try:
                new_job_status = self.backend_code_to_status[backend_status]
            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
            if new_job_status != self.job.status:
                self.job.status = new_job_status
        def _get_status_from_output(self, output):
            if output["JOBS"] and "ERROR" in output["JOBS"][0]:
                if output["JOBS"][0]["ERROR"] == f"Job <{self.job_id}> is not found":
                    raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
                else:
                    raise BackendError(f"Unidentified Error during status check for {self.job}: {output}")
            else:
                for job_info in output["JOBS"]:
                    if job_info["JOBID"] == self.job_id:
                        return job_info["STAT"]
                raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.LSFResult(parent, None)

    @classmethod
    def _create_job_result(cls, job, batch_output):
        m = re.search(r"Job <(\d+)>", str(batch_output))
        if m:
            job_id = m.group(1)
        else:
            raise BackendError(f"Failed to get the batch job ID of {job}. LSF output was:\n{batch_output}")
        B2INFO(f"Job ID of {job} recorded as: {job_id}")
        job.result = cls.LSFResult(job, job_id)
    def can_submit(self, njobs=1):
        """
        Checks the global number of jobs in LSF right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.
        """
        B2DEBUG(29, "Calling LSF().can_submit()")
        job_info = self.bjobs(output_fields=["stat"])
        total_jobs = job_info["NJOBS"]
        B2INFO(f"Total jobs active in the LSF system is currently {total_jobs}")
        if (total_jobs + njobs) > self.global_job_limit:
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
            return False
        else:
            B2INFO("There is enough space to submit more jobs.")
            return True
    @classmethod
    def bjobs(cls, output_fields=None, job_id="", username="", queue=""):
        """
        Simplistic interface to the ``bjobs`` command. Lets you request information about all jobs matching the filters
        'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by output of the ``-json`` bjobs option.

        Parameters:
            output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
            job_id (str): String representation of the Job ID given by bsub during submission. If this argument is given then
                the output of this function will be only information about this job. If this argument is not given, then all jobs
                matching the other filters will be returned.
            username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters.
            queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.

        Returns:
            dict: JSON dictionary of the form:

            .. code-block:: python

              {
                "NJOBS": <njobs returned by command>,
                "JOBS": [
                          {
                            <output field>: <value>, ...
                          }, ...
                        ]
              }
        """
        B2DEBUG(29, f"Calling LSF.bjobs(output_fields={output_fields}, job_id={job_id}, username={username}, queue={queue})")
        # By default we just ask for the job id
        if not output_fields:
            output_fields = ["id"]
        # The output fields must be quoted for the bjobs -o option
        field_list_cmd = "\""
        field_list_cmd += " ".join(output_fields)
        field_list_cmd += "\""
        cmd_list = ["bjobs", "-o", field_list_cmd]
        # If the queue name is set then add it to the filters
        if queue:
            cmd_list.extend(["-q", queue])
        # If the username is set then add it to the filters
        if username:
            cmd_list.extend(["-u", username])
        # Request JSON output, then optionally restrict to one job id
        cmd_list.append("-json")
        if job_id:
            cmd_list.append(job_id)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = decode_json_string(subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True))
        output["NJOBS"] = output["JOBS"]
        output["JOBS"] = output["RECORDS"]
        del output["RECORDS"]
        del output["COMMAND"]
        return output
    @classmethod
    def bqueues(cls, output_fields=None, queues=None):
        """
        Simplistic interface to the ``bqueues`` command. Lets you request information about all queues matching the filters.
        The result is the JSON dictionary returned by output of the ``-json`` bqueues option.

        Parameters:
            output_fields (list[str]): A list of bqueues -o fields that you would like information about
                e.g. the default is ['queue_name', 'status', 'max', 'njobs', 'pend', 'run']
            queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
                By default you will receive information about all queues.

        Returns:
            dict: JSON dictionary of the form:

            .. code-block:: python

              {
                "COMMAND": "bqueues",
                "RECORDS": [
                             {
                               "QUEUE_NAME": "b2_beast",
                               "STATUS": "Open:Active",
                               ...
                             }, ...
                           ]
              }
        """
        B2DEBUG(29, f"Calling LSF.bqueues(output_fields={output_fields}, queues={queues})")
        # By default ask for the most commonly useful queue fields
        if not output_fields:
            output_fields = ["queue_name", "status", "max", "njobs", "pend", "run"]
        # The output fields must be quoted for the bqueues -o option
        field_list_cmd = "\""
        field_list_cmd += " ".join(output_fields)
        field_list_cmd += "\""
        cmd_list = ["bqueues", "-o", field_list_cmd]
        cmd_list.append("-json")
        if queues:
            cmd_list.extend(queues)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        return decode_json_string(output)
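# A minimal usage sketch (illustrative): query your jobs with bjobs and the queue occupancy
# with bqueues, using the JSON dictionaries described above. The queue name is a placeholder.
def _example_lsf_queries():
    jobs = LSF.bjobs(output_fields=["stat", "name", "id"])
    print(f"{jobs['NJOBS']} jobs known to LSF")
    queues = LSF.bqueues(queues=["s"])
    for record in queues["RECORDS"]:
        print(record["QUEUE_NAME"], record["STATUS"])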
class HTCondor(Batch):
    """
    Backend for submitting calibration processes to an HTCondor batch system.
    """
    #: HTCondor batch script (different to the wrapper script of `Backend.submit_script`)
    batch_submit_script = "submit.sub"
    #: Batch submission commands for HTCondor
    submission_cmds = ["condor_submit", "-terse"]
    #: Default global limit on the number of jobs in the system at any one time
    default_global_job_limit = 10000
    #: Default backend args for HTCondor
    default_backend_args = {
        "universe": "vanilla",
        "getenv": "false",
        "request_memory": "4 GB",
        "extra_lines": [],
    }
    #: Default ClassAd attributes to return from commands like condor_q
    default_class_ads = ["GlobalJobId", "JobStatus", "Owner"]
    def _make_submit_file(self, job, submit_file_path):
        """
        Fill HTCondor submission file.
        """
        # Find all files/directories in the working directory to transfer to the job
        files_to_transfer = [i.as_posix() for i in job.working_dir.iterdir()]
        job_backend_args = {**self.backend_args, **job.backend_args}
        with open(submit_file_path, "w") as submit_file:
            print(f'executable = {self.get_submit_script_path(job)}', file=submit_file)
            print(f'log = {Path(job.output_dir, "htcondor.log").as_posix()}', file=submit_file)
            print(f'output = {Path(job.working_dir, _STDOUT_FILE).as_posix()}', file=submit_file)
            print(f'error = {Path(job.working_dir, _STDERR_FILE).as_posix()}', file=submit_file)
            print('transfer_input_files = ', ','.join(files_to_transfer), file=submit_file)
            print(f'universe = {job_backend_args["universe"]}', file=submit_file)
            print(f'getenv = {job_backend_args["getenv"]}', file=submit_file)
            print(f'request_memory = {job_backend_args["request_memory"]}', file=submit_file)
            print('should_transfer_files = Yes', file=submit_file)
            print('when_to_transfer_output = ON_EXIT', file=submit_file)
            # Any extra lines the user requested
            for line in job_backend_args["extra_lines"]:
                print(line, file=submit_file)
            print('queue', file=submit_file)
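# A sketch (illustrative paths/values) of the submit.sub content produced by _make_submit_file
# for a job whose working directory already contains the wrapper script and input data file.
_EXAMPLE_SUBMIT_SUB = """\
executable = /path/to/working_dir/submit.sh
log = /path/to/output_dir/htcondor.log
output = /path/to/working_dir/stdout
error = /path/to/working_dir/stderr
transfer_input_files = /path/to/working_dir/submit.sh,/path/to/working_dir/__BACKEND_INPUT_FILES__.json
universe = vanilla
getenv = false
request_memory = 4 GB
should_transfer_files = Yes
when_to_transfer_output = ON_EXIT
queue
"""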
    def _add_batch_directives(self, job, batch_file):
        """
        For HTCondor leave empty as the directives are already included in the submit file.
        """
        print('#!/bin/bash', file=batch_file)

    def _create_cmd(self, script_path):
        submission_cmd = self.submission_cmds[:]
        submission_cmd.append(script_path.as_posix())
        return submission_cmd
    def get_batch_submit_script_path(self, job):
        """
        Construct the Path object of the .sub file that we will use to describe the job.
        """
        return Path(job.working_dir, self.batch_submit_script)
    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """
        job_dir = Path(cmd[-1]).parent.as_posix()
        sub_out = ""
        attempt = 0
        sleep_time = 30
        while True:
            try:
                sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, cwd=job_dir)
                break
            except subprocess.CalledProcessError as e:
                attempt += 1
                if attempt >= 3:
                    B2ERROR(f"Error during condor_submit: {str(e)} occurred more than 3 times.")
                    raise e
                else:
                    B2ERROR(f"Error during condor_submit: {str(e)}, sleeping for {sleep_time} seconds.")
                    time.sleep(sleep_time)
        return sub_out.split()[0]
    class HTCondorResult(Result):
        """
        Simple class to help monitor status of jobs submitted by HTCondor Backend.

        You pass in a `Job` object and job id from a condor_submit command.
        When you call the `ready` method it runs condor_q and, if needed, ``condor_history``
        to see whether or not the job has finished.
        """

        #: HTCondor JobStatus codes mapped to Job statuses
        backend_code_to_status = {0: "submitted",
                                  1: "submitted",
                                  2: "running",
                                  3: "failed",
                                  4: "completed",
                                  5: "submitted",
                                  6: "failed",
                                  }

        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: job id given by HTCondor
            self.job_id = job_id
        def update_status(self):
            """
            Update the job's (or subjobs') status by calling condor_q.
            """
            B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
            # Get all job info once and re-use it for each status update to minimise time spent on this
            condor_q_output = HTCondor.condor_q()
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(condor_q_output)
            else:
                self._update_result_status(condor_q_output)
        def _update_result_status(self, condor_q_output):
            """
            In order to be slightly more efficient we pass in a previous call to condor_q to see if it can work.
            If it is there we update the job's status. If not we are forced to start calling condor_q and, if needed,
            ``condor_history``, etc.

            Parameters:
                condor_q_output (dict): The JSON output of a previous call to `HTCondor.condor_q` which we can re-use to find the
                    status of this job if it was active when that command ran.
            """
            B2DEBUG(29, f"Calling {self.job}.result._update_result_status()")
            jobs_info = []
            for job_record in condor_q_output["JOBS"]:
                job_id = job_record["GlobalJobId"].split("#")[1]
                if job_id == self.job_id:
                    B2DEBUG(29, f"Found {self.job_id} in condor_q_output.")
                    jobs_info.append(job_record)

            # Not in the passed in condor_q output? Try the exit code file first.
            if not jobs_info:
                try:
                    exit_code = self.get_exit_code_from_file()
                except FileNotFoundError:
                    if datetime.now() > self.exit_code_file_initial_time + self.time_to_wait_for_exit_code_file:
                        B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                        exit_code = 1
                    else:
                        B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                        exit_code = None
                if exit_code is not None:
                    if exit_code:
                        jobs_info = [{"JobStatus": 6, "HoldReason": None}]  # Job failed
                    else:
                        jobs_info = [{"JobStatus": 4, "HoldReason": None}]  # Job completed

            # Still nothing? Ask condor_q directly for this job id.
            if not jobs_info:
                jobs_info = HTCondor.condor_q(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]

            # If the job is already too old for condor_q, fall back to condor_history.
            if not jobs_info:
                jobs_info = HTCondor.condor_history(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]

            hold_reason = "No Reason Known"

            # If the job is not known to anything any more, declare it failed.
            if not jobs_info:
                jobs_info = [{"JobStatus": 6, "HoldReason": None}]

            job_info = jobs_info[0]
            backend_status = job_info["JobStatus"]
            # If the job is held (JobStatus == 5) report why and keep waiting
            if backend_status == 5:
                hold_reason = job_info.get("HoldReason", None)
                B2WARNING(f"{self.job} on hold because of {hold_reason}. Keep waiting.")
            try:
                new_job_status = self.backend_code_to_status[backend_status]
            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
            if new_job_status != self.job.status:
                self.job.status = new_job_status
    @classmethod
    def _create_job_result(cls, job, job_id):
        B2INFO(f"Job ID of {job} recorded as: {job_id}")
        job.result = cls.HTCondorResult(job, job_id)
    def can_submit(self, njobs=1):
        """
        Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.
        """
        B2DEBUG(29, "Calling HTCondor().can_submit()")
        jobs_info = self.condor_q()
        total_jobs = jobs_info["NJOBS"]
        B2INFO(f"Total jobs active in the HTCondor system is currently {total_jobs}")
        if (total_jobs + njobs) > self.global_job_limit:
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
            return False
        else:
            B2INFO("There is enough space to submit more jobs.")
            return True
    @classmethod
    def condor_q(cls, class_ads=None, job_id="", username=""):
        """
        Simplistic interface to the ``condor_q`` command. Lets you request information about all jobs matching the filters
        'job_id' and 'username'. Note that setting job_id negates username so it is ignored.
        The result is the JSON dictionary returned by output of the ``-json`` condor_q option.

        Parameters:
            class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
                By default we give {cls.default_class_ads}; increasing the number of class_ads increases the time taken
                by the condor_q call.
            job_id (str): String representation of the Job ID given by condor_submit during submission.
                If this argument is given then the output of this function will be only information about this job.
                If this argument is not given, then all jobs matching the other filters will be returned.
            username (str): By default we return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
                so it isn't recommended.

        Returns:
            dict: JSON dictionary of the form:

            .. code-block:: python

              {
                "NJOBS": <number of records returned by command>,
                "JOBS": [
                          {
                            <ClassAd>: <value>, ...
                          }, ...
                        ]
              }
        """
        B2DEBUG(29, f"Calling HTCondor.condor_q(class_ads={class_ads}, job_id={job_id}, username={username})")
        if not class_ads:
            class_ads = cls.default_class_ads
        # ClassAds must be comma separated for the -attributes option
        field_list_cmd = ",".join(class_ads)
        cmd_list = ["condor_q", "-json", "-attributes", field_list_cmd]
        # If job_id is set then we ignore all other filters
        if job_id:
            cmd_list.append(job_id)
        else:
            if not username:
                username = os.environ["USER"]
            # Asking for all users is a special case
            if username == "all":
                cmd_list.append("-allusers")
            else:
                cmd_list.append(username)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        # condor_q occasionally fails, so don't let one failed call crash everything
        try:
            records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        except BaseException:
            records = None

        if records:
            records = decode_json_string(records)
        else:
            records = []
        jobs_info = {"JOBS": records}
        jobs_info["NJOBS"] = len(jobs_info["JOBS"])
        return jobs_info
    @classmethod
    def condor_history(cls, class_ads=None, job_id="", username=""):
        """
        Simplistic interface to the ``condor_history`` command. Lets you request information about all jobs matching the filters
        ``job_id`` and ``username``. Note that setting job_id negates username so it is ignored.
        The result is a JSON dictionary filled by output of the ``-json`` ``condor_history`` option.

        Parameters:
            class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
                By default we give {cls.default_class_ads}; increasing the number of class_ads increases the time taken
                by the condor_history call.
            job_id (str): String representation of the Job ID given by condor_submit during submission.
                If this argument is given then the output of this function will be only information about this job.
                If this argument is not given, then all jobs matching the other filters will be returned.
            username (str): By default we return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters. This is limited to 10000 records
                and isn't recommended.

        Returns:
            dict: JSON dictionary of the form:

            .. code-block:: python

              {
                "NJOBS": <number of records returned by command>,
                "JOBS": [
                          {
                            <ClassAd>: <value>, ...
                          }, ...
                        ]
              }
        """
        B2DEBUG(29, f"Calling HTCondor.condor_history(class_ads={class_ads}, job_id={job_id}, username={username})")
        if not class_ads:
            class_ads = cls.default_class_ads
        # ClassAds must be comma separated for the -attributes option
        field_list_cmd = ",".join(class_ads)
        cmd_list = ["condor_history", "-json", "-attributes", field_list_cmd]
        # If job_id is set then we ignore all other filters
        if job_id:
            cmd_list.append(job_id)
        else:
            if not username:
                username = os.environ["USER"]
            # Asking for all users is a special case for condor_history
            if username != "all":
                cmd_list.append(username)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        # condor_history occasionally fails, so don't let one failed call crash everything
        try:
            records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        except BaseException:
            records = None

        if records:
            records = decode_json_string(records)
        else:
            records = []
        jobs_info = {"JOBS": records}
        jobs_info["NJOBS"] = len(jobs_info["JOBS"])
        return jobs_info
class DIRAC(Backend):
    """
    Backend for submitting calibration processes to the grid.
    """
class BackendError(Exception):
    """
    Base exception class for Backend classes.
    """


class JobError(Exception):
    """
    Base exception class for Job objects.
    """


class SplitterError(Exception):
    """
    Base exception class for SubjobSplitter objects.
    """
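# A minimal end-to-end sketch (illustrative: the command, file names and queue are placeholders)
# combining the pieces above: split a Job over its input files, attach per-subjob arguments,
# submit it to a batch backend, and monitor it until completion.
def _example_end_to_end():
    from argparse import Namespace
    job = Job("example_analysis")
    job.cmd = ["basf2", "analysis.py"]
    job.input_files = ["/path/to/data_0.root", "/path/to/data_1.root", "/path/to/data_2.root"]
    job.working_dir = Path("example_analysis").absolute()
    job.output_dir = Path("example_analysis").absolute()
    job.splitter = MaxFilesSplitter(
        max_files_per_subjob=1,
        arguments_generator=ArgumentsGenerator(range_arguments, start=0),
    )
    backend = LSF(backend_args={"queue": "l"})
    backend.submit(job)  # creates and submits one SubJob per input file
    monitor_jobs(Namespace(heartbeat=60), [job])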
generator_function
Generator function that has not been 'primed'.
args
Positional argument tuple used to 'prime' the ArgumentsGenerator.generator_function.
kwargs
Keyword argument dictionary used to 'prime' the ArgumentsGenerator.generator_function.
def __init__(self, generator_function, *args, **kwargs)
max_subjobs
If we try to create more than this many subjobs we throw an exception; if None then there is no maxim...
def create_subjobs(self, job)
def __init__(self, *arguments_generator=None, max_subjobs=None)
def get_submit_script_path(self, job)
def _create_parent_job_result(cls, parent)
def __init__(self, *backend_args=None)
dictionary default_backend_args
Default backend_args.
def _add_setup(job, batch_file)
def _add_wrapper_script_setup(self, job, batch_file)
backend_args
The backend args that will be applied to jobs unless the job specifies them itself.
string submit_script
Default submission script name.
def _add_wrapper_script_teardown(self, job, batch_file)
int default_global_job_limit
Default global limit on the total number of submitted/running jobs that the user can have.
sleep_between_submission_checks
Seconds we wait before checking if we can submit a list of jobs.
def _create_cmd(self, job)
def can_submit(self, *args, **kwargs)
def submit(self, job, check_can_submit=True, jobs_per_check=100)
list submission_cmds
Shell command to submit a script, should be implemented in the derived class.
def _make_submit_file(self, job, submit_file_path)
def _submit_to_batch(cls, cmd)
def __init__(self, *backend_args=None)
def _add_batch_directives(self, job, file)
def _(self, job, check_can_submit=True, jobs_per_check=100)
int default_sleep_between_submission_checks
Default time between re-checking whether the number of active jobs is below the global job limit.
global_job_limit
The active job limit.
def _create_job_result(cls, job, batch_output)
def get_batch_submit_script_path(self, job)
def __init__(self, job, job_id)
job_id
job id given by HTCondor
dictionary backend_code_to_status
HTCondor statuses mapped to Job statuses.
def _update_result_status(self, condor_q_output)
def condor_q(cls, class_ads=None, job_id="", username="")
def can_submit(self, njobs=1)
string batch_submit_script
HTCondor batch script (different to the wrapper script of Backend.submit_script)
def _create_job_result(cls, job, job_id)
list submission_cmds
Batch submission commands for HTCondor.
def _make_submit_file(self, job, submit_file_path)
def _submit_to_batch(cls, cmd)
def condor_history(cls, class_ads=None, job_id="", username="")
def _create_parent_job_result(cls, parent)
def _add_batch_directives(self, job, batch_file)
def _create_cmd(self, script_path)
list default_class_ads
Default ClassAd attributes to return from commands like condor_q.
def get_batch_submit_script_path(self, job)
status
Not a real attribute, it's a property.
def _get_overall_status_from_subjobs(self)
def check_input_data_files(self)
def input_sandbox_files(self, value)
def input_sandbox_files(self)
input_files
Input files to job (str), a list of these is copied to the working directory.
def create_subjob(self, i, input_files=None, args=None)
working_dir
Working directory of the job (pathlib.Path).
def output_dir(self, value)
def working_dir(self, value)
def dump_to_json(self, file_path)
subjobs
dict of subjobs assigned to this job
def dump_input_data(self)
args
The arguments that will be applied to the cmd (These are ignored by SubJobs as they have their own ar...
setup_cmds
Bash commands to run before the main self.cmd (mainly used for batch system setup)
def input_files(self, value)
output_patterns
Files that we produce during the job and want to be returned.
result
The result object of this Job.
splitter
The SubjobSplitter used to create subjobs if necessary.
def __init__(self, name, job_dict=None)
def append_current_basf2_setup_cmds(self)
_status
The actual status of the overall Job.
def copy_input_sandbox_files_to_working_dir(self)
backend_args
Config dictionary for the backend to use when submitting the job.
input_sandbox_files
Files to be copied directly into the working directory (pathlib.Path).
cmd
Command and arguments as a list that will be run by the job on the backend (see the usage sketch after this listing).
dictionary statuses
Allowed Job status dictionary.
output_dir
Output directory (pathlib.Path), where we will download our output_files to.
def __init__(self, job, job_id)
job_id
job id given by LSF
def _update_result_status(self, bjobs_output)
dictionary backend_code_to_status
LSF statuses mapped to Job statuses.
def _get_status_from_output(self, output)
def can_submit(self, njobs=1)
def bjobs(cls, output_fields=None, job_id="", username="", queue="")
list submission_cmds
Shell command to submit a script.
def _submit_to_batch(cls, cmd)
def _create_parent_job_result(cls, parent)
def __init__(self, *backend_args=None)
def _add_batch_directives(self, job, batch_file)
def _create_cmd(self, script_path)
def bqueues(cls, output_fields=None, queues=None)
def _create_job_result(cls, job, batch_output)
def __init__(self, job, result)
def _update_result_status(self)
result
The underlying result from the backend.
pool
The actual Pool object of this instance of the Backend.
_max_processes
Internal attribute of max_processes.
def _create_parent_job_result(cls, parent)
def max_processes(self, value)
max_processes
The size of the multiprocessing process pool.
def __init__(self, *backend_args=None, max_processes=1)
def run_job(name, working_dir, output_dir, script)
def __init__(self, *arguments_generator=None, max_files_per_subjob=1)
max_files_per_subjob
The maximum number of input files that will be used for each SubJob created.
def create_subjobs(self, job)
def __init__(self, *arguments_generator=None, max_subjobs=1000)
max_subjobs
The maximum number of SubJob objects to be created, input files are split evenly between them.
def create_subjobs(self, job)
def __init__(self, job, job_id)
job_id
job id given by PBS
dictionary backend_code_to_status
PBS statuses mapped to Job statuses.
def _update_result_status(self, qstat_output)
def _get_status_from_output(self, output)
def can_submit(self, njobs=1)
def create_job_record_from_element(job_elem)
list submission_cmds
Shell command to submit a script.
def _submit_to_batch(cls, cmd)
def _create_parent_job_result(cls, parent)
def __init__(self, *backend_args=None)
def _add_batch_directives(self, job, batch_file)
def _create_cmd(self, script_path)
def qstat(cls, username="", job_ids=None)
def _create_job_result(cls, job, batch_output)
def get_exit_code_from_file(self)
job
Job object for result.
_is_ready
Quicker way to know if it's ready once it has already been found.
exit_code_file_initial_time
Time we started waiting for the exit code file to appear.
time_to_wait_for_exit_code_file
After our first attempt to view the exit code file once the job is 'finished', how long should we wai...
input_files
Input files specific to this subjob.
parent
Job() instance of parent to this SubJob.
def __init__(self, job, subjob_id, input_files=None)
args
Arguments specific to this SubJob.
result
The result object of this SubJob.
def __getattr__(self, attribute)
_status
Status of the subjob.
def __init__(self, *arguments_generator=None)
def assign_arguments(self, job)
arguments_generator
The ArgumentsGenerator used when creating subjobs.
def create_subjobs(self, job)
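
Taken together, the ``Job`` and ``Local`` members listed above are enough for a small end-to-end run. A minimal sketch, assuming the module is importable as ``caf.backends``; all paths, commands and output patterns are placeholders, not values from the original document.

    .. code-block:: python

        import time
        from pathlib import Path
        from caf.backends import Job, Local

        job = Job("example_job")
        job.cmd = ["bash", "run.sh"]                 # command run by the job on the backend
        job.input_sandbox_files = [Path("run.sh")]   # copied into the working directory
        job.working_dir = Path("workdir/example_job")
        job.output_dir = Path("output/example_job")
        job.output_patterns = ["*.root"]             # files to bring back from the job

        backend = Local(max_processes=2)             # multiprocessing pool of two workers
        backend.submit(job)

        # Poll the job until it reports a final status.
        while not job.ready():
            time.sleep(5)
            job.update_status()
        print(f"Job finished with status: {job.status}")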