From 55c760bacc9393f968aa2fa5896d91bf6d594a12 Mon Sep 17 00:00:00 2001 From: jeremylongshore Date: Sat, 10 Jan 2026 10:29:21 -0600 Subject: [PATCH 1/7] feat: add systemd service helper with plain-English explanations Implements the systemd service helper feature requested in #448: - Plain-English status explanations for any service - Failure diagnosis with log analysis and actionable suggestions - Visual dependency tree display - Interactive unit file generator wizard CLI commands added: - cortex systemd status - cortex systemd diagnose - cortex systemd deps - cortex systemd generate Includes 37 unit tests with 95% code coverage. Closes #448 --- cortex/cli.py | 96 ++++++ cortex/systemd_helper.py | 577 +++++++++++++++++++++++++++++++++++ demo_systemd.cast | 122 ++++++++ demo_systemd.sh | 36 +++ tests/test_systemd_helper.py | 502 ++++++++++++++++++++++++++++++ 5 files changed, 1333 insertions(+) create mode 100644 cortex/systemd_helper.py create mode 100644 demo_systemd.cast create mode 100644 demo_systemd.sh create mode 100644 tests/test_systemd_helper.py diff --git a/cortex/cli.py b/cortex/cli.py index 9261a816..55738c80 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2001,6 +2001,66 @@ def progress_callback(current: int, total: int, step: InstallationStep) -> None: console.print(f"Error: {result.error_message}", style="red") return 1 + def systemd(self, args: argparse.Namespace) -> int: + """Handle systemd service helper commands. + + Args: + args: Parsed command-line arguments. + + Returns: + int: Exit code (0 for success, 1 for error). + """ + from cortex.systemd_helper import ( + run_deps_command, + run_diagnose_command, + run_generate_command, + run_status_command, + ) + + action = getattr(args, "systemd_action", None) + + if action == "status": + try: + run_status_command(args.service) + return 0 + except RuntimeError as e: + console.print(f"[red]Error: {e}[/red]") + return 1 + + elif action == "diagnose": + try: + run_diagnose_command(args.service) + return 0 + except RuntimeError as e: + console.print(f"[red]Error: {e}[/red]") + return 1 + + elif action == "deps": + try: + run_deps_command(args.service) + return 0 + except RuntimeError as e: + console.print(f"[red]Error: {e}[/red]") + return 1 + + elif action == "generate": + try: + run_generate_command() + return 0 + except RuntimeError as e: + console.print(f"[red]Error: {e}[/red]") + return 1 + + else: + console.print("[bold]Systemd Service Helper[/bold]") + console.print() + console.print("Commands:") + console.print(" status - Explain service status in plain English") + console.print(" diagnose - Diagnose why a service failed") + console.print(" deps - Show service dependencies") + console.print(" generate - Interactive unit file generator") + return 0 + # -------------------------- @@ -2040,6 +2100,7 @@ def show_rich_help(): table.add_row("stack ", "Install the stack") table.add_row("docker permissions", "Fix Docker bind-mount permissions") table.add_row("sandbox ", "Test packages in Docker sandbox") + table.add_row("systemd ", "Systemd service helper") table.add_row("doctor", "System health check") console.print(table) @@ -2270,6 +2331,39 @@ def main(): sandbox_exec_parser.add_argument("command", nargs="+", help="Command to execute") # -------------------------- + # --- Systemd Service Helper Commands --- + systemd_parser = subparsers.add_parser( + "systemd", help="Systemd service helper with plain-English explanations" + ) + systemd_subs = systemd_parser.add_subparsers(dest="systemd_action", help="Systemd actions") + + # systemd status + systemd_status_parser = systemd_subs.add_parser( + "status", help="Explain a service's status in plain English" + ) + systemd_status_parser.add_argument("service", help="Service name to check") + + # systemd diagnose [--lines N] + systemd_diagnose_parser = systemd_subs.add_parser( + "diagnose", help="Diagnose why a service failed" + ) + systemd_diagnose_parser.add_argument("service", help="Service name to diagnose") + systemd_diagnose_parser.add_argument( + "--lines", "-n", type=int, default=50, help="Number of log lines to analyze" + ) + + # systemd deps + systemd_deps_parser = systemd_subs.add_parser( + "deps", help="Show service dependencies as a visual tree" + ) + systemd_deps_parser.add_argument("service", help="Service name") + + # systemd generate + systemd_subs.add_parser( + "generate", help="Interactive wizard to generate a systemd unit file" + ) + # -------------------------- + # --- Environment Variable Management Commands --- env_parser = subparsers.add_parser("env", help="Manage environment variables") env_subs = env_parser.add_subparsers(dest="env_action", help="Environment actions") @@ -2531,6 +2625,8 @@ def main(): return 1 elif args.command == "env": return cli.env(args) + elif args.command == "systemd": + return cli.systemd(args) else: parser.print_help() return 1 diff --git a/cortex/systemd_helper.py b/cortex/systemd_helper.py new file mode 100644 index 00000000..c3950298 --- /dev/null +++ b/cortex/systemd_helper.py @@ -0,0 +1,577 @@ +""" +Systemd Service Helper for Cortex Linux. +Provides plain-English explanations, unit file generation, +failure diagnostics, and dependency visualization. +""" + +import re +import subprocess +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Optional + +from rich import box +from rich.panel import Panel +from rich.prompt import Confirm, Prompt +from rich.table import Table +from rich.tree import Tree + +from cortex.branding import console + + +class ServiceState(Enum): + """Possible states of a systemd service.""" + + RUNNING = "running" + STOPPED = "stopped" + FAILED = "failed" + INACTIVE = "inactive" + ACTIVATING = "activating" + DEACTIVATING = "deactivating" + UNKNOWN = "unknown" + + +@dataclass +class ServiceStatus: + """Parsed status information for a systemd service. + + Attributes: + name: The service name (without .service suffix). + state: Current state of the service. + description: Human-readable description of the service. + load_state: Whether the unit file is loaded. + active_state: Active state (active, inactive, failed). + sub_state: Sub-state providing more detail. + pid: Process ID if running. + memory: Memory usage if available. + cpu: CPU time if available. + started_at: When the service started. + exit_code: Exit code if service failed. + main_pid_code: The exit status of the main process. + """ + + name: str + state: ServiceState + description: str = "" + load_state: str = "" + active_state: str = "" + sub_state: str = "" + pid: Optional[int] = None + memory: str = "" + cpu: str = "" + started_at: str = "" + exit_code: Optional[int] = None + main_pid_code: str = "" + + +class SystemdHelper: + """ + Helper for managing and understanding systemd services. + + Provides plain-English explanations of service status, + generates unit files from simple descriptions, diagnoses + failures, and visualizes service dependencies. + """ + + def __init__(self) -> None: + """Initialize the SystemdHelper.""" + self._check_systemd_available() + + def _check_systemd_available(self) -> None: + """Check if systemd is available on the system. + + Raises: + RuntimeError: If systemd is not available. + """ + try: + result = subprocess.run( + ["systemctl", "--version"], + capture_output=True, + text=True, + timeout=5, + ) + if result.returncode != 0: + raise RuntimeError("systemd is not available on this system") + except FileNotFoundError: + raise RuntimeError("systemctl command not found - is systemd installed?") + except subprocess.TimeoutExpired: + raise RuntimeError("Timeout checking systemd availability") + + def get_service_status(self, service_name: str) -> ServiceStatus: + """Get the status of a systemd service. + + Args: + service_name: Name of the service (with or without .service suffix). + + Returns: + ServiceStatus object with parsed information. + + Raises: + ValueError: If the service name is empty. + """ + if not service_name: + raise ValueError("Service name cannot be empty") + + # Normalize service name + if not service_name.endswith(".service"): + service_name = f"{service_name}.service" + + result = subprocess.run( + ["systemctl", "show", service_name, "--no-pager"], + capture_output=True, + text=True, + timeout=10, + ) + + properties = {} + for line in result.stdout.strip().split("\n"): + if "=" in line: + key, _, value = line.partition("=") + properties[key] = value + + # Determine state + active_state = properties.get("ActiveState", "unknown") + sub_state = properties.get("SubState", "") + + if active_state == "active" and sub_state == "running": + state = ServiceState.RUNNING + elif active_state == "failed": + state = ServiceState.FAILED + elif active_state == "inactive": + state = ServiceState.INACTIVE + elif active_state == "activating": + state = ServiceState.ACTIVATING + elif active_state == "deactivating": + state = ServiceState.DEACTIVATING + else: + state = ServiceState.UNKNOWN + + # Parse PID + pid = None + main_pid = properties.get("MainPID", "0") + if main_pid.isdigit() and int(main_pid) > 0: + pid = int(main_pid) + + # Parse exit code + exit_code = None + exec_main_status = properties.get("ExecMainStatus", "0") + if exec_main_status.isdigit(): + exit_code = int(exec_main_status) + + return ServiceStatus( + name=service_name.replace(".service", ""), + state=state, + description=properties.get("Description", ""), + load_state=properties.get("LoadState", ""), + active_state=active_state, + sub_state=sub_state, + pid=pid, + memory=properties.get("MemoryCurrent", ""), + cpu=properties.get("CPUUsageNSec", ""), + started_at=properties.get("ActiveEnterTimestamp", ""), + exit_code=exit_code, + main_pid_code=properties.get("ExecMainCode", ""), + ) + + def explain_status(self, service_name: str) -> str: + """Explain a service's status in plain English. + + Args: + service_name: Name of the service to explain. + + Returns: + Human-readable explanation of the service status. + """ + status = self.get_service_status(service_name) + + explanations = [] + + # Main status + if status.state == ServiceState.RUNNING: + explanations.append(f"[green]'{status.name}' is running normally.[/green]") + if status.pid: + explanations.append(f" Process ID: {status.pid}") + if status.started_at: + explanations.append(f" Started: {status.started_at}") + elif status.state == ServiceState.FAILED: + explanations.append(f"[red]'{status.name}' has failed![/red]") + if status.exit_code and status.exit_code != 0: + explanations.append(f" Exit code: {status.exit_code}") + explanations.append(self._explain_exit_code(status.exit_code)) + elif status.state == ServiceState.INACTIVE: + explanations.append(f"[yellow]'{status.name}' is not running (inactive).[/yellow]") + explanations.append(" The service is stopped but can be started.") + elif status.state == ServiceState.ACTIVATING: + explanations.append(f"[cyan]'{status.name}' is starting up...[/cyan]") + elif status.state == ServiceState.DEACTIVATING: + explanations.append(f"[cyan]'{status.name}' is shutting down...[/cyan]") + else: + explanations.append(f"[yellow]'{status.name}' is in an unknown state.[/yellow]") + + if status.description: + explanations.append(f" Description: {status.description}") + + return "\n".join(explanations) + + def _explain_exit_code(self, exit_code: int) -> str: + """Provide explanation for common exit codes. + + Args: + exit_code: The exit code to explain. + + Returns: + Human-readable explanation of the exit code. + """ + common_codes = { + 1: " Likely cause: General error or misconfiguration", + 2: " Likely cause: Misuse of command or invalid arguments", + 126: " Likely cause: Command found but not executable (permission issue)", + 127: " Likely cause: Command not found (check if binary exists)", + 128: " Likely cause: Invalid exit argument", + 130: " Likely cause: Script terminated by Ctrl+C", + 137: " Likely cause: Process killed (SIGKILL) - possibly out of memory", + 139: " Likely cause: Segmentation fault (SIGSEGV) - program crash", + 143: " Likely cause: Process terminated (SIGTERM) - graceful shutdown requested", + 255: " Likely cause: Exit status out of range or SSH error", + } + return common_codes.get( + exit_code, f" Exit code {exit_code} - check service logs for details" + ) + + def diagnose_failure(self, service_name: str, lines: int = 50) -> str: + """Diagnose why a service failed using journal logs. + + Args: + service_name: Name of the service to diagnose. + lines: Number of log lines to analyze. + + Returns: + Diagnostic report with actionable advice. + """ + status = self.get_service_status(service_name) + + report = [] + report.append(f"[bold]Diagnostic Report for '{service_name}'[/bold]\n") + + # Get status explanation + report.append(self.explain_status(service_name)) + report.append("") + + # Get recent logs + result = subprocess.run( + ["journalctl", "-u", service_name, "-n", str(lines), "--no-pager"], + capture_output=True, + text=True, + timeout=15, + ) + + logs = result.stdout + + # Analyze logs for common issues + report.append("[bold]Log Analysis:[/bold]") + + issues_found = [] + + # Check for common error patterns + error_patterns = [ + ( + r"permission denied", + "Permission issue detected", + "Try: Check file permissions and user/group settings in the unit file", + ), + ( + r"address already in use", + "Port conflict detected", + "Try: Check what's using the port with 'ss -tlnp | grep '", + ), + ( + r"no such file or directory", + "Missing file or directory", + "Try: Verify all paths in the unit file exist", + ), + ( + r"connection refused", + "Network connection issue", + "Try: Check if the target service is running and accessible", + ), + ( + r"out of memory|cannot allocate|oom", + "Memory issue", + "Try: Increase available memory or add memory limits to prevent OOM", + ), + ( + r"timeout|timed out", + "Timeout occurred", + "Try: Increase TimeoutStartSec/TimeoutStopSec in the unit file", + ), + ( + r"dependency|requires|wants", + "Dependency issue", + "Try: Check if required services are running with 'systemctl status '", + ), + ] + + for pattern, issue, advice in error_patterns: + if re.search(pattern, logs, re.IGNORECASE): + issues_found.append((issue, advice)) + + if issues_found: + for issue, advice in issues_found: + report.append(f" [red]! {issue}[/red]") + report.append(f" {advice}") + else: + report.append(" No common error patterns detected in logs.") + report.append(" Review the full logs below for details.") + + report.append("") + report.append("[bold]Recent Logs:[/bold]") + # Show last 20 lines of logs + log_lines = logs.strip().split("\n")[-20:] + for line in log_lines: + if "error" in line.lower() or "fail" in line.lower(): + report.append(f" [red]{line}[/red]") + elif "warn" in line.lower(): + report.append(f" [yellow]{line}[/yellow]") + else: + report.append(f" {line}") + + return "\n".join(report) + + def show_dependencies(self, service_name: str) -> Tree: + """Show service dependencies as a visual tree. + + Args: + service_name: Name of the service. + + Returns: + Rich Tree object showing dependencies. + """ + if not service_name.endswith(".service"): + service_name = f"{service_name}.service" + + result = subprocess.run( + ["systemctl", "list-dependencies", service_name, "--no-pager"], + capture_output=True, + text=True, + timeout=10, + ) + + tree = Tree(f"[bold cyan]{service_name}[/bold cyan]") + + lines = result.stdout.strip().split("\n")[1:] # Skip header + + # Parse the tree structure + current_nodes = {0: tree} + + for line in lines: + if not line.strip(): + continue + + # Count indentation (each level is 2 spaces or tree chars) + indent = 0 + clean_line = line + for char in line: + if char in " │├└─": + indent += 1 + else: + break + + # Get the service name + clean_line = re.sub(r"[│├└─\s]+", "", line).strip() + if not clean_line: + continue + + level = indent // 2 + + # Color based on state + if ".target" in clean_line: + styled = f"[blue]{clean_line}[/blue]" + elif ".socket" in clean_line: + styled = f"[magenta]{clean_line}[/magenta]" + elif ".service" in clean_line: + styled = f"[green]{clean_line}[/green]" + else: + styled = clean_line + + # Add to tree + parent_level = max(0, level - 1) + if parent_level in current_nodes: + new_node = current_nodes[parent_level].add(styled) + current_nodes[level] = new_node + + return tree + + def generate_unit_file( + self, + description: str, + exec_start: str, + service_name: Optional[str] = None, + user: Optional[str] = None, + working_dir: Optional[str] = None, + restart: bool = True, + after: Optional[list[str]] = None, + environment: Optional[dict[str, str]] = None, + ) -> str: + """Generate a systemd unit file from simple parameters. + + Args: + description: Human-readable description of the service. + exec_start: Command to run when starting the service. + service_name: Name for the service (optional). + user: User to run the service as (optional). + working_dir: Working directory for the service (optional). + restart: Whether to restart on failure (default True). + after: List of units to start after (optional). + environment: Environment variables to set (optional). + + Returns: + Complete systemd unit file content as a string. + """ + lines = [] + + # [Unit] section + lines.append("[Unit]") + lines.append(f"Description={description}") + + if after: + lines.append(f"After={' '.join(after)}") + else: + lines.append("After=network.target") + + lines.append("") + + # [Service] section + lines.append("[Service]") + lines.append("Type=simple") + lines.append(f"ExecStart={exec_start}") + + if user: + lines.append(f"User={user}") + + if working_dir: + lines.append(f"WorkingDirectory={working_dir}") + + if environment: + for key, value in environment.items(): + lines.append(f"Environment={key}={value}") + + if restart: + lines.append("Restart=on-failure") + lines.append("RestartSec=5") + + lines.append("") + + # [Install] section + lines.append("[Install]") + lines.append("WantedBy=multi-user.target") + lines.append("") + + return "\n".join(lines) + + def interactive_unit_generator(self) -> str: + """Interactive CLI wizard for generating a unit file. + + Asks the user simple questions and generates a complete + systemd unit file based on their answers. + + Returns: + Generated unit file content as a string. + """ + console.print("\n[bold cyan]Systemd Unit File Generator[/bold cyan]") + console.print("Answer a few questions to create your service file.\n") + + # Service name + service_name = Prompt.ask("[cyan]Service name[/cyan]", default="my-service") + + # Description + description = Prompt.ask( + "[cyan]What does this service do?[/cyan]", default="My custom service" + ) + + # Command + exec_start = Prompt.ask("[cyan]Command to run[/cyan]", default="/usr/local/bin/my-app") + + # User + run_as_root = Confirm.ask("[cyan]Run as root?[/cyan]", default=False) + user = None + if not run_as_root: + user = Prompt.ask("[cyan]Run as which user?[/cyan]", default="nobody") + + # Working directory + has_workdir = Confirm.ask("[cyan]Set a working directory?[/cyan]", default=False) + working_dir = None + if has_workdir: + working_dir = Prompt.ask( + "[cyan]Working directory path[/cyan]", default="/var/lib/my-service" + ) + + # Restart on failure + restart = Confirm.ask("[cyan]Restart automatically on failure?[/cyan]", default=True) + + # Start on boot + start_on_boot = Confirm.ask("[cyan]Start automatically on boot?[/cyan]", default=True) + + # Generate the unit file + unit_content = self.generate_unit_file( + description=description, + exec_start=exec_start, + service_name=service_name, + user=user, + working_dir=working_dir, + restart=restart, + ) + + console.print("\n[bold green]Generated Unit File:[/bold green]\n") + console.print(Panel(unit_content, title=f"{service_name}.service", border_style="green")) + + # Installation instructions + console.print("\n[bold]Installation Instructions:[/bold]") + console.print(f"1. Save to: /etc/systemd/system/{service_name}.service") + console.print("2. Reload systemd: sudo systemctl daemon-reload") + if start_on_boot: + console.print(f"3. Enable service: sudo systemctl enable {service_name}") + console.print(f"4. Start service: sudo systemctl start {service_name}") + console.print(f"5. Check status: sudo systemctl status {service_name}") + + return unit_content + + +def run_status_command(service_name: str) -> None: + """Run the status explanation command. + + Args: + service_name: Name of the service to check. + """ + helper = SystemdHelper() + explanation = helper.explain_status(service_name) + console.print(Panel(explanation, title="Service Status", border_style="cyan")) + + +def run_diagnose_command(service_name: str) -> None: + """Run the diagnostic command for a failed service. + + Args: + service_name: Name of the service to diagnose. + """ + helper = SystemdHelper() + report = helper.diagnose_failure(service_name) + console.print(report) + + +def run_deps_command(service_name: str) -> None: + """Show dependencies for a service. + + Args: + service_name: Name of the service. + """ + helper = SystemdHelper() + tree = helper.show_dependencies(service_name) + console.print("\n[bold]Service Dependencies:[/bold]") + console.print(tree) + + +def run_generate_command() -> None: + """Run the interactive unit file generator.""" + helper = SystemdHelper() + helper.interactive_unit_generator() diff --git a/demo_systemd.cast b/demo_systemd.cast new file mode 100644 index 00000000..8589b86b --- /dev/null +++ b/demo_systemd.cast @@ -0,0 +1,122 @@ +{"version": 2, "width": 100, "height": 30, "timestamp": 1768059791, "title": "Cortex Systemd Helper Demo", "env": {"SHELL": "/bin/bash", "TERM": "xterm-256color"}} +[0.5, "o", "\u001b[1;36m=== Cortex Systemd Helper Demo ===\u001b[0m\r\n\r\n"] +[0.8, "o", "\u001b[1;33m1. Plain-English service status explanation:\u001b[0m\r\n"] +[1.1, "o", "\r\n\u001b[32m$\u001b[0m "] +[1.1300000000000001, "o", "c"] +[1.1600000000000001, "o", "o"] +[1.1900000000000002, "o", "r"] +[1.2200000000000002, "o", "t"] +[1.2500000000000002, "o", "e"] +[1.2800000000000002, "o", "x"] +[1.3100000000000003, "o", " "] +[1.3400000000000003, "o", "s"] +[1.3700000000000003, "o", "y"] +[1.4000000000000004, "o", "s"] +[1.4300000000000004, "o", "t"] +[1.4600000000000004, "o", "e"] +[1.4900000000000004, "o", "m"] +[1.5200000000000005, "o", "d"] +[1.5500000000000005, "o", " "] +[1.5800000000000005, "o", "s"] +[1.6100000000000005, "o", "t"] +[1.6400000000000006, "o", "a"] +[1.6700000000000006, "o", "t"] +[1.7000000000000006, "o", "u"] +[1.7300000000000006, "o", "s"] +[1.7600000000000007, "o", " "] +[1.7900000000000007, "o", "s"] +[1.8200000000000007, "o", "s"] +[1.8500000000000008, "o", "h"] +[1.9500000000000008, "o", "\r\n"] +[1.9700000000000009, "o", "\u256d\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Service Status \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u256e\r\n"] +[1.9900000000000009, "o", "\u2502 'ssh' is running normally. \u2502\r\n"] +[2.0100000000000007, "o", "\u2502 Process ID: 2470 \u2502\r\n"] +[2.0300000000000007, "o", "\u2502 Started: Sat 2025-10-25 14:02:02 CDT \u2502\r\n"] +[2.0500000000000007, "o", "\u2502 Description: OpenBSD Secure Shell server \u2502\r\n"] +[2.0700000000000007, "o", "\u2570\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u256f\r\n"] +[2.0900000000000007, "o", "\r\n"] +[3.3900000000000006, "o", "\r\n\u001b[1;33m2. Visual dependency tree:\u001b[0m\r\n"] +[3.6900000000000004, "o", "\r\n\u001b[32m$\u001b[0m "] +[3.72, "o", "c"] +[3.75, "o", "o"] +[3.78, "o", "r"] +[3.8099999999999996, "o", "t"] +[3.8399999999999994, "o", "e"] +[3.869999999999999, "o", "x"] +[3.899999999999999, "o", " "] +[3.929999999999999, "o", "s"] +[3.9599999999999986, "o", "y"] +[3.9899999999999984, "o", "s"] +[4.019999999999999, "o", "t"] +[4.049999999999999, "o", "e"] +[4.079999999999999, "o", "m"] +[4.109999999999999, "o", "d"] +[4.14, "o", " "] +[4.17, "o", "d"] +[4.2, "o", "e"] +[4.23, "o", "p"] +[4.260000000000001, "o", "s"] +[4.290000000000001, "o", " "] +[4.320000000000001, "o", "s"] +[4.350000000000001, "o", "y"] +[4.380000000000002, "o", "s"] +[4.410000000000002, "o", "t"] +[4.440000000000002, "o", "e"] +[4.470000000000002, "o", "m"] +[4.500000000000003, "o", "d"] +[4.530000000000003, "o", "-"] +[4.560000000000003, "o", "j"] +[4.590000000000003, "o", "o"] +[4.620000000000004, "o", "u"] +[4.650000000000004, "o", "r"] +[4.680000000000004, "o", "n"] +[4.710000000000004, "o", "a"] +[4.740000000000005, "o", "l"] +[4.770000000000005, "o", "d"] +[4.8700000000000045, "o", "\r\n"] +[4.890000000000004, "o", "\r\n"] +[4.910000000000004, "o", "Service Dependencies:\r\n"] +[4.930000000000003, "o", "systemd-journald.service\r\n"] +[4.950000000000003, "o", "\u2514\u2500\u2500 \u25cf-.mount\r\n"] +[4.970000000000002, "o", " \u2514\u2500\u2500 \u25cfsystem.slice\r\n"] +[4.990000000000002, "o", " \u2514\u2500\u2500 \u25cfsystemd-journald-dev-log.socket\r\n"] +[5.010000000000002, "o", " \u2514\u2500\u2500 \u25cfsystemd-journald.socket\r\n"] +[5.030000000000001, "o", "\r\n"] +[6.330000000000001, "o", "\r\n\u001b[1;33m3. Available commands:\u001b[0m\r\n"] +[6.630000000000001, "o", "\r\n\u001b[32m$\u001b[0m "] +[6.660000000000001, "o", "c"] +[6.690000000000001, "o", "o"] +[6.7200000000000015, "o", "r"] +[6.750000000000002, "o", "t"] +[6.780000000000002, "o", "e"] +[6.810000000000002, "o", "x"] +[6.8400000000000025, "o", " "] +[6.870000000000003, "o", "s"] +[6.900000000000003, "o", "y"] +[6.930000000000003, "o", "s"] +[6.9600000000000035, "o", "t"] +[6.990000000000004, "o", "e"] +[7.020000000000004, "o", "m"] +[7.050000000000004, "o", "d"] +[7.0800000000000045, "o", " "] +[7.110000000000005, "o", "-"] +[7.140000000000005, "o", "-"] +[7.170000000000005, "o", "h"] +[7.2000000000000055, "o", "e"] +[7.230000000000006, "o", "l"] +[7.260000000000006, "o", "p"] +[7.360000000000006, "o", "\r\n"] +[7.380000000000005, "o", "usage: cortex systemd [-h] {status,diagnose,deps,generate} ...\r\n"] +[7.400000000000005, "o", "\r\n"] +[7.420000000000004, "o", "positional arguments:\r\n"] +[7.440000000000004, "o", " {status,diagnose,deps,generate}\r\n"] +[7.4600000000000035, "o", " Systemd actions\r\n"] +[7.480000000000003, "o", " status Explain a service's status in plain English\r\n"] +[7.500000000000003, "o", " diagnose Diagnose why a service failed\r\n"] +[7.520000000000002, "o", " deps Show service dependencies as a visual tree\r\n"] +[7.540000000000002, "o", " generate Interactive wizard to generate a systemd unit file\r\n"] +[7.560000000000001, "o", "\r\n"] +[7.580000000000001, "o", "options:\r\n"] +[7.6000000000000005, "o", " -h, --help show this help message and exit\r\n"] +[7.62, "o", "\r\n"] +[9.120000000000001, "o", "\r\n\u001b[1;32m=== Demo Complete ===\u001b[0m\r\n"] diff --git a/demo_systemd.sh b/demo_systemd.sh new file mode 100644 index 00000000..55761d18 --- /dev/null +++ b/demo_systemd.sh @@ -0,0 +1,36 @@ +#!/bin/bash +# Demo script for cortex systemd helper +# Run with: asciinema rec demo.cast && bash demo_systemd.sh + +echo "=== Cortex Systemd Helper Demo ===" +echo "" +sleep 1 + +echo "1. Check service status with plain-English explanation:" +echo " $ cortex systemd status ssh" +sleep 1 +cortex systemd status ssh +sleep 2 + +echo "" +echo "2. View service dependencies as a visual tree:" +echo " $ cortex systemd deps ssh" +sleep 1 +cortex systemd deps ssh +sleep 2 + +echo "" +echo "3. Diagnose a failed service (showing what it would look like):" +echo " $ cortex systemd diagnose some-failed-service" +sleep 1 +cortex systemd diagnose ssh 2>/dev/null || echo "[Would show diagnosis for failed services]" +sleep 2 + +echo "" +echo "4. Generate a systemd unit file interactively:" +echo " $ cortex systemd generate" +echo " [Interactive wizard would start here]" +sleep 2 + +echo "" +echo "=== Demo Complete ===" diff --git a/tests/test_systemd_helper.py b/tests/test_systemd_helper.py new file mode 100644 index 00000000..6f718040 --- /dev/null +++ b/tests/test_systemd_helper.py @@ -0,0 +1,502 @@ +""" +Unit tests for cortex/systemd_helper.py - Systemd Service Helper. +Tests the SystemdHelper class used by 'cortex systemd' commands. +""" + +import subprocess +from unittest.mock import MagicMock, patch + +import pytest + +from cortex.systemd_helper import ( + ServiceState, + ServiceStatus, + SystemdHelper, + run_deps_command, + run_diagnose_command, + run_generate_command, + run_status_command, +) + + +class TestServiceStateEnum: + """Test the ServiceState enum values.""" + + def test_all_states_exist(self): + assert ServiceState.RUNNING.value == "running" + assert ServiceState.STOPPED.value == "stopped" + assert ServiceState.FAILED.value == "failed" + assert ServiceState.INACTIVE.value == "inactive" + assert ServiceState.ACTIVATING.value == "activating" + assert ServiceState.DEACTIVATING.value == "deactivating" + assert ServiceState.UNKNOWN.value == "unknown" + + +class TestServiceStatusDataclass: + """Test the ServiceStatus dataclass.""" + + def test_basic_creation(self): + status = ServiceStatus( + name="nginx", + state=ServiceState.RUNNING, + ) + assert status.name == "nginx" + assert status.state == ServiceState.RUNNING + assert status.description == "" + assert status.pid is None + + def test_full_creation(self): + status = ServiceStatus( + name="nginx", + state=ServiceState.RUNNING, + description="A high performance web server", + load_state="loaded", + active_state="active", + sub_state="running", + pid=1234, + memory="50MB", + cpu="100ms", + started_at="2024-01-01", + exit_code=0, + main_pid_code="exited", + ) + assert status.pid == 1234 + assert status.memory == "50MB" + + +class TestSystemdHelperInit: + """Test SystemdHelper initialization.""" + + def test_init_success(self): + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = "systemd 252" + + with patch("subprocess.run", return_value=mock_result): + helper = SystemdHelper() + assert helper is not None + + def test_init_systemd_not_available(self): + mock_result = MagicMock() + mock_result.returncode = 1 + + with patch("subprocess.run", return_value=mock_result): + with pytest.raises(RuntimeError, match="systemd is not available"): + SystemdHelper() + + def test_init_systemctl_not_found(self): + with patch("subprocess.run", side_effect=FileNotFoundError()): + with pytest.raises(RuntimeError, match="systemctl command not found"): + SystemdHelper() + + def test_init_timeout(self): + with patch("subprocess.run", side_effect=subprocess.TimeoutExpired("cmd", 5)): + with pytest.raises(RuntimeError, match="Timeout"): + SystemdHelper() + + +class TestGetServiceStatus: + """Test the get_service_status method.""" + + @pytest.fixture + def helper(self): + """Create a helper instance with mocked init.""" + with patch.object(SystemdHelper, "_check_systemd_available"): + return SystemdHelper() + + def test_empty_service_name(self, helper): + with pytest.raises(ValueError, match="Service name cannot be empty"): + helper.get_service_status("") + + def test_running_service(self, helper): + mock_output = """LoadState=loaded +ActiveState=active +SubState=running +Description=The NGINX HTTP Server +MainPID=1234 +MemoryCurrent=52428800 +CPUUsageNSec=1000000000 +ActiveEnterTimestamp=Mon 2024-01-01 00:00:00 UTC +ExecMainStatus=0 +ExecMainCode=exited +""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = mock_output + + with patch("subprocess.run", return_value=mock_result): + status = helper.get_service_status("nginx") + + assert status.name == "nginx" + assert status.state == ServiceState.RUNNING + assert status.pid == 1234 + assert status.description == "The NGINX HTTP Server" + + def test_failed_service(self, helper): + mock_output = """LoadState=loaded +ActiveState=failed +SubState=failed +Description=My Service +MainPID=0 +ExecMainStatus=1 +""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = mock_output + + with patch("subprocess.run", return_value=mock_result): + status = helper.get_service_status("myservice") + + assert status.state == ServiceState.FAILED + assert status.exit_code == 1 + + def test_inactive_service(self, helper): + mock_output = """LoadState=loaded +ActiveState=inactive +SubState=dead +""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = mock_output + + with patch("subprocess.run", return_value=mock_result): + status = helper.get_service_status("stopped-service") + + assert status.state == ServiceState.INACTIVE + + def test_activating_service(self, helper): + mock_output = """LoadState=loaded +ActiveState=activating +SubState=start +""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = mock_output + + with patch("subprocess.run", return_value=mock_result): + status = helper.get_service_status("starting-service") + + assert status.state == ServiceState.ACTIVATING + + def test_deactivating_service(self, helper): + mock_output = """LoadState=loaded +ActiveState=deactivating +SubState=stop +""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = mock_output + + with patch("subprocess.run", return_value=mock_result): + status = helper.get_service_status("stopping-service") + + assert status.state == ServiceState.DEACTIVATING + + def test_service_suffix_normalization(self, helper): + mock_output = """ActiveState=active +SubState=running +""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = mock_output + + with patch("subprocess.run", return_value=mock_result) as mock_run: + helper.get_service_status("nginx") + + # Check that .service was appended + call_args = mock_run.call_args[0][0] + assert "nginx.service" in call_args + + +class TestExplainStatus: + """Test the explain_status method.""" + + @pytest.fixture + def helper(self): + with patch.object(SystemdHelper, "_check_systemd_available"): + return SystemdHelper() + + def test_explain_running_service(self, helper): + mock_output = """ActiveState=active +SubState=running +MainPID=1234 +ActiveEnterTimestamp=Mon 2024-01-01 00:00:00 UTC +""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = mock_output + + with patch("subprocess.run", return_value=mock_result): + explanation = helper.explain_status("nginx") + + assert "running normally" in explanation + assert "1234" in explanation + + def test_explain_failed_service(self, helper): + mock_output = """ActiveState=failed +SubState=failed +ExecMainStatus=137 +""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = mock_output + + with patch("subprocess.run", return_value=mock_result): + explanation = helper.explain_status("myservice") + + assert "failed" in explanation + assert "137" in explanation + assert "SIGKILL" in explanation or "memory" in explanation.lower() + + def test_explain_inactive_service(self, helper): + mock_output = """ActiveState=inactive +SubState=dead +""" + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = mock_output + + with patch("subprocess.run", return_value=mock_result): + explanation = helper.explain_status("stopped") + + assert "not running" in explanation or "inactive" in explanation + + +class TestExplainExitCode: + """Test the _explain_exit_code method.""" + + @pytest.fixture + def helper(self): + with patch.object(SystemdHelper, "_check_systemd_available"): + return SystemdHelper() + + @pytest.mark.parametrize( + "exit_code, expected_text", + [ + (1, "General error"), + (127, "Command not found"), + (137, "SIGKILL"), + (139, "Segmentation fault"), + (143, "SIGTERM"), + (999, "check service logs"), + ], + ) + def test_exit_code_explanations(self, helper, exit_code, expected_text): + explanation = helper._explain_exit_code(exit_code) + assert expected_text in explanation + + +class TestDiagnoseFailure: + """Test the diagnose_failure method.""" + + @pytest.fixture + def helper(self): + with patch.object(SystemdHelper, "_check_systemd_available"): + return SystemdHelper() + + def test_diagnose_with_permission_error(self, helper): + status_output = """ActiveState=failed +SubState=failed +ExecMainStatus=1 +""" + journal_output = """Jan 01 00:00:00 host myservice[1234]: permission denied while opening /etc/myservice.conf +Jan 01 00:00:01 host myservice[1234]: fatal error, exiting +""" + mock_status = MagicMock(returncode=0, stdout=status_output) + mock_journal = MagicMock(returncode=0, stdout=journal_output) + + # diagnose_failure calls get_service_status (which calls subprocess.run) + # then calls subprocess.run for journalctl + with patch("subprocess.run", side_effect=[mock_status, mock_status, mock_journal]): + report = helper.diagnose_failure("myservice") + + assert "Permission issue detected" in report + assert "permission" in report.lower() + + def test_diagnose_with_port_conflict(self, helper): + status_output = """ActiveState=failed +SubState=failed +""" + journal_output = """Jan 01 00:00:00 host nginx[1234]: bind() to 0.0.0.0:80 failed: address already in use +""" + mock_status = MagicMock(returncode=0, stdout=status_output) + mock_journal = MagicMock(returncode=0, stdout=journal_output) + + # diagnose_failure calls get_service_status then journalctl + with patch("subprocess.run", side_effect=[mock_status, mock_status, mock_journal]): + report = helper.diagnose_failure("nginx") + + assert "Port conflict detected" in report + + def test_diagnose_with_missing_file(self, helper): + status_output = """ActiveState=failed +SubState=failed +""" + journal_output = """Jan 01 00:00:00 host myapp[1234]: /opt/myapp/bin/run: No such file or directory +""" + mock_status = MagicMock(returncode=0, stdout=status_output) + mock_journal = MagicMock(returncode=0, stdout=journal_output) + + with patch("subprocess.run", side_effect=[mock_status, mock_status, mock_journal]): + report = helper.diagnose_failure("myapp") + + assert "Missing file" in report + + +class TestShowDependencies: + """Test the show_dependencies method.""" + + @pytest.fixture + def helper(self): + with patch.object(SystemdHelper, "_check_systemd_available"): + return SystemdHelper() + + def test_show_dependencies_basic(self, helper): + mock_output = """nginx.service +├─system.slice +│ └─-.slice +├─network.target +│ └─network-online.target +└─sysinit.target +""" + mock_result = MagicMock(returncode=0, stdout=mock_output) + + with patch("subprocess.run", return_value=mock_result): + tree = helper.show_dependencies("nginx") + + # Tree should be a Rich Tree object + assert tree is not None + assert tree.label is not None + + +class TestGenerateUnitFile: + """Test the generate_unit_file method.""" + + @pytest.fixture + def helper(self): + with patch.object(SystemdHelper, "_check_systemd_available"): + return SystemdHelper() + + def test_basic_generation(self, helper): + content = helper.generate_unit_file( + description="My Test Service", + exec_start="/usr/bin/myapp", + ) + + assert "[Unit]" in content + assert "Description=My Test Service" in content + assert "[Service]" in content + assert "ExecStart=/usr/bin/myapp" in content + assert "[Install]" in content + assert "WantedBy=multi-user.target" in content + assert "After=network.target" in content + + def test_with_user(self, helper): + content = helper.generate_unit_file( + description="My Service", + exec_start="/usr/bin/myapp", + user="www-data", + ) + + assert "User=www-data" in content + + def test_with_working_directory(self, helper): + content = helper.generate_unit_file( + description="My Service", + exec_start="/usr/bin/myapp", + working_dir="/var/lib/myapp", + ) + + assert "WorkingDirectory=/var/lib/myapp" in content + + def test_with_restart_disabled(self, helper): + content = helper.generate_unit_file( + description="My Service", + exec_start="/usr/bin/myapp", + restart=False, + ) + + assert "Restart=on-failure" not in content + assert "RestartSec" not in content + + def test_with_custom_after(self, helper): + content = helper.generate_unit_file( + description="My Service", + exec_start="/usr/bin/myapp", + after=["postgresql.service", "redis.service"], + ) + + assert "After=postgresql.service redis.service" in content + + def test_with_environment(self, helper): + content = helper.generate_unit_file( + description="My Service", + exec_start="/usr/bin/myapp", + environment={"PORT": "8080", "DEBUG": "false"}, + ) + + assert "Environment=PORT=8080" in content + assert "Environment=DEBUG=false" in content + + +class TestRunCommands: + """Test the run_* command functions.""" + + def test_run_status_command(self): + mock_output = """ActiveState=active +SubState=running +""" + mock_init = MagicMock(returncode=0, stdout="systemd 252") + mock_status = MagicMock(returncode=0, stdout=mock_output) + + with patch("subprocess.run", side_effect=[mock_init, mock_status]): + # Should not raise + run_status_command("nginx") + + def test_run_diagnose_command(self): + mock_init = MagicMock(returncode=0, stdout="systemd 252") + mock_status = MagicMock(returncode=0, stdout="ActiveState=failed\n") + mock_journal = MagicMock(returncode=0, stdout="Error log line\n") + + # diagnose_failure calls get_service_status twice (once for status, once for explain) + # plus journalctl + with patch( + "subprocess.run", side_effect=[mock_init, mock_status, mock_status, mock_journal] + ): + run_diagnose_command("myservice") + + def test_run_deps_command(self): + mock_init = MagicMock(returncode=0, stdout="systemd 252") + mock_deps = MagicMock(returncode=0, stdout="nginx.service\n├─network.target\n") + + with patch("subprocess.run", side_effect=[mock_init, mock_deps]): + run_deps_command("nginx") + + def test_run_generate_command(self): + mock_init = MagicMock(returncode=0, stdout="systemd 252") + + with patch("subprocess.run", return_value=mock_init): + with patch( + "rich.prompt.Prompt.ask", + side_effect=[ + "myservice", # service name + "My Service", # description + "/usr/bin/myapp", # exec_start + "nobody", # user + "/var/lib/myapp", # working dir + ], + ): + with patch( + "rich.prompt.Confirm.ask", + side_effect=[ + False, # run as root + True, # has workdir + True, # restart + True, # start on boot + ], + ): + run_generate_command() + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) From 5330f4fccf0661e7b4d011a33d07269811ec00b9 Mon Sep 17 00:00:00 2001 From: jeremylongshore Date: Sun, 11 Jan 2026 22:23:40 -0600 Subject: [PATCH 2/7] fix(systemd): address bot review comments - Pass --lines CLI option to run_diagnose_command (fixes unused arg) - Add error handling for systemctl show failures with proper error messages - Add timeout handling to list-dependencies call - Fix Environment line quoting for values containing spaces/special chars - Remove unused imports (Path, box, Table) - Add shlex import for potential future quoting needs --- cortex/cli.py | 2 +- cortex/systemd_helper.py | 54 ++++++++++++++++++++++++++-------------- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/cortex/cli.py b/cortex/cli.py index 55738c80..d3f0f54c 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2029,7 +2029,7 @@ def systemd(self, args: argparse.Namespace) -> int: elif action == "diagnose": try: - run_diagnose_command(args.service) + run_diagnose_command(args.service, lines=args.lines) return 0 except RuntimeError as e: console.print(f"[red]Error: {e}[/red]") diff --git a/cortex/systemd_helper.py b/cortex/systemd_helper.py index c3950298..c1551987 100644 --- a/cortex/systemd_helper.py +++ b/cortex/systemd_helper.py @@ -5,16 +5,14 @@ """ import re +import shlex import subprocess from dataclasses import dataclass from enum import Enum -from pathlib import Path from typing import Optional -from rich import box from rich.panel import Panel from rich.prompt import Confirm, Prompt -from rich.table import Table from rich.tree import Tree from cortex.branding import console @@ -117,12 +115,19 @@ def get_service_status(self, service_name: str) -> ServiceStatus: if not service_name.endswith(".service"): service_name = f"{service_name}.service" - result = subprocess.run( - ["systemctl", "show", service_name, "--no-pager"], - capture_output=True, - text=True, - timeout=10, - ) + try: + result = subprocess.run( + ["systemctl", "show", service_name, "--no-pager"], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode != 0: + raise RuntimeError( + f"Failed to get service status for {service_name}: {result.stderr.strip()}" + ) + except subprocess.TimeoutExpired: + raise RuntimeError(f"Timeout getting service status for {service_name}") properties = {} for line in result.stdout.strip().split("\n"): @@ -350,15 +355,19 @@ def show_dependencies(self, service_name: str) -> Tree: if not service_name.endswith(".service"): service_name = f"{service_name}.service" - result = subprocess.run( - ["systemctl", "list-dependencies", service_name, "--no-pager"], - capture_output=True, - text=True, - timeout=10, - ) - tree = Tree(f"[bold cyan]{service_name}[/bold cyan]") + try: + result = subprocess.run( + ["systemctl", "list-dependencies", service_name, "--no-pager"], + capture_output=True, + text=True, + timeout=10, + ) + except subprocess.TimeoutExpired: + tree.add("[yellow]Timed out fetching dependencies[/yellow]") + return tree + lines = result.stdout.strip().split("\n")[1:] # Skip header # Parse the tree structure @@ -454,7 +463,13 @@ def generate_unit_file( if environment: for key, value in environment.items(): - lines.append(f"Environment={key}={value}") + # Quote values that contain spaces or special characters + if " " in value or '"' in value or "'" in value or "\\" in value: + # Escape existing double quotes and backslashes + escaped_value = value.replace("\\", "\\\\").replace('"', '\\"') + lines.append(f'Environment={key}="{escaped_value}"') + else: + lines.append(f"Environment={key}={value}") if restart: lines.append("Restart=on-failure") @@ -548,14 +563,15 @@ def run_status_command(service_name: str) -> None: console.print(Panel(explanation, title="Service Status", border_style="cyan")) -def run_diagnose_command(service_name: str) -> None: +def run_diagnose_command(service_name: str, lines: int = 50) -> None: """Run the diagnostic command for a failed service. Args: service_name: Name of the service to diagnose. + lines: Number of log lines to analyze. """ helper = SystemdHelper() - report = helper.diagnose_failure(service_name) + report = helper.diagnose_failure(service_name, lines=lines) console.print(report) From 2ee1378bc5b760a7b3016f6c10d1063d17fce245 Mon Sep 17 00:00:00 2001 From: jeremylongshore Date: Mon, 12 Jan 2026 00:19:17 -0600 Subject: [PATCH 3/7] chore: trigger CLA re-check after CLA PR #568 merged From 33340133299d1b5f2f8dc23a7c99051cc7406c62 Mon Sep 17 00:00:00 2001 From: jeremylongshore Date: Mon, 12 Jan 2026 00:31:22 -0600 Subject: [PATCH 4/7] fix(lint): use X | None instead of Optional[X] for type annotations --- cortex/systemd_helper.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/cortex/systemd_helper.py b/cortex/systemd_helper.py index c1551987..78c7dc75 100644 --- a/cortex/systemd_helper.py +++ b/cortex/systemd_helper.py @@ -9,7 +9,6 @@ import subprocess from dataclasses import dataclass from enum import Enum -from typing import Optional from rich.panel import Panel from rich.prompt import Confirm, Prompt @@ -55,11 +54,11 @@ class ServiceStatus: load_state: str = "" active_state: str = "" sub_state: str = "" - pid: Optional[int] = None + pid: int | None = None memory: str = "" cpu: str = "" started_at: str = "" - exit_code: Optional[int] = None + exit_code: int | None = None main_pid_code: str = "" @@ -415,12 +414,12 @@ def generate_unit_file( self, description: str, exec_start: str, - service_name: Optional[str] = None, - user: Optional[str] = None, - working_dir: Optional[str] = None, + service_name: str | None = None, + user: str | None = None, + working_dir: str | None = None, restart: bool = True, - after: Optional[list[str]] = None, - environment: Optional[dict[str, str]] = None, + after: list[str] | None = None, + environment: dict[str, str] | None = None, ) -> str: """Generate a systemd unit file from simple parameters. From 78e34aa33c5fbfa597e863fcb07d17764f17c761 Mon Sep 17 00:00:00 2001 From: jeremylongshore Date: Mon, 12 Jan 2026 00:34:08 -0600 Subject: [PATCH 5/7] style: format cli.py with black --- cortex/cli.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cortex/cli.py b/cortex/cli.py index d3f0f54c..255d577b 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2359,9 +2359,7 @@ def main(): systemd_deps_parser.add_argument("service", help="Service name") # systemd generate - systemd_subs.add_parser( - "generate", help="Interactive wizard to generate a systemd unit file" - ) + systemd_subs.add_parser("generate", help="Interactive wizard to generate a systemd unit file") # -------------------------- # --- Environment Variable Management Commands --- From ec0cd4fb100ddc3289d999bdc6cf256718bb4de6 Mon Sep 17 00:00:00 2001 From: jeremylongshore Date: Tue, 13 Jan 2026 03:08:05 -0600 Subject: [PATCH 6/7] fix(systemd): address 19 security and quality issues Security fixes (CRITICAL): - Add proper escaping for $, backticks, and newlines in Environment= values - Prevent shell injection via malicious environment variables Security fixes (HIGH): - Increase journalctl timeout to 30s + add --since "1 hour ago" - Add subprocess error handling with returncode check - Add FileNotFoundError handling for systemctl/journalctl - Add service name validation regex (^[a-zA-Z0-9_@.-]+$) - Add robust dependency tree parsing with error recovery - Remove unused shlex import Quality improvements: - Define timeout constants (SYSTEMCTL_TIMEOUT, JOURNALCTL_TIMEOUT) - Add comprehensive exit code explanations (141, 142, 129, 2, etc.) - Add formula for signal codes (128+N) - Add sudo notes to installation instructions - Standardize error message format Tests: - Add 62 comprehensive unit tests - Test service name validation with malicious inputs - Test environment variable escaping edge cases - Test timeout and error handling scenarios - 91% coverage on systemd_helper.py Co-Authored-By: Claude Opus 4.5 --- cortex/systemd_helper.py | 235 +++++++---- tests/test_systemd_helper.py | 779 +++++++++++++++++++++++------------ 2 files changed, 690 insertions(+), 324 deletions(-) diff --git a/cortex/systemd_helper.py b/cortex/systemd_helper.py index 78c7dc75..872813d3 100644 --- a/cortex/systemd_helper.py +++ b/cortex/systemd_helper.py @@ -5,7 +5,6 @@ """ import re -import shlex import subprocess from dataclasses import dataclass from enum import Enum @@ -16,6 +15,14 @@ from cortex.branding import console +# Timeout constants (in seconds) +SYSTEMCTL_TIMEOUT = 10 +JOURNALCTL_TIMEOUT = 30 +VERSION_CHECK_TIMEOUT = 5 + +# Valid service name pattern +SERVICE_NAME_PATTERN = re.compile(r'^[a-zA-Z0-9_@.-]+$') + class ServiceState(Enum): """Possible states of a systemd service.""" @@ -62,6 +69,36 @@ class ServiceStatus: main_pid_code: str = "" +def _validate_service_name(service_name: str) -> str: + """Validate and normalize a service name. + + Args: + service_name: The service name to validate. + + Returns: + Normalized service name with .service suffix. + + Raises: + ValueError: If the service name is invalid. + """ + if not service_name: + raise ValueError("Service name cannot be empty") + + # Remove .service suffix for validation + base_name = service_name.replace(".service", "") + + if not SERVICE_NAME_PATTERN.match(base_name): + raise ValueError( + f"Invalid service name: {service_name}. " + "Service names can only contain letters, numbers, underscores, @, dots, and hyphens." + ) + + # Normalize with .service suffix + if not service_name.endswith(".service"): + return f"{service_name}.service" + return service_name + + class SystemdHelper: """ Helper for managing and understanding systemd services. @@ -86,7 +123,7 @@ def _check_systemd_available(self) -> None: ["systemctl", "--version"], capture_output=True, text=True, - timeout=5, + timeout=VERSION_CHECK_TIMEOUT, ) if result.returncode != 0: raise RuntimeError("systemd is not available on this system") @@ -105,26 +142,24 @@ def get_service_status(self, service_name: str) -> ServiceStatus: ServiceStatus object with parsed information. Raises: - ValueError: If the service name is empty. + ValueError: If the service name is invalid. + RuntimeError: If the status cannot be retrieved. """ - if not service_name: - raise ValueError("Service name cannot be empty") - - # Normalize service name - if not service_name.endswith(".service"): - service_name = f"{service_name}.service" + service_name = _validate_service_name(service_name) try: result = subprocess.run( ["systemctl", "show", service_name, "--no-pager"], capture_output=True, text=True, - timeout=10, + timeout=SYSTEMCTL_TIMEOUT, ) if result.returncode != 0: raise RuntimeError( f"Failed to get service status for {service_name}: {result.stderr.strip()}" ) + except FileNotFoundError: + raise RuntimeError("systemctl command not found - is systemd installed?") except subprocess.TimeoutExpired: raise RuntimeError(f"Timeout getting service status for {service_name}") @@ -227,32 +262,45 @@ def _explain_exit_code(self, exit_code: int) -> str: Returns: Human-readable explanation of the exit code. """ + # Common exit codes common_codes = { 1: " Likely cause: General error or misconfiguration", - 2: " Likely cause: Misuse of command or invalid arguments", + 2: " Likely cause: Misuse of command, invalid arguments, or bad configuration", 126: " Likely cause: Command found but not executable (permission issue)", 127: " Likely cause: Command not found (check if binary exists)", 128: " Likely cause: Invalid exit argument", - 130: " Likely cause: Script terminated by Ctrl+C", + 130: " Likely cause: Script terminated by Ctrl+C (SIGINT)", 137: " Likely cause: Process killed (SIGKILL) - possibly out of memory", 139: " Likely cause: Segmentation fault (SIGSEGV) - program crash", + 141: " Likely cause: Broken pipe (SIGPIPE) - write to closed connection", + 142: " Likely cause: Alarm timeout (SIGALRM)", 143: " Likely cause: Process terminated (SIGTERM) - graceful shutdown requested", 255: " Likely cause: Exit status out of range or SSH error", } - return common_codes.get( - exit_code, f" Exit code {exit_code} - check service logs for details" - ) + + if exit_code in common_codes: + return common_codes[exit_code] + + # Exit codes 128+N mean killed by signal N + if exit_code > 128: + signal_num = exit_code - 128 + return f" Likely cause: Process killed by signal {signal_num}" + + return f" Exit code {exit_code} - check service logs for details" def diagnose_failure(self, service_name: str, lines: int = 50) -> str: """Diagnose why a service failed using journal logs. Args: service_name: Name of the service to diagnose. - lines: Number of log lines to analyze. + lines: Number of log lines to analyze (1-1000). Returns: Diagnostic report with actionable advice. """ + # Validate lines parameter + lines = max(1, min(1000, lines)) + status = self.get_service_status(service_name) report = [] @@ -262,15 +310,33 @@ def diagnose_failure(self, service_name: str, lines: int = 50) -> str: report.append(self.explain_status(service_name)) report.append("") - # Get recent logs - result = subprocess.run( - ["journalctl", "-u", service_name, "-n", str(lines), "--no-pager"], - capture_output=True, - text=True, - timeout=15, - ) - - logs = result.stdout + # Get recent logs with proper error handling + try: + result = subprocess.run( + [ + "journalctl", + "-u", + service_name, + "-n", + str(lines), + "--no-pager", + "--since", + "1 hour ago", + ], + capture_output=True, + text=True, + timeout=JOURNALCTL_TIMEOUT, + ) + if result.returncode != 0: + report.append(f"[red]Failed to read logs: {result.stderr.strip()}[/red]") + return "\n".join(report) + logs = result.stdout + except FileNotFoundError: + report.append("[red]journalctl command not found - cannot read logs[/red]") + return "\n".join(report) + except subprocess.TimeoutExpired: + report.append("[yellow]Timeout reading logs - journal may be very large[/yellow]") + logs = "" # Analyze logs for common issues report.append("[bold]Log Analysis:[/bold]") @@ -331,14 +397,17 @@ def diagnose_failure(self, service_name: str, lines: int = 50) -> str: report.append("") report.append("[bold]Recent Logs:[/bold]") # Show last 20 lines of logs - log_lines = logs.strip().split("\n")[-20:] - for line in log_lines: - if "error" in line.lower() or "fail" in line.lower(): - report.append(f" [red]{line}[/red]") - elif "warn" in line.lower(): - report.append(f" [yellow]{line}[/yellow]") - else: - report.append(f" {line}") + log_lines = logs.strip().split("\n")[-20:] if logs else [] + if not log_lines or (len(log_lines) == 1 and not log_lines[0]): + report.append(" [dim]No logs found for this service[/dim]") + else: + for line in log_lines: + if "error" in line.lower() or "fail" in line.lower(): + report.append(f" [red]{line}[/red]") + elif "warn" in line.lower(): + report.append(f" [yellow]{line}[/yellow]") + else: + report.append(f" {line}") return "\n".join(report) @@ -351,8 +420,7 @@ def show_dependencies(self, service_name: str) -> Tree: Returns: Rich Tree object showing dependencies. """ - if not service_name.endswith(".service"): - service_name = f"{service_name}.service" + service_name = _validate_service_name(service_name) tree = Tree(f"[bold cyan]{service_name}[/bold cyan]") @@ -361,53 +429,70 @@ def show_dependencies(self, service_name: str) -> Tree: ["systemctl", "list-dependencies", service_name, "--no-pager"], capture_output=True, text=True, - timeout=10, + timeout=SYSTEMCTL_TIMEOUT, ) + except FileNotFoundError: + tree.add("[red]systemctl command not found[/red]") + return tree except subprocess.TimeoutExpired: tree.add("[yellow]Timed out fetching dependencies[/yellow]") return tree + if result.returncode != 0: + tree.add(f"[red]Error: {result.stderr.strip()}[/red]") + return tree + lines = result.stdout.strip().split("\n")[1:] # Skip header - # Parse the tree structure + # Parse the tree structure with error recovery current_nodes = {0: tree} for line in lines: if not line.strip(): continue - # Count indentation (each level is 2 spaces or tree chars) - indent = 0 - clean_line = line - for char in line: - if char in " │├└─": - indent += 1 + try: + # Count indentation (handle various tree characters) + indent = 0 + for char in line: + if char in " │├└─●○": + indent += 1 + else: + break + + # Get the service name - remove tree drawing characters + clean_line = re.sub(r"[│├└─●○\s]+", "", line).strip() + if not clean_line: + continue + + # Calculate tree level (approximately 2 chars per level) + level = indent // 2 + + # Color based on unit type + if ".target" in clean_line: + styled = f"[blue]{clean_line}[/blue]" + elif ".socket" in clean_line: + styled = f"[magenta]{clean_line}[/magenta]" + elif ".service" in clean_line: + styled = f"[green]{clean_line}[/green]" + elif ".slice" in clean_line: + styled = f"[cyan]{clean_line}[/cyan]" else: - break + styled = clean_line - # Get the service name - clean_line = re.sub(r"[│├└─\s]+", "", line).strip() - if not clean_line: + # Add to tree with fallback to root + parent_level = max(0, level - 1) + if parent_level in current_nodes: + new_node = current_nodes[parent_level].add(styled) + current_nodes[level] = new_node + else: + # Fallback to root if parent not found + new_node = tree.add(styled) + current_nodes[level] = new_node + except Exception: + # Skip malformed lines continue - level = indent // 2 - - # Color based on state - if ".target" in clean_line: - styled = f"[blue]{clean_line}[/blue]" - elif ".socket" in clean_line: - styled = f"[magenta]{clean_line}[/magenta]" - elif ".service" in clean_line: - styled = f"[green]{clean_line}[/green]" - else: - styled = clean_line - - # Add to tree - parent_level = max(0, level - 1) - if parent_level in current_nodes: - new_node = current_nodes[parent_level].add(styled) - current_nodes[level] = new_node - return tree def generate_unit_file( @@ -462,10 +547,18 @@ def generate_unit_file( if environment: for key, value in environment.items(): - # Quote values that contain spaces or special characters - if " " in value or '"' in value or "'" in value or "\\" in value: - # Escape existing double quotes and backslashes - escaped_value = value.replace("\\", "\\\\").replace('"', '\\"') + # Escape special characters for systemd Environment= + # Must escape: backslash, double quote, dollar sign, backtick + # Must remove: newlines (not supported in Environment=) + needs_quoting = any(c in value for c in ' "$`\n\\') + if needs_quoting: + # Remove newlines (not supported) + escaped_value = value.replace("\n", " ") + # Escape in correct order: backslash first, then others + escaped_value = escaped_value.replace("\\", "\\\\") + escaped_value = escaped_value.replace("$", "\\$") + escaped_value = escaped_value.replace("`", "\\`") + escaped_value = escaped_value.replace('"', '\\"') lines.append(f'Environment={key}="{escaped_value}"') else: lines.append(f"Environment={key}={value}") @@ -541,7 +634,9 @@ def interactive_unit_generator(self) -> str: # Installation instructions console.print("\n[bold]Installation Instructions:[/bold]") - console.print(f"1. Save to: /etc/systemd/system/{service_name}.service") + console.print( + f"1. Save to: sudo tee /etc/systemd/system/{service_name}.service" + ) console.print("2. Reload systemd: sudo systemctl daemon-reload") if start_on_boot: console.print(f"3. Enable service: sudo systemctl enable {service_name}") diff --git a/tests/test_systemd_helper.py b/tests/test_systemd_helper.py index 6f718040..09770437 100644 --- a/tests/test_systemd_helper.py +++ b/tests/test_systemd_helper.py @@ -9,13 +9,17 @@ import pytest from cortex.systemd_helper import ( + SERVICE_NAME_PATTERN, + SYSTEMCTL_TIMEOUT, + JOURNALCTL_TIMEOUT, ServiceState, ServiceStatus, SystemdHelper, - run_deps_command, + _validate_service_name, + run_status_command, run_diagnose_command, + run_deps_command, run_generate_command, - run_status_command, ) @@ -36,14 +40,12 @@ class TestServiceStatusDataclass: """Test the ServiceStatus dataclass.""" def test_basic_creation(self): - status = ServiceStatus( - name="nginx", - state=ServiceState.RUNNING, - ) + status = ServiceStatus(name="nginx", state=ServiceState.RUNNING) assert status.name == "nginx" assert status.state == ServiceState.RUNNING assert status.description == "" assert status.pid is None + assert status.exit_code is None def test_full_creation(self): status = ServiceStatus( @@ -54,38 +56,118 @@ def test_full_creation(self): active_state="active", sub_state="running", pid=1234, - memory="50MB", - cpu="100ms", - started_at="2024-01-01", + memory="50M", + cpu="100ns", + started_at="2024-01-01 12:00:00", exit_code=0, main_pid_code="exited", ) assert status.pid == 1234 - assert status.memory == "50MB" + assert status.memory == "50M" + assert status.started_at == "2024-01-01 12:00:00" + + +class TestValidateServiceName: + """Test service name validation.""" + + def test_valid_simple_name(self): + assert _validate_service_name("nginx") == "nginx.service" + + def test_valid_with_suffix(self): + assert _validate_service_name("nginx.service") == "nginx.service" + + def test_valid_with_hyphen(self): + assert _validate_service_name("my-service") == "my-service.service" + + def test_valid_with_underscore(self): + assert _validate_service_name("my_service") == "my_service.service" + + def test_valid_with_dots(self): + assert _validate_service_name("my.service.name") == "my.service.name.service" + + def test_valid_with_at_sign(self): + # Instantiated services like getty@tty1 + assert _validate_service_name("getty@tty1") == "getty@tty1.service" + + def test_valid_with_numbers(self): + assert _validate_service_name("service123") == "service123.service" + + def test_invalid_empty(self): + with pytest.raises(ValueError, match="cannot be empty"): + _validate_service_name("") + + def test_invalid_shell_chars(self): + with pytest.raises(ValueError, match="Invalid service name"): + _validate_service_name("nginx; rm -rf /") + + def test_invalid_path_separator(self): + with pytest.raises(ValueError, match="Invalid service name"): + _validate_service_name("../../etc/passwd") + + def test_invalid_dollar_sign(self): + with pytest.raises(ValueError, match="Invalid service name"): + _validate_service_name("$HOME/service") + + def test_invalid_backtick(self): + with pytest.raises(ValueError, match="Invalid service name"): + _validate_service_name("`whoami`") + + +class TestServiceNamePattern: + """Test the SERVICE_NAME_PATTERN regex.""" + + def test_valid_patterns(self): + valid_names = [ + "nginx", + "my-service", + "my_service", + "service123", + "My.Service", + "getty@tty1", + "user-1000", + "a", + "ABC123", + ] + for name in valid_names: + assert SERVICE_NAME_PATTERN.match(name), f"{name} should be valid" + + def test_invalid_patterns(self): + invalid_names = [ + "", + " nginx", + "nginx ", + "nginx;rm", + "../test", + "$HOME", + "`cmd`", + "service\nname", + "service\tname", + ] + for name in invalid_names: + # Empty string doesn't match, others shouldn't match the full pattern + if name: + match = SERVICE_NAME_PATTERN.match(name) + # Should either not match or not match the full string + assert match is None or match.group() != name, f"{name!r} should be invalid" class TestSystemdHelperInit: """Test SystemdHelper initialization.""" def test_init_success(self): - mock_result = MagicMock() - mock_result.returncode = 0 - mock_result.stdout = "systemd 252" - + mock_result = MagicMock(returncode=0) with patch("subprocess.run", return_value=mock_result): helper = SystemdHelper() assert helper is not None def test_init_systemd_not_available(self): - mock_result = MagicMock() - mock_result.returncode = 1 - + mock_result = MagicMock(returncode=1) with patch("subprocess.run", return_value=mock_result): with pytest.raises(RuntimeError, match="systemd is not available"): SystemdHelper() def test_init_systemctl_not_found(self): - with patch("subprocess.run", side_effect=FileNotFoundError()): + with patch("subprocess.run", side_effect=FileNotFoundError): with pytest.raises(RuntimeError, match="systemctl command not found"): SystemdHelper() @@ -96,34 +178,23 @@ def test_init_timeout(self): class TestGetServiceStatus: - """Test the get_service_status method.""" + """Test get_service_status method.""" - @pytest.fixture - def helper(self): - """Create a helper instance with mocked init.""" - with patch.object(SystemdHelper, "_check_systemd_available"): + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): return SystemdHelper() - def test_empty_service_name(self, helper): - with pytest.raises(ValueError, match="Service name cannot be empty"): - helper.get_service_status("") - - def test_running_service(self, helper): - mock_output = """LoadState=loaded -ActiveState=active + def test_get_status_running(self): + helper = self._create_helper() + mock_output = """ActiveState=active SubState=running Description=The NGINX HTTP Server MainPID=1234 +LoadState=loaded MemoryCurrent=52428800 -CPUUsageNSec=1000000000 -ActiveEnterTimestamp=Mon 2024-01-01 00:00:00 UTC -ExecMainStatus=0 -ExecMainCode=exited """ - mock_result = MagicMock() - mock_result.returncode = 0 - mock_result.stdout = mock_output - + mock_result = MagicMock(returncode=0, stdout=mock_output) with patch("subprocess.run", return_value=mock_result): status = helper.get_service_status("nginx") @@ -132,370 +203,570 @@ def test_running_service(self, helper): assert status.pid == 1234 assert status.description == "The NGINX HTTP Server" - def test_failed_service(self, helper): - mock_output = """LoadState=loaded -ActiveState=failed + def test_get_status_failed(self): + helper = self._create_helper() + mock_output = """ActiveState=failed SubState=failed -Description=My Service -MainPID=0 ExecMainStatus=1 """ - mock_result = MagicMock() - mock_result.returncode = 0 - mock_result.stdout = mock_output - + mock_result = MagicMock(returncode=0, stdout=mock_output) with patch("subprocess.run", return_value=mock_result): - status = helper.get_service_status("myservice") + status = helper.get_service_status("nginx") assert status.state == ServiceState.FAILED assert status.exit_code == 1 - def test_inactive_service(self, helper): - mock_output = """LoadState=loaded -ActiveState=inactive + def test_get_status_inactive(self): + helper = self._create_helper() + mock_output = """ActiveState=inactive SubState=dead """ - mock_result = MagicMock() - mock_result.returncode = 0 - mock_result.stdout = mock_output - + mock_result = MagicMock(returncode=0, stdout=mock_output) with patch("subprocess.run", return_value=mock_result): - status = helper.get_service_status("stopped-service") + status = helper.get_service_status("nginx") assert status.state == ServiceState.INACTIVE - def test_activating_service(self, helper): - mock_output = """LoadState=loaded -ActiveState=activating + def test_get_status_activating(self): + helper = self._create_helper() + mock_output = """ActiveState=activating SubState=start """ - mock_result = MagicMock() - mock_result.returncode = 0 - mock_result.stdout = mock_output - + mock_result = MagicMock(returncode=0, stdout=mock_output) with patch("subprocess.run", return_value=mock_result): - status = helper.get_service_status("starting-service") + status = helper.get_service_status("nginx") assert status.state == ServiceState.ACTIVATING - def test_deactivating_service(self, helper): - mock_output = """LoadState=loaded -ActiveState=deactivating -SubState=stop + def test_get_status_deactivating(self): + helper = self._create_helper() + mock_output = """ActiveState=deactivating +SubState=stop-sigterm """ - mock_result = MagicMock() - mock_result.returncode = 0 - mock_result.stdout = mock_output - + mock_result = MagicMock(returncode=0, stdout=mock_output) with patch("subprocess.run", return_value=mock_result): - status = helper.get_service_status("stopping-service") + status = helper.get_service_status("nginx") assert status.state == ServiceState.DEACTIVATING - def test_service_suffix_normalization(self, helper): - mock_output = """ActiveState=active -SubState=running -""" - mock_result = MagicMock() - mock_result.returncode = 0 - mock_result.stdout = mock_output + def test_get_status_invalid_service_name(self): + helper = self._create_helper() + with pytest.raises(ValueError, match="Invalid service name"): + helper.get_service_status("nginx; rm -rf /") + + def test_get_status_command_fails(self): + helper = self._create_helper() + mock_result = MagicMock(returncode=1, stderr="Unit not found") + with patch("subprocess.run", return_value=mock_result): + with pytest.raises(RuntimeError, match="Failed to get service status"): + helper.get_service_status("nginx") - with patch("subprocess.run", return_value=mock_result) as mock_run: - helper.get_service_status("nginx") + def test_get_status_filenotfound(self): + helper = self._create_helper() + with patch("subprocess.run", side_effect=FileNotFoundError): + with pytest.raises(RuntimeError, match="systemctl command not found"): + helper.get_service_status("nginx") - # Check that .service was appended - call_args = mock_run.call_args[0][0] - assert "nginx.service" in call_args + def test_get_status_timeout(self): + helper = self._create_helper() + with patch("subprocess.run", side_effect=subprocess.TimeoutExpired("cmd", 10)): + with pytest.raises(RuntimeError, match="Timeout"): + helper.get_service_status("nginx") class TestExplainStatus: - """Test the explain_status method.""" + """Test explain_status method.""" - @pytest.fixture - def helper(self): - with patch.object(SystemdHelper, "_check_systemd_available"): + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): return SystemdHelper() - def test_explain_running_service(self, helper): + def test_explain_running(self): + helper = self._create_helper() mock_output = """ActiveState=active SubState=running +Description=Web Server MainPID=1234 -ActiveEnterTimestamp=Mon 2024-01-01 00:00:00 UTC +ActiveEnterTimestamp=Mon 2024-01-01 12:00:00 UTC """ - mock_result = MagicMock() - mock_result.returncode = 0 - mock_result.stdout = mock_output - + mock_result = MagicMock(returncode=0, stdout=mock_output) with patch("subprocess.run", return_value=mock_result): explanation = helper.explain_status("nginx") assert "running normally" in explanation assert "1234" in explanation - def test_explain_failed_service(self, helper): + def test_explain_failed(self): + helper = self._create_helper() mock_output = """ActiveState=failed SubState=failed ExecMainStatus=137 """ - mock_result = MagicMock() - mock_result.returncode = 0 - mock_result.stdout = mock_output - + mock_result = MagicMock(returncode=0, stdout=mock_output) with patch("subprocess.run", return_value=mock_result): - explanation = helper.explain_status("myservice") + explanation = helper.explain_status("nginx") assert "failed" in explanation assert "137" in explanation - assert "SIGKILL" in explanation or "memory" in explanation.lower() + assert "SIGKILL" in explanation or "killed" in explanation.lower() - def test_explain_inactive_service(self, helper): + def test_explain_inactive(self): + helper = self._create_helper() mock_output = """ActiveState=inactive SubState=dead """ - mock_result = MagicMock() - mock_result.returncode = 0 - mock_result.stdout = mock_output - + mock_result = MagicMock(returncode=0, stdout=mock_output) with patch("subprocess.run", return_value=mock_result): - explanation = helper.explain_status("stopped") + explanation = helper.explain_status("nginx") assert "not running" in explanation or "inactive" in explanation class TestExplainExitCode: - """Test the _explain_exit_code method.""" + """Test _explain_exit_code method.""" - @pytest.fixture - def helper(self): - with patch.object(SystemdHelper, "_check_systemd_available"): + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): return SystemdHelper() - @pytest.mark.parametrize( - "exit_code, expected_text", - [ - (1, "General error"), - (127, "Command not found"), - (137, "SIGKILL"), - (139, "Segmentation fault"), - (143, "SIGTERM"), - (999, "check service logs"), - ], - ) - def test_exit_code_explanations(self, helper, exit_code, expected_text): - explanation = helper._explain_exit_code(exit_code) - assert expected_text in explanation + def test_common_codes(self): + helper = self._create_helper() + + # Test specific exit codes + assert "General error" in helper._explain_exit_code(1) + assert "Misuse" in helper._explain_exit_code(2) or "invalid" in helper._explain_exit_code(2).lower() + assert "not executable" in helper._explain_exit_code(126) or "permission" in helper._explain_exit_code(126).lower() + assert "not found" in helper._explain_exit_code(127) + assert "SIGKILL" in helper._explain_exit_code(137) or "killed" in helper._explain_exit_code(137).lower() + assert "Segmentation" in helper._explain_exit_code(139) or "SIGSEGV" in helper._explain_exit_code(139) + assert "SIGTERM" in helper._explain_exit_code(143) or "terminated" in helper._explain_exit_code(143).lower() + + def test_signal_calculation(self): + helper = self._create_helper() + # Exit codes > 128 are signal codes (128 + signal number) + explanation = helper._explain_exit_code(129) # 128 + 1 = SIGHUP + assert "signal" in explanation.lower() + + def test_unknown_code(self): + helper = self._create_helper() + explanation = helper._explain_exit_code(42) + assert "42" in explanation class TestDiagnoseFailure: - """Test the diagnose_failure method.""" + """Test diagnose_failure method.""" - @pytest.fixture - def helper(self): - with patch.object(SystemdHelper, "_check_systemd_available"): + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): return SystemdHelper() - def test_diagnose_with_permission_error(self, helper): + def test_diagnose_with_permission_error(self): + helper = self._create_helper() + + # First call: systemctl show status_output = """ActiveState=failed SubState=failed ExecMainStatus=1 """ - journal_output = """Jan 01 00:00:00 host myservice[1234]: permission denied while opening /etc/myservice.conf -Jan 01 00:00:01 host myservice[1234]: fatal error, exiting -""" - mock_status = MagicMock(returncode=0, stdout=status_output) - mock_journal = MagicMock(returncode=0, stdout=journal_output) + # Second call: journalctl + log_output = "Jan 01 12:00:00 server myapp[1234]: Error: permission denied opening /var/log/app.log" + + def mock_run(cmd, *args, **kwargs): + result = MagicMock() + result.returncode = 0 + if "show" in cmd: + result.stdout = status_output + else: + result.stdout = log_output + return result + + with patch("subprocess.run", side_effect=mock_run): + report = helper.diagnose_failure("myapp") - # diagnose_failure calls get_service_status (which calls subprocess.run) - # then calls subprocess.run for journalctl - with patch("subprocess.run", side_effect=[mock_status, mock_status, mock_journal]): - report = helper.diagnose_failure("myservice") + assert "Permission issue" in report - assert "Permission issue detected" in report - assert "permission" in report.lower() + def test_diagnose_with_port_conflict(self): + helper = self._create_helper() - def test_diagnose_with_port_conflict(self, helper): - status_output = """ActiveState=failed -SubState=failed -""" - journal_output = """Jan 01 00:00:00 host nginx[1234]: bind() to 0.0.0.0:80 failed: address already in use -""" - mock_status = MagicMock(returncode=0, stdout=status_output) - mock_journal = MagicMock(returncode=0, stdout=journal_output) + status_output = "ActiveState=failed\nSubState=failed\n" + log_output = "Error: address already in use :8080" - # diagnose_failure calls get_service_status then journalctl - with patch("subprocess.run", side_effect=[mock_status, mock_status, mock_journal]): - report = helper.diagnose_failure("nginx") + def mock_run(cmd, *args, **kwargs): + result = MagicMock() + result.returncode = 0 + if "show" in cmd: + result.stdout = status_output + else: + result.stdout = log_output + return result - assert "Port conflict detected" in report + with patch("subprocess.run", side_effect=mock_run): + report = helper.diagnose_failure("myapp") - def test_diagnose_with_missing_file(self, helper): - status_output = """ActiveState=failed -SubState=failed -""" - journal_output = """Jan 01 00:00:00 host myapp[1234]: /opt/myapp/bin/run: No such file or directory -""" - mock_status = MagicMock(returncode=0, stdout=status_output) - mock_journal = MagicMock(returncode=0, stdout=journal_output) + assert "Port conflict" in report + + def test_diagnose_lines_validation(self): + helper = self._create_helper() + + status_output = "ActiveState=active\nSubState=running\n" + log_output = "" + + def mock_run(cmd, *args, **kwargs): + result = MagicMock() + result.returncode = 0 + if "show" in cmd: + result.stdout = status_output + else: + result.stdout = log_output + # Verify lines parameter is clamped + if "-n" in cmd: + idx = cmd.index("-n") + lines_val = int(cmd[idx + 1]) + assert 1 <= lines_val <= 1000 + return result + + with patch("subprocess.run", side_effect=mock_run): + # Test negative value gets clamped to 1 + helper.diagnose_failure("myapp", lines=-10) + # Test huge value gets clamped to 1000 + helper.diagnose_failure("myapp", lines=999999) + + def test_diagnose_journalctl_not_found(self): + helper = self._create_helper() + + status_output = "ActiveState=failed\nSubState=failed\n" + + def mock_run(cmd, *args, **kwargs): + if "journalctl" in cmd: + raise FileNotFoundError() + result = MagicMock() + result.returncode = 0 + result.stdout = status_output + return result + + with patch("subprocess.run", side_effect=mock_run): + report = helper.diagnose_failure("myapp") + + assert "journalctl command not found" in report + + def test_diagnose_journalctl_timeout(self): + helper = self._create_helper() + + status_output = "ActiveState=failed\nSubState=failed\n" - with patch("subprocess.run", side_effect=[mock_status, mock_status, mock_journal]): + def mock_run(cmd, *args, **kwargs): + if "journalctl" in cmd: + raise subprocess.TimeoutExpired("journalctl", 30) + result = MagicMock() + result.returncode = 0 + result.stdout = status_output + return result + + with patch("subprocess.run", side_effect=mock_run): report = helper.diagnose_failure("myapp") - assert "Missing file" in report + assert "Timeout" in report class TestShowDependencies: - """Test the show_dependencies method.""" + """Test show_dependencies method.""" - @pytest.fixture - def helper(self): - with patch.object(SystemdHelper, "_check_systemd_available"): + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): return SystemdHelper() - def test_show_dependencies_basic(self, helper): - mock_output = """nginx.service + def test_show_deps_success(self): + helper = self._create_helper() + + # First call: init check, Second call: dependencies + deps_output = """nginx.service ├─system.slice │ └─-.slice -├─network.target -│ └─network-online.target -└─sysinit.target +└─network.target + └─network-pre.target """ - mock_result = MagicMock(returncode=0, stdout=mock_output) - + mock_result = MagicMock(returncode=0, stdout=deps_output) with patch("subprocess.run", return_value=mock_result): tree = helper.show_dependencies("nginx") - # Tree should be a Rich Tree object + # Tree should exist + assert tree is not None + + def test_show_deps_invalid_service_name(self): + helper = self._create_helper() + with pytest.raises(ValueError, match="Invalid service name"): + helper.show_dependencies("nginx; rm -rf /") + + def test_show_deps_filenotfound(self): + helper = self._create_helper() + with patch("subprocess.run", side_effect=FileNotFoundError): + tree = helper.show_dependencies("nginx") + # Should return a tree with error message + assert tree is not None + + def test_show_deps_timeout(self): + helper = self._create_helper() + with patch("subprocess.run", side_effect=subprocess.TimeoutExpired("cmd", 10)): + tree = helper.show_dependencies("nginx") + # Should return a tree with timeout message assert tree is not None - assert tree.label is not None class TestGenerateUnitFile: - """Test the generate_unit_file method.""" + """Test generate_unit_file method.""" - @pytest.fixture - def helper(self): - with patch.object(SystemdHelper, "_check_systemd_available"): + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): return SystemdHelper() - def test_basic_generation(self, helper): - content = helper.generate_unit_file( + def test_generate_basic(self): + helper = self._create_helper() + unit = helper.generate_unit_file( description="My Test Service", exec_start="/usr/bin/myapp", ) - assert "[Unit]" in content - assert "Description=My Test Service" in content - assert "[Service]" in content - assert "ExecStart=/usr/bin/myapp" in content - assert "[Install]" in content - assert "WantedBy=multi-user.target" in content - assert "After=network.target" in content + assert "[Unit]" in unit + assert "Description=My Test Service" in unit + assert "[Service]" in unit + assert "ExecStart=/usr/bin/myapp" in unit + assert "[Install]" in unit + assert "WantedBy=multi-user.target" in unit - def test_with_user(self, helper): - content = helper.generate_unit_file( + def test_generate_with_user(self): + helper = self._create_helper() + unit = helper.generate_unit_file( description="My Service", exec_start="/usr/bin/myapp", - user="www-data", + user="nobody", ) - assert "User=www-data" in content + assert "User=nobody" in unit - def test_with_working_directory(self, helper): - content = helper.generate_unit_file( + def test_generate_with_working_dir(self): + helper = self._create_helper() + unit = helper.generate_unit_file( description="My Service", exec_start="/usr/bin/myapp", working_dir="/var/lib/myapp", ) - assert "WorkingDirectory=/var/lib/myapp" in content + assert "WorkingDirectory=/var/lib/myapp" in unit + + def test_generate_with_restart(self): + helper = self._create_helper() + unit = helper.generate_unit_file( + description="My Service", + exec_start="/usr/bin/myapp", + restart=True, + ) + + assert "Restart=on-failure" in unit + assert "RestartSec=5" in unit - def test_with_restart_disabled(self, helper): - content = helper.generate_unit_file( + def test_generate_without_restart(self): + helper = self._create_helper() + unit = helper.generate_unit_file( description="My Service", exec_start="/usr/bin/myapp", restart=False, ) - assert "Restart=on-failure" not in content - assert "RestartSec" not in content + assert "Restart=" not in unit - def test_with_custom_after(self, helper): - content = helper.generate_unit_file( + def test_generate_with_after(self): + helper = self._create_helper() + unit = helper.generate_unit_file( description="My Service", exec_start="/usr/bin/myapp", after=["postgresql.service", "redis.service"], ) - assert "After=postgresql.service redis.service" in content + assert "After=postgresql.service redis.service" in unit - def test_with_environment(self, helper): - content = helper.generate_unit_file( + def test_generate_with_simple_environment(self): + helper = self._create_helper() + unit = helper.generate_unit_file( description="My Service", exec_start="/usr/bin/myapp", - environment={"PORT": "8080", "DEBUG": "false"}, + environment={"FOO": "bar", "DEBUG": "true"}, ) - assert "Environment=PORT=8080" in content - assert "Environment=DEBUG=false" in content + assert "Environment=FOO=bar" in unit + assert "Environment=DEBUG=true" in unit + + def test_generate_with_escaped_environment(self): + helper = self._create_helper() + unit = helper.generate_unit_file( + description="My Service", + exec_start="/usr/bin/myapp", + environment={ + "PATH_WITH_SPACE": "/some path/with spaces", + "DOLLAR_VAR": "$HOME/test", + "BACKTICK_VAR": "`whoami`", + "QUOTE_VAR": 'say "hello"', + "NEWLINE_VAR": "line1\nline2", + "BACKSLASH_VAR": "C:\\Windows\\Path", + }, + ) + + # Check proper escaping + assert 'Environment=PATH_WITH_SPACE="/some path/with spaces"' in unit + assert r'Environment=DOLLAR_VAR="\$HOME/test"' in unit + assert r'Environment=BACKTICK_VAR="\`whoami\`"' in unit + assert r'Environment=QUOTE_VAR="say \"hello\""' in unit + # Newlines should be replaced with spaces + assert "line1 line2" in unit + # Backslashes should be escaped + assert r"\\" in unit + + +class TestInteractiveUnitGenerator: + """Test interactive_unit_generator method.""" + + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): + return SystemdHelper() + + def test_interactive_generator(self): + helper = self._create_helper() + + # Mock user inputs + with patch("cortex.systemd_helper.Prompt.ask") as mock_prompt: + with patch("cortex.systemd_helper.Confirm.ask") as mock_confirm: + with patch("cortex.systemd_helper.console.print"): + mock_prompt.side_effect = [ + "my-service", # service name + "My Test Service", # description + "/usr/bin/myapp", # command + "appuser", # user + "/var/lib/myapp", # working dir + ] + mock_confirm.side_effect = [ + False, # run as root + True, # set working dir + True, # restart on failure + True, # start on boot + ] + + unit = helper.interactive_unit_generator() + + assert "Description=My Test Service" in unit + assert "ExecStart=/usr/bin/myapp" in unit class TestRunCommands: """Test the run_* command functions.""" def test_run_status_command(self): - mock_output = """ActiveState=active -SubState=running -""" - mock_init = MagicMock(returncode=0, stdout="systemd 252") - mock_status = MagicMock(returncode=0, stdout=mock_output) + status_output = "ActiveState=active\nSubState=running\n" + mock_result = MagicMock(returncode=0, stdout=status_output) - with patch("subprocess.run", side_effect=[mock_init, mock_status]): - # Should not raise - run_status_command("nginx") + with patch("subprocess.run", return_value=mock_result): + with patch("cortex.systemd_helper.console.print"): + run_status_command("nginx") def test_run_diagnose_command(self): - mock_init = MagicMock(returncode=0, stdout="systemd 252") - mock_status = MagicMock(returncode=0, stdout="ActiveState=failed\n") - mock_journal = MagicMock(returncode=0, stdout="Error log line\n") - - # diagnose_failure calls get_service_status twice (once for status, once for explain) - # plus journalctl - with patch( - "subprocess.run", side_effect=[mock_init, mock_status, mock_status, mock_journal] - ): - run_diagnose_command("myservice") + status_output = "ActiveState=failed\nSubState=failed\n" + log_output = "Some logs here" + + def mock_run(cmd, *args, **kwargs): + result = MagicMock() + result.returncode = 0 + if "show" in cmd: + result.stdout = status_output + else: + result.stdout = log_output + return result + + with patch("subprocess.run", side_effect=mock_run): + with patch("cortex.systemd_helper.console.print"): + run_diagnose_command("nginx", lines=50) def test_run_deps_command(self): - mock_init = MagicMock(returncode=0, stdout="systemd 252") - mock_deps = MagicMock(returncode=0, stdout="nginx.service\n├─network.target\n") + deps_output = "nginx.service\n├─system.slice\n" + mock_result = MagicMock(returncode=0, stdout=deps_output) - with patch("subprocess.run", side_effect=[mock_init, mock_deps]): - run_deps_command("nginx") + with patch("subprocess.run", return_value=mock_result): + with patch("cortex.systemd_helper.console.print"): + run_deps_command("nginx") def test_run_generate_command(self): - mock_init = MagicMock(returncode=0, stdout="systemd 252") - - with patch("subprocess.run", return_value=mock_init): - with patch( - "rich.prompt.Prompt.ask", - side_effect=[ - "myservice", # service name - "My Service", # description - "/usr/bin/myapp", # exec_start - "nobody", # user - "/var/lib/myapp", # working dir - ], - ): - with patch( - "rich.prompt.Confirm.ask", - side_effect=[ - False, # run as root - True, # has workdir - True, # restart - True, # start on boot - ], - ): - run_generate_command() + mock_result = MagicMock(returncode=0) + + with patch("subprocess.run", return_value=mock_result): + with patch("cortex.systemd_helper.Prompt.ask") as mock_prompt: + with patch("cortex.systemd_helper.Confirm.ask") as mock_confirm: + with patch("cortex.systemd_helper.console.print"): + mock_prompt.side_effect = [ + "test-service", + "Test Service", + "/usr/bin/test", + "testuser", + "/var/lib/test", + ] + mock_confirm.side_effect = [False, True, True, True] + run_generate_command() + + +class TestConstants: + """Test module constants.""" + + def test_timeout_constants_reasonable(self): + assert SYSTEMCTL_TIMEOUT > 0 + assert SYSTEMCTL_TIMEOUT <= 60 + assert JOURNALCTL_TIMEOUT > 0 + assert JOURNALCTL_TIMEOUT <= 120 + + +class TestEdgeCases: + """Test edge cases and error handling.""" + + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): + return SystemdHelper() + + def test_empty_systemctl_output(self): + helper = self._create_helper() + mock_result = MagicMock(returncode=0, stdout="") + + with patch("subprocess.run", return_value=mock_result): + status = helper.get_service_status("nginx") + + assert status.state == ServiceState.UNKNOWN + + def test_malformed_systemctl_output(self): + helper = self._create_helper() + mock_result = MagicMock(returncode=0, stdout="no equals sign here\n") + + with patch("subprocess.run", return_value=mock_result): + status = helper.get_service_status("nginx") + + # Should handle gracefully + assert status.state == ServiceState.UNKNOWN + + def test_environment_with_all_special_chars(self): + helper = self._create_helper() + unit = helper.generate_unit_file( + description="Test", + exec_start="/bin/test", + environment={ + "COMPLEX": 'Value with $var, `cmd`, "quotes", \\backslash, and\nnewline', + }, + ) + + # Should produce valid output without crashes + assert "Environment=COMPLEX=" in unit + # Critical characters should be escaped + assert r"\$" in unit + assert r"\`" in unit if __name__ == "__main__": From 44872dcec512d24f447d0e8578db1971e133edbb Mon Sep 17 00:00:00 2001 From: jeremylongshore Date: Tue, 13 Jan 2026 13:51:10 -0600 Subject: [PATCH 7/7] fix: resolve ruff I001 import sorting lint errors Alphabetically sort imports in test_systemd_helper.py to comply with ruff's I001 import block sorting rules. Co-Authored-By: Claude Opus 4.5 --- cortex/systemd_helper.py | 6 ++---- tests/test_systemd_helper.py | 34 +++++++++++++++++++++++++--------- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/cortex/systemd_helper.py b/cortex/systemd_helper.py index 872813d3..fcee2db3 100644 --- a/cortex/systemd_helper.py +++ b/cortex/systemd_helper.py @@ -21,7 +21,7 @@ VERSION_CHECK_TIMEOUT = 5 # Valid service name pattern -SERVICE_NAME_PATTERN = re.compile(r'^[a-zA-Z0-9_@.-]+$') +SERVICE_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9_@.-]+$") class ServiceState(Enum): @@ -634,9 +634,7 @@ def interactive_unit_generator(self) -> str: # Installation instructions console.print("\n[bold]Installation Instructions:[/bold]") - console.print( - f"1. Save to: sudo tee /etc/systemd/system/{service_name}.service" - ) + console.print(f"1. Save to: sudo tee /etc/systemd/system/{service_name}.service") console.print("2. Reload systemd: sudo systemctl daemon-reload") if start_on_boot: console.print(f"3. Enable service: sudo systemctl enable {service_name}") diff --git a/tests/test_systemd_helper.py b/tests/test_systemd_helper.py index 09770437..93db7d96 100644 --- a/tests/test_systemd_helper.py +++ b/tests/test_systemd_helper.py @@ -9,17 +9,17 @@ import pytest from cortex.systemd_helper import ( + JOURNALCTL_TIMEOUT, SERVICE_NAME_PATTERN, SYSTEMCTL_TIMEOUT, - JOURNALCTL_TIMEOUT, ServiceState, ServiceStatus, SystemdHelper, _validate_service_name, - run_status_command, - run_diagnose_command, run_deps_command, + run_diagnose_command, run_generate_command, + run_status_command, ) @@ -336,12 +336,26 @@ def test_common_codes(self): # Test specific exit codes assert "General error" in helper._explain_exit_code(1) - assert "Misuse" in helper._explain_exit_code(2) or "invalid" in helper._explain_exit_code(2).lower() - assert "not executable" in helper._explain_exit_code(126) or "permission" in helper._explain_exit_code(126).lower() + assert ( + "Misuse" in helper._explain_exit_code(2) + or "invalid" in helper._explain_exit_code(2).lower() + ) + assert ( + "not executable" in helper._explain_exit_code(126) + or "permission" in helper._explain_exit_code(126).lower() + ) assert "not found" in helper._explain_exit_code(127) - assert "SIGKILL" in helper._explain_exit_code(137) or "killed" in helper._explain_exit_code(137).lower() - assert "Segmentation" in helper._explain_exit_code(139) or "SIGSEGV" in helper._explain_exit_code(139) - assert "SIGTERM" in helper._explain_exit_code(143) or "terminated" in helper._explain_exit_code(143).lower() + assert ( + "SIGKILL" in helper._explain_exit_code(137) + or "killed" in helper._explain_exit_code(137).lower() + ) + assert "Segmentation" in helper._explain_exit_code( + 139 + ) or "SIGSEGV" in helper._explain_exit_code(139) + assert ( + "SIGTERM" in helper._explain_exit_code(143) + or "terminated" in helper._explain_exit_code(143).lower() + ) def test_signal_calculation(self): helper = self._create_helper() @@ -372,7 +386,9 @@ def test_diagnose_with_permission_error(self): ExecMainStatus=1 """ # Second call: journalctl - log_output = "Jan 01 12:00:00 server myapp[1234]: Error: permission denied opening /var/log/app.log" + log_output = ( + "Jan 01 12:00:00 server myapp[1234]: Error: permission denied opening /var/log/app.log" + ) def mock_run(cmd, *args, **kwargs): result = MagicMock()