diff --git a/cortex/cli.py b/cortex/cli.py
index b1cfe4a1..64412fbb 100644
--- a/cortex/cli.py
+++ b/cortex/cli.py
@@ -8,6 +8,7 @@
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
+import psutil
 from rich.markdown import Markdown
 
 from cortex.api_key_detector import auto_detect_api_key, setup_api_key
@@ -24,6 +25,8 @@
 from cortex.env_manager import EnvironmentManager, get_env_manager
 from cortex.installation_history import InstallationHistory, InstallationStatus, InstallationType
 from cortex.llm.interpreter import CommandInterpreter
+from cortex.monitor.live_monitor_ui import MonitorUI
+from cortex.monitor.resource_monitor import ResourceMonitor
 from cortex.network_config import NetworkConfig
 from cortex.notification_manager import NotificationManager
 from cortex.role_manager import RoleManager
@@ -58,6 +61,112 @@ def __init__(self, verbose: bool = False):
         self.spinner_idx = 0
         self.verbose = verbose
 
+    def monitor(self, args: argparse.Namespace) -> int:
+        """Show current system resource usage."""
+        resource_monitor = ResourceMonitor(interval=1.0)
+        duration = getattr(args, "duration", None)
+
+        console.print("System Health:")
+
+        metrics = self._collect_monitoring_metrics(resource_monitor, duration)
+        if metrics:
+            console.print(MonitorUI.format_system_health(metrics))
+
+        self._display_alerts(metrics)
+
+        export_result = self._handle_monitor_export(resource_monitor, args)
+        if export_result != 0:
+            return export_result
+
+        self._display_recommendations(resource_monitor)
+        return 0
+
+    def _display_recommendations(self, resource_monitor: ResourceMonitor) -> None:
+        """Display performance recommendations."""
+        if not resource_monitor.history or len(resource_monitor.history) <= 1:
+            return
+
+        recommendations = resource_monitor.get_recommendations()
+        if recommendations:
+            console.print("\n[bold cyan]⚡ Performance Recommendations:[/bold cyan]")
+            for rec in recommendations:
+                console.print(f"  • {rec}")
+
+    def _collect_monitoring_metrics(
+        self, resource_monitor: ResourceMonitor, duration: float | None
+    ) -> dict[str, Any] | None:
+        """Collect monitoring metrics based on duration."""
+
+        if duration:
+            # Run monitoring loop for the given duration
+            resource_monitor.monitor(duration)
+
+            # Show final snapshot after monitoring
+            summary = resource_monitor.get_summary()
+            if summary:
+                return summary["current"]
+            else:
+                console.print("[yellow]No monitoring data collected.[/yellow]")
+                return None
+        else:
+            return resource_monitor.sample()
+
+    def _display_alerts(self, metrics: dict[str, Any] | None) -> None:
+        """Display alerts from metrics."""
+        if not metrics:
+            return
+
+        alerts = metrics.get("alerts", [])
+        if alerts:
+            console.print("\n[bold yellow]⚠️ Alerts:[/bold yellow]")
+            for alert in alerts:
+                console.print(f"  • {alert}")
+
+    def _handle_monitor_export(
+        self, resource_monitor: ResourceMonitor, args: argparse.Namespace
+    ) -> int:
+        """Handle export of monitoring data."""
+        if not getattr(args, "export", None):
+            return 0
+
+        filename = self._export_monitor_data(
+            monitor=resource_monitor,
+            export=args.export,
+            output=args.output,
+        )
+
+        if filename:
+            cx_print(f"✓ Monitoring data exported to {filename}", "success")
+            return 0
+        else:
+            self._print_error("Failed to export monitoring data")
+            return 1
+
+    # MONITOR HELPERS
+    def _get_latest_metrics(self, monitor: ResourceMonitor) -> dict:
+        """Return latest collected metrics or take a fresh sample."""
+        return monitor.history[-1] if monitor.history else monitor.sample()
+
+    def _export_monitor_data(
+        self,
+        monitor: ResourceMonitor,
+        export: str,
+        output: str | None,
+        software: str | None = None,
+    ) -> str | None:
+        """Export monitoring data safely."""
+        from cortex.monitor import export_monitoring_data
+
+        if output:
+            filename = f"{output}.{export}"
+        else:
+            safe_name = "".join(c if c.isalnum() else "_" for c in (software or "monitor"))
+            filename = f"{safe_name}_monitoring.{export}"
+
+        if export_monitoring_data(monitor, export, filename):  # Check if successful
+            return filename  # Return filename on success
+        return None  # Return None on failure
+
     # Define a method to handle Docker-specific permission repairs
     def docker_permissions(self, args: argparse.Namespace) -> int:
         """Handle the diagnosis and repair of Docker file permissions.
@@ -817,7 +937,23 @@ def install(
         execute: bool = False,
         dry_run: bool = False,
         parallel: bool = False,
+        monitor: bool = False,
+        export: str | None = None,
+        output: str | None = None,
     ):
+
+        # If --monitor is used, automatically enable execution and initialize the resource monitor.
+        resource_monitor = None
+        if monitor and not execute and not dry_run:
+            print(f"📊 Monitoring enabled for: {software}")
+            print("Note: Monitoring requires execution. Auto-enabling --execute flag.")
+            execute = True
+
+        if monitor:
+            resource_monitor = ResourceMonitor(interval=1.0)
+            console.print(f"Installing {software}...")  # Simple print
+            cx_print("📊 Monitoring system resources during installation...", "info")
+
         # Validate input first
         is_valid, error = validate_install_request(software)
         if not is_valid:
@@ -900,6 +1036,22 @@ def progress_callback(current, total, step):
             print(f"\n[{current}/{total}] {status_emoji} {step.description}")
             print(f"  Command: {step.command}")
 
+            # Samples current system resources during each install step and displays live metrics.
+            if resource_monitor:
+                metrics = self._get_latest_metrics(resource_monitor)
+                if current == 1 or "compil" in step.description.lower():
+                    from cortex.monitor.live_monitor_ui import MonitorUI
+
+                    installation_display = MonitorUI.format_installation_metrics(metrics)
+                    console.print("\n" + installation_display)
+
+                # Display alerts if any
+                alerts = metrics.get("alerts", [])
+                if alerts:
+                    console.print("\n[yellow]⚠️ Resource Alert:[/yellow]")
+                    for alert in alerts:
+                        console.print(f"  • {alert}")
+
         print("\nExecuting commands...")
 
         if parallel:
@@ -1003,6 +1155,39 @@ def parallel_log_callback(message: str, level: str = "info"):
             self._print_success(f"{software} installed successfully!")
             print(f"\nCompleted in {result.total_duration:.2f} seconds")
 
+            # Displays the highest CPU and memory usage recorded during the installation.
+ if monitor and resource_monitor: + summary = resource_monitor.get_summary() + peak = summary.get("peak", {}) + + from cortex.monitor.live_monitor_ui import MonitorUI + + peak_display = MonitorUI.format_peak_usage(peak) + console.print("\n" + peak_display) + + # Display performance recommendation + recommendations = resource_monitor.get_recommendations() + if recommendations: + console.print( + "\n[bold cyan]⚡ Performance Recommendations:[/bold cyan]" + ) + for rec in recommendations: + console.print(f" • {rec}") + + # Export if requested + if export: + filename = self._export_monitor_data( + monitor=resource_monitor, + export=export, + output=output, + software=software, + ) + + if filename: + cx_print(f"✓ Monitoring data exported to {filename}", "success") + else: + self._print_error("Failed to export monitoring data") + # Record successful installation if install_id: history.update_installation(install_id, InstallationStatus.SUCCESS) @@ -1271,13 +1456,6 @@ def _display_summary_table(self, result, style: str, table_class) -> None: console.print("\n[bold]📊 Impact Summary:[/bold]") console.print(summary_table) - def _display_recommendations(self, recommendations: list) -> None: - """Display recommendations.""" - if recommendations: - console.print("\n[bold green]💡 Recommendations:[/bold green]") - for rec in recommendations: - console.print(f" • {rec}") - def _execute_removal(self, package: str, purge: bool = False) -> int: """Execute the actual package removal with audit logging""" import datetime @@ -2965,6 +3143,30 @@ def main(): # Demo command demo_parser = subparsers.add_parser("demo", help="See Cortex in action") + # Monitor command + monitor_parser = subparsers.add_parser( + "monitor", + help="Show real-time system resource usage", + ) + + monitor_parser.add_argument( + "--export", + choices=["json", "csv"], + help="Export monitoring data to a file", + ) + + monitor_parser.add_argument( + "--output", + default="monitoring_data", + help="Output filename (without extension)", + ) + + monitor_parser.add_argument( + "--duration", + type=float, + help="Monitor for specified duration in seconds", + ) + # Wizard command wizard_parser = subparsers.add_parser("wizard", help="Configure API key interactively") @@ -3067,6 +3269,21 @@ def main(): action="store_true", help="Output impact analysis as JSON", ) + install_parser.add_argument( + "--monitor", + action="store_true", + help="Monitor system resources during installation", + ) + install_parser.add_argument( + "--export", + choices=["json", "csv"], + help="Export monitoring data to a file (requires --monitor)", + ) + install_parser.add_argument( + "--output", + default="installation_monitoring", + help="Output filename (without extension, used with --export)", + ) # Import command - import dependencies from package manager files import_parser = subparsers.add_parser( @@ -3566,6 +3783,8 @@ def main(): if args.command == "demo": return cli.demo() + elif args.command == "monitor": + return cli.monitor(args) elif args.command == "wizard": return cli.wizard() elif args.command == "status": @@ -3596,6 +3815,9 @@ def main(): execute=args.execute, dry_run=args.dry_run, parallel=args.parallel, + monitor=args.monitor, + export=args.export, + output=args.output, ) elif args.command == "remove": # Handle --execute flag to override default dry-run diff --git a/cortex/monitor/__init__.py b/cortex/monitor/__init__.py new file mode 100644 index 00000000..ef5331d0 --- /dev/null +++ b/cortex/monitor/__init__.py @@ -0,0 +1,16 @@ +from .exporter import ( + 
export_monitoring_data, + export_to_csv, + export_to_json, +) +from .live_monitor_ui import LiveMonitorUI, MonitorUI +from .resource_monitor import ResourceMonitor + +__all__ = [ + "ResourceMonitor", + "MonitorUI", + "LiveMonitorUI", + "export_to_csv", + "export_to_json", + "export_monitoring_data", +] diff --git a/cortex/monitor/exporter.py b/cortex/monitor/exporter.py new file mode 100644 index 00000000..b69e7877 --- /dev/null +++ b/cortex/monitor/exporter.py @@ -0,0 +1,309 @@ +""" +Data export functionality for monitoring system. +Handles JSON and CSV export formats. +This module provides data export capabilities for system monitoring data, +supporting both JSON (for structured analysis) and CSV (for spreadsheet +import) formats. It handles data serialization, file operations, and +error handling with specific exceptions. +""" + +import csv +import json +import logging +import os +import time +from collections.abc import Callable +from typing import Any + +# Set up logging +logger = logging.getLogger(__name__) + + +def export_to_json( + history: list[dict[str, Any]], + peak_usage: dict[str, float], + output_file: str, + include_recommendations: bool = False, + get_recommendations_func: Callable[[], list[str]] | None = None, +) -> None: + """ + Export monitoring data to a JSON file. + Args: + history: List of monitoring samples + peak_usage: Peak resource usage dictionary + output_file: Path to output JSON file + include_recommendations: Whether to include performance recommendations + get_recommendations_func: Function to generate recommendations (optional) + Raises: + OSError: If file cannot be written or directory cannot be created + ValueError: If output_file is empty or None + TypeError: If history or peak_usage have wrong types + AttributeError: If get_recommendations_func is invalid when called + """ + # Input validation + if not output_file or not isinstance(output_file, str): + raise ValueError(f"Invalid output_file: {output_file!r}") + + if not isinstance(history, list): + raise TypeError(f"history must be a list, got {type(history).__name__}") + + if not isinstance(peak_usage, dict): + raise TypeError(f"peak_usage must be a dict, got {type(peak_usage).__name__}") + + try: + # Ensure output directory exists + output_dir = os.path.dirname(os.path.abspath(output_file)) + if output_dir: # Only create if there's a directory component + os.makedirs(output_dir, exist_ok=True) + + payload = { + "metadata": { + "export_timestamp": time.time(), + "export_date": time.ctime(), + "samples_count": len(history), + "format_version": "1.0", + }, + "peak_usage": peak_usage, + "samples": history, + } + + # Add recommendations if requested + if include_recommendations and get_recommendations_func: + try: + recommendations = get_recommendations_func() + if isinstance(recommendations, list): + payload["recommendations"] = recommendations + logger.debug("Added recommendations to JSON export") + else: + logger.warning( + "get_recommendations_func returned non-list: %s", + type(recommendations).__name__, + ) + except AttributeError as exc: + logger.warning("Failed to call recommendations function: %s", exc) + except (TypeError, ValueError) as exc: + logger.warning("Error generating recommendations: %s", exc) + except Exception as exc: + logger.warning("Unexpected error generating recommendations: %s", exc) + # Continue without recommendations - don't fail the export + + # Write JSON with proper encoding + with open(output_file, "w", encoding="utf-8") as f: + json.dump(payload, f, indent=2, 
default=str) + + logger.info("JSON export successful: %s", output_file) + + except OSError as exc: + logger.error("File system error during JSON export to %s: %s", output_file, exc) + raise + except (json.JSONDecodeError, TypeError) as exc: + logger.error("Data serialization error during JSON export: %s", exc) + raise ValueError(f"Data cannot be serialized to JSON: {exc}") from exc + + +def export_to_csv( + history: list[dict[str, Any]], + output_file: str, +) -> None: + """ + Export monitoring history to a CSV file. + Args: + history: List of monitoring samples + output_file: Path to output CSV file + Raises: + OSError: If file cannot be written or directory cannot be created + ValueError: If output_file is empty or None, or history has inconsistent structure + TypeError: If history has wrong type + """ + # Input validation + if not output_file or not isinstance(output_file, str): + raise ValueError(f"Invalid output_file: {output_file!r}") + + if not isinstance(history, list): + raise TypeError(f"history must be a list, got {type(history).__name__}") + + try: + # Ensure output directory exists + output_dir = os.path.dirname(os.path.abspath(output_file)) + if output_dir: # Only create if there's a directory component + os.makedirs(output_dir, exist_ok=True) + + if not history: + # Create file with standard headers for empty data + with open(output_file, "w", newline="", encoding="utf-8") as f: + # Use standard field names for empty data + writer = csv.DictWriter( + f, + fieldnames=[ + "timestamp", + "cpu_percent", + "memory_percent", + "disk_percent", + "alerts", + ], + ) + writer.writeheader() + logger.info("Empty CSV export created: %s", output_file) + return + + # Get all possible fieldnames from all samples + fieldnames_set = set() + for sample in history: + if not isinstance(sample, dict): + raise ValueError(f"Sample must be a dict, got {type(sample).__name__}") + fieldnames_set.update(sample.keys()) + + if not fieldnames_set: + raise ValueError("No fieldnames found in history data") + + fieldnames = sorted(fieldnames_set) + + with open(output_file, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + + for i, sample in enumerate(history): + try: + # Convert any non-serializable values to strings + row = {} + for key in fieldnames: + value = sample.get(key) + if isinstance(value, list): + # Convert lists (like alerts) to semicolon-separated strings + row[key] = "; ".join(str(item) for item in value) + elif value is not None: + row[key] = str(value) + else: + row[key] = "" + writer.writerow(row) + except (KeyError, AttributeError) as exc: + logger.warning("Error processing sample %d: %s", i, exc) + # Skip problematic sample but continue export + + logger.info("CSV export successful: %s (%d rows)", output_file, len(history)) + + except OSError as exc: + logger.error("File system error during CSV export to %s: %s", output_file, exc) + raise + except csv.Error as exc: + logger.error("CSV formatting error: %s", exc) + raise ValueError(f"CSV formatting error: {exc}") from exc + + +def export_monitoring_data( + monitor, + format_type: str, + output_file: str, + include_recommendations: bool = True, +) -> bool: + """ + Convenience function to export monitoring data from a ResourceMonitor instance. 
+ Args: + monitor: ResourceMonitor instance with get_history() and get_peak_usage() methods + format_type: 'json' or 'csv' (case-insensitive) + output_file: Path to output file (must be non-empty string) + include_recommendations: Whether to include recommendations (JSON only) + Returns: + bool: True if successful, False otherwise + Raises: + AttributeError: If monitor doesn't have required methods + ValueError: If format_type is unsupported or output_file is invalid + """ + # Input validation + if not output_file or not isinstance(output_file, str): + logger.error("Invalid output file: %s", output_file) + return False + + if not isinstance(format_type, str): + logger.error("Format type must be a string, got %s", type(format_type).__name__) + return False + + format_type_lower = format_type.lower() + if format_type_lower not in ("json", "csv"): + logger.error("Unsupported export format: %s", format_type) + return False + + try: + # Validate monitor has required methods + if not hasattr(monitor, "get_history"): + raise AttributeError("monitor missing get_history() method") + if not hasattr(monitor, "get_peak_usage"): + raise AttributeError("monitor missing get_peak_usage() method") + + history = monitor.get_history() + peak_usage = monitor.get_peak_usage() + + if format_type_lower == "json": + # Get recommendations function if available and requested + get_recommendations_func = None + if include_recommendations and hasattr(monitor, "get_recommendations"): + get_recommendations_func = monitor.get_recommendations + + export_to_json( + history, + peak_usage, + output_file, + include_recommendations=include_recommendations, + get_recommendations_func=get_recommendations_func, + ) + else: # csv + export_to_csv(history, output_file) + + return True + + except (OSError, ValueError, TypeError, AttributeError) as exc: + logger.error("Export failed for %s: %s", output_file, exc) + return False + except Exception as exc: + logger.error("Unexpected error during export: %s", exc) + return False + + +def export_json( + history: list[dict[str, Any]], + peak_usage: dict[str, float], + output_file: str, + **kwargs: Any, +) -> bool: + """ + Simplified JSON export function that returns success/failure. + Args: + history: List of monitoring samples + peak_usage: Peak resource usage dictionary + output_file: Path to output JSON file + **kwargs: Additional arguments passed to export_to_json + Returns: + bool: True if successful, False otherwise + """ + try: + export_to_json(history, peak_usage, output_file, **kwargs) + return True + except (OSError, ValueError, TypeError, AttributeError) as exc: + logger.error("Simplified JSON export failed: %s", exc) + return False + except Exception as exc: + logger.error("Unexpected error in simplified JSON export: %s", exc) + return False + + +def export_csv( + history: list[dict[str, Any]], + output_file: str, +) -> bool: + """ + Simplified CSV export function that returns success/failure. 
+ Args: + history: List of monitoring samples + output_file: Path to output CSV file + Returns: + bool: True if successful, False otherwise + """ + try: + export_to_csv(history, output_file) + return True + except (OSError, ValueError, TypeError) as exc: + logger.error("Simplified CSV export failed: %s", exc) + return False + except Exception as exc: + logger.error("Unexpected error in simplified CSV export: %s", exc) + return False diff --git a/cortex/monitor/live_monitor_ui.py b/cortex/monitor/live_monitor_ui.py new file mode 100644 index 00000000..422347e9 --- /dev/null +++ b/cortex/monitor/live_monitor_ui.py @@ -0,0 +1,302 @@ +""" +UI components for system monitoring display. +Separates UI logic from monitoring logic. +This module provides user interface components for displaying system +monitoring data. It handles all formatting and display logic, keeping +UI concerns separate from data collection in ResourceMonitor. +""" + +import threading +import time +from typing import Any + +from rich.live import Live +from rich.panel import Panel +from rich.text import Text + +from cortex.monitor.resource_monitor import ResourceMonitor + + +def bar(percent: float, width: int = 10) -> str: + """ + Create a text-based progress bar. + Args: + percent: Percentage value (0-100) + width: Width of the bar in characters + Returns: + Progress bar string with filled and empty portions + Example: + >>> bar(75, 10) + '███████░░░' + """ + percent = max(0, min(100, percent)) + filled = int((percent / 100) * width) + empty = width - filled + return "█" * filled + "░" * empty + + +class MonitorUI: + """ + Static UI formatting methods for monitoring displays. + This class provides methods to format monitoring data for different + contexts (command output, installation displays, summaries). + All methods are static/class methods to emphasize their pure formatting + nature without state. + """ + + @staticmethod + def create_progress_bar(percent: float, width: int = 10) -> str: + """ + Create a text-based progress bar. + Args: + percent: Percentage value (0-100) + width: Width of the bar in characters + Returns: + Progress bar string + Example: + >>> MonitorUI.create_progress_bar(80, 10) + '████████░░' + """ + return bar(percent, width) + + @staticmethod + def format_installing_header(name: str) -> str: + """ + Format the installation header. + Args: + name: Name of the package being installed (e.g., CUDA) + Returns: + Formatted installing header string + Example: + >>> MonitorUI.format_installing_header("CUDA") + 'Installing CUDA...' + """ + return f"Installing {name}..." + + @classmethod + def format_system_health(cls, metrics: dict[str, Any]) -> str: + """ + Format system health output for `cortex monitor` command. 
+ Args: + metrics: Dictionary containing system metrics with keys: + - cpu_percent: CPU usage percentage + - memory_used_gb: Used memory in GB + - memory_total_gb: Total memory in GB + - memory_percent: Memory usage percentage + - disk_used_gb: Used disk space in GB + - disk_total_gb: Total disk space in GB + - disk_percent: Disk usage percentage + - network_down_mb: Download rate in MB/s + - network_up_mb: Upload rate in MB/s + - cpu_cores: Number of CPU cores (optional) + Returns: + Formatted multi-line string + """ + cpu_cores = metrics.get("cpu_cores", "?") + + lines = [ + f" CPU: {metrics['cpu_percent']:.0f}% ({cpu_cores} cores)", + f" RAM: {metrics['memory_used_gb']:.1f}/{metrics['memory_total_gb']:.1f} GB " + f"({metrics['memory_percent']:.0f}%)", + f" Disk: {metrics['disk_used_gb']:.0f}/{metrics['disk_total_gb']:.0f} GB " + f"({metrics['disk_percent']:.0f}%)", + f" Network: {metrics['network_down_mb']:.1f} MB/s ↓ " + f"{metrics['network_up_mb']:.1f} MB/s ↑", + ] + + return "\n".join(lines) + + @classmethod + def format_installation_metrics(cls, metrics: dict[str, Any]) -> str: + """ + Format real-time metrics during installation. + Returns the exact format: + CPU: ████████░░ 80% (compilation) + RAM: ██████████ 12.5/16 GB + Disk: Writing... 2.1 GB/3.5 GB + Args: + metrics: Dictionary containing system metrics + Returns: + Formatted installation metrics string + """ + cpu_bar = cls.create_progress_bar(metrics["cpu_percent"], 10) + ram_bar = cls.create_progress_bar(metrics["memory_percent"], 10) + + lines = [ + f" CPU: {cpu_bar} {metrics['cpu_percent']:.0f}% (compilation)", + f" RAM: {ram_bar} {metrics['memory_used_gb']:.1f}/{metrics['memory_total_gb']:.1f} GB", + f" Disk: Writing... {metrics['disk_used_gb']:.1f}/{metrics['disk_total_gb']:.1f} GB", + ] + + return "\n".join(lines) + + @classmethod + def format_peak_usage(cls, peak_metrics: dict[str, float]) -> str: + """ + Format peak usage summary after installation. + Returns the exact format: + 📊 Peak usage: CPU 95%, RAM 13.2 GB + Args: + peak_metrics: Dictionary containing peak usage values + Returns: + Formatted peak usage string + """ + cpu = peak_metrics.get("cpu_percent", 0) + ram = peak_metrics.get("memory_used_gb", 0) + return f"📊 Peak usage: CPU {cpu:.0f}%, RAM {ram:.1f} GB" + + @classmethod + def format_installation_complete(cls) -> str: + """ + Format installation complete message. + Returns the exact format: + ✓ Installation complete + Returns: + Installation complete message + """ + return "✓ Installation complete" + + +class LiveMonitorUI: + """ + Live-rendered UI for installation monitoring. + Provides a real-time updating display of system metrics during + installations. This is a pure UI component that renders data + provided by ResourceMonitor. + Attributes: + monitor (ResourceMonitor): Monitoring instance providing data + title (str): Display title for the UI + _stop_event (threading.Event): Event to signal UI thread to stop + _thread (threading.Thread | None): Background UI thread + Example: + >>> monitor = ResourceMonitor() + >>> ui = LiveMonitorUI(monitor, "Installing CUDA...") + >>> ui.start() + >>> # Installation happens here + >>> ui.stop() + """ + + def __init__(self, monitor: ResourceMonitor, title: str = "Installing..."): + """ + Initialize a LiveMonitorUI instance. 
+ Args: + monitor: ResourceMonitor instance providing metrics data + title: Display title for the UI panel + """ + self.monitor = monitor + self.title = title + self._stop_event = threading.Event() + self._thread: threading.Thread | None = None + + def _render(self) -> Panel: + """ + Render the current monitoring state as a Rich Panel. + Returns: + Panel: Rich Panel object ready for display + Note: + This method is thread-safe and handles missing data gracefully. + It accesses monitor.history with bounds checking. + """ + # Safely access the latest metrics with bounds checking + latest_metrics = self._get_latest_metrics() + if not latest_metrics: + return Panel("Collecting metrics...", border_style="cyan") + + cpu = latest_metrics["cpu_percent"] + ram_used = latest_metrics["memory_used_gb"] + ram_total = latest_metrics["memory_total_gb"] + ram_percent = latest_metrics["memory_percent"] + disk_used = latest_metrics["disk_used_gb"] + disk_total = latest_metrics["disk_total_gb"] + disk_percent = latest_metrics["disk_percent"] + + # Network metrics (if available) + net_down = latest_metrics.get("network_down_mb", 0) + net_up = latest_metrics.get("network_up_mb", 0) + + text = Text() + text.append(f"{self.title}\n\n", style="bold") + + # CPU + text.append(f"CPU: {bar(cpu)} {cpu:.0f}%\n") + + # RAM - add check for zero division + if ram_total > 0: + text.append( + f"RAM: {bar(ram_percent)} {ram_used:.1f}/{ram_total:.1f} GB ({ram_percent:.0f}%)\n" + ) + else: + text.append(f"RAM: {ram_used:.1f} GB (total unavailable)\n") + + # Disk + if disk_total > 0: + text.append( + f"Disk: {bar(disk_percent)} {disk_used:.1f}/{disk_total:.1f} GB ({disk_percent:.0f}%)\n" + ) + else: + text.append(f"Disk: {disk_used:.1f} GB (total unavailable)\n") + + # Network + if net_down > 0 or net_up > 0: + text.append(f"Net: ↓{net_down:.1f} MB/s ↑{net_up:.1f} MB/s\n") + + return Panel(text, border_style="cyan") + + def _get_latest_metrics(self) -> dict[str, Any] | None: + """ + Safely get the latest metrics from monitor history. + Returns: + Latest metrics dictionary or None if no data available + Note: + This method handles thread safety by using a copy of the + history and bounds checking. + """ + try: + # Use get_history to get a copy for thread safety + history = self.monitor.get_history(limit=1) + if history: + return history[0].copy() + except (IndexError, AttributeError, TypeError): + pass + return None + + def start(self) -> None: + """ + Start the monitoring UI. + Spawns a background thread that continuously renders the + monitoring display until stop() is called. + Raises: + RuntimeError: If UI is already running + """ + if self._thread and self._thread.is_alive(): + raise RuntimeError("LiveMonitorUI is already running") + + self._stop_event.clear() + + def loop() -> None: + """Main UI rendering loop.""" + with Live(self._render(), refresh_per_second=4, screen=False) as live: + while not self._stop_event.is_set(): + try: + live.update(self._render()) + time.sleep(0.25) # 4 FPS + except (KeyboardInterrupt, SystemExit): + break + except Exception as exc: + # Log but continue rendering + print(f"UI rendering error: {exc}") + time.sleep(0.5) + + self._thread = threading.Thread(target=loop, daemon=True) + self._thread.start() + + def stop(self) -> None: + """ + Stop the monitoring UI. + Signals the UI thread to stop and waits for it to finish + with a timeout to prevent hanging. 
+ """ + self._stop_event.set() + if self._thread: + self._thread.join(timeout=2.0) + self._thread = None diff --git a/cortex/monitor/resource_monitor.py b/cortex/monitor/resource_monitor.py new file mode 100644 index 00000000..d0966731 --- /dev/null +++ b/cortex/monitor/resource_monitor.py @@ -0,0 +1,446 @@ +""" +Core resource monitoring system. +Collects and tracks CPU, memory, disk, and network usage. + +This module provides real-time system resource monitoring capabilities +for Cortex Linux, enabling users to track performance during operations +like software installations. +""" + +import logging +import time +from typing import Any + +import psutil + +# Default alert threshold constants +DEFAULT_CPU_ALERT_THRESHOLD = 85.0 +DEFAULT_MEMORY_ALERT_THRESHOLD = 90.0 +DEFAULT_DISK_ALERT_THRESHOLD = 95.0 +DEFAULT_MAX_HISTORY_SIZE = 1000 + +logger = logging.getLogger(__name__) + + +class ResourceMonitor: + """ + Collects and tracks system resource usage. + This class provides comprehensive system monitoring capabilities, + tracking CPU, memory, disk, and network metrics over time. It includes + alerting mechanisms for resource thresholds and generates performance + recommendations based on usage patterns. + Attributes: + interval (float): Sampling interval in seconds (default: 1.0) + cpu_threshold (float): CPU usage alert threshold percentage + memory_threshold (float): Memory usage alert threshold percentage + disk_threshold (float): Disk usage alert threshold percentage + max_history_size (int | None): Maximum number of samples to store + history (list[dict[str, Any]]): Collected metric samples + peak_usage (dict[str, float]): Peak values for each metric + Example: + >>> from cortex.monitor import ResourceMonitor + >>> monitor = ResourceMonitor(interval=0.5) + >>> monitor.monitor(duration=5.0) + >>> recommendations = monitor.get_recommendations() + >>> for rec in recommendations: + ... print(rec) + """ + + def __init__( + self, + interval: float = 1.0, + cpu_threshold: float = DEFAULT_CPU_ALERT_THRESHOLD, + memory_threshold: float = DEFAULT_MEMORY_ALERT_THRESHOLD, + disk_threshold: float = DEFAULT_DISK_ALERT_THRESHOLD, + max_history_size: int | None = DEFAULT_MAX_HISTORY_SIZE, + ) -> None: + """ + Initialize a ResourceMonitor instance. + Args: + interval: Sampling interval in seconds (must be > 0) + cpu_threshold: CPU usage percentage that triggers alerts + memory_threshold: Memory usage percentage that triggers alerts + disk_threshold: Disk usage percentage that triggers alerts + max_history_size: Maximum number of samples to store (None = unlimited) + Raises: + ValueError: If interval <= 0 or thresholds are not in valid range (0-100) + Note: + Thresholds are expressed as percentages (0-100). Values outside + this range will be clamped to valid percentage bounds. 
+ """ + if interval <= 0: + raise ValueError(f"Interval must be positive, got {interval}") + + # Validate thresholds are within reasonable bounds + for name, value in [ + ("cpu_threshold", cpu_threshold), + ("memory_threshold", memory_threshold), + ("disk_threshold", disk_threshold), + ]: + if not 0 <= value <= 100: + logger.warning( + "%s %.1f%% is outside recommended range 0-100%%, " "consider adjusting", + name, + value, + ) + + self.interval = interval + self.cpu_threshold = cpu_threshold + self.memory_threshold = memory_threshold + self.disk_threshold = disk_threshold + self.max_history_size = max_history_size + + self.history: list[dict[str, Any]] = [] + + self.peak_usage: dict[str, float] = { + "cpu_percent": 0.0, + "memory_percent": 0.0, + "memory_used_gb": 0.0, + "disk_percent": 0.0, + "disk_used_gb": 0.0, + "disk_read_mb": 0.0, + "disk_write_mb": 0.0, + "network_up_mb": 0.0, + "network_down_mb": 0.0, + } + + self._disk_before: Any = None + self._net_before: Any = None + + # Metric Collection + + def collect_metrics(self) -> dict[str, Any]: + """ + Collect a single snapshot of system metrics. + Gathers comprehensive system metrics including: + - CPU usage and core count + - Memory usage (used, total, percentage) + - Disk usage (used, total, percentage) and I/O rates + - Network I/O rates + Returns: + dict: Dictionary containing all collected metrics with keys: + - timestamp: Unix timestamp of collection + - cpu_percent: CPU usage percentage (0-100) + - cpu_cores: Number of logical CPU cores + - memory_used_gb: Used memory in GB + - memory_total_gb: Total memory in GB + - memory_percent: Memory usage percentage (0-100) + - disk_used_gb: Used disk space in GB + - disk_total_gb: Total disk space in GB + - disk_percent: Disk usage percentage (0-100) + - disk_read_mb: Disk read rate in MB/s + - disk_write_mb: Disk write rate in MB/s + - network_up_mb: Network upload rate in MB/s + - network_down_mb: Network download rate in MB/s + Raises: + OSError: If system metrics cannot be accessed + RuntimeError: If metric calculation fails + Note: + Disk and network rates are calculated relative to previous + sample. First call returns 0.0 for rates. 
+ """ + try: + timestamp = time.time() + + cpu_percent = psutil.cpu_percent(interval=None) + cpu_cores = psutil.cpu_count(logical=True) + + memory = psutil.virtual_memory() + disk_space = psutil.disk_usage("/") + disk_io = psutil.disk_io_counters() + net_io = psutil.net_io_counters() + + memory_used_gb = memory.used / (1024**3) + memory_total_gb = memory.total / (1024**3) + + disk_used_gb = disk_space.used / (1024**3) + disk_total_gb = disk_space.total / (1024**3) + + disk_read_mb = disk_write_mb = 0.0 + network_up_mb = network_down_mb = 0.0 + + if self._disk_before: + disk_read_mb = ( + (disk_io.read_bytes - self._disk_before.read_bytes) / (1024**2) / self.interval + ) + disk_write_mb = ( + (disk_io.write_bytes - self._disk_before.write_bytes) + / (1024**2) + / self.interval + ) + + if self._net_before: + network_up_mb = ( + (net_io.bytes_sent - self._net_before.bytes_sent) / (1024**2) / self.interval + ) + network_down_mb = ( + (net_io.bytes_recv - self._net_before.bytes_recv) / (1024**2) / self.interval + ) + + self._disk_before = disk_io + self._net_before = net_io + + return { + "timestamp": timestamp, + "cpu_percent": cpu_percent, + "cpu_cores": cpu_cores, + "memory_used_gb": memory_used_gb, + "memory_total_gb": memory_total_gb, + "memory_percent": memory.percent, + "disk_used_gb": disk_used_gb, + "disk_total_gb": disk_total_gb, + "disk_percent": disk_space.percent, + "disk_read_mb": disk_read_mb, + "disk_write_mb": disk_write_mb, + "network_up_mb": network_up_mb, + "network_down_mb": network_down_mb, + } + except OSError as exc: + logger.error("Failed to collect system metrics: %s", exc) + raise + except (AttributeError, TypeError, ZeroDivisionError) as exc: + logger.error("Error calculating metrics: %s", exc) + raise RuntimeError(f"Metric calculation failed: {exc}") from exc + + # Alerts & Storage + + def check_alerts(self, metrics: dict[str, Any]) -> list[str]: + """ + Check metrics against configured thresholds and generate alerts. + Args: + metrics: Dictionary of collected metrics from collect_metrics() + Returns: + list[str]: List of alert messages for threshold violations. + Empty list if no thresholds exceeded. + Note: + Only checks CPU, memory, and disk thresholds. Network and + disk I/O alerts are handled in recommendations. + """ + alerts: list[str] = [] + + if metrics["cpu_percent"] >= self.cpu_threshold: + alerts.append(f"High CPU usage detected ({metrics['cpu_percent']:.1f}%)") + + if metrics["memory_percent"] >= self.memory_threshold: + alerts.append(f"High memory usage detected ({metrics['memory_percent']:.1f}%)") + + if metrics["disk_percent"] >= self.disk_threshold: + alerts.append(f"Low disk space detected ({metrics['disk_percent']:.1f}%)") + + return alerts + + def update(self, metrics: dict[str, Any]) -> None: + """ + Update history and peak usage with new metrics. + Args: + metrics: Dictionary of metrics to store + Note: + Maintains history size within max_history_size limit. + Updates peak_usage dictionary with maximum values seen. + """ + if self.max_history_size and len(self.history) >= self.max_history_size: + self.history.pop(0) + + self.history.append(metrics) + + for key in self.peak_usage: + if key in metrics: + self.peak_usage[key] = max(self.peak_usage[key], metrics[key]) + + def sample(self) -> dict[str, Any]: + """ + Collect, check, and store a single sample of system metrics. + Returns: + dict: Metrics dictionary with added 'alerts' key containing + any threshold violation alerts. 
+ Example: + >>> monitor = ResourceMonitor() + >>> sample = monitor.sample() + >>> if sample.get('alerts'): + ... for alert in sample['alerts']: + ... print(f"ALERT: {alert}") + """ + metrics = self.collect_metrics() + metrics["alerts"] = self.check_alerts(metrics) + self.update(metrics) + return metrics + + # Monitoring Loop + + def monitor(self, duration: float | None = None) -> None: + """ + Run continuous monitoring for specified duration. + Args: + duration: Monitoring duration in seconds. If None, runs until + interrupted (typically by KeyboardInterrupt). + Raises: + KeyboardInterrupt: If monitoring interrupted by user + OSError: If system metrics cannot be accessed + RuntimeError: If monitoring loop encounters fatal error + Note: + Use Ctrl+C to interrupt monitoring when duration is None. + First sample may have 0.0 for disk/network rates as they + require a previous sample for calculation. + """ + start_time = time.time() + + try: + while True: + if duration and (time.time() - start_time) >= duration: + break + self.sample() + time.sleep(self.interval) + + except KeyboardInterrupt: + logger.info("Monitoring interrupted by user") + except (OSError, RuntimeError, ValueError) as exc: + logger.error("Monitoring error: %s", exc) + raise + + # Data Accessors (NO UI FORMATTING) + + def get_summary(self) -> dict[str, Any]: + """ + Get comprehensive summary of monitoring session. + Returns: + dict: Summary containing: + - current: Latest metrics sample (including alerts) + - peak: Peak values for all tracked metrics + - samples: Number of samples collected + - duration: Total monitoring duration in seconds + - thresholds: Configured alert thresholds + Returns empty dict if no history available. + """ + if not self.history: + return {} + + latest = self.history[-1] + + return { + "current": latest.copy(), + "peak": self.peak_usage.copy(), + "samples": len(self.history), + "duration": ( + self.history[-1]["timestamp"] - self.history[0]["timestamp"] + if len(self.history) > 1 + else 0.0 + ), + "thresholds": { + "cpu": self.cpu_threshold, + "memory": self.memory_threshold, + "disk": self.disk_threshold, + }, + } + + def get_history(self, limit: int | None = None) -> list[dict[str, Any]]: + """ + Get monitoring history with optional limit. + Args: + limit: Maximum number of recent samples to return. + If None, returns entire history. + Returns: + list: List of metric dictionaries. Returns copy to prevent + modification of internal history. + """ + if limit and limit < len(self.history): + return self.history[-limit:].copy() + return self.history.copy() + + def get_recent_alerts(self, last_n_samples: int = 10) -> list[dict[str, Any]]: + """ + Get recent samples that contain alerts. + Args: + last_n_samples: Number of most recent samples to inspect. + Returns: + list[dict[str, Any]]: List of metric samples that include alerts. + """ + if last_n_samples <= 0: + return [] + + recent = self.get_history(limit=last_n_samples) + return [sample for sample in recent if sample.get("alerts")] + + def get_stats(self) -> dict[str, Any]: + """ + Compute basic statistics from monitoring history. 
+ Returns: + dict: Dictionary containing: + - averages: Average values for numeric metrics + - samples: Total number of samples collected + """ + if not self.history: + return {} + + numeric_keys = [ + "cpu_percent", + "memory_percent", + "disk_percent", + ] + + totals: dict[str, float] = dict.fromkeys(numeric_keys, 0.0) + count = 0 + + for sample in self.history: + for key in numeric_keys: + if key in sample: + totals[key] += sample[key] + count += 1 + + averages = {key: totals[key] / count for key in totals} + + return { + "averages": averages, + "samples": count, + } + + def get_peak_usage(self) -> dict[str, float]: + """ + Get peak usage values for all tracked metrics. + Returns: + dict: Copy of peak_usage dictionary + """ + return self.peak_usage.copy() + + def clear_history(self) -> None: + """ + Clear all stored history and reset peak usage. + Resets: + - history list (emptied) + - peak_usage dictionary (all values set to 0.0) + - internal disk/net counters (set to None) + """ + self.history.clear() + self.peak_usage = dict.fromkeys(self.peak_usage, 0.0) + self._disk_before = None + self._net_before = None + + # Recommendations + + def get_recommendations(self) -> list[str]: + """ + Generate performance recommendations based on usage patterns. + Analyzes peak usage to provide actionable suggestions for + improving system performance and stability. + Returns: + list[str]: List of recommendation messages. If no issues + detected, returns a single positive message. + Note: + Recommendations are based on peak usage during monitoring, + not current values. Run monitor() or multiple sample() calls + before calling for meaningful recommendations. + """ + recs: list[str] = [] + + if self.peak_usage["cpu_percent"] >= self.cpu_threshold: + recs.append("High CPU usage detected — consider lowering system load.") + + if self.peak_usage["memory_percent"] >= self.memory_threshold: + recs.append("High memory usage detected — consider closing applications.") + + if self.peak_usage["disk_percent"] >= self.disk_threshold: + recs.append("Disk usage was very high — ensure sufficient free space.") + + if self.peak_usage["network_up_mb"] > 50 or self.peak_usage["network_down_mb"] > 50: + recs.append("High network usage detected — downloads may slow the system.") + + return recs or ["System resources remained within optimal limits."] diff --git a/pyproject.toml b/pyproject.toml index 2879e774..fade4248 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,8 @@ dependencies = [ "rich>=13.0.0", # Type hints for older Python versions "typing-extensions>=4.0.0", + # System resource monitoring + "psutil>=5.9.0", ] [project.optional-dependencies] diff --git a/tests/monitor/test_exporter.py b/tests/monitor/test_exporter.py new file mode 100644 index 00000000..1e9dd088 --- /dev/null +++ b/tests/monitor/test_exporter.py @@ -0,0 +1,845 @@ +import csv +import json +import os +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, call, patch + +import pytest + +from cortex.monitor.exporter import ( + export_csv, + export_json, + export_monitoring_data, + export_to_csv, + export_to_json, +) +from cortex.monitor.resource_monitor import ResourceMonitor + + +class TestExporter: + """Test cases for monitoring data export functionality.""" + + def test_export_to_json(self, tmp_path): + """Test JSON export with sample data.""" + history = [ + { + "timestamp": 1234567890.0, + "cpu_percent": 50.0, + "memory_percent": 60.0, + "disk_percent": 30.0, + "alerts": [], + }, + { + "timestamp": 
1234567891.0, + "cpu_percent": 90.0, + "memory_percent": 85.0, + "disk_percent": 35.0, + "alerts": ["⚠ High CPU usage detected (90.0% > 85.0%)"], + }, + ] + + peak_usage = { + "cpu_percent": 90.0, + "memory_percent": 85.0, + "disk_percent": 35.0, + } + + output_file = tmp_path / "test_output.json" + + # This should not raise an exception + export_to_json(history, peak_usage, str(output_file)) + + assert output_file.exists() + + # Verify JSON content + with open(output_file) as f: + data = json.load(f) + + assert "metadata" in data + assert "peak_usage" in data + assert "samples" in data + assert data["peak_usage"]["cpu_percent"] == pytest.approx(90.0, rel=1e-9) + assert len(data["samples"]) == 2 + + def test_export_to_csv(self, tmp_path): + """Test CSV export with sample data.""" + history = [ + { + "timestamp": 1234567890.0, + "cpu_percent": 50.0, + "memory_percent": 60.0, + "disk_percent": 30.0, + }, + { + "timestamp": 1234567891.0, + "cpu_percent": 90.0, + "memory_percent": 85.0, + "disk_percent": 35.0, + "alerts": ["CPU alert"], + }, + ] + + output_file = tmp_path / "test_output.csv" + + # This should not raise an exception + export_to_csv(history, str(output_file)) + + assert output_file.exists() + + # Verify CSV content + with open(output_file) as f: + reader = csv.DictReader(f) + rows = list(reader) + + assert len(rows) == 2 + # CSV stores as strings, convert to float for comparison + assert float(rows[0]["cpu_percent"]) == pytest.approx(50.0, rel=1e-9) + assert float(rows[1]["cpu_percent"]) == pytest.approx(90.0, rel=1e-9) + + def test_export_to_csv_empty_history(self, tmp_path): + """Test CSV export with empty history.""" + history = [] + output_file = tmp_path / "empty.csv" + + export_to_csv(history, str(output_file)) + + assert output_file.exists() + + # Should create file with just headers + with open(output_file) as f: + content = f.read() + + assert "timestamp" in content + + def test_export_to_json_with_recommendations(self, tmp_path): + """Test JSON export with recommendations.""" + history = [{"cpu_percent": 90.0}] + peak_usage = {"cpu_percent": 90.0} + + def mock_recommendations(): + return ["High CPU usage detected"] + + output_file = tmp_path / "test_with_recs.json" + + export_to_json( + history, + peak_usage, + str(output_file), + include_recommendations=True, + get_recommendations_func=mock_recommendations, + ) + + with open(output_file) as f: + data = json.load(f) + + assert "recommendations" in data + assert len(data["recommendations"]) == 1 + + def test_export_monitoring_data_json(self, tmp_path): + """Test export_monitoring_data with JSON format.""" + monitor = ResourceMonitor() + + # Mock the methods since ResourceMonitor might not have real data + monitor.get_history = MagicMock( + return_value=[ + { + "timestamp": 1234567890.0, + "cpu_percent": 75.0, + "memory_percent": 65.0, + "alerts": [], + } + ] + ) + monitor.get_peak_usage = MagicMock( + return_value={"cpu_percent": 75.0, "memory_percent": 65.0} + ) + monitor.get_recommendations = MagicMock(return_value=[]) + + output_file = tmp_path / "monitor_data.json" + result = export_monitoring_data(monitor, "json", str(output_file)) + + assert result is True + assert output_file.exists() + + def test_export_monitoring_data_csv(self, tmp_path): + """Test export_monitoring_data with CSV format.""" + monitor = ResourceMonitor() + + # Mock the methods + monitor.get_history = MagicMock( + return_value=[ + { + "timestamp": 1234567890.0, + "cpu_percent": 75.0, + "memory_percent": 65.0, + } + ] + ) + monitor.get_peak_usage 
= MagicMock(return_value={}) + + output_file = tmp_path / "monitor_data.csv" + result = export_monitoring_data(monitor, "csv", str(output_file)) + + assert result is True + assert output_file.exists() + + def test_export_monitoring_data_invalid_format(self): + """Test export_monitoring_data with invalid format.""" + monitor = ResourceMonitor() + + # Mock minimal methods + monitor.get_history = MagicMock(return_value=[]) + monitor.get_peak_usage = MagicMock(return_value={}) + + with tempfile.NamedTemporaryFile() as tmp: + result = export_monitoring_data(monitor, "invalid", tmp.name) + + assert result is False + + def test_export_json_handles_complex_data(self, tmp_path): + """Test JSON export handles complex data types.""" + history = [ + { + "timestamp": 1234567890.0, + "cpu_percent": 50.0, + "alerts": ["Alert 1", "Alert 2"], + "nested": {"key": "value"}, + } + ] + + peak_usage = {"cpu_percent": 50.0} + + output_file = tmp_path / "complex.json" + export_to_json(history, peak_usage, str(output_file)) + + assert output_file.exists() + + with open(output_file) as f: + data = json.load(f) + + # Should handle lists and nested dicts + assert len(data["samples"][0]["alerts"]) == 2 + + def test_export_csv_handles_missing_fields(self, tmp_path): + """Test CSV export handles samples with different fields.""" + history = [ + {"timestamp": 1, "cpu_percent": 50.0}, + {"timestamp": 2, "cpu_percent": 60.0, "memory_percent": 70.0}, + {"timestamp": 3, "disk_percent": 40.0}, + ] + + output_file = tmp_path / "mixed_fields.csv" + export_to_csv(history, str(output_file)) + + assert output_file.exists() + + with open(output_file) as f: + reader = csv.DictReader(f) + rows = list(reader) + + # Should have all 3 rows + assert len(rows) == 3 + # Should have all fieldnames + assert "cpu_percent" in reader.fieldnames + assert "memory_percent" in reader.fieldnames + assert "disk_percent" in reader.fieldnames + + def test_export_csv_alerts_conversion(self, tmp_path): + """Test CSV export converts alert lists to strings.""" + history = [ + { + "timestamp": 1234567890.0, + "cpu_percent": 90.0, + "alerts": ["CPU alert", "Memory alert"], + } + ] + + output_file = tmp_path / "alerts.csv" + export_to_csv(history, str(output_file)) + + with open(output_file) as f: + reader = csv.DictReader(f) + rows = list(reader) + + # Alerts should be converted to semicolon-separated string + assert "CPU alert; Memory alert" in rows[0]["alerts"] + + def test_export_monitoring_data_no_history(self, tmp_path): + """Test export_monitoring_data with no history.""" + monitor = ResourceMonitor() + + # Mock methods to return empty data + monitor.get_history = MagicMock(return_value=[]) + monitor.get_peak_usage = MagicMock(return_value={}) + monitor.get_recommendations = MagicMock(return_value=[]) + + output_file = tmp_path / "empty.json" + result = export_monitoring_data(monitor, "json", str(output_file)) + + assert result is True + assert output_file.exists() + + # Should create valid JSON even with empty history + with open(output_file) as f: + data = json.load(f) + + assert data["metadata"]["samples_count"] == 0 + + def test_export_to_json_handles_write_error(self, tmp_path): + """Test export_to_json handles file write errors.""" + history = [{"timestamp": 1.0, "cpu_percent": 50.0}] + peak_usage = {"cpu_percent": 50.0} + + # Test that it raises OSError as documented + output_file = tmp_path / "test.json" + + # Make the file read-only to cause a write error + import os + + output_file.touch() + os.chmod(output_file, 0o444) # Read-only + + try: + 
# Should raise OSError + with pytest.raises(OSError): + export_to_json(history, peak_usage, str(output_file)) + finally: + # Restore permissions for cleanup + os.chmod(output_file, 0o755) + + def test_export_to_csv_handles_write_error(self, tmp_path): + """Test export_to_csv handles file write errors.""" + history = [{"timestamp": 1.0, "cpu_percent": 50.0}] + + # Test that it raises OSError as documented + output_file = tmp_path / "test.csv" + + # Make the file read-only to cause a write error + import os + + output_file.touch() + os.chmod(output_file, 0o444) # Read-only + + try: + # Should raise OSError + with pytest.raises(OSError): + export_to_csv(history, str(output_file)) + finally: + # Restore permissions for cleanup + os.chmod(output_file, 0o755) + + def test_export_monitoring_data_invalid_format_handling(self): + """Test export_monitoring_data with invalid format.""" + monitor = ResourceMonitor() + + # Mock methods + monitor.get_history = MagicMock(return_value=[{"timestamp": 1.0}]) + monitor.get_peak_usage = MagicMock(return_value={}) + + # Test with invalid format - should return False + result = export_monitoring_data(monitor, "invalid_format", "test.txt") + assert result is False + + def test_export_monitoring_data_empty_monitor(self, tmp_path): + """Test export_monitoring_data with empty monitor.""" + monitor = ResourceMonitor() + + # Mock methods to return empty data + monitor.get_history = MagicMock(return_value=[]) + monitor.get_peak_usage = MagicMock(return_value={}) + monitor.get_recommendations = MagicMock(return_value=[]) + + output_file = tmp_path / "test.json" + result = export_monitoring_data(monitor, "json", str(output_file)) + + # Should succeed even with empty monitor + assert result is True + + def test_export_monitoring_data_invalid_path(self): + """Test export_monitoring_data with invalid path.""" + monitor = ResourceMonitor() + + # Mock methods + monitor.get_history = MagicMock(return_value=[{"timestamp": 1.0}]) + monitor.get_peak_usage = MagicMock(return_value={}) + + # Test with None path - should return False + result = export_monitoring_data(monitor, "json", "") + assert result is False + + # Test with empty path - should return False + result = export_monitoring_data(monitor, "json", "") + assert result is False + + def test_export_monitoring_data_export_functions_fail(self, monkeypatch): + """Test when underlying export functions raise exceptions.""" + monitor = ResourceMonitor() + + # Mock methods + monitor.get_history = MagicMock(return_value=[{"timestamp": 1.0}]) + monitor.get_peak_usage = MagicMock(return_value={}) + + # Make export_to_json raise an exception + def mock_export_to_json(*args, **kwargs): + raise OSError("Simulated write error") + + monkeypatch.setattr("cortex.monitor.exporter.export_to_json", mock_export_to_json) + + # Should catch the exception and return False + result = export_monitoring_data(monitor, "json", "test.json") + assert result is False + + def test_export_json_simplified_api(self, tmp_path): + """Test the simplified export_json API.""" + history = [{"timestamp": 1.0, "cpu_percent": 50.0}] + peak_usage = {"cpu_percent": 50.0} + + output_file = tmp_path / "simple.json" + result = export_json(history, peak_usage, str(output_file)) + + assert result is True + assert output_file.exists() + + def test_export_csv_simplified_api(self, tmp_path): + """Test the simplified export_csv API.""" + history = [{"timestamp": 1.0, "cpu_percent": 50.0}] + + output_file = tmp_path / "simple.csv" + result = export_csv(history, 
str(output_file)) + + assert result is True + assert output_file.exists() + + def test_export_json_simplified_api_failure(self, monkeypatch): + """Test the simplified export_json API returns False on failure.""" + + # Mock export_to_json to raise an exception + def mock_export_to_json(*args, **kwargs): + raise OSError("Simulated error") + + monkeypatch.setattr("cortex.monitor.exporter.export_to_json", mock_export_to_json) + + history = [{"timestamp": 1.0}] + peak_usage = {} + + result = export_json(history, peak_usage, "test.json") + assert result is False + + def test_export_csv_simplified_api_failure(self, monkeypatch): + """Test the simplified export_csv API returns False on failure.""" + + # Mock export_to_csv to raise an exception + def mock_export_to_csv(*args, **kwargs): + raise OSError("Simulated error") + + monkeypatch.setattr("cortex.monitor.exporter.export_to_csv", mock_export_to_csv) + + history = [{"timestamp": 1.0}] + + result = export_csv(history, "test.csv") + assert result is False + + def test_export_monitoring_data_missing_methods(self): + class BadMonitor: + pass + + assert export_monitoring_data(BadMonitor(), "json", "out.json") is False + + def test_export_monitoring_data_invalid_format_type(self): + monitor = ResourceMonitor() + assert export_monitoring_data(monitor, 123, "out.json") is False + + # NEW TESTS TO INCREASE COVERAGE + + def test_export_to_json_invalid_output_file(self): + """Test export_to_json with invalid output_file.""" + history = [{"timestamp": 1.0}] + peak_usage = {} + + with pytest.raises(ValueError): + export_to_json(history, peak_usage, "") + + with pytest.raises(ValueError): + export_to_json(history, peak_usage, None) + + def test_export_to_json_invalid_history_type(self): + """Test export_to_json with invalid history type.""" + peak_usage = {} + + with pytest.raises(TypeError): + export_to_json("not a list", peak_usage, "test.json") + + with pytest.raises(TypeError): + export_to_json({"not": "a list"}, peak_usage, "test.json") + + def test_export_to_json_invalid_peak_usage_type(self): + """Test export_to_json with invalid peak_usage type.""" + history = [{"timestamp": 1.0}] + + with pytest.raises(TypeError): + export_to_json(history, "not a dict", "test.json") + + with pytest.raises(TypeError): + export_to_json(history, ["not", "a", "dict"], "test.json") + + def test_export_to_csv_invalid_output_file(self): + """Test export_to_csv with invalid output_file.""" + history = [{"timestamp": 1.0}] + + with pytest.raises(ValueError): + export_to_csv(history, "") + + with pytest.raises(ValueError): + export_to_csv(history, None) + + def test_export_to_csv_invalid_history_type(self): + """Test export_to_csv with invalid history type.""" + with pytest.raises(TypeError): + export_to_csv("not a list", "test.csv") + + with pytest.raises(TypeError): + export_to_csv({"not": "a list"}, "test.csv") + + def test_export_to_csv_invalid_sample_type(self): + """Test export_to_csv with invalid sample in history.""" + history = [{"timestamp": 1.0}, "not a dict", {"timestamp": 2.0}] + + output_file = "test.csv" + with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f: + output_file = f.name + + try: + with pytest.raises(ValueError): + export_to_csv(history, output_file) + finally: + os.unlink(output_file) + + def test_export_to_csv_empty_fieldnames(self): + """Test export_to_csv with empty fieldnames.""" + history = [{}] # Empty dict + + output_file = "test.csv" + with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f: + 
output_file = f.name + + try: + with pytest.raises(ValueError): + export_to_csv(history, output_file) + finally: + os.unlink(output_file) + + def test_export_to_json_with_recommendations_non_list(self, tmp_path): + """Test JSON export when recommendations function returns non-list.""" + history = [{"cpu_percent": 90.0}] + peak_usage = {"cpu_percent": 90.0} + + def mock_recommendations(): + return "not a list" # Should trigger warning + + output_file = tmp_path / "test.json" + + with patch("cortex.monitor.exporter.logger") as mock_logger: + export_to_json( + history, + peak_usage, + str(output_file), + include_recommendations=True, + get_recommendations_func=mock_recommendations, + ) + + # Verify warning was logged + assert mock_logger.warning.called + + def test_export_to_json_with_recommendations_exception(self, tmp_path): + """Test JSON export when recommendations function raises exception.""" + history = [{"cpu_percent": 90.0}] + peak_usage = {"cpu_percent": 90.0} + + def mock_recommendations(): + raise AttributeError("Simulated attribute error") + + output_file = tmp_path / "test.json" + + with patch("cortex.monitor.exporter.logger") as mock_logger: + export_to_json( + history, + peak_usage, + str(output_file), + include_recommendations=True, + get_recommendations_func=mock_recommendations, + ) + + # Verify warning was logged but export succeeded + assert mock_logger.warning.called + assert output_file.exists() + + def test_export_to_json_serialization_error(self, tmp_path): + """Test JSON export with unserializable data.""" + history = [{"timestamp": 1.0, "func": lambda x: x}] # Can't serialize function + + class BadObject: + def __repr__(self): + raise TypeError("Can't serialize") + + peak_usage = {"obj": BadObject()} + + output_file = tmp_path / "test.json" + + with pytest.raises(ValueError): + export_to_json(history, peak_usage, str(output_file)) + + def test_export_to_csv_csv_error(self, tmp_path): + """Test CSV export with CSV formatting error.""" + history = [{"timestamp": 1.0}] + output_file = tmp_path / "test.csv" + + # Mock csv.DictWriter to raise csv.Error + with patch("csv.DictWriter") as mock_writer: + mock_writer.side_effect = csv.Error("Simulated CSV error") + + with pytest.raises(ValueError): + export_to_csv(history, str(output_file)) + + def test_export_monitoring_data_with_recommendations_disabled(self, tmp_path): + """Test export_monitoring_data with recommendations disabled.""" + monitor = ResourceMonitor() + + monitor.get_history = MagicMock(return_value=[{"timestamp": 1.0}]) + monitor.get_peak_usage = MagicMock(return_value={}) + monitor.get_recommendations = MagicMock(return_value=["rec1", "rec2"]) + + output_file = tmp_path / "test.json" + + # Test with recommendations disabled + result = export_monitoring_data( + monitor, "json", str(output_file), include_recommendations=False + ) + + assert result is True + assert output_file.exists() + + with open(output_file) as f: + data = json.load(f) + + # Recommendations should not be included + assert "recommendations" not in data + + def test_export_monitoring_data_no_recommendations_method(self, tmp_path): + """Test export_monitoring_data when monitor has no get_recommendations method.""" + monitor = ResourceMonitor() + + monitor.get_history = MagicMock(return_value=[{"timestamp": 1.0}]) + monitor.get_peak_usage = MagicMock(return_value={}) + # Don't mock get_recommendations + + output_file = tmp_path / "test.json" + + result = export_monitoring_data(monitor, "json", str(output_file)) + + assert result is True + 
assert output_file.exists() + + def test_export_monitoring_data_raises_attribute_error(self): + """Test export_monitoring_data when monitor missing required methods.""" + + class BadMonitor: + pass + + # The function catches AttributeError and returns False + result = export_monitoring_data(BadMonitor(), "json", "test.json") + assert result is False # Should return False, not raise + + def test_export_monitoring_data_raises_other_exceptions(self, tmp_path): + """Test export_monitoring_data catches other exceptions.""" + monitor = ResourceMonitor() + + monitor.get_history = MagicMock(side_effect=RuntimeError("Simulated error")) + monitor.get_peak_usage = MagicMock(return_value={}) + + output_file = tmp_path / "test.json" + + result = export_monitoring_data(monitor, "json", str(output_file)) + + assert result is False + + def test_export_json_with_kwargs(self, tmp_path): + """Test simplified export_json with additional kwargs.""" + history = [{"timestamp": 1.0}] + peak_usage = {} + + def mock_recommendations(): + return ["Test recommendation"] + + output_file = tmp_path / "test.json" + + result = export_json( + history, + peak_usage, + str(output_file), + include_recommendations=True, + get_recommendations_func=mock_recommendations, + ) + + assert result is True + assert output_file.exists() + + def test_export_json_simplified_catches_attribute_error(self, monkeypatch): + """Test export_json catches AttributeError.""" + + def mock_export_to_json(*args, **kwargs): + raise AttributeError("Simulated attribute error") + + monkeypatch.setattr("cortex.monitor.exporter.export_to_json", mock_export_to_json) + + result = export_json([{}], {}, "test.json") + assert result is False + + def test_export_csv_simplified_catches_type_error(self, monkeypatch): + """Test export_csv catches TypeError.""" + + def mock_export_to_csv(*args, **kwargs): + raise TypeError("Simulated type error") + + monkeypatch.setattr("cortex.monitor.exporter.export_to_csv", mock_export_to_csv) + + result = export_csv([{}], "test.csv") + assert result is False + + def test_export_to_json_with_directory_creation(self, tmp_path): + """Test export_to_json creates directory if needed.""" + history = [{"timestamp": 1.0}] + peak_usage = {} + + # Create a file in a non-existent directory + output_file = tmp_path / "new_dir" / "subdir" / "test.json" + + export_to_json(history, peak_usage, str(output_file)) + + assert output_file.exists() + assert output_file.parent.exists() + + def test_export_to_csv_with_directory_creation(self, tmp_path): + """Test export_to_csv creates directory if needed.""" + history = [{"timestamp": 1.0}] + + # Create a file in a non-existent directory + output_file = tmp_path / "new_dir" / "subdir" / "test.csv" + + export_to_csv(history, str(output_file)) + + assert output_file.exists() + assert output_file.parent.exists() + + def test_export_to_json_handles_none_values(self, tmp_path): + """Test JSON export handles None values in recommendations.""" + history = [{"timestamp": 1.0}] + peak_usage = {} + + def mock_recommendations(): + return None + + output_file = tmp_path / "test.json" + + with patch("cortex.monitor.exporter.logger") as mock_logger: + export_to_json( + history, + peak_usage, + str(output_file), + include_recommendations=True, + get_recommendations_func=mock_recommendations, + ) + + # Should log warning + assert mock_logger.warning.called + + def test_export_to_csv_processes_sample_error(self, tmp_path, caplog): + """Test CSV export continues when processing a sample fails.""" + history = [ + 
{"timestamp": 1.0, "cpu_percent": 50.0}, + {"timestamp": 2.0, "cpu_percent": 60.0}, # This will cause error + {"timestamp": 3.0, "cpu_percent": 70.0}, + ] + + # Create a mock sample that raises error during processing + class BadSample(dict): + def get(self, key): + if key == "timestamp": + return 2.0 + raise AttributeError("Simulated attribute error") + + history[1] = BadSample() + + output_file = tmp_path / "test.csv" + + # Should not raise exception + export_to_csv(history, str(output_file)) + + assert output_file.exists() + + # Should have logged warning + assert "Error processing sample" in caplog.text + + def test_export_monitoring_data_case_insensitive_format(self, tmp_path): + """Test export_monitoring_data handles case-insensitive format.""" + monitor = ResourceMonitor() + + monitor.get_history = MagicMock(return_value=[{"timestamp": 1.0}]) + monitor.get_peak_usage = MagicMock(return_value={}) + + # Test uppercase format + output_file = tmp_path / "test.json" + result = export_monitoring_data(monitor, "JSON", str(output_file)) + assert result is True + + # Test mixed case + output_file2 = tmp_path / "test2.json" + result2 = export_monitoring_data(monitor, "Json", str(output_file2)) + assert result2 is True + + def test_export_to_json_logs_success(self, tmp_path, caplog): + """Test export_to_json logs success message.""" + history = [{"timestamp": 1.0}] + peak_usage = {} + output_file = tmp_path / "test.json" + + with caplog.at_level("INFO"): + export_to_json(history, peak_usage, str(output_file)) + + assert "JSON export successful" in caplog.text + + def test_export_to_csv_logs_success(self, tmp_path, caplog): + """Test export_to_csv logs success message.""" + history = [{"timestamp": 1.0}] + output_file = tmp_path / "test.csv" + + with caplog.at_level("INFO"): + export_to_csv(history, str(output_file)) + + assert "CSV export successful" in caplog.text + + def test_export_monitoring_data_with_none_monitor(self): + """Test export_monitoring_data with None monitor.""" + result = export_monitoring_data(None, "json", "test.json") + assert result is False + + def test_export_to_json_with_recommendations_unexpected_error(self, tmp_path): + """Test JSON export when recommendations function raises unexpected error.""" + history = [{"cpu_percent": 90.0}] + peak_usage = {"cpu_percent": 90.0} + + def mock_recommendations(): + raise Exception("Simulated unexpected error") + + output_file = tmp_path / "test.json" + + # Should not raise exception, just log warning + export_to_json( + history, + peak_usage, + str(output_file), + include_recommendations=True, + get_recommendations_func=mock_recommendations, + ) + + assert output_file.exists() diff --git a/tests/monitor/test_live_monitor_ui.py b/tests/monitor/test_live_monitor_ui.py new file mode 100644 index 00000000..c5d88c2f --- /dev/null +++ b/tests/monitor/test_live_monitor_ui.py @@ -0,0 +1,154 @@ +""" +Tests for the live monitor UI module. 
+""" + +import re +import time +from unittest.mock import patch + +import pytest +from rich.panel import Panel + +from cortex.monitor.live_monitor_ui import LiveMonitorUI, MonitorUI, bar +from cortex.monitor.resource_monitor import ResourceMonitor + + +class TestBarFunction: + """Tests for the bar() function.""" + + def test_bar_normal_percentage(self): + assert bar(0, 10) == "░░░░░░░░░░" + assert bar(50, 10) == "█████░░░░░" + assert bar(100, 10) == "██████████" + assert bar(25, 8) == "██░░░░░░" + + def test_bar_edge_cases(self): + assert bar(-10, 10) == "░░░░░░░░░░" + assert bar(150, 10) == "██████████" + assert bar(50, 20) == "██████████░░░░░░░░░░" + assert bar(30, 4) == "█░░░" + + def test_bar_precise_values(self): + assert bar(33, 10) == "███░░░░░░░" + assert bar(67, 10) == "██████░░░░" + + +class TestMonitorUI: + """Tests for MonitorUI formatting helpers.""" + + def test_create_progress_bar(self): + assert MonitorUI.create_progress_bar(0, 10) == "░░░░░░░░░░" + assert MonitorUI.create_progress_bar(100, 10) == "██████████" + assert MonitorUI.create_progress_bar(50, 4) == "██░░" + + def test_format_system_health(self): + metrics = { + "cpu_percent": 45.0, + "cpu_cores": 4, + "memory_used_gb": 8.2, + "memory_total_gb": 16.0, + "memory_percent": 51.0, + "disk_used_gb": 120.0, + "disk_total_gb": 500.0, + "disk_percent": 24.0, + "network_down_mb": 2.5, + "network_up_mb": 0.8, + } + + output = MonitorUI.format_system_health(metrics) + assert "CPU:" in output + assert "RAM:" in output + assert "Disk:" in output + assert "Network:" in output + + def test_format_installation_metrics(self): + metrics = { + "cpu_percent": 80.0, + "memory_used_gb": 12.5, + "memory_total_gb": 16.0, + "memory_percent": 78.125, + "disk_used_gb": 2.1, + "disk_total_gb": 3.5, + } + + output = MonitorUI.format_installation_metrics(metrics) + assert "80%" in output + assert "12.5/16.0" in output + assert "2.1/3.5" in output + assert "█" in output + + def test_format_peak_usage(self): + peak = {"cpu_percent": 95.0, "memory_used_gb": 13.2} + assert MonitorUI.format_peak_usage(peak) == "📊 Peak usage: CPU 95%, RAM 13.2 GB" + + def test_format_installation_complete(self): + assert MonitorUI.format_installation_complete() == "✓ Installation complete" + + +class TestLiveMonitorUI: + """Tests for LiveMonitorUI behavior.""" + + def test_get_latest_metrics_empty(self): + monitor = ResourceMonitor() + ui = LiveMonitorUI(monitor) + assert ui._get_latest_metrics() is None + + def test_get_latest_metrics_present(self): + monitor = ResourceMonitor() + monitor.history.append( + { + "cpu_percent": 10.0, + "memory_used_gb": 2.0, + "memory_total_gb": 8.0, + "memory_percent": 25.0, + "disk_used_gb": 10.0, + "disk_total_gb": 100.0, + "disk_percent": 10.0, + } + ) + + ui = LiveMonitorUI(monitor) + metrics = ui._get_latest_metrics() + assert metrics["cpu_percent"] == 10.0 + + def test_render_no_history(self): + monitor = ResourceMonitor() + ui = LiveMonitorUI(monitor) + panel = ui._render() + assert isinstance(panel, Panel) + assert "Collecting metrics" in str(panel.renderable) + + def test_render_with_metrics(self): + monitor = ResourceMonitor() + monitor.history.append( + { + "cpu_percent": 50.0, + "memory_used_gb": 4.0, + "memory_total_gb": 8.0, + "memory_percent": 50.0, + "disk_used_gb": 20.0, + "disk_total_gb": 100.0, + "disk_percent": 20.0, + "network_up_mb": 1.0, + "network_down_mb": 2.0, + } + ) + + ui = LiveMonitorUI(monitor) + panel = ui._render() + content = str(panel.renderable) + + assert "CPU" in content + assert "RAM" in content 
+ assert "Disk" in content + + def test_start_and_stop(self): + monitor = ResourceMonitor() + ui = LiveMonitorUI(monitor) + + with patch("time.sleep", return_value=None): + ui.start() + time.sleep(0.05) + ui.stop() + + assert ui._thread is None diff --git a/tests/monitor/test_resource_monitor.py b/tests/monitor/test_resource_monitor.py new file mode 100644 index 00000000..9b9b73bb --- /dev/null +++ b/tests/monitor/test_resource_monitor.py @@ -0,0 +1,328 @@ +""" +Tests for the ResourceMonitor core monitoring logic. +""" + +import time +from unittest.mock import MagicMock, patch + +import psutil +import pytest + +from cortex.monitor.resource_monitor import ResourceMonitor + + +@pytest.fixture +def monitor(): + return ResourceMonitor(interval=1.0) + + +def test_initial_state(monitor): + """Test that monitor initializes with correct defaults.""" + assert monitor.interval == 1.0 + assert monitor.history == [] + assert all(value == 0.0 for value in monitor.peak_usage.values()) + assert monitor._disk_before is None + assert monitor._net_before is None + + +def test_collect_metrics_basic(monkeypatch, monitor): + """Test metrics collection with mocked psutil calls.""" + monkeypatch.setattr(psutil, "cpu_percent", lambda interval=None: 42.0) + + mock_memory = MagicMock() + mock_memory.used = 8 * 1024**3 + mock_memory.total = 16 * 1024**3 + mock_memory.percent = 50.0 + monkeypatch.setattr(psutil, "virtual_memory", lambda: mock_memory) + + mock_disk = MagicMock() + mock_disk.used = 120 * 1024**3 + mock_disk.total = 500 * 1024**3 + mock_disk.percent = 24.0 + monkeypatch.setattr(psutil, "disk_usage", lambda _: mock_disk) + + mock_disk_io = MagicMock(read_bytes=1000, write_bytes=2000) + monkeypatch.setattr(psutil, "disk_io_counters", lambda: mock_disk_io) + + mock_net = MagicMock(bytes_sent=3000, bytes_recv=4000) + monkeypatch.setattr(psutil, "net_io_counters", lambda: mock_net) + + metrics = monitor.collect_metrics() + + assert metrics["cpu_percent"] == pytest.approx(42.0) + assert metrics["memory_used_gb"] == pytest.approx(8.0) + assert metrics["memory_total_gb"] == pytest.approx(16.0) + assert metrics["memory_percent"] == pytest.approx(50.0) + assert metrics["disk_used_gb"] == pytest.approx(120.0) + assert metrics["disk_total_gb"] == pytest.approx(500.0) + assert metrics["disk_percent"] == pytest.approx(24.0) + + # First sample has zero rates + assert metrics["disk_read_mb"] == 0.0 + assert metrics["disk_write_mb"] == 0.0 + assert metrics["network_up_mb"] == 0.0 + assert metrics["network_down_mb"] == 0.0 + + +def test_collect_metrics_with_previous_values(monkeypatch): + """Test rate calculations when previous values exist.""" + monitor = ResourceMonitor(interval=1.0) + + monitor._disk_before = MagicMock(read_bytes=1000, write_bytes=2000) + monitor._net_before = MagicMock(bytes_sent=3000, bytes_recv=4000) + + monkeypatch.setattr(psutil, "cpu_percent", lambda interval=None: 50.0) + + mock_memory = MagicMock() + mock_memory.used = 8 * 1024**3 + mock_memory.total = 16 * 1024**3 + mock_memory.percent = 50.0 + monkeypatch.setattr(psutil, "virtual_memory", lambda: mock_memory) + + monkeypatch.setattr( + psutil, + "disk_usage", + lambda _: MagicMock(used=120 * 1024**3, total=500 * 1024**3, percent=24.0), + ) + + monkeypatch.setattr( + psutil, + "disk_io_counters", + lambda: MagicMock(read_bytes=1000 + 1024**2, write_bytes=2000 + 1024**2), + ) + + monkeypatch.setattr( + psutil, + "net_io_counters", + lambda: MagicMock(bytes_sent=3000 + 1024**2, bytes_recv=4000 + 1024**2), + ) + + metrics = 
monitor.collect_metrics() + + assert metrics["disk_read_mb"] == pytest.approx(1.0, rel=0.01) + assert metrics["disk_write_mb"] == pytest.approx(1.0, rel=0.01) + assert metrics["network_up_mb"] == pytest.approx(1.0, rel=0.01) + assert metrics["network_down_mb"] == pytest.approx(1.0, rel=0.01) + + +def test_update_and_peak_usage(monitor): + """Test that update() stores metrics and tracks peaks correctly.""" + metrics1 = { + "cpu_percent": 30.0, + "memory_percent": 40.0, + "memory_used_gb": 6.0, + "disk_percent": 10.0, + "disk_used_gb": 50.0, + "disk_read_mb": 1.0, + "disk_write_mb": 2.0, + "network_up_mb": 0.5, + "network_down_mb": 1.5, + } + + metrics2 = { + "cpu_percent": 80.0, + "memory_percent": 70.0, + "memory_used_gb": 12.0, + "disk_percent": 30.0, + "disk_used_gb": 150.0, + "disk_read_mb": 5.0, + "disk_write_mb": 6.0, + "network_up_mb": 2.0, + "network_down_mb": 3.0, + } + + monitor.update(metrics1) + monitor.update(metrics2) + + assert monitor.peak_usage["cpu_percent"] == 80.0 + assert monitor.peak_usage["memory_percent"] == 70.0 + assert monitor.peak_usage["memory_used_gb"] == 12.0 + assert monitor.peak_usage["disk_percent"] == 30.0 + assert monitor.peak_usage["disk_used_gb"] == 150.0 + assert monitor.peak_usage["disk_read_mb"] == 5.0 + assert monitor.peak_usage["network_up_mb"] == 2.0 + + assert len(monitor.history) == 2 + + +def test_sample_adds_history(monkeypatch, monitor): + """Test that sample() collects metrics and updates history.""" + mock_metrics = { + "timestamp": time.time(), + "cpu_percent": 10.0, + "memory_percent": 20.0, + "memory_used_gb": 4.0, + "memory_total_gb": 16.0, + "disk_percent": 5.0, + "disk_used_gb": 30.0, + "disk_total_gb": 500.0, + "disk_read_mb": 0.1, + "disk_write_mb": 0.2, + "network_up_mb": 0.01, + "network_down_mb": 0.02, + } + + monkeypatch.setattr(monitor, "collect_metrics", lambda: mock_metrics) + + metrics = monitor.sample() + + assert len(monitor.history) == 1 + assert metrics == mock_metrics + assert monitor.peak_usage["cpu_percent"] == 10.0 + + +def test_get_summary(monitor): + """Test get_summary() returns raw numeric data.""" + now = time.time() + + monitor.history.append( + { + "timestamp": now, + "cpu_percent": 55.5, + "memory_used_gb": 8.2, + "memory_total_gb": 16.0, + "memory_percent": 51.0, + "disk_used_gb": 120.0, + "disk_total_gb": 500.0, + "disk_percent": 24.0, + "disk_read_mb": 0.0, + "disk_write_mb": 0.0, + "network_up_mb": 0.8, + "network_down_mb": 2.5, + } + ) + + summary = monitor.get_summary() + current = summary["current"] + + assert current["cpu_percent"] == pytest.approx(55.5) + assert current["memory_used_gb"] == pytest.approx(8.2) + assert current["disk_used_gb"] == pytest.approx(120.0) + assert current["network_down_mb"] == pytest.approx(2.5) + assert current["network_up_mb"] == pytest.approx(0.8) + + +def test_get_summary_empty_history(monitor): + """Test get_summary() with empty history.""" + assert monitor.get_summary() == {} + + +def test_get_peak_usage(monitor): + """Test get_peak_usage() returns a copy.""" + monitor.peak_usage["cpu_percent"] = 90.0 + peaks = monitor.get_peak_usage() + + assert peaks["cpu_percent"] == 90.0 + peaks["cpu_percent"] = 0.0 + assert monitor.peak_usage["cpu_percent"] == 90.0 + + +def test_get_history(monitor): + """Test get_history() returns stored history.""" + monitor.history = [{"cpu_percent": 10.0}, {"cpu_percent": 20.0}] + history = monitor.get_history() + + assert len(history) == 2 + assert history[0]["cpu_percent"] == 10.0 + + +def test_clear_history_resets_state(monitor): + """Test 
clear_history() resets all internal state.""" + monitor.history = [{"cpu_percent": 10.0}] + monitor.peak_usage["cpu_percent"] = 90.0 + monitor._disk_before = MagicMock() + monitor._net_before = MagicMock() + + monitor.clear_history() + + assert monitor.history == [] + assert all(value == 0.0 for value in monitor.peak_usage.values()) + assert monitor._disk_before is None + assert monitor._net_before is None + + +def test_monitor_with_duration(monitor): + """Test monitor() respects duration.""" + with patch.object(monitor, "sample") as mock_sample: + with patch("time.time", side_effect=[0.0, 0.5, 1.5, 3.0]): + with patch("time.sleep"): + monitor.monitor(duration=2.0) + + assert mock_sample.call_count == 2 + + +def test_monitor_keyboard_interrupt(monitor): + """Test monitor() stops on KeyboardInterrupt.""" + calls = 0 + + def side_effect(): + nonlocal calls + calls += 1 + if calls == 2: + raise KeyboardInterrupt + + with patch.object(monitor, "sample", side_effect=side_effect): + with patch("time.sleep"): + monitor.monitor() + + assert calls == 2 + + +def test_get_recent_alerts(monitor): + """Test get_recent_alerts() returns only alert samples.""" + monitor.history = [ + {"timestamp": 1000, "alerts": ["CPU alert"], "cpu_percent": 90}, + {"timestamp": 2000, "alerts": [], "cpu_percent": 50}, + {"timestamp": 3000, "alerts": ["Memory alert"], "cpu_percent": 60}, + ] + + alerts = monitor.get_recent_alerts(last_n_samples=3) + assert len(alerts) == 2 + assert alerts[0]["timestamp"] == 1000 + assert alerts[1]["timestamp"] == 3000 + + +def test_get_recommendations(monitor): + """Test recommendations are generated from peak usage.""" + monitor.peak_usage = { + "cpu_percent": 90.0, + "memory_percent": 95.0, + "disk_percent": 10.0, + "network_up_mb": 60.0, + "network_down_mb": 70.0, + } + + recs = monitor.get_recommendations() + + assert any("CPU" in r for r in recs) + assert any("memory" in r.lower() for r in recs) + assert any("network" in r.lower() for r in recs) + + +def test_get_stats(monitor): + """Test get_stats() returns averages and metadata.""" + monitor.history = [ + {"cpu_percent": 10.0, "memory_percent": 20.0, "disk_percent": 30.0, "timestamp": 1000}, + {"cpu_percent": 20.0, "memory_percent": 40.0, "disk_percent": 60.0, "timestamp": 2000}, + ] + + stats = monitor.get_stats() + + assert stats["averages"]["cpu_percent"] == pytest.approx(15.0) + assert stats["averages"]["memory_percent"] == pytest.approx(30.0) + assert stats["samples"] == 2 + + +def test_check_alerts(monitor): + """Test alert detection logic.""" + monitor.cpu_threshold = 80.0 + monitor.memory_threshold = 90.0 + monitor.disk_threshold = 95.0 + + low = {"cpu_percent": 50.0, "memory_percent": 60.0, "disk_percent": 70.0} + assert monitor.check_alerts(low) == [] + + high = {"cpu_percent": 90.0, "memory_percent": 95.0, "disk_percent": 99.0} + alerts = monitor.check_alerts(high) + + assert len(alerts) == 3 diff --git a/tests/test_cli.py b/tests/test_cli.py index bed29ab4..2a26dcae 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -212,7 +212,15 @@ def test_main_install_command(self, mock_install): mock_install.return_value = 0 result = main() self.assertEqual(result, 0) - mock_install.assert_called_once_with("docker", execute=False, dry_run=False, parallel=False) + mock_install.assert_called_once_with( + "docker", + execute=False, + dry_run=False, + parallel=False, + monitor=False, + export=None, + output="installation_monitoring", + ) @patch("sys.argv", ["cortex", "install", "docker", "--execute"]) 
@patch("cortex.cli.CortexCLI.install") @@ -220,7 +228,15 @@ def test_main_install_with_execute(self, mock_install): mock_install.return_value = 0 result = main() self.assertEqual(result, 0) - mock_install.assert_called_once_with("docker", execute=True, dry_run=False, parallel=False) + mock_install.assert_called_once_with( + "docker", + execute=True, + dry_run=False, + parallel=False, + monitor=False, + export=None, + output="installation_monitoring", + ) @patch("sys.argv", ["cortex", "install", "docker", "--dry-run"]) @patch("cortex.cli.CortexCLI.install") @@ -228,7 +244,15 @@ def test_main_install_with_dry_run(self, mock_install): mock_install.return_value = 0 result = main() self.assertEqual(result, 0) - mock_install.assert_called_once_with("docker", execute=False, dry_run=True, parallel=False) + mock_install.assert_called_once_with( + "docker", + execute=False, + dry_run=True, + parallel=False, + monitor=False, + export=None, + output="installation_monitoring", + ) def test_spinner_animation(self): initial_idx = self.cli.spinner_idx diff --git a/tests/test_cli_extended.py b/tests/test_cli_extended.py index 173d7a7d..f9a6ffec 100644 --- a/tests/test_cli_extended.py +++ b/tests/test_cli_extended.py @@ -303,7 +303,15 @@ def test_main_install_command(self, mock_install) -> None: mock_install.return_value = 0 result = main() self.assertEqual(result, 0) - mock_install.assert_called_once_with("docker", execute=False, dry_run=False, parallel=False) + mock_install.assert_called_once_with( + "docker", + execute=False, + dry_run=False, + parallel=False, + monitor=False, + export=None, + output="installation_monitoring", + ) @patch("sys.argv", ["cortex", "install", "docker", "--execute"]) @patch("cortex.cli.CortexCLI.install") @@ -311,7 +319,15 @@ def test_main_install_with_execute(self, mock_install) -> None: mock_install.return_value = 0 result = main() self.assertEqual(result, 0) - mock_install.assert_called_once_with("docker", execute=True, dry_run=False, parallel=False) + mock_install.assert_called_once_with( + "docker", + execute=True, + dry_run=False, + parallel=False, + monitor=False, + export=None, + output="installation_monitoring", + ) @patch("sys.argv", ["cortex", "install", "docker", "--dry-run"]) @patch("cortex.cli.CortexCLI.install") @@ -319,7 +335,15 @@ def test_main_install_with_dry_run(self, mock_install) -> None: mock_install.return_value = 0 result = main() self.assertEqual(result, 0) - mock_install.assert_called_once_with("docker", execute=False, dry_run=True, parallel=False) + mock_install.assert_called_once_with( + "docker", + execute=False, + dry_run=True, + parallel=False, + monitor=False, + export=None, + output="installation_monitoring", + ) def test_spinner_animation(self) -> None: initial_idx = self.cli.spinner_idx