diff --git a/cortex/__init__.py b/cortex/__init__.py index dcf98a77..0639dc6d 100644 --- a/cortex/__init__.py +++ b/cortex/__init__.py @@ -1,7 +1,22 @@ from .cli import main from .env_loader import load_env from .packages import PackageManager, PackageManagerType +from .unified_package_manager import ( + PackageFormat, + PackageInfo, + StorageAnalysis, + UnifiedPackageManager, +) __version__ = "0.1.0" -__all__ = ["main", "load_env", "PackageManager", "PackageManagerType"] +__all__ = [ + "main", + "load_env", + "PackageManager", + "PackageManagerType", + "UnifiedPackageManager", + "PackageFormat", + "PackageInfo", + "StorageAnalysis", +] diff --git a/cortex/cli.py b/cortex/cli.py index b1cfe4a1..ddbe6c9c 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -44,6 +44,7 @@ if TYPE_CHECKING: from cortex.shell_env_analyzer import ShellEnvironmentAnalyzer + from cortex.unified_package_manager import UnifiedPackageManager # Suppress noisy log messages in normal operation logging.getLogger("httpx").setLevel(logging.WARNING) @@ -2825,6 +2826,256 @@ def progress_callback(current: int, total: int, step: InstallationStep) -> None: console.print(f"Error: {result.error_message}", style="red") return 1 + # --- Unified Package Manager Commands (Issue #450) --- + def pkg(self, args: argparse.Namespace) -> int: + """Handle unified package manager commands for snap/flatpak/deb.""" + from cortex.unified_package_manager import ( + PackageFormat, + UnifiedPackageManager, + ) + + upm = UnifiedPackageManager() + action = getattr(args, "pkg_action", None) + + if not action: + cx_print("\n📦 Unified Package Manager - Snap/Flatpak/Deb\n", "info") + console.print("Usage: cortex pkg [options]") + console.print("\nCommands:") + console.print(" sources Show available sources for a package") + console.print(" compare Compare package across formats") + console.print(" list [--format FORMAT] List installed packages") + console.print(" permissions Show package permissions") + console.print(" storage Analyze storage by format") + console.print(" snap-redirects [--disable] Check/disable snap redirects") + return 0 + + if action == "sources": + return self._pkg_sources(upm, args) + elif action == "compare": + return self._pkg_compare(upm, args) + elif action == "list": + return self._pkg_list(upm, args) + elif action == "permissions": + return self._pkg_permissions(upm, args) + elif action == "storage": + return self._pkg_storage(upm) + elif action == "snap-redirects": + return self._pkg_snap_redirects(upm, args) + else: + self._print_error(f"Unknown pkg action: {action}") + return 1 + + def _pkg_sources(self, upm: "UnifiedPackageManager", args: argparse.Namespace) -> int: + """Show available sources for a package.""" + package = args.package + cx_print(f"\n🔍 Checking sources for '{package}'...\n", "info") + + sources = upm.detect_package_sources(package) + + found_any = False + for format_name, info in sources.items(): + if info is not None: + found_any = True + status = "[green]✓ Installed[/green]" if info.installed else "[dim]Available[/dim]" + console.print(f" [{format_name.upper()}] {status}") + console.print(f" Version: {info.version or 'N/A'}") + if info.description: + console.print(f" {info.description[:60]}...") + + if not found_any: + cx_print(f"No sources found for '{package}'", "warning") + + return 0 + + def _pkg_compare(self, upm, args: argparse.Namespace) -> int: + """Compare package across formats.""" + package = args.package + cx_print(f"\n📊 Comparing '{package}' across formats...\n", "info") + + comparison = upm.compare_package_options(package) + + if not comparison["available_formats"]: + cx_print(f"Package '{package}' not found in any format", "warning") + return 1 + + console.print(f"[bold]Package:[/bold] {comparison['package_name']}") + if comparison["installed_as"]: + console.print(f"[bold]Installed as:[/bold] {comparison['installed_as']}") + console.print(f"[bold]Available in:[/bold] {', '.join(comparison['available_formats'])}") + console.print() + + # Show comparison table + for fmt, data in comparison["comparison"].items(): + status = "[green]Installed[/green]" if data["installed"] else "[dim]Not installed[/dim]" + console.print(f"[cyan]{fmt.upper()}[/cyan]: {status}") + console.print(f" Version: {data['version'] or 'N/A'}") + if data["size"]: + size_mb = data["size"] / (1024 * 1024) + console.print(f" Size: {size_mb:.1f} MB") + + return 0 + + def _pkg_list(self, upm, args: argparse.Namespace) -> int: + """List installed packages by format.""" + from cortex.unified_package_manager import PackageFormat + + format_filter = None + if hasattr(args, "format") and args.format: + format_map = { + "deb": PackageFormat.DEB, + "snap": PackageFormat.SNAP, + "flatpak": PackageFormat.FLATPAK, + } + format_filter = format_map.get(args.format.lower()) + + cx_print("\n📦 Installed Packages\n", "info") + + packages = upm.list_installed_packages(format_filter) + + for fmt, pkgs in packages.items(): + if pkgs: + console.print(f"\n[cyan][{fmt.upper()}][/cyan] ({len(pkgs)} packages)") + for pkg in pkgs[:10]: # Show top 10 + console.print(f" • {pkg.name} ({pkg.version})") + if len(pkgs) > 10: + console.print(f" ... and {len(pkgs) - 10} more") + + return 0 + + def _pkg_permissions(self, upm, args: argparse.Namespace) -> int: + """Show package permissions.""" + package = args.package + fmt = getattr(args, "format", None) + + cx_print(f"\n🔐 Permissions for '{package}'\n", "info") + + # Determine format: explicit flag > detect from installed packages + if fmt is None: + # Check where package is actually installed + sources = upm.detect_package_sources(package) + if sources.get("flatpak") and sources["flatpak"].installed: + fmt = "flatpak" + elif sources.get("snap") and sources["snap"].installed: + fmt = "snap" + else: + fmt = "flatpak" if "." in package and package.count(".") >= 2 else "snap" + + if fmt == "flatpak": + try: + perms = upm.list_flatpak_permissions(package) + except RuntimeError as e: + self._print_error(str(e)) + return 1 + + for section, values in perms.items(): + console.print(f"[cyan][{section}][/cyan]") + if isinstance(values, dict): + for k, v in values.items(): + console.print(f" {k} = {v}") + elif isinstance(values, list): + for v in values: + console.print(f" {v}") + else: + # Snap + try: + perms = upm.list_snap_permissions(package) + except RuntimeError as e: + self._print_error(str(e)) + return 1 + + if perms.get("connected"): + console.print("[green]Connected Interfaces:[/green]") + for conn in perms["connected"]: + console.print(f" • {conn['interface']}: {conn['plug']} → {conn['slot']}") + + if perms.get("available"): + console.print("\n[dim]Available (not connected):[/dim]") + for avail in perms["available"]: + console.print(f" • {avail['interface']}") + + return 0 + + def _pkg_storage(self, upm) -> int: + """Analyze storage by package format.""" + cx_print("\n💾 Analyzing storage...\n", "info") + + analysis = upm.analyze_storage() + output = upm.format_storage_analysis(analysis) + console.print(output) + + return 0 + + def _pkg_snap_redirects(self, upm, args: argparse.Namespace) -> int: + """Check or disable snap redirects.""" + disable = getattr(args, "disable", False) + + if disable: + # Show warning and get confirmation + console.print("\n[yellow]⚠️ This operation will modify system configuration:[/yellow]") + console.print(" File: /etc/apt/apt.conf.d/20snapd.conf") + console.print(" Action: Move to .disabled (backup created)") + console.print("\n[dim]This requires elevated privileges (sudo).[/dim]\n") + + try: + confirm = input("Proceed with disabling snap redirects? [y/N]: ").strip().lower() + if confirm not in ("y", "yes"): + cx_print("Operation cancelled", "info") + return 0 + except (EOFError, KeyboardInterrupt): + console.print() + cx_print("Operation cancelled", "info") + return 0 + + cx_print("\n⚠️ Disabling snap redirects...\n", "warning") + + # Record to installation history + from datetime import datetime + + history = InstallationHistory() + start_time = datetime.now() + + install_id = None + try: + install_id = history.record_installation( + operation_type=InstallationType.REMOVE, + packages=["snap-redirects"], + commands=["disable_snap_redirects"], + start_time=start_time, + ) + + success, message = upm.disable_snap_redirects() + + if success: + history.update_installation(install_id, InstallationStatus.SUCCESS) + cx_print(message, "success") + else: + history.update_installation(install_id, InstallationStatus.FAILED, message) + self._print_error(message) + + return 0 if success else 1 + + except Exception as e: + if install_id is not None: + history.update_installation(install_id, InstallationStatus.FAILED, str(e)) + self._print_error(f"Failed to disable snap redirects: {e}") + return 1 + + cx_print("\n🔍 Checking for snap redirects...\n", "info") + redirects = upm.check_snap_redirects() + + if not redirects: + cx_print("No snap redirects detected", "success") + return 0 + + console.print(f"Found {len(redirects)} potential snap redirects:\n") + for redirect in redirects: + console.print(f" [yellow]•[/yellow] {redirect['package']}") + console.print(f" Type: {redirect['type']}") + console.print(f" {redirect['reason']}") + + console.print("\n[dim]To disable: cortex pkg snap-redirects --disable[/dim]") + return 0 + # -------------------------- @@ -3214,6 +3465,49 @@ def main(): sandbox_exec_parser.add_argument("cmd", nargs="+", help="Command to execute") # -------------------------- + # --- Unified Package Manager Commands (Issue #450) --- + pkg_parser = subparsers.add_parser("pkg", help="Unified package manager (snap/flatpak/deb)") + pkg_subs = pkg_parser.add_subparsers(dest="pkg_action", help="Package manager actions") + + # pkg sources + pkg_sources_parser = pkg_subs.add_parser("sources", help="Show available sources for a package") + pkg_sources_parser.add_argument("package", help="Package name to search for") + + # pkg compare + pkg_compare_parser = pkg_subs.add_parser("compare", help="Compare package across formats") + pkg_compare_parser.add_argument("package", help="Package name to compare") + + # pkg list [--format FORMAT] + pkg_list_parser = pkg_subs.add_parser("list", help="List installed packages") + pkg_list_parser.add_argument( + "--format", + choices=["deb", "snap", "flatpak"], + help="Filter by package format", + ) + + # pkg permissions [--format FORMAT] + pkg_permissions_parser = pkg_subs.add_parser("permissions", help="Show package permissions") + pkg_permissions_parser.add_argument("package", help="Package name or app ID") + pkg_permissions_parser.add_argument( + "--format", + choices=["snap", "flatpak"], + help="Package format (auto-detected if not specified)", + ) + + # pkg storage + pkg_subs.add_parser("storage", help="Analyze storage usage by package format") + + # pkg snap-redirects [--disable] + pkg_redirects_parser = pkg_subs.add_parser( + "snap-redirects", help="Check or disable snap redirects" + ) + pkg_redirects_parser.add_argument( + "--disable", + action="store_true", + help="Disable snap redirects (requires sudo)", + ) + # -------------------------- + # --- Environment Variable Management Commands --- env_parser = subparsers.add_parser("env", help="Manage environment variables") env_subs = env_parser.add_subparsers(dest="env_action", help="Environment actions") @@ -3623,11 +3917,14 @@ def main(): return 1 elif args.command == "env": return cli.env(args) + elif args.command == "pkg": + return cli.pkg(args) elif args.command == "upgrade": from cortex.licensing import open_upgrade_page open_upgrade_page() return 0 + elif args.command == "license": from cortex.licensing import show_license_status diff --git a/cortex/unified_package_manager.py b/cortex/unified_package_manager.py new file mode 100644 index 00000000..860da099 --- /dev/null +++ b/cortex/unified_package_manager.py @@ -0,0 +1,865 @@ +#!/usr/bin/env python3 +""" +Unified Package Manager for Cortex Linux + +Provides a unified interface for managing packages across multiple formats: +- APT/DEB (traditional Debian packages) +- Snap (Canonical's universal packages) +- Flatpak (cross-distribution application packages) + +""" + +import logging +import os +import re +import shutil +import subprocess +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + + +class PackageFormat(Enum): + """Supported package formats.""" + + DEB = "deb" + SNAP = "snap" + FLATPAK = "flatpak" + + +@dataclass +class PackageInfo: + """Information about a package in a specific format.""" + + name: str + format: PackageFormat + version: str = "" + size: int = 0 # Size in bytes + installed: bool = False + description: str = "" + permissions: list[str] = field(default_factory=list) + + +@dataclass +class StorageAnalysis: + """Storage usage analysis by package format.""" + + deb_total: int = 0 + snap_total: int = 0 + flatpak_total: int = 0 + deb_packages: list[tuple[str, int]] = field(default_factory=list) + snap_packages: list[tuple[str, int]] = field(default_factory=list) + flatpak_packages: list[tuple[str, int]] = field(default_factory=list) + + +class UnifiedPackageManager: + """ + Unified manager for APT, Snap, and Flatpak packages. + + Provides transparency about package sources and enables users to: + - See true package source (deb vs snap vs flatpak) + - Compare package options across formats + - Manage permissions for sandboxed packages + - Detect and optionally disable snap redirects + - Analyze storage usage by format + """ + + # Known transitional packages that redirect to snap + KNOWN_SNAP_REDIRECTS = { + "firefox", + "chromium-browser", + "thunderbird", + "libreoffice", + "gnome-calculator", + "gnome-characters", + "gnome-logs", + "gnome-system-monitor", + } + + SNAP_REDIRECT_CONFIG = "/etc/apt/apt.conf.d/20snapd.conf" + + def __init__(self) -> None: + """Initialize the unified package manager.""" + self._snap_available = self._check_command_available("snap") + self._flatpak_available = self._check_command_available("flatpak") + self._apt_available = self._check_command_available("apt") + + def _check_command_available(self, command: str) -> bool: + """Check if a command is available on the system.""" + return shutil.which(command) is not None + + def _run_command(self, cmd: list[str], timeout: int = 30) -> tuple[bool, str, str]: + """ + Run a shell command and return success status, stdout, stderr. + + Args: + cmd: Command to run as list of arguments + timeout: Timeout in seconds + + Returns: + Tuple of (success, stdout, stderr) + """ + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout, + ) + return result.returncode == 0, result.stdout, result.stderr + except subprocess.TimeoutExpired: + logger.warning(f"Command timed out: {' '.join(cmd)}") + return False, "", "Command timed out" + except FileNotFoundError: + logger.warning(f"Command not found: {cmd[0]}") + return False, "", f"Command not found: {cmd[0]}" + except Exception as e: + logger.error(f"Error running command {cmd}: {e}") + return False, "", str(e) + + # ========================================================================= + # Package Source Detection + # ========================================================================= + + def detect_package_sources(self, package_name: str) -> dict[str, PackageInfo | None]: + """ + Detect all available sources for a package. + + Args: + package_name: Name of the package to search for + + Returns: + Dictionary with keys 'deb', 'snap', 'flatpak' containing PackageInfo or None + """ + return { + "deb": self._check_deb_package(package_name), + "snap": self._check_snap_package(package_name), + "flatpak": self._check_flatpak_package(package_name), + } + + def _check_deb_package(self, package_name: str) -> PackageInfo | None: + """Check if package is available as .deb.""" + if not self._apt_available: + return None + + # First check if installed + success, stdout, _ = self._run_command( + ["dpkg-query", "-W", "-f=${Status}|${Version}|${Installed-Size}", package_name] + ) + + if success and "install ok installed" in stdout: + parts = stdout.strip().split("|", 2) + version = parts[1] if len(parts) > 1 else "" + size = int(parts[2]) * 1024 if len(parts) > 2 and parts[2].isdigit() else 0 + return PackageInfo( + name=package_name, + format=PackageFormat.DEB, + version=version, + size=size, + installed=True, + ) + + # Check if available in apt cache + success, stdout, _ = self._run_command(["apt-cache", "show", package_name]) + if success and stdout: + # Check if this is a transitional/dummy package + if "dummy" in stdout.lower() or "transitional" in stdout.lower(): + # This is likely a snap redirect + return None + + version = "" + size = 0 + description = "" + for line in stdout.split("\n"): + if line.startswith("Version:"): + version = line.split(":", 1)[1].strip() + elif line.startswith("Installed-Size:"): + size_str = line.split(":", 1)[1].strip() + try: + size = int(size_str) * 1024 + except ValueError: + size = 0 + elif line.startswith("Description:"): + description = line.split(":", 1)[1].strip() + + return PackageInfo( + name=package_name, + format=PackageFormat.DEB, + version=version, + size=size, + installed=False, + description=description, + ) + + return None + + def _check_snap_package(self, package_name: str) -> PackageInfo | None: + """Check if package is available as snap.""" + if not self._snap_available: + return None + + # Check if installed + success, stdout, _ = self._run_command(["snap", "list", package_name]) + if success: + lines = stdout.strip().split("\n") + for line in lines[1:]: + if not line: + continue + parts = line.split() + if parts and parts[0] == package_name: + version = parts[1] if len(parts) > 1 else "" + return PackageInfo( + name=package_name, + format=PackageFormat.SNAP, + version=version, + installed=True, + ) + + # Check if available in snap store + success, stdout, _ = self._run_command(["snap", "info", package_name]) + if success and stdout: + version = "" + description = "" + for line in stdout.split("\n"): + if line.startswith("stable:"): + version = line.split()[1] if len(line.split()) > 1 else "" + elif line.startswith("summary:"): + description = line.split(":", 1)[1].strip() + + return PackageInfo( + name=package_name, + format=PackageFormat.SNAP, + version=version, + installed=False, + description=description, + ) + + return None + + def _check_flatpak_package(self, package_name: str) -> PackageInfo | None: + """ + Check if package is available as flatpak. + + Note: Returns only the first match if multiple flatpaks match the search term. + For full search results, use `flatpak search ` directly. + """ + if not self._flatpak_available: + return None + + # Check if installed (match by full app ID or app basename) + success, stdout, _ = self._run_command( + ["flatpak", "list", "--app", "--columns=application,version"] + ) + if success: + for line in stdout.strip().split("\n"): + if not line: + continue + parts = line.split("\t") + app_id = parts[0].strip() if parts else "" + if not app_id: + continue + # Match exact app ID or exact basename (e.g., "firefox" matches "org.mozilla.firefox") + app_basename = app_id.split(".")[-1].lower() + if app_id.lower() == package_name.lower() or app_basename == package_name.lower(): + version = parts[1].strip() if len(parts) > 1 else "" + return PackageInfo( + name=app_id, + format=PackageFormat.FLATPAK, + version=version, + installed=True, + ) + + # Search in remote repos + success, stdout, _ = self._run_command( + ["flatpak", "search", package_name, "--columns=application,version,description"] + ) + if success and stdout: + lines = stdout.strip().split("\n") + for line in lines: + # Skip empty lines and "No matches" messages + if not line or "no matches" in line.lower() or "no results" in line.lower(): + continue + parts = line.split("\t") + # Validate: app_id should look like org.something.App + app_id = parts[0] if parts else "" + if app_id and "." in app_id: + version = parts[1] if len(parts) > 1 else "" + description = parts[2] if len(parts) > 2 else "" + return PackageInfo( + name=app_id, + format=PackageFormat.FLATPAK, + version=version, + installed=False, + description=description, + ) + + return None + + # ========================================================================= + # Installed Package Listing + # ========================================================================= + + def list_installed_packages( + self, format_filter: PackageFormat | None = None + ) -> dict[str, list[PackageInfo]]: + """ + List all installed packages, optionally filtered by format. + + Args: + format_filter: Optional filter to show only packages of specific format + + Returns: + Dictionary with keys 'deb', 'snap', 'flatpak' containing lists of PackageInfo + """ + result: dict[str, list[PackageInfo]] = {"deb": [], "snap": [], "flatpak": []} + + if format_filter is None or format_filter == PackageFormat.DEB: + result["deb"] = self._list_deb_packages() + + if format_filter is None or format_filter == PackageFormat.SNAP: + result["snap"] = self._list_snap_packages() + + if format_filter is None or format_filter == PackageFormat.FLATPAK: + result["flatpak"] = self._list_flatpak_packages() + + return result + + def _list_deb_packages(self) -> list[PackageInfo]: + """List installed .deb packages.""" + if not self._apt_available: + return [] + + success, stdout, _ = self._run_command( + ["dpkg-query", "-W", "-f=${Package}|${Version}|${Installed-Size}\n"] + ) + + packages = [] + if success: + for line in stdout.strip().split("\n"): + if not line: + continue + parts = line.split("|") + name = parts[0] + version = parts[1] if len(parts) > 1 else "" + size = int(parts[2]) * 1024 if len(parts) > 2 and parts[2].isdigit() else 0 + packages.append( + PackageInfo( + name=name, + format=PackageFormat.DEB, + version=version, + size=size, + installed=True, + ) + ) + + return packages + + def _list_snap_packages(self) -> list[PackageInfo]: + """List installed snap packages.""" + if not self._snap_available: + return [] + + success, stdout, _ = self._run_command(["snap", "list"]) + + packages = [] + if success: + lines = stdout.strip().split("\n") + for line in lines[1:]: # Skip header + if not line: + continue + parts = line.split() + if len(parts) >= 2: + name = parts[0] + version = parts[1] + # Parse size from notes column if available + packages.append( + PackageInfo( + name=name, + format=PackageFormat.SNAP, + version=version, + installed=True, + ) + ) + + return packages + + def _list_flatpak_packages(self) -> list[PackageInfo]: + """List installed flatpak packages.""" + if not self._flatpak_available: + return [] + + success, stdout, _ = self._run_command( + ["flatpak", "list", "--app", "--columns=application,version,size"] + ) + + packages = [] + if success: + for line in stdout.strip().split("\n"): + if not line: + continue + parts = line.split("\t") + app_id = parts[0] if parts else "" + version = parts[1] if len(parts) > 1 else "" + size_str = parts[2] if len(parts) > 2 else "0" + # Parse size (e.g., "1.2 GB" -> bytes) + size = self._parse_size_string(size_str) + packages.append( + PackageInfo( + name=app_id, + format=PackageFormat.FLATPAK, + version=version, + size=size, + installed=True, + ) + ) + + return packages + + def _parse_size_string(self, size_str: str) -> int: + """Parse a human-readable size string to bytes.""" + size_str = size_str.strip() + if not size_str: + return 0 + + multipliers = { + "B": 1, + "KB": 1024, + "MB": 1024**2, + "GB": 1024**3, + "kB": 1024, + "mB": 1024**2, + "gB": 1024**3, + } + + match = re.match(r"([\d.]+)\s*([A-Za-z]+)", size_str) + if match: + value = float(match.group(1)) + unit = match.group(2) + return int(value * multipliers.get(unit, 1)) + + return 0 + + # ========================================================================= + # Package Comparison + # ========================================================================= + + def compare_package_options(self, package_name: str) -> dict[str, Any]: + """ + Compare a package across all available formats. + + Args: + package_name: Name of the package to compare + + Returns: + Dictionary with comparison data including versions, sizes, permissions + """ + sources = self.detect_package_sources(package_name) + + comparison = { + "package_name": package_name, + "available_formats": [], + "installed_as": None, + "comparison": {}, + } + + for format_name, info in sources.items(): + if info is not None: + comparison["available_formats"].append(format_name) + comparison["comparison"][format_name] = { + "version": info.version, + "size": info.size, + "installed": info.installed, + "description": info.description, + } + if info.installed: + comparison["installed_as"] = format_name + + return comparison + + # ========================================================================= + # Permission Management + # ========================================================================= + + def list_snap_permissions(self, snap_name: str) -> dict[str, list[dict[str, str]]]: + """ + List permissions/interfaces for a snap package. + + Args: + snap_name: Name of the snap package + + Returns: + Dictionary with 'connected' and 'available' interface lists + """ + if not self._snap_available: + raise RuntimeError("Snap is not available on this system") + + success, stdout, _ = self._run_command(["snap", "connections", snap_name]) + + result: dict[str, list[dict[str, str]]] = {"connected": [], "available": []} + + if success: + lines = stdout.strip().split("\n") + for line in lines[1:]: # Skip header + if not line: + continue + parts = line.split() + if len(parts) >= 4: + interface = parts[0] + plug = parts[1] + slot = parts[2] + notes = parts[3] if len(parts) > 3 else "" + + entry = { + "interface": interface, + "plug": plug, + "slot": slot, + "notes": notes, + } + + if slot != "-": + result["connected"].append(entry) + else: + result["available"].append(entry) + + return result + + def list_flatpak_permissions(self, app_id: str) -> dict[str, Any]: + """ + List permissions for a flatpak application. + + Args: + app_id: Flatpak application ID (e.g., org.mozilla.firefox) + + Returns: + Dictionary with permission categories and values + + Raises: + RuntimeError: If flatpak is not available or permissions cannot be retrieved + """ + if not self._flatpak_available: + raise RuntimeError("Flatpak is not available on this system") + + success, stdout, _ = self._run_command(["flatpak", "info", "--show-permissions", app_id]) + + if not success: + raise RuntimeError(f"Could not get permissions for {app_id}") + + permissions: dict[str, Any] = {} + current_section = "" + + # Parse permission sections - each section is either all key=value pairs (dict) + # or all standalone values (list). Mixed sections are not supported as flatpak + # permissions don't use mixed formats within a single section. + for line in stdout.strip().split("\n"): + if not line: + continue + + if line.startswith("[") and line.endswith("]"): + current_section = line[1:-1] + elif "=" in line and current_section: + if current_section not in permissions: + permissions[current_section] = {} + + if isinstance(permissions[current_section], dict): + key, value = line.split("=", 1) + permissions[current_section][key] = value + elif current_section: + if current_section not in permissions: + permissions[current_section] = [] + + if isinstance(permissions[current_section], list): + permissions[current_section].append(line) + + return permissions + + def modify_snap_permission( + self, snap_name: str, interface: str, action: str, slot: str | None = None + ) -> tuple[bool, str]: + """ + Modify a snap permission (connect/disconnect an interface). + + Note: + This operation may require elevated privileges (sudo) for system-level + interface modifications. The caller (CLI layer) should request user + confirmation before invoking this method with privilege escalation. + + Args: + snap_name: Name of the snap package + interface: Interface/plug to modify + action: 'connect' or 'disconnect' + slot: Optional slot specification. If not provided, connects to the + system/core slot with matching interface name (e.g., :audio-playback). + For connecting to slots in other snaps, use format "snap-name:slot-name". + + Returns: + Tuple of (success, message). On permission errors, success=False with + an error message indicating privilege requirements. + + Example: + # Connect to system slot (most common case) + modify_snap_permission("firefox", "camera", "connect") + + # Connect to specific snap's slot + modify_snap_permission("myapp", "content", "connect", "other-snap:data") + """ + if action not in ("connect", "disconnect"): + return False, f"Invalid action: {action}. Use 'connect' or 'disconnect'" + + if not self._snap_available: + return False, "Snap is not available on this system" + + # Build command with optional slot specification + plug_spec = f"{snap_name}:{interface}" + if slot: + cmd = ["snap", action, plug_spec, slot] + else: + cmd = ["snap", action, plug_spec] + + success, stdout, stderr = self._run_command(cmd) + + if success: + return True, f"Successfully {action}ed {interface} for {snap_name}" + else: + return False, f"Failed to {action} {interface}: {stderr}" + + def modify_flatpak_permission( + self, app_id: str, permission: str, value: str + ) -> tuple[bool, str]: + """ + Modify a flatpak permission using `flatpak override`. + + Args: + app_id: Flatpak application ID + permission: Permission name without leading dashes (e.g., 'filesystem') + value: Value/argument for the permission (e.g., 'home' to produce + '--filesystem=home') + + Returns: + Tuple of (success, message) + """ + if not self._flatpak_available: + return False, "Flatpak is not available on this system" + + # Construct the override command + cmd = ["flatpak", "override", "--user", f"--{permission}={value}", app_id] + success, stdout, stderr = self._run_command(cmd) + + if success: + return True, f"Successfully modified {permission} for {app_id}" + else: + return False, f"Failed to modify permission: {stderr}" + + # ========================================================================= + # Snap Redirect Detection & Management + # ========================================================================= + + def check_snap_redirects(self) -> list[dict[str, str]]: + """ + Detect packages that redirect apt install to snap. + + Returns: + List of dictionaries with 'package' and 'reason' keys + """ + redirects = [] + + # Check for known transitional packages + for pkg in self.KNOWN_SNAP_REDIRECTS: + success, stdout, _ = self._run_command(["apt-cache", "show", pkg]) + if success: + if "dummy" in stdout.lower() or "transitional" in stdout.lower(): + redirects.append( + { + "package": pkg, + "type": "transitional", + "reason": "APT package is a dummy that installs snap version", + } + ) + elif "snap" in stdout.lower(): + redirects.append( + { + "package": pkg, + "type": "snap_meta", + "reason": "Package description mentions snap installation", + } + ) + + # Check snap preference config file + if os.path.exists(self.SNAP_REDIRECT_CONFIG): + redirects.append( + { + "package": "system", + "type": "config", + "reason": f"Snap preference config exists: {self.SNAP_REDIRECT_CONFIG}", + } + ) + + return redirects + + def disable_snap_redirects(self, backup: bool = True) -> tuple[bool, str]: + """ + Disable snap redirects by moving the APT config file. + + WARNING: This modifies system configuration and requires root. + + Args: + backup: Whether to create a backup of the config file + + Returns: + Tuple of (success, message) + """ + config_path = Path(self.SNAP_REDIRECT_CONFIG) + + if not config_path.exists(): + return True, "Snap redirect config not found - redirects may already be disabled" + + # Check if we have write permission + if not os.access(config_path.parent, os.W_OK): + return False, "Permission denied. Run with sudo to disable snap redirects." + + try: + if backup: + backup_path = config_path.with_suffix(".conf.disabled") + if backup_path.exists(): + return ( + True, + f"Snap redirects already disabled. Existing backup preserved at: {backup_path}", + ) + shutil.move(str(config_path), str(backup_path)) + return True, f"Snap redirects disabled. Backup saved to: {backup_path}" + else: + config_path.unlink() + return True, "Snap redirects disabled. Config file removed." + except Exception as e: + return False, f"Failed to disable snap redirects: {e}" + + def restore_snap_redirects(self) -> tuple[bool, str]: + """ + Restore snap redirects from backup. + + Returns: + Tuple of (success, message) + """ + backup_path = Path(self.SNAP_REDIRECT_CONFIG + ".disabled") + config_path = Path(self.SNAP_REDIRECT_CONFIG) + + if not backup_path.exists(): + return False, "No backup found to restore" + + if config_path.exists(): + return False, "Snap redirect config already exists" + + try: + shutil.move(str(backup_path), str(config_path)) + return True, "Snap redirects restored from backup" + except Exception as e: + return False, f"Failed to restore snap redirects: {e}" + + # ========================================================================= + # Storage Analysis + # ========================================================================= + + def analyze_storage(self) -> StorageAnalysis: + """ + Analyze disk usage by package format. + + Returns: + StorageAnalysis with totals and top packages per format + """ + analysis = StorageAnalysis() + + # Analyze DEB packages + deb_packages = self._list_deb_packages() + for pkg in deb_packages: + analysis.deb_total += pkg.size + analysis.deb_packages.append((pkg.name, pkg.size)) + + # Sort and keep top 10 + analysis.deb_packages.sort(key=lambda x: x[1], reverse=True) + analysis.deb_packages = analysis.deb_packages[:10] + + # Analyze Snap packages + if self._snap_available: + success, stdout, _ = self._run_command(["snap", "list", "--all"]) + if success: + # Get snap directory size + snap_dir = Path("/var/lib/snapd/snaps") + if snap_dir.exists(): + for snap_file in snap_dir.glob("*.snap"): + size = snap_file.stat().st_size + name = snap_file.stem.rsplit("_", 1)[0] + analysis.snap_total += size + analysis.snap_packages.append((name, size)) + + analysis.snap_packages.sort(key=lambda x: x[1], reverse=True) + # Deduplicate (keep largest per name) and limit to top 10 + seen = set() + deduped = [] + for name, size in analysis.snap_packages: + if name not in seen: + seen.add(name) + deduped.append((name, size)) + analysis.snap_packages = deduped[:10] + + # Analyze Flatpak packages + flatpak_packages = self._list_flatpak_packages() + for pkg in flatpak_packages: + analysis.flatpak_total += pkg.size + analysis.flatpak_packages.append((pkg.name, pkg.size)) + + analysis.flatpak_packages.sort(key=lambda x: x[1], reverse=True) + analysis.flatpak_packages = analysis.flatpak_packages[:10] + + return analysis + + def format_storage_analysis(self, analysis: StorageAnalysis) -> str: + """ + Format storage analysis for display. + + Args: + analysis: StorageAnalysis object + + Returns: + Formatted string for display + """ + lines = [] + lines.append("=" * 60) + lines.append("Storage Analysis by Package Format") + lines.append("=" * 60) + + def format_size(size: int) -> str: + if size >= 1024**3: + return f"{size / 1024**3:.2f} GB" + elif size >= 1024**2: + return f"{size / 1024**2:.2f} MB" + elif size >= 1024: + return f"{size / 1024:.2f} KB" + return f"{size} B" + + # Summary + total = analysis.deb_total + analysis.snap_total + analysis.flatpak_total + lines.append(f"\nTotal Package Storage: {format_size(total)}") + lines.append("-" * 40) + lines.append(f" DEB/APT: {format_size(analysis.deb_total)}") + lines.append(f" Snap: {format_size(analysis.snap_total)}") + lines.append(f" Flatpak: {format_size(analysis.flatpak_total)}") + + # Top packages per format + if analysis.deb_packages: + lines.append("\nTop DEB Packages:") + for name, size in analysis.deb_packages[:5]: + lines.append(f" {name}: {format_size(size)}") + + if analysis.snap_packages: + lines.append("\nTop Snap Packages:") + for name, size in analysis.snap_packages[:5]: + lines.append(f" {name}: {format_size(size)}") + + if analysis.flatpak_packages: + lines.append("\nTop Flatpak Packages:") + for name, size in analysis.flatpak_packages[:5]: + lines.append(f" {name}: {format_size(size)}") + + return "\n".join(lines) diff --git a/docs/UNIFIED_PACKAGE_MANAGER.md b/docs/UNIFIED_PACKAGE_MANAGER.md new file mode 100644 index 00000000..9552c20d --- /dev/null +++ b/docs/UNIFIED_PACKAGE_MANAGER.md @@ -0,0 +1,116 @@ +# Unified Package Manager + +> Addresses Issue [#450](https://github.com/cortexlinux/cortex/issues/450): Snap/Flatpak Unified Manager + +## Overview + +The Unified Package Manager provides transparency and control over package sources on Ubuntu/Debian systems. It helps users understand whether packages are installed as .deb, snap, or flatpak, and provides tools to manage permissions and disable "forced" snap installations. + +## Features + +1. **Package Source Detection** - See where a package is available (deb/snap/flatpak) +2. **Package Comparison** - Compare versions and sizes across formats +3. **Installed Package Listing** - List packages by format +4. **Permission Management** - View and modify snap/flatpak permissions (like Flatseal) +5. **Snap Redirect Detection** - Find packages that redirect apt→snap +6. **Storage Analysis** - See disk usage breakdown by format + +## Usage + +### Check Package Sources + +```bash +# See where a package is available +cortex pkg sources firefox + +# Output shows availability in each format with version info +``` + +### Compare Packages + +```bash +# Compare package across all formats +cortex pkg compare firefox + +# Shows versions, sizes, and installation status for each format +``` + +### List Installed Packages + +```bash +# List all packages by format +cortex pkg list + +# Filter by format +cortex pkg list --format snap +cortex pkg list --format flatpak +cortex pkg list --format deb +``` + +### View Permissions + +```bash +# View snap permissions (interfaces) +cortex pkg permissions firefox + +# View flatpak permissions +cortex pkg permissions org.mozilla.firefox --format flatpak +``` + +### Analyze Storage + +```bash +# See storage breakdown by package format +cortex pkg storage + +# Shows total size and top packages for each format +``` + +### Manage Snap Redirects + +```bash +# Check for snap redirects (apt packages that secretly install snaps) +cortex pkg snap-redirects + +# Disable snap redirects (requires sudo) +sudo cortex pkg snap-redirects --disable +``` + +## Technical Details + +### Module: `cortex/unified_package_manager.py` + +Core classes: + +- `UnifiedPackageManager` - Main manager class +- `PackageFormat` - Enum for deb/snap/flatpak +- `PackageInfo` - Package metadata dataclass +- `StorageAnalysis` - Storage breakdown dataclass + +### CLI Commands + +All commands are under the `cortex pkg` namespace: + +| Command | Description | +| --------------------------- | ----------------------- | +| `pkg sources ` | Show available sources | +| `pkg compare ` | Compare across formats | +| `pkg list [--format]` | List installed packages | +| `pkg permissions ` | View permissions | +| `pkg storage` | Storage analysis | +| `pkg snap-redirects` | Check/disable redirects | + +## Safety Notes + +> [!WARNING] +> The `--disable` option for snap-redirects modifies system configuration at `/etc/apt/apt.conf.d/20snapd.conf`. A backup is created automatically, and the change can be reverted by restoring the backup. + +## Testing + +Run unit tests: + +```bash +python -m pytest tests/test_unified_package_manager.py -v +``` + +The test suite includes 30+ test cases covering all functionality with mocked subprocess calls. diff --git a/tests/test_unified_package_manager.py b/tests/test_unified_package_manager.py new file mode 100644 index 00000000..aada8496 --- /dev/null +++ b/tests/test_unified_package_manager.py @@ -0,0 +1,606 @@ +#!/usr/bin/env python3 +""" +Unit tests for the Unified Package Manager. + +Tests cover all major functionality including: +- Package source detection (deb, snap, flatpak) +- Installed package listing +- Package comparison +- Permission management +- Snap redirect detection +- Storage analysis +""" + +import os +import sys +import unittest +from unittest.mock import patch + +# Add project root to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from cortex.unified_package_manager import ( + PackageFormat, + PackageInfo, + StorageAnalysis, + UnifiedPackageManager, +) + + +class TestUnifiedPackageManager(unittest.TestCase): + """Test cases for UnifiedPackageManager class.""" + + def setUp(self): + """Set up test fixtures.""" + # Mock command availability checks + with patch.object(UnifiedPackageManager, "_check_command_available") as mock_check: + mock_check.return_value = True + self.upm = UnifiedPackageManager() + + # ========================================================================= + # Package Source Detection Tests + # ========================================================================= + + @patch.object(UnifiedPackageManager, "_run_command") + def test_detect_deb_package_installed(self, mock_run): + """Test detection of installed deb package.""" + mock_run.return_value = (True, "install ok installed|1.0.0|1024", "") + + result = self.upm._check_deb_package("test-package") + + self.assertIsNotNone(result) + self.assertEqual(result.name, "test-package") + self.assertEqual(result.format, PackageFormat.DEB) + self.assertEqual(result.version, "1.0.0") + self.assertTrue(result.installed) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_detect_deb_package_available(self, mock_run): + """Test detection of available but not installed deb package.""" + # First call returns not installed, second returns apt-cache info + mock_run.side_effect = [ + (False, "", ""), + (True, "Package: test-package\nVersion: 2.0.0\nDescription: A test package", ""), + ] + + result = self.upm._check_deb_package("test-package") + + self.assertIsNotNone(result) + self.assertEqual(result.version, "2.0.0") + self.assertFalse(result.installed) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_detect_deb_transitional_package(self, mock_run): + """Test detection of transitional/dummy packages returns None.""" + mock_run.side_effect = [ + (False, "", ""), + (True, "Package: firefox\nDescription: dummy package for snap", ""), + ] + + result = self.upm._check_deb_package("firefox") + + self.assertIsNone(result) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_detect_snap_package_installed(self, mock_run): + """Test detection of installed snap package.""" + mock_run.return_value = ( + True, + "Name Version Rev Tracking Publisher Notes\nfirefox 120.0 1234 latest mozilla -", + "", + ) + + result = self.upm._check_snap_package("firefox") + + self.assertIsNotNone(result) + self.assertEqual(result.name, "firefox") + self.assertEqual(result.format, PackageFormat.SNAP) + self.assertTrue(result.installed) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_detect_snap_package_available(self, mock_run): + """Test detection of available snap package.""" + mock_run.side_effect = [ + (False, "", ""), + (True, "name: test-snap\nsummary: A test snap\nstable: 1.0.0 2024-01-01", ""), + ] + + result = self.upm._check_snap_package("test-snap") + + self.assertIsNotNone(result) + self.assertEqual(result.format, PackageFormat.SNAP) + self.assertFalse(result.installed) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_detect_flatpak_package_installed(self, mock_run): + """Test detection of installed flatpak package.""" + mock_run.return_value = ( + True, + "org.mozilla.firefox\t120.0", + "", + ) + + result = self.upm._check_flatpak_package("firefox") + + self.assertIsNotNone(result) + self.assertEqual(result.name, "org.mozilla.firefox") + self.assertEqual(result.format, PackageFormat.FLATPAK) + self.assertTrue(result.installed) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_detect_flatpak_package_available(self, mock_run): + """Test detection of available flatpak package.""" + mock_run.side_effect = [ + (True, "", ""), # Not in installed list + (True, "org.test.App\t1.0.0\tA test application", ""), + ] + + result = self.upm._check_flatpak_package("test") + + self.assertIsNotNone(result) + self.assertEqual(result.name, "org.test.App") + + @patch.object(UnifiedPackageManager, "_check_deb_package") + @patch.object(UnifiedPackageManager, "_check_snap_package") + @patch.object(UnifiedPackageManager, "_check_flatpak_package") + def test_detect_package_sources(self, mock_flatpak, mock_snap, mock_deb): + """Test combined package source detection.""" + mock_deb.return_value = PackageInfo("test", PackageFormat.DEB, "1.0") + mock_snap.return_value = PackageInfo("test", PackageFormat.SNAP, "2.0") + mock_flatpak.return_value = None + + result = self.upm.detect_package_sources("test") + + self.assertIn("deb", result) + self.assertIn("snap", result) + self.assertIn("flatpak", result) + self.assertIsNotNone(result["deb"]) + self.assertIsNotNone(result["snap"]) + self.assertIsNone(result["flatpak"]) + + # ========================================================================= + # Package Listing Tests + # ========================================================================= + + @patch.object(UnifiedPackageManager, "_run_command") + def test_list_deb_packages(self, mock_run): + """Test listing installed deb packages.""" + mock_run.return_value = ( + True, + "package1|1.0.0|1024\npackage2|2.0.0|2048\n", + "", + ) + + result = self.upm._list_deb_packages() + + self.assertEqual(len(result), 2) + self.assertEqual(result[0].name, "package1") + self.assertEqual(result[1].name, "package2") + self.assertEqual(result[0].format, PackageFormat.DEB) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_list_snap_packages(self, mock_run): + """Test listing installed snap packages.""" + mock_run.return_value = ( + True, + "Name Version Rev Tracking Publisher Notes\ncore 16-2.58 14447 latest/stable canonical✓ -\nfirefox 120.0 1234 latest mozilla -", + "", + ) + + result = self.upm._list_snap_packages() + + self.assertEqual(len(result), 2) + self.assertEqual(result[0].name, "core") + self.assertEqual(result[1].name, "firefox") + + @patch.object(UnifiedPackageManager, "_run_command") + def test_list_flatpak_packages(self, mock_run): + """Test listing installed flatpak packages.""" + mock_run.return_value = ( + True, + "org.mozilla.firefox\t120.0\t500 MB", + "", + ) + + result = self.upm._list_flatpak_packages() + + self.assertEqual(len(result), 1) + self.assertEqual(result[0].name, "org.mozilla.firefox") + self.assertEqual(result[0].format, PackageFormat.FLATPAK) + + @patch.object(UnifiedPackageManager, "_list_deb_packages") + @patch.object(UnifiedPackageManager, "_list_snap_packages") + @patch.object(UnifiedPackageManager, "_list_flatpak_packages") + def test_list_installed_packages_all(self, mock_flatpak, mock_snap, mock_deb): + """Test listing all installed packages.""" + mock_deb.return_value = [PackageInfo("pkg1", PackageFormat.DEB)] + mock_snap.return_value = [PackageInfo("pkg2", PackageFormat.SNAP)] + mock_flatpak.return_value = [PackageInfo("pkg3", PackageFormat.FLATPAK)] + + result = self.upm.list_installed_packages() + + self.assertEqual(len(result["deb"]), 1) + self.assertEqual(len(result["snap"]), 1) + self.assertEqual(len(result["flatpak"]), 1) + + @patch.object(UnifiedPackageManager, "_list_deb_packages") + @patch.object(UnifiedPackageManager, "_list_snap_packages") + @patch.object(UnifiedPackageManager, "_list_flatpak_packages") + def test_list_installed_packages_filtered(self, mock_flatpak, mock_snap, mock_deb): + """Test listing packages with format filter.""" + mock_snap.return_value = [PackageInfo("snap-pkg", PackageFormat.SNAP)] + + result = self.upm.list_installed_packages(format_filter=PackageFormat.SNAP) + + self.assertEqual(len(result["snap"]), 1) + self.assertEqual(len(result["deb"]), 0) + self.assertEqual(len(result["flatpak"]), 0) + + # ========================================================================= + # Package Comparison Tests + # ========================================================================= + + @patch.object(UnifiedPackageManager, "detect_package_sources") + def test_compare_package_options(self, mock_detect): + """Test package comparison across formats.""" + mock_detect.return_value = { + "deb": PackageInfo("test", PackageFormat.DEB, "1.0", installed=True), + "snap": PackageInfo("test", PackageFormat.SNAP, "2.0"), + "flatpak": None, + } + + result = self.upm.compare_package_options("test") + + self.assertEqual(result["package_name"], "test") + self.assertIn("deb", result["available_formats"]) + self.assertIn("snap", result["available_formats"]) + self.assertNotIn("flatpak", result["available_formats"]) + self.assertEqual(result["installed_as"], "deb") + + # ========================================================================= + # Permission Management Tests + # ========================================================================= + + @patch.object(UnifiedPackageManager, "_run_command") + def test_list_snap_permissions(self, mock_run): + """Test listing snap permissions.""" + mock_run.return_value = ( + True, + "Interface Plug Slot Notes\naudio-playback firefox:audio-playback :audio-playback -\nnetwork firefox:network - -", + "", + ) + + result = self.upm.list_snap_permissions("firefox") + + self.assertIn("connected", result) + self.assertIn("available", result) + self.assertGreaterEqual(len(result["connected"]), 1) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_list_flatpak_permissions(self, mock_run): + """Test listing flatpak permissions.""" + mock_run.return_value = ( + True, + "[Context]\nshared=network;ipc;\n\n[filesystems]\nhome\nxdg-data/themes\n\n[Session Bus Policy]\norg.freedesktop.Notifications=talk\n", + "", + ) + + result = self.upm.list_flatpak_permissions("org.test.App") + + self.assertIn("Context", result) + self.assertIsInstance(result["Context"], dict) + self.assertEqual(result["Context"]["shared"], "network;ipc;") + + self.assertIn("filesystems", result) + self.assertIsInstance(result["filesystems"], list) + self.assertEqual(len(result["filesystems"]), 2) + self.assertIn("home", result["filesystems"]) + + self.assertIn("Session Bus Policy", result) + self.assertIsInstance(result["Session Bus Policy"], dict) + self.assertEqual(result["Session Bus Policy"]["org.freedesktop.Notifications"], "talk") + + @patch.object(UnifiedPackageManager, "_run_command") + def test_modify_snap_permission_connect(self, mock_run): + """Test connecting a snap interface.""" + mock_run.return_value = (True, "", "") + + success, message = self.upm.modify_snap_permission("firefox", "camera", "connect") + + self.assertTrue(success) + self.assertIn("connect", message.lower()) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_modify_snap_permission_disconnect(self, mock_run): + """Test disconnecting a snap interface.""" + mock_run.return_value = (True, "", "") + + success, message = self.upm.modify_snap_permission("firefox", "camera", "disconnect") + + self.assertTrue(success) + + def test_modify_snap_permission_invalid_action(self): + """Test invalid action for snap permission modification.""" + success, message = self.upm.modify_snap_permission("firefox", "camera", "invalid") + + self.assertFalse(success) + self.assertIn("Invalid action", message) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_modify_flatpak_permission(self, mock_run): + """Test modifying flatpak permission.""" + mock_run.return_value = (True, "", "") + + success, message = self.upm.modify_flatpak_permission("org.test.App", "filesystem", "home") + + self.assertTrue(success) + + # ========================================================================= + # Snap Redirect Tests + # ========================================================================= + + @patch.object(UnifiedPackageManager, "_run_command") + @patch("os.path.exists") + def test_check_snap_redirects(self, mock_exists, mock_run): + """Test detection of snap redirects.""" + mock_run.return_value = ( + True, + "Package: firefox\nDescription: dummy transitional package", + "", + ) + mock_exists.return_value = True + + result = self.upm.check_snap_redirects() + + self.assertGreaterEqual(len(result), 1) + # Should find Firefox as transitional + firefox_redirect = next((r for r in result if r["package"] == "firefox"), None) + self.assertIsNotNone(firefox_redirect) + + @patch("pathlib.Path.exists") + @patch("os.access") + @patch("shutil.move") + def test_disable_snap_redirects_success(self, mock_move, mock_access, mock_exists): + """Test successful disabling of snap redirects.""" + mock_exists.return_value = True + mock_access.return_value = True + mock_move.return_value = None + + success, message = self.upm.disable_snap_redirects() + + self.assertTrue(success) + self.assertIn("disabled", message.lower()) + + @patch("pathlib.Path.exists") + def test_disable_snap_redirects_not_found(self, mock_exists): + """Test disabling when config doesn't exist.""" + mock_exists.return_value = False + + success, message = self.upm.disable_snap_redirects() + + self.assertTrue(success) + self.assertIn("not found", message.lower()) + + @patch("pathlib.Path.exists") + @patch("os.access") + def test_disable_snap_redirects_permission_denied(self, mock_access, mock_exists): + """Test disabling without root permissions.""" + mock_exists.return_value = True + mock_access.return_value = False + + success, message = self.upm.disable_snap_redirects() + + self.assertFalse(success) + self.assertIn("Permission denied", message) + + @patch("pathlib.Path.exists") + @patch("os.access") + def test_disable_snap_redirects_backup_already_exists(self, mock_access, mock_exists): + """Test disabling when backup already exists - should preserve existing backup.""" + # config_path.exists() = True, then backup_path.exists() = True + mock_exists.side_effect = [True, True] + mock_access.return_value = True + + success, message = self.upm.disable_snap_redirects() + + self.assertTrue(success) + self.assertIn("already disabled", message.lower()) + self.assertIn("preserved", message.lower()) + + @patch("pathlib.Path.exists") + @patch("shutil.move") + def test_restore_snap_redirects_success(self, mock_move, mock_exists): + """Test successful restore of snap redirects from backup.""" + # backup exists, config doesn't exist + mock_exists.side_effect = lambda: True # backup exists + + with patch("pathlib.Path.exists") as mock_path_exists: + # First call for backup_path.exists() = True, second for config_path.exists() = False + mock_path_exists.side_effect = [True, False] + mock_move.return_value = None + + success, message = self.upm.restore_snap_redirects() + + self.assertTrue(success) + self.assertIn("restored", message.lower()) + mock_move.assert_called_once() + + @patch("pathlib.Path.exists") + def test_restore_snap_redirects_no_backup(self, mock_exists): + """Test restore when no backup exists.""" + mock_exists.return_value = False + + success, message = self.upm.restore_snap_redirects() + + self.assertFalse(success) + self.assertIn("No backup found", message) + + @patch("pathlib.Path.exists") + def test_restore_snap_redirects_config_exists(self, mock_exists): + """Test restore when config already exists.""" + # backup exists, config also exists + mock_exists.side_effect = [True, True] + + success, message = self.upm.restore_snap_redirects() + + self.assertFalse(success) + self.assertIn("already exists", message) + + @patch("pathlib.Path.exists") + @patch("shutil.move") + def test_restore_snap_redirects_move_failure(self, mock_move, mock_exists): + """Test restore when shutil.move raises an exception.""" + # backup exists, config doesn't exist + mock_exists.side_effect = [True, False] + mock_move.side_effect = OSError("Permission denied") + + success, message = self.upm.restore_snap_redirects() + + self.assertFalse(success) + self.assertIn("Failed to restore", message) + self.assertIn("Permission denied", message) + + # ========================================================================= + # Storage Analysis Tests + # ========================================================================= + + @patch.object(UnifiedPackageManager, "_list_deb_packages") + @patch.object(UnifiedPackageManager, "_list_flatpak_packages") + @patch.object(UnifiedPackageManager, "_run_command") + def test_analyze_storage(self, mock_run, mock_flatpak, mock_deb): + """Test storage analysis.""" + mock_deb.return_value = [ + PackageInfo("pkg1", PackageFormat.DEB, "1.0", size=1024000), + PackageInfo("pkg2", PackageFormat.DEB, "2.0", size=2048000), + ] + mock_flatpak.return_value = [ + PackageInfo("org.test.App", PackageFormat.FLATPAK, "1.0", size=500000000), + ] + mock_run.return_value = (True, "", "") + + # Mock Path.exists and glob for snap analysis + with patch("pathlib.Path.exists", return_value=False): + result = self.upm.analyze_storage() + + self.assertIsInstance(result, StorageAnalysis) + self.assertEqual(result.deb_total, 3072000) + self.assertEqual(result.flatpak_total, 500000000) + + def test_parse_size_string(self): + """Test parsing of size strings.""" + self.assertEqual(self.upm._parse_size_string("1.5 GB"), int(1.5 * 1024**3)) + self.assertEqual(self.upm._parse_size_string("500 MB"), int(500 * 1024**2)) + self.assertEqual(self.upm._parse_size_string("1024 KB"), int(1024 * 1024)) + self.assertEqual(self.upm._parse_size_string(""), 0) + + def test_format_storage_analysis(self): + """Test formatting of storage analysis.""" + analysis = StorageAnalysis( + deb_total=1024**3, # 1 GB + snap_total=512 * 1024**2, # 512 MB + flatpak_total=256 * 1024**2, # 256 MB + snap_packages=[("firefox", 200 * 1024**2)], + flatpak_packages=[("org.test.App", 256 * 1024**2)], + ) + + result = self.upm.format_storage_analysis(analysis) + + self.assertIn("Storage Analysis", result) + self.assertIn("DEB/APT", result) + self.assertIn("Snap", result) + self.assertIn("Flatpak", result) + self.assertIn("GB", result) # Should show GB for large sizes + + # ========================================================================= + # Error Handling Tests + # ========================================================================= + + def test_command_not_available(self): + """Test handling when commands are not available.""" + with patch.object(UnifiedPackageManager, "_check_command_available") as mock_check: + mock_check.return_value = False + upm = UnifiedPackageManager() + + self.assertFalse(upm._snap_available) + self.assertFalse(upm._flatpak_available) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_list_snap_permissions_unavailable(self, mock_run): + """Test listing permissions when snap unavailable.""" + self.upm._snap_available = False + + with self.assertRaises(RuntimeError) as context: + self.upm.list_snap_permissions("test") + + self.assertIn("not available", str(context.exception)) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_list_flatpak_permissions_unavailable(self, mock_run): + """Test listing permissions when flatpak unavailable.""" + self.upm._flatpak_available = False + + with self.assertRaises(RuntimeError) as context: + self.upm.list_flatpak_permissions("org.test.App") + + self.assertIn("not available", str(context.exception)) + + +class TestPackageInfo(unittest.TestCase): + """Test cases for PackageInfo dataclass.""" + + def test_package_info_creation(self): + """Test creation of PackageInfo instance.""" + info = PackageInfo( + name="test-package", + format=PackageFormat.DEB, + version="1.0.0", + size=1024, + installed=True, + description="A test package", + ) + + self.assertEqual(info.name, "test-package") + self.assertEqual(info.format, PackageFormat.DEB) + self.assertEqual(info.version, "1.0.0") + self.assertEqual(info.size, 1024) + self.assertTrue(info.installed) + + def test_package_info_defaults(self): + """Test default values for PackageInfo.""" + info = PackageInfo(name="test", format=PackageFormat.SNAP) + + self.assertEqual(info.version, "") + self.assertEqual(info.size, 0) + self.assertFalse(info.installed) + self.assertEqual(info.permissions, []) + + +class TestStorageAnalysis(unittest.TestCase): + """Test cases for StorageAnalysis dataclass.""" + + def test_storage_analysis_creation(self): + """Test creation of StorageAnalysis instance.""" + analysis = StorageAnalysis( + deb_total=1024, + snap_total=2048, + flatpak_total=4096, + ) + + self.assertEqual(analysis.deb_total, 1024) + self.assertEqual(analysis.snap_total, 2048) + self.assertEqual(analysis.flatpak_total, 4096) + + def test_storage_analysis_defaults(self): + """Test default values for StorageAnalysis.""" + analysis = StorageAnalysis() + + self.assertEqual(analysis.deb_total, 0) + self.assertEqual(analysis.snap_total, 0) + self.assertEqual(analysis.flatpak_total, 0) + self.assertEqual(analysis.deb_packages, []) + self.assertEqual(analysis.snap_packages, []) + self.assertEqual(analysis.flatpak_packages, []) + + +if __name__ == "__main__": + unittest.main()