Belle II Software development
Local Class Reference
Inheritance diagram for Local:
Backend

Classes

class  LocalResult
 

Public Member Functions

 __init__ (self, *, backend_args=None, max_processes=1)
 
 join (self)
 
 max_processes (self)
 
 max_processes (self, value)
 
 submit (self, job)
 
 get_submit_script_path (self, job)
 

Static Public Member Functions

 run_job (name, working_dir, output_dir, script)
 

Public Attributes

 pool = None
 The actual Pool object of this instance of the Backend.
 
 max_processes = max_processes
 The size of the multiprocessing process pool.
 
dict backend_args = {**self.default_backend_args, **backend_args}
 The backend args that will be applied to jobs unless the job specifies them itself.
 

Static Public Attributes

str submit_script = "submit.sh"
 Default submission script name.
 
str exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
 Default exit code file name.
 
dict default_backend_args = {}
 Default backend_args.
 

Protected Member Functions

 _ (self, job)
 
 _ (self, job)
 
 _ (self, jobs)
 
 _create_parent_job_result (cls, parent)
 
 _add_wrapper_script_setup (self, job, batch_file)
 
 _add_wrapper_script_teardown (self, job, batch_file)
 

Static Protected Member Functions

 _add_setup (job, batch_file)
 

Protected Attributes

 _max_processes = value
 Internal attribute of max_processes.
 

Detailed Description

Backend for local processes i.e. on the same machine but in a subprocess.

Note that you should call the self.join() method to close the pool and wait for any
running processes to finish before exiting the process. Once you've called join you will have to set up a new
instance of this backend to create a new pool. If you don't call `Local.join` (or otherwise join the pool
yourself somewhere), then the main python process might end before your pool is done.

Keyword Arguments:
    max_processes (int): Integer that specifies the size of the process pool that spawns the subjobs, default=1.
        It's the maximum simultaneous subjobs.
        Try not to specify a large number or a number larger than the number of cores.
        It won't crash the program but it will slow down and negatively impact performance.

Definition at line 924 of file backends.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
* ,
backend_args = None,
max_processes = 1 )
 

Definition at line 940 of file backends.py.

940 def __init__(self, *, backend_args=None, max_processes=1):
941 """
942 """
943 super().__init__(backend_args=backend_args)
944
945 self.pool = None
946
947 self.max_processes = max_processes
948

Member Function Documentation

◆ _() [1/3]

_ ( self,
job )
protected
Submission of a `SubJob` for the Local backend

Definition at line 1024 of file backends.py.

1024 def _(self, job):
1025 """
1026 Submission of a `SubJob` for the Local backend
1027 """
1028 # Make sure the output directory of the job is created
1029 job.output_dir.mkdir(parents=True, exist_ok=True)
1030 # Make sure the working directory of the job is created
1031 job.working_dir.mkdir(parents=True, exist_ok=True)
1032 job.copy_input_sandbox_files_to_working_dir()
1033 job.dump_input_data()
1034 # Get the path to the bash script we run
1035 script_path = self.get_submit_script_path(job)
1036 with open(script_path, mode="w") as batch_file:
1037 print("#!/bin/bash", file=batch_file)
1038 self._add_wrapper_script_setup(job, batch_file)
1039 self._add_setup(job, batch_file)
1040 print(job.full_command, file=batch_file)
1041 self._add_wrapper_script_teardown(job, batch_file)
1042 B2INFO(f"Submitting {job}")
1043 job.result = Local.LocalResult(job,
1044 self.pool.apply_async(self.run_job,
1045 (job.name,
1046 job.working_dir,
1047 job.output_dir,
1048 script_path)
1049 )
1050 )
1051 job.status = "submitted"
1052 B2INFO(f"{job} submitted")
1053

◆ _() [2/3]

_ ( self,
job )
protected
Submission of a `Job` for the Local backend

Definition at line 1055 of file backends.py.

1055 def _(self, job):
1056 """
1057 Submission of a `Job` for the Local backend
1058 """
1059 # Make sure the output directory of the job is created
1060 job.output_dir.mkdir(parents=True, exist_ok=True)
1061 # Make sure the working directory of the job is created
1062 job.working_dir.mkdir(parents=True, exist_ok=True)
1063 # Check if we have any valid input files
1064 job.check_input_data_files()
1065
1066 if not job.splitter:
1067 # Get all of the requested files for the input sandbox and copy them to the working directory
1068 job.copy_input_sandbox_files_to_working_dir()
1069 job.dump_input_data()
1070 # Get the path to the bash script we run
1071 script_path = self.get_submit_script_path(job)
1072 with open(script_path, mode="w") as batch_file:
1073 print("#!/bin/bash", file=batch_file)
1074 self._add_wrapper_script_setup(job, batch_file)
1075 self._add_setup(job, batch_file)
1076 print(job.full_command, file=batch_file)
1077 self._add_wrapper_script_teardown(job, batch_file)
1078 B2INFO(f"Submitting {job}")
1079 job.result = Local.LocalResult(job,
1080 self.pool.apply_async(self.run_job,
1081 (job.name,
1082 job.working_dir,
1083 job.output_dir,
1084 script_path)
1085 )
1086 )
1087 B2INFO(f"{job} submitted")
1088 else:
1089 # Create subjobs according to the splitter's logic
1090 job.splitter.create_subjobs(job)
1091 # Submit the subjobs
1092 self.submit(list(job.subjobs.values()))
1093 # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1094 self._create_parent_job_result(job)
1095

◆ _() [3/3]

_ ( self,
jobs )
protected
Submit method of Local() that takes a list of jobs instead of just one and submits each one.

Definition at line 1097 of file backends.py.

1097 def _(self, jobs):
1098 """
1099 Submit method of Local() that takes a list of jobs instead of just one and submits each one.
1100 """
1101 # Submit the jobs
1102 for job in jobs:
1103 self.submit(job)
1104 B2INFO("All requested jobs submitted.")
1105

◆ _add_setup()

_add_setup ( job,
batch_file )
staticprotectedinherited
Adds setup lines to the shell script file.

Definition at line 807 of file backends.py.

807 def _add_setup(job, batch_file):
808 """
809 Adds setup lines to the shell script file.
810 """
811 for line in job.setup_cmds:
812 print(line, file=batch_file)
813

◆ _add_wrapper_script_setup()

_add_wrapper_script_setup ( self,
job,
batch_file )
protectedinherited
Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
`trap` statements for Ctrl-C situations.

Definition at line 814 of file backends.py.

814 def _add_wrapper_script_setup(self, job, batch_file):
815 """
816 Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
817 `trap` statements for Ctrl-C situations.
818 """
819 start_wrapper = f"""# ---
820# trap ctrl-c and call ctrl_c()
821trap '(ctrl_c 130)' SIGINT
822trap '(ctrl_c 143)' SIGTERM
823
824function write_exit_code() {{
825 echo "Writing $1 to exit status file"
826 echo "$1" > {self.exit_code_file}
827 exit $1
828}}
829
830function ctrl_c() {{
831 trap '' SIGINT SIGTERM
832 echo "** Trapped Ctrl-C **"
833 echo "$1" > {self.exit_code_file}
834 exit $1
835}}
836# ---"""
837 print(start_wrapper, file=batch_file)
838

◆ _add_wrapper_script_teardown()

_add_wrapper_script_teardown ( self,
job,
batch_file )
protectedinherited
Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
an exit code of the job cmd being written out to a file. Which means that we can know if the command was
successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
file.

Definition at line 839 of file backends.py.

839 def _add_wrapper_script_teardown(self, job, batch_file):
840 """
841 Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
842 an exit code of the job cmd being written out to a file. Which means that we can know if the command was
843 successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
844 removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
845 file.
846 """
847 end_wrapper = """# ---
848write_exit_code $?"""
849 print(end_wrapper, file=batch_file)
850

◆ _create_parent_job_result()

_create_parent_job_result ( cls,
parent )
protected
We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
statuses and allows the use of ready().

Reimplemented from Backend.

Definition at line 1133 of file backends.py.

1133 def _create_parent_job_result(cls, parent):
1134 parent.result = cls.LocalResult(parent, None)
1135
1136

◆ get_submit_script_path()

get_submit_script_path ( self,
job )
inherited
Construct the Path object of the bash script file that we will submit. It will contain
the actual job command, wrapper commands, setup commands, and any batch directives

Definition at line 860 of file backends.py.

860 def get_submit_script_path(self, job):
861 """
862 Construct the Path object of the bash script file that we will submit. It will contain
863 the actual job command, wrapper commands, setup commands, and any batch directives
864 """
865 return Path(job.working_dir, self.submit_script)
866
867

◆ join()

join ( self)
Closes and joins the Pool, letting you wait for all results currently
still processing.

Definition at line 985 of file backends.py.

985 def join(self):
986 """
987 Closes and joins the Pool, letting you wait for all results currently
988 still processing.
989 """
990 B2INFO("Joining Process Pool, waiting for results to finish...")
991 self.pool.close()
992 self.pool.join()
993 B2INFO("Process Pool joined.")
994

◆ max_processes() [1/2]

max_processes ( self)
Getter for max_processes

Definition at line 996 of file backends.py.

996 def max_processes(self):
997 """
998 Getter for max_processes
999 """
1000 return self._max_processes
1001

◆ max_processes() [2/2]

max_processes ( self,
value )
Setter for max_processes, we also check for a previous Pool(), wait for it to join
and then create a new one with the new value of max_processes

Definition at line 1003 of file backends.py.

1003 def max_processes(self, value):
1004 """
1005 Setter for max_processes, we also check for a previous Pool(), wait for it to join
1006 and then create a new one with the new value of max_processes
1007 """
1008
1009 self._max_processes = value
1010 if self.pool:
1011 B2INFO("New max_processes requested. But a pool already exists.")
1012 self.join()
1013 B2INFO(f"Starting up new Pool with {self.max_processes} processes")
1014 self.pool = mp.Pool(processes=self.max_processes)
1015

◆ run_job()

run_job ( name,
working_dir,
output_dir,
script )
static
The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
shell command in a subprocess and captures the stdout and stderr of the subprocess to files.

Definition at line 1107 of file backends.py.

1107 def run_job(name, working_dir, output_dir, script):
1108 """
1109 The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
1110 shell command in a subprocess and captures the stdout and stderr of the subprocess to files.
1111 """
1112 B2INFO(f"Starting Sub-process: {name}")
1113 from subprocess import Popen
1114 stdout_file_path = Path(working_dir, _STDOUT_FILE)
1115 stderr_file_path = Path(working_dir, _STDERR_FILE)
1116 # Create unix command to redirect stdout and stderr
1117 B2INFO(f"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
1118 with open(stdout_file_path, mode="w", buffering=1) as f_out, \
1119 open(stderr_file_path, mode="w", buffering=1) as f_err:
1120 with Popen(["/bin/bash", script.as_posix()],
1121 stdout=f_out,
1122 stderr=f_err,
1123 bufsize=1,
1124 universal_newlines=True,
1125 cwd=working_dir,
1126 env={}) as p:
1127 # We block here and wait so that the return code will be set.
1128 p.wait()
1129 B2INFO(f"Subprocess {name} finished.")
1130 return p.returncode
1131

◆ submit()

submit ( self,
job )
 

Reimplemented from Backend.

Definition at line 1017 of file backends.py.

1017 def submit(self, job):
1018 """
1019 """
1020 raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
1021 "Did you submit a (Sub)Job?")
1022

Member Data Documentation

◆ _max_processes

_max_processes = value
protected

Internal attribute of max_processes.

Definition at line 1009 of file backends.py.

◆ backend_args

dict backend_args = {**self.default_backend_args, **backend_args}
inherited

The backend args that will be applied to jobs unless the job specifies them itself.

Definition at line 797 of file backends.py.

◆ default_backend_args

dict default_backend_args = {}
staticinherited

Default backend_args.

Definition at line 789 of file backends.py.

◆ exit_code_file

str exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
staticinherited

Default exit code file name.

Definition at line 787 of file backends.py.

◆ max_processes

max_processes = max_processes

The size of the multiprocessing process pool.

Definition at line 947 of file backends.py.

◆ pool

pool = None

The actual Pool object of this instance of the Backend.

Definition at line 945 of file backends.py.

◆ submit_script

submit_script = "submit.sh"
staticinherited

Default submission script name.

Definition at line 785 of file backends.py.


The documentation for this class was generated from the following file: