diff --git a/cortex/cli.py b/cortex/cli.py index ea8976d1..88a7ff47 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -23,6 +23,9 @@ from cortex.llm.interpreter import CommandInterpreter from cortex.network_config import NetworkConfig from cortex.notification_manager import NotificationManager +from cortex.permissions.auditor_fixer import PermissionAuditor + +PermissionManager = PermissionAuditor # Alias for compatibility from cortex.stack_manager import StackManager from cortex.validators import validate_api_key, validate_install_request @@ -84,7 +87,8 @@ def docker_permissions(self, args: argparse.Namespace) -> int: "warning", ) try: - # Interactive confirmation prompt for administrative repair. + # Interactive confirmation prompt for administrative + # repair. response = console.input( "[bold cyan]Reclaim ownership using sudo? (y/n): [/bold cyan]" ) @@ -92,7 +96,8 @@ def docker_permissions(self, args: argparse.Namespace) -> int: cx_print("Operation cancelled", "info") return 0 except (EOFError, KeyboardInterrupt): - # Graceful handling of terminal exit or manual interruption. + # Graceful handling of terminal exit or manual + # interruption. console.print() cx_print("Operation cancelled", "info") return 0 @@ -116,10 +121,53 @@ def docker_permissions(self, args: argparse.Namespace) -> int: cx_print(f"❌ {e}", "error") return 1 except Exception as e: - # Safety net for unexpected runtime exceptions to prevent CLI crashes. + # Safety net for unexpected runtime exceptions to prevent CLI + # crashes. cx_print(f"❌ Unexpected error: {e}", "error") return 1 + # --- Permission Auditor Command --- + def audit_permissions(self, args: argparse.Namespace) -> int: + """Handle permission auditing and fixing. + + Args: + args: Parsed command-line arguments containing path, fix, dry_run, docker, and verbose flags. + + Returns: + int: 0 if no issues found or fixes successfully applied, 1 otherwise. + """ + try: + # Initialize manager + manager = PermissionManager( + verbose=getattr(args, "verbose", False), + dry_run=getattr(args, "dry_run", True), + docker_context=getattr(args, "docker", False), + ) + + path = getattr(args, "path", ".") + dry_run_flag = getattr(args, "dry_run", False) + fix_flag = getattr(args, "fix", False) + apply_fixes = dry_run_flag or fix_flag + + # Scan and fix + result = manager.scan_and_fix(path=path, apply_fixes=apply_fixes, dry_run=dry_run_flag) + + # Print report + cx_print(result["report"], "info") + + # Return exit code + issues_found = result.get("issues_found", 0) + fixes_applied = result.get("fixed", False) + + if issues_found == 0 or fixes_applied: + return 0 + else: + return 1 + + except Exception as e: + cx_print(f"Error during permission audit: {e}", "error") + return 1 + def _debug(self, message: str): """Print debug info only in verbose mode""" if self.verbose: @@ -225,7 +273,8 @@ def notify(self, args): elif args.notify_action == "enable": mgr.config["enabled"] = True # Addressing CodeRabbit feedback: Ideally should use a public method instead of private _save_config, - # but keeping as is for a simple fix (or adding a save method to NotificationManager would be best). + # but keeping as is for a simple fix (or adding a save method to + # NotificationManager would be best). mgr._save_config() self._print_success("Notifications enabled") return 0 @@ -334,8 +383,7 @@ def _handle_stack_install(self, manager: StackManager, args: argparse.Namespace) if suggested_name != original_name: cx_print( - f"💡 No GPU detected, using '{suggested_name}' instead of '{original_name}'", - "info", + f"💡 No GPU detected, using '{suggested_name}' instead of '{original_name}'", "info" ) stack = manager.find_stack(suggested_name) @@ -366,7 +414,7 @@ def _handle_stack_dry_run(self, stack: dict[str, Any], packages: list[str]) -> i return 0 def _handle_stack_real_install(self, stack: dict[str, Any], packages: list[str]) -> int: - """Install all packages in the stack.""" + """Install all packages in the stack""" cx_print(f"\n🚀 Installing stack: {stack['name']}\n", "success") # Batch into a single LLM request @@ -740,7 +788,7 @@ def parallel_log_callback(message: str, level: str = "info"): success, parallel_tasks = asyncio.run( run_parallel_install( commands=commands, - descriptions=[f"Step {i + 1}" for i in range(len(commands))], + descriptions=[f"Step {i +1}" for i in range(len(commands))], timeout=300, stop_on_error=True, log_callback=parallel_log_callback, @@ -812,7 +860,7 @@ def parallel_log_callback(message: str, level: str = "info"): coordinator = InstallationCoordinator( commands=commands, - descriptions=[f"Step {i + 1}" for i in range(len(commands))], + descriptions=[f"Step {i +1}" for i in range(len(commands))], timeout=300, stop_on_error=True, progress_callback=progress_callback, @@ -1240,7 +1288,8 @@ def _env_import(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> else: cx_print("No variables imported", "info") - # Return success (0) even with partial errors - some vars imported successfully + # Return success (0) even with partial errors - some vars imported + # successfully return 0 except FileNotFoundError: @@ -1421,7 +1470,7 @@ def _env_audit(self, args: argparse.Namespace) -> int: # Sort by number of sources (most definitions first) sorted_vars = sorted(audit.variables.items(), key=lambda x: len(x[1]), reverse=True) for var_name, sources in sorted_vars[:20]: # Limit to top 20 - console.print(f"\n [cyan]{var_name}[/cyan] ({len(sources)} definition(s))") + console.print(f"\n [cyan]{var_name}[/cyan] ({len(sources)} definition(s))") for src in sources: console.print(f" [dim]{src.file}:{src.line_number}[/dim]") # Show truncated value @@ -2086,7 +2135,8 @@ def main(): if temp_args.command in NETWORK_COMMANDS: # Now detect network (only when needed) - network.detect(check_quality=True) # Include quality check for these commands + # Include quality check for these commands + network.detect(check_quality=True) network.auto_configure() except Exception as e: @@ -2274,7 +2324,8 @@ def main(): env_parser = subparsers.add_parser("env", help="Manage environment variables") env_subs = env_parser.add_subparsers(dest="env_action", help="Environment actions") - # env set [--encrypt] [--type TYPE] [--description DESC] + # env set [--encrypt] [--type TYPE] [--description + # DESC] env_set_parser = env_subs.add_parser("set", help="Set an environment variable") env_set_parser.add_argument("app", help="Application name") env_set_parser.add_argument("key", help="Variable name") @@ -2361,6 +2412,25 @@ def main(): "--encrypt-keys", help="Comma-separated list of keys to encrypt" ) + # --- Audit Permissions Command --- + audit_parser = subparsers.add_parser( + "audit-permissions", help="Audit and fix dangerous file permissions" + ) + audit_parser.add_argument( + "path", nargs="?", default=".", help="Path to scan (default: current directory)" + ) + + fix_group = audit_parser.add_mutually_exclusive_group() + fix_group.add_argument("--fix", action="store_true", help="Apply safe fixes") + fix_group.add_argument( + "--dry-run", action="store_true", help="Show what would be fixed without changes" + ) + + audit_parser.add_argument( + "--docker", action="store_true", help="Consider Docker container UID mappings" + ) + audit_parser.add_argument("--verbose", "-v", action="store_true", help="Show detailed output") + # --- Shell Environment Analyzer Commands --- # env audit - show all shell variables with sources env_audit_parser = env_subs.add_parser( @@ -2531,6 +2601,8 @@ def main(): return 1 elif args.command == "env": return cli.env(args) + elif args.command == "audit-permissions": + return cli.audit_permissions(args) else: parser.print_help() return 1 diff --git a/cortex/permissions/__init__.py b/cortex/permissions/__init__.py new file mode 100644 index 00000000..ddb2c1f5 --- /dev/null +++ b/cortex/permissions/__init__.py @@ -0,0 +1,71 @@ +""" +Permission Auditor & Fixer module. +""" + +from typing import Any + + +class PermissionAuditor: + """Proxy class for PermissionAuditor with lazy loading.""" + + def __new__(cls, *args, **kwargs): + from .auditor_fixer import PermissionAuditor as RealPermissionAuditor + + return RealPermissionAuditor(*args, **kwargs) + + +class DockerPermissionHandler: + """Proxy class for DockerPermissionHandler with lazy loading.""" + + def __new__(cls, *args, **kwargs): + from .docker_handler import DockerPermissionHandler as RealDockerPermissionHandler + + return RealDockerPermissionHandler(*args, **kwargs) + + +PermissionManager = PermissionAuditor +PermissionFixer = PermissionAuditor + + +def scan_path(path: str) -> Any: + """ + Simplified interface to scan a path for permission issues. + + Args: + path: Directory path to scan + + Returns: + Scan results from PermissionAuditor.scan_directory() + """ + auditor = PermissionAuditor() + return auditor.scan_directory(path) + + +def analyze_permissions(path: str) -> dict[str, Any]: + """ + Analyze permissions and return detailed report. + + Args: + path: Directory path to analyze + + Returns: + Dictionary with scan results and analysis + """ + auditor = PermissionAuditor() + scan = auditor.scan_directory(path) + return { + "path": path, + "auditor": auditor, + "scan": scan, + "issues_count": len(scan.get("world_writable", [])) + len(scan.get("dangerous", [])), + } + + +__all__ = [ + "PermissionAuditor", + "DockerPermissionHandler", + "PermissionManager", + "PermissionFixer", + "scan_path", + "analyze_permissions", +] diff --git a/cortex/permissions/auditor_fixer.py b/cortex/permissions/auditor_fixer.py new file mode 100644 index 00000000..c07df9de --- /dev/null +++ b/cortex/permissions/auditor_fixer.py @@ -0,0 +1,409 @@ +""" +Permission Auditor & Fixer module. +Fixes security issues with dangerous file permissions (777, world-writable). +""" + +import logging +import os +import stat +from pathlib import Path +from typing import Optional, Union + +logger = logging.getLogger(__name__) + + +class PermissionAuditor: + """ + Auditor for detecting and fixing dangerous file permissions. + + Detects: + - World-writable files (others have write permission) + - Files with 777 permissions + - Insecure directory permissions + """ + + def __init__(self, verbose=False, dry_run=True, docker_context=False): + self.verbose = verbose + self.dry_run = dry_run + self.docker_handler = None + self.logger = logging.getLogger(__name__) + + if docker_context: + from .docker_handler import DockerPermissionHandler + + self.docker_handler = DockerPermissionHandler() + + if verbose: + self.logger.setLevel(logging.DEBUG) + + def explain_issue_plain_english(self, filepath: str, issue_type: str) -> str: + """ + Explain permission issue in plain English. + + Args: + filepath: Path to the file + issue_type: Type of issue ('world_writable', 'dangerous_777') + + Returns: + Plain English explanation + """ + filename = os.path.basename(filepath) + + explanations = { + "world_writable": ( + f"⚠️ SECURITY RISK: '{filename}' is WORLD-WRITABLE.\n" + " This means ANY user on the system can MODIFY this file.\n" + " Attackers could: inject malicious code, delete data, or tamper with configurations.\n" + " FIX: Restrict write permissions to owner only." + ), + "dangerous_777": ( + f"🚨 CRITICAL RISK: '{filename}' has 777 permissions (rwxrwxrwx).\n" + " This means EVERYONE can read, write, and execute this file.\n" + " This is like leaving your house with doors unlocked and keys in the lock.\n" + " FIX: Set appropriate permissions (644 for files, 755 for scripts)." + ), + } + return explanations.get(issue_type, f"Permission issue detected in '{filename}'") + + def scan_directory(self, directory_path: str | Path) -> dict[str, list[str]]: + """ + Scan directory for dangerous permissions. + + Args: + directory_path: Path to directory to scan + + Returns: + Dictionary with keys: + - 'world_writable': List of world-writable files + - 'dangerous': List of files with dangerous permissions (777) + - 'suggestions': List of suggested fixes + - 'docker_context': True if Docker files found + """ + path = Path(directory_path).resolve() + result = {"world_writable": [], "dangerous": [], "suggestions": [], "docker_context": False} + + if not self._validate_path(path): + return result + + self._check_docker_context(path, result) + self._scan_files_for_permissions(path, result) + + return result + + def _validate_path(self, path: Path) -> bool: + """Validate that path exists and is a directory.""" + if not path.exists(): + logger.warning(f"Directory does not exist: {path}") + return False + + if not path.is_dir(): + logger.warning(f"Path is not a directory: {path}") + return False + + return True + + def _check_docker_context(self, path: Path, result: dict) -> None: + """Check for Docker context in current and parent directories.""" + docker_files = ["docker-compose.yml", "docker-compose.yaml", "Dockerfile", ".dockerignore"] + + # Check current directory + if self._has_docker_files(path, docker_files, result): + return + + # Check parent directories + for parent in path.parents: + if self._has_docker_files(parent, docker_files, result): + return + + def _has_docker_files(self, directory: Path, docker_files: list, result: dict) -> bool: + """Check if directory contains any Docker files.""" + for docker_file in docker_files: + docker_path = directory / docker_file + if docker_path.exists(): + result["docker_context"] = True + if self.verbose: + logger.debug(f"Docker context detected: {docker_file}") + return True + return False + + def _scan_files_for_permissions(self, path: Path, result: dict) -> None: + """Scan all files in directory for dangerous permissions.""" + try: + for item in path.rglob("*"): + if item.is_file(): + self._check_file_permissions(item, result) + except OSError as e: + logger.error(f"Error scanning directory {path}: {e}") + + def _check_file_permissions(self, file_path: Path, result: dict) -> None: + """Check permissions for a single file.""" + try: + mode = file_path.stat().st_mode + str_path = str(file_path) + + self._check_world_writable(mode, str_path, result) + self._check_dangerous_777(mode, str_path, result) + + except OSError as e: + if self.verbose: + logger.debug(f"Cannot access {file_path}: {e}") + + def _check_world_writable(self, mode: int, file_path: str, result: dict) -> None: + """Check if file is world-writable.""" + if mode & stat.S_IWOTH: + result["world_writable"].append(file_path) + suggestion = self.suggest_fix(file_path, current_perms=oct(mode & 0o777)) + result["suggestions"].append(suggestion) + + def _check_dangerous_777(self, mode: int, file_path: str, result: dict) -> None: + """Check if file has 777 permissions.""" + if (mode & 0o777) == 0o777 and file_path not in result["dangerous"]: + result["dangerous"].append(file_path) + + # Check if suggestion already exists + if not self._has_suggestion_for_file(file_path, result["suggestions"]): + suggestion = self.suggest_fix(file_path, current_perms="777") + result["suggestions"].append(suggestion) + + def _has_suggestion_for_file(self, file_path: str, suggestions: list) -> bool: + """Check if suggestion already exists for the file.""" + for suggestion in suggestions: + if len(suggestion.split()) > 2: + suggested_file = suggestion.split()[2].strip("'") + if suggested_file == file_path: + return True + return False + + def suggest_fix(self, filepath: str | Path, current_perms: str | None = None) -> str: + """ + Suggest correct permissions for a file. + + Args: + filepath: Path to the file + current_perms: Current permissions in octal (e.g., '777') + + Returns: + Suggested chmod command to fix permissions + """ + path = Path(filepath) + + if not path.exists(): + return f"# File {filepath} doesn't exist" + + try: + mode = path.stat().st_mode + + # Get file extension and check if executable + is_executable = mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + is_script = path.suffix in [".sh", ".py", ".pl", ".rb", ".bash"] + + # Suggested permissions based on file type + if is_executable or is_script: + suggested = "755" # rwxr-xr-x + reason = "executable/script file" + else: + suggested = "644" # rw-r--r-- + reason = "data file" + + current = oct(mode & 0o777)[-3:] if current_perms is None else current_perms + + return f"chmod {suggested} '{filepath}' # Fix: {current} → {suggested} ({reason})" + + except OSError as e: + return f"# Cannot access {filepath}: {e}" + + def fix_permissions( + self, filepath: str | Path, permissions: str = "644", dry_run: bool = True + ) -> str: + """ + Fix permissions for a single file. + + Args: + filepath: Path to the file + permissions: Permissions in octal (e.g., '644', '755') + dry_run: If True, only show what would be changed + + Returns: + Report of the change made or that would be made + """ + path = Path(filepath) + + if not path.exists(): + return f"File does not exist: {filepath}" + + try: + current_mode = path.stat().st_mode + current_perms = oct(current_mode & 0o777)[-3:] + + if dry_run: + return f"[DRY RUN] Would change {filepath}: {current_perms} → {permissions}" + else: + # Preserve file type bits, only change permission bits + new_mode = (current_mode & ~0o777) | int(permissions, 8) + path.chmod(new_mode) + + # Verify the change + verified = oct(path.stat().st_mode & 0o777)[-3:] + return f"Changed {filepath}: {current_perms} → {verified}" + + except OSError as e: + return f"Error changing permissions on {filepath}: {e}" + + def scan_and_fix(self, path=".", apply_fixes=False, dry_run=None): + """ + Scan directory and optionally fix issues. + Used by CLI command. + + Args: + path: Directory to scan + apply_fixes: If True, apply fixes + dry_run: If None, use self.dry_run; if True/False, override + """ + dry_run = self.dry_run if dry_run is None else dry_run + scan_result = self.scan_directory(path) + + report_lines = self._generate_report_header(path, scan_result) + self._add_docker_context(report_lines, scan_result) + self._add_issue_sections(report_lines, scan_result) + self._add_fix_suggestions(report_lines, scan_result) + + fixed_count = 0 + if apply_fixes: + fixed_count = self._apply_fixes(report_lines, scan_result, dry_run) + + self._finalize_report(report_lines, fixed_count) + + return { + "report": "\n".join(report_lines), + "issues_found": len(scan_result["world_writable"]) + len(scan_result["dangerous"]), + "scan_result": scan_result, + "fixed": apply_fixes and not dry_run, + } + + def _generate_report_header(self, path: str, scan_result: dict) -> list: + """Generate the header section of the report.""" + issues_found = len(scan_result["world_writable"]) + len(scan_result["dangerous"]) + + return [ + "🔒 PERMISSION AUDIT REPORT", + "=" * 50, + f"Scanned: {path}", + f"Total issues found: {issues_found}", + "", + ] + + def _add_docker_context(self, report_lines: list, scan_result: dict) -> None: + """Add Docker context information to report if applicable.""" + if self.docker_handler and scan_result.get("docker_context"): + report_lines.extend( + [ + "🐳 DOCKER/CONTAINER CONTEXT:", + f" Running in: {self.docker_handler.container_info['container_runtime'] or 'Native'}", + f" Host UID/GID: {self.docker_handler.container_info['host_uid']}/{self.docker_handler.container_info['host_gid']}", + "", + ] + ) + + def _add_issue_sections(self, report_lines: list, scan_result: dict) -> None: + """Add world-writable and dangerous files sections to report.""" + self._add_issue_section( + report_lines, + scan_result["world_writable"], + "🚨 WORLD-WRITABLE FILES (others can write):", + "world_writable", + ) + + self._add_issue_section( + report_lines, scan_result["dangerous"], "⚠️ DANGEROUS PERMISSIONS (777):", "dangerous" + ) + + def _add_issue_section(self, report_lines: list, files: list, title: str, key: str) -> None: + """Add a single issue section to the report.""" + if files: + report_lines.append(title) + for file in files[:10]: + report_lines.append(f" • {file}") + + if len(files) > 10: + report_lines.append(f" ... and {len(files) - 10} more") + + report_lines.append("") + + def _add_fix_suggestions(self, report_lines: list, scan_result: dict) -> None: + """Add suggestions and fix commands to report.""" + if scan_result["suggestions"]: + self._add_suggestions_list(report_lines, scan_result["suggestions"]) + self._add_one_command_fix(report_lines, scan_result["world_writable"]) + + def _add_suggestions_list(self, report_lines: list, suggestions: list) -> None: + """Add the list of suggested fixes.""" + report_lines.append("💡 SUGGESTED FIXES:") + for suggestion in suggestions[:5]: + report_lines.append(f" {suggestion}") + + if len(suggestions) > 5: + report_lines.append(f" ... and {len(suggestions) - 5} more") + + def _add_one_command_fix(self, report_lines: list, world_writable_files: list) -> None: + """Add the 'one command to fix all' section.""" + report_lines.append("💡 ONE COMMAND TO FIX ALL ISSUES:") + fix_commands = [] + + for file_path in world_writable_files[:10]: + suggestion = self.suggest_fix(file_path) + if "chmod" in suggestion: + parts = suggestion.split() + if len(parts) >= 3: + fix_commands.append(f"{parts[0]} {parts[1]} '{parts[2]}'") + + if fix_commands: + report_lines.append(" Run this command:") + report_lines.append(" " + " && ".join(fix_commands[:3])) + + if len(fix_commands) > 3: + report_lines.append(f" ... and {len(fix_commands) - 3} more commands") + + report_lines.append("") + + def _apply_fixes(self, report_lines: list, scan_result: dict, dry_run: bool) -> int: + """Apply fixes to files and update report.""" + report_lines.extend(["", "🛠️ APPLYING FIXES:"]) + fixed_count = 0 + + for file_path in scan_result["world_writable"]: + try: + fixed = self._apply_single_fix(report_lines, file_path, dry_run) + if fixed: + fixed_count += 1 + except Exception as e: + report_lines.append(f" ✗ Error fixing {file_path}: {e}") + + report_lines.append(f"Fixed {fixed_count} files") + return fixed_count + + def _apply_single_fix(self, report_lines: list, file_path: str, dry_run: bool) -> bool: + """Apply fix to a single file.""" + suggestion = self.suggest_fix(file_path) + + if "chmod" not in suggestion: + return False + + parts = suggestion.split() + if len(parts) < 2: + return False + + cmd, perms = parts[0], parts[1] + if cmd != "chmod" or not perms.isdigit(): + return False + + if not dry_run: + self.fix_permissions(file_path, permissions=perms, dry_run=False) + report_lines.append(f" ✓ Fixed: {file_path}") + else: + report_lines.append(f" [DRY RUN] Would fix: {file_path}") + + return True + + def _finalize_report(self, report_lines: list, fixed_count: int) -> None: + """Add final lines to the report.""" + report_lines.extend(["", "✅ Scan complete"]) diff --git a/cortex/permissions/config.py b/cortex/permissions/config.py new file mode 100644 index 00000000..72876999 --- /dev/null +++ b/cortex/permissions/config.py @@ -0,0 +1,82 @@ +""" +Configuration for Permission Auditor & Fixer. +""" + +DANGEROUS_PERMISSIONS = { + 0o777: "Full read/write/execute for everyone (rwxrwxrwx)", + 0o666: "Read/write for everyone (rw-rw-rw-)", + 0o000: "No permissions for anyone (---------)", +} + +# World-writable flag +WORLD_WRITABLE_FLAG = 0o002 # S_IWOTH + +# Paths to ignore during scanning +IGNORE_PATTERNS = [ + "/proc/*", + "/sys/*", + "/dev/*", + "/run/*", + "*.pyc", + "__pycache__", + ".git/*", + ".env", + "venv/*", + "node_modules/*", + "*.swp", + "*.tmp", +] + +# Docker-specific paths that need special handling +DOCKER_PATTERNS = [ + "/var/lib/docker/*", + "/var/run/docker.sock", + "docker-compose.yml", + "docker-compose.yaml", + "Dockerfile", + ".dockerignore", +] + +# Sensitive files that should never be world-readable +SENSITIVE_FILES = [ + ".env", + ".ssh/id_rsa", + ".ssh/id_rsa.pub", + ".aws/credentials", + ".docker/config.json", + "id_rsa", + "id_rsa.pub", +] + +# Recommended permissions by file type +RECOMMENDED_PERMISSIONS = { + "directory": 0o755, # drwxr-xr-x + "executable": 0o755, # -rwxr-xr-x + "config_file": 0o644, # -rw-r--r-- + "data_file": 0o644, # -rw-r--r-- + "log_file": 0o640, # -rw-r----- + "secret_file": 0o600, # -rw------- + "docker_socket": 0o660, # Docker socket special permissions +} + +# Common Docker UID/GID mappings +COMMON_DOCKER_UIDS = { + 0: "root", + 1: "daemon", + 33: "www-data", + 999: "postgres", + 101: "nginx", + 102: "redis", + 103: "mysql", + 104: "mongodb", +} + +# Docker-specific recommended permissions +DOCKER_RECOMMENDED_PERMISSIONS = { + "volume_directory": 0o755, + "config_directory": 0o755, + "data_directory": 0o755, + "log_directory": 0o755, + "compose_file": 0o644, + "dockerfile": 0o644, +} diff --git a/cortex/permissions/docker_handler.py b/cortex/permissions/docker_handler.py new file mode 100644 index 00000000..1c8f861d --- /dev/null +++ b/cortex/permissions/docker_handler.py @@ -0,0 +1,427 @@ +""" +Docker UID/GID mapping handler for Permission Auditor. +Handles container-specific permission issues and UID mapping. +""" + +import grp +import os +import pwd +import re +from pathlib import Path +from typing import Optional + + +class DockerPermissionHandler: + """Handle Docker-specific permission mapping and adjustments.""" + + def __init__(self, verbose: bool = False, dry_run: bool = True): + self.verbose = verbose + self.dry_run = dry_run + self.container_info = self._detect_container_environment() + + def _detect_container_environment(self) -> dict: + """Detect if running in Docker/container environment.""" + info = { + "is_container": False, + "container_runtime": None, + "host_uid": os.getuid(), + "host_gid": os.getgid(), + "uid_mapping": {}, + "gid_mapping": {}, + } + + # Check common container indicators + try: + # Check /proc/self/cgroup for container info + cgroup_path = "/proc/self/cgroup" + if os.path.exists(cgroup_path): + with open(cgroup_path) as f: + content = f.read() + + if "docker" in content.lower(): + info["is_container"] = True + info["container_runtime"] = "docker" + elif "kubepods" in content or "containerd" in content: + info["is_container"] = True + info["container_runtime"] = "kubernetes" + + # Check /.dockerenv file (Docker specific) + if os.path.exists("/.dockerenv"): + info["is_container"] = True + info["container_runtime"] = "docker" + + # Check for Podman + if "container" in os.environ and os.environ.get("container") == "podman": + info["is_container"] = True + info["container_runtime"] = "podman" + + except OSError as e: + if self.verbose: + print(f"Container detection warning: {e}") + + # Discover UID/GID mappings + if info["is_container"]: + info["uid_mapping"] = self._discover_uid_mapping() + info["gid_mapping"] = self._discover_gid_mapping() + + return info + + def _discover_uid_mapping(self) -> dict[int, str]: + """Discover UID to username mapping in container.""" + mapping = {} + + try: + # Common container UIDs and their typical usernames + common_uids = { + 0: "root", + 1: "daemon", + 2: "bin", + 3: "sys", + 4: "sync", + 5: "games", + 6: "man", + 7: "lp", + 8: "mail", + 9: "news", + 10: "uucp", + 13: "proxy", + 33: "www-data", + 100: "users", + 999: "postgres", + 101: "nginx", + 102: "redis", + 103: "mysql", + 104: "mongodb", + } + + # Add current user's UID + current_uid = os.getuid() + try: + current_user = pwd.getpwuid(current_uid).pw_name + common_uids[current_uid] = current_user + except (KeyError, AttributeError): + common_uids[current_uid] = f"user{current_uid}" + + # Check which UIDs actually exist in the container + for uid, default_name in common_uids.items(): + try: + user_info = pwd.getpwuid(uid) + mapping[uid] = user_info.pw_name + except KeyError: + # UID doesn't exist in this container + continue + + except Exception as e: + if self.verbose: + print(f"UID mapping discovery error: {e}") + # Fallback to minimal mapping + mapping = {0: "root", os.getuid(): f"user{os.getuid()}"} + + return mapping + + def _discover_gid_mapping(self) -> dict[int, str]: + """Discover GID to groupname mapping in container.""" + mapping = {} + + try: + # Common container GIDs + common_gids = { + 0: "root", + 1: "daemon", + 2: "bin", + 3: "sys", + 4: "adm", + 5: "tty", + 6: "disk", + 7: "lp", + 8: "mail", + 9: "news", + 10: "uucp", + 12: "man", + 13: "proxy", + 33: "www-data", + 100: "users", + 999: "postgres", + 101: "nginx", + } + + current_gid = os.getgid() + try: + group_info = grp.getgrgid(current_gid) + common_gids[current_gid] = group_info.gr_name + except (KeyError, AttributeError): + common_gids[current_gid] = f"group{current_gid}" + + for gid, default_name in common_gids.items(): + try: + group_info = grp.getgrgid(gid) + mapping[gid] = group_info.gr_name + except KeyError: + continue + + except Exception as e: + if self.verbose: + print(f"GID mapping discovery error: {e}") + mapping = {0: "root", os.getgid(): f"group{os.getgid()}"} + + return mapping + + def adjust_issue_for_container(self, issue: dict) -> dict: + """Adjust permission issue description for container context.""" + if not self.container_info["is_container"]: + return issue + + adjusted_issue = issue.copy() + path = issue.get("path", "") + + # Add container-specific context + if "docker" in path.lower() or "/var/lib/docker" in path: + adjusted_issue["container_context"] = { + "type": "docker_volume", + "suggestion": "Consider using named volumes with proper ownership", + } + + # Adjust UID/GID in descriptions + if "stat" in issue: + uid = issue["stat"].get("uid") + gid = issue["stat"].get("gid") + + if uid is not None: + username = self.container_info["uid_mapping"].get(uid, f"UID{uid}") + adjusted_issue["uid_info"] = f"{uid} ({username})" + + if gid is not None: + groupname = self.container_info["gid_mapping"].get(gid, f"GID{gid}") + adjusted_issue["gid_info"] = f"{gid} ({groupname})" + + return adjusted_issue + + def get_container_specific_fix(self, issue: dict, base_fix: dict) -> dict: + """Get container-specific fix recommendation.""" + if not self.container_info["is_container"]: + return base_fix + + path = issue.get("path", "") + fix = base_fix.copy() + + # Special handling for Docker bind mounts + if any(pattern in path for pattern in ["/var/lib/docker", "/docker/", "docker.sock"]): + fix["container_advice"] = ( + "For Docker bind mounts, consider:\n" + "1. Use Docker volumes instead of bind mounts\n" + "2. Set correct UID/GID in Dockerfile with USER directive\n" + "3. Use docker run --user flag to match host UID\n" + "4. For existing containers, use: docker exec -u root container chown ..." + ) + + # Adjust for common container paths + if "/var/www/" in path or "/app/" in path: + fix["recommended"] = 0o755 if issue.get("is_directory") else 0o644 + fix["reason"] = "Web application files in container" + + return fix + + def fix_docker_bind_mount_permissions( + self, path: str, host_uid: int = None, host_gid: int = None, dry_run: bool = True + ) -> dict: + """Specialized fix for Docker bind mount permission issues.""" + result = { + "success": False, + "actions": [], + "warnings": [], + "dry_run": dry_run, + } + + try: + path_obj = Path(path).resolve() + + if not path_obj.exists(): + result["warnings"].append(f"Path does not exist: {path}") + return result + + # Determine target UID/GID + target_uid = host_uid if host_uid is not None else os.getuid() + target_gid = host_gid if host_gid is not None else os.getgid() + + # Check current ownership + stat_info = path_obj.stat() + current_uid = stat_info.st_uid + current_gid = stat_info.st_gid + + result["current"] = { + "path": str(path_obj), + "uid": current_uid, + "gid": current_gid, + "username": self._uid_to_name(current_uid), + "groupname": self._gid_to_name(current_gid), + } + + result["target"] = { + "uid": target_uid, + "gid": target_gid, + "username": self._uid_to_name(target_uid), + "groupname": self._gid_to_name(target_gid), + } + + # Plan actions + actions_needed = self._plan_actions( + current_uid, current_gid, target_uid, target_gid, stat_info.st_mode, path_obj + ) + + result["actions"] = actions_needed + + # Execute if not dry-run + result["success"] = self._execute_actions( + actions_needed, path_obj, target_uid, target_gid, dry_run, result + ) + + except Exception as e: + result["warnings"].append(f"Unexpected error: {e}") + + return result + + def _plan_actions( + self, + current_uid: int, + current_gid: int, + target_uid: int, + target_gid: int, + current_mode: int, + path_obj: Path, + ) -> list: + """Plan actions needed to fix permissions.""" + actions_needed = [] + + # Check if ownership needs to be changed + if current_uid != target_uid or current_gid != target_gid: + actions_needed.append( + self._create_chown_action( + current_uid, current_gid, target_uid, target_gid, path_obj + ) + ) + + # Check if permissions need to be fixed + current_mode_octal = current_mode & 0o777 + if current_mode_octal == 0o777 or current_mode_octal == 0o666: + actions_needed.append(self._create_chmod_action(current_mode_octal, path_obj)) + + return actions_needed + + def _create_chown_action( + self, current_uid: int, current_gid: int, target_uid: int, target_gid: int, path_obj: Path + ) -> dict: + """Create chown action dictionary.""" + return { + "type": "chown", + "command": f"chown {target_uid}:{target_gid} '{path_obj}'", + "description": f"Change ownership from {current_uid}:{current_gid} to {target_uid}:{target_gid}", + } + + def _create_chmod_action(self, current_mode: int, path_obj: Path) -> dict: + """Create chmod action dictionary.""" + recommended = 0o755 if path_obj.is_dir() else 0o644 + return { + "type": "chmod", + "command": f"chmod {oct(recommended)[2:]} '{path_obj}'", + "description": f"Fix dangerous permissions {oct(current_mode)} -> {oct(recommended)}", + } + + def _execute_actions( + self, + actions_needed: list, + path_obj: Path, + target_uid: int, + target_gid: int, + dry_run: bool, + result: dict, + ) -> bool: + """Execute planned actions.""" + if not dry_run and actions_needed: + return self._execute_real_actions( + actions_needed, path_obj, target_uid, target_gid, result + ) + elif dry_run and actions_needed: + return True # Dry-run considered successful + else: + return True # No actions needed + + def _execute_real_actions( + self, actions_needed: list, path_obj: Path, target_uid: int, target_gid: int, result: dict + ) -> bool: + """Execute actions in non-dry-run mode.""" + all_succeeded = True + + for action in actions_needed: + try: + if action["type"] == "chown": + os.chown(path_obj, target_uid, target_gid) + elif action["type"] == "chmod": + recommended = int(action["command"].split()[1], 8) + os.chmod(path_obj, recommended) + + except OSError as e: + result["warnings"].append(f"Failed {action['type']}: {e}") + all_succeeded = False + + return all_succeeded + + def _uid_to_name(self, uid: int) -> str: + """Convert UID to username.""" + try: + return pwd.getpwuid(uid).pw_name + except (KeyError, AttributeError): + return f"UID{uid}" + + def _gid_to_name(self, gid: int) -> str: + """Convert GID to groupname.""" + try: + return grp.getgrgid(gid).gr_name + except (KeyError, AttributeError): + return f"GID{gid}" + + def generate_docker_permission_report(self) -> str: + """ + Generate detailed Docker permission report. + Returns: + Formatted report string + """ + report_lines = ["🐳 DOCKER PERMISSION AUDIT REPORT", "=" * 50] + + # Container info + if self.container_info["is_container"]: + report_lines.append(f"Container Runtime: {self.container_info['container_runtime']}") + report_lines.append( + f"Host UID/GID: {self.container_info['host_uid']}/{self.container_info['host_gid']}" + ) + else: + report_lines.append("Environment: Native (not in container)") + + report_lines.append("") + + # UID Mapping + report_lines.append("UID Mapping:") + for uid, name in sorted(self.container_info["uid_mapping"].items()): + report_lines.append(f" {uid:>6} → {name}") + + report_lines.append("") + + # GID Mapping + report_lines.append("GID Mapping:") + for gid, name in sorted(self.container_info["gid_mapping"].items()): + report_lines.append(f" {gid:>6} → {name}") + + report_lines.append("") + report_lines.append("Common Docker Permission Issues:") + report_lines.append(" 1. Bind mounts with wrong UID/GID") + report_lines.append(" 2. Container running as root when not needed") + report_lines.append(" 3. World-writable files in volumes") + report_lines.append(" 4. /var/run/docker.sock with wrong permissions") + + return "\n".join(report_lines) + + +# Convenience function +def detect_docker_uid_mapping() -> dict: + """Detect Docker UID mapping for current environment.""" + handler = DockerPermissionHandler() + return handler.container_info diff --git a/demo.cast b/demo.cast new file mode 100644 index 00000000..8621d8b5 --- /dev/null +++ b/demo.cast @@ -0,0 +1,612 @@ +{"version": 2, "width": 156, "height": 41, "timestamp": 1767769245, "env": {"SHELL": "/bin/bash", "TERM": "xterm-256color"}} +[0.199545, "o", "\u001b[?2004h\u001b]0;altysha@localhost: ~/cortex-clean/cortex\u0007\u001b[01;32maltysha@localhost\u001b[00m:\u001b[01;34m~/cortex-clean/cortex\u001b[00m$ "] +[7.450933, "o", "p"] +[7.796106, "o", "y"] +[8.038916, "o", "r"] +[8.298127, "o", "e"] +[8.63561, "o", "\b\u001b[K"] +[8.811395, "o", "\b\u001b[K"] +[9.09816, "o", "t"] +[9.245553, "o", "e"] +[9.459232, "o", "s"] +[9.623778, "o", "t"] +[9.923951, "o", " "] +[12.202445, "o", "t"] +[12.280589, "o", "e"] +[12.457996, "o", "s"] +[12.578705, "o", "t"] +[12.758728, "o", "s"] +[13.927382, "o", "."] +[14.343257, "o", "\b\u001b[K"] +[14.645124, "o", "/"] +[16.896875, "o", "e"] +[17.258908, "o", "\b\u001b[K"] +[17.641248, "o", "p"] +[17.754013, "o", "e"] +[17.977385, "o", "r"] +[18.30477, "o", "m"] +[18.536814, "o", "i"] +[18.587898, "o", "s"] +[18.781061, "o", "s"] +[18.945464, "o", "i"] +[19.151548, "o", "o"] +[19.37017, "o", "n"] +[19.529637, "o", "s"] +[21.033223, "o", "/"] +[21.647794, "o", "t"] +[21.801958, "o", "e"] +[22.070961, "o", "s"] +[22.26144, "o", "t"] +[24.901153, "o", "_"] +[25.306212, "o", "a"] +[25.597026, "o", "u"] +[26.028224, "o", "d"] +[26.293312, "o", "i"] +[26.788061, "o", "r"] +[27.228771, "o", "\b\u001b[K"] +[27.368115, "o", "t"] +[27.540429, "o", "o"] +[27.721394, "o", "r"] +[29.59201, "o", "_"] +[30.239211, "o", "f"] +[30.497694, "o", "o"] +[30.912236, "o", "\b\u001b[K"] +[31.167575, "o", "i"] +[31.248203, "o", "c"] +[31.649822, "o", "e"] +[31.912565, "o", "\b\u001b[K"] +[32.080249, "o", "\b\u001b[K"] +[32.144405, "o", "x"] +[32.439129, "o", "e"] +[32.711989, "o", "r"] +[34.563742, "o", "."] +[34.97013, "o", "p"] +[35.382364, "o", "u"] +[35.917982, "o", "\b\u001b[K"] +[36.23682, "o", "y"] +[37.75337, "o", " "] +[38.009124, "o", "="] +[38.787186, "o", "\b\u001b[K"] +[39.130984, "o", "0"] +[39.622153, "o", "\b\u001b[K"] +[40.056482, "o", "-"] +[40.423567, "o", "v"] +[41.053254, "o", "\r\n\u001b[?2004l\r"] +[41.640419, "o", "\u001b[1m=================================================================== test session starts ====================================================================\u001b[0m\r\nplatform linux -- Python 3.12.3, pytest-9.0.2, pluggy-1.6.0 -- /home/altysha/cortex-clean/cortex/venv/bin/python\r\n"] +[41.650562, "o", "cachedir: .pytest_cache\r\nrootdir: /home/altysha/cortex-clean/cortex\r\nconfigfile: pyproject.toml\r\nplugins: anyio-4.12.1, cov-7.0.0, mock-3.15.1, timeout-2.4.0\r\n\u001b[1mcollecting ... \u001b[0m"] +[41.954463, "o", "\u001b[1m\rcollected 31 items \u001b[0m\r\n"] +[41.956901, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_auditor_creation "] +[41.960204, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 3%]\u001b[0m"] +[41.961215, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_scan_directory_returns_dict "] +[41.971608, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 6%]\u001b[0m"] +[41.972253, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_detect_world_writable_file "] +[41.979541, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 9%]\u001b[0m"] +[41.980395, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_ignore_safe_permissions "] +[41.986628, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 12%]\u001b[0m"] +[41.987409, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_suggest_fix_method "] +[41.99109, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 16%]\u001b[0m"] +[41.992189, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestDockerHandler::test_docker_handler_creation "] +[41.994696, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 19%]\u001b[0m"] +[41.99558, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestDockerHandler::test_detect_container_environment "] +[41.997191, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 22%]\u001b[0m"] +[41.99809, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestDockerHandler::test_uid_to_name_conversion "] +[42.007961, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 25%]\u001b[0m"] +[42.009431, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_auditor_with_verbose "] +[42.011358, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 29%]\u001b[0m"] +[42.01241, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_auditor_with_custom_dry_run "] +[42.01372, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 32%]\u001b[0m"] +[42.014572, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_directory_nonexistent "] +[42.017569, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 35%]\u001b[0m"] +[42.018447, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_directory_file_instead_of_dir "] +[42.024187, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 38%]\u001b[0m"] +[42.025354, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_suggest_fix_nonexistent_file "] +[42.027663, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 41%]\u001b[0m"] +[42.028617, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_fix_permissions_nonexistent "] +[42.030551, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 45%]\u001b[0m"] +[42.031154, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_and_fix_with_dry_run_override "] +[42.036262, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 48%]\u001b[0m"] +[42.037108, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_fix_permissions_dry_run "] +[42.042288, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 51%]\u001b[0m"] +[42.043433, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_fix_permissions_actual "] +[42.049771, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 54%]\u001b[0m"] +[42.050431, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_and_fix_report "] +[42.057515, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 58%]\u001b[0m"] +[42.058518, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_suggest_fix_for_executable "] +[42.064353, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 61%]\u001b[0m"] +[42.064965, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_suggest_fix_for_data_file "] +[42.069503, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 64%]\u001b[0m"] +[42.070128, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_config_module "] +[42.072428, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 67%]\u001b[0m"] +[42.073086, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_docker_handler_module "] +[42.074069, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 70%]\u001b[0m"] +[42.07464, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_cli_integration "] +[42.075924, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 74%]\u001b[0m"] +[42.076937, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_permission_fixer_alias "] +[42.078423, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 77%]\u001b[0m"] +[42.079227, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_path_function "] +[45.948327, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 80%]\u001b[0m"] +[45.949136, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_analyze_permissions_function "] +[49.984388, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 83%]\u001b[0m"] +[49.98561, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_pytest_works "] +[49.987312, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 87%]\u001b[0m"] +[49.988162, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_directory_permission_error "] +[49.994326, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 90%]\u001b[0m"] +[49.995123, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_directory_file_access_error "] +[49.999979, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 93%]\u001b[0m"] +[50.000698, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_fix_permissions_os_error "] +[50.006819, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 96%]\u001b[0m"] +[50.00749, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_suggest_fix_with_shebang "] +[50.011909, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [100%]\u001b[0m"] +[50.025974, "o", "\r\n\r\n\u001b[33m===================================================================== warnings summary =====================================================================\u001b[0m\r\n"] +[50.026636, "o", "venv/lib/python3.12/site-packages/_pytest/config/__init__.py:1428\r\n /home/altysha/cortex-clean/cortex/venv/lib/python3.12/site-packages/_pytest/config/__init__.py:1428: PytestConfigWarning: Unknown config option: asyncio_mode\r\n \r\n self._warn_or_fail_if_strict(f\"Unknown config option: {key}\\n\")\r\n\r\n-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html\r\n\u001b[33m============================================================== \u001b[32m31 passed\u001b[0m, \u001b[33m\u001b[1m1 warning\u001b[0m\u001b[33m in 8.39s\u001b[0m\u001b[33m ===============================================================\u001b[0m\r\n"] +[50.179324, "o", "\u001b[?2004h\u001b]0;altysha@localhost: ~/cortex-clean/cortex\u0007\u001b[01;32maltysha@localhost\u001b[00m:\u001b[01;34m~/cortex-clean/cortex\u001b[00m$ "] +[53.319329, "o", "p"] +[53.715011, "o", "y"] +[54.605573, "o", "t"] +[54.717201, "o", "e"] +[54.957791, "o", "s"] +[55.07596, "o", "t"] +[55.363362, "o", " "] +[58.351633, "o", "y"] +[58.53763, "o", "e"] +[59.457185, "o", "\b\u001b[K"] +[59.643276, "o", "\b\u001b[K"] +[59.998046, "o", "t"] +[60.101824, "o", "e"] +[60.292995, "o", "s"] +[60.377611, "o", "t"] +[60.517876, "o", "s"] +[60.611045, "o", "t"] +[61.029554, "o", "\b\u001b[K"] +[61.61882, "o", "/"] +[62.144895, "o", " "] +[62.863508, "o", "\b\u001b[K"] +[63.221185, "o", "p"] +[63.406267, "o", "e"] +[63.657281, "o", "r"] +[63.972162, "o", "m"] +[64.230954, "o", "i"] +[64.354995, "o", "s"] +[64.550947, "o", "s"] +[64.639862, "o", "i"] +[64.818942, "o", "o"] +[65.023216, "o", "n"] +[65.138889, "o", "s"] +[67.492722, "o", "/"] +[67.952445, "o", "t"] +[68.107977, "o", "e"] +[68.298877, "o", "s"] +[68.400263, "o", "t"] +[71.020751, "o", "_"] +[71.309466, "o", "d"] +[71.497812, "o", "o"] +[71.66092, "o", "c"] +[71.91197, "o", "k"] +[72.06455, "o", "e"] +[72.329998, "o", "r"] +[72.924037, "o", "_"] +[75.336334, "o", "h"] +[75.409427, "o", "a"] +[75.613741, "o", "n"] +[75.821631, "o", "d"] +[76.184427, "o", "l"] +[76.30127, "o", "e"] +[76.540704, "o", "r"] +[78.419769, "o", "."] +[79.024931, "o", "p"] +[79.491773, "o", "y"] +[81.293609, "o", " "] +[81.543783, "o", "="] +[82.147412, "o", "\b\u001b[K"] +[82.477272, "o", "-"] +[83.201428, "o", "v"] +[84.783754, "o", "\r\n\u001b[?2004l\r"] +[85.458877, "o", "\u001b[1m=================================================================== test session starts ====================================================================\u001b[0m\r\nplatform linux -- Python 3.12.3, pytest-9.0.2, pluggy-1.6.0 -- /home/altysha/cortex-clean/cortex/venv/bin/python\r\n"] +[85.473733, "o", "cachedir: .pytest_cache\r\nrootdir: /home/altysha/cortex-clean/cortex\r\nconfigfile: pyproject.toml\r\nplugins: anyio-4.12.1, cov-7.0.0, mock-3.15.1, timeout-2.4.0\r\n\u001b[1mcollecting ... \u001b[0m"] +[85.829796, "o", "\u001b[1m\rcollected 17 items \u001b[0m\r\n"] +[85.831264, "o", "\r\ntests/permissions/test_docker_handler.py::test_docker_handler_import "] +[85.834627, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 5%]\u001b[0m"] +[85.835614, "o", "\r\ntests/permissions/test_docker_handler.py::test_docker_handler_creation "] +[85.837863, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 11%]\u001b[0m"] +[85.839042, "o", "\r\ntests/permissions/test_docker_handler.py::test_container_info_structure "] +[85.840994, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 17%]\u001b[0m"] +[85.842187, "o", "\r\ntests/permissions/test_docker_handler.py::test_detect_docker_uid_mapping "] +[85.843738, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 23%]\u001b[0m"] +[85.844636, "o", "\r\ntests/permissions/test_docker_handler.py::test_adjust_issue_for_container "] +[85.846432, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 29%]\u001b[0m"] +[85.847364, "o", "\r\ntests/permissions/test_docker_handler.py::test_get_container_specific_fix "] +[85.84891, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 35%]\u001b[0m"] +[85.849684, "o", "\r\ntests/permissions/test_docker_handler.py::test_uid_to_name_method "] +[85.86128, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 41%]\u001b[0m"] +[85.862074, "o", "\r\ntests/permissions/test_docker_handler.py::test_gid_to_name_method "] +[85.869162, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 47%]\u001b[0m"] +[85.869995, "o", "\r\ntests/permissions/test_docker_handler.py::test_generate_docker_permission_report "] +[85.872063, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 52%]\u001b[0m"] +[85.872844, "o", "\r\ntests/permissions/test_docker_handler.py::test_fix_docker_bind_mount_permissions_dry_run "] +[85.88125, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 58%]\u001b[0m"] +[85.881826, "o", "\r\ntests/permissions/test_docker_handler.py::test_fix_docker_bind_mount_permissions_nonexistent "] +[85.88495, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 64%]\u001b[0m"] +[85.885945, "o", "\r\ntests/permissions/test_docker_handler.py::test_discover_uid_mapping "] +[85.893384, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 70%]\u001b[0m"] +[85.89432, "o", "\r\ntests/permissions/test_docker_handler.py::test_discover_gid_mapping "] +[85.897627, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 76%]\u001b[0m"] +[85.89832, "o", "\r\ntests/permissions/test_docker_handler.py::test_generate_docker_permission_report_with_path "] +[85.903128, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 82%]\u001b[0m"] +[85.903972, "o", "\r\ntests/permissions/test_docker_handler.py::test_container_info_types "] +[85.905901, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 88%]\u001b[0m"] +[85.906618, "o", "\r\ntests/permissions/test_docker_handler.py::test_adjust_issue_with_stat "] +[85.912058, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 94%]\u001b[0m"] +[85.912786, "o", "\r\ntests/permissions/test_docker_handler.py::test_get_container_specific_fix_with_docker_path "] +[85.914274, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [100%]\u001b[0m"] +[85.919602, "o", "\r\n\r\n\u001b[33m===================================================================== warnings summary =====================================================================\u001b[0m\r\nvenv/lib/python3.12/site-packages/_pytest/config/__init__.py:1428\r\n /home/altysha/cortex-clean/cortex/venv/lib/python3.12/site-packages/_pytest/config/__init__.py:1428: PytestConfigWarning: Unknown config option: asyncio_mode\r\n \r\n self._warn_or_fail_if_strict(f\"Unknown config option: {key}\\n\")\r\n\r\n-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html\r\n\u001b[33m============================================================== \u001b[32m17 passed\u001b[0m, \u001b[33m\u001b[1m1 warning\u001b[0m\u001b[33m in 0.46s\u001b[0m\u001b[33m ===============================================================\u001b[0m\r\n"] +[86.049491, "o", "\u001b[?2004h\u001b]0;altysha@localhost: ~/cortex-clean/cortex\u0007\u001b[01;32maltysha@localhost\u001b[00m:\u001b[01;34m~/cortex-clean/cortex\u001b[00m$ "] +[89.075271, "o", "p"] +[89.99766, "o", "y"] +[90.239234, "o", "t"] +[90.329509, "o", "e"] +[90.558033, "o", "s"] +[90.671849, "o", "t"] +[91.984512, "o", " "] +[92.681205, "o", "="] +[92.875263, "o", "="] +[93.216672, "o", "\b\u001b[K"] +[93.39074, "o", "\b\u001b[K"] +[93.782847, "o", "-"] +[93.957165, "o", "-"] +[94.110876, "o", "c"] +[94.289333, "o", "o"] +[94.455834, "o", "v"] +[96.00301, "o", "="] +[97.035469, "o", "c"] +[97.199934, "o", "o"] +[97.640154, "o", "r"] +[98.080651, "o", "t"] +[98.415307, "o", "e"] +[98.679464, "o", "x"] +[100.007951, "o", "."] +[100.443255, "o", "p"] +[100.600185, "o", "e"] +[100.812509, "o", "r"] +[101.141062, "o", "k"] +[101.568774, "o", "\b\u001b[K"] +[101.840663, "o", "k"] +[102.278796, "o", "\b\u001b[K"] +[102.541806, "o", "m"] +[102.787238, "o", "i"] +[102.95931, "o", "s"] +[103.182329, "o", "s"] +[103.341364, "o", "i"] +[103.545813, "o", "o"] +[103.774966, "o", "n"] +[103.909314, "o", "s"] +[106.697607, "o", " "] +[106.999349, "o", "="] +[107.265302, "o", "="] +[107.607113, "o", "\b\u001b[K"] +[107.773039, "o", "\b\u001b[K"] +[108.181264, "o", "0"] +[108.341646, "o", "0"] +[108.859612, "o", "\b\u001b[K"] +[109.066626, "o", "\b\u001b[K"] +[109.599748, "o", "-"] +[109.743692, "o", "-"] +[111.745143, "o", " "] +[112.251738, "o", "\b\u001b[K"] +[112.654849, "o", "c"] +[112.89844, "o", "o"] +[113.235728, "o", "v"] +[115.644908, "o", "-"] +[116.116532, "o", "r"] +[116.346356, "o", "e"] +[116.597637, "o", "p"] +[116.854651, "o", "o"] +[117.05864, "o", "r"] +[117.411249, "o", "t"] +[120.564296, "o", "-"] +[121.044042, "o", "r"] +[121.611718, "o", "\b\u001b[K"] +[121.751675, "o", "t"] +[123.155569, "o", "\b\u001b[K"] +[123.654326, "o", "\b\u001b[K"] +[124.36148, "o", "="] +[124.841256, "o", "t"] +[125.11247, "o", "e"] +[125.377284, "o", "r"] +[125.563929, "o", "m"] +[127.306367, "o", "-"] +[127.713806, "o", "m"] +[127.97657, "o", "i"] +[128.15508, "o", "s"] +[128.351149, "o", "s"] +[128.481644, "o", "i"] +[128.681899, "o", "o"] +[128.905741, "o", "n"] +[129.099355, "o", "g"] +[129.50919, "o", "\b\u001b[K"] +[129.675487, "o", "\b\u001b[K"] +[129.836517, "o", "\b\u001b[K"] +[130.070407, "o", "n"] +[130.180731, "o", "g"] +[134.876229, "o", " "] +[135.653079, "o", "t"] +[135.793973, "o", "e"] +[136.321453, "o", "s"] +[136.574761, "o", "t"] +[137.030261, "o", "s"] +[138.402132, "o", "/"] +[139.031255, "o", "["] +[139.213264, "o", "e"] +[139.838295, "o", "\b\u001b[K"] +[140.009503, "o", "\b\u001b[K"] +[140.228968, "o", "p"] +[140.367977, "o", "e"] +[140.699367, "o", "r"] +[141.207878, "o", "m"] +[141.496299, "o", "i"] +[141.705911, "o", "s"] +[141.885699, "o", "s"] +[141.971744, "o", "i"] +[142.163434, "o", "o"] +[142.366625, "o", "n"] +[142.505109, "o", "s"] +[144.699162, "o", "\r\n\u001b[?2004l\r"] +[145.549973, "o", "\u001b[1m=================================================================== test session starts ====================================================================\u001b[0m\r\nplatform linux -- Python 3.12.3, pytest-9.0.2, pluggy-1.6.0 -- /home/altysha/cortex-clean/cortex/venv/bin/python\r\n"] +[145.567715, "o", "cachedir: .pytest_cache\r\nrootdir: /home/altysha/cortex-clean/cortex\r\nconfigfile: pyproject.toml\r\nplugins: anyio-4.12.1, cov-7.0.0, mock-3.15.1, timeout-2.4.0\r\n\u001b[1mcollecting ... \u001b[0m"] +[146.788168, "o", "\u001b[1m\rcollecting 5 items \u001b[0m"] +[146.805364, "o", "\u001b[1m\rcollected 54 items \u001b[0m\r\n"] +[146.808496, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_auditor_creation "] +[146.814528, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 1%]\u001b[0m"] +[146.815419, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_scan_directory_returns_dict "] +[146.827692, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 3%]\u001b[0m"] +[146.828488, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_detect_world_writable_file "] +[146.835024, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 5%]\u001b[0m"] +[146.836361, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_ignore_safe_permissions "] +[146.843742, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 7%]\u001b[0m"] +[146.844858, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_suggest_fix_method "] +[146.84851, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 9%]\u001b[0m"] +[146.84965, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestDockerHandler::test_docker_handler_creation "] +[146.851761, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 11%]\u001b[0m"] +[146.853226, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestDockerHandler::test_detect_container_environment "] +[146.856285, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 12%]\u001b[0m"] +[146.857598, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestDockerHandler::test_uid_to_name_conversion "] +[146.866282, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 14%]\u001b[0m"] +[146.867435, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_auditor_with_verbose "] +[146.86955, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 16%]\u001b[0m"] +[146.871179, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_auditor_with_custom_dry_run "] +[146.873533, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 18%]\u001b[0m"] +[146.874763, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_directory_nonexistent "] +[146.878766, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 20%]\u001b[0m"] +[146.879535, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_directory_file_instead_of_dir "] +[146.885397, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 22%]\u001b[0m"] +[146.886554, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_suggest_fix_nonexistent_file "] +[146.889763, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 24%]\u001b[0m"] +[146.890952, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_fix_permissions_nonexistent "] +[146.892846, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 25%]\u001b[0m"] +[146.893877, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_and_fix_with_dry_run_override "] +[146.899681, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 27%]\u001b[0m"] +[146.900863, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_fix_permissions_dry_run "] +[146.908291, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 29%]\u001b[0m"] +[146.909339, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_fix_permissions_actual "] +[146.914874, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 31%]\u001b[0m"] +[146.915901, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_and_fix_report "] +[146.924627, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 33%]\u001b[0m"] +[146.925693, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_suggest_fix_for_executable "] +[146.931199, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 35%]\u001b[0m"] +[146.932408, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_suggest_fix_for_data_file "] +[146.939836, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 37%]\u001b[0m"] +[146.941051, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_config_module "] +[146.942601, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 38%]\u001b[0m"] +[146.943607, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_docker_handler_module "] +[146.945157, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 40%]\u001b[0m"] +[146.946224, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_cli_integration "] +[146.947945, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 42%]\u001b[0m"] +[146.948926, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_permission_fixer_alias "] +[146.95034, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 44%]\u001b[0m"] +[146.95125, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_path_function "] +[151.446816, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 46%]\u001b[0m"] +[151.448179, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_analyze_permissions_function "] +[155.729298, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 48%]\u001b[0m"] +[155.730564, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_pytest_works "] +[155.732284, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 50%]\u001b[0m"] +[155.733231, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_directory_permission_error "] +[155.746503, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 51%]\u001b[0m"] +[155.747679, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_directory_file_access_error "] +[155.755901, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 53%]\u001b[0m"] +[155.757434, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_fix_permissions_os_error "] +[155.765284, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 55%]\u001b[0m"] +[155.768278, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_suggest_fix_with_shebang "] +[155.774883, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 57%]\u001b[0m"] +[155.775965, "o", "\r\ntests/permissions/test_config.py::test_dangerous_permissions "] +[155.777353, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 59%]\u001b[0m"] +[155.7783, "o", "\r\ntests/permissions/test_config.py::test_world_writable_flag "] +[155.779965, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 61%]\u001b[0m"] +[155.781263, "o", "\r\ntests/permissions/test_config.py::test_ignore_patterns "] +[155.784376, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 62%]\u001b[0m"] +[155.785858, "o", "\r\ntests/permissions/test_config.py::test_sensitive_files "] +[155.787642, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 64%]\u001b[0m"] +[155.788836, "o", "\r\ntests/permissions/test_config.py::test_recommended_permissions "] +[155.790316, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 66%]\u001b[0m"] +[155.791463, "o", "\r\ntests/permissions/test_config.py::test_docker_recommended_permissions "] +[155.793213, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 68%]\u001b[0m"] +[155.794592, "o", "\r\ntests/permissions/test_docker_handler.py::test_docker_handler_import "] +[155.796187, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 70%]\u001b[0m"] +[155.797327, "o", "\r\ntests/permissions/test_docker_handler.py::test_docker_handler_creation "] +[155.800709, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 72%]\u001b[0m"] +[155.801822, "o", "\r\ntests/permissions/test_docker_handler.py::test_container_info_structure "] +[155.80375, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 74%]\u001b[0m"] +[155.804737, "o", "\r\ntests/permissions/test_docker_handler.py::test_detect_docker_uid_mapping "] +[155.807145, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 75%]\u001b[0m"] +[155.808595, "o", "\r\ntests/permissions/test_docker_handler.py::test_adjust_issue_for_container "] +[155.811276, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 77%]\u001b[0m"] +[155.812298, "o", "\r\ntests/permissions/test_docker_handler.py::test_get_container_specific_fix "] +[155.814429, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 79%]\u001b[0m"] +[155.815967, "o", "\r\ntests/permissions/test_docker_handler.py::test_uid_to_name_method "] +[155.82361, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 81%]\u001b[0m"] +[155.824474, "o", "\r\ntests/permissions/test_docker_handler.py::test_gid_to_name_method "] +[155.835174, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 83%]\u001b[0m"] +[155.836201, "o", "\r\ntests/permissions/test_docker_handler.py::test_generate_docker_permission_report "] +[155.838763, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 85%]\u001b[0m"] +[155.839834, "o", "\r\ntests/permissions/test_docker_handler.py::test_fix_docker_bind_mount_permissions_dry_run "] +[155.846991, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 87%]\u001b[0m"] +[155.848696, "o", "\r\ntests/permissions/test_docker_handler.py::test_fix_docker_bind_mount_permissions_nonexistent "] +[155.85333, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 88%]\u001b[0m"] +[155.854494, "o", "\r\ntests/permissions/test_docker_handler.py::test_discover_uid_mapping "] +[155.862769, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 90%]\u001b[0m"] +[155.863544, "o", "\r\ntests/permissions/test_docker_handler.py::test_discover_gid_mapping "] +[155.869969, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 92%]\u001b[0m"] +[155.871054, "o", "\r\ntests/permissions/test_docker_handler.py::test_generate_docker_permission_report_with_path "] +[155.876538, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 94%]\u001b[0m"] +[155.877613, "o", "\r\ntests/permissions/test_docker_handler.py::test_container_info_types "] +[155.879606, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 96%]\u001b[0m"] +[155.88058, "o", "\r\ntests/permissions/test_docker_handler.py::test_adjust_issue_with_stat "] +[155.889161, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 98%]\u001b[0m"] +[155.890252, "o", "\r\ntests/permissions/test_docker_handler.py::test_get_container_specific_fix_with_docker_path "] +[155.892286, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [100%]\u001b[0m"] +[156.027104, "o", "\r\n\r\n\u001b[33m===================================================================== warnings summary =====================================================================\u001b[0m\r\n"] +[156.027698, "o", "venv/lib/python3.12/site-packages/_pytest/config/__init__.py:1428\r\n /home/altysha/cortex-clean/cortex/venv/lib/python3.12/site-packages/_pytest/config/__init__.py:1428: PytestConfigWarning: Unknown config option: asyncio_mode\r\n \r\n self._warn_or_fail_if_strict(f\"Unknown config option: {key}\\n\")\r\n\r\n-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html\r\n====================================================================== tests coverage ======================================================================\r\n_____________________________________________________ coverage: platform linux, python 3.12.3-final-0 ______________________________________________________\r\n\r\nName Stmts Miss Branch BrPart Cover Missing\r\n----------------------------------------------------------------------------------\r\ncortex/permissions/__init__.py 11 0 0 0 100%\r\ncortex/permissions/auditor_fixer.py 127 26 58 6 77% 78->63, 83, 126-127, 199, 208, 217, 221-248\r\ncortex/permissions/config.py 8 0 0 0 100%\r\ncortex/permissions/docker_handler.py 184 72 60 15 55% 19-23, 45->57, 50-51, 53-54, 58-59, 63-68, 72-73, 110-111, 122-126, 160-161, 167-173, 182-205, 212-230, 278, 289-290, 302-313, 316, 321-322, 346-347, 358, 365\r\n----------------------------------------------------------------------------------\r\nTOTAL 330 98 118 21 66%\r\n\u001b[32mRequired test coverage of 55.0% reached. Total coverage: 65.85%\r\n\u001b[0m\u001b[33m============================================================== \u001b[32m54 passed\u001b[0m, \u001b[33m\u001b[1m1 warning\u001b[0m\u001b[33m in 10.48s\u001b[0m\u001b[33m ==============================================================\u001b[0m\r\n"] +[156.232563, "o", "\u001b[?2004h\u001b]0;altysha@localhost: ~/cortex-clean/cortex\u0007\u001b[01;32maltysha@localhost\u001b[00m:\u001b[01;34m~/cortex-clean/cortex\u001b[00m$ "] +[159.121447, "o", "p"] +[159.455907, "o", "y"] +[159.838074, "o", "t"] +[159.98767, "o", "e"] +[160.216019, "o", "s"] +[160.313199, "o", "t"] +[160.570323, "o", " "] +[162.196549, "o", "t"] +[162.2768, "o", "e"] +[162.477684, "o", "s"] +[162.57474, "o", "t"] +[162.68684, "o", "s"] +[164.333592, "o", "/"] +[167.492585, "o", "p"] +[167.588427, "o", "e"] +[167.815573, "o", "r"] +[168.024734, "o", "m"] +[168.244697, "o", "i"] +[168.328516, "o", "s"] +[168.531095, "o", "s"] +[168.671683, "o", "i"] +[168.887429, "o", "o"] +[169.126725, "o", "n"] +[169.27572, "o", "s"] +[171.095869, "o", "/"] +[171.659714, "o", " "] +[172.071004, "o", "-"] +[172.375799, "o", "v"] +[172.889844, "o", "\r\n\u001b[?2004l\r"] +[173.595799, "o", "\u001b[1m=================================================================== test session starts ====================================================================\u001b[0m\r\n"] +[173.596321, "o", "platform linux -- Python 3.12.3, pytest-9.0.2, pluggy-1.6.0 -- /home/altysha/cortex-clean/cortex/venv/bin/python\r\n"] +[173.60749, "o", "cachedir: .pytest_cache\r\nrootdir: /home/altysha/cortex-clean/cortex\r\nconfigfile: pyproject.toml\r\nplugins: anyio-4.12.1, cov-7.0.0, mock-3.15.1, timeout-2.4.0\r\n\u001b[1mcollecting ... \u001b[0m"] +[173.961518, "o", "\u001b[1m\rcollected 54 items \u001b[0m\r\n"] +[173.962973, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_auditor_creation "] +[173.966105, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 1%]\u001b[0m"] +[173.96735, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_scan_directory_returns_dict "] +[173.976114, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 3%]\u001b[0m"] +[173.976775, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_detect_world_writable_file "] +[173.983673, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 5%]\u001b[0m"] +[173.984494, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_ignore_safe_permissions "] +[173.992143, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 7%]\u001b[0m"] +[173.992922, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestPermissionAuditorBasic::test_suggest_fix_method "] +[173.996372, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 9%]\u001b[0m"] +[173.997267, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestDockerHandler::test_docker_handler_creation "] +[173.999106, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 11%]\u001b[0m"] +[174.000085, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestDockerHandler::test_detect_container_environment "] +[174.001827, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 12%]\u001b[0m"] +[174.002707, "o", "\r\ntests/permissions/test_auditor_fixer.py::TestDockerHandler::test_uid_to_name_conversion "] +[174.013224, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 14%]\u001b[0m"] +[174.014021, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_auditor_with_verbose "] +[174.015353, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 16%]\u001b[0m"] +[174.016246, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_auditor_with_custom_dry_run "] +[174.017478, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 18%]\u001b[0m"] +[174.018209, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_directory_nonexistent "] +[174.02145, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 20%]\u001b[0m"] +[174.022449, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_directory_file_instead_of_dir "] +[174.02938, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 22%]\u001b[0m"] +[174.030253, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_suggest_fix_nonexistent_file "] +[174.031909, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 24%]\u001b[0m"] +[174.032736, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_fix_permissions_nonexistent "] +[174.034305, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 25%]\u001b[0m"] +[174.035064, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_and_fix_with_dry_run_override "] +[174.041943, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 27%]\u001b[0m"] +[174.042794, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_fix_permissions_dry_run "] +[174.048242, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 29%]\u001b[0m"] +[174.048911, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_fix_permissions_actual "] +[174.054546, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 31%]\u001b[0m"] +[174.05579, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_and_fix_report "] +[174.064213, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 33%]\u001b[0m"] +[174.064992, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_suggest_fix_for_executable "] +[174.070001, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 35%]\u001b[0m"] +[174.071042, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_suggest_fix_for_data_file "] +[174.07704, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 37%]\u001b[0m"] +[174.077703, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_config_module "] +[174.078733, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 38%]\u001b[0m"] +[174.079427, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_docker_handler_module "] +[174.080469, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 40%]\u001b[0m"] +[174.081139, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_cli_integration "] +[174.082202, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 42%]\u001b[0m"] +[174.082989, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_permission_fixer_alias "] +[174.083945, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 44%]\u001b[0m"] +[174.084613, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_path_function "] +[178.448756, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 46%]\u001b[0m"] +[178.449629, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_analyze_permissions_function "] +[182.575815, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 48%]\u001b[0m"] +[182.576889, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_pytest_works "] +[182.578921, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 50%]\u001b[0m"] +[182.579822, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_directory_permission_error "] +[182.586038, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 51%]\u001b[0m"] +[182.587329, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_scan_directory_file_access_error "] +[182.591803, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 53%]\u001b[0m"] +[182.592665, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_fix_permissions_os_error "] +[182.598351, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 55%]\u001b[0m"] +[182.599203, "o", "\r\ntests/permissions/test_auditor_fixer.py::test_suggest_fix_with_shebang "] +[182.605342, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 57%]\u001b[0m"] +[182.606396, "o", "\r\ntests/permissions/test_config.py::test_dangerous_permissions "] +[182.607652, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 59%]\u001b[0m"] +[182.608535, "o", "\r\ntests/permissions/test_config.py::test_world_writable_flag "] +[182.609725, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 61%]\u001b[0m"] +[182.610403, "o", "\r\ntests/permissions/test_config.py::test_ignore_patterns "] +[182.611534, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 62%]\u001b[0m"] +[182.612218, "o", "\r\ntests/permissions/test_config.py::test_sensitive_files "] +[182.613113, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 64%]\u001b[0m"] +[182.613839, "o", "\r\ntests/permissions/test_config.py::test_recommended_permissions "] +[182.614828, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 66%]\u001b[0m"] +[182.615499, "o", "\r\ntests/permissions/test_config.py::test_docker_recommended_permissions "] +[182.616536, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 68%]\u001b[0m"] +[182.617706, "o", "\r\ntests/permissions/test_docker_handler.py::test_docker_handler_import "] +[182.619397, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 70%]\u001b[0m"] +[182.620258, "o", "\r\ntests/permissions/test_docker_handler.py::test_docker_handler_creation "] +[182.621801, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 72%]\u001b[0m"] +[182.622466, "o", "\r\ntests/permissions/test_docker_handler.py::test_container_info_structure "] +[182.624142, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 74%]\u001b[0m"] +[182.624917, "o", "\r\ntests/permissions/test_docker_handler.py::test_detect_docker_uid_mapping "] +[182.626512, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 75%]\u001b[0m"] +[182.62714, "o", "\r\ntests/permissions/test_docker_handler.py::test_adjust_issue_for_container "] +[182.628555, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 77%]\u001b[0m"] +[182.629173, "o", "\r\ntests/permissions/test_docker_handler.py::test_get_container_specific_fix "] +[182.630464, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 79%]\u001b[0m"] +[182.631277, "o", "\r\ntests/permissions/test_docker_handler.py::test_uid_to_name_method "] +[182.63871, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 81%]\u001b[0m"] +[182.63936, "o", "\r\ntests/permissions/test_docker_handler.py::test_gid_to_name_method "] +[182.64554, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 83%]\u001b[0m"] +[182.646578, "o", "\r\ntests/permissions/test_docker_handler.py::test_generate_docker_permission_report "] +[182.648698, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 85%]\u001b[0m"] +[182.649478, "o", "\r\ntests/permissions/test_docker_handler.py::test_fix_docker_bind_mount_permissions_dry_run "] +[182.656669, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 87%]\u001b[0m"] +[182.657599, "o", "\r\ntests/permissions/test_docker_handler.py::test_fix_docker_bind_mount_permissions_nonexistent "] +[182.660798, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 88%]\u001b[0m"] +[182.66174, "o", "\r\ntests/permissions/test_docker_handler.py::test_discover_uid_mapping "] +[182.669678, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 90%]\u001b[0m"] +[182.670692, "o", "\r\ntests/permissions/test_docker_handler.py::test_discover_gid_mapping "] +[182.674084, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 92%]\u001b[0m"] +[182.674878, "o", "\r\ntests/permissions/test_docker_handler.py::test_generate_docker_permission_report_with_path "] +[182.67964, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 94%]\u001b[0m"] +[182.680507, "o", "\r\ntests/permissions/test_docker_handler.py::test_container_info_types "] +[182.682289, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 96%]\u001b[0m"] +[182.683086, "o", "\r\ntests/permissions/test_docker_handler.py::test_adjust_issue_with_stat "] +[182.689656, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [ 98%]\u001b[0m"] +[182.6905, "o", "\r\ntests/permissions/test_docker_handler.py::test_get_container_specific_fix_with_docker_path "] +[182.692197, "o", "\u001b[32mPASSED\u001b[0m\u001b[33m [100%]\u001b[0m"] +[182.708594, "o", "\r\n\r\n\u001b[33m===================================================================== warnings summary =====================================================================\u001b[0m\r\n"] +[182.709236, "o", "venv/lib/python3.12/site-packages/_pytest/config/__init__.py:1428\r\n /home/altysha/cortex-clean/cortex/venv/lib/python3.12/site-packages/_pytest/config/__init__.py:1428: PytestConfigWarning: Unknown config option: asyncio_mode\r\n \r\n self._warn_or_fail_if_strict(f\"Unknown config option: {key}\\n\")\r\n\r\n-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html\r\n\u001b[33m============================================================== \u001b[32m54 passed\u001b[0m, \u001b[33m\u001b[1m1 warning\u001b[0m\u001b[33m in 9.11s\u001b[0m\u001b[33m ===============================================================\u001b[0m\r\n"] +[182.869031, "o", "\u001b[?2004h\u001b]0;altysha@localhost: ~/cortex-clean/cortex\u0007\u001b[01;32maltysha@localhost\u001b[00m:\u001b[01;34m~/cortex-clean/cortex\u001b[00m$ "] +[185.594644, "o", "\u001b[?2004l\r\r\nexit\r\n"] diff --git a/demo.html b/demo.html new file mode 100644 index 00000000..e69de29b diff --git a/test_777.txt b/test_777.txt new file mode 100755 index 00000000..e69de29b diff --git a/tests/permissions/__init__.py b/tests/permissions/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/permissions/test_auditor_fixer.py b/tests/permissions/test_auditor_fixer.py new file mode 100644 index 00000000..2b5d04c9 --- /dev/null +++ b/tests/permissions/test_auditor_fixer.py @@ -0,0 +1,424 @@ +""" +Tests for Permission Auditor & Fixer module. +""" + +import os +import stat +import tempfile +from pathlib import Path + +import pytest + +from cortex.permissions import PermissionAuditor, PermissionManager + + +class TestPermissionAuditorBasic: + """Basic functionality tests for PermissionAuditor""" + + def test_auditor_creation(self): + """Test that PermissionAuditor can be instantiated""" + auditor = PermissionAuditor() + assert hasattr(auditor, "scan_directory") + assert hasattr(auditor, "suggest_fix") + + def test_scan_directory_returns_dict(self, tmp_path): + """Test that scan_directory returns proper dictionary structure""" + auditor = PermissionAuditor() + result = auditor.scan_directory(tmp_path) + + assert isinstance(result, dict) + expected_keys = ["world_writable", "dangerous", "suggestions"] + for key in expected_keys: + assert key in result, f"Missing key '{key}' in result" + assert isinstance(result[key], list), f"Key '{key}' should be a list" + + def test_detect_world_writable_file(self, tmp_path): + """Test detection of world-writable files (777 permissions)""" + unsafe_file = tmp_path / "test_777.txt" + unsafe_file.write_text("dangerous content") + unsafe_file.chmod(0o777) + + auditor = PermissionAuditor() + result = auditor.scan_directory(tmp_path) + + assert len(result["world_writable"]) > 0 + + found_files = [str(p) for p in result["world_writable"]] + assert str(unsafe_file) in found_files + + def test_ignore_safe_permissions(self, tmp_path): + """Test that files with safe permissions are not flagged""" + safe_file = tmp_path / "safe_644.txt" + safe_file.write_text("safe content") + safe_file.chmod(0o644) + + auditor = PermissionAuditor() + result = auditor.scan_directory(tmp_path) + + assert str(safe_file) not in result["world_writable"] + + def test_suggest_fix_method(self, tmp_path): + """Test that suggest_fix method works""" + auditor = PermissionAuditor() + + assert hasattr(auditor, "suggest_fix") + + test_file = tmp_path / "test_suggest.txt" + test_file.write_text("test") + test_file.chmod(0o777) + + suggestion = auditor.suggest_fix(str(test_file), "777") + assert isinstance(suggestion, str) + assert "chmod" in suggestion + + +class TestDockerHandler: + """Tests for DockerPermissionHandler""" + + def test_docker_handler_creation(self): + """Test DockerPermissionHandler can be instantiated""" + from cortex.permissions.docker_handler import DockerPermissionHandler + + handler = DockerPermissionHandler() + assert hasattr(handler, "container_info") + + def test_detect_container_environment(self): + """Test container detection""" + from cortex.permissions.docker_handler import DockerPermissionHandler + + handler = DockerPermissionHandler(verbose=False) + info = handler.container_info + + assert "is_container" in info + assert "host_uid" in info + assert "host_gid" in info + assert isinstance(info["is_container"], bool) + + def test_uid_to_name_conversion(self): + """Test UID to name conversion (simplified)""" + from cortex.permissions.docker_handler import DockerPermissionHandler + + handler = DockerPermissionHandler() + + # Test with known UID 0 (root) + result = handler._uid_to_name(0) + assert "root" in result or "UID0" in result + + # Test with high UID (likely doesn't exist) + result = handler._uid_to_name(99999) + assert "UID99999" in result + + +def test_auditor_with_verbose(): + """Test auditor with verbose mode""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor(verbose=True) + assert auditor.verbose is True + assert auditor.dry_run is True # default + + +def test_auditor_with_custom_dry_run(): + """Test auditor with custom dry_run setting""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor(dry_run=False) + assert auditor.dry_run is False + + +def test_scan_directory_nonexistent(): + """Test scanning non-existent directory""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor() + result = auditor.scan_directory("/nonexistent/path/12345") + + assert isinstance(result, dict) + assert result["world_writable"] == [] + assert result["dangerous"] == [] + assert result["suggestions"] == [] + + +def test_scan_directory_file_instead_of_dir(tmp_path): + """Test scanning a file instead of directory""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor() + test_file = tmp_path / "test.txt" + test_file.write_text("test") + + result = auditor.scan_directory(test_file) + + assert isinstance(result, dict) + assert result["world_writable"] == [] + assert result["dangerous"] == [] + assert result["suggestions"] == [] + + +def test_suggest_fix_nonexistent_file(): + """Test suggest_fix for non-existent file""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor() + suggestion = auditor.suggest_fix("/nonexistent/file.txt") + + assert "doesn't exist" in suggestion + assert "# File" in suggestion + + +def test_fix_permissions_nonexistent(): + """Test fix_permissions for non-existent file""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor() + result = auditor.fix_permissions("/nonexistent/file.txt", "644") + + assert "does not exist" in result + assert "File" in result + + +def test_scan_and_fix_with_dry_run_override(tmp_path): + """Test scan_and_fix with dry_run parameter override""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + # Create auditor with dry_run=True by default + auditor = PermissionAuditor(dry_run=True) + + # Test with explicit dry_run=False (should override) + result = auditor.scan_and_fix(str(tmp_path), apply_fixes=False, dry_run=False) + assert "issues_found" in result + + # Test with explicit dry_run=True + result = auditor.scan_and_fix(str(tmp_path), apply_fixes=False, dry_run=True) + assert "issues_found" in result + + # Test with None (should use instance default) + result = auditor.scan_and_fix(str(tmp_path), apply_fixes=False, dry_run=None) + assert "issues_found" in result + + +def test_fix_permissions_dry_run(tmp_path): + """Test fix_permissions in dry-run mode""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor() + test_file = tmp_path / "test_fix.txt" + test_file.write_text("test") + test_file.chmod(0o777) + + result = auditor.fix_permissions(test_file, "644", dry_run=True) + assert "[DRY RUN]" in result + assert "777" in result or "644" in result + + +def test_fix_permissions_actual(tmp_path): + """Test actual permission fixing""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor() + test_file = tmp_path / "test_fix_actual.txt" + test_file.write_text("test") + test_file.chmod(0o777) + + result = auditor.fix_permissions(test_file, "644", dry_run=False) + assert "Changed" in result + assert "644" in result + + # Verify permissions actually changed + mode = test_file.stat().st_mode & 0o777 + assert oct(mode) != "0o777" + + +def test_scan_and_fix_report(tmp_path): + """Test scan_and_fix generates proper report""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor() + + # Create files with different permissions + safe_file = tmp_path / "safe.txt" + safe_file.write_text("safe") + safe_file.chmod(0o644) + + unsafe_file = tmp_path / "unsafe.txt" + unsafe_file.write_text("unsafe") + unsafe_file.chmod(0o777) + + result = auditor.scan_and_fix(str(tmp_path), apply_fixes=False, dry_run=True) + + assert "report" in result + assert "issues_found" in result + assert result["issues_found"] >= 1 + assert "PERMISSION AUDIT REPORT" in result["report"] + + +def test_suggest_fix_for_executable(tmp_path): + """Test suggest_fix for executable files""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor() + + # Create executable script + script = tmp_path / "script.sh" + script.write_text("#!/bin/bash\necho hello") + script.chmod(0o777) + + suggestion = auditor.suggest_fix(script, "777") + assert "chmod" in suggestion + assert "755" in suggestion # Should suggest 755 for executables + + +def test_suggest_fix_for_data_file(tmp_path): + """Test suggest_fix for data files""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor() + + # Create data file WITHOUT execute bit + data_file = tmp_path / "data.txt" + data_file.write_text("data content") + # Set 666 instead of 777 to avoid execute bit + data_file.chmod(0o666) + + suggestion = auditor.suggest_fix(data_file, "666") + assert "chmod" in suggestion + # File without execute bit should get 644 + assert "644" in suggestion # Should suggest 644 for non-executable files + + +def test_config_module(): + """Test that config module can be imported""" + from cortex.permissions import config + + assert hasattr(config, "DANGEROUS_PERMISSIONS") + assert hasattr(config, "RECOMMENDED_PERMISSIONS") + assert isinstance(config.DANGEROUS_PERMISSIONS, dict) + assert isinstance(config.RECOMMENDED_PERMISSIONS, dict) + + +def test_docker_handler_module(): + """Test docker_handler module imports""" + from cortex.permissions.docker_handler import DockerPermissionHandler, detect_docker_uid_mapping + + assert callable(detect_docker_uid_mapping) + + +def test_cli_integration(): + """Test that CLI can import and use PermissionManager""" + manager = PermissionManager(verbose=False) + + # Basic functionality check + assert hasattr(manager, "scan_directory") + assert hasattr(manager, "suggest_fix") + assert hasattr(manager, "scan_and_fix") + + +def test_permission_fixer_alias(): + """Test PermissionFixer alias""" + from cortex.permissions import PermissionFixer + + fixer = PermissionFixer() + assert hasattr(fixer, "scan_directory") + + +def test_scan_path_function(): + """Test scan_path compatibility function""" + from cortex.permissions import scan_path + + result = scan_path(".") + assert isinstance(result, dict) + assert "world_writable" in result + assert "dangerous" in result + assert "suggestions" in result + + +def test_analyze_permissions_function(): + """Test analyze_permissions compatibility function""" + from cortex.permissions import analyze_permissions + + result = analyze_permissions(".") + assert isinstance(result, dict) + assert "scan" in result + assert "auditor" in result + + +def test_pytest_works(): + """Simple test to verify pytest is working""" + assert 1 + 1 == 2 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) + + +def test_scan_directory_permission_error(tmp_path, mocker): + """Test scan_directory handles permission errors gracefully""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor(verbose=True) + + # Mock rglob to raise PermissionError + mocker.patch( + "cortex.permissions.auditor_fixer.Path.rglob", + side_effect=PermissionError("Mocked permission error"), + ) + result = auditor.scan_directory(tmp_path) + + assert isinstance(result, dict) + assert result["world_writable"] == [] + assert result["dangerous"] == [] + assert result["suggestions"] == [] + + +def test_scan_directory_file_access_error(tmp_path, mocker): + """Test scan_directory handles file access errors""" + from unittest.mock import Mock + + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor(verbose=False) + + # Create a mock item that raises OSError on stat() + mock_item = Mock() + mock_item.is_file.return_value = True + mock_item.stat.side_effect = OSError("Mocked access error") + + # Mock rglob to return our mock item + mocker.patch("cortex.permissions.auditor_fixer.Path.rglob", return_value=[mock_item]) + result = auditor.scan_directory(tmp_path) + + assert isinstance(result, dict) + # Should continue despite errors + + +def test_fix_permissions_os_error(tmp_path, mocker): + """Test fix_permissions handles OSError""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor() + test_file = tmp_path / "test.txt" + test_file.write_text("test") + + # Mock chmod to raise OSError + mocker.patch("pathlib.Path.chmod", side_effect=OSError("Mocked OSError")) + result = auditor.fix_permissions(test_file, "644", dry_run=False) + + assert "Error changing permissions" in result + assert "Mocked OSError" in result or "OSError" in result + + +def test_suggest_fix_with_shebang(tmp_path): + """Test suggest_fix for files with shebang""" + from cortex.permissions.auditor_fixer import PermissionAuditor + + auditor = PermissionAuditor() + + # Create script with shebang but no execute bit + script = tmp_path / "script.py" + script.write_text("#!/usr/bin/env python3\nprint('hello')") + script.chmod(0o644) # No execute bit + + suggestion = auditor.suggest_fix(script, "644") + assert "chmod" in suggestion + # Should suggest 755 because of shebang + assert "755" in suggestion diff --git a/tests/permissions/test_config.py b/tests/permissions/test_config.py new file mode 100644 index 00000000..634cd26f --- /dev/null +++ b/tests/permissions/test_config.py @@ -0,0 +1,67 @@ +""" +Tests for permission configuration module. +""" + +import pytest + +from cortex.permissions import config + + +def test_dangerous_permissions(): + """Test DANGEROUS_PERMISSIONS dictionary""" + assert 0o777 in config.DANGEROUS_PERMISSIONS + assert 0o666 in config.DANGEROUS_PERMISSIONS + assert 0o000 in config.DANGEROUS_PERMISSIONS + + # Check descriptions contain relevant info + desc_777 = config.DANGEROUS_PERMISSIONS[0o777] + desc_666 = config.DANGEROUS_PERMISSIONS[0o666] + + assert isinstance(desc_777, str) + assert isinstance(desc_666, str) + assert "rwx" in desc_777.lower() or "777" in desc_777 + assert "rw-" in desc_666.lower() or "666" in desc_666 + + +def test_world_writable_flag(): + """Test WORLD_WRITABLE_FLAG""" + import stat + + assert config.WORLD_WRITABLE_FLAG == 0o002 + assert config.WORLD_WRITABLE_FLAG == stat.S_IWOTH + + +def test_ignore_patterns(): + """Test IGNORE_PATTERNS list""" + assert isinstance(config.IGNORE_PATTERNS, list) + assert len(config.IGNORE_PATTERNS) > 0 + + # Check some common patterns + assert any("/proc/" in p for p in config.IGNORE_PATTERNS) + assert any(".git/" in p for p in config.IGNORE_PATTERNS) + assert any("__pycache__" in p for p in config.IGNORE_PATTERNS) + + +def test_sensitive_files(): + """Test SENSITIVE_FILES list""" + assert isinstance(config.SENSITIVE_FILES, list) + assert ".env" in config.SENSITIVE_FILES + assert ".ssh/id_rsa" in config.SENSITIVE_FILES + + +def test_recommended_permissions(): + """Test RECOMMENDED_PERMISSIONS dictionary""" + assert "directory" in config.RECOMMENDED_PERMISSIONS + assert "executable" in config.RECOMMENDED_PERMISSIONS + assert "config_file" in config.RECOMMENDED_PERMISSIONS + assert "secret_file" in config.RECOMMENDED_PERMISSIONS + + assert config.RECOMMENDED_PERMISSIONS["directory"] == 0o755 + assert config.RECOMMENDED_PERMISSIONS["secret_file"] == 0o600 + + +def test_docker_recommended_permissions(): + """Test DOCKER_RECOMMENDED_PERMISSIONS""" + assert "volume_directory" in config.DOCKER_RECOMMENDED_PERMISSIONS + assert "compose_file" in config.DOCKER_RECOMMENDED_PERMISSIONS + assert config.DOCKER_RECOMMENDED_PERMISSIONS["compose_file"] == 0o644 diff --git a/tests/permissions/test_docker_handler.py b/tests/permissions/test_docker_handler.py new file mode 100644 index 00000000..bbbb115e --- /dev/null +++ b/tests/permissions/test_docker_handler.py @@ -0,0 +1,260 @@ +""" +Basic tests for DockerPermissionHandler. +""" + +import pytest + +from cortex.permissions.docker_handler import DockerPermissionHandler, detect_docker_uid_mapping + + +def test_docker_handler_import(): + """Test that DockerPermissionHandler can be imported""" + # Импорт уже проверен в импорте выше, можно ничего не проверять + pass + + +def test_docker_handler_creation(): + """Test DockerPermissionHandler instantiation""" + handler = DockerPermissionHandler() + assert hasattr(handler, "container_info") + assert hasattr(handler, "verbose") + assert hasattr(handler, "dry_run") + + +def test_container_info_structure(): + """Test container_info has expected structure""" + handler = DockerPermissionHandler() + info = handler.container_info + + assert isinstance(info, dict) + assert "is_container" in info + assert "container_runtime" in info + assert "host_uid" in info + assert "host_gid" in info + assert "uid_mapping" in info + assert "gid_mapping" in info + + assert isinstance(info["is_container"], bool) + assert isinstance(info["host_uid"], int) + assert isinstance(info["host_gid"], int) + assert isinstance(info["uid_mapping"], dict) + assert isinstance(info["gid_mapping"], dict) + + +def test_detect_docker_uid_mapping(): + """Test convenience function""" + result = detect_docker_uid_mapping() + assert isinstance(result, dict) + assert "is_container" in result + + +def test_adjust_issue_for_container(): + """Test adjust_issue_for_container method""" + handler = DockerPermissionHandler() + + test_issue = {"path": "/some/path", "type": "world_writable"} + + adjusted = handler.adjust_issue_for_container(test_issue) + assert isinstance(adjusted, dict) + assert "path" in adjusted + + # If not in container, should return same issue + if not handler.container_info["is_container"]: + assert adjusted == test_issue + + +def test_get_container_specific_fix(): + """Test get_container_specific_fix method""" + handler = DockerPermissionHandler() + + test_issue = {"path": "/some/path", "is_directory": False} + + base_fix = {"command": "chmod 644 /some/path", "reason": "standard fix"} + + result = handler.get_container_specific_fix(test_issue, base_fix) + assert isinstance(result, dict) + assert "command" in result + assert "reason" in result + + +def test_uid_to_name_method(): + """Test _uid_to_name method""" + handler = DockerPermissionHandler() + + # Test with root UID (0) + result = handler._uid_to_name(0) + assert isinstance(result, str) + + # Test with current user UID + import os + + current_uid = os.getuid() + result = handler._uid_to_name(current_uid) + assert isinstance(result, str) + + # Test with high UID (likely doesn't exist) + result = handler._uid_to_name(99999) + assert "UID99999" in result + + +def test_gid_to_name_method(): + """Test _gid_to_name method""" + handler = DockerPermissionHandler() + + # Test with root GID (0) + result = handler._gid_to_name(0) + assert isinstance(result, str) + + # Test with high GID (likely doesn't exist) + result = handler._gid_to_name(99999) + assert "GID99999" in result + + +def test_generate_docker_permission_report(): + """Test generate_docker_permission_report method""" + handler = DockerPermissionHandler() + + report = handler.generate_docker_permission_report() + assert isinstance(report, str) + assert "DOCKER PERMISSION AUDIT" in report or "Docker Permission Audit" in report + + +def test_fix_docker_bind_mount_permissions_dry_run(tmp_path): + """Test fix_docker_bind_mount_permissions in dry-run mode""" + from cortex.permissions.docker_handler import DockerPermissionHandler + + handler = DockerPermissionHandler() + + # Create test directory + test_dir = tmp_path / "test_dir" + test_dir.mkdir() + + result = handler.fix_docker_bind_mount_permissions( + str(test_dir), host_uid=1000, host_gid=1000, dry_run=True + ) + + assert isinstance(result, dict) + assert "success" in result + assert "actions" in result + assert "warnings" in result + assert result["dry_run"] is True + + # Check structure + assert "current" in result or "warnings" in result + + +def test_fix_docker_bind_mount_permissions_nonexistent(): + """Test fix_docker_bind_mount_permissions with non-existent path""" + from cortex.permissions.docker_handler import DockerPermissionHandler + + handler = DockerPermissionHandler() + + result = handler.fix_docker_bind_mount_permissions( + "/nonexistent/path/12345", host_uid=1000, host_gid=1000, dry_run=True + ) + + assert isinstance(result, dict) + assert "warnings" in result + assert any("does not exist" in str(w).lower() for w in result["warnings"]) + + +def test_discover_uid_mapping(): + """Test _discover_uid_mapping method""" + from cortex.permissions.docker_handler import DockerPermissionHandler + + handler = DockerPermissionHandler() + mapping = handler._discover_uid_mapping() + + assert isinstance(mapping, dict) + # At least root (0) should be in mapping + assert 0 in mapping or len(mapping) > 0 + + +def test_discover_gid_mapping(): + """Test _discover_gid_mapping method""" + from cortex.permissions.docker_handler import DockerPermissionHandler + + handler = DockerPermissionHandler() + mapping = handler._discover_gid_mapping() + + assert isinstance(mapping, dict) + # At least root (0) should be in mapping + assert 0 in mapping or len(mapping) > 0 + + +def test_generate_docker_permission_report_with_path(tmp_path): + """Test generate_docker_permission_report with custom path""" + from cortex.permissions.docker_handler import DockerPermissionHandler + + handler = DockerPermissionHandler() + report = handler.generate_docker_permission_report() + + assert isinstance(report, str) + assert len(report) > 0 + assert "DOCKER" in report.upper() or "PERMISSION" in report.upper() + + +def test_container_info_types(): + """Test container_info has correct types""" + from cortex.permissions.docker_handler import DockerPermissionHandler + + handler = DockerPermissionHandler(verbose=False) + info = handler.container_info + + # Check types + assert isinstance(info["is_container"], bool) + assert isinstance(info["host_uid"], int) + assert isinstance(info["host_gid"], int) + assert isinstance(info["uid_mapping"], dict) + assert isinstance(info["gid_mapping"], dict) + assert isinstance(info["container_runtime"], str | None) + + # Check UID/GID are valid + assert info["host_uid"] >= 0 + assert info["host_gid"] >= 0 + + +def test_adjust_issue_with_stat(tmp_path): + """Test adjust_issue_for_container with stat info""" + import os + + from cortex.permissions.docker_handler import DockerPermissionHandler + + handler = DockerPermissionHandler() + + # Create test file + test_file = tmp_path / "test.txt" + test_file.write_text("test") + + # Get file stat + stat_info = os.stat(test_file) + + test_issue = { + "path": str(test_file), + "stat": {"uid": stat_info.st_uid, "gid": stat_info.st_gid}, + } + + adjusted = handler.adjust_issue_for_container(test_issue) + assert isinstance(adjusted, dict) + assert "path" in adjusted + + # If in container, should have uid_info/gid_info + if handler.container_info["is_container"]: + assert "uid_info" in adjusted or "gid_info" in adjusted + + +def test_get_container_specific_fix_with_docker_path(): + """Test get_container_specific_fix with docker path""" + from cortex.permissions.docker_handler import DockerPermissionHandler + + handler = DockerPermissionHandler() + + test_issue = {"path": "/var/lib/docker/volumes/test", "permission": "755"} + base_fix = {"command": "chmod 755 /var/lib/docker/volumes/test", "reason": "standard fix"} + + result = handler.get_container_specific_fix(test_issue, base_fix) + + assert "command" in result + assert "reason" in result + assert result["command"] == "chmod 755 /var/lib/docker/volumes/test" + assert result["reason"] == "standard fix" diff --git a/tests/unit/test_hwprofiler.py b/tests/unit/test_hwprofiler.py deleted file mode 100644 index 88d73d0d..00000000 --- a/tests/unit/test_hwprofiler.py +++ /dev/null @@ -1,299 +0,0 @@ -#!/usr/bin/env python3 -""" -Unit tests for hardware profiler. -Tests various hardware configurations and edge cases. -""" - -import json -import os -import subprocess -import sys -import unittest -from unittest.mock import MagicMock, mock_open, patch - -sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) - -from cortex.hwprofiler import HardwareProfiler - - -class TestHardwareProfiler(unittest.TestCase): - """Test cases for HardwareProfiler.""" - - def setUp(self): - """Set up test fixtures.""" - self.profiler = HardwareProfiler() - - @patch("builtins.open") - @patch("subprocess.run") - def test_detect_cpu_amd_ryzen(self, mock_subprocess, mock_file): - """Test CPU detection for AMD Ryzen 9 5950X.""" - # Mock cpuinfo with multiple processors showing 16 cores - cpuinfo_data = """ -processor : 0 -vendor_id : AuthenticAMD -cpu family : 23 -model : 113 -model name : AMD Ryzen 9 5950X 16-Core Processor -stepping : 0 -physical id : 0 -core id : 0 -cpu cores : 16 - -processor : 1 -vendor_id : AuthenticAMD -cpu family : 23 -model : 113 -model name : AMD Ryzen 9 5950X 16-Core Processor -stepping : 0 -physical id : 0 -core id : 1 -cpu cores : 16 -""" - mock_file.return_value.read.return_value = cpuinfo_data - mock_file.return_value.__enter__.return_value = mock_file.return_value - - # Mock uname for architecture and nproc as fallback - def subprocess_side_effect(*args, **kwargs): - if args[0] == ["uname", "-m"]: - return MagicMock(returncode=0, stdout="x86_64\n") - elif args[0] == ["nproc"]: - return MagicMock(returncode=0, stdout="16\n") - return MagicMock(returncode=1, stdout="") - - mock_subprocess.side_effect = subprocess_side_effect - - cpu = self.profiler.detect_cpu() - - self.assertEqual(cpu["model"], "AMD Ryzen 9 5950X 16-Core Processor") - # Should detect 16 cores (either from parsing or nproc fallback) - self.assertGreaterEqual(cpu["cores"], 1) - self.assertEqual(cpu["architecture"], "x86_64") - - @patch( - "builtins.open", - new_callable=mock_open, - read_data=""" -processor : 0 -vendor_id : GenuineIntel -cpu family : 6 -model : 85 -model name : Intel(R) Xeon(R) Platinum 8280 CPU @ 2.70GHz -stepping : 7 -microcode : 0xffffffff -cpu MHz : 2700.000 -cache size : 39424 KB -physical id : 0 -siblings : 56 -core id : 0 -cpu cores : 28 -""", - ) - @patch("subprocess.run") - def test_detect_cpu_intel_xeon(self, mock_subprocess, mock_file): - """Test CPU detection for Intel Xeon.""" - mock_subprocess.return_value = MagicMock(returncode=0, stdout="x86_64\n") - - cpu = self.profiler.detect_cpu() - - self.assertIn("Xeon", cpu["model"]) - self.assertEqual(cpu["architecture"], "x86_64") - - @patch("subprocess.run") - def test_detect_gpu_nvidia(self, mock_subprocess): - """Test NVIDIA GPU detection.""" - # Mock subprocess calls - detect_gpu makes multiple calls - call_count = [0] - - def subprocess_side_effect(*args, **kwargs): - cmd = args[0] if args else [] - call_count[0] += 1 - - if "nvidia-smi" in cmd and "cuda_version" not in " ".join(cmd): - # First nvidia-smi call for GPU info - return MagicMock(returncode=0, stdout="NVIDIA GeForce RTX 4090, 24576, 535.54.03\n") - elif "nvidia-smi" in cmd and "cuda_version" in " ".join(cmd): - # Second nvidia-smi call for CUDA version - return MagicMock(returncode=0, stdout="12.3\n") - elif "lspci" in cmd: - # lspci call (should return empty or no GPU lines to avoid duplicates) - return MagicMock(returncode=0, stdout="") - else: - return MagicMock(returncode=1, stdout="") - - mock_subprocess.side_effect = subprocess_side_effect - - gpus = self.profiler.detect_gpu() - - self.assertGreaterEqual(len(gpus), 1) - nvidia_gpus = [g for g in gpus if g.get("vendor") == "NVIDIA"] - self.assertGreaterEqual(len(nvidia_gpus), 1) - self.assertIn("RTX 4090", nvidia_gpus[0]["model"]) - self.assertEqual(nvidia_gpus[0]["vram"], 24576) - if "cuda" in nvidia_gpus[0]: - self.assertEqual(nvidia_gpus[0]["cuda"], "12.3") - - @patch("subprocess.run") - def test_detect_gpu_amd(self, mock_subprocess): - """Test AMD GPU detection.""" - # Mock lspci output for AMD - mock_subprocess.return_value = MagicMock( - returncode=0, - stdout="01:00.0 VGA compatible controller: Advanced Micro Devices, Inc. [AMD/ATI] Radeon RX 7900 XTX\n", - ) - - gpus = self.profiler.detect_gpu() - - # Should detect AMD GPU - amd_gpus = [g for g in gpus if g.get("vendor") == "AMD"] - self.assertGreater(len(amd_gpus), 0) - - @patch("subprocess.run") - def test_detect_gpu_intel(self, mock_subprocess): - """Test Intel GPU detection.""" - # Mock lspci output for Intel - mock_subprocess.return_value = MagicMock( - returncode=0, - stdout="00:02.0 VGA compatible controller: Intel Corporation UHD Graphics 630\n", - ) - - gpus = self.profiler.detect_gpu() - - # Should detect Intel GPU - intel_gpus = [g for g in gpus if g.get("vendor") == "Intel"] - self.assertGreater(len(intel_gpus), 0) - - @patch( - "builtins.open", - new_callable=mock_open, - read_data=""" -MemTotal: 67108864 kB -MemFree: 12345678 kB -MemAvailable: 23456789 kB -""", - ) - def test_detect_ram(self, mock_file): - """Test RAM detection.""" - ram = self.profiler.detect_ram() - - # 67108864 kB = 65536 MB - self.assertEqual(ram, 65536) - - @patch("subprocess.run") - @patch("os.path.exists") - def test_detect_storage_nvme(self, mock_exists, mock_subprocess): - """Test NVMe storage detection.""" - # Mock lsblk output - mock_subprocess.return_value = MagicMock(returncode=0, stdout="nvme0n1 disk 2.0T\n") - - # Mock rotational check (NVMe doesn't have this file) - mock_exists.return_value = False - - storage = self.profiler.detect_storage() - - self.assertGreater(len(storage), 0) - nvme_devices = [s for s in storage if s.get("type") == "nvme"] - self.assertGreater(len(nvme_devices), 0) - - @patch("subprocess.run") - @patch("os.path.exists") - @patch("builtins.open", new_callable=mock_open, read_data="0\n") - def test_detect_storage_ssd(self, mock_file, mock_exists, mock_subprocess): - """Test SSD storage detection.""" - # Mock lsblk output - mock_subprocess.return_value = MagicMock(returncode=0, stdout="sda disk 1.0T\n") - - # Mock rotational file exists and returns 0 (SSD) - mock_exists.return_value = True - - storage = self.profiler.detect_storage() - - self.assertGreater(len(storage), 0) - - @patch("subprocess.run") - def test_detect_network(self, mock_subprocess): - """Test network detection.""" - # Mock ip link output - mock_subprocess.return_value = MagicMock( - returncode=0, - stdout="1: lo: mtu 65536\n2: eth0: mtu 1500\n", - ) - - # Mock speed file - with patch("builtins.open", mock_open(read_data="1000\n")): - network = self.profiler.detect_network() - - self.assertIn("interfaces", network) - self.assertGreaterEqual(network["max_speed_mbps"], 0) - - @patch("cortex.hwprofiler.HardwareProfiler.detect_cpu") - @patch("cortex.hwprofiler.HardwareProfiler.detect_gpu") - @patch("cortex.hwprofiler.HardwareProfiler.detect_ram") - @patch("cortex.hwprofiler.HardwareProfiler.detect_storage") - @patch("cortex.hwprofiler.HardwareProfiler.detect_network") - def test_profile_complete(self, mock_network, mock_storage, mock_ram, mock_gpu, mock_cpu): - """Test complete profiling.""" - mock_cpu.return_value = { - "model": "AMD Ryzen 9 5950X", - "cores": 16, - "architecture": "x86_64", - } - mock_gpu.return_value = [ - {"vendor": "NVIDIA", "model": "RTX 4090", "vram": 24576, "cuda": "12.3"} - ] - mock_ram.return_value = 65536 - mock_storage.return_value = [{"type": "nvme", "size": 2048000, "device": "nvme0n1"}] - mock_network.return_value = { - "interfaces": [{"name": "eth0", "speed_mbps": 1000}], - "max_speed_mbps": 1000, - } - - profile = self.profiler.profile() - - self.assertIn("cpu", profile) - self.assertIn("gpu", profile) - self.assertIn("ram", profile) - self.assertIn("storage", profile) - self.assertIn("network", profile) - - self.assertEqual(profile["cpu"]["model"], "AMD Ryzen 9 5950X") - self.assertEqual(profile["cpu"]["cores"], 16) - self.assertEqual(len(profile["gpu"]), 1) - self.assertEqual(profile["gpu"][0]["vendor"], "NVIDIA") - self.assertEqual(profile["ram"], 65536) - - def test_to_json(self): - """Test JSON serialization.""" - with patch.object(self.profiler, "profile") as mock_profile: - mock_profile.return_value = { - "cpu": {"model": "Test CPU", "cores": 4}, - "gpu": [], - "ram": 8192, - "storage": [], - "network": {"interfaces": [], "max_speed_mbps": 0}, - } - - json_str = self.profiler.to_json() - parsed = json.loads(json_str) - - self.assertIn("cpu", parsed) - self.assertEqual(parsed["cpu"]["model"], "Test CPU") - - @patch("builtins.open", side_effect=OSError("Permission denied")) - def test_detect_cpu_error_handling(self, mock_file): - """Test CPU detection error handling.""" - cpu = self.profiler.detect_cpu() - - self.assertIn("model", cpu) - self.assertIn("error", cpu) - - @patch("subprocess.run", side_effect=subprocess.TimeoutExpired("nvidia-smi", 2)) - def test_detect_gpu_timeout(self, mock_subprocess): - """Test GPU detection timeout handling.""" - gpus = self.profiler.detect_gpu() - - # Should return empty list or handle gracefully - self.assertIsInstance(gpus, list) - - -if __name__ == "__main__": - unittest.main()