Belle II Software development
backends.py
#!/usr/bin/env python3

# disable doxygen check for this file
# @cond


from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
import re
import os
from abc import ABC, abstractmethod
import json
import xml.etree.ElementTree as ET
from math import ceil
from pathlib import Path
from collections import deque
from itertools import count, takewhile
import shutil
import time
from datetime import datetime, timedelta
import subprocess
import multiprocessing as mp

from caf.utils import method_dispatch
from caf.utils import decode_json_string
from caf.utils import grouper
from caf.utils import parse_file_uri


__all__ = ["Job", "SubJob", "Backend", "Local", "Batch", "LSF", "PBS", "HTCondor", "get_input_data"]


#: Default name of the JSON file listing the input data file paths for a job
_input_data_file_path = Path("__BACKEND_INPUT_FILES__.json")

#: File name used to capture a job's stdout
_STDOUT_FILE = "stdout"

#: File name used to capture a job's stderr
_STDERR_FILE = "stderr"

#: Environment variables that the backends export into a job's setup commands
_backend_job_envvars = (
    "TMPDIR",
    "BELLE2_CONFIG_DIR",
    "VO_BELLE2_SW_DIR",
    "BELLE2_EXTERNALS_TOPDIR",
    "BELLE2_CONDB_METADATA",
    "BELLE2_CONDB_PROXY",
)

55def get_input_data():
56 """
57 Simple JSON load of the default input data file. Will contain a list of string file paths
58 for use by the job process.
59 """
60 with open(_input_data_file_path) as input_data_file:
61 input_data = json.load(input_data_file)
62 return input_data
63
64
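# Illustrative sketch (assuming this module is importable as ``caf.backends`` inside the
# job's environment) of how a job's command script might read back the input data file
# written by Job.dump_input_data(); the processing loop itself is hypothetical:
#
#     from caf.backends import get_input_data
#
#     input_files = get_input_data()
#     for file_path in input_files:
#         print("Would process", file_path)
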
65def monitor_jobs(args, jobs):
66 unfinished_jobs = jobs[:]
67 failed_jobs = 0
68 while unfinished_jobs:
69 B2INFO("Updating statuses of unfinished jobs...")
70 for j in unfinished_jobs:
71 j.update_status()
72 B2INFO("Checking if jobs are ready...")
73 for j in unfinished_jobs[:]:
74 if j.ready():
75 if j.status == "failed":
76 B2ERROR(f"{j} is failed")
77 failed_jobs += 1
78 else:
79 B2INFO(f"{j} is finished")
80 unfinished_jobs.remove(j)
81 if unfinished_jobs:
82 B2INFO(f"Not all jobs done yet, waiting {args.heartbeat} seconds before re-checking...")
83 time.sleep(args.heartbeat)
84 if failed_jobs > 0:
85 B2ERROR(f"{failed_jobs} jobs failed")
86 else:
87 B2INFO('All jobs finished successfully')
88
89
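# Illustrative sketch of driving monitor_jobs() from a small command-line tool. The
# ``args`` object only needs a ``heartbeat`` attribute (seconds between checks); the
# argparse wiring here is a hypothetical example, not part of any existing CLI:
#
#     import argparse
#     parser = argparse.ArgumentParser()
#     parser.add_argument("--heartbeat", type=int, default=10)
#     args = parser.parse_args()
#     monitor_jobs(args, jobs)   # ``jobs`` is a list of already submitted Job objects
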
90class ArgumentsGenerator():
91 def __init__(self, generator_function, *args, **kwargs):
92 """
93 Simple little class to hold a generator (uninitialised) and the necessary args/kwargs to
94 initialise it. This lets us re-use a generator by setting it up again fresh. This is not
95 optimal for expensive calculations, but it is nice for making large sequences of
96 Job input arguments on the fly.
97
98 Parameters:
            generator_function (py:function): A function (callable) that contains a ``yield`` statement. This generator
                should *not* be initialised, i.e. you haven't called it with ``generator_function(*args, **kwargs)`` yet.
                That will happen when accessing the `ArgumentsGenerator.generator` property.
            args (tuple): The positional arguments you want to send into the initialisation of the generator.
            kwargs (dict): The keyword arguments you want to send into the initialisation of the generator.
103 """
104
105 self.generator_function = generator_function
106
107 self.args = args
108
109 self.kwargs = kwargs
110
111 @property
112 def generator(self):
113 """
114 Returns:
115 generator: The initialised generator (using the args and kwargs for initialisation). It should be ready
116 to have ``next``/``send`` called on it.
117 """
118 gen = self.generator_function(*self.args, **self.kwargs)
119 gen.send(None) # Prime it
120 return gen
121
122
123def range_arguments(start=0, stop=None, step=1):
124 """
    A simple example arguments generator function for use as an `ArgumentsGenerator.generator_function`.
    It will yield increasing values using `itertools.count`. By default it is infinite and will never raise
    `StopIteration`. The `SubJob` object is sent into this function with ``send`` but is not used.

    Parameters:
        start (int): The starting value that will be returned.
        stop (int): At this value `StopIteration` is raised. If this is `None` then this generator will continue
            forever.
        step (int): The step size.

    Returns:
        tuple
137 """
138 def should_stop(x):
139 if stop is not None and x >= stop:
140 return True
141 else:
142 return False
143 # The logic of send/yield is tricky. Basically the first yield must have been hit before send(subjob) can work.
144 # However the first yield is also the one that receives the send(subjob) NOT the send(None).
145 subjob = (yield None) # Handle the send(None) and first send(subjob)
146 args = None
147 for i in takewhile(lambda x: not should_stop(x), count(start, step)):
148 args = (i,) # We could have used the subjob object here
149 B2DEBUG(29, f"{subjob} arguments will be {args}")
150 subjob = (yield args) # Set up ready for the next loop (if it happens)
151
152
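# Illustrative sketch of pairing range_arguments with an ArgumentsGenerator. The generator
# function is only called (and primed) when the ``generator`` property is accessed, so the
# same ArgumentsGenerator can be re-used for several jobs; the strings sent in below are
# hypothetical stand-ins for SubJob objects:
#
#     args_gen = ArgumentsGenerator(range_arguments, 0, stop=6, step=2)
#     gen = args_gen.generator      # fresh generator, already primed with send(None)
#     gen.send("subjob_0")          # -> (0,)
#     gen.send("subjob_1")          # -> (2,)
#     gen.send("subjob_2")          # -> (4,)
#     gen.send("subjob_3")          # raises StopIteration because stop=6 is reached
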
153class SubjobSplitter(ABC):
154 """
155 Abstract base class. This class handles the logic of creating subjobs for a `Job` object.
156 The `create_subjobs` function should be implemented and used to construct
157 the subjobs of the parent job object.
158
159 Parameters:
160 arguments_generator (ArgumentsGenerator): Used to construct the generator function that will yield the argument
161 tuple for each `SubJob`. The splitter will iterate through the generator each time `create_subjobs` is
162 called. The `SubJob` will be sent into the generator with ``send(subjob)`` so that the generator can decide what
163 arguments to return.
164 """
165
166 def __init__(self, *, arguments_generator=None):
167 """
168 Derived classes should call `super` to run this.
169 """
170
171 self.arguments_generator = arguments_generator
172
173 @abstractmethod
174 def create_subjobs(self, job):
175 """
176 Implement this method in derived classes to generate the `SubJob` objects.
177 """
178
179 def assign_arguments(self, job):
180 """
181 Use the `arguments_generator` (if one exists) to assign the argument tuples to the
182 subjobs.
183 """
184 if self.arguments_generator:
185 arg_gen = self.arguments_generator.generator
186 generating = True
187 for subjob in sorted(job.subjobs.values(), key=lambda sj: sj.id):
188 if generating:
189 try:
190 args = arg_gen.send(subjob)
191 except StopIteration:
192 B2ERROR(f"StopIteration called when getting args for {subjob}, "
193 "setting all subsequent subjobs to have empty argument tuples.")
194 args = tuple() # If our generator finishes before the subjobs we use empty argument tuples
195 generating = False
196 else:
197 args = tuple()
198 B2DEBUG(29, f"Arguments for {subjob}: {args}")
199 subjob.args = args
200 return
201
202 B2INFO(f"No ArgumentsGenerator assigned to the {self} so subjobs of {job} "
203 "won't automatically have arguments assigned.")
204
205 def __repr__(self):
206 return f"{self.__class__.__name__}"
207
208
209class MaxFilesSplitter(SubjobSplitter):
210
211 def __init__(self, *, arguments_generator=None, max_files_per_subjob=1):
212 """
213 Parameters:
            max_files_per_subjob (int): The maximum number of input files used per `SubJob` created.
215 """
216 super().__init__(arguments_generator=arguments_generator)
217
218 self.max_files_per_subjob = max_files_per_subjob
219
220 def create_subjobs(self, job):
221 """
222 This function creates subjobs for the parent job passed in. It creates as many subjobs as required
223 in order to prevent the number of input files per subjob going over the limit set by
224 `MaxFilesSplitter.max_files_per_subjob`.
225 """
226 if not job.input_files:
227 B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
228 return
229
230 for i, subjob_input_files in enumerate(grouper(self.max_files_per_subjob, job.input_files)):
231 job.create_subjob(i, input_files=subjob_input_files)
232
233 self.assign_arguments(job)
234
235 B2INFO(f"{self} created {i+1} Subjobs for {job}")
236
237
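# Illustrative sketch of attaching a MaxFilesSplitter to a job. Assigning to
# Job.max_files_per_subjob (a property defined further below) swaps in this splitter
# automatically; the file names used here are hypothetical:
#
#     job = Job("example_job")
#     job.input_files = ["/data/a.root", "/data/b.root", "/data/c.root"]
#     job.max_files_per_subjob = 2   # same as job.splitter = MaxFilesSplitter(max_files_per_subjob=2)
#     # On submission the backend calls job.splitter.create_subjobs(job), producing
#     # SubJob 0 with two input files and SubJob 1 with one.
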
238class MaxSubjobsSplitter(SubjobSplitter):
239
240 def __init__(self, *, arguments_generator=None, max_subjobs=1000):
241 """
242 Parameters:
            max_subjobs (int): The maximum number of subjobs that will be created.
244 """
245 super().__init__(arguments_generator=arguments_generator)
246
247 self.max_subjobs = max_subjobs
248
249 def create_subjobs(self, job):
250 """
251 This function creates subjobs for the parent job passed in. It creates as many subjobs as required
252 by the number of input files up to the maximum set by `MaxSubjobsSplitter.max_subjobs`. If there are
253 more input files than `max_subjobs` it instead groups files by the minimum number per subjob in order to
254 respect the subjob limit e.g. If you have 11 input files and a maximum number of subjobs of 4, then it
255 will create 4 subjobs, 3 of them with 3 input files, and one with 2 input files.
256 """
257 if not job.input_files:
258 B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
259 return
260
261 # Since we will be popping from the left of the input file list, we construct a deque
262 remaining_input_files = deque(job.input_files)
263 # The initial number of available subjobs, will go down as we create them
264 available_subjobs = self.max_subjobs
265 subjob_i = 0
266 while remaining_input_files:
267 # How many files should we use for this subjob?
268 num_input_files = ceil(len(remaining_input_files) / available_subjobs)
269 # Pop them from the remaining files
270 subjob_input_files = []
271 for i in range(num_input_files):
272 subjob_input_files.append(remaining_input_files.popleft())
273 # Create the actual subjob
274 job.create_subjob(subjob_i, input_files=subjob_input_files)
275 subjob_i += 1
276 available_subjobs -= 1
277
278 self.assign_arguments(job)
279 B2INFO(f"{self} created {subjob_i} Subjobs for {job}")
280
281
282class ArgumentsSplitter(SubjobSplitter):
283 """
    Creates SubJobs based on the given argument generator. The generator will be called until it raises `StopIteration`.
    Be VERY careful not to accidentally pass in an infinite generator, otherwise it will simply create SubJobs until you
    run out of memory. You can set the `ArgumentsSplitter.max_subjobs` parameter to guard against this: an exception is
    raised if the limit would be exceeded.

    This splitter is useful for MC production jobs where you don't have any input files, but you want to control the
    exp/run numbers of subjobs. If you do have input files set for the parent `Job` objects, then the same input files
    will be assigned to every `SubJob`.
290
291 Parameters:
292 arguments_generator (ArgumentsGenerator): The standard ArgumentsGenerator that is used to assign arguments
293 """
294
295 def __init__(self, *, arguments_generator=None, max_subjobs=None):
296 """
297 """
298 super().__init__(arguments_generator=arguments_generator)
299
300 self.max_subjobs = max_subjobs
301
302 def create_subjobs(self, job):
303 """
304 This function creates subjobs for the parent job passed in. It creates subjobs until the
305 `SubjobSplitter.arguments_generator` finishes.
306
307 If `ArgumentsSplitter.max_subjobs` is set, then it will throw an exception if more than this number of
308 subjobs are created.
309 """
310 arg_gen = self.arguments_generator.generator
311 for i in count():
            # Reached the maximum? (max_subjobs may be None, meaning no limit)
            if self.max_subjobs is not None and i >= self.max_subjobs:
                raise SplitterError(f"{self} tried to create more subjobs than the maximum (={self.max_subjobs}).")
315 try:
316 subjob = SubJob(job, i, job.input_files) # We manually create it because we might not need it
317 args = arg_gen.send(subjob) # Might throw StopIteration
318 B2INFO(f"Creating {job}.{subjob}")
319 B2DEBUG(29, f"Arguments for {subjob}: {args}")
320 subjob.args = args
321 job.subjobs[i] = subjob
322 except StopIteration:
323 break
324 B2INFO(f"{self} created {i+1} Subjobs for {job}")
325
326
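# Illustrative sketch of an ArgumentsSplitter driven by range_arguments. This is the typical
# pattern for MC-style jobs with no input files, where each SubJob only needs a different
# argument tuple; the command and numbers below are hypothetical:
#
#     args_gen = ArgumentsGenerator(range_arguments, 1, stop=5)
#     job = Job("mc_job")
#     job.cmd = ["basf2", "steering.py"]
#     job.splitter = ArgumentsSplitter(arguments_generator=args_gen, max_subjobs=100)
#     # create_subjobs(job) will then make SubJobs 0..3 with args (1,), (2,), (3,), (4,).
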
327class Job:
328 """
329 This generic Job object is used to tell a Backend what to do.
330 This object basically holds necessary information about a process you want to submit to a `Backend`.
331 It should *not* do anything that is backend specific, just hold the configuration for a job to be
332 successfully submitted and monitored using a backend. The result attribute is where backend
333 specific job monitoring goes.
334
335 Parameters:
336 name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF
337
338 .. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.
339 """
340
341
343 statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}
344
345
346 exit_statuses = ["failed", "completed"]
347
348 def __init__(self, name, job_dict=None):
349 """
350 """
351
352 self.name = name
353
354 self.splitter = None
355
356 if not job_dict:
357
359 self.input_sandbox_files = []
360
361 self.working_dir = Path()
362
363 self.output_dir = Path()
364
365 self.output_patterns = []
366
367 self.cmd = []
368
369 self.args = []
370
371 self.input_files = []
372
373 self.setup_cmds = []
374
376 self.backend_args = {}
377
378 self.subjobs = {}
379 elif job_dict:
380 self.input_sandbox_files = [Path(p) for p in job_dict["input_sandbox_files"]]
381 self.working_dir = Path(job_dict["working_dir"])
382 self.output_dir = Path(job_dict["output_dir"])
383 self.output_patterns = job_dict["output_patterns"]
384 self.cmd = job_dict["cmd"]
385 self.args = job_dict["args"]
386 self.input_files = job_dict["input_files"]
387 self.setup_cmds = job_dict["setup_cmds"]
388 self.backend_args = job_dict["backend_args"]
389 self.subjobs = {}
390 for subjob_dict in job_dict["subjobs"]:
391 self.create_subjob(subjob_dict["id"], input_files=subjob_dict["input_files"], args=subjob_dict["args"])
392
393
395 self.result = None
396
397 self._status = "init"
398
399 def __repr__(self):
400 """
401 Representation of Job class (what happens when you print a Job() instance).
402 """
403 return f"Job({self.name})"
404
405 def ready(self):
406 """
407 Returns whether or not the Job has finished. If the job has subjobs then it will return true when they are all finished.
408 It will return False as soon as it hits the first failure. Meaning that you cannot guarantee that all subjobs will have
409 their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.
410 """
411 if not self.result:
412 B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")
413 return False
414 else:
415 return self.result.ready()
416
417 def update_status(self):
418 """
419 Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any) in the best way for the type of result object/backend.
420 """
421 if not self.result:
422 B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
423 else:
424 self.result.update_status()
425 return self.status
426
427 def create_subjob(self, i, input_files=None, args=None):
428 """
429 Creates a subjob Job object that references that parent Job.
430 Returns the SubJob object at the end.
431 """
432 if i not in self.subjobs:
433 B2INFO(f"Creating {self}.Subjob({i})")
434 subjob = SubJob(self, i, input_files)
435 if args:
436 subjob.args = args
437 self.subjobs[i] = subjob
438 return subjob
439 else:
440 B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")
441
442 @property
443 def status(self):
444 """
445 Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
446 subjob status in the hierarchy of statuses in `Job.statuses`.
447 """
448 if self.subjobs:
449 job_status = self._get_overall_status_from_subjobs()
450 if job_status != self._status:
451
452 self.status = job_status
453 return self._status
454
455 def _get_overall_status_from_subjobs(self):
456 subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
457 status_level = min([self.statuses[status] for status in subjob_statuses])
458 for status, level in self.statuses.items():
459 if level == status_level:
460 return status
461
462 @status.setter
463 def status(self, status):
464 """
465 Sets the status of this Job.
466 """
467 # Print an error only if the job failed.
468 if status == 'failed':
469 B2ERROR(f"Setting {self.name} status to failed")
470 else:
471 B2INFO(f"Setting {self.name} status to {status}")
472 self._status = status
473
474 @property
475 def output_dir(self):
476 return self._output_dir
477
478 @output_dir.setter
479 def output_dir(self, value):
480 self._output_dir = Path(value).absolute()
481
482 @property
483 def working_dir(self):
484 return self._working_dir
485
486 @working_dir.setter
487 def working_dir(self, value):
488 self._working_dir = Path(value).absolute()
489
490 @property
491 def input_sandbox_files(self):
492 return self._input_sandbox_files
493
494 @input_sandbox_files.setter
495 def input_sandbox_files(self, value):
496 self._input_sandbox_files = [Path(p).absolute() for p in value]
497
498 @property
499 def input_files(self):
500 return self._input_files
501
502 @input_files.setter
503 def input_files(self, value):
504 self._input_files = value
505
506 @property
507 def max_subjobs(self):
508 return self.splitter.max_subjobs
509
510 @max_subjobs.setter
511 def max_subjobs(self, value):
512 self.splitter = MaxSubjobsSplitter(max_subjobs=value)
513 B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
514
515 @property
516 def max_files_per_subjob(self):
517 return self.splitter.max_files_per_subjob
518
519 @max_files_per_subjob.setter
520 def max_files_per_subjob(self, value):
521 self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
522 B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
523
524 def dump_to_json(self, file_path):
        """
        Dumps the Job object configuration to a JSON file so that it can be read in again later.

        Parameters:
            file_path (`basf2.Path`): The file path we'll dump to.
        """
530 with open(file_path, mode="w") as job_file:
531 json.dump(self.job_dict, job_file, indent=2)
532
533 @classmethod
534 def from_json(cls, file_path):
535 with open(file_path) as job_file:
536 job_dict = json.load(job_file)
537 return cls(job_dict["name"], job_dict=job_dict)
538
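    # Illustrative sketch of the JSON round trip provided by dump_to_json() and from_json()
    # (the file name is hypothetical):
    #
    #     job.dump_to_json("my_job.json")
    #     restored = Job.from_json("my_job.json")   # same configuration, fresh "init" status
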
539 @property
540 def job_dict(self):
541 """
542 Returns:
543 dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
544 `Path <basf2.Path>` objects are converted to string via ``Path.as_posix()``.
545 """
546 job_dict = {}
547 job_dict["name"] = self.name
548 job_dict["input_sandbox_files"] = [i.as_posix() for i in self.input_sandbox_files]
549 job_dict["working_dir"] = self.working_dir.as_posix()
550 job_dict["output_dir"] = self.output_dir.as_posix()
551 job_dict["output_patterns"] = self.output_patterns
552 job_dict["cmd"] = self.cmd
553 job_dict["args"] = self.args
554 job_dict["input_files"] = self.input_files
555 job_dict["setup_cmds"] = self.setup_cmds
556 job_dict["backend_args"] = self.backend_args
557 job_dict["subjobs"] = [sj.job_dict for sj in self.subjobs.values()]
558 return job_dict
559
560 def dump_input_data(self):
561 """
562 Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
563 string URI objects.
564 """
565 with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
566 json.dump(self.input_files, input_data_file, indent=2)
567
568 def copy_input_sandbox_files_to_working_dir(self):
569 """
570 Get all of the requested files for the input sandbox and copy them to the working directory.
571 Files like the submit.sh or input_data.json are not part of this process.
572 """
573 for file_path in self.input_sandbox_files:
574 if file_path.is_dir():
575 shutil.copytree(file_path, Path(self.working_dir, file_path.name))
576 else:
577 shutil.copy(file_path, self.working_dir)
578
579 def check_input_data_files(self):
580 """
581 Check the input files and make sure that there aren't any duplicates.
582 Also check if the files actually exist if possible.
583 """
584 existing_input_files = [] # We use a list instead of set to avoid losing any ordering of files
585 for file_path in self.input_files:
586 file_uri = parse_file_uri(file_path)
587 if file_uri.scheme == "file":
588 p = Path(file_uri.path)
589 if p.is_file():
590 if file_uri.geturl() not in existing_input_files:
591 existing_input_files.append(file_uri.geturl())
592 else:
593 B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
594 else:
595 B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
596 else:
597 B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
598 if file_path not in existing_input_files:
599 existing_input_files.append(file_path)
600 else:
601 B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
602 if self.input_files and not existing_input_files:
603 B2WARNING(f"No valid input file paths found for {self.name}, but some were requested.")
604
605 # Replace the Job's input files with the ones that exist + duplicates removed
606 self.input_files = existing_input_files
607
608 @property
609 def full_command(self):
610 """
611 Returns:
612 str: The full command that this job will run including any arguments.
613 """
614 all_components = self.cmd[:]
615 all_components.extend(self.args)
616 # We do a convert to string just in case arguments were generated as different types
617 full_command = " ".join(map(str, all_components))
618 B2DEBUG(29, f"Full command of {self} is '{full_command}'")
619 return full_command
620
621 def append_current_basf2_setup_cmds(self):
622 """
623 This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
624 It should detect if you are using a local release or CVMFS and append the correct commands
625 so that the job will have the same basf2 release environment. It should also detect
626 if a local release is not compiled with the ``opt`` option.
627
628 Note that this *doesn't mean that every environment variable is inherited* from the submitting
629 process environment.
630 """
631 def append_environment_variable(cmds, envvar):
632 """
633 Append a command for setting an environment variable.
634 """
635 if envvar in os.environ:
636 cmds.append(f"""if [ -z "${{{envvar}}}" ]; then""")
637 cmds.append(f" export {envvar}={os.environ[envvar]}")
638 cmds.append("fi")
639
640 if "BELLE2_TOOLS" not in os.environ:
641 raise BackendError("No BELLE2_TOOLS found in environment")
642 # Export all the environment variables defined via _backend_job_envvars
643 for envvar in _backend_job_envvars:
644 append_environment_variable(self.setup_cmds, envvar)
645 if "BELLE2_RELEASE" in os.environ:
646 self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
647 elif 'BELLE2_LOCAL_DIR' in os.environ:
648 self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
649 self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
650 self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
651 self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
652 self.setup_cmds.append("pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
653 self.setup_cmds.append("source $BACKEND_B2SETUP")
654 # b2code-option has to be executed only after the source of the tools.
655 self.setup_cmds.append("b2code-option $BACKEND_BELLE2_OPTION")
656 self.setup_cmds.append("popd > /dev/null")
657
658
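# Illustrative sketch of a fully configured Job ready for submission. It only uses the
# attributes defined in the class above; the paths and command are hypothetical, and
# append_current_basf2_setup_cmds() requires BELLE2_TOOLS etc. to be set in the
# submitting environment:
#
#     job = Job("example_calibration")
#     job.output_dir = "/path/to/output"
#     job.working_dir = "/path/to/work"
#     job.cmd = ["basf2", "run_collector.py"]
#     job.input_files = ["/data/input_0.root"]
#     job.append_current_basf2_setup_cmds()   # inherit the submitting shell's basf2 setup
#     job.backend_args = {"queue": "l"}       # only used by Batch-type backends
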
659class SubJob(Job):
660 """
    This mini-class simply holds basic information about which subjob you are
    and a reference to the parent Job object so that the main configuration can be
    accessed from there, rather than being replicated for every subjob.
664 """
665
666 def __init__(self, job, subjob_id, input_files=None):
667 """
668 """
669
670 self.id = subjob_id
671
672 self.parent = job
673
674 if not input_files:
675 input_files = []
676 self.input_files = input_files
677
679 self.result = None
680
681 self._status = "init"
682
683 self.args = []
684
685 @property
686 def output_dir(self):
687 """
688 Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this."""
689 return Path(self.parent.output_dir, str(self.id))
690
691 @property
692 def working_dir(self):
693 """Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this."""
694 return Path(self.parent.working_dir, str(self.id))
695
696 @property
697 def name(self):
698 """Getter for name of SubJob. Accesses the parent Job name to infer this."""
699 return "_".join((self.parent.name, str(self.id)))
700
701 @property
702 def status(self):
703 """
704 Returns the status of this SubJob.
705 """
706 return self._status
707
708 @status.setter
709 def status(self, status):
710 """
711 Sets the status of this Job.
712 """
713 # Print an error only if the job failed.
714 if status == "failed":
715 B2ERROR(f"Setting {self.name} status to failed")
716 else:
717 B2INFO(f"Setting {self.name} status to {status}")
718 self._status = status
719
720 @property
721 def subjobs(self):
722 """
723 A subjob cannot have subjobs. Always return empty list.
724 """
725 return []
726
727 @property
728 def job_dict(self):
729 """
730 Returns:
731 dict: A JSON serialisable representation of the `SubJob`. `Path <basf2.Path>` objects are converted to
732 `string` via ``Path.as_posix()``. Since Subjobs inherit most of the parent job's config we only output the input files and arguments that are specific to this subjob and no other details.
733 """
734 job_dict = {}
735 job_dict["id"] = self.id
736 job_dict["input_files"] = self.input_files
737 job_dict["args"] = self.args
738 return job_dict
739
740 def __getattr__(self, attribute):
741 """
742 Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
743 unless otherwise specified.
744 """
745 return getattr(self.parent, attribute)
746
747 def __repr__(self):
748 """
749 """
750 return f"SubJob({self.name})"
751
752
753class Backend(ABC):
754 """
755 Abstract base class for a valid backend.
756 Classes derived from this will implement their own submission of basf2 jobs
757 to whatever backend they describe.
758 Some common methods/attributes go into this base class.
759
760 For backend_args the priority from lowest to highest is:
761
762 backend.default_backend_args -> backend.backend_args -> job.backend_args
763 """
764
765 submit_script = "submit.sh"
766
767 exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
768
769 default_backend_args = {}
770
771 def __init__(self, *, backend_args=None):
772 """
773 """
774 if backend_args is None:
775 backend_args = {}
776
777 self.backend_args = {**self.default_backend_args, **backend_args}
778
779 @abstractmethod
780 def submit(self, job):
781 """
782 Base method for submitting collection jobs to the backend type. This MUST be
783 implemented for a correctly written backend class deriving from Backend().
784 """
785
786 @staticmethod
787 def _add_setup(job, batch_file):
788 """
789 Adds setup lines to the shell script file.
790 """
791 for line in job.setup_cmds:
792 print(line, file=batch_file)
793
794 def _add_wrapper_script_setup(self, job, batch_file):
795 """
796 Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
797 `trap` statements for Ctrl-C situations.
798 """
799 start_wrapper = f"""# ---
800# trap ctrl-c and call ctrl_c()
801trap '(ctrl_c 130)' SIGINT
802trap '(ctrl_c 143)' SIGTERM
803
804function write_exit_code() {{
805 echo "Writing $1 to exit status file"
806 echo "$1" > {self.exit_code_file}
807 exit $1
808}}
809
810function ctrl_c() {{
811 trap '' SIGINT SIGTERM
812 echo "** Trapped Ctrl-C **"
813 echo "$1" > {self.exit_code_file}
814 exit $1
815}}
816# ---"""
817 print(start_wrapper, file=batch_file)
818
819 def _add_wrapper_script_teardown(self, job, batch_file):
820 """
821 Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
822 an exit code of the job cmd being written out to a file. Which means that we can know if the command was
823 successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
824 removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
825 file.
826 """
827 end_wrapper = """# ---
828write_exit_code $?"""
829 print(end_wrapper, file=batch_file)
830
831 @classmethod
832 def _create_parent_job_result(cls, parent):
833 """
834 We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
835 so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
836 statuses and allows the use of ready().
837 """
838 raise NotImplementedError
839
840 def get_submit_script_path(self, job):
841 """
842 Construct the Path object of the bash script file that we will submit. It will contain
843 the actual job command, wrapper commands, setup commands, and any batch directives
844 """
845 return Path(job.working_dir, self.submit_script)
846
847
848class Result():
849 """
850 Base class for Result objects. A Result is created for each `Job` (or `Job.SubJob`) object
851 submitted to a backend. It provides a way to query a job's status to find out if it's ready.
852 """
853
854 def __init__(self, job):
        """
        Pass in the job object to allow the result to access the job's properties and do post-processing.
        """

858 self.job = job
859
860 self._is_ready = False
861
863 self.time_to_wait_for_exit_code_file = timedelta(minutes=5)
864
865 self.exit_code_file_initial_time = None
866
867 def ready(self):
868 """
869 Returns whether or not this job result is known to be ready. Doesn't actually change the job status. Just changes the 'readiness' based on the known job status.
870 """
871 B2DEBUG(29, f"Calling {self.job}.result.ready()")
872 if self._is_ready:
873 return True
874 elif self.job.status in self.job.exit_statuses:
875 self._is_ready = True
876 return True
877 else:
878 return False
879
880 def update_status(self):
881 """
882 Update the job's (and subjobs') status so that `Result.ready` will return the up to date status. This call will have to
883 actually look up the job's status from some database/exit code file.
884 """
885 raise NotImplementedError
886
887 def get_exit_code_from_file(self):
888 """
        Read the exit code file to discover the exit status of the job command. Useful fallback if the job is no longer
890 known to the job database (batch system purged it for example). Since some backends may take time to download
891 the output files of the job back to the working directory we use a time limit on how long to wait.
892 """
893 if not self.exit_code_file_initial_time:
894 self.exit_code_file_initial_time = datetime.now()
895 exit_code_path = Path(self.job.working_dir, Backend.exit_code_file)
896 with open(exit_code_path) as f:
897 exit_code = int(f.read().strip())
898 B2DEBUG(29, f"Exit code from file for {self.job} was {exit_code}")
899 return exit_code
900
901
902class Local(Backend):
903 """
904 Backend for local processes i.e. on the same machine but in a subprocess.
905
    Note that you should call the `Local.join` method to close the pool and wait for any
    running processes to finish before exiting the process. Once you've called join you will have to set up a new
    instance of this backend to create a new pool. If you never call `Local.join` anywhere, then the main python
    process might end before your pool is done.

    Keyword Arguments:
        max_processes (int): Integer that specifies the size of the process pool that spawns the subjobs, default=1.
            It's the maximum number of simultaneous subjobs.
            Try not to specify a large number or a number larger than the number of cores.
            It won't crash the program but it will slow down and negatively impact performance.
916 """
917
918 def __init__(self, *, backend_args=None, max_processes=1):
919 """
920 """
921 super().__init__(backend_args=backend_args)
922
923 self.pool = None
924
925 self.max_processes = max_processes
926
927 class LocalResult(Result):
928 """
929 Result class to help monitor status of jobs submitted by Local backend.
930 """
931
932 def __init__(self, job, result):
933 """
934 Pass in the job object and the multiprocessing result to allow the result to do monitoring and perform
935 post processing of the job.
936 """
937 super().__init__(job)
938
939 self.result = result
940
941 def _update_result_status(self):
942 if self.result.ready() and (self.job.status not in self.job.exit_statuses):
943 return_code = self.result.get()
944 if return_code:
945 self.job.status = "failed"
946 else:
947 self.job.status = "completed"
948
949 def update_status(self):
950 """
951 Update the job's (or subjobs') status by calling the result object.
952 """
953 B2DEBUG(29, f"Calling {self.job}.result.update_status()")
954 if self.job.subjobs:
955 for subjob in self.job.subjobs.values():
956 subjob.result._update_result_status()
957 else:
958 self._update_result_status()
959
960 def join(self):
961 """
962 Closes and joins the Pool, letting you wait for all results currently
963 still processing.
964 """
965 B2INFO("Joining Process Pool, waiting for results to finish...")
966 self.pool.close()
967 self.pool.join()
968 B2INFO("Process Pool joined.")
969
970 @property
971 def max_processes(self):
972 """
973 Getter for max_processes
974 """
975 return self._max_processes
976
977 @max_processes.setter
978 def max_processes(self, value):
979 """
980 Setter for max_processes, we also check for a previous Pool(), wait for it to join
981 and then create a new one with the new value of max_processes
982 """
983
984 self._max_processes = value
985 if self.pool:
986 B2INFO("New max_processes requested. But a pool already exists.")
987 self.join()
988 B2INFO(f"Starting up new Pool with {self.max_processes} processes")
989 self.pool = mp.Pool(processes=self.max_processes)
990
991 @method_dispatch
992 def submit(self, job):
993 """
994 """
995 raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
996 "Did you submit a (Sub)Job?")
997
998 @submit.register(SubJob)
999 def _(self, job):
1000 """
1001 Submission of a `SubJob` for the Local backend
1002 """
1003 # Make sure the output directory of the job is created
1004 job.output_dir.mkdir(parents=True, exist_ok=True)
1005 # Make sure the working directory of the job is created
1006 job.working_dir.mkdir(parents=True, exist_ok=True)
1007 job.copy_input_sandbox_files_to_working_dir()
1008 job.dump_input_data()
1009 # Get the path to the bash script we run
1010 script_path = self.get_submit_script_path(job)
1011 with open(script_path, mode="w") as batch_file:
1012 print("#!/bin/bash", file=batch_file)
1013 self._add_wrapper_script_setup(job, batch_file)
1014 self._add_setup(job, batch_file)
1015 print(job.full_command, file=batch_file)
1016 self._add_wrapper_script_teardown(job, batch_file)
1017 B2INFO(f"Submitting {job}")
1018 job.result = Local.LocalResult(job,
1019 self.pool.apply_async(self.run_job,
1020 (job.name,
1021 job.working_dir,
1022 job.output_dir,
1023 script_path)
1024 )
1025 )
1026 job.status = "submitted"
1027 B2INFO(f"{job} submitted")
1028
1029 @submit.register(Job)
1030 def _(self, job):
1031 """
1032 Submission of a `Job` for the Local backend
1033 """
1034 # Make sure the output directory of the job is created
1035 job.output_dir.mkdir(parents=True, exist_ok=True)
1036 # Make sure the working directory of the job is created
1037 job.working_dir.mkdir(parents=True, exist_ok=True)
1038 # Check if we have any valid input files
1039 job.check_input_data_files()
1040
1041 if not job.splitter:
1042 # Get all of the requested files for the input sandbox and copy them to the working directory
1043 job.copy_input_sandbox_files_to_working_dir()
1044 job.dump_input_data()
1045 # Get the path to the bash script we run
1046 script_path = self.get_submit_script_path(job)
1047 with open(script_path, mode="w") as batch_file:
1048 print("#!/bin/bash", file=batch_file)
1049 self._add_wrapper_script_setup(job, batch_file)
1050 self._add_setup(job, batch_file)
1051 print(job.full_command, file=batch_file)
1052 self._add_wrapper_script_teardown(job, batch_file)
1053 B2INFO(f"Submitting {job}")
1054 job.result = Local.LocalResult(job,
1055 self.pool.apply_async(self.run_job,
1056 (job.name,
1057 job.working_dir,
1058 job.output_dir,
1059 script_path)
1060 )
1061 )
1062 B2INFO(f"{job} submitted")
1063 else:
1064 # Create subjobs according to the splitter's logic
1065 job.splitter.create_subjobs(job)
1066 # Submit the subjobs
1067 self.submit(list(job.subjobs.values()))
1068 # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1069 self._create_parent_job_result(job)
1070
1071 @submit.register(list)
1072 def _(self, jobs):
1073 """
1074 Submit method of Local() that takes a list of jobs instead of just one and submits each one.
1075 """
1076 # Submit the jobs
1077 for job in jobs:
1078 self.submit(job)
1079 B2INFO("All requested jobs submitted.")
1080
1081 @staticmethod
1082 def run_job(name, working_dir, output_dir, script):
1083 """
1084 The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
1085 shell command in a subprocess and captures the stdout and stderr of the subprocess to files.
1086 """
1087 B2INFO(f"Starting Sub-process: {name}")
1088 from subprocess import Popen
1089 stdout_file_path = Path(working_dir, _STDOUT_FILE)
1090 stderr_file_path = Path(working_dir, _STDERR_FILE)
        # Redirect the subprocess stdout and stderr to files in the working directory
1092 B2INFO(f"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
1093 with open(stdout_file_path, mode="w", buffering=1) as f_out, \
1094 open(stderr_file_path, mode="w", buffering=1) as f_err:
1095 with Popen(["/bin/bash", script.as_posix()],
1096 stdout=f_out,
1097 stderr=f_err,
1098 bufsize=1,
1099 universal_newlines=True,
1100 cwd=working_dir,
1101 env={}) as p:
1102 # We block here and wait so that the return code will be set.
1103 p.wait()
1104 B2INFO(f"Subprocess {name} finished.")
1105 return p.returncode
1106
1107 @classmethod
1108 def _create_parent_job_result(cls, parent):
1109 parent.result = cls.LocalResult(parent, None)
1110
1111
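# Illustrative sketch of an end-to-end submission with the Local backend, tying together the
# Job, Local and monitor_jobs definitions above. SimpleNamespace is a hypothetical stand-in
# for the parsed command-line arguments that monitor_jobs() expects:
#
#     from types import SimpleNamespace
#
#     backend = Local(max_processes=2)
#     backend.submit(job)                      # or backend.submit([job1, job2, ...])
#     monitor_jobs(SimpleNamespace(heartbeat=10), [job])
#     backend.join()                           # wait for the process pool before exiting
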
1112class Batch(Backend):
1113 """
1114 Abstract Base backend for submitting to a local batch system. Batch system specific commands should be implemented
1115 in a derived class. Do not use this class directly!
1116 """
1117
1118 submission_cmds = []
1119
1131 default_global_job_limit = 1000
1132
1133 default_sleep_between_submission_checks = 30
1134
1135 def __init__(self, *, backend_args=None):
1136 """
1137 Init method for Batch Backend. Does some basic default setup.
1138 """
1139 super().__init__(backend_args=backend_args)
1140
1142 self.global_job_limit = self.default_global_job_limit
1143
1145 self.sleep_between_submission_checks = self.default_sleep_between_submission_checks
1146
1147 def _add_batch_directives(self, job, file):
1148 """
1149 Should be implemented in a derived class to write a batch submission script to the job.working_dir.
1150 You should think about where the stdout/err should go, and set the queue name.
1151 """
1152 raise NotImplementedError("Need to implement a _add_batch_directives(self, job, file) "
1153 f"method in {self.__class__.__name__} backend.")
1154
1155 def _make_submit_file(self, job, submit_file_path):
1156 """
1157 Useful for the HTCondor backend where a submit is needed instead of batch
1158 directives pasted directly into the submission script. It should be overwritten
1159 if needed.
1160 """
1161
1162 @classmethod
1163 @abstractmethod
1164 def _submit_to_batch(cls, cmd):
1165 """
1166 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1167 """
1168
1169 def can_submit(self, *args, **kwargs):
1170 """
1171 Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail. This is initially meant to make sure that we don't go over the global limits of jobs (submitted + running).
1172
1173 Returns:
1174 bool: If the job submission can continue based on the current situation.
1175 """
1176 return True
1177
1178 @method_dispatch
1179 def submit(self, job, check_can_submit=True, jobs_per_check=100):
1180 """
1181 """
1182 raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
1183 "Did you submit a (Sub)Job?")
1184
1185 @submit.register(SubJob)
1186 def _(self, job, check_can_submit=True, jobs_per_check=100):
1187 """
1188 Submit method of Batch backend for a `SubJob`. Should take `SubJob` object, create needed directories,
1189 create batch script, and send it off with the batch submission command.
1190 It should apply the correct options (default and user requested).
1191
1192 Should set a Result object as an attribute of the job.
1193 """
1194 # Make sure the output directory of the job is created, commented out due to permission issues
1195 # job.output_dir.mkdir(parents=True, exist_ok=True)
1196 # Make sure the working directory of the job is created
1197 job.working_dir.mkdir(parents=True, exist_ok=True)
1198 job.copy_input_sandbox_files_to_working_dir()
1199 job.dump_input_data()
1200 # Make submission file if needed
1201 batch_submit_script_path = self.get_batch_submit_script_path(job)
1202 self._make_submit_file(job, batch_submit_script_path)
1203 # Get the bash file we will actually run, might be the same file
1204 script_path = self.get_submit_script_path(job)
1205 # Construct the batch submission script (with directives if that is supported)
1206 with open(script_path, mode="w") as batch_file:
1207 self._add_batch_directives(job, batch_file)
1208 self._add_wrapper_script_setup(job, batch_file)
1209 self._add_setup(job, batch_file)
1210 print(job.full_command, file=batch_file)
1211 self._add_wrapper_script_teardown(job, batch_file)
1212 os.chmod(script_path, 0o755)
1213 B2INFO(f"Submitting {job}")
1214 # Do the actual batch submission
1215 cmd = self._create_cmd(batch_submit_script_path)
1216 output = self._submit_to_batch(cmd)
1217 self._create_job_result(job, output)
1218 job.status = "submitted"
1219 B2INFO(f"{job} submitted")
1220
1221 @submit.register(Job)
1222 def _(self, job, check_can_submit=True, jobs_per_check=100):
1223 """
1224 Submit method of Batch backend. Should take job object, create needed directories, create batch script,
1225 and send it off with the batch submission command, applying the correct options (default and user requested.)
1226
1227 Should set a Result object as an attribute of the job.
1228 """
1229 # Make sure the output directory of the job is created, commented out due to permissions issue
1230 # job.output_dir.mkdir(parents=True, exist_ok=True)
1231 # Make sure the working directory of the job is created
1232 job.working_dir.mkdir(parents=True, exist_ok=True)
1233 # Check if we have any valid input files
1234 job.check_input_data_files()
1235 # Add any required backend args that are missing (I'm a bit hesitant to actually merge with job.backend_args)
1236 # just in case you want to resubmit the same job with different backend settings later.
1237 # job_backend_args = {**self.backend_args, **job.backend_args}
1238
1239 # If there's no splitter then we just submit the Job with no SubJobs
1240 if not job.splitter:
1241 # Get all of the requested files for the input sandbox and copy them to the working directory
1242 job.copy_input_sandbox_files_to_working_dir()
1243 job.dump_input_data()
1244 # Make submission file if needed
1245 batch_submit_script_path = self.get_batch_submit_script_path(job)
1246 self._make_submit_file(job, batch_submit_script_path)
1247 # Get the bash file we will actually run
1248 script_path = self.get_submit_script_path(job)
1249 # Construct the batch submission script (with directives if that is supported)
1250 with open(script_path, mode="w") as batch_file:
1251 self._add_batch_directives(job, batch_file)
1252 self._add_wrapper_script_setup(job, batch_file)
1253 self._add_setup(job, batch_file)
1254 print(job.full_command, file=batch_file)
1255 self._add_wrapper_script_teardown(job, batch_file)
1256 os.chmod(script_path, 0o755)
1257 B2INFO(f"Submitting {job}")
1258 # Do the actual batch submission
1259 cmd = self._create_cmd(batch_submit_script_path)
1260 output = self._submit_to_batch(cmd)
1261 self._create_job_result(job, output)
1262 job.status = "submitted"
1263 B2INFO(f"{job} submitted")
1264 else:
1265 # Create subjobs according to the splitter's logic
1266 job.splitter.create_subjobs(job)
1267 # Submit the subjobs
1268 self.submit(list(job.subjobs.values()))
1269 # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1270 self._create_parent_job_result(job)
1271
1272 @submit.register(list)
1273 def _(self, jobs, check_can_submit=True, jobs_per_check=100):
1274 """
1275 Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.
1276 """
1277 B2INFO(f"Submitting a list of {len(jobs)} jobs to a Batch backend")
1278 # Technically this could be a list of Jobs or SubJobs. And if it is a list of Jobs then it might not
1279 # be necessary to check if we can submit right now. We could do it later during the submission of the
1280 # SubJob list. However in the interest of simpler code we just do the check here, and re-check again
1281 # if a SubJob list comes through this function. Slightly inefficient, but much simpler logic.
1282
        # The first thing to do is make sure that we are iterating through the jobs list in chunks that are
        # equal to or smaller than the global limit. Otherwise nothing will ever submit.
        # (An illustrative sketch of tuning these limits follows this method.)

        if jobs_per_check > self.global_job_limit:
            B2INFO(f"The requested jobs_per_check (={jobs_per_check}) is higher than the global job "
                   f"limit for this backend (={self.global_job_limit}). Will instead use the "
                   "value of the global job limit.")
            jobs_per_check = self.global_job_limit
1291
1292 # We group the jobs list into chunks of length jobs_per_check
1293 for jobs_to_submit in grouper(jobs_per_check, jobs):
1294 # Wait until we are allowed to submit
1295 while not self.can_submit(njobs=len(jobs_to_submit)):
1296 B2INFO("Too many jobs are currently in the batch system globally. Waiting until submission can continue...")
1297 time.sleep(self.sleep_between_submission_checks)
1298 else:
1299 # We loop here since we have already checked if the number of jobs is low enough, we don't want to hit this
1300 # function again unless one of the jobs has subjobs.
1301 B2INFO(f"Submitting the next {len(jobs_to_submit)} jobs...")
1302 for job in jobs_to_submit:
1303 self.submit(job, check_can_submit, jobs_per_check)
1304 B2INFO(f"All {len(jobs)} requested jobs submitted")
1305
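    # Illustrative sketch of tuning the throttling behaviour described above; the values are
    # hypothetical and depend on the site's batch system policies:
    #
    #     backend = LSF()                               # or PBS(), HTCondor(), ...
    #     backend.global_job_limit = 4000               # never exceed this many of our jobs in the system
    #     backend.sleep_between_submission_checks = 60  # seconds between can_submit() re-checks
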
1306 def get_batch_submit_script_path(self, job):
1307 """
1308 Construct the Path object of the script file that we will submit using the batch command.
1309 For most batch backends this is the same script as the bash script we submit.
1310 But for some they require a separate submission file that describes the job.
1311 To implement that you can implement this function in the Backend class.
1312 """
1313 return Path(job.working_dir, self.submit_script)
1314
1315 @classmethod
1316 @abstractmethod
1317 def _create_job_result(cls, job, batch_output):
1318 """
1319 """
1320
1321 @abstractmethod
1322 def _create_cmd(self, job):
1323 """
1324 """
1325
1326
1327class PBS(Batch):
1328 """
1329 Backend for submitting calibration processes to a qsub batch system.
1330 """
1331
1332 cmd_wkdir = "#PBS -d"
1333
1334 cmd_stdout = "#PBS -o"
1335
1336 cmd_stderr = "#PBS -e"
1337
1338 cmd_queue = "#PBS -q"
1339
1340 cmd_name = "#PBS -N"
1341
1342 submission_cmds = ["qsub"]
1343
1344 default_global_job_limit = 5000
1345
1346 default_backend_args = {"queue": "short"}
1347
1348 def __init__(self, *, backend_args=None):
1349 super().__init__(backend_args=backend_args)
1350
1351 def _add_batch_directives(self, job, batch_file):
1352 """
1353 Add PBS directives to submitted script.
1354 """
1355 job_backend_args = {**self.backend_args, **job.backend_args}
1356 batch_queue = job_backend_args["queue"]
1357 print("#!/bin/bash", file=batch_file)
1358 print("# --- Start PBS ---", file=batch_file)
1359 print(" ".join([PBS.cmd_queue, batch_queue]), file=batch_file)
1360 print(" ".join([PBS.cmd_name, job.name]), file=batch_file)
1361 print(" ".join([PBS.cmd_wkdir, job.working_dir.as_posix()]), file=batch_file)
1362 print(" ".join([PBS.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1363 print(" ".join([PBS.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1364 print("# --- End PBS ---", file=batch_file)
1365
1366 @classmethod
1367 def _create_job_result(cls, job, batch_output):
1368 """
1369 """
1370 job_id = batch_output.replace("\n", "")
1371 B2INFO(f"Job ID of {job} recorded as: {job_id}")
1372 job.result = cls.PBSResult(job, job_id)
1373
1374 def _create_cmd(self, script_path):
1375 """
1376 """
1377 submission_cmd = self.submission_cmds[:]
1378 submission_cmd.append(script_path.as_posix())
1379 return submission_cmd
1380
1381 @classmethod
1382 def _submit_to_batch(cls, cmd):
1383 """
1384 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1385 """
1386 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True)
1387 return sub_out
1388
1389 @classmethod
1390 def _create_parent_job_result(cls, parent):
1391 parent.result = cls.PBSResult(parent, None)
1392
1393 class PBSResult(Result):
1394 """
1395 Simple class to help monitor status of jobs submitted by `PBS` Backend.
1396
1397 You pass in a `Job` object (or `SubJob`) and job id from a qsub command.
        When you call the `ready` method it runs qstat to see whether or not the job has finished.
1399 """
1400
1401
        backend_code_to_status = {"R": "running",
                                  "C": "completed",
                                  "FINISHED": "completed",
                                  "E": "failed",
                                  "H": "submitted",
                                  "Q": "submitted",
                                  "T": "submitted",
                                  "W": "submitted",
                                  }
1412
1413 def __init__(self, job, job_id):
1414 """
1415 Pass in the job object and the job id to allow the result to do monitoring and perform
1416 post processing of the job.
1417 """
1418 super().__init__(job)
1419
1420 self.job_id = job_id
1421
1422 def update_status(self):
1423 """
1424 Update the job's (or subjobs') status by calling qstat.
1425 """
1426 B2DEBUG(29, f"Calling {self.job}.result.update_status()")
            # Get all jobs info and re-use it for each status update to minimise time spent on this updating.
1428 qstat_output = PBS.qstat()
1429 if self.job.subjobs:
1430 for subjob in self.job.subjobs.values():
1431 subjob.result._update_result_status(qstat_output)
1432 else:
1433 self._update_result_status(qstat_output)
1434
1435 def _update_result_status(self, qstat_output):
1436 """
1437 Parameters:
1438 qstat_output (dict): The JSON output of a previous call to qstat which we can re-use to find the
1439 status of this job. Obviously you should only be passing a JSON dict that contains the 'Job_Id' and
1440 'job_state' information, otherwise it is useless.
1441
1442 """
1443 try:
1444 backend_status = self._get_status_from_output(qstat_output)
1445 except KeyError:
1446 # If this happens then maybe the job id wasn't in the qstat_output argument because it finished.
1447 # Instead of failing immediately we try looking for the exit code file and then fail if it still isn't there.
                B2DEBUG(29, f"Checking the exit code from file for {self.job}")
                try:
                    exit_code = self.get_exit_code_from_file()
                except FileNotFoundError:
                    # Give the backend some time to bring the exit code file back before giving up
                    waiting_time = datetime.now() - self.exit_code_file_initial_time
                    if waiting_time > self.time_to_wait_for_exit_code_file:
                        B2ERROR(f"Exit code file for {self.job} missing and we can't wait any longer. Setting exit code to 1.")
                        exit_code = 1
                    else:
                        B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
                        return
1459 if exit_code:
1460 backend_status = "E"
1461 else:
1462 backend_status = "C"
1463
1464 try:
1465 new_job_status = self.backend_code_to_status[backend_status]
1466 except KeyError as err:
1467 raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
1468
1469 if new_job_status != self.job.status:
1470 self.job.status = new_job_status
1471
1472 def _get_status_from_output(self, output):
1473 for job_info in output["JOBS"]:
1474 if job_info["Job_Id"] == self.job_id:
1475 return job_info["job_state"]
1476 else:
1477 raise KeyError
1478
1479 def can_submit(self, njobs=1):
1480 """
        Checks the global number of jobs in PBS right now (submitted or running) for this user.
        Returns True if the number is lower than the limit, False if it is higher.

        Parameters:
            njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
                are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
                assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
                So njobs really shouldn't be abused when you might be getting close to the limit, i.e. keep it <= 250
                and check again before submitting more.
1489 """
1490 B2DEBUG(29, "Calling PBS().can_submit()")
1491 job_info = self.qstat(username=os.environ["USER"])
1492 total_jobs = job_info["NJOBS"]
1493 B2INFO(f"Total jobs active in the PBS system is currently {total_jobs}")
1494 if (total_jobs + njobs) > self.global_job_limit:
1495 B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1496 return False
1497 else:
1498 B2INFO("There is enough space to submit more jobs.")
1499 return True
1500
1501 @classmethod
1502 def qstat(cls, username="", job_ids=None):
1503 """
1504 Simplistic interface to the ``qstat`` command. Lets you request information about all jobs or ones matching the filter
        ['job_id'] or for the username. The result is a JSON dictionary containing some of the useful job attributes returned
1506 by qstat.
1507
1508 PBS is kind of annoying as depending on the configuration it can forget about jobs immediately. So the status of a
1509 finished job is VERY hard to get. There are other commands that are sometimes included that may do a better job.
1510 This one should work for Melbourne's cloud computing centre.
1511
1512 Keyword Args:
1513 username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
1514 will be in the output dictionary.
1515 job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
                the output of this function will only contain information about these jobs. If this argument is not given, then all jobs
1517 matching the other filters will be returned.
1518
1519 Returns:
            dict: JSON dictionary of the form (to save you parsing the XML that qstat returns):
1521
1522 .. code-block:: python
1523
1524 {
1525 "NJOBS": int
1526 "JOBS":[
1527 {
1528 <key: value>, ...
1529 }, ...
1530 ]
1531 }
1532 """
1533 B2DEBUG(29, f"Calling PBS.qstat(username='{username}', job_id={job_ids})")
1534 if not job_ids:
1535 job_ids = []
1536 job_ids = set(job_ids)
1537 cmd_list = ["qstat", "-x"]
1538 # We get an XML serialisable summary from qstat. Requires the shell argument.
1539 cmd = " ".join(cmd_list)
1540 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1541 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1542 jobs_dict = {"NJOBS": 0, "JOBS": []}
1543 jobs_xml = ET.fromstring(output)
1544
1545 # For a specific job_id we can be a bit more efficient in XML parsing
1546 if len(job_ids) == 1:
1547 job_elem = jobs_xml.find(f"./Job[Job_Id='{list(job_ids)[0]}']")
1548 if job_elem:
1549 jobs_dict["JOBS"].append(cls.create_job_record_from_element(job_elem))
1550 jobs_dict["NJOBS"] = 1
1551 return jobs_dict
1552
1553 # Since the username given is not exactly the same as the one that PBS stores (<username>@host)
1554 # we have to simply loop through rather than using XPATH.
1555 for job in jobs_xml.iterfind("Job"):
1556 job_owner = job.find("Job_Owner").text.split("@")[0]
1557 if username and username != job_owner:
1558 continue
1559 job_id = job.find("Job_Id").text
1560 if job_ids and job_id not in job_ids:
1561 continue
1562 jobs_dict["JOBS"].append(cls.create_job_record_from_element(job))
1563 jobs_dict["NJOBS"] += 1
1564 # Remove it so that we don't keep checking for it
1565 if job_id in job_ids:
1566 job_ids.remove(job_id)
1567 return jobs_dict
1568
1569 @staticmethod
1570 def create_job_record_from_element(job_elem):
1571 """
1572 Creates a Job dictionary with various job information from the XML element returned by qstat.
1573
1574 Parameters:
1575 job_elem (xml.etree.ElementTree.Element): The XML Element of the Job
1576
1577 Returns:
1578 dict: JSON serialisable dictionary of the Job information we are interested in.
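
        For illustration, a returned dictionary might look like this (all values are made up):

        .. code-block:: python

            {
                "Job_Id": "12345.pbs-server",
                "Job_Name": "my_calibration_job",
                "Job_Owner": "someuser@somehost",
                "job_state": "R",
                "queue": "short"
            }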
1579 """
1580 job_dict = {}
1581 job_dict["Job_Id"] = job_elem.find("Job_Id").text
1582 job_dict["Job_Name"] = job_elem.find("Job_Name").text
1583 job_dict["Job_Owner"] = job_elem.find("Job_Owner").text
1584 job_dict["job_state"] = job_elem.find("job_state").text
1585 job_dict["queue"] = job_elem.find("queue").text
1586 return job_dict
1587
1588
1589class LSF(Batch):
1590 """
1591    Backend for submitting calibration processes to an LSF batch system.
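
    A hedged sketch of picking a non-default queue via ``backend_args`` (the queue name is illustrative):

    .. code-block:: python

        backend = LSF(backend_args={"queue": "l"})  # The default queue is "s"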
1592 """
1593
1594 cmd_wkdir = "#BSUB -cwd"
1595
1596 cmd_stdout = "#BSUB -o"
1597
1598 cmd_stderr = "#BSUB -e"
1599
1600 cmd_queue = "#BSUB -q"
1601
1602 cmd_name = "#BSUB -J"
1603
1604 submission_cmds = ["bsub", "-env", "\"none\"", "<"]
1605
1606 default_global_job_limit = 15000
1607
1608 default_backend_args = {"queue": "s"}
1609
1610 def __init__(self, *, backend_args=None):
1611 super().__init__(backend_args=backend_args)
1612
1613 def _add_batch_directives(self, job, batch_file):
1614 """
1615 Adds LSF BSUB directives for the job to a script.
1616 """
1617 job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1618 batch_queue = job_backend_args["queue"]
1619 print("#!/bin/bash", file=batch_file)
1620 print("# --- Start LSF ---", file=batch_file)
1621 print(" ".join([LSF.cmd_queue, batch_queue]), file=batch_file)
1622 print(" ".join([LSF.cmd_name, job.name]), file=batch_file)
1623 print(" ".join([LSF.cmd_wkdir, str(job.working_dir)]), file=batch_file)
1624 print(" ".join([LSF.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1625 print(" ".join([LSF.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1626 print("# --- End LSF ---", file=batch_file)
1627
1628 def _create_cmd(self, script_path):
1629        """Return the ``bsub`` submission command for the given script path, as a single shell command string inside a list."""
1631 submission_cmd = self.submission_cmds[:]
1632 submission_cmd.append(script_path.as_posix())
1633 submission_cmd = " ".join(submission_cmd)
1634 return [submission_cmd]
1635
1636 @classmethod
1637 def _submit_to_batch(cls, cmd):
1638 """
1639 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1640 """
1641 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1642 return sub_out
1643
1644 class LSFResult(Result):
1645 """
1646 Simple class to help monitor status of jobs submitted by LSF Backend.
1647
1648 You pass in a `Job` object and job id from a bsub command.
1649 When you call the `ready` method it runs bjobs to see whether or not the job has finished.
1650 """
1651
1652
1653 backend_code_to_status = {"RUN": "running",
1654 "DONE": "completed",
1655 "FINISHED": "completed",
1656 "EXIT": "failed",
1657 "PEND": "submitted"
1658 }
1659
1660 def __init__(self, job, job_id):
1661 """
1662 Pass in the job object and the job id to allow the result to do monitoring and perform
1663 post processing of the job.
1664 """
1665 super().__init__(job)
1666
1667 self.job_id = job_id
1668
1669 def update_status(self):
1670 """
1671 Update the job's (or subjobs') status by calling bjobs.
1672 """
1673 B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
1674            # Get all jobs info and re-use it for each status update to minimise time spent on this updating.
1675 bjobs_output = LSF.bjobs(output_fields=["stat", "id"])
1676 if self.job.subjobs:
1677 for subjob in self.job.subjobs.values():
1678 subjob.result._update_result_status(bjobs_output)
1679 else:
1680 self._update_result_status(bjobs_output)
1681
1682 def _update_result_status(self, bjobs_output):
1683 """
1684 Parameters:
1685 bjobs_output (dict): The JSON output of a previous call to bjobs which we can re-use to find the
1686 status of this job. Obviously you should only be passing a JSON dict that contains the 'stat' and
1687 'id' information, otherwise it is useless.
1688
1689 """
1690 try:
1691 backend_status = self._get_status_from_output(bjobs_output)
1692 except KeyError:
1693 # If this happens then maybe the job id wasn't in the bjobs_output argument because it finished.
1694 # Instead of failing immediately we try re-running bjobs here explicitly and then fail if it still isn't there.
1695 bjobs_output = LSF.bjobs(output_fields=["stat", "id"], job_id=str(self.job_id))
1696 try:
1697 backend_status = self._get_status_from_output(bjobs_output)
1698 except KeyError:
1699 # If this happened, maybe we're looking at an old finished job. We could fall back to bhist, but it's
1700 # slow and terrible. Instead let's try looking for the exit code file.
1701 try:
1702 exit_code = self.get_exit_code_from_file()
1703 except FileNotFoundError:
1704 waiting_time = datetime.now() - self.exit_code_file_initial_time
1705                        if self.time_to_wait_for_exit_code_file > waiting_time:
1706                            B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
1707                            return
1708                        else:
1709                            B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
1710                            exit_code = 1
1711 if exit_code:
1712 backend_status = "EXIT"
1713 else:
1714 backend_status = "FINISHED"
1715 try:
1716 new_job_status = self.backend_code_to_status[backend_status]
1717 except KeyError as err:
1718 raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
1719
1720 if new_job_status != self.job.status:
1721 self.job.status = new_job_status
1722
1723 def _get_status_from_output(self, output):
1724 if output["JOBS"] and "ERROR" in output["JOBS"][0]:
1725 if output["JOBS"][0]["ERROR"] == f"Job <{self.job_id}> is not found":
1726 raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1727 else:
1728 raise BackendError(f"Unidentified Error during status check for {self.job}: {output}")
1729 else:
1730 for job_info in output["JOBS"]:
1731 if job_info["JOBID"] == self.job_id:
1732 return job_info["STAT"]
1733 else:
1734 raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1735
1736 @classmethod
1737 def _create_parent_job_result(cls, parent):
1738 parent.result = cls.LSFResult(parent, None)
1739
1740 @classmethod
1741 def _create_job_result(cls, job, batch_output):
1742        """Parse the ``bsub`` output for the batch job id and attach an `LSFResult` to the job."""
1744 m = re.search(r"Job <(\d+)>", str(batch_output))
1745 if m:
1746 job_id = m.group(1)
1747 else:
1748 raise BackendError(f"Failed to get the batch job ID of {job}. LSF output was:\n{batch_output}")
1749
1750 B2INFO(f"Job ID of {job} recorded as: {job_id}")
1751 job.result = cls.LSFResult(job, job_id)
1752
1753 def can_submit(self, njobs=1):
1754 """
1755 Checks the global number of jobs in LSF right now (submitted or running) for this user.
1756        Returns True if the number is lower than the limit, False if it is higher.
1757
1758 Parameters:
1759 njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1760 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1761 assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1762 So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250 and check again before submitting more.
1763 """
1764 B2DEBUG(29, "Calling LSF().can_submit()")
1765 job_info = self.bjobs(output_fields=["stat"])
1766 total_jobs = job_info["NJOBS"]
1767 B2INFO(f"Total jobs active in the LSF system is currently {total_jobs}")
1768 if (total_jobs + njobs) > self.global_job_limit:
1769 B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1770 return False
1771 else:
1772 B2INFO("There is enough space to submit more jobs.")
1773 return True
1774
1775 @classmethod
1776 def bjobs(cls, output_fields=None, job_id="", username="", queue=""):
1777 """
1778        Simplistic interface to the `bjobs` command. Lets you request information about all jobs matching the filters
1779 'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by output of the ``-json`` bjobs option.
1780
1781 Parameters:
1782 output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
1783            job_id (str): String representation of the Job ID given by bsub during submission. If this argument is given then
1784 the output of this function will be only information about this job. If this argument is not given, then all jobs
1785 matching the other filters will be returned.
1786 username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
1787 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
1788 receive job information from all known user jobs matching the other filters.
1789 queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.
1790
1791 Returns:
1792 dict: JSON dictionary of the form:
1793
1794 .. code-block:: python
1795
1796 {
1797 "NJOBS":<njobs returned by command>,
1798 "JOBS":[
1799 {
1800 <output field: value>, ...
1801 }, ...
1802 ]
1803 }
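
        A minimal usage sketch (the job id value is hypothetical):

        .. code-block:: python

            # Request only the status and id fields for a single job
            info = LSF.bjobs(output_fields=["stat", "id"], job_id="12345")
            for job_record in info["JOBS"]:
                print(job_record["JOBID"], job_record["STAT"])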
1804 """
1805 B2DEBUG(29, f"Calling LSF.bjobs(output_fields={output_fields}, job_id={job_id}, username={username}, queue={queue})")
1806 # We must always return at least one output field when using JSON and -o options. So we choose the job id
1807 if not output_fields:
1808 output_fields = ["id"]
1809 # Output fields should be space separated but in a string.
1810 field_list_cmd = "\""
1811 field_list_cmd += " ".join(output_fields)
1812 field_list_cmd += "\""
1813 cmd_list = ["bjobs", "-o", field_list_cmd]
1814 # If the queue name is set then we add to the command options
1815 if queue:
1816 cmd_list.extend(["-q", queue])
1817 # If the username is set then we add to the command options
1818 if username:
1819 cmd_list.extend(["-u", username])
1820 # Can now add the json option before the final positional argument (if used)
1821 cmd_list.append("-json")
1822 # If the job id is set then we add to the end of the command
1823 if job_id:
1824 cmd_list.append(job_id)
1825 # We get a JSON serialisable summary from bjobs. Requires the shell argument.
1826 cmd = " ".join(cmd_list)
1827 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1828 output = decode_json_string(subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True))
1829 output["NJOBS"] = output["JOBS"]
1830 output["JOBS"] = output["RECORDS"]
1831 del output["RECORDS"]
1832 del output["COMMAND"]
1833 return output
1834
1835 @classmethod
1836 def bqueues(cls, output_fields=None, queues=None):
1837 """
1838        Simplistic interface to the `bqueues` command. Lets you request information about all queues matching the filters.
1839 The result is the JSON dictionary returned by output of the ``-json`` bqueues option.
1840
1841 Parameters:
1842 output_fields (list[str]): A list of bqueues -o fields that you would like information about
1843                e.g. the default is ['queue_name', 'status', 'max', 'njobs', 'pend', 'run']
1844 queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
1845 By default you will receive information about all queues.
1846
1847 Returns:
1848 dict: JSON dictionary of the form:
1849
1850 .. code-block:: python
1851
1852 {
1853 "COMMAND":"bqueues",
1854 "QUEUES":46,
1855 "RECORDS":[
1856 {
1857 "QUEUE_NAME":"b2_beast",
1858 "STATUS":"Open:Active",
1859 "MAX":"200",
1860 "NJOBS":"0",
1861 "PEND":"0",
1862 "RUN":"0"
1863                    }, ...
                    ]
1864                }
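
        A minimal usage sketch (the queue name is illustrative):

        .. code-block:: python

            queue_info = LSF.bqueues(output_fields=["queue_name", "njobs"], queues=["s"])
            for record in queue_info["RECORDS"]:
                print(record["QUEUE_NAME"], record["NJOBS"])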
1865 """
1866 B2DEBUG(29, f"Calling LSF.bqueues(output_fields={output_fields}, queues={queues})")
1867 # We must always return at least one output field when using JSON and -o options. So we choose the job id
1868 if not output_fields:
1869 output_fields = ["queue_name", "status", "max", "njobs", "pend", "run"]
1870 # Output fields should be space separated but in a string.
1871 field_list_cmd = "\""
1872 field_list_cmd += " ".join(output_fields)
1873 field_list_cmd += "\""
1874 cmd_list = ["bqueues", "-o", field_list_cmd]
1875 # Can now add the json option before the final positional argument (if used)
1876 cmd_list.append("-json")
1877 # If the queue name is set then we add to the end of the command
1878 if queues:
1879 cmd_list.extend(queues)
1880        # We get a JSON serialisable summary from bqueues. Requires the shell argument.
1881 cmd = " ".join(cmd_list)
1882 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1883 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1884 return decode_json_string(output)
1885
1886
1887class HTCondor(Batch):
1888 """
1889    Backend for submitting calibration processes to an HTCondor batch system.
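
    A hedged sketch of overriding the default ``backend_args`` (see `default_backend_args`), assuming ``job`` is a
    `Job` object; all values shown are illustrative:

    .. code-block:: python

        backend = HTCondor(backend_args={"request_memory": "8 GB"})
        # Extra submit description lines are passed through untouched via 'extra_lines'
        job.backend_args = {"extra_lines": ["request_cpus = 2"]}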
1890 """
1891
1892 batch_submit_script = "submit.sub"
1893
1894 submission_cmds = ["condor_submit", "-terse"]
1895
1896 default_global_job_limit = 10000
1897
1898 default_backend_args = {
1899 "universe": "vanilla",
1900 "getenv": "false",
1901 "request_memory": "4 GB", # We set the default requested memory to 4 GB to maintain parity with KEKCC
1902 "path_prefix": "", # Path prefix for file path
1903 "extra_lines": [] # These should be other HTCondor submit script lines like 'request_cpus = 2'
1904 }
1905
1906 default_class_ads = ["GlobalJobId", "JobStatus", "Owner"]
1907
1908 def _make_submit_file(self, job, submit_file_path):
1909 """
1910 Fill HTCondor submission file.
1911 """
1912 # Find all files/directories in the working directory to copy on the worker node
1913
1914 files_to_transfer = [i.as_posix() for i in job.working_dir.iterdir()]
1915
1916 job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1917
1918 with open(submit_file_path, "w") as submit_file:
1919 print(f'executable = {self.get_submit_script_path(job)}', file=submit_file)
1920 print(f'log = {Path(job.output_dir, "htcondor.log").as_posix()}', file=submit_file)
1921 print(f'output = {Path(job.working_dir, _STDOUT_FILE).as_posix()}', file=submit_file)
1922 print(f'error = {Path(job.working_dir, _STDERR_FILE).as_posix()}', file=submit_file)
1923 print('transfer_input_files = ', ','.join(files_to_transfer), file=submit_file)
1924 print(f'universe = {job_backend_args["universe"]}', file=submit_file)
1925 print(f'getenv = {job_backend_args["getenv"]}', file=submit_file)
1926 print(f'request_memory = {job_backend_args["request_memory"]}', file=submit_file)
1927 print('should_transfer_files = Yes', file=submit_file)
1928 print('when_to_transfer_output = ON_EXIT', file=submit_file)
1929 # Any other lines in the backend args that we don't deal with explicitly but maybe someone wants to insert something
1930 for line in job_backend_args["extra_lines"]:
1931 print(line, file=submit_file)
1932 print('queue', file=submit_file)
1933
1934 def _add_batch_directives(self, job, batch_file):
1935 """
1936 For HTCondor leave empty as the directives are already included in the submit file.
1937 """
1938 print('#!/bin/bash', file=batch_file)
1939
1940 def _create_cmd(self, script_path):
1941        """Return the ``condor_submit`` command (an argument list) for the given submit script path."""
1943 submission_cmd = self.submission_cmds[:]
1944 submission_cmd.append(script_path.as_posix())
1945 return submission_cmd
1946
1947 def get_batch_submit_script_path(self, job):
1948 """
1949 Construct the Path object of the .sub file that we will use to describe the job.
1950 """
1951 return Path(job.working_dir, self.batch_submit_script)
1952
1953 @classmethod
1954 def _submit_to_batch(cls, cmd):
1955 """
1956 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1957 """
1958 job_dir = Path(cmd[-1]).parent.as_posix()
1959 sub_out = ""
1960 attempt = 0
1961 sleep_time = 30
1962
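        # condor_submit occasionally fails, so retry up to three times (sleeping in between) before giving up.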
1963 while attempt < 3:
1964 try:
1965 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, cwd=job_dir)
1966 break
1967 except subprocess.CalledProcessError as e:
1968 attempt += 1
1969 if attempt == 3:
1970                    B2ERROR(f"Error during condor_submit: {str(e)} occurred 3 times, giving up.")
1971 raise e
1972 else:
1973 B2ERROR(f"Error during condor_submit: {str(e)}, sleeping for {sleep_time} seconds.")
1974                    time.sleep(sleep_time)
1975 return sub_out.split()[0]
1976
1977 class HTCondorResult(Result):
1978 """
1979 Simple class to help monitor status of jobs submitted by HTCondor Backend.
1980
1981 You pass in a `Job` object and job id from a condor_submit command.
1982 When you call the `ready` method it runs condor_q and, if needed, ``condor_history``
1983 to see whether or not the job has finished.
1984 """
1985
1986
1987 backend_code_to_status = {0: "submitted",
1988 1: "submitted",
1989 2: "running",
1990 3: "failed",
1991 4: "completed",
1992 5: "submitted",
1993 6: "failed"
1994 }
1995
1996 def __init__(self, job, job_id):
1997 """
1998 Pass in the job object and the job id to allow the result to do monitoring and perform
1999 post processing of the job.
2000 """
2001 super().__init__(job)
2002
2003 self.job_id = job_id
2004
2005 def update_status(self):
2006 """
2007 Update the job's (or subjobs') status by calling condor_q.
2008 """
2009 B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
2010            # Get all jobs info and re-use it for each status update to minimise time spent on this updating.
2011 condor_q_output = HTCondor.condor_q()
2012 if self.job.subjobs:
2013 for subjob in self.job.subjobs.values():
2014 subjob.result._update_result_status(condor_q_output)
2015 else:
2016 self._update_result_status(condor_q_output)
2017
2018 def _update_result_status(self, condor_q_output):
2019 """
2020        To be slightly more efficient we re-use the output of a previous condor_q call. If this job appears in it we
2021        update the job's status from that output. If not, we fall back to calling condor_q for this specific job and, if needed, ``condor_history``.
2022
2023 Parameters:
2024 condor_q_output (dict): The JSON output of a previous call to `HTCondor.condor_q` which we can re-use to find the
2025 status of this job if it was active when that command ran.
2026 """
2027 B2DEBUG(29, f"Calling {self.job}.result._update_result_status()")
2028 jobs_info = []
2029 for job_record in condor_q_output["JOBS"]:
2030 job_id = job_record["GlobalJobId"].split("#")[1]
2031 if job_id == self.job_id:
2032 B2DEBUG(29, f"Found {self.job_id} in condor_q_output.")
2033 jobs_info.append(job_record)
2034
2035 # Let's look for the exit code file where we expect it
2036 if not jobs_info:
2037 try:
2038 exit_code = self.get_exit_code_from_file()
2039 except FileNotFoundError:
2040 waiting_time = datetime.now() - self.exit_code_file_initial_time
2041                    if self.time_to_wait_for_exit_code_file > waiting_time:
2042                        B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
2043                        return
2044                    else:
2045                        B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
2046                        exit_code = 1
2047 if exit_code:
2048 jobs_info = [{"JobStatus": 6, "HoldReason": None}] # Set to failed
2049 else:
2050 jobs_info = [{"JobStatus": 4, "HoldReason": None}] # Set to completed
2051
2052 # If this job wasn't in the passed in condor_q output, let's try our own with the specific job_id
2053 if not jobs_info:
2054 jobs_info = HTCondor.condor_q(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
2055
2056 # If no job information is returned then the job already left the queue
2057            # check in the history to see if it succeeded or failed
2058 if not jobs_info:
2059 try:
2060 jobs_info = HTCondor.condor_history(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
2061 except KeyError:
2062 hold_reason = "No Reason Known"
2063
2064 # Still no record of it after waiting for the exit code file?
2065 if not jobs_info:
2066 jobs_info = [{"JobStatus": 6, "HoldReason": None}] # Set to failed
2067
2068 job_info = jobs_info[0]
2069 backend_status = job_info["JobStatus"]
2070            # If the job is held (backend_status == 5) then report why, then keep waiting
2071 if backend_status == 5:
2072 hold_reason = job_info.get("HoldReason", None)
2073 B2WARNING(f"{self.job} on hold because of {hold_reason}. Keep waiting.")
2074 backend_status = 2
2075 try:
2076 new_job_status = self.backend_code_to_status[backend_status]
2077 except KeyError as err:
2078 raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
2079 if new_job_status != self.job.status:
2080 self.job.status = new_job_status
2081
2082 @classmethod
2083 def _create_job_result(cls, job, job_id):
2084        """Record the HTCondor job id and attach an `HTCondorResult` to the job."""
2086 B2INFO(f"Job ID of {job} recorded as: {job_id}")
2087 job.result = cls.HTCondorResult(job, job_id)
2088
2089 @classmethod
2090 def _create_parent_job_result(cls, parent):
2091 parent.result = cls.HTCondorResult(parent, None)
2092
2093 def can_submit(self, njobs=1):
2094 """
2095 Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
2096        Returns True if the number is lower than the limit, False if it is higher.
2097
2098 Parameters:
2099 njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
2100 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
2101 assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
2102 So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
2103 and check again before submitting more.
2104 """
2105 B2DEBUG(29, "Calling HTCondor().can_submit()")
2106 jobs_info = self.condor_q()
2107 total_jobs = jobs_info["NJOBS"]
2108 B2INFO(f"Total jobs active in the HTCondor system is currently {total_jobs}")
2109 if (total_jobs + njobs) > self.global_job_limit:
2110 B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
2111 return False
2112 else:
2113 B2INFO("There is enough space to submit more jobs.")
2114 return True
2115
2116 @classmethod
2117 def condor_q(cls, class_ads=None, job_id="", username=""):
2118 """
2119        Simplistic interface to the `condor_q` command. Lets you request information about all jobs matching the filters
2120        'job_id' and 'username'. Note that setting job_id takes precedence, so username is then ignored.
2121 The result is the JSON dictionary returned by output of the ``-json`` condor_q option.
2122
2123 Parameters:
2124 class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
2125                By default we give {cls.default_class_ads}, increasing the number of class_ads increases the time taken
2126 by the condor_q call.
2127 job_id (str): String representation of the Job ID given by condor_submit during submission.
2128 If this argument is given then the output of this function will be only information about this job.
2129 If this argument is not given, then all jobs matching the other filters will be returned.
2130 username (str): By default we return information about only the current user's jobs. By giving
2131 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2132 receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
2133 so it isn't recommended.
2134
2135 Returns:
2136 dict: JSON dictionary of the form:
2137
2138 .. code-block:: python
2139
2140 {
2141 "NJOBS":<number of records returned by command>,
2142 "JOBS":[
2143 {
2144 <ClassAd: value>, ...
2145 }, ...
2146 ]
2147 }
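
        A minimal usage sketch (the job id value is hypothetical):

        .. code-block:: python

            # Check the status of one job, requesting only the ClassAds we need
            info = HTCondor.condor_q(job_id="12345.0", class_ads=["JobStatus", "HoldReason"])
            if info["NJOBS"]:
                print(info["JOBS"][0]["JobStatus"])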
2148 """
2149 B2DEBUG(29, f"Calling HTCondor.condor_q(class_ads={class_ads}, job_id={job_id}, username={username})")
2150 if not class_ads:
2151 class_ads = cls.default_class_ads
2152 # Output fields should be comma separated.
2153 field_list_cmd = ",".join(class_ads)
2154 cmd_list = ["condor_q", "-json", "-attributes", field_list_cmd]
2155 # If job_id is set then we ignore all other filters
2156 if job_id:
2157 cmd_list.append(job_id)
2158 else:
2159 if not username:
2160 username = os.environ["USER"]
2161 # If the username is set to all it is a special case
2162 if username == "all":
2163 cmd_list.append("-allusers")
2164 else:
2165 cmd_list.append(username)
2166 # We get a JSON serialisable summary from condor_q. But we will alter it slightly to be more similar to other backends
2167 cmd = " ".join(cmd_list)
2168 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2169        # condor_q occasionally fails
2170 try:
2171 records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2172        except subprocess.CalledProcessError:
2173 records = None
2174
2175 if records:
2176 records = decode_json_string(records)
2177 else:
2178 records = []
2179 jobs_info = {"JOBS": records}
2180 jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2181 return jobs_info
2182
2183 @classmethod
2184 def condor_history(cls, class_ads=None, job_id="", username=""):
2185 """
2186        Simplistic interface to the ``condor_history`` command. Lets you request information about all jobs matching the filters
2187        ``job_id`` and ``username``. Note that setting job_id takes precedence, so username is then ignored.
2188 The result is a JSON dictionary filled by output of the ``-json`` ``condor_history`` option.
2189
2190 Parameters:
2191 class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
2192                By default we give {cls.default_class_ads}, increasing the number of class_ads increases the time taken
2193                by the condor_history call.
2194 job_id (str): String representation of the Job ID given by condor_submit during submission.
2195 If this argument is given then the output of this function will be only information about this job.
2196 If this argument is not given, then all jobs matching the other filters will be returned.
2197 username (str): By default we return information about only the current user's jobs. By giving
2198 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2199 receive job information from all known user jobs matching the other filters. This is limited to 10000 records
2200 and isn't recommended.
2201
2202 Returns:
2203 dict: JSON dictionary of the form:
2204
2205 .. code-block:: python
2206
2207 {
2208 "NJOBS":<number of records returned by command>,
2209 "JOBS":[
2210 {
2211 <ClassAd: value>, ...
2212 }, ...
2213 ]
2214 }
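
        A minimal usage sketch (the job id value is hypothetical; JobStatus 4 means completed):

        .. code-block:: python

            # Look up a job that has already left the queue
            history = HTCondor.condor_history(job_id="12345.0", class_ads=["JobStatus"])
            finished_ok = bool(history["NJOBS"]) and history["JOBS"][0]["JobStatus"] == 4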
2215 """
2216 B2DEBUG(29, f"Calling HTCondor.condor_history(class_ads={class_ads}, job_id={job_id}, username={username})")
2217 if not class_ads:
2218 class_ads = cls.default_class_ads
2219 # Output fields should be comma separated.
2220 field_list_cmd = ",".join(class_ads)
2221 cmd_list = ["condor_history", "-json", "-attributes", field_list_cmd]
2222 # If job_id is set then we ignore all other filters
2223 if job_id:
2224 cmd_list.append(job_id)
2225 else:
2226 if not username:
2227 username = os.environ["USER"]
2228 # If the username is set to all it is a special case
2229 if username != "all":
2230 cmd_list.append(username)
2231        # We get a JSON serialisable summary from condor_history. But we will alter it slightly to be more similar to other backends
2232 cmd = " ".join(cmd_list)
2233 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2234 try:
2235 records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2236        except subprocess.CalledProcessError:
2237 records = None
2238
2239 if records:
2240 records = decode_json_string(records)
2241 else:
2242 records = []
2243
2244 jobs_info = {"JOBS": records}
2245 jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2246 return jobs_info
2247
2248
2249class DIRAC(Backend):
2250 """
2251 Backend for submitting calibration processes to the grid.
2252 """
2253
2254
2255class BackendError(Exception):
2256 """
2257 Base exception class for Backend classes.
2258 """
2259
2260
2261class JobError(Exception):
2262 """
2263 Base exception class for Job objects.
2264 """
2265
2266
2267class SplitterError(Exception):
2268 """
2269 Base exception class for SubjobSplitter objects.
2270 """
2271
2272# @endcond
2273