Belle II Software  release-05-01-25
localcontrol.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 # std
5 import logging
6 import os
7 import subprocess
8 import multiprocessing
9 from typing import Optional, Dict
10 
11 # ours
12 import validationpath
13 import validationfunctions
14 from validationscript import Script
15 
16 
class Local:
    """!
    A class that provides the controls for local multi-processing via the
    subprocess module. Its main methods are:
    - available(): Returns True if another process may be started right now
    - execute(job): Takes a job and executes it by spawning a new process
    - is_job_finished(job): Returns [True, return_code] once the job's
      process has exited, otherwise [False, 0]
    - terminate(job): Terminates the process of a running job

    @var jobs_processes: Map between jobs and the processes spawned for them
    @var logger: Reference to the logging object
    @var max_number_of_processes: The maximum number of parallel processes
    @var current_number_of_processes: The number of processes currently running
    """

    @staticmethod
    def is_supported() -> bool:
        """
        Local control is always supported
        """
        return True

    @staticmethod
    def name() -> str:
        """
        Returns name of this job control
        """
        return "local"

    @staticmethod
    def description() -> str:
        """
        Returns description of this job control
        """
        return "Multi-processing on the local machine"

    def __init__(self, max_number_of_processes: Optional[int] = None):
        """!
        The default constructor.
        - Initializes a dict which maps Script objects to the processes
          spawned for them.
        - Initializes a logger which writes to validate_basf2.py's log.

        @param max_number_of_processes: The maximum number of processes.
            If None, the CPU count of this machine is used; if the CPU count
            cannot be determined on this platform, 10 is used.
        """

        #: The dictionary of jobs and their respective processes
        self.jobs_processes = {}  # type: Dict[Script, subprocess.Popen]

        #: Reference to the 'validate_basf2' logger, so that what happens in
        #: execute() and is_job_finished() ends up in validate_basf2.py's log
        self.logger = logging.getLogger('validate_basf2')

        if max_number_of_processes is None:
            try:
                max_number_of_processes = multiprocessing.cpu_count()
            except NotImplementedError:
                # cpu_count() is not supported on every platform
                self.logger.debug(
                    "Number of CPUs could not be determined, number of parallel "
                    "processes set to default value."
                )
                max_number_of_processes = 10
            else:
                self.logger.debug(
                    "Number of parallel processes has been set to CPU count"
                )

        #: The maximum number of parallel processes
        self.max_number_of_processes = max_number_of_processes

        # `note` is not a standard logging.Logger method; presumably a custom
        # level registered by validate_basf2 (hence the noinspection) - confirm
        # noinspection PyUnresolvedReferences
        self.logger.note(
            f"Local job control will use {self.max_number_of_processes} "
            f"parallel processes.")

        #: Counter for the number of processes which are currently running
        self.current_number_of_processes = 0

    def available(self) -> bool:
        """!
        Checks whether the number of current parallel processes is below the
        limit and a new process can be started.
        @return: True if a new process can be spawned, otherwise False
        """
        # A non-positive limit means that no process may ever be started.
        return (
            self.max_number_of_processes > 0
            and self.current_number_of_processes
            < self.max_number_of_processes
        )

    def execute(
        self, job: Script, options='', dry=False, tag='current'
    ) -> None:
        """!
        Takes a Script object and a string with options and runs it locally,
        either with ROOT (for '.C' files) or with basf2 (all other files).

        The process is started with the results folder of the given tag and
        package as its working directory. Its stdout and stderr are both
        written to '<basename of job.path>.log' in that folder. This method
        does not wait for the process; use is_job_finished() to poll it.

        @param job: The steering file object that should be executed
        @param options: Options that will be given to the basf2 command
        @param dry: Whether to perform a dry run or not. In a dry run the real
            command is still logged, but an 'echo' process is spawned instead.
        @param tag: The name of the folder within the results directory
        @return: None
        """

        # Define the folder in which the results (= the ROOT files) should be
        # created. This is where the files containing plots will end up. By
        # convention, data files will be stored in the parent dir.
        output_dir = validationpath.get_results_tag_package_folder(
            os.getcwd(), tag, job.package
        )
        # exist_ok avoids the check-then-create race of os.path.exists()
        os.makedirs(output_dir, exist_ok=True)

        log_path = os.path.join(
            output_dir, os.path.basename(job.path) + '.log'
        )

        # Create the logfile for this job and make sure it's empty. The
        # child process gets its own copy of the file descriptor, so the
        # parent's handle is closed as soon as the process has been spawned.
        with open(log_path, 'w+') as log:
            # Now we need to distinguish between .C and .py files:
            extension = os.path.splitext(job.path)[1]
            if extension == '.C':
                # .C files are executed with ROOT. No options available here.
                params = ['root', '-b', '-q', job.path]
            else:
                # .py files are executed with basf2. 'options' contains an
                # option-string for basf2, e.g. '-n 100 -p 8'; it is split
                # on white-space because subprocess.Popen expects a list.
                params = validationfunctions.basf2_command_builder(
                    job.path, options.split()
                )

            # Log the command we are about to execute (also in dry runs)
            self.logger.debug(subprocess.list2cmdline(params))

            if dry:
                # No shell is involved, so the message carries no quotes.
                params = ['echo', 'Performing a dry run!']

            # cwd= is used instead of os.chdir() so that this process's own
            # working directory is never changed, even if spawning fails.
            # NOTE(review): a relative job.path is resolved by the child
            # relative to output_dir (as before) - presumably it is absolute.
            process = subprocess.Popen(
                params,
                stdout=log,
                stderr=subprocess.STDOUT,
                cwd=output_dir,
            )

        # Save the connection between the job and the spawned process
        self.jobs_processes[job] = process

        # Derive the counter from the bookkeeping dict, as is_job_finished()
        # and terminate() do, so re-executing a job cannot make it drift.
        self.current_number_of_processes = len(self.jobs_processes)

    def is_job_finished(self, job: Script) -> list:
        """!
        Checks if a given job has finished.

        If the job's process has exited, the job is removed from the
        bookkeeping and the process counter is updated.

        @param job: The job of which we want to know if it finished
        @return: [True, return code of the process] if the job has finished,
            otherwise [False, 0]
        @raises KeyError: if the job is not (or no longer) tracked
        """
        process = self.jobs_processes[job]

        # poll() returns None while the process is still running
        if process.poll() is None:
            return [False, 0]

        del self.jobs_processes[job]
        self.current_number_of_processes = len(self.jobs_processes)
        return [True, process.returncode]

    def terminate(self, job: Script) -> None:
        """!
        Terminate a running job.

        Calls terminate() on the job's process and stops tracking the job.
        It does not wait for the process to exit.

        @param job: The job whose process should be terminated
        @raises KeyError: if the job is not (or no longer) tracked
        """
        # look which process belongs to the given job
        process = self.jobs_processes[job]

        process.terminate()
        del self.jobs_processes[job]
        self.current_number_of_processes = len(self.jobs_processes)
json_objects.Comparison.__init__
def __init__(self, revisions=None, packages=None)
Definition: json_objects.py:528
validationpath.get_results_tag_package_folder
def get_results_tag_package_folder(output_base_dir, tag, package)
Returns the absolute path for a tag and package.
Definition: validationpath.py:146
validationfunctions.basf2_command_builder
List[str] basf2_command_builder(str steering_file, List[str] parameters, use_multi_processing=False)
Definition: validationfunctions.py:63