Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion cortex/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
from .cli import main
from .env_loader import load_env
from .packages import PackageManager, PackageManagerType
from .unified_package_manager import (
PackageFormat,
PackageInfo,
StorageAnalysis,
UnifiedPackageManager,
)

__version__ = "0.1.0"

__all__ = ["main", "load_env", "PackageManager", "PackageManagerType"]
__all__ = [
"main",
"load_env",
"PackageManager",
"PackageManagerType",
"UnifiedPackageManager",
"PackageFormat",
"PackageInfo",
"StorageAnalysis",
]
297 changes: 297 additions & 0 deletions cortex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

if TYPE_CHECKING:
from cortex.shell_env_analyzer import ShellEnvironmentAnalyzer
from cortex.unified_package_manager import UnifiedPackageManager

# Suppress noisy log messages in normal operation
logging.getLogger("httpx").setLevel(logging.WARNING)
Expand Down Expand Up @@ -2825,6 +2826,256 @@ def progress_callback(current: int, total: int, step: InstallationStep) -> None:
console.print(f"Error: {result.error_message}", style="red")
return 1

# --- Unified Package Manager Commands (Issue #450) ---
def pkg(self, args: argparse.Namespace) -> int:
"""Handle unified package manager commands for snap/flatpak/deb."""
from cortex.unified_package_manager import (
PackageFormat,
UnifiedPackageManager,
)

upm = UnifiedPackageManager()
action = getattr(args, "pkg_action", None)

if not action:
cx_print("\n📦 Unified Package Manager - Snap/Flatpak/Deb\n", "info")
console.print("Usage: cortex pkg <command> [options]")
console.print("\nCommands:")
console.print(" sources <package> Show available sources for a package")
console.print(" compare <package> Compare package across formats")
console.print(" list [--format FORMAT] List installed packages")
console.print(" permissions <package> Show package permissions")
console.print(" storage Analyze storage by format")
console.print(" snap-redirects [--disable] Check/disable snap redirects")
return 0

if action == "sources":
return self._pkg_sources(upm, args)
elif action == "compare":
return self._pkg_compare(upm, args)
elif action == "list":
return self._pkg_list(upm, args)
elif action == "permissions":
return self._pkg_permissions(upm, args)
elif action == "storage":
return self._pkg_storage(upm)
elif action == "snap-redirects":
return self._pkg_snap_redirects(upm, args)
else:
self._print_error(f"Unknown pkg action: {action}")
return 1

def _pkg_sources(self, upm: "UnifiedPackageManager", args: argparse.Namespace) -> int:
"""Show available sources for a package."""
package = args.package
cx_print(f"\n🔍 Checking sources for '{package}'...\n", "info")

sources = upm.detect_package_sources(package)

found_any = False
for format_name, info in sources.items():
if info is not None:
found_any = True
status = "[green]✓ Installed[/green]" if info.installed else "[dim]Available[/dim]"
console.print(f" [{format_name.upper()}] {status}")
console.print(f" Version: {info.version or 'N/A'}")
if info.description:
console.print(f" {info.description[:60]}...")

if not found_any:
cx_print(f"No sources found for '{package}'", "warning")

return 0

def _pkg_compare(self, upm, args: argparse.Namespace) -> int:
"""Compare package across formats."""
package = args.package
cx_print(f"\n📊 Comparing '{package}' across formats...\n", "info")

comparison = upm.compare_package_options(package)

if not comparison["available_formats"]:
cx_print(f"Package '{package}' not found in any format", "warning")
return 1

console.print(f"[bold]Package:[/bold] {comparison['package_name']}")
if comparison["installed_as"]:
console.print(f"[bold]Installed as:[/bold] {comparison['installed_as']}")
console.print(f"[bold]Available in:[/bold] {', '.join(comparison['available_formats'])}")
console.print()

# Show comparison table
for fmt, data in comparison["comparison"].items():
status = "[green]Installed[/green]" if data["installed"] else "[dim]Not installed[/dim]"
console.print(f"[cyan]{fmt.upper()}[/cyan]: {status}")
console.print(f" Version: {data['version'] or 'N/A'}")
if data["size"]:
size_mb = data["size"] / (1024 * 1024)
console.print(f" Size: {size_mb:.1f} MB")

return 0

def _pkg_list(self, upm, args: argparse.Namespace) -> int:
"""List installed packages by format."""
from cortex.unified_package_manager import PackageFormat

format_filter = None
if hasattr(args, "format") and args.format:
format_map = {
"deb": PackageFormat.DEB,
"snap": PackageFormat.SNAP,
"flatpak": PackageFormat.FLATPAK,
}
format_filter = format_map.get(args.format.lower())

cx_print("\n📦 Installed Packages\n", "info")

packages = upm.list_installed_packages(format_filter)

for fmt, pkgs in packages.items():
if pkgs:
console.print(f"\n[cyan][{fmt.upper()}][/cyan] ({len(pkgs)} packages)")
for pkg in pkgs[:10]: # Show top 10
console.print(f" • {pkg.name} ({pkg.version})")
if len(pkgs) > 10:
console.print(f" ... and {len(pkgs) - 10} more")

return 0

def _pkg_permissions(self, upm, args: argparse.Namespace) -> int:
"""Show package permissions."""
package = args.package
fmt = getattr(args, "format", None)

cx_print(f"\n🔐 Permissions for '{package}'\n", "info")

# Determine format: explicit flag > detect from installed packages
if fmt is None:
# Check where package is actually installed
sources = upm.detect_package_sources(package)
if sources.get("flatpak") and sources["flatpak"].installed:
fmt = "flatpak"
elif sources.get("snap") and sources["snap"].installed:
fmt = "snap"
else:
fmt = "flatpak" if "." in package and package.count(".") >= 2 else "snap"

if fmt == "flatpak":
try:
perms = upm.list_flatpak_permissions(package)
except RuntimeError as e:
self._print_error(str(e))
return 1

for section, values in perms.items():
console.print(f"[cyan][{section}][/cyan]")
if isinstance(values, dict):
for k, v in values.items():
console.print(f" {k} = {v}")
elif isinstance(values, list):
for v in values:
console.print(f" {v}")
else:
# Snap
try:
perms = upm.list_snap_permissions(package)
except RuntimeError as e:
self._print_error(str(e))
return 1

if perms.get("connected"):
console.print("[green]Connected Interfaces:[/green]")
for conn in perms["connected"]:
console.print(f" • {conn['interface']}: {conn['plug']} → {conn['slot']}")

if perms.get("available"):
console.print("\n[dim]Available (not connected):[/dim]")
for avail in perms["available"]:
console.print(f" • {avail['interface']}")

return 0

def _pkg_storage(self, upm) -> int:
"""Analyze storage by package format."""
cx_print("\n💾 Analyzing storage...\n", "info")

analysis = upm.analyze_storage()
output = upm.format_storage_analysis(analysis)
console.print(output)

return 0

def _pkg_snap_redirects(self, upm, args: argparse.Namespace) -> int:
"""Check or disable snap redirects."""
disable = getattr(args, "disable", False)

if disable:
# Show warning and get confirmation
console.print("\n[yellow]⚠️ This operation will modify system configuration:[/yellow]")
console.print(" File: /etc/apt/apt.conf.d/20snapd.conf")
console.print(" Action: Move to .disabled (backup created)")
console.print("\n[dim]This requires elevated privileges (sudo).[/dim]\n")

try:
confirm = input("Proceed with disabling snap redirects? [y/N]: ").strip().lower()
if confirm not in ("y", "yes"):
cx_print("Operation cancelled", "info")
return 0
except (EOFError, KeyboardInterrupt):
console.print()
cx_print("Operation cancelled", "info")
return 0

cx_print("\n⚠️ Disabling snap redirects...\n", "warning")

# Record to installation history
from datetime import datetime

history = InstallationHistory()
start_time = datetime.now()

install_id = None
try:
install_id = history.record_installation(
operation_type=InstallationType.REMOVE,
packages=["snap-redirects"],
commands=["disable_snap_redirects"],
start_time=start_time,
)

success, message = upm.disable_snap_redirects()

if success:
history.update_installation(install_id, InstallationStatus.SUCCESS)
cx_print(message, "success")
else:
history.update_installation(install_id, InstallationStatus.FAILED, message)
self._print_error(message)

return 0 if success else 1

except Exception as e:
if install_id is not None:
history.update_installation(install_id, InstallationStatus.FAILED, str(e))
self._print_error(f"Failed to disable snap redirects: {e}")
return 1

cx_print("\n🔍 Checking for snap redirects...\n", "info")
redirects = upm.check_snap_redirects()

if not redirects:
cx_print("No snap redirects detected", "success")
return 0

console.print(f"Found {len(redirects)} potential snap redirects:\n")
for redirect in redirects:
console.print(f" [yellow]•[/yellow] {redirect['package']}")
console.print(f" Type: {redirect['type']}")
console.print(f" {redirect['reason']}")

console.print("\n[dim]To disable: cortex pkg snap-redirects --disable[/dim]")
return 0

# --------------------------


Expand Down Expand Up @@ -3214,6 +3465,49 @@ def main():
sandbox_exec_parser.add_argument("cmd", nargs="+", help="Command to execute")
# --------------------------

# --- Unified Package Manager Commands (Issue #450) ---
pkg_parser = subparsers.add_parser("pkg", help="Unified package manager (snap/flatpak/deb)")
pkg_subs = pkg_parser.add_subparsers(dest="pkg_action", help="Package manager actions")

# pkg sources <package>
pkg_sources_parser = pkg_subs.add_parser("sources", help="Show available sources for a package")
pkg_sources_parser.add_argument("package", help="Package name to search for")

# pkg compare <package>
pkg_compare_parser = pkg_subs.add_parser("compare", help="Compare package across formats")
pkg_compare_parser.add_argument("package", help="Package name to compare")

# pkg list [--format FORMAT]
pkg_list_parser = pkg_subs.add_parser("list", help="List installed packages")
pkg_list_parser.add_argument(
"--format",
choices=["deb", "snap", "flatpak"],
help="Filter by package format",
)

# pkg permissions <package> [--format FORMAT]
pkg_permissions_parser = pkg_subs.add_parser("permissions", help="Show package permissions")
pkg_permissions_parser.add_argument("package", help="Package name or app ID")
pkg_permissions_parser.add_argument(
"--format",
choices=["snap", "flatpak"],
help="Package format (auto-detected if not specified)",
)

# pkg storage
pkg_subs.add_parser("storage", help="Analyze storage usage by package format")

# pkg snap-redirects [--disable]
pkg_redirects_parser = pkg_subs.add_parser(
"snap-redirects", help="Check or disable snap redirects"
)
pkg_redirects_parser.add_argument(
"--disable",
action="store_true",
help="Disable snap redirects (requires sudo)",
)
# --------------------------

# --- Environment Variable Management Commands ---
env_parser = subparsers.add_parser("env", help="Manage environment variables")
env_subs = env_parser.add_subparsers(dest="env_action", help="Environment actions")
Expand Down Expand Up @@ -3623,11 +3917,14 @@ def main():
return 1
elif args.command == "env":
return cli.env(args)
elif args.command == "pkg":
return cli.pkg(args)
elif args.command == "upgrade":
from cortex.licensing import open_upgrade_page

open_upgrade_page()
return 0

elif args.command == "license":
from cortex.licensing import show_license_status

Expand Down
Loading
Loading