diff --git a/.gitignore b/.gitignore index ad7f433d..216e0224 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,8 @@ __pycache__/ # ============================== env/ venv/ +myenv/ +venv312/ ENV/ env.bak/ venv.bak/ diff --git a/README.md b/README.md index 19948549..62688c00 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,7 @@ cortex install "tools for video compression" | Feature | Description | |---------|-------------| | **Natural Language** | Describe what you need in plain English | +| **Voice Input** | Hands-free mode with Whisper speech recognition ([F9 to speak](docs/VOICE_INPUT.md)) | | **Dry-Run Default** | Preview all commands before execution | | **Sandboxed Execution** | Commands run in Firejail isolation | | **Full Rollback** | Undo any installation with `cortex rollback` | diff --git a/cortex/api_key_detector.py b/cortex/api_key_detector.py index fb8535e5..efffdcad 100644 --- a/cortex/api_key_detector.py +++ b/cortex/api_key_detector.py @@ -123,7 +123,23 @@ def detect(self) -> tuple[bool, str | None, str | None, str | None]: return result or (False, None, None, None) def _check_environment_api_keys(self) -> tuple[bool, str, str, str] | None: - """Check for API keys in environment variables.""" + """Check for API keys in environment variables. + + If CORTEX_PROVIDER is explicitly set, prefer that provider's key first. 
+ """ + explicit_provider = os.environ.get("CORTEX_PROVIDER", "").lower() + + # If user explicitly set CORTEX_PROVIDER to a specific provider, check that key first + if explicit_provider in ["openai", "claude"]: + target_env_var = ( + "OPENAI_API_KEY" if explicit_provider == "openai" else "ANTHROPIC_API_KEY" + ) + target_provider = "openai" if explicit_provider == "openai" else "anthropic" + value = os.environ.get(target_env_var) + if value: + return (True, value, target_provider, "environment") + + # Otherwise check all providers in default order for env_var, provider in ENV_VAR_PROVIDERS.items(): value = os.environ.get(env_var) if value: @@ -141,7 +157,25 @@ def _check_encrypted_storage(self) -> tuple[bool, str, str, str] | None: env_mgr = get_env_manager() - # Check for API keys in encrypted storage + # If CORTEX_PROVIDER is explicitly set, check that provider's key first + explicit_provider = os.environ.get("CORTEX_PROVIDER", "").lower() + if explicit_provider in ["openai", "claude"]: + target_env_var = ( + "OPENAI_API_KEY" if explicit_provider == "openai" else "ANTHROPIC_API_KEY" + ) + target_provider = "openai" if explicit_provider == "openai" else "anthropic" + value = env_mgr.get_variable(app="cortex", key=target_env_var, decrypt=True) + if value: + os.environ[target_env_var] = value + logger.debug(f"Loaded {target_env_var} from encrypted storage") + return ( + True, + value, + target_provider, + "encrypted storage (~/.cortex/environments/)", + ) + + # Check for API keys in encrypted storage (default order) for env_var, provider in ENV_VAR_PROVIDERS.items(): value = env_mgr.get_variable(app="cortex", key=env_var, decrypt=True) if value: diff --git a/cortex/branding.py b/cortex/branding.py index 0dea1f35..5c4a5850 100644 --- a/cortex/branding.py +++ b/cortex/branding.py @@ -5,10 +5,13 @@ Uses Rich library for cross-platform terminal styling. 
""" +import sys + from rich.console import Console from rich.panel import Panel -console = Console() +# Use force_terminal and legacy_windows for better Windows compatibility +console = Console(force_terminal=True, legacy_windows=sys.platform == "win32") # Brand colors CORTEX_CYAN = "cyan" @@ -57,13 +60,23 @@ def cx_print(message: str, status: str = "info"): """ badge = "[bold white on dark_cyan] CX [/bold white on dark_cyan]" - status_icons = { - "info": "[dim]│[/dim]", - "success": "[green]✓[/green]", - "warning": "[yellow]⚠[/yellow]", - "error": "[red]✗[/red]", - "thinking": "[cyan]⠋[/cyan]", # Spinner frame - } + # Use ASCII-only icons on Windows for better compatibility + if sys.platform == "win32": + status_icons = { + "info": "[dim]|[/dim]", + "success": "[green]+[/green]", + "warning": "[yellow]![/yellow]", + "error": "[red]x[/red]", + "thinking": "[cyan]*[/cyan]", + } + else: + status_icons = { + "info": "[dim]│[/dim]", + "success": "[green]✓[/green]", + "warning": "[yellow]⚠[/yellow]", + "error": "[red]✗[/red]", + "thinking": "[cyan]⠋[/cyan]", # Spinner frame + } icon = status_icons.get(status, status_icons["info"]) console.print(f"{badge} {icon} {message}") @@ -73,10 +86,11 @@ def cx_step(step_num: int, total: int, message: str): """ Print a numbered step with the CX badge. - Example: CX │ [1/4] Updating package lists... + Example: CX | [1/4] Updating package lists... """ badge = "[bold white on dark_cyan] CX [/bold white on dark_cyan]" - console.print(f"{badge} [dim]│[/dim] [{step_num}/{total}] {message}") + separator = "|" if sys.platform == "win32" else "│" + console.print(f"{badge} [dim]{separator}[/dim] [{step_num}/{total}] {message}") def cx_header(title: str): @@ -84,7 +98,8 @@ def cx_header(title: str): Print a section header. 
""" console.print() - console.print(f"[bold cyan]━━━ {title} ━━━[/bold cyan]") + separator = "---" if sys.platform == "win32" else "━━━" + console.print(f"[bold cyan]{separator} {title} {separator}[/bold cyan]") console.print() diff --git a/cortex/cli.py b/cortex/cli.py index ea8976d1..781ed3d7 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -7,7 +7,7 @@ from pathlib import Path from typing import TYPE_CHECKING, Any -from cortex.api_key_detector import auto_detect_api_key, setup_api_key +from cortex.api_key_detector import setup_api_key from cortex.ask import AskHandler from cortex.branding import VERSION, console, cx_header, cx_print, show_banner from cortex.coordinator import InstallationCoordinator, InstallationStep, StepStatus @@ -16,7 +16,6 @@ DependencyImporter, PackageEcosystem, ParseResult, - format_package_list, ) from cortex.env_manager import EnvironmentManager, get_env_manager from cortex.installation_history import InstallationHistory, InstallationStatus, InstallationType @@ -24,7 +23,7 @@ from cortex.network_config import NetworkConfig from cortex.notification_manager import NotificationManager from cortex.stack_manager import StackManager -from cortex.validators import validate_api_key, validate_install_request +from cortex.validators import validate_install_request if TYPE_CHECKING: from cortex.shell_env_analyzer import ShellEnvironmentAnalyzer @@ -37,6 +36,9 @@ class CortexCLI: + # Installation messages + INSTALL_FAIL_MSG = "Installation failed" + def __init__(self, verbose: bool = False): self.spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] self.spinner_idx = 0 @@ -150,25 +152,26 @@ def _get_api_key(self) -> str | None: return None def _get_provider(self) -> str: - # Check environment variable for explicit provider choice + # 1. 
Check explicit provider override FIRST (highest priority) explicit_provider = os.environ.get("CORTEX_PROVIDER", "").lower() if explicit_provider in ["ollama", "openai", "claude", "fake"]: + self._debug(f"Using explicit CORTEX_PROVIDER={explicit_provider}") return explicit_provider - # Use provider from auto-detection (set by _get_api_key) + # 2. Use provider from auto-detection (set by _get_api_key) detected = getattr(self, "_detected_provider", None) if detected == "anthropic": return "claude" elif detected == "openai": return "openai" - # Check env vars (may have been set by auto-detect) + # 3. Check env vars (may have been set by auto-detect) if os.environ.get("ANTHROPIC_API_KEY"): return "claude" elif os.environ.get("OPENAI_API_KEY"): return "openai" - # Fallback to Ollama for offline mode + # 4. Fallback to Ollama for offline mode return "ollama" def _print_status(self, emoji: str, message: str): @@ -389,7 +392,6 @@ def sandbox(self, args: argparse.Namespace) -> int: DockerSandbox, SandboxAlreadyExistsError, SandboxNotFoundError, - SandboxTestStatus, ) action = getattr(args, "sandbox_action", None) @@ -632,31 +634,303 @@ def ask(self, question: str) -> int: self._print_error(str(e)) return 1 + def voice(self, continuous: bool = False, model: str | None = None) -> int: + """Handle voice input mode. + + Args: + continuous: If True, stay in voice mode until Ctrl+C. + If False, record single input and exit. + model: Whisper model name (e.g., 'base.en', 'small.en'). + If None, uses CORTEX_WHISPER_MODEL env var or 'base.en'. 
+ """ + try: + from cortex.voice import VoiceInputError, VoiceInputHandler + except ImportError: + self._print_error("Voice dependencies not installed.") + cx_print("Install with: pip install cortex-linux[voice]", "info") + return 1 + + api_key = self._get_api_key() + if not api_key: + return 1 + + # Display model information if specified + if model: + model_info = { + "tiny.en": "(39 MB, fastest, good for clear speech)", + "base.en": "(140 MB, balanced speed/accuracy)", + "small.en": "(466 MB, better accuracy)", + "medium.en": "(1.5 GB, high accuracy)", + "tiny": "(39 MB, multilingual)", + "base": "(290 MB, multilingual)", + "small": "(968 MB, multilingual)", + "medium": "(3 GB, multilingual)", + "large": "(6 GB, best accuracy, multilingual)", + } + cx_print(f"Using Whisper model: {model} {model_info.get(model, '')}", "info") + + def process_voice_command(text: str) -> None: + """Process transcribed voice command.""" + if not text: + return + + # Determine if this is an install command or a question + text_lower = text.lower().strip() + is_install = any( + text_lower.startswith(word) for word in ["install", "setup", "add", "get", "put"] + ) + + if is_install: + # Remove the command verb for install + software = text + for verb in ["install", "setup", "add", "get", "put"]: + if text_lower.startswith(verb): + software = text[len(verb) :].strip() + break + + # Validate software name + if not software or len(software) > 200: + cx_print("Invalid software name", "error") + return + + # Check for dangerous characters that shouldn't be in package names + dangerous_chars = [";", "&", "|", "`", "$", "(", ")"] + if any(char in software for char in dangerous_chars): + cx_print("Invalid characters detected in software name", "error") + return + + cx_print(f"Installing: {software}", "info") + + # Ask user for confirmation + console.print() + console.print("[bold cyan]Choose an action:[/bold cyan]") + console.print(" [1] Dry run (preview commands)") + console.print(" [2] Execute 
(run commands)") + console.print(" [3] Cancel") + console.print() + + try: + choice = input("Enter choice [1/2/3]: ").strip() + + if choice == "1": + self.install(software, execute=False, dry_run=True) + elif choice == "2": + cx_print("Executing installation...", "info") + self.install(software, execute=True, dry_run=False) + else: + cx_print("Cancelled.", "info") + except (KeyboardInterrupt, EOFError): + cx_print("\nCancelled.", "info") + else: + # Treat as a question + cx_print(f"Question: {text}", "info") + self.ask(text) + + handler = None + try: + handler = VoiceInputHandler(model_name=model) + + if continuous: + # Continuous voice mode + handler.start_voice_mode(process_voice_command) + else: + # Single recording mode + text = handler.record_single() + if text: + process_voice_command(text) + else: + cx_print("No speech detected.", "warning") + + return 0 + + except VoiceInputError as e: + self._print_error(str(e)) + return 1 + except KeyboardInterrupt: + cx_print("\nVoice mode exited.", "info") + return 0 + finally: + # Ensure cleanup even if exceptions occur + if handler is not None: + try: + handler.stop() + except Exception as e: + # Log cleanup errors but don't raise + logging.debug("Error during voice handler cleanup: %s", e) + + def _normalize_software_name(self, software: str) -> str: + """Normalize software name by cleaning whitespace. + + Returns a natural-language description suitable for LLM interpretation. + Does NOT return shell commands - all command generation must go through + the LLM and validation pipeline. 
+ """ + # Just normalize whitespace - return natural language description + return " ".join(software.split()) + + def _record_history_error( + self, + history: InstallationHistory, + install_id: str | None, + error: str, + ) -> None: + """Record installation error to history.""" + if install_id: + history.update_installation(install_id, InstallationStatus.FAILED, error) + + def _handle_parallel_execution( + self, + commands: list[str], + software: str, + install_id: str | None, + history: InstallationHistory, + ) -> int: + """Handle parallel installation execution.""" + import asyncio + + from cortex.install_parallel import run_parallel_install + + def parallel_log_callback(message: str, level: str = "info"): + if level == "success": + cx_print(f" ✅ {message}", "success") + elif level == "error": + cx_print(f" ❌ {message}", "error") + else: + cx_print(f" ℹ {message}", "info") + + try: + success, parallel_tasks = asyncio.run( + run_parallel_install( + commands=commands, + descriptions=[f"Step {i + 1}" for i in range(len(commands))], + timeout=300, + stop_on_error=True, + log_callback=parallel_log_callback, + ) + ) + + if success: + total_duration = self._calculate_duration(parallel_tasks) + self._print_success(f"{software} installed successfully!") + print(f"\nCompleted in {total_duration:.2f} seconds (parallel mode)") + if install_id: + history.update_installation(install_id, InstallationStatus.SUCCESS) + print(f"\n📝 Installation recorded (ID: {install_id})") + print(f" To rollback: cortex rollback {install_id}") + return 0 + + error_msg = self._get_parallel_error_msg(parallel_tasks) + self._record_history_error(history, install_id, error_msg) + self._print_error(self.INSTALL_FAIL_MSG) + if error_msg: + print(f" Error: {error_msg}", file=sys.stderr) + if install_id: + print(f"\n📝 Installation recorded (ID: {install_id})") + print(f" View details: cortex history {install_id}") + return 1 + + except (ValueError, OSError) as e: + self._record_history_error(history, 
install_id, str(e)) + self._print_error(f"Parallel execution failed: {str(e)}") + return 1 + except Exception as e: + self._record_history_error(history, install_id, str(e)) + self._print_error(f"Unexpected parallel execution error: {str(e)}") + if self.verbose: + import traceback + + traceback.print_exc() + return 1 + + def _calculate_duration(self, parallel_tasks: list) -> float: + """Calculate total duration from parallel tasks.""" + if not parallel_tasks: + return 0.0 + + max_end = max( + (t.end_time for t in parallel_tasks if t.end_time is not None), + default=None, + ) + min_start = min( + (t.start_time for t in parallel_tasks if t.start_time is not None), + default=None, + ) + if max_end is not None and min_start is not None: + return max_end - min_start + return 0.0 + + def _get_parallel_error_msg(self, parallel_tasks: list) -> str: + """Extract error message from failed parallel tasks.""" + failed_tasks = [t for t in parallel_tasks if getattr(t.status, "value", "") == "failed"] + return failed_tasks[0].error if failed_tasks else self.INSTALL_FAIL_MSG + + def _handle_sequential_execution( + self, + commands: list[str], + software: str, + install_id: str | None, + history: InstallationHistory, + ) -> int: + """Handle sequential installation execution.""" + + def progress_callback(current, total, step): + status_emoji = "⏳" + if step.status == StepStatus.SUCCESS: + status_emoji = "✅" + elif step.status == StepStatus.FAILED: + status_emoji = "❌" + print(f"\n[{current}/{total}] {status_emoji} {step.description}") + print(f" Command: {step.command}") + + coordinator = InstallationCoordinator( + commands=commands, + descriptions=[f"Step {i + 1}" for i in range(len(commands))], + timeout=300, + stop_on_error=True, + progress_callback=progress_callback, + ) + + result = coordinator.execute() + + if result.success: + self._print_success(f"{software} installed successfully!") + print(f"\nCompleted in {result.total_duration:.2f} seconds") + if install_id: + 
history.update_installation(install_id, InstallationStatus.SUCCESS) + print(f"\n📝 Installation recorded (ID: {install_id})") + print(f" To rollback: cortex rollback {install_id}") + return 0 + + # Handle failure + self._record_history_error( + history, install_id, result.error_message or self.INSTALL_FAIL_MSG + ) + if result.failed_step is not None: + self._print_error(f"{self.INSTALL_FAIL_MSG} at step {result.failed_step + 1}") + else: + self._print_error(self.INSTALL_FAIL_MSG) + if result.error_message: + print(f" Error: {result.error_message}", file=sys.stderr) + if install_id: + print(f"\n📝 Installation recorded (ID: {install_id})") + print(f" View details: cortex history {install_id}") + return 1 + def install( self, software: str, execute: bool = False, dry_run: bool = False, parallel: bool = False, - ): + ) -> int: + """Install software using the LLM-powered package manager.""" # Validate input first is_valid, error = validate_install_request(software) if not is_valid: self._print_error(error) return 1 - # Special-case the ml-cpu stack: - # The LLM sometimes generates outdated torch==1.8.1+cpu installs - # which fail on modern Python. For the "pytorch-cpu jupyter numpy pandas" - # combo, force a supported CPU-only PyTorch recipe instead. 
- normalized = " ".join(software.split()).lower() - - if normalized == "pytorch-cpu jupyter numpy pandas": - software = ( - "pip3 install torch torchvision torchaudio " - "--index-url https://download.pytorch.org/whl/cpu && " - "pip3 install jupyter numpy pandas" - ) + software = self._normalize_software_name(software) api_key = self._get_api_key() if not api_key: @@ -673,11 +947,9 @@ def install( try: self._print_status("🧠", "Understanding request...") - interpreter = CommandInterpreter(api_key=api_key, provider=provider) self._print_status("📦", "Planning installation...") - for _ in range(10): self._animate_spinner("Analyzing system requirements...") self._clear_line() @@ -710,169 +982,31 @@ def install( history.update_installation(install_id, InstallationStatus.SUCCESS) return 0 - if execute: - - def progress_callback(current, total, step): - status_emoji = "⏳" - if step.status == StepStatus.SUCCESS: - status_emoji = "✅" - elif step.status == StepStatus.FAILED: - status_emoji = "❌" - print(f"\n[{current}/{total}] {status_emoji} {step.description}") - print(f" Command: {step.command}") - - print("\nExecuting commands...") - - if parallel: - import asyncio - - from cortex.install_parallel import run_parallel_install - - def parallel_log_callback(message: str, level: str = "info"): - if level == "success": - cx_print(f" ✅ {message}", "success") - elif level == "error": - cx_print(f" ❌ {message}", "error") - else: - cx_print(f" ℹ {message}", "info") - - try: - success, parallel_tasks = asyncio.run( - run_parallel_install( - commands=commands, - descriptions=[f"Step {i + 1}" for i in range(len(commands))], - timeout=300, - stop_on_error=True, - log_callback=parallel_log_callback, - ) - ) - - total_duration = 0.0 - if parallel_tasks: - max_end = max( - (t.end_time for t in parallel_tasks if t.end_time is not None), - default=None, - ) - min_start = min( - (t.start_time for t in parallel_tasks if t.start_time is not None), - default=None, - ) - if max_end is not None 
and min_start is not None: - total_duration = max_end - min_start - - if success: - self._print_success(f"{software} installed successfully!") - print(f"\nCompleted in {total_duration:.2f} seconds (parallel mode)") - - if install_id: - history.update_installation(install_id, InstallationStatus.SUCCESS) - print(f"\n📝 Installation recorded (ID: {install_id})") - print(f" To rollback: cortex rollback {install_id}") - - return 0 - - failed_tasks = [ - t for t in parallel_tasks if getattr(t.status, "value", "") == "failed" - ] - error_msg = failed_tasks[0].error if failed_tasks else "Installation failed" - - if install_id: - history.update_installation( - install_id, - InstallationStatus.FAILED, - error_msg, - ) - - self._print_error("Installation failed") - if error_msg: - print(f" Error: {error_msg}", file=sys.stderr) - if install_id: - print(f"\n📝 Installation recorded (ID: {install_id})") - print(f" View details: cortex history {install_id}") - return 1 - - except (ValueError, OSError) as e: - if install_id: - history.update_installation( - install_id, InstallationStatus.FAILED, str(e) - ) - self._print_error(f"Parallel execution failed: {str(e)}") - return 1 - except Exception as e: - if install_id: - history.update_installation( - install_id, InstallationStatus.FAILED, str(e) - ) - self._print_error(f"Unexpected parallel execution error: {str(e)}") - if self.verbose: - import traceback - - traceback.print_exc() - return 1 - - coordinator = InstallationCoordinator( - commands=commands, - descriptions=[f"Step {i + 1}" for i in range(len(commands))], - timeout=300, - stop_on_error=True, - progress_callback=progress_callback, - ) - - result = coordinator.execute() - - if result.success: - self._print_success(f"{software} installed successfully!") - print(f"\nCompleted in {result.total_duration:.2f} seconds") - - # Record successful installation - if install_id: - history.update_installation(install_id, InstallationStatus.SUCCESS) - print(f"\n📝 Installation recorded 
(ID: {install_id})") - print(f" To rollback: cortex rollback {install_id}") - - return 0 - else: - # Record failed installation - if install_id: - error_msg = result.error_message or "Installation failed" - history.update_installation( - install_id, InstallationStatus.FAILED, error_msg - ) - - if result.failed_step is not None: - self._print_error(f"Installation failed at step {result.failed_step + 1}") - else: - self._print_error("Installation failed") - if result.error_message: - print(f" Error: {result.error_message}", file=sys.stderr) - if install_id: - print(f"\n📝 Installation recorded (ID: {install_id})") - print(f" View details: cortex history {install_id}") - return 1 - else: + if not execute: print("\nTo execute these commands, run with --execute flag") print("Example: cortex install docker --execute") + return 0 - return 0 + print("\nExecuting commands...") + if parallel: + return self._handle_parallel_execution(commands, software, install_id, history) + + return self._handle_sequential_execution(commands, software, install_id, history) except ValueError as e: - if install_id: - history.update_installation(install_id, InstallationStatus.FAILED, str(e)) + self._record_history_error(history, install_id, str(e)) self._print_error(str(e)) return 1 except RuntimeError as e: - if install_id: - history.update_installation(install_id, InstallationStatus.FAILED, str(e)) + self._record_history_error(history, install_id, str(e)) self._print_error(f"API call failed: {str(e)}") return 1 except OSError as e: - if install_id: - history.update_installation(install_id, InstallationStatus.FAILED, str(e)) + self._record_history_error(history, install_id, str(e)) self._print_error(f"System error: {str(e)}") return 1 except Exception as e: - if install_id: - history.update_installation(install_id, InstallationStatus.FAILED, str(e)) + self._record_history_error(history, install_id, str(e)) self._print_error(f"Unexpected error: {str(e)}") if self.verbose: import traceback @@ 
-1897,7 +2031,6 @@ def _display_parse_result(self, result: ParseResult, include_dev: bool) -> None: } ecosystem_name = ecosystem_names.get(result.ecosystem, "Unknown") - filename = os.path.basename(result.file_path) cx_print(f"\n📋 Found {result.prod_count} {ecosystem_name} packages", "info") @@ -1958,7 +2091,7 @@ def progress_callback(current: int, total: int, step: InstallationStep) -> None: console.print(f"Completed in {result.total_duration:.2f} seconds") return 0 else: - self._print_error("Installation failed") + self._print_error(self.INSTALL_FAIL_MSG) if result.error_message: console.print(f"Error: {result.error_message}", style="red") return 1 @@ -1994,9 +2127,9 @@ def progress_callback(current: int, total: int, step: InstallationStep) -> None: return 0 else: if result.failed_step is not None: - self._print_error(f"\nInstallation failed at step {result.failed_step + 1}") + self._print_error(f"\n{self.INSTALL_FAIL_MSG} at step {result.failed_step + 1}") else: - self._print_error("\nInstallation failed") + self._print_error(f"\n{self.INSTALL_FAIL_MSG}") if result.error_message: console.print(f"Error: {result.error_message}", style="red") return 1 @@ -2027,10 +2160,12 @@ def show_rich_help(): # Command Rows table.add_row("ask ", "Ask about your system") + table.add_row("voice", "Voice input mode (F9 to speak)") table.add_row("demo", "See Cortex in action") table.add_row("wizard", "Configure API key") table.add_row("status", "System status") table.add_row("install ", "Install software") + table.add_row("install --mic", "Install via voice input") table.add_row("import ", "Import deps from package files") table.add_row("history", "View history") table.add_row("rollback ", "Undo installation") @@ -2122,21 +2257,54 @@ def main(): ) # Demo command - demo_parser = subparsers.add_parser("demo", help="See Cortex in action") + subparsers.add_parser("demo", help="See Cortex in action") # Wizard command - wizard_parser = subparsers.add_parser("wizard", help="Configure API 
key interactively") + subparsers.add_parser("wizard", help="Configure API key interactively") # Status command (includes comprehensive health checks) subparsers.add_parser("status", help="Show comprehensive system status and health checks") # Ask command ask_parser = subparsers.add_parser("ask", help="Ask a question about your system") - ask_parser.add_argument("question", type=str, help="Natural language question") + ask_parser.add_argument("question", nargs="?", type=str, help="Natural language question") + ask_parser.add_argument( + "--mic", + action="store_true", + help="Use voice input (press F9 to record)", + ) + + # Voice command - continuous voice mode + voice_parser = subparsers.add_parser( + "voice", help="Voice input mode (F9 to speak, Ctrl+C to exit)" + ) + voice_parser.add_argument( + "--single", + "-s", + action="store_true", + help="Record single input and exit (default: continuous mode)", + ) + voice_parser.add_argument( + "--model", + "-m", + type=str, + choices=[ + "tiny.en", + "base.en", + "small.en", + "medium.en", + "tiny", + "base", + "small", + "medium", + "large", + ], + help="Whisper model (default: base.en). 
Higher models = better accuracy but more storage.", + ) # Install command install_parser = subparsers.add_parser("install", help="Install software") - install_parser.add_argument("software", type=str, help="Software to install") + install_parser.add_argument("software", nargs="?", type=str, help="Software to install") install_parser.add_argument("--execute", action="store_true", help="Execute commands") install_parser.add_argument("--dry-run", action="store_true", help="Show commands only") install_parser.add_argument( @@ -2144,6 +2312,11 @@ def main(): action="store_true", help="Enable parallel execution for multi-step installs", ) + install_parser.add_argument( + "--mic", + action="store_true", + help="Use voice input for software name (press F9 to record)", + ) # Import command - import dependencies from package manager files import_parser = subparsers.add_parser( @@ -2502,11 +2675,53 @@ def main(): return cli.wizard() elif args.command == "status": return cli.status() + elif args.command == "voice": + model = getattr(args, "model", None) + return cli.voice(continuous=not getattr(args, "single", False), model=model) elif args.command == "ask": + # Handle --mic flag for voice input + if getattr(args, "mic", False): + return cli.voice(continuous=False) + if not args.question: + cli._print_error("Please provide a question or use --mic for voice input") + return 1 return cli.ask(args.question) elif args.command == "install": + # Handle --mic flag for voice input + if getattr(args, "mic", False): + handler = None + try: + from cortex.voice import VoiceInputError, VoiceInputHandler + + handler = VoiceInputHandler() + cx_print("Press F9 to speak what you want to install...", "info") + software = handler.record_single() + if not software: + cx_print("No speech detected.", "warning") + return 1 + cx_print(f"Installing: {software}", "info") + except ImportError: + cli._print_error("Voice dependencies not installed.") + cx_print("Install with: pip install 
cortex-linux[voice]", "info") + return 1 + except VoiceInputError as e: + cli._print_error(f"Voice input error: {e}") + return 1 + finally: + # Always clean up resources + if handler is not None: + try: + handler.stop() + except Exception as e: + # Log cleanup errors but don't raise + logging.debug("Error during voice handler cleanup: %s", e) + else: + software = args.software + if not software: + cli._print_error("Please provide software name or use --mic for voice input") + return 1 return cli.install( - args.software, + software, execute=args.execute, dry_run=args.dry_run, parallel=args.parallel, diff --git a/cortex/voice.py b/cortex/voice.py new file mode 100644 index 00000000..cd1ad4fb --- /dev/null +++ b/cortex/voice.py @@ -0,0 +1,580 @@ +""" +Cortex Linux Voice Input Module + +Provides voice command capability using faster-whisper for speech-to-text. +Supports push-to-talk (F9 by default) for low-latency voice input. +""" + +import logging +import os +import threading +import time +from collections.abc import Callable +from pathlib import Path +from typing import Any + +from cortex.branding import console, cx_print + + +class VoiceInputError(Exception): + """Base exception for voice input errors.""" + + pass + + +class MicrophoneNotFoundError(VoiceInputError): + """Raised when no microphone is available.""" + + pass + + +class ModelNotFoundError(VoiceInputError): + """Raised when the whisper model cannot be loaded.""" + + pass + + +class VoiceInputHandler: + """Handles voice input with push-to-talk and speech-to-text transcription. + + Uses faster-whisper for efficient, accurate transcription with minimal + resource usage. Supports F9 push-to-talk hotkey by default. 
+ + Attributes: + model_name: Whisper model to use (base.en, small.en, medium.en) + sample_rate: Audio sample rate in Hz (default: 16000) + hotkey: Push-to-talk hotkey (default: f9) + """ + + def __init__( + self, + model_name: str | None = None, + sample_rate: int = 16000, + hotkey: str = "f9", + model_dir: str | None = None, + ): + """Initialize the voice input handler. + + Args: + model_name: Whisper model name (base.en, small.en, medium.en). + Defaults to CORTEX_WHISPER_MODEL env var or 'base.en'. + sample_rate: Audio sample rate in Hz. Default 16000. + hotkey: Push-to-talk hotkey. Default 'f9'. + model_dir: Directory to store whisper models. + Defaults to ~/.cortex/models/ + """ + self.model_name = model_name or os.environ.get("CORTEX_WHISPER_MODEL", "base.en") + self.sample_rate = sample_rate + self.hotkey = hotkey.lower() + self.model_dir = model_dir or str(Path.home() / ".cortex" / "models") + + # Recording state + self._is_recording = False + self._audio_buffer: list[Any] = [] # numpy arrays when recording + self._recording_thread: threading.Thread | None = None + self._stop_recording = threading.Event() + self._stream = None + + # Whisper model (lazy loaded) + self._model = None + + # Hotkey listener + self._hotkey_listener = None + self._hotkey_callback: Callable[[str], None] | None = None + + def _ensure_dependencies(self) -> bool: + """Check if voice dependencies are installed. + + Returns: + True if all dependencies are available, False otherwise. 
+ """ + missing = [] + + try: + import sounddevice # noqa: F401 + except ImportError: + missing.append("sounddevice") + + try: + import faster_whisper # noqa: F401 + except ImportError: + missing.append("faster-whisper") + + try: + from pynput import keyboard # noqa: F401 + except ImportError: + missing.append("pynput") + + if missing: + cx_print( + f"Missing voice dependencies: {', '.join(missing)}", + "error", + ) + cx_print( + "Install with: pip install cortex-linux[voice]", + "info", + ) + cx_print( + f"Or: pip install {' '.join(missing)}", + "info", + ) + return False + + return True + + def _load_model(self) -> None: + """Load the whisper model. + + Raises: + ModelNotFoundError: If model cannot be loaded. + """ + from faster_whisper import WhisperModel + + # Model sizes in MB (int8 quantized) with accuracy descriptions + model_info = { + "tiny.en": {"size": 39, "desc": "fastest, good for clear speech"}, + "base.en": {"size": 140, "desc": "balanced speed/accuracy"}, + "small.en": {"size": 466, "desc": "better accuracy"}, + "medium.en": {"size": 1534, "desc": "high accuracy"}, + "tiny": {"size": 39, "desc": "fastest, multilingual"}, + "base": {"size": 290, "desc": "balanced, multilingual"}, + "small": {"size": 968, "desc": "better accuracy, multilingual"}, + "medium": {"size": 3090, "desc": "high accuracy, multilingual"}, + "large": {"size": 6000, "desc": "best accuracy, multilingual"}, + } + + info = model_info.get(self.model_name, {"size": "unknown", "desc": ""}) + size_str = f"{info['size']} MB" if isinstance(info["size"], int) else info["size"] + desc_str = f" - {info['desc']}" if info["desc"] else "" + + cx_print( + f"Loading whisper model '{self.model_name}' ({size_str}{desc_str})...", + "info", + ) + + # Ensure model directory exists + os.makedirs(self.model_dir, exist_ok=True) + + try: + # Show download progress with progress bar + from rich.progress import Progress + + with Progress() as progress: + task = progress.add_task( + f"[cyan]Downloading 
{self.model_name}...", + total=None, + ) + + self._model = WhisperModel( + self.model_name, + device="cpu", + compute_type="int8", + download_root=self.model_dir, + ) + progress.update(task, completed=True) + + cx_print( + f"✓ Model '{self.model_name}' loaded successfully.", + "success", + ) + if info["desc"]: + cx_print( + f" {info['desc'].capitalize()} | Size: {size_str} | Tip: Use --model flag to try different models", + "dim", + ) + except Exception as e: + raise ModelNotFoundError( + f"Failed to load whisper model '{self.model_name}': {e}" + ) from e + + def _check_microphone(self) -> bool: + """Check if a microphone is available. + + Returns: + True if microphone is available, False otherwise. + """ + import sounddevice as sd + + try: + devices = sd.query_devices() + input_devices = [d for d in devices if d["max_input_channels"] > 0] + + if not input_devices: + cx_print("No microphone found. Please connect a microphone.", "error") + return False + + default = sd.query_devices(kind="input") + cx_print(f"Using microphone: {default['name']}", "info") + return True + + except Exception as e: + cx_print(f"Error checking microphone: {e}", "error") + return False + + def _start_recording(self) -> None: + """Start recording audio from microphone.""" + import numpy as np # Import locally for optional dependency + import sounddevice as sd + + self._audio_buffer = [] + self._stop_recording.clear() + self._is_recording = True + self._numpy = np # Store for use in callback + + def audio_callback(indata, frames, time_info, status): + if status: + logging.debug("Audio status: %s", status) + if self._is_recording: + # Limit buffer size to prevent memory issues (max ~60 seconds) + if len(self._audio_buffer) < 60 * self.sample_rate // 1024: + self._audio_buffer.append(indata.copy()) + else: + self._stop_recording.set() + + try: + self._stream = sd.InputStream( + samplerate=self.sample_rate, + channels=1, + dtype=self._numpy.float32, + callback=audio_callback, + blocksize=1024, 
+ ) + self._stream.start() + except PermissionError as e: + self._is_recording = False + raise MicrophoneNotFoundError( + "Permission denied to access microphone. " + "On Linux, add user to 'audio' group: sudo usermod -a -G audio $USER" + ) from e + except Exception as e: + self._is_recording = False + raise MicrophoneNotFoundError(f"Failed to start recording: {e}") from e + + def _stop_recording_stream(self) -> Any: + """Stop recording and return the audio data. + + Returns: + Numpy array of recorded audio samples. + """ + import numpy as np + + self._is_recording = False + + if hasattr(self, "_stream") and self._stream: + try: + self._stream.stop() + self._stream.close() + except Exception as e: + logging.debug("Error closing stream: %s", e) + finally: + self._stream = None + + if not self._audio_buffer: + return np.array([], dtype=np.float32) + + # Concatenate all audio chunks + try: + audio_data = np.concatenate(self._audio_buffer, axis=0) + return audio_data.flatten() + finally: + # Always clear buffer to prevent memory leaks + self._audio_buffer = [] + + def transcribe(self, audio_data: Any) -> str: + """Transcribe audio data to text. + + Args: + audio_data: Numpy array of audio samples (float32, mono). + + Returns: + Transcribed text string. + + Raises: + ModelNotFoundError: If model is not loaded. 
+ """ + import numpy as np + + if self._model is None: + self._load_model() + + if len(audio_data) == 0: + return "" + + # faster-whisper expects float32 audio normalized to [-1, 1] + if audio_data.dtype != np.float32: + audio_data = audio_data.astype(np.float32) + + # Model should be loaded at this point + if self._model is None: + raise ModelNotFoundError("Model must be loaded before transcription") + + segments, _ = self._model.transcribe( + audio_data, + beam_size=5, + language="en", + vad_filter=True, + vad_parameters={ + "min_silence_duration_ms": 300, + "speech_pad_ms": 200, + }, + condition_on_previous_text=False, # Prevents repetition + no_speech_threshold=0.6, + ) + + # Collect all segment texts + text_parts = [] + for segment in segments: + text_parts.append(segment.text.strip()) + + return " ".join(text_parts).strip() + + def record_and_transcribe(self) -> str: + """Record audio until stopped and transcribe it. + + This is a blocking call that records until _stop_recording is set. + + Returns: + Transcribed text from the recording. 
+ """ + self._start_recording() + + # Wait for stop signal + self._stop_recording.wait() + + # Get audio and transcribe + audio_data = self._stop_recording_stream() + + if len(audio_data) < self.sample_rate * 0.5: # Less than 0.5 seconds + return "" + + cx_print("Transcribing...", "thinking") + text = self.transcribe(audio_data) + + return text + + def _recording_indicator(self) -> None: + """Show a recording indicator with animated dots.""" + dots = 0 + indicators = ["●○○", "●●○", "●●●", "○●●", "○○●", "○○○"] + while self._is_recording: + indicator = indicators[dots % len(indicators)] + console.print( + f"Recording {indicator} (Press {self.hotkey.upper()} to stop)", + end="\r", + ) + dots += 1 + time.sleep(0.2) + # Clear the line + console.print(" " * 70, end="\r") + + def _get_hotkey_key(self): + """Get the pynput key object for the configured hotkey.""" + from pynput import keyboard + + # Map hotkey string to pynput key + hotkey_map = { + "f1": keyboard.Key.f1, + "f2": keyboard.Key.f2, + "f3": keyboard.Key.f3, + "f4": keyboard.Key.f4, + "f5": keyboard.Key.f5, + "f6": keyboard.Key.f6, + "f7": keyboard.Key.f7, + "f8": keyboard.Key.f8, + "f9": keyboard.Key.f9, + "f10": keyboard.Key.f10, + "f11": keyboard.Key.f11, + "f12": keyboard.Key.f12, + "pause": keyboard.Key.pause, + "insert": keyboard.Key.insert, + "home": keyboard.Key.home, + "end": keyboard.Key.end, + "pageup": keyboard.Key.page_up, + "pagedown": keyboard.Key.page_down, + } + + return hotkey_map.get(self.hotkey) + + def _setup_hotkey(self, on_transcription: Callable[[str], None]) -> None: + """Set up the push-to-talk hotkey listener. + + Args: + on_transcription: Callback function called with transcribed text. + """ + from pynput import keyboard + + self._hotkey_callback = on_transcription + recording_lock = threading.Lock() + target_key = self._get_hotkey_key() + + if target_key is None: + cx_print(f"Unknown hotkey: {self.hotkey}. 
Using F9.", "warning") + target_key = keyboard.Key.f9 + self.hotkey = "f9" + + def on_press(key): + if key == target_key: + with recording_lock: + if not self._is_recording: + # Start recording - set flag BEFORE starting thread + self._is_recording = True + self._stop_recording.clear() + + # Start indicator thread + indicator_thread = threading.Thread( + target=self._recording_indicator, + daemon=True, + ) + indicator_thread.start() + + # Start recording thread + self._recording_thread = threading.Thread( + target=self._recording_worker, + daemon=True, + ) + self._recording_thread.start() + else: + # Stop recording + self._stop_recording.set() + + listener = keyboard.Listener(on_press=on_press) + self._hotkey_listener = listener + listener.start() + + def _recording_worker(self) -> None: + """Worker thread for recording and transcription.""" + try: + text = self.record_and_transcribe() + + if text and self._hotkey_callback: + console.print(f"\n[bold cyan]Heard:[/bold cyan] {text}\n") + self._hotkey_callback(text) + elif not text: + cx_print("No speech detected. Try speaking louder or closer to the mic.", "warning") + + except Exception as e: + cx_print(f"Recording error: {e}", "error") + finally: + self._is_recording = False + + def start_voice_mode(self, on_transcription: Callable[[str], None]) -> None: + """Start continuous voice input mode. + + Listens for the hotkey and transcribes speech when triggered. + + Args: + on_transcription: Callback called with transcribed text. + """ + if not self._ensure_dependencies(): + return + + if not self._check_microphone(): + return + + # Pre-load the model + try: + self._load_model() + except ModelNotFoundError as e: + cx_print(str(e), "error") + return + + cx_print( + f"Voice mode active. 
Press {self.hotkey.upper()} to speak, Ctrl+C to exit.", "success" + ) + cx_print("Listening...", "info") + + self._setup_hotkey(on_transcription) + + try: + # Keep the main thread alive + while True: + time.sleep(0.1) + except KeyboardInterrupt: + cx_print("\nVoice mode exited.", "info") + finally: + self.stop() + + def record_single(self) -> str: + """Record a single voice input and return the transcribed text. + + This is a blocking call that waits for the user to press the hotkey + to start and stop recording. + + Returns: + Transcribed text from the recording. + """ + if not self._ensure_dependencies(): + return "" + + if not self._check_microphone(): + return "" + + # Pre-load the model + try: + self._load_model() + except ModelNotFoundError as e: + cx_print(str(e), "error") + return "" + + cx_print(f"Press {self.hotkey.upper()} to start recording...", "info") + + result = {"text": ""} + done_event = threading.Event() + + def on_transcription(text: str) -> None: + result["text"] = text + done_event.set() + + self._setup_hotkey(on_transcription) + + try: + # Wait for transcription to complete + done_event.wait() + except KeyboardInterrupt: + cx_print("\nCancelled.", "info") + finally: + self.stop() + + return result["text"] + + def stop(self) -> None: + """Stop the voice input handler and clean up resources.""" + self._is_recording = False + self._stop_recording.set() + + if self._hotkey_listener: + try: + self._hotkey_listener.stop() + except Exception as e: + logging.debug("Error stopping hotkey listener: %s", e) + self._hotkey_listener = None + + if hasattr(self, "_stream") and self._stream: + try: + self._stream.stop() + self._stream.close() + except OSError as e: + logging.debug("Error closing audio stream: %s", e) + self._stream = None + + +def get_voice_handler( + model_name: str | None = None, + sample_rate: int = 16000, + hotkey: str = "f9", +) -> VoiceInputHandler: + """Factory function to create a VoiceInputHandler. 
+
+    Args:
+        model_name: Whisper model name. Defaults to env var or 'base.en'.
+        sample_rate: Audio sample rate. Default 16000.
+        hotkey: Push-to-talk hotkey. Default 'f9'.
+
+    Returns:
+        Configured VoiceInputHandler instance.
+    """
+    return VoiceInputHandler(
+        model_name=model_name,
+        sample_rate=sample_rate,
+        hotkey=hotkey,
+    )
diff --git a/docs/COMMANDS.md b/docs/COMMANDS.md
index 9e6e9a25..166a5f45 100644
--- a/docs/COMMANDS.md
+++ b/docs/COMMANDS.md
@@ -8,6 +8,7 @@ This document provides a comprehensive reference for all commands available in t
|---------|-------------|
| `cortex` | Show help and available commands |
| `cortex install <software>` | Install software |
+| `cortex voice` | Voice input mode (hands-free with F9) |
| `cortex demo` | See Cortex in action |
| `cortex wizard` | Configure API key |
| `cortex status` | Show comprehensive system status and health checks |
diff --git a/docs/VOICE_INPUT.md b/docs/VOICE_INPUT.md
new file mode 100644
index 00000000..a0c90839
--- /dev/null
+++ b/docs/VOICE_INPUT.md
@@ -0,0 +1,261 @@
+# Voice Input for Cortex
+
+Cortex supports voice commands using speech-to-text, allowing you to install software and ask questions using your voice.
+
+## Quick Start
+
+```bash
+# Install voice dependencies
+pip install cortex-linux[voice]
+
+# Start voice mode
+cortex voice
+
+# Or use voice for a single command
+cortex install --mic
+```
+
+## Requirements
+
+- **Python 3.10+**
+- **Microphone** - Any USB or built-in microphone
+- **Voice dependencies** - Installed separately (see below)
+
+## Installation
+
+Voice support is an optional feature. 
Install the voice dependencies with: + +```bash +pip install cortex-linux[voice] +``` + +Or install dependencies individually: + +```bash +pip install faster-whisper sounddevice pynput numpy +``` + +**Note:** On Linux, you may need to install PortAudio for audio support: +```bash +# Ubuntu/Debian +sudo apt install libportaudio2 portaudio19-dev + +# Fedora +sudo dnf install portaudio portaudio-devel +``` + +### First Run + +On first use, Cortex will download the Whisper model (~150MB for `base.en`). This happens automatically and is stored in `~/.cortex/models/`. + +## Usage + +### Voice Mode (Continuous) + +Enter continuous voice mode where you can speak multiple commands: + +```bash +cortex voice +``` + +**Controls:** +- **F9** - Start/stop recording +- **Ctrl+C** - Exit voice mode + +**Example session:** +``` +$ cortex voice +CX ✓ Voice mode active. Press F9 to speak, Ctrl+C to exit. +CX │ Listening... + +[Press F9] +CX │ Recording ●●○ (Press F9 to stop) + +[Speak: "Install nginx"] +[Press F9] + +CX ⠋ Transcribing... +Heard: Install nginx + +CX │ Installing: nginx +CX ⠋ Understanding request... +... 
+``` + +### Single Voice Command + +Use `--mic` flag for a single voice input: + +```bash +# Install via voice +cortex install --mic + +# Ask a question via voice +cortex ask --mic +``` + +### Single Recording Mode + +Record one command and exit: + +```bash +cortex voice --single +``` + +## Configuration + +### Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `CORTEX_WHISPER_MODEL` | `base.en` | Whisper model to use | + +### Available Models + +| Model | Size | Speed | Accuracy | +|-------|------|-------|----------| +| `base.en` | ~150MB | Fast | Good (default, recommended) | +| `small.en` | ~500MB | Medium | Better | +| `medium.en` | ~1.5GB | Slow | Best | + +Set your preferred model for higher accuracy: + +```bash +export CORTEX_WHISPER_MODEL=small.en +``` + +### Config File + +Add to `~/.cortex/config.yaml`: + +```yaml +voice: + model: "base.en" + hotkey: "f9" + sample_rate: 16000 +``` + +## How It Works + +1. **Hotkey Detection** - Uses `pynput` library to listen for F9 (no root required) +2. **Audio Capture** - Records via `sounddevice` at 16kHz mono +3. **Speech-to-Text** - Transcribes using `faster-whisper` (OpenAI Whisper optimized) +4. **Command Processing** - Passes transcribed text to Cortex LLM interpreter +5. 
**Execution** - Normal Cortex workflow (dry-run by default) + +``` +┌──────────────┐ ┌──────────────┐ ┌──────────────┐ +│ F9 │───>│ Record │───>│ Transcribe │ +│ Hotkey │ │ Audio │ │ (Whisper) │ +└──────────────┘ └──────────────┘ └──────────────┘ + │ + ▼ +┌──────────────┐ ┌──────────────┐ ┌──────────────┐ +│ Execute │<───│ Generate │<───│ LLM Parse │ +│ Commands │ │ Commands │ │ Request │ +└──────────────┘ └──────────────┘ └──────────────┘ +``` + +## Troubleshooting + +### "No microphone found" + +**Linux:** +```bash +# Check ALSA devices +arecord -l + +# Install ALSA utilities +sudo apt install alsa-utils pulseaudio +``` + +**macOS:** +- Check System Preferences > Security & Privacy > Microphone +- Grant terminal app microphone access + +### "Voice dependencies not installed" + +```bash +pip install cortex-linux[voice] +``` + +### "Model download failed" + +Check internet connection and try: +```bash +# Manually download model +python -c "from faster_whisper import WhisperModel; WhisperModel('base.en')" +``` + +### Recording quality issues + +- Speak clearly and at normal volume +- Reduce background noise +- Position microphone 6-12 inches from mouth +- Try a different microphone + +### Hotkey not working + +On Linux, you may need to run with elevated permissions or use X11: +```bash +# Check if running in Wayland (hotkeys may not work) +echo $XDG_SESSION_TYPE + +# For Wayland, consider using X11 or alternative input method +``` + +## Privacy + +- **Local Processing** - All speech-to-text happens locally on your machine +- **No Audio Uploads** - Audio is never sent to external servers +- **Model Storage** - Whisper models stored in `~/.cortex/models/` + +## Limitations + +- English language only (using `.en` models) +- Requires ~150MB-1.5GB disk space for models +- CPU-based inference (no GPU acceleration by default) +- Push-to-talk only (no continuous listening for privacy) + +## API Reference + +### VoiceInputHandler + +```python +from cortex.voice import 
VoiceInputHandler + +# Create handler +handler = VoiceInputHandler( + model_name="base.en", # default + sample_rate=16000, + hotkey="f9", +) + +# Single recording +text = handler.record_single() + +# Continuous mode +def on_transcription(text): + print(f"You said: {text}") + +handler.start_voice_mode(on_transcription) +``` + +### Factory Function + +```python +from cortex.voice import get_voice_handler + +handler = get_voice_handler( + model_name="base.en", + sample_rate=16000, + hotkey="f9", +) +``` + +## See Also + +- [Getting Started Guide](guides/Getting-Started.md) +- [CLI Commands Reference](COMMANDS.md) +- [Configuration Guide](CONFIGURATION.md) + diff --git a/pyproject.toml b/pyproject.toml index e59f5b83..a5bb0866 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,8 +69,14 @@ docs = [ "mkdocs-material>=9.0.0", "mkdocstrings[python]>=0.24.0", ] +voice = [ + "faster-whisper>=0.10.0", + "sounddevice>=0.4.6", + "pynput>=1.7.6", + "numpy>=1.24.0", +] all = [ - "cortex-linux[dev,security,docs]", + "cortex-linux[dev,security,docs,voice]", ] [project.scripts] diff --git a/requirements.txt b/requirements.txt index 166a777e..856e8486 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,9 +17,20 @@ cryptography>=42.0.0 # Terminal UI rich>=13.0.0 -# Configuration -pyyaml>=6.0.0 - # Type hints for older Python versions typing-extensions>=4.0.0 -PyYAML==6.0.3 +# ============================================================================= +# OPTIONAL: Voice Input Feature +# ============================================================================= +# Install with: pip install cortex-linux[voice] +# Or in development: pip install -e ".[voice]" +# +# Voice feature includes: +# - faster-whisper>=0.10.0 (speech-to-text) +# - sounddevice>=0.4.6 (audio recording) +# - pynput>=1.7.6 (hotkey support) +# - numpy>=1.24.0 (audio processing) +# +# System dependencies (Ubuntu/Debian): +# sudo apt update && sudo apt install -y libportaudio2 portaudio19-dev 
libasound2-dev +# ============================================================================= \ No newline at end of file diff --git a/setup.py b/setup.py index 3a218042..8b0680d3 100644 --- a/setup.py +++ b/setup.py @@ -45,6 +45,14 @@ ], python_requires=">=3.10", install_requires=requirements, + extras_require={ + "voice": [ + "faster-whisper>=0.10.0", + "sounddevice>=0.4.6", + "pynput>=1.7.6", + "numpy>=1.24.0", + ], + }, entry_points={ "console_scripts": [ "cortex=cortex.cli:main", diff --git a/tests/integration/test_end_to_end.py b/tests/integration/test_end_to_end.py index 00776095..5db268dd 100644 --- a/tests/integration/test_end_to_end.py +++ b/tests/integration/test_end_to_end.py @@ -17,6 +17,7 @@ "PYTHONPATH": "/workspace", "PYTHONDONTWRITEBYTECODE": "1", } +# Basic pip bootstrap without voice dependencies PIP_BOOTSTRAP = "python -m pip install --quiet --upgrade pip setuptools && python -m pip install --quiet --no-cache-dir -r /workspace/requirements.txt" PIP_BOOTSTRAP_DEV = "python -m pip install --quiet --upgrade pip setuptools && python -m pip install --quiet --no-cache-dir -r /workspace/requirements.txt -r /workspace/requirements-dev.txt" diff --git a/tests/test_api_key_detector.py b/tests/test_api_key_detector.py index f67a17e6..e6aafa1d 100644 --- a/tests/test_api_key_detector.py +++ b/tests/test_api_key_detector.py @@ -159,10 +159,11 @@ def test_no_key_found(self, detector): """Test when no key is found.""" with patch.dict(os.environ, {}, clear=True): with patch("pathlib.Path.home", return_value=Path("/nonexistent")): - found, key, provider, _ = detector.detect() - assert found is False - assert key is None - assert provider is None + with patch("pathlib.Path.cwd", return_value=Path("/nonexistent")): + found, key, provider, _ = detector.detect() + assert found is False + assert key is None + assert provider is None def test_extract_key_from_env_file(self, detector): """Test extracting key from .env format file.""" diff --git a/tests/test_cli.py 
b/tests/test_cli.py index bed29ab4..274b79f7 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -43,9 +43,10 @@ def test_get_api_key_not_found(self, mock_stderr): with patch.dict(os.environ, {}, clear=True): with patch("pathlib.Path.home", return_value=self._temp_home): - with patch("builtins.input", return_value=PROVIDER_MENU_CHOICES["ollama"]): - api_key = self.cli._get_api_key() - self.assertEqual(api_key, "ollama-local") + with patch("pathlib.Path.cwd", return_value=self._temp_home): + with patch("builtins.input", return_value=PROVIDER_MENU_CHOICES["ollama"]): + api_key = self.cli._get_api_key() + self.assertEqual(api_key, "ollama-local") def test_get_provider_openai(self): with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-test-openai-key-123"}, clear=True): diff --git a/tests/test_cli_extended.py b/tests/test_cli_extended.py index 173d7a7d..26334307 100644 --- a/tests/test_cli_extended.py +++ b/tests/test_cli_extended.py @@ -46,9 +46,10 @@ def test_get_api_key_not_found(self) -> None: with patch.dict(os.environ, {}, clear=True): with patch("pathlib.Path.home", return_value=self._temp_home): - with patch("builtins.input", return_value=PROVIDER_MENU_CHOICES["ollama"]): - api_key = self.cli._get_api_key() - self.assertEqual(api_key, "ollama-local") + with patch("pathlib.Path.cwd", return_value=self._temp_home): + with patch("builtins.input", return_value=PROVIDER_MENU_CHOICES["ollama"]): + api_key = self.cli._get_api_key() + self.assertEqual(api_key, "ollama-local") def test_get_provider_openai(self) -> None: with patch.dict(os.environ, {"OPENAI_API_KEY": "test-key"}, clear=True): diff --git a/tests/test_ollama_integration.py b/tests/test_ollama_integration.py index f5b0a1ef..68a26636 100755 --- a/tests/test_ollama_integration.py +++ b/tests/test_ollama_integration.py @@ -13,6 +13,7 @@ """ import os +import shutil import subprocess import sys from pathlib import Path @@ -91,8 +92,7 @@ def is_ollama_running() -> bool: def check_ollama_installed(): """Check 
if Ollama is installed.""" print("1. Checking Ollama installation...") - result = subprocess.run(["which", "ollama"], capture_output=True) - if result.returncode == 0: + if shutil.which("ollama") is not None: print(" ✓ Ollama is installed") return True else: diff --git a/tests/test_voice.py b/tests/test_voice.py new file mode 100644 index 00000000..9f4a49df --- /dev/null +++ b/tests/test_voice.py @@ -0,0 +1,318 @@ +"""Tests for the voice input module.""" + +from unittest.mock import MagicMock, patch + +import pytest + +# Skip all tests if voice dependencies are not installed +np = pytest.importorskip("numpy", reason="numpy not installed (voice dependencies required)") + +from cortex.voice import ( # noqa: E402 + MicrophoneNotFoundError, + ModelNotFoundError, + VoiceInputError, + VoiceInputHandler, +) + + +class TestVoiceInputHandler: + """Test suite for VoiceInputHandler class.""" + + @pytest.fixture + def mock_dependencies(self): + """Mock all voice dependencies.""" + with patch.dict( + "sys.modules", + { + "sounddevice": MagicMock(), + "faster_whisper": MagicMock(), + "pynput": MagicMock(), + "pynput.keyboard": MagicMock(), + }, + ): + yield + + @pytest.fixture + def handler(self, mock_dependencies): + """Create a VoiceInputHandler instance with mocked dependencies.""" + return VoiceInputHandler( + model_name="base.en", + sample_rate=16000, + hotkey="f9", + ) + + def test_init_defaults(self, mock_dependencies): + """Test VoiceInputHandler initialization with defaults.""" + handler = VoiceInputHandler() + assert handler.model_name == "base.en" + assert handler.sample_rate == 16000 + assert handler.hotkey == "f9" + assert handler._model is None + assert handler._is_recording is False + + def test_init_custom_params(self, mock_dependencies): + """Test VoiceInputHandler initialization with custom parameters.""" + handler = VoiceInputHandler( + model_name="base.en", + sample_rate=44100, + hotkey="ctrl+m", + model_dir="/custom/path", + ) + assert handler.model_name == 
"base.en" + assert handler.sample_rate == 44100 + assert handler.hotkey == "ctrl+m" + assert handler.model_dir == "/custom/path" + + def test_init_with_env_var(self, mock_dependencies, monkeypatch): + """Test model name from environment variable.""" + monkeypatch.setenv("CORTEX_WHISPER_MODEL", "small.en") + handler = VoiceInputHandler() + assert handler.model_name == "small.en" + + def test_init_hotkey_from_env_var(self, mock_dependencies, monkeypatch): + """Test hotkey configuration from environment variable.""" + # Test default hotkey + handler = VoiceInputHandler() + assert handler.hotkey == "f9" + + # Test custom hotkey from constructor + handler = VoiceInputHandler(hotkey="f10") + assert handler.hotkey == "f10" + + # Test lowercase normalization + handler = VoiceInputHandler(hotkey="F11") + assert handler.hotkey == "f11" + + def test_ensure_dependencies_all_present(self, handler): + """Test _ensure_dependencies when all deps are installed.""" + with patch.dict( + "sys.modules", + { + "sounddevice": MagicMock(), + "faster_whisper": MagicMock(), + "pynput": MagicMock(), + "pynput.keyboard": MagicMock(), + }, + ): + result = handler._ensure_dependencies() + assert result is True + + def test_ensure_dependencies_missing(self, handler): + """Test _ensure_dependencies when deps are missing.""" + # Test that ensure_dependencies returns False when import fails + with patch.object(handler, "_ensure_dependencies") as mock_deps: + mock_deps.return_value = False + result = handler._ensure_dependencies() + assert result is False + + def test_check_microphone_available(self): + """Test microphone check when device is available.""" + mock_sd = MagicMock() + mock_devices = [{"max_input_channels": 2, "name": "Test Mic"}] + mock_sd.query_devices.return_value = mock_devices + mock_sd.query_devices.side_effect = lambda kind=None: ( + {"name": "Test Mic", "max_input_channels": 2} if kind == "input" else mock_devices + ) + + with patch.dict("sys.modules", {"sounddevice": mock_sd}): 
+ with patch("cortex.voice.cx_print"): + # Import fresh to get mocked module + import importlib + + import cortex.voice + + importlib.reload(cortex.voice) + handler = cortex.voice.VoiceInputHandler() + result = handler._check_microphone() + assert result is True + + def test_check_microphone_not_available(self): + """Test microphone check when no device available.""" + mock_sd = MagicMock() + mock_sd.query_devices.return_value = [] + + with patch.dict("sys.modules", {"sounddevice": mock_sd}): + with patch("cortex.voice.cx_print"): + import importlib + + import cortex.voice + + importlib.reload(cortex.voice) + handler = cortex.voice.VoiceInputHandler() + result = handler._check_microphone() + assert result is False + + def test_transcribe_empty_audio(self, handler): + """Test transcription with empty audio data.""" + handler._model = MagicMock() + result = handler.transcribe(np.array([], dtype=np.float32)) + assert result == "" + + def test_transcribe_with_audio(self, handler): + """Test transcription with valid audio data.""" + # Mock the model + mock_segment = MagicMock() + mock_segment.text = " Hello world " + mock_info = MagicMock() + + handler._model = MagicMock() + handler._model.transcribe.return_value = ([mock_segment], mock_info) + + audio_data = np.random.randn(16000).astype(np.float32) # 1 second of audio + result = handler.transcribe(audio_data) + assert result == "Hello world" + + def test_transcribe_loads_model_if_needed(self, handler): + """Test that transcribe loads model if not loaded.""" + # Ensure model is None initially to test lazy loading + handler._model = None + + mock_segment = MagicMock() + mock_segment.text = "test" + mock_model = MagicMock() + mock_model.transcribe.return_value = ([mock_segment], MagicMock()) + + # Mock _load_model to set up the mock model + def setup_model(): + handler._model = mock_model + + with patch.object(handler, "_load_model", side_effect=setup_model) as mock_load: + audio_data = 
np.random.randn(16000).astype(np.float32) + result = handler.transcribe(audio_data) + # _load_model should be called since _model was None + mock_load.assert_called_once() + assert result == "test" + + def test_stop_cleans_up_resources(self, handler): + """Test that stop() properly cleans up resources.""" + handler._is_recording = True + mock_listener = MagicMock() + mock_stream = MagicMock() + handler._hotkey_listener = mock_listener + handler._stream = mock_stream + + handler.stop() + + assert handler._is_recording is False + mock_listener.stop.assert_called_once() + assert handler._hotkey_listener is None + mock_stream.stop.assert_called_once() + mock_stream.close.assert_called_once() + + def test_stop_handles_missing_stream(self, handler): + """Test that stop() handles case when stream doesn't exist.""" + handler._is_recording = True + handler._hotkey_listener = None + # No _stream attribute + + # Should not raise + handler.stop() + assert handler._is_recording is False + + def test_stop_handles_stream_error(self, handler): + """Test that stop() handles stream close errors gracefully.""" + handler._is_recording = True + handler._hotkey_listener = None + handler._stream = MagicMock() + handler._stream.close.side_effect = OSError("Stream error") + + # Should not raise, just log + handler.stop() + assert handler._stream is None + + +class TestVoiceInputExceptions: + """Test voice input exception classes.""" + + def test_voice_input_error(self): + """Test VoiceInputError exception.""" + with pytest.raises(VoiceInputError): + raise VoiceInputError("Test error") + + def test_microphone_not_found_error(self): + """Test MicrophoneNotFoundError exception.""" + error = MicrophoneNotFoundError("No mic") + assert isinstance(error, VoiceInputError) + + def test_model_not_found_error(self): + """Test ModelNotFoundError exception.""" + error = ModelNotFoundError("Model missing") + assert isinstance(error, VoiceInputError) + + +class TestGetVoiceHandler: + """Test the factory 
function.""" + + def test_get_voice_handler_defaults(self): + """Test get_voice_handler with default parameters.""" + with patch.dict( + "sys.modules", + { + "sounddevice": MagicMock(), + "faster_whisper": MagicMock(), + "pynput": MagicMock(), + "pynput.keyboard": MagicMock(), + }, + ): + from cortex.voice import get_voice_handler + + handler = get_voice_handler() + assert handler.model_name == "base.en" + assert handler.sample_rate == 16000 + assert handler.hotkey == "f9" + + def test_get_voice_handler_custom(self): + """Test get_voice_handler with custom parameters.""" + with patch.dict( + "sys.modules", + { + "sounddevice": MagicMock(), + "faster_whisper": MagicMock(), + "pynput": MagicMock(), + "pynput.keyboard": MagicMock(), + }, + ): + from cortex.voice import get_voice_handler + + handler = get_voice_handler( + model_name="base.en", + sample_rate=44100, + hotkey="ctrl+m", + ) + assert handler.model_name == "base.en" + assert handler.sample_rate == 44100 + assert handler.hotkey == "ctrl+m" + + +class TestRecordingState: + """Test recording state management.""" + + @pytest.fixture + def handler(self): + """Create handler with mocked dependencies.""" + with patch.dict( + "sys.modules", + { + "sounddevice": MagicMock(), + "faster_whisper": MagicMock(), + "pynput": MagicMock(), + "pynput.keyboard": MagicMock(), + }, + ): + from cortex.voice import VoiceInputHandler + + return VoiceInputHandler() + + def test_initial_state(self, handler): + """Test initial recording state.""" + assert handler._is_recording is False + assert handler._audio_buffer == [] + assert handler._recording_thread is None + + def test_stop_recording_event(self, handler): + """Test stop recording event is properly set.""" + assert not handler._stop_recording.is_set() + handler._stop_recording.set() + assert handler._stop_recording.is_set() + handler._stop_recording.clear() + assert not handler._stop_recording.is_set()