Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sagemaker-train/src/sagemaker/ai_registry/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def _create_lambda_function(cls, name: str, source_file: str, role: Optional[str

# Create Lambda function
lambda_client = boto3.client("lambda")
function_name = f"SageMaker-evaluator-{name}"
function_name = f"SageMaker-evaluator-{name}-{datetime.now().strftime('%Y%m%d_%H%M%S')}"
handler_name = f"{os.path.splitext(os.path.basename(source_file))[0]}.lambda_handler"

try:
Expand Down
233 changes: 123 additions & 110 deletions sagemaker-train/src/sagemaker/train/common_utils/trainer_wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
with progress tracking and MLflow integration.
"""

import logging
import time
from contextlib import contextmanager
from typing import Optional, Tuple

from sagemaker.core.resources import TrainingJob
Expand All @@ -13,6 +15,18 @@
from sagemaker.train.common_utils.mlflow_metrics_util import _MLflowMetricsUtil


@contextmanager
def _suppress_info_logging():
"""Context manager to temporarily suppress INFO level logging."""
logger = logging.getLogger()
original_level = logger.level
logger.setLevel(logging.WARNING)
try:
yield
finally:
logger.setLevel(original_level)


def _setup_mlflow_integration(training_job: TrainingJob) -> Tuple[
Optional[str], Optional[_MLflowMetricsUtil], Optional[str]]:
"""Setup MLflow integration for training job monitoring.
Expand Down Expand Up @@ -172,124 +186,123 @@ def wait(
from rich.panel import Panel
from rich.text import Text
from rich.console import Group
with _suppress_info_logging():
console = Console(force_jupyter=True)

iteration = 0
while True:
iteration += 1
time.sleep(poll)
training_job.refresh()
clear_output(wait=True)

status = training_job.training_job_status
secondary_status = training_job.secondary_status
elapsed = time.time() - start_time

# Header section with training job name and MLFlow URL
header_table = Table(show_header=False, box=None, padding=(0, 1))
header_table.add_column("Property", style="cyan bold", width=20)
header_table.add_column("Value", style="white")
header_table.add_row("TrainingJob Name", f"[bold green]{training_job.training_job_name}[/bold green]")
if mlflow_url:
header_table.add_row("MLFlow URL",
f"[link={mlflow_url}][bold bright_blue underline]{mlflow_run_name}(link valid for 5 mins)[/bright_blue bold underline][/link]")

status_table = Table(show_header=False, box=None, padding=(0, 1))
status_table.add_column("Property", style="cyan bold", width=20)
status_table.add_column("Value", style="white")

status_table.add_row("Job Status", f"[bold][orange3]{status}[/][/]")
status_table.add_row("Secondary Status", f"[bold yellow]{secondary_status}[/bold yellow]")
status_table.add_row("Elapsed Time", f"[bold bright_red]{elapsed:.1f}s[/bold bright_red]")

failure_reason = training_job.failure_reason
if failure_reason and not _is_unassigned_attribute(failure_reason):
status_table.add_row("Failure Reason", f"[bright_red]{failure_reason}[/bright_red]")

# Calculate training progress
training_progress_pct = None
training_progress_text = ""
if secondary_status == "Training" and training_job.progress_info:
if not progress_started:
progress_started = True
time.sleep(poll)
training_job.refresh()

training_progress_pct, training_progress_text = _calculate_training_progress(
training_job.progress_info, metrics_util, mlflow_run_name, training_job
)

# Build transitions table if available
transitions_table = None
if training_job.secondary_status_transitions:
from rich.box import SIMPLE
transitions_table = Table(show_header=True, header_style="bold magenta", box=SIMPLE, padding=(0, 1))
transitions_table.add_column("", style="green", width=2)
transitions_table.add_column("Step", style="cyan", width=15)
transitions_table.add_column("Details", style="orange3", width=35)
transitions_table.add_column("Duration", style="green", width=12)

for trans in training_job.secondary_status_transitions:
duration, check = _calculate_transition_duration(trans)

# Add progress bar for Training step
if trans.status == "Training" and training_progress_pct is not None:
bar = f"[green][{'█' * int(training_progress_pct / 5)}{'░' * (20 - int(training_progress_pct / 5))}][/green] {training_progress_pct:.1f}% {training_progress_text}"
transitions_table.add_row(check, trans.status, bar, duration)
else:
transitions_table.add_row(check, trans.status, trans.status_message or "", duration)

# Prepare metrics table for terminal states
metrics_table = None
if status in ["Completed", "Failed", "Stopped"]:
try:
steps_per_epoch = training_job.progress_info.total_step_count_per_epoch
loss_metrics_by_epoch = metrics_util._get_loss_metrics_by_epoch(run_name=mlflow_run_name,
steps_per_epoch=steps_per_epoch)
if loss_metrics_by_epoch:
metrics_table = Table(show_header=True, header_style="bold magenta", box=SIMPLE,
padding=(0, 1))
metrics_table.add_column("Epochs", style="cyan", width=8)
metrics_table.add_column("Loss Metrics", style="white")

console = Console(force_jupyter=True)

iteration = 0
while True:
iteration += 1
time.sleep(poll)
training_job.refresh()
clear_output(wait=False)

status = training_job.training_job_status
secondary_status = training_job.secondary_status
elapsed = time.time() - start_time

# Header section with training job name and MLFlow URL
header_table = Table(show_header=False, box=None, padding=(0, 1))
header_table.add_column("Property", style="cyan bold", width=20)
header_table.add_column("Value", style="white")
header_table.add_row("TrainingJob Name", f"[bold green]{training_job.training_job_name}[/bold green]")
if mlflow_url:
header_table.add_row("MLFlow URL",
f"[link={mlflow_url}][bold bright_blue underline]{mlflow_run_name}(link valid for 5 mins)[/bright_blue bold underline][/link]")

status_table = Table(show_header=False, box=None, padding=(0, 1))
status_table.add_column("Property", style="cyan bold", width=20)
status_table.add_column("Value", style="white")

status_table.add_row("Job Status", f"[bold][orange3]{status}[/][/]")
status_table.add_row("Secondary Status", f"[bold yellow]{secondary_status}[/bold yellow]")
status_table.add_row("Elapsed Time", f"[bold bright_red]{elapsed:.1f}s[/bold bright_red]")

failure_reason = training_job.failure_reason
if failure_reason and not _is_unassigned_attribute(failure_reason):
status_table.add_row("Failure Reason", f"[bright_red]{failure_reason}[/bright_red]")

# Calculate training progress
training_progress_pct = None
training_progress_text = ""
if secondary_status == "Training" and training_job.progress_info:
if not progress_started:
progress_started = True
time.sleep(10)
continue
# training_job.refresh()

training_progress_pct, training_progress_text = _calculate_training_progress(
training_job.progress_info, metrics_util, mlflow_run_name, training_job
)

# Build transitions table if available
transitions_table = None
if training_job.secondary_status_transitions:
from rich.box import SIMPLE
transitions_table = Table(show_header=True, header_style="bold magenta", box=SIMPLE, padding=(0, 1))
transitions_table.add_column("", style="green", width=2)
transitions_table.add_column("Step", style="cyan", width=15)
transitions_table.add_column("Details", style="orange3", width=35)
transitions_table.add_column("Duration", style="green", width=12)

for trans in training_job.secondary_status_transitions:
duration, check = _calculate_transition_duration(trans)
for epoch, metrics in list(loss_metrics_by_epoch.items())[:-1]:
metrics_str = ", ".join([f"{k}: {v:.6f}" for k, v in metrics.items()])
metrics_table.add_row(str(epoch + 1), metrics_str, style="yellow")
except Exception:
pass

# Add progress bar for Training step
if trans.status == "Training" and training_progress_pct is not None:
bar = f"[green][{'█' * int(training_progress_pct / 5)}{'░' * (20 - int(training_progress_pct / 5))}][/green] {training_progress_pct:.1f}% {training_progress_text}"
transitions_table.add_row(check, trans.status, bar, duration)
# Build combined group with metrics if available
if training_job.secondary_status_transitions:
if metrics_table:
combined = Group(header_table, Text(""), status_table, Text(""),
Text("Status Transitions", style="bold magenta"), transitions_table, Text(""),
Text("Loss Metrics by Epoch", style="bold magenta"), metrics_table)
else:
transitions_table.add_row(check, trans.status, trans.status_message or "", duration)

# Prepare metrics table for terminal states
metrics_table = None
if status in ["Completed", "Failed", "Stopped"]:
try:
steps_per_epoch = training_job.progress_info.total_step_count_per_epoch
loss_metrics_by_epoch = metrics_util._get_loss_metrics_by_epoch(run_name=mlflow_run_name,
steps_per_epoch=steps_per_epoch)
if loss_metrics_by_epoch:
metrics_table = Table(show_header=True, header_style="bold magenta", box=SIMPLE,
padding=(0, 1))
metrics_table.add_column("Epochs", style="cyan", width=8)
metrics_table.add_column("Loss Metrics", style="white")

for epoch, metrics in list(loss_metrics_by_epoch.items())[:-1]:
metrics_str = ", ".join([f"{k}: {v:.6f}" for k, v in metrics.items()])
metrics_table.add_row(str(epoch + 1), metrics_str, style="yellow")
except Exception:
pass

# Build combined group with metrics if available
if training_job.secondary_status_transitions:
if metrics_table:
combined = Group(header_table, Text(""), status_table, Text(""),
Text("Status Transitions", style="bold magenta"), transitions_table, Text(""),
Text("Loss Metrics by Epoch", style="bold magenta"), metrics_table)
else:
combined = Group(header_table, Text(""), status_table, Text(""),
Text("Status Transitions", style="bold magenta"), transitions_table)
else:
if metrics_table:
combined = Group(header_table, Text(""), status_table, Text(""),
Text("Loss Metrics by Epoch", style="bold magenta"), metrics_table)
combined = Group(header_table, Text(""), status_table, Text(""),
Text("Status Transitions", style="bold magenta"), transitions_table)
else:
combined = Group(header_table, Text(""), status_table)
if metrics_table:
combined = Group(header_table, Text(""), status_table, Text(""),
Text("Loss Metrics by Epoch", style="bold magenta"), metrics_table)
else:
combined = Group(header_table, Text(""), status_table)

panel_width = 80
if console.width and not _is_unassigned_attribute(console.width):
panel_width = int(console.width * 0.8)
console.print(Panel(combined, title="[bold bright_blue]Training Job Status[/bold bright_blue]",
border_style="orange3", width=panel_width))
panel_width = 80
if console.width and not _is_unassigned_attribute(console.width):
panel_width = int(console.width * 0.8)
console.print(Panel(combined, title="[bold bright_blue]Training Job Status[/bold bright_blue]",
border_style="orange3", width=panel_width))

if status in ["Completed", "Failed", "Stopped"]:
return
if status in ["Completed", "Failed", "Stopped"]:
return

if status == "Failed" or (failure_reason and not _is_unassigned_attribute(failure_reason)):
raise FailedStatusError(resource_type="TrainingJob", status=status, reason=failure_reason)
if status == "Failed" or (failure_reason and not _is_unassigned_attribute(failure_reason)):
raise FailedStatusError(resource_type="TrainingJob", status=status, reason=failure_reason)

if timeout and elapsed >= timeout:
raise TimeoutExceededError(resouce_type="TrainingJob", status=status)
if timeout and elapsed >= timeout:
raise TimeoutExceededError(resouce_type="TrainingJob", status=status)

else:
print(f"\nTrainingJob Name: {training_job.training_job_name}")
Expand Down
39 changes: 0 additions & 39 deletions sagemaker-train/tests/unit/train/common_utils/test_trainer_wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,45 +99,6 @@ def test_mlflow_setup_no_config(self):
assert metrics_util is None
assert mlflow_run_name is None


class TestIsJupyterEnvironment:
    """Test cases for _is_jupyter_environment function."""

    @patch('IPython.get_ipython')
    def test_jupyter_environment_true(self, mock_get_ipython):
        """Test detection of Jupyter environment."""
        # A shell whose config exposes an 'IPKernelApp' section is the
        # condition this suite expects to be reported as Jupyter.
        mock_ipython = MagicMock()
        mock_ipython.config = {'IPKernelApp': {}}
        mock_get_ipython.return_value = mock_ipython

        result = _is_jupyter_environment()
        assert result is True

    @patch('IPython.get_ipython')
    def test_jupyter_environment_false_no_ipkernel(self, mock_get_ipython):
        """Test non-Jupyter environment without IPKernelApp."""
        # IPython shell exists but its config has no kernel section
        # (e.g. a plain terminal IPython session).
        mock_ipython = MagicMock()
        mock_ipython.config = {}
        mock_get_ipython.return_value = mock_ipython

        result = _is_jupyter_environment()
        assert result is False

    @patch('IPython.get_ipython')
    def test_jupyter_environment_false_no_ipython(self, mock_get_ipython):
        """Test non-Jupyter environment without IPython."""
        # get_ipython() returning None means no IPython session at all.
        mock_get_ipython.return_value = None

        result = _is_jupyter_environment()
        assert result is False

    def test_jupyter_environment_import_error(self):
        """Test ImportError when IPython is not available."""
        # Simulate IPython not being installed: any import inside the
        # function under test raises ImportError, which must be swallowed.
        with patch('builtins.__import__', side_effect=ImportError):
            result = _is_jupyter_environment()
            assert result is False


class TestIsUnassignedAttribute:
"""Test cases for _is_unassigned_attribute function."""

Expand Down
Loading