Belle II Software development
Local Class Reference
Inheritance diagram for Local: Local inherits from Backend.

Classes

class  LocalResult
 

Public Member Functions

 __init__ (self, *, backend_args=None, max_processes=1)
 
 join (self)
 
 max_processes (self)
 
 max_processes (self, value)
 
 submit (self, job)
 
 get_submit_script_path (self, job)
 

Static Public Member Functions

 run_job (name, working_dir, output_dir, script)
 

Public Attributes

 pool = None
 The actual Pool object of this instance of the Backend.
 
 max_processes = max_processes
 The size of the multiprocessing process pool.
 
dict backend_args = {**self.default_backend_args, **backend_args}
 The backend args that will be applied to jobs unless the job specifies them itself.
 

Static Public Attributes

str submit_script = "submit.sh"
 Default submission script name.
 
str exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
 Default exit code file name.
 
dict default_backend_args = {}
 Default backend_args.
 

Protected Member Functions

 _ (self, job)
 
 _ (self, job)
 
 _ (self, jobs)
 
 _create_parent_job_result (cls, parent)
 
 _add_wrapper_script_setup (self, job, batch_file)
 
 _add_wrapper_script_teardown (self, job, batch_file)
 

Static Protected Member Functions

 _add_setup (job, batch_file)
 

Protected Attributes

 _max_processes = value
 Internal attribute of max_processes.
 

Detailed Description

Backend for local processes, i.e. jobs that run on the same machine but in a subprocess.

Note that you should call the self.join() method to close the pool and wait for any
running processes to finish before exiting the process. Once you've called join you will have to set up a new
instance of this backend to create a new pool. If you don't call `Local.join` (or otherwise join the pool yourself)
somewhere, the main Python process might end before your pool is done.

Keyword Arguments:
    max_processes (int): Size of the process pool that spawns the subjobs (default=1), i.e. the maximum
        number of subjobs running simultaneously.
        Try not to specify a large number, or one larger than the number of available cores;
        it won't crash the program, but it will slow things down and hurt performance.

Definition at line 923 of file backends.py.
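As a usage sketch (the `Job` class, its `cmd` attribute, and the `caf.backends` import path are assumptions here rather than part of this page; the directories and command are placeholders):

from pathlib import Path
from caf.backends import Local, Job   # assumed import path

backend = Local(max_processes=2)                     # pool of at most 2 simultaneous subjobs

job = Job("example_job")                             # hypothetical job setup
job.working_dir = Path("example_job/working")        # placeholder directories
job.output_dir = Path("example_job/output")
job.cmd = ["echo", "Hello from the Local backend"]   # assumed attribute holding the command

backend.submit(job)         # dispatched to the Job overload of submit()
backend.join()              # close the pool and wait for running subprocesses to finish
print(job.result.ready())   # LocalResult created at submission; True once the subprocess is done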

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
* ,
backend_args = None,
max_processes = 1 )
 

Definition at line 939 of file backends.py.

939 def __init__(self, *, backend_args=None, max_processes=1):
940     """
941     """
942     super().__init__(backend_args=backend_args)
943 
944     self.pool = None
945 
946     self.max_processes = max_processes
947 

Member Function Documentation

◆ _() [1/3]

_ ( self,
job )
protected
Submission of a `SubJob` for the Local backend

Definition at line 1023 of file backends.py.

1023 def _(self, job):
1024     """
1025     Submission of a `SubJob` for the Local backend
1026     """
1027     # Make sure the output directory of the job is created
1028     job.output_dir.mkdir(parents=True, exist_ok=True)
1029     # Make sure the working directory of the job is created
1030     job.working_dir.mkdir(parents=True, exist_ok=True)
1031     job.copy_input_sandbox_files_to_working_dir()
1032     job.dump_input_data()
1033     # Get the path to the bash script we run
1034     script_path = self.get_submit_script_path(job)
1035     with open(script_path, mode="w") as batch_file:
1036         print("#!/bin/bash", file=batch_file)
1037         self._add_wrapper_script_setup(job, batch_file)
1038         self._add_setup(job, batch_file)
1039         print(job.full_command, file=batch_file)
1040         self._add_wrapper_script_teardown(job, batch_file)
1041     B2INFO(f"Submitting {job}")
1042     job.result = Local.LocalResult(job,
1043                                    self.pool.apply_async(self.run_job,
1044                                                          (job.name,
1045                                                           job.working_dir,
1046                                                           job.output_dir,
1047                                                           script_path)
1048                                                          )
1049                                    )
1050     job.status = "submitted"
1051     B2INFO(f"{job} submitted")
1052 

◆ _() [2/3]

_ ( self,
job )
protected
Submission of a `Job` for the Local backend

Definition at line 1054 of file backends.py.

1054 def _(self, job):
1055     """
1056     Submission of a `Job` for the Local backend
1057     """
1058     # Make sure the output directory of the job is created
1059     job.output_dir.mkdir(parents=True, exist_ok=True)
1060     # Make sure the working directory of the job is created
1061     job.working_dir.mkdir(parents=True, exist_ok=True)
1062     # Check if we have any valid input files
1063     job.check_input_data_files()
1064 
1065     if not job.splitter:
1066         # Get all of the requested files for the input sandbox and copy them to the working directory
1067         job.copy_input_sandbox_files_to_working_dir()
1068         job.dump_input_data()
1069         # Get the path to the bash script we run
1070         script_path = self.get_submit_script_path(job)
1071         with open(script_path, mode="w") as batch_file:
1072             print("#!/bin/bash", file=batch_file)
1073             self._add_wrapper_script_setup(job, batch_file)
1074             self._add_setup(job, batch_file)
1075             print(job.full_command, file=batch_file)
1076             self._add_wrapper_script_teardown(job, batch_file)
1077         B2INFO(f"Submitting {job}")
1078         job.result = Local.LocalResult(job,
1079                                        self.pool.apply_async(self.run_job,
1080                                                              (job.name,
1081                                                               job.working_dir,
1082                                                               job.output_dir,
1083                                                               script_path)
1084                                                              )
1085                                        )
1086         B2INFO(f"{job} submitted")
1087     else:
1088         # Create subjobs according to the splitter's logic
1089         job.splitter.create_subjobs(job)
1090         # Submit the subjobs
1091         self.submit(list(job.subjobs.values()))
1092         # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1093         self._create_parent_job_result(job)
1094 

◆ _() [3/3]

_ ( self,
jobs )
protected
Submit method of Local() that takes a list of jobs instead of just one and submits each one.

Definition at line 1096 of file backends.py.

1096 def _(self, jobs):
1097     """
1098     Submit method of Local() that takes a list of jobs instead of just one and submits each one.
1099     """
1100     # Submit the jobs
1101     for job in jobs:
1102         self.submit(job)
1103     B2INFO("All requested jobs submitted.")
1104 

◆ _add_setup()

_add_setup ( job,
batch_file )
staticprotectedinherited
Adds setup lines to the shell script file.

Definition at line 806 of file backends.py.

806 def _add_setup(job, batch_file):
807     """
808     Adds setup lines to the shell script file.
809     """
810     for line in job.setup_cmds:
811         print(line, file=batch_file)
812 
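For illustration, the setup lines come straight from the job's `setup_cmds` attribute, so per-job environment setup ends up in the generated script just before the actual command (the path below is a placeholder, and `setup_cmds` is assumed to be a plain list of shell lines):

# Hypothetical example: each entry is printed verbatim into submit.sh by _add_setup()
job.setup_cmds = ["source /path/to/an/environment_setup.sh"]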

◆ _add_wrapper_script_setup()

_add_wrapper_script_setup ( self,
job,
batch_file )
protectedinherited
Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
`trap` statements for Ctrl-C situations.

Definition at line 813 of file backends.py.

813 def _add_wrapper_script_setup(self, job, batch_file):
814     """
815     Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
816     `trap` statements for Ctrl-C situations.
817     """
818     start_wrapper = f"""# ---
819 # trap ctrl-c and call ctrl_c()
820 trap '(ctrl_c 130)' SIGINT
821 trap '(ctrl_c 143)' SIGTERM
822 
823 function write_exit_code() {{
824   echo "Writing $1 to exit status file"
825   echo "$1" > {self.exit_code_file}
826   exit $1
827 }}
828 
829 function ctrl_c() {{
830   trap '' SIGINT SIGTERM
831   echo "** Trapped Ctrl-C **"
832   echo "$1" > {self.exit_code_file}
833   exit $1
834 }}
835 # ---"""
836     print(start_wrapper, file=batch_file)
837 

◆ _add_wrapper_script_teardown()

_add_wrapper_script_teardown ( self,
job,
batch_file )
protectedinherited
Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can write
the exit code of the job command out to a file. This means we can know whether the command was successful
even if the backend server/monitoring database purges the data about our job, e.g. if PBS removes job
information too quickly we may never know whether a job succeeded or failed without some kind of exit
file.

Definition at line 838 of file backends.py.

838 def _add_wrapper_script_teardown(self, job, batch_file):
839     """
840     Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
841     an exit code of the job cmd being written out to a file. Which means that we can know if the command was
842     successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
843     removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
844     file.
845     """
846     end_wrapper = """# ---
847 write_exit_code $?"""
848     print(end_wrapper, file=batch_file)
849 
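As a sketch of how the exit status file could be inspected afterwards (this helper is not part of the backend; the file name is the `exit_code_file` default documented below):

from pathlib import Path

def read_backend_exit_code(working_dir):
    """Hypothetical helper: return the exit status written by write_exit_code, or None if the file is missing."""
    status_path = Path(working_dir, "__BACKEND_CMD_EXIT_STATUS__")  # Backend.exit_code_file default
    if not status_path.is_file():
        return None
    return int(status_path.read_text().strip())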

◆ _create_parent_job_result()

_create_parent_job_result ( cls,
parent )
protected
We want to be able to call `ready()` on the top level `Job.result`, so this method needs to exist
so that a Job.result object is actually created. It is mostly empty: it simply updates subjob
statuses and allows the use of ready().

Reimplemented from Backend.

Definition at line 1132 of file backends.py.

1132 def _create_parent_job_result(cls, parent):
1133     parent.result = cls.LocalResult(parent, None)
1134 
1135 

◆ get_submit_script_path()

get_submit_script_path ( self,
job )
inherited
Construct the Path object of the bash script file that we will submit. It will contain
the actual job command, wrapper commands, setup commands, and any batch directives.

Definition at line 859 of file backends.py.

859 def get_submit_script_path(self, job):
860     """
861     Construct the Path object of the bash script file that we will submit. It will contain
862     the actual job command, wrapper commands, setup commands, and any batch directives
863     """
864     return Path(job.working_dir, self.submit_script)
865 
866 

◆ join()

join ( self)
Closes and joins the Pool, letting you wait for all results currently
still processing.

Definition at line 984 of file backends.py.

984 def join(self):
985     """
986     Closes and joins the Pool, letting you wait for all results currently
987     still processing.
988     """
989     B2INFO("Joining Process Pool, waiting for results to finish...")
990     self.pool.close()
991     self.pool.join()
992     B2INFO("Process Pool joined.")
993 
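A short sketch of the lifecycle described in the class documentation (import path assumed as above):

from caf.backends import Local  # assumed import path

backend = Local(max_processes=2)
# ... submit jobs here ...
backend.join()                    # closes the pool and blocks until running subprocesses finish
backend = Local(max_processes=2)  # after join(), a new instance is needed to get a fresh pool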

◆ max_processes() [1/2]

max_processes ( self)
Getter for max_processes

Definition at line 995 of file backends.py.

995 def max_processes(self):
996     """
997     Getter for max_processes
998     """
999     return self._max_processes
1000 

◆ max_processes() [2/2]

max_processes ( self,
value )
Setter for max_processes. We also check for a previous Pool(); if one exists, we wait for it to join
and then create a new one with the new value of max_processes.

Definition at line 1002 of file backends.py.

1002 def max_processes(self, value):
1003     """
1004     Setter for max_processes, we also check for a previous Pool(), wait for it to join
1005     and then create a new one with the new value of max_processes
1006     """
1007 
1008     self._max_processes = value
1009     if self.pool:
1010         B2INFO("New max_processes requested. But a pool already exists.")
1011         self.join()
1012     B2INFO(f"Starting up new Pool with {self.max_processes} processes")
1013     self.pool = mp.Pool(processes=self.max_processes)
1014 
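A brief sketch of the setter's behaviour (import path assumed as above):

from caf.backends import Local  # assumed import path

backend = Local()          # max_processes defaults to 1, so a Pool(processes=1) is created
backend.max_processes = 4  # an existing pool is joined first, then a fresh Pool(processes=4) is started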

◆ run_job()

run_job ( name,
working_dir,
output_dir,
script )
static
The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
shell command in a subprocess and captures the stdout and stderr of the subprocess to files.

Definition at line 1106 of file backends.py.

1106 def run_job(name, working_dir, output_dir, script):
1107     """
1108     The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
1109     shell command in a subprocess and captures the stdout and stderr of the subprocess to files.
1110     """
1111     B2INFO(f"Starting Sub-process: {name}")
1112     from subprocess import Popen
1113     stdout_file_path = Path(working_dir, _STDOUT_FILE)
1114     stderr_file_path = Path(working_dir, _STDERR_FILE)
1115     # Create unix command to redirect stdout and stderr
1116     B2INFO(f"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
1117     with open(stdout_file_path, mode="w", buffering=1) as f_out, \
1118             open(stderr_file_path, mode="w", buffering=1) as f_err:
1119         with Popen(["/bin/bash", script.as_posix()],
1120                    stdout=f_out,
1121                    stderr=f_err,
1122                    bufsize=1,
1123                    universal_newlines=True,
1124                    cwd=working_dir,
1125                    env={}) as p:
1126             # We block here and wait so that the return code will be set.
1127             p.wait()
1128     B2INFO(f"Subprocess {name} finished.")
1129     return p.returncode
1130 
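Normally run_job is only reached through pool.apply_async in the submission methods above, but as a sketch it can also be called directly once a submission script exists (the paths below are placeholders and must already exist):

from pathlib import Path
from caf.backends import Local  # assumed import path

work = Path("example_job/working")   # must exist and contain a submit.sh
out = Path("example_job/output")
return_code = Local.run_job("example_job", work, out, work / "submit.sh")
print(f"Subprocess finished with return code {return_code}")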

◆ submit()

submit ( self,
job )
 

Reimplemented from Backend.

Definition at line 1016 of file backends.py.

1016 def submit(self, job):
1017     """
1018     """
1019     raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
1020                               "Did you submit a (Sub)Job?")
1021 

Member Data Documentation

◆ _max_processes

_max_processes = value
protected

Internal attribute of max_processes.

Definition at line 1008 of file backends.py.

◆ backend_args

dict backend_args = {**self.default_backend_args, **backend_args}
inherited

The backend args that will be applied to jobs unless the job specifies them itself.

Definition at line 796 of file backends.py.

◆ default_backend_args

dict default_backend_args = {}
staticinherited

Default backend_args.

Definition at line 788 of file backends.py.

◆ exit_code_file

str exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
staticinherited

Default exit code file name.

Definition at line 786 of file backends.py.

◆ max_processes

max_processes = max_processes

The size of the multiprocessing process pool.

Definition at line 946 of file backends.py.

◆ pool

pool = None

The actual Pool object of this instance of the Backend.

Definition at line 944 of file backends.py.

◆ submit_script

submit_script = "submit.sh"
staticinherited

Default submission script name.

Definition at line 784 of file backends.py.


The documentation for this class was generated from the following file:
backends.py