diff --git a/runner.py b/runner.py index b79e7d8..90e8920 100644 --- a/runner.py +++ b/runner.py @@ -6278,6 +6278,7 @@ def __init__(self, script_path: str, script_args: Optional[List[str]] = None, self.max_output_lines = None self.hooks = ExecutionHook() self.monitor_interval = 0.1 + self.config_file = config_file # UPDATED: Phase 2 retry config (replaces old retry_count and retry_delay) self.retry_config = RetryConfig() # Default configuration @@ -6291,8 +6292,10 @@ def __init__(self, script_path: str, script_args: Optional[List[str]] = None, # NEW: Phase 2 features self.enable_history = enable_history self.history_manager = None + self.history_db_path = None if enable_history: db_path = history_db or 'script_runner_history.db' + self.history_db_path = db_path self.history_manager = HistoryManager(db_path=db_path) # NEW: Trend Analysis (Phase 2) @@ -6497,6 +6500,30 @@ def validate_script(self) -> bool: self.logger.warning(f"Script does not have .py extension: {self.script_path}") return True + def get_execution_plan(self) -> Dict[str, Any]: + """Return a structured view of how the script will be executed. + + This helper is used by the CLI ``--dry-run`` flag to show what the + runner would do without actually launching the subprocess. It surfaces + key configuration such as the script path, arguments, timeouts, logging + level, configuration file, and history database state. + + Returns: + Dict[str, Any]: Execution summary including paths and toggles. + """ + return { + 'script_path': os.path.abspath(self.script_path), + 'script_args': list(self.script_args), + 'timeout': self.timeout, + 'log_level': logging.getLevelName(self.logger.level), + 'config_file': os.path.abspath(self.config_file) if self.config_file else None, + 'history_enabled': self.enable_history, + 'history_db': os.path.abspath(self.history_db_path) if self.history_db_path else None, + 'monitor_interval': self.monitor_interval, + 'retry_strategy': self.retry_config.strategy, + 'max_attempts': self.retry_config.max_attempts, + } + def run_script(self, retry_on_failure: bool = False) -> Dict: """Execute script with advanced retry and monitoring capabilities. @@ -7021,6 +7048,129 @@ def estimate_execution_costs(self) -> Optional[Dict]: self.logger.warning(f"Cost estimation failed: {e}") return None + # ------------------------------------------------------------------ + # General-purpose helpers derived from the previous v7 enhancement + # module. These helpers make the advanced features directly + # accessible from ScriptRunner without requiring a separate wrapper. + # ------------------------------------------------------------------ + def pre_execution_security_scan( + self, script_path: Optional[str] = None, block_on_critical: bool = False + ) -> Dict[str, Any]: + """Run code analysis before execution. + + Args: + script_path: Optional explicit script path; defaults to runner script. + block_on_critical: Whether to mark the scan as failed when critical + findings are present. + + Returns: + Dict[str, Any]: Scan outcome including findings and block status. + """ + target = script_path or self.script_path + + if not self.enable_code_analysis or not self.code_analyzer: + return {'success': True, 'findings': []} + + try: + result = self.code_analyzer.analyze(target) + critical_findings = getattr(result, 'critical_findings', []) + findings = getattr(result, 'findings', []) + + if critical_findings and block_on_critical: + self.logger.error(f"Critical security findings detected in {target}") + return { + 'success': False, + 'findings': [f.to_dict() if hasattr(f, 'to_dict') else f for f in critical_findings], + 'blocked': True, + } + + return { + 'success': True, + 'findings': [f.to_dict() if hasattr(f, 'to_dict') else f for f in findings], + 'critical_count': len(critical_findings), + } + except Exception as e: + self.logger.error(f"Security scan error: {e}") + return {'success': False, 'error': str(e)} + + def scan_dependencies(self, requirements_file: str = 'requirements.txt') -> Dict[str, Any]: + """Scan dependencies for vulnerabilities using the configured scanner.""" + if not self.enable_dependency_scanning or not self.dependency_scanner: + return {'success': True, 'vulnerabilities': []} + + if not os.path.exists(requirements_file): + return {'success': False, 'error': f'{requirements_file} not found'} + + try: + result = self.dependency_scanner.scan_requirements(requirements_file) + vulnerabilities = getattr(result, 'vulnerabilities', []) + return { + 'success': getattr(result, 'success', True), + 'vulnerability_count': len(vulnerabilities), + 'vulnerabilities': [v.to_dict() if hasattr(v, 'to_dict') else v for v in vulnerabilities], + 'sbom': getattr(result, 'sbom', None), + } + except Exception as e: + self.logger.error(f"Dependency scan error: {e}") + return {'success': False, 'error': str(e)} + + def scan_secrets(self, path: str = '.') -> Dict[str, Any]: + """Scan a path for hardcoded secrets.""" + if not self.enable_secret_scanning or not self.secret_scanner: + return {'success': True, 'secrets': []} + + try: + if os.path.isfile(path): + result = self.secret_scanner.scan_file(path) + else: + result = self.secret_scanner.scan_directory(path) + + secrets = getattr(result, 'secrets', []) + return { + 'success': getattr(result, 'success', True), + 'has_secrets': getattr(result, 'has_secrets', bool(secrets)), + 'secret_count': len(secrets), + 'secrets': [s.to_dict() if hasattr(s, 'to_dict') else s for s in secrets], + } + except Exception as e: + self.logger.error(f"Secret scan error: {e}") + return {'success': False, 'error': str(e)} + + def start_tracing_span(self, span_name: str): + """Start a distributed tracing span using the configured tracer.""" + if self.tracing_manager: + return self.tracing_manager.trace_span(span_name) + + from contextlib import contextmanager + + @contextmanager + def noop(): + yield None + + return noop() + + def start_cost_tracking(self) -> None: + """Begin monitoring execution costs if enabled.""" + if self.cost_tracker: + self.cost_tracker.start_monitoring() + self.logger.info("Cost tracking started") + + def stop_cost_tracking(self) -> Dict[str, Any]: + """Stop cost tracking and return a summary report.""" + if not self.cost_tracker: + return {} + + try: + report = self.cost_tracker.get_cost_report() + return { + 'total_estimated_cost_usd': getattr(report, 'total_estimated_cost_usd', 0), + 'cost_by_provider': getattr(report, 'cost_by_provider', {}), + 'cost_by_service': getattr(report, 'cost_by_service', {}), + } + except Exception as e: + self.logger.error(f"Cost tracking error: {e}") + return {} + def start_execution_tracing(self) -> Optional[Any]: """Start OpenTelemetry tracing for script execution. @@ -7148,8 +7298,10 @@ def main(): parser.add_argument('script', nargs='?', help='Python script to execute') parser.add_argument('script_args', nargs='*', help='Arguments to pass to the script') parser.add_argument('--timeout', type=int, default=None, help='Execution timeout in seconds') - parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], + parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO', help='Logging level') + parser.add_argument('--dry-run', action='store_true', + help='Validate the script and show execution plan without running it') parser.add_argument('--config', help='Configuration file (YAML)') parser.add_argument('--monitor-interval', type=float, default=0.1, help='Process monitor sampling interval (seconds)') @@ -8479,6 +8631,19 @@ def main(): enable_history=not args.disable_history ) + if args.dry_run: + try: + runner.validate_script() + except Exception as exc: + logging.error(f"Dry-run validation failed: {exc}") + return 1 + + plan = runner.get_execution_plan() + print("\nDRY-RUN: Execution plan (no script executed)") + for key, value in plan.items(): + print(f" {key}: {value}") + return 0 + runner.monitor_interval = args.monitor_interval runner.suppress_warnings = args.suppress_warnings diff --git a/runners/v7_enhancement.py b/runners/v7_enhancement.py deleted file mode 100644 index e4e6272..0000000 --- a/runners/v7_enhancement.py +++ /dev/null @@ -1,291 +0,0 @@ -""" -Python Script Runner v7.0 - ScriptRunner Enhancement with v7 Features - -This module integrates all v7.0 features (workflows, tracing, security, costs) -seamlessly into the existing ScriptRunner class while maintaining 100% backward -compatibility. - -Features: -- Workflow Engine integration -- OpenTelemetry distributed tracing -- Automated security scanning -- Dependency vulnerability scanning -- Secret detection -- Multi-cloud cost tracking -""" - -import os -import sys -import logging -from typing import Dict, List, Optional, Any -from pathlib import Path - -# Import v7 features -try: - from runners.workflows.workflow_engine import WorkflowEngine - from runners.tracers.otel_manager import TracingManager - from runners.scanners.code_analyzer import CodeAnalyzer - from runners.scanners.dependency_scanner import DependencyVulnerabilityScanner - from runners.security.secret_scanner import SecretScanner - from runners.integrations.cloud_cost_tracker import CloudCostTracker - V7_FEATURES_AVAILABLE = True -except ImportError as e: - V7_FEATURES_AVAILABLE = False - print(f"Warning: v7 features not fully available: {e}") - - -logger = logging.getLogger(__name__) - - -class V7ScriptRunnerEnhancer: - """Enhances ScriptRunner with v7.0 features while maintaining backward compatibility""" - - def __init__(self, script_runner, config: Optional[Dict[str, Any]] = None): - """Initialize enhancer with existing ScriptRunner instance - - Args: - script_runner: Existing ScriptRunner instance - config: Configuration dict for v7 features - """ - self.runner = script_runner - self.config = config or {} - self.logger = logging.getLogger(__name__) - - # Initialize v7 feature managers - self.workflow_engine = None - self.tracing_manager = None - self.code_analyzer = None - self.dependency_scanner = None - self.secret_scanner = None - self.cost_tracker = None - - # Feature flags - self.enable_workflows = self.config.get('workflows', {}).get('enabled', False) - self.enable_tracing = self.config.get('tracing', {}).get('enabled', False) - self.enable_security = self.config.get('security', {}).get('enabled', False) - self.enable_costs = self.config.get('costs', {}).get('enabled', False) - - self._initialize_features() - - def _initialize_features(self): - """Initialize all enabled v7 features""" - if not V7_FEATURES_AVAILABLE: - self.logger.warning("v7 features not available") - return - - try: - # Initialize workflow engine - if self.enable_workflows: - self.workflow_engine = WorkflowEngine() - self.logger.info("✓ Workflow Engine initialized") - - # Initialize tracing - if self.enable_tracing: - tracing_config = self.config.get('tracing', {}) - self.tracing_manager = TracingManager( - service_name=tracing_config.get('service_name', 'script_runner'), - exporter_type=tracing_config.get('exporter_type', 'jaeger'), - sampling_rate=tracing_config.get('sampling_rate', 0.1) - ) - self.logger.info("✓ Tracing Manager initialized") - - # Initialize security scanning - if self.enable_security: - self.code_analyzer = CodeAnalyzer() - self.dependency_scanner = DependencyVulnerabilityScanner() - self.secret_scanner = SecretScanner() - self.logger.info("✓ Security scanners initialized") - - # Initialize cost tracking - if self.enable_costs: - self.cost_tracker = CloudCostTracker() - self.logger.info("✓ Cost tracker initialized") - - except Exception as e: - self.logger.error(f"Error initializing v7 features: {e}") - - def pre_execution_security_scan(self, script_path: str) -> Dict[str, Any]: - """Run pre-execution security scanning - - Args: - script_path: Path to script to scan - - Returns: - Dict with security findings - """ - if not self.enable_security or not self.code_analyzer: - return {'success': True, 'findings': []} - - try: - # Scan the script for vulnerabilities - result = self.code_analyzer.analyze(script_path) - - # Check for critical findings - if result.critical_findings and self.config.get('security', {}).get('block_on_critical', False): - self.logger.error(f"Critical security findings detected in {script_path}") - return { - 'success': False, - 'findings': [f.to_dict() for f in result.critical_findings], - 'blocked': True - } - - return { - 'success': True, - 'findings': [f.to_dict() for f in result.findings], - 'critical_count': len(result.critical_findings) - } - except Exception as e: - self.logger.error(f"Security scan error: {e}") - return {'success': False, 'error': str(e)} - - def scan_dependencies(self, requirements_file: str = 'requirements.txt') -> Dict[str, Any]: - """Scan project dependencies for vulnerabilities - - Args: - requirements_file: Path to requirements.txt - - Returns: - Dict with vulnerability findings - """ - if not self.enable_security or not self.dependency_scanner: - return {'success': True, 'vulnerabilities': []} - - if not os.path.exists(requirements_file): - return {'success': False, 'error': f'{requirements_file} not found'} - - try: - result = self.dependency_scanner.scan_requirements(requirements_file) - return { - 'success': result.success, - 'vulnerability_count': len(result.vulnerabilities), - 'vulnerabilities': [v.to_dict() for v in result.vulnerabilities], - 'sbom': result.sbom if hasattr(result, 'sbom') else None - } - except Exception as e: - self.logger.error(f"Dependency scan error: {e}") - return {'success': False, 'error': str(e)} - - def scan_secrets(self, path: str = '.') -> Dict[str, Any]: - """Scan for hardcoded secrets - - Args: - path: Path to scan (file or directory) - - Returns: - Dict with detected secrets - """ - if not self.enable_security or not self.secret_scanner: - return {'success': True, 'secrets': []} - - try: - if os.path.isfile(path): - result = self.secret_scanner.scan_file(path) - else: - result = self.secret_scanner.scan_directory(path) - - return { - 'success': result.success if hasattr(result, 'success') else True, - 'has_secrets': result.has_secrets if hasattr(result, 'has_secrets') else False, - 'secret_count': len(result.secrets) if hasattr(result, 'secrets') else 0, - 'secrets': [s.to_dict() for s in result.secrets] if hasattr(result, 'secrets') else [] - } - except Exception as e: - self.logger.error(f"Secret scan error: {e}") - return {'success': False, 'error': str(e)} - - def start_tracing_span(self, span_name: str): - """Start a distributed tracing span - - Args: - span_name: Name of the span - - Returns: - Context manager for the span - """ - if self.tracing_manager: - return self.tracing_manager.trace_span(span_name) - else: - # Return no-op context manager - from contextlib import contextmanager - @contextmanager - def noop(): - yield None - return noop() - - def start_cost_tracking(self): - """Start cloud cost tracking""" - if self.cost_tracker: - self.cost_tracker.start_monitoring() - self.logger.info("Cost tracking started") - - def stop_cost_tracking(self) -> Dict[str, Any]: - """Stop cost tracking and get cost report - - Returns: - Dict with cost analysis - """ - if not self.cost_tracker: - return {} - - try: - report = self.cost_tracker.get_cost_report() - return { - 'total_estimated_cost_usd': report.total_estimated_cost_usd if hasattr(report, 'total_estimated_cost_usd') else 0, - 'cost_by_provider': report.cost_by_provider if hasattr(report, 'cost_by_provider') else {}, - 'cost_by_service': report.cost_by_service if hasattr(report, 'cost_by_service') else {} - } - except Exception as e: - self.logger.error(f"Cost tracking error: {e}") - return {} - - -def enhance_script_runner(runner, config: Optional[Dict[str, Any]] = None) -> V7ScriptRunnerEnhancer: - """Enhance existing ScriptRunner instance with v7 features - - Args: - runner: ScriptRunner instance - config: Configuration dict for v7 features - - Returns: - V7ScriptRunnerEnhancer instance - - Example: - >>> from runner import ScriptRunner - >>> runner = ScriptRunner('script.py') - >>> v7_enhancer = enhance_script_runner(runner, { - ... 'workflows': {'enabled': True}, - ... 'tracing': {'enabled': True, 'sampling_rate': 0.1}, - ... 'security': {'enabled': True, 'block_on_critical': True}, - ... 'costs': {'enabled': True} - ... }) - >>> v7_enhancer.pre_execution_security_scan('script.py') - """ - return V7ScriptRunnerEnhancer(runner, config) - - -def load_v7_config(config_file: str) -> Dict[str, Any]: - """Load v7 feature configuration from YAML file - - Args: - config_file: Path to config.yaml - - Returns: - Configuration dict - """ - try: - import yaml - except ImportError: - logger.warning("PyYAML not installed, using default config") - return {} - - if not os.path.exists(config_file): - logger.warning(f"Config file {config_file} not found") - return {} - - try: - with open(config_file, 'r') as f: - config = yaml.safe_load(f) or {} - return config - except Exception as e: - logger.error(f"Error loading config: {e}") - return {} diff --git a/tests/test_integration.py b/tests/test_integration.py index a5bf15a..b42638f 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -15,6 +15,7 @@ import tempfile import json import time +import subprocess from pathlib import Path from unittest.mock import Mock, patch @@ -66,16 +67,33 @@ def test_history_database_creation(self, tmp_path): """Test that history database is properly created""" script_file = tmp_path / "test_db.py" script_file.write_text("print('test'); exit(0)") - + db_file = tmp_path / "test.db" runner = ScriptRunner(str(script_file), enable_history=True) - + result = runner.run_script() - + # Check if metrics are collected assert 'metrics' in result assert len(result['metrics']) > 0 + def test_cli_dry_run_shows_execution_plan(self, tmp_path): + """Ensure CLI dry-run validates script and prints plan without running it.""" + script_file = tmp_path / "dry_run_target.py" + script_file.write_text("print('should not run during dry-run')") + + result = subprocess.run( + [sys.executable, "-m", "runner", str(script_file), "--dry-run", "--timeout", "3"], + capture_output=True, + text=True, + check=False, + ) + + assert result.returncode == 0 + assert "DRY-RUN: Execution plan" in result.stdout + assert "dry_run_target.py" in result.stdout + assert "timeout: 3" in result.stdout + @pytest.mark.integration class TestAlertIntegration: diff --git a/tests/test_runner_core.py b/tests/test_runner_core.py index 53eb81f..e25d99b 100644 --- a/tests/test_runner_core.py +++ b/tests/test_runner_core.py @@ -60,11 +60,35 @@ def test_runner_with_history(self, tmp_path): script_file = tmp_path / "test.py" script_file.write_text("print('hello')") db_file = tmp_path / "history.db" - + runner = ScriptRunner(str(script_file), enable_history=True, history_db=str(db_file)) - + assert runner.enable_history is True + def test_execution_plan_summary(self, tmp_path): + """Ensure execution plan surfaces key configuration without running script.""" + script_file = tmp_path / "plan.py" + script_file.write_text("print('dry run')") + db_file = tmp_path / "history.db" + + runner = ScriptRunner( + str(script_file), + script_args=["--flag", "value"], + timeout=5, + history_db=str(db_file), + enable_history=True, + log_level="DEBUG", + ) + + plan = runner.get_execution_plan() + + assert plan["script_path"].endswith("plan.py") + assert plan["script_args"] == ["--flag", "value"] + assert plan["timeout"] == 5 + assert plan["history_enabled"] is True + assert plan["history_db"].endswith("history.db") + assert plan["log_level"] == "DEBUG" + @pytest.mark.unit class TestScriptExecution: diff --git a/tests/unit/test_runner.py b/tests/unit/test_runner.py index 132ea26..25b135e 100644 --- a/tests/unit/test_runner.py +++ b/tests/unit/test_runner.py @@ -400,34 +400,40 @@ def test_script_syntax_error_handling(self, tmp_path): class TestV7FeatureIntegration: """Test integration with v7 features""" - - def test_v7_enhancement_available(self, tmp_path): - """Test v7 enhancement loading""" - try: - from runners.v7_enhancement import enhance_script_runner - script_file = tmp_path / "test.py" - script_file.write_text("print('test')\nexit(0)") - - runner = ScriptRunner(str(script_file)) - # Should be able to enhance - assert runner is not None - except ImportError: - pytest.skip("v7 features not available") - - @patch('runners.v7_enhancement.V7ScriptRunnerEnhancer') - def test_v7_security_scanning(self, mock_enhancer, tmp_path): - """Test v7 security scanning integration""" + + def test_v7_features_exposed_directly(self, tmp_path): + """Runner should expose v7 helpers without a separate enhancer.""" script_file = tmp_path / "test.py" script_file.write_text("print('test')\nexit(0)") - + runner = ScriptRunner(str(script_file)) - - # Mock enhancer - enhancer_instance = Mock() - mock_enhancer.return_value = enhancer_instance - - # Should be able to use enhancer - assert runner is not None + + assert hasattr(runner, 'start_tracing_span') + assert hasattr(runner, 'start_cost_tracking') + assert runner.pre_execution_security_scan()['success'] is True + + def test_pre_execution_security_scan_blocks_on_critical(self, tmp_path): + """Security scan helper should block when critical findings are present.""" + script_file = tmp_path / "test.py" + script_file.write_text("print('test')\nexit(0)") + + runner = ScriptRunner(str(script_file)) + + class FakeFinding: + def to_dict(self): + return {'id': 'C1'} + + class FakeResult: + findings = [FakeFinding()] + critical_findings = [FakeFinding()] + + runner.code_analyzer = Mock(analyze=Mock(return_value=FakeResult())) + runner.enable_code_analysis = True + + scan_result = runner.pre_execution_security_scan(block_on_critical=True) + + assert scan_result['success'] is False + assert scan_result['blocked'] is True class TestIntegration: