import asyncio import logging import os import shutil import stat import tempfile import urllib.request from pathlib import Path from typing import Optional from defence360agent.contracts.config import Packaging from defence360agent.subsys.persistent_state import save_state from defence360agent.utils import CheckRunError, check_run _HTTP_TIMEOUT = 30 logger = logging.getLogger(__name__) _SCRIPT_NAME = "imunify-doctor.sh" _SCRIPT_URL = ( "https://repo.imunify360.cloudlinux.com/defence360/" + _SCRIPT_NAME ) _SIG_URL = _SCRIPT_URL + ".sig" _TMPDIR = Path("/var/imunify360/tmp") _PUBKEY_PATHS = ( Path("/etc/pki/rpm-gpg/RPM-GPG-KEY-CloudLinux-Imunify"), Path("/etc/apt/trusted.gpg.d/RPM-GPG-KEY-CloudLinux.gpg"), ) def _find_pubkey() -> Optional[Path]: for p in _PUBKEY_PATHS: if p.is_file() and os.access(str(p), os.R_OK): return p return None def _blocking_download(url: str, dst: Path) -> None: req = urllib.request.Request(url) with urllib.request.urlopen(req, timeout=_HTTP_TIMEOUT) as resp, dst.open( "wb" ) as fp: shutil.copyfileobj(resp, fp) def _blocking_setup_workdir(): """Locate the pubkey + gpg binary and create a validated 0700 workdir. Returns (pubkey_path, workdir_path) on success or None on failure; any partial state is removed before returning. """ pubkey = _find_pubkey() if pubkey is None or not shutil.which("gpg"): return None try: _TMPDIR.mkdir(mode=0o700, parents=True, exist_ok=True) workdir = Path( tempfile.mkdtemp(prefix="imunify-doctor.", dir=str(_TMPDIR)) ) except OSError as exc: logger.info("cannot prepare workdir under %s: %s", _TMPDIR, exc) return None try: # Single lstat — atomic snapshot of mode + uid. Path.is_dir() would # follow symlinks and Path.is_symlink() would issue another lstat, so # using st.st_mode here both eliminates the extra syscalls and keeps # the symlink rejection semantically consistent with the lstat. st = workdir.lstat() if ( stat.S_ISLNK(st.st_mode) or not stat.S_ISDIR(st.st_mode) or st.st_uid != os.geteuid() ): shutil.rmtree(str(workdir), ignore_errors=True) return None (workdir / "gnupg").mkdir(mode=0o700) except OSError as exc: logger.info("workdir setup failed: %s", exc) shutil.rmtree(str(workdir), ignore_errors=True) return None return pubkey, workdir def _blocking_rmtree(p: Path) -> None: shutil.rmtree(str(p), ignore_errors=True) def _blocking_chmod(p: Path, mode: int) -> None: p.chmod(mode) async def _download(url: str, dst: Path) -> None: """Fetch *url* to *dst* without blocking the event loop. Raises urllib.error.URLError (subclass of OSError) on any HTTP/transport error, which the caller's `except OSError` already handles. """ loop = asyncio.get_event_loop() await loop.run_in_executor(None, _blocking_download, url, dst) async def _verified_remote_script() -> Optional[Path]: """ Download imunify-doctor.sh + .sig into /var/imunify360/tmp and verify the detached signature against an ephemeral keyring seeded with the CloudLinux pubkey. Returns the verified script on success or None on any failure (so the caller can fall back to the package copy). """ loop = asyncio.get_event_loop() setup = await loop.run_in_executor(None, _blocking_setup_workdir) if setup is None: return None pubkey, workdir = setup script = workdir / _SCRIPT_NAME sig = workdir / (_SCRIPT_NAME + ".sig") gpghome = workdir / "gnupg" success = False try: await _download(_SCRIPT_URL, script) await _download(_SIG_URL, sig) env = dict(os.environ, GNUPGHOME=str(gpghome)) await check_run( ["gpg", "--batch", "--quiet", "--import", str(pubkey)], env=env, ) await check_run( ["gpg", "--batch", "--quiet", "--verify", str(sig), str(script)], env=env, ) await loop.run_in_executor(None, _blocking_chmod, script, 0o700) success = True return script except (CheckRunError, OSError) as exc: logger.info("signed remote doctor fetch failed: %s", exc) return None finally: if not success: await loop.run_in_executor(None, _blocking_rmtree, workdir) async def _repo_get_doctor_key() -> str: script = await _verified_remote_script() if script is None: raise ValueError("Signed remote doctor script not available") loop = asyncio.get_event_loop() try: out = await check_run([str(script)]) finally: await loop.run_in_executor(None, _blocking_rmtree, script.parent) key = out.decode().strip() if not key: raise ValueError("Doctor key is empty") return key async def _package_get_doctor_key() -> str: dir_ = Packaging.DATADIR if not Path(dir_).is_dir(): dir_ = "/opt/imunify360/venv/share/imunify360" out = await check_run([Path(dir_, "scripts", _SCRIPT_NAME)]) key = out.decode().strip() return key async def get_doctor_key(): try: key = await _repo_get_doctor_key() except (CheckRunError, ValueError, OSError): key = await _package_get_doctor_key() save_state("doctor_key", {"doctor_key": key}) return key