Belle II Software  release-08-01-10
terminal_utils.py
1 #!/usr/bin/env python3
2 
3 
10 
11 # @cond dont_want_no_doxygen_warnings_this_is_sphinxed
12 
13 """
14 terminal_utils - Helper functions for input from/output to a terminal
15 ---------------------------------------------------------------------
16 
17 This module contains modules useful to deal with output on the terminal:
18 
19 * `Pager`, a class to provide paginated output with less similar to other
20  popular tools like ``git diff``
21 * `InputEditor`, a class to open an editor window when requesting longer inputs
22  from users, similar to ``git commit``
23 """
24 
25 import sys
26 import subprocess
27 import os
28 import io
29 import shutil
30 import tempfile
31 import shlex
32 import enum
33 
34 
35 class ANSIColors(enum.Enum):
36  """
37  Simple class to handle color output to the terminal.
38 
39  This class allows to very easily add color output to the terminal
40 
41  >>> from terminal_utils import ANSIColors as ac
42  >>> print(f"{ac.fg('red')}Some text in {ac.color(underline=True)}RED{ac.reset()}")
43 
44  The basic colors can be specified by name (case insensitive) or by enum value.
45  Custom colors can be supplied using hex notation like ``#rgb`` or ``#rrggbb``
46  (Hint: ``matplotlib.colors.to_hex`` might be useful here). As an example to
47  use the viridis colormap to color the output on the terminal::
48 
49  from matplotlib import cm
50  from matplotlib.colors import to_hex
51  from terminal_utils import ANSIColors as ac
52 
53  # sample the viridis colormap at 16 points
54  for i in range(16):
55  # convert i to be in [0..1] and get the hex color
56  color = to_hex(cm.viridis(i/15))
57  # and print the hex color in the correct color
58  print(f"{i}. {ac.fg(color)}{color}{ac.reset()}")
59 
60 
61  If the output is not to a terminal color output will be disabled and nothing
62  will be added to the output, for example when redirecting the output to a
63  logfile.
64  """
65  BLACK = 0
66  RED = 1
67  GREEN = 2
68  YELLOW = 3
69  BLUE = 4
70  MAGENTA = 5
71  CYAN = 6
72  WHITE = 7
73 
74  @staticmethod
75  def supported():
76  """
77  Check whether the output is a terminal.
78 
79  If this is False, the methods `color`, `fg`, `bg` and `reset` will only
80  return an empty string as color output will be disabled
81  """
82  return sys.stdout.isatty()
83 
84  @classmethod
85  def convert_color(cls, color):
86  """Convert a color to the necessary ansi code. The argument can either bei
87 
88  * an integer corresponding to the ansi color (see the enum values of this class)
89  * the name (case insensitive) of one of the enum values of this class
90  * a hex color code of the form ``#rgb`` or ``#rrggbb``
91 
92  Raises:
93  KeyError: if the argument is a string not matching to one of the known colors
94  """
95  if isinstance(color, str):
96  if color[0] == '#':
97  if len(color) == 4:
98  r, g, b = (int(e, 16)*17 for e in color[1:])
99  else:
100  r, g, b = (int(color[i:i+2], 16) for i in [1, 3, 5])
101  return f"2;{r};{g};{b}"
102  try:
103  color = cls[color.upper()].value
104  except KeyError as e:
105  raise KeyError(f"Unknown color: '{color}'") from e
106  return f"5;{color}"
107 
108  @classmethod
109  def color(cls, foreground=None, background=None, bold=False, underline=False, inverted=False):
110  """
111  Change terminal colors to the given foreground/background colors and attributes.
112 
113  This will return a string to be printed to change the color on the terminal.
114  To revert to default print the output of `reset()`
115 
116  Parameters:
117  foreground (int or str): foreground color to use, can be any value accepted by `convert_color`
118  If None is given the current color will not be changed.
119  background (int or str): background color to use, can be any value accepted by `convert_color`.
120  If None is given the current color will not be changed.
121  bold (bool): Use bold font
122  underline (bool): Underline the text
123  inverted (bool): Flip background and foreground color
124  """
125  if not cls.supported():
126  return ""
127 
128  codes = []
129  if foreground is not None:
130  codes.append(f"38;{cls.convert_color(foreground)}")
131  if background is not None:
132  codes.append(f"48;{cls.convert_color(background)}")
133  if bold:
134  codes.append(1)
135  if underline:
136  codes.append(4)
137  if inverted:
138  codes.append(7)
139  if not codes:
140  return ""
141  return '\x1b[{}m'.format(";".join(map(str, codes)))
142 
143  @classmethod
144  def fg(cls, color):
145  """Shorthand for `color(foreground=color) <color>`"""
146  return cls.color(foreground=color)
147 
148  @classmethod
149  def bg(cls, color):
150  """Shorthand for `color(background=color) <color>`"""
151  return cls.color(background=color)
152 
153  @classmethod
154  def reset(cls):
155  """Reset colors to default"""
156  return '\x1b[0m' if cls.supported() else ''
157 
158 
159 class Pager:
160  """
161  Context manager providing page-wise output using ``less``, similar to how
162  git handles long output of for example ``git diff``. Paging will only be
163  active if the output is to a terminal and not piped into a file or to a
164  different program.
165 
166  Warning:
167  To be able to see `basf2` log messages like `B2INFO() <basf2.B2INFO>`
168  on the paged output you have to set
169  `basf2.logging.enable_python_logging = True
170  <basf2.LogPythonInterface.enable_python_logging>`
171 
172  .. versionchanged:: release-03-00-00
173  the pager no longer waits until all output is complete but can
174  incrementally show output. It can also show output generated in C++
175 
176  You can set the environment variable ``$PAGER`` to an empty string or to
177  ``cat`` to disable paging or to a different program (for example ``more``)
178  which should retrieve the output and display it.
179 
180  >>> with Pager():
181  >>> for i in range(30):
182  >>> print("This is an example on how to use the pager.")
183 
184  Parameters:
185  prompt (str): a string argument allows overriding the description
186  provided by ``less``. Special characters may need escaping.
187  Will only be shown if paging is used and the pager is actually ``less``.
188  quit_if_one_screen (bool): indicating whether the Pager should quit
189  automatically if the content fits on one screen. This implies that
190  the content stays visible on pager exit. True is similar to the
191  behavior of :program:`git diff`, False is similar to :program:`git
192  --help`
193  """
194 
195  def __init__(self, prompt=None, quit_if_one_screen=False):
196  """ constructor just remembering the arguments """
197 
198  self._pager = os.environ.get("PAGER", "less")
199  # treat "cat" as no pager at all
200  if self._pager == "cat":
201  self._pager = ""
202 
203  self._prompt = prompt
204 
206  self._quit_if_one_screen = quit_if_one_screen
207 
208  self._pager_process = None
209 
210  self._original_stdout_fd = None
211 
212  self._original_stderr_fd = None
213 
214  self._original_stdout = None
215 
216  self._original_stderr = None
217 
218  self._original_stdout_isatty = None
219 
220  self._original_stderr_isatty = None
221 
222  def __enter__(self):
223  """ entering context """
224  if not sys.stdout.isatty() or self._pager == "":
225  return
226 
227  # save old sys.__stderr__ and sys.__stdout__ objects
228  self._original_stderr = sys.__stderr__
229  self._original_stderr = sys.__stdout__
230  try:
231  # and duplicate the current output file descriptors
232  self._original_stdout_fd = os.dup(sys.stdout.fileno())
233  self._original_stderr_fd = os.dup(sys.stderr.fileno())
234  except AttributeError:
235  # jupyter notebook stdout/stderr objects don't have a fileno so
236  # don't support paging
237  return
238 
239  # This is a bit annoying: Usually in python the sys.__stdout__ and
240  # sys.__stderr__ objects point to the original stdout/stderr on program start.
241  #
242  # However we modify the file descriptors directly so these objects will
243  # also be redirected automatically. The documentation for
244  # sys.__stdout__ says it "could be useful to print to the actual
245  # standard stream no matter if the sys.std* object has been
246  # redirected". Also, querying the terminal size looks at
247  # sys.__stdout__ which would no longer be pointing to a tty.
248  #
249  # So lets provide objects pointing to the original file descriptors so
250  # that they behave as expected, i.e. as if we would only have
251  # redirected sys.stdout and sys.stderr ...
252  sys.__stdout__ = io.TextIOWrapper(os.fdopen(self._original_stdout_fd, "wb"))
253  sys.__stderr__ = io.TextIOWrapper(os.fdopen(self._original_stderr_fd, "wb"))
254 
255  # also monkey patch the isatty() function of stdout to actually keep
256  # returning True even if we moved the file descriptor
257  self._original_stdout_isatty = sys.stdout.isatty
258  sys.stdout.isatty = lambda: True
259  self._original_stderr_isatty = sys.stderr.isatty
260  sys.stderr.isatty = lambda: True
261 
262  # fine, everything is saved, start the pager
263  pager_cmd = [self._pager]
264  if self._pager == "less":
265  if self._prompt is None:
266  self._prompt = '' # same as default prompt
267  self._prompt += ' (press h for help or q to quit)'
268  pager_cmd += ['-R', '-Ps' + self._prompt.strip()]
269  if self._quit_if_one_screen:
270  pager_cmd += ['-F', '-X']
271  self._pager_process = subprocess.Popen(pager_cmd + ["-"], restore_signals=True,
272  stdin=subprocess.PIPE)
273  # and attach stdout to the pager stdin
274  pipe_fd = self._pager_process.stdin.fileno()
275  # and if stderr was a tty do the same for stderr
276  os.dup2(pipe_fd, sys.stdout.fileno())
277  if sys.stderr.isatty():
278  os.dup2(pipe_fd, sys.stderr.fileno())
279 
280  def __exit__(self, exc_type, exc_val, exc_tb):
281  """ exiting context """
282  # no pager, nothing to do
283  if self._pager_process is None:
284  return
285 
286  # otherwise let's try to flush whatever is left
287  try:
288  sys.stdout.flush()
289  except BrokenPipeError:
290  # apparently pager died before we could flush ... so let's move the
291  # remaining output to /dev/null and flush whatever is left
292  devnull = os.open(os.devnull, os.O_WRONLY)
293  os.dup2(devnull, sys.stdout.fileno())
294  sys.stdout.flush()
295 
296  # restore output
297  os.dup2(self._original_stdout_fd, sys.stdout.fileno())
298  os.dup2(self._original_stderr_fd, sys.stderr.fileno())
299  # and the original __stdout__/__stderr__ object just in case. Will also
300  # close the copied file descriptors
301  sys.__stderr__ = self._original_stderr
302  sys.__stdout__ = self._original_stdout
303  # and clean up our monkey patch of isatty
304  sys.stdout.isatty = self._original_stdout_isatty
305  sys.stderr.isatty = self._original_stderr_isatty
306 
307  # wait for pager
308  self._pager_process.communicate()
309 
310  # and if we exited due to broken pipe ... then ignore it
311  return exc_type == BrokenPipeError
312 
313 
314 class InputEditor():
315  """
316  Class to get user input via opening a temporary file in a text editor.
317 
318  It is an alternative to the python commands ``input()`` or ``sys.stdin.readlines`` and is
319  similar to the behaviour of ``git commit`` for editing commit messages. By using an editor
320  instead of the command line, the user is motivated to give expressive multi-line input,
321  leveraging the full text editing capabilities of his editor. This function cannot be used for
322  example in interactive terminal scripts, whenever detailed user input is required.
323 
324  Heavily inspired by the code in this blog post:
325  https://chase-seibert.github.io/blog/2012/10/31/python-fork-exec-vim-raw-input.html
326 
327  Parameters:
328  editor_command: Editor to open for user input. If ``None``, get
329  default editor from environment variables. It should be the name
330  of a shell executable and can contain command line arguments.
331  initial_content: Initial string to insert into the temporary file that
332  is opened for user input. Can be used for default input or to
333  insert comment lines with instructions.
334  commentlines_start_with: Optionally define string with which comment
335  lines start
336  """
337 
338  def __init__(self,
339  editor_command: str = None,
340  initial_content: str = None,
341  commentlines_start_with: str = "#"):
342  """Constructor"""
343  # Use provided editor command or editor command from environment variables
344  editor_command_string = editor_command or self._default_environment_editor()
345 
346  self.editor_command_list = shlex.split(editor_command_string, posix=True)
347  # check if editor executable exists and if not, prompt for new editor command
348  if shutil.which(self.editor_command_list[0]) is None:
349  self._prompt_for_editor()
350 
351 
352  self.initial_content = initial_content
353 
354  self.comment_string = commentlines_start_with
355 
356  def input(self):
357  """
358  Get user input via editing a temporary file in an editor. If opening the editor fails, fall
359  back to command line input
360  """
361  try:
362  with tempfile.NamedTemporaryFile(mode='r+') as tmpfile:
363  if self.initial_content:
364  tmpfile.write(self.initial_content)
365  tmpfile.flush()
366  subprocess.check_call(self.editor_command_list + [tmpfile.name])
367  tmpfile.seek(0)
368  input_string = tmpfile.read().strip()
369  input_string = self._remove_comment_lines(input_string)
370 
371  except (FileNotFoundError, subprocess.CalledProcessError):
372  # If editor not found or other problem with subprocess call, fall back to terminal input
373  print(f"Could not open {self.get_editor_command()}.")
374  print("Try to set your $VISUAL or $EDITOR environment variables properly.\n")
375  sys.exit(1)
376 
377  return input_string
378 
379  def get_editor_command(self):
380  """Get editor shell command string used for user input."""
381  # Construct string from list which holds the executable and args
382  return " ".join(self.editor_command_list)
383 
384  def _remove_comment_lines(self, a_string):
385  """
386  Remove lines from string that start with a comment character and return modified version.
387  """
388  if self.comment_string is not None:
389  a_string = "\n".join(
390  [line for line in a_string.splitlines()
391  if not line.startswith(self.comment_string)]).strip()
392  return a_string
393 
394  def _default_environment_editor(self):
395  """
396  Return editor from environment variables. If not existing, return vi(m) as default.
397  """
398  editor_command = (os.environ.get('VISUAL') or os.environ.get('EDITOR') or
399  'vim' or 'vi')
400  return editor_command
401 
402  def _prompt_for_editor(self):
403  """
404  Ask user to provide editor command
405  """
406  # Prompt user for editor command until one is found which exists in PATH
407  while True:
408  new_editor_command_string = input("Use editor: ")
409  new_editor_command_list = shlex.split(new_editor_command_string, posix=True)
410 
411  if shutil.which(new_editor_command_list[0]) is not None:
412  self.editor_command_list = new_editor_command_list
413  return self.editor_command_list
414 
415  else:
416  print(f"Editor '{self.editor_command_list[0]}' not found in $PATH.")
417 
418 # @endcond