From 0d0e68b8e4978015a7999da2e24781b28b39be95 Mon Sep 17 00:00:00 2001 From: Amarjeet LNU Date: Fri, 5 Dec 2025 10:25:24 -0800 Subject: [PATCH] Updating evaluator to append timestamp to function name --- .../src/sagemaker/ai_registry/evaluator.py | 2 +- .../train/common_utils/trainer_wait.py | 233 +++++++++--------- .../train/common_utils/test_trainer_wait.py | 39 --- 3 files changed, 124 insertions(+), 150 deletions(-) diff --git a/sagemaker-train/src/sagemaker/ai_registry/evaluator.py b/sagemaker-train/src/sagemaker/ai_registry/evaluator.py index 10c5406ab0..349705f08d 100644 --- a/sagemaker-train/src/sagemaker/ai_registry/evaluator.py +++ b/sagemaker-train/src/sagemaker/ai_registry/evaluator.py @@ -381,7 +381,7 @@ def _create_lambda_function(cls, name: str, source_file: str, role: Optional[str # Create Lambda function lambda_client = boto3.client("lambda") - function_name = f"SageMaker-evaluator-{name}" + function_name = f"SageMaker-evaluator-{name}-{datetime.now().strftime('%Y%m%d_%H%M%S')}" handler_name = f"{os.path.splitext(os.path.basename(source_file))[0]}.lambda_handler" try: diff --git a/sagemaker-train/src/sagemaker/train/common_utils/trainer_wait.py b/sagemaker-train/src/sagemaker/train/common_utils/trainer_wait.py index 0175b2ba7a..eb276678e4 100644 --- a/sagemaker-train/src/sagemaker/train/common_utils/trainer_wait.py +++ b/sagemaker-train/src/sagemaker/train/common_utils/trainer_wait.py @@ -4,7 +4,9 @@ with progress tracking and MLflow integration. """ +import logging import time +from contextlib import contextmanager from typing import Optional, Tuple from sagemaker.core.resources import TrainingJob @@ -13,6 +15,18 @@ from sagemaker.train.common_utils.mlflow_metrics_util import _MLflowMetricsUtil +@contextmanager +def _suppress_info_logging(): + """Context manager to temporarily suppress INFO level logging.""" + logger = logging.getLogger() + original_level = logger.level + logger.setLevel(logging.WARNING) + try: + yield + finally: + logger.setLevel(original_level) + + def _setup_mlflow_integration(training_job: TrainingJob) -> Tuple[ Optional[str], Optional[_MLflowMetricsUtil], Optional[str]]: """Setup MLflow integration for training job monitoring. @@ -172,124 +186,123 @@ def wait( from rich.panel import Panel from rich.text import Text from rich.console import Group + with _suppress_info_logging(): + console = Console(force_jupyter=True) + + iteration = 0 + while True: + iteration += 1 + time.sleep(poll) + training_job.refresh() + clear_output(wait=True) + + status = training_job.training_job_status + secondary_status = training_job.secondary_status + elapsed = time.time() - start_time + + # Header section with training job name and MLFlow URL + header_table = Table(show_header=False, box=None, padding=(0, 1)) + header_table.add_column("Property", style="cyan bold", width=20) + header_table.add_column("Value", style="white") + header_table.add_row("TrainingJob Name", f"[bold green]{training_job.training_job_name}[/bold green]") + if mlflow_url: + header_table.add_row("MLFlow URL", + f"[link={mlflow_url}][bold bright_blue underline]{mlflow_run_name}(link valid for 5 mins)[/bright_blue bold underline][/link]") + + status_table = Table(show_header=False, box=None, padding=(0, 1)) + status_table.add_column("Property", style="cyan bold", width=20) + status_table.add_column("Value", style="white") + + status_table.add_row("Job Status", f"[bold][orange3]{status}[/][/]") + status_table.add_row("Secondary Status", f"[bold yellow]{secondary_status}[/bold yellow]") + status_table.add_row("Elapsed Time", f"[bold bright_red]{elapsed:.1f}s[/bold bright_red]") + + failure_reason = training_job.failure_reason + if failure_reason and not _is_unassigned_attribute(failure_reason): + status_table.add_row("Failure Reason", f"[bright_red]{failure_reason}[/bright_red]") + + # Calculate training progress + training_progress_pct = None + training_progress_text = "" + if secondary_status == "Training" and training_job.progress_info: + if not progress_started: + progress_started = True + time.sleep(poll) + training_job.refresh() + + training_progress_pct, training_progress_text = _calculate_training_progress( + training_job.progress_info, metrics_util, mlflow_run_name, training_job + ) + + # Build transitions table if available + transitions_table = None + if training_job.secondary_status_transitions: + from rich.box import SIMPLE + transitions_table = Table(show_header=True, header_style="bold magenta", box=SIMPLE, padding=(0, 1)) + transitions_table.add_column("", style="green", width=2) + transitions_table.add_column("Step", style="cyan", width=15) + transitions_table.add_column("Details", style="orange3", width=35) + transitions_table.add_column("Duration", style="green", width=12) + + for trans in training_job.secondary_status_transitions: + duration, check = _calculate_transition_duration(trans) + + # Add progress bar for Training step + if trans.status == "Training" and training_progress_pct is not None: + bar = f"[green][{'█' * int(training_progress_pct / 5)}{'░' * (20 - int(training_progress_pct / 5))}][/green] {training_progress_pct:.1f}% {training_progress_text}" + transitions_table.add_row(check, trans.status, bar, duration) + else: + transitions_table.add_row(check, trans.status, trans.status_message or "", duration) + + # Prepare metrics table for terminal states + metrics_table = None + if status in ["Completed", "Failed", "Stopped"]: + try: + steps_per_epoch = training_job.progress_info.total_step_count_per_epoch + loss_metrics_by_epoch = metrics_util._get_loss_metrics_by_epoch(run_name=mlflow_run_name, + steps_per_epoch=steps_per_epoch) + if loss_metrics_by_epoch: + metrics_table = Table(show_header=True, header_style="bold magenta", box=SIMPLE, + padding=(0, 1)) + metrics_table.add_column("Epochs", style="cyan", width=8) + metrics_table.add_column("Loss Metrics", style="white") - console = Console(force_jupyter=True) - - iteration = 0 - while True: - iteration += 1 - time.sleep(poll) - training_job.refresh() - clear_output(wait=False) - - status = training_job.training_job_status - secondary_status = training_job.secondary_status - elapsed = time.time() - start_time - - # Header section with training job name and MLFlow URL - header_table = Table(show_header=False, box=None, padding=(0, 1)) - header_table.add_column("Property", style="cyan bold", width=20) - header_table.add_column("Value", style="white") - header_table.add_row("TrainingJob Name", f"[bold green]{training_job.training_job_name}[/bold green]") - if mlflow_url: - header_table.add_row("MLFlow URL", - f"[link={mlflow_url}][bold bright_blue underline]{mlflow_run_name}(link valid for 5 mins)[/bright_blue bold underline][/link]") - - status_table = Table(show_header=False, box=None, padding=(0, 1)) - status_table.add_column("Property", style="cyan bold", width=20) - status_table.add_column("Value", style="white") - - status_table.add_row("Job Status", f"[bold][orange3]{status}[/][/]") - status_table.add_row("Secondary Status", f"[bold yellow]{secondary_status}[/bold yellow]") - status_table.add_row("Elapsed Time", f"[bold bright_red]{elapsed:.1f}s[/bold bright_red]") - - failure_reason = training_job.failure_reason - if failure_reason and not _is_unassigned_attribute(failure_reason): - status_table.add_row("Failure Reason", f"[bright_red]{failure_reason}[/bright_red]") - - # Calculate training progress - training_progress_pct = None - training_progress_text = "" - if secondary_status == "Training" and training_job.progress_info: - if not progress_started: - progress_started = True - time.sleep(10) - continue - # training_job.refresh() - - training_progress_pct, training_progress_text = _calculate_training_progress( - training_job.progress_info, metrics_util, mlflow_run_name, training_job - ) - - # Build transitions table if available - transitions_table = None - if training_job.secondary_status_transitions: - from rich.box import SIMPLE - transitions_table = Table(show_header=True, header_style="bold magenta", box=SIMPLE, padding=(0, 1)) - transitions_table.add_column("", style="green", width=2) - transitions_table.add_column("Step", style="cyan", width=15) - transitions_table.add_column("Details", style="orange3", width=35) - transitions_table.add_column("Duration", style="green", width=12) - - for trans in training_job.secondary_status_transitions: - duration, check = _calculate_transition_duration(trans) + for epoch, metrics in list(loss_metrics_by_epoch.items())[:-1]: + metrics_str = ", ".join([f"{k}: {v:.6f}" for k, v in metrics.items()]) + metrics_table.add_row(str(epoch + 1), metrics_str, style="yellow") + except Exception: + pass - # Add progress bar for Training step - if trans.status == "Training" and training_progress_pct is not None: - bar = f"[green][{'█' * int(training_progress_pct / 5)}{'░' * (20 - int(training_progress_pct / 5))}][/green] {training_progress_pct:.1f}% {training_progress_text}" - transitions_table.add_row(check, trans.status, bar, duration) + # Build combined group with metrics if available + if training_job.secondary_status_transitions: + if metrics_table: + combined = Group(header_table, Text(""), status_table, Text(""), + Text("Status Transitions", style="bold magenta"), transitions_table, Text(""), + Text("Loss Metrics by Epoch", style="bold magenta"), metrics_table) else: - transitions_table.add_row(check, trans.status, trans.status_message or "", duration) - - # Prepare metrics table for terminal states - metrics_table = None - if status in ["Completed", "Failed", "Stopped"]: - try: - steps_per_epoch = training_job.progress_info.total_step_count_per_epoch - loss_metrics_by_epoch = metrics_util._get_loss_metrics_by_epoch(run_name=mlflow_run_name, - steps_per_epoch=steps_per_epoch) - if loss_metrics_by_epoch: - metrics_table = Table(show_header=True, header_style="bold magenta", box=SIMPLE, - padding=(0, 1)) - metrics_table.add_column("Epochs", style="cyan", width=8) - metrics_table.add_column("Loss Metrics", style="white") - - for epoch, metrics in list(loss_metrics_by_epoch.items())[:-1]: - metrics_str = ", ".join([f"{k}: {v:.6f}" for k, v in metrics.items()]) - metrics_table.add_row(str(epoch + 1), metrics_str, style="yellow") - except Exception: - pass - - # Build combined group with metrics if available - if training_job.secondary_status_transitions: - if metrics_table: - combined = Group(header_table, Text(""), status_table, Text(""), - Text("Status Transitions", style="bold magenta"), transitions_table, Text(""), - Text("Loss Metrics by Epoch", style="bold magenta"), metrics_table) - else: - combined = Group(header_table, Text(""), status_table, Text(""), - Text("Status Transitions", style="bold magenta"), transitions_table) - else: - if metrics_table: - combined = Group(header_table, Text(""), status_table, Text(""), - Text("Loss Metrics by Epoch", style="bold magenta"), metrics_table) + combined = Group(header_table, Text(""), status_table, Text(""), + Text("Status Transitions", style="bold magenta"), transitions_table) else: - combined = Group(header_table, Text(""), status_table) + if metrics_table: + combined = Group(header_table, Text(""), status_table, Text(""), + Text("Loss Metrics by Epoch", style="bold magenta"), metrics_table) + else: + combined = Group(header_table, Text(""), status_table) - panel_width = 80 - if console.width and not _is_unassigned_attribute(console.width): - panel_width = int(console.width * 0.8) - console.print(Panel(combined, title="[bold bright_blue]Training Job Status[/bold bright_blue]", - border_style="orange3", width=panel_width)) + panel_width = 80 + if console.width and not _is_unassigned_attribute(console.width): + panel_width = int(console.width * 0.8) + console.print(Panel(combined, title="[bold bright_blue]Training Job Status[/bold bright_blue]", + border_style="orange3", width=panel_width)) - if status in ["Completed", "Failed", "Stopped"]: - return + if status in ["Completed", "Failed", "Stopped"]: + return - if status == "Failed" or (failure_reason and not _is_unassigned_attribute(failure_reason)): - raise FailedStatusError(resource_type="TrainingJob", status=status, reason=failure_reason) + if status == "Failed" or (failure_reason and not _is_unassigned_attribute(failure_reason)): + raise FailedStatusError(resource_type="TrainingJob", status=status, reason=failure_reason) - if timeout and elapsed >= timeout: - raise TimeoutExceededError(resouce_type="TrainingJob", status=status) + if timeout and elapsed >= timeout: + raise TimeoutExceededError(resouce_type="TrainingJob", status=status) else: print(f"\nTrainingJob Name: {training_job.training_job_name}") diff --git a/sagemaker-train/tests/unit/train/common_utils/test_trainer_wait.py b/sagemaker-train/tests/unit/train/common_utils/test_trainer_wait.py index ff4672ecd0..7bcff8fa0c 100644 --- a/sagemaker-train/tests/unit/train/common_utils/test_trainer_wait.py +++ b/sagemaker-train/tests/unit/train/common_utils/test_trainer_wait.py @@ -99,45 +99,6 @@ def test_mlflow_setup_no_config(self): assert metrics_util is None assert mlflow_run_name is None - -class TestIsJupyterEnvironment: - """Test cases for _is_jupyter_environment function.""" - - @patch('IPython.get_ipython') - def test_jupyter_environment_true(self, mock_get_ipython): - """Test detection of Jupyter environment.""" - mock_ipython = MagicMock() - mock_ipython.config = {'IPKernelApp': {}} - mock_get_ipython.return_value = mock_ipython - - result = _is_jupyter_environment() - assert result is True - - @patch('IPython.get_ipython') - def test_jupyter_environment_false_no_ipkernel(self, mock_get_ipython): - """Test non-Jupyter environment without IPKernelApp.""" - mock_ipython = MagicMock() - mock_ipython.config = {} - mock_get_ipython.return_value = mock_ipython - - result = _is_jupyter_environment() - assert result is False - - @patch('IPython.get_ipython') - def test_jupyter_environment_false_no_ipython(self, mock_get_ipython): - """Test non-Jupyter environment without IPython.""" - mock_get_ipython.return_value = None - - result = _is_jupyter_environment() - assert result is False - - def test_jupyter_environment_import_error(self): - """Test ImportError when IPython is not available.""" - with patch('builtins.__import__', side_effect=ImportError): - result = _is_jupyter_environment() - assert result is False - - class TestIsUnassignedAttribute: """Test cases for _is_unassigned_attribute function."""