Belle II Software development
backends.py
1#!/usr/bin/env python3
2
3
10
11from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
12import re
13import os
14from abc import ABC, abstractmethod
15import json
16import xml.etree.ElementTree as ET
17from math import ceil
18from pathlib import Path
19from collections import deque
20from itertools import count, takewhile
21import shutil
22import time
23from datetime import datetime, timedelta
24import subprocess
25import multiprocessing as mp
26
27from caf.utils import method_dispatch
28from caf.utils import decode_json_string
29from caf.utils import grouper
30from caf.utils import parse_file_uri
31
32
33__all__ = ["Job", "SubJob", "Backend", "Local", "Batch", "LSF", "PBS", "HTCondor", "get_input_data"]
34
35
36_input_data_file_path = Path("__BACKEND_INPUT_FILES__.json")
37
38_STDOUT_FILE = "stdout"
39
40_STDERR_FILE = "stderr"
41
42_backend_job_envvars = (
43 "TMPDIR",
44 "BELLE2_CONFIG_DIR",
45 "VO_BELLE2_SW_DIR",
46 "BELLE2_EXTERNALS_TOPDIR",
47 "BELLE2_CONDB_METADATA",
48 "BELLE2_CONDB_PROXY",
49)
50
51
def get_input_data():
    """
    Read the default backend input data JSON file and return its contents.

    Returns:
        list: The list of string file paths for use by the job process.
    """
    with open(_input_data_file_path) as input_data_json:
        return json.load(input_data_json)
60
61
def monitor_jobs(args, jobs):
    """
    Poll the given jobs until every one of them is finished, then report a summary.

    Parameters:
        args: Parsed command line arguments; only ``args.heartbeat`` (seconds between polls) is used here.
        jobs (list): The `Job` objects to watch. The input list itself is not modified.
    """
    remaining = list(jobs)
    failed_count = 0
    while remaining:
        B2INFO("Updating statuses of unfinished jobs...")
        for job in remaining:
            job.update_status()
        B2INFO("Checking if jobs are ready...")
        still_waiting = []
        for job in remaining:
            if not job.ready():
                still_waiting.append(job)
                continue
            if job.status == "failed":
                B2ERROR(f"{job} is failed")
                failed_count += 1
            else:
                B2INFO(f"{job} is finished")
        remaining = still_waiting
        if remaining:
            B2INFO(f"Not all jobs done yet, waiting {args.heartbeat} seconds before re-checking...")
            time.sleep(args.heartbeat)
    if failed_count > 0:
        B2ERROR(f"{failed_count} jobs failed")
    else:
        B2INFO('All jobs finished successfully')
85
86
88 """
89 Simple little class to hold a generator (uninitialised) and the necessary args/kwargs to
90 initialise it. This lets us reuse a generator by setting it up again fresh. This is not
91 optimal for expensive calculations, but it is nice for making large sequences of
92 Job input arguments on the fly.
93 """
94
95 def __init__(self, generator_function, *args, **kwargs):
96 """
97 Parameters:
98 generator_function (py:function): A function (callable) that contains a ``yield`` statement. This generator
99 should *not* be initialised i.e. you haven't called it with ``generator_function(*args, **kwargs)``
100 yet. That will happen when accessing the `ArgumentsGenerator.generator` property.
101 args (tuple): The positional arguments you want to send into the initialisation of the generator.
102 kwargs (dict): The keyword arguments you want to send into the initialisation of the generator.
103 """
104
105 self.generator_function = generator_function
106
107 self.args = args
108
109 self.kwargs = kwargs
110
111 @property
112 def generator(self):
113 """
114 Returns:
115 generator: The initialised generator (using the args and kwargs for initialisation). It should be ready
116 to have ``next``/``send`` called on it.
117 """
118 gen = self.generator_function(*self.args, **self.kwargs)
119 gen.send(None) # Prime it
120 return gen
121
122
def range_arguments(start=0, stop=None, step=1):
    """
    A simple example Arguments Generator function for use as a `ArgumentsGenerator.generator_function`.
    It yields single-element tuples of increasing values, similar to `range`. By default it is
    infinite and never finishes. The `SubJob` object is sent into this function with `send` but
    is not used when building the arguments.

    Parameters:
        start (int): The starting value that will be returned.
        stop (int): At this value the generator finishes (so `StopIteration` is raised in the caller).
            If this is `None` then this generator will continue forever.
        step (int): The step size.

    Returns:
        tuple
    """
    # The logic of send/yield is tricky. The first yield must have run before send(subjob) can work.
    # However that first yield is also the one that receives the first send(subjob), NOT the send(None).
    subjob = (yield None)  # Handle the send(None) and first send(subjob)
    args = None
    current = start
    # Keep yielding until the (optional) stop value is reached.
    while stop is None or current < stop:
        args = (current,)  # We could have used the subjob object here
        B2DEBUG(29, f"{subjob} arguments will be {args}")
        subjob = (yield args)  # Set up ready for the next loop (if it happens)
        current += step
151
152
class SubjobSplitter(ABC):
    """
    Abstract base class encapsulating the logic of creating subjobs for a `Job` object.
    Derived classes implement `create_subjobs` to construct the subjobs of the parent job object.

    Parameters:
        arguments_generator (ArgumentsGenerator): Used to construct the generator function that will yield the argument
            tuple for each `SubJob`. The splitter will iterate through the generator each time `create_subjobs` is
            called. The `SubJob` will be sent into the generator with ``send(subjob)`` so that the generator can decide what
            arguments to return.
    """

    def __init__(self, *, arguments_generator=None):
        """
        Derived classes should call `super` to run this.
        """
        #: The `ArgumentsGenerator` used to assign arguments to created subjobs (may be None).
        self.arguments_generator = arguments_generator

    @abstractmethod
    def create_subjobs(self, job):
        """
        Implement this method in derived classes to generate the `SubJob` objects.
        """

    def assign_arguments(self, job):
        """
        Use the `arguments_generator` (if one exists) to assign the argument tuples to the
        subjobs.
        """
        if not self.arguments_generator:
            B2INFO(f"No ArgumentsGenerator assigned to the {self} so subjobs of {job} "
                   "won't automatically have arguments assigned.")
            return

        arg_gen = self.arguments_generator.generator
        exhausted = False
        for subjob in sorted(job.subjobs.values(), key=lambda sj: sj.id):
            args = tuple()
            if not exhausted:
                try:
                    args = arg_gen.send(subjob)
                except StopIteration:
                    B2ERROR(f"StopIteration called when getting args for {subjob}, "
                            "setting all subsequent subjobs to have empty argument tuples.")
                    # The generator finished before the subjobs: the rest get empty argument tuples.
                    exhausted = True
            B2DEBUG(29, f"Arguments for {subjob}: {args}")
            subjob.args = args

    def __repr__(self):
        """
        Simple representation: just the class name of the splitter.
        """
        return f"{self.__class__.__name__}"
209
210
212 """
213
214 """
215
    def __init__(self, *, arguments_generator=None, max_files_per_subjob=1):
        """
        Parameters:
            max_files_per_subjob (int): The maximum number of input files used per `SubJob` created.
        """
        super().__init__(arguments_generator=arguments_generator)

        #: The maximum number of input files that will be assigned to each `SubJob` created.
        self.max_files_per_subjob = max_files_per_subjob
224
225 def create_subjobs(self, job):
226 """
227 This function creates subjobs for the parent job passed in. It creates as many subjobs as required
228 in order to prevent the number of input files per subjob going over the limit set by
229 `MaxFilesSplitter.max_files_per_subjob`.
230 """
231 if not job.input_files:
232 B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
233 return
234
235 for i, subjob_input_files in enumerate(grouper(self.max_files_per_subjob, job.input_files)):
236 job.create_subjob(i, input_files=subjob_input_files)
237
238 self.assign_arguments(job)
239
240 B2INFO(f"{self} created {i+1} Subjobs for {job}")
241
242
244 """
245
246 """
247
    def __init__(self, *, arguments_generator=None, max_subjobs=1000):
        """
        Parameters:
            max_subjobs (int): The maximum number of subjobs that will be created.
        """
        super().__init__(arguments_generator=arguments_generator)

        #: The maximum number of `SubJob` objects to create; input files are grouped to respect this limit.
        self.max_subjobs = max_subjobs
256
257 def create_subjobs(self, job):
258 """
259 This function creates subjobs for the parent job passed in. It creates as many subjobs as required
260 by the number of input files up to the maximum set by `MaxSubjobsSplitter.max_subjobs`. If there are
261 more input files than `max_subjobs` it instead groups files by the minimum number per subjob in order to
262 respect the subjob limit e.g. If you have 11 input files and a maximum number of subjobs of 4, then it
263 will create 4 subjobs, 3 of them with 3 input files, and one with 2 input files.
264 """
265 if not job.input_files:
266 B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
267 return
268
269 # Since we will be popping from the left of the input file list, we construct a deque
270 remaining_input_files = deque(job.input_files)
271 # The initial number of available subjobs, will go down as we create them
272 available_subjobs = self.max_subjobs
273 subjob_i = 0
274 while remaining_input_files:
275 # How many files should we use for this subjob?
276 num_input_files = ceil(len(remaining_input_files) / available_subjobs)
277 # Pop them from the remaining files
278 subjob_input_files = []
279 for i in range(num_input_files):
280 subjob_input_files.append(remaining_input_files.popleft())
281 # Create the actual subjob
282 job.create_subjob(subjob_i, input_files=subjob_input_files)
283 subjob_i += 1
284 available_subjobs -= 1
285
286 self.assign_arguments(job)
287 B2INFO(f"{self} created {subjob_i} Subjobs for {job}")
288
289
291 """
292 Creates SubJobs based on the given argument generator. The generator will be called until a `StopIteration` is issued.
293 Be VERY careful to not accidentally give an infinite generator! Otherwise it will simply create SubJobs until you run out
294 of memory. You can set the `ArgumentsSplitter.max_subjobs` parameter to try and prevent this and throw an exception.
295
296 This splitter is useful for MC production jobs where you don't have any input files, but you want to control the exp/run
297 numbers of subjobs. If you do have input files set for the parent `Job` objects, then the same input files will be
298 assigned to every `SubJob`.
299
300 Parameters:
301 arguments_generator (ArgumentsGenerator): The standard ArgumentsGenerator that is used to assign arguments
302 """
303
    def __init__(self, *, arguments_generator=None, max_subjobs=None):
        """
        Parameters:
            max_subjobs (int): The maximum number of subjobs that may be created before an error is
                raised by `create_subjobs`. If `None` there is no limit (beware infinite generators).
        """
        super().__init__(arguments_generator=arguments_generator)

        #: If we try to create more than this many subjobs we throw an exception; `None` means no maximum.
        self.max_subjobs = max_subjobs
310
311 def create_subjobs(self, job):
312 """
313 This function creates subjobs for the parent job passed in. It creates subjobs until the
314 `SubjobSplitter.arguments_generator` finishes.
315
316 If `ArgumentsSplitter.max_subjobs` is set, then it will throw an exception if more than this number of
317 subjobs are created.
318 """
319 arg_gen = self.arguments_generator.generator
320 for i in count():
321 # Reached the maximum?
322 if i >= self.max_subjobs:
323 raise SplitterError(f"{self} tried to create more subjobs than the maximum (={self.max_subjobs}).")
324 try:
325 subjob = SubJob(job, i, job.input_files) # We manually create it because we might not need it
326 args = arg_gen.send(subjob) # Might throw StopIteration
327 B2INFO(f"Creating {job}.{subjob}")
328 B2DEBUG(29, f"Arguments for {subjob}: {args}")
329 subjob.args = args
330 job.subjobs[i] = subjob
331 except StopIteration:
332 break
333 B2INFO(f"{self} created {i+1} Subjobs for {job}")
334
335
336class Job:
337 """
338 This generic Job object is used to tell a Backend what to do.
339 This object basically holds necessary information about a process you want to submit to a `Backend`.
340 It should *not* do anything that is backend specific, just hold the configuration for a job to be
341 successfully submitted and monitored using a backend. The result attribute is where backend
342 specific job monitoring goes.
343
344 Parameters:
345 name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF
346
347 .. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.
348 """
349
350
352 statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}
353
354
355 exit_statuses = ["failed", "completed"]
356
    def __init__(self, name, job_dict=None):
        """
        Initialise the Job, either empty (just a name) or from a previously serialised ``job_dict``
        (see `Job.job_dict` / `Job.from_json`).
        """
        #: Job object's name. Only descriptive, not necessarily unique.
        self.name = name
        #: The `SubjobSplitter` used to create subjobs (if any); also set via the
        #: max_subjobs/max_files_per_subjob property setters.
        self.splitter = None

        #: Working directory of the job (`pathlib.Path`). The setter stores an absolute path.
        self.working_dir = Path()
        #: Output directory (`pathlib.Path`) where output files will be placed. Setter stores an absolute path.
        self.output_dir = Path()

        #: Command and arguments as a list that will be run by the job on the backend.
        self.cmd = []
        #: Extra arguments that are appended to cmd when building the full command.
        self.args = []
        #: Input files for the job (list of `str` URIs/paths).
        self.input_files = []
        #: Bash commands to run before the main cmd (mainly used for batch system setup).
        self.setup_cmds = []

        #: Config dictionary for the backend to use when submitting the job.
        #: Saves us from having multiple attributes that may or may not be used.
        self.backend_args = {}
        #: Mapping of subjob id -> `SubJob` objects.
        self.subjobs = {}

        if job_dict:
            self.input_sandbox_files = [Path(p) for p in job_dict["input_sandbox_files"]]
            self.working_dir = Path(job_dict["working_dir"])
            self.output_dir = Path(job_dict["output_dir"])
            self.output_patterns = job_dict["output_patterns"]
            self.cmd = job_dict["cmd"]
            self.args = job_dict["args"]
            self.input_files = job_dict["input_files"]
            self.setup_cmds = job_dict["setup_cmds"]
            self.backend_args = job_dict["backend_args"]
            for subjob_dict in job_dict["subjobs"]:
                self.create_subjob(subjob_dict["id"], input_files=subjob_dict["input_files"], args=subjob_dict["args"])

        #: The result object of this Job. Set by the backend on submission; None until then.
        self.result = None
        #: The actual status of the overall `Job`. Accessed via the ``status`` property.
        self._status = "init"
405
406 def __repr__(self):
407 """
408 Representation of Job class (what happens when you print a Job() instance).
409 """
410 return f"Job({self.name})"
411
412 def ready(self):
413 """
414 Returns whether or not the Job has finished. If the job has subjobs then it will return true when they are all finished.
415 It will return False as soon as it hits the first failure. Meaning that you cannot guarantee that all subjobs will have
416 their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.
417 """
418 if not self.result:
419 B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")
420 return False
421 else:
422 return self.result.ready()
423
424 def update_status(self):
425 """
426 Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
427 in the best way for the type of result object/backend.
428 """
429 if not self.result:
430 B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
431 else:
432 self.result.update_status()
433 return self.status
434
435 def create_subjob(self, i, input_files=None, args=None):
436 """
437 Creates a subjob Job object that references that parent Job.
438 Returns the SubJob object at the end.
439 """
440 if i not in self.subjobs:
441 B2INFO(f"Creating {self}.Subjob({i})")
442 subjob = SubJob(self, i, input_files)
443 if args:
444 subjob.args = args
445 self.subjobs[i] = subjob
446 return subjob
447 else:
448 B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")
449
    @property
    def status(self):
        """
        Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
        subjob status in the hierarchy of statuses in `Job.statuses`.
        """
        if self.subjobs:
            job_status = self._get_overall_status_from_subjobs()
            if job_status != self._status:
                # Go through the setter so the change is logged consistently.
                self.status = job_status
        return self._status
462
464 """
465 """
466 subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
467 status_level = min([self.statuses[status] for status in subjob_statuses])
468 for status, level in self.statuses.items():
469 if level == status_level:
470 return status
471
472 @status.setter
473 def status(self, status):
474 """
475 Sets the status of this Job.
476 """
477 # Print an error only if the job failed.
478 if status == 'failed':
479 B2ERROR(f"Setting {self.name} status to failed")
480 else:
481 B2INFO(f"Setting {self.name} status to {status}")
482 self._status = status
483
484 # \cond silence doxygen warnings
485
    @property
    def output_dir(self):
        """Getter for output_dir. Always an absolute `pathlib.Path`."""
        return self._output_dir

    @output_dir.setter
    def output_dir(self, value):
        """Setter for output_dir. Accepts a str or `pathlib.Path` and stores the absolute path."""
        self._output_dir = Path(value).absolute()
493
    @property
    def working_dir(self):
        """Getter for working_dir. Always an absolute `pathlib.Path`."""
        return self._working_dir

    @working_dir.setter
    def working_dir(self, value):
        """Setter for working_dir. Accepts a str or `pathlib.Path` and stores the absolute path."""
        self._working_dir = Path(value).absolute()
501
    @property
    def input_sandbox_files(self):
        """Getter for input_sandbox_files. A list of absolute `pathlib.Path` objects."""
        return self._input_sandbox_files

    @input_sandbox_files.setter
    def input_sandbox_files(self, value):
        """Setter for input_sandbox_files. Converts each entry to an absolute `pathlib.Path`."""
        self._input_sandbox_files = [Path(p).absolute() for p in value]
509
    @property
    def input_files(self):
        """Getter for input_files. Stored as-is (list of string paths/URIs)."""
        return self._input_files

    @input_files.setter
    def input_files(self, value):
        """Setter for input_files. No conversion is applied; the value is stored directly."""
        self._input_files = value
517
    @property
    def max_subjobs(self):
        """Getter for max_subjobs. Delegates to the assigned splitter (AttributeError if no splitter is set)."""
        return self.splitter.max_subjobs

    @max_subjobs.setter
    def max_subjobs(self, value):
        """Setter for max_subjobs. Replaces the current splitter with a `MaxSubjobsSplitter` using this limit."""
        self.splitter = MaxSubjobsSplitter(max_subjobs=value)
        B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
526
    @property
    def max_files_per_subjob(self):
        """Getter for max_files_per_subjob. Delegates to the assigned splitter (AttributeError if no splitter is set)."""
        return self.splitter.max_files_per_subjob

    @max_files_per_subjob.setter
    def max_files_per_subjob(self, value):
        """Setter for max_files_per_subjob. Replaces the current splitter with a `MaxFilesSplitter` using this limit."""
        self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
        B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
535
536 # \endcond
537
538 def dump_to_json(self, file_path):
539 """
540 Dumps the Job object configuration to a JSON file so that it can be read in again later.
541
542 Parameters:
543 file_path(`basf2.Path`): The filepath we'll dump to
544 """
545 # \cond false positive doxygen warning about job_dict
546 with open(file_path, mode="w") as job_file:
547 json.dump(self.job_dict, job_file, indent=2)
548 # \endcond
549
550 @classmethod
551 def from_json(cls, file_path):
552 """
553 """
554 with open(file_path) as job_file:
555 job_dict = json.load(job_file)
556 return cls(job_dict["name"], job_dict=job_dict)
557
558 @property
559 def job_dict(self):
560 """
561 Returns:
562 dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
563 `Path <basf2.Path>` objects are converted to string via ``Path.as_posix()``.
564 """
565 job_dict = {}
566 job_dict["name"] = self.name
567 job_dict["input_sandbox_files"] = [i.as_posix() for i in self.input_sandbox_files]
568 job_dict["working_dir"] = self.working_dir.as_posix()
569 job_dict["output_dir"] = self.output_dir.as_posix()
570 job_dict["output_patterns"] = self.output_patterns
571 job_dict["cmd"] = self.cmd
572 job_dict["args"] = self.args
573 job_dict["input_files"] = self.input_files
574 job_dict["setup_cmds"] = self.setup_cmds
575 job_dict["backend_args"] = self.backend_args
576 job_dict["subjobs"] = [sj.job_dict for sj in self.subjobs.values()]
577 return job_dict
578
580 """
581 Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
582 string URI objects.
583 """
584 with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
585 json.dump(self.input_files, input_data_file, indent=2)
586
588 """
589 Get all of the requested files for the input sandbox and copy them to the working directory.
590 Files like the submit.sh or input_data.json are not part of this process.
591 """
592 for file_path in self.input_sandbox_files:
593 if file_path.is_dir():
594 shutil.copytree(file_path, Path(self.working_dir, file_path.name))
595 else:
596 shutil.copy(file_path, self.working_dir)
597
599 """
600 Check the input files and make sure that there aren't any duplicates.
601 Also check if the files actually exist if possible.
602 """
603 existing_input_files = [] # We use a list instead of set to avoid losing any ordering of files
604 for file_path in self.input_files:
605 file_uri = parse_file_uri(file_path)
606 if file_uri.scheme == "file":
607 p = Path(file_uri.path)
608 if p.is_file():
609 if file_uri.geturl() not in existing_input_files:
610 existing_input_files.append(file_uri.geturl())
611 else:
612 B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
613 else:
614 B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
615 else:
616 B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
617 if file_path not in existing_input_files:
618 existing_input_files.append(file_path)
619 else:
620 B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
621 if self.input_files and not existing_input_files:
622 B2WARNING(f"No valid input file paths found for {self.name}, but some were requested.")
623
624 # Replace the Job's input files with the ones that exist + duplicates removed
625 self.input_files = existing_input_files
626
627 @property
628 def full_command(self):
629 """
630 Returns:
631 str: The full command that this job will run including any arguments.
632 """
633 all_components = self.cmd[:]
634 all_components.extend(self.args)
635 # We do a convert to string just in case arguments were generated as different types
636 full_command = " ".join(map(str, all_components))
637 B2DEBUG(29, f"Full command of {self} is '{full_command}'")
638 return full_command
639
641 """
642 This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
643 It should detect if you are using a local release or CVMFS and append the correct commands
644 so that the job will have the same basf2 release environment. It should also detect
645 if a local release is not compiled with the ``opt`` option.
646
647 Note that this *doesn't mean that every environment variable is inherited* from the submitting
648 process environment.
649 """
650 def append_environment_variable(cmds, envvar):
651 """
652 Append a command for setting an environment variable.
653 """
654 if envvar in os.environ:
655 cmds.append(f"""if [ -z "${{{envvar}}}" ]; then""")
656 cmds.append(f" export {envvar}={os.environ[envvar]}")
657 cmds.append("fi")
658
659 if "BELLE2_TOOLS" not in os.environ:
660 raise BackendError("No BELLE2_TOOLS found in environment")
661 # Export all the environment variables defined via _backend_job_envvars
662 for envvar in _backend_job_envvars:
663 append_environment_variable(self.setup_cmds, envvar)
664 if "BELLE2_RELEASE" in os.environ:
665 self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
666 elif 'BELLE2_LOCAL_DIR' in os.environ:
667 self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
668 self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
669 self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
670 self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
671 self.setup_cmds.append("pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
672 self.setup_cmds.append("source $BACKEND_B2SETUP")
673 # b2code-option has to be executed only after the source of the tools.
674 self.setup_cmds.append("b2code-option $BACKEND_BELLE2_OPTION")
675 self.setup_cmds.append("popd > /dev/null")
676
677
class SubJob(Job):
    """
    This mini-class simply holds basic information about which subjob you are
    and a reference to the parent Job object to be able to access the main data there.
    Rather than replicating all of the parent job's configuration again.
    Attribute access falls back to the parent `Job` via ``__getattr__``.
    """

    def __init__(self, job, subjob_id, input_files=None):
        """
        Note: deliberately does *not* call ``super().__init__``; only subjob-specific
        state is stored here, everything else is delegated to the parent Job.
        """
        #: Id of Subjob within the parent Job.
        self.id = subjob_id
        #: Parent `Job` object; most attribute lookups delegate to it.
        self.parent = job
        #: Input files specific to this subjob (empty list if none given).
        if not input_files:
            input_files = []
        self.input_files = input_files
        #: The result object of this SubJob. Set by the backend on submission; None until then.
        self.result = None
        #: Status of this SubJob (see `Job.statuses`).
        self._status = "init"
        #: Arguments specific to this SubJob.
        self.args = []

    @property
    def output_dir(self):
        """
        Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this."""
        # Each subjob gets a subdirectory named after its id.
        return Path(self.parent.output_dir, str(self.id))

    @property
    def working_dir(self):
        """Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this."""
        return Path(self.parent.working_dir, str(self.id))

    @property
    def name(self):
        """Getter for name of SubJob. Accesses the parent Job name to infer this."""
        return "_".join((self.parent.name, str(self.id)))

    @property
    def status(self):
        """
        Returns the status of this SubJob.
        """
        return self._status

    @status.setter
    def status(self, status):
        """
        Sets the status of this Job.
        """
        # Print an error only if the job failed.
        if status == "failed":
            B2ERROR(f"Setting {self.name} status to failed")
        else:
            B2INFO(f"Setting {self.name} status to {status}")
        self._status = status

    @property
    def subjobs(self):
        """
        A subjob cannot have subjobs. Always return empty list.
        """
        return []

    @property
    def job_dict(self):
        """
        Returns:
            dict: A JSON serialisable representation of the `SubJob`. `Path <basf2.Path>` objects are converted to
            `string` via ``Path.as_posix()``. Since Subjobs inherit most of the parent job's config
            we only output the input files and arguments that are specific to this subjob and no other details.
        """
        job_dict = {}
        job_dict["id"] = self.id
        job_dict["input_files"] = self.input_files
        job_dict["args"] = self.args
        return job_dict

    def __getattr__(self, attribute):
        """
        Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
        unless otherwise specified.
        """
        # Only called when normal lookup fails, so explicit SubJob attributes/properties win.
        return getattr(self.parent, attribute)

    def __repr__(self):
        """
        Representation of a SubJob (what happens when you print an instance).
        """
        return f"SubJob({self.name})"
771
772
773class Backend(ABC):
774 """
775 Abstract base class for a valid backend.
776 Classes derived from this will implement their own submission of basf2 jobs
777 to whatever backend they describe.
778 Some common methods/attributes go into this base class.
779
780 For backend_args the priority from lowest to highest is:
781
782 backend.default_backend_args -> backend.backend_args -> job.backend_args
783 """
784
785 submit_script = "submit.sh"
786
787 exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
788
789 default_backend_args = {}
790
791 def __init__(self, *, backend_args=None):
792 """
793 """
794 if backend_args is None:
795 backend_args = {}
796
797 self.backend_args = {**self.default_backend_args, **backend_args}
798
    @abstractmethod
    def submit(self, job):
        """
        Base method for submitting collection jobs to the backend type. This MUST be
        implemented for a correctly written backend class deriving from Backend().

        Parameters:
            job (Job, SubJob): The job to submit.
        """
805
806 @staticmethod
807 def _add_setup(job, batch_file):
808 """
809 Adds setup lines to the shell script file.
810 """
811 for line in job.setup_cmds:
812 print(line, file=batch_file)
813
    def _add_wrapper_script_setup(self, job, batch_file):
        """
        Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
        `trap` statements for Ctrl-C situations.
        """
        # The shell helpers write the numeric exit code into self.exit_code_file so the job
        # outcome survives even if the batch system later purges its own job records.
        start_wrapper = f"""# ---
# trap ctrl-c and call ctrl_c()
trap '(ctrl_c 130)' SIGINT
trap '(ctrl_c 143)' SIGTERM

function write_exit_code() {{
    echo "Writing $1 to exit status file"
    echo "$1" > {self.exit_code_file}
    exit $1
}}

function ctrl_c() {{
    trap '' SIGINT SIGTERM
    echo "** Trapped Ctrl-C **"
    echo "$1" > {self.exit_code_file}
    exit $1
}}
# ---"""
        print(start_wrapper, file=batch_file)
838
    def _add_wrapper_script_teardown(self, job, batch_file):
        """
        Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
        an exit code of the job cmd being written out to a file. Which means that we can know if the command was
        successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
        removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
        file.
        """
        # '$?' is the exit status of the last command run by the script (the job cmd).
        end_wrapper = """# ---
write_exit_code $?"""
        print(end_wrapper, file=batch_file)
850
851 @classmethod
853 """
854 We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
855 so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
856 statuses and allows the use of ready().
857 """
858 raise NotImplementedError
859
861 """
862 Construct the Path object of the bash script file that we will submit. It will contain
863 the actual job command, wrapper commands, setup commands, and any batch directives
864 """
865 return Path(job.working_dir, self.submit_script)
866
867
868class Result():
869 """
870 Base class for Result objects. A Result is created for each `Job` (or `Job.SubJob`) object
871 submitted to a backend. It provides a way to query a job's status to find out if it's ready.
872 """
873
    def __init__(self, job):
        """
        Pass in the job object to allow the result to access the job's properties and do post-processing.
        """
        #: The `Job` or `SubJob` object this result belongs to.
        self.job = job
        #: Cached readiness flag so a job found ready once stays ready without re-checking.
        self._is_ready = False
        #: After our first attempt to view the exit code file, how long to wait before giving up on it appearing.
        self.time_to_wait_for_exit_code_file = timedelta(minutes=5)
887
888 def ready(self):
889 """
890 Returns whether or not this job result is known to be ready. Doesn't actually change the job status. Just changes
891 the 'readiness' based on the known job status.
892 """
893 B2DEBUG(29, f"Calling {self.job}.result.ready()")
894 if self._is_ready:
895 return True
896 elif self.job.status in self.job.exit_statuses:
897 self._is_ready = True
898 return True
899 else:
900 return False
901
    def update_status(self):
        """
        Update the job's (and subjobs') status so that `Result.ready` will return the up to date status. This call will have to
        actually look up the job's status from some database/exit code file.

        Raises:
            NotImplementedError: Always; derived Result classes must implement this.
        """
        raise NotImplementedError
908
910 """
911 Read the exit code file to discover the exit status of the job command. Useful fallback if the job is no longer
912 known to the job database (batch system purged it for example). Since some backends may take time to download
913 the output files of the job back to the working directory we use a time limit on how long to wait.
914 """
915 if not self.exit_code_file_initial_time:
916 self.exit_code_file_initial_time = datetime.now()
917 exit_code_path = Path(self.job.working_dir, Backend.exit_code_file)
918 with open(exit_code_path) as f:
919 exit_code = int(f.read().strip())
920 B2DEBUG(29, f"Exit code from file for {self.job} was {exit_code}")
921 return exit_code
922
923
925 """
926 Backend for local processes i.e. on the same machine but in a subprocess.
927
928 Note that you should call the self.join() method to close the pool and wait for any
929 running processes to finish before exiting the process. Once you've called join you will have to set up a new
930 instance of this backend to create a new pool. If you don't call `Local.join` or don't create a join yourself
931 somewhere, then the main python process might end before your pool is done.
932
933 Keyword Arguments:
934 max_processes (int): Integer that specifies the size of the process pool that spawns the subjobs, default=1.
935 It's the maximum simultaneous subjobs.
936 Try not to specify a large number or a number larger than the number of cores.
937 It won't crash the program but it will slow down and negatively impact performance.
938 """
939
    def __init__(self, *, backend_args=None, max_processes=1):
        """
        Set up the backend args and create the process pool (via the ``max_processes`` setter).
        """
        super().__init__(backend_args=backend_args)
        #: The multiprocessing Pool object. Must be initialised to None *before* assigning
        #: max_processes below, because that property setter checks self.pool.
        self.pool = None
        #: The size of the multiprocessing process pool (assignment creates the Pool via the setter).
        self.max_processes = max_processes
948
950 """
951 Result class to help monitor status of jobs submitted by Local backend.
952 """
953
        def __init__(self, job, result):
            """
            Pass in the job object and the multiprocessing result to allow the result to do monitoring and perform
            post processing of the job.
            """
            super().__init__(job)
            #: The underlying result object from the process pool; only ready()/get() are used here.
            self.result = result
962
964 """
965 Update result status
966 """
967 if self.result.ready() and (self.job.status not in self.job.exit_statuses):
968 return_code = self.result.get()
969 if return_code:
970 self.job.status = "failed"
971 else:
972 self.job.status = "completed"
973
974 def update_status(self):
975 """
976 Update the job's (or subjobs') status by calling the result object.
977 """
978 B2DEBUG(29, f"Calling {self.job}.result.update_status()")
979 if self.job.subjobs:
980 for subjob in self.job.subjobs.values():
981 subjob.result._update_result_status()
982 else:
984
985 def join(self):
986 """
987 Closes and joins the Pool, letting you wait for all results currently
988 still processing.
989 """
990 B2INFO("Joining Process Pool, waiting for results to finish...")
991 self.pool.close()
992 self.pool.join()
993 B2INFO("Process Pool joined.")
994
    @property
    def max_processes(self):
        """
        Getter for max_processes
        """
        # Backed by _max_processes, which is only ever written through the setter below.
        return self._max_processes
1001
    @max_processes.setter
    def max_processes(self, value):
        """
        Setter for max_processes, we also check for a previous Pool(), wait for it to join
        and then create a new one with the new value of max_processes
        """

        #: Internal attribute backing the max_processes property
        self._max_processes = value
        # If a pool already exists (from a previous value), wait for its tasks to finish
        # before replacing it with a new pool of the requested size.
        if self.pool:
            B2INFO("New max_processes requested. But a pool already exists.")
            self.join()
        B2INFO(f"Starting up new Pool with {self.max_processes} processes")
        self.pool = mp.Pool(processes=self.max_processes)
1015
    @method_dispatch
    def submit(self, job):
        """
        Base submit function, dispatched on argument type via `method_dispatch`.
        Only the registered overloads (`Job`, `SubJob`, list) should ever be hit.

        Raises:
            NotImplementedError: Always, if called with an unregistered argument type.
        """
        raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
                                  "Did you submit a (Sub)Job?")
1022
    @submit.register(SubJob)
    def _(self, job):
        """
        Submission of a `SubJob` for the Local backend: creates the directories, writes the
        bash script, and hands it to the process pool asynchronously. Sets ``job.result``
        to a `Local.LocalResult` and the job status to "submitted".
        """
        # Make sure the output directory of the job is created
        job.output_dir.mkdir(parents=True, exist_ok=True)
        # Make sure the working directory of the job is created
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()
        # Get the path to the bash script we run
        script_path = self.get_submit_script_path(job)
        with open(script_path, mode="w") as batch_file:
            print("#!/bin/bash", file=batch_file)
            self._add_wrapper_script_setup(job, batch_file)
            self._add_setup(job, batch_file)
            print(job.full_command, file=batch_file)
            self._add_wrapper_script_teardown(job, batch_file)
        B2INFO(f"Submitting {job}")
        # apply_async returns immediately; the AsyncResult is wrapped so status can be polled later.
        job.result = Local.LocalResult(job,
                                       self.pool.apply_async(self.run_job,
                                                             (job.name,
                                                              job.working_dir,
                                                              job.output_dir,
                                                              script_path)
                                                             )
                                       )
        job.status = "submitted"
        B2INFO(f"{job} submitted")
1053
1054 @submit.register(Job)
1055 def _(self, job):
1056 """
1057 Submission of a `Job` for the Local backend
1058 """
1059 # Make sure the output directory of the job is created
1060 job.output_dir.mkdir(parents=True, exist_ok=True)
1061 # Make sure the working directory of the job is created
1062 job.working_dir.mkdir(parents=True, exist_ok=True)
1063 # Check if we have any valid input files
1064 job.check_input_data_files()
1065
1066 if not job.splitter:
1067 # Get all of the requested files for the input sandbox and copy them to the working directory
1068 job.copy_input_sandbox_files_to_working_dir()
1069 job.dump_input_data()
1070 # Get the path to the bash script we run
1071 script_path = self.get_submit_script_path(job)
1072 with open(script_path, mode="w") as batch_file:
1073 print("#!/bin/bash", file=batch_file)
1074 self._add_wrapper_script_setup(job, batch_file)
1075 self._add_setup(job, batch_file)
1076 print(job.full_command, file=batch_file)
1077 self._add_wrapper_script_teardown(job, batch_file)
1078 B2INFO(f"Submitting {job}")
1079 job.result = Local.LocalResult(job,
1080 self.pool.apply_async(self.run_job,
1081 (job.name,
1082 job.working_dir,
1083 job.output_dir,
1084 script_path)
1085 )
1086 )
1087 B2INFO(f"{job} submitted")
1088 else:
1089 # Create subjobs according to the splitter's logic
1090 job.splitter.create_subjobs(job)
1091 # Submit the subjobs
1092 self.submit(list(job.subjobs.values()))
1093 # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1095
1096 @submit.register(list)
1097 def _(self, jobs):
1098 """
1099 Submit method of Local() that takes a list of jobs instead of just one and submits each one.
1100 """
1101 # Submit the jobs
1102 for job in jobs:
1103 self.submit(job)
1104 B2INFO("All requested jobs submitted.")
1105
    @staticmethod
    def run_job(name, working_dir, output_dir, script):
        """
        The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
        shell command in a subprocess and captures the stdout and stderr of the subprocess to files.

        Parameters:
            name (str): Label of the (Sub)Job, used only for log messages.
            working_dir (pathlib.Path): Directory the subprocess runs in and where stdout/stderr files are written.
            output_dir (pathlib.Path): Output directory of the job (not used within this function).
            script (pathlib.Path): Path of the bash script to execute.

        Returns:
            int: The return code of the finished subprocess.
        """
        B2INFO(f"Starting Sub-process: {name}")
        from subprocess import Popen
        stdout_file_path = Path(working_dir, _STDOUT_FILE)
        stderr_file_path = Path(working_dir, _STDERR_FILE)
        # Create unix command to redirect stdout and stderr
        B2INFO(f"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
        with open(stdout_file_path, mode="w", buffering=1) as f_out, \
             open(stderr_file_path, mode="w", buffering=1) as f_err:
            # NOTE(review): env={} deliberately gives the subprocess an empty environment;
            # presumably the wrapper script performs its own environment setup — confirm.
            with Popen(["/bin/bash", script.as_posix()],
                       stdout=f_out,
                       stderr=f_err,
                       bufsize=1,
                       universal_newlines=True,
                       cwd=working_dir,
                       env={}) as p:
                # We block here and wait so that the return code will be set.
                p.wait()
        B2INFO(f"Subprocess {name} finished.")
        return p.returncode
1131
1132 @classmethod
1134 parent.result = cls.LocalResult(parent, None)
1135
1136
1138 """
1139 Abstract Base backend for submitting to a local batch system. Batch system specific commands should be implemented
1140 in a derived class. Do not use this class directly!
1141 """
1142
    #: Shell command (argument list) used to submit jobs; set by each concrete backend, e.g. ["qsub"].
    submission_cmds = []

    #: Default upper limit on this user's jobs (submitted + running) in the batch system.
    # Presumably copied onto the instance as ``global_job_limit`` during __init__ — confirm.
    default_global_job_limit = 1000

    #: Default number of seconds to sleep between `can_submit` checks while waiting to submit more jobs.
    default_sleep_between_submission_checks = 30
1159
1160 def __init__(self, *, backend_args=None):
1161 """
1162 Init method for Batch Backend. Does some basic default setup.
1163 """
1164 super().__init__(backend_args=backend_args)
1165
1168
1171
    def _add_batch_directives(self, job, file):
        """
        Should be implemented in a derived class to write a batch submission script to the job.working_dir.
        You should think about where the stdout/err should go, and set the queue name.

        Raises:
            NotImplementedError: Always, unless overridden by the concrete backend.
        """
        raise NotImplementedError("Need to implement a _add_batch_directives(self, job, file) "
                                  f"method in {self.__class__.__name__} backend.")
1179
    def _make_submit_file(self, job, submit_file_path):
        """
        Useful for the HTCondor backend where a submit is needed instead of batch
        directives pasted directly into the submission script. It should be overwritten
        if needed.

        Parameters:
            job (Job|SubJob): The job to create the submit file for.
            submit_file_path (pathlib.Path): Where the submit file should be written
                (unused in this no-op base implementation).
        """
1186
    @classmethod
    @abstractmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.

        Parameters:
            cmd: The submission command, as produced by ``_create_cmd``.

        Returns:
            The raw output of the submission command (implementation defined).
        """
1193
    def can_submit(self, *args, **kwargs):
        """
        Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail.
        This is initially meant to make sure that we don't go over the global limits of jobs (submitted + running).

        Returns:
            bool: If the job submission can continue based on the current situation.
        """
        # Permissive default: concrete backends (e.g. PBS, LSF) override this with real checks.
        return True
1203
    @method_dispatch
    def submit(self, job, check_can_submit=True, jobs_per_check=100):
        """
        Base submit function, dispatched on argument type via `method_dispatch`.
        Only the registered overloads (`Job`, `SubJob`, list) should ever be hit.

        Raises:
            NotImplementedError: Always, if called with an unregistered argument type.
        """
        raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
                                  "Did you submit a (Sub)Job?")
1210
1211 # Doxygen doesn't understand @method_dispatch overloads (def _) and emits
1212 # "no uniquely matching class member found" warnings. Hide them from doxygen.
1213 # @cond
    @submit.register(SubJob)
    def _(self, job, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch backend for a `SubJob`. Should take `SubJob` object, create needed directories,
        create batch script, and send it off with the batch submission command.
        It should apply the correct options (default and user requested).

        Should set a Result object as an attribute of the job.
        """
        # Make sure the output directory of the job is created, commented out due to permission issues
        # job.output_dir.mkdir(parents=True, exist_ok=True)
        # Make sure the working directory of the job is created
        job.working_dir.mkdir(parents=True, exist_ok=True)
        job.copy_input_sandbox_files_to_working_dir()
        job.dump_input_data()
        # Make submission file if needed
        batch_submit_script_path = self.get_batch_submit_script_path(job)
        self._make_submit_file(job, batch_submit_script_path)
        # Get the bash file we will actually run, might be the same file
        script_path = self.get_submit_script_path(job)
        # Construct the batch submission script (with directives if that is supported)
        with open(script_path, mode="w") as batch_file:
            self._add_batch_directives(job, batch_file)
            self._add_wrapper_script_setup(job, batch_file)
            self._add_setup(job, batch_file)
            print(job.full_command, file=batch_file)
            self._add_wrapper_script_teardown(job, batch_file)
        # Make the script executable so the batch system can run it directly
        os.chmod(script_path, 0o755)
        B2INFO(f"Submitting {job}")
        # Do the actual batch submission
        cmd = self._create_cmd(batch_submit_script_path)
        output = self._submit_to_batch(cmd)
        self._create_job_result(job, output)
        job.status = "submitted"
        B2INFO(f"{job} submitted")
1249
1250 @submit.register(Job)
1251 def _(self, job, check_can_submit=True, jobs_per_check=100):
1252 """
1253 Submit method of Batch backend. Should take job object, create needed directories, create batch script,
1254 and send it off with the batch submission command, applying the correct options (default and user requested.)
1255
1256 Should set a Result object as an attribute of the job.
1257 """
1258 # Make sure the output directory of the job is created, commented out due to permissions issue
1259 # job.output_dir.mkdir(parents=True, exist_ok=True)
1260 # Make sure the working directory of the job is created
1261 job.working_dir.mkdir(parents=True, exist_ok=True)
1262 # Check if we have any valid input files
1263 job.check_input_data_files()
1264 # Add any required backend args that are missing (I'm a bit hesitant to actually merge with job.backend_args)
1265 # just in case you want to resubmit the same job with different backend settings later.
1266 # job_backend_args = {**self.backend_args, **job.backend_args}
1267
1268 # If there's no splitter then we just submit the Job with no SubJobs
1269 if not job.splitter:
1270 # Get all of the requested files for the input sandbox and copy them to the working directory
1271 job.copy_input_sandbox_files_to_working_dir()
1272 job.dump_input_data()
1273 # Make submission file if needed
1274 batch_submit_script_path = self.get_batch_submit_script_path(job)
1275 self._make_submit_file(job, batch_submit_script_path)
1276 # Get the bash file we will actually run
1277 script_path = self.get_submit_script_path(job)
1278 # Construct the batch submission script (with directives if that is supported)
1279 with open(script_path, mode="w") as batch_file:
1280 self._add_batch_directives(job, batch_file)
1281 self._add_wrapper_script_setup(job, batch_file)
1282 self._add_setup(job, batch_file)
1283 print(job.full_command, file=batch_file)
1284 self._add_wrapper_script_teardown(job, batch_file)
1285 os.chmod(script_path, 0o755)
1286 B2INFO(f"Submitting {job}")
1287 # Do the actual batch submission
1288 cmd = self._create_cmd(batch_submit_script_path)
1289 output = self._submit_to_batch(cmd)
1290 self._create_job_result(job, output)
1291 job.status = "submitted"
1292 B2INFO(f"{job} submitted")
1293 else:
1294 # Create subjobs according to the splitter's logic
1295 job.splitter.create_subjobs(job)
1296 # Submit the subjobs
1297 self.submit(list(job.subjobs.values()))
1298 # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1300
    @submit.register(list)
    def _(self, jobs, check_can_submit=True, jobs_per_check=100):
        """
        Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.
        """
        B2INFO(f"Submitting a list of {len(jobs)} jobs to a Batch backend")
        # Technically this could be a list of Jobs or SubJobs. And if it is a list of Jobs then it might not
        # be necessary to check if we can submit right now. We could do it later during the submission of the
        # SubJob list. However in the interest of simpler code we just do the check here, and re-check again
        # if a SubJob list comes through this function. Slightly inefficient, but much simpler logic.

        # The first thing to do is make sure that we are iterating through the jobs list in chunks that are
        # equal to or smaller than the global limit. Otherwise nothing will ever submit.

        if jobs_per_check > self.global_job_limit:
            B2INFO(f"jobs_per_check (={jobs_per_check}) but this is higher than the global job "
                   f"limit for this backend (={self.global_job_limit}). Will instead use the "
                   " value of the global job limit.")
            jobs_per_check = self.global_job_limit

        # We group the jobs list into chunks of length jobs_per_check
        for jobs_to_submit in grouper(jobs_per_check, jobs):
            # Wait until we are allowed to submit
            while not self.can_submit(njobs=len(jobs_to_submit)):
                B2INFO("Too many jobs are currently in the batch system globally. Waiting until submission can continue...")
                time.sleep(self.sleep_between_submission_checks)
            else:
                # NOTE: while/else — since there is no 'break' in the loop, this else block runs
                # exactly once, as soon as can_submit() allows submission.
                # We loop here since we have already checked if the number of jobs is low enough, we don't want to hit this
                # function again unless one of the jobs has subjobs.
                B2INFO(f"Submitting the next {len(jobs_to_submit)} jobs...")
                for job in jobs_to_submit:
                    self.submit(job, check_can_submit, jobs_per_check)
        B2INFO(f"All {len(jobs)} requested jobs submitted")
1334 # @endcond # end hiding of @method_dispatch overloads
1335
1337 """
1338 Construct the Path object of the script file that we will submit using the batch command.
1339 For most batch backends this is the same script as the bash script we submit.
1340 But for some they require a separate submission file that describes the job.
1341 To implement that you can implement this function in the Backend class.
1342 """
1343 return Path(job.working_dir, self.submit_script)
1344
    @classmethod
    @abstractmethod
    def _create_job_result(cls, job, batch_output):
        """
        Create and attach a backend-specific Result object to ``job.result`` from the raw
        submission output (e.g. the job id printed by the submission command).
        """
1350
    @abstractmethod
    def _create_cmd(self, job):
        """
        Build the submission command (argument list or shell string) for this backend.
        """
1356
class PBS(Batch):
    """
    Backend for submitting calibration processes to a qsub batch system.
    """

    #: Directive prefix for setting the job's working directory
    cmd_wkdir = "#PBS -d"

    #: Directive prefix for setting the job's stdout file path
    cmd_stdout = "#PBS -o"

    #: Directive prefix for setting the job's stderr file path
    cmd_stderr = "#PBS -e"

    #: Directive prefix for setting the queue name
    cmd_queue = "#PBS -q"

    #: Directive prefix for setting the job name
    cmd_name = "#PBS -N"

    #: Shell command used to submit jobs to PBS
    submission_cmds = ["qsub"]

    #: Default upper limit on this user's jobs in the PBS system (overrides the `Batch` default)
    default_global_job_limit = 5000

    #: Default backend arguments; the "queue" value is used by `_add_batch_directives`
    default_backend_args = {"queue": "short"}
1377
    def __init__(self, *, backend_args=None):
        """
        Init method for the PBS backend; forwards the backend arguments to `Batch`.
        """
        super().__init__(backend_args=backend_args)
1382
1383 def _add_batch_directives(self, job, batch_file):
1384 """
1385 Add PBS directives to submitted script.
1386 """
1387 job_backend_args = {**self.backend_args, **job.backend_args}
1388 batch_queue = job_backend_args["queue"]
1389 print("#!/bin/bash", file=batch_file)
1390 print("# --- Start PBS ---", file=batch_file)
1391 print(" ".join([PBS.cmd_queue, batch_queue]), file=batch_file)
1392 print(" ".join([PBS.cmd_name, job.name]), file=batch_file)
1393 print(" ".join([PBS.cmd_wkdir, job.working_dir.as_posix()]), file=batch_file)
1394 print(" ".join([PBS.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1395 print(" ".join([PBS.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1396 print("# --- End PBS ---", file=batch_file)
1397
1398 @classmethod
1399 def _create_job_result(cls, job, batch_output):
1400 """
1401 """
1402 job_id = batch_output.replace("\n", "")
1403 B2INFO(f"Job ID of {job} recorded as: {job_id}")
1404 job.result = cls.PBSResult(job, job_id)
1405
1406 def _create_cmd(self, script_path):
1407 """
1408 """
1409 submission_cmd = self.submission_cmds[:]
1410 submission_cmd.append(script_path.as_posix())
1411 return submission_cmd
1412
    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.

        Parameters:
            cmd (list[str]): qsub argument list, as produced by `_create_cmd`.

        Returns:
            str: The decoded stdout (stderr merged in) of the qsub command.
        """
        # check_output raises subprocess.CalledProcessError on a non-zero exit code.
        sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True)
        return sub_out
1420
1421 @classmethod
1423 parent.result = cls.PBSResult(parent, None)
1424
1426 """
1427 Simple class to help monitor status of jobs submitted by `PBS` Backend.
1428
1429 You pass in a `Job` object (or `SubJob`) and job id from a qsub command.
1430 When you call the `ready` method it runs bjobs to see whether or not the job has finished.
1431 """
1432
1433
1434 backend_code_to_status = {"R": "running",
1435 "C": "completed",
1436 "FINISHED": "completed",
1437 "E": "failed",
1438 "H": "submitted",
1439 "Q": "submitted",
1440 "T": "submitted",
1441 "W": "submitted",
1442 "H": "submitted"
1443 }
1444
        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.

            Parameters:
                job (Job|SubJob): The (sub)job this result object monitors.
                job_id (str): The batch job id returned by qsub at submission time.
            """
            super().__init__(job)

            #: Job Id given by the batch system at submission, used to find the job in qstat output
            self.job_id = job_id
1453
        def update_status(self):
            """
            Update the job's (or subjobs') status by calling qstat.
            """
            B2DEBUG(29, f"Calling {self.job}.result.update_status()")
            # Get all jobs info and reuse it for each status update to minimise time spent on this updating.
            qstat_output = PBS.qstat()
            if self.job.subjobs:
                # Each subjob picks its own state out of the shared qstat output
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(qstat_output)
            else:
                self._update_result_status(qstat_output)
1466
1467 def _update_result_status(self, qstat_output):
1468 """
1469 Parameters:
1470 qstat_output (dict): The JSON output of a previous call to qstat which we can reuse to find the
1471 status of this job. Obviously you should only be passing a JSON dict that contains the 'Job_Id' and
1472 'job_state' information, otherwise it is useless.
1473
1474 """
1475 try:
1476 backend_status = self._get_status_from_output(qstat_output)
1477 except KeyError:
1478 # If this happens then maybe the job id wasn't in the qstat_output argument because it finished.
1479 # Instead of failing immediately we try looking for the exit code file and then fail if it still isn't there.
1480 B2DEBUG(29, f"Checking of the exit code from file for {self.job}")
1481 try:
1482 exit_code = self.get_exit_code_from_file()
1483 except FileNotFoundError:
1484 waiting_time = datetime.now() - self.exit_code_file_initial_time
1485 if self.time_to_wait_for_exit_code_file > waiting_time:
1486 B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
1487 exit_code = 1
1488 else:
1489 B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
1490 return
1491 if exit_code:
1492 backend_status = "E"
1493 else:
1494 backend_status = "C"
1495
1496 try:
1497 new_job_status = self.backend_code_to_status[backend_status]
1498 except KeyError as err:
1499 raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
1500
1501 if new_job_status != self.job.status:
1502 self.job.status = new_job_status
1503
1504 def _get_status_from_output(self, output):
1505 """
1506 Get status from output
1507 """
1508 for job_info in output["JOBS"]:
1509 if job_info["Job_Id"] == self.job_id:
1510 return job_info["job_state"]
1511 else:
1512 raise KeyError
1513
    def can_submit(self, njobs=1):
        """
        Checks the global number of jobs in PBS right now (submitted or running) for this user.
        Returns True if the number is lower that the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.

        Returns:
            bool: True if submitting njobs more jobs stays within the global limit.
        """
        B2DEBUG(29, "Calling PBS().can_submit()")
        # Query all of this user's jobs (in any state) currently known to PBS
        job_info = self.qstat(username=os.environ["USER"])
        total_jobs = job_info["NJOBS"]
        B2INFO(f"Total jobs active in the PBS system is currently {total_jobs}")
        if (total_jobs + njobs) > self.global_job_limit:
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
            return False
        else:
            B2INFO("There is enough space to submit more jobs.")
            return True
1536
1537 @classmethod
1538 def qstat(cls, username="", job_ids=None):
1539 """
1540 Simplistic interface to the ``qstat`` command. Lets you request information about all jobs or ones matching the filter
1541 ['job_id'] or for the username. The result is a JSON dictionary containing come of the useful job attributes returned
1542 by qstat.
1543
1544 PBS is kind of annoying as depending on the configuration it can forget about jobs immediately. So the status of a
1545 finished job is VERY hard to get. There are other commands that are sometimes included that may do a better job.
1546 This one should work for Melbourne's cloud computing centre.
1547
1548 Keyword Args:
1549 username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
1550 will be in the output dictionary.
1551 job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
1552 the output of this function will be only information about this jobs. If this argument is not given, then all jobs
1553 matching the other filters will be returned.
1554
1555 Returns:
1556 dict: JSON dictionary of the form (to save you parsing the XML that qstat returns).:
1557
1558 .. code-block:: python
1559
1560 {
1561 "NJOBS": int
1562 "JOBS":[
1563 {
1564 <key: value>, ...
1565 }, ...
1566 ]
1567 }
1568 """
1569 B2DEBUG(29, f"Calling PBS.qstat(username='{username}', job_id={job_ids})")
1570 if not job_ids:
1571 job_ids = []
1572 job_ids = set(job_ids)
1573 cmd_list = ["qstat", "-x"]
1574 # We get an XML serialisable summary from qstat. Requires the shell argument.
1575 cmd = " ".join(cmd_list)
1576 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1577 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1578 jobs_dict = {"NJOBS": 0, "JOBS": []}
1579 jobs_xml = ET.fromstring(output)
1580
1581 # For a specific job_id we can be a bit more efficient in XML parsing
1582 if len(job_ids) == 1:
1583 job_elem = jobs_xml.find(f"./Job[Job_Id='{list(job_ids)[0]}']")
1584 if job_elem:
1585 jobs_dict["JOBS"].append(cls.create_job_record_from_element(job_elem))
1586 jobs_dict["NJOBS"] = 1
1587 return jobs_dict
1588
1589 # Since the username given is not exactly the same as the one that PBS stores (<username>@host)
1590 # we have to simply loop through rather than using XPATH.
1591 for job in jobs_xml.iterfind("Job"):
1592 job_owner = job.find("Job_Owner").text.split("@")[0]
1593 if username and username != job_owner:
1594 continue
1595 job_id = job.find("Job_Id").text
1596 if job_ids and job_id not in job_ids:
1597 continue
1598 jobs_dict["JOBS"].append(cls.create_job_record_from_element(job))
1599 jobs_dict["NJOBS"] += 1
1600 # Remove it so that we don't keep checking for it
1601 if job_id in job_ids:
1602 job_ids.remove(job_id)
1603 return jobs_dict
1604
1605 @staticmethod
1607 """
1608 Creates a Job dictionary with various job information from the XML element returned by qstat.
1609
1610 Parameters:
1611 job_elem (xml.etree.ElementTree.Element): The XML Element of the Job
1612
1613 Returns:
1614 dict: JSON serialisable dictionary of the Job information we are interested in.
1615 """
1616 job_dict = {}
1617 job_dict["Job_Id"] = job_elem.find("Job_Id").text
1618 job_dict["Job_Name"] = job_elem.find("Job_Name").text
1619 job_dict["Job_Owner"] = job_elem.find("Job_Owner").text
1620 job_dict["job_state"] = job_elem.find("job_state").text
1621 job_dict["queue"] = job_elem.find("queue").text
1622 return job_dict
1623
1624
class LSF(Batch):
    """
    Backend for submitting calibration processes to an LSF (bsub) batch system.
    """

    #: Directive prefix for setting the job's working directory
    cmd_wkdir = "#BSUB -cwd"

    #: Directive prefix for setting the job's stdout file path
    cmd_stdout = "#BSUB -o"

    #: Directive prefix for setting the job's stderr file path
    cmd_stderr = "#BSUB -e"

    #: Directive prefix for setting the queue name
    cmd_queue = "#BSUB -q"

    #: Directive prefix for setting the job name
    cmd_name = "#BSUB -J"

    #: Shell command pieces used to submit jobs; joined into a single shell string by `_create_cmd`
    # ('<' redirects the script into bsub's stdin, hence shell=True in `_submit_to_batch`).
    submission_cmds = ["bsub", "-env", "\"none\"", "<"]

    #: Default upper limit on this user's jobs in the LSF system (overrides the `Batch` default)
    default_global_job_limit = 15000

    #: Default backend arguments; the "queue" value is used by `_add_batch_directives`
    default_backend_args = {"queue": "s"}
1645
    def __init__(self, *, backend_args=None):
        """
        Init method for the LSF backend; forwards the backend arguments to `Batch`.
        """
        super().__init__(backend_args=backend_args)
1650
    def _add_batch_directives(self, job, batch_file):
        """
        Adds LSF BSUB directives for the job to a script.
        """
        job_backend_args = {**self.backend_args, **job.backend_args}  # Merge the two dictionaries, with the job having priority
        batch_queue = job_backend_args["queue"]
        print("#!/bin/bash", file=batch_file)
        print("# --- Start LSF ---", file=batch_file)
        print(" ".join([LSF.cmd_queue, batch_queue]), file=batch_file)
        print(" ".join([LSF.cmd_name, job.name]), file=batch_file)
        # NOTE(review): uses str(job.working_dir) here while the PBS backend uses .as_posix();
        # equivalent on POSIX systems, but worth unifying — confirm.
        print(" ".join([LSF.cmd_wkdir, str(job.working_dir)]), file=batch_file)
        print(" ".join([LSF.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
        print(" ".join([LSF.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
        print("# --- End LSF ---", file=batch_file)
1665
1666 def _create_cmd(self, script_path):
1667 """
1668 """
1669 submission_cmd = self.submission_cmds[:]
1670 submission_cmd.append(script_path.as_posix())
1671 submission_cmd = " ".join(submission_cmd)
1672 return [submission_cmd]
1673
    @classmethod
    def _submit_to_batch(cls, cmd):
        """
        Do the actual batch submission command and collect the output to find out the job id for later monitoring.

        Parameters:
            cmd (list[str]): One-element list containing the full shell command string
                (bsub uses '<' stdin redirection, hence shell=True).

        Returns:
            str: The decoded stdout (stderr merged in) of the bsub command.
        """
        sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
        return sub_out
1681
1683 """
1684 Simple class to help monitor status of jobs submitted by LSF Backend.
1685
1686 You pass in a `Job` object and job id from a bsub command.
1687 When you call the `ready` method it runs bjobs to see whether or not the job has finished.
1688 """
1689
1690
        #: Map of LSF job-state codes (the bjobs 'STAT' field, plus the pseudo-code "FINISHED"
        # used when falling back to the exit-code file) to the generic job status strings.
        backend_code_to_status = {"RUN": "running",
                                  "DONE": "completed",
                                  "FINISHED": "completed",
                                  "EXIT": "failed",
                                  "PEND": "submitted"
                                  }
1697
        def __init__(self, job, job_id):
            """
            Pass in the job object and the job id to allow the result to do monitoring and perform
            post processing of the job.

            Parameters:
                job (Job|SubJob): The (sub)job this result object monitors.
                job_id (str): The batch job id parsed from the bsub output at submission time.
            """
            super().__init__(job)

            #: Job Id given by the batch system at submission, used to find the job in bjobs output
            self.job_id = job_id
1706
        def update_status(self):
            """
            Update the job's (or subjobs') status by calling bjobs.
            """
            # NOTE(review): this debug message formats self.job.name while the PBS equivalent
            # formats self.job — cosmetic inconsistency only, the log text just differs.
            B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
            # Get all jobs info and reuse it for each status update to minimise time spent on this updating.
            bjobs_output = LSF.bjobs(output_fields=["stat", "id"])
            if self.job.subjobs:
                # Each subjob picks its own state out of the shared bjobs output
                for subjob in self.job.subjobs.values():
                    subjob.result._update_result_status(bjobs_output)
            else:
                self._update_result_status(bjobs_output)
1719
1720 def _update_result_status(self, bjobs_output):
1721 """
1722 Parameters:
1723 bjobs_output (dict): The JSON output of a previous call to bjobs which we can reuse to find the
1724 status of this job. Obviously you should only be passing a JSON dict that contains the 'stat' and
1725 'id' information, otherwise it is useless.
1726
1727 """
1728 try:
1729 backend_status = self._get_status_from_output(bjobs_output)
1730 except KeyError:
1731 # If this happens then maybe the job id wasn't in the bjobs_output argument because it finished.
1732 # Instead of failing immediately we try re-running bjobs here explicitly and then fail if it still isn't there.
1733 bjobs_output = LSF.bjobs(output_fields=["stat", "id"], job_id=str(self.job_id))
1734 try:
1735 backend_status = self._get_status_from_output(bjobs_output)
1736 except KeyError:
1737 # If this happened, maybe we're looking at an old finished job. We could fall back to bhist, but it's
1738 # slow and terrible. Instead let's try looking for the exit code file.
1739 try:
1740 exit_code = self.get_exit_code_from_file()
1741 except FileNotFoundError:
1742 waiting_time = datetime.now() - self.exit_code_file_initial_time
1743 if self.time_to_wait_for_exit_code_file > waiting_time:
1744 B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
1745 exit_code = 1
1746 else:
1747 B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
1748 return
1749 if exit_code:
1750 backend_status = "EXIT"
1751 else:
1752 backend_status = "FINISHED"
1753 try:
1754 new_job_status = self.backend_code_to_status[backend_status]
1755 except KeyError as err:
1756 raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
1757
1758 if new_job_status != self.job.status:
1759 self.job.status = new_job_status
1760
1761 def _get_status_from_output(self, output):
1762 """
1763 Get status from output
1764 """
1765 if output["JOBS"] and "ERROR" in output["JOBS"][0]:
1766 if output["JOBS"][0]["ERROR"] == f"Job <{self.job_id}> is not found":
1767 raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1768 else:
1769 raise BackendError(f"Unidentified Error during status check for {self.job}: {output}")
1770 else:
1771 for job_info in output["JOBS"]:
1772 if job_info["JOBID"] == self.job_id:
1773 return job_info["STAT"]
1774 else:
1775 raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1776
1777 @classmethod
1779 parent.result = cls.LSFResult(parent, None)
1780
1781 @classmethod
1782 def _create_job_result(cls, job, batch_output):
1783 """
1784 """
1785 m = re.search(r"Job <(\d+)>", str(batch_output))
1786 if m:
1787 job_id = m.group(1)
1788 else:
1789 raise BackendError(f"Failed to get the batch job ID of {job}. LSF output was:\n{batch_output}")
1790
1791 B2INFO(f"Job ID of {job} recorded as: {job_id}")
1792 job.result = cls.LSFResult(job, job_id)
1793
    def can_submit(self, njobs=1):
        """
        Checks the global number of jobs in LSF right now (submitted or running) for this user.
        Returns True if the number is lower that the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
                and check again before submitting more.

        Returns:
            bool: True if submitting njobs more jobs stays within the global limit.
        """
        B2DEBUG(29, "Calling LSF().can_submit()")
        # Query this user's jobs currently known to LSF
        job_info = self.bjobs(output_fields=["stat"])
        total_jobs = job_info["NJOBS"]
        B2INFO(f"Total jobs active in the LSF system is currently {total_jobs}")
        if (total_jobs + njobs) > self.global_job_limit:
            B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
            return False
        else:
            B2INFO("There is enough space to submit more jobs.")
            return True
1816
1817 @classmethod
1818 def bjobs(cls, output_fields=None, job_id="", username="", queue=""):
1819 """
1820 Simplistic interface to the `bjobs` command. lets you request information about all jobs matching the filters
1821 'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by output of the ``-json`` bjobs option.
1822
1823 Parameters:
1824 output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
1825 job_id (str): String representation of the Job ID given by bsub during submission If this argument is given then
1826 the output of this function will be only information about this job. If this argument is not given, then all jobs
1827 matching the other filters will be returned.
1828 username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
1829 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
1830 receive job information from all known user jobs matching the other filters.
1831 queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.
1832
1833 Returns:
1834 dict: JSON dictionary of the form:
1835
1836 .. code-block:: python
1837
1838 {
1839 "NJOBS":<njobs returned by command>,
1840 "JOBS":[
1841 {
1842 <output field: value>, ...
1843 }, ...
1844 ]
1845 }
1846 """
1847 B2DEBUG(29, f"Calling LSF.bjobs(output_fields={output_fields}, job_id={job_id}, username={username}, queue={queue})")
1848 # We must always return at least one output field when using JSON and -o options. So we choose the job id
1849 if not output_fields:
1850 output_fields = ["id"]
1851 # Output fields should be space separated but in a string.
1852 field_list_cmd = "\""
1853 field_list_cmd += " ".join(output_fields)
1854 field_list_cmd += "\""
1855 cmd_list = ["bjobs", "-o", field_list_cmd]
1856 # If the queue name is set then we add to the command options
1857 if queue:
1858 cmd_list.extend(["-q", queue])
1859 # If the username is set then we add to the command options
1860 if username:
1861 cmd_list.extend(["-u", username])
1862 # Can now add the json option before the final positional argument (if used)
1863 cmd_list.append("-json")
1864 # If the job id is set then we add to the end of the command
1865 if job_id:
1866 cmd_list.append(job_id)
1867 # We get a JSON serialisable summary from bjobs. Requires the shell argument.
1868 cmd = " ".join(cmd_list)
1869 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1870 output = decode_json_string(subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True))
1871 output["NJOBS"] = output["JOBS"]
1872 output["JOBS"] = output["RECORDS"]
1873 del output["RECORDS"]
1874 del output["COMMAND"]
1875 return output
1876
1877 @classmethod
1878 def bqueues(cls, output_fields=None, queues=None):
1879 """
1880 Simplistic interface to the `bqueues` command. lets you request information about all queues matching the filters.
1881 The result is the JSON dictionary returned by output of the ``-json`` bqueues option.
1882
1883 Parameters:
1884 output_fields (list[str]): A list of bqueues -o fields that you would like information about
1885 e.g. the default is ['queue_name' 'status' 'max' 'njobs' 'pend' 'run']
1886 queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
1887 By default you will receive information about all queues.
1888
1889 Returns:
1890 dict: JSON dictionary of the form:
1891
1892 .. code-block:: python
1893
1894 {
1895 "COMMAND":"bqueues",
1896 "QUEUES":46,
1897 "RECORDS":[
1898 {
1899 "QUEUE_NAME":"b2_beast",
1900 "STATUS":"Open:Active",
1901 "MAX":"200",
1902 "NJOBS":"0",
1903 "PEND":"0",
1904 "RUN":"0"
1905 }, ...
1906 }
1907 """
1908 B2DEBUG(29, f"Calling LSF.bqueues(output_fields={output_fields}, queues={queues})")
1909 # We must always return at least one output field when using JSON and -o options. So we choose the job id
1910 if not output_fields:
1911 output_fields = ["queue_name", "status", "max", "njobs", "pend", "run"]
1912 # Output fields should be space separated but in a string.
1913 field_list_cmd = "\""
1914 field_list_cmd += " ".join(output_fields)
1915 field_list_cmd += "\""
1916 cmd_list = ["bqueues", "-o", field_list_cmd]
1917 # Can now add the json option before the final positional argument (if used)
1918 cmd_list.append("-json")
1919 # If the queue name is set then we add to the end of the command
1920 if queues:
1921 cmd_list.extend(queues)
1922 # We get a JSON serialisable summary from bjobs. Requires the shell argument.
1923 cmd = " ".join(cmd_list)
1924 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1925 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1926 return decode_json_string(output)
1927
1928
1930 """
1931 Backend for submitting calibration processes to a HTCondor batch system.
1932 """
1933
    # Name of the HTCondor submit description file written per job
    # (different to the wrapper script of `Backend.submit_script`).
    batch_submit_script = "submit.sub"

    # Submission command; -terse makes condor_submit print only the job id range.
    submission_cmds = ["condor_submit", "-terse"]

    # Default global limit on the number of submitted/running jobs the user can have at once.
    default_global_job_limit = 10000

    # Default backend_args applied to every job unless the job overrides them.
    default_backend_args = {
        "universe": "vanilla",
        "getenv": "false",
        "request_memory": "4 GB",  # We set the default requested memory to 4 GB to maintain parity with KEKCC
        "path_prefix": "",  # Path prefix for file path
        "extra_lines": []  # These should be other HTCondor submit script lines like 'request_cpus = 2'
    }

    # ClassAd attributes requested from condor_q/condor_history by default.
    default_class_ads = ["GlobalJobId", "JobStatus", "Owner"]
1949
1950 def _make_submit_file(self, job, submit_file_path):
1951 """
1952 Fill HTCondor submission file.
1953 """
1954 # Find all files/directories in the working directory to copy on the worker node
1955
1956 files_to_transfer = [i.as_posix() for i in job.working_dir.iterdir()]
1957
1958 job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1959
1960 with open(submit_file_path, "w") as submit_file:
1961 print(f'executable = {self.get_submit_script_path(job)}', file=submit_file)
1962 print(f'log = {Path(job.output_dir, "htcondor.log").as_posix()}', file=submit_file)
1963 print(f'output = {Path(job.working_dir, _STDOUT_FILE).as_posix()}', file=submit_file)
1964 print(f'error = {Path(job.working_dir, _STDERR_FILE).as_posix()}', file=submit_file)
1965 print('transfer_input_files = ', ','.join(files_to_transfer), file=submit_file)
1966 print(f'universe = {job_backend_args["universe"]}', file=submit_file)
1967 print(f'getenv = {job_backend_args["getenv"]}', file=submit_file)
1968 print(f'request_memory = {job_backend_args["request_memory"]}', file=submit_file)
1969 print('should_transfer_files = Yes', file=submit_file)
1970 print('when_to_transfer_output = ON_EXIT', file=submit_file)
1971 # Any other lines in the backend args that we don't deal with explicitly but maybe someone wants to insert something
1972 for line in job_backend_args["extra_lines"]:
1973 print(line, file=submit_file)
1974 print('queue', file=submit_file)
1975
1976 def _add_batch_directives(self, job, batch_file):
1977 """
1978 For HTCondor leave empty as the directives are already included in the submit file.
1979 """
1980 print('#!/bin/bash', file=batch_file)
1981
1982 def _create_cmd(self, script_path):
1983 """
1984 """
1985 submission_cmd = self.submission_cmds[:]
1986 submission_cmd.append(script_path.as_posix())
1987 return submission_cmd
1988
1990 """
1991 Construct the Path object of the .sub file that we will use to describe the job.
1992 """
1993 return Path(job.working_dir, self.batch_submit_script)
1994
1995 @classmethod
1996 def _submit_to_batch(cls, cmd):
1997 """
1998 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1999 """
2000 job_dir = Path(cmd[-1]).parent.as_posix()
2001 sub_out = ""
2002 attempt = 0
2003 sleep_time = 30
2004
2005 while attempt < 3:
2006 try:
2007 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, cwd=job_dir)
2008 break
2009 except subprocess.CalledProcessError as e:
2010 attempt += 1
2011 if attempt == 3:
2012 B2ERROR(f"Error during condor_submit: {str(e)} occurred more than 3 times.")
2013 raise e
2014 else:
2015 B2ERROR(f"Error during condor_submit: {str(e)}, sleeping for {sleep_time} seconds.")
2016 time.sleep(30)
2017 return re.search(r"(\d+\.\d+) - \d+\.\d+", sub_out).groups()[0]
2018
2020 """
2021 Simple class to help monitor status of jobs submitted by HTCondor Backend.
2022
2023 You pass in a `Job` object and job id from a condor_submit command.
2024 When you call the `ready` method it runs condor_q and, if needed, ``condor_history``
2025 to see whether or not the job has finished.
2026 """
2027
2028
        # Map of HTCondor integer JobStatus codes to our Job status strings.
        # Per the HTCondor docs: 0=Unexpanded, 1=Idle, 2=Running, 3=Removed,
        # 4=Completed, 5=Held (treated as still submitted), 6=Submission error.
        backend_code_to_status = {0: "submitted",
                                  1: "submitted",
                                  2: "running",
                                  3: "failed",
                                  4: "completed",
                                  5: "submitted",
                                  6: "failed"
                                  }
2037
2038 def __init__(self, job, job_id):
2039 """
2040 Pass in the job object and the job id to allow the result to do monitoring and perform
2041 post processing of the job.
2042 """
2043 super().__init__(job)
2044
2045 self.job_id = job_id
2046
2047 def update_status(self):
2048 """
2049 Update the job's (or subjobs') status by calling condor_q.
2050 """
2051 B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
2052 # Get all jobs info and reuse it for each status update to minimise tie spent on this updating.
2053 condor_q_output = HTCondor.condor_q()
2054 if self.job.subjobs:
2055 for subjob in self.job.subjobs.values():
2056 subjob.result._update_result_status(condor_q_output)
2057 else:
2058 self._update_result_status(condor_q_output)
2059
2060 def _update_result_status(self, condor_q_output):
2061 """
2062 In order to be slightly more efficient we pass in a previous call to condor_q to see if it can work.
2063 If it is there we update the job's status. If not we are forced to start calling condor_q and, if needed,
2064 ``condor_history``, etc.
2065
2066 Parameters:
2067 condor_q_output (dict): The JSON output of a previous call to `HTCondor.condor_q` which we can reuse to find the
2068 status of this job if it was active when that command ran.
2069 """
2070 B2DEBUG(29, f"Calling {self.job}.result._update_result_status()")
2071 jobs_info = []
2072 for job_record in condor_q_output["JOBS"]:
2073 job_id = job_record["GlobalJobId"].split("#")[1]
2074 if job_id == self.job_id:
2075 B2DEBUG(29, f"Found {self.job_id} in condor_q_output.")
2076 jobs_info.append(job_record)
2077
2078 # Let's look for the exit code file where we expect it
2079 if not jobs_info:
2080 try:
2081 exit_code = self.get_exit_code_from_file()
2082 except FileNotFoundError:
2083 waiting_time = datetime.now() - self.exit_code_file_initial_time
2084 if self.time_to_wait_for_exit_code_file > waiting_time:
2085 B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
2086 exit_code = 1
2087 else:
2088 B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
2089 return
2090 if exit_code:
2091 jobs_info = [{"JobStatus": 6, "HoldReason": None}] # Set to failed
2092 else:
2093 jobs_info = [{"JobStatus": 4, "HoldReason": None}] # Set to completed
2094
2095 # If this job wasn't in the passed in condor_q output, let's try our own with the specific job_id
2096 if not jobs_info:
2097 jobs_info = HTCondor.condor_q(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
2098
2099 # If no job information is returned then the job already left the queue
2100 # check in the history to see if it succeeded or failed
2101 if not jobs_info:
2102 try:
2103 jobs_info = HTCondor.condor_history(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
2104 except KeyError:
2105 hold_reason = "No Reason Known"
2106
2107 # Still no record of it after waiting for the exit code file?
2108 if not jobs_info:
2109 jobs_info = [{"JobStatus": 6, "HoldReason": None}] # Set to failed
2110
2111 job_info = jobs_info[0]
2112 backend_status = job_info["JobStatus"]
2113 # if job is held (backend_status = 5) then report why then keep waiting
2114 if backend_status == 5:
2115 hold_reason = job_info.get("HoldReason", None)
2116 B2WARNING(f"{self.job} on hold because of {hold_reason}. Keep waiting.")
2117 backend_status = 2
2118 try:
2119 new_job_status = self.backend_code_to_status[backend_status]
2120 except KeyError as err:
2121 raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
2122 if new_job_status != self.job.status:
2123 self.job.status = new_job_status
2124
2125 @classmethod
2126 def _create_job_result(cls, job, job_id):
2127 """
2128 """
2129 B2INFO(f"Job ID of {job} recorded as: {job_id}")
2130 job.result = cls.HTCondorResult(job, job_id)
2131
2132 @classmethod
2134 parent.result = cls.HTCondorResult(parent, None)
2135
2136 def can_submit(self, njobs=1):
2137 """
2138 Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
2139 Returns True if the number is lower that the limit, False if it is higher.
2140
2141 Parameters:
2142 njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
2143 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
2144 assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
2145 So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
2146 and check again before submitting more.
2147 """
2148 B2DEBUG(29, "Calling HTCondor().can_submit()")
2149 jobs_info = self.condor_q()
2150 total_jobs = jobs_info["NJOBS"]
2151 B2INFO(f"Total jobs active in the HTCondor system is currently {total_jobs}")
2152 if (total_jobs + njobs) > self.global_job_limit:
2153 B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
2154 return False
2155 else:
2156 B2INFO("There is enough space to submit more jobs.")
2157 return True
2158
2159 @classmethod
2160 def condor_q(cls, class_ads=None, job_id="", username=""):
2161 """
2162 Simplistic interface to the `condor_q` command. lets you request information about all jobs matching the filters
2163 'job_id' and 'username'. Note that setting job_id negates username so it is ignored.
2164 The result is the JSON dictionary returned by output of the ``-json`` condor_q option.
2165
2166 Parameters:
2167 class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
2168 By default we give {cls.default_class_ads}, increasing the amount of class_ads increase the time taken
2169 by the condor_q call.
2170 job_id (str): String representation of the Job ID given by condor_submit during submission.
2171 If this argument is given then the output of this function will be only information about this job.
2172 If this argument is not given, then all jobs matching the other filters will be returned.
2173 username (str): By default we return information about only the current user's jobs. By giving
2174 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2175 receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
2176 so it isn't recommended.
2177
2178 Returns:
2179 dict: JSON dictionary of the form:
2180
2181 .. code-block:: python
2182
2183 {
2184 "NJOBS":<number of records returned by command>,
2185 "JOBS":[
2186 {
2187 <ClassAd: value>, ...
2188 }, ...
2189 ]
2190 }
2191 """
2192 B2DEBUG(29, f"Calling HTCondor.condor_q(class_ads={class_ads}, job_id={job_id}, username={username})")
2193 if not class_ads:
2194 class_ads = cls.default_class_ads
2195 # Output fields should be comma separated.
2196 field_list_cmd = ",".join(class_ads)
2197 cmd_list = ["condor_q", "-json", "-attributes", field_list_cmd]
2198 # If job_id is set then we ignore all other filters
2199 if job_id:
2200 cmd_list.append(job_id)
2201 else:
2202 if not username:
2203 username = os.environ["USER"]
2204 # If the username is set to all it is a special case
2205 if username == "all":
2206 cmd_list.append("-allusers")
2207 else:
2208 cmd_list.append(username)
2209 # We get a JSON serialisable summary from condor_q. But we will alter it slightly to be more similar to other backends
2210 cmd = " ".join(cmd_list)
2211 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2212 # condor_q occasionally fails
2213 try:
2214 records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2215 except BaseException:
2216 records = None
2217
2218 if records:
2219 records = decode_json_string(records)
2220 else:
2221 records = []
2222 jobs_info = {"JOBS": records}
2223 jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2224 return jobs_info
2225
2226 @classmethod
2227 def condor_history(cls, class_ads=None, job_id="", username=""):
2228 """
2229 Simplistic interface to the ``condor_history`` command. lets you request information about all jobs matching the filters
2230 ``job_id`` and ``username``. Note that setting job_id negates username so it is ignored.
2231 The result is a JSON dictionary filled by output of the ``-json`` ``condor_history`` option.
2232
2233 Parameters:
2234 class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
2235 By default we give {cls.default_class_ads}, increasing the amount of class_ads increase the time taken
2236 by the condor_q call.
2237 job_id (str): String representation of the Job ID given by condor_submit during submission.
2238 If this argument is given then the output of this function will be only information about this job.
2239 If this argument is not given, then all jobs matching the other filters will be returned.
2240 username (str): By default we return information about only the current user's jobs. By giving
2241 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2242 receive job information from all known user jobs matching the other filters. This is limited to 10000 records
2243 and isn't recommended.
2244
2245 Returns:
2246 dict: JSON dictionary of the form:
2247
2248 .. code-block:: python
2249
2250 {
2251 "NJOBS":<number of records returned by command>,
2252 "JOBS":[
2253 {
2254 <ClassAd: value>, ...
2255 }, ...
2256 ]
2257 }
2258 """
2259 B2DEBUG(29, f"Calling HTCondor.condor_history(class_ads={class_ads}, job_id={job_id}, username={username})")
2260 if not class_ads:
2261 class_ads = cls.default_class_ads
2262 # Output fields should be comma separated.
2263 field_list_cmd = ",".join(class_ads)
2264 cmd_list = ["condor_history", "-json", "-attributes", field_list_cmd]
2265 # If job_id is set then we ignore all other filters
2266 if job_id:
2267 cmd_list.append(job_id)
2268 else:
2269 if not username:
2270 username = os.environ["USER"]
2271 # If the username is set to all it is a special case
2272 if username != "all":
2273 cmd_list.append(username)
2274 # We get a JSON serialisable summary from condor_q. But we will alter it slightly to be more similar to other backends
2275 cmd = " ".join(cmd_list)
2276 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2277 try:
2278 records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2279 except BaseException:
2280 records = None
2281
2282 if records:
2283 records = decode_json_string(records)
2284 else:
2285 records = []
2286
2287 jobs_info = {"JOBS": records}
2288 jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2289 return jobs_info
2290
2291
2293 """
2294 Backend for submitting calibration processes to the grid.
2295 """
2296
2297
class BackendError(Exception):
    """
    Exception raised for failures that occur inside `Backend` implementations.
    """
2302
2303
class JobError(Exception):
    """
    Exception raised for failures related to `Job` objects.
    """
2308
2309
class SplitterError(Exception):
    """
    Exception raised for failures related to SubjobSplitter objects.
    """
generator_function
Generator function that has not been 'primed'.
Definition backends.py:105
args
Positional argument tuple used to 'prime' the ArgumentsGenerator.generator_function.
Definition backends.py:107
kwargs
Keyword argument dictionary used to 'prime' the ArgumentsGenerator.generator_function.
Definition backends.py:109
__init__(self, generator_function, *args, **kwargs)
Definition backends.py:95
max_subjobs
If we try to create more than this many subjobs we throw an exception, if None then there is no maxim...
Definition backends.py:309
__init__(self, *, arguments_generator=None, max_subjobs=None)
Definition backends.py:304
_create_parent_job_result(cls, parent)
Definition backends.py:852
dict backend_args
The backend args that will be applied to jobs unless the job specifies them itself.
Definition backends.py:797
_add_wrapper_script_teardown(self, job, batch_file)
Definition backends.py:839
_add_wrapper_script_setup(self, job, batch_file)
Definition backends.py:814
dict default_backend_args
Default backend_args.
Definition backends.py:789
submit(self, job)
Definition backends.py:800
str submit_script
Default submission script name.
Definition backends.py:785
get_submit_script_path(self, job)
Definition backends.py:860
__init__(self, *, backend_args=None)
Definition backends.py:791
_add_setup(job, batch_file)
Definition backends.py:807
int default_global_job_limit
Default global limit on the total number of submitted/running jobs that the user can have.
Definition backends.py:1156
_submit_to_batch(cls, cmd)
Definition backends.py:1189
get_batch_submit_script_path(self, job)
Definition backends.py:1336
int sleep_between_submission_checks
Seconds we wait before checking if we can submit a list of jobs.
Definition backends.py:1170
list submission_cmds
Shell command to submit a script, should be implemented in the derived class.
Definition backends.py:1143
_make_submit_file(self, job, submit_file_path)
Definition backends.py:1180
int global_job_limit
The active job limit.
Definition backends.py:1167
submit(self, job, check_can_submit=True, jobs_per_check=100)
Definition backends.py:1205
can_submit(self, *args, **kwargs)
Definition backends.py:1194
int default_sleep_between_submission_checks
Default time betweeon re-checking if the active jobs is below the global job limit.
Definition backends.py:1158
_add_batch_directives(self, job, file)
Definition backends.py:1172
__init__(self, *, backend_args=None)
Definition backends.py:1160
_create_cmd(self, job)
Definition backends.py:1352
_create_job_result(cls, job, batch_output)
Definition backends.py:1347
_update_result_status(self, condor_q_output)
Definition backends.py:2060
job_id
job id given by HTCondor
Definition backends.py:2045
__init__(self, job, job_id)
Definition backends.py:2038
dict backend_code_to_status
HTCondor statuses mapped to Job statuses.
Definition backends.py:2029
_create_parent_job_result(cls, parent)
Definition backends.py:2133
_submit_to_batch(cls, cmd)
Definition backends.py:1996
get_batch_submit_script_path(self, job)
Definition backends.py:1989
_create_cmd(self, script_path)
Definition backends.py:1982
_add_batch_directives(self, job, batch_file)
Definition backends.py:1976
can_submit(self, njobs=1)
Definition backends.py:2136
_make_submit_file(self, job, submit_file_path)
Definition backends.py:1950
_create_job_result(cls, job, job_id)
Definition backends.py:2126
condor_q(cls, class_ads=None, job_id="", username="")
Definition backends.py:2160
condor_history(cls, class_ads=None, job_id="", username="")
Definition backends.py:2227
list default_class_ads
Default ClassAd attributes to return from commands like condor_q.
Definition backends.py:1948
str batch_submit_script
HTCondor batch script (different to the wrapper script of Backend.submit_script)
Definition backends.py:1934
status
Not a real attribute, it's a property.
Definition backends.py:460
_get_overall_status_from_subjobs(self)
Definition backends.py:463
check_input_data_files(self)
Definition backends.py:598
list input_sandbox_files
Files to be copied directly into the working directory (pathlib.Path).
Definition backends.py:366
list input_files
Input files to job (str), a list of these is copied to the working directory.
Definition backends.py:378
dump_input_data(self)
Definition backends.py:579
dict backend_args
Config dictionary for the backend to use when submitting the job.
Definition backends.py:383
full_command(self)
Definition backends.py:628
__init__(self, name, job_dict=None)
Definition backends.py:357
working_dir
Working directory of the job (pathlib.Path).
Definition backends.py:368
dict statuses
Allowed Job status dictionary.
Definition backends.py:352
update_status(self)
Definition backends.py:424
append_current_basf2_setup_cmds(self)
Definition backends.py:640
__repr__(self)
Definition backends.py:406
dump_to_json(self, file_path)
Definition backends.py:538
list args
The arguments that will be applied to the cmd (These are ignored by SubJobs as they have their own ar...
Definition backends.py:376
list setup_cmds
Bash commands to run before the main self.cmd (mainly used for batch system setup)
Definition backends.py:380
str _status
The actual status of the overall Job.
Definition backends.py:404
list cmd
Command and arguments as a list that will be run by the job on the backend.
Definition backends.py:374
result
The result object of this Job.
Definition backends.py:402
list output_patterns
Files that we produce during the job and want to be returned.
Definition backends.py:372
create_subjob(self, i, input_files=None, args=None)
Definition backends.py:435
dict subjobs
dict of subjobs assigned to this job
Definition backends.py:385
splitter
The SubjobSplitter used to create subjobs if necessary.
Definition backends.py:363
name
Job object's name.
Definition backends.py:361
copy_input_sandbox_files_to_working_dir(self)
Definition backends.py:587
ready(self)
Definition backends.py:412
from_json(cls, file_path)
Definition backends.py:551
output_dir
Output directory (pathlib.Path), where we will download our output_files to.
Definition backends.py:370
job_dict(self)
Definition backends.py:559
job_id
job id given by LSF
Definition backends.py:1705
_get_status_from_output(self, output)
Definition backends.py:1761
_update_result_status(self, bjobs_output)
Definition backends.py:1720
__init__(self, job, job_id)
Definition backends.py:1698
dict backend_code_to_status
LSF statuses mapped to Job statuses.
Definition backends.py:1691
_create_parent_job_result(cls, parent)
Definition backends.py:1778
_submit_to_batch(cls, cmd)
Definition backends.py:1675
_create_cmd(self, script_path)
Definition backends.py:1666
_add_batch_directives(self, job, batch_file)
Definition backends.py:1651
can_submit(self, njobs=1)
Definition backends.py:1794
bqueues(cls, output_fields=None, queues=None)
Definition backends.py:1878
__init__(self, *, backend_args=None)
Definition backends.py:1646
_create_job_result(cls, job, batch_output)
Definition backends.py:1782
bjobs(cls, output_fields=None, job_id="", username="", queue="")
Definition backends.py:1818
__init__(self, job, result)
Definition backends.py:954
result
The underlying result from the backend.
Definition backends.py:961
_create_parent_job_result(cls, parent)
Definition backends.py:1133
pool
The actual Pool object of this instance of the Backend.
Definition backends.py:945
_max_processes
Internal attribute of max_processes.
Definition backends.py:1009
submit(self, job)
Definition backends.py:1017
max_processes
The size of the multiprocessing process pool.
Definition backends.py:947
_(self, job)
Definition backends.py:1024
__init__(self, *, backend_args=None, max_processes=1)
Definition backends.py:940
run_job(name, working_dir, output_dir, script)
Definition backends.py:1107
__init__(self, *, arguments_generator=None, max_files_per_subjob=1)
Definition backends.py:216
max_files_per_subjob
The maximum number of input files that will be used for each SubJob created.
Definition backends.py:223
__init__(self, *, arguments_generator=None, max_subjobs=1000)
Definition backends.py:248
max_subjobs
The maximum number of SubJob objects to be created, input files are split evenly between them.
Definition backends.py:255
job_id
job id given by PBS
Definition backends.py:1452
_get_status_from_output(self, output)
Definition backends.py:1504
__init__(self, job, job_id)
Definition backends.py:1445
_update_result_status(self, qstat_output)
Definition backends.py:1467
dict backend_code_to_status
PBS statuses mapped to Job statuses.
Definition backends.py:1434
create_job_record_from_element(job_elem)
Definition backends.py:1606
_create_parent_job_result(cls, parent)
Definition backends.py:1422
_submit_to_batch(cls, cmd)
Definition backends.py:1414
_create_cmd(self, script_path)
Definition backends.py:1406
_add_batch_directives(self, job, batch_file)
Definition backends.py:1383
can_submit(self, njobs=1)
Definition backends.py:1514
qstat(cls, username="", job_ids=None)
Definition backends.py:1538
__init__(self, *, backend_args=None)
Definition backends.py:1378
_create_job_result(cls, job, batch_output)
Definition backends.py:1399
update_status(self)
Definition backends.py:902
job
Job object for result.
Definition backends.py:879
exit_code_file_initial_time
Time we started waiting for the exit code file to appear.
Definition backends.py:886
bool _is_ready
Quicker way to know if it's ready once it has already been found.
Definition backends.py:881
time_to_wait_for_exit_code_file
After our first attempt to view the exit code file once the job is 'finished', how long should we wai...
Definition backends.py:884
__init__(self, job)
Definition backends.py:874
get_exit_code_from_file(self)
Definition backends.py:909
__init__(self, job, subjob_id, input_files=None)
Definition backends.py:685
parent
Job() instance of parent to this SubJob.
Definition backends.py:691
__getattr__(self, attribute)
Definition backends.py:760
id
Id of Subjob.
Definition backends.py:689
assign_arguments(self, job)
Definition backends.py:179
__init__(self, *, arguments_generator=None)
Definition backends.py:166
arguments_generator
The ArgumentsGenerator used when creating subjobs.
Definition backends.py:171
create_subjobs(self, job)
Definition backends.py:174
STL class.