15 import multiprocessing
16 from typing
import Optional, Dict
20 import validationfunctions
21 from validationscript
import Script
26 A class that provides the controls for local multi-processing via the
27 subprocess-module. It provides two methods:
28 - is_job_finished(job): Returns True or False, depending on whether the job
29 has finished execution
30 - execute(job): Takes a job and executes it by spawning a new process
32 @var jobs_processes: Map between jobs and the processes spawned for them
33 @var logger: Reference to the logging object
34 @var max_number_of_processes: The maximum number of parallel processes
35 @var current_number_of_processes: The number of processes currently running
41 Local control is always supported
48 Returns the name of this job control
55 Returns description of this job control
57 return "Multi-processing on the local machine"
59 def __init__(self, max_number_of_processes: Optional[int] =
None):
61 The default constructor.
62 - Initializes a dict which maps Script objects to the processes
63 spawned for them.
64 - Initializes a logger which writes to validate_basf2.py's log.
66 @param max_number_of_processes: The maximum number of parallel processes. If None, the CPU count is used, or 10 if the CPU count cannot be determined.
71 self.jobs_processes: Dict[Script, subprocess.Popen] = {}
77 self.logger = logging.getLogger(
"validate_basf2")
83 if max_number_of_processes
is None:
84 max_number_of_processes = multiprocessing.cpu_count()
86 "Number of parallel processes has been set to CPU count"
88 self.max_number_of_processes = max_number_of_processes
89 except NotImplementedError:
91 "Number of CPUs could not be determined, number of parallel "
92 "processes set to default value."
94 self.max_number_of_processes = 10
98 f
"Local job control will use {self.max_number_of_processes} "
99 f
"parallel processes."
103 self.current_number_of_processes = 0
107 Checks whether the number of current parallel processes is below the
108 limit and a new process can be started.
109 @return: True if a new process can be spawned, otherwise False
112 return (self.max_number_of_processes > 0)
and (
113 self.current_number_of_processes < self.max_number_of_processes
116 def execute(self, job: Script, options=
"", dry=
False, tag=
"current"):
118 Takes a Script object and a string with options and runs it locally,
119 either with ROOT or with basf2, depending on the file type.
121 @param job: The steering file object that should be executed
122 @param options: Options that will be given to the basf2 command
123 @param dry: Whether to perform a dry run or not
124 @param tag: The name of the folder within the results directory
138 cwd, tag, job.package
140 if not os.path.exists(output_dir):
141 os.makedirs(output_dir)
146 log = open(os.path.basename(job.path) +
".log",
"w+")
149 extension = os.path.splitext(job.path)[1]
150 if extension ==
".C":
152 params = [
"root",
"-b",
"-q", job.path]
159 job.path, options.split()
163 self.logger.debug(subprocess.list2cmdline(params))
169 params = [
"echo",
'"Performing a dry run!"']
170 process = subprocess.Popen(params, stdout=log, stderr=subprocess.STDOUT)
173 self.jobs_processes[job] = process
176 self.current_number_of_processes += 1
181 def is_job_finished(self, job: Script):
183 Checks if a given job has finished.
185 @param job: The job of which we want to know if it finished
186 @return: A list whose first entry is True if the job has finished, otherwise False; for a finished job the second entry is the return code of its process
190 process = self.jobs_processes[job]
194 if process.poll()
is not None:
195 del self.jobs_processes[job]
196 self.current_number_of_processes = len(self.jobs_processes)
197 return [
True, process.returncode]
201 def terminate(self, job: Script):
203 Terminate a running job
206 process = self.jobs_processes[job]
209 del self.jobs_processes[job]
210 self.current_number_of_processes = len(self.jobs_processes)
List[str] basf2_command_builder(str steering_file, List[str] parameters, use_multi_processing=False)
def get_results_tag_package_folder(output_base_dir, tag, package)