Belle II Software development
Job Class Reference
Inheritance diagram for Job:
SubJob

Public Member Functions

 __init__ (self, name, job_dict=None)
 
 __repr__ (self)
 
 ready (self)
 
 update_status (self)
 
 create_subjob (self, i, input_files=None, args=None)
 
 status (self)
 
 status (self, status)
 
 dump_to_json (self, file_path)
 
 from_json (cls, file_path)
 
 job_dict (self)
 
 dump_input_data (self)
 
 copy_input_sandbox_files_to_working_dir (self)
 
 check_input_data_files (self)
 
 full_command (self)
 
 append_current_basf2_setup_cmds (self)
 

Public Attributes

 name = name
 Job object's name.
 
 splitter = None
 The SubjobSplitter used to create subjobs if necessary.
 
list input_sandbox_files = []
 Files to be copied directly into the working directory (pathlib.Path).
 
 working_dir = Path()
 Working directory of the job (pathlib.Path).
 
 output_dir = Path()
 Output directory (pathlib.Path), where the output files will be downloaded to.
 
list output_patterns = []
 Files that we produce during the job and want to be returned.
 
list cmd = []
 Command and arguments as a list that will be run by the job on the backend.
 
list args = []
 The arguments that will be applied to the cmd (these are ignored by SubJobs, as they have their own arguments).
 
list input_files = []
 Input files to job (str), a list of these is copied to the working directory.
 
list setup_cmds = []
 Bash commands to run before the main self.cmd (mainly used for batch system setup)
 
dict backend_args = {}
 Config dictionary for the backend to use when submitting the job.
 
dict subjobs = {}
 dict of subjobs assigned to this job
 
 result = None
 The result object of this Job.
 
 status = job_status
 Not a real attribute; it's a property.
 

Static Public Attributes

dict statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}
 Allowed Job status dictionary.
 
list exit_statuses = ["failed", "completed"]
 Job statuses that correspond to the Job being finished (successfully or not).
 

Protected Member Functions

 _get_overall_status_from_subjobs (self)
 

Protected Attributes

str _status = "init"
 The actual status of the overall Job.
 

Detailed Description

This generic Job object is used to tell a Backend what to do.
This object holds the necessary information about a process you want to submit to a `Backend`.
It should *not* do anything that is backend specific; it just holds the configuration for a job to be
successfully submitted and monitored using a backend. The result attribute is where backend-specific
job monitoring goes.

Parameters:
    name (str): Simply a name to describe the Job, not used for any critical purpose in the CAF.

.. warning:: It is recommended to always use absolute paths for files when submitting a `Job`.

Definition at line 333 of file backends.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
name,
job_dict = None )
 

Definition at line 354 of file backends.py.

354 def __init__(self, name, job_dict=None):
355 """
356 """
357
358 self.name = name
359
360 self.splitter = None
361
362 if not job_dict:
363
365 self.input_sandbox_files = []
366
367 self.working_dir = Path()
368
369 self.output_dir = Path()
370
371 self.output_patterns = []
372
373 self.cmd = []
374
375 self.args = []
376
377 self.input_files = []
378
379 self.setup_cmds = []
380
382 self.backend_args = {}
383
384 self.subjobs = {}
385 elif job_dict:
386 self.input_sandbox_files = [Path(p) for p in job_dict["input_sandbox_files"]]
387 self.working_dir = Path(job_dict["working_dir"])
388 self.output_dir = Path(job_dict["output_dir"])
389 self.output_patterns = job_dict["output_patterns"]
390 self.cmd = job_dict["cmd"]
391 self.args = job_dict["args"]
392 self.input_files = job_dict["input_files"]
393 self.setup_cmds = job_dict["setup_cmds"]
394 self.backend_args = job_dict["backend_args"]
395 self.subjobs = {}
396 for subjob_dict in job_dict["subjobs"]:
397 self.create_subjob(subjob_dict["id"], input_files=subjob_dict["input_files"], args=subjob_dict["args"])
398
399
401 self.result = None
402
403 self._status = "init"
404

Member Function Documentation

◆ __repr__()

__repr__ ( self)
Representation of Job class (what happens when you print a Job() instance).

Definition at line 405 of file backends.py.

405 def __repr__(self):
406 """
407 Representation of Job class (what happens when you print a Job() instance).
408 """
409 return f"Job({self.name})"
410

◆ _get_overall_status_from_subjobs()

_get_overall_status_from_subjobs ( self)
protected
 

Definition at line 462 of file backends.py.

462 def _get_overall_status_from_subjobs(self):
463 """
464 """
465 subjob_statuses = [subjob.status for subjob in self.subjobs.values()]
466 status_level = min([self.statuses[status] for status in subjob_statuses])
467 for status, level in self.statuses.items():
468 if level == status_level:
469 return status
470
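The overall-status lookup above can be reproduced with plain dictionaries. A minimal standalone sketch (a hypothetical `overall_status` helper operating on status names, mirroring the class's `statuses` table rather than real `SubJob` objects):

```python
# Sketch of _get_overall_status_from_subjobs(): the overall status is the
# status whose level is the minimum among all subjob status levels.
statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}

def overall_status(subjob_statuses):
    """Return the status name with the lowest level among the subjob statuses."""
    status_level = min(statuses[status] for status in subjob_statuses)
    for status, level in statuses.items():
        if level == status_level:
            return status

print(overall_status(["completed", "running", "submitted"]))  # submitted
```

Because "failed" (level 3) sits below "completed" (level 4), a job whose subjobs are all finished but with at least one failure reports "failed" overall.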

◆ append_current_basf2_setup_cmds()

append_current_basf2_setup_cmds ( self)
This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
It should detect if you are using a local release or CVMFS and append the correct commands
so that the job will have the same basf2 release environment. It should also detect
if a local release is not compiled with the ``opt`` option.

Note that this *doesn't mean that every environment variable is inherited* from the submitting
process environment.

Definition at line 639 of file backends.py.

639 def append_current_basf2_setup_cmds(self):
640 """
641 This adds simple setup commands like ``source /path/to/tools/b2setup`` to your `Job`.
642 It should detect if you are using a local release or CVMFS and append the correct commands
643 so that the job will have the same basf2 release environment. It should also detect
644 if a local release is not compiled with the ``opt`` option.
645
646 Note that this *doesn't mean that every environment variable is inherited* from the submitting
647 process environment.
648 """
649 def append_environment_variable(cmds, envvar):
650 """
651 Append a command for setting an environment variable.
652 """
653 if envvar in os.environ:
654 cmds.append(f"""if [ -z "${{{envvar}}}" ]; then""")
655 cmds.append(f" export {envvar}={os.environ[envvar]}")
656 cmds.append("fi")
657
658 if "BELLE2_TOOLS" not in os.environ:
659 raise BackendError("No BELLE2_TOOLS found in environment")
660 # Export all the environment variables defined via _backend_job_envvars
661 for envvar in _backend_job_envvars:
662 append_environment_variable(self.setup_cmds, envvar)
663 if "BELLE2_RELEASE" in os.environ:
664 self.setup_cmds.append(f"source {os.environ['BELLE2_TOOLS']}/b2setup {os.environ['BELLE2_RELEASE']}")
665 elif 'BELLE2_LOCAL_DIR' in os.environ:
666 self.setup_cmds.append("export BELLE2_NO_TOOLS_CHECK=\"TRUE\"")
667 self.setup_cmds.append(f"BACKEND_B2SETUP={os.environ['BELLE2_TOOLS']}/b2setup")
668 self.setup_cmds.append(f"BACKEND_BELLE2_RELEASE_LOC={os.environ['BELLE2_LOCAL_DIR']}")
669 self.setup_cmds.append(f"BACKEND_BELLE2_OPTION={os.environ['BELLE2_OPTION']}")
670 self.setup_cmds.append("pushd $BACKEND_BELLE2_RELEASE_LOC > /dev/null")
671 self.setup_cmds.append("source $BACKEND_B2SETUP")
672 # b2code-option has to be executed only after the source of the tools.
673 self.setup_cmds.append("b2code-option $BACKEND_BELLE2_OPTION")
674 self.setup_cmds.append("popd > /dev/null")
675
676
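The guarded-export pattern used by the nested helper can be tried in isolation. A sketch assuming a hypothetical environment variable `MY_FAKE_VAR` (the real code instead loops over `_backend_job_envvars`):

```python
import os

def append_environment_variable(cmds, envvar):
    """Mirror of the nested helper above: emit a bash snippet that exports
    ``envvar`` on the worker node only if it is not already set there."""
    if envvar in os.environ:
        cmds.append(f'if [ -z "${{{envvar}}}" ]; then')
        cmds.append(f" export {envvar}={os.environ[envvar]}")
        cmds.append("fi")

os.environ["MY_FAKE_VAR"] = "42"  # hypothetical variable for demonstration
cmds = []
append_environment_variable(cmds, "MY_FAKE_VAR")
append_environment_variable(cmds, "MY_MISSING_VAR")  # not set: appends nothing
print("\n".join(cmds))
```

The generated `if [ -z ... ]` guard means the submitting machine's value is only a fallback; a value already present in the batch node's environment wins.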

◆ check_input_data_files()

check_input_data_files ( self)
Check the input files and make sure that there aren't any duplicates.
Also check if the files actually exist if possible.

Definition at line 597 of file backends.py.

597 def check_input_data_files(self):
598 """
599 Check the input files and make sure that there aren't any duplicates.
600 Also check if the files actually exist if possible.
601 """
602 existing_input_files = [] # We use a list instead of set to avoid losing any ordering of files
603 for file_path in self.input_files:
604 file_uri = parse_file_uri(file_path)
605 if file_uri.scheme == "file":
606 p = Path(file_uri.path)
607 if p.is_file():
608 if file_uri.geturl() not in existing_input_files:
609 existing_input_files.append(file_uri.geturl())
610 else:
611 B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
612 else:
613 B2WARNING(f"Requested input file path {file_path} does not exist, skipping it.")
614 else:
615 B2DEBUG(29, f"{file_path} is not a local file URI. Skipping checking if file exists")
616 if file_path not in existing_input_files:
617 existing_input_files.append(file_path)
618 else:
619 B2WARNING(f"Requested input file path {file_path} was already added, skipping it.")
620 if self.input_files and not existing_input_files:
621 B2WARNING(f"No valid input file paths found for {self.name}, but some were requested.")
622
623 # Replace the Job's input files with the ones that exist + duplicates removed
624 self.input_files = existing_input_files
625
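The dedup-and-existence logic can be sketched with the standard library alone. This uses `urllib.parse.urlparse` as a stand-in for the module's `parse_file_uri` helper (an assumption about its behaviour), and keeps a list rather than a set so the original file ordering survives:

```python
import tempfile
from pathlib import Path
from urllib.parse import urlparse

def filter_input_files(input_files):
    """Sketch of check_input_data_files(): drop duplicates (preserving order)
    and drop local files that do not exist; non-local URIs pass through."""
    existing = []
    for file_path in input_files:
        uri = urlparse(file_path, scheme="file")
        if uri.scheme == "file":
            # Local file: keep it only if it exists and wasn't seen already.
            if Path(uri.path).is_file() and uri.geturl() not in existing:
                existing.append(uri.geturl())
        elif file_path not in existing:
            # Remote URI: cannot check existence, keep first occurrence.
            existing.append(file_path)
    return existing

with tempfile.NamedTemporaryFile() as f:
    files = [f.name, f.name, "/no/such/file.root", "root://server//data.root"]
    print(filter_input_files(files))
```

The real method additionally emits `B2WARNING`/`B2DEBUG` messages for each skipped path, which this sketch omits.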

◆ copy_input_sandbox_files_to_working_dir()

copy_input_sandbox_files_to_working_dir ( self)
Get all of the requested files for the input sandbox and copy them to the working directory.
Files like the submit.sh or input_data.json are not part of this process.

Definition at line 586 of file backends.py.

586 def copy_input_sandbox_files_to_working_dir(self):
587 """
588 Get all of the requested files for the input sandbox and copy them to the working directory.
589 Files like the submit.sh or input_data.json are not part of this process.
590 """
591 for file_path in self.input_sandbox_files:
592 if file_path.is_dir():
593 shutil.copytree(file_path, Path(self.working_dir, file_path.name))
594 else:
595 shutil.copy(file_path, self.working_dir)
596

◆ create_subjob()

create_subjob ( self,
i,
input_files = None,
args = None )
Creates a SubJob object that references the parent Job.
Returns the SubJob object at the end.

Definition at line 434 of file backends.py.

434 def create_subjob(self, i, input_files=None, args=None):
435 """
436 Creates a subjob Job object that references that parent Job.
437 Returns the SubJob object at the end.
438 """
439 if i not in self.subjobs:
440 B2INFO(f"Creating {self}.Subjob({i})")
441 subjob = SubJob(self, i, input_files)
442 if args:
443 subjob.args = args
444 self.subjobs[i] = subjob
445 return subjob
446 else:
447 B2WARNING(f"{self} already contains SubJob({i})! This will not be created.")
448

◆ dump_input_data()

dump_input_data ( self)
Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
URI strings.

Definition at line 578 of file backends.py.

578 def dump_input_data(self):
579 """
580 Dumps the `Job.input_files` attribute to a JSON file. input_files should be a list of
581 string URI objects.
582 """
583 with open(Path(self.working_dir, _input_data_file_path), mode="w") as input_data_file:
584 json.dump(self.input_files, input_data_file, indent=2)
585

◆ dump_to_json()

dump_to_json ( self,
file_path )
Dumps the Job object configuration to a JSON file so that it can be read in again later.

Parameters:
  file_path(`basf2.Path`): The filepath we'll dump to

Definition at line 537 of file backends.py.

537 def dump_to_json(self, file_path):
538 """
539 Dumps the Job object configuration to a JSON file so that it can be read in again later.
540
541 Parameters:
542 file_path(`basf2.Path`): The filepath we'll dump to
543 """
544 # \cond false positive doxygen warning about job_dict
545 with open(file_path, mode="w") as job_file:
546 json.dump(self.job_dict, job_file, indent=2)
547 # \endcond
548

◆ from_json()

from_json ( cls,
file_path )
 

Definition at line 550 of file backends.py.

550 def from_json(cls, file_path):
551 """
552 """
553 with open(file_path) as job_file:
554 job_dict = json.load(job_file)
555 return cls(job_dict["name"], job_dict=job_dict)
556
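The `dump_to_json`/`from_json` pair can be exercised with a plain dictionary in the shape that `__init__` reads when `job_dict` is given. The values here are hypothetical; only the keys match the constructor above:

```python
import json
import tempfile
from pathlib import Path

# Hypothetical minimal dictionary with the keys __init__ expects.
job_dict = {
    "name": "test_job",
    "input_sandbox_files": [],
    "working_dir": ".",
    "output_dir": ".",
    "output_patterns": ["*.root"],
    "cmd": ["basf2"],
    "args": ["script.py"],
    "input_files": [],
    "setup_cmds": [],
    "backend_args": {},
    "subjobs": [],
}

with tempfile.TemporaryDirectory() as d:
    file_path = Path(d, "job.json")
    with open(file_path, mode="w") as job_file:  # as in dump_to_json()
        json.dump(job_dict, job_file, indent=2)
    with open(file_path) as job_file:            # as in from_json()
        loaded = json.load(job_file)

assert loaded == job_dict  # the round trip is lossless
```

Note that `from_json` reconstructs `SubJob` objects from the `"subjobs"` entries via `create_subjob`, so the paths and subjobs come back as objects, not raw strings.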

◆ full_command()

full_command ( self)
Returns:
    str: The full command that this job will run including any arguments.

Definition at line 627 of file backends.py.

627 def full_command(self):
628 """
629 Returns:
630 str: The full command that this job will run including any arguments.
631 """
632 all_components = self.cmd[:]
633 all_components.extend(self.args)
634 # We do a convert to string just in case arguments were generated as different types
635 full_command = " ".join(map(str, all_components))
636 B2DEBUG(29, f"Full command of {self} is '{full_command}'")
637 return full_command
638

◆ job_dict()

job_dict ( self)
Returns:
    dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
    `Path <basf2.Path>` objects are converted to string via ``Path.as_posix()``.

Reimplemented in SubJob.

Definition at line 558 of file backends.py.

558 def job_dict(self):
559 """
560 Returns:
561 dict: A JSON serialisable representation of the `Job` and its `SubJob` objects.
562 `Path <basf2.Path>` objects are converted to string via ``Path.as_posix()``.
563 """
564 job_dict = {}
565 job_dict["name"] = self.name
566 job_dict["input_sandbox_files"] = [i.as_posix() for i in self.input_sandbox_files]
567 job_dict["working_dir"] = self.working_dir.as_posix()
568 job_dict["output_dir"] = self.output_dir.as_posix()
569 job_dict["output_patterns"] = self.output_patterns
570 job_dict["cmd"] = self.cmd
571 job_dict["args"] = self.args
572 job_dict["input_files"] = self.input_files
573 job_dict["setup_cmds"] = self.setup_cmds
574 job_dict["backend_args"] = self.backend_args
575 job_dict["subjobs"] = [sj.job_dict for sj in self.subjobs.values()]
576 return job_dict
577

◆ ready()

ready ( self)
Returns whether or not the Job has finished. If the job has subjobs then it will return True only when they are all finished.
It returns False as soon as it hits the first failure, meaning that you cannot guarantee that all subjobs will have
their status updated when calling this method. Instead, use :py:meth:`update_status` to update all statuses if necessary.

Definition at line 411 of file backends.py.

411 def ready(self):
412 """
413 Returns whether or not the Job has finished. If the job has subjobs then it will return true when they are all finished.
414 It will return False as soon as it hits the first failure. Meaning that you cannot guarantee that all subjobs will have
415 their status updated when calling this method. Instead use :py:meth:`update_status` to update all statuses if necessary.
416 """
417 if not self.result:
418 B2DEBUG(29, f"You requested the ready() status for {self} but there is no result object set, returning False.")
419 return False
420 else:
421 return self.result.ready()
422

◆ status() [1/2]

status ( self)
Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
subjob status in the hierarchy of statuses in `Job.statuses`.

Reimplemented in SubJob.

Definition at line 450 of file backends.py.

450 def status(self):
451 """
452 Returns the status of this Job. If the job has subjobs then it will return the overall status equal to the lowest
453 subjob status in the hierarchy of statuses in `Job.statuses`.
454 """
455 if self.subjobs:
456 job_status = self._get_overall_status_from_subjobs()
457 if job_status != self._status:
458
459 self.status = job_status
460 return self._status
461

◆ status() [2/2]

status ( self,
status )
Sets the status of this Job.

Reimplemented in SubJob.

Definition at line 472 of file backends.py.

472 def status(self, status):
473 """
474 Sets the status of this Job.
475 """
476 # Print an error only if the job failed.
477 if status == 'failed':
478 B2ERROR(f"Setting {self.name} status to failed")
479 else:
480 B2INFO(f"Setting {self.name} status to {status}")
481 self._status = status
482

◆ update_status()

update_status ( self)
Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
in the best way for the type of result object/backend.

Definition at line 423 of file backends.py.

423 def update_status(self):
424 """
425 Calls :py:meth:`update_status` on the job's result. The result object should update all of the subjobs (if there are any)
426 in the best way for the type of result object/backend.
427 """
428 if not self.result:
429 B2DEBUG(29, f"You requested update_status() for {self} but there is no result object set yet. Probably not submitted.")
430 else:
431 self.result.update_status()
432 return self.status
433

Member Data Documentation

◆ _status

str _status = "init"
protected

The actual status of the overall Job.

The ``status`` property handles querying the subjob statuses to set this.

Definition at line 403 of file backends.py.

◆ args

args = []

The arguments that will be applied to the cmd (these are ignored by SubJobs, as they have their own arguments).

Definition at line 375 of file backends.py.

◆ backend_args

dict backend_args = {}

Config dictionary for the backend to use when submitting the job.

Saves us from having multiple attributes that may or may not be used.

Definition at line 382 of file backends.py.

◆ cmd

list cmd = []

Command and arguments as a list that will be run by the job on the backend.

Definition at line 373 of file backends.py.

◆ exit_statuses

list exit_statuses = ["failed", "completed"]
static

Job statuses that correspond to the Job being finished (successfully or not).

Definition at line 352 of file backends.py.

◆ input_files

input_files = []

Input files to job (str), a list of these is copied to the working directory.

Definition at line 377 of file backends.py.

◆ input_sandbox_files

list input_sandbox_files = []

Files to be copied directly into the working directory (pathlib.Path).

Not the input root files, those should be in Job.input_files.

Definition at line 365 of file backends.py.

◆ name

name = name

Job object's name.

Only descriptive, not necessarily unique.

Reimplemented in SubJob.

Definition at line 358 of file backends.py.

◆ output_dir

output_dir = Path()

Output directory (pathlib.Path), where the output files will be downloaded to.

Default is '.'

Reimplemented in SubJob.

Definition at line 369 of file backends.py.

◆ output_patterns

list output_patterns = []

Files that we produce during the job and want to be returned.

Can use wildcard (*)

Definition at line 371 of file backends.py.

◆ result

result = None

The result object of this Job.

Only filled once the job is submitted to a backend since the backend creates a special result class depending on its type.

Definition at line 401 of file backends.py.

◆ setup_cmds

setup_cmds = []

Bash commands to run before the main self.cmd (mainly used for batch system setup)

Definition at line 379 of file backends.py.

◆ splitter

splitter = None

The SubjobSplitter used to create subjobs if necessary.

Definition at line 360 of file backends.py.

◆ status

status = job_status

Not a real attribute; it's a property.

Reimplemented in SubJob.

Definition at line 459 of file backends.py.

◆ statuses

dict statuses = {"init": 0, "submitted": 1, "running": 2, "failed": 3, "completed": 4}
static

Allowed Job status dictionary.

The key is the status name and the value is its level. The lowest level among all subjobs determines the overall status of the Job.

Definition at line 349 of file backends.py.

◆ subjobs

dict subjobs = {}

dict of subjobs assigned to this job

Reimplemented in SubJob.

Definition at line 384 of file backends.py.

◆ working_dir

working_dir = Path()

Working directory of the job (pathlib.Path).

Default is '.', mostly used in Local() backend

Reimplemented in SubJob.

Definition at line 367 of file backends.py.


The documentation for this class was generated from the following file:
backends.py