import contextlib
import io
import os
import shlex
import shutil
import sys
import tempfile
import typing as t
from types import TracebackType
from . import formatting
from . import termui
from . import utils
from ._compat import _find_binary_reader
if t.TYPE_CHECKING:
from .core import BaseCommand
class EchoingStdin:
def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None:
self._input = input
self._output = output
self._paused = False
def __getattr__(self, x: str) -> t.Any:
return getattr(self._input, x)
def _echo(self, rv: bytes) -> bytes:
if not self._paused:
self._output.write(rv)
return rv
def read(self, n: int = -1) -> bytes:
return self._echo(self._input.read(n))
def read1(self, n: int = -1) -> bytes:
return self._echo(self._input.read1(n)) # type: ignore
def readline(self, n: int = -1) -> bytes:
return self._echo(self._input.readline(n))
def readlines(self) -> t.List[bytes]:
return [self._echo(x) for x in self._input.readlines()]
def __iter__(self) -> t.Iterator[bytes]:
return iter(self._echo(x) for x in self._input)
def __repr__(self) -> str:
return repr(self._input)
@contextlib.contextmanager
def _pause_echo(stream: t.Optional[EchoingStdin]) -> t.Iterator[None]:
if stream is None:
yield
else:
stream._paused = True
yield
stream._paused = False
class _NamedTextIOWrapper(io.TextIOWrapper):
def __init__(
self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any
) -> None:
super().__init__(buffer, **kwargs)
self._name = name
self._mode = mode
@property
def name(self) -> str:
return self._name
@property
def mode(self) -> str:
return self._mode
def make_input_stream(
input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]], charset: str
) -> t.BinaryIO:
# Is already an input stream.
if hasattr(input, "read"):
rv = _find_binary_reader(t.cast(t.IO[t.Any], input))
if rv is not None:
return rv
raise TypeError("Could not find binary reader for input stream.")
if input is None:
input = b""
elif isinstance(input, str):
input = input.encode(charset)
return io.BytesIO(input)
class Result:
"""Holds the captured result of an invoked CLI script."""
def __init__(
self,
runner: "CliRunner",
stdout_bytes: bytes,
stderr_bytes: t.Optional[bytes],
return_value: t.Any,
exit_code: int,
exception: t.Optional[BaseException],
exc_info: t.Optional[
t.Tuple[t.Type[BaseException], BaseException, TracebackType]
] = None,
):
#: The runner that created the result
self.runner = runner
#: The standard output as bytes.
self.stdout_bytes = stdout_bytes
#: The standard error as bytes, or None if not available
self.stderr_bytes = stderr_bytes
#: The value returned from the invoked command.
#:
#: .. versionadded:: 8.0
self.return_value = return_value
#: The exit code as integer.
self.exit_code = exit_code
#: The exception that happened if one did.
self.exception = exception
#: The traceback
self.exc_info = exc_info
@property
def output(self) -> str:
"""The (standard) output as unicode string."""
return self.stdout
@property
def stdout(self) -> str:
"""The standard output as unicode string."""
return self.stdout_bytes.decode(self.runner.charset, "replace").replace(
"\r\n", "\n"
)
@property
def stderr(self) -> str:
"""The standard error as unicode string."""
if self.stderr_bytes is None:
raise ValueError("stderr not separately captured")
return self.stderr_bytes.decode(self.runner.charset, "replace").replace(
"\r\n", "\n"
)
def __repr__(self) -> str:
exc_str = repr(self.exception) if self.exception else "okay"
return f"<{type(self).__name__} {exc_str}>"
class CliRunner:
"""The CLI runner provides functionality to invoke a Click command line
script for unittesting purposes in a isolated environment. This only
works in single-threaded systems without any concurrency as it changes the
global interpreter state.
:param charset: the character set for the input and output data.
:param env: a dictionary with environment variables for overriding.
:param echo_stdin: if this is set to `True`, then reading from stdin writes
to stdout. This is useful for showing examples in
some circumstances. Note that regular prompts
will automatically echo the input.
:param mix_stderr: if this is set to `False`, then stdout and stderr are
preserved as independent streams. This is useful for
Unix-philosophy apps that have predictable stdout and
noisy stderr, such that each may be measured
independently
"""
def __init__(
self,
charset: str = "utf-8",
env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
echo_stdin: bool = False,
mix_stderr: bool = True,
) -> None:
self.charset = charset
self.env: t.Mapping[str, t.Optional[str]] = env or {}
self.echo_stdin = echo_stdin
self.mix_stderr = mix_stderr
def get_default_prog_name(self, cli: "BaseCommand") -> str:
"""Given a command object it will return the default program name
for it. The default is the `name` attribute or ``"root"`` if not
set.
"""
return cli.name or "root"
def make_env(
self, overrides: t.Optional[t.Mapping[str, t.Optional[str]]] = None
) -> t.Mapping[str, t.Optional[str]]:
"""Returns the environment overrides for invoking a script."""
rv = dict(self.env)
if overrides:
rv.update(overrides)
return rv
@contextlib.contextmanager
def isolation(
self,
input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None,
env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
color: bool = False,
) -> t.Iterator[t.Tuple[io.BytesIO, t.Optional[io.BytesIO]]]:
"""A context manager that sets up the isolation for invoking of a
command line tool. This sets up stdin with the given input data
and `os.environ` with the overrides from the given dictionary.
This also rebinds some internals in Click to be mocked (like the
prompt functionality).
This is automatically done in the :meth:`invoke` method.
:param input: the input stream to put into sys.stdin.
:param env: the environment overrides as dictionary.
:param color: whether the output should contain color codes. The
application can still override this explicitly.
.. versionchanged:: 8.0
``stderr`` is opened with ``errors="backslashreplace"``
instead of the default ``"strict"``.
.. versionchanged:: 4.0
Added the ``color`` parameter.
"""
bytes_input = make_input_stream(input, self.charset)
echo_input = None
old_stdin = sys.stdin
old_stdout = sys.stdout
old_stderr = sys.stderr
old_forced_width = formatting.FORCED_WIDTH
formatting.FORCED_WIDTH = 80
env = self.make_env(env)
bytes_output = io.BytesIO()
if self.echo_stdin:
bytes_input = echo_input = t.cast(
t.BinaryIO, EchoingStdin(bytes_input, bytes_output)
)
sys.stdin = text_input = _NamedTextIOWrapper(
bytes_input, encoding=self.charset, name="