Belle II Software development
SubJob Class Reference
Inheritance diagram for SubJob:
Job

Public Member Functions

 __init__ (self, job, subjob_id, input_files=None)
 
 output_dir (self)
 
 working_dir (self)
 
 name (self)
 
 status (self)
 
 status (self, status)
 
 subjobs (self)
 
 job_dict (self)
 
 __getattr__ (self, attribute)
 
 __repr__ (self)
 
 ready (self)
 
 update_status (self)
 
 create_subjob (self, i, input_files=None, args=None)
 
 dump_to_json (self, file_path)
 
 from_json (cls, file_path)
 
 dump_input_data (self)
 
 copy_input_sandbox_files_to_working_dir (self)
 
 check_input_data_files (self)
 
 full_command (self)
 
 append_current_basf2_setup_cmds (self)
 

Public Attributes

 id = subjob_id
 Id of Subjob.
 
 parent = job
 Job() instance of parent to this SubJob.
 
 splitter = None
 The SubjobSplitter used to create subjobs if necessary.
 
list input_sandbox_files = []
 Files to be copied directly into the working directory (pathlib.Path).
 
list output_patterns = []
 Files that we produce during the job and want to be returned.
 
list cmd = []
 Command and arguments as a list that will be run by the job on the backend.
 
list args = []
 The arguments that will be applied to the cmd (These are ignored by SubJobs as they have their own arguments)
 
list input_files = []
 Input files to job (str), a list of these is copied to the working directory.
 
list setup_cmds = []
 Bash commands to run before the main self.cmd (mainly used for batch system setup)
 
dict backend_args = {}
 Config dictionary for the backend to use when submitting the job.
 
 result = None
 The result object of this Job.
 

Static Public Attributes

dict statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}
 Allowed Job status dictionary.
 
list exit_statuses = ["failed", "completed"]
 Job statuses that correspond to the Job being finished (successfully or not)
 

Protected Member Functions

 _get_overall_status_from_subjobs (self)
 

Protected Attributes

str _status = "init"
 The actual status of the overall Job.
 

Detailed Description

This mini-class holds basic information about which subjob this is, together with a reference
to the parent Job object, so that the main configuration can be accessed there
rather than being replicated in every subjob.

Definition at line 677 of file backends.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
job,
subjob_id,
input_files = None )
 

Definition at line 684 of file backends.py.

684 def __init__(self, job, subjob_id, input_files=None):
685 """
686 """
687
688 self.id = subjob_id
689
690 self.parent = job
691
692 if not input_files:
693 input_files = []
694 self.input_files = input_files
695
697 self.result = None
698
699 self._status = "init"
700
701 self.args = []
702

Member Function Documentation

◆ __getattr__()

__getattr__ ( self,
attribute )
Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
unless otherwise specified.

Definition at line 759 of file backends.py.

759 def __getattr__(self, attribute):
760 """
761 Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
762 unless otherwise specified.
763 """
764 return getattr(self.parent, attribute)
765
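The delegation above means any attribute not defined on the SubJob itself falls through to the parent. A minimal standalone sketch of the same pattern (`ParentJob` and `ChildSubJob` are illustrative stand-ins, not the real classes):

```python
class ParentJob:
    """Illustrative stand-in for the real Job class."""

    def __init__(self, name):
        self.name = name
        self.backend_args = {"queue": "short"}


class ChildSubJob:
    """Sketch of SubJob-style attribute delegation via __getattr__."""

    def __init__(self, parent, subjob_id):
        self.parent = parent
        self.id = subjob_id

    def __getattr__(self, attribute):
        # Only called when normal attribute lookup fails,
        # so the SubJob's own attributes (parent, id) take priority.
        return getattr(self.parent, attribute)


job = ParentJob("my_job")
sub = ChildSubJob(job, 0)
print(sub.backend_args)  # falls through to the parent: {'queue': 'short'}
```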

◆ __repr__()

__repr__ ( self)
 

Definition at line 766 of file backends.py.

766 def __repr__(self):
767 """
768 """
769 return f"SubJob({self.name})"
770
771

◆ _get_overall_status_from_subjobs()

_get_overall_status_from_subjobs ( self)
protected inherited
 

Definition at line 462 of file backends.py.

462 def _get_overall_status_from_subjobs(self):
463 """
464 """
465 subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
466 status_level = min([self.statuses[status] for status in subjob_statuses])
467 for status, level in self.statuses.items():
468 if level == status_level:
469 return status
470
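The reduction above picks the status with the lowest level across all subjobs. A standalone sketch of that logic, using the `statuses` mapping defined on this class:

```python
statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}


def overall_status(subjob_statuses):
    """Return the status whose level is the minimum across the subjobs."""
    status_level = min(statuses[status] for status in subjob_statuses)
    for status, level in statuses.items():
        if level == status_level:
            return status


# A single subjob still running holds the overall job at "running".
print(overall_status(["completed", "running", "completed"]))  # running
# A failure outranks completion because "failed" has the lower level.
print(overall_status(["failed", "completed"]))  # failed
```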

◆ append_current_basf2_setup_cmds()

append_current_basf2_setup_cmds ( self)
inherited
This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
It should detect if you are using a local release or CVMFS and append the correct commands
so that the job will have the same basf2 release environment. It should also detect
if a local release is not compiled with the ``opt`` option.

Note that this *doesn't mean that every environment variable is inherited* from the submitting
process environment.

Definition at line 639 of file backends.py.

639 def append_current_basf2_setup_cmds(self):
640 """
641 This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
642 It should detect if you are using a local release or CVMFS and append the correct commands
643 so that the job will have the same basf2 release environment. It should also detect
644 if a local release is not compiled with the ``opt`` option.
645
646 Note that this *doesn't mean that every environment variable is inherited* from the submitting
647 process environment.
648 """
649 def append_environment_variable(cmds, envvar):
650 """
651 Append a command for setting an environment variable.
652 """
653 if envvar in os.environ:
654 cmds.append(f"""if [ -z "${{{envvar}}}" ]; then""")
655 cmds.append(f" export {envvar}={os.environ[envvar]}")
656 cmds.append("fi")
657
658 if "BELLE2_TOOLS" not in os.environ:
659 raise BackendError("No BELLE2_TOOLS found in environment")
660 # Export all the environment variables defined via _backend_job_envvars
661 for envvar in _backend_job_envvars:
662 append_environment_variable(self.setup_cmds, envvar)
663 if "BELLE2_RELEASE" in os.environ:
664 self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
665 elif 'BELLE2_LOCAL_DIR' in os.environ:
666 self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
667 self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
668 self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
669 self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
670 self.setup_cmds.append("pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
671 self.setup_cmds.append("source $BACKEND_B2SETUP")
672 # b2code-option has to be executed only after the source of the tools.
673 self.setup_cmds.append("b2code-option $BACKEND_BELLE2_OPTION")
674 self.setup_cmds.append("popd > /dev/null")
675
676
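The nested helper above only exports a variable into the job's shell if that shell does not already define it. A standalone sketch of the generated bash (`MY_FAKE_VAR` is a hypothetical variable, set just for the demo):

```python
import os


def append_environment_variable(cmds, envvar):
    """Append bash that exports envvar only if the job's shell doesn't set it."""
    if envvar in os.environ:
        cmds.append(f'if [ -z "${{{envvar}}}" ]; then')
        cmds.append(f"  export {envvar}={os.environ[envvar]}")
        cmds.append("fi")


os.environ["MY_FAKE_VAR"] = "hello"  # hypothetical variable for the demo
cmds = []
append_environment_variable(cmds, "MY_FAKE_VAR")
print("\n".join(cmds))
```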

◆ check_input_data_files()

check_input_data_files ( self)
inherited
Check the input files and make sure that there aren't any duplicates.
Also check if the files actually exist if possible.

Definition at line 597 of file backends.py.

597 def check_input_data_files(self):
598 """
599 Check the input files and make sure that there aren't any duplicates.
600 Also check if the files actually exist if possible.
601 """
602 existing_input_files = [] # We use a list instead of set to avoid losing any ordering of files
603 for file_path in self.input_files:
604 file_uri = parse_file_uri(file_path)
605 if file_uri.scheme == "file":
606 p = Path(file_uri.path)
607 if p.is_file():
608 if file_uri.geturl() not in existing_input_files:
609 existing_input_files.append(file_uri.geturl())
610 else:
611 B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
612 else:
613 B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
614 else:
615 B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
616 if file_path not in existing_input_files:
617 existing_input_files.append(file_path)
618 else:
619 B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
620 if self.input_files and not existing_input_files:
621 B2WARNING(f"No valid input file paths found for {self.name}, but some were requested.")
622
623 # Replace the Job's input files with the ones that exist + duplicates removed
624 self.input_files = existing_input_files
625
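The duplicate removal above deliberately uses a list rather than a set so that file ordering is preserved. The core of that logic as a standalone sketch (the file-existence and URI checks are omitted here):

```python
def deduplicate_preserving_order(paths):
    """Keep the first occurrence of each path, preserving the original order."""
    existing = []
    for path in paths:
        if path not in existing:
            existing.append(path)
        # the real method emits a B2WARNING for each duplicate here
    return existing


print(deduplicate_preserving_order(["a.root", "b.root", "a.root", "c.root"]))
# ['a.root', 'b.root', 'c.root']
```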

◆ copy_input_sandbox_files_to_working_dir()

copy_input_sandbox_files_to_working_dir ( self)
inherited
Get all of the requested files for the input sandbox and copy them to the working directory.
Files like the submit.sh or input_data.json are not part of this process.

Definition at line 586 of file backends.py.

586 def copy_input_sandbox_files_to_working_dir(self):
587 """
588 Get all of the requested files for the input sandbox and copy them to the working directory.
589 Files like the submit.sh or input_data.json are not part of this process.
590 """
591 for file_path in self.input_sandbox_files:
592 if file_path.is_dir():
593 shutil.copytree(file_path, Path(self.working_dir, file_path.name))
594 else:
595 shutil.copy(file_path, self.working_dir)
596

◆ create_subjob()

create_subjob ( self,
i,
input_files = None,
args = None )
inherited
Creates a SubJob object that references the parent Job.
Returns the SubJob object at the end.

Definition at line 434 of file backends.py.

434 def create_subjob(self, i, input_files=None, args=None):
435 """
436 Creates a subjob Job object that references that parent Job.
437 Returns the SubJob object at the end.
438 """
439 if i not in self.subjobs:
440 B2INFO(f"Creating {self}.Subjob({i})")
441 subjob = SubJob(self, i, input_files)
442 if args:
443 subjob.args = args
444 self.subjobs[i] = subjob
445 return subjob
446 else:
447 B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")
448
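The guard above ensures each subjob index is created only once. A minimal sketch of that bookkeeping, with plain dicts standing in for SubJob instances:

```python
subjobs = {}


def create_subjob_sketch(i, input_files=None, args=None):
    """Create and register a subjob for index i, or return None if it exists."""
    if i not in subjobs:
        subjob = {"id": i, "input_files": input_files or [], "args": args or []}
        subjobs[i] = subjob
        return subjob
    return None  # the real method logs a B2WARNING instead


first = create_subjob_sketch(0, args=["--seed", "1"])
duplicate = create_subjob_sketch(0)
print(first, duplicate)
# {'id': 0, 'input_files': [], 'args': ['--seed', '1']} None
```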

◆ dump_input_data()

dump_input_data ( self)
inherited
Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
URI strings.

Definition at line 578 of file backends.py.

578 def dump_input_data(self):
579 """
580 Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
581 string URI objects.
582 """
583 with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
584 json.dump(self.input_files, input_data_file, indent=2)
585

◆ dump_to_json()

dump_to_json ( self,
file_path )
inherited
Dumps the Job object configuration to a JSON file so that it can be read in again later.

Parameters:
  file_path(`basf2.Path`): The filepath we'll dump to

Definition at line 537 of file backends.py.

537 def dump_to_json(self, file_path):
538 """
539 Dumps the Job object configuration to a JSON file so that it can be read in again later.
540
541 Parameters:
542 file_path(`basf2.Path`): The filepath we'll dump to
543 """
544 # \cond false positive doxygen warning about job_dict
545 with open(file_path, mode="w") as job_file:
546 json.dump(self.job_dict, job_file, indent=2)
547 # \endcond
548

◆ from_json()

from_json ( cls,
file_path )
inherited
 

Definition at line 550 of file backends.py.

550 def from_json(cls, file_path):
551 """
552 """
553 with open(file_path) as job_file:
554 job_dict = json.load(job_file)
555 return cls(job_dict["name"], job_dict=job_dict)
556
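`dump_to_json` and `from_json` form a simple round trip through a JSON file. A self-contained sketch of that round trip, using a plain dict in place of the real `job_dict` property:

```python
import json
import tempfile
from pathlib import Path

# A job-style configuration dict standing in for the real job_dict property.
job_dict = {"name": "my_job", "input_files": ["a.root"], "args": ["--seed", "1"]}

with tempfile.TemporaryDirectory() as tmp:
    file_path = Path(tmp, "job.json")
    # dump_to_json side: write the configuration out
    with open(file_path, mode="w") as job_file:
        json.dump(job_dict, job_file, indent=2)
    # from_json side: read it back in
    with open(file_path) as job_file:
        loaded = json.load(job_file)

print(loaded == job_dict)  # True
```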

◆ full_command()

full_command ( self)
inherited
Returns:
    str: The full command that this job will run including any arguments.

Definition at line 627 of file backends.py.

627 def full_command(self):
628 """
629 Returns:
630 str: The full command that this job will run including any arguments.
631 """
632 all_components = self.cmd[:]
633 all_components.extend(self.args)
634 # We do a convert to string just in case arguments were generated as different types
635 full_command = " ".join(map(str, all_components))
636 B2DEBUG(29, f"Full command of {self} is '{full_command}'")
637 return full_command
638
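The command assembly above is a plain join after string coercion. A standalone sketch (`build_full_command` is an illustrative name, not the real method):

```python
def build_full_command(cmd, args):
    """Join command and arguments into one string, coercing every piece to str."""
    all_components = list(cmd)  # copy so the original cmd list is untouched
    all_components.extend(args)
    return " ".join(map(str, all_components))


# Non-string arguments (e.g. an int seed) are converted transparently.
print(build_full_command(["basf2", "steering.py"], ["--seed", 42]))
# basf2 steering.py --seed 42
```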

◆ job_dict()

job_dict ( self)
Returns:
    dict: A JSON serialisable representation of the `SubJob`. `Path <basf2.Path>` objects are converted to
    `string` via ``Path.as_posix()``. Since Subjobs inherit most of the parent job's config
    we only output the input files and arguments that are specific to this subjob and no other details.

Reimplemented from Job.

Definition at line 746 of file backends.py.

746 def job_dict(self):
747 """
748 Returns:
749 dict: A JSON serialisable representation of the `SubJob`. `Path <basf2.Path>` objects are converted to
750 `string` via ``Path.as_posix()``. Since Subjobs inherit most of the parent job's config
751 we only output the input files and arguments that are specific to this subjob and no other details.
752 """
753 job_dict = {}
754 job_dict["id"] = self.id
755 job_dict["input_files"] = self.input_files
756 job_dict["args"] = self.args
757 return job_dict
758
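Since most configuration lives on the parent Job, a SubJob serializes only its own id, input files, and arguments. A minimal sketch of that subjob-specific dict:

```python
def subjob_job_dict(subjob_id, input_files, args):
    """Serialize only subjob-specific fields; the rest comes from the parent Job."""
    return {"id": subjob_id, "input_files": input_files, "args": args}


print(subjob_job_dict(2, ["b.root"], ["--events", "100"]))
# {'id': 2, 'input_files': ['b.root'], 'args': ['--events', '100']}
```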

◆ name()

name ( self)
Getter for name of SubJob. Accesses the parent Job name to infer this.

Reimplemented from Job.

Definition at line 715 of file backends.py.

715 def name(self):
716 """Getter for name of SubJob. Accesses the parent Job name to infer this."""
717 return "_".join((self.parent.name, str(self.id)))
718

◆ output_dir()

output_dir ( self)
Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this.

Reimplemented from Job.

Definition at line 704 of file backends.py.

704 def output_dir(self):
705 """
706 Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this."""
707 return Path(self.parent.output_dir, str(self.id))
708

◆ ready()

ready ( self)
inherited
Returns whether or not the Job has finished. If the job has subjobs, it returns True only when they are all finished.
It returns False as soon as it hits the first failure, meaning that you cannot guarantee that all subjobs will have
their status updated when calling this method. Use :py:meth:`update_status` instead to update all statuses if necessary.

Definition at line 411 of file backends.py.

411 def ready(self):
412 """
413 Returns whether or not the Job has finished. If the job has subjobs then it will return true when they are all finished.
414 It will return False as soon as it hits the first failure. Meaning that you cannot guarantee that all subjobs will have
415 their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.
416 """
417 if not self.result:
418 B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")
419 return False
420 else:
421 return self.result.ready()
422

◆ status() [1/2]

status ( self)
Returns the status of this SubJob.

Reimplemented from Job.

Definition at line 720 of file backends.py.

720 def status(self):
721 """
722 Returns the status of this SubJob.
723 """
724 return self._status
725

◆ status() [2/2]

status ( self,
status )
Sets the status of this Job.

Reimplemented from Job.

Definition at line 727 of file backends.py.

727 def status(self, status):
728 """
729 Sets the status of this Job.
730 """
731 # Print an error only if the job failed.
732 if status == "failed":
733 B2ERROR(f"Setting {self.name} status to failed")
734 else:
735 B2INFO(f"Setting {self.name} status to {status}")
736 self._status = status
737

◆ subjobs()

subjobs ( self)
A subjob cannot have subjobs. Always returns an empty list.

Reimplemented from Job.

Definition at line 739 of file backends.py.

739 def subjobs(self):
740 """
741 A subjob cannot have subjobs. Always return empty list.
742 """
743 return []
744

◆ update_status()

update_status ( self)
inherited
Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
in the best way for the type of result object/backend.

Definition at line 423 of file backends.py.

423 def update_status(self):
424 """
425 Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
426 in the best way for the type of result object/backend.
427 """
428 if not self.result:
429 B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
430 else:
431 self.result.update_status()
432 return self.status
433

◆ working_dir()

working_dir ( self)
Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this.

Reimplemented from Job.

Definition at line 710 of file backends.py.

710 def working_dir(self):
711 """Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this."""
712 return Path(self.parent.working_dir, str(self.id))
713
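The getters `name`, `output_dir`, and `working_dir` all derive SubJob values from the parent plus the subjob id. A combined standalone sketch (the directory literals are illustrative):

```python
from pathlib import Path


def subjob_name(parent_name, subjob_id):
    """SubJob names are '<parent_name>_<id>'."""
    return "_".join((parent_name, str(subjob_id)))


def subjob_dirs(parent_output_dir, parent_working_dir, subjob_id):
    """Each SubJob gets an id-numbered subdirectory of the parent's directories."""
    return (Path(parent_output_dir, str(subjob_id)),
            Path(parent_working_dir, str(subjob_id)))


print(subjob_name("my_job", 3))  # my_job_3
out_dir, work_dir = subjob_dirs("out/my_job", "work/my_job", 3)
print(out_dir.as_posix(), work_dir.as_posix())  # out/my_job/3 work/my_job/3
```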

Member Data Documentation

◆ _status

str _status = "init"
protected inherited

The actual status of the overall Job.

The status property queries the subjob statuses to set this.

Definition at line 403 of file backends.py.

◆ args

args = []
inherited

The arguments that will be applied to the cmd (These are ignored by SubJobs as they have their own arguments)

Definition at line 375 of file backends.py.

◆ backend_args

dict backend_args = {}
inherited

Config dictionary for the backend to use when submitting the job.

Saves us from having multiple attributes that may or may not be used.

Definition at line 382 of file backends.py.

◆ cmd

list cmd = []
inherited

Command and arguments as a list that will be run by the job on the backend.

Definition at line 373 of file backends.py.

◆ exit_statuses

list exit_statuses = ["failed", "completed"]
static inherited

Job statuses that correspond to the Job being finished (successfully or not)

Definition at line 352 of file backends.py.

◆ id

id = subjob_id

Id of Subjob.

Definition at line 688 of file backends.py.

◆ input_files

input_files = []
inherited

Input files to job (str), a list of these is copied to the working directory.

Definition at line 377 of file backends.py.

◆ input_sandbox_files

list input_sandbox_files = []
inherited

Files to be copied directly into the working directory (pathlib.Path).

Not the input root files, those should be in Job.input_files.

Definition at line 365 of file backends.py.

◆ output_patterns

list output_patterns = []
inherited

Files that we produce during the job and want to be returned.

Can use wildcard (*)

Definition at line 371 of file backends.py.

◆ parent

parent = job

Job() instance of parent to this SubJob.

Definition at line 690 of file backends.py.

◆ result

result = None
inherited

The result object of this Job.

Only filled once the job is submitted to a backend since the backend creates a special result class depending on its type.

Definition at line 401 of file backends.py.

◆ setup_cmds

setup_cmds = []
inherited

Bash commands to run before the main self.cmd (mainly used for batch system setup)

Definition at line 379 of file backends.py.

◆ splitter

splitter = None
inherited

The SubjobSplitter used to create subjobs if necessary.

Definition at line 360 of file backends.py.

◆ statuses

dict statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}
static inherited

Allowed Job status dictionary.

The key is the status name and the value is its level. The lowest level out of all subjobs is the one that is the overall status of the overall job.

Definition at line 349 of file backends.py.


The documentation for this class was generated from the following file: backends.py