import argparse

import basf2
from basf2 import B2INFO

from caf.backends import Batch, HTCondor, LSF, Local, PBS
def command_local(args, backend_args=None):
    """
    Runs the jobs using the Local backend i.e. local multiprocessing.

    Parameters:
        args : Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.

    Returns:
        Local: The configured Local backend instance.
    """
    B2INFO("Requested use of Local backend")
    backend = Local(max_processes=args.max_processes, backend_args=backend_args)
    # The caller submits jobs through the returned backend; without this the
    # constructed backend was silently discarded.
    return backend
def command_lsf(args, backend_args=None):
    """
    Runs the jobs using the LSF backend

    Parameters:
        args : Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.

    Returns:
        LSF: The configured LSF backend instance.
    """
    B2INFO("Requested use of LSF backend")
    command_line_backend_args = {"queue": args.queue}
    # Drop options the user did not actually supply on the command line,
    # so they don't overwrite values from backend_args with None.
    command_line_backend_args = {key: value for key, value in command_line_backend_args.items() if value is not None}
    if backend_args is None:
        backend_args = {}
    # Command line options take priority over the passed-in backend_args.
    backend_args = {**backend_args, **command_line_backend_args}
    backend = LSF(backend_args=backend_args)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend
def command_pbs(args, backend_args=None):
    """
    Runs the jobs using the PBS backend

    Parameters:
        args : Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.

    Returns:
        PBS: The configured PBS backend instance.
    """
    B2INFO("Requested use of PBS backend")
    command_line_backend_args = {"queue": args.queue}
    # Drop options the user did not actually supply on the command line,
    # so they don't overwrite values from backend_args with None.
    command_line_backend_args = {key: value for key, value in command_line_backend_args.items() if value is not None}
    if backend_args is None:
        backend_args = {}
    # Command line options take priority over the passed-in backend_args.
    backend_args = {**backend_args, **command_line_backend_args}
    backend = PBS(backend_args=backend_args)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend
def command_condor(args, backend_args=None):
    """
    Runs the jobs using the HTCondor backend

    Parameters:
        args : Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.

    Returns:
        HTCondor: The configured HTCondor backend instance.
    """
    B2INFO("Requested use of HTCondor backend")
    command_line_backend_args = {
        "universe": args.universe,
        "getenv": args.getenv,
        "path_prefix": args.path_prefix
    }
    # Drop options the user did not actually supply on the command line,
    # so they don't overwrite values from backend_args with None.
    command_line_backend_args = {key: value for key, value in command_line_backend_args.items() if value is not None}
    if backend_args is None:
        backend_args = {}
    # Command line options take priority over the passed-in backend_args.
    backend_args = {**backend_args, **command_line_backend_args}
    backend = HTCondor(backend_args=backend_args)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend
def add_basf2_options(parser, default_log_level="INFO"):
    """
    Adds the basf2 logging options (``--log-level`` and ``--debug-level``) to a parser.

    Parameters:
        parser (argparse.ArgumentParser): The parser the options are added to.
        default_log_level (str): The log level choice used when --log-level is not given.
    """
    choices = list(basf2.LogLevel.names.keys())
    # "default" is an internal placeholder level, not a valid user choice.
    choices.remove("default")
    parser.add_argument("--log-level", dest="log_level", choices=choices,
                        metavar="", default=default_log_level,
                        # Closing parenthesis was missing from the displayed message.
                        help=(f"Set the basf2 LogLevel. (default: {default_log_level})"))
    parser.add_argument("--debug-level", dest="debug_level",
                        type=int, metavar="",
                        help="Set the DEBUG level value, overrides log-level to be DEBUG.")
def add_monitor_options(parser, default_heartbeat=10):
    """
    Adds parser options for the monitoring of CAF jobs.

    Parameters:
        parser (argparse.ArgumentParser): The parser the option is added to.
        default_heartbeat (int): Sleep interval (seconds) used when --heartbeat is not given.
    """
    parser.add_argument("--heartbeat", dest="heartbeat",
                        type=int, metavar="", default=default_heartbeat,
                        help=("Sets the sleep interval (seconds) between attempts to check the readiness of jobs."
                              f" (default: {default_heartbeat})"))
def add_job_options(parser):
    """
    Adds some overall options to a parser, useful for CAF type jobs that can be used
    with any backend.

    Parameters:
        parser (argparse.ArgumentParser): The parser the options are added to.
    """
    # Mutually exclusive: the user chooses either a files-per-subjob splitting
    # or a maximum number of subjobs, never both.
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--max-files-per-subjob", dest="files_per_subjob",
                       type=int, metavar="",
                       help="Sets the number of input files that will be used per subjob.")
    group.add_argument("--max-subjobs", dest="max_subjobs",
                       type=int, metavar="",
                       help=("Sets the maximum number of subjobs that will be submitted. "
                             "Input files will be split as evenly as possible between the subjobs."))
def add_backends_subparsers(parser, default_max_processes=4,
                            default_global_job_limit=Batch.default_global_job_limit,
                            default_submission_check_heartbeat=Batch.default_sleep_between_submission_checks,
                            local_func=command_local, lsf_func=command_lsf,
                            pbs_func=command_pbs, condor_func=command_condor):
    """
    Adds a subparser for each CAF backend type that is supported.
    Provides arguments specific to each Backend type.

    Parameters:
        parser (argparse.ArgumentParser): The parser the backend subparsers are attached to.
        default_max_processes (int): Default Pool size for the Local backend.
        default_global_job_limit (int): Default job limit for the batch backends.
        default_submission_check_heartbeat (int): Default sleep (seconds) between submission checks.
        local_func, lsf_func, pbs_func, condor_func: Functions set as the ``func`` default of the
            corresponding subparser, called after argument parsing to build the backend.

    Returns:
        list: The created subparsers in the order [Local, LSF, PBS, HTCondor].
    """
    def add_batch_throttle_arguments(batch_parser):
        # Shared throttling options for all batch (non-Local) backends; the help
        # text and defaults are identical for LSF, PBS and HTCondor.
        batch_parser.add_argument("--global-job-limit", dest="global_job_limit", metavar="",
                                  default=default_global_job_limit, type=int,
                                  help=("The number of batch jobs that can be active for the user before the backend class will stop "
                                        "submitting. This is not a completely hard limit, the actual max reached depends on other "
                                        "submitting processes and the number submitted before re-checking."
                                        f" (default: {default_global_job_limit})"))
        batch_parser.add_argument("--submission-check-heartbeat", dest="submission_check_heartbeat", metavar="",
                                  type=int, default=default_submission_check_heartbeat,
                                  help=("The time (seconds) between checking if there are fewer batch jobs than the global limit. "
                                        "Generally not needed to change, but it certainly shouldn't be set lower than 30 seconds."
                                        f" (default: {default_submission_check_heartbeat})"))

    subparsers = parser.add_subparsers(help="Choose the CAF backend to use when submitting jobs.")

    local_parser = subparsers.add_parser("Local",
                                         help="Use the local backend.",
                                         description="Runs the jobs using the Local backend i.e. local multiprocessing.",
                                         formatter_class=argparse.RawDescriptionHelpFormatter)
    local_parser.set_defaults(func=local_func)
    local_parser.add_argument("--max-processes", dest="max_processes",
                              type=int, metavar="", default=default_max_processes,
                              help=("Set the multiprocessing Pool size (max concurrent processes)."
                                    f" (default: {default_max_processes})"))

    lsf_parser = subparsers.add_parser("LSF",
                                       help="Use the LSF backend.",
                                       description="Runs the jobs using the LSF backend.",
                                       formatter_class=argparse.RawDescriptionHelpFormatter)
    lsf_parser.set_defaults(func=lsf_func)
    lsf_parser.add_argument("--queue", dest="queue", metavar="",
                            help="The batch queue to use.")
    add_batch_throttle_arguments(lsf_parser)

    pbs_parser = subparsers.add_parser("PBS",
                                       help="Use the PBS backend.",
                                       description="Runs the jobs using the PBS backend.",
                                       formatter_class=argparse.RawDescriptionHelpFormatter)
    pbs_parser.set_defaults(func=pbs_func)
    pbs_parser.add_argument("--queue", dest="queue", metavar="",
                            help="The batch queue to use.")
    add_batch_throttle_arguments(pbs_parser)

    condor_parser = subparsers.add_parser("HTCondor",
                                          help="Use the HTCondor backend.",
                                          description="Runs the jobs using the HTCondor backend.",
                                          formatter_class=argparse.RawDescriptionHelpFormatter)
    condor_parser.set_defaults(func=condor_func)
    condor_parser.add_argument("--getenv", dest="getenv", metavar="",
                               help="Should jobs inherit the submitting environment (doesn't always work as expected).")
    condor_parser.add_argument("--universe", dest="universe", metavar="",
                               # Fixed typo in the help text ("univese").
                               help="Jobs should be submitted using this universe.")
    condor_parser.add_argument("--path-prefix", dest="path_prefix", metavar="", default="",
                               help=("The string that should be pre-appended to file path given to backend"
                                     " e.g. root://dcbldoor.sdcc.bnl.gov:1096"))
    add_batch_throttle_arguments(condor_parser)

    return [local_parser, lsf_parser, pbs_parser, condor_parser]