import argparse

import basf2
from basf2 import B2INFO

from caf.backends import Batch, HTCondor, LSF, Local, PBS
def command_local(args, backend_args=None):
    """
    Runs the jobs using the Local backend i.e. local multiprocessing.

    Parameters:
        args: Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.

    Returns:
        Local: The configured Local backend instance.
    """
    B2INFO("Requested use of Local backend")
    backend = Local(max_processes=args.max_processes, backend_args=backend_args)
    # Return the backend so the caller (dispatched via parser set_defaults(func=...))
    # can actually use it; without this the constructed object was discarded.
    return backend
def command_lsf(args, backend_args=None):
    """
    Runs the jobs using the LSF backend

    Parameters:
        args: Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.

    Returns:
        LSF: The configured LSF backend instance.
    """
    B2INFO("Requested use of LSF backend")
    command_line_backend_args = {"queue": args.queue}
    # Drop options the user didn't set (None) so they don't clobber backend_args values.
    command_line_backend_args = {key: value for key, value in command_line_backend_args.items() if value is not None}
    if backend_args is None:
        # Initialize to an empty dict — unpacking None below would raise TypeError.
        backend_args = {}
    # Command line options take priority over the backend_args parameter.
    backend_args = {**backend_args, **command_line_backend_args}
    backend = LSF(backend_args=backend_args)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend
def command_pbs(args, backend_args=None):
    """
    Runs the jobs using the PBS backend

    Parameters:
        args: Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.

    Returns:
        PBS: The configured PBS backend instance.
    """
    B2INFO("Requested use of PBS backend")
    command_line_backend_args = {"queue": args.queue}
    # Drop options the user didn't set (None) so they don't clobber backend_args values.
    command_line_backend_args = {key: value for key, value in command_line_backend_args.items() if value is not None}
    if backend_args is None:
        # Initialize to an empty dict — unpacking None below would raise TypeError.
        backend_args = {}
    # Command line options take priority over the backend_args parameter.
    backend_args = {**backend_args, **command_line_backend_args}
    backend = PBS(backend_args=backend_args)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend
def command_condor(args, backend_args=None):
    """
    Runs the jobs using the HTCondor backend

    Parameters:
        args: Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.

    Returns:
        HTCondor: The configured HTCondor backend instance.
    """
    B2INFO("Requested use of HTCondor backend")
    command_line_backend_args = {
        "universe": args.universe,
        "getenv": args.getenv,
        "path_prefix": args.path_prefix,
    }
    # Drop options the user didn't set (None) so they don't clobber backend_args values.
    command_line_backend_args = {key: value for key, value in command_line_backend_args.items() if value is not None}
    if backend_args is None:
        # Initialize to an empty dict — unpacking None below would raise TypeError.
        backend_args = {}
    # Command line options take priority over the backend_args parameter.
    backend_args = {**backend_args, **command_line_backend_args}
    backend = HTCondor(backend_args=backend_args)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend
def add_basf2_options(parser, default_log_level="INFO"):
    """
    Adds basf2 logging options (log level and debug level) to an argparse parser.

    Parameters:
        parser: The argparse.ArgumentParser (or group) to add the arguments to.
        default_log_level (str): The default basf2 LogLevel name to use.
    """
    choices = list(basf2.LogLevel.names.keys())
    # "default" is a sentinel LogLevel, not a user-selectable value.
    choices.remove("default")
    parser.add_argument("--log-level", dest="log_level", choices=choices,
                        metavar="", default=default_log_level,
                        # Fixed: the original help text was missing the closing parenthesis.
                        help=f"Set the basf2 LogLevel. (default: {default_log_level})")
    parser.add_argument("--debug-level", dest="debug_level",
                        # Debug levels are numeric; convert at parse time.
                        type=int, metavar="",
                        help="Set the DEBUG level value, overrides log-level to be DEBUG.")
def add_monitor_options(parser, default_heartbeat=10):
    """
    Adds parser options for the monitoring of CAF jobs.

    Parameters:
        parser: The argparse.ArgumentParser (or group) to add the argument to.
        default_heartbeat (int): Default sleep interval (seconds) between readiness checks.
    """
    heartbeat_help = ("Sets the sleep interval (seconds) between attempts to check the readiness of jobs."
                      f" (default: {default_heartbeat})")
    parser.add_argument(
        "--heartbeat",
        dest="heartbeat",
        type=int,
        metavar="",
        default=default_heartbeat,
        help=heartbeat_help,
    )
def add_job_options(parser):
    """
    Adds some overall options to a parser, useful for CAF type jobs that can be used with any backend.

    The two job-splitting options are mutually exclusive: a job can be split either
    by a maximum number of input files per subjob, or by a maximum number of subjobs.

    Parameters:
        parser: The argparse.ArgumentParser to add the arguments to.
    """
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--max-files-per-subjob", dest="files_per_subjob",
                       type=int, metavar="",
                       # Fixed help typo: "wil" -> "will".
                       help="Sets the number of input files that will be used per subjob.")
    group.add_argument("--max-subjobs", dest="max_subjobs",
                       type=int, metavar="",
                       help=("Sets the maximum number of subjobs that will be submitted. "
                             "Input files will be split as evenly as possible between the subjobs."))
def add_backends_subparsers(parser, default_max_processes=4,
                            default_global_job_limit=Batch.default_global_job_limit,
                            default_submission_check_heartbeat=Batch.default_sleep_between_submission_checks,
                            local_func=command_local, lsf_func=command_lsf,
                            pbs_func=command_pbs, condor_func=command_condor):
    """
    Adds a subparser for each CAF backend type that is supported.
    Provides arguments specific to each Backend type.

    Parameters:
        parser: The argparse.ArgumentParser to attach the backend subparsers to.
        default_max_processes (int): Default multiprocessing Pool size for the Local backend.
        default_global_job_limit (int): Default cap on concurrently active batch jobs.
        default_submission_check_heartbeat (int): Default seconds between submission-limit checks.
        local_func, lsf_func, pbs_func, condor_func: Callables bound to each subparser via
            ``set_defaults(func=...)``; they receive the parsed args and build the backend.

    Returns:
        list: The created subparsers, in order [local, lsf, pbs, condor].
    """
    subparsers = parser.add_subparsers(help="Choose the CAF backend to use when submitting jobs.")

    # Local backend: plain multiprocessing, only needs a pool size.
    local_parser = subparsers.add_parser("Local",
                                         help="Use the local backend.",
                                         description="Runs the jobs using the Local backend i.e. local multiprocessing.",
                                         formatter_class=argparse.RawDescriptionHelpFormatter)
    local_parser.set_defaults(func=local_func)
    local_parser.add_argument("--max-processes", dest="max_processes",
                              type=int, metavar="", default=default_max_processes,
                              help=("Set the multiprocessing Pool size (max concurrent processes)."
                                    f" (default: {default_max_processes})"))

    # LSF batch backend.
    lsf_parser = subparsers.add_parser("LSF",
                                       help="Use the LSF backend.",
                                       description="Runs the jobs using the LSF backend.",
                                       formatter_class=argparse.RawDescriptionHelpFormatter)
    lsf_parser.set_defaults(func=lsf_func)
    # NOTE(review): original help text appears truncated in source — confirm full wording.
    lsf_parser.add_argument("--queue", dest="queue", metavar="",
                            help="The batch queue to use.")
    lsf_parser.add_argument("--global-job-limit", dest="global_job_limit", metavar="",
                            default=default_global_job_limit, type=int,
                            help=("The number of batch jobs that can be active for the user before the backend class will stop "
                                  "submitting. This is not a completely hard limit, the actual max reached depends on other "
                                  "submitting processes and the number submitted before re-checking."
                                  f" (default: {default_global_job_limit})"))
    lsf_parser.add_argument("--submission-check-heartbeat", dest="submission_check_heartbeat", metavar="", type=int,
                            default=default_submission_check_heartbeat,
                            help=("The time (seconds) between checking if there are fewer batch jobs than the global limit. "
                                  "Generally not needed to change, but it certainly shouldn't be set lower than 30 seconds."
                                  f" (default: {default_submission_check_heartbeat})"))

    # PBS batch backend.
    pbs_parser = subparsers.add_parser("PBS",
                                       help="Use the PBS backend.",
                                       description="Runs the jobs using the PBS backend.",
                                       formatter_class=argparse.RawDescriptionHelpFormatter)
    pbs_parser.set_defaults(func=pbs_func)
    # NOTE(review): original help text appears truncated in source — confirm full wording.
    pbs_parser.add_argument("--queue", dest="queue", metavar="",
                            help="The batch queue to use.")
    pbs_parser.add_argument("--global-job-limit", dest="global_job_limit", metavar="",
                            default=default_global_job_limit, type=int,
                            help=("The number of batch jobs that can be active for the user before the backend class will stop "
                                  "submitting. This is not a completely hard limit, the actual max reached depends on other "
                                  "submitting processes and the number submitted before re-checking."
                                  f" (default: {default_global_job_limit})"))
    pbs_parser.add_argument("--submission-check-heartbeat", dest="submission_check_heartbeat", metavar="", type=int,
                            default=default_submission_check_heartbeat,
                            help=("The time (seconds) between checking if there are fewer batch jobs than the global limit. "
                                  "Generally not needed to change, but it certainly shouldn't be set lower than 30 seconds."
                                  f" (default: {default_submission_check_heartbeat})"))

    # HTCondor batch backend.
    condor_parser = subparsers.add_parser("HTCondor",
                                          help="Use the HTCondor backend.",
                                          description="Runs the jobs using the HTCondor backend.",
                                          formatter_class=argparse.RawDescriptionHelpFormatter)
    condor_parser.set_defaults(func=condor_func)
    condor_parser.add_argument("--getenv", dest="getenv", metavar="",
                               help="Should jobs inherit the submitting environment (doesn't always work as expected).")
    # Fixed help typo: "univese" -> "universe".
    condor_parser.add_argument("--universe", dest="universe", metavar="",
                               help="Jobs should be submitted using this universe.")
    condor_parser.add_argument("--path-prefix", dest="path_prefix", metavar="", default="",
                               help=("The string that should be pre-appended to file path given to backend"
                                     " e.g. root://dcbldoor.sdcc.bnl.gov:1096"))
    # Added type=int and metavar="" for consistency with the LSF/PBS versions of this option.
    condor_parser.add_argument("--global-job-limit", dest="global_job_limit", metavar="", type=int,
                               default=default_global_job_limit,
                               help=("The number of batch jobs that can be active for the user before the backend class will stop "
                                     "submitting. This is not a completely hard limit, the actual max reached depends on other "
                                     "submitting processes and the number submitted before re-checking."
                                     f" (default: {default_global_job_limit})"))
    condor_parser.add_argument("--submission-check-heartbeat", dest="submission_check_heartbeat", metavar="", type=int,
                               default=default_submission_check_heartbeat,
                               help=("The time (seconds) between checking if there are fewer batch jobs than the global limit. "
                                     "Generally not needed to change, but it certainly shouldn't be set lower than 30 seconds."
                                     f" (default: {default_submission_check_heartbeat})"))

    return [local_parser, lsf_parser, pbs_parser, condor_parser]