Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 166 additions & 1 deletion runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6278,6 +6278,7 @@ def __init__(self, script_path: str, script_args: Optional[List[str]] = None,
self.max_output_lines = None
self.hooks = ExecutionHook()
self.monitor_interval = 0.1
self.config_file = config_file

# UPDATED: Phase 2 retry config (replaces old retry_count and retry_delay)
self.retry_config = RetryConfig() # Default configuration
Expand All @@ -6291,8 +6292,10 @@ def __init__(self, script_path: str, script_args: Optional[List[str]] = None,
# NEW: Phase 2 features
self.enable_history = enable_history
self.history_manager = None
self.history_db_path = None
if enable_history:
db_path = history_db or 'script_runner_history.db'
self.history_db_path = db_path
self.history_manager = HistoryManager(db_path=db_path)

# NEW: Trend Analysis (Phase 2)
Expand Down Expand Up @@ -6497,6 +6500,30 @@ def validate_script(self) -> bool:
self.logger.warning(f"Script does not have .py extension: {self.script_path}")
return True

def get_execution_plan(self) -> Dict[str, Any]:
"""Return a structured view of how the script will be executed.

This helper is used by the CLI ``--dry-run`` flag to show what the
runner would do without actually launching the subprocess. It surfaces
key configuration such as the script path, arguments, timeouts, logging
level, configuration file, and history database state.

Returns:
Dict[str, Any]: Execution summary including paths and toggles.
"""
return {
'script_path': os.path.abspath(self.script_path),
'script_args': list(self.script_args),
'timeout': self.timeout,
'log_level': logging.getLevelName(self.logger.level),
'config_file': os.path.abspath(self.config_file) if self.config_file else None,
'history_enabled': self.enable_history,
'history_db': os.path.abspath(self.history_db_path) if self.history_db_path else None,
'monitor_interval': self.monitor_interval,
'retry_strategy': self.retry_config.strategy,
'max_attempts': self.retry_config.max_attempts,
}

def run_script(self, retry_on_failure: bool = False) -> Dict:
"""Execute script with advanced retry and monitoring capabilities.

Expand Down Expand Up @@ -7021,6 +7048,129 @@ def estimate_execution_costs(self) -> Optional[Dict]:
self.logger.warning(f"Cost estimation failed: {e}")
return None

# ------------------------------------------------------------------
# General-purpose helpers derived from the previous v7 enhancement
# module. These helpers make the advanced features directly
# accessible from ScriptRunner without requiring a separate wrapper.
# ------------------------------------------------------------------
def pre_execution_security_scan(
self, script_path: Optional[str] = None, block_on_critical: bool = False
) -> Dict[str, Any]:
"""Run code analysis before execution.

Args:
script_path: Optional explicit script path; defaults to runner script.
block_on_critical: Whether to mark the scan as failed when critical
findings are present.

Returns:
Dict[str, Any]: Scan outcome including findings and block status.
"""
target = script_path or self.script_path

if not self.enable_code_analysis or not self.code_analyzer:
return {'success': True, 'findings': []}

try:
result = self.code_analyzer.analyze(target)
critical_findings = getattr(result, 'critical_findings', [])
findings = getattr(result, 'findings', [])

if critical_findings and block_on_critical:
self.logger.error(f"Critical security findings detected in {target}")
return {
'success': False,
'findings': [f.to_dict() if hasattr(f, 'to_dict') else f for f in critical_findings],
'blocked': True,
}

return {
'success': True,
'findings': [f.to_dict() if hasattr(f, 'to_dict') else f for f in findings],
'critical_count': len(critical_findings),
}
except Exception as e:
self.logger.error(f"Security scan error: {e}")
return {'success': False, 'error': str(e)}

def scan_dependencies(self, requirements_file: str = 'requirements.txt') -> Dict[str, Any]:
"""Scan dependencies for vulnerabilities using the configured scanner."""
Copy link

Copilot AI Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring for scan_dependencies is incomplete. It should document the parameters, return value structure, and provide an example similar to other methods in this class.

Consider expanding the docstring to include:

  • Args section describing the requirements_file parameter
  • Returns section describing the dictionary structure with keys like 'success', 'vulnerability_count', 'vulnerabilities', and 'sbom'
  • An example showing typical usage
Suggested change
"""Scan dependencies for vulnerabilities using the configured scanner."""
"""
Scan dependencies for vulnerabilities using the configured scanner.
Args:
requirements_file (str): Path to the requirements file to scan. Defaults to 'requirements.txt'.
Returns:
Dict[str, Any]: Dictionary containing the scan results:
- success (bool): Whether the scan completed successfully.
- vulnerability_count (int): Number of vulnerabilities found.
- vulnerabilities (list): List of vulnerability details (dicts).
- sbom (Any): Software Bill of Materials, if available.
- error (str, optional): Error message if the scan failed.
Example:
>>> runner = ScriptRunner('myscript.py')
>>> result = runner.scan_dependencies('requirements.txt')
>>> if result['success']:
... print(f"Found {result['vulnerability_count']} vulnerabilities")
... for vuln in result['vulnerabilities']:
... print(vuln)
... else:
... print("Scan failed:", result.get('error'))
"""

Copilot uses AI. Check for mistakes.
if not self.enable_dependency_scanning or not self.dependency_scanner:
return {'success': True, 'vulnerabilities': []}

if not os.path.exists(requirements_file):
return {'success': False, 'error': f'{requirements_file} not found'}

try:
result = self.dependency_scanner.scan_requirements(requirements_file)
vulnerabilities = getattr(result, 'vulnerabilities', [])
return {
'success': getattr(result, 'success', True),
'vulnerability_count': len(vulnerabilities),
'vulnerabilities': [v.to_dict() if hasattr(v, 'to_dict') else v for v in vulnerabilities],
'sbom': getattr(result, 'sbom', None),
}
except Exception as e:
self.logger.error(f"Dependency scan error: {e}")
return {'success': False, 'error': str(e)}

def scan_secrets(self, path: str = '.') -> Dict[str, Any]:
"""Scan a path for hardcoded secrets."""
Copy link

Copilot AI Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring for scan_secrets is incomplete. It should document the parameters, return value structure, and provide an example similar to other methods in this class.

Consider expanding the docstring to include:

  • Args section describing the path parameter (what types of paths are accepted - file or directory)
  • Returns section describing the dictionary structure with keys like 'success', 'has_secrets', 'secret_count', and 'secrets'
  • An example showing typical usage
Suggested change
"""Scan a path for hardcoded secrets."""
"""
Scan a path (file or directory) for hardcoded secrets using the configured secret scanner.
Args:
path (str): Path to a file or directory to scan for secrets. Defaults to current directory ('.').
Returns:
dict: Dictionary with the following keys:
- success (bool): True if scan completed successfully, False otherwise.
- has_secrets (bool): True if any secrets were found, False otherwise.
- secret_count (int): Number of secrets found.
- secrets (list): List of secrets found (as dicts).
- error (str, optional): Error message if scan failed.
Example:
>>> runner = ScriptRunner()
>>> result = runner.scan_secrets('my_script.py')
>>> print(result)
{
'success': True,
'has_secrets': False,
'secret_count': 0,
'secrets': []
}
"""

Copilot uses AI. Check for mistakes.
if not self.enable_secret_scanning or not self.secret_scanner:
return {'success': True, 'secrets': []}

try:
if os.path.isfile(path):
result = self.secret_scanner.scan_file(path)
else:
result = self.secret_scanner.scan_directory(path)

secrets = getattr(result, 'secrets', [])
return {
'success': getattr(result, 'success', True),
'has_secrets': getattr(result, 'has_secrets', bool(secrets)),
'secret_count': len(secrets),
'secrets': [s.to_dict() if hasattr(s, 'to_dict') else s for s in secrets],
}
except Exception as e:
self.logger.error(f"Secret scan error: {e}")
return {'success': False, 'error': str(e)}

def start_tracing_span(self, span_name: str):
"""Start a distributed tracing span using the configured tracer."""
Copy link

Copilot AI Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring for start_tracing_span is incomplete. It should document the parameters, return value, and behavior when tracing is disabled.

Consider expanding the docstring to include:

  • Args section describing the span_name parameter
  • Returns section describing what the context manager yields (the span when tracing is enabled, None when disabled)
  • An example showing how to use this as a context manager
Suggested change
"""Start a distributed tracing span using the configured tracer."""
"""
Start a distributed tracing span using the configured tracer.
Args:
span_name (str): The name of the tracing span to start.
Returns:
Context manager that yields the tracing span object if tracing is enabled,
or None if tracing is disabled.
Behavior:
If tracing is enabled and a tracing manager is configured, this returns a context
manager that yields the active tracing span. If tracing is disabled, it returns
a no-op context manager that yields None.
Example:
>>> with runner.start_tracing_span("my_span") as span:
... # code to trace
... if span is not None:
... span.set_tag("key", "value")
"""

Copilot uses AI. Check for mistakes.
if self.tracing_manager:
return self.tracing_manager.trace_span(span_name)

from contextlib import contextmanager

@contextmanager
def noop():
yield None

return noop()

def start_cost_tracking(self) -> None:
"""Begin monitoring execution costs if enabled."""
if self.cost_tracker:
self.cost_tracker.start_monitoring()
self.logger.info("Cost tracking started")

def stop_cost_tracking(self) -> Dict[str, Any]:
"""Stop cost tracking and return a summary report."""
if not self.cost_tracker:
return {}

try:
report = self.cost_tracker.get_cost_report()
return {
'total_estimated_cost_usd': getattr(report, 'total_estimated_cost_usd', 0),
'cost_by_provider': getattr(report, 'cost_by_provider', {}),
'cost_by_service': getattr(report, 'cost_by_service', {}),
}
except Exception as e:
self.logger.error(f"Cost tracking error: {e}")
return {}

def start_execution_tracing(self) -> Optional[Any]:
"""Start OpenTelemetry tracing for script execution.

Expand Down Expand Up @@ -7148,8 +7298,10 @@ def main():
parser.add_argument('script', nargs='?', help='Python script to execute')
parser.add_argument('script_args', nargs='*', help='Arguments to pass to the script')
parser.add_argument('--timeout', type=int, default=None, help='Execution timeout in seconds')
parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
default='INFO', help='Logging level')
parser.add_argument('--dry-run', action='store_true',
help='Validate the script and show execution plan without running it')
parser.add_argument('--config', help='Configuration file (YAML)')
parser.add_argument('--monitor-interval', type=float, default=0.1,
help='Process monitor sampling interval (seconds)')
Expand Down Expand Up @@ -8479,6 +8631,19 @@ def main():
enable_history=not args.disable_history
)

if args.dry_run:
try:
runner.validate_script()
except Exception as exc:
logging.error(f"Dry-run validation failed: {exc}")
return 1

plan = runner.get_execution_plan()
print("\nDRY-RUN: Execution plan (no script executed)")
for key, value in plan.items():
print(f" {key}: {value}")
return 0

runner.monitor_interval = args.monitor_interval
runner.suppress_warnings = args.suppress_warnings

Expand Down
Loading
Loading