Belle II Software  release-05-02-19
backends.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
5 import re
6 import os
7 from abc import ABC, abstractmethod
8 import json
9 import xml.etree.ElementTree as ET
10 from math import ceil
11 from pathlib import Path
12 from collections import deque
13 from itertools import count, takewhile
14 import shutil
15 import time
16 from datetime import datetime, timedelta
17 import subprocess
18 import multiprocessing as mp
19 
20 from caf.utils import method_dispatch
21 from caf.utils import decode_json_string
22 from caf.utils import grouper
23 from caf.utils import parse_file_uri
24 
25 
26 __all__ = ["Job", "SubJob", "Backend", "Local", "Batch", "LSF", "PBS", "HTCondor", "get_input_data"]
27 
28 
29 _input_data_file_path = Path("__BACKEND_INPUT_FILES__.json")
30 
31 _STDOUT_FILE = "stdout"
32 
33 _STDERR_FILE = "stderr"
34 
35 
36 def get_input_data():
37  """
38  Simple JSON load of the default input data file. Will contain a list of string file paths
39  for use by the job process.
40  """
41  with open(_input_data_file_path, mode="r") as input_data_file:
42  input_data = json.load(input_data_file)
43  return input_data
44 
45 
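# --- Editorial usage sketch (not part of the original backends.py) ---
# Inside the job's own process, launched by a backend with the job's working
# directory as the current directory, the dumped input file list can be read
# back like this. The import path assumes this module is available as
# ``caf.backends``.
#
#     from caf.backends import get_input_data
#     input_files = get_input_data()   # list of file path/URI strings written by Job.dump_input_data()
# --- end sketch ---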
46 def monitor_jobs(args, jobs):
47  unfinished_jobs = jobs[:]
48  failed_jobs = 0
49  while unfinished_jobs:
50  B2INFO(f"Updating statuses of unfinished jobs...")
51  for j in unfinished_jobs:
52  j.update_status()
53  B2INFO(f"Checking if jobs are ready...")
54  for j in unfinished_jobs[:]:
55  if j.ready():
56  if j.status == "failed":
57  B2ERROR(f"{j} is failed")
58  failed_jobs += 1
59  else:
60  B2INFO(f"{j} is finished")
61  unfinished_jobs.remove(j)
62  if unfinished_jobs:
63  B2INFO(f"Not all jobs done yet, waiting {args.heartbeat} seconds before re-checking...")
64  time.sleep(args.heartbeat)
65  if failed_jobs > 0:
66  B2ERROR(f"{failed_jobs} jobs failed")
67  else:
68  B2INFO('All jobs finished successfully')
69 
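# --- Editorial usage sketch (not part of the original backends.py) ---
# ``monitor_jobs`` only needs an object carrying a ``heartbeat`` attribute (the number
# of seconds to sleep between status checks) and a list of already-submitted Job objects.
# A minimal, hypothetical driver:
def _example_monitor_submitted_jobs(submitted_jobs):
    from types import SimpleNamespace
    args = SimpleNamespace(heartbeat=30)    # poll every 30 seconds
    monitor_jobs(args, submitted_jobs)      # submitted_jobs: list of Job objects
# --- end sketch ---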
 70 
 71 class ArgumentsGenerator():
72  def __init__(self, generator_function, *args, **kwargs):
73  """
74  Simple little class to hold a generator (uninitialised) and the necessary args/kwargs to
75  initialise it. This lets us re-use a generator by setting it up again fresh. This is not
76  optimal for expensive calculations, but it is nice for making large sequences of
77  Job input arguments on the fly.
78 
79  Parameters:
80  generator_function (py:function): A function (callable) that contains a ``yield`` statement. This generator
81  should *not* be initialised i.e. you haven't called it with ``generator_function(*args, **kwargs)``
82  yet. That will happen when accessing the `ArgumentsGenerator.generator` property.
 83  args (tuple): The positional arguments you want to send into the initialisation of the generator.
 84  kwargs (dict): The keyword arguments you want to send into the initialisation of the generator.
85  """
86 
87  self.generator_function = generator_function
88 
89  self.args = args
90 
91  self.kwargs = kwargs
92 
93  @property
94  def generator(self):
95  """
96  Returns:
97  generator: The initialised generator (using the args and kwargs for initialisation). It should be ready
98  to have ``next``/``send`` called on it.
99  """
100  gen = self.generator_function(*self.args, **self.kwargs)
101  gen.send(None) # Prime it
102  return gen
103 
104 
105 def range_arguments(start=0, stop=None, step=1):
106  """
107  A simple example Arguments Generator function for use as a `ArgumentsGenerator.generator_function`.
 108  It will return increasing values using itertools.count. By default it is infinite and will never raise `StopIteration`.
109  The `SubJob` object is sent into this function with `send` but is not used.
110 
111  Parameters:
112  start (int): The starting value that will be returned.
 113  stop (int): At this value `StopIteration` will be raised. If this is `None` then this generator will continue
114  forever.
115  step (int): The step size.
116 
117  Returns:
118  tuple
119  """
120  def should_stop(x):
121  if stop is not None and x >= stop:
122  return True
123  else:
124  return False
125  # The logic of send/yield is tricky. Basically the first yield must have been hit before send(subjob) can work.
126  # However the first yield is also the one that receives the send(subjob) NOT the send(None).
127  subjob = (yield None) # Handle the send(None) and first send(subjob)
128  args = None
129  for i in takewhile(lambda x: not should_stop(x), count(start, step)):
130  args = (i,) # We could have used the subjob object here
131  B2DEBUG(29, f"{subjob} arguments will be {args}")
132  subjob = (yield args) # Set up ready for the next loop (if it happens)
133 
134 
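# --- Editorial usage sketch (not part of the original backends.py) ---
# Shows the generator protocol that the splitters below rely on: the
# ``ArgumentsGenerator.generator`` property returns an already-primed generator, so every
# subsequent ``send()`` yields the next argument tuple. A real splitter sends the SubJob
# object; sending None also works here because range_arguments ignores the sent value.
def _example_range_arguments_protocol():
    arg_gen = ArgumentsGenerator(range_arguments, start=0, stop=6, step=2)
    gen = arg_gen.generator          # primed and ready for send()
    assert gen.send(None) == (0,)
    assert gen.send(None) == (2,)
    assert gen.send(None) == (4,)    # the next send() would raise StopIteration
# --- end sketch ---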
135 class SubjobSplitter(ABC):
136  """
137  Abstract base class. This class handles the logic of creating subjobs for a `Job` object.
138  The `create_subjobs` function should be implemented and used to construct
139  the subjobs of the parent job object.
140 
141  Parameters:
142  arguments_generator (ArgumentsGenerator): Used to construct the generator function that will yield the argument
143  tuple for each `SubJob`. The splitter will iterate through the generator each time `create_subjobs` is
144  called. The `SubJob` will be sent into the generator with ``send(subjob)`` so that the generator can decide what
145  arguments to return.
146  """
147 
148  def __init__(self, *, arguments_generator=None):
149  """
150  Derived classes should call `super` to run this.
151  """
152 
153  self.arguments_generator = arguments_generator
154 
155  @abstractmethod
156  def create_subjobs(self, job):
157  """
158  Implement this method in derived classes to generate the `SubJob` objects.
159  """
160 
161  def assign_arguments(self, job):
162  """
163  Use the `arguments_generator` (if one exists) to assign the argument tuples to the
164  subjobs.
165  """
166  if self.arguments_generator:
167  arg_gen = self.arguments_generator.generator
168  generating = True
169  for subjob in sorted(job.subjobs.values(), key=lambda sj: sj.id):
170  if generating:
171  try:
172  args = arg_gen.send(subjob)
173  except StopIteration:
174  B2ERROR((f"StopIteration called when getting args for {subjob}, "
175  "setting all subsequent subjobs to have empty argument tuples."))
176  args = tuple() # If our generator finishes before the subjobs we use empty argument tuples
177  generating = False
178  else:
179  args = tuple()
180  B2DEBUG(29, f"Arguments for {subjob}: {args}")
181  subjob.args = args
182  return
183 
184  B2INFO((f"No ArgumentsGenerator assigned to the {self} so subjobs of {job} "
185  "won't automatically have arguments assigned."))
186 
187  def __repr__(self):
188  return f"{self.__class__.__name__}"
189 
190 
 191 class MaxFilesSplitter(SubjobSplitter):
 192 
193  def __init__(self, *, arguments_generator=None, max_files_per_subjob=1):
194  """
195  Parameters:
 196  max_files_per_subjob (int): The maximum number of input files used per `SubJob` created.
197  """
198  super().__init__(arguments_generator=arguments_generator)
199 
200  self.max_files_per_subjob = max_files_per_subjob
201 
202  def create_subjobs(self, job):
203  """
204  This function creates subjobs for the parent job passed in. It creates as many subjobs as required
205  in order to prevent the number of input files per subjob going over the limit set by
206  `MaxFilesSplitter.max_files_per_subjob`.
207  """
208  if not job.input_files:
209  B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
210  return
211 
212  for i, subjob_input_files in enumerate(grouper(self.max_files_per_subjob, job.input_files)):
213  subjob = job.create_subjob(i, input_files=subjob_input_files)
214 
215  self.assign_arguments(job)
216 
217  B2INFO(f"{self} created {i+1} Subjobs for {job}")
218 
219 
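# --- Editorial usage sketch (not part of the original backends.py) ---
# Attaching a MaxFilesSplitter to a Job (the Job class is defined further down in this
# file). The file paths are purely illustrative. With 3 input files and
# max_files_per_subjob=2, submission produces SubJob(0) with two files and SubJob(1)
# with one file; the backend calls job.splitter.create_subjobs(job) for you.
def _example_max_files_splitter():
    job = Job("split_by_files")
    job.input_files = ["/illustrative/path/a.root", "/illustrative/path/b.root", "/illustrative/path/c.root"]
    job.splitter = MaxFilesSplitter(max_files_per_subjob=2)
    # Equivalent shortcut using the Job property: job.max_files_per_subjob = 2
    return job
# --- end sketch ---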
 220 class MaxSubjobsSplitter(SubjobSplitter):
 221 
222  def __init__(self, *, arguments_generator=None, max_subjobs=1000):
223  """
224  Parameters:
 225  max_subjobs (int): The maximum number of subjobs that will be created.
226  """
227  super().__init__(arguments_generator=arguments_generator)
228 
229  self.max_subjobs = max_subjobs
230 
231  def create_subjobs(self, job):
232  """
233  This function creates subjobs for the parent job passed in. It creates as many subjobs as required
234  by the number of input files up to the maximum set by `MaxSubjobsSplitter.max_subjobs`. If there are
235  more input files than `max_subjobs` it instead groups files by the minimum number per subjob in order to
236  respect the subjob limit e.g. If you have 11 input files and a maximum number of subjobs of 4, then it
237  will create 4 subjobs, 3 of them with 3 input files, and one with 2 input files.
238  """
239  if not job.input_files:
240  B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
241  return
242 
243  # Since we will be popping from the left of the input file list, we construct a deque
244  remaining_input_files = deque(job.input_files)
245  # The initial number of available subjobs, will go down as we create them
246  available_subjobs = self.max_subjobs
247  subjob_i = 0
248  while remaining_input_files:
249  # How many files should we use for this subjob?
250  num_input_files = ceil(len(remaining_input_files) / available_subjobs)
251  # Pop them from the remaining files
252  subjob_input_files = []
253  for i in range(num_input_files):
254  subjob_input_files.append(remaining_input_files.popleft())
255  # Create the actual subjob
256  subjob = job.create_subjob(subjob_i, input_files=subjob_input_files)
257  subjob_i += 1
258  available_subjobs -= 1
259 
260  self.assign_arguments(job)
261  B2INFO(f"{self} created {subjob_i} Subjobs for {job}")
262 
263 
265  """
266  Creates SubJobs based on the given argument generator. The generator will be called until a `StopIteration` is issued.
267  Be VERY careful to not accidentally give an infinite generator! Otherwise it will simply create SubJobs until you run out
268  of memory. You can set the `ArgumentsSplitter.max_subjobs` parameter to try and prevent this and throw an exception.
269 
270  This splitter is useful for MC production jobs where you don't have any input files, but you want to control the exp/run
271  numbers of subjobs. If you do have input files set for the parent `Job` objects, then the same input files will be
 272  assigned to every `SubJob`.
273 
274  Parameters:
275  arguments_generator (ArgumentsGenerator): The standard ArgumentsGenerator that is used to assign arguments
276  """
277 
278  def __init__(self, *, arguments_generator=None, max_subjobs=None):
279  """
280  """
281  super().__init__(arguments_generator=arguments_generator)
282 
283  self.max_subjobs = max_subjobs
284 
285  def create_subjobs(self, job):
286  """
287  This function creates subjobs for the parent job passed in. It creates subjobs until the
288  `SubjobSplitter.arguments_generator` finishes.
289 
290  If `ArgumentsSplitter.max_subjobs` is set, then it will throw an exception if more than this number of
291  subjobs are created.
292  """
293  arg_gen = self.arguments_generator.generator
294  for i in count():
295  # Reached the maximum?
296  if i >= self.max_subjobs:
297  raise SplitterError(f"{self} tried to create more subjobs than the maximum (={self.max_subjobs}).")
298  try:
299  subjob = SubJob(job, i, job.input_files) # We manually create it because we might not need it
300  args = arg_gen.send(subjob) # Might throw StopIteration
301  B2INFO(f"Creating {job}.{subjob}")
302  B2DEBUG(29, f"Arguments for {subjob}: {args}")
303  subjob.args = args
304  job.subjobs[i] = subjob
305  except StopIteration:
306  break
307  B2INFO(f"{self} created {i+1} Subjobs for {job}")
308 
309 
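# --- Editorial usage sketch (not part of the original backends.py) ---
# An MC-production style setup: no input files, one SubJob per argument tuple. The stop
# value is an illustrative choice; on submission the backend calls
# job.splitter.create_subjobs(job), giving SubJob(0) args=(1,), ..., SubJob(9) args=(10,).
def _example_arguments_splitter():
    run_numbers = ArgumentsGenerator(range_arguments, start=1, stop=11)
    job = Job("mc_production_example")
    job.splitter = ArgumentsSplitter(arguments_generator=run_numbers, max_subjobs=100)
    return job
# --- end sketch ---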
310 class Job:
311  """
312  This generic Job object is used to tell a Backend what to do.
313  This object basically holds necessary information about a process you want to submit to a `Backend`.
314  It should *not* do anything that is backend specific, just hold the configuration for a job to be
315  successfully submitted and monitored using a backend. The result attribute is where backend
316  specific job monitoring goes.
317 
318  Parameters:
319  name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF
320 
321  .. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.
322  """
323 
324 
326  statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}
327 
328 
329  exit_statuses = ["failed", "completed"]
330 
331  def __init__(self, name, job_dict=None):
332  """
333  """
334 
335  self.name = name
336 
337  self.splitter = None
338 
339  if not job_dict:
 340 
 342  self.input_sandbox_files = []
343 
344  self.working_dir = Path()
345 
346  self.output_dir = Path()
347 
348  self.output_patterns = []
349 
350  self.cmd = []
351 
352  self.args = []
353 
354  self.input_files = []
355 
356  self.setup_cmds = []
357 
359  self.backend_args = {}
360 
361  self.subjobs = {}
362  elif job_dict:
363  self.input_sandbox_files = [Path(p) for p in job_dict["input_sandbox_files"]]
364  self.working_dir = Path(job_dict["working_dir"])
365  self.output_dir = Path(job_dict["output_dir"])
366  self.output_patterns = job_dict["output_patterns"]
367  self.cmd = job_dict["cmd"]
368  self.args = job_dict["args"]
369  self.input_files = job_dict["input_files"]
370  self.setup_cmds = job_dict["setup_cmds"]
371  self.backend_args = job_dict["backend_args"]
372  self.subjobs = {}
373  for subjob_dict in job_dict["subjobs"]:
374  self.create_subjob(subjob_dict["id"], input_files=subjob_dict["input_files"], args=subjob_dict["args"])
375 
376 
378  self.result = None
379 
380  self._status = "init"
381 
382  def __repr__(self):
383  """
384  Representation of Job class (what happens when you print a Job() instance).
385  """
386  return f"Job({self.name})"
387 
388  def ready(self):
389  """
390  Returns whether or not the Job has finished. If the job has subjobs then it will return true when they are all finished.
 391  It will return False as soon as it hits the first failure, meaning that you cannot guarantee that all subjobs will have
392  their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.
393  """
394  if not self.result:
395  B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")
396  return False
397  else:
398  return self.result.ready()
399 
400  def update_status(self):
401  """
402  Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
403  in the best way for the type of result object/backend.
404  """
405  if not self.result:
406  B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
407  else:
408  self.result.update_status()
409  return self.status
410 
411  def create_subjob(self, i, input_files=None, args=None):
412  """
 413  Creates a subjob Job object that references the parent Job.
414  Returns the SubJob object at the end.
415  """
416  if i not in self.subjobs:
417  B2INFO(f"Creating {self}.Subjob({i})")
418  subjob = SubJob(self, i, input_files)
419  if args:
420  subjob.args = args
421  self.subjobs[i] = subjob
422  return subjob
423  else:
424  B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")
425 
426  @property
427  def status(self):
428  """
429  Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
430  subjob status in the hierarchy of statuses in `Job.statuses`.
431  """
432  if self.subjobs:
433  job_status = self._get_overall_status_from_subjobs()
434  if job_status != self._status:
435 
436  self.status = job_status
437  return self._status
438 
439  def _get_overall_status_from_subjobs(self):
440  subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
441  status_level = min([self.statuses[status] for status in subjob_statuses])
442  for status, level in self.statuses.items():
443  if level == status_level:
444  return status
445 
446  @status.setter
447  def status(self, status):
448  """
449  Sets the status of this Job.
450  """
451  # Print an error only if the job failed.
452  if status == 'failed':
453  B2ERROR(f"Setting {self.name} status to failed")
454  else:
455  B2INFO(f"Setting {self.name} status to {status}")
456  self._status = status
457 
458  @property
459  def output_dir(self):
460  return self._output_dir
461 
462  @output_dir.setter
463  def output_dir(self, value):
464  self._output_dir = Path(value).absolute()
465 
466  @property
467  def working_dir(self):
468  return self._working_dir
469 
470  @working_dir.setter
471  def working_dir(self, value):
472  self._working_dir = Path(value).absolute()
473 
474  @property
475  def input_sandbox_files(self):
476  return self._input_sandbox_files
477 
478  @input_sandbox_files.setter
479  def input_sandbox_files(self, value):
480  self._input_sandbox_files = [Path(p).absolute() for p in value]
481 
482  @property
483  def input_files(self):
484  return self._input_files
485 
486  @input_files.setter
487  def input_files(self, value):
488  self._input_files = value
489 
490  @property
491  def max_subjobs(self):
492  return self.splitter.max_subjobs
493 
494  @max_subjobs.setter
495  def max_subjobs(self, value):
496  self.splitter = MaxSubjobsSplitter(max_subjobs=value)
497  B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
498 
499  @property
500  def max_files_per_subjob(self):
501  return self.splitter.max_files_per_subjob
502 
503  @max_files_per_subjob.setter
504  def max_files_per_subjob(self, value):
505  self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
506  B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
507 
508  def dump_to_json(self, file_path):
509  """
510  Dumps the Job object configuration to a JSON file so that it can be read in again later.
511 
512  Parameters:
513  file_path(`Path`): The filepath we'll dump to
514  """
515  with open(file_path, mode="w") as job_file:
516  json.dump(self.job_dict, job_file, indent=2)
517 
518  @classmethod
519  def from_json(cls, file_path):
520  with open(file_path, mode="r") as job_file:
521  job_dict = json.load(job_file)
522  return cls(job_dict["name"], job_dict=job_dict)
523 
524  @property
525  def job_dict(self):
526  """
527  Returns:
528  dict: A JSON serialisable representation of the `Job` and its `SubJob` objects. `Path` objects are converted to
529  string via ``Path.as_posix()``.
530  """
531  job_dict = {}
532  job_dict["name"] = self.name
533  job_dict["input_sandbox_files"] = [i.as_posix() for i in self.input_sandbox_files]
534  job_dict["working_dir"] = self.working_dir.as_posix()
535  job_dict["output_dir"] = self.output_dir.as_posix()
536  job_dict["output_patterns"] = self.output_patterns
537  job_dict["cmd"] = self.cmd
538  job_dict["args"] = self.args
539  job_dict["input_files"] = self.input_files
540  job_dict["setup_cmds"] = self.setup_cmds
541  job_dict["backend_args"] = self.backend_args
542  job_dict["subjobs"] = [sj.job_dict for sj in self.subjobs.values()]
543  return job_dict
544 
545  def dump_input_data(self):
546  """
547  Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
548  string URI objects.
549  """
550  with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
551  json.dump(self.input_files, input_data_file, indent=2)
552 
554  """
555  Get all of the requested files for the input sandbox and copy them to the working directory.
556  Files like the submit.sh or input_data.json are not part of this process.
557  """
558  for file_path in self.input_sandbox_files:
559  if file_path.is_dir():
560  shutil.copytree(file_path, Path(self.working_dir, file_path.name))
561  else:
562  shutil.copy(file_path, self.working_dir)
563 
565  """
566  Check the input files and make sure that there aren't any duplicates.
567  Also check if the files actually exist if possible.
568  """
569  existing_input_files = [] # We use a list instead of set to avoid losing any ordering of files
570  for file_path in self.input_files:
571  file_uri = parse_file_uri(file_path)
572  if file_uri.scheme == "file":
573  p = Path(file_uri.path)
574  if p.is_file():
575  if file_uri.geturl() not in existing_input_files:
576  existing_input_files.append(file_uri.geturl())
577  else:
578  B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
579  else:
580  B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
581  else:
582  B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
583  if file_path not in existing_input_files:
584  existing_input_files.append(file_path)
585  else:
586  B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
587  if self.input_files and not existing_input_files:
 588  B2WARNING(f"No valid input file paths found for {self}, but some were requested.")
589 
590  # Replace the Job's input files with the ones that exist + duplicates removed
591  self.input_files = existing_input_files
592 
593  @property
594  def full_command(self):
595  """
596  Returns:
597  str: The full command that this job will run including any arguments.
598  """
599  all_components = self.cmd[:]
600  all_components.extend(self.args)
601  # We do a convert to string just in case arguments were generated as different types
602  full_command = " ".join(map(str, all_components))
603  B2DEBUG(29, f"Full command of {self} is '{full_command}'")
604  return full_command
605 
607  """
608  This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
609  It should detect if you are using a local release or CVMFS and append the correct commands
610  so that the job will have the same basf2 release environment. It should also detect
611  if a local release is not compiled with the ``opt`` option.
612 
613  Note that this *doesn't mean that every environment variable is inherited* from the submitting
614  process environment.
615  """
616  if "BELLE2_TOOLS" not in os.environ:
617  raise BackendError("No BELLE2_TOOLS found in environment")
618  if "BELLE2_RELEASE" in os.environ:
619  self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
620  elif 'BELLE2_LOCAL_DIR' in os.environ:
621  self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
622  self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
623  self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
624  self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
625  self.setup_cmds.append(f"pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
626  self.setup_cmds.append(f"source $BACKEND_B2SETUP")
627  # b2code-option has to be executed only after the source of the tools.
628  self.setup_cmds.append(f"b2code-option $BACKEND_BELLE2_OPTION")
629  self.setup_cmds.append(f"popd > /dev/null")
630 
631 
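# --- Editorial usage sketch (not part of the original backends.py) ---
# Configuring a Job end-to-end. All paths, the steering file name, and the queue name
# are illustrative assumptions, not values from the original file.
def _example_configure_job():
    job = Job("example_job")
    job.working_dir = "/illustrative/path/work"
    job.output_dir = "/illustrative/path/output"
    job.cmd = ["basf2"]
    job.args = ["my_steering.py"]                       # hypothetical steering file
    job.input_sandbox_files = ["/illustrative/path/my_steering.py"]
    job.input_files = ["/illustrative/path/data_0.root"]
    job.output_patterns = ["*.root"]
    job.backend_args = {"queue": "short"}               # merged with the backend's own args at submission
    job.append_current_basf2_setup_cmds()               # reproduce the submitting basf2 environment in the job
    # The configuration can be round-tripped through JSON:
    job.dump_to_json("example_job.json")
    return Job.from_json("example_job.json")
# --- end sketch ---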
632 class SubJob(Job):
633  """
634  This mini-class simply holds basic information about which subjob you are
 635  and a reference to the parent Job object to be able to access the main data there,
 636  rather than replicating all of the parent job's configuration again.
637  """
638 
639  def __init__(self, job, subjob_id, input_files=None):
640  """
641  """
642 
643  self.id = subjob_id
644 
645  self.parent = job
646 
647  if not input_files:
648  input_files = []
649  self.input_files = input_files
650 
652  self.result = None
653 
654  self._status = "init"
655 
656  self.args = []
657 
658  @property
659  def output_dir(self):
660  """
661  Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this."""
662  return Path(self.parent.output_dir, str(self.id))
663 
664  @property
665  def working_dir(self):
666  """Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this."""
667  return Path(self.parent.working_dir, str(self.id))
668 
669  @property
670  def name(self):
671  """Getter for name of SubJob. Accesses the parent Job name to infer this."""
672  return "_".join((self.parent.name, str(self.id)))
673 
674  @property
675  def status(self):
676  """
677  Returns the status of this SubJob.
678  """
679  return self._status
680 
681  @status.setter
682  def status(self, status):
683  """
684  Sets the status of this Job.
685  """
686  # Print an error only if the job failed.
687  if status == "failed":
688  B2ERROR(f"Setting {self.name} status to failed")
689  else:
690  B2INFO(f"Setting {self.name} status to {status}")
691  self._status = status
692 
693  @property
694  def subjobs(self):
695  """
696  A subjob cannot have subjobs. Always return empty list.
697  """
698  return []
699 
700  @property
701  def job_dict(self):
702  """
703  Returns:
704  dict: A JSON serialisable representation of the `SubJob`. `Path` objects are converted to
705  `string` via ``Path.as_posix()``. Since Subjobs inherit most of the parent job's config
706  we only output the input files and arguments that are specific to this subjob and no other details.
707  """
708  job_dict = {}
709  job_dict["id"] = self.id
710  job_dict["input_files"] = self.input_files
711  job_dict["args"] = self.args
712  return job_dict
713 
714  def __getattr__(self, attribute):
715  """
716  Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
717  unless otherwise specified.
718  """
719  return getattr(self.parent, attribute)
720 
721  def __repr__(self):
722  """
723  """
724  return f"SubJob({self.name})"
725 
726 
727 class Backend(ABC):
728  """
729  Abstract base class for a valid backend.
730  Classes derived from this will implement their own submission of basf2 jobs
731  to whatever backend they describe.
732  Some common methods/attributes go into this base class.
733 
734  For backend_args the priority from lowest to highest is:
735 
736  backend.default_backend_args -> backend.backend_args -> job.backend_args
737  """
738 
739  submit_script = "submit.sh"
740 
741  exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
742 
743  default_backend_args = {}
744 
745  def __init__(self, *, backend_args=None):
746  """
747  """
748  if backend_args is None:
749  backend_args = {}
750 
751  self.backend_args = {**self.default_backend_args, **backend_args}
752 
753  @abstractmethod
754  def submit(self, job):
755  """
756  Base method for submitting collection jobs to the backend type. This MUST be
757  implemented for a correctly written backend class deriving from Backend().
758  """
759 
760  @staticmethod
761  def _add_setup(job, batch_file):
762  """
763  Adds setup lines to the shell script file.
764  """
765  for line in job.setup_cmds:
766  print(line, file=batch_file)
767 
768  def _add_wrapper_script_setup(self, job, batch_file):
769  """
770  Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
771  `trap` statements for Ctrl-C situations.
772  """
773  start_wrapper = f"""# ---
774 # trap ctrl-c and call ctrl_c()
775 trap '(ctrl_c 130)' SIGINT
776 trap '(ctrl_c 143)' SIGTERM
777 
778 function write_exit_code() {{
779  echo "Writing $1 to exit status file"
780  echo "$1" > {self.exit_code_file}
781  exit $1
782 }}
783 
784 function ctrl_c() {{
785  trap '' SIGINT SIGTERM
786  echo "** Trapped Ctrl-C **"
787  echo "$1" > {self.exit_code_file}
788  exit $1
789 }}
790 # ---"""
791  print(start_wrapper, file=batch_file)
792 
793  def _add_wrapper_script_teardown(self, job, batch_file):
794  """
795  Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
796  an exit code of the job cmd being written out to a file. Which means that we can know if the command was
797  successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
798  removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
799  file.
800  """
801  end_wrapper = """# ---
802 write_exit_code $?"""
803  print(end_wrapper, file=batch_file)
804 
805  @classmethod
806  def _create_parent_job_result(cls, parent):
807  """
808  We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
809  so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
810  statuses and allows the use of ready().
811  """
812  raise NotImplementedError
813 
814  def get_submit_script_path(self, job):
815  """
816  Construct the Path object of the bash script file that we will submit. It will contain
 817  the actual job command, wrapper commands, setup commands, and any batch directives.
818  """
819  return Path(job.working_dir, self.submit_script)
820 
821 
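# --- Editorial usage sketch (not part of the original backends.py) ---
# The backend_args priority described in the Backend docstring, sketched with the PBS
# backend defined later in this file. Queue names are illustrative assumptions.
def _example_backend_args_priority():
    backend = PBS(backend_args={"queue": "long"})   # overrides PBS.default_backend_args ("short")
    job = Job("priority_example")
    job.backend_args = {"queue": "express"}         # highest priority, merged in at submission time
    return backend, job
# --- end sketch ---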
822 class Result():
823  """
824  Base class for Result objects. A Result is created for each `Job` (or `Job.SubJob`) object
825  submitted to a backend. It provides a way to query a job's status to find out if it's ready.
826  """
827 
828  def __init__(self, job):
829  """
830  Pass in the job object to allow the result to access the job's properties and do post-processing.
831  """
832 
833  self.job = job
834 
835  self._is_ready = False
836 
838  self.time_to_wait_for_exit_code_file = timedelta(minutes=5)
839 
 840  self.exit_code_file_initial_time = None
 841 
842  def ready(self):
843  """
844  Returns whether or not this job result is known to be ready. Doesn't actually change the job status. Just changes
845  the 'readiness' based on the known job status.
846  """
847  B2DEBUG(29, f"Calling {self.job}.result.ready()")
848  if self._is_ready:
849  return True
850  elif self.job.status in self.job.exit_statuses:
851  self._is_ready = True
852  return True
853  else:
854  return False
855 
856  def update_status(self):
857  """
858  Update the job's (and subjobs') status so that `Result.ready` will return the up to date status. This call will have to
859  actually look up the job's status from some database/exit code file.
860  """
861  raise NotImplementedError
862 
864  """
865  Read the exit code file to discover the exit status of the job command. Useful falback if the job is no longer
866  known to the job database (batch system purged it for example). Since some backends may take time to download
867  the output files of the job back to the working directory we use a time limit on how long to wait.
868  """
869  if not self.exit_code_file_initial_time:
870  self.exit_code_file_initial_time = datetime.now()
871  exit_code_path = Path(self.job.working_dir, Backend.exit_code_file)
872  with open(exit_code_path, "r") as f:
873  exit_code = int(f.read().strip())
874  B2DEBUG(29, f"Exit code from file for {self.job} was {exit_code}")
875  return exit_code
876 
877 
878 class Local(Backend):
879  """
880  Backend for local processes i.e. on the same machine but in a subprocess.
881 
882  Note that you should call the self.join() method to close the pool and wait for any
883  running processes to finish before exiting the process. Once you've called join you will have to set up a new
 884  instance of this backend to create a new pool. If you don't call `Local.join` yourself
 885  somewhere, then the main python process might end before your pool is done.
886 
887  Keyword Arguments:
888  max_processes (int): Integer that specifies the size of the process pool that spawns the subjobs, default=1.
 889  It's the maximum number of simultaneous subjobs.
890  Try not to specify a large number or a number larger than the number of cores.
891  It won't crash the program but it will slow down and negatively impact performance.
892  """
893 
894  def __init__(self, *, backend_args=None, max_processes=1):
895  """
896  """
897  super().__init__(backend_args=backend_args)
898 
899  self.pool = None
900 
901  self.max_processes = max_processes
902 
904  """
905  Result class to help monitor status of jobs submitted by Local backend.
906  """
907 
908  def __init__(self, job, result):
909  """
910  Pass in the job object and the multiprocessing result to allow the result to do monitoring and perform
911  post processing of the job.
912  """
913  super().__init__(job)
914 
915  self.result = result
916 
917  def _update_result_status(self):
918  if self.result.ready() and (self.job.status not in self.job.exit_statuses):
919  return_code = self.result.get()
920  if return_code:
921  self.job.status = "failed"
922  else:
923  self.job.status = "completed"
924 
925  def update_status(self):
926  """
927  Update the job's (or subjobs') status by calling the result object.
928  """
929  B2DEBUG(29, f"Calling {self.job}.result.update_status()")
930  if self.job.subjobs:
931  for subjob in self.job.subjobs.values():
932  subjob.result._update_result_status()
933  else:
934  self._update_result_status()
935 
936  def join(self):
937  """
938  Closes and joins the Pool, letting you wait for all results currently
939  still processing.
940  """
941  B2INFO("Joining Process Pool, waiting for results to finish...")
942  self.pool.close()
943  self.pool.join()
944  B2INFO("Process Pool joined.")
945 
946  @property
947  def max_processes(self):
948  """
949  Getter for max_processes
950  """
951  return self._max_processes
952 
953  @max_processes.setter
954  def max_processes(self, value):
955  """
 956  Setter for max_processes. We also check for a previous Pool(); if one exists we wait for it to join
 957  and then create a new one with the new value of max_processes.
958  """
959 
960  self._max_processes = value
961  if self.pool:
962  B2INFO(f"New max_processes requested. But a pool already exists.")
963  self.join()
964  B2INFO(f"Starting up new Pool with {self.max_processes} processes")
965  self.pool = mp.Pool(processes=self.max_processes)
966 
967  @method_dispatch
968  def submit(self, job):
969  """
970  """
971  raise NotImplementedError(("This is an abstract submit(job) method that shouldn't have been called. "
972  "Did you submit a (Sub)Job?"))
973 
974  @submit.register(SubJob)
975  def _(self, job):
976  """
977  Submission of a `SubJob` for the Local backend
978  """
979  # Make sure the output directory of the job is created
980  job.output_dir.mkdir(parents=True, exist_ok=True)
981  # Make sure the working directory of the job is created
982  job.working_dir.mkdir(parents=True, exist_ok=True)
983  job.copy_input_sandbox_files_to_working_dir()
984  job.dump_input_data()
985  # Get the path to the bash script we run
986  script_path = self.get_submit_script_path(job)
987  with open(script_path, mode="w") as batch_file:
988  print("#!/bin/bash", file=batch_file)
989  self._add_wrapper_script_setup(job, batch_file)
990  self._add_setup(job, batch_file)
991  print(job.full_command, file=batch_file)
992  self._add_wrapper_script_teardown(job, batch_file)
993  B2INFO(f"Submitting {job}")
994  job.result = Local.LocalResult(job,
995  self.pool.apply_async(self.run_job,
996  (job.name,
997  job.working_dir,
998  job.output_dir,
999  script_path)
1000  )
1001  )
1002  job.status = "submitted"
1003  B2INFO(f"{job} submitted")
1004 
1005  @submit.register(Job)
1006  def _(self, job):
1007  """
1008  Submission of a `Job` for the Local backend
1009  """
1010  # Make sure the output directory of the job is created
1011  job.output_dir.mkdir(parents=True, exist_ok=True)
1012  # Make sure the working directory of the job is created
1013  job.working_dir.mkdir(parents=True, exist_ok=True)
1014  # Check if we have any valid input files
1015  job.check_input_data_files()
1016 
1017  if not job.splitter:
1018  # Get all of the requested files for the input sandbox and copy them to the working directory
1019  job.copy_input_sandbox_files_to_working_dir()
1020  job.dump_input_data()
1021  # Get the path to the bash script we run
1022  script_path = self.get_submit_script_path(job)
1023  with open(script_path, mode="w") as batch_file:
1024  print("#!/bin/bash", file=batch_file)
1025  self._add_wrapper_script_setup(job, batch_file)
1026  self._add_setup(job, batch_file)
1027  print(job.full_command, file=batch_file)
1028  self._add_wrapper_script_teardown(job, batch_file)
1029  B2INFO(f"Submitting {job}")
1030  job.result = Local.LocalResult(job,
1031  self.pool.apply_async(self.run_job,
1032  (job.name,
1033  job.working_dir,
1034  job.output_dir,
1035  script_path)
1036  )
1037  )
1038  B2INFO(f"{job} submitted")
1039  else:
1040  # Create subjobs according to the splitter's logic
1041  job.splitter.create_subjobs(job)
1042  # Submit the subjobs
1043  self.submit(list(job.subjobs.values()))
1044  # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1045  self._create_parent_job_result(job)
1046 
1047  @submit.register(list)
1048  def _(self, jobs):
1049  """
1050  Submit method of Local() that takes a list of jobs instead of just one and submits each one.
1051  """
1052  # Submit the jobs
1053  for job in jobs:
1054  self.submit(job)
1055  B2INFO("All requested jobs submitted.")
1056 
1057  @staticmethod
1058  def run_job(name, working_dir, output_dir, script):
1059  """
1060  The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
1061  shell command in a subprocess and captures the stdout and stderr of the subprocess to files.
1062  """
1063  B2INFO(f"Starting Sub-process: {name}")
1064  from subprocess import Popen
1065  stdout_file_path = Path(working_dir, _STDOUT_FILE)
1066  stderr_file_path = Path(working_dir, _STDERR_FILE)
 1067  # Create unix command to redirect stdout and stderr
1068  B2INFO(f"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
1069  with open(stdout_file_path, mode="w", buffering=1) as f_out, \
1070  open(stderr_file_path, mode="w", buffering=1) as f_err:
1071  with Popen(["/bin/bash", script.as_posix()],
1072  stdout=f_out,
1073  stderr=f_err,
1074  bufsize=1,
1075  universal_newlines=True,
1076  cwd=working_dir,
1077  env={}) as p:
1078  # We block here and wait so that the return code will be set.
1079  p.wait()
1080  B2INFO(f"Subprocess {name} finished.")
1081  return p.returncode
1082 
1083  @classmethod
1084  def _create_parent_job_result(cls, parent):
1085  parent.result = cls.LocalResult(parent, None)
1086 
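# --- Editorial usage sketch (not part of the original backends.py) ---
# Submitting a trivial Job with the Local backend and waiting for it. The directories
# and command are illustrative; the polling loop mirrors what monitor_jobs() does above.
def _example_local_submission():
    job = Job("local_example")
    job.working_dir = "/illustrative/path/local_example/work"
    job.output_dir = "/illustrative/path/local_example/output"
    job.cmd = ["echo"]
    job.args = ["hello"]
    backend = Local(max_processes=2)
    backend.submit(job)
    job.update_status()
    while not job.ready():
        time.sleep(5)
        job.update_status()
    backend.join()   # close the pool and wait for worker processes to finish
# --- end sketch ---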
1087 
1089  """
1090  Abstract Base backend for submitting to a local batch system. Batch system specific commands should be implemented
1091  in a derived class. Do not use this class directly!
1092  """
1093 
1094  submission_cmds = []
1095 
1107  default_global_job_limit = 1000
1108 
1109  default_sleep_between_submission_checks = 30
1110 
1111  def __init__(self, *, backend_args=None):
1112  """
1113  Init method for Batch Backend. Does some basic default setup.
1114  """
1115  super().__init__(backend_args=backend_args)
1116 
 1118  self.global_job_limit = self.default_global_job_limit
 1119 
 1121  self.sleep_between_submission_checks = self.default_sleep_between_submission_checks
 1122 
1123  def _add_batch_directives(self, job, file):
1124  """
1125  Should be implemented in a derived class to write a batch submission script to the job.working_dir.
1126  You should think about where the stdout/err should go, and set the queue name.
1127  """
1128  raise NotImplementedError(("Need to implement a _add_batch_directives(self, job, file) "
1129  f"method in {self.__class__.__name__} backend."))
1130 
1131  def _make_submit_file(self, job, submit_file_path):
1132  """
 1133  Useful for the HTCondor backend where a submit file is needed instead of batch
1134  directives pasted directly into the submission script. It should be overwritten
1135  if needed.
1136  """
1137 
1138  @classmethod
1139  @abstractmethod
1140  def _submit_to_batch(cls, cmd):
1141  """
1142  Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1143  """
1144 
1145  def can_submit(self, *args, **kwargs):
1146  """
1147  Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail.
1148  This is initially meant to make sure that we don't go over the global limits of jobs (submitted + running).
1149 
1150  Returns:
1151  bool: If the job submission can continue based on the current situation.
1152  """
1153  return True
1154 
1155  @method_dispatch
1156  def submit(self, job, check_can_submit=True, jobs_per_check=100):
1157  """
1158  """
1159  raise NotImplementedError(("This is an abstract submit(job) method that shouldn't have been called. "
1160  "Did you submit a (Sub)Job?"))
1161 
1162  @submit.register(SubJob)
1163  def _(self, job, check_can_submit=True, jobs_per_check=100):
1164  """
1165  Submit method of Batch backend for a `SubJob`. Should take `SubJob` object, create needed directories,
1166  create batch script, and send it off with the batch submission command.
1167  It should apply the correct options (default and user requested).
1168 
1169  Should set a Result object as an attribute of the job.
1170  """
1171  # Make sure the output directory of the job is created, commented out due to permission issues
1172  # job.output_dir.mkdir(parents=True, exist_ok=True)
1173  # Make sure the working directory of the job is created
1174  job.working_dir.mkdir(parents=True, exist_ok=True)
1175  job.copy_input_sandbox_files_to_working_dir()
1176  job.dump_input_data()
1177  # Make submission file if needed
1178  batch_submit_script_path = self.get_batch_submit_script_path(job)
1179  self._make_submit_file(job, batch_submit_script_path)
1180  # Get the bash file we will actually run, might be the same file
1181  script_path = self.get_submit_script_path(job)
1182  # Construct the batch submission script (with directives if that is supported)
1183  with open(script_path, mode="w") as batch_file:
1184  self._add_batch_directives(job, batch_file)
1185  self._add_wrapper_script_setup(job, batch_file)
1186  self._add_setup(job, batch_file)
1187  print(job.full_command, file=batch_file)
1188  self._add_wrapper_script_teardown(job, batch_file)
1189  os.chmod(script_path, 0o755)
1190  B2INFO(f"Submitting {job}")
1191  # Do the actual batch submission
1192  cmd = self._create_cmd(batch_submit_script_path)
1193  output = self._submit_to_batch(cmd)
1194  self._create_job_result(job, output)
1195  job.status = "submitted"
1196  B2INFO(f"{job} submitted")
1197 
1198  @submit.register(Job)
1199  def _(self, job, check_can_submit=True, jobs_per_check=100):
1200  """
1201  Submit method of Batch backend. Should take job object, create needed directories, create batch script,
1202  and send it off with the batch submission command, applying the correct options (default and user requested.)
1203 
1204  Should set a Result object as an attribute of the job.
1205  """
1206  # Make sure the output directory of the job is created, commented out due to permissions issue
1207  # job.output_dir.mkdir(parents=True, exist_ok=True)
1208  # Make sure the working directory of the job is created
1209  job.working_dir.mkdir(parents=True, exist_ok=True)
1210  # Check if we have any valid input files
1211  job.check_input_data_files()
1212  # Add any required backend args that are missing (I'm a bit hesitant to actually merge with job.backend_args)
1213  # just in case you want to resubmit the same job with different backend settings later.
1214  job_backend_args = {**self.backend_args, **job.backend_args}
1215 
1216  # If there's no splitter then we just submit the Job with no SubJobs
1217  if not job.splitter:
1218  # Get all of the requested files for the input sandbox and copy them to the working directory
1219  job.copy_input_sandbox_files_to_working_dir()
1220  job.dump_input_data()
1221  # Make submission file if needed
1222  batch_submit_script_path = self.get_batch_submit_script_path(job)
1223  self._make_submit_file(job, batch_submit_script_path)
1224  # Get the bash file we will actually run
1225  script_path = self.get_submit_script_path(job)
1226  # Construct the batch submission script (with directives if that is supported)
1227  with open(script_path, mode="w") as batch_file:
1228  self._add_batch_directives(job, batch_file)
1229  self._add_wrapper_script_setup(job, batch_file)
1230  self._add_setup(job, batch_file)
1231  print(job.full_command, file=batch_file)
1232  self._add_wrapper_script_teardown(job, batch_file)
1233  os.chmod(script_path, 0o755)
1234  B2INFO(f"Submitting {job}")
1235  # Do the actual batch submission
1236  cmd = self._create_cmd(batch_submit_script_path)
1237  output = self._submit_to_batch(cmd)
1238  self._create_job_result(job, output)
1239  job.status = "submitted"
1240  B2INFO(f"{job} submitted")
1241  else:
1242  # Create subjobs according to the splitter's logic
1243  job.splitter.create_subjobs(job)
1244  # Submit the subjobs
1245  self.submit(list(job.subjobs.values()))
1246  # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1247  self._create_parent_job_result(job)
1248 
1249  @submit.register(list)
1250  def _(self, jobs, check_can_submit=True, jobs_per_check=100):
1251  """
1252  Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.
1253  """
1254  B2INFO(f"Submitting a list of {len(jobs)} jobs to a Batch backend")
1255  # Technically this could be a list of Jobs or SubJobs. And if it is a list of Jobs then it might not
1256  # be necessary to check if we can submit right now. We could do it later during the submission of the
1257  # SubJob list. However in the interest of simpler code we just do the check here, and re-check again
1258  # if a SubJob list comes through this function. Slightly inefficient, but much simpler logic.
1259 
1260  # The first thing to do is make sure that we are iterating through the jobs list in chunks that are
 1261  # equal to or smaller than the global limit. Otherwise nothing will ever submit.
1262 
1263  if jobs_per_check > self.global_job_limit:
 1264  B2INFO((f"jobs_per_check (={jobs_per_check}) is higher than the global job "
 1265  f"limit for this backend (={self.global_job_limit}). Will instead use the "
 1266  "value of the global job limit."))
1267  jobs_per_check = self.global_job_limit
1268 
1269  # We group the jobs list into chunks of length jobs_per_check
1270  for jobs_to_submit in grouper(jobs_per_check, jobs):
1271  # Wait until we are allowed to submit
1272  while not self.can_submit(njobs=len(jobs_to_submit)):
1273  B2INFO(f"Too many jobs are currently in the batch system globally. Waiting until submission can continue...")
1274  time.sleep(self.sleep_between_submission_checks)
1275  else:
1276  # We loop here since we have already checked if the number of jobs is low enough, we don't want to hit this
1277  # function again unless one of the jobs has subjobs.
1278  B2INFO(f"Submitting the next {len(jobs_to_submit)} jobs...")
1279  for job in jobs_to_submit:
1280  self.submit(job, check_can_submit, jobs_per_check)
1281  B2INFO(f"All {len(jobs)} requested jobs submitted")
1282 
1284  """
1285  Construct the Path object of the script file that we will submit using the batch command.
1286  For most batch backends this is the same script as the bash script we submit.
 1287  But some batch systems require a separate submission file that describes the job.
 1288  To support that, override this function in the derived backend class.
1289  """
1290  return Path(job.working_dir, self.submit_script)
1291 
1292  @classmethod
1293  @abstractmethod
1294  def _create_job_result(cls, job, batch_output):
1295  """
1296  """
1297 
1298  @abstractmethod
1299  def _create_cmd(self, job):
1300  """
1301  """
1302 
1303 
1304 class PBS(Batch):
1305  """
1306  Backend for submitting calibration processes to a qsub batch system.
1307  """
1308 
1309  cmd_wkdir = "#PBS -d"
1310 
1311  cmd_stdout = "#PBS -o"
1312 
1313  cmd_stderr = "#PBS -e"
1314 
1315  cmd_queue = "#PBS -q"
1316 
1317  cmd_name = "#PBS -N"
1318 
1319  submission_cmds = ["qsub"]
1320 
1321  default_global_job_limit = 5000
1322 
1323  default_backend_args = {"queue": "short"}
1324 
1325  def __init__(self, *, backend_args=None):
1326  super().__init__(backend_args=backend_args)
1327 
1328  def _add_batch_directives(self, job, batch_file):
1329  """
1330  Add PBS directives to submitted script.
1331  """
1332  job_backend_args = {**self.backend_args, **job.backend_args}
1333  batch_queue = job_backend_args["queue"]
1334  print("#!/bin/bash", file=batch_file)
1335  print("# --- Start PBS ---", file=batch_file)
1336  print(" ".join([PBS.cmd_queue, batch_queue]), file=batch_file)
1337  print(" ".join([PBS.cmd_name, job.name]), file=batch_file)
1338  print(" ".join([PBS.cmd_wkdir, job.working_dir.as_posix()]), file=batch_file)
1339  print(" ".join([PBS.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1340  print(" ".join([PBS.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1341  print("# --- End PBS ---", file=batch_file)
1342 
1343  @classmethod
1344  def _create_job_result(cls, job, batch_output):
1345  """
1346  """
1347  job_id = batch_output.replace("\n", "")
1348  B2INFO(f"Job ID of {job} recorded as: {job_id}")
1349  job.result = cls.PBSResult(job, job_id)
1350 
1351  def _create_cmd(self, script_path):
1352  """
1353  """
1354  submission_cmd = self.submission_cmds[:]
1355  submission_cmd.append(script_path.as_posix())
1356  return submission_cmd
1357 
1358  @classmethod
1359  def _submit_to_batch(cls, cmd):
1360  """
1361  Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1362  """
1363  sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True)
1364  return sub_out
1365 
1366  @classmethod
1367  def _create_parent_job_result(cls, parent):
1368  parent.result = cls.PBSResult(parent, None)
1369 
1371  """
1372  Simple class to help monitor status of jobs submitted by `PBS` Backend.
1373 
1374  You pass in a `Job` object (or `SubJob`) and job id from a qsub command.
 1375  When you call the `ready` method it runs qstat to see whether or not the job has finished.
1376  """
1377 
1378 
1379  backend_code_to_status = {"R": "running",
1380  "C": "completed",
1381  "FINISHED": "completed",
1382  "E": "failed",
1383  "H": "submitted",
1384  "Q": "submitted",
1385  "T": "submitted",
1386  "W": "submitted",
1387  "H": "submitted"
1388  }
1389 
1390  def __init__(self, job, job_id):
1391  """
1392  Pass in the job object and the job id to allow the result to do monitoring and perform
1393  post processing of the job.
1394  """
1395  super().__init__(job)
1396 
1397  self.job_id = job_id
1398 
1399  def update_status(self):
1400  """
1401  Update the job's (or subjobs') status by calling qstat.
1402  """
1403  B2DEBUG(29, f"Calling {self.job}.result.update_status()")
 1404  # Get all jobs info and re-use it for each status update to minimise time spent on this update.
1405  qstat_output = PBS.qstat()
1406  if self.job.subjobs:
1407  for subjob in self.job.subjobs.values():
1408  subjob.result._update_result_status(qstat_output)
1409  else:
1410  self._update_result_status(qstat_output)
1411 
1412  def _update_result_status(self, qstat_output):
1413  """
1414  Parameters:
1415  qstat_output (dict): The JSON output of a previous call to qstat which we can re-use to find the
1416  status of this job. Obviously you should only be passing a JSON dict that contains the 'Job_Id' and
1417  'job_state' information, otherwise it is useless.
1418 
1419  """
1420  try:
1421  backend_status = self._get_status_from_output(qstat_output)
1422  except KeyError:
1423  # If this happens then maybe the job id wasn't in the qstat_output argument because it finished.
1424  # Instead of failing immediately we try looking for the exit code file and then fail if it still isn't there.
 1425  B2DEBUG(29, f"Checking the exit code from file for {self.job}")
1426  try:
1427  exit_code = self.get_exit_code_from_file()
1428  except FileNotFoundError:
1429  waiting_time = datetime.now() - self.exit_code_file_initial_time
 1430  if waiting_time > self.time_to_wait_for_exit_code_file:
1431  B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
1432  exit_code = 1
1433  else:
1434  B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
1435  return
1436  if exit_code:
1437  backend_status = "E"
1438  else:
1439  backend_status = "C"
1440 
1441  try:
1442  new_job_status = self.backend_code_to_status[backend_status]
1443  except KeyError as err:
1444  raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}")
1445 
1446  if new_job_status != self.job.status:
1447  self.job.status = new_job_status
1448 
1449  def _get_status_from_output(self, output):
1450  for job_info in output["JOBS"]:
1451  if job_info["Job_Id"] == self.job_id:
1452  return job_info["job_state"]
1453  else:
1454  raise KeyError
1455 
1456  def can_submit(self, njobs=1):
1457  """
1458  Checks the global number of jobs in PBS right now (submitted or running) for this user.
 1459  Returns True if the number is lower than the limit, False if it is higher.
1460 
1461  Parameters:
1462  njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1463  are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1464  assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1465  So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
1466  and check again before submitting more.
1467  """
1468  B2DEBUG(29, "Calling PBS().can_submit()")
1469  job_info = self.qstat(username=os.environ["USER"])
1470  total_jobs = job_info["NJOBS"]
1471  B2INFO(f"Total jobs active in the PBS system is currently {total_jobs}")
1472  if (total_jobs + njobs) > self.global_job_limit:
1473  B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1474  return False
1475  else:
1476  B2INFO("There is enough space to submit more jobs.")
1477  return True
1478 
1479  @classmethod
1480  def qstat(cls, username="", job_ids=None):
1481  """
1482  Simplistic interface to the ``qstat`` command. Lets you request information about all jobs or ones matching the filter
 1483  ['job_id'] or for the username. The result is a JSON dictionary containing some of the useful job attributes returned
1484  by qstat.
1485 
1486  PBS is kind of annoying as depending on the configuration it can forget about jobs immediately. So the status of a
1487  finished job is VERY hard to get. There are other commands that are sometimes included that may do a better job.
1488  This one should work for Melbourne's cloud computing centre.
1489 
1490  Keyword Args:
1491  username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
1492  will be in the output dictionary.
1493  job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
 1494  the output of this function will be only information about these jobs. If this argument is not given, then all jobs
1495  matching the other filters will be returned.
1496 
1497  Returns:
 1498  dict: JSON dictionary of the form (to save you parsing the XML that qstat returns):
1499 
1500  .. code-block:: python
1501 
1502  {
1503  "NJOBS": int
1504  "JOBS":[
1505  {
1506  <key: value>, ...
1507  }, ...
1508  ]
1509  }
1510  """
 1511  B2DEBUG(29, f"Calling PBS.qstat(username='{username}', job_ids={job_ids})")
1512  if not job_ids:
1513  job_ids = []
1514  job_ids = set(job_ids)
1515  cmd_list = ["qstat", "-x"]
1516  # We get an XML serialisable summary from qstat. Requires the shell argument.
1517  cmd = " ".join(cmd_list)
1518  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1519  output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1520  jobs_dict = {"NJOBS": 0, "JOBS": []}
1521  jobs_xml = ET.fromstring(output)
1522 
1523  # For a specific job_id we can be a bit more efficient in XML parsing
1524  if len(job_ids) == 1:
1525  job_elem = jobs_xml.find(f"./Job[Job_Id='{list(job_ids)[0]}']")
1526  if job_elem:
1527  jobs_dict["JOBS"].append(cls.create_job_record_from_element(job_elem))
1528  jobs_dict["NJOBS"] = 1
1529  return jobs_dict
1530 
1531  # Since the username given is not exactly the same as the one that PBS stores (<username>@host)
1532  # we have to simply loop through rather than using XPATH.
1533  for job in jobs_xml.iterfind("Job"):
1534  job_owner = job.find("Job_Owner").text.split("@")[0]
1535  if username and username != job_owner:
1536  continue
1537  job_id = job.find("Job_Id").text
1538  if job_ids and job_id not in job_ids:
1539  continue
1540  jobs_dict["JOBS"].append(cls.create_job_record_from_element(job))
1541  jobs_dict["NJOBS"] += 1
1542  # Remove it so that we don't keep checking for it
1543  if job_id in job_ids:
1544  job_ids.remove(job_id)
1545  return jobs_dict
1546 
1547  @staticmethod
1549  """
1550  Creates a Job dictionary with various job information from the XML element returned by qstat.
1551 
1552  Parameters:
1553  job_elem (xml.etree.ElementTree.Element): The XML Element of the Job
1554 
1555  Returns:
1556  dict: JSON serialisable dictionary of the Job information we are interested in.
1557  """
1558  job_dict = {}
1559  job_dict["Job_Id"] = job_elem.find("Job_Id").text
1560  job_dict["Job_Name"] = job_elem.find("Job_Name").text
1561  job_dict["Job_Owner"] = job_elem.find("Job_Owner").text
1562  job_dict["job_state"] = job_elem.find("job_state").text
1563  job_dict["queue"] = job_elem.find("queue").text
1564  return job_dict
1565 
1566 
1567 class LSF(Batch):
1568  """
1569  Backend for submitting calibration processes to an LSF batch system.
1570  """
1571 
1572  cmd_wkdir = "#BSUB -cwd"
1573 
1574  cmd_stdout = "#BSUB -o"
1575 
1576  cmd_stderr = "#BSUB -e"
1577 
1578  cmd_queue = "#BSUB -q"
1579 
1580  cmd_name = "#BSUB -J"
1581 
1582  submission_cmds = ["bsub", "-env", "\"none\"", "<"]
1583 
1584  default_global_job_limit = 15000
1585 
1586  default_backend_args = {"queue": "s"}
1587 
1588  def __init__(self, *, backend_args=None):
1589  super().__init__(backend_args=backend_args)
1590 
1591  def _add_batch_directives(self, job, batch_file):
1592  """
1593  Adds LSF BSUB directives for the job to a script.
1594  """
1595  job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1596  batch_queue = job_backend_args["queue"]
1597  print("#!/bin/bash", file=batch_file)
1598  print("# --- Start LSF ---", file=batch_file)
1599  print(" ".join([LSF.cmd_queue, batch_queue]), file=batch_file)
1600  print(" ".join([LSF.cmd_name, job.name]), file=batch_file)
1601  print(" ".join([LSF.cmd_wkdir, str(job.working_dir)]), file=batch_file)
1602  print(" ".join([LSF.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1603  print(" ".join([LSF.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1604  print("# --- End LSF ---", file=batch_file)
1605 
1606  def _create_cmd(self, script_path):
1607  """
1608  """
1609  submission_cmd = self.submission_cmds[:]
1610  submission_cmd.append(script_path.as_posix())
1611  submission_cmd = " ".join(submission_cmd)
1612  return [submission_cmd]
1613 
1614  @classmethod
1615  def _submit_to_batch(cls, cmd):
1616  """
1617  Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1618  """
1619  sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1620  return sub_out
1621 
1623  """
1624  Simple class to help monitor status of jobs submitted by LSF Backend.
1625 
1626  You pass in a `Job` object and job id from a bsub command.
1627  When you call the `ready` method it runs bjobs to see whether or not the job has finished.
1628  """
1629 
1630 
1631  backend_code_to_status = {"RUN": "running",
1632  "DONE": "completed",
1633  "FINISHED": "completed",
1634  "EXIT": "failed",
1635  "PEND": "submitted"
1636  }
1637 
1638  def __init__(self, job, job_id):
1639  """
1640  Pass in the job object and the job id to allow the result to do monitoring and perform
1641  post processing of the job.
1642  """
1643  super().__init__(job)
1644 
1645  self.job_id = job_id
1646 
1647  def update_status(self):
1648  """
1649  Update the job's (or subjobs') status by calling bjobs.
1650  """
1651  B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
1652  # Get all jobs info and re-use it for each status update to minimise time spent on this update.
1653  bjobs_output = LSF.bjobs(output_fields=["stat", "id"])
1654  if self.job.subjobs:
1655  for subjob in self.job.subjobs.values():
1656  subjob.result._update_result_status(bjobs_output)
1657  else:
1658  self._update_result_status(bjobs_output)
1659 
1660  def _update_result_status(self, bjobs_output):
1661  """
1662  Parameters:
1663  bjobs_output (dict): The JSON output of a previous call to bjobs which we can re-use to find the
1664  status of this job. Obviously you should only be passing a JSON dict that contains the 'stat' and
1665  'id' information, otherwise it is useless.
1666 
1667  """
1668  try:
1669  backend_status = self._get_status_from_output(bjobs_output)
1670  except KeyError:
1671  # If this happens then maybe the job id wasn't in the bjobs_output argument because it finished.
1672  # Instead of failing immediately we try re-running bjobs here explicitly and then fail if it still isn't there.
1673  bjobs_output = LSF.bjobs(output_fields=["stat", "id"], job_id=str(self.job_id))
1674  try:
1675  backend_status = self._get_status_from_output(bjobs_output)
1676  except KeyError:
1677  # If this happened, maybe we're looking at an old finished job. We could fall back to bhist, but it's
1678  # slow and terrible. Instead let's try looking for the exit code file.
1679  try:
1680  exit_code = self.get_exit_code_from_file()
1681  except FileNotFoundError:
1682  waiting_time = datetime.now() - self.exit_code_file_initial_time
1683  if waiting_time > self.time_to_wait_for_exit_code_file:
1684  B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
1685  exit_code = 1
1686  else:
1687  B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
1688  return
1689  if exit_code:
1690  backend_status = "EXIT"
1691  else:
1692  backend_status = "FINISHED"
1693  try:
1694  new_job_status = self.backend_code_to_status[backend_status]
1695  except KeyError as err:
1696  raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}")
1697 
1698  if new_job_status != self.job.status:
1699  self.job.status = new_job_status
1700 
1701  def _get_status_from_output(self, output):
1702  if output["JOBS"] and "ERROR" in output["JOBS"][0]:
1703  if output["JOBS"][0]["ERROR"] == f"Job <{self.job_id}> is not found":
1704  raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1705  else:
1706  raise BackendError(f"Unidentified Error during status check for {self.job}: {output}")
1707  else:
1708  for job_info in output["JOBS"]:
1709  if job_info["JOBID"] == self.job_id:
1710  return job_info["STAT"]
1711  else:
1712  raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1713 
1714  @classmethod
1715  def _create_parent_job_result(cls, parent):
1716  parent.result = cls.LSFResult(parent, None)
1717 
1718  @classmethod
1719  def _create_job_result(cls, job, batch_output):
1720  """
1721  """
1722  m = re.search(r"Job <(\d+)>", str(batch_output))
1723  if m:
1724  job_id = m.group(1)
1725  else:
1726  raise BackendError(f"Failed to get the batch job ID of {job}. LSF output was:\n{batch_output}")
1727 
1728  B2INFO(f"Job ID of {job} recorded as: {job_id}")
1729  job.result = cls.LSFResult(job, job_id)
1730 
1731  def can_submit(self, njobs=1):
1732  """
1733  Checks the global number of jobs in LSF right now (submitted or running) for this user.
1734  Returns True if the number is lower than the limit, False if it is higher.
1735 
1736  Parameters:
1737  njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1738  are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1739  assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1740  So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
1741  and check again before submitting more.
1742  """
1743  B2DEBUG(29, "Calling LSF().can_submit()")
1744  job_info = self.bjobs(output_fields=["stat"])
1745  total_jobs = job_info["NJOBS"]
1746  B2INFO(f"Total jobs active in the LSF system is currently {total_jobs}")
1747  if (total_jobs + njobs) > self.global_job_limit:
1748  B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1749  return False
1750  else:
1751  B2INFO("There is enough space to submit more jobs.")
1752  return True
1753 
1754  @classmethod
1755  def bjobs(cls, output_fields=None, job_id="", username="", queue=""):
1756  """
1757  Simplistic interface to the ``bjobs`` command. Lets you request information about all jobs matching the filters
1758  'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by the ``-json`` bjobs option.
1759 
1760  Parameters:
1761  output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
1762  job_id (str): String representation of the Job ID given by bsub during submission. If this argument is given then
1763  the output of this function will be only information about this job. If this argument is not given, then all jobs
1764  matching the other filters will be returned.
1765  username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
1766  a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
1767  receive job information from all known user jobs matching the other filters.
1768  queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.
1769 
1770  Returns:
1771  dict: JSON dictionary of the form:
1772 
1773  .. code-block:: python
1774 
1775  {
1776  "NJOBS":<njobs returned by command>,
1777  "JOBS":[
1778  {
1779  <output field: value>, ...
1780  }, ...
1781  ]
1782  }
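 
 For example (the job id shown is illustrative):
 
 .. code-block:: python
 
     # Status of all of the current user's jobs
     my_jobs = LSF.bjobs(output_fields=["stat", "name", "id"])
     # Status of one specific job
     one_job = LSF.bjobs(output_fields=["stat"], job_id="123456")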
1783  """
1784  B2DEBUG(29, f"Calling LSF.bjobs(output_fields={output_fields}, job_id={job_id}, username={username}, queue={queue})")
1785  # We must always return at least one output field when using JSON and -o options. So we choose the job id
1786  if not output_fields:
1787  output_fields = ["id"]
1788  # Output fields should be space separated but in a string.
1789  field_list_cmd = "\""
1790  field_list_cmd += " ".join(output_fields)
1791  field_list_cmd += "\""
1792  cmd_list = ["bjobs", "-o", field_list_cmd]
1793  # If the queue name is set then we add to the command options
1794  if queue:
1795  cmd_list.extend(["-q", queue])
1796  # If the username is set then we add to the command options
1797  if username:
1798  cmd_list.extend(["-u", username])
1799  # Can now add the json option before the final positional argument (if used)
1800  cmd_list.append("-json")
1801  # If the job id is set then we add to the end of the command
1802  if job_id:
1803  cmd_list.append(job_id)
1804  # We get a JSON serialisable summary from bjobs. Requires the shell argument.
1805  cmd = " ".join(cmd_list)
1806  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1807  output = decode_json_string(subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True))
1808  output["NJOBS"] = output["JOBS"]
1809  output["JOBS"] = output["RECORDS"]
1810  del output["RECORDS"]
1811  del output["COMMAND"]
1812  return output
1813 
1814  @classmethod
1815  def bqueues(cls, output_fields=None, queues=None):
1816  """
1817  Simplistic interface to the ``bqueues`` command. Lets you request information about all queues matching the filters.
1818  The result is the JSON dictionary returned by the ``-json`` bqueues option.
1819 
1820  Parameters:
1821  output_fields (list[str]): A list of bqueues -o fields that you would like information about
1822  e.g. the default is ['queue_name', 'status', 'max', 'njobs', 'pend', 'run']
1823  queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
1824  By default you will receive information about all queues.
1825 
1826  Returns:
1827  dict: JSON dictionary of the form:
1828 
1829  .. code-block:: python
1830 
1831  {
1832  "COMMAND":"bqueues",
1833  "QUEUES":46,
1834  "RECORDS":[
1835  {
1836  "QUEUE_NAME":"b2_beast",
1837  "STATUS":"Open:Active",
1838  "MAX":"200",
1839  "NJOBS":"0",
1840  "PEND":"0",
1841  "RUN":"0"
1842  }, ...
 ]
1843  }
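 
 For example (the queue names are illustrative):
 
 .. code-block:: python
 
     # All queues with the default output fields
     all_queues = LSF.bqueues()
     # Only the queues we care about
     some_queues = LSF.bqueues(output_fields=["queue_name", "njobs"], queues=["s", "l"])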
1844  """
1845  B2DEBUG(29, f"Calling LSF.bqueues(output_fields={output_fields}, queues={queues})")
1846  # We must always return at least one output field when using JSON and -o options. So we choose the job id
1847  if not output_fields:
1848  output_fields = ["queue_name", "status", "max", "njobs", "pend", "run"]
1849  # Output fields should be space separated but in a string.
1850  field_list_cmd = "\""
1851  field_list_cmd += " ".join(output_fields)
1852  field_list_cmd += "\""
1853  cmd_list = ["bqueues", "-o", field_list_cmd]
1854  # Can now add the json option before the final positional argument (if used)
1855  cmd_list.append("-json")
1856  # If the queue name is set then we add to the end of the command
1857  if queues:
1858  cmd_list.extend(queues)
1859  # We get a JSON serialisable summary from bqueues. Requires the shell argument.
1860  cmd = " ".join(cmd_list)
1861  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1862  output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1863  return decode_json_string(output)
1864 
1865 
1867  """
1868  Backend for submitting calibration processes to a HTCondor batch system.
1869  """
1870 
1871  batch_submit_script = "submit.sub"
1872 
1873  submission_cmds = ["condor_submit", "-terse"]
1874 
1875  default_global_job_limit = 10000
1876 
1877  default_backend_args = {
1878  "universe": "vanilla",
1879  "getenv": "false",
1880  "request_memory": "4 GB", # We set the default requested memory to 4 GB to maintain parity with KEKCC
1881  "path_prefix": "", # Path prefix for file path
1882  "extra_lines": [] # These should be other HTCondor submit script lines like 'request_cpus = 2'
1883  }
1884 
1885  default_class_ads = ["GlobalJobId", "JobStatus", "Owner"]
1886 
1887  def _make_submit_file(self, job, submit_file_path):
1888  """
1889  Fill HTCondor submission file.
1890  """
1891  # Find all files/directories in the working directory to copy on the worker node
1892 
1893  files_to_transfer = [i.as_posix() for i in job.working_dir.iterdir()]
1894 
1895  job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1896 
1897  with open(submit_file_path, "w") as submit_file:
1898  print(f'executable = {self.get_submit_script_path(job)}', file=submit_file)
1899  print(f'log = {Path(job.output_dir, "htcondor.log").as_posix()}', file=submit_file)
1900  print(f'output = {Path(job.working_dir, _STDOUT_FILE).as_posix()}', file=submit_file)
1901  print(f'error = {Path(job.working_dir, _STDERR_FILE).as_posix()}', file=submit_file)
1902  print(f'transfer_input_files = {",".join(files_to_transfer)}', file=submit_file)
1903  print(f'universe = {job_backend_args["universe"]}', file=submit_file)
1904  print(f'getenv = {job_backend_args["getenv"]}', file=submit_file)
1905  print(f'request_memory = {job_backend_args["request_memory"]}', file=submit_file)
1906  print('should_transfer_files = Yes', file=submit_file)
1907  print('when_to_transfer_output = ON_EXIT', file=submit_file)
1908  # Any other lines from the backend args that we don't handle explicitly, in case someone wants to insert extra directives
1909  for line in job_backend_args["extra_lines"]:
1910  print(line, file=submit_file)
1911  print('queue', file=submit_file)
1912 
1913  def _add_batch_directives(self, job, batch_file):
1914  """
1915  For HTCondor leave empty as the directives are already included in the submit file.
1916  """
1917  print('#!/bin/bash', file=batch_file)
1918 
1919  def _create_cmd(self, script_path):
1920  """
1921  """
1922  submission_cmd = self.submission_cmds[:]
1923  submission_cmd.append(script_path.as_posix())
1924  return submission_cmd
1925 
1927  """
1928  Construct the Path object of the .sub file that we will use to describe the job.
1929  """
1930  return Path(job.working_dir, self.batch_submit_script)
1931 
1932  @classmethod
1933  def _submit_to_batch(cls, cmd):
1934  """
1935  Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1936  """
1937  job_dir = Path(cmd[-1]).parent.as_posix()
1938  sub_out = ""
1939  attempt = 0
1940  sleep_time = 30
1941 
1942  while attempt < 3:
1943  try:
1944  sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, cwd=job_dir)
1945  break
1946  except subprocess.CalledProcessError as e:
1947  attempt += 1
1948  if attempt == 3:
1949  B2ERROR(f"Error during condor_submit: {str(e)} occurred more than 3 times.")
1950  raise e
1951  else:
1952  B2ERROR(f"Error during condor_submit: {str(e)}, sleeping for {sleep_time} seconds.")
1953  time.sleep(sleep_time)
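 # With ``-terse`` condor_submit prints something like "1234.0 - 1234.0", so the first token (ClusterId.ProcId) is the job id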
1954  return sub_out.split()[0]
1955 
1957  """
1958  Simple class to help monitor status of jobs submitted by HTCondor Backend.
1959 
1960  You pass in a `Job` object and job id from a condor_submit command.
1961  When you call the `ready` method it runs condor_q and, if needed, ``condor_history``
1962  to see whether or not the job has finished.
1963  """
1964 
1965 
1966  backend_code_to_status = {0: "submitted",
1967  1: "submitted",
1968  2: "running",
1969  3: "failed",
1970  4: "completed",
1971  5: "submitted",
1972  6: "failed"
1973  }
1974 
1975  def __init__(self, job, job_id):
1976  """
1977  Pass in the job object and the job id to allow the result to do monitoring and perform
1978  post processing of the job.
1979  """
1980  super().__init__(job)
1981 
1982  self.job_id = job_id
1983 
1984  def update_status(self):
1985  """
1986  Update the job's (or subjobs') status by calling condor_q.
1987  """
1988  B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
1989  # Get all jobs info and re-use it for each status update to minimise time spent on this update.
1990  condor_q_output = HTCondor.condor_q()
1991  if self.job.subjobs:
1992  for subjob in self.job.subjobs.values():
1993  subjob.result._update_result_status(condor_q_output)
1994  else:
1995  self._update_result_status(condor_q_output)
1996 
1997  def _update_result_status(self, condor_q_output):
1998  """
1999  In order to be slightly more efficient we pass in a previous call to condor_q to see if it can work.
2000  If it is there we update the job's status. If not we are forced to start calling condor_q and, if needed,
2001  ``condor_history``, etc.
2002 
2003  Parameters:
2004  condor_q_output (dict): The JSON output of a previous call to `HTCondor.condor_q` which we can re-use to find the
2005  status of this job if it was active when that command ran.
2006  """
2007  B2DEBUG(29, f"Calling {self.job}.result._update_result_status()")
2008  jobs_info = []
2009  for job_record in condor_q_output["JOBS"]:
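 # GlobalJobId has the form "<schedd name>#<ClusterId.ProcId>#<timestamp>"; the middle field matches the id stored at submission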
2010  job_id = job_record["GlobalJobId"].split("#")[1]
2011  if job_id == self.job_id:
2012  B2DEBUG(29, f"Found {self.job_id} in condor_q_output.")
2013  jobs_info.append(job_record)
2014 
2015  # Let's look for the exit code file where we expect it
2016  if not jobs_info:
2017  try:
2018  exit_code = self.get_exit_code_from_file()
2019  except FileNotFoundError:
2020  waiting_time = datetime.now() - self.exit_code_file_initial_time
2021  if waiting_time > self.time_to_wait_for_exit_code_file:
2022  B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
2023  exit_code = 1
2024  else:
2025  B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
2026  return
2027  if exit_code:
2028  jobs_info = [{"JobStatus": 6, "HoldReason": None}] # Set to failed
2029  else:
2030  jobs_info = [{"JobStatus": 4, "HoldReason": None}] # Set to completed
2031 
2032  # If this job wasn't in the passed in condor_q output, let's try our own with the specific job_id
2033  if not jobs_info:
2034  jobs_info = HTCondor.condor_q(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
2035 
2036  # If no job information is returned then the job already left the queue
2037  # check in the history to see if it succeeded or failed
2038  if not jobs_info:
2039  try:
2040  jobs_info = HTCondor.condor_history(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
2041  except KeyError:
2042  hold_reason = "No Reason Known"
2043 
2044  # Still no record of it after waiting for the exit code file?
2045  if not jobs_info:
2046  jobs_info = [{"JobStatus": 6, "HoldReason": None}] # Set to failed
2047 
2048  job_info = jobs_info[0]
2049  backend_status = job_info["JobStatus"]
2050  # If the job is held (backend_status = 5) then report why, then keep waiting
2051  if backend_status == 5:
2052  hold_reason = job_info.get("HoldReason", None)
2053  B2WARNING(f"{self.job} on hold because of {hold_reason}. Keep waiting.")
2054  backend_status = 2
2055  try:
2056  new_job_status = self.backend_code_to_status[backend_status]
2057  except KeyError as err:
2058  raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}")
2059  if new_job_status != self.job.status:
2060  self.job.status = new_job_status
2061 
2062  @classmethod
2063  def _create_job_result(cls, job, job_id):
2064  """
2065  """
2066  B2INFO(f"Job ID of {job} recorded as: {job_id}")
2067  job.result = cls.HTCondorResult(job, job_id)
2068 
2069  @classmethod
2070  def _create_parent_job_result(cls, parent):
2071  parent.result = cls.HTCondorResult(parent, None)
2072 
2073  def can_submit(self, njobs=1):
2074  """
2075  Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
2076  Returns True if the number is lower than the limit, False if it is higher.
2077 
2078  Parameters:
2079  njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
2080  are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
2081  assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
2082  So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
2083  and check again before submitting more.
2084  """
2085  B2DEBUG(29, "Calling HTCondor().can_submit()")
2086  jobs_info = self.condor_q()
2087  total_jobs = jobs_info["NJOBS"]
2088  B2INFO(f"Total jobs active in the HTCondor system is currently {total_jobs}")
2089  if (total_jobs + njobs) > self.global_job_limit:
2090  B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
2091  return False
2092  else:
2093  B2INFO("There is enough space to submit more jobs.")
2094  return True
2095 
2096  @classmethod
2097  def condor_q(cls, class_ads=None, job_id="", username=""):
2098  """
2099  Simplistic interface to the `condor_q` command. Lets you request information about all jobs matching the filters
2100  'job_id' and 'username'. Note that setting job_id negates username so it is ignored.
2101  The result is the JSON dictionary returned by the ``-json`` condor_q option.
2102 
2103  Parameters:
2104  class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
2105  By default we use {cls.default_class_ads}; increasing the number of class_ads increases the time taken
2106  by the condor_q call.
2107  job_id (str): String representation of the Job ID given by condor_submit during submission.
2108  If this argument is given then the output of this function will be only information about this job.
2109  If this argument is not given, then all jobs matching the other filters will be returned.
2110  username (str): By default we return information about only the current user's jobs. By giving
2111  a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2112  receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
2113  so it isn't recommended.
2114 
2115  Returns:
2116  dict: JSON dictionary of the form:
2117 
2118  .. code-block:: python
2119 
2120  {
2121  "NJOBS":<number of records returned by command>,
2122  "JOBS":[
2123  {
2124  <ClassAd: value>, ...
2125  }, ...
2126  ]
2127  }
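 
 For example (the job id shown is illustrative):
 
 .. code-block:: python
 
     # The current user's jobs with the default ClassAds
     my_jobs = HTCondor.condor_q()
     # One specific job with a couple of extra ClassAds
     one_job = HTCondor.condor_q(job_id="1234.0", class_ads=["JobStatus", "HoldReason"])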
2128  """
2129  B2DEBUG(29, f"Calling HTCondor.condor_q(class_ads={class_ads}, job_id={job_id}, username={username})")
2130  if not class_ads:
2131  class_ads = cls.default_class_ads
2132  # Output fields should be comma separated.
2133  field_list_cmd = ",".join(class_ads)
2134  cmd_list = ["condor_q", "-json", "-attributes", field_list_cmd]
2135  # If job_id is set then we ignore all other filters
2136  if job_id:
2137  cmd_list.append(job_id)
2138  else:
2139  if not username:
2140  username = os.environ["USER"]
2141  # If the username is set to all it is a special case
2142  if username == "all":
2143  cmd_list.append("-allusers")
2144  else:
2145  cmd_list.append(username)
2146  # We get a JSON serialisable summary from condor_q. But we will alter it slightly to be more similar to other backends
2147  cmd = " ".join(cmd_list)
2148  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2149  # condor_q occasionally fails
2150  try:
2151  records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2152  except BaseException:
2153  records = None
2154 
2155  if records:
2156  records = decode_json_string(records)
2157  else:
2158  records = []
2159  jobs_info = {"JOBS": records}
2160  jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2161  return jobs_info
2162 
2163  @classmethod
2164  def condor_history(cls, class_ads=None, job_id="", username=""):
2165  """
2166  Simplistic interface to the ``condor_history`` command. Lets you request information about all jobs matching the filters
2167  ``job_id`` and ``username``. Note that setting job_id negates username so it is ignored.
2168  The result is a JSON dictionary filled from the output of the ``-json`` ``condor_history`` option.
2169 
2170  Parameters:
2171  class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
2172  By default we use {cls.default_class_ads}; increasing the number of class_ads increases the time taken
2173  by the condor_history call.
2174  job_id (str): String representation of the Job ID given by condor_submit during submission.
2175  If this argument is given then the output of this function will be only information about this job.
2176  If this argument is not given, then all jobs matching the other filters will be returned.
2177  username (str): By default we return information about only the current user's jobs. By giving
2178  a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2179  receive job information from all known user jobs matching the other filters. This is limited to 10000 records
2180  and isn't recommended.
2181 
2182  Returns:
2183  dict: JSON dictionary of the form:
2184 
2185  .. code-block:: python
2186 
2187  {
2188  "NJOBS":<number of records returned by command>,
2189  "JOBS":[
2190  {
2191  <ClassAd: value>, ...
2192  }, ...
2193  ]
2194  }
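 
 For example (the job id shown is illustrative):
 
 .. code-block:: python
 
     # Look up a job that has already left the queue
     old_job = HTCondor.condor_history(job_id="1234.0", class_ads=["JobStatus", "HoldReason"])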
2195  """
2196  B2DEBUG(29, f"Calling HTCondor.condor_history(class_ads={class_ads}, job_id={job_id}, username={username})")
2197  if not class_ads:
2198  class_ads = cls.default_class_ads
2199  # Output fields should be comma separated.
2200  field_list_cmd = ",".join(class_ads)
2201  cmd_list = ["condor_history", "-json", "-attributes", field_list_cmd]
2202  # If job_id is set then we ignore all other filters
2203  if job_id:
2204  cmd_list.append(job_id)
2205  else:
2206  if not username:
2207  username = os.environ["USER"]
2208  # If the username is set to all it is a special case
2209  if username != "all":
2210  cmd_list.append(username)
2211  # We get a JSON serialisable summary from condor_q. But we will alter it slightly to be more similar to other backends
2212  cmd = " ".join(cmd_list)
2213  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2214  try:
2215  records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2216  except BaseException:
2217  records = None
2218 
2219  if records:
2220  records = decode_json_string(records)
2221  else:
2222  records = []
2223 
2224  jobs_info = {"JOBS": records}
2225  jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2226  return jobs_info
2227 
2228 
2230  """
2231  Backend for submitting calibration processes to the grid.
2232  """
2233 
2234 
2235 class BackendError(Exception):
2236  """
2237  Base exception class for Backend classes.
2238  """
2239 
2240 
2241 class JobError(Exception):
2242  """
2243  Base exception class for Job objects.
2244  """
2245 
2246 
2247 class SplitterError(Exception):
2248  """
2249  Base exception class for this module.
2250  """