19 import validationfunctions
20 from validationscript
import Script
25 A class that provides the controls for running jobs on a (remote)
26 Sun Grid Engine cluster. It provides two methods:
27 - is_job_finished(job): Returns True or False, depending on whether the job
28 has finished execution
29 - execute(job): Takes a job and executes it by sending it to the cluster
35 Check if qsub is available
37 return shutil.which(
"qsub")
is not None
42 Returns name of this job control
49 Returns description of this job control
51 return "Batch submission via command line to Grid Engine"
55 The default constructor.
56 - Holds the current working directory, which is also the location of
57 the shellscripts that are being sent to the cluster.
58 - Initializes a logger which writes to validate_basf2.py's log.
59 - Finds the revision of basf2 that will be set up on the cluster.
65 "qsub -cwd -l h_vmem={requirement_vmem}G,"
66 "h_fsize={requirement_storage}G "
67 "-oo {logfile} -q {queuename} -V"
84 self.
pathpath = os.getcwd()
90 self.
loggerlogger = logging.getLogger(
"validate_basf2")
98 belle2_release_dir = os.environ.get(
"BELLE2_RELEASE_DIR",
None)
99 belle2_local_dir = os.environ.get(
"BELLE2_LOCAL_DIR",
None)
103 if belle2_release_dir
is not None:
104 self.
b2setupb2setup +=
" " + belle2_release_dir.split(
"/")[-1]
105 if belle2_local_dir
is not None:
112 if os.environ.get(
"BELLE2_OPTION") !=
"debug":
113 self.
b2setupb2setup +=
"; b2code-option " + os.environ.get(
"BELLE2_OPTION")
116 self.
loggerlogger.debug(f
"Setting up the following release: {self.b2setup}")
120 clusterlog_dir =
"./html/logs/__general__/"
121 if not os.path.exists(clusterlog_dir):
122 os.makedirs(clusterlog_dir)
125 self.
clusterlogclusterlog = open(clusterlog_dir +
"clusterlog.log",
"w+")
130 This method can be used if path names are different on submission
132 @param path: The path that needs to be adjusted
133 @return: The adjusted path
141 The cluster should always be available to accept new jobs.
142 @return: Will always return True if the function can be called
147 def execute(self, job: Script, options=
"", dry=
False, tag=
"current"):
149 Takes a Script object and a string with options and runs it on the
150 cluster, either with ROOT or with basf2, depending on the file type.
152 @param job: The steering file object that should be executed
153 @param options: Options that will be given to the basf2 command
154 @param dry: Whether to perform a dry run or not
155 @param tag: The folder within the results directory
164 output_dir = os.path.abspath(f
"./results/{tag}/{job.package}")
165 if not os.path.exists(output_dir):
166 os.makedirs(output_dir)
169 log_file = output_dir +
"/" + os.path.basename(job.path) +
".log"
172 donefile_path = f
"{self.path}/script_{job.name}.done"
173 if os.path.isfile(donefile_path):
174 os.remove(donefile_path)
177 extension = os.path.splitext(job.path)[1]
178 if extension ==
".C":
180 command =
"root -b -q " + job.path
185 job.path, options.split()
187 command = subprocess.list2cmdline(params)
195 tmp_name = self.
pathpath +
"/" +
"script_" + job.name +
".sh"
196 with open(tmp_name,
"w+")
as tmp_file:
199 +
"BELLE2_NO_TOOLS_CHECK=1 \n"
200 + f
"source {self.tools}/b2setup \n"
201 +
"cd {} \n".format(self.
adjust_pathadjust_path(output_dir))
203 +
"echo $? > {}/script_{}.done \n".format(self.
pathpath, job.name)
204 + f
"rm {tmp_name} \n"
208 st = os.stat(tmp_name)
209 os.chmod(tmp_name, st.st_mode | stat.S_IEXEC)
217 ).split() + [tmp_name]
220 self.
loggerlogger.debug(subprocess.list2cmdline(params))
227 process = subprocess.Popen(
228 params, stdout=self.
clusterlogclusterlog, stderr=subprocess.STDOUT
232 if process.wait() != 0:
233 job.status =
"failed"
235 os.system(f
"echo 0 > {self.path}/script_{job.name}.done")
240 Checks whether the '.done'-file has been created for a job. If so, it
241 returns True, else it returns False.
242 Also deletes the .done-File once it has returned True.
244 @param job: The job of which we want to know if it finished
245 @return: True if the job has finished, otherwise False
249 donefile_path = f
"{self.path}/script_{job.name}.done"
253 if os.path.isfile(donefile_path):
256 with open(donefile_path)
as f:
258 returncode = int(f.read().strip())
263 os.remove(donefile_path)
266 return [
True, returncode]
275 Terminate a running job, not supported by this backend, so ignore the
278 self.
loggerlogger.error(
"Script termination not supported.")
logger
Contains a reference to the logger-object from validate_basf2. Set up the logging functionality for th...
queuename
Queue best suited for execution at DESY NAF.
tools
We need to set up the same environment on the cluster as on the local machine.
def available(self)
The cluster should always be available to accept new jobs.
b2setup
The command for b2setup (and setoption)
clusterlog
The file object to which all cluster messages will be written.
def is_job_finished(self, Script job)
Checks whether the '.done'-file has been created for a job.
path
The path where the help files are being created. Maybe there should be a special subfolder for them?
requirement_vmem
required vmem by the job in GB, required on DESY NAF, otherwise jobs get killed due to memory consump...
def execute(self, Script job, options="", dry=False, tag="current")
Takes a Script object and a string with options and runs it on the cluster, either with ROOT or with ...
requirement_storage
the storage IO in GB which can be performed by each job.
def terminate(self, Script job)
Terminate a running job, not supported by this backend, so ignore the call.
submit_command
The command to submit a job.
def __init__(self)
The default constructor.
def adjust_path(self, str path)
This method can be used if path names are different on submission and execution hosts.
List[str] basf2_command_builder(str steering_file, List[str] parameters, use_multi_processing=False)