Belle II Software development
clustercontrolsge.py
1#!/usr/bin/env python3
2
3
10
11# std
12import logging
13import os
14import subprocess
15import stat
16import shutil
17
18# ours
19import validationfunctions
20from validationscript import Script
21
22
class Cluster:
    """
    A class that provides the controls for running jobs on a (remote)
    Sun Grid Engine cluster. It provides two methods:
    - is_job_finished(job): Returns a list [finished, returncode] telling
      whether the job has finished execution and with which exit code
    - execute(job): Takes a job and executes it by sending it to the cluster
    """

    @staticmethod
    def is_supported():
        """
        Check if qsub is available
        """
        return shutil.which("qsub") is not None

    @staticmethod
    def name():
        """
        Returns name of this job control
        """
        return "cluster-sge"

    @staticmethod
    def description():
        """
        Returns description of this job control
        """
        return "Batch submission via command line to Grid Engine"

    def __init__(self):
        """!
        The default constructor.
        - Holds the current working directory, which is also the location of
          the shellscripts that are being sent to the cluster.
        - Initializes a logger which writes to validate_basf2.py's log.
        - Finds the revision of basf2 that will be set up on the cluster.
        - Opens the log file which collects the output of the qsub calls.

        Raises KeyError if the environment variable BELLE2_TOOLS is not set.
        """

        #: The command to submit a job. The placeholders are filled in by
        #: execute().
        self.submit_command = (
            "qsub -cwd -l h_vmem={requirement_vmem}G,"
            "h_fsize={requirement_storage}G "
            "-oo {logfile} -q {queuename} -V"
        )

        #: required vmem by the job in GB, required on DESY NAF, otherwise
        #: jobs get killed due to memory consumption
        # NOTE(review): this assignment was missing from the rendered listing
        # this class was restored from; confirm the value against upstream.
        self.requirement_vmem = 4

        #: the storage IO in GB which can be performed by each job
        # NOTE(review): restored value, confirm against upstream.
        self.requirement_storage = 50

        #: Queue best suited for execution at DESY NAF
        self.queuename = "short.q"

        #: The path where the help files are being created.
        #: Maybe there should be a special subfolder for them?
        self.path = os.getcwd()

        #: Contains a reference to the logger-object from validate_basf2
        self.logger = logging.getLogger("validate_basf2")

        #: We need to set up the same environment on the cluster as on the
        #: local machine.
        self.tools = self.adjust_path(os.environ["BELLE2_TOOLS"])
        belle2_release_dir = os.environ.get("BELLE2_RELEASE_DIR", None)
        belle2_local_dir = os.environ.get("BELLE2_LOCAL_DIR", None)
        belle2_option = os.environ.get("BELLE2_OPTION")

        #: The command for b2setup (and setoption)
        self.b2setup = "b2setup"
        if belle2_release_dir is not None:
            self.b2setup += " " + belle2_release_dir.split("/")[-1]
        if belle2_local_dir is not None:
            self.b2setup = (
                "MY_BELLE2_DIR="
                + self.adjust_path(belle2_local_dir)
                + " "
                + self.b2setup
            )
        # BELLE2_OPTION may be unset; concatenating None would raise a
        # TypeError, so only append the option switch when one is given.
        if belle2_option is not None and belle2_option != "debug":
            self.b2setup += "; b2code-option " + belle2_option

        # Write to log which revision we are using
        self.logger.debug(f"Setting up the following release: {self.b2setup}")

        # Define the folder in which the log of the cluster messages will be
        # stored (same folder as the log for validate_basf2.py)
        clusterlog_dir = "./html/logs/__general__/"
        os.makedirs(clusterlog_dir, exist_ok=True)

        #: The file object to which all cluster messages will be written.
        #: It is kept open for the lifetime of this object because the output
        #: of every qsub call made in execute() is redirected into it.
        self.clusterlog = open(
            os.path.join(clusterlog_dir, "clusterlog.log"), "w+"
        )

    # noinspection PyMethodMayBeStatic
    def adjust_path(self, path: str):
        """!
        This method can be used if path names are different on submission
        and execution hosts.
        @param path: The path that needs to be adjusted
        @return: The adjusted path (unchanged in this implementation)
        """

        return path

    # noinspection PyMethodMayBeStatic
    def available(self):
        """!
        The cluster should always be available to accept new jobs.
        @return: Will always return True if the function can be called
        """

        return True

    def execute(self, job: Script, options="", dry=False, tag="current"):
        """!
        Takes a Script object and a string with options and runs it on the
        cluster, either with ROOT or with basf2, depending on the file type.

        A helper shell script is written to self.path and submitted with
        qsub; when it finishes on the cluster it writes the command's exit
        code to a '.done' file, which is_job_finished() picks up.

        @param job: The steering file object that should be executed
        @param options: Options that will be given to the basf2 command
        @param dry: Whether to perform a dry run or not
        @param tag: The folder within the results directory
        @return: None
        """

        # Define the folder in which the results (= the ROOT files) should be
        # created. This is where the files containing plots will end up. By
        # convention, data files will be stored in the parent dir.
        output_dir = os.path.abspath(f"./results/{tag}/{job.package}")
        os.makedirs(output_dir, exist_ok=True)

        # Path where log file is supposed to be created
        log_file = os.path.join(output_dir, os.path.basename(job.path) + ".log")

        # Remove any left over done files
        donefile_path = f"{self.path}/script_{job.name}.done"
        if os.path.isfile(donefile_path):
            os.remove(donefile_path)

        # Now we need to distinguish between .py and .C files:
        extension = os.path.splitext(job.path)[1]
        if extension == ".C":
            # .C files are executed with root
            command = "root -b -q " + job.path
        else:
            # .py files are executed with basf2
            # 'options' contains an option-string for basf2, e.g. '-n 100'
            params = validationfunctions.basf2_command_builder(
                job.path, options.split()
            )
            command = subprocess.list2cmdline(params)

        # Create a helpfile-shellscript, which contains all the commands that
        # need to be executed by the cluster: set up the basf2 tools, run the
        # command (basf2 or ROOT on a steering file), write its return code
        # into the *.done file and finally delete the helpfile-shellscript.
        tmp_name = f"{self.path}/script_{job.name}.sh"
        with open(tmp_name, "w+") as tmp_file:
            tmp_file.write(
                "#!/bin/bash \n\n"
                + "BELLE2_NO_TOOLS_CHECK=1 \n"
                + f"source {self.tools}/b2setup \n"
                + f"cd {self.adjust_path(output_dir)} \n"
                + f"{command} \n"
                + f"echo $? > {donefile_path} \n"
                + f"rm {tmp_name} \n"
            )

        # Make the helpfile-shellscript executable
        st = os.stat(tmp_name)
        os.chmod(tmp_name, st.st_mode | stat.S_IEXEC)

        # Prepare the command line command for submission to the cluster
        params = self.submit_command.format(
            queuename=self.queuename,
            requirement_storage=self.requirement_storage,
            requirement_vmem=self.requirement_vmem,
            logfile=log_file,
        ).split() + [tmp_name]

        # Log the command we are about the execute
        self.logger.debug(subprocess.list2cmdline(params))

        # If we are performing a dry run, don't send anything to the cluster;
        # just create the *.done file right away (same content the job's
        # 'echo 0' would produce) and delete the *.sh file.
        if dry:
            with open(donefile_path, "w") as donefile:
                donefile.write("0\n")
            os.unlink(tmp_name)
            return

        # Submit it to the cluster. The steering file output will be written
        # to 'log_file' (see above); qsub's own output goes to the cluster log.
        process = subprocess.Popen(
            params, stdout=self.clusterlog, stderr=subprocess.STDOUT
        )

        # Check whether the submission succeeded
        if process.wait() != 0:
            self.logger.error(
                f"Submission of {job.name} to the cluster failed, see "
                f"{self.clusterlog.name} for details."
            )
            job.status = "failed"

    def is_job_finished(self, job: Script):
        """!
        Checks whether the '.done'-file has been created for a job. If so, it
        reads the job's return code from it and deletes the file.

        @param job: The job of which we want to know if it finished
        @return: A list [finished, returncode]: [True, returncode] if the job
            has finished (returncode is -654 if the '.done'-file does not
            contain an integer), otherwise [False, 0]
        """

        # If there is a file indicating the job is done, that is its name:
        donefile_path = f"{self.path}/script_{job.name}.done"

        # If no such file exists, the job has not yet finished
        if not os.path.isfile(donefile_path):
            return [False, 0]

        # Read the returncode/exit_status for the job from the *.done-file
        with open(donefile_path) as f:
            try:
                returncode = int(f.read().strip())
            except ValueError:
                returncode = -654

        # Delete the *.done file so the result is only reported once
        os.remove(donefile_path)

        # Return that the job is finished + the return code for it
        return [True, returncode]

    # noinspection PyMethodMayBeStatic
    def terminate(self, job: Script):
        """!
        Terminate a running job. This is not supported by this backend, so
        the call only logs an error and is otherwise ignored.
        """
        self.logger.error("Script termination not supported.")
logger
Contains a reference to the logger object from validate_basf2. Set up the logging functionality for th...
queuename
Queue best suited for execution at DESY NAF.
tools
We need to set up the same environment on the cluster as on the local machine.
b2setup
The command for b2setup (and setoption)
clusterlog
The file object to which all cluster messages will be written.
def is_job_finished(self, Script job)
Checks whether the '.done'-file has been created for a job.
path
The path where the help files are being created. Maybe there should be a special subfolder for them?
requirement_vmem
required vmem by the job in GB, required on DESY NAF, otherwise jobs get killed due to memory consump...
def execute(self, Script job, options="", dry=False, tag="current")
Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with ...
requirement_storage
The storage I/O in GB that each job may perform.
def terminate(self, Script job)
Terminate a running job. This is not supported by this backend, so the call is ignored.
submit_command
The command to submit a job.
def __init__(self)
The default constructor.
def adjust_path(self, str path)
This method can be used if path names are different on submission and execution hosts.
List[str] basf2_command_builder(str steering_file, List[str] parameters, use_multi_processing=False)