import os
import re
import json
import time
import shutil
import subprocess
from math import ceil

from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
from abc import ABC, abstractmethod
import xml.etree.ElementTree as ET
from pathlib import Path
from collections import deque
from itertools import count, takewhile
from datetime import datetime, timedelta
import multiprocessing as mp

from caf.utils import method_dispatch
from caf.utils import decode_json_string
from caf.utils import grouper
from caf.utils import parse_file_uri
__all__ = ["Job", "SubJob", "Backend", "Local", "Batch", "LSF", "PBS", "HTCondor", "get_input_data"]
#: Default name of the input data file passed to the job process.
_input_data_file_path = Path("__BACKEND_INPUT_FILES__.json")

#: Default stdout file name in the job's working directory.
_STDOUT_FILE = "stdout"

#: Default stderr file name in the job's working directory.
_STDERR_FILE = "stderr"

#: Environment variables forwarded into the job's setup commands.
_backend_job_envvars = (
    "BELLE2_EXTERNALS_TOPDIR",
    "BELLE2_CONDB_METADATA",
)
def get_input_data():
    """
    Simple JSON load of the default input data file. Will contain a list of string file paths
    for use by the job process.
    """
    with open(_input_data_file_path) as input_data_file:
        input_data = json.load(input_data_file)
    return input_data
def monitor_jobs(args, jobs):
    unfinished_jobs = jobs[:]
    failed_jobs = 0
    while unfinished_jobs:
        B2INFO("Updating statuses of unfinished jobs...")
        for j in unfinished_jobs:
            j.update_status()
        B2INFO("Checking if jobs are ready...")
        for j in unfinished_jobs[:]:
            if j.ready():
                if j.status == "failed":
                    B2ERROR(f"{j} is failed")
                    failed_jobs += 1
                else:
                    B2INFO(f"{j} is finished")
                unfinished_jobs.remove(j)
        if unfinished_jobs:
            B2INFO(f"Not all jobs done yet, waiting {args.heartbeat} seconds before re-checking...")
            time.sleep(args.heartbeat)
    if failed_jobs > 0:
        B2ERROR(f"{failed_jobs} jobs failed")
    else:
        B2INFO("All jobs finished successfully")
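# Illustrative usage sketch (not part of the original module): ``monitor_jobs`` only needs an
# object with a ``heartbeat`` attribute (seconds between polls) and a list of already-submitted
# jobs. The ``SimpleNamespace`` stand-in below is an assumption made for illustration only.
#
#   from types import SimpleNamespace
#   args = SimpleNamespace(heartbeat=30)
#   monitor_jobs(args, submitted_jobs)  # blocks until every job reaches an exit status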
class ArgumentsGenerator():
    def __init__(self, generator_function, *args, **kwargs):
        """
        Simple little class to hold a generator (uninitialised) and the necessary args/kwargs to
        initialise it. This lets us re-use a generator by setting it up again fresh. This is not
        optimal for expensive calculations, but it is nice for making large sequences of
        Job input arguments on the fly.

        Parameters:
            generator_function (py:function): A function (callable) that contains a ``yield`` statement. This generator
                should *not* be initialised i.e. you haven't called it with ``generator_function(*args, **kwargs)``
                yet. That will happen when accessing the `ArgumentsGenerator.generator` property.
            args (tuple): The positional arguments you want to send into the initialisation of the generator.
            kwargs (dict): The keyword arguments you want to send into the initialisation of the generator.
        """
        #: Generator function that has not been 'primed' yet.
        self.generator_function = generator_function
        #: Positional arguments used when initialising the generator.
        self.args = args
        #: Keyword arguments used when initialising the generator.
        self.kwargs = kwargs

    @property
    def generator(self):
        """
        Returns:
            generator: The initialised generator (using the args and kwargs for initialisation). It should be ready
            to have ``next``/``send`` called on it.
        """
        gen = self.generator_function(*self.args, **self.kwargs)
        next(gen)  # Prime the generator so that it is ready to receive values via ``send``
        return gen
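# Minimal usage sketch (illustrative, not part of the original file). The generator function
# below is hypothetical; it only needs a ``yield`` and to accept values via ``send``:
#
#   def my_args(prefix):
#       subjob = (yield None)                       # primed by ArgumentsGenerator.generator
#       while True:
#           subjob = (yield (f"{prefix}_{subjob.id}",))
#
#   arg_gen = ArgumentsGenerator(my_args, "run")
#   gen = arg_gen.generator                         # a fresh, primed generator on every access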
def range_arguments(start=0, stop=None, step=1):
    """
    A simple example Arguments Generator function for use as an `ArgumentsGenerator.generator_function`.
    It will return increasing values using `itertools.count`. By default it is infinite and will not raise `StopIteration`.
    The `SubJob` object is sent into this function with `send` but is not used.

    Parameters:
        start (int): The starting value that will be returned.
        stop (int): At this value `StopIteration` will be raised. If this is `None` then this generator will continue
            forever.
        step (int): The step size.
    """
    def should_stop(x):
        if stop is not None and x >= stop:
            return True
        return False

    # The first yield primes the generator; the SubJob is received via send() but is not used.
    subjob = (yield None)
    for i in takewhile(lambda x: not should_stop(x), count(start, step)):
        args = (i,)  # (assumed form of the elided argument tuple: a single-value tuple)
        B2DEBUG(29, f"{subjob} arguments will be {args}")
        subjob = (yield args)
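# Sketch of how ``range_arguments`` is intended to be combined with an ArgumentsGenerator and a
# splitter (illustrative only; the splitter classes are defined below):
#
#   args_gen = ArgumentsGenerator(range_arguments, start=0, stop=10, step=1)
#   job.splitter = MaxFilesSplitter(arguments_generator=args_gen, max_files_per_subjob=2)
#   # each created SubJob then receives (0,), (1,), (2,), ... as its argument tuple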
class SubjobSplitter(ABC):
    """
    Abstract base class. This class handles the logic of creating subjobs for a `Job` object.
    The `create_subjobs` function should be implemented and used to construct
    the subjobs of the parent job object.

    Parameters:
        arguments_generator (ArgumentsGenerator): Used to construct the generator function that will yield the argument
            tuple for each `SubJob`. The splitter will iterate through the generator each time `create_subjobs` is
            called. The `SubJob` will be sent into the generator with ``send(subjob)`` so that the generator can decide what
            arguments to assign to it.
    """

    def __init__(self, *, arguments_generator=None):
        """
        Derived classes should call `super` to run this.
        """
        #: The `ArgumentsGenerator` used when creating subjobs.
        self.arguments_generator = arguments_generator

    @abstractmethod
    def create_subjobs(self, job):
        """
        Implement this method in derived classes to generate the `SubJob` objects.
        """

    def assign_arguments(self, job):
        """
        Use the `arguments_generator` (if one exists) to assign the argument tuples to the
        subjobs.
        """
        if self.arguments_generator:
            arg_gen = self.arguments_generator.generator
            generating = True
            for subjob in sorted(job.subjobs.values(), key=lambda sj: sj.id):
                if generating:
                    try:
                        args = arg_gen.send(subjob)
                    except StopIteration:
                        B2ERROR(f"StopIteration called when getting args for {subjob}, "
                                "setting all subsequent subjobs to have empty argument tuples.")
                        args = tuple()
                        generating = False
                else:
                    args = tuple()
                B2DEBUG(29, f"Arguments for {subjob}: {args}")
                subjob.args = args
        else:
            B2INFO(f"No ArgumentsGenerator assigned to the {self} so subjobs of {job} "
                   "won't automatically have arguments assigned.")

    def __str__(self):
        return f"{self.__class__.__name__}"
class MaxFilesSplitter(SubjobSplitter):

    def __init__(self, *, arguments_generator=None, max_files_per_subjob=1):
        """
        Parameters:
            max_files_per_subjob (int): The maximum number of input files used per `SubJob` created.
        """
        super().__init__(arguments_generator=arguments_generator)
        #: The maximum number of input files per `SubJob`.
        self.max_files_per_subjob = max_files_per_subjob

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates as many subjobs as required
        in order to prevent the number of input files per subjob going over the limit set by
        `MaxFilesSplitter.max_files_per_subjob`.
        """
        if not job.input_files:
            B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
            return

        for i, subjob_input_files in enumerate(grouper(self.max_files_per_subjob, job.input_files)):
            job.create_subjob(i, input_files=subjob_input_files)

        self.assign_arguments(job)
        B2INFO(f"{self} created {i+1} Subjobs for {job}")
class MaxSubjobsSplitter(SubjobSplitter):

    def __init__(self, *, arguments_generator=None, max_subjobs=1000):
        """
        Parameters:
            max_subjobs (int): The maximum number of subjobs that will be created.
        """
        super().__init__(arguments_generator=arguments_generator)
        #: The maximum number of `SubJob` objects to be created.
        self.max_subjobs = max_subjobs

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates as many subjobs as required
        by the number of input files up to the maximum set by `MaxSubjobsSplitter.max_subjobs`. If there are
        more input files than `max_subjobs` it instead groups files by the minimum number per subjob in order to
        respect the subjob limit e.g. if you have 11 input files and a maximum number of subjobs of 4, then it
        will create 4 subjobs, 3 of them with 3 input files, and one with 2 input files.
        """
        if not job.input_files:
            B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
            return

        remaining_input_files = deque(job.input_files)
        available_subjobs = self.max_subjobs
        subjob_i = 0
        while remaining_input_files:
            # How many files should this subjob use to stay within the subjob limit?
            num_input_files = ceil(len(remaining_input_files) / available_subjobs)
            subjob_input_files = []
            for i in range(num_input_files):
                subjob_input_files.append(remaining_input_files.popleft())
            job.create_subjob(subjob_i, input_files=subjob_input_files)
            subjob_i += 1
            available_subjobs -= 1

        self.assign_arguments(job)
        B2INFO(f"{self} created {subjob_i} Subjobs for {job}")
class ArgumentsSplitter(SubjobSplitter):
    """
    Creates SubJobs based on the given argument generator. The generator will be called until a `StopIteration` is issued.
    Be VERY careful to not accidentally give an infinite generator! Otherwise it will simply create SubJobs until you run out
    of memory. You can set the `ArgumentsSplitter.max_subjobs` parameter to try to prevent this by throwing an exception.

    This splitter is useful for MC production jobs where you don't have any input files, but you want to control the exp/run
    numbers of subjobs. If you do have input files set for the parent `Job` objects, then the same input files will be
    assigned to every `SubJob`.

    Parameters:
        arguments_generator (ArgumentsGenerator): The standard ArgumentsGenerator that is used to assign arguments
            to the subjobs.
    """

    def __init__(self, *, arguments_generator=None, max_subjobs=None):
        super().__init__(arguments_generator=arguments_generator)
        #: If set, the maximum number of subjobs that will be created before raising a `SplitterError`.
        self.max_subjobs = max_subjobs

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates subjobs until the
        `SubjobSplitter.arguments_generator` finishes.

        If `ArgumentsSplitter.max_subjobs` is set, then it will throw an exception if more than this number of
        subjobs would be created.
        """
        arg_gen = self.arguments_generator.generator
        for i in count():
            if self.max_subjobs is not None and i >= self.max_subjobs:
                raise SplitterError(f"{self} tried to create more subjobs than the maximum (={self.max_subjobs}).")
            try:
                subjob = SubJob(job, i, job.input_files)
                args = arg_gen.send(subjob)
                B2INFO(f"Creating {job}.{subjob}")
                B2DEBUG(29, f"Arguments for {subjob}: {args}")
                subjob.args = args
                job.subjobs[i] = subjob
            except StopIteration:
                break
        B2INFO(f"{self} created {i+1} Subjobs for {job}")
class Job:
    """
    This generic Job object is used to tell a Backend what to do.
    This object basically holds necessary information about a process you want to submit to a `Backend`.
    It should *not* do anything that is backend specific, just hold the configuration for a job to be
    successfully submitted and monitored using a backend. The result attribute is where backend
    specific job monitoring goes.

    Parameters:
        name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF

    .. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.
    """

    #: Allowed Job status dictionary. The key is the status name and the value is its level in the hierarchy.
    statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}

    #: Job statuses that correspond to the Job being finished (successfully or not).
    exit_statuses = ["failed", "completed"]
    def __init__(self, name, job_dict=None):
        #: Job object's name. Only descriptive, not necessarily unique.
        self.name = name
        #: The `SubjobSplitter` used to create subjobs (if any).
        self.splitter = None
        #: Dictionary of subjobs (by id) of this Job.
        self.subjobs = {}
        if not job_dict:
            #: Files to be copied directly into the working directory.
            self.input_sandbox_files = []
            #: Working directory of the job (`pathlib.Path`).
            self.working_dir = Path()
            #: Output directory (`pathlib.Path`), where output files will be placed.
            self.output_dir = Path()
            #: Files that we produce during the job and want to keep. Can use wildcards (*).
            self.output_patterns = []
            #: Command (as a list) that will be run by the job on the backend.
            self.cmd = []
            #: Arguments that will be appended to the cmd.
            self.args = []
            #: Input files to the job, copied/listed for the working directory.
            self.input_files = []
            #: Bash commands to run before the main self.cmd (mainly used for batch system setup).
            self.setup_cmds = []
            #: Config dictionary for the backend to use when submitting the job.
            self.backend_args = {}
        else:
            self.input_sandbox_files = [Path(p) for p in job_dict["input_sandbox_files"]]
            self.working_dir = Path(job_dict["working_dir"])
            self.output_dir = Path(job_dict["output_dir"])
            self.output_patterns = job_dict["output_patterns"]
            self.cmd = job_dict["cmd"]
            self.args = job_dict["args"]
            self.input_files = job_dict["input_files"]
            self.setup_cmds = job_dict["setup_cmds"]
            self.backend_args = job_dict["backend_args"]
            for subjob_dict in job_dict["subjobs"]:
                self.create_subjob(subjob_dict["id"], input_files=subjob_dict["input_files"], args=subjob_dict["args"])

        #: The result object of this Job. Only filled once the job is submitted to a backend.
        self.result = None
        #: The actual status of the overall `Job`.
        self._status = "init"
    def __repr__(self):
        """
        Representation of Job class (what happens when you print a Job() instance).
        """
        return f"Job({self.name})"

    def ready(self):
        """
        Returns whether or not the Job has finished. If the job has subjobs then it will return True when they are all finished.
        It will return False as soon as it hits the first failure. Meaning that you cannot guarantee that all subjobs will have
        their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.
        """
        if not self.result:
            B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")
            return False
        return self.result.ready()

    def update_status(self):
        """
        Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
        in the best way for the type of result object/backend.
        """
        if not self.result:
            B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
            return
        self.result.update_status()
        return self.status
    def create_subjob(self, i, input_files=None, args=None):
        """
        Creates a subjob Job object that references that parent Job.
        Returns the SubJob object at the end.
        """
        if i not in self.subjobs:
            B2INFO(f"Creating {self}.Subjob({i})")
            subjob = SubJob(self, i, input_files)
            if args:
                subjob.args = args
            self.subjobs[i] = subjob
            return subjob
        else:
            B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")

    @property
    def status(self):
        """
        Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
        subjob status in the hierarchy of statuses in `Job.statuses`.
        """
        if self.subjobs:
            job_status = self._get_overall_status_from_subjobs()
            if job_status != self._status:
                self.status = job_status
        return self._status
    def _get_overall_status_from_subjobs(self):
        subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
        status_level = min([self.statuses[status] for status in subjob_statuses])
        for status, level in self.statuses.items():
            if level == status_level:
                return status

    @status.setter
    def status(self, status):
        """
        Sets the status of this Job.
        """
        if status == "failed":
            B2ERROR(f"Setting {self.name} status to failed")
        else:
            B2INFO(f"Setting {self.name} status to {status}")
        self._status = status
    @property
    def output_dir(self):
        return self._output_dir

    @output_dir.setter
    def output_dir(self, value):
        self._output_dir = Path(value).absolute()

    @property
    def working_dir(self):
        return self._working_dir

    @working_dir.setter
    def working_dir(self, value):
        self._working_dir = Path(value).absolute()

    @property
    def input_sandbox_files(self):
        return self._input_sandbox_files

    @input_sandbox_files.setter
    def input_sandbox_files(self, value):
        self._input_sandbox_files = [Path(p).absolute() for p in value]

    @property
    def input_files(self):
        return self._input_files

    @input_files.setter
    def input_files(self, value):
        self._input_files = value

    @property
    def max_subjobs(self):
        return self.splitter.max_subjobs

    @max_subjobs.setter
    def max_subjobs(self, value):
        self.splitter = MaxSubjobsSplitter(max_subjobs=value)
        B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")

    @property
    def max_files_per_subjob(self):
        return self.splitter.max_files_per_subjob

    @max_files_per_subjob.setter
    def max_files_per_subjob(self, value):
        self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
        B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
    def dump_to_json(self, file_path):
        """
        Dumps the Job object configuration to a JSON file so that it can be read in again later.

        Parameters:
            file_path(`basf2.Path`): The filepath we'll dump to
        """
        with open(file_path, mode="w") as job_file:
            json.dump(self.job_dict, job_file, indent=2)

    @classmethod
    def from_json(cls, file_path):
        with open(file_path) as job_file:
            job_dict = json.load(job_file)
        return cls(job_dict["name"], job_dict=job_dict)
    @property
    def job_dict(self):
        """
        Returns:
            dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
            `Path <basf2.Path>` objects are converted to string via ``Path.as_posix()``.
        """
        job_dict = {}
        job_dict["name"] = self.name
        job_dict["input_sandbox_files"] = [i.as_posix() for i in self.input_sandbox_files]
        job_dict["working_dir"] = self.working_dir.as_posix()
        job_dict["output_dir"] = self.output_dir.as_posix()
        job_dict["output_patterns"] = self.output_patterns
        job_dict["cmd"] = self.cmd
        job_dict["args"] = self.args
        job_dict["input_files"] = self.input_files
        job_dict["setup_cmds"] = self.setup_cmds
        job_dict["backend_args"] = self.backend_args
        job_dict["subjobs"] = [sj.job_dict for sj in self.subjobs.values()]
        return job_dict
    def dump_input_data(self):
        """
        Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
        string file paths.
        """
        with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
            json.dump(self.input_files, input_data_file, indent=2)

    def copy_input_sandbox_files_to_working_dir(self):
        """
        Get all of the requested files for the input sandbox and copy them to the working directory.
        Files like the submit.sh or input_data.json are not part of this process.
        """
        for file_path in self.input_sandbox_files:
            if file_path.is_dir():
                shutil.copytree(file_path, Path(self.working_dir, file_path.name))
            else:
                shutil.copy(file_path, self.working_dir)
    def check_input_data_files(self):
        """
        Check the input files and make sure that there aren't any duplicates.
        Also check if the files actually exist if possible.
        """
        existing_input_files = []  # Build a new list so that duplicates and missing files are dropped
        for file_path in self.input_files:
            file_uri = parse_file_uri(file_path)
            if file_uri.scheme == "file":
                p = Path(file_uri.path)
                if p.is_file():
                    if file_uri.geturl() not in existing_input_files:
                        existing_input_files.append(file_uri.geturl())
                    else:
                        B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
                else:
                    B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
            else:
                B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
                if file_path not in existing_input_files:
                    existing_input_files.append(file_path)
                else:
                    B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
        if self.input_files and not existing_input_files:
            B2WARNING(f"No valid input file paths found for {self.name}, but some were requested.")
        # Replace the Job's input files with the checked list
        self.input_files = existing_input_files
    @property
    def full_command(self):
        """
        Returns:
            str: The full command that this job will run including any arguments.
        """
        all_components = self.cmd[:]
        all_components.extend(self.args)
        # Convert everything to strings in case some arguments are not strings (e.g. ints)
        full_command = " ".join(map(str, all_components))
        B2DEBUG(29, f"Full command of {self} is '{full_command}'")
        return full_command
    def append_current_basf2_setup_cmds(self):
        """
        This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
        It should detect if you are using a local release or CVMFS and append the correct commands
        so that the job will have the same basf2 release environment. It should also detect
        if a local release is not compiled with the ``opt`` option.

        Note that this *doesn't mean that every environment variable is inherited* from the submitting
        process environment.
        """
        def append_environment_variable(cmds, envvar):
            """
            Append a command for setting an environment variable.
            """
            if envvar in os.environ:
                cmds.append(f"""if [ -z "${{{envvar}}}" ]; then""")
                cmds.append(f" export {envvar}={os.environ[envvar]}")
                cmds.append("fi")

        if "BELLE2_TOOLS" not in os.environ:
            raise BackendError("No BELLE2_TOOLS found in environment")
        for envvar in _backend_job_envvars:
            append_environment_variable(self.setup_cmds, envvar)
        if "BELLE2_RELEASE" in os.environ:
            self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
        elif 'BELLE2_LOCAL_DIR' in os.environ:
            self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
            self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
            self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
            self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
            self.setup_cmds.append("pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
            self.setup_cmds.append("source $BACKEND_B2SETUP")
            self.setup_cmds.append("b2code-option $BACKEND_BELLE2_OPTION")
            self.setup_cmds.append("popd > /dev/null")
class SubJob(Job):
    """
    This mini-class simply holds basic information about which subjob you are
    and a reference to the parent Job object to be able to access the main data there.
    Rather than replicating all of the parent job's configuration again.
    """

    def __init__(self, job, subjob_id, input_files=None):
        #: Id of this SubJob.
        self.id = subjob_id
        #: Job() instance of the parent of this SubJob.
        self.parent = job
        #: Input files specific to this SubJob.
        self.input_files = input_files
        #: The result object of this SubJob. Only filled once it is submitted to a backend.
        self.result = None
        #: Status of this SubJob.
        self._status = "init"
        #: Arguments specific to this SubJob.
        self.args = []

    @property
    def output_dir(self):
        """
        Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this."""
        return Path(self.parent.output_dir, str(self.id))

    @property
    def working_dir(self):
        """Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this."""
        return Path(self.parent.working_dir, str(self.id))

    @property
    def name(self):
        """Getter for name of SubJob. Accesses the parent Job name to infer this."""
        return "_".join((self.parent.name, str(self.id)))
    @property
    def status(self):
        """
        Returns the status of this SubJob.
        """
        return self._status

    @status.setter
    def status(self, status):
        """
        Sets the status of this SubJob.
        """
        if status == "failed":
            B2ERROR(f"Setting {self.name} status to failed")
        else:
            B2INFO(f"Setting {self.name} status to {status}")
        self._status = status

    @property
    def subjobs(self):
        """
        A subjob cannot have subjobs. Always return empty list.
        """
        return []

    @property
    def job_dict(self):
        """
        Returns:
            dict: A JSON serialisable representation of the `SubJob`. `Path <basf2.Path>` objects are converted to
            `string` via ``Path.as_posix()``. Since Subjobs inherit most of the parent job's config
            we only output the input files and arguments that are specific to this subjob and no other details.
        """
        job_dict = {}
        job_dict["id"] = self.id
        job_dict["input_files"] = self.input_files
        job_dict["args"] = self.args
        return job_dict

    def __getattr__(self, attribute):
        """
        Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
        unless otherwise specified.
        """
        return getattr(self.parent, attribute)

    def __repr__(self):
        return f"SubJob({self.name})"
class Backend(ABC):
    """
    Abstract base class for a valid backend.
    Classes derived from this will implement their own submission of basf2 jobs
    to whatever backend they describe.
    Some common methods/attributes go into this base class.

    For backend_args the priority from lowest to highest is:

    backend.default_backend_args -> backend.backend_args -> job.backend_args
    """
    #: Default submission script name.
    submit_script = "submit.sh"
    #: Default exit code file name.
    exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
    #: Default backend arguments.
    default_backend_args = {}

    def __init__(self, *, backend_args=None):
        if backend_args is None:
            backend_args = {}
        #: Backend args applied to jobs unless the job specifies them itself.
        self.backend_args = {**self.default_backend_args, **backend_args}
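    # Sketch of the backend_args priority described in the class docstring (queue names other than
    # the PBS default "short" are illustrative): job.backend_args overrides backend.backend_args,
    # which overrides the class defaults.
    #
    #   # PBS.default_backend_args == {"queue": "short"}
    #   # backend = PBS(backend_args={"queue": "long"})   -> backend.backend_args == {"queue": "long"}
    #   # job.backend_args = {"queue": "express"}         -> {**backend.backend_args, **job.backend_args}
    #   #                                                     == {"queue": "express"} at submission time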
    @abstractmethod
    def submit(self, job):
        """
        Base method for submitting collection jobs to the backend type. This MUST be
        implemented for a correctly written backend class deriving from Backend().
        """

    @staticmethod
    def _add_setup(job, batch_file):
        """
        Adds setup lines to the shell script file.
        """
        for line in job.setup_cmds:
            print(line, file=batch_file)
    def _add_wrapper_script_setup(self, job, batch_file):
        """
        Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
        `trap` statements for Ctrl-C situations.
        """
        start_wrapper = f"""# ---
# trap ctrl-c and call ctrl_c()
trap '(ctrl_c 130)' SIGINT
trap '(ctrl_c 143)' SIGTERM

function write_exit_code() {{
  echo "Writing $1 to exit status file"
  echo "$1" > {self.exit_code_file}
  exit $1
}}

function ctrl_c() {{
  trap '' SIGINT SIGTERM
  echo "** Trapped Ctrl-C **"
  echo "$1" > {self.exit_code_file}
  exit $1
}}
# ---"""
        print(start_wrapper, file=batch_file)
    def _add_wrapper_script_teardown(self, job, batch_file):
        """
        Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
        an exit code of the job cmd being written out to a file. Which means that we can know if the command was
        successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
        removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
        code file.
        """
        end_wrapper = """# ---
write_exit_code $?"""
        print(end_wrapper, file=batch_file)

    @classmethod
    def _create_parent_job_result(cls, parent):
        """
        We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
        so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
        statuses and allows the use of ready().
        """
        raise NotImplementedError

    def get_submit_script_path(self, job):
        """
        Construct the Path object of the bash script file that we will submit. It will contain
        the actual job command, wrapper commands, setup commands, and any batch directives.
        """
        return Path(job.working_dir, self.submit_script)
class Result():
    """
    Base class for Result objects. A Result is created for each `Job` (or `Job.SubJob`) object
    submitted to a backend. It provides a way to query a job's status to find out if it's ready.
    """

    def __init__(self, job):
        """
        Pass in the job object to allow the result to access the job's properties and do post-processing.
        """
        #: The `Job` this result belongs to.
        self.job = job
        #: Quick flag so that we don't keep querying the backend once a job is known to be done.
        self._is_ready = False
        #: How long to keep waiting for a missing exit code file before treating it as a failure.
        self.time_to_wait_for_exit_code_file = timedelta(minutes=5)
        #: Time at which we first started waiting for the exit code file to appear.
        self.exit_code_file_initial_time = None

    def ready(self):
        """
        Returns whether or not this job result is known to be ready. Doesn't actually change the job status. Just changes
        the 'readiness' based on the known job status.
        """
        B2DEBUG(29, f"Calling {self.job}.result.ready()")
        if self._is_ready:
            return True
        elif self.job.status in self.job.exit_statuses:
            self._is_ready = True
            return True
        else:
            return False

    def update_status(self):
        """
        Update the job's (and subjobs') status so that `Result.ready` will return the up to date status. This call will have to
        actually look up the job's status from some database/exit code file.
        """
        raise NotImplementedError

    def get_exit_code_from_file(self):
        """
        Read the exit code file to discover the exit status of the job command. Useful fallback if the job is no longer
        known to the job database (batch system purged it for example). Since some backends may take time to download
        the output files of the job back to the working directory we use a time limit on how long to wait.
        """
        if not self.exit_code_file_initial_time:
            self.exit_code_file_initial_time = datetime.now()
        exit_code_path = Path(self.job.working_dir, Backend.exit_code_file)
        with open(exit_code_path) as f:
            exit_code = int(f.read().strip())
            B2DEBUG(29, f"Exit code from file for {self.job} was {exit_code}")
            return exit_code
class Local(Backend):
    """
    Backend for local processes i.e. on the same machine but in a subprocess.

    Note that you should call the self.join() method to close the pool and wait for any
    running processes to finish before exiting the process. Once you've called join you will have to set up a new
    instance of this backend to create a new pool. If you don't call `Local.join` or don't create a join yourself
    somewhere, then the main python process might end before your pool is done.

    Keyword Arguments:
        max_processes (int): Integer that specifies the size of the process pool that spawns the subjobs, default=1.
            It's the maximum simultaneous subjobs.
            Try not to specify a large number or a number larger than the number of cores.
            It won't crash the program but it will slow down and negatively impact performance.
    """

    def __init__(self, *, backend_args=None, max_processes=1):
        super().__init__(backend_args=backend_args)
        #: The actual multiprocessing Pool object (created by the max_processes setter).
        self.pool = None
        #: The size of the multiprocessing process pool.
        self.max_processes = max_processes
    class LocalResult(Result):
        """
        Result class to help monitor status of jobs submitted by Local backend.
        """

        def __init__(self, job, result):
            """
            Pass in the job object and the multiprocessing result to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: The underlying `multiprocessing.pool.AsyncResult` object.
            self.result = result

        def _update_result_status(self):
            if self.result.ready() and (self.job.status not in self.job.exit_statuses):
                return_code = self.result.get()
                if return_code:
                    self.job.status = "failed"
                else:
                    self.job.status = "completed"

        def update_status(self):
            """
            Update the job's (or subjobs') status by calling the result object.
            """
            B2DEBUG(29, f"Calling {self.job}.result.update_status()")
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status()
            else:
                self._update_result_status()
    def join(self):
        """
        Closes and joins the Pool, letting you wait for all results currently
        still processing.
        """
        B2INFO("Joining Process Pool, waiting for results to finish...")
        self.pool.close()
        self.pool.join()
        B2INFO("Process Pool joined.")
    @property
    def max_processes(self):
        """
        Getter for max_processes
        """
        return self._max_processes

    @max_processes.setter
    def max_processes(self, value):
        """
        Setter for max_processes, we also check for a previous Pool(), wait for it to join
        and then create a new one with the new value of max_processes
        """
        #: Internal attribute of max_processes
        self._max_processes = value
        if self.pool:
            B2INFO("New max_processes requested. But a pool already exists.")
            self.join()
        B2INFO(f"Starting up new Pool with {self.max_processes} processes")
        self.pool = mp.Pool(processes=self.max_processes)
    @method_dispatch
    def submit(self, job):
        raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
                                  "Did you submit a (Sub)Job?")

    @submit.register(SubJob)
    def _(self, job):
        """
        Submission of a `SubJob` for the Local backend
        """
        # Make sure the output and working directories exist
        job.output_dir.mkdir(parents=True, exist_ok=True)
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()
        # Build the bash script that will be run
        script_path = self.get_submit_script_path(job)
        with open(script_path, mode="w") as batch_file:
            print("#!/bin/bash", file=batch_file)
            self._add_wrapper_script_setup(job, batch_file)
            self._add_setup(job, batch_file)
            print(job.full_command, file=batch_file)
            self._add_wrapper_script_teardown(job, batch_file)
        B2INFO(f"Submitting {job}")
        job.result = Local.LocalResult(job,
                                       self.pool.apply_async(self.run_job,
                                                             (job.name,
                                                              job.working_dir,
                                                              job.output_dir,
                                                              script_path)
                                                             )
                                       )
        job.status = "submitted"
        B2INFO(f"{job} submitted")
    @submit.register(Job)
    def _(self, job):
        """
        Submission of a `Job` for the Local backend
        """
        # Make sure the output and working directories exist
        job.output_dir.mkdir(parents=True, exist_ok=True)
        job.working_dir.mkdir(parents=True, exist_ok=True)
        # Check that the requested input files exist and remove duplicates
        job.check_input_data_files()

        if not job.splitter:
            # No splitter, so just submit the job as a single process
            job.copy_input_sandbox_files_to_working_dir()
            job.dump_input_data()
            script_path = self.get_submit_script_path(job)
            with open(script_path, mode="w") as batch_file:
                print("#!/bin/bash", file=batch_file)
                self._add_wrapper_script_setup(job, batch_file)
                self._add_setup(job, batch_file)
                print(job.full_command, file=batch_file)
                self._add_wrapper_script_teardown(job, batch_file)
            B2INFO(f"Submitting {job}")
            job.result = Local.LocalResult(job,
                                           self.pool.apply_async(self.run_job,
                                                                 (job.name,
                                                                  job.working_dir,
                                                                  job.output_dir,
                                                                  script_path)
                                                                 )
                                           )
            job.status = "submitted"
            B2INFO(f"{job} submitted")
        else:
            # Create subjobs and submit them instead
            job.splitter.create_subjobs(job)
            self.submit(list(job.subjobs.values()))
            self._create_parent_job_result(job)
    @submit.register(list)
    def _(self, jobs):
        """
        Submit method of Local() that takes a list of jobs instead of just one and submits each one.
        """
        for job in jobs:
            self.submit(job)
        B2INFO("All requested jobs submitted.")
    @staticmethod
    def run_job(name, working_dir, output_dir, script):
        """
        The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
        shell command in a subprocess and captures the stdout and stderr of the subprocess to files.
        """
        B2INFO(f"Starting Sub-process: {name}")
        from subprocess import Popen
        stdout_file_path = Path(working_dir, _STDOUT_FILE)
        stderr_file_path = Path(working_dir, _STDERR_FILE)
        B2INFO(f"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
        with open(stdout_file_path, mode="w", buffering=1) as f_out, \
                open(stderr_file_path, mode="w", buffering=1) as f_err:
            with Popen(["/bin/bash", script.as_posix()],
                       stdout=f_out,
                       stderr=f_err,
                       universal_newlines=True,
                       cwd=working_dir) as p:
                # Block until the subprocess finishes so that the return code is available
                p.wait()
        B2INFO(f"Subprocess {name} finished.")
        return p.returncode

    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.LocalResult(parent, None)
class Batch(Backend):
    """
    Abstract Base backend for submitting to a local batch system. Batch system specific commands should be implemented
    in a derived class. Do not use this class directly!
    """
    #: Shell command to submit a script, should be implemented in the derived class.
    submission_cmds = []
    #: Default global limit on the number of jobs (submitted + running) for this user.
    default_global_job_limit = 1000
    #: Default seconds to wait between checking if the global job limit allows more submissions.
    default_sleep_between_submission_checks = 30

    def __init__(self, *, backend_args=None):
        """
        Init method for Batch Backend. Does some basic default setup.
        """
        super().__init__(backend_args=backend_args)
        #: The active limit on the total number of jobs for this user.
        self.global_job_limit = self.default_global_job_limit
        #: Seconds to wait between checking if we can submit the next group of jobs.
        self.sleep_between_submission_checks = self.default_sleep_between_submission_checks
    def _add_batch_directives(self, job, file):
        """
        Should be implemented in a derived class to write a batch submission script to the job.working_dir.
        You should think about where the stdout/err should go, and set the queue name.
        """
        raise NotImplementedError("Need to implement a _add_batch_directives(self, job, file) "
                                  f"method in {self.__class__.__name__} backend.")

    def _make_submit_file(self, job, submit_file_path):
        """
        Useful for the HTCondor backend where a submit file is needed instead of batch
        directives pasted directly into the submission script. It should be overwritten
        if needed.
        """

    @classmethod
    @abstractmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """

    def can_submit(self, *args, **kwargs):
        """
        Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail.
        This is initially meant to make sure that we don't go over the global limits of jobs (submitted + running).

        Returns:
            bool: If the job submission can continue based on the current situation.
        """
        return True

    @method_dispatch
    def submit(self, job, check_can_submit=True, jobs_per_check=100):
        raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
                                  "Did you submit a (Sub)Job?")
    @submit.register(SubJob)
    def _(self, job, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch backend for a `SubJob`. Should take `SubJob` object, create needed directories,
        create batch script, and send it off with the batch submission command.
        It should apply the correct options (default and user requested).

        Should set a Result object as an attribute of the job.
        """
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()
        # Make submission file if needed (e.g. for HTCondor)
        batch_submit_script_path = self.get_batch_submit_script_path(job)
        self._make_submit_file(job, batch_submit_script_path)
        # Get the path of the bash script we run
        script_path = self.get_submit_script_path(job)
        with open(script_path, mode="w") as batch_file:
            self._add_batch_directives(job, batch_file)
            self._add_wrapper_script_setup(job, batch_file)
            self._add_setup(job, batch_file)
            print(job.full_command, file=batch_file)
            self._add_wrapper_script_teardown(job, batch_file)
        os.chmod(script_path, 0o755)
        B2INFO(f"Submitting {job}")
        # Do the actual batch submission
        cmd = self._create_cmd(batch_submit_script_path)
        output = self._submit_to_batch(cmd)
        self._create_job_result(job, output)
        job.status = "submitted"
        B2INFO(f"{job} submitted")
    @submit.register(Job)
    def _(self, job, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch backend. Should take job object, create needed directories, create batch script,
        and send it off with the batch submission command, applying the correct options (default and user requested).

        Should set a Result object as an attribute of the job.
        """
        job.working_dir.mkdir(parents=True, exist_ok=True)
        # Check that the requested input files exist and remove duplicates
        job.check_input_data_files()

        if not job.splitter:
            # No splitter, so submit the job as a single batch job
            job.copy_input_sandbox_files_to_working_dir()
            job.dump_input_data()
            # Make submission file if needed (e.g. for HTCondor)
            batch_submit_script_path = self.get_batch_submit_script_path(job)
            self._make_submit_file(job, batch_submit_script_path)
            # Get the path of the bash script we run
            script_path = self.get_submit_script_path(job)
            with open(script_path, mode="w") as batch_file:
                self._add_batch_directives(job, batch_file)
                self._add_wrapper_script_setup(job, batch_file)
                self._add_setup(job, batch_file)
                print(job.full_command, file=batch_file)
                self._add_wrapper_script_teardown(job, batch_file)
            os.chmod(script_path, 0o755)
            B2INFO(f"Submitting {job}")
            # Do the actual batch submission
            cmd = self._create_cmd(batch_submit_script_path)
            output = self._submit_to_batch(cmd)
            self._create_job_result(job, output)
            job.status = "submitted"
            B2INFO(f"{job} submitted")
        else:
            # Create subjobs and submit them instead
            job.splitter.create_subjobs(job)
            self.submit(list(job.subjobs.values()))
            self._create_parent_job_result(job)
    @submit.register(list)
    def _(self, jobs, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.
        """
        B2INFO(f"Submitting a list of {len(jobs)} jobs to a Batch backend")
        # Don't submit in groups larger than the global job limit, otherwise can_submit() could never succeed
        if jobs_per_check > self.global_job_limit:
            B2INFO(f"The requested jobs_per_check (={jobs_per_check}) is higher than the global job "
                   f"limit for this backend (={self.global_job_limit}). Will instead use the "
                   "value of the global job limit.")
            jobs_per_check = self.global_job_limit

        # Submit the jobs in groups, waiting if the batch system is globally too busy
        for jobs_to_submit in grouper(jobs_per_check, jobs):
            while not self.can_submit(njobs=len(jobs_to_submit)):
                B2INFO("Too many jobs are currently in the batch system globally. Waiting until submission can continue...")
                time.sleep(self.sleep_between_submission_checks)
            B2INFO(f"Submitting the next {len(jobs_to_submit)} jobs...")
            for job in jobs_to_submit:
                self.submit(job, check_can_submit, jobs_per_check)
        B2INFO(f"All {len(jobs)} requested jobs submitted")
    def get_batch_submit_script_path(self, job):
        """
        Construct the Path object of the script file that we will submit using the batch command.
        For most batch backends this is the same script as the bash script we submit.
        But for some they require a separate submission file that describes the job.
        To implement that you can override this function in the derived backend class.
        """
        return Path(job.working_dir, self.submit_script)

    @classmethod
    @abstractmethod
    def _create_job_result(cls, job, batch_output):
        """
        Create the Result object for a job from the output of the batch submission command.
        """

    @abstractmethod
    def _create_cmd(self, job):
        """
        Create the submission command used by `_submit_to_batch`.
        """
class PBS(Batch):
    """
    Backend for submitting calibration processes to a qsub batch system.
    """
    #: Working directory directive.
    cmd_wkdir = "#PBS -d"
    #: stdout file directive.
    cmd_stdout = "#PBS -o"
    #: stderr file directive.
    cmd_stderr = "#PBS -e"
    #: Queue directive.
    cmd_queue = "#PBS -q"
    #: Job name directive.
    cmd_name = "#PBS -N"
    #: Shell command to submit a script.
    submission_cmds = ["qsub"]
    #: Default global limit of jobs for this backend.
    default_global_job_limit = 5000
    #: Default backend arguments for PBS.
    default_backend_args = {"queue": "short"}

    def __init__(self, *, backend_args=None):
        super().__init__(backend_args=backend_args)
    def _add_batch_directives(self, job, batch_file):
        """
        Add PBS directives to submitted script.
        """
        job_backend_args = {**self.backend_args, **job.backend_args}
        batch_queue = job_backend_args["queue"]
        print("#!/bin/bash", file=batch_file)
        print("# --- Start PBS ---", file=batch_file)
        print(" ".join([PBS.cmd_queue, batch_queue]), file=batch_file)
        print(" ".join([PBS.cmd_name, job.name]), file=batch_file)
        print(" ".join([PBS.cmd_wkdir, job.working_dir.as_posix()]), file=batch_file)
        print(" ".join([PBS.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
        print(" ".join([PBS.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
        print("# --- End PBS ---", file=batch_file)

    @classmethod
    def _create_job_result(cls, job, batch_output):
        job_id = batch_output.replace("\n", "")
        B2INFO(f"Job ID of {job} recorded as: {job_id}")
        job.result = cls.PBSResult(job, job_id)

    def _create_cmd(self, script_path):
        submission_cmd = self.submission_cmds[:]
        submission_cmd.append(script_path.as_posix())
        return submission_cmd
    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """
        sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True)
        return sub_out

    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.PBSResult(parent, None)
    class PBSResult(Result):
        """
        Simple class to help monitor status of jobs submitted by `PBS` Backend.

        You pass in a `Job` object (or `SubJob`) and job id from a qsub command.
        When you call the `ready` method it runs qstat to see whether or not the job has finished.
        """

        #: Mapping of qstat job states to Job statuses. Entries not visible in the source are
        #: assumed from the standard PBS job_state codes.
        backend_code_to_status = {"R": "running",
                                  "Q": "submitted",
                                  "H": "submitted",
                                  "C": "completed",
                                  "FINISHED": "completed",
                                  "E": "failed",
                                  }

        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: Job id given by qsub for this job.
            self.job_id = job_id

        def update_status(self):
            """
            Update the job's (or subjobs') status by calling qstat.
            """
            B2DEBUG(29, f"Calling {self.job}.result.update_status()")
            qstat_output = PBS.qstat()
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(qstat_output)
            else:
                self._update_result_status(qstat_output)

        def _update_result_status(self, qstat_output):
            """
            Parameters:
                qstat_output (dict): The JSON output of a previous call to qstat which we can re-use to find the
                    status of this job. Obviously you should only be passing a JSON dict that contains the 'Job_Id' and
                    'job_state' information, otherwise it is useless.
            """
            try:
                backend_status = self._get_status_from_output(qstat_output)
            except KeyError:
                # The job wasn't in the qstat output, so fall back to the exit code file
                B2DEBUG(29, f"Checking the exit code from file for {self.job}")
                try:
                    exit_code = self.get_exit_code_from_file()
                except FileNotFoundError:
                    waiting_time = datetime.now() - self.exit_code_file_initial_time
                    if waiting_time > self.time_to_wait_for_exit_code_file:
                        B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                        exit_code = 1
                    else:
                        B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                        return
                if exit_code:
                    backend_status = "E"
                else:
                    backend_status = "C"

            try:
                new_job_status = self.backend_code_to_status[backend_status]
            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err

            if new_job_status != self.job.status:
                self.job.status = new_job_status
        def _get_status_from_output(self, output):
            for job_info in output["JOBS"]:
                if job_info["Job_Id"] == self.job_id:
                    return job_info["job_state"]
            # Not found in the output; the caller falls back to the exit code file
            raise KeyError(f"No job record in the 'output' argument had the 'Job_Id'=={self.job_id}")
    def can_submit(self, njobs=1):
        """
        Checks the global number of jobs in PBS right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.
        """
        B2DEBUG(29, "Calling PBS().can_submit()")
        job_info = self.qstat(username=os.environ["USER"])
        total_jobs = job_info["NJOBS"]
        B2INFO(f"Total jobs active in the PBS system is currently {total_jobs}")
        if (total_jobs + njobs) > self.global_job_limit:
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
            return False
        else:
            B2INFO("There is enough space to submit more jobs.")
            return True
    @classmethod
    def qstat(cls, username="", job_ids=None):
        """
        Simplistic interface to the ``qstat`` command. Lets you request information about all jobs or ones matching the filter
        ['job_id'] or for the username. The result is a JSON dictionary containing some of the useful job attributes returned
        by the command.

        PBS is kind of annoying as depending on the configuration it can forget about jobs immediately. So the status of a
        finished job is VERY hard to get. There are other commands that are sometimes included that may do a better job.
        This one should work for Melbourne's cloud computing centre.

        Parameters:
            username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
                will be in the output dictionary.
            job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
                the output of this function will be only information about these jobs. If this argument is not given, then all jobs
                matching the other filters will be returned.

        Returns:
            dict: JSON dictionary of the form (saves you parsing the XML that qstat returns):

        .. code-block:: python

            {
              "NJOBS": <number of records returned>,
              "JOBS": [
                       {<job attribute>: <value>, ...}, ...
                      ]
            }
        """
        B2DEBUG(29, f"Calling PBS.qstat(username='{username}', job_id={job_ids})")
        if not job_ids:
            job_ids = []
        job_ids = set(job_ids)
        cmd_list = ["qstat", "-x"]
        # Join the command together into a single string for the shell
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        jobs_dict = {"NJOBS": 0, "JOBS": []}
        jobs_xml = ET.fromstring(output)

        # For a single requested job id we can search the XML directly
        if len(job_ids) == 1:
            job_elem = jobs_xml.find(f"./Job[Job_Id='{list(job_ids)[0]}']")
            if job_elem is not None:
                jobs_dict["JOBS"].append(cls.create_job_record_from_element(job_elem))
                jobs_dict["NJOBS"] = 1
            return jobs_dict

        # Otherwise filter the full output by username and/or job ids
        for job in jobs_xml.iterfind("Job"):
            job_owner = job.find("Job_Owner").text.split("@")[0]
            if username and username != job_owner:
                continue
            job_id = job.find("Job_Id").text
            if job_ids and job_id not in job_ids:
                continue
            jobs_dict["JOBS"].append(cls.create_job_record_from_element(job))
            jobs_dict["NJOBS"] += 1
            # Remove it from the set so that we don't keep looking for it
            if job_id in job_ids:
                job_ids.remove(job_id)
        return jobs_dict
    @staticmethod
    def create_job_record_from_element(job_elem):
        """
        Creates a Job dictionary with various job information from the XML element returned by qstat.

        Parameters:
            job_elem (xml.etree.ElementTree.Element): The XML Element of the Job

        Returns:
            dict: JSON serialisable dictionary of the Job information we are interested in.
        """
        job_dict = {}
        job_dict["Job_Id"] = job_elem.find("Job_Id").text
        job_dict["Job_Name"] = job_elem.find("Job_Name").text
        job_dict["Job_Owner"] = job_elem.find("Job_Owner").text
        job_dict["job_state"] = job_elem.find("job_state").text
        job_dict["queue"] = job_elem.find("queue").text
        return job_dict
class LSF(Batch):
    """
    Backend for submitting calibration processes to a bsub (LSF) batch system.
    """
    #: Working directory directive.
    cmd_wkdir = "#BSUB -cwd"
    #: stdout file directive.
    cmd_stdout = "#BSUB -o"
    #: stderr file directive.
    cmd_stderr = "#BSUB -e"
    #: Queue directive.
    cmd_queue = "#BSUB -q"
    #: Job name directive.
    cmd_name = "#BSUB -J"
    #: Shell command to submit a script.
    submission_cmds = ["bsub", "-env", "\"none\"", "<"]
    #: Default global limit of jobs for this backend.
    default_global_job_limit = 15000
    #: Default backend arguments for LSF.
    default_backend_args = {"queue": "s"}

    def __init__(self, *, backend_args=None):
        super().__init__(backend_args=backend_args)
    def _add_batch_directives(self, job, batch_file):
        """
        Adds LSF BSUB directives for the job to a script.
        """
        job_backend_args = {**self.backend_args, **job.backend_args}
        batch_queue = job_backend_args["queue"]
        print("#!/bin/bash", file=batch_file)
        print("# --- Start LSF ---", file=batch_file)
        print(" ".join([LSF.cmd_queue, batch_queue]), file=batch_file)
        print(" ".join([LSF.cmd_name, job.name]), file=batch_file)
        print(" ".join([LSF.cmd_wkdir, str(job.working_dir)]), file=batch_file)
        print(" ".join([LSF.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
        print(" ".join([LSF.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
        print("# --- End LSF ---", file=batch_file)

    def _create_cmd(self, script_path):
        submission_cmd = self.submission_cmds[:]
        submission_cmd.append(script_path.as_posix())
        submission_cmd = " ".join(submission_cmd)
        return [submission_cmd]
    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """
        sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        return sub_out
    class LSFResult(Result):
        """
        Simple class to help monitor status of jobs submitted by LSF Backend.

        You pass in a `Job` object and job id from a bsub command.
        When you call the `ready` method it runs bjobs to see whether or not the job has finished.
        """

        #: Mapping of bjobs status codes to Job statuses. Entries not visible in the source are
        #: assumed from the standard LSF status codes.
        backend_code_to_status = {"RUN": "running",
                                  "DONE": "completed",
                                  "FINISHED": "completed",
                                  "EXIT": "failed",
                                  "PEND": "submitted",
                                  }

        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: Job id given by bsub for this job.
            self.job_id = job_id

        def update_status(self):
            """
            Update the job's (or subjobs') status by calling bjobs.
            """
            B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
            bjobs_output = LSF.bjobs(output_fields=["stat", "id"])
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(bjobs_output)
            else:
                self._update_result_status(bjobs_output)

        def _update_result_status(self, bjobs_output):
            """
            Parameters:
                bjobs_output (dict): The JSON output of a previous call to bjobs which we can re-use to find the
                    status of this job. Obviously you should only be passing a JSON dict that contains the 'stat' and
                    'id' information, otherwise it is useless.
            """
            try:
                backend_status = self._get_status_from_output(bjobs_output)
            except KeyError:
                # The job wasn't in the previous bjobs output, so ask bjobs directly about this job id
                bjobs_output = LSF.bjobs(output_fields=["stat", "id"], job_id=str(self.job_id))
                try:
                    backend_status = self._get_status_from_output(bjobs_output)
                except KeyError:
                    # Still not found; the batch system probably purged it, so fall back to the exit code file
                    try:
                        exit_code = self.get_exit_code_from_file()
                    except FileNotFoundError:
                        waiting_time = datetime.now() - self.exit_code_file_initial_time
                        if waiting_time > self.time_to_wait_for_exit_code_file:
                            B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                            exit_code = 1
                        else:
                            B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                            return
                    if exit_code:
                        backend_status = "EXIT"
                    else:
                        backend_status = "FINISHED"

            try:
                new_job_status = self.backend_code_to_status[backend_status]
            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err

            if new_job_status != self.job.status:
                self.job.status = new_job_status
        def _get_status_from_output(self, output):
            if output["JOBS"] and "ERROR" in output["JOBS"][0]:
                if output["JOBS"][0]["ERROR"] == f"Job <{self.job_id}> is not found":
                    raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
                else:
                    raise BackendError(f"Unidentified Error during status check for {self.job}: {output}")
            else:
                for job_info in output["JOBS"]:
                    if job_info["JOBID"] == self.job_id:
                        return job_info["STAT"]
                raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")

    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.LSFResult(parent, None)
    @classmethod
    def _create_job_result(cls, job, batch_output):
        m = re.search(r"Job <(\d+)>", str(batch_output))
        if m:
            job_id = m.group(1)
        else:
            raise BackendError(f"Failed to get the batch job ID of {job}. LSF output was:\n{batch_output}")

        B2INFO(f"Job ID of {job} recorded as: {job_id}")
        job.result = cls.LSFResult(job, job_id)
    def can_submit(self, njobs=1):
        """
        Checks the global number of jobs in LSF right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.
        """
        B2DEBUG(29, "Calling LSF().can_submit()")
        job_info = self.bjobs(output_fields=["stat"])
        total_jobs = job_info["NJOBS"]
        B2INFO(f"Total jobs active in the LSF system is currently {total_jobs}")
        if (total_jobs + njobs) > self.global_job_limit:
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
            return False
        else:
            B2INFO("There is enough space to submit more jobs.")
            return True
    @classmethod
    def bjobs(cls, output_fields=None, job_id="", username="", queue=""):
        """
        Simplistic interface to the ``bjobs`` command. Lets you request information about all jobs matching the filters
        'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by output of the ``-json`` bjobs option.

        Parameters:
            output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
            job_id (str): String representation of the Job ID given by bsub during submission. If this argument is given then
                the output of this function will be only information about this job. If this argument is not given, then all jobs
                matching the other filters will be returned.
            username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters.
            queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.

        Returns:
            dict: JSON dictionary of the form:

        .. code-block:: python

            {
              "NJOBS":<njobs returned by command>,
              "JOBS":[
                      {
                       <output field: value>, ...
                      }, ...
                     ]
            }
        """
        B2DEBUG(29, f"Calling LSF.bjobs(output_fields={output_fields}, job_id={job_id}, username={username}, queue={queue})")
        # Set up the fields we want the bjobs command to return
        if not output_fields:
            output_fields = ["id"]
        field_list_cmd = "\""
        field_list_cmd += " ".join(output_fields)
        field_list_cmd += "\""
        cmd_list = ["bjobs", "-o", field_list_cmd]
        # Add the queue filter if requested
        if queue:
            cmd_list.extend(["-q", queue])
        # Add the username filter if requested
        if username:
            cmd_list.extend(["-u", username])
        # Always request JSON output so that it is easy to parse
        cmd_list.append("-json")
        # If the job id is set, we only ask for that job
        if job_id:
            cmd_list.append(job_id)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = decode_json_string(subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True))
        output["NJOBS"] = output["JOBS"]
        output["JOBS"] = output["RECORDS"]
        del output["RECORDS"]
        del output["COMMAND"]
        return output
    @classmethod
    def bqueues(cls, output_fields=None, queues=None):
        """
        Simplistic interface to the ``bqueues`` command. Lets you request information about all queues matching the filters.
        The result is the JSON dictionary returned by output of the ``-json`` bqueues option.

        Parameters:
            output_fields (list[str]): A list of bqueues -o fields that you would like information about
                e.g. the default is ['queue_name' 'status' 'max' 'njobs' 'pend' 'run']
            queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
                By default you will receive information about all queues.

        Returns:
            dict: JSON dictionary of the form:

        .. code-block:: python

            {
              "COMMAND":"bqueues",
              ...
              "RECORDS":[
                         {
                          "QUEUE_NAME":"b2_beast",
                          "STATUS":"Open:Active",
                          ...
                         }, ...
                        ]
            }
        """
        B2DEBUG(29, f"Calling LSF.bqueues(output_fields={output_fields}, queues={queues})")
        # Set up the fields we want the bqueues command to return
        if not output_fields:
            output_fields = ["queue_name", "status", "max", "njobs", "pend", "run"]
        field_list_cmd = "\""
        field_list_cmd += " ".join(output_fields)
        field_list_cmd += "\""
        cmd_list = ["bqueues", "-o", field_list_cmd]
        # Always request JSON output so that it is easy to parse
        cmd_list.append("-json")
        # If specific queues are requested, only ask about those
        if queues:
            cmd_list.extend(queues)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        return decode_json_string(output)
class HTCondor(Batch):
    """
    Backend for submitting calibration processes to an HTCondor batch system.
    """
    #: HTCondor batch submission (.sub) file name.
    batch_submit_script = "submit.sub"
    #: Shell command to submit a script.
    submission_cmds = ["condor_submit", "-terse"]
    #: Default global limit of jobs for this backend.
    default_global_job_limit = 10000
    #: Default backend arguments for HTCondor. The "getenv" and "extra_lines" defaults below are
    #: assumptions added because `_make_submit_file` reads those keys; the original values were elided.
    default_backend_args = {
                            "universe": "vanilla",
                            "getenv": "false",
                            "request_memory": "4 GB",
                            "extra_lines": [],
                           }
    #: Default ClassAds to request from condor_q/condor_history.
    default_class_ads = ["GlobalJobId", "JobStatus", "Owner"]
    def _make_submit_file(self, job, submit_file_path):
        """
        Fill HTCondor submission file.
        """
        # Transfer everything already placed in the working directory
        files_to_transfer = [i.as_posix() for i in job.working_dir.iterdir()]
        job_backend_args = {**self.backend_args, **job.backend_args}
        with open(submit_file_path, "w") as submit_file:
            print(f'executable = {self.get_submit_script_path(job)}', file=submit_file)
            print(f'log = {Path(job.output_dir, "htcondor.log").as_posix()}', file=submit_file)
            print(f'output = {Path(job.working_dir, _STDOUT_FILE).as_posix()}', file=submit_file)
            print(f'error = {Path(job.working_dir, _STDERR_FILE).as_posix()}', file=submit_file)
            print('transfer_input_files = ', ','.join(files_to_transfer), file=submit_file)
            print(f'universe = {job_backend_args["universe"]}', file=submit_file)
            print(f'getenv = {job_backend_args["getenv"]}', file=submit_file)
            print(f'request_memory = {job_backend_args["request_memory"]}', file=submit_file)
            print('should_transfer_files = Yes', file=submit_file)
            print('when_to_transfer_output = ON_EXIT', file=submit_file)
            # Any user-supplied extra lines go in before the queue statement
            for line in job_backend_args["extra_lines"]:
                print(line, file=submit_file)
            print('queue', file=submit_file)
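    # Sketch of the submit file generated above with the default backend_args (the paths and the
    # getenv value are illustrative, not taken from a real run):
    #
    #   executable = /path/to/working_dir/submit.sh
    #   log = /path/to/output_dir/htcondor.log
    #   output = /path/to/working_dir/stdout
    #   error = /path/to/working_dir/stderr
    #   transfer_input_files =  submit.sh,__BACKEND_INPUT_FILES__.json
    #   universe = vanilla
    #   getenv = false
    #   request_memory = 4 GB
    #   should_transfer_files = Yes
    #   when_to_transfer_output = ON_EXIT
    #   queue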
    def _add_batch_directives(self, job, batch_file):
        """
        For HTCondor leave empty as the directives are already included in the submit file.
        """
        print('#!/bin/bash', file=batch_file)

    def _create_cmd(self, script_path):
        submission_cmd = self.submission_cmds[:]
        submission_cmd.append(script_path.as_posix())
        return submission_cmd

    def get_batch_submit_script_path(self, job):
        """
        Construct the Path object of the .sub file that we will use to describe the job.
        """
        return Path(job.working_dir, self.batch_submit_script)

    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """
        job_dir = Path(cmd[-1]).parent.as_posix()
        sub_out = ""
        attempt = 0
        sleep_time = 30
        # condor_submit can fail transiently, so retry a few times before giving up
        while attempt < 3:
            try:
                sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, cwd=job_dir)
                break
            except subprocess.CalledProcessError as e:
                attempt += 1
                if attempt == 3:
                    B2ERROR(f"Error during condor_submit: {str(e)} occurred more than 3 times.")
                    raise e
                B2ERROR(f"Error during condor_submit: {str(e)}, sleeping for {sleep_time} seconds.")
                time.sleep(sleep_time)
        return sub_out.split()[0]
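
    # --- Illustration (editor's sketch, not part of the original module) ---
    # condor_submit -terse prints a line such as "1234.0 - 1234.0" (first and last
    # job id of the submitted cluster), so sub_out.split()[0] yields "1234.0", which
    # is the id kept for later condor_q / condor_history lookups. The id value here
    # is a made-up example.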

    class HTCondorResult(Result):
        """
        Simple class to help monitor status of jobs submitted by the HTCondor Backend.

        You pass in a `Job` object and job id from a condor_submit command.
        When you call the `ready` method it runs condor_q and, if needed, ``condor_history``
        to see whether or not the job has finished.
        """

        # HTCondor JobStatus codes mapped to the Job status strings used by this module
        backend_code_to_status = {0: "submitted", 1: "submitted", 2: "running", 3: "failed",
                                  4: "completed", 5: "submitted", 6: "failed"}

        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            # Job id given by condor_submit
            self.job_id = job_id

        def update_status(self):
            """
            Update the job's (or subjobs') status by calling condor_q.
            """
            B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
            # Query all of this user's jobs once and re-use the output for each (sub)job
            condor_q_output = HTCondor.condor_q()
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(condor_q_output)
            else:
                self._update_result_status(condor_q_output)

        def _update_result_status(self, condor_q_output):
            """
            In order to be slightly more efficient we pass in a previous call to condor_q to see if it can work.
            If it is there we update the job's status. If not we are forced to start calling condor_q and, if needed,
            ``condor_history``, etc.

            Parameters:
                condor_q_output (dict): The JSON output of a previous call to `HTCondor.condor_q` which we can re-use to find the
                    status of this job if it was active when that command ran.
            """
            B2DEBUG(29, f"Calling {self.job}.result._update_result_status()")
            jobs_info = []
            for job_record in condor_q_output["JOBS"]:
                job_id = job_record["GlobalJobId"].split("#")[1]
                if job_id == self.job_id:
                    B2DEBUG(29, f"Found {self.job_id} in condor_q_output.")
                    jobs_info.append(job_record)

            # Not in the cached condor_q output, so look for the exit code file written when the job finishes
            if not jobs_info:
                try:
                    exit_code = self.get_exit_code_from_file()
                except FileNotFoundError:
                    waiting_time = datetime.now() - self.exit_code_file_initial_time
                    if self.time_to_wait_for_exit_code_file > waiting_time:
                        B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                        exit_code = 1
                    else:
                        B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                        return
                if exit_code:
                    jobs_info = [{"JobStatus": 6, "HoldReason": None}]  # Non-zero exit code, treat as failed
                else:
                    jobs_info = [{"JobStatus": 4, "HoldReason": None}]  # Zero exit code, treat as completed

            # Still nothing, so ask condor_q about this specific job id
            if not jobs_info:
                jobs_info = HTCondor.condor_q(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]

            # The job may have already left the queue, so fall back to condor_history
            if not jobs_info:
                jobs_info = HTCondor.condor_history(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]

            hold_reason = "No Reason Known"

            # If we still know nothing about the job, give up and mark it failed
            if not jobs_info:
                jobs_info = [{"JobStatus": 6, "HoldReason": None}]

            job_info = jobs_info[0]
            backend_status = job_info["JobStatus"]
            # A held job (JobStatus 5) is reported but we keep waiting for it
            if backend_status == 5:
                hold_reason = job_info.get("HoldReason", None)
                B2WARNING(f"{self.job} on hold because of {hold_reason}. Keep waiting.")

            try:
                new_job_status = self.backend_code_to_status[backend_status]
            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
            if new_job_status != self.job.status:
                self.job.status = new_job_status
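
        # --- Illustration (editor's sketch, not part of the original module) ---
        # The GlobalJobId ClassAd has the form "<schedd name>#<cluster>.<proc>#<timestamp>",
        # e.g. "schedd.example.org#1234.0#1597750549" (made-up value), so
        # job_record["GlobalJobId"].split("#")[1] recovers "1234.0", the same id string
        # that condor_submit -terse reported at submission time.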

    @classmethod
    def _create_job_result(cls, job, job_id):
        """
        Attach a `HTCondorResult` to the job so that it can be monitored.
        """
        B2INFO(f"Job ID of {job} recorded as: {job_id}")
        job.result = cls.HTCondorResult(job, job_id)

    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.HTCondorResult(parent, None)

    def can_submit(self, njobs=1):
        """
        Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit, i.e. keep it <= 250
                and check again before submitting more.
        """
        B2DEBUG(29, "Calling HTCondor().can_submit()")
        jobs_info = self.condor_q()
        total_jobs = jobs_info["NJOBS"]
        B2INFO(f"Total jobs active in the HTCondor system is currently {total_jobs}")
        if (total_jobs + njobs) > self.global_job_limit:
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
            return False
        else:
            B2INFO("There is enough space to submit more jobs.")
            return True

    @classmethod
    def condor_q(cls, class_ads=None, job_id="", username=""):
        """
        Simplistic interface to the `condor_q` command. Lets you request information about all jobs matching the filters
        ``job_id`` and ``username``. Note that setting ``job_id`` negates ``username`` so it is ignored.
        The result is the JSON dictionary returned by output of the ``-json`` condor_q option.

        Parameters:
            class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
                By default we give {cls.default_class_ads}; increasing the number of class_ads increases the time taken
                by the condor_q call.
            job_id (str): String representation of the Job ID given by condor_submit during submission.
                If this argument is given then the output of this function will be only information about this job.
                If this argument is not given, then all jobs matching the other filters will be returned.
            username (str): By default we return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
                so it isn't recommended.

        Returns:
            dict: JSON dictionary of the form:

            .. code-block:: python

              {
                "NJOBS": <number of records returned by command>,
                "JOBS": [
                  {
                    <ClassAd: value>, ...
                  }, ...
                ]
              }
        """
        B2DEBUG(29, f"Calling HTCondor.condor_q(class_ads={class_ads}, job_id={job_id}, username={username})")
        if not class_ads:
            class_ads = cls.default_class_ads
        # The ClassAd attributes are requested as a comma-separated string
        field_list_cmd = ",".join(class_ads)
        cmd_list = ["condor_q", "-json", "-attributes", field_list_cmd]
        # A specific job id takes precedence over any username filter
        if job_id:
            cmd_list.append(job_id)
        else:
            if not username:
                username = os.environ["USER"]
            if username == "all":
                cmd_list.append("-allusers")
            else:
                cmd_list.append(username)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        # condor_q can fail transiently, in which case we report zero jobs
        try:
            records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        except BaseException:
            records = None

        if records:
            records = decode_json_string(records)
        else:
            records = []
        jobs_info = {"JOBS": records}
        jobs_info["NJOBS"] = len(jobs_info["JOBS"])
        return jobs_info

    @classmethod
    def condor_history(cls, class_ads=None, job_id="", username=""):
        """
        Simplistic interface to the ``condor_history`` command. Lets you request information about all jobs matching the filters
        ``job_id`` and ``username``. Note that setting ``job_id`` negates ``username`` so it is ignored.
        The result is a JSON dictionary filled by output of the ``-json`` ``condor_history`` option.

        Parameters:
            class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
                By default we give {cls.default_class_ads}; increasing the number of class_ads increases the time taken
                by the condor_q call.
            job_id (str): String representation of the Job ID given by condor_submit during submission.
                If this argument is given then the output of this function will be only information about this job.
                If this argument is not given, then all jobs matching the other filters will be returned.
            username (str): By default we return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters. This is limited to 10000 records
                and isn't recommended.

        Returns:
            dict: JSON dictionary of the form:

            .. code-block:: python

              {
                "NJOBS": <number of records returned by command>,
                "JOBS": [
                  {
                    <ClassAd: value>, ...
                  }, ...
                ]
              }
        """
        B2DEBUG(29, f"Calling HTCondor.condor_history(class_ads={class_ads}, job_id={job_id}, username={username})")
        if not class_ads:
            class_ads = cls.default_class_ads
        # The ClassAd attributes are requested as a comma-separated string
        field_list_cmd = ",".join(class_ads)
        cmd_list = ["condor_history", "-json", "-attributes", field_list_cmd]
        # A specific job id takes precedence over any username filter
        if job_id:
            cmd_list.append(job_id)
        else:
            if not username:
                username = os.environ["USER"]
            if username != "all":
                cmd_list.append(username)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        # condor_history can fail transiently, in which case we report zero jobs
        try:
            records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        except BaseException:
            records = None

        if records:
            records = decode_json_string(records)
        else:
            records = []
        jobs_info = {"JOBS": records}
        jobs_info["NJOBS"] = len(jobs_info["JOBS"])
        return jobs_info


class DIRAC(Backend):
    """
    Backend for submitting calibration processes to the grid.
    """


class BackendError(Exception):
    """
    Base exception class for Backend classes.
    """


class JobError(Exception):
    """
    Base exception class for Job objects.
    """


class SplitterError(Exception):
    """
    Base exception class for SubjobSplitter objects.
    """