Belle II Software  release-08-01-10
cli.py
1 
8 import basf2
9 from basf2 import B2INFO
10 
11 import argparse
12 
13 from caf.backends import Batch, HTCondor, LSF, Local, PBS
14 
15 
def command_local(args, backend_args=None):
    """
    Creates the Local backend i.e. local multiprocessing.

    Parameters:
        args : Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.
    """
    B2INFO("Requested use of Local backend")
    # Local has no batch submission limits, so only the Pool size is configurable
    return Local(max_processes=args.max_processes, backend_args=backend_args)
28 
29 
def command_lsf(args, backend_args=None):
    """
    Creates the LSF backend configured from the command line.

    Parameters:
        args : Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.
    """
    B2INFO("Requested use of LSF backend")
    # Only keep command line values that were actually given, so unset (None)
    # options never mask entries coming in via backend_args
    cli_backend_args = {"queue": args.queue}
    cli_backend_args = {name: val for name, val in cli_backend_args.items() if val is not None}
    merged_args = dict(backend_args or {})
    merged_args.update(cli_backend_args)
    # Sets the backend defaults (can be overridden at the Job level)
    backend = LSF(backend_args=merged_args)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend
50 
51 
def command_pbs(args, backend_args=None):
    """
    Creates the PBS backend configured from the command line.

    Parameters:
        args : Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.
    """
    B2INFO("Requested use of PBS backend")
    # Only keep command line values that were actually given, so unset (None)
    # options never mask entries coming in via backend_args
    cli_backend_args = {"queue": args.queue}
    cli_backend_args = {name: val for name, val in cli_backend_args.items() if val is not None}
    merged_args = dict(backend_args or {})
    merged_args.update(cli_backend_args)
    # Sets the backend defaults (can be overridden at the Job level)
    backend = PBS(backend_args=merged_args)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend
71 
72 
def command_condor(args, backend_args=None):
    """
    Creates the HTCondor backend configured from the command line.

    Parameters:
        args : Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.
    """
    B2INFO("Requested use of HTCondor backend")
    # Only keep command line values that were actually given, so unset (None)
    # options never mask entries coming in via backend_args
    cli_backend_args = {
        "universe": args.universe,
        "getenv": args.getenv,
        "path_prefix": args.path_prefix,
    }
    cli_backend_args = {name: val for name, val in cli_backend_args.items() if val is not None}
    merged_args = dict(backend_args or {})
    merged_args.update(cli_backend_args)
    # Sets the backend defaults (can be overridden at the Job level)
    backend = HTCondor(backend_args=merged_args)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend
96 
97 
def add_basf2_options(parser, default_log_level="INFO"):
    """
    Adds the basf2 logging options to a parser.

    Parameters:
        parser: The argparse parser (or argument group) to add the arguments to.
        default_log_level (str): Log level name used when --log-level is not given.
    """
    # "default" is basf2's internal sentinel level, not something a user should pick
    choices = list(basf2.LogLevel.names.keys())
    choices.remove("default")
    parser.add_argument("--log-level", dest="log_level", choices=choices,
                        metavar="", default=default_log_level,
                        # Bug fix: the help text was missing the closing parenthesis
                        help=f"Set the basf2 LogLevel. (default: {default_log_level})")
    parser.add_argument("--debug-level", dest="debug_level",
                        type=int, metavar="",
                        help="Set the DEBUG level value, overrides log-level to be DEBUG.")
107 
108 
def add_monitor_options(parser, default_heartbeat=10):
    """
    Adds parser options for the monitoring of CAF jobs.

    Parameters:
        parser: The argparse parser (or argument group) to add the argument to.
        default_heartbeat (int): Default sleep interval (seconds) between readiness checks.
    """
    heartbeat_help = ("Sets the sleep interval (seconds) between attempts to check the readiness of jobs."
                      f" (default: {default_heartbeat})")
    parser.add_argument("--heartbeat", dest="heartbeat", metavar="",
                        type=int, default=default_heartbeat,
                        help=heartbeat_help)
118 
119 
def add_job_options(parser):
    """
    Adds some overall options to a parser, useful for CAF type jobs that can be used with any backend.

    Parameters:
        parser: The argparse parser (or argument group) to add the arguments to.
    """
    # The two splitting strategies are alternatives, so only one may be given
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--max-files-per-subjob", dest="files_per_subjob",
                       type=int, metavar="",
                       # Typo fix: "wil" -> "will"
                       help="Sets the number of input files that will be used per subjob.")
    group.add_argument("--max-subjobs", dest="max_subjobs",
                       type=int, metavar="",
                       help=("Sets the maximum number of subjobs that will be submitted. "
                             "Input files will be split as evenly as possible between the subjobs."))
132 
133 
def _add_batch_limit_options(batch_parser, default_global_job_limit, default_submission_check_heartbeat):
    """
    Adds the submission-throttling options shared by every Batch-type backend subparser.
    Previously this argument code was duplicated verbatim for LSF, PBS and HTCondor.
    """
    batch_parser.add_argument("--global-job-limit", dest="global_job_limit", metavar="",
                              default=default_global_job_limit, type=int,
                              help=("The number of batch jobs that can be active for the user before the backend class will stop "
                                    "submitting. This is not a completely hard limit, the actual max reached depends on other "
                                    "submitting processes and the number submitted before re-checking."
                                    f" (default: {default_global_job_limit})"))
    batch_parser.add_argument("--submission-check-heartbeat", dest="submission_check_heartbeat", metavar="", type=int,
                              default=default_submission_check_heartbeat,
                              help=("The time (seconds) between checking if there are fewer batch jobs than the global limit. "
                                    "Generally not needed to change, but it certainly shouldn't be set lower than 30 seconds."
                                    f" (default: {default_submission_check_heartbeat})"))


def add_backends_subparsers(parser, default_max_processes=4,
                            default_global_job_limit=Batch.default_global_job_limit,
                            default_submission_check_heartbeat=Batch.default_sleep_between_submission_checks,
                            local_func=command_local, lsf_func=command_lsf,
                            pbs_func=command_pbs, condor_func=command_condor):
    """
    Adds a subparser for each CAF backend type that is supported.
    Provides arguments specific to each Backend type.

    Parameters:
        parser: The parent argparse parser to attach the backend subparsers to.
        default_max_processes (int): Default multiprocessing Pool size for the Local backend.
        default_global_job_limit (int): Default cap on concurrently active batch jobs.
        default_submission_check_heartbeat (int): Default seconds between batch-limit checks.
        local_func, lsf_func, pbs_func, condor_func: Functions bound via ``set_defaults(func=...)``
            so the chosen subcommand selects the matching backend factory.

    Returns:
        list: The created subparsers [local, lsf, pbs, condor] so callers can add further options.
    """
    # Add each backend as a subparser with their own options
    subparsers = parser.add_subparsers(help="Choose the CAF backend to use when submitting jobs.")

    # Local backend: plain multiprocessing, no batch throttling options apply
    local_parser = subparsers.add_parser("Local",
                                         help="Use the local backend.",
                                         description="Runs the jobs using the Local backend i.e. local multiprocessing.",
                                         formatter_class=argparse.RawDescriptionHelpFormatter)
    local_parser.set_defaults(func=local_func)
    local_parser.add_argument("--max-processes", dest="max_processes",
                              type=int, metavar="", default=default_max_processes,
                              help=("Set the multiprocessing Pool size (max concurrent processes)."
                                    f" (default: {default_max_processes})"))

    lsf_parser = subparsers.add_parser("LSF",
                                       help="Use the LSF backend.",
                                       description="Runs the jobs using the LSF backend.",
                                       formatter_class=argparse.RawDescriptionHelpFormatter)
    lsf_parser.set_defaults(func=lsf_func)
    lsf_parser.add_argument("--queue", dest="queue", metavar="",
                            help=("The batch queue to use."
                                  " (e.g. s)"))
    _add_batch_limit_options(lsf_parser, default_global_job_limit, default_submission_check_heartbeat)

    pbs_parser = subparsers.add_parser("PBS",
                                       help="Use the PBS backend.",
                                       description="Runs the jobs using the PBS backend.",
                                       formatter_class=argparse.RawDescriptionHelpFormatter)
    pbs_parser.set_defaults(func=pbs_func)
    pbs_parser.add_argument("--queue", dest="queue", metavar="",
                            help=("The batch queue to use."
                                  " e.g. short"))
    _add_batch_limit_options(pbs_parser, default_global_job_limit, default_submission_check_heartbeat)

    condor_parser = subparsers.add_parser("HTCondor",
                                          help="Use the HTCondor backend.",
                                          description="Runs the jobs using the HTCondor backend.",
                                          formatter_class=argparse.RawDescriptionHelpFormatter)
    condor_parser.set_defaults(func=condor_func)
    condor_parser.add_argument("--getenv", dest="getenv", metavar="",
                               help=("Should jobs inherit the submitting environment (doesn't always work as expected)."
                                     " e.g. false"))
    condor_parser.add_argument("--universe", dest="universe", metavar="",
                               # Typo fix: "univese" -> "universe"
                               help=("Jobs should be submitted using this universe."
                                     " e.g. vanilla"))
    condor_parser.add_argument("--path-prefix", dest="path_prefix", metavar="", default="",
                               # Typo fix: "pre-appended" -> "prepended"
                               help=("The string that should be prepended to the file path given to the backend"
                                     " e.g. root://dcbldoor.sdcc.bnl.gov:1096"))
    _add_batch_limit_options(condor_parser, default_global_job_limit, default_submission_check_heartbeat)

    return [local_parser, lsf_parser, pbs_parser, condor_parser]