import dataclasses
import functools
import inspect
import os
import sys
import warnings
from collections import defaultdict
from collections import deque
from contextlib import suppress
from pathlib import Path
from types import TracebackType
from typing import Any
from typing import Callable
from typing import cast
from typing import Dict
from typing import Generator
from typing import Generic
from typing import Iterable
from typing import Iterator
from typing import List
from typing import MutableMapping
from typing import NoReturn
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union
import _pytest
from _pytest import nodes
from _pytest._code import getfslineno
from _pytest._code.code import FormattedExcinfo
from _pytest._code.code import TerminalRepr
from _pytest._io import TerminalWriter
from _pytest.compat import _format_args
from _pytest.compat import _PytestWrapper
from _pytest.compat import assert_never
from _pytest.compat import final
from _pytest.compat import get_real_func
from _pytest.compat import get_real_method
from _pytest.compat import getfuncargnames
from _pytest.compat import getimfunc
from _pytest.compat import getlocation
from _pytest.compat import is_generator
from _pytest.compat import NOTSET
from _pytest.compat import NotSetType
from _pytest.compat import overload
from _pytest.compat import safe_getattr
from _pytest.config import _PluggyPlugin
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.deprecated import check_ispytest
from _pytest.deprecated import YIELD_FIXTURE
from _pytest.mark import Mark
from _pytest.mark import ParameterSet
from _pytest.mark.structures import MarkDecorator
from _pytest.outcomes import fail
from _pytest.outcomes import skip
from _pytest.outcomes import TEST_OUTCOME
from _pytest.pathlib import absolutepath
from _pytest.pathlib import bestrelpath
from _pytest.scope import HIGH_SCOPES
from _pytest.scope import Scope
from _pytest.stash import StashKey
if TYPE_CHECKING:
from typing import Deque
from _pytest.scope import _ScopeName
from _pytest.main import Session
from _pytest.python import CallSpec2
from _pytest.python import Metafunc
# Type variable for the value a fixture provides, i.e. what the fixture
# function returns or yields.
FixtureValue = TypeVar("FixtureValue")
# Type variable for the fixture function itself; bound to any callable.
FixtureFunction = TypeVar("FixtureFunction", bound=Callable[..., object])
# Type alias (generic in the fixture value) for a fixture function: either a
# plain callable returning the value, or a callable returning a generator
# that yields the value.
_FixtureFunc = Union[
    Callable[..., FixtureValue], Callable[..., Generator[FixtureValue, None, None]]
]
# Type alias (generic in the fixture value) for FixtureDef.cached_result.
# It is a 3-tuple in one of two shapes: (value, cache key, None) or
# (None, cache key, exc info).
_FixtureCachedResult = Union[
    Tuple[
        # The result.
        FixtureValue,
        # Cache key.
        object,
        # No exc info in this shape.
        None,
    ],
    Tuple[
        # No result in this shape.
        None,
        # Cache key.
        object,
        # Exc info if raised.
        Tuple[Type[BaseException], BaseException, TracebackType],
    ],
]
# Frozen (immutable) stand-in that carries only a cached result and a scope.
# In this file it is built by FixtureRequest._get_active_fixturedef for the
# "request" argname when no real fixture definition can be looked up.
@dataclasses.dataclass(frozen=True)
class PseudoFixtureDef(Generic[FixtureValue]):
    # Same 3-tuple layout as described by _FixtureCachedResult.
    cached_result: "_FixtureCachedResult[FixtureValue]"
    # Scope attached to the pseudo definition.
    _scope: Scope
def pytest_sessionstart(session: "Session") -> None:
    """Create a FixtureManager for ``session`` and store it on the session
    as ``_fixturemanager``."""
    manager = FixtureManager(session)
    session._fixturemanager = manager
def get_scope_package(
    node: nodes.Item,
    fixturedef: "FixtureDef[object]",
) -> Optional[Union[nodes.Item, nodes.Collector]]:
    """Walk up from ``node`` looking for the Package node that owns ``fixturedef``.

    The wanted node is a ``Package`` whose nodeid equals
    ``<fixturedef.baseid>/__init__.py``. If the walk runs off the top of the
    tree (parent is ``None``), ``node.session`` is returned instead.
    """
    from _pytest.python import Package

    wanted_nodeid = "{}/{}".format(fixturedef.baseid, "__init__.py")
    candidate: Optional[Union[nodes.Item, nodes.Collector]] = node
    while candidate:
        if isinstance(candidate, Package) and not (wanted_nodeid != candidate.nodeid):
            # Found the package the fixture was defined in.
            break
        candidate = candidate.parent  # type: ignore[assignment]
    return node.session if candidate is None else candidate
def get_scope_node(
    node: nodes.Node, scope: Scope
) -> Optional[Union[nodes.Item, nodes.Collector]]:
    """Return the ancestor of ``node`` (via ``node.getparent``) whose type
    corresponds to ``scope``.

    Function -> ``nodes.Item``, Class -> ``Class``, Module -> ``Module``,
    Package -> ``Package``, Session -> ``Session``. Any other value is
    passed to ``assert_never``.
    """
    import _pytest.python

    # Each branch looks up its node class lazily, exactly when it is needed.
    if scope is Scope.Function:
        return node.getparent(nodes.Item)
    if scope is Scope.Class:
        return node.getparent(_pytest.python.Class)
    if scope is Scope.Module:
        return node.getparent(_pytest.python.Module)
    if scope is Scope.Package:
        return node.getparent(_pytest.python.Package)
    if scope is Scope.Session:
        return node.getparent(_pytest.main.Session)
    assert_never(scope)
# Stash key under which add_funcarg_pseudo_fixture_def caches, per scope
# node, the artificial fixturedefs it creates for direct parametrization
# (mapping of argname -> FixtureDef).
name2pseudofixturedef_key = StashKey[Dict[str, "FixtureDef[Any]"]]()
def add_funcarg_pseudo_fixture_def(
    collector: nodes.Collector, metafunc: "Metafunc", fixturemanager: "FixtureManager"
) -> None:
    """Convert directly parametrized arguments into pseudo fixtures.

    Every ``funcargs`` entry of every callspec in ``metafunc._calls`` is moved
    into the callspec's ``params``/``indices``, and a FixtureDef wrapping
    ``get_direct_param_fixture_func`` is registered for the argname in
    ``metafunc._arg2fixturedefs``. Later test execution can then rely on a
    FixtureDef existing for every argument.

    Nothing is done when the first callspec carries no ``funcargs``.
    """
    # XXX we can probably avoid this algorithm if we modify CallSpec2
    # to directly care for creating the fixturedefs within its methods.
    callspecs = metafunc._calls
    if not callspecs[0].funcargs:
        # No direct parametrization on this function.
        return

    # Pass 1: gather the values of each argname across all callspecs and move
    # them from ``funcargs`` to ``params``.
    values_by_arg: Dict[str, List[object]] = {}
    scope_by_arg: Dict[str, Scope] = {}
    for callspec in callspecs:
        for argname, argvalue in callspec.funcargs.items():
            assert argname not in callspec.params
            callspec.params[argname] = argvalue
            collected = values_by_arg.setdefault(argname, [])
            # The index is the position this value takes in the gathered list.
            callspec.indices[argname] = len(collected)
            collected.append(argvalue)
            if argname not in scope_by_arg:
                scope_by_arg[argname] = callspec._arg2scope.get(
                    argname, Scope.Function
                )
        callspec.funcargs.clear()

    # Pass 2: register one artificial FixtureDef per argname. For scopes above
    # function the FixtureDef is cached in the stash of the scope's node so it
    # is created only once per scope.
    arg2fixturedefs = metafunc._arg2fixturedefs
    for argname, valuelist in values_by_arg.items():
        scope = scope_by_arg[argname]
        if scope is Scope.Function:
            scope_node = None
        else:
            scope_node = get_scope_node(collector, scope)
            if scope_node is None:
                assert scope is Scope.Class and isinstance(
                    collector, _pytest.python.Module
                )
                # Use module-level collector for class-scope (for now).
                scope_node = collector

        if scope_node is None:
            cached_defs = None
        else:
            empty: Dict[str, FixtureDef[Any]] = {}
            cached_defs = scope_node.stash.setdefault(name2pseudofixturedef_key, empty)

        if cached_defs is not None and argname in cached_defs:
            # Reuse the FixtureDef already created for this scope node.
            arg2fixturedefs[argname] = [cached_defs[argname]]
            continue

        fixturedef = FixtureDef(
            fixturemanager=fixturemanager,
            baseid="",
            argname=argname,
            func=get_direct_param_fixture_func,
            scope=scope_by_arg[argname],
            params=valuelist,
            unittest=False,
            ids=None,
        )
        arg2fixturedefs[argname] = [fixturedef]
        if cached_defs is not None:
            cached_defs[argname] = fixturedef
def getfixturemarker(obj: object) -> Optional["FixtureFunctionMarker"]:
    """Look up the ``_pytestfixturefunction`` attribute of ``obj`` through
    ``safe_getattr``, falling back to ``None``."""
    marker = safe_getattr(obj, "_pytestfixturefunction", None)
    return cast(Optional[FixtureFunctionMarker], marker)
# Parametrized fixture key, helper alias for code below. Keys are built by
# get_parametrized_fixture_keys as (argname, param_index, ...) tuples.
_Key = Tuple[object, ...]
def get_parametrized_fixture_keys(item: nodes.Item, scope: Scope) -> Iterator[_Key]:
    """Yield a key for each parametrized argument of ``item`` whose scope
    equals ``scope``.

    Items without a ``callspec`` attribute yield nothing. Each key starts with
    ``(argname, param_index)`` and is extended depending on the scope:
    nothing for session, the parent of ``item.path`` for package,
    ``item.path`` for module, and ``item.path`` plus ``item.cls`` for class.
    ``scope`` must not be ``Scope.Function``.
    """
    assert scope is not Scope.Function
    try:
        cs: CallSpec2 = item.callspec  # type: ignore[attr-defined]
    except AttributeError:
        return

    # cs.indices.items() is random order of argnames. Sorting makes repeated
    # calls produce the keys in the same order.
    for argname, param_index in sorted(cs.indices.items()):
        if cs._arg2scope[argname] != scope:
            continue
        if scope is Scope.Session:
            scope_suffix: Tuple[object, ...] = ()
        elif scope is Scope.Package:
            scope_suffix = (item.path.parent,)
        elif scope is Scope.Module:
            scope_suffix = (item.path,)
        elif scope is Scope.Class:
            item_cls = item.cls  # type: ignore[attr-defined]
            scope_suffix = (item.path, item_cls)
        else:
            assert_never(scope)
        yield (argname, param_index, *scope_suffix)
# Algorithm for sorting on a per-parametrized resource setup basis.
# It is called for Session scope first and performs sorting
# down to the lower scopes such as to minimize number of "high scope"
# setups and teardowns.
def reorder_items(items: Sequence[nodes.Item]) -> List[nodes.Item]:
    """Return ``items`` reordered by ``reorder_items_atscope``, starting at
    session scope.

    For every scope in ``HIGH_SCOPES`` two lookup tables are prepared first:
    item -> parametrized keys of that item, and key -> deque of the items
    using that key.
    """
    argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]] = {}
    items_by_argkey: Dict[Scope, Dict[_Key, Deque[nodes.Item]]] = {}
    for scope in HIGH_SCOPES:
        keys_of_item: Dict[nodes.Item, Dict[_Key, None]] = {}
        items_of_key: Dict[_Key, Deque[nodes.Item]] = defaultdict(deque)
        argkeys_cache[scope] = keys_of_item
        items_by_argkey[scope] = items_of_key
        for item in items:
            # A dict is used as an insertion-ordered set of keys.
            keys = dict.fromkeys(get_parametrized_fixture_keys(item, scope), None)
            if not keys:
                # Items without keys at this scope are not recorded at all.
                continue
            keys_of_item[item] = keys
            for key in keys:
                items_of_key[key].append(item)

    ordered = reorder_items_atscope(
        dict.fromkeys(items, None), argkeys_cache, items_by_argkey, Scope.Session
    )
    return list(ordered)
def fix_cache_order(
    item: nodes.Item,
    argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]],
    items_by_argkey: Dict[Scope, Dict[_Key, "Deque[nodes.Item]"]],
) -> None:
    """Push ``item`` onto the left end of the deque of every key it uses,
    for every scope in ``HIGH_SCOPES``."""
    for scope in HIGH_SCOPES:
        keys_of_item = argkeys_cache[scope].get(item, [])
        for key in keys_of_item:
            # The per-scope table is looked up only when there is a key.
            items_by_argkey[scope][key].appendleft(item)
def reorder_items_atscope(
    items: Dict[nodes.Item, None],
    argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]],
    items_by_argkey: Dict[Scope, Dict[_Key, "Deque[nodes.Item]"]],
    scope: Scope,
) -> Dict[nodes.Item, None]:
    """Reorder ``items`` at ``scope`` and, recursively, at the lower scopes.

    :param items: Items to reorder, as a dict used like an ordered set.
    :param argkeys_cache: Per scope, the parametrized keys of each item.
    :param items_by_argkey: Per scope, the deque of items using each key.
    :param scope: The scope to group by in this call.
    :returns:
        ``items`` itself when ``scope`` is function scope or fewer than three
        items were given; otherwise a new dict holding the reordered items.
    """
    if scope is Scope.Function or len(items) < 3:
        return items
    # Keys already used for slicing at this scope; they are filtered out when
    # an item's remaining keys are computed. May also receive None (see the
    # end of the outer loop).
    ignore: Set[Optional[_Key]] = set()
    items_deque = deque(items)
    items_done: Dict[nodes.Item, None] = {}
    scoped_items_by_argkey = items_by_argkey[scope]
    scoped_argkeys_cache = argkeys_cache[scope]
    while items_deque:
        # Items popped in this round that have no remaining key at this scope.
        no_argkey_group: Dict[nodes.Item, None] = {}
        slicing_argkey = None
        while items_deque:
            item = items_deque.popleft()
            if item in items_done or item in no_argkey_group:
                continue
            argkeys = dict.fromkeys(
                (k for k in scoped_argkeys_cache.get(item, []) if k not in ignore), None
            )
            if not argkeys:
                no_argkey_group[item] = None
            else:
                # popitem() takes the most recently inserted remaining key.
                slicing_argkey, _ = argkeys.popitem()
                # We don't have to remove relevant items from later in the
                # deque because they'll just be ignored.
                matching_items = [
                    i for i in scoped_items_by_argkey[slicing_argkey] if i in items
                ]
                # Iterating in reverse while pushing left keeps matching_items
                # in their original relative order at the front of the deque.
                for i in reversed(matching_items):
                    fix_cache_order(i, argkeys_cache, items_by_argkey)
                    items_deque.appendleft(i)
                break
        if no_argkey_group:
            # Order the key-less group by the next lower scope before
            # committing it to the result.
            no_argkey_group = reorder_items_atscope(
                no_argkey_group, argkeys_cache, items_by_argkey, scope.next_lower()
            )
            for item in no_argkey_group:
                items_done[item] = None
        # slicing_argkey is still None here if the inner loop drained the
        # deque without finding an item with a remaining key.
        ignore.add(slicing_argkey)
    return items_done
def get_direct_param_fixture_func(request: "FixtureRequest") -> Any:
    """Fixture function used for direct parametrization: hand back
    ``request.param`` unchanged."""
    param = request.param
    return param
@dataclasses.dataclass
class FuncFixtureInfo:
    """Fixture bookkeeping for one function: its argument names, the names it
    initially requires, the closure of those names, and the fixture
    definitions known for each name."""

    __slots__ = ("argnames", "initialnames", "names_closure", "name2fixturedefs")

    # Original function argument names.
    argnames: Tuple[str, ...]
    # Argnames that function immediately requires. These include argnames +
    # fixture names specified via usefixtures and via autouse=True in fixture
    # definitions.
    initialnames: Tuple[str, ...]
    # Closure of fixture names; mutated in place by prune_dependency_tree().
    names_closure: List[str]
    # For each name, its fixture definitions. prune_dependency_tree() follows
    # the argnames of the last entry only.
    name2fixturedefs: Dict[str, Sequence["FixtureDef[Any]"]]

    def prune_dependency_tree(self) -> None:
        """Recompute names_closure from initialnames and name2fixturedefs.

        Can only reduce names_closure, which means that the new closure will
        always be a subset of the old one. The order is preserved.

        This method is needed because direct parametrization may shadow some
        of the fixtures that were included in the originally built dependency
        tree. In this way the dependency tree can get pruned, and the closure
        of argnames may get reduced.

        ``names_closure`` is updated in place (the list object is kept).
        """
        # First-occurrence position of every name in the current closure.
        # This replaces repeated list membership tests and list.index() calls
        # with constant-time dict lookups, without changing the result.
        position: Dict[str, int] = {}
        for index, name in enumerate(self.names_closure):
            position.setdefault(name, index)

        closure: Set[str] = set()
        working_set = set(self.initialnames)
        while working_set:
            argname = working_set.pop()
            # Argname may be smth not included in the original names_closure,
            # in which case we ignore it. This currently happens with pseudo
            # FixtureDefs which wrap 'get_direct_param_fixture_func(request)'.
            # So they introduce the new dependency 'request' which might have
            # been missing in the original tree (closure).
            if argname not in closure and argname in position:
                closure.add(argname)
                if argname in self.name2fixturedefs:
                    # Only the last definition's requirements are followed.
                    working_set.update(self.name2fixturedefs[argname][-1].argnames)

        # Keep the surviving names in their original relative order.
        self.names_closure[:] = sorted(closure, key=position.__getitem__)
class FixtureRequest:
"""A request for a fixture from a test or fixture function.
A request object gives access to the requesting test context and has
an optional ``param`` attribute in case the fixture is parametrized
indirectly.
"""
def __init__(self, pyfuncitem, *, _ispytest: bool = False) -> None:
    """Set up the request state for ``pyfuncitem``.

    ``_ispytest`` is forwarded to ``check_ispytest``.
    """
    check_ispytest(_ispytest)

    self._pyfuncitem = pyfuncitem
    #: Fixture for which this request is being performed.
    self.fixturename: Optional[str] = None
    # A fresh request starts out function-scoped with no active fixtures.
    self._scope = Scope.Function
    self._fixture_defs: Dict[str, FixtureDef[Any]] = {}

    # Work on a copy so later additions do not touch the item's own mapping.
    info: FuncFixtureInfo = pyfuncitem._fixtureinfo
    self._arg2fixturedefs = info.name2fixturedefs.copy()
    # Per argname, the (negative) index of the definition handed out last.
    self._arg2index: Dict[str, int] = {}
    self._fixturemanager: FixtureManager = pyfuncitem.session._fixturemanager

    # About the type of `param`:
    # - `request.param` only exists in parametrized fixtures and raises
    #   AttributeError otherwise. Typing cannot express "undefined", so the
    #   annotation cannot reflect that.
    # - Strictly speaking `param` is only (possibly) defined on SubRequest,
    #   not on this class; the typing of that is still in flux, so this
    #   declaration cheats.
    # - A generic for the param type may be considered in the future; for now
    #   it is simply Any.
    self.param: Any
@property
def scope(self) -> "_ScopeName":
    """Name of the request's scope: one of "function", "class", "module",
    "package", "session"."""
    current_scope = self._scope
    return current_scope.value
@property
def fixturenames(self) -> List[str]:
    """Names of all active fixtures in this request.

    A copy of the item's names closure, followed by any names in
    ``_fixture_defs`` that the closure does not contain.
    """
    names = [*self._pyfuncitem._fixtureinfo.names_closure]
    names += set(self._fixture_defs).difference(names)
    return names
@property
def node(self):
    """Underlying collection node (depends on current request scope)."""
    scope = self._scope
    item = self._pyfuncitem
    node: Optional[Union[nodes.Item, nodes.Collector]]
    if scope is Scope.Function:
        # Despite the attribute name this may be a non-function Item.
        node = item
    elif scope is Scope.Package:
        # FIXME: _fixturedef is not an attribute of this class; see
        # _get_fixturestack, which reads it from SubRequest objects.
        node = get_scope_package(item, self._fixturedef)  # type: ignore[attr-defined]
    else:
        node = get_scope_node(item, scope)
        if scope is Scope.Class and node is None:
            # No class node was found: fall back to the item itself.
            node = item
    assert node, 'Could not obtain a node for scope "{}" for function {!r}'.format(
        scope, item
    )
    return node
def _getnextfixturedef(self, argname: str) -> "FixtureDef[Any]":
fixturedefs = self._arg2fixturedefs.get(argname, None)
if fixturedefs is None:
# We arrive here because of a dynamic call to
# getfixturevalue(argname) usage which was naturally
# not known at parsing/collection time.
assert self._pyfuncitem.parent is not None
parentid = self._pyfuncitem.parent.nodeid
fixturedefs = self._fixturemanager.getfixturedefs(argname, parentid)
# TODO: Fix this type ignore. Either add assert or adjust types.
# Can this be None here?
self._arg2fixturedefs[argname] = fixturedefs # type: ignore[assignment]
# fixturedefs list is immutable so we maintain a decreasing index.
index = self._arg2index.get(argname, 0) - 1
if fixturedefs is None or (-index > len(fixturedefs)):
raise FixtureLookupError(argname, self)
self._arg2index[argname] = index
return fixturedefs[index]
@property
def config(self) -> Config:
    """The pytest config object associated with this request."""
    item = self._pyfuncitem
    return item.config  # type: ignore[no-any-return]
@property
def function(self):
    """Test function object if the request has a per-function scope.

    Raises AttributeError for any other scope.
    """
    if self.scope == "function":
        return self._pyfuncitem.obj
    raise AttributeError(f"function not available in {self.scope}-scoped context")
@property
def cls(self):
    """Class (can be None) where the test function was collected.

    Raises AttributeError unless the scope is "class" or "function".
    """
    if self.scope not in ("class", "function"):
        raise AttributeError(f"cls not available in {self.scope}-scoped context")
    class_collector = self._pyfuncitem.getparent(_pytest.python.Class)
    return class_collector.obj if class_collector else None
@property
def instance(self):
    """Instance (can be None) on which test function was collected."""
    # unittest support hack, see _pytest.unittest.TestCaseFunction.
    try:
        testcase = self._pyfuncitem._testcase
    except AttributeError:
        # No ``_testcase``: use ``__self__`` of the test function, if any.
        test_function = getattr(self, "function", None)
        return getattr(test_function, "__self__", None)
    else:
        return testcase
@property
def module(self):
    """Python module object where the test function was collected.

    Raises AttributeError unless the scope is "function", "class" or
    "module".
    """
    if self.scope in ("function", "class", "module"):
        module_collector = self._pyfuncitem.getparent(_pytest.python.Module)
        return module_collector.obj
    raise AttributeError(f"module not available in {self.scope}-scoped context")
@property
def path(self) -> Path:
    """Path where the test function was collected.

    Raises AttributeError for session scope.
    """
    if self.scope in ("function", "class", "module", "package"):
        # TODO: Remove ignore once _pyfuncitem is properly typed.
        return self._pyfuncitem.path  # type: ignore
    raise AttributeError(f"path not available in {self.scope}-scoped context")
@property
def keywords(self) -> MutableMapping[str, Any]:
    """Keywords/markers dictionary for the underlying node."""
    underlying: nodes.Node = self.node
    node_keywords = underlying.keywords
    return node_keywords
@property
def session(self) -> "Session":
    """Pytest session object."""
    item = self._pyfuncitem
    return item.session  # type: ignore[no-any-return]
def addfinalizer(self, finalizer: Callable[[], object]) -> None:
    """Register ``finalizer``, a teardown function called without arguments
    after the last test within the requesting test context finished
    execution."""
    # XXX usually this method is shadowed by fixturedef specific ones.
    target_node = self.node
    target_node.addfinalizer(finalizer)
def applymarker(self, marker: Union[str, MarkDecorator]) -> None:
    """Apply a marker to a single test function invocation.

    Use this when a keyword/marker should not end up on all function
    invocations.

    :param marker:
        An object created by a call to ``pytest.mark.NAME(...)``.
    """
    target_node = self.node
    target_node.add_marker(marker)
def raiseerror(self, msg: Optional[str]) -> NoReturn:
    """Raise a FixtureLookupError exception.

    The exception class is taken from the fixture manager and is built
    without an argname.

    :param msg:
        An optional custom error message.
    """
    error_cls = self._fixturemanager.FixtureLookupError
    raise error_cls(None, self, msg)
def _fillfixtures(self) -> None:
item = self._pyfuncitem
fixturenames = getattr(item, "fixturenames", self.fixturenames)
for argname in fixturenames:
if argname not in item.funcargs:
item.funcargs[argname] = self.getfixturevalue(argname)
def getfixturevalue(self, argname: str) -> Any:
    """Dynamically run a named fixture function.

    Declaring fixtures via function argument is recommended where possible.
    When the decision to use another fixture can only be made at test setup
    time, this method retrieves it from inside a fixture or test function
    body.

    It can be used during the test setup phase or the test run phase; during
    the test teardown phase a fixture's value may not be available.

    :param argname:
        The fixture name.
    :raises pytest.FixtureLookupError:
        If the given fixture could not be found.
    """
    fixturedef = self._get_active_fixturedef(argname)
    cached = fixturedef.cached_result
    assert cached is not None, (
        f'The fixture value for "{argname}" is not available.  '
        "This can happen when the fixture has already been torn down."
    )
    # The value is the first element of the cached result tuple.
    return cached[0]
def _get_active_fixturedef(
    self, argname: str
) -> Union["FixtureDef[object]", PseudoFixtureDef[object]]:
    """Return the fixture definition active for ``argname`` in this request,
    computing its value first if this request has not done so yet.

    When no definition can be found and ``argname`` is "request", a
    PseudoFixtureDef wrapping this request object is returned instead.
    """
    try:
        return self._fixture_defs[argname]
    except KeyError:
        try:
            fixturedef = self._getnextfixturedef(argname)
        except FixtureLookupError:
            if argname != "request":
                raise
            pseudo_result = (self, [0], None)
            return PseudoFixtureDef(pseudo_result, Scope.Function)

    # Deliberately outside the except block, so that the handled KeyError
    # does not leak into the fixture call.
    self._compute_fixture_value(fixturedef)
    self._fixture_defs[argname] = fixturedef
    return fixturedef
def _get_fixturestack(self) -> List["FixtureDef[Any]"]:
    """Return the fixture definitions of this request and its chain of
    parent SubRequests, outermost first."""
    innermost_first: List[FixtureDef[Any]] = []
    request = self
    while isinstance(request, SubRequest):
        innermost_first.append(request._fixturedef)  # type: ignore[has-type]
        request = request._parent_request
    return innermost_first[::-1]
def _compute_fixture_value(self, fixturedef: "FixtureDef[object]") -> None:
    """Create a SubRequest based on "self" and call the execute method
    of the given FixtureDef object.

    This will force the FixtureDef object to throw away any previous
    results and compute a new fixture value, which will be stored into
    the FixtureDef object itself.

    :param fixturedef:
        The fixture definition to execute.

    Calls ``fail`` (with ``pytrace=False``) when the fixture declares params
    but the item's callspec provides no parameter for it.
    """
    # prepare a subrequest object before calling fixture function
    # (latter managed by fixturedef)
    argname = fixturedef.argname
    funcitem = self._pyfuncitem
    scope = fixturedef._scope
    # Items without a ``callspec`` attribute are treated as unparametrized.
    try:
        callspec = funcitem.callspec
    except AttributeError:
        callspec = None
    if callspec is not None and argname in callspec.params:
        param = callspec.params[argname]
        param_index = callspec.indices[argname]
        # If a parametrize invocation set a scope it will override
        # the static scope defined with the fixture function.
        with suppress(KeyError):
            scope = callspec._arg2scope[argname]
    else:
        # No parameter is available for this fixture.
        param = NOTSET
        param_index = 0
        has_params = fixturedef.params is not None
        fixtures_not_supported = getattr(funcitem, "nofuncargs", False)
        if has_params and fixtures_not_supported:
            msg = (
                "{name} does not support fixtures, maybe unittest.TestCase subclass?\n"
                "Node id: {nodeid}\n"
                "Function type: {typename}"
            ).format(
                name=funcitem.name,
                nodeid=funcitem.nodeid,
                typename=type(funcitem).__name__,
            )
            fail(msg, pytrace=False)
        if has_params:
            # The location reported under "Requested here" is taken from a
            # fixed depth of the call stack, so this code must stay at the
            # same call depth.
            # NOTE(review): presumably index 3 is the frame that requested
            # the fixture -- confirm against the callers.
            frame = inspect.stack()[3]
            frameinfo = inspect.getframeinfo(frame[0])
            source_path = absolutepath(frameinfo.filename)
            source_lineno = frameinfo.lineno
            # Show the path relative to rootpath when it lies below it.
            try:
                source_path_str = str(
                    source_path.relative_to(funcitem.config.rootpath)
                )
            except ValueError:
                source_path_str = str(source_path)
            msg = (
                "The requested fixture has no parameter defined for test:\n"
                " {}\n\n"
                "Requested fixture '{}' defined in:\n{}"
                "\n\nRequested here:\n{}:{}".format(
                    funcitem.nodeid,
                    fixturedef.argname,
                    getlocation(fixturedef.func, funcitem.config.rootpath),
                    source_path_str,
                    source_lineno,
                )
            )
            fail(msg, pytrace=False)
    subrequest = SubRequest(
        self, scope, param, param_index, fixturedef, _ispytest=True
    )
    # Check if a higher-level scoped fixture accesses a lower level one.
    subrequest._check_scope(argname, self._scope, scope)
    try:
        # Call the fixture function.
        fixturedef.execute(request=subrequest)
    finally:
        # Runs whether or not execute() raised.
        self._schedule_finalizers(fixturedef, subrequest)
def _schedule_finalizers(
self, fixturedef: "FixtureDef[object]", subrequest: "SubRequest"
) -> None:
# If fixture function failed it might have registered finalizers.
subrequest.node.addfinalizer(lambda: fixturedef.finish(request=subrequest))
def _check_scope(
    self,
    argname: str,
    invoking_scope: Scope,
    requested_scope: Scope,
) -> None:
    """Call ``fail`` with a ScopeMismatch message when ``invoking_scope``
    compares greater than ``requested_scope``.

    The "request" argname is never checked.
    """
    if argname == "request" or not invoking_scope > requested_scope:
        return
    # Try to report something helpful.
    factories = "\n".join(self._factorytraceback())
    fail(
        f"ScopeMismatch: You tried to access the {requested_scope.value} scoped "
        f"fixture {argname} with a {invoking_scope.value} scoped request object, "
        f"involved factories:\n{factories}",
        pytrace=False,
    )
def _factorytraceback(self) -> List[str]:
    """Return one ``path:lineno: def name(args)`` line per fixture function
    on the current fixture stack."""
    described: List[str] = []
    for fixturedef in self._get_fixturestack():
        factory = fixturedef.func
        fs, lineno = getfslineno(factory)
        if not isinstance(fs, Path):
            location = fs
        else:
            # Real paths are shown relative to the session path.
            session: Session = self._pyfuncitem.session
            location = bestrelpath(session.path, fs)
        signature = _format_args(factory)
        described.append(
            "%s:%d: def %s%s" % (location, lineno + 1, factory.__name__, signature)
        )
    return described
def __repr__(self) -> str:
return "