Belle II Software  release-06-01-15
backends.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-


from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
import re
import os
from abc import ABC, abstractmethod
import json
import xml.etree.ElementTree as ET
from math import ceil
from pathlib import Path
from collections import deque
from itertools import count, takewhile
import shutil
import time
from datetime import datetime, timedelta
import subprocess
import multiprocessing as mp

from caf.utils import method_dispatch
from caf.utils import decode_json_string
from caf.utils import grouper
from caf.utils import parse_file_uri


__all__ = ["Job", "SubJob", "Backend", "Local", "Batch", "LSF", "PBS", "HTCondor", "get_input_data"]


_input_data_file_path = Path("__BACKEND_INPUT_FILES__.json")

_STDOUT_FILE = "stdout"

_STDERR_FILE = "stderr"

def get_input_data():
    """
    Simple JSON load of the default input data file. Will contain a list of string file paths
    for use by the job process.
    """
    with open(_input_data_file_path, mode="r") as input_data_file:
        input_data = json.load(input_data_file)
    return input_data


def monitor_jobs(args, jobs):
    unfinished_jobs = jobs[:]
    failed_jobs = 0
    while unfinished_jobs:
        B2INFO("Updating statuses of unfinished jobs...")
        for j in unfinished_jobs:
            j.update_status()
        B2INFO("Checking if jobs are ready...")
        for j in unfinished_jobs[:]:
            if j.ready():
                if j.status == "failed":
                    B2ERROR(f"{j} is failed")
                    failed_jobs += 1
                else:
                    B2INFO(f"{j} is finished")
                unfinished_jobs.remove(j)
        if unfinished_jobs:
            B2INFO(f"Not all jobs done yet, waiting {args.heartbeat} seconds before re-checking...")
            time.sleep(args.heartbeat)
    if failed_jobs > 0:
        B2ERROR(f"{failed_jobs} jobs failed")
    else:
        B2INFO('All jobs finished successfully')


class ArgumentsGenerator():
    def __init__(self, generator_function, *args, **kwargs):
        """
        Simple little class to hold a generator (uninitialised) and the necessary args/kwargs to
        initialise it. This lets us re-use a generator by setting it up again fresh. This is not
        optimal for expensive calculations, but it is nice for making large sequences of
        Job input arguments on the fly.

        Parameters:
            generator_function (py:function): A function (callable) that contains a ``yield`` statement. This generator
                should *not* be initialised i.e. you haven't called it with ``generator_function(*args, **kwargs)``
                yet. That will happen when accessing the `ArgumentsGenerator.generator` property.
            args (tuple): The positional arguments you want to send into the initialisation of the generator.
            kwargs (dict): The keyword arguments you want to send into the initialisation of the generator.
        """
        self.generator_function = generator_function
        self.args = args
        self.kwargs = kwargs

    @property
    def generator(self):
        """
        Returns:
            generator: The initialised generator (using the args and kwargs for initialisation). It should be ready
            to have ``next``/``send`` called on it.
        """
        gen = self.generator_function(*self.args, **self.kwargs)
        gen.send(None)  # Prime it
        return gen


def range_arguments(start=0, stop=None, step=1):
    """
    A simple example Arguments Generator function for use as a `ArgumentsGenerator.generator_function`.
    It will return increasing values using `itertools.count`. By default it is infinite and will never raise
    `StopIteration`. The `SubJob` object is sent into this function with `send` but is not used.

    Parameters:
        start (int): The starting value that will be returned.
        stop (int): At this value `StopIteration` will be raised. If this is `None` then this generator will continue
            forever.
        step (int): The step size.

    Returns:
        tuple
    """
    def should_stop(x):
        if stop is not None and x >= stop:
            return True
        else:
            return False
    # The logic of send/yield is tricky. Basically the first yield must have been hit before send(subjob) can work.
    # However the first yield is also the one that receives the send(subjob) NOT the send(None).
    subjob = (yield None)  # Handle the send(None) and first send(subjob)
    args = None
    for i in takewhile(lambda x: not should_stop(x), count(start, step)):
        args = (i,)  # We could have used the subjob object here
        B2DEBUG(29, f"{subjob} arguments will be {args}")
        subjob = (yield args)  # Set up ready for the next loop (if it happens)

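# A minimal usage sketch (added for illustration, not part of the original file):
# pairing `range_arguments` with an `ArgumentsGenerator` and an `ArgumentsSplitter`
# (both defined in this module) so that each SubJob receives one increasing integer.
#
#   arg_gen = ArgumentsGenerator(range_arguments, start=0, stop=4)
#   job = Job("example")
#   job.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=10)
#   job.splitter.create_subjobs(job)  # SubJobs get args (0,), (1,), (2,), (3,)
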

class SubjobSplitter(ABC):
    """
    Abstract base class. This class handles the logic of creating subjobs for a `Job` object.
    The `create_subjobs` function should be implemented and used to construct
    the subjobs of the parent job object.

    Parameters:
        arguments_generator (ArgumentsGenerator): Used to construct the generator function that will yield the argument
            tuple for each `SubJob`. The splitter will iterate through the generator each time `create_subjobs` is
            called. The `SubJob` will be sent into the generator with ``send(subjob)`` so that the generator can decide
            what arguments to return.
    """

    def __init__(self, *, arguments_generator=None):
        """
        Derived classes should call `super` to run this.
        """
        self.arguments_generator = arguments_generator

    @abstractmethod
    def create_subjobs(self, job):
        """
        Implement this method in derived classes to generate the `SubJob` objects.
        """

    def assign_arguments(self, job):
        """
        Use the `arguments_generator` (if one exists) to assign the argument tuples to the
        subjobs.
        """
        if self.arguments_generator:
            arg_gen = self.arguments_generator.generator
            generating = True
            for subjob in sorted(job.subjobs.values(), key=lambda sj: sj.id):
                if generating:
                    try:
                        args = arg_gen.send(subjob)
                    except StopIteration:
                        B2ERROR((f"StopIteration called when getting args for {subjob}, "
                                 "setting all subsequent subjobs to have empty argument tuples."))
                        args = tuple()  # If our generator finishes before the subjobs we use empty argument tuples
                        generating = False
                else:
                    args = tuple()
                B2DEBUG(29, f"Arguments for {subjob}: {args}")
                subjob.args = args
            return

        B2INFO((f"No ArgumentsGenerator assigned to the {self} so subjobs of {job} "
                "won't automatically have arguments assigned."))

    def __repr__(self):
        return f"{self.__class__.__name__}"


class MaxFilesSplitter(SubjobSplitter):

    def __init__(self, *, arguments_generator=None, max_files_per_subjob=1):
        """
        Parameters:
            max_files_per_subjob (int): The maximum number of input files used per `SubJob` created.
        """
        super().__init__(arguments_generator=arguments_generator)
        self.max_files_per_subjob = max_files_per_subjob

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates as many subjobs as required
        in order to prevent the number of input files per subjob going over the limit set by
        `MaxFilesSplitter.max_files_per_subjob`.
        """
        if not job.input_files:
            B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
            return

        for i, subjob_input_files in enumerate(grouper(self.max_files_per_subjob, job.input_files)):
            job.create_subjob(i, input_files=subjob_input_files)

        self.assign_arguments(job)

        B2INFO(f"{self} created {i+1} Subjobs for {job}")


class MaxSubjobsSplitter(SubjobSplitter):

    def __init__(self, *, arguments_generator=None, max_subjobs=1000):
        """
        Parameters:
            max_subjobs (int): The maximum number of subjobs that will be created.
        """
        super().__init__(arguments_generator=arguments_generator)
        self.max_subjobs = max_subjobs

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates as many subjobs as required
        by the number of input files up to the maximum set by `MaxSubjobsSplitter.max_subjobs`. If there are
        more input files than `max_subjobs` it instead groups files by the minimum number per subjob in order to
        respect the subjob limit e.g. if you have 11 input files and a maximum number of subjobs of 4, then it
        will create 4 subjobs, 3 of them with 3 input files, and one with 2 input files.
        """
        if not job.input_files:
            B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
            return

        # Since we will be popping from the left of the input file list, we construct a deque
        remaining_input_files = deque(job.input_files)
        # The initial number of available subjobs, will go down as we create them
        available_subjobs = self.max_subjobs
        subjob_i = 0
        while remaining_input_files:
            # How many files should we use for this subjob?
            num_input_files = ceil(len(remaining_input_files) / available_subjobs)
            # Pop them from the remaining files
            subjob_input_files = []
            for i in range(num_input_files):
                subjob_input_files.append(remaining_input_files.popleft())
            # Create the actual subjob
            job.create_subjob(subjob_i, input_files=subjob_input_files)
            subjob_i += 1
            available_subjobs -= 1

        self.assign_arguments(job)
        B2INFO(f"{self} created {subjob_i} Subjobs for {job}")

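# Worked sketch of the ceil() distribution above (illustration only, not part of
# the original file): with 11 input files and max_subjobs=4 the per-subjob counts
# come out as ceil(11/4)=3, ceil(8/3)=3, ceil(5/2)=3, ceil(2/1)=2.
#
#   job = Job("example")
#   job.input_files = [f"/data/file_{i}.root" for i in range(11)]  # hypothetical paths
#   job.max_subjobs = 4  # this property setter installs a MaxSubjobsSplitter
#   job.splitter.create_subjobs(job)
#   [len(sj.input_files) for sj in job.subjobs.values()]  # -> [3, 3, 3, 2]
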

class ArgumentsSplitter(SubjobSplitter):
    """
    Creates SubJobs based on the given argument generator. The generator will be called until a `StopIteration` is issued.
    Be VERY careful to not accidentally give an infinite generator! Otherwise it will simply create SubJobs until you run out
    of memory. You can set the `ArgumentsSplitter.max_subjobs` parameter to try and prevent this and throw an exception.

    This splitter is useful for MC production jobs where you don't have any input files, but you want to control the exp/run
    numbers of subjobs. If you do have input files set for the parent `Job` objects, then the same input files will be
    assigned to every `SubJob`.

    Parameters:
        arguments_generator (ArgumentsGenerator): The standard ArgumentsGenerator that is used to assign arguments
    """

    def __init__(self, *, arguments_generator=None, max_subjobs=None):
        """
        """
        super().__init__(arguments_generator=arguments_generator)
        self.max_subjobs = max_subjobs

    def create_subjobs(self, job):
        """
        This function creates subjobs for the parent job passed in. It creates subjobs until the
        `SubjobSplitter.arguments_generator` finishes.

        If `ArgumentsSplitter.max_subjobs` is set, then it will throw an exception if more than this number of
        subjobs are created.
        """
        arg_gen = self.arguments_generator.generator
        for i in count():
            # Reached the maximum?
            if self.max_subjobs is not None and i >= self.max_subjobs:
                raise SplitterError(f"{self} tried to create more subjobs than the maximum (={self.max_subjobs}).")
            try:
                subjob = SubJob(job, i, job.input_files)  # We manually create it because we might not need it
                args = arg_gen.send(subjob)  # Might throw StopIteration
                B2INFO(f"Creating {job}.{subjob}")
                B2DEBUG(29, f"Arguments for {subjob}: {args}")
                subjob.args = args
                job.subjobs[i] = subjob
            except StopIteration:
                break
        B2INFO(f"{self} created {i} Subjobs for {job}")

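# Illustrative sketch (not in the original file) of driving ArgumentsSplitter with
# a custom generator that yields hypothetical (experiment, run) argument tuples:
#
#   def exp_run_arguments(runs):
#       subjob = (yield None)  # prime, as in range_arguments above
#       for exp, run in runs:
#           subjob = (yield (exp, run))
#
#   arg_gen = ArgumentsGenerator(exp_run_arguments, [(7, 1234), (7, 1235)])
#   job.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=100)
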

class Job:
    """
    This generic Job object is used to tell a Backend what to do.
    This object basically holds necessary information about a process you want to submit to a `Backend`.
    It should *not* do anything that is backend specific, just hold the configuration for a job to be
    successfully submitted and monitored using a backend. The result attribute is where backend
    specific job monitoring goes.

    Parameters:
        name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF

    .. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.
    """

    statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}

    exit_statuses = ["failed", "completed"]

    def __init__(self, name, job_dict=None):
        """
        """
        self.name = name
        self.splitter = None

        if not job_dict:
            self.input_sandbox_files = []
            self.working_dir = Path()
            self.output_dir = Path()
            self.output_patterns = []
            self.cmd = []
            self.args = []
            self.input_files = []
            self.setup_cmds = []
            self.backend_args = {}
            self.subjobs = {}
        elif job_dict:
            self.input_sandbox_files = [Path(p) for p in job_dict["input_sandbox_files"]]
            self.working_dir = Path(job_dict["working_dir"])
            self.output_dir = Path(job_dict["output_dir"])
            self.output_patterns = job_dict["output_patterns"]
            self.cmd = job_dict["cmd"]
            self.args = job_dict["args"]
            self.input_files = job_dict["input_files"]
            self.setup_cmds = job_dict["setup_cmds"]
            self.backend_args = job_dict["backend_args"]
            self.subjobs = {}
            for subjob_dict in job_dict["subjobs"]:
                self.create_subjob(subjob_dict["id"], input_files=subjob_dict["input_files"], args=subjob_dict["args"])

        self.result = None
        self._status = "init"

    def __repr__(self):
        """
        Representation of Job class (what happens when you print a Job() instance).
        """
        return f"Job({self.name})"

    def ready(self):
        """
        Returns whether or not the Job has finished. If the job has subjobs then it will return true when they are all finished.
        It will return False as soon as it hits the first failure. Meaning that you cannot guarantee that all subjobs will have
        their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.
        """
        if not self.result:
            B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")
            return False
        else:
            return self.result.ready()

    def update_status(self):
        """
        Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
        in the best way for the type of result object/backend.
        """
        if not self.result:
            B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
        else:
            self.result.update_status()
        return self.status

    def create_subjob(self, i, input_files=None, args=None):
        """
        Creates a subjob Job object that references that parent Job.
        Returns the SubJob object at the end.
        """
        if i not in self.subjobs:
            B2INFO(f"Creating {self}.Subjob({i})")
            subjob = SubJob(self, i, input_files)
            if args:
                subjob.args = args
            self.subjobs[i] = subjob
            return subjob
        else:
            B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")

    @property
    def status(self):
        """
        Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
        subjob status in the hierarchy of statuses in `Job.statuses`.
        """
        if self.subjobs:
            job_status = self._get_overall_status_from_subjobs()
            if job_status != self._status:
                self.status = job_status
        return self._status

    def _get_overall_status_from_subjobs(self):
        subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
        status_level = min([self.statuses[status] for status in subjob_statuses])
        for status, level in self.statuses.items():
            if level == status_level:
                return status

    @status.setter
    def status(self, status):
        """
        Sets the status of this Job.
        """
        # Print an error only if the job failed.
        if status == 'failed':
            B2ERROR(f"Setting {self.name} status to failed")
        else:
            B2INFO(f"Setting {self.name} status to {status}")
        self._status = status

    @property
    def output_dir(self):
        return self._output_dir

    @output_dir.setter
    def output_dir(self, value):
        self._output_dir = Path(value).absolute()

    @property
    def working_dir(self):
        return self._working_dir

    @working_dir.setter
    def working_dir(self, value):
        self._working_dir = Path(value).absolute()

    @property
    def input_sandbox_files(self):
        return self._input_sandbox_files

    @input_sandbox_files.setter
    def input_sandbox_files(self, value):
        self._input_sandbox_files = [Path(p).absolute() for p in value]

    @property
    def input_files(self):
        return self._input_files

    @input_files.setter
    def input_files(self, value):
        self._input_files = value

    @property
    def max_subjobs(self):
        return self.splitter.max_subjobs

    @max_subjobs.setter
    def max_subjobs(self, value):
        self.splitter = MaxSubjobsSplitter(max_subjobs=value)
        B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")

    @property
    def max_files_per_subjob(self):
        return self.splitter.max_files_per_subjob

    @max_files_per_subjob.setter
    def max_files_per_subjob(self, value):
        self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
        B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")

    def dump_to_json(self, file_path):
        """
        Dumps the Job object configuration to a JSON file so that it can be read in again later.

        Parameters:
            file_path(`basf2.Path`): The filepath we'll dump to
        """
        with open(file_path, mode="w") as job_file:
            json.dump(self.job_dict, job_file, indent=2)

    @classmethod
    def from_json(cls, file_path):
        with open(file_path, mode="r") as job_file:
            job_dict = json.load(job_file)
        return cls(job_dict["name"], job_dict=job_dict)

    @property
    def job_dict(self):
        """
        Returns:
            dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
            `Path <basf2.Path>` objects are converted to string via ``Path.as_posix()``.
        """
        job_dict = {}
        job_dict["name"] = self.name
        job_dict["input_sandbox_files"] = [i.as_posix() for i in self.input_sandbox_files]
        job_dict["working_dir"] = self.working_dir.as_posix()
        job_dict["output_dir"] = self.output_dir.as_posix()
        job_dict["output_patterns"] = self.output_patterns
        job_dict["cmd"] = self.cmd
        job_dict["args"] = self.args
        job_dict["input_files"] = self.input_files
        job_dict["setup_cmds"] = self.setup_cmds
        job_dict["backend_args"] = self.backend_args
        job_dict["subjobs"] = [sj.job_dict for sj in self.subjobs.values()]
        return job_dict

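    # Hypothetical round-trip sketch (illustration only, not part of the original
    # file): a Job's configuration survives serialisation via dump_to_json/from_json.
    #
    #   job = Job("my_job")
    #   job.cmd = ["basf2", "steering.py"]       # hypothetical command
    #   job.working_dir = "/tmp/my_job/work"     # stored as an absolute path
    #   job.dump_to_json("my_job.json")
    #   restored = Job.from_json("my_job.json")  # same name, dirs, cmd, args
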
    def dump_input_data(self):
        """
        Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
        string URI objects.
        """
        with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
            json.dump(self.input_files, input_data_file, indent=2)

    def copy_input_sandbox_files_to_working_dir(self):
        """
        Get all of the requested files for the input sandbox and copy them to the working directory.
        Files like the submit.sh or input_data.json are not part of this process.
        """
        for file_path in self.input_sandbox_files:
            if file_path.is_dir():
                shutil.copytree(file_path, Path(self.working_dir, file_path.name))
            else:
                shutil.copy(file_path, self.working_dir)

    def check_input_data_files(self):
        """
        Check the input files and make sure that there aren't any duplicates.
        Also check if the files actually exist if possible.
        """
        existing_input_files = []  # We use a list instead of set to avoid losing any ordering of files
        for file_path in self.input_files:
            file_uri = parse_file_uri(file_path)
            if file_uri.scheme == "file":
                p = Path(file_uri.path)
                if p.is_file():
                    if file_uri.geturl() not in existing_input_files:
                        existing_input_files.append(file_uri.geturl())
                    else:
                        B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
                else:
                    B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
            else:
                B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
                if file_path not in existing_input_files:
                    existing_input_files.append(file_path)
                else:
                    B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
        if self.input_files and not existing_input_files:
            B2WARNING(f"No valid input file paths found for {self.name}, but some were requested.")

        # Replace the Job's input files with the ones that exist + duplicates removed
        self.input_files = existing_input_files

    @property
    def full_command(self):
        """
        Returns:
            str: The full command that this job will run including any arguments.
        """
        all_components = self.cmd[:]
        all_components.extend(self.args)
        # We do a convert to string just in case arguments were generated as different types
        full_command = " ".join(map(str, all_components))
        B2DEBUG(29, f"Full command of {self} is '{full_command}'")
        return full_command

    def append_current_basf2_setup_cmds(self):
        """
        This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
        It should detect if you are using a local release or CVMFS and append the correct commands
        so that the job will have the same basf2 release environment. It should also detect
        if a local release is not compiled with the ``opt`` option.

        Note that this *doesn't mean that every environment variable is inherited* from the submitting
        process environment.
        """
        if "BELLE2_TOOLS" not in os.environ:
            raise BackendError("No BELLE2_TOOLS found in environment")
        if "BELLE2_CONFIG_DIR" in os.environ:
            self.setup_cmds.append(f"""if [ -z "${{BELLE2_CONFIG_DIR}}" ]; then
  export BELLE2_CONFIG_DIR={os.environ['BELLE2_CONFIG_DIR']}
fi""")
        if "VO_BELLE2_SW_DIR" in os.environ:
            self.setup_cmds.append(f"""if [ -z "${{VO_BELLE2_SW_DIR}}" ]; then
  export VO_BELLE2_SW_DIR={os.environ['VO_BELLE2_SW_DIR']}
fi""")
        if "BELLE2_RELEASE" in os.environ:
            self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
        elif 'BELLE2_LOCAL_DIR' in os.environ:
            self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
            self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
            self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
            self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
            self.setup_cmds.append("pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
            self.setup_cmds.append("source $BACKEND_B2SETUP")
            # b2code-option has to be executed only after the source of the tools.
            self.setup_cmds.append("b2code-option $BACKEND_BELLE2_OPTION")
            self.setup_cmds.append("popd > /dev/null")

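    # Example of what the appended commands might look like for a CVMFS-based
    # release (hypothetical values, for illustration only): with BELLE2_RELEASE
    # set to "release-06-01-15", setup_cmds would end with
    #
    #   source /cvmfs/belle.cern.ch/tools/b2setup release-06-01-15
    #
    # while a local release instead sources $BACKEND_B2SETUP inside the release
    # directory and applies b2code-option.
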

class SubJob(Job):
    """
    This mini-class simply holds basic information about which subjob you are
    and a reference to the parent Job object to be able to access the main data there,
    rather than replicating all of the parent job's configuration.
    """

    def __init__(self, job, subjob_id, input_files=None):
        """
        """
        self.id = subjob_id
        self.parent = job

        if not input_files:
            input_files = []
        self.input_files = input_files
        self.result = None
        self._status = "init"
        self.args = []

    @property
    def output_dir(self):
        """Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this."""
        return Path(self.parent.output_dir, str(self.id))

    @property
    def working_dir(self):
        """Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this."""
        return Path(self.parent.working_dir, str(self.id))

    @property
    def name(self):
        """Getter for name of SubJob. Accesses the parent Job name to infer this."""
        return "_".join((self.parent.name, str(self.id)))

    @property
    def status(self):
        """
        Returns the status of this SubJob.
        """
        return self._status

    @status.setter
    def status(self, status):
        """
        Sets the status of this Job.
        """
        # Print an error only if the job failed.
        if status == "failed":
            B2ERROR(f"Setting {self.name} status to failed")
        else:
            B2INFO(f"Setting {self.name} status to {status}")
        self._status = status

    @property
    def subjobs(self):
        """
        A subjob cannot have subjobs. Always return empty list.
        """
        return []

    @property
    def job_dict(self):
        """
        Returns:
            dict: A JSON serialisable representation of the `SubJob`. `Path <basf2.Path>` objects are converted to
            `string` via ``Path.as_posix()``. Since Subjobs inherit most of the parent job's config
            we only output the input files and arguments that are specific to this subjob and no other details.
        """
        job_dict = {}
        job_dict["id"] = self.id
        job_dict["input_files"] = self.input_files
        job_dict["args"] = self.args
        return job_dict

    def __getattr__(self, attribute):
        """
        Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
        unless otherwise specified.
        """
        return getattr(self.parent, attribute)

    def __repr__(self):
        """
        """
        return f"SubJob({self.name})"

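    # Delegation sketch (illustration only, not in the original file): attributes
    # not defined on SubJob fall through to the parent Job via __getattr__ above.
    #
    #   job = Job("parent")
    #   job.backend_args = {"queue": "l"}  # hypothetical backend option
    #   sj = SubJob(job, 0)
    #   sj.backend_args  # -> {"queue": "l"}, looked up on the parent Job
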

class Backend(ABC):
    """
    Abstract base class for a valid backend.
    Classes derived from this will implement their own submission of basf2 jobs
    to whatever backend they describe.
    Some common methods/attributes go into this base class.

    For backend_args the priority from lowest to highest is:

    backend.default_backend_args -> backend.backend_args -> job.backend_args
    """

    submit_script = "submit.sh"

    exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"

    default_backend_args = {}

    def __init__(self, *, backend_args=None):
        """
        """
        if backend_args is None:
            backend_args = {}
        self.backend_args = {**self.default_backend_args, **backend_args}

    @abstractmethod
    def submit(self, job):
        """
        Base method for submitting collection jobs to the backend type. This MUST be
        implemented for a correctly written backend class deriving from Backend().
        """

    @staticmethod
    def _add_setup(job, batch_file):
        """
        Adds setup lines to the shell script file.
        """
        for line in job.setup_cmds:
            print(line, file=batch_file)

    def _add_wrapper_script_setup(self, job, batch_file):
        """
        Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
        `trap` statements for Ctrl-C situations.
        """
        start_wrapper = f"""# ---
# trap ctrl-c and call ctrl_c()
trap '(ctrl_c 130)' SIGINT
trap '(ctrl_c 143)' SIGTERM

function write_exit_code() {{
  echo "Writing $1 to exit status file"
  echo "$1" > {self.exit_code_file}
  exit $1
}}

function ctrl_c() {{
  trap '' SIGINT SIGTERM
  echo "** Trapped Ctrl-C **"
  echo "$1" > {self.exit_code_file}
  exit $1
}}
# ---"""
        print(start_wrapper, file=batch_file)

    def _add_wrapper_script_teardown(self, job, batch_file):
        """
        Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
        an exit code of the job cmd being written out to a file. Which means that we can know if the command was
        successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
        removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
        file.
        """
        end_wrapper = """# ---
write_exit_code $?"""
        print(end_wrapper, file=batch_file)

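    # For reference, a sketch of a generated submit script (layout inferred from
    # the methods above; the path and release are hypothetical):
    #
    #   #!/bin/bash
    #   # ---                        <- _add_wrapper_script_setup: traps + helpers
    #   trap '(ctrl_c 130)' SIGINT
    #   ...
    #   source /cvmfs/belle.cern.ch/tools/b2setup release-06-01-15   <- _add_setup
    #   basf2 steering.py           <- job.full_command
    #   # ---
    #   write_exit_code $?          <- _add_wrapper_script_teardown
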
    @classmethod
    def _create_parent_job_result(cls, parent):
        """
        We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
        so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
        statuses and allows the use of ready().
        """
        raise NotImplementedError

    def get_submit_script_path(self, job):
        """
        Construct the Path object of the bash script file that we will submit. It will contain
        the actual job command, wrapper commands, setup commands, and any batch directives
        """
        return Path(job.working_dir, self.submit_script)


class Result():
    """
    Base class for Result objects. A Result is created for each `Job` (or `Job.SubJob`) object
    submitted to a backend. It provides a way to query a job's status to find out if it's ready.
    """

    def __init__(self, job):
        """
        Pass in the job object to allow the result to access the job's properties and do post-processing.
        """
        self.job = job
        self._is_ready = False
        self.time_to_wait_for_exit_code_file = timedelta(minutes=5)
        self.exit_code_file_initial_time = None

    def ready(self):
        """
        Returns whether or not this job result is known to be ready. Doesn't actually change the job status. Just changes
        the 'readiness' based on the known job status.
        """
        B2DEBUG(29, f"Calling {self.job}.result.ready()")
        if self._is_ready:
            return True
        elif self.job.status in self.job.exit_statuses:
            self._is_ready = True
            return True
        else:
            return False

    def update_status(self):
        """
        Update the job's (and subjobs') status so that `Result.ready` will return the up to date status. This call will have to
        actually look up the job's status from some database/exit code file.
        """
        raise NotImplementedError

    def get_exit_code_from_file(self):
        """
        Read the exit code file to discover the exit status of the job command. Useful fallback if the job is no longer
        known to the job database (batch system purged it for example). Since some backends may take time to download
        the output files of the job back to the working directory we use a time limit on how long to wait.
        """
        if not self.exit_code_file_initial_time:
            self.exit_code_file_initial_time = datetime.now()
        exit_code_path = Path(self.job.working_dir, Backend.exit_code_file)
        with open(exit_code_path, "r") as f:
            exit_code = int(f.read().strip())
            B2DEBUG(29, f"Exit code from file for {self.job} was {exit_code}")
            return exit_code


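# Sketch of the exit-code-file contract (illustration only): the wrapper script's
# write_exit_code helper writes the command's exit status as plain text, e.g. a
# file named __BACKEND_CMD_EXIT_STATUS__ in the working directory containing just
# "0", which get_exit_code_from_file() parses back with int().
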
class Local(Backend):
    """
    Backend for local processes i.e. on the same machine but in a subprocess.

    Note that you should call the self.join() method to close the pool and wait for any
    running processes to finish before exiting the process. Once you've called join you will have to set up a new
    instance of this backend to create a new pool. If you don't call `Local.join` or otherwise join the pool
    somewhere, then the main python process might end before your pool is done.

    Keyword Arguments:
        max_processes (int): Integer that specifies the size of the process pool that spawns the subjobs, default=1.
            It's the maximum number of simultaneous subjobs.
            Try not to specify a large number or a number larger than the number of cores.
            It won't crash the program but it will slow down and negatively impact performance.
    """

    def __init__(self, *, backend_args=None, max_processes=1):
        """
        """
        super().__init__(backend_args=backend_args)
        self.pool = None
        self.max_processes = max_processes

    class LocalResult(Result):
        """
        Result class to help monitor status of jobs submitted by Local backend.
        """

        def __init__(self, job, result):
            """
            Pass in the job object and the multiprocessing result to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            self.result = result

        def _update_result_status(self):
            if self.result.ready() and (self.job.status not in self.job.exit_statuses):
                return_code = self.result.get()
                if return_code:
                    self.job.status = "failed"
                else:
                    self.job.status = "completed"

        def update_status(self):
            """
            Update the job's (or subjobs') status by calling the result object.
            """
            B2DEBUG(29, f"Calling {self.job}.result.update_status()")
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status()
            else:
                self._update_result_status()

    def join(self):
        """
        Closes and joins the Pool, letting you wait for all results currently
        still processing.
        """
        B2INFO("Joining Process Pool, waiting for results to finish...")
        self.pool.close()
        self.pool.join()
        B2INFO("Process Pool joined.")

    @property
    def max_processes(self):
        """
        Getter for max_processes
        """
        return self._max_processes

    @max_processes.setter
    def max_processes(self, value):
        """
        Setter for max_processes, we also check for a previous Pool(), wait for it to join
        and then create a new one with the new value of max_processes
        """
        self._max_processes = value
        if self.pool:
            B2INFO("New max_processes requested, but a pool already exists.")
            self.join()
        B2INFO(f"Starting up new Pool with {self.max_processes} processes")
        self.pool = mp.Pool(processes=self.max_processes)

    @method_dispatch
    def submit(self, job):
        """
        """
        raise NotImplementedError(("This is an abstract submit(job) method that shouldn't have been called. "
                                   "Did you submit a (Sub)Job?"))

    @submit.register(SubJob)
    def _(self, job):
        """
        Submission of a `SubJob` for the Local backend
        """
        # Make sure the output directory of the job is created
        job.output_dir.mkdir(parents=True, exist_ok=True)
        # Make sure the working directory of the job is created
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()
        # Get the path to the bash script we run
        script_path = self.get_submit_script_path(job)
        with open(script_path, mode="w") as batch_file:
            print("#!/bin/bash", file=batch_file)
            self._add_wrapper_script_setup(job, batch_file)
            self._add_setup(job, batch_file)
            print(job.full_command, file=batch_file)
            self._add_wrapper_script_teardown(job, batch_file)
        B2INFO(f"Submitting {job}")
        job.result = Local.LocalResult(job,
                                       self.pool.apply_async(self.run_job,
                                                             (job.name,
                                                              job.working_dir,
                                                              job.output_dir,
                                                              script_path)
                                                             )
                                       )
        job.status = "submitted"
        B2INFO(f"{job} submitted")

    @submit.register(Job)
    def _(self, job):
        """
        Submission of a `Job` for the Local backend
        """
        # Make sure the output directory of the job is created
        job.output_dir.mkdir(parents=True, exist_ok=True)
        # Make sure the working directory of the job is created
        job.working_dir.mkdir(parents=True, exist_ok=True)
        # Check if we have any valid input files
        job.check_input_data_files()

        if not job.splitter:
            # Get all of the requested files for the input sandbox and copy them to the working directory
            job.copy_input_sandbox_files_to_working_dir()
            job.dump_input_data()
            # Get the path to the bash script we run
            script_path = self.get_submit_script_path(job)
            with open(script_path, mode="w") as batch_file:
                print("#!/bin/bash", file=batch_file)
                self._add_wrapper_script_setup(job, batch_file)
                self._add_setup(job, batch_file)
                print(job.full_command, file=batch_file)
                self._add_wrapper_script_teardown(job, batch_file)
            B2INFO(f"Submitting {job}")
            job.result = Local.LocalResult(job,
                                           self.pool.apply_async(self.run_job,
                                                                 (job.name,
                                                                  job.working_dir,
                                                                  job.output_dir,
                                                                  script_path)
                                                                 )
                                           )
            B2INFO(f"{job} submitted")
        else:
            # Create subjobs according to the splitter's logic
            job.splitter.create_subjobs(job)
            # Submit the subjobs
            self.submit(list(job.subjobs.values()))
            # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
            self._create_parent_job_result(job)

    @submit.register(list)
    def _(self, jobs):
        """
        Submit method of Local() that takes a list of jobs instead of just one and submits each one.
        """
        # Submit the jobs
        for job in jobs:
            self.submit(job)
        B2INFO("All requested jobs submitted.")

    @staticmethod
    def run_job(name, working_dir, output_dir, script):
        """
        The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
        shell command in a subprocess and captures the stdout and stderr of the subprocess to files.
        """
        B2INFO(f"Starting Sub-process: {name}")
        from subprocess import Popen
        stdout_file_path = Path(working_dir, _STDOUT_FILE)
        stderr_file_path = Path(working_dir, _STDERR_FILE)
        # Redirect the subprocess's stdout and stderr into files
        B2INFO(f"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
        with open(stdout_file_path, mode="w", buffering=1) as f_out, \
                open(stderr_file_path, mode="w", buffering=1) as f_err:
            with Popen(["/bin/bash", script.as_posix()],
                       stdout=f_out,
                       stderr=f_err,
                       bufsize=1,
                       universal_newlines=True,
                       cwd=working_dir,
                       env={}) as p:
                # We block here and wait so that the return code will be set.
                p.wait()
        B2INFO(f"Subprocess {name} finished.")
        return p.returncode

    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.LocalResult(parent, None)


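# End-to-end usage sketch for the Local backend (illustration only; the job name,
# command, and directories are hypothetical):
#
#   backend = Local(max_processes=2)
#   job = Job("my_job")
#   job.cmd = ["echo", "hello"]
#   job.working_dir = "/tmp/my_job/work"
#   job.output_dir = "/tmp/my_job/output"
#   backend.submit(job)
#   backend.join()        # wait for the pool to finish
#   job.update_status()   # -> "completed" (or "failed" on non-zero exit)
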
class Batch(Backend):
    """
    Abstract Base backend for submitting to a local batch system. Batch system specific commands should be implemented
    in a derived class. Do not use this class directly!
    """

    submission_cmds = []

    default_global_job_limit = 1000

    default_sleep_between_submission_checks = 30

    def __init__(self, *, backend_args=None):
        """
        Init method for Batch Backend. Does some basic default setup.
        """
        super().__init__(backend_args=backend_args)
        self.global_job_limit = self.default_global_job_limit
        self.sleep_between_submission_checks = self.default_sleep_between_submission_checks

    def _add_batch_directives(self, job, file):
        """
        Should be implemented in a derived class to write a batch submission script to the job.working_dir.
        You should think about where the stdout/err should go, and set the queue name.
        """
        raise NotImplementedError(("Need to implement a _add_batch_directives(self, job, file) "
                                   f"method in {self.__class__.__name__} backend."))

    def _make_submit_file(self, job, submit_file_path):
        """
        Useful for the HTCondor backend where a submit file is needed instead of batch
        directives pasted directly into the submission script. It should be overwritten
        if needed.
        """

    @classmethod
    @abstractmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """

    def can_submit(self, *args, **kwargs):
        """
        Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail.
        This is initially meant to make sure that we don't go over the global limits of jobs (submitted + running).

        Returns:
            bool: If the job submission can continue based on the current situation.
        """
        return True

    @method_dispatch
    def submit(self, job, check_can_submit=True, jobs_per_check=100):
        """
        """
        raise NotImplementedError(("This is an abstract submit(job) method that shouldn't have been called. "
                                   "Did you submit a (Sub)Job?"))

    @submit.register(SubJob)
    def _(self, job, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch backend for a `SubJob`. Should take `SubJob` object, create needed directories,
        create batch script, and send it off with the batch submission command.
        It should apply the correct options (default and user requested).

        Should set a Result object as an attribute of the job.
        """
        # Make sure the output directory of the job is created, commented out due to permission issues
        # job.output_dir.mkdir(parents=True, exist_ok=True)
        # Make sure the working directory of the job is created
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()
        # Make submission file if needed
        batch_submit_script_path = self.get_batch_submit_script_path(job)
        self._make_submit_file(job, batch_submit_script_path)
        # Get the bash file we will actually run, might be the same file
        script_path = self.get_submit_script_path(job)
        # Construct the batch submission script (with directives if that is supported)
        with open(script_path, mode="w") as batch_file:
            self._add_batch_directives(job, batch_file)
            self._add_wrapper_script_setup(job, batch_file)
            self._add_setup(job, batch_file)
            print(job.full_command, file=batch_file)
            self._add_wrapper_script_teardown(job, batch_file)
        os.chmod(script_path, 0o755)
        B2INFO(f"Submitting {job}")
        # Do the actual batch submission
        cmd = self._create_cmd(batch_submit_script_path)
        output = self._submit_to_batch(cmd)
        self._create_job_result(job, output)
        job.status = "submitted"
        B2INFO(f"{job} submitted")

    @submit.register(Job)
    def _(self, job, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch backend. Should take job object, create needed directories, create batch script,
        and send it off with the batch submission command, applying the correct options (default and user requested).

        Should set a Result object as an attribute of the job.
        """
        # Make sure the output directory of the job is created, commented out due to permissions issue
        # job.output_dir.mkdir(parents=True, exist_ok=True)
        # Make sure the working directory of the job is created
        job.working_dir.mkdir(parents=True, exist_ok=True)
        # Check if we have any valid input files
        job.check_input_data_files()
        # Add any required backend args that are missing (I'm a bit hesitant to actually merge with job.backend_args)
        # just in case you want to resubmit the same job with different backend settings later.
        # job_backend_args = {**self.backend_args, **job.backend_args}

        # If there's no splitter then we just submit the Job with no SubJobs
        if not job.splitter:
            # Get all of the requested files for the input sandbox and copy them to the working directory
            job.copy_input_sandbox_files_to_working_dir()
            job.dump_input_data()
            # Make submission file if needed
            batch_submit_script_path = self.get_batch_submit_script_path(job)
            self._make_submit_file(job, batch_submit_script_path)
            # Get the bash file we will actually run
            script_path = self.get_submit_script_path(job)
            # Construct the batch submission script (with directives if that is supported)
            with open(script_path, mode="w") as batch_file:
                self._add_batch_directives(job, batch_file)
                self._add_wrapper_script_setup(job, batch_file)
                self._add_setup(job, batch_file)
                print(job.full_command, file=batch_file)
                self._add_wrapper_script_teardown(job, batch_file)
            os.chmod(script_path, 0o755)
            B2INFO(f"Submitting {job}")
            # Do the actual batch submission
            cmd = self._create_cmd(batch_submit_script_path)
            output = self._submit_to_batch(cmd)
            self._create_job_result(job, output)
            job.status = "submitted"
            B2INFO(f"{job} submitted")
        else:
            # Create subjobs according to the splitter's logic
            job.splitter.create_subjobs(job)
            # Submit the subjobs
            self.submit(list(job.subjobs.values()))
            # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
            self._create_parent_job_result(job)

    @submit.register(list)
    def _(self, jobs, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.
        """
        B2INFO(f"Submitting a list of {len(jobs)} jobs to a Batch backend")
        # Technically this could be a list of Jobs or SubJobs. And if it is a list of Jobs then it might not
        # be necessary to check if we can submit right now. We could do it later during the submission of the
        # SubJob list. However in the interest of simpler code we just do the check here, and re-check again
        # if a SubJob list comes through this function. Slightly inefficient, but much simpler logic.

        # The first thing to do is make sure that we are iterating through the jobs list in chunks that are
        # equal to or smaller than the global limit. Otherwise nothing will ever submit.

        if jobs_per_check > self.global_job_limit:
            B2INFO((f"jobs_per_check (={jobs_per_check}) is higher than the global job "
                    f"limit for this backend (={self.global_job_limit}). Will instead use the "
                    "value of the global job limit."))
            jobs_per_check = self.global_job_limit

        # We group the jobs list into chunks of length jobs_per_check
        for jobs_to_submit in grouper(jobs_per_check, jobs):
            # Wait until we are allowed to submit
            while not self.can_submit(njobs=len(jobs_to_submit)):
                B2INFO("Too many jobs are currently in the batch system globally. Waiting until submission can continue...")
                time.sleep(self.sleep_between_submission_checks)
            else:
                # We loop here since we have already checked if the number of jobs is low enough, we don't want to hit this
                # function again unless one of the jobs has subjobs.
                B2INFO(f"Submitting the next {len(jobs_to_submit)} jobs...")
                for job in jobs_to_submit:
                    self.submit(job, check_can_submit, jobs_per_check)
        B2INFO(f"All {len(jobs)} requested jobs submitted")

    def get_batch_submit_script_path(self, job):
        """
        Construct the Path object of the script file that we will submit using the batch command.
        For most batch backends this is the same script as the bash script we submit.
        But some of them require a separate submission file that describes the job.
        To implement that you can override this function in the derived backend class.
        """
        return Path(job.working_dir, self.submit_script)

    @classmethod
    @abstractmethod
    def _create_job_result(cls, job, batch_output):
        """
        """

    @abstractmethod
    def _create_cmd(self, job):
        """
        """


class PBS(Batch):
    """
    Backend for submitting calibration processes to a qsub batch system.
    """

    cmd_wkdir = "#PBS -d"

    cmd_stdout = "#PBS -o"

    cmd_stderr = "#PBS -e"

    cmd_queue = "#PBS -q"

    cmd_name = "#PBS -N"

    submission_cmds = ["qsub"]

    default_global_job_limit = 5000

    default_backend_args = {"queue": "short"}

    def __init__(self, *, backend_args=None):
        super().__init__(backend_args=backend_args)

    def _add_batch_directives(self, job, batch_file):
        """
        Add PBS directives to submitted script.
        """
        job_backend_args = {**self.backend_args, **job.backend_args}
        batch_queue = job_backend_args["queue"]
        print("#!/bin/bash", file=batch_file)
        print("# --- Start PBS ---", file=batch_file)
        print(" ".join([PBS.cmd_queue, batch_queue]), file=batch_file)
        print(" ".join([PBS.cmd_name, job.name]), file=batch_file)
        print(" ".join([PBS.cmd_wkdir, job.working_dir.as_posix()]), file=batch_file)
        print(" ".join([PBS.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
        print(" ".join([PBS.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
        print("# --- End PBS ---", file=batch_file)

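    # Sketch of the resulting directive block (hypothetical job "my_job" working
    # in /tmp/my_job/work on the default "short" queue; for illustration only):
    #
    #   #!/bin/bash
    #   # --- Start PBS ---
    #   #PBS -q short
    #   #PBS -N my_job
    #   #PBS -d /tmp/my_job/work
    #   #PBS -o /tmp/my_job/work/stdout
    #   #PBS -e /tmp/my_job/work/stderr
    #   # --- End PBS ---
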
    @classmethod
    def _create_job_result(cls, job, batch_output):
        """
        """
        job_id = batch_output.replace("\n", "")
        B2INFO(f"Job ID of {job} recorded as: {job_id}")
        job.result = cls.PBSResult(job, job_id)

    def _create_cmd(self, script_path):
        """
        """
        submission_cmd = self.submission_cmds[:]
        submission_cmd.append(script_path.as_posix())
        return submission_cmd

    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.
        """
        sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True)
        return sub_out

    @classmethod
    def _create_parent_job_result(cls, parent):
        parent.result = cls.PBSResult(parent, None)

    class PBSResult(Result):
        """
        Simple class to help monitor status of jobs submitted by `PBS` Backend.

        You pass in a `Job` object (or `SubJob`) and job id from a qsub command.
        When you call the `ready` method it runs qstat to see whether or not the job has finished.
        """

        backend_code_to_status = {"R": "running",
                                  "C": "completed",
                                  "FINISHED": "completed",
                                  "E": "failed",
                                  "H": "submitted",
                                  "Q": "submitted",
                                  "T": "submitted",
                                  "W": "submitted"
                                  }

        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            self.job_id = job_id

        def update_status(self):
            """
            Update the job's (or subjobs') status by calling qstat.
            """
            B2DEBUG(29, f"Calling {self.job}.result.update_status()")
            # Get all jobs info and re-use it for each status update to minimise time spent on this updating.
            qstat_output = PBS.qstat()
            if self.job.subjobs:
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(qstat_output)
            else:
                self._update_result_status(qstat_output)

        def _update_result_status(self, qstat_output):
            """
            Parameters:
                qstat_output (dict): The JSON output of a previous call to qstat which we can re-use to find the
                    status of this job. Obviously you should only be passing a JSON dict that contains the 'Job_Id' and
                    'job_state' information, otherwise it is useless.
            """
            try:
                backend_status = self._get_status_from_output(qstat_output)
            except KeyError:
                # If this happens then maybe the job id wasn't in the qstat_output argument because it finished.
                # Instead of failing immediately we try looking for the exit code file and then fail if it still isn't there.
                B2DEBUG(29, f"Checking the exit code from file for {self.job}")
                try:
                    exit_code = self.get_exit_code_from_file()
                except FileNotFoundError:
                    waiting_time = datetime.now() - self.exit_code_file_initial_time
                    if waiting_time > self.time_to_wait_for_exit_code_file:
                        B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
                        exit_code = 1
                    else:
                        B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                        return
                if exit_code:
                    backend_status = "E"
                else:
                    backend_status = "C"

            try:
                new_job_status = self.backend_code_to_status[backend_status]
            except KeyError as err:
                raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err

            if new_job_status != self.job.status:
                self.job.status = new_job_status

        def _get_status_from_output(self, output):
            for job_info in output["JOBS"]:
                if job_info["Job_Id"] == self.job_id:
                    return job_info["job_state"]
            else:
                raise KeyError

1472  def can_submit(self, njobs=1):
1473  """
1474  Checks the global number of jobs in PBS right now (submitted or running) for this user.
1475  Returns True if the number is lower than the limit, False if it is higher.
1476 
1477  Parameters:
1478  njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1479  are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1480  assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1481  So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
1482  and check again before submitting more.
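
  A minimal sketch of the intended usage (the batch size and sleep interval here are illustrative):

  .. code-block:: python

      backend = PBS()
      while not backend.can_submit(njobs=100):
          time.sleep(30)  # Wait for some jobs to leave the queue before submitting more
      # Now (somewhat) safe to submit the next 100 jobs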
1483  """
1484  B2DEBUG(29, "Calling PBS().can_submit()")
1485  job_info = self.qstat(username=os.environ["USER"])
1486  total_jobs = job_info["NJOBS"]
1487  B2INFO(f"Total jobs active in the PBS system is currently {total_jobs}")
1488  if (total_jobs + njobs) > self.global_job_limit:
1489  B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1490  return False
1491  else:
1492  B2INFO("There is enough space to submit more jobs.")
1493  return True
1494 
1495  @classmethod
1496  def qstat(cls, username="", job_ids=None):
1497  """
1498  Simplistic interface to the ``qstat`` command. Lets you request information about all jobs, or only those
1499  matching a job id filter or a username. The result is a JSON dictionary containing some of the useful job
1500  attributes returned by qstat.
1501 
1502  PBS is kind of annoying as, depending on the configuration, it can forget about jobs immediately. So the status of a
1503  finished job is VERY hard to get. There are other commands, sometimes installed alongside qstat, that may do a better
1504  job. This one should work for Melbourne's cloud computing centre.
1505 
1506  Keyword Args:
1507  username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
1508  will be in the output dictionary.
1509  job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
1510  the output of this function will only be information about these jobs. If this argument is not given, then all jobs
1511  matching the other filters will be returned.
1512 
1513  Returns:
1514  dict: JSON dictionary of the form (to save you parsing the XML that qstat returns):
1515 
1516  .. code-block:: python
1517 
1518  {
1519  "NJOBS": int
1520  "JOBS":[
1521  {
1522  <key: value>, ...
1523  }, ...
1524  ]
1525  }
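
  A minimal usage sketch (the job values shown are illustrative):

  .. code-block:: python

      jobs_info = PBS.qstat(username=os.environ["USER"])
      print(f"{jobs_info['NJOBS']} jobs found")
      for job_record in jobs_info["JOBS"]:
          print(job_record["Job_Id"], job_record["job_state"])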
1526  """
1527  B2DEBUG(29, f"Calling PBS.qstat(username='{username}', job_id={job_ids})")
1528  if not job_ids:
1529  job_ids = []
1530  job_ids = set(job_ids)
1531  cmd_list = ["qstat", "-x"]
1532  # We get an XML-formatted summary from qstat. Requires the shell argument.
1533  cmd = " ".join(cmd_list)
1534  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1535  output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1536  jobs_dict = {"NJOBS": 0, "JOBS": []}
1537  jobs_xml = ET.fromstring(output)
1538 
1539  # For a specific job_id we can be a bit more efficient in XML parsing
1540  if len(job_ids) == 1:
1541  job_elem = jobs_xml.find(f"./Job[Job_Id='{list(job_ids)[0]}']")
1542  if job_elem is not None:
1543  jobs_dict["JOBS"].append(cls.create_job_record_from_element(job_elem))
1544  jobs_dict["NJOBS"] = 1
1545  return jobs_dict
1546 
1547  # Since the username given is not exactly the same as the one that PBS stores (<username>@host)
1548  # we have to simply loop through rather than using XPATH.
1549  for job in jobs_xml.iterfind("Job"):
1550  job_owner = job.find("Job_Owner").text.split("@")[0]
1551  if username and username != job_owner:
1552  continue
1553  job_id = job.find("Job_Id").text
1554  if job_ids and job_id not in job_ids:
1555  continue
1556  jobs_dict["JOBS"].append(cls.create_job_record_from_elementcreate_job_record_from_element(job))
1557  jobs_dict["NJOBS"] += 1
1558  # Remove it so that we don't keep checking for it
1559  if job_id in job_ids:
1560  job_ids.remove(job_id)
1561  return jobs_dict
1562 
1563  @staticmethod
1564  def create_job_record_from_element(job_elem):
1565  """
1566  Creates a Job dictionary with various job information from the XML element returned by qstat.
1567 
1568  Parameters:
1569  job_elem (xml.etree.ElementTree.Element): The XML Element of the Job
1570 
1571  Returns:
1572  dict: JSON serialisable dictionary of the Job information we are interested in.
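
  A minimal sketch of how this is used (the XML snippet is illustrative, not real qstat output):

  .. code-block:: python

      job_elem = ET.fromstring("<Job><Job_Id>1234.host</Job_Id><Job_Name>my_job</Job_Name>"
                               "<Job_Owner>user@host</Job_Owner><job_state>R</job_state>"
                               "<queue>short</queue></Job>")
      record = PBS.create_job_record_from_element(job_elem)
      assert record["job_state"] == "R"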
1573  """
1574  job_dict = {}
1575  job_dict["Job_Id"] = job_elem.find("Job_Id").text
1576  job_dict["Job_Name"] = job_elem.find("Job_Name").text
1577  job_dict["Job_Owner"] = job_elem.find("Job_Owner").text
1578  job_dict["job_state"] = job_elem.find("job_state").text
1579  job_dict["queue"] = job_elem.find("queue").text
1580  return job_dict
1581 
1582 
1583 class LSF(Batch):
1584  """
1585  Backend for submitting calibration processes to an LSF batch system.
1586  """
1587 
1588  cmd_wkdir = "#BSUB -cwd"
1589 
1590  cmd_stdout = "#BSUB -o"
1591 
1592  cmd_stderr = "#BSUB -e"
1593 
1594  cmd_queue = "#BSUB -q"
1595 
1596  cmd_name = "#BSUB -J"
1597 
1598  submission_cmds = ["bsub", "-env", "\"none\"", "<"]
1599 
1600  default_global_job_limit = 15000
1601 
1602  default_backend_args = {"queue": "s"}
1603 
1604  def __init__(self, *, backend_args=None):
1605  super().__init__(backend_args=backend_args)
1606 
1607  def _add_batch_directives(self, job, batch_file):
1608  """
1609  Adds LSF BSUB directives for the job to a script.
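
  Job-level backend args take priority over the backend defaults, e.g. (the queue names are illustrative):

  .. code-block:: python

      backend = LSF(backend_args={"queue": "l"})
      job = Job("my_job")
      job.backend_args["queue"] = "s"  # This job is directed to queue 's', other jobs keep the default 'l'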
1610  """
1611  job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1612  batch_queue = job_backend_args["queue"]
1613  print("#!/bin/bash", file=batch_file)
1614  print("# --- Start LSF ---", file=batch_file)
1615  print(" ".join([LSF.cmd_queue, batch_queue]), file=batch_file)
1616  print(" ".join([LSF.cmd_name, job.name]), file=batch_file)
1617  print(" ".join([LSF.cmd_wkdir, str(job.working_dir)]), file=batch_file)
1618  print(" ".join([LSF.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1619  print(" ".join([LSF.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1620  print("# --- End LSF ---", file=batch_file)
1621 
1622  def _create_cmd(self, script_path):
1623  """Create the ``bsub`` command; the batch script is fed to it via stdin redirection ('<')."""
1625  submission_cmd = self.submission_cmds[:]
1626  submission_cmd.append(script_path.as_posix())
1627  submission_cmd = " ".join(submission_cmd)
1628  return [submission_cmd]
1629 
1630  @classmethod
1631  def _submit_to_batch(cls, cmd):
1632  """
1633  Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1634  """
1635  sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1636  return sub_out
1637 
1638  class LSFResult(Result):
1639  """
1640  Simple class to help monitor status of jobs submitted by LSF Backend.
1641 
1642  You pass in a `Job` object and job id from a bsub command.
1643  When you call the `ready` method it runs bjobs to see whether or not the job has finished.
1644  """
1645 
1646 
1647  backend_code_to_status = {"RUN": "running",
1648  "DONE": "completed",
1649  "FINISHED": "completed",
1650  "EXIT": "failed",
1651  "PEND": "submitted"
1652  }
1653 
1654  def __init__(self, job, job_id):
1655  """
1656  Pass in the job object and the job id to allow the result to do monitoring and perform
1657  post processing of the job.
1658  """
1659  super().__init__(job)
1660 
1661  self.job_id = job_id
1662 
1663  def update_status(self):
1664  """
1665  Update the job's (or subjobs') status by calling bjobs.
1666  """
1667  B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
1668  # Get all jobs info and re-use it for each status update to minimise time spent on this updating.
1669  bjobs_output = LSF.bjobs(output_fields=["stat", "id"])
1670  if self.job.subjobs:
1671  for subjob in self.job.subjobs.values():
1672  subjob.result._update_result_status(bjobs_output)
1673  else:
1674  self._update_result_status(bjobs_output)
1675 
1676  def _update_result_status(self, bjobs_output):
1677  """
1678  Parameters:
1679  bjobs_output (dict): The JSON output of a previous call to bjobs which we can re-use to find the
1680  status of this job. The dict must contain the 'stat' and 'id' information, otherwise
1681  it is useless.
1682 
1683  """
1684  try:
1685  backend_status = self._get_status_from_output(bjobs_output)
1686  except KeyError:
1687  # If this happens then maybe the job id wasn't in the bjobs_output argument because it finished.
1688  # Instead of failing immediately we try re-running bjobs here explicitly and then fail if it still isn't there.
1689  bjobs_output = LSF.bjobs(output_fields=["stat", "id"], job_id=str(self.job_id))
1690  try:
1691  backend_status = self._get_status_from_output(bjobs_output)
1692  except KeyError:
1693  # If this happened, maybe we're looking at an old finished job. We could fall back to bhist, but it's
1694  # slow and terrible. Instead let's try looking for the exit code file.
1695  try:
1696  exit_code = self.get_exit_code_from_file()
1697  except FileNotFoundError:
1698  waiting_time = datetime.now() - self.exit_code_file_initial_time
1699  if waiting_time > self.time_to_wait_for_exit_code_file:
1700  B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
1701  exit_code = 1
1702  else:
1703  B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
1704  return
1705  if exit_code:
1706  backend_status = "EXIT"
1707  else:
1708  backend_status = "FINISHED"
1709  try:
1710  new_job_status = self.backend_code_to_status[backend_status]
1711  except KeyError as err:
1712  raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
1713 
1714  if new_job_status != self.job.status:
1715  self.job.status = new_job_status
1716 
1717  def _get_status_from_output(self, output):
1718  if output["JOBS"] and "ERROR" in output["JOBS"][0]:
1719  if output["JOBS"][0]["ERROR"] == f"Job <{self.job_id}> is not found":
1720  raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1721  else:
1722  raise BackendError(f"Unidentified Error during status check for {self.job}: {output}")
1723  else:
1724  for job_info in output["JOBS"]:
1725  if job_info["JOBID"] == self.job_id:
1726  return job_info["STAT"]
1727  else:
1728  raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1729 
1730  @classmethod
1731  def _create_parent_job_result(cls, parent):
1732  parent.result = cls.LSFResult(parent, None)
1733 
1734  @classmethod
1735  def _create_job_result(cls, job, batch_output):
1736  """
1737  """
1738  m = re.search(r"Job <(\d+)>", str(batch_output))
1739  if m:
1740  job_id = m.group(1)
1741  else:
1742  raise BackendError(f"Failed to get the batch job ID of {job}. LSF output was:\n{batch_output}")
1743 
1744  B2INFO(f"Job ID of {job} recorded as: {job_id}")
1745  job.result = cls.LSFResult(job, job_id)
1746 
1747  def can_submit(self, njobs=1):
1748  """
1749  Checks the global number of jobs in LSF right now (submitted or running) for this user.
1750  Returns True if the number is lower than the limit, False if it is higher.
1751 
1752  Parameters:
1753  njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1754  are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1755  assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1756  So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
1757  and check again before submitting more.
1758  """
1759  B2DEBUG(29, "Calling LSF().can_submit()")
1760  job_info = self.bjobs(output_fields=["stat"])
1761  total_jobs = job_info["NJOBS"]
1762  B2INFO(f"Total jobs active in the LSF system is currently {total_jobs}")
1763  if (total_jobs + njobs) > self.global_job_limit:
1764  B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1765  return False
1766  else:
1767  B2INFO("There is enough space to submit more jobs.")
1768  return True
1769 
1770  @classmethod
1771  def bjobs(cls, output_fields=None, job_id="", username="", queue=""):
1772  """
1773  Simplistic interface to the ``bjobs`` command. Lets you request information about all jobs matching the filters
1774  'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by the ``-json`` bjobs option.
1775 
1776  Parameters:
1777  output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
1778  job_id (str): String representation of the Job ID given by bsub during submission. If this argument is given then
1779  the output of this function will be only information about this job. If this argument is not given, then all jobs
1780  matching the other filters will be returned.
1781  username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
1782  a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
1783  receive job information from all known user jobs matching the other filters.
1784  queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.
1785 
1786  Returns:
1787  dict: JSON dictionary of the form:
1788 
1789  .. code-block:: python
1790 
1791  {
1792  "NJOBS":<njobs returned by command>,
1793  "JOBS":[
1794  {
1795  <output field: value>, ...
1796  }, ...
1797  ]
1798  }
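
  A minimal usage sketch (the job id and status value are illustrative):

  .. code-block:: python

      bjobs_info = LSF.bjobs(output_fields=["stat", "id"], job_id="123456")
      if bjobs_info["NJOBS"]:
          print(bjobs_info["JOBS"][0]["STAT"])  # e.g. 'RUN'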
1799  """
1800  B2DEBUG(29, f"Calling LSF.bjobs(output_fields={output_fields}, job_id={job_id}, username={username}, queue={queue})")
1801  # We must always return at least one output field when using JSON and -o options. So we choose the job id
1802  if not output_fields:
1803  output_fields = ["id"]
1804  # Output fields should be space separated but in a string.
1805  field_list_cmd = "\""
1806  field_list_cmd += " ".join(output_fields)
1807  field_list_cmd += "\""
1808  cmd_list = ["bjobs", "-o", field_list_cmd]
1809  # If the queue name is set then we add to the command options
1810  if queue:
1811  cmd_list.extend(["-q", queue])
1812  # If the username is set then we add to the command options
1813  if username:
1814  cmd_list.extend(["-u", username])
1815  # Can now add the json option before the final positional argument (if used)
1816  cmd_list.append("-json")
1817  # If the job id is set then we add to the end of the command
1818  if job_id:
1819  cmd_list.append(job_id)
1820  # We get a JSON serialisable summary from bjobs. Requires the shell argument.
1821  cmd = " ".join(cmd_list)
1822  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1823  output = decode_json_string(subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True))
1824  output["NJOBS"] = output["JOBS"]
1825  output["JOBS"] = output["RECORDS"]
1826  del output["RECORDS"]
1827  del output["COMMAND"]
1828  return output
1829 
1830  @classmethod
1831  def bqueues(cls, output_fields=None, queues=None):
1832  """
1833  Simplistic interface to the ``bqueues`` command. Lets you request information about all queues matching the filters.
1834  The result is the JSON dictionary returned by the ``-json`` bqueues option.
1835 
1836  Parameters:
1837  output_fields (list[str]): A list of bqueues -o fields that you would like information about
1838  e.g. the default is ['queue_name' 'status' 'max' 'njobs' 'pend' 'run']
1839  queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
1840  By default you will receive information about all queues.
1841 
1842  Returns:
1843  dict: JSON dictionary of the form:
1844 
1845  .. code-block:: python
1846 
1847  {
1848  "COMMAND":"bqueues",
1849  "QUEUES":46,
1850  "RECORDS":[
1851  {
1852  "QUEUE_NAME":"b2_beast",
1853  "STATUS":"Open:Active",
1854  "MAX":"200",
1855  "NJOBS":"0",
1856  "PEND":"0",
1857  "RUN":"0"
1858  }, ...
1859  ]
  }
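
  A minimal usage sketch (the queue name is illustrative):

  .. code-block:: python

      queues_info = LSF.bqueues(output_fields=["queue_name", "njobs"], queues=["s"])
      for record in queues_info["RECORDS"]:
          print(record["QUEUE_NAME"], record["NJOBS"])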
1860  """
1861  B2DEBUG(29, f"Calling LSF.bqueues(output_fields={output_fields}, queues={queues})")
1862  # We must always return at least one output field when using JSON and -o options. So we choose the job id
1863  if not output_fields:
1864  output_fields = ["queue_name", "status", "max", "njobs", "pend", "run"]
1865  # Output fields should be space separated but in a string.
1866  field_list_cmd = "\""
1867  field_list_cmd += " ".join(output_fields)
1868  field_list_cmd += "\""
1869  cmd_list = ["bqueues", "-o", field_list_cmd]
1870  # Can now add the json option before the final positional argument (if used)
1871  cmd_list.append("-json")
1872  # If the queue name is set then we add to the end of the command
1873  if queues:
1874  cmd_list.extend(queues)
1875  # We get a JSON serialisable summary from bqueues. Requires the shell argument.
1876  cmd = " ".join(cmd_list)
1877  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1878  output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1879  return decode_json_string(output)
1880 
1881 
1882 class HTCondor(Batch):
1883  """
1884  Backend for submitting calibration processes to an HTCondor batch system.
1885  """
1886 
1887  batch_submit_script = "submit.sub"
1888 
1889  submission_cmds = ["condor_submit", "-terse"]
1890 
1891  default_global_job_limit = 10000
1892 
1893  default_backend_args = {
1894  "universe": "vanilla",
1895  "getenv": "false",
1896  "request_memory": "4 GB", # We set the default requested memory to 4 GB to maintain parity with KEKCC
1897  "path_prefix": "", # Path prefix for file path
1898  "extra_lines": [] # These should be other HTCondor submit script lines like 'request_cpus = 2'
1899  }
1900 
1901  default_class_ads = ["GlobalJobId", "JobStatus", "Owner"]
1902 
1903  def _make_submit_file(self, job, submit_file_path):
1904  """
1905  Fill HTCondor submission file.
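
  The `default_backend_args` can be overridden per job before submission, e.g. (values illustrative):

  .. code-block:: python

      job = Job("my_job")
      job.backend_args = {"request_memory": "8 GB", "extra_lines": ["request_cpus = 2"]}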
1906  """
1907  # Find all files/directories in the working directory to copy on the worker node
1908 
1909  files_to_transfer = [i.as_posix() for i in job.working_dir.iterdir()]
1910 
1911  job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1912 
1913  with open(submit_file_path, "w") as submit_file:
1914  print(f'executable = {self.get_submit_script_path(job)}', file=submit_file)
1915  print(f'log = {Path(job.output_dir, "htcondor.log").as_posix()}', file=submit_file)
1916  print(f'output = {Path(job.working_dir, _STDOUT_FILE).as_posix()}', file=submit_file)
1917  print(f'error = {Path(job.working_dir, _STDERR_FILE).as_posix()}', file=submit_file)
1918  print('transfer_input_files = ', ','.join(files_to_transfer), file=submit_file)
1919  print(f'universe = {job_backend_args["universe"]}', file=submit_file)
1920  print(f'getenv = {job_backend_args["getenv"]}', file=submit_file)
1921  print(f'request_memory = {job_backend_args["request_memory"]}', file=submit_file)
1922  print('should_transfer_files = Yes', file=submit_file)
1923  print('when_to_transfer_output = ON_EXIT', file=submit_file)
1924  # Any other lines in the backend args that we don't deal with explicitly but maybe someone wants to insert something
1925  for line in job_backend_args["extra_lines"]:
1926  print(line, file=submit_file)
1927  print('queue', file=submit_file)
1928 
1929  def _add_batch_directives(self, job, batch_file):
1930  """
1931  For HTCondor leave empty as the directives are already included in the submit file.
1932  """
1933  print('#!/bin/bash', file=batch_file)
1934 
1935  def _create_cmd(self, script_path):
1936  """Create the ``condor_submit`` command for the given submit file path."""
1938  submission_cmd = self.submission_cmds[:]
1939  submission_cmd.append(script_path.as_posix())
1940  return submission_cmd
1941 
1942  def get_batch_submit_script_path(self, job):
1943  """
1944  Construct the Path object of the .sub file that we will use to describe the job.
1945  """
1946  return Path(job.working_dir, self.batch_submit_script)
1947 
1948  @classmethod
1949  def _submit_to_batch(cls, cmd):
1950  """
1951  Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1952  """
1953  job_dir = Path(cmd[-1]).parent.as_posix()
1954  sub_out = ""
1955  attempt = 0
1956  sleep_time = 30
1957 
1958  while attempt < 3:
1959  try:
1960  sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, cwd=job_dir)
1961  break
1962  except subprocess.CalledProcessError as e:
1963  attempt += 1
1964  if attempt == 3:
1965  B2ERROR(f"Error during condor_submit: {str(e)} occurred more than 3 times.")
1966  raise e
1967  else:
1968  B2ERROR(f"Error during condor_submit: {str(e)}, sleeping for {sleep_time} seconds.")
1969  time.sleep(sleep_time)
1970  return sub_out.split()[0]
1971 
1972  class HTCondorResult(Result):
1973  """
1974  Simple class to help monitor status of jobs submitted by HTCondor Backend.
1975 
1976  You pass in a `Job` object and job id from a condor_submit command.
1977  When you call the `ready` method it runs condor_q and, if needed, ``condor_history``
1978  to see whether or not the job has finished.
1979  """
1980 
1981 
  # HTCondor JobStatus codes (1=Idle, 2=Running, 3=Removed, 4=Completed, 5=Held) mapped to Job statuses.
1982  backend_code_to_status = {0: "submitted",
1983  1: "submitted",
1984  2: "running",
1985  3: "failed",
1986  4: "completed",
1987  5: "submitted",
1988  6: "failed"
1989  }
1990 
1991  def __init__(self, job, job_id):
1992  """
1993  Pass in the job object and the job id to allow the result to do monitoring and perform
1994  post processing of the job.
1995  """
1996  super().__init__(job)
1997 
1998  self.job_id = job_id
1999 
2000  def update_status(self):
2001  """
2002  Update the job's (or subjobs') status by calling condor_q.
2003  """
2004  B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
2005  # Get all jobs info and re-use it for each status update to minimise time spent on this updating.
2006  condor_q_output = HTCondor.condor_q()
2007  if self.job.subjobs:
2008  for subjob in self.job.subjobs.values():
2009  subjob.result._update_result_status(condor_q_output)
2010  else:
2011  self._update_result_status(condor_q_output)
2012 
2013  def _update_result_status(self, condor_q_output):
2014  """
2015  In order to be slightly more efficient we pass in the output of a previous call to condor_q and check whether
2016  this job appears in it. If it does we update the job's status from that. If not we fall back to calling
2017  condor_q for this specific job and, if needed, ``condor_history``, etc.
2018 
2019  Parameters:
2020  condor_q_output (dict): The JSON output of a previous call to `HTCondor.condor_q` which we can re-use to find the
2021  status of this job if it was active when that command ran.
2022  """
2023  B2DEBUG(29, f"Calling {self.job}.result._update_result_status()")
2024  jobs_info = []
2025  for job_record in condor_q_output["JOBS"]:
2026  job_id = job_record["GlobalJobId"].split("#")[1]
2027  if job_id == self.job_id:
2028  B2DEBUG(29, f"Found {self.job_id} in condor_q_output.")
2029  jobs_info.append(job_record)
2030 
2031  # If this job wasn't in the passed in condor_q output, let's try our own query with the specific job_id
2032  if not jobs_info:
2033  jobs_info = HTCondor.condor_q(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
2034 
2035  # If no job information is returned then the job already left the queue,
2036  # so check in the history to see if it succeeded or failed
2037  if not jobs_info:
2038  jobs_info = HTCondor.condor_history(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
2039 
2040  # Still no record of it, so let's look for the exit code file where we expect it
2041  if not jobs_info:
2042  try:
2043  exit_code = self.get_exit_code_from_file()
2044  except FileNotFoundError:
2045  waiting_time = datetime.now() - self.exit_code_file_initial_time
2046  if waiting_time > self.time_to_wait_for_exit_code_file:
2047  B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
2048  exit_code = 1
2049  else:
2050  B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
2051  return
2052  if exit_code:
2053  jobs_info = [{"JobStatus": 6, "HoldReason": None}] # Set to failed
2054  else:
2055  jobs_info = [{"JobStatus": 4, "HoldReason": None}] # Set to completed
2064  job_info = jobs_info[0]
2065  backend_status = job_info["JobStatus"]
2066  # If the job is held (backend_status == 5) then report why, then keep waiting
2067  if backend_status == 5:
2068  hold_reason = job_info.get("HoldReason", None)
2069  B2WARNING(f"{self.job} on hold because of {hold_reason}. Keep waiting.")
2070  backend_status = 2
2071  try:
2072  new_job_status = self.backend_code_to_status[backend_status]
2073  except KeyError as err:
2074  raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
2075  if new_job_status != self.job.status:
2076  self.job.status = new_job_status
2077 
2078  @classmethod
2079  def _create_job_result(cls, job, job_id):
2080  """
2081  """
2082  B2INFO(f"Job ID of {job} recorded as: {job_id}")
2083  job.result = cls.HTCondorResultHTCondorResult(job, job_id)
2084 
2085  @classmethod
2086  def _create_parent_job_result(cls, parent):
2087  parent.result = cls.HTCondorResult(parent, None)
2088 
2089  def can_submit(self, njobs=1):
2090  """
2091  Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
2092  Returns True if the number is lower than the limit, False if it is higher.
2093 
2094  Parameters:
2095  njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
2096  are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
2097  assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
2098  So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
2099  and check again before submitting more.
2100  """
2101  B2DEBUG(29, "Calling HTCondor().can_submit()")
2102  jobs_info = self.condor_q()
2103  total_jobs = jobs_info["NJOBS"]
2104  B2INFO(f"Total jobs active in the HTCondor system is currently {total_jobs}")
2105  if (total_jobs + njobs) > self.global_job_limit:
2106  B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
2107  return False
2108  else:
2109  B2INFO("There is enough space to submit more jobs.")
2110  return True
2111 
2112  @classmethod
2113  def condor_q(cls, class_ads=None, job_id="", username=""):
2114  """
2115  Simplistic interface to the `condor_q` command. Lets you request information about all jobs matching the filters
2116  'job_id' and 'username'. Note that setting job_id negates username so it is ignored.
2117  The result is the JSON dictionary returned by the ``-json`` condor_q option.
2118 
2119  Parameters:
2120  class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
2121  By default we use {cls.default_class_ads}; increasing the number of class_ads increases the time taken
2122  by the condor_q call.
2123  job_id (str): String representation of the Job ID given by condor_submit during submission.
2124  If this argument is given then the output of this function will be only information about this job.
2125  If this argument is not given, then all jobs matching the other filters will be returned.
2126  username (str): By default we return information about only the current user's jobs. By giving
2127  a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2128  receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
2129  so it isn't recommended.
2130 
2131  Returns:
2132  dict: JSON dictionary of the form:
2133 
2134  .. code-block:: python
2135 
2136  {
2137  "NJOBS":<number of records returned by command>,
2138  "JOBS":[
2139  {
2140  <ClassAd: value>, ...
2141  }, ...
2142  ]
2143  }
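
  A minimal usage sketch (the JobStatus check is illustrative):

  .. code-block:: python

      jobs_info = HTCondor.condor_q(class_ads=["JobStatus", "Owner"])
      running = [rec for rec in jobs_info["JOBS"] if rec.get("JobStatus") == 2]
      print(f"{len(running)} of {jobs_info['NJOBS']} jobs are running")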
2144  """
2145  B2DEBUG(29, f"Calling HTCondor.condor_q(class_ads={class_ads}, job_id={job_id}, username={username})")
2146  if not class_ads:
2147  class_ads = cls.default_class_ads
2148  # Output fields should be comma separated.
2149  field_list_cmd = ",".join(class_ads)
2150  cmd_list = ["condor_q", "-json", "-attributes", field_list_cmd]
2151  # If job_id is set then we ignore all other filters
2152  if job_id:
2153  cmd_list.append(job_id)
2154  else:
2155  if not username:
2156  username = os.environ["USER"]
2157  # If the username is set to all it is a special case
2158  if username == "all":
2159  cmd_list.append("-allusers")
2160  else:
2161  cmd_list.append(username)
2162  # We get a JSON serialisable summary from condor_q. But we will alter it slightly to be more similar to other backends
2163  cmd = " ".join(cmd_list)
2164  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2165  # condor_q occasionally fails
2166  try:
2167  records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2168  except subprocess.CalledProcessError:
2169  records = None
2170 
2171  if records:
2172  records = decode_json_string(records)
2173  else:
2174  records = []
2175  jobs_info = {"JOBS": records}
2176  jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2177  return jobs_info
2178 
2179  @classmethod
2180  def condor_history(cls, class_ads=None, job_id="", username=""):
2181  """
2182  Simplistic interface to the ``condor_history`` command. Lets you request information about all jobs matching the filters
2183  ``job_id`` and ``username``. Note that setting job_id negates username so it is ignored.
2184  The result is a JSON dictionary filled from the output of the ``-json`` ``condor_history`` option.
2185 
2186  Parameters:
2187  class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
2188  By default we use {cls.default_class_ads}; increasing the number of class_ads increases the time taken
2189  by the condor_history call.
2190  job_id (str): String representation of the Job ID given by condor_submit during submission.
2191  If this argument is given then the output of this function will be only information about this job.
2192  If this argument is not given, then all jobs matching the other filters will be returned.
2193  username (str): By default we return information about only the current user's jobs. By giving
2194  a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2195  receive job information from all known user jobs matching the other filters. This is limited to 10000 records
2196  and isn't recommended.
2197 
2198  Returns:
2199  dict: JSON dictionary of the form:
2200 
2201  .. code-block:: python
2202 
2203  {
2204  "NJOBS":<number of records returned by command>,
2205  "JOBS":[
2206  {
2207  <ClassAd: value>, ...
2208  }, ...
2209  ]
2210  }
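
  A minimal usage sketch (the job id is illustrative):

  .. code-block:: python

      history_info = HTCondor.condor_history(class_ads=["JobStatus"], job_id="12345.0")
      if history_info["NJOBS"]:
          print("JobStatus of finished job:", history_info["JOBS"][0]["JobStatus"])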
2211  """
2212  B2DEBUG(29, f"Calling HTCondor.condor_history(class_ads={class_ads}, job_id={job_id}, username={username})")
2213  if not class_ads:
2214  class_ads = cls.default_class_ads
2215  # Output fields should be comma separated.
2216  field_list_cmd = ",".join(class_ads)
2217  cmd_list = ["condor_history", "-json", "-attributes", field_list_cmd]
2218  # If job_id is set then we ignore all other filters
2219  if job_id:
2220  cmd_list.append(job_id)
2221  else:
2222  if not username:
2223  username = os.environ["USER"]
2224  # If the username is set to all it is a special case
2225  if username != "all":
2226  cmd_list.append(username)
2227  # We get a JSON serialisable summary from condor_history. But we will alter it slightly to be more similar to other backends
2228  cmd = " ".join(cmd_list)
2229  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2230  try:
2231  records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2232  except subprocess.CalledProcessError:
2233  records = None
2234 
2235  if records:
2236  records = decode_json_string(records)
2237  else:
2238  records = []
2239 
2240  jobs_info = {"JOBS": records}
2241  jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2242  return jobs_info
2243 
2244 
2246  """
2247  Backend for submitting calibration processes to the grid.
2248  """
2249 
2250 
2251 class BackendError(Exception):
2252  """
2253  Base exception class for Backend classes.
2254  """
2255 
2256 
2257 class JobError(Exception):
2258  """
2259  Base exception class for Job objects.
2260  """
2261 
2262 
2263 class SplitterError(Exception):
2264  """
2265  Base exception class for SubjobSplitter objects.
2266  """