Belle II Software development
Job Class Reference
Inheritance diagram for Job:
SubJob

Public Member Functions

 __init__ (self, name, job_dict=None)
 
 __repr__ (self)
 
 ready (self)
 
 update_status (self)
 
 create_subjob (self, i, input_files=None, args=None)
 
 status (self)
 
 status (self, status)
 
 dump_to_json (self, file_path)
 
 from_json (cls, file_path)
 
 job_dict (self)
 
 dump_input_data (self)
 
 copy_input_sandbox_files_to_working_dir (self)
 
 check_input_data_files (self)
 
 full_command (self)
 
 append_current_basf2_setup_cmds (self)
 

Public Attributes

 name = name
 Job object's name.
 
 splitter = None
 The SubjobSplitter used to create subjobs if necessary.
 
list input_sandbox_files = []
 Files to be copied directly into the working directory (pathlib.Path).
 
 working_dir = Path()
 Working directory of the job (pathlib.Path).
 
 output_dir = Path()
 Output directory (pathlib.Path), where we will download our output_files to.
 
list output_patterns = []
 Files that we produce during the job and want to be returned.
 
list cmd = []
 Command and arguments as a list that will be run by the job on the backend.
 
list args = []
 The arguments that will be applied to the cmd (these are ignored by SubJobs, as they have their own arguments).
 
list input_files = []
 Input files to job (str), a list of these is copied to the working directory.
 
list setup_cmds = []
 Bash commands to run before the main self.cmd (mainly used for batch system setup).
 
dict backend_args = {}
 Config dictionary for the backend to use when submitting the job.
 
dict subjobs = {}
 Dictionary of subjobs assigned to this job.
 
 result = None
 The result object of this Job.
 
 status = job_status
 Not a real attribute; it's a property.
 

Static Public Attributes

dict statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}
 Allowed Job status dictionary.
 
list exit_statuses = ["failed", "completed"]
 Job statuses that correspond to the Job being finished (successfully or not).
 

Protected Member Functions

 _get_overall_status_from_subjobs (self)
 

Protected Attributes

str _status = "init"
 The actual status of the overall Job.
 

Detailed Description

This generic Job object is used to tell a Backend what to do.
This object basically holds necessary information about a process you want to submit to a `Backend`.
It should *not* do anything that is backend specific, just hold the configuration for a job to be
successfully submitted and monitored using a backend. The result attribute is where backend
specific job monitoring goes.

Parameters:
    name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF

.. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.

Definition at line 336 of file backends.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
name,
job_dict = None )
 

Definition at line 357 of file backends.py.

357 def __init__(self, name, job_dict=None):
358 """
359 """
360
361 self.name = name
362
363 self.splitter = None
364
366 self.input_sandbox_files = []
367
368 self.working_dir = Path()
369
370 self.output_dir = Path()
371
372 self.output_patterns = []
373
374 self.cmd = []
375
376 self.args = []
377
378 self.input_files = []
379
380 self.setup_cmds = []
381
383 self.backend_args = {}
384
385 self.subjobs = {}
386
387 if job_dict:
388 self.input_sandbox_files = [Path(p) for p in job_dict["input_sandbox_files"]]
389 self.working_dir = Path(job_dict["working_dir"])
390 self.output_dir = Path(job_dict["output_dir"])
391 self.output_patterns = job_dict["output_patterns"]
392 self.cmd = job_dict["cmd"]
393 self.args = job_dict["args"]
394 self.input_files = job_dict["input_files"]
395 self.setup_cmds = job_dict["setup_cmds"]
396 self.backend_args = job_dict["backend_args"]
397 for subjob_dict in job_dict["subjobs"]:
398 self.create_subjob(subjob_dict["id"], input_files=subjob_dict["input_files"], args=subjob_dict["args"])
399
400
402 self.result = None
403
404 self._status = "init"
405

Member Function Documentation

◆ __repr__()

__repr__ ( self)
Representation of Job class (what happens when you print a Job() instance).

Definition at line 406 of file backends.py.

406 def __repr__(self):
407 """
408 Representation of Job class (what happens when you print a Job() instance).
409 """
410 return f"Job({self.name})"
411

◆ _get_overall_status_from_subjobs()

_get_overall_status_from_subjobs ( self)
protected
 

Definition at line 463 of file backends.py.

463 def _get_overall_status_from_subjobs(self):
464 """
465 """
466 subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
467 status_level = min([self.statuses[status] for status in subjob_statuses])
468 for status, level in self.statuses.items():
469 if level == status_level:
470 return status
471
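The min-over-levels logic above can be exercised in isolation. The following is a simplified stand-in (not the real class) that reuses the `statuses` mapping defined by `Job`:

```python
# Standalone sketch of the overall-status logic above (not the real Job class).
# The lowest status level across all subjobs becomes the overall status.
statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}


def overall_status(subjob_statuses):
    """Return the status name whose level is the minimum across the subjobs."""
    status_level = min(statuses[status] for status in subjob_statuses)
    for status, level in statuses.items():
        if level == status_level:
            return status


print(overall_status(["completed", "running", "completed"]))  # running
```

Because "failed" (3) sits below "completed" (4), a single failed subjob among otherwise completed ones yields an overall status of "failed".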

◆ append_current_basf2_setup_cmds()

append_current_basf2_setup_cmds ( self)
This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
It should detect if you are using a local release or CVMFS and append the correct commands
so that the job will have the same basf2 release environment. It should also detect
if a local release is not compiled with the ``opt`` option.

Note that this *doesn't mean that every environment variable is inherited* from the submitting
process environment.

Definition at line 640 of file backends.py.

640 def append_current_basf2_setup_cmds(self):
641 """
642 This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
643 It should detect if you are using a local release or CVMFS and append the correct commands
644 so that the job will have the same basf2 release environment. It should also detect
645 if a local release is not compiled with the ``opt`` option.
646
647 Note that this *doesn't mean that every environment variable is inherited* from the submitting
648 process environment.
649 """
650 def append_environment_variable(cmds, envvar):
651 """
652 Append a command for setting an environment variable.
653 """
654 if envvar in os.environ:
655 cmds.append(f"""if [ -z "${{{envvar}}}" ]; then""")
656 cmds.append(f" export {envvar}={os.environ[envvar]}")
657 cmds.append("fi")
658
659 if "BELLE2_TOOLS" not in os.environ:
660 raise BackendError("No BELLE2_TOOLS found in environment")
661 # Export all the environment variables defined via _backend_job_envvars
662 for envvar in _backend_job_envvars:
663 append_environment_variable(self.setup_cmds, envvar)
664 if "BELLE2_RELEASE" in os.environ:
665 self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
666 elif 'BELLE2_LOCAL_DIR' in os.environ:
667 self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
668 self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
669 self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
670 self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
671 self.setup_cmds.append("pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
672 self.setup_cmds.append("source $BACKEND_B2SETUP")
673 # b2code-option has to be executed only after the source of the tools.
674 self.setup_cmds.append("b2code-option $BACKEND_BELLE2_OPTION")
675 self.setup_cmds.append("popd > /dev/null")
676
677
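The inner `append_environment_variable` helper can be tried on its own to see the bash lines it generates. This is a minimal reproduction of the helper above; `MY_ENVVAR` is a hypothetical variable name used purely for illustration:

```python
import os


def append_environment_variable(cmds, envvar):
    """Append bash commands that export envvar only if it is not already set
    in the job's environment (mirrors the helper shown above)."""
    if envvar in os.environ:
        cmds.append(f"""if [ -z "${{{envvar}}}" ]; then""")
        cmds.append(f"  export {envvar}={os.environ[envvar]}")
        cmds.append("fi")


# Hypothetical variable for illustration
os.environ["MY_ENVVAR"] = "/some/path"
setup_cmds = []
append_environment_variable(setup_cmds, "MY_ENVVAR")
print("\n".join(setup_cmds))
```

The guard (`if [ -z ... ]`) means a value already present in the backend job's environment is not overwritten by the submitting process's value.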

◆ check_input_data_files()

check_input_data_files ( self)
Check the input files and make sure that there aren't any duplicates.
Also check if the files actually exist if possible.

Definition at line 598 of file backends.py.

598 def check_input_data_files(self):
599 """
600 Check the input files and make sure that there aren't any duplicates.
601 Also check if the files actually exist if possible.
602 """
603 existing_input_files = [] # We use a list instead of set to avoid losing any ordering of files
604 for file_path in self.input_files:
605 file_uri = parse_file_uri(file_path)
606 if file_uri.scheme == "file":
607 p = Path(file_uri.path)
608 if p.is_file():
609 if file_uri.geturl() not in existing_input_files:
610 existing_input_files.append(file_uri.geturl())
611 else:
612 B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
613 else:
614 B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
615 else:
616 B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
617 if file_path not in existing_input_files:
618 existing_input_files.append(file_path)
619 else:
620 B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
621 if self.input_files and not existing_input_files:
622 B2WARNING(f"No valid input file paths found for {self.name}, but some were requested.")
623
624 # Replace the Job's input files with the ones that exist + duplicates removed
625 self.input_files = existing_input_files
626
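The deduplication and existence checking can be sketched for the simple case of plain local paths (this omits the `parse_file_uri` handling of non-local URIs, which `check_input_data_files` keeps unchecked):

```python
import tempfile
from pathlib import Path


def filter_local_input_files(input_files):
    """Keep only existing local files, preserving order and dropping duplicates
    (a simplified sketch of check_input_data_files for plain local paths)."""
    existing = []  # a list, not a set, to keep the original ordering
    for file_path in input_files:
        if Path(file_path).is_file():
            if file_path not in existing:
                existing.append(file_path)
        # missing or duplicate paths are skipped (the real method logs a B2WARNING)
    return existing


# Create one real temporary file for the demonstration
with tempfile.NamedTemporaryFile(suffix=".root", delete=False) as tmp:
    real_file = tmp.name

kept = filter_local_input_files([real_file, real_file, "/does/not/exist.root"])
print(kept)  # the real file once; the duplicate and the missing path are dropped
```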

◆ copy_input_sandbox_files_to_working_dir()

copy_input_sandbox_files_to_working_dir ( self)
Get all of the requested files for the input sandbox and copy them to the working directory.
Files like the submit.sh or input_data.json are not part of this process.

Definition at line 587 of file backends.py.

587 def copy_input_sandbox_files_to_working_dir(self):
588 """
589 Get all of the requested files for the input sandbox and copy them to the working directory.
590 Files like the submit.sh or input_data.json are not part of this process.
591 """
592 for file_path in self.input_sandbox_files:
593 if file_path.is_dir():
594 shutil.copytree(file_path, Path(self.working_dir, file_path.name))
595 else:
596 shutil.copy(file_path, self.working_dir)
597
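The copy logic above can be reproduced with temporary directories standing in for the sandbox and working directory (the file name `steering.py` is purely illustrative):

```python
import shutil
import tempfile
from pathlib import Path

# Hypothetical sandbox file and working directory for illustration
sandbox_dir = Path(tempfile.mkdtemp())
working_dir = Path(tempfile.mkdtemp())
steering = sandbox_dir / "steering.py"
steering.write_text("# example steering file\n")

# Mirror the method above: directories are copied recursively, files directly
input_sandbox_files = [steering]
for file_path in input_sandbox_files:
    if file_path.is_dir():
        shutil.copytree(file_path, Path(working_dir, file_path.name))
    else:
        shutil.copy(file_path, working_dir)

print((working_dir / "steering.py").exists())  # True
```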

◆ create_subjob()

create_subjob ( self,
i,
input_files = None,
args = None )
Creates a subjob Job object that references that parent Job.
Returns the SubJob object at the end.

Definition at line 435 of file backends.py.

435 def create_subjob(self, i, input_files=None, args=None):
436 """
437 Creates a subjob Job object that references that parent Job.
438 Returns the SubJob object at the end.
439 """
440 if i not in self.subjobs:
441 B2INFO(f"Creating {self}.Subjob({i})")
442 subjob = SubJob(self, i, input_files)
443 if args:
444 subjob.args = args
445 self.subjobs[i] = subjob
446 return subjob
447 else:
448 B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")
449
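The create-once guard can be sketched with plain dicts standing in for the `SubJob` objects (a simplified stand-in, not the real classes):

```python
# Simplified stand-in for the SubJob creation guard above (not the real classes)
subjobs = {}


def create_subjob(i, input_files=None, args=None):
    """Create and register a subjob record once per index; repeats are rejected."""
    if i not in subjobs:
        subjob = {"id": i, "input_files": input_files or [], "args": args or []}
        subjobs[i] = subjob
        return subjob
    # The real method logs a B2WARNING and returns None here
    return None


first = create_subjob(0, input_files=["file1.root"])
second = create_subjob(0)  # rejected: index 0 already exists
print(first["id"], second)  # 0 None
```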

◆ dump_input_data()

dump_input_data ( self)
Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
string URI objects.

Definition at line 579 of file backends.py.

579 def dump_input_data(self):
580 """
581 Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
582 string URI objects.
583 """
584 with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
585 json.dump(self.input_files, input_data_file, indent=2)
586

◆ dump_to_json()

dump_to_json ( self,
file_path )
Dumps the Job object configuration to a JSON file so that it can be read in again later.

Parameters:
  file_path(`basf2.Path`): The filepath we'll dump to

Definition at line 538 of file backends.py.

538 def dump_to_json(self, file_path):
539 """
540 Dumps the Job object configuration to a JSON file so that it can be read in again later.
541
542 Parameters:
543 file_path(`basf2.Path`): The filepath we'll dump to
544 """
545 # \cond false positive doxygen warning about job_dict
546 with open(file_path, mode="w") as job_file:
547 json.dump(self.job_dict, job_file, indent=2)
548 # \endcond
549

◆ from_json()

from_json ( cls,
file_path )
 

Definition at line 551 of file backends.py.

551 def from_json(cls, file_path):
552 """
553 """
554 with open(file_path) as job_file:
555 job_dict = json.load(job_file)
556 return cls(job_dict["name"], job_dict=job_dict)
557
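The `dump_to_json`/`from_json` round trip can be sketched with a plain dict standing in for `Job.job_dict` (field names follow `job_dict` below; the real methods operate on a `Job` instance):

```python
import json
import tempfile

# A plain dict standing in for Job.job_dict (illustrative values)
job_dict = {
    "name": "example_job",
    "cmd": ["echo", "hello"],
    "args": [],
    "subjobs": [],
}

# dump_to_json: serialise the configuration to a JSON file
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as job_file:
    json.dump(job_dict, job_file, indent=2)
    file_path = job_file.name

# from_json: read it back and reconstruct the configuration
with open(file_path) as job_file:
    loaded = json.load(job_file)

print(loaded["name"])  # example_job
```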

◆ full_command()

full_command ( self)
Returns:
    str: The full command that this job will run including any arguments.

Definition at line 628 of file backends.py.

628 def full_command(self):
629 """
630 Returns:
631 str: The full command that this job will run including any arguments.
632 """
633 all_components = self.cmd[:]
634 all_components.extend(self.args)
635 # We do a convert to string just in case arguments were generated as different types
636 full_command = " ".join(map(str, all_components))
637 B2DEBUG(29, f"Full command of {self} is '{full_command}'")
638 return full_command
639
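The string assembly above is straightforward to sketch standalone (the command and arguments are illustrative, not a real basf2 invocation):

```python
# Sketch of full_command: join cmd and args into one string (illustrative values)
cmd = ["basf2", "steering.py"]
args = [3, "--experiment", 12]

all_components = cmd[:]
all_components.extend(args)
# Convert to str in case arguments were generated as other types (e.g. int)
full_command = " ".join(map(str, all_components))
print(full_command)  # basf2 steering.py 3 --experiment 12
```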

◆ job_dict()

job_dict ( self)
Returns:
    dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
    `Path <basf2.Path>` objects are converted to string via ``Path.as_posix()``.

Reimplemented in SubJob.

Definition at line 559 of file backends.py.

559 def job_dict(self):
560 """
561 Returns:
562 dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
563 `Path <basf2.Path>` objects are converted to string via ``Path.as_posix()``.
564 """
565 job_dict = {}
566 job_dict["name"] = self.name
567 job_dict["input_sandbox_files"] = [i.as_posix() for i in self.input_sandbox_files]
568 job_dict["working_dir"] = self.working_dir.as_posix()
569 job_dict["output_dir"] = self.output_dir.as_posix()
570 job_dict["output_patterns"] = self.output_patterns
571 job_dict["cmd"] = self.cmd
572 job_dict["args"] = self.args
573 job_dict["input_files"] = self.input_files
574 job_dict["setup_cmds"] = self.setup_cmds
575 job_dict["backend_args"] = self.backend_args
576 job_dict["subjobs"] = [sj.job_dict for sj in self.subjobs.values()]
577 return job_dict
578

◆ ready()

ready ( self)
Returns whether or not the Job has finished. If the job has subjobs then it will return True when they are all finished.
It will return False as soon as it hits the first failure, meaning that you cannot guarantee that all subjobs will have
their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.

Definition at line 412 of file backends.py.

412 def ready(self):
413 """
414 Returns whether or not the Job has finished. If the job has subjobs then it will return true when they are all finished.
415 It will return False as soon as it hits the first failure. Meaning that you cannot guarantee that all subjobs will have
416 their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.
417 """
418 if not self.result:
419 B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")
420 return False
421 else:
422 return self.result.ready()
423

◆ status() [1/2]

status ( self)
Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
subjob status in the hierarchy of statuses in `Job.statuses`.

Reimplemented in SubJob.

Definition at line 451 of file backends.py.

451 def status(self):
452 """
453 Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
454 subjob status in the hierarchy of statuses in `Job.statuses`.
455 """
456 if self.subjobs:
457 job_status = self._get_overall_status_from_subjobs()
458 if job_status != self._status:
459
460 self.status = job_status
461 return self._status
462

◆ status() [2/2]

status ( self,
status )
Sets the status of this Job.

Reimplemented in SubJob.

Definition at line 473 of file backends.py.

473 def status(self, status):
474 """
475 Sets the status of this Job.
476 """
477 # Print an error only if the job failed.
478 if status == 'failed':
479 B2ERROR(f"Setting {self.name} status to failed")
480 else:
481 B2INFO(f"Setting {self.name} status to {status}")
482 self._status = status
483

◆ update_status()

update_status ( self)
Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
in the best way for the type of result object/backend.

Definition at line 424 of file backends.py.

424 def update_status(self):
425 """
426 Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
427 in the best way for the type of result object/backend.
428 """
429 if not self.result:
430 B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
431 else:
432 self.result.update_status()
433 return self.status
434

Member Data Documentation

◆ _status

str _status = "init"
protected

The actual status of the overall Job.

The property handles querying for the subjob status to set this

Definition at line 404 of file backends.py.

◆ args

args = []

The arguments that will be applied to the cmd (these are ignored by SubJobs, as they have their own arguments).

Definition at line 376 of file backends.py.

◆ backend_args

dict backend_args = {}

Config dictionary for the backend to use when submitting the job.

Saves us from having multiple attributes that may or may not be used.

Definition at line 383 of file backends.py.

◆ cmd

list cmd = []

Command and arguments as a list that will be run by the job on the backend.

Definition at line 374 of file backends.py.

◆ exit_statuses

list exit_statuses = ["failed", "completed"]
static

Job statuses that correspond to the Job being finished (successfully or not).

Definition at line 355 of file backends.py.

◆ input_files

input_files = []

Input files to job (str), a list of these is copied to the working directory.

Definition at line 378 of file backends.py.

◆ input_sandbox_files

list input_sandbox_files = []

Files to be copied directly into the working directory (pathlib.Path).

Not the input root files, those should be in Job.input_files.

Definition at line 366 of file backends.py.

◆ name

name = name

Job object's name.

Only descriptive, not necessarily unique.

Reimplemented in SubJob.

Definition at line 361 of file backends.py.

◆ output_dir

output_dir = Path()

Output directory (pathlib.Path), where we will download our output_files to.

Default is '.'

Reimplemented in SubJob.

Definition at line 370 of file backends.py.

◆ output_patterns

list output_patterns = []

Files that we produce during the job and want to be returned.

Can use wildcard (*)

Definition at line 372 of file backends.py.

◆ result

result = None

The result object of this Job.

Only filled once the job is submitted to a backend since the backend creates a special result class depending on its type.

Definition at line 402 of file backends.py.

◆ setup_cmds

setup_cmds = []

Bash commands to run before the main self.cmd (mainly used for batch system setup).

Definition at line 380 of file backends.py.

◆ splitter

splitter = None

The SubjobSplitter used to create subjobs if necessary.

Definition at line 363 of file backends.py.

◆ status

status = job_status

Not a real attribute; it's a property.

Reimplemented in SubJob.

Definition at line 460 of file backends.py.

◆ statuses

dict statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}
static

Allowed Job status dictionary.

The key is the status name and the value is its level. The lowest level out of all subjobs is the one that is the overall status of the overall job.

Definition at line 352 of file backends.py.

◆ subjobs

dict subjobs = {}

Dictionary of subjobs assigned to this job.

Reimplemented in SubJob.

Definition at line 385 of file backends.py.

◆ working_dir

working_dir = Path()

Working directory of the job (pathlib.Path).

Default is '.', mostly used in Local() backend

Reimplemented in SubJob.

Definition at line 368 of file backends.py.


The documentation for this class was generated from the following file: