Belle II Software development
backends.py
1#!/usr/bin/env python3
2
3# disable doxygen check for this file
4# @cond
5
6
13
14from basf2 import B2DEBUG, B2ERROR, B2INFO, B2WARNING
15import re
16import os
17from abc import ABC, abstractmethod
18import json
19import xml.etree.ElementTree as ET
20from math import ceil
21from pathlib import Path
22from collections import deque
23from itertools import count, takewhile
24import shutil
25import time
26from datetime import datetime, timedelta
27import subprocess
28import multiprocessing as mp
29
30from caf.utils import method_dispatch
31from caf.utils import decode_json_string
32from caf.utils import grouper
33from caf.utils import parse_file_uri
34
35
36__all__ = ["Job", "SubJob", "Backend", "Local", "Batch", "LSF", "PBS", "HTCondor", "get_input_data"]
37
38
39_input_data_file_path = Path("__BACKEND_INPUT_FILES__.json")
40
41_STDOUT_FILE = "stdout"
42
43_STDERR_FILE = "stderr"
44
45_backend_job_envvars = (
46 "TMPDIR",
47 "BELLE2_CONFIG_DIR",
48 "VO_BELLE2_SW_DIR",
49 "BELLE2_EXTERNALS_TOPDIR",
50 "BELLE2_CONDB_METADATA",
51 "BELLE2_CONDB_PROXY",
52)
53
54
55def get_input_data():
56 """
57 Simple JSON load of the default input data file. Will contain a list of string file paths
58 for use by the job process.
59 """
60 with open(_input_data_file_path) as input_data_file:
61 input_data = json.load(input_data_file)
62 return input_data
63
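# --- Illustrative sketch, not part of the original module ---
# Job.dump_input_data() writes __BACKEND_INPUT_FILES__.json into the job's working
# directory as a plain JSON list of file path/URI strings, e.g.
# ["/data/run1.root", "/data/run2.root"] (hypothetical paths).
# A submitted steering script would typically recover them like this:
#
#   from caf.backends import get_input_data
#   input_files = get_input_data()
#   for path in input_files:
#       ...  # open/process each input file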
64
65def monitor_jobs(args, jobs):
66 unfinished_jobs = jobs[:]
67 failed_jobs = 0
68 while unfinished_jobs:
69 B2INFO("Updating statuses of unfinished jobs...")
70 for j in unfinished_jobs:
71 j.update_status()
72 B2INFO("Checking if jobs are ready...")
73 for j in unfinished_jobs[:]:
74 if j.ready():
75 if j.status == "failed":
76 B2ERROR(f"{j} is failed")
77 failed_jobs += 1
78 else:
79 B2INFO(f"{j} is finished")
80 unfinished_jobs.remove(j)
81 if unfinished_jobs:
82 B2INFO(f"Not all jobs done yet, waiting {args.heartbeat} seconds before re-checking...")
83 time.sleep(args.heartbeat)
84 if failed_jobs > 0:
85 B2ERROR(f"{failed_jobs} jobs failed")
86 else:
87 B2INFO('All jobs finished successfully')
88
89
90class ArgumentsGenerator():
91 def __init__(self, generator_function, *args, **kwargs):
92 """
93 Simple little class to hold a generator (uninitialised) and the necessary args/kwargs to
94 initialise it. This lets us reuse a generator by setting it up again fresh. This is not
95 optimal for expensive calculations, but it is nice for making large sequences of
96 Job input arguments on the fly.
97
98 Parameters:
99 generator_function (py:function): A function (callable) that contains a ``yield`` statement. This generator
100 should *not* be initialised i.e. you haven't called it with ``generator_function(*args, **kwargs)``
101 yet. That will happen when accessing the `ArgumentsGenerator.generator` property.
102 args (tuple): The positional arguments you want to send into the initialisation of the generator.
103 kwargs (dict): The keyword arguments you want to send into the initialisation of the generator.
104 """
105
106 self.generator_function = generator_function
107
108 self.args = args
109
110 self.kwargs = kwargs
111
112 @property
113 def generator(self):
114 """
115 Returns:
116 generator: The initialised generator (using the args and kwargs for initialisation). It should be ready
117 to have ``next``/``send`` called on it.
118 """
119 gen = self.generator_function(*self.args, **self.kwargs)
120 gen.send(None) # Prime it
121 return gen
122
123
124def range_arguments(start=0, stop=None, step=1):
125 """
126 A simple example Arguments Generator function for use as an `ArgumentsGenerator.generator_function`.
127 It will return increasing values using itertools.count. By default it is infinite and will never raise `StopIteration`.
128 The `SubJob` object is sent into this function with `send` but is not used.
129
130 Parameters:
131 start (int): The starting value that will be returned.
132 stop (int): At this value `StopIteration` will be raised. If this is `None` then this generator will continue
133 forever.
134 step (int): The step size.
135
136 Returns:
137 tuple
138 """
139 def should_stop(x):
140 if stop is not None and x >= stop:
141 return True
142 else:
143 return False
144 # The logic of send/yield is tricky. Basically the first yield must have been hit before send(subjob) can work.
145 # However the first yield is also the one that receives the send(subjob) NOT the send(None).
146 subjob = (yield None) # Handle the send(None) and first send(subjob)
147 args = None
148 for i in takewhile(lambda x: not should_stop(x), count(start, step)):
149 args = (i,) # We could have used the subjob object here
150 B2DEBUG(29, f"{subjob} arguments will be {args}")
151 subjob = (yield args) # Set up ready for the next loop (if it happens)
152
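# --- Illustrative sketch, not part of the original module ---
# How ArgumentsGenerator and range_arguments fit together (hypothetical values):
#
#   arg_gen = ArgumentsGenerator(range_arguments, 0, stop=6, step=2)
#   gen = arg_gen.generator       # a fresh generator, already primed with send(None)
#   gen.send(subjob)              # -> (0,)
#   gen.send(subjob)              # -> (2,)
#   gen.send(subjob)              # -> (4,), the next send raises StopIteration
#
# Because the `generator` property re-creates and re-primes the generator on each access,
# a splitter can safely iterate over it once per create_subjobs() call.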
153
154class SubjobSplitter(ABC):
155 """
156 Abstract base class. This class handles the logic of creating subjobs for a `Job` object.
157 The `create_subjobs` function should be implemented and used to construct
158 the subjobs of the parent job object.
159
160 Parameters:
161 arguments_generator (ArgumentsGenerator): Used to construct the generator function that will yield the argument
162 tuple for each `SubJob`. The splitter will iterate through the generator each time `create_subjobs` is
163 called. The `SubJob` will be sent into the generator with ``send(subjob)`` so that the generator can decide what
164 arguments to return.
165 """
166
167 def __init__(self, *, arguments_generator=None):
168 """
169 Derived classes should call `super` to run this.
170 """
171
172 self.arguments_generator = arguments_generator
173
174 @abstractmethod
175 def create_subjobs(self, job):
176 """
177 Implement this method in derived classes to generate the `SubJob` objects.
178 """
179
180 def assign_arguments(self, job):
181 """
182 Use the `arguments_generator` (if one exists) to assign the argument tuples to the
183 subjobs.
184 """
185 if self.arguments_generator:
186 arg_gen = self.arguments_generator.generator
187 generating = True
188 for subjob in sorted(job.subjobs.values(), key=lambda sj: sj.id):
189 if generating:
190 try:
191 args = arg_gen.send(subjob)
192 except StopIteration:
193 B2ERROR(f"StopIteration called when getting args for {subjob}, "
194 "setting all subsequent subjobs to have empty argument tuples.")
195 args = tuple() # If our generator finishes before the subjobs we use empty argument tuples
196 generating = False
197 else:
198 args = tuple()
199 B2DEBUG(29, f"Arguments for {subjob}: {args}")
200 subjob.args = args
201 return
202
203 B2INFO(f"No ArgumentsGenerator assigned to the {self} so subjobs of {job} "
204 "won't automatically have arguments assigned.")
205
206 def __repr__(self):
207 return f"{self.__class__.__name__}"
208
209
210class MaxFilesSplitter(SubjobSplitter):
211
212 def __init__(self, *, arguments_generator=None, max_files_per_subjob=1):
213 """
214 Parameters:
215 max_files_per_subjob (int): The maximum number of input files used per `SubJob` created.
216 """
217 super().__init__(arguments_generator=arguments_generator)
218
219 self.max_files_per_subjob = max_files_per_subjob
220
221 def create_subjobs(self, job):
222 """
223 This function creates subjobs for the parent job passed in. It creates as many subjobs as required
224 in order to prevent the number of input files per subjob going over the limit set by
225 `MaxFilesSplitter.max_files_per_subjob`.
226 """
227 if not job.input_files:
228 B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
229 return
230
231 for i, subjob_input_files in enumerate(grouper(self.max_files_per_subjob, job.input_files)):
232 job.create_subjob(i, input_files=subjob_input_files)
233
234 self.assign_arguments(job)
235
236 B2INFO(f"{self} created {i+1} Subjobs for {job}")
237
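# --- Illustrative sketch, not part of the original module ---
# Attaching a MaxFilesSplitter to a Job (hypothetical paths):
#
#   job = Job("example_job")
#   job.input_files = ["/data/a.root", "/data/b.root", "/data/c.root"]
#   job.max_files_per_subjob = 2   # shortcut that assigns MaxFilesSplitter(max_files_per_subjob=2)
#
# On submission the backend calls job.splitter.create_subjobs(job), producing
# SubJob(0) with two input files and SubJob(1) with the remaining one.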
238
239class MaxSubjobsSplitter(SubjobSplitter):
240
241 def __init__(self, *, arguments_generator=None, max_subjobs=1000):
242 """
243 Parameters:
244 max_subjobs (int): The maximum number of subjobs that will be created.
245 """
246 super().__init__(arguments_generator=arguments_generator)
247
248 self.max_subjobs = max_subjobs
249
250 def create_subjobs(self, job):
251 """
252 This function creates subjobs for the parent job passed in. It creates as many subjobs as required
253 by the number of input files up to the maximum set by `MaxSubjobsSplitter.max_subjobs`. If there are
254 more input files than `max_subjobs` it instead groups files by the minimum number per subjob in order to
255 respect the subjob limit e.g. If you have 11 input files and a maximum number of subjobs of 4, then it
256 will create 4 subjobs, 3 of them with 3 input files, and one with 2 input files.
257 """
258 if not job.input_files:
259 B2WARNING(f"Subjob splitting by input files requested, but no input files exist for {job}. No subjobs created.")
260 return
261
262 # Since we will be popping from the left of the input file list, we construct a deque
263 remaining_input_files = deque(job.input_files)
264 # The initial number of available subjobs, will go down as we create them
265 available_subjobs = self.max_subjobs
266 subjob_i = 0
267 while remaining_input_files:
268 # How many files should we use for this subjob?
269 num_input_files = ceil(len(remaining_input_files) / available_subjobs)
270 # Pop them from the remaining files
271 subjob_input_files = []
272 for i in range(num_input_files):
273 subjob_input_files.append(remaining_input_files.popleft())
274 # Create the actual subjob
275 job.create_subjob(subjob_i, input_files=subjob_input_files)
276 subjob_i += 1
277 available_subjobs -= 1
278
279 self.assign_arguments(job)
280 B2INFO(f"{self} created {subjob_i} Subjobs for {job}")
281
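# --- Illustrative worked example, not part of the original module ---
# The grouping rule above spreads files as evenly as possible. For 11 input files and
# max_subjobs=4: ceil(11/4)=3, ceil(8/3)=3, ceil(5/2)=3, ceil(2/1)=2, i.e. subjobs of
# size 3, 3, 3 and 2, matching the docstring example.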
282
283class ArgumentsSplitter(SubjobSplitter):
284 """
285 Creates SubJobs based on the given argument generator. The generator will be called until a `StopIteration` is issued.
286 Be VERY careful to not accidentally give an infinite generator! Otherwise it will simply create SubJobs until you run out
287 of memory. You can set the `ArgumentsSplitter.max_subjobs` parameter to try and prevent this and throw an exception.
288
289 This splitter is useful for MC production jobs where you don't have any input files, but you want to control the exp/run
290 numbers of subjobs. If you do have input files set for the parent `Job` objects, then the same input files will be
291 assigned to every `SubJob`.
292
293 Parameters:
294 arguments_generator (ArgumentsGenerator): The standard ArgumentsGenerator that is used to assign arguments to the subjobs.
295 """
296
297 def __init__(self, *, arguments_generator=None, max_subjobs=None):
298 """
299 """
300 super().__init__(arguments_generator=arguments_generator)
301
302 self.max_subjobs = max_subjobs
303
304 def create_subjobs(self, job):
305 """
306 This function creates subjobs for the parent job passed in. It creates subjobs until the
307 `SubjobSplitter.arguments_generator` finishes.
308
309 If `ArgumentsSplitter.max_subjobs` is set, then it will throw an exception if more than this number of
310 subjobs are created.
311 """
312 arg_gen = self.arguments_generator.generator
313 for i in count():
314 # Reached the maximum?
315 if self.max_subjobs is not None and i >= self.max_subjobs:
316 raise SplitterError(f"{self} tried to create more subjobs than the maximum (={self.max_subjobs}).")
317 try:
318 subjob = SubJob(job, i, job.input_files) # We manually create it because we might not need it
319 args = arg_gen.send(subjob) # Might throw StopIteration
320 B2INFO(f"Creating {job}.{subjob}")
321 B2DEBUG(29, f"Arguments for {subjob}: {args}")
322 subjob.args = args
323 job.subjobs[i] = subjob
324 except StopIteration:
325 break
326 B2INFO(f"{self} created {i+1} Subjobs for {job}")
327
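# --- Illustrative sketch, not part of the original module ---
# Typical ArgumentsSplitter usage for jobs without input files, e.g. controlling run
# numbers per subjob (the generator below is hypothetical):
#
#   def run_number_args(experiment, first_run, last_run):
#       subjob = (yield None)                    # absorb the priming send(None)
#       for run in range(first_run, last_run + 1):
#           subjob = (yield (experiment, run))   # one argument tuple per SubJob
#
#   job.splitter = ArgumentsSplitter(
#       arguments_generator=ArgumentsGenerator(run_number_args, 12, 1, 50),
#       max_subjobs=1000)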
328
329class Job:
330 """
331 This generic Job object is used to tell a Backend what to do.
332 This object basically holds necessary information about a process you want to submit to a `Backend`.
333 It should *not* do anything that is backend specific, just hold the configuration for a job to be
334 successfully submitted and monitored using a backend. The result attribute is where backend
335 specific job monitoring goes.
336
337 Parameters:
338 name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF
339
340 .. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.
341 """
342
343
345 statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}
346
347
348 exit_statuses = ["failed", "completed"]
349
350 def __init__(self, name, job_dict=None):
351 """
352 """
353
354 self.name = name
355
356 self.splitter = None
357
358 if not job_dict:
359
361 self.input_sandbox_files = []
362
363 self.working_dir = Path()
364
365 self.output_dir = Path()
366
367 self.output_patterns = []
368
369 self.cmd = []
370
371 self.args = []
372
373 self.input_files = []
374
375 self.setup_cmds = []
376
378 self.backend_args = {}
379
380 self.subjobs = {}
381 elif job_dict:
382 self.input_sandbox_files = [Path(p) for p in job_dict["input_sandbox_files"]]
383 self.working_dir = Path(job_dict["working_dir"])
384 self.output_dir = Path(job_dict["output_dir"])
385 self.output_patterns = job_dict["output_patterns"]
386 self.cmd = job_dict["cmd"]
387 self.args = job_dict["args"]
388 self.input_files = job_dict["input_files"]
389 self.setup_cmds = job_dict["setup_cmds"]
390 self.backend_args = job_dict["backend_args"]
391 self.subjobs = {}
392 for subjob_dict in job_dict["subjobs"]:
393 self.create_subjob(subjob_dict["id"], input_files=subjob_dict["input_files"], args=subjob_dict["args"])
394
395
397 self.result = None
398
399 self._status = "init"
400
401 def __repr__(self):
402 """
403 Representation of Job class (what happens when you print a Job() instance).
404 """
405 return f"Job({self.name})"
406
407 def ready(self):
408 """
409 Returns whether or not the Job has finished. If the job has subjobs then it will return true when they are all finished.
410 It will return False as soon as it hits the first failure. Meaning that you cannot guarantee that all subjobs will have
411 their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.
412 """
413 if not self.result:
414 B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")
415 return False
416 else:
417 return self.result.ready()
418
419 def update_status(self):
420 """
421 Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
422 in the best way for the type of result object/backend.
423 """
424 if not self.result:
425 B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
426 else:
427 self.result.update_status()
428 return self.status
429
430 def create_subjob(self, i, input_files=None, args=None):
431 """
432 Creates a subjob Job object that references that parent Job.
433 Returns the SubJob object at the end.
434 """
435 if i not in self.subjobs:
436 B2INFO(f"Creating {self}.Subjob({i})")
437 subjob = SubJob(self, i, input_files)
438 if args:
439 subjob.args = args
440 self.subjobs[i] = subjob
441 return subjob
442 else:
443 B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")
444
445 @property
446 def status(self):
447 """
448 Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
449 subjob status in the hierarchy of statuses in `Job.statuses`.
450 """
451 if self.subjobs:
452 job_status = self._get_overall_status_from_subjobs()
453 if job_status != self._status:
454
455 self.status = job_status
456 return self._status
457
458 def _get_overall_status_from_subjobs(self):
459 subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
460 status_level = min([self.statuses[status] for status in subjob_statuses])
461 for status, level in self.statuses.items():
462 if level == status_level:
463 return status
464
465 @status.setter
466 def status(self, status):
467 """
468 Sets the status of this Job.
469 """
470 # Print an error only if the job failed.
471 if status == 'failed':
472 B2ERROR(f"Setting {self.name} status to failed")
473 else:
474 B2INFO(f"Setting {self.name} status to {status}")
475 self._status = status
476
477 @property
478 def output_dir(self):
479 return self._output_dir
480
481 @output_dir.setter
482 def output_dir(self, value):
483 self._output_dir = Path(value).absolute()
484
485 @property
486 def working_dir(self):
487 return self._working_dir
488
489 @working_dir.setter
490 def working_dir(self, value):
491 self._working_dir = Path(value).absolute()
492
493 @property
494 def input_sandbox_files(self):
495 return self._input_sandbox_files
496
497 @input_sandbox_files.setter
498 def input_sandbox_files(self, value):
499 self._input_sandbox_files = [Path(p).absolute() for p in value]
500
501 @property
502 def input_files(self):
503 return self._input_files
504
505 @input_files.setter
506 def input_files(self, value):
507 self._input_files = value
508
509 @property
510 def max_subjobs(self):
511 return self.splitter.max_subjobs
512
513 @max_subjobs.setter
514 def max_subjobs(self, value):
515 self.splitter = MaxSubjobsSplitter(max_subjobs=value)
516 B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
517
518 @property
519 def max_files_per_subjob(self):
520 return self.splitter.max_files_per_subjob
521
522 @max_files_per_subjob.setter
523 def max_files_per_subjob(self, value):
524 self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
525 B2DEBUG(29, f"Changed splitter to {self.splitter} for {self}.")
526
527 def dump_to_json(self, file_path):
528 """
529 Dumps the Job object configuration to a JSON file so that it can be read in again later.
530
531 Parameters:
532 file_path(`basf2.Path`): The filepath we'll dump to
533 """
534 with open(file_path, mode="w") as job_file:
535 json.dump(self.job_dict, job_file, indent=2)
536
537 @classmethod
538 def from_json(cls, file_path):
539 with open(file_path) as job_file:
540 job_dict = json.load(job_file)
541 return cls(job_dict["name"], job_dict=job_dict)
542
543 @property
544 def job_dict(self):
545 """
546 Returns:
547 dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
548 `Path <basf2.Path>` objects are converted to string via ``Path.as_posix()``.
549 """
550 job_dict = {}
551 job_dict["name"] = self.name
552 job_dict["input_sandbox_files"] = [i.as_posix() for i in self.input_sandbox_files]
553 job_dict["working_dir"] = self.working_dir.as_posix()
554 job_dict["output_dir"] = self.output_dir.as_posix()
555 job_dict["output_patterns"] = self.output_patterns
556 job_dict["cmd"] = self.cmd
557 job_dict["args"] = self.args
558 job_dict["input_files"] = self.input_files
559 job_dict["setup_cmds"] = self.setup_cmds
560 job_dict["backend_args"] = self.backend_args
561 job_dict["subjobs"] = [sj.job_dict for sj in self.subjobs.values()]
562 return job_dict
563
564 def dump_input_data(self):
565 """
566 Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
567 string URI objects.
568 """
569 with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
570 json.dump(self.input_files, input_data_file, indent=2)
571
572 def copy_input_sandbox_files_to_working_dir(self):
573 """
574 Get all of the requested files for the input sandbox and copy them to the working directory.
575 Files like the submit.sh or input_data.json are not part of this process.
576 """
577 for file_path in self.input_sandbox_files:
578 if file_path.is_dir():
579 shutil.copytree(file_path, Path(self.working_dir, file_path.name))
580 else:
581 shutil.copy(file_path, self.working_dir)
582
583 def check_input_data_files(self):
584 """
585 Check the input files and make sure that there aren't any duplicates.
586 Also check if the files actually exist if possible.
587 """
588 existing_input_files = [] # We use a list instead of set to avoid losing any ordering of files
589 for file_path in self.input_files:
590 file_uri = parse_file_uri(file_path)
591 if file_uri.scheme == "file":
592 p = Path(file_uri.path)
593 if p.is_file():
594 if file_uri.geturl() not in existing_input_files:
595 existing_input_files.append(file_uri.geturl())
596 else:
597 B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
598 else:
599 B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
600 else:
601 B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
602 if file_path not in existing_input_files:
603 existing_input_files.append(file_path)
604 else:
605 B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
606 if self.input_files and not existing_input_files:
607 B2WARNING(f"No valid input file paths found for {self.name}, but some were requested.")
608
609 # Replace the Job's input files with the ones that exist + duplicates removed
610 self.input_files = existing_input_files
611
612 @property
613 def full_command(self):
614 """
615 Returns:
616 str: The full command that this job will run including any arguments.
617 """
618 all_components = self.cmd[:]
619 all_components.extend(self.args)
620 # Convert everything to strings just in case arguments were generated as different types
621 full_command = " ".join(map(str, all_components))
622 B2DEBUG(29, f"Full command of {self} is '{full_command}'")
623 return full_command
624
625 def append_current_basf2_setup_cmds(self):
626 """
627 This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
628 It should detect if you are using a local release or CVMFS and append the correct commands
629 so that the job will have the same basf2 release environment. It should also detect
630 if a local release is not compiled with the ``opt`` option.
631
632 Note that this *doesn't mean that every environment variable is inherited* from the submitting
633 process environment.
634 """
635 def append_environment_variable(cmds, envvar):
636 """
637 Append a command for setting an environment variable.
638 """
639 if envvar in os.environ:
640 cmds.append(f"""if [ -z "${{{envvar}}}" ]; then""")
641 cmds.append(f" export {envvar}={os.environ[envvar]}")
642 cmds.append("fi")
643
644 if "BELLE2_TOOLS" not in os.environ:
645 raise BackendError("No BELLE2_TOOLS found in environment")
646 # Export all the environment variables defined via _backend_job_envvars
647 for envvar in _backend_job_envvars:
648 append_environment_variable(self.setup_cmds, envvar)
649 if "BELLE2_RELEASE" in os.environ:
650 self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
651 elif 'BELLE2_LOCAL_DIR' in os.environ:
652 self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
653 self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
654 self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
655 self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
656 self.setup_cmds.append("pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
657 self.setup_cmds.append("source $BACKEND_B2SETUP")
658 # b2code-option has to be executed only after the source of the tools.
659 self.setup_cmds.append("b2code-option $BACKEND_BELLE2_OPTION")
660 self.setup_cmds.append("popd > /dev/null")
661
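# --- Illustrative sketch, not part of the original module ---
# Minimal Job configuration and JSON round trip (hypothetical paths and command):
#
#   job = Job("hello_job")
#   job.working_dir = "/tmp/hello_job/work"
#   job.output_dir = "/tmp/hello_job/out"
#   job.cmd = ["basf2", "steering.py"]
#   job.input_sandbox_files = ["steering.py"]
#   job.output_patterns = ["*.root"]
#   job.append_current_basf2_setup_cmds()            # reproduce the current basf2 setup remotely
#   job.dump_to_json("/tmp/hello_job/job.json")
#   same_job = Job.from_json("/tmp/hello_job/job.json")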
662
663class SubJob(Job):
664 """
665 This mini-class simply holds basic information about which subjob you are
666 and a reference to the parent Job object so that the main data can be accessed there,
667 rather than replicating all of the parent job's configuration again.
668 """
669
670 def __init__(self, job, subjob_id, input_files=None):
671 """
672 """
673
674 self.id = subjob_id
675
676 self.parent = job
677
678 if not input_files:
679 input_files = []
680 self.input_files = input_files
681
683 self.result = None
684
685 self._status = "init"
686
687 self.args = []
688
689 @property
690 def output_dir(self):
691 """
692 Getter for output_dir of SubJob. Accesses the parent Job output_dir to infer this."""
693 return Path(self.parent.output_dir, str(self.id))
694
695 @property
696 def working_dir(self):
697 """Getter for working_dir of SubJob. Accesses the parent Job working_dir to infer this."""
698 return Path(self.parent.working_dir, str(self.id))
699
700 @property
701 def name(self):
702 """Getter for name of SubJob. Accesses the parent Job name to infer this."""
703 return "_".join((self.parent.name, str(self.id)))
704
705 @property
706 def status(self):
707 """
708 Returns the status of this SubJob.
709 """
710 return self._status
711
712 @status.setter
713 def status(self, status):
714 """
715 Sets the status of this Job.
716 """
717 # Print an error only if the job failed.
718 if status == "failed":
719 B2ERROR(f"Setting {self.name} status to failed")
720 else:
721 B2INFO(f"Setting {self.name} status to {status}")
722 self._status = status
723
724 @property
725 def subjobs(self):
726 """
727 A subjob cannot have subjobs. Always return empty list.
728 """
729 return []
730
731 @property
732 def job_dict(self):
733 """
734 Returns:
735 dict: A JSON serialisable representation of the `SubJob`. `Path <basf2.Path>` objects are converted to
736 `string` via ``Path.as_posix()``. Since Subjobs inherit most of the parent job's config
737 we only output the input files and arguments that are specific to this subjob and no other details.
738 """
739 job_dict = {}
740 job_dict["id"] = self.id
741 job_dict["input_files"] = self.input_files
742 job_dict["args"] = self.args
743 return job_dict
744
745 def __getattr__(self, attribute):
746 """
747 Since a SubJob uses attributes from the parent Job, everything simply accesses the Job attributes
748 unless otherwise specified.
749 """
750 return getattr(self.parent, attribute)
751
752 def __repr__(self):
753 """
754 """
755 return f"SubJob({self.name})"
756
757
758class Backend(ABC):
759 """
760 Abstract base class for a valid backend.
761 Classes derived from this will implement their own submission of basf2 jobs
762 to whatever backend they describe.
763 Some common methods/attributes go into this base class.
764
765 For backend_args the priority from lowest to highest is:
766
767 backend.default_backend_args -> backend.backend_args -> job.backend_args
768 """
769
770 submit_script = "submit.sh"
771
772 exit_code_file = "__BACKEND_CMD_EXIT_STATUS__"
773
774 default_backend_args = {}
775
776 def __init__(self, *, backend_args=None):
777 """
778 """
779 if backend_args is None:
780 backend_args = {}
781
782 self.backend_args = {**self.default_backend_args, **backend_args}
783
784 @abstractmethod
785 def submit(self, job):
786 """
787 Base method for submitting collection jobs to the backend type. This MUST be
788 implemented for a correctly written backend class deriving from Backend().
789 """
790
791 @staticmethod
792 def _add_setup(job, batch_file):
793 """
794 Adds setup lines to the shell script file.
795 """
796 for line in job.setup_cmds:
797 print(line, file=batch_file)
798
799 def _add_wrapper_script_setup(self, job, batch_file):
800 """
801 Adds lines to the submitted script that help with job monitoring/setup. Mostly here so that we can insert
802 `trap` statements for Ctrl-C situations.
803 """
804 start_wrapper = f"""# ---
805# trap ctrl-c and call ctrl_c()
806trap '(ctrl_c 130)' SIGINT
807trap '(ctrl_c 143)' SIGTERM
808
809function write_exit_code() {{
810 echo "Writing $1 to exit status file"
811 echo "$1" > {self.exit_code_file}
812 exit $1
813}}
814
815function ctrl_c() {{
816 trap '' SIGINT SIGTERM
817 echo "** Trapped Ctrl-C **"
818 echo "$1" > {self.exit_code_file}
819 exit $1
820}}
821# ---"""
822 print(start_wrapper, file=batch_file)
823
824 def _add_wrapper_script_teardown(self, job, batch_file):
825 """
826 Adds lines to the submitted script that help with job monitoring/teardown. Mostly here so that we can insert
827 an exit code of the job cmd being written out to a file. Which means that we can know if the command was
828 successful or not even if the backend server/monitoring database purges the data about our job i.e. If PBS
829 removes job information too quickly we may never know if a job succeeded or failed without some kind of exit
830 file.
831 """
832 end_wrapper = """# ---
833write_exit_code $?"""
834 print(end_wrapper, file=batch_file)
835
836 @classmethod
837 def _create_parent_job_result(cls, parent):
838 """
839 We want to be able to call `ready()` on the top level `Job.result`. So this method needs to exist
840 so that a Job.result object actually exists. It will be mostly empty and simply updates subjob
841 statuses and allows the use of ready().
842 """
843 raise NotImplementedError
844
845 def get_submit_script_path(self, job):
846 """
847 Construct the Path object of the bash script file that we will submit. It will contain
848 the actual job command, wrapper commands, setup commands, and any batch directives
849 """
850 return Path(job.working_dir, self.submit_script)
851
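# --- Illustrative sketch, not part of the original module ---
# Rough shape of the submit.sh that a backend assembles from the pieces above:
#
#   #!/bin/bash                (or batch directives for Batch backends)
#   <wrapper setup>            SIGINT/SIGTERM traps plus the write_exit_code()/ctrl_c() functions
#   <job.setup_cmds>           e.g. exported environment variables and the b2setup call
#   <job.full_command>         the job command plus its arguments
#   write_exit_code $?         stores the exit status in __BACKEND_CMD_EXIT_STATUS__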
852
853class Result():
854 """
855 Base class for Result objects. A Result is created for each `Job` (or `Job.SubJob`) object
856 submitted to a backend. It provides a way to query a job's status to find out if it's ready.
857 """
858
859 def __init__(self, job):
860 """
861 Pass in the job object to allow the result to access the job's properties and do post-processing.
862 """
863
864 self.job = job
865
866 self._is_ready = False
867
869 self.time_to_wait_for_exit_code_file = timedelta(minutes=5)
870
871 self.exit_code_file_initial_time = None
872
873 def ready(self):
874 """
875 Returns whether or not this job result is known to be ready. Doesn't actually change the job status. Just changes
876 the 'readiness' based on the known job status.
877 """
878 B2DEBUG(29, f"Calling {self.job}.result.ready()")
879 if self._is_ready:
880 return True
881 elif self.job.status in self.job.exit_statuses:
882 self._is_ready = True
883 return True
884 else:
885 return False
886
887 def update_status(self):
888 """
889 Update the job's (and subjobs') status so that `Result.ready` will return the up to date status. This call will have to
890 actually look up the job's status from some database/exit code file.
891 """
892 raise NotImplementedError
893
894 def get_exit_code_from_file(self):
895 """
896 Read the exit code file to discover the exit status of the job command. Useful fallback if the job is no longer
897 known to the job database (batch system purged it for example). Since some backends may take time to download
898 the output files of the job back to the working directory we use a time limit on how long to wait.
899 """
900 if not self.exit_code_file_initial_time:
901 self.exit_code_file_initial_time = datetime.now()
902 exit_code_path = Path(self.job.working_dir, Backend.exit_code_file)
903 with open(exit_code_path) as f:
904 exit_code = int(f.read().strip())
905 B2DEBUG(29, f"Exit code from file for {self.job} was {exit_code}")
906 return exit_code
907
908
909class Local(Backend):
910 """
911 Backend for local processes i.e. on the same machine but in a subprocess.
912
913 Note that you should call the self.join() method to close the pool and wait for any
914 running processes to finish before exiting the process. Once you've called join you will have to set up a new
915 instance of this backend to create a new pool. If you don't call `Local.join` or don't create a join yourself
916 somewhere, then the main python process might end before your pool is done.
917
918 Keyword Arguments:
919 max_processes (int): Integer that specifies the size of the process pool that spawns the subjobs, default=1.
920 It's the maximum simultaneous subjobs.
921 Try not to specify a large number or a number larger than the number of cores.
922 It won't crash the program but it will slow down and negatively impact performance.
923 """
924
925 def __init__(self, *, backend_args=None, max_processes=1):
926 """
927 """
928 super().__init__(backend_args=backend_args)
929
930 self.pool = None
931
932 self.max_processes = max_processes
933
934 class LocalResult(Result):
935 """
936 Result class to help monitor status of jobs submitted by Local backend.
937 """
938
939 def __init__(self, job, result):
940 """
941 Pass in the job object and the multiprocessing result to allow the result to do monitoring and perform
942 post processing of the job.
943 """
944 super().__init__(job)
945
946 self.result = result
947
948 def _update_result_status(self):
949 if self.result.ready() and (self.job.status not in self.job.exit_statuses):
950 return_code = self.result.get()
951 if return_code:
952 self.job.status = "failed"
953 else:
954 self.job.status = "completed"
955
956 def update_status(self):
957 """
958 Update the job's (or subjobs') status by calling the result object.
959 """
960 B2DEBUG(29, f"Calling {self.job}.result.update_status()")
961 if self.job.subjobs:
962 for subjob in self.job.subjobs.values():
963 subjob.result._update_result_status()
964 else:
965 self._update_result_status()
966
967 def join(self):
968 """
969 Closes and joins the Pool, letting you wait for all results currently
970 still processing.
971 """
972 B2INFO("Joining Process Pool, waiting for results to finish...")
973 self.pool.close()
974 self.pool.join()
975 B2INFO("Process Pool joined.")
976
977 @property
978 def max_processes(self):
979 """
980 Getter for max_processes
981 """
982 return self._max_processes
983
984 @max_processes.setter
985 def max_processes(self, value):
986 """
987 Setter for max_processes, we also check for a previous Pool(), wait for it to join
988 and then create a new one with the new value of max_processes
989 """
990
991 self._max_processes = value
992 if self.pool:
993 B2INFO("New max_processes requested. But a pool already exists.")
994 self.join()
995 B2INFO(f"Starting up new Pool with {self.max_processes} processes")
996 self.pool = mp.Pool(processes=self.max_processes)
997
998 @method_dispatch
999 def submit(self, job):
1000 """
1001 """
1002 raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
1003 "Did you submit a (Sub)Job?")
1004
1005 @submit.register(SubJob)
1006 def _(self, job):
1007 """
1008 Submission of a `SubJob` for the Local backend
1009 """
1010 # Make sure the output directory of the job is created
1011 job.output_dir.mkdir(parents=True, exist_ok=True)
1012 # Make sure the working directory of the job is created
1013 job.working_dir.mkdir(parents=True, exist_ok=True)
1014 job.copy_input_sandbox_files_to_working_dir()
1015 job.dump_input_data()
1016 # Get the path to the bash script we run
1017 script_path = self.get_submit_script_path(job)
1018 with open(script_path, mode="w") as batch_file:
1019 print("#!/bin/bash", file=batch_file)
1020 self._add_wrapper_script_setup(job, batch_file)
1021 self._add_setup(job, batch_file)
1022 print(job.full_command, file=batch_file)
1023 self._add_wrapper_script_teardown(job, batch_file)
1024 B2INFO(f"Submitting {job}")
1025 job.result = Local.LocalResult(job,
1026 self.pool.apply_async(self.run_job,
1027 (job.name,
1028 job.working_dir,
1029 job.output_dir,
1030 script_path)
1031 )
1032 )
1033 job.status = "submitted"
1034 B2INFO(f"{job} submitted")
1035
1036 @submit.register(Job)
1037 def _(self, job):
1038 """
1039 Submission of a `Job` for the Local backend
1040 """
1041 # Make sure the output directory of the job is created
1042 job.output_dir.mkdir(parents=True, exist_ok=True)
1043 # Make sure the working directory of the job is created
1044 job.working_dir.mkdir(parents=True, exist_ok=True)
1045 # Check if we have any valid input files
1046 job.check_input_data_files()
1047
1048 if not job.splitter:
1049 # Get all of the requested files for the input sandbox and copy them to the working directory
1050 job.copy_input_sandbox_files_to_working_dir()
1051 job.dump_input_data()
1052 # Get the path to the bash script we run
1053 script_path = self.get_submit_script_path(job)
1054 with open(script_path, mode="w") as batch_file:
1055 print("#!/bin/bash", file=batch_file)
1056 self._add_wrapper_script_setup(job, batch_file)
1057 self._add_setup(job, batch_file)
1058 print(job.full_command, file=batch_file)
1059 self._add_wrapper_script_teardown(job, batch_file)
1060 B2INFO(f"Submitting {job}")
1061 job.result = Local.LocalResult(job,
1062 self.pool.apply_async(self.run_job,
1063 (job.name,
1064 job.working_dir,
1065 job.output_dir,
1066 script_path)
1067 )
1068 )
1069 B2INFO(f"{job} submitted")
1070 else:
1071 # Create subjobs according to the splitter's logic
1072 job.splitter.create_subjobs(job)
1073 # Submit the subjobs
1074 self.submit(list(job.subjobs.values()))
1075 # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1076 self._create_parent_job_result(job)
1077
1078 @submit.register(list)
1079 def _(self, jobs):
1080 """
1081 Submit method of Local() that takes a list of jobs instead of just one and submits each one.
1082 """
1083 # Submit the jobs
1084 for job in jobs:
1085 self.submit(job)
1086 B2INFO("All requested jobs submitted.")
1087
1088 @staticmethod
1089 def run_job(name, working_dir, output_dir, script):
1090 """
1091 The function that is used by multiprocessing.Pool.apply_async during process creation. This runs a
1092 shell command in a subprocess and captures the stdout and stderr of the subprocess to files.
1093 """
1094 B2INFO(f"Starting Sub-process: {name}")
1095 from subprocess import Popen
1096 stdout_file_path = Path(working_dir, _STDOUT_FILE)
1097 stderr_file_path = Path(working_dir, _STDERR_FILE)
1098 # Redirect the subprocess stdout and stderr to files
1099 B2INFO(f"stdout/err for subprocess {name} visible at:\n\t{stdout_file_path}\n\t{stderr_file_path}")
1100 with open(stdout_file_path, mode="w", buffering=1) as f_out, \
1101 open(stderr_file_path, mode="w", buffering=1) as f_err:
1102 with Popen(["/bin/bash", script.as_posix()],
1103 stdout=f_out,
1104 stderr=f_err,
1105 bufsize=1,
1106 universal_newlines=True,
1107 cwd=working_dir,
1108 env={}) as p:
1109 # We block here and wait so that the return code will be set.
1110 p.wait()
1111 B2INFO(f"Subprocess {name} finished.")
1112 return p.returncode
1113
1114 @classmethod
1115 def _create_parent_job_result(cls, parent):
1116 parent.result = cls.LocalResult(parent, None)
1117
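# --- Illustrative sketch, not part of the original module ---
# Submitting with the Local backend and waiting for completion (job as configured elsewhere):
#
#   backend = Local(max_processes=4)
#   backend.submit(job)            # also accepts a list of Job objects
#   while not job.ready():
#       job.update_status()
#       time.sleep(30)
#   backend.join()                 # close the pool before the main process exits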
1118
1119class Batch(Backend):
1120 """
1121 Abstract Base backend for submitting to a local batch system. Batch system specific commands should be implemented
1122 in a derived class. Do not use this class directly!
1123 """
1124
1125 submission_cmds = []
1126
1138 default_global_job_limit = 1000
1139
1140 default_sleep_between_submission_checks = 30
1141
1142 def __init__(self, *, backend_args=None):
1143 """
1144 Init method for Batch Backend. Does some basic default setup.
1145 """
1146 super().__init__(backend_args=backend_args)
1147
1149 self.global_job_limit = self.default_global_job_limit
1150
1152 self.sleep_between_submission_checks = self.default_sleep_between_submission_checks
1153
1154 def _add_batch_directives(self, job, file):
1155 """
1156 Should be implemented in a derived class to write a batch submission script to the job.working_dir.
1157 You should think about where the stdout/err should go, and set the queue name.
1158 """
1159 raise NotImplementedError("Need to implement a _add_batch_directives(self, job, file) "
1160 f"method in {self.__class__.__name__} backend.")
1161
1162 def _make_submit_file(self, job, submit_file_path):
1163 """
1164 Useful for the HTCondor backend where a submit is needed instead of batch
1165 directives pasted directly into the submission script. It should be overwritten
1166 if needed.
1167 """
1168
1169 @classmethod
1170 @abstractmethod
1171 def _submit_to_batch(cls, cmd):
1172 """
1173 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1174 """
1175
1176 def can_submit(self, *args, **kwargs):
1177 """
1178 Should be implemented in a derived class to check that submitting the next job(s) shouldn't fail.
1179 This is initially meant to make sure that we don't go over the global limits of jobs (submitted + running).
1180
1181 Returns:
1182 bool: If the job submission can continue based on the current situation.
1183 """
1184 return True
1185
1186 @method_dispatch
1187 def submit(self, job, check_can_submit=True, jobs_per_check=100):
1188 """
1189 """
1190 raise NotImplementedError("This is an abstract submit(job) method that shouldn't have been called. "
1191 "Did you submit a (Sub)Job?")
1192
1193 @submit.register(SubJob)
1194 def _(self, job, check_can_submit=True, jobs_per_check=100):
1195 """
1196 Submit method of Batch backend for a `SubJob`. Should take `SubJob` object, create needed directories,
1197 create batch script, and send it off with the batch submission command.
1198 It should apply the correct options (default and user requested).
1199
1200 Should set a Result object as an attribute of the job.
1201 """
1202 # Make sure the output directory of the job is created, commented out due to permission issues
1203 # job.output_dir.mkdir(parents=True, exist_ok=True)
1204 # Make sure the working directory of the job is created
1205 job.working_dir.mkdir(parents=True, exist_ok=True)
1206 job.copy_input_sandbox_files_to_working_dir()
1207 job.dump_input_data()
1208 # Make submission file if needed
1209 batch_submit_script_path = self.get_batch_submit_script_path(job)
1210 self._make_submit_file(job, batch_submit_script_path)
1211 # Get the bash file we will actually run, might be the same file
1212 script_path = self.get_submit_script_path(job)
1213 # Construct the batch submission script (with directives if that is supported)
1214 with open(script_path, mode="w") as batch_file:
1215 self._add_batch_directives(job, batch_file)
1216 self._add_wrapper_script_setup(job, batch_file)
1217 self._add_setup(job, batch_file)
1218 print(job.full_command, file=batch_file)
1219 self._add_wrapper_script_teardown(job, batch_file)
1220 os.chmod(script_path, 0o755)
1221 B2INFO(f"Submitting {job}")
1222 # Do the actual batch submission
1223 cmd = self._create_cmd(batch_submit_script_path)
1224 output = self._submit_to_batch(cmd)
1225 self._create_job_result(job, output)
1226 job.status = "submitted"
1227 B2INFO(f"{job} submitted")
1228
1229 @submit.register(Job)
1230 def _(self, job, check_can_submit=True, jobs_per_check=100):
1231 """
1232 Submit method of Batch backend. Should take job object, create needed directories, create batch script,
1233 and send it off with the batch submission command, applying the correct options (default and user requested.)
1234
1235 Should set a Result object as an attribute of the job.
1236 """
1237 # Make sure the output directory of the job is created, commented out due to permissions issue
1238 # job.output_dir.mkdir(parents=True, exist_ok=True)
1239 # Make sure the working directory of the job is created
1240 job.working_dir.mkdir(parents=True, exist_ok=True)
1241 # Check if we have any valid input files
1242 job.check_input_data_files()
1243 # Add any required backend args that are missing (I'm a bit hesitant to actually merge with job.backend_args)
1244 # just in case you want to resubmit the same job with different backend settings later.
1245 # job_backend_args = {**self.backend_args, **job.backend_args}
1246
1247 # If there's no splitter then we just submit the Job with no SubJobs
1248 if not job.splitter:
1249 # Get all of the requested files for the input sandbox and copy them to the working directory
1250 job.copy_input_sandbox_files_to_working_dir()
1251 job.dump_input_data()
1252 # Make submission file if needed
1253 batch_submit_script_path = self.get_batch_submit_script_path(job)
1254 self._make_submit_file(job, batch_submit_script_path)
1255 # Get the bash file we will actually run
1256 script_path = self.get_submit_script_path(job)
1257 # Construct the batch submission script (with directives if that is supported)
1258 with open(script_path, mode="w") as batch_file:
1259 self._add_batch_directives(job, batch_file)
1260 self._add_wrapper_script_setup(job, batch_file)
1261 self._add_setup(job, batch_file)
1262 print(job.full_command, file=batch_file)
1263 self._add_wrapper_script_teardown(job, batch_file)
1264 os.chmod(script_path, 0o755)
1265 B2INFO(f"Submitting {job}")
1266 # Do the actual batch submission
1267 cmd = self._create_cmd(batch_submit_script_path)
1268 output = self._submit_to_batch(cmd)
1269 self._create_job_result(job, output)
1270 job.status = "submitted"
1271 B2INFO(f"{job} submitted")
1272 else:
1273 # Create subjobs according to the splitter's logic
1274 job.splitter.create_subjobs(job)
1275 # Submit the subjobs
1276 self.submit(list(job.subjobs.values()))
1277 # After submitting subjobs, make a Job.result for the parent Job object, used to call ready() on
1278 self._create_parent_job_result(job)
1279
1280 @submit.register(list)
1281 def _(self, jobs, check_can_submit=True, jobs_per_check=100):
1282 """
1283 Submit method of Batch Backend that takes a list of jobs instead of just one and submits each one.
1284 """
1285 B2INFO(f"Submitting a list of {len(jobs)} jobs to a Batch backend")
1286 # Technically this could be a list of Jobs or SubJobs. And if it is a list of Jobs then it might not
1287 # be necessary to check if we can submit right now. We could do it later during the submission of the
1288 # SubJob list. However in the interest of simpler code we just do the check here, and re-check again
1289 # if a SubJob list comes through this function. Slightly inefficient, but much simpler logic.
1290
1291 # The first thing to do is make sure that we are iterating through the jobs list in chunks that are
1292 # equal to or smaller than the global limit. Otherwise nothing will ever submit.
1293
1294 if jobs_per_check > self.global_job_limit:
1295 B2INFO(f"jobs_per_check (={jobs_per_check}) but this is higher than the global job "
1296 f"limit for this backend (={self.global_job_limit}). Will instead use the "
1297 " value of the global job limit.")
1298 jobs_per_check = self.global_job_limit
1299
1300 # We group the jobs list into chunks of length jobs_per_check
1301 for jobs_to_submit in grouper(jobs_per_check, jobs):
1302 # Wait until we are allowed to submit
1303 while not self.can_submit(njobs=len(jobs_to_submit)):
1304 B2INFO("Too many jobs are currently in the batch system globally. Waiting until submission can continue...")
1305 time.sleep(self.sleep_between_submission_checks)
1306 else:
1307 # We loop here since we have already checked if the number of jobs is low enough, we don't want to hit this
1308 # function again unless one of the jobs has subjobs.
1309 B2INFO(f"Submitting the next {len(jobs_to_submit)} jobs...")
1310 for job in jobs_to_submit:
1311 self.submit(job, check_can_submit, jobs_per_check)
1312 B2INFO(f"All {len(jobs)} requested jobs submitted")
1313
1314 def get_batch_submit_script_path(self, job):
1315 """
1316 Construct the Path object of the script file that we will submit using the batch command.
1317 For most batch backends this is the same script as the bash script we submit.
1318 But for some they require a separate submission file that describes the job.
1319 To implement that you can implement this function in the Backend class.
1320 """
1321 return Path(job.working_dir, self.submit_script)
1322
1323 @classmethod
1324 @abstractmethod
1325 def _create_job_result(cls, job, batch_output):
1326 """
1327 """
1328
1329 @abstractmethod
1330 def _create_cmd(self, job):
1331 """
1332 """
1333
1334
1335class PBS(Batch):
1336 """
1337 Backend for submitting calibration processes to a qsub batch system.
1338 """
1339
1340 cmd_wkdir = "#PBS -d"
1341
1342 cmd_stdout = "#PBS -o"
1343
1344 cmd_stderr = "#PBS -e"
1345
1346 cmd_queue = "#PBS -q"
1347
1348 cmd_name = "#PBS -N"
1349
1350 submission_cmds = ["qsub"]
1351
1352 default_global_job_limit = 5000
1353
1354 default_backend_args = {"queue": "short"}
1355
1356 def __init__(self, *, backend_args=None):
1357 super().__init__(backend_args=backend_args)
1358
1359 def _add_batch_directives(self, job, batch_file):
1360 """
1361 Add PBS directives to submitted script.
1362 """
1363 job_backend_args = {**self.backend_args, **job.backend_args}
1364 batch_queue = job_backend_args["queue"]
1365 print("#!/bin/bash", file=batch_file)
1366 print("# --- Start PBS ---", file=batch_file)
1367 print(" ".join([PBS.cmd_queue, batch_queue]), file=batch_file)
1368 print(" ".join([PBS.cmd_name, job.name]), file=batch_file)
1369 print(" ".join([PBS.cmd_wkdir, job.working_dir.as_posix()]), file=batch_file)
1370 print(" ".join([PBS.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1371 print(" ".join([PBS.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1372 print("# --- End PBS ---", file=batch_file)
1373
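# --- Illustrative sketch, not part of the original module ---
# For backend_args={"queue": "short"} and a job named "my_job", the directives above
# produce roughly (hypothetical working directory):
#
#   #!/bin/bash
#   # --- Start PBS ---
#   #PBS -q short
#   #PBS -N my_job
#   #PBS -d /path/to/working_dir
#   #PBS -o /path/to/working_dir/stdout
#   #PBS -e /path/to/working_dir/stderr
#   # --- End PBS ---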
1374 @classmethod
1375 def _create_job_result(cls, job, batch_output):
1376 """
1377 """
1378 job_id = batch_output.replace("\n", "")
1379 B2INFO(f"Job ID of {job} recorded as: {job_id}")
1380 job.result = cls.PBSResult(job, job_id)
1381
1382 def _create_cmd(self, script_path):
1383 """
1384 """
1385 submission_cmd = self.submission_cmds[:]
1386 submission_cmd.append(script_path.as_posix())
1387 return submission_cmd
1388
1389 @classmethod
1390 def _submit_to_batch(cls, cmd):
1391 """
1392 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1393 """
1394 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True)
1395 return sub_out
1396
1397 @classmethod
1398 def _create_parent_job_result(cls, parent):
1399 parent.result = cls.PBSResult(parent, None)
1400
1401 class PBSResult(Result):
1402 """
1403 Simple class to help monitor status of jobs submitted by `PBS` Backend.
1404
1405 You pass in a `Job` object (or `SubJob`) and job id from a qsub command.
1406 When you call the `ready` method it runs qstat to see whether or not the job has finished.
1407 """
1408
1409
1410 backend_code_to_status = {"R": "running",
1411 "C": "completed",
1412 "FINISHED": "completed",
1413 "E": "failed",
1414 "H": "submitted",
1415 "Q": "submitted",
1416 "T": "submitted",
1417 "W": "submitted",
1418 "H": "submitted"
1419 }
1420
1421 def __init__(self, job, job_id):
1422 """
1423 Pass in the job object and the job id to allow the result to do monitoring and perform
1424 post processing of the job.
1425 """
1426 super().__init__(job)
1427
1428 self.job_id = job_id
1429
1430 def update_status(self):
1431 """
1432 Update the job's (or subjobs') status by calling qstat.
1433 """
1434 B2DEBUG(29, f"Calling {self.job}.result.update_status()")
1435 # Get all jobs info and reuse it for each status update to minimise time spent on this update.
1436 qstat_output = PBS.qstat()
1437 if self.job.subjobs:
1438 for subjob in self.job.subjobs.values():
1439 subjob.result._update_result_status(qstat_output)
1440 else:
1441 self._update_result_status(qstat_output)
1442
1443 def _update_result_status(self, qstat_output):
1444 """
1445 Parameters:
1446 qstat_output (dict): The JSON output of a previous call to qstat which we can reuse to find the
1447 status of this job. Obviously you should only be passing a JSON dict that contains the 'Job_Id' and
1448 'job_state' information, otherwise it is useless.
1449
1450 """
1451 try:
1452 backend_status = self._get_status_from_output(qstat_output)
1453 except KeyError:
1454 # If this happens then maybe the job id wasn't in the qstat_output argument because it finished.
1455 # Instead of failing immediately we try looking for the exit code file and then fail if it still isn't there.
1456 B2DEBUG(29, f"Checking of the exit code from file for {self.job}")
1457 try:
1458 exit_code = self.get_exit_code_from_file()
1459 except FileNotFoundError:
1460 waiting_time = datetime.now() - self.exit_code_file_initial_time
1461 if waiting_time > self.time_to_wait_for_exit_code_file:
1462 B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
1463 exit_code = 1
1464 else:
1465 B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
1466 return
1467 if exit_code:
1468 backend_status = "E"
1469 else:
1470 backend_status = "C"
1471
1472 try:
1473 new_job_status = self.backend_code_to_status[backend_status]
1474 except KeyError as err:
1475 raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
1476
1477 if new_job_status != self.job.status:
1478 self.job.status = new_job_status
1479
1480 def _get_status_from_output(self, output):
1481 for job_info in output["JOBS"]:
1482 if job_info["Job_Id"] == self.job_id:
1483 return job_info["job_state"]
1484 else:
1485 raise KeyError
1486
1487 def can_submit(self, njobs=1):
1488 """
1489 Checks the global number of jobs in PBS right now (submitted or running) for this user.
1490 Returns True if the number is lower that the limit, False if it is higher.
1491
1492 Parameters:
1493 njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1494 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1495 assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1496 So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
1497 and check again before submitting more.
1498 """
1499 B2DEBUG(29, "Calling PBS().can_submit()")
1500 job_info = self.qstat(username=os.environ["USER"])
1501 total_jobs = job_info["NJOBS"]
1502 B2INFO(f"Total jobs active in the PBS system is currently {total_jobs}")
1503 if (total_jobs + njobs) > self.global_job_limit:
1504 B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1505 return False
1506 else:
1507 B2INFO("There is enough space to submit more jobs.")
1508 return True
1509
1510 @classmethod
1511 def qstat(cls, username="", job_ids=None):
1512 """
1513 Simplistic interface to the ``qstat`` command. Lets you request information about all jobs or ones matching the filter
1514 ['job_id'] or for the username. The result is a JSON dictionary containing some of the useful job attributes returned
1515 by qstat.
1516
1517 PBS is kind of annoying as depending on the configuration it can forget about jobs immediately. So the status of a
1518 finished job is VERY hard to get. There are other commands that are sometimes included that may do a better job.
1519 This one should work for Melbourne's cloud computing centre.
1520
1521 Keyword Args:
1522 username (str): The username of the jobs we are interested in. Only jobs corresponding to the <username>@hostnames
1523 will be in the output dictionary.
1524 job_ids (list[str]): List of Job ID strings, each given by qstat during submission. If this argument is given then
1525 the output of this function will contain only information about these jobs. If this argument is not given, then all jobs
1526 matching the other filters will be returned.
1527
1528 Returns:
1529 dict: JSON dictionary of the form (to save you parsing the XML that qstat returns).:
1530
1531 .. code-block:: python
1532
1533 {
1534 "NJOBS": int
1535 "JOBS":[
1536 {
1537 <key: value>, ...
1538 }, ...
1539 ]
1540 }
1541 """
1542 B2DEBUG(29, f"Calling PBS.qstat(username='{username}', job_id={job_ids})")
1543 if not job_ids:
1544 job_ids = []
1545 job_ids = set(job_ids)
1546 cmd_list = ["qstat", "-x"]
1547 # We get an XML serialisable summary from qstat. Requires the shell argument.
1548 cmd = " ".join(cmd_list)
1549 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1550 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1551 jobs_dict = {"NJOBS": 0, "JOBS": []}
1552 jobs_xml = ET.fromstring(output)
1553
1554 # For a specific job_id we can be a bit more efficient in XML parsing
1555 if len(job_ids) == 1:
1556 job_elem = jobs_xml.find(f"./Job[Job_Id='{list(job_ids)[0]}']")
1557 if job_elem:
1558 jobs_dict["JOBS"].append(cls.create_job_record_from_element(job_elem))
1559 jobs_dict["NJOBS"] = 1
1560 return jobs_dict
1561
1562 # Since the username given is not exactly the same as the one that PBS stores (<username>@host)
1563 # we have to simply loop through rather than using XPATH.
1564 for job in jobs_xml.iterfind("Job"):
1565 job_owner = job.find("Job_Owner").text.split("@")[0]
1566 if username and username != job_owner:
1567 continue
1568 job_id = job.find("Job_Id").text
1569 if job_ids and job_id not in job_ids:
1570 continue
1571 jobs_dict["JOBS"].append(cls.create_job_record_from_element(job))
1572 jobs_dict["NJOBS"] += 1
1573 # Remove it so that we don't keep checking for it
1574 if job_id in job_ids:
1575 job_ids.remove(job_id)
1576 return jobs_dict
1577
1578 @staticmethod
1579 def create_job_record_from_element(job_elem):
1580 """
1581 Creates a Job dictionary with various job information from the XML element returned by qstat.
1582
1583 Parameters:
1584 job_elem (xml.etree.ElementTree.Element): The XML Element of the Job
1585
1586 Returns:
1587 dict: JSON serialisable dictionary of the Job information we are interested in.
1588 """
1589 job_dict = {}
1590 job_dict["Job_Id"] = job_elem.find("Job_Id").text
1591 job_dict["Job_Name"] = job_elem.find("Job_Name").text
1592 job_dict["Job_Owner"] = job_elem.find("Job_Owner").text
1593 job_dict["job_state"] = job_elem.find("job_state").text
1594 job_dict["queue"] = job_elem.find("queue").text
1595 return job_dict
1596
1597
1598class LSF(Batch):
1599 """
1600    Backend for submitting calibration processes to an LSF batch system.
1601 """
1602
1603 cmd_wkdir = "#BSUB -cwd"
1604
1605 cmd_stdout = "#BSUB -o"
1606
1607 cmd_stderr = "#BSUB -e"
1608
1609 cmd_queue = "#BSUB -q"
1610
1611 cmd_name = "#BSUB -J"
1612
1613 submission_cmds = ["bsub", "-env", "\"none\"", "<"]
1614
1615 default_global_job_limit = 15000
1616
1617 default_backend_args = {"queue": "s"}
1618
1619 def __init__(self, *, backend_args=None):
1620 super().__init__(backend_args=backend_args)
1621
1622 def _add_batch_directives(self, job, batch_file):
1623 """
1624 Adds LSF BSUB directives for the job to a script.
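
        The job's own ``backend_args`` take priority over the backend defaults when the directives are written,
        so a per-job queue can be chosen like this (``"l"`` is just an example queue name):

        .. code-block:: python

            lsf = LSF(backend_args={"queue": "s"})
            job.backend_args["queue"] = "l"  # this value wins when the directives are written for this job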
1625 """
1626 job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1627 batch_queue = job_backend_args["queue"]
1628 print("#!/bin/bash", file=batch_file)
1629 print("# --- Start LSF ---", file=batch_file)
1630 print(" ".join([LSF.cmd_queue, batch_queue]), file=batch_file)
1631 print(" ".join([LSF.cmd_name, job.name]), file=batch_file)
1632 print(" ".join([LSF.cmd_wkdir, str(job.working_dir)]), file=batch_file)
1633 print(" ".join([LSF.cmd_stdout, Path(job.working_dir, _STDOUT_FILE).as_posix()]), file=batch_file)
1634 print(" ".join([LSF.cmd_stderr, Path(job.working_dir, _STDERR_FILE).as_posix()]), file=batch_file)
1635 print("# --- End LSF ---", file=batch_file)
1636
1637 def _create_cmd(self, script_path):
1638        """Create the ``bsub`` submission command as a single shell string; the batch script is passed via stdin redirection ('<')."""
1640 submission_cmd = self.submission_cmds[:]
1641 submission_cmd.append(script_path.as_posix())
1642 submission_cmd = " ".join(submission_cmd)
1643 return [submission_cmd]
1644
1645 @classmethod
1646 def _submit_to_batch(cls, cmd):
1647 """
1648 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1649 """
1650 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1651 return sub_out
1652
1653 class LSFResult(Result):
1654 """
1655 Simple class to help monitor status of jobs submitted by LSF Backend.
1656
1657 You pass in a `Job` object and job id from a bsub command.
1658 When you call the `ready` method it runs bjobs to see whether or not the job has finished.
1659 """
1660
1661
1662 backend_code_to_status = {"RUN": "running",
1663 "DONE": "completed",
1664 "FINISHED": "completed",
1665 "EXIT": "failed",
1666 "PEND": "submitted"
1667 }
1668
1669 def __init__(self, job, job_id):
1670 """
1671 Pass in the job object and the job id to allow the result to do monitoring and perform
1672 post processing of the job.
1673 """
1674 super().__init__(job)
1675
1676 self.job_id = job_id
1677
1678 def update_status(self):
1679 """
1680 Update the job's (or subjobs') status by calling bjobs.
1681 """
1682 B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
1683        # Get all jobs info once and reuse it for each subjob status update to minimise the time spent on this.
1684 bjobs_output = LSF.bjobs(output_fields=["stat", "id"])
1685 if self.job.subjobs:
1686 for subjob in self.job.subjobs.values():
1687 subjob.result._update_result_status(bjobs_output)
1688 else:
1689 self._update_result_status(bjobs_output)
1690
1691 def _update_result_status(self, bjobs_output):
1692 """
1693 Parameters:
1694 bjobs_output (dict): The JSON output of a previous call to bjobs which we can reuse to find the
1695 status of this job. Obviously you should only be passing a JSON dict that contains the 'stat' and
1696 'id' information, otherwise it is useless.
1697
1698 """
1699 try:
1700 backend_status = self._get_status_from_output(bjobs_output)
1701 except KeyError:
1702 # If this happens then maybe the job id wasn't in the bjobs_output argument because it finished.
1703 # Instead of failing immediately we try re-running bjobs here explicitly and then fail if it still isn't there.
1704 bjobs_output = LSF.bjobs(output_fields=["stat", "id"], job_id=str(self.job_id))
1705 try:
1706 backend_status = self._get_status_from_output(bjobs_output)
1707 except KeyError:
1708 # If this happened, maybe we're looking at an old finished job. We could fall back to bhist, but it's
1709 # slow and terrible. Instead let's try looking for the exit code file.
1710 try:
1711 exit_code = self.get_exit_code_from_file()
1712 except FileNotFoundError:
1713 waiting_time = datetime.now() - self.exit_code_file_initial_time
1714                    if waiting_time > self.time_to_wait_for_exit_code_file:
1715 B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
1716 exit_code = 1
1717 else:
1718 B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
1719 return
1720 if exit_code:
1721 backend_status = "EXIT"
1722 else:
1723 backend_status = "FINISHED"
1724 try:
1725 new_job_status = self.backend_code_to_status[backend_status]
1726 except KeyError as err:
1727 raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
1728
1729 if new_job_status != self.job.status:
1730 self.job.status = new_job_status
1731
1732    def _get_status_from_output(self, output):
        """Return the 'STAT' field of the record in ``output`` matching this job's ID, raising KeyError if no record matches."""
1733 if output["JOBS"] and "ERROR" in output["JOBS"][0]:
1734 if output["JOBS"][0]["ERROR"] == f"Job <{self.job_id}> is not found":
1735 raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1736 else:
1737 raise BackendError(f"Unidentified Error during status check for {self.job}: {output}")
1738 else:
1739 for job_info in output["JOBS"]:
1740 if job_info["JOBID"] == self.job_id:
1741 return job_info["STAT"]
1742 else:
1743 raise KeyError(f"No job record in the 'output' argument had the 'JOBID'=={self.job_id}")
1744
1745 @classmethod
1746 def _create_parent_job_result(cls, parent):
1747 parent.result = cls.LSFResult(parent, None)
1748
1749 @classmethod
1750 def _create_job_result(cls, job, batch_output):
1751        """Parse the batch job ID from the bsub output and attach an `LSFResult` to the job for later monitoring."""
1753 m = re.search(r"Job <(\d+)>", str(batch_output))
1754 if m:
1755 job_id = m.group(1)
1756 else:
1757 raise BackendError(f"Failed to get the batch job ID of {job}. LSF output was:\n{batch_output}")
1758
1759 B2INFO(f"Job ID of {job} recorded as: {job_id}")
1760 job.result = cls.LSFResult(job, job_id)
1761
1762 def can_submit(self, njobs=1):
1763 """
1764 Checks the global number of jobs in LSF right now (submitted or running) for this user.
1765        Returns True if the number is lower than the limit, False if it is higher.
1766
1767 Parameters:
1768 njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
1769 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
1770 assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
1771 So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
1772 and check again before submitting more.
1773 """
1774 B2DEBUG(29, "Calling LSF().can_submit()")
1775 job_info = self.bjobs(output_fields=["stat"])
1776 total_jobs = job_info["NJOBS"]
1777 B2INFO(f"Total jobs active in the LSF system is currently {total_jobs}")
1778 if (total_jobs + njobs) > self.global_job_limit:
1779 B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
1780 return False
1781 else:
1782 B2INFO("There is enough space to submit more jobs.")
1783 return True
1784
1785 @classmethod
1786 def bjobs(cls, output_fields=None, job_id="", username="", queue=""):
1787 """
1788        Simplistic interface to the ``bjobs`` command. Lets you request information about all jobs matching the filters
1789        'job_id', 'username', and 'queue'. The result is the JSON dictionary returned by the ``-json`` option of bjobs.
1790
1791 Parameters:
1792 output_fields (list[str]): A list of bjobs -o fields that you would like information about e.g. ['stat', 'name', 'id']
1793            job_id (str): String representation of the Job ID given by bsub during submission. If this argument is given then
1794 the output of this function will be only information about this job. If this argument is not given, then all jobs
1795 matching the other filters will be returned.
1796 username (str): By default bjobs (and this function) return information about only the current user's jobs. By giving
1797 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
1798 receive job information from all known user jobs matching the other filters.
1799 queue (str): Set this argument to receive job information about jobs that are in the given queue and no other.
1800
1801 Returns:
1802 dict: JSON dictionary of the form:
1803
1804 .. code-block:: python
1805
1806 {
1807 "NJOBS":<njobs returned by command>,
1808 "JOBS":[
1809 {
1810 <output field: value>, ...
1811 }, ...
1812 ]
1813 }
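
        Illustrative usage (the job ID is a placeholder; the record keys are upper-cased versions of the
        requested output fields):

        .. code-block:: python

            info = LSF.bjobs(output_fields=["stat", "id"], job_id="12345")
            if info["JOBS"]:
                print(info["JOBS"][0]["STAT"])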
1814 """
1815 B2DEBUG(29, f"Calling LSF.bjobs(output_fields={output_fields}, job_id={job_id}, username={username}, queue={queue})")
1816 # We must always return at least one output field when using JSON and -o options. So we choose the job id
1817 if not output_fields:
1818 output_fields = ["id"]
1819 # Output fields should be space separated but in a string.
1820 field_list_cmd = "\""
1821 field_list_cmd += " ".join(output_fields)
1822 field_list_cmd += "\""
1823 cmd_list = ["bjobs", "-o", field_list_cmd]
1824 # If the queue name is set then we add to the command options
1825 if queue:
1826 cmd_list.extend(["-q", queue])
1827 # If the username is set then we add to the command options
1828 if username:
1829 cmd_list.extend(["-u", username])
1830 # Can now add the json option before the final positional argument (if used)
1831 cmd_list.append("-json")
1832 # If the job id is set then we add to the end of the command
1833 if job_id:
1834 cmd_list.append(job_id)
1835 # We get a JSON serialisable summary from bjobs. Requires the shell argument.
1836 cmd = " ".join(cmd_list)
1837 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1838 output = decode_json_string(subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True))
1839        output["NJOBS"] = output["JOBS"]  # bjobs -json reports the number of records under 'JOBS'
1840        output["JOBS"] = output["RECORDS"]  # ...and the list of job records under 'RECORDS', so rename to our convention
1841 del output["RECORDS"]
1842 del output["COMMAND"]
1843 return output
1844
1845 @classmethod
1846 def bqueues(cls, output_fields=None, queues=None):
1847 """
1848        Simplistic interface to the ``bqueues`` command. Lets you request information about all queues matching the filters.
1849 The result is the JSON dictionary returned by output of the ``-json`` bqueues option.
1850
1851 Parameters:
1852 output_fields (list[str]): A list of bqueues -o fields that you would like information about
1853            e.g. the default is ['queue_name', 'status', 'max', 'njobs', 'pend', 'run']
1854 queues (list[str]): Set this argument to receive information about only the queues that are requested and no others.
1855 By default you will receive information about all queues.
1856
1857 Returns:
1858 dict: JSON dictionary of the form:
1859
1860 .. code-block:: python
1861
1862 {
1863 "COMMAND":"bqueues",
1864 "QUEUES":46,
1865 "RECORDS":[
1866 {
1867 "QUEUE_NAME":"b2_beast",
1868 "STATUS":"Open:Active",
1869 "MAX":"200",
1870 "NJOBS":"0",
1871 "PEND":"0",
1872 "RUN":"0"
1873            }, ...
1874          ]
        }
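
        Illustrative usage (``"s"`` is just the default queue name used elsewhere in this backend):

        .. code-block:: python

            queue_info = LSF.bqueues(output_fields=["njobs", "max"], queues=["s"])
            record = queue_info["RECORDS"][0]
            print(f"{record['NJOBS']} of {record['MAX']} job slots in use")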
1875 """
1876 B2DEBUG(29, f"Calling LSF.bqueues(output_fields={output_fields}, queues={queues})")
1877 # We must always return at least one output field when using JSON and -o options. So we choose the job id
1878 if not output_fields:
1879 output_fields = ["queue_name", "status", "max", "njobs", "pend", "run"]
1880 # Output fields should be space separated but in a string.
1881 field_list_cmd = "\""
1882 field_list_cmd += " ".join(output_fields)
1883 field_list_cmd += "\""
1884 cmd_list = ["bqueues", "-o", field_list_cmd]
1885 # Can now add the json option before the final positional argument (if used)
1886 cmd_list.append("-json")
1887 # If the queue name is set then we add to the end of the command
1888 if queues:
1889 cmd_list.extend(queues)
1890        # We get a JSON serialisable summary from bqueues. Requires the shell argument.
1891 cmd = " ".join(cmd_list)
1892 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
1893 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
1894 return decode_json_string(output)
1895
1896
1897class HTCondor(Batch):
1898 """
1899    Backend for submitting calibration processes to an HTCondor batch system.
1900 """
1901
1902 batch_submit_script = "submit.sub"
1903
1904 submission_cmds = ["condor_submit", "-terse"]
1905
1906 default_global_job_limit = 10000
1907
1908 default_backend_args = {
1909 "universe": "vanilla",
1910 "getenv": "false",
1911 "request_memory": "4 GB", # We set the default requested memory to 4 GB to maintain parity with KEKCC
1912 "path_prefix": "", # Path prefix for file path
1913 "extra_lines": [] # These should be other HTCondor submit script lines like 'request_cpus = 2'
1914 }
1915
1916 default_class_ads = ["GlobalJobId", "JobStatus", "Owner"]
1917
1918 def _make_submit_file(self, job, submit_file_path):
1919 """
1920 Fill HTCondor submission file.
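
        Per-job customisation goes through ``backend_args``; for example (the values shown are only an
        illustration of the mechanism, not recommended settings):

        .. code-block:: python

            job.backend_args["request_memory"] = "8 GB"
            job.backend_args["extra_lines"] = ["request_cpus = 2"]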
1921 """
1922 # Find all files/directories in the working directory to copy on the worker node
1923
1924 files_to_transfer = [i.as_posix() for i in job.working_dir.iterdir()]
1925
1926 job_backend_args = {**self.backend_args, **job.backend_args} # Merge the two dictionaries, with the job having priority
1927
1928 with open(submit_file_path, "w") as submit_file:
1929 print(f'executable = {self.get_submit_script_path(job)}', file=submit_file)
1930 print(f'log = {Path(job.output_dir, "htcondor.log").as_posix()}', file=submit_file)
1931 print(f'output = {Path(job.working_dir, _STDOUT_FILE).as_posix()}', file=submit_file)
1932 print(f'error = {Path(job.working_dir, _STDERR_FILE).as_posix()}', file=submit_file)
1933            print(f'transfer_input_files = {",".join(files_to_transfer)}', file=submit_file)
1934 print(f'universe = {job_backend_args["universe"]}', file=submit_file)
1935 print(f'getenv = {job_backend_args["getenv"]}', file=submit_file)
1936 print(f'request_memory = {job_backend_args["request_memory"]}', file=submit_file)
1937 print('should_transfer_files = Yes', file=submit_file)
1938 print('when_to_transfer_output = ON_EXIT', file=submit_file)
1939 # Any other lines in the backend args that we don't deal with explicitly but maybe someone wants to insert something
1940 for line in job_backend_args["extra_lines"]:
1941 print(line, file=submit_file)
1942 print('queue', file=submit_file)
1943
1944 def _add_batch_directives(self, job, batch_file):
1945 """
1946 For HTCondor leave empty as the directives are already included in the submit file.
1947 """
1948 print('#!/bin/bash', file=batch_file)
1949
1950 def _create_cmd(self, script_path):
1951        """Create the condor_submit command list, with the path of the submit description file appended."""
1953 submission_cmd = self.submission_cmds[:]
1954 submission_cmd.append(script_path.as_posix())
1955 return submission_cmd
1956
1957 def get_batch_submit_script_path(self, job):
1958 """
1959 Construct the Path object of the .sub file that we will use to describe the job.
1960 """
1961 return Path(job.working_dir, self.batch_submit_script)
1962
1963 @classmethod
1964 def _submit_to_batch(cls, cmd):
1965 """
1966 Do the actual batch submission command and collect the output to find out the job id for later monitoring.
1967 """
1968 job_dir = Path(cmd[-1]).parent.as_posix()
1969 sub_out = ""
1970 attempt = 0
1971 sleep_time = 30
1972
1973 while attempt < 3:
1974 try:
1975 sub_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, cwd=job_dir)
1976 break
1977 except subprocess.CalledProcessError as e:
1978 attempt += 1
1979                if attempt == 3:
1980                    B2ERROR(f"Error during condor_submit: {str(e)} occurred 3 times.")
1981                    raise e
1982                else:
1983                    B2ERROR(f"Error during condor_submit: {str(e)}, sleeping for {sleep_time} seconds.")
1984                    time.sleep(sleep_time)
1985 return re.search(r"(\d+\.\d+) - \d+\.\d+", sub_out).groups()[0]
1986
1987 class HTCondorResult(Result):
1988 """
1989 Simple class to help monitor status of jobs submitted by HTCondor Backend.
1990
1991 You pass in a `Job` object and job id from a condor_submit command.
1992 When you call the `ready` method it runs condor_q and, if needed, ``condor_history``
1993 to see whether or not the job has finished.
1994 """
1995
1996
1997 backend_code_to_status = {0: "submitted",
1998 1: "submitted",
1999 2: "running",
2000 3: "failed",
2001 4: "completed",
2002 5: "submitted",
2003 6: "failed"
2004 }
2005
2006 def __init__(self, job, job_id):
2007 """
2008 Pass in the job object and the job id to allow the result to do monitoring and perform
2009 post processing of the job.
2010 """
2011 super().__init__(job)
2012
2013 self.job_id = job_id
2014
2015 def update_status(self):
2016 """
2017 Update the job's (or subjobs') status by calling condor_q.
2018 """
2019 B2DEBUG(29, f"Calling {self.job.name}.result.update_status()")
2020        # Get all jobs info once and reuse it for each subjob status update to minimise the time spent on this.
2021 condor_q_output = HTCondor.condor_q()
2022 if self.job.subjobs:
2023 for subjob in self.job.subjobs.values():
2024 subjob.result._update_result_status(condor_q_output)
2025 else:
2026 self._update_result_status(condor_q_output)
2027
2028 def _update_result_status(self, condor_q_output):
2029 """
2030        In order to be slightly more efficient we pass in the output of a previous condor_q call and check whether this
2031        job appears in it. If it does, the job's status is updated from that output. If not, we are forced to call
2032        condor_q (and, if needed, ``condor_history``) ourselves.
2033
2034 Parameters:
2035 condor_q_output (dict): The JSON output of a previous call to `HTCondor.condor_q` which we can reuse to find the
2036 status of this job if it was active when that command ran.
2037 """
2038 B2DEBUG(29, f"Calling {self.job}.result._update_result_status()")
2039 jobs_info = []
2040 for job_record in condor_q_output["JOBS"]:
2041 job_id = job_record["GlobalJobId"].split("#")[1]
2042 if job_id == self.job_id:
2043 B2DEBUG(29, f"Found {self.job_id} in condor_q_output.")
2044 jobs_info.append(job_record)
2045
2046 # Let's look for the exit code file where we expect it
2047 if not jobs_info:
2048 try:
2049 exit_code = self.get_exit_code_from_file()
2050 except FileNotFoundError:
2051 waiting_time = datetime.now() - self.exit_code_file_initial_time
2052                if waiting_time > self.time_to_wait_for_exit_code_file:
2053 B2ERROR(f"Exit code file for {self.job} missing and we can't wait longer. Setting exit code to 1.")
2054 exit_code = 1
2055 else:
2056 B2WARNING(f"Exit code file for {self.job} missing, will wait longer.")
2057 return
2058 if exit_code:
2059 jobs_info = [{"JobStatus": 6, "HoldReason": None}] # Set to failed
2060 else:
2061 jobs_info = [{"JobStatus": 4, "HoldReason": None}] # Set to completed
2062
2063 # If this job wasn't in the passed in condor_q output, let's try our own with the specific job_id
2064 if not jobs_info:
2065 jobs_info = HTCondor.condor_q(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
2066
2067 # If no job information is returned then the job already left the queue
2068 # check in the history to see if it succeeded or failed
2069 if not jobs_info:
2070 try:
2071 jobs_info = HTCondor.condor_history(job_id=self.job_id, class_ads=["JobStatus", "HoldReason"])["JOBS"]
2072 except KeyError:
2073 hold_reason = "No Reason Known"
2074
2075 # Still no record of it after waiting for the exit code file?
2076 if not jobs_info:
2077 jobs_info = [{"JobStatus": 6, "HoldReason": None}] # Set to failed
2078
2079 job_info = jobs_info[0]
2080 backend_status = job_info["JobStatus"]
2081        # if the job is held (backend_status = 5) then report why, then keep waiting
2082 if backend_status == 5:
2083 hold_reason = job_info.get("HoldReason", None)
2084 B2WARNING(f"{self.job} on hold because of {hold_reason}. Keep waiting.")
2085 backend_status = 2
2086 try:
2087 new_job_status = self.backend_code_to_status[backend_status]
2088 except KeyError as err:
2089 raise BackendError(f"Unidentified backend status found for {self.job}: {backend_status}") from err
2090 if new_job_status != self.job.status:
2091 self.job.status = new_job_status
2092
2093 @classmethod
2094 def _create_job_result(cls, job, job_id):
2095        """Attach an `HTCondorResult` (holding the cluster ID returned by condor_submit) to the job for later monitoring."""
2097 B2INFO(f"Job ID of {job} recorded as: {job_id}")
2098 job.result = cls.HTCondorResult(job, job_id)
2099
2100 @classmethod
2101 def _create_parent_job_result(cls, parent):
2102 parent.result = cls.HTCondorResult(parent, None)
2103
2104 def can_submit(self, njobs=1):
2105 """
2106 Checks the global number of jobs in HTCondor right now (submitted or running) for this user.
2107        Returns True if the number is lower than the limit, False if it is higher.
2108
2109 Parameters:
2110 njobs (int): The number of jobs that we want to submit before checking again. Lets us check if we
2111 are sufficiently below the limit in order to (somewhat) safely submit. It is slightly dangerous to
2112 assume that it is safe to submit too many jobs since there might be other processes also submitting jobs.
2113 So njobs really shouldn't be abused when you might be getting close to the limit i.e. keep it <=250
2114 and check again before submitting more.
2115 """
2116 B2DEBUG(29, "Calling HTCondor().can_submit()")
2117 jobs_info = self.condor_q()
2118 total_jobs = jobs_info["NJOBS"]
2119 B2INFO(f"Total jobs active in the HTCondor system is currently {total_jobs}")
2120 if (total_jobs + njobs) > self.global_job_limit:
2121 B2INFO(f"Since the global limit is {self.global_job_limit} we cannot submit {njobs} jobs until some complete.")
2122 return False
2123 else:
2124 B2INFO("There is enough space to submit more jobs.")
2125 return True
2126
2127 @classmethod
2128 def condor_q(cls, class_ads=None, job_id="", username=""):
2129 """
2130        Simplistic interface to the ``condor_q`` command. Lets you request information about all jobs matching the filters
2131        'job_id' and 'username'. Note that setting ``job_id`` takes precedence, so ``username`` is then ignored.
2132        The result is the JSON dictionary returned by the ``-json`` option of condor_q.
2133
2134 Parameters:
2135 class_ads (list[str]): A list of condor_q ClassAds that you would like information about.
2136                By default we give {cls.default_class_ads}; increasing the number of class_ads increases the time taken
2137                by the condor_q call.
2138 job_id (str): String representation of the Job ID given by condor_submit during submission.
2139 If this argument is given then the output of this function will be only information about this job.
2140 If this argument is not given, then all jobs matching the other filters will be returned.
2141 username (str): By default we return information about only the current user's jobs. By giving
2142 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2143 receive job information from all known user jobs matching the other filters. This may be a LOT of jobs
2144 so it isn't recommended.
2145
2146 Returns:
2147 dict: JSON dictionary of the form:
2148
2149 .. code-block:: python
2150
2151 {
2152 "NJOBS":<number of records returned by command>,
2153 "JOBS":[
2154 {
2155 <ClassAd: value>, ...
2156 }, ...
2157 ]
2158 }
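
        Illustrative usage with the default ClassAds (GlobalJobId, JobStatus, Owner):

        .. code-block:: python

            queue_info = HTCondor.condor_q()
            print(f"{queue_info['NJOBS']} jobs in the queue")
            for record in queue_info["JOBS"]:
                print(record["GlobalJobId"], record["JobStatus"])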
2159 """
2160 B2DEBUG(29, f"Calling HTCondor.condor_q(class_ads={class_ads}, job_id={job_id}, username={username})")
2161 if not class_ads:
2162 class_ads = cls.default_class_ads
2163 # Output fields should be comma separated.
2164 field_list_cmd = ",".join(class_ads)
2165 cmd_list = ["condor_q", "-json", "-attributes", field_list_cmd]
2166 # If job_id is set then we ignore all other filters
2167 if job_id:
2168 cmd_list.append(job_id)
2169 else:
2170 if not username:
2171 username = os.environ["USER"]
2172 # If the username is set to all it is a special case
2173 if username == "all":
2174 cmd_list.append("-allusers")
2175 else:
2176 cmd_list.append(username)
2177 # We get a JSON serialisable summary from condor_q. But we will alter it slightly to be more similar to other backends
2178 cmd = " ".join(cmd_list)
2179 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2180 # condor_q occasionally fails
2181 try:
2182 records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2183 except BaseException:
2184 records = None
2185
2186 if records:
2187 records = decode_json_string(records)
2188 else:
2189 records = []
2190 jobs_info = {"JOBS": records}
2191 jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2192 return jobs_info
2193
2194 @classmethod
2195 def condor_history(cls, class_ads=None, job_id="", username=""):
2196 """
2197        Simplistic interface to the ``condor_history`` command. Lets you request information about all jobs matching the filters
2198        ``job_id`` and ``username``. Note that setting ``job_id`` takes precedence, so ``username`` is then ignored.
2199        The result is a JSON dictionary filled from the output of the ``-json`` ``condor_history`` option.
2200
2201 Parameters:
2202 class_ads (list[str]): A list of condor_history ClassAds that you would like information about.
2203                By default we give {cls.default_class_ads}; increasing the number of class_ads increases the time taken
2204                by the condor_history call.
2205 job_id (str): String representation of the Job ID given by condor_submit during submission.
2206 If this argument is given then the output of this function will be only information about this job.
2207 If this argument is not given, then all jobs matching the other filters will be returned.
2208 username (str): By default we return information about only the current user's jobs. By giving
2209 a username you can access the job information of a specific user's jobs. By giving ``username='all'`` you will
2210 receive job information from all known user jobs matching the other filters. This is limited to 10000 records
2211 and isn't recommended.
2212
2213 Returns:
2214 dict: JSON dictionary of the form:
2215
2216 .. code-block:: python
2217
2218 {
2219 "NJOBS":<number of records returned by command>,
2220 "JOBS":[
2221 {
2222 <ClassAd: value>, ...
2223 }, ...
2224 ]
2225 }
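
        Illustrative usage (the cluster ID is a placeholder in the format returned by `_submit_to_batch`):

        .. code-block:: python

            # A job that has already left the queue can often still be looked up in the history
            history_info = HTCondor.condor_history(job_id="1234.0", class_ads=["JobStatus"])
            if history_info["JOBS"]:
                print(history_info["JOBS"][0]["JobStatus"])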
2226 """
2227 B2DEBUG(29, f"Calling HTCondor.condor_history(class_ads={class_ads}, job_id={job_id}, username={username})")
2228 if not class_ads:
2229 class_ads = cls.default_class_ads
2230 # Output fields should be comma separated.
2231 field_list_cmd = ",".join(class_ads)
2232 cmd_list = ["condor_history", "-json", "-attributes", field_list_cmd]
2233 # If job_id is set then we ignore all other filters
2234 if job_id:
2235 cmd_list.append(job_id)
2236 else:
2237 if not username:
2238 username = os.environ["USER"]
2239 # If the username is set to all it is a special case
2240 if username != "all":
2241 cmd_list.append(username)
2242        # We get a JSON serialisable summary from condor_history. But we will alter it slightly to be more similar to other backends
2243 cmd = " ".join(cmd_list)
2244 B2DEBUG(29, f"Calling subprocess with command = '{cmd}'")
2245 try:
2246 records = subprocess.check_output(cmd, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
2247 except BaseException:
2248 records = None
2249
2250 if records:
2251 records = decode_json_string(records)
2252 else:
2253 records = []
2254
2255 jobs_info = {"JOBS": records}
2256 jobs_info["NJOBS"] = len(jobs_info["JOBS"]) # Just to avoid having to len() it in the future
2257 return jobs_info
2258
2259
2260class DIRAC(Backend):
2261 """
2262 Backend for submitting calibration processes to the grid.
2263 """
2264
2265
2266class BackendError(Exception):
2267 """
2268 Base exception class for Backend classes.
2269 """
2270
2271
2272class JobError(Exception):
2273 """
2274 Base exception class for Job objects.
2275 """
2276
2277
2278class SplitterError(Exception):
2279 """
2280 Base exception class for SubjobSplitter objects.
2281 """
2282
2283# @endcond