import asyncio
import functools
import json
import logging
import os
from abc import ABC, abstractmethod
from contextvars import ContextVar
from itertools import chain
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from defence360agent.contracts.config import Core
from defence360agent.subsys.panels.base import (
AbstractPanel,
ModsecVendorsError,
PanelException,
)
from defence360agent.subsys.web_server import (
apache_modules,
apache_running,
litespeed_running,
)
from defence360agent.utils import async_lru_cache, finally_happened
from im360 import files
from im360.contracts.config import RBL_WHITELIST_FILE, Modsec
from im360.subsys.shared_disabled_rules import (
get_shared_disabled_modsec_rules_ids,
)
from im360.utils import ModSecLock
# module-level logger, named after this module
logger = logging.getLogger(__name__)
# web server identifiers; _get_web_server() returns LITESPEED or APACHE
APACHE = "apache"
NGINX = "nginx"
LITESPEED = "litespeed"
OPENLITESPEED = "openlitespeed"
# imunify360-(full|minimal)-WEBSERVER-PANEL
# (str.format-style fields: ruleset_suffix, webserver, panel)
MODSEC_NAME_TEMPLATE = "imunify360-{ruleset_suffix}-{webserver}-{panel}"
class ModsecImunifyVendorNotInstalled(ModsecVendorsError):
    """
    Raised when vendors are installed but none of them is
    the Imunify360 vendor
    """
class ModsecNotInstalledVendors(ModsecVendorsError):
    """
    Raised when no ModSecurity vendors are installed at all
    """
class _ModSecLocker:
    """Decorator object that runs coroutines while holding one shared lock."""

    # single lock instance shared by every decorated coroutine
    LOCK = ModSecLock()

    def __call__(self, coro):
        """Return *coro* wrapped so that each call awaits it under ``LOCK``."""

        @functools.wraps(coro)
        async def locked_call(*call_args, **call_kwargs):
            async with self.LOCK:
                return await coro(*call_args, **call_kwargs)

        return locked_call
def is_modsec_locked():
    """Report the ``locked()`` state of the lock shared via ``_ModSecLocker``."""
    shared_lock = _ModSecLocker.LOCK
    return shared_lock.locked()
# decorator instance: the wrapped coroutine is awaited while holding
# _ModSecLocker.LOCK
use_modsec_lock = _ModSecLocker()
def skip_if_not_installed_modsec(func):
    """Skip the call if ModSecurity is not installed.

    Decorator for coroutine methods whose first argument provides an
    awaitable ``installed_modsec()``. When that check is falsy a warning
    is logged and ``None`` is returned without calling *func*; otherwise
    the awaited result of *func* is returned.

    :param func: coroutine function taking the instance/class first
    :return: wrapping coroutine function with the metadata of *func*
    """

    # preserve __name__/__doc__ of the decorated coroutine for introspection
    @functools.wraps(func)
    async def wrapper(cls, *args, **kwargs):
        if not await cls.installed_modsec():
            logger.warning("ModSecurity is not installed")
            return None
        return await func(cls, *args, **kwargs)

    return wrapper
async def _get_web_server() -> Optional[str]:
    """
    Detect which web server is running.

    The probe prefers LiteSpeed over Apache and is executed through
    ``finally_happened`` (max_tries=15, delay=1).

    :return: LITESPEED, APACHE, or None (with a warning logged) when the
        final result is None
    """

    async def get_web_server():
        # LiteSpeed is checked first, Apache second
        if litespeed_running():
            return LITESPEED
        return APACHE if apache_running() else None

    detected = await finally_happened(get_web_server, max_tries=15, delay=1)
    if detected is None:
        logger.warning("Couldn't detect any Web Server running")
    return detected
class PanelInterface(AbstractPanel):
    """Panel interface with admin e-mail, port and web server helpers."""

    @abstractmethod
    async def _get_all_admin_emails(self) -> list:
        """Return all admin contact emails (panel-specific)."""

    async def get_admin_emails(self) -> list:
        """
        Return admin contact emails

        Best effort: any error other than task cancellation is logged
        as a warning and an empty list is returned instead.
        """
        try:
            return await self._get_all_admin_emails()
        except asyncio.CancelledError:
            # never swallow cancellation of the surrounding task
            raise
        except Exception as e:
            # lazy %-style args: formatting is left to the logging machinery
            logger.warning(
                "Something went wrong while getting admin email: %s", e
            )
            return []

    def http_ports(self) -> Set[int]:
        """
        Return panel's http ports (empty set in this base implementation)
        """
        return set()

    def https_ports(self) -> Set[int]:
        """
        Return panel's https ports (empty set in this base implementation)
        """
        return set()

    def remoteip_supported(self) -> bool:
        """Return False in this base implementation."""
        return False

    async def get_web_server(self) -> Optional[str]:
        """Return the result of the module-level ``_get_web_server()``."""
        return await _get_web_server()

    def get_webshield_protected_ports(self):
        """Return an empty dict in this base implementation."""
        return dict()
class ModSecurityInterface(ABC):
# no command by default
# NOTE(review): presumably overridden by concrete panels - confirm
REBUILD_HTTPDCONF_CMD = None
# presumably the file name of the app-based excludes config - confirm
APP_BASED_EXCLUDE_CONF_NAME = "i360-app-based-excludes.conf"
#: whether ModSecurity vendors are being installed
#: (install_settings() sets it to True around _install_settings() and
#: resets it afterwards)
installing_settings_var: ContextVar[bool] = ContextVar(
"installing_settings", default=False
)
@classmethod
@abstractmethod
async def installed_modsec(cls):
    """
    Tell whether ModSecurity is installed and enabled

    :return: bool
    """
@abstractmethod
async def _install_settings(self, reload_wafd=True):
    """
    Panel-specific installation step awaited by install_settings()

    :param reload_wafd: value passed through from install_settings()
    """
@skip_if_not_installed_modsec
async def install_settings(self, reload_wafd=True):
"""
Install ModSecurity vendors and patch ModSecurity config

The decorator skips the call (logging a warning) when
``installed_modsec()`` is falsy. ``installing_settings_var`` is set
to True while ``_install_settings()`` runs and is reset afterwards.

:param reload_wafd: passed unchanged to ``_install_settings()``
"""
token = self.installing_settings_var.set(True)
try:
await self._install_settings(reload_wafd=reload_wafd)
finally:
# restore the previous value of the context variable
self.installing_settings_var.reset(token)
# clean cache to avoid duplicate install/update attempts
# NOTE(review): confirm whether this is meant to run on failure too
await self.invalidate_installed_vendors_cache()
@abstractmethod
async def reset_modsec_directives(self):
    """
    Bring ModSecurity directives back to the values chosen by Imunify360
    """
@abstractmethod
async def reset_modsec_rulesets(self):
    """
    Bring ModSecurity rulesets back to the values chosen by Imunify360
    """
@abstractmethod
async def revert_settings(self, reload_wafd=True):
    """
    Uninstall previously installed ModSecurity vendors and
    revert ModSecurity config
    """
@classmethod
@abstractmethod
def detect_cwaf(cls):
    """
    Detect whether the Comodo ModSecurity Rule Set is installed as a plugin

    :return: bool
    """
@classmethod
@abstractmethod
async def modsec_vendor_list(cls) -> list:
    """List the ModSecurity vendors that are installed."""
@classmethod
@abstractmethod
async def enabled_modsec_vendor_list(cls) -> list:
    """List the ModSecurity vendors that are enabled."""
@classmethod
@abstractmethod
async def modsec_get_directive(cls, directive_name, default=None):
    """
    Look up the value of a ModSecurity directive.

    Example (the call is a coroutine and has to be awaited):
    >>> modsec_interface.modsec_get_directive("SecRuleEngine")
    'Off'
    """
@classmethod
@abstractmethod
async def build_vendor_file_path(cls, vendor: str, filename: str) -> Path:
    """Build the path to *filename* of the given *vendor*."""
    raise NotImplementedError
@classmethod
async def build_version_file_path(cls, vendor: str) -> Path:
    """Build the path to the ``VERSION`` file of the given *vendor*."""
    version_path = await cls.build_vendor_file_path(vendor, "VERSION")
    return version_path
@classmethod
async def get_i360_vendor_name(cls) -> str:
    """
    Return a name of Imunify360 ModSecurity vendor.

    The first installed vendor whose name starts with ``Core.PRODUCT``
    is returned.

    :raises ModsecVendorsError: the vendor list could not be obtained
        (a PanelException was raised by ``modsec_vendor_list()``)
    :raises ModsecNotInstalledVendors: the vendor list is empty
    :raises ModsecImunifyVendorNotInstalled: vendors are installed but
        none of them starts with ``Core.PRODUCT``
    """
    try:
        installed_vendors = await cls.modsec_vendor_list()
    except PanelException as e:
        # keep the panel error as the explicit cause
        raise ModsecVendorsError(str(e)) from e
    if not installed_vendors:
        raise ModsecNotInstalledVendors("No vendors installed")
    name = next(
        (v for v in installed_vendors if v.startswith(Core.PRODUCT)), None
    )
    if name is None:
        # exceptions do not interpolate logging-style args, so build the
        # message explicitly
        raise ModsecImunifyVendorNotInstalled(
            "Imunify360 vendor is not installed, all vendors are :%s"
            % " ".join(installed_vendors)
        )
    return name
@classmethod
async def get_i360_vendor_version(cls) -> str:
    """
    Return a version of the Imunify360 ModSecurity vendor.

    The version is the stripped content of the vendor's VERSION file.

    :raises ModsecVendorsError: the VERSION file cannot be read; errors
        raised by ``get_i360_vendor_name()`` propagate unchanged
    """
    vendor = await cls.get_i360_vendor_name()
    version_file = await cls.build_version_file_path(vendor)
    try:
        with version_file.open() as f:
            return f.read().strip()
    except OSError as err:
        # chain explicitly so the OS-level cause stays attached
        raise ModsecVendorsError(
            "Cannot read Imunify360 vendor version: {}".format(err)
        ) from err
@classmethod
async def invalidate_installed_vendors_cache(cls):
    """Clear the cache of each vendor-related lookup that exposes one."""
    cached_attrs = (
        "modsec_vendor_list",
        "_get_release_info_from_file",
        "enabled_modsec_vendor_list",
    )
    for attr in cached_attrs:
        # implementations without a cache wrapper have no cache_clear
        if hasattr(getattr(cls, attr), "cache_clear"):
            getattr(cls, attr).cache_clear()
@classmethod
@abstractmethod
async def _apply_modsec_files_update(cls):
    """
    Panel-specific step awaited by apply_modsec_files_update().

    NOTE(review): the signature takes no ``updates`` argument; the
    structure below is kept from the earlier docstring and presumably
    shows the update descriptors an implementation works with - confirm.

    updates: [
        {
            "groups": [
                "cpanel",
                "litespeed"
            ],
            ...
            "name": "imunify360-litespeed-meta",
            "url": "https://files.imunify360.com/.../meta_imunify360_litespeed.yaml"
        },
        {
            "groups": [
                "cpanel",
                "apache",
                "litespeed"
            ],
            ...
            "name": "imunify360-rules-meta",
            "url": "https://files.imunify360.com/.../meta_imunify360_rules.yaml"
        }]
    """  # noqa: E501
@classmethod
async def apply_modsec_files_update(cls):
    """
    Run the panel-specific files update, then clear the vendor caches
    """
    await cls._apply_modsec_files_update()
    # cached vendor data may be stale after the update
    await cls.invalidate_installed_vendors_cache()
@classmethod
@abstractmethod
def get_audit_log_path(cls):
    """
    Give the path to the ModSecurity audit log file

    :return: str
    """
@classmethod
@abstractmethod
def get_audit_logdir_path(cls):
    """
    Give the path to the ModSecurity audit log dir used in concurrent mode

    :return: str
    """
@classmethod
@abstractmethod
def write_global_disabled_rules(cls, rule_list) -> bool:
    """
    Disable mod_security rules on global level.

    Rebuilding the httpd conf and restarting the httpd server is the
    responsibility of the caller.

    :param list rule_list: list of rules to disable
    :return: True if config was changed, False otherwise
    """
@classmethod
@abstractmethod
async def sync_global_disabled_rules(cls, rule_list) -> bool:
    """
    Disable mod_security rules on global level.

    Implementations should be idempotent.

    :param list rule_list: list of rules to disable
    :return: True if config was changed, False otherwise
    """
@classmethod
@abstractmethod
async def sync_disabled_rules_for_domains(
    cls, domain_rules_map: Dict[str, list]
):
    """
    Disable mod_security rules on domain level, for every domain
    present in the map. Implementations should be idempotent.

    :param domain_rules_map: map with one list per domain
    """
@classmethod
def generate_disabled_rules_config(cls, rule_list):
tpl = """