diff --git a/cortex/ask.py b/cortex/ask.py index c66971d9..6d6453b2 100644 --- a/cortex/ask.py +++ b/cortex/ask.py @@ -285,11 +285,12 @@ def _call_fake(self, question: str, system_prompt: str) -> str: return f"You have Python {platform.python_version()} installed." return "I cannot answer that question in test mode." - def ask(self, question: str) -> str: + def ask(self, question: str, system_prompt: str | None = None) -> str: """Ask a natural language question about the system. Args: question: Natural language question + system_prompt: Optional override for the system prompt Returns: Human-readable answer string @@ -302,8 +303,11 @@ def ask(self, question: str) -> str: raise ValueError("Question cannot be empty") question = question.strip() - context = self.info_gatherer.gather_context() - system_prompt = self._get_system_prompt(context) + + # Use provided system prompt or generate default + if system_prompt is None: + context = self.info_gatherer.gather_context() + system_prompt = self._get_system_prompt(context) # Cache lookup uses both question and system context (via system_prompt) for system-specific answers cache_key = f"ask:{question}" diff --git a/cortex/cli.py b/cortex/cli.py index b1cfe4a1..47142c2a 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2825,6 +2825,20 @@ def progress_callback(current: int, total: int, step: InstallationStep) -> None: console.print(f"Error: {result.error_message}", style="red") return 1 + def doctor(self) -> int: + """Run system health checks.""" + from cortex.doctor import SystemDoctor + + doc = SystemDoctor() + return doc.run_checks() + + def troubleshoot(self) -> int: + """Run interactive troubleshooter.""" + from cortex.troubleshoot import Troubleshooter + + troubleshooter = Troubleshooter() + return troubleshooter.start() + # -------------------------- @@ -3422,6 +3436,11 @@ def main(): ) # -------------------------- + # Doctor command + subparsers.add_parser("doctor", help="System health check") + + # Troubleshoot command + subparsers.add_parser("troubleshoot", help="Interactive system troubleshooter") # License and upgrade commands subparsers.add_parser("upgrade", help="Upgrade to Cortex Pro") subparsers.add_parser("license", help="Show license status") @@ -3623,6 +3642,10 @@ def main(): return 1 elif args.command == "env": return cli.env(args) + elif args.command == "doctor": + return cli.doctor() + elif args.command == "troubleshoot": + return cli.troubleshoot() elif args.command == "upgrade": from cortex.licensing import open_upgrade_page diff --git a/cortex/resolutions.py b/cortex/resolutions.py new file mode 100644 index 00000000..f5b790a8 --- /dev/null +++ b/cortex/resolutions.py @@ -0,0 +1,87 @@ +""" +Resolution Manager for Cortex Troubleshooter. + +This module handles the storage and retrieval of successful troubleshooting resolutions. +It uses a simple JSON file for storage and keyword matching for retrieval. 
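+
+Example (a minimal sketch of the intended save/search flow):
+
+    manager = ResolutionManager()
+    manager.save("Docker failed to start", "systemctl start docker")
+    # Word overlap on "docker"/"start" ranks and returns the entry above.
+    matches = manager.search("docker will not start")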
+""" + +import json +import os +from pathlib import Path +from typing import TypedDict + + +class Resolution(TypedDict): + issue: str + fix: str + timestamp: float + + +class ResolutionManager: + def __init__(self, storage_path: str = "~/.cortex/resolutions.json"): + self.storage_path = Path(os.path.expanduser(storage_path)) + self._ensure_storage() + + def _ensure_storage(self) -> None: + """Ensure the storage file exists.""" + if not self.storage_path.exists(): + self.storage_path.parent.mkdir(parents=True, exist_ok=True) + with open(self.storage_path, "w") as f: + json.dump([], f) + + def save(self, issue: str, fix: str) -> None: + """Save a new resolution.""" + import time + + resolution: Resolution = { + "issue": issue, + "fix": fix, + "timestamp": time.time(), + } + + try: + with open(self.storage_path) as f: + resolutions = json.load(f) + except (json.JSONDecodeError, FileNotFoundError): + resolutions = [] + + resolutions.append(resolution) + + # Keep only the last 50 resolutions to prevent unlimited growth + if len(resolutions) > 50: + resolutions = resolutions[-50:] + + with open(self.storage_path, "w") as f: + json.dump(resolutions, f, indent=2) + + def search(self, query: str, limit: int = 3) -> list[Resolution]: + """ + Search for resolutions relevant to the query. + + Uses simple keyword matching: finds resolutions where the issue description + shares words with the query. + """ + try: + with open(self.storage_path) as f: + resolutions: list[Resolution] = json.load(f) + except (json.JSONDecodeError, FileNotFoundError): + return [] + + if not resolutions: + return [] + + query_words = set(query.lower().split()) + scored_resolutions = [] + + for res in resolutions: + if "issue" not in res or "fix" not in res: + continue + issue_words = set(res["issue"].lower().split()) + # Calculate overlap score + score = len(query_words.intersection(issue_words)) + if score > 0: + scored_resolutions.append((score, res)) + + # Sort by score (descending) and take top N + scored_resolutions.sort(key=lambda x: x[0], reverse=True) + return [res for _, res in scored_resolutions[:limit]] diff --git a/cortex/troubleshoot.py b/cortex/troubleshoot.py new file mode 100644 index 00000000..54e2fdd1 --- /dev/null +++ b/cortex/troubleshoot.py @@ -0,0 +1,320 @@ +""" +Interactive Troubleshooting Assistant for Cortex. + +This module provides the Troubleshooter class which: +1. Acts as a general-purpose AI assistant +2. Suggests shell commands to fix issues +3. 
+"""
+
+import json
+import os
+import re
+import shutil
+import subprocess
+
+from rich.console import Console
+from rich.markdown import Markdown
+from rich.panel import Panel
+from rich.prompt import Confirm, Prompt
+from rich.syntax import Syntax
+
+from cortex.api_key_detector import auto_detect_api_key
+from cortex.ask import AskHandler
+from cortex.logging_system import CortexLogger
+from cortex.resolutions import ResolutionManager
+
+console = Console()
+
+# Dangerous command patterns that should never be executed
+DANGEROUS_PATTERNS = [
+    r"\brm\s+(-[^\s]*\s+)*-rf\b",  # rm -rf
+    r"\brm\s+(-[^\s]*\s+)*-fr\b",  # rm -fr (same as above)
+    r"\brm\s+(-[^\s]*\s+)*/\s*$",  # rm /
+    r"\bmkfs\b",  # Format filesystem
+    r"\bdd\s+.*of=/dev/",  # dd to device
+    r">\s*/dev/sd[a-z]",  # Redirect to disk
+    r"\bchmod\s+(-[^\s]*\s+)*777\s+/",  # chmod 777 on root
+    r"\bchown\s+.*\s+/\s*$",  # chown on root
+    r":\(\)\s*{\s*:\|:\s*&\s*}",  # Fork bomb
+    r"\bshutdown\b",  # Shutdown
+    r"\breboot\b",  # Reboot
+    r"\binit\s+0\b",  # Halt
+    r"\bpoweroff\b",  # Poweroff
+    r"\|\s*bash",  # Pipe to bash
+    r"\|\s*sh",  # Pipe to sh
+]
+
+
+class Troubleshooter:
+    def __init__(self):
+        self.logger = CortexLogger("troubleshooter")
+        self.messages: list[dict[str, str]] = []
+
+        # Initialize AI
+        try:
+            found, key, provider, _ = auto_detect_api_key()
+            self.api_key = key or ""
+            provider_name = provider or "openai"
+            if provider_name == "anthropic":
+                self.provider = "claude"
+            else:
+                self.provider = provider_name
+            # Validate key presence (Ollama runs locally and accepts a dummy key)
+            if not self.api_key and self.provider != "ollama":
+                raise ValueError(f"No API key found for provider '{self.provider}'")
+            self.ai = AskHandler(self.api_key, self.provider)
+            self.ai.cache = None  # Disable caching for conversational context
+        except Exception as e:
+            self.logger.warning(f"Failed to initialize AI: {e}")
+            self.ai = None
+
+        self.resolutions = ResolutionManager()
+
+    def _get_provider(self) -> str:
+        """Determine which LLM provider to use."""
+        found, _, provider, _ = auto_detect_api_key()
+        if provider == "anthropic":
+            return "claude"
+        return provider or "openai"
+
+    def _get_api_key(self) -> str:
+        """Get the API key for the configured provider."""
+        found, key, _, _ = auto_detect_api_key()
+        return key or ""
+
+    def start(self) -> int:
+        """Start the troubleshooting session."""
+        console.print("[bold cyan]🤖 Cortex Troubleshooter[/bold cyan]")
+        console.print(
+            "[dim]Describe your issue, type 'doctor' to run health checks, or 'help' to escalate to human support.[/dim]"
+        )
+
+        if not self.ai:
+            console.print("\n[red]❌ AI Assistant unavailable (check API key).[/red]")
+            return 1
+
+        # Initial System Prompt
+        system_prompt = (
+            "You are Cortex, an AI troubleshooting assistant for Cortex Linux. "
+            "Your goal is to diagnose and fix system issues. "
+            "Do not answer general questions unrelated to system maintenance or troubleshooting. "
+            "Rules:\n"
+            "1. ALWAYS provide the specific shell command to run in a `bash` code block. Do not just tell the user to run it.\n"
+            "2. Suggest one step at a time. Wait for the command output before proceeding.\n"
+            "3. Analyze the command output and explain the findings step-by-step.\n"
+            "4. Maintain your identity as Cortex."
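+            # One suggested command per turn keeps the confirm/execute/analyze
+            # loop below in sync with the model's output.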
+ ) + self.messages.append({"role": "system", "content": system_prompt}) + + return self._interactive_loop() + + def _extract_code_blocks(self, text: str) -> list[str]: + """Extract content from markdown code blocks.""" + # Match ```bash ... ``` or ```sh ... ``` or just ``` ... ``` + pattern = r"```(?:bash|sh)?\n(.*?)```" + return re.findall(pattern, text, re.DOTALL) + + def _is_command_safe(self, cmd: str) -> tuple[bool, str]: + """Check if a command is safe to execute. + + Returns: + Tuple of (is_safe, reason) + """ + for pattern in DANGEROUS_PATTERNS: + if re.search(pattern, cmd, re.IGNORECASE): + return False, f"Command matches dangerous pattern: {pattern}" + return True, "" + + def _execute_command(self, cmd: str) -> str: + """Execute a shell command and return output. + + If Firejail is available, the command is executed in a sandbox + for additional security since AI-suggested commands are untrusted. + """ + # Log the command execution for audit + self.logger.info(f"Executing command: {cmd}") + + # Check if Firejail is available for sandboxing + use_sandbox = shutil.which("firejail") is not None + + exec_cmd = cmd + if use_sandbox: + exec_cmd = f"firejail --quiet --private-tmp {cmd}" + self.logger.info("Using Firejail sandbox for command execution") + + try: + result = subprocess.run( + exec_cmd, shell=True, capture_output=True, text=True, timeout=30 + ) + output = result.stdout + if result.stderr: + output += f"\n[STDERR]\n{result.stderr}" + result_output = output.strip() + self.logger.info(f"Command completed with exit code: {result.returncode}") + return result_output + except Exception as e: + self.logger.error(f"Command execution failed: {e}") + return f"Error executing command: {e}" + + def _interactive_loop(self) -> int: + """Main chat loop with command execution.""" + try: + while True: + user_input = Prompt.ask("\n[bold green]You[/bold green]") + + if user_input.lower() in ["exit", "quit", "q"]: + # Learning Trigger + if Confirm.ask("Did we solve your problem?"): + with console.status("[cyan]Learning from success...[/cyan]"): + history_text = "\n".join( + [f"{m['role']}: {m['content']}" for m in self.messages] + ) + try: + extraction = self.ai.ask( + f"Analyze this troubleshooting session. Extract the core issue and the specific command that fixed it. Return ONLY a JSON object with keys 'issue' and 'fix'.\n\nSession:\n{history_text}", + system_prompt="You are a knowledge extraction bot. 
Return only valid JSON.",
+                                )
+                                # Extract the first JSON object from the response
+                                match = re.search(r"\{.*\}", extraction, re.DOTALL)
+                                if match:
+                                    clean_json = match.group(0)
+                                    data = json.loads(clean_json)
+
+                                    if "issue" in data and "fix" in data:
+                                        self.resolutions.save(data["issue"], data["fix"])
+                                        console.print("[bold green]✓ Knowledge saved![/bold green]")
+                                    else:
+                                        self.logger.warning(f"Incomplete resolution data: {data}")
+                                else:
+                                    self.logger.warning(f"No JSON found in response: {extraction}")
+                            except Exception as e:
+                                self.logger.warning(f"Failed to learn resolution: {e}")
+
+                    console.print("[dim]Exiting troubleshooter.[/dim]")
+                    break
+
+                # Special command to run doctor manually
+                if user_input.lower() == "doctor":
+                    from cortex.doctor import SystemDoctor
+
+                    doc = SystemDoctor()
+                    doc.run_checks()
+                    continue
+
+                # Help command for escalation
+                if user_input.lower() == "help":
+                    with console.status("[cyan]Generating support summary...[/cyan]"):
+                        # Ask AI to summarize the issue
+                        history_text = "\n".join(
+                            [f"{m['role']}: {m['content']}" for m in self.messages]
+                        )
+                        summary = self.ai.ask(
+                            f"Summarize the following troubleshooting session for a support ticket. Include the user's issue, commands tried, and errors encountered:\n\n{history_text}",
+                            system_prompt="Create a concise summary of the issue from the user's point of view",
+                        )
+
+                        log_file = "cortex_support_log.txt"
+                        log_path = os.path.abspath(log_file)
+
+                        with open(log_file, "w") as f:
+                            f.write("Cortex Troubleshooting Log\n")
+                            f.write("==========================\n\n")
+                            f.write("Issue Summary:\n")
+                            f.write(summary)
+
+                    console.print(
+                        f"\n[bold green]✓ Diagnostic log saved to {log_path}[/bold green]"
+                    )
+                    console.print(f"Please open a new issue and attach the {log_file} file.")
+                    continue
+
+                self.messages.append({"role": "user", "content": user_input})
+
+                with console.status("[cyan]Thinking...[/cyan]"):
+                    # Construct prompt with history
+                    history_text = "\n".join(
+                        [f"{m['role']}: {m['content']}" for m in self.messages[-5:]]
+                    )
+
+                    # Dynamic Recall: Search for relevant past resolutions
+                    relevant_fixes = self.resolutions.search(user_input)
+                    current_system_prompt = self.messages[0]["content"]
+
+                    if relevant_fixes:
+                        fixes_text = "\n".join(
+                            [f"- Issue: {r['issue']} -> Fix: {r['fix']}" for r in relevant_fixes]
+                        )
+                        current_system_prompt += f"\n\n[MEMORY] Here are past successful fixes for similar issues:\n{fixes_text}"
+
+                    # We pass the system prompt explicitly to override AskHandler's default
+                    response = self.ai.ask(
+                        question=f"History:\n{history_text}\n\nUser: {user_input}",
+                        system_prompt=current_system_prompt,  # The initial system prompt + memory
+                    )
+
+                console.print(Markdown(response))
+                self.messages.append({"role": "assistant", "content": response})
+
+                # Check for commands to execute
+                commands = self._extract_code_blocks(response)
+                if commands:
+                    for cmd in commands:
+                        cmd = cmd.strip()
+                        if not cmd:
+                            continue
+
+                        console.print("\n[bold yellow]Suggested Command:[/bold yellow]")
+                        console.print(Syntax(cmd, "bash", theme="monokai", line_numbers=False))
+
+                        # Check if command is safe
+                        is_safe, reason = self._is_command_safe(cmd)
+                        if not is_safe:
+                            console.print(
+                                "\n[bold red]⚠️ BLOCKED: This command is potentially dangerous.[/bold red]"
+                            )
+                            console.print(f"[dim]Reason: {reason}[/dim]")
+                            self.logger.warning(f"Blocked dangerous command: {cmd}")
+                            continue
+
+                        if Confirm.ask("Execute this command?"):
+                            with console.status("[bold yellow]Executing...[/bold yellow]"):
+                                output = self._execute_command(cmd)
+
+                            # Show output to user
+                            console.print(
+                                Panel(
+                                    output, title="Command Output", border_style="dim", expand=False
+                                )
+                            )
+
+                            console.print("[dim]Output captured.[/dim]")
+                            # Feed output back to AI
+                            self.messages.append(
+                                {"role": "system", "content": f"Command Output:\n{output}"}
+                            )
+
+                            # Ask AI for analysis of the output
+                            with console.status("[cyan]Analyzing output...[/cyan]"):
+                                analysis = self.ai.ask(
+                                    f"Command '{cmd}' produced this output:\n{output}\n\nWhat is the next step?",
+                                    system_prompt=self.messages[0]["content"],
+                                )
+
+                            console.print(Markdown(analysis))
+                            self.messages.append({"role": "assistant", "content": analysis})
+
+        except KeyboardInterrupt:
+            console.print("\n[dim]Session cancelled.[/dim]")
+            return 130
+        except Exception as e:
+            console.print(f"\n[red]Error: {e}[/red]")
+            self.logger.error("Troubleshooting loop failed", exc_info=True)
+            return 1
+
+        return 0
diff --git a/docs/COMMANDS.md b/docs/COMMANDS.md
index 9e6e9a25..4cf66219 100644
--- a/docs/COMMANDS.md
+++ b/docs/COMMANDS.md
@@ -11,6 +11,8 @@ This document provides a comprehensive reference for all commands available in t
 | `cortex demo` | See Cortex in action |
 | `cortex wizard` | Configure API key |
 | `cortex status` | Show comprehensive system status and health checks |
+| `cortex doctor` | Run system health checks |
+| `cortex troubleshoot` | Interactive AI troubleshooting assistant |
 | `cortex history` | View installation history |
 | `cortex rollback <id>` | Undo an installation |
 | `cortex stack <name>` | Install a pre-built package stack |
@@ -135,6 +137,85 @@ cortex status
 ```
 
 ---
 
+### `cortex troubleshoot`
+
+Interactive AI-powered troubleshooting assistant that diagnoses system issues and, with your confirmation, executes commands to fix them.
+
+**Usage:**
+```bash
+cortex troubleshoot
+```
+
+**Features:**
+- Conversational AI that understands your system issues
+- Suggests shell commands to diagnose and fix problems
+- Executes commands with your explicit confirmation
+- Analyzes command output and suggests next steps
+- Learns from solved sessions and recalls relevant past fixes
+- Dangerous command protection (blocks `rm -rf`, `mkfs`, etc.; see the example below)
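+
+When a suggested command matches the blacklist, it is displayed but never run. An illustrative exchange (the wording matches the built-in safety check):
+
+```
+Suggested Command:
+rm -rf /
+
+⚠️ BLOCKED: This command is potentially dangerous.
+```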
+
+**Flow:**
+```
+┌─────────────────────────────────────────┐
+│ User describes issue                    │
+└─────────────────┬───────────────────────┘
+                  ▼
+┌─────────────────────────────────────────┐
+│ AI suggests diagnostic command          │
+└─────────────────┬───────────────────────┘
+                  ▼
+┌─────────────────────────────────────────┐
+│ User confirms execution [y/n]           │
+└─────────────────┬───────────────────────┘
+                  ▼
+┌─────────────────────────────────────────┐
+│ Command runs, output displayed          │
+└─────────────────┬───────────────────────┘
+                  ▼
+┌─────────────────────────────────────────┐
+│ AI analyzes output, suggests next step  │
+└─────────────────────────────────────────┘
+```
+
+**Example Session:**
+````bash
+$ cortex troubleshoot
+🤖 Cortex Troubleshooter
+Describe your issue, type 'doctor' to run health checks, or 'help' to escalate to human support.
+
+You: docker won't start
+
+AI: Let's check the Docker service status:
+```bash
+systemctl status docker
+```
+
+Suggested Command:
+systemctl status docker
+Execute this command? [y/n]: y
+
+[Command Output displayed]
+
+AI: The Docker daemon failed to start. Let's check the logs...
+````
+
+**Safety:**
+- All commands require explicit user confirmation
+- Dangerous commands are automatically blocked:
+  - `rm -rf`, `rm -fr`
+  - `mkfs` (filesystem format)
+  - `dd` to devices
+  - `shutdown`, `reboot`, `poweroff`
+  - `chmod 777 /`
+  - Fork bombs
+
+**Special Commands:**
+| Command | Action |
+|---------|--------|
+| `doctor` | Run health checks mid-session |
+| `help` | Summarize the session into `cortex_support_log.txt` for escalation |
+| `exit`, `quit`, `q` | Exit troubleshooter |
+
+---
+
 ### `cortex history`
 
 View the history of package installations and operations.
diff --git a/tests/test_resolutions.py b/tests/test_resolutions.py
new file mode 100644
index 00000000..a8e2925a
--- /dev/null
+++ b/tests/test_resolutions.py
@@ -0,0 +1,63 @@
+"""Tests for ResolutionManager."""
+
+import json
+import unittest
+from pathlib import Path
+from tempfile import TemporaryDirectory
+
+from cortex.resolutions import ResolutionManager
+
+
+class TestResolutionManager(unittest.TestCase):
+    def setUp(self):
+        self.temp_dir = TemporaryDirectory()
+        self.storage_path = Path(self.temp_dir.name) / "resolutions.json"
+        self.manager = ResolutionManager(str(self.storage_path))
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
+    def test_save_resolution(self):
+        """Test saving a resolution."""
+        self.manager.save("Docker failed", "systemctl start docker")
+
+        with open(self.storage_path) as f:
+            data = json.load(f)
+
+        self.assertEqual(len(data), 1)
+        self.assertEqual(data[0]["issue"], "Docker failed")
+        self.assertEqual(data[0]["fix"], "systemctl start docker")
+
+    def test_search_resolution(self):
+        """Test searching for resolutions."""
+        self.manager.save("Docker failed to start", "systemctl start docker")
+        self.manager.save("Python missing", "apt install python3")
+        self.manager.save("Cannot connect to Docker", "usermod -aG docker $USER")
+
+        # Search for "docker"
+        results = self.manager.search("I have a docker issue")
+        self.assertEqual(len(results), 2)
+        issues = [r["issue"] for r in results]
+        self.assertIn("Docker failed to start", issues)
+        self.assertIn("Cannot connect to Docker", issues)
+        self.assertNotIn("Python missing", issues)
+
+    def test_search_limit(self):
+        """Test search result limit."""
+        for i in range(5):
+            self.manager.save(f"Issue {i}", f"Fix {i}")
+
+        results = self.manager.search("Issue", limit=2)
+        self.assertEqual(len(results), 2)
+
+    def test_max_resolutions_limit(self):
+        """Test that we only keep the last 50 resolutions."""
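+        # Saving 60 entries should evict the oldest 10, keeping the newest 50.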
for i in range(60): + self.manager.save(f"Issue {i}", f"Fix {i}") + + with open(self.storage_path) as f: + data = json.load(f) + + self.assertEqual(len(data), 50) + self.assertEqual(data[-1]["issue"], "Issue 59") diff --git a/tests/test_troubleshoot.py b/tests/test_troubleshoot.py new file mode 100644 index 00000000..d3b2d586 --- /dev/null +++ b/tests/test_troubleshoot.py @@ -0,0 +1,656 @@ +"""Unit tests for the troubleshoot module.""" + +import os +import sys +import unittest +from unittest.mock import MagicMock, patch + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from cortex.troubleshoot import DANGEROUS_PATTERNS, Troubleshooter + + +class TestExtractCodeBlocks(unittest.TestCase): + """Tests for _extract_code_blocks method.""" + + def setUp(self): + """Set up test fixtures.""" + self.mock_res_manager_patcher = patch("cortex.troubleshoot.ResolutionManager") + self.mock_res_manager = self.mock_res_manager_patcher.start() + + # Mock the API key detector to avoid dependency on real config + with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect: + mock_detect.return_value = (True, "fake-key", "fake", "test") + with patch("cortex.troubleshoot.AskHandler"): + self.troubleshooter = Troubleshooter() + + def tearDown(self): + self.mock_res_manager_patcher.stop() + + def test_extract_bash_block(self): + """Test extracting a bash code block.""" + text = """Here is a command: +```bash +ls -la +``` +That's it.""" + blocks = self.troubleshooter._extract_code_blocks(text) + self.assertEqual(len(blocks), 1) + self.assertEqual(blocks[0].strip(), "ls -la") + + def test_extract_sh_block(self): + """Test extracting an sh code block.""" + text = """Run this: +```sh +df -h +```""" + blocks = self.troubleshooter._extract_code_blocks(text) + self.assertEqual(len(blocks), 1) + self.assertEqual(blocks[0].strip(), "df -h") + + def test_extract_generic_block(self): + """Test extracting a generic code block without language specifier.""" + text = """Command: +``` +echo hello +```""" + blocks = self.troubleshooter._extract_code_blocks(text) + self.assertEqual(len(blocks), 1) + self.assertEqual(blocks[0].strip(), "echo hello") + + def test_extract_multiple_blocks(self): + """Test extracting multiple code blocks.""" + text = """First: +```bash +cmd1 +``` +Second: +```bash +cmd2 +```""" + blocks = self.troubleshooter._extract_code_blocks(text) + self.assertEqual(len(blocks), 2) + self.assertEqual(blocks[0].strip(), "cmd1") + self.assertEqual(blocks[1].strip(), "cmd2") + + def test_extract_no_blocks(self): + """Test text without code blocks.""" + text = "Just some text without any code blocks." 
+ blocks = self.troubleshooter._extract_code_blocks(text) + self.assertEqual(len(blocks), 0) + + def test_extract_multiline_command(self): + """Test extracting a multiline command.""" + text = """Run: +```bash +for i in 1 2 3; do + echo $i +done +```""" + blocks = self.troubleshooter._extract_code_blocks(text) + self.assertEqual(len(blocks), 1) + self.assertIn("for i in", blocks[0]) + self.assertIn("done", blocks[0]) + + +class TestIsCommandSafe(unittest.TestCase): + """Tests for _is_command_safe method (blacklist enforcement).""" + + def setUp(self): + """Set up test fixtures.""" + self.mock_res_manager_patcher = patch("cortex.troubleshoot.ResolutionManager") + self.mock_res_manager = self.mock_res_manager_patcher.start() + + with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect: + mock_detect.return_value = (True, "fake-key", "fake", "test") + with patch("cortex.troubleshoot.AskHandler"): + self.troubleshooter = Troubleshooter() + + def tearDown(self): + self.mock_res_manager_patcher.stop() + + def test_safe_command_ls(self): + """Test that 'ls' is safe.""" + is_safe, reason = self.troubleshooter._is_command_safe("ls -la") + self.assertTrue(is_safe) + self.assertEqual(reason, "") + + def test_safe_command_df(self): + """Test that 'df -h' is safe.""" + is_safe, reason = self.troubleshooter._is_command_safe("df -h") + self.assertTrue(is_safe) + self.assertEqual(reason, "") + + def test_safe_command_systemctl_status(self): + """Test that 'systemctl status' is safe.""" + is_safe, reason = self.troubleshooter._is_command_safe("systemctl status docker") + self.assertTrue(is_safe) + self.assertEqual(reason, "") + + def test_dangerous_rm_rf(self): + """Test that 'rm -rf' is blocked.""" + is_safe, reason = self.troubleshooter._is_command_safe("rm -rf /tmp/test") + self.assertFalse(is_safe) + self.assertIn("dangerous", reason.lower()) + + def test_dangerous_rm_rf_slash(self): + """Test that 'rm -rf /' is blocked.""" + is_safe, reason = self.troubleshooter._is_command_safe("rm -rf /") + self.assertFalse(is_safe) + + def test_dangerous_rm_fr(self): + """Test that 'rm -fr' is blocked.""" + is_safe, reason = self.troubleshooter._is_command_safe("rm -fr /home/user") + self.assertFalse(is_safe) + + def test_dangerous_mkfs(self): + """Test that 'mkfs' is blocked.""" + is_safe, reason = self.troubleshooter._is_command_safe("mkfs.ext4 /dev/sda1") + self.assertFalse(is_safe) + + def test_dangerous_dd(self): + """Test that 'dd' to device is blocked.""" + is_safe, reason = self.troubleshooter._is_command_safe("dd if=/dev/zero of=/dev/sda") + self.assertFalse(is_safe) + + def test_dangerous_shutdown(self): + """Test that 'shutdown' is blocked.""" + is_safe, reason = self.troubleshooter._is_command_safe("shutdown -h now") + self.assertFalse(is_safe) + + def test_dangerous_reboot(self): + """Test that 'reboot' is blocked.""" + is_safe, reason = self.troubleshooter._is_command_safe("reboot") + self.assertFalse(is_safe) + + def test_dangerous_chmod_777_root(self): + """Test that 'chmod 777 /' is blocked.""" + is_safe, reason = self.troubleshooter._is_command_safe("chmod 777 /") + self.assertFalse(is_safe) + + def test_safe_chmod_normal(self): + """Test that 'chmod 755' on a normal directory is safe.""" + is_safe, reason = self.troubleshooter._is_command_safe("chmod 755 /tmp/mydir") + self.assertTrue(is_safe) + + +class TestExecuteCommand(unittest.TestCase): + """Tests for _execute_command method.""" + + def setUp(self): + """Set up test fixtures.""" + self.mock_res_manager_patcher = 
patch("cortex.troubleshoot.ResolutionManager") + self.mock_res_manager = self.mock_res_manager_patcher.start() + + with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect: + mock_detect.return_value = (True, "fake-key", "fake", "test") + with patch("cortex.troubleshoot.AskHandler"): + self.troubleshooter = Troubleshooter() + + def tearDown(self): + self.mock_res_manager_patcher.stop() + + def test_execute_simple_command(self): + """Test executing a simple echo command.""" + output = self.troubleshooter._execute_command("echo 'hello world'") + self.assertIn("hello world", output) + + def test_execute_command_with_stderr(self): + """Test command that produces stderr.""" + output = self.troubleshooter._execute_command("ls /nonexistent_directory_12345") + self.assertIn("[STDERR]", output) + + def test_execute_command_captures_output(self): + """Test that stdout is captured.""" + output = self.troubleshooter._execute_command("echo 'test output'") + self.assertEqual(output.strip(), "test output") + + @patch("subprocess.run") + def test_execute_command_timeout(self, mock_run): + """Test command timeout handling.""" + import subprocess + + mock_run.side_effect = subprocess.TimeoutExpired(cmd="sleep 100", timeout=30) + output = self.troubleshooter._execute_command("sleep 100") + self.assertIn("Error executing command", output) + + @patch("cortex.troubleshoot.shutil.which") + @patch("cortex.troubleshoot.subprocess.run") + def test_execute_command_with_firejail(self, mock_run, mock_which): + """Test that command is sandboxed when firejail is available.""" + mock_which.return_value = "/usr/bin/firejail" + mock_run.return_value = MagicMock(stdout="output", stderr="", returncode=0) + + self.troubleshooter._execute_command("ls") + + # Verify firejail was used + args, _ = mock_run.call_args + self.assertIn("firejail", args[0]) + self.assertIn("ls", args[0]) + + @patch("cortex.troubleshoot.shutil.which") + @patch("cortex.troubleshoot.subprocess.run") + def test_execute_command_without_firejail(self, mock_run, mock_which): + """Test that command is NOT sandboxed when firejail is missing.""" + mock_which.return_value = None + mock_run.return_value = MagicMock(stdout="output", stderr="", returncode=0) + + self.troubleshooter._execute_command("ls") + + # Verify firejail was NOT used + args, _ = mock_run.call_args + self.assertNotIn("firejail", args[0]) + self.assertEqual(args[0], "ls") + + +class TestDangerousPatterns(unittest.TestCase): + """Tests for DANGEROUS_PATTERNS constant.""" + + def test_patterns_list_not_empty(self): + """Test that dangerous patterns list is not empty.""" + self.assertGreater(len(DANGEROUS_PATTERNS), 0) + + def test_patterns_are_valid_regex(self): + """Test that all patterns are valid regex.""" + import re + + for pattern in DANGEROUS_PATTERNS: + try: + re.compile(pattern) + except re.error as e: + self.fail(f"Invalid regex pattern: {pattern} - {e}") + + +class TestGetProvider(unittest.TestCase): + """Tests for _get_provider method.""" + + def setUp(self): + self.mock_res_manager_patcher = patch("cortex.troubleshoot.ResolutionManager") + self.mock_res_manager = self.mock_res_manager_patcher.start() + + def tearDown(self): + self.mock_res_manager_patcher.stop() + + def test_get_provider_returns_claude_for_anthropic(self): + """Test that 'anthropic' is mapped to 'claude'.""" + with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect: + mock_detect.return_value = (True, "sk-ant-xxx", "anthropic", "env") + with patch("cortex.troubleshoot.AskHandler"): + 
troubleshooter = Troubleshooter() + self.assertEqual(troubleshooter.provider, "claude") + + def test_get_provider_returns_openai(self): + """Test that 'openai' is returned correctly.""" + with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect: + mock_detect.return_value = (True, "sk-xxx", "openai", "env") + with patch("cortex.troubleshoot.AskHandler"): + troubleshooter = Troubleshooter() + self.assertEqual(troubleshooter.provider, "openai") + + def test_get_provider_defaults_to_openai(self): + """Test that None provider defaults to 'openai'.""" + with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect: + mock_detect.return_value = (False, None, None, None) + with patch("cortex.troubleshoot.AskHandler"): + troubleshooter = Troubleshooter() + self.assertEqual(troubleshooter.provider, "openai") + + +class TestGetApiKey(unittest.TestCase): + """Tests for _get_api_key method.""" + + def setUp(self): + self.mock_res_manager_patcher = patch("cortex.troubleshoot.ResolutionManager") + self.mock_res_manager = self.mock_res_manager_patcher.start() + + def tearDown(self): + self.mock_res_manager_patcher.stop() + + def test_get_api_key_returns_key(self): + """Test that API key is returned correctly.""" + with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect: + mock_detect.return_value = (True, "test-api-key", "openai", "env") + with patch("cortex.troubleshoot.AskHandler"): + troubleshooter = Troubleshooter() + self.assertEqual(troubleshooter.api_key, "test-api-key") + + def test_get_api_key_returns_empty_on_none(self): + """Test that None key returns empty string.""" + with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect: + mock_detect.return_value = (False, None, "openai", None) + with patch("cortex.troubleshoot.AskHandler"): + troubleshooter = Troubleshooter() + self.assertEqual(troubleshooter.api_key, "") + + +class TestStart(unittest.TestCase): + """Tests for start method.""" + + def setUp(self): + self.mock_res_manager_patcher = patch("cortex.troubleshoot.ResolutionManager") + self.mock_res_manager = self.mock_res_manager_patcher.start() + + def tearDown(self): + self.mock_res_manager_patcher.stop() + + @patch("cortex.troubleshoot.console") + def test_start_no_ai_returns_error(self, mock_console): + """Test that start returns 1 when AI is unavailable.""" + with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect: + mock_detect.return_value = (False, None, "openai", None) + with patch("cortex.troubleshoot.AskHandler") as mock_handler: + mock_handler.side_effect = Exception("No API key") + troubleshooter = Troubleshooter() + troubleshooter.ai = None + result = troubleshooter.start() + self.assertEqual(result, 1) + + @patch("cortex.troubleshoot.console") + @patch("cortex.troubleshoot.Troubleshooter._interactive_loop") + def test_start_with_ai_calls_loop(self, mock_loop, mock_console): + """Test that start calls interactive loop when AI is available.""" + mock_loop.return_value = 0 + with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect: + mock_detect.return_value = (True, "test-key", "openai", "env") + with patch("cortex.troubleshoot.AskHandler"): + troubleshooter = Troubleshooter() + result = troubleshooter.start() + mock_loop.assert_called_once() + self.assertEqual(result, 0) + + +class TestInteractiveLoop(unittest.TestCase): + """Tests for _interactive_loop method.""" + + def setUp(self): + self.mock_res_manager_patcher = patch("cortex.troubleshoot.ResolutionManager") + self.mock_res_manager_cls = 
self.mock_res_manager_patcher.start()
+        self.mock_res_manager = self.mock_res_manager_cls.return_value
+        # Default search returns empty list to avoid side effects
+        self.mock_res_manager.search.return_value = []
+
+    def tearDown(self):
+        self.mock_res_manager_patcher.stop()
+
+    @patch("cortex.troubleshoot.console")
+    @patch("cortex.troubleshoot.Prompt")
+    @patch("cortex.troubleshoot.Confirm")
+    def test_exit_command(self, mock_confirm, mock_prompt, mock_console):
+        """Test that 'exit' command exits the loop."""
+        mock_prompt.ask.return_value = "exit"
+        mock_confirm.ask.return_value = False  # User says "No" to learning
+        with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect:
+            mock_detect.return_value = (True, "test-key", "fake", "env")
+            with patch("cortex.troubleshoot.AskHandler"):
+                troubleshooter = Troubleshooter()
+                troubleshooter.messages = [{"role": "system", "content": "test"}]
+                result = troubleshooter._interactive_loop()
+                self.assertEqual(result, 0)
+
+    @patch("cortex.troubleshoot.console")
+    @patch("cortex.troubleshoot.Prompt")
+    @patch("cortex.troubleshoot.Confirm")
+    def test_quit_command(self, mock_confirm, mock_prompt, mock_console):
+        """Test that 'quit' command exits the loop."""
+        mock_prompt.ask.return_value = "quit"
+        mock_confirm.ask.return_value = False  # User says "No" to learning
+        with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect:
+            mock_detect.return_value = (True, "test-key", "fake", "env")
+            with patch("cortex.troubleshoot.AskHandler"):
+                troubleshooter = Troubleshooter()
+                troubleshooter.messages = [{"role": "system", "content": "test"}]
+                result = troubleshooter._interactive_loop()
+                self.assertEqual(result, 0)
+
+    @patch("cortex.troubleshoot.console")
+    @patch("cortex.troubleshoot.Prompt")
+    @patch("cortex.troubleshoot.Confirm")
+    @patch("cortex.doctor.SystemDoctor")
+    def test_doctor_command(self, mock_doctor, mock_confirm, mock_prompt, mock_console):
+        """Test that 'doctor' command runs SystemDoctor."""
+        # First call returns 'doctor', second call returns 'exit'
+        mock_prompt.ask.side_effect = ["doctor", "exit"]
+        mock_confirm.ask.return_value = False  # No learning prompt on exit
+        mock_doctor_instance = MagicMock()
+        mock_doctor.return_value = mock_doctor_instance
+
+        with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect:
+            mock_detect.return_value = (True, "test-key", "fake", "env")
+            with patch("cortex.troubleshoot.AskHandler"):
+                troubleshooter = Troubleshooter()
+                troubleshooter.messages = [{"role": "system", "content": "test"}]
+                troubleshooter._interactive_loop()
+                # The late import inside the loop resolves to the patched class
+                mock_doctor_instance.run_checks.assert_called_once()
+
+    @patch("cortex.troubleshoot.console")
+    @patch("cortex.troubleshoot.Prompt")
+    @patch("cortex.troubleshoot.Confirm")
+    def test_help_command(self, mock_confirm, mock_prompt, mock_console):
+        """Test that 'help' command creates log file and prints instructions."""
+        mock_prompt.ask.side_effect = ["help", "exit"]
+        mock_confirm.ask.return_value = False  # No learning prompt on exit
+
+        with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect:
+            mock_detect.return_value = (True, "test-key", "fake", "env")
+            with patch("cortex.troubleshoot.AskHandler"):
+                troubleshooter = Troubleshooter()
+                troubleshooter.messages = [{"role": "system", "content": "test"}]
+
+                # Mock AI for summary generation
+                mock_ai = MagicMock()
+                mock_ai.ask.return_value = "Issue Summary: Test issue"
+                troubleshooter.ai = mock_ai
+
+                # Mock file opening to avoid actual file creation
+                with patch("builtins.open", unittest.mock.mock_open()) as mock_file:
+                    with patch("os.path.abspath", return_value="/abs/path/to/log"):
+                        troubleshooter._interactive_loop()
+
+                # Verify file was opened for writing
+                mock_file.assert_called_with("cortex_support_log.txt", "w")
+
+                # Verify content was written
+                handle = mock_file()
+                handle.write.assert_any_call("Cortex Troubleshooting Log\n")
+                handle.write.assert_any_call("Issue Summary:\n")
+                handle.write.assert_any_call("Issue Summary: Test issue")
+
+                # Verify instructions were printed
+                mock_console.print.assert_any_call(
+                    "Please open a new issue and attach the cortex_support_log.txt file."
+                )
+
+    @patch("cortex.troubleshoot.console")
+    @patch("cortex.troubleshoot.Prompt")
+    @patch("cortex.troubleshoot.Confirm")
+    def test_learning_on_exit(self, mock_confirm, mock_prompt, mock_console):
+        """Test that we learn from successful sessions on exit."""
+        mock_prompt.ask.side_effect = ["exit"]
+        mock_confirm.ask.return_value = True  # User says "Yes, problem solved"
+
+        with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect:
+            mock_detect.return_value = (True, "test-key", "fake", "env")
+            with patch("cortex.troubleshoot.AskHandler"):
+                with patch("cortex.troubleshoot.ResolutionManager"):
+                    troubleshooter = Troubleshooter()
+                    troubleshooter.messages = [{"role": "system", "content": "test"}]
+
+                    # Mock AI extraction
+                    mock_ai = MagicMock()
+                    mock_ai.ask.return_value = '{"issue": "test issue", "fix": "test fix"}'
+                    troubleshooter.ai = mock_ai
+
+                    troubleshooter._interactive_loop()
+
+                    # Verify save was called
+                    troubleshooter.resolutions.save.assert_called_with("test issue", "test fix")
+
+    @patch("cortex.troubleshoot.console")
+    @patch("cortex.troubleshoot.Prompt")
+    @patch("cortex.troubleshoot.Confirm")
+    def test_dynamic_recall(self, mock_confirm, mock_prompt, mock_console):
+        """Test that we search for resolutions and inject them."""
+        mock_prompt.ask.side_effect = ["docker fail", "exit"]
+        mock_confirm.ask.return_value = False  # No learning prompt on exit
+
+        with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect:
+            mock_detect.return_value = (True, "test-key", "fake", "env")
+            with patch("cortex.troubleshoot.AskHandler"):
+                with patch("cortex.troubleshoot.ResolutionManager") as MockResManager:
+                    troubleshooter = Troubleshooter()
+                    troubleshooter.messages = [{"role": "system", "content": "test"}]
+
+                    # Mock search results
+                    mock_manager = MockResManager.return_value
+                    mock_manager.search.return_value = [
+                        {"issue": "Docker fail", "fix": "systemctl start docker"}
+                    ]
+                    troubleshooter.resolutions = mock_manager
+
+                    # Mock AI
+                    mock_ai = MagicMock()
+                    mock_ai.ask.return_value = "Try this"
+                    troubleshooter.ai = mock_ai
+
+                    troubleshooter._interactive_loop()
+
+                    # Verify search was called
+                    mock_manager.search.assert_called_with("docker fail")
+
+                    # Verify injection (by checking the call to ai.ask)
+                    args, kwargs = mock_ai.ask.call_args
+                    system_prompt = kwargs.get("system_prompt", "")
+                    self.assertIn("[MEMORY]", system_prompt)
+                    self.assertIn("systemctl start docker", system_prompt)
+
+    @patch("cortex.troubleshoot.console")
+    @patch("cortex.troubleshoot.Prompt")
+    @patch("cortex.troubleshoot.Markdown")
+    @patch("cortex.troubleshoot.Confirm")
+    def test_user_input_sent_to_ai(self, mock_confirm, mock_md, mock_prompt, mock_console):
+        """Test that user input is sent to AI."""
+        mock_prompt.ask.side_effect = ["my issue", "exit"]
+        mock_confirm.ask.return_value = False  # No learning prompt on exit
+
+        with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect:
+            mock_detect.return_value = (True, "test-key", "fake", "env")
+            with patch("cortex.troubleshoot.AskHandler"):
+                troubleshooter = Troubleshooter()
+
+                # Create and inject mock AI
+                mock_ai = MagicMock()
+                mock_ai.ask.return_value = "Here is my response."
+                troubleshooter.ai = mock_ai
+                troubleshooter.messages = [{"role": "system", "content": "test"}]
+
+                troubleshooter._interactive_loop()
+                # Verify that AI was called with user input
+                self.assertTrue(mock_ai.ask.called)
+
+    @patch("cortex.troubleshoot.console")
+    @patch("cortex.troubleshoot.Prompt")
+    @patch("cortex.troubleshoot.Markdown")
+    @patch("cortex.troubleshoot.Syntax")
+    @patch("cortex.troubleshoot.Confirm")
+    @patch("cortex.troubleshoot.Panel")
+    def test_command_execution_flow(
+        self, mock_panel, mock_confirm, mock_syntax, mock_md, mock_prompt, mock_console
+    ):
+        """Test the full command execution flow."""
+        # AI returns a response with a bash code block
+        mock_prompt.ask.side_effect = ["check disk", "exit"]
+        mock_confirm.ask.side_effect = [True, False]  # Confirm execution, then decline learning on exit
+
+        with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect:
+            mock_detect.return_value = (True, "test-key", "fake", "env")
+            with patch("cortex.troubleshoot.AskHandler"):
+                troubleshooter = Troubleshooter()
+
+                mock_ai = MagicMock()
+                # First response has a command, second is analysis
+                mock_ai.ask.side_effect = [
+                    "Run this:\n```bash\ndf -h\n```",
+                    "Disk looks good!",
+                ]
+                troubleshooter.ai = mock_ai
+                troubleshooter.messages = [{"role": "system", "content": "test"}]
+
+                with patch.object(troubleshooter, "_execute_command") as mock_exec:
+                    mock_exec.return_value = "Disk output here"
+                    troubleshooter._interactive_loop()
+                    # Verify command was executed
+                    mock_exec.assert_called_once_with("df -h")
+
+    @patch("cortex.troubleshoot.console")
+    @patch("cortex.troubleshoot.Prompt")
+    @patch("cortex.troubleshoot.Markdown")
+    @patch("cortex.troubleshoot.Syntax")
+    @patch("cortex.troubleshoot.Confirm")
+    def test_dangerous_command_blocked(
+        self, mock_confirm, mock_syntax, mock_md, mock_prompt, mock_console
+    ):
+        """Test that dangerous commands are blocked."""
+        mock_prompt.ask.side_effect = ["delete everything", "exit"]
+        mock_confirm.ask.return_value = False  # No learning prompt on exit
+
+        with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect:
+            mock_detect.return_value = (True, "test-key", "fake", "env")
+            with patch("cortex.troubleshoot.AskHandler"):
+                troubleshooter = Troubleshooter()
+
+                mock_ai = MagicMock()
+                mock_ai.ask.return_value = "Run this:\n```bash\nrm -rf /\n```"
+                troubleshooter.ai = mock_ai
+                troubleshooter.messages = [{"role": "system", "content": "test"}]
+
+                with patch.object(troubleshooter, "_execute_command") as mock_exec:
+                    troubleshooter._interactive_loop()
+                    # Verify command was NOT executed (blocked)
+                    mock_exec.assert_not_called()
+
+    @patch("cortex.troubleshoot.console")
+    @patch("cortex.troubleshoot.Prompt")
+    def test_keyboard_interrupt_returns_130(self, mock_prompt, mock_console):
+        """Test that KeyboardInterrupt returns exit code 130."""
+        mock_prompt.ask.side_effect = KeyboardInterrupt()
+
+        with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect:
+            mock_detect.return_value = (True, "test-key", "fake", "env")
+            with patch("cortex.troubleshoot.AskHandler"):
+                troubleshooter = Troubleshooter()
+                troubleshooter.messages = [{"role": "system", "content": "test"}]
+                result = troubleshooter._interactive_loop()
+                self.assertEqual(result, 130)
+
+    @patch("cortex.troubleshoot.console")
+    @patch("cortex.troubleshoot.Prompt")
+    def test_exception_returns_1(self, mock_prompt, mock_console):
+        """Test that exceptions return exit code 1."""
+        mock_prompt.ask.side_effect = RuntimeError("Test error")
+
+        with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect:
+            mock_detect.return_value = (True, "test-key", "fake", "env")
+            with patch("cortex.troubleshoot.AskHandler"):
+                troubleshooter = Troubleshooter()
+                troubleshooter.messages = [{"role": "system", "content": "test"}]
+                result = troubleshooter._interactive_loop()
+                self.assertEqual(result, 1)
+
+    @patch("cortex.troubleshoot.console")
+    @patch("cortex.troubleshoot.Prompt")
+    @patch("cortex.troubleshoot.Markdown")
+    @patch("cortex.troubleshoot.Syntax")
+    @patch("cortex.troubleshoot.Confirm")
+    def test_user_declines_command(
+        self, mock_confirm, mock_syntax, mock_md, mock_prompt, mock_console
+    ):
+        """Test that declining command execution skips it."""
+        mock_prompt.ask.side_effect = ["run something", "exit"]
+        mock_confirm.ask.return_value = False  # User declines
+
+        with patch("cortex.troubleshoot.auto_detect_api_key") as mock_detect:
+            mock_detect.return_value = (True, "test-key", "fake", "env")
+            with patch("cortex.troubleshoot.AskHandler"):
+                troubleshooter = Troubleshooter()
+
+                mock_ai = MagicMock()
+                mock_ai.ask.return_value = "Run this:\n```bash\necho hello\n```"
+                troubleshooter.ai = mock_ai
+                troubleshooter.messages = [{"role": "system", "content": "test"}]
+
+                with patch.object(troubleshooter, "_execute_command") as mock_exec:
+                    troubleshooter._interactive_loop()
+                    # Verify command was NOT executed (user declined)
+                    mock_exec.assert_not_called()
+
+
+if __name__ == "__main__":
+    unittest.main()