opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
wordpress
➕ New
📤 Upload
✎ Editing:
incident_collector.py
← Back
"""Collector for WordPress CVE protection incidents."""
import logging
import pwd
import time
import re
from pathlib import Path
from collections import defaultdict

from imav.model.wordpress import WPSite
from imav.wordpress.cli import get_data_dir
from defence360agent.wordpress.incident_parser import IncidentFileParser
from defence360agent.model.wordpress_incident import (
    upsert_wordpress_incident,
    bulk_create_wordpress_incidents,
    build_incident_dict,
)

logger = logging.getLogger(__name__)


class IncidentRateLimiter:
    """
    Rate limiter to prevent DoS attacks via incident flooding.

    Implements per-rule-per-IP rate limiting as per spec:
    - Maximum 100 incidents for each rule from the same IP within 15 minutes

    Memory-optimized implementation with bounded entry count using LRU
    eviction.
    """

    def __init__(
        self,
        max_incidents_per_rule_per_ip: int = 100,
        time_window_seconds: int = 900,  # 15 minutes
        max_unique_entries: int = 10000,  # Limit total unique (rule_id, IP) combinations
    ):
        """
        Initialize the rate limiter.

        Args:
            max_incidents_per_rule_per_ip: Max incidents per rule per IP
                (default: 100)
            time_window_seconds: Time window in seconds
                (default: 900 = 15 minutes)
            max_unique_entries: Max unique (rule_id, IP) combinations to
                track (default: 10000)
        """
        self.max_per_rule_per_ip = max_incidents_per_rule_per_ip
        self.time_window = time_window_seconds
        self.max_unique_entries = max_unique_entries
        # Track incident timestamps:
        # {(rule_id, ip): [timestamp1, timestamp2, ...]}
        self.incident_times = defaultdict(list)
        self.cleanup_interval = 60  # Clean up old records every minute
        self.last_cleanup = time.time()

    def _cleanup_old_records(self):
        """Remove records older than the time window and enforce max entries limit."""
        now = time.time()
        cutoff = now - self.time_window

        # Clean expired timestamps from all entries
        keys_to_delete = []
        for key, timestamps in self.incident_times.items():
            # Filter out timestamps older than the window
            recent = [ts for ts in timestamps if ts > cutoff]
            if recent:
                self.incident_times[key] = recent
            else:
                keys_to_delete.append(key)
        for key in keys_to_delete:
            del self.incident_times[key]

        # Enforce max unique entries limit using LRU eviction
        if len(self.incident_times) > self.max_unique_entries:
            # Find oldest entries (those with oldest timestamp)
            entries_by_age = sorted(
                self.incident_times.items(),
                key=lambda x: x[1][0] if x[1] else 0,
            )
            # Remove oldest 10% of entries to avoid frequent evictions
            num_to_remove = max(
                1,
                len(self.incident_times) - int(self.max_unique_entries * 0.9),
            )
            for key, _ in entries_by_age[:num_to_remove]:
                del self.incident_times[key]
            logger.warning(
                "Rate limiter exceeded max entries (%d), removed %d oldest"
                " entries",
                self.max_unique_entries,
                num_to_remove,
            )

        self.last_cleanup = now

    def check_rate_limit(
        self, rule_id: str, attacker_ip: str
    ) -> tuple[bool, str]:
        """
        Check if adding an incident would exceed rate limits.

        Args:
            rule_id: Rule identifier
            attacker_ip: IP address of the attacker

        Returns:
            Tuple of (allowed: bool, reason: str)
        """
        # Single wall-clock read serves both the periodic-cleanup check
        # and the window cutoff below (previously read twice).
        now = time.time()

        # Periodic cleanup
        if now - self.last_cleanup > self.cleanup_interval:
            self._cleanup_old_records()

        cutoff = now - self.time_window
        key = (rule_id, attacker_ip)

        # Lazy cleanup: remove expired entries on access
        if key in self.incident_times:
            timestamps = self.incident_times[key]
            # Filter out old timestamps
            recent = [ts for ts in timestamps if ts > cutoff]
            if recent:
                self.incident_times[key] = recent
                recent_count = len(recent)
            else:
                # All timestamps expired, remove entry
                del self.incident_times[key]
                recent_count = 0
        else:
            recent_count = 0

        # Check if limit exceeded
        if recent_count >= self.max_per_rule_per_ip:
            window_minutes = self.time_window // 60
            return (
                False,
                (
                    f"Rate limit exceeded for rule {rule_id} from IP"
                    f" {attacker_ip}:"
                    f" {recent_count}/{self.max_per_rule_per_ip} within"
                    f" {window_minutes} minutes"
                ),
            )

        return True, "OK"

    def record_incident(self, rule_id: str, attacker_ip: str):
        """
        Record that an incident was added.

        Args:
            rule_id: Rule identifier
            attacker_ip: IP address
        """
        now = time.time()
        key = (rule_id, attacker_ip)

        # Create list if it doesn't exist, or append to existing
        if key not in self.incident_times:
            self.incident_times[key] = [now]
        else:
            # Limit list size to prevent unbounded growth
            timestamps = self.incident_times[key]
            if len(timestamps) >= self.max_per_rule_per_ip:
                # Remove oldest timestamp when at limit
                timestamps.pop(0)
            timestamps.append(now)


class IncidentCollector:
    """
    Collect and persist WordPress incidents from plugin incident files.
    """

    def __init__(self, rate_limiter: IncidentRateLimiter | None = None):
        """
        Initialize the incident collector.

        Args:
            rate_limiter: Optional rate limiter
                (creates default if not provided)
        """
        self.rate_limiter = rate_limiter or IncidentRateLimiter()
        self.parser = IncidentFileParser()

    async def collect_incidents_for_site(
        self,
        site: WPSite,
        delete_after_processing: bool = True,
    ) -> list:
        """
        Collect incidents from a single WordPress site.

        Args:
            site: WordPress site to collect incidents from
            delete_after_processing: Whether to delete incident files
                after processing

        Returns:
            List of collected Incident objects
        """
        collected_incidents = []
        try:
            data_dir = await get_data_dir(site)
            logger.debug("Data directory for site %s: %s", site, data_dir)
            if not data_dir.exists():
                logger.debug("Data directory does not exist for site %s", site)
                return []

            incident_files = self._get_incident_files(data_dir)
            logger.debug(
                "Incident files for site %s: %s", site, incident_files
            )
            if not incident_files:
                logger.debug("No incident files found for site %s", site)
                return []

            logger.debug(
                "Found %d incident file(s) for site %s",
                len(incident_files),
                site,
            )

            username = self._get_site_username(site)

            for incident_file in incident_files:
                file_incidents = await self._process_file(
                    incident_file,
                    site,
                    username,
                    delete_after_processing,
                )
                collected_incidents.extend(file_incidents)
        except Exception as e:
            # Best-effort per site: a failure on one site must not abort
            # collection for the remaining sites.
            logger.error(
                "Error collecting incidents for site %s: %s",
                site,
                e,
            )

        logger.info(
            "Collected %d incident(s) for site %s",
            len(collected_incidents),
            site,
        )
        return collected_incidents

    async def collect_incidents_for_sites(
        self,
        sites: list[WPSite],
        delete_after_processing: bool = True,
    ) -> list:
        """
        Collect incidents from multiple WordPress sites.

        Args:
            sites: List of WordPress sites
            delete_after_processing: Whether to delete incident files
                after processing

        Returns:
            List of collected Incident objects
        """
        all_collected_incidents = []
        for site in sites:
            site_incidents = await self.collect_incidents_for_site(
                site,
                delete_after_processing,
            )
            all_collected_incidents.extend(site_incidents)

        if all_collected_incidents:
            logger.info(
                "Collected %d WordPress incident(s) from %d site(s)",
                len(all_collected_incidents),
                len(sites),
            )
        return all_collected_incidents

    @classmethod
    def _get_incident_files(cls, data_dir: Path) -> list[Path]:
        """
        Get all incident files in the incidents directory.

        NOTE(review): an earlier docstring claimed only files older than
        one hour are returned (to let the WordPress plugin finalize the
        data first), but no such age filter is implemented — confirm
        whether it is still required.

        Args:
            data_dir: Path to the imunify-security data directory

        Returns:
            List of incident file paths, sorted by modification time
        """
        incidents_dir = data_dir / "incidents"
        logger.debug(
            "Incidents directory for site %s: %s", data_dir, incidents_dir
        )
        if not incidents_dir.exists() or not incidents_dir.is_dir():
            logger.debug(
                "Incidents directory does not exist for site %s", data_dir
            )
            return []

        incident_files = [
            f
            for f in incidents_dir.iterdir()
            if f.is_file() and cls._is_incident_file(f)
        ]
        # Sort oldest-first so files are processed in the order they were
        # written (the docstring promised this ordering; previously the
        # raw iterdir() order was returned).
        incident_files.sort(key=lambda f: f.stat().st_mtime)
        logger.debug(
            "Incident files for site %s: %s", data_dir, incident_files
        )
        return incident_files

    # Pattern for incident files: yyyy-mm-dd-hh.php
    _FILE_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}-\d{2}\.php$")

    @classmethod
    def _is_incident_file(cls, file_path: Path) -> bool:
        """
        Check if a file is an incident file based on naming pattern.

        Args:
            file_path: Path to the file to check

        Returns:
            True if file matches pattern yyyy-mm-dd-hh.php
        """
        return bool(cls._FILE_PATTERN.match(file_path.name))

    async def _process_file(
        self,
        incident_file: Path,
        site: WPSite,
        username: str | None,
        delete_after_processing: bool,
    ) -> list:
        """Parse one incident file, store its incidents, optionally delete it."""
        try:
            incidents = self.parser.parse_file(incident_file)
            if not incidents:
                logger.warning(
                    "No valid incidents in file %s",
                    incident_file.name,
                )
                if delete_after_processing:
                    incident_file.unlink(missing_ok=True)
                return []

            logger.debug(
                "Parsed %d incident(s) from %s for site %s",
                len(incidents),
                incident_file.name,
                site,
            )

            collected_incidents = self._process_file_incidents(
                incidents,
                site,
                username,
                incident_file.name,
            )

            if delete_after_processing:
                incident_file.unlink(missing_ok=True)
                logger.debug("Deleted processed file %s", incident_file.name)

            return collected_incidents
        except Exception as e:
            logger.error(
                "Error processing incident file %s for site %s: %s",
                incident_file.name,
                site,
                e,
            )
            return []

    def _get_site_username(self, site: WPSite) -> str | None:
        """Resolve the system username owning the site, or None on failure."""
        try:
            user_info = pwd.getpwuid(site.uid)
            return user_info.pw_name
        except Exception as e:
            logger.error(
                "Failed to get username for uid=%d, site %s: %s",
                site.uid,
                site,
                e,
            )
            return None

    def _process_file_incidents(
        self,
        incidents: list[dict],
        site: WPSite,
        username: str | None,
        incident_file_name: str,
    ) -> list:
        """Rate-limit and bulk-insert parsed incidents from one file."""
        incidents_to_insert = []
        dropped_count = 0

        # Prepare all incidents for bulk insertion
        for incident in incidents:
            rule_id = incident.get("rule_id", "unknown")
            attacker_ip = incident.get("REMOTE_ADDR") or incident.get(
                "attacker_ip", "unknown"
            )

            allowed, reason = self.rate_limiter.check_rate_limit(
                rule_id,
                attacker_ip,
            )
            if not allowed:
                logger.warning(
                    "Rate limit exceeded for site %s: %s",
                    site,
                    reason,
                )
                dropped_count += 1
                continue

            # Prepare incident data for bulk insert
            site_info = {
                "domain": site.domain,
                "site_path": site.docroot,
                "username": username,
            }
            incident_data = build_incident_dict(incident, site_info)
            incidents_to_insert.append(incident_data)
            self.rate_limiter.record_incident(rule_id, attacker_ip)

        # Bulk insert all incidents in a single transaction
        created_incidents = []
        if incidents_to_insert:
            try:
                created_incidents = bulk_create_wordpress_incidents(
                    incidents_to_insert
                )
            except Exception as e:
                logger.error(
                    "Failed to bulk insert incidents from %s: %s",
                    incident_file_name,
                    e,
                )

        logger.info(
            "Processed file %s: %d stored, %d dropped",
            incident_file_name,
            len(created_incidents),
            dropped_count,
        )
        return created_incidents

    def _process_incident(
        self,
        incident: dict,
        site: WPSite,
        username: str | None,
        incident_file_name: str,
    ):
        """Rate-limit and store a single incident (per-incident upsert path)."""
        rule_id = incident.get("rule_id", "unknown")
        attacker_ip = incident.get("REMOTE_ADDR") or incident.get(
            "attacker_ip", "unknown"
        )

        allowed, reason = self.rate_limiter.check_rate_limit(
            rule_id,
            attacker_ip,
        )
        if not allowed:
            logger.warning(
                "Rate limit exceeded for site %s: %s",
                site,
                reason,
            )
            return None

        return self._store_incident(
            incident,
            site,
            username,
            rule_id,
            attacker_ip,
            incident_file_name,
        )

    def _store_incident(
        self,
        incident: dict,
        site: WPSite,
        username: str | None,
        rule_id: str,
        attacker_ip: str,
        incident_file_name: str,
    ):
        """Upsert one incident and record it against the rate limiter."""
        try:
            site_info = {
                "domain": site.domain,
                "site_path": site.docroot,
                "username": username,
            }
            incident = upsert_wordpress_incident(
                incident,
                site_info,
            )
            self.rate_limiter.record_incident(rule_id, attacker_ip)
            return incident
        except Exception as e:
            logger.error(
                "Failed to store incident from %s: %s",
                incident_file_name,
                e,
            )
            return None
💾 Save Changes
Cancel
📤 Upload File
×
Select File
Upload
Cancel
➕ Create New
×
Type
📄 File
📁 Folder
Name
Create
Cancel
✎ Rename Item
×
Current Name
New Name
Rename
Cancel
🔐 Change Permissions
×
Target File
Permission (e.g., 0755, 0644)
0755
0644
0777
Apply
Cancel