Belle II Software  release-05-02-19
clean_execution.py
1 import atexit
2 import signal
3 import subprocess
4 from time import sleep, time
5 import os
6 import basf2
7 
8 
    """
    Helper class to call a given (basf2) command via subprocess
    and make sure the process is killed properly once a SIGINT or SIGTERM signal is
    sent to the main process.
    To do this, the basf2 command is started in a new session (and therefore in its own
    process group), so all child processes of the basf2 command will also be killed.

    When the main process receives a termination request via a SIGINT or SIGTERM,
    a SIGINT is sent to the started basf2 process.
    If the process is still alive after a given timeout (10 s by default),
    it is killed via SIGKILL and all its started child forks with it.
    After a normal or abnormal termination, the wait() method returns the exit code
    and cleanup can happen afterwards.

    ATTENTION: In some rare cases, e.g. when the terminate request happens during a syscall,
    the process can not be stopped (see the uninterruptible sleep process state, e.g. in
    https://stackoverflow.com/questions/223644/what-is-an-uninterruptable-process).
    In those cases, even a KILL signal does not help!

    The class can be used in a typical main method, e.g.

        from hlt.clean_execution import CleanBasf2Execution

        if __name__ == "__main__":
            execution = CleanBasf2Execution()
            try:
                execution.start(["basf2", "run.py"])
                execution.wait()
            finally:
                # Make sure to always do the cleanup, also in case of errors
                print("Do cleanup")

    """
43 
44  def __init__(self, timeout=10):
45  """
46  Create a new execution with the given parameters (list of arguments)
47  """
48 
50 
52 
53  self.timeout = timeout
54 
55  def start(self, command):
56  """
57  Add the execution and terminate gracefully/hard if requested via signal.
58  """
59  basf2.B2INFO("Starting ", command)
60  process = subprocess.Popen(command, start_new_session=True)
61  pgid = os.getpgid(process.pid)
62  if pgid != process.pid:
63  basf2.B2WARNING("Subprocess is not session leader. Weird")
64 
66  self._handled_processes.append(process)
67  self._handled_commands.append(command)
68 
69  def wait(self):
70  """
71  Wait until all handled calculations have finished.
72  """
73  while True:
74  for command, process in zip(self._handled_commands, self._handled_processes):
75  if self.has_process_ended(process):
76  returncode = process.returncode
77  basf2.B2INFO("The process ", command, " died with ", returncode,
78  ". Killing the remaining ones.")
79  self.kill()
80  return returncode
81  sleep(1)
82 
83  def signal_handler(self, signal_number, signal_frame):
84  """
85  The signal handler called on SIGINT and SIGTERM.
86  """
87  self.kill()
88 
89  def kill(self):
90  """
91  Clean or hard shutdown of all processes.
92  It tries to kill the process gracefully but if it does not react after a certain time,
93  it kills it with a SIGKILL.
94  """
95  if not self._handled_processes:
96  basf2.B2WARNING("Signal handler called without started process. This normally means, something is wrong!")
97  return
98 
99  basf2.B2INFO("Termination requested...")
100 
101  # Make sure this signal handle is not called more than once
102  signal.signal(signal.SIGINT, signal.SIG_IGN)
103  signal.signal(signal.SIGTERM, signal.SIG_IGN)
104 
105  try:
106  # Send a graceful stop signal to the process and give it some time to react
107  for process in self._handled_processes:
108  try:
109  os.killpg(process.pid, signal.SIGINT)
110  process.poll()
111  except ProcessLookupError:
112  # The process is already gone! Nice
113  pass
114 
115  # Give the process some time to react
116  if not self.wait_for_process(timeout=self.timeout):
117  basf2.B2WARNING("Process did not react in time. Sending a SIGKILL.")
118  finally:
119  # In any case: kill the process
120  for process in self._handled_processes:
121  try:
122  os.killpg(process.pid, signal.SIGKILL)
123  if not self.wait_for_process(timeout=10, process_list=[process]):
124  backtrace = subprocess.check_output(["gdb", "-q", "-batch", "-ex", "backtrace", "basf2",
125  str(process.pid)]).decode()
126  basf2.B2ERROR("Could not end the process event with a KILL signal. "
127  "This can happen because it is in the uninterruptable sleep state. "
128  "I can not do anything about this!",
129  backtrace=backtrace)
130  except ProcessLookupError:
131  # The process is already gone! Nice
132  pass
133  basf2.B2INFO("...Process stopped")
134 
135  # And reinstall the signal handlers
137 
138  def wait_for_process(self, process_list=None, timeout=None, minimum_delay=1):
139  """
140  Wait maximum "timeout" for the process to stop.
141  If it did not end in this period, returns False.
142  """
143  if process_list is None:
144  process_list = self._handled_processes
145 
146  if timeout is None:
147  timeout = self.timeout
148 
149  endtime = time() + timeout
150  while True:
151  all_finished = all(self.has_process_ended(process) for process in process_list)
152  if all_finished:
153  return True
154 
155  remaining = endtime - time()
156  if remaining <= 0:
157  return False
158 
159  sleep(minimum_delay)
160 
162  """
163  Set the signal handlers for SIGINT and SIGTERM to out own one.
164  """
165  signal.signal(signal.SIGINT, self.signal_handler)
166  signal.signal(signal.SIGTERM, self.signal_handler)
167  # Just for safety, also register an exit handler
168  atexit.unregister(self.signal_handler)
169  atexit.register(self.signal_handler, signal.SIGTERM, None)
170 
171  @staticmethod
172  def has_process_ended(process):
173  """
174  Check if the handled process has ended already.
175  This functions does not wait.
176 
177  I would rather use self._handled_process.wait() or poll()
178  which does exactly the same.
179  However: the main process is also waiting for the return code
180  so the threading.lock in the .wait() function will never aquire a lock :-(
181  """
182  pid, sts = process._try_wait(os.WNOHANG)
183  assert pid == process.pid or pid == 0
184 
185  process._handle_exitstatus(sts)
186 
187  if pid == process.pid:
188  return True
189 
190  return False
hlt.clean_execution.CleanBasf2Execution.has_process_ended
def has_process_ended(process)
Definition: clean_execution.py:172
hlt.clean_execution.CleanBasf2Execution.signal_handler
def signal_handler(self, signal_number, signal_frame)
Definition: clean_execution.py:83
hlt.clean_execution.CleanBasf2Execution._handled_processes
_handled_processes
The processes handled by this class.
Definition: clean_execution.py:49
hlt.clean_execution.CleanBasf2Execution.kill
def kill(self)
Definition: clean_execution.py:89
hlt.clean_execution.CleanBasf2Execution.start
def start(self, command)
Definition: clean_execution.py:55
hlt.clean_execution.CleanBasf2Execution._handled_commands
_handled_commands
The commands related to the processes.
Definition: clean_execution.py:51
hlt.clean_execution.CleanBasf2Execution
Definition: clean_execution.py:9
hlt.clean_execution.CleanBasf2Execution.wait_for_process
def wait_for_process(self, process_list=None, timeout=None, minimum_delay=1)
Definition: clean_execution.py:138
hlt.clean_execution.CleanBasf2Execution.install_signal_handler
def install_signal_handler(self)
Definition: clean_execution.py:161
hlt.clean_execution.CleanBasf2Execution.timeout
timeout
Maximum time (in seconds) the process is allowed to stay alive after SIGINT has been sent.
Definition: clean_execution.py:53
hlt.clean_execution.CleanBasf2Execution.__init__
def __init__(self, timeout=10)
Definition: clean_execution.py:44
hlt.clean_execution.CleanBasf2Execution.wait
def wait(self)
Definition: clean_execution.py:69