16from typing
import Optional
20import validationfunctions
21from validationscript
import Script
26 A class that provides the controls for local multi-processing via the
27 subprocess-module. It provides two methods:
28 - is_job_finished(job): Returns True or False, depending on whether the job
29 has finished execution
30 - execute(job): Takes a job and executes it by spawning a new process
32 @var jobs_processes: Map between jobs and the processes spawned for them
33 @var logger: Reference to the logging object
34 @var max_number_of_processes: The maximum number of parallel processes
35 @var current_number_of_processes: The number of processes currently running
36 @type jobs_processes: Dict[Script, subprocess.Popen]
42 Local control is always supported
49 Returns name of this job control
56 Returns description of this job control
58 return "Multi-processing on the local machine"
60 def __init__(self, max_number_of_processes: Optional[int] =
None):
62 The default constructor.
63 - Initializes a list which holds a connection between Script-Objects
64 and their respective processes.
65 - Initialized a logger which writes to validate_basf2.py's log.
67 @param max_number_of_processes: The maximum number of processes
72 self.jobs_processes = {}
78 self.logger = logging.getLogger(
"validate_basf2")
84 if max_number_of_processes
is None:
85 max_number_of_processes = multiprocessing.cpu_count()
87 "Number of parallel processes has been set to CPU count"
89 self.max_number_of_processes = max_number_of_processes
90 except NotImplementedError:
92 "Number of CPUs could not be determined, number of parallel "
93 "processes set to default value."
95 self.max_number_of_processes = 10
99 f
"Local job control will use {self.max_number_of_processes} "
100 f
"parallel processes."
104 self.current_number_of_processes = 0
108 Checks whether the number of current parallel processes is below the
109 limit and a new process can be started.
110 @return: True if a new process can be spawned, otherwise False
113 return (self.max_number_of_processes > 0)
and (
114 self.current_number_of_processes < self.max_number_of_processes
117 def execute(self, job: Script, options=
"", dry=
False, tag=
"current"):
119 Takes a Script object and a string with options and runs it locally,
120 either with ROOT or with basf2, depending on the file type.
122 @param job: The steering file object that should be executed
123 @param options: Options that will be given to the basf2 command
124 @param dry: Whether to perform a dry run or not
125 @param tag: The name of the folder within the results directory
139 cwd, tag, job.package
141 if not os.path.exists(output_dir):
142 os.makedirs(output_dir)
147 log = open(os.path.basename(job.path) +
".log",
"w+")
150 extension = os.path.splitext(job.path)[1]
151 if extension ==
".C":
153 params = [
"root",
"-b",
"-q", job.path]
160 job.path, options.split()
164 self.logger.debug(subprocess.list2cmdline(params))
170 params = [
"echo",
'"Performing a dry run!"']
171 process = subprocess.Popen(params, stdout=log, stderr=subprocess.STDOUT)
174 self.jobs_processes[job] = process
177 self.current_number_of_processes += 1
182 def is_job_finished(self, job: Script):
184 Checks if a given job has finished.
186 @param job: The job of which we want to know if it finished
187 @return: True if the job has finished, otherwise False
191 process = self.jobs_processes[job]
195 if process.poll()
is not None:
196 del self.jobs_processes[job]
197 self.current_number_of_processes = len(self.jobs_processes)
198 return [
True, process.returncode]
202 def terminate(self, job: Script):
204 Terminate a running job
207 process = self.jobs_processes[job]
210 del self.jobs_processes[job]
211 self.current_number_of_processes = len(self.jobs_processes)
List[str] basf2_command_builder(str steering_file, List[str] parameters, use_multi_processing=False)
get_results_tag_package_folder(output_base_dir, tag, package)