import re
import os
import json
import time
import shutil
import subprocess
from math import ceil

from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
from abc import ABC, abstractmethod
import xml.etree.ElementTree as ET
from pathlib import Path
from collections import deque
from itertools import count, takewhile
from datetime import datetime, timedelta
import multiprocessing as mp

from caf.utils import method_dispatch
from caf.utils import decode_json_string
from caf.utils import grouper
from caf.utils import parse_file_uri
__all__ = ["Job", "SubJob", "Backend", "Local", "Batch", "LSF", "PBS", "HTCondor", "get_input_data"]
#: Default name of the JSON file listing the input data passed to backend jobs
_input_data_file_path = Path("__BACKEND_INPUT_FILES__.json")

#: Name of the file that the job's stdout gets written to
_STDOUT_FILE = "stdout"

#: Name of the file that the job's stderr gets written to
_STDERR_FILE = "stderr"

#: Environment variables that are exported into the job environment by the setup commands
_backend_job_envvars = (
    "BELLE2_EXTERNALS_TOPDIR",
    "BELLE2_CONDB_METADATA",
)
def get_input_data():
    """
    Simple JSON load of the default input data file. Will contain a list of string file paths
    for use by the job process.
    """
    with open(_input_data_file_path) as input_data_file:
        input_data = json.load(input_data_file)
    return input_data
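
# Illustrative sketch (not part of the original module): how a job's payload script might
# read back the dumped input file list, assuming this module is importable as caf.backends
# from inside the job's working directory.
#
#     from caf.backends import get_input_data
#     input_files = get_input_data()  # list of string file paths from __BACKEND_INPUT_FILES__.json
#     for path in input_files:
#         process(path)  # 'process' is a hypothetical user function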
def monitor_jobs(args, jobs):
    unfinished_jobs = jobs[:]
    failed_jobs = 0
    while unfinished_jobs:
        B2INFO("Updating statuses of unfinished jobs...")
        for j in unfinished_jobs:
            j.update_status()
        B2INFO("Checking if jobs are ready...")
        for j in unfinished_jobs[:]:
            if j.ready():
                if j.status == "failed":
                    B2ERROR(f"{j} is failed")
                    failed_jobs += 1
                else:
                    B2INFO(f"{j} is finished")
                unfinished_jobs.remove(j)
        if unfinished_jobs:
            B2INFO(f"Not all jobs done yet, waiting {args.heartbeat} seconds before re-checking...")
            time.sleep(args.heartbeat)
    if failed_jobs > 0:
        B2ERROR(f"{failed_jobs} jobs failed")
    else:
        B2INFO('All jobs finished successfully')
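
# Usage sketch (an assumption, not from the original source): monitor_jobs only needs an
# object with a ``heartbeat`` attribute, so a SimpleNamespace works in place of parsed
# command line args. ``jobs`` is assumed to be a list of already-submitted Job objects.
#
#     from types import SimpleNamespace
#     args = SimpleNamespace(heartbeat=30)  # seconds between status checks
#     monitor_jobs(args, jobs)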
class ArgumentsGenerator():
    def __init__(self, generator_function, *args, **kwargs):
        """
        Simple little class to hold a generator (uninitialised) and the necessary args/kwargs to
        initialise it. This lets us reuse a generator by setting it up again fresh. This is not
        optimal for expensive calculations, but it is nice for making large sequences of
        Job input arguments on the fly.

        Parameters:
            generator_function (py:function): A function (callable) that contains a ``yield`` statement. This generator
                should *not* be initialised i.e. you haven't called it with ``generator_function(*args, **kwargs)``
                yet. That will happen when accessing the `ArgumentsGenerator.generator` property.
            args (tuple): The positional arguments you want to send into the initialisation of the generator.
            kwargs (dict): The keyword arguments you want to send into the initialisation of the generator.
        """
        #: Generator function that has not yet been initialised
        self.generator_function = generator_function
        #: Positional arguments used to initialise the generator function
        self.args = args
        #: Keyword arguments used to initialise the generator function
        self.kwargs = kwargs

    @property
    def generator(self):
        """
        Returns:
            generator: The initialised generator (using the args and kwargs for initialisation). It should be ready
            to have ``next``/``send`` called on it.
        """
        gen = self.generator_function(*self.args, **self.kwargs)
        next(gen)  # Prime the generator so that it is ready to receive send() calls
        return gen
def range_arguments(start=0, stop=None, step=1):
    """
    A simple example Arguments Generator function for use as a `ArgumentsGenerator.generator_function`.
    It will return increasing values using itertools.count. By default it is infinite and will not call `StopIteration`.
    The `SubJob` object is sent into this function with `send` but is not used.

    Parameters:
        start (int): The starting value that will be returned.
        stop (int): At this value the `StopIteration` will be thrown. If this is `None` then this generator will
            continue indefinitely.
        step (int): The step size.
    """
    def should_stop(x):
        if stop is not None and x >= stop:
            return True
        else:
            return False
    # The first yield is only reached after the generator has been primed with next()
    subjob = (yield None)
    for i in takewhile(lambda x: not should_stop(x), count(start, step)):
        args = (i,)  # We always return an argument tuple
        B2DEBUG(29, f"{subjob} arguments will be {args}")
        subjob = (yield args)
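
# Example sketch of how the two pieces above fit together (inferred from the docstrings,
# not original code): the generator function is wrapped uninitialised, and each access of
# ``.generator`` gives a freshly primed generator ready for send().
#
#     arg_gen = ArgumentsGenerator(range_arguments, 0, stop=3, step=1)
#     gen = arg_gen.generator        # primed, ready for send()
#     print(gen.send(None))          # -> (0,)
#     print(gen.send(None))          # -> (1,)
#     print(gen.send(None))          # -> (2,) and the next send() raises StopIteration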
class SubjobSplitter(ABC):
    """
    Abstract base class. This class handles the logic of creating subjobs for a `Job` object.
    The `create_subjobs` function should be implemented and used to construct
    the subjobs of the parent job object.

    Parameters:
        arguments_generator (ArgumentsGenerator): Used to construct the generator function that will yield the argument
            tuple for each `SubJob`. The splitter will iterate through the generator each time `create_subjobs` is
            called. The `SubJob` will be sent into the generator with ``send(subjob)`` so that the generator can decide
            what arguments to return.
    """

    def __init__(self, *, arguments_generator=None):
        """
        Derived classes should call `super` to run this.
        """
        #: The `ArgumentsGenerator` used when creating subjobs.
        self.arguments_generator = arguments_generator

    @abstractmethod
    def create_subjobs(self, job):
        """
        Implement this method in derived classes to generate the `SubJob` objects.
        """

    def assign_arguments(self, job):
        """
        Use the `arguments_generator` (if one exists) to assign the argument tuples to the subjobs.
        """
        if self.arguments_generator:
            arg_gen = self.arguments_generator.generator
            for subjob in sorted(job.subjobs.values(), key=lambda sj: sj.id):
                try:
                    args = arg_gen.send(subjob)
                except StopIteration:
                    B2ERROR(f"StopIteration called when getting args for {subjob}, "
                            "setting all subsequent subjobs to have empty argument tuples.")
                    args = tuple()
                B2DEBUG(29, f"Arguments for {subjob}: {args}")
                subjob.args = args
            return

        B2INFO(f"No ArgumentsGenerator assigned to the {self} so subjobs of {job} "
               "won't automatically have arguments assigned.")

    def __repr__(self):
        return f"{self.__class__.__name__}"
class MaxFilesSplitter(SubjobSplitter):

    def __init__(self, *, arguments_generator=None, max_files_per_subjob=1):
        """
        Parameters:
            max_files_per_subjob (int): The maximum number of input files used per `SubJob` created.
        """
        super().__init__(arguments_generator=arguments_generator)
        #: The maximum number of input files that will be used for each `SubJob` created.
        self.max_files_per_subjob = max_files_per_subjob

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates as many subjobs as required
        in order to prevent the number of input files per subjob going over the limit set by
        `MaxFilesSplitter.max_files_per_subjob`.
        """
        if not job.input_files:
            B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
            return

        for i, subjob_input_files in enumerate(grouper(self.max_files_per_subjob, job.input_files)):
            job.create_subjob(i, input_files=subjob_input_files)

        self.assign_arguments(job)

        B2INFO(f"{self} created {i+1} Subjobs for {job}")
class MaxSubjobsSplitter(SubjobSplitter):

    def __init__(self, *, arguments_generator=None, max_subjobs=1000):
        """
        Parameters:
            max_subjobs (int): The maximum number of subjobs that will be created.
        """
        super().__init__(arguments_generator=arguments_generator)
        #: The maximum number of `SubJob` objects to be created.
        self.max_subjobs = max_subjobs

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates as many subjobs as required
        by the number of input files up to the maximum set by `MaxSubjobsSplitter.max_subjobs`. If there are
        more input files than `max_subjobs` it instead groups files by the minimum number per subjob in order to
        respect the subjob limit e.g. If you have 11 input files and a maximum number of subjobs of 4, then it
        will create 4 subjobs, 3 of them with 3 input files, and one with 2 input files.
        """
        if not job.input_files:
            B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
            return

        # We pop from the left of the remaining input files, so use a deque
        remaining_input_files = deque(job.input_files)
        # The number of subjobs that we are still allowed to create
        available_subjobs = self.max_subjobs
        subjob_i = 0
        while remaining_input_files:
            # How many input files should go into this subjob?
            num_input_files = ceil(len(remaining_input_files) / available_subjobs)
            subjob_input_files = []
            for i in range(num_input_files):
                subjob_input_files.append(remaining_input_files.popleft())
            job.create_subjob(subjob_i, input_files=subjob_input_files)
            subjob_i += 1
            available_subjobs -= 1

        self.assign_arguments(job)
        B2INFO(f"{self} created {subjob_i} Subjobs for {job}")
class ArgumentsSplitter(SubjobSplitter):
    """
    Creates SubJobs based on the given argument generator. The generator will be called until a `StopIteration` is issued.
    Be VERY careful to not accidentally give an infinite generator! Otherwise it will simply create SubJobs until you run out
    of memory. You can set the `ArgumentsSplitter.max_subjobs` parameter to try and prevent this and throw an exception.

    This splitter is useful for MC production jobs where you don't have any input files, but you want to control the
    exp/run numbers of subjobs. If you do have input files set for the parent `Job` objects, then the same input files will
    be assigned to every `SubJob`.

    Parameters:
        arguments_generator (ArgumentsGenerator): The standard ArgumentsGenerator that is used to assign arguments
            to each `SubJob`.
        max_subjobs (int): If set, an exception is raised if more than this number of subjobs would be created.
    """

    def __init__(self, *, arguments_generator=None, max_subjobs=None):
        super().__init__(arguments_generator=arguments_generator)
        #: If we try to create more subjobs than this number, an exception is raised.
        self.max_subjobs = max_subjobs

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates subjobs until the
        `SubjobSplitter.arguments_generator` finishes.

        If `ArgumentsSplitter.max_subjobs` is set, then it will throw an exception if more than this number of
        subjobs are created.
        """
        arg_gen = self.arguments_generator.generator
        for i in count():
            try:
                if self.max_subjobs is not None and i >= self.max_subjobs:
                    raise SplitterError(f"{self} tried to create more subjobs than the maximum (={self.max_subjobs}).")
                subjob = SubJob(job, i, job.input_files)
                args = arg_gen.send(subjob)
                B2INFO(f"Creating {job}.{subjob}")
                B2DEBUG(29, f"Arguments for {subjob}: {args}")
                subjob.args = args
                job.subjobs[i] = subjob
            except StopIteration:
                break
        B2INFO(f"{self} created {i+1} Subjobs for {job}")
class Job:
    """
    This generic Job object is used to tell a Backend what to do.
    This object basically holds necessary information about a process you want to submit to a `Backend`.
    It should *not* do anything that is backend specific, just hold the configuration for a job to be
    successfully submitted and monitored using a backend. The result attribute is where backend
    specific job monitoring goes.

    Parameters:
        name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF
        job_dict (dict): Optional dictionary (of the form produced by `Job.job_dict`) used to set up the Job configuration.

    .. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.
    """

    #: Allowed Job status dictionary. The key is the status name and the value is its level.
    statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}

    #: Job statuses that correspond to the Job being finished (successfully or not)
    exit_statuses = ["failed", "completed"]

    def __init__(self, name, job_dict=None):
        #: Job object's name. Only descriptive, not necessarily unique.
        self.name = name
        #: The `SubjobSplitter` used to create subjobs (if any).
        self.splitter = None
        if not job_dict:
            #: Files to be copied directly into the working directory.
            self.input_sandbox_files = []
            #: Working directory of the job.
            self.working_dir = Path()
            #: Output directory of the job.
            self.output_dir = Path()
            #: Patterns of the output files we want returned from the job.
            self.output_patterns = []
            #: Command to run, as a list of components.
            self.cmd = []
            #: Arguments appended to the command.
            self.args = []
            #: Input files used by the job process.
            self.input_files = []
            #: Setup commands run in the job script before the main command.
            self.setup_cmds = []
            #: Backend-specific arguments for this job.
            self.backend_args = {}
            #: Dictionary of subjobs, keyed by their id.
            self.subjobs = {}
        elif job_dict:
            self.input_sandbox_files = [Path(p) for p in job_dict["input_sandbox_files"]]
            self.working_dir = Path(job_dict["working_dir"])
            self.output_dir = Path(job_dict["output_dir"])
            self.output_patterns = job_dict["output_patterns"]
            self.cmd = job_dict["cmd"]
            self.args = job_dict["args"]
            self.input_files = job_dict["input_files"]
            self.setup_cmds = job_dict["setup_cmds"]
            self.backend_args = job_dict["backend_args"]
            self.subjobs = {}
            for subjob_dict in job_dict["subjobs"]:
                self.create_subjob(subjob_dict["id"], input_files=subjob_dict["input_files"], args=subjob_dict["args"])

        #: The result object of this Job, set by the backend on submission.
        self.result = None
        #: The actual status of the overall Job.
        self._status = "init"
    def __repr__(self):
        """
        Representation of Job class (what happens when you print a Job() instance).
        """
        return f"Job({self.name})"

    def ready(self):
        """
        Returns whether or not the Job has finished. If the job has subjobs then it will return True when they are all finished.
        It will return False as soon as it hits the first failure. Meaning that you cannot guarantee that all subjobs will have
        their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.
        """
        if not self.result:
            B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")
            return False
        return self.result.ready()

    def update_status(self):
        """
        Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there
        are any) in the best way for the type of result object/backend.
        """
        if not self.result:
            B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
            return
        self.result.update_status()

    def create_subjob(self, i, input_files=None, args=None):
        """
        Creates a subjob Job object that references that parent Job.
        Returns the SubJob object at the end.
        """
        if i not in self.subjobs:
            B2INFO(f"Creating {self}.Subjob({i})")
            subjob = SubJob(self, i, input_files)
            if args:
                subjob.args = args
            self.subjobs[i] = subjob
            return subjob
        else:
            B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")

    @property
    def status(self):
        """
        Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
        subjob status in the hierarchy of statuses in `Job.statuses`.
        """
        if self.subjobs:
            job_status = self._get_overall_status_from_subjobs()
            if job_status != self._status:
                self.status = job_status
        return self._status

    def _get_overall_status_from_subjobs(self):
        subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
        status_level = min([self.statuses[status] for status in subjob_statuses])
        for status, level in self.statuses.items():
            if level == status_level:
                return status

    @status.setter
    def status(self, status):
        """
        Sets the status of this Job.
        """
        if status == "failed":
            B2ERROR(f"Setting {self.name} status to failed")
        else:
            B2INFO(f"Setting {self.name} status to {status}")
        self._status = status
    @property
    def output_dir(self):
        return self._output_dir

    @output_dir.setter
    def output_dir(self, value):
        self._output_dir = Path(value).absolute()

    @property
    def working_dir(self):
        return self._working_dir

    @working_dir.setter
    def working_dir(self, value):
        self._working_dir = Path(value).absolute()

    @property
    def input_sandbox_files(self):
        return self._input_sandbox_files

    @input_sandbox_files.setter
    def input_sandbox_files(self, value):
        self._input_sandbox_files = [Path(p).absolute() for p in value]

    @property
    def input_files(self):
        return self._input_files

    @input_files.setter
    def input_files(self, value):
        self._input_files = value

    @property
    def max_subjobs(self):
        return self.splitter.max_subjobs

    @max_subjobs.setter
    def max_subjobs(self, value):
        self.splitter = MaxSubjobsSplitter(max_subjobs=value)
        B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")

    @property
    def max_files_per_subjob(self):
        return self.splitter.max_files_per_subjob

    @max_files_per_subjob.setter
    def max_files_per_subjob(self, value):
        self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
        B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
    def dump_to_json(self, file_path):
        """
        Dumps the Job object configuration to a JSON file so that it can be read in again later.

        Parameters:
            file_path(`basf2.Path`): The filepath we'll dump to
        """
        with open(file_path, mode="w") as job_file:
            json.dump(self.job_dict, job_file, indent=2)

    @classmethod
    def from_json(cls, file_path):
        with open(file_path) as job_file:
            job_dict = json.load(job_file)
        return cls(job_dict["name"], job_dict=job_dict)
    @property
    def job_dict(self):
        """
        Returns:
            dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
            `Path <basf2.Path>` objects are converted to string via ``Path.as_posix()``.
        """
        job_dict = {}
        job_dict["name"] = self.name
        job_dict["input_sandbox_files"] = [i.as_posix() for i in self.input_sandbox_files]
        job_dict["working_dir"] = self.working_dir.as_posix()
        job_dict["output_dir"] = self.output_dir.as_posix()
        job_dict["output_patterns"] = self.output_patterns
        job_dict["cmd"] = self.cmd
        job_dict["args"] = self.args
        job_dict["input_files"] = self.input_files
        job_dict["setup_cmds"] = self.setup_cmds
        job_dict["backend_args"] = self.backend_args
        job_dict["subjobs"] = [sj.job_dict for sj in self.subjobs.values()]
        return job_dict
    def dump_input_data(self):
        """
        Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
        string URI paths.
        """
        with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
            json.dump(self.input_files, input_data_file, indent=2)

    def copy_input_sandbox_files_to_working_dir(self):
        """
        Get all of the requested files for the input sandbox and copy them to the working directory.
        Files like the submit.sh or input_data.json are not part of this process.
        """
        for file_path in self.input_sandbox_files:
            if file_path.is_dir():
                shutil.copytree(file_path, Path(self.working_dir, file_path.name))
            else:
                shutil.copy(file_path, self.working_dir)

    def check_input_data_files(self):
        """
        Check the input files and make sure that there aren't any duplicates.
        Also check if the files actually exist if possible.
        """
        existing_input_files = []  # We use a list instead of set to maintain the ordering of files
        for file_path in self.input_files:
            file_uri = parse_file_uri(file_path)
            if file_uri.scheme == "file":
                p = Path(file_uri.path)
                if p.is_file():
                    if file_uri.geturl() not in existing_input_files:
                        existing_input_files.append(file_uri.geturl())
                    else:
                        B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
                else:
                    B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
            else:
                B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
                if file_path not in existing_input_files:
                    existing_input_files.append(file_path)
                else:
                    B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
        if self.input_files and not existing_input_files:
            B2WARNING(f"No valid input file paths found for {self.name}, but some were requested.")

        # Replace the Job's input files with the valid, de-duplicated list
        self.input_files = existing_input_files

    @property
    def full_command(self):
        """
        Returns:
            str: The full command that this job will run including any arguments.
        """
        all_components = self.cmd[:]
        all_components.extend(self.args)
        # Convert everything to strings in case arguments were generated as other types
        full_command = " ".join(map(str, all_components))
        B2DEBUG(29, f"Full command of {self} is '{full_command}'")
        return full_command
    def append_current_basf2_setup_cmds(self):
        """
        This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
        It should detect if you are using a local release or CVMFS and append the correct commands
        so that the job will have the same basf2 release environment. It should also detect
        if a local release is not compiled with the ``opt`` option.

        Note that this *doesn't mean that every environment variable is inherited* from the submitting
        process environment.
        """
        def append_environment_variable(cmds, envvar):
            """
            Append a command for setting an environment variable.
            """
            if envvar in os.environ:
                cmds.append(f"""if [ -z "${{{envvar}}}" ]; then""")
                cmds.append(f"  export {envvar}={os.environ[envvar]}")
                cmds.append("fi")

        if "BELLE2_TOOLS" not in os.environ:
            raise BackendError("No BELLE2_TOOLS found in environment")
        # Export the environment variables that the job needs, if they aren't already set on the worker node
        for envvar in _backend_job_envvars:
            append_environment_variable(self.setup_cmds, envvar)
        if "BELLE2_RELEASE" in os.environ:
            self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
        elif 'BELLE2_LOCAL_DIR' in os.environ:
            self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
            self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
            self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
            self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
            self.setup_cmds.append("pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
            self.setup_cmds.append("source $BACKEND_B2SETUP")
            # b2code-option has to be run from within the local release directory
            self.setup_cmds.append("b2code-option $BACKEND_BELLE2_OPTION")
            self.setup_cmds.append("popd > /dev/null")
class SubJob(Job):
    """
    This mini-class simply holds basic information about which subjob you are
    and a reference to the parent Job object to be able to access the main data there.
    Rather than replicating all of the parent job's configuration again.
    """

    def __init__(self, job, subjob_id, input_files=None):
        #: Id of this SubJob
        self.id = subjob_id
        #: The parent Job() instance of this SubJob
        self.parent = job
        if not input_files:
            input_files = []
        #: Input files specific to this subjob
        self.input_files = input_files
        #: Arguments specific to this subjob
        self.args = []
        #: The result object, set by the backend on submission
        self.result = None
        #: The actual status of this SubJob
        self._status = "init"

    @property
    def output_dir(self):
        """
        Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this.
        """
        return Path(self.parent.output_dir, str(self.id))

    @property
    def working_dir(self):
        """Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this."""
        return Path(self.parent.working_dir, str(self.id))

    @property
    def name(self):
        """Getter for name of SubJob. Accesses the parent Job name to infer this."""
        return "_".join((self.parent.name, str(self.id)))

    @property
    def status(self):
        """
        Returns the status of this SubJob.
        """
        return self._status

    @status.setter
    def status(self, status):
        """
        Sets the status of this SubJob.
        """
        if status == "failed":
            B2ERROR(f"Setting {self.name} status to failed")
        else:
            B2INFO(f"Setting {self.name} status to {status}")
        self._status = status

    @property
    def subjobs(self):
        """
        A subjob cannot have subjobs. Always return empty list.
        """
        return []

    @property
    def job_dict(self):
        """
        Returns:
            dict: A JSON serialisable representation of the `SubJob`. `Path <basf2.Path>` objects are converted to
            `string` via ``Path.as_posix()``. Since SubJobs inherit most of the parent job's config we only output
            the input files and arguments that are specific to this subjob and no other details.
        """
        job_dict = {}
        job_dict["id"] = self.id
        job_dict["input_files"] = self.input_files
        job_dict["args"] = self.args
        return job_dict

    def __getattr__(self, attribute):
        """
        Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
        unless otherwise specified.
        """
        return getattr(self.parent, attribute)

    def __repr__(self):
        return f"SubJob({self.name})"
class Backend(ABC):
    """
    Abstract base class for a valid backend.
    Classes derived from this will implement their own submission of basf2 jobs
    to whatever backend they describe.
    Some common methods/attributes go into this base class.

    For backend_args the priority from lowest to highest is:

    backend.default_backend_args -> backend.backend_args -> job.backend_args
    """
    #: Default submission script name
    submit_script = "submit.sh"
    #: Default exit code file name
    exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
    #: Default backend_args
    default_backend_args = {}

    def __init__(self, *, backend_args=None):
        if backend_args is None:
            backend_args = {}
        #: The backend args that will be applied to jobs unless the job specifies them itself
        self.backend_args = {**self.default_backend_args, **backend_args}

    @abstractmethod
    def submit(self, job):
        """
        Base method for submitting collection jobs to the backend type. This MUST be
        implemented for a correctly written backend class deriving from Backend().
        """

    @staticmethod
    def _add_setup(job, batch_file):
        """
        Adds setup lines to the shell script file.
        """
        for line in job.setup_cmds:
            print(line, file=batch_file)

    def _add_wrapper_script_setup(self, job, batch_file):
        """
        Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
        `trap` statements for Ctrl-C situations.
        """
        start_wrapper = f"""# ---
# trap ctrl-c and call ctrl_c()
trap '(ctrl_c 130)' SIGINT
trap '(ctrl_c 143)' SIGTERM

function write_exit_code() {{
  echo "Writing $1 to exit status file"
  echo "$1" > {self.exit_code_file}
}}

function ctrl_c() {{
  trap '' SIGINT SIGTERM
  echo "** Trapped Ctrl-C **"
  echo "$1" > {self.exit_code_file}
  exit $1
}}
# ---"""
        print(start_wrapper, file=batch_file)

    def _add_wrapper_script_teardown(self, job, batch_file):
        """
        Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
        an exit code of the job cmd being written out to a file. Which means that we can know if the command was
        successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
        removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
        code file.
        """
        end_wrapper = """# ---
write_exit_code $?"""
        print(end_wrapper, file=batch_file)

    @classmethod
    def _create_parent_job_result(cls, parent):
        """
        We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
        so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
        statuses and allows the use of ready().
        """
        raise NotImplementedError

    def get_submit_script_path(self, job):
        """
        Construct the Path object of the bash script file that we will submit. It will contain
        the actual job command, wrapper commands, setup commands, and any batch directives.
        """
        return Path(job.working_dir, self.submit_script)
class Result():
    """
    Base class for Result objects. A Result is created for each `Job` (or `Job.SubJob`) object
    submitted to a backend. It provides a way to query a job's status to find out if it's ready.
    """

    def __init__(self, job):
        """
        Pass in the job object to allow the result to access the job's properties and do post-processing.
        """
        #: The job object for this result
        self.job = job
        #: Quicker way to know if the job is ready once it has already been found
        self._is_ready = False
        #: How long to wait for the exit code file to appear before timing out
        self.time_to_wait_for_exit_code_file = timedelta(minutes=5)
        #: Time we started waiting for the exit code file to appear
        self.exit_code_file_initial_time = None

    def ready(self):
        """
        Returns whether or not this job result is known to be ready. Doesn't actually change the job status.
        Just changes the 'readiness' based on the known job status.
        """
        B2DEBUG(29, f"Calling {self.job}.result.ready()")
        if self._is_ready:
            return True
        elif self.job.status in self.job.exit_statuses:
            self._is_ready = True
            return True
        else:
            return False

    def update_status(self):
        """
        Update the job's (and subjobs') status so that `Result.ready` will return the up to date status. This call will
        have to actually look up the job's status from some database/exit code file.
        """
        raise NotImplementedError

    def get_exit_code_from_file(self):
        """
        Read the exit code file to discover the exit status of the job command. Useful fallback if the job is no longer
        known to the job database (batch system purged it for example). Since some backends may take time to download
        the output files of the job back to the working directory we use a time limit on how long to wait.
        """
        if not self.exit_code_file_initial_time:
            self.exit_code_file_initial_time = datetime.now()
        exit_code_path = Path(self.job.working_dir, Backend.exit_code_file)
        with open(exit_code_path) as f:
            exit_code = int(f.read().strip())
            B2DEBUG(29, f"Exit code from file for {self.job} was {exit_code}")
            return exit_code
class Local(Backend):
    """
    Backend for local processes i.e. on the same machine but in a subprocess.

    Note that you should call the self.join() method to close the pool and wait for any
    running processes to finish before exiting the process. Once you've called join you will have to set up a new
    instance of this backend to create a new pool. If you don't call `Local.join` or don't create a join yourself
    somewhere, then the main python process might end before your pool is done.

    Keyword Arguments:
        max_processes (int): Integer that specifies the size of the process pool that spawns the subjobs, default=1.
            It's the maximum simultaneous subjobs.
            Try not to specify a large number or a number larger than the number of cores.
            It won't crash the program but it will slow down and negatively impact performance.
    """

    def __init__(self, *, backend_args=None, max_processes=1):
        super().__init__(backend_args=backend_args)
        #: The actual multiprocessing Pool object of this instance of the backend
        self.pool = None
        #: The size of the multiprocessing process pool
        self.max_processes = max_processes

    class LocalResult(Result):
        """
        Result class to help monitor status of jobs submitted by Local backend.
        """

        def __init__(self, job, result):
            """
            Pass in the job object and the multiprocessing result to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: The underlying result from the multiprocessing pool
            self.result = result

        def _update_result_status(self):
            if self.result.ready() and (self.job.status not in self.job.exit_statuses):
                return_code = self.result.get()
                if return_code:
                    self.job.status = "failed"
                else:
                    self.job.status = "completed"

        def update_status(self):
            """
            Update the job's (or subjobs') status by calling the result object.
            """
            B2DEBUG(29, f"Calling {self.job}.result.update_status()")
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status()
            else:
                self._update_result_status()

    def join(self):
        """
        Closes and joins the Pool, letting you wait for all results currently still processing.
        """
        B2INFO("Joining Process Pool, waiting for results to finish...")
        self.pool.close()
        self.pool.join()
        B2INFO("Process Pool joined.")

    @property
    def max_processes(self):
        """
        Getter for max_processes
        """
        return self._max_processes

    @max_processes.setter
    def max_processes(self, value):
        """
        Setter for max_processes, we also check for a previous Pool(), wait for it to join
        and then create a new one with the new value of max_processes
        """
        #: Internal attribute of max_processes
        self._max_processes = value
        if self.pool:
            B2INFO("New max_processes requested. But a pool already exists.")
            self.join()
        B2INFO(f"Starting up new Pool with {self.max_processes} processes")
        self.pool = mp.Pool(processes=self.max_processes)
    @method_dispatch
    def submit(self, job):
        raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
                                  "Did you submit a (Sub)Job?")

    @submit.register(SubJob)
    def _(self, job):
        """
        Submission of a `SubJob` for the Local backend
        """
        # Make sure the output directory of the job is created
        job.output_dir.mkdir(parents=True, exist_ok=True)
        # Make sure the working directory of the job is created
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()
        # Get the path of the bash script we will run
        script_path = self.get_submit_script_path(job)
        with open(script_path, mode="w") as batch_file:
            print("#!/bin/bash", file=batch_file)
            self._add_wrapper_script_setup(job, batch_file)
            self._add_setup(job, batch_file)
            print(job.full_command, file=batch_file)
            self._add_wrapper_script_teardown(job, batch_file)
        B2INFO(f"Submitting {job}")
        job.result = Local.LocalResult(job,
                                       self.pool.apply_async(self.run_job,
                                                             (job.name,
                                                              job.working_dir,
                                                              job.output_dir,
                                                              script_path)
                                                             )
                                       )
        job.status = "submitted"
        B2INFO(f"{job} submitted")

    @submit.register(Job)
    def _(self, job):
        """
        Submission of a `Job` for the Local backend
        """
        # Make sure the output directory of the job is created
        job.output_dir.mkdir(parents=True, exist_ok=True)
        # Make sure the working directory of the job is created
        job.working_dir.mkdir(parents=True, exist_ok=True)
        # Check if we have any valid input files
        job.check_input_data_files()

        if not job.splitter:
            job.copy_input_sandbox_files_to_working_dir()
            job.dump_input_data()
            # Get the path of the bash script we will run
            script_path = self.get_submit_script_path(job)
            with open(script_path, mode="w") as batch_file:
                print("#!/bin/bash", file=batch_file)
                self._add_wrapper_script_setup(job, batch_file)
                self._add_setup(job, batch_file)
                print(job.full_command, file=batch_file)
                self._add_wrapper_script_teardown(job, batch_file)
            B2INFO(f"Submitting {job}")
            job.result = Local.LocalResult(job,
                                           self.pool.apply_async(self.run_job,
                                                                 (job.name,
                                                                  job.working_dir,
                                                                  job.output_dir,
                                                                  script_path)
                                                                 )
                                           )
            job.status = "submitted"
            B2INFO(f"{job} submitted")
        else:
            # Create subjobs according to the splitter of the parent job
            job.splitter.create_subjobs(job)
            # Submit the subjobs
            self.submit(list(job.subjobs.values()))
            # Make a parent Job.result object so that ready() can be called on the parent
            self._create_parent_job_result(job)

    @submit.register(list)
    def _(self, jobs):
        """
        Submit method of Local() that takes a list of jobs instead of just one and submits each one.
        """
        for job in jobs:
            self.submit(job)
        B2INFO("All requested jobs submitted.")
    @staticmethod
    def run_job(name, working_dir, output_dir, script):
        """
        The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
        shell command in a subprocess and captures the stdout and stderr of the subprocess to files.
        """
        B2INFO(f"Starting Sub-process: {name}")
        from subprocess import Popen
        stdout_file_path = Path(working_dir, _STDOUT_FILE)
        stderr_file_path = Path(working_dir, _STDERR_FILE)
        B2INFO(f"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
        with open(stdout_file_path, mode="w", buffering=1) as f_out, \
                open(stderr_file_path, mode="w", buffering=1) as f_err:
            with Popen(["/bin/bash", script.as_posix()],
                       stdout=f_out,
                       stderr=f_err,
                       universal_newlines=True,
                       cwd=working_dir) as p:
                # Block until the subprocess finishes so that the return code is set
                p.wait()
        B2INFO(f"Subprocess {name} finished.")
        return p.returncode

    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.LocalResult(parent, None)
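
# Usage sketch for the Local backend (an illustration consistent with the docstrings above,
# not taken verbatim from the source): submit a Job to a small process pool, poll it, then
# join the pool before the interpreter exits.
#
#     backend = Local(max_processes=2)
#     job = Job("local_example")
#     job.working_dir = "/abs/path/work"
#     job.output_dir = "/abs/path/out"
#     job.cmd = ["echo", "hello"]
#     backend.submit(job)
#     while not job.ready():
#         job.update_status()
#         time.sleep(5)
#     backend.join()  # always join the pool when you are done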
class Batch(Backend):
    """
    Abstract Base backend for submitting to a local batch system. Batch system specific commands should be implemented
    in a derived class. Do not use this class directly!
    """
    #: Shell command used to submit a script, should be set in the derived class
    submission_cmds = []

    #: Default global limit on the total number of submitted/running jobs that the user can have
    default_global_job_limit = 1000

    #: Default seconds to wait between checks that the global job limit allows more submissions
    default_sleep_between_submission_checks = 30

    def __init__(self, *, backend_args=None):
        """
        Init method for Batch Backend. Does some basic default setup.
        """
        super().__init__(backend_args=backend_args)
        #: The active limit on the total number of jobs in the batch system for this user
        self.global_job_limit = self.default_global_job_limit
        #: Seconds to wait between checking if submission can continue
        self.sleep_between_submission_checks = self.default_sleep_between_submission_checks

    def _add_batch_directives(self, job, file):
        """
        Should be implemented in a derived class to write a batch submission script to the job.working_dir.
        You should think about where the stdout/err should go, and set the queue name.
        """
        raise NotImplementedError("Need to implement a _add_batch_directives(self, job, file) "
                                  f"method in {self.__class__.__name__} backend.")

    def _make_submit_file(self, job, submit_file_path):
        """
        Useful for the HTCondor backend where a submit file is needed instead of batch
        directives pasted directly into the submission script. It should be overwritten
        if needed.
        """

    @classmethod
    @abstractmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """

    def can_submit(self, *args, **kwargs):
        """
        Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail.
        This is initially meant to make sure that we don't go over the global limits of jobs (submitted + running).

        Returns:
            bool: If the job submission can continue based on the current situation.
        """
        return True

    @method_dispatch
    def submit(self, job, check_can_submit=True, jobs_per_check=100):
        raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
                                  "Did you submit a (Sub)Job?")
    @submit.register(SubJob)
    def _(self, job, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch backend for a `SubJob`. Should take `SubJob` object, create needed directories,
        create batch script, and send it off with the batch submission command.
        It should apply the correct options (default and user requested).

        Should set a Result object as an attribute of the job.
        """
        # Make sure the working directory of the subjob is created
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()
        # Make a submission file if the backend requires one
        batch_submit_script_path = self.get_batch_submit_script_path(job)
        self._make_submit_file(job, batch_submit_script_path)
        # Get the bash file we will actually run
        script_path = self.get_submit_script_path(job)
        # Construct the batch submission script (with directives if supported)
        with open(script_path, mode="w") as batch_file:
            self._add_batch_directives(job, batch_file)
            self._add_wrapper_script_setup(job, batch_file)
            self._add_setup(job, batch_file)
            print(job.full_command, file=batch_file)
            self._add_wrapper_script_teardown(job, batch_file)
        os.chmod(script_path, 0o755)
        B2INFO(f"Submitting {job}")
        # Do the actual batch submission
        cmd = self._create_cmd(batch_submit_script_path)
        output = self._submit_to_batch(cmd)
        self._create_job_result(job, output)
        job.status = "submitted"
        B2INFO(f"{job} submitted")

    @submit.register(Job)
    def _(self, job, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch backend. Should take job object, create needed directories, create batch script,
        and send it off with the batch submission command, applying the correct options (default and user requested).

        Should set a Result object as an attribute of the job.
        """
        # Make sure the working directory of the job is created
        job.working_dir.mkdir(parents=True, exist_ok=True)
        # Check if we have any valid input files
        job.check_input_data_files()

        if not job.splitter:
            job.copy_input_sandbox_files_to_working_dir()
            job.dump_input_data()
            # Make a submission file if the backend requires one
            batch_submit_script_path = self.get_batch_submit_script_path(job)
            self._make_submit_file(job, batch_submit_script_path)
            # Get the bash file we will actually run
            script_path = self.get_submit_script_path(job)
            # Construct the batch submission script (with directives if supported)
            with open(script_path, mode="w") as batch_file:
                self._add_batch_directives(job, batch_file)
                self._add_wrapper_script_setup(job, batch_file)
                self._add_setup(job, batch_file)
                print(job.full_command, file=batch_file)
                self._add_wrapper_script_teardown(job, batch_file)
            os.chmod(script_path, 0o755)
            B2INFO(f"Submitting {job}")
            # Do the actual batch submission
            cmd = self._create_cmd(batch_submit_script_path)
            output = self._submit_to_batch(cmd)
            self._create_job_result(job, output)
            job.status = "submitted"
            B2INFO(f"{job} submitted")
        else:
            # Create subjobs according to the splitter of the parent job
            job.splitter.create_subjobs(job)
            # Submit the subjobs
            self.submit(list(job.subjobs.values()))
            # Make a parent Job.result object so that ready() can be called on the parent
            self._create_parent_job_result(job)
    @submit.register(list)
    def _(self, jobs, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.
        """
        B2INFO(f"Submitting a list of {len(jobs)} jobs to a Batch backend")
        # Make sure the number of jobs submitted per check doesn't exceed the global limit
        if jobs_per_check > self.global_job_limit:
            B2INFO(f"jobs_per_check (={jobs_per_check}) but this is higher than the global job "
                   f"limit for this backend (={self.global_job_limit}). Will instead use the "
                   "value of the global job limit.")
            jobs_per_check = self.global_job_limit

        # Group the jobs and submit in chunks so that we can wait when the system is full
        for jobs_to_submit in grouper(jobs_per_check, jobs):
            # Wait until we are allowed to submit
            while not self.can_submit(njobs=len(jobs_to_submit)):
                B2INFO("Too many jobs are currently in the batch system globally. Waiting until submission can continue...")
                time.sleep(self.sleep_between_submission_checks)
            else:
                B2INFO(f"Submitting the next {len(jobs_to_submit)} jobs...")
                for job in jobs_to_submit:
                    self.submit(job, check_can_submit, jobs_per_check)
        B2INFO(f"All {len(jobs)} requested jobs submitted")

    def get_batch_submit_script_path(self, job):
        """
        Construct the Path object of the script file that we will submit using the batch command.
        For most batch backends this is the same script as the bash script we submit.
        But for some they require a separate submission file that describes the job.
        To implement that you can implement this function in the Backend class.
        """
        return Path(job.working_dir, self.submit_script)

    @classmethod
    @abstractmethod
    def _create_job_result(cls, job, batch_output):
        """
        Create a `Result` object for the job, using the output of the batch submission command.
        """

    @abstractmethod
    def _create_cmd(self, job):
        """
        Create the submission command for this batch system.
        """
class PBS(Batch):
    """
    Backend for submitting calibration processes to a qsub batch system.
    """
    #: Working directory directive
    cmd_wkdir = "#PBS -d"
    #: stdout file directive
    cmd_stdout = "#PBS -o"
    #: stderr file directive
    cmd_stderr = "#PBS -e"
    #: Queue directive
    cmd_queue = "#PBS -q"
    #: Job name directive
    cmd_name = "#PBS -N"
    #: Shell command used to submit a script
    submission_cmds = ["qsub"]
    #: Default global limit on the number of submitted/running jobs the user can have
    default_global_job_limit = 5000
    #: Default backend args for PBS
    default_backend_args = {"queue": "short"}

    def __init__(self, *, backend_args=None):
        super().__init__(backend_args=backend_args)
    def _add_batch_directives(self, job, batch_file):
        """
        Add PBS directives to submitted script.
        """
        job_backend_args = {**self.backend_args, **job.backend_args}
        batch_queue = job_backend_args["queue"]
        print("#!/bin/bash", file=batch_file)
        print("# --- Start PBS ---", file=batch_file)
        print(" ".join([PBS.cmd_queue, batch_queue]), file=batch_file)
        print(" ".join([PBS.cmd_name, job.name]), file=batch_file)
        print(" ".join([PBS.cmd_wkdir, job.working_dir.as_posix()]), file=batch_file)
        print(" ".join([PBS.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
        print(" ".join([PBS.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
        print("# --- End PBS ---", file=batch_file)
    @classmethod
    def _create_job_result(cls, job, batch_output):
        """
        Create a `PBSResult` object for the job, using the job id parsed from the qsub output.
        """
        job_id = batch_output.replace("\n", "")
        B2INFO(f"Job ID of {job} recorded as: {job_id}")
        job.result = cls.PBSResult(job, job_id)

    def _create_cmd(self, script_path):
        """
        Create the qsub submission command for the given script path.
        """
        submission_cmd = self.submission_cmds[:]
        submission_cmd.append(script_path.as_posix())
        return submission_cmd

    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """
        sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True)
        return sub_out

    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.PBSResult(parent, None)
    class PBSResult(Result):
        """
        Simple class to help monitor status of jobs submitted by `PBS` Backend.

        You pass in a `Job` object (or `SubJob`) and job id from a qsub command.
        When you call the `ready` method it runs qstat to see whether or not the job has finished.
        """

        #: PBS statuses mapped to Job statuses
        backend_code_to_status = {"R": "running",
                                  "FINISHED": "completed",
                                  "C": "completed",
                                  "E": "failed",
                                  }

        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: job id given by qsub
            self.job_id = job_id

        def update_status(self):
            """
            Update the job's (or subjobs') status by calling qstat.
            """
            B2DEBUG(29, f"Calling {self.job}.result.update_status()")
            qstat_output = PBS.qstat()
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(qstat_output)
            else:
                self._update_result_status(qstat_output)
        def _update_result_status(self, qstat_output):
            """
            Parameters:
                qstat_output (dict): The JSON output of a previous call to qstat which we can reuse to find the
                    status of this job. Obviously you should only be passing a JSON dict that contains the 'Job_Id' and
                    'job_state' information, otherwise it is useless.
            """
            try:
                backend_status = self._get_status_from_output(qstat_output)
            except KeyError:
                # The job is no longer in the qstat output, so fall back to the exit code file
                B2DEBUG(29, f"Checking of the exit code from file for {self.job}")
                try:
                    exit_code = self.get_exit_code_from_file()
                except FileNotFoundError:
                    waiting_time = datetime.now() - self.exit_code_file_initial_time
                    if waiting_time > self.time_to_wait_for_exit_code_file:
                        B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                        exit_code = 1
                    else:
                        B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                        return
                if exit_code:
                    backend_status = "E"
                else:
                    backend_status = "C"

            try:
                new_job_status = self.backend_code_to_status[backend_status]
            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
            if new_job_status != self.job.status:
                self.job.status = new_job_status

        def _get_status_from_output(self, output):
            for job_info in output["JOBS"]:
                if job_info["Job_Id"] == self.job_id:
                    return job_info["job_state"]
            else:
                raise KeyError(f"No job record in the 'output' argument had the 'Job_Id'=={self.job_id}")
    def can_submit(self, njobs=1):
        """
        Checks the global number of jobs in PBS right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.
        """
        B2DEBUG(29, "Calling PBS().can_submit()")
        job_info = self.qstat(username=os.environ["USER"])
        total_jobs = job_info["NJOBS"]
        B2INFO(f"Total jobs active in the PBS system is currently {total_jobs}")
        if (total_jobs + njobs) > self.global_job_limit:
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
            return False
        else:
            B2INFO("There is enough space to submit more jobs.")
            return True
    @classmethod
    def qstat(cls, username="", job_ids=None):
        """
        Simplistic interface to the ``qstat`` command. Lets you request information about all jobs or ones matching the
        filter ['job_id'] or for the username. The result is a JSON dictionary containing some of the useful job attributes
        returned by qstat.

        PBS is kind of annoying as depending on the configuration it can forget about jobs immediately. So the status of a
        finished job is VERY hard to get. There are other commands that are sometimes included that may do a better job.
        This one should work for Melbourne's cloud computing centre.

        Parameters:
            username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
                will be in the output dictionary.
            job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
                the output of this function will be only information about these jobs. If this argument is not given, then all
                jobs matching the other filters will be returned.

        Returns:
            dict: JSON dictionary of the form (to save you parsing the XML that qstat returns):

            .. code-block:: python

                {
                  "NJOBS": int,
                  "JOBS": [
                            {
                              "Job_Id": str, "Job_Name": str, "Job_Owner": str, "job_state": str, "queue": str
                            }, ...
                          ]
                }
        """
        B2DEBUG(29, f"Calling PBS.qstat(username='{username}', job_id={job_ids})")
        if not job_ids:
            job_ids = []
        job_ids = set(job_ids)
        cmd_list = ["qstat", "-x"]
        # qstat -x gives an XML summary of all jobs; run through a shell
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        jobs_dict = {"NJOBS": 0, "JOBS": []}
        jobs_xml = ET.fromstring(output)

        # For a single job id we can look it up directly instead of looping over everything
        if len(job_ids) == 1:
            job_elem = jobs_xml.find(f"./Job[Job_Id='{list(job_ids)[0]}']")
            if job_elem is not None:
                jobs_dict["JOBS"].append(cls.create_job_record_from_element(job_elem))
                jobs_dict["NJOBS"] = 1
            return jobs_dict

        # Otherwise loop over all jobs and apply the username/job_ids filters
        for job in jobs_xml.iterfind("Job"):
            job_owner = job.find("Job_Owner").text.split("@")[0]
            if username and username != job_owner:
                continue
            job_id = job.find("Job_Id").text
            if job_ids and job_id not in job_ids:
                continue
            jobs_dict["JOBS"].append(cls.create_job_record_from_element(job))
            jobs_dict["NJOBS"] += 1
            # Remove it from the set so that we don't keep looking for it
            if job_id in job_ids:
                job_ids.remove(job_id)
        return jobs_dict
    @staticmethod
    def create_job_record_from_element(job_elem):
        """
        Creates a Job dictionary with various job information from the XML element returned by qstat.

        Parameters:
            job_elem (xml.etree.ElementTree.Element): The XML Element of the Job

        Returns:
            dict: JSON serialisable dictionary of the Job information we are interested in.
        """
        job_dict = {}
        job_dict["Job_Id"] = job_elem.find("Job_Id").text
        job_dict["Job_Name"] = job_elem.find("Job_Name").text
        job_dict["Job_Owner"] = job_elem.find("Job_Owner").text
        job_dict["job_state"] = job_elem.find("job_state").text
        job_dict["queue"] = job_elem.find("queue").text
        return job_dict
class LSF(Batch):
    """
    Backend for submitting calibration processes to a bsub (LSF) batch system.
    """
    #: Working directory directive
    cmd_wkdir = "#BSUB -cwd"
    #: stdout file directive
    cmd_stdout = "#BSUB -o"
    #: stderr file directive
    cmd_stderr = "#BSUB -e"
    #: Queue directive
    cmd_queue = "#BSUB -q"
    #: Job name directive
    cmd_name = "#BSUB -J"
    #: Shell command used to submit a script
    submission_cmds = ["bsub", "-env", "\"none\"", "<"]
    #: Default global limit on the number of submitted/running jobs the user can have
    default_global_job_limit = 15000
    #: Default backend args for LSF
    default_backend_args = {"queue": "s"}

    def __init__(self, *, backend_args=None):
        super().__init__(backend_args=backend_args)

    def _add_batch_directives(self, job, batch_file):
        """
        Adds LSF BSUB directives for the job to a script.
        """
        job_backend_args = {**self.backend_args, **job.backend_args}
        batch_queue = job_backend_args["queue"]
        print("#!/bin/bash", file=batch_file)
        print("# --- Start LSF ---", file=batch_file)
        print(" ".join([LSF.cmd_queue, batch_queue]), file=batch_file)
        print(" ".join([LSF.cmd_name, job.name]), file=batch_file)
        print(" ".join([LSF.cmd_wkdir, str(job.working_dir)]), file=batch_file)
        print(" ".join([LSF.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
        print(" ".join([LSF.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
        print("# --- End LSF ---", file=batch_file)

    def _create_cmd(self, script_path):
        """
        Create the bsub submission command (run through a shell because of the ``<`` redirection).
        """
        submission_cmd = self.submission_cmds[:]
        submission_cmd.append(script_path.as_posix())
        submission_cmd = " ".join(submission_cmd)
        return [submission_cmd]

    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """
        sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        return sub_out
    class LSFResult(Result):
        """
        Simple class to help monitor status of jobs submitted by LSF Backend.

        You pass in a `Job` object and job id from a bsub command.
        When you call the `ready` method it runs bjobs to see whether or not the job has finished.
        """

        #: LSF statuses mapped to Job statuses
        backend_code_to_status = {"RUN": "running",
                                  "DONE": "completed",
                                  "FINISHED": "completed",
                                  "EXIT": "failed",
                                  "PEND": "submitted",
                                  }

        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: job id given by bsub
            self.job_id = job_id

        def update_status(self):
            """
            Update the job's (or subjobs') status by calling bjobs.
            """
            B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
            bjobs_output = LSF.bjobs(output_fields=["stat", "id"])
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(bjobs_output)
            else:
                self._update_result_status(bjobs_output)
        def _update_result_status(self, bjobs_output):
            """
            Parameters:
                bjobs_output (dict): The JSON output of a previous call to bjobs which we can reuse to find the
                    status of this job. Obviously you should only be passing a JSON dict that contains the 'stat' and
                    'id' information, otherwise it is useless.
            """
            try:
                backend_status = self._get_status_from_output(bjobs_output)
            except KeyError:
                # The job wasn't in the passed-in bjobs output, ask bjobs for this job id directly
                bjobs_output = LSF.bjobs(output_fields=["stat", "id"], job_id=str(self.job_id))
                try:
                    backend_status = self._get_status_from_output(bjobs_output)
                except KeyError:
                    # bjobs doesn't know about the job anymore, fall back to the exit code file
                    try:
                        exit_code = self.get_exit_code_from_file()
                    except FileNotFoundError:
                        waiting_time = datetime.now() - self.exit_code_file_initial_time
                        if waiting_time > self.time_to_wait_for_exit_code_file:
                            B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                            exit_code = 1
                        else:
                            B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                            return
                    if exit_code:
                        backend_status = "EXIT"
                    else:
                        backend_status = "FINISHED"

            try:
                new_job_status = self.backend_code_to_status[backend_status]
            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
            if new_job_status != self.job.status:
                self.job.status = new_job_status

        def _get_status_from_output(self, output):
            if output["JOBS"] and "ERROR" in output["JOBS"][0]:
                if output["JOBS"][0]["ERROR"] == f"Job <{self.job_id}> is not found":
                    raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
                else:
                    raise BackendError(f"Unidentified Error during status check for {self.job}: {output}")
            else:
                for job_info in output["JOBS"]:
                    if job_info["JOBID"] == self.job_id:
                        return job_info["STAT"]
                else:
                    raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.LSFResult(parent, None)

    @classmethod
    def _create_job_result(cls, job, batch_output):
        """
        Create an `LSFResult` object for the job, using the job id parsed from the bsub output.
        """
        m = re.search(r"Job <(\d+)>", str(batch_output))
        if m:
            job_id = m.group(1)
        else:
            raise BackendError(f"Failed to get the batch job ID of {job}. LSF output was:\n{batch_output}")
        B2INFO(f"Job ID of {job} recorded as: {job_id}")
        job.result = cls.LSFResult(job, job_id)

    def can_submit(self, njobs=1):
        """
        Checks the global number of jobs in LSF right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.
        """
        B2DEBUG(29, "Calling LSF().can_submit()")
        job_info = self.bjobs(output_fields=["stat"])
        total_jobs = job_info["NJOBS"]
        B2INFO(f"Total jobs active in the LSF system is currently {total_jobs}")
        if (total_jobs + njobs) > self.global_job_limit:
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
            return False
        else:
            B2INFO("There is enough space to submit more jobs.")
            return True
    @classmethod
    def bjobs(cls, output_fields=None, job_id="", username="", queue=""):
        """
        Simplistic interface to the `bjobs` command. Lets you request information about all jobs matching the filters
        'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by output of the ``-json`` bjobs option.

        Parameters:
            output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
            job_id (str): String representation of the Job ID given by bsub during submission. If this argument is given then
                the output of this function will be only information about this job. If this argument is not given, then all jobs
                matching the other filters will be returned.
            username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters.
            queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.

        Returns:
            dict: JSON dictionary of the form:

            .. code-block:: python

                {
                  "NJOBS": <njobs returned by command>,
                  "JOBS": [
                            {
                              <output field>: <value>, ...
                            }, ...
                          ]
                }
        """
        B2DEBUG(29, f"Calling LSF.bjobs(output_fields={output_fields}, job_id={job_id}, username={username}, queue={queue})")
        # Set up the default fields we want
        if not output_fields:
            output_fields = ["id"]
        # Output fields should be quoted and space separated
        field_list_cmd = "\""
        field_list_cmd += " ".join(output_fields)
        field_list_cmd += "\""
        cmd_list = ["bjobs", "-o", field_list_cmd]
        # Apply the queue filter if requested
        if queue:
            cmd_list.extend(["-q", queue])
        # Apply the username filter if requested
        if username:
            cmd_list.extend(["-u", username])
        # Get the output as JSON
        cmd_list.append("-json")
        # Restrict to a single job id if requested
        if job_id:
            cmd_list.append(job_id)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = decode_json_string(subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True))
        # Rename the output keys into a more consistent form
        output["NJOBS"] = output["JOBS"]
        output["JOBS"] = output["RECORDS"]
        del output["RECORDS"]
        del output["COMMAND"]
        return output
    @classmethod
    def bqueues(cls, output_fields=None, queues=None):
        """
        Simplistic interface to the `bqueues` command. Lets you request information about all queues matching the filters.
        The result is the JSON dictionary returned by output of the ``-json`` bqueues option.

        Parameters:
            output_fields (list[str]): A list of bqueues -o fields that you would like information about
                e.g. the default is ['queue_name', 'status', 'max', 'njobs', 'pend', 'run']
            queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
                By default you will receive information about all queues.

        Returns:
            dict: JSON dictionary of the form:

            .. code-block:: python

                {
                  "COMMAND": "bqueues",
                  "QUEUES": <number of queues returned>,
                  "RECORDS": [
                               {
                                 "QUEUE_NAME": "b2_beast",
                                 "STATUS": "Open:Active",
                                 ...
                               }, ...
                             ]
                }
        """
        B2DEBUG(29, f"Calling LSF.bqueues(output_fields={output_fields}, queues={queues})")
        # Set up the default fields we want
        if not output_fields:
            output_fields = ["queue_name", "status", "max", "njobs", "pend", "run"]
        # Output fields should be quoted and space separated
        field_list_cmd = "\""
        field_list_cmd += " ".join(output_fields)
        field_list_cmd += "\""
        cmd_list = ["bqueues", "-o", field_list_cmd]
        # Get the output as JSON
        cmd_list.append("-json")
        # Apply the queue filter if requested
        if queues:
            cmd_list.extend(queues)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        return decode_json_string(output)
class HTCondor(Batch):
    """
    Backend for submitting calibration processes to an HTCondor batch system.
    """
    #: HTCondor submit description file name (separate from the `Backend.submit_script` wrapper)
    batch_submit_script = "submit.sub"
    #: Shell command used to submit a script
    submission_cmds = ["condor_submit", "-terse"]
    #: Default global limit on the number of submitted/running jobs the user can have
    default_global_job_limit = 10000
    #: Default backend args for HTCondor
    default_backend_args = {
        "universe": "vanilla",
        "getenv": "false",
        "request_memory": "4 GB",
        "extra_lines": [],
    }
    #: Default ClassAd attributes requested from commands like condor_q
    default_class_ads = ["GlobalJobId", "JobStatus", "Owner"]
    def _make_submit_file(self, job, submit_file_path):
        """
        Fill HTCondor submission file.
        """
        files_to_transfer = [i.as_posix() for i in job.working_dir.iterdir()]

        job_backend_args = {**self.backend_args, **job.backend_args}

        with open(submit_file_path, "w") as submit_file:
            print(f'executable = {self.get_submit_script_path(job)}', file=submit_file)
            print(f'log = {Path(job.output_dir, "htcondor.log").as_posix()}', file=submit_file)
            print(f'output = {Path(job.working_dir, _STDOUT_FILE).as_posix()}', file=submit_file)
            print(f'error = {Path(job.working_dir, _STDERR_FILE).as_posix()}', file=submit_file)
            print('transfer_input_files = ', ','.join(files_to_transfer), file=submit_file)
            print(f'universe = {job_backend_args["universe"]}', file=submit_file)
            print(f'getenv = {job_backend_args["getenv"]}', file=submit_file)
            print(f'request_memory = {job_backend_args["request_memory"]}', file=submit_file)
            print('should_transfer_files = Yes', file=submit_file)
            print('when_to_transfer_output = ON_EXIT', file=submit_file)
            for line in job_backend_args["extra_lines"]:
                print(line, file=submit_file)
            print('queue', file=submit_file)
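
    # For orientation, the submit file written above ends up looking roughly like this
    # (paths and values are placeholders, not output copied from a real run):
    #
    #     executable = /abs/path/work/0/submit.sh
    #     log = /abs/path/out/0/htcondor.log
    #     output = /abs/path/work/0/stdout
    #     error = /abs/path/work/0/stderr
    #     transfer_input_files =  submit.sh,__BACKEND_INPUT_FILES__.json
    #     universe = vanilla
    #     getenv = false
    #     request_memory = 4 GB
    #     should_transfer_files = Yes
    #     when_to_transfer_output = ON_EXIT
    #     queue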
    def _add_batch_directives(self, job, batch_file):
        """
        For HTCondor leave empty as the directives are already included in the submit file.
        """
        print('#!/bin/bash', file=batch_file)

    def _create_cmd(self, script_path):
        submission_cmd = self.submission_cmds[:]
        submission_cmd.append(script_path.as_posix())
        return submission_cmd

    def get_batch_submit_script_path(self, job):
        """
        Construct the Path object of the .sub file that we will use to describe the job.
        """
        return Path(job.working_dir, self.batch_submit_script)
    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """
        job_dir = Path(cmd[-1]).parent.as_posix()
        sub_out = ""
        attempt = 0
        sleep_time = 30
        # condor_submit can fail transiently, so retry a few times before giving up
        while attempt < 3:
            try:
                sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, cwd=job_dir)
                break
            except subprocess.CalledProcessError as e:
                attempt += 1
                if attempt == 3:
                    B2ERROR(f"Error during condor_submit: {str(e)} occurred more than 3 times.")
                    raise e
                else:
                    B2ERROR(f"Error during condor_submit: {str(e)}, sleeping for {sleep_time} seconds.")
                    time.sleep(sleep_time)
        return re.search(r"(\d+\.\d+) - \d+\.\d+", sub_out).groups()[0]
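    # Note on the parsing above: with "-terse", condor_submit prints one line per queue statement
    # of the form "<cluster>.<first proc> - <cluster>.<last proc>", e.g. "1234.0 - 1234.0".
    # A self-contained sketch of the same extraction (the job id string is made up):
    #
    #   import re
    #   sub_out = "1234.0 - 1234.0"
    #   job_id = re.search(r"(\d+\.\d+) - \d+\.\d+", sub_out).groups()[0]  # "1234.0"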
    class HTCondorResult(Result):
        """
        Simple class to help monitor status of jobs submitted by HTCondor Backend.

        You pass in a `Job` object and job id from a condor_submit command.
        When you call the `ready` method it runs condor_q and, if needed, ``condor_history``
        to see whether or not the job has finished.
        """

        # HTCondor JobStatus integer codes mapped onto CAF job statuses. The entries beyond
        # code 0 are assumed from the standard HTCondor meanings (1 Idle, 2 Running, 3 Removed,
        # 4 Completed, 5 Held, 6 Submission error) and the checks in _update_result_status below.
        backend_code_to_status = {0: "submitted",
                                  1: "submitted",
                                  2: "running",
                                  3: "failed",
                                  4: "completed",
                                  5: "submitted",
                                  6: "failed"
                                  }

        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            self.job_id = job_id
        def update_status(self):
            """
            Update the job's (or subjobs') status by calling condor_q.
            """
            B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
            # Get all job information in one condor_q call and reuse it for every subjob
            condor_q_output = HTCondor.condor_q()
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(condor_q_output)
            else:
                self._update_result_status(condor_q_output)
        def _update_result_status(self, condor_q_output):
            """
            In order to be slightly more efficient we pass in a previous call to condor_q to see if it can work.
            If it is there we update the job's status. If not we are forced to start calling condor_q and,
            if needed, ``condor_history``, etc.

            Parameters:
                condor_q_output (dict): The JSON output of a previous call to `HTCondor.condor_q` which we can reuse to find the
                    status of this job if it was active when that command ran.
            """
            B2DEBUG(29, f"Calling {self.job}.result._update_result_status()")
            jobs_info = []
            for job_record in condor_q_output["JOBS"]:
                job_id = job_record["GlobalJobId"].split("#")[1]
                if job_id == self.job_id:
                    B2DEBUG(29, f"Found {self.job_id} in condor_q_output.")
                    jobs_info.append(job_record)

            # Not in the previous condor_q output? Look for the exit code file where we expect it.
            if not jobs_info:
                try:
                    exit_code = self.get_exit_code_from_file()
                except FileNotFoundError:
                    waiting_time = datetime.now() - self.exit_code_file_initial_time
                    if self.time_to_wait_for_exit_code_file > waiting_time:
                        B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                        exit_code = 1
                    else:
                        B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                        return
                if exit_code:
                    jobs_info = [{"JobStatus": 6, "HoldReason": None}]  # Treat as failed
                else:
                    jobs_info = [{"JobStatus": 4, "HoldReason": None}]  # Treat as completed

            # Still nothing? Ask condor_q directly about this specific job id.
            if not jobs_info:
                jobs_info = HTCondor.condor_q(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]

            # If no job information is returned then the job has already left the queue,
            # so check the history to see whether it succeeded or failed.
            if not jobs_info:
                try:
                    jobs_info = HTCondor.condor_history(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
                except KeyError:
                    hold_reason = "No Reason Known"

            # Still no record of the job anywhere, assume it failed.
            if not jobs_info:
                jobs_info = [{"JobStatus": 6, "HoldReason": None}]

            job_info = jobs_info[0]
            backend_status = job_info["JobStatus"]
            # A held job (JobStatus 5) is reported, but we keep waiting for it.
            if backend_status == 5:
                hold_reason = job_info.get("HoldReason", None)
                B2WARNING(f"{self.job} on hold because of {hold_reason}. Keep waiting.")

            try:
                new_job_status = self.backend_code_to_status[backend_status]
            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
            if new_job_status != self.job.status:
                self.job.status = new_job_status
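        # Note on the job id comparison above: the GlobalJobId ClassAd has the form
        # "<schedd name>#<cluster>.<proc>#<timestamp>" (example value made up), so its middle
        # component matches the id returned by condor_submit -terse:
        #
        #   global_job_id = "schedd.example.org#1234.0#1577836800"
        #   assert global_job_id.split("#")[1] == "1234.0"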
    @classmethod
    def _create_job_result(cls, job, job_id):
        """
        """
        B2INFO(f"Job ID of {job} recorded as: {job_id}")
        job.result = cls.HTCondorResult(job, job_id)

    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.HTCondorResult(parent, None)
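    # Illustrative usage sketches for the query/throttling helpers defined below (can_submit,
    # condor_q and condor_history). The variable names and the job id are hypothetical:
    #
    #   backend = HTCondor()
    #   # Throttle submission against the global job limit
    #   while not backend.can_submit(njobs=100):
    #       time.sleep(60)
    #
    #   # Ask about one specific job; fall back to the history once it has left the queue
    #   info = HTCondor.condor_q(job_id="1234.0", class_ads=["JobStatus", "HoldReason"])
    #   if not info["JOBS"]:
    #       info = HTCondor.condor_history(job_id="1234.0", class_ads=["JobStatus"])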
    def can_submit(self, njobs=1):
        """
        Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <= 250
                and check again before submitting more.
        """
        B2DEBUG(29, "Calling HTCondor().can_submit()")
        jobs_info = self.condor_q()
        total_jobs = jobs_info["NJOBS"]
        B2INFO(f"Total jobs active in the HTCondor system is currently {total_jobs}")
        if (total_jobs + njobs) > self.global_job_limit:
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
            return False
        else:
            B2INFO("There is enough space to submit more jobs.")
            return True
    @classmethod
    def condor_q(cls, class_ads=None, job_id="", username=""):
        """
        Simplistic interface to the ``condor_q`` command. Lets you request information about all jobs matching the filters
        ``job_id`` and ``username``. Note that setting ``job_id`` negates ``username`` so it is ignored.
        The result is the JSON dictionary returned by output of the ``-json`` condor_q option.

        Parameters:
            class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
                By default we give {cls.default_class_ads}; increasing the number of class_ads increases the time taken
                by the condor_q call.
            job_id (str): String representation of the Job ID given by condor_submit during submission.
                If this argument is given then the output of this function will be only information about this job.
                If this argument is not given, then all jobs matching the other filters will be returned.
            username (str): By default we return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
                so it isn't recommended.

        Returns:
            dict: JSON dictionary of the form:

            .. code-block:: python

              {
                "NJOBS": <number of records returned by command>,
                "JOBS": [
                          {
                            <ClassAd: value>, ...
                          }, ...
                        ]
              }
        """
        B2DEBUG(29, f"Calling HTCondor.condor_q(class_ads={class_ads}, job_id={job_id}, username={username})")
        if not class_ads:
            class_ads = cls.default_class_ads
        # Requested ClassAds should be comma separated
        field_list_cmd = ",".join(class_ads)
        cmd_list = ["condor_q", "-json", "-attributes", field_list_cmd]
        # If job_id is set then the username filter is ignored
        if job_id:
            cmd_list.append(job_id)
        else:
            if not username:
                username = os.environ["USER"]
            if username == "all":
                cmd_list.append("-allusers")
            else:
                cmd_list.append(username)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        # condor_q occasionally fails, so don't let that crash the status update
        try:
            records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        except BaseException:
            records = None

        if records:
            records = decode_json_string(records)
        else:
            records = []
        jobs_info = {"JOBS": records}
        jobs_info["NJOBS"] = len(jobs_info["JOBS"])
        return jobs_info
    @classmethod
    def condor_history(cls, class_ads=None, job_id="", username=""):
        """
        Simplistic interface to the ``condor_history`` command. Lets you request information about all jobs matching the filters
        ``job_id`` and ``username``. Note that setting ``job_id`` negates ``username`` so it is ignored.
        The result is a JSON dictionary filled by output of the ``-json`` ``condor_history`` option.

        Parameters:
            class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
                By default we give {cls.default_class_ads}; increasing the number of class_ads increases the time taken
                by the condor_history call.
            job_id (str): String representation of the Job ID given by condor_submit during submission.
                If this argument is given then the output of this function will be only information about this job.
                If this argument is not given, then all jobs matching the other filters will be returned.
            username (str): By default we return information about only the current user's jobs. By giving
                a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
                receive job information from all known user jobs matching the other filters. This is limited to 10000 records
                and isn't recommended.

        Returns:
            dict: JSON dictionary of the form:

            .. code-block:: python

              {
                "NJOBS": <number of records returned by command>,
                "JOBS": [
                          {
                            <ClassAd: value>, ...
                          }, ...
                        ]
              }
        """
        B2DEBUG(29, f"Calling HTCondor.condor_history(class_ads={class_ads}, job_id={job_id}, username={username})")
        if not class_ads:
            class_ads = cls.default_class_ads
        # Requested ClassAds should be comma separated
        field_list_cmd = ",".join(class_ads)
        cmd_list = ["condor_history", "-json", "-attributes", field_list_cmd]
        # If job_id is set then the username filter is ignored
        if job_id:
            cmd_list.append(job_id)
        else:
            if not username:
                username = os.environ["USER"]
            if username != "all":
                cmd_list.append(username)
        cmd = " ".join(cmd_list)
        B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
        try:
            records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        except BaseException:
            records = None

        if records:
            records = decode_json_string(records)
        else:
            records = []
        jobs_info = {"JOBS": records}
        jobs_info["NJOBS"] = len(jobs_info["JOBS"])
        return jobs_info
class DIRAC(Backend):
    """
    Backend for submitting calibration processes to the grid.
    """


class BackendError(Exception):
    """
    Base exception class for Backend classes.
    """


class JobError(Exception):
    """
    Base exception class for Job objects.
    """


class SplitterError(Exception):
    """
    Base exception class for SubjobSplitter objects.
    """
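# End-to-end usage sketch (illustrative only; it assumes the `Job` class defined earlier in this
# module and a site where an HTCondor pool is reachable):
#
#   job = Job("example_job")
#   job.cmd = ["echo", "hello"]
#   backend = HTCondor()
#   backend.submit(job)
#   while not job.ready():
#       job.update_status()
#       time.sleep(30)
#   print(job.status)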