Belle II Software release-05-01-25
cli.py
import basf2
from basf2 import B2INFO

import argparse

from caf.backends import Batch, HTCondor, LSF, Local, PBS


def command_local(args, backend_args=None):
    """
    Runs the jobs using the Local backend i.e. local multiprocessing.

    Parameters:
        args: Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.
    """
    B2INFO("Requested use of Local backend")
    backend = Local(max_processes=args.max_processes, backend_args=backend_args)
    return backend
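

# Illustrative sketch (not part of the original module) of driving command_local
# directly. argparse.Namespace stands in for the parsed command line; the
# `max_processes` attribute matches the option added by add_backends_subparsers below.
def _example_local_backend():
    fake_args = argparse.Namespace(max_processes=2)
    backend = command_local(fake_args)
    return backend  # A configured caf.backends.Local instance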


def command_lsf(args, backend_args=None):
    """
    Runs the jobs using the LSF backend.

    Parameters:
        args: Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.
    """
    B2INFO("Requested use of LSF backend")
    command_line_backend_args = {"queue": args.queue}
    # Any backend args that are None should not overwrite the input dict's values
    command_line_backend_args = {key: value for key, value in command_line_backend_args.items() if value is not None}
    if backend_args is None:
        backend_args = {}
    backend_args = {**backend_args, **command_line_backend_args}
    backend = LSF(backend_args=backend_args)  # Sets the backend defaults (can be overridden at the Job level)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend
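

# Hedged sketch of the merge precedence implemented in command_lsf above (PBS
# and HTCondor follow the same pattern): keys parsed from the command line
# overwrite matching keys in a pre-supplied backend_args dict, because they
# appear last in the {**a, **b} merge. The Namespace attributes mirror the LSF
# subparser options defined below.
def _example_lsf_precedence():
    fake_args = argparse.Namespace(queue="l",
                                   global_job_limit=Batch.default_global_job_limit,
                                   submission_check_heartbeat=30)
    # The command line's "l" wins over the dict's "s" for the "queue" key.
    return command_lsf(fake_args, backend_args={"queue": "s"})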


def command_pbs(args, backend_args=None):
    """
    Runs the jobs using the PBS backend.

    Parameters:
        args: Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.
    """
    B2INFO("Requested use of PBS backend")
    command_line_backend_args = {"queue": args.queue}
    # Any backend args that are None should not overwrite the input dict's values
    command_line_backend_args = {key: value for key, value in command_line_backend_args.items() if value is not None}
    if backend_args is None:
        backend_args = {}
    backend_args = {**backend_args, **command_line_backend_args}
    backend = PBS(backend_args=backend_args)  # Sets the backend defaults (can be overridden at the Job level)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend


def command_condor(args, backend_args=None):
    """
    Runs the jobs using the HTCondor backend.

    Parameters:
        args: Command line arguments that may contain backend args options. These take priority.
        backend_args (dict): Backend arguments that will be applied. Specific backend args set by the args positional argument
            will take priority over these values.
    """
    B2INFO("Requested use of HTCondor backend")
    command_line_backend_args = {
        "universe": args.universe,
        "getenv": args.getenv,
        "path_prefix": args.path_prefix
    }
    # Any backend args that are None should not overwrite the input dict's values
    command_line_backend_args = {key: value for key, value in command_line_backend_args.items() if value is not None}
    if backend_args is None:
        backend_args = {}
    backend_args = {**backend_args, **command_line_backend_args}
    backend = HTCondor(backend_args=backend_args)  # Sets the backend defaults (can be overridden at the Job level)
    backend.global_job_limit = args.global_job_limit
    backend.sleep_between_submission_checks = args.submission_check_heartbeat
    return backend
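

# Sketch of the HTCondor variant, where the command-line values feed the
# submit-description keys (universe, getenv) plus the file path prefix. The
# Namespace and its values here are illustrative only.
def _example_condor_backend():
    fake_args = argparse.Namespace(universe="vanilla", getenv="false", path_prefix="",
                                   global_job_limit=Batch.default_global_job_limit,
                                   submission_check_heartbeat=30)
    return command_condor(fake_args)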


def add_basf2_options(parser, default_log_level="INFO"):
    """
    Adds the basf2 logging options (log level and debug level) to a parser.
    """
    choices = list(basf2.LogLevel.names.keys())
    choices.remove("default")
    parser.add_argument("--log-level", dest="log_level", choices=choices,
                        metavar="", default=default_log_level,
                        help=f"Set the basf2 LogLevel. (default: {default_log_level})")
    parser.add_argument("--debug-level", dest="debug_level",
                        type=int, metavar="",
                        help="Set the DEBUG level value, overrides log-level to be DEBUG.")
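

# Sketch (an assumption about the call site, not code from this module) of how
# the options added by add_basf2_options are typically consumed after parsing.
def _example_apply_basf2_options(args):
    basf2.set_log_level(basf2.LogLevel.names[args.log_level])
    if args.debug_level is not None:
        # As the help text says, --debug-level overrides the log level to DEBUG.
        basf2.set_log_level(basf2.LogLevel.DEBUG)
        basf2.set_debug_level(args.debug_level)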


def add_monitor_options(parser, default_heartbeat=10):
    """
    Adds parser options for the monitoring of CAF jobs.
    """
    parser.add_argument("--heartbeat", dest="heartbeat",
                        type=int, metavar="", default=default_heartbeat,
                        help=("Sets the sleep interval (seconds) between attempts to check the readiness of jobs."
                              f" (default: {default_heartbeat})")
                        )


def add_job_options(parser):
    """
    Adds some overall options to a parser, useful for CAF type jobs that can be used with any backend.
    """
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--max-files-per-subjob", dest="files_per_subjob",
                       type=int, metavar="",
                       help="Sets the number of input files that will be used per subjob.")
    group.add_argument("--max-subjobs", dest="max_subjobs",
                       type=int, metavar="",
                       help=("Sets the maximum number of subjobs that will be submitted. "
                             "Input files will be split as evenly as possible between the subjobs."))
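

# Quick illustration of the mutual exclusion enforced above: only one of the
# two splitting options may be given, so this parse succeeds, while supplying
# both flags would make argparse exit with an error.
def _example_job_options():
    example_parser = argparse.ArgumentParser()
    add_job_options(example_parser)
    args = example_parser.parse_args(["--max-subjobs", "100"])
    return args.max_subjobs  # 100; args.files_per_subjob is None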


def add_backends_subparsers(parser, default_max_processes=4,
                            default_global_job_limit=Batch.default_global_job_limit,
                            default_submission_check_heartbeat=Batch.default_sleep_between_submission_checks,
                            local_func=command_local, lsf_func=command_lsf,
                            pbs_func=command_pbs, condor_func=command_condor):
    """
    Adds a subparser for each CAF backend type that is supported.
    Provides arguments specific to each Backend type.
    """
    # Add each backend as a subparser with its own options
    subparsers = parser.add_subparsers(help="Choose the CAF backend to use when submitting jobs.")
    local_parser = subparsers.add_parser("Local",
                                         help="Use the Local backend.",
                                         description="Runs the jobs using the Local backend i.e. local multiprocessing.",
                                         formatter_class=argparse.RawDescriptionHelpFormatter)
    local_parser.set_defaults(func=local_func)
    local_parser.add_argument("--max-processes", dest="max_processes",
                              type=int, metavar="", default=default_max_processes,
                              help=("Set the multiprocessing Pool size (max concurrent processes)."
                                    f" (default: {default_max_processes})"))

    lsf_parser = subparsers.add_parser("LSF",
                                       help="Use the LSF backend.",
                                       description="Runs the jobs using the LSF backend.",
                                       formatter_class=argparse.RawDescriptionHelpFormatter)
    lsf_parser.set_defaults(func=lsf_func)

    lsf_parser.add_argument("--queue", dest="queue", metavar="",
                            help="The batch queue to use. (e.g. s)")

    lsf_parser.add_argument("--global-job-limit", dest="global_job_limit", metavar="", default=default_global_job_limit, type=int,
                            help=("The number of batch jobs that can be active for the user before the backend class will stop "
                                  "submitting. This is not a completely hard limit; the actual maximum reached depends on other "
                                  "submitting processes and the number submitted before re-checking."
                                  f" (default: {default_global_job_limit})"))

    lsf_parser.add_argument("--submission-check-heartbeat", dest="submission_check_heartbeat", metavar="", type=int,
                            default=default_submission_check_heartbeat,
                            help=("The time (seconds) between checking if there are fewer batch jobs than the global limit. "
                                  "Generally this does not need changing, but it certainly shouldn't be set lower than 30 seconds."
                                  f" (default: {default_submission_check_heartbeat})"))

    pbs_parser = subparsers.add_parser("PBS",
                                       help="Use the PBS backend.",
                                       description="Runs the jobs using the PBS backend.",
                                       formatter_class=argparse.RawDescriptionHelpFormatter)
    pbs_parser.set_defaults(func=pbs_func)

    pbs_parser.add_argument("--queue", dest="queue", metavar="",
                            help="The batch queue to use. (e.g. short)")

    pbs_parser.add_argument("--global-job-limit", dest="global_job_limit", metavar="", default=default_global_job_limit, type=int,
                            help=("The number of batch jobs that can be active for the user before the backend class will stop "
                                  "submitting. This is not a completely hard limit; the actual maximum reached depends on other "
                                  "submitting processes and the number submitted before re-checking."
                                  f" (default: {default_global_job_limit})"))

    pbs_parser.add_argument("--submission-check-heartbeat", dest="submission_check_heartbeat", metavar="", type=int,
                            default=default_submission_check_heartbeat,
                            help=("The time (seconds) between checking if there are fewer batch jobs than the global limit. "
                                  "Generally this does not need changing, but it certainly shouldn't be set lower than 30 seconds."
                                  f" (default: {default_submission_check_heartbeat})"))

    condor_parser = subparsers.add_parser("HTCondor",
                                          help="Use the HTCondor backend.",
                                          description="Runs the jobs using the HTCondor backend.",
                                          formatter_class=argparse.RawDescriptionHelpFormatter)
    condor_parser.set_defaults(func=condor_func)

    condor_parser.add_argument("--getenv", dest="getenv", metavar="",
                               help=("Whether jobs should inherit the submitting environment (doesn't always work as expected)."
                                     " (e.g. false)"))

    condor_parser.add_argument("--universe", dest="universe", metavar="",
                               help=("Jobs will be submitted using this universe."
                                     " (e.g. vanilla)"))

    condor_parser.add_argument("--path-prefix", dest="path_prefix", metavar="", default="",
                               help=("The string that should be prepended to file paths given to the backend."
                                     " (e.g. root://dcbldoor.sdcc.bnl.gov:1096)"))

    condor_parser.add_argument("--global-job-limit", dest="global_job_limit", metavar="", default=default_global_job_limit, type=int,
                               help=("The number of batch jobs that can be active for the user before the backend class will stop "
                                     "submitting. This is not a completely hard limit; the actual maximum reached depends on other "
                                     "submitting processes and the number submitted before re-checking."
                                     f" (default: {default_global_job_limit})"))

    condor_parser.add_argument("--submission-check-heartbeat", dest="submission_check_heartbeat", metavar="", type=int,
                               default=default_submission_check_heartbeat,
                               help=("The time (seconds) between checking if there are fewer batch jobs than the global limit. "
                                     "Generally this does not need changing, but it certainly shouldn't be set lower than 30 seconds."
                                     f" (default: {default_submission_check_heartbeat})"))

    return [local_parser, lsf_parser, pbs_parser, condor_parser]
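

# A hedged, end-to-end sketch of the intended wiring (the argv list here is
# illustrative only): build a parser, attach the option groups and backend
# subparsers defined above, then call the `func` default set by the chosen
# subparser to obtain a fully configured backend.
if __name__ == "__main__":
    example_parser = argparse.ArgumentParser(description="Example CAF submission tool")
    add_basf2_options(example_parser)
    add_monitor_options(example_parser)
    add_job_options(example_parser)
    add_backends_subparsers(example_parser)
    example_args = example_parser.parse_args(["Local", "--max-processes", "2"])
    example_backend = example_args.func(example_args)
    B2INFO(f"Created backend {example_backend}")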