diff --git a/cortex/autonomous_patcher.py b/cortex/autonomous_patcher.py new file mode 100644 index 00000000..a0d6d118 --- /dev/null +++ b/cortex/autonomous_patcher.py @@ -0,0 +1,791 @@ +#!/usr/bin/env python3 +""" +Autonomous Security Patcher for Cortex Linux + +Automatically patches security vulnerabilities with safety controls including: +- Dry-run mode by default +- Rollback capability +- Whitelist/blacklist support +- Severity-based filtering +- Integration with installation history +""" + +import logging +import os +import subprocess +import threading +import time +from dataclasses import dataclass +from datetime import datetime +from enum import Enum +from pathlib import Path + +from cortex.installation_history import InstallationHistory, InstallationStatus, InstallationType +from cortex.progress_indicators import ProgressIndicator, get_progress_indicator +from cortex.vulnerability_scanner import Severity, Vulnerability, VulnerabilityScanner + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class PrivilegeError(Exception): + """Raised when root/sudo privileges are required but not available.""" + + pass + + +def _get_severity_color(severity: Severity) -> str: + """Get color for severity level.""" + return { + Severity.CRITICAL: "red", + Severity.HIGH: "bright_red", + Severity.MEDIUM: "yellow", + Severity.LOW: "blue", + Severity.UNKNOWN: "dim", + }.get(severity, "white") + + +# Module-level apt update lock for thread safety +_apt_update_lock = threading.Lock() +# Default apt update interval: 5 minutes balances freshness with avoiding +# excessive apt-get update calls during batch operations +_DEFAULT_APT_UPDATE_INTERVAL_SECONDS = 300 + + +class PatchStrategy(Enum): + """Patching strategy""" + + AUTOMATIC = "automatic" # Patch all vulnerabilities + CRITICAL_ONLY = "critical_only" # Only patch critical vulnerabilities + HIGH_AND_ABOVE = "high_and_above" # Patch high and critical + MANUAL = "manual" # Require manual approval for each patch + + +@dataclass +class PatchPlan: + """Plan for patching vulnerabilities""" + + vulnerabilities: list[Vulnerability] + packages_to_update: dict[str, str] # package -> target_version + estimated_duration_minutes: float + requires_reboot: bool + rollback_available: bool + + +@dataclass +class PatchResult: + """Result of a patching operation""" + + patch_id: str + timestamp: str + vulnerabilities_patched: int + packages_updated: list[str] + success: bool + errors: list[str] + rollback_id: str | None = None + duration_seconds: float | None = None + + +class AutonomousPatcher: + """Autonomous security patching with safety controls""" + + def __init__( + self, + strategy: PatchStrategy = PatchStrategy.CRITICAL_ONLY, + dry_run: bool = True, + auto_approve: bool = False, + config_path: str | Path | None = None, + allow_unverified_patches: bool = False, + apt_update_interval: int = _DEFAULT_APT_UPDATE_INTERVAL_SECONDS, + ): + """ + Initialize the autonomous patcher. + + Args: + strategy: Patching strategy + dry_run: If True, only show what would be patched + auto_approve: If True, automatically approve patches (dangerous!) + config_path: Optional path to config file (defaults to ~/.cortex/patcher_config.json) + allow_unverified_patches: If True, allow patches when fixed_version is unknown + apt_update_interval: Seconds between apt-get update calls. Defaults to 300 (5 min). + Increase for systems running frequent scans to reduce load. + Decrease for time-sensitive security patching. 
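+
+        Example (illustrative sketch; assumes the defaults above, so dry_run=True):
+
+            patcher = AutonomousPatcher(strategy=PatchStrategy.HIGH_AND_ABOVE)
+            plan = patcher.create_patch_plan()       # scans and plans only
+            result = patcher.apply_patch_plan(plan)  # dry-run: prints, installs nothing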
+ """ + self.strategy = strategy + self.dry_run = dry_run + self.auto_approve = auto_approve + self.allow_unverified_patches = allow_unverified_patches + self.apt_update_interval = apt_update_interval + self.scanner = VulnerabilityScanner() + self.history = InstallationHistory() + + # Config path + if config_path is None: + self.config_path = Path.home() / ".cortex" / "patcher_config.json" + else: + self.config_path = Path(config_path) + + # Safety controls + self.whitelist: set[str] = set() # Packages always allowed to patch + self.blacklist: set[str] = set() # Packages never patched automatically + self.min_severity = Severity.MEDIUM # Minimum severity to patch + + # Apt update tracking (instance-level to avoid global state issues) + self._apt_last_updated: datetime | None = None + + # Load configuration + self._load_config() + + def _load_config(self): + """Load patcher configuration from file""" + if self.config_path.exists(): + try: + import json + + with open(self.config_path) as f: + config = json.load(f) + + self.whitelist = set(config.get("whitelist", [])) + self.blacklist = set(config.get("blacklist", [])) + min_sev = config.get("min_severity", "medium") + self.min_severity = Severity(min_sev.lower()) + + logger.info(f"Loaded patcher config from {self.config_path}") + except Exception as e: + logger.warning(f"Failed to load patcher config: {e}") + + def _save_config(self): + """Save patcher configuration to file""" + self.config_path.parent.mkdir(parents=True, exist_ok=True) + + try: + import json + + config = { + "whitelist": list(self.whitelist), + "blacklist": list(self.blacklist), + "min_severity": self.min_severity.value, + } + + with open(self.config_path, "w") as f: + json.dump(config, f, indent=2) + + except Exception as e: + logger.warning(f"Failed to save patcher config: {e}") + + def _run_command(self, cmd: list[str]) -> tuple[bool, str, str]: + """Execute command and return success, stdout, stderr""" + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) + return (result.returncode == 0, result.stdout, result.stderr) + except subprocess.TimeoutExpired: + return (False, "", "Command timed out") + except Exception as e: + return (False, "", str(e)) + + def _has_root_privileges(self) -> bool: + """ + Check if we have root privileges (running as root or have passwordless sudo). + + Returns: + True if running as root or passwordless sudo is available + """ + # Check if running as root + if os.geteuid() == 0: + return True + + # Check if we have passwordless sudo access + try: + result = subprocess.run( + ["sudo", "-n", "true"], capture_output=True, timeout=2 + ) + return result.returncode == 0 + except Exception: + return False + + def _require_root_privileges(self, operation: str) -> None: + """ + Check for root privileges and raise PrivilegeError if not available. + + Args: + operation: Description of the operation requiring privileges + + Raises: + PrivilegeError: If root privileges are not available + """ + if not self._has_root_privileges(): + raise PrivilegeError( + f"Root privileges required for {operation}. " + f"Run with sudo or as root: sudo cortex security patch" + ) + + def ensure_apt_updated(self, force: bool = False) -> bool: + """ + Ensure apt package list is updated. Thread-safe and rate-limited. 
+ + Args: + force: If True, force update even if recently updated + + Returns: + True if update succeeded or was recently done, False on failure + + Raises: + PrivilegeError: If root privileges are required but not available + """ + with _apt_update_lock: + now = datetime.now() + + # Check if we need to update + if not force and self._apt_last_updated is not None: + elapsed = (now - self._apt_last_updated).total_seconds() + if elapsed < self.apt_update_interval: + logger.debug(f"Apt cache still fresh ({elapsed:.0f}s old), skipping update") + return True + + # Check for root privileges before attempting apt-get update + if not self._has_root_privileges(): + logger.warning( + "Root privileges required for apt-get update. " + "Run with sudo or as root: sudo cortex security patch" + ) + return False + + # Run apt-get update + logger.info("Updating apt package list...") + success, stdout, stderr = self._run_command(["apt-get", "update", "-qq"]) + + if success: + self._apt_last_updated = now + logger.info("Apt package list updated successfully") + return True + else: + logger.warning(f"Failed to update apt package list: {stderr}") + # Still set timestamp to avoid hammering on repeated failures + self._apt_last_updated = now + return False + + def _check_package_update_available(self, package_name: str) -> str | None: + """ + Check if an update is available for a package. + + Note: Call ensure_apt_updated() before iterating over multiple packages + to avoid repeated apt-get update calls. + """ + try: + # Check for available updates (apt-get update should be called beforehand) + success, stdout, _ = self._run_command(["apt-cache", "policy", package_name]) + + if success: + # Parse output to find candidate version + for line in stdout.split("\n"): + if "Candidate:" in line: + # Use split(":", 1) to handle epoch versions like "1:2.3.4-5" + # Without the limit, "Candidate: 1:2.3.4-5" splits to + # ["Candidate", " 1", "2.3.4-5"] instead of ["Candidate", " 1:2.3.4-5"] + parts = line.split(":", 1) + if len(parts) >= 2: + version = parts[1].strip() + if version and version != "(none)": + return version + + except Exception as e: + logger.warning(f"Failed to check update for {package_name}: {e}") + + return None + + def _compare_versions(self, version1: str, operator: str, version2: str) -> bool: + """ + Compare two Debian package versions using dpkg --compare-versions. + + Args: + version1: First version string + operator: Comparison operator (lt, le, eq, ne, ge, gt) + version2: Second version string + + Returns: + True if the comparison holds, False otherwise + """ + try: + success, _, _ = self._run_command( + ["dpkg", "--compare-versions", version1, operator, version2] + ) + return success + except Exception as e: + logger.warning(f"Version comparison failed: {e}") + return False + + def _update_fixes_vulnerability( + self, candidate_version: str, vulnerability: Vulnerability + ) -> bool: + """ + Check if the candidate version fixes the vulnerability. 
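+
+        The comparison delegates to dpkg version ordering via
+        _compare_versions(), so epochs are handled correctly, e.g. (illustrative):
+
+            self._compare_versions("1:2.4.1-1", "ge", "1:2.4.0-3")  # True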
+ + Args: + candidate_version: The version available for update + vulnerability: The vulnerability to check + + Returns: + True if the update will fix the vulnerability + """ + fixed_version = vulnerability.fixed_version + + # If no fixed version is specified, we can't verify the fix + if not fixed_version: + if self.allow_unverified_patches: + logger.warning( + f"āš ļø Applying UNVERIFIED update for {vulnerability.cve_id} on " + f"{vulnerability.package_name} - no fixed_version available to verify fix" + ) + return True + else: + logger.info( + f"Skipping {vulnerability.cve_id} on {vulnerability.package_name}: " + f"no fixed_version specified, cannot verify update fixes vulnerability " + f"(set allow_unverified_patches=True to override)" + ) + return False + + # Check if candidate version >= fixed version + if self._compare_versions(candidate_version, "ge", fixed_version): + logger.debug( + f"Candidate {candidate_version} >= fixed {fixed_version} for " + f"{vulnerability.package_name} ({vulnerability.cve_id})" + ) + return True + + logger.warning( + f"Update for {vulnerability.package_name} to {candidate_version} does NOT fix " + f"{vulnerability.cve_id} (requires >= {fixed_version})" + ) + return False + + def _should_patch(self, vulnerability: Vulnerability) -> bool: + """ + Determine if a vulnerability should be patched based on strategy and filters. + + Args: + vulnerability: Vulnerability to check + + Returns: + True if should be patched + """ + # Check blacklist + if vulnerability.package_name in self.blacklist: + logger.debug(f"Skipping {vulnerability.package_name} (blacklisted)") + return False + + # Check whitelist (always patch if whitelisted) + if vulnerability.package_name in self.whitelist: + return True + + # Check minimum severity + severity_order = { + Severity.CRITICAL: 4, + Severity.HIGH: 3, + Severity.MEDIUM: 2, + Severity.LOW: 1, + Severity.UNKNOWN: 0, + } + + if severity_order.get(vulnerability.severity, 0) < severity_order.get(self.min_severity, 0): + return False + + # Check strategy + if self.strategy == PatchStrategy.CRITICAL_ONLY: + return vulnerability.severity == Severity.CRITICAL + elif self.strategy == PatchStrategy.HIGH_AND_ABOVE: + return vulnerability.severity in [Severity.CRITICAL, Severity.HIGH] + elif self.strategy == PatchStrategy.AUTOMATIC: + return True + elif self.strategy == PatchStrategy.MANUAL: + return False # Manual approval required + + return False + + def create_patch_plan( + self, + vulnerabilities: list[Vulnerability] | None = None, + progress: ProgressIndicator | None = None, + ) -> PatchPlan: + """ + Create a plan for patching vulnerabilities. 
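+
+        The plan is built in three steps: filter candidates with
+        _should_patch(), refresh the apt cache once via ensure_apt_updated(),
+        then keep only packages whose candidate version verifiably fixes at
+        least one vulnerability.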
+ + Args: + vulnerabilities: List of vulnerabilities to patch (if None, scans all) + progress: Optional progress indicator for UI output + + Returns: + PatchPlan with packages to update + """ + if progress is None: + progress = get_progress_indicator() + + if vulnerabilities is None: + # Scan for vulnerabilities + scan_result = self.scanner.scan_all_packages(progress=progress) + vulnerabilities = scan_result.vulnerabilities + + # Filter vulnerabilities based on strategy + to_patch = [v for v in vulnerabilities if self._should_patch(v)] + + if not to_patch: + return PatchPlan( + vulnerabilities=[], + packages_to_update={}, + estimated_duration_minutes=0.0, + requires_reboot=False, + rollback_available=True, + ) + + # Group by package + packages_to_update: dict[str, str] = {} + package_vulns: dict[str, list[Vulnerability]] = {} + + for vuln in to_patch: + if vuln.package_name not in package_vulns: + package_vulns[vuln.package_name] = [] + package_vulns[vuln.package_name].append(vuln) + + # Update apt package list once before checking all packages + progress.print_info("Updating package list...") + apt_ok = self.ensure_apt_updated() + if not apt_ok: + progress.print_warning("Could not update package list (may need sudo)") + + # Check for available updates and verify they fix vulnerabilities + requires_reboot = False + verified_vulns: list[Vulnerability] = [] + skipped_vulns: list[tuple[Vulnerability, str]] = [] # (vuln, reason) + + progress.print_info(f"Checking updates for {len(package_vulns)} package(s)...") + + for package_name, vulns in package_vulns.items(): + # Check if update is available + update_version = self._check_package_update_available(package_name) + + if not update_version: + for vuln in vulns: + skipped_vulns.append((vuln, "no update available")) + continue + + # Verify the update fixes each vulnerability for this package + fixes_any = False + for vuln in vulns: + if self._update_fixes_vulnerability(update_version, vuln): + verified_vulns.append(vuln) + fixes_any = True + else: + skipped_vulns.append( + ( + vuln, + f"update to {update_version} doesn't fix (needs >= {vuln.fixed_version})", + ) + ) + + # Only include the package if it fixes at least one vulnerability + if fixes_any: + packages_to_update[package_name] = update_version + + # Check if this is a kernel package (requires reboot) + if "linux-image" in package_name or "linux-headers" in package_name: + requires_reboot = True + + # Show skipped vulnerabilities summary + if skipped_vulns: + progress.print_warning( + f"{len(skipped_vulns)} vulnerabilities cannot be fixed by available updates" + ) + logger.debug(f"Skipped vulnerabilities: {skipped_vulns[:5]}") + + # Estimate duration (rough: 1 minute per package) + estimated_duration = len(packages_to_update) * 1.0 + + return PatchPlan( + vulnerabilities=verified_vulns, + packages_to_update=packages_to_update, + estimated_duration_minutes=estimated_duration, + requires_reboot=requires_reboot, + rollback_available=True, + ) + + def apply_patch_plan( + self, plan: PatchPlan, progress: ProgressIndicator | None = None + ) -> PatchResult: + """ + Apply a patch plan. 
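+
+        In dry-run mode this only prints the planned updates. With
+        dry_run=False it requires root, records the run in
+        InstallationHistory, and returns a rollback_id referencing that
+        history entry.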
+ + Args: + plan: Patch plan to apply + progress: Optional progress indicator for UI output + + Returns: + PatchResult with results + """ + if progress is None: + progress = get_progress_indicator() + + patch_id = f"patch_{int(time.time())}" + start_time = datetime.now() + + if not plan.packages_to_update: + return PatchResult( + patch_id=patch_id, + timestamp=start_time.isoformat(), + vulnerabilities_patched=0, + packages_updated=[], + success=True, + errors=[], + ) + + if self.dry_run: + progress.print_info("Would update the following packages:") + for package, version in plan.packages_to_update.items(): + progress.print_info(f" {package} → {version}") + + return PatchResult( + patch_id=patch_id, + timestamp=start_time.isoformat(), + vulnerabilities_patched=len(plan.vulnerabilities), + packages_updated=list(plan.packages_to_update.keys()), + success=True, + errors=[], + ) + + # Check for root privileges before executing apt commands + if not self._has_root_privileges(): + error_msg = ( + "Root privileges required to install packages. " + "Run with sudo or as root: sudo cortex security patch --apply" + ) + progress.print_error(error_msg) + logger.error(error_msg) + return PatchResult( + patch_id=patch_id, + timestamp=start_time.isoformat(), + vulnerabilities_patched=0, + packages_updated=[], + success=False, + errors=[error_msg], + ) + + # Record installation start + packages_list = list(plan.packages_to_update.keys()) + commands = ["apt-get update"] + if packages_list: + commands.append("apt-get install -y " + " ".join(packages_list)) + + install_id = self.history.record_installation( + InstallationType.UPGRADE, + packages_list, + commands, + start_time, + ) + + # Execute patching + errors = [] + updated_packages = [] + + try: + # Update package list + progress.print_info("Updating package list...") + success, stdout, stderr = self._run_command(["apt-get", "update", "-qq"]) + if not success: + errors.append(f"Failed to update package list: {stderr}") + progress.print_warning("Could not update package list") + + # Install updates with progress + package_items = list(plan.packages_to_update.items()) + for package_name, target_version in progress.progress_bar( + package_items, description="šŸ“¦ Installing updates" + ): + # Use apt-get install with specific version if available + if target_version: + cmd = ["apt-get", "install", "-y", f"{package_name}={target_version}"] + else: + cmd = ["apt-get", "install", "-y", package_name] + + success, stdout, stderr = self._run_command(cmd) + + if success: + updated_packages.append(package_name) + progress.print_success(f"{package_name} → {target_version}") + else: + error_msg = f"Failed to update {package_name}: {stderr}" + errors.append(error_msg) + progress.print_error(f"{package_name}: update failed") + logger.error(error_msg) + + # Update installation record + if errors: + self.history.update_installation( + install_id, InstallationStatus.FAILED, "\n".join(errors) + ) + success = False + else: + self.history.update_installation(install_id, InstallationStatus.SUCCESS) + success = True + + duration = (datetime.now() - start_time).total_seconds() + + result = PatchResult( + patch_id=patch_id, + timestamp=start_time.isoformat(), + vulnerabilities_patched=len(plan.vulnerabilities), + packages_updated=updated_packages, + success=success, + errors=errors, + rollback_id=install_id, + duration_seconds=duration, + ) + + return result + + except Exception as e: + error_msg = f"Patch operation failed: {e}" + logger.error(error_msg) + errors.append(error_msg) 
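+            # Record the failure in installation history; the rollback_id
+            # below still points at this entry so callers can attempt recovery.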
+ + self.history.update_installation(install_id, InstallationStatus.FAILED, error_msg) + + return PatchResult( + patch_id=patch_id, + timestamp=start_time.isoformat(), + vulnerabilities_patched=0, + packages_updated=[], + success=False, + errors=errors, + rollback_id=install_id, + ) + + def patch_vulnerabilities( + self, + vulnerabilities: list[Vulnerability] | None = None, + progress: ProgressIndicator | None = None, + ) -> PatchResult: + """ + Scan and patch vulnerabilities automatically. + + Args: + vulnerabilities: Optional list of vulnerabilities to patch + progress: Optional progress indicator for UI output + + Returns: + PatchResult with patching results + """ + if progress is None: + progress = get_progress_indicator() + + # Create patch plan + plan = self.create_patch_plan(vulnerabilities, progress=progress) + + if not plan.packages_to_update: + progress.print_success("No packages need patching") + return PatchResult( + patch_id=f"patch_{int(time.time())}", + timestamp=datetime.now().isoformat(), + vulnerabilities_patched=0, + packages_updated=[], + success=True, + errors=[], + ) + + # Show plan in a nice format + progress.print_info("─" * 50) + progress.print_info("šŸ“‹ Patch Plan") + progress.print_info(f" Vulnerabilities: {len(plan.vulnerabilities)}") + progress.print_info(f" Packages: {len(plan.packages_to_update)}") + progress.print_info(f" Est. time: ~{plan.estimated_duration_minutes:.0f} min") + if plan.requires_reboot: + progress.print_warning(" āš ļø Reboot required after patching") + progress.print_info("─" * 50) + + # Apply plan + return self.apply_patch_plan(plan, progress=progress) + + def add_to_whitelist(self, package_name: str): + """Add package to whitelist""" + self.whitelist.add(package_name) + self._save_config() + logger.info(f"Added {package_name} to whitelist") + + def add_to_blacklist(self, package_name: str): + """Add package to blacklist""" + self.blacklist.add(package_name) + self._save_config() + logger.info(f"Added {package_name} to blacklist") + + def set_min_severity(self, severity: Severity): + """Set minimum severity for patching""" + self.min_severity = severity + self._save_config() + logger.info(f"Minimum severity set to {severity.value}") + + +# CLI Interface +if __name__ == "__main__": + import argparse + import sys + + parser = argparse.ArgumentParser(description="Autonomous security patcher") + parser.add_argument( + "--scan-and-patch", action="store_true", help="Scan and patch vulnerabilities" + ) + parser.add_argument( + "--dry-run", action="store_true", default=True, help="Dry run mode (default)" + ) + parser.add_argument( + "--apply", action="store_true", help="Actually apply patches (disable dry-run)" + ) + parser.add_argument( + "--strategy", + choices=["automatic", "critical_only", "high_and_above", "manual"], + default="critical_only", + help="Patching strategy", + ) + parser.add_argument("--whitelist", help="Add package to whitelist") + parser.add_argument("--blacklist", help="Add package to blacklist") + parser.add_argument( + "--min-severity", + choices=["critical", "high", "medium", "low"], + help="Minimum severity to patch", + ) + + args = parser.parse_args() + + dry_run = args.dry_run and not args.apply + strategy = PatchStrategy(args.strategy) + + patcher = AutonomousPatcher(strategy=strategy, dry_run=dry_run) + + if args.whitelist: + patcher.add_to_whitelist(args.whitelist) + print(f"āœ… Added {args.whitelist} to whitelist") + + if args.blacklist: + patcher.add_to_blacklist(args.blacklist) + print(f"āœ… Added 
{args.blacklist} to blacklist") + + if args.min_severity: + patcher.set_min_severity(Severity(args.min_severity)) + print(f"āœ… Minimum severity set to {args.min_severity}") + + if args.scan_and_patch: + if dry_run: + print("šŸ” DRY RUN MODE - No packages will be updated\n") + + result = patcher.patch_vulnerabilities() + + if result.success: + print("\nāœ… Patch complete!") + print(f" Packages updated: {len(result.packages_updated)}") + print(f" Vulnerabilities patched: {result.vulnerabilities_patched}") + if result.duration_seconds: + print(f" Duration: {result.duration_seconds:.2f}s") + else: + print("\nāŒ Patch failed!") + for error in result.errors: + print(f" - {error}") + sys.exit(1) + + if not any([args.scan_and_patch, args.whitelist, args.blacklist, args.min_severity]): + parser.print_help() diff --git a/cortex/cli.py b/cortex/cli.py index ea8976d1..2837a6c8 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -1078,6 +1078,288 @@ def env(self, args: argparse.Namespace) -> int: traceback.print_exc() return 1 + def security(self, args: argparse.Namespace) -> int: + """Handle security vulnerability management commands.""" + action = getattr(args, "security_action", None) + + if not action: + self._print_error("Please specify a subcommand (scan/patch/schedule)") + return 1 + + try: + if action == "scan": + return self._security_scan(args) + elif action == "patch": + return self._security_patch(args) + elif action == "schedule": + return self._security_schedule(args) + else: + self._print_error(f"Unknown security subcommand: {action}") + return 1 + except Exception as e: + self._print_error(f"Security operation failed: {e}") + if self.verbose: + import traceback + + traceback.print_exc() + return 1 + + def _security_scan(self, args: argparse.Namespace) -> int: + """Handle vulnerability scanning.""" + from cortex.vulnerability_scanner import Severity, VulnerabilityScanner + + # Suppress verbose logging for cleaner output + logging.getLogger("cortex.vulnerability_scanner").setLevel(logging.WARNING) + + scanner = VulnerabilityScanner() + + if getattr(args, "critical", False): + critical = scanner.get_critical_vulnerabilities() + if critical: + console.print("\nšŸ”“ Critical Vulnerabilities:") + console.print("=" * 80) + for vuln in critical[:20]: # Limit to 20 + console.print(f"\n[red]CVE: {vuln.cve_id}[/red]") + console.print( + f"Package: [yellow]{vuln.package_name}[/yellow] {vuln.installed_version}" + ) + console.print(f"Description: {vuln.description[:100]}...") + if vuln.fixed_version: + console.print(f"Fixed in: [green]{vuln.fixed_version}[/green]") + else: + cx_print("āœ… No critical vulnerabilities found", "success") + return 0 + + package = getattr(args, "package", None) + scan_all = getattr(args, "all", False) + + if package: + version = scanner.get_package_version(package) + if version is None: + self._print_error(f"Package {package} not found") + return 1 + vulns = scanner.scan_package(package, version) + + console.print(f"\nšŸ” Vulnerabilities for {package} {version}:") + console.print("=" * 80) + if vulns: + for vuln in vulns: + severity_color = { + Severity.CRITICAL: "red", + Severity.HIGH: "yellow", + Severity.MEDIUM: "blue", + Severity.LOW: "green", + }.get(vuln.severity, "white") + + console.print( + f"\n[{severity_color}]CVE: {vuln.cve_id} [{vuln.severity.value.upper()}][/{severity_color}]" + ) + console.print(f"Description: {vuln.description}") + if vuln.fixed_version: + console.print(f"Fixed in: {vuln.fixed_version}") + else: + cx_print("āœ… No vulnerabilities found", 
"success") + elif scan_all: + result = scanner.scan_all_packages() + + console.print("\nšŸ“Š Scan Results:") + console.print("=" * 80) + console.print(f"Packages scanned: {result.total_packages_scanned}") + console.print(f"Vulnerabilities found: {result.vulnerabilities_found}") + console.print(f" [red]šŸ”“ Critical: {result.critical_count}[/red]") + console.print(f" [yellow]🟠 High: {result.high_count}[/yellow]") + console.print(f" [blue]🟔 Medium: {result.medium_count}[/blue]") + console.print(f" [green]🟢 Low: {result.low_count}[/green]") + console.print(f"\nScan duration: {result.scan_duration_seconds:.2f}s") + + if result.vulnerabilities: + console.print("\nšŸ“‹ Top Vulnerabilities:") + sorted_vulns = sorted( + result.vulnerabilities, + key=lambda v: ( + v.severity == Severity.CRITICAL, + v.severity == Severity.HIGH, + v.cvss_score or 0, + ), + reverse=True, + ) + + for vuln in sorted_vulns[:10]: + severity_color = { + Severity.CRITICAL: "red", + Severity.HIGH: "yellow", + Severity.MEDIUM: "blue", + Severity.LOW: "green", + }.get(vuln.severity, "white") + + console.print( + f"\n [{severity_color}]{vuln.cve_id} - {vuln.package_name} [{vuln.severity.value.upper()}][/{severity_color}]" + ) + console.print(f" {vuln.description[:80]}...") + else: + self._print_error("Please specify --package, --all, or --critical") + return 1 + + return 0 + + def _security_patch(self, args: argparse.Namespace) -> int: + """Handle autonomous patching.""" + from cortex.autonomous_patcher import AutonomousPatcher, PatchStrategy + from cortex.progress_indicators import get_progress_indicator + + # Suppress verbose logging for cleaner output + logging.getLogger("cortex.vulnerability_scanner").setLevel(logging.WARNING) + logging.getLogger("cortex.autonomous_patcher").setLevel(logging.WARNING) + + progress = get_progress_indicator() + + # Dry run is the default; only disabled when --apply is explicitly specified + dry_run = not getattr(args, "apply", False) + strategy = PatchStrategy(getattr(args, "strategy", "critical_only")) + package_filter = getattr(args, "package", None) + + patcher = AutonomousPatcher(strategy=strategy, dry_run=dry_run) + + if getattr(args, "scan_and_patch", False) or package_filter: + # Show header + console.print() + if dry_run: + console.print("[yellow]šŸ” DRY RUN MODE[/yellow] - No packages will be updated") + else: + console.print("[green]šŸ”§ APPLY MODE[/green] - Patches will be installed") + console.print(f"[dim]Strategy: {strategy.value}[/dim]") + console.print() + + vulnerabilities = None + if package_filter: + # Scan specific package first + from cortex.vulnerability_scanner import VulnerabilityScanner + + scanner = VulnerabilityScanner() + scan_result = scanner.scan_all_packages( + package_filter=[package_filter], progress=progress + ) + vulnerabilities = scan_result.vulnerabilities + + if not vulnerabilities: + console.print() + progress.print_success(f"No vulnerabilities found in {package_filter}") + return 0 + + console.print() + + result = patcher.patch_vulnerabilities(vulnerabilities, progress=progress) + + # Show summary + console.print() + if result.success: + if result.packages_updated: + console.print( + "[green]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/green]" + ) + console.print("[green]āœ… Patch complete![/green]") + console.print( + f" Packages updated: [cyan]{len(result.packages_updated)}[/cyan]" + ) + console.print( + f" Vulnerabilities patched: [cyan]{result.vulnerabilities_patched}[/cyan]" + ) + if result.duration_seconds: + console.print(f" Duration: 
[dim]{result.duration_seconds:.2f}s[/dim]") + console.print( + "[green]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/green]" + ) + else: + console.print("[dim]No updates were applied.[/dim]") + else: + console.print("[red]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/red]") + console.print("[red]āŒ Patch failed![/red]") + for error in result.errors: + console.print(f" [red]•[/red] {error}") + console.print("[red]━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/red]") + return 1 + else: + self._print_error("Use --scan-and-patch or --package to patch vulnerabilities") + return 1 + + return 0 + + def _security_schedule(self, args: argparse.Namespace) -> int: + """Handle security scheduling.""" + from cortex.security_scheduler import ScheduleFrequency, SecurityScheduler + + scheduler = SecurityScheduler() + schedule_action = getattr(args, "schedule_action", None) + + if schedule_action == "create": + from cortex.autonomous_patcher import PatchStrategy + + # Default to dry-run mode unless --apply is specified + dry_run = not getattr(args, "apply", False) + + schedule = scheduler.create_schedule( + schedule_id=args.id, + frequency=ScheduleFrequency(getattr(args, "frequency", "monthly")), + scan_enabled=True, + patch_enabled=getattr(args, "enable_patch", False), + patch_strategy=PatchStrategy(getattr(args, "patch_strategy", "critical_only")), + dry_run=dry_run, + ) + + cx_print(f"āœ… Created schedule: {args.id}", "success") + console.print(f" Frequency: {schedule.frequency.value}") + console.print(f" Scan: {'enabled' if schedule.scan_enabled else 'disabled'}") + console.print(f" Patch: {'enabled' if schedule.patch_enabled else 'disabled'}") + console.print(f" Dry-run: {'enabled' if schedule.dry_run else 'disabled'}") + + elif schedule_action == "list": + schedules = scheduler.list_schedules() + if schedules: + console.print("\nšŸ“… Security Schedules:") + console.print("=" * 80) + for s in schedules: + console.print(f"\nID: [green]{s.schedule_id}[/green]") + console.print(f" Frequency: {s.frequency.value}") + console.print(f" Scan: {'āœ…' if s.scan_enabled else 'āŒ'}") + console.print(f" Patch: {'āœ…' if s.patch_enabled else 'āŒ'}") + console.print(f" Dry-run: {'āœ…' if s.dry_run else 'āŒ'}") + if s.last_run: + console.print(f" Last run: {s.last_run}") + if s.next_run: + console.print(f" Next run: {s.next_run}") + else: + cx_print("No schedules configured", "info") + + elif schedule_action == "run": + results = scheduler.run_schedule(args.id) + if results["success"]: + cx_print("āœ… Schedule execution complete", "success") + if results["scan_result"]: + console.print( + f" Vulnerabilities found: {results['scan_result']['vulnerabilities_found']}" + ) + if results["patch_result"]: + console.print( + f" Packages updated: {results['patch_result']['packages_updated']}" + ) + else: + self._print_error("āŒ Schedule execution failed") + for error in results["errors"]: + console.print(f" - {error}") + return 1 + + elif schedule_action == "install-timer": + if scheduler.install_systemd_timer(args.id): + cx_print(f"āœ… Installed systemd timer for {args.id}", "success") + else: + self._print_error(f"Failed to install systemd timer for {args.id}") + return 1 + else: + self._print_error("Please specify a schedule action (create/list/run/install-timer)") + return 1 + + return 0 + def _env_set(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: """Set an environment variable.""" app = args.app @@ -2270,6 +2552,62 @@ def main(): sandbox_exec_parser.add_argument("cmd", 
nargs="+", help="Command to execute") # -------------------------- + # --- Security Vulnerability Management Commands --- + security_parser = subparsers.add_parser("security", help="Security vulnerability management") + security_subs = security_parser.add_subparsers(dest="security_action", help="Security actions") + + # Security scan + sec_scan_parser = security_subs.add_parser("scan", help="Scan for vulnerabilities") + sec_scan_parser.add_argument("--package", help="Scan specific package") + sec_scan_parser.add_argument("--all", action="store_true", help="Scan all packages") + sec_scan_parser.add_argument( + "--critical", action="store_true", help="Show only critical vulnerabilities" + ) + + # Security patch + sec_patch_parser = security_subs.add_parser("patch", help="Patch vulnerabilities") + sec_patch_parser.add_argument( + "--scan-and-patch", action="store_true", help="Scan and patch automatically" + ) + sec_patch_parser.add_argument("--package", help="Patch specific package only") + sec_patch_parser.add_argument( + "--dry-run", action="store_true", default=True, help="Dry run mode (default)" + ) + sec_patch_parser.add_argument("--apply", action="store_true", help="Actually apply patches") + sec_patch_parser.add_argument( + "--strategy", + choices=["automatic", "critical_only", "high_and_above"], + default="critical_only", + help="Patching strategy", + ) + + # Security schedule + sec_schedule_parser = security_subs.add_parser("schedule", help="Manage security schedules") + sec_schedule_subs = sec_schedule_parser.add_subparsers( + dest="schedule_action", help="Schedule actions" + ) + sec_schedule_create = sec_schedule_subs.add_parser("create", help="Create a schedule") + sec_schedule_create.add_argument("id", help="Schedule ID") + sec_schedule_create.add_argument( + "--frequency", + choices=["daily", "weekly", "monthly"], + default="monthly", + help="Schedule frequency", + ) + sec_schedule_create.add_argument("--enable-patch", action="store_true", help="Enable patching") + sec_schedule_create.add_argument( + "--apply", + action="store_true", + help="Enable real patching (default: dry-run mode)", + ) + sec_schedule_subs.add_parser("list", help="List schedules") + sec_schedule_run = sec_schedule_subs.add_parser("run", help="Run a schedule") + sec_schedule_run.add_argument("id", help="Schedule ID") + sec_schedule_install = sec_schedule_subs.add_parser( + "install-timer", help="Install systemd timer" + ) + sec_schedule_install.add_argument("id", help="Schedule ID") + # --- Environment Variable Management Commands --- env_parser = subparsers.add_parser("env", help="Manage environment variables") env_subs = env_parser.add_subparsers(dest="env_action", help="Environment actions") @@ -2531,6 +2869,8 @@ def main(): return 1 elif args.command == "env": return cli.env(args) + elif args.command == "security": + return cli.security(args) else: parser.print_help() return 1 diff --git a/cortex/security_scheduler.py b/cortex/security_scheduler.py new file mode 100644 index 00000000..20481db6 --- /dev/null +++ b/cortex/security_scheduler.py @@ -0,0 +1,537 @@ +#!/usr/bin/env python3 +""" +Security Scheduler for Cortex Linux + +Schedules regular vulnerability scans and autonomous patching. +Supports systemd timers, cron, and manual scheduling. 
+""" + +import calendar +import json +import logging +import subprocess +from dataclasses import dataclass +from datetime import datetime, timedelta +from enum import Enum +from pathlib import Path +from typing import Any + +from cortex.autonomous_patcher import AutonomousPatcher, PatchStrategy +from cortex.vulnerability_scanner import VulnerabilityScanner + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class ScheduleFrequency(Enum): + """Schedule frequency options""" + + DAILY = "daily" + WEEKLY = "weekly" + MONTHLY = "monthly" + CUSTOM = "custom" + + +@dataclass +class SecuritySchedule: + """Security scan/patch schedule configuration""" + + schedule_id: str + frequency: ScheduleFrequency + scan_enabled: bool = True + patch_enabled: bool = False + patch_strategy: PatchStrategy = PatchStrategy.CRITICAL_ONLY + dry_run: bool = True + last_run: str | None = None + next_run: str | None = None + custom_cron: str | None = None # For custom frequency + + +class SecurityScheduler: + """Manages scheduled security scans and patches""" + + def __init__(self, cortex_binary: str = "/usr/bin/cortex"): + """Initialize the security scheduler. + + Args: + cortex_binary: Path to the cortex binary for systemd service files + """ + self.config_path = Path.home() / ".cortex" / "security_schedule.json" + self.cortex_binary = cortex_binary + self.schedules: dict[str, SecuritySchedule] = {} + self._load_schedules() + + def _load_schedules(self): + """Load schedules from configuration file""" + if self.config_path.exists(): + try: + with open(self.config_path) as f: + data = json.load(f) + + for schedule_data in data.get("schedules", []): + schedule = SecuritySchedule( + schedule_id=schedule_data["schedule_id"], + frequency=ScheduleFrequency(schedule_data["frequency"]), + scan_enabled=schedule_data.get("scan_enabled", True), + patch_enabled=schedule_data.get("patch_enabled", False), + patch_strategy=PatchStrategy( + schedule_data.get("patch_strategy", "critical_only") + ), + dry_run=schedule_data.get("dry_run", True), + last_run=schedule_data.get("last_run"), + next_run=schedule_data.get("next_run"), + custom_cron=schedule_data.get("custom_cron"), + ) + self.schedules[schedule.schedule_id] = schedule + + logger.info(f"Loaded {len(self.schedules)} schedules") + except Exception as e: + logger.warning(f"Failed to load schedules: {e}") + + def _save_schedules(self): + """Save schedules to configuration file""" + self.config_path.parent.mkdir(parents=True, exist_ok=True) + + try: + data = { + "schedules": [ + { + "schedule_id": s.schedule_id, + "frequency": s.frequency.value, + "scan_enabled": s.scan_enabled, + "patch_enabled": s.patch_enabled, + "patch_strategy": s.patch_strategy.value, + "dry_run": s.dry_run, + "last_run": s.last_run, + "next_run": s.next_run, + "custom_cron": s.custom_cron, + } + for s in self.schedules.values() + ] + } + + with open(self.config_path, "w") as f: + json.dump(data, f, indent=2) + + except Exception as e: + logger.error(f"Failed to save schedules: {e}") + + def create_schedule( + self, + schedule_id: str, + frequency: ScheduleFrequency, + scan_enabled: bool = True, + patch_enabled: bool = False, + patch_strategy: PatchStrategy = PatchStrategy.CRITICAL_ONLY, + dry_run: bool = True, + custom_cron: str | None = None, + ) -> SecuritySchedule: + """ + Create a new security schedule. 
+ + Args: + schedule_id: Unique identifier for the schedule + frequency: How often to run + scan_enabled: Enable vulnerability scanning + patch_enabled: Enable autonomous patching + patch_strategy: Patching strategy + dry_run: Run patches in dry-run mode + custom_cron: Custom cron expression (for CUSTOM frequency) + + Returns: + Created SecuritySchedule + """ + # Calculate next run time + next_run = self._calculate_next_run(frequency, custom_cron) + + schedule = SecuritySchedule( + schedule_id=schedule_id, + frequency=frequency, + scan_enabled=scan_enabled, + patch_enabled=patch_enabled, + patch_strategy=patch_strategy, + dry_run=dry_run, + next_run=next_run.isoformat() if next_run else None, + custom_cron=custom_cron, + ) + + self.schedules[schedule_id] = schedule + self._save_schedules() + + logger.info(f"Created schedule: {schedule_id} ({frequency.value})") + return schedule + + def _calculate_next_run( + self, frequency: ScheduleFrequency, custom_cron: str | None = None + ) -> datetime | None: + """Calculate next run time based on frequency""" + now = datetime.now() + + if frequency == ScheduleFrequency.DAILY: + return now + timedelta(days=1) + elif frequency == ScheduleFrequency.WEEKLY: + return now + timedelta(weeks=1) + elif frequency == ScheduleFrequency.MONTHLY: + # Properly calculate next month, handling varying month lengths + year = now.year + month = now.month + 1 + if month > 12: + month = 1 + year += 1 + # Clamp day to max days in target month (e.g., Jan 31 -> Feb 28) + max_day = calendar.monthrange(year, month)[1] + day = min(now.day, max_day) + return now.replace(year=year, month=month, day=day) + elif frequency == ScheduleFrequency.CUSTOM: + # For custom, we'd need a cron parser, but for now just return None + # and let the user manage it manually + return None + + return None + + def run_schedule(self, schedule_id: str) -> dict[str, Any]: + """ + Execute a scheduled scan/patch. 
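+
+        The returned dictionary mirrors what the CLI prints (shape only):
+
+            {"schedule_id": ..., "timestamp": ..., "scan_result": {...},
+             "patch_result": {...}, "success": True, "errors": []}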
+ + Args: + schedule_id: Schedule to run + + Returns: + Dictionary with execution results + """ + if schedule_id not in self.schedules: + raise ValueError(f"Schedule {schedule_id} not found") + + schedule = self.schedules[schedule_id] + results = { + "schedule_id": schedule_id, + "timestamp": datetime.now().isoformat(), + "scan_result": None, + "patch_result": None, + "success": True, + "errors": [], + } + + try: + # Run scan if enabled + if schedule.scan_enabled: + logger.info(f"Running vulnerability scan for schedule {schedule_id}...") + scanner = VulnerabilityScanner() + scan_result = scanner.scan_all_packages() + + results["scan_result"] = { + "vulnerabilities_found": scan_result.vulnerabilities_found, + "critical_count": scan_result.critical_count, + "high_count": scan_result.high_count, + "medium_count": scan_result.medium_count, + "low_count": scan_result.low_count, + } + + logger.info( + f"Scan complete: {scan_result.vulnerabilities_found} vulnerabilities found" + ) + + # Run patch if enabled and vulnerabilities found + if schedule.patch_enabled and scan_result.vulnerabilities_found > 0: + logger.info(f"Running autonomous patch for schedule {schedule_id}...") + patcher = AutonomousPatcher( + strategy=schedule.patch_strategy, dry_run=schedule.dry_run + ) + + # Let AutonomousPatcher apply its own strategy/severity filters + patch_result = patcher.patch_vulnerabilities(scan_result.vulnerabilities) + + results["patch_result"] = { + "packages_updated": len(patch_result.packages_updated), + "vulnerabilities_patched": patch_result.vulnerabilities_patched, + "success": patch_result.success, + "errors": patch_result.errors, + } + + if not patch_result.success: + results["success"] = False + results["errors"].extend(patch_result.errors) + + # Update schedule + schedule.last_run = datetime.now().isoformat() + next_run_time = self._calculate_next_run(schedule.frequency, schedule.custom_cron) + schedule.next_run = next_run_time.isoformat() if next_run_time else None + self._save_schedules() + + except Exception as e: + error_msg = f"Schedule execution failed: {e}" + logger.error(error_msg) + results["success"] = False + results["errors"].append(error_msg) + + return results + + def install_systemd_timer(self, schedule_id: str) -> bool: + """ + Install a systemd timer for the schedule. + + Args: + schedule_id: Schedule to install + + Returns: + True if successful + """ + if schedule_id not in self.schedules: + logger.error(f"Schedule {schedule_id} not found") + return False + + schedule = self.schedules[schedule_id] + + # Refuse to install systemd timer for CUSTOM frequency schedules + if schedule.frequency == ScheduleFrequency.CUSTOM: + logger.error( + f"Cannot install systemd timer for schedule '{schedule_id}': " + f"CUSTOM frequency requires manual configuration. " + f"Use the custom_cron field ('{schedule.custom_cron}') with cron or " + f"create a manual systemd timer with the appropriate OnCalendar value." 
+ ) + return False + + # Generate systemd service file + service_content = f"""[Unit] +Description=Cortex Security Scan/Patch - {schedule_id} +After=network.target + +[Service] +Type=oneshot +ExecStart={self.cortex_binary} security schedule run {schedule_id} +User=root +""" + + # Generate systemd timer file + timer_content = f"""[Unit] +Description=Cortex Security Timer - {schedule_id} +Requires=cortex-security-{schedule_id}.service + +[Timer] +OnCalendar={self._frequency_to_systemd(schedule.frequency)} +Persistent=true + +[Install] +WantedBy=timers.target +""" + + try: + # Check for root privileges first (required to write to /etc/systemd/system) + if not self._has_root_privileges(): + logger.warning("Cannot install systemd timer: root privileges required") + logger.info( + "Try running with sudo: sudo cortex security schedule install-timer " + + schedule_id + ) + return False + + # Write service file + service_path = Path(f"/etc/systemd/system/cortex-security-{schedule_id}.service") + with open(service_path, "w") as f: + f.write(service_content) + + # Write timer file + timer_path = Path(f"/etc/systemd/system/cortex-security-{schedule_id}.timer") + with open(timer_path, "w") as f: + f.write(timer_content) + + # Reload systemd and enable timer + subprocess.run(["systemctl", "daemon-reload"], check=True) + subprocess.run( + ["systemctl", "enable", f"cortex-security-{schedule_id}.timer"], check=True + ) + subprocess.run( + ["systemctl", "start", f"cortex-security-{schedule_id}.timer"], check=True + ) + + logger.info(f"āœ… Installed systemd timer for {schedule_id}") + return True + + except PermissionError as e: + logger.error(f"Permission denied: {e}") + logger.info( + "Try running with sudo: sudo cortex security schedule install-timer " + schedule_id + ) + return False + except Exception as e: + logger.error(f"Failed to install systemd timer: {e}") + return False + + def _frequency_to_systemd(self, frequency: ScheduleFrequency) -> str: + """ + Convert frequency to systemd OnCalendar format. + + Args: + frequency: The schedule frequency + + Returns: + Systemd OnCalendar string + + Raises: + ValueError: If frequency is CUSTOM (requires manual cron->systemd conversion) + """ + if frequency == ScheduleFrequency.DAILY: + return "daily" + elif frequency == ScheduleFrequency.WEEKLY: + return "weekly" + elif frequency == ScheduleFrequency.MONTHLY: + return "monthly" + elif frequency == ScheduleFrequency.CUSTOM: + raise ValueError( + "CUSTOM frequency cannot be automatically converted to systemd format. " + "Use the custom_cron field with a manual systemd timer or cron job instead." 
+ ) + else: + raise ValueError(f"Unknown frequency: {frequency}") + + def _has_root_privileges(self) -> bool: + """Check if we have root privileges (running as root or have passwordless sudo)""" + import os + + # Check if running as root + if os.geteuid() == 0: + return True + + # Check if we have passwordless sudo access + try: + result = subprocess.run(["sudo", "-n", "true"], capture_output=True, timeout=2) + return result.returncode == 0 + except Exception: + return False + + def list_schedules(self) -> list[SecuritySchedule]: + """List all schedules""" + return list(self.schedules.values()) + + def get_schedule(self, schedule_id: str) -> SecuritySchedule | None: + """Get a specific schedule""" + return self.schedules.get(schedule_id) + + def delete_schedule(self, schedule_id: str) -> bool: + """Delete a schedule""" + if schedule_id in self.schedules: + del self.schedules[schedule_id] + self._save_schedules() + logger.info(f"Deleted schedule: {schedule_id}") + return True + return False + + +# CLI Interface +if __name__ == "__main__": + import argparse + import sys + + parser = argparse.ArgumentParser(description="Security scheduler for Cortex Linux") + subparsers = parser.add_subparsers(dest="command", help="Commands") + + # Create schedule + create_parser = subparsers.add_parser("create", help="Create a new schedule") + create_parser.add_argument("id", help="Schedule ID") + create_parser.add_argument( + "--frequency", + choices=["daily", "weekly", "monthly"], + default="monthly", + help="Schedule frequency", + ) + create_parser.add_argument("--no-scan", action="store_true", help="Disable scanning") + create_parser.add_argument("--enable-patch", action="store_true", help="Enable patching") + create_parser.add_argument( + "--patch-strategy", + choices=["automatic", "critical_only", "high_and_above"], + default="critical_only", + help="Patching strategy", + ) + create_parser.add_argument("--no-dry-run", action="store_true", help="Disable dry-run") + + # List schedules + subparsers.add_parser("list", help="List all schedules") + + # Run schedule + run_parser = subparsers.add_parser("run", help="Run a schedule") + run_parser.add_argument("id", help="Schedule ID") + + # Install systemd timer + install_parser = subparsers.add_parser("install-timer", help="Install systemd timer") + install_parser.add_argument("id", help="Schedule ID") + + # Delete schedule + delete_parser = subparsers.add_parser("delete", help="Delete a schedule") + delete_parser.add_argument("id", help="Schedule ID") + + args = parser.parse_args() + + if not args.command: + parser.print_help() + sys.exit(1) + + scheduler = SecurityScheduler() + + try: + if args.command == "create": + schedule = scheduler.create_schedule( + schedule_id=args.id, + frequency=ScheduleFrequency(args.frequency), + scan_enabled=not args.no_scan, + patch_enabled=args.enable_patch, + patch_strategy=PatchStrategy(args.patch_strategy), + dry_run=not args.no_dry_run, + ) + print(f"āœ… Created schedule: {args.id}") + print(f" Frequency: {schedule.frequency.value}") + print(f" Scan: {'enabled' if schedule.scan_enabled else 'disabled'}") + print(f" Patch: {'enabled' if schedule.patch_enabled else 'disabled'}") + + elif args.command == "list": + schedules = scheduler.list_schedules() + if schedules: + print("\nšŸ“… Security Schedules:") + print("=" * 80) + for s in schedules: + print(f"\nID: {s.schedule_id}") + print(f" Frequency: {s.frequency.value}") + print(f" Scan: {'āœ…' if s.scan_enabled else 'āŒ'}") + print(f" Patch: {'āœ…' if s.patch_enabled else 
'āŒ'}") + print(f" Dry-run: {'āœ…' if s.dry_run else 'āŒ'}") + if s.last_run: + print(f" Last run: {s.last_run}") + if s.next_run: + print(f" Next run: {s.next_run}") + else: + print("No schedules configured") + + elif args.command == "run": + results = scheduler.run_schedule(args.id) + if results["success"]: + print("āœ… Schedule execution complete") + if results["scan_result"]: + print( + f" Vulnerabilities found: {results['scan_result']['vulnerabilities_found']}" + ) + if results["patch_result"]: + print(f" Packages updated: {results['patch_result']['packages_updated']}") + else: + print("āŒ Schedule execution failed") + for error in results["errors"]: + print(f" - {error}") + sys.exit(1) + + elif args.command == "install-timer": + if scheduler.install_systemd_timer(args.id): + print(f"āœ… Installed systemd timer for {args.id}") + else: + print("āŒ Failed to install systemd timer") + sys.exit(1) + + elif args.command == "delete": + if scheduler.delete_schedule(args.id): + print(f"āœ… Deleted schedule: {args.id}") + else: + print(f"āŒ Schedule {args.id} not found") + sys.exit(1) + + except Exception as e: + print(f"āŒ Error: {e}", file=sys.stderr) + logger.exception("CLI error") + sys.exit(1) diff --git a/cortex/vulnerability_scanner.py b/cortex/vulnerability_scanner.py new file mode 100644 index 00000000..f72a6fca --- /dev/null +++ b/cortex/vulnerability_scanner.py @@ -0,0 +1,885 @@ +#!/usr/bin/env python3 +""" +Vulnerability Scanner for Cortex Linux + +Continuously monitors installed packages for security vulnerabilities by checking +against the OSV (Open Source Vulnerability) database and provides detailed +vulnerability reports. + +Note: NVD and Safety DB support are planned for future releases. +""" + +import json +import logging +import re +import subprocess +import sys +import time +from dataclasses import dataclass +from datetime import datetime, timedelta +from enum import Enum +from pathlib import Path + +import requests + +# Regex pattern for valid Debian package names +# Per Debian Policy: lowercase alphanumeric, plus, minus, period +# Must start with alphanumeric, minimum 2 characters +_VALID_PACKAGE_NAME_PATTERN = re.compile(r"^[a-z0-9][a-z0-9+.\-]+$") + +from cortex.progress_indicators import ProgressIndicator, get_progress_indicator +from cortex.utils.db_pool import SQLiteConnectionPool, get_connection_pool + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class Severity(Enum): + """CVE severity levels""" + + CRITICAL = "critical" + HIGH = "high" + MEDIUM = "medium" + LOW = "low" + UNKNOWN = "unknown" + + +@dataclass +class Vulnerability: + """Represents a security vulnerability""" + + cve_id: str + package_name: str + installed_version: str + affected_versions: str + severity: Severity + description: str + published_date: str | None = None + fixed_version: str | None = None + cvss_score: float | None = None + source: str = "unknown" + references: list[str] = None + + def __post_init__(self): + if self.references is None: + self.references = [] + + +@dataclass +class ScanResult: + """Result of a vulnerability scan""" + + scan_id: str + timestamp: str + total_packages_scanned: int + vulnerabilities_found: int + critical_count: int + high_count: int + medium_count: int + low_count: int + vulnerabilities: list[Vulnerability] + scan_duration_seconds: float + errors: list[str] = None + + def __post_init__(self): + if self.errors is None: + self.errors = [] + + +class VulnerabilityScanner: + """Scans installed packages for security 
vulnerabilities""" + + def __init__( + self, + db_path: str | None = None, + cache_hours: float = 24.0, + dpkg_timeout: int = 60, + ): + """ + Initialize the vulnerability scanner. + + Args: + db_path: Path to the SQLite cache database. Defaults to ~/.cortex/vulnerability_cache.db + cache_hours: How long to cache vulnerability results in hours. Defaults to 24. + For security-critical systems, consider reducing this (e.g., 1-6 hours) + to catch newly disclosed vulnerabilities faster. + dpkg_timeout: Timeout in seconds for dpkg-query command. Defaults to 60. + Increase for systems with many packages (5000+) or slow storage. + """ + if db_path is None: + db_path = str(Path.home() / ".cortex" / "vulnerability_cache.db") + + self.db_path = db_path + self.cache_hours = cache_hours + self.dpkg_timeout = dpkg_timeout + self._ensure_db_directory() + self._pool: SQLiteConnectionPool | None = None + self._init_database() + + # API endpoints + self.osv_api = "https://api.osv.dev/v1/query" + # NVD API endpoint reserved for future fallback support + # self.nvd_api = "https://services.nvd.nist.gov/rest/json/cves/2.0" + + # Rate limiting + self.last_api_call = 0.0 + self.min_api_interval = 0.5 + + def _ensure_db_directory(self): + db_dir = Path(self.db_path).parent + db_dir.mkdir(parents=True, exist_ok=True) + + def _init_database(self): + try: + self._pool = get_connection_pool(self.db_path, pool_size=3) + + with self._pool.get_connection() as conn: + cursor = conn.cursor() + + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS vulnerability_cache ( + package_name TEXT, + version TEXT, + cve_id TEXT, + severity TEXT, + cached_at TEXT, + expires_at TEXT, + data TEXT, + PRIMARY KEY (package_name, version, cve_id) + ) + """ + ) + + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS scan_history ( + scan_id TEXT PRIMARY KEY, + timestamp TEXT NOT NULL, + total_packages INTEGER, + vulnerabilities_found INTEGER, + scan_duration REAL, + result_json TEXT + ) + """ + ) + + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS idx_cache_expires + ON vulnerability_cache(expires_at) + """ + ) + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS idx_scan_timestamp + ON scan_history(timestamp) + """ + ) + + conn.commit() + + logger.info(f"Vulnerability database initialized at {self.db_path}") + except Exception as e: + logger.error(f"Failed to initialize database: {e}") + raise + + def _rate_limit(self): + elapsed = time.time() - self.last_api_call + if elapsed < self.min_api_interval: + time.sleep(self.min_api_interval - elapsed) + self.last_api_call = time.time() + + def _parse_cvss_vector(self, vector_string: str) -> float | None: + """Parse a CVSS vector string and estimate a severity score.""" + if not vector_string or not vector_string.startswith("CVSS:"): + return None + + try: + parts = vector_string.split("/") + metrics = {} + for part in parts[1:]: + if ":" in part: + key, value = part.split(":", 1) + metrics[key] = value + + score = 0.0 + av_scores = {"N": 2.5, "A": 2.0, "L": 1.5, "P": 1.0} + score += av_scores.get(metrics.get("AV", "L"), 1.5) + + ac_scores = {"L": 1.5, "H": 0.5} + score += ac_scores.get(metrics.get("AC", "L"), 1.0) + + pr_scores = {"N": 1.5, "L": 1.0, "H": 0.5} + score += pr_scores.get(metrics.get("PR", "L"), 1.0) + + impact_scores = {"H": 1.5, "L": 0.75, "N": 0.0} + score += impact_scores.get(metrics.get("C", "N"), 0.5) + score += impact_scores.get(metrics.get("I", "N"), 0.5) + score += impact_scores.get(metrics.get("A", "N"), 0.5) + + return min(10.0, max(0.0, score)) + + except Exception as 
e: + logger.debug(f"Failed to parse CVSS vector '{vector_string}': {e}") + return None + + def _is_valid_package_name(self, name: str) -> bool: + """ + Validate package name against Debian naming conventions. + + Args: + name: Package name to validate + + Returns: + True if the package name is valid and safe to use + """ + if not name or len(name) < 2 or len(name) > 128: + return False + return bool(_VALID_PACKAGE_NAME_PATTERN.match(name)) + + def _is_valid_version_string(self, version: str) -> bool: + """ + Validate version string for safe characters. + + Args: + version: Version string to validate + + Returns: + True if the version string is safe to use + """ + if not version or len(version) > 256: + return False + # Version strings: alphanumeric, plus, minus, period, colon, tilde + # No shell metacharacters or control characters + return bool(re.match(r"^[a-zA-Z0-9+.\-:~]+$", version)) + + def _get_installed_packages(self) -> dict[str, str]: + packages = {} + skipped_count = 0 + + try: + result = subprocess.run( + ["dpkg-query", "-W", "-f=${Package}|${Version}\n"], + capture_output=True, + text=True, + timeout=self.dpkg_timeout, + ) + + if result.returncode == 0: + for line in result.stdout.strip().split("\n"): + if "|" in line: + parts = line.split("|", 1) + if len(parts) == 2: + pkg_name = parts[0].strip() + pkg_version = parts[1].strip() + + # Validate package name and version before use + if not self._is_valid_package_name(pkg_name): + logger.warning(f"Skipping package with invalid name: {pkg_name!r}") + skipped_count += 1 + continue + + if not self._is_valid_version_string(pkg_version): + logger.warning( + f"Skipping {pkg_name} with invalid version: {pkg_version!r}" + ) + skipped_count += 1 + continue + + packages[pkg_name] = pkg_version + + if skipped_count > 0: + logger.warning(f"Skipped {skipped_count} packages with invalid names/versions") + logger.info(f"Found {len(packages)} installed packages") + except Exception as e: + logger.error(f"Failed to get installed packages: {e}") + + return packages + + def get_installed_packages(self) -> dict[str, str]: + """ + Get all installed packages and their versions. + + Returns: + Dictionary mapping package names to their installed versions. + """ + return self._get_installed_packages() + + def get_package_version(self, package_name: str) -> str | None: + """ + Get the installed version of a specific package. + + Args: + package_name: Name of the package to look up. + + Returns: + The installed version string if the package is installed, None otherwise. 
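+ + Example (illustrative only; the version shown depends on the system): + >>> scanner = VulnerabilityScanner() + >>> scanner.get_package_version("openssl") # doctest: +SKIP + '3.0.2-0ubuntu1'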
+ """ + packages = self._get_installed_packages() + return packages.get(package_name) + + def _vulnerability_to_dict(self, vuln: Vulnerability) -> dict: + return { + "cve_id": vuln.cve_id, + "package_name": vuln.package_name, + "installed_version": vuln.installed_version, + "affected_versions": vuln.affected_versions, + "severity": vuln.severity.value, + "description": vuln.description, + "published_date": vuln.published_date, + "fixed_version": vuln.fixed_version, + "cvss_score": vuln.cvss_score, + "source": vuln.source, + "references": vuln.references or [], + } + + def _dict_to_vulnerability(self, data: dict) -> Vulnerability: + severity = data.get("severity", "unknown") + if isinstance(severity, str): + try: + severity = Severity(severity) + except ValueError: + severity = Severity.UNKNOWN + + return Vulnerability( + cve_id=data.get("cve_id", ""), + package_name=data.get("package_name", ""), + installed_version=data.get("installed_version", ""), + affected_versions=data.get("affected_versions", ""), + severity=severity, + description=data.get("description", ""), + published_date=data.get("published_date"), + fixed_version=data.get("fixed_version"), + cvss_score=data.get("cvss_score"), + source=data.get("source", "unknown"), + references=data.get("references", []), + ) + + # Sentinel CVE ID used to cache "no vulnerabilities found" results + _NO_VULNS_SENTINEL = "__NO_VULNERABILITIES__" + + def _check_cache(self, package_name: str, version: str) -> list[Vulnerability] | None: + try: + with self._pool.get_connection() as conn: + cursor = conn.cursor() + + cursor.execute( + "SELECT data, expires_at, cve_id FROM vulnerability_cache WHERE package_name = ? AND version = ?", + (package_name, version), + ) + + rows = cursor.fetchall() + if not rows: + return None + + vulnerabilities = [] + expired_found = False + has_valid_entry = False + now = datetime.now() + + for row in rows: + expires_at = datetime.fromisoformat(row[1]) + cve_id = row[2] + if now < expires_at: + has_valid_entry = True + # Check for sentinel entry (no vulnerabilities) + if cve_id == self._NO_VULNS_SENTINEL: + # Valid cache entry indicating no vulnerabilities + return [] + # Cache entry is valid + data = json.loads(row[0]) + if isinstance(data, dict): + vulnerabilities.append(self._dict_to_vulnerability(data)) + elif isinstance(data, list): + for v in data: + if isinstance(v, dict): + vulnerabilities.append(self._dict_to_vulnerability(v)) + else: + # Mark that we found expired entries + expired_found = True + + # Delete all expired entries for this package/version + if expired_found: + cursor.execute( + "DELETE FROM vulnerability_cache WHERE package_name = ? AND version = ? 
AND expires_at < ?", + (package_name, version, now.isoformat()), + ) + conn.commit() + + # Return vulnerabilities if we have valid entries, otherwise None + if has_valid_entry: + return vulnerabilities + return None + + except Exception as e: + logger.warning(f"Cache check failed: {e}") + + return None + + def _save_cache(self, package_name: str, version: str, vulnerabilities: list[Vulnerability]): + try: + with self._pool.get_connection() as conn: + cursor = conn.cursor() + + cached_at = datetime.now() + expires_at = cached_at + timedelta(hours=self.cache_hours) + + if vulnerabilities: + # Store actual vulnerabilities + for vuln in vulnerabilities: + vuln_dict = self._vulnerability_to_dict(vuln) + cursor.execute( + "INSERT OR REPLACE INTO vulnerability_cache VALUES (?, ?, ?, ?, ?, ?, ?)", + ( + package_name, + version, + vuln.cve_id, + vuln.severity.value, + cached_at.isoformat(), + expires_at.isoformat(), + json.dumps(vuln_dict), + ), + ) + else: + # Store sentinel entry for "no vulnerabilities found" + # This prevents re-querying OSV for clean packages + cursor.execute( + "INSERT OR REPLACE INTO vulnerability_cache VALUES (?, ?, ?, ?, ?, ?, ?)", + ( + package_name, + version, + self._NO_VULNS_SENTINEL, + "none", + cached_at.isoformat(), + expires_at.isoformat(), + json.dumps({"sentinel": True}), + ), + ) + + conn.commit() + except Exception as e: + logger.warning(f"Failed to save cache: {e}") + + def _query_osv(self, package_name: str, version: str) -> list[Vulnerability]: + vulnerabilities = [] + + try: + self._rate_limit() + + query = { + "package": {"name": package_name, "ecosystem": "Debian"}, + "version": version, + } + + response = requests.post( + self.osv_api, json=query, timeout=10, headers={"Content-Type": "application/json"} + ) + + if response.status_code == 200: + data = response.json() + if "vulns" in data: + for vuln in data["vulns"]: + severity = Severity.UNKNOWN + cvss_score = None + + if "database_specific" in vuln: + db_spec = vuln["database_specific"] + if "severity" in db_spec: + sev_str = db_spec["severity"].upper() + if sev_str in ["CRITICAL", "CRIT"]: + severity = Severity.CRITICAL + elif sev_str == "HIGH": + severity = Severity.HIGH + elif sev_str == "MEDIUM": + severity = Severity.MEDIUM + elif sev_str == "LOW": + severity = Severity.LOW + + if "severity" in vuln: + for sev in vuln["severity"]: + if sev["type"] == "CVSS_V3": + score_value = sev.get("score", "") + if isinstance(score_value, (int, float)): + cvss_score = float(score_value) + elif isinstance(score_value, str): + try: + cvss_score = float(score_value) + except ValueError: + cvss_score = self._parse_cvss_vector(score_value) + + if cvss_score is not None: + if cvss_score >= 9.0: + severity = Severity.CRITICAL + elif cvss_score >= 7.0: + severity = Severity.HIGH + elif cvss_score >= 4.0: + severity = Severity.MEDIUM + else: + severity = Severity.LOW + + affected = "unknown" + fixed_version = None + if "affected" in vuln: + for affected_range in vuln["affected"]: + if "ranges" in affected_range: + for range_item in affected_range["ranges"]: + if "events" in range_item: + affected = str(range_item["events"]) + for event in range_item["events"]: + if "fixed" in event: + fixed_version = event["fixed"] + + references = [] + if "references" in vuln: + for ref in vuln["references"]: + if "url" in ref: + references.append(ref["url"]) + + vuln_obj = Vulnerability( + cve_id=vuln.get("id", "UNKNOWN"), + package_name=package_name, + installed_version=version, + affected_versions=affected, + severity=severity, + 
description=vuln.get("summary", "No description available"), + published_date=vuln.get("published", ""), + fixed_version=fixed_version, + cvss_score=cvss_score, + source="osv", + references=references, + ) + + vulnerabilities.append(vuln_obj) + + except requests.RequestException as e: + logger.warning(f"OSV query failed for {package_name}: {e}") + except Exception as e: + logger.warning(f"Error processing OSV response: {e}") + + return vulnerabilities + + def scan_package( + self, package_name: str, version: str, return_cache_status: bool = False + ) -> list[Vulnerability] | tuple[list[Vulnerability], str]: + """ + Scan a package for vulnerabilities. + + Args: + package_name: Name of the package to scan + version: Version of the package + return_cache_status: If True, returns tuple of (vulnerabilities, cache_status) + where cache_status is 'cached' (served from cache) or 'stored' (freshly queried and cached) + + Returns: + List of vulnerabilities, or tuple with cache status if requested + """ + cached = self._check_cache(package_name, version) + if cached is not None: + logger.debug(f"Cache hit for {package_name}={version}") + if return_cache_status: + return cached, "cached" + return cached + + vulnerabilities = self._query_osv(package_name, version) + + # Always cache results (including empty results to avoid re-querying clean packages) + self._save_cache(package_name, version, vulnerabilities) + cache_status = "stored" + if vulnerabilities: + logger.debug(f"Cached {len(vulnerabilities)} vulns for {package_name}={version}") + else: + logger.debug(f"Cached clean status for {package_name}={version}") + + if return_cache_status: + return vulnerabilities, cache_status + return vulnerabilities + + def scan_all_packages( + self, + package_filter: list[str] | None = None, + progress: ProgressIndicator | None = None, + ) -> ScanResult: + start_time = time.time() + scan_id = f"scan_{int(start_time)}" + + logger.info("Starting vulnerability scan...") + + installed_packages = self._get_installed_packages() + + if package_filter: + installed_packages = { + k: v for k, v in installed_packages.items() if k in package_filter + } + + total_packages = len(installed_packages) + all_vulnerabilities = [] + errors = [] + + logger.info(f"Scanning {total_packages} packages...") + + # Use provided progress indicator or get global one + if progress is None: + progress = get_progress_indicator() + + # Convert packages to list for progress bar iteration + package_items = list(installed_packages.items()) + + # Track cache statistics + cache_hits = 0 + cache_stores = 0 + + for package_name, version in progress.progress_bar( + package_items, description=f"🔍 Scanning {total_packages} packages" + ): + try: + vulns, cache_status = self.scan_package( + package_name, version, return_cache_status=True + ) + if cache_status == "cached": + cache_hits += 1 + elif cache_status == "stored": + cache_stores += 1 + + if vulns: + # Show vulnerabilities with cache status + if cache_status == "cached": + progress.print_warning( + f"{package_name}: {len(vulns)} vulnerability(ies) · retrieved from cache" + ) + elif cache_status == "stored": + progress.print_warning( + f"{package_name}: {len(vulns)} vulnerability(ies) · saved to cache" + ) + else: + progress.print_warning(f"{package_name}: {len(vulns)} vulnerability(ies)") + all_vulnerabilities.extend(vulns) + except Exception as e: + error_msg = f"Failed to scan {package_name}: {e}" + logger.error(error_msg) + errors.append(error_msg) + + # Show cache summary if cache was used + api_calls = total_packages - cache_hits + 
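# Note: api_calls counts every package not served from cache, including any whose scan raised an error +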
if cache_hits > 0: + progress.print_info( + f"Cache: {cache_hits} cached · {cache_stores} new · {api_calls} API calls" + ) + logger.debug(f"Cache stats: {cache_hits} hits, {cache_stores} new entries") + + critical_count = sum(1 for v in all_vulnerabilities if v.severity == Severity.CRITICAL) + high_count = sum(1 for v in all_vulnerabilities if v.severity == Severity.HIGH) + medium_count = sum(1 for v in all_vulnerabilities if v.severity == Severity.MEDIUM) + low_count = sum(1 for v in all_vulnerabilities if v.severity == Severity.LOW) + + scan_duration = time.time() - start_time + + result = ScanResult( + scan_id=scan_id, + timestamp=datetime.now().isoformat(), + total_packages_scanned=total_packages, + vulnerabilities_found=len(all_vulnerabilities), + critical_count=critical_count, + high_count=high_count, + medium_count=medium_count, + low_count=low_count, + vulnerabilities=all_vulnerabilities, + scan_duration_seconds=scan_duration, + errors=errors, + ) + + self._save_scan_history(result) + + logger.info( + f"Scan complete: {len(all_vulnerabilities)} vulnerabilities found in {scan_duration:.2f}s" + ) + + return result + + def _save_scan_history(self, result: ScanResult): + try: + with self._pool.get_connection() as conn: + cursor = conn.cursor() + + result_dict = { + "scan_id": result.scan_id, + "timestamp": result.timestamp, + "total_packages_scanned": result.total_packages_scanned, + "vulnerabilities_found": result.vulnerabilities_found, + "critical_count": result.critical_count, + "high_count": result.high_count, + "medium_count": result.medium_count, + "low_count": result.low_count, + "vulnerabilities": [ + self._vulnerability_to_dict(v) for v in result.vulnerabilities + ], + "scan_duration_seconds": result.scan_duration_seconds, + "errors": result.errors or [], + } + + cursor.execute( + "INSERT OR REPLACE INTO scan_history VALUES (?, ?, ?, ?, ?, ?)", + ( + result.scan_id, + result.timestamp, + result.total_packages_scanned, + result.vulnerabilities_found, + result.scan_duration_seconds, + json.dumps(result_dict), + ), + ) + + conn.commit() + except Exception as e: + logger.warning(f"Failed to save scan history: {e}") + + def get_scan_history(self, limit: int = 10) -> list[ScanResult]: + results = [] + + try: + with self._pool.get_connection() as conn: + cursor = conn.cursor() + + cursor.execute( + "SELECT result_json FROM scan_history ORDER BY timestamp DESC LIMIT ?", + (limit,), + ) + + for row in cursor.fetchall(): + data = json.loads(row[0]) + vulns = [ + self._dict_to_vulnerability(v) if isinstance(v, dict) else v + for v in data.get("vulnerabilities", []) + ] + data["vulnerabilities"] = vulns + results.append(ScanResult(**data)) + + except Exception as e: + logger.error(f"Failed to get scan history: {e}") + + return results + + def get_critical_vulnerabilities(self, days: int = 30) -> list[Vulnerability]: + all_critical = [] + + try: + cutoff = datetime.now() - timedelta(days=days) + cutoff_str = cutoff.isoformat() + + with self._pool.get_connection() as conn: + cursor = conn.cursor() + + cursor.execute( + "SELECT result_json FROM scan_history WHERE timestamp >= ? 
ORDER BY timestamp DESC", + (cutoff_str,), + ) + + for row in cursor.fetchall(): + data = json.loads(row[0]) + vulns = [ + self._dict_to_vulnerability(v) if isinstance(v, dict) else v + for v in data.get("vulnerabilities", []) + ] + critical = [v for v in vulns if v.severity == Severity.CRITICAL] + all_critical.extend(critical) + + except Exception as e: + logger.error(f"Failed to get critical vulnerabilities: {e}") + + seen = set() + unique_critical = [] + for vuln in all_critical: + if vuln.cve_id not in seen: + seen.add(vuln.cve_id) + unique_critical.append(vuln) + + return unique_critical + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="Scan packages for security vulnerabilities") + parser.add_argument("--package", help="Scan specific package") + parser.add_argument("--all", action="store_true", help="Scan all installed packages") + parser.add_argument("--history", type=int, help="Show scan history (last N scans)") + parser.add_argument( + "--critical", action="store_true", help="Show only critical vulnerabilities" + ) + + args = parser.parse_args() + + scanner = VulnerabilityScanner() + + if args.history: + history = scanner.get_scan_history(args.history) + print(f"\nšŸ“Š Scan History (last {args.history} scans):") + print("=" * 80) + for scan in history: + print(f"\nScan ID: {scan.scan_id}") + print(f"Date: {scan.timestamp}") + print(f"Packages scanned: {scan.total_packages_scanned}") + print(f"Vulnerabilities: {scan.vulnerabilities_found}") + print(f" Critical: {scan.critical_count}, High: {scan.high_count}") + print(f" Medium: {scan.medium_count}, Low: {scan.low_count}") + + elif args.critical: + critical = scanner.get_critical_vulnerabilities() + print("\nšŸ”“ Critical Vulnerabilities:") + print("=" * 80) + for vuln in critical: + print(f"\nCVE: {vuln.cve_id}") + print(f"Package: {vuln.package_name} {vuln.installed_version}") + print(f"Description: {vuln.description[:100]}...") + if vuln.fixed_version: + print(f"Fixed in: {vuln.fixed_version}") + + elif args.package: + version = scanner.get_package_version(args.package) + if version is None: + print(f"āŒ Package {args.package} not found") + sys.exit(1) + vulns = scanner.scan_package(args.package, version) + + print(f"\nšŸ” Vulnerabilities for {args.package} {version}:") + print("=" * 80) + if vulns: + for vuln in vulns: + print(f"\nCVE: {vuln.cve_id} [{vuln.severity.value.upper()}]") + print(f"Description: {vuln.description}") + if vuln.fixed_version: + print(f"Fixed in: {vuln.fixed_version}") + else: + print("āœ… No vulnerabilities found") + + elif args.all: + result = scanner.scan_all_packages() + + print("\nšŸ“Š Scan Results:") + print("=" * 80) + print(f"Packages scanned: {result.total_packages_scanned}") + print(f"Vulnerabilities found: {result.vulnerabilities_found}") + print(f" šŸ”“ Critical: {result.critical_count}") + print(f" 🟠 High: {result.high_count}") + print(f" 🟔 Medium: {result.medium_count}") + print(f" 🟢 Low: {result.low_count}") + print(f"\nScan duration: {result.scan_duration_seconds:.2f}s") + + if result.vulnerabilities: + print("\nšŸ“‹ Top Vulnerabilities:") + sorted_vulns = sorted( + result.vulnerabilities, + key=lambda v: ( + v.severity == Severity.CRITICAL, + v.severity == Severity.HIGH, + v.cvss_score or 0, + ), + reverse=True, + ) + + for vuln in sorted_vulns[:10]: + print(f"\n {vuln.cve_id} - {vuln.package_name} [{vuln.severity.value.upper()}]") + print(f" {vuln.description[:80]}...") + + else: + parser.print_help() diff --git 
a/docs/SECURITY_MANAGEMENT.md b/docs/SECURITY_MANAGEMENT.md new file mode 100644 index 00000000..61f9823a --- /dev/null +++ b/docs/SECURITY_MANAGEMENT.md @@ -0,0 +1,261 @@ +# Security Vulnerability Management & Autonomous Patching + +## Problem + +**Security vulnerabilities in dependencies are the #1 attack vector for Linux systems.** According to recent CVE data: + +- **25,000+ new CVEs** are published annually +- **60% of breaches** exploit known, unpatched vulnerabilities +- Average time from CVE publication to exploit: **15 days** +- Average enterprise patching cycle: **102 days** ❌ + +Cortex Linux currently has **zero automated security monitoring**. Users must: + +1. Manually check each of their 2,000+ installed packages +2. Cross-reference against CVE databases (NVD, OSV, etc.) +3. Determine which updates fix which vulnerabilities +4. Hope they don't miss a critical exploit + +**This is unacceptable for an AI-native package manager.** + +### Real-World Impact + +| Vulnerability | Impact | +|---------------|--------| +| **Log4Shell (CVE-2021-44228)** | Organizations without automated scanning took weeks to identify affected systems | +| **Heartbleed (CVE-2014-0160)** | OpenSSL vulnerability affected 17% of "secure" web servers | +| **Monthly kernel patches** | Linux releases security updates monthly — missing one can expose the entire system | + +### Current State + +```bash +# Today: Manual, error-prone, incomplete +$ apt list --upgradable | grep security # Doesn't show CVE severity +$ apt-cache policy openssl # No vulnerability context +``` + +Users are flying blind. + +--- + +## Proposed Solution + +Implement **continuous vulnerability scanning** with **autonomous patching** capabilities. + +### Core Features + +| Feature | Description | +|---------|-------------| +| **Vulnerability Scanner** | Continuously monitor installed packages against CVE databases | +| **Autonomous Patcher** | Automatically patch vulnerabilities with safety controls | +| **Security Scheduler** | Monthly/weekly/daily automated security maintenance | +| **Rollback Support** | All patches tracked in history, fully reversible | + +### Example Commands + +```bash +# Scan all installed packages for vulnerabilities +cortex security scan --all + +# Output: +# 🔍 Scanning: 2636/2636 (100%) | Vulnerabilities found: 47 +# +# 📊 Scan Results: +# 🔴 Critical: 3 +# 🟠 High: 12 +# 🟡 Medium: 24 +# 🟢 Low: 8 + +# Scan specific package +cortex security scan --package openssl + +# Show only critical vulnerabilities +cortex security scan --critical + +# Autonomous patching (dry-run by default for safety) +cortex security patch --scan-and-patch --strategy critical_only + +# Actually apply patches +cortex security patch --scan-and-patch --strategy critical_only --apply + +# Set up monthly automated patching (suitable for desktops/low-risk systems) +cortex security schedule create monthly-patch --frequency monthly --enable-patch + +# For servers/critical systems, use weekly with critical-only strategy +cortex security schedule create weekly-critical --frequency weekly --enable-patch +cortex security schedule install-timer monthly-patch +``` + +### Patching Frequency Guidelines + +Different systems have different security requirements. 
Choose the appropriate patching frequency based on your use case: + +| System Type | Recommended Frequency | Rationale | +|-------------|----------------------|-----------| +| **Production servers** | Weekly or daily (critical only) | Minimize exposure window for exploitable vulnerabilities | +| **Internet-facing services** | Daily (critical/high) | High risk of exploitation; CVEs are weaponized within ~15 days | +| **Development workstations** | Weekly | Balance productivity with security; less exposure than servers | +| **Desktop/personal use** | Monthly | Standard Linux practice; lower risk profile | +| **Air-gapped/isolated systems** | Monthly | Limited attack surface; coordinate with maintenance windows | +| **Compliance-regulated (SOC2, HIPAA)** | Per policy, typically weekly | Meet audit requirements; document all patching activity | + +**When to patch more frequently:** +- After major CVE disclosures (e.g., Log4Shell, Heartbleed-class vulnerabilities) +- Systems handling sensitive data (PII, financial, healthcare) +- Publicly accessible services (web servers, APIs, databases) + +**When monthly is appropriate:** +- Internal-only systems with limited network exposure +- Systems where stability is prioritized over immediate patching +- Environments with change control processes requiring scheduled maintenance windows + +### Safety Controls + +| Control | Description | +|---------|-------------| +| **Dry-run default** | Shows what would be patched without making changes | +| **Whitelist/Blacklist** | Control which packages can be auto-patched | +| **Severity filtering** | Only patch above threshold (e.g., critical only) | +| **Rollback support** | All patches recorded in history, reversible | +| **Systemd integration** | Native Linux scheduling via timers | + +### Data Sources + +| Source | Purpose | Speed | +|--------|---------|-------| +| **OSV (Open Source Vulnerabilities)** | Primary database, comprehensive | Fast | +| **NVD (National Vulnerability Database)** | Fallback for critical packages | Slower | +| **24-hour caching** | Reduces API load | Instant (cached) | + +--- + +## Why This Matters + +### For Cortex Linux + +1. **Differentiation**: No other package manager offers AI-assisted security scanning + natural language patching +2. **Enterprise requirement**: Automated compliance for SOC2, ISO27001, HIPAA +3. **User safety**: Protect users from the 25,000+ CVEs published each year +4. **Flexible patching schedules**: From daily (critical systems) to monthly (desktops) — we make it effortless + +### Industry Statistics + +```text +┌────────────────────────────────────────────────────────────┐ +│ THE PATCHING GAP │ +├────────────────────────────────────────────────────────────┤ +│ │ +│ CVE Published ──────────────────────────────────────────▶ │ +│ │ │ +│ │ 15 days ┌─────────────────┐ │ +│ ├───────────▶│ Exploit Created │ │ +│ │ └─────────────────┘ │ +│ │ │ +│ │ 102 days ┌─────────────────┐ │ +│ └───────────▶│ Enterprise Patch│ ← TOO SLOW! 
│ +│ └─────────────────┘ │ +│ │ +│ WITH CORTEX: │ +│ │ < 24 hrs ┌─────────────────┐ │ +│ └───────────▶│ Auto-Detected │ ← FIXED │ +│ └─────────────────┘ │ +│ │ +└────────────────────────────────────────────────────────────┘ +``` + +--- + +## Acceptance Criteria + +- [ ] `cortex security scan --all` scans installed packages against CVE databases +- [ ] `cortex security scan --package <name>` scans specific package +- [ ] `cortex security scan --critical` shows only critical vulnerabilities +- [ ] `cortex security patch --scan-and-patch` creates patch plan (dry-run) +- [ ] `cortex security patch --scan-and-patch --apply` applies patches +- [ ] `cortex security schedule create` creates automated schedules +- [ ] `cortex security schedule list` lists all schedules +- [ ] `cortex security schedule run <schedule-id>` manually runs a schedule +- [ ] `cortex security schedule install-timer` installs systemd timer +- [ ] All patches recorded in installation history with rollback support +- [ ] Configurable whitelist/blacklist for packages +- [ ] Severity filtering (critical_only, high_and_above, automatic) +- [ ] Progress output during long scans +- [ ] Caching to avoid repeated API calls + +--- + +## Technical Implementation + +### Architecture + +``` +┌───────────────────────────────────────────────────────────────┐ +│ CORTEX SECURITY │ +├───────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────────┐ ┌──────────────────┐ ┌───────────┐ │ +│ │ Vulnerability │───▶│ Autonomous │───▶│ Security │ │ +│ │ Scanner │ │ Patcher │ │ Scheduler │ │ +│ └────────┬─────────┘ └────────┬─────────┘ └─────┬─────┘ │ +│ │ │ │ │ +│ ▼ ▼ ▼ │ +│ ┌────────────────────────────────────────────────────────┐ │ +│ │ Installation History │ │ +│ │ (Rollback Support) │ │ +│ └────────────────────────────────────────────────────────┘ │ +└───────────────────────────────────────────────────────────────┘ + │ + ▼ + ┌─────────────────────────────┐ + │ External CVE Databases │ + │ • OSV (Open Source Vulns) │ + │ • NVD (National Vuln DB) │ + └─────────────────────────────┘ +``` + +### Files + +| File | Purpose | +|------|---------| +| `cortex/vulnerability_scanner.py` | Scans packages against CVE databases | +| `cortex/autonomous_patcher.py` | Applies patches with safety controls | +| 
`cortex/security_scheduler.py` | Manages scheduled scans/patches | +| `cortex/cli.py` | CLI integration (`cortex security ...`) | + +### Configuration + +Settings stored in `~/.cortex/patcher_config.json`: + +```json +{ + "whitelist": ["nginx", "openssl"], + "blacklist": ["linux-image-generic"], + "min_severity": "medium" +} +``` + +--- + +## Priority + +**🔴 Critical** + +## Labels + +`security`, `feature`, `high-priority`, `enterprise` + +## Estimated Effort + +- Implementation: 2-3 days +- Testing: 1 day +- Documentation: 0.5 day + +--- + +## References + +- [OSV API Documentation](https://osv.dev/docs/) +- [NVD API Documentation](https://nvd.nist.gov/developers) +- [CVSS v3.1 Specification](https://www.first.org/cvss/v3.1/specification-document) +- [Linux Security Updates Best Practices](https://wiki.ubuntu.com/Security/Upgrades) + diff --git a/tests/test_autonomous_patcher.py b/tests/test_autonomous_patcher.py new file mode 100644 index 00000000..917cf4a8 --- /dev/null +++ b/tests/test_autonomous_patcher.py @@ -0,0 +1,632 @@ +#!/usr/bin/env python3 +""" +Tests for Autonomous Patcher Module +""" + +import os +import tempfile +import unittest +from unittest.mock import MagicMock, patch + +from cortex.autonomous_patcher import ( + AutonomousPatcher, + PatchPlan, + PatchResult, + PatchStrategy, +) + +from cortex.vulnerability_scanner import Severity, Vulnerability + + +class TestPatchStrategyEnum(unittest.TestCase): + """Test cases for PatchStrategy enum""" + + def test_strategy_values(self): + """Test strategy enum has correct values""" + self.assertEqual(PatchStrategy.AUTOMATIC.value, "automatic") + self.assertEqual(PatchStrategy.CRITICAL_ONLY.value, "critical_only") + self.assertEqual(PatchStrategy.HIGH_AND_ABOVE.value, "high_and_above") + self.assertEqual(PatchStrategy.MANUAL.value, "manual") + + +class TestPatchPlan(unittest.TestCase): + """Test cases for PatchPlan dataclass""" + + def test_patch_plan_creation(self): + """Test creating patch plan object""" + plan = PatchPlan( + vulnerabilities=[], + packages_to_update={"nginx": "1.20.0"}, + estimated_duration_minutes=5.0, + requires_reboot=False, + rollback_available=True, + ) + + self.assertEqual(len(plan.packages_to_update), 1) + self.assertEqual(plan.packages_to_update["nginx"], "1.20.0") + self.assertFalse(plan.requires_reboot) + + +class TestPatchResult(unittest.TestCase): + """Test cases for PatchResult dataclass""" + + def test_patch_result_creation(self): + """Test creating patch result object""" + result = PatchResult( + patch_id="patch_123", + timestamp="2024-01-01T00:00:00", + vulnerabilities_patched=5, + packages_updated=["nginx", "openssl"], + success=True, + errors=[], + ) + + self.assertEqual(result.patch_id, "patch_123") + self.assertEqual(result.vulnerabilities_patched, 5) + self.assertTrue(result.success) + self.assertEqual(len(result.packages_updated), 2) + + +class TestAutonomousPatcher(unittest.TestCase): + """Test cases for AutonomousPatcher""" + + def setUp(self): + """Set up test fixtures""" + # Create temp config directory + self.temp_dir = tempfile.mkdtemp() + self.config_path = os.path.join(self.temp_dir, "patcher_config.json") + + # Use temp config path to avoid touching real user config + self.patcher = AutonomousPatcher( + strategy=PatchStrategy.CRITICAL_ONLY, + dry_run=True, + config_path=self.config_path, + ) + + def tearDown(self): + """Clean up temporary files""" + import shutil + + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + + def test_initialization_defaults(self): + """Test 
patcher initializes with correct defaults""" + patcher = AutonomousPatcher(config_path=self.config_path) + + self.assertEqual(patcher.strategy, PatchStrategy.CRITICAL_ONLY) + self.assertTrue(patcher.dry_run) + self.assertFalse(patcher.auto_approve) + + def test_initialization_custom_strategy(self): + """Test patcher with custom strategy""" + patcher = AutonomousPatcher( + strategy=PatchStrategy.HIGH_AND_ABOVE, config_path=self.config_path + ) + + self.assertEqual(patcher.strategy, PatchStrategy.HIGH_AND_ABOVE) + + def test_should_patch_blacklisted(self): + """Test blacklisted packages are not patched""" + self.patcher.blacklist = {"nginx"} + + vuln = Vulnerability( + cve_id="CVE-2023-12345", + package_name="nginx", + installed_version="1.18.0", + affected_versions="< 1.20.0", + severity=Severity.CRITICAL, + description="Test vulnerability", + ) + + self.assertFalse(self.patcher._should_patch(vuln)) + + def test_should_patch_whitelisted(self): + """Test whitelisted packages are always patched""" + self.patcher.whitelist = {"nginx"} + self.patcher.strategy = PatchStrategy.MANUAL # Would normally block + + vuln = Vulnerability( + cve_id="CVE-2023-12345", + package_name="nginx", + installed_version="1.18.0", + affected_versions="< 1.20.0", + severity=Severity.LOW, # Below normal threshold + description="Test vulnerability", + ) + + self.assertTrue(self.patcher._should_patch(vuln)) + + def test_should_patch_critical_only_strategy(self): + """Test critical only strategy""" + self.patcher.strategy = PatchStrategy.CRITICAL_ONLY + + critical_vuln = Vulnerability( + cve_id="CVE-CRITICAL", + package_name="test", + installed_version="1.0", + affected_versions="all", + severity=Severity.CRITICAL, + description="Critical", + ) + + high_vuln = Vulnerability( + cve_id="CVE-HIGH", + package_name="test", + installed_version="1.0", + affected_versions="all", + severity=Severity.HIGH, + description="High", + ) + + self.assertTrue(self.patcher._should_patch(critical_vuln)) + self.assertFalse(self.patcher._should_patch(high_vuln)) + + def test_should_patch_high_and_above_strategy(self): + """Test high and above strategy""" + self.patcher.strategy = PatchStrategy.HIGH_AND_ABOVE + + critical_vuln = Vulnerability( + cve_id="CVE-CRITICAL", + package_name="test", + installed_version="1.0", + affected_versions="all", + severity=Severity.CRITICAL, + description="Critical", + ) + + high_vuln = Vulnerability( + cve_id="CVE-HIGH", + package_name="test", + installed_version="1.0", + affected_versions="all", + severity=Severity.HIGH, + description="High", + ) + + medium_vuln = Vulnerability( + cve_id="CVE-MEDIUM", + package_name="test", + installed_version="1.0", + affected_versions="all", + severity=Severity.MEDIUM, + description="Medium", + ) + + self.assertTrue(self.patcher._should_patch(critical_vuln)) + self.assertTrue(self.patcher._should_patch(high_vuln)) + self.assertFalse(self.patcher._should_patch(medium_vuln)) + + def test_should_patch_automatic_strategy(self): + """Test automatic strategy patches all""" + self.patcher.strategy = PatchStrategy.AUTOMATIC + self.patcher.min_severity = Severity.LOW + + low_vuln = Vulnerability( + cve_id="CVE-LOW", + package_name="test", + installed_version="1.0", + affected_versions="all", + severity=Severity.LOW, + description="Low", + ) + + self.assertTrue(self.patcher._should_patch(low_vuln)) + + def test_should_patch_manual_strategy(self): + """Test manual strategy blocks all automatic patching""" + self.patcher.strategy = PatchStrategy.MANUAL + + critical_vuln = Vulnerability( 
+ cve_id="CVE-CRITICAL", + package_name="test", + installed_version="1.0", + affected_versions="all", + severity=Severity.CRITICAL, + description="Critical", + ) + + self.assertFalse(self.patcher._should_patch(critical_vuln)) + + def test_should_patch_respects_min_severity(self): + """Test minimum severity filtering""" + self.patcher.strategy = PatchStrategy.AUTOMATIC + self.patcher.min_severity = Severity.HIGH + + medium_vuln = Vulnerability( + cve_id="CVE-MEDIUM", + package_name="test", + installed_version="1.0", + affected_versions="all", + severity=Severity.MEDIUM, + description="Medium", + ) + + self.assertFalse(self.patcher._should_patch(medium_vuln)) + + @patch("subprocess.run") + def test_run_command_success(self, mock_run): + """Test running command successfully""" + mock_run.return_value = MagicMock(returncode=0, stdout="output", stderr="") + + success, stdout, stderr = self.patcher._run_command(["echo", "test"]) + + self.assertTrue(success) + self.assertEqual(stdout, "output") + + @patch("subprocess.run") + def test_run_command_failure(self, mock_run): + """Test running command with failure""" + mock_run.return_value = MagicMock(returncode=1, stdout="", stderr="error") + + success, stdout, stderr = self.patcher._run_command(["false"]) + + self.assertFalse(success) + self.assertEqual(stderr, "error") + + @patch("subprocess.run") + def test_run_command_timeout(self, mock_run): + """Test running command with timeout""" + import subprocess + + mock_run.side_effect = subprocess.TimeoutExpired(cmd="test", timeout=300) + + success, stdout, stderr = self.patcher._run_command(["sleep", "1000"]) + + self.assertFalse(success) + self.assertIn("timed out", stderr.lower()) + + def test_create_patch_plan_empty(self): + """Test creating patch plan with no vulnerabilities""" + plan = self.patcher.create_patch_plan(vulnerabilities=[]) + + self.assertEqual(len(plan.vulnerabilities), 0) + self.assertEqual(len(plan.packages_to_update), 0) + self.assertEqual(plan.estimated_duration_minutes, 0.0) + + @patch.object(AutonomousPatcher, "_check_package_update_available") + @patch.object(AutonomousPatcher, "_update_fixes_vulnerability") + @patch.object(AutonomousPatcher, "ensure_apt_updated") + def test_create_patch_plan_with_updates(self, mock_apt, mock_fixes, mock_check): + """Test creating patch plan with available updates""" + mock_apt.return_value = True + mock_check.return_value = "1.20.0" + mock_fixes.return_value = True + + vuln = Vulnerability( + cve_id="CVE-2023-12345", + package_name="nginx", + installed_version="1.18.0", + affected_versions="< 1.20.0", + severity=Severity.CRITICAL, + description="Test vulnerability", + fixed_version="1.20.0", + ) + + plan = self.patcher.create_patch_plan(vulnerabilities=[vuln]) + + self.assertEqual(len(plan.vulnerabilities), 1) + self.assertIn("nginx", plan.packages_to_update) + + @patch.object(AutonomousPatcher, "_check_package_update_available") + @patch.object(AutonomousPatcher, "_update_fixes_vulnerability") + @patch.object(AutonomousPatcher, "ensure_apt_updated") + def test_create_patch_plan_detects_kernel_reboot(self, mock_apt, mock_fixes, mock_check): + """Test patch plan detects kernel updates require reboot""" + mock_apt.return_value = True + mock_check.return_value = "5.15.0-100" + mock_fixes.return_value = True + + vuln = Vulnerability( + cve_id="CVE-2023-KERNEL", + package_name="linux-image-5.15.0-generic", + installed_version="5.15.0-90", + affected_versions="< 5.15.0-100", + severity=Severity.CRITICAL, + description="Kernel vulnerability", + 
fixed_version="5.15.0-100", + ) + + plan = self.patcher.create_patch_plan(vulnerabilities=[vuln]) + + self.assertTrue(plan.requires_reboot) + + @patch.object(AutonomousPatcher, "_check_package_update_available") + @patch.object(AutonomousPatcher, "_update_fixes_vulnerability") + @patch.object(AutonomousPatcher, "ensure_apt_updated") + def test_create_patch_plan_skips_unfixed_vulns(self, mock_apt, mock_fixes, mock_check): + """Test patch plan skips vulnerabilities not fixed by available update""" + mock_apt.return_value = True + mock_check.return_value = "1.19.0" # Available version + mock_fixes.return_value = False # Doesn't fix + + vuln = Vulnerability( + cve_id="CVE-2023-12345", + package_name="nginx", + installed_version="1.18.0", + affected_versions="< 1.20.0", + severity=Severity.CRITICAL, + description="Test vulnerability", + fixed_version="1.20.0", # Requires 1.20.0, but only 1.19.0 available + ) + + plan = self.patcher.create_patch_plan(vulnerabilities=[vuln]) + + # Should not include this package since update doesn't fix the vulnerability + self.assertEqual(len(plan.vulnerabilities), 0) + self.assertNotIn("nginx", plan.packages_to_update) + + def test_apply_patch_plan_empty(self): + """Test applying empty patch plan""" + plan = PatchPlan( + vulnerabilities=[], + packages_to_update={}, + estimated_duration_minutes=0.0, + requires_reboot=False, + rollback_available=True, + ) + + result = self.patcher.apply_patch_plan(plan) + + self.assertTrue(result.success) + self.assertEqual(result.vulnerabilities_patched, 0) + + def test_apply_patch_plan_dry_run(self): + """Test applying patch plan in dry run mode""" + self.patcher.dry_run = True + + plan = PatchPlan( + vulnerabilities=[], + packages_to_update={"nginx": "1.20.0", "curl": "7.80.0"}, + estimated_duration_minutes=2.0, + requires_reboot=False, + rollback_available=True, + ) + + result = self.patcher.apply_patch_plan(plan) + + self.assertTrue(result.success) + self.assertEqual(len(result.packages_updated), 2) + # In dry run, packages are listed but not actually updated + + def test_add_to_whitelist(self): + """Test adding package to whitelist""" + self.patcher.whitelist = set() + self.patcher.add_to_whitelist("nginx") + + self.assertIn("nginx", self.patcher.whitelist) + + def test_add_to_blacklist(self): + """Test adding package to blacklist""" + self.patcher.blacklist = set() + self.patcher.add_to_blacklist("linux-image") + + self.assertIn("linux-image", self.patcher.blacklist) + + def test_set_min_severity(self): + """Test setting minimum severity""" + self.patcher.set_min_severity(Severity.HIGH) + + self.assertEqual(self.patcher.min_severity, Severity.HIGH) + + +class TestAutonomousPatcherAptUpdate(unittest.TestCase): + """Test apt update functionality""" + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.config_path = os.path.join(self.temp_dir, "patcher_config.json") + self.patcher = AutonomousPatcher(dry_run=True, config_path=self.config_path) + + def tearDown(self): + import shutil + + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + + @patch("subprocess.run") + def test_ensure_apt_updated_first_call(self, mock_run): + """Test apt update runs on first call""" + mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="") + + result = self.patcher.ensure_apt_updated() + + self.assertTrue(result) + mock_run.assert_called() + + @patch("subprocess.run") + def test_ensure_apt_updated_force(self, mock_run): + """Test apt update can be forced""" + mock_run.return_value = MagicMock(returncode=0, 
stdout="", stderr="") + + result = self.patcher.ensure_apt_updated(force=True) + + self.assertTrue(result) + + @patch("subprocess.run") + def test_check_package_update_available(self, mock_run): + """Test checking for package updates""" + mock_run.return_value = MagicMock( + returncode=0, + stdout="nginx:\n Installed: 1.18.0\n Candidate: 1.20.0\n Version table:\n", + stderr="", + ) + + version = self.patcher._check_package_update_available("nginx") + + self.assertEqual(version, "1.20.0") + + @patch("subprocess.run") + def test_check_package_update_not_available(self, mock_run): + """Test when no update is available""" + mock_run.return_value = MagicMock( + returncode=0, + stdout="nginx:\n Installed: 1.20.0\n Candidate: (none)\n", + stderr="", + ) + + version = self.patcher._check_package_update_available("nginx") + + self.assertIsNone(version) + + +class TestAutonomousPatcherConfig(unittest.TestCase): + """Test configuration save/load""" + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.config_path = os.path.join(self.temp_dir, "patcher_config.json") + + def tearDown(self): + import shutil + + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + + def test_save_and_load_config(self): + """Test saving and loading configuration""" + patcher = AutonomousPatcher(config_path=self.config_path) + patcher.whitelist = {"nginx", "apache2"} + patcher.blacklist = {"kernel"} + patcher.min_severity = Severity.HIGH + + # Save config via add_to_whitelist + patcher.add_to_whitelist("curl") + self.assertIn("curl", patcher.whitelist) + + # Verify config was saved to temp file + self.assertTrue(os.path.exists(self.config_path)) + + # Load config in new instance + patcher2 = AutonomousPatcher(config_path=self.config_path) + self.assertIn("curl", patcher2.whitelist) + self.assertIn("nginx", patcher2.whitelist) + self.assertIn("kernel", patcher2.blacklist) + + +class TestVersionComparison(unittest.TestCase): + """Test version comparison and vulnerability fix verification""" + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.config_path = os.path.join(self.temp_dir, "patcher_config.json") + self.patcher = AutonomousPatcher(dry_run=True, config_path=self.config_path) + + def tearDown(self): + import shutil + + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + + @patch("subprocess.run") + def test_compare_versions_greater(self, mock_run): + """Test version comparison with greater version""" + mock_run.return_value = MagicMock(returncode=0) + + result = self.patcher._compare_versions("1.20.0", "gt", "1.18.0") + self.assertTrue(result) + + @patch("subprocess.run") + def test_compare_versions_less(self, mock_run): + """Test version comparison with lesser version""" + mock_run.return_value = MagicMock(returncode=1) # dpkg returns 1 if comparison fails + + result = self.patcher._compare_versions("1.18.0", "gt", "1.20.0") + self.assertFalse(result) + + @patch("subprocess.run") + def test_compare_versions_equal(self, mock_run): + """Test version comparison with equal versions""" + mock_run.return_value = MagicMock(returncode=0) + + result = self.patcher._compare_versions("1.20.0", "eq", "1.20.0") + self.assertTrue(result) + + @patch("subprocess.run") + def test_compare_versions_ge(self, mock_run): + """Test version comparison with greater or equal""" + mock_run.return_value = MagicMock(returncode=0) + + result = self.patcher._compare_versions("1.20.0", "ge", "1.18.0") + self.assertTrue(result) + + @patch.object(AutonomousPatcher, "_compare_versions") + def 
test_update_fixes_vulnerability_yes(self, mock_compare): + """Test update fixes vulnerability when version is sufficient""" + mock_compare.return_value = True + + vuln = Vulnerability( + cve_id="CVE-2023-12345", + package_name="nginx", + installed_version="1.18.0", + affected_versions="< 1.20.0", + severity=Severity.HIGH, + description="Test vulnerability", + fixed_version="1.20.0", + ) + + result = self.patcher._update_fixes_vulnerability("1.20.0", vuln) + self.assertTrue(result) + mock_compare.assert_called_with("1.20.0", "ge", "1.20.0") + + @patch.object(AutonomousPatcher, "_compare_versions") + def test_update_fixes_vulnerability_no(self, mock_compare): + """Test update does not fix vulnerability when version is insufficient""" + mock_compare.return_value = False + + vuln = Vulnerability( + cve_id="CVE-2023-12345", + package_name="nginx", + installed_version="1.18.0", + affected_versions="< 1.20.0", + severity=Severity.HIGH, + description="Test vulnerability", + fixed_version="1.20.0", + ) + + result = self.patcher._update_fixes_vulnerability("1.19.0", vuln) + self.assertFalse(result) + + def test_update_fixes_vulnerability_no_fixed_version_default(self): + """Test update verification rejects when no fixed_version is specified (default)""" + vuln = Vulnerability( + cve_id="CVE-2023-12345", + package_name="nginx", + installed_version="1.18.0", + affected_versions="< 1.20.0", + severity=Severity.HIGH, + description="Test vulnerability", + fixed_version=None, # No fixed version specified + ) + + # Should return False by default when fixed_version is unknown (refuse unverified) + self.assertFalse(self.patcher.allow_unverified_patches) + result = self.patcher._update_fixes_vulnerability("1.20.0", vuln) + self.assertFalse(result) + + def test_update_fixes_vulnerability_no_fixed_version_allow_unverified(self): + """Test update verification allows when allow_unverified_patches is True""" + # Create patcher with allow_unverified_patches=True + patcher = AutonomousPatcher( + dry_run=True, + config_path=self.config_path, + allow_unverified_patches=True, + ) + + vuln = Vulnerability( + cve_id="CVE-2023-12345", + package_name="nginx", + installed_version="1.18.0", + affected_versions="< 1.20.0", + severity=Severity.HIGH, + description="Test vulnerability", + fixed_version=None, # No fixed version specified + ) + + # Should return True when allow_unverified_patches is enabled + self.assertTrue(patcher.allow_unverified_patches) + result = patcher._update_fixes_vulnerability("1.20.0", vuln) + self.assertTrue(result) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_security_scheduler.py b/tests/test_security_scheduler.py new file mode 100644 index 00000000..e5295fcd --- /dev/null +++ b/tests/test_security_scheduler.py @@ -0,0 +1,628 @@ +#!/usr/bin/env python3 +""" +Tests for Security Scheduler Module +""" + +import json +import os +import tempfile +import unittest +from datetime import datetime +from unittest.mock import MagicMock, patch + +from cortex.autonomous_patcher import PatchStrategy +from cortex.security_scheduler import ( + ScheduleFrequency, + SecuritySchedule, + SecurityScheduler, +) + + +class TestScheduleFrequencyEnum(unittest.TestCase): + """Test cases for ScheduleFrequency enum""" + + def test_frequency_values(self): + """Test frequency enum has correct values""" + self.assertEqual(ScheduleFrequency.DAILY.value, "daily") + self.assertEqual(ScheduleFrequency.WEEKLY.value, "weekly") + self.assertEqual(ScheduleFrequency.MONTHLY.value, "monthly") + + def 
test_custom_frequency(self): + """Test custom frequency value""" + self.assertEqual(ScheduleFrequency.CUSTOM.value, "custom") + + +class TestSecuritySchedule(unittest.TestCase): + """Test cases for SecuritySchedule dataclass""" + + def test_schedule_creation(self): + """Test creating security schedule object""" + schedule = SecuritySchedule( + schedule_id="monthly_scan", + frequency=ScheduleFrequency.MONTHLY, + scan_enabled=True, + patch_enabled=False, + ) + + self.assertEqual(schedule.schedule_id, "monthly_scan") + self.assertEqual(schedule.frequency, ScheduleFrequency.MONTHLY) + self.assertTrue(schedule.scan_enabled) + self.assertFalse(schedule.patch_enabled) + self.assertTrue(schedule.dry_run) # Default value + + def test_schedule_defaults(self): + """Test schedule default values""" + schedule = SecuritySchedule( + schedule_id="test", + frequency=ScheduleFrequency.DAILY, + ) + + self.assertTrue(schedule.scan_enabled) + self.assertFalse(schedule.patch_enabled) + self.assertEqual(schedule.patch_strategy, PatchStrategy.CRITICAL_ONLY) + self.assertTrue(schedule.dry_run) + self.assertIsNone(schedule.last_run) + self.assertIsNone(schedule.next_run) + self.assertIsNone(schedule.custom_cron) + + def test_schedule_with_patch_enabled(self): + """Test schedule with patching enabled""" + schedule = SecuritySchedule( + schedule_id="auto_patch", + frequency=ScheduleFrequency.WEEKLY, + scan_enabled=True, + patch_enabled=True, + patch_strategy=PatchStrategy.HIGH_AND_ABOVE, + dry_run=False, + ) + + self.assertTrue(schedule.patch_enabled) + self.assertEqual(schedule.patch_strategy, PatchStrategy.HIGH_AND_ABOVE) + self.assertFalse(schedule.dry_run) + + def test_schedule_with_custom_cron(self): + """Test schedule with custom cron expression""" + schedule = SecuritySchedule( + schedule_id="custom_schedule", + frequency=ScheduleFrequency.CUSTOM, + scan_enabled=True, + custom_cron="0 3 * * 0", # Every Sunday at 3 AM + ) + + self.assertEqual(schedule.frequency, ScheduleFrequency.CUSTOM) + self.assertEqual(schedule.custom_cron, "0 3 * * 0") + + +class TestSecurityScheduler(unittest.TestCase): + """Test cases for SecurityScheduler""" + + def setUp(self): + """Set up test fixtures with a temp config path; each test patches + SecurityScheduler.__init__ inline, so no class-level patcher is needed.""" + self.temp_dir = tempfile.mkdtemp() + self.config_path = os.path.join(self.temp_dir, "security_schedule.json") + + def tearDown(self): + """Clean up temporary files""" + import shutil + + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + + def test_initialization(self): + """Test scheduler initializes correctly""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.config_path = None + scheduler.schedules = {} + self.assertIsNotNone(scheduler) + self.assertIsInstance(scheduler.schedules, dict) + + def test_create_schedule(self): + """Test creating a schedule""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + from pathlib import Path + + scheduler.config_path = Path(self.config_path) + scheduler.schedules = {} + + schedule = scheduler.create_schedule( 
schedule_id="test_schedule", + frequency=ScheduleFrequency.WEEKLY, + scan_enabled=True, + patch_enabled=False, + ) + + self.assertEqual(schedule.schedule_id, "test_schedule") + self.assertIn("test_schedule", scheduler.schedules) + + def test_create_schedule_with_patch(self): + """Test creating schedule with patching enabled""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + from pathlib import Path + + scheduler.config_path = Path(self.config_path) + scheduler.schedules = {} + + schedule = scheduler.create_schedule( + schedule_id="patch_schedule", + frequency=ScheduleFrequency.MONTHLY, + scan_enabled=True, + patch_enabled=True, + patch_strategy=PatchStrategy.HIGH_AND_ABOVE, + dry_run=False, + ) + + self.assertTrue(schedule.patch_enabled) + self.assertEqual(schedule.patch_strategy, PatchStrategy.HIGH_AND_ABOVE) + self.assertFalse(schedule.dry_run) + + def test_get_schedule(self): + """Test getting a schedule by ID""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + from pathlib import Path + + scheduler.config_path = Path(self.config_path) + scheduler.schedules = {} + + scheduler.create_schedule( + schedule_id="get_test", + frequency=ScheduleFrequency.DAILY, + scan_enabled=True, + ) + + schedule = scheduler.get_schedule("get_test") + + self.assertIsNotNone(schedule) + self.assertEqual(schedule.schedule_id, "get_test") + + def test_get_nonexistent_schedule(self): + """Test getting non-existent schedule returns None""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.schedules = {} + + schedule = scheduler.get_schedule("nonexistent") + self.assertIsNone(schedule) + + def test_delete_schedule(self): + """Test deleting a schedule""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + from pathlib import Path + + scheduler.config_path = Path(self.config_path) + scheduler.schedules = {} + + scheduler.create_schedule( + schedule_id="to_delete", + frequency=ScheduleFrequency.DAILY, + scan_enabled=True, + ) + + success = scheduler.delete_schedule("to_delete") + + self.assertTrue(success) + self.assertNotIn("to_delete", scheduler.schedules) + + def test_delete_nonexistent_schedule(self): + """Test deleting non-existent schedule returns False""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + from pathlib import Path + + scheduler.config_path = Path(self.config_path) + scheduler.schedules = {} + + success = scheduler.delete_schedule("nonexistent") + self.assertFalse(success) + + def test_list_schedules(self): + """Test listing all schedules""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + from pathlib import Path + + scheduler.config_path = Path(self.config_path) + scheduler.schedules = {} + + scheduler.create_schedule( + schedule_id="schedule_1", + frequency=ScheduleFrequency.DAILY, + scan_enabled=True, + ) + scheduler.create_schedule( + schedule_id="schedule_2", + frequency=ScheduleFrequency.WEEKLY, + scan_enabled=True, + ) + + schedules = scheduler.list_schedules() + + self.assertEqual(len(schedules), 2) + schedule_ids = [s.schedule_id for s in schedules] + self.assertIn("schedule_1", schedule_ids) + self.assertIn("schedule_2", schedule_ids) + + def test_calculate_next_run_daily(self): + """Test calculating next run time for daily schedule""" + with 
patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.schedules = {} + + next_run = scheduler._calculate_next_run(ScheduleFrequency.DAILY) + + self.assertIsNotNone(next_run) + # Should be roughly 1 day from now + delta = next_run - datetime.now() + self.assertGreater(delta.total_seconds(), 23 * 3600) # At least 23 hours + self.assertLess(delta.total_seconds(), 25 * 3600) # Less than 25 hours + + def test_calculate_next_run_weekly(self): + """Test calculating next run time for weekly schedule""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.schedules = {} + + next_run = scheduler._calculate_next_run(ScheduleFrequency.WEEKLY) + + self.assertIsNotNone(next_run) + # Should be roughly 1 week from now + delta = next_run - datetime.now() + self.assertGreaterEqual(delta.days, 6) + self.assertLessEqual(delta.days, 8) + + def test_calculate_next_run_monthly(self): + """Test calculating next run time for monthly schedule""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.schedules = {} + + next_run = scheduler._calculate_next_run(ScheduleFrequency.MONTHLY) + + self.assertIsNotNone(next_run) + # Should be roughly 30 days from now + delta = next_run - datetime.now() + self.assertGreaterEqual(delta.days, 29) + self.assertLessEqual(delta.days, 31) + + def test_calculate_next_run_custom(self): + """Test calculating next run for custom frequency returns None""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.schedules = {} + + next_run = scheduler._calculate_next_run(ScheduleFrequency.CUSTOM) + self.assertIsNone(next_run) + + +class TestSecuritySchedulerSystemd(unittest.TestCase): + """Test systemd timer generation""" + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.config_path = os.path.join(self.temp_dir, "security_schedule.json") + + def tearDown(self): + import shutil + + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + + def test_frequency_to_systemd_daily(self): + """Test converting daily frequency to systemd format""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.schedules = {} + + result = scheduler._frequency_to_systemd(ScheduleFrequency.DAILY) + self.assertEqual(result, "daily") + + def test_frequency_to_systemd_weekly(self): + """Test converting weekly frequency to systemd format""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.schedules = {} + + result = scheduler._frequency_to_systemd(ScheduleFrequency.WEEKLY) + self.assertEqual(result, "weekly") + + def test_frequency_to_systemd_monthly(self): + """Test converting monthly frequency to systemd format""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.schedules = {} + + result = scheduler._frequency_to_systemd(ScheduleFrequency.MONTHLY) + self.assertEqual(result, "monthly") + + def test_frequency_to_systemd_custom(self): + """Test custom frequency raises ValueError (cannot auto-convert to systemd)""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.schedules = {} + + with self.assertRaises(ValueError) as context: + scheduler._frequency_to_systemd(ScheduleFrequency.CUSTOM) + + self.assertIn("CUSTOM 
frequency cannot be automatically converted", str(context.exception)) + + @patch("os.geteuid") + def test_has_root_privileges_as_root(self, mock_geteuid): + """Test root privilege check when running as root""" + mock_geteuid.return_value = 0 + + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.schedules = {} + + has_root = scheduler._has_root_privileges() + self.assertTrue(has_root) + + @patch("os.geteuid") + @patch("subprocess.run") + def test_has_root_privileges_with_sudo(self, mock_run, mock_geteuid): + """Test root privilege check with passwordless sudo""" + mock_geteuid.return_value = 1000 # Non-root + mock_run.return_value = MagicMock(returncode=0) + + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.schedules = {} + + has_root = scheduler._has_root_privileges() + self.assertTrue(has_root) + + @patch("os.geteuid") + @patch("subprocess.run") + def test_has_root_privileges_without_sudo(self, mock_run, mock_geteuid): + """Test root privilege check without sudo access""" + mock_geteuid.return_value = 1000 # Non-root + mock_run.return_value = MagicMock(returncode=1) + + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.schedules = {} + + has_root = scheduler._has_root_privileges() + self.assertFalse(has_root) + + @patch.object(SecurityScheduler, "_has_root_privileges") + def test_install_systemd_timer_no_privileges(self, mock_has_root): + """Test installing timer fails without root""" + mock_has_root.return_value = False + + with patch.object(SecurityScheduler, "__init__", lambda x: None): + from pathlib import Path + + scheduler = SecurityScheduler() + scheduler.config_path = Path(self.config_path) + scheduler.cortex_binary = "/usr/bin/cortex" + scheduler.schedules = {} + + scheduler.create_schedule( + schedule_id="no_root_test", + frequency=ScheduleFrequency.DAILY, + scan_enabled=True, + ) + + success = scheduler.install_systemd_timer("no_root_test") + self.assertFalse(success) + + def test_install_systemd_timer_nonexistent_schedule(self): + """Test installing timer for non-existent schedule fails""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.schedules = {} + + success = scheduler.install_systemd_timer("nonexistent") + self.assertFalse(success) + + +class TestSecuritySchedulerExecution(unittest.TestCase): + """Test schedule execution""" + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.config_path = os.path.join(self.temp_dir, "security_schedule.json") + + def tearDown(self): + import shutil + + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + + @patch("cortex.security_scheduler.VulnerabilityScanner") + def test_run_schedule_scan_only(self, mock_scanner_class): + """Test running schedule with scan only""" + mock_scanner = MagicMock() + mock_scan_result = MagicMock() + mock_scan_result.vulnerabilities = [] + mock_scan_result.vulnerabilities_found = 0 + mock_scan_result.critical_count = 0 + mock_scan_result.high_count = 0 + mock_scan_result.medium_count = 0 + mock_scan_result.low_count = 0 + mock_scanner.scan_all_packages.return_value = mock_scan_result + mock_scanner_class.return_value = mock_scanner + + with patch.object(SecurityScheduler, "__init__", lambda x: None): + from pathlib import Path + + scheduler = SecurityScheduler() + scheduler.config_path = Path(self.config_path) + scheduler.schedules 
= {} + + scheduler.create_schedule( + schedule_id="scan_only", + frequency=ScheduleFrequency.DAILY, + scan_enabled=True, + patch_enabled=False, + ) + + result = scheduler.run_schedule("scan_only") + + self.assertTrue(result["success"]) + self.assertIsNotNone(result["scan_result"]) + mock_scanner.scan_all_packages.assert_called_once() + + def test_run_nonexistent_schedule(self): + """Test running non-existent schedule raises error""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + scheduler = SecurityScheduler() + scheduler.schedules = {} + + with self.assertRaises(ValueError) as context: + scheduler.run_schedule("nonexistent") + + self.assertIn("not found", str(context.exception).lower()) + + @patch("cortex.security_scheduler.VulnerabilityScanner") + @patch("cortex.security_scheduler.AutonomousPatcher") + def test_run_schedule_with_patching(self, mock_patcher_class, mock_scanner_class): + """Test running schedule with patching enabled""" + # Setup mock scanner + mock_scanner = MagicMock() + mock_vuln = MagicMock() + mock_vuln.severity.value = "critical" + mock_scan_result = MagicMock() + mock_scan_result.vulnerabilities = [mock_vuln] + mock_scan_result.vulnerabilities_found = 1 + mock_scan_result.critical_count = 1 + mock_scan_result.high_count = 0 + mock_scan_result.medium_count = 0 + mock_scan_result.low_count = 0 + mock_scanner.scan_all_packages.return_value = mock_scan_result + mock_scanner_class.return_value = mock_scanner + + # Setup mock patcher + mock_patcher = MagicMock() + mock_patch_result = MagicMock() + mock_patch_result.packages_updated = ["test-pkg"] + mock_patch_result.vulnerabilities_patched = 1 + mock_patch_result.success = True + mock_patch_result.errors = [] + mock_patcher.patch_vulnerabilities.return_value = mock_patch_result + mock_patcher_class.return_value = mock_patcher + + with patch.object(SecurityScheduler, "__init__", lambda x: None): + from pathlib import Path + + scheduler = SecurityScheduler() + scheduler.config_path = Path(self.config_path) + scheduler.schedules = {} + + scheduler.create_schedule( + schedule_id="patch_test", + frequency=ScheduleFrequency.DAILY, + scan_enabled=True, + patch_enabled=True, + ) + + result = scheduler.run_schedule("patch_test") + + self.assertTrue(result["success"]) + self.assertIsNotNone(result["patch_result"]) + self.assertEqual(result["patch_result"]["packages_updated"], 1) + + +class TestSecuritySchedulerSaveLoad(unittest.TestCase): + """Test schedule persistence""" + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.config_path = os.path.join(self.temp_dir, "security_schedule.json") + + def tearDown(self): + import shutil + + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + + def test_save_schedules(self): + """Test saving schedules to file""" + with patch.object(SecurityScheduler, "__init__", lambda x: None): + from pathlib import Path + + scheduler = SecurityScheduler() + scheduler.config_path = Path(self.config_path) + scheduler.schedules = {} + + scheduler.create_schedule( + schedule_id="save_test", + frequency=ScheduleFrequency.WEEKLY, + scan_enabled=True, + patch_enabled=True, + ) + + # Verify file was created + self.assertTrue(os.path.exists(self.config_path)) + + # Verify content + with open(self.config_path) as f: + data = json.load(f) + + self.assertIn("schedules", data) + self.assertEqual(len(data["schedules"]), 1) + self.assertEqual(data["schedules"][0]["schedule_id"], "save_test") + + def test_load_schedules(self): + """Test loading schedules from file""" + # 
Create a config file manually
+        from pathlib import Path
+
+        config_path = Path(self.config_path)
+        config_path.parent.mkdir(parents=True, exist_ok=True)
+
+        config_data = {
+            "schedules": [
+                {
+                    "schedule_id": "loaded_schedule",
+                    "frequency": "monthly",
+                    "scan_enabled": True,
+                    "patch_enabled": False,
+                    "patch_strategy": "critical_only",
+                    "dry_run": True,
+                    "last_run": None,
+                    "next_run": None,
+                    "custom_cron": None,
+                }
+            ]
+        }
+
+        with open(config_path, "w") as f:
+            json.dump(config_data, f)
+
+        # Load via a bare instance; config_path is set explicitly, so there is
+        # no need to patch Path.home or Path.exists.
+        scheduler = SecurityScheduler.__new__(SecurityScheduler)
+        scheduler.config_path = config_path
+        scheduler.schedules = {}
+        scheduler._load_schedules()
+
+        self.assertIn("loaded_schedule", scheduler.schedules)
+        schedule = scheduler.schedules["loaded_schedule"]
+        self.assertEqual(schedule.frequency, ScheduleFrequency.MONTHLY)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/test_vulnerability_scanner.py b/tests/test_vulnerability_scanner.py
new file mode 100644
index 00000000..a787306e
--- /dev/null
+++ b/tests/test_vulnerability_scanner.py
@@ -0,0 +1,524 @@
+#!/usr/bin/env python3
+"""
+Tests for Vulnerability Scanner Module
+"""
+
+import os
+import tempfile
+import unittest
+from datetime import datetime
+from unittest.mock import MagicMock, patch
+
+from cortex.vulnerability_scanner import (
+    ScanResult,
+    Severity,
+    Vulnerability,
+    VulnerabilityScanner,
+)
+
+
+class TestSeverityEnum(unittest.TestCase):
+    """Test cases for Severity enum"""
+
+    def test_severity_values(self):
+        """Test severity enum has correct values"""
+        self.assertEqual(Severity.CRITICAL.value, "critical")
+        self.assertEqual(Severity.HIGH.value, "high")
+        self.assertEqual(Severity.MEDIUM.value, "medium")
+        self.assertEqual(Severity.LOW.value, "low")
+        self.assertEqual(Severity.UNKNOWN.value, "unknown")
+
+    def test_severity_from_string(self):
+        """Test creating severity from string"""
+        self.assertEqual(Severity("critical"), Severity.CRITICAL)
+        self.assertEqual(Severity("high"), Severity.HIGH)
+
+
+class TestVulnerability(unittest.TestCase):
+    """Test cases for Vulnerability dataclass"""
+
+    def test_vulnerability_creation(self):
+        """Test creating vulnerability object"""
+        vuln = Vulnerability(
+            cve_id="CVE-2023-12345",
+            package_name="nginx",
+            installed_version="1.18.0",
+            affected_versions="< 1.20.0",
+            severity=Severity.HIGH,
+            description="Test vulnerability",
+        )
+
+        self.assertEqual(vuln.cve_id, "CVE-2023-12345")
+        self.assertEqual(vuln.package_name, "nginx")
+        self.assertEqual(vuln.severity, Severity.HIGH)
+        self.assertIsNotNone(vuln.references)
+        self.assertEqual(len(vuln.references), 0)
+
+    def test_vulnerability_with_optional_fields(self):
+        """Test vulnerability with optional fields"""
+        vuln = Vulnerability(
+            cve_id="CVE-2023-12345",
+            package_name="nginx",
+            installed_version="1.18.0",
+            affected_versions="< 1.20.0",
+            severity=Severity.CRITICAL,
+            description="Test vulnerability",
+            fixed_version="1.20.0",
+            cvss_score=9.8,
+            source="nvd",
+            references=["https://example.com"],
+        )
+
+        self.assertEqual(vuln.fixed_version, "1.20.0")
+        self.assertEqual(vuln.cvss_score, 9.8)
+        self.assertEqual(vuln.source, "nvd")
+        self.assertEqual(len(vuln.references), 1)
+
+
+class TestScanResult(unittest.TestCase):
+    """Test cases for ScanResult dataclass"""
+
+    def
test_scan_result_creation(self): + """Test creating scan result object""" + result = ScanResult( + scan_id="scan_123", + timestamp="2024-01-01T00:00:00", + total_packages_scanned=100, + vulnerabilities_found=5, + critical_count=1, + high_count=2, + medium_count=1, + low_count=1, + vulnerabilities=[], + scan_duration_seconds=10.5, + ) + + self.assertEqual(result.scan_id, "scan_123") + self.assertEqual(result.vulnerabilities_found, 5) + self.assertEqual(result.critical_count, 1) + self.assertIsNotNone(result.errors) + + +class TestVulnerabilityScanner(unittest.TestCase): + """Test cases for VulnerabilityScanner""" + + def setUp(self): + """Set up test fixtures""" + self.temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") + self.temp_db.close() + self.scanner = VulnerabilityScanner(db_path=self.temp_db.name) + + def tearDown(self): + """Clean up temporary files""" + if os.path.exists(self.temp_db.name): + os.unlink(self.temp_db.name) + + def test_database_initialization(self): + """Test database is created properly""" + self.assertTrue(os.path.exists(self.temp_db.name)) + + def test_vulnerability_to_dict(self): + """Test converting vulnerability to dict""" + vuln = Vulnerability( + cve_id="CVE-2023-12345", + package_name="nginx", + installed_version="1.18.0", + affected_versions="< 1.20.0", + severity=Severity.HIGH, + description="Test vulnerability", + cvss_score=7.5, + ) + + vuln_dict = self.scanner._vulnerability_to_dict(vuln) + + self.assertEqual(vuln_dict["cve_id"], "CVE-2023-12345") + self.assertEqual(vuln_dict["severity"], "high") # Should be string, not enum + self.assertEqual(vuln_dict["cvss_score"], 7.5) + + def test_dict_to_vulnerability(self): + """Test converting dict to vulnerability""" + vuln_dict = { + "cve_id": "CVE-2023-12345", + "package_name": "nginx", + "installed_version": "1.18.0", + "affected_versions": "< 1.20.0", + "severity": "high", + "description": "Test vulnerability", + } + + vuln = self.scanner._dict_to_vulnerability(vuln_dict) + + self.assertEqual(vuln.cve_id, "CVE-2023-12345") + self.assertEqual(vuln.severity, Severity.HIGH) + + def test_dict_to_vulnerability_unknown_severity(self): + """Test converting dict with unknown severity""" + vuln_dict = { + "cve_id": "CVE-2023-12345", + "package_name": "test", + "installed_version": "1.0", + "affected_versions": "all", + "severity": "invalid_severity", + "description": "Test", + } + + vuln = self.scanner._dict_to_vulnerability(vuln_dict) + + self.assertEqual(vuln.severity, Severity.UNKNOWN) + + def test_parse_cvss_vector_critical(self): + """Test parsing CVSS vector for critical severity""" + # CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H = ~9.8 (Critical) + vector = "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H" + score = self.scanner._parse_cvss_vector(vector) + + self.assertIsNotNone(score) + self.assertGreaterEqual(score, 9.0) + + def test_parse_cvss_vector_low(self): + """Test parsing CVSS vector for low severity""" + # CVSS:3.1/AV:P/AC:H/PR:H/UI:R/S:U/C:N/I:N/A:L = low + vector = "CVSS:3.1/AV:P/AC:H/PR:H/UI:R/S:U/C:N/I:N/A:L" + score = self.scanner._parse_cvss_vector(vector) + + self.assertIsNotNone(score) + self.assertLess(score, 4.0) + + def test_parse_cvss_vector_invalid(self): + """Test parsing invalid CVSS vector""" + score = self.scanner._parse_cvss_vector("not a cvss vector") + self.assertIsNone(score) + + score = self.scanner._parse_cvss_vector("") + self.assertIsNone(score) + + score = self.scanner._parse_cvss_vector(None) + self.assertIsNone(score) + + @patch("subprocess.run") + def 
test_get_installed_packages(self, mock_run): + """Test getting installed packages""" + mock_run.return_value = MagicMock( + returncode=0, stdout="package1|1.0.0\npackage2|2.0.0\npackage3|3.0.0\n" + ) + + packages = self.scanner._get_installed_packages() + + self.assertEqual(len(packages), 3) + self.assertEqual(packages["package1"], "1.0.0") + self.assertEqual(packages["package2"], "2.0.0") + + def test_cache_save_and_check(self): + """Test saving and retrieving from cache""" + vuln = Vulnerability( + cve_id="CVE-2023-12345", + package_name="test-pkg", + installed_version="1.0.0", + affected_versions="< 2.0.0", + severity=Severity.HIGH, + description="Test vulnerability", + ) + + # Save to cache + self.scanner._save_cache("test-pkg", "1.0.0", [vuln]) + + # Check cache + cached = self.scanner._check_cache("test-pkg", "1.0.0") + + self.assertIsNotNone(cached) + self.assertEqual(len(cached), 1) + self.assertEqual(cached[0].cve_id, "CVE-2023-12345") + self.assertEqual(cached[0].severity, Severity.HIGH) + + def test_cache_returns_none_for_uncached(self): + """Test cache returns None for uncached packages""" + cached = self.scanner._check_cache("nonexistent-pkg", "1.0.0") + self.assertIsNone(cached) + + def test_save_scan_history(self): + """Test saving scan history""" + result = ScanResult( + scan_id="test_scan_123", + timestamp=datetime.now().isoformat(), + total_packages_scanned=10, + vulnerabilities_found=2, + critical_count=1, + high_count=1, + medium_count=0, + low_count=0, + vulnerabilities=[], + scan_duration_seconds=5.0, + ) + + self.scanner._save_scan_history(result) + + # Retrieve history + history = self.scanner.get_scan_history(limit=1) + + self.assertEqual(len(history), 1) + self.assertEqual(history[0].scan_id, "test_scan_123") + + def test_get_scan_history_empty(self): + """Test getting empty scan history""" + history = self.scanner.get_scan_history(limit=10) + self.assertEqual(len(history), 0) + + def test_get_critical_vulnerabilities(self): + """Test getting critical vulnerabilities from history""" + # Create and save a scan with critical vulnerability + critical_vuln = Vulnerability( + cve_id="CVE-CRITICAL", + package_name="test-pkg", + installed_version="1.0", + affected_versions="all", + severity=Severity.CRITICAL, + description="Critical vulnerability", + ) + + high_vuln = Vulnerability( + cve_id="CVE-HIGH", + package_name="test-pkg", + installed_version="1.0", + affected_versions="all", + severity=Severity.HIGH, + description="High vulnerability", + ) + + result = ScanResult( + scan_id="test_critical", + timestamp=datetime.now().isoformat(), + total_packages_scanned=1, + vulnerabilities_found=2, + critical_count=1, + high_count=1, + medium_count=0, + low_count=0, + vulnerabilities=[critical_vuln, high_vuln], + scan_duration_seconds=1.0, + ) + + self.scanner._save_scan_history(result) + + # Get critical vulnerabilities + critical = self.scanner.get_critical_vulnerabilities(days=30) + + self.assertEqual(len(critical), 1) + self.assertEqual(critical[0].cve_id, "CVE-CRITICAL") + self.assertEqual(critical[0].severity, Severity.CRITICAL) + + @patch("requests.post") + def test_query_osv_success(self, mock_post): + """Test successful OSV query""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "vulns": [ + { + "id": "CVE-2023-12345", + "summary": "Test vulnerability", + "severity": [{"type": "CVSS_V3", "score": 7.5}], + } + ] + } + mock_post.return_value = mock_response + + vulns = self.scanner._query_osv("test-pkg", "1.0.0") + 
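+        # The mocked payload mirrors the shape of an OSV.dev POST /v1/query
+        # response ({"vulns": [...]}); the assertions below pin down that the
+        # scanner maps each entry's "id" onto Vulnerability.cve_id.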
+ self.assertEqual(len(vulns), 1) + self.assertEqual(vulns[0].cve_id, "CVE-2023-12345") + + @patch("requests.post") + def test_query_osv_no_vulns(self, mock_post): + """Test OSV query with no vulnerabilities""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {} + mock_post.return_value = mock_response + + vulns = self.scanner._query_osv("safe-pkg", "1.0.0") + + self.assertEqual(len(vulns), 0) + + @patch("requests.post") + def test_query_osv_network_error(self, mock_post): + """Test OSV query handles network errors""" + import requests + + mock_post.side_effect = requests.RequestException("Network error") + + vulns = self.scanner._query_osv("test-pkg", "1.0.0") + + self.assertEqual(len(vulns), 0) # Should return empty list on error + + @patch.object(VulnerabilityScanner, "_check_cache") + @patch.object(VulnerabilityScanner, "_query_osv") + def test_scan_package_uses_cache(self, mock_query, mock_cache): + """Test scan_package uses cache when available""" + cached_vuln = Vulnerability( + cve_id="CVE-CACHED", + package_name="test-pkg", + installed_version="1.0", + affected_versions="all", + severity=Severity.MEDIUM, + description="Cached vulnerability", + ) + mock_cache.return_value = [cached_vuln] + + vulns = self.scanner.scan_package("test-pkg", "1.0") + + self.assertEqual(len(vulns), 1) + self.assertEqual(vulns[0].cve_id, "CVE-CACHED") + mock_query.assert_not_called() # Should not query if cached + + @patch.object(VulnerabilityScanner, "_check_cache") + @patch.object(VulnerabilityScanner, "_query_osv") + @patch.object(VulnerabilityScanner, "_save_cache") + def test_scan_package_queries_when_not_cached(self, mock_save, mock_query, mock_cache): + """Test scan_package queries API when not cached""" + mock_cache.return_value = None + mock_query.return_value = [ + Vulnerability( + cve_id="CVE-NEW", + package_name="test-pkg", + installed_version="1.0", + affected_versions="all", + severity=Severity.HIGH, + description="New vulnerability", + ) + ] + + vulns = self.scanner.scan_package("test-pkg", "1.0") + + self.assertEqual(len(vulns), 1) + mock_query.assert_called_once() + mock_save.assert_called_once() + + +class TestVulnerabilityScannerRateLimiting(unittest.TestCase): + """Test rate limiting functionality""" + + def setUp(self): + self.temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") + self.temp_db.close() + self.scanner = VulnerabilityScanner(db_path=self.temp_db.name) + + def tearDown(self): + if os.path.exists(self.temp_db.name): + os.unlink(self.temp_db.name) + + def test_rate_limit_enforces_delay(self): + """Test rate limiting enforces minimum delay""" + import time + + self.scanner.min_api_interval = 0.1 # 100ms for testing + self.scanner.last_api_call = time.time() + + start = time.time() + self.scanner._rate_limit() + elapsed = time.time() - start + + # Should have waited at least some time + self.assertGreaterEqual(elapsed, 0.05) + + +class TestPackageNameValidation(unittest.TestCase): + """Test package name and version validation""" + + def setUp(self): + self.temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") + self.temp_db.close() + self.scanner = VulnerabilityScanner(db_path=self.temp_db.name) + + def tearDown(self): + if os.path.exists(self.temp_db.name): + os.unlink(self.temp_db.name) + + def test_valid_package_names(self): + """Test valid Debian package names are accepted""" + valid_names = [ + "nginx", + "python3", + "libc6", + "libssl1.1", + "g++", + "apt-utils", + "linux-image-5.15.0-generic", 
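+            # Hyphens, dots, digits and "+" are all legal in Debian package
+            # names; names must start with a lowercase alphanumeric character.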
+ "ca-certificates", + ] + for name in valid_names: + self.assertTrue( + self.scanner._is_valid_package_name(name), + f"Expected {name!r} to be valid", + ) + + def test_invalid_package_names(self): + """Test invalid package names are rejected""" + invalid_names = [ + "", # Empty + "a", # Too short + "A", # Uppercase not allowed at start + "Nginx", # Uppercase + "package name", # Space + "pkg;rm -rf /", # Shell injection + "pkg$(whoami)", # Command substitution + "pkg`id`", # Backtick injection + "../etc/passwd", # Path traversal + "pkg\x00null", # Null byte + "a" * 200, # Too long + ] + for name in invalid_names: + self.assertFalse( + self.scanner._is_valid_package_name(name), + f"Expected {name!r} to be invalid", + ) + + def test_valid_version_strings(self): + """Test valid version strings are accepted""" + valid_versions = [ + "1.0", + "1.2.3", + "1:2.3.4-5", + "1.0~beta1", + "2.0+dfsg-1", + "5.15.0-100-generic", + ] + for version in valid_versions: + self.assertTrue( + self.scanner._is_valid_version_string(version), + f"Expected version {version!r} to be valid", + ) + + def test_invalid_version_strings(self): + """Test invalid version strings are rejected""" + invalid_versions = [ + "", # Empty + "1.0; rm -rf /", # Shell injection + "$(whoami)", # Command substitution + "1.0\n2.0", # Newline + "a" * 300, # Too long + ] + for version in invalid_versions: + self.assertFalse( + self.scanner._is_valid_version_string(version), + f"Expected version {version!r} to be invalid", + ) + + @patch("subprocess.run") + def test_get_installed_packages_filters_invalid(self, mock_run): + """Test that invalid package names are filtered out""" + mock_run.return_value = MagicMock( + returncode=0, + stdout="valid-pkg|1.0.0\nInvalidPkg|2.0.0\n;malicious|3.0.0\ngood-pkg|4.0.0\n", + ) + + packages = self.scanner._get_installed_packages() + + # Should only include valid packages + self.assertIn("valid-pkg", packages) + self.assertIn("good-pkg", packages) + self.assertNotIn("InvalidPkg", packages) # Uppercase + self.assertNotIn(";malicious", packages) # Invalid characters + + +if __name__ == "__main__": + unittest.main()