Belle II Software development
localcontrol.py
1#!/usr/bin/env python3
2
3
10
11# std
12import logging
13import os
14import subprocess
15import multiprocessing
16from typing import Optional
17
18# ours
19import validationpath
20import validationfunctions
21from validationscript import Script
22
23
class Local:
    """!
    A class that provides the controls for local multi-processing via the
    subprocess-module. Its main methods are:
    - available(): Tells whether another process may be spawned right now
    - execute(job): Takes a job and executes it by spawning a new process
    - is_job_finished(job): Returns a two-element list
      [finished, return code] for the process spawned for the job
    - terminate(job): Terminates the process spawned for the job

    @var jobs_processes: Map between jobs and the processes spawned for them
    @var logger: Reference to the logging object
    @var max_number_of_processes: The maximum number of parallel processes
    @var current_number_of_processes: The number of processes currently running
    @type jobs_processes: Dict[Script, subprocess.Popen]
    """

    @staticmethod
    def is_supported():
        """
        Local control is always supported
        """
        return True

    @staticmethod
    def name():
        """
        Returns name of this job control
        """
        return "local"

    @staticmethod
    def description():
        """
        Returns description of this job control
        """
        return "Multi-processing on the local machine"

    def __init__(self, max_number_of_processes: Optional[int] = None):
        """!
        The default constructor.
        - Initializes a dict which holds the connection between
          Script-Objects and their respective processes.
        - Initializes a logger which writes to validate_basf2.py's log.

        @param max_number_of_processes: The maximum number of processes.
            If None, the CPU count of the machine is used; if the CPU count
            cannot be determined on this platform, 10 is used instead.
        """

        # Map between the jobs (= Script objects) and the processes that
        # were spawned for them
        self.jobs_processes = {}

        # Reference to the "validate_basf2" logger
        self.logger = logging.getLogger("validate_basf2")

        # Parameter for maximal number of parallel processes: use the
        # system CPU count if not specified, and fall back to the default
        # of 10 if the cpu_count call is not supported on this platform.
        # Only cpu_count() can raise NotImplementedError, so only that call
        # is guarded.
        if max_number_of_processes is None:
            try:
                max_number_of_processes = multiprocessing.cpu_count()
            except NotImplementedError:
                self.logger.debug(
                    "Number of CPUs could not be determined, number of parallel "
                    "processes set to default value."
                )
                max_number_of_processes = 10
            else:
                self.logger.debug(
                    "Number of parallel processes has been set to CPU count"
                )
        self.max_number_of_processes = max_number_of_processes

        # 'note' is not a standard logging.Logger method; it has to be
        # provided on the logger elsewhere.
        # noinspection PyUnresolvedReferences
        self.logger.note(
            f"Local job control will use {self.max_number_of_processes} "
            f"parallel processes."
        )

        # Number of processes that are currently tracked as running
        self.current_number_of_processes = 0

    def available(self):
        """!
        Checks whether the number of current parallel processes is below the
        limit and a new process can be started.
        @return: True if a new process can be spawned, otherwise False
        """

        return (self.max_number_of_processes > 0) and (
            self.current_number_of_processes < self.max_number_of_processes
        )

    def execute(self, job: "Script", options="", dry=False, tag="current"):
        """!
        Takes a Script object and a string with options and runs it locally,
        either with ROOT or with basf2, depending on the file type.

        The process is started inside the results folder for the given tag
        and package, and its stdout/stderr are written to
        '<basename of job.path>.log' in that folder. The working directory
        of the calling process is restored before returning, also if
        starting the process fails.

        @param job: The steering file object that should be executed
        @param options: Options that will be given to the basf2 command
        @param dry: Whether to perform a dry run or not
        @param tag: The name of the folder within the results directory
        @return: None
        """

        # Remember current working directory (to make sure the cwd is the same
        # after this function has finished)
        cwd = os.getcwd()

        # Define the folder in which the results (= the ROOT files) should be
        # created. This is where the files containing plots will end up. By
        # convention, data files will be stored in the parent dir.
        output_dir = validationpath.get_results_tag_package_folder(
            cwd, tag, job.package
        )

        # Make sure the folder exists. exist_ok avoids the race between an
        # existence check and the creation when several jobs share a folder.
        os.makedirs(output_dir, exist_ok=True)

        # Change into the results folder; the finally-clause guarantees that
        # we return to the previous cwd even if something below raises.
        os.chdir(output_dir)
        try:
            # Now we need to distinguish between .py and .C files:
            extension = os.path.splitext(job.path)[1]
            if extension == ".C":
                # .C files are executed with ROOT. No options available here.
                params = ["root", "-b", "-q", job.path]
            else:
                # .py files are executed with basf2.
                # 'options' contains an option-string for basf2, e.g. '-n 100
                # -p 8'. This string will be split on white-spaces and added
                # to the params-list, since subprocess.Popen does not like
                # strings...
                params = validationfunctions.basf2_command_builder(
                    job.path, options.split()
                )

            # Log the command we are about to execute
            self.logger.debug(subprocess.list2cmdline(params))

            # If we are performing a dry run, just start an empty process.
            if dry:
                params = ["echo", '"Performing a dry run!"']

            # Create a logfile for this job and make sure it's empty, then
            # spawn the new process with its output redirected to that file.
            # The child receives its own copy of the file descriptor, so the
            # parent's handle can be closed as soon as Popen has returned.
            with open(os.path.basename(job.path) + ".log", "w+") as log:
                process = subprocess.Popen(
                    params, stdout=log, stderr=subprocess.STDOUT
                )
        finally:
            # Return to previous cwd
            os.chdir(cwd)

        # Save the connection between the job and the given process
        self.jobs_processes[job] = process

        # Increase the process counter
        self.current_number_of_processes += 1

    def is_job_finished(self, job: "Script"):
        """!
        Checks if a given job has finished.

        A finished job is removed from the job/process map and the process
        counter is updated, so a second call for the same job raises a
        KeyError.

        @param job: The job of which we want to know if it finished
        @return: Two-element list: [True, return code of the process] if the
            job has finished, otherwise [False, 0]
        """

        # Look which process belongs to the given job
        process = self.jobs_processes[job]

        # Still running: nothing to clean up
        if process.poll() is None:
            return [False, 0]

        # The process has finished: forget about it and report its return
        # code / exit status.
        del self.jobs_processes[job]
        self.current_number_of_processes = len(self.jobs_processes)
        return [True, process.returncode]

    def terminate(self, job: "Script"):
        """!
        Terminate a running job and stop tracking it.

        @param job: The job whose process should be terminated
        """
        # look which process belongs to the given job
        process = self.jobs_processes[job]

        process.terminate()
        del self.jobs_processes[job]
        self.current_number_of_processes = len(self.jobs_processes)
List[str] basf2_command_builder(str steering_file, List[str] parameters, use_multi_processing=False)
get_results_tag_package_folder(output_base_dir, tag, package)