# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT """ This module contains helpful utility functions for X-Ray Manager """ import dbm import errno import pwd import re from getpass import getuser import fcntl import logging import os import shelve import shutil import shlex import subprocess import platform import time import xml.etree.ElementTree as ET from contextlib import contextmanager from datetime import date, timedelta from functools import wraps from glob import glob from socket import (socket, fromfd, AF_UNIX, SOCK_STREAM, SOCK_DGRAM, AF_INET, AF_INET6) from typing import Callable, List, Optional import sentry_sdk from sentry_sdk.integrations.atexit import AtexitIntegration from sentry_sdk.integrations.logging import LoggingIntegration from clcommon.const import Feature from clcommon.cpapi import is_panel_feature_supported, get_cp_description, getCPName, is_wp2_environment from clcommon.lib.cledition import get_cl_edition_readable from clcommon.ui_config import UIConfig from clcommon.clpwd import drop_privileges from clcommon.utils import get_rhn_systemid_value from clcommon.lib.network import get_hostname from xray.internal.clwpos_safe_imports import php_get_vhost_versions_user from xray import gettext as _ from .constants import ( sentry_dsn, local_tasks_storage, agent_file, logging_level, jwt_token_location, user_agent_sock ) from .exceptions import XRayError, XRayManagerExit logger = logging.getLogger('utils') subprocess_errors = ( OSError, ValueError, subprocess.SubprocessError ) # --------- DECORATORS --------- def skeleton_update(func: Callable) -> Callable: """ Decorator aimed to update ini file in cagefs-skeleton Applies to task.add nd task.remove """ def update(*args): """ Copy ini file to cagefs-skeleton Action takes place for cPanel ea-php only """ original_ini = os.path.join(args[0].ini_location, 'xray.ini') if original_ini.startswith('/opt/cpanel') and glob( '/usr/share/cagefs'): skeleton_ini = os.path.join('/usr/share/cagefs/.cpanel.multiphp', original_ini[1:]) elif original_ini.startswith('/usr/local') and glob( '/usr/share/cagefs-skeleton'): skeleton_ini = os.path.join('/usr/share/cagefs-skeleton', original_ini[1:]) if not os.path.exists(os.path.dirname(skeleton_ini)): os.mkdir(os.path.dirname(skeleton_ini)) else: return if not os.path.exists(original_ini): if os.path.lexists(skeleton_ini): if os.path.islink(skeleton_ini): logger.warning('Refusing to unlink symlink in cagefs-skeleton', extra={'xray_ini': skeleton_ini}) return try: os.unlink(skeleton_ini) except OSError as e: logger.warning('Failed to unlink ini in cagefs-skeleton', extra={'xray_ini': skeleton_ini, 'err': str(e)}) else: try: if os.path.islink(skeleton_ini): logger.warning('Refusing to copy over symlink in cagefs-skeleton', extra={'xray_ini': skeleton_ini}) return # Read source first so a transient source-read failure # cannot truncate-then-fail the existing skeleton file. with open(original_ini, 'rb') as src: src_bytes = src.read() fd = os.open(skeleton_ini, os.O_WRONLY | os.O_CREAT | os.O_TRUNC | os.O_NOFOLLOW, 0o644) try: os.write(fd, src_bytes) finally: os.close(fd) except OSError as e: logger.warning('Failed to copy ini into cagefs-skeleton', extra={'xray_ini': original_ini, 'err': str(e)}) @wraps(func) def wrapper(*args, **kwargs): """ Wraps func """ func(*args, **kwargs) update(*args) return wrapper def dbm_storage_update(func: Callable) -> Callable: """ Decorator aimed to update DBM storage with fake_id:real_id mapping Applies to task.add nd task.remove """ def update(*args): """ Update DBM storage contents """ task_instance = args[0] with dbm_storage(local_tasks_storage) as task_storage: task_storage[task_instance.fake_id] = task_instance.task_id def remove(*args): """ Remove task from DBM storage """ with dbm_storage(local_tasks_storage) as task_storage: try: del task_storage[args[0].fake_id.encode()] except KeyError: # ignore absence of item during removal pass @wraps(func) def wrapper(*args, **kwargs): """ Wraps func """ # add task id into DBM storage as early as possible try: if func.__name__ == 'add': update(*args) except RuntimeError as e: raise XRayError(str(e)) try: func(*args, **kwargs) except Exception: # cleanup recently added task id from DBM storage in case of # any accidental fails during add procedure if func.__name__ == 'add': remove(*args) raise # during task removal cleanup task from DBM storage as late as possible try: if func.__name__ == 'remove': remove(*args) except RuntimeError as e: raise XRayError(str(e)) return wrapper def check_jwt(func: Callable) -> Callable: """ Decorator aimed to validate given JWT token """ def check(): """ Check if retrieved JWT token is valid """ is_xray_supported() @wraps(func) def wrapper(*args, **kwargs): """ Wraps func """ token = func(*args, **kwargs) check() return token return wrapper # --------- FUNCTIONS --------- def timestamp() -> int: """ Get current epoch timestamp as int :return: timestamp as int """ return int(time.time()) def prev_date() -> date: """ Pick a yesterday date :return: a datetime.date object """ return date.today() - timedelta(days=1) def date_of_timestamp(ts: int) -> date: """ Get the datetime.date object for given int timestamp :param ts: timestamp :return: datetime.date object """ return date.fromtimestamp(ts) def get_formatted_date() -> str: """ Get a formatted representation of yesterday date :return: str date in the form of dd/mm/YYYY """ return prev_date().strftime("%d/%m/%Y") def get_html_formatted_links(links: List[dict]) -> str: """ HTML formatted links """ html_item = '
{num}) {domain}
' return '\n'.join([html_item.format(num=i, link=v, domain=k) for i, l in enumerate(links, 1) for k, v in l.items()]) def get_text_formatted_links(links: List[dict]) -> str: """ Formatted links """ text_item = '{num}) {dom}: {link}' return '\n'.join([text_item.format(num=i, dom=k, link=v) for i, l in enumerate(links, 1) for k, v in l.items()]) def read_sys_id() -> str: """ Obtain system ID from /etc/sysconfig/rhn/systemid :return: system ID without ID- prefix """ try: tree = ET.parse('/etc/sysconfig/rhn/systemid') root = tree.getroot() whole_id = root.find(".//member[name='system_id']/value/string").text with sentry_sdk.configure_scope() as scope: scope.set_tag("system_id", whole_id) return whole_id.lstrip('ID-') except (OSError, ET.ParseError) as e: raise XRayError(_('Failed to retrieve system_id')) from e def write_sys_id(sys_id: str, agent_system_id_path: str = agent_file) -> None: """ Write system_id into file /usr/share/alt-php-xray/agent_sys_id """ fd = os.open(agent_system_id_path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC | os.O_NOFOLLOW, 0o600) try: os.write(fd, sys_id.encode()) finally: os.close(fd) def read_agent_sys_id(agent_system_id_path: str = agent_file) -> str: """ Read system_id saved by agent during its initialization """ try: with open(agent_system_id_path) as agent_sysid_file: return agent_sysid_file.read().strip() except OSError as e: logger.info( "Failed to retrieve agent's system_id, returning real one", extra={'err': str(e)}) return read_sys_id() def read_agent_sys_id() -> str: """ Read system_id saved by agent during its initialization """ try: with open(agent_file) as agent_sysid_file: return agent_sysid_file.read().strip() except OSError as e: logger.info( "Failed to retrieve agent's system_id, returning real one", extra={'err': str(e)}) return read_sys_id() # raise XRayError("Failed to retrieve agent's system_id") from e def is_xray_supported() -> Optional[bool]: """Raise XRayError in case of detected non-supported edition""" is_supported = is_panel_feature_supported(Feature.XRAY) if not is_supported: current_edition = get_cl_edition_readable() current_panel = getCPName() logger.info('Current CloudLinux edition: %s or Control Panel: %s is not supported by X-Ray', str(current_edition), str(current_panel)) raise XRayManagerExit(_('Current CloudLinux edition: {} or ' 'Control Panel: {} is not supported by X-Ray'.format(current_edition, current_panel))) return True @check_jwt def read_jwt_token() -> str: """ Obtain jwt token from /etc/sysconfig/rhn/jwt.token :return: token read """ try: with open(jwt_token_location) as token_file: return token_file.read().strip() except (OSError, IOError): raise XRayError(_('JWT file %s read error') % str(jwt_token_location)) def pkg_version(filepath: str) -> Optional[str]: """Get version of package from file. alt-php-xray supported""" try: with open(filepath) as v_file: version = v_file.read().strip() except OSError: return # remove dist suffix return '.'.join(version.split('.')[:2]) or '0.0-0' def xray_version() -> Optional[str]: """Get version of alt-php-xray package""" return pkg_version('/usr/share/alt-php-xray/version') def sentry_init() -> None: """ Initialize Sentry client shutdown_timeout=0 disables Atexit integration as stated in docs: 'it’s easier to disable it by setting the shutdown_timeout to 0' https://docs.sentry.io/platforms/python/default-integrations/#atexit On the other hand, docs say, that 'Setting this value too low will most likely cause problems for sending events from command line applications' https://docs.sentry.io/error-reporting/configuration/?platform=python#shutdown-timeout """ def add_info(event: dict, hint: dict) -> dict: """ Add extra data into sentry event :param event: original event :param hint: additional data caught :return: updated event """ event['extra'].update({'xray.version': '0.6-48.el9'}) extra_data = event.get('extra', {}) fingerprint = extra_data.get('fingerprint', None) if fingerprint: event['fingerprint'] = [fingerprint] return event def set_tags(sentry_scope): cp_description = get_cp_description() cp_version = cp_description.get('version') if cp_description else None cp_name = cp_description.get('name') if cp_description else None cp_product = 'WP2' if is_wp2_environment() else None tags = (('Control Panel Name', cp_name), ('Control Panel Version', cp_version), ('Control Panel Product', cp_product), ('kernel', platform.release()), ('CloudLinux version', get_rhn_systemid_value("os_release")), ('Cloudlinux edition', get_cl_edition_readable()), ('Architecture', get_rhn_systemid_value("architecture")), ('ip_address', ip_addr()), ('username', getuser()) ) # set_tags does not work in current version of sentry_sdk # https://github.com/getsentry/sentry-python/issues/1344 for tag in tags: sentry_scope.set_tag(*tag) def nope(pending, timeout) -> None: pass def try_get_ip(address_family, private_ip): """ address_family - we can choose constants represent the address (and protocol) families (AF_INET for ipv4 and AF_INET6 for ipv6) private_ip - specify some private ip address. For instance: ipv4 -> 10.255.255.255 or ipv6 -> fc00:: """ with socket(address_family, SOCK_DGRAM) as s: try: s.connect((private_ip, 1)) IP = s.getsockname()[0] except Exception: IP = None return IP def ip_addr() -> str: """ Retrieve server's IP """ ipversions = (AF_INET, '10.255.255.255'), (AF_INET6, 'fc00::') for addr_fam, priv_ip in ipversions: ip = try_get_ip(addr_fam, priv_ip) if ip: return ip return '127.0.0.1' sentry_logging = LoggingIntegration(level=logging.INFO, event_level=logging.WARNING) xray_ver = xray_version() or 'alt-php-xray@0.6-48.el9' silent_atexit = AtexitIntegration(callback=nope) sentry_sdk.init(dsn=sentry_dsn, before_send=add_info, release=xray_ver, max_value_length=10000, integrations=[sentry_logging, silent_atexit]) with sentry_sdk.configure_scope() as scope: scope.user = { "id": get_rhn_systemid_value("system_id") or ip_addr() or get_hostname() or getuser() } try: set_tags(scope) except Exception: pass def configure_logging(logname: str, level=logging_level) -> Optional[str]: """ Configure logging and Sentry :param logname: path to log :return: logpath """ levels = { 'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL } sentry_init() try: handlers = [ logging.FileHandler(filename=logname) ] if level == 'debug': handlers.append(logging.StreamHandler()) logging.basicConfig(level=levels.get(level, logging.INFO), format='%(asctime)s [%(threadName)s:%(name)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', handlers=handlers) except OSError: # dummy logging logging.basicConfig(handlers=[logging.NullHandler()]) return try: os.chmod(logname, 0o600) except PermissionError: pass return logname _safe_username_pattern = re.compile(r'^[a-zA-Z0-9_][a-zA-Z0-9._-]{0,31}$') def validate_system_user(username: str) -> None: """Validate that username is a real system user. Raises ValueError with a clear message if username is empty, has an invalid format, or does not exist in the system user database. """ if not username: raise ValueError('username must not be empty') if not _safe_username_pattern.match(username): raise ValueError(f'Invalid username: {username!r}') try: pwd.getpwnam(username) except KeyError: raise ValueError(f'system user does not exist: {username!r}') from None def build_clwpos_user_cmd(username: str, clwpos_args: list) -> list: """Build subprocess argv for /usr/bin/clwpos-user invocation. Non-CageFS: wraps in sudo -u