9 from typing
import Optional, Dict
13 import validationfunctions
14 from validationscript
import Script
19 A class that provides the controls for local multi-processing via the
20 subprocess-module. It provides two methods:
21 - is_job_finished(job): Returns True or False, depending on whether the job
22 has finished execution
23 - execute(job): Takes a job and executes it by spawning a new process
25 @var jobs_processes: Map between jobs and the processes spawned for them
26 @var logger: Reference to the logging object
27 @var max_number_of_processes: The maximum number of parallel processes
28 @var current_number_of_processes: The number of processes currently running
34 Local control is always supported
41 Returns name of this job contol
48 Returns description of this job control
50 return "Multi-processing on the local machine"
52 def __init__(self, max_number_of_processes: Optional[int] =
None):
54 The default constructor.
55 - Initializes a list which holds a connection between Script-Objects
56 and their respective processes.
57 - Initialized a logger which writes to validate_basf2.py's log.
59 @param max_number_of_processes: The maximum number of processes
64 self.jobs_processes = {}
70 self.logger = logging.getLogger(
'validate_basf2')
76 if max_number_of_processes
is None:
77 max_number_of_processes = multiprocessing.cpu_count()
79 "Number of parallel processes has been set to CPU count"
81 self.max_number_of_processes = max_number_of_processes
82 except NotImplementedError:
84 "Number of CPUs could not be determined, number of parallel "
85 "processes set to default value."
87 self.max_number_of_processes = 10
91 f
"Local job control will use {self.max_number_of_processes} "
92 f
"parallel processes.")
95 self.current_number_of_processes = 0
99 Checks whether the number of current parallel processes is below the
100 limit and a new process can be started.
101 @return: True if a new process can be spawned, otherwise False
104 return (self.max_number_of_processes > 0)
and \
105 (self.current_number_of_processes < self.max_number_of_processes)
107 def execute(self, job: Script, options=
'', dry=
False, tag=
'current'):
109 Takes a Script object and a string with options and runs it locally,
110 either with ROOT or with basf2, depending on the file type.
112 @param job: The steering file object that should be executed
113 @param options: Options that will be given to the basf2 command
114 @param dry: Whether to perform a dry run or not
115 @param tag: The name of the folder within the results directory
129 cwd, tag, job.package
131 if not os.path.exists(output_dir):
132 os.makedirs(output_dir)
137 log = open(os.path.basename(job.path) +
'.log',
'w+')
140 extension = os.path.splitext(job.path)[1]
141 if extension ==
'.C':
143 params = [
'root',
'-b',
'-q', job.path]
150 job.path, options.split()
154 self.logger.debug(subprocess.list2cmdline(params))
160 params = [
'echo',
'"Performing a dry run!"']
161 process = subprocess.Popen(
162 params, stdout=log, stderr=subprocess.STDOUT
166 self.jobs_processes[job] = process
169 self.current_number_of_processes += 1
174 def is_job_finished(self, job: Script):
176 Checks if a given job has finished.
178 @param job: The job of which we want to know if it finished
179 @return: True if the job has finished, otherwise False
183 process = self.jobs_processes[job]
187 if process.poll()
is not None:
188 del self.jobs_processes[job]
189 self.current_number_of_processes = len(self.jobs_processes)
190 return [
True, process.returncode]
194 def terminate(self, job: Script):
196 Terminate a running job
199 process = self.jobs_processes[job]
202 del self.jobs_processes[job]
203 self.current_number_of_processes = len(self.jobs_processes)