From 76b87ff53f98e63d288fe86505f1f2efc004e9e2 Mon Sep 17 00:00:00 2001 From: aviruthen <91846056+aviruthen@users.noreply.github.com> Date: Mon, 24 Nov 2025 18:12:12 -0800 Subject: [PATCH] Add training examples --- .../training-examples/aii3xf60/__init__.py | 14 + .../aii3xf60/common/__init__.py | 14 + .../aii3xf60/common/utils.py | 205 ++++++++ .../aii3xf60/distributed.json | 1 + .../aii3xf60/distributed_drivers/__init__.py | 14 + .../basic_script_driver.py | 81 +++ .../distributed_drivers/mpi_driver.py | 105 ++++ .../aii3xf60/distributed_drivers/mpi_utils.py | 302 +++++++++++ .../distributed_drivers/torchrun_driver.py | 129 +++++ .../aii3xf60/scripts/__init__.py | 14 + .../aii3xf60/scripts/environment.py | 305 ++++++++++++ .../training-examples/aii3xf60/sm_train.sh | 59 +++ .../aii3xf60/sourcecode.json | 1 + .../custom-distributed-training-example.ipynb | 376 ++++++++++++++ .../distributed-local-training-example.ipynb | 467 ++++++++++++++++++ .../training-examples/docker-compose.yaml | 54 ++ .../hyperparameter-training-example.ipynb | 372 ++++++++++++++ .../jumpstart-training-example.ipynb | 329 ++++++++++++ .../local-training-example.ipynb | 454 +++++++++++++++++ 19 files changed, 3296 insertions(+) create mode 100644 v3-examples/training-examples/aii3xf60/__init__.py create mode 100644 v3-examples/training-examples/aii3xf60/common/__init__.py create mode 100644 v3-examples/training-examples/aii3xf60/common/utils.py create mode 100644 v3-examples/training-examples/aii3xf60/distributed.json create mode 100644 v3-examples/training-examples/aii3xf60/distributed_drivers/__init__.py create mode 100644 v3-examples/training-examples/aii3xf60/distributed_drivers/basic_script_driver.py create mode 100644 v3-examples/training-examples/aii3xf60/distributed_drivers/mpi_driver.py create mode 100644 v3-examples/training-examples/aii3xf60/distributed_drivers/mpi_utils.py create mode 100644 v3-examples/training-examples/aii3xf60/distributed_drivers/torchrun_driver.py create mode 100644 v3-examples/training-examples/aii3xf60/scripts/__init__.py create mode 100644 v3-examples/training-examples/aii3xf60/scripts/environment.py create mode 100644 v3-examples/training-examples/aii3xf60/sm_train.sh create mode 100644 v3-examples/training-examples/aii3xf60/sourcecode.json create mode 100644 v3-examples/training-examples/custom-distributed-training-example.ipynb create mode 100644 v3-examples/training-examples/distributed-local-training-example.ipynb create mode 100644 v3-examples/training-examples/docker-compose.yaml create mode 100644 v3-examples/training-examples/hyperparameter-training-example.ipynb create mode 100644 v3-examples/training-examples/jumpstart-training-example.ipynb create mode 100644 v3-examples/training-examples/local-training-example.ipynb diff --git a/v3-examples/training-examples/aii3xf60/__init__.py b/v3-examples/training-examples/aii3xf60/__init__.py new file mode 100644 index 0000000000..864f3663b8 --- /dev/null +++ b/v3-examples/training-examples/aii3xf60/__init__.py @@ -0,0 +1,14 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. 
See the License for the specific +# language governing permissions and limitations under the License. +"""Sagemaker modules container drivers directory.""" +from __future__ import absolute_import diff --git a/v3-examples/training-examples/aii3xf60/common/__init__.py b/v3-examples/training-examples/aii3xf60/common/__init__.py new file mode 100644 index 0000000000..aab88c6b97 --- /dev/null +++ b/v3-examples/training-examples/aii3xf60/common/__init__.py @@ -0,0 +1,14 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Sagemaker modules container drivers - common directory.""" +from __future__ import absolute_import diff --git a/v3-examples/training-examples/aii3xf60/common/utils.py b/v3-examples/training-examples/aii3xf60/common/utils.py new file mode 100644 index 0000000000..03146a3bbe --- /dev/null +++ b/v3-examples/training-examples/aii3xf60/common/utils.py @@ -0,0 +1,205 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""This module provides utility functions for the container drivers.""" +from __future__ import absolute_import + +import os +import logging +import sys +import subprocess +import traceback +import json + +from typing import List, Dict, Any, Tuple, IO, Optional + +# Initialize logger +SM_LOG_LEVEL = os.environ.get("SM_LOG_LEVEL", 20) +logger = logging.getLogger(__name__) +console_handler = logging.StreamHandler(sys.stdout) +logger.addHandler(console_handler) +logger.setLevel(int(SM_LOG_LEVEL)) + +FAILURE_FILE = "/opt/ml/output/failure" +DEFAULT_FAILURE_MESSAGE = """ +Training Execution failed. +For more details, see CloudWatch logs at 'aws/sagemaker/TrainingJobs'. 
+TrainingJob - {training_job_name} +""" + +USER_CODE_PATH = "/opt/ml/input/data/code" +SOURCE_CODE_JSON = "/opt/ml/input/data/sm_drivers/sourcecode.json" +DISTRIBUTED_JSON = "/opt/ml/input/data/sm_drivers/distributed.json" + +HYPERPARAMETERS_JSON = "/opt/ml/input/config/hyperparameters.json" + +SM_EFA_NCCL_INSTANCES = [ + "ml.g4dn.8xlarge", + "ml.g4dn.12xlarge", + "ml.g5.48xlarge", + "ml.p3dn.24xlarge", + "ml.p4d.24xlarge", + "ml.p4de.24xlarge", + "ml.p5.48xlarge", + "ml.trn1.32xlarge", +] + +SM_EFA_RDMA_INSTANCES = [ + "ml.p4d.24xlarge", + "ml.p4de.24xlarge", + "ml.trn1.32xlarge", +] + + +def write_failure_file(message: Optional[str] = None): + """Write a failure file with the message.""" + if message is None: + message = DEFAULT_FAILURE_MESSAGE.format(training_job_name=os.environ["TRAINING_JOB_NAME"]) + if not os.path.exists(FAILURE_FILE): + with open(FAILURE_FILE, "w") as f: + f.write(message) + + +def read_source_code_json(source_code_json: Dict[str, Any] = SOURCE_CODE_JSON): + """Read the source code config json file.""" + try: + with open(source_code_json, "r") as f: + source_code_dict = json.load(f) or {} + except FileNotFoundError: + source_code_dict = {} + return source_code_dict + + +def read_distributed_json(distributed_json: Dict[str, Any] = DISTRIBUTED_JSON): + """Read the distribution config json file.""" + try: + with open(distributed_json, "r") as f: + distributed_dict = json.load(f) or {} + except FileNotFoundError: + distributed_dict = {} + return distributed_dict + + +def read_hyperparameters_json(hyperparameters_json: Dict[str, Any] = HYPERPARAMETERS_JSON): + """Read the hyperparameters config json file.""" + try: + with open(hyperparameters_json, "r") as f: + hyperparameters_dict = json.load(f) or {} + except FileNotFoundError: + hyperparameters_dict = {} + return hyperparameters_dict + + +def get_process_count(process_count: Optional[int] = None) -> int: + """Get the number of processes to run on each node in the training job.""" + return ( + process_count + or int(os.environ.get("SM_NUM_GPUS", 0)) + or int(os.environ.get("SM_NUM_NEURONS", 0)) + or 1 + ) + + +def hyperparameters_to_cli_args(hyperparameters: Dict[str, Any]) -> List[str]: + """Convert the hyperparameters to CLI arguments.""" + cli_args = [] + for key, value in hyperparameters.items(): + value = safe_deserialize(value) + cli_args.extend([f"--{key}", safe_serialize(value)]) + + return cli_args + + +def safe_deserialize(data: Any) -> Any: + """Safely deserialize data from a JSON string. + + This function handles the following cases: + 1. If `data` is not a string, it returns the input as-is. + 2. If `data` is a JSON-encoded string, it attempts to deserialize it using `json.loads()`. + 3. If `data` is a string but cannot be decoded as JSON, it returns the original string. + + Returns: + Any: The deserialized data, or the original input if it cannot be JSON-decoded. + """ + if not isinstance(data, str): + return data + + try: + return json.loads(data) + except json.JSONDecodeError: + return data + + +def safe_serialize(data): + """Serialize the data without wrapping strings in quotes. + + This function handles the following cases: + 1. If `data` is a string, it returns the string as-is without wrapping in quotes. + 2. If `data` is serializable (e.g., a dictionary, list, int, float), it returns + the JSON-encoded string using `json.dumps()`. + 3. If `data` cannot be serialized (e.g., a custom object), it returns the string + representation of the data using `str(data)`. 
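+
+    Example (illustrative):
+        safe_serialize("abc")          returns "abc" (no added quotes)
+        safe_serialize({"epochs": 5})  returns '{"epochs": 5}'
+        safe_serialize([1, 2, 3])      returns "[1, 2, 3]"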
+ + Args: + data (Any): The data to serialize. + + Returns: + str: The serialized JSON-compatible string or the string representation of the input. + """ + if isinstance(data, str): + return data + try: + return json.dumps(data) + except TypeError: + return str(data) + + +def get_python_executable() -> str: + """Get the python executable path.""" + return sys.executable + + +def log_subprocess_output(pipe: IO[bytes]): + """Log the output from the subprocess.""" + for line in iter(pipe.readline, b""): + logger.info(line.decode("utf-8").strip()) + + +def execute_commands(commands: List[str]) -> Tuple[int, str]: + """Execute the provided commands and return exit code with failure traceback if any.""" + try: + process = subprocess.Popen( + commands, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + with process.stdout: + log_subprocess_output(process.stdout) + exitcode = process.wait() + if exitcode != 0: + raise subprocess.CalledProcessError(exitcode, commands) + return exitcode, "" + except subprocess.CalledProcessError as e: + # Capture the traceback in case of failure + error_traceback = traceback.format_exc() + print(f"Command failed with exit code {e.returncode}. Traceback: {error_traceback}") + return e.returncode, error_traceback + + +def is_worker_node() -> bool: + """Check if the current node is a worker node.""" + return os.environ.get("SM_CURRENT_HOST") != os.environ.get("SM_MASTER_ADDR") + + +def is_master_node() -> bool: + """Check if the current node is the master node.""" + return os.environ.get("SM_CURRENT_HOST") == os.environ.get("SM_MASTER_ADDR") diff --git a/v3-examples/training-examples/aii3xf60/distributed.json b/v3-examples/training-examples/aii3xf60/distributed.json new file mode 100644 index 0000000000..9e26dfeeb6 --- /dev/null +++ b/v3-examples/training-examples/aii3xf60/distributed.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/v3-examples/training-examples/aii3xf60/distributed_drivers/__init__.py b/v3-examples/training-examples/aii3xf60/distributed_drivers/__init__.py new file mode 100644 index 0000000000..a44e7e81a9 --- /dev/null +++ b/v3-examples/training-examples/aii3xf60/distributed_drivers/__init__.py @@ -0,0 +1,14 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Sagemaker modules container drivers - drivers directory.""" +from __future__ import absolute_import diff --git a/v3-examples/training-examples/aii3xf60/distributed_drivers/basic_script_driver.py b/v3-examples/training-examples/aii3xf60/distributed_drivers/basic_script_driver.py new file mode 100644 index 0000000000..a298da80a2 --- /dev/null +++ b/v3-examples/training-examples/aii3xf60/distributed_drivers/basic_script_driver.py @@ -0,0 +1,81 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. 
A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""This module is the entry point for the Basic Script Driver.""" +from __future__ import absolute_import + +import os +import sys +import json +import shlex + +from pathlib import Path +from typing import List + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from common.utils import ( # noqa: E402 # pylint: disable=C0413,E0611 + logger, + get_python_executable, + write_failure_file, + hyperparameters_to_cli_args, + execute_commands, +) + + +def create_commands() -> List[str]: + """Create the commands to execute.""" + entry_script = os.environ["SM_ENTRY_SCRIPT"] + hyperparameters = json.loads(os.environ["SM_HPS"]) + python_executable = get_python_executable() + + args = hyperparameters_to_cli_args(hyperparameters) + if entry_script.endswith(".py"): + commands = [python_executable, entry_script] + commands += args + elif entry_script.endswith(".sh"): + args_str = " ".join(shlex.quote(arg) for arg in args) + commands = [ + "/bin/sh", + "-c", + f"chmod +x {entry_script} && ./{entry_script} {args_str}", + ] + else: + raise ValueError( + f"Unsupported entry script type: {entry_script}. Only .py and .sh are supported." + ) + return commands + + +def main(): + """Main function for the Basic Script Driver. + + This function is the entry point for the Basic Script Driver. + + Execution Lifecycle: + 1. Read the source code and hyperparameters JSON files. + 2. Set hyperparameters as command line arguments. + 3. Create the commands to execute. + 4. Execute the commands. + """ + + cmd = create_commands() + + logger.info(f"Executing command: {' '.join(cmd)}") + exit_code, traceback = execute_commands(cmd) + if exit_code != 0: + write_failure_file(traceback) + sys.exit(exit_code) + + +if __name__ == "__main__": + main() diff --git a/v3-examples/training-examples/aii3xf60/distributed_drivers/mpi_driver.py b/v3-examples/training-examples/aii3xf60/distributed_drivers/mpi_driver.py new file mode 100644 index 0000000000..8ffe1f4318 --- /dev/null +++ b/v3-examples/training-examples/aii3xf60/distributed_drivers/mpi_driver.py @@ -0,0 +1,105 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
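+# Note: this driver assumes sm_train.sh has already run scripts/environment.py and sourced
+# /opt/ml/input/sm_training.env, so SM_ENTRY_SCRIPT, SM_DISTRIBUTED_CONFIG, SM_HPS,
+# SM_CURRENT_HOST, SM_HOSTS and SM_MASTER_ADDR are available in os.environ.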
+"""This module is the entry point for the MPI driver script.""" +from __future__ import absolute_import + +import os +import sys +import json + +from sagemaker.train.container_drivers.distributed_drivers.mpi_utils import ( + start_sshd_daemon, + bootstrap_master_node, + bootstrap_worker_node, + get_mpirun_command, + write_status_file_to_workers, + write_env_vars_to_file, +) + + +from sagemaker.train.container_drivers.common.utils import ( + logger, + hyperparameters_to_cli_args, + get_process_count, + execute_commands, + write_failure_file, +) + + +def main(): + """Main function for the MPI driver script. + + The MPI Dirver is responsible for setting up the MPI environment, + generating the correct mpi commands, and launching the MPI job. + + Execution Lifecycle: + 1. Setup General Environment Variables at /etc/environment + 2. Start SSHD Daemon + 3. Bootstrap Worker Nodes + a. Wait to establish connection with Master Node + b. Wait for Master Node to write status file + 4. Bootstrap Master Node + a. Wait to establish connection with Worker Nodes + b. Generate MPI Command + c. Execute MPI Command with user script provided in `entry_script` + d. Write status file to Worker Nodes + 5. Exit + + """ + entry_script = os.environ["SM_ENTRY_SCRIPT"] + distributed_config = json.loads(os.environ["SM_DISTRIBUTED_CONFIG"]) + hyperparameters = json.loads(os.environ["SM_HPS"]) + + sm_current_host = os.environ["SM_CURRENT_HOST"] + sm_hosts = json.loads(os.environ["SM_HOSTS"]) + sm_master_addr = os.environ["SM_MASTER_ADDR"] + + write_env_vars_to_file() + start_sshd_daemon() + + if sm_current_host != sm_master_addr: + bootstrap_worker_node(sm_master_addr) + else: + worker_hosts = [host for host in sm_hosts if host != sm_master_addr] + bootstrap_master_node(worker_hosts) + + host_list = json.loads(os.environ["SM_HOSTS"]) + host_count = int(os.environ["SM_HOST_COUNT"]) + process_count = int(distributed_config["process_count_per_node"] or 0) + process_count = get_process_count(process_count) + + if process_count > 1: + host_list = ["{}:{}".format(host, process_count) for host in host_list] + + mpi_command = get_mpirun_command( + host_count=host_count, + host_list=host_list, + num_processes=process_count, + additional_options=distributed_config["mpi_additional_options"] or [], + entry_script_path=entry_script, + ) + + args = hyperparameters_to_cli_args(hyperparameters) + mpi_command += args + + logger.info(f"Executing command: {' '.join(mpi_command)}") + exit_code, error_traceback = execute_commands(mpi_command) + write_status_file_to_workers(worker_hosts) + + if exit_code != 0: + write_failure_file(error_traceback) + sys.exit(exit_code) + + +if __name__ == "__main__": + main() diff --git a/v3-examples/training-examples/aii3xf60/distributed_drivers/mpi_utils.py b/v3-examples/training-examples/aii3xf60/distributed_drivers/mpi_utils.py new file mode 100644 index 0000000000..ec9e1fcef9 --- /dev/null +++ b/v3-examples/training-examples/aii3xf60/distributed_drivers/mpi_utils.py @@ -0,0 +1,302 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. 
See the License for the specific +# language governing permissions and limitations under the License. +"""This module provides mpi related utility functions for the container drivers.""" +from __future__ import absolute_import + +import os +import sys +import subprocess +import time + +from pathlib import Path +from typing import List + +import paramiko + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from common.utils import ( # noqa: E402 # pylint: disable=C0413,E0611 + SM_EFA_NCCL_INSTANCES, + SM_EFA_RDMA_INSTANCES, + get_python_executable, + logger, +) + +FINISHED_STATUS_FILE = "/tmp/done.algo-1" +READY_FILE = "/tmp/ready.%s" +DEFAULT_SSH_PORT = 22 + + +def _write_file_to_host(host: str, status_file: str) -> bool: + """Write the a file to the provided host.""" + try: + logger.info(f"Writing {status_file} to {host}") + subprocess.run( + ["ssh", host, "touch", f"{status_file}"], + capture_output=True, + text=True, + check=True, + ) + logger.info("Finished writing status file") + return True + except subprocess.CalledProcessError: + logger.info(f"Cannot connect to {host}") + return False + + +def write_status_file_to_workers(worker_hosts: List[str], status_file: str = FINISHED_STATUS_FILE): + """Write the status file to all worker nodes.""" + for worker in worker_hosts: + retry = 0 + while not _write_file_to_host(worker, status_file): + time.sleep(5) + retry += 1 + if retry > 5: + raise TimeoutError(f"Timed out waiting for {worker} to be reachable.") + logger.info(f"Retrying to write status file to {worker}") + + +def _wait_for_status_file(status_file: str): + """Wait for the status file to be created.""" + logger.info(f"Waiting for status file {status_file}") + while not os.path.exists(status_file): + time.sleep(30) + logger.info(f"Found status file {status_file}") + + +def start_sshd_daemon(): + """Start the SSH daemon on the current node.""" + sshd_executable = "/usr/sbin/sshd" + + if not os.path.exists(sshd_executable): + raise RuntimeError("SSH daemon not found.") + + # Start the sshd in daemon mode (-D) + subprocess.Popen([sshd_executable, "-D"]) + logger.info("Started SSH daemon.") + + +class CustomHostKeyPolicy(paramiko.client.MissingHostKeyPolicy): + """Class to handle host key policy for SageMaker distributed training SSH connections. + + Example: + >>> client = paramiko.SSHClient() + >>> client.set_missing_host_key_policy(CustomHostKeyPolicy()) + >>> # Will succeed for SageMaker algorithm containers + >>> client.connect('algo-1234.internal') + >>> # Will raise SSHException for other unknown hosts + >>> client.connect('unknown-host') # raises SSHException + """ + + def missing_host_key(self, client, hostname, key): + """Accept host keys for algo-* hostnames, reject others. 
+ + Args: + client: The SSHClient instance + hostname: The hostname attempting to connect + key: The host key + + Raises: + paramiko.SSHException: If hostname doesn't match algo-* pattern + """ + if hostname.startswith("algo-"): + client.get_host_keys().add(hostname, key.get_name(), key) + return + raise paramiko.SSHException(f"Unknown host key for {hostname}") + + +def _can_connect(host: str, port: int = DEFAULT_SSH_PORT) -> bool: + """Check if the connection to the provided host and port is possible.""" + try: + logger.debug("Testing connection to host %s", host) + with paramiko.SSHClient() as client: + client.load_system_host_keys() + client.set_missing_host_key_policy(CustomHostKeyPolicy()) + client.connect(host, port=port) + logger.info("Can connect to host %s", host) + return True + except Exception as e: # pylint: disable=W0703 + logger.info("Cannot connect to host %s", host) + logger.debug(f"Connection failed with exception: {e}") + return False + + +def _wait_for_workers(worker_hosts: List[str], port: int = DEFAULT_SSH_PORT, timeout: int = 300): + """Master node waits until it can connect to all worker nodes.""" + start_time = time.time() + if not worker_hosts: + logger.info("No worker nodes to connect to.") + return + + while True: + logger.info("Master is attempting to connect to all workers...") + all_workers_connected = all( + _can_connect(worker, port) and os.path.exists(READY_FILE % worker) + for worker in worker_hosts + ) + + if all_workers_connected: + logger.info("Master can connect to all worker nodes.") + break + if time.time() - start_time > timeout: + raise TimeoutError("Timed out waiting for workers to be reachable.") + + time.sleep(5) # Wait for 5 seconds before trying again + + +def _wait_for_master(master_host: str, port: int = DEFAULT_SSH_PORT, timeout: int = 300): + """Worker nodes wait until they can connect to the master node.""" + start_time = time.time() + while True: + logger.info(f"Worker is attempting to connect to the master node {master_host}...") + if _can_connect(master_host, port): + logger.info(f"Worker can connect to master node {master_host}.") + break + if time.time() - start_time > timeout: + raise TimeoutError(f"Timed out waiting for master {master_host} to be reachable.") + + time.sleep(5) # Wait for 5 seconds before trying again + + +def bootstrap_worker_node(master_host: str, status_file: str = FINISHED_STATUS_FILE): + """Bootstrap the worker nodes.""" + logger.info("Bootstrapping worker node...") + _wait_for_master(master_host) + _write_file_to_host(master_host, READY_FILE % os.environ["SM_CURRENT_HOST"]) + _wait_for_status_file(status_file) + + +def bootstrap_master_node(worker_hosts: List[str]): + """Bootstrap the master node.""" + logger.info("Bootstrapping master node...") + _wait_for_workers(worker_hosts) + + +def validate_smddprun() -> bool: + """Whether smddprun is installed. + + Returns: + bool: True if installed + """ + try: + output = subprocess.run( + ["which", "smddprun"], + capture_output=True, + text=True, + check=True, + ) + return output.stdout != "" + except subprocess.CalledProcessError: + return False + + +def validate_smddpmprun() -> bool: + """Whether smddpmprun is installed. 
+ + Returns: + bool: True if both are installed + """ + try: + output = subprocess.run( + ["which", "smddpmprun"], + capture_output=True, + text=True, + check=True, + ) + return output.stdout != "" + except subprocess.CalledProcessError: + return False + + +def write_env_vars_to_file(): + """Write environment variables to /etc/environment file.""" + with open("/etc/environment", "a", encoding="utf-8") as f: + for name in os.environ: + f.write(f"{name}={os.environ.get(name)}\n") + + +def get_mpirun_command( + host_count: int, + host_list: List[str], + num_processes: int, + additional_options: List[str], + entry_script_path: str, +): + """Fetch mpi command""" + network_interface_name = os.environ.get("SM_NETWORK_INTERFACE_NAME", "eth0") + + mpirun_command = [ + "mpirun", + "--host", + ",".join(host_list), + "-np", + str(num_processes), + "--allow-run-as-root", + "--tag-output", + "-mca", + "btl_tcp_if_include", + network_interface_name, + "-mca", + "oob_tcp_if_include", + network_interface_name, + "-mca", + "plm_rsh_no_tree_spawn", + "1", + "-mca", + "pml", + "ob1", + "-mca", + "btl", + "^openib", + "-mca", + "orte_abort_on_non_zero_status", + "1", + "-mca", + "btl_vader_single_copy_mechanism", + "none", + "-mca", + "plm_rsh_num_concurrent", + str(host_count), + "-x", + "NCCL_SOCKET_IFNAME=%s" % network_interface_name, + "-x", + "LD_LIBRARY_PATH", + "-x", + "PATH", + ] + + if additional_options: + mpirun_command.extend(additional_options) + + instance_type = os.environ["SM_CURRENT_INSTANCE_TYPE"] + # EFA settings + if instance_type in SM_EFA_NCCL_INSTANCES: + mpirun_command.extend(["-x", "FI_PROVIDER=efa"]) + # Use simple protocol to handle the out-of-order data delivery from EFA + mpirun_command.extend(["-x", "NCCL_PROTO=simple"]) + + if instance_type in SM_EFA_RDMA_INSTANCES: + # Use EFA's RDMA functionality for one-sided and two-sided transfer + mpirun_command.extend(["-x", "FI_EFA_USE_DEVICE_RDMA=1"]) + + for credential in [ + "AWS_ACCESS_KEY_ID", + "AWS_SECRET_ACCESS_KEY", + "AWS_SESSION_TOKEN", + ]: + if credential in os.environ: + mpirun_command.extend(["-x", credential]) + + mpirun_command.extend([get_python_executable()]) + mpirun_command.extend(["-m", "mpi4py", entry_script_path]) + return mpirun_command diff --git a/v3-examples/training-examples/aii3xf60/distributed_drivers/torchrun_driver.py b/v3-examples/training-examples/aii3xf60/distributed_drivers/torchrun_driver.py new file mode 100644 index 0000000000..7fcfabe05d --- /dev/null +++ b/v3-examples/training-examples/aii3xf60/distributed_drivers/torchrun_driver.py @@ -0,0 +1,129 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
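+# Note: the launcher used depends on the installed PyTorch version -- "torchrun" for
+# PyTorch >= 1.9, otherwise "python -m torch.distributed.launch" (see get_base_pytorch_command).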
+"""This module is the entry point for the Torchrun driver script.""" +from __future__ import absolute_import + +import os +import sys +import json + +from pathlib import Path +from typing import List, Tuple + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from common.utils import ( # noqa: E402 # pylint: disable=C0413,E0611 + logger, + hyperparameters_to_cli_args, + get_process_count, + get_python_executable, + execute_commands, + write_failure_file, + SM_EFA_NCCL_INSTANCES, + SM_EFA_RDMA_INSTANCES, +) + + +def pytorch_version() -> Tuple[int, int]: + """Get the PyTorch version as a tuple of integers.""" + import torch + + return tuple(map(int, torch.__version__.split(".")[:2])) + + +def get_base_pytorch_command() -> List[str]: + """Get the base Torch Distributed launcher to execute""" + if pytorch_version() >= (1, 9): + return ["torchrun"] + return [f"{get_python_executable()}", "-m", "torch.distributed.launch"] + + +def setup_env(): + """Setup the environment variables for PyTorch distributed training""" + instance_type = os.environ["SM_CURRENT_INSTANCE_TYPE"] + network_interface_name = os.environ.get("SM_NETWORK_INTERFACE_NAME", "eth0") + if instance_type in SM_EFA_NCCL_INSTANCES: + # Enable EFA use + os.environ["FI_PROVIDER"] = "efa" + if instance_type in SM_EFA_RDMA_INSTANCES: + # Use EFA's RDMA functionality for one-sided and two-sided transfer + os.environ["FI_EFA_USE_DEVICE_RDMA"] = "1" + os.environ["RDMAV_FORK_SAFE"] = "1" + os.environ["NCCL_SOCKET_IFNAME"] = str(network_interface_name) + os.environ["NCCL_PROTO"] = "simple" + + +def create_commands(): + """Create the Torch Distributed command to execute""" + entry_script = os.environ["SM_ENTRY_SCRIPT"] + distributed_config = json.loads(os.environ["SM_DISTRIBUTED_CONFIG"]) + hyperparameters = json.loads(os.environ["SM_HPS"]) + + process_count = int(distributed_config["process_count_per_node"] or 0) + process_count = get_process_count(process_count) + host_count = int(os.environ["SM_HOST_COUNT"]) + + torch_cmd = [] + if os.environ.get("RUN_NEURON_PARALLEL_COMPILE") == "1": + torch_cmd.append("neuron_parallel_compile") + + torch_cmd.extend(get_base_pytorch_command()) + torch_cmd.extend( + [ + f"--nnodes={host_count}", + f"--nproc_per_node={process_count}", + ] + ) + + # If more than one node is used, add node rank information + if int(host_count) > 1: + torch_cmd.extend( + [ + f"--master_addr={os.environ['SM_MASTER_ADDR']}", + f"--master_port={os.environ['SM_MASTER_PORT']}", + f"--node_rank={os.environ['SM_CURRENT_HOST_RANK']}", + ] + ) + + torch_cmd.extend([entry_script]) + + args = hyperparameters_to_cli_args(hyperparameters) + torch_cmd += args + + return torch_cmd + + +def main(): + """Main function to execute the PyTorch distributed training script. + + This function sets some environment variables and executes the PyTorch + distributed training script. + + Execution Lifecycle: + 1. Setup Environment Variables for PyTorch Distributed Training + 2. Create Torch Distributed Command + 3. Execute Torch Distributed Command with user script provided in `entry_script` + 4. 
Exit + + """ + setup_env() + torch_cmd = create_commands() + logger.info(f"Executing command: {' '.join(torch_cmd)}") + exit_code, traceback = execute_commands(torch_cmd) + if exit_code != 0: + write_failure_file(traceback) + sys.exit(exit_code) + + +if __name__ == "__main__": + main() diff --git a/v3-examples/training-examples/aii3xf60/scripts/__init__.py b/v3-examples/training-examples/aii3xf60/scripts/__init__.py new file mode 100644 index 0000000000..f04c5b17a0 --- /dev/null +++ b/v3-examples/training-examples/aii3xf60/scripts/__init__.py @@ -0,0 +1,14 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Sagemaker modules container drivers - scripts directory.""" +from __future__ import absolute_import diff --git a/v3-examples/training-examples/aii3xf60/scripts/environment.py b/v3-examples/training-examples/aii3xf60/scripts/environment.py new file mode 100644 index 0000000000..897b1f8af4 --- /dev/null +++ b/v3-examples/training-examples/aii3xf60/scripts/environment.py @@ -0,0 +1,305 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
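+# Note: this script reads resourceconfig.json, inputdataconfig.json and hyperparameters.json
+# from /opt/ml/input/config and writes the resulting SM_* variables to
+# /opt/ml/input/sm_training.env, which sm_train.sh sources before launching the driver.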
+"""This module is used to define the environment variables for the training job container.""" +from __future__ import absolute_import + +from typing import Dict, Any +import multiprocessing +import subprocess +import json +import os +import sys +from pathlib import Path +import logging + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from common.utils import ( # noqa: E402 # pylint: disable=C0413,E0611 + safe_serialize, + safe_deserialize, + read_distributed_json, + read_source_code_json, +) + +# Initialize logger +SM_LOG_LEVEL = os.environ.get("SM_LOG_LEVEL", 20) +logger = logging.getLogger(__name__) +console_handler = logging.StreamHandler(sys.stdout) +logger.addHandler(console_handler) +logger.setLevel(int(SM_LOG_LEVEL)) + +SM_MODEL_DIR = "/opt/ml/model" + +SM_INPUT_DIR = "/opt/ml/input" +SM_INPUT_DATA_DIR = "/opt/ml/input/data" +SM_INPUT_CONFIG_DIR = "/opt/ml/input/config" + +SM_OUTPUT_DIR = "/opt/ml/output" +SM_OUTPUT_FAILURE = "/opt/ml/output/failure" +SM_OUTPUT_DATA_DIR = "/opt/ml/output/data" +SM_SOURCE_DIR_PATH = "/opt/ml/input/data/code" +SM_DISTRIBUTED_DRIVER_DIR_PATH = "/opt/ml/input/data/sm_drivers/distributed_drivers" + +SM_MASTER_ADDR = "algo-1" +SM_MASTER_PORT = 7777 + +RESOURCE_CONFIG = f"{SM_INPUT_CONFIG_DIR}/resourceconfig.json" +INPUT_DATA_CONFIG = f"{SM_INPUT_CONFIG_DIR}/inputdataconfig.json" +HYPERPARAMETERS_CONFIG = f"{SM_INPUT_CONFIG_DIR}/hyperparameters.json" + +ENV_OUTPUT_FILE = "/opt/ml/input/sm_training.env" + +SENSITIVE_KEYWORDS = ["SECRET", "PASSWORD", "KEY", "TOKEN", "PRIVATE", "CREDS", "CREDENTIALS"] +HIDDEN_VALUE = "******" + + +def num_cpus() -> int: + """Return the number of CPUs available in the current container. + + Returns: + int: Number of CPUs available in the current container. + """ + return multiprocessing.cpu_count() + + +def num_gpus() -> int: + """Return the number of GPUs available in the current container. + + Returns: + int: Number of GPUs available in the current container. + """ + try: + cmd = ["nvidia-smi", "--list-gpus"] + output = subprocess.check_output(cmd).decode("utf-8") + return sum(1 for line in output.splitlines() if line.startswith("GPU ")) + except (OSError, subprocess.CalledProcessError): + logger.info("No GPUs detected (normal if no gpus installed)") + return 0 + + +def num_neurons() -> int: + """Return the number of neuron cores available in the current container. + + Returns: + int: Number of Neuron Cores available in the current container. + """ + try: + cmd = ["neuron-ls", "-j"] + output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode("utf-8") + j = json.loads(output) + neuron_cores = 0 + for item in j: + neuron_cores += item.get("nc_count", 0) + logger.info("Found %s neurons on this instance", neuron_cores) + return neuron_cores + except OSError: + logger.info("No Neurons detected (normal if no neurons installed)") + return 0 + except subprocess.CalledProcessError as e: + if e.output is not None: + try: + msg = e.output.decode("utf-8").partition("error=")[2] + logger.info( + "No Neurons detected (normal if no neurons installed). \ + If neuron installed then %s", + msg, + ) + except AttributeError: + logger.info("No Neurons detected (normal if no neurons installed)") + else: + logger.info("No Neurons detected (normal if no neurons installed)") + + return 0 + + +def deserialize_hyperparameters(hyperparameters: Dict[str, str]) -> Dict[str, Any]: + """Deserialize hyperparameters from string to their original types. + + Args: + hyperparameters (Dict[str, str]): Hyperparameters as strings. 
+ + Returns: + Dict[str, Any]: Hyperparameters as their original types. + """ + deserialized_hyperparameters = {} + for key, value in hyperparameters.items(): + deserialized_hyperparameters[key] = safe_deserialize(value) + return deserialized_hyperparameters + + +def set_env( + resource_config: Dict[str, Any], + input_data_config: Dict[str, Any], + hyperparameters_config: Dict[str, Any], + output_file: str = ENV_OUTPUT_FILE, +): + """Set environment variables for the training job container. + + Args: + resource_config (Dict[str, Any]): Resource configuration for the training job. + input_data_config (Dict[str, Any]): Input data configuration for the training job. + hyperparameters_config (Dict[str, Any]): Hyperparameters configuration for the training job. + output_file (str): Output file to write the environment variables. + """ + # Constants + env_vars = { + "SM_MODEL_DIR": SM_MODEL_DIR, + "SM_INPUT_DIR": SM_INPUT_DIR, + "SM_INPUT_DATA_DIR": SM_INPUT_DATA_DIR, + "SM_INPUT_CONFIG_DIR": SM_INPUT_CONFIG_DIR, + "SM_OUTPUT_DIR": SM_OUTPUT_DIR, + "SM_OUTPUT_FAILURE": SM_OUTPUT_FAILURE, + "SM_OUTPUT_DATA_DIR": SM_OUTPUT_DATA_DIR, + "SM_LOG_LEVEL": SM_LOG_LEVEL, + "SM_MASTER_ADDR": SM_MASTER_ADDR, + "SM_MASTER_PORT": SM_MASTER_PORT, + } + + # SourceCode and DistributedConfig Environment Variables + source_code = read_source_code_json() + if source_code: + env_vars["SM_SOURCE_DIR"] = SM_SOURCE_DIR_PATH + env_vars["SM_ENTRY_SCRIPT"] = source_code.get("entry_script", "") + + distributed = read_distributed_json() + if distributed: + env_vars["SM_DISTRIBUTED_DRIVER_DIR"] = SM_DISTRIBUTED_DRIVER_DIR_PATH + env_vars["SM_DISTRIBUTED_CONFIG"] = distributed + + # Data Channels + channels = list(input_data_config.keys()) + for channel in channels: + env_vars[f"SM_CHANNEL_{channel.upper()}"] = f"{SM_INPUT_DATA_DIR}/{channel}" + env_vars["SM_CHANNELS"] = channels + + # Hyperparameters + hps = deserialize_hyperparameters(hyperparameters_config) + for key, value in hps.items(): + key_upper = key.replace("-", "_").upper() + env_vars[f"SM_HP_{key_upper}"] = value + env_vars["SM_HPS"] = hps + + # Host Variables + current_host = resource_config["current_host"] + current_instance_type = resource_config["current_instance_type"] + hosts = resource_config["hosts"] + sorted_hosts = sorted(hosts) + + env_vars["SM_CURRENT_HOST"] = current_host + env_vars["SM_CURRENT_INSTANCE_TYPE"] = current_instance_type + env_vars["SM_HOSTS"] = sorted_hosts + env_vars["SM_NETWORK_INTERFACE_NAME"] = resource_config["network_interface_name"] + env_vars["SM_HOST_COUNT"] = len(sorted_hosts) + env_vars["SM_CURRENT_HOST_RANK"] = sorted_hosts.index(current_host) + + env_vars["SM_NUM_CPUS"] = num_cpus() + env_vars["SM_NUM_GPUS"] = num_gpus() + env_vars["SM_NUM_NEURONS"] = num_neurons() + + # Misc. 
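+    # Dict-valued entries (resource config, input data config, SM_HPS, SM_TRAINING_ENV) are
+    # written to the env file as JSON strings via safe_serialize, so user scripts can json.loads them.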
+ env_vars["SM_RESOURCE_CONFIG"] = resource_config + env_vars["SM_INPUT_DATA_CONFIG"] = input_data_config + + # All Training Environment Variables + env_vars["SM_TRAINING_ENV"] = { + "channel_input_dirs": { + channel: env_vars[f"SM_CHANNEL_{channel.upper()}"] for channel in channels + }, + "current_host": env_vars["SM_CURRENT_HOST"], + "current_instance_type": env_vars["SM_CURRENT_INSTANCE_TYPE"], + "hosts": env_vars["SM_HOSTS"], + "master_addr": env_vars["SM_MASTER_ADDR"], + "master_port": env_vars["SM_MASTER_PORT"], + "hyperparameters": env_vars["SM_HPS"], + "input_data_config": input_data_config, + "input_config_dir": env_vars["SM_INPUT_CONFIG_DIR"], + "input_data_dir": env_vars["SM_INPUT_DATA_DIR"], + "input_dir": env_vars["SM_INPUT_DIR"], + "job_name": os.environ["TRAINING_JOB_NAME"], + "log_level": env_vars["SM_LOG_LEVEL"], + "model_dir": env_vars["SM_MODEL_DIR"], + "network_interface_name": env_vars["SM_NETWORK_INTERFACE_NAME"], + "num_cpus": env_vars["SM_NUM_CPUS"], + "num_gpus": env_vars["SM_NUM_GPUS"], + "num_neurons": env_vars["SM_NUM_NEURONS"], + "output_data_dir": env_vars["SM_OUTPUT_DATA_DIR"], + "resource_config": env_vars["SM_RESOURCE_CONFIG"], + } + with open(output_file, "w") as f: + for key, value in env_vars.items(): + f.write(f"export {key}='{safe_serialize(value)}'\n") + + logger.info("Environment Variables:") + log_env_variables(env_vars_dict=env_vars) + + +def mask_sensitive_info(data): + """Recursively mask sensitive information in a dictionary.""" + if isinstance(data, dict): + for k, v in data.items(): + if isinstance(v, dict): + data[k] = mask_sensitive_info(v) + elif isinstance(v, str) and any( + keyword.lower() in k.lower() for keyword in SENSITIVE_KEYWORDS + ): + data[k] = HIDDEN_VALUE + return data + + +def log_key_value(key: str, value: str): + """Log a key-value pair, masking sensitive values if necessary.""" + if any(keyword.lower() in key.lower() for keyword in SENSITIVE_KEYWORDS): + logger.info("%s=%s", key, HIDDEN_VALUE) + elif isinstance(value, dict): + masked_value = mask_sensitive_info(value) + logger.info("%s=%s", key, json.dumps(masked_value)) + else: + try: + decoded_value = json.loads(value) + if isinstance(decoded_value, dict): + masked_value = mask_sensitive_info(decoded_value) + logger.info("%s=%s", key, json.dumps(masked_value)) + else: + logger.info("%s=%s", key, decoded_value) + except (json.JSONDecodeError, TypeError): + logger.info("%s=%s", key, value) + + +def log_env_variables(env_vars_dict: Dict[str, Any]): + """Log Environment Variables from the environment and an env_vars_dict.""" + for key, value in os.environ.items(): + log_key_value(key, value) + + for key, value in env_vars_dict.items(): + log_key_value(key, value) + + +def main(): + """Main function to set the environment variables for the training job container.""" + with open(RESOURCE_CONFIG, "r") as f: + resource_config = json.load(f) + with open(INPUT_DATA_CONFIG, "r") as f: + input_data_config = json.load(f) + with open(HYPERPARAMETERS_CONFIG, "r") as f: + hyperparameters_config = json.load(f) + + set_env( + resource_config=resource_config, + input_data_config=input_data_config, + hyperparameters_config=hyperparameters_config, + output_file=ENV_OUTPUT_FILE, + ) + + +if __name__ == "__main__": + main() diff --git a/v3-examples/training-examples/aii3xf60/sm_train.sh b/v3-examples/training-examples/aii3xf60/sm_train.sh new file mode 100644 index 0000000000..33ddf13bae --- /dev/null +++ b/v3-examples/training-examples/aii3xf60/sm_train.sh @@ -0,0 +1,59 @@ + +#!/bin/bash +set 
-e +echo "Starting training script" + +handle_error() { + EXIT_STATUS=$? + echo "An error occurred with exit code $EXIT_STATUS" + if [ ! -s /opt/ml/output/failure ]; then + echo "Training Execution failed. For more details, see CloudWatch logs at 'aws/sagemaker/TrainingJobs'. +TrainingJob - $TRAINING_JOB_NAME" >> /opt/ml/output/failure + fi + exit $EXIT_STATUS +} + +check_python() { + SM_PYTHON_CMD=$(command -v python3 || command -v python) + SM_PIP_CMD=$(command -v pip3 || command -v pip) + + # Check if Python is found + if [[ -z "$SM_PYTHON_CMD" || -z "$SM_PIP_CMD" ]]; then + echo "Error: The Python executable was not found in the system path." + return 1 + fi + + return 0 +} + +trap 'handle_error' ERR + +check_python + +set -x +$SM_PYTHON_CMD --version + +echo "/opt/ml/input/config/resourceconfig.json:" +cat /opt/ml/input/config/resourceconfig.json +echo + +echo "/opt/ml/input/config/inputdataconfig.json:" +cat /opt/ml/input/config/inputdataconfig.json +echo + +echo "Setting up environment variables" +$SM_PYTHON_CMD /opt/ml/input/data/sm_drivers/scripts/environment.py + +set +x +source /opt/ml/input/sm_training.env +set -x + +cd /opt/ml/input/data/code + + + +echo "Running Basic Script driver" +$SM_PYTHON_CMD /opt/ml/input/data/sm_drivers/distributed_drivers/basic_script_driver.py + + +echo "Training Container Execution Completed" diff --git a/v3-examples/training-examples/aii3xf60/sourcecode.json b/v3-examples/training-examples/aii3xf60/sourcecode.json new file mode 100644 index 0000000000..fccf2e3f1b --- /dev/null +++ b/v3-examples/training-examples/aii3xf60/sourcecode.json @@ -0,0 +1 @@ +{"source_dir": "/var/folders/12/bjmscmk114v7hxrzj6v4wd840000gq/T/tmpy87jl49z/source", "requirements": null, "entry_script": "local_training_script.py", "command": null} \ No newline at end of file diff --git a/v3-examples/training-examples/custom-distributed-training-example.ipynb b/v3-examples/training-examples/custom-distributed-training-example.ipynb new file mode 100644 index 0000000000..d0b26aa574 --- /dev/null +++ b/v3-examples/training-examples/custom-distributed-training-example.ipynb @@ -0,0 +1,376 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# SageMaker V3 Custom Distributed Training Example\n", + "\n", + "This notebook demonstrates how to create and use custom distributed training drivers with SageMaker V3 ModelTrainer." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import tempfile\n", + "import shutil\n", + "\n", + "from sagemaker.train.model_trainer import ModelTrainer\n", + "from sagemaker.train.configs import SourceCode\n", + "from sagemaker.train.distributed import DistributedConfig\n", + "from sagemaker.core.helper.session_helper import Session, get_execution_role" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 1: Setup Session and Create Test Files\n", + "\n", + "Initialize the SageMaker session and create the custom distributed driver files." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sagemaker_session = Session()\n", + "role = get_execution_role()\n", + "\n", + "DEFAULT_CPU_IMAGE = \"763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.0.0-cpu-py310\"\n", + "\n", + "# Create temporary directories\n", + "temp_dir = tempfile.mkdtemp()\n", + "custom_drivers_dir = os.path.join(temp_dir, \"custom_drivers\")\n", + "scripts_dir = os.path.join(temp_dir, \"scripts\")\n", + "\n", + "os.makedirs(custom_drivers_dir, exist_ok=True)\n", + "os.makedirs(scripts_dir, exist_ok=True)\n", + "\n", + "print(f\"Created temporary directories in: {temp_dir}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 2: Create Custom Driver and Entry Script\n", + "\n", + "Create the custom driver script and entry script for training." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create custom driver script\n", + "driver_script = '''\n", + "import json\n", + "import os\n", + "import subprocess\n", + "import sys\n", + "\n", + "def main():\n", + " driver_config = json.loads(os.environ[\"SM_DISTRIBUTED_CONFIG\"])\n", + " process_count_per_node = driver_config[\"process_count_per_node\"]\n", + " assert process_count_per_node != None\n", + "\n", + " hps = json.loads(os.environ[\"SM_HPS\"])\n", + " assert hps != None\n", + " assert isinstance(hps, dict)\n", + "\n", + " source_dir = os.environ[\"SM_SOURCE_DIR\"]\n", + " assert source_dir == \"/opt/ml/input/data/code\"\n", + " sm_drivers_dir = os.environ[\"SM_DISTRIBUTED_DRIVER_DIR\"]\n", + " assert sm_drivers_dir == \"/opt/ml/input/data/sm_drivers/distributed_drivers\"\n", + "\n", + " entry_script = os.environ[\"SM_ENTRY_SCRIPT\"]\n", + " assert entry_script != None\n", + "\n", + " python = sys.executable\n", + "\n", + " command = [python, entry_script]\n", + " print(f\"Running command: {command}\")\n", + " subprocess.run(command, check=True)\n", + "\n", + "if __name__ == \"__main__\":\n", + " print(\"Running custom driver script\")\n", + " main()\n", + " print(\"Finished running custom driver script\")\n", + "'''\n", + "\n", + "with open(os.path.join(custom_drivers_dir, \"driver.py\"), 'w') as f:\n", + " f.write(driver_script)\n", + "\n", + "print(\"Created custom driver script\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create entry script\n", + "entry_script = '''\n", + "import json\n", + "import os\n", + "import time\n", + "\n", + "def main():\n", + " hps = json.loads(os.environ[\"SM_HPS\"])\n", + " assert hps != None\n", + " print(f\"Hyperparameters: {hps}\")\n", + "\n", + " print(\"Running pseudo training script\")\n", + " for epochs in range(hps[\"epochs\"]):\n", + " print(f\"Epoch: {epochs}\")\n", + " time.sleep(1)\n", + " print(\"Finished running pseudo training script\")\n", + " \n", + " # Save results\n", + " model_dir = os.environ.get(\"SM_MODEL_DIR\", \"/opt/ml/model\")\n", + " os.makedirs(model_dir, exist_ok=True)\n", + " \n", + " results = {\"status\": \"success\", \"epochs_completed\": hps[\"epochs\"]}\n", + " with open(os.path.join(model_dir, \"results.json\"), \"w\") as f:\n", + " json.dump(results, f, indent=2)\n", + "\n", + "if __name__ == \"__main__\":\n", + " main()\n", + "'''\n", + "\n", + "with open(os.path.join(scripts_dir, \"entry_script.py\"), 'w') as f:\n", + " f.write(entry_script)\n", + "\n", + "print(\"Created entry 
script\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 3: Define Custom Distributed Driver\n", + "\n", + "Create the custom distributed driver class." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class CustomDriver(DistributedConfig):\n", + " process_count_per_node: int = None\n", + "\n", + " @property\n", + " def driver_dir(self) -> str:\n", + " return custom_drivers_dir\n", + "\n", + " @property\n", + " def driver_script(self) -> str:\n", + " return \"driver.py\"\n", + "\n", + "print(\"Custom distributed driver class defined!\")\n", + "print(f\"Driver directory: {custom_drivers_dir}\")\n", + "print(f\"Driver script: driver.py\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 4: Configure Source Code and Hyperparameters\n", + "\n", + "Set up the source code and hyperparameters for training." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "source_code = SourceCode(\n", + " source_dir=scripts_dir,\n", + " entry_script=\"entry_script.py\",\n", + ")\n", + "\n", + "hyperparameters = {\"epochs\": 10}\n", + "\n", + "custom_driver = CustomDriver(process_count_per_node=2)\n", + "\n", + "print(f\"Source directory: {scripts_dir}\")\n", + "print(f\"Entry script: entry_script.py\")\n", + "print(f\"Hyperparameters: {hyperparameters}\")\n", + "print(f\"Custom driver: {custom_driver}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 5: Create ModelTrainer with Custom Driver\n", + "\n", + "Initialize ModelTrainer with the custom distributed configuration." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "model_trainer = ModelTrainer(\n", + " sagemaker_session=sagemaker_session,\n", + " training_image=DEFAULT_CPU_IMAGE,\n", + " hyperparameters=hyperparameters,\n", + " source_code=source_code,\n", + " distributed=custom_driver,\n", + " base_job_name=\"custom-distributed-driver\",\n", + ")\n", + "\n", + "print(\"ModelTrainer created with custom distributed driver!\")\n", + "print(f\"Job name: custom-distributed-driver\")\n", + "print(f\"Distributed configuration: {model_trainer.distributed}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 6: Run Custom Distributed Training\n", + "\n", + "Start the distributed training job using the custom driver." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"Starting custom distributed training...\")\n", + "\n", + "try:\n", + " model_trainer.train()\n", + " print(f\"Custom distributed training completed successfully!\")\n", + " print(f\"Job name: {model_trainer._latest_training_job.training_job_name}\")\n", + " training_successful = True\n", + "except Exception as e:\n", + " print(f\"Training failed with error: {e}\")\n", + " training_successful = False" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 7: Analyze Training Results\n", + "\n", + "Examine the results from the custom distributed training." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "if training_successful:\n", + " job_name = model_trainer._latest_training_job.training_job_name\n", + " model_artifacts = model_trainer._latest_training_job.model_artifacts\n", + " \n", + " print(\"Custom Distributed Training Results:\")\n", + " print(\"=\" * 40)\n", + " print(f\"Job Name: {job_name}\")\n", + " print(f\"Model Artifacts: {model_artifacts}\")\n", + " print(f\"Training Image: {DEFAULT_CPU_IMAGE}\")\n", + " \n", + " print(\"\\nCustom Driver Configuration:\")\n", + " print(f\"Driver Class: {custom_driver.__class__.__name__}\")\n", + " print(f\"Process Count Per Node: {custom_driver.process_count_per_node}\")\n", + " print(f\"Driver Directory: {custom_driver.driver_dir}\")\n", + " print(f\"Driver Script: {custom_driver.driver_script}\")\n", + " \n", + " print(\"\\nHyperparameters Used:\")\n", + " for key, value in hyperparameters.items():\n", + " print(f\" {key}: {value}\")\n", + " \n", + " print(\"\\n✓ Custom distributed training completed successfully!\")\n", + " \n", + "else:\n", + " print(\"Training was not successful.\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 8: Clean Up\n", + "\n", + "Clean up temporary files." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " shutil.rmtree(temp_dir)\n", + " print(f\"Cleaned up temporary directory: {temp_dir}\")\n", + "except Exception as e:\n", + " print(f\"Could not clean up temp directory: {e}\")\n", + "\n", + "print(\"Cleanup completed!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "This notebook demonstrated:\n", + "1. **Custom distributed driver creation**: Extending DistributedConfig for specialized needs\n", + "2. **Driver coordination**: How custom drivers manage training processes\n", + "3. **ModelTrainer integration**: Seamless integration with SageMaker V3 training\n", + "4. **Custom training logic**: Implementing specialized training patterns\n", + "\n", + "Custom distributed drivers provide flexibility for implementing specialized coordination logic, framework integration, and advanced debugging capabilities for distributed training scenarios." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv-test", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.11" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/v3-examples/training-examples/distributed-local-training-example.ipynb b/v3-examples/training-examples/distributed-local-training-example.ipynb new file mode 100644 index 0000000000..63f46a31d9 --- /dev/null +++ b/v3-examples/training-examples/distributed-local-training-example.ipynb @@ -0,0 +1,467 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# SageMaker V3 Distributed Local Training Example\n", + "\n", + "This notebook demonstrates how to run distributed training locally using SageMaker V3 ModelTrainer with multiple Docker containers." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import subprocess\n", + "import tempfile\n", + "import shutil\n", + "import numpy as np\n", + "\n", + "from sagemaker.train.model_trainer import ModelTrainer, Mode\n", + "from sagemaker.train.configs import SourceCode, Compute, InputData\n", + "from sagemaker.train.distributed import Torchrun\n", + "from sagemaker.core.helper.session_helper import Session" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# NOTE: Local mode requires Docker to be installed and running.\n", + "import os\n", + "os.environ['PATH'] = '/usr/local/bin:/Applications/Docker.app/Contents/Resources/bin:' + os.environ['PATH']" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 1: Setup Session and Create Test Data\n", + "\n", + "Initialize the SageMaker session and create the necessary test data and training script." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sagemaker_session = Session()\n", + "DEFAULT_CPU_IMAGE = \"763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.0.0-cpu-py310\"\n", + "\n", + "# Create temporary directories\n", + "temp_dir = tempfile.mkdtemp()\n", + "source_dir = os.path.join(temp_dir, \"source\")\n", + "data_dir = os.path.join(temp_dir, \"data\")\n", + "train_dir = os.path.join(data_dir, \"train\")\n", + "test_dir = os.path.join(data_dir, \"test\")\n", + "\n", + "os.makedirs(source_dir, exist_ok=True)\n", + "os.makedirs(train_dir, exist_ok=True)\n", + "os.makedirs(test_dir, exist_ok=True)\n", + "\n", + "print(f\"Created temporary directories in: {temp_dir}\")\n", + "print(\"Note: This will use multiple Docker containers locally for distributed training!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 2: Create Training Data and Scripts\n", + "\n", + "Generate the test data and training scripts needed for distributed local training." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create test data\n", + "np.random.seed(42)\n", + "x_train = np.random.randn(100, 4).astype(np.float32)\n", + "y_train = np.random.randn(100).astype(np.float32)\n", + "x_test = np.random.randn(20, 4).astype(np.float32)\n", + "y_test = np.random.randn(20).astype(np.float32)\n", + "\n", + "np.save(os.path.join(train_dir, \"x_train.npy\"), x_train)\n", + "np.save(os.path.join(train_dir, \"y_train.npy\"), y_train)\n", + "np.save(os.path.join(test_dir, \"x_test.npy\"), x_test)\n", + "np.save(os.path.join(test_dir, \"y_test.npy\"), y_test)\n", + "\n", + "print(f\"Created training data: {x_train.shape}, {y_train.shape}\")\n", + "print(f\"Created test data: {x_test.shape}, {y_test.shape}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create pytorch model definition\n", + "pytorch_model_def = '''\n", + "import torch\n", + "import torch.nn as nn\n", + "\n", + "def get_model():\n", + " return nn.Sequential(\n", + " nn.Linear(4, 10),\n", + " nn.ReLU(),\n", + " nn.Linear(10, 1)\n", + " )\n", + "'''\n", + "\n", + "with open(os.path.join(source_dir, \"pytorch_model_def.py\"), 'w') as f:\n", + " f.write(pytorch_model_def)\n", + "\n", + "print(\"Created pytorch_model_def.py\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create training script (same as single container for simplicity)\n", + "training_script = '''\n", + "import argparse\n", + "import numpy as np\n", + "import os\n", + "import sys\n", + "import logging\n", + "import json\n", + "import shutil\n", + "import torch\n", + "import torch.nn as nn\n", + "from torch.utils.data import DataLoader, TensorDataset\n", + "from pytorch_model_def import get_model\n", + "\n", + "logger = logging.getLogger(__name__)\n", + "logger.setLevel(logging.DEBUG)\n", + "logger.addHandler(logging.StreamHandler(sys.stdout))\n", + "current_dir = os.path.dirname(os.path.abspath(__file__))\n", + "data_dir = \"/opt/ml/input/data\"\n", + "\n", + "def get_train_data(train_dir):\n", + " x_train = np.load(os.path.join(train_dir, \"x_train.npy\"))\n", + " y_train = np.load(os.path.join(train_dir, \"y_train.npy\"))\n", + " logger.info(f\"x train: {x_train.shape}, y train: {y_train.shape}\")\n", + " return torch.from_numpy(x_train), torch.from_numpy(y_train)\n", + "\n", + "def get_test_data(test_dir):\n", + " x_test = np.load(os.path.join(test_dir, \"x_test.npy\"))\n", + " y_test = np.load(os.path.join(test_dir, \"y_test.npy\"))\n", + " logger.info(f\"x test: {x_test.shape}, y test: {y_test.shape}\")\n", + " return torch.from_numpy(x_test), torch.from_numpy(y_test)\n", + "\n", + "def train():\n", + " train_dir = os.path.join(data_dir, \"train\")\n", + " test_dir = os.path.join(data_dir, \"test\")\n", + " model_dir = os.environ.get(\"SM_MODEL_DIR\", os.path.join(current_dir, \"data/model\"))\n", + "\n", + " x_train, y_train = get_train_data(train_dir)\n", + " x_test, y_test = get_test_data(test_dir)\n", + " train_ds = TensorDataset(x_train, y_train)\n", + "\n", + " batch_size = 64\n", + " epochs = 1\n", + " learning_rate = 0.1\n", + " logger.info(f\"batch_size = {batch_size}, epochs = {epochs}, learning rate = {learning_rate}\")\n", + "\n", + " train_dl = DataLoader(train_ds, batch_size, shuffle=True)\n", + " device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n", + " model = get_model().to(device)\n", + 
" criterion = nn.MSELoss()\n", + " optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)\n", + "\n", + " for epoch in range(epochs):\n", + " for x_train_batch, y_train_batch in train_dl:\n", + " y = model(x_train_batch.float())\n", + " loss = criterion(y.flatten(), y_train_batch.float())\n", + " optimizer.zero_grad()\n", + " loss.backward()\n", + " optimizer.step()\n", + " epoch += 1\n", + " logger.info(f\"epoch: {epoch} -> loss: {loss}\")\n", + "\n", + " with torch.no_grad():\n", + " y = model(x_test.float()).flatten()\n", + " mse = ((y - y_test) ** 2).sum() / y_test.shape[0]\n", + " print(\"Test MSE:\", mse.numpy())\n", + "\n", + " os.makedirs(model_dir, exist_ok=True)\n", + " torch.save(model.state_dict(), model_dir + \"/model.pth\")\n", + " \n", + " inference_code_path = model_dir + \"/code/\"\n", + " if not os.path.exists(inference_code_path):\n", + " os.mkdir(inference_code_path)\n", + " logger.info(f\"Created a folder at {inference_code_path}!\")\n", + "\n", + " shutil.copy(\"local_training_script.py\", inference_code_path)\n", + " shutil.copy(\"pytorch_model_def.py\", inference_code_path)\n", + " logger.info(f\"Saving models files to {inference_code_path}\")\n", + "\n", + "if __name__ == \"__main__\":\n", + " print(\"Running the training job ...\")\n", + " train()\n", + "'''\n", + "\n", + "with open(os.path.join(source_dir, \"local_training_script.py\"), 'w') as f:\n", + " f.write(training_script)\n", + "\n", + "print(\"Created local_training_script.py\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 3: Configure Distributed Local Training\n", + "\n", + "Set up ModelTrainer for distributed training in local container mode." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "source_code = SourceCode(\n", + " source_dir=source_dir,\n", + " entry_script=\"local_training_script.py\",\n", + ")\n", + "\n", + "distributed = Torchrun(\n", + " process_count_per_node=1,\n", + ")\n", + "\n", + "compute = Compute(\n", + " instance_type=\"local_cpu\",\n", + " instance_count=2,\n", + ")\n", + "\n", + "train_data = InputData(\n", + " channel_name=\"train\",\n", + " data_source=train_dir,\n", + ")\n", + "\n", + "test_data = InputData(\n", + " channel_name=\"test\",\n", + " data_source=test_dir,\n", + ")\n", + "\n", + "print(\"Distributed Local Training Configuration:\")\n", + "print(f\" Containers: {compute.instance_count}\")\n", + "print(f\" Processes per container: {distributed.process_count_per_node}\")\n", + "print(f\" Total processes: {compute.instance_count * distributed.process_count_per_node}\")\n", + "print(f\" Distributed framework: Torchrun\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 4: Create Distributed ModelTrainer\n", + "\n", + "Initialize ModelTrainer for distributed local container training." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "model_trainer = ModelTrainer(\n", + " training_image=DEFAULT_CPU_IMAGE,\n", + " sagemaker_session=sagemaker_session,\n", + " source_code=source_code,\n", + " distributed=distributed,\n", + " compute=compute,\n", + " input_data_config=[train_data, test_data],\n", + " base_job_name=\"local_mode_multi_container\",\n", + " training_mode=Mode.LOCAL_CONTAINER,\n", + ")\n", + "\n", + "print(\"Distributed ModelTrainer created successfully!\")\n", + "print(f\"Training mode: {Mode.LOCAL_CONTAINER}\")\n", + "print(f\"Distributed config: {distributed}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 5: Run Distributed Local Training\n", + "\n", + "Start the distributed training job using multiple local containers." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"Starting distributed local container training...\")\n", + "print(\"This will launch 2 Docker containers with 1 training process each.\")\n", + "\n", + "try:\n", + " model_trainer.train()\n", + " print(\"Distributed local container training completed successfully!\")\n", + " operation_successful = True\n", + "except Exception as e:\n", + " print(f\"Training failed with error: {e}\")\n", + " operation_successful = False" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 6: Check Training Results\n", + "\n", + "Examine the results from distributed training." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "if operation_successful:\n", + " current_dir = os.getcwd()\n", + " \n", + " print(\"Distributed Training Results:\")\n", + " print(\"=\" * 35)\n", + " \n", + " # Check that certain directories don't exist (cleanup verification)\n", + " assert not os.path.exists(os.path.join(current_dir, \"shared\"))\n", + " assert not os.path.exists(os.path.join(current_dir, \"input\"))\n", + " assert not os.path.exists(os.path.join(current_dir, \"algo-1\"))\n", + " assert not os.path.exists(os.path.join(current_dir, \"algo-2\"))\n", + " print(\"✓ Temporary directories properly cleaned up\")\n", + " \n", + " # Check for expected artifacts\n", + " directories_to_check = [\n", + " \"compressed_artifacts\",\n", + " \"artifacts\",\n", + " \"model\",\n", + " \"output\",\n", + " ]\n", + " \n", + " for directory in directories_to_check:\n", + " path = os.path.join(current_dir, directory)\n", + " if os.path.exists(path):\n", + " print(f\"✓ {directory}: Found\")\n", + " else:\n", + " print(f\"✗ {directory}: Not found\")\n", + " \n", + " print(\"\\nDistributed Training Configuration:\")\n", + " print(f\" Training Image: {DEFAULT_CPU_IMAGE}\")\n", + " print(f\" Container Count: {compute.instance_count}\")\n", + " print(f\" Processes per Container: {distributed.process_count_per_node}\")\n", + " print(f\" Total Training Processes: {compute.instance_count * distributed.process_count_per_node}\")\n", + " \n", + "else:\n", + " print(\"Training was not successful.\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 7: Clean Up\n", + "\n", + "Clean up local artifacts and temporary files." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " subprocess.run([\"docker\", \"compose\", \"down\", \"-v\"], check=False)\n", + " print(\"Docker containers stopped\")\n", + "except Exception:\n", + " pass\n", + "\n", + "# Clean up temporary files\n", + "try:\n", + " shutil.rmtree(temp_dir)\n", + " print(f\"Cleaned up temporary directory: {temp_dir}\")\n", + "except Exception as e:\n", + " print(f\"Could not clean up temp directory: {e}\")\n", + "\n", + "# Clean up training artifacts\n", + "current_dir = os.getcwd()\n", + "directories = [\"compressed_artifacts\", \"artifacts\", \"model\", \"output\"]\n", + "\n", + "for directory in directories:\n", + " path = os.path.join(current_dir, directory)\n", + " if os.path.exists(path):\n", + " try:\n", + " shutil.rmtree(path)\n", + " print(f\"Cleaned up: {directory}\")\n", + " except Exception as e:\n", + " print(f\"Could not clean up {directory}: {e}\")\n", + "\n", + "# Final assertion\n", + "assert operation_successful\n", + "print(\"\\n✓ Distributed local training completed successfully!\")\n", + "print(\"Cleanup completed - all artifacts removed.\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "This notebook demonstrated:\n", + "1. **Multi-container distributed training**: Running training across multiple Docker containers locally\n", + "2. **Torchrun integration**: Using SageMaker's Torchrun distributed driver\n", + "3. **Local development workflow**: Testing distributed training before cloud deployment\n", + "4. **Proper cleanup**: Following cleanup patterns for local artifacts\n", + "\n", + "Distributed local training provides a great way to test distributed training patterns locally before deploying to SageMaker cloud instances, with no AWS costs and realistic container-based execution." 
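Note that the training script above runs a single process per container for simplicity: Torchrun coordinates the two containers, but the model itself is not synchronized between them. If you want the workers to actually share gradients, the script can be extended with `torch.distributed`, relying on the `RANK`, `WORLD_SIZE`, and `LOCAL_RANK` environment variables that torchrun sets for each worker. A minimal sketch of that change, meant to be launched by torchrun on CPU-only containers and using the same toy model:

```python
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset, DistributedSampler
from pytorch_model_def import get_model

# torchrun exports RANK / WORLD_SIZE / LOCAL_RANK for every worker process.
dist.init_process_group(backend="gloo")  # gloo works on CPU-only containers
rank = dist.get_rank()

# Toy data; in the real script this would come from /opt/ml/input/data.
x = torch.randn(100, 4)
y = torch.randn(100)
dataset = TensorDataset(x, y)

# DistributedSampler gives each worker a distinct shard of the data.
sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, batch_size=16, sampler=sampler)

model = DDP(get_model())  # gradients are all-reduced across workers
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
criterion = torch.nn.MSELoss()

for epoch in range(1):
    sampler.set_epoch(epoch)
    for xb, yb in loader:
        optimizer.zero_grad()
        loss = criterion(model(xb).flatten(), yb)
        loss.backward()
        optimizer.step()
    if rank == 0:
        print(f"epoch {epoch} loss {loss.item():.4f}")

dist.destroy_process_group()
```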
+ ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv-test", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.11" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/v3-examples/training-examples/docker-compose.yaml b/v3-examples/training-examples/docker-compose.yaml new file mode 100644 index 0000000000..5f4e35545b --- /dev/null +++ b/v3-examples/training-examples/docker-compose.yaml @@ -0,0 +1,54 @@ +networks: + sagemaker-local: + name: sagemaker-local +services: + algo-1: + entrypoint: + - /bin/bash + - -c + - chmod +x /opt/ml/input/data/sm_drivers/sm_train.sh && /opt/ml/input/data/sm_drivers/sm_train.sh + environment: + - TRAINING_JOB_NAME=local-mode-multi-container-20251124175521 + - AWS_REGION=us-west-2 + - AWS_ACCESS_KEY_ID= + - AWS_SECRET_ACCESS_KEY= + - AWS_SESSION_TOKEN= + image: 763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.0.0-cpu-py310 + networks: + sagemaker-local: + aliases: + - algo-1 + volumes: + - /Users/aruthen/amazon/sagemaker-python-sdk/v3-examples/training-examples/model:/opt/ml/model + - /var/folders/12/bjmscmk114v7hxrzj6v4wd840000gq/T/tmp2xe4qebu/data/train:/opt/ml/input/data/train + - /var/folders/12/bjmscmk114v7hxrzj6v4wd840000gq/T/tmp2xe4qebu/data/test:/opt/ml/input/data/test + - /var/folders/12/bjmscmk114v7hxrzj6v4wd840000gq/T/tmp2xe4qebu/source:/opt/ml/input/data/code + - /Users/aruthen/amazon/sagemaker-python-sdk/v3-examples/training-examples/nfw8umuu:/opt/ml/input/data/sm_drivers + - /Users/aruthen/amazon/sagemaker-python-sdk/v3-examples/training-examples/algo-1/output:/opt/ml/output + - /Users/aruthen/amazon/sagemaker-python-sdk/v3-examples/training-examples/algo-1/output/data:/opt/ml/output/data + - /Users/aruthen/amazon/sagemaker-python-sdk/v3-examples/training-examples/algo-1/input:/opt/ml/input + algo-2: + entrypoint: + - /bin/bash + - -c + - chmod +x /opt/ml/input/data/sm_drivers/sm_train.sh && /opt/ml/input/data/sm_drivers/sm_train.sh + environment: + - TRAINING_JOB_NAME=local-mode-multi-container-20251124175521 + - AWS_REGION=us-west-2 + - AWS_ACCESS_KEY_ID= + - AWS_SECRET_ACCESS_KEY= + - AWS_SESSION_TOKEN= + image: 763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.0.0-cpu-py310 + networks: + sagemaker-local: + aliases: + - algo-2 + volumes: + - /Users/aruthen/amazon/sagemaker-python-sdk/v3-examples/training-examples/model:/opt/ml/model + - /var/folders/12/bjmscmk114v7hxrzj6v4wd840000gq/T/tmp2xe4qebu/data/train:/opt/ml/input/data/train + - /var/folders/12/bjmscmk114v7hxrzj6v4wd840000gq/T/tmp2xe4qebu/data/test:/opt/ml/input/data/test + - /var/folders/12/bjmscmk114v7hxrzj6v4wd840000gq/T/tmp2xe4qebu/source:/opt/ml/input/data/code + - /Users/aruthen/amazon/sagemaker-python-sdk/v3-examples/training-examples/nfw8umuu:/opt/ml/input/data/sm_drivers + - /Users/aruthen/amazon/sagemaker-python-sdk/v3-examples/training-examples/algo-2/output:/opt/ml/output + - /Users/aruthen/amazon/sagemaker-python-sdk/v3-examples/training-examples/algo-2/output/data:/opt/ml/output/data + - /Users/aruthen/amazon/sagemaker-python-sdk/v3-examples/training-examples/algo-2/input:/opt/ml/input \ No newline at end of file diff --git a/v3-examples/training-examples/hyperparameter-training-example.ipynb b/v3-examples/training-examples/hyperparameter-training-example.ipynb new file 
mode 100644 index 0000000000..f884ec3086 --- /dev/null +++ b/v3-examples/training-examples/hyperparameter-training-example.ipynb @@ -0,0 +1,372 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# SageMaker V3 Hyperparameter Training Example\n", + "\n", + "This notebook demonstrates hyperparameter handling in SageMaker V3 ModelTrainer using JSON and YAML files." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import json\n", + "import yaml\n", + "import tempfile\n", + "import shutil\n", + "\n", + "from sagemaker.train.model_trainer import ModelTrainer\n", + "from sagemaker.train.configs import SourceCode\n", + "from sagemaker.core.helper.session_helper import Session, get_execution_role" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 1: Setup Session and Create Test Files\n", + "\n", + "Initialize the SageMaker session and create the hyperparameter files and training script." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sagemaker_session = Session()\n", + "role = get_execution_role()\n", + "\n", + "# Expected hyperparameters\n", + "EXPECTED_HYPERPARAMETERS = {\n", + " \"integer\": 1,\n", + " \"boolean\": True,\n", + " \"float\": 3.14,\n", + " \"string\": \"Hello World\",\n", + " \"list\": [1, 2, 3],\n", + " \"dict\": {\n", + " \"string\": \"value\",\n", + " \"integer\": 3,\n", + " \"float\": 3.14,\n", + " \"list\": [1, 2, 3],\n", + " \"dict\": {\"key\": \"value\"},\n", + " \"boolean\": True,\n", + " },\n", + "}\n", + "\n", + "DEFAULT_CPU_IMAGE = \"763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.0.0-cpu-py310\"\n", + "\n", + "# Create temporary directory\n", + "temp_dir = tempfile.mkdtemp()\n", + "source_dir = os.path.join(temp_dir, \"source\")\n", + "os.makedirs(source_dir, exist_ok=True)\n", + "\n", + "print(f\"Created temporary directory: {temp_dir}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 2: Create Hyperparameter Files and Training Script\n", + "\n", + "Create JSON and YAML hyperparameter files and a training script that validates them." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create JSON hyperparameters file\n", + "json_file = os.path.join(source_dir, \"hyperparameters.json\")\n", + "with open(json_file, 'w') as f:\n", + " json.dump(EXPECTED_HYPERPARAMETERS, f, indent=2)\n", + "\n", + "# Create YAML hyperparameters file\n", + "yaml_file = os.path.join(source_dir, \"hyperparameters.yaml\")\n", + "with open(yaml_file, 'w') as f:\n", + " yaml.dump(EXPECTED_HYPERPARAMETERS, f, default_flow_style=False, indent=2)\n", + "\n", + "print(\"Created hyperparameter files\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create training script that validates hyperparameters\n", + "training_script = '''\n", + "import argparse\n", + "import json\n", + "import os\n", + "\n", + "EXPECTED_HYPERPARAMETERS = {\n", + " \"integer\": 1,\n", + " \"boolean\": True,\n", + " \"float\": 3.14,\n", + " \"string\": \"Hello World\",\n", + " \"list\": [1, 2, 3],\n", + " \"dict\": {\n", + " \"string\": \"value\",\n", + " \"integer\": 3,\n", + " \"float\": 3.14,\n", + " \"list\": [1, 2, 3],\n", + " \"dict\": {\"key\": \"value\"},\n", + " \"boolean\": True,\n", + " },\n", + "}\n", + "\n", + "def parse_args():\n", + " parser = argparse.ArgumentParser(description=\"Test Hyperparameters\")\n", + " parser.add_argument(\"--string\", type=str, required=True)\n", + " parser.add_argument(\"--integer\", type=int, required=True)\n", + " parser.add_argument(\"--float\", type=float, required=True)\n", + " parser.add_argument(\"--boolean\", type=lambda x: json.loads(x), required=True)\n", + " parser.add_argument(\"--list\", type=lambda x: json.loads(x), required=True)\n", + " parser.add_argument(\"--dict\", type=lambda x: json.loads(x), required=True)\n", + " return parser.parse_args()\n", + "\n", + "def main():\n", + " args = parse_args()\n", + " print(f\"Received hyperparameters: {args}\")\n", + "\n", + " # Validate hyperparameters\n", + " assert args.string == EXPECTED_HYPERPARAMETERS[\"string\"]\n", + " assert args.integer == EXPECTED_HYPERPARAMETERS[\"integer\"]\n", + " assert args.boolean == EXPECTED_HYPERPARAMETERS[\"boolean\"]\n", + " assert args.float == EXPECTED_HYPERPARAMETERS[\"float\"]\n", + " assert args.list == EXPECTED_HYPERPARAMETERS[\"list\"]\n", + " assert args.dict == EXPECTED_HYPERPARAMETERS[\"dict\"]\n", + "\n", + " # Validate environment variables\n", + " params = json.loads(os.environ[\"SM_HPS\"])\n", + " print(f\"SM_HPS: {params}\")\n", + " assert params == EXPECTED_HYPERPARAMETERS\n", + "\n", + " print(\"All hyperparameter validations passed!\")\n", + " \n", + " # Save results\n", + " model_dir = os.environ.get(\"SM_MODEL_DIR\", \"/opt/ml/model\")\n", + " os.makedirs(model_dir, exist_ok=True)\n", + " \n", + " results = {\"status\": \"success\", \"hyperparameters\": params}\n", + " with open(os.path.join(model_dir, \"results.json\"), \"w\") as f:\n", + " json.dump(results, f, indent=2)\n", + "\n", + "if __name__ == \"__main__\":\n", + " main()\n", + "'''\n", + "\n", + "with open(os.path.join(source_dir, \"train.py\"), 'w') as f:\n", + " f.write(training_script)\n", + "\n", + "# Create requirements file\n", + "with open(os.path.join(source_dir, \"requirements.txt\"), 'w') as f:\n", + " f.write(\"omegaconf\\n\")\n", + "\n", + "print(\"Created training script and requirements\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 3: Training with JSON Hyperparameters\n", 
+ "\n", + "Train a model using hyperparameters loaded from a JSON file." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "source_code = SourceCode(\n", + " source_dir=source_dir,\n", + " requirements=\"requirements.txt\",\n", + " entry_script=\"train.py\",\n", + ")\n", + "\n", + "json_trainer = ModelTrainer(\n", + " sagemaker_session=sagemaker_session,\n", + " training_image=DEFAULT_CPU_IMAGE,\n", + " hyperparameters=json_file,\n", + " source_code=source_code,\n", + " base_job_name=\"hp-contract-hyperparameter-json\",\n", + ")\n", + "\n", + "print(\"ModelTrainer created with JSON hyperparameters!\")\n", + "print(f\"Hyperparameters loaded: {json_trainer.hyperparameters}\")\n", + "\n", + "# Verify hyperparameters match expected values\n", + "assert json_trainer.hyperparameters == EXPECTED_HYPERPARAMETERS\n", + "print(\"✓ JSON hyperparameters match expected values!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"Starting training with JSON hyperparameters...\")\n", + "\n", + "json_trainer.train()\n", + "print(f\"JSON hyperparameters training completed: {json_trainer._latest_training_job.training_job_name}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 4: Training with YAML Hyperparameters\n", + "\n", + "Train a model using hyperparameters loaded from a YAML file." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "yaml_trainer = ModelTrainer(\n", + " sagemaker_session=sagemaker_session,\n", + " training_image=DEFAULT_CPU_IMAGE,\n", + " hyperparameters=yaml_file,\n", + " source_code=source_code,\n", + " base_job_name=\"hp-contract-hyperparameter-yaml\",\n", + ")\n", + "\n", + "print(\"ModelTrainer created with YAML hyperparameters!\")\n", + "print(f\"Hyperparameters loaded: {yaml_trainer.hyperparameters}\")\n", + "\n", + "# Verify hyperparameters match expected values\n", + "assert yaml_trainer.hyperparameters == EXPECTED_HYPERPARAMETERS\n", + "print(\"✓ YAML hyperparameters match expected values!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"Starting training with YAML hyperparameters...\")\n", + "\n", + "yaml_trainer.train()\n", + "print(f\"YAML hyperparameters training completed: {yaml_trainer._latest_training_job.training_job_name}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 5: Compare Training Results\n", + "\n", + "Compare the results from both hyperparameter approaches." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "training_jobs = [\n", + " (\"JSON File\", json_trainer),\n", + " (\"YAML File\", yaml_trainer)\n", + "]\n", + "\n", + "print(\"Training Job Comparison:\")\n", + "print(\"=\" * 40)\n", + "\n", + "for approach_name, trainer in training_jobs:\n", + " job_name = trainer._latest_training_job.training_job_name\n", + " model_artifacts = trainer._latest_training_job.model_artifacts\n", + " \n", + " print(f\"\\n{approach_name}:\")\n", + " print(f\" Job Name: {job_name}\")\n", + " print(f\" Model Artifacts: {model_artifacts}\")\n", + " print(f\" Status: Completed\")\n", + " \n", + " # Verify all hyperparameters are identical\n", + " assert trainer.hyperparameters == EXPECTED_HYPERPARAMETERS\n", + "\n", + "print(\"\\n✓ All training jobs completed successfully with identical hyperparameters!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 6: Clean Up\n", + "\n", + "Clean up temporary files." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " shutil.rmtree(temp_dir)\n", + " print(f\"Cleaned up temporary directory: {temp_dir}\")\n", + "except Exception as e:\n", + " print(f\"Could not clean up temp directory: {e}\")\n", + "\n", + "print(\"Cleanup completed!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "This notebook demonstrated:\n", + "1. **JSON hyperparameters**: Loading from JSON files\n", + "2. **YAML hyperparameters**: Loading from YAML files\n", + "3. **Validation**: Ensuring loaded hyperparameters match expected values\n", + "4. **File-based configuration**: Managing hyperparameters as external files\n", + "\n", + "File-based hyperparameters provide better version control, reproducibility, and support for complex nested structures." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv-test", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.11" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/v3-examples/training-examples/jumpstart-training-example.ipynb b/v3-examples/training-examples/jumpstart-training-example.ipynb new file mode 100644 index 0000000000..b505d4b142 --- /dev/null +++ b/v3-examples/training-examples/jumpstart-training-example.ipynb @@ -0,0 +1,329 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# SageMaker V3 JumpStart Training Example\n", + "\n", + "This notebook demonstrates how to use SageMaker V3 ModelTrainer with JumpStart models for easy model training and fine-tuning." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Prerequisites\n", + "Note: Ensure you have sagemaker-train and ipywidgets installed in your environment. The ipywidgets package is required to monitor training job progress in Jupyter notebooks." 
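If either prerequisite package is missing, you can usually install both directly from the notebook before running the cells below (assuming a pip-based environment):

```python
# Install the prerequisites into the current kernel's environment.
%pip install sagemaker-train ipywidgets
```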
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Import required libraries\n", + "import json\n", + "import uuid\n", + "\n", + "from sagemaker.train.model_trainer import ModelTrainer\n", + "from sagemaker.core.jumpstart import JumpStartConfig\n", + "from sagemaker.core.helper.session_helper import Session, get_execution_role" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 1: Setup Session and Configuration\n", + "\n", + "Initialize the SageMaker session and define our training configuration." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Initialize SageMaker session\n", + "sagemaker_session = Session()\n", + "role = get_execution_role()\n", + "\n", + "# Configuration\n", + "JOB_NAME_PREFIX = \"js-v3-training-example\"\n", + "\n", + "# Generate unique identifier\n", + "unique_id = str(uuid.uuid4())[:8]\n", + "base_job_name = f\"{JOB_NAME_PREFIX}-{unique_id}\"\n", + "\n", + "print(f\"Base job name: {base_job_name}\")\n", + "print(f\"SageMaker execution role: {role}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 2: Train HuggingFace BERT Model\n", + "\n", + "Train a HuggingFace BERT model for text classification using JumpStart." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Configure JumpStart for HuggingFace BERT\n", + "bert_jumpstart_config = JumpStartConfig(\n", + " model_id=\"huggingface-spc-bert-base-cased\",\n", + " accept_eula=False # This model doesn't require EULA acceptance\n", + ")\n", + "\n", + "# Create ModelTrainer from JumpStart config\n", + "bert_trainer = ModelTrainer.from_jumpstart_config(\n", + " jumpstart_config=bert_jumpstart_config,\n", + " base_job_name=f\"{base_job_name}-bert\",\n", + " hyperparameters={\n", + " \"epochs\": 1, # Set to 1 for quick demonstration\n", + " \"learning_rate\": 5e-5,\n", + " \"train_batch_size\": 32\n", + " },\n", + " sagemaker_session=sagemaker_session\n", + ")\n", + "\n", + "print(\"BERT ModelTrainer created successfully from JumpStart config!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Start BERT training\n", + "print(\"Starting BERT training job...\")\n", + "print(\"Note: This will use the default JumpStart dataset and may take 10-15 minutes.\")\n", + "\n", + "bert_trainer.train()\n", + "print(f\"BERT training job completed!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 3: Train XGBoost Classification Model\n", + "\n", + "Train an XGBoost model for classification tasks using JumpStart." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Configure JumpStart for XGBoost\n", + "xgboost_jumpstart_config = JumpStartConfig(\n", + " model_id=\"xgboost-classification-model\"\n", + ")\n", + "\n", + "# Create ModelTrainer from JumpStart config\n", + "xgboost_trainer = ModelTrainer.from_jumpstart_config(\n", + " jumpstart_config=xgboost_jumpstart_config,\n", + " base_job_name=f\"{base_job_name}-xgboost\",\n", + " hyperparameters={\n", + " \"num_round\": 10, # Reduced for quick demonstration\n", + " \"max_depth\": 5,\n", + " \"eta\": 0.2,\n", + " \"objective\": \"binary:logistic\"\n", + " },\n", + " sagemaker_session=sagemaker_session\n", + ")\n", + "\n", + "print(\"XGBoost ModelTrainer created successfully from JumpStart config!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Start XGBoost training\n", + "print(\"Starting XGBoost training job...\")\n", + "print(\"Note: This will use the default JumpStart dataset and should complete in 5-10 minutes.\")\n", + "\n", + "xgboost_trainer.train()\n", + "print(f\"XGBoost training job completed!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 4: Train CatBoost Regression Model\n", + "\n", + "Train a CatBoost model for regression tasks using JumpStart." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Configure JumpStart for CatBoost\n", + "catboost_jumpstart_config = JumpStartConfig(\n", + " model_id=\"catboost-regression-model\"\n", + ")\n", + "\n", + "# Create ModelTrainer from JumpStart config\n", + "catboost_trainer = ModelTrainer.from_jumpstart_config(\n", + " jumpstart_config=catboost_jumpstart_config,\n", + " base_job_name=f\"{base_job_name}-catboost\",\n", + " hyperparameters={\n", + " \"iterations\": 50, # Reduced for quick demonstration\n", + " \"learning_rate\": 0.1,\n", + " \"depth\": 6,\n", + " \"loss_function\": \"RMSE\"\n", + " },\n", + " sagemaker_session=sagemaker_session\n", + ")\n", + "\n", + "print(\"CatBoost ModelTrainer created successfully from JumpStart config!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Start CatBoost training\n", + "print(\"Starting CatBoost training job...\")\n", + "print(\"Note: This will use the default JumpStart dataset and should complete in 5-10 minutes.\")\n", + "\n", + "catboost_trainer.train()\n", + "print(f\"CatBoost training job completed!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 5: Review Training Results\n", + "\n", + "Check the status and results of our training jobs." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Display training job information\n", + "training_jobs = [\n", + " (\"BERT\", bert_trainer),\n", + " (\"XGBoost\", xgboost_trainer),\n", + " (\"CatBoost\", catboost_trainer)\n", + "]\n", + "\n", + "print(\"Training Job Summary:\")\n", + "print(\"=\" * 50)\n", + "\n", + "for model_name, trainer in training_jobs:\n", + " job_name = trainer._latest_training_job.training_job_name\n", + " model_artifacts = trainer._latest_training_job.model_artifacts\n", + " \n", + " print(f\"\\n{model_name} Model:\")\n", + " print(f\" Job Name: {job_name}\")\n", + " print(f\" Model Artifacts: {model_artifacts}\")\n", + " print(f\" Status: Completed\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 6: Access Training Metrics (Optional)\n", + "\n", + "View training metrics and logs from CloudWatch." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Example: Access training job details\n", + "print(\"Training Job Details:\")\n", + "print(\"\\nTo view detailed training metrics and logs:\")\n", + "print(\"1. Go to the SageMaker Console\")\n", + "print(\"2. Navigate to 'Training' > 'Training jobs'\")\n", + "print(\"3. Search for jobs with prefix:\", base_job_name)\n", + "print(\"4. Click on any job to view metrics, logs, and model artifacts\")\n", + "\n", + "# You can also access logs programmatically\n", + "print(\"\\nProgrammatic access to logs:\")\n", + "for model_name, trainer in training_jobs:\n", + " print(f\"{model_name}: trainer.latest_training_job.describe()\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "This notebook demonstrated:\n", + "1. Creating ModelTrainer instances from JumpStart configurations\n", + "2. Training multiple model types (BERT, XGBoost, CatBoost) with custom hyperparameters\n", + "3. Using JumpStart's built-in datasets and training scripts\n", + "4. Monitoring training job progress and results\n", + "\n", + "## Benefits of JumpStart Training:\n", + "- **Pre-configured models**: No need to write training scripts or handle data preprocessing\n", + "- **Best practices**: Optimized hyperparameters and training configurations\n", + "- **Multiple frameworks**: Support for HuggingFace, XGBoost, CatBoost, and more\n", + "- **Easy customization**: Override hyperparameters while keeping proven defaults\n", + "- **Built-in datasets**: Start training immediately with curated datasets\n", + "\n", + "## Next Steps:\n", + "- Deploy trained models using SageMaker V3 ModelBuilder\n", + "- Fine-tune models with your own datasets\n", + "- Experiment with different hyperparameters\n", + "- Set up automated training pipelines\n", + "\n", + "JumpStart training with V3 ModelTrainer makes it incredibly easy to get started with machine learning while maintaining the flexibility to customize as needed!" 
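The "fine-tune models with your own datasets" next step usually amounts to pointing the JumpStart trainer at your own input channel instead of the built-in dataset. The exact channel name a given JumpStart model expects varies, so treat the sketch below as an assumption-laden illustration (the S3 URI, the "training" channel name, and passing `input_data_config` through `from_jumpstart_config` are all unverified placeholders), not a recipe:

```python
from sagemaker.train.configs import InputData

# Hypothetical: supply your own dataset instead of the JumpStart default.
# The channel name and S3 prefix are placeholders and may need to match what
# the chosen JumpStart model actually expects.
my_training_data = InputData(
    channel_name="training",
    data_source="s3://my-bucket/my-dataset/",
)

custom_data_trainer = ModelTrainer.from_jumpstart_config(
    jumpstart_config=xgboost_jumpstart_config,
    base_job_name=f"{base_job_name}-xgboost-custom-data",
    input_data_config=[my_training_data],  # assumed to be accepted here
    sagemaker_session=sagemaker_session,
)
# custom_data_trainer.train()
```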
+ ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv-test", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.11" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/v3-examples/training-examples/local-training-example.ipynb b/v3-examples/training-examples/local-training-example.ipynb new file mode 100644 index 0000000000..0c2f09ccfe --- /dev/null +++ b/v3-examples/training-examples/local-training-example.ipynb @@ -0,0 +1,454 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# SageMaker V3 Local Training Example\n", + "\n", + "This notebook demonstrates how to use SageMaker V3 ModelTrainer in Local Container mode for testing training jobs in Docker containers locally." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import subprocess\n", + "import tempfile\n", + "import shutil\n", + "import numpy as np\n", + "\n", + "from sagemaker.train.model_trainer import ModelTrainer, Mode\n", + "from sagemaker.train.configs import SourceCode, Compute, InputData\n", + "from sagemaker.core.helper.session_helper import Session" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# NOTE: Local mode requires Docker to be installed and running.\n", + "import os\n", + "os.environ['PATH'] = '/usr/local/bin:/Applications/Docker.app/Contents/Resources/bin:' + os.environ['PATH']" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 1: Setup Session and Create Test Data\n", + "\n", + "Initialize the SageMaker session and create the necessary test data and training script." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sagemaker_session = Session()\n", + "region = sagemaker_session.boto_region_name\n", + "\n", + "# Get the correct ECR image for your region\n", + "from sagemaker.core import image_uris\n", + "DEFAULT_CPU_IMAGE = image_uris.retrieve(\n", + " framework=\"pytorch\",\n", + " region=region,\n", + " version=\"2.0.0\",\n", + " py_version=\"py310\",\n", + " instance_type=\"ml.m5.xlarge\",\n", + " image_scope=\"training\"\n", + ")\n", + "\n", + "# Set Docker platform for Apple Silicon compatibility\n", + "import platform\n", + "if platform.machine() == 'arm64':\n", + " os.environ['DOCKER_DEFAULT_PLATFORM'] = 'linux/amd64'\n", + "\n", + "# Create temporary directories\n", + "temp_dir = tempfile.mkdtemp()\n", + "source_dir = os.path.join(temp_dir, \"source\")\n", + "data_dir = os.path.join(temp_dir, \"data\")\n", + "train_dir = os.path.join(data_dir, \"train\")\n", + "test_dir = os.path.join(data_dir, \"test\")\n", + "\n", + "os.makedirs(source_dir, exist_ok=True)\n", + "os.makedirs(train_dir, exist_ok=True)\n", + "os.makedirs(test_dir, exist_ok=True)\n", + "\n", + "print(f\"Created temporary directories in: {temp_dir}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 2: Create Training Data and Scripts\n", + "\n", + "Generate the test data and training scripts needed for local container training." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create test data\n", + "np.random.seed(42)\n", + "x_train = np.random.randn(100, 4).astype(np.float32)\n", + "y_train = np.random.randn(100).astype(np.float32)\n", + "x_test = np.random.randn(20, 4).astype(np.float32)\n", + "y_test = np.random.randn(20).astype(np.float32)\n", + "\n", + "np.save(os.path.join(train_dir, \"x_train.npy\"), x_train)\n", + "np.save(os.path.join(train_dir, \"y_train.npy\"), y_train)\n", + "np.save(os.path.join(test_dir, \"x_test.npy\"), x_test)\n", + "np.save(os.path.join(test_dir, \"y_test.npy\"), y_test)\n", + "\n", + "print(f\"Created training data: {x_train.shape}, {y_train.shape}\")\n", + "print(f\"Created test data: {x_test.shape}, {y_test.shape}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create pytorch model definition\n", + "pytorch_model_def = '''\n", + "import torch\n", + "import torch.nn as nn\n", + "\n", + "def get_model():\n", + " return nn.Sequential(\n", + " nn.Linear(4, 10),\n", + " nn.ReLU(),\n", + " nn.Linear(10, 1)\n", + " )\n", + "'''\n", + "\n", + "with open(os.path.join(source_dir, \"pytorch_model_def.py\"), 'w') as f:\n", + " f.write(pytorch_model_def)\n", + "\n", + "print(\"Created pytorch_model_def.py\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create training script\n", + "training_script = '''\n", + "import argparse\n", + "import numpy as np\n", + "import os\n", + "import sys\n", + "import logging\n", + "import json\n", + "import shutil\n", + "import torch\n", + "import torch.nn as nn\n", + "from torch.utils.data import DataLoader, TensorDataset\n", + "from pytorch_model_def import get_model\n", + "\n", + "logger = logging.getLogger(__name__)\n", + "logger.setLevel(logging.DEBUG)\n", + "logger.addHandler(logging.StreamHandler(sys.stdout))\n", + "current_dir = os.path.dirname(os.path.abspath(__file__))\n", + "data_dir = \"/opt/ml/input/data\"\n", + "\n", + "def get_train_data(train_dir):\n", + " x_train = np.load(os.path.join(train_dir, \"x_train.npy\"))\n", + " y_train = np.load(os.path.join(train_dir, \"y_train.npy\"))\n", + " logger.info(f\"x train: {x_train.shape}, y train: {y_train.shape}\")\n", + " return torch.from_numpy(x_train), torch.from_numpy(y_train)\n", + "\n", + "def get_test_data(test_dir):\n", + " x_test = np.load(os.path.join(test_dir, \"x_test.npy\"))\n", + " y_test = np.load(os.path.join(test_dir, \"y_test.npy\"))\n", + " logger.info(f\"x test: {x_test.shape}, y test: {y_test.shape}\")\n", + " return torch.from_numpy(x_test), torch.from_numpy(y_test)\n", + "\n", + "def train():\n", + " train_dir = os.path.join(data_dir, \"train\")\n", + " test_dir = os.path.join(data_dir, \"test\")\n", + " model_dir = os.environ.get(\"SM_MODEL_DIR\", os.path.join(current_dir, \"data/model\"))\n", + "\n", + " x_train, y_train = get_train_data(train_dir)\n", + " x_test, y_test = get_test_data(test_dir)\n", + " train_ds = TensorDataset(x_train, y_train)\n", + "\n", + " batch_size = 64\n", + " epochs = 1\n", + " learning_rate = 0.1\n", + " logger.info(f\"batch_size = {batch_size}, epochs = {epochs}, learning rate = {learning_rate}\")\n", + "\n", + " train_dl = DataLoader(train_ds, batch_size, shuffle=True)\n", + " device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n", + " model = get_model().to(device)\n", + " criterion = nn.MSELoss()\n", + " 
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)\n", + "\n", + " for epoch in range(epochs):\n", + " for x_train_batch, y_train_batch in train_dl:\n", + " y = model(x_train_batch.float())\n", + " loss = criterion(y.flatten(), y_train_batch.float())\n", + " optimizer.zero_grad()\n", + " loss.backward()\n", + " optimizer.step()\n", + " epoch += 1\n", + " logger.info(f\"epoch: {epoch} -> loss: {loss}\")\n", + "\n", + " with torch.no_grad():\n", + " y = model(x_test.float()).flatten()\n", + " mse = ((y - y_test) ** 2).sum() / y_test.shape[0]\n", + " print(\"Test MSE:\", mse.numpy())\n", + "\n", + " os.makedirs(model_dir, exist_ok=True)\n", + " torch.save(model.state_dict(), model_dir + \"/model.pth\")\n", + " \n", + " inference_code_path = model_dir + \"/code/\"\n", + " if not os.path.exists(inference_code_path):\n", + " os.mkdir(inference_code_path)\n", + " logger.info(f\"Created a folder at {inference_code_path}!\")\n", + "\n", + " shutil.copy(\"local_training_script.py\", inference_code_path)\n", + " shutil.copy(\"pytorch_model_def.py\", inference_code_path)\n", + " logger.info(f\"Saving models files to {inference_code_path}\")\n", + "\n", + "if __name__ == \"__main__\":\n", + " print(\"Running the training job ...\")\n", + " train()\n", + "'''\n", + "\n", + "with open(os.path.join(source_dir, \"local_training_script.py\"), 'w') as f:\n", + " f.write(training_script)\n", + "\n", + "print(\"Created local_training_script.py\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 3: Configure Local Container Training\n", + "\n", + "Set up ModelTrainer to run in LOCAL_CONTAINER mode." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "source_code = SourceCode(\n", + " source_dir=source_dir,\n", + " entry_script=\"local_training_script.py\",\n", + ")\n", + "\n", + "compute = Compute(\n", + " instance_type=\"local_cpu\",\n", + " instance_count=1,\n", + ")\n", + "\n", + "train_data = InputData(\n", + " channel_name=\"train\",\n", + " data_source=train_dir,\n", + ")\n", + "\n", + "test_data = InputData(\n", + " channel_name=\"test\",\n", + " data_source=test_dir,\n", + ")\n", + "\n", + "print(\"Configuration complete\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 4: Create ModelTrainer\n", + "\n", + "Initialize ModelTrainer with the local container configuration." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "model_trainer = ModelTrainer(\n", + " training_image=DEFAULT_CPU_IMAGE,\n", + " sagemaker_session=sagemaker_session,\n", + " source_code=source_code,\n", + " compute=compute,\n", + " input_data_config=[train_data, test_data],\n", + " base_job_name=\"local_mode_single_container_local_data\",\n", + " training_mode=Mode.LOCAL_CONTAINER,\n", + ")\n", + "\n", + "print(\"ModelTrainer created with LOCAL_CONTAINER mode!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 5: Run Local Container Training\n", + "\n", + "Start the training job in local Docker container." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"Starting local container training...\")\n", + "\n", + "try:\n", + " model_trainer.train()\n", + " print(\"Local container training completed successfully!\")\n", + " operation_successful = True\n", + "except Exception as e:\n", + " print(f\"Training failed with error: {e}\")\n", + " operation_successful = False" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 6: Check Training Results\n", + "\n", + "Examine the training artifacts created by local container training." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "if operation_successful:\n", + " current_dir = os.getcwd()\n", + " \n", + " directories_to_check = [\n", + " \"compressed_artifacts\",\n", + " \"artifacts\", \n", + " \"model\",\n", + " \"output\",\n", + " ]\n", + " \n", + " print(\"Training Results:\")\n", + " for directory in directories_to_check:\n", + " path = os.path.join(current_dir, directory)\n", + " if os.path.exists(path):\n", + " print(f\"✓ {directory}: Found\")\n", + " if os.path.isdir(path):\n", + " files = os.listdir(path)\n", + " print(f\" Contents: {files}\")\n", + " else:\n", + " print(f\"✗ {directory}: Not found\")\n", + " \n", + " print(\"Local container training completed successfully!\")\n", + "else:\n", + " print(\"Training was not successful.\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 7: Clean Up\n", + "\n", + "Clean up local artifacts and temporary files." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " subprocess.run([\"docker\", \"compose\", \"down\", \"-v\"], check=False)\n", + " print(\"Docker containers stopped\")\n", + "except Exception:\n", + " pass\n", + "\n", + "# Clean up temporary files\n", + "try:\n", + " shutil.rmtree(temp_dir)\n", + " print(f\"Cleaned up temporary directory: {temp_dir}\")\n", + "except Exception as e:\n", + " print(f\"Could not clean up temp directory: {e}\")\n", + "\n", + "# Clean up training artifacts\n", + "current_dir = os.getcwd()\n", + "directories = [\"compressed_artifacts\", \"artifacts\", \"model\", \"output\"]\n", + "\n", + "for directory in directories:\n", + " path = os.path.join(current_dir, directory)\n", + " if os.path.exists(path):\n", + " try:\n", + " shutil.rmtree(path)\n", + " print(f\"Cleaned up: {directory}\")\n", + " except Exception as e:\n", + " print(f\"Could not clean up {directory}: {e}\")\n", + "\n", + "print(\"Cleanup completed!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "This notebook demonstrated:\n", + "1. **Local container training**: Running training in Docker containers locally\n", + "2. **Data preparation**: Creating test data and training scripts\n", + "3. **Artifact management**: Checking and cleaning up training artifacts\n", + "4. **Docker integration**: Proper container lifecycle management\n", + "\n", + "Local container training provides a great way to test training jobs locally before deploying to SageMaker cloud instances." 
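Once the script behaves as expected locally, moving to a managed SageMaker training job is mostly a configuration change: point `Compute` at a real instance type, stage the data in S3, and drop the local training mode. A rough sketch under those assumptions (the S3 prefixes are placeholders, and omitting `training_mode` is assumed to submit a cloud job by default):

```python
# Hypothetical switch from local containers to a managed SageMaker job.
cloud_compute = Compute(
    instance_type="ml.m5.xlarge",  # any suitable SageMaker instance type
    instance_count=1,
)

# Training data must live in S3 for a cloud job; these URIs are placeholders.
cloud_train = InputData(channel_name="train", data_source="s3://my-bucket/train/")
cloud_test = InputData(channel_name="test", data_source="s3://my-bucket/test/")

cloud_trainer = ModelTrainer(
    training_image=DEFAULT_CPU_IMAGE,
    sagemaker_session=sagemaker_session,
    source_code=source_code,
    compute=cloud_compute,
    input_data_config=[cloud_train, cloud_test],
    base_job_name="cloud-single-instance",
    # training_mode omitted: assumed to default to a managed SageMaker job
)
# cloud_trainer.train()
```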
+ ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv-test", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.11" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}