From e3517c6266fe961d1a208454627acba253bc04ec Mon Sep 17 00:00:00 2001 From: Murat Aslan Date: Sat, 3 Jan 2026 11:54:32 +0300 Subject: [PATCH 1/3] feat: implement docker permission fixer tool (#449) --- cortex/cli.py | 16 +++ cortex/docker_fixer.py | 216 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 232 insertions(+) create mode 100644 cortex/docker_fixer.py diff --git a/cortex/cli.py b/cortex/cli.py index 7d248002..b122fa12 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -23,6 +23,7 @@ from cortex.notification_manager import NotificationManager from cortex.stack_manager import StackManager from cortex.validators import validate_api_key, validate_install_request +from cortex.docker_fixer import DockerPermissionFixer # Suppress noisy log messages in normal operation logging.getLogger("httpx").setLevel(logging.WARNING) @@ -1521,6 +1522,12 @@ def progress_callback(current: int, total: int, step: InstallationStep) -> None: console.print(f"Error: {result.error_message}", style="red") return 1 + def fix_docker(self) -> int: + """Run the Interactive Docker Permission Fixer.""" + fixer = DockerPermissionFixer() + fixer.run() + return 0 + # -------------------------- @@ -1857,6 +1864,13 @@ def main(): env_template_apply_parser.add_argument( "--encrypt-keys", help="Comma-separated list of keys to encrypt" ) + + # --- Docker Fixer Command --- + subparsers.add_parser( + "fix-docker", + help="Diagnose and fix Docker permission issues", + aliases=["docker-fix"] + ) # -------------------------- args = parser.parse_args() @@ -1903,6 +1917,8 @@ def main(): return 1 elif args.command == "env": return cli.env(args) + elif args.command in ["fix-docker", "docker-fix"]: + return cli.fix_docker() else: parser.print_help() return 1 diff --git a/cortex/docker_fixer.py b/cortex/docker_fixer.py new file mode 100644 index 00000000..e92fe712 --- /dev/null +++ b/cortex/docker_fixer.py @@ -0,0 +1,216 @@ +import json +import os +import subprocess +import sys +from typing import Dict, List, Optional, Tuple + +from rich.prompt import Confirm, Prompt +from rich.table import Table + +from cortex.branding import console, cx_header + + +class DockerPermissionFixer: + """ + Diagnoses and suggests fixes for Docker container permission issues, + specifically focusing on bind mounts and UID/GID mapping. + """ + + def __init__(self): + self.host_uid = os.getuid() + self.host_gid = os.getgid() + + def _run_docker_command(self, args: List[str]) -> Tuple[bool, str, str]: + """Run a docker command and return (success, stdout, stderr).""" + try: + result = subprocess.run( + ["docker"] + args, + capture_output=True, + text=True, + check=False + ) + return result.returncode == 0, result.stdout, result.stderr + except FileNotFoundError: + return False, "", "Docker executable not found." + except Exception as e: + return False, "", str(e) + + def list_containers(self) -> List[Dict[str, str]]: + """List running containers.""" + success, stdout, stderr = self._run_docker_command( + ["ps", "--format", "{{.ID}}|{{.Names}}|{{.Image}}|{{.Status}}"] + ) + if not success: + console.print(f"[red]Error listing containers: {stderr}[/red]") + return [] + + containers = [] + for line in stdout.strip().split("\n"): + if not line: + continue + parts = line.split("|") + if len(parts) == 4: + containers.append({ + "id": parts[0], + "name": parts[1], + "image": parts[2], + "status": parts[3] + }) + return containers + + def inspect_container(self, container_id: str) -> Optional[Dict]: + """Get container inspection data.""" + success, stdout, stderr = self._run_docker_command(["inspect", container_id]) + if not success: + console.print(f"[red]Error inspecting container: {stderr}[/red]") + return None + try: + data = json.loads(stdout) + return data[0] if data else None + except json.JSONDecodeError: + return None + + def diagnose(self, container_id: str): + """Diagnose permission issues for a specific container.""" + details = self.inspect_container(container_id) + if not details: + return + + container_name = details.get("Name", "").lstrip("/") + config = details.get("Config", {}) + container_user = config.get("User", "") + + console.print(f"\n[bold cyan]Diagnosing Container: {container_name}[/bold cyan] ({container_id[:12]})") + + # 1. Check Container User + effective_uid = 0 # Default to root if not specified + effective_gid = 0 + + if container_user: + console.print(f" Existing User Config: [yellow]{container_user}[/yellow]") + # Try to parse UID:GID + parts = container_user.split(":") + try: + effective_uid = int(parts[0]) + if len(parts) > 1: + effective_gid = int(parts[1]) + except ValueError: + console.print(f" [dim]User '{container_user}' is a name, assuming mapped to UID inside image.[/dim]") + # In a real tool, we might exec into container to check 'id', but let's keep it simple/safe + else: + console.print(" Existing User Config: [red]Root (0:0)[/red] (Default)") + + # 2. Check Bind Mounts + mounts = details.get("Mounts", []) + bind_mounts = [m for m in mounts if m["Type"] == "bind"] + + if not bind_mounts: + console.print(" [green]No bind mounts detected. Permission issues unlikely.[/green]") + return + + console.print(f" [bold]Found {len(bind_mounts)} bind mounts:[/bold]") + + issues_found = False + + for mount in bind_mounts: + source = mount["Source"] + destination = mount["Destination"] + rw = mount["RW"] + + if not os.path.exists(source): + console.print(f" [dim]{source} -> {destination} (Site not found)[/dim]") + continue + + try: + stat = os.stat(source) + owner_uid = stat.st_uid + owner_gid = stat.st_gid + + status_icon = "✅" + status_msg = "[green]OK[/green]" + + # Logic: If container runs as root (0), it can access host files (usually). + # But files created by container will be root-owned on host, causing issues for host user. + # If container runs as non-root (e.g. 1000), it needs host files to be 1000 or readable/writable by 1000. + + is_root_container = (effective_uid == 0) + is_host_owner_match = (owner_uid == self.host_uid) + + issues = [] + + if is_root_container: + if rw: + issues.append("[yellow]Writes will be owned by root on host[/yellow]") + status_icon = "⚠️" + else: + # Non-root container + if owner_uid != effective_uid: + # Mismatched UID. Can we write? + # This is a simplification. Group permissions matter too. + issues.append(f"[red]UID mismatch[/red] (Host: {owner_uid}, Container: {effective_uid})") + status_icon = "❌" + issues_found = True + + issue_str = f" - {', '.join(issues)}" if issues else "" + + console.print(f" {status_icon} [bold]{source}[/bold]") + console.print(f" -> {destination}") + console.print(f" Host Owner: UID={owner_uid}, GID={owner_gid} {issue_str}") + + except Exception as e: + console.print(f" ❓ {source}: {e}") + + # 3. Recommendations + console.print("\n[bold]Recommendations:[/bold]") + + if effective_uid == 0: + console.print("\n[bold yellow]Option 1: Run as current host user[/bold yellow]") + console.print("To avoid root-owned files on your host machine, perform user mapping:") + console.print(f"\n [dim]# docker run command[/dim]") + console.print(f" docker run [green]--user {self.host_uid}:{self.host_gid}[/green] ...") + console.print(f"\n [dim]# docker-compose.yml[/dim]") + console.print(f" services:") + console.print(f" {container_name}:") + console.print(f" [green]user: \"{self.host_uid}:{self.host_gid}\"[/green]") + + if issues_found: + console.print("\n[bold red]Option 2: Fix Host Permissions[/bold red]") + console.print("If the container *must* run as a specific user (e.g. postgres=999), change host ownership:") + for mount in bind_mounts: + source = mount["Source"] + if os.path.exists(source): + # If we knew the target UID strictly, we'd use that. + # For now, warn user to check container docs. + console.print(f" sudo chown -R : {source}") + + def run(self): + """Interactive wizard.""" + cx_header("Docker Permission Fixer") + console.print(f"Host User: UID=[bold green]{self.host_uid}[/bold green], GID=[bold green]{self.host_gid}[/bold green]") + + containers = self.list_containers() + if not containers: + console.print("No running containers found.") + return + + table = Table(title="Running Containers") + table.add_column("#", style="cyan") + table.add_column("Name", style="magenta") + table.add_column("Image", style="blue") + table.add_column("Status", style="green") + + options = {} + for idx, c in enumerate(containers, 1): + table.add_row(str(idx), c["name"], c["image"], c["status"]) + options[str(idx)] = c["id"] + + console.print(table) + + choice = Prompt.ask("Select a container to diagnose", choices=list(options.keys())) + container_id = options[choice] + + self.diagnose(container_id) + +if __name__ == "__main__": + fixer = DockerPermissionFixer() + fixer.run() From a97989279b805dce969cc78ba18781c0140f38af Mon Sep 17 00:00:00 2001 From: Murat Aslan Date: Sun, 4 Jan 2026 00:22:41 +0300 Subject: [PATCH 2/3] fix: address PR review feedback - remove unused imports and add docstring --- cortex/cli.py | 10 +++++++++- cortex/docker_fixer.py | 21 +++++++++++++++------ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/cortex/cli.py b/cortex/cli.py index b122fa12..d798a943 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -1523,7 +1523,15 @@ def progress_callback(current: int, total: int, step: InstallationStep) -> None: return 1 def fix_docker(self) -> int: - """Run the Interactive Docker Permission Fixer.""" + """ + Run the interactive Docker permission diagnostic tool. + + Diagnoses container user/group settings and bind mount permission mismatches. + Provides actionable recommendations for fixing permission issues. + + Returns: + int: 0 on success + """ fixer = DockerPermissionFixer() fixer.run() return 0 diff --git a/cortex/docker_fixer.py b/cortex/docker_fixer.py index e92fe712..d7951e2f 100644 --- a/cortex/docker_fixer.py +++ b/cortex/docker_fixer.py @@ -1,7 +1,7 @@ import json import os import subprocess -import sys + from typing import Dict, List, Optional, Tuple from rich.prompt import Confirm, Prompt @@ -16,7 +16,18 @@ class DockerPermissionFixer: specifically focusing on bind mounts and UID/GID mapping. """ - def __init__(self): + def __init__(self) -> None: + """Initialize the Docker permission fixer with current host UID/GID. + + Attributes: + host_uid: The current user's UID on the host system. + host_gid: The current user's GID on the host system. + + Raises: + OSError: If running on a non-POSIX system (e.g., Windows). + """ + if not hasattr(os, 'getuid'): + raise OSError("DockerPermissionFixer requires a POSIX-compatible system.") self.host_uid = os.getuid() self.host_gid = os.getgid() @@ -70,7 +81,7 @@ def inspect_container(self, container_id: str) -> Optional[Dict]: except json.JSONDecodeError: return None - def diagnose(self, container_id: str): + def diagnose(self, container_id: str) -> None: """Diagnose permission issues for a specific container.""" details = self.inspect_container(container_id) if not details: @@ -127,14 +138,12 @@ def diagnose(self, container_id: str): owner_gid = stat.st_gid status_icon = "✅" - status_msg = "[green]OK[/green]" # Logic: If container runs as root (0), it can access host files (usually). # But files created by container will be root-owned on host, causing issues for host user. # If container runs as non-root (e.g. 1000), it needs host files to be 1000 or readable/writable by 1000. is_root_container = (effective_uid == 0) - is_host_owner_match = (owner_uid == self.host_uid) issues = [] @@ -183,7 +192,7 @@ def diagnose(self, container_id: str): # For now, warn user to check container docs. console.print(f" sudo chown -R : {source}") - def run(self): + def run(self) -> None: """Interactive wizard.""" cx_header("Docker Permission Fixer") console.print(f"Host User: UID=[bold green]{self.host_uid}[/bold green], GID=[bold green]{self.host_gid}[/bold green]") From d2ff3778b170ec54259c5cb52b36268770ff478c Mon Sep 17 00:00:00 2001 From: Murat Aslan Date: Sun, 4 Jan 2026 01:11:31 +0300 Subject: [PATCH 3/3] fix: address all lint errors - modern type hints, fix indentation, remove unused imports --- cortex/docker_fixer.py | 86 ++++++++++++++++++++---------------------- 1 file changed, 41 insertions(+), 45 deletions(-) diff --git a/cortex/docker_fixer.py b/cortex/docker_fixer.py index d7951e2f..feadc5c0 100644 --- a/cortex/docker_fixer.py +++ b/cortex/docker_fixer.py @@ -2,9 +2,7 @@ import os import subprocess -from typing import Dict, List, Optional, Tuple - -from rich.prompt import Confirm, Prompt +from rich.prompt import Prompt from rich.table import Table from cortex.branding import console, cx_header @@ -31,7 +29,7 @@ def __init__(self) -> None: self.host_uid = os.getuid() self.host_gid = os.getgid() - def _run_docker_command(self, args: List[str]) -> Tuple[bool, str, str]: + def _run_docker_command(self, args: list[str]) -> tuple[bool, str, str]: """Run a docker command and return (success, stdout, stderr).""" try: result = subprocess.run( @@ -46,7 +44,7 @@ def _run_docker_command(self, args: List[str]) -> Tuple[bool, str, str]: except Exception as e: return False, "", str(e) - def list_containers(self) -> List[Dict[str, str]]: + def list_containers(self) -> list[dict[str, str]]: """List running containers.""" success, stdout, stderr = self._run_docker_command( ["ps", "--format", "{{.ID}}|{{.Names}}|{{.Image}}|{{.Status}}"] @@ -69,7 +67,7 @@ def list_containers(self) -> List[Dict[str, str]]: }) return containers - def inspect_container(self, container_id: str) -> Optional[Dict]: + def inspect_container(self, container_id: str) -> dict | None: """Get container inspection data.""" success, stdout, stderr = self._run_docker_command(["inspect", container_id]) if not success: @@ -90,21 +88,18 @@ def diagnose(self, container_id: str) -> None: container_name = details.get("Name", "").lstrip("/") config = details.get("Config", {}) container_user = config.get("User", "") - + console.print(f"\n[bold cyan]Diagnosing Container: {container_name}[/bold cyan] ({container_id[:12]})") - + # 1. Check Container User - effective_uid = 0 # Default to root if not specified - effective_gid = 0 - + effective_uid = 0 # Default to root if not specified + if container_user: console.print(f" Existing User Config: [yellow]{container_user}[/yellow]") # Try to parse UID:GID parts = container_user.split(":") try: effective_uid = int(parts[0]) - if len(parts) > 1: - effective_gid = int(parts[1]) except ValueError: console.print(f" [dim]User '{container_user}' is a name, assuming mapped to UID inside image.[/dim]") # In a real tool, we might exec into container to check 'id', but let's keep it simple/safe @@ -114,39 +109,39 @@ def diagnose(self, container_id: str) -> None: # 2. Check Bind Mounts mounts = details.get("Mounts", []) bind_mounts = [m for m in mounts if m["Type"] == "bind"] - + if not bind_mounts: console.print(" [green]No bind mounts detected. Permission issues unlikely.[/green]") return console.print(f" [bold]Found {len(bind_mounts)} bind mounts:[/bold]") - + issues_found = False - + for mount in bind_mounts: source = mount["Source"] destination = mount["Destination"] rw = mount["RW"] - + if not os.path.exists(source): - console.print(f" [dim]{source} -> {destination} (Site not found)[/dim]") + console.print(f" [dim]{source} -> {destination} (Source not found)[/dim]") continue try: stat = os.stat(source) owner_uid = stat.st_uid owner_gid = stat.st_gid - + status_icon = "✅" - + # Logic: If container runs as root (0), it can access host files (usually). # But files created by container will be root-owned on host, causing issues for host user. # If container runs as non-root (e.g. 1000), it needs host files to be 1000 or readable/writable by 1000. - + is_root_container = (effective_uid == 0) - + issues = [] - + if is_root_container: if rw: issues.append("[yellow]Writes will be owned by root on host[/yellow]") @@ -161,7 +156,7 @@ def diagnose(self, container_id: str) -> None: issues_found = True issue_str = f" - {', '.join(issues)}" if issues else "" - + console.print(f" {status_icon} [bold]{source}[/bold]") console.print(f" -> {destination}") console.print(f" Host Owner: UID={owner_uid}, GID={owner_gid} {issue_str}") @@ -171,32 +166,32 @@ def diagnose(self, container_id: str) -> None: # 3. Recommendations console.print("\n[bold]Recommendations:[/bold]") - + if effective_uid == 0: - console.print("\n[bold yellow]Option 1: Run as current host user[/bold yellow]") - console.print("To avoid root-owned files on your host machine, perform user mapping:") - console.print(f"\n [dim]# docker run command[/dim]") - console.print(f" docker run [green]--user {self.host_uid}:{self.host_gid}[/green] ...") - console.print(f"\n [dim]# docker-compose.yml[/dim]") - console.print(f" services:") - console.print(f" {container_name}:") - console.print(f" [green]user: \"{self.host_uid}:{self.host_gid}\"[/green]") - + console.print("\n[bold yellow]Option 1: Run as current host user[/bold yellow]") + console.print("To avoid root-owned files on your host machine, perform user mapping:") + console.print(f"\n [dim]# docker run command[/dim]") + console.print(f" docker run [green]--user {self.host_uid}:{self.host_gid}[/green] ...") + console.print(f"\n [dim]# docker-compose.yml[/dim]") + console.print(f" services:") + console.print(f" {container_name}:") + console.print(f" [green]user: \"{self.host_uid}:{self.host_gid}\"[/green]") + if issues_found: - console.print("\n[bold red]Option 2: Fix Host Permissions[/bold red]") - console.print("If the container *must* run as a specific user (e.g. postgres=999), change host ownership:") - for mount in bind_mounts: - source = mount["Source"] - if os.path.exists(source): - # If we knew the target UID strictly, we'd use that. - # For now, warn user to check container docs. - console.print(f" sudo chown -R : {source}") + console.print("\n[bold red]Option 2: Fix Host Permissions[/bold red]") + console.print("If the container *must* run as a specific user (e.g. postgres=999), change host ownership:") + for mount in bind_mounts: + source = mount["Source"] + if os.path.exists(source): + # If we knew the target UID strictly, we'd use that. + # For now, warn user to check container docs. + console.print(f" sudo chown -R : {source}") def run(self) -> None: """Interactive wizard.""" cx_header("Docker Permission Fixer") console.print(f"Host User: UID=[bold green]{self.host_uid}[/bold green], GID=[bold green]{self.host_gid}[/bold green]") - + containers = self.list_containers() if not containers: console.print("No running containers found.") @@ -214,12 +209,13 @@ def run(self) -> None: options[str(idx)] = c["id"] console.print(table) - + choice = Prompt.ask("Select a container to diagnose", choices=list(options.keys())) container_id = options[choice] - + self.diagnose(container_id) + if __name__ == "__main__": fixer = DockerPermissionFixer() fixer.run()