Belle II Software  release-08-01-10
clean_execution.py
1 
import atexit
import os
import signal
import subprocess
from time import monotonic, sleep, time

import basf2
14 
15 
class CleanBasf2Execution:
    """
    Helper class to call a given (basf2) command via subprocess
    and make sure the process is killed properly once a SIGINT or SIGTERM signal is
    sent to the main process.
    To do this, the basf2 command is started in a new session group, so all child processes
    of the basf2 command will also be killed.

    When the main process receives a termination request via a SIGINT or SIGTERM,
    a SIGINT is sent to the started basf2 process.
    If the process is still alive after a given timeout (10 s by default),
    it is killed via SIGKILL and all its started child forks with it.
    After a normal or abnormal termination, the wait() function returns the exit code
    and cleanup can happen afterwards.

    ATTENTION: In some rare cases, e.g. when the terminate request happens during a syscall,
    the process can not be stopped (see uninterruptable sleep process state, e.g. in
    https://stackoverflow.com/questions/223644/what-is-an-uninterruptable-process).
    In those cases, even a KILL signal does not help!

    The class can be used in a typical main method, e.g.

        from hlt.clean_execution import CleanBasf2Execution

        if __name__ == "__main__":
            execution = CleanBasf2Execution()
            try:
                execution.start(["basf2", "run.py"])
                execution.wait()
            finally:
                # Make sure to always do the cleanup, also in case of errors
                print("Do cleanup")

    """

    def __init__(self, timeout=10):
        """
        Create a new execution helper without any started process.

        :param timeout: Seconds the processes get to react to the graceful
                        SIGINT before they are killed with SIGKILL.
        """
        #: The processes handled by this class.
        self._handled_processes = []
        #: The commands related to the processes (same order as the processes).
        self._handled_commands = []
        #: Maximum time the process is allowed to stay alive after the
        #: graceful stop signal has been sent.
        self.timeout = timeout

    def start(self, command):
        """
        Start the given command in a new session and put it under supervision.

        :param command: The command as accepted by subprocess.Popen, e.g. a list of arguments.
        """
        basf2.B2INFO("Starting ", command)
        process = subprocess.Popen(command, start_new_session=True)
        # kill() signals the whole process group using the pid as group id,
        # so the new process is expected to lead its own group.
        pgid = os.getpgid(process.pid)
        if pgid != process.pid:
            basf2.B2WARNING("Subprocess is not session leader. Weird")

        # Register the process before the handlers are installed, so a signal
        # arriving right after the installation already sees it.
        self._handled_processes.append(process)
        self._handled_commands.append(command)
        self.install_signal_handler()

    def wait(self):
        """
        Block until the first handled process has ended, then stop all others.

        :return: The return code of the process that ended first, or None if
                 no process was ever started.
        """
        if not self._handled_processes:
            # Without this guard the polling loop below would never terminate.
            basf2.B2WARNING("wait() called without started process. Nothing to wait for.")
            return None

        while True:
            for command, process in zip(self._handled_commands, self._handled_processes):
                if self.has_process_ended(process):
                    # Remember the code first: kill() polls the processes again.
                    returncode = process.returncode
                    basf2.B2INFO("The process ", command, " died with ", returncode,
                                 ". Killing the remaining ones.")
                    self.kill()
                    return returncode
            sleep(1)

    def signal_handler(self, signal_number, signal_frame):
        """
        The signal handler called on SIGINT and SIGTERM (and at interpreter exit).

        Both arguments are ignored; every call triggers a full shutdown via kill().
        """
        self.kill()

    def kill(self):
        """
        Clean or hard shutdown of all processes.
        It tries to stop the processes gracefully (SIGINT to the process group),
        but if they do not react after a certain time, they are killed with a SIGKILL.
        The SIGKILL is sent in any case, the signal handlers are reinstalled afterwards.
        """
        if not self._handled_processes:
            basf2.B2WARNING("Signal handler called without started process. This normally means, something is wrong!")
            return

        basf2.B2INFO("Termination requested...")

        # Make sure this signal handle is not called more than once
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        signal.signal(signal.SIGTERM, signal.SIG_IGN)

        try:
            try:
                # Send a graceful stop signal to the processes and give them some time to react
                self._interrupt_all()
                if not self.wait_for_process(timeout=self.timeout):
                    basf2.B2WARNING("Process did not react in time. Sending a SIGKILL.")
            finally:
                # In any case: kill the processes
                for process in self._handled_processes:
                    self._force_kill(process)
                basf2.B2INFO("...Process stopped")
        finally:
            # And reinstall the signal handlers, even if the shutdown itself failed,
            # otherwise SIGINT and SIGTERM would stay ignored.
            self.install_signal_handler()

    def _interrupt_all(self):
        """Send a SIGINT to the process group of every handled process."""
        for process in self._handled_processes:
            try:
                os.killpg(process.pid, signal.SIGINT)
                process.poll()
            except ProcessLookupError:
                # The process is already gone! Nice
                pass

    def _force_kill(self, process):
        """Send a SIGKILL to the process group of the process and report if even that fails."""
        try:
            os.killpg(process.pid, signal.SIGKILL)
            if not self.wait_for_process(timeout=10, process_list=[process]):
                backtrace = self._collect_backtrace(process.pid)
                basf2.B2ERROR("Could not end the process event with a KILL signal. "
                              "This can happen because it is in the uninterruptable sleep state. "
                              "I can not do anything about this!",
                              backtrace=backtrace)
        except ProcessLookupError:
            # The process is already gone! Nice
            pass

    @staticmethod
    def _collect_backtrace(pid):
        """
        Ask gdb for a backtrace of the given pid.

        This is diagnostics only: a missing or failing gdb must not abort the
        shutdown of the remaining processes, so failures are turned into a
        placeholder text instead of being raised.
        """
        try:
            output = subprocess.check_output(["gdb", "-q", "-batch", "-ex", "backtrace", "basf2",
                                              str(pid)])
        except (OSError, subprocess.SubprocessError) as error:
            return "<no backtrace available: {}>".format(error)
        return output.decode(errors="replace")

    def wait_for_process(self, process_list=None, timeout=None, minimum_delay=1):
        """
        Wait at most "timeout" seconds for the processes to stop.

        :param process_list: The processes to wait for, defaults to all handled processes.
        :param timeout: Maximum waiting time in seconds, defaults to self.timeout.
        :param minimum_delay: Seconds to sleep between two checks.
        :return: True if all processes ended in this period, False otherwise.
        """
        if process_list is None:
            process_list = self._handled_processes

        if timeout is None:
            timeout = self.timeout

        # Use the monotonic clock: the wall clock may jump and distort the timeout.
        endtime = monotonic() + timeout
        while True:
            if all(self.has_process_ended(process) for process in process_list):
                return True

            remaining = endtime - monotonic()
            if remaining <= 0:
                return False

            # Never sleep longer than the time that is left.
            sleep(min(minimum_delay, remaining))

    def install_signal_handler(self):
        """
        Set the signal handlers for SIGINT and SIGTERM to our own one.
        """
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        # Just for safety, also register an exit handler.
        # Unregister first, so repeated calls do not stack up exit handlers.
        atexit.unregister(self.signal_handler)
        atexit.register(self.signal_handler, signal.SIGTERM, None)

    @staticmethod
    def has_process_ended(process):
        """
        Check if the handled process has ended already.
        This function does not wait.

        The original author would rather have used process.wait() or poll(),
        but notes that the main process is also waiting for the return code,
        so the threading lock in those functions would never be acquired.
        Therefore the non-blocking internals of Popen are used directly.

        :param process: The subprocess.Popen object to check.
        :return: True if the process has ended, False if it is still running.
        """
        if process.returncode is not None:
            # Already collected earlier. Asking again would not find the child
            # anymore and would replace the stored exit code.
            return True

        pid, sts = process._try_wait(os.WNOHANG)
        assert pid == process.pid or pid == 0

        if pid != process.pid:
            # pid 0: still running. The status is meaningless in this case,
            # so it must not be stored as the exit code.
            return False

        process._handle_exitstatus(sts)
        return True
def wait_for_process(self, process_list=None, timeout=None, minimum_delay=1)
_handled_processes
The processes handled by this class.
def signal_handler(self, signal_number, signal_frame)
_handled_commands
The commands related to the processes.
timeout
Maximum time the process is allowed to stay alive after SIGTERM has been sent.