import asyncio
import errno
import logging
import os
import pwd
import time
from collections import defaultdict
from collections.abc import Awaitable, Callable
from distutils.version import LooseVersion
from pathlib import Path
from defence360agent.api import inactivity
from defence360agent.contracts.config import (
MalwareScanScheduleInterval as Interval,
SystemConfig,
ANTIVIRUS_MODE,
choose_value_from_config,
)
from defence360agent.files import Index, WP_RULES
from defence360agent.sentry import log_message
from defence360agent.utils.fd_ops import (
open_dir_no_symlinks,
open_nofollow,
rmtree_fd,
safe_dir,
)
from defence360agent.contracts.config import Wordpress
from defence360agent.wordpress.wp_rules import (
get_wp_rules_data,
get_wp_ruleset_version,
)
from defence360agent.model.wordpress import WordpressSite, WPSite
from defence360agent.model.wp_disabled_rule import WPDisabledRule
from defence360agent.wordpress import cli, telemetry
from defence360agent.wordpress.constants import PLUGIN_VERSION_FILE
from defence360agent.wordpress.utils import (
_validate_preset,
calculate_next_scan_timestamp,
clear_get_cagefs_enabled_users_cache,
ensure_site_data_directory,
format_php_with_embedded_json,
get_imunify_package_versions,
get_last_scan,
get_malware_history,
prepare_plugin_config,
prepare_scan_data,
write_plugin_data_file_atomically,
)
from defence360agent.wordpress.site_repository import (
clear_manually_deleted_flag,
delete_site,
get_installed_sites_by_domains,
get_outdated_sites,
get_sites_for_user,
get_sites_to_adopt,
get_sites_to_install,
get_sites_to_mark_as_manually_deleted,
get_installed_sites,
insert_installed_sites,
mark_site_as_manually_deleted,
update_site_identity,
update_site_version,
)
from defence360agent.wordpress.proxy_auth import setup_site_authentication
logger = logging.getLogger(__name__)
# Default when waf_enabled is missing from config (old schema).
# True keeps WAF enabled — no behavior change on upgrade.
_WAF_ENABLED_DEFAULT = True
# Default when ai_bot_protection is missing from config (old schema on
# a host where sibling packages haven't shipped the field yet). False
# because the feature is opt-in — if we can't determine admin intent,
# stay off rather than silently activate request-blocking logic.
_AI_BOT_PROTECTION_DEFAULT = False
_AI_BOT_PROTECTION_PRESET_DEFAULT = "balanced"
def _get_global_waf_enabled() -> bool:
"""Read WORDPRESS.waf_enabled from config, defaulting to True.
Returns _WAF_ENABLED_DEFAULT when the config key is missing (old
schema without waf_enabled), so WAF stays enabled — no behavior
change on upgrade.
"""
try:
return bool(Wordpress.WAF_ENABLED)
except KeyError:
return _WAF_ENABLED_DEFAULT
def _get_global_ai_bot_protection() -> bool:
"""Read WORDPRESS.ai_bot_protection from config, defaulting to False.
Returns _AI_BOT_PROTECTION_DEFAULT when the config key is missing
— e.g. the ai_bot_protection field hasn't rolled out to this
install's imunify360 yet, or a sibling package is still on an
older schema. Keeps the feature off in all ambiguous cases.
"""
try:
return bool(Wordpress.AI_BOT_PROTECTION)
except KeyError:
return _AI_BOT_PROTECTION_DEFAULT
def _get_global_ai_bot_protection_preset() -> str:
"""Read WORDPRESS.ai_bot_protection_preset from config, defaulting to
"balanced".
Two layers of safety: KeyError on a missing key (older schema, agent
upgrade in progress) and _validate_preset() on the value itself
(hand-edited override file, future preset rolled in via a sibling
package this version doesn't recognise). Both fall back to the same
canonical default so all layers — schema, agent, plugin — agree.
"""
try:
raw = Wordpress.AI_BOT_PROTECTION_PRESET
except KeyError:
return _AI_BOT_PROTECTION_PRESET_DEFAULT
return _validate_preset(raw)
def _is_waf_enabled_for_user_sync(username: str) -> bool:
"""Check if WAF is enabled for a user, respecting global-off precedence."""
if not _get_global_waf_enabled():
return False
try:
value, _ = choose_value_from_config(
"WORDPRESS", "waf_enabled", username=username
)
except KeyError:
return _WAF_ENABLED_DEFAULT
return _WAF_ENABLED_DEFAULT if value is None else bool(value)
async def is_waf_enabled_for_user(username: str) -> bool:
"""Async wrapper — runs config file I/O in executor."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, _is_waf_enabled_for_user_sync, username
)
COMPONENTS_DB_PATH = Path(
"/var/lib/cloudlinux-app-version-detector/components_versions.sqlite3"
)
def _get_user_schedule_config(username: str, admin_config: SystemConfig):
"""
Get user-specific schedule configuration with lazy import fallback.
Returns default values if imav.malwarelib is not available.
"""
try:
from imav.malwarelib.plugins.schedule_watcher import (
get_user_schedule_config,
)
return get_user_schedule_config(username, admin_config)
except ImportError:
logger.debug(
"imav.malwarelib not available, returning default schedule config"
)
return Interval.NONE, 0, 1, 0
def get_updated_wp_rules_data(index: Index) -> dict | None:
"""
Retrieve WordPress rules with ANTIVIRUS_MODE handling and global disable filtering.
In ANTIVIRUS_MODE, all rules are set to monitoring mode ("pass").
Globally disabled rules are filtered out entirely — they should not
appear in rules.php. Domain-specific disables are handled separately
via disabled-rules.php.
Args:
index: The Index object used to locate the wp-rules.zip file.
Returns:
The parsed wp-rules data with mode adjusted for ANTIVIRUS_MODE
and globally disabled rules removed, or None if rules cannot be loaded.
"""
rules_data = get_wp_rules_data(index)
if rules_data is None:
return None
if ANTIVIRUS_MODE:
# all rules will be in monitoring mode only for AV and AV+
for cve, params in rules_data.items():
params["mode"] = "pass"
# Filter out globally disabled rules — these are excluded from rules.php
globally_disabled = set(WPDisabledRule.get_global_disabled())
if globally_disabled:
rules_data = {
cve: params
for cve, params in rules_data.items()
if cve not in globally_disabled
}
return rules_data
def clear_caches():
"""Clear all WordPress-related caches."""
clear_get_cagefs_enabled_users_cache()
cli.clear_get_content_dir_cache()
def site_search(items: dict, user_info: pwd.struct_passwd, matcher) -> dict:
# Get all WordPress sites for the user (the main site is always last)
user_sites = get_sites_for_user(user_info)
result = {path: [] for path in user_sites}
for item in items:
# Find all matching sites for this item
matching_sites = [path for path in user_sites if matcher(item, path)]
if matching_sites:
# Find the most specific (longest) matching path
most_specific_site = max(matching_sites, key=len)
result[most_specific_site].append(item)
return result
async def _get_scan_data_for_user(
sink, user_info: pwd.struct_passwd, admin_config: SystemConfig
):
# Get the last scan data
last_scan = await get_last_scan(sink, user_info.pw_name)
# Extract the last scan date
last_scan_time = last_scan.get("scan_date", None)
# Get user-specific schedule configuration
interval, hour, day_of_month, day_of_week = _get_user_schedule_config(
user_info.pw_name, admin_config
)
next_scan_time = None
if interval != Interval.NONE:
next_scan_time = calculate_next_scan_timestamp(
interval, hour, day_of_month, day_of_week
)
# Get the malware history for the user
malware_history = get_malware_history(user_info.pw_name)
# Split malware history by site. This part relies on the main site being the last one in the list.
# Without this all malware could be attributed to the main site.
malware_by_site = site_search(
malware_history,
user_info,
lambda item, path: item["resource_type"] == "file"
and item["file"].startswith(path),
)
return last_scan_time, next_scan_time, malware_by_site
async def _send_telemetry_task(coro, semaphore: asyncio.Semaphore):
async with semaphore:
try:
await coro
except Exception as e:
logger.error(f"Telemetry task failed: {e}")
async def process_telemetry_tasks(coroutines: list, concurrency=10):
"""
Process a list of telemetry coroutines with a concurrency limit.s
"""
if not coroutines:
return
semaphore = asyncio.Semaphore(concurrency)
tasks = [
asyncio.create_task(_send_telemetry_task(coro, semaphore))
for coro in coroutines
]
try:
await asyncio.gather(*tasks)
except Exception as e:
logger.error(f"Some telemetry tasks failed: {e}")
async def load_wp_rules_php():
"""
Load WordPress rules from the index and format them as PHP.
Returns:
str or None: PHP-formatted rules data, or None if rules could not be loaded.
"""
try:
wp_rules_index = Index(WP_RULES, integrity_check=False)
await wp_rules_index.update()
wp_rules_data = get_updated_wp_rules_data(wp_rules_index)
except Exception as e:
logger.warning(
"Failed to load wp-rules index: %s, skipping rules installation",
e,
)
return None
if not wp_rules_data:
logger.warning(
"valid WordPress rules not found, skipping rules installation"
)
return None
# Get version and create ruleset dict with version and rules
wp_rules_version = get_wp_ruleset_version(wp_rules_index)
ruleset_dict = {
"version": wp_rules_version,
"rules": wp_rules_data,
}
return format_php_with_embedded_json(ruleset_dict)
async def install_everywhere(sink):
"""Install the imunify-security plugin for all sites where it is not installed."""
sites = get_sites_to_install()
installer = WordPressSiteInstaller(sink, sites)
return await installer.run()
async def adopt_found_sites(sink):
"""
Adopt WordPress sites where the plugin is installed but not tracked in our database
or flagged as manually removed.
This handles scenarios like:
- Sites copied/migrated from another location
- Sites migrated from another server
- Sites where the manually_deleted flag was incorrectly set (past bugs)
- Sites where the user installed the plugin from wordpress.org
"""
sites = get_sites_to_adopt()
processor = WordPressSiteAdopter(sink, sites)
return await processor.run()
def get_latest_plugin_version() -> str:
"""Get the latest version of the imunify-security plugin from the version file."""
try:
if not PLUGIN_VERSION_FILE.exists():
logger.error(
"Plugin version file does not exist: %s", PLUGIN_VERSION_FILE
)
return None
return PLUGIN_VERSION_FILE.read_text().strip()
except Exception as e:
logger.error("Failed to read plugin version file: %s", e)
return None
async def update_everywhere(sink):
"""Update the imunify-security plugin on all sites where it is installed."""
latest_version = get_latest_plugin_version()
if not latest_version:
logger.error("Could not determine latest plugin version")
return
logger.info(
"Updating imunify-security wp plugin to the latest version %s",
latest_version,
)
updated = set()
telemetry_coros = []
with inactivity.track.task("wp-plugin-update"):
try:
# Get sites with outdated versions
outdated_sites = get_outdated_sites(latest_version)
logger.info(f"Found {len(outdated_sites)} outdated sites")
if not outdated_sites:
return
# Create SystemConfig once for all users
admin_config = SystemConfig()
versions = await get_imunify_package_versions()
# Group sites by user id
sites_by_user = defaultdict(list)
for site in outdated_sites:
sites_by_user[site.uid].append(site)
# Process each user's sites
for uid, sites in sites_by_user.items():
try:
user_info = pwd.getpwuid(uid)
username = user_info.pw_name
except Exception as error:
logger.error(
"Failed to get username for uid=%d. error=%s",
uid,
error,
)
continue
# Get scan data once for all sites of this user
(
last_scan_time,
next_scan_time,
malware_by_site,
) = await _get_scan_data_for_user(
sink, user_info, admin_config
)
plugin_config = prepare_plugin_config(username)
for site in sites:
if await remove_site_if_missing(sink, site):
continue
try:
# Check if site still exists
if not await cli.is_wordpress_installed(site):
logger.info(
"WordPress site no longer exists: %s", site
)
continue
# Prepare scan data
scan_data = prepare_scan_data(
last_scan_time,
next_scan_time,
username,
site,
malware_by_site,
versions=versions,
)
# Update the scan data file
await update_scan_data_file(site, scan_data)
# Keep plugin_config.php fresh alongside scan_data
# — covers the case where a ConfigUpdate event
# was missed (plugin re-installed after the
# toggle, first scan after an upgrade, etc).
await update_plugin_config_file(site, plugin_config)
# Now update the plugin
await cli.plugin_update(site)
updated.add(site)
# Get the version after update
version = await cli.get_plugin_version(site)
if version:
# Store original version for comparison
original_version = site.version
# Update the database with the new version
update_site_version(site, version)
# Create a new WPSite with updated version
site = site.build_with_version(version)
# Determine if this is a downgrade
is_downgrade = LooseVersion(
version
) < LooseVersion(original_version)
# Prepare telemetry
telemetry_coros.append(
telemetry.send_event(
sink=sink,
event=(
"downgraded_by_imunify"
if is_downgrade
else "updated_by_imunify"
),
site=site,
version=version,
)
)
except Exception as error:
logger.error(
"Failed to update plugin on site=%s error=%s",
site,
error,
)
logger.info(
"Updated imunify-security wp plugin on %d sites",
len(updated),
)
except asyncio.CancelledError:
logger.info(
"Update of imunify-security wp plugin was cancelled. Plugin"
" was updated on %d sites",
len(updated),
)
except Exception as error:
logger.error(
"Error occurred during plugin update. error=%s", error
)
raise
finally:
# Send telemetry
await process_telemetry_tasks(telemetry_coros)
async def delete_plugin_files(site: WPSite):
data_dir = await cli.get_data_dir(site)
# Open both target and parent dirs with symlink protection before
# performing any destructive operations.
try:
dir_fd = open_dir_no_symlinks(data_dir)
except OSError as exc:
if exc.errno == errno.ENOENT:
return # directory does not exist — nothing to delete
if exc.errno in (errno.ELOOP, errno.ENOTDIR):
logger.warning(
"Skipping rmtree: data directory %s is a symlink", data_dir
)
return
raise
try:
with safe_dir(data_dir.parent) as parent_fd:
try:
await asyncio.to_thread(rmtree_fd, dir_fd)
finally:
os.close(dir_fd)
dir_fd = -1
# Remove the now-empty directory via the parent fd.
os.rmdir(data_dir.name, dir_fd=parent_fd)
except BaseException:
if dir_fd >= 0:
os.close(dir_fd)
raise
async def remove_from_single_site(site: WPSite, sink, telemetry_coros) -> int:
"""
Remove the imunify-security plugin from a single site, including all cleanup and telemetry.
Returns the number of affected sites (should be 1 if deletion was successful).
This function is intended to be protected with asyncio.shield to ensure it completes even if the parent task is cancelled.
"""
try:
# Check if site is still installed and accessible using WP CLI
is_installed = await cli.is_plugin_installed(site)
if not is_installed:
# Plugin is no longer installed. It was removed manually by the user.
await process_manually_deleted_plugin(
site, time.time(), sink, telemetry_coros
)
return 0
# Get the version of the plugin (for telemetry data)
version = await cli.get_plugin_version(site)
# Uninstall the plugin from WordPress site.
await cli.plugin_uninstall(site)
# Delete the data files from the site.
await delete_plugin_files(site)
# Delete the site from database.
affected = delete_site(site)
# Send telemetry for successful uninstall
telemetry_coros.append(
telemetry.send_event(
sink=sink,
event="uninstalled_by_imunify",
site=site,
version=version,
)
)
return affected
except Exception as error:
# Log any error that occurs during the removal process
logger.error("Failed to remove plugin from %s %s", site, error)
return 0
async def remove_all_installed(sink):
"""Remove the imunify-security plugin from all sites where it is installed."""
logger.info("Deleting imunify-security wp plugin")
telemetry_coros = []
affected = 0
with inactivity.track.task("wp-plugin-removal"):
try:
clear_caches()
to_remove = get_installed_sites()
for site in to_remove:
try:
affected += await asyncio.shield(
remove_from_single_site(site, sink, telemetry_coros)
)
except asyncio.CancelledError:
logger.info(
"Deleting imunify-security wp plugin was cancelled."
" Plugin was deleted from %d sites (out of %d)",
affected,
len(to_remove),
)
except Exception as error:
logger.error("Error occurred during plugin deleting. %s", error)
raise
finally:
logger.info(
"Removed imunify-security wp plugin from %s sites",
affected,
)
# send telemetry
await process_telemetry_tasks(telemetry_coros)
async def process_manually_deleted_plugin(site, now, sink, telemetry_coros):
"""
Process the manually deleted plugin for a single site.
Args:
site: The site to process.
now: The current time.
sink: The telemetry/event sink.
telemetry_coros: The list of telemetry coroutines to add the event to.
The process includes:
- marking the site as manually deleted in the database
- removing plugin data files
- sending telemetry for manual removal
"""
try:
# Mark the site as manually deleted in the database
mark_site_as_manually_deleted(site, now)
# Remove plugin data files
await delete_plugin_files(site)
# Send telemetry for manual removal
telemetry_coros.append(
telemetry.send_event(
sink=sink,
event="removed_by_user",
site=site,
version=site.version,
)
)
except Exception as error:
logger.error(
"Failed to process manually deleted plugin for site=%s error=%s",
site,
error,
)
async def tidy_up_manually_deleted(
sink, freshly_installed_sites: set[WPSite] = None
):
"""
Tidy up sites that have been manually deleted by the user.
Args:
sink: The telemetry/event sink.
freshly_installed_sites: Optional set of sites that were just installed and should be excluded
from being marked as manually deleted to avoid race conditions.
"""
telemetry_coros = []
try:
to_mark_as_manually_removed = get_sites_to_mark_as_manually_deleted(
freshly_installed_sites
)
if to_mark_as_manually_removed:
now = time.time()
for site in to_mark_as_manually_removed:
await process_manually_deleted_plugin(
site, now, sink, telemetry_coros
)
except Exception as error:
logger.error("Error occurred during site tidy up. %s", error)
finally:
if telemetry_coros:
await process_telemetry_tasks(telemetry_coros)
async def update_data_on_sites(sink, sites: list[WPSite]):
if not sites:
return
# Create SystemConfig once for all users
admin_config = SystemConfig()
versions = await get_imunify_package_versions()
# Group sites by user id
sites_by_user = defaultdict(list)
for site in sites:
sites_by_user[site.uid].append(site)
# Now iterate over the grouped sites
for uid, sites in sites_by_user.items():
try:
user_info = pwd.getpwuid(uid)
username = user_info.pw_name
except Exception as error:
logger.error(
"Failed to get username for uid=%d. error=%s",
uid,
error,
)
continue
(
last_scan_time,
next_scan_time,
malware_by_site,
) = await _get_scan_data_for_user(sink, user_info, admin_config)
plugin_config = prepare_plugin_config(username)
for site in sites:
if await remove_site_if_missing(sink, site):
continue
try:
# Prepare scan data
scan_data = prepare_scan_data(
last_scan_time,
next_scan_time,
username,
site,
malware_by_site,
versions=versions,
)
# Update the scan data file
await update_scan_data_file(site, scan_data)
# Keep plugin_config.php fresh alongside scan_data.
await update_plugin_config_file(site, plugin_config)
except Exception as error:
logger.error(
"Failed to update site data on site=%s error=%s",
site,
error,
)
async def _write_json_php_data_file(
site: WPSite, filename: str, data: dict
) -> None:
"""Embed ``data`` as JSON inside a PHP file at ``