Belle II Software development
backends.py
1#!/usr/bin/env python3
2
3
10
11from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
12import re
13import os
14from abc import ABC, abstractmethod
15import json
16import xml.etree.ElementTree as ET
17from math import ceil
18from pathlib import Path
19from collections import deque
20from itertools import count, takewhile
21import shutil
22import time
23from datetime import datetime, timedelta
24import subprocess
25import multiprocessing as mp
26
27from caf.utils import method_dispatch
28from caf.utils import decode_json_string
29from caf.utils import grouper
30from caf.utils import parse_file_uri
31
32
33__all__ = ["Job", "SubJob", "Backend", "Local", "Batch", "LSF", "PBS", "HTCondor", "get_input_data"]
34
35
36_input_data_file_path = Path("__BACKEND_INPUT_FILES__.json")
37
38_STDOUT_FILE = "stdout"
39
40_STDERR_FILE = "stderr"
41
42_backend_job_envvars = (
43 "TMPDIR",
44 "BELLE2_CONFIG_DIR",
45 "VO_BELLE2_SW_DIR",
46 "BELLE2_EXTERNALS_TOPDIR",
47 "BELLE2_CONDB_METADATA",
48 "BELLE2_CONDB_PROXY",
49)
50
51
52def get_input_data():
53 """
54 Simple JSON load of the default input data file. Will contain a list of string file paths
55 for use by the job process.
56 """
57 with open(_input_data_file_path) as input_data_file:
58 input_data = json.load(input_data_file)
59 return input_data
60
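# Illustrative usage sketch: a job's payload script can read back the file list that
# `Job.dump_input_data` wrote into the working directory. The module path `caf.backends`
# and the loop body are assumptions for the example, not part of this function's contract.
#
#   from caf.backends import get_input_data
#   input_files = get_input_data()
#   for file_path in input_files:
#       print("would process:", file_path)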
61
62def monitor_jobs(args, jobs):
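 """
 Blocking helper that waits for a list of `Job` objects to finish. It updates each unfinished job's
 status, sleeps for ``args.heartbeat`` seconds, and repeats until every job is completed or failed.
 """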
63 unfinished_jobs = jobs[:]
64 failed_jobs = 0
65 while unfinished_jobs:
66 B2INFO("Updating statuses of unfinished jobs...")
67 for j in unfinished_jobs:
68 j.update_status()
69 B2INFO("Checking if jobs are ready...")
70 for j in unfinished_jobs[:]:
71 if j.ready():
72 if j.status == "failed":
73 B2ERROR(f"{j} is failed")
74 failed_jobs += 1
75 else:
76 B2INFO(f"{j} is finished")
77 unfinished_jobs.remove(j)
78 if unfinished_jobs:
79 B2INFO(f"Not all jobs done yet, waiting {args.heartbeat} seconds before re-checking...")
80 time.sleep(args.heartbeat)
81 if failed_jobs > 0:
82 B2ERROR(f"{failed_jobs} jobs failed")
83 else:
84 B2INFO('All jobs finished successfully')
85
86
87class ArgumentsGenerator():
88 """
89 Simple little class to hold a generator (uninitialised) and the necessary args/kwargs to
90 initialise it. This lets us reuse a generator by setting it up again fresh. This is not
91 optimal for expensive calculations, but it is nice for making large sequences of
92 Job input arguments on the fly.
93 """
94 def __init__(self, generator_function, *args, **kwargs):
95 """
96 Parameters:
97 generator_function (py:function): A function (callable) that contains a ``yield`` statement. This generator
98 should *not* be initialised i.e. you haven't called it with ``generator_function(*args, **kwargs)``
99 yet. That will happen when accessing the `ArgumentsGenerator.generator` property.
100 args (tuple): The positional arguments you want to send into the initialisation of the generator.
101 kwargs (dict): The keyword arguments you want to send into the initialisation of the generator.
102 """
103
104 self.generator_function = generator_function
105
106 self.args = args
107
108 self.kwargs = kwargs
109
110 @property
111 def generator(self):
112 """
113 Returns:
114 generator: The initialised generator (using the args and kwargs for initialisation). It should be ready
115 to have ``next``/``send`` called on it.
116 """
117 gen = self.generator_function(*self.args, **self.kwargs)
118 gen.send(None) # Prime it
119 return gen
120
121
122def range_arguments(start=0, stop=None, step=1):
123 """
124 A simple example Arguments Generator function for use as a `ArgumentsGenerator.generator_function`.
125 It will return increasing values using itertools.count. By default it is infinite and will never raise `StopIteration`.
126 The `SubJob` object is sent into this function with `send` but is not used.
127
128 Parameters:
129 start (int): The starting value that will be returned.
130 stop (int): At this value the `StopIteration` will be thrown. If this is `None` then this generator will continue
131 forever.
132 step (int): The step size.
133
134 Returns:
135 tuple
136 """
137 def should_stop(x):
138 if stop is not None and x >= stop:
139 return True
140 else:
141 return False
142 # The logic of send/yield is tricky. Basically the first yield must have been hit before send(subjob) can work.
143 # However the first yield is also the one that receives the send(subjob) NOT the send(None).
144 subjob = (yield None) # Handle the send(None) and first send(subjob)
145 args = None
146 for i in takewhile(lambda x: not should_stop(x), count(start, step)):
147 args = (i,) # We could have used the subjob object here
148 B2DEBUG(29, f"{subjob} arguments will be {args}")
149 subjob = (yield args) # Set up ready for the next loop (if it happens)
150
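# Illustrative usage sketch pairing `range_arguments` with an `ArgumentsGenerator` so that a
# splitter hands each `SubJob` one increasing integer argument. The choice of MaxFilesSplitter
# and the start/stop values are only an example.
#
#   arg_gen = ArgumentsGenerator(range_arguments, start=1, stop=5)
#   job.splitter = MaxFilesSplitter(arguments_generator=arg_gen, max_files_per_subjob=2)
#   # At submission the splitter's create_subjobs() assigns args (1,), (2,), (3,), (4,)
#   # to the first subjobs; any further subjobs receive empty argument tuples.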
151
152class SubjobSplitter(ABC):
153 """
154 Abstract base class. This class handles the logic of creating subjobs for a `Job` object.
155 The `create_subjobs` function should be implemented and used to construct
156 the subjobs of the parent job object.
157
158 Parameters:
159 arguments_generator (ArgumentsGenerator): Used to construct the generator function that will yield the argument
160 tuple for each `SubJob`. The splitter will iterate through the generator each time `create_subjobs` is
161 called. The `SubJob` will be sent into the generator with ``send(subjob)`` so that the generator can decide what
162 arguments to return.
163 """
164
165 def __init__(self, *, arguments_generator=None):
166 """
167 Derived classes should call `super` to run this.
168 """
169
170 self.arguments_generator = arguments_generator
171
172 @abstractmethod
173 def create_subjobs(self, job):
174 """
175 Implement this method in derived classes to generate the `SubJob` objects.
176 """
177
178 def assign_arguments(self, job):
179 """
180 Use the `arguments_generator` (if one exists) to assign the argument tuples to the
181 subjobs.
182 """
183 if self.arguments_generator:
184 arg_gen = self.arguments_generator.generator
185 generating = True
186 for subjob in sorted(job.subjobs.values(), key=lambda sj: sj.id):
187 if generating:
188 try:
189 args = arg_gen.send(subjob)
190 except StopIteration:
191 B2ERROR(f"StopIteration called when getting args for {subjob}, "
192 "setting all subsequent subjobs to have empty argument tuples.")
193 args = tuple() # If our generator finishes before the subjobs we use empty argument tuples
194 generating = False
195 else:
196 args = tuple()
197 B2DEBUG(29, f"Arguments for {subjob}: {args}")
198 subjob.args = args
199 return
200
201 B2INFO(f"No ArgumentsGenerator assigned to the {self} so subjobs of {job} "
202 "won't automatically have arguments assigned.")
203
204 def __repr__(self):
205 """
206 """
207 return f"{self.__class__.__name__}"
208
209
210class MaxFilesSplitter(SubjobSplitter):
211 """
212
213 """
214 def __init__(self, *, arguments_generator=None, max_files_per_subjob=1):
215 """
216 Parameters:
217 max_files_per_subjob (int): The maximum number of input files used per `SubJob` created.
218 """
219 super().__init__(arguments_generator=arguments_generator)
220
221 self.max_files_per_subjob = max_files_per_subjob
222
223 def create_subjobs(self, job):
224 """
225 This function creates subjobs for the parent job passed in. It creates as many subjobs as required
226 in order to prevent the number of input files per subjob going over the limit set by
227 `MaxFilesSplitter.max_files_per_subjob`.
228 """
229 if not job.input_files:
230 B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
231 return
232
233 for i, subjob_input_files in enumerate(grouper(self.max_files_per_subjob, job.input_files)):
234 job.create_subjob(i, input_files=subjob_input_files)
235
236 self.assign_arguments(job)
237
238 B2INFO(f"{self} created {i+1} Subjobs for {job}")
239
240
241class MaxSubjobsSplitter(SubjobSplitter):
242 """
243
244 """
245 def __init__(self, *, arguments_generator=None, max_subjobs=1000):
246 """
247 Parameters:
248 max_subjobs (int): The maximum number of subjobs that will be created.
249 """
250 super().__init__(arguments_generator=arguments_generator)
251
252 self.max_subjobs = max_subjobs
253
254 def create_subjobs(self, job):
255 """
256 This function creates subjobs for the parent job passed in. It creates as many subjobs as required
257 by the number of input files up to the maximum set by `MaxSubjobsSplitter.max_subjobs`. If there are
258 more input files than `max_subjobs` it instead groups files by the minimum number per subjob in order to
259 respect the subjob limit e.g. If you have 11 input files and a maximum number of subjobs of 4, then it
260 will create 4 subjobs, 3 of them with 3 input files, and one with 2 input files.
261 """
262 if not job.input_files:
263 B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
264 return
265
266 # Since we will be popping from the left of the input file list, we construct a deque
267 remaining_input_files = deque(job.input_files)
268 # The initial number of available subjobs, will go down as we create them
269 available_subjobs = self.max_subjobs
270 subjob_i = 0
271 while remaining_input_files:
272 # How many files should we use for this subjob?
273 num_input_files = ceil(len(remaining_input_files) / available_subjobs)
274 # Pop them from the remaining files
275 subjob_input_files = []
276 for i in range(num_input_files):
277 subjob_input_files.append(remaining_input_files.popleft())
278 # Create the actual subjob
279 job.create_subjob(subjob_i, input_files=subjob_input_files)
280 subjob_i += 1
281 available_subjobs -= 1
282
283 self.assign_arguments(job)
284 B2INFO(f"{self} created {subjob_i} Subjobs for {job}")
285
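# Worked example of the grouping above (illustrative, the file paths are hypothetical): with 11 input
# files and max_subjobs=4 the loop uses ceil(11/4)=3, ceil(8/3)=3, ceil(5/2)=3 and ceil(2/1)=2
# files per subjob, i.e. 4 subjobs holding 3, 3, 3 and 2 files.
#
#   job.max_subjobs = 4  # the property setter installs a MaxSubjobsSplitter
#   job.input_files = [f"/data/file_{i}.root" for i in range(11)]
#   # at submission: 4 subjobs with 3, 3, 3 and 2 input files respectively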
286
287class ArgumentsSplitter(SubjobSplitter):
288 """
289 Creates SubJobs based on the given argument generator. The generator will be called until a `StopIteration` is issued.
290 Be VERY careful to not accidentally give an infinite generator! Otherwise it will simply create SubJobs until you run out
291 of memory. You can set the `ArgumentsSplitter.max_subjobs` parameter to try and prevent this and throw an exception.
292
293 This splitter is useful for MC production jobs where you don't have any input files, but you want to control the exp/run
294 numbers of subjobs. If you do have input files set for the parent `Job` objects, then the same input files will be
295 assigned to every `SubJob`.
296
297 Parameters:
298 arguments_generator (ArgumentsGenerator): The standard ArgumentsGenerator that is used to assign arguments
299 """
300
301 def __init__(self, *, arguments_generator=None, max_subjobs=None):
302 """
303 """
304 super().__init__(arguments_generator=arguments_generator)
305
306 self.max_subjobs = max_subjobs
307
308 def create_subjobs(self, job):
309 """
310 This function creates subjobs for the parent job passed in. It creates subjobs until the
311 `SubjobSplitter.arguments_generator` finishes.
312
313 If `ArgumentsSplitter.max_subjobs` is set, then it will throw an exception if more than this number of
314 subjobs are created.
315 """
316 arg_gen = self.arguments_generator.generator
317 for i in count():
318 # Reached the maximum?
319 if self.max_subjobs is not None and i >= self.max_subjobs:
320 raise SplitterError(f"{self} tried to create more subjobs than the maximum (={self.max_subjobs}).")
321 try:
322 subjob = SubJob(job, i, job.input_files) # We manually create it because we might not need it
323 args = arg_gen.send(subjob) # Might throw StopIteration
324 B2INFO(f"Creating {job}.{subjob}")
325 B2DEBUG(29, f"Arguments for {subjob}: {args}")
326 subjob.args = args
327 job.subjobs[i] = subjob
328 except StopIteration:
329 break
330 B2INFO(f"{self} created {i+1} Subjobs for {job}")
331
332
333class Job:
334 """
335 This generic Job object is used to tell a Backend what to do.
336 This object basically holds necessary information about a process you want to submit to a `Backend`.
337 It should *not* do anything that is backend specific, just hold the configuration for a job to be
338 successfully submitted and monitored using a backend. The result attribute is where backend
339 specific job monitoring goes.
340
341 Parameters:
342 name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF
343
344 .. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.
345 """
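 # Illustrative configuration sketch (the paths and command are hypothetical, and the Local
 # backend is only one possible choice):
 #
 #   job = Job("my_job")
 #   job.cmd = ["basf2", "steering.py"]
 #   job.working_dir = "/abs/path/to/work"
 #   job.output_dir = "/abs/path/to/output"
 #   job.input_files = ["/abs/path/to/data.root"]
 #   Local(max_processes=2).submit(job)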
346
347
349 statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}
350
351
352 exit_statuses = ["failed", "completed"]
353
354 def __init__(self, name, job_dict=None):
355 """
356 """
357
358 self.name = name
359
360 self.splitter = None
361
362 if not job_dict:
365 self.input_sandbox_files = []
366
366
367 self.working_dir = Path()
368
369 self.output_dir = Path()
370
371 self.output_patterns = []
372
373 self.cmd = []
374
375 self.args = []
376
377 self.input_files = []
378
379 self.setup_cmds = []
380
382 self.backend_args = {}
383
384 self.subjobs = {}
385 elif job_dict:
386 self.input_sandbox_files = [Path(p) for p in job_dict["input_sandbox_files"]]
387 self.working_dir = Path(job_dict["working_dir"])
388 self.output_dir = Path(job_dict["output_dir"])
389 self.output_patterns = job_dict["output_patterns"]
390 self.cmd = job_dict["cmd"]
391 self.args = job_dict["args"]
392 self.input_files = job_dict["input_files"]
393 self.setup_cmds = job_dict["setup_cmds"]
394 self.backend_args = job_dict["backend_args"]
395 self.subjobs = {}
396 for subjob_dict in job_dict["subjobs"]:
397 self.create_subjob(subjob_dict["id"], input_files=subjob_dict["input_files"], args=subjob_dict["args"])
398
399
401 self.result = None
402
403 self._status = "init"
404
405 def __repr__(self):
406 """
407 Representation of Job class (what happens when you print a Job() instance).
408 """
409 return f"Job({self.name})"
410
411 def ready(self):
412 """
413 Returns whether or not the Job has finished. If the job has subjobs then it will return true when they are all finished.
414 It will return False as soon as it hits the first failure. Meaning that you cannot guarantee that all subjobs will have
415 their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.
416 """
417 if not self.result:
418 B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")
419 return False
420 else:
421 return self.result.ready()
422
423 def update_status(self):
424 """
425 Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
426 in the best way for the type of result object/backend.
427 """
428 if not self.result:
429 B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
430 else:
431 self.result.update_status()
432 return self.status
433
434 def create_subjob(self, i, input_files=None, args=None):
435 """
436 Creates a subjob Job object that references that parent Job.
437 Returns the SubJob object at the end.
438 """
439 if i not in self.subjobs:
440 B2INFO(f"Creating {self}.Subjob({i})")
441 subjob = SubJob(self, i, input_files)
442 if args:
443 subjob.args = args
444 self.subjobs[i] = subjob
445 return subjob
446 else:
447 B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")
448
449 @property
450 def status(self):
451 """
452 Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
453 subjob status in the hierarchy of statuses in `Job.statuses`.
454 """
455 if self.subjobs:
456 job_status = self._get_overall_status_from_subjobs()
457 if job_status != self._status:
458
459 self.status = job_status
460 return self._status
461
462 def _get_overall_status_from_subjobs(self):
463 """
464 """
465 subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
466 status_level = min([self.statuses[status] for status in subjob_statuses])
467 for status, level in self.statuses.items():
468 if level == status_level:
469 return status
470
471 @status.setter
472 def status(self, status):
473 """
474 Sets the status of this Job.
475 """
476 # Print an error only if the job failed.
477 if status == 'failed':
478 B2ERROR(f"Setting {self.name} status to failed")
479 else:
480 B2INFO(f"Setting {self.name} status to {status}")
481 self._status = status
482
483 # \cond silence doxygen warnings
484
485 @property
486 def output_dir(self):
487 return self._output_dir
488
489 @output_dir.setter
490 def output_dir(self, value):
491 self._output_dir = Path(value).absolute()
492
493 @property
494 def working_dir(self):
495 return self._working_dir
496
497 @working_dir.setter
498 def working_dir(self, value):
499 self._working_dir = Path(value).absolute()
500
501 @property
502 def input_sandbox_files(self):
503 return self._input_sandbox_files
504
505 @input_sandbox_files.setter
506 def input_sandbox_files(self, value):
507 self._input_sandbox_files = [Path(p).absolute() for p in value]
508
509 @property
510 def input_files(self):
511 return self._input_files
512
513 @input_files.setter
514 def input_files(self, value):
515 self._input_files = value
516
517 @property
518 def max_subjobs(self):
519 return self.splitter.max_subjobs
520
521 @max_subjobs.setter
522 def max_subjobs(self, value):
523 self.splitter = MaxSubjobsSplitter(max_subjobs=value)
524 B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
525
526 @property
527 def max_files_per_subjob(self):
528 return self.splitter.max_files_per_subjob
529
530 @max_files_per_subjob.setter
531 def max_files_per_subjob(self, value):
532 self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
533 B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
534
535 # \endcond
536
537 def dump_to_json(self, file_path):
538 """
539 Dumps the Job object configuration to a JSON file so that it can be read in again later.
540
541 Parameters:
542 file_path(`basf2.Path`): The filepath we'll dump to
543 """
544 # \cond false positive doxygen warning about job_dict
545 with open(file_path, mode="w") as job_file:
546 json.dump(self.job_dict, job_file, indent=2)
547 # \endcond
548
549 @classmethod
550 def from_json(cls, file_path):
551 """
552 """
553 with open(file_path) as job_file:
554 job_dict = json.load(job_file)
555 return cls(job_dict["name"], job_dict=job_dict)
556
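 # Illustrative round-trip sketch (the file name is hypothetical): a configured Job can be
 # persisted with dump_to_json and recreated later with from_json.
 #
 #   job.dump_to_json("my_job.json")
 #   same_job = Job.from_json("my_job.json")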
557 @property
558 def job_dict(self):
559 """
560 Returns:
561 dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
562 `Path <basf2.Path>` objects are converted to string via ``Path.as_posix()``.
563 """
564 job_dict = {}
565 job_dict["name"] = self.name
566 job_dict["input_sandbox_files"] = [i.as_posix() for i in self.input_sandbox_files]
567 job_dict["working_dir"] = self.working_dir.as_posix()
568 job_dict["output_dir"] = self.output_dir.as_posix()
569 job_dict["output_patterns"] = self.output_patterns
570 job_dict["cmd"] = self.cmd
571 job_dict["args"] = self.args
572 job_dict["input_files"] = self.input_files
573 job_dict["setup_cmds"] = self.setup_cmds
574 job_dict["backend_args"] = self.backend_args
575 job_dict["subjobs"] = [sj.job_dict for sj in self.subjobs.values()]
576 return job_dict
577
578 def dump_input_data(self):
579 """
580 Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
581 string URI objects.
582 """
583 with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
584 json.dump(self.input_files, input_data_file, indent=2)
585
586 def copy_input_sandbox_files_to_working_dir(self):
587 """
588 Get all of the requested files for the input sandbox and copy them to the working directory.
589 Files like the submit.sh or input_data.json are not part of this process.
590 """
591 for file_path in self.input_sandbox_files:
592 if file_path.is_dir():
593 shutil.copytree(file_path, Path(self.working_dir, file_path.name))
594 else:
595 shutil.copy(file_path, self.working_dir)
596
597 def check_input_data_files(self):
598 """
599 Check the input files and make sure that there aren't any duplicates.
600 Also check if the files actually exist if possible.
601 """
602 existing_input_files = [] # We use a list instead of set to avoid losing any ordering of files
603 for file_path in self.input_files:
604 file_uri = parse_file_uri(file_path)
605 if file_uri.scheme == "file":
606 p = Path(file_uri.path)
607 if p.is_file():
608 if file_uri.geturl() not in existing_input_files:
609 existing_input_files.append(file_uri.geturl())
610 else:
611 B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
612 else:
613 B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
614 else:
615 B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
616 if file_path not in existing_input_files:
617 existing_input_files.append(file_path)
618 else:
619 B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
620 if self.input_files and not existing_input_files:
621 B2WARNING(f"No valid input file paths found for {self.name}, but some were requested.")
622
623 # Replace the Job's input files with the ones that exist + duplicates removed
624 self.input_files = existing_input_files
625
626 @property
627 def full_command(self):
628 """
629 Returns:
630 str: The full command that this job will run including any arguments.
631 """
632 all_components = self.cmd[:]
633 all_components.extend(self.args)
634 # We do a convert to string just in case arguments were generated as different types
635 full_command = " ".join(map(str, all_components))
636 B2DEBUG(29, f"Full command of {self} is '{full_command}'")
637 return full_command
638
639 def append_current_basf2_setup_cmds(self):
640 """
641 This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
642 It should detect if you are using a local release or CVMFS and append the correct commands
643 so that the job will have the same basf2 release environment. It should also detect
644 if a local release is not compiled with the ``opt`` option.
645
646 Note that this *doesn't mean that every environment variable is inherited* from the submitting
647 process environment.
648 """
649 def append_environment_variable(cmds, envvar):
650 """
651 Append a command for setting an environment variable.
652 """
653 if envvar in os.environ:
654 cmds.append(f"""if [ -z "${{{envvar}}}" ]; then""")
655 cmds.append(f" export {envvar}={os.environ[envvar]}")
656 cmds.append("fi")
657
658 if "BELLE2_TOOLS" not in os.environ:
659 raise BackendError("No BELLE2_TOOLS found in environment")
660 # Export all the environment variables defined via _backend_job_envvars
661 for envvar in _backend_job_envvars:
662 append_environment_variable(self.setup_cmds, envvar)
663 if "BELLE2_RELEASE" in os.environ:
664 self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
665 elif 'BELLE2_LOCAL_DIR' in os.environ:
666 self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
667 self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
668 self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
669 self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
670 self.setup_cmds.append("pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
671 self.setup_cmds.append("source $BACKEND_B2SETUP")
672 # b2code-option has to be executed only after the source of the tools.
673 self.setup_cmds.append("b2code-option $BACKEND_BELLE2_OPTION")
674 self.setup_cmds.append("popd > /dev/null")
675
676
677class SubJob(Job):
678 """
679 This mini-class simply holds basic information about which subjob you are
680 and a reference to the parent Job object to be able to access the main data there.
681 Rather than replicating all of the parent job's configuration again.
682 """
683
684 def __init__(self, job, subjob_id, input_files=None):
685 """
686 """
687
688 self.id = subjob_id
689
690 self.parent = job
691
692 if not input_files:
693 input_files = []
694 self.input_files = input_files
695
697 self.result = None
698
699 self._status = "init"
700
701 self.args = []
702
703 @property
704 def output_dir(self):
705 """
706 Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this."""
707 return Path(self.parent.output_dir, str(self.id))
708
709 @property
710 def working_dir(self):
711 """Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this."""
712 return Path(self.parent.working_dir, str(self.id))
713
714 @property
715 def name(self):
716 """Getter for name of SubJob. Accesses the parent Job name to infer this."""
717 return "_".join((self.parent.name, str(self.id)))
718
719 @property
720 def status(self):
721 """
722 Returns the status of this SubJob.
723 """
724 return self._status
725
726 @status.setter
727 def status(self, status):
728 """
729 Sets the status of this Job.
730 """
731 # Print an error only if the job failed.
732 if status == "failed":
733 B2ERROR(f"Setting {self.name} status to failed")
734 else:
735 B2INFO(f"Setting {self.name} status to {status}")
736 self._status = status
737
738 @property
739 def subjobs(self):
740 """
741 A subjob cannot have subjobs. Always return empty list.
742 """
743 return []
744
745 @property
746 def job_dict(self):
747 """
748 Returns:
749 dict: A JSON serialisable representation of the `SubJob`. `Path <basf2.Path>` objects are converted to
750 `string` via ``Path.as_posix()``. Since Subjobs inherit most of the parent job's config
751 we only output the input files and arguments that are specific to this subjob and no other details.
752 """
753 job_dict = {}
754 job_dict["id"] = self.id
755 job_dict["input_files"] = self.input_files
756 job_dict["args"] = self.args
757 return job_dict
758
759 def __getattr__(self, attribute):
760 """
761 Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
762 unless otherwise specified.
763 """
764 return getattr(self.parent, attribute)
765
766 def __repr__(self):
767 """
768 """
769 return f"SubJob({self.name})"
770
771
772class Backend(ABC):
773 """
774 Abstract base class for a valid backend.
775 Classes derived from this will implement their own submission of basf2 jobs
776 to whatever backend they describe.
777 Some common methods/attributes go into this base class.
778
779 For backend_args the priority from lowest to highest is:
780
781 backend.default_backend_args -> backend.backend_args -> job.backend_args
782 """
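 # Illustrative sketch of that priority order (the queue names are hypothetical):
 #
 #   lsf = LSF(backend_args={"queue": "l"})    # overrides LSF.default_backend_args
 #   job.backend_args = {"queue": "b2_long"}   # job-level value wins at submission time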
783
784 submit_script = "submit.sh"
785
786 exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
787
788 default_backend_args = {}
789
790 def __init__(self, *, backend_args=None):
791 """
792 """
793 if backend_args is None:
794 backend_args = {}
795
796 self.backend_args = {**self.default_backend_args, **backend_args}
797
798 @abstractmethod
799 def submit(self, job):
800 """
801 Base method for submitting collection jobs to the backend type. This MUST be
802 implemented for a correctly written backend class deriving from Backend().
803 """
804
805 @staticmethod
806 def _add_setup(job, batch_file):
807 """
808 Adds setup lines to the shell script file.
809 """
810 for line in job.setup_cmds:
811 print(line, file=batch_file)
812
813 def _add_wrapper_script_setup(self, job, batch_file):
814 """
815 Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
816 `trap` statements for Ctrl-C situations.
817 """
818 start_wrapper = f"""# ---
819# trap ctrl-c and call ctrl_c()
820trap '(ctrl_c 130)' SIGINT
821trap '(ctrl_c 143)' SIGTERM
822
823function write_exit_code() {{
824 echo "Writing $1 to exit status file"
825 echo "$1" > {self.exit_code_file}
826 exit $1
827}}
828
829function ctrl_c() {{
830 trap '' SIGINT SIGTERM
831 echo "** Trapped Ctrl-C **"
832 echo "$1" > {self.exit_code_file}
833 exit $1
834}}
835# ---"""
836 print(start_wrapper, file=batch_file)
837
838 def _add_wrapper_script_teardown(self, job, batch_file):
839 """
840 Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
841 an exit code of the job cmd being written out to a file. Which means that we can know if the command was
842 successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
843 removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
844 file.
845 """
846 end_wrapper = """# ---
847write_exit_code $?"""
848 print(end_wrapper, file=batch_file)
849
850 @classmethod
851 def _create_parent_job_result(cls, parent):
852 """
853 We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
854 so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
855 statuses and allows the use of ready().
856 """
857 raise NotImplementedError
858
859 def get_submit_script_path(self, job):
860 """
861 Construct the Path object of the bash script file that we will submit. It will contain
862 the actual job command, wrapper commands, setup commands, and any batch directives
863 """
864 return Path(job.working_dir, self.submit_script)
865
866
867class Result():
868 """
869 Base class for Result objects. A Result is created for each `Job` (or `Job.SubJob`) object
870 submitted to a backend. It provides a way to query a job's status to find out if it's ready.
871 """
872
873 def __init__(self, job):
874 """
875 Pass in the job object to allow the result to access the job's properties and do post-processing.
876 """
877
878 self.job = job
879
880 self._is_ready = False
881
883 self.time_to_wait_for_exit_code_file = timedelta(minutes=5)
884
885 self.exit_code_file_initial_time = None
886
887 def ready(self):
888 """
889 Returns whether or not this job result is known to be ready. Doesn't actually change the job status. Just changes
890 the 'readiness' based on the known job status.
891 """
892 B2DEBUG(29, f"Calling {self.job}.result.ready()")
893 if self._is_ready:
894 return True
895 elif self.job.status in self.job.exit_statuses:
896 self._is_ready = True
897 return True
898 else:
899 return False
900
901 def update_status(self):
902 """
903 Update the job's (and subjobs') status so that `Result.ready` will return the up to date status. This call will have to
904 actually look up the job's status from some database/exit code file.
905 """
906 raise NotImplementedError
907
908 def get_exit_code_from_file(self):
909 """
910 Read the exit code file to discover the exit status of the job command. Useful fallback if the job is no longer
911 known to the job database (batch system purged it for example). Since some backends may take time to download
912 the output files of the job back to the working directory we use a time limit on how long to wait.
913 """
914 if not self.exit_code_file_initial_time:
915 self.exit_code_file_initial_time = datetime.now()
916 exit_code_path = Path(self.job.working_dir, Backend.exit_code_file)
917 with open(exit_code_path) as f:
918 exit_code = int(f.read().strip())
919 B2DEBUG(29, f"Exit code from file for {self.job} was {exit_code}")
920 return exit_code
921
922
923class Local(Backend):
924 """
925 Backend for local processes i.e. on the same machine but in a subprocess.
926
927 Note that you should call the self.join() method to close the pool and wait for any
928 running processes to finish before exiting the process. Once you've called join you will have to set up a new
929 instance of this backend to create a new pool. If you don't call `Local.join` or join the pool yourself
930 somewhere, then the main python process might end before your pool is done.
931
932 Keyword Arguments:
933 max_processes (int): Integer that specifies the size of the process pool that spawns the subjobs, default=1.
934 It's the maximum simultaneous subjobs.
935 Try not to specify a large number or a number larger than the number of cores.
936 It won't crash the program but it will slow down and negatively impact performance.
937 """
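 # Illustrative usage sketch (the heartbeat of 30 seconds is arbitrary):
 #
 #   backend = Local(max_processes=4)
 #   backend.submit(jobs)               # a Job, a SubJob, or a list of Jobs
 #   while not all(j.ready() for j in jobs):
 #       time.sleep(30)
 #       for j in jobs:
 #           j.update_status()
 #   backend.join()                     # close the pool before the process exits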
938
939 def __init__(self, *, backend_args=None, max_processes=1):
940 """
941 """
942 super().__init__(backend_args=backend_args)
943
944 self.pool = None
945
946 self.max_processes = max_processes
947
948 class LocalResult(Result):
949 """
950 Result class to help monitor status of jobs submitted by Local backend.
951 """
952
953 def __init__(self, job, result):
954 """
955 Pass in the job object and the multiprocessing result to allow the result to do monitoring and perform
956 post processing of the job.
957 """
958 super().__init__(job)
959
960 self.result = result
961
962 def _update_result_status(self):
963 """
964 Update result status
965 """
966 if self.result.ready() and (self.job.status not in self.job.exit_statuses):
967 return_code = self.result.get()
968 if return_code:
969 self.job.status = "failed"
970 else:
971 self.job.status = "completed"
972
973 def update_status(self):
974 """
975 Update the job's (or subjobs') status by calling the result object.
976 """
977 B2DEBUG(29, f"Calling {self.job}.result.update_status()")
978 if self.job.subjobs:
979 for subjob in self.job.subjobs.values():
980 subjob.result._update_result_status()
981 else:
982 self._update_result_status()
983
984 def join(self):
985 """
986 Closes and joins the Pool, letting you wait for all results currently
987 still processing.
988 """
989 B2INFO("Joining Process Pool, waiting for results to finish...")
990 self.pool.close()
991 self.pool.join()
992 B2INFO("Process Pool joined.")
993
994 @property
995 def max_processes(self):
996 """
997 Getter for max_processes
998 """
999 return self._max_processes
1000
1001 @max_processes.setter
1002 def max_processes(self, value):
1003 """
1004 Setter for max_processes, we also check for a previous Pool(), wait for it to join
1005 and then create a new one with the new value of max_processes
1006 """
1007
1008 self._max_processes = value
1009 if self.pool:
1010 B2INFO("New max_processes requested. But a pool already exists.")
1011 self.join()
1012 B2INFO(f"Starting up new Pool with {self.max_processes} processes")
1013 self.pool = mp.Pool(processes=self.max_processes)
1014
1015 @method_dispatch
1016 def submit(self, job):
1017 """
1018 """
1019 raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
1020 "Did you submit a (Sub)Job?")
1021
1022 @submit.register(SubJob)
1023 def _(self, job):
1024 """
1025 Submission of a `SubJob` for the Local backend
1026 """
1027 # Make sure the output directory of the job is created
1028 job.output_dir.mkdir(parents=True, exist_ok=True)
1029 # Make sure the working directory of the job is created
1030 job.working_dir.mkdir(parents=True, exist_ok=True)
1031 job.copy_input_sandbox_files_to_working_dir()
1032 job.dump_input_data()
1033 # Get the path to the bash script we run
1034 script_path = self.get_submit_script_path(job)
1035 with open(script_path, mode="w") as batch_file:
1036 print("#!/bin/bash", file=batch_file)
1037 self._add_wrapper_script_setup(job, batch_file)
1038 self._add_setup(job, batch_file)
1039 print(job.full_command, file=batch_file)
1040 self._add_wrapper_script_teardown(job, batch_file)
1041 B2INFO(f"Submitting {job}")
1042 job.result = Local.LocalResult(job,
1043 self.pool.apply_async(self.run_job,
1044 (job.name,
1045 job.working_dir,
1046 job.output_dir,
1047 script_path)
1048 )
1049 )
1050 job.status = "submitted"
1051 B2INFO(f"{job} submitted")
1052
1053 @submit.register(Job)
1054 def _(self, job):
1055 """
1056 Submission of a `Job` for the Local backend
1057 """
1058 # Make sure the output directory of the job is created
1059 job.output_dir.mkdir(parents=True, exist_ok=True)
1060 # Make sure the working directory of the job is created
1061 job.working_dir.mkdir(parents=True, exist_ok=True)
1062 # Check if we have any valid input files
1063 job.check_input_data_files()
1064
1065 if not job.splitter:
1066 # Get all of the requested files for the input sandbox and copy them to the working directory
1067 job.copy_input_sandbox_files_to_working_dir()
1068 job.dump_input_data()
1069 # Get the path to the bash script we run
1070 script_path = self.get_submit_script_path(job)
1071 with open(script_path, mode="w") as batch_file:
1072 print("#!/bin/bash", file=batch_file)
1073 self._add_wrapper_script_setup(job, batch_file)
1074 self._add_setup(job, batch_file)
1075 print(job.full_command, file=batch_file)
1076 self._add_wrapper_script_teardown(job, batch_file)
1077 B2INFO(f"Submitting {job}")
1078 job.result = Local.LocalResult(job,
1079 self.pool.apply_async(self.run_job,
1080 (job.name,
1081 job.working_dir,
1082 job.output_dir,
1083 script_path)
1084 )
1085 )
1086 B2INFO(f"{job} submitted")
1087 else:
1088 # Create subjobs according to the splitter's logic
1089 job.splitter.create_subjobs(job)
1090 # Submit the subjobs
1091 self.submit(list(job.subjobs.values()))
1092 # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1093 self._create_parent_job_result(job)
1094
1095 @submit.register(list)
1096 def _(self, jobs):
1097 """
1098 Submit method of Local() that takes a list of jobs instead of just one and submits each one.
1099 """
1100 # Submit the jobs
1101 for job in jobs:
1102 self.submit(job)
1103 B2INFO("All requested jobs submitted.")
1104
1105 @staticmethod
1106 def run_job(name, working_dir, output_dir, script):
1107 """
1108 The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
1109 shell command in a subprocess and captures the stdout and stderr of the subprocess to files.
1110 """
1111 B2INFO(f"Starting Sub-process: {name}")
1112 from subprocess import Popen
1113 stdout_file_path = Path(working_dir, _STDOUT_FILE)
1114 stderr_file_path = Path(working_dir, _STDERR_FILE)
1115 # Create unix command to redirect stdout and stderr
1116 B2INFO(f"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
1117 with open(stdout_file_path, mode="w", buffering=1) as f_out, \
1118 open(stderr_file_path, mode="w", buffering=1) as f_err:
1119 with Popen(["/bin/bash", script.as_posix()],
1120 stdout=f_out,
1121 stderr=f_err,
1122 bufsize=1,
1123 universal_newlines=True,
1124 cwd=working_dir,
1125 env={}) as p:
1126 # We block here and wait so that the return code will be set.
1127 p.wait()
1128 B2INFO(f"Subprocess {name} finished.")
1129 return p.returncode
1130
1131 @classmethod
1132 def _create_parent_job_result(cls, parent):
1133 parent.result = cls.LocalResult(parent, None)
1134
1135
1136class Batch(Backend):
1137 """
1138 Abstract Base backend for submitting to a local batch system. Batch system specific commands should be implemented
1139 in a derived class. Do not use this class directly!
1140 """
1141
1142 submission_cmds = []
1143
1155 default_global_job_limit = 1000
1156
1157 default_sleep_between_submission_checks = 30
1158
1159 def __init__(self, *, backend_args=None):
1160 """
1161 Init method for Batch Backend. Does some basic default setup.
1162 """
1163 super().__init__(backend_args=backend_args)
1164
1166 self.global_job_limit = self.default_global_job_limit
1167
1169 self.sleep_between_submission_checks = self.default_sleep_between_submission_checks
1170
1171 def _add_batch_directives(self, job, file):
1172 """
1173 Should be implemented in a derived class to write a batch submission script to the job.working_dir.
1174 You should think about where the stdout/err should go, and set the queue name.
1175 """
1176 raise NotImplementedError("Need to implement a _add_batch_directives(self, job, file) "
1177 f"method in {self.__class__.__name__} backend.")
1178
1179 def _make_submit_file(self, job, submit_file_path):
1180 """
1181 Useful for the HTCondor backend where a submit file is needed instead of batch
1182 directives pasted directly into the submission script. It should be overwritten
1183 if needed.
1184 """
1185
1186 @classmethod
1187 @abstractmethod
1188 def _submit_to_batch(cls, cmd):
1189 """
1190 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1191 """
1192
1193 def can_submit(self, *args, **kwargs):
1194 """
1195 Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail.
1196 This is initially meant to make sure that we don't go over the global limits of jobs (submitted + running).
1197
1198 Returns:
1199 bool: If the job submission can continue based on the current situation.
1200 """
1201 return True
1202
1203 @method_dispatch
1204 def submit(self, job, check_can_submit=True, jobs_per_check=100):
1205 """
1206 """
1207 raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
1208 "Did you submit a (Sub)Job?")
1209
1210 @submit.register(SubJob)
1211 def _(self, job, check_can_submit=True, jobs_per_check=100):
1212 """
1213 Submit method of Batch backend for a `SubJob`. Should take `SubJob` object, create needed directories,
1214 create batch script, and send it off with the batch submission command.
1215 It should apply the correct options (default and user requested).
1216
1217 Should set a Result object as an attribute of the job.
1218 """
1219 # Make sure the output directory of the job is created, commented out due to permission issues
1220 # job.output_dir.mkdir(parents=True, exist_ok=True)
1221 # Make sure the working directory of the job is created
1222 job.working_dir.mkdir(parents=True, exist_ok=True)
1223 job.copy_input_sandbox_files_to_working_dir()
1224 job.dump_input_data()
1225 # Make submission file if needed
1226 batch_submit_script_path = self.get_batch_submit_script_path(job)
1227 self._make_submit_file(job, batch_submit_script_path)
1228 # Get the bash file we will actually run, might be the same file
1229 script_path = self.get_submit_script_path(job)
1230 # Construct the batch submission script (with directives if that is supported)
1231 with open(script_path, mode="w") as batch_file:
1232 self._add_batch_directives(job, batch_file)
1233 self._add_wrapper_script_setup(job, batch_file)
1234 self._add_setup(job, batch_file)
1235 print(job.full_command, file=batch_file)
1236 self._add_wrapper_script_teardown(job, batch_file)
1237 os.chmod(script_path, 0o755)
1238 B2INFO(f"Submitting {job}")
1239 # Do the actual batch submission
1240 cmd = self._create_cmd(batch_submit_script_path)
1241 output = self._submit_to_batch(cmd)
1242 self._create_job_result(job, output)
1243 job.status = "submitted"
1244 B2INFO(f"{job} submitted")
1245
1246 @submit.register(Job)
1247 def _(self, job, check_can_submit=True, jobs_per_check=100):
1248 """
1249 Submit method of Batch backend. Should take job object, create needed directories, create batch script,
1250 and send it off with the batch submission command, applying the correct options (default and user requested.)
1251
1252 Should set a Result object as an attribute of the job.
1253 """
1254 # Make sure the output directory of the job is created, commented out due to permissions issue
1255 # job.output_dir.mkdir(parents=True, exist_ok=True)
1256 # Make sure the working directory of the job is created
1257 job.working_dir.mkdir(parents=True, exist_ok=True)
1258 # Check if we have any valid input files
1259 job.check_input_data_files()
1260 # Add any required backend args that are missing (I'm a bit hesitant to actually merge with job.backend_args)
1261 # just in case you want to resubmit the same job with different backend settings later.
1262 # job_backend_args = {**self.backend_args, **job.backend_args}
1263
1264 # If there's no splitter then we just submit the Job with no SubJobs
1265 if not job.splitter:
1266 # Get all of the requested files for the input sandbox and copy them to the working directory
1267 job.copy_input_sandbox_files_to_working_dir()
1268 job.dump_input_data()
1269 # Make submission file if needed
1270 batch_submit_script_path = self.get_batch_submit_script_path(job)
1271 self._make_submit_file(job, batch_submit_script_path)
1272 # Get the bash file we will actually run
1273 script_path = self.get_submit_script_path(job)
1274 # Construct the batch submission script (with directives if that is supported)
1275 with open(script_path, mode="w") as batch_file:
1276 self._add_batch_directives(job, batch_file)
1277 self._add_wrapper_script_setup(job, batch_file)
1278 self._add_setup(job, batch_file)
1279 print(job.full_command, file=batch_file)
1280 self._add_wrapper_script_teardown(job, batch_file)
1281 os.chmod(script_path, 0o755)
1282 B2INFO(f"Submitting {job}")
1283 # Do the actual batch submission
1284 cmd = self._create_cmd(batch_submit_script_path)
1285 output = self._submit_to_batch(cmd)
1286 self._create_job_result(job, output)
1287 job.status = "submitted"
1288 B2INFO(f"{job} submitted")
1289 else:
1290 # Create subjobs according to the splitter's logic
1291 job.splitter.create_subjobs(job)
1292 # Submit the subjobs
1293 self.submit(list(job.subjobs.values()))
1294 # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1295 self._create_parent_job_result(job)
1296
1297 @submit.register(list)
1298 def _(self, jobs, check_can_submit=True, jobs_per_check=100):
1299 """
1300 Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.
1301 """
1302 B2INFO(f"Submitting a list of {len(jobs)} jobs to a Batch backend")
1303 # Technically this could be a list of Jobs or SubJobs. And if it is a list of Jobs then it might not
1304 # be necessary to check if we can submit right now. We could do it later during the submission of the
1305 # SubJob list. However in the interest of simpler code we just do the check here, and re-check again
1306 # if a SubJob list comes through this function. Slightly inefficient, but much simpler logic.
1307
1308 # The first thing to do is make sure that we are iterating through the jobs list in chunks that are
1309 # equal to or smaller than the global limit. Otherwise nothing will ever submit.
1310
1311 if jobs_per_check > self.global_job_limit:
1312 B2INFO(f"The requested jobs_per_check (={jobs_per_check}) is higher than the global job "
1313 f"limit for this backend (={self.global_job_limit}). Will instead use the "
1314 "value of the global job limit.")
1315 jobs_per_check = self.global_job_limit
1316
1317 # We group the jobs list into chunks of length jobs_per_check
1318 for jobs_to_submit in grouper(jobs_per_check, jobs):
1319 # Wait until we are allowed to submit
1320 while not self.can_submit(njobs=len(jobs_to_submit)):
1321 B2INFO("Too many jobs are currently in the batch system globally. Waiting until submission can continue...")
1322 time.sleep(self.sleep_between_submission_checks)
1323 else:
1324 # We loop here since we have already checked if the number of jobs is low enough, we don't want to hit this
1325 # function again unless one of the jobs has subjobs.
1326 B2INFO(f"Submitting the next {len(jobs_to_submit)} jobs...")
1327 for job in jobs_to_submit:
1328 self.submit(job, check_can_submit, jobs_per_check)
1329 B2INFO(f"All {len(jobs)} requested jobs submitted")
1330
1331 def get_batch_submit_script_path(self, job):
1332 """
1333 Construct the Path object of the script file that we will submit using the batch command.
1334 For most batch backends this is the same script as the bash script we submit.
1335 But for some they require a separate submission file that describes the job.
1336 To implement that you can implement this function in the Backend class.
1337 """
1338 return Path(job.working_dir, self.submit_script)
1339
1340 @classmethod
1341 @abstractmethod
1342 def _create_job_result(cls, job, batch_output):
1343 """
1344 """
1345
1346 @abstractmethod
1347 def _create_cmd(self, job):
1348 """
1349 """
1350
1351
1352class PBS(Batch):
1353 """
1354 Backend for submitting calibration processes to a qsub batch system.
1355 """
1356
1357 cmd_wkdir = "#PBS -d"
1358
1359 cmd_stdout = "#PBS -o"
1360
1361 cmd_stderr = "#PBS -e"
1362
1363 cmd_queue = "#PBS -q"
1364
1365 cmd_name = "#PBS -N"
1366
1367 submission_cmds = ["qsub"]
1368
1369 default_global_job_limit = 5000
1370
1371 default_backend_args = {"queue": "short"}
1372
1373 def __init__(self, *, backend_args=None):
1374 """
1375 """
1376 super().__init__(backend_args=backend_args)
1377
1378 def _add_batch_directives(self, job, batch_file):
1379 """
1380 Add PBS directives to submitted script.
1381 """
1382 job_backend_args = {**self.backend_args, **job.backend_args}
1383 batch_queue = job_backend_args["queue"]
1384 print("#!/bin/bash", file=batch_file)
1385 print("# --- Start PBS ---", file=batch_file)
1386 print(" ".join([PBS.cmd_queue, batch_queue]), file=batch_file)
1387 print(" ".join([PBS.cmd_name, job.name]), file=batch_file)
1388 print(" ".join([PBS.cmd_wkdir, job.working_dir.as_posix()]), file=batch_file)
1389 print(" ".join([PBS.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1390 print(" ".join([PBS.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1391 print("# --- End PBS ---", file=batch_file)
1392
1393 @classmethod
1394 def _create_job_result(cls, job, batch_output):
1395 """
1396 """
1397 job_id = batch_output.replace("\n", "")
1398 B2INFO(f"Job ID of {job} recorded as: {job_id}")
1399 job.result = cls.PBSResult(job, job_id)
1400
1401 def _create_cmd(self, script_path):
1402 """
1403 """
1404 submission_cmd = self.submission_cmds[:]
1405 submission_cmd.append(script_path.as_posix())
1406 return submission_cmd
1407
1408 @classmethod
1409 def _submit_to_batch(cls, cmd):
1410 """
1411 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1412 """
1413 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True)
1414 return sub_out
1415
1416 @classmethod
1417 def _create_parent_job_result(cls, parent):
1418 parent.result = cls.PBSResult(parent, None)
1419
1420 class PBSResult(Result):
1421 """
1422 Simple class to help monitor status of jobs submitted by `PBS` Backend.
1423
1424 You pass in a `Job` object (or `SubJob`) and job id from a qsub command.
1425 When you call the `ready` method it runs qstat to see whether or not the job has finished.
1426 """
1427
1428
1429 backend_code_to_status = {"R": "running",
1430 "C": "completed",
1431 "FINISHED": "completed",
1432 "E": "failed",
1433 "H": "submitted",
1434 "Q": "submitted",
1435 "T": "submitted",
1436 "W": "submitted"
1438 }
1439
1440 def __init__(self, job, job_id):
1441 """
1442 Pass in the job object and the job id to allow the result to do monitoring and perform
1443 post processing of the job.
1444 """
1445 super().__init__(job)
1446
1447 self.job_id = job_id
1448
1449 def update_status(self):
1450 """
1451 Update the job's (or subjobs') status by calling qstat.
1452 """
1453 B2DEBUG(29, f"Calling {self.job}.result.update_status()")
1454 # Get all jobs info and reuse it for each status update to minimise time spent on this update.
1455 qstat_output = PBS.qstat()
1456 if self.job.subjobs:
1457 for subjob in self.job.subjobs.values():
1458 subjob.result._update_result_status(qstat_output)
1459 else:
1460 self._update_result_status(qstat_output)
1461
1462 def _update_result_status(self, qstat_output):
1463 """
1464 Parameters:
1465 qstat_output (dict): The JSON output of a previous call to qstat which we can reuse to find the
1466 status of this job. Obviously you should only be passing a JSON dict that contains the 'Job_Id' and
1467 'job_state' information, otherwise it is useless.
1468
1469 """
1470 try:
1471 backend_status = self._get_status_from_output(qstat_output)
1472 except KeyError:
1473 # If this happens then maybe the job id wasn't in the qstat_output argument because it finished.
1474 # Instead of failing immediately we try looking for the exit code file and then fail if it still isn't there.
1475 B2DEBUG(29, f"Checking the exit code from file for {self.job}")
1476 try:
1477 exit_code = self.get_exit_code_from_file()
1478 except FileNotFoundError:
1479 waiting_time = datetime.now() - self.exit_code_file_initial_time
1480 if waiting_time > self.time_to_wait_for_exit_code_file:
1481 B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
1482 exit_code = 1
1483 else:
1484 B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
1485 return
1486 if exit_code:
1487 backend_status = "E"
1488 else:
1489 backend_status = "C"
1490
1491 try:
1492 new_job_status = self.backend_code_to_status[backend_status]
1493 except KeyError as err:
1494 raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
1495
1496 if new_job_status != self.job.status:
1497 self.job.status = new_job_status
1498
1499 def _get_status_from_output(self, output):
1500 """
1501 Get status from output
1502 """
1503 for job_info in output["JOBS"]:
1504 if job_info["Job_Id"] == self.job_id:
1505 return job_info["job_state"]
1506 else:
1507 raise KeyError
1508
1509 def can_submit(self, njobs=1):
1510 """
1511 Checks the global number of jobs in PBS right now (submitted or running) for this user.
1512 Returns True if the number is lower than the limit, False if it is higher.
1513
1514 Parameters:
1515 njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1516 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1517 assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1518 So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
1519 and check again before submitting more.
1520 """
1521 B2DEBUG(29, "Calling PBS().can_submit()")
1522 job_info = self.qstat(username=os.environ["USER"])
1523 total_jobs = job_info["NJOBS"]
1524 B2INFO(f"Total jobs active in the PBS system is currently {total_jobs}")
1525 if (total_jobs + njobs) > self.global_job_limit:
1526 B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1527 return False
1528 else:
1529 B2INFO("There is enough space to submit more jobs.")
1530 return True
1531
1532 @classmethod
1533 def qstat(cls, username="", job_ids=None):
1534 """
1535 Simplistic interface to the ``qstat`` command. Lets you request information about all jobs or ones matching the filter
1536 ['job_id'] or for the username. The result is a JSON dictionary containing some of the useful job attributes returned
1537 by qstat.
1538
1539 PBS is kind of annoying as depending on the configuration it can forget about jobs immediately. So the status of a
1540 finished job is VERY hard to get. There are other commands that are sometimes included that may do a better job.
1541 This one should work for Melbourne's cloud computing centre.
1542
1543 Keyword Args:
1544 username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
1545 will be in the output dictionary.
1546 job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
1547 the output of this function will be only information about these jobs. If this argument is not given, then all jobs
1548 matching the other filters will be returned.
1549
1550 Returns:
1551 dict: JSON dictionary of the form below (to save you parsing the XML that qstat returns):
1552
1553 .. code-block:: python
1554
1555 {
1556 "NJOBS": int
1557 "JOBS":[
1558 {
1559 <key: value>, ...
1560 }, ...
1561 ]
1562 }
1563 """
1564 B2DEBUG(29, f"Calling PBS.qstat(username='{username}', job_id={job_ids})")
1565 if not job_ids:
1566 job_ids = []
1567 job_ids = set(job_ids)
1568 cmd_list = ["qstat", "-x"]
1569 # We get an XML serialisable summary from qstat. Requires the shell argument.
1570 cmd = " ".join(cmd_list)
1571 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1572 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1573 jobs_dict = {"NJOBS": 0, "JOBS": []}
1574 jobs_xml = ET.fromstring(output)
1575
1576 # For a specific job_id we can be a bit more efficient in XML parsing
1577 if len(job_ids) == 1:
1578 job_elem = jobs_xml.find(f"./Job[Job_Id='{list(job_ids)[0]}']")
1579 if job_elem:
1580 jobs_dict["JOBS"].append(cls.create_job_record_from_element(job_elem))
1581 jobs_dict["NJOBS"] = 1
1582 return jobs_dict
1583
1584 # Since the username given is not exactly the same as the one that PBS stores (<username>@host)
1585 # we have to simply loop through rather than using XPATH.
1586 for job in jobs_xml.iterfind("Job"):
1587 job_owner = job.find("Job_Owner").text.split("@")[0]
1588 if username and username != job_owner:
1589 continue
1590 job_id = job.find("Job_Id").text
1591 if job_ids and job_id not in job_ids:
1592 continue
1593 jobs_dict["JOBS"].append(cls.create_job_record_from_element(job))
1594 jobs_dict["NJOBS"] += 1
1595 # Remove it so that we don't keep checking for it
1596 if job_id in job_ids:
1597 job_ids.remove(job_id)
1598 return jobs_dict
1599
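 # Illustrative usage sketch (the job id string is hypothetical):
 #
 #   my_jobs = PBS.qstat(username=os.environ["USER"])
 #   one_job = PBS.qstat(job_ids=["12345.pbs-server"])
 #   if one_job["NJOBS"]:
 #       print(one_job["JOBS"][0]["job_state"])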
1600 @staticmethod
1601 def create_job_record_from_element(job_elem):
1602 """
1603 Creates a Job dictionary with various job information from the XML element returned by qstat.
1604
1605 Parameters:
1606 job_elem (xml.etree.ElementTree.Element): The XML Element of the Job
1607
1608 Returns:
1609 dict: JSON serialisable dictionary of the Job information we are interested in.
1610 """
1611 job_dict = {}
1612 job_dict["Job_Id"] = job_elem.find("Job_Id").text
1613 job_dict["Job_Name"] = job_elem.find("Job_Name").text
1614 job_dict["Job_Owner"] = job_elem.find("Job_Owner").text
1615 job_dict["job_state"] = job_elem.find("job_state").text
1616 job_dict["queue"] = job_elem.find("queue").text
1617 return job_dict
1618
1619
1620class LSF(Batch):
1621 """
1622    Backend for submitting calibration processes to an LSF batch system.
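
    A minimal sketch of constructing the backend with a non-default queue (the queue name is illustrative
    and we assume this module is importable as ``caf.backends``):

    .. code-block:: python

        from caf.backends import LSF

        backend = LSF(backend_args={"queue": "l"})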
1623 """
1624
1625 cmd_wkdir = "#BSUB -cwd"
1626
1627 cmd_stdout = "#BSUB -o"
1628
1629 cmd_stderr = "#BSUB -e"
1630
1631 cmd_queue = "#BSUB -q"
1632
1633 cmd_name = "#BSUB -J"
1634
1635 submission_cmds = ["bsub", "-env", "\"none\"", "<"]
1636
1637 default_global_job_limit = 15000
1638
1639 default_backend_args = {"queue": "s"}
1640
1641 def __init__(self, *, backend_args=None):
1642 """
1643 """
1644 super().__init__(backend_args=backend_args)
1645
1646 def _add_batch_directives(self, job, batch_file):
1647 """
1648 Adds LSF BSUB directives for the job to a script.
1649 """
1650 job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1651 batch_queue = job_backend_args["queue"]
1652 print("#!/bin/bash", file=batch_file)
1653 print("# --- Start LSF ---", file=batch_file)
1654 print(" ".join([LSF.cmd_queue, batch_queue]), file=batch_file)
1655 print(" ".join([LSF.cmd_name, job.name]), file=batch_file)
1656 print(" ".join([LSF.cmd_wkdir, str(job.working_dir)]), file=batch_file)
1657 print(" ".join([LSF.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1658 print(" ".join([LSF.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1659 print("# --- End LSF ---", file=batch_file)
1660
1661 def _create_cmd(self, script_path):
1662 """
1663 """
1664 submission_cmd = self.submission_cmds[:]
1665 submission_cmd.append(script_path.as_posix())
1666 submission_cmd = " ".join(submission_cmd)
1667 return [submission_cmd]
1668
1669 @classmethod
1670 def _submit_to_batch(cls, cmd):
1671 """
1672 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1673 """
1674 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1675 return sub_out
1676
1677    class LSFResult(Result):
1678        """
1679 Simple class to help monitor status of jobs submitted by LSF Backend.
1680
1681 You pass in a `Job` object and job id from a bsub command.
1682 When you call the `ready` method it runs bjobs to see whether or not the job has finished.
1683 """
1684
1685
1686 backend_code_to_status = {"RUN": "running",
1687 "DONE": "completed",
1688 "FINISHED": "completed",
1689 "EXIT": "failed",
1690 "PEND": "submitted"
1691 }
1692
1693 def __init__(self, job, job_id):
1694 """
1695 Pass in the job object and the job id to allow the result to do monitoring and perform
1696 post processing of the job.
1697 """
1698 super().__init__(job)
1699
1700 self.job_id = job_id
1701
1702 def update_status(self):
1703 """
1704 Update the job's (or subjobs') status by calling bjobs.
1705 """
1706 B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
1707        # Get all jobs info and reuse it for each status update to minimise time spent on this update.
1708 bjobs_output = LSF.bjobs(output_fields=["stat", "id"])
1709 if self.job.subjobs:
1710 for subjob in self.job.subjobs.values():
1711 subjob.result._update_result_status(bjobs_output)
1712 else:
1713 self._update_result_status(bjobs_output)
1714
1715 def _update_result_status(self, bjobs_output):
1716 """
1717 Parameters:
1718 bjobs_output (dict): The JSON output of a previous call to bjobs which we can reuse to find the
1719 status of this job. Obviously you should only be passing a JSON dict that contains the 'stat' and
1720 'id' information, otherwise it is useless.
1721
1722 """
1723 try:
1724 backend_status = self._get_status_from_output(bjobs_output)
1725 except KeyError:
1726 # If this happens then maybe the job id wasn't in the bjobs_output argument because it finished.
1727 # Instead of failing immediately we try re-running bjobs here explicitly and then fail if it still isn't there.
1728 bjobs_output = LSF.bjobs(output_fields=["stat", "id"], job_id=str(self.job_id))
1729 try:
1730 backend_status = self._get_status_from_output(bjobs_output)
1731 except KeyError:
1732 # If this happened, maybe we're looking at an old finished job. We could fall back to bhist, but it's
1733 # slow and terrible. Instead let's try looking for the exit code file.
1734 try:
1735 exit_code = self.get_exit_code_from_file()
1736 except FileNotFoundError:
1737 waiting_time = datetime.now() - self.exit_code_file_initial_time
1738                    if waiting_time > self.time_to_wait_for_exit_code_file:
1739 B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
1740 exit_code = 1
1741 else:
1742 B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
1743 return
1744 if exit_code:
1745 backend_status = "EXIT"
1746 else:
1747 backend_status = "FINISHED"
1748 try:
1749 new_job_status = self.backend_code_to_status[backend_status]
1750 except KeyError as err:
1751 raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
1752
1753 if new_job_status != self.job.status:
1754 self.job.status = new_job_status
1755
1756 def _get_status_from_output(self, output):
1757 """
1758 Get status from output
1759 """
1760 if output["JOBS"] and "ERROR" in output["JOBS"][0]:
1761 if output["JOBS"][0]["ERROR"] == f"Job <{self.job_id}> is not found":
1762 raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1763 else:
1764 raise BackendError(f"Unidentified Error during status check for {self.job}: {output}")
1765 else:
1766 for job_info in output["JOBS"]:
1767 if job_info["JOBID"] == self.job_id:
1768 return job_info["STAT"]
1769 else:
1770 raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1771
1772    @classmethod
1773    def _create_parent_job_result(cls, parent):
1774        parent.result = cls.LSFResult(parent, None)
1775
1776 @classmethod
1777 def _create_job_result(cls, job, batch_output):
1778 """
1779 """
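        # bsub typically reports something like "Job <12345> is submitted to queue <s>." (the exact wording can
        # vary between LSF versions), so we pull the numeric job id out of the angle brackets.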
1780 m = re.search(r"Job <(\d+)>", str(batch_output))
1781 if m:
1782 job_id = m.group(1)
1783 else:
1784 raise BackendError(f"Failed to get the batch job ID of {job}. LSF output was:\n{batch_output}")
1785
1786 B2INFO(f"Job ID of {job} recorded as: {job_id}")
1787 job.result = cls.LSFResult(job, job_id)
1788
1789 def can_submit(self, njobs=1):
1790 """
1791 Checks the global number of jobs in LSF right now (submitted or running) for this user.
1792        Returns True if the number is lower than the limit, False if it is higher.
1793
1794 Parameters:
1795 njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1796 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1797 assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1798 So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
1799 and check again before submitting more.
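
        A minimal sketch of the intended usage (the batch size of 200 is illustrative):

        .. code-block:: python

            import time

            backend = LSF()
            while not backend.can_submit(njobs=200):
                time.sleep(backend.sleep_between_submission_checks)
            # ... now submit the next batch of jobs ...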
1800 """
1801 B2DEBUG(29, "Calling LSF().can_submit()")
1802 job_info = self.bjobs(output_fields=["stat"])
1803 total_jobs = job_info["NJOBS"]
1804 B2INFO(f"Total jobs active in the LSF system is currently {total_jobs}")
1805 if (total_jobs + njobs) > self.global_job_limit:
1806 B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1807 return False
1808 else:
1809 B2INFO("There is enough space to submit more jobs.")
1810 return True
1811
1812 @classmethod
1813 def bjobs(cls, output_fields=None, job_id="", username="", queue=""):
1814 """
1815        Simplistic interface to the `bjobs` command. Lets you request information about all jobs matching the filters
1816 'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by output of the ``-json`` bjobs option.
1817
1818 Parameters:
1819 output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
1820            job_id (str): String representation of the Job ID given by bsub during submission. If this argument is given then
1821 the output of this function will be only information about this job. If this argument is not given, then all jobs
1822 matching the other filters will be returned.
1823 username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
1824 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
1825 receive job information from all known user jobs matching the other filters.
1826 queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.
1827
1828 Returns:
1829 dict: JSON dictionary of the form:
1830
1831 .. code-block:: python
1832
1833 {
1834 "NJOBS":<njobs returned by command>,
1835 "JOBS":[
1836 {
1837 <output field: value>, ...
1838 }, ...
1839 ]
1840 }
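
        A minimal usage sketch (the job id is illustrative):

        .. code-block:: python

            # All of the current user's jobs, with their status.
            all_info = LSF.bjobs(output_fields=["stat", "id"])

            # A single job, by the id that bsub reported at submission.
            job_info = LSF.bjobs(output_fields=["stat", "id"], job_id="12345")
            if job_info["JOBS"]:
                print(job_info["JOBS"][0]["STAT"])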
1841 """
1842 B2DEBUG(29, f"Calling LSF.bjobs(output_fields={output_fields}, job_id={job_id}, username={username}, queue={queue})")
1843 # We must always return at least one output field when using JSON and -o options. So we choose the job id
1844 if not output_fields:
1845 output_fields = ["id"]
1846 # Output fields should be space separated but in a string.
1847 field_list_cmd = "\""
1848 field_list_cmd += " ".join(output_fields)
1849 field_list_cmd += "\""
1850 cmd_list = ["bjobs", "-o", field_list_cmd]
1851 # If the queue name is set then we add to the command options
1852 if queue:
1853 cmd_list.extend(["-q", queue])
1854 # If the username is set then we add to the command options
1855 if username:
1856 cmd_list.extend(["-u", username])
1857 # Can now add the json option before the final positional argument (if used)
1858 cmd_list.append("-json")
1859 # If the job id is set then we add to the end of the command
1860 if job_id:
1861 cmd_list.append(job_id)
1862 # We get a JSON serialisable summary from bjobs. Requires the shell argument.
1863 cmd = " ".join(cmd_list)
1864 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1865 output = decode_json_string(subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True))
1866 output["NJOBS"] = output["JOBS"]
1867 output["JOBS"] = output["RECORDS"]
1868 del output["RECORDS"]
1869 del output["COMMAND"]
1870 return output
1871
1872 @classmethod
1873 def bqueues(cls, output_fields=None, queues=None):
1874 """
1875        Simplistic interface to the `bqueues` command. Lets you request information about all queues matching the filters.
1876 The result is the JSON dictionary returned by output of the ``-json`` bqueues option.
1877
1878 Parameters:
1879 output_fields (list[str]): A list of bqueues -o fields that you would like information about
1880                e.g. the default is ['queue_name', 'status', 'max', 'njobs', 'pend', 'run']
1881 queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
1882 By default you will receive information about all queues.
1883
1884 Returns:
1885 dict: JSON dictionary of the form:
1886
1887 .. code-block:: python
1888
1889 {
1890 "COMMAND":"bqueues",
1891 "QUEUES":46,
1892 "RECORDS":[
1893 {
1894 "QUEUE_NAME":"b2_beast",
1895 "STATUS":"Open:Active",
1896 "MAX":"200",
1897 "NJOBS":"0",
1898 "PEND":"0",
1899 "RUN":"0"
1900                    }, ...
                ]
1901            }
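
        A minimal usage sketch (the queue name is illustrative):

        .. code-block:: python

            queue_info = LSF.bqueues(queues=["s"])
            for record in queue_info["RECORDS"]:
                print(record["QUEUE_NAME"], record["NJOBS"])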
1902 """
1903 B2DEBUG(29, f"Calling LSF.bqueues(output_fields={output_fields}, queues={queues})")
1904 # We must always return at least one output field when using JSON and -o options. So we choose the job id
1905 if not output_fields:
1906 output_fields = ["queue_name", "status", "max", "njobs", "pend", "run"]
1907 # Output fields should be space separated but in a string.
1908 field_list_cmd = "\""
1909 field_list_cmd += " ".join(output_fields)
1910 field_list_cmd += "\""
1911 cmd_list = ["bqueues", "-o", field_list_cmd]
1912 # Can now add the json option before the final positional argument (if used)
1913 cmd_list.append("-json")
1914 # If the queue name is set then we add to the end of the command
1915 if queues:
1916 cmd_list.extend(queues)
1917        # We get a JSON serialisable summary from bqueues. Requires the shell argument.
1918 cmd = " ".join(cmd_list)
1919 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1920 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1921 return decode_json_string(output)
1922
1923
1924class HTCondor(Batch):
1925    """
1926    Backend for submitting calibration processes to an HTCondor batch system.
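
    A minimal sketch of construction with modified submit settings (the values and the extra line are illustrative):

    .. code-block:: python

        from caf.backends import HTCondor

        backend = HTCondor(backend_args={"request_memory": "8 GB",
                                         "extra_lines": ["request_cpus = 2"]})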
1927 """
1928
1929 batch_submit_script = "submit.sub"
1930
1931 submission_cmds = ["condor_submit", "-terse"]
1932
1933 default_global_job_limit = 10000
1934
1935 default_backend_args = {
1936 "universe": "vanilla",
1937 "getenv": "false",
1938 "request_memory": "4 GB", # We set the default requested memory to 4 GB to maintain parity with KEKCC
1939 "path_prefix": "", # Path prefix for file path
1940 "extra_lines": [] # These should be other HTCondor submit script lines like 'request_cpus = 2'
1941 }
1942
1943 default_class_ads = ["GlobalJobId", "JobStatus", "Owner"]
1944
1945 def _make_submit_file(self, job, submit_file_path):
1946 """
1947 Fill HTCondor submission file.
1948 """
1949 # Find all files/directories in the working directory to copy on the worker node
1950
1951 files_to_transfer = [i.as_posix() for i in job.working_dir.iterdir()]
1952
1953 job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1954
1955 with open(submit_file_path, "w") as submit_file:
1956 print(f'executable = {self.get_submit_script_path(job)}', file=submit_file)
1957 print(f'log = {Path(job.output_dir, "htcondor.log").as_posix()}', file=submit_file)
1958 print(f'output = {Path(job.working_dir, _STDOUT_FILE).as_posix()}', file=submit_file)
1959 print(f'error = {Path(job.working_dir, _STDERR_FILE).as_posix()}', file=submit_file)
1960 print('transfer_input_files = ', ','.join(files_to_transfer), file=submit_file)
1961 print(f'universe = {job_backend_args["universe"]}', file=submit_file)
1962 print(f'getenv = {job_backend_args["getenv"]}', file=submit_file)
1963 print(f'request_memory = {job_backend_args["request_memory"]}', file=submit_file)
1964 print('should_transfer_files = Yes', file=submit_file)
1965 print('when_to_transfer_output = ON_EXIT', file=submit_file)
1966 # Any other lines in the backend args that we don't deal with explicitly but maybe someone wants to insert something
1967 for line in job_backend_args["extra_lines"]:
1968 print(line, file=submit_file)
1969 print('queue', file=submit_file)
1970
1971 def _add_batch_directives(self, job, batch_file):
1972 """
1973        For HTCondor the directives are already in the separate submit file, so here we only write the shebang line.
1974 """
1975 print('#!/bin/bash', file=batch_file)
1976
1977 def _create_cmd(self, script_path):
1978 """
1979 """
1980 submission_cmd = self.submission_cmds[:]
1981 submission_cmd.append(script_path.as_posix())
1982 return submission_cmd
1983
1984    def get_batch_submit_script_path(self, job):
1985        """
1986 Construct the Path object of the .sub file that we will use to describe the job.
1987 """
1988 return Path(job.working_dir, self.batch_submit_script)
1989
1990 @classmethod
1991 def _submit_to_batch(cls, cmd):
1992 """
1993 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1994 """
1995 job_dir = Path(cmd[-1]).parent.as_posix()
1996 sub_out = ""
1997 attempt = 0
1998 sleep_time = 30
1999
2000 while attempt < 3:
2001 try:
2002 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, cwd=job_dir)
2003 break
2004 except subprocess.CalledProcessError as e:
2005 attempt += 1
2006 if attempt == 3:
2007 B2ERROR(f"Error during condor_submit: {str(e)} occurred more than 3 times.")
2008 raise e
2009 else:
2010 B2ERROR(f"Error during condor_submit: {str(e)}, sleeping for {sleep_time} seconds.")
2011                        time.sleep(sleep_time)
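        # With the -terse option condor_submit prints the first and last job ids of the submitted cluster,
        # e.g. "1234.0 - 1234.0" (values illustrative); we keep only the first id for later monitoring.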
2012 return re.search(r"(\d+\.\d+) - \d+\.\d+", sub_out).groups()[0]
2013
2014    class HTCondorResult(Result):
2015        """
2016 Simple class to help monitor status of jobs submitted by HTCondor Backend.
2017
2018 You pass in a `Job` object and job id from a condor_submit command.
2019 When you call the `ready` method it runs condor_q and, if needed, ``condor_history``
2020 to see whether or not the job has finished.
2021 """
2022
2023
2024 backend_code_to_status = {0: "submitted",
2025 1: "submitted",
2026 2: "running",
2027 3: "failed",
2028 4: "completed",
2029 5: "submitted",
2030 6: "failed"
2031 }
2032
2033 def __init__(self, job, job_id):
2034 """
2035 Pass in the job object and the job id to allow the result to do monitoring and perform
2036 post processing of the job.
2037 """
2038 super().__init__(job)
2039
2040 self.job_id = job_id
2041
2042 def update_status(self):
2043 """
2044 Update the job's (or subjobs') status by calling condor_q.
2045 """
2046 B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
2047        # Get all jobs info and reuse it for each status update to minimise time spent on this update.
2048 condor_q_output = HTCondor.condor_q()
2049 if self.job.subjobs:
2050 for subjob in self.job.subjobs.values():
2051 subjob.result._update_result_status(condor_q_output)
2052 else:
2053 self._update_result_status(condor_q_output)
2054
2055 def _update_result_status(self, condor_q_output):
2056 """
2057        To be slightly more efficient we reuse the output of a previous condor_q call that is passed in.
2058        If this job appears in it we update the job's status from that. If not, we are forced to call condor_q
2059        ourselves and, if needed, ``condor_history``, etc.
2060
2061 Parameters:
2062 condor_q_output (dict): The JSON output of a previous call to `HTCondor.condor_q` which we can reuse to find the
2063 status of this job if it was active when that command ran.
2064 """
2065 B2DEBUG(29, f"Calling {self.job}.result._update_result_status()")
2066 jobs_info = []
2067 for job_record in condor_q_output["JOBS"]:
2068 job_id = job_record["GlobalJobId"].split("#")[1]
2069 if job_id == self.job_id:
2070 B2DEBUG(29, f"Found {self.job_id} in condor_q_output.")
2071 jobs_info.append(job_record)
2072
2073 # Let's look for the exit code file where we expect it
2074 if not jobs_info:
2075 try:
2076 exit_code = self.get_exit_code_from_file()
2077 except FileNotFoundError:
2078 waiting_time = datetime.now() - self.exit_code_file_initial_time
2079                    if waiting_time > self.time_to_wait_for_exit_code_file:
2080 B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
2081 exit_code = 1
2082 else:
2083 B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
2084 return
2085 if exit_code:
2086 jobs_info = [{"JobStatus": 6, "HoldReason": None}] # Set to failed
2087 else:
2088 jobs_info = [{"JobStatus": 4, "HoldReason": None}] # Set to completed
2089
2090 # If this job wasn't in the passed in condor_q output, let's try our own with the specific job_id
2091 if not jobs_info:
2092 jobs_info = HTCondor.condor_q(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
2093
2094 # If no job information is returned then the job already left the queue
2095 # check in the history to see if it succeeded or failed
2096 if not jobs_info:
2097 try:
2098 jobs_info = HTCondor.condor_history(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
2099 except KeyError:
2100 hold_reason = "No Reason Known"
2101
2102 # Still no record of it after waiting for the exit code file?
2103 if not jobs_info:
2104 jobs_info = [{"JobStatus": 6, "HoldReason": None}] # Set to failed
2105
2106 job_info = jobs_info[0]
2107 backend_status = job_info["JobStatus"]
2108 # if job is held (backend_status = 5) then report why then keep waiting
2109 if backend_status == 5:
2110 hold_reason = job_info.get("HoldReason", None)
2111 B2WARNING(f"{self.job} on hold because of {hold_reason}. Keep waiting.")
2112 backend_status = 2
2113 try:
2114 new_job_status = self.backend_code_to_status[backend_status]
2115 except KeyError as err:
2116 raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
2117 if new_job_status != self.job.status:
2118 self.job.status = new_job_status
2119
2120 @classmethod
2121 def _create_job_result(cls, job, job_id):
2122 """
2123 """
2124 B2INFO(f"Job ID of {job} recorded as: {job_id}")
2125 job.result = cls.HTCondorResult(job, job_id)
2126
2127    @classmethod
2128    def _create_parent_job_result(cls, parent):
2129        parent.result = cls.HTCondorResult(parent, None)
2130
2131 def can_submit(self, njobs=1):
2132 """
2133 Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
2134        Returns True if the number is lower than the limit, False if it is higher.
2135
2136 Parameters:
2137 njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
2138 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
2139 assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
2140 So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
2141 and check again before submitting more.
2142 """
2143 B2DEBUG(29, "Calling HTCondor().can_submit()")
2144 jobs_info = self.condor_q()
2145 total_jobs = jobs_info["NJOBS"]
2146 B2INFO(f"Total jobs active in the HTCondor system is currently {total_jobs}")
2147 if (total_jobs + njobs) > self.global_job_limit:
2148 B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
2149 return False
2150 else:
2151 B2INFO("There is enough space to submit more jobs.")
2152 return True
2153
2154 @classmethod
2155 def condor_q(cls, class_ads=None, job_id="", username=""):
2156 """
2157        Simplistic interface to the `condor_q` command. Lets you request information about all jobs matching the filters
2158 'job_id' and 'username'. Note that setting job_id negates username so it is ignored.
2159 The result is the JSON dictionary returned by output of the ``-json`` condor_q option.
2160
2161 Parameters:
2162 class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
2163                By default we give {cls.default_class_ads}; increasing the number of class_ads increases the time taken
2164 by the condor_q call.
2165 job_id (str): String representation of the Job ID given by condor_submit during submission.
2166 If this argument is given then the output of this function will be only information about this job.
2167 If this argument is not given, then all jobs matching the other filters will be returned.
2168 username (str): By default we return information about only the current user's jobs. By giving
2169 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2170 receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
2171 so it isn't recommended.
2172
2173 Returns:
2174 dict: JSON dictionary of the form:
2175
2176 .. code-block:: python
2177
2178 {
2179 "NJOBS":<number of records returned by command>,
2180 "JOBS":[
2181 {
2182 <ClassAd: value>, ...
2183 }, ...
2184 ]
2185 }
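
        A minimal usage sketch (the job id is illustrative):

        .. code-block:: python

            # The current user's jobs with the default ClassAds.
            jobs_info = HTCondor.condor_q()

            # One specific job, asking only for the ClassAds we care about.
            job_info = HTCondor.condor_q(job_id="1234.0", class_ads=["JobStatus", "HoldReason"])
            if job_info["JOBS"]:
                print(job_info["JOBS"][0]["JobStatus"])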
2186 """
2187 B2DEBUG(29, f"Calling HTCondor.condor_q(class_ads={class_ads}, job_id={job_id}, username={username})")
2188 if not class_ads:
2189 class_ads = cls.default_class_ads
2190 # Output fields should be comma separated.
2191 field_list_cmd = ",".join(class_ads)
2192 cmd_list = ["condor_q", "-json", "-attributes", field_list_cmd]
2193 # If job_id is set then we ignore all other filters
2194 if job_id:
2195 cmd_list.append(job_id)
2196 else:
2197 if not username:
2198 username = os.environ["USER"]
2199 # If the username is set to all it is a special case
2200 if username == "all":
2201 cmd_list.append("-allusers")
2202 else:
2203 cmd_list.append(username)
2204 # We get a JSON serialisable summary from condor_q. But we will alter it slightly to be more similar to other backends
2205 cmd = " ".join(cmd_list)
2206 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2207 # condor_q occasionally fails
2208 try:
2209 records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2210 except BaseException:
2211 records = None
2212
2213 if records:
2214 records = decode_json_string(records)
2215 else:
2216 records = []
2217 jobs_info = {"JOBS": records}
2218 jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2219 return jobs_info
2220
2221 @classmethod
2222 def condor_history(cls, class_ads=None, job_id="", username=""):
2223 """
2224        Simplistic interface to the ``condor_history`` command. Lets you request information about all jobs matching the filters
2225 ``job_id`` and ``username``. Note that setting job_id negates username so it is ignored.
2226 The result is a JSON dictionary filled by output of the ``-json`` ``condor_history`` option.
2227
2228 Parameters:
2229 class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
2230                By default we give {cls.default_class_ads}; increasing the number of class_ads increases the time taken
2231                by the condor_history call.
2232 job_id (str): String representation of the Job ID given by condor_submit during submission.
2233 If this argument is given then the output of this function will be only information about this job.
2234 If this argument is not given, then all jobs matching the other filters will be returned.
2235 username (str): By default we return information about only the current user's jobs. By giving
2236 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2237 receive job information from all known user jobs matching the other filters. This is limited to 10000 records
2238 and isn't recommended.
2239
2240 Returns:
2241 dict: JSON dictionary of the form:
2242
2243 .. code-block:: python
2244
2245 {
2246 "NJOBS":<number of records returned by command>,
2247 "JOBS":[
2248 {
2249 <ClassAd: value>, ...
2250 }, ...
2251 ]
2252 }
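
        A minimal usage sketch for a job that has already left the queue (the job id is illustrative):

        .. code-block:: python

            history = HTCondor.condor_history(job_id="1234.0", class_ads=["JobStatus"])
            finished_statuses = [rec["JobStatus"] for rec in history["JOBS"]]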
2253 """
2254 B2DEBUG(29, f"Calling HTCondor.condor_history(class_ads={class_ads}, job_id={job_id}, username={username})")
2255 if not class_ads:
2256 class_ads = cls.default_class_ads
2257 # Output fields should be comma separated.
2258 field_list_cmd = ",".join(class_ads)
2259 cmd_list = ["condor_history", "-json", "-attributes", field_list_cmd]
2260 # If job_id is set then we ignore all other filters
2261 if job_id:
2262 cmd_list.append(job_id)
2263 else:
2264 if not username:
2265 username = os.environ["USER"]
2266 # If the username is set to all it is a special case
2267 if username != "all":
2268 cmd_list.append(username)
2269        # We get a JSON serialisable summary from condor_history. But we will alter it slightly to be more similar to other backends
2270 cmd = " ".join(cmd_list)
2271 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2272 try:
2273 records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2274 except BaseException:
2275 records = None
2276
2277 if records:
2278 records = decode_json_string(records)
2279 else:
2280 records = []
2281
2282 jobs_info = {"JOBS": records}
2283 jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2284 return jobs_info
2285
2286
2287class DIRAC(Backend):
2288    """
2289 Backend for submitting calibration processes to the grid.
2290 """
2291
2292
2293class BackendError(Exception):
2294 """
2295 Base exception class for Backend classes.
2296 """
2297
2298
2299class JobError(Exception):
2300 """
2301 Base exception class for Job objects.
2302 """
2303
2304
2305class SplitterError(Exception):
2306 """
2307 Base exception class for SubjobSplitter objects.
2308 """