9 from basf2
import B2INFO
13 from caf.backends
import Batch, HTCondor, LSF, Local, PBS
16 def command_local(args, backend_args=None):
18 Runs the jobs using the Local backend i.e. local multiprocessing.
21 args : Command line arguments that may contain backend args options. These take priority.
22 backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
23 will take priority over these values.
25 B2INFO(
"Requested use of Local backend")
26 backend = Local(max_processes=args.max_processes, backend_args=backend_args)
30 def command_lsf(args, backend_args=None):
32 Runs the jobs using the LSF backend
35 args : Command line arguments that may contain backend args options. These take priority.
36 backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
37 will take priority over these values.
39 B2INFO(
"Requested use of LSF backend")
40 command_line_backend_args = {
"queue": args.queue}
42 command_line_backend_args = {key: value
for key, value
in command_line_backend_args.items()
if value
is not None}
43 if backend_args
is None:
45 backend_args = {**backend_args, **command_line_backend_args}
46 backend = LSF(backend_args=backend_args)
47 backend.global_job_limit = args.global_job_limit
48 backend.sleep_between_submission_checks = args.submission_check_heartbeat
52 def command_pbs(args, backend_args=None):
54 Runs the jobs using the PBS backend
57 args : Command line arguments that may contain backend args options. These take priority.
58 backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
59 will take priority over these values.
61 B2INFO(
"Requested use of PBS backend")
62 command_line_backend_args = {
"queue": args.queue}
63 command_line_backend_args = {key: value
for key, value
in command_line_backend_args.items()
if value
is not None}
64 if backend_args
is None:
66 backend_args = {**backend_args, **command_line_backend_args}
67 backend = PBS(backend_args=backend_args)
68 backend.global_job_limit = args.global_job_limit
69 backend.sleep_between_submission_checks = args.submission_check_heartbeat
73 def command_condor(args, backend_args=None):
75 Runs the jobs using the HTCondor backend
78 args : Command line arguments that may contain backend args options. These take priority.
79 backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
80 will take priority over these values.
82 B2INFO(
"Requested use of HTCondor backend")
83 command_line_backend_args = {
84 "universe": args.universe,
85 "getenv": args.getenv,
86 "path_prefix": args.path_prefix
88 command_line_backend_args = {key: value
for key, value
in command_line_backend_args.items()
if value
is not None}
89 if backend_args
is None:
91 backend_args = {**backend_args, **command_line_backend_args}
92 backend = HTCondor(backend_args=backend_args)
93 backend.global_job_limit = args.global_job_limit
94 backend.sleep_between_submission_checks = args.submission_check_heartbeat
98 def add_basf2_options(parser, default_log_level="INFO"):
99 choices = list(basf2.LogLevel.names.keys())
100 choices.remove(
"default")
101 parser.add_argument(
"--log-level", dest=
"log_level", choices=choices,
102 metavar=
"", default=default_log_level,
103 help=(f
"Set the basf2 LogLevel. (default: {default_log_level}"))
104 parser.add_argument(
"--debug-level", dest=
"debug_level",
105 type=int, metavar=
"",
106 help=
"Set the DEBUG level value, overrides log-level to be DEBUG.")
109 def add_monitor_options(parser, default_heartbeat=10):
111 Adds parser options for the monitoring of CAF jobs.
113 parser.add_argument(
"--heartbeat", dest=
"heartbeat",
114 type=int, metavar=
"", default=default_heartbeat,
115 help=(
"Sets the sleep interval (seconds) between attempts to check the readiness of jobs."
116 f
" (default: {default_heartbeat})")
120 def add_job_options(parser):
122 Adds some overall options to a parser, useful for CAF type jobs that can be used with any backend.
124 group = parser.add_mutually_exclusive_group()
125 group.add_argument(
"--max-files-per-subjob", dest=
"files_per_subjob",
126 type=int, metavar=
"",
127 help=
"Sets the number of input files that wil be used per subjob.")
128 group.add_argument(
"--max-subjobs", dest=
"max_subjobs",
129 type=int, metavar=
"",
130 help=(
"Sets the maximum number of subjobs that will be submitted. "
131 "Input files will be split as evenly as possible between the subjobs."))
134 def add_backends_subparsers(parser, default_max_processes=4,
135 default_global_job_limit=Batch.default_global_job_limit,
136 default_submission_check_heartbeat=Batch.default_sleep_between_submission_checks,
137 local_func=command_local, lsf_func=command_lsf,
138 pbs_func=command_pbs, condor_func=command_condor):
140 Adds a subparser for each CAF backend type that is supported.
141 Provides arguments specific to each Backend type.
144 subparsers = parser.add_subparsers(help=
"Choose the CAF backend to use when submitting jobs.")
145 local_parser = subparsers.add_parser(
"Local",
146 help=
"Use the local backend.",
147 description=
"Runs the jobs using the Local backend i.e. local multiprocessing.",
148 formatter_class=argparse.RawDescriptionHelpFormatter)
149 local_parser.set_defaults(func=local_func)
150 local_parser.add_argument(
"--max-processes", dest=
"max_processes",
151 type=int, metavar=
"", default=default_max_processes,
152 help=(
"Set the multiprocessing Pool size (max concurrent processes)."
153 f
" (default: {default_max_processes})"))
156 lsf_parser = subparsers.add_parser(
"LSF",
157 help=
"Use the LSF backend.",
158 description=
"Runs the jobs using the LSF backend.",
159 formatter_class=argparse.RawDescriptionHelpFormatter)
160 lsf_parser.set_defaults(func=lsf_func)
162 lsf_parser.add_argument(
"--queue", dest=
"queue", metavar=
"",
163 help=(
"The batch queue to use."
166 lsf_parser.add_argument(
"--global-job-limit", dest=
"global_job_limit", metavar=
"", default=default_global_job_limit, type=int,
167 help=(
"The number of batch jobs that can be active for the user before the backend class will stop "
168 "submitting. This is not a completely hard limit, the actual max reached depends on other "
169 "submitting processes and the number submitted before re-checking."
170 f
" (default: {default_global_job_limit})"))
172 lsf_parser.add_argument(
"--submission-check-heartbeat", dest=
"submission_check_heartbeat", metavar=
"", type=int,
173 default=default_submission_check_heartbeat,
174 help=(
"The time (seconds) between checking if there are fewer batch jobs than the global limit. "
175 "Generally not needed to change, but it certainly shouldn't be set lower than 30 seconds."
176 f
" (default: {default_submission_check_heartbeat})"))
179 pbs_parser = subparsers.add_parser(
"PBS",
180 help=
"Use the PBS backend.",
181 description=
"Runs the jobs using the PBS backend.",
182 formatter_class=argparse.RawDescriptionHelpFormatter)
183 pbs_parser.set_defaults(func=pbs_func)
185 pbs_parser.add_argument(
"--queue", dest=
"queue", metavar=
"",
186 help=(
"The batch queue to use."
189 pbs_parser.add_argument(
"--global-job-limit", dest=
"global_job_limit", metavar=
"", default=default_global_job_limit, type=int,
190 help=(
"The number of batch jobs that can be active for the user before the backend class will stop "
191 "submitting. This is not a completely hard limit, the actual max reached depends on other "
192 "submitting processes and the number submitted before re-checking."
193 f
" (default: {default_global_job_limit})"))
195 pbs_parser.add_argument(
"--submission-check-heartbeat", dest=
"submission_check_heartbeat", metavar=
"", type=int,
196 default=default_submission_check_heartbeat,
197 help=(
"The time (seconds) between checking if there are fewer batch jobs than the global limit. "
198 "Generally not needed to change, but it certainly shouldn't be set lower than 30 seconds."
199 f
" (default: {default_submission_check_heartbeat})"))
202 condor_parser = subparsers.add_parser(
"HTCondor",
203 help=
"Use the HTCondor backend.",
204 description=
"Runs the jobs using the HTCondor backend.",
205 formatter_class=argparse.RawDescriptionHelpFormatter)
206 condor_parser.set_defaults(func=condor_func)
208 condor_parser.add_argument(
"--getenv", dest=
"getenv", metavar=
"",
209 help=(
"Should jobs inherit the submitting environment (doesn't always work as expected)."
212 condor_parser.add_argument(
"--universe", dest=
"universe", metavar=
"",
213 help=(
"Jobs should be submitted using this univese."
216 condor_parser.add_argument(
"--path-prefix", dest=
"path_prefix", metavar=
"", default=
"",
217 help=(
"The string that should be pre-appended to file path given to backend"
218 " e.g. root://dcbldoor.sdcc.bnl.gov:1096"))
220 condor_parser.add_argument(
221 "--global-job-limit",
222 dest=
"global_job_limit",
224 default=default_global_job_limit,
227 "The number of batch jobs that can be active for the user before the backend class will stop "
228 "submitting. This is not a completely hard limit, the actual max reached depends on other "
229 "submitting processes and the number submitted before re-checking."
230 f
" (default: {default_global_job_limit})"))
232 condor_parser.add_argument(
"--submission-check-heartbeat", dest=
"submission_check_heartbeat", metavar=
"", type=int,
233 default=default_submission_check_heartbeat,
234 help=(
"The time (seconds) between checking if there are fewer batch jobs than the global limit. "
235 "Generally not needed to change, but it certainly shouldn't be set lower than 30 seconds."
236 f
" (default: {default_submission_check_heartbeat})"))
238 return [local_parser, lsf_parser, pbs_parser, condor_parser]