Belle II Software development
CleanBasf2Execution Class Reference

Public Member Functions

def __init__ (self, timeout=10)
 
def start (self, command)
 
def wait (self)
 
def signal_handler (self, signal_number, signal_frame)
 
def kill (self)
 
def wait_for_process (self, process_list=None, timeout=None, minimum_delay=1)
 
def install_signal_handler (self)
 

Static Public Member Functions

def has_process_ended (process)
 

Public Attributes

 timeout
 Maximum time the process is allowed to stay alive after SIGTERM has been sent.
 

Protected Attributes

 _handled_processes
 The processes handled by this class.
 
 _handled_commands
 The commands related to the processes.
 

Detailed Description

Helper class to call a given (basf2) command via subprocess
and make sure the process is killed properly once a SIGINT or SIGTERM signal is
send to the main process.
To do this, the basf2 command is started in a new session group, so all child processes
of the basf2 command will also be killed.

When the main process receives a termination request via an SIGINT or SIGTERM,
a SIGINT is sent to the started basf2 process.
If the process is still alive after a given timeout (10 s by default),
it is killed via SIGKILL and all its started child forks with it.
After a normal or abnormal termination, the run() function returns the exit code
and cleanup can happen afterwards.

ATTENTION: In some rare cases, e.g. when the terminate request happens during a syscall,
the process can not be stopped (see uninterruptable sleep process state, e.g. in
https://stackoverflow.com/questions/223644/what-is-an-uninterruptable-process).
In those cases, even a KILL signal does not help!

The class can be used in a typical main method, e.g.

    from hlt.clean_execution import CleanBasf2Execution

    if __name__ == "__main__":
        execution = CleanBasf2Execution()
        try:
            execution.start(["basf2", "run.py"])
            execution.wait()
        finally:
            # Make sure to always do the cleanup, also in case of errors
            print("Do cleanup")

Definition at line 16 of file clean_execution.py.

Constructor & Destructor Documentation

◆ __init__()

def __init__ (   self,
  timeout = 10 
)
Create a new execution with the given parameters (list of arguments)

Definition at line 51 of file clean_execution.py.

51 def __init__(self, timeout=10):
52 """
53 Create a new execution with the given parameters (list of arguments)
54 """
55
56 self._handled_processes = []
57
58 self._handled_commands = []
59
60 self.timeout = timeout
61

Member Function Documentation

◆ has_process_ended()

def has_process_ended (   process)
static
Check if the handled process has ended already.
This functions does not wait.

I would rather use self._handled_process.wait() or poll()
which does exactly the same.
However: the main process is also waiting for the return code
so the threading.lock in the .wait() function will never aquire a lock :-(

Definition at line 179 of file clean_execution.py.

179 def has_process_ended(process):
180 """
181 Check if the handled process has ended already.
182 This functions does not wait.
183
184 I would rather use self._handled_process.wait() or poll()
185 which does exactly the same.
186 However: the main process is also waiting for the return code
187 so the threading.lock in the .wait() function will never aquire a lock :-(
188 """
189 pid, sts = process._try_wait(os.WNOHANG)
190 assert pid == process.pid or pid == 0
191
192 process._handle_exitstatus(sts)
193
194 if pid == process.pid:
195 return True
196
197 return False

◆ install_signal_handler()

def install_signal_handler (   self)
Set the signal handlers for SIGINT and SIGTERM to out own one.

Definition at line 168 of file clean_execution.py.

168 def install_signal_handler(self):
169 """
170 Set the signal handlers for SIGINT and SIGTERM to out own one.
171 """
172 signal.signal(signal.SIGINT, self.signal_handler)
173 signal.signal(signal.SIGTERM, self.signal_handler)
174 # Just for safety, also register an exit handler
175 atexit.unregister(self.signal_handler)
176 atexit.register(self.signal_handler, signal.SIGTERM, None)
177

◆ kill()

def kill (   self)
Clean or hard shutdown of all processes.
It tries to kill the process gracefully but if it does not react after a certain time,
it kills it with a SIGKILL.

Definition at line 96 of file clean_execution.py.

96 def kill(self):
97 """
98 Clean or hard shutdown of all processes.
99 It tries to kill the process gracefully but if it does not react after a certain time,
100 it kills it with a SIGKILL.
101 """
102 if not self._handled_processes:
103 basf2.B2WARNING("Signal handler called without started process. This normally means, something is wrong!")
104 return
105
106 basf2.B2INFO("Termination requested...")
107
108 # Make sure this signal handle is not called more than once
109 signal.signal(signal.SIGINT, signal.SIG_IGN)
110 signal.signal(signal.SIGTERM, signal.SIG_IGN)
111
112 try:
113 # Send a graceful stop signal to the process and give it some time to react
114 for process in self._handled_processes:
115 try:
116 os.killpg(process.pid, signal.SIGINT)
117 process.poll()
118 except ProcessLookupError:
119 # The process is already gone! Nice
120 pass
121
122 # Give the process some time to react
123 if not self.wait_for_process(timeout=self.timeout):
124 basf2.B2WARNING("Process did not react in time. Sending a SIGKILL.")
125 finally:
126 # In any case: kill the process
127 for process in self._handled_processes:
128 try:
129 os.killpg(process.pid, signal.SIGKILL)
130 if not self.wait_for_process(timeout=10, process_list=[process]):
131 backtrace = subprocess.check_output(["gdb", "-q", "-batch", "-ex", "backtrace", "basf2",
132 str(process.pid)]).decode()
133 basf2.B2ERROR("Could not end the process event with a KILL signal. "
134 "This can happen because it is in the uninterruptable sleep state. "
135 "I can not do anything about this!",
136 backtrace=backtrace)
137 except ProcessLookupError:
138 # The process is already gone! Nice
139 pass
140 basf2.B2INFO("...Process stopped")
141
142 # And reinstall the signal handlers
143 self.install_signal_handler()
144

◆ signal_handler()

def signal_handler (   self,
  signal_number,
  signal_frame 
)
The signal handler called on SIGINT and SIGTERM.

Definition at line 90 of file clean_execution.py.

90 def signal_handler(self, signal_number, signal_frame):
91 """
92 The signal handler called on SIGINT and SIGTERM.
93 """
94 self.kill()
95

◆ start()

def start (   self,
  command 
)
Add the execution and terminate gracefully/hard if requested via signal.

Definition at line 62 of file clean_execution.py.

62 def start(self, command):
63 """
64 Add the execution and terminate gracefully/hard if requested via signal.
65 """
66 basf2.B2INFO("Starting ", command)
67 process = subprocess.Popen(command, start_new_session=True)
68 pgid = os.getpgid(process.pid)
69 if pgid != process.pid:
70 basf2.B2WARNING("Subprocess is not session leader. Weird")
71
72 self.install_signal_handler()
73 self._handled_processes.append(process)
74 self._handled_commands.append(command)
75

◆ wait()

def wait (   self)
Wait until all handled calculations have finished.

Definition at line 76 of file clean_execution.py.

76 def wait(self):
77 """
78 Wait until all handled calculations have finished.
79 """
80 while True:
81 for command, process in zip(self._handled_commands, self._handled_processes):
82 if self.has_process_ended(process):
83 returncode = process.returncode
84 basf2.B2INFO("The process ", command, " died with ", returncode,
85 ". Killing the remaining ones.")
86 self.kill()
87 return returncode
88 sleep(1)
89

◆ wait_for_process()

def wait_for_process (   self,
  process_list = None,
  timeout = None,
  minimum_delay = 1 
)
Wait maximum "timeout" for the process to stop.
If it did not end in this period, returns False.

Definition at line 145 of file clean_execution.py.

145 def wait_for_process(self, process_list=None, timeout=None, minimum_delay=1):
146 """
147 Wait maximum "timeout" for the process to stop.
148 If it did not end in this period, returns False.
149 """
150 if process_list is None:
151 process_list = self._handled_processes
152
153 if timeout is None:
154 timeout = self.timeout
155
156 endtime = time() + timeout
157 while True:
158 all_finished = all(self.has_process_ended(process) for process in process_list)
159 if all_finished:
160 return True
161
162 remaining = endtime - time()
163 if remaining <= 0:
164 return False
165
166 sleep(minimum_delay)
167

Member Data Documentation

◆ _handled_commands

_handled_commands
protected

The commands related to the processes.

Definition at line 58 of file clean_execution.py.

◆ _handled_processes

_handled_processes
protected

The processes handled by this class.

Definition at line 56 of file clean_execution.py.

◆ timeout

timeout

Maximum time the process is allowed to stay alive after SIGTERM has been sent.

Definition at line 60 of file clean_execution.py.


The documentation for this class was generated from the following file: