Belle II Software development
cli.py
import basf2
from basf2 import B2INFO

import argparse

from caf.backends import Batch, HTCondor, LSF, Local, PBS


def command_local(args, backend_args=None):
    """
    Runs the jobs using the Local backend i.e. local multiprocessing.

    Parameters:
        args : Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.
    """
    B2INFO("Requested use of Local backend")
    backend = Local(max_processes=args.max_processes, backend_args=backend_args)
    return backend


def command_lsf(args, backend_args=None):
    """
    Runs the jobs using the LSF backend.

    Parameters:
        args : Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.
    """
    B2INFO("Requested use of LSF backend")
    command_line_backend_args = {"queue": args.queue}
    # Command-line values that are None should not overwrite anything in backend_args
    command_line_backend_args = {key: value for key, value in command_line_backend_args.items() if value is not None}
    if backend_args is None:
        backend_args = {}
    backend_args = {**backend_args, **command_line_backend_args}
    backend = LSF(backend_args=backend_args)  # Sets the backend defaults (can be overridden at the Job level)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend


def command_pbs(args, backend_args=None):
    """
    Runs the jobs using the PBS backend.

    Parameters:
        args : Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.
    """
    B2INFO("Requested use of PBS backend")
    command_line_backend_args = {"queue": args.queue}
    command_line_backend_args = {key: value for key, value in command_line_backend_args.items() if value is not None}
    if backend_args is None:
        backend_args = {}
    backend_args = {**backend_args, **command_line_backend_args}
    backend = PBS(backend_args=backend_args)  # Sets the backend defaults (can be overridden at the Job level)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend


def command_condor(args, backend_args=None):
    """
    Runs the jobs using the HTCondor backend.

    Parameters:
        args : Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.
    """
    B2INFO("Requested use of HTCondor backend")
    command_line_backend_args = {
        "universe": args.universe,
        "getenv": args.getenv,
        "path_prefix": args.path_prefix
    }
    command_line_backend_args = {key: value for key, value in command_line_backend_args.items() if value is not None}
    if backend_args is None:
        backend_args = {}
    backend_args = {**backend_args, **command_line_backend_args}
    backend = HTCondor(backend_args=backend_args)  # Sets the backend defaults (can be overridden at the Job level)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend


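# Illustrative sketch (not part of the original module): how the argument priority described in
# the docstrings above plays out. Command-line values override the backend_args dict passed in,
# while options left as None on the command line are filtered out and therefore overwrite nothing.
# The Namespace fields mirror the options added by add_backends_subparsers for HTCondor; the
# helper name below is hypothetical.
def _example_condor_priority():
    args = argparse.Namespace(universe="vanilla", getenv=None, path_prefix="",
                              global_job_limit=Batch.default_global_job_limit,
                              submission_check_heartbeat=Batch.default_sleep_between_submission_checks)
    # "universe" is taken from the command line; "getenv" survives from the dict because the
    # command line left it as None.
    return command_condor(args, backend_args={"universe": "grid", "getenv": "false"})

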
def add_basf2_options(parser, default_log_level="INFO"):
    """
    Adds the basf2 logging options (--log-level and --debug-level) to the parser.
    """
    choices = list(basf2.LogLevel.names.keys())
    choices.remove("default")
    parser.add_argument("--log-level", dest="log_level", choices=choices,
                        metavar="", default=default_log_level,
                        help=f"Set the basf2 LogLevel. (default: {default_log_level})")
    parser.add_argument("--debug-level", dest="debug_level",
                        type=int, metavar="",
                        help="Set the DEBUG level value, overrides log-level to be DEBUG.")


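# Illustrative sketch (not part of the original module): one way the options added above might be
# applied after parsing. basf2.set_log_level and basf2.set_debug_level are standard basf2 calls;
# the helper name is hypothetical.
def _example_apply_basf2_options(args):
    if args.debug_level is not None:
        # A debug level implies DEBUG logging, overriding --log-level as documented in the help text.
        basf2.set_log_level(basf2.LogLevel.DEBUG)
        basf2.set_debug_level(args.debug_level)
    else:
        basf2.set_log_level(basf2.LogLevel.names[args.log_level])

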
def add_monitor_options(parser, default_heartbeat=10):
    """
    Adds parser options for the monitoring of CAF jobs.
    """
    parser.add_argument("--heartbeat", dest="heartbeat",
                        type=int, metavar="", default=default_heartbeat,
                        help=("Sets the sleep interval (seconds) between attempts to check the readiness of jobs."
                              f" (default: {default_heartbeat})")
                        )


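# Illustrative sketch (not part of the original module): the heartbeat value is intended as the
# polling interval used while waiting for submitted jobs to finish. The job.ready() call is an
# assumption about the Job interface of the chosen backend; the helper name is hypothetical.
def _example_monitor_jobs(jobs, heartbeat):
    import time
    while not all(job.ready() for job in jobs):
        time.sleep(heartbeat)

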
def add_job_options(parser):
    """
    Adds some overall options to a parser, useful for CAF type jobs that can be used with any backend.
    """
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--max-files-per-subjob", dest="files_per_subjob",
                       type=int, metavar="",
                       help="Sets the number of input files that will be used per subjob.")
    group.add_argument("--max-subjobs", dest="max_subjobs",
                       type=int, metavar="",
                       help=("Sets the maximum number of subjobs that will be submitted. "
                             "Input files will be split as evenly as possible between the subjobs."))


def add_backends_subparsers(parser, default_max_processes=4,
                            default_global_job_limit=Batch.default_global_job_limit,
                            default_submission_check_heartbeat=Batch.default_sleep_between_submission_checks,
                            local_func=command_local, lsf_func=command_lsf,
                            pbs_func=command_pbs, condor_func=command_condor):
    """
    Adds a subparser for each CAF backend type that is supported.
    Provides arguments specific to each Backend type.
    """
    # Add each backend as a subparser with their own options
    subparsers = parser.add_subparsers(help="Choose the CAF backend to use when submitting jobs.")
    local_parser = subparsers.add_parser("Local",
                                         help="Use the local backend.",
                                         description="Runs the jobs using the Local backend i.e. local multiprocessing.",
                                         formatter_class=argparse.RawDescriptionHelpFormatter)
    local_parser.set_defaults(func=local_func)
    local_parser.add_argument("--max-processes", dest="max_processes",
                              type=int, metavar="", default=default_max_processes,
                              help=("Set the multiprocessing Pool size (max concurrent processes)."
                                    f" (default: {default_max_processes})"))

    lsf_parser = subparsers.add_parser("LSF",
                                       help="Use the LSF backend.",
                                       description="Runs the jobs using the LSF backend.",
                                       formatter_class=argparse.RawDescriptionHelpFormatter)
    lsf_parser.set_defaults(func=lsf_func)

    lsf_parser.add_argument("--queue", dest="queue", metavar="",
                            help=("The batch queue to use."
                                  " (e.g. s)"))

    lsf_parser.add_argument("--global-job-limit", dest="global_job_limit", metavar="", default=default_global_job_limit, type=int,
                            help=("The number of batch jobs that can be active for the user before the backend class will stop "
                                  "submitting. This is not a completely hard limit, the actual max reached depends on other "
                                  "submitting processes and the number submitted before re-checking."
                                  f" (default: {default_global_job_limit})"))

    lsf_parser.add_argument("--submission-check-heartbeat", dest="submission_check_heartbeat", metavar="", type=int,
                            default=default_submission_check_heartbeat,
                            help=("The time (seconds) between checking if there are fewer batch jobs than the global limit. "
                                  "Generally not needed to change, but it certainly shouldn't be set lower than 30 seconds."
                                  f" (default: {default_submission_check_heartbeat})"))

    pbs_parser = subparsers.add_parser("PBS",
                                       help="Use the PBS backend.",
                                       description="Runs the jobs using the PBS backend.",
                                       formatter_class=argparse.RawDescriptionHelpFormatter)
    pbs_parser.set_defaults(func=pbs_func)

    pbs_parser.add_argument("--queue", dest="queue", metavar="",
                            help=("The batch queue to use."
                                  " e.g. short"))

    pbs_parser.add_argument("--global-job-limit", dest="global_job_limit", metavar="", default=default_global_job_limit, type=int,
                            help=("The number of batch jobs that can be active for the user before the backend class will stop "
                                  "submitting. This is not a completely hard limit, the actual max reached depends on other "
                                  "submitting processes and the number submitted before re-checking."
                                  f" (default: {default_global_job_limit})"))

    pbs_parser.add_argument("--submission-check-heartbeat", dest="submission_check_heartbeat", metavar="", type=int,
                            default=default_submission_check_heartbeat,
                            help=("The time (seconds) between checking if there are fewer batch jobs than the global limit. "
                                  "Generally not needed to change, but it certainly shouldn't be set lower than 30 seconds."
                                  f" (default: {default_submission_check_heartbeat})"))

    condor_parser = subparsers.add_parser("HTCondor",
                                          help="Use the HTCondor backend.",
                                          description="Runs the jobs using the HTCondor backend.",
                                          formatter_class=argparse.RawDescriptionHelpFormatter)
    condor_parser.set_defaults(func=condor_func)

    condor_parser.add_argument("--getenv", dest="getenv", metavar="",
                               help=("Whether jobs should inherit the submitting environment (doesn't always work as expected)."
                                     " e.g. false"))

    condor_parser.add_argument("--universe", dest="universe", metavar="",
                               help=("The universe that jobs should be submitted to."
                                     " e.g. vanilla"))

    condor_parser.add_argument("--path-prefix", dest="path_prefix", metavar="", default="",
                               help=("The string that should be prepended to file paths given to the backend,"
                                     " e.g. root://dcbldoor.sdcc.bnl.gov:1096"))

    condor_parser.add_argument("--global-job-limit", dest="global_job_limit", metavar="", default=default_global_job_limit,
                               type=int,
                               help=("The number of batch jobs that can be active for the user before the backend class will stop "
                                     "submitting. This is not a completely hard limit, the actual max reached depends on other "
                                     "submitting processes and the number submitted before re-checking."
                                     f" (default: {default_global_job_limit})"))

    condor_parser.add_argument("--submission-check-heartbeat", dest="submission_check_heartbeat", metavar="", type=int,
                               default=default_submission_check_heartbeat,
                               help=("The time (seconds) between checking if there are fewer batch jobs than the global limit. "
                                     "Generally not needed to change, but it certainly shouldn't be set lower than 30 seconds."
                                     f" (default: {default_submission_check_heartbeat})"))

    return [local_parser, lsf_parser, pbs_parser, condor_parser]
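

# Illustrative sketch (not part of the original module): a minimal script wiring these helpers
# together. After parsing, args.func holds the command_* factory selected by the chosen backend
# subparser, so calling it returns a configured backend object. The parser description is an
# example value, not taken from the original file.
if __name__ == "__main__":
    example_parser = argparse.ArgumentParser(description="Example CAF job submission options")
    add_basf2_options(example_parser)
    add_monitor_options(example_parser)
    add_job_options(example_parser)
    add_backends_subparsers(example_parser)
    example_args = example_parser.parse_args()
    if hasattr(example_args, "func"):
        example_backend = example_args.func(example_args)
        B2INFO(f"Created backend: {example_backend}")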