Belle II Software development
terminal_utils.py
1#!/usr/bin/env python3
2
3
10
11# @cond dont_want_no_doxygen_warnings_this_is_sphinxed
12
13"""
14terminal_utils - Helper functions for input from/output to a terminal
15---------------------------------------------------------------------
16
17This module contains classes useful for dealing with output on the terminal:
18
19* `Pager`, a class to provide paginated output with less similar to other
20 popular tools like ``git diff``
21* `InputEditor`, a class to open an editor window when requesting longer inputs
22 from users, similar to ``git commit``
23"""
24
25import sys
26import subprocess
27import os
28import io
29import shutil
30import tempfile
31import shlex
32import enum
33
34
class ANSIColors(enum.Enum):
    """
    Simple class to handle color output to the terminal.

    This class allows to very easily add color output to the terminal

    >>> from terminal_utils import ANSIColors as ac
    >>> print(f"{ac.fg('red')}Some text in {ac.color(underline=True)}RED{ac.reset()}")

    The basic colors can be specified by name (case insensitive), by enum
    member or by enum value. Custom colors can be supplied using hex notation
    like ``#rgb`` or ``#rrggbb`` (Hint: ``matplotlib.colors.to_hex`` might be
    useful here). As an example to use the viridis colormap to color the output
    on the terminal::

        from matplotlib import cm
        from matplotlib.colors import to_hex
        from terminal_utils import ANSIColors as ac

        # sample the viridis colormap at 16 points
        for i in range(16):
            # convert i to be in [0..1] and get the hex color
            color = to_hex(cm.viridis(i/15))
            # and print the hex color in the correct color
            print(f"{i}. {ac.fg(color)}{color}{ac.reset()}")


    If the output is not to a terminal color output will be disabled and nothing
    will be added to the output, for example when redirecting the output to a
    logfile.
    """
    BLACK = 0
    RED = 1
    GREEN = 2
    YELLOW = 3
    BLUE = 4
    MAGENTA = 5
    CYAN = 6
    WHITE = 7

    @staticmethod
    def supported():
        """
        Check whether the output is a terminal.

        If this is False, the methods `color`, `fg`, `bg` and `reset` will only
        return an empty string as color output will be disabled
        """
        return sys.stdout.isatty()

    @classmethod
    def convert_color(cls, color):
        """Convert a color to the necessary ansi code. The argument can either be

        * an integer corresponding to the ansi color (see the enum values of this class)
        * one of the enum members of this class
        * the name (case insensitive) of one of the enum values of this class
        * a hex color code of the form ``#rgb`` or ``#rrggbb``

        Returns:
            str: the color part of the escape sequence, ``5;<index>`` for
            indexed colors or ``2;<r>;<g>;<b>`` for hex colors

        Raises:
            KeyError: if the argument is a string not matching to one of the known colors
            ValueError: if the argument starts with ``#`` but is not a valid
                ``#rgb`` or ``#rrggbb`` hex color
        """
        if isinstance(color, cls):
            # enum member given directly: use its numeric ansi index
            color = color.value
        elif isinstance(color, str):
            # startswith() instead of color[0] so that an empty string ends up
            # as the documented KeyError and not as an IndexError
            if color.startswith('#'):
                digits = color[1:]
                if len(digits) == 3:
                    # short form: every digit is doubled, 0xf -> 0xff == 15 * 17
                    r, g, b = (int(digit, 16) * 17 for digit in digits)
                elif len(digits) == 6:
                    r, g, b = (int(digits[i:i + 2], 16) for i in (0, 2, 4))
                else:
                    raise ValueError(f"Invalid hex color: '{color}'")
                return f"2;{r};{g};{b}"
            try:
                color = cls[color.upper()].value
            except KeyError as e:
                raise KeyError(f"Unknown color: '{color}'") from e
        return f"5;{color}"

    @classmethod
    def color(cls, foreground=None, background=None, bold=False, underline=False, inverted=False):
        """
        Change terminal colors to the given foreground/background colors and attributes.

        This will return a string to be printed to change the color on the terminal.
        To revert to default print the output of `reset()`

        Parameters:
            foreground (int or str): foreground color to use, can be any value accepted by `convert_color`
                If None is given the current color will not be changed.
            background (int or str): background color to use, can be any value accepted by `convert_color`.
                If None is given the current color will not be changed.
            bold (bool): Use bold font
            underline (bool): Underline the text
            inverted (bool): Flip background and foreground color

        Returns:
            str: the escape sequence, or an empty string if color output is not
            supported or nothing was requested
        """
        if not cls.supported():
            return ""

        codes = []
        if foreground is not None:
            # 38 selects the foreground color
            codes.append(f"38;{cls.convert_color(foreground)}")
        if background is not None:
            # 48 selects the background color
            codes.append(f"48;{cls.convert_color(background)}")
        if bold:
            codes.append(1)
        if underline:
            codes.append(4)
        if inverted:
            codes.append(7)
        if not codes:
            return ""
        return f"\x1b[{';'.join(map(str, codes))}m"

    @classmethod
    def fg(cls, color):
        """Shorthand for `color(foreground=color) <color>`"""
        return cls.color(foreground=color)

    @classmethod
    def bg(cls, color):
        """Shorthand for `color(background=color) <color>`"""
        return cls.color(background=color)

    @classmethod
    def reset(cls):
        """Reset colors to default"""
        return '\x1b[0m' if cls.supported() else ''
157
158
class Pager:
    """
    Context manager providing page-wise output using ``less``, similar to how
    git handles long output of for example ``git diff``. Paging will only be
    active if the output is to a terminal and not piped into a file or to a
    different program.

    Warning:
        To be able to see `basf2` log messages like `B2INFO() <basf2.B2INFO>`
        on the paged output you have to set
        `basf2.logging.enable_python_logging = True
        <basf2.LogPythonInterface.enable_python_logging>`

    .. versionchanged:: release-03-00-00
       the pager no longer waits until all output is complete but can
       incrementally show output. It can also show output generated in C++

    You can set the environment variable ``$PAGER`` to an empty string or to
    ``cat`` to disable paging or to a different program (for example ``more``)
    which should retrieve the output and display it.

    >>> with Pager():
    >>>     for i in range(30):
    >>>         print("This is an example on how to use the pager.")

    Parameters:
        prompt (str): a string argument allows overriding the description
            provided by ``less``. Special characters may need escaping.
            Will only be shown if paging is used and the pager is actually ``less``.
        quit_if_one_screen (bool): indicating whether the Pager should quit
            automatically if the content fits on one screen. This implies that
            the content stays visible on pager exit. True is similar to the
            behavior of :program:`git diff`, False is similar to :program:`git
            --help`
    """

    def __init__(self, prompt=None, quit_if_one_screen=False):
        """ constructor just remembering the arguments """
        ## pager program to run, taken from ``$PAGER``; empty string disables paging
        self._pager = os.environ.get("PAGER", "less")
        # treat "cat" as no pager at all
        if self._pager == "cat":
            self._pager = ""
        ## prompt to show in ``less`` (None: only the default hint is shown)
        self._prompt = prompt
        ## flag whether the pager should quit if the content fits on one screen
        self._quit_if_one_screen = quit_if_one_screen
        ## pager subprocess, None as long as no pager is running
        self._pager_process = None
        ## duplicate of the original stdout file descriptor
        self._original_stdout_fd = None
        ## duplicate of the original stderr file descriptor
        self._original_stderr_fd = None
        ## original sys.__stdout__ object
        self._original_stdout = None
        ## original sys.__stderr__ object
        self._original_stderr = None
        ## original sys.stdout.isatty
        self._original_stdout_isatty = None
        ## original sys.stderr.isatty
        self._original_stderr_isatty = None

    def __enter__(self):
        """
        Start the pager and redirect stdout (and stderr if it is a terminal)
        into it. Does nothing if stdout is not a terminal, if paging is
        disabled or if the output streams have no usable file descriptor.
        """
        if not sys.stdout.isatty() or self._pager == "":
            return

        # duplicate the current output file descriptors so they can be restored
        try:
            stdout_fd = os.dup(sys.stdout.fileno())
        except (AttributeError, io.UnsupportedOperation):
            # jupyter notebook stdout/stderr objects don't have a fileno so
            # don't support paging. File-like objects without a descriptor
            # raise io.UnsupportedOperation instead, treat that the same way.
            return
        try:
            stderr_fd = os.dup(sys.stderr.fileno())
        except (AttributeError, io.UnsupportedOperation):
            # don't leak the descriptor we already duplicated
            os.close(stdout_fd)
            return

        # build the pager command line. Work on a local copy of the prompt so
        # re-entering the same Pager does not append the hint a second time
        pager_cmd = [self._pager]
        if self._pager == "less":
            prompt = (self._prompt or '') + ' (press h for help or q to quit)'
            pager_cmd += ['-R', '-Ps' + prompt.strip()]
            if self._quit_if_one_screen:
                pager_cmd += ['-F', '-X']

        # start the pager before touching any global state: if it cannot be
        # started we only need to close the duplicated descriptors again
        try:
            self._pager_process = subprocess.Popen(pager_cmd + ["-"], restore_signals=True,
                                                   stdin=subprocess.PIPE)
        except OSError:
            os.close(stdout_fd)
            os.close(stderr_fd)
            raise

        # save old sys.__stderr__ and sys.__stdout__ objects
        self._original_stderr = sys.__stderr__
        self._original_stdout = sys.__stdout__
        self._original_stdout_fd = stdout_fd
        self._original_stderr_fd = stderr_fd

        # This is a bit annoying: Usually in python the sys.__stdout__ and
        # sys.__stderr__ objects point to the original stdout/stderr on program start.
        #
        # However we modify the file descriptors directly so these objects will
        # also be redirected automatically. The documentation for
        # sys.__stdout__ says it "could be useful to print to the actual
        # standard stream no matter if the sys.std* object has been
        # redirected". Also, querying the terminal size looks at
        # sys.__stdout__ which would no longer be pointing to a tty.
        #
        # So lets provide objects pointing to the original file descriptors so
        # that they behave as expected, i.e. as if we would only have
        # redirected sys.stdout and sys.stderr ...
        sys.__stdout__ = io.TextIOWrapper(os.fdopen(stdout_fd, "wb"))
        sys.__stderr__ = io.TextIOWrapper(os.fdopen(stderr_fd, "wb"))

        # remember whether stderr is a terminal *before* patching isatty()
        # below, otherwise the check would always be True
        stderr_was_tty = sys.stderr.isatty()

        # also monkey patch the isatty() function of stdout to actually keep
        # returning True even if we moved the file descriptor
        self._original_stdout_isatty = sys.stdout.isatty
        sys.stdout.isatty = lambda: True
        # stderr keeps reporting whatever it was before we started
        self._original_stderr_isatty = sys.stderr.isatty
        sys.stderr.isatty = lambda: stderr_was_tty

        # attach stdout to the pager stdin
        pipe_fd = self._pager_process.stdin.fileno()
        os.dup2(pipe_fd, sys.stdout.fileno())
        # and if stderr was a tty do the same for stderr
        if stderr_was_tty:
            os.dup2(pipe_fd, sys.stderr.fileno())

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Restore the original output streams and wait for the pager to finish.

        Returns:
            True if the context was left because of a `BrokenPipeError` (so the
            exception is suppressed), False otherwise. None if no pager was
            started.
        """
        # no pager, nothing to do
        if self._pager_process is None:
            return

        # otherwise let's try to flush whatever is left
        try:
            sys.stdout.flush()
        except BrokenPipeError:
            # apparently pager died before we could flush ... so let's move the
            # remaining output to /dev/null and flush whatever is left
            devnull = os.open(os.devnull, os.O_WRONLY)
            try:
                os.dup2(devnull, sys.stdout.fileno())
            finally:
                # stdout now has its own copy, don't leak the descriptor
                os.close(devnull)
            sys.stdout.flush()

        # restore output
        os.dup2(self._original_stdout_fd, sys.stdout.fileno())
        os.dup2(self._original_stderr_fd, sys.stderr.fileno())
        # and the original __stdout__/__stderr__ object just in case. Will also
        # close the copied file descriptors
        sys.__stdout__.close()
        sys.__stderr__.close()
        sys.__stderr__ = self._original_stderr
        sys.__stdout__ = self._original_stdout
        # and clean up our monkey patch of isatty
        sys.stdout.isatty = self._original_stdout_isatty
        sys.stderr.isatty = self._original_stderr_isatty

        # wait for pager. communicate() also closes our end of the pipe so the
        # pager sees the end of the input
        self._pager_process.communicate()
        # forget the finished process so a later __exit__ without a running
        # pager is a no-op again
        self._pager_process = None

        # and if we exited due to broken pipe ... then ignore it
        return exc_type is not None and issubclass(exc_type, BrokenPipeError)
314
315
class InputEditor():
    """
    Class to get user input via opening a temporary file in a text editor.

    It is an alternative to the python commands ``input()`` or ``sys.stdin.readlines`` and is
    similar to the behaviour of ``git commit`` for editing commit messages. By using an editor
    instead of the command line, the user is motivated to give expressive multi-line input,
    leveraging the full text editing capabilities of their editor. This class can be used for
    example in interactive terminal scripts, whenever detailed user input is required.

    Heavily inspired by the code in this blog post:
    https://chase-seibert.github.io/blog/2012/10/31/python-fork-exec-vim-raw-input.html

    Parameters:
        editor_command: Editor to open for user input. If ``None``, get
            default editor from environment variables. It should be the name
            of a shell executable and can contain command line arguments.
        initial_content: Initial string to insert into the temporary file that
            is opened for user input. Can be used for default input or to
            insert comment lines with instructions.
        commentlines_start_with: Optionally define string with which comment
            lines start
    """

    def __init__(self,
                 editor_command: str = None,
                 initial_content: str = None,
                 commentlines_start_with: str = "#"):
        """Constructor"""
        # Use provided editor command or editor command from environment variables
        editor_command_string = editor_command or self._default_environment_editor()
        ## command line for the editor, split to separate executable name command line arguments
        self.editor_command_list = shlex.split(editor_command_string, posix=True)
        # check if editor executable exists and if not, prompt for new editor command.
        # A whitespace-only command splits into an empty list, treat it as "not found"
        if not self.editor_command_list or shutil.which(self.editor_command_list[0]) is None:
            self._prompt_for_editor()

        ## initial content of the editor window
        self.initial_content = initial_content
        ## string which starts comments in the file
        self.comment_string = commentlines_start_with

    def input(self):
        """
        Get user input via editing a temporary file in an editor.

        Returns:
            str: the stripped content of the file after the editor was closed,
            with comment lines removed

        If the editor cannot be started or exits with a non-zero status, a
        message is printed and the program exits with status 1.
        """
        try:
            with tempfile.NamedTemporaryFile(mode='r+') as tmpfile:
                if self.initial_content:
                    tmpfile.write(self.initial_content)
                    # make sure the editor sees the initial content
                    tmpfile.flush()
                subprocess.check_call(self.editor_command_list + [tmpfile.name])
                # re-read the file from the start to pick up the user's edits
                tmpfile.seek(0)
                input_string = tmpfile.read().strip()
                input_string = self._remove_comment_lines(input_string)

        except (FileNotFoundError, subprocess.CalledProcessError):
            # If editor not found or other problem with subprocess call, give up
            print(f"Could not open {self.get_editor_command()}.")
            print("Try to set your $VISUAL or $EDITOR environment variables properly.\n")
            sys.exit(1)

        return input_string

    def get_editor_command(self):
        """Get editor shell command string used for user input."""
        # Construct string from list which holds the executable and args
        return " ".join(self.editor_command_list)

    def _remove_comment_lines(self, a_string):
        """
        Remove lines from string that start with a comment character and return modified version.

        Only lines that begin directly with the comment string are removed,
        leading whitespace is not skipped. If no comment string is set (None or
        empty) the string is returned unchanged.
        """
        # an empty prefix would match every line, so treat it like None
        if self.comment_string:
            a_string = "\n".join(
                [line for line in a_string.splitlines()
                 if not line.startswith(self.comment_string)]).strip()
        return a_string

    def _default_environment_editor(self):
        """
        Return editor from environment variables. If not existing, return vi(m) as default.

        ``$VISUAL`` takes precedence over ``$EDITOR``. Without either, ``vim``
        is returned if it is found in ``$PATH`` and ``vi`` otherwise.
        """
        editor_command = os.environ.get('VISUAL') or os.environ.get('EDITOR')
        if editor_command:
            return editor_command
        return 'vim' if shutil.which('vim') is not None else 'vi'

    def _prompt_for_editor(self):
        """
        Ask user to provide editor command

        Returns:
            list: the new editor command, which is also stored in
            ``self.editor_command_list``
        """
        # Prompt user for editor command until one is found which exists in PATH
        while True:
            new_editor_command_string = input("Use editor: ")
            try:
                new_editor_command_list = shlex.split(new_editor_command_string, posix=True)
            except ValueError as error:
                # e.g. unbalanced quotes, ask again instead of crashing
                print(f"Could not parse editor command: {error}")
                continue

            # nothing entered, just ask again
            if not new_editor_command_list:
                continue

            if shutil.which(new_editor_command_list[0]) is not None:
                self.editor_command_list = new_editor_command_list
                return self.editor_command_list

            # report the command that was just tried, not the previous one
            print(f"Editor '{new_editor_command_list[0]}' not found in $PATH.")
419
420# @endcond