Belle II Software  release-05-02-19
terminal_utils.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 # @cond dont_want_no_doxygen_warnings_this_is_sphinxed
5 
6 """
7 terminal_utils - Helper functions for input from/output to a terminal
8 ---------------------------------------------------------------------
9 
10 This module contains classes useful to deal with output on the terminal:
11 
12 * `ANSIColors`, a class to easily add color output to the terminal
13 * `Pager`, a class to provide paginated output with less similar to other
14  popular tools like ``git diff``
15 * `InputEditor`, a class to open an editor window when requesting longer inputs
16  from users, similar to ``git commit``
17 """
18 
19 import sys
20 import subprocess
21 import os
22 import io
23 import shutil
24 import tempfile
25 import shlex
26 import enum
27 
28 
class ANSIColors(enum.Enum):
    """
    Simple class to handle color output to the terminal.

    This class allows to very easily add color output to the terminal

    >>> from terminal_utils import ANSIColors as ac
    >>> print(f"{ac.fg('red')}Some text in {ac.color(underline=True)}RED{ac.reset()}")

    The basic colors can be specified by name (case insensitive), by enum
    member or by enum value. Custom colors can be supplied using hex notation
    like ``#rgb`` or ``#rrggbb`` (Hint: ``matplotlib.colors.to_hex`` might be
    useful here). As an example to use the viridis colormap to color the output
    on the terminal::

        from matplotlib import cm
        from matplotlib.colors import to_hex
        from terminal_utils import ANSIColors as ac

        # sample the viridis colormap at 16 points
        for i in range(16):
            # convert i to be in [0..1] and get the hex color
            color = to_hex(cm.viridis(i/15))
            # and print the hex color in the correct color
            print(f"{i}. {ac.fg(color)}{color}{ac.reset()}")


    If the output is not to a terminal color output will be disabled and nothing
    will be added to the output, for example when redirecting the output to a
    logfile.

    .. sphinx bug, classes inheriting from enum don't show class methods:
       https://github.com/sphinx-doc/sphinx/issues/6857. So until that is fixed
       we need to add them manually here

    .. automethod:: color
    .. automethod:: convert_color
    .. automethod:: fg
    .. automethod:: bg
    .. automethod:: reset
    .. automethod:: supported
    """
    BLACK = 0
    RED = 1
    GREEN = 2
    YELLOW = 3
    BLUE = 4
    MAGENTA = 5
    CYAN = 6
    WHITE = 7

    @staticmethod
    def supported():
        """
        Check whether the output is a terminal.

        If this is False, the methods `color`, `fg`, `bg` and `reset` will only
        return an empty string as color output will be disabled
        """
        return sys.stdout.isatty()

    @classmethod
    def convert_color(cls, color):
        """Convert a color to the necessary ansi code. The argument can either be

        * an integer corresponding to the ansi color (see the enum values of this class)
        * a member of this enum, e.g. ``ANSIColors.RED``
        * the name (case insensitive) of one of the enum values of this class
        * a hex color code of the form ``#rgb`` or ``#rrggbb``

        Returns:
            str: the color part of the escape sequence, ``5;<index>`` for a
            basic color or ``2;<r>;<g>;<b>`` for a hex color

        Raises:
            KeyError: if the argument is a string not matching to one of the known colors
            ValueError: if a hex color code contains characters which are not hex digits
        """
        if isinstance(color, cls):
            # enum members are converted to their integer color index
            color = color.value
        elif isinstance(color, str):
            # startswith() instead of color[0] so that an empty string ends up
            # as the documented KeyError instead of an IndexError
            if color.startswith('#'):
                if len(color) == 4:
                    # short form: each digit is repeated, 0xf -> 0xff == 15 * 17
                    r, g, b = (int(e, 16) * 17 for e in color[1:])
                else:
                    r, g, b = (int(color[i:i + 2], 16) for i in (1, 3, 5))
                return f"2;{r};{g};{b}"
            try:
                color = cls[color.upper()].value
            except KeyError as e:
                raise KeyError(f"Unknown color: '{color}'") from e
        return f"5;{color}"

    @classmethod
    def color(cls, foreground=None, background=None, bold=False, underline=False, inverted=False):
        """
        Change terminal colors to the given foreground/background colors and attributes.

        This will return a string to be printed to change the color on the terminal.
        To revert to default print the output of `reset()`

        Parameters:
            foreground (int or str): foreground color to use, can be any value accepted by `convert_color`
                If None is given the current color will not be changed.
            background (int or str): background color to use, can be any value accepted by `convert_color`.
                If None is given the current color will not be changed.
            bold (bool): Use bold font
            underline (bool): Underline the text
            inverted (bool): Flip background and foreground color

        Returns:
            str: the escape sequence, or an empty string if the output is not
            a terminal or nothing was requested
        """
        if not cls.supported():
            return ""

        codes = []
        if foreground is not None:
            codes.append(f"38;{cls.convert_color(foreground)}")
        if background is not None:
            codes.append(f"48;{cls.convert_color(background)}")
        if bold:
            codes.append("1")
        if underline:
            codes.append("4")
        if inverted:
            codes.append("7")
        if not codes:
            return ""
        return '\x1b[{}m'.format(";".join(codes))

    @classmethod
    def fg(cls, color):
        """Shorthand for `color(foreground=color) <color>`"""
        return cls.color(foreground=color)

    @classmethod
    def bg(cls, color):
        """Shorthand for `color(background=color) <color>`"""
        return cls.color(background=color)

    @classmethod
    def reset(cls):
        """Reset colors to default"""
        return '\x1b[0m' if cls.supported() else ''
162 
163 
class Pager(object):
    """
    Context manager providing page-wise output using ``less``, similar to how
    git handles long output of for example ``git diff``. Paging will only be
    active if the output is to a terminal and not piped into a file or to a
    different program.

    Warning:
        To be able to see `basf2` log messages like `B2INFO() <basf2.B2INFO>`
        on the paged output you have to set
        `basf2.logging.enable_python_logging = True
        <basf2.LogPythonInterface.enable_python_logging>`

    .. versionchanged:: release-03-00-00
       the pager no longer waits until all output is complete but can
       incrementally show output. It can also show output generated in C++

    You can set the environment variable ``$PAGER`` to an empty string or to
    ``cat`` to disable paging or to a different program (for example ``more``)
    which should retrieve the output and display it.

    >>> with Pager():
    >>>     for i in range(30):
    >>>         print("This is an example on how to use the pager.")

    Parameters:
        prompt (str): a string argument allows overriding the description
            provided by ``less``. Special characters may need escaping.
            Will only be shown if paging is used and the pager is actually ``less``.
        quit_if_one_screen (bool): indicating whether the Pager should quit
            automatically if the content fits on one screen. This implies that
            the content stays visible on pager exit. True is similar to the
            behavior of :program:`git diff`, False is similar to :program:`git
            --help`
    """

    def __init__(self, prompt=None, quit_if_one_screen=False):
        """ constructor just remembering the arguments """
        # pager program to use, taken from $PAGER with "less" as default
        self._pager = os.environ.get("PAGER", "less")
        # treat "cat" as no pager at all
        if self._pager == "cat":
            self._pager = ""
        # prompt to show when the pager is less
        self._prompt = prompt
        # whether the pager should quit if the output fits on one screen
        self._quit_if_one_screen = quit_if_one_screen
        # pager subprocess, None as long as no pager is running
        self._pager_process = None
        # duplicate of the original stdout file descriptor
        self._original_stdout_fd = None
        # duplicate of the original stderr file descriptor
        self._original_stderr_fd = None
        # original sys.__stdout__ object
        self._original_stdout = None
        # original sys.__stderr__ object
        self._original_stderr = None
        # original sys.stdout.isatty
        self._original_stdout_isatty = None
        # original sys.stderr.isatty
        self._original_stderr_isatty = None

    def __enter__(self):
        """
        Entering context: start the pager and redirect the output to it.

        Nothing is done if stdout is not a terminal, if paging is disabled or
        if the stdout/stderr objects do not provide a file descriptor.
        """
        if not sys.stdout.isatty() or self._pager == "":
            return

        try:
            # get both descriptors before duplicating anything so that no
            # duplicate is leaked if only one of the objects has a fileno
            stdout_fileno = sys.stdout.fileno()
            stderr_fileno = sys.stderr.fileno()
        except (AttributeError, io.UnsupportedOperation):
            # jupyter notebook stdout/stderr objects don't have a fileno so
            # don't support paging
            return

        # remember whether stderr is a terminal *before* isatty() gets
        # patched below, otherwise the check would always succeed
        stderr_was_tty = sys.stderr.isatty()

        # save old sys.__stderr__ and sys.__stdout__ objects
        self._original_stderr = sys.__stderr__
        self._original_stdout = sys.__stdout__
        # and duplicate the current output file descriptors
        self._original_stdout_fd = os.dup(stdout_fileno)
        self._original_stderr_fd = os.dup(stderr_fileno)

        # This is a bit annoying: Usually in python the sys.__stdout__ and
        # sys.__stderr__ objects point to the original stdout/stderr on program start.
        #
        # However we modify the file descriptors directly so these objects will
        # also be redirected automatically. The documentation for
        # sys.__stdout__ says it "could be useful to print to the actual
        # standard stream no matter if the sys.std* object has been
        # redirected". Also, querying the terminal size looks at
        # sys.__stdout__ which would no longer be pointing to a tty.
        #
        # So lets provide objects pointing to the original file descriptors so
        # that they behave as expected, i.e. as if we would only have
        # redirected sys.stdout and sys.stderr ...
        sys.__stdout__ = io.TextIOWrapper(os.fdopen(self._original_stdout_fd, "wb"))
        sys.__stderr__ = io.TextIOWrapper(os.fdopen(self._original_stderr_fd, "wb"))

        # also monkey patch the isatty() function of stdout to actually keep
        # returning True even if we moved the file descriptor. stderr keeps
        # reporting whatever it was before as it is only moved if it was a tty
        self._original_stdout_isatty = sys.stdout.isatty
        sys.stdout.isatty = lambda: True
        self._original_stderr_isatty = sys.stderr.isatty
        sys.stderr.isatty = lambda: stderr_was_tty

        # fine, everything is saved, start the pager
        pager_cmd = [self._pager]
        if self._pager == "less":
            # build the prompt in a local variable so that reusing the same
            # Pager instance does not append the help text again and again
            prompt = (self._prompt or '') + ' (press h for help or q to quit)'
            pager_cmd += ['-R', '-Ps' + prompt.strip()]
            if self._quit_if_one_screen:
                pager_cmd += ['-F', '-X']
        self._pager_process = subprocess.Popen(pager_cmd + ["-"], restore_signals=True,
                                               stdin=subprocess.PIPE)
        pipe_fd = self._pager_process.stdin.fileno()
        # and attach stdout to the pager stdin
        os.dup2(pipe_fd, stdout_fileno)
        # and if stderr was a tty do the same for stderr
        if stderr_was_tty:
            os.dup2(pipe_fd, stderr_fileno)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exiting context: restore the original output and wait for the pager.

        Returns:
            True if the context was left due to a BrokenPipeError (which is
            then suppressed), False for anything else and None if no pager
            was running.
        """
        # no pager, nothing to do
        if self._pager_process is None:
            return

        # otherwise let's try to flush whatever is left
        try:
            sys.stdout.flush()
        except BrokenPipeError:
            # apparently pager died before we could flush ... so let's move the
            # remaining output to /dev/null and flush whatever is left
            devnull = os.open(os.devnull, os.O_WRONLY)
            try:
                os.dup2(devnull, sys.stdout.fileno())
                sys.stdout.flush()
            finally:
                # stdout has its own copy of the descriptor now
                os.close(devnull)

        # restore output
        os.dup2(self._original_stdout_fd, sys.stdout.fileno())
        os.dup2(self._original_stderr_fd, sys.stderr.fileno())
        # and the original __stdout__/__stderr__ object just in case. Will also
        # close the copied file descriptors
        sys.__stderr__ = self._original_stderr
        sys.__stdout__ = self._original_stdout
        # and clean up our monkey patch of isatty
        sys.stdout.isatty = self._original_stdout_isatty
        sys.stderr.isatty = self._original_stderr_isatty

        # wait for pager
        self._pager_process.communicate()
        # allow to use the same instance again
        self._pager_process = None

        # and if we exited due to broken pipe ... then ignore it
        return exc_type is not None and issubclass(exc_type, BrokenPipeError)
317 
318 
class InputEditor():
    """
    Class to get user input via opening a temporary file in a text editor.

    It is an alternative to the python commands ``input()`` or ``sys.stdin.readlines`` and is
    similar to the behaviour of ``git commit`` for editing commit messages. By using an editor
    instead of the command line, the user is motivated to give expressive multi-line input,
    leveraging the full text editing capabilities of their editor. This class can be used for
    example in interactive terminal scripts, whenever detailed user input is required.

    Heavily inspired by the code in this blog post:
    https://chase-seibert.github.io/blog/2012/10/31/python-fork-exec-vim-raw-input.html

    Parameters:
        editor_command: Editor to open for user input. If ``None``, get
            default editor from environment variables. It should be the name
            of a shell executable and can contain command line arguments.
        initial_content: Initial string to insert into the temporary file that
            is opened for user input. Can be used for default input or to
            insert comment lines with instructions.
        commentlines_start_with: Optionally define string with which comment
            lines start
    """

    def __init__(self,
                 editor_command: str = None,
                 initial_content: str = None,
                 commentlines_start_with: str = "#"):
        """Constructor"""
        # Use provided editor command or editor command from environment variables
        editor_command_string = editor_command or self._default_environment_editor()
        # command line for the editor as list: executable followed by its arguments
        self.editor_command_list = shlex.split(editor_command_string, posix=True)
        # check if editor executable exists and if not, prompt for new editor
        # command. A command consisting only of whitespace splits into an empty
        # list and is treated like a missing executable
        if not self.editor_command_list or shutil.which(self.editor_command_list[0]) is None:
            self._prompt_for_editor()

        # initial content of the editor window
        self.initial_content = initial_content
        # string which starts comments in the file
        self.comment_string = commentlines_start_with

    def input(self):
        """
        Get user input via editing a temporary file in an editor.

        Returns:
            str: the content of the file after the editor was closed, stripped
            of surrounding whitespace and with comment lines removed.

        If the editor cannot be started or exits with an error, a message is
        printed and the program is terminated with exit code 1.
        """
        try:
            with tempfile.NamedTemporaryFile(mode='r+') as tmpfile:
                if self.initial_content:
                    tmpfile.write(self.initial_content)
                    # make sure the editor sees the content
                    tmpfile.flush()
                subprocess.check_call(self.editor_command_list + [tmpfile.name])
                # go back to the beginning to read what the user has written
                tmpfile.seek(0)
                input_string = tmpfile.read().strip()
            input_string = self._remove_comment_lines(input_string)

        except (FileNotFoundError, subprocess.CalledProcessError):
            # If editor not found or other problem with subprocess call, give up
            print(f"Could not open {self.get_editor_command()}.")
            print("Try to set your $VISUAL or $EDITOR environment variables properly.\n")
            sys.exit(1)

        return input_string

    def get_editor_command(self):
        """Get editor shell command string used for user input."""
        # Construct string from list which holds the executable and args
        return " ".join(self.editor_command_list)

    def _remove_comment_lines(self, a_string):
        """
        Remove lines from string that start with a comment character and return modified version.

        If no comment string is set (``None``) the string is returned unchanged.
        """
        if self.comment_string is not None:
            a_string = "\n".join(
                [line for line in a_string.splitlines()
                 if not line.startswith(self.comment_string)]).strip()
        return a_string

    def _default_environment_editor(self):
        """
        Return editor from environment variables. If not existing, return vi(m) as default.

        ``$VISUAL`` takes precedence over ``$EDITOR``. If neither is set to a
        non-empty value ``vim`` is returned if it can be found in ``$PATH``
        and ``vi`` otherwise.
        """
        for variable in ('VISUAL', 'EDITOR'):
            editor_command = os.environ.get(variable)
            if editor_command:
                return editor_command
        # `'vim' or 'vi'` would always evaluate to 'vim', so check explicitly
        return 'vim' if shutil.which('vim') is not None else 'vi'

    def _prompt_for_editor(self):
        """
        Ask user to provide editor command

        Returns:
            list: the new editor command list, which is also stored in
            ``editor_command_list``
        """
        # Prompt user for editor command until one is found which exists in PATH
        while True:
            new_editor_command_string = input("Use editor: ")
            try:
                new_editor_command_list = shlex.split(new_editor_command_string, posix=True)
            except ValueError as error:
                # for example unbalanced quotes: just ask again
                print(f"Could not parse editor command: {error}")
                continue

            # empty input, nothing to look for
            if not new_editor_command_list:
                continue

            if shutil.which(new_editor_command_list[0]) is not None:
                self.editor_command_list = new_editor_command_list
                return self.editor_command_list

            # report the command which was just tried, not the previous one
            print(f"Editor '{new_editor_command_list[0]}' not found in $PATH.")
422 
423 # @endcond
rundb.RunDB.__init__
def __init__(self, apikey=None, username=None)
Definition: rundb.py:30