Belle II Software prerelease-10-00-00a
Cluster Class Reference
Inheritance diagram for Cluster:
Collaboration diagram for Cluster:

Public Member Functions

 __init__ (self)
 The default constructor.
 
 adjust_path (self, str path)
 This method can be used if path names are different on submission and execution hosts.
 
 available (self)
 The cluster should always be available to accept new jobs.
 
 execute (self, Script job, options="", dry=False, tag="current")
 Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with basf2, depending on the file type.
 
 is_job_finished (self, Script job)
 Checks whether the '.done'-file has been created for a job.
 
str createDoneFileName (self, Script job)
 Generate the file name used for the done output.
 
 prepareSubmission (self, Script job, options, tag)
 Setup output folders and create the wrapping shell script.
 
 checkDoneFile (self, job)
 Checks whether the '.done'-file has been created for a job.
 
 terminate (self, Script job)
 Terminate running job.
 

Static Public Member Functions

 is_supported ()
 
 name ()
 
 description ()
 

Public Attributes

tuple native_spec
 The command to submit a job.
 
int requirement_vmem = 4
 Virtual memory required by the job in GB; needed on DESY NAF, otherwise jobs get killed due to memory consumption.
 
int requirement_storage = 50
 The storage I/O in GB that each job may perform.
 
str queuename = "short.q"
 Queue best suited for execution at DESY NAF.
 
 path = os.getcwd()
 The path where the help files are created.
 
 logger = logging.getLogger("validate_basf2")
 Contains a reference to the logger object from validate_basf2. Sets up the logging functionality for the cluster-execution class, so that what happens in .execute and .is_job_finished can be logged to validate_basf2.py's log.
 
 tools = self.adjust_path(os.environ["BELLE2_TOOLS"])
 Path to the basf2 tools and central/local release.
 
str b2setup = "b2setup"
 The command for b2setup (and b2code-option)
 
 clusterlog = open(clusterlog_dir + "clusterlog.log", "w+")
 The file object to which all cluster messages will be written.
 

Detailed Description

A class that provides the controls for running jobs on a (remote)
Sun Grid Engine cluster. It provides two methods:
- is_job_finished(job): Returns True or False, depending on whether the job
    has finished execution
- execute(job): Takes a job and executes it by sending it to the cluster

Definition at line 19 of file clustercontroldrmaa.py.
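A hypothetical usage sketch of this workflow (the `MyScript` class and `run_on_cluster` helper below are simplified stand-ins for the real validation framework's `Script` objects and driver loop, not part of this module):

```python
import time


class MyScript:
    """Minimal stand-in for a validation Script object."""

    def __init__(self, name, path, package):
        self.name = name
        self.path = path
        self.package = package
        self.job_id = None


def run_on_cluster(cluster, scripts):
    """Submit all scripts, then poll until every job has finished.

    Returns a dict mapping script names to their exit codes.
    """
    results = {}
    for script in scripts:
        # execute() wraps the script in a shell script and submits it
        cluster.execute(script, options="-n 100", dry=True)

    pending = list(scripts)
    while pending:
        for script in list(pending):
            # is_job_finished() returns [finished, exit code]
            finished, returncode = cluster.is_job_finished(script)
            if finished:
                results[script.name] = returncode
                pending.remove(script)
        if pending:
            time.sleep(10)  # avoid hammering the batch server
    return results
```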

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self)

The default constructor.

  • Holds the current working directory, which is also the location of the shellscripts that are being sent to the cluster.
  • Initializes a logger which writes to validate_basf2.py's log.
  • Finds the revision of basf2 that will be set up on the cluster.

Definition at line 62 of file clustercontroldrmaa.py.

62 def __init__(self):
63 """!
64 The default constructor.
65 - Holds the current working directory, which is also the location of
66 the shellscripts that are being sent to the cluster.
67 - Initializes a logger which writes to validate_basf2.py's log.
68 - Finds the revision of basf2 that will be set up on the cluster.
69 """
70
71
73 self.native_spec = (
74 "-l h_vmem={requirement_vmem}G,h_fsize={"
75 "requirement_storage}G "
76 "-q {queuename} -V"
77 )
78
79
81 self.requirement_vmem = 4
82
83
86 self.requirement_storage = 50
87
88
89 self.queuename = "short.q"
90
91 # call the base constructor, which will setup the batch cluster
92 # common stuff
93 super().__init__()
94

Member Function Documentation

◆ adjust_path()

adjust_path ( self,
str path )

This method can be used if path names are different on submission and execution hosts.

Parameters
    path  The path that needs to be adjusted
Returns
: The adjusted path

Reimplemented from ClusterBase.

Definition at line 96 of file clustercontroldrmaa.py.

96 def adjust_path(self, path: str):
97 """!
98 This method can be used if path names are different on submission
99 and execution hosts.
100 @param path: The path that needs to be adjusted
101 @return: The adjusted path
102 """
103
104 return path
105

◆ available()

available ( self)

The cluster should always be available to accept new jobs.

Returns
: Will always return True if the function can be called

Definition at line 107 of file clustercontroldrmaa.py.

107 def available(self):
108 """!
109 The cluster should always be available to accept new jobs.
110 @return: Will always return True if the function can be called
111 """
112
113 return True
114

◆ checkDoneFile()

checkDoneFile ( self,
job )
inherited

Checks whether the '.done'-file has been created for a job.

If so, it returns True, else False in the first part of the tuple. Also deletes the .done file if it exists. The second entry in the tuple is the exit code read from the done file.

Definition at line 139 of file clustercontrolbase.py.

139 def checkDoneFile(self, job):
140 """!
141 Checks whether the '.done'-file has been created for a job. If so, it
142 returns True, else it returns False in the first part of the tuple.
143 Also deletes the .done file if it exists.
144 The second entry in the tuple will be the exit code read from the done file
145 """
146
147 # If there is a file indicating the job is done, that is its name:
148 donefile_path = self.createDoneFileName(job)
149
150 donefile_exists = False
151 # Check if such a file exists. If so, this means that the job has
152 # finished.
153 if os.path.isfile(donefile_path):
154
155 # Read the returncode/exit_status for the job from the *.done-file
156 with open(donefile_path) as f:
157 try:
158 returncode = int(f.read().strip())
159 except ValueError:
160 returncode = -654
161
162 print(f"donefile found with return code {returncode}")
163 donefile_exists = True
164 os.remove(donefile_path)
165 else:
166 print("no donefile found")
167 returncode = -555
168
169 return [donefile_exists, returncode]
170

◆ createDoneFileName()

str createDoneFileName ( self,
Script job )
inherited

Generate the file name used for the done output.

Definition at line 76 of file clustercontrolbase.py.

76 def createDoneFileName(self, job: Script) -> str:
77 """!
78 Generate the file name used for the done output
79 """
80 return f"{self.path}/script_{job.name}.done"
81

◆ description()

description ( )
static
Returns description of this job control

Definition at line 56 of file clustercontroldrmaa.py.

56 def description():
57 """
58 Returns description of this job control
59 """
60 return "Batch submission via the drmaa interface to Grid Engine"
61

◆ execute()

execute ( self,
Script job,
options = "",
dry = False,
tag = "current" )

Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with basf2, depending on the file type.

Parameters
    job      The steering file object that should be executed
    options  Options that will be given to the basf2 command
    dry      Whether to perform a dry run or not
    tag      The folder within the results directory
Returns
: None

Definition at line 115 of file clustercontroldrmaa.py.

115 def execute(self, job: Script, options="", dry=False, tag="current"):
116 """!
117 Takes a Script object and a string with options and runs it on the
118 cluster, either with ROOT or with basf2, depending on the file type.
119
120 @param job: The steering file object that should be executed
121 @param options: Options that will be given to the basf2 command
122 @param dry: Whether to perform a dry run or not
123 @param tag: The folder within the results directory
124 @return: None
125 """
126
127 # import here first so the whole module can also be imported on python
128 # installations which have no drmaa at all
129 import drmaa
130
131 print(str(drmaa.Session()))
132
133 with drmaa.Session() as session:
134 print("got session ")
135 print(str(session))
136
137 shell_script_name = self.prepareSubmission(job, options, tag)
138
139 # native specification with all the good settings for the batch
140 # server
141 native_spec_string = self.native_spec.format(
142 requirement_storage=self.requirement_storage,
143 requirement_vmem=self.requirement_vmem,
144 queuename=self.queuename,
145 )
146 print(
147 f"Creating job template for wrapper script {shell_script_name}"
148 )
149 jt = session.createJobTemplate()
150 jt.remoteCommand = shell_script_name
151 jt.joinFiles = True
152 jt.nativeSpecification = native_spec_string
153
154 if not dry:
155 jobid = session.runJob(jt)
156 self.logger.debug(
157 f"Script {job.name} started with job id {jobid}"
158 )
159 job.job_id = jobid
160
161 session.deleteJobTemplate(jt)
162 return
163

◆ is_job_finished()

is_job_finished ( self,
Script job )

Checks whether the '.done'-file has been created for a job.

If so, it returns True, else it returns False. Also deletes the .done file once it has returned True.

Parameters
    job  The job of which we want to know if it finished
Returns
: (True if the job has finished, exit code). If we can't find the exit code in the '.done'-file, the returncode will be -654. If the job is not finished, the exit code is returned as 0.

Definition at line 164 of file clustercontroldrmaa.py.

164 def is_job_finished(self, job: Script):
165 """!
166 Checks whether the '.done'-file has been created for a job. If so, it
167 returns True, else it returns False.
168 Also deletes the .done-File once it has returned True.
169
170 @param job: The job of which we want to know if it finished
171 @return: (True if the job has finished, exit code). If we can't find the
172 exit code in the '.done'-file, the returncode will be -654.
173 If the job is not finished, the exit code is returned as 0.
174 """
175
176 # import here first so the whole module can also be imported on python
177 # installations which have no drmaa at all
178 import drmaa
179
180 if job.job_id is None:
181 print(
182 "Job has not been started with cluster drmaaa because "
183 "job id is missing"
184 )
185 sys.exit(0)
186
187 with drmaa.Session() as session:
188
189 # some batch server will forget completed jobs right away
190 try:
191 status = session.jobStatus(job.job_id)
192 except drmaa.errors.InvalidJobException:
193 print(
194 f"Job info for jobid {job.job_id} cannot be retrieved, assuming job has terminated"
195 )
196
197 (donefile_exists, donefile_returncode) = self.checkDoneFile(job)
198
199 # always report the job as complete even if there is no done
200 # file; at this point the job is also no longer
201 # running/queued on the cluster
202 return [True, donefile_returncode]
203
204 # Return that the job is finished + the return code for it
205 # depending when we look for the job this might never be used,
206 # because the jobs disappear from qstat before we can query them
207 # ..
208 if status == drmaa.JobState.DONE:
209 # todo: return code
210 return [True, 0]
211 if status == drmaa.JobState.FAILED:
212 return [True, 1]
213
214 return [False, 0]

◆ is_supported()

is_supported ( )
static
Check whether the drmaa library is available

Definition at line 29 of file clustercontroldrmaa.py.

29 def is_supported():
30 """
31 Check whether the drmaa library is available
32 """
33 try:
34 import drmaa # noqa
35
36 return True
37 except ImportError:
38 print(
39 "drmaa library is not installed, please use 'pip3 install "
40 "drmaa'"
41 )
42 return False
43 except RuntimeError as re:
44 print("drmaa library not properly configured")
45 print(str(re))
46 return False
47

◆ name()

name ( )
static
Returns name of this job control

Definition at line 49 of file clustercontroldrmaa.py.

49 def name():
50 """
51 Returns name of this job control
52 """
53 return "cluster-drmaa"
54

◆ prepareSubmission()

prepareSubmission ( self,
Script job,
options,
tag )
inherited

Setup output folders and create the wrapping shell script.

Will return the full file name of the generated wrapper script.

Definition at line 82 of file clustercontrolbase.py.

82 def prepareSubmission(self, job: Script, options, tag):
83 """!
84 Setup output folders and create the wrapping shell script. Will return
85 the full file name of the generated wrapper script.
86 """
87
88 # Define the folder in which the results (= the ROOT files) should be
89 # created. This is where the files containing plots will end up. By
90 # convention, data files will be stored in the parent dir.
91 # Then make sure the folder exists (create if it does not exist) and
92 # change to cwd to this folder.
93 output_dir = os.path.abspath(f"./results/{tag}/{job.package}")
94 if not os.path.exists(output_dir):
95 os.makedirs(output_dir)
96
97 # Path where log file is supposed to be created
98 # log_file = output_dir + '/' + os.path.basename(job.path) + '.log'
99
100 # Remove any left over done files
101 donefile_path = self.createDoneFileName(job)
102 if os.path.isfile(donefile_path):
103 os.remove(donefile_path)
104
105 # Now we need to distinguish between .py and .C files:
106 extension = os.path.splitext(job.path)[1]
107 if extension == ".C":
108 # .C files are executed with root
109 command = "root -b -q " + job.path
110 else:
111 # .py files are executed with basf2
112 # 'options' contains an option-string for basf2, e.g. '-n 100'
113 command = f"basf2 {job.path} {options}"
114
115 # Create a helpfile-shellscript, which contains all the commands that
116 # need to be executed by the cluster.
117 # First, set up the basf2 tools and perform b2setup with the correct
118 revision. Then execute the command (i.e. run basf2 or ROOT on a
119 # steering file). Write the return code of that into a *.done file.
120 # Delete the helpfile-shellscript.
121 tmp_name = self.path + "/" + "script_" + job.name + ".sh"
122 with open(tmp_name, "w+") as tmp_file:
123 tmp_file.write(
124 "#!/bin/bash \n\n"
125 + "BELLE2_NO_TOOLS_CHECK=1 \n"
126 + f"source {self.tools}/b2setup \n"
127 + f"cd {self.adjust_path(output_dir)} \n"
128 + f"{command} \n"
129 + f"echo $? > {self.path}/script_{job.name}.done \n"
130 + f"rm {tmp_name} \n"
131 )
132
133 # Make the helpfile-shellscript executable
134 st = os.stat(tmp_name)
135 os.chmod(tmp_name, st.st_mode | stat.S_IEXEC)
136
137 return tmp_name
138

◆ terminate()

terminate ( self,
Script job )
inherited

Terminate running job.

Definition at line 171 of file clustercontrolbase.py.

171 def terminate(self, job: Script):
172 """! Terminate running job.
173 """
174 self.logger.error("Script termination not supported.")
175

Member Data Documentation

◆ b2setup

str b2setup = "b2setup"
inherited

The command for b2setup (and b2code-option)

Definition at line 51 of file clustercontrolbase.py.

◆ clusterlog

clusterlog = open(clusterlog_dir + "clusterlog.log", "w+")
inherited

The file object to which all cluster messages will be written.

Definition at line 74 of file clustercontrolbase.py.

◆ logger

logger = logging.getLogger("validate_basf2")
inherited

Contains a reference to the logger object from validate_basf2. Sets up the logging functionality for the cluster-execution class, so that what happens in .execute and .is_job_finished can be logged to validate_basf2.py's log.

Definition at line 39 of file clustercontrolbase.py.

◆ native_spec

tuple native_spec
Initial value:
= (
"-l h_vmem={requirement_vmem}G,h_fsize={"
"requirement_storage}G "
"-q {queuename} -V"
)

The command to submit a job.

The placeholders {requirement_vmem}, {requirement_storage} and {queuename} are filled in with the actual resource requirements and queue name at submission time.

Definition at line 73 of file clustercontroldrmaa.py.
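Filling the placeholders with the class's default values yields a native specification like the following (a standalone sketch that reuses the same format string):

```python
# Same format string as native_spec above, formatted with the
# documented defaults (4 GB vmem, 50 GB storage, queue "short.q").
native_spec = (
    "-l h_vmem={requirement_vmem}G,h_fsize={"
    "requirement_storage}G "
    "-q {queuename} -V"
)

spec = native_spec.format(
    requirement_vmem=4, requirement_storage=50, queuename="short.q"
)
print(spec)  # -l h_vmem=4G,h_fsize=50G -q short.q -V
```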

◆ path

path = os.getcwd()
inherited

The path where the help files are created.

Maybe there should be a special subfolder for them?

Definition at line 33 of file clustercontrolbase.py.

◆ queuename

str queuename = "short.q"

Queue best suited for execution at DESY NAF.

Definition at line 89 of file clustercontroldrmaa.py.

◆ requirement_storage

int requirement_storage = 50

The storage I/O in GB that each job may perform.

By default, this is 3 GB at DESY, which is too small for some validation scripts.

Definition at line 86 of file clustercontroldrmaa.py.

◆ requirement_vmem

int requirement_vmem = 4

Virtual memory required by the job in GB; needed on DESY NAF, otherwise jobs get killed due to memory consumption.

Definition at line 81 of file clustercontroldrmaa.py.

◆ tools

tools = self.adjust_path(os.environ["BELLE2_TOOLS"])
inherited

Path to the basf2 tools and central/local release.

Definition at line 46 of file clustercontrolbase.py.


The documentation for this class was generated from the following file: clustercontroldrmaa.py