Belle II Software development
terminal_utils.py
1#!/usr/bin/env python3
2
3
10
11# @cond dont_want_no_doxygen_warnings_this_is_sphinxed
12
13"""
14terminal_utils - Helper functions for input from/output to a terminal
15---------------------------------------------------------------------
16
17This module contains classes useful for dealing with input from and output to the terminal:
18
19* `Pager`, a class to provide paginated output with less similar to other
20 popular tools like ``git diff``
21* `InputEditor`, a class to open an editor window when requesting longer inputs from users, similar to ``git commit``
22"""
23
24import sys
25import subprocess
26import os
27import io
28import shutil
29import tempfile
30import shlex
31import enum
32
33
class ANSIColors(enum.Enum):
    """
    Simple class to handle color output to the terminal.

    This class allows to very easily add color output to the terminal

    >>> from terminal_utils import ANSIColors as ac
    >>> print(f"{ac.fg('red')}Some text in {ac.color(underline=True)}RED{ac.reset()}")

    The basic colors can be specified by name (case insensitive), by enum
    member or by enum value. Custom colors can be supplied using hex notation
    like ``#rgb`` or ``#rrggbb`` (Hint: ``matplotlib.colors.to_hex`` might be
    useful here). As an example to use the viridis colormap to color the output
    on the terminal::

        from matplotlib import cm
        from matplotlib.colors import to_hex
        from terminal_utils import ANSIColors as ac

        # sample the viridis colormap at 16 points
        for i in range(16):
            # convert i to be in [0..1] and get the hex color
            color = to_hex(cm.viridis(i/15))
            # and print the hex color in the correct color
            print(f"{i}. {ac.fg(color)}{color}{ac.reset()}")


    If the output is not to a terminal color output will be disabled and nothing
    will be added to the output, for example when redirecting the output to a
    logfile.
    """
    BLACK = 0
    RED = 1
    GREEN = 2
    YELLOW = 3
    BLUE = 4
    MAGENTA = 5
    CYAN = 6
    WHITE = 7

    @staticmethod
    def supported():
        """
        Check whether the output is a terminal.

        If this is False, the methods `color`, `fg`, `bg` and `reset` will only
        return an empty string as color output will be disabled
        """
        return sys.stdout.isatty()

    @classmethod
    def convert_color(cls, color):
        """Convert a color to the necessary ansi code. The argument can either be

        * an integer corresponding to the ansi color (see the enum values of this class)
        * a member of this enum, for example ``ANSIColors.RED``
        * the name (case insensitive) of one of the enum values of this class
        * a hex color code of the form ``#rgb`` or ``#rrggbb``

        Returns:
            str: the color part of the escape sequence, ``5;<index>`` for an
            indexed color or ``2;<r>;<g>;<b>`` for a hex color

        Raises:
            KeyError: if the argument is a string not matching to one of the known colors
            ValueError: if a hex color code is not of the form ``#rgb`` or ``#rrggbb``
        """
        if isinstance(color, cls):
            # enum members are translated to their numeric value, otherwise
            # the member's repr would end up in the escape sequence
            color = color.value
        elif isinstance(color, str):
            # startswith() instead of color[0] so that an empty string ends up
            # as KeyError below instead of an IndexError
            if color.startswith('#'):
                digits = color[1:]
                if len(digits) == 3:
                    # short form: each digit d stands for dd, i.e. d * 0x11
                    r, g, b = (int(digit, 16) * 17 for digit in digits)
                elif len(digits) == 6:
                    r, g, b = (int(digits[i:i + 2], 16) for i in (0, 2, 4))
                else:
                    raise ValueError(f"Invalid hex color: '{color}', expected '#rgb' or '#rrggbb'")
                return f"2;{r};{g};{b}"
            try:
                color = cls[color.upper()].value
            except KeyError as e:
                raise KeyError(f"Unknown color: '{color}'") from e
        return f"5;{color}"

    @classmethod
    def color(cls, foreground=None, background=None, bold=False, underline=False, inverted=False):
        """
        Change terminal colors to the given foreground/background colors and attributes.

        This will return a string to be printed to change the color on the terminal.
        To revert to default print the output of `reset()`

        Parameters:
            foreground (int or str): foreground color to use, can be any value accepted by `convert_color`
                If None is given the current color will not be changed.
            background (int or str): background color to use, can be any value accepted by `convert_color`.
                If None is given the current color will not be changed.
            bold (bool): Use bold font
            underline (bool): Underline the text
            inverted (bool): Flip background and foreground color

        Returns:
            str: the escape sequence, or an empty string if color output is not
            `supported` or nothing was requested
        """
        if not cls.supported():
            return ""

        codes = []
        if foreground is not None:
            codes.append(f"38;{cls.convert_color(foreground)}")
        if background is not None:
            codes.append(f"48;{cls.convert_color(background)}")
        if bold:
            codes.append("1")
        if underline:
            codes.append("4")
        if inverted:
            codes.append("7")
        if not codes:
            return ""
        return f"\x1b[{';'.join(codes)}m"

    @classmethod
    def fg(cls, color):
        """Shorthand for `color(foreground=color) <color>`"""
        return cls.color(foreground=color)

    @classmethod
    def bg(cls, color):
        """Shorthand for `color(background=color) <color>`"""
        return cls.color(background=color)

    @classmethod
    def reset(cls):
        """Reset colors to default"""
        return '\x1b[0m' if cls.supported() else ''
156
157
class Pager:
    """
    Context manager providing page-wise output using ``less``, similar to how
    git handles long output of for example ``git diff``. Paging will only be
    active if the output is to a terminal and not piped into a file or to a
    different program.

    Warning:
        To be able to see `basf2` log messages like `B2INFO() <basf2.B2INFO>`
        on the paged output you have to set
        `basf2.logging.enable_python_logging = True
        <basf2.LogPythonInterface.enable_python_logging>`

    .. versionchanged:: release-03-00-00
       the pager no longer waits until all output is complete but can
       incrementally show output. It can also show output generated in C++

    You can set the environment variable ``$PAGER`` to an empty string or to
    ``cat`` to disable paging or to a different program (for example ``more``)
    which should retrieve the output and display it.

    >>> with Pager():
    >>>     for i in range(30):
    >>>         print("This is an example on how to use the pager.")

    Parameters:
        prompt (str): a string argument allows overriding the description
            provided by ``less``. Special characters may need escaping.
            Will only be shown if paging is used and the pager is actually ``less``.
        quit_if_one_screen (bool): indicating whether the Pager should quit
            automatically if the content fits on one screen. This implies that
            the content stays visible on pager exit. True is similar to the
            behavior of :program:`git diff`, False is similar to :program:`git
            --help`
    """

    def __init__(self, prompt=None, quit_if_one_screen=False):
        """ constructor just remembering the arguments """
        # pager program to use, taken from $PAGER with "less" as default
        self._pager = os.environ.get("PAGER", "less")
        # treat "cat" as no pager at all
        if self._pager == "cat":
            self._pager = ""
        # prompt to show in less, None for the default prompt
        self._prompt = prompt
        # whether the pager should exit if the content fits on one screen
        self._quit_if_one_screen = quit_if_one_screen
        # pager subprocess, None while no pager is running
        self._pager_process = None
        # duplicates of the original stdout/stderr file descriptors
        self._original_stdout_fd = None
        self._original_stderr_fd = None
        # original sys.__stdout__/sys.__stderr__ objects
        self._original_stdout = None
        self._original_stderr = None
        # original isatty() callables of sys.stdout/sys.stderr
        self._original_stdout_isatty = None
        self._original_stderr_isatty = None

    def __enter__(self):
        """ entering context: start the pager and redirect the output to it """
        if not sys.stdout.isatty() or self._pager == "":
            return

        # save old sys.__stderr__ and sys.__stdout__ objects
        self._original_stderr = sys.__stderr__
        self._original_stdout = sys.__stdout__
        # and duplicate the current output file descriptors. Stream objects
        # without a usable fileno (for example in a jupyter notebook) don't
        # support paging.
        try:
            self._original_stdout_fd = os.dup(sys.stdout.fileno())
        except (AttributeError, io.UnsupportedOperation):
            return
        try:
            self._original_stderr_fd = os.dup(sys.stderr.fileno())
        except (AttributeError, io.UnsupportedOperation):
            # don't leak the descriptor we already duplicated
            os.close(self._original_stdout_fd)
            self._original_stdout_fd = None
            return

        # This is a bit annoying: Usually in python the sys.__stdout__ and
        # sys.__stderr__ objects point to the original stdout/stderr on program start.
        #
        # However we modify the file descriptors directly so these objects will
        # also be redirected automatically. The documentation for
        # sys.__stdout__ says it "could be useful to print to the actual
        # standard stream no matter if the sys.std* object has been
        # redirected". Also, querying the terminal size looks at
        # sys.__stdout__ which would no longer be pointing to a tty.
        #
        # So lets provide objects pointing to the original file descriptors so
        # that they behave as expected, i.e. as if we would only have
        # redirected sys.stdout and sys.stderr ...
        sys.__stdout__ = io.TextIOWrapper(os.fdopen(self._original_stdout_fd, "wb"))
        sys.__stderr__ = io.TextIOWrapper(os.fdopen(self._original_stderr_fd, "wb"))

        # remember whether stderr really is a terminal *before* patching its
        # isatty(), otherwise the answer would always be True
        stderr_is_tty = sys.stderr.isatty()

        # also monkey patch the isatty() function of stdout (and of stderr if
        # it gets redirected as well) to actually keep returning True even if
        # we moved the file descriptor
        self._original_stdout_isatty = sys.stdout.isatty
        sys.stdout.isatty = lambda: True
        self._original_stderr_isatty = sys.stderr.isatty
        if stderr_is_tty:
            sys.stderr.isatty = lambda: True

        # fine, everything is saved, start the pager
        pager_cmd = [self._pager]
        if self._pager == "less":
            # build the prompt in a local variable so that entering the same
            # Pager twice does not append the help text again
            prompt = '' if self._prompt is None else self._prompt  # '' is the same as default prompt
            prompt += ' (press h for help or q to quit)'
            pager_cmd += ['-R', '-Ps' + prompt.strip()]
            if self._quit_if_one_screen:
                pager_cmd += ['-F', '-X']
        try:
            self._pager_process = subprocess.Popen(pager_cmd + ["-"], restore_signals=True,
                                                   stdin=subprocess.PIPE)
        except OSError:
            # pager could not be started: undo our changes before propagating
            self._restore_output()
            raise
        # and attach stdout to the pager stdin
        pipe_fd = self._pager_process.stdin.fileno()
        os.dup2(pipe_fd, sys.stdout.fileno())
        # and if stderr was a tty do the same for stderr
        if stderr_is_tty:
            os.dup2(pipe_fd, sys.stderr.fileno())

    def _restore_output(self):
        """ undo the file descriptor redirection and monkey patches of `__enter__` """
        # restore output
        os.dup2(self._original_stdout_fd, sys.stdout.fileno())
        os.dup2(self._original_stderr_fd, sys.stderr.fileno())
        # and the original __stdout__/__stderr__ object just in case. Will also
        # close the copied file descriptors
        sys.__stdout__.close()
        sys.__stderr__.close()
        sys.__stderr__ = self._original_stderr
        sys.__stdout__ = self._original_stdout
        self._original_stdout_fd = None
        self._original_stderr_fd = None
        # and clean up our monkey patch of isatty
        sys.stdout.isatty = self._original_stdout_isatty
        sys.stderr.isatty = self._original_stderr_isatty

    def __exit__(self, exc_type, exc_val, exc_tb):
        """ exiting context: restore the output and wait for the pager to finish """
        # no pager, nothing to do
        if self._pager_process is None:
            return

        # otherwise let's try to flush whatever is left
        try:
            sys.stdout.flush()
        except BrokenPipeError:
            # apparently pager died before we could flush ... so let's move the
            # remaining output to /dev/null and flush whatever is left
            devnull = os.open(os.devnull, os.O_WRONLY)
            try:
                os.dup2(devnull, sys.stdout.fileno())
            finally:
                # stdout has its own copy now, don't leak ours
                os.close(devnull)
            sys.stdout.flush()

        self._restore_output()

        # wait for pager
        self._pager_process.communicate()
        # forget the finished process so the instance is in a clean state again
        self._pager_process = None

        # and if we exited due to broken pipe ... then ignore it
        return exc_type is not None and issubclass(exc_type, BrokenPipeError)
313
314
class InputEditor:
    """
    Class to get user input via opening a temporary file in a text editor.

    It is an alternative to the python commands ``input()`` or ``sys.stdin.readlines`` and is
    similar to the behaviour of ``git commit`` for editing commit messages. By using an editor
    instead of the command line, the user is motivated to give expressive multi-line input,
    leveraging the full text editing capabilities of their editor. This class can be used for
    example in interactive terminal scripts, whenever detailed user input is required.

    Heavily inspired by the code in this blog post:
    https://chase-seibert.github.io/blog/2012/10/31/python-fork-exec-vim-raw-input.html

    Parameters:
        editor_command: Editor to open for user input. If ``None``, get
            default editor from environment variables. It should be the name
            of a shell executable and can contain command line arguments.
        initial_content: Initial string to insert into the temporary file that
            is opened for user input. Can be used for default input or to
            insert comment lines with instructions.
        commentlines_start_with: Optionally define string with which comment
            lines start
    """

    def __init__(self,
                 editor_command: str = None,
                 initial_content: str = None,
                 commentlines_start_with: str = "#"):
        """Constructor"""
        # Use provided editor command or editor command from environment variables
        editor_command_string = editor_command or self._default_environment_editor()
        # editor command split into the executable and its arguments
        self.editor_command_list = shlex.split(editor_command_string, posix=True)
        # check if editor executable exists and if not, prompt for new editor
        # command. A command consisting only of whitespace splits into an
        # empty list and is treated like a missing editor.
        if not self.editor_command_list or shutil.which(self.editor_command_list[0]) is None:
            self._prompt_for_editor()
        # initial content of the editor window
        self.initial_content = initial_content
        # string with which comment lines start
        self.comment_string = commentlines_start_with

    def input(self):
        """
        Get user input via editing a temporary file in an editor.

        Returns:
            str: the stripped content of the file after the editor exited, with
            comment lines removed

        If the editor cannot be started or exits with a non-zero status, an
        error message is printed and the program exits with status 1.
        """
        try:
            with tempfile.NamedTemporaryFile(mode='r+') as tmpfile:
                if self.initial_content:
                    tmpfile.write(self.initial_content)
                    tmpfile.flush()
                subprocess.check_call(self.editor_command_list + [tmpfile.name])
                # Read the result by name instead of through our open handle:
                # some editors save by writing a new file and renaming it over
                # the original, in which case the old handle would still show
                # the initial content.
                with open(tmpfile.name) as edited_file:
                    input_string = edited_file.read().strip()
                input_string = self._remove_comment_lines(input_string)

        except (FileNotFoundError, subprocess.CalledProcessError):
            # If editor not found or other problem with subprocess call, tell
            # the user how to configure the editor and give up
            print(f"Could not open {self.get_editor_command()}.")
            print("Try to set your $VISUAL or $EDITOR environment variables properly.\n")
            sys.exit(1)

        return input_string

    def get_editor_command(self):
        """Get editor shell command string used for user input."""
        # Construct string from list which holds the executable and args
        return " ".join(self.editor_command_list)

    def _remove_comment_lines(self, a_string):
        """
        Remove lines from string that start with a comment character and return modified version.

        If no comment string is defined (``None`` or empty) the string is
        returned unchanged.
        """
        # an empty prefix would match every line and remove all input
        if not self.comment_string:
            return a_string
        return "\n".join(
            line for line in a_string.splitlines()
            if not line.startswith(self.comment_string)).strip()

    def _default_environment_editor(self):
        """
        Return editor from environment variables. If not existing, return vi(m) as default.
        """
        editor_command = os.environ.get('VISUAL') or os.environ.get('EDITOR')
        if editor_command:
            return editor_command
        # prefer vim but fall back to plain vi if vim is not installed
        return 'vim' if shutil.which('vim') is not None else 'vi'

    def _prompt_for_editor(self):
        """
        Ask user to provide editor command

        Returns:
            list: the new editor command, split into executable and arguments
        """
        # Prompt user for editor command until one is found which exists in PATH
        while True:
            new_editor_command_string = input("Use editor: ")
            try:
                new_editor_command_list = shlex.split(new_editor_command_string, posix=True)
            except ValueError as error:
                # raised by shlex for example on unbalanced quotes
                print(f"Could not parse editor command: {error}")
                continue

            # nothing entered, just ask again
            if not new_editor_command_list:
                continue

            if shutil.which(new_editor_command_list[0]) is not None:
                self.editor_command_list = new_editor_command_list
                return self.editor_command_list

            print(f"Editor '{new_editor_command_list[0]}' not found in $PATH.")
418
419# @endcond
420