opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
imav
/
wordpress
➕ New
📤 Upload
✎ Editing:
site_repository.py
← Back
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import asyncio import logging import pwd from pathlib import Path from peewee import SqliteDatabase, OperationalError from defence360agent.utils import retry_on from imav.model.wordpress import WPSite, WordpressSite from imav.wordpress.constants import PLUGIN_SLUG logger = logging.getLogger(__name__) COMPONENTS_DB_PATH = Path( "/var/lib/cloudlinux-app-version-detector/components_versions.sqlite3" ) def get_sites_by_path(path: str) -> list[WPSite]: """ Get a list of WordPress sites that match the given path. Args: path: The path to search for WordPress sites. Note: The same WordPress site (real_path) can appear in multiple reports if it was scanned directly and also as part of a parent folder scan. We use only the entry from the latest report for each real_path. Returns: A list of WPSite objects that match the path. """ if not COMPONENTS_DB_PATH.exists(): logger.error( "App detector database '%s' couldn't be found.", str(COMPONENTS_DB_PATH), ) return list() cursor = SqliteDatabase(COMPONENTS_DB_PATH).execute_sql( f""" WITH latest_reports AS ( SELECT id, uid, domain FROM report WHERE id IN ( SELECT MAX(id) FROM report WHERE domain IS NOT NULL AND domain != '' GROUP BY dir ) ), -- Get all WordPress sites with their report IDs all_wp_sites AS ( SELECT wp.real_path, lr.domain, lr.uid, lr.id as report_id FROM apps AS wp INNER JOIN latest_reports AS lr ON wp.report_id = lr.id WHERE wp.title = 'wp_core' AND wp.parent_id IS NULL AND wp.real_path LIKE '{path}%' ) -- For each real_path, keep only the entry from the latest report SELECT real_path, domain, uid FROM all_wp_sites WHERE (real_path, report_id) IN ( SELECT real_path, MAX(report_id) FROM all_wp_sites GROUP BY real_path ) """ ) return [ WPSite(docroot=row[0], domain=row[1], uid=int(row[2])) for row in cursor.fetchall() ] def get_sites_for_user(user_info: pwd.struct_passwd) -> list[str]: """ Get a set of paths to WordPress sites belonging to a particular user. Paths are sorted by their length to make sure that the main site is the last one in the list. The data is pulled from the app-version-detector database. Args: user_info: The user info with ID to get sites for. Returns: A list of paths to WordPress sites. """ if not COMPONENTS_DB_PATH.exists() or user_info is None: logger.error( "App detector database '%s' couldn't be found.", str(COMPONENTS_DB_PATH), ) return list() if user_info is None: logger.error( "No user info provided for getting sites", ) return list() cursor = SqliteDatabase(COMPONENTS_DB_PATH).execute_sql( f""" WITH latest_reports AS ( SELECT MAX(id) as id FROM report WHERE uid = {user_info.pw_uid} GROUP BY dir ) SELECT wp.real_path FROM apps AS wp INNER JOIN latest_reports AS lr ON wp.report_id = lr.id WHERE wp.title = 'wp_core' AND wp.parent_id IS NULL GROUP BY wp.real_path ORDER BY length(wp.real_path) DESC """ ) return [row[0] for row in cursor.fetchall()] def get_sites_without_plugin() -> set[WPSite]: """ Get a set of wp sites where imunify-security plugin is not installed. The data is pulled from the app-version-detector database. Note: The same WordPress site (real_path) can appear in multiple reports if it was scanned directly and also as part of a parent folder scan. We use only the entry from the latest report for each real_path. Returns: A set of WPSite objects where the plugin is not installed. """ if not COMPONENTS_DB_PATH.exists(): logger.error( "App detector database '%s' couldn't be found.", str(COMPONENTS_DB_PATH), ) return set() cursor = SqliteDatabase(COMPONENTS_DB_PATH).execute_sql( f""" WITH latest_reports AS ( SELECT id, uid, domain FROM report WHERE id IN ( SELECT MAX(id) FROM report WHERE domain IS NOT NULL AND domain != '' GROUP BY dir ) ), -- Get all WordPress sites with their report IDs all_wp_sites AS ( SELECT wp.id as wp_id, wp.real_path, lr.domain, lr.uid, lr.id as report_id FROM apps AS wp INNER JOIN latest_reports AS lr ON wp.report_id = lr.id WHERE wp.title = 'wp_core' AND wp.parent_id IS NULL ), -- For each real_path, keep only the entry from the latest report latest_wp_sites AS ( SELECT wp_id, real_path, domain, uid, report_id FROM all_wp_sites WHERE (real_path, report_id) IN ( SELECT real_path, MAX(report_id) FROM all_wp_sites GROUP BY real_path ) ) SELECT real_path, domain, uid FROM latest_wp_sites lws WHERE NOT EXISTS ( SELECT 1 FROM apps AS plugin WHERE plugin.parent_id = lws.wp_id AND plugin.title = 'wp_plugin_{PLUGIN_SLUG.replace("-", "_")}' ) """ ) return { WPSite(docroot=row[0], domain=row[1], uid=int(row[2])) for row in cursor.fetchall() } def get_sites_to_install() -> set[WPSite]: """ Get a set of WordPress sites where we need to install the plugin. This is determined by finding sites that don't have the plugin installed and are not already tracked in our database. Returns: A set of WPSite objects where the plugin needs to be installed. """ sites_without_plugin = get_sites_without_plugin() existing_sites = { WPSite.from_wordpress_site(r) for r in WordpressSite.select() } return sites_without_plugin - existing_sites def insert_installed_sites(sites: set[WPSite]) -> None: """ Insert a set of installed WordPress sites into the database. This is used to track which sites have the plugin installed. Args: sites: A set of WPSite objects representing sites where the plugin was installed. """ if not sites: return WordpressSite.insert_many( [ { "domain": site.domain, "docroot": site.docroot, "uid": site.uid, "version": site.version, "manually_deleted_at": None, } for site in sites ] ).execute() def get_outdated_sites(latest_version: str) -> list[WPSite]: """ Get a list of WordPress sites that have outdated plugin versions. Args: latest_version: The latest available plugin version to compare against. Returns: A list of WPSite objects that have versions older than latest_version. """ if not latest_version: logger.error( "Cannot get outdated sites without a valid latest version" ) return [] return [ WPSite.from_wordpress_site(r) for r in WordpressSite.select().where( WordpressSite.manually_deleted_at.is_null(), WordpressSite.version != latest_version, ) ] def mark_site_as_manually_deleted(site: WPSite, timestamp: float) -> None: """ Mark a WordPress site as manually deleted in the database. Args: site: The WPSite object to mark as deleted timestamp: The timestamp when the site was deleted """ logger.info( "Mark site %s as manually deleted at %s (WP-Plugin removed)", site, timestamp, ) ( WordpressSite.update(manually_deleted_at=timestamp) .where(WordpressSite.docroot == site.docroot) .execute() ) def get_sites_to_mark_as_manually_deleted( freshly_installed_sites: set[WPSite] = None, ) -> set[WPSite]: """ Get a set of WordPress sites that should be marked as manually deleted. These are sites that are in our database but no longer have the plugin installed. Args: freshly_installed_sites: Optional set of sites that were just installed and should be excluded from being marked as manually deleted to avoid race conditions. Returns: set[WPSite]: A set of WordPress sites that should be marked as manually deleted """ # Get sites without plugin from AVD database sites_without_plugin = get_sites_without_plugin() # Get sites from our database that haven't been marked as manually deleted sites_from_db = { WPSite.from_wordpress_site(r) for r in WordpressSite.select().where( WordpressSite.manually_deleted_at.is_null() ) } # Calculate intersection of sites that are in our DB but no longer have the plugin sites_to_mark = sites_without_plugin & sites_from_db # Filter out freshly installed sites to avoid race condition with AppVersionDetector if freshly_installed_sites: sites_to_mark = sites_to_mark - freshly_installed_sites return sites_to_mark def update_site_version(site: WPSite, version: str) -> None: """ Update the version of a WordPress site in the database. Args: site: The WPSite object to update version: The new version to set """ WordpressSite.update(version=version).where( WordpressSite.docroot == site.docroot ).execute() def get_installed_sites() -> list[WPSite]: """ Get a list of active installed WordPress sites. These are sites that haven't been marked as manually deleted. Returns: A list of WPSite objects representing non-deleted sites. """ return [ WPSite.from_wordpress_site(site) for site in WordpressSite.select().where( WordpressSite.manually_deleted_at.is_null(True) ) ] async def sleep_on_error(exception, attempt): await asyncio.sleep(0.5) @retry_on( OperationalError, max_tries=3, silent=True, log=False, on_error=sleep_on_error, ) def delete_site(site: WPSite) -> int: """ Delete a WordPress site from the database with retry logic. Will retry up to 3 times on database operational errors with 0.5s delay between attempts. Args: site: The WPSite object to delete Returns: The number of rows affected by the delete operation """ return ( WordpressSite.delete() .where(WordpressSite.docroot == site.docroot) .execute() ) def get_sites_with_plugin() -> set[WPSite]: """ Get a set of WordPress sites where the imunify-security plugin is installed. The data is pulled from the app-version-detector database. Note: The same WordPress site (real_path) can appear in multiple reports if it was scanned directly and also as part of a parent folder scan. We use only the entry from the latest report for each real_path. Returns: A set of WPSite objects where the plugin is installed. """ if not COMPONENTS_DB_PATH.exists(): logger.error( "App detector database '%s' couldn't be found.", str(COMPONENTS_DB_PATH), ) return set() cursor = SqliteDatabase(COMPONENTS_DB_PATH).execute_sql( f""" WITH latest_reports AS ( SELECT id, uid, domain FROM report WHERE id IN ( SELECT MAX(id) FROM report WHERE domain IS NOT NULL AND domain != '' GROUP BY dir ) ), -- Get all WordPress sites with their report IDs all_wp_sites AS ( SELECT wp.id as wp_id, wp.real_path, lr.domain, lr.uid, lr.id as report_id FROM apps AS wp INNER JOIN latest_reports AS lr ON wp.report_id = lr.id WHERE wp.title = 'wp_core' AND wp.parent_id IS NULL ), -- For each real_path, keep only the entry from the latest report latest_wp_sites AS ( SELECT wp_id, real_path, domain, uid, report_id FROM all_wp_sites WHERE (real_path, report_id) IN ( SELECT real_path, MAX(report_id) FROM all_wp_sites GROUP BY real_path ) ) SELECT real_path, domain, uid FROM latest_wp_sites lws WHERE EXISTS ( SELECT 1 FROM apps AS plugin WHERE plugin.parent_id = lws.wp_id AND plugin.title = 'wp_plugin_{PLUGIN_SLUG.replace("-", "_")}' ) """ ) return { WPSite(docroot=row[0], domain=row[1], uid=int(row[2])) for row in cursor.fetchall() } def get_sites_to_adopt() -> set[WPSite]: """ Get a set of WordPress sites that should be adopted. These are sites where the plugin is installed but either: - Not tracked in our database (e.g., copied/migrated sites) - Flagged as manually removed (from past bugs or manual reinstall) Returns: A set of WPSite objects that should be adopted. """ sites_with_plugin = get_sites_with_plugin() # Get sites from our database that are correctly tracked (not marked as manually deleted) correctly_tracked_sites = { WPSite.from_wordpress_site(r) for r in WordpressSite.select().where( WordpressSite.manually_deleted_at.is_null(True) ) } # Adopt sites with plugin that aren't correctly tracked return sites_with_plugin - correctly_tracked_sites def clear_manually_deleted_flag(site: WPSite) -> None: """ Clear the manually_deleted_at flag for a WordPress site. This is used when adopting a site that was previously marked as manually deleted. Args: site: The WPSite object to clear the flag for """ logger.info( "Clearing manually_deleted_at flag for site %s (plugin found)", site, ) ( WordpressSite.update(manually_deleted_at=None) .where(WordpressSite.docroot == site.docroot) .execute() )
💾 Save Changes
Cancel
📤 Upload File
×
Select File
Upload
Cancel
➕ Create New
×
Type
📄 File
📁 Folder
Name
Create
Cancel
✎ Rename Item
×
Current Name
New Name
Rename
Cancel
🔐 Change Permissions
×
Target File
Permission (e.g., 0755, 0644)
0755
0644
0777
Apply
Cancel