From 4069188fc5ee3e0ebe31fa09e677588204f69e42 Mon Sep 17 00:00:00 2001 From: Piyushrathoree Date: Sun, 11 Jan 2026 23:35:25 +0000 Subject: [PATCH 01/13] feat: add unified package manager for snap/flatpak/deb Addresses #450 - Snap/Flatpak Unified Manager Features: - Package source detection (deb/snap/flatpak) - Package comparison across formats - Permission management (snap interfaces, flatpak permissions) - Snap redirect detection and removal - Storage analysis by package format New CLI commands: - cortex pkg sources - cortex pkg compare - cortex pkg list [--format] - cortex pkg permissions - cortex pkg storage - cortex pkg snap-redirects [--disable] Includes 30+ unit tests and documentation. --- cortex/__init__.py | 17 +- cortex/cli.py | 243 ++++++++ cortex/unified_package_manager.py | 806 ++++++++++++++++++++++++++ docs/UNIFIED_PACKAGE_MANAGER.md | 116 ++++ tests/test_unified_package_manager.py | 529 +++++++++++++++++ 5 files changed, 1710 insertions(+), 1 deletion(-) create mode 100644 cortex/unified_package_manager.py create mode 100644 docs/UNIFIED_PACKAGE_MANAGER.md create mode 100644 tests/test_unified_package_manager.py diff --git a/cortex/__init__.py b/cortex/__init__.py index dcf98a77..0639dc6d 100644 --- a/cortex/__init__.py +++ b/cortex/__init__.py @@ -1,7 +1,22 @@ from .cli import main from .env_loader import load_env from .packages import PackageManager, PackageManagerType +from .unified_package_manager import ( + PackageFormat, + PackageInfo, + StorageAnalysis, + UnifiedPackageManager, +) __version__ = "0.1.0" -__all__ = ["main", "load_env", "PackageManager", "PackageManagerType"] +__all__ = [ + "main", + "load_env", + "PackageManager", + "PackageManagerType", + "UnifiedPackageManager", + "PackageFormat", + "PackageInfo", + "StorageAnalysis", +] diff --git a/cortex/cli.py b/cortex/cli.py index 9261a816..d7247119 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2001,6 +2001,202 @@ def progress_callback(current: int, total: int, step: InstallationStep) -> None: console.print(f"Error: {result.error_message}", style="red") return 1 + # --- Unified Package Manager Commands (Issue #450) --- + def pkg(self, args: argparse.Namespace) -> int: + """Handle unified package manager commands for snap/flatpak/deb.""" + from cortex.unified_package_manager import ( + PackageFormat, + UnifiedPackageManager, + ) + + upm = UnifiedPackageManager() + action = getattr(args, "pkg_action", None) + + if not action: + cx_print("\n📦 Unified Package Manager - Snap/Flatpak/Deb\n", "info") + console.print("Usage: cortex pkg [options]") + console.print("\nCommands:") + console.print(" sources Show available sources for a package") + console.print(" compare Compare package across formats") + console.print(" list [--format FORMAT] List installed packages") + console.print(" permissions Show package permissions") + console.print(" storage Analyze storage by format") + console.print(" snap-redirects [--disable] Check/disable snap redirects") + return 0 + + if action == "sources": + return self._pkg_sources(upm, args) + elif action == "compare": + return self._pkg_compare(upm, args) + elif action == "list": + return self._pkg_list(upm, args) + elif action == "permissions": + return self._pkg_permissions(upm, args) + elif action == "storage": + return self._pkg_storage(upm) + elif action == "snap-redirects": + return self._pkg_snap_redirects(upm, args) + else: + self._print_error(f"Unknown pkg action: {action}") + return 1 + + def _pkg_sources(self, upm, args: argparse.Namespace) -> int: + """Show available sources for a package.""" + package = args.package + cx_print(f"\n🔍 Checking sources for '{package}'...\n", "info") + + sources = upm.detect_package_sources(package) + + found_any = False + for format_name, info in sources.items(): + if info is not None: + found_any = True + status = "[green]✓ Installed[/green]" if info.installed else "[dim]Available[/dim]" + console.print(f" [{format_name.upper()}] {status}") + console.print(f" Version: {info.version or 'N/A'}") + if info.description: + console.print(f" {info.description[:60]}...") + + if not found_any: + cx_print(f"No sources found for '{package}'", "warning") + + return 0 + + def _pkg_compare(self, upm, args: argparse.Namespace) -> int: + """Compare package across formats.""" + package = args.package + cx_print(f"\n📊 Comparing '{package}' across formats...\n", "info") + + comparison = upm.compare_package_options(package) + + if not comparison["available_formats"]: + cx_print(f"Package '{package}' not found in any format", "warning") + return 1 + + console.print(f"[bold]Package:[/bold] {comparison['package_name']}") + if comparison["installed_as"]: + console.print(f"[bold]Installed as:[/bold] {comparison['installed_as']}") + console.print(f"[bold]Available in:[/bold] {', '.join(comparison['available_formats'])}") + console.print() + + # Show comparison table + for fmt, data in comparison["comparison"].items(): + status = "[green]Installed[/green]" if data["installed"] else "[dim]Not installed[/dim]" + console.print(f"[cyan]{fmt.upper()}[/cyan]: {status}") + console.print(f" Version: {data['version'] or 'N/A'}") + if data["size"]: + size_mb = data["size"] / (1024 * 1024) + console.print(f" Size: {size_mb:.1f} MB") + + return 0 + + def _pkg_list(self, upm, args: argparse.Namespace) -> int: + """List installed packages by format.""" + from cortex.unified_package_manager import PackageFormat + + format_filter = None + if hasattr(args, "format") and args.format: + format_map = { + "deb": PackageFormat.DEB, + "snap": PackageFormat.SNAP, + "flatpak": PackageFormat.FLATPAK, + } + format_filter = format_map.get(args.format.lower()) + + cx_print("\n📦 Installed Packages\n", "info") + + packages = upm.list_installed_packages(format_filter) + + for fmt, pkgs in packages.items(): + if pkgs: + console.print(f"\n[cyan][{fmt.upper()}][/cyan] ({len(pkgs)} packages)") + for pkg in pkgs[:10]: # Show top 10 + console.print(f" • {pkg.name} ({pkg.version})") + if len(pkgs) > 10: + console.print(f" ... and {len(pkgs) - 10} more") + + return 0 + + def _pkg_permissions(self, upm, args: argparse.Namespace) -> int: + """Show package permissions.""" + package = args.package + fmt = getattr(args, "format", None) + + cx_print(f"\n🔐 Permissions for '{package}'\n", "info") + + if fmt == "flatpak" or "." in package: + # Likely a flatpak app ID + perms = upm.list_flatpak_permissions(package) + if "error" in perms: + self._print_error(perms["error"]) + return 1 + + for section, values in perms.items(): + console.print(f"[cyan][{section}][/cyan]") + if isinstance(values, dict): + for k, v in values.items(): + console.print(f" {k} = {v}") + elif isinstance(values, list): + for v in values: + console.print(f" {v}") + else: + # Assume snap + perms = upm.list_snap_permissions(package) + if "error" in perms: + self._print_error(perms["error"]) + return 1 + + if perms["connected"]: + console.print("[green]Connected Interfaces:[/green]") + for conn in perms["connected"]: + console.print(f" • {conn['interface']}: {conn['plug']} → {conn['slot']}") + + if perms["available"]: + console.print("\n[dim]Available (not connected):[/dim]") + for avail in perms["available"]: + console.print(f" • {avail['interface']}") + + return 0 + + def _pkg_storage(self, upm) -> int: + """Analyze storage by package format.""" + cx_print("\n💾 Analyzing storage...\n", "info") + + analysis = upm.analyze_storage() + output = upm.format_storage_analysis(analysis) + console.print(output) + + return 0 + + def _pkg_snap_redirects(self, upm, args: argparse.Namespace) -> int: + """Check or disable snap redirects.""" + disable = getattr(args, "disable", False) + + if disable: + cx_print("\n⚠️ Disabling snap redirects...\n", "warning") + success, message = upm.disable_snap_redirects() + if success: + cx_print(message, "success") + else: + self._print_error(message) + return 0 if success else 1 + + cx_print("\n🔍 Checking for snap redirects...\n", "info") + redirects = upm.check_snap_redirects() + + if not redirects: + cx_print("No snap redirects detected", "success") + return 0 + + console.print(f"Found {len(redirects)} potential snap redirects:\n") + for redirect in redirects: + console.print(f" [yellow]•[/yellow] {redirect['package']}") + console.print(f" Type: {redirect['type']}") + console.print(f" {redirect['reason']}") + + console.print("\n[dim]To disable: cortex pkg snap-redirects --disable[/dim]") + return 0 + # -------------------------- @@ -2270,6 +2466,51 @@ def main(): sandbox_exec_parser.add_argument("command", nargs="+", help="Command to execute") # -------------------------- + # --- Unified Package Manager Commands (Issue #450) --- + pkg_parser = subparsers.add_parser( + "pkg", help="Unified package manager (snap/flatpak/deb)" + ) + pkg_subs = pkg_parser.add_subparsers(dest="pkg_action", help="Package manager actions") + + # pkg sources + pkg_sources_parser = pkg_subs.add_parser("sources", help="Show available sources for a package") + pkg_sources_parser.add_argument("package", help="Package name to search for") + + # pkg compare + pkg_compare_parser = pkg_subs.add_parser("compare", help="Compare package across formats") + pkg_compare_parser.add_argument("package", help="Package name to compare") + + # pkg list [--format FORMAT] + pkg_list_parser = pkg_subs.add_parser("list", help="List installed packages") + pkg_list_parser.add_argument( + "--format", + choices=["deb", "snap", "flatpak"], + help="Filter by package format", + ) + + # pkg permissions [--format FORMAT] + pkg_permissions_parser = pkg_subs.add_parser("permissions", help="Show package permissions") + pkg_permissions_parser.add_argument("package", help="Package name or app ID") + pkg_permissions_parser.add_argument( + "--format", + choices=["snap", "flatpak"], + help="Package format (auto-detected if not specified)", + ) + + # pkg storage + pkg_subs.add_parser("storage", help="Analyze storage usage by package format") + + # pkg snap-redirects [--disable] + pkg_redirects_parser = pkg_subs.add_parser( + "snap-redirects", help="Check or disable snap redirects" + ) + pkg_redirects_parser.add_argument( + "--disable", + action="store_true", + help="Disable snap redirects (requires sudo)", + ) + # -------------------------- + # --- Environment Variable Management Commands --- env_parser = subparsers.add_parser("env", help="Manage environment variables") env_subs = env_parser.add_subparsers(dest="env_action", help="Environment actions") @@ -2531,6 +2772,8 @@ def main(): return 1 elif args.command == "env": return cli.env(args) + elif args.command == "pkg": + return cli.pkg(args) else: parser.print_help() return 1 diff --git a/cortex/unified_package_manager.py b/cortex/unified_package_manager.py new file mode 100644 index 00000000..06fffa7d --- /dev/null +++ b/cortex/unified_package_manager.py @@ -0,0 +1,806 @@ +#!/usr/bin/env python3 +""" +Unified Package Manager for Cortex Linux + +Provides a unified interface for managing packages across multiple formats: +- APT/DEB (traditional Debian packages) +- Snap (Canonical's universal packages) +- Flatpak (cross-distribution application packages) + +Addresses issue #450: Snap/Flatpak confusion and transparency. +""" + +import json +import logging +import os +import re +import shutil +import subprocess +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + + +class PackageFormat(Enum): + """Supported package formats.""" + + DEB = "deb" + SNAP = "snap" + FLATPAK = "flatpak" + + +@dataclass +class PackageInfo: + """Information about a package in a specific format.""" + + name: str + format: PackageFormat + version: str = "" + size: int = 0 # Size in bytes + installed: bool = False + description: str = "" + permissions: list[str] = field(default_factory=list) + + +@dataclass +class StorageAnalysis: + """Storage usage analysis by package format.""" + + deb_total: int = 0 + snap_total: int = 0 + flatpak_total: int = 0 + deb_packages: list[tuple[str, int]] = field(default_factory=list) + snap_packages: list[tuple[str, int]] = field(default_factory=list) + flatpak_packages: list[tuple[str, int]] = field(default_factory=list) + + +class UnifiedPackageManager: + """ + Unified manager for APT, Snap, and Flatpak packages. + + Provides transparency about package sources and enables users to: + - See true package source (deb vs snap vs flatpak) + - Compare package options across formats + - Manage permissions for sandboxed packages + - Detect and optionally disable snap redirects + - Analyze storage usage by format + """ + + # Known transitional packages that redirect to snap + KNOWN_SNAP_REDIRECTS = { + "firefox", + "chromium-browser", + "thunderbird", + "libreoffice", + "gnome-calculator", + "gnome-characters", + "gnome-logs", + "gnome-system-monitor", + } + + SNAP_REDIRECT_CONFIG = "/etc/apt/apt.conf.d/20snapd.conf" + + def __init__(self) -> None: + """Initialize the unified package manager.""" + self._snap_available = self._check_command_available("snap") + self._flatpak_available = self._check_command_available("flatpak") + self._apt_available = self._check_command_available("apt") + + def _check_command_available(self, command: str) -> bool: + """Check if a command is available on the system.""" + return shutil.which(command) is not None + + def _run_command( + self, cmd: list[str], timeout: int = 30 + ) -> tuple[bool, str, str]: + """ + Run a shell command and return success status, stdout, stderr. + + Args: + cmd: Command to run as list of arguments + timeout: Timeout in seconds + + Returns: + Tuple of (success, stdout, stderr) + """ + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout, + ) + return result.returncode == 0, result.stdout, result.stderr + except subprocess.TimeoutExpired: + logger.warning(f"Command timed out: {' '.join(cmd)}") + return False, "", "Command timed out" + except FileNotFoundError: + logger.warning(f"Command not found: {cmd[0]}") + return False, "", f"Command not found: {cmd[0]}" + except Exception as e: + logger.error(f"Error running command {cmd}: {e}") + return False, "", str(e) + + # ========================================================================= + # Package Source Detection + # ========================================================================= + + def detect_package_sources(self, package_name: str) -> dict[str, PackageInfo | None]: + """ + Detect all available sources for a package. + + Args: + package_name: Name of the package to search for + + Returns: + Dictionary with keys 'deb', 'snap', 'flatpak' containing PackageInfo or None + """ + return { + "deb": self._check_deb_package(package_name), + "snap": self._check_snap_package(package_name), + "flatpak": self._check_flatpak_package(package_name), + } + + def _check_deb_package(self, package_name: str) -> PackageInfo | None: + """Check if package is available as .deb.""" + if not self._apt_available: + return None + + # First check if installed + success, stdout, _ = self._run_command( + ["dpkg-query", "-W", "-f=${Status}|${Version}|${Installed-Size}", package_name] + ) + + if success and "install ok installed" in stdout: + parts = stdout.strip().split("|") + version = parts[1] if len(parts) > 1 else "" + size = int(parts[2]) * 1024 if len(parts) > 2 and parts[2].isdigit() else 0 + return PackageInfo( + name=package_name, + format=PackageFormat.DEB, + version=version, + size=size, + installed=True, + ) + + # Check if available in apt cache + success, stdout, _ = self._run_command(["apt-cache", "show", package_name]) + if success and stdout: + # Check if this is a transitional/dummy package + if "dummy" in stdout.lower() or "transitional" in stdout.lower(): + # This is likely a snap redirect + return None + + version = "" + size = 0 + description = "" + for line in stdout.split("\n"): + if line.startswith("Version:"): + version = line.split(":", 1)[1].strip() + elif line.startswith("Installed-Size:"): + size_str = line.split(":", 1)[1].strip() + size = int(size_str) * 1024 if size_str.isdigit() else 0 + elif line.startswith("Description:"): + description = line.split(":", 1)[1].strip() + + return PackageInfo( + name=package_name, + format=PackageFormat.DEB, + version=version, + size=size, + installed=False, + description=description, + ) + + return None + + def _check_snap_package(self, package_name: str) -> PackageInfo | None: + """Check if package is available as snap.""" + if not self._snap_available: + return None + + # Check if installed + success, stdout, _ = self._run_command(["snap", "list", package_name]) + if success and package_name in stdout: + lines = stdout.strip().split("\n") + if len(lines) > 1: + parts = lines[1].split() + version = parts[1] if len(parts) > 1 else "" + return PackageInfo( + name=package_name, + format=PackageFormat.SNAP, + version=version, + installed=True, + ) + + # Check if available in snap store + success, stdout, _ = self._run_command(["snap", "info", package_name]) + if success and stdout: + version = "" + description = "" + for line in stdout.split("\n"): + if line.startswith("stable:"): + version = line.split()[1] if len(line.split()) > 1 else "" + elif line.startswith("summary:"): + description = line.split(":", 1)[1].strip() + + return PackageInfo( + name=package_name, + format=PackageFormat.SNAP, + version=version, + installed=False, + description=description, + ) + + return None + + def _check_flatpak_package(self, package_name: str) -> PackageInfo | None: + """Check if package is available as flatpak.""" + if not self._flatpak_available: + return None + + # Check if installed (by partial match) + success, stdout, _ = self._run_command( + ["flatpak", "list", "--app", "--columns=application,version"] + ) + if success: + for line in stdout.strip().split("\n"): + if package_name.lower() in line.lower(): + parts = line.split("\t") + app_id = parts[0] if parts else "" + version = parts[1] if len(parts) > 1 else "" + return PackageInfo( + name=app_id, + format=PackageFormat.FLATPAK, + version=version, + installed=True, + ) + + # Search in remote repos + success, stdout, _ = self._run_command( + ["flatpak", "search", package_name, "--columns=application,version,description"] + ) + if success and stdout: + lines = stdout.strip().split("\n") + for line in lines: + # Skip empty lines and "No matches" messages + if not line or "no matches" in line.lower() or "no results" in line.lower(): + continue + parts = line.split("\t") + # Validate: app_id should look like org.something.App + app_id = parts[0] if parts else "" + if app_id and "." in app_id: + version = parts[1] if len(parts) > 1 else "" + description = parts[2] if len(parts) > 2 else "" + return PackageInfo( + name=app_id, + format=PackageFormat.FLATPAK, + version=version, + installed=False, + description=description, + ) + + return None + + # ========================================================================= + # Installed Package Listing + # ========================================================================= + + def list_installed_packages( + self, format_filter: PackageFormat | None = None + ) -> dict[str, list[PackageInfo]]: + """ + List all installed packages, optionally filtered by format. + + Args: + format_filter: Optional filter to show only packages of specific format + + Returns: + Dictionary with keys 'deb', 'snap', 'flatpak' containing lists of PackageInfo + """ + result: dict[str, list[PackageInfo]] = {"deb": [], "snap": [], "flatpak": []} + + if format_filter is None or format_filter == PackageFormat.DEB: + result["deb"] = self._list_deb_packages() + + if format_filter is None or format_filter == PackageFormat.SNAP: + result["snap"] = self._list_snap_packages() + + if format_filter is None or format_filter == PackageFormat.FLATPAK: + result["flatpak"] = self._list_flatpak_packages() + + return result + + def _list_deb_packages(self) -> list[PackageInfo]: + """List installed .deb packages.""" + if not self._apt_available: + return [] + + success, stdout, _ = self._run_command( + ["dpkg-query", "-W", "-f=${Package}|${Version}|${Installed-Size}\n"] + ) + + packages = [] + if success: + for line in stdout.strip().split("\n"): + if not line: + continue + parts = line.split("|") + name = parts[0] + version = parts[1] if len(parts) > 1 else "" + size = int(parts[2]) * 1024 if len(parts) > 2 and parts[2].isdigit() else 0 + packages.append( + PackageInfo( + name=name, + format=PackageFormat.DEB, + version=version, + size=size, + installed=True, + ) + ) + + return packages + + def _list_snap_packages(self) -> list[PackageInfo]: + """List installed snap packages.""" + if not self._snap_available: + return [] + + success, stdout, _ = self._run_command(["snap", "list"]) + + packages = [] + if success: + lines = stdout.strip().split("\n") + for line in lines[1:]: # Skip header + if not line: + continue + parts = line.split() + if len(parts) >= 2: + name = parts[0] + version = parts[1] + # Parse size from notes column if available + packages.append( + PackageInfo( + name=name, + format=PackageFormat.SNAP, + version=version, + installed=True, + ) + ) + + return packages + + def _list_flatpak_packages(self) -> list[PackageInfo]: + """List installed flatpak packages.""" + if not self._flatpak_available: + return [] + + success, stdout, _ = self._run_command( + ["flatpak", "list", "--app", "--columns=application,version,size"] + ) + + packages = [] + if success: + for line in stdout.strip().split("\n"): + if not line: + continue + parts = line.split("\t") + app_id = parts[0] if parts else "" + version = parts[1] if len(parts) > 1 else "" + size_str = parts[2] if len(parts) > 2 else "0" + # Parse size (e.g., "1.2 GB" -> bytes) + size = self._parse_size_string(size_str) + packages.append( + PackageInfo( + name=app_id, + format=PackageFormat.FLATPAK, + version=version, + size=size, + installed=True, + ) + ) + + return packages + + def _parse_size_string(self, size_str: str) -> int: + """Parse a human-readable size string to bytes.""" + size_str = size_str.strip() + if not size_str: + return 0 + + multipliers = { + "B": 1, + "KB": 1024, + "MB": 1024**2, + "GB": 1024**3, + "kB": 1024, + "mB": 1024**2, + "gB": 1024**3, + } + + match = re.match(r"([\d.]+)\s*([A-Za-z]+)", size_str) + if match: + value = float(match.group(1)) + unit = match.group(2) + return int(value * multipliers.get(unit, 1)) + + return 0 + + # ========================================================================= + # Package Comparison + # ========================================================================= + + def compare_package_options(self, package_name: str) -> dict[str, Any]: + """ + Compare a package across all available formats. + + Args: + package_name: Name of the package to compare + + Returns: + Dictionary with comparison data including versions, sizes, permissions + """ + sources = self.detect_package_sources(package_name) + + comparison = { + "package_name": package_name, + "available_formats": [], + "installed_as": None, + "comparison": {}, + } + + for format_name, info in sources.items(): + if info is not None: + comparison["available_formats"].append(format_name) + comparison["comparison"][format_name] = { + "version": info.version, + "size": info.size, + "installed": info.installed, + "description": info.description, + } + if info.installed: + comparison["installed_as"] = format_name + + return comparison + + # ========================================================================= + # Permission Management + # ========================================================================= + + def list_snap_permissions(self, snap_name: str) -> dict[str, list[dict[str, str]]]: + """ + List permissions/interfaces for a snap package. + + Args: + snap_name: Name of the snap package + + Returns: + Dictionary with 'connected' and 'available' interface lists + """ + if not self._snap_available: + return {"error": "Snap is not available on this system"} + + success, stdout, _ = self._run_command(["snap", "connections", snap_name]) + + result: dict[str, list[dict[str, str]]] = {"connected": [], "available": []} + + if success: + lines = stdout.strip().split("\n") + for line in lines[1:]: # Skip header + if not line: + continue + parts = line.split() + if len(parts) >= 4: + interface = parts[0] + plug = parts[1] + slot = parts[2] + notes = parts[3] if len(parts) > 3 else "" + + entry = { + "interface": interface, + "plug": plug, + "slot": slot, + "notes": notes, + } + + if slot != "-": + result["connected"].append(entry) + else: + result["available"].append(entry) + + return result + + def list_flatpak_permissions(self, app_id: str) -> dict[str, Any]: + """ + List permissions for a flatpak application. + + Args: + app_id: Flatpak application ID (e.g., org.mozilla.firefox) + + Returns: + Dictionary with permission categories and values + """ + if not self._flatpak_available: + return {"error": "Flatpak is not available on this system"} + + success, stdout, _ = self._run_command( + ["flatpak", "info", "--show-permissions", app_id] + ) + + if not success: + return {"error": f"Could not get permissions for {app_id}"} + + permissions: dict[str, Any] = {} + current_section = "" + + for line in stdout.strip().split("\n"): + if not line: + continue + + if line.startswith("[") and line.endswith("]"): + current_section = line[1:-1] + permissions[current_section] = {} + elif "=" in line and current_section: + key, value = line.split("=", 1) + permissions[current_section][key] = value + elif current_section: + # Single value without key + if current_section not in permissions: + permissions[current_section] = [] + if isinstance(permissions[current_section], list): + permissions[current_section].append(line) + + return permissions + + def modify_snap_permission( + self, snap_name: str, interface: str, action: str + ) -> tuple[bool, str]: + """ + Modify a snap permission (connect/disconnect an interface). + + Args: + snap_name: Name of the snap package + interface: Interface to modify + action: 'connect' or 'disconnect' + + Returns: + Tuple of (success, message) + """ + if action not in ("connect", "disconnect"): + return False, f"Invalid action: {action}. Use 'connect' or 'disconnect'" + + if not self._snap_available: + return False, "Snap is not available on this system" + + cmd = ["snap", action, f"{snap_name}:{interface}"] + success, stdout, stderr = self._run_command(cmd) + + if success: + return True, f"Successfully {action}ed {interface} for {snap_name}" + else: + return False, f"Failed to {action} {interface}: {stderr}" + + def modify_flatpak_permission( + self, app_id: str, permission: str, value: str + ) -> tuple[bool, str]: + """ + Modify a flatpak permission using flatpak override. + + Args: + app_id: Flatpak application ID + permission: Permission flag (e.g., '--filesystem=home') + value: Value for the permission + + Returns: + Tuple of (success, message) + """ + if not self._flatpak_available: + return False, "Flatpak is not available on this system" + + # Construct the override command + cmd = ["flatpak", "override", "--user", f"--{permission}={value}", app_id] + success, stdout, stderr = self._run_command(cmd) + + if success: + return True, f"Successfully modified {permission} for {app_id}" + else: + return False, f"Failed to modify permission: {stderr}" + + # ========================================================================= + # Snap Redirect Detection & Management + # ========================================================================= + + def check_snap_redirects(self) -> list[dict[str, str]]: + """ + Detect packages that redirect apt install to snap. + + Returns: + List of dictionaries with 'package' and 'reason' keys + """ + redirects = [] + + # Check for known transitional packages + for pkg in self.KNOWN_SNAP_REDIRECTS: + success, stdout, _ = self._run_command(["apt-cache", "show", pkg]) + if success: + if "dummy" in stdout.lower() or "transitional" in stdout.lower(): + redirects.append({ + "package": pkg, + "type": "transitional", + "reason": "APT package is a dummy that installs snap version", + }) + elif "snap" in stdout.lower(): + redirects.append({ + "package": pkg, + "type": "snap_meta", + "reason": "Package description mentions snap installation", + }) + + # Check snap preference config file + if os.path.exists(self.SNAP_REDIRECT_CONFIG): + redirects.append({ + "package": "system", + "type": "config", + "reason": f"Snap preference config exists: {self.SNAP_REDIRECT_CONFIG}", + }) + + return redirects + + def disable_snap_redirects(self, backup: bool = True) -> tuple[bool, str]: + """ + Disable snap redirects by moving the APT config file. + + WARNING: This modifies system configuration and requires root. + + Args: + backup: Whether to create a backup of the config file + + Returns: + Tuple of (success, message) + """ + config_path = Path(self.SNAP_REDIRECT_CONFIG) + + if not config_path.exists(): + return True, "Snap redirect config not found - redirects may already be disabled" + + # Check if we have write permission + if not os.access(config_path.parent, os.W_OK): + return False, "Permission denied. Run with sudo to disable snap redirects." + + try: + if backup: + backup_path = config_path.with_suffix(".conf.disabled") + shutil.move(str(config_path), str(backup_path)) + return True, f"Snap redirects disabled. Backup saved to: {backup_path}" + else: + config_path.unlink() + return True, "Snap redirects disabled. Config file removed." + except Exception as e: + return False, f"Failed to disable snap redirects: {e}" + + def restore_snap_redirects(self) -> tuple[bool, str]: + """ + Restore snap redirects from backup. + + Returns: + Tuple of (success, message) + """ + backup_path = Path(self.SNAP_REDIRECT_CONFIG + ".disabled") + config_path = Path(self.SNAP_REDIRECT_CONFIG) + + if not backup_path.exists(): + return False, "No backup found to restore" + + if config_path.exists(): + return False, "Snap redirect config already exists" + + try: + shutil.move(str(backup_path), str(config_path)) + return True, "Snap redirects restored from backup" + except Exception as e: + return False, f"Failed to restore snap redirects: {e}" + + # ========================================================================= + # Storage Analysis + # ========================================================================= + + def analyze_storage(self) -> StorageAnalysis: + """ + Analyze disk usage by package format. + + Returns: + StorageAnalysis with totals and top packages per format + """ + analysis = StorageAnalysis() + + # Analyze DEB packages + deb_packages = self._list_deb_packages() + for pkg in deb_packages: + analysis.deb_total += pkg.size + analysis.deb_packages.append((pkg.name, pkg.size)) + + # Sort and keep top 10 + analysis.deb_packages.sort(key=lambda x: x[1], reverse=True) + analysis.deb_packages = analysis.deb_packages[:10] + + # Analyze Snap packages + if self._snap_available: + success, stdout, _ = self._run_command(["snap", "list", "--all"]) + if success: + # Get snap directory size + snap_dir = Path("/var/lib/snapd/snaps") + if snap_dir.exists(): + for snap_file in snap_dir.glob("*.snap"): + size = snap_file.stat().st_size + name = snap_file.stem.rsplit("_", 1)[0] + analysis.snap_total += size + analysis.snap_packages.append((name, size)) + + analysis.snap_packages.sort(key=lambda x: x[1], reverse=True) + # Deduplicate (keep largest per name) and limit to top 10 + seen = set() + deduped = [] + for name, size in analysis.snap_packages: + if name not in seen: + seen.add(name) + deduped.append((name, size)) + analysis.snap_packages = deduped[:10] + + # Analyze Flatpak packages + flatpak_packages = self._list_flatpak_packages() + for pkg in flatpak_packages: + analysis.flatpak_total += pkg.size + analysis.flatpak_packages.append((pkg.name, pkg.size)) + + analysis.flatpak_packages.sort(key=lambda x: x[1], reverse=True) + analysis.flatpak_packages = analysis.flatpak_packages[:10] + + return analysis + + def format_storage_analysis(self, analysis: StorageAnalysis) -> str: + """ + Format storage analysis for display. + + Args: + analysis: StorageAnalysis object + + Returns: + Formatted string for display + """ + lines = [] + lines.append("=" * 60) + lines.append("Storage Analysis by Package Format") + lines.append("=" * 60) + + def format_size(size: int) -> str: + if size >= 1024**3: + return f"{size / 1024**3:.2f} GB" + elif size >= 1024**2: + return f"{size / 1024**2:.2f} MB" + elif size >= 1024: + return f"{size / 1024:.2f} KB" + return f"{size} B" + + # Summary + total = analysis.deb_total + analysis.snap_total + analysis.flatpak_total + lines.append(f"\nTotal Package Storage: {format_size(total)}") + lines.append("-" * 40) + lines.append(f" DEB/APT: {format_size(analysis.deb_total)}") + lines.append(f" Snap: {format_size(analysis.snap_total)}") + lines.append(f" Flatpak: {format_size(analysis.flatpak_total)}") + + # Top packages per format + if analysis.snap_packages: + lines.append("\nTop Snap Packages:") + for name, size in analysis.snap_packages[:5]: + lines.append(f" {name}: {format_size(size)}") + + if analysis.flatpak_packages: + lines.append("\nTop Flatpak Packages:") + for name, size in analysis.flatpak_packages[:5]: + lines.append(f" {name}: {format_size(size)}") + + return "\n".join(lines) diff --git a/docs/UNIFIED_PACKAGE_MANAGER.md b/docs/UNIFIED_PACKAGE_MANAGER.md new file mode 100644 index 00000000..9552c20d --- /dev/null +++ b/docs/UNIFIED_PACKAGE_MANAGER.md @@ -0,0 +1,116 @@ +# Unified Package Manager + +> Addresses Issue [#450](https://github.com/cortexlinux/cortex/issues/450): Snap/Flatpak Unified Manager + +## Overview + +The Unified Package Manager provides transparency and control over package sources on Ubuntu/Debian systems. It helps users understand whether packages are installed as .deb, snap, or flatpak, and provides tools to manage permissions and disable "forced" snap installations. + +## Features + +1. **Package Source Detection** - See where a package is available (deb/snap/flatpak) +2. **Package Comparison** - Compare versions and sizes across formats +3. **Installed Package Listing** - List packages by format +4. **Permission Management** - View and modify snap/flatpak permissions (like Flatseal) +5. **Snap Redirect Detection** - Find packages that redirect apt→snap +6. **Storage Analysis** - See disk usage breakdown by format + +## Usage + +### Check Package Sources + +```bash +# See where a package is available +cortex pkg sources firefox + +# Output shows availability in each format with version info +``` + +### Compare Packages + +```bash +# Compare package across all formats +cortex pkg compare firefox + +# Shows versions, sizes, and installation status for each format +``` + +### List Installed Packages + +```bash +# List all packages by format +cortex pkg list + +# Filter by format +cortex pkg list --format snap +cortex pkg list --format flatpak +cortex pkg list --format deb +``` + +### View Permissions + +```bash +# View snap permissions (interfaces) +cortex pkg permissions firefox + +# View flatpak permissions +cortex pkg permissions org.mozilla.firefox --format flatpak +``` + +### Analyze Storage + +```bash +# See storage breakdown by package format +cortex pkg storage + +# Shows total size and top packages for each format +``` + +### Manage Snap Redirects + +```bash +# Check for snap redirects (apt packages that secretly install snaps) +cortex pkg snap-redirects + +# Disable snap redirects (requires sudo) +sudo cortex pkg snap-redirects --disable +``` + +## Technical Details + +### Module: `cortex/unified_package_manager.py` + +Core classes: + +- `UnifiedPackageManager` - Main manager class +- `PackageFormat` - Enum for deb/snap/flatpak +- `PackageInfo` - Package metadata dataclass +- `StorageAnalysis` - Storage breakdown dataclass + +### CLI Commands + +All commands are under the `cortex pkg` namespace: + +| Command | Description | +| --------------------------- | ----------------------- | +| `pkg sources ` | Show available sources | +| `pkg compare ` | Compare across formats | +| `pkg list [--format]` | List installed packages | +| `pkg permissions ` | View permissions | +| `pkg storage` | Storage analysis | +| `pkg snap-redirects` | Check/disable redirects | + +## Safety Notes + +> [!WARNING] +> The `--disable` option for snap-redirects modifies system configuration at `/etc/apt/apt.conf.d/20snapd.conf`. A backup is created automatically, and the change can be reverted by restoring the backup. + +## Testing + +Run unit tests: + +```bash +python -m pytest tests/test_unified_package_manager.py -v +``` + +The test suite includes 30+ test cases covering all functionality with mocked subprocess calls. diff --git a/tests/test_unified_package_manager.py b/tests/test_unified_package_manager.py new file mode 100644 index 00000000..69c0530d --- /dev/null +++ b/tests/test_unified_package_manager.py @@ -0,0 +1,529 @@ +#!/usr/bin/env python3 +""" +Unit tests for the Unified Package Manager. + +Tests cover all major functionality including: +- Package source detection (deb, snap, flatpak) +- Installed package listing +- Package comparison +- Permission management +- Snap redirect detection +- Storage analysis +""" + +import os +import sys +import unittest +from unittest.mock import MagicMock, patch + +# Add project root to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from cortex.unified_package_manager import ( + PackageFormat, + PackageInfo, + StorageAnalysis, + UnifiedPackageManager, +) + + +class TestUnifiedPackageManager(unittest.TestCase): + """Test cases for UnifiedPackageManager class.""" + + def setUp(self): + """Set up test fixtures.""" + # Mock command availability checks + with patch.object(UnifiedPackageManager, "_check_command_available") as mock_check: + mock_check.return_value = True + self.upm = UnifiedPackageManager() + + # ========================================================================= + # Package Source Detection Tests + # ========================================================================= + + @patch.object(UnifiedPackageManager, "_run_command") + def test_detect_deb_package_installed(self, mock_run): + """Test detection of installed deb package.""" + mock_run.return_value = (True, "install ok installed|1.0.0|1024", "") + + result = self.upm._check_deb_package("test-package") + + self.assertIsNotNone(result) + self.assertEqual(result.name, "test-package") + self.assertEqual(result.format, PackageFormat.DEB) + self.assertEqual(result.version, "1.0.0") + self.assertTrue(result.installed) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_detect_deb_package_available(self, mock_run): + """Test detection of available but not installed deb package.""" + # First call returns not installed, second returns apt-cache info + mock_run.side_effect = [ + (False, "", ""), + (True, "Package: test-package\nVersion: 2.0.0\nDescription: A test package", ""), + ] + + result = self.upm._check_deb_package("test-package") + + self.assertIsNotNone(result) + self.assertEqual(result.version, "2.0.0") + self.assertFalse(result.installed) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_detect_deb_transitional_package(self, mock_run): + """Test detection of transitional/dummy packages returns None.""" + mock_run.side_effect = [ + (False, "", ""), + (True, "Package: firefox\nDescription: dummy package for snap", ""), + ] + + result = self.upm._check_deb_package("firefox") + + self.assertIsNone(result) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_detect_snap_package_installed(self, mock_run): + """Test detection of installed snap package.""" + mock_run.return_value = ( + True, + "Name Version Rev Tracking Publisher Notes\nfirefox 120.0 1234 latest mozilla -", + "", + ) + + result = self.upm._check_snap_package("firefox") + + self.assertIsNotNone(result) + self.assertEqual(result.name, "firefox") + self.assertEqual(result.format, PackageFormat.SNAP) + self.assertTrue(result.installed) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_detect_snap_package_available(self, mock_run): + """Test detection of available snap package.""" + mock_run.side_effect = [ + (False, "", ""), + (True, "name: test-snap\nsummary: A test snap\nstable: 1.0.0 2024-01-01", ""), + ] + + result = self.upm._check_snap_package("test-snap") + + self.assertIsNotNone(result) + self.assertEqual(result.format, PackageFormat.SNAP) + self.assertFalse(result.installed) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_detect_flatpak_package_installed(self, mock_run): + """Test detection of installed flatpak package.""" + mock_run.return_value = ( + True, + "org.mozilla.firefox\t120.0", + "", + ) + + result = self.upm._check_flatpak_package("firefox") + + self.assertIsNotNone(result) + self.assertEqual(result.name, "org.mozilla.firefox") + self.assertEqual(result.format, PackageFormat.FLATPAK) + self.assertTrue(result.installed) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_detect_flatpak_package_available(self, mock_run): + """Test detection of available flatpak package.""" + mock_run.side_effect = [ + (True, "", ""), # Not in installed list + (True, "org.test.App\t1.0.0\tA test application", ""), + ] + + result = self.upm._check_flatpak_package("test") + + self.assertIsNotNone(result) + self.assertEqual(result.name, "org.test.App") + + @patch.object(UnifiedPackageManager, "_check_deb_package") + @patch.object(UnifiedPackageManager, "_check_snap_package") + @patch.object(UnifiedPackageManager, "_check_flatpak_package") + def test_detect_package_sources(self, mock_flatpak, mock_snap, mock_deb): + """Test combined package source detection.""" + mock_deb.return_value = PackageInfo("test", PackageFormat.DEB, "1.0") + mock_snap.return_value = PackageInfo("test", PackageFormat.SNAP, "2.0") + mock_flatpak.return_value = None + + result = self.upm.detect_package_sources("test") + + self.assertIn("deb", result) + self.assertIn("snap", result) + self.assertIn("flatpak", result) + self.assertIsNotNone(result["deb"]) + self.assertIsNotNone(result["snap"]) + self.assertIsNone(result["flatpak"]) + + # ========================================================================= + # Package Listing Tests + # ========================================================================= + + @patch.object(UnifiedPackageManager, "_run_command") + def test_list_deb_packages(self, mock_run): + """Test listing installed deb packages.""" + mock_run.return_value = ( + True, + "package1|1.0.0|1024\npackage2|2.0.0|2048\n", + "", + ) + + result = self.upm._list_deb_packages() + + self.assertEqual(len(result), 2) + self.assertEqual(result[0].name, "package1") + self.assertEqual(result[1].name, "package2") + self.assertEqual(result[0].format, PackageFormat.DEB) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_list_snap_packages(self, mock_run): + """Test listing installed snap packages.""" + mock_run.return_value = ( + True, + "Name Version Rev Tracking Publisher Notes\ncore 16-2.58 14447 latest/stable canonical✓ -\nfirefox 120.0 1234 latest mozilla -", + "", + ) + + result = self.upm._list_snap_packages() + + self.assertEqual(len(result), 2) + self.assertEqual(result[0].name, "core") + self.assertEqual(result[1].name, "firefox") + + @patch.object(UnifiedPackageManager, "_run_command") + def test_list_flatpak_packages(self, mock_run): + """Test listing installed flatpak packages.""" + mock_run.return_value = ( + True, + "org.mozilla.firefox\t120.0\t500 MB", + "", + ) + + result = self.upm._list_flatpak_packages() + + self.assertEqual(len(result), 1) + self.assertEqual(result[0].name, "org.mozilla.firefox") + self.assertEqual(result[0].format, PackageFormat.FLATPAK) + + @patch.object(UnifiedPackageManager, "_list_deb_packages") + @patch.object(UnifiedPackageManager, "_list_snap_packages") + @patch.object(UnifiedPackageManager, "_list_flatpak_packages") + def test_list_installed_packages_all(self, mock_flatpak, mock_snap, mock_deb): + """Test listing all installed packages.""" + mock_deb.return_value = [PackageInfo("pkg1", PackageFormat.DEB)] + mock_snap.return_value = [PackageInfo("pkg2", PackageFormat.SNAP)] + mock_flatpak.return_value = [PackageInfo("pkg3", PackageFormat.FLATPAK)] + + result = self.upm.list_installed_packages() + + self.assertEqual(len(result["deb"]), 1) + self.assertEqual(len(result["snap"]), 1) + self.assertEqual(len(result["flatpak"]), 1) + + @patch.object(UnifiedPackageManager, "_list_deb_packages") + @patch.object(UnifiedPackageManager, "_list_snap_packages") + @patch.object(UnifiedPackageManager, "_list_flatpak_packages") + def test_list_installed_packages_filtered(self, mock_flatpak, mock_snap, mock_deb): + """Test listing packages with format filter.""" + mock_snap.return_value = [PackageInfo("snap-pkg", PackageFormat.SNAP)] + + result = self.upm.list_installed_packages(format_filter=PackageFormat.SNAP) + + self.assertEqual(len(result["snap"]), 1) + self.assertEqual(len(result["deb"]), 0) + self.assertEqual(len(result["flatpak"]), 0) + + # ========================================================================= + # Package Comparison Tests + # ========================================================================= + + @patch.object(UnifiedPackageManager, "detect_package_sources") + def test_compare_package_options(self, mock_detect): + """Test package comparison across formats.""" + mock_detect.return_value = { + "deb": PackageInfo("test", PackageFormat.DEB, "1.0", installed=True), + "snap": PackageInfo("test", PackageFormat.SNAP, "2.0"), + "flatpak": None, + } + + result = self.upm.compare_package_options("test") + + self.assertEqual(result["package_name"], "test") + self.assertIn("deb", result["available_formats"]) + self.assertIn("snap", result["available_formats"]) + self.assertNotIn("flatpak", result["available_formats"]) + self.assertEqual(result["installed_as"], "deb") + + # ========================================================================= + # Permission Management Tests + # ========================================================================= + + @patch.object(UnifiedPackageManager, "_run_command") + def test_list_snap_permissions(self, mock_run): + """Test listing snap permissions.""" + mock_run.return_value = ( + True, + "Interface Plug Slot Notes\naudio-playback firefox:audio-playback :audio-playback -\nnetwork firefox:network - -", + "", + ) + + result = self.upm.list_snap_permissions("firefox") + + self.assertIn("connected", result) + self.assertIn("available", result) + self.assertTrue(len(result["connected"]) >= 1) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_list_flatpak_permissions(self, mock_run): + """Test listing flatpak permissions.""" + mock_run.return_value = ( + True, + "[Context]\nshared=network;ipc;\nfilesystems=xdg-download\n\n[Session Bus Policy]\norg.freedesktop.Notifications=talk", + "", + ) + + result = self.upm.list_flatpak_permissions("org.test.App") + + self.assertIn("Context", result) + self.assertIn("shared", result["Context"]) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_modify_snap_permission_connect(self, mock_run): + """Test connecting a snap interface.""" + mock_run.return_value = (True, "", "") + + success, message = self.upm.modify_snap_permission("firefox", "camera", "connect") + + self.assertTrue(success) + self.assertIn("connect", message.lower()) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_modify_snap_permission_disconnect(self, mock_run): + """Test disconnecting a snap interface.""" + mock_run.return_value = (True, "", "") + + success, message = self.upm.modify_snap_permission("firefox", "camera", "disconnect") + + self.assertTrue(success) + + def test_modify_snap_permission_invalid_action(self): + """Test invalid action for snap permission modification.""" + success, message = self.upm.modify_snap_permission("firefox", "camera", "invalid") + + self.assertFalse(success) + self.assertIn("Invalid action", message) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_modify_flatpak_permission(self, mock_run): + """Test modifying flatpak permission.""" + mock_run.return_value = (True, "", "") + + success, message = self.upm.modify_flatpak_permission( + "org.test.App", "filesystem", "home" + ) + + self.assertTrue(success) + + # ========================================================================= + # Snap Redirect Tests + # ========================================================================= + + @patch.object(UnifiedPackageManager, "_run_command") + @patch("os.path.exists") + def test_check_snap_redirects(self, mock_exists, mock_run): + """Test detection of snap redirects.""" + mock_run.return_value = ( + True, + "Package: firefox\nDescription: dummy transitional package", + "", + ) + mock_exists.return_value = True + + result = self.upm.check_snap_redirects() + + self.assertTrue(len(result) >= 1) + # Should find Firefox as transitional + firefox_redirect = next((r for r in result if r["package"] == "firefox"), None) + self.assertIsNotNone(firefox_redirect) + + @patch("pathlib.Path.exists") + @patch("os.access") + @patch("shutil.move") + def test_disable_snap_redirects_success(self, mock_move, mock_access, mock_exists): + """Test successful disabling of snap redirects.""" + mock_exists.return_value = True + mock_access.return_value = True + mock_move.return_value = None + + success, message = self.upm.disable_snap_redirects() + + self.assertTrue(success) + self.assertIn("disabled", message.lower()) + + @patch("pathlib.Path.exists") + def test_disable_snap_redirects_not_found(self, mock_exists): + """Test disabling when config doesn't exist.""" + mock_exists.return_value = False + + success, message = self.upm.disable_snap_redirects() + + self.assertTrue(success) + self.assertIn("not found", message.lower()) + + @patch("pathlib.Path.exists") + @patch("os.access") + def test_disable_snap_redirects_permission_denied(self, mock_access, mock_exists): + """Test disabling without root permissions.""" + mock_exists.return_value = True + mock_access.return_value = False + + success, message = self.upm.disable_snap_redirects() + + self.assertFalse(success) + self.assertIn("Permission denied", message) + + # ========================================================================= + # Storage Analysis Tests + # ========================================================================= + + @patch.object(UnifiedPackageManager, "_list_deb_packages") + @patch.object(UnifiedPackageManager, "_list_flatpak_packages") + @patch.object(UnifiedPackageManager, "_run_command") + def test_analyze_storage(self, mock_run, mock_flatpak, mock_deb): + """Test storage analysis.""" + mock_deb.return_value = [ + PackageInfo("pkg1", PackageFormat.DEB, "1.0", size=1024000), + PackageInfo("pkg2", PackageFormat.DEB, "2.0", size=2048000), + ] + mock_flatpak.return_value = [ + PackageInfo("org.test.App", PackageFormat.FLATPAK, "1.0", size=500000000), + ] + mock_run.return_value = (True, "", "") + + # Mock Path.exists and glob for snap analysis + with patch("pathlib.Path.exists", return_value=False): + result = self.upm.analyze_storage() + + self.assertIsInstance(result, StorageAnalysis) + self.assertEqual(result.deb_total, 3072000) + self.assertEqual(result.flatpak_total, 500000000) + + def test_parse_size_string(self): + """Test parsing of size strings.""" + self.assertEqual(self.upm._parse_size_string("1.5 GB"), int(1.5 * 1024**3)) + self.assertEqual(self.upm._parse_size_string("500 MB"), int(500 * 1024**2)) + self.assertEqual(self.upm._parse_size_string("1024 KB"), int(1024 * 1024)) + self.assertEqual(self.upm._parse_size_string(""), 0) + + def test_format_storage_analysis(self): + """Test formatting of storage analysis.""" + analysis = StorageAnalysis( + deb_total=1024**3, # 1 GB + snap_total=512 * 1024**2, # 512 MB + flatpak_total=256 * 1024**2, # 256 MB + snap_packages=[("firefox", 200 * 1024**2)], + flatpak_packages=[("org.test.App", 256 * 1024**2)], + ) + + result = self.upm.format_storage_analysis(analysis) + + self.assertIn("Storage Analysis", result) + self.assertIn("DEB/APT", result) + self.assertIn("Snap", result) + self.assertIn("Flatpak", result) + self.assertIn("GB", result) # Should show GB for large sizes + + # ========================================================================= + # Error Handling Tests + # ========================================================================= + + def test_command_not_available(self): + """Test handling when commands are not available.""" + with patch.object(UnifiedPackageManager, "_check_command_available") as mock_check: + mock_check.return_value = False + upm = UnifiedPackageManager() + + self.assertFalse(upm._snap_available) + self.assertFalse(upm._flatpak_available) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_list_snap_permissions_unavailable(self, mock_run): + """Test listing permissions when snap unavailable.""" + self.upm._snap_available = False + + result = self.upm.list_snap_permissions("test") + + self.assertIn("error", result) + + @patch.object(UnifiedPackageManager, "_run_command") + def test_list_flatpak_permissions_unavailable(self, mock_run): + """Test listing permissions when flatpak unavailable.""" + self.upm._flatpak_available = False + + result = self.upm.list_flatpak_permissions("org.test.App") + + self.assertIn("error", result) + + +class TestPackageInfo(unittest.TestCase): + """Test cases for PackageInfo dataclass.""" + + def test_package_info_creation(self): + """Test creation of PackageInfo instance.""" + info = PackageInfo( + name="test-package", + format=PackageFormat.DEB, + version="1.0.0", + size=1024, + installed=True, + description="A test package", + ) + + self.assertEqual(info.name, "test-package") + self.assertEqual(info.format, PackageFormat.DEB) + self.assertEqual(info.version, "1.0.0") + self.assertEqual(info.size, 1024) + self.assertTrue(info.installed) + + def test_package_info_defaults(self): + """Test default values for PackageInfo.""" + info = PackageInfo(name="test", format=PackageFormat.SNAP) + + self.assertEqual(info.version, "") + self.assertEqual(info.size, 0) + self.assertFalse(info.installed) + self.assertEqual(info.permissions, []) + + +class TestStorageAnalysis(unittest.TestCase): + """Test cases for StorageAnalysis dataclass.""" + + def test_storage_analysis_creation(self): + """Test creation of StorageAnalysis instance.""" + analysis = StorageAnalysis( + deb_total=1024, + snap_total=2048, + flatpak_total=4096, + ) + + self.assertEqual(analysis.deb_total, 1024) + self.assertEqual(analysis.snap_total, 2048) + self.assertEqual(analysis.flatpak_total, 4096) + + def test_storage_analysis_defaults(self): + """Test default values for StorageAnalysis.""" + analysis = StorageAnalysis() + + self.assertEqual(analysis.deb_total, 0) + self.assertEqual(analysis.snap_total, 0) + self.assertEqual(analysis.flatpak_total, 0) + self.assertEqual(analysis.deb_packages, []) + self.assertEqual(analysis.snap_packages, []) + self.assertEqual(analysis.flatpak_packages, []) + + +if __name__ == "__main__": + unittest.main() From a6caab0abdb72f5bd412787768f0046677cd53d3 Mon Sep 17 00:00:00 2001 From: Piyushrathoree Date: Mon, 12 Jan 2026 00:01:40 +0000 Subject: [PATCH 02/13] fix: add optional slot parameter to modify_snap_permission Addresses CodeRabbit feedback --- cortex/unified_package_manager.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/cortex/unified_package_manager.py b/cortex/unified_package_manager.py index 06fffa7d..4658b91f 100644 --- a/cortex/unified_package_manager.py +++ b/cortex/unified_package_manager.py @@ -182,7 +182,10 @@ def _check_deb_package(self, package_name: str) -> PackageInfo | None: version = line.split(":", 1)[1].strip() elif line.startswith("Installed-Size:"): size_str = line.split(":", 1)[1].strip() - size = int(size_str) * 1024 if size_str.isdigit() else 0 + try: + size = int(size_str) * 1024 + except ValueError: + size = 0 elif line.startswith("Description:"): description = line.split(":", 1)[1].strip() @@ -556,18 +559,28 @@ def list_flatpak_permissions(self, app_id: str) -> dict[str, Any]: return permissions def modify_snap_permission( - self, snap_name: str, interface: str, action: str + self, snap_name: str, interface: str, action: str, slot: str | None = None ) -> tuple[bool, str]: """ Modify a snap permission (connect/disconnect an interface). Args: snap_name: Name of the snap package - interface: Interface to modify + interface: Interface/plug to modify action: 'connect' or 'disconnect' + slot: Optional slot specification. If not provided, connects to the + system/core slot with matching interface name (e.g., :audio-playback). + For connecting to slots in other snaps, use format "snap-name:slot-name". Returns: Tuple of (success, message) + + Example: + # Connect to system slot (most common case) + modify_snap_permission("firefox", "camera", "connect") + + # Connect to specific snap's slot + modify_snap_permission("myapp", "content", "connect", "other-snap:data") """ if action not in ("connect", "disconnect"): return False, f"Invalid action: {action}. Use 'connect' or 'disconnect'" @@ -575,7 +588,13 @@ def modify_snap_permission( if not self._snap_available: return False, "Snap is not available on this system" - cmd = ["snap", action, f"{snap_name}:{interface}"] + # Build command with optional slot specification + plug_spec = f"{snap_name}:{interface}" + if slot: + cmd = ["snap", action, plug_spec, slot] + else: + cmd = ["snap", action, plug_spec] + success, stdout, stderr = self._run_command(cmd) if success: From c37662c24ad3f886cfa486888b88e695fac51b4d Mon Sep 17 00:00:00 2001 From: Piyushrathoree Date: Mon, 12 Jan 2026 00:32:48 +0000 Subject: [PATCH 03/13] update unified_package_manager.py --- cortex/unified_package_manager.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cortex/unified_package_manager.py b/cortex/unified_package_manager.py index 4658b91f..5e1a6e86 100644 --- a/cortex/unified_package_manager.py +++ b/cortex/unified_package_manager.py @@ -7,7 +7,6 @@ - Snap (Canonical's universal packages) - Flatpak (cross-distribution application packages) -Addresses issue #450: Snap/Flatpak confusion and transparency. """ import json From abc067c5d4552fa6c4e79b3a2c6b1a6e861bb4fc Mon Sep 17 00:00:00 2001 From: Piyushrathoree Date: Tue, 13 Jan 2026 15:44:54 +0000 Subject: [PATCH 04/13] made some changes suggested by coderabbit ai --- cortex/unified_package_manager.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/cortex/unified_package_manager.py b/cortex/unified_package_manager.py index 5e1a6e86..96ff709e 100644 --- a/cortex/unified_package_manager.py +++ b/cortex/unified_package_manager.py @@ -206,17 +206,20 @@ def _check_snap_package(self, package_name: str) -> PackageInfo | None: # Check if installed success, stdout, _ = self._run_command(["snap", "list", package_name]) - if success and package_name in stdout: + if success: lines = stdout.strip().split("\n") - if len(lines) > 1: - parts = lines[1].split() - version = parts[1] if len(parts) > 1 else "" - return PackageInfo( - name=package_name, - format=PackageFormat.SNAP, - version=version, - installed=True, - ) + for line in lines[1:]: + if not line: + continue + parts = line.split() + if parts and parts[0] == package_name: + version = parts[1] if len(parts) > 1 else "" + return PackageInfo( + name=package_name, + format=PackageFormat.SNAP, + version=version, + installed=True, + ) # Check if available in snap store success, stdout, _ = self._run_command(["snap", "info", package_name]) @@ -483,7 +486,7 @@ def list_snap_permissions(self, snap_name: str) -> dict[str, list[dict[str, str] Dictionary with 'connected' and 'available' interface lists """ if not self._snap_available: - return {"error": "Snap is not available on this system"} + raise RuntimeError("Snap is not available on this system") success, stdout, _ = self._run_command(["snap", "connections", snap_name]) From f0e4905a026e1ab28dc9ca6c2930917369cb5667 Mon Sep 17 00:00:00 2001 From: Piyushrathoree Date: Tue, 13 Jan 2026 15:50:33 +0000 Subject: [PATCH 05/13] fixed linting issues and fix broken test --- cortex/cli.py | 4 +-- cortex/unified_package_manager.py | 44 ++++++++++++++------------- tests/test_unified_package_manager.py | 9 +++--- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/cortex/cli.py b/cortex/cli.py index d7247119..53740b5d 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2467,9 +2467,7 @@ def main(): # -------------------------- # --- Unified Package Manager Commands (Issue #450) --- - pkg_parser = subparsers.add_parser( - "pkg", help="Unified package manager (snap/flatpak/deb)" - ) + pkg_parser = subparsers.add_parser("pkg", help="Unified package manager (snap/flatpak/deb)") pkg_subs = pkg_parser.add_subparsers(dest="pkg_action", help="Package manager actions") # pkg sources diff --git a/cortex/unified_package_manager.py b/cortex/unified_package_manager.py index 96ff709e..9746e893 100644 --- a/cortex/unified_package_manager.py +++ b/cortex/unified_package_manager.py @@ -92,9 +92,7 @@ def _check_command_available(self, command: str) -> bool: """Check if a command is available on the system.""" return shutil.which(command) is not None - def _run_command( - self, cmd: list[str], timeout: int = 30 - ) -> tuple[bool, str, str]: + def _run_command(self, cmd: list[str], timeout: int = 30) -> tuple[bool, str, str]: """ Run a shell command and return success status, stdout, stderr. @@ -531,9 +529,7 @@ def list_flatpak_permissions(self, app_id: str) -> dict[str, Any]: if not self._flatpak_available: return {"error": "Flatpak is not available on this system"} - success, stdout, _ = self._run_command( - ["flatpak", "info", "--show-permissions", app_id] - ) + success, stdout, _ = self._run_command(["flatpak", "info", "--show-permissions", app_id]) if not success: return {"error": f"Could not get permissions for {app_id}"} @@ -648,25 +644,31 @@ def check_snap_redirects(self) -> list[dict[str, str]]: success, stdout, _ = self._run_command(["apt-cache", "show", pkg]) if success: if "dummy" in stdout.lower() or "transitional" in stdout.lower(): - redirects.append({ - "package": pkg, - "type": "transitional", - "reason": "APT package is a dummy that installs snap version", - }) + redirects.append( + { + "package": pkg, + "type": "transitional", + "reason": "APT package is a dummy that installs snap version", + } + ) elif "snap" in stdout.lower(): - redirects.append({ - "package": pkg, - "type": "snap_meta", - "reason": "Package description mentions snap installation", - }) + redirects.append( + { + "package": pkg, + "type": "snap_meta", + "reason": "Package description mentions snap installation", + } + ) # Check snap preference config file if os.path.exists(self.SNAP_REDIRECT_CONFIG): - redirects.append({ - "package": "system", - "type": "config", - "reason": f"Snap preference config exists: {self.SNAP_REDIRECT_CONFIG}", - }) + redirects.append( + { + "package": "system", + "type": "config", + "reason": f"Snap preference config exists: {self.SNAP_REDIRECT_CONFIG}", + } + ) return redirects diff --git a/tests/test_unified_package_manager.py b/tests/test_unified_package_manager.py index 69c0530d..daed24b7 100644 --- a/tests/test_unified_package_manager.py +++ b/tests/test_unified_package_manager.py @@ -321,9 +321,7 @@ def test_modify_flatpak_permission(self, mock_run): """Test modifying flatpak permission.""" mock_run.return_value = (True, "", "") - success, message = self.upm.modify_flatpak_permission( - "org.test.App", "filesystem", "home" - ) + success, message = self.upm.modify_flatpak_permission("org.test.App", "filesystem", "home") self.assertTrue(success) @@ -454,9 +452,10 @@ def test_list_snap_permissions_unavailable(self, mock_run): """Test listing permissions when snap unavailable.""" self.upm._snap_available = False - result = self.upm.list_snap_permissions("test") + with self.assertRaises(RuntimeError) as context: + self.upm.list_snap_permissions("test") - self.assertIn("error", result) + self.assertIn("not available", str(context.exception)) @patch.object(UnifiedPackageManager, "_run_command") def test_list_flatpak_permissions_unavailable(self, mock_run): From ab8c618042966ccadf05c6a99eca17d44a4a9962 Mon Sep 17 00:00:00 2001 From: PIYUSH RATHORE <163632958+Piyushrathoree@users.noreply.github.com> Date: Wed, 14 Jan 2026 01:21:43 +0530 Subject: [PATCH 06/13] Update cortex/unified_package_manager.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- cortex/unified_package_manager.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/cortex/unified_package_manager.py b/cortex/unified_package_manager.py index 9746e893..ac6dd285 100644 --- a/cortex/unified_package_manager.py +++ b/cortex/unified_package_manager.py @@ -537,25 +537,27 @@ def list_flatpak_permissions(self, app_id: str) -> dict[str, Any]: permissions: dict[str, Any] = {} current_section = "" + for line in stdout.strip().split("\n"): for line in stdout.strip().split("\n"): if not line: continue if line.startswith("[") and line.endswith("]"): current_section = line[1:-1] - permissions[current_section] = {} elif "=" in line and current_section: - key, value = line.split("=", 1) - permissions[current_section][key] = value + if current_section not in permissions: + permissions[current_section] = {} + + if isinstance(permissions[current_section], dict): + key, value = line.split("=", 1) + permissions[current_section][key] = value elif current_section: - # Single value without key if current_section not in permissions: permissions[current_section] = [] + if isinstance(permissions[current_section], list): permissions[current_section].append(line) - return permissions - def modify_snap_permission( self, snap_name: str, interface: str, action: str, slot: str | None = None ) -> tuple[bool, str]: From b46edbfd110f89946993f536b7cc50e4e9b01959 Mon Sep 17 00:00:00 2001 From: Piyushrathoree Date: Tue, 13 Jan 2026 21:08:43 +0000 Subject: [PATCH 07/13] addressed all the code reviewer ai comment --- cortex/cli.py | 87 +++++++++++++++++++++------ cortex/unified_package_manager.py | 63 +++++++++++++------ tests/test_unified_package_manager.py | 64 +++++++++++++++++--- 3 files changed, 169 insertions(+), 45 deletions(-) diff --git a/cortex/cli.py b/cortex/cli.py index 8971acc7..e99a6858 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2040,7 +2040,7 @@ def pkg(self, args: argparse.Namespace) -> int: self._print_error(f"Unknown pkg action: {action}") return 1 - def _pkg_sources(self, upm, args: argparse.Namespace) -> int: + def _pkg_sources(self, upm: "UnifiedPackageManager", args: argparse.Namespace) -> int: """Show available sources for a package.""" package = args.package cx_print(f"\n🔍 Checking sources for '{package}'...\n", "info") @@ -2124,11 +2124,22 @@ def _pkg_permissions(self, upm, args: argparse.Namespace) -> int: cx_print(f"\n🔐 Permissions for '{package}'\n", "info") - if fmt == "flatpak" or "." in package: - # Likely a flatpak app ID - perms = upm.list_flatpak_permissions(package) - if "error" in perms: - self._print_error(perms["error"]) + # Determine format: explicit flag > detect from installed packages + if fmt is None: + # Check where package is actually installed + sources = upm.detect_package_sources(package) + if sources.get("flatpak") and sources["flatpak"].installed: + fmt = "flatpak" + elif sources.get("snap") and sources["snap"].installed: + fmt = "snap" + else: + fmt = "flatpak" if "." in package and package.count(".") >= 2 else "snap" + + if fmt == "flatpak": + try: + perms = upm.list_flatpak_permissions(package) + except RuntimeError as e: + self._print_error(str(e)) return 1 for section, values in perms.items(): @@ -2140,18 +2151,19 @@ def _pkg_permissions(self, upm, args: argparse.Namespace) -> int: for v in values: console.print(f" {v}") else: - # Assume snap - perms = upm.list_snap_permissions(package) - if "error" in perms: - self._print_error(perms["error"]) + # Snap + try: + perms = upm.list_snap_permissions(package) + except RuntimeError as e: + self._print_error(str(e)) return 1 - if perms["connected"]: + if perms.get("connected"): console.print("[green]Connected Interfaces:[/green]") for conn in perms["connected"]: console.print(f" • {conn['interface']}: {conn['plug']} → {conn['slot']}") - if perms["available"]: + if perms.get("available"): console.print("\n[dim]Available (not connected):[/dim]") for avail in perms["available"]: console.print(f" • {avail['interface']}") @@ -2173,13 +2185,52 @@ def _pkg_snap_redirects(self, upm, args: argparse.Namespace) -> int: disable = getattr(args, "disable", False) if disable: + # Show warning and get confirmation + console.print("\n[yellow]⚠️ This operation will modify system configuration:[/yellow]") + console.print(" File: /etc/apt/apt.conf.d/20snapd.conf") + console.print(" Action: Move to .disabled (backup created)") + console.print("\n[dim]This requires elevated privileges (sudo).[/dim]\n") + + try: + confirm = input("Proceed with disabling snap redirects? [y/N]: ").strip().lower() + if confirm not in ("y", "yes"): + cx_print("Operation cancelled", "info") + return 0 + except (EOFError, KeyboardInterrupt): + console.print() + cx_print("Operation cancelled", "info") + return 0 + cx_print("\n⚠️ Disabling snap redirects...\n", "warning") - success, message = upm.disable_snap_redirects() - if success: - cx_print(message, "success") - else: - self._print_error(message) - return 0 if success else 1 + + # Record to installation history + from datetime import datetime + history = InstallationHistory() + start_time = datetime.now() + + try: + install_id = history.record_installation( + operation_type=InstallationType.REMOVE, + packages=["snap-redirects"], + commands=["disable_snap_redirects"], + start_time=start_time, + ) + + success, message = upm.disable_snap_redirects() + + if success: + history.update_installation(install_id, InstallationStatus.SUCCESS) + cx_print(message, "success") + else: + history.update_installation(install_id, InstallationStatus.FAILED, message) + self._print_error(message) + + return 0 if success else 1 + + except Exception as e: + history.update_installation(install_id, InstallationStatus.FAILED, str(e)) + self._print_error(f"Failed to disable snap redirects: {e}") + return 1 cx_print("\n🔍 Checking for snap redirects...\n", "info") redirects = upm.check_snap_redirects() diff --git a/cortex/unified_package_manager.py b/cortex/unified_package_manager.py index ac6dd285..32341adb 100644 --- a/cortex/unified_package_manager.py +++ b/cortex/unified_package_manager.py @@ -7,9 +7,8 @@ - Snap (Canonical's universal packages) - Flatpak (cross-distribution application packages) -""" +""" -import json import logging import os import re @@ -152,7 +151,7 @@ def _check_deb_package(self, package_name: str) -> PackageInfo | None: ) if success and "install ok installed" in stdout: - parts = stdout.strip().split("|") + parts = stdout.strip().split("|",2) version = parts[1] if len(parts) > 1 else "" size = int(parts[2]) * 1024 if len(parts) > 2 and parts[2].isdigit() else 0 return PackageInfo( @@ -241,20 +240,31 @@ def _check_snap_package(self, package_name: str) -> PackageInfo | None: return None def _check_flatpak_package(self, package_name: str) -> PackageInfo | None: - """Check if package is available as flatpak.""" + """ + Check if package is available as flatpak. + + Note: Returns only the first match if multiple flatpaks match the search term. + For full search results, use `flatpak search ` directly. + """ if not self._flatpak_available: return None - # Check if installed (by partial match) + # Check if installed (match by full app ID or app basename) success, stdout, _ = self._run_command( ["flatpak", "list", "--app", "--columns=application,version"] ) if success: for line in stdout.strip().split("\n"): - if package_name.lower() in line.lower(): - parts = line.split("\t") - app_id = parts[0] if parts else "" - version = parts[1] if len(parts) > 1 else "" + if not line: + continue + parts = line.split("\t") + app_id = parts[0].strip() if parts else "" + if not app_id: + continue + # Match exact app ID or exact basename (e.g., "firefox" matches "org.mozilla.firefox") + app_basename = app_id.split(".")[-1].lower() + if app_id.lower() == package_name.lower() or app_basename == package_name.lower(): + version = parts[1].strip() if len(parts) > 1 else "" return PackageInfo( name=app_id, format=PackageFormat.FLATPAK, @@ -525,19 +535,24 @@ def list_flatpak_permissions(self, app_id: str) -> dict[str, Any]: Returns: Dictionary with permission categories and values + + Raises: + RuntimeError: If flatpak is not available or permissions cannot be retrieved """ if not self._flatpak_available: - return {"error": "Flatpak is not available on this system"} + raise RuntimeError("Flatpak is not available on this system") success, stdout, _ = self._run_command(["flatpak", "info", "--show-permissions", app_id]) if not success: - return {"error": f"Could not get permissions for {app_id}"} + raise RuntimeError(f"Could not get permissions for {app_id}") permissions: dict[str, Any] = {} current_section = "" - for line in stdout.strip().split("\n"): + # Parse permission sections - each section is either all key=value pairs (dict) + # or all standalone values (list). Mixed sections are not supported as flatpak + # permissions don't use mixed formats within a single section. for line in stdout.strip().split("\n"): if not line: continue @@ -547,20 +562,20 @@ def list_flatpak_permissions(self, app_id: str) -> dict[str, Any]: elif "=" in line and current_section: if current_section not in permissions: permissions[current_section] = {} - + if isinstance(permissions[current_section], dict): key, value = line.split("=", 1) permissions[current_section][key] = value elif current_section: if current_section not in permissions: permissions[current_section] = [] - + if isinstance(permissions[current_section], list): permissions[current_section].append(line) - def modify_snap_permission( - self, snap_name: str, interface: str, action: str, slot: str | None = None - ) -> tuple[bool, str]: + return permissions + + def modify_snap_permission(self, snap_name: str, interface: str, action: str, slot: str | None = None) -> tuple[bool, str]: """ Modify a snap permission (connect/disconnect an interface). @@ -606,12 +621,13 @@ def modify_flatpak_permission( self, app_id: str, permission: str, value: str ) -> tuple[bool, str]: """ - Modify a flatpak permission using flatpak override. + Modify a flatpak permission using `flatpak override`. Args: app_id: Flatpak application ID - permission: Permission flag (e.g., '--filesystem=home') - value: Value for the permission + permission: Permission name without leading dashes (e.g., 'filesystem') + value: Value/argument for the permission (e.g., 'home' to produce + '--filesystem=home') Returns: Tuple of (success, message) @@ -698,6 +714,8 @@ def disable_snap_redirects(self, backup: bool = True) -> tuple[bool, str]: try: if backup: backup_path = config_path.with_suffix(".conf.disabled") + if backup_path.exists(): + return True, f"Snap redirects already disabled. Existing backup preserved at: {backup_path}" shutil.move(str(config_path), str(backup_path)) return True, f"Snap redirects disabled. Backup saved to: {backup_path}" else: @@ -818,6 +836,11 @@ def format_size(size: int) -> str: lines.append(f" Flatpak: {format_size(analysis.flatpak_total)}") # Top packages per format + if analysis.deb_packages: + lines.append("\nTop DEB Packages:") + for name, size in analysis.deb_packages[:5]: + lines.append(f" {name}: {format_size(size)}") + if analysis.snap_packages: lines.append("\nTop Snap Packages:") for name, size in analysis.snap_packages[:5]: diff --git a/tests/test_unified_package_manager.py b/tests/test_unified_package_manager.py index daed24b7..3f396444 100644 --- a/tests/test_unified_package_manager.py +++ b/tests/test_unified_package_manager.py @@ -14,7 +14,7 @@ import os import sys import unittest -from unittest.mock import MagicMock, patch +from unittest.mock import patch # Add project root to path sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) @@ -274,21 +274,31 @@ def test_list_snap_permissions(self, mock_run): self.assertIn("connected", result) self.assertIn("available", result) - self.assertTrue(len(result["connected"]) >= 1) + self.assertGreaterEqual(len(result["connected"]), 1) @patch.object(UnifiedPackageManager, "_run_command") def test_list_flatpak_permissions(self, mock_run): """Test listing flatpak permissions.""" mock_run.return_value = ( True, - "[Context]\nshared=network;ipc;\nfilesystems=xdg-download\n\n[Session Bus Policy]\norg.freedesktop.Notifications=talk", + "[Context]\nshared=network;ipc;\n\n[filesystems]\nhome\nxdg-data/themes\n\n[Session Bus Policy]\norg.freedesktop.Notifications=talk\n", "", ) result = self.upm.list_flatpak_permissions("org.test.App") self.assertIn("Context", result) - self.assertIn("shared", result["Context"]) + self.assertIsInstance(result["Context"], dict) + self.assertEqual(result["Context"]["shared"], "network;ipc;") + + self.assertIn("filesystems", result) + self.assertIsInstance(result["filesystems"], list) + self.assertEqual(len(result["filesystems"]), 2) + self.assertIn("home", result["filesystems"]) + + self.assertIn("Session Bus Policy", result) + self.assertIsInstance(result["Session Bus Policy"], dict) + self.assertEqual(result["Session Bus Policy"]["org.freedesktop.Notifications"], "talk") @patch.object(UnifiedPackageManager, "_run_command") def test_modify_snap_permission_connect(self, mock_run): @@ -342,7 +352,7 @@ def test_check_snap_redirects(self, mock_exists, mock_run): result = self.upm.check_snap_redirects() - self.assertTrue(len(result) >= 1) + self.assertGreaterEqual(len(result), 1) # Should find Firefox as transitional firefox_redirect = next((r for r in result if r["package"] == "firefox"), None) self.assertIsNotNone(firefox_redirect) @@ -383,6 +393,45 @@ def test_disable_snap_redirects_permission_denied(self, mock_access, mock_exists self.assertFalse(success) self.assertIn("Permission denied", message) + @patch("pathlib.Path.exists") + @patch("shutil.move") + def test_restore_snap_redirects_success(self, mock_move, mock_exists): + """Test successful restore of snap redirects from backup.""" + # backup exists, config doesn't exist + mock_exists.side_effect = lambda: True # backup exists + + with patch("pathlib.Path.exists") as mock_path_exists: + # First call for backup_path.exists() = True, second for config_path.exists() = False + mock_path_exists.side_effect = [True, False] + mock_move.return_value = None + + success, message = self.upm.restore_snap_redirects() + + self.assertTrue(success) + self.assertIn("restored", message.lower()) + mock_move.assert_called_once() + + @patch("pathlib.Path.exists") + def test_restore_snap_redirects_no_backup(self, mock_exists): + """Test restore when no backup exists.""" + mock_exists.return_value = False + + success, message = self.upm.restore_snap_redirects() + + self.assertFalse(success) + self.assertIn("No backup found", message) + + @patch("pathlib.Path.exists") + def test_restore_snap_redirects_config_exists(self, mock_exists): + """Test restore when config already exists.""" + # backup exists, config also exists + mock_exists.side_effect = [True, True] + + success, message = self.upm.restore_snap_redirects() + + self.assertFalse(success) + self.assertIn("already exists", message) + # ========================================================================= # Storage Analysis Tests # ========================================================================= @@ -462,9 +511,10 @@ def test_list_flatpak_permissions_unavailable(self, mock_run): """Test listing permissions when flatpak unavailable.""" self.upm._flatpak_available = False - result = self.upm.list_flatpak_permissions("org.test.App") + with self.assertRaises(RuntimeError) as context: + self.upm.list_flatpak_permissions("org.test.App") - self.assertIn("error", result) + self.assertIn("not available", str(context.exception)) class TestPackageInfo(unittest.TestCase): From 8e493c7574fe606ef9d19cc951ee48ca4f2bbdee Mon Sep 17 00:00:00 2001 From: Piyushrathoree Date: Tue, 13 Jan 2026 21:09:29 +0000 Subject: [PATCH 08/13] fixed linting --- cortex/cli.py | 1 + cortex/unified_package_manager.py | 13 +++++++++---- tests/test_unified_package_manager.py | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/cortex/cli.py b/cortex/cli.py index e99a6858..f2a49421 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2205,6 +2205,7 @@ def _pkg_snap_redirects(self, upm, args: argparse.Namespace) -> int: # Record to installation history from datetime import datetime + history = InstallationHistory() start_time = datetime.now() diff --git a/cortex/unified_package_manager.py b/cortex/unified_package_manager.py index 32341adb..fb7d2b3a 100644 --- a/cortex/unified_package_manager.py +++ b/cortex/unified_package_manager.py @@ -7,7 +7,7 @@ - Snap (Canonical's universal packages) - Flatpak (cross-distribution application packages) -""" +""" import logging import os @@ -151,7 +151,7 @@ def _check_deb_package(self, package_name: str) -> PackageInfo | None: ) if success and "install ok installed" in stdout: - parts = stdout.strip().split("|",2) + parts = stdout.strip().split("|", 2) version = parts[1] if len(parts) > 1 else "" size = int(parts[2]) * 1024 if len(parts) > 2 and parts[2].isdigit() else 0 return PackageInfo( @@ -575,7 +575,9 @@ def list_flatpak_permissions(self, app_id: str) -> dict[str, Any]: return permissions - def modify_snap_permission(self, snap_name: str, interface: str, action: str, slot: str | None = None) -> tuple[bool, str]: + def modify_snap_permission( + self, snap_name: str, interface: str, action: str, slot: str | None = None + ) -> tuple[bool, str]: """ Modify a snap permission (connect/disconnect an interface). @@ -715,7 +717,10 @@ def disable_snap_redirects(self, backup: bool = True) -> tuple[bool, str]: if backup: backup_path = config_path.with_suffix(".conf.disabled") if backup_path.exists(): - return True, f"Snap redirects already disabled. Existing backup preserved at: {backup_path}" + return ( + True, + f"Snap redirects already disabled. Existing backup preserved at: {backup_path}", + ) shutil.move(str(config_path), str(backup_path)) return True, f"Snap redirects disabled. Backup saved to: {backup_path}" else: diff --git a/tests/test_unified_package_manager.py b/tests/test_unified_package_manager.py index 3f396444..c5bdfdca 100644 --- a/tests/test_unified_package_manager.py +++ b/tests/test_unified_package_manager.py @@ -399,7 +399,7 @@ def test_restore_snap_redirects_success(self, mock_move, mock_exists): """Test successful restore of snap redirects from backup.""" # backup exists, config doesn't exist mock_exists.side_effect = lambda: True # backup exists - + with patch("pathlib.Path.exists") as mock_path_exists: # First call for backup_path.exists() = True, second for config_path.exists() = False mock_path_exists.side_effect = [True, False] From 41bc08524b9f903ce8e1d4cddb89572f6200fe34 Mon Sep 17 00:00:00 2001 From: Piyushrathoree Date: Tue, 13 Jan 2026 21:13:13 +0000 Subject: [PATCH 09/13] fixed lint --- cortex/cli.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cortex/cli.py b/cortex/cli.py index f2a49421..8a18e6d8 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -28,6 +28,7 @@ if TYPE_CHECKING: from cortex.shell_env_analyzer import ShellEnvironmentAnalyzer + from cortex.unified_package_manager import UnifiedPackageManager # Suppress noisy log messages in normal operation logging.getLogger("httpx").setLevel(logging.WARNING) From 9716d335638b98a11b2c8e343325643d9965c7d5 Mon Sep 17 00:00:00 2001 From: Piyushrathoree Date: Tue, 13 Jan 2026 21:40:58 +0000 Subject: [PATCH 10/13] update docstring for sudo provilleges --- cortex/unified_package_manager.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cortex/unified_package_manager.py b/cortex/unified_package_manager.py index fb7d2b3a..860da099 100644 --- a/cortex/unified_package_manager.py +++ b/cortex/unified_package_manager.py @@ -581,6 +581,11 @@ def modify_snap_permission( """ Modify a snap permission (connect/disconnect an interface). + Note: + This operation may require elevated privileges (sudo) for system-level + interface modifications. The caller (CLI layer) should request user + confirmation before invoking this method with privilege escalation. + Args: snap_name: Name of the snap package interface: Interface/plug to modify @@ -590,7 +595,8 @@ def modify_snap_permission( For connecting to slots in other snaps, use format "snap-name:slot-name". Returns: - Tuple of (success, message) + Tuple of (success, message). On permission errors, success=False with + an error message indicating privilege requirements. Example: # Connect to system slot (most common case) From 9d82673c995c5ca840f9964187602f888177b1ab Mon Sep 17 00:00:00 2001 From: Piyushrathoree Date: Tue, 13 Jan 2026 21:53:58 +0000 Subject: [PATCH 11/13] improved logging --- cortex/cli.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cortex/cli.py b/cortex/cli.py index 8a18e6d8..ffda4054 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2210,6 +2210,7 @@ def _pkg_snap_redirects(self, upm, args: argparse.Namespace) -> int: history = InstallationHistory() start_time = datetime.now() + install_id = None try: install_id = history.record_installation( operation_type=InstallationType.REMOVE, @@ -2230,7 +2231,8 @@ def _pkg_snap_redirects(self, upm, args: argparse.Namespace) -> int: return 0 if success else 1 except Exception as e: - history.update_installation(install_id, InstallationStatus.FAILED, str(e)) + if install_id is not None: + history.update_installation(install_id, InstallationStatus.FAILED, str(e)) self._print_error(f"Failed to disable snap redirects: {e}") return 1 From 2e92c3a646e6ccb434a752a4fd7dacc9592167e8 Mon Sep 17 00:00:00 2001 From: Piyushrathoree Date: Thu, 15 Jan 2026 10:10:43 +0000 Subject: [PATCH 12/13] update tests for snap redirect no backup --- tests/test_unified_package_manager.py | 28 +++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/test_unified_package_manager.py b/tests/test_unified_package_manager.py index c5bdfdca..aada8496 100644 --- a/tests/test_unified_package_manager.py +++ b/tests/test_unified_package_manager.py @@ -393,6 +393,20 @@ def test_disable_snap_redirects_permission_denied(self, mock_access, mock_exists self.assertFalse(success) self.assertIn("Permission denied", message) + @patch("pathlib.Path.exists") + @patch("os.access") + def test_disable_snap_redirects_backup_already_exists(self, mock_access, mock_exists): + """Test disabling when backup already exists - should preserve existing backup.""" + # config_path.exists() = True, then backup_path.exists() = True + mock_exists.side_effect = [True, True] + mock_access.return_value = True + + success, message = self.upm.disable_snap_redirects() + + self.assertTrue(success) + self.assertIn("already disabled", message.lower()) + self.assertIn("preserved", message.lower()) + @patch("pathlib.Path.exists") @patch("shutil.move") def test_restore_snap_redirects_success(self, mock_move, mock_exists): @@ -432,6 +446,20 @@ def test_restore_snap_redirects_config_exists(self, mock_exists): self.assertFalse(success) self.assertIn("already exists", message) + @patch("pathlib.Path.exists") + @patch("shutil.move") + def test_restore_snap_redirects_move_failure(self, mock_move, mock_exists): + """Test restore when shutil.move raises an exception.""" + # backup exists, config doesn't exist + mock_exists.side_effect = [True, False] + mock_move.side_effect = OSError("Permission denied") + + success, message = self.upm.restore_snap_redirects() + + self.assertFalse(success) + self.assertIn("Failed to restore", message) + self.assertIn("Permission denied", message) + # ========================================================================= # Storage Analysis Tests # ========================================================================= From 39b7b46083486070006ce69c21e3ed3173644891 Mon Sep 17 00:00:00 2001 From: Piyushrathoree Date: Thu, 15 Jan 2026 10:23:19 +0000 Subject: [PATCH 13/13] update cli.py and linting --- cortex/cli.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cortex/cli.py b/cortex/cli.py index ffda4054..110c78d7 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2827,6 +2827,11 @@ def main(): return cli.env(args) elif args.command == "pkg": return cli.pkg(args) + elif args.command == "upgrade": + from cortex.licensing import open_upgrade_page + + open_upgrade_page() + return 0 else: parser.print_help() return 1