# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import configparser
import grp
import json
import os
import pwd
import random
import re
import subprocess
import sys
import uuid
from collections import namedtuple
from functools import wraps
from pathlib import Path
from typing import List, Optional, Tuple # NOQA
import cldetectlib as detect
from cl_proc_hidepid import get_hidepid_typing_from_mounts
from clcommon.clpwd import ClPwd, drop_privileges
from clcommon.cpapi import Feature, is_panel_feature_supported
from clcommon.lib.cledition import CLEditionDetectionError, is_cl_solo_edition
from clcommon.lib.cmt_utils import (
is_client_enabled,
is_cmt_disabled,
)
from clcommon.lib.consts import DEFAULT_JWT_ES_TOKEN_PATH, DISABLE_CMT_FILE
from clcommon.lib.jwt_token import jwt_token_check
from clcommon.lib.whmapi_lib import WhmApiError, WhmApiRequest
from clcommon.utils import (
ExternalProgramFailed,
demote,
is_litespeed_running,
is_ubuntu,
process_is_running,
run_command,
service_is_enabled_and_present,
)
from clcommon.lib.cledition import is_container
from cllimits_validator import LimitsValidator
from clsentry.utils import get_pkg_version
# Possible result types (ChkStatus?)
OK = "OK" # 'PASSED' is better?
FAILED = "FAILED"
SKIPPED = "SKIPPED"
INTERNAL_TEST_ERROR = "INTERNAL_TEST_ERROR"
cldiag_doc_link = "https://docs.cloudlinux.com/cloudlinuxos/command-line_tools/"
cron_cldiag_checkers_param_name = "disabled_cldiag_cron_checkers"
cron_cldiag_section_name = "cldiag_cron"
cl_plus_doc_link = "https://docs.cloudlinux.com/cloudlinux-os-plus/#faq-2"
cl_plus_doc_msg = f"Link to FAQ and troubleshooting {cl_plus_doc_link}"
write_to_support_msg = "Please write to support https://cloudlinux.zendesk.com/ if you can't resolve the issue."
cm_full_name = "Centralized Monitoring"
SKIPPED_ON_SOLO_MSG = "This checker is not supported on CloudLinux OS Solo edition"
SKIPPED_WITHOUT_LVE_MSG = "This checker is not supported in environments without LVE support"
ChkResult = namedtuple(
"ChkResult",
[
"res", # One of predefined checker result types
"msg", # Resulting msg from this checker
],
)
SUEXEC_PATH = {
"cPanel": "/usr/local/apache/bin/suexec",
"cPanel_ea4": "/usr/sbin/suexec",
"DirectAdmin": "/usr/sbin/suexec",
"Plesk": "/usr/sbin/suexec",
"ISPManager": "/usr/sbin/suexec",
"InterWorx": "/usr/sbin/suexec",
"H-Sphere": "/usr/sbin/suexec",
"HostingNG": "/usr/sbin/suexec",
"Unknown": "/usr/sbin/suexec",
}
SUPHP_PATH = {
"cPanel": "/opt/suphp/sbin/suphp",
"cPanel_ea4": "/usr/sbin/suphp",
"DirectAdmin": "/usr/local/suphp/sbin/suphp",
"Plesk": "/usr/sbin/suphp",
"ISPManager": "/usr/sbin/suphp",
"InterWorx": "/usr/sbin/suphp",
"H-Sphere": "/usr/sbin/suphp",
"HostingNG": "/usr/sbin/suphp",
"Unknown": "/usr/sbin/suphp",
}
BINARY_CHECK_PARAMETERS = {}
BINARY_CHECK_PARAMETERS["suphp"] = {
"name": "SuPHP",
"status_function": "detect.get_suPHP_status()",
"location": SUPHP_PATH,
}
BINARY_CHECK_PARAMETERS["suexec"] = {
"name": "SuEXEC",
"status_function": "detect.get_suEXEC_status()",
"location": SUEXEC_PATH,
}
_CLDIAG_USERNAME_FILE = "/var/lve/cldiag_user"
_CLDIAG_TEST_USENAME_PREFIX = "cldiaguser"
def pretty_name(name_of_checker):
def decorator(func):
func.pretty_name = name_of_checker
return func
return decorator
def _formatter(data, error_count, to_json=False):
"""
Formatter of output from all of checkers
"""
msg = "Command for disabling this cron checker: "
cmd_tmp = "cldiag --disable-cron-checkers"
if to_json:
res = {checker_pretty_name: chk_result._asdict() for checker_pretty_name, _, chk_result in data}
res["total_errors"] = error_count
return json.dumps(res)
res = []
for checker_pretty_name, checker_public_name, chk_result in data:
checker_result = f"{checker_pretty_name}:\n {chk_result.res}: " f"{chk_result.msg}"
if checker_public_name is not None:
checker_result = f"{checker_result}\n" f'{msg} "{cmd_tmp} {checker_public_name}"'
res.append(checker_result)
res = "\n\n".join(res + [f"There are {error_count} errors found."])
return res
def runner(checkers, to_json=False, do_exit=True):
if callable(checkers): # allow single checker as input too
checkers = [checkers]
results = []
errors = 0
for f in checkers:
try:
chk_result = f()
except Exception as e:
chk_result = ChkResult(INTERNAL_TEST_ERROR, repr(e))
if chk_result.res in (
FAILED,
INTERNAL_TEST_ERROR,
):
errors += 1
results.append(
(
f.pretty_name,
f.public_name if hasattr(f, "public_name") else None,
chk_result,
)
)
res = _formatter(results, errors, to_json)
if do_exit:
print(res)
sys.exit(errors)
return errors, res
def wrapper(func):
try:
return eval(func)
except AttributeError:
print(f"WARNING\n missing {func} function in cldetectlib.")
return False
def skip_checker_on_cl_solo(f):
@wraps(f)
def checker(*args, **kwargs):
try:
is_solo_edition = is_cl_solo_edition(skip_jwt_check=True)
except CLEditionDetectionError:
is_solo_edition = False
if is_solo_edition:
return ChkResult(SKIPPED, SKIPPED_ON_SOLO_MSG)
return f(*args, **kwargs)
return checker
def skip_check_without_lve(f):
@wraps(f)
def checker(*args, **kwargs):
if not is_panel_feature_supported(Feature.LVE):
return ChkResult(SKIPPED, SKIPPED_WITHOUT_LVE_MSG)
return f(*args, **kwargs)
return checker
@pretty_name("Check cagefs")
def fake_cagefs_checker():
return ChkResult(
SKIPPED,
"Cagefs version is too old. "
"Please run cagefsctl --sanity-check directly "
"or upgrade it to have full cldiag integration",
)
def _is_cmt_allowed_for_server() -> Tuple[bool, Optional[str]]:
"""
Check that a server is cl+, enabled and CM isn't disabled locally
The function returns True if the client has CL+ license, didn't disable CM
localy and activated CM on https://cm.cloudlinux.com. The function also
returns True if we can't read or parse JWT token, because
we want to continue and show to client CM related errors
"""
cm_is_not_activated_msg = f"{cm_full_name} is not activated" " on https://cm.cloudlinux.com"
cm_is_disabled_localy_msg = f"The {cm_full_name} is disabled localy" f' by creating file "{DISABLE_CMT_FILE}"'
no_cl_plus_license_msg = "The server has no CL+ license"
from clsummary.utils import get_client_data_from_jwt_token # pylint: disable=import-outside-toplevel
jwt_token = get_client_data_from_jwt_token()
if jwt_token is not None and not jwt_token["cl_plus"]:
# we do nothing if client doesn't have CL+ license
return False, no_cl_plus_license_msg
# we should check the state of JWT token if we didn't take data from it
if jwt_token is None:
is_valid, message, _ = jwt_token_check()
if not is_valid:
return is_valid, message
if is_cmt_disabled():
# we do nothing if cmt is disabled locally
return False, cm_is_disabled_localy_msg
# we do nothing if client isn't enabled in CM
if not is_client_enabled():
return False, cm_is_not_activated_msg
# We want to continue checks in case of problems with jwt token
# because we want to show cmt related errors to client.
return True, None
def skip_if_cmt_not_used_enabled_allowed(f):
"""
Decorator: Skip check if a server isn't cl+, disabled and
CM is disabled locally
"""
@wraps(f)
def decorated_function(*args, **kwargs):
"""
Decorated function
"""
result, message = _is_cmt_allowed_for_server()
if result:
return f(*args, **kwargs)
return ChkResult(
SKIPPED,
message,
)
return decorated_function
@pretty_name("Check existing JWT token")
def check_jwt_token():
"""
Check an existing JWT token
"""
token_is_absent_msg = " The absence of JWT tokens is normal for the clients with volume license. "
main_msg = (
"Please check for JWT token in path "
f'"{DEFAULT_JWT_ES_TOKEN_PATH}". %sTry running "rhn_check"'
" for getting a new token if it is absent. Server can't "
f"collect and send statistics to {cm_full_name} if you "
f"don't have a correct JWT token. {cl_plus_doc_msg}. "
f"{write_to_support_msg}"
)
token_is_not_cl_plus = "JWT token doesn't have CL+ service"
from clsummary.utils import get_client_data_from_jwt_token # pylint: disable=import-outside-toplevel
if not os.path.exists(DEFAULT_JWT_ES_TOKEN_PATH):
return ChkResult(
SKIPPED,
main_msg % token_is_absent_msg,
)
result, message, _ = jwt_token_check()
if result:
jwt_token = get_client_data_from_jwt_token()
return ChkResult(OK, f'JWT token is valid: "{jwt_token}"')
if message == token_is_not_cl_plus:
return ChkResult(
SKIPPED,
"The server has no CL+ license",
)
main_msg = main_msg % ""
return ChkResult(FAILED, f"{message}. {main_msg}")
def _check_service_state(service_name: str, process_file_path, ) -> ChkResult:
"""
Check that a service is present, enabled and active
:param service_name: name of a service
:param process_file_path: path (or list of paths) to a file which is run
by a service. When a list is given, the service is considered active
if *any* of the paths matches a running process.
"""
is_present, is_enabled = service_is_enabled_and_present(service_name)
paths = [process_file_path] if isinstance(process_file_path, str) else process_file_path
is_active = False
for path in paths:
try:
if process_is_running(path, False):
is_active = True
break
except FileNotFoundError:
continue
if is_present and is_enabled and is_active:
return ChkResult(
OK,
f'Service "{service_name}" is present, enabled and active',
)
messages = []
if not is_present:
messages.append("Service is not present.")
if not is_enabled:
messages.append("Service is not enabled.")
if not is_active:
messages.append("Service is not active.")
return ChkResult(
FAILED,
f"{' '.join(messages)} The server can't collect and send "
f"statistics to {cm_full_name} if service {service_name} isn't "
f"present, enabled and active. {cl_plus_doc_msg}. "
f"{write_to_support_msg}",
)
@pretty_name("Check service `cl_plus_sender` is present, enabled and active")
@skip_checker_on_cl_solo
@skip_if_cmt_not_used_enabled_allowed
def check_cl_plus_sender_service():
"""
Check that service `cl_plus_sender` is present, enabled and active
"""
# todo: turned off until CLPRO-2366 is ready
if is_container():
return ChkResult(SKIPPED, "Centralized Monitoring adaptation is still in progress")
from clsummary.utils import CL_PLUS_SENDER_FILE_PATH # pylint: disable=import-outside-toplevel
service_name = "cl_plus_sender"
return _check_service_state(service_name, CL_PLUS_SENDER_FILE_PATH)
@pretty_name("Check service `node_exporter` is present, enabled and active")
@skip_checker_on_cl_solo
@skip_if_cmt_not_used_enabled_allowed
def check_node_exporter_service():
"""
Check that service `node_exporter` or `cl_node_exporter` is present,
enabled and active
Since it was renamed node_exporter -> cl_node_exporter
let`s handle both cases:
- old `node_exporter` service
- renamed `cl_node_exporter` service
"""
# todo: turned off until CLPRO-2366 is ready
if is_container():
return ChkResult(SKIPPED, "Centralized Monitoring adaptation is still in progress")
base_service_path = "/usr/share/cloudlinux/cl_plus/service/"
process_file_path = "/usr/share/cloudlinux/cl_plus/node_exporter"
# looking for cl_node_exporter on cl6, cl_node_exporter.service on cl7+
if os.path.exists(os.path.join(base_service_path, "cl_node_exporter")) or os.path.exists(
os.path.join(base_service_path, "cl_node_exporter.service")
):
service_name = "cl_node_exporter"
else:
service_name = "node_exporter"
return _check_service_state(service_name, process_file_path)
@pretty_name("Check service `lvestats` is present, enabled and active")
@skip_checker_on_cl_solo
@skip_if_cmt_not_used_enabled_allowed
def check_lvestats_service():
"""
Check that service `lvestats` is present, enabled and active
"""
service_name = "lvestats"
process_file_paths = [
"/usr/share/lve-stats/lvestats-server.py", # Python lve-stats
"/usr/sbin/lvestats-server", # Rust lve-stats3
]
return _check_service_state(service_name, process_file_paths)
@pretty_name("Check that the server has the minimal required packages for correct working of Centralized Monitoring")
@skip_checker_on_cl_solo
@skip_if_cmt_not_used_enabled_allowed
def check_cmt_packages():
"""
Check that the server has minimal required packages for CM
"""
for package_name in ["cl-end-server-tools", "cl-node-exporter"]:
if get_pkg_version(package_name) is None:
return ChkResult(
FAILED,
"System doesn't have the package "
f'"{package_name}". It\'s required for {cm_full_name} '
"feature to work and it usually installed "
f"automatically by cron. {cl_plus_doc_msg}. "
f"{write_to_support_msg}",
)
return ChkResult(OK, "System has the minimal required packages for correct working of Centralized Monitoring")
@pretty_name("Check control panel and it's configuration (for DirectAdmin only)")
def check_cp_diag():
fix_motivation = (
" Fixing the issue will provide CloudLinux support on your control panel. \n"
f"See details: {cldiag_doc_link + '#diag-cp'}"
)
detect.getCP()
cp_name = detect.getCPName()
if cp_name == "Unknown":
return ChkResult(SKIPPED, "Can't detect contol panel")
res_msg = f"Control Panel - {cp_name}; Version {detect.CP_VERSION};"
# we are not setting cloudlinux yes on CL SOLO
if not is_cl_solo_edition(skip_jwt_check=True) and cp_name == "DirectAdmin":
if detect.da_check_options():
return ChkResult(OK, res_msg + ' File "options.conf" is fine')
return ChkResult(FAILED, res_msg + ' File "options.conf" has no line "cloudlinux=yes"' + fix_motivation)
return ChkResult(OK, res_msg)
@pretty_name("Check fs.enforce_symlinksifowner is correctly enabled in sysctl conf")
@skip_check_without_lve
def check_symlinksifowner():
fix_motivation = (
" Fixing that issue makes server more secure against "
"symlink attacks and enables protection of PHP configs "
f"or other sensitive files. \nSee details: {cldiag_doc_link + '#symlinksifowner'}"
)
if detect.is_openvz():
return ChkResult(SKIPPED, "Not supported for OpenVZ environment")
try:
symlinks_if_owner = detect.get_symlinksifowner()
except ExternalProgramFailed as e:
detailed_out = "To see full error run /sbin/sysctl --system"
return ChkResult(
FAILED,
"Some parameter in sysctl config has wrong configuration. "
f"Error: {get_short_error_message(str(e), detailed_out)} It`s recommended to fix it and try again ",
)
if symlinks_if_owner == 2:
return ChkResult(FAILED, "fs.enforce_symlinksifowner = 2" + fix_motivation)
return ChkResult(OK, f"fs.enforce_symlinksifowner = {symlinks_if_owner}")
def binary_check(params):
module_name = params["name"].lower()
link = cldiag_doc_link + "#check-" + module_name
fix_motivation = (
" Fix that issue to be sure that users run their sites inside CageFS and provide stable "
f"work of sites that are using apache {module_name} module. This may improve server security"
f"\nSee details: {link}"
)
if not os.path.exists("/usr/sbin/cagefsctl"):
return ChkResult(SKIPPED, "Cagefs is not installed")
if not wrapper(params["status_function"]):
return ChkResult(SKIPPED, f"{params['name']} is not enabled")
has_jail = detect.check_binary_has_jail(params["location"])
if has_jail is None:
return ChkResult(
SKIPPED,
f"Unable to check {params['name']} module binary for "
"custom control panel. This feature may be added in future updates.",
)
if not has_jail:
return ChkResult(FAILED, "Binary without CageFS jail " + fix_motivation)
return ChkResult(OK, "binary has jail")
@pretty_name("Check suexec has cagefs jail")
def check_suexec():
# Check that LiteSpeed is installed and run
if detect.detect_litespeed() and is_litespeed_running():
return ChkResult(
SKIPPED, "Current PHP selector uses LiteSpeed, which doesn't require the patches in suEXEC bin."
)
return binary_check(BINARY_CHECK_PARAMETERS["suexec"])
@pretty_name("Check suphp has cagefs jail")
def check_suphp():
return binary_check(BINARY_CHECK_PARAMETERS["suphp"])
@pretty_name("Check usepam in sshd config")
def check_use_pam():
fix_motivation = (
"Fix the issue to provide correct work of pam_lve module with sshd and "
f"CageFS ssh sessions\nSee details: {cldiag_doc_link + '#check-usepam'}"
)
check_result = detect.check_SSHd_UsePAM()
if check_result is None:
return ChkResult(SKIPPED, 'Unable to run "/usr/sbin/sshd -T"')
if check_result:
return ChkResult(OK, "Config is fine")
return ChkResult(FAILED, 'There is "usepam no" in "/usr/sbin/sshd -T" output ' + fix_motivation)
@pretty_name("Check the validity of LVE limits on server")
@skip_check_without_lve
def check_lve_limits():
# type: () -> ChkResult
"""
Validate lve limits
"""
doc_link = "https://docs.cloudlinux.com/lve-limits-validation.html"
failed_message = "Invalid LVE limits on server. See doc: " + doc_link
passed_message = "Valid LVE limits on server."
limits_validator = LimitsValidator()
result = limits_validator.validate_existing_limits()
if result is None:
return ChkResult(OK, passed_message)
return ChkResult(FAILED, failed_message + "\n" + result)
@pretty_name("Check compatibility for PHP Selector")
def check_phpselector():
"""
1. mod_ruid is not present
2. suphp
3. mod_lsapi
4. suexec and (fcgi or cgi)
5. litespeed
6. other handlers are not supported
"""
ok_prefix = "It looks OK [%s]"
fail_prefix = (
"Looks like your PHP handler doesn't support CloudLinux PHP Selector "
"and as a result does not work https://docs.cloudlinux.com/cloudlinuxos/limits/#compatibility-matrix [%s]"
f"\nPlease, see: {cldiag_doc_link + '#check-phpselector'} and try to fix issue to have working selector"
)
# do not check for EA3
if not os.path.exists("/etc/cpanel/ea4/is_ea4"):
return ChkResult(SKIPPED, "Not cPanel with EA4, unable to run diagnostics")
# litespeed check
if detect.detect_litespeed() and is_litespeed_running():
return ChkResult(OK, ok_prefix % "Litespeed")
status = {"suexec": False, "suphp": False, "lsapi": False}
handler = None
# check /etc/cpanel/ea4/php.conf for EA4
conf_path = "/etc/cpanel/ea4/php.conf"
if os.path.exists(conf_path):
try:
with open(conf_path, "r", encoding="utf-8") as f:
config = [line.strip() for line in f]
except IOError as e:
err = f"Can not read {conf_path} ({e})"
return ChkResult(FAILED, fail_prefix % err)
# some stub version string
for line in config:
if line.startswith("default:"):
default_ver = (line.split(":")[1]).strip()
break
else:
err = f"{conf_path} config should have the default PHP version"
return ChkResult(FAILED, fail_prefix % err)
for line in config:
if line.startswith(f"{default_ver}:"):
handler = (line.split(":")[1]).strip()
if handler not in ["cgi", "fcgi", "suphp", "lsapi"]:
err = f"doesn't support {handler} handler in ea4/php.conf"
return ChkResult(FAILED, fail_prefix % err)
modules = detect.get_apache_modules()
if modules is not None:
if "ruid2_module" in modules:
return ChkResult(
FAILED,
fail_prefix % "It looks like you use mod_ruid. CloudLinux PHP Selector doesn't work properly with it. "
"How to delete mod_ruid and install mod_suexec in cPanel: "
"https://docs.cloudlinux.com/cloudlinuxos/cloudlinux_os_components/#ins"
"talling-on-cpanel-servers-with-easyapache-4-1"
)
status["suphp"] = "suphp_module" in modules
status["lsapi"] = "lsapi_module" in modules
status["suexec"] = "suexec_module" in modules
if not any([status["suphp"], status["suexec"]]):
return ChkResult(
FAILED,
fail_prefix % "It looks like you do not have mod_suphp or mod_suexec installed. "
"CloudLinux PHP Selector doesn't work properly without it",
)
if status["suphp"] or status["suexec"] and handler in ["suphp", "cgi", "fcgi", "lsapi"]:
current = (
f"php.conf:{handler} with {', '.join(module for module, is_installed in status.items() if is_installed)}"
)
return ChkResult(OK, ok_prefix % current)
err = (
"Some unknown PHP handler, we might not support it "
f"[found handler: {'-' if handler is None else handler} "
f"and apache modules: {', '.join(module for module, is_installed in status.items() if is_installed)}]"
)
return ChkResult(FAILED, fail_prefix % err)
@pretty_name("Check fs.symlinkown_gid")
@skip_check_without_lve
def check_symlinkowngid():
fix_motivation = (
"Fix the issue to provide symlink protection for apache user "
"and as a result make your Web Server more secure. "
f"\nSee details: {cldiag_doc_link + '#check-symlinkowngid'}"
)
ok_res = ChkResult(OK, "Web-server user is protected by Symlink Owner Match Protection")
warn_msg_tpl = "Web-server user '{}' is not in protected group " "specified in {}. " + fix_motivation
symlinkown_gid_file = "/proc/sys/fs/symlinkown_gid"
if detect.is_openvz():
return ChkResult(SKIPPED, "Not supported for OpenVZ environment")
detect.get_apache_gid() # This function fills few module-level variables
apache_uname = detect.APACHE_UNAME
try:
pwd.getpwnam(apache_uname)
except KeyError:
return ChkResult(SKIPPED, f"There is no web-server user [{apache_uname}] on the system. Nothing to check")
try:
# current_symlinkown_gid = int(open(symlinkown_gid_file).read().strip())
with open(symlinkown_gid_file, encoding="utf-8") as f:
current_symlinkown_gid = int(f.read().strip())
except Exception as e:
return ChkResult(FAILED, f"Can't read GID from {symlinkown_gid_file} with error: {e}")
if detect.APACHE_GID == current_symlinkown_gid:
return ok_res
try:
grp_members = grp.getgrgid(current_symlinkown_gid).gr_mem
except KeyError: # no such group
grp_members = []
if grp_members:
# Most often both LiteSpeed and Apache runs under the same user
if apache_uname in grp_members:
return ok_res
return ChkResult(FAILED, warn_msg_tpl.format(apache_uname, symlinkown_gid_file))
@pretty_name("Check existence of all user's packages")
@skip_checker_on_cl_solo
def check_existence_of_all_users_packages():
"""
Return user's packages that do not exist in /var/cpanel/packages/
"""
packages_dir_path = "/var/cpanel/packages/"
users_dir_path = "/var/cpanel/users/"
suspended_dir_path = "/var/cpanel/suspended/"
excluded_packages_names = ["undefined", "default", "cPanel Ticket System temporary user", "Custom"]
user_plan_cmd = ["/bin/grep", "-e", "PLAN=", "-r"]
suspended_users = []
if detect.getCPName() != "cPanel":
return ChkResult(SKIPPED, "should be run on cPanel only")
if not os.listdir(users_dir_path):
return ChkResult(SKIPPED, "no users on this server")
if os.path.exists(suspended_dir_path):
suspended_users = os.listdir(suspended_dir_path)
# getting users packages
with subprocess.Popen(
user_plan_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=users_dir_path,
text=True,
) as proc:
std_out, std_err = proc.communicate()
ret_code = proc.returncode
if ret_code != 0:
msg = f"error getting user's packages: {std_err}"
return ChkResult(FAILED, msg)
try:
# std_out sample: