# -*- coding: utf-8 -*-
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
Test utilities.
"""
from __future__ import print_function
import atexit
import contextlib
import ctypes
import errno
import functools
import gc
import inspect
import os
import platform
import random
import re
import select
import shlex
import shutil
import signal
import socket
import stat
import subprocess
import sys
import tempfile
import textwrap
import threading
import time
import unittest
import warnings
from socket import AF_INET
from socket import AF_INET6
from socket import SOCK_STREAM
import psutil
from psutil import AIX
from psutil import LINUX
from psutil import MACOS
from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
from psutil._common import bytes2human
from psutil._common import memoize
from psutil._common import print_color
from psutil._common import supports_ipv6
from psutil._compat import PY3
from psutil._compat import FileExistsError
from psutil._compat import FileNotFoundError
from psutil._compat import range
from psutil._compat import super
from psutil._compat import u
from psutil._compat import unicode
from psutil._compat import which
try:
from unittest import mock # py3
except ImportError:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
import mock # NOQA - requires "pip install mock"
if sys.version_info >= (3, 4):
import enum
else:
enum = None
if POSIX:
from psutil._psposix import wait_pid
# Public names exported by a star-import of this module, grouped by
# category. Helpers defined below but not listed here (e.g. sh(),
# safe_mkdir()) have to be imported explicitly by name.
__all__ = [
    # constants
    'APPVEYOR', 'DEVNULL', 'GLOBAL_TIMEOUT', 'TOLERANCE_SYS_MEM', 'NO_RETRIES',
    'PYPY', 'PYTHON_EXE', 'PYTHON_EXE_ENV', 'ROOT_DIR', 'SCRIPTS_DIR',
    'TESTFN_PREFIX', 'UNICODE_SUFFIX', 'INVALID_UNICODE_SUFFIX',
    'CI_TESTING', 'VALID_PROC_STATUSES', 'TOLERANCE_DISK_USAGE', 'IS_64BIT',
    "HAS_CPU_AFFINITY", "HAS_CPU_FREQ", "HAS_ENVIRON", "HAS_PROC_IO_COUNTERS",
    "HAS_IONICE", "HAS_MEMORY_MAPS", "HAS_PROC_CPU_NUM", "HAS_RLIMIT",
    "HAS_SENSORS_BATTERY", "HAS_BATTERY", "HAS_SENSORS_FANS",
    "HAS_SENSORS_TEMPERATURES", "MACOS_11PLUS",
    "MACOS_12PLUS", "COVERAGE",
    # subprocesses
    'pyrun', 'terminate', 'reap_children', 'spawn_testproc', 'spawn_zombie',
    'spawn_children_pair',
    # threads
    'ThreadTask',
    # test utils
    'unittest', 'skip_on_access_denied', 'skip_on_not_implemented',
    'retry_on_failure', 'TestMemoryLeak', 'PsutilTestCase',
    'process_namespace', 'system_namespace', 'print_sysinfo',
    # fs utils
    'chdir', 'safe_rmpath', 'create_exe', 'get_testfn',
    # os
    'get_winver', 'kernel_version',
    # sync primitives
    'call_until', 'wait_for_pid', 'wait_for_file',
    # network
    'check_net_address',
    'get_free_port', 'bind_socket', 'bind_unix_socket', 'tcp_socketpair',
    'unix_socketpair', 'create_sockets',
    # compat
    'reload_module', 'import_module_by_path',
    # others
    'warn', 'copyload_shared_lib', 'is_namedtuple',
]
# ===================================================================
# --- constants
# ===================================================================
# --- platforms
# True when the interpreter exposes the '__pypy__' builtin module.
PYPY = '__pypy__' in sys.builtin_module_names
# whether we're running this test suite on a Continuous Integration service
# (each flag is derived purely from the presence of an environment variable)
APPVEYOR = 'APPVEYOR' in os.environ
# CIBUILDWHEEL is treated the same way as GITHUB_ACTIONS.
GITHUB_ACTIONS = 'GITHUB_ACTIONS' in os.environ or 'CIBUILDWHEEL' in os.environ
CI_TESTING = APPVEYOR or GITHUB_ACTIONS
# True when the COVERAGE_RUN environment variable is set.
COVERAGE = 'COVERAGE_RUN' in os.environ
# are we a 64 bit process?
IS_64BIT = sys.maxsize > 2 ** 32
@memoize
def macos_version():
    """Return the macOS version as a ``(major, minor)`` tuple of ints.

    The version is read from ``platform.mac_ver()``. If that reports
    10.16 the value is re-read from a child interpreter started with
    ``SYSTEM_VERSION_COMPAT=0`` in its environment.
    """
    def parse(text):
        # Keep only the first two dot-separated components.
        return tuple(map(int, text.split(".")[:2]))

    version = parse(platform.mac_ver()[0])
    if version != (10, 16):
        return version

    # When built against an older macOS SDK, Python will report
    # macOS 10.16 instead of the real version.
    cmd = [
        sys.executable,
        "-sS",
        "-c",
        "import platform; print(platform.mac_ver()[0])",
    ]
    output = subprocess.check_output(
        cmd,
        env={"SYSTEM_VERSION_COMPAT": "0"},
        universal_newlines=True,
    )
    return parse(output)
# The version is only queried on macOS; on any other platform both
# flags are False (short-circuit on MACOS).
MACOS_11PLUS = MACOS and macos_version() > (10, 15)
MACOS_12PLUS = MACOS and macos_version() >= (12, 0)
# --- configurable defaults
# Every default below is more tolerant when running on a CI service
# (CI_TESTING), in order to avoid false positives.

# how many times retry_on_failure() decorator will retry
# (10 normally, 30 on CI)
NO_RETRIES = 30 if CI_TESTING else 10
# bytes tolerance for system-wide related tests
TOLERANCE_SYS_MEM = (20 if CI_TESTING else 5) * 1024 * 1024  # 5MB / 20MB
TOLERANCE_DISK_USAGE = (30 if CI_TESTING else 10) * 1024 * 1024  # 10MB / 30MB
# the timeout (in seconds) used in functions which have to wait
# (5 normally, 15 on CI)
GLOBAL_TIMEOUT = 15 if CI_TESTING else 5
# --- file names
# Disambiguate TESTFN for parallel testing by embedding our PID.
# Jython disallows @ in module names, hence the '$' variant there.
TESTFN_PREFIX = (
    '$psutil-%s-' if os.name == 'java' else '@psutil-%s-') % os.getpid()
UNICODE_SUFFIX = u("-ƒőő")
# An invalid unicode string: on Python 3 the undecodable bytes are
# carried through as surrogate escapes, on Python 2 it is a byte string.
INVALID_UNICODE_SUFFIX = (
    b"f\xc0\x80".decode('utf8', 'surrogateescape') if PY3 else "f\xc0\x80")
# True when the file system encoding is plain ASCII.
ASCII_FS = sys.getfilesystemencoding().lower() in ('ascii', 'us-ascii')
# --- paths
# Directory containing this file.
HERE = os.path.realpath(os.path.dirname(__file__))
# Two directory levels above this file.
ROOT_DIR = os.path.realpath(
    os.path.join(os.path.dirname(__file__), '..', '..'))
# Can be overridden with the PSUTIL_SCRIPTS_DIR environment variable.
SCRIPTS_DIR = os.environ.get(
    "PSUTIL_SCRIPTS_DIR", os.path.join(ROOT_DIR, 'scripts'))

# --- support

# System-wide psutil functions which may not exist on this platform.
HAS_CPU_FREQ = hasattr(psutil, "cpu_freq")
HAS_GETLOADAVG = hasattr(psutil, "getloadavg")
HAS_NET_IO_COUNTERS = hasattr(psutil, "net_io_counters")
HAS_SENSORS_BATTERY = hasattr(psutil, "sensors_battery")
HAS_SENSORS_FANS = hasattr(psutil, "sensors_fans")
HAS_SENSORS_TEMPERATURES = hasattr(psutil, "sensors_temperatures")

# psutil.Process methods which may not exist on this platform.
HAS_CPU_AFFINITY = hasattr(psutil.Process, "cpu_affinity")
HAS_ENVIRON = hasattr(psutil.Process, "environ")
HAS_IONICE = hasattr(psutil.Process, "ionice")
HAS_MEMORY_MAPS = hasattr(psutil.Process, "memory_maps")
HAS_PROC_CPU_NUM = hasattr(psutil.Process, "cpu_num")
HAS_PROC_IO_COUNTERS = hasattr(psutil.Process, "io_counters")
HAS_RLIMIT = hasattr(psutil.Process, "rlimit")
HAS_THREADS = hasattr(psutil.Process, "threads")

# A battery counts as present only if the API exists, returns something
# truthy and does not raise.
HAS_BATTERY = False
if HAS_SENSORS_BATTERY:
    try:
        HAS_BATTERY = bool(psutil.sensors_battery())
    except Exception:
        pass

HAS_CONNECTIONS_UNIX = POSIX and not SUNOS
# True on macOS / AIX when not running as root (uid 0).
SKIP_SYSCONS = (MACOS or AIX) and os.getuid() != 0
# --- misc
def _get_py_exe():
    """Return a ``(python_exe, env)`` tuple for spawning interpreters.

    ``env`` is a copy of ``os.environ``, possibly amended. How the
    executable path is chosen depends on the platform; on macOS a
    ValueError is raised if no runnable candidate is found.
    """
    def attempt(exe):
        # A candidate is accepted only if "<exe> -V" runs successfully.
        try:
            subprocess.check_call(
                [exe, "-V"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except Exception:
            return None
        return exe

    env = os.environ.copy()

    # On Windows, starting with python 3.7, virtual environments use a
    # venv launcher startup process. This does not play well when
    # counting spawned processes, or when relying on the PID of the
    # spawned process to do some checks, e.g. connections check per PID.
    # Let's use the base python in this case.
    base = getattr(sys, "_base_executable", None)
    if WINDOWS and sys.version_info >= (3, 7) and base is not None:
        # We need to set __PYVENV_LAUNCHER__ to sys.executable for the
        # base python executable to know about the environment.
        env["__PYVENV_LAUNCHER__"] = sys.executable
        return base, env

    if GITHUB_ACTIONS:
        return sys.executable, env

    if not MACOS:
        exe = os.path.realpath(sys.executable)
        assert os.path.exists(exe), exe
        return exe, env

    # macOS: try each candidate in turn. They are wrapped in lambdas so
    # that a later one is computed only if the earlier ones failed.
    candidates = (
        lambda: sys.executable,
        lambda: os.path.realpath(sys.executable),
        lambda: which("python%s.%s" % sys.version_info[:2]),
        lambda: psutil.Process().exe(),
    )
    for candidate in candidates:
        exe = attempt(candidate())
        if exe:
            return exe, env
    raise ValueError("can't find python exe real abspath")
# Interpreter path and environment used when spawning python subprocesses.
PYTHON_EXE, PYTHON_EXE_ENV = _get_py_exe()
# Shared handle on os.devnull (default stdin / stdout of spawn_testproc());
# closed at interpreter exit.
DEVNULL = open(os.devnull, 'r+')
atexit.register(DEVNULL.close)
# The value of every psutil attribute whose name starts with "STATUS_".
VALID_PROC_STATUSES = [getattr(psutil, x) for x in dir(psutil)
                       if x.startswith('STATUS_')]
# Falls back to a unique placeholder object where the socket module
# has no AF_UNIX.
AF_UNIX = getattr(socket, "AF_UNIX", object())
# subprocess.Popen instances created by spawn_testproc() and sh();
# drained by reap_children().
_subprocesses_started = set()
# PIDs recorded by spawn_children_pair() and spawn_zombie();
# drained by reap_children().
_pids_started = set()
# ===================================================================
# --- threads
# ===================================================================
class ThreadTask(threading.Thread):
    """A thread task which does nothing except staying alive.

    It can be used as a context manager: the thread is started on
    enter and stopped on exit.
    """

    def __init__(self):
        super().__init__()
        # Set by run() once the thread is up; used by start() to block
        # until then.
        self._flag = threading.Event()
        # Seconds slept between two checks of the stop request.
        self._interval = 0.001
        self._running = False

    def start(self):
        """Start the thread and return once it is actually running.
        The thread stays alive until an explicit stop() request.
        Raise ValueError if it's already running.
        """
        if self._running:
            raise ValueError("already started")
        threading.Thread.start(self)
        self._flag.wait()

    def run(self):
        """Thread body: signal readiness, then idle until stopped."""
        self._running = True
        self._flag.set()
        while self._running:
            time.sleep(self._interval)

    def stop(self):
        """Ask the thread to stop and wait until it has terminated.
        Raise ValueError if it's not running.
        """
        if not self._running:
            raise ValueError("already stopped")
        self._running = False
        self.join()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        self.stop()

    def __repr__(self):
        return '<%s running=%s at %#x>' % (
            self.__class__.__name__, self._running, id(self))
# ===================================================================
# --- subprocesses
# ===================================================================
def _reap_children_on_err(fun):
@functools.wraps(fun)
def wrapper(*args, **kwargs):
try:
return fun(*args, **kwargs)
except Exception:
reap_children()
raise
return wrapper
@_reap_children_on_err
def spawn_testproc(cmd=None, **kwds):
    """Spawn a subprocess and return it as a subprocess.Popen instance.

    With no "cmd" a python interpreter is started which creates a
    marker file and then sleeps for 60 secs; this function returns only
    after the marker file has shown up, i.e. once the child is in a
    reasonably initialized state. If "cmd" is given it is run instead,
    and the function waits for its PID to appear.
    By default stdin and stdout are redirected to /dev/null, cwd is the
    current directory and the environment is PYTHON_EXE_ENV; any of
    these can be overridden via keyword arguments, which are passed to
    subprocess.Popen.
    The process is registered for cleanup on reap_children().
    """
    defaults = (
        ("stdin", DEVNULL),
        ("stdout", DEVNULL),
        ("cwd", os.getcwd()),
        ("env", PYTHON_EXE_ENV),
    )
    for key, value in defaults:
        kwds.setdefault(key, value)
    if WINDOWS:
        # CREATE_NO_WINDOW prevents the subprocess to open error
        # dialogs. This will also cause stderr to be suppressed, which
        # is suboptimal in order to debug broken tests.
        kwds.setdefault("creationflags", 0x8000000)

    if cmd is not None:
        sproc = subprocess.Popen(cmd, **kwds)
        _subprocesses_started.add(sproc)
        wait_for_pid(sproc.pid)
        return sproc

    testfn = get_testfn()
    try:
        safe_rmpath(testfn)
        pyline = (
            "from time import sleep;"
            "open(r'%s', 'w').close();"
            "sleep(60);"
        ) % testfn
        sproc = subprocess.Popen([PYTHON_EXE, "-c", pyline], **kwds)
        _subprocesses_started.add(sproc)
        # The (empty) marker file tells us the interpreter is up.
        wait_for_file(testfn, delete=True, empty=True)
    finally:
        safe_rmpath(testfn)
    return sproc
@_reap_children_on_err
def spawn_children_pair():
    """Create a subprocess which creates another one as in:
    A (us) -> B (child) -> C (grandchild).
    Return a (child, grandchild) tuple of psutil.Process instances.
    The grandchild writes its PID to a file in the current directory
    and then sleeps for 60 secs; the child waits for it. The grandchild
    PID is registered for cleanup on reap_children().
    """
    tfile = None
    testfn = get_testfn(dir=os.getcwd())
    try:
        s = textwrap.dedent("""\
            import subprocess, os, sys, time
            s = "import os, time;"
            s += "f = open('%s', 'w');"
            s += "f.write(str(os.getpid()));"
            s += "f.close();"
            s += "time.sleep(60);"
            p = subprocess.Popen([r'%s', '-c', s])
            p.wait()
            """ % (os.path.basename(testfn), PYTHON_EXE))
        # On Windows if we create a subprocess with CREATE_NO_WINDOW flag
        # set (which is the default) a "conhost.exe" extra process will be
        # spawned as a child. We don't want that.
        extra = {"creationflags": 0} if WINDOWS else {}
        subp, tfile = pyrun(s, **extra)
        child = psutil.Process(subp.pid)
        # The file content is the grandchild's PID.
        pid_data = wait_for_file(testfn, delete=True, empty=False)
        grandchild_pid = int(pid_data)
        _pids_started.add(grandchild_pid)
        return (child, psutil.Process(grandchild_pid))
    finally:
        safe_rmpath(testfn)
        if tfile is not None:
            safe_rmpath(tfile)
def spawn_zombie():
    """Create a zombie process and return a (parent, zombie) tuple,
    where parent is what pyrun() returned for the forking script and
    zombie is a psutil.Process.
    In order to kill the zombie parent must be terminate()d first, then
    zombie must be wait()ed on.
    POSIX only.
    """
    assert psutil.POSIX
    unix_file = get_testfn()
    # The script forks: the parent sleeps, while the child sends its
    # own PID to us over a UNIX socket and then exits.
    src = textwrap.dedent("""\
        import os, sys, time, socket, contextlib
        child_pid = os.fork()
        if child_pid > 0:
            time.sleep(3000)
        else:
            # this is the zombie process
            s = socket.socket(socket.AF_UNIX)
            with contextlib.closing(s):
                s.connect('%s')
                if sys.version_info < (3, ):
                    pid = str(os.getpid())
                else:
                    pid = bytes(str(os.getpid()), 'ascii')
                s.sendall(pid)
        """ % unix_file)
    tfile = None
    sock = bind_unix_socket(unix_file)
    try:
        sock.settimeout(GLOBAL_TIMEOUT)
        parent, tfile = pyrun(src)
        conn, _ = sock.accept()
        with contextlib.closing(conn):
            select.select([conn.fileno()], [], [], GLOBAL_TIMEOUT)
            zpid = int(conn.recv(1024))
            _pids_started.add(zpid)
            zombie = psutil.Process(zpid)
            # Wait until the child is actually reported as a zombie.
            call_until(zombie.status, "ret == psutil.STATUS_ZOMBIE")
            return (parent, zombie)
    finally:
        sock.close()
        safe_rmpath(unix_file)
        if tfile is not None:
            safe_rmpath(tfile)
@_reap_children_on_err
def pyrun(src, **kwds):
    """Run python 'src' code string in a separate interpreter.
    The code is first written to a test file, which is then executed
    via spawn_testproc() (extra keyword arguments are passed to it;
    stdout and stderr default to None).
    Returns a (subprocess.Popen instance, source file path) tuple.
    If anything fails the source file is removed before re-raising.
    """
    for stream in ("stdout", "stderr"):
        kwds.setdefault(stream, None)
    script = get_testfn()
    try:
        with open(script, 'wt') as fobj:
            fobj.write(src)
        # The file is closed at this point, hence fully written.
        proc = spawn_testproc([PYTHON_EXE, script], **kwds)
        wait_for_pid(proc.pid)
    except Exception:
        safe_rmpath(script)
        raise
    else:
        return (proc, script)
@_reap_children_on_err
def sh(cmd, **kwds):
    """Run cmd in a subprocess and return its stdout, minus one
    trailing newline if present.
    A str command is split with shlex.split(). Extra keyword arguments
    are passed to subprocess.Popen.
    Raises RuntimeError (carrying stderr) if the exit code is not 0.
    If the command succeeds but wrote to stderr, that is reported
    via warn().
    """
    defaults = {
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "universal_newlines": True,
        # Prevents subprocess to open error dialogs in case of error.
        "creationflags": 0x8000000 if WINDOWS else 0,
    }
    for key, value in defaults.items():
        kwds.setdefault(key, value)
    if isinstance(cmd, str):
        cmd = shlex.split(cmd)
    p = subprocess.Popen(cmd, **kwds)
    _subprocesses_started.add(p)
    # A timeout is passed to communicate() on Python 3 only.
    comm_kwds = {"timeout": GLOBAL_TIMEOUT} if PY3 else {}
    stdout, stderr = p.communicate(**comm_kwds)
    if p.returncode != 0:
        raise RuntimeError(stderr)
    if stderr:
        warn(stderr)
    return stdout[:-1] if stdout.endswith('\n') else stdout
def terminate(proc_or_pid, sig=signal.SIGTERM, wait_timeout=GLOBAL_TIMEOUT):
    """Terminate a process and wait() for it.

    `proc_or_pid` can be a PID (int) or an instance of psutil.Process,
    subprocess.Popen or psutil.Popen; any other type raises TypeError.
    `sig` is the signal which gets sent, `wait_timeout` is the timeout
    passed to the wait() calls.
    If it's a subprocess.Popen() or psutil.Popen() instance also closes
    its stdin / stdout / stderr fds.
    On POSIX a PID is wait()ed even if the process is already gone
    (kills zombies). Sending the signal to a process which does not
    exist is not an error.
    Before returning, it asserts that the PID no longer exists.

    NOTE(review): the return value is not consistently the exit status.
    It is the result of psutil.Process.wait() for a subprocess.Popen on
    Windows, the result of wait_pid() for an already-gone PID on POSIX,
    and None in the other cases.
    """
    def wait(proc, timeout):
        # On Python 2 subprocess.Popen.wait() is called without a
        # timeout (presumably because it does not accept one).
        if isinstance(proc, subprocess.Popen) and not PY3:
            proc.wait()
        else:
            proc.wait(timeout)
        if WINDOWS and isinstance(proc, subprocess.Popen):
            # Otherwise PID may still hang around.
            try:
                return psutil.Process(proc.pid).wait(timeout)
            except psutil.NoSuchProcess:
                pass

    def sendsig(proc, sig):
        # XXX: otherwise the build hangs for some reason.
        if MACOS and GITHUB_ACTIONS:
            sig = signal.SIGKILL
        # If the process received SIGSTOP, SIGCONT is necessary first,
        # otherwise SIGTERM won't work.
        if POSIX and sig != signal.SIGKILL:
            proc.send_signal(signal.SIGCONT)
        proc.send_signal(sig)

    def term_subprocess_proc(proc, timeout):
        # subprocess.Popen: tolerate "no such process" (ESRCH) and, on
        # Windows, winerror 6; any other OSError propagates.
        try:
            sendsig(proc, sig)
        except OSError as err:
            if WINDOWS and err.winerror == 6:  # "invalid handle"
                pass
            elif err.errno != errno.ESRCH:
                raise
        return wait(proc, timeout)

    def term_psutil_proc(proc, timeout):
        # psutil.Process / psutil.Popen: a process which is already
        # gone is fine, we still wait() for it.
        try:
            sendsig(proc, sig)
        except psutil.NoSuchProcess:
            pass
        return wait(proc, timeout)

    def term_pid(pid, timeout):
        try:
            proc = psutil.Process(pid)
        except psutil.NoSuchProcess:
            # Needed to kill zombies.
            if POSIX:
                return wait_pid(pid, timeout)
        else:
            return term_psutil_proc(proc, timeout)

    def flush_popen(proc):
        # Close whichever of the 3 standard streams were opened.
        if proc.stdout:
            proc.stdout.close()
        if proc.stderr:
            proc.stderr.close()
        # Flushing a BufferedWriter may raise an error.
        if proc.stdin:
            proc.stdin.close()

    p = proc_or_pid
    try:
        # Dispatch on the type of the argument.
        if isinstance(p, int):
            return term_pid(p, wait_timeout)
        elif isinstance(p, (psutil.Process, psutil.Popen)):
            return term_psutil_proc(p, wait_timeout)
        elif isinstance(p, subprocess.Popen):
            return term_subprocess_proc(p, wait_timeout)
        else:
            raise TypeError("wrong type %r" % p)
    finally:
        # Runs on both success and failure: release the pipes and make
        # sure the process is really gone.
        if isinstance(p, (subprocess.Popen, psutil.Popen)):
            flush_popen(p)
        pid = p if isinstance(p, int) else p.pid
        assert not psutil.pid_exists(pid), pid
def reap_children(recursive=False):
    """Terminate and wait() any subprocess started by this test suite
    and any children currently running, ensuring that no processes stick
    around to hog resources.
    If recursive is True it also tries to terminate and wait()
    all grandchildren started by this process.
    Children which survive past GLOBAL_TIMEOUT are reported via warn()
    and sent SIGKILL.
    """
    # Get the children here before terminating them, as in case of
    # recursive=True we don't want to lose the intermediate reference
    # pointing to the grandchildren.
    children = psutil.Process().children(recursive=recursive)

    # Terminate the registered subprocess.Popen instances, emptying
    # the registry as we go.
    while _subprocesses_started:
        terminate(_subprocesses_started.pop())

    # Same for the registered PIDs.
    while _pids_started:
        terminate(_pids_started.pop())

    if not children:
        return

    # Terminate children.
    for child in children:
        terminate(child, wait_timeout=None)
    _, survivors = psutil.wait_procs(children, timeout=GLOBAL_TIMEOUT)
    for child in survivors:
        warn("couldn't terminate process %r; attempting kill()" % child)
        terminate(child, sig=signal.SIGKILL)
# ===================================================================
# --- OS
# ===================================================================
def kernel_version():
    """Return the kernel version as a 3-int tuple such as (2, 6, 36).

    It is parsed from the leading run of digits and dots of
    os.uname()[2]; missing minor / micro components default to 0 and
    components past the third are ignored.
    Raise NotImplementedError if not on POSIX and ValueError if the
    release string does not start with a digit or a dot.
    """
    if not POSIX:
        raise NotImplementedError("not POSIX")
    uname = os.uname()[2]
    # Locate the end of the leading "digits and dots" prefix.
    end = 0
    while end < len(uname) and (uname[end].isdigit() or uname[end] == '.'):
        end += 1
    s = uname[:end]
    if not s:
        raise ValueError("can't parse %r" % uname)
    nums = [int(x) for x in s.split('.')[:3]]
    # Pad with zeros up to (major, minor, micro).
    nums.extend([0] * (3 - len(nums)))
    return tuple(nums)
def get_winver():
    """Return a (major, minor, service pack) tuple built from
    sys.getwindowsversion().
    Raise NotImplementedError if not on Windows.
    """
    if not WINDOWS:
        raise NotImplementedError("not WINDOWS")
    wv = sys.getwindowsversion()
    if hasattr(wv, 'service_pack_major'):  # python >= 2.7
        sp = wv.service_pack_major or 0
    else:
        # Fall back to a trailing " <digit>" in the 5th field.
        found = re.search(r"\s\d$", wv[4])
        sp = int(found.group(0)) if found else 0
    return (wv[0], wv[1], sp)
# ===================================================================
# --- sync primitives
# ===================================================================
class retry(object):
    """A retry decorator.

    The decorated function is called again whenever it raises
    `exception`, until it succeeds or the attempts run out: for at most
    `timeout` seconds, or at most `retries` times (the two are mutually
    exclusive), or forever if neither is given. `interval` is the
    number of seconds slept after each failure (None means no sleep)
    and `logfun`, if given, is called with each caught exception.
    When the attempts run out the last exception is raised.
    """

    def __init__(self,
                 exception=Exception,
                 timeout=None,
                 retries=None,
                 interval=0.001,
                 logfun=None,
                 ):
        if timeout and retries:
            raise ValueError("timeout and retries args are mutually exclusive")
        self.exception = exception
        self.timeout = timeout
        self.retries = retries
        self.interval = interval
        self.logfun = logfun

    def __iter__(self):
        # Yields once per allowed attempt.
        if self.timeout:
            deadline = time.time() + self.timeout
            while time.time() < deadline:
                yield
            return
        if self.retries:
            for _ in range(self.retries):
                yield
            return
        while True:
            yield

    def sleep(self):
        if self.interval is None:
            return
        time.sleep(self.interval)

    def __call__(self, fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            exc = None
            for _ in self:
                try:
                    return fun(*args, **kwargs)
                except self.exception as err:
                    # Keep a reference: it is needed after the loop.
                    exc = err
                    if self.logfun is not None:
                        self.logfun(exc)
                    self.sleep()
            # Out of attempts: propagate the last failure.
            if PY3:
                raise exc
            else:
                raise

        # This way the user of the decorated function can change config
        # parameters.
        wrapper.decorator = self
        return wrapper
@retry(exception=psutil.NoSuchProcess, timeout=GLOBAL_TIMEOUT,
       interval=0.001, logfun=None)
def wait_for_pid(pid):
    """Wait for pid to show up in the process list then return.
    Used in the test suite to give the subprocess time to initialize.
    The lookup is retried for up to GLOBAL_TIMEOUT seconds.
    """
    psutil.Process(pid)
    if not WINDOWS:
        return
    # give it some more time to allow better initialization
    time.sleep(0.01)
@retry(exception=(FileNotFoundError, AssertionError), timeout=GLOBAL_TIMEOUT,
       interval=0.001, logfun=None)
def wait_for_file(fname, delete=True, empty=False):
    """Wait for a file to be written on disk and return its content
    as bytes.
    Unless `empty` is True the file must also have some content.
    If `delete` is True the file is removed once read.
    The read is retried for up to GLOBAL_TIMEOUT seconds.
    """
    with open(fname, "rb") as fobj:
        content = fobj.read()
    # No content yet: the failed assertion triggers a retry.
    assert empty or content
    if delete:
        safe_rmpath(fname)
    return content
@retry(exception=AssertionError, logfun=None, timeout=GLOBAL_TIMEOUT,
       interval=0.001)
def call_until(fun, expr):
    """Keep calling function for timeout secs and exit if eval()
    expression is True.

    `fun` is called with no arguments. `expr` is a string evaluated
    with eval() in this function's scope, so it can refer to the value
    returned by `fun` as "ret" (e.g. "ret == 0"). On success that value
    is returned. If the expression is still false once GLOBAL_TIMEOUT
    has elapsed, the AssertionError propagates to the caller.
    """
    ret = fun()
    # NOTE: eval() executes arbitrary code; `expr` must be a trusted
    # string. The local name "ret" is part of the contract with callers.
    assert eval(expr)
    return ret
# ===================================================================
# --- fs
# ===================================================================
def safe_rmpath(path):
    """Convenience function for removing temporary test files or dirs.

    `path` can be a file or a directory (removed recursively). A path
    which does not exist, or which disappears while being removed, is
    silently ignored. On non-POSIX platforms the removal is retried for
    up to GLOBAL_TIMEOUT seconds; if it still fails, the last error is
    raised.
    """
    def retry_fun(fun):
        # On Windows it could happen that the file or directory has
        # open handles or references preventing the delete operation
        # to succeed immediately, so we retry for a while. See:
        # https://bugs.python.org/issue33240
        stop_at = time.time() + GLOBAL_TIMEOUT
        while True:
            try:
                return fun()
            except FileNotFoundError as exc:
                # The path itself is gone: nothing left to do. Before
                # this check the loop kept spinning until the timeout
                # and then failed on an unbound "err".
                if not os.path.lexists(path):
                    return None
                # Otherwise something inside the tree vanished while
                # it was being removed: try again.
                err = exc
            except WindowsError as exc:
                err = exc
                warn("ignoring %s" % (str(err)))
            # "err" is always bound here, as at least one attempt failed.
            if time.time() >= stop_at:
                raise err
            time.sleep(0.01)

    try:
        st = os.stat(path)
    except FileNotFoundError:
        return
    if stat.S_ISDIR(st.st_mode):
        fun = functools.partial(shutil.rmtree, path)
    else:
        fun = functools.partial(os.remove, path)
    if POSIX:
        try:
            fun()
        except FileNotFoundError:
            # Removed by someone else between stat() and now.
            pass
    else:
        retry_fun(fun)
def safe_mkdir(dir):
    """Create directory `dir`, doing nothing if the path is already
    there (FileExistsError is ignored).
    """
    try:
        os.mkdir(dir)
    except FileExistsError:
        return None
    else:
        return None
@contextlib.contextmanager
def chdir(dirname):
    """Context manager which changes the current directory to
    `dirname` for the duration of the "with" block, then switches back
    to the previous one, also if an exception was raised.
    """
    previous = os.getcwd()
    try:
        os.chdir(dirname)
        yield
    finally:
        os.chdir(previous)
def create_exe(outpath, c_code=None):
"""Creates an executable file in the given location."""
assert not os.path.exists(outpath), outpath
if c_code:
if not which("gcc"):
raise unittest.SkipTest("gcc is not installed")
if isinstance(c_code, bool): # c_code is True
c_code = textwrap.dedent(
"""
#include