Belle II Software development
localcontrol.py
1#!/usr/bin/env python3
2
3
10
11# std
12import logging
13import os
14import subprocess
15import multiprocessing
16from typing import Optional
17
18# ours
19import validationpath
20import validationfunctions
21from validationscript import Script
22
23
class Local:
    """!
    Job control that runs validation scripts as subprocesses on the local
    machine, with an upper limit on the number of parallel processes.

    Main entry points:
    - available(): Whether another process may be started right now
    - execute(job): Takes a job and executes it by spawning a new process
    - is_job_finished(job): Polls the process that belongs to the job
    - terminate(job): Terminates the process that belongs to the job

    @var jobs_processes: Map between jobs and the processes spawned for them
    @var logger: Reference to the logging object
    @var max_number_of_processes: The maximum number of parallel processes
    @var current_number_of_processes: The number of processes currently running
    @type jobs_processes: Dict[Script, subprocess.Popen]
    """

    @staticmethod
    def is_supported():
        """
        Local control is always supported
        """
        return True

    @staticmethod
    def name():
        """
        Returns name of this job control
        """
        return "local"

    @staticmethod
    def description():
        """
        Returns description of this job control
        """
        return "Multi-processing on the local machine"

    def __init__(self, max_number_of_processes: Optional[int] = None):
        """!
        The default constructor.
        - Initializes a dict which holds the connection between Script
          objects and their respective processes.
        - Initializes a logger which writes to validate_basf2.py's log.
        @param max_number_of_processes: The maximum number of processes.
            If None, the CPU count of the machine is used; if that cannot
            be determined, a default of 10 is used.
        """

        # Map between the jobs (Script objects) and the processes that were
        # spawned for them
        self.jobs_processes = {}

        # Reference to the logging object
        self.logger = logging.getLogger("validate_basf2")

        # Only the cpu_count() call can raise NotImplementedError, so it is
        # the only statement guarded by the try block.
        if max_number_of_processes is None:
            try:
                max_number_of_processes = multiprocessing.cpu_count()
            except NotImplementedError:
                self.logger.debug(
                    "Number of CPUs could not be determined, number of "
                    "parallel processes set to default value."
                )
                max_number_of_processes = 10
            else:
                self.logger.debug(
                    "Number of parallel processes has been set to CPU count"
                )

        # Maximal number of parallel processes
        self.max_number_of_processes = max_number_of_processes

        # NOTE(review): 'note' is not a method of the standard library
        # Logger; presumably it is registered elsewhere in the validation
        # framework - confirm before using this class stand-alone.
        # noinspection PyUnresolvedReferences
        self.logger.note(
            f"Local job control will use {self.max_number_of_processes} "
            f"parallel processes."
        )

        # Number of processes that are currently tracked as running
        self.current_number_of_processes = 0

    def available(self):
        """!
        Checks whether the number of current parallel processes is below the
        limit and a new process can be started.
        @return: True if a new process can be spawned, otherwise False
        """

        return (self.max_number_of_processes > 0) and (
            self.current_number_of_processes < self.max_number_of_processes
        )

    def execute(self, job: "Script", options="", dry=False, tag="current"):
        """!
        Takes a Script object and a string with options and runs it locally,
        either with ROOT or with basf2, depending on the file type.

        The process is started with the results folder of the job's package
        as working directory; its stdout and stderr are written to
        '<basename of job.path>.log' inside that folder. The working
        directory of the calling process is restored before returning, also
        if starting the process fails.

        @param job: The steering file object that should be executed
        @param options: Options that will be given to the basf2 command
        @param dry: Whether to perform a dry run or not
        @param tag: The name of the folder within the results directory
        @return: None
        """

        # Remember current working directory (to make sure the cwd is the same
        # after this function has finished)
        cwd = os.getcwd()

        # Define the folder in which the results (= the ROOT files) should be
        # created. This is where the files containing plots will end up. By
        # convention, data files will be stored in the parent dir.
        output_dir = validationpath.get_results_tag_package_folder(
            cwd, tag, job.package
        )

        # exist_ok avoids failing if the folder is created by someone else
        # between an existence check and the creation.
        os.makedirs(output_dir, exist_ok=True)
        os.chdir(output_dir)

        try:
            # Now we need to distinguish between .py and .C files:
            extension = os.path.splitext(job.path)[1]
            if extension == ".C":
                # .C files are executed with ROOT. No options available here.
                params = ["root", "-b", "-q", job.path]
            else:
                # .py files are executed with basf2.
                # 'options' contains an option-string for basf2, e.g. '-n 100
                # -p 8'. This string will be split on white-spaces and added
                # to the params-list, since subprocess.Popen does not like
                # strings...
                params = validationfunctions.basf2_command_builder(
                    job.path, options.split()
                )

            # Log the command we are about to execute
            self.logger.debug(subprocess.list2cmdline(params))

            # If we are performing a dry run, just start an empty process.
            if dry:
                params = ["echo", '"Performing a dry run!"']

            # Create a logfile for this job and make sure it's empty, then
            # spawn the new process with its output redirected to that file.
            # The child process works on its own copy of the file
            # descriptor, so our handle can be closed right after spawning
            # instead of being leaked for every started job.
            with open(os.path.basename(job.path) + ".log", "w+") as log:
                process = subprocess.Popen(
                    params, stdout=log, stderr=subprocess.STDOUT
                )
        finally:
            # Return to previous cwd, even if spawning the process failed
            os.chdir(cwd)

        # Save the connection between the job and the given process
        self.jobs_processes[job] = process

        # Update the process counter. It is derived from the job map (like
        # in is_job_finished and terminate) so that both always agree.
        self.current_number_of_processes = len(self.jobs_processes)

    def is_job_finished(self, job: "Script"):
        """!
        Checks if a given job has finished. A finished job is removed from
        the map of tracked jobs.

        @param job: The job of which we want to know if it finished
        @return: [True, return code of the process] if the job has finished,
            otherwise [False, 0]
        """

        # Look which process belongs to the given job
        process = self.jobs_processes[job]

        # poll() returns None as long as the process is still running
        if process.poll() is None:
            return [False, 0]

        # The process has finished: stop tracking it and report its
        # return code / exit status.
        del self.jobs_processes[job]
        self.current_number_of_processes = len(self.jobs_processes)
        return [True, process.returncode]

    def terminate(self, job: "Script"):
        """!
        Terminate a running job and stop tracking it.

        @param job: The job whose process should be terminated
        """
        # Look which process belongs to the given job
        process = self.jobs_processes[job]

        process.terminate()
        del self.jobs_processes[job]
        self.current_number_of_processes = len(self.jobs_processes)
211
List[str] basf2_command_builder(str steering_file, List[str] parameters, use_multi_processing=False)
def get_results_tag_package_folder(output_base_dir, tag, package)