opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
model
➕ New
📤 Upload
✎ Editing:
wordpress_incident.py
← Back
"""Helper functions for WordPress CVE protection incidents. WordPress incidents are stored in a dedicated wordpress_incident table with plugin-specific data stored in the extra_info JSON field. This module provides helper functions to work with WordPress incidents. Available for both AV and IM360 modes. """ import time import json from datetime import timedelta from peewee import ( CharField, FloatField, IntegerField, TextField, ) from playhouse.sqlite_ext import JSONField, fn from defence360agent.model import Model, instance from defence360agent.model.simplification import apply_order_by from defence360agent.rpc_tools.validate import OrderBy class WordpressIncident(Model): """ WordPress incident model for CVE protection. Uses dedicated wordpress_incident table created in migration 191. Unique constraint on (abuser, name, plugin, rule, severity, domain) allows deduplication similar to the aggregate plugin. """ id = IntegerField(primary_key=True, null=True) plugin = CharField(null=True) rule = CharField(null=True) timestamp = FloatField(null=True) retries = IntegerField(null=True) severity = IntegerField(null=True) name = CharField(null=True) description = TextField(null=True) abuser = CharField(null=True) country = CharField(null=True, column_name="country_id") domain = TextField(null=True, default=None) extra_info = JSONField(null=True) class Meta: database = instance.db db_table = "wordpress_incident" indexes = ( # Unique composite index for deduplication (migration 192) (("abuser", "name", "plugin", "rule", "severity", "domain"), True), ) def build_extra_info(incident_data: dict, site_info: dict) -> dict: """ Build extra_info dict from incident data and site information. Args: incident_data: Dict with incident fields from PHP incident file site_info: Dict with site information (domain, site_path, username, user_id) Returns: Dict with all WordPress-specific fields for extra_info JSON column """ # Serialize JSON fields files_json = serialize_json_field(incident_data.get("FILES")) get_names_json = serialize_json_field(incident_data.get("GET_NAMES")) post_names_json = serialize_json_field(incident_data.get("POST_NAMES")) return { # WordPress plugin-populated fields "cve": incident_data.get("cve"), "mode": incident_data.get("mode"), "target": incident_data.get("target"), "slug": incident_data.get("slug"), "version": incident_data.get("version"), "user_logged_in": incident_data.get("user_logged_in"), "username": site_info.get("username"), "user_id": site_info.get("user_id"), "site_path": site_info.get("site_path"), # HTTP request details "request_method": incident_data.get("REQUEST_METHOD"), "script_filename": incident_data.get("SCRIPT_FILENAME"), "php_self": incident_data.get("PHP_SELF"), "path_info": incident_data.get("PATH_INFO"), "request_uri": incident_data.get("REQUEST_URI"), "query_string": incident_data.get("QUERY_STRING"), "http_x_forwarded_for": incident_data.get("HTTP_X_FORWARDED_FOR"), "http_user_agent": incident_data.get("HTTP_USER_AGENT"), "http_referer": incident_data.get("HTTP_REFERER"), # Request data "files": files_json, "get_names": get_names_json, "post_names": post_names_json, "raw_data": incident_data.get("RAW_DATA"), } def build_incident_dict(incident_data: dict, site_info: dict) -> dict: """ Build complete incident dict ready for database insertion. This is used for both single incident creation and bulk insertion. Args: incident_data: Dict with incident fields from PHP incident file site_info: Dict with site information (domain, site_path, username, user_id) Returns: Dict with all fields ready for Incident.create() or bulk insert """ message = incident_data.get("message") or build_message_fallback( incident_data ) extra_info = build_extra_info(incident_data, site_info) return { # Standard incident fields "plugin": "wordpress", "rule": incident_data.get("rule_id", "unknown"), "timestamp": float(incident_data.get("ts", 0)), "retries": 1, "severity": calculate_severity(incident_data.get("mode")), "name": f"WordPress CVE: {incident_data.get('cve', 'Unknown')}", "description": message, "abuser": incident_data.get("REMOTE_ADDR") or incident_data.get("attacker_ip"), "domain": site_info.get("domain"), # JSONField automatically handles serialization - just pass the dict "extra_info": extra_info, } def create_wordpress_incident( incident_data: dict, site_info: dict ) -> WordpressIncident: """ Create a WordPress incident in the wordpress_incident table. Args: incident_data: Dict with incident fields from PHP incident file site_info: Dict with site information (domain, site_path, username) Returns: WordpressIncident instance with WordPress fields populated in extra_info """ incident_dict = build_incident_dict(incident_data, site_info) return WordpressIncident.create(**incident_dict) def upsert_wordpress_incident( incident_data: dict, site_info: dict ) -> WordpressIncident: """ Insert or update a WordPress incident in the wordpress_incident table. If an incident with the same aggregate key (abuser, name, plugin, rule, severity, domain) exists, increment its retries counter and update timestamp. Otherwise, create a new incident with retries=1. This implements similar deduplication logic as the aggregate plugin. Args: incident_data: Dict with incident fields from PHP incident file site_info: Dict with site information (domain, site_path, username, user_id) Returns: WordpressIncident instance (either newly created or updated) """ incident_dict = build_incident_dict(incident_data, site_info) # Use INSERT ... ON CONFLICT for efficient upsert # On conflict: increment retries and update timestamp # Use RETURNING to get the inserted/updated record without a separate query result = ( WordpressIncident.insert(**incident_dict) .on_conflict( conflict_target=[ WordpressIncident.abuser, WordpressIncident.name, WordpressIncident.plugin, WordpressIncident.rule, WordpressIncident.severity, WordpressIncident.domain, ], update={ WordpressIncident.retries: WordpressIncident.retries + 1, WordpressIncident.timestamp: incident_dict["timestamp"], }, ) .returning(WordpressIncident) .execute() ) # Get the first (and only) returned row return list(result)[0] def wordpress_incident_to_dict(incident: WordpressIncident) -> dict: """ Convert a WordpressIncident model instance to a dictionary. Args: incident: WordpressIncident model instance Returns: Dictionary representation of the incident """ return { "id": incident.id, "plugin": incident.plugin, "rule": incident.rule, "timestamp": incident.timestamp, "retries": incident.retries, "severity": incident.severity, "name": incident.name, "description": incident.description, "abuser": incident.abuser, "country": incident.country, "domain": incident.domain, "extra_info": incident.extra_info, } def get_wordpress_incidents( limit: int = 1000, offset: int = 0, user_id: int | None = None, by_abuser_ip: str | None = None, by_country_code: str | None = None, by_domain: str | None = None, search: str | None = None, site_search: str | None = None, since: int | None = None, to: int | None = None, order_by: list | None = None, ): """ Get WordPress incidents as dictionaries. Args: limit: Maximum number of incidents to return offset: Offset for pagination user_id: Filter by user ID (None = all) by_abuser_ip: Filter by abuser IP address (None = all) by_country_code: Filter by country code (None = all) by_domain: Filter by domain (None = all) search: Search in IP address, name, description, or domain (None = all) site_search: Filter by site path in extra_info (None = all) since: Filter by timestamp >= this value (unix timestamp, None = all) to: Filter by timestamp <= this value (unix timestamp, None = all) order_by: List of fields to order by (None = default order by timestamp desc). Can be either strings (e.g., ["timestamp+", "severity-"]) or OrderBy objects. Strings are automatically converted. Returns: List of incident dictionaries """ query = WordpressIncident.select(WordpressIncident).where( (WordpressIncident.plugin == "wordpress") ) if user_id is not None: query = query.where( fn.json_extract(WordpressIncident.extra_info, "$.user_id") == user_id ) if by_abuser_ip is not None: query = query.where(WordpressIncident.abuser.contains(by_abuser_ip)) if by_country_code is not None: query = query.where(WordpressIncident.country == by_country_code) if by_domain is not None: query = query.where(WordpressIncident.domain.contains(by_domain)) if search is not None: query = query.where( WordpressIncident.name.contains(search) | WordpressIncident.description.contains(search) | WordpressIncident.domain.contains(search) | WordpressIncident.abuser.contains(search) ) if site_search is not None: query = query.where( fn.json_extract(WordpressIncident.extra_info, "$.site_path") == site_search ) if since is not None: query = query.where(WordpressIncident.timestamp.cast("REAL") >= since) if to is not None: query = query.where(WordpressIncident.timestamp.cast("REAL") <= to) # Apply ordering if order_by is not None: # Convert string format to OrderBy objects if needed converted_order_by = [] for item in order_by: if isinstance(item, str): converted_order_by.append(OrderBy.fromstring(item)) else: converted_order_by.append(item) query = apply_order_by(converted_order_by, WordpressIncident, query) else: # Default order by timestamp descending query = query.order_by(WordpressIncident.timestamp.desc()) query = query.limit(limit) query = query.offset(offset) return [wordpress_incident_to_dict(inc) for inc in query.execute()] def bulk_create_wordpress_incidents( incidents_data: list[dict], ) -> list[dict]: """ Bulk create WordPress incidents in a single transaction. Args: incidents_data: List of dictionaries containing incident field data Returns: List of created incident dictionaries """ if not incidents_data: return [] # Insert all incidents in bulk with RETURNING to get the created objects result = ( WordpressIncident.insert_many(incidents_data) .returning(WordpressIncident) .execute() ) return [wordpress_incident_to_dict(inc) for inc in result] def delete_old_wordpress_incidents(days: int): cutoff_time = time.time() - timedelta(days=days).total_seconds() deleted = ( WordpressIncident.delete() .where( (WordpressIncident.plugin == "wordpress") & (WordpressIncident.timestamp.cast("REAL") < cutoff_time) ) .execute() ) return deleted def build_message_fallback(incident_data: dict) -> str: """Build message if plugin didn't provide one (per spec format).""" parts = ["IM WP plugin:"] if incident_data.get("rule_id"): parts.append(incident_data["rule_id"]) if incident_data.get("cve"): parts.append(incident_data["cve"]) if incident_data.get("slug"): parts.append(incident_data["slug"]) if incident_data.get("version"): parts.append(incident_data["version"]) if incident_data.get("mode"): parts.append(incident_data["mode"]) return " ".join(parts) def calculate_severity(mode: str | None) -> int: """Calculate severity based on mode.""" if mode == "block": return 8 # Higher severity for blocked attacks elif mode == "pass": return 5 # Medium severity for monitored attacks else: return 5 # Default def serialize_json_field(value) -> str | None: """Serialize a value to JSON string if it's not already a string.""" if value is None: return None if isinstance(value, str): return value return json.dumps(value)
💾 Save Changes
Cancel
📤 Upload File
×
Select File
Upload
Cancel
➕ Create New
×
Type
📄 File
📁 Folder
Name
Create
Cancel
✎ Rename Item
×
Current Name
New Name
Rename
Cancel
🔐 Change Permissions
×
Target File
Permission (e.g., 0755, 0644)
0755
0644
0777
Apply
Cancel