Belle II Software development
clustercontrol.py
1#!/usr/bin/env python3
2
3
10
11# std
12import logging
13import os
14import subprocess
15import stat
16import shutil
17import re
18import traceback
19from typing import Tuple
20
21# ours
22from validationscript import Script
23
24
class Cluster:
    """
    A class that provides the controls for running jobs on a (remote)
    cluster through the ``bsub`` / ``bkill`` batch commands.

    The main entry points are:
    - execute(job): Takes a job and executes it by sending it to the cluster
    - is_job_finished(job): Returns a tuple (finished, exit code), depending
      on whether the job has finished execution
    - terminate(job): Cancels a job that was submitted before
    """

    @staticmethod
    def is_supported():
        """
        Check if the bsub command is available
        """
        return shutil.which("bsub") is not None

    @staticmethod
    def name():
        """
        Returns name of this job control
        """
        return "cluster"

    @staticmethod
    def description():
        """
        Returns description of this job control
        """
        return "Batch submission to bsub-based cluster"

    def __init__(self):
        """!
        The default constructor.
        - Holds the current working directory, which is also the location of
          the shellscripts that are being sent to the cluster.
        - Initializes a logger which writes to validate_basf2.py's log.
        - Finds the revision of basf2 that will be set up on the cluster.

        Raises a KeyError if $BELLE2_TOOLS is not set.
        """

        # The path where the help files (shell scripts and '.done' files)
        # are being created.
        self.path = os.getcwd()

        # Reference to the logger named "validate_basf2".
        self.logger = logging.getLogger("validate_basf2")

        # We need to set up the same environment on the cluster like on the
        # local machine. The information can be extracted from $BELLE2_TOOLS,
        # $BELLE2_RELEASE_DIR and $BELLE2_LOCAL_DIR

        # Path to the basf2 tools.
        self.tools = self.adjust_path(os.environ["BELLE2_TOOLS"])
        belle2_release_dir = os.environ.get("BELLE2_RELEASE_DIR", None)
        belle2_local_dir = os.environ.get("BELLE2_LOCAL_DIR", None)
        belle2_option = os.environ.get("BELLE2_OPTION", None)

        # The command for b2setup (and b2code-option)
        self.b2setup = "b2setup"
        if belle2_release_dir is not None:
            # Strip trailing slashes first, otherwise the last path component
            # (= the release name) would be the empty string.
            self.b2setup += " " + belle2_release_dir.rstrip("/").split("/")[-1]
        if belle2_local_dir is not None:
            self.b2setup = (
                "MY_BELLE2_DIR="
                + self.adjust_path(belle2_local_dir)
                + " "
                + self.b2setup
            )
        # Only append the option if it is actually set: concatenating None
        # would raise a TypeError.
        if belle2_option is not None and belle2_option != "debug":
            self.b2setup += "; b2code-option " + belle2_option

        # Write to log which revision we are using
        self.logger.debug(f"Setting up the following release: {self.b2setup}")

        # Define the folder in which the log of the cluster messages will be
        # stored
        clusterlog_dir = "./html/logs/__general__/"
        os.makedirs(clusterlog_dir, exist_ok=True)

    # noinspection PyMethodMayBeStatic
    def adjust_path(self, path):
        """!
        This method can be used if path names are different on submission
        and execution hosts. This implementation returns the path unchanged.
        @param path: The path that needs to be adjusted
        @return: The adjusted path
        """

        return path

    # noinspection PyMethodMayBeStatic
    def available(self):
        """!
        The cluster should always be available to accept new jobs.
        @return: Will always return True if the function can be called
        """

        return True

    def execute(self, job: "Script", options="", dry=False, tag="current"):
        """!
        Takes a Script object and a string with options and runs it on the
        cluster, either with ROOT or with basf2, depending on the file type.

        If the submission fails, job.status is set to "failed" and the
        helper shell script is removed again.

        @param job: The steering file object that should be executed
        @param options: Options that will be given to the basf2 command
        @param dry: Whether to perform a dry run or not. In a dry run nothing
            is submitted; a '.done' file with exit code 0 is written instead.
        @param tag: The folder within the results directory
        @return: None
        """

        # Define the folder in which the results (= the ROOT files) should be
        # created. This is where the files containing plots will end up. By
        # convention, data files will be stored in the parent dir.
        # Then make sure the folder exists (create if it does not exist).
        output_dir = os.path.abspath(f"./results/{tag}/{job.package}")
        os.makedirs(output_dir, exist_ok=True)

        log_file = output_dir + "/" + os.path.basename(job.path) + ".log"

        # Remove any left over done files
        donefile_path = self._get_done_name(job)
        if os.path.isfile(donefile_path):
            os.remove(donefile_path)

        extension = os.path.splitext(job.path)[1]
        if extension == ".C":
            # .C files are executed with root
            command = "root -b -q " + job.path
        else:
            # .py files are executed with basf2
            # 'options' contains an option-string for basf2, e.g. '-n 100'
            command = f"basf2 {job.path} {options}"

        # Create a helpfile-shellscript, which contains all the commands that
        # need to be executed by the cluster.
        # First, set up the basf2 tools. Then execute the command (i.e. run
        # basf2 or ROOT on a steering file). Write the return code of that
        # into a *.done file. Delete the helpfile-shellscript.
        # NOTE(review): self.b2setup is built and logged in __init__ but is
        # not written into this script - confirm that this is intended.
        tmp_name = self._get_tmp_name(job)
        with open(tmp_name, "w") as tmp_file:
            tmp_file.write(
                "#!/bin/bash \n\n"
                + "BELLE2_NO_TOOLS_CHECK=1 \n"
                + f"source {self.tools}/b2setup \n"
                + f"cd {self.adjust_path(output_dir)} \n"
                + f"{command} \n"
                + f"echo $? > {donefile_path} \n"
                + f"rm {tmp_name} \n"
            )

        # Make the helpfile-shellscript executable
        st = os.stat(tmp_name)
        os.chmod(tmp_name, st.st_mode | stat.S_IEXEC)

        # Prepare the command line command for submission to the cluster
        params = [
            "bsub",
            "-oo",
            log_file,
            "-q",
            "l",
            tmp_name,
        ]

        # Log the command we are about to execute
        self.logger.debug(subprocess.list2cmdline(params))

        if dry:
            # Pretend that the job ran successfully: same content that
            # 'echo 0 > file' would produce, but without going through a
            # shell with an unquoted path.
            with open(donefile_path, "w") as done_file:
                done_file.write("0\n")
            self._cleanup(job)
            return

        try:
            # check=True is required, otherwise a failing bsub would never
            # raise CalledProcessError. OSError covers a bsub executable
            # that is missing or cannot be started.
            proc = subprocess.run(
                params,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                check=True,
            )
        except (subprocess.CalledProcessError, OSError) as error:
            job.status = "failed"
            self.logger.error("Failed to submit job. Here's the traceback:")
            self.logger.error(traceback.format_exc())
            # The traceback does not contain what bsub printed.
            error_output = getattr(error, "stderr", None)
            if error_output and error_output.strip():
                self.logger.error(
                    f"Stderr of job submission: '{error_output.strip()}'."
                )
            self.logger.error("Will attempt to cleanup job files.")
            self._cleanup(job)
            return

        self._log_process_output(proc, "submission")

        # Submission succeeded. Get Job ID by parsing output, so that
        # we can terminate the job later.
        res = re.search(r"Job <([0-9]+)> is submitted", proc.stdout)
        if res:
            job.job_id = res.group(1)
        else:
            self.logger.error(
                "Could not find job id! Will not be able to terminate"
                " this job, even if necessary. "
            )

    def _log_process_output(self, proc, action: str) -> None:
        """ Write non-empty stdout/stderr of a finished process to the log. """
        if proc.stdout.strip():
            self.logger.debug(
                f"Stdout of job {action}: '{proc.stdout.strip()}'."
            )
        if proc.stderr.strip():
            self.logger.debug(
                f"Stderr of job {action}: '{proc.stderr.strip()}'."
            )

    def _cleanup(self, job: "Script") -> None:
        """ Clean up after job has finished. """
        tmp_name = self._get_tmp_name(job)
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            # The helper script removes itself as its last command, so it
            # may already be gone. That is not an error.
            self.logger.debug(f"Nothing to clean up, {tmp_name} is gone.")

    def _get_tmp_name(self, job: "Script") -> str:
        """ Name of temporary file used for job submission. """
        return self.path + "/" + "script_" + job.name + ".sh"

    def _get_done_name(self, job: "Script") -> str:
        """ Name of the file that receives the exit code of the job. """
        return f"{self.path}/script_{job.name}.done"

    def is_job_finished(self, job: "Script") -> Tuple[bool, int]:
        """!
        Checks whether the '.done'-file has been created for a job. If so, it
        returns True, else it returns False.
        Also deletes the .done-File once it has returned True.

        @param job: The job of which we want to know if it finished
        @return: (True if the job has finished, exit code). If we can't find the
            exit code in the '.done'-file, the returncode will be -654.
            If the job is not finished, the exit code is returned as 0.
        """

        donefile_path = self._get_done_name(job)

        if not os.path.isfile(donefile_path):
            # If no such file exists, the job has not yet finished
            return False, 0

        # Job finished.
        # Read the returncode/exit_status
        with open(donefile_path) as f:
            try:
                returncode = int(f.read().strip())
            except ValueError:
                returncode = -654

        os.remove(donefile_path)

        return True, returncode

    def terminate(self, job: "Script"):
        """! Terminate a running job

        Calls bkill with the job id that was stored by execute() and removes
        the helper shell script afterwards. If bkill fails, job.status is
        set to "failed". Without a job id only an error is logged.

        @param job: The job that should be terminated
        """
        if not job.job_id:
            self.logger.error(
                "Termination of the job corresponding to steering file "
                f"{job.path} has been requested, but no job id is available."
                " Can't do anything."
            )
            return

        params = ["bkill", job.job_id]
        self.logger.debug(subprocess.list2cmdline(params))
        try:
            # check=True is required, otherwise a failing bkill would never
            # raise CalledProcessError. OSError covers a missing executable.
            proc = subprocess.run(
                params,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                check=True,
            )
        except (subprocess.CalledProcessError, OSError):
            job.status = "failed"
            self.logger.error(
                "Probably wasn't able to cancel job. Here's the traceback:"
            )
            self.logger.error(traceback.format_exc())
        else:
            self._log_process_output(proc, "termination")
        finally:
            self._cleanup(job)
logger
Contains a reference to the logger object named "validate_basf2", so that messages from this class are written to the log of validate_basf2.py.
tools
Path to the basf2 tools and central/local release.
None _cleanup(self, Script job)
b2setup
The command for b2setup (and b2code-option)
Tuple[bool, int] is_job_finished(self, Script job)
Checks whether the '.done'-file has been created for a job.
path
The path where the help files are being created. Maybe there should be a special subfolder for them?
def execute(self, Script job, options="", dry=False, tag="current")
Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with basf2, depending on the file type.
str _get_tmp_name(self, Script job)
def adjust_path(self, path)
This method can be used if path names are different on submission and execution hosts.
def terminate(self, Script job)
Terminate a running job.
def __init__(self)
The default constructor.