Belle II Software  release-08-01-10
localcontrol.py
1 #!/usr/bin/env python3
2 
3 
10 
11 # std
12 import logging
13 import os
14 import subprocess
15 import multiprocessing
16 from typing import Optional, Dict
17 
18 # ours
19 import validationpath
20 import validationfunctions
21 from validationscript import Script
22 
23 
class Local:
    """!
    A class that provides the controls for local multi-processing via the
    subprocess-module. Its main methods are:
    - execute(job): Takes a job and executes it by spawning a new process
    - is_job_finished(job): Reports whether the process spawned for the job
      has finished, together with its return code
    - terminate(job): Terminates the process spawned for the job
    - available(): Tells whether another process may be spawned

    @var jobs_processes: Map between jobs and the processes spawned for them
    @var logger: Reference to the logging object
    @var max_number_of_processes: The maximum number of parallel processes
    @var current_number_of_processes: The number of processes currently running
    """

    @staticmethod
    def is_supported():
        """
        Local control is always supported
        """
        return True

    @staticmethod
    def name():
        """
        Returns name of this job control
        """
        return "local"

    @staticmethod
    def description():
        """
        Returns description of this job control
        """
        return "Multi-processing on the local machine"

    def __init__(self, max_number_of_processes: Optional[int] = None):
        """!
        The default constructor.
        - Initializes a dict which holds the connection between
          Script-Objects and their respective processes.
        - Initializes a logger which writes to validate_basf2.py's log.

        @param max_number_of_processes: The maximum number of processes. If
            None, the CPU count of the machine is used, falling back to 10
            if the CPU count cannot be determined on this platform.
        """

        # Map between jobs (Script objects) and the processes spawned for
        # them.
        self.jobs_processes: Dict[Script, subprocess.Popen] = {}

        # Reference to the logger shared under the name "validate_basf2".
        self.logger = logging.getLogger("validate_basf2")

        # Only the cpu_count() call can raise NotImplementedError, so only
        # that call is guarded.
        if max_number_of_processes is None:
            try:
                max_number_of_processes = multiprocessing.cpu_count()
            except NotImplementedError:
                self.logger.debug(
                    "Number of CPUs could not be determined, number of parallel "
                    "processes set to default value."
                )
                max_number_of_processes = 10
            else:
                self.logger.debug(
                    "Number of parallel processes has been set to CPU count"
                )

        # Maximum number of processes that may run in parallel.
        self.max_number_of_processes = max_number_of_processes

        # NOTE(review): 'note' is not a standard logging.Logger method;
        # presumably it is registered elsewhere in the validation framework
        # before this class is instantiated - confirm.
        # noinspection PyUnresolvedReferences
        self.logger.note(
            f"Local job control will use {self.max_number_of_processes} "
            f"parallel processes."
        )

        # Number of processes currently tracked as running.
        self.current_number_of_processes = 0

    def available(self):
        """!
        Checks whether the number of current parallel processes is below the
        limit and a new process can be started.
        @return: True if a new process can be spawned, otherwise False
        """

        return (self.max_number_of_processes > 0) and (
            self.current_number_of_processes < self.max_number_of_processes
        )

    def execute(self, job: Script, options="", dry=False, tag="current"):
        """!
        Takes a Script object and a string with options and runs it locally,
        either with ROOT or with basf2, depending on the file type.

        The process is started inside the results folder for the given tag
        and package; its stdout and stderr are written to
        '<basename of job.path>.log' in that folder. The working directory
        of the calling process is restored before this method returns, even
        if starting the process fails.

        @param job: The steering file object that should be executed
        @param options: Options that will be given to the basf2 command
        @param dry: Whether to perform a dry run or not
        @param tag: The name of the folder within the results directory
        @return: None
        """

        # Remember current working directory (to make sure the cwd is the same
        # after this function has finished)
        cwd = os.getcwd()

        # Define the folder in which the results (= the ROOT files) should be
        # created. This is where the files containing plots will end up. By
        # convention, data files will be stored in the parent dir.
        output_dir = validationpath.get_results_tag_package_folder(
            cwd, tag, job.package
        )

        # exist_ok avoids the check-then-create race of a separate
        # os.path.exists() test.
        os.makedirs(output_dir, exist_ok=True)
        os.chdir(output_dir)

        try:
            # Create a logfile for this job and make sure it's empty. The
            # child process gets its own copy of the file descriptor, so our
            # handle can be closed as soon as the process has been spawned.
            with open(os.path.basename(job.path) + ".log", "w+") as log:
                # Now we need to distinguish between .py and .C files:
                extension = os.path.splitext(job.path)[1]
                if extension == ".C":
                    # .C files are executed with ROOT. No options available
                    # here.
                    params = ["root", "-b", "-q", job.path]
                else:
                    # Everything else is executed with basf2.
                    # 'options' contains an option-string for basf2, e.g.
                    # '-n 100 -p 8'. This string is split on white-spaces
                    # because subprocess.Popen expects a list of arguments.
                    params = validationfunctions.basf2_command_builder(
                        job.path, options.split()
                    )

                # Log the command we are about to execute
                self.logger.debug(subprocess.list2cmdline(params))

                # If we are performing a dry run, just start a trivial
                # process instead of the real command.
                if dry:
                    params = ["echo", '"Performing a dry run!"']

                # Spawn the new process; its output goes to the logfile.
                process = subprocess.Popen(
                    params, stdout=log, stderr=subprocess.STDOUT
                )
        finally:
            # Return to previous cwd, also when something above raised
            os.chdir(cwd)

        # Save the connection between the job and the spawned process
        self.jobs_processes[job] = process

        # Keep the counter in sync with the bookkeeping dict, the same way
        # is_job_finished() and terminate() do.
        self.current_number_of_processes = len(self.jobs_processes)

    def is_job_finished(self, job: Script) -> list:
        """!
        Checks if a given job has finished. A finished job is removed from
        the internal bookkeeping, so it must not be queried again afterwards.

        @param job: The job of which we want to know if it finished
        @return: A two-element list [finished, returncode]: [True, <return
            code of the process>] if the job has finished, otherwise
            [False, 0]
        @raise KeyError: If no process is registered for the given job
        """

        # Look which process belongs to the given job
        process = self.jobs_processes[job]

        # poll() returns None as long as the process is still running
        if process.poll() is None:
            return [False, 0]

        del self.jobs_processes[job]
        self.current_number_of_processes = len(self.jobs_processes)
        return [True, process.returncode]

    def terminate(self, job: Script):
        """!
        Terminate a running job and remove it from the internal bookkeeping.

        @param job: The job whose process should be terminated
        @raise KeyError: If no process is registered for the given job
        """
        # look which process belongs to the given job
        process = self.jobs_processes[job]

        process.terminate()
        del self.jobs_processes[job]
        self.current_number_of_processes = len(self.jobs_processes)
List[str] basf2_command_builder(str steering_file, List[str] parameters, use_multi_processing=False)
def get_results_tag_package_folder(output_base_dir, tag, package)