Belle II Software development
Cluster Class Reference

Public Member Functions

def __init__ (self)
 The default constructor.
 
def adjust_path (self, path)
 This method can be used if path names are different on submission and execution hosts.
 
def available (self)
 The cluster should always be available to accept new jobs.
 
def execute (self, Script job, options="", dry=False, tag="current")
 Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with basf2, depending on the file type.
 
Tuple[bool, int] is_job_finished (self, Script job)
 Checks whether the '.done'-file has been created for a job.
 
def terminate (self, Script job)
 Terminate a running job.
 

Static Public Member Functions

def is_supported ()
 
def name ()
 
def description ()
 

Public Attributes

 path
 The path where the help files are being created. Maybe there should be a special subfolder for them?
 
 logger
 Contains a reference to the logger object from validate_basf2. Sets up the logging functionality for the 'cluster execution' class, so that .execute and .is_job_finished can log what is going on to validate_basf2.py's log.
 
 tools
 Path to the basf2 tools and central/local release.
 
 b2setup
 The command for b2setup (and b2code-option)
 

Protected Member Functions

None _cleanup (self, Script job)
 
str _get_tmp_name (self, Script job)
 

Detailed Description

A class that provides the controls for running jobs on a (remote)
cluster. It provides two main methods:
- is_job_finished(job): Returns a (finished, exit code) tuple, where finished
    is True or False depending on whether the job has finished execution
- execute(job): Takes a job and executes it by sending it to the cluster
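
The two methods above are typically combined into a submit-then-poll loop. The following is a minimal sketch of that pattern, not the validation framework's own driver: `FakeControl` and `run_and_wait` are hypothetical names introduced here, and `FakeControl` only mimics the documented signatures so the example runs without a cluster.

```python
import time
from typing import Tuple


class FakeControl:
    """Hypothetical stand-in that mimics the two documented methods."""

    def __init__(self, polls_until_done: int, exit_code: int = 0):
        self._remaining = polls_until_done
        self._exit_code = exit_code
        self.submitted = []

    def execute(self, job, options="", dry=False, tag="current"):
        # A real control would submit the job to the cluster here.
        self.submitted.append(job)

    def is_job_finished(self, job) -> Tuple[bool, int]:
        # Report "not finished" (with exit code 0) until the countdown ends.
        self._remaining -= 1
        if self._remaining <= 0:
            return True, self._exit_code
        return False, 0


def run_and_wait(control, job, poll_interval=0.0, max_polls=100) -> int:
    """Submit a job, then poll until it reports completion."""
    control.execute(job)
    for _ in range(max_polls):
        finished, exit_code = control.is_job_finished(job)
        if finished:
            return exit_code
        time.sleep(poll_interval)
    raise TimeoutError(f"{job!r} did not finish after {max_polls} polls")
```

Because an unfinished job is reported with exit code 0, a caller should look at the exit code only after the first element of the tuple is True.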

Definition at line 25 of file clustercontrol.py.

Constructor & Destructor Documentation

◆ __init__()

def __init__ (   self)

The default constructor.

  • Holds the current working directory, which is also the location of the shellscripts that are being sent to the cluster.
  • Initializes a logger which writes to validate_basf2.py's log.
  • Finds the revision of basf2 that will be set up on the cluster.

Definition at line 55 of file clustercontrol.py.

def __init__(self):
    """!
    The default constructor.
    - Holds the current working directory, which is also the location of
      the shellscripts that are being sent to the cluster.
    - Initializes a logger which writes to validate_basf2.py's log.
    - Finds the revision of basf2 that will be set up on the cluster.
    """

    self.path = os.getcwd()

    self.logger = logging.getLogger("validate_basf2")

    # We need to set up the same environment on the cluster as on the
    # local machine. The information can be extracted from $BELLE2_TOOLS,
    # $BELLE2_RELEASE_DIR and $BELLE2_LOCAL_DIR

    self.tools = self.adjust_path(os.environ["BELLE2_TOOLS"])
    belle2_release_dir = os.environ.get("BELLE2_RELEASE_DIR", None)
    belle2_local_dir = os.environ.get("BELLE2_LOCAL_DIR", None)

    self.b2setup = "b2setup"
    if belle2_release_dir is not None:
        self.b2setup += " " + belle2_release_dir.split("/")[-1]
    if belle2_local_dir is not None:
        self.b2setup = (
            "MY_BELLE2_DIR="
            + self.adjust_path(belle2_local_dir)
            + " "
            + self.b2setup
        )
    if os.environ.get("BELLE2_OPTION") != "debug":
        self.b2setup += "; b2code-option " + os.environ.get("BELLE2_OPTION")

    # Write to log which revision we are using
    self.logger.debug(f"Setting up the following release: {self.b2setup}")

    # Define the folder in which the log of the cluster messages will be
    # stored
    clusterlog_dir = "./html/logs/__general__/"
    if not os.path.exists(clusterlog_dir):
        os.makedirs(clusterlog_dir)
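
The way the constructor assembles the `b2setup` command string can be isolated into a pure function for illustration. This is a sketch under stated assumptions: `build_b2setup` is a hypothetical helper, it takes a plain mapping instead of reading `os.environ`, it skips `adjust_path`, and it treats an unset `BELLE2_OPTION` like `debug` (the listing above would instead fail when concatenating `None`).

```python
from typing import Mapping, Optional


def build_b2setup(env: Mapping[str, str]) -> str:
    """Assemble a b2setup command string from environment-like values."""
    command = "b2setup"
    release_dir: Optional[str] = env.get("BELLE2_RELEASE_DIR")
    local_dir: Optional[str] = env.get("BELLE2_LOCAL_DIR")
    option: Optional[str] = env.get("BELLE2_OPTION")

    if release_dir is not None:
        # Only the last path component (the release name) is passed on.
        command += " " + release_dir.split("/")[-1]
    if local_dir is not None:
        # A local checkout is announced through MY_BELLE2_DIR.
        command = f"MY_BELLE2_DIR={local_dir} {command}"
    # Assumption: an unset option adds nothing, just like "debug".
    if option is not None and option != "debug":
        command += f"; b2code-option {option}"
    return command
```

The directory names used to try this out (for example `/sw/releases/some-release`) are placeholders, not real installation paths.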

Member Function Documentation

◆ _cleanup()

None _cleanup (   self,
Script  job 
)
protected
 Clean up after job has finished. 

Definition at line 237 of file clustercontrol.py.

def _cleanup(self, job: Script) -> None:
    """ Clean up after job has finished. """
    tmp_name = self._get_tmp_name(job)
    os.unlink(tmp_name)

◆ _get_tmp_name()

str _get_tmp_name (   self,
Script  job 
)
protected
 Name of temporary file used for job submission. 

Definition at line 242 of file clustercontrol.py.

def _get_tmp_name(self, job: Script) -> str:
    """ Name of temporary file used for job submission. """
    return self.path + "/" + "script_" + job.name + ".sh"

◆ adjust_path()

def adjust_path (   self,
  path 
)

This method can be used if path names are different on submission and execution hosts.

Parameters
path: The path that needs to be adjusted
Returns
The adjusted path

Definition at line 107 of file clustercontrol.py.

def adjust_path(self, path):
    """!
    This method can be used if path names are different on submission
    and execution hosts.
    @param path: The path that needs to be adjusted
    @return: The adjusted path
    """

    return path
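
The default implementation returns the path unchanged. If submission and execution hosts mount the same files under different prefixes, an override could remap the prefix. The sketch below shows one possible mapping; `remap_prefix` and both prefixes are hypothetical and not part of clustercontrol.py.

```python
def remap_prefix(path: str, submit_prefix: str, exec_prefix: str) -> str:
    """Replace a leading submit-host prefix with the execution-host prefix."""
    src = submit_prefix.rstrip("/")
    dst = exec_prefix.rstrip("/")
    # Match whole path components only, so "/home/user" does not
    # accidentally match "/home/username".
    if path == src or path.startswith(src + "/"):
        return dst + path[len(src):]
    return path
```

A subclass could call such a helper from its own `adjust_path(self, path)`; paths outside the mapped prefix pass through untouched, which matches the default behaviour.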

◆ available()

def available (   self)

The cluster should always be available to accept new jobs.

Returns
Always returns True if the function can be called

Definition at line 118 of file clustercontrol.py.

def available(self):
    """!
    The cluster should always be available to accept new jobs.
    @return: Will always return True if the function can be called
    """

    return True

◆ description()

def description ( )
static
Returns description of this job control

Definition at line 49 of file clustercontrol.py.

def description():
    """
    Returns description of this job control
    """
    return "Batch submission to bsub-based cluster"

◆ execute()

def execute (   self,
Script  job,
  options = "",
  dry = False,
  tag = "current" 
)

Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with basf2, depending on the file type.

Parameters
job: The steering file object that should be executed
options: Options that will be given to the basf2 command
dry: Whether to perform a dry run or not
tag: The folder within the results directory
Returns
None
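
The dispatch on file type described above can be sketched as a small standalone function. `build_command` is a hypothetical helper and the script paths are made up; unlike the original method, the sketch strips the trailing space that results when no options are given.

```python
import os


def build_command(script_path: str, options: str = "") -> str:
    """Choose ROOT or basf2 based on the script's file extension."""
    extension = os.path.splitext(script_path)[1]
    if extension == ".C":
        # ROOT macros run in batch mode (-b) and quit afterwards (-q).
        return f"root -b -q {script_path}"
    # Everything else is treated as a basf2 steering file; 'options'
    # is an option string for basf2, e.g. '-n 100'.
    return f"basf2 {script_path} {options}".rstrip()
```

Note that the comparison is case-sensitive: only the upper-case `.C` extension selects ROOT.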

Definition at line 126 of file clustercontrol.py.

def execute(self, job: Script, options="", dry=False, tag="current"):
    """!
    Takes a Script object and a string with options and runs it on the
    cluster, either with ROOT or with basf2, depending on the file type.

    @param job: The steering file object that should be executed
    @param options: Options that will be given to the basf2 command
    @param dry: Whether to perform a dry run or not
    @param tag: The folder within the results directory
    @return: None
    """

    # Define the folder in which the results (= the ROOT files) should be
    # created. This is where the files containing plots will end up. By
    # convention, data files will be stored in the parent dir.
    # Then make sure the folder exists (create if it does not exist) and
    # change the cwd to this folder.
    output_dir = os.path.abspath(f"./results/{tag}/{job.package}")
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    log_file = output_dir + "/" + os.path.basename(job.path) + ".log"

    # Remove any left over done files
    donefile_path = f"{self.path}/script_{job.name}.done"
    if os.path.isfile(donefile_path):
        os.remove(donefile_path)

    extension = os.path.splitext(job.path)[1]
    if extension == ".C":
        # .C files are executed with root
        command = "root -b -q " + job.path
    else:
        # .py files are executed with basf2
        # 'options' contains an option-string for basf2, e.g. '-n 100'
        command = f"basf2 {job.path} {options}"

    # Create a helpfile-shellscript, which contains all the commands that
    # need to be executed by the cluster.
    # First, set up the basf2 tools and perform b2setup with the correct
    # revision. Then execute the command (i.e. run basf2 or ROOT on a
    # steering file). Write the return code of that into a *.done file.
    # Delete the helpfile-shellscript.
    tmp_name = self._get_tmp_name(job)
    with open(tmp_name, "w+") as tmp_file:
        tmp_file.write(
            "#!/bin/bash \n\n"
            + "BELLE2_NO_TOOLS_CHECK=1 \n"
            + f"source {self.tools}/b2setup \n"
            + f"cd {self.adjust_path(output_dir)} \n"
            + f"{command} \n"
            + f"echo $? > {self.path}/script_{job.name}.done \n"
            + f"rm {tmp_name} \n"
        )

    # Make the helpfile-shellscript executable
    st = os.stat(tmp_name)
    os.chmod(tmp_name, st.st_mode | stat.S_IEXEC)

    # Prepare the command line command for submission to the cluster
    params = [
        "bsub",
        "-oo",
        log_file,
        "-q",
        "l",
        tmp_name,
    ]

    # Log the command we are about to execute
    self.logger.debug(subprocess.list2cmdline(params))

    if not dry:
        try:
            proc = subprocess.run(
                params,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
            )
        except subprocess.CalledProcessError:
            job.status = "failed"
            self.logger.error("Failed to submit job. Here's the traceback:")
            self.logger.error(traceback.format_exc())
            self.logger.error("Will attempt to cleanup job files.")
            self._cleanup(job)
            return
        else:
            if proc.stdout.strip():
                self.logger.debug(
                    f"Stdout of job submission: '{proc.stdout.strip()}'."
                )
            if proc.stderr.strip():
                self.logger.debug(
                    f"Stderr of job submission: '{proc.stderr.strip()}'."
                )

            # Submission succeeded. Get Job ID by parsing output, so that
            # we can terminate the job later.
            res = re.search("Job <([0-9]*)> is submitted", proc.stdout)
            if res:
                job.job_id = res.group(1)
            else:
                self.logger.error(
                    "Could not find job id! Will not be able to terminate"
                    " this job, even if necessary. "
                )
    else:
        os.system(f"echo 0 > {self.path}/script_{job.name}.done")
        self._cleanup(job)
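
Two steps of the listing lend themselves to standalone illustration: composing the helper shell script and extracting the job ID from the submission output. The sketch below is an approximation, not the class's own code. `build_helper_script` and `parse_job_id` are hypothetical helpers, all paths are placeholders, and the sample output line is assumed from the regular expression in the listing. The pattern here uses `[0-9]+` rather than `[0-9]*`, so an empty ID is rejected.

```python
import re
from typing import Optional


def build_helper_script(tools: str, output_dir: str, command: str,
                        done_file: str, script_file: str) -> str:
    """Compose the shell script that the cluster node will run."""
    lines = [
        "#!/bin/bash",
        "",
        "BELLE2_NO_TOOLS_CHECK=1",
        f"source {tools}/b2setup",
        f"cd {output_dir}",
        command,
        # $? holds the exit status of the command on the previous line.
        f"echo $? > {done_file}",
        # The script removes itself once it has finished.
        f"rm {script_file}",
    ]
    return "\n".join(lines) + "\n"


def parse_job_id(bsub_stdout: str) -> Optional[str]:
    """Return the job ID from a 'Job <N> is submitted' line, if present."""
    match = re.search(r"Job <([0-9]+)> is submitted", bsub_stdout)
    return match.group(1) if match else None
```

Writing the exit status immediately after the command matters: any statement placed between the two would overwrite `$?` with its own status.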

◆ is_job_finished()

Tuple[bool, int] is_job_finished (   self,
Script  job 
)

Checks whether the '.done'-file has been created for a job.

If so, the job is reported as finished; otherwise it is reported as not finished. The '.done' file is deleted once it has been read.

Parameters
job: The job of which we want to know if it finished
Returns
(True if the job has finished, exit code). If the exit code cannot be found in the '.done' file, the return code will be -654. If the job is not finished, the exit code is returned as 0.

Definition at line 246 of file clustercontrol.py.

def is_job_finished(self, job: Script) -> Tuple[bool, int]:
    """!
    Checks whether the '.done'-file has been created for a job. If so, it
    returns True, else it returns False.
    Also deletes the .done-File once it has returned True.

    @param job: The job of which we want to know if it finished
    @return: (True if the job has finished, exit code). If we can't find the
        exit code in the '.done'-file, the returncode will be -654.
        If the job is not finished, the exit code is returned as 0.
    """

    donefile_path = f"{self.path}/script_{job.name}.done"

    if os.path.isfile(donefile_path):
        # Job finished.
        # Read the returncode/exit_status
        with open(donefile_path) as f:
            try:
                returncode = int(f.read().strip())
            except ValueError:
                returncode = -654

        os.remove(donefile_path)

        return True, returncode

    else:
        # If no such file exists, the job has not yet finished
        return False, 0
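
The three possible outcomes (no file yet, a readable exit code, an unreadable file) can be demonstrated against a temporary directory. This is a sketch that mirrors the logic of the listing in a free function; `read_done_file`, `UNREADABLE_EXIT_CODE` and the file name are hypothetical.

```python
import os
import tempfile
from typing import Tuple

UNREADABLE_EXIT_CODE = -654  # sentinel value documented above


def read_done_file(done_path: str) -> Tuple[bool, int]:
    """Return (finished, exit code) and consume the '.done' file."""
    if not os.path.isfile(done_path):
        return False, 0
    with open(done_path) as handle:
        try:
            code = int(handle.read().strip())
        except ValueError:
            code = UNREADABLE_EXIT_CODE
    os.remove(done_path)
    return True, code


with tempfile.TemporaryDirectory() as workdir:
    done = os.path.join(workdir, "script_example.done")
    results = [read_done_file(done)]      # no file yet
    with open(done, "w") as handle:
        handle.write("3\n")
    results.append(read_done_file(done))  # exit code 3
    with open(done, "w") as handle:
        handle.write("garbage")
    results.append(read_done_file(done))  # unreadable content
    results.append(read_done_file(done))  # file already consumed
```

Since the file is removed on the first successful read, a second call for the same job reports it as unfinished again; callers need to record the result of the first positive answer.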

◆ is_supported()

def is_supported ( )
static
Check if the bsub command is available

Definition at line 35 of file clustercontrol.py.

def is_supported():
    """
    Check if the bsub command is available
    """
    return shutil.which("bsub") is not None

◆ name()

def name ( )
static
Returns name of this job control

Definition at line 42 of file clustercontrol.py.

def name():
    """
    Returns name of this job control
    """
    return "cluster"

◆ terminate()

def terminate (   self,
Script  job 
)

Terminate a running job.

Definition at line 277 of file clustercontrol.py.

def terminate(self, job: Script):
    """! Terminate a running job
    """
    if job.job_id:
        params = ["bkill", job.job_id]
        self.logger.debug(subprocess.list2cmdline(params))
        try:
            proc = subprocess.run(
                params,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
            )
        except subprocess.CalledProcessError:
            job.status = "failed"
            self.logger.error(
                "Probably wasn't able to cancel job. Here's the traceback:"
            )
            self.logger.error(traceback.format_exc())
        else:
            if proc.stdout.strip():
                self.logger.debug(
                    f"Stdout of job termination: '{proc.stdout.strip()}'."
                )
            if proc.stderr.strip():
                self.logger.debug(
                    f"Stderr of job termination: '{proc.stderr.strip()}'."
                )
        finally:
            self._cleanup(job)
    else:
        self.logger.error(
            "Termination of the job corresponding to steering file "
            f"{job.path} has been requested, but no job id is available."
            " Can't do anything."
        )
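
The listing catches `subprocess.CalledProcessError`. In the Python standard library, `subprocess.run` raises that exception only when `check=True` is passed, and a missing executable raises `FileNotFoundError` instead. The sketch below shows both cases; `run_checked` is a hypothetical helper, and the Python interpreter stands in for `bkill` so the example runs anywhere.

```python
import subprocess
import sys
from typing import List, Optional, Tuple


def run_checked(params: List[str]) -> Tuple[bool, Optional[int]]:
    """Run a command and report (succeeded, return code)."""
    try:
        proc = subprocess.run(
            params,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            check=True,  # a non-zero exit status now raises
        )
    except subprocess.CalledProcessError as error:
        return False, error.returncode
    except FileNotFoundError:
        # The executable itself could not be found.
        return False, None
    return True, proc.returncode


failing = run_checked([sys.executable, "-c", "import sys; sys.exit(3)"])
passing = run_checked([sys.executable, "-c", "pass"])
```

Without `check=True`, a failing command returns normally and its status has to be read from `proc.returncode`.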

Member Data Documentation

◆ b2setup

b2setup

The command for b2setup (and b2code-option)

Definition at line 84 of file clustercontrol.py.

◆ logger

logger

Contains a reference to the logger object from validate_basf2. Sets up the logging functionality for the 'cluster execution' class, so that .execute and .is_job_finished can log what is going on to validate_basf2.py's log.

Definition at line 72 of file clustercontrol.py.

◆ path

path

The path where the help files are being created. Maybe there should be a special subfolder for them?

Definition at line 66 of file clustercontrol.py.

◆ tools

tools

Path to the basf2 tools and central/local release.

Definition at line 79 of file clustercontrol.py.


The documentation for this class was generated from the following file: