Belle II Software release-08-01-10
backends.py
1 #!/usr/bin/env python3
2 
3 # disable doxygen check for this file
4 # @cond
5 
6 
13 
14 from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
15 import re
16 import os
17 from abc import ABC, abstractmethod
18 import json
19 import xml.etree.ElementTree as ET
20 from math import ceil
21 from pathlib import Path
22 from collections import deque
23 from itertools import count, takewhile
24 import shutil
25 import time
26 from datetime import datetime, timedelta
27 import subprocess
28 import multiprocessing as mp
29 
30 from caf.utils import method_dispatch
31 from caf.utils import decode_json_string
32 from caf.utils import grouper
33 from caf.utils import parse_file_uri
34 
35 
36 __all__ = ["Job", "SubJob", "Backend", "Local", "Batch", "LSF", "PBS", "HTCondor", "get_input_data"]
37 
38 
39 _input_data_file_path = Path("__BACKEND_INPUT_FILES__.json")
40 
41 _STDOUT_FILE = "stdout"
42 
43 _STDERR_FILE = "stderr"
44 
45 _backend_job_envvars = (
46  "TMPDIR",
47  "BELLE2_CONFIG_DIR",
48  "VO_BELLE2_SW_DIR",
49  "BELLE2_EXTERNALS_TOPDIR",
50  "BELLE2_CONDB_METADATA",
51  "BELLE2_CONDB_PROXY",
52 )
53 
54 
55 def get_input_data():
56  """
57  Simple JSON load of the default input data file. Will contain a list of string file paths
58  for use by the job process.
59  """
60  with open(_input_data_file_path) as input_data_file:
61  input_data = json.load(input_data_file)
62  return input_data
63 
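# A minimal usage sketch (illustrative, not part of the original backends.py):
# shows how a job's payload script, running in the job working directory, could
# read back the list of input file paths written by Job.dump_input_data().
# The print statement is just a stand-in for real per-file processing.
def _example_read_input_data():
    for path in get_input_data():
        print(f"Would process input file: {path}")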
64 
65 def monitor_jobs(args, jobs):
66  unfinished_jobs = jobs[:]
67  failed_jobs = 0
68  while unfinished_jobs:
69  B2INFO("Updating statuses of unfinished jobs...")
70  for j in unfinished_jobs:
71  j.update_status()
72  B2INFO("Checking if jobs are ready...")
73  for j in unfinished_jobs[:]:
74  if j.ready():
75  if j.status == "failed":
 76  B2ERROR(f"{j} failed")
77  failed_jobs += 1
78  else:
79  B2INFO(f"{j} is finished")
80  unfinished_jobs.remove(j)
81  if unfinished_jobs:
82  B2INFO(f"Not all jobs done yet, waiting {args.heartbeat} seconds before re-checking...")
83  time.sleep(args.heartbeat)
84  if failed_jobs > 0:
85  B2ERROR(f"{failed_jobs} jobs failed")
86  else:
87  B2INFO('All jobs finished successfully')
88 
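# Illustrative sketch: monitor_jobs() only uses the ``heartbeat`` attribute of
# its ``args`` argument, so a simple namespace object is enough when calling it
# outside of the usual command line tooling. The 30 second heartbeat is an
# arbitrary example value.
def _example_monitor_jobs(jobs):
    from argparse import Namespace
    monitor_jobs(Namespace(heartbeat=30), jobs)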
89 
90 class ArgumentsGenerator():
91  def __init__(self, generator_function, *args, **kwargs):
92  """
93  Simple little class to hold a generator (uninitialised) and the necessary args/kwargs to
94  initialise it. This lets us re-use a generator by setting it up again fresh. This is not
95  optimal for expensive calculations, but it is nice for making large sequences of
96  Job input arguments on the fly.
97 
98  Parameters:
99  generator_function (py:function): A function (callable) that contains a ``yield`` statement. This generator
100  should *not* be initialised i.e. you haven't called it with ``generator_function(*args, **kwargs)``
101  yet. That will happen when accessing the `ArgumentsGenerator.generator` property.
 102  args (tuple): The positional arguments you want to send into the initialisation of the generator.
 103  kwargs (dict): The keyword arguments you want to send into the initialisation of the generator.
104  """
105 
106  self.generator_function = generator_function
107 
108  self.args = args
109 
110  self.kwargs = kwargs
111 
112  @property
113  def generator(self):
114  """
115  Returns:
116  generator: The initialised generator (using the args and kwargs for initialisation). It should be ready
117  to have ``next``/``send`` called on it.
118  """
119  gen = self.generator_function(*self.args, **self.kwargs)
120  gen.send(None) # Prime it
121  return gen
122 
123 
124 def range_arguments(start=0, stop=None, step=1):
125  """
126  A simple example Arguments Generator function for use as a `ArgumentsGenerator.generator_function`.
 127  It will return increasing values using itertools.count. By default it is infinite and will never raise `StopIteration`.
128  The `SubJob` object is sent into this function with `send` but is not used.
129 
130  Parameters:
131  start (int): The starting value that will be returned.
 132  stop (int): At this value a `StopIteration` is raised. If this is `None` then this generator will continue
133  forever.
134  step (int): The step size.
135 
136  Returns:
137  tuple
138  """
139  def should_stop(x):
140  if stop is not None and x >= stop:
141  return True
142  else:
143  return False
144  # The logic of send/yield is tricky. Basically the first yield must have been hit before send(subjob) can work.
145  # However the first yield is also the one that receives the send(subjob) NOT the send(None).
146  subjob = (yield None) # Handle the send(None) and first send(subjob)
147  args = None
148  for i in takewhile(lambda x: not should_stop(x), count(start, step)):
149  args = (i,) # We could have used the subjob object here
150  B2DEBUG(29, f"{subjob} arguments will be {args}")
151  subjob = (yield args) # Set up ready for the next loop (if it happens)
152 
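# Illustrative sketch of how ArgumentsGenerator and range_arguments fit together.
# The generator is primed by the ArgumentsGenerator.generator property, after
# which each send() returns the next argument tuple. A plain string stands in
# for the SubJob object here, since range_arguments ignores what is sent in.
def _example_range_arguments():
    arg_gen = ArgumentsGenerator(range_arguments, 0, stop=3, step=1).generator
    for i in range(3):
        args = arg_gen.send(f"placeholder_subjob_{i}")
        print(args)  # -> (0,), (1,), (2,); the next send() raises StopIteration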
153 
154 class SubjobSplitter(ABC):
155  """
156  Abstract base class. This class handles the logic of creating subjobs for a `Job` object.
157  The `create_subjobs` function should be implemented and used to construct
158  the subjobs of the parent job object.
159 
160  Parameters:
161  arguments_generator (ArgumentsGenerator): Used to construct the generator function that will yield the argument
162  tuple for each `SubJob`. The splitter will iterate through the generator each time `create_subjobs` is
163  called. The `SubJob` will be sent into the generator with ``send(subjob)`` so that the generator can decide what
164  arguments to return.
165  """
166 
167  def __init__(self, *, arguments_generator=None):
168  """
169  Derived classes should call `super` to run this.
170  """
171 
172  self.arguments_generator = arguments_generator
173 
174  @abstractmethod
175  def create_subjobs(self, job):
176  """
177  Implement this method in derived classes to generate the `SubJob` objects.
178  """
179 
180  def assign_arguments(self, job):
181  """
182  Use the `arguments_generator` (if one exists) to assign the argument tuples to the
183  subjobs.
184  """
185  if self.arguments_generator:
186  arg_gen = self.arguments_generator.generator
187  generating = True
188  for subjob in sorted(job.subjobs.values(), key=lambda sj: sj.id):
189  if generating:
190  try:
191  args = arg_gen.send(subjob)
192  except StopIteration:
193  B2ERROR(f"StopIteration called when getting args for {subjob}, "
194  "setting all subsequent subjobs to have empty argument tuples.")
195  args = tuple() # If our generator finishes before the subjobs we use empty argument tuples
196  generating = False
197  else:
198  args = tuple()
199  B2DEBUG(29, f"Arguments for {subjob}: {args}")
200  subjob.args = args
201  return
202 
203  B2INFO(f"No ArgumentsGenerator assigned to the {self} so subjobs of {job} "
204  "won't automatically have arguments assigned.")
205 
206  def __repr__(self):
207  return f"{self.__class__.__name__}"
208 
209 
210 class MaxFilesSplitter(SubjobSplitter):
211 
212  def __init__(self, *, arguments_generator=None, max_files_per_subjob=1):
213  """
214  Parameters:
 215  max_files_per_subjob (int): The maximum number of input files used per `SubJob` created.
216  """
217  super().__init__(arguments_generator=arguments_generator)
218 
219  self.max_files_per_subjob = max_files_per_subjob
220 
221  def create_subjobs(self, job):
222  """
223  This function creates subjobs for the parent job passed in. It creates as many subjobs as required
224  in order to prevent the number of input files per subjob going over the limit set by
225  `MaxFilesSplitter.max_files_per_subjob`.
226  """
227  if not job.input_files:
228  B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
229  return
230 
231  for i, subjob_input_files in enumerate(grouper(self.max_files_per_subjob, job.input_files)):
232  job.create_subjob(i, input_files=subjob_input_files)
233 
234  self.assign_arguments(job)
235 
236  B2INFO(f"{self} created {i+1} Subjobs for {job}")
237 
238 
239 class MaxSubjobsSplitter(SubjobSplitter):
240 
241  def __init__(self, *, arguments_generator=None, max_subjobs=1000):
242  """
243  Parameters:
 244  max_subjobs (int): The maximum number of subjobs that will be created.
245  """
246  super().__init__(arguments_generator=arguments_generator)
247 
248  self.max_subjobs = max_subjobs
249 
250  def create_subjobs(self, job):
251  """
252  This function creates subjobs for the parent job passed in. It creates as many subjobs as required
253  by the number of input files up to the maximum set by `MaxSubjobsSplitter.max_subjobs`. If there are
254  more input files than `max_subjobs` it instead groups files by the minimum number per subjob in order to
255  respect the subjob limit e.g. If you have 11 input files and a maximum number of subjobs of 4, then it
256  will create 4 subjobs, 3 of them with 3 input files, and one with 2 input files.
257  """
258  if not job.input_files:
259  B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
260  return
261 
262  # Since we will be popping from the left of the input file list, we construct a deque
263  remaining_input_files = deque(job.input_files)
264  # The initial number of available subjobs, will go down as we create them
265  available_subjobs = self.max_subjobs
266  subjob_i = 0
267  while remaining_input_files:
268  # How many files should we use for this subjob?
269  num_input_files = ceil(len(remaining_input_files) / available_subjobs)
270  # Pop them from the remaining files
271  subjob_input_files = []
272  for i in range(num_input_files):
273  subjob_input_files.append(remaining_input_files.popleft())
274  # Create the actual subjob
275  job.create_subjob(subjob_i, input_files=subjob_input_files)
276  subjob_i += 1
277  available_subjobs -= 1
278 
279  self.assign_arguments(job)
280  B2INFO(f"{self} created {subjob_i} Subjobs for {job}")
281 
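# Worked example (illustrative only) of the grouping logic above: with 11 input
# files and max_subjobs=4 the ceil() based loop produces group sizes 3, 3, 3, 2,
# matching the behaviour described in the create_subjobs docstring. The file
# names are hypothetical.
def _example_max_subjobs_grouping():
    files = deque(f"file_{i}.root" for i in range(11))
    available_subjobs = 4
    group_sizes = []
    while files:
        num_input_files = ceil(len(files) / available_subjobs)
        group_sizes.append(num_input_files)
        for _ in range(num_input_files):
            files.popleft()
        available_subjobs -= 1
    print(group_sizes)  # -> [3, 3, 3, 2]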
282 
283 class ArgumentsSplitter(SubjobSplitter):
284  """
285  Creates SubJobs based on the given argument generator. The generator will be called until a `StopIteration` is issued.
286  Be VERY careful to not accidentally give an infinite generator! Otherwise it will simply create SubJobs until you run out
287  of memory. You can set the `ArgumentsSplitter.max_subjobs` parameter to try and prevent this and throw an exception.
288 
289  This splitter is useful for MC production jobs where you don't have any input files, but you want to control the exp/run
290  numbers of subjobs. If you do have input files set for the parent `Job` objects, then the same input files will be
 291  assigned to every `SubJob`.
292 
293  Parameters:
294  arguments_generator (ArgumentsGenerator): The standard ArgumentsGenerator that is used to assign arguments
295  """
296 
297  def __init__(self, *, arguments_generator=None, max_subjobs=None):
298  """
299  """
300  super().__init__(arguments_generator=arguments_generator)
301 
302  self.max_subjobs = max_subjobs
303 
304  def create_subjobs(self, job):
305  """
306  This function creates subjobs for the parent job passed in. It creates subjobs until the
307  `SubjobSplitter.arguments_generator` finishes.
308 
309  If `ArgumentsSplitter.max_subjobs` is set, then it will throw an exception if more than this number of
310  subjobs are created.
311  """
312  arg_gen = self.arguments_generator.generator
313  for i in count():
314  # Reached the maximum?
 315  if self.max_subjobs is not None and i >= self.max_subjobs:
316  raise SplitterError(f"{self} tried to create more subjobs than the maximum (={self.max_subjobs}).")
317  try:
318  subjob = SubJob(job, i, job.input_files) # We manually create it because we might not need it
319  args = arg_gen.send(subjob) # Might throw StopIteration
320  B2INFO(f"Creating {job}.{subjob}")
321  B2DEBUG(29, f"Arguments for {subjob}: {args}")
322  subjob.args = args
323  job.subjobs[i] = subjob
324  except StopIteration:
325  break
326  B2INFO(f"{self} created {i+1} Subjobs for {job}")
327 
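# Illustrative sketch: configuring a Job to be split purely by generated
# arguments, e.g. for MC-style production where there are no input files.
# The command, steering file name, and the stop/max_subjobs values are
# placeholder example values.
def _example_arguments_splitter():
    job = Job("example_mc_job")
    job.cmd = ["basf2", "generate.py"]
    job.splitter = ArgumentsSplitter(
        arguments_generator=ArgumentsGenerator(range_arguments, 0, stop=5),
        max_subjobs=100)
    return job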
328 
329 class Job:
330  """
331  This generic Job object is used to tell a Backend what to do.
332  This object basically holds necessary information about a process you want to submit to a `Backend`.
333  It should *not* do anything that is backend specific, just hold the configuration for a job to be
334  successfully submitted and monitored using a backend. The result attribute is where backend
335  specific job monitoring goes.
336 
337  Parameters:
338  name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF
339 
340  .. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.
341  """
342 
343 
345  statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}
346 
347 
348  exit_statuses = ["failed", "completed"]
349 
350  def __init__(self, name, job_dict=None):
351  """
352  """
353 
354  self.name = name
355 
356  self.splitter = None
357 
358  if not job_dict:
359 
361  self.input_sandbox_files = []
362 
363  self.working_dir = Path()
364 
365  self.output_dir = Path()
366 
367  self.output_patterns = []
368 
369  self.cmd = []
370 
371  self.args = []
372 
373  self.input_files = []
374 
375  self.setup_cmds = []
376 
378  self.backend_args = {}
379 
380  self.subjobs = {}
381  elif job_dict:
382  self.input_sandbox_files = [Path(p) for p in job_dict["input_sandbox_files"]]
383  self.working_dir = Path(job_dict["working_dir"])
384  self.output_dir = Path(job_dict["output_dir"])
385  self.output_patterns = job_dict["output_patterns"]
386  self.cmd = job_dict["cmd"]
387  self.args = job_dict["args"]
388  self.input_files = job_dict["input_files"]
389  self.setup_cmds = job_dict["setup_cmds"]
390  self.backend_args = job_dict["backend_args"]
391  self.subjobs = {}
392  for subjob_dict in job_dict["subjobs"]:
393  self.create_subjob(subjob_dict["id"], input_files=subjob_dict["input_files"], args=subjob_dict["args"])
394 
395 
397  self.result = None
398 
399  self._status = "init"
400 
401  def __repr__(self):
402  """
403  Representation of Job class (what happens when you print a Job() instance).
404  """
405  return f"Job({self.name})"
406 
407  def ready(self):
408  """
 409  Returns whether or not the Job has finished. If the job has subjobs then it will return True only when they are all finished.
 410  It returns False as soon as it finds the first subjob that is not finished, meaning that you cannot guarantee that all subjobs
 411  will have their status updated by this call. Instead use :py:meth:`update_status` to update all statuses if necessary.
412  """
413  if not self.result:
414  B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")
415  return False
416  else:
417  return self.result.ready()
418 
419  def update_status(self):
420  """
421  Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
422  in the best way for the type of result object/backend.
423  """
424  if not self.result:
425  B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
426  else:
427  self.result.update_status()
428  return self.status
429 
430  def create_subjob(self, i, input_files=None, args=None):
431  """
 432  Creates a subjob Job object that references the parent Job.
433  Returns the SubJob object at the end.
434  """
435  if i not in self.subjobs:
436  B2INFO(f"Creating {self}.Subjob({i})")
437  subjob = SubJob(self, i, input_files)
438  if args:
439  subjob.args = args
440  self.subjobs[i] = subjob
441  return subjob
442  else:
443  B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")
444 
445  @property
446  def status(self):
447  """
448  Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
449  subjob status in the hierarchy of statuses in `Job.statuses`.
450  """
451  if self.subjobs:
452  job_status = self._get_overall_status_from_subjobs()
453  if job_status != self._status:
454 
455  self.status = job_status
456  return self._status
457 
458  def _get_overall_status_from_subjobs(self):
459  subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
460  status_level = min([self.statuses[status] for status in subjob_statuses])
461  for status, level in self.statuses.items():
462  if level == status_level:
463  return status
464 
465  @status.setter
466  def status(self, status):
467  """
468  Sets the status of this Job.
469  """
470  # Print an error only if the job failed.
471  if status == 'failed':
472  B2ERROR(f"Setting {self.name} status to failed")
473  else:
474  B2INFO(f"Setting {self.name} status to {status}")
475  self._status = status
476 
477  @property
478  def output_dir(self):
479  return self._output_dir
480 
481  @output_dir.setter
482  def output_dir(self, value):
483  self._output_dir = Path(value).absolute()
484 
485  @property
486  def working_dir(self):
487  return self._working_dir
488 
489  @working_dir.setter
490  def working_dir(self, value):
491  self._working_dir = Path(value).absolute()
492 
493  @property
494  def input_sandbox_files(self):
495  return self._input_sandbox_files
496 
497  @input_sandbox_files.setter
498  def input_sandbox_files(self, value):
499  self._input_sandbox_files = [Path(p).absolute() for p in value]
500 
501  @property
502  def input_files(self):
503  return self._input_files
504 
505  @input_files.setter
506  def input_files(self, value):
507  self._input_files = value
508 
509  @property
510  def max_subjobs(self):
511  return self.splitter.max_subjobs
512 
513  @max_subjobs.setter
514  def max_subjobs(self, value):
515  self.splitter = MaxSubjobsSplitter(max_subjobs=value)
516  B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
517 
518  @property
519  def max_files_per_subjob(self):
520  return self.splitter.max_files_per_subjob
521 
522  @max_files_per_subjob.setter
523  def max_files_per_subjob(self, value):
524  self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
525  B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
526 
527  def dump_to_json(self, file_path):
528  """
529  Dumps the Job object configuration to a JSON file so that it can be read in again later.
530 
531  Parameters:
 532  file_path (`pathlib.Path` or str): The file path we'll dump to.
533  """
534  with open(file_path, mode="w") as job_file:
535  json.dump(self.job_dict, job_file, indent=2)
536 
537  @classmethod
538  def from_json(cls, file_path):
539  with open(file_path) as job_file:
540  job_dict = json.load(job_file)
541  return cls(job_dict["name"], job_dict=job_dict)
542 
543  @property
544  def job_dict(self):
545  """
546  Returns:
547  dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
 548  `pathlib.Path` objects are converted to strings via ``Path.as_posix()``.
549  """
550  job_dict = {}
551  job_dict["name"] = self.name
552  job_dict["input_sandbox_files"] = [i.as_posix() for i in self.input_sandbox_files]
553  job_dict["working_dir"] = self.working_dir.as_posix()
554  job_dict["output_dir"] = self.output_dir.as_posix()
555  job_dict["output_patterns"] = self.output_patterns
556  job_dict["cmd"] = self.cmd
557  job_dict["args"] = self.args
558  job_dict["input_files"] = self.input_files
559  job_dict["setup_cmds"] = self.setup_cmds
560  job_dict["backend_args"] = self.backend_args
561  job_dict["subjobs"] = [sj.job_dict for sj in self.subjobs.values()]
562  return job_dict
563 
564  def dump_input_data(self):
565  """
566  Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
567  string URI objects.
568  """
569  with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
570  json.dump(self.input_files, input_data_file, indent=2)
571 
572  def copy_input_sandbox_files_to_working_dir(self):
573  """
574  Get all of the requested files for the input sandbox and copy them to the working directory.
575  Files like the submit.sh or input_data.json are not part of this process.
576  """
577  for file_path in self.input_sandbox_files:
578  if file_path.is_dir():
579  shutil.copytree(file_path, Path(self.working_dir, file_path.name))
580  else:
581  shutil.copy(file_path, self.working_dir)
582 
583  def check_input_data_files(self):
584  """
585  Check the input files and make sure that there aren't any duplicates.
586  Also check if the files actually exist if possible.
587  """
588  existing_input_files = [] # We use a list instead of set to avoid losing any ordering of files
589  for file_path in self.input_files:
590  file_uri = parse_file_uri(file_path)
591  if file_uri.scheme == "file":
592  p = Path(file_uri.path)
593  if p.is_file():
594  if file_uri.geturl() not in existing_input_files:
595  existing_input_files.append(file_uri.geturl())
596  else:
597  B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
598  else:
599  B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
600  else:
601  B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
602  if file_path not in existing_input_files:
603  existing_input_files.append(file_path)
604  else:
605  B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
606  if self.input_files and not existing_input_files:
607  B2WARNING(f"No valid input file paths found for {self.name}, but some were requested.")
608 
609  # Replace the Job's input files with the ones that exist + duplicates removed
610  self.input_files = existing_input_files
611 
612  @property
613  def full_command(self):
614  """
615  Returns:
616  str: The full command that this job will run including any arguments.
617  """
618  all_components = self.cmd[:]
619  all_components.extend(self.args)
 620  # Convert everything to strings just in case arguments were generated as different types
621  full_command = " ".join(map(str, all_components))
622  B2DEBUG(29, f"Full command of {self} is '{full_command}'")
623  return full_command
624 
625  def append_current_basf2_setup_cmds(self):
626  """
627  This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
628  It should detect if you are using a local release or CVMFS and append the correct commands
629  so that the job will have the same basf2 release environment. It should also detect
630  if a local release is not compiled with the ``opt`` option.
631 
632  Note that this *doesn't mean that every environment variable is inherited* from the submitting
633  process environment.
634  """
635  def append_environment_variable(cmds, envvar):
636  """
637  Append a command for setting an environment variable.
638  """
639  if envvar in os.environ:
640  cmds.append(f"""if [ -z "${{{envvar}}}" ]; then""")
641  cmds.append(f" export {envvar}={os.environ[envvar]}")
642  cmds.append("fi")
643 
644  if "BELLE2_TOOLS" not in os.environ:
645  raise BackendError("No BELLE2_TOOLS found in environment")
646  # Export all the environment variables defined via _backend_job_envvars
647  for envvar in _backend_job_envvars:
648  append_environment_variable(self.setup_cmds, envvar)
649  if "BELLE2_RELEASE" in os.environ:
650  self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
651  elif 'BELLE2_LOCAL_DIR' in os.environ:
652  self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
653  self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
654  self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
655  self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
656  self.setup_cmds.append("pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
657  self.setup_cmds.append("source $BACKEND_B2SETUP")
658  # b2code-option has to be executed only after the source of the tools.
659  self.setup_cmds.append("b2code-option $BACKEND_BELLE2_OPTION")
660  self.setup_cmds.append("popd > /dev/null")
661 
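# Illustrative sketch of configuring a Job and round-tripping it through JSON.
# All paths and commands here are hypothetical example values, and the call to
# append_current_basf2_setup_cmds() assumes a sourced basf2 environment.
def _example_job_json_roundtrip(tmp_dir="/tmp/example_caf_job"):
    Path(tmp_dir).mkdir(parents=True, exist_ok=True)
    job = Job("example_job")
    job.cmd = ["echo", "hello"]
    job.working_dir = Path(tmp_dir, "work")
    job.output_dir = Path(tmp_dir, "out")
    job.output_patterns = ["*.root"]
    job.append_current_basf2_setup_cmds()  # needs BELLE2_TOOLS etc. in the environment
    job.dump_to_json(Path(tmp_dir, "job.json"))
    restored = Job.from_json(Path(tmp_dir, "job.json"))
    assert restored.name == job.name
    return restored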
662 
663 class SubJob(Job):
664  """
 665  This mini-class simply holds basic information about which subjob this is
 666  and a reference to the parent Job object so that the main configuration can be accessed there,
 667  rather than replicating all of the parent job's configuration again.
668  """
669 
670  def __init__(self, job, subjob_id, input_files=None):
671  """
672  """
673 
674  self.id = subjob_id
675 
676  self.parent = job
677 
678  if not input_files:
679  input_files = []
680  self.input_files = input_files
681 
683  self.result = None
684 
685  self._status = "init"
686 
687  self.args = []
688 
689  @property
690  def output_dir(self):
691  """
692  Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this."""
693  return Path(self.parent.output_dir, str(self.id))
694 
695  @property
696  def working_dir(self):
697  """Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this."""
698  return Path(self.parent.working_dir, str(self.id))
699 
700  @property
701  def name(self):
702  """Getter for name of SubJob. Accesses the parent Job name to infer this."""
703  return "_".join((self.parent.name, str(self.id)))
704 
705  @property
706  def status(self):
707  """
708  Returns the status of this SubJob.
709  """
710  return self._status
711 
712  @status.setter
713  def status(self, status):
714  """
715  Sets the status of this Job.
716  """
717  # Print an error only if the job failed.
718  if status == "failed":
719  B2ERROR(f"Setting {self.name} status to failed")
720  else:
721  B2INFO(f"Setting {self.name} status to {status}")
722  self._status = status
723 
724  @property
725  def subjobs(self):
726  """
727  A subjob cannot have subjobs. Always return empty list.
728  """
729  return []
730 
731  @property
732  def job_dict(self):
733  """
734  Returns:
 735  dict: A JSON serialisable representation of the `SubJob`. `pathlib.Path` objects are converted to
 736  strings via ``Path.as_posix()``. Since SubJobs inherit most of the parent job's config
737  we only output the input files and arguments that are specific to this subjob and no other details.
738  """
739  job_dict = {}
740  job_dict["id"] = self.id
741  job_dict["input_files"] = self.input_files
742  job_dict["args"] = self.args
743  return job_dict
744 
745  def __getattr__(self, attribute):
746  """
747  Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
748  unless otherwise specified.
749  """
750  return getattr(self.parent, attribute)
751 
752  def __repr__(self):
753  """
754  """
755  return f"SubJob({self.name})"
756 
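# Illustrative sketch of how SubJobs relate to their parent Job: attributes that
# are not overridden (e.g. cmd) fall through to the parent via __getattr__,
# while name/working_dir/output_dir gain the subjob id as a suffix/subdirectory.
# The command, path, and file name below are hypothetical.
def _example_subjob_relationship():
    parent = Job("example_parent")
    parent.cmd = ["echo", "hello"]
    parent.working_dir = "/tmp/example_parent"
    subjob = parent.create_subjob(0, input_files=["file_0.root"])
    print(subjob.name)         # -> "example_parent_0"
    print(subjob.working_dir)  # -> /tmp/example_parent/0
    print(subjob.cmd)          # -> ["echo", "hello"] (taken from the parent)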
757 
758 class Backend(ABC):
759  """
760  Abstract base class for a valid backend.
761  Classes derived from this will implement their own submission of basf2 jobs
762  to whatever backend they describe.
763  Some common methods/attributes go into this base class.
764 
765  For backend_args the priority from lowest to highest is:
766 
767  backend.default_backend_args -> backend.backend_args -> job.backend_args
768  """
769 
770  submit_script = "submit.sh"
771 
772  exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
773 
774  default_backend_args = {}
775 
776  def __init__(self, *, backend_args=None):
777  """
778  """
779  if backend_args is None:
780  backend_args = {}
781 
782  self.backend_args = {**self.default_backend_args, **backend_args}
783 
784  @abstractmethod
785  def submit(self, job):
786  """
787  Base method for submitting collection jobs to the backend type. This MUST be
788  implemented for a correctly written backend class deriving from Backend().
789  """
790 
791  @staticmethod
792  def _add_setup(job, batch_file):
793  """
794  Adds setup lines to the shell script file.
795  """
796  for line in job.setup_cmds:
797  print(line, file=batch_file)
798 
799  def _add_wrapper_script_setup(self, job, batch_file):
800  """
801  Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
802  `trap` statements for Ctrl-C situations.
803  """
804  start_wrapper = f"""# ---
805 # trap ctrl-c and call ctrl_c()
806 trap '(ctrl_c 130)' SIGINT
807 trap '(ctrl_c 143)' SIGTERM
808 
809 function write_exit_code() {{
810  echo "Writing $1 to exit status file"
811  echo "$1" > {self.exit_code_file}
812  exit $1
813 }}
814 
815 function ctrl_c() {{
816  trap '' SIGINT SIGTERM
817  echo "** Trapped Ctrl-C **"
818  echo "$1" > {self.exit_code_file}
819  exit $1
820 }}
821 # ---"""
822  print(start_wrapper, file=batch_file)
823 
824  def _add_wrapper_script_teardown(self, job, batch_file):
825  """
826  Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
827  an exit code of the job cmd being written out to a file. Which means that we can know if the command was
828  successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
829  removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
830  file.
831  """
832  end_wrapper = """# ---
833 write_exit_code $?"""
834  print(end_wrapper, file=batch_file)
835 
836  @classmethod
837  def _create_parent_job_result(cls, parent):
838  """
839  We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
840  so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
841  statuses and allows the use of ready().
842  """
843  raise NotImplementedError
844 
845  def get_submit_script_path(self, job):
846  """
847  Construct the Path object of the bash script file that we will submit. It will contain
848  the actual job command, wrapper commands, setup commands, and any batch directives
849  """
850  return Path(job.working_dir, self.submit_script)
851 
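# Illustrative sketch of the backend_args priority described in the Backend
# docstring: class defaults are overridden by per-backend arguments, which are
# in turn overridden by per-job arguments when the submission script is built.
# The dictionary keys and values are example placeholders only.
def _example_backend_args_priority():
    defaults = {"queue": "short", "cores": 1}   # like default_backend_args
    backend_args = {"queue": "long"}            # passed to the Backend instance
    job_backend_args = {"cores": 4}             # set on a specific Job
    merged = {**defaults, **backend_args, **job_backend_args}
    print(merged)  # -> {'queue': 'long', 'cores': 4}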
852 
853 class Result():
854  """
855  Base class for Result objects. A Result is created for each `Job` (or `Job.SubJob`) object
856  submitted to a backend. It provides a way to query a job's status to find out if it's ready.
857  """
858 
859  def __init__(self, job):
860  """
861  Pass in the job object to allow the result to access the job's properties and do post-processing.
862  """
863 
864  self.job = job
865 
866  self._is_ready = False
867 
869  self.time_to_wait_for_exit_code_file = timedelta(minutes=5)
870 
871  self.exit_code_file_initial_time = None
872 
873  def ready(self):
874  """
875  Returns whether or not this job result is known to be ready. Doesn't actually change the job status. Just changes
876  the 'readiness' based on the known job status.
877  """
878  B2DEBUG(29, f"Calling {self.job}.result.ready()")
879  if self._is_ready:
880  return True
881  elif self.job.status in self.job.exit_statuses:
882  self._is_ready = True
883  return True
884  else:
885  return False
886 
887  def update_status(self):
888  """
889  Update the job's (and subjobs') status so that `Result.ready` will return the up to date status. This call will have to
890  actually look up the job's status from some database/exit code file.
891  """
892  raise NotImplementedError
893 
894  def get_exit_code_from_file(self):
895  """
 896  Read the exit code file to discover the exit status of the job command. Useful fallback if the job is no longer
897  known to the job database (batch system purged it for example). Since some backends may take time to download
898  the output files of the job back to the working directory we use a time limit on how long to wait.
899  """
900  if not self.exit_code_file_initial_time:
901  self.exit_code_file_initial_time = datetime.now()
902  exit_code_path = Path(self.job.working_dir, Backend.exit_code_file)
903  with open(exit_code_path) as f:
904  exit_code = int(f.read().strip())
905  B2DEBUG(29, f"Exit code from file for {self.job} was {exit_code}")
906  return exit_code
907 
908 
909 class Local(Backend):
910  """
911  Backend for local processes i.e. on the same machine but in a subprocess.
912 
913  Note that you should call the self.join() method to close the pool and wait for any
914  running processes to finish before exiting the process. Once you've called join you will have to set up a new
 915  instance of this backend to create a new pool. If you don't call `Local.join` or join the pool yourself
 916  somewhere, then the main python process might end before your pool is done.
917 
918  Keyword Arguments:
919  max_processes (int): Integer that specifies the size of the process pool that spawns the subjobs, default=1.
 920  It's the maximum number of simultaneous subjobs.
921  Try not to specify a large number or a number larger than the number of cores.
922  It won't crash the program but it will slow down and negatively impact performance.
923  """
924 
925  def __init__(self, *, backend_args=None, max_processes=1):
926  """
927  """
928  super().__init__(backend_args=backend_args)
929 
930  self.pool = None
931 
932  self.max_processes = max_processes
933 
934  class LocalResult(Result):
935  """
936  Result class to help monitor status of jobs submitted by Local backend.
937  """
938 
939  def __init__(self, job, result):
940  """
941  Pass in the job object and the multiprocessing result to allow the result to do monitoring and perform
942  post processing of the job.
943  """
944  super().__init__(job)
945 
946  self.result = result
947 
948  def _update_result_status(self):
949  if self.result.ready() and (self.job.status not in self.job.exit_statuses):
950  return_code = self.result.get()
951  if return_code:
952  self.job.status = "failed"
953  else:
954  self.job.status = "completed"
955 
956  def update_status(self):
957  """
958  Update the job's (or subjobs') status by calling the result object.
959  """
960  B2DEBUG(29, f"Calling {self.job}.result.update_status()")
961  if self.job.subjobs:
962  for subjob in self.job.subjobs.values():
963  subjob.result._update_result_status()
964  else:
965  self._update_result_status()
966 
967  def join(self):
968  """
969  Closes and joins the Pool, letting you wait for all results currently
970  still processing.
971  """
972  B2INFO("Joining Process Pool, waiting for results to finish...")
973  self.pool.close()
974  self.pool.join()
975  B2INFO("Process Pool joined.")
976 
977  @property
978  def max_processes(self):
979  """
980  Getter for max_processes
981  """
982  return self._max_processes
983 
984  @max_processes.setter
985  def max_processes(self, value):
986  """
987  Setter for max_processes, we also check for a previous Pool(), wait for it to join
988  and then create a new one with the new value of max_processes
989  """
990 
991  self._max_processes = value
992  if self.pool:
993  B2INFO("New max_processes requested. But a pool already exists.")
994  self.join()
995  B2INFO(f"Starting up new Pool with {self.max_processes} processes")
996  self.pool = mp.Pool(processes=self.max_processes)
997 
998  @method_dispatch
999  def submit(self, job):
1000  """
1001  """
1002  raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
1003  "Did you submit a (Sub)Job?")
1004 
1005  @submit.register(SubJob)
1006  def _(self, job):
1007  """
1008  Submission of a `SubJob` for the Local backend
1009  """
1010  # Make sure the output directory of the job is created
1011  job.output_dir.mkdir(parents=True, exist_ok=True)
1012  # Make sure the working directory of the job is created
1013  job.working_dir.mkdir(parents=True, exist_ok=True)
1014  job.copy_input_sandbox_files_to_working_dir()
1015  job.dump_input_data()
1016  # Get the path to the bash script we run
1017  script_path = self.get_submit_script_path(job)
1018  with open(script_path, mode="w") as batch_file:
1019  print("#!/bin/bash", file=batch_file)
1020  self._add_wrapper_script_setup(job, batch_file)
1021  self._add_setup(job, batch_file)
1022  print(job.full_command, file=batch_file)
1023  self._add_wrapper_script_teardown(job, batch_file)
1024  B2INFO(f"Submitting {job}")
1025  job.result = Local.LocalResult(job,
1026  self.pool.apply_async(self.run_job,
1027  (job.name,
1028  job.working_dir,
1029  job.output_dir,
1030  script_path)
1031  )
1032  )
1033  job.status = "submitted"
1034  B2INFO(f"{job} submitted")
1035 
1036  @submit.register(Job)
1037  def _(self, job):
1038  """
1039  Submission of a `Job` for the Local backend
1040  """
1041  # Make sure the output directory of the job is created
1042  job.output_dir.mkdir(parents=True, exist_ok=True)
1043  # Make sure the working directory of the job is created
1044  job.working_dir.mkdir(parents=True, exist_ok=True)
1045  # Check if we have any valid input files
1046  job.check_input_data_files()
1047 
1048  if not job.splitter:
1049  # Get all of the requested files for the input sandbox and copy them to the working directory
1050  job.copy_input_sandbox_files_to_working_dir()
1051  job.dump_input_data()
1052  # Get the path to the bash script we run
1053  script_path = self.get_submit_script_path(job)
1054  with open(script_path, mode="w") as batch_file:
1055  print("#!/bin/bash", file=batch_file)
1056  self._add_wrapper_script_setup(job, batch_file)
1057  self._add_setup(job, batch_file)
1058  print(job.full_command, file=batch_file)
1059  self._add_wrapper_script_teardown(job, batch_file)
1060  B2INFO(f"Submitting {job}")
1061  job.result = Local.LocalResult(job,
1062  self.pool.apply_async(self.run_job,
1063  (job.name,
1064  job.working_dir,
1065  job.output_dir,
1066  script_path)
1067  )
1068  )
1069  B2INFO(f"{job} submitted")
1070  else:
1071  # Create subjobs according to the splitter's logic
1072  job.splitter.create_subjobs(job)
1073  # Submit the subjobs
1074  self.submit(list(job.subjobs.values()))
1075  # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1076  self._create_parent_job_result(job)
1077 
1078  @submit.register(list)
1079  def _(self, jobs):
1080  """
1081  Submit method of Local() that takes a list of jobs instead of just one and submits each one.
1082  """
1083  # Submit the jobs
1084  for job in jobs:
1085  self.submit(job)
1086  B2INFO("All requested jobs submitted.")
1087 
1088  @staticmethod
1089  def run_job(name, working_dir, output_dir, script):
1090  """
1091  The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
1092  shell command in a subprocess and captures the stdout and stderr of the subprocess to files.
1093  """
1094  B2INFO(f"Starting Sub-process: {name}")
1095  from subprocess import Popen
1096  stdout_file_path = Path(working_dir, _STDOUT_FILE)
1097  stderr_file_path = Path(working_dir, _STDERR_FILE)
 1098  # Redirect the stdout and stderr of the subprocess to files
1099  B2INFO(f"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
1100  with open(stdout_file_path, mode="w", buffering=1) as f_out, \
1101  open(stderr_file_path, mode="w", buffering=1) as f_err:
1102  with Popen(["/bin/bash", script.as_posix()],
1103  stdout=f_out,
1104  stderr=f_err,
1105  bufsize=1,
1106  universal_newlines=True,
1107  cwd=working_dir,
1108  env={}) as p:
1109  # We block here and wait so that the return code will be set.
1110  p.wait()
1111  B2INFO(f"Subprocess {name} finished.")
1112  return p.returncode
1113 
1114  @classmethod
1115  def _create_parent_job_result(cls, parent):
1116  parent.result = cls.LocalResult(parent, None)
1117 
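# Illustrative end-to-end sketch using the Local backend: configure a Job,
# submit it, poll until it is ready, then join the process pool. The paths and
# the 5 second poll interval are example values only.
def _example_local_workflow(work_area="/tmp/example_local_job"):
    job = Job("example_local_job")
    job.cmd = ["echo", "processing"]
    job.working_dir = Path(work_area, "work")
    job.output_dir = Path(work_area, "out")
    # job.max_files_per_subjob = 1  # would attach a MaxFilesSplitter if input_files were set

    backend = Local(max_processes=2)
    backend.submit(job)
    while not job.ready():
        job.update_status()
        time.sleep(5)
    backend.join()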
1118 
1119 class Batch(Backend):
1120  """
1121  Abstract Base backend for submitting to a local batch system. Batch system specific commands should be implemented
1122  in a derived class. Do not use this class directly!
1123  """
1124 
1125  submission_cmds = []
1126 
1138  default_global_job_limit = 1000
1139 
1140  default_sleep_between_submission_checks = 30
1141 
1142  def __init__(self, *, backend_args=None):
1143  """
1144  Init method for Batch Backend. Does some basic default setup.
1145  """
1146  super().__init__(backend_args=backend_args)
1147 
1149  self.global_job_limit = self.default_global_job_limit
1150 
1152  self.sleep_between_submission_checks = self.default_sleep_between_submission_checks
1153 
1154  def _add_batch_directives(self, job, file):
1155  """
1156  Should be implemented in a derived class to write a batch submission script to the job.working_dir.
1157  You should think about where the stdout/err should go, and set the queue name.
1158  """
1159  raise NotImplementedError("Need to implement a _add_batch_directives(self, job, file) "
1160  f"method in {self.__class__.__name__} backend.")
1161 
1162  def _make_submit_file(self, job, submit_file_path):
1163  """
 1164  Useful for the HTCondor backend where a separate submit file is needed instead of batch
 1165  directives pasted directly into the submission script. It should be overridden
 1166  if needed.
1167  """
1168 
1169  @classmethod
1170  @abstractmethod
1171  def _submit_to_batch(cls, cmd):
1172  """
1173  Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1174  """
1175 
1176  def can_submit(self, *args, **kwargs):
1177  """
1178  Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail.
1179  This is initially meant to make sure that we don't go over the global limits of jobs (submitted + running).
1180 
1181  Returns:
1182  bool: If the job submission can continue based on the current situation.
1183  """
1184  return True
1185 
1186  @method_dispatch
1187  def submit(self, job, check_can_submit=True, jobs_per_check=100):
1188  """
1189  """
1190  raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
1191  "Did you submit a (Sub)Job?")
1192 
1193  @submit.register(SubJob)
1194  def _(self, job, check_can_submit=True, jobs_per_check=100):
1195  """
1196  Submit method of Batch backend for a `SubJob`. Should take `SubJob` object, create needed directories,
1197  create batch script, and send it off with the batch submission command.
1198  It should apply the correct options (default and user requested).
1199 
1200  Should set a Result object as an attribute of the job.
1201  """
1202  # Make sure the output directory of the job is created, commented out due to permission issues
1203  # job.output_dir.mkdir(parents=True, exist_ok=True)
1204  # Make sure the working directory of the job is created
1205  job.working_dir.mkdir(parents=True, exist_ok=True)
1206  job.copy_input_sandbox_files_to_working_dir()
1207  job.dump_input_data()
1208  # Make submission file if needed
1209  batch_submit_script_path = self.get_batch_submit_script_path(job)
1210  self._make_submit_file(job, batch_submit_script_path)
1211  # Get the bash file we will actually run, might be the same file
1212  script_path = self.get_submit_script_path(job)
1213  # Construct the batch submission script (with directives if that is supported)
1214  with open(script_path, mode="w") as batch_file:
1215  self._add_batch_directives(job, batch_file)
1216  self._add_wrapper_script_setup(job, batch_file)
1217  self._add_setup(job, batch_file)
1218  print(job.full_command, file=batch_file)
1219  self._add_wrapper_script_teardown(job, batch_file)
1220  os.chmod(script_path, 0o755)
1221  B2INFO(f"Submitting {job}")
1222  # Do the actual batch submission
1223  cmd = self._create_cmd(batch_submit_script_path)
1224  output = self._submit_to_batch(cmd)
1225  self._create_job_result(job, output)
1226  job.status = "submitted"
1227  B2INFO(f"{job} submitted")
1228 
1229  @submit.register(Job)
1230  def _(self, job, check_can_submit=True, jobs_per_check=100):
1231  """
1232  Submit method of Batch backend. Should take job object, create needed directories, create batch script,
1233  and send it off with the batch submission command, applying the correct options (default and user requested.)
1234 
1235  Should set a Result object as an attribute of the job.
1236  """
1237  # Make sure the output directory of the job is created, commented out due to permissions issue
1238  # job.output_dir.mkdir(parents=True, exist_ok=True)
1239  # Make sure the working directory of the job is created
1240  job.working_dir.mkdir(parents=True, exist_ok=True)
1241  # Check if we have any valid input files
1242  job.check_input_data_files()
1243  # Add any required backend args that are missing (I'm a bit hesitant to actually merge with job.backend_args)
1244  # just in case you want to resubmit the same job with different backend settings later.
1245  # job_backend_args = {**self.backend_args, **job.backend_args}
1246 
1247  # If there's no splitter then we just submit the Job with no SubJobs
1248  if not job.splitter:
1249  # Get all of the requested files for the input sandbox and copy them to the working directory
1250  job.copy_input_sandbox_files_to_working_dir()
1251  job.dump_input_data()
1252  # Make submission file if needed
1253  batch_submit_script_path = self.get_batch_submit_script_path(job)
1254  self._make_submit_file(job, batch_submit_script_path)
1255  # Get the bash file we will actually run
1256  script_path = self.get_submit_script_path(job)
1257  # Construct the batch submission script (with directives if that is supported)
1258  with open(script_path, mode="w") as batch_file:
1259  self._add_batch_directives(job, batch_file)
1260  self._add_wrapper_script_setup(job, batch_file)
1261  self._add_setup(job, batch_file)
1262  print(job.full_command, file=batch_file)
1263  self._add_wrapper_script_teardown(job, batch_file)
1264  os.chmod(script_path, 0o755)
1265  B2INFO(f"Submitting {job}")
1266  # Do the actual batch submission
1267  cmd = self._create_cmd(batch_submit_script_path)
1268  output = self._submit_to_batch(cmd)
1269  self._create_job_result(job, output)
1270  job.status = "submitted"
1271  B2INFO(f"{job} submitted")
1272  else:
1273  # Create subjobs according to the splitter's logic
1274  job.splitter.create_subjobs(job)
1275  # Submit the subjobs
1276  self.submit(list(job.subjobs.values()))
1277  # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1278  self._create_parent_job_result(job)
1279 
1280  @submit.register(list)
1281  def _(self, jobs, check_can_submit=True, jobs_per_check=100):
1282  """
1283  Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.
1284  """
1285  B2INFO(f"Submitting a list of {len(jobs)} jobs to a Batch backend")
1286  # Technically this could be a list of Jobs or SubJobs. And if it is a list of Jobs then it might not
1287  # be necessary to check if we can submit right now. We could do it later during the submission of the
1288  # SubJob list. However in the interest of simpler code we just do the check here, and re-check again
1289  # if a SubJob list comes through this function. Slightly inefficient, but much simpler logic.
1290 
1291  # The first thing to do is make sure that we are iterating through the jobs list in chunks that are
 1292  # equal to or smaller than the global limit. Otherwise nothing will ever submit.
1293 
1294  if jobs_per_check > self.global_job_limit:
 1295  B2INFO(f"The requested jobs_per_check (={jobs_per_check}) is higher than the global job "
 1296  f"limit for this backend (={self.global_job_limit}). Will instead use the "
 1297  "value of the global job limit.")
1298  jobs_per_check = self.global_job_limit
1299 
1300  # We group the jobs list into chunks of length jobs_per_check
1301  for jobs_to_submit in grouper(jobs_per_check, jobs):
1302  # Wait until we are allowed to submit
1303  while not self.can_submit(njobs=len(jobs_to_submit)):
1304  B2INFO("Too many jobs are currently in the batch system globally. Waiting until submission can continue...")
1305  time.sleep(self.sleep_between_submission_checks)
1306  else:
1307  # We loop here since we have already checked if the number of jobs is low enough, we don't want to hit this
1308  # function again unless one of the jobs has subjobs.
1309  B2INFO(f"Submitting the next {len(jobs_to_submit)} jobs...")
1310  for job in jobs_to_submit:
1311  self.submit(job, check_can_submit, jobs_per_check)
1312  B2INFO(f"All {len(jobs)} requested jobs submitted")
1313 
1314  def get_batch_submit_script_path(self, job):
1315  """
1316  Construct the Path object of the script file that we will submit using the batch command.
1317  For most batch backends this is the same script as the bash script we submit.
1318  But for some they require a separate submission file that describes the job.
1319  To implement that you can implement this function in the Backend class.
1320  """
1321  return Path(job.working_dir, self.submit_script)
1322 
1323  @classmethod
1324  @abstractmethod
1325  def _create_job_result(cls, job, batch_output):
1326  """
1327  """
1328 
1329  @abstractmethod
1330  def _create_cmd(self, job):
1331  """
1332  """
1333 
1334 
1335 class PBS(Batch):
1336  """
1337  Backend for submitting calibration processes to a qsub batch system.
1338  """
1339 
1340  cmd_wkdir = "#PBS -d"
1341 
1342  cmd_stdout = "#PBS -o"
1343 
1344  cmd_stderr = "#PBS -e"
1345 
1346  cmd_queue = "#PBS -q"
1347 
1348  cmd_name = "#PBS -N"
1349 
1350  submission_cmds = ["qsub"]
1351 
1352  default_global_job_limit = 5000
1353 
1354  default_backend_args = {"queue": "short"}
1355 
1356  def __init__(self, *, backend_args=None):
1357  super().__init__(backend_args=backend_args)
1358 
1359  def _add_batch_directives(self, job, batch_file):
1360  """
1361  Add PBS directives to submitted script.
1362  """
1363  job_backend_args = {**self.backend_args, **job.backend_args}
1364  batch_queue = job_backend_args["queue"]
1365  print("#!/bin/bash", file=batch_file)
1366  print("# --- Start PBS ---", file=batch_file)
1367  print(" ".join([PBS.cmd_queue, batch_queue]), file=batch_file)
1368  print(" ".join([PBS.cmd_name, job.name]), file=batch_file)
1369  print(" ".join([PBS.cmd_wkdir, job.working_dir.as_posix()]), file=batch_file)
1370  print(" ".join([PBS.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1371  print(" ".join([PBS.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1372  print("# --- End PBS ---", file=batch_file)
1373 
1374  @classmethod
1375  def _create_job_result(cls, job, batch_output):
1376  """
1377  """
1378  job_id = batch_output.replace("\n", "")
1379  B2INFO(f"Job ID of {job} recorded as: {job_id}")
1380  job.result = cls.PBSResult(job, job_id)
1381 
1382  def _create_cmd(self, script_path):
1383  """
1384  """
1385  submission_cmd = self.submission_cmds[:]
1386  submission_cmd.append(script_path.as_posix())
1387  return submission_cmd
1388 
1389  @classmethod
1390  def _submit_to_batch(cls, cmd):
1391  """
1392  Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1393  """
1394  sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True)
1395  return sub_out
1396 
1397  @classmethod
1398  def _create_parent_job_result(cls, parent):
1399  parent.result = cls.PBSResult(parent, None)
1400 
1401  class PBSResult(Result):
1402  """
1403  Simple class to help monitor status of jobs submitted by `PBS` Backend.
1404 
1405  You pass in a `Job` object (or `SubJob`) and job id from a qsub command.
 1406  When you call the `ready` method it runs qstat to see whether or not the job has finished.
1407  """
1408 
1409 
 1410  backend_code_to_status = {"R": "running",
 1411  "C": "completed",
 1412  "FINISHED": "completed",
 1413  "E": "failed",
 1414  "H": "submitted",
 1415  "Q": "submitted",
 1416  "T": "submitted",
 1417  "W": "submitted",
 1419  }
1420 
1421  def __init__(self, job, job_id):
1422  """
1423  Pass in the job object and the job id to allow the result to do monitoring and perform
1424  post processing of the job.
1425  """
1426  super().__init__(job)
1427 
1428  self.job_id = job_id
1429 
1430  def update_status(self):
1431  """
1432  Update the job's (or subjobs') status by calling qstat.
1433  """
1434  B2DEBUG(29, f"Calling {self.job}.result.update_status()")
 1435  # Get all jobs info and re-use it for each status update to minimise time spent on this updating.
1436  qstat_output = PBS.qstat()
1437  if self.job.subjobs:
1438  for subjob in self.job.subjobs.values():
1439  subjob.result._update_result_status(qstat_output)
1440  else:
1441  self._update_result_status(qstat_output)
1442 
1443  def _update_result_status(self, qstat_output):
1444  """
1445  Parameters:
1446  qstat_output (dict): The JSON output of a previous call to qstat which we can re-use to find the
1447  status of this job. Obviously you should only be passing a JSON dict that contains the 'Job_Id' and
1448  'job_state' information, otherwise it is useless.
1449 
1450  """
1451  try:
1452  backend_status = self._get_status_from_output(qstat_output)
1453  except KeyError:
1454  # If this happens then maybe the job id wasn't in the qstat_output argument because it finished.
1455  # Instead of failing immediately we try looking for the exit code file and then fail if it still isn't there.
 1456  B2DEBUG(29, f"Checking the exit code from file for {self.job}")
1457  try:
1458  exit_code = self.get_exit_code_from_file()
1459  except FileNotFoundError:
1460  waiting_time = datetime.now() - self.exit_code_file_initial_time
 1461  if waiting_time > self.time_to_wait_for_exit_code_file:
1462  B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
1463  exit_code = 1
1464  else:
1465  B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
1466  return
1467  if exit_code:
1468  backend_status = "E"
1469  else:
1470  backend_status = "C"
1471 
1472  try:
1473  new_job_status = self.backend_code_to_status[backend_status]
1474  except KeyError as err:
1475  raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
1476 
1477  if new_job_status != self.job.status:
1478  self.job.status = new_job_status
1479 
1480  def _get_status_from_output(self, output):
1481  for job_info in output["JOBS"]:
1482  if job_info["Job_Id"] == self.job_id:
1483  return job_info["job_state"]
1484  else:
1485  raise KeyError
1486 
1487  def can_submit(self, njobs=1):
1488  """
1489  Checks the global number of jobs in PBS right now (submitted or running) for this user.
 1490  Returns True if the number is lower than the limit, False if it is higher.
1491 
1492  Parameters:
1493  njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1494  are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1495  assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1496  So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
1497  and check again before submitting more.
1498  """
1499  B2DEBUG(29, "Calling PBS().can_submit()")
1500  job_info = self.qstat(username=os.environ["USER"])
1501  total_jobs = job_info["NJOBS"]
1502  B2INFO(f"Total jobs active in the PBS system is currently {total_jobs}")
1503  if (total_jobs + njobs) > self.global_job_limit:
1504  B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1505  return False
1506  else:
1507  B2INFO("There is enough space to submit more jobs.")
1508  return True
1509 
1510  @classmethod
1511  def qstat(cls, username="", job_ids=None):
1512  """
1513  Simplistic interface to the ``qstat`` command. Lets you request information about all jobs or ones matching the filter
 1514  ['job_id'] or for the username. The result is a JSON dictionary containing some of the useful job attributes returned
1515  by qstat.
1516 
1517  PBS is kind of annoying as depending on the configuration it can forget about jobs immediately. So the status of a
1518  finished job is VERY hard to get. There are other commands that are sometimes included that may do a better job.
1519  This one should work for Melbourne's cloud computing centre.
1520 
1521  Keyword Args:
1522  username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
1523  will be in the output dictionary.
1524  job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
 1525  the output of this function will contain only information about these jobs. If this argument is not given, then all jobs
1526  matching the other filters will be returned.
1527 
1528  Returns:
1529  dict: JSON dictionary of the form (to save you parsing the XML that qstat returns):
1530 
1531  .. code-block:: python
1532 
1533  {
1534  "NJOBS": int,
1535  "JOBS":[
1536  {
1537  <key: value>, ...
1538  }, ...
1539  ]
1540  }
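
 An illustrative call (the username and job ID below are placeholders):

 .. code-block:: python

     # Everything a given user currently has in the system (placeholder username)
     my_jobs = PBS.qstat(username="someuser")
     # Only one specific job (placeholder job ID)
     one_job = PBS.qstat(job_ids=["12345.head-node"])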
1541  """
1542  B2DEBUG(29, f"Calling PBS.qstat(username='{username}', job_ids={job_ids})")
1543  if not job_ids:
1544  job_ids = []
1545  job_ids = set(job_ids)
1546  cmd_list = ["qstat", "-x"]
1547  # We get an XML-formatted summary from qstat. Requires the shell argument.
1548  cmd = " ".join(cmd_list)
1549  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1550  output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1551  jobs_dict = {"NJOBS": 0, "JOBS": []}
1552  jobs_xml = ET.fromstring(output)
1553 
1554  # For a specific job_id we can be a bit more efficient in XML parsing
1555  if len(job_ids) == 1:
1556  job_elem = jobs_xml.find(f"./Job[Job_Id='{list(job_ids)[0]}']")
1557  if job_elem is not None:
1558  jobs_dict["JOBS"].append(cls.create_job_record_from_element(job_elem))
1559  jobs_dict["NJOBS"] = 1
1560  return jobs_dict
1561 
1562  # Since the username given is not exactly the same as the one that PBS stores (<username>@host)
1563  # we have to simply loop through rather than using XPATH.
1564  for job in jobs_xml.iterfind("Job"):
1565  job_owner = job.find("Job_Owner").text.split("@")[0]
1566  if username and username != job_owner:
1567  continue
1568  job_id = job.find("Job_Id").text
1569  if job_ids and job_id not in job_ids:
1570  continue
1571  jobs_dict["JOBS"].append(cls.create_job_record_from_element(job))
1572  jobs_dict["NJOBS"] += 1
1573  # Remove it so that we don't keep checking for it
1574  if job_id in job_ids:
1575  job_ids.remove(job_id)
1576  return jobs_dict
1577 
1578  @staticmethod
1579  def create_job_record_from_element(job_elem):
1580  """
1581  Creates a Job dictionary with various job information from the XML element returned by qstat.
1582 
1583  Parameters:
1584  job_elem (xml.etree.ElementTree.Element): The XML Element of the Job
1585 
1586  Returns:
1587  dict: JSON serialisable dictionary of the Job information we are interested in.
1588  """
1589  job_dict = {}
1590  job_dict["Job_Id"] = job_elem.find("Job_Id").text
1591  job_dict["Job_Name"] = job_elem.find("Job_Name").text
1592  job_dict["Job_Owner"] = job_elem.find("Job_Owner").text
1593  job_dict["job_state"] = job_elem.find("job_state").text
1594  job_dict["queue"] = job_elem.find("queue").text
1595  return job_dict
1596 
1597 
1598 class LSF(Batch):
1599  """
1600  Backend for submitting calibration processes to an LSF batch system (via ``bsub``).
1601  """
1602 
1603  cmd_wkdir = "#BSUB -cwd"
1604 
1605  cmd_stdout = "#BSUB -o"
1606 
1607  cmd_stderr = "#BSUB -e"
1608 
1609  cmd_queue = "#BSUB -q"
1610 
1611  cmd_name = "#BSUB -J"
1612 
1613  submission_cmds = ["bsub", "-env", "\"none\"", "<"]
1614 
1615  default_global_job_limit = 15000
1616 
1617  default_backend_args = {"queue": "s"}
1618 
1619  def __init__(self, *, backend_args=None):
1620  super().__init__(backend_args=backend_args)
1621 
1622  def _add_batch_directives(self, job, batch_file):
1623  """
1624  Adds LSF BSUB directives for the job to a script.
1625  """
1626  job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1627  batch_queue = job_backend_args["queue"]
1628  print("#!/bin/bash", file=batch_file)
1629  print("# --- Start LSF ---", file=batch_file)
1630  print(" ".join([LSF.cmd_queue, batch_queue]), file=batch_file)
1631  print(" ".join([LSF.cmd_name, job.name]), file=batch_file)
1632  print(" ".join([LSF.cmd_wkdir, str(job.working_dir)]), file=batch_file)
1633  print(" ".join([LSF.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1634  print(" ".join([LSF.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1635  print("# --- End LSF ---", file=batch_file)
1636 
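 # For orientation, the header written by _add_batch_directives() above looks roughly like this
 # (the queue, job name and paths are illustrative only):
 #
 #   #!/bin/bash
 #   # --- Start LSF ---
 #   #BSUB -q s
 #   #BSUB -J MyJob
 #   #BSUB -cwd /path/to/working_dir
 #   #BSUB -o /path/to/working_dir/stdout
 #   #BSUB -e /path/to/working_dir/stderr
 #   # --- End LSF ---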
1637  def _create_cmd(self, script_path):
1638  """ Create the ``bsub`` submission command (as a single shell string) for the given batch script path.
1639  """
1640  submission_cmd = self.submission_cmds[:]
1641  submission_cmd.append(script_path.as_posix())
1642  submission_cmd = " ".join(submission_cmd)
1643  return [submission_cmd]
1644 
1645  @classmethod
1646  def _submit_to_batch(cls, cmd):
1647  """
1648  Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1649  """
1650  sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1651  return sub_out
1652 
1653  class LSFResult(Result):
1654  """
1655  Simple class to help monitor status of jobs submitted by LSF Backend.
1656 
1657  You pass in a `Job` object and job id from a bsub command.
1658  When you call the `ready` method it runs bjobs to see whether or not the job has finished.
1659  """
1660 
1661 
1662  backend_code_to_status = {"RUN": "running",
1663  "DONE": "completed",
1664  "FINISHED": "completed",
1665  "EXIT": "failed",
1666  "PEND": "submitted"
1667  }
1668 
1669  def __init__(self, job, job_id):
1670  """
1671  Pass in the job object and the job id to allow the result to do monitoring and perform
1672  post processing of the job.
1673  """
1674  super().__init__(job)
1675 
1676  self.job_id = job_id
1677 
1678  def update_status(self):
1679  """
1680  Update the job's (or subjobs') status by calling bjobs.
1681  """
1682  B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
1683  # Get all jobs info and re-use it for each status update to minimise time spent on this update.
1684  bjobs_output = LSF.bjobs(output_fields=["stat", "id"])
1685  if self.job.subjobs:
1686  for subjob in self.job.subjobs.values():
1687  subjob.result._update_result_status(bjobs_output)
1688  else:
1689  self._update_result_status(bjobs_output)
1690 
1691  def _update_result_status(self, bjobs_output):
1692  """
1693  Parameters:
1694  bjobs_output (dict): The JSON output of a previous call to bjobs which we can re-use to find the
1695  status of this job. Obviously you should only be passing a JSON dict that contains the 'stat' and
1696  'id' information, otherwise it is useless.
1697 
1698  """
1699  try:
1700  backend_status = self._get_status_from_output(bjobs_output)
1701  except KeyError:
1702  # If this happens then maybe the job id wasn't in the bjobs_output argument because it finished.
1703  # Instead of failing immediately we try re-running bjobs here explicitly and then fail if it still isn't there.
1704  bjobs_output = LSF.bjobs(output_fields=["stat", "id"], job_id=str(self.job_id))
1705  try:
1706  backend_status = self._get_status_from_output(bjobs_output)
1707  except KeyError:
1708  # If this happened, maybe we're looking at an old finished job. We could fall back to bhist, but it's
1709  # slow and terrible. Instead let's try looking for the exit code file.
1710  try:
1711  exit_code = self.get_exit_code_from_file()
1712  except FileNotFoundError:
1713  waiting_time = datetime.now() - self.exit_code_file_initial_time
1714  if waiting_time > self.time_to_wait_for_exit_code_file:
1715  B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
1716  exit_code = 1
1717  else:
1718  B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
1719  return
1720  if exit_code:
1721  backend_status = "EXIT"
1722  else:
1723  backend_status = "FINISHED"
1724  try:
1725  new_job_status = self.backend_code_to_status[backend_status]
1726  except KeyError as err:
1727  raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
1728 
1729  if new_job_status != self.job.status:
1730  self.job.status = new_job_status
1731 
1732  def _get_status_from_output(self, output):
1733  if output["JOBS"] and "ERROR" in output["JOBS"][0]:
1734  if output["JOBS"][0]["ERROR"] == f"Job <{self.job_id}> is not found":
1735  raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1736  else:
1737  raise BackendError(f"Unidentified Error during status check for {self.job}: {output}")
1738  else:
1739  for job_info in output["JOBS"]:
1740  if job_info["JOBID"] == self.job_id:
1741  return job_info["STAT"]
1742  else:
1743  raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1744 
1745  @classmethod
1746  def _create_parent_job_result(cls, parent):
1747  parent.result = cls.LSFResult(parent, None)
1748 
1749  @classmethod
1750  def _create_job_result(cls, job, batch_output):
1751  """ Create an LSFResult for the job, using the job ID parsed from the bsub output.
1752  """
1753  m = re.search(r"Job <(\d+)>", str(batch_output))
1754  if m:
1755  job_id = m.group(1)
1756  else:
1757  raise BackendError(f"Failed to get the batch job ID of {job}. LSF output was:\n{batch_output}")
1758 
1759  B2INFO(f"Job ID of {job} recorded as: {job_id}")
1760  job.result = cls.LSFResult(job, job_id)
1761 
1762  def can_submit(self, njobs=1):
1763  """
1764  Checks the global number of jobs in LSF right now (submitted or running) for this user.
1765  Returns True if the number is lower than the limit, False if it is higher.
1766 
1767  Parameters:
1768  njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1769  are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1770  assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1771  So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
1772  and check again before submitting more.
1773  """
1774  B2DEBUG(29, "Calling LSF().can_submit()")
1775  job_info = self.bjobs(output_fields=["stat"])
1776  total_jobs = job_info["NJOBS"]
1777  B2INFO(f"Total jobs active in the LSF system is currently {total_jobs}")
1778  if (total_jobs + njobs) > self.global_job_limit:
1779  B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1780  return False
1781  else:
1782  B2INFO("There is enough space to submit more jobs.")
1783  return True
1784 
1785  @classmethod
1786  def bjobs(cls, output_fields=None, job_id="", username="", queue=""):
1787  """
1788  Simplistic interface to the `bjobs` command. Lets you request information about all jobs matching the filters
1789  'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by output of the ``-json`` bjobs option.
1790 
1791  Parameters:
1792  output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
1793  job_id (str): String representation of the Job ID given by bsub during submission. If this argument is given then
1794  the output of this function will be only information about this job. If this argument is not given, then all jobs
1795  matching the other filters will be returned.
1796  username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
1797  a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
1798  receive job information from all known user jobs matching the other filters.
1799  queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.
1800 
1801  Returns:
1802  dict: JSON dictionary of the form:
1803 
1804  .. code-block:: python
1805 
1806  {
1807  "NJOBS":<njobs returned by command>,
1808  "JOBS":[
1809  {
1810  <output field: value>, ...
1811  }, ...
1812  ]
1813  }
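
 An illustrative call (the job ID below is a placeholder):

 .. code-block:: python

     # Status of all of the current user's jobs
     mine = LSF.bjobs(output_fields=["stat", "name", "id"])
     # Status of a single job (placeholder job ID)
     one = LSF.bjobs(output_fields=["stat"], job_id="12345")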
1814  """
1815  B2DEBUG(29, f"Calling LSF.bjobs(output_fields={output_fields}, job_id={job_id}, username={username}, queue={queue})")
1816  # We must always return at least one output field when using JSON and -o options. So we choose the job id
1817  if not output_fields:
1818  output_fields = ["id"]
1819  # Output fields should be space separated but in a string.
1820  field_list_cmd = "\""
1821  field_list_cmd += " ".join(output_fields)
1822  field_list_cmd += "\""
1823  cmd_list = ["bjobs", "-o", field_list_cmd]
1824  # If the queue name is set then we add to the command options
1825  if queue:
1826  cmd_list.extend(["-q", queue])
1827  # If the username is set then we add to the command options
1828  if username:
1829  cmd_list.extend(["-u", username])
1830  # Can now add the json option before the final positional argument (if used)
1831  cmd_list.append("-json")
1832  # If the job id is set then we add to the end of the command
1833  if job_id:
1834  cmd_list.append(job_id)
1835  # We get a JSON serialisable summary from bjobs. Requires the shell argument.
1836  cmd = " ".join(cmd_list)
1837  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1838  output = decode_json_string(subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True))
1839  output["NJOBS"] = output["JOBS"]
1840  output["JOBS"] = output["RECORDS"]
1841  del output["RECORDS"]
1842  del output["COMMAND"]
1843  return output
1844 
1845  @classmethod
1846  def bqueues(cls, output_fields=None, queues=None):
1847  """
1848  Simplistic interface to the `bqueues` command. Lets you request information about all queues matching the filters.
1849  The result is the JSON dictionary returned by output of the ``-json`` bqueues option.
1850 
1851  Parameters:
1852  output_fields (list[str]): A list of bqueues -o fields that you would like information about
1853  e.g. the default is ['queue_name', 'status', 'max', 'njobs', 'pend', 'run']
1854  queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
1855  By default you will receive information about all queues.
1856 
1857  Returns:
1858  dict: JSON dictionary of the form:
1859 
1860  .. code-block:: python
1861 
1862  {
1863  "COMMAND":"bqueues",
1864  "QUEUES":46,
1865  "RECORDS":[
1866  {
1867  "QUEUE_NAME":"b2_beast",
1868  "STATUS":"Open:Active",
1869  "MAX":"200",
1870  "NJOBS":"0",
1871  "PEND":"0",
1872  "RUN":"0"
1873  }, ...
1874  ]
1874  }
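
 An illustrative call for a single queue (the queue name below is just an example):

 .. code-block:: python

     # "s" is only an example queue name
     queue_info = LSF.bqueues(output_fields=["queue_name", "njobs", "pend", "run"], queues=["s"])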
1875  """
1876  B2DEBUG(29, f"Calling LSF.bqueues(output_fields={output_fields}, queues={queues})")
1877  # We must always return at least one output field when using JSON and -o options. So we choose a sensible default set of queue fields
1878  if not output_fields:
1879  output_fields = ["queue_name", "status", "max", "njobs", "pend", "run"]
1880  # Output fields should be space separated but in a string.
1881  field_list_cmd = "\""
1882  field_list_cmd += " ".join(output_fields)
1883  field_list_cmd += "\""
1884  cmd_list = ["bqueues", "-o", field_list_cmd]
1885  # Can now add the json option before the final positional argument (if used)
1886  cmd_list.append("-json")
1887  # If the queue name is set then we add to the end of the command
1888  if queues:
1889  cmd_list.extend(queues)
1890  # We get a JSON serialisable summary from bqueues. Requires the shell argument.
1891  cmd = " ".join(cmd_list)
1892  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1893  output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1894  return decode_json_string(output)
1895 
1896 
1897 class HTCondor(Batch):
1898  """
1899  Backend for submitting calibration processes to an HTCondor batch system.
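
 A hedged construction example (the values below are illustrative overrides, assuming the usual
 ``backend_args`` keyword accepted by the Batch constructors):

 .. code-block:: python

     # Values are illustrative overrides of the defaults, not recommendations
     backend = HTCondor(backend_args={"request_memory": "8 GB",
                                      "extra_lines": ["request_cpus = 2"]})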
1900  """
1901 
1902  batch_submit_script = "submit.sub"
1903 
1904  submission_cmds = ["condor_submit", "-terse"]
1905 
1906  default_global_job_limit = 10000
1907 
1908  default_backend_args = {
1909  "universe": "vanilla",
1910  "getenv": "false",
1911  "request_memory": "4 GB", # We set the default requested memory to 4 GB to maintain parity with KEKCC
1912  "path_prefix": "", # Path prefix for file path
1913  "extra_lines": [] # These should be other HTCondor submit script lines like 'request_cpus = 2'
1914  }
1915 
1916  default_class_ads = ["GlobalJobId", "JobStatus", "Owner"]
1917 
1918  def _make_submit_file(self, job, submit_file_path):
1919  """
1920  Fill HTCondor submission file.
1921  """
1922  # Find all files/directories in the working directory to copy on the worker node
1923 
1924  files_to_transfer = [i.as_posix() for i in job.working_dir.iterdir()]
1925 
1926  job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1927 
1928  with open(submit_file_path, "w") as submit_file:
1929  print(f'executable = {self.get_submit_script_path(job)}', file=submit_file)
1930  print(f'log = {Path(job.output_dir, "htcondor.log").as_posix()}', file=submit_file)
1931  print(f'output = {Path(job.working_dir, _STDOUT_FILE).as_posix()}', file=submit_file)
1932  print(f'error = {Path(job.working_dir, _STDERR_FILE).as_posix()}', file=submit_file)
1933  print('transfer_input_files = ', ','.join(files_to_transfer), file=submit_file)
1934  print(f'universe = {job_backend_args["universe"]}', file=submit_file)
1935  print(f'getenv = {job_backend_args["getenv"]}', file=submit_file)
1936  print(f'request_memory = {job_backend_args["request_memory"]}', file=submit_file)
1937  print('should_transfer_files = Yes', file=submit_file)
1938  print('when_to_transfer_output = ON_EXIT', file=submit_file)
1939  # Any other lines from the backend args that we don't handle explicitly, in case someone wants to insert something extra
1940  for line in job_backend_args["extra_lines"]:
1941  print(line, file=submit_file)
1942  print('queue', file=submit_file)
1943 
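 # For orientation, a submit file produced by _make_submit_file() above might look roughly like this
 # (all paths and values below are illustrative only):
 #
 #   executable = /path/to/working_dir/<submit script>
 #   log = /path/to/output_dir/htcondor.log
 #   output = /path/to/working_dir/stdout
 #   error = /path/to/working_dir/stderr
 #   transfer_input_files = /path/to/working_dir/file1,/path/to/working_dir/file2
 #   universe = vanilla
 #   getenv = false
 #   request_memory = 4 GB
 #   should_transfer_files = Yes
 #   when_to_transfer_output = ON_EXIT
 #   queue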
1944  def _add_batch_directives(self, job, batch_file):
1945  """
1946  For HTCondor leave empty as the directives are already included in the submit file.
1947  """
1948  print('#!/bin/bash', file=batch_file)
1949 
1950  def _create_cmd(self, script_path):
1951  """ Create the condor_submit command for the given submit file path.
1952  """
1953  submission_cmd = self.submission_cmds[:]
1954  submission_cmd.append(script_path.as_posix())
1955  return submission_cmd
1956 
1957  def get_batch_submit_script_path(self, job):
1958  """
1959  Construct the Path object of the .sub file that we will use to describe the job.
1960  """
1961  return Path(job.working_dir, self.batch_submit_script)
1962 
1963  @classmethod
1964  def _submit_to_batch(cls, cmd):
1965  """
1966  Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1967  """
1968  job_dir = Path(cmd[-1]).parent.as_posix()
1969  sub_out = ""
1970  attempt = 0
1971  sleep_time = 30
1972 
1973  while attempt < 3:
1974  try:
1975  sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, cwd=job_dir)
1976  break
1977  except subprocess.CalledProcessError as e:
1978  attempt += 1
1979  if attempt == 3:
1980  B2ERROR(f"Error during condor_submit: {str(e)} occurred 3 times.")
1981  raise e
1982  else:
1983  B2ERROR(f"Error during condor_submit: {str(e)}, sleeping for {sleep_time} seconds.")
1984  time.sleep(sleep_time)
1985  return sub_out.split()[0]
1986 
1987  class HTCondorResult(Result):
1988  """
1989  Simple class to help monitor status of jobs submitted by HTCondor Backend.
1990 
1991  You pass in a `Job` object and job id from a condor_submit command.
1992  When you call the `ready` method it runs condor_q and, if needed, ``condor_history``
1993  to see whether or not the job has finished.
1994  """
1995 
1996 
1997  backend_code_to_status = {0: "submitted",
1998  1: "submitted",
1999  2: "running",
2000  3: "failed",
2001  4: "completed",
2002  5: "submitted",
2003  6: "failed"
2004  }
2005 
2006  def __init__(self, job, job_id):
2007  """
2008  Pass in the job object and the job id to allow the result to do monitoring and perform
2009  post processing of the job.
2010  """
2011  super().__init__(job)
2012 
2013  self.job_id = job_id
2014 
2015  def update_status(self):
2016  """
2017  Update the job's (or subjobs') status by calling condor_q.
2018  """
2019  B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
2020  # Get all jobs info and re-use it for each status update to minimise time spent on this update.
2021  condor_q_output = HTCondor.condor_q()
2022  if self.job.subjobs:
2023  for subjob in self.job.subjobs.values():
2024  subjob.result._update_result_status(condor_q_output)
2025  else:
2026  self._update_result_status(condor_q_output)
2027 
2028  def _update_result_status(self, condor_q_output):
2029  """
2030  To be slightly more efficient we pass in the output of a previous condor_q call and check whether this job appears in it.
2031  If it does, we update the job's status from that. If not, we are forced to call condor_q for this specific job and, if needed,
2032  ``condor_history``, etc.
2033 
2034  Parameters:
2035  condor_q_output (dict): The JSON output of a previous call to `HTCondor.condor_q` which we can re-use to find the
2036  status of this job if it was active when that command ran.
2037  """
2038  B2DEBUG(29, f"Calling {self.job}.result._update_result_status()")
2039  jobs_info = []
2040  for job_record in condor_q_output["JOBS"]:
2041  job_id = job_record["GlobalJobId"].split("#")[1]
2042  if job_id == self.job_id:
2043  B2DEBUG(29, f"Found {self.job_id} in condor_q_output.")
2044  jobs_info.append(job_record)
2045 
2046  # If this job wasn't in the passed in condor_q output, let's try our own query with the specific job_id
2047  if not jobs_info:
2048  jobs_info = HTCondor.condor_q(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
2049 
2050  # If no job information is returned then the job already left the queue,
2051  # so check in the history to see if it succeeded or failed
2052  if not jobs_info:
2053  try:
2054  jobs_info = HTCondor.condor_history(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
2055  except KeyError:
2056  hold_reason = "No Reason Known"
2057 
2058  # Still no record of it? Let's look for the exit code file where we expect it
2059  if not jobs_info:
2060  try:
2061  exit_code = self.get_exit_code_from_file()
2062  except FileNotFoundError:
2063  waiting_time = datetime.now() - self.exit_code_file_initial_time
2064  if waiting_time > self.time_to_wait_for_exit_code_file:
2065  B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
2066  exit_code = 1
2067  else:
2068  B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
2069  return
2070  if exit_code:
2071  jobs_info = [{"JobStatus": 6, "HoldReason": None}] # Set to failed
2072  else:
2073  jobs_info = [{"JobStatus": 4, "HoldReason": None}] # Set to completed
2074 
2075  # Still nothing even after waiting for the exit code file? Mark the job as failed so we don't wait forever.
2076  if not jobs_info:
2077  jobs_info = [{"JobStatus": 6, "HoldReason": None}] # Set to failed
2078 
2079  job_info = jobs_info[0]
2080  backend_status = job_info["JobStatus"]
2081  # If the job is held (backend_status == 5) then report why and keep waiting
2082  if backend_status == 5:
2083  hold_reason = job_info.get("HoldReason", None)
2084  B2WARNING(f"{self.job} on hold because of {hold_reason}. Keep waiting.")
2085  backend_status = 2
2086  try:
2087  new_job_status = self.backend_code_to_status[backend_status]
2088  except KeyError as err:
2089  raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
2090  if new_job_status != self.job.status:
2091  self.job.status = new_job_status
2092 
2093  @classmethod
2094  def _create_job_result(cls, job, job_id):
2095  """ Create an HTCondorResult for the job, using the ID returned by condor_submit.
2096  """
2097  B2INFO(f"Job ID of {job} recorded as: {job_id}")
2098  job.result = cls.HTCondorResult(job, job_id)
2099 
2100  @classmethod
2101  def _create_parent_job_result(cls, parent):
2102  parent.result = cls.HTCondorResult(parent, None)
2103 
2104  def can_submit(self, njobs=1):
2105  """
2106  Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
2107  Returns True if the number is lower than the limit, False if it is higher.
2108 
2109  Parameters:
2110  njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
2111  are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
2112  assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
2113  So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
2114  and check again before submitting more.
2115  """
2116  B2DEBUG(29, "Calling HTCondor().can_submit()")
2117  jobs_info = self.condor_q()
2118  total_jobs = jobs_info["NJOBS"]
2119  B2INFO(f"Total jobs active in the HTCondor system is currently {total_jobs}")
2120  if (total_jobs + njobs) > self.global_job_limit:
2121  B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
2122  return False
2123  else:
2124  B2INFO("There is enough space to submit more jobs.")
2125  return True
2126 
2127  @classmethod
2128  def condor_q(cls, class_ads=None, job_id="", username=""):
2129  """
2130  Simplistic interface to the `condor_q` command. Lets you request information about all jobs matching the filters
2131  'job_id' and 'username'. Note that setting job_id overrides username, which is then ignored.
2132  The result is the JSON dictionary returned by output of the ``-json`` condor_q option.
2133 
2134  Parameters:
2135  class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
2136  By default we use {cls.default_class_ads}; increasing the number of class_ads increases the time taken
2137  by the condor_q call.
2138  job_id (str): String representation of the Job ID given by condor_submit during submission.
2139  If this argument is given then the output of this function will be only information about this job.
2140  If this argument is not given, then all jobs matching the other filters will be returned.
2141  username (str): By default we return information about only the current user's jobs. By giving
2142  a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2143  receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
2144  so it isn't recommended.
2145 
2146  Returns:
2147  dict: JSON dictionary of the form:
2148 
2149  .. code-block:: python
2150 
2151  {
2152  "NJOBS":<number of records returned by command>,
2153  "JOBS":[
2154  {
2155  <ClassAd: value>, ...
2156  }, ...
2157  ]
2158  }
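
 An illustrative call (the job ID below is a placeholder):

 .. code-block:: python

     # All of the current user's jobs, with the default ClassAds
     mine = HTCondor.condor_q()
     # One specific job, requesting only its status (placeholder job ID)
     one = HTCondor.condor_q(job_id="12345.0", class_ads=["JobStatus"])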
2159  """
2160  B2DEBUG(29, f"Calling HTCondor.condor_q(class_ads={class_ads}, job_id={job_id}, username={username})")
2161  if not class_ads:
2162  class_ads = cls.default_class_ads
2163  # Output fields should be comma separated.
2164  field_list_cmd = ",".join(class_ads)
2165  cmd_list = ["condor_q", "-json", "-attributes", field_list_cmd]
2166  # If job_id is set then we ignore all other filters
2167  if job_id:
2168  cmd_list.append(job_id)
2169  else:
2170  if not username:
2171  username = os.environ["USER"]
2172  # If the username is set to all it is a special case
2173  if username == "all":
2174  cmd_list.append("-allusers")
2175  else:
2176  cmd_list.append(username)
2177  # We get a JSON serialisable summary from condor_q. But we will alter it slightly to be more similar to other backends
2178  cmd = " ".join(cmd_list)
2179  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2180  # condor_q occasionally fails
2181  try:
2182  records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2183  except BaseException:
2184  records = None
2185 
2186  if records:
2187  records = decode_json_string(records)
2188  else:
2189  records = []
2190  jobs_info = {"JOBS": records}
2191  jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2192  return jobs_info
2193 
2194  @classmethod
2195  def condor_history(cls, class_ads=None, job_id="", username=""):
2196  """
2197  Simplistic interface to the ``condor_history`` command. Lets you request information about all jobs matching the filters
2198  ``job_id`` and ``username``. Note that setting job_id overrides username, which is then ignored.
2199  The result is a JSON dictionary filled by output of the ``-json`` ``condor_history`` option.
2200 
2201  Parameters:
2202  class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
2203  By default we use {cls.default_class_ads}; increasing the number of class_ads increases the time taken
2204  by the condor_history call.
2205  job_id (str): String representation of the Job ID given by condor_submit during submission.
2206  If this argument is given then the output of this function will be only information about this job.
2207  If this argument is not given, then all jobs matching the other filters will be returned.
2208  username (str): By default we return information about only the current user's jobs. By giving
2209  a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2210  receive job information from all known user jobs matching the other filters. This is limited to 10000 records
2211  and isn't recommended.
2212 
2213  Returns:
2214  dict: JSON dictionary of the form:
2215 
2216  .. code-block:: python
2217 
2218  {
2219  "NJOBS":<number of records returned by command>,
2220  "JOBS":[
2221  {
2222  <ClassAd: value>, ...
2223  }, ...
2224  ]
2225  }
2226  """
2227  B2DEBUG(29, f"Calling HTCondor.condor_history(class_ads={class_ads}, job_id={job_id}, username={username})")
2228  if not class_ads:
2229  class_ads = cls.default_class_ads
2230  # Output fields should be comma separated.
2231  field_list_cmd = ",".join(class_ads)
2232  cmd_list = ["condor_history", "-json", "-attributes", field_list_cmd]
2233  # If job_id is set then we ignore all other filters
2234  if job_id:
2235  cmd_list.append(job_id)
2236  else:
2237  if not username:
2238  username = os.environ["USER"]
2239  # If the username is set to all it is a special case
2240  if username != "all":
2241  cmd_list.append(username)
2242  # We get a JSON serialisable summary from condor_history. But we will alter it slightly to be more similar to other backends
2243  cmd = " ".join(cmd_list)
2244  B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2245  try:
2246  records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2247  except BaseException:
2248  records = None
2249 
2250  if records:
2251  records = decode_json_string(records)
2252  else:
2253  records = []
2254 
2255  jobs_info = {"JOBS": records}
2256  jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2257  return jobs_info
2258 
2259 
2260 class DIRAC(Backend):
2261  """
2262  Backend for submitting calibration processes to the grid.
2263  """
2264 
2265 
2266 class BackendError(Exception):
2267  """
2268  Base exception class for Backend classes.
2269  """
2270 
2271 
2272 class JobError(Exception):
2273  """
2274  Base exception class for Job objects.
2275  """
2276 
2277 
2278 class SplitterError(Exception):
2279  """
2280  Base exception class for SubjobSplitter objects.
2281  """
2282 
2283 # @endcond