Belle II Software development
clean_execution.py
1
8import atexit
9import signal
10import subprocess
11from time import sleep, time
12import os
13import basf2
14
15
17 """
18 Helper class to call a given (basf2) command via subprocess
19 and make sure the process is killed properly once a SIGINT or SIGTERM signal is
20 sent to the main process.
21 To do this, the basf2 command is started in a new session group, so all child processes
22 of the basf2 command will also be killed.
23
24 When the main process receives a termination request via a SIGINT or SIGTERM,
25 a SIGINT is sent to the started basf2 process.
26 If the process is still alive after a given timeout (10 s by default),
27 it is killed via SIGKILL and all its started child forks with it.
28 After a normal or abnormal termination, the run() function returns the exit code
29 and cleanup can happen afterwards.
30
31 ATTENTION: In some rare cases, e.g. when the terminate request happens during a syscall,
32 the process can not be stopped (see uninterruptable sleep process state, e.g. in
33 https://stackoverflow.com/questions/223644/what-is-an-uninterruptable-process).
34 In those cases, even a KILL signal does not help!
35
36 The class can be used in a typical main method, e.g.
37
38 from hlt.clean_execution import CleanBasf2Execution
39
40 if __name__ == "__main__":
41 execution = CleanBasf2Execution()
42 try:
43 execution.start(["basf2", "run.py"])
44 execution.wait()
45 finally:
46 # Make sure to always do the cleanup, also in case of errors
47 print("Do cleanup")
48
49 """
50
def __init__(self, timeout=10):
    """
    Create a new, still empty execution handler.

    No process is started here; use start() for every command that
    should be supervised.

    :param timeout: Time in seconds the processes get to stop after the
        graceful stop signal before they are killed hard.
    """
    #: The processes handled by this class.
    self._handled_processes = []
    #: The commands related to the processes (same order as the processes).
    self._handled_commands = []
    #: Maximum time the processes are allowed to stay alive after the graceful stop signal.
    self.timeout = timeout
61
def start(self, command):
    """
    Start the command in a new session and put it under supervision.

    The started process and its command are remembered, and the
    SIGINT/SIGTERM handlers (plus an exit handler) are pointed to
    signal_handler(), so a termination request to the main process is
    forwarded to everything started here.

    :param command: The command to execute, as a list of arguments.
    """
    basf2.B2INFO("Starting ", command)
    # A new session makes the child the leader of its own process group,
    # which is what allows killing it together with all of its forks.
    process = subprocess.Popen(command, start_new_session=True)
    pgid = os.getpgid(process.pid)
    if pgid != process.pid:
        basf2.B2WARNING("Subprocess is not session leader. Weird")

    self._handled_processes.append(process)
    self._handled_commands.append(command)

    # Without these registrations a termination request would never reach
    # kill(), and the child (living in its own session) would survive us.
    signal.signal(signal.SIGINT, self.signal_handler)
    signal.signal(signal.SIGTERM, self.signal_handler)
    # Just for safety, also register an exit handler (only once)
    atexit.unregister(self.signal_handler)
    atexit.register(self.signal_handler, signal.SIGTERM, None)
75
def wait(self):
    """
    Block until the first of the handled processes has ended.

    The processes are polled once per second. As soon as one of them is
    gone, all remaining ones are shut down via kill() and the return code
    of the ended process is returned.
    """
    while True:
        # Pick the first (command, process) pair whose process is gone, if any
        ended = next(
            (
                (cmd, proc)
                for cmd, proc in zip(self._handled_commands, self._handled_processes)
                if self.has_process_ended(proc)
            ),
            None,
        )

        if ended is not None:
            cmd, proc = ended
            exit_code = proc.returncode
            basf2.B2INFO("The process ", cmd, " died with ", exit_code,
                         ". Killing the remaining ones.")
            self.kill()
            return exit_code

        sleep(1)
89
def signal_handler(self, signal_number, signal_frame):
    """
    Handler for SIGINT and SIGTERM: shut down all handled processes.

    The two arguments are only there to match the handler signature,
    they are not evaluated.
    """
    # Neither the signal number nor the frame influence the shutdown
    del signal_number, signal_frame
    self.kill()
95
def kill(self):
    """
    Clean or hard shutdown of all processes.

    Every handled process group first receives a SIGINT and gets
    self.timeout seconds to react. Afterwards every process group is sent
    a SIGKILL in any case. If a process is still alive 10 seconds after
    the SIGKILL, a gdb backtrace is attached to the error message (when
    gdb can be run).

    While the shutdown is running, SIGINT and SIGTERM are ignored. The
    handlers are pointed to signal_handler() again before returning, also
    when the shutdown itself failed with an exception.
    """
    if not self._handled_processes:
        basf2.B2WARNING("Signal handler called without started process. This normally means, something is wrong!")
        return

    basf2.B2INFO("Termination requested...")

    # Make sure this signal handle is not called more than once
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)

    try:
        # Send a graceful stop signal to the process and give it some time to react
        for process in self._handled_processes:
            try:
                os.killpg(process.pid, signal.SIGINT)
                process.poll()
            except ProcessLookupError:
                # The process is already gone! Nice
                pass

        # Give the process some time to react
        if not self.wait_for_process(timeout=self.timeout):
            basf2.B2WARNING("Process did not react in time. Sending a SIGKILL.")
    finally:
        try:
            # In any case: kill the process
            for process in self._handled_processes:
                try:
                    os.killpg(process.pid, signal.SIGKILL)
                except ProcessLookupError:
                    # The process is already gone! Nice
                    continue

                if self.wait_for_process(timeout=10, process_list=[process]):
                    continue

                try:
                    backtrace = subprocess.check_output(
                        ["gdb", "-q", "-batch", "-ex", "backtrace", "basf2", str(process.pid)]
                    ).decode(errors="replace")
                except (OSError, subprocess.SubprocessError) as error:
                    # The backtrace is only diagnostics: a missing or failing
                    # gdb must not stop us from killing the other processes.
                    backtrace = "<backtrace not available: {}>".format(error)

                basf2.B2ERROR("Could not end the process event with a KILL signal. "
                              "This can happen because it is in the uninterruptable sleep state. "
                              "I can not do anything about this!",
                              backtrace=backtrace)
            basf2.B2INFO("...Process stopped")
        finally:
            # And reinstall the signal handlers, otherwise SIGINT and SIGTERM
            # would stay ignored for the rest of the main process' life
            signal.signal(signal.SIGINT, self.signal_handler)
            signal.signal(signal.SIGTERM, self.signal_handler)
144
def wait_for_process(self, process_list=None, timeout=None, minimum_delay=1):
    """
    Wait at most "timeout" seconds for the processes to stop.

    :param process_list: The processes to wait for. Defaults to all
        processes handled by this class.
    :param timeout: Maximum waiting time in seconds. Defaults to
        self.timeout.
    :param minimum_delay: Pause in seconds between two checks. The last
        pause is shortened so the timeout is not exceeded.
    :return: True if all processes have ended within the timeout,
        False otherwise.
    """
    # A monotonic clock keeps the deadline valid if the system time is changed
    from time import monotonic

    if process_list is None:
        process_list = self._handled_processes

    if timeout is None:
        timeout = self.timeout

    deadline = monotonic() + timeout
    while True:
        if all(self.has_process_ended(process) for process in process_list):
            return True

        remaining = deadline - monotonic()
        if remaining <= 0:
            return False

        # Never sleep past the deadline, even for a large polling delay
        sleep(min(minimum_delay, remaining))
167
169 """
170 Set the signal handlers for SIGINT and SIGTERM to our own one.
171 """
172 signal.signal(signal.SIGINT, self.signal_handler)
173 signal.signal(signal.SIGTERM, self.signal_handler)
174 # Just for safety, also register an exit handler
175 atexit.unregister(self.signal_handler)
176 atexit.register(self.signal_handler, signal.SIGTERM, None)
177
@staticmethod
def has_process_ended(process):
    """
    Check if the handled process has ended already.
    This function does not wait.

    I would rather use process.wait() or poll() which does exactly the same.
    However: the main process is also waiting for the return code
    so the threading.lock in the .wait() function will never acquire a lock :-(
    That is why the non-public helpers of subprocess.Popen are used directly.

    The exit status is only stored once the process has really ended, and a
    return code that is already known is never queried (and overwritten) again.

    :param process: The subprocess.Popen object to check.
    :return: True if the process has ended, False if it is still running.
    """
    if process.returncode is not None:
        # Already collected. A second wait on a reaped child can not deliver
        # the status any more and would replace the real return code.
        return True

    pid, sts = process._try_wait(os.WNOHANG)
    assert pid == process.pid or pid == 0

    if pid == 0:
        # Still running: "sts" carries no information in this case, so it
        # must not be turned into a return code.
        return False

    process._handle_exitstatus(sts)
    return True
def wait_for_process(self, process_list=None, timeout=None, minimum_delay=1)
_handled_processes
The processes handled by this class.
def signal_handler(self, signal_number, signal_frame)
_handled_commands
The commands related to the processes.
timeout
Maximum time the process is allowed to stay alive after SIGTERM has been sent.
Definition: __init__.py:1
Definition: main.py:1