Source code for terminal_utils

#!/usr/bin/env python3

##########################################################################
# basf2 (Belle II Analysis Software Framework)                           #
# Author: The Belle II Collaboration                                     #
#                                                                        #
# See git log for contributors and copyright holders.                    #
# This file is licensed under LGPL-3.0, see LICENSE.md.                  #
##########################################################################

# @cond dont_want_no_doxygen_warnings_this_is_sphinxed

"""
terminal_utils - Helper functions for input from/output to a terminal
---------------------------------------------------------------------

This module contains modules useful to deal with output on the terminal:

* `Pager`, a class to provide paginated output with less similar to other
  popular tools like ``git diff``
* `InputEditor`, a class to open an editor window when requesting longer inputs
  from users, similar to ``git commit``
"""

import sys
import subprocess
import os
import io
import shutil
import tempfile
import shlex
import enum


[docs] class ANSIColors(enum.Enum): """ Simple class to handle color output to the terminal. This class allows to very easily add color output to the terminal >>> from terminal_utils import ANSIColors as ac >>> print(f"{ac.fg('red')}Some text in {ac.color(underline=True)}RED{ac.reset()}") The basic colors can be specified by name (case insensitive) or by enum value. Custom colors can be supplied using hex notation like ``#rgb`` or ``#rrggbb`` (Hint: ``matplotlib.colors.to_hex`` might be useful here). As an example to use the viridis colormap to color the output on the terminal:: from matplotlib import cm from matplotlib.colors import to_hex from terminal_utils import ANSIColors as ac # sample the viridis colormap at 16 points for i in range(16): # convert i to be in [0..1] and get the hex color color = to_hex(cm.viridis(i/15)) # and print the hex color in the correct color print(f"{i}. {ac.fg(color)}{color}{ac.reset()}") If the output is not to a terminal color output will be disabled and nothing will be added to the output, for example when redirecting the output to a logfile. """ BLACK = 0 RED = 1 GREEN = 2 YELLOW = 3 BLUE = 4 MAGENTA = 5 CYAN = 6 WHITE = 7
[docs] @staticmethod def supported(): """ Check whether the output is a terminal. If this is False, the methods `color`, `fg`, `bg` and `reset` will only return an empty string as color output will be disabled """ return sys.stdout.isatty()
[docs] @classmethod def convert_color(cls, color): """Convert a color to the necessary ansi code. The argument can either bei * an integer corresponding to the ansi color (see the enum values of this class) * the name (case insensitive) of one of the enum values of this class * a hex color code of the form ``#rgb`` or ``#rrggbb`` Raises: KeyError: if the argument is a string not matching to one of the known colors """ if isinstance(color, str): if color[0] == '#': if len(color) == 4: r, g, b = (int(e, 16)*17 for e in color[1:]) else: r, g, b = (int(color[i:i+2], 16) for i in [1, 3, 5]) return f"2;{r};{g};{b}" try: color = cls[color.upper()].value except KeyError as e: raise KeyError(f"Unknown color: '{color}'") from e return f"5;{color}"
[docs] @classmethod def color(cls, foreground=None, background=None, bold=False, underline=False, inverted=False): """ Change terminal colors to the given foreground/background colors and attributes. This will return a string to be printed to change the color on the terminal. To revert to default print the output of `reset()` Parameters: foreground (int or str): foreground color to use, can be any value accepted by `convert_color` If None is given the current color will not be changed. background (int or str): background color to use, can be any value accepted by `convert_color`. If None is given the current color will not be changed. bold (bool): Use bold font underline (bool): Underline the text inverted (bool): Flip background and foreground color """ if not cls.supported(): return "" codes = [] if foreground is not None: codes.append(f"38;{cls.convert_color(foreground)}") if background is not None: codes.append(f"48;{cls.convert_color(background)}") if bold: codes.append(1) if underline: codes.append(4) if inverted: codes.append(7) if not codes: return "" return f"\x1b[{';'.join(map(str, codes))}m"
[docs] @classmethod def fg(cls, color): """Shorthand for `color(foreground=color) <color>`""" return cls.color(foreground=color)
[docs] @classmethod def bg(cls, color): """Shorthand for `color(background=color) <color>`""" return cls.color(background=color)
[docs] @classmethod def reset(cls): """Reset colors to default""" return '\x1b[0m' if cls.supported() else ''
[docs] class Pager: """ Context manager providing page-wise output using ``less``, similar to how git handles long output of for example ``git diff``. Paging will only be active if the output is to a terminal and not piped into a file or to a different program. Warning: To be able to see `basf2` log messages like `B2INFO() <basf2.B2INFO>` on the paged output you have to set `basf2.logging.enable_python_logging = True <basf2.LogPythonInterface.enable_python_logging>` .. versionchanged:: release-03-00-00 the pager no longer waits until all output is complete but can incrementally show output. It can also show output generated in C++ You can set the environment variable ``$PAGER`` to an empty string or to ``cat`` to disable paging or to a different program (for example ``more``) which should retrieve the output and display it. >>> with Pager(): >>> for i in range(30): >>> print("This is an example on how to use the pager.") Parameters: prompt (str): a string argument allows overriding the description provided by ``less``. Special characters may need escaping. Will only be shown if paging is used and the pager is actually ``less``. quit_if_one_screen (bool): indicating whether the Pager should quit automatically if the content fits on one screen. This implies that the content stays visible on pager exit. True is similar to the behavior of :program:`git diff`, False is similar to :program:`git --help` """ def __init__(self, prompt=None, quit_if_one_screen=False): """ constructor just remembering the arguments """ #: pager program to use self._pager = os.environ.get("PAGER", "less") # treat "cat" as no pager at all if self._pager == "cat": self._pager = "" #: prompt string self._prompt = prompt #: flag indicating whether the pager should automatically exit if the # content fits on one screen self._quit_if_one_screen = quit_if_one_screen #: Pager subprocess self._pager_process = None #: Original file descriptor for stdout before entering the context self._original_stdout_fd = None #: Original file descriptor for stderr before entering the context self._original_stderr_fd = None #: Original sys.__stdout__ before entering the context self._original_stdout = None #: Original sys.__stderr__ before entering the context self._original_stderr = None #: Original sys.stdout.isatty self._original_stdout_isatty = None #: Original sys.stderr.isatty self._original_stderr_isatty = None def __enter__(self): """ entering context """ if not sys.stdout.isatty() or self._pager == "": return # save old sys.__stderr__ and sys.__stdout__ objects self._original_stderr = sys.__stderr__ self._original_stderr = sys.__stdout__ try: # and duplicate the current output file descriptors self._original_stdout_fd = os.dup(sys.stdout.fileno()) self._original_stderr_fd = os.dup(sys.stderr.fileno()) except AttributeError: # jupyter notebook stdout/stderr objects don't have a fileno so # don't support paging return # This is a bit annoying: Usually in python the sys.__stdout__ and # sys.__stderr__ objects point to the original stdout/stderr on program start. # # However we modify the file descriptors directly so these objects will # also be redirected automatically. The documentation for # sys.__stdout__ says it "could be useful to print to the actual # standard stream no matter if the sys.std* object has been # redirected". Also, querying the terminal size looks at # sys.__stdout__ which would no longer be pointing to a tty. # # So lets provide objects pointing to the original file descriptors so # that they behave as expected, i.e. as if we would only have # redirected sys.stdout and sys.stderr ... sys.__stdout__ = io.TextIOWrapper(os.fdopen(self._original_stdout_fd, "wb")) sys.__stderr__ = io.TextIOWrapper(os.fdopen(self._original_stderr_fd, "wb")) # also monkey patch the isatty() function of stdout to actually keep # returning True even if we moved the file descriptor self._original_stdout_isatty = sys.stdout.isatty sys.stdout.isatty = lambda: True self._original_stderr_isatty = sys.stderr.isatty sys.stderr.isatty = lambda: True # fine, everything is saved, start the pager pager_cmd = [self._pager] if self._pager == "less": if self._prompt is None: self._prompt = '' # same as default prompt self._prompt += ' (press h for help or q to quit)' pager_cmd += ['-R', '-Ps' + self._prompt.strip()] if self._quit_if_one_screen: pager_cmd += ['-F', '-X'] self._pager_process = subprocess.Popen(pager_cmd + ["-"], restore_signals=True, stdin=subprocess.PIPE) # and attach stdout to the pager stdin pipe_fd = self._pager_process.stdin.fileno() # and if stderr was a tty do the same for stderr os.dup2(pipe_fd, sys.stdout.fileno()) if sys.stderr.isatty(): os.dup2(pipe_fd, sys.stderr.fileno()) def __exit__(self, exc_type, exc_val, exc_tb): """ exiting context """ # no pager, nothing to do if self._pager_process is None: return # otherwise let's try to flush whatever is left try: sys.stdout.flush() except BrokenPipeError: # apparently pager died before we could flush ... so let's move the # remaining output to /dev/null and flush whatever is left devnull = os.open(os.devnull, os.O_WRONLY) os.dup2(devnull, sys.stdout.fileno()) sys.stdout.flush() # restore output os.dup2(self._original_stdout_fd, sys.stdout.fileno()) os.dup2(self._original_stderr_fd, sys.stderr.fileno()) # and the original __stdout__/__stderr__ object just in case. Will also # close the copied file descriptors sys.__stdout__.close() sys.__stderr__.close() sys.__stderr__ = self._original_stderr sys.__stdout__ = self._original_stdout # and clean up our monkey patch of isatty sys.stdout.isatty = self._original_stdout_isatty sys.stderr.isatty = self._original_stderr_isatty # wait for pager self._pager_process.communicate() # and if we exited due to broken pipe ... then ignore it return exc_type == BrokenPipeError
[docs] class InputEditor(): """ Class to get user input via opening a temporary file in a text editor. It is an alternative to the python commands ``input()`` or ``sys.stdin.readlines`` and is similar to the behaviour of ``git commit`` for editing commit messages. By using an editor instead of the command line, the user is motivated to give expressive multi-line input, leveraging the full text editing capabilities of his editor. This function cannot be used for example in interactive terminal scripts, whenever detailed user input is required. Heavily inspired by the code in this blog post: https://chase-seibert.github.io/blog/2012/10/31/python-fork-exec-vim-raw-input.html Parameters: editor_command: Editor to open for user input. If ``None``, get default editor from environment variables. It should be the name of a shell executable and can contain command line arguments. initial_content: Initial string to insert into the temporary file that is opened for user input. Can be used for default input or to insert comment lines with instructions. commentlines_start_with: Optionally define string with which comment lines start """ def __init__(self, editor_command: str = None, initial_content: str = None, commentlines_start_with: str = "#"): """Constructor""" # Use provided editor command or editor command from environment variables editor_command_string = editor_command or self._default_environment_editor() #: command line for the editor, split to separate executable name command line arguments self.editor_command_list = shlex.split(editor_command_string, posix=True) # check if editor executable exists and if not, prompt for new editor command if shutil.which(self.editor_command_list[0]) is None: self._prompt_for_editor() #: initial content of the editor window self.initial_content = initial_content #: string which starts comments in the file self.comment_string = commentlines_start_with
[docs] def input(self): """ Get user input via editing a temporary file in an editor. If opening the editor fails, fall back to command line input """ try: with tempfile.NamedTemporaryFile(mode='r+') as tmpfile: if self.initial_content: tmpfile.write(self.initial_content) tmpfile.flush() subprocess.check_call(self.editor_command_list + [tmpfile.name]) tmpfile.seek(0) input_string = tmpfile.read().strip() input_string = self._remove_comment_lines(input_string) except (FileNotFoundError, subprocess.CalledProcessError): # If editor not found or other problem with subprocess call, fall back to terminal input print(f"Could not open {self.get_editor_command()}.") print("Try to set your $VISUAL or $EDITOR environment variables properly.\n") sys.exit(1) return input_string
[docs] def get_editor_command(self): """Get editor shell command string used for user input.""" # Construct string from list which holds the executable and args return " ".join(self.editor_command_list)
def _remove_comment_lines(self, a_string): """ Remove lines from string that start with a comment character and return modified version. """ if self.comment_string is not None: a_string = "\n".join( [line for line in a_string.splitlines() if not line.startswith(self.comment_string)]).strip() return a_string def _default_environment_editor(self): """ Return editor from environment variables. If not existing, return vi(m) as default. """ editor_command = (os.environ.get('VISUAL') or os.environ.get('EDITOR') or 'vim' or 'vi') return editor_command def _prompt_for_editor(self): """ Ask user to provide editor command """ # Prompt user for editor command until one is found which exists in PATH while True: new_editor_command_string = input("Use editor: ") new_editor_command_list = shlex.split(new_editor_command_string, posix=True) if shutil.which(new_editor_command_list[0]) is not None: self.editor_command_list = new_editor_command_list return self.editor_command_list else: print(f"Editor '{self.editor_command_list[0]}' not found in $PATH.")
# @endcond